feat(tools): add 7 wired+bench utility scripts (A+B classification)
- A (wired, imports project modules): e2e_map_roles, anonymize_demo, grounding_e2e_resolve_engine - B (orphan projection, standalone benches): enrichment_eval_multi, extract_easily_bench_cases, extract_record_bench_cases, grounding_eval_multi
This commit is contained in:
158
tools/extract_easily_bench_cases.py
Normal file
158
tools/extract_easily_bench_cases.py
Normal file
@@ -0,0 +1,158 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Extracteur de cas LeaBench à partir des replay_failures Easily.
|
||||
|
||||
Ground-truth obtenue par OCR (docTR) : on localise le `by_text` du target_spec
|
||||
sur le screenshot réel → centre de sa bbox = (x_pct, y_pct). Les cas sans
|
||||
`by_text` exploitable (ou texte introuvable) sont marqués `needs_human_check`
|
||||
pour validation/annotation visuelle.
|
||||
|
||||
Usage:
|
||||
venv_v3/bin/python3 tools/extract_easily_bench_cases.py \
|
||||
--files /tmp/ez_files.txt \
|
||||
--out benchmarks/computer_use/cases/leabench_easily_2026-06-12.jsonl
|
||||
"""
|
||||
import argparse
|
||||
import json
|
||||
import os
|
||||
import unicodedata
|
||||
|
||||
|
||||
def norm(s: str) -> str:
    """Return *s* lowercased, trimmed and stripped of diacritics.

    The text is NFKD-decomposed so that accents become separate combining
    marks, which are then dropped. A falsy input (``None``, ``""``) yields
    ``""``.
    """
    decomposed = unicodedata.normalize("NFKD", s or "")
    # combining() returns 0 for base characters; anything else is a mark.
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return "".join(base_chars).lower().strip()
|
||||
|
||||
|
||||
def ocr_lines(model, shot):
    """Run OCR on one screenshot and list text candidates with their centres.

    Args:
        model: callable applied to the loaded document; its result must expose
            ``pages`` -> ``blocks`` -> ``lines`` -> ``words``, each word having
            ``value`` and ``geometry``.
        shot: image path handed to ``DocumentFile.from_images``.

    Returns:
        A list of ``(text, (cx, cy))`` tuples. For every non-empty OCR line
        there is one entry for the whole line (words joined by spaces, centre
        of the box enclosing all its words), followed by one entry per word.
        Coordinates are whatever ``geometry`` provides — presumably relative
        0-1 values, as stated by the original docstring; confirm against docTR.
    """
    from doctr.io import DocumentFile

    result = model(DocumentFile.from_images(shot))
    entries = []
    for page in result.pages:
        for block in page.blocks:
            for line in block.lines:
                words = list(line.words)
                if not words:
                    continue

                # Each geometry is unpacked as ((x0, y0), (x1, y1)).
                boxes = []
                for word in words:
                    (left, top), (right, bottom) = word.geometry
                    boxes.append((left, top, right, bottom))

                xs = [x for left, _, right, _ in boxes for x in (left, right)]
                ys = [y for _, top, _, bottom in boxes for y in (top, bottom)]
                line_text = " ".join(word.value for word in words)
                line_centre = (
                    (min(xs) + max(xs)) / 2.0,
                    (min(ys) + max(ys)) / 2.0,
                )
                entries.append((line_text, line_centre))

                # Also expose each word on its own, for short targets.
                for word, (left, top, right, bottom) in zip(words, boxes):
                    entries.append(
                        (word.value, ((left + right) / 2, (top + bottom) / 2))
                    )
    return entries
|
||||
|
||||
|
||||
def best_match(bytext, lines):
    """Find the OCR line/word that best covers ``bytext``.

    Scoring, after normalising both sides with ``norm``:
      * identical text                     -> 1.0
      * one text contains the other        -> shorter length / longer length
      * otherwise, shared whitespace tokens -> 0.8 * (shared / target tokens)
      * nothing in common                  -> 0.0

    Args:
        bytext: the text to look for.
        lines: iterable of ``(text, (cx, cy))`` candidates.

    Returns:
        ``(cx, cy, score)`` for the best candidate, with coordinates rounded
        to 4 decimals and the score to 3, or ``None`` when ``bytext``
        normalises to an empty string or no candidate has any text. The best
        candidate may have a score of 0.0; callers are expected to apply
        their own threshold. On ties the earliest candidate wins.
    """
    wanted = norm(bytext)
    if not wanted:
        return None

    # Loop-invariant: the target's token set is the same for every candidate.
    wanted_tokens = set(wanted.split())

    # Keep the RAW score while scanning. Comparing against an already-rounded
    # score would let a slightly worse later candidate replace a better one
    # (e.g. raw 0.33333 stored as 0.333 loses to a later 0.33315).
    best = None
    for txt, (cx, cy) in lines:
        candidate = norm(txt)
        if not candidate:
            continue

        if wanted == candidate:
            score = 1.0
        elif wanted in candidate or candidate in wanted:
            score = min(len(wanted), len(candidate)) / max(len(wanted), len(candidate))
        else:
            # Token overlap, capped at 0.8 so it never beats an exact match.
            shared = wanted_tokens & set(candidate.split())
            score = len(shared) / max(1, len(wanted_tokens)) * 0.8 if shared else 0.0

        if best is None or score > best[2]:
            best = (cx, cy, score)
            if score >= 1.0:
                # Nothing can score higher than an exact match.
                break

    if best is None:
        return None
    cx, cy, score = best
    return (round(cx, 4), round(cy, 4), round(score, 3))
|
||||
|
||||
|
||||
# Minimum OCR match score for a case to be accepted without human review.
_MIN_AUTO_SCORE = 0.6


def _iter_jsonl_objects(path):
    """Yield every JSON object (dict) found in the JSONL file at ``path``.

    Lines that are not valid JSON, or that decode to something other than an
    object, are skipped silently (best-effort parsing).
    """
    with open(path, encoding="utf-8") as handle:
        for raw in handle:
            try:
                record = json.loads(raw)
            except (ValueError, RecursionError):
                # json.JSONDecodeError is a ValueError.
                continue
            if isinstance(record, dict):
                yield record


def _build_case(record, target_spec, sess, shot, bytext, match):
    """Build one benchmark case dict from a replay-failure record.

    ``match`` is the ``(x, y, score)`` result of ``best_match`` or ``None``.
    Below ``_MIN_AUTO_SCORE`` (or without a match) the click region falls back
    to the screen centre and the case is flagged ``needs_human_check``.
    """
    if match and match[2] >= _MIN_AUTO_SCORE:
        x_pct, y_pct, score = match
        needs = False
    else:
        x_pct, y_pct, score = 0.5, 0.5, (match[2] if match else 0.0)
        needs = True

    vlmd = (target_spec.get("vlm_description") or "").strip()
    base = os.path.splitext(os.path.basename(shot))[0]
    return {
        # NOTE(review): truncating to 70 chars can make two ids collide when
        # session + screenshot names are long — confirm ids stay unique.
        "case_id": f"easily_{sess}_{base}"[:70],
        "screenshot_path": shot,
        "task": {
            "intent": (record.get("intent") or "").strip() or (
                f"cliquer sur « {bytext} »" if bytext else "cliquer sur la cible"),
            "target_text": bytext,
            "current_window": "Easily Assure (maquette POC)",
            "expected_next_window": "",
            "question": (
                f"La cible « {bytext} » est-elle visible ? Clique uniquement dessus."
                if bytext else f"Cible : {vlmd[:120]}. Clique uniquement dessus."),
        },
        "expectation": {
            "decision": "click",
            "click_region": {"x_pct": x_pct, "y_pct": y_pct, "radius_pct": 0.06},
            "accepted_reasons": ["ocr_text_match"],
        },
        "metadata": {
            "source": "easily_replay_failure",
            "session": sess,
            "ocr_match_score": score,
            "by_text_source": target_spec.get("by_text_source"),
            "needs_human_check": needs,
        },
    }


def main():
    """Command-line entry point: build the benchmark JSONL from replay failures.

    ``--files`` names a text file listing one replay-failure JSONL path per
    line; ``--out`` is the JSONL file to write. For every record whose
    screenshot exists on disk, the ``by_text`` of its ``target_spec`` is looked
    up by OCR and one case is emitted. A per-record report is printed at the
    end.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--files", default="/tmp/ez_files.txt")
    ap.add_argument("--out", required=True)
    args = ap.parse_args()

    # Read the file list first so a bad --files path fails before the slow
    # model load.
    with open(args.files, encoding="utf-8") as listing:
        files = [entry.strip() for entry in listing if entry.strip()]

    from doctr.models import ocr_predictor
    print("chargement docTR…", flush=True)
    model = ocr_predictor(pretrained=True)

    cases, report = [], []
    # Several failures may point at the same screenshot: OCR each image once.
    ocr_cache = {}

    for fp in files:
        sess = os.path.basename(os.path.dirname(fp))
        for record in _iter_jsonl_objects(fp):
            ts = record.get("target_spec", {}) or {}
            shot = record.get("screenshot_path", "")
            if not shot or not os.path.exists(shot):
                continue
            bytext = (ts.get("by_text") or "").strip()

            match = None
            if bytext:
                try:
                    if shot not in ocr_cache:
                        ocr_cache[shot] = ocr_lines(model, shot)
                    match = best_match(bytext, ocr_cache[shot])
                except Exception as e:
                    # Third-party OCR boundary: record the failure and still
                    # emit the case, flagged for human review.
                    report.append((sess, os.path.basename(shot), "ocr_err", str(e)[:40]))

            case = _build_case(record, ts, sess, shot, bytext, match)
            cases.append(case)

            region = case["expectation"]["click_region"]
            x_pct, y_pct = region["x_pct"], region["y_pct"]
            score = case["metadata"]["ocr_match_score"]
            flag = " ⚠CHECK" if case["metadata"]["needs_human_check"] else ""
            report.append((sess, os.path.basename(shot), f"score={score}",
                           f"({x_pct},{y_pct}) text={bytext!r}{flag}"))

    # dirname() is "" for a bare filename, and os.makedirs("") raises.
    out_dir = os.path.dirname(args.out)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    # ensure_ascii=False emits raw non-ASCII text, so pin the encoding.
    with open(args.out, "w", encoding="utf-8") as f:
        for c in cases:
            f.write(json.dumps(c, ensure_ascii=False) + "\n")

    low = sum(1 for c in cases if c["metadata"]["needs_human_check"])
    print(f"\n{len(cases)} cas écrits → {args.out}")
    print(f" auto (OCR ok): {len(cases)-low} | à valider visuellement: {low}\n")
    for r in report:
        print(" ", *r)
|
||||
|
||||
|
||||
# Run the extractor only when this file is executed as a script.
if __name__ == "__main__":
    raise SystemExit(main())
|
||||
Reference in New Issue
Block a user