Files
rpa_vision_v3/tests/visual/test_grounding_benchmark.py
Dom 99041f0117 feat: complete MACRO/MESO/MICRO pipeline (Critic, Observer, Policy, Recovery, Learning, Audit Trail, TaskPlanner)
Three-level architecture implemented and tested (137 unit tests + 21 visual tests):

MESO (intelligent actor):
- P0 Critic: post-action semantic verification via gemma4 (replay_verifier.py)
- P1 Observer: screen pre-analysis before each action (api_stream.py /pre_analyze)
- P2 Grounding/Policy: localization (grounding.py) separated from decision-making (policy.py)
- P3 Recovery: automatic rollback via Ctrl+Z/Escape/Alt+F4 (recovery.py)
- P4 Learning: runtime learning with a consolidation loop (replay_learner.py)

MACRO (planner):
- TaskPlanner: understands natural-language instructions via gemma4 (task_planner.py)
- TIM/CIM-10 business context for hospitals (domain_context.py)
- POST /api/v1/task endpoint for instruction-driven execution

Traceability:
- Full audit trail with 18 fields per action (audit_trail.py)
- GET /audit/history, /audit/summary, /audit/export (CSV) endpoints

Grounding:
- Fixed bbox_2d parsing for qwen2.5vl (pixels relative to the sent image, not a 1000x1000 grid)
- Visual benchmarks on real captures (3 approaches: baseline, zoom, Citrix)
- Reproducibility validated: variance < 0.008 across 10 iterations

Security:
- Production tokens removed from the source code → .env.local
- Random secret key when none is configured
- Removed log statements that leaked tokens

Results: 80% replay success (vs 12.5% before), 100% visual detection under Citrix JPEG Q20

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 21:03:25 +02:00

420 lines
16 KiB
Python

"""
Benchmark de grounding — 3 approches testées en boucle.
Compare la robustesse et la précision de :
1. Baseline : qwen2.5vl direct
2. Zoom progressif : 2 passes (full → crop → re-grounding)
3. OCR-first : docTR localise le texte, VLM seulement pour les icônes
Chaque approche est testée N fois sur les mêmes cibles.
Mesure : taux de détection, variance des coordonnées, temps moyen.
"""
import base64
import io
import json
import re
import sys
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import pytest
_ROOT = str(Path(__file__).resolve().parents[2])
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)

_SHOTS_DIR = Path(_ROOT) / "data/training/live_sessions/DESKTOP-ST3VBSD_windows/sess_20260404T135010_cec5c8/shots"

# Number of iterations per target
N_ITERATIONS = 5


def _load_screenshot(name: str) -> str:
    path = _SHOTS_DIR / name
    if not path.is_file():
        pytest.skip(f"Screenshot {name} not available")
    return base64.b64encode(path.read_bytes()).decode()


def _load_screenshot_pil(name: str):
    from PIL import Image
    path = _SHOTS_DIR / name
    if not path.is_file():
        pytest.skip(f"Screenshot {name} not available")
    return Image.open(path)

# =========================================================================
# Approach 1: Baseline qwen2.5vl direct
# =========================================================================

def _parse_bbox_2d(content: str) -> Optional[Tuple[int, int, int, int]]:
    """Parse bbox_2d coordinates from a qwen2.5vl response.

    qwen2.5vl returns JSON:
    ```json
    [{"bbox_2d": [x1, y1, x2, y2], "label": "..."}]
    ```
    The coordinates are in pixels relative to the image that was sent.
    """
    # Strategy 1: parse the full JSON (most reliable).
    # Strip markdown fences first.
    cleaned = re.sub(r'```(?:json)?\s*', '', content).strip()
    try:
        data = json.loads(cleaned)
        # Guard on dict type: a bare JSON list of numbers would otherwise
        # raise AttributeError, which the except clause below does not catch.
        if isinstance(data, list) and data and isinstance(data[0], dict):
            bbox = data[0].get("bbox_2d")
            if bbox and len(bbox) >= 4:
                return (int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3]))
        elif isinstance(data, dict):
            bbox = data.get("bbox_2d")
            if bbox and len(bbox) >= 4:
                return (int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3]))
    except (json.JSONDecodeError, ValueError, TypeError):
        pass
    # Strategy 2: targeted regex on "bbox_2d": [x1, y1, x2, y2]
    bbox_match = re.search(
        r'"bbox_2d"\s*:\s*\[\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\]',
        content,
    )
    if bbox_match:
        return tuple(int(bbox_match.group(i)) for i in range(1, 5))
    return None
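
# A minimal sanity check for the parser above, assuming a typical fenced
# response (the pixel values are made up):
#   _parse_bbox_2d('```json\n[{"bbox_2d": [100, 40, 180, 72], "label": "x"}]\n```')
#   -> (100, 40, 180, 72)
#   _parse_bbox_2d("no box here")  -> None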

def grounding_baseline(screenshot_b64: str, description: str, img_width: int = 1280, img_height: int = 800) -> Optional[Tuple[float, float]]:
    """Direct qwen2.5vl grounding: returns normalized (x_pct, y_pct).

    qwen2.5vl returns coordinates in pixels relative to the image that was
    sent, so we normalize by dividing by the image dimensions.
    """
    import requests
    try:
        resp = requests.post(
            "http://localhost:11434/api/chat",
            json={
                "model": "qwen2.5vl:7b",
                "messages": [{"role": "user", "content": f"Detect '{description}' with a bounding box.", "images": [screenshot_b64]}],
                "stream": False,
                "options": {"temperature": 0.0, "num_predict": 100},
            },
            timeout=30,
        )
        if not resp.ok:
            return None
        content = resp.json().get("message", {}).get("content", "")
        bbox = _parse_bbox_2d(content)
        if bbox:
            x1, y1, x2, y2 = bbox
            # Normalize by the image dimensions (pixels -> 0-1)
            cx = (x1 + x2) / 2 / img_width
            cy = (y1 + y2) / 2 / img_height
            if 0.0 <= cx <= 1.0 and 0.0 <= cy <= 1.0:
                return (cx, cy)
    except Exception:
        pass
    return None
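
# Worked example of the normalization above (made-up bbox): a model response
# of [600, 380, 680, 420] on a 1280x800 screenshot gives
#   cx = (600 + 680) / 2 / 1280 = 0.5
#   cy = (380 + 420) / 2 / 800  = 0.5
# i.e. the returned point is the bbox center as a fraction of the image size.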

# =========================================================================
# Approach 2: Progressive zoom (2 passes)
# =========================================================================

def grounding_zoom(screenshot_b64: str, description: str, img_width: int = 1280, img_height: int = 800) -> Optional[Tuple[float, float]]:
    """Progressive zoom: pass 1 (full image), then pass 2 (2x crop)."""
    from PIL import Image
    # Pass 1: grounding on the full image
    result1 = grounding_baseline(screenshot_b64, description, img_width, img_height)
    if result1 is None:
        return None
    x1_pct, y1_pct = result1
    # Pass 2: crop around the located area, then re-ground
    try:
        img_bytes = base64.b64decode(screenshot_b64)
        img = Image.open(io.BytesIO(img_bytes))
        w, h = img.size
        # 2x crop around the found point (25% of the image on each side)
        crop_size = 0.25
        cx_px = int(x1_pct * w)
        cy_px = int(y1_pct * h)
        x_left = max(0, cx_px - int(crop_size * w))
        y_top = max(0, cy_px - int(crop_size * h))
        x_right = min(w, cx_px + int(crop_size * w))
        y_bottom = min(h, cy_px + int(crop_size * h))
        cropped = img.crop((x_left, y_top, x_right, y_bottom))
        crop_w, crop_h = cropped.size
        # Encode the crop as base64 (force RGB: JPEG cannot encode alpha)
        buf = io.BytesIO()
        cropped.convert("RGB").save(buf, format="JPEG", quality=85)
        crop_b64 = base64.b64encode(buf.getvalue()).decode()
        # Pass 2: re-ground on the crop (using the crop's dimensions)
        result2 = grounding_baseline(crop_b64, description, crop_w, crop_h)
        if result2 is None:
            return result1  # Fall back to pass 1
        # Map the crop coordinates back to the original image
        x2_in_crop, y2_in_crop = result2
        x_final = (x_left + x2_in_crop * crop_w) / w
        y_final = (y_top + y2_in_crop * crop_h) / h
        return (x_final, y_final)
    except Exception:
        return result1  # Fallback
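
# Worked example of the crop remap above (made-up numbers): if pass 1 finds
# (0.5, 0.5) on a 1280x800 image, the crop box is (320, 200, 960, 600), i.e.
# a 640x400 crop. If pass 2 also returns (0.5, 0.5) inside that crop, the
# final point is ((320 + 0.5*640) / 1280, (200 + 0.5*400) / 800) = (0.5, 0.5):
# when the two passes agree, the coordinate is unchanged.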

# =========================================================================
# Approach 3: OCR-first (docTR)
# =========================================================================

def grounding_ocr_first(screenshot_b64: str, description: str) -> Optional[Tuple[float, float]]:
    """OCR-first: docTR localizes the text, the VLM handles icons."""
    try:
        from doctr.io import DocumentFile
        from doctr.models import ocr_predictor
        # Decode the image
        img_bytes = base64.b64decode(screenshot_b64)
        # OCR
        predictor = ocr_predictor(det_arch='db_resnet50', reco_arch='crnn_vgg16_bn', pretrained=True)
        doc = DocumentFile.from_images([img_bytes])
        result = predictor(doc)
        # Search for the target text in the OCR results
        target_lower = description.lower()
        best_match = None
        best_score = 0
        for page in result.pages:
            for block in page.blocks:
                for line_obj in block.lines:
                    for word in line_obj.words:
                        word_text = word.value.lower()
                        # Exact or partial match
                        if target_lower in word_text or word_text in target_lower:
                            score = len(word_text) / max(len(target_lower), 1)
                            if score > best_score:
                                # Normalized coordinates (docTR returns 0-1)
                                box = word.geometry  # ((x1, y1), (x2, y2))
                                cx = (box[0][0] + box[1][0]) / 2
                                cy = (box[0][1] + box[1][1]) / 2
                                best_match = (cx, cy)
                                best_score = score
        if best_match and best_score > 0.5:
            return best_match
    except ImportError:
        pass  # docTR not installed
    except Exception:
        pass
    # VLM fallback for elements without text (or when OCR found no confident match)
    return grounding_baseline(screenshot_b64, description)
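
# Note on the score above: it is the length ratio between the matched OCR word
# and the target string, so a single word covering more than half of the
# target passes the 0.5 threshold. E.g. (made-up values) target "Rechercher"
# matched by the OCR word "recherche" scores 9/10 = 0.9 and is accepted.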

# =========================================================================
# Benchmark framework
# =========================================================================

def run_benchmark(
    approach_fn,
    approach_name: str,
    screenshot_b64: str,
    description: str,
    n_iterations: int = N_ITERATIONS,
) -> Dict:
    """Run a benchmark: N iterations, measuring spread and timing."""
    results = []
    times = []
    for i in range(n_iterations):
        t_start = time.time()
        result = approach_fn(screenshot_b64, description)
        elapsed = time.time() - t_start
        times.append(elapsed)
        if result is not None:
            results.append(result)
    # Statistics
    n_found = len(results)
    detection_rate = n_found / n_iterations
    stats = {
        "approach": approach_name,
        "target": description,
        "iterations": n_iterations,
        "detection_rate": round(detection_rate, 2),
        "avg_time_ms": round(sum(times) / len(times) * 1000, 0),
    }
    if n_found >= 2:
        xs = [r[0] for r in results]
        ys = [r[1] for r in results]
        stats["x_mean"] = round(sum(xs) / len(xs), 4)
        stats["y_mean"] = round(sum(ys) / len(ys), 4)
        # "Variance" here is the peak-to-peak spread (max - min),
        # not the statistical variance.
        stats["x_variance"] = round(max(xs) - min(xs), 4)
        stats["y_variance"] = round(max(ys) - min(ys), 4)
        stats["stable"] = stats["x_variance"] < 0.05 and stats["y_variance"] < 0.05
    elif n_found == 1:
        stats["x_mean"] = round(results[0][0], 4)
        stats["y_mean"] = round(results[0][1], 4)
        stats["x_variance"] = 0
        stats["y_variance"] = 0
        stats["stable"] = True
    else:
        stats["stable"] = False
    return stats
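
# Minimal usage sketch (screenshot names come from the session shots listed in
# _TARGETS below; everything else is this module's own API):
#   b64 = _load_screenshot("shot_0004_full.png")
#   stats = run_benchmark(grounding_baseline, "baseline", b64, "Fichier")
#   print(stats["detection_rate"], stats.get("x_variance"))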

# =========================================================================
# Comparative benchmark tests
# =========================================================================

# Targets to test (screenshot, description, name).
# The descriptions are the literal (French) UI strings on the screenshots.
_TARGETS = [
    ("shot_0001_full.png", "Rechercher", "Rechercher taskbar"),
    ("shot_0001_full.png", "agent_v1", "agent_v1 folder"),
    ("shot_0004_full.png", "Fichier", "Fichier menu"),
    ("shot_0004_full.png", "Modifier", "Modifier menu"),
    ("shot_0004_full.png", "Ceci est un test.txt", "File tab"),
    ("shot_0014_full.png", "Rechercher sur Google ou saisir une URL", "Google search"),
    ("shot_0014_full.png", "Gmail", "Gmail link"),
]

@pytest.mark.visual
class TestBenchmarkBaseline:
    """Benchmark of the baseline approach (direct qwen2.5vl)."""

    @pytest.mark.parametrize("shot,desc,name", _TARGETS)
    def test_baseline_robustesse(self, shot, desc, name):
        screenshot = _load_screenshot(shot)
        stats = run_benchmark(grounding_baseline, "baseline", screenshot, desc, N_ITERATIONS)
        print(f"\n [{stats['approach']}] {name}:")
        print(f" Detection: {stats['detection_rate']*100:.0f}% ({int(stats['detection_rate']*N_ITERATIONS)}/{N_ITERATIONS})")
        print(f" Avg time: {stats['avg_time_ms']:.0f}ms")
        if stats.get("x_mean") is not None:
            print(f" Position: ({stats['x_mean']:.3f}, {stats['y_mean']:.3f})")
            print(f" Variance: X={stats['x_variance']:.4f} Y={stats['y_variance']:.4f}")
        print(f" Stable: {'YES' if stats['stable'] else 'NO'}")
        assert stats["detection_rate"] >= 0.6, f"{name}: detection rate too low ({stats['detection_rate']})"

@pytest.mark.visual
class TestBenchmarkZoom:
    """Benchmark of the progressive-zoom approach."""

    @pytest.mark.parametrize("shot,desc,name", _TARGETS)
    def test_zoom_robustesse(self, shot, desc, name):
        screenshot = _load_screenshot(shot)
        stats = run_benchmark(grounding_zoom, "zoom", screenshot, desc, N_ITERATIONS)
        print(f"\n [{stats['approach']}] {name}:")
        print(f" Detection: {stats['detection_rate']*100:.0f}% ({int(stats['detection_rate']*N_ITERATIONS)}/{N_ITERATIONS})")
        print(f" Avg time: {stats['avg_time_ms']:.0f}ms")
        if stats.get("x_mean") is not None:
            print(f" Position: ({stats['x_mean']:.3f}, {stats['y_mean']:.3f})")
            print(f" Variance: X={stats['x_variance']:.4f} Y={stats['y_variance']:.4f}")
        print(f" Stable: {'YES' if stats['stable'] else 'NO'}")
        assert stats["detection_rate"] >= 0.6, f"{name}: detection rate too low ({stats['detection_rate']})"

@pytest.mark.visual
class TestBenchmarkCitrix:
    """Baseline benchmark on degraded images (Citrix JPEG Q20 simulation)."""

    def _degrade_citrix(self, screenshot_b64: str) -> str:
        """Simulate Citrix compression (JPEG quality 20)."""
        from PIL import Image
        img_bytes = base64.b64decode(screenshot_b64)
        img = Image.open(io.BytesIO(img_bytes))
        buf = io.BytesIO()
        # Force RGB: JPEG cannot encode an alpha channel
        img.convert("RGB").save(buf, "JPEG", quality=20)
        return base64.b64encode(buf.getvalue()).decode()

    @pytest.mark.parametrize("shot,desc,name", _TARGETS)
    def test_citrix_robustesse(self, shot, desc, name):
        screenshot = _load_screenshot(shot)
        citrix = self._degrade_citrix(screenshot)
        stats = run_benchmark(grounding_baseline, "citrix_q20", citrix, desc, N_ITERATIONS)
        print(f"\n [{stats['approach']}] {name}:")
        print(f" Detection: {stats['detection_rate']*100:.0f}%")
        print(f" Avg time: {stats['avg_time_ms']:.0f}ms")
        if stats.get("x_mean") is not None:
            print(f" Position: ({stats['x_mean']:.3f}, {stats['y_mean']:.3f})")
            print(f" Variance: X={stats['x_variance']:.4f} Y={stats['y_variance']:.4f}")
        print(f" Stable: {'YES' if stats['stable'] else 'NO'}")
        # Citrix can be less reliable, so the threshold is lower
        assert stats["detection_rate"] >= 0.4, f"{name} Citrix: detection rate too low ({stats['detection_rate']})"

@pytest.mark.visual
class TestRapportComparatif:
    """Generates a comparative report for the 3 approaches."""

    def test_rapport_complet(self):
        """Run the 3 approaches on every target and compare."""
        from PIL import Image
        all_results = []
        for shot, desc, name in _TARGETS:
            screenshot = _load_screenshot(shot)
            # Citrix degradation (JPEG Q20; force RGB since JPEG has no alpha)
            img_bytes = base64.b64decode(screenshot)
            img = Image.open(io.BytesIO(img_bytes))
            buf = io.BytesIO()
            img.convert("RGB").save(buf, "JPEG", quality=20)
            citrix = base64.b64encode(buf.getvalue()).decode()
            for approach_fn, approach_name, img_b64 in [
                (grounding_baseline, "baseline", screenshot),
                (grounding_zoom, "zoom", screenshot),
                (grounding_baseline, "citrix_q20", citrix),
            ]:
                stats = run_benchmark(approach_fn, approach_name, img_b64, desc, 3)
                stats["target_name"] = name
                all_results.append(stats)
        # Report
        print("\n" + "=" * 80)
        print("COMPARATIVE REPORT - GROUNDING BENCHMARK")
        print("=" * 80)
        print(f"{'Target':<25s} {'Approach':<12s} {'Detect.':<8s} {'Time':<8s} {'Position':<20s} {'Var X':<8s} {'Var Y':<8s} {'Stable'}")
        print("-" * 80)
        for r in all_results:
            pos = f"({r.get('x_mean', 0):.3f}, {r.get('y_mean', 0):.3f})" if r.get('x_mean') is not None else "N/A"
            var_x = f"{r.get('x_variance', 0):.4f}" if r.get('x_variance') is not None else "N/A"
            var_y = f"{r.get('y_variance', 0):.4f}" if r.get('y_variance') is not None else "N/A"
            stable = "YES" if r.get('stable') else "NO"
            print(f"{r['target_name']:<25s} {r['approach']:<12s} {r['detection_rate']*100:5.0f}% {r['avg_time_ms']:5.0f}ms {pos:<20s} {var_x:<8s} {var_y:<8s} {stable}")
        print("=" * 80)