Files
rpa_vision_v3/tools/analyze_bench_results.py
Dom 5ea4960e65
Some checks failed
tests / Lint (ruff + black) (push) Successful in 1m50s
tests / Tests unitaires (sans GPU) (push) Failing after 1m50s
tests / Tests sécurité (critique) (push) Has been skipped
backup: snapshot post-démo GHT 2026-05-19
Backup état complet après enregistrement vidéo démo de bout en bout.
À utiliser comme point de référence pour la consolidation post-démo.

Changements majeurs de la session 18-19 mai :
- AIVA-URGENCE : page autonome avec preset URL + auto-focus chain
- Workflow Demo_urgence_3_db : merge linux_db + steps AIVA + pause humaine NoMachine
- Bypass LLM (static_result / static_text) dans replay_engine
  pour démos déterministes sans appel Ollama
- Fix api_stream:3013 — replay_paused au premier polling /next
- dag_execute : lift duration_ms vers top-level pour wait runtime
- NPM bypass auth /aiva-urgence/ via location ^~ (proxy_host/10.conf hors git)
- scripts/cancel-replays.sh — workaround Stop VWB qui ne purge pas la queue

Anchors visuels (468) forcés dans le commit pour garantir la restaurabilité.
DB workflows actuelle + ~12 .bak DB de la journée incluses.

Sujets identifiés pour consolidation post-démo (TODO) :
1. Bug VWB recapture anchor ne régénère pas le PNG
2. Léa client accumule état mémoire (restart périodique requis)
3. Stop VWB ne purge pas la queue serveur (lien manquant vers /replay/cancel)
4. Bug coord client mss tronqué 2560x60 → mapping Y cassé
5. delay_before/delay_after ignorés au runtime (fix partiel duration_ms)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-19 14:55:06 +02:00

304 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Analyse les résultats bench_baseline.json + bench_postfix.json.
Calcule :
- accuracy par dossier (3 runs, vote majoritaire)
- accuracy globale, UHCD, Forfait
- stabilité inter-runs
- score qualité justification (présence CCMU, GEMSA, durée, citations,
cohérence type_forfait)
- Δ baseline vs postfix par dossier
Sortie : tables markdown sur stdout + JSON brut sauvegardé.
"""
from __future__ import annotations
import json
import re
import sys
from collections import Counter
from pathlib import Path
# Parent of the directory that contains this script (path resolved first).
ROOT = Path(__file__).resolve().parent.parent
# Directory holding the bench files this script reads (dpis.json,
# bench_baseline.json, bench_postfix.json) and the analysis.json it writes.
RES = ROOT / "tools" / "_bench_t2a_out"
# Reference dossiers, one tuple each:
# (ipp, short label, ground-truth decision, expected type_forfait)
# The expected type_forfait is None when the ground truth is
# REQUALIFICATION_HOSPITALISATION.
GT = [
    ("25003284", "Pneumo VRS 78a 3h37", "FORFAIT_URGENCE", "Standard"),
    ("25003362", "Intox enfant 3a 4h41", "FORFAIT_URGENCE", "PE2"),
    ("25003364", "Pneumo SLA 71a 7h35", "REQUALIFICATION_HOSPITALISATION", None),
    ("25003451", "Plaie suturée 3a 2h00", "FORFAIT_URGENCE", "SU2"),
    ("25003475", "Aura migr. 34a 4h03", "REQUALIFICATION_HOSPITALISATION", None),
    ("25005866", "TC hockey 17a 12h01", "REQUALIFICATION_HOSPITALISATION", None),
    ("25010621", "Laryngite 5a 2h49", "FORFAIT_URGENCE", "PE2"),
    ("25012257", "Douleur abdo 76a 7h20", "REQUALIFICATION_HOSPITALISATION", None),
    ("25048485", "CTCG ado 13a 6h50", "FORFAIT_URGENCE", "PE2"),
    ("25056615", "Salpingite 39a 4h30", "FORFAIT_URGENCE", "Standard"),
    ("25151530", "Colique nephr. 58a 6h21", "FORFAIT_URGENCE", "Standard"),
]
# IPPs of the dossiers treated as borderline cases (cf. DIM audit); they are
# flagged in the printed tables and get their own accuracy figure.
LITIGIEUX = {"25003475", "25012257", "25048485", "25056615"}  # borderline cases, cf. DIM audit
def short(d: str | None) -> str:
    """Return a compact display label for a decision code.

    ``None`` is rendered as ``"?"``; the two known decision codes are
    abbreviated to ``"UHCD"`` and ``"Forf"``; any other value is cut down to
    its first 8 characters.
    """
    if d is None:
        return "?"
    known_codes = (
        ("REQUALIFICATION_HOSPITALISATION", "UHCD"),
        ("FORFAIT_URGENCE", "Forf"),
    )
    for code, abbreviation in known_codes:
        if d == code:
            return abbreviation
    return d[:8]
def majority(decisions: list[str]) -> str | None:
    """Return the most frequent non-empty decision, or ``None`` if there is none.

    Falsy entries (``None``, ``""``) are ignored. When several decisions are
    tied, the one encountered first in *decisions* wins.
    """
    tally = Counter(decision for decision in decisions if decision)
    if not tally:
        return None
    ((winner, _votes),) = tally.most_common(1)
    return winner
def _iter_text_values(raw: dict):
    """Yield the strings held by the public fields of *raw*.

    Keys starting with ``_`` are skipped. A field contributes its own value
    when it is a string, its string values when it is a dict, and, when it is
    a list, each string item plus the string values of each dict item.
    Deeper nesting is not explored.
    """
    for key, value in raw.items():
        if key.startswith("_"):
            continue
        if isinstance(value, str):
            yield value
        elif isinstance(value, dict):
            yield from (x for x in value.values() if isinstance(x, str))
        elif isinstance(value, list):
            for item in value:
                if isinstance(item, str):
                    yield item
                elif isinstance(item, dict):
                    yield from (y for y in item.values() if isinstance(y, str))


def quality_score(raw: dict, ipp: str, gt: str, mode: str) -> tuple[int, list[str]]:
    """Score the quality of a justification on a 0-5 scale.

    One point is awarded for each of the following:

    1. the text mentions "ccmu" (case-insensitive);
    2. the text mentions "gemsa" (case-insensitive);
    3. the ``duree_passage_heures`` field of *raw* is set (not ``None``);
    4. the text mentions a discharge mode / medical decision keyword;
    5. the text contains a literal quotation (« ... » with 6+ characters or
       "..." with 8+ characters).

    Args:
        raw: Parsed output of one run; its string fields form the searched text.
        ipp: Unused by the scoring; kept so existing callers keep working.
        gt: Unused by the scoring; kept so existing callers keep working.
        mode: Unused by the scoring; kept so existing callers keep working.

    Returns:
        ``(score, notes)`` where *notes* holds one ``+criterion`` or
        ``-criterion`` entry per criterion, in the order listed above.
    """
    # Build the searchable text once; the lowercase copy serves keyword checks,
    # the original-case text serves the quotation regexes.
    text = " ".join(_iter_text_values(raw))
    blob = text.lower()
    notes = []
    score = 0

    # 1. CCMU mentioned?
    if "ccmu" in blob:
        score += 1
        notes.append("+CCMU")
    else:
        notes.append("-CCMU")

    # 2. GEMSA mentioned?
    if "gemsa" in blob:
        score += 1
        notes.append("+GEMSA")
    else:
        notes.append("-GEMSA")

    # 3. Duration of stay available?
    # The previous condition was
    #   duree is not None and "duree" in str(raw) or re.search(<duration>, blob)
    # followed by an inner `if duree is not None`. Because of and/or precedence
    # and that inner test, the point was only ever granted when the structured
    # field was set; this is the same rule written directly.
    # NOTE(review): a duration mentioned only in free text (e.g. "3h30") does
    # not score - confirm that this is the intended rule.
    duree = raw.get("duree_passage_heures")
    if duree is not None:
        score += 1
        notes.append(f"+durée({duree}h)")
    else:
        notes.append("-durée")

    # 4. Discharge mode / medical decision mentioned?
    if any(w in blob for w in ("retour à domicile", "domicile", "consultation externe",
                               "hospitalisation", "transfert", "mutation")):
        score += 1
        notes.append("+mode_sortie")
    else:
        notes.append("-mode_sortie")

    # 5. Literal quotation with non-trivial content (French or straight quotes)?
    has_citation = (
        bool(re.search(r"«\s*[^»]{6,}\s*»", text))
        or bool(re.search(r'"[^"]{8,}"', text))
    )
    if has_citation:
        score += 1
        notes.append("+citation")
    else:
        notes.append("-citation")

    return score, notes
def hallucination_check(raw: dict, dpi: str) -> list[str]:
    """List the « ... » quotations of the LLM output that are absent from the DPI.

    The searched text is made of the string fields of *raw* (keys starting
    with ``_`` are skipped): plain strings, string values of dict fields, and
    for list fields each string item plus the string values of each dict item.

    A quotation counts as present when at least one of its 12-character
    windows (taken every 4 characters, case-insensitive) occurs in *dpi*;
    quotations shorter than the window are looked up whole.

    Args:
        raw: Parsed output of one run.
        dpi: Source text the quotations are checked against.

    Returns:
        The stripped quotations (6 to 80 characters, first 20 only) for which
        no window was found in *dpi*.
    """
    window, step = 12, 4

    def strings_of(value, descend=True):
        # One level of nesting only: lists are expanded when they are the
        # field value itself, not when found inside another list.
        if isinstance(value, str):
            yield value
        elif isinstance(value, dict):
            for inner in value.values():
                if isinstance(inner, str):
                    yield inner
        elif descend and isinstance(value, list):
            for item in value:
                yield from strings_of(item, descend=False)

    full = " ".join(
        text
        for key, value in raw.items()
        if not key.startswith("_")
        for text in strings_of(value)
    )
    citations = re.findall(r"«\s*([^»]{6,80})\s*»", full)
    dpi_lower = dpi.lower()
    out = []
    for citation in citations[:20]:  # cap the amount of work per run
        # The capture group keeps the whitespace preceding the closing »;
        # strip it so short quotations are not rejected merely because the
        # DPI has punctuation or a line break right after the same words.
        quote = citation.strip()
        needle = quote.lower()
        starts = range(0, max(1, len(needle) - window), step)
        if not any(needle[i:i + window] in dpi_lower for i in starts):
            out.append(quote)
    return out
def analyze(mode_label: str, path: Path, dpis: dict[str, str]) -> dict:
    """Build the analysis report for one bench result file.

    Args:
        mode_label: Name stored in the report under ``"mode"`` and passed on
            to ``quality_score``.
        path: JSON file providing the ``"results"``, ``"model"`` and ``"runs"``
            keys; ``"results"`` is looked up by IPP and yields a list of runs.
        dpis: Source text per IPP, used for the quotation check (an empty
            string is used when the IPP is missing).

    Returns:
        ``{}`` (after printing a warning) when *path* is not a file. Otherwise
        a dict with the per-dossier ``"rows"`` (one per entry of ``GT``) and
        the global figures: run-level and majority-vote accuracy, accuracy on
        UHCD / forfait / borderline dossiers, type_forfait accuracy,
        inter-run stability, average quality score and number of quotations
        not found in the DPI.

    Raises:
        KeyError: if the file lacks ``"results"``, ``"model"`` or ``"runs"``.
    """
    if not path.is_file():
        print(f"⚠ Fichier manquant : {path}")
        return {}
    data = json.loads(path.read_text(encoding="utf-8"))
    results = data["results"]
    model = data["model"]
    n_runs = data["runs"]
    rows = []
    correct_total = 0
    total_runs = 0
    n_halluc = 0
    for ipp, label, gt, ftype in GT:
        runs = results.get(ipp, [])
        decisions = [r.get("decision") for r in runs]
        type_forfaits = [r.get("type_forfait") for r in runs]
        match = sum(1 for r in runs if r.get("match"))
        total_runs += len(runs)
        correct_total += match
        maj = majority(decisions)
        # Most frequent type_forfait (only meaningful when a forfait is expected).
        type_maj = Counter([t for t in type_forfaits if t]).most_common(1)
        type_maj_str = type_maj[0][0] if type_maj else ""
        # Quality score and unverified quotations over this dossier's runs.
        qscores = []
        all_notes = []
        halluc_total = []
        for r in runs:
            # A run may carry "raw": null; treat it like a missing output
            # instead of crashing on .items().
            raw = r.get("raw") or {}
            s, notes = quality_score(raw, ipp, gt, mode_label)
            qscores.append(s)
            all_notes.append(notes)
            halluc = hallucination_check(raw, dpis.get(ipp, ""))
            halluc_total.extend(halluc)
        # Count before the row keeps only a 5-item sample, otherwise the
        # global total is silently capped at 5 per dossier.
        n_halluc += len(halluc_total)
        rows.append({
            "ipp": ipp,
            "label": label,
            "gt": gt,
            "gt_short": short(gt),
            "ftype": ftype,
            "decisions": decisions,
            "decisions_short": [short(d) for d in decisions],
            "majority": short(maj),
            "majority_match": maj == gt,
            "type_forfait_maj": type_maj_str,
            "type_forfait_match": (gt == "REQUALIFICATION_HOSPITALISATION") or (type_maj_str == ftype),
            "stable": len(set(decisions)) == 1,
            "match_runs": match,
            "litigieux": ipp in LITIGIEUX,
            "quality_avg": round(sum(qscores) / max(1, len(qscores)), 1),
            "quality_max": max(qscores) if qscores else 0,
            "quality_notes_first": all_notes[0] if all_notes else [],
            "hallucinations": halluc_total[:5],  # sample kept for the JSON dump
        })
    # Global statistics
    n_dossiers = len(rows)
    accuracy_runs = correct_total / max(1, total_runs)
    accuracy_majority = sum(1 for r in rows if r["majority_match"]) / n_dossiers
    uhcd_rows = [r for r in rows if r["gt"] == "REQUALIFICATION_HOSPITALISATION"]
    forf_rows = [r for r in rows if r["gt"] == "FORFAIT_URGENCE"]
    litig_rows = [r for r in rows if r["litigieux"]]
    uhcd_acc_majority = sum(1 for r in uhcd_rows if r["majority_match"]) / max(1, len(uhcd_rows))
    forf_acc_majority = sum(1 for r in forf_rows if r["majority_match"]) / max(1, len(forf_rows))
    stability = sum(1 for r in rows if r["stable"]) / n_dossiers
    litigieux_acc = sum(1 for r in litig_rows if r["majority_match"]) / max(1, len(litig_rows))
    # NOTE(review): computed over all forfait dossiers, whether or not the
    # majority decision itself was correct, while print_table labels it
    # "parmi forfaits OK" - confirm which one is intended.
    type_forfait_acc = sum(1 for r in forf_rows if r["type_forfait_match"]) / max(1, len(forf_rows))
    avg_quality = round(sum(r["quality_avg"] for r in rows) / n_dossiers, 2)
    return {
        "mode": mode_label,
        "model": model,
        "n_runs": n_runs,
        "rows": rows,
        "accuracy_runs": round(accuracy_runs, 3),
        "accuracy_majority": round(accuracy_majority, 3),
        "uhcd_acc_majority": round(uhcd_acc_majority, 3),
        "forfait_acc_majority": round(forf_acc_majority, 3),
        "type_forfait_acc": round(type_forfait_acc, 3),
        "stability": round(stability, 3),
        "litigieux_acc": round(litigieux_acc, 3),
        "avg_quality": avg_quality,
        "n_hallucinations": n_halluc,
    }
def print_table(report: dict):
    """Print one report as a markdown summary followed by a per-dossier table.

    Args:
        report: Non-empty dict as returned by ``analyze``; the global figures
            are read from its top-level keys and one table line is printed
            per entry of ``report["rows"]``.

    The table shows the first three run decisions (padded with empty cells
    when fewer are available), the majority decision, inter-run stability,
    the majority type_forfait (forfait dossiers only) and the quality score.

    NOTE(review): the status markers used to be empty strings in both
    branches of their conditionals, so the Maj / Stable / Type columns could
    not show success or failure. They are presumably glyphs lost in an
    encoding clean-up; "✓" / "✗" are used here, the same glyphs as the Δ
    column of ``print_delta`` - confirm the intended symbols.
    """
    ok, ko = "✓", "✗"
    print(f"\n## {report['mode']} (model={report['model']}, {report['n_runs']} runs/dossier)\n")
    print(f"- Accuracy runs (3×11=33 inférences) : **{report['accuracy_runs']*100:.0f}%**")
    print(f"- Accuracy vote majoritaire (sur 11 dossiers) : **{report['accuracy_majority']*100:.0f}%**")
    print(f"- Accuracy UHCD (majoritaire) : {report['uhcd_acc_majority']*100:.0f}%")
    print(f"- Accuracy Forfait (majoritaire) : {report['forfait_acc_majority']*100:.0f}%")
    print(f"- Type forfait correct (parmi forfaits OK) : {report['type_forfait_acc']*100:.0f}%")
    print(f"- Stabilité inter-runs : {report['stability']*100:.0f}%")
    print(f"- Cas litigieux OK : {report['litigieux_acc']*100:.0f}%")
    print(f"- Qualité justification moyenne : **{report['avg_quality']}/5**")
    print(f"- Hallucinations citations : {report['n_hallucinations']}")
    print()
    print("| IPP | Cas | GT | Run1 | Run2 | Run3 | Maj | Stable | Type | Qual |")
    print("|---|---|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|")
    for r in report["rows"]:
        # Pad to three cells so the Run1..Run3 columns always exist.
        runs = r["decisions_short"] + [""] * (3 - len(r["decisions_short"]))
        stable = ok if r["stable"] else ko
        # The type_forfait cell is left empty for dossiers where UHCD is expected.
        ftype = r["type_forfait_maj"] if r["gt"] == "FORFAIT_URGENCE" else ""
        if r["gt"] == "REQUALIFICATION_HOSPITALISATION":
            ftype_mark = ""
        else:
            ftype_mark = f" {ok}" if r["type_forfait_match"] else f" {ko}"
        flag = ok if r["majority_match"] else ko
        litig = " 🔴" if r["litigieux"] else ""
        print(f"| {r['ipp']} | {r['label']}{litig} | {r['gt_short']} | "
              f"{runs[0]} | {runs[1]} | {runs[2]} | {flag} {r['majority']} | {stable} | "
              f"{ftype}{ftype_mark} | {r['quality_avg']}/5 |")
def print_delta(baseline: dict, postfix: dict):
    """Print the per-dossier comparison between two reports, then a summary.

    Args:
        baseline: Non-empty report as returned by ``analyze``.
        postfix: Non-empty report as returned by ``analyze``; its rows are
            paired with the baseline rows by position.

    The Δ column reads "= ✓" (correct in both), "🟢 +1" (fixed), "🔴 -1"
    (regressed) or "= ✗" (wrong in both), based on the majority decision.

    NOTE(review): the per-cell markers in the Baseline / Post-fix columns used
    to be empty strings in both branches of their conditionals, presumably
    glyphs lost in an encoding clean-up; "✓" / "✗" are used here to match
    the Δ column - confirm the intended symbols.
    """
    ok, ko = "✓", "✗"
    print("\n## Δ Baseline → Post-fix\n")
    print("| IPP | Cas | GT | Baseline | Post-fix | Δ |")
    print("|---|---|:---:|:---:|:---:|:---:|")
    for b, p in zip(baseline["rows"], postfix["rows"]):
        b_flag = ok if b["majority_match"] else ko
        p_flag = ok if p["majority_match"] else ko
        if b["majority_match"] and p["majority_match"]:
            delta = "= ✓"
        elif not b["majority_match"] and p["majority_match"]:
            delta = "🟢 +1"
        elif b["majority_match"] and not p["majority_match"]:
            delta = "🔴 -1"
        else:
            delta = "= ✗"
        litig = " 🔴" if b["litigieux"] else ""
        print(f"| {b['ipp']} | {b['label']}{litig} | {b['gt_short']} | {b_flag} {b['majority']} | {p_flag} {p['majority']} | {delta} |")
    # Headline figures. The denominators come from the actual row counts
    # rather than a hard-coded 11, so they stay right if the dossier list changes.
    b_ok = sum(1 for r in baseline["rows"] if r["majority_match"])
    p_ok = sum(1 for r in postfix["rows"] if r["majority_match"])
    print()
    print("**Synthèse Δ** :")
    print(f"- Baseline : {b_ok}/{len(baseline['rows'])} → {baseline['accuracy_majority']*100:.0f}%")
    print(f"- Post-fix : {p_ok}/{len(postfix['rows'])} → {postfix['accuracy_majority']*100:.0f}%")
    print(f"- Gain absolu : {(postfix['accuracy_majority'] - baseline['accuracy_majority'])*100:+.0f} points")
    print(f"- Stabilité : {baseline['stability']*100:.0f}% → {postfix['stability']*100:.0f}%")
    print(f"- Qualité justification : {baseline['avg_quality']}/5 → {postfix['avg_quality']}/5")
def main():
    """Analyse both bench files, print the tables and save the full analysis.

    Reads ``dpis.json`` from ``RES``, analyses ``bench_baseline.json`` then
    ``bench_postfix.json``, prints a table for each non-empty report, prints
    the delta when both are available, and writes both reports to
    ``analysis.json`` in ``RES``.
    """
    dpi_path = RES / "dpis.json"
    dpis = json.loads(dpi_path.read_text(encoding="utf-8"))

    # Insertion order matters: baseline is analysed, printed and dumped first.
    reports = {
        "baseline": analyze("Baseline", RES / "bench_baseline.json", dpis),
        "postfix": analyze("Post-fix", RES / "bench_postfix.json", dpis),
    }
    for report in reports.values():
        if report:
            print_table(report)
    if all(reports.values()):
        print_delta(reports["baseline"], reports["postfix"])

    # Save the complete analysis
    out = RES / "analysis.json"
    payload = json.dumps(reports, ensure_ascii=False, indent=2)
    out.write_text(payload, encoding="utf-8")
    print(f"\n📁 {out}")


if __name__ == "__main__":
    main()