diff --git a/tools/anonymize_demo.py b/tools/anonymize_demo.py new file mode 100644 index 000000000..15539f540 --- /dev/null +++ b/tools/anonymize_demo.py @@ -0,0 +1,118 @@ +#!/usr/bin/env python3 +"""ZIP de démo (Amina + Dom) : capture + JSON de ce que Léa récupère. + +Règle d'anonymisation (décision Dom 30/06) : on garde TOUT lisible — interface, +menus, libellés, valeurs cliniques — et on ne masque QUE l'identité directe du +patient, qui se trouve dans le BANDEAU DU HAUT (titre du dossier / onglets). + +- Capture : floutage CIBLÉ de la bande supérieure uniquement (top_frac). Le reste + (menus de navigation, formulaire, valeurs) reste lisible — c'est l'interface + qu'on apprend et ce qui sert à naviguer. +- JSON : vraies valeurs des champs (lisibles), + une section `patient` où nom / + prénom / date de naissance sont remplacés par des tokens. + +Tourne sur le DGX. Le détail (vraies valeurs) n'est pas affiché par le script — +seuls des compteurs et la plage Y floutée le sont (pas de PID dans les logs). 
def main():
    """Build the demo ZIP: blurred capture, JSON of extracted fields, README.

    Command-line arguments:
        --image            screenshot to anonymize (also fed to the OCR).
        --extraction-json  JSON list of extracted fields (label / value /
                           confidence / anchored).
        --out              path of the ZIP to write.
        --top-frac         fraction of the image height, from the top, that is
                           treated as the patient-identity banner.

    Only OCR tokens whose top edge lies inside the banner are blurred; the
    rest of the screenshot is left readable. Only counters and the blurred
    Y range are printed, never field values.

    Changes compared with the previous version:
      * archive members are written straight from memory, so no staging copy
        of the clinical data is left behind in a fixed /tmp directory;
      * a --top-frac outside ]0, 1] is rejected (a value <= 0 used to disable
        the blur silently);
      * a null/missing OCR confidence no longer crashes the export;
      * text is read with an explicit UTF-8 encoding.
    """
    from io import BytesIO  # local import: only needed to serialize the PNG

    ap = argparse.ArgumentParser()
    ap.add_argument("--image", required=True)
    ap.add_argument("--extraction-json", required=True)
    ap.add_argument("--out", default="/tmp/demo_lecture_ecran.zip")
    ap.add_argument("--top-frac", type=float, default=0.15,
                    help="fraction haute de l'écran à flouter (bandeau identité patient)")
    a = ap.parse_args()
    if not 0 < a.top_frac <= 1:
        # A non-positive fraction would export an un-blurred identity banner.
        ap.error("--top-frac doit être dans l'intervalle ]0, 1]")

    grid = extract_grid_from_image(a.image)
    tokens = tokens_from_grid(grid)
    fields = json.loads(Path(a.extraction_json).read_text(encoding="utf-8"))

    img = Image.open(a.image).convert("RGB")
    H = img.height
    seuil = int(a.top_frac * H)

    # Targeted blur: only text tokens starting inside the top banner.
    blurred = 0
    ys = []
    PAD = 2
    for t in tokens:
        if not t.bbox:
            continue
        x0, y0, x1, y1 = t.bbox
        if y0 >= seuil:
            continue  # token below the identity banner: keep it readable
        # NOTE(review): bbox is presumably in integer pixels; int() is a no-op
        # then and keeps crop/paste aligned if the OCR ever returns floats.
        xx0 = max(0, int(x0) - PAD)
        yy0 = max(0, int(y0) - PAD)
        xx1 = min(img.width, int(x1) + PAD)
        yy1 = min(img.height, int(y1) + PAD)
        if xx1 > xx0 and yy1 > yy0:
            region = img.crop((xx0, yy0, xx1, yy1)).filter(ImageFilter.GaussianBlur(12))
            img.paste(region, (xx0, yy0))
            blurred += 1
            ys.append(y0)

    # Demo JSON: real field values + tokenized patient identity.
    demo = {
        "ecran": "Dossier patient — Urgences (DPI réel)",
        "note": "Données cliniques réelles. Identité directe du patient remplacée par des tokens ; le reste est ce que Léa lit tel quel.",
        "patient": {
            "nom": "[nom]",
            "prenom": "[prenom]",
            "date_naissance": "[date de naissance]",
        },
        "champs": [
            {"label": f.get("label"),
             "valeur": f.get("value"),
             # `or 0` also covers an explicit null confidence in the input
             "confiance_ocr": round(float(f.get("confidence") or 0), 2),
             "ancre_ocr": bool(f.get("anchored"))}
            for f in fields
        ],
    }

    readme_text = (
        "DÉMO — Lecture d'écran par Léa (RPA 100% vision)\n"
        "================================================\n\n"
        "1) capture.png : un vrai écran de dossier patient (Urgences). Tout est\n"
        " lisible (interface, menus, libellés, valeurs cliniques) ; SEUL le\n"
        " bandeau d'identité du patient (en haut) est flouté.\n\n"
        "2) ce_que_lea_recupere.json : ce que Léa extrait de cet écran. L'OCR fournit\n"
        " les valeurs exactes (vérité), le modèle de vision identifie le RÔLE de\n"
        " chaque champ. Valeurs cliniques réelles ; identité patient = tokens\n"
        " [nom]/[prenom]/[date de naissance]. 0 hallucination (valeur = OCR).\n\n"
        f" {len(demo['champs'])} champs reconnus sur cet écran.\n"
    )

    png = BytesIO()
    img.save(png, format="PNG")

    # writestr() encodes str payloads as UTF-8; nothing is staged on disk.
    with zipfile.ZipFile(a.out, "w", zipfile.ZIP_DEFLATED) as z:
        z.writestr("capture.png", png.getvalue())
        z.writestr("ce_que_lea_recupere.json",
                   json.dumps(demo, ensure_ascii=False, indent=2))
        z.writestr("LISEZMOI.txt", readme_text)

    plage = f"{min(ys)}..{max(ys)}px" if ys else "—"
    print(f"# Hauteur image : {H}px | seuil bandeau = {seuil}px (top {a.top_frac:.0%})")
    print(f"# Tokens floutés (bandeau haut) : {blurred} | plage Y : {plage}")
    print(f"# Tokens TOTAL : {len(tokens)} (le reste reste lisible)")
    print(f"# Champs JSON (vraies valeurs) : {len(demo['champs'])}")
    print(f"# ZIP : {a.out}")
+""" +import argparse +import base64 +import json +import re +import sys +import time +from io import BytesIO +from pathlib import Path + +import requests +from PIL import Image + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) +from core.llm.ocr_extractor import extract_grid_from_image # noqa: E402 +from core.extraction.role_mapper import tokens_from_grid, map_roles # noqa: E402 + +VLLM_URL = "http://localhost:8001/v1/chat/completions" +MODEL = "Qwen/Qwen3-VL-4B-Instruct" + + +def _img_data_url(path, max_w=1280): + img = Image.open(path).convert("RGB") + if img.width > max_w: + h = int(img.height * max_w / img.width) + img = img.resize((max_w, h), Image.LANCZOS) + buf = BytesIO() + img.save(buf, format="PNG") + return "data:image/png;base64," + base64.b64encode(buf.getvalue()).decode() + + +def make_client(max_tokens=1500, max_w=1280): + """Construit un client VLM (image_path, prompt) -> texte, branché sur vLLM:8001.""" + def client(image_path, prompt): + body = { + "model": MODEL, + "messages": [{"role": "user", "content": [ + {"type": "image_url", "image_url": {"url": _img_data_url(image_path, max_w)}}, + {"type": "text", "text": prompt}, + ]}], + "temperature": 0.0, + "max_tokens": max_tokens, + "chat_template_kwargs": {"enable_thinking": False}, + } + r = requests.post(VLLM_URL, json=body, timeout=120) + if r.status_code != 200: + raise RuntimeError(f"vLLM {r.status_code}: {r.text[:300]}") + return r.json()["choices"][0]["message"]["content"] + return client + + +def _mask(v): + v = str(v) + if not v: + return "" + if re.fullmatch(r"[\d .,/:%€-]+", v): + k = "num/date" + elif len(v.split()) >= 4: + k = "texte" + else: + k = "court" + return f"<{k}:{len(v)}c>" + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--extract", required=True) + ap.add_argument("--roles", default="", help="rôles attendus, séparés par des virgules (mode guidé)") + a = ap.parse_args() + roles = [r.strip() for r in a.roles.split(",") if r.strip()] or 
None + + t0 = time.time() + grid = extract_grid_from_image(a.extract) + t_ocr = time.time() - t0 + tokens = tokens_from_grid(grid) + confs = sorted(t.confidence for t in tokens) + med = confs[len(confs) // 2] if confs else 0.0 + + client = make_client() + t1 = time.time() + fields = map_roles(a.extract, tokens, client, roles) + t_vlm = time.time() - t1 + + out = Path(f"/tmp/e2e_{Path(a.extract).stem}.json") + out.write_text(json.dumps( + [{"label": f.label, "value": f.value, "confidence": f.confidence, + "anchored": f.anchored, "value_ids": f.value_ids} for f in fields], + ensure_ascii=False, indent=2)) + + anc = sum(1 for f in fields if f.anchored) + print(f"# Image : {Path(a.extract).name}") + print(f"# Mode : {'guidé ' + str(roles) if roles else 'libre'}") + print(f"# OCR : {len(tokens)} tokens, conf médiane {med:.2f}, {t_ocr:.1f}s") + print(f"# VLM : {t_vlm:.1f}s | via map_roles (module testé)") + print(f"# Champs : {len(fields)} (ancrés OCR: {anc})") + for f in fields: + flag = "·" if f.anchored else "∅" + print(f" {flag} {str(f.label)[:28]:28s} = {_mask(f.value)}") + print(f"# Ancrage strict : {anc}/{len(fields)} | détail PII -> {out} (DGX, NE PAS rapatrier)") + + +if __name__ == "__main__": + main() diff --git a/tools/enrichment_eval_multi.py b/tools/enrichment_eval_multi.py new file mode 100644 index 000000000..b716c3a75 --- /dev/null +++ b/tools/enrichment_eval_multi.py @@ -0,0 +1,141 @@ +#!/usr/bin/env python3 +"""Éval ENRICHISSEMENT multi-modèles : qualité de la description d'un élément UI. + +Au runtime, le worker enrichit chaque action avec un `target_spec` (by_text, +by_role, vlm_description) via le VLM. On mesure ici cette capacité : on montre +un crop autour de la cible et on demande au modèle de l'identifier. On compare +le `by_text` généré au texte réel (ground-truth OCR du cas). + +Dimensions : exactitude by_text, plausibilité by_role, latence. 
def norm(s):
    """Normalize text for comparison: no accents, lower-case, trimmed.

    A falsy input (``None`` or ``""``) is treated as the empty string.
    """
    decomposed = unicodedata.normalize("NFKD", s if s else "")
    # NFKD splits accented letters into base letter + combining mark(s);
    # dropping the marks (combining class != 0) removes the accents.
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return "".join(base_chars).lower().strip()
def main():
    """Run the enrichment evaluation for every requested model.

    For each case with a usable target text (>= 3 characters, at least one
    letter), a crop around the expected click point is sent to the model and
    the returned ``by_text`` / ``by_role`` are scored. One ``.jsonl`` file is
    written per model plus a ``_summary.json``; a summary table is printed.

    Robustness fixes compared with the previous version:
      * ``by_text`` / ``by_role`` are coerced to strings, so a model answering
        ``{"by_text": null}`` or a number no longer crashes the run when the
        value is sliced for display;
      * input and output files are opened with a context manager and UTF-8;
      * blank lines in the cases file are ignored.
    """
    def as_text(value):
        # Models occasionally return null or a number where a string is expected.
        if value is None:
            return ""
        return value if isinstance(value, str) else str(value)

    ap = argparse.ArgumentParser()
    ap.add_argument("--cases", required=True)
    ap.add_argument("--models", nargs="+", required=True)
    ap.add_argument("--endpoint", default="http://127.0.0.1:11434")
    ap.add_argument("--timeout", type=int, default=120)
    ap.add_argument("--out", required=True)
    args = ap.parse_args()

    with open(args.cases, encoding="utf-8") as fh:
        loaded = [json.loads(line) for line in fh if line.strip()]
    # Keep only the cases with a real, exploitable target text.
    cases = [c for c in loaded
             if c["task"]["target_text"] and len(c["task"]["target_text"]) >= 3
             and any(ch.isalpha() for ch in c["task"]["target_text"])]
    os.makedirs(os.path.dirname(args.out) or ".", exist_ok=True)
    summary = []
    for model in args.models:
        rows = []
        print(f"\n===== ENRICH {model} =====", flush=True)
        for c in cases:
            reg = c["expectation"]["click_region"]
            b64 = crop_b64(c["screenshot_path"], reg["x_pct"], reg["y_pct"])
            try:
                text, dt = call(args.endpoint, model, b64, args.timeout)
                j = json.loads(re.search(r"\{.*\}", text, re.S).group(0))
                by_text = as_text(j.get("by_text", ""))
                by_role = norm(as_text(j.get("by_role", "")))
            except Exception as e:
                # Best effort: any transport/parse failure is recorded as an
                # empty answer so the remaining cases are still evaluated.
                text, dt, by_text, by_role = f"ERR:{e}", None, "", ""
            sc = text_score(by_text, c["task"]["target_text"])
            role_ok = by_role in ROLES
            rows.append({"case_id": c["case_id"], "model": model,
                         "real": c["task"]["target_text"], "gen_by_text": by_text,
                         "by_role": by_role, "text_score": sc, "role_valid": role_ok,
                         "latency_s": round(dt, 2) if dt else None})
            print(f" réel={c['task']['target_text'][:18]!r:22s} gén={by_text[:22]!r:26s} "
                  f"score={sc} role={by_role[:8]}", flush=True)
        safe_model = model.replace(':', '_').replace('/', '_')
        with open(f"{args.out}_{safe_model}.jsonl", "w", encoding="utf-8") as f:
            for r in rows:
                f.write(json.dumps(r, ensure_ascii=False) + "\n")
        scored = [r["text_score"] for r in rows if r["text_score"] is not None]
        lats = [r["latency_s"] for r in rows if r["latency_s"]]
        summary.append({"model": model, "n": len(rows),
                        "text_acc_mean": round(sum(scored) / len(scored), 3) if scored else None,
                        "exact": sum(1 for s in scored if s == 1.0),
                        "role_valid": sum(r["role_valid"] for r in rows),
                        "latency_med": round(sorted(lats)[len(lats)//2], 1) if lats else None})
    print("\n\n========== SYNTHÈSE ENRICHISSEMENT ==========")
    print(f"{'modèle':22s} {'by_text_acc':>11} {'exact':>6} {'role_ok':>8} {'lat_méd':>8}")
    for s in summary:
        print(f"{s['model']:22s} {str(s['text_acc_mean']):>11} {s['exact']:>6} "
              f"{s['role_valid']:>8} {str(s['latency_med'])+'s':>8}")
    with open(f"{args.out}_summary.json", "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2, ensure_ascii=False)
def norm(s: str) -> str:
    """Return *s* stripped of accents, lower-cased and trimmed.

    ``None`` or an empty string gives ``""``.
    """
    text = unicodedata.normalize("NFKD", s or "")
    # After NFKD decomposition accents are separate combining characters.
    without_marks = filter(lambda ch: not unicodedata.combining(ch), text)
    return "".join(without_marks).lower().strip()
def main():
    """Build LeaBench cases from Easily replay-failure records.

    ``--files`` is a text file listing one JSONL path per line. Every record
    that has an existing screenshot becomes a case: the ``by_text`` of its
    target_spec is located on the screenshot via OCR and the centre of the best
    match becomes the expected click point. Records with no match scoring at
    least 0.6 get a (0.5, 0.5) placeholder and are flagged
    ``needs_human_check``.

    Fixes compared with the previous version:
      * ``--out`` without a directory part no longer crashes (``os.makedirs('')``
        raises FileNotFoundError);
      * input files are closed deterministically and read as UTF-8;
      * OCR results are cached per screenshot instead of being recomputed for
        every record pointing at the same image;
      * JSON lines that are not objects are skipped instead of crashing.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--files", default="/tmp/ez_files.txt")
    ap.add_argument("--out", required=True)
    args = ap.parse_args()

    # Imported late so that --help works without loading the OCR stack.
    from doctr.models import ocr_predictor
    print("chargement docTR…", flush=True)
    model = ocr_predictor(pretrained=True)

    with open(args.files, encoding="utf-8") as fh:
        files = [l.strip() for l in fh if l.strip()]
    cases, report = [], []
    ocr_cache = {}  # screenshot path -> OCR lines

    for fp in files:
        sess = os.path.basename(os.path.dirname(fp))
        with open(fp, encoding="utf-8") as records:
            for line in records:
                try:
                    o = json.loads(line)
                except ValueError:
                    continue  # not JSON: ignore the line
                if not isinstance(o, dict):
                    continue
                ts = o.get("target_spec", {}) or {}
                shot = o.get("screenshot_path", "")
                if not shot or not os.path.exists(shot):
                    continue
                bytext = (ts.get("by_text") or "").strip()
                vlmd = (ts.get("vlm_description") or "").strip()
                m = None
                try:
                    if bytext:
                        if shot not in ocr_cache:
                            ocr_cache[shot] = ocr_lines(model, shot)
                        m = best_match(bytext, ocr_cache[shot])
                except Exception as e:
                    # Best effort: an OCR failure is reported and the case is
                    # emitted as "needs human check" below.
                    report.append((sess, os.path.basename(shot), "ocr_err", str(e)[:40]))
                if m and m[2] >= 0.6:
                    x_pct, y_pct, score = m
                    needs = False
                else:
                    x_pct, y_pct, score = 0.5, 0.5, (m[2] if m else 0.0)
                    needs = True
                base = os.path.splitext(os.path.basename(shot))[0]
                cases.append({
                    "case_id": f"easily_{sess}_{base}"[:70],
                    "screenshot_path": shot,
                    "task": {
                        "intent": (o.get("intent") or "").strip() or (
                            f"cliquer sur « {bytext} »" if bytext else "cliquer sur la cible"),
                        "target_text": bytext,
                        "current_window": "Easily Assure (maquette POC)",
                        "expected_next_window": "",
                        "question": (
                            f"La cible « {bytext} » est-elle visible ? Clique uniquement dessus."
                            if bytext else f"Cible : {vlmd[:120]}. Clique uniquement dessus."),
                    },
                    "expectation": {
                        "decision": "click",
                        "click_region": {"x_pct": x_pct, "y_pct": y_pct, "radius_pct": 0.06},
                        "accepted_reasons": ["ocr_text_match"],
                    },
                    "metadata": {
                        "source": "easily_replay_failure",
                        "session": sess,
                        "ocr_match_score": score,
                        "by_text_source": ts.get("by_text_source"),
                        "needs_human_check": needs,
                    },
                })
                flag = " ⚠CHECK" if needs else ""
                report.append((sess, os.path.basename(shot), f"score={score}",
                               f"({x_pct},{y_pct}) text={bytext!r}{flag}"))

    # dirname() is "" for a bare file name; makedirs("") would raise.
    os.makedirs(os.path.dirname(args.out) or ".", exist_ok=True)
    with open(args.out, "w", encoding="utf-8") as f:
        for c in cases:
            f.write(json.dumps(c, ensure_ascii=False) + "\n")
    low = sum(1 for c in cases if c["metadata"]["needs_human_check"])
    print(f"\n{len(cases)} cas écrits → {args.out}")
    print(f" auto (OCR ok): {len(cases)-low} | à valider visuellement: {low}\n")
    for r in report:
        print(" ", *r)
def parse_event(o):
    """Return the ``event`` payload of record *o* as a dict, or ``None``.

    The payload is either already a dict, or a string holding the ``repr`` of
    a Python dict, which is parsed with ``ast.literal_eval`` (literals only,
    no code execution).

    ``None`` is returned when the payload is missing, cannot be parsed, or
    parses to something that is not a dict. Previously a string such as
    ``"[1, 2]"`` was returned as a list and crashed callers on ``e.get``.
    """
    e = o.get("event")
    if isinstance(e, dict):
        return e
    if isinstance(e, str):
        try:
            parsed = ast.literal_eval(e)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # The exceptions literal_eval documents for malformed input.
            return None
        return parsed if isinstance(parsed, dict) else None
    return None
def main():
    """Build LeaBench cases from the human clicks of a recorded session.

    Reads ``<session>/live_events.jsonl``; every ``mouse_click`` /
    ``double_click`` event that has a position and a screenshot
    (``<session>/shots/<id>_full.png``) is a candidate. The OCR text under (or
    nearest to) the click becomes the target text and the click position is the
    ground truth. Candidates are dropped, with a traced reason, when the click
    is outside the shot or in the scrollbar/taskbar zone, when no text is
    found, or when the text is too short, too long or present more than once.

    Fixes compared with the previous version:
      * the event file and the screenshots are closed deterministically;
      * records that are not JSON objects, events that are not dicts and
        positions that are not a pair of numbers are skipped instead of
        crashing the whole extraction;
      * a null window title no longer crashes the slicing.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--session", required=True)
    ap.add_argument("--out", required=True)
    args = ap.parse_args()

    # Imported late so that --help works without loading the OCR stack.
    from doctr.models import ocr_predictor
    print("chargement docTR…", flush=True)
    model = ocr_predictor(pretrained=True)

    ev = os.path.join(args.session, "live_events.jsonl")
    shots = os.path.join(args.session, "shots")
    kept, dropped = [], []
    shot_cache = {}  # screenshot path -> ((width, height), OCR words)

    with open(ev, encoding="utf-8") as events:
        for line in events:
            try:
                o = json.loads(line)
            except ValueError:
                continue
            if not isinstance(o, dict):
                continue
            e = parse_event(o)
            if not isinstance(e, dict) or e.get("type") not in ("mouse_click", "double_click"):
                continue
            pos = e.get("pos")
            sid = e.get("screenshot_id")
            if not pos or not sid:
                continue
            if isinstance(pos, str):
                try:
                    pos = ast.literal_eval(pos)
                except Exception:
                    continue
            if not isinstance(pos, (list, tuple)) or len(pos) < 2:
                continue
            try:
                px, py = float(pos[0]), float(pos[1])
            except (TypeError, ValueError):
                continue
            full = os.path.join(shots, f"{sid}_full.png")
            if not os.path.exists(full):
                continue
            if full not in shot_cache:
                with Image.open(full) as im:
                    size = im.size
                shot_cache[full] = (size, ocr_lines(model, full))
            (W, H), lines = shot_cache[full]
            xp, yp = px / W, py / H
            cid = f"easily_{sid}_{int(px)}_{int(py)}"

            # --- filters ---
            if not (0 <= xp <= 1 and 0 <= yp <= 1):
                dropped.append((cid, "click_out_of_shot"))
                continue
            if xp > 0.95 or yp > 0.92:
                dropped.append((cid, "parasite_zone (scrollbar/barre tâches)"))
                continue
            text, n_occ, contained = pick_target(lines, xp, yp)
            if not text:
                dropped.append((cid, "no_text_under_click"))
                continue
            n_alpha = sum(c.isalpha() for c in text)
            compact = text.replace(" ", "")
            is_id = compact.isdigit() and len(compact) >= 6
            if n_alpha < 3 and not is_id:
                dropped.append((cid, f"charabia/court {text!r}"))
                continue
            if len(text) > 18:  # tabs/buttons are short; long = broken OCR / glued text
                dropped.append((cid, f"trop long (OCR cassé) {text[:24]!r}"))
                continue
            if n_occ > 1:
                dropped.append((cid, f"AMBIGU {text!r} ×{n_occ}"))
                continue

            win = e.get("window")
            if isinstance(win, dict):
                wtitle = str(win.get("title") or "")
            else:
                wtitle = str(win)[:80] if win else ""
            kept.append({
                "case_id": cid,
                "screenshot_path": os.path.abspath(full),
                "task": {
                    "intent": f"cliquer sur « {text} »",
                    "target_text": text,
                    "current_window": wtitle[:80],
                    "expected_next_window": "",
                    "question": f"L'élément « {text} » est-il visible ? Clique uniquement dessus.",
                },
                "expectation": {
                    "decision": "click",
                    "click_region": {"x_pct": round(xp, 4), "y_pct": round(yp, 4),
                                     "radius_pct": 0.05},
                    "accepted_reasons": ["human_click_groundtruth"],
                },
                "metadata": {"source": "easily_record", "session": os.path.basename(args.session),
                             "click_type": e.get("type"), "contained_in_line": contained,
                             "ocr_occurrences": n_occ},
            })

    os.makedirs(os.path.dirname(args.out) or ".", exist_ok=True)
    with open(args.out, "w", encoding="utf-8") as f:
        for c in kept:
            f.write(json.dumps(c, ensure_ascii=False) + "\n")
    print(f"\n{len(kept)} cas PROPRES → {args.out}")
    print(f"{len(dropped)} cas écartés (tracés) :")
    for cid, why in dropped:
        print(f" - {cid}: {why}")
    print("\nCibles retenues :")
    for c in kept:
        reg = c["expectation"]["click_region"]
        print(f" {c['task']['target_text']!r:30s} @ ({reg['x_pct']},{reg['y_pct']})")
def score(case, resolved):
    """Score one resolution against the human click region.

    Args:
        case: benchmark case; ``case["expectation"]["click_region"]`` holds
            ``x_pct``, ``y_pct`` and ``radius_pct``.
        resolved: dict returned by the resolver, or ``None`` for an abstention.

    Returns:
        ``(status, correct, dangerous, x_pct, y_pct)``:
          * ``("abstain", False, False, None, None)`` when there is no result
            or one of the two coordinates is missing;
          * ``("in_region", True, False, x, y)`` when the Euclidean distance to
            the region centre is within the radius;
          * ``("outside_region", False, True, x, y)`` otherwise.

    A result with ``x_pct`` but no ``y_pct`` used to raise on ``float(None)``;
    it is now counted as an abstention.
    """
    reg = case["expectation"]["click_region"]
    if not resolved or resolved.get("x_pct") is None or resolved.get("y_pct") is None:
        return "abstain", False, False, None, None
    xp, yp = float(resolved["x_pct"]), float(resolved["y_pct"])
    d = math.hypot(xp - reg["x_pct"], yp - reg["y_pct"])
    if d <= reg["radius_pct"]:
        return "in_region", True, False, xp, yp
    return "outside_region", False, True, xp, yp
[r["latency_s"] for r in rows if r["latency_s"]] + lat_med = sorted(lats)[len(lats) // 2] if lats else None + print("\n========== SYNTHÈSE E2E (vrai chemin resolve_engine) ==========") + print(f" n={n} accuracy={correct/n:.3f} justes={correct} " + f"DANGEREUX={dang} abstentions={abst} lat_méd={lat_med}s") + print(f" prédictions → {args.out}") + + +if __name__ == "__main__": + main() diff --git a/tools/grounding_eval_multi.py b/tools/grounding_eval_multi.py new file mode 100644 index 000000000..44a9f8391 --- /dev/null +++ b/tools/grounding_eval_multi.py @@ -0,0 +1,215 @@ +#!/usr/bin/env python3 +"""Éval grounding multi-modèles avec prompt+parser ADAPTÉS par modèle. + +Chaque famille de modèle a sa convention native de coordonnées (vérifié 2026-06): + - qwen2.5-vl : pixels absolus (bbox_2d / point en px de l'image envoyée) + - qwen3-vl : normalisé 0-1000 (résolution-indépendant) + - gemma 3 : pas de grounding natif → on demande du 0-1 best-effort +Toutes les sorties sont reconverties en (x_pct, y_pct) ∈ [0,1] pour un scoreur commun. + +Mesure : grounding (clic juste / dangereux), latence par appel, abstentions. + +Usage: + venv_v3/bin/python3 tools/grounding_eval_multi.py \ + --cases benchmarks/computer_use/cases/leabench_easily_clean_2026-06-12.jsonl \ + --models gemma4:26b qwen2.5vl:7b-rpa qwen3-vl:8b \ + --endpoint http://127.0.0.1:11434 --engine ollama \ + --out benchmarks/computer_use/predictions/easily_multi +""" +import argparse +import base64 +import io +import json +import math +import os +import re +import time + +import requests +from PIL import Image + +INSTR = ("Tu localises une cible sur une capture d'écran d'interface. 
def parse_pred(prof, text, W, H):
    """Parse a model answer into ``(decision, x_pct, y_pct)``.

    Args:
        prof: model profile (``"qwen3"``, ``"qwen25"`` or anything else), used
            only to disambiguate coordinates that are not already in 0-1.
        text: raw model answer.
        W, H: size in pixels of the image that was sent, used to convert
            pixel coordinates.

    Returns:
        ``("click", x, y)`` with x and y in [0, 1] rounded to 4 decimals,
        ``("abstain", None, None)`` when the answer contains ``abstain: true``,
        or ``("parse_error", None, None)`` when no usable point is found.

    Accepted shapes: ``{"x": .., "y": ..}``, a ``point`` / ``point_2d`` /
    ``bbox_2d`` / ``click`` / ``coordinate`` list, or, when no JSON object can
    be decoded, the first two numbers found in the text.

    Fixes compared with the previous version:
      * a 4-value box is reduced to its centre instead of its top-left corner,
        which biased every box answer towards the upper left;
      * non-numeric coordinates (null, text, nested values) give
        ``parse_error`` instead of raising.
    """
    if not text:
        return "parse_error", None, None
    if re.search(r'"?abstain"?\s*:\s*true', text, re.I):
        return "abstain", None, None
    try:
        j = json.loads(re.search(r"\{.*\}", text, re.S).group(0))
    except (AttributeError, ValueError):
        # No JSON object (AttributeError on None) or invalid JSON (ValueError):
        # fall back to the first two numbers of the answer.
        nums = re.findall(r"-?\d+\.?\d*", text)
        if len(nums) < 2:
            return "parse_error", None, None
        j = {"x": float(nums[0]), "y": float(nums[1])}

    try:
        if "x" in j and "y" in j:
            x, y = float(j["x"]), float(j["y"])
        else:
            pt = (j.get("point") or j.get("point_2d") or j.get("bbox_2d")
                  or j.get("click") or j.get("coordinate"))
            if not isinstance(pt, (list, tuple)) or len(pt) < 2:
                return "parse_error", None, None
            if len(pt) >= 4:
                # [x0, y0, x1, y1] box: click at its centre.
                x = (float(pt[0]) + float(pt[2])) / 2.0
                y = (float(pt[1]) + float(pt[3])) / 2.0
            else:
                x, y = float(pt[0]), float(pt[1])
    except (TypeError, ValueError):
        return "parse_error", None, None

    def rescale(v, dim):
        if 0 <= v <= 1.0:
            return v  # already a fraction (instruction followed)
        if v <= 1000 and prof == "qwen3":
            return v / 1000.0  # qwen3 native 0-1000
        if v > 1.0 and prof == "qwen25":
            return v / dim  # qwen2.5 native pixels (of the image sent)
        if v <= 1000:
            return v / 1000.0  # generic fallback: 0-1000
        return v / dim  # generic fallback: pixels

    xp, yp = rescale(x, W), rescale(y, H)
    if not (0 <= xp <= 1 and 0 <= yp <= 1):
        return "parse_error", None, None
    return "click", round(xp, 4), round(yp, 4)
prompt, "images": [b64]}], + "options": {"temperature": 0.0}} + t0 = time.time() + r = requests.post(f"{endpoint}/api/chat", json=payload, timeout=timeout) + dt = time.time() - t0 + r.raise_for_status() + return r.json().get("message", {}).get("content", ""), dt + + +def call_vllm(endpoint, model, prompt, b64, timeout): + """API OpenAI-compatible (vLLM) : image en data-URI base64.""" + payload = {"model": model, "temperature": 0.0, "max_tokens": 256, + "chat_template_kwargs": {"enable_thinking": False}, # pas de raisonnement + "messages": [{"role": "system", "content": INSTR}, + {"role": "user", "content": [ + {"type": "text", "text": prompt}, + {"type": "image_url", + "image_url": {"url": f"data:image/jpeg;base64,{b64}"}}]}]} + t0 = time.time() + r = requests.post(f"{endpoint}/v1/chat/completions", json=payload, timeout=timeout) + dt = time.time() - t0 + r.raise_for_status() + return r.json()["choices"][0]["message"]["content"], dt + + +def call_model(engine, endpoint, model, prompt, b64, timeout): + if engine == "vllm": + return call_vllm(endpoint, model, prompt, b64, timeout) + return call_ollama(endpoint, model, prompt, b64, timeout) + + +def score(case, decision, xp, yp): + reg = case["expectation"]["click_region"] + if decision != "click": + return "abstain", False, False # ni correct ni dangereux (sur cas click attendu = raté non-dangereux) + d = math.hypot(xp - reg["x_pct"], yp - reg["y_pct"]) + if d <= reg["radius_pct"]: + return "in_region", True, False + return "outside_region", False, True + + +def main(): + ap = argparse.ArgumentParser() + ap.add_argument("--cases", required=True) + ap.add_argument("--models", nargs="+", required=True) + ap.add_argument("--endpoint", default="http://127.0.0.1:11434") + ap.add_argument("--engine", default="ollama", choices=["ollama", "vllm"]) + ap.add_argument("--timeout", type=int, default=120) + ap.add_argument("--out", required=True) + args = ap.parse_args() + + cases = [json.loads(l) for l in open(args.cases)] + 
os.makedirs(os.path.dirname(args.out) or ".", exist_ok=True) + summary = [] + + for model in args.models: + prof = profile(model) + rows = [] + print(f"\n===== {model} (profil={prof}) =====", flush=True) + for c in cases: + b64, W0, H0, sent = img_b64(c["screenshot_path"]) + Ws, Hs = sent # taille réellement envoyée (pour le filet pixels) + prompt = build_prompt(prof, c, Ws, Hs) + try: + text, dt = call_model(args.engine, args.endpoint, model, prompt, b64, args.timeout) + dec, xp, yp = parse_pred(prof, text, Ws, Hs) + except Exception as e: + text, dt, dec, xp, yp = f"ERR:{e}", None, "error", None, None + status, ok, dang = score(c, dec, xp, yp) + rows.append({"case_id": c["case_id"], "model": model, "profile": prof, + "decision": dec, "x_pct": xp, "y_pct": yp, + "latency_s": round(dt, 2) if dt else None, + "status": status, "correct": ok, "dangerous": dang, + "target": c["task"]["target_text"]}) + print(f" {c['case_id'][:34]:34s} {dec:11s} {status:14s} " + f"{(str(round(dt,1))+'s') if dt else '-':>6} {c['task']['target_text'][:18]!r}", + flush=True) + pred_path = f"{args.out}_{model.replace(':','_').replace('/','_')}.jsonl" + with open(pred_path, "w") as f: + for r in rows: + f.write(json.dumps(r, ensure_ascii=False) + "\n") + n = len(rows) + correct = sum(r["correct"] for r in rows) + dang = sum(r["dangerous"] for r in rows) + abst = sum(1 for r in rows if r["decision"] in ("abstain", "parse_error", "error")) + lats = [r["latency_s"] for r in rows if r["latency_s"]] + summary.append({"model": model, "profile": prof, "n": n, + "accuracy": round(correct / n, 3), "correct": correct, + "dangerous": dang, "abstain_or_err": abst, + "latency_med": round(sorted(lats)[len(lats)//2], 1) if lats else None, + "latency_max": round(max(lats), 1) if lats else None}) + + print("\n\n========== SYNTHÈSE GROUNDING (Easily réel) ==========") + print(f"{'modèle':22s} {'prof':7s} {'acc':>5} {'just':>5} {'DANG':>5} {'abst':>5} {'lat_méd':>8} {'lat_max':>8}") + for s in summary: + 
print(f"{s['model']:22s} {s['profile']:7s} {s['accuracy']:>5} " + f"{s['correct']:>5} {s['dangerous']:>5} {s['abstain_or_err']:>5} " + f"{str(s['latency_med'])+'s':>8} {str(s['latency_max'])+'s':>8}") + with open(f"{args.out}_summary.json", "w") as f: + json.dump(summary, f, indent=2, ensure_ascii=False) + + +if __name__ == "__main__": + main()