From 2a51a844b9ccb6365f0e6510359299ce23b658f9 Mon Sep 17 00:00:00 2001
From: Dom
Date: Tue, 5 May 2026 23:09:43 +0200
Subject: [PATCH] feat(qw2): LoopDetector composite (screen_static + action_repeat + retry)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Module isolé, 3 signaux indépendants :
- screen_static : CLIP similarity > 0.99 sur N captures consécutives
- action_repeat : N actions identiques (type+coords)
- retry_threshold : retried_actions >= seuil

Premier signal positif → LoopVerdict.detected=True (caller responsable de
la bascule en paused_need_help).

Configurable env vars : RPA_LOOP_DETECTOR_ENABLED (kill-switch),
RPA_LOOP_SCREEN_STATIC_N/THRESHOLD, RPA_LOOP_ACTION_REPEAT_N,
RPA_LOOP_RETRY_THRESHOLD.

Tests : 11 cas (chaque signal isolé, kill-switch, embedder absent,
exception, fenêtre N < 2, compteur None).

Co-Authored-By: Claude Opus 4.7 (1M context)
---
 agent_v0/server_v1/loop_detector.py | 200 ++++++++++++++++++++++++++++
 tests/unit/test_loop_detector.py    | 118 ++++++++++++++++
 2 files changed, 318 insertions(+)
 create mode 100644 agent_v0/server_v1/loop_detector.py
 create mode 100644 tests/unit/test_loop_detector.py

diff --git a/agent_v0/server_v1/loop_detector.py b/agent_v0/server_v1/loop_detector.py
new file mode 100644
index 000000000..949ef5303
--- /dev/null
+++ b/agent_v0/server_v1/loop_detector.py
@@ -0,0 +1,200 @@
+# agent_v0/server_v1/loop_detector.py
+"""Composite LoopDetector: stagnation detection for Léa during a replay (QW2).
+
+Three independent signals:
+- screen_static   : N consecutive captures with CLIP similarity > threshold
+- action_repeat   : N consecutive identical actions (type + coords)
+- retry_threshold : cumulative number of retries >= threshold
+
+A single positive signal yields verdict.detected=True. The server then
+switches the replay to paused_need_help with an explicit pause_reason.
+
+Can be disabled with the env var RPA_LOOP_DETECTOR_ENABLED=0.
+"""
+
+import logging
+import os
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional
+
+logger = logging.getLogger(__name__)
+
+# "N consecutive" comparisons need a window of at least two items; a smaller
+# configured N makes the corresponding signal inert instead of misfiring.
+_MIN_WINDOW = 2
+
+
+@dataclass
+class LoopVerdict:
+    """Outcome of one LoopDetector evaluation."""
+
+    detected: bool = False
+    reason: str = ""
+    signal: str = ""  # "screen_static" | "action_repeat" | "retry_threshold" | ""
+    evidence: Dict[str, Any] = field(default_factory=dict)
+
+
+def _env_int(name: str, default: int) -> int:
+    """Return env var *name* as an int, or *default* if unset or invalid."""
+    try:
+        return int(os.environ.get(name, default))
+    except (TypeError, ValueError):
+        return default
+
+
+def _env_float(name: str, default: float) -> float:
+    """Return env var *name* as a float, or *default* if unset or invalid."""
+    try:
+        return float(os.environ.get(name, default))
+    except (TypeError, ValueError):
+        return default
+
+
+def _env_bool_enabled(name: str) -> bool:
+    """Return False only if *name* is set to 0/false/no/off or to an empty value."""
+    val = os.environ.get(name, "1").strip().lower()
+    return val not in ("0", "false", "no", "off", "")
+
+
+def _cosine_similarity(a, b) -> float:
+    """Cosine similarity of two vectors (lists or np.array); 0.0 for a null vector."""
+    import numpy as np
+    av = np.asarray(a, dtype=np.float32).flatten()
+    bv = np.asarray(b, dtype=np.float32).flatten()
+    na, nb = float(np.linalg.norm(av)), float(np.linalg.norm(bv))
+    if na < 1e-8 or nb < 1e-8:
+        return 0.0
+    return float(np.dot(av, bv) / (na * nb))
+
+
+class LoopDetector:
+    """Evaluates the three stagnation signals; the first positive one wins.
+
+    Tunables are read from the environment on every call.
+    """
+
+    def __init__(self, clip_embedder=None):
+        # Optional: when None, the screen_static signal is skipped.
+        self.clip_embedder = clip_embedder
+
+    def evaluate(
+        self,
+        state: Dict[str, Any],
+        screenshots: List[Any],
+        actions: List[Dict[str, Any]],
+    ) -> LoopVerdict:
+        """Evaluate the three signals and return the first one that fires.
+
+        Args:
+            state: replay state (only ``retried_actions`` is read)
+            screenshots: recent captures, most recent last; each one is
+                passed to ``clip_embedder.embed_image``
+            actions: recently executed actions, most recent last
+
+        Returns:
+            The verdict of the first triggered signal, or an empty
+            LoopVerdict when nothing fired or the detector is disabled.
+        """
+        if not _env_bool_enabled("RPA_LOOP_DETECTOR_ENABLED"):
+            return LoopVerdict(detected=False)
+
+        # Signal A: screen_static
+        verdict = self._check_screen_static(screenshots)
+        if verdict.detected:
+            return verdict
+
+        # Signal B: action_repeat
+        verdict = self._check_action_repeat(actions)
+        if verdict.detected:
+            return verdict
+
+        # Signal C: retry_threshold
+        verdict = self._check_retry_threshold(state)
+        if verdict.detected:
+            return verdict
+
+        return LoopVerdict(detected=False)
+
+    def _check_screen_static(self, screenshots: List[Any]) -> LoopVerdict:
+        """Signal A: every pair of neighbours in the last N captures is similar.
+
+        Inert when no embedder is set, when N < 2, when fewer than N captures
+        are available, or when embedding/comparison raises.
+        """
+        n_required = _env_int("RPA_LOOP_SCREEN_STATIC_N", 4)
+        threshold = _env_float("RPA_LOOP_SCREEN_STATIC_THRESHOLD", 0.99)
+
+        if self.clip_embedder is None or n_required < _MIN_WINDOW:
+            return LoopVerdict()
+        if len(screenshots) < n_required:
+            return LoopVerdict()
+
+        try:
+            recent = screenshots[-n_required:]
+            # Embed each capture through the CLIP embedder (may raise).
+            embeddings = [self.clip_embedder.embed_image(img) for img in recent]
+            min_sim = min(
+                _cosine_similarity(prev, cur)
+                for prev, cur in zip(embeddings, embeddings[1:])
+            )
+            if min_sim > threshold:
+                return LoopVerdict(
+                    detected=True,
+                    reason="loop_detected",
+                    signal="screen_static",
+                    evidence={"min_similarity": round(min_sim, 4),
+                              "n_captures": n_required,
+                              "threshold": threshold},
+                )
+        except Exception as e:
+            # Best effort: a failing embedder must not break the caller.
+            logger.warning("LoopDetector signal_A erreur (%s) — signal inerte ce tick", e)
+        return LoopVerdict()
+
+    def _check_action_repeat(self, actions: List[Dict[str, Any]]) -> LoopVerdict:
+        """Signal B: the last N actions share the same (type, x_pct, y_pct).
+
+        Inert when N < 2 or when fewer than N actions are available.
+        """
+        n_required = _env_int("RPA_LOOP_ACTION_REPEAT_N", 3)
+        if n_required < _MIN_WINDOW or len(actions) < n_required:
+            return LoopVerdict()
+
+        # NOTE(review): actions without coordinates all reduce to
+        # (type, None, None), so same-type actions that differ only in other
+        # fields compare equal here -- confirm this is intended.
+        sigs = [
+            (a.get("type"), a.get("x_pct"), a.get("y_pct"))
+            for a in actions[-n_required:]
+        ]
+        first = sigs[0]
+        if all(sig == first for sig in sigs):
+            return LoopVerdict(
+                detected=True,
+                reason="loop_detected",
+                signal="action_repeat",
+                evidence={"signature": first, "count": n_required},
+            )
+        return LoopVerdict()
+
+    def _check_retry_threshold(self, state: Dict[str, Any]) -> LoopVerdict:
+        """Signal C: ``state["retried_actions"]`` has reached the threshold.
+
+        Inert when the threshold is < 1. A missing, None or non-numeric
+        counter is treated as 0.
+        """
+        threshold = _env_int("RPA_LOOP_RETRY_THRESHOLD", 3)
+        if threshold < 1:
+            return LoopVerdict()
+        try:
+            retried = int(state.get("retried_actions") or 0)
+        except (TypeError, ValueError):
+            retried = 0
+        if retried >= threshold:
+            return LoopVerdict(
+                detected=True,
+                reason="loop_detected",
+                signal="retry_threshold",
+                evidence={"retried_actions": retried, "threshold": threshold},
+            )
+        return LoopVerdict()
diff --git a/tests/unit/test_loop_detector.py b/tests/unit/test_loop_detector.py
new file mode 100644
index 000000000..1405d2f10
--- /dev/null
+++ b/tests/unit/test_loop_detector.py
@@ -0,0 +1,118 @@
+# tests/unit/test_loop_detector.py
+"""Unit tests for the composite LoopDetector (QW2)."""
+import os
+import pytest
+from unittest.mock import MagicMock
+
+from agent_v0.server_v1.loop_detector import LoopDetector, LoopVerdict
+
+
+@pytest.fixture
+def detector():
+    """LoopDetector with a mocked embedder (signal A always available)."""
+    embedder = MagicMock()
+    # Default: every embedding is identical → similarity 1.0
+    embedder.embed_image.return_value = [1.0, 0.0, 0.0]
+    return LoopDetector(clip_embedder=embedder)
+
+
+def _state(retried=0, n_screenshots=0, n_actions=0):
+    return {
+        "retried_actions": retried,
+        "_screenshot_history": [[1.0, 0.0, 0.0]] * n_screenshots,
+        "_action_history": [{"type": "click", "x_pct": 0.5, "y_pct": 0.5}] * n_actions,
+    }
+
+
+def test_screen_static_triggers_when_n_identical_embeddings(detector):
+    """Signal A: 4 identical captures (similarity > 0.99) → detected."""
+    state = _state(n_screenshots=4)
+    verdict = detector.evaluate(state, screenshots=state["_screenshot_history"], actions=[])
+    assert verdict.detected is True
+    assert verdict.signal == "screen_static"
+
+
+def test_screen_static_skipped_when_history_too_short(detector):
+    """Signal A: fewer than N captures → no detection."""
+    state = _state(n_screenshots=2)
+    verdict = detector.evaluate(state, screenshots=state["_screenshot_history"], actions=[])
+    # A is skipped and neither B nor C is satisfied: detected=False
+    assert verdict.detected is False
+
+
+def test_action_repeat_triggers_when_n_identical_actions(detector):
+    """Signal B: 3 consecutive identical actions → detected."""
+    state = _state(n_actions=3)
+    verdict = detector.evaluate(state, screenshots=[], actions=state["_action_history"])
+    assert verdict.detected is True
+    assert verdict.signal == "action_repeat"
+
+
+def test_action_repeat_skipped_when_actions_differ(detector):
+    """Signal B: differing actions → no detection."""
+    actions = [
+        {"type": "click", "x_pct": 0.1, "y_pct": 0.1},
+        {"type": "click", "x_pct": 0.2, "y_pct": 0.2},
+        {"type": "click", "x_pct": 0.3, "y_pct": 0.3},
+    ]
+    verdict = detector.evaluate(_state(), screenshots=[], actions=actions)
+    assert verdict.detected is False
+
+
+def test_retry_threshold_triggers_at_3(detector):
+    """Signal C: retried_actions >= 3 → detected."""
+    state = _state(retried=3)
+    verdict = detector.evaluate(state, screenshots=[], actions=[])
+    assert verdict.detected is True
+    assert verdict.signal == "retry_threshold"
+
+
+def test_kill_switch_disables_all_signals(monkeypatch, detector):
+    """RPA_LOOP_DETECTOR_ENABLED=0 → always detected=False."""
+    monkeypatch.setenv("RPA_LOOP_DETECTOR_ENABLED", "0")
+    state = _state(retried=10, n_screenshots=10, n_actions=10)
+    verdict = detector.evaluate(state, screenshots=state["_screenshot_history"],
+                                actions=state["_action_history"])
+    assert verdict.detected is False
+
+
+def test_embedder_unavailable_skips_signal_A_continues_others():
+    """CLIP embedder is None → signal A is skipped, B and C still run."""
+    detector = LoopDetector(clip_embedder=None)
+    # Trigger signal C
+    state = _state(retried=3)
+    verdict = detector.evaluate(state, screenshots=[], actions=[])
+    assert verdict.detected is True
+    assert verdict.signal == "retry_threshold"
+
+
+def test_embedder_exception_does_not_crash(detector):
+    """embed_image raises → logged, verdict detected=False."""
+    detector.clip_embedder.embed_image.side_effect = RuntimeError("CUDA OOM")
+    state = _state(n_screenshots=4)
+    # Must NOT raise: signal A becomes inert
+    verdict = detector.evaluate(state, screenshots=state["_screenshot_history"], actions=[])
+    # Signal A inert, B/C not satisfied → detected False
+    assert verdict.detected is False
+
+
+def test_screen_static_window_below_two_is_inert(monkeypatch, detector):
+    """Signal A: N < 2 → inert, the embedder is not even called."""
+    monkeypatch.setenv("RPA_LOOP_SCREEN_STATIC_N", "1")
+    state = _state(n_screenshots=4)
+    verdict = detector.evaluate(state, screenshots=state["_screenshot_history"], actions=[])
+    assert verdict.detected is False
+    detector.clip_embedder.embed_image.assert_not_called()
+
+
+def test_action_repeat_window_below_two_is_inert(monkeypatch, detector):
+    """Signal B: N < 2 → inert, even with an empty action history."""
+    monkeypatch.setenv("RPA_LOOP_ACTION_REPEAT_N", "0")
+    verdict = detector.evaluate(_state(), screenshots=[], actions=[])
+    assert verdict.detected is False
+
+
+def test_retry_counter_none_is_treated_as_zero(detector):
+    """Signal C: retried_actions=None → counted as 0, no crash."""
+    verdict = detector.evaluate({"retried_actions": None}, screenshots=[], actions=[])
+    assert verdict.detected is False