#!/usr/bin/env python3 """Benchmark qualité T2A — validation end-to-end sur vrais dossiers. Compare la qualité des codes CIM-10, vetos, downgrades et CPAM entre runs successifs. Chaque run est sauvegardé dans un répertoire isolé pour permettre des comparaisons A/B. Usage: python scripts/benchmark_quality.py --n 10 python scripts/benchmark_quality.py --n 10 --compare RUN_ID python scripts/benchmark_quality.py --dossiers 116_23065570,45_23183041 python scripts/benchmark_quality.py --gold-standard """ from __future__ import annotations import argparse import json import os import shutil import subprocess import sys import time from datetime import datetime from pathlib import Path from statistics import mean, median ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) BENCHMARKS_DIR = ROOT / "output" / "benchmarks" GOLD_STANDARD_FILE = ROOT / "data" / "gold_standard" / "_selection.json" INPUT_DIR = ROOT / "input" OUTPUT_DIR = ROOT / "output" / "structured" PY = str(ROOT / ".venv" / "bin" / "python3") # --------------------------------------------------------------------------- # Sélection des dossiers # --------------------------------------------------------------------------- def _gold_standard_ids() -> list[str]: """Charge les IDs du gold standard.""" if not GOLD_STANDARD_FILE.exists(): print(f"ERREUR: {GOLD_STANDARD_FILE} introuvable") sys.exit(1) data = json.loads(GOLD_STANDARD_FILE.read_text("utf-8")) # Format: "116_23065570/116_23065570_fusionne_cim10" → on prend la partie avant / return [d.split("/")[0] for d in data["dossiers"]] def select_dossiers(n: int, gold_standard: bool, specific: list[str] | None, seed: int = 42) -> list[str]: """Sélectionne les dossiers à benchmarker.""" if specific: # Vérifier que les dossiers existent valid = [] for d in specific: if (INPUT_DIR / d).is_dir(): valid.append(d) else: print(f" WARN: dossier {d} introuvable dans input/") return valid if gold_standard: ids = _gold_standard_ids() return ids[:n] if n < len(ids) else ids # Sinon : prendre N dossiers depuis input/ (tri déterministe + seed pour reproductibilité) all_dirs = sorted( d.name for d in INPUT_DIR.iterdir() if d.is_dir() and any(d.glob("*.pdf")) ) if not all_dirs: print("ERREUR: aucun dossier avec PDF dans input/") sys.exit(1) import random rng = random.Random(seed) rng.shuffle(all_dirs) return all_dirs[:n] # --------------------------------------------------------------------------- # Exécution pipeline # --------------------------------------------------------------------------- def run_pipeline(dossier_id: str, clean: bool) -> tuple[float, bool]: """Exécute le pipeline sur un dossier. Retourne (durée_s, succès).""" input_path = INPUT_DIR / dossier_id if clean: for subdir in ["structured", "reports", "anonymized"]: target = ROOT / "output" / subdir / dossier_id if target.exists(): shutil.rmtree(target) t0 = time.time() try: result = subprocess.run( [PY, "-m", "src.main", str(input_path)], capture_output=True, text=True, cwd=str(ROOT), timeout=600, # 10 min max par dossier ) duration = time.time() - t0 if result.returncode != 0: print(f" STDERR: {result.stderr[-500:]}") return duration, False return duration, True except subprocess.TimeoutExpired: return time.time() - t0, False except Exception as e: print(f" EXCEPTION: {e}") return time.time() - t0, False # --------------------------------------------------------------------------- # Chargement dictionnaire CIM-10 # --------------------------------------------------------------------------- def load_cim10_dict() -> dict[str, str]: """Charge le dictionnaire CIM-10 (sans passer par le singleton).""" dict_path = ROOT / "data" / "cim10_dict.json" supp_path = ROOT / "data" / "cim10_supplements.json" d = {} if dict_path.exists(): d = json.loads(dict_path.read_text("utf-8")) if supp_path.exists(): for code, label in json.loads(supp_path.read_text("utf-8")).items(): d.setdefault(code, label) return d def normalize_code(code: str) -> str: """K810 → K81.0, k85.1 → K85.1.""" code = code.strip().upper() if len(code) > 3 and "." not in code: code = code[:3] + "." + code[3:] return code def is_valid_code(code: str, cim10: dict[str, str]) -> bool: """Vérifie si un code CIM-10 existe dans le dictionnaire.""" nc = normalize_code(code) return nc in cim10 or code.upper().strip() in cim10 # --------------------------------------------------------------------------- # Analyse d'un dossier # --------------------------------------------------------------------------- def find_merged_json(dossier_id: str) -> Path | None: """Trouve le JSON fusionné d'un dossier.""" d = OUTPUT_DIR / dossier_id if not d.exists(): return None # Chercher le fusionné d'abord fusions = list(d.glob("*fusionne_cim10.json")) if fusions: return fusions[0] # Sinon premier _cim10.json cim10s = list(d.glob("*_cim10.json")) return cim10s[0] if cim10s else None def analyze_dossier(dossier_id: str, cim10: dict[str, str], duration: float) -> dict: """Analyse le JSON de sortie d'un dossier et extrait les métriques.""" result = { "dossier_id": dossier_id, "processing_time_s": round(duration, 1), "success": False, } json_path = find_merged_json(dossier_id) if not json_path: return result try: data = json.loads(json_path.read_text("utf-8")) except (json.JSONDecodeError, OSError): return result result["success"] = True # --- DP --- dp = data.get("diagnostic_principal", {}) dp_code = dp.get("cim10_final") or dp.get("cim10_suggestion") or "" dp_suggestion = dp.get("cim10_suggestion") or "" result["dp"] = { "texte": (dp.get("texte") or "")[:80], "code_suggestion": dp_suggestion, "code_final": dp_code, "confidence": dp.get("cim10_confidence", ""), "has_code": bool(dp_code), "valid_code": is_valid_code(dp_code, cim10) if dp_code else False, "downgraded": bool(dp_code and dp_suggestion and dp_code != dp_suggestion), } # --- DAS --- das_list = data.get("diagnostics_associes", []) das_codes = [] das_conf = {"high": 0, "medium": 0, "low": 0} das_valid = 0 das_no_code = 0 das_downgraded = 0 for d_item in das_list: code = d_item.get("cim10_final") or d_item.get("cim10_suggestion") or "" suggestion = d_item.get("cim10_suggestion") or "" conf = d_item.get("cim10_confidence", "low") if not code: das_no_code += 1 continue das_codes.append(code) if conf in das_conf: das_conf[conf] += 1 if is_valid_code(code, cim10): das_valid += 1 if code and suggestion and code != suggestion: das_downgraded += 1 n_das_with_code = len(das_codes) result["das"] = { "total": len(das_list), "with_code": n_das_with_code, "no_code": das_no_code, "valid": das_valid, "validity_rate": round(das_valid / n_das_with_code, 3) if n_das_with_code else 0, "confidence": das_conf, "downgraded": das_downgraded, "downgrade_rate": round(das_downgraded / n_das_with_code, 3) if n_das_with_code else 0, "codes_uniques": sorted(set(das_codes)), } # --- Metrics du dossier --- metrics = data.get("metrics", {}) result["metrics"] = { "das_active": metrics.get("das_active", 0), "das_removed": metrics.get("das_removed", 0), "das_ruled_out": metrics.get("das_ruled_out", 0), } # --- Veto --- veto = data.get("veto_report", {}) issues = veto.get("issues", []) result["veto"] = { "verdict": veto.get("verdict", "NO_REPORT"), "score": veto.get("score_contestabilite", 0), "issues_count": len(issues), "hard_count": sum(1 for i in issues if i.get("severity") == "HARD"), "top_issues": [i.get("veto", i.get("type", "?")) for i in issues[:5]], } # --- GHM --- ghm = data.get("ghm_estimation") result["ghm"] = { "estimated": ghm is not None and bool(ghm), "cmd": ghm.get("cmd") if ghm else None, "severity": ghm.get("severity") if ghm else None, "ghm": ghm.get("ghm") if ghm else None, } # --- CPAM --- cpam = data.get("controles_cpam", []) result["cpam"] = { "controls_count": len(cpam), "has_response": any(bool(c.get("contre_argumentation")) for c in cpam), "sources_count": sum(len(c.get("sources_reponse", [])) for c in cpam), } # --- Biologie --- bio = data.get("biologie_cle", []) result["biologie"] = { "tests_count": len(bio), "anomalies": sum(1 for b in bio if b.get("anomalie")), } # --- Codes CIM-10 invalides (détail) --- invalid_codes = [] if dp_code and not is_valid_code(dp_code, cim10): invalid_codes.append(f"DP:{dp_code}") for code in das_codes: if not is_valid_code(code, cim10): invalid_codes.append(f"DAS:{code}") result["invalid_codes"] = invalid_codes return result # --------------------------------------------------------------------------- # Agrégation # --------------------------------------------------------------------------- def compute_aggregate(per_dossier: list[dict]) -> dict: """Calcule les métriques agrégées sur tous les dossiers.""" successful = [d for d in per_dossier if d.get("success")] n = len(successful) if n == 0: return {"n_total": len(per_dossier), "n_success": 0} # DP dp_has_code = sum(1 for d in successful if d["dp"]["has_code"]) dp_valid = sum(1 for d in successful if d["dp"]["valid_code"]) dp_conf = {"high": 0, "medium": 0, "low": 0} for d in successful: c = d["dp"]["confidence"] if c in dp_conf: dp_conf[c] += 1 dp_downgraded = sum(1 for d in successful if d["dp"]["downgraded"]) # DAS total_das = sum(d["das"]["total"] for d in successful) total_das_with_code = sum(d["das"]["with_code"] for d in successful) total_das_valid = sum(d["das"]["valid"] for d in successful) total_das_downgraded = sum(d["das"]["downgraded"] for d in successful) das_conf_agg = {"high": 0, "medium": 0, "low": 0} for d in successful: for k in das_conf_agg: das_conf_agg[k] += d["das"]["confidence"].get(k, 0) # Veto verdicts = {} total_hard = 0 for d in successful: v = d["veto"]["verdict"] verdicts[v] = verdicts.get(v, 0) + 1 total_hard += d["veto"]["hard_count"] # GHM ghm_estimated = sum(1 for d in successful if d["ghm"]["estimated"]) # CPAM cpam_total = sum(d["cpam"]["controls_count"] for d in successful) cpam_with_response = sum(1 for d in successful if d["cpam"]["has_response"]) # Temps times = [d["processing_time_s"] for d in successful] times_sorted = sorted(times) p90_idx = int(len(times_sorted) * 0.9) # Codes invalides all_invalid = [] for d in successful: all_invalid.extend(d.get("invalid_codes", [])) return { "n_total": len(per_dossier), "n_success": n, "n_failed": len(per_dossier) - n, "dp": { "has_code_rate": round(dp_has_code / n, 3), "valid_code_rate": round(dp_valid / n, 3), "confidence": dp_conf, "downgraded": dp_downgraded, }, "das": { "total": total_das, "mean_per_dossier": round(total_das / n, 1), "with_code": total_das_with_code, "valid": total_das_valid, "validity_rate": round(total_das_valid / total_das_with_code, 3) if total_das_with_code else 0, "confidence": das_conf_agg, "confidence_high_rate": round(das_conf_agg["high"] / total_das_with_code, 3) if total_das_with_code else 0, "downgraded": total_das_downgraded, "downgrade_rate": round(total_das_downgraded / total_das_with_code, 3) if total_das_with_code else 0, }, "veto": { "verdicts": verdicts, "hard_total": total_hard, "dossiers_with_hard": sum(1 for d in successful if d["veto"]["hard_count"] > 0), }, "ghm": { "estimated_rate": round(ghm_estimated / n, 3), }, "cpam": { "controls_total": cpam_total, "with_response": cpam_with_response, }, "timing": { "mean_s": round(mean(times), 1), "median_s": round(median(times), 1), "p90_s": round(times_sorted[p90_idx], 1) if times_sorted else 0, "total_s": round(sum(times), 1), }, "invalid_codes": all_invalid, "invalid_codes_count": len(all_invalid), } # --------------------------------------------------------------------------- # Rapport texte # --------------------------------------------------------------------------- def _pct(val: float) -> str: return f"{val * 100:.1f}%" def _bar(val: float, width: int = 20) -> str: filled = int(val * width) return "█" * filled + "░" * (width - filled) def generate_report(run_id: str, config: dict, agg: dict, per_dossier: list[dict]) -> str: """Génère un rapport lisible.""" lines = [] w = 66 lines.append("=" * w) lines.append(f" BENCHMARK QUALITÉ T2A — {run_id}") lines.append("=" * w) lines.append(f" Date : {config['timestamp']}") lines.append(f" Modèles : coding={config['models'].get('coding','?')} cpam={config['models'].get('cpam','?')}") lines.append(f" validation={config['models'].get('validation','?')} qc={config['models'].get('qc','?')}") lines.append(f" Dossiers : {agg['n_success']}/{agg['n_total']} traités ({agg.get('n_failed',0)} échecs)") lines.append(f" Durée : {agg['timing']['total_s']:.0f}s ({agg['timing']['mean_s']:.1f}s/dossier)") lines.append("-" * w) # DP dp = agg["dp"] lines.append("") lines.append(" DIAGNOSTIC PRINCIPAL (DP)") lines.append(f" Code obtenu : {_bar(dp['has_code_rate'])} {_pct(dp['has_code_rate'])}") lines.append(f" Code CIM-10 valide : {_bar(dp['valid_code_rate'])} {_pct(dp['valid_code_rate'])}") lines.append(f" Confiance high : {dp['confidence'].get('high',0)}/{agg['n_success']} " f"medium: {dp['confidence'].get('medium',0)} low: {dp['confidence'].get('low',0)}") lines.append(f" Downgrades : {dp['downgraded']}") # DAS das = agg["das"] lines.append("") lines.append(" DIAGNOSTICS ASSOCIÉS (DAS)") lines.append(f" Total : {das['total']} (moy {das['mean_per_dossier']}/dossier)") lines.append(f" Avec code : {das['with_code']}/{das['total']}") lines.append(f" Codes valides : {_bar(das['validity_rate'])} {_pct(das['validity_rate'])}") lines.append(f" Confiance : high={das['confidence']['high']} " f"medium={das['confidence']['medium']} low={das['confidence']['low']}") lines.append(f" Confiance high : {_bar(das['confidence_high_rate'])} {_pct(das['confidence_high_rate'])}") lines.append(f" Downgrades : {das['downgraded']} ({_pct(das['downgrade_rate'])})") # Veto veto = agg["veto"] lines.append("") lines.append(" VETOS / QUALITÉ") for v, count in sorted(veto["verdicts"].items(), key=lambda x: -x[1]): lines.append(f" {v:12s} : {count}") lines.append(f" Issues HARD : {veto['hard_total']} (dans {veto['dossiers_with_hard']} dossiers)") # GHM lines.append("") lines.append(" GHM") lines.append(f" Estimé : {_bar(agg['ghm']['estimated_rate'])} {_pct(agg['ghm']['estimated_rate'])}") # CPAM if agg["cpam"]["controls_total"] > 0: lines.append("") lines.append(" CPAM") lines.append(f" Contrôles : {agg['cpam']['controls_total']}") lines.append(f" Avec réponse : {agg['cpam']['with_response']}") # Temps lines.append("") lines.append(" TEMPS DE TRAITEMENT") lines.append(f" Moyen : {agg['timing']['mean_s']:.1f}s") lines.append(f" Médian : {agg['timing']['median_s']:.1f}s") lines.append(f" P90 : {agg['timing']['p90_s']:.1f}s") lines.append(f" Total : {agg['timing']['total_s']:.0f}s") # Codes invalides if agg["invalid_codes"]: lines.append("") lines.append(f" CODES CIM-10 INVALIDES ({agg['invalid_codes_count']})") for code in agg["invalid_codes"][:20]: lines.append(f" {code}") if agg["invalid_codes_count"] > 20: lines.append(f" ... et {agg['invalid_codes_count'] - 20} autres") # Détail par dossier lines.append("") lines.append("-" * w) lines.append(" DÉTAIL PAR DOSSIER") lines.append("-" * w) lines.append(f" {'Dossier':<25s} {'DP':>6s} {'DAS':>4s} {'Valid%':>7s} {'Veto':>10s} {'Temps':>6s}") lines.append(f" {'-'*25:<25s} {'-'*6:>6s} {'-'*4:>4s} {'-'*7:>7s} {'-'*10:>10s} {'-'*6:>6s}") for d in sorted(per_dossier, key=lambda x: x["dossier_id"]): if not d.get("success"): lines.append(f" {d['dossier_id']:<25s} {'ÉCHEC':>6s}") continue dp_code = d["dp"]["code_final"] or "-" dp_mark = "✓" if d["dp"]["valid_code"] else "✗" n_das = d["das"]["total"] vr = f"{d['das']['validity_rate']*100:.0f}%" if d["das"]["with_code"] else "-" verdict = d["veto"]["verdict"] t = f"{d['processing_time_s']:.0f}s" lines.append(f" {d['dossier_id']:<25s} {dp_code:>5s}{dp_mark} {n_das:>4d} {vr:>7s} {verdict:>10s} {t:>6s}") lines.append("") lines.append("=" * w) return "\n".join(lines) # --------------------------------------------------------------------------- # Comparaison entre runs # --------------------------------------------------------------------------- def compare_runs(current_agg: dict, baseline_agg: dict, baseline_id: str) -> str: """Compare deux runs et génère un rapport diff.""" lines = [] w = 66 lines.append("") lines.append("=" * w) lines.append(f" COMPARAISON avec {baseline_id}") lines.append("=" * w) def _delta(cur: float, base: float, is_pct: bool = True) -> str: d = cur - base sign = "+" if d >= 0 else "" if is_pct: return f"{sign}{d*100:.1f}%" return f"{sign}{d:.1f}" def _row(label: str, cur_val: float, base_val: float, is_pct: bool = True): if is_pct: cur_s = _pct(cur_val) base_s = _pct(base_val) else: cur_s = f"{cur_val:.1f}" base_s = f"{base_val:.1f}" delta_s = _delta(cur_val, base_val, is_pct) lines.append(f" {label:<24s} {base_s:>10s} {cur_s:>10s} {delta_s:>10s}") lines.append(f" {'Métrique':<24s} {'Baseline':>10s} {'Actuel':>10s} {'Delta':>10s}") lines.append(f" {'-'*24:<24s} {'-'*10:>10s} {'-'*10:>10s} {'-'*10:>10s}") _row("DP code valide", current_agg["dp"]["valid_code_rate"], baseline_agg["dp"]["valid_code_rate"]) _row("DAS validité", current_agg["das"]["validity_rate"], baseline_agg["das"]["validity_rate"]) _row("DAS confiance high", current_agg["das"]["confidence_high_rate"], baseline_agg["das"]["confidence_high_rate"]) _row("DAS downgrade", current_agg["das"]["downgrade_rate"], baseline_agg["das"]["downgrade_rate"]) _row("GHM estimé", current_agg["ghm"]["estimated_rate"], baseline_agg["ghm"]["estimated_rate"]) _row("DAS moy/dossier", current_agg["das"]["mean_per_dossier"], baseline_agg["das"]["mean_per_dossier"], is_pct=False) _row("Temps moyen (s)", current_agg["timing"]["mean_s"], baseline_agg["timing"]["mean_s"], is_pct=False) # Codes invalides cur_inv = set(current_agg.get("invalid_codes", [])) base_inv = set(baseline_agg.get("invalid_codes", [])) new_inv = cur_inv - base_inv fixed_inv = base_inv - cur_inv if new_inv: lines.append(f"\n Nouveaux codes invalides : {', '.join(sorted(new_inv))}") if fixed_inv: lines.append(f" Codes corrigés : {', '.join(sorted(fixed_inv))}") lines.append("=" * w) return "\n".join(lines) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def get_current_config() -> dict: """Récupère la configuration modèle actuelle.""" try: from src.config import OLLAMA_MODELS, OLLAMA_MODEL, OLLAMA_URL return { "models": dict(OLLAMA_MODELS), "ollama_model": OLLAMA_MODEL, "ollama_url": OLLAMA_URL, } except ImportError: return { "models": { "coding": os.environ.get("T2A_MODEL_CODING", "?"), "cpam": os.environ.get("T2A_MODEL_CPAM", "?"), "validation": os.environ.get("T2A_MODEL_VALIDATION", "?"), "qc": os.environ.get("T2A_MODEL_QC", "?"), }, "ollama_model": os.environ.get("OLLAMA_MODEL", "?"), } def main(): parser = argparse.ArgumentParser(description="Benchmark qualité T2A") parser.add_argument("--n", type=int, default=10, help="Nombre de dossiers") parser.add_argument("--dossiers", type=str, help="IDs séparés par des virgules") parser.add_argument("--gold-standard", action="store_true", help="Utiliser les 50 dossiers gold standard") parser.add_argument("--compare", type=str, help="Run ID à comparer") parser.add_argument("--label", type=str, default="", help="Label pour ce run") parser.add_argument("--no-reprocess", action="store_true", help="Analyser les outputs existants sans relancer le pipeline") parser.add_argument("--clean", action="store_true", help="Supprimer les outputs avant retraitement") parser.add_argument("--seed", type=int, default=42, help="Seed pour la sélection aléatoire") parser.add_argument("--workers", type=int, default=1, help="Nombre de dossiers traités en parallèle") args = parser.parse_args() # Sélection dossiers specific = args.dossiers.split(",") if args.dossiers else None dossiers = select_dossiers(args.n, args.gold_standard, specific, args.seed) print(f"\n Dossiers sélectionnés : {len(dossiers)}") for d in dossiers: print(f" - {d}") # Config config = get_current_config() run_id = datetime.now().strftime("%Y%m%d_%H%M%S") if args.label: run_id = f"{run_id}_{args.label}" config["timestamp"] = datetime.now().isoformat() config["run_id"] = run_id config["dossiers"] = dossiers config["args"] = { "n": args.n, "gold_standard": args.gold_standard, "clean": args.clean, "no_reprocess": args.no_reprocess, "seed": args.seed, "label": args.label, } print(f"\n Run ID : {run_id}") print(f" Modèles : {config['models']}") print(f" Reprocess: {'NON' if args.no_reprocess else 'OUI (clean=' + str(args.clean) + ')'}") print() # Charger dictionnaire CIM-10 cim10 = load_cim10_dict() print(f" Dictionnaire CIM-10 : {len(cim10)} codes") print() # Traitement per_dossier = [] total = len(dossiers) if args.workers > 1 and not args.no_reprocess: # Mode parallèle : exécuter les pipelines en parallèle puis analyser from concurrent.futures import ThreadPoolExecutor, as_completed print(f" Mode parallèle : {args.workers} workers") pipeline_results: dict[str, tuple[float, bool]] = {} done = 0 with ThreadPoolExecutor(max_workers=args.workers) as executor: futures = { executor.submit(run_pipeline, dossier_id, args.clean): dossier_id for dossier_id in dossiers } for future in as_completed(futures): dossier_id = futures[future] try: duration, success = future.result() except Exception as e: print(f" EXCEPTION {dossier_id}: {e}") duration, success = 0.0, False pipeline_results[dossier_id] = (duration, success) done += 1 mark = "✓" if success else "✗" print(f" [{done}/{total}] {dossier_id} — {duration:.1f}s {mark}") # Analyse séquentielle (ordre stable) for dossier_id in dossiers: duration, success = pipeline_results[dossier_id] metrics = analyze_dossier(dossier_id, cim10, duration) per_dossier.append(metrics) else: # Mode séquentiel (ou --no-reprocess) for i, dossier_id in enumerate(dossiers, 1): print(f" [{i}/{total}] {dossier_id}", end="", flush=True) if args.no_reprocess: duration = 0.0 success = find_merged_json(dossier_id) is not None if not success: print(" — pas de JSON") else: print(" — analyse existant") else: print(" — traitement...", end="", flush=True) duration, success = run_pipeline(dossier_id, args.clean) print(f" {duration:.1f}s {'✓' if success else '✗'}") metrics = analyze_dossier(dossier_id, cim10, duration) per_dossier.append(metrics) # Agrégation agg = compute_aggregate(per_dossier) # Rapport report = generate_report(run_id, config, agg, per_dossier) print(report) # Comparaison si demandée comparison = "" if args.compare: baseline_path = BENCHMARKS_DIR / args.compare / "metrics.json" if baseline_path.exists(): baseline = json.loads(baseline_path.read_text("utf-8")) comparison = compare_runs(agg, baseline["aggregate"], args.compare) print(comparison) else: print(f"\n WARN: run baseline {args.compare} introuvable ({baseline_path})") # Sauvegarde run_dir = BENCHMARKS_DIR / run_id run_dir.mkdir(parents=True, exist_ok=True) (run_dir / "config.json").write_text( json.dumps(config, ensure_ascii=False, indent=2), encoding="utf-8" ) (run_dir / "metrics.json").write_text( json.dumps({"aggregate": agg, "per_dossier": per_dossier}, ensure_ascii=False, indent=2), encoding="utf-8", ) (run_dir / "report.txt").write_text(report + comparison, encoding="utf-8") print(f"\n Résultats sauvegardés dans : {run_dir}") print(f" Pour comparer un futur run : python scripts/benchmark_quality.py --compare {run_id}") if __name__ == "__main__": main()