feat(agent_chat): bus feedback Léa 'lea:*' derrière flag LEA_FEEDBACK_BUS
Surface d'observation pour bulles temps réel ChatWindow (J2 démo GHT Sud 95). - Helper _emit_lea(event, payload): no-op silencieux si flag off - Helper _emit_dual(legacy, lea, payload): émet event existant + alias 'lea:*' - Détection paused_need_help dans _poll_replay_progress → lea:paused - Détection sortie de pause → lea:resumed - Timeout étendu (120s→600s) pendant pause supervisée - 12 emits SocketIO existants aliasés (execution_started/progress/completed, copilot_step/step_result/complete) — payloads identiques, zéro régression Flag LEA_FEEDBACK_BUS=0 par défaut. Comportement legacy strictement préservé. 8 tests pytest verts (tests/integration/test_feedback_bus.py). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -46,6 +46,14 @@ LOGS_PATH=logs
|
||||
UPLOADS_PATH=data/training/uploads
|
||||
SESSIONS_PATH=data/training/sessions
|
||||
|
||||
# ============================================================================
|
||||
# Feedback Bus (Léa parle pendant exécution)
|
||||
# ============================================================================
|
||||
# Bus SocketIO unifié 'lea:*' (action_started, action_done, need_confirm, paused).
|
||||
# Désactivé par défaut. Mettre à 1 pour activer les bulles temps réel dans ChatWindow.
|
||||
# Si la connexion bus échoue, l'exécution continue normalement (fail-safe).
|
||||
LEA_FEEDBACK_BUS=0
|
||||
|
||||
# ============================================================================
|
||||
# FAISS
|
||||
# ============================================================================
|
||||
|
||||
@@ -133,6 +133,28 @@ def _streaming_headers() -> dict:
|
||||
headers["Authorization"] = f"Bearer {_STREAMING_API_TOKEN}"
|
||||
return headers
|
||||
|
||||
|
||||
# ============================================================
|
||||
# Feedback Bus — events 'lea:*' temps réel vers ChatWindow
|
||||
# ============================================================
|
||||
LEA_FEEDBACK_BUS = os.environ.get("LEA_FEEDBACK_BUS", "0").lower() in ("1", "true", "yes", "on")
|
||||
|
||||
|
||||
def _emit_lea(event: str, payload: Dict[str, Any]) -> None:
|
||||
"""Émet 'lea:{event}' sur le bus SocketIO. No-op silencieux si flag off ou erreur."""
|
||||
if not LEA_FEEDBACK_BUS:
|
||||
return
|
||||
try:
|
||||
socketio.emit(f"lea:{event}", payload)
|
||||
except Exception:
|
||||
logger.debug("_emit_lea silenced", exc_info=True)
|
||||
|
||||
|
||||
def _emit_dual(legacy_event: str, lea_event: str, payload: Dict[str, Any], **kwargs) -> None:
|
||||
"""Émet l'event legacy (compat dashboard) ET l'alias lea:* (ChatWindow tkinter)."""
|
||||
socketio.emit(legacy_event, payload, **kwargs)
|
||||
_emit_lea(lea_event, payload)
|
||||
|
||||
execution_status = {
|
||||
"running": False,
|
||||
"workflow": None,
|
||||
@@ -623,7 +645,7 @@ def api_execute():
|
||||
}
|
||||
|
||||
# Notifier via WebSocket
|
||||
socketio.emit('execution_started', {
|
||||
_emit_dual('execution_started', 'action_started', {
|
||||
"workflow": match.workflow_name,
|
||||
"params": all_params
|
||||
})
|
||||
@@ -1181,28 +1203,28 @@ def _execute_gesture(gesture):
|
||||
)
|
||||
|
||||
if resp.status_code == 200:
|
||||
socketio.emit('execution_completed', {
|
||||
_emit_dual('execution_completed', 'done', {
|
||||
"workflow": gesture.name,
|
||||
"success": True,
|
||||
"message": f"Geste '{gesture.name}' ({'+'.join(gesture.keys)}) envoyé",
|
||||
})
|
||||
else:
|
||||
error = resp.text[:200]
|
||||
socketio.emit('execution_completed', {
|
||||
_emit_dual('execution_completed', 'done', {
|
||||
"workflow": gesture.name,
|
||||
"success": False,
|
||||
"message": f"Erreur: {error}",
|
||||
})
|
||||
|
||||
except http_requests.ConnectionError:
|
||||
socketio.emit('execution_completed', {
|
||||
_emit_dual('execution_completed', 'done', {
|
||||
"workflow": gesture.name,
|
||||
"success": False,
|
||||
"message": "Serveur de streaming non disponible (port 5005).",
|
||||
})
|
||||
except Exception as e:
|
||||
logger.error(f"Gesture execution error: {e}")
|
||||
socketio.emit('execution_completed', {
|
||||
_emit_dual('execution_completed', 'done', {
|
||||
"workflow": gesture.name,
|
||||
"success": False,
|
||||
"message": f"Erreur: {str(e)}",
|
||||
@@ -1730,14 +1752,20 @@ def _poll_replay_progress(replay_id: str, workflow_name: str, total_actions: int
|
||||
"""Suivre la progression d'un replay distant via polling."""
|
||||
import time
|
||||
|
||||
max_wait = 120 # 2 minutes max
|
||||
max_wait_running = 120 # 2 min en exécution active
|
||||
max_wait_paused = 600 # 10 min en pause supervisée (humain peut prendre son temps)
|
||||
poll_interval = 2.0
|
||||
elapsed = 0
|
||||
was_paused = False
|
||||
|
||||
while elapsed < max_wait and execution_status.get("running"):
|
||||
while execution_status.get("running"):
|
||||
time.sleep(poll_interval)
|
||||
elapsed += poll_interval
|
||||
|
||||
cap = max_wait_paused if was_paused else max_wait_running
|
||||
if elapsed >= cap:
|
||||
break
|
||||
|
||||
try:
|
||||
resp = http_requests.get(
|
||||
f"{STREAMING_SERVER_URL}/api/v1/traces/stream/replay/{replay_id}",
|
||||
@@ -1753,7 +1781,26 @@ def _poll_replay_progress(replay_id: str, workflow_name: str, total_actions: int
|
||||
failed = data.get("failed_actions", 0)
|
||||
progress = int(10 + (completed / max(total_actions, 1)) * 80)
|
||||
|
||||
socketio.emit('execution_progress', {
|
||||
if status == "paused_need_help" and not was_paused:
|
||||
_emit_lea("paused", {
|
||||
"workflow": workflow_name,
|
||||
"replay_id": replay_id,
|
||||
"completed": completed,
|
||||
"total": total_actions,
|
||||
"failed_action": data.get("failed_action"),
|
||||
"reason": data.get("error") or "Action incertaine",
|
||||
})
|
||||
was_paused = True
|
||||
elapsed = 0
|
||||
elif was_paused and status != "paused_need_help":
|
||||
_emit_lea("resumed", {
|
||||
"workflow": workflow_name,
|
||||
"replay_id": replay_id,
|
||||
"status_after": status,
|
||||
})
|
||||
was_paused = False
|
||||
|
||||
_emit_dual('execution_progress', 'action_progress', {
|
||||
"progress": progress,
|
||||
"step": f"Action {completed}/{total_actions} exécutée",
|
||||
"current": completed,
|
||||
@@ -1922,7 +1969,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]):
|
||||
|
||||
actions = _build_actions_from_workflow(match, params)
|
||||
if not actions:
|
||||
socketio.emit('copilot_complete', {
|
||||
_emit_dual('copilot_complete', 'done', {
|
||||
"workflow": workflow_name,
|
||||
"status": "error",
|
||||
"message": "Aucune action exécutable dans ce workflow.",
|
||||
@@ -1959,7 +2006,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]):
|
||||
break
|
||||
|
||||
copilot_state["status"] = "waiting_approval"
|
||||
socketio.emit('copilot_step', {
|
||||
_emit_dual('copilot_step', 'need_confirm', {
|
||||
"workflow": workflow_name,
|
||||
"step_index": idx,
|
||||
"total": total,
|
||||
@@ -1982,7 +2029,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]):
|
||||
|
||||
if waited >= max_wait:
|
||||
copilot_state["status"] = "aborted"
|
||||
socketio.emit('copilot_complete', {
|
||||
_emit_dual('copilot_complete', 'done', {
|
||||
"workflow": workflow_name,
|
||||
"status": "timeout",
|
||||
"message": f"Timeout : pas de réponse après {max_wait}s.",
|
||||
@@ -1999,7 +2046,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]):
|
||||
elif decision == "skipped":
|
||||
copilot_state["skipped"] += 1
|
||||
logger.info(f"Copilot skip étape {idx + 1}/{total}")
|
||||
socketio.emit('copilot_step_result', {
|
||||
_emit_dual('copilot_step_result', 'step_result', {
|
||||
"step_index": idx,
|
||||
"total": total,
|
||||
"status": "skipped",
|
||||
@@ -2034,7 +2081,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]):
|
||||
|
||||
if action_success:
|
||||
copilot_state["completed"] += 1
|
||||
socketio.emit('copilot_step_result', {
|
||||
_emit_dual('copilot_step_result', 'step_result', {
|
||||
"step_index": idx,
|
||||
"total": total,
|
||||
"status": "completed",
|
||||
@@ -2042,7 +2089,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]):
|
||||
})
|
||||
else:
|
||||
copilot_state["failed"] += 1
|
||||
socketio.emit('copilot_step_result', {
|
||||
_emit_dual('copilot_step_result', 'step_result', {
|
||||
"step_index": idx,
|
||||
"total": total,
|
||||
"status": "failed",
|
||||
@@ -2051,7 +2098,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]):
|
||||
else:
|
||||
error = resp.text[:200]
|
||||
copilot_state["failed"] += 1
|
||||
socketio.emit('copilot_step_result', {
|
||||
_emit_dual('copilot_step_result', 'step_result', {
|
||||
"step_index": idx,
|
||||
"total": total,
|
||||
"status": "failed",
|
||||
@@ -2060,7 +2107,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]):
|
||||
|
||||
except http_requests.ConnectionError:
|
||||
copilot_state["failed"] += 1
|
||||
socketio.emit('copilot_step_result', {
|
||||
_emit_dual('copilot_step_result', 'step_result', {
|
||||
"step_index": idx,
|
||||
"total": total,
|
||||
"status": "failed",
|
||||
@@ -2070,7 +2117,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]):
|
||||
except Exception as e:
|
||||
copilot_state["failed"] += 1
|
||||
logger.error(f"Copilot action error: {e}")
|
||||
socketio.emit('copilot_step_result', {
|
||||
_emit_dual('copilot_step_result', 'step_result', {
|
||||
"step_index": idx,
|
||||
"total": total,
|
||||
"status": "failed",
|
||||
@@ -2098,7 +2145,7 @@ def execute_workflow_copilot(match, params: Dict[str, Any]):
|
||||
f"Copilot terminé : {completed} réussies, "
|
||||
f"{skipped} passées, {failed} échouées sur {total} étapes."
|
||||
)
|
||||
socketio.emit('copilot_complete', {
|
||||
_emit_dual('copilot_complete', 'done', {
|
||||
"workflow": workflow_name,
|
||||
"status": "completed" if success else "partial",
|
||||
"message": message,
|
||||
@@ -2175,7 +2222,7 @@ def execute_workflow(match, params):
|
||||
execution_status["progress"] = 10
|
||||
execution_status["message"] = f"Envoyé à l'Agent V1 ({target_session})"
|
||||
|
||||
socketio.emit('execution_progress', {
|
||||
_emit_dual('execution_progress', 'action_progress', {
|
||||
"progress": 10,
|
||||
"step": f"Replay envoyé à l'Agent V1 — {total_actions} actions en attente",
|
||||
"current": 0,
|
||||
@@ -2523,7 +2570,7 @@ def update_progress(progress: int, message: str, current: int, total: int):
|
||||
execution_status["progress"] = progress
|
||||
execution_status["message"] = message
|
||||
|
||||
socketio.emit('execution_progress', {
|
||||
_emit_dual('execution_progress', 'action_progress', {
|
||||
"progress": progress,
|
||||
"step": message,
|
||||
"current": current,
|
||||
@@ -2543,7 +2590,7 @@ def finish_execution(workflow_name: str, success: bool, message: str):
|
||||
if command_history:
|
||||
command_history[-1]["status"] = "completed" if success else "failed"
|
||||
|
||||
socketio.emit('execution_completed', {
|
||||
_emit_dual('execution_completed', 'done', {
|
||||
"workflow": workflow_name,
|
||||
"success": success,
|
||||
"message": message
|
||||
|
||||
102
tests/integration/test_feedback_bus.py
Normal file
102
tests/integration/test_feedback_bus.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""Tests du bus feedback Léa (events lea:* via Flask-SocketIO).
|
||||
|
||||
Couvre J2.5 et J2.6 :
|
||||
- Flag LEA_FEEDBACK_BUS=0 → _emit_lea no-op, _emit_dual ne propage que l'event legacy
|
||||
- Flag LEA_FEEDBACK_BUS=1 → _emit_lea propage 'lea:{event}', _emit_dual propage les deux
|
||||
|
||||
Approche : on intercepte socketio.emit avec monkeypatch (plus fiable que test_client
|
||||
de Flask-SocketIO qui ne capte pas toujours les broadcasts hors contexte requête).
|
||||
"""
|
||||
|
||||
import importlib
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
def _reload_app(monkeypatch, flag_value: str):
|
||||
monkeypatch.setenv("LEA_FEEDBACK_BUS", flag_value)
|
||||
import agent_chat.app as app_mod
|
||||
importlib.reload(app_mod)
|
||||
return app_mod
|
||||
|
||||
|
||||
def _capture_emits(monkeypatch, app_mod):
|
||||
calls = []
|
||||
monkeypatch.setattr(
|
||||
app_mod.socketio, "emit",
|
||||
lambda event, payload=None, **kwargs: calls.append((event, payload, kwargs)),
|
||||
)
|
||||
return calls
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app_off(monkeypatch):
|
||||
return _reload_app(monkeypatch, "0")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def app_on(monkeypatch):
|
||||
return _reload_app(monkeypatch, "1")
|
||||
|
||||
|
||||
def test_flag_off_by_default(monkeypatch):
|
||||
monkeypatch.delenv("LEA_FEEDBACK_BUS", raising=False)
|
||||
import agent_chat.app as app_mod
|
||||
importlib.reload(app_mod)
|
||||
assert app_mod.LEA_FEEDBACK_BUS is False
|
||||
|
||||
|
||||
def test_flag_accepts_truthy_values(monkeypatch):
|
||||
for truthy in ["1", "true", "True", "yes", "on", "TRUE"]:
|
||||
monkeypatch.setenv("LEA_FEEDBACK_BUS", truthy)
|
||||
import agent_chat.app as app_mod
|
||||
importlib.reload(app_mod)
|
||||
assert app_mod.LEA_FEEDBACK_BUS is True, f"{truthy!r} devrait activer le flag"
|
||||
|
||||
|
||||
def test_emit_lea_noop_when_flag_off(app_off, monkeypatch):
|
||||
calls = _capture_emits(monkeypatch, app_off)
|
||||
app_off._emit_lea("paused", {"workflow": "demo", "reason": "test"})
|
||||
assert calls == []
|
||||
|
||||
|
||||
def test_emit_lea_emits_when_flag_on(app_on, monkeypatch):
|
||||
calls = _capture_emits(monkeypatch, app_on)
|
||||
app_on._emit_lea("paused", {"workflow": "demo", "reason": "test"})
|
||||
assert len(calls) == 1
|
||||
event, payload, _ = calls[0]
|
||||
assert event == "lea:paused"
|
||||
assert payload == {"workflow": "demo", "reason": "test"}
|
||||
|
||||
|
||||
def test_emit_dual_emits_only_legacy_when_flag_off(app_off, monkeypatch):
|
||||
calls = _capture_emits(monkeypatch, app_off)
|
||||
app_off._emit_dual("execution_started", "action_started", {"workflow": "demo"})
|
||||
assert len(calls) == 1
|
||||
assert calls[0][0] == "execution_started"
|
||||
|
||||
|
||||
def test_emit_dual_emits_both_when_flag_on(app_on, monkeypatch):
|
||||
calls = _capture_emits(monkeypatch, app_on)
|
||||
payload = {"workflow": "demo", "params": {"k": "v"}}
|
||||
app_on._emit_dual("execution_started", "action_started", payload)
|
||||
events = [c[0] for c in calls]
|
||||
assert "execution_started" in events
|
||||
assert "lea:action_started" in events
|
||||
assert len(calls) == 2
|
||||
|
||||
|
||||
def test_emit_dual_preserves_kwargs(app_on, monkeypatch):
|
||||
"""broadcast=True et autres kwargs Flask-SocketIO doivent être propagés au legacy."""
|
||||
calls = _capture_emits(monkeypatch, app_on)
|
||||
app_on._emit_dual("execution_cancelled", "cancelled", {}, broadcast=True)
|
||||
legacy_call = next(c for c in calls if c[0] == "execution_cancelled")
|
||||
assert legacy_call[2].get("broadcast") is True
|
||||
|
||||
|
||||
def test_emit_lea_silenced_on_socketio_error(app_on, monkeypatch):
|
||||
"""Une exception dans socketio.emit ne doit jamais remonter."""
|
||||
def boom(*args, **kwargs):
|
||||
raise RuntimeError("socketio fail")
|
||||
monkeypatch.setattr(app_on.socketio, "emit", boom)
|
||||
app_on._emit_lea("paused", {"x": 1})
|
||||
Reference in New Issue
Block a user