feat(qw2): LoopDetector composite (screen_static + action_repeat + retry)
Module isolé, 3 signaux indépendants : - screen_static : CLIP similarity > 0.99 sur N captures consécutives - action_repeat : N actions identiques (type+coords) - retry_threshold : retried_actions >= seuil Premier signal positif → LoopVerdict.detected=True (caller responsable de la bascule en paused_need_help). Configurable env vars : RPA_LOOP_DETECTOR_ENABLED (kill-switch), RPA_LOOP_SCREEN_STATIC_N/THRESHOLD, RPA_LOOP_ACTION_REPEAT_N, RPA_LOOP_RETRY_THRESHOLD. Tests : 8 cas (chaque signal isolé, kill-switch, embedder absent, exception). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
154
agent_v0/server_v1/loop_detector.py
Normal file
154
agent_v0/server_v1/loop_detector.py
Normal file
@@ -0,0 +1,154 @@
|
||||
# agent_v0/server_v1/loop_detector.py
|
||||
"""LoopDetector composite — détection de stagnation de Léa pendant un replay (QW2).
|
||||
|
||||
Trois signaux indépendants :
|
||||
- screen_static : N captures consécutives avec CLIP similarity > seuil
|
||||
- action_repeat : N actions consécutives identiques (type + coords)
|
||||
- retry_threshold : nombre de retries cumulés >= seuil
|
||||
|
||||
Un seul signal positif → verdict.detected=True. Le serveur bascule alors le
|
||||
replay en paused_need_help avec pause_reason explicite.
|
||||
|
||||
Désactivable via env var RPA_LOOP_DETECTOR_ENABLED=0.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LoopVerdict:
|
||||
detected: bool = False
|
||||
reason: str = ""
|
||||
signal: str = "" # "screen_static" | "action_repeat" | "retry_threshold" | ""
|
||||
evidence: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
def _env_int(name: str, default: int) -> int:
|
||||
try:
|
||||
return int(os.environ.get(name, default))
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
|
||||
|
||||
def _env_float(name: str, default: float) -> float:
|
||||
try:
|
||||
return float(os.environ.get(name, default))
|
||||
except (TypeError, ValueError):
|
||||
return default
|
||||
|
||||
|
||||
def _env_bool_enabled(name: str) -> bool:
|
||||
val = os.environ.get(name, "1").strip().lower()
|
||||
return val not in ("0", "false", "no", "off", "")
|
||||
|
||||
|
||||
def _cosine_similarity(a, b) -> float:
|
||||
"""Similarité cosine entre deux vecteurs (listes ou np.array). Robuste vecteur nul."""
|
||||
import numpy as np
|
||||
av = np.asarray(a, dtype=np.float32).flatten()
|
||||
bv = np.asarray(b, dtype=np.float32).flatten()
|
||||
na, nb = float(np.linalg.norm(av)), float(np.linalg.norm(bv))
|
||||
if na < 1e-8 or nb < 1e-8:
|
||||
return 0.0
|
||||
return float(np.dot(av, bv) / (na * nb))
|
||||
|
||||
|
||||
class LoopDetector:
|
||||
def __init__(self, clip_embedder=None):
|
||||
self.clip_embedder = clip_embedder
|
||||
|
||||
def evaluate(
|
||||
self,
|
||||
state: Dict[str, Any],
|
||||
screenshots: List[Any],
|
||||
actions: List[Dict[str, Any]],
|
||||
) -> LoopVerdict:
|
||||
"""Évalue les 3 signaux. Retourne le premier déclenché.
|
||||
|
||||
Args:
|
||||
state: replay_state (utilisé pour retried_actions)
|
||||
screenshots: anneau d'embeddings CLIP (les N derniers)
|
||||
actions: anneau des N dernières actions exécutées
|
||||
"""
|
||||
if not _env_bool_enabled("RPA_LOOP_DETECTOR_ENABLED"):
|
||||
return LoopVerdict(detected=False)
|
||||
|
||||
# Signal A : screen_static
|
||||
verdict = self._check_screen_static(screenshots)
|
||||
if verdict.detected:
|
||||
return verdict
|
||||
|
||||
# Signal B : action_repeat
|
||||
verdict = self._check_action_repeat(actions)
|
||||
if verdict.detected:
|
||||
return verdict
|
||||
|
||||
# Signal C : retry_threshold
|
||||
verdict = self._check_retry_threshold(state)
|
||||
if verdict.detected:
|
||||
return verdict
|
||||
|
||||
return LoopVerdict(detected=False)
|
||||
|
||||
def _check_screen_static(self, screenshots: List[Any]) -> LoopVerdict:
|
||||
n_required = _env_int("RPA_LOOP_SCREEN_STATIC_N", 4)
|
||||
threshold = _env_float("RPA_LOOP_SCREEN_STATIC_THRESHOLD", 0.99)
|
||||
|
||||
if self.clip_embedder is None or len(screenshots) < n_required:
|
||||
return LoopVerdict()
|
||||
|
||||
try:
|
||||
recent = screenshots[-n_required:]
|
||||
# Embed chaque capture via le CLIP embedder (peut lever)
|
||||
embeddings = [self.clip_embedder.embed_image(img) for img in recent]
|
||||
sims = [_cosine_similarity(embeddings[i], embeddings[i + 1])
|
||||
for i in range(len(embeddings) - 1)]
|
||||
min_sim = min(sims)
|
||||
if min_sim > threshold:
|
||||
return LoopVerdict(
|
||||
detected=True,
|
||||
reason="loop_detected",
|
||||
signal="screen_static",
|
||||
evidence={"min_similarity": round(min_sim, 4),
|
||||
"n_captures": n_required,
|
||||
"threshold": threshold},
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning("LoopDetector signal_A erreur (%s) — signal inerte ce tick", e)
|
||||
return LoopVerdict()
|
||||
|
||||
def _check_action_repeat(self, actions: List[Dict[str, Any]]) -> LoopVerdict:
|
||||
n_required = _env_int("RPA_LOOP_ACTION_REPEAT_N", 3)
|
||||
if len(actions) < n_required:
|
||||
return LoopVerdict()
|
||||
recent = actions[-n_required:]
|
||||
|
||||
def _signature(a: Dict[str, Any]) -> tuple:
|
||||
return (a.get("type"), a.get("x_pct"), a.get("y_pct"))
|
||||
|
||||
sigs = [_signature(a) for a in recent]
|
||||
if all(s == sigs[0] for s in sigs):
|
||||
return LoopVerdict(
|
||||
detected=True,
|
||||
reason="loop_detected",
|
||||
signal="action_repeat",
|
||||
evidence={"signature": sigs[0], "count": n_required},
|
||||
)
|
||||
return LoopVerdict()
|
||||
|
||||
def _check_retry_threshold(self, state: Dict[str, Any]) -> LoopVerdict:
|
||||
threshold = _env_int("RPA_LOOP_RETRY_THRESHOLD", 3)
|
||||
retried = int(state.get("retried_actions", 0))
|
||||
if retried >= threshold:
|
||||
return LoopVerdict(
|
||||
detected=True,
|
||||
reason="loop_detected",
|
||||
signal="retry_threshold",
|
||||
evidence={"retried_actions": retried, "threshold": threshold},
|
||||
)
|
||||
return LoopVerdict()
|
||||
96
tests/unit/test_loop_detector.py
Normal file
96
tests/unit/test_loop_detector.py
Normal file
@@ -0,0 +1,96 @@
|
||||
# tests/unit/test_loop_detector.py
|
||||
"""Tests unitaires pour LoopDetector composite (QW2)."""
|
||||
import os
|
||||
import pytest
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from agent_v0.server_v1.loop_detector import LoopDetector, LoopVerdict
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def detector():
|
||||
"""LoopDetector avec embedder mocké (signal A toujours dispo)."""
|
||||
embedder = MagicMock()
|
||||
# Par défaut : 4 embeddings tous identiques → similarity 1.0
|
||||
embedder.embed_image.return_value = [1.0, 0.0, 0.0]
|
||||
return LoopDetector(clip_embedder=embedder)
|
||||
|
||||
|
||||
def _state(retried=0, n_screenshots=0, n_actions=0):
|
||||
return {
|
||||
"retried_actions": retried,
|
||||
"_screenshot_history": [[1.0, 0.0, 0.0]] * n_screenshots,
|
||||
"_action_history": [{"type": "click", "x_pct": 0.5, "y_pct": 0.5}] * n_actions,
|
||||
}
|
||||
|
||||
|
||||
def test_screen_static_triggers_when_n_identical_embeddings(detector):
|
||||
"""Signal A : 4 captures identiques (similarity > 0.99) → detected."""
|
||||
state = _state(n_screenshots=4)
|
||||
verdict = detector.evaluate(state, screenshots=state["_screenshot_history"], actions=[])
|
||||
assert verdict.detected is True
|
||||
assert verdict.signal == "screen_static"
|
||||
|
||||
|
||||
def test_screen_static_skipped_when_history_too_short(detector):
|
||||
"""Signal A : moins de N captures → pas de détection."""
|
||||
state = _state(n_screenshots=2)
|
||||
verdict = detector.evaluate(state, screenshots=state["_screenshot_history"], actions=[])
|
||||
# Si seul A pourrait déclencher mais skip, et B/C pas remplis : detected=False
|
||||
assert verdict.detected is False
|
||||
|
||||
|
||||
def test_action_repeat_triggers_when_n_identical_actions(detector):
|
||||
"""Signal B : 3 actions consécutives identiques → detected."""
|
||||
state = _state(n_actions=3)
|
||||
verdict = detector.evaluate(state, screenshots=[], actions=state["_action_history"])
|
||||
assert verdict.detected is True
|
||||
assert verdict.signal == "action_repeat"
|
||||
|
||||
|
||||
def test_action_repeat_skipped_when_actions_differ(detector):
|
||||
"""Signal B : actions différentes → pas de détection."""
|
||||
actions = [
|
||||
{"type": "click", "x_pct": 0.1, "y_pct": 0.1},
|
||||
{"type": "click", "x_pct": 0.2, "y_pct": 0.2},
|
||||
{"type": "click", "x_pct": 0.3, "y_pct": 0.3},
|
||||
]
|
||||
verdict = detector.evaluate(_state(), screenshots=[], actions=actions)
|
||||
assert verdict.detected is False
|
||||
|
||||
|
||||
def test_retry_threshold_triggers_at_3(detector):
|
||||
"""Signal C : retried_actions >= 3 → detected."""
|
||||
state = _state(retried=3)
|
||||
verdict = detector.evaluate(state, screenshots=[], actions=[])
|
||||
assert verdict.detected is True
|
||||
assert verdict.signal == "retry_threshold"
|
||||
|
||||
|
||||
def test_kill_switch_disables_all_signals(monkeypatch, detector):
|
||||
"""Si RPA_LOOP_DETECTOR_ENABLED=0 → toujours detected=False."""
|
||||
monkeypatch.setenv("RPA_LOOP_DETECTOR_ENABLED", "0")
|
||||
state = _state(retried=10, n_screenshots=10, n_actions=10)
|
||||
verdict = detector.evaluate(state, screenshots=state["_screenshot_history"],
|
||||
actions=state["_action_history"])
|
||||
assert verdict.detected is False
|
||||
|
||||
|
||||
def test_embedder_unavailable_skips_signal_A_continues_others():
|
||||
"""Si CLIP embedder None → signal A skip, B et C continuent."""
|
||||
detector = LoopDetector(clip_embedder=None)
|
||||
# Trigger signal C
|
||||
state = _state(retried=3)
|
||||
verdict = detector.evaluate(state, screenshots=[], actions=[])
|
||||
assert verdict.detected is True
|
||||
assert verdict.signal == "retry_threshold"
|
||||
|
||||
|
||||
def test_embedder_exception_does_not_crash(detector):
|
||||
"""Si embed_image lève une exception → log + verdict detected=False."""
|
||||
detector.clip_embedder.embed_image.side_effect = RuntimeError("CUDA OOM")
|
||||
state = _state(n_screenshots=4)
|
||||
# Ne doit PAS lever : signal A devient inerte
|
||||
verdict = detector.evaluate(state, screenshots=state["_screenshot_history"], actions=[])
|
||||
# Signal A inerte, B/C pas remplis → detected False
|
||||
assert verdict.detected is False
|
||||
Reference in New Issue
Block a user