Files
rpa_vision_v3/tests/unit/test_dashboard_routes.py
Dom ec1fb81054
Some checks failed
tests / Lint (ruff + black) (push) Failing after 1m46s
tests / Tests unitaires (sans GPU) (push) Failing after 2m0s
tests / Tests sécurité (critique) (push) Has been skipped
fix(dashboard,worker): vérité produit P0 — dashboard+worker+VWB export
War-room clôture DGX 2026-06-18 (recadrage Dom : graphe/apprentissage/mémoire/dashboard = surface produit P0).
Le dashboard et le statut worker affichaient des états faux ; corrige pour refléter la vérité du produit.

- dashboard FAISS: distingue index brut / metadata HMAC invalide / runtime / absent (plus de faux "inactif")
- dashboard process-mining: 503 explicite missing_dependency (plus de message trompeur)
- dashboard /api/workflows + system/status: lecture DB VWB v3 canonique (total réel = 24, plus de 0)
- worker /processing/status: véridique (lit _worker_health.json) + statut "idle/armé (lazy)" distinct de "dégradé (échec)"
- VWB export: N steps -> N actions/edges (dernière action n'est plus perdue)
- tests: dashboard routes, worker status truthfulness, export VWB

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-18 17:50:12 +02:00

391 lines
14 KiB
Python

"""
Tests pour le dashboard web RPA Vision V3.
Vérifie que les routes principales répondent correctement
et que le template se rend sans erreur.
"""
import sys
from pathlib import Path
import pytest
# Ajouter le répertoire racine au path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
import web_dashboard.app as dashboard_app
from web_dashboard.app import app
@pytest.fixture
def client():
"""Client de test Flask."""
app.config['TESTING'] = True
with app.test_client() as c:
yield c
class TestDashboardRoutes:
"""Tests des routes du dashboard."""
def test_index_renders(self, client):
"""La page d'accueil se rend correctement."""
resp = client.get('/')
assert resp.status_code == 200
assert b'RPA Vision V3' in resp.data
def test_healthz(self, client):
"""Le healthcheck retourne OK."""
resp = client.get('/healthz')
assert resp.status_code == 200
data = resp.get_json()
assert data['status'] == 'ok'
def test_system_status(self, client):
"""L'API system/status retourne les compteurs."""
resp = client.get('/api/system/status')
assert resp.status_code == 200
data = resp.get_json()
assert 'sessions_count' in data
assert 'workflows_count' in data
def test_system_performance(self, client):
"""L'API system/performance retourne les metriques."""
resp = client.get('/api/system/performance')
assert resp.status_code == 200
data = resp.get_json()
assert 'faiss' in data
def test_knowledge_base_stats_include_competences(self, client):
"""La base de connaissances expose les competences supervisees."""
resp = client.get('/api/knowledge-base/stats')
assert resp.status_code == 200
data = resp.get_json()
assert 'competences' in data
assert 'items' in data['competences']
def test_knowledge_base_page_includes_test_safety_guards(self, client):
"""Le bouton Tester embarque les garde-fous Win+R et evidence vide."""
resp = client.get('/knowledge-base')
assert resp.status_code == 200
html = resp.get_data(as_text=True)
assert 'confirmRunDialogReplay' in html
assert 'peut ouvrir Win+R / Exécuter' in html
assert 'hasReplayEvidence' in html
assert 'Verdict valide refusé' in html
def test_dashboard_replay_competence_proxy(self, client, monkeypatch):
"""Le dashboard lance un replay competence supervise via streaming."""
calls = []
def fake_streaming(method, path, *, payload=None, timeout=10):
calls.append({
'method': method,
'path': path,
'payload': payload,
'timeout': timeout,
})
return dashboard_app.jsonify({
'success': True,
'status': 'started',
'replay': {'replay_id': 'replay_free_test'},
}), 200
monkeypatch.setattr(
dashboard_app,
'_dashboard_streaming_json_request',
fake_streaming,
)
resp = client.post(
'/api/v1/lea/competences/key_win_r_wait_explorer_exe/replay',
json={},
)
assert resp.status_code == 200
assert resp.get_json()['replay']['replay_id'] == 'replay_free_test'
assert calls == [{
'method': 'POST',
'path': '/api/v1/lea/competences/key_win_r_wait_explorer_exe/replay',
'payload': {
'supervised': True,
'start_replay': True,
'session_id': '',
},
'timeout': 30,
}]
def test_streaming_status_snapshot_aggregates_existing_endpoints(self, monkeypatch):
"""Le proxy legacy /status agrège les endpoints streaming reels."""
calls = []
def fake_fetch(endpoint, query_string=''):
calls.append((endpoint, query_string))
if endpoint == 'stats':
return {'active_sessions': 1, 'total_events': 7}
if endpoint == 'sessions':
return {'sessions': [{'session_id': 's1'}]}
if endpoint == 'processing/status':
return {'status': 'degraded', 'components_ready': False}
if endpoint == 'replays':
return {'replays': [{'replay_id': 'r1', 'active': True}]}
raise AssertionError(endpoint)
monkeypatch.setattr(dashboard_app, '_fetch_streaming_json', fake_fetch)
snapshot = dashboard_app._streaming_status_snapshot()
assert snapshot['active_sessions'] == 1
assert snapshot['sessions'] == [{'session_id': 's1'}]
assert snapshot['processing']['status'] == 'degraded'
assert snapshot['replay']['replay_id'] == 'r1'
assert calls == [
('stats', ''),
('sessions', ''),
('processing/status', ''),
('replays', ''),
]
def test_dashboard_submit_competence_verdict(self, client, monkeypatch):
"""Le dashboard journalise un verdict sans write-back YAML."""
import core.competences.verdicts as verdicts_module
def fake_store(competence_id, payload):
assert competence_id == 'key_win_r_wait_explorer_exe'
assert payload['verdict_kind'] == 'valid'
return {
'verdict_id': payload['verdict_id'],
'competence_id': competence_id,
'verdict_kind': 'valid',
'duplicate': False,
'write_back_enabled': False,
'yaml_write': False,
}
monkeypatch.setattr(verdicts_module, 'store_competence_verdict', fake_store)
resp = client.post(
'/api/v1/lea/competences/key_win_r_wait_explorer_exe/verdict',
json={
'verdict_id': '123e4567-e89b-42d3-a456-426614174000',
'verdict_kind': 'valid',
'workflow_id': 'free_task:test',
'step_results': [{'step_id': 's1', 'status': 'success'}],
'context_signature': {'machine_id': 'DESKTOP-58D5CAC_windows'},
},
)
assert resp.status_code == 201
data = resp.get_json()
assert data['success'] is True
assert data['write_back_enabled'] is False
assert data['yaml_write'] is False
def test_version(self, client):
"""L'API version retourne la version actuelle."""
resp = client.get('/api/version')
assert resp.status_code == 200
data = resp.get_json()
assert 'version' in data
# version est un dict avec la clé 'version' (string)
assert 'version' in data['version']
def test_version_system_info(self, client):
"""L'API version/system-info retourne les infos systeme."""
resp = client.get('/api/version/system-info')
assert resp.status_code == 200
data = resp.get_json()
assert 'system_info' in data
si = data['system_info']
assert 'system' in si
assert 'python_version' in si['system']
def test_version_backups(self, client):
"""L'API version/backups retourne la liste."""
resp = client.get('/api/version/backups')
assert resp.status_code == 200
data = resp.get_json()
assert 'backups' in data
assert isinstance(data['backups'], list)
def test_services_list(self, client):
"""L'API services retourne la liste des services."""
resp = client.get('/api/services')
assert resp.status_code == 200
data = resp.get_json()
assert 'services' in data
services = data['services']
assert len(services) >= 5 # Au moins 5 services configurés
# Vérifier que le dashboard est dans la liste
ids = [s['service_id'] for s in services]
assert 'web_dashboard' in ids
def test_config_get(self, client):
"""L'API config retourne la configuration."""
resp = client.get('/api/config')
assert resp.status_code == 200
data = resp.get_json()
assert data['success'] is True
assert 'config' in data
def test_backup_stats(self, client):
"""L'API backup/stats retourne les statistiques."""
resp = client.get('/api/backup/stats')
assert resp.status_code == 200
data = resp.get_json()
assert 'stats' in data
stats = data['stats']
assert 'workflows' in stats
def test_workflows_list(self, client):
"""L'API workflows retourne la liste."""
resp = client.get('/api/workflows')
assert resp.status_code == 200
data = resp.get_json()
assert 'workflows' in data
def test_workflows_list_reads_vwb_db(self, client, monkeypatch, tmp_path):
"""Régression red-gate : /api/workflows reflète la base VWB v3, pas 0.
Avant correctif l'endpoint globait un store JSON vide et renvoyait
toujours total:0. On construit une DB VWB minimale (schéma canonique
workflows + steps) et on vérifie que l'endpoint expose le compte réel.
"""
import sqlite3
from pathlib import Path
db_path = tmp_path / "instance" / "workflows.db"
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.execute(
"CREATE TABLE workflows (id VARCHAR(64) PRIMARY KEY, name VARCHAR(255), "
"description TEXT, created_at DATETIME, updated_at DATETIME, "
"is_active BOOLEAN, source VARCHAR(64), review_status VARCHAR(32))"
)
conn.execute(
"CREATE TABLE steps (id VARCHAR(64) PRIMARY KEY, workflow_id VARCHAR(64), "
"action_type VARCHAR(64))"
)
conn.execute(
"INSERT INTO workflows VALUES (?,?,?,?,?,?,?,?)",
("wf_aiva", "Urgence_aiva_demo", "demo", "2026-06-01", "2026-06-18",
1, "manual", ""),
)
conn.execute(
"INSERT INTO workflows VALUES (?,?,?,?,?,?,?,?)",
("wf_learned", "Learned_flow", "", "2026-06-02", "2026-06-17",
1, "learned_import", "pending"),
)
# 3 steps pour wf_aiva → nodes_count attendu = 3
for i in range(3):
conn.execute(
"INSERT INTO steps VALUES (?,?,?)", (f"s{i}", "wf_aiva", "click")
)
conn.commit()
conn.close()
monkeypatch.setattr(dashboard_app, "VWB_DB_PATH", Path(db_path))
resp = client.get('/api/workflows')
assert resp.status_code == 200
data = resp.get_json()
assert data['total'] == 2, f"attendu 2 workflows, obtenu {data['total']}"
names = {w['name'] for w in data['workflows']}
assert 'Urgence_aiva_demo' in names
aiva = next(w for w in data['workflows'] if w['name'] == 'Urgence_aiva_demo')
assert aiva['nodes_count'] == 3
assert aiva['source'] == 'manual'
def test_sessions_list(self, client):
"""L'API sessions retourne la liste."""
resp = client.get('/api/agent/sessions')
assert resp.status_code == 200
data = resp.get_json()
assert 'sessions' in data
def test_tests_list_removed(self, client):
"""L'API /api/tests a été retirée (RCE via subprocess)."""
resp = client.get('/api/tests')
assert resp.status_code == 404
def test_logs(self, client):
"""L'API logs retourne les logs."""
resp = client.get('/api/logs')
assert resp.status_code == 200
data = resp.get_json()
assert 'logs' in data
def test_chains(self, client):
"""L'API chains retourne la liste."""
resp = client.get('/api/chains')
assert resp.status_code == 200
data = resp.get_json()
assert 'chains' in data
def test_triggers(self, client):
"""L'API triggers retourne la liste."""
resp = client.get('/api/triggers')
assert resp.status_code == 200
data = resp.get_json()
assert 'triggers' in data
def test_automation_status_removed(self, client):
"""L'API /api/automation/status a été retirée."""
resp = client.get('/api/automation/status')
assert resp.status_code == 404
def test_metrics_endpoint(self, client):
"""L'endpoint Prometheus /metrics fonctionne."""
resp = client.get('/metrics')
assert resp.status_code == 200
def test_no_rollback_route(self, client):
"""La route /api/version/rollback n'existe pas (non implementee)."""
resp = client.post('/api/version/rollback/test-id')
assert resp.status_code == 404 or resp.status_code == 405
class TestRemovedRoutes:
"""Vérifie que les routes supprimées retournent 404."""
def test_gestures_page_removed(self, client):
"""La page /gestures a été retirée."""
resp = client.get('/gestures')
assert resp.status_code == 404
def test_api_gestures_removed(self, client):
"""L'API /api/gestures a été retirée."""
resp = client.get('/api/gestures')
assert resp.status_code == 404
def test_streaming_page_removed(self, client):
"""La page /streaming a été retirée."""
resp = client.get('/streaming')
assert resp.status_code == 404
def test_extractions_page_removed(self, client):
"""La page /extractions a été retirée."""
resp = client.get('/extractions')
assert resp.status_code == 404
def test_api_extractions_removed(self, client):
"""L'API /api/extractions a été retirée."""
resp = client.get('/api/extractions')
assert resp.status_code == 404
def test_chat_page_removed(self, client):
"""La page /chat a été retirée."""
resp = client.get('/chat')
assert resp.status_code == 404
class TestFleetProxy:
"""Tests du proxy fleet (requiert serveur streaming, donc 502 attendu)."""
def test_fleet_list_proxy(self, client):
"""Le proxy /api/fleet/fleet retourne 200, 401 ou 502 (serveur offline/auth)."""
resp = client.get('/api/fleet/fleet')
# 200 = ok, 401 = streaming server rejette le token, 502 = serveur offline
assert resp.status_code in (200, 401, 502)
data = resp.get_json()
assert isinstance(data, dict)