diff --git a/agent_chat/app.py b/agent_chat/app.py index 0dbbd6666..1d0127fe5 100644 --- a/agent_chat/app.py +++ b/agent_chat/app.py @@ -1683,6 +1683,52 @@ def handle_copilot_abort(): }) +# ============================================================================= +# Bulle paused_need_help — handlers SocketIO depuis ChatWindow (J3.5) +# ============================================================================= + +@socketio.on('lea:replay_resume') +def handle_lea_replay_resume(data): + """Bouton Continuer : relayer le resume vers le streaming server.""" + replay_id = (data or {}).get("replay_id") + if not replay_id: + _emit_lea("resume_acked", {"status": "error", "detail": "replay_id manquant"}) + return + try: + resp = http_requests.post( + f"{STREAMING_SERVER_URL}/api/v1/traces/stream/replay/{replay_id}/resume", + headers=_streaming_headers(), + timeout=5, + ) + if resp.ok: + logger.info(f"Replay {replay_id} resume relayé OK") + _emit_lea("resume_acked", {"replay_id": replay_id, "status": "ok"}) + else: + detail = resp.text[:200] + logger.warning(f"Resume échoué (HTTP {resp.status_code}): {detail}") + _emit_lea("resume_acked", { + "replay_id": replay_id, "status": "error", + "http_status": resp.status_code, "detail": detail, + }) + except Exception as e: + logger.warning(f"Resume relay error: {e}") + _emit_lea("resume_acked", { + "replay_id": replay_id, "status": "error", "detail": str(e), + }) + + +@socketio.on('lea:replay_abort') +def handle_lea_replay_abort(data): + """Bouton Annuler : arrêter le polling local. Le replay côté streaming sera + cleaned up naturellement au prochain replay (cf api_stream._replay_states stale).""" + global execution_status + replay_id = (data or {}).get("replay_id") + execution_status["running"] = False + execution_status["message"] = "Annulé par l'utilisateur" + logger.info(f"Replay {replay_id or '?'} abort par l'utilisateur (paused bubble)") + _emit_lea("abort_acked", {"replay_id": replay_id, "status": "ok"}) + + # ============================================================================= # Exécution de workflow # ============================================================================= diff --git a/agent_v0/agent_v1/network/feedback_bus.py b/agent_v0/agent_v1/network/feedback_bus.py index 05cfd3dea..f4c33879d 100644 --- a/agent_v0/agent_v1/network/feedback_bus.py +++ b/agent_v0/agent_v1/network/feedback_bus.py @@ -122,3 +122,28 @@ class FeedbackBusClient: @property def connected(self) -> bool: return bool(self._sio.connected) + + # ------------------------------------------------------------------ + # Actions utilisateur depuis la bulle paused_need_help (J3.5) + # ------------------------------------------------------------------ + + def resume_replay(self, replay_id: str) -> bool: + """Bouton Continuer : émet 'lea:replay_resume' vers agent_chat. + + Retourne True si l'event a pu être émis, False sinon (déconnecté/erreur). + """ + return self._safe_emit("lea:replay_resume", {"replay_id": replay_id}) + + def abort_replay(self, replay_id: str) -> bool: + """Bouton Annuler : émet 'lea:replay_abort' vers agent_chat.""" + return self._safe_emit("lea:replay_abort", {"replay_id": replay_id}) + + def _safe_emit(self, event: str, payload: dict) -> bool: + try: + if not self._sio.connected: + return False + self._sio.emit(event, payload) + return True + except Exception: + logger.debug("FeedbackBus _safe_emit silenced", exc_info=True) + return False diff --git a/agent_v0/agent_v1/ui/chat_window.py b/agent_v0/agent_v1/ui/chat_window.py index f393f3d72..c35294fb8 100644 --- a/agent_v0/agent_v1/ui/chat_window.py +++ b/agent_v0/agent_v1/ui/chat_window.py @@ -51,6 +51,15 @@ SCROLLBAR_BG = "#E5E7EB" # Fond scrollbar SCROLLBAR_FG = "#9CA3AF" # Curseur scrollbar MSG_BORDER_COLOR = "#D1D5DB" # Bordure subtile des bulles de messages +# Bulle paused_need_help (J3.5) — alerte non bloquante, asset démo majeur +PAUSED_BG = "#FEF3C7" # Jaune pâle +PAUSED_BORDER = "#F59E0B" # Orange ambré +PAUSED_FG = "#92400E" # Brun foncé (lisible sur fond jaune) +PAUSED_BTN_RESUME_BG = "#22C55E" # Vert +PAUSED_BTN_RESUME_HOVER = "#16A34A" +PAUSED_BTN_ABORT_BG = "#9CA3AF" # Gris neutre (pas dramatique) +PAUSED_BTN_ABORT_HOVER = "#6B7280" + # Dimensions — confortables WIN_WIDTH = 600 WIN_HEIGHT = 800 @@ -101,6 +110,7 @@ class ChatWindow: self._ready = threading.Event() self._messages = [] # historique local self._bus: Optional[Any] = None # FeedbackBusClient (J3.3, peut rester None) + self._active_paused_bubble: Optional[Dict[str, Any]] = None # bulle paused active (J3.5) # S'abonner aux changements de l'etat partage if self._shared_state is not None: @@ -660,6 +670,15 @@ class ChatWindow: def _on_lea_event(self, event: str, payload: Dict[str, Any]) -> None: """Callback bus → bulle Lea. Thread-safe : _add_lea_message utilise root.after.""" + # J3.5 : la pause supervisée a sa propre bulle interactive + if event == "lea:paused": + self._add_paused_bubble(payload or {}) + return + if event in ("lea:resumed", "lea:done"): + self._close_active_paused_bubble(reason=event) + # ne pas return — on affiche aussi la bulle plate ci-dessous + + # J3.3 : formatage minimal (J3.4 affinera avec le vocabulaire métier validé Amina) short = event.removeprefix("lea:") if event.startswith("lea:") else event parts = [] for key in ("workflow", "step", "reason", "message", "failed_action"): @@ -669,6 +688,129 @@ class ChatWindow: suffix = " — " + ", ".join(parts) if parts else "" self._add_lea_message(f"[{short}]{suffix}") + # ------------------------------------------------------------------ + # Bulle paused_need_help interactive (J3.5) + # ------------------------------------------------------------------ + + def _add_paused_bubble(self, payload: Dict[str, Any]) -> None: + """Ajouter une bulle paused interactive (asset démo : Léa demande de l'aide).""" + if self._root is None: + return + self._root.after(0, lambda: self._render_paused_bubble(payload)) + + def _render_paused_bubble(self, payload: Dict[str, Any]) -> None: + tk = self._tk + if getattr(self, "_msg_frame", None) is None: + return + + replay_id = str(payload.get("replay_id", "") or "") + workflow = payload.get("workflow", "?") + reason = payload.get("reason") or "Action incertaine — j'ai besoin de votre validation." + completed = payload.get("completed", 0) + total = payload.get("total", "?") + now = datetime.now().strftime("%H:%M") + + container = tk.Frame(self._msg_frame, bg=BG_COLOR) + container.pack(fill=tk.X, padx=MARGIN, pady=6) + + inner = tk.Frame( + container, bg=PAUSED_BG, padx=14, pady=12, + highlightbackground=PAUSED_BORDER, highlightthickness=2, + ) + inner.pack(anchor=tk.W, padx=(0, 50), fill=tk.X) + + tk.Label( + inner, text=f"⏸ Pause supervisée • {now}", + bg=PAUSED_BG, fg=PAUSED_FG, + font=("Segoe UI", 12, "bold"), anchor="w", + ).pack(fill=tk.X, anchor=tk.W) + + tk.Label( + inner, text=reason, bg=PAUSED_BG, fg=PAUSED_FG, + font=FONT_MSG, wraplength=MSG_WRAP_WIDTH - 30, + anchor="w", justify=tk.LEFT, + ).pack(fill=tk.X, anchor=tk.W, pady=(6, 0)) + + tk.Label( + inner, text=f"{workflow} — étape {completed}/{total}", + bg=PAUSED_BG, fg=TIMESTAMP_FG, font=FONT_TIMESTAMP, anchor="w", + ).pack(fill=tk.X, anchor=tk.W, pady=(4, 8)) + + btn_frame = tk.Frame(inner, bg=PAUSED_BG) + btn_frame.pack(fill=tk.X, anchor=tk.W) + + btn_resume = tk.Button( + btn_frame, text="Continuer", + bg=PAUSED_BTN_RESUME_BG, fg="white", font=FONT_QUICK_BTN, + padx=14, pady=4, bd=0, cursor="hand2", + activebackground=PAUSED_BTN_RESUME_HOVER, activeforeground="white", + command=lambda: self._on_paused_resume(replay_id), + ) + btn_resume.pack(side=tk.LEFT, padx=(0, 8)) + + btn_abort = tk.Button( + btn_frame, text="Annuler", + bg=PAUSED_BTN_ABORT_BG, fg="white", font=FONT_QUICK_BTN, + padx=14, pady=4, bd=0, cursor="hand2", + activebackground=PAUSED_BTN_ABORT_HOVER, activeforeground="white", + command=lambda: self._on_paused_abort(replay_id), + ) + btn_abort.pack(side=tk.LEFT) + + self._active_paused_bubble = { + "container": container, "inner": inner, + "btn_resume": btn_resume, "btn_abort": btn_abort, + "replay_id": replay_id, + } + + def _close_active_paused_bubble(self, reason: str) -> None: + if self._active_paused_bubble is None or self._root is None: + return + self._root.after(0, lambda: self._do_close_paused_bubble(reason)) + + def _do_close_paused_bubble(self, reason: str) -> None: + bubble = self._active_paused_bubble + if bubble is None: + return + try: + bubble["btn_resume"].config(state="disabled") + bubble["btn_abort"].config(state="disabled") + label_text = { + "lea:resumed": "→ Reprise", + "lea:done": "→ Terminé", + }.get(reason, f"→ {reason}") + self._tk.Label( + bubble["inner"], text=label_text, + bg=PAUSED_BG, fg=PAUSED_FG, font=FONT_TIMESTAMP, anchor="w", + ).pack(fill="x", anchor="w", pady=(6, 0)) + except Exception: + logger.debug("close paused bubble silenced", exc_info=True) + self._active_paused_bubble = None + + def _on_paused_resume(self, replay_id: str) -> None: + if not replay_id or self._bus is None or not self._bus.connected: + self._add_lea_message("⚠ Bus indisponible — impossible de relancer") + return + self._bus.resume_replay(replay_id) + if self._active_paused_bubble: + try: + self._active_paused_bubble["btn_resume"].config(state="disabled") + self._active_paused_bubble["btn_abort"].config(state="disabled") + except Exception: + pass + + def _on_paused_abort(self, replay_id: str) -> None: + if self._bus is None or not self._bus.connected: + self._add_lea_message("⚠ Bus indisponible — impossible d'annuler") + return + self._bus.abort_replay(replay_id) + if self._active_paused_bubble: + try: + self._active_paused_bubble["btn_resume"].config(state="disabled") + self._active_paused_bubble["btn_abort"].config(state="disabled") + except Exception: + pass + # ====================================================================== # Ajout de messages dans la zone de chat # ====================================================================== diff --git a/tests/integration/test_feedback_bus.py b/tests/integration/test_feedback_bus.py index c8ee5634c..a81867961 100644 --- a/tests/integration/test_feedback_bus.py +++ b/tests/integration/test_feedback_bus.py @@ -100,3 +100,65 @@ def test_emit_lea_silenced_on_socketio_error(app_on, monkeypatch): raise RuntimeError("socketio fail") monkeypatch.setattr(app_on.socketio, "emit", boom) app_on._emit_lea("paused", {"x": 1}) + + +# ---------------------------------------------------------------------- +# J3.5 — Handlers SocketIO depuis ChatWindow +# ---------------------------------------------------------------------- + +class _FakeResponse: + def __init__(self, ok=True, status_code=200, text=""): + self.ok = ok + self.status_code = status_code + self.text = text + + +def test_replay_resume_handler_relays_post_to_streaming(app_on, monkeypatch): + """Le handler 'lea:replay_resume' doit POSTer sur /replay/{id}/resume du streaming.""" + captured = {} + + def fake_post(url, headers=None, **kwargs): + captured["url"] = url + captured["headers"] = headers + return _FakeResponse(ok=True, status_code=200) + + monkeypatch.setattr(app_on.http_requests, "post", fake_post) + emit_calls = _capture_emits(monkeypatch, app_on) + + app_on.handle_lea_replay_resume({"replay_id": "rep_abc123"}) + + assert "rep_abc123" in captured["url"] + assert captured["url"].endswith("/api/v1/traces/stream/replay/rep_abc123/resume") + # Le bus doit propager un ack + acked = [c for c in emit_calls if c[0] == "lea:resume_acked"] + assert len(acked) == 1 + assert acked[0][1]["status"] == "ok" + + +def test_replay_resume_handler_emits_error_on_http_failure(app_on, monkeypatch): + monkeypatch.setattr( + app_on.http_requests, "post", + lambda *a, **k: _FakeResponse(ok=False, status_code=500, text="boom"), + ) + emit_calls = _capture_emits(monkeypatch, app_on) + app_on.handle_lea_replay_resume({"replay_id": "rep_x"}) + acked = [c for c in emit_calls if c[0] == "lea:resume_acked"] + assert acked[0][1]["status"] == "error" + assert acked[0][1]["http_status"] == 500 + + +def test_replay_resume_handler_emits_error_on_no_replay_id(app_on, monkeypatch): + emit_calls = _capture_emits(monkeypatch, app_on) + app_on.handle_lea_replay_resume({}) + acked = [c for c in emit_calls if c[0] == "lea:resume_acked"] + assert acked[0][1]["status"] == "error" + assert "replay_id manquant" in acked[0][1]["detail"] + + +def test_replay_abort_handler_stops_local_execution(app_on, monkeypatch): + app_on.execution_status["running"] = True + emit_calls = _capture_emits(monkeypatch, app_on) + app_on.handle_lea_replay_abort({"replay_id": "rep_y"}) + assert app_on.execution_status["running"] is False + acked = [c for c in emit_calls if c[0] == "lea:abort_acked"] + assert acked[0][1]["status"] == "ok" diff --git a/tests/integration/test_feedback_bus_client.py b/tests/integration/test_feedback_bus_client.py index f69815ed7..8c86190d8 100644 --- a/tests/integration/test_feedback_bus_client.py +++ b/tests/integration/test_feedback_bus_client.py @@ -123,3 +123,42 @@ def test_stop_silenced_on_disconnect_error(): with patch.object(bus._sio, 'disconnect', side_effect=RuntimeError("boom")): bus._sio.connected = True bus.stop() # ne doit pas raise + + +# ---------------------------------------------------------------------- +# J3.5 — Actions utilisateur (resume_replay / abort_replay) +# ---------------------------------------------------------------------- + +def test_resume_replay_emits_when_connected(): + bus = FeedbackBusClient("http://localhost:5004") + bus._sio.connected = True + with patch.object(bus._sio, 'emit') as mock_emit: + ok = bus.resume_replay("rep_abc") + assert ok is True + mock_emit.assert_called_once_with("lea:replay_resume", {"replay_id": "rep_abc"}) + + +def test_resume_replay_returns_false_when_disconnected(): + bus = FeedbackBusClient("http://localhost:5004") + # _sio.connected reste False par défaut + with patch.object(bus._sio, 'emit') as mock_emit: + ok = bus.resume_replay("rep_abc") + assert ok is False + mock_emit.assert_not_called() + + +def test_abort_replay_emits_when_connected(): + bus = FeedbackBusClient("http://localhost:5004") + bus._sio.connected = True + with patch.object(bus._sio, 'emit') as mock_emit: + ok = bus.abort_replay("rep_xyz") + assert ok is True + mock_emit.assert_called_once_with("lea:replay_abort", {"replay_id": "rep_xyz"}) + + +def test_safe_emit_silenced_on_error(): + bus = FeedbackBusClient("http://localhost:5004") + bus._sio.connected = True + with patch.object(bus._sio, 'emit', side_effect=RuntimeError("boom")): + ok = bus.resume_replay("rep_abc") + assert ok is False # erreur avalée silencieusement