From ca0b436a619d07d3e99be737b6ef5b4adf012942 Mon Sep 17 00:00:00 2001
From: Dom
Date: Tue, 5 May 2026 23:25:04 +0200
Subject: [PATCH] feat(qw2): hook LoopDetector dans api_stream + extension replay_state
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

replay_state enrichi de _screenshot_history (5 dernières images PIL) et
_action_history (5 dernières signatures action).

report_action_result :
- met à jour les deux anneaux après chaque action
- évalue le LoopDetector (singleton lazy avec _clip_embedder serveur)
- si detected → bascule paused_need_help avec pause_reason="loop_detected"
  et bus event lea:loop_detected (signal + evidence)

Tous les chemins d'erreur (embedder absent, OOM, exception) loggent et
laissent le replay continuer — aucun blocage par la couche détection.

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 agent_v0/server_v1/api_stream.py              | 84 +++++++++++++++++++++
 agent_v0/server_v1/replay_engine.py           |  3 +
 .../integration/test_loop_detector_replay.py  | 61 +++++++++++++++
 3 files changed, 148 insertions(+)
 create mode 100644 tests/integration/test_loop_detector_replay.py

diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py
index 32dba9fbf..68a14233b 100644
--- a/agent_v0/server_v1/api_stream.py
+++ b/agent_v0/server_v1/api_stream.py
@@ -3942,6 +3942,90 @@ async def report_action_result(report: ReplayResultReport):
                 f"— worker VLM autorisé à reprendre"
             )
 
+    # ===================================================================
+    # QW2 — LoopDetector: feed the history rings, then evaluate
+    # ===================================================================
+    # Only evaluate while the replay is still "running" — there is no point
+    # pausing something that is already completed/error/paused.
+    if replay_state["status"] == "running":
+        # Push the latest screenshot (as a PIL image) into the ring.
+        try:
+            from PIL import Image
+            ss_raw = screenshot_after or replay_state.get("last_screenshot")
+            img = None
+            if isinstance(ss_raw, str) and ss_raw:
+                if os.path.isfile(ss_raw):
+                    # copy() keeps an in-memory image; the context manager
+                    # closes the underlying file deterministically.
+                    with Image.open(ss_raw) as _src:
+                        img = _src.copy()
+                else:
+                    # Not an existing path: try it as base64 image data.
+                    try:
+                        import base64
+                        import io as _io
+                        img_bytes = base64.b64decode(ss_raw, validate=False)
+                        img = Image.open(_io.BytesIO(img_bytes)).copy()
+                    except Exception:
+                        img = None
+            if img is not None:
+                replay_state.setdefault("_screenshot_history", []).append(img)
+                replay_state["_screenshot_history"] = replay_state["_screenshot_history"][-5:]
+        except Exception as e:
+            logger.debug("LoopDetector: snapshot historique échoué: %s", e)
+
+        # Record the signature of the current action.
+        try:
+            _orig = original_action or {}
+            _act_pos = report.actual_position
+            if not isinstance(_act_pos, dict):
+                _act_pos = {}
+            action_sig = {
+                "type": _orig.get("type")
+                        or replay_state.get("_last_action_type", ""),
+            }
+            # Prefer the reported position; fall back to the planned one when
+            # the agent reported none, so distinct targets keep distinct
+            # signatures instead of all collapsing to None.
+            for _axis in ("x_pct", "y_pct"):
+                _val = _act_pos.get(_axis)
+                action_sig[_axis] = _orig.get(_axis) if _val is None else _val
+            replay_state.setdefault("_action_history", []).append(action_sig)
+            replay_state["_action_history"] = replay_state["_action_history"][-5:]
+        except Exception as e:
+            logger.debug("LoopDetector: snapshot action_sig échoué: %s", e)
+
+        # Evaluate (stays silent when nothing is detected).
+        try:
+            verdict = _get_loop_detector().evaluate(
+                replay_state,
+                screenshots=replay_state.get("_screenshot_history", []),
+                actions=replay_state.get("_action_history", []),
+            )
+            if verdict.detected:
+                replay_state["status"] = "paused_need_help"
+                replay_state["pause_reason"] = "loop_detected"
+                replay_state["pause_message"] = (
+                    f"Léa semble bloquée — {verdict.signal} "
+                    f"(détail: {verdict.evidence})"
+                )
+                logger.warning(
+                    "LoopDetector: replay %s mis en pause — signal=%s evidence=%s",
+                    replay_state["replay_id"], verdict.signal, verdict.evidence,
+                )
+                # Observability bus event (logger pattern from QW1).
+                try:
+                    logger.info(
+                        "[BUS] lea:loop_detected replay=%s signal=%s evidence=%s",
+                        replay_state["replay_id"],
+                        verdict.signal,
+                        verdict.evidence,
+                    )
+                except Exception as _e_bus:
+                    logger.debug("emit lea:loop_detected échec: %s", _e_bus)
+        except Exception as e:
+            logger.warning("LoopDetector: évaluation échouée (non bloquant): %s", e)
+
     return {
         "status": "recorded",
         "action_id": action_id,
diff --git a/agent_v0/server_v1/replay_engine.py b/agent_v0/server_v1/replay_engine.py
index fc9078f85..37eb755f6 100644
--- a/agent_v0/server_v1/replay_engine.py
+++ b/agent_v0/server_v1/replay_engine.py
@@ -1381,6 +1381,9 @@ def _create_replay_state(
         # t2a_decision, etc.). Résolues via templating {{var}} ou {{var.field}}
         # dans les paramètres des actions suivantes.
         "variables": {},
+        # QW2 — History rings for LoopDetector (last 5 entries at most)
+        "_screenshot_history": [],  # PIL images appended by report_action_result after each action
+        "_action_history": [],  # signatures of the last executed actions
     }
 
 
diff --git a/tests/integration/test_loop_detector_replay.py b/tests/integration/test_loop_detector_replay.py
new file mode 100644
index 000000000..02974b77d
--- /dev/null
+++ b/tests/integration/test_loop_detector_replay.py
@@ -0,0 +1,61 @@
+# tests/integration/test_loop_detector_replay.py
+"""Integration tests: a simulated replay that loops switches to paused_need_help."""
+import pytest
+from unittest.mock import MagicMock
+
+from agent_v0.server_v1.loop_detector import LoopDetector
+
+
+def test_replay_state_transitions_to_paused_on_screen_static():
+    """Case: 4 screenshots with identical embeddings → replay goes to paused_need_help."""
+    embedder = MagicMock()
+    embedder.embed_image.return_value = [1.0, 0.0, 0.0]  # constant
+    detector = LoopDetector(clip_embedder=embedder)
+
+    state = {
+        "replay_id": "r_test",
+        "status": "running",
+        "retried_actions": 0,
+        "_screenshot_history": ["img1", "img2", "img3", "img4"],  # 4 dummy images
+        "_action_history": [
+            {"type": "click", "x_pct": 0.1, "y_pct": 0.1},
+            {"type": "type", "x_pct": 0.2, "y_pct": 0.2},
+        ],
+    }
+    verdict = detector.evaluate(state, state["_screenshot_history"], state["_action_history"])
+
+    # Simulate what api_stream does once it has a verdict
+    if verdict.detected:
+        state["status"] = "paused_need_help"
+        state["pause_reason"] = verdict.reason  # NOTE(review): api_stream sets the literal "loop_detected" — confirm verdict.reason matches
+        state["pause_message"] = f"signal={verdict.signal}"
+
+    assert state["status"] == "paused_need_help"
+    assert state["pause_reason"] == "loop_detected"
+    assert "screen_static" in state["pause_message"]
+
+
+def test_replay_state_transitions_on_action_repeat():
+    """Case: 3 identical actions → verdict detected with signal action_repeat."""
+    detector = LoopDetector(clip_embedder=None)
+    actions = [{"type": "click", "x_pct": 0.5, "y_pct": 0.5}] * 3
+    state = {"replay_id": "r2", "status": "running", "retried_actions": 0,
+             "_screenshot_history": [], "_action_history": actions}
+
+    verdict = detector.evaluate(state, [], actions)
+    assert verdict.detected and verdict.signal == "action_repeat"
+
+
+def test_kill_switch_keeps_replay_running(monkeypatch):
+    """With RPA_LOOP_DETECTOR_ENABLED=0 nothing is detected, even on looping input."""
+    monkeypatch.setenv("RPA_LOOP_DETECTOR_ENABLED", "0")
+    embedder = MagicMock()
+    embedder.embed_image.return_value = [1.0, 0.0, 0.0]
+    detector = LoopDetector(clip_embedder=embedder)
+
+    state = {"retried_actions": 10,
+             "_screenshot_history": ["img1"] * 10,
+             "_action_history": [{"type": "click", "x_pct": 0.5, "y_pct": 0.5}] * 10}
+
+    verdict = detector.evaluate(state, state["_screenshot_history"], state["_action_history"])
+    assert verdict.detected is False