""" Benchmark de grounding — 3 approches testées en boucle. Compare la robustesse et la précision de : 1. Baseline : qwen2.5vl direct 2. Zoom progressif : 2 passes (full → crop → re-grounding) 3. OCR-first : docTR localise le texte, VLM seulement pour les icônes Chaque approche est testée N fois sur les mêmes cibles. Mesure : taux de détection, variance des coordonnées, temps moyen. """ import base64 import io import json import os import re import sys import time from pathlib import Path from typing import Dict, List, Optional, Tuple import pytest _ROOT = str(Path(__file__).resolve().parents[2]) if _ROOT not in sys.path: sys.path.insert(0, _ROOT) _SHOTS_DIR = Path(_ROOT) / "data/training/live_sessions/DESKTOP-ST3VBSD_windows/sess_20260404T135010_cec5c8/shots" # Nombre d'itérations par test N_ITERATIONS = 5 def _load_screenshot(name: str) -> str: path = _SHOTS_DIR / name if not path.is_file(): pytest.skip(f"Screenshot {name} non disponible") return base64.b64encode(path.read_bytes()).decode() def _load_screenshot_pil(name: str): from PIL import Image path = _SHOTS_DIR / name if not path.is_file(): pytest.skip(f"Screenshot {name} non disponible") return Image.open(path) # ========================================================================= # Approche 1 : Baseline qwen2.5vl direct # ========================================================================= def _parse_bbox_2d(content: str) -> Optional[Tuple[int, int, int, int]]: """Parser les coordonnées bbox_2d depuis une réponse qwen2.5vl. qwen2.5vl retourne du JSON : ```json [{"bbox_2d": [x1, y1, x2, y2], "label": "..."}] ``` Les coordonnées sont en pixels relatifs à l'image envoyée. """ # Stratégie 1 : parser le JSON complet (le plus fiable) # Nettoyer les fences markdown cleaned = re.sub(r'```(?:json)?\s*', '', content).strip() try: data = json.loads(cleaned) if isinstance(data, list) and len(data) > 0: bbox = data[0].get("bbox_2d") if bbox and len(bbox) >= 4: return (int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])) elif isinstance(data, dict): bbox = data.get("bbox_2d") if bbox and len(bbox) >= 4: return (int(bbox[0]), int(bbox[1]), int(bbox[2]), int(bbox[3])) except (json.JSONDecodeError, ValueError, TypeError): pass # Stratégie 2 : regex ciblé sur "bbox_2d": [x1, y1, x2, y2] bbox_match = re.search( r'"bbox_2d"\s*:\s*\[\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\]', content, ) if bbox_match: return tuple(int(bbox_match.group(i)) for i in range(1, 5)) return None def grounding_baseline(screenshot_b64: str, description: str, img_width: int = 1280, img_height: int = 800) -> Optional[Tuple[float, float]]: """Grounding qwen2.5vl direct — retourne (x_pct, y_pct) normalisées. qwen2.5vl retourne des coordonnées en pixels relatifs à l'image envoyée. On normalise en divisant par les dimensions de l'image. 
""" import requests try: resp = requests.post( "http://localhost:11434/api/chat", json={ "model": "qwen2.5vl:7b", "messages": [{"role": "user", "content": f"Detect '{description}' with a bounding box.", "images": [screenshot_b64]}], "stream": False, "options": {"temperature": 0.0, "num_predict": 100}, }, timeout=30, ) if not resp.ok: return None content = resp.json().get("message", {}).get("content", "") bbox = _parse_bbox_2d(content) if bbox: x1, y1, x2, y2 = bbox # Normaliser par les dimensions de l'image (pixels → 0-1) cx = (x1 + x2) / 2 / img_width cy = (y1 + y2) / 2 / img_height if 0.0 <= cx <= 1.0 and 0.0 <= cy <= 1.0: return (cx, cy) except Exception: pass return None # ========================================================================= # Approche 2 : Zoom progressif (2 passes) # ========================================================================= def grounding_zoom(screenshot_b64: str, description: str, img_width: int = 1280, img_height: int = 800) -> Optional[Tuple[float, float]]: """Zoom progressif — passe 1 (full) puis passe 2 (crop 2x).""" import requests from PIL import Image # Passe 1 : grounding sur l'image complète result1 = grounding_baseline(screenshot_b64, description, img_width, img_height) if result1 is None: return None x1_pct, y1_pct = result1 # Passe 2 : crop autour de la zone trouvée, re-grounding try: img_bytes = base64.b64decode(screenshot_b64) img = Image.open(io.BytesIO(img_bytes)) w, h = img.size # Crop 2x autour du point trouvé (25% de l'image de chaque côté) crop_size = 0.25 cx_px = int(x1_pct * w) cy_px = int(y1_pct * h) x_left = max(0, cx_px - int(crop_size * w)) y_top = max(0, cy_px - int(crop_size * h)) x_right = min(w, cx_px + int(crop_size * w)) y_bottom = min(h, cy_px + int(crop_size * h)) cropped = img.crop((x_left, y_top, x_right, y_bottom)) crop_w, crop_h = cropped.size # Encoder le crop en base64 buf = io.BytesIO() cropped.save(buf, format="JPEG", quality=85) crop_b64 = base64.b64encode(buf.getvalue()).decode() # Passe 2 : re-grounding sur le crop (dimensions du crop) result2 = grounding_baseline(crop_b64, description, crop_w, crop_h) if result2 is None: return result1 # Fallback sur passe 1 # Reconvertir les coordonnées du crop vers l'image originale x2_in_crop, y2_in_crop = result2 x_final = (x_left + x2_in_crop * crop_w) / w y_final = (y_top + y2_in_crop * crop_h) / h return (x_final, y_final) except Exception: return result1 # Fallback # ========================================================================= # Approche 3 : OCR-first (docTR) # ========================================================================= def grounding_ocr_first(screenshot_b64: str, description: str) -> Optional[Tuple[float, float]]: """OCR-first — docTR localise le texte, VLM pour les icônes.""" try: from doctr.io import DocumentFile from doctr.models import ocr_predictor # Décoder l'image img_bytes = base64.b64decode(screenshot_b64) # OCR predictor = ocr_predictor(det_arch='db_resnet50', reco_arch='crnn_vgg16_bn', pretrained=True) doc = DocumentFile.from_images([img_bytes]) result = predictor(doc) # Chercher le texte dans les résultats OCR target_lower = description.lower() best_match = None best_score = 0 for page in result.pages: for block in page.blocks: for line_obj in block.lines: for word in line_obj.words: word_text = word.value.lower() # Match exact ou partiel if target_lower in word_text or word_text in target_lower: score = len(word_text) / max(len(target_lower), 1) if score > best_score: # Coordonnées normalisées (docTR retourne 0-1) box = 

# =========================================================================
# Approach 3: OCR-first (docTR)
# =========================================================================

_OCR_PREDICTOR = None  # Lazily-built docTR predictor, cached across calls


def grounding_ocr_first(screenshot_b64: str, description: str) -> Optional[Tuple[float, float]]:
    """OCR-first: docTR locates the text, the VLM handles icons."""
    global _OCR_PREDICTOR
    try:
        from doctr.io import DocumentFile
        from doctr.models import ocr_predictor

        # Decode the image
        img_bytes = base64.b64decode(screenshot_b64)

        # OCR (cache the predictor so repeated iterations measure inference,
        # not model loading)
        if _OCR_PREDICTOR is None:
            _OCR_PREDICTOR = ocr_predictor(det_arch='db_resnet50',
                                           reco_arch='crnn_vgg16_bn',
                                           pretrained=True)
        doc = DocumentFile.from_images([img_bytes])
        result = _OCR_PREDICTOR(doc)

        # Look for the target text in the OCR results
        target_lower = description.lower()
        best_match = None
        best_score = 0
        for page in result.pages:
            for block in page.blocks:
                for line_obj in block.lines:
                    for word in line_obj.words:
                        word_text = word.value.lower()
                        # Exact or partial match
                        if target_lower in word_text or word_text in target_lower:
                            score = len(word_text) / max(len(target_lower), 1)
                            if score > best_score:
                                # Normalized coordinates (docTR returns 0-1)
                                box = word.geometry  # ((x1, y1), (x2, y2))
                                cx = (box[0][0] + box[1][0]) / 2
                                cy = (box[0][1] + box[1][1]) / 2
                                best_match = (cx, cy)
                                best_score = score

        if best_match and best_score > 0.5:
            return best_match
    except ImportError:
        pass  # docTR not available
    except Exception:
        pass

    # VLM fallback for elements without text
    return grounding_baseline(screenshot_b64, description)


# =========================================================================
# Benchmark framework
# =========================================================================

def run_benchmark(
    approach_fn,
    approach_name: str,
    screenshot_b64: str,
    description: str,
    n_iterations: int = N_ITERATIONS,
) -> Dict:
    """Run a benchmark: N iterations, measuring spread and timing."""
    results = []
    times = []

    for _ in range(n_iterations):
        t_start = time.time()
        result = approach_fn(screenshot_b64, description)
        elapsed = time.time() - t_start
        times.append(elapsed)
        if result is not None:
            results.append(result)

    # Statistics
    n_found = len(results)
    detection_rate = n_found / n_iterations

    stats = {
        "approach": approach_name,
        "target": description,
        "iterations": n_iterations,
        "detection_rate": round(detection_rate, 2),
        "avg_time_ms": round(sum(times) / len(times) * 1000, 0),
    }

    if n_found >= 2:
        xs = [r[0] for r in results]
        ys = [r[1] for r in results]
        stats["x_mean"] = round(sum(xs) / len(xs), 4)
        stats["y_mean"] = round(sum(ys) / len(ys), 4)
        # "Variance" here is the coordinate spread (max - min), a simple
        # stability proxy rather than the statistical variance.
        stats["x_variance"] = round(max(xs) - min(xs), 4)
        stats["y_variance"] = round(max(ys) - min(ys), 4)
        stats["stable"] = stats["x_variance"] < 0.05 and stats["y_variance"] < 0.05
    elif n_found == 1:
        stats["x_mean"] = round(results[0][0], 4)
        stats["y_mean"] = round(results[0][1], 4)
        stats["x_variance"] = 0
        stats["y_variance"] = 0
        stats["stable"] = True
    else:
        stats["stable"] = False

    return stats
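
# Added example: an offline smoke test for run_benchmark using a stub
# grounding function (the stub and its fixed point are illustrative). It
# exercises the aggregation logic without any model or screenshot.
def test_run_benchmark_with_stub():
    def stub(_b64, _desc):
        return (0.25, 0.75)

    stats = run_benchmark(stub, "stub", "", "dummy target", n_iterations=3)
    assert stats["detection_rate"] == 1.0
    assert stats["x_variance"] == 0 and stats["y_variance"] == 0
    assert stats["stable"] is True
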

# =========================================================================
# Comparative benchmark tests
# =========================================================================

# Targets to test (screenshot, description, name). The description strings
# stay in French: they must match the on-screen text of the screenshots.
_TARGETS = [
    ("shot_0001_full.png", "Rechercher", "Rechercher taskbar"),
    ("shot_0001_full.png", "agent_v1", "Dossier agent_v1"),
    ("shot_0004_full.png", "Fichier", "Menu Fichier"),
    ("shot_0004_full.png", "Modifier", "Menu Modifier"),
    ("shot_0004_full.png", "Ceci est un test.txt", "Onglet fichier"),
    ("shot_0014_full.png", "Rechercher sur Google ou saisir une URL", "Recherche Google"),
    ("shot_0014_full.png", "Gmail", "Lien Gmail"),
]


@pytest.mark.visual
class TestBenchmarkBaseline:
    """Benchmark of the baseline approach (qwen2.5vl direct)."""

    @pytest.mark.parametrize("shot,desc,name", _TARGETS)
    def test_baseline_robustesse(self, shot, desc, name):
        screenshot = _load_screenshot(shot)
        stats = run_benchmark(grounding_baseline, "baseline", screenshot, desc, N_ITERATIONS)

        print(f"\n  [{stats['approach']}] {name}:")
        print(f"    Detection: {stats['detection_rate']*100:.0f}% ({int(stats['detection_rate']*N_ITERATIONS)}/{N_ITERATIONS})")
        print(f"    Avg time: {stats['avg_time_ms']:.0f}ms")
        if stats.get("x_mean") is not None:
            print(f"    Position: ({stats['x_mean']:.3f}, {stats['y_mean']:.3f})")
            print(f"    Spread: X={stats['x_variance']:.4f} Y={stats['y_variance']:.4f}")
        print(f"    Stable: {'YES' if stats['stable'] else 'NO'}")

        assert stats["detection_rate"] >= 0.6, f"{name}: detection rate too low ({stats['detection_rate']})"


@pytest.mark.visual
class TestBenchmarkZoom:
    """Benchmark of the progressive zoom approach."""

    @pytest.mark.parametrize("shot,desc,name", _TARGETS)
    def test_zoom_robustesse(self, shot, desc, name):
        screenshot = _load_screenshot(shot)
        stats = run_benchmark(grounding_zoom, "zoom", screenshot, desc, N_ITERATIONS)

        print(f"\n  [{stats['approach']}] {name}:")
        print(f"    Detection: {stats['detection_rate']*100:.0f}% ({int(stats['detection_rate']*N_ITERATIONS)}/{N_ITERATIONS})")
        print(f"    Avg time: {stats['avg_time_ms']:.0f}ms")
        if stats.get("x_mean") is not None:
            print(f"    Position: ({stats['x_mean']:.3f}, {stats['y_mean']:.3f})")
            print(f"    Spread: X={stats['x_variance']:.4f} Y={stats['y_variance']:.4f}")
        print(f"    Stable: {'YES' if stats['stable'] else 'NO'}")

        assert stats["detection_rate"] >= 0.6, f"{name}: detection rate too low ({stats['detection_rate']})"


@pytest.mark.visual
class TestBenchmarkCitrix:
    """Baseline benchmark on degraded images (simulated Citrix, JPEG Q20)."""

    def _degrade_citrix(self, screenshot_b64: str) -> str:
        """Simulate Citrix compression (JPEG quality 20)."""
        from PIL import Image
        img_bytes = base64.b64decode(screenshot_b64)
        img = Image.open(io.BytesIO(img_bytes))
        buf = io.BytesIO()
        # Convert to RGB first: JPEG cannot encode RGBA PNG screenshots
        img.convert("RGB").save(buf, "JPEG", quality=20)
        return base64.b64encode(buf.getvalue()).decode()

    @pytest.mark.parametrize("shot,desc,name", _TARGETS)
    def test_citrix_robustesse(self, shot, desc, name):
        screenshot = _load_screenshot(shot)
        citrix = self._degrade_citrix(screenshot)
        stats = run_benchmark(grounding_baseline, "citrix_q20", citrix, desc, N_ITERATIONS)

        print(f"\n  [{stats['approach']}] {name}:")
        print(f"    Detection: {stats['detection_rate']*100:.0f}%")
        print(f"    Avg time: {stats['avg_time_ms']:.0f}ms")
        if stats.get("x_mean") is not None:
            print(f"    Position: ({stats['x_mean']:.3f}, {stats['y_mean']:.3f})")
            print(f"    Spread: X={stats['x_variance']:.4f} Y={stats['y_variance']:.4f}")
        print(f"    Stable: {'YES' if stats['stable'] else 'NO'}")

        # Citrix-degraded input may be less reliable, so the threshold is lower
        assert stats["detection_rate"] >= 0.4, f"{name} Citrix: detection rate too low ({stats['detection_rate']})"


@pytest.mark.visual
class TestRapportComparatif:
    """Generates a comparative report of three configurations:
    baseline, zoom, and Citrix-degraded baseline."""

    def test_rapport_complet(self):
        """Run the three configurations on every target and compare."""
        from PIL import Image

        all_results = []

        for shot, desc, name in _TARGETS:
            screenshot = _load_screenshot(shot)

            # Citrix-degraded variant (JPEG Q20; convert to RGB for JPEG)
            img_bytes = base64.b64decode(screenshot)
            img = Image.open(io.BytesIO(img_bytes))
            buf = io.BytesIO()
            img.convert("RGB").save(buf, "JPEG", quality=20)
            citrix = base64.b64encode(buf.getvalue()).decode()

            for approach_fn, approach_name, img_b64 in [
                (grounding_baseline, "baseline", screenshot),
                (grounding_zoom, "zoom", screenshot),
                (grounding_baseline, "citrix_q20", citrix),
            ]:
                stats = run_benchmark(approach_fn, approach_name, img_b64, desc, 3)
                stats["target_name"] = name
                all_results.append(stats)

        # Report
        print("\n" + "=" * 80)
        print("COMPARATIVE REPORT: GROUNDING BENCHMARK")
        print("=" * 80)
        print(f"{'Target':<25s} {'Approach':<12s} {'Detect.':<8s} {'Time':<8s} {'Position':<20s} {'Var X':<8s} {'Var Y':<8s} {'Stable'}")
        print("-" * 80)
        for r in all_results:
            pos = f"({r.get('x_mean', 0):.3f}, {r.get('y_mean', 0):.3f})" if r.get('x_mean') is not None else "N/A"
            var_x = f"{r.get('x_variance', 0):.4f}" if r.get('x_variance') is not None else "N/A"
            var_y = f"{r.get('y_variance', 0):.4f}" if r.get('y_variance') is not None else "N/A"
            stable = "YES" if r.get('stable') else "NO"
            print(f"{r['target_name']:<25s} {r['approach']:<12s} {r['detection_rate']*100:5.0f}% {r['avg_time_ms']:5.0f}ms {pos:<20s} {var_x:<8s} {var_y:<8s} {stable}")
        print("=" * 80)
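
if __name__ == "__main__":
    # Added convenience sketch: run the comparative report directly, without
    # pytest. Assumes Ollama serves qwen2.5vl:7b on localhost:11434 and the
    # session screenshots exist; note that _load_screenshot calls pytest.skip,
    # which raises pytest's Skipped exception when a screenshot is missing.
    TestRapportComparatif().test_rapport_complet()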