fix(dashboard,worker): vérité produit P0 — dashboard+worker+VWB export
Some checks failed
tests / Lint (ruff + black) (push) Failing after 1m46s
tests / Tests unitaires (sans GPU) (push) Failing after 2m0s
tests / Tests sécurité (critique) (push) Has been skipped

War-room clôture DGX 2026-06-18 (recadrage Dom : graphe/apprentissage/mémoire/dashboard = surface produit P0).
Le dashboard et le statut worker affichaient des états faux ; corrige pour refléter la vérité du produit.

- dashboard FAISS: distingue index brut / metadata HMAC invalide / runtime / absent (plus de faux "inactif")
- dashboard process-mining: 503 explicite missing_dependency (plus de message trompeur)
- dashboard /api/workflows + system/status: lecture DB VWB v3 canonique (total réel = 24, plus de 0)
- worker /processing/status: véridique (lit _worker_health.json) + statut "idle/armé (lazy)" distinct de "dégradé (échec)"
- VWB export: N steps -> N actions/edges (dernière action n'est plus perdue)
- tests: dashboard routes, worker status truthfulness, export VWB

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-06-18 17:50:12 +02:00
parent 6d5ef51c60
commit ec1fb81054
8 changed files with 626 additions and 56 deletions

View File

@@ -555,6 +555,7 @@ LIVE_SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
_DATA_DIR = ROOT_DIR / "data" / "training" _DATA_DIR = ROOT_DIR / "data" / "training"
WORKER_QUEUE_FILE = _DATA_DIR / "_worker_queue.txt" WORKER_QUEUE_FILE = _DATA_DIR / "_worker_queue.txt"
REPLAY_LOCK_FILE = _DATA_DIR / "_replay_active.lock" REPLAY_LOCK_FILE = _DATA_DIR / "_replay_active.lock"
WORKER_HEALTH_FILE = _DATA_DIR / "_worker_health.json"
# Instance globale partagée (le StreamProcessor reste dans le serveur HTTP # Instance globale partagée (le StreamProcessor reste dans le serveur HTTP
# pour le CLIP, l'indexation FAISS, la gestion des sessions, le replay — # pour le CLIP, l'indexation FAISS, la gestion des sessions, le replay —
@@ -807,7 +808,7 @@ def _memory_window_title_for_action(action_meta: Dict[str, Any]) -> str:
def _get_worker_queue_status() -> Dict[str, Any]: def _get_worker_queue_status() -> Dict[str, Any]:
"""Retourne l'état de la queue du worker VLM (pour le monitoring).""" """Retourne l'état réel de la queue et du worker VLM (pour le monitoring)."""
queue = [] queue = []
if WORKER_QUEUE_FILE.exists(): if WORKER_QUEUE_FILE.exists():
try: try:
@@ -819,16 +820,108 @@ def _get_worker_queue_status() -> Dict[str, Any]:
except Exception: except Exception:
pass pass
health = None
health_error = None
health_age_seconds = None
if WORKER_HEALTH_FILE.exists():
try:
health = json.loads(WORKER_HEALTH_FILE.read_text(encoding="utf-8"))
health_age_seconds = max(0.0, time.time() - WORKER_HEALTH_FILE.stat().st_mtime)
except Exception as exc:
health_error = str(exc)
health_stale = health_age_seconds is None or health_age_seconds > 180
components = (health or {}).get("components") or {}
components_ready = bool(components) and all(bool(v) for v in components.values())
health_status = (health or {}).get("status")
running = bool(health) and not health_stale and health_status != "stopped"
# Distinction VEILLE (armé, lazy) vs DÉGRADÉ (vrai échec).
#
# Les composants lourds (ScreenAnalyzer/CLIP/FAISS/StateEmbedding) sont
# chargés en lazy par run_worker : le processor n'est instancié qu'au
# premier _process_session (cf. run_worker._get_processor / _process_session).
# Un worker neuf qui n'a jamais reçu de session écrit donc status="healthy"
# avec tous les composants à false — c'est l'état NORMAL « en veille », pas
# une panne. L'étiqueter "degraded" fait lire une panne là où il n'y en a pas.
#
# Signal retenu pour « init jamais tentée » : TOUS les composants à false ET
# sessions_processed == 0 ET sessions_failed == 0. Justification : run_worker
# n'appelle _get_processor() (donc l'init lazy) que dans _process_session, qui
# incrémente toujours exactement un compteur (processed / failed / skipped).
# Tant que processed == 0 ET failed == 0, aucune session n'a déclenché une
# init suivie d'un traitement — le worker est armé en attente. Un simple skip
# (dossier/shots absents) passe quand même par _get_processor() : les
# composants se chargent, donc tous-à-false devient faux et on n'entre pas ici.
# run_worker._health_components() écrit toujours les 4 clés (jamais un dict
# vide), d'où le test sur les VALEURS et non sur la présence des clés.
# Si run_worker a lui-même forcé status="degraded" (VLM + ScreenAnalyzer
# absent, cf. run_worker._write_health), c'est un VRAI échec : on le conserve.
stats = (health or {}).get("stats") or {}
init_attempted = bool(stats.get("sessions_processed", 0)) or bool(
stats.get("sessions_failed", 0)
)
components_all_false = bool(components) and not any(
bool(v) for v in components.values()
)
armed = (
running
and not components_ready
and health_status == "healthy"
and components_all_false # aucun composant lourd encore chargé
and not init_attempted
)
status = health_status or "unknown"
if not running:
status = "stale" if health else "unknown"
elif armed:
# En veille : worker sain, composants chargés à la 1re session.
status = "idle"
elif not components_ready:
status = "degraded"
return { return {
"running": True, # On ne sait pas si le worker process tourne, mais la queue existe "running": running,
"status": status,
"armed": armed,
"queue_length": len(queue), "queue_length": len(queue),
"queue": queue, "queue": queue,
"replay_lock_active": REPLAY_LOCK_FILE.exists(), "replay_lock_active": REPLAY_LOCK_FILE.exists(),
"queue_file": str(WORKER_QUEUE_FILE), "queue_file": str(WORKER_QUEUE_FILE),
"note": "Le worker VLM tourne dans un process séparé (run_worker.py)", "health_file": str(WORKER_HEALTH_FILE),
"health_error": health_error,
"health_age_seconds": health_age_seconds,
"health_stale": health_stale,
"worker_pid": (health or {}).get("pid"),
"last_cycle": (health or {}).get("last_cycle"),
"current_session": (health or {}).get("current_session"),
"components": components,
"components_ready": components_ready,
"processing_ready": running and not REPLAY_LOCK_FILE.exists() and components_ready,
"status_hint": _worker_status_hint(status, armed),
"stats": stats,
"note": "Le worker VLM tourne dans un process séparé (agent_v0.server_v1.run_worker).",
} }
def _worker_status_hint(status: str, armed: bool) -> str:
    """Return the human-readable hint displayed by the dashboard for a worker status.

    An armed worker always gets the standby message, whatever ``status`` says.
    Any status without a dedicated message falls back to the "unknown" hint.
    """
    # ``armed`` wins over the raw status: treat it exactly like "idle".
    effective_status = "idle" if armed else status
    hints_by_status = {
        "idle": "En veille — composants chargés à la 1re session.",
        "degraded": "Worker apprentissage dégradé — init des composants en échec.",
        "stale": "Health file périmé (> 180s) — worker peut-être arrêté.",
        "stopped": "Worker arrêté.",
        "busy": "Traitement d'une session en cours.",
        "healthy": "Worker prêt — composants chargés.",
    }
    return hints_by_status.get(effective_status, "État worker inconnu.")
# ========================================================================= # =========================================================================
# Compteur d'analyses en cours par session (pour attendre avant finalize) # Compteur d'analyses en cours par session (pour attendre avant finalize)
# ========================================================================= # =========================================================================
@@ -1614,13 +1707,14 @@ async def startup():
threading.Thread(target=_smoke_model_health, name="model-health-smoke", daemon=True).start() threading.Thread(target=_smoke_model_health, name="model-health-smoke", daemon=True).start()
# Afficher le token API au démarrage pour que l'utilisateur puisse configurer l'agent # Ne jamais imprimer le token complet dans journald/stdout.
_token_source = "env RPA_API_TOKEN" if os.environ.get("RPA_API_TOKEN") else "auto-généré" _token_source = "env RPA_API_TOKEN" if os.environ.get("RPA_API_TOKEN") else "auto-généré"
logger.info(f"API Token ({_token_source}): {API_TOKEN}") _token_hint = f"{API_TOKEN[:8]}{API_TOKEN[-4:]}" if API_TOKEN else "<absent>"
logger.info("API Token (%s): %s — auth Bearer obligatoire", _token_source, _token_hint)
print(f"\n{'='*60}") print(f"\n{'='*60}")
print(f" API Token ({_token_source}):") print(f" API Token ({_token_source}):")
print(f" {API_TOKEN}") print(f" {_token_hint} (masqué)")
print(f" Configurer l'agent : export RPA_API_TOKEN={API_TOKEN}") print(" Configurer l'agent via .env.local ou l'enrollment; ne pas copier depuis les logs.")
print(f"{'='*60}\n") print(f"{'='*60}\n")
worker.start(blocking=False) worker.start(blocking=False)

View File

@@ -119,8 +119,10 @@ def test_export_vwb_workflow_with_pause_step():
] ]
core = convert_vwb_to_core_workflow(workflow_data, steps_data) core = convert_vwb_to_core_workflow(workflow_data, steps_data)
assert core["learning_state"] == "COACHING" assert core["learning_state"] == "COACHING"
assert len(core["nodes"]) == 3 assert len(core["nodes"]) == 4
assert len(core["edges"]) == 2 assert len(core["edges"]) == 3
assert core["edges"][-1]["action"]["type"] == "mouse_click"
assert core["nodes"][-1]["is_end"] is True
# L'edge sortant du node de pause doit avoir le bon type + message # L'edge sortant du node de pause doit avoir le bon type + message
pause_edges = [ pause_edges = [

View File

@@ -1289,3 +1289,158 @@ class TestAPIEndpoints:
assert len(workflows) == 1 assert len(workflows) == 1
assert workflows[0]["workflow_id"] == "wf_api_001" assert workflows[0]["workflow_id"] == "wf_api_001"
assert workflows[0]["nodes"] == 2 assert workflows[0]["nodes"] == 2
class TestWorkerStatusTruthfulness:
    """Truthfulness of the worker status exposed by ``_get_worker_queue_status``.

    Distinguishes STANDBY (armed, lazy: a fresh worker that has never processed
    a session, components get loaded on the first session) from DEGRADED (init
    was attempted and failed). A worker in standby must NEVER be labelled
    'degraded'.
    """

    # Same constraint as TestAPIEndpoints: api_stream fails closed at import
    # time when RPA_API_TOKEN is absent.
    _TEST_API_TOKEN = "test_token_for_worker_status_0123456789abcdef"

    @pytest.fixture(autouse=True)
    def _ensure_api_token(self, monkeypatch):
        """Set RPA_API_TOKEN for every test of this class (autouse)."""
        monkeypatch.setenv("RPA_API_TOKEN", self._TEST_API_TOKEN)

    @pytest.fixture
    def status_env(self, tmp_path, monkeypatch):
        """Isolate the worker files (health/queue/lock) under ``tmp_path``.

        Returns the ``api_stream`` module and the path of the health file the
        tests are expected to write.
        """
        from agent_v0.server_v1 import api_stream

        health_file = tmp_path / "_worker_health.json"
        queue_file = tmp_path / "_worker_queue.txt"
        lock_file = tmp_path / "_replay_active.lock"
        monkeypatch.setattr(api_stream, "WORKER_HEALTH_FILE", health_file)
        monkeypatch.setattr(api_stream, "WORKER_QUEUE_FILE", queue_file)
        monkeypatch.setattr(api_stream, "REPLAY_LOCK_FILE", lock_file)
        return api_stream, health_file

    @staticmethod
    def _write_health(health_file, **overrides):
        """Write a fresh health file (recent mtime => not stale).

        The default payload describes a brand-new worker: status "healthy",
        every component False and every counter at 0. ``overrides`` replace
        top-level keys of that payload wholesale (``dict.update``).
        """
        payload = {
            "pid": 1234,
            "started_at": "2026-06-18T10:00:00",
            "last_cycle": "2026-06-18T10:00:30",
            "current_session": None,
            "queue_length": 0,
            "components": {
                "screen_analyzer": False,
                "clip_embedder": False,
                "faiss_manager": False,
                "state_embedding_builder": False,
            },
            "stats": {
                "sessions_processed": 0,
                "sessions_failed": 0,
                "sessions_skipped": 0,
                "total_screenshots_analyzed": 0,
            },
            "status": "healthy",
        }
        payload.update(overrides)
        health_file.write_text(json.dumps(payload), encoding="utf-8")

    def test_fresh_worker_is_idle_not_degraded(self, status_env):
        """Fresh worker: healthy, 0 sessions, every component False
        => status 'idle' (standby / armed), NOT 'degraded'."""
        api_stream, health_file = status_env
        self._write_health(health_file)  # defaults = fresh-worker state

        status = api_stream._get_worker_queue_status()

        assert status["running"] is True
        assert status["status"] == "idle", status
        assert status["armed"] is True
        assert status["components_ready"] is False
        # processing_ready stays False as long as the components are not loaded
        assert status["processing_ready"] is False
        assert "veille" in status["status_hint"].lower()

    def test_worker_init_failed_is_degraded(self, status_env):
        """Init attempted and failed: run_worker forces status='degraded'
        (VLM + ScreenAnalyzer missing) => 'degraded' is preserved."""
        api_stream, health_file = status_env
        self._write_health(
            health_file,
            status="degraded",  # forced by run_worker._write_health
            components={
                "screen_analyzer": False,
                "clip_embedder": True,
                "faiss_manager": True,
                "state_embedding_builder": False,
            },
            stats={
                "sessions_processed": 0,
                "sessions_failed": 1,  # one session attempted the init and failed
                "sessions_skipped": 0,
                "total_screenshots_analyzed": 0,
            },
        )

        status = api_stream._get_worker_queue_status()

        assert status["running"] is True
        assert status["status"] == "degraded", status
        assert status["armed"] is False
        assert status["processing_ready"] is False
        assert "dégradé" in status["status_hint"].lower()

    def test_worker_partial_components_after_attempt_is_degraded(self, status_env):
        """Partial components after a processing attempt (sessions_failed>0),
        without a status forced by the worker => 'degraded' (not 'idle')."""
        api_stream, health_file = status_env
        self._write_health(
            health_file,
            status="healthy",
            components={
                "screen_analyzer": True,
                "clip_embedder": True,
                "faiss_manager": False,  # one missing component
                "state_embedding_builder": True,
            },
            stats={
                "sessions_processed": 0,
                "sessions_failed": 2,
                "sessions_skipped": 0,
                "total_screenshots_analyzed": 0,
            },
        )

        status = api_stream._get_worker_queue_status()

        assert status["status"] == "degraded", status
        assert status["armed"] is False

    def test_worker_ready_after_processing_is_healthy(self, status_env):
        """Worker that processed at least one session, every component loaded
        => 'healthy' and processing_ready=True."""
        api_stream, health_file = status_env
        self._write_health(
            health_file,
            status="healthy",
            components={
                "screen_analyzer": True,
                "clip_embedder": True,
                "faiss_manager": True,
                "state_embedding_builder": True,
            },
            stats={
                "sessions_processed": 3,
                "sessions_failed": 0,
                "sessions_skipped": 0,
                "total_screenshots_analyzed": 42,
            },
        )

        status = api_stream._get_worker_queue_status()

        assert status["status"] == "healthy", status
        assert status["armed"] is False
        assert status["components_ready"] is True
        assert status["processing_ready"] is True

View File

@@ -296,9 +296,11 @@ def test_export_workflow_with_t2a_chain():
] ]
core = convert_vwb_to_core_workflow(workflow_data, steps_data) core = convert_vwb_to_core_workflow(workflow_data, steps_data)
edge_types = [e["action"]["type"] for e in core["edges"]] edge_types = [e["action"]["type"] for e in core["edges"]]
assert len(core["edges"]) == len(steps_data)
assert "extract_text" in edge_types assert "extract_text" in edge_types
assert "t2a_decision" in edge_types assert "t2a_decision" in edge_types
assert "pause_for_human" in edge_types assert "pause_for_human" in edge_types
assert edge_types[-1] == "mouse_click"
# Vérifier que le templating est bien transporté # Vérifier que le templating est bien transporté
t2a_edge = next(e for e in core["edges"] if e["action"]["type"] == "t2a_decision") t2a_edge = next(e for e in core["edges"] if e["action"]["type"] == "t2a_decision")
assert t2a_edge["action"]["parameters"]["input_template"] == "{{dpi}}" assert t2a_edge["action"]["parameters"]["input_template"] == "{{dpi}}"

View File

@@ -114,6 +114,37 @@ class TestDashboardRoutes:
'timeout': 30, 'timeout': 30,
}] }]
def test_streaming_status_snapshot_aggregates_existing_endpoints(self, monkeypatch):
    """The legacy /status proxy aggregates the streaming endpoints that really exist."""
    # One canned payload per upstream endpoint the snapshot is allowed to hit.
    canned_payloads = {
        'stats': {'active_sessions': 1, 'total_events': 7},
        'sessions': {'sessions': [{'session_id': 's1'}]},
        'processing/status': {'status': 'degraded', 'components_ready': False},
        'replays': {'replays': [{'replay_id': 'r1', 'active': True}]},
    }
    recorded_calls = []

    def fake_fetch(endpoint, query_string=''):
        recorded_calls.append((endpoint, query_string))
        if endpoint not in canned_payloads:
            # Any other endpoint means the snapshot queried something unexpected.
            raise AssertionError(endpoint)
        return canned_payloads[endpoint]

    monkeypatch.setattr(dashboard_app, '_fetch_streaming_json', fake_fetch)

    snapshot = dashboard_app._streaming_status_snapshot()

    assert snapshot['active_sessions'] == 1
    assert snapshot['sessions'] == [{'session_id': 's1'}]
    assert snapshot['processing']['status'] == 'degraded'
    assert snapshot['replay']['replay_id'] == 'r1'
    # Endpoints must be queried exactly once each, in this order, with no query string.
    expected_order = ('stats', 'sessions', 'processing/status', 'replays')
    assert recorded_calls == [(name, '') for name in expected_order]
def test_dashboard_submit_competence_verdict(self, client, monkeypatch): def test_dashboard_submit_competence_verdict(self, client, monkeypatch):
"""Le dashboard journalise un verdict sans write-back YAML.""" """Le dashboard journalise un verdict sans write-back YAML."""
import core.competences.verdicts as verdicts_module import core.competences.verdicts as verdicts_module
@@ -212,6 +243,58 @@ class TestDashboardRoutes:
data = resp.get_json() data = resp.get_json()
assert 'workflows' in data assert 'workflows' in data
def test_workflows_list_reads_vwb_db(self, client, monkeypatch, tmp_path):
    """Red-gate regression: /api/workflows reflects the VWB v3 database, not 0.

    Before the fix the endpoint globbed an empty JSON store and always
    returned total:0. A minimal VWB database (canonical workflows + steps
    schema) is built here and the endpoint must expose the real count.
    """
    import sqlite3
    from pathlib import Path

    db_path = tmp_path / "instance" / "workflows.db"
    db_path.parent.mkdir(parents=True, exist_ok=True)

    workflow_rows = [
        ("wf_aiva", "Urgence_aiva_demo", "demo", "2026-06-01", "2026-06-18",
         1, "manual", ""),
        ("wf_learned", "Learned_flow", "", "2026-06-02", "2026-06-17",
         1, "learned_import", "pending"),
    ]
    # 3 steps for wf_aiva -> expected nodes_count = 3
    step_rows = [(f"s{i}", "wf_aiva", "click") for i in range(3)]

    conn = sqlite3.connect(str(db_path))
    conn.execute(
        "CREATE TABLE workflows (id VARCHAR(64) PRIMARY KEY, name VARCHAR(255), "
        "description TEXT, created_at DATETIME, updated_at DATETIME, "
        "is_active BOOLEAN, source VARCHAR(64), review_status VARCHAR(32))"
    )
    conn.execute(
        "CREATE TABLE steps (id VARCHAR(64) PRIMARY KEY, workflow_id VARCHAR(64), "
        "action_type VARCHAR(64))"
    )
    conn.executemany("INSERT INTO workflows VALUES (?,?,?,?,?,?,?,?)", workflow_rows)
    conn.executemany("INSERT INTO steps VALUES (?,?,?)", step_rows)
    conn.commit()
    conn.close()

    monkeypatch.setattr(dashboard_app, "VWB_DB_PATH", Path(db_path))

    resp = client.get('/api/workflows')
    assert resp.status_code == 200

    data = resp.get_json()
    assert data['total'] == 2, f"attendu 2 workflows, obtenu {data['total']}"

    workflows_by_name = {w['name']: w for w in data['workflows']}
    assert 'Urgence_aiva_demo' in workflows_by_name
    aiva = workflows_by_name['Urgence_aiva_demo']
    assert aiva['nodes_count'] == 3
    assert aiva['source'] == 'manual'
def test_sessions_list(self, client): def test_sessions_list(self, client):
"""L'API sessions retourne la liste.""" """L'API sessions retourne la liste."""
resp = client.get('/api/agent/sessions') resp = client.get('/api/agent/sessions')

View File

@@ -480,21 +480,24 @@ def convert_vwb_to_core_workflow(
now = datetime.now().isoformat() now = datetime.now().isoformat()
wf_id = workflow_data.get("id", f"wf_{uuid.uuid4().hex[:12]}") wf_id = workflow_data.get("id", f"wf_{uuid.uuid4().hex[:12]}")
# Créer les nodes : un node par étape (chaque étape = un état écran) # Les actions sont portées par les edges. N étapes VWB doivent donc donner
# N edges core, et N+1 états écran (avant chaque action + terminal).
nodes = [] nodes = []
edges = [] edges = []
for idx, step in enumerate(steps_data): node_count = len(steps_data) + 1 if steps_data else 0
for idx in range(node_count):
step = steps_data[idx] if idx < len(steps_data) else {}
node_id = f"node_{idx:03d}" node_id = f"node_{idx:03d}"
action_type = step.get("action_type", "click_anchor") action_type = step.get("action_type", "click_anchor")
params = step.get("parameters", {}) params = step.get("parameters", {})
label = step.get("label", action_type) label = step.get("label", action_type) if idx < len(steps_data) else "Fin du workflow"
# Créer le node (template minimal) # Créer le node (template minimal)
node = { node = {
"node_id": node_id, "node_id": node_id,
"name": label, "name": label,
"description": f"Étape {idx + 1} : {label}", "description": f"Étape {idx + 1} : {label}" if idx < len(steps_data) else "État terminal",
"template": { "template": {
"window": { "window": {
"title_pattern": params.get("window_title"), "title_pattern": params.get("window_title"),
@@ -518,7 +521,7 @@ def convert_vwb_to_core_workflow(
}, },
}, },
"is_entry": idx == 0, "is_entry": idx == 0,
"is_end": idx == len(steps_data) - 1, "is_end": idx == node_count - 1,
"variants": [], "variants": [],
"primary_variant_id": None, "primary_variant_id": None,
"max_variants": 5, "max_variants": 5,
@@ -529,12 +532,13 @@ def convert_vwb_to_core_workflow(
"metadata": { "metadata": {
"vwb_step_id": step.get("id", ""), "vwb_step_id": step.get("id", ""),
"visual_type": _action_type_to_visual(action_type), "visual_type": _action_type_to_visual(action_type),
"terminal": idx >= len(steps_data),
}, },
} }
nodes.append(node) nodes.append(node)
# Créer l'edge vers le node suivant (sauf pour le dernier) # Créer un edge/action pour chaque step VWB, y compris la dernière.
if idx < len(steps_data) - 1: if idx < len(steps_data):
next_node_id = f"node_{idx + 1:03d}" next_node_id = f"node_{idx + 1:03d}"
# Convertir l'action VWB → action core # Convertir l'action VWB → action core

View File

@@ -189,6 +189,20 @@ SESSIONS_PATH = DATA_PATH / "sessions"
WORKFLOWS_PATH = DATA_PATH / "workflows" WORKFLOWS_PATH = DATA_PATH / "workflows"
LOGS_PATH = BASE_PATH / "logs" LOGS_PATH = BASE_PATH / "logs"
# Source canonique des workflows (décision produit D3) : la base VWB v3
# (SQLAlchemy/SQLite) que Léa lit déjà au runtime. Chemin absolu robuste (PAS la
# DB fantôme vide à la racine du repo `instance/workflows.db`, schéma obsolète,
# ni l'ancien store JSON `data/training/workflows/` créé vide sur DGX).
# Surchargeable via RPA_VWB_DB_PATH pour les déploiements atypiques.
def _resolve_vwb_db_path() -> Path:
    """Return the path of the VWB v3 SQLite database.

    A non-blank ``RPA_VWB_DB_PATH`` environment variable takes precedence
    (with ``~`` expanded); otherwise the database bundled under ``BASE_PATH``
    is used.
    """
    configured = os.environ.get("RPA_VWB_DB_PATH", "").strip()
    if not configured:
        backend_dir = BASE_PATH / "visual_workflow_builder" / "backend"
        return backend_dir / "instance" / "workflows.db"
    return Path(configured).expanduser()
VWB_DB_PATH = _resolve_vwb_db_path()
# StorageManager # StorageManager
storage = StorageManager(base_path=str(DATA_PATH)) storage = StorageManager(base_path=str(DATA_PATH))
@@ -261,7 +275,9 @@ def system_status():
"""Statut du système.""" """Statut du système."""
try: try:
sessions_count = len(list(SESSIONS_PATH.glob('*'))) if SESSIONS_PATH.exists() else 0 sessions_count = len(list(SESSIONS_PATH.glob('*'))) if SESSIONS_PATH.exists() else 0
workflows_count = len(list(WORKFLOWS_PATH.glob('*.json'))) if WORKFLOWS_PATH.exists() else 0 # Source canonique D3 : base VWB v3 (même comptage que /api/workflows),
# pas l'ancien store JSON `data/training/workflows/` créé vide sur DGX.
workflows_count = len(_load_workflows_from_vwb_db())
dependencies_ok = True dependencies_ok = True
try: try:
@@ -296,8 +312,26 @@ def system_performance():
faiss_metadata_path = DATA_PATH / "faiss_index" / "main.metadata" faiss_metadata_path = DATA_PATH / "faiss_index" / "main.metadata"
if faiss_index_path.exists() and faiss_metadata_path.exists(): if faiss_index_path.exists() and faiss_metadata_path.exists():
try:
fm = FAISSManager.load(faiss_index_path, faiss_metadata_path) fm = FAISSManager.load(faiss_index_path, faiss_metadata_path)
faiss_stats = fm.get_stats() faiss_stats = fm.get_stats()
faiss_stats.setdefault("status", "active")
faiss_stats.setdefault("metadata_status", "valid")
except Exception as e:
faiss_stats = _read_raw_faiss_index_stats(faiss_index_path)
faiss_stats.update({
"status": "metadata_invalid",
"metadata_status": "invalid",
"metadata_error": str(e),
"action_required": "re-sign-or-rebuild-faiss-metadata",
})
elif faiss_index_path.exists():
faiss_stats = _read_raw_faiss_index_stats(faiss_index_path)
faiss_stats.update({
"status": "metadata_missing",
"metadata_status": "missing",
"action_required": "rebuild-faiss-metadata",
})
else: else:
faiss_stats = {"total_vectors": 0, "status": "index_not_found"} faiss_stats = {"total_vectors": 0, "status": "index_not_found"}
except Exception as e: except Exception as e:
@@ -319,6 +353,26 @@ def system_performance():
return jsonify({'error': str(e)}), 500 return jsonify({'error': str(e)}), 500
def _read_raw_faiss_index_stats(index_path: Path) -> dict:
    """Read non-authoritative FAISS stats when the signed metadata cannot load.

    The raw index file is opened directly with ``faiss``. Any failure
    (including ``faiss`` not being importable) is reported in the returned
    dict instead of being raised.
    """
    try:
        import faiss

        raw_index = faiss.read_index(str(index_path))
        # Strip the "Index" prefix/suffix of the class name; fall back to a
        # generic label when nothing is left.
        short_type = type(raw_index).__name__.replace("Index", "") or "FAISS"
        stats = {
            "raw_index_available": True,
            "total_vectors": int(getattr(raw_index, "ntotal", 0)),
            "dimensions": int(getattr(raw_index, "d", 0)),
            "is_trained": bool(getattr(raw_index, "is_trained", False)),
            "index_type": short_type,
        }
    except Exception as e:
        stats = {
            "raw_index_available": False,
            "total_vectors": 0,
            "error": f"Index FAISS illisible: {e}",
        }
    return stats
@app.route('/api/system/faiss/test', methods=['POST']) @app.route('/api/system/faiss/test', methods=['POST'])
def test_faiss_index(): def test_faiss_index():
"""Teste l'index FAISS avec une recherche aléatoire.""" """Teste l'index FAISS avec une recherche aléatoire."""
@@ -785,36 +839,83 @@ def rename_session_workflow(session_id):
# API Workflows # API Workflows
# ============================================================================= # =============================================================================
def _load_workflows_from_vwb_db(db_path: "Path | None" = None) -> list:
    """Load workflow summaries from the VWB v3 SQLite database (canonical source D3).

    ``nodes_count`` is the number of rows in ``steps`` for the workflow and
    ``edges_count`` is ``max(nodes_count - 1, 0)`` (linear chain).

    The function never raises for an absent or outdated database:
    - missing file or missing ``workflows`` table -> ``[]``;
    - missing ``steps`` table -> every workflow gets ``nodes_count == 0``;
    - missing ``source`` / ``review_status`` columns -> defaults ``'manual'``
      and ``''``.

    Args:
        db_path: SQLite file to read. Defaults to the module-level
            ``VWB_DB_PATH``, resolved at call time so that it can be
            overridden after import.

    Returns:
        A list of dicts, most recently updated workflow first.
    """
    import sqlite3
    from contextlib import closing

    target = Path(VWB_DB_PATH if db_path is None else db_path)
    if not target.exists():
        return []

    # closing() guarantees the connection is released even if a query fails.
    with closing(sqlite3.connect(str(target))) as conn:
        conn.row_factory = sqlite3.Row

        # PRAGMA table_info yields no row for an unknown table, which lets us
        # detect an old/phantom schema without triggering OperationalError.
        cols = {row[1] for row in conn.execute("PRAGMA table_info(workflows)")}
        if not cols:
            return []
        has_source = 'source' in cols
        has_review = 'review_status' in cols
        has_steps = bool(conn.execute("PRAGMA table_info(steps)").fetchall())

        select_cols = ['id', 'name', 'description', 'created_at', 'updated_at']
        if has_source:
            select_cols.append('source')
        if has_review:
            select_cols.append('review_status')

        # Number of steps per workflow (= nodes of the linear DAG).
        step_counts = {}
        if has_steps:
            step_counts = {
                row[0]: row[1]
                for row in conn.execute(
                    "SELECT workflow_id, COUNT(*) FROM steps GROUP BY workflow_id"
                )
            }

        # select_cols only ever contains the fixed identifiers listed above,
        # so interpolating them into the statement is safe.
        rows = conn.execute(
            f"SELECT {', '.join(select_cols)} FROM workflows ORDER BY updated_at DESC"
        ).fetchall()

        workflows = []
        for row in rows:
            wf_id = row['id']
            nodes_count = step_counts.get(wf_id, 0)
            workflows.append({
                'workflow_id': wf_id,
                'name': row['name'] or wf_id,
                'description': row['description'] or '',
                'nodes_count': nodes_count,
                'edges_count': max(nodes_count - 1, 0),
                'learning_state': 'OBSERVATION',
                'created_at': str(row['created_at'] or ''),
                'updated_at': str(row['updated_at'] or ''),
                'execution_count': 0,
                'source': row['source'] if has_source else 'manual',
                'review_status': row['review_status'] if has_review else '',
                'file_path': f"vwb_db://{wf_id}",
            })
        return workflows
@app.route('/api/workflows') @app.route('/api/workflows')
def list_workflows(): def list_workflows():
"""Liste tous les workflows.""" """Liste tous les workflows depuis la base VWB v3 (source canonique D3).
Avant ce correctif, l'endpoint globait `data/training/workflows/*.json`
(ancien store JSON, créé vide sur DGX) et renvoyait toujours `total: 0`,
rendant la surface « ce que Léa sait » faussement vide. On lit désormais la
même base SQLite que Léa au runtime.
"""
try: try:
workflows = []
hide_unnamed = request.args.get('hide_unnamed', 'true').lower() == 'true' hide_unnamed = request.args.get('hide_unnamed', 'true').lower() == 'true'
if not WORKFLOWS_PATH.exists(): workflows = _load_workflows_from_vwb_db()
WORKFLOWS_PATH.mkdir(parents=True, exist_ok=True)
return jsonify({'workflows': [], 'total': 0, 'hidden_unnamed': 0})
for wf_file in WORKFLOWS_PATH.glob('*.json'):
try:
with open(wf_file, 'r') as f:
wf_data = json.load(f)
workflows.append({
'workflow_id': wf_data.get('workflow_id', wf_file.stem),
'name': wf_data.get('name', wf_file.stem),
'description': wf_data.get('description', ''),
'nodes_count': len(wf_data.get('nodes', [])),
'edges_count': len(wf_data.get('edges', [])),
'learning_state': wf_data.get('learning_state', 'OBSERVATION'),
'created_at': wf_data.get('created_at', ''),
'updated_at': wf_data.get('updated_at', ''),
'execution_count': wf_data.get('execution_count', 0),
'file_path': str(wf_file)
})
except Exception as e:
print(f"Erreur lecture workflow {wf_file}: {e}")
# Filtrer les workflows "Unnamed" si demandé # Filtrer les workflows "Unnamed" si demandé
if hide_unnamed: if hide_unnamed:
@@ -1970,19 +2071,83 @@ def import_config():
# API Streaming - Proxy vers le serveur de streaming (port 5005) # API Streaming - Proxy vers le serveur de streaming (port 5005)
# ============================================================================= # =============================================================================
def _normalize_streaming_base_url(raw_url: str) -> str:
    """Normalize the streaming server URL to the API prefix the proxy expects.

    Accepts a bare origin (``http://host:5005``), the ``/api/v1/traces``
    prefix, or the full ``/api/v1/traces/stream`` prefix, with or without
    surrounding whitespace and trailing slashes.

    Args:
        raw_url: Configured URL. ``None``, an empty string, a blank string or
            a lone ``/`` all fall back to ``http://localhost:5005``.

    Returns:
        The URL ending in ``/api/v1/traces/stream`` (no trailing slash).
    """
    # Clean up BEFORE applying the default: a whitespace-only value is truthy
    # and would otherwise produce the relative URL "/api/v1/traces/stream".
    base = (raw_url or '').strip().rstrip('/')
    if not base:
        base = 'http://localhost:5005'
    if base.endswith('/api/v1/traces/stream'):
        return base
    if base.endswith('/api/v1/traces'):
        return f'{base}/stream'
    return f'{base}/api/v1/traces/stream'
STREAMING_BASE_URL = _normalize_streaming_base_url(
os.getenv('RPA_STREAMING_URL')
or os.getenv('STREAMING_BASE_URL')
or 'http://localhost:5005'
)
def _streaming_headers():
    """Build the HTTP headers sent to the streaming server.

    JSON is always requested; a Bearer ``Authorization`` header is added only
    when ``RPA_API_TOKEN`` is set to a non-blank value.
    """
    bearer = os.environ.get('RPA_API_TOKEN', '').strip()
    if not bearer:
        return {'Accept': 'application/json'}
    return {
        'Accept': 'application/json',
        'Authorization': f'Bearer {bearer}',
    }
def _fetch_streaming_json(endpoint, query_string=''):
    """GET ``endpoint`` on the streaming server and return the decoded JSON body.

    Leading/trailing slashes of ``endpoint`` are ignored. ``query_string`` is
    appended verbatim after ``?`` when non-empty. The request uses the headers
    from ``_streaming_headers()`` and a 5-second timeout; ``urllib`` and JSON
    decoding errors propagate to the caller.
    """
    import urllib.request

    path = endpoint.strip('/')
    target_url = f'{STREAMING_BASE_URL}/{path}'
    if query_string:
        target_url = f'{target_url}?{query_string}'

    upstream_request = urllib.request.Request(target_url, headers=_streaming_headers())
    with urllib.request.urlopen(upstream_request, timeout=5) as response:
        raw_body = response.read()
    return json.loads(raw_body.decode())
def _streaming_status_snapshot():
    """Legacy dashboard compat: aggregate the streaming endpoints that exist.

    Queries, in order, ``stats``, ``sessions``, ``processing/status`` and
    ``replays`` through ``_fetch_streaming_json``.

    Returns:
        A dict based on the ``stats`` payload, enriched with:
        - ``sessions``: the session list (payload may be ``{"sessions": [...]}``
          or a bare list);
        - ``processing``: the raw ``processing/status`` payload;
        - ``replays`` / ``replay``: the replay list and the first active
          replay (``None`` when there is none).
        When one of the secondary calls fails, the matching ``*_error`` key
        holds the error message instead.

    Raises:
        Whatever ``_fetch_streaming_json('stats')`` raises: the ``stats`` call
        is mandatory and its failure is left to the caller.
    """

    def _listing(payload, key):
        # Upstream may answer {"<key>": [...]} or a bare list; anything else
        # is treated as "no items".
        if isinstance(payload, dict):
            return payload.get(key, [])
        if isinstance(payload, list):
            return payload
        return []

    stats = _fetch_streaming_json('stats')
    # Keep a non-object stats payload visible rather than crashing on the
    # item assignments below.
    snapshot = stats if isinstance(stats, dict) else {'stats': stats}

    try:
        # The original called .get() before checking for a list, so a bare
        # list ended up reported as sessions_error.
        snapshot['sessions'] = _listing(_fetch_streaming_json('sessions'), 'sessions')
    except Exception as exc:
        snapshot['sessions_error'] = str(exc)

    try:
        snapshot['processing'] = _fetch_streaming_json('processing/status')
    except Exception as exc:
        snapshot['processing_error'] = str(exc)

    try:
        replay_items = _listing(_fetch_streaming_json('replays'), 'replays')
        # Ignore malformed (non-dict) entries when looking for the active replay.
        snapshot['replay'] = next(
            (r for r in replay_items if isinstance(r, dict) and r.get('active')),
            None,
        )
        snapshot['replays'] = replay_items
    except Exception as exc:
        snapshot['replay_error'] = str(exc)

    return snapshot
@app.route('/api/streaming/<path:endpoint>') @app.route('/api/streaming/<path:endpoint>')
def proxy_streaming(endpoint): def proxy_streaming(endpoint):
"""Proxy vers le serveur de streaming pour éviter les problèmes CORS.""" """Proxy vers le serveur de streaming pour éviter les problèmes CORS."""
import urllib.request
import urllib.error import urllib.error
try: try:
url = f'{STREAMING_BASE_URL}/{endpoint}' clean_endpoint = endpoint.strip('/')
req = urllib.request.Request(url, headers={'Accept': 'application/json'}) if clean_endpoint == 'status':
with urllib.request.urlopen(req, timeout=5) as response: data = _streaming_status_snapshot()
data = json.loads(response.read().decode()) else:
query_string = request.query_string.decode('utf-8')
data = _fetch_streaming_json(clean_endpoint, query_string)
return jsonify(data) return jsonify(data)
return jsonify(data)
except urllib.error.HTTPError as e:
try:
payload = json.loads(e.read().decode())
except Exception:
payload = {'error': e.reason}
return jsonify(payload), e.code
except urllib.error.URLError as e: except urllib.error.URLError as e:
return jsonify({'error': f'Serveur streaming inaccessible: {e}'}), 502 return jsonify({'error': f'Serveur streaming inaccessible: {e}'}), 502
except Exception as e: except Exception as e:
@@ -2273,10 +2438,17 @@ def process_mining_discover():
load_jsonl_session, load_jsonl_session,
PM4PY_AVAILABLE, PM4PY_AVAILABLE,
) )
except ImportError: except ImportError as exc:
missing = getattr(exc, "name", None) or str(exc)
return jsonify({ return jsonify({
'error': "Module d'analyse non disponible", 'error': "Module d'analyse non disponible",
'detail': "Le module core.analytics.process_mining_bridge est introuvable.", 'detail': (
"Dépendance analytics manquante pendant l'import du bridge "
f"({missing}). Installer le bundle process-mining dans le venv DGX "
"avant de générer la cartographie."
),
'missing_dependency': missing,
'action_required': "install-process-mining-dependencies",
}), 503 }), 503
if not PM4PY_AVAILABLE: if not PM4PY_AVAILABLE:

View File

@@ -1392,7 +1392,13 @@
// Status indicator // Status indicator
const faissStatusEl = document.getElementById('faissStatus'); const faissStatusEl = document.getElementById('faissStatus');
if (faiss.error) { if (faiss.status === 'metadata_invalid') {
faissStatusEl.textContent = '⚠️';
faissStatusEl.title = 'Index brut présent, métadonnées invalides';
} else if (faiss.status === 'metadata_missing') {
faissStatusEl.textContent = '⚠️';
faissStatusEl.title = 'Index brut présent, métadonnées absentes';
} else if (faiss.error) {
faissStatusEl.textContent = '❌'; faissStatusEl.textContent = '❌';
faissStatusEl.title = faiss.error; faissStatusEl.title = faiss.error;
} else if (faiss.status === 'index_not_found') { } else if (faiss.status === 'index_not_found') {
@@ -1419,6 +1425,8 @@
if (faiss.nlist) details.push(`nlist: ${faiss.nlist}`); if (faiss.nlist) details.push(`nlist: ${faiss.nlist}`);
if (faiss.nprobe) details.push(`nprobe: ${faiss.nprobe}`); if (faiss.nprobe) details.push(`nprobe: ${faiss.nprobe}`);
if (faiss.metadata_count) details.push(`Métadonnées: ${faiss.metadata_count}`); if (faiss.metadata_count) details.push(`Métadonnées: ${faiss.metadata_count}`);
if (faiss.metadata_status) details.push(`Metadata: ${faiss.metadata_status}`);
if (faiss.metadata_error) details.push(`Metadata error: ${faiss.metadata_error}`);
document.getElementById('faissDetails').textContent = details.length > 0 ? document.getElementById('faissDetails').textContent = details.length > 0 ?
details.join(' • ') : (faiss.error || faiss.status || 'Aucune info disponible'); details.join(' • ') : (faiss.error || faiss.status || 'Aucune info disponible');
@@ -1434,6 +1442,12 @@
if (faiss.status === 'index_not_found') { if (faiss.status === 'index_not_found') {
recommendations.push('📝 Traitez des sessions pour créer l\'index FAISS'); recommendations.push('📝 Traitez des sessions pour créer l\'index FAISS');
} }
if (faiss.status === 'metadata_invalid') {
recommendations.push('⚠️ Index brut présent mais métadonnées invalides : re-signer ou régénérer les métadonnées FAISS');
}
if (faiss.status === 'metadata_missing') {
recommendations.push('⚠️ Index brut présent sans métadonnées : reconstruire les métadonnées FAISS');
}
if (recommendations.length > 0) { if (recommendations.length > 0) {
recoEl.innerHTML = recommendations.join('<br>'); recoEl.innerHTML = recommendations.join('<br>');
recoEl.style.display = 'block'; recoEl.style.display = 'block';
@@ -2818,10 +2832,29 @@
const detailsEl = document.getElementById('streamServerDetails'); const detailsEl = document.getElementById('streamServerDetails');
try { try {
const data = await fetchJSON(`${STREAMING_BASE}/stats`); const [data, processing] = await Promise.all([
fetchJSON(`${STREAMING_BASE}/stats`),
fetchJSON(`${STREAMING_BASE}/processing/status`).catch(e => ({error: e.message}))
]);
statusEl.innerHTML = '<span style="color:#22c55e;">✅</span>'; const processingReady = processing && processing.processing_ready === true;
statusEl.title = 'Serveur streaming en ligne'; // « En veille » (armé/lazy) ≠ « dégradé » : un worker neuf sans
// session a tous ses composants à false par design (chargement à la
// 1re session), ce n'est PAS une panne. Seul status==='degraded'
// (init tentée et en échec) est une vraie alerte.
const processingArmed = processing && (processing.armed === true || processing.status === 'idle');
const processingDegraded = processing && !processing.error && processing.status === 'degraded';
const statusHint = (processing && processing.status_hint) || '';
statusEl.innerHTML = processingDegraded
? '<span style="color:#f59e0b;">⚠️</span>'
: processingArmed
? '<span style="color:#3b82f6;">⏸️</span>'
: '<span style="color:#22c55e;">✅</span>';
statusEl.title = processingDegraded
? `Streaming en ligne, worker apprentissage dégradé${statusHint ? ' — ' + statusHint : ''}`
: processingArmed
? `Streaming en ligne, worker en veille${statusHint ? ' — ' + statusHint : ''}`
: 'Serveur streaming en ligne';
document.getElementById('streamActiveSessions').textContent = data.active_sessions || 0; document.getElementById('streamActiveSessions').textContent = data.active_sessions || 0;
document.getElementById('streamTotalEvents').textContent = data.total_events || 0; document.getElementById('streamTotalEvents').textContent = data.total_events || 0;
@@ -2837,6 +2870,31 @@
if (data.events_per_second !== undefined) rows.push({label: 'Événements/sec', value: (data.events_per_second || 0).toFixed(2)}); if (data.events_per_second !== undefined) rows.push({label: 'Événements/sec', value: (data.events_per_second || 0).toFixed(2)});
if (data.memory_usage_mb !== undefined) rows.push({label: 'Mémoire utilisée', value: Math.round(data.memory_usage_mb) + ' MB'}); if (data.memory_usage_mb !== undefined) rows.push({label: 'Mémoire utilisée', value: Math.round(data.memory_usage_mb) + ' MB'});
if (data.server_version) rows.push({label: 'Version serveur', value: data.server_version}); if (data.server_version) rows.push({label: 'Version serveur', value: data.server_version});
if (processing && !processing.error) {
const status = processing.status || 'unknown';
const workerIcon = processingReady ? '✅' : (processingArmed ? '⏸️' : '⚠️');
rows.push({
label: 'Worker apprentissage',
value: `${workerIcon} ${status}`
});
rows.push({
label: 'Composants intelligence',
value: processing.components_ready
? 'prêts'
: (processingArmed ? 'en veille (chargés à la 1re session)' : 'non prêts')
});
if (statusHint) {
rows.push({label: 'Détail worker', value: statusHint});
}
if (processing.queue_length !== undefined) {
rows.push({label: 'Queue apprentissage', value: processing.queue_length});
}
if (processing.last_cycle) {
rows.push({label: 'Dernier cycle worker', value: new Date(processing.last_cycle).toLocaleString('fr-FR')});
}
} else if (processing && processing.error) {
rows.push({label: 'Worker apprentissage', value: `${processing.error}`});
}
if (rows.length === 0) { if (rows.length === 0) {
// Afficher les données brutes si les clés attendues ne sont pas présentes // Afficher les données brutes si les clés attendues ne sont pas présentes