From ec1fb81054e9ceb55384308ad7d3693ae43215ab Mon Sep 17 00:00:00 2001 From: Dom Date: Thu, 18 Jun 2026 17:50:12 +0200 Subject: [PATCH] =?UTF-8?q?fix(dashboard,worker):=20v=C3=A9rit=C3=A9=20pro?= =?UTF-8?q?duit=20P0=20=E2=80=94=20dashboard+worker+VWB=20export?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit War-room clôture DGX 2026-06-18 (recadrage Dom : graphe/apprentissage/mémoire/dashboard = surface produit P0). Le dashboard et le statut worker affichaient des états faux ; corrige pour refléter la vérité du produit. - dashboard FAISS: distingue index brut / metadata HMAC invalide / runtime / absent (plus de faux "inactif") - dashboard process-mining: 503 explicite missing_dependency (plus de message trompeur) - dashboard /api/workflows + system/status: lecture DB VWB v3 canonique (total réel = 24, plus de 0) - worker /processing/status: véridique (lit _worker_health.json) + statut "idle/armé (lazy)" distinct de "dégradé (échec)" - VWB export: N steps -> N actions/edges (dernière action n'est plus perdue) - tests: dashboard routes, worker status truthfulness, export VWB Co-Authored-By: Claude Opus 4.8 (1M context) --- agent_v0/server_v1/api_stream.py | 108 +++++++- tests/integration/test_pause_for_human.py | 6 +- tests/integration/test_stream_processor.py | 155 +++++++++++ tests/integration/test_t2a_extract.py | 2 + tests/unit/test_dashboard_routes.py | 83 ++++++ .../services/learned_workflow_bridge.py | 18 +- web_dashboard/app.py | 244 +++++++++++++++--- web_dashboard/templates/index.html | 66 ++++- 8 files changed, 626 insertions(+), 56 deletions(-) diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index 90ff9b39a..aa620853b 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -555,6 +555,7 @@ LIVE_SESSIONS_DIR.mkdir(parents=True, exist_ok=True) _DATA_DIR = ROOT_DIR / "data" / "training" WORKER_QUEUE_FILE = _DATA_DIR / "_worker_queue.txt" 
REPLAY_LOCK_FILE = _DATA_DIR / "_replay_active.lock" +WORKER_HEALTH_FILE = _DATA_DIR / "_worker_health.json" # Instance globale partagée (le StreamProcessor reste dans le serveur HTTP # pour le CLIP, l'indexation FAISS, la gestion des sessions, le replay — @@ -807,7 +808,7 @@ def _memory_window_title_for_action(action_meta: Dict[str, Any]) -> str: def _get_worker_queue_status() -> Dict[str, Any]: - """Retourne l'état de la queue du worker VLM (pour le monitoring).""" + """Retourne l'état réel de la queue et du worker VLM (pour le monitoring).""" queue = [] if WORKER_QUEUE_FILE.exists(): try: @@ -819,16 +820,108 @@ def _get_worker_queue_status() -> Dict[str, Any]: except Exception: pass + health = None + health_error = None + health_age_seconds = None + if WORKER_HEALTH_FILE.exists(): + try: + health = json.loads(WORKER_HEALTH_FILE.read_text(encoding="utf-8")) + health_age_seconds = max(0.0, time.time() - WORKER_HEALTH_FILE.stat().st_mtime) + except Exception as exc: + health_error = str(exc) + + health_stale = health_age_seconds is None or health_age_seconds > 180 + components = (health or {}).get("components") or {} + components_ready = bool(components) and all(bool(v) for v in components.values()) + health_status = (health or {}).get("status") + running = bool(health) and not health_stale and health_status != "stopped" + + # Distinction VEILLE (armé, lazy) vs DÉGRADÉ (vrai échec). + # + # Les composants lourds (ScreenAnalyzer/CLIP/FAISS/StateEmbedding) sont + # chargés en lazy par run_worker : le processor n'est instancié qu'au + # premier _process_session (cf. run_worker._get_processor / _process_session). + # Un worker neuf qui n'a jamais reçu de session écrit donc status="healthy" + # avec tous les composants à false — c'est l'état NORMAL « en veille », pas + # une panne. L'étiqueter "degraded" fait lire une panne là où il n'y en a pas. 
+ # + # Signal retenu pour « init jamais tentée » : TOUS les composants à false ET + # sessions_processed == 0 ET sessions_failed == 0. Justification : run_worker + # n'appelle _get_processor() (donc l'init lazy) que dans _process_session, qui + # incrémente toujours exactement un compteur (processed / failed / skipped). + # Tant que processed == 0 ET failed == 0, aucune session n'a déclenché une + # init suivie d'un traitement — le worker est armé en attente. Un simple skip + # (dossier/shots absents) passe quand même par _get_processor() : les + # composants se chargent, donc tous-à-false devient faux et on n'entre pas ici. + # run_worker._health_components() écrit toujours les 4 clés (jamais un dict + # vide), d'où le test sur les VALEURS et non sur la présence des clés. + # Si run_worker a lui-même forcé status="degraded" (VLM + ScreenAnalyzer + # absent, cf. run_worker._write_health), c'est un VRAI échec : on le conserve. + stats = (health or {}).get("stats") or {} + init_attempted = bool(stats.get("sessions_processed", 0)) or bool( + stats.get("sessions_failed", 0) + ) + components_all_false = bool(components) and not any( + bool(v) for v in components.values() + ) + armed = ( + running + and not components_ready + and health_status == "healthy" + and components_all_false # aucun composant lourd encore chargé + and not init_attempted + ) + + status = health_status or "unknown" + if not running: + status = "stale" if health else "unknown" + elif armed: + # En veille : worker sain, composants chargés à la 1re session. 
+ status = "idle" + elif not components_ready: + status = "degraded" + return { - "running": True, # On ne sait pas si le worker process tourne, mais la queue existe + "running": running, + "status": status, + "armed": armed, "queue_length": len(queue), "queue": queue, "replay_lock_active": REPLAY_LOCK_FILE.exists(), "queue_file": str(WORKER_QUEUE_FILE), - "note": "Le worker VLM tourne dans un process séparé (run_worker.py)", + "health_file": str(WORKER_HEALTH_FILE), + "health_error": health_error, + "health_age_seconds": health_age_seconds, + "health_stale": health_stale, + "worker_pid": (health or {}).get("pid"), + "last_cycle": (health or {}).get("last_cycle"), + "current_session": (health or {}).get("current_session"), + "components": components, + "components_ready": components_ready, + "processing_ready": running and not REPLAY_LOCK_FILE.exists() and components_ready, + "status_hint": _worker_status_hint(status, armed), + "stats": stats, + "note": "Le worker VLM tourne dans un process séparé (agent_v0.server_v1.run_worker).", } +def _worker_status_hint(status: str, armed: bool) -> str: + """Message humain pour le statut worker (consommé par le dashboard).""" + if armed or status == "idle": + return "En veille — composants chargés à la 1re session." + if status == "degraded": + return "Worker apprentissage dégradé — init des composants en échec." + if status == "stale": + return "Health file périmé (> 180s) — worker peut-être arrêté." + if status == "stopped": + return "Worker arrêté." + if status == "busy": + return "Traitement d'une session en cours." + if status == "healthy": + return "Worker prêt — composants chargés." + return "État worker inconnu." 
+ + # ========================================================================= # Compteur d'analyses en cours par session (pour attendre avant finalize) # ========================================================================= @@ -1614,13 +1707,14 @@ async def startup(): threading.Thread(target=_smoke_model_health, name="model-health-smoke", daemon=True).start() - # Afficher le token API au démarrage pour que l'utilisateur puisse configurer l'agent + # Ne jamais imprimer le token complet dans journald/stdout. _token_source = "env RPA_API_TOKEN" if os.environ.get("RPA_API_TOKEN") else "auto-généré" - logger.info(f"API Token ({_token_source}): {API_TOKEN}") + _token_hint = f"{API_TOKEN[:8]}…{API_TOKEN[-4:]}" if API_TOKEN else "" + logger.info("API Token (%s): %s — auth Bearer obligatoire", _token_source, _token_hint) print(f"\n{'='*60}") print(f" API Token ({_token_source}):") - print(f" {API_TOKEN}") - print(f" Configurer l'agent : export RPA_API_TOKEN={API_TOKEN}") + print(f" {_token_hint} (masqué)") + print(" Configurer l'agent via .env.local ou l'enrollment; ne pas copier depuis les logs.") print(f"{'='*60}\n") worker.start(blocking=False) diff --git a/tests/integration/test_pause_for_human.py b/tests/integration/test_pause_for_human.py index 47ffb1a68..a7d2317d4 100644 --- a/tests/integration/test_pause_for_human.py +++ b/tests/integration/test_pause_for_human.py @@ -119,8 +119,10 @@ def test_export_vwb_workflow_with_pause_step(): ] core = convert_vwb_to_core_workflow(workflow_data, steps_data) assert core["learning_state"] == "COACHING" - assert len(core["nodes"]) == 3 - assert len(core["edges"]) == 2 + assert len(core["nodes"]) == 4 + assert len(core["edges"]) == 3 + assert core["edges"][-1]["action"]["type"] == "mouse_click" + assert core["nodes"][-1]["is_end"] is True # L'edge sortant du node de pause doit avoir le bon type + message pause_edges = [ diff --git a/tests/integration/test_stream_processor.py b/tests/integration/test_stream_processor.py index 
660187901..344e614cb 100644 --- a/tests/integration/test_stream_processor.py +++ b/tests/integration/test_stream_processor.py @@ -1289,3 +1289,158 @@ class TestAPIEndpoints: assert len(workflows) == 1 assert workflows[0]["workflow_id"] == "wf_api_001" assert workflows[0]["nodes"] == 2 + + +class TestWorkerStatusTruthfulness: + """Truthfulness du statut worker exposé par _get_worker_queue_status. + + Distingue VEILLE (armé, lazy : worker neuf qui n'a jamais traité de + session, composants chargés à la 1re session) de DÉGRADÉ (init tentée + et en échec). Un worker en veille ne doit JAMAIS être étiqueté 'degraded'. + """ + + # Même contrainte que TestAPIEndpoints : api_stream fail-closed à l'import + # si RPA_API_TOKEN absent. + _TEST_API_TOKEN = "test_token_for_worker_status_0123456789abcdef" + + @pytest.fixture(autouse=True) + def _ensure_api_token(self, monkeypatch): + monkeypatch.setenv("RPA_API_TOKEN", self._TEST_API_TOKEN) + + @pytest.fixture + def status_env(self, tmp_path, monkeypatch): + """Isole les fichiers worker (health/queue/lock) sur tmp_path.""" + from agent_v0.server_v1 import api_stream + + health_file = tmp_path / "_worker_health.json" + queue_file = tmp_path / "_worker_queue.txt" + lock_file = tmp_path / "_replay_active.lock" + monkeypatch.setattr(api_stream, "WORKER_HEALTH_FILE", health_file) + monkeypatch.setattr(api_stream, "WORKER_QUEUE_FILE", queue_file) + monkeypatch.setattr(api_stream, "REPLAY_LOCK_FILE", lock_file) + return api_stream, health_file + + @staticmethod + def _write_health(health_file, **overrides): + """Écrit un health file frais (mtime récent => non stale).""" + payload = { + "pid": 1234, + "started_at": "2026-06-18T10:00:00", + "last_cycle": "2026-06-18T10:00:30", + "current_session": None, + "queue_length": 0, + "components": { + "screen_analyzer": False, + "clip_embedder": False, + "faiss_manager": False, + "state_embedding_builder": False, + }, + "stats": { + "sessions_processed": 0, + "sessions_failed": 0, + 
"sessions_skipped": 0, + "total_screenshots_analyzed": 0, + }, + "status": "healthy", + } + payload.update(overrides) + health_file.write_text(json.dumps(payload), encoding="utf-8") + + def test_fresh_worker_is_idle_not_degraded(self, status_env): + """Worker neuf : healthy, 0 session, tous composants false + => statut 'idle' (en veille / armé), PAS 'degraded'.""" + api_stream, health_file = status_env + self._write_health(health_file) # défaut = état neuf + + status = api_stream._get_worker_queue_status() + + assert status["running"] is True + assert status["status"] == "idle", status + assert status["armed"] is True + assert status["components_ready"] is False + # processing_ready reste False tant que les composants ne sont pas chargés + assert status["processing_ready"] is False + assert "veille" in status["status_hint"].lower() + + def test_worker_init_failed_is_degraded(self, status_env): + """Init tentée et en échec : run_worker force status='degraded' + (VLM + ScreenAnalyzer absent) => on conserve 'degraded'.""" + api_stream, health_file = status_env + self._write_health( + health_file, + status="degraded", # forcé par run_worker._write_health + components={ + "screen_analyzer": False, + "clip_embedder": True, + "faiss_manager": True, + "state_embedding_builder": False, + }, + stats={ + "sessions_processed": 0, + "sessions_failed": 1, # une session a tenté l'init et échoué + "sessions_skipped": 0, + "total_screenshots_analyzed": 0, + }, + ) + + status = api_stream._get_worker_queue_status() + + assert status["running"] is True + assert status["status"] == "degraded", status + assert status["armed"] is False + assert status["processing_ready"] is False + assert "dégradé" in status["status_hint"].lower() + + def test_worker_partial_components_after_attempt_is_degraded(self, status_env): + """Composants partiels après tentative de traitement (sessions_failed>0), + sans status forcé par le worker => 'degraded' (pas 'idle').""" + api_stream, health_file = 
status_env + self._write_health( + health_file, + status="healthy", + components={ + "screen_analyzer": True, + "clip_embedder": True, + "faiss_manager": False, # un composant manquant + "state_embedding_builder": True, + }, + stats={ + "sessions_processed": 0, + "sessions_failed": 2, + "sessions_skipped": 0, + "total_screenshots_analyzed": 0, + }, + ) + + status = api_stream._get_worker_queue_status() + + assert status["status"] == "degraded", status + assert status["armed"] is False + + def test_worker_ready_after_processing_is_healthy(self, status_env): + """Worker ayant traité au moins une session, tous composants chargés + => 'healthy' et processing_ready=True.""" + api_stream, health_file = status_env + self._write_health( + health_file, + status="healthy", + components={ + "screen_analyzer": True, + "clip_embedder": True, + "faiss_manager": True, + "state_embedding_builder": True, + }, + stats={ + "sessions_processed": 3, + "sessions_failed": 0, + "sessions_skipped": 0, + "total_screenshots_analyzed": 42, + }, + ) + + status = api_stream._get_worker_queue_status() + + assert status["status"] == "healthy", status + assert status["armed"] is False + assert status["components_ready"] is True + assert status["processing_ready"] is True diff --git a/tests/integration/test_t2a_extract.py b/tests/integration/test_t2a_extract.py index 0ffe44148..e40da5f8b 100644 --- a/tests/integration/test_t2a_extract.py +++ b/tests/integration/test_t2a_extract.py @@ -296,9 +296,11 @@ def test_export_workflow_with_t2a_chain(): ] core = convert_vwb_to_core_workflow(workflow_data, steps_data) edge_types = [e["action"]["type"] for e in core["edges"]] + assert len(core["edges"]) == len(steps_data) assert "extract_text" in edge_types assert "t2a_decision" in edge_types assert "pause_for_human" in edge_types + assert edge_types[-1] == "mouse_click" # Vérifier que le templating est bien transporté t2a_edge = next(e for e in core["edges"] if e["action"]["type"] == "t2a_decision") assert 
t2a_edge["action"]["parameters"]["input_template"] == "{{dpi}}" diff --git a/tests/unit/test_dashboard_routes.py b/tests/unit/test_dashboard_routes.py index 3f8f0528c..a202bd676 100644 --- a/tests/unit/test_dashboard_routes.py +++ b/tests/unit/test_dashboard_routes.py @@ -114,6 +114,37 @@ class TestDashboardRoutes: 'timeout': 30, }] + def test_streaming_status_snapshot_aggregates_existing_endpoints(self, monkeypatch): + """Le proxy legacy /status agrège les endpoints streaming reels.""" + calls = [] + + def fake_fetch(endpoint, query_string=''): + calls.append((endpoint, query_string)) + if endpoint == 'stats': + return {'active_sessions': 1, 'total_events': 7} + if endpoint == 'sessions': + return {'sessions': [{'session_id': 's1'}]} + if endpoint == 'processing/status': + return {'status': 'degraded', 'components_ready': False} + if endpoint == 'replays': + return {'replays': [{'replay_id': 'r1', 'active': True}]} + raise AssertionError(endpoint) + + monkeypatch.setattr(dashboard_app, '_fetch_streaming_json', fake_fetch) + + snapshot = dashboard_app._streaming_status_snapshot() + + assert snapshot['active_sessions'] == 1 + assert snapshot['sessions'] == [{'session_id': 's1'}] + assert snapshot['processing']['status'] == 'degraded' + assert snapshot['replay']['replay_id'] == 'r1' + assert calls == [ + ('stats', ''), + ('sessions', ''), + ('processing/status', ''), + ('replays', ''), + ] + def test_dashboard_submit_competence_verdict(self, client, monkeypatch): """Le dashboard journalise un verdict sans write-back YAML.""" import core.competences.verdicts as verdicts_module @@ -212,6 +243,58 @@ class TestDashboardRoutes: data = resp.get_json() assert 'workflows' in data + def test_workflows_list_reads_vwb_db(self, client, monkeypatch, tmp_path): + """Régression red-gate : /api/workflows reflète la base VWB v3, pas 0. + + Avant correctif l'endpoint globait un store JSON vide et renvoyait + toujours total:0. 
On construit une DB VWB minimale (schéma canonique + workflows + steps) et on vérifie que l'endpoint expose le compte réel. + """ + import sqlite3 + from pathlib import Path + + db_path = tmp_path / "instance" / "workflows.db" + db_path.parent.mkdir(parents=True, exist_ok=True) + conn = sqlite3.connect(str(db_path)) + conn.execute( + "CREATE TABLE workflows (id VARCHAR(64) PRIMARY KEY, name VARCHAR(255), " + "description TEXT, created_at DATETIME, updated_at DATETIME, " + "is_active BOOLEAN, source VARCHAR(64), review_status VARCHAR(32))" + ) + conn.execute( + "CREATE TABLE steps (id VARCHAR(64) PRIMARY KEY, workflow_id VARCHAR(64), " + "action_type VARCHAR(64))" + ) + conn.execute( + "INSERT INTO workflows VALUES (?,?,?,?,?,?,?,?)", + ("wf_aiva", "Urgence_aiva_demo", "demo", "2026-06-01", "2026-06-18", + 1, "manual", ""), + ) + conn.execute( + "INSERT INTO workflows VALUES (?,?,?,?,?,?,?,?)", + ("wf_learned", "Learned_flow", "", "2026-06-02", "2026-06-17", + 1, "learned_import", "pending"), + ) + # 3 steps pour wf_aiva → nodes_count attendu = 3 + for i in range(3): + conn.execute( + "INSERT INTO steps VALUES (?,?,?)", (f"s{i}", "wf_aiva", "click") + ) + conn.commit() + conn.close() + + monkeypatch.setattr(dashboard_app, "VWB_DB_PATH", Path(db_path)) + + resp = client.get('/api/workflows') + assert resp.status_code == 200 + data = resp.get_json() + assert data['total'] == 2, f"attendu 2 workflows, obtenu {data['total']}" + names = {w['name'] for w in data['workflows']} + assert 'Urgence_aiva_demo' in names + aiva = next(w for w in data['workflows'] if w['name'] == 'Urgence_aiva_demo') + assert aiva['nodes_count'] == 3 + assert aiva['source'] == 'manual' + def test_sessions_list(self, client): """L'API sessions retourne la liste.""" resp = client.get('/api/agent/sessions') diff --git a/visual_workflow_builder/backend/services/learned_workflow_bridge.py b/visual_workflow_builder/backend/services/learned_workflow_bridge.py index 34694e704..2177a2031 100644 --- 
a/visual_workflow_builder/backend/services/learned_workflow_bridge.py +++ b/visual_workflow_builder/backend/services/learned_workflow_bridge.py @@ -480,21 +480,24 @@ def convert_vwb_to_core_workflow( now = datetime.now().isoformat() wf_id = workflow_data.get("id", f"wf_{uuid.uuid4().hex[:12]}") - # Créer les nodes : un node par étape (chaque étape = un état écran) + # Les actions sont portées par les edges. N étapes VWB doivent donc donner + # N edges core, et N+1 états écran (avant chaque action + terminal). nodes = [] edges = [] - for idx, step in enumerate(steps_data): + node_count = len(steps_data) + 1 if steps_data else 0 + for idx in range(node_count): + step = steps_data[idx] if idx < len(steps_data) else {} node_id = f"node_{idx:03d}" action_type = step.get("action_type", "click_anchor") params = step.get("parameters", {}) - label = step.get("label", action_type) + label = step.get("label", action_type) if idx < len(steps_data) else "Fin du workflow" # Créer le node (template minimal) node = { "node_id": node_id, "name": label, - "description": f"Étape {idx + 1} : {label}", + "description": f"Étape {idx + 1} : {label}" if idx < len(steps_data) else "État terminal", "template": { "window": { "title_pattern": params.get("window_title"), @@ -518,7 +521,7 @@ def convert_vwb_to_core_workflow( }, }, "is_entry": idx == 0, - "is_end": idx == len(steps_data) - 1, + "is_end": idx == node_count - 1, "variants": [], "primary_variant_id": None, "max_variants": 5, @@ -529,12 +532,13 @@ def convert_vwb_to_core_workflow( "metadata": { "vwb_step_id": step.get("id", ""), "visual_type": _action_type_to_visual(action_type), + "terminal": idx >= len(steps_data), }, } nodes.append(node) - # Créer l'edge vers le node suivant (sauf pour le dernier) - if idx < len(steps_data) - 1: + # Créer un edge/action pour chaque step VWB, y compris la dernière. 
+ if idx < len(steps_data): next_node_id = f"node_{idx + 1:03d}" # Convertir l'action VWB → action core diff --git a/web_dashboard/app.py b/web_dashboard/app.py index 7ee00c811..9f8f018b8 100644 --- a/web_dashboard/app.py +++ b/web_dashboard/app.py @@ -189,6 +189,20 @@ SESSIONS_PATH = DATA_PATH / "sessions" WORKFLOWS_PATH = DATA_PATH / "workflows" LOGS_PATH = BASE_PATH / "logs" +# Source canonique des workflows (décision produit D3) : la base VWB v3 +# (SQLAlchemy/SQLite) que Léa lit déjà au runtime. Chemin absolu robuste (PAS la +# DB fantôme vide à la racine du repo `instance/workflows.db`, schéma obsolète, +# ni l'ancien store JSON `data/training/workflows/` créé vide sur DGX). +# Surchargeable via RPA_VWB_DB_PATH pour les déploiements atypiques. +def _resolve_vwb_db_path() -> Path: + override = os.getenv("RPA_VWB_DB_PATH", "").strip() + if override: + return Path(override).expanduser() + return BASE_PATH / "visual_workflow_builder" / "backend" / "instance" / "workflows.db" + + +VWB_DB_PATH = _resolve_vwb_db_path() + # StorageManager storage = StorageManager(base_path=str(DATA_PATH)) @@ -261,7 +275,9 @@ def system_status(): """Statut du système.""" try: sessions_count = len(list(SESSIONS_PATH.glob('*'))) if SESSIONS_PATH.exists() else 0 - workflows_count = len(list(WORKFLOWS_PATH.glob('*.json'))) if WORKFLOWS_PATH.exists() else 0 + # Source canonique D3 : base VWB v3 (même comptage que /api/workflows), + # pas l'ancien store JSON `data/training/workflows/` créé vide sur DGX. 
+ workflows_count = len(_load_workflows_from_vwb_db()) dependencies_ok = True try: @@ -296,8 +312,26 @@ def system_performance(): faiss_metadata_path = DATA_PATH / "faiss_index" / "main.metadata" if faiss_index_path.exists() and faiss_metadata_path.exists(): - fm = FAISSManager.load(faiss_index_path, faiss_metadata_path) - faiss_stats = fm.get_stats() + try: + fm = FAISSManager.load(faiss_index_path, faiss_metadata_path) + faiss_stats = fm.get_stats() + faiss_stats.setdefault("status", "active") + faiss_stats.setdefault("metadata_status", "valid") + except Exception as e: + faiss_stats = _read_raw_faiss_index_stats(faiss_index_path) + faiss_stats.update({ + "status": "metadata_invalid", + "metadata_status": "invalid", + "metadata_error": str(e), + "action_required": "re-sign-or-rebuild-faiss-metadata", + }) + elif faiss_index_path.exists(): + faiss_stats = _read_raw_faiss_index_stats(faiss_index_path) + faiss_stats.update({ + "status": "metadata_missing", + "metadata_status": "missing", + "action_required": "rebuild-faiss-metadata", + }) else: faiss_stats = {"total_vectors": 0, "status": "index_not_found"} except Exception as e: @@ -319,6 +353,26 @@ def system_performance(): return jsonify({'error': str(e)}), 500 +def _read_raw_faiss_index_stats(index_path: Path) -> dict: + """Read non-authoritative FAISS stats when signed metadata cannot load.""" + try: + import faiss + index = faiss.read_index(str(index_path)) + return { + "raw_index_available": True, + "total_vectors": int(getattr(index, "ntotal", 0)), + "dimensions": int(getattr(index, "d", 0)), + "is_trained": bool(getattr(index, "is_trained", False)), + "index_type": type(index).__name__.replace("Index", "") or "FAISS", + } + except Exception as e: + return { + "raw_index_available": False, + "total_vectors": 0, + "error": f"Index FAISS illisible: {e}", + } + + @app.route('/api/system/faiss/test', methods=['POST']) def test_faiss_index(): """Teste l'index FAISS avec une recherche aléatoire.""" @@ -785,36 
+839,83 @@ def rename_session_workflow(session_id): # API Workflows # ============================================================================= +def _load_workflows_from_vwb_db() -> list: + """Charge les workflows depuis la base VWB v3 (source canonique D3). + + Lit directement le SQLite que Léa interroge au runtime (cf. + `agent_chat/app.py` → `GET /api/v3/session/state`). On compte les `steps` + par workflow pour `nodes_count` (pas de notion d'`edges` en DAG linéaire : + `edges_count` = max(steps-1, 0)). Robuste à l'absence de la DB ou des + colonnes `source`/`review_status` (DB ancienne) : retourne [] sans planter. + """ + import sqlite3 + + if not VWB_DB_PATH.exists(): + return [] + + workflows = [] + conn = sqlite3.connect(str(VWB_DB_PATH)) + try: + conn.row_factory = sqlite3.Row + # Colonnes disponibles (la DB fantôme/ancienne n'a pas source/review_status) + cols = {row[1] for row in conn.execute("PRAGMA table_info(workflows)")} + has_source = 'source' in cols + has_review = 'review_status' in cols + + select_cols = ['id', 'name', 'description', 'created_at', 'updated_at'] + if has_source: + select_cols.append('source') + if has_review: + select_cols.append('review_status') + + # Nombre de steps par workflow (= nodes du DAG) + step_counts = { + row[0]: row[1] + for row in conn.execute( + "SELECT workflow_id, COUNT(*) FROM steps GROUP BY workflow_id" + ) + } + + rows = conn.execute( + f"SELECT {', '.join(select_cols)} FROM workflows ORDER BY updated_at DESC" + ).fetchall() + + for row in rows: + wf_id = row['id'] + nodes_count = step_counts.get(wf_id, 0) + workflows.append({ + 'workflow_id': wf_id, + 'name': row['name'] or wf_id, + 'description': row['description'] or '', + 'nodes_count': nodes_count, + 'edges_count': max(nodes_count - 1, 0), + 'learning_state': 'OBSERVATION', + 'created_at': str(row['created_at'] or ''), + 'updated_at': str(row['updated_at'] or ''), + 'execution_count': 0, + 'source': row['source'] if has_source else 'manual', + 
'review_status': row['review_status'] if has_review else '', + 'file_path': f"vwb_db://{wf_id}", + }) + finally: + conn.close() + + return workflows + + @app.route('/api/workflows') def list_workflows(): - """Liste tous les workflows.""" + """Liste tous les workflows depuis la base VWB v3 (source canonique D3). + + Avant ce correctif, l'endpoint globait `data/training/workflows/*.json` + (ancien store JSON, créé vide sur DGX) et renvoyait toujours `total: 0`, + rendant la surface « ce que Léa sait » faussement vide. On lit désormais la + même base SQLite que Léa au runtime. + """ try: - workflows = [] hide_unnamed = request.args.get('hide_unnamed', 'true').lower() == 'true' - if not WORKFLOWS_PATH.exists(): - WORKFLOWS_PATH.mkdir(parents=True, exist_ok=True) - return jsonify({'workflows': [], 'total': 0, 'hidden_unnamed': 0}) - - for wf_file in WORKFLOWS_PATH.glob('*.json'): - try: - with open(wf_file, 'r') as f: - wf_data = json.load(f) - - workflows.append({ - 'workflow_id': wf_data.get('workflow_id', wf_file.stem), - 'name': wf_data.get('name', wf_file.stem), - 'description': wf_data.get('description', ''), - 'nodes_count': len(wf_data.get('nodes', [])), - 'edges_count': len(wf_data.get('edges', [])), - 'learning_state': wf_data.get('learning_state', 'OBSERVATION'), - 'created_at': wf_data.get('created_at', ''), - 'updated_at': wf_data.get('updated_at', ''), - 'execution_count': wf_data.get('execution_count', 0), - 'file_path': str(wf_file) - }) - except Exception as e: - print(f"Erreur lecture workflow {wf_file}: {e}") + workflows = _load_workflows_from_vwb_db() # Filtrer les workflows "Unnamed" si demandé if hide_unnamed: @@ -1970,19 +2071,83 @@ def import_config(): # API Streaming - Proxy vers le serveur de streaming (port 5005) # ============================================================================= -STREAMING_BASE_URL = 'http://localhost:5005/api/v1/traces/stream' +def _normalize_streaming_base_url(raw_url: str) -> str: + """Normalise l'URL du 
serveur streaming vers le préfixe API attendu.""" + base = (raw_url or 'http://localhost:5005').strip().rstrip('/') + if base.endswith('/api/v1/traces/stream'): + return base + if base.endswith('/api/v1/traces'): + return f'{base}/stream' + return f'{base}/api/v1/traces/stream' + + +STREAMING_BASE_URL = _normalize_streaming_base_url( + os.getenv('RPA_STREAMING_URL') + or os.getenv('STREAMING_BASE_URL') + or 'http://localhost:5005' +) + + +def _streaming_headers(): + headers = {'Accept': 'application/json'} + token = os.getenv('RPA_API_TOKEN', '').strip() + if token: + headers['Authorization'] = f'Bearer {token}' + return headers + + +def _fetch_streaming_json(endpoint, query_string=''): + import urllib.request + + endpoint = endpoint.strip('/') + url = f'{STREAMING_BASE_URL}/{endpoint}' + if query_string: + url = f'{url}?{query_string}' + req = urllib.request.Request(url, headers=_streaming_headers()) + with urllib.request.urlopen(req, timeout=5) as response: + return json.loads(response.read().decode()) + + +def _streaming_status_snapshot(): + """Compat legacy dashboard: agrège les endpoints streaming qui existent.""" + snapshot = _fetch_streaming_json('stats') + try: + sessions = _fetch_streaming_json('sessions') + snapshot['sessions'] = sessions.get('sessions', sessions if isinstance(sessions, list) else []) + except Exception as exc: + snapshot['sessions_error'] = str(exc) + try: + snapshot['processing'] = _fetch_streaming_json('processing/status') + except Exception as exc: + snapshot['processing_error'] = str(exc) + try: + replays = _fetch_streaming_json('replays') + replay_items = replays.get('replays', []) if isinstance(replays, dict) else [] + snapshot['replay'] = next((r for r in replay_items if r.get('active')), None) + snapshot['replays'] = replay_items + except Exception as exc: + snapshot['replay_error'] = str(exc) + return snapshot @app.route('/api/streaming/<path:endpoint>') def proxy_streaming(endpoint): """Proxy vers le serveur de streaming pour éviter les 
problèmes CORS.""" - import urllib.request import urllib.error try: - url = f'{STREAMING_BASE_URL}/{endpoint}' - req = urllib.request.Request(url, headers={'Accept': 'application/json'}) - with urllib.request.urlopen(req, timeout=5) as response: - data = json.loads(response.read().decode()) + clean_endpoint = endpoint.strip('/') + if clean_endpoint == 'status': + data = _streaming_status_snapshot() + else: + query_string = request.query_string.decode('utf-8') + data = _fetch_streaming_json(clean_endpoint, query_string) return jsonify(data) + return jsonify(data) + except urllib.error.HTTPError as e: + try: + payload = json.loads(e.read().decode()) + except Exception: + payload = {'error': e.reason} + return jsonify(payload), e.code except urllib.error.URLError as e: return jsonify({'error': f'Serveur streaming inaccessible: {e}'}), 502 except Exception as e: @@ -2273,10 +2438,17 @@ def process_mining_discover(): load_jsonl_session, PM4PY_AVAILABLE, ) - except ImportError: + except ImportError as exc: + missing = getattr(exc, "name", None) or str(exc) return jsonify({ 'error': "Module d'analyse non disponible", - 'detail': "Le module core.analytics.process_mining_bridge est introuvable.", + 'detail': ( + "Dépendance analytics manquante pendant l'import du bridge " + f"({missing}). Installer le bundle process-mining dans le venv DGX " + "avant de générer la cartographie." 
+ ), + 'missing_dependency': missing, + 'action_required': "install-process-mining-dependencies", }), 503 if not PM4PY_AVAILABLE: diff --git a/web_dashboard/templates/index.html b/web_dashboard/templates/index.html index 970970209..aeb0e7fa8 100644 --- a/web_dashboard/templates/index.html +++ b/web_dashboard/templates/index.html @@ -1392,7 +1392,13 @@ // Status indicator const faissStatusEl = document.getElementById('faissStatus'); - if (faiss.error) { + if (faiss.status === 'metadata_invalid') { + faissStatusEl.textContent = '⚠️'; + faissStatusEl.title = 'Index brut présent, métadonnées invalides'; + } else if (faiss.status === 'metadata_missing') { + faissStatusEl.textContent = '⚠️'; + faissStatusEl.title = 'Index brut présent, métadonnées absentes'; + } else if (faiss.error) { faissStatusEl.textContent = '❌'; faissStatusEl.title = faiss.error; } else if (faiss.status === 'index_not_found') { @@ -1419,6 +1425,8 @@ if (faiss.nlist) details.push(`nlist: ${faiss.nlist}`); if (faiss.nprobe) details.push(`nprobe: ${faiss.nprobe}`); if (faiss.metadata_count) details.push(`Métadonnées: ${faiss.metadata_count}`); + if (faiss.metadata_status) details.push(`Metadata: ${faiss.metadata_status}`); + if (faiss.metadata_error) details.push(`Metadata error: ${faiss.metadata_error}`); document.getElementById('faissDetails').textContent = details.length > 0 ? details.join(' • ') : (faiss.error || faiss.status || 'Aucune info disponible'); @@ -1434,6 +1442,12 @@ if (faiss.status === 'index_not_found') { recommendations.push('📝 Traitez des sessions pour créer l\'index FAISS'); } + if (faiss.status === 'metadata_invalid') { + recommendations.push('⚠️ Index brut présent mais métadonnées invalides : re-signer ou régénérer les métadonnées FAISS'); + } + if (faiss.status === 'metadata_missing') { + recommendations.push('⚠️ Index brut présent sans métadonnées : reconstruire les métadonnées FAISS'); + } if (recommendations.length > 0) { recoEl.innerHTML = recommendations.join('<br>'); recoEl.style.display = 'block'; @@ -2818,10 +2832,29 @@ const detailsEl = document.getElementById('streamServerDetails'); try { - const data = await fetchJSON(`${STREAMING_BASE}/stats`); + const [data, processing] = await Promise.all([ + fetchJSON(`${STREAMING_BASE}/stats`), + fetchJSON(`${STREAMING_BASE}/processing/status`).catch(e => ({error: e.message})) + ]); - statusEl.innerHTML = ''; - statusEl.title = 'Serveur streaming en ligne'; + const processingReady = processing && processing.processing_ready === true; + // « En veille » (armé/lazy) ≠ « dégradé » : un worker neuf sans + // session a tous ses composants à false par design (chargement à la + // 1re session), ce n'est PAS une panne. Seul status==='degraded' + // (init tentée et en échec) est une vraie alerte. + const processingArmed = processing && (processing.armed === true || processing.status === 'idle'); + const processingDegraded = processing && !processing.error && processing.status === 'degraded'; + const statusHint = (processing && processing.status_hint) || ''; + statusEl.innerHTML = processingDegraded + ? '⚠️' + : processingArmed + ? '⏸️' + : ''; + statusEl.title = processingDegraded + ? `Streaming en ligne, worker apprentissage dégradé${statusHint ? ' — ' + statusHint : ''}` + : processingArmed + ? `Streaming en ligne, worker en veille${statusHint ? 
' — ' + statusHint : ''}` + : 'Serveur streaming en ligne'; document.getElementById('streamActiveSessions').textContent = data.active_sessions || 0; document.getElementById('streamTotalEvents').textContent = data.total_events || 0; @@ -2837,6 +2870,31 @@ if (data.events_per_second !== undefined) rows.push({label: 'Événements/sec', value: (data.events_per_second || 0).toFixed(2)}); if (data.memory_usage_mb !== undefined) rows.push({label: 'Mémoire utilisée', value: Math.round(data.memory_usage_mb) + ' MB'}); if (data.server_version) rows.push({label: 'Version serveur', value: data.server_version}); + if (processing && !processing.error) { + const status = processing.status || 'unknown'; + const workerIcon = processingReady ? '✅' : (processingArmed ? '⏸️' : '⚠️'); + rows.push({ + label: 'Worker apprentissage', + value: `${workerIcon} ${status}` + }); + rows.push({ + label: 'Composants intelligence', + value: processing.components_ready + ? 'prêts' + : (processingArmed ? 'en veille (chargés à la 1re session)' : 'non prêts') + }); + if (statusHint) { + rows.push({label: 'Détail worker', value: statusHint}); + } + if (processing.queue_length !== undefined) { + rows.push({label: 'Queue apprentissage', value: processing.queue_length}); + } + if (processing.last_cycle) { + rows.push({label: 'Dernier cycle worker', value: new Date(processing.last_cycle).toLocaleString('fr-FR')}); + } + } else if (processing && processing.error) { + rows.push({label: 'Worker apprentissage', value: `❌ ${processing.error}`}); + } if (rows.length === 0) { // Afficher les données brutes si les clés attendues ne sont pas présentes