From 5e31cdf666aa0fca4a04d5482ffe6389e9ce0cff Mon Sep 17 00:00:00 2001 From: Dom Date: Mon, 27 Apr 2026 21:48:38 +0200 Subject: [PATCH] =?UTF-8?q?feat(agent=5Fchat):=20bus=20feedback=20L=C3=A9a?= =?UTF-8?q?=20'lea:*'=20derri=C3=A8re=20flag=20LEA=5FFEEDBACK=5FBUS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Surface d'observation pour bulles temps réel ChatWindow (J2 démo GHT Sud 95). - Helper _emit_lea(event, payload): no-op silencieux si flag off - Helper _emit_dual(legacy, lea, payload): émet event existant + alias 'lea:*' - Détection paused_need_help dans _poll_replay_progress → lea:paused - Détection sortie de pause → lea:resumed - Timeout étendu (120s→600s) pendant pause supervisée - 12 emits SocketIO existants aliasés (execution_started/progress/completed, copilot_step/step_result/complete) — payloads identiques, zéro régression Flag LEA_FEEDBACK_BUS=0 par défaut. Comportement legacy strictement préservé. 8 tests pytest verts (tests/integration/test_feedback_bus.py). Co-Authored-By: Claude Opus 4.7 (1M context) --- .env.example | 8 ++ agent_chat/app.py | 89 ++++++++++++++++----- tests/integration/test_feedback_bus.py | 102 +++++++++++++++++++++++++ 3 files changed, 178 insertions(+), 21 deletions(-) create mode 100644 tests/integration/test_feedback_bus.py diff --git a/.env.example b/.env.example index b1f74f606..45efd76d5 100644 --- a/.env.example +++ b/.env.example @@ -46,6 +46,14 @@ LOGS_PATH=logs UPLOADS_PATH=data/training/uploads SESSIONS_PATH=data/training/sessions +# ============================================================================ +# Feedback Bus (Léa parle pendant exécution) +# ============================================================================ +# Bus SocketIO unifié 'lea:*' (action_started, action_done, need_confirm, paused). +# Désactivé par défaut. Mettre à 1 pour activer les bulles temps réel dans ChatWindow. +# Si la connexion bus échoue, l'exécution continue normalement (fail-safe). +LEA_FEEDBACK_BUS=0 + # ============================================================================ # FAISS # ============================================================================ diff --git a/agent_chat/app.py b/agent_chat/app.py index 70dd96031..0dbbd6666 100644 --- a/agent_chat/app.py +++ b/agent_chat/app.py @@ -133,6 +133,28 @@ def _streaming_headers() -> dict: headers["Authorization"] = f"Bearer {_STREAMING_API_TOKEN}" return headers + +# ============================================================ +# Feedback Bus — events 'lea:*' temps réel vers ChatWindow +# ============================================================ +LEA_FEEDBACK_BUS = os.environ.get("LEA_FEEDBACK_BUS", "0").lower() in ("1", "true", "yes", "on") + + +def _emit_lea(event: str, payload: Dict[str, Any]) -> None: + """Émet 'lea:{event}' sur le bus SocketIO. No-op silencieux si flag off ou erreur.""" + if not LEA_FEEDBACK_BUS: + return + try: + socketio.emit(f"lea:{event}", payload) + except Exception: + logger.debug("_emit_lea silenced", exc_info=True) + + +def _emit_dual(legacy_event: str, lea_event: str, payload: Dict[str, Any], **kwargs) -> None: + """Émet l'event legacy (compat dashboard) ET l'alias lea:* (ChatWindow tkinter).""" + socketio.emit(legacy_event, payload, **kwargs) + _emit_lea(lea_event, payload) + execution_status = { "running": False, "workflow": None, @@ -623,7 +645,7 @@ def api_execute(): } # Notifier via WebSocket - socketio.emit('execution_started', { + _emit_dual('execution_started', 'action_started', { "workflow": match.workflow_name, "params": all_params }) @@ -1181,28 +1203,28 @@ def _execute_gesture(gesture): ) if resp.status_code == 200: - socketio.emit('execution_completed', { + _emit_dual('execution_completed', 'done', { "workflow": gesture.name, "success": True, "message": f"Geste '{gesture.name}' ({'+'.join(gesture.keys)}) envoyé", }) else: error = resp.text[:200] - socketio.emit('execution_completed', { + _emit_dual('execution_completed', 'done', { "workflow": gesture.name, "success": False, "message": f"Erreur: {error}", }) except http_requests.ConnectionError: - socketio.emit('execution_completed', { + _emit_dual('execution_completed', 'done', { "workflow": gesture.name, "success": False, "message": "Serveur de streaming non disponible (port 5005).", }) except Exception as e: logger.error(f"Gesture execution error: {e}") - socketio.emit('execution_completed', { + _emit_dual('execution_completed', 'done', { "workflow": gesture.name, "success": False, "message": f"Erreur: {str(e)}", @@ -1730,14 +1752,20 @@ def _poll_replay_progress(replay_id: str, workflow_name: str, total_actions: int """Suivre la progression d'un replay distant via polling.""" import time - max_wait = 120 # 2 minutes max + max_wait_running = 120 # 2 min en exécution active + max_wait_paused = 600 # 10 min en pause supervisée (humain peut prendre son temps) poll_interval = 2.0 elapsed = 0 + was_paused = False - while elapsed < max_wait and execution_status.get("running"): + while execution_status.get("running"): time.sleep(poll_interval) elapsed += poll_interval + cap = max_wait_paused if was_paused else max_wait_running + if elapsed >= cap: + break + try: resp = http_requests.get( f"{STREAMING_SERVER_URL}/api/v1/traces/stream/replay/{replay_id}", @@ -1753,7 +1781,26 @@ def _poll_replay_progress(replay_id: str, workflow_name: str, total_actions: int failed = data.get("failed_actions", 0) progress = int(10 + (completed / max(total_actions, 1)) * 80) - socketio.emit('execution_progress', { + if status == "paused_need_help" and not was_paused: + _emit_lea("paused", { + "workflow": workflow_name, + "replay_id": replay_id, + "completed": completed, + "total": total_actions, + "failed_action": data.get("failed_action"), + "reason": data.get("error") or "Action incertaine", + }) + was_paused = True + elapsed = 0 + elif was_paused and status != "paused_need_help": + _emit_lea("resumed", { + "workflow": workflow_name, + "replay_id": replay_id, + "status_after": status, + }) + was_paused = False + + _emit_dual('execution_progress', 'action_progress', { "progress": progress, "step": f"Action {completed}/{total_actions} exécutée", "current": completed, @@ -1922,7 +1969,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]): actions = _build_actions_from_workflow(match, params) if not actions: - socketio.emit('copilot_complete', { + _emit_dual('copilot_complete', 'done', { "workflow": workflow_name, "status": "error", "message": "Aucune action exécutable dans ce workflow.", @@ -1959,7 +2006,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]): break copilot_state["status"] = "waiting_approval" - socketio.emit('copilot_step', { + _emit_dual('copilot_step', 'need_confirm', { "workflow": workflow_name, "step_index": idx, "total": total, @@ -1982,7 +2029,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]): if waited >= max_wait: copilot_state["status"] = "aborted" - socketio.emit('copilot_complete', { + _emit_dual('copilot_complete', 'done', { "workflow": workflow_name, "status": "timeout", "message": f"Timeout : pas de réponse après {max_wait}s.", @@ -1999,7 +2046,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]): elif decision == "skipped": copilot_state["skipped"] += 1 logger.info(f"Copilot skip étape {idx + 1}/{total}") - socketio.emit('copilot_step_result', { + _emit_dual('copilot_step_result', 'step_result', { "step_index": idx, "total": total, "status": "skipped", @@ -2034,7 +2081,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]): if action_success: copilot_state["completed"] += 1 - socketio.emit('copilot_step_result', { + _emit_dual('copilot_step_result', 'step_result', { "step_index": idx, "total": total, "status": "completed", @@ -2042,7 +2089,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]): }) else: copilot_state["failed"] += 1 - socketio.emit('copilot_step_result', { + _emit_dual('copilot_step_result', 'step_result', { "step_index": idx, "total": total, "status": "failed", @@ -2051,7 +2098,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]): else: error = resp.text[:200] copilot_state["failed"] += 1 - socketio.emit('copilot_step_result', { + _emit_dual('copilot_step_result', 'step_result', { "step_index": idx, "total": total, "status": "failed", @@ -2060,7 +2107,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]): except http_requests.ConnectionError: copilot_state["failed"] += 1 - socketio.emit('copilot_step_result', { + _emit_dual('copilot_step_result', 'step_result', { "step_index": idx, "total": total, "status": "failed", @@ -2070,7 +2117,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]): except Exception as e: copilot_state["failed"] += 1 logger.error(f"Copilot action error: {e}") - socketio.emit('copilot_step_result', { + _emit_dual('copilot_step_result', 'step_result', { "step_index": idx, "total": total, "status": "failed", @@ -2098,7 +2145,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]): f"Copilot terminé : {completed} réussies, " f"{skipped} passées, {failed} échouées sur {total} étapes." ) - socketio.emit('copilot_complete', { + _emit_dual('copilot_complete', 'done', { "workflow": workflow_name, "status": "completed" if success else "partial", "message": message, @@ -2175,7 +2222,7 @@ def execute_workflow(match, params): execution_status["progress"] = 10 execution_status["message"] = f"Envoyé à l'Agent V1 ({target_session})" - socketio.emit('execution_progress', { + _emit_dual('execution_progress', 'action_progress', { "progress": 10, "step": f"Replay envoyé à l'Agent V1 — {total_actions} actions en attente", "current": 0, @@ -2523,7 +2570,7 @@ def update_progress(progress: int, message: str, current: int, total: int): execution_status["progress"] = progress execution_status["message"] = message - socketio.emit('execution_progress', { + _emit_dual('execution_progress', 'action_progress', { "progress": progress, "step": message, "current": current, @@ -2543,7 +2590,7 @@ def finish_execution(workflow_name: str, success: bool, message: str): if command_history: command_history[-1]["status"] = "completed" if success else "failed" - socketio.emit('execution_completed', { + _emit_dual('execution_completed', 'done', { "workflow": workflow_name, "success": success, "message": message diff --git a/tests/integration/test_feedback_bus.py b/tests/integration/test_feedback_bus.py new file mode 100644 index 000000000..c8ee5634c --- /dev/null +++ b/tests/integration/test_feedback_bus.py @@ -0,0 +1,102 @@ +"""Tests du bus feedback Léa (events lea:* via Flask-SocketIO). + +Couvre J2.5 et J2.6 : +- Flag LEA_FEEDBACK_BUS=0 → _emit_lea no-op, _emit_dual ne propage que l'event legacy +- Flag LEA_FEEDBACK_BUS=1 → _emit_lea propage 'lea:{event}', _emit_dual propage les deux + +Approche : on intercepte socketio.emit avec monkeypatch (plus fiable que test_client +de Flask-SocketIO qui ne capte pas toujours les broadcasts hors contexte requête). +""" + +import importlib + +import pytest + + +def _reload_app(monkeypatch, flag_value: str): + monkeypatch.setenv("LEA_FEEDBACK_BUS", flag_value) + import agent_chat.app as app_mod + importlib.reload(app_mod) + return app_mod + + +def _capture_emits(monkeypatch, app_mod): + calls = [] + monkeypatch.setattr( + app_mod.socketio, "emit", + lambda event, payload=None, **kwargs: calls.append((event, payload, kwargs)), + ) + return calls + + +@pytest.fixture +def app_off(monkeypatch): + return _reload_app(monkeypatch, "0") + + +@pytest.fixture +def app_on(monkeypatch): + return _reload_app(monkeypatch, "1") + + +def test_flag_off_by_default(monkeypatch): + monkeypatch.delenv("LEA_FEEDBACK_BUS", raising=False) + import agent_chat.app as app_mod + importlib.reload(app_mod) + assert app_mod.LEA_FEEDBACK_BUS is False + + +def test_flag_accepts_truthy_values(monkeypatch): + for truthy in ["1", "true", "True", "yes", "on", "TRUE"]: + monkeypatch.setenv("LEA_FEEDBACK_BUS", truthy) + import agent_chat.app as app_mod + importlib.reload(app_mod) + assert app_mod.LEA_FEEDBACK_BUS is True, f"{truthy!r} devrait activer le flag" + + +def test_emit_lea_noop_when_flag_off(app_off, monkeypatch): + calls = _capture_emits(monkeypatch, app_off) + app_off._emit_lea("paused", {"workflow": "demo", "reason": "test"}) + assert calls == [] + + +def test_emit_lea_emits_when_flag_on(app_on, monkeypatch): + calls = _capture_emits(monkeypatch, app_on) + app_on._emit_lea("paused", {"workflow": "demo", "reason": "test"}) + assert len(calls) == 1 + event, payload, _ = calls[0] + assert event == "lea:paused" + assert payload == {"workflow": "demo", "reason": "test"} + + +def test_emit_dual_emits_only_legacy_when_flag_off(app_off, monkeypatch): + calls = _capture_emits(monkeypatch, app_off) + app_off._emit_dual("execution_started", "action_started", {"workflow": "demo"}) + assert len(calls) == 1 + assert calls[0][0] == "execution_started" + + +def test_emit_dual_emits_both_when_flag_on(app_on, monkeypatch): + calls = _capture_emits(monkeypatch, app_on) + payload = {"workflow": "demo", "params": {"k": "v"}} + app_on._emit_dual("execution_started", "action_started", payload) + events = [c[0] for c in calls] + assert "execution_started" in events + assert "lea:action_started" in events + assert len(calls) == 2 + + +def test_emit_dual_preserves_kwargs(app_on, monkeypatch): + """broadcast=True et autres kwargs Flask-SocketIO doivent être propagés au legacy.""" + calls = _capture_emits(monkeypatch, app_on) + app_on._emit_dual("execution_cancelled", "cancelled", {}, broadcast=True) + legacy_call = next(c for c in calls if c[0] == "execution_cancelled") + assert legacy_call[2].get("broadcast") is True + + +def test_emit_lea_silenced_on_socketio_error(app_on, monkeypatch): + """Une exception dans socketio.emit ne doit jamais remonter.""" + def boom(*args, **kwargs): + raise RuntimeError("socketio fail") + monkeypatch.setattr(app_on.socketio, "emit", boom) + app_on._emit_lea("paused", {"x": 1})