#!/usr/bin/env python3
"""Analyse the bench_baseline.json + bench_postfix.json results.

Computes:
  - per-case accuracy (majority vote over the runs of each case)
  - overall accuracy, plus accuracy restricted to UHCD and Forfait cases
  - inter-run stability
  - justification quality score (mentions of CCMU, GEMSA, duration field,
    discharge mode, literal citations)
  - type_forfait agreement with the expected value
  - baseline vs post-fix delta per case

Output: markdown tables on stdout + the raw analysis saved as JSON.
"""
from __future__ import annotations

import json
import re
import sys
from collections import Counter
from pathlib import Path

ROOT = Path(__file__).resolve().parent.parent
RES = ROOT / "tools" / "_bench_t2a_out"

# The two decision labels used by the ground truth and by the run records.
DECISION_UHCD = "REQUALIFICATION_HOSPITALISATION"
DECISION_FORFAIT = "FORFAIT_URGENCE"

# (ipp, short label, ground truth, expected type_forfait)
GT = [
    ("25003284", "Pneumo VRS 78a 3h37", DECISION_FORFAIT, "Standard"),
    ("25003362", "Intox enfant 3a 4h41", DECISION_FORFAIT, "PE2"),
    ("25003364", "Pneumo SLA 71a 7h35", DECISION_UHCD, None),
    ("25003451", "Plaie suturée 3a 2h00", DECISION_FORFAIT, "SU2"),
    ("25003475", "Aura migr. 34a 4h03", DECISION_UHCD, None),
    ("25005866", "TC hockey 17a 12h01", DECISION_UHCD, None),
    ("25010621", "Laryngite 5a 2h49", DECISION_FORFAIT, "PE2"),
    ("25012257", "Douleur abdo 76a 7h20", DECISION_UHCD, None),
    ("25048485", "CTCG ado 13a 6h50", DECISION_FORFAIT, "PE2"),
    ("25056615", "Salpingite 39a 4h30", DECISION_FORFAIT, "Standard"),
    ("25151530", "Colique nephr. 58a 6h21", DECISION_FORFAIT, "Standard"),
]

LITIGIEUX = {"25003475", "25012257", "25048485", "25056615"}  # borderline cases, cf. DIM audit

_SHORT_LABELS = {DECISION_UHCD: "UHCD", DECISION_FORFAIT: "Forf"}

# Keywords counted as a mention of the discharge mode / medical decision.
_EXIT_KEYWORDS = (
    "retour à domicile",
    "domicile",
    "consultation externe",
    "hospitalisation",
    "transfert",
    "mutation",
)

# Literal citations: « ... » with 6+ characters, or "..." with 8+ characters.
_GUILLEMET_QUOTE_RE = re.compile(r"«\s*[^»]{6,}\s*»")
_STRAIGHT_QUOTE_RE = re.compile(r'"[^"]{8,}"')
# Citations checked against the DPI: « ... » holding 6 to 80 characters.
_CHECKED_QUOTE_RE = re.compile(r"«\s*([^»]{6,80})\s*»")

_MAX_CITATIONS_CHECKED = 20  # cap on citations verified per LLM output
_FRAGMENT_LEN = 12           # length of the sub-fragment looked up in the DPI
_FRAGMENT_STEP = 4           # stride between two fragment start offsets


def short(d: str | None) -> str:
    """Return a compact display label for decision *d*.

    ``None`` gives ``"?"``, the two known decisions give ``"UHCD"`` /
    ``"Forf"``, and any other string is truncated to its first 8 characters.
    """
    if d is None:
        return "?"
    return _SHORT_LABELS.get(d, d[:8])


def majority(decisions: list[str]) -> str | None:
    """Return the most frequent truthy decision, or ``None`` if there is none.

    Falsy entries (``None``, ``""``) are ignored. Ties are resolved by
    ``Counter.most_common``, i.e. in favour of the value encountered first.
    """
    decisions = [d for d in decisions if d]
    if not decisions:
        return None
    return Counter(decisions).most_common(1)[0][0]


def _collect_text(raw: dict, *, include_lists: bool) -> list[str]:
    """Gather the string fragments of *raw* used for text searches.

    Keys starting with ``_`` are skipped. Top-level strings and the string
    values of top-level dicts are always collected; with *include_lists*,
    strings inside top-level lists (and inside dicts nested in those lists)
    are collected too.
    """
    parts: list[str] = []
    for key, value in raw.items():
        if key.startswith("_"):
            continue
        if isinstance(value, str):
            parts.append(value)
        elif isinstance(value, dict):
            parts.extend(x for x in value.values() if isinstance(x, str))
        elif include_lists and isinstance(value, list):
            for item in value:
                if isinstance(item, str):
                    parts.append(item)
                elif isinstance(item, dict):
                    parts.extend(y for y in item.values() if isinstance(y, str))
    return parts


def quality_score(raw: dict, ipp: str, gt: str, mode: str) -> tuple[int, list[str]]:
    """Score the justification quality of one LLM output out of 5.

    One point each for: a "ccmu" mention, a "gemsa" mention, a non-null
    ``duree_passage_heures`` field, a discharge-mode keyword, and a literal
    citation. *ipp*, *gt* and *mode* are accepted for interface
    compatibility but do not influence the score.

    Returns:
        ``(score, notes)`` where *notes* holds one ``+criterion`` or
        ``-criterion`` marker per criterion, in evaluation order.
    """
    text = " ".join(_collect_text(raw, include_lists=True))
    blob = text.lower()

    # The duration point is awarded only when the structured field is set.
    # The previous condition also ran a regex over the text, but an unset
    # field always ended in "-durée", so the regex never changed the result.
    duree = raw.get("duree_passage_heures")

    has_citation = bool(
        _GUILLEMET_QUOTE_RE.search(text) or _STRAIGHT_QUOTE_RE.search(text)
    )

    # (criterion met?, note when met, note when missed)
    criteria = [
        ("ccmu" in blob, "+CCMU", "-CCMU"),
        ("gemsa" in blob, "+GEMSA", "-GEMSA"),
        (duree is not None, f"+durée({duree}h)", "-durée"),
        (any(w in blob for w in _EXIT_KEYWORDS), "+mode_sortie", "-mode_sortie"),
        (has_citation, "+citation", "-citation"),
    ]

    score = 0
    notes = []
    for met, hit_note, miss_note in criteria:
        if met:
            score += 1
            notes.append(hit_note)
        else:
            notes.append(miss_note)
    return score, notes


def hallucination_check(raw: dict, dpi: str) -> list[str]:
    """List the « ... » citations of the LLM output that are absent from the DPI.

    Only the first ``_MAX_CITATIONS_CHECKED`` citations are examined. A
    citation counts as present when any of its ``_FRAGMENT_LEN``-character
    fragments, sampled every ``_FRAGMENT_STEP`` characters, occurs in *dpi*
    (case-insensitive). A citation shorter than the fragment length is
    looked up whole.

    Returns:
        The stripped text of each citation that could not be found.
    """
    # NOTE(review): list-valued fields are not scanned here although
    # quality_score counts citations found in them — confirm whether that
    # narrower scope is intended.
    full = " ".join(_collect_text(raw, include_lists=False))
    dpi_lower = dpi.lower()

    missing = []
    for citation in _CHECKED_QUOTE_RE.findall(full)[:_MAX_CITATIONS_CHECKED]:
        # The capture group greedily keeps the padding before the closing »;
        # strip it first, otherwise a short quote is searched with a trailing
        # space and wrongly reported as absent.
        stripped = citation.strip()
        needle = stripped.lower()
        if not needle:
            continue  # whitespace-only quote: nothing to verify
        starts = range(0, max(1, len(needle) - _FRAGMENT_LEN), _FRAGMENT_STEP)
        if not any(needle[i:i + _FRAGMENT_LEN] in dpi_lower for i in starts):
            missing.append(stripped)
    return missing


def analyze(mode_label: str, path: Path, dpis: dict[str, str]) -> dict:
    """Build the analysis report for one benchmark result file.

    Args:
        mode_label: Name stored in the report under ``"mode"``.
        path: JSON file providing the ``results``, ``model`` and ``runs`` keys.
        dpis: Mapping from IPP to DPI text, used for the citation check.

    Returns:
        A dict with one entry per ``GT`` case under ``"rows"`` plus the
        aggregated metrics, or ``{}`` (after printing a warning) when *path*
        does not exist.

    Raises:
        KeyError: if the file lacks ``results``, ``model`` or ``runs``.
    """
    if not path.is_file():
        print(f"⚠ Fichier manquant : {path}")
        return {}
    data = json.loads(path.read_text(encoding="utf-8"))
    results = data["results"]
    model = data["model"]
    n_runs = data["runs"]

    rows = []
    correct_total = 0
    total_runs = 0
    for ipp, label, gt, ftype in GT:
        runs = results.get(ipp, [])
        decisions = [r.get("decision") for r in runs]
        match = sum(1 for r in runs if r.get("match"))
        total_runs += len(runs)
        correct_total += match
        maj = majority(decisions)

        # Majority type_forfait (its match is forced to True when UHCD is expected).
        type_counts = Counter(
            t for t in (r.get("type_forfait") for r in runs) if t
        ).most_common(1)
        type_maj_str = type_counts[0][0] if type_counts else "—"

        # Quality and citation checks over every run of the case.
        qscores = []
        all_notes = []
        halluc_total = []
        dpi_text = dpis.get(ipp, "")
        for r in runs:
            # "raw" may be present but null in the JSON; treat it as empty.
            raw = r.get("raw") or {}
            s, notes = quality_score(raw, ipp, gt, mode_label)
            qscores.append(s)
            all_notes.append(notes)
            halluc_total.extend(hallucination_check(raw, dpi_text))

        rows.append({
            "ipp": ipp,
            "label": label,
            "gt": gt,
            "gt_short": short(gt),
            "ftype": ftype,
            "decisions": decisions,
            "decisions_short": [short(d) for d in decisions],
            "majority": short(maj),
            "majority_match": maj == gt,
            "type_forfait_maj": type_maj_str,
            "type_forfait_match": (gt == DECISION_UHCD) or (type_maj_str == ftype),
            "stable": len(set(decisions)) == 1,
            "match_runs": match,
            "litigieux": ipp in LITIGIEUX,
            "quality_avg": round(sum(qscores) / max(1, len(qscores)), 1),
            "quality_max": max(qscores) if qscores else 0,
            "quality_notes_first": all_notes[0] if all_notes else [],
            "hallucinations": halluc_total[:5],
        })

    # Global statistics
    n_dossiers = len(rows)
    uhcd_rows = [r for r in rows if r["gt"] == DECISION_UHCD]
    forf_rows = [r for r in rows if r["gt"] == DECISION_FORFAIT]
    litig_rows = [r for r in rows if r["litigieux"]]

    def _share(hits: int, total: int) -> float:
        # max(1, ...) keeps an empty subset from dividing by zero.
        return hits / max(1, total)

    accuracy_runs = _share(correct_total, total_runs)
    accuracy_majority = _share(sum(1 for r in rows if r["majority_match"]), n_dossiers)
    uhcd_acc_majority = _share(sum(1 for r in uhcd_rows if r["majority_match"]), len(uhcd_rows))
    forf_acc_majority = _share(sum(1 for r in forf_rows if r["majority_match"]), len(forf_rows))
    stability = _share(sum(1 for r in rows if r["stable"]), n_dossiers)
    litigieux_acc = _share(sum(1 for r in litig_rows if r["majority_match"]), len(litig_rows))
    type_forfait_acc = _share(sum(1 for r in forf_rows if r["type_forfait_match"]), len(forf_rows))
    avg_quality = round(sum(r["quality_avg"] for r in rows) / max(1, n_dossiers), 2)
    n_halluc = sum(len(r["hallucinations"]) for r in rows)

    return {
        "mode": mode_label,
        "model": model,
        "n_runs": n_runs,
        "rows": rows,
        "accuracy_runs": round(accuracy_runs, 3),
        "accuracy_majority": round(accuracy_majority, 3),
        "uhcd_acc_majority": round(uhcd_acc_majority, 3),
        "forfait_acc_majority": round(forf_acc_majority, 3),
        "type_forfait_acc": round(type_forfait_acc, 3),
        "stability": round(stability, 3),
        "litigieux_acc": round(litigieux_acc, 3),
        "avg_quality": avg_quality,
        "n_hallucinations": n_halluc,
    }


def print_table(report: dict):
    """Print the summary metrics and the per-case markdown table of *report*.

    The case count and the number of run columns are derived from the report
    instead of being hard-coded, so the table stays correct when the
    benchmark is run with a different number of cases or runs.
    """
    rows = report["rows"]
    n_cases = len(rows)
    n_inferences = sum(len(r["decisions"]) for r in rows)

    # One column per run. The declared runs-per-case value is authoritative
    # when it is an integer; otherwise fall back to the widest row. Rows with
    # fewer recorded runs than columns are padded with "—".
    declared_runs = report["n_runs"]
    if isinstance(declared_runs, int):
        n_cols = declared_runs
    else:
        n_cols = max((len(r["decisions_short"]) for r in rows), default=0)

    print(f"\n## {report['mode']} (model={report['model']}, {report['n_runs']} runs/dossier)\n")
    print(f"- Accuracy runs ({n_inferences} inférences sur {n_cases} dossiers) : **{report['accuracy_runs']*100:.0f}%**")
    print(f"- Accuracy vote majoritaire (sur {n_cases} dossiers) : **{report['accuracy_majority']*100:.0f}%**")
    print(f"- Accuracy UHCD (majoritaire) : {report['uhcd_acc_majority']*100:.0f}%")
    print(f"- Accuracy Forfait (majoritaire) : {report['forfait_acc_majority']*100:.0f}%")
    print(f"- Type forfait correct (parmi forfaits OK) : {report['type_forfait_acc']*100:.0f}%")
    print(f"- Stabilité inter-runs : {report['stability']*100:.0f}%")
    print(f"- Cas litigieux OK : {report['litigieux_acc']*100:.0f}%")
    print(f"- Qualité justification moyenne : **{report['avg_quality']}/5**")
    print(f"- Hallucinations citations : {report['n_hallucinations']}")
    print()

    header = ["IPP", "Cas", "GT"]
    header += [f"Run{i}" for i in range(1, n_cols + 1)]
    header += ["Maj", "Stable", "Type", "Qual"]
    print("| " + " | ".join(header) + " |")
    print("|---|---|" + ":---:|" * (len(header) - 2))

    for r in rows:
        runs = r["decisions_short"][:n_cols]
        runs = runs + ["—"] * (n_cols - len(runs))
        stable = "✓" if r["stable"] else " "
        ftype = r["type_forfait_maj"] if r["gt"] == DECISION_FORFAIT else "—"
        if r["gt"] == DECISION_UHCD:
            ftype_mark = ""
        else:
            ftype_mark = " ✓" if r["type_forfait_match"] else " ✗"
        flag = "✓" if r["majority_match"] else "✗"
        litig = " 🔴" if r["litigieux"] else ""
        cells = [
            r["ipp"],
            f"{r['label']}{litig}",
            r["gt_short"],
            *runs,
            f"{flag} {r['majority']}",
            stable,
            f"{ftype}{ftype_mark}",
            f"{r['quality_avg']}/5",
        ]
        print("| " + " | ".join(cells) + " |")


def print_delta(baseline: dict, postfix: dict):
    """Print the per-case baseline → post-fix comparison and its summary.

    Rows are paired positionally; both reports are built from ``GT`` by
    ``analyze`` and therefore list the cases in the same order.
    """
    print("\n## Δ Baseline → Post-fix\n")
    print("| IPP | Cas | GT | Baseline | Post-fix | Δ |")
    print("|---|---|:---:|:---:|:---:|:---:|")

    # (baseline correct, post-fix correct) -> delta marker
    delta_marks = {
        (True, True): "= ✓",
        (False, True): "🟢 +1",
        (True, False): "🔴 -1",
        (False, False): "= ✗",
    }
    for b, p in zip(baseline["rows"], postfix["rows"]):
        b_ok = bool(b["majority_match"])
        p_ok = bool(p["majority_match"])
        b_flag = "✓" if b_ok else "✗"
        p_flag = "✓" if p_ok else "✗"
        delta = delta_marks[(b_ok, p_ok)]
        litig = " 🔴" if b["litigieux"] else ""
        print(f"| {b['ipp']} | {b['label']}{litig} | {b['gt_short']} | {b_flag} {b['majority']} | {p_flag} {p['majority']} | {delta} |")

    # Headlines — denominators come from the reports, not a fixed case count.
    b_hits = sum(1 for r in baseline["rows"] if r["majority_match"])
    p_hits = sum(1 for r in postfix["rows"] if r["majority_match"])
    print()
    print("**Synthèse Δ** :")
    print(f"- Baseline : {b_hits}/{len(baseline['rows'])} → {baseline['accuracy_majority']*100:.0f}%")
    print(f"- Post-fix : {p_hits}/{len(postfix['rows'])} → {postfix['accuracy_majority']*100:.0f}%")
    print(f"- Gain absolu : {(postfix['accuracy_majority'] - baseline['accuracy_majority'])*100:+.0f} points")
    print(f"- Stabilité : {baseline['stability']*100:.0f}% → {postfix['stability']*100:.0f}%")
    print(f"- Qualité justification : {baseline['avg_quality']}/5 → {postfix['avg_quality']}/5")


def main():
    """Analyse both benchmark files, print the tables and save analysis.json.

    ``dpis.json`` is required: without the DPI texts every citation would be
    reported as a hallucination, so a missing file is left to raise.
    """
    dpis = json.loads((RES / "dpis.json").read_text(encoding="utf-8"))
    baseline = analyze("Baseline", RES / "bench_baseline.json", dpis)
    postfix = analyze("Post-fix", RES / "bench_postfix.json", dpis)
    if baseline:
        print_table(baseline)
    if postfix:
        print_table(postfix)
    if baseline and postfix:
        print_delta(baseline, postfix)

    # Save the complete analysis
    out = RES / "analysis.json"
    out.write_text(
        json.dumps({"baseline": baseline, "postfix": postfix}, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    print(f"\n📁 {out}")


if __name__ == "__main__":
    main()