feat(validator): R1 MVP P0 — OcrRoiChecker + orchestrator (flag OFF default)

Package core/validation/ minimal :
- result.py : Verdict, FailureCategory, ValidationResult
- pixel_diff_checker.py : wrapper de ReplayVerifier.verify_action
- ocr_roi_checker.py : ROI 80px autour du clic, détecte WRONG_APPLICATION
  via SUSPECT_TOKENS (edge/https/explorateur de fichiers/…)
- orchestrator.py : Validator dispatch action_type → checkers + agrégation

Wiring api_stream.py:3646 derrière RPA_VALIDATOR_V2_ENABLED (OFF par défaut).
Si verdict ≠ COMPLETE, override report.success=False et expose failure_category
dans result_entry. Zero régression flag OFF.

Tests :
- tests/unit/test_validator_v2.py : 13 tests (Checkers + Validator + sérialisation)
- tests/integration/test_validator_step10.py : 2 tests reproduisant le bug
  replay_sess_4c38dbb8 / act_raw_6c1432b3 (clic Enregistrer fait basculer
  vers Explorateur de fichiers) — Validator retourne WRONG_APPLICATION

Activation pour test live : RPA_VALIDATOR_V2_ENABLED=true

Cf. docs/recherche/SPEC_VALIDATOR_MATRICE.md, AXE_B2_DEEP_VALIDATOR.md.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-05-24 17:52:06 +02:00
parent bd100bc538
commit 1b4e64960b
8 changed files with 851 additions and 0 deletions

View File

@@ -0,0 +1,31 @@
"""core.validation — Validator V2 (MVP P0).
Pattern Planner-Actor-Validator (cf. SPEC_VALIDATOR_MATRICE.md).
Donne un verdict structuré (Verdict / FailureCategory) sur l'effet d'une action
en agrégeant plusieurs Checkers spécialisés.
Périmètre P0 :
- PixelDiffChecker (wrapper ReplayVerifier existant)
- OcrRoiChecker (ROI 80px autour du clic, détecte WRONG_APPLICATION = bug step 10)
- Validator orchestrateur (dispatch action_type → checkers + agrégation conf)
Flag d'activation : variable d'env RPA_VALIDATOR_V2_ENABLED=true (OFF par défaut).
"""
from core.validation.result import (
FailureCategory,
ValidationResult,
Verdict,
)
from core.validation.pixel_diff_checker import PixelDiffChecker
from core.validation.ocr_roi_checker import OcrRoiChecker
from core.validation.orchestrator import Validator
__all__ = [
"Validator",
"Verdict",
"FailureCategory",
"ValidationResult",
"PixelDiffChecker",
"OcrRoiChecker",
]

View File

@@ -0,0 +1,171 @@
"""OcrRoiChecker — ROI 80px (ou 120 px pour type) autour du clic.
Détecte WRONG_APPLICATION (bug step 10) si un token suspect navigateur/système
apparaît dans la ROI alors qu'on attendait un label métier.
"""
from __future__ import annotations
import time
import unicodedata
from typing import Any, Callable, Dict, Optional
from core.validation.result import FailureCategory, ValidationResult, Verdict
def _strip_accents(s: str) -> str:
return "".join(
c for c in unicodedata.normalize("NFKD", s) if not unicodedata.combining(c)
).lower().strip()
class OcrRoiChecker:
name = "ocr_roi"
budget_ms = 200.0
SUSPECT_TOKENS = (
"edge", "chrome", "firefox", "mozilla", "opera",
"http", "https", "www.",
".com", ".fr", ".org", ".net", ".html",
"favoris", "favorite", "bookmark",
"barre d'adresse", "address bar",
"nouvel onglet", "new tab",
"securite windows", "windows security",
"user account control", "controle de compte",
"explorateur de fichiers", "file explorer",
)
def __init__(
self,
ocr_fn: Optional[Callable] = None,
radius_px: int = 80,
suspect_min_confidence: float = 0.85,
expected_min_confidence: float = 0.90,
):
self._ocr = ocr_fn # callable(PIL.Image) -> str ; lazy via TitleVerifier si None
self._radius = radius_px
self._suspect_conf = suspect_min_confidence
self._expected_conf = expected_min_confidence
def _ensure_ocr(self) -> Optional[Callable]:
if self._ocr is not None:
return self._ocr
try:
from core.grounding.title_verifier import TitleVerifier
tv = TitleVerifier()
self._ocr = tv._get_ocr()
except Exception:
self._ocr = None
return self._ocr
def check(
self,
action: Dict[str, Any],
result: Dict[str, Any],
screenshot_before: Optional[str],
screenshot_after: Optional[str],
context: Dict[str, Any],
) -> ValidationResult:
t0 = time.time()
target_spec = action.get("target_spec") or {}
expected_text = (
action.get("by_text")
or target_spec.get("by_text")
or context.get("expected_text")
or ""
)
actual_pos = result.get("actual_position") or {}
x_pct = actual_pos.get("x_pct") or action.get("x_pct") or target_spec.get("x_pct")
y_pct = actual_pos.get("y_pct") or action.get("y_pct") or target_spec.get("y_pct")
if not screenshot_after or x_pct is None or y_pct is None or not expected_text:
return ValidationResult(
verdict=Verdict.CONTINUE, confidence=0.2,
check_used=self.name, elapsed_ms=(time.time() - t0) * 1000,
reasoning="ROI indéfinie (coords ou expected_text manquants)",
)
try:
from agent_v0.server_v1.replay_verifier import ReplayVerifier
img = ReplayVerifier()._load_single_image(screenshot_after)
except Exception as exc:
return ValidationResult(
verdict=Verdict.CONTINUE, confidence=0.1,
check_used=self.name, elapsed_ms=(time.time() - t0) * 1000,
reasoning=f"Chargement image impossible: {exc}",
)
w, h = img.size
cx, cy = int(float(x_pct) * w), int(float(y_pct) * h)
r = self._radius
bbox = (max(0, cx - r), max(0, cy - r), min(w, cx + r), min(h, cy + r))
roi = img.crop(bbox)
ocr_fn = self._ensure_ocr()
if ocr_fn is None:
return ValidationResult(
verdict=Verdict.CONTINUE, confidence=0.1,
check_used=self.name, elapsed_ms=(time.time() - t0) * 1000,
reasoning="OCR indisponible (EasyOCR/docTR non chargés)",
)
try:
raw_text = ocr_fn(roi) or ""
except Exception as exc:
return ValidationResult(
verdict=Verdict.CONTINUE, confidence=0.1,
check_used=self.name, elapsed_ms=(time.time() - t0) * 1000,
reasoning=f"OCR erreur: {exc}",
)
text_norm = _strip_accents(raw_text)
expected_norm = _strip_accents(expected_text)
elapsed_ms = (time.time() - t0) * 1000
evidence = {
"roi_text": raw_text[:200],
"roi_bbox": list(bbox),
"expected": expected_text,
}
# Priorité absolue : token suspect → WRONG_APPLICATION (bug step 10 / dialog perdu)
for suspect in self.SUSPECT_TOKENS:
if suspect in text_norm and suspect not in expected_norm:
return ValidationResult(
verdict=Verdict.TERMINATE, confidence=self._suspect_conf,
check_used=self.name, elapsed_ms=elapsed_ms,
failure_category=FailureCategory.WRONG_APPLICATION,
reasoning=(
f"Token suspect '{suspect}' dans ROI clic "
f"(attendu '{expected_text[:40]}') — cible hors-app"
),
raw_evidence=evidence,
)
# Match exact normalisé
if expected_norm and expected_norm in text_norm:
return ValidationResult(
verdict=Verdict.COMPLETE, confidence=self._expected_conf,
check_used=self.name, elapsed_ms=elapsed_ms,
reasoning=f"Texte '{expected_text[:40]}' trouvé dans ROI",
raw_evidence=evidence,
)
# Match partiel mot-à-mot
toks = [t for t in expected_norm.split() if len(t) > 2]
if toks:
hits = sum(1 for tok in toks if tok in text_norm)
ratio = hits / len(toks)
if ratio >= 0.5:
return ValidationResult(
verdict=Verdict.COMPLETE, confidence=0.6 + 0.3 * ratio,
check_used=self.name, elapsed_ms=elapsed_ms,
reasoning=f"Match partiel {hits}/{len(toks)} tokens",
raw_evidence=evidence,
)
return ValidationResult(
verdict=Verdict.CONTINUE, confidence=0.4,
check_used=self.name, elapsed_ms=elapsed_ms,
failure_category=FailureCategory.OCR_TEXT_MISSING,
reasoning=f"Texte '{expected_text[:40]}' non trouvé dans ROI",
raw_evidence=evidence,
)

View File

@@ -0,0 +1,79 @@
"""Validator orchestrator — dispatch action_type → checkers + agrégation.
Règles d'agrégation (cf. SPEC_VALIDATOR_MATRICE.md §6.2) :
- Si un checker rend TERMINATE conf ≥ 0.85 → return immédiat
- Si un checker rend COMPLETE conf ≥ accept_confidence → return (max conf)
- Sinon → dernier résultat (CONTINUE), à charge du caller d'escalader/retrier
"""
from __future__ import annotations
import logging
from typing import Any, Dict, List, Optional
from core.validation.result import ValidationResult, Verdict
logger = logging.getLogger(__name__)
class Validator:
def __init__(
self,
checkers: Dict[str, List[Any]],
default_checkers: Optional[List[Any]] = None,
accept_confidence: float = 0.70,
terminate_confidence: float = 0.85,
):
self._checkers = checkers
self._default = default_checkers or []
self._accept = accept_confidence
self._terminate_conf = terminate_confidence
def validate(
self,
action: Dict[str, Any],
result: Dict[str, Any],
screenshot_before: Optional[str] = None,
screenshot_after: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
) -> ValidationResult:
ctx = context or {}
action_type = action.get("type", "")
candidates = self._checkers.get(action_type) or self._default
results: List[ValidationResult] = []
for checker in candidates:
try:
res = checker.check(
action, result, screenshot_before, screenshot_after, ctx
)
except Exception as exc:
logger.warning(
"[VALIDATOR] checker %s a planté: %s",
getattr(checker, "name", checker), exc,
)
continue
results.append(res)
logger.info(
"[VALIDATOR] check=%s verdict=%s conf=%.2f elapsed=%.0fms",
res.check_used, res.verdict.value, res.confidence, res.elapsed_ms,
)
# Règle 1 — TERMINATE haute conf : court-circuit
if res.verdict == Verdict.TERMINATE and res.confidence >= self._terminate_conf:
return res
# Règle 2 — COMPLETE haute conf : court-circuit
if res.verdict == Verdict.COMPLETE and res.confidence >= self._accept:
return res
# Aucun checker concluant : agrégation finale
if results:
# Préférer un COMPLETE si présent, sinon le plus confiant
completes = [r for r in results if r.verdict == Verdict.COMPLETE]
if completes:
return max(completes, key=lambda r: r.confidence)
return max(results, key=lambda r: r.confidence)
return ValidationResult(
verdict=Verdict.CONTINUE, confidence=0.3,
check_used="no_checker", elapsed_ms=0.0,
reasoning=f"Aucun checker pour action_type='{action_type}'",
)

View File

@@ -0,0 +1,68 @@
"""PixelDiffChecker — wrapper de ReplayVerifier.verify_action (~15 ms).
Pré-filtre rapide : si l'écran n'a pas du tout changé, l'action a probablement
échoué. Réutilise l'instance _replay_verifier globale d'api_stream.
"""
from __future__ import annotations
import time
from typing import Any, Dict, Optional
from core.validation.result import FailureCategory, ValidationResult, Verdict
class PixelDiffChecker:
name = "pixel_diff"
budget_ms = 15.0
def __init__(self, replay_verifier):
self._rv = replay_verifier
def check(
self,
action: Dict[str, Any],
result: Dict[str, Any],
screenshot_before: Optional[str],
screenshot_after: Optional[str],
context: Dict[str, Any],
) -> ValidationResult:
t0 = time.time()
try:
pr = self._rv.verify_action(
action=action,
result=result,
screenshot_before=screenshot_before,
screenshot_after=screenshot_after,
)
except Exception as exc:
return ValidationResult(
verdict=Verdict.CONTINUE,
confidence=0.1,
check_used=self.name,
elapsed_ms=(time.time() - t0) * 1000,
reasoning=f"PixelDiff erreur: {exc}",
)
elapsed = (time.time() - t0) * 1000
# Map verdict ReplayVerifier → Verdict Validator
if pr.suggestion == "continue" and pr.changes_detected:
verdict, conf, fc = Verdict.COMPLETE, pr.confidence, None
elif pr.suggestion == "retry":
verdict = Verdict.CONTINUE
conf = max(0.4, pr.confidence - 0.2)
fc = FailureCategory.NO_VISUAL_CHANGE
else:
verdict, conf, fc = Verdict.CONTINUE, 0.3, None
return ValidationResult(
verdict=verdict,
confidence=conf,
check_used=self.name,
elapsed_ms=elapsed,
reasoning=pr.detail,
failure_category=fc,
raw_evidence={
"change_area_pct": pr.change_area_pct,
"local_change_pct": pr.local_change_pct,
},
)

53
core/validation/result.py Normal file
View File

@@ -0,0 +1,53 @@
"""Dataclasses du Validator — Verdict, FailureCategory, ValidationResult.
Cf. SPEC_VALIDATOR_MATRICE.md §1 et AXE_B2_DEEP_VALIDATOR.md §3.1.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional
class Verdict(str, Enum):
"""Trois verdicts possibles (calque Skyvern complete/terminate/continue)."""
COMPLETE = "complete" # l'action a eu l'effet voulu
CONTINUE = "continue" # effet pas encore visible → recheck/wait
TERMINATE = "terminate" # échec irrécupérable → pause supervisée
class FailureCategory(str, Enum):
"""Classification des échecs (restreinte au contexte rpa_vision_v3)."""
WRONG_TARGET = "wrong_target"
WRONG_APPLICATION = "wrong_application" # bug step 10 (clic hors-app)
NO_VISUAL_CHANGE = "no_visual_change"
UNEXPECTED_DIALOG = "unexpected_dialog"
OCR_TEXT_MISSING = "ocr_text_missing"
SCHEMA_INVALID = "schema_invalid"
UI_LOADING = "ui_loading"
UNKNOWN = "unknown"
@dataclass
class ValidationResult:
"""Résultat d'un check. Toujours sérialisable JSON."""
verdict: Verdict
confidence: float
check_used: str
elapsed_ms: float
reasoning: str = ""
failure_category: Optional[FailureCategory] = None
raw_evidence: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
"verdict": self.verdict.value,
"confidence": round(self.confidence, 3),
"check_used": self.check_used,
"elapsed_ms": round(self.elapsed_ms, 1),
"reasoning": self.reasoning,
"failure_category": (
self.failure_category.value if self.failure_category else None
),
"raw_evidence": self.raw_evidence,
}