diff --git a/core/llm/t2a_mappings.py b/core/llm/t2a_mappings.py new file mode 100644 index 000000000..d87164b0f --- /dev/null +++ b/core/llm/t2a_mappings.py @@ -0,0 +1,29 @@ +"""Mappings métier pour la décision T2A (libellé terrain → catégorie T2A). + +Table validée par Dom le 12 mai 2026 sur les 11 dossiers POC GHT Sud 95. +Module partagé : importé par `_handle_t2a_decision_action` (garde-fou serveur, +commit 2) ET par `scripts/bench_t2a_dryrun.py` (mini-bench standalone). + +Cas spéciaux non mappés volontairement : +- "Retour structure d'origine" : à arbitrer cliniquement (email DIM + Pauline/Amina en cours). Loguera `Libellé terrain non mappé` sans erreur. +- "" (chaîne vide) : dossier en attente. Skip en amont par l'appelant. + +Si un nouveau libellé apparaît hors POC actuel (déploiement futur autre +établissement), le bench le logguera `Libellé terrain non mappé` et il sera +ajouté ici après validation Dom. + +Voir docs/handoffs/2026-05-12_brief_S1_build_dpi_enriched.md. +""" + +from __future__ import annotations + +from typing import Dict + + +TERRAIN_VERS_T2A: Dict[str, str] = { + "Consultation externe": "FORFAIT_URGENCE", + "Hospitalisation": "REQUALIFICATION_HOSPITALISATION", + "Sortie après surveillance UHCD": "REQUALIFICATION_HOSPITALISATION", + "Transfert intra-hospitalier": "REQUALIFICATION_HOSPITALISATION", +} diff --git a/scripts/bench_t2a_dryrun.py b/scripts/bench_t2a_dryrun.py new file mode 100644 index 000000000..f42ed484d --- /dev/null +++ b/scripts/bench_t2a_dryrun.py @@ -0,0 +1,686 @@ +"""Mini-bench standalone T2A — 11 dossiers POC GHT Sud 95. + +Exécute `build_dpi_enriched` + appel LLM sur chaque dossier de `data.js`, +sans passer par Demo_urgence_2 ni Léa/Windows. Mesure la convergence +durée Python ↔ LLM et décision terrain ↔ LLM. + +Voir docs/handoffs/2026-05-12_brief_S1_build_dpi_enriched.md. + +Usage : + T2A_DRYRUN=1 python scripts/bench_t2a_dryrun.py + T2A_DRYRUN=1 python scripts/bench_t2a_dryrun.py --skip-seed-check + T2A_DRYRUN=1 python scripts/bench_t2a_dryrun.py --ipp 25003284 # 1 seul + +Logs : + logs/t2a_dryrun/_.log (4 traces structurées par dossier) + scripts/bench_t2a_dryrun_.csv (récap pour analyse) +""" + +from __future__ import annotations + +import argparse +import csv +import html +import json +import os +import re +import subprocess +import sys +import time +import urllib.error +import urllib.request +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(ROOT)) + +from core.llm.t2a_decision import build_dpi_enriched, PROMPT_TEMPLATE # noqa: E402 +from core.llm.t2a_mappings import TERRAIN_VERS_T2A # noqa: E402 + + +# ── Configuration ─────────────────────────────────────────────────────────── + +DATA_JS_PATH = ROOT / "docs" / "clients" / "ght_sud_95" / "mockup_easily_assure" / "data.js" +LOG_DIR = ROOT / "logs" / "t2a_dryrun" +SCRIPTS_DIR = ROOT / "scripts" + +OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434/api/generate") +MODEL = os.environ.get("T2A_BENCH_MODEL", "gemma4:31b-cloud") +SEED = 42 +TEMPERATURE = 0.0 +NUM_CTX = 16384 +NUM_PREDICT = 1500 +TIMEOUT_SEC = 180 + +SEUIL_TOLERANCE_HEURES = 0.5 # ≈ 30 min, distingue hallucination grossière + + +# ── Parsing data.js ───────────────────────────────────────────────────────── + + +def parse_data_js(path: Path) -> Dict[str, Dict[str, Any]]: + """Parse data.js via node (vm.runInContext) → dict {IPP: dossier}.""" + if not path.exists(): + raise FileNotFoundError(f"data.js introuvable : {path}") + + script = f""" +const fs = require('fs'); +const vm = require('vm'); +const src = fs.readFileSync({json.dumps(str(path))}, 'utf8'); +const ctx = {{}}; +vm.createContext(ctx); +vm.runInContext(src + '\\nthis.DOSSIERS = DOSSIERS;', ctx); +process.stdout.write(JSON.stringify(ctx.DOSSIERS)); +""" + result = subprocess.run( + ["node", "-e", script], + capture_output=True, + text=True, + timeout=15, + ) + if result.returncode != 0: + raise RuntimeError(f"node parse failed: {result.stderr}") + return json.loads(result.stdout) + + +# ── Construction dpi_raw plat (simulation OCR scroll auto) ───────────────── + + +_HTML_TAG_RE = re.compile(r"<[^>]+>") + + +def strip_html(text: Any) -> str: + """Retire les balises HTML simples et déspécifie les entités.""" + if text is None: + return "" + s = str(text) + s = _HTML_TAG_RE.sub("", s) + return html.unescape(s).strip() + + +def _bandeau(dossier: Dict[str, Any]) -> str: + """Reconstruit le bandeau Easily Assure (header fixe répété par onglet).""" + ident = dossier.get("identite", {}) or {} + pas = dossier.get("passage", {}) or {} + parts = [ + f"IPP : {ident.get('ipp', '')}", + f"{ident.get('nom', '')} {ident.get('prenom', '')}".strip(), + f"Né(e) le {ident.get('ne_le', '')}", + f"{ident.get('age', '')}", + f"Sexe : {ident.get('sexe', '')}", + f"Arrivée : {pas.get('arrivee', '')}", + f"IAO : {pas.get('iao', '')} ({pas.get('iao_heure', '')})", + f"Médecin : {pas.get('medecin', '')}", + f"Sortie : {pas.get('sortie', '')}" if pas.get('sortie') else "", + ] + return " | ".join(p for p in parts if p) + + +def _section_synthese(syn: Dict[str, Any]) -> str: + """Reproduit le bloc Synthèse Urgences au format LIBELLÉ VALEUR.""" + if not syn: + return "" + + def f(label: str, value: Any) -> str: + v = strip_html(value).strip() if value not in (None, "") else "" + return f"{label} {v}" if v else label + + def fdt(label: str, date: Any, heure: Any) -> str: + if date and heure: + return f"{label} {date} à {heure}" + if date: + return f"{label} {date}" + return label + + lignes = [ + "Synthèse Urgences", + "", + "Détails de l'épisode", + fdt("Episode - Date", syn.get("episode_date"), syn.get("episode_heure")), + f("Mode de transport à l'arrivée", syn.get("mode_transport")), + f("Médicalisation du transport", syn.get("medicalisation_transport")), + f("Mode d'entrée", syn.get("mode_entree")), + f("Origine du transfert", syn.get("origine_transfert")), + "", + "Détails de l'orientation aux Urgences", + fdt("Date d'orientation", syn.get("orientation_date"), syn.get("orientation_heure")), + f("IAO", syn.get("iao")), + f("Priorité", syn.get("priorite")), + f("Episode - Sous-type", syn.get("sous_type")), + f("Circonstances", syn.get("circonstances")), + f("Motif de prise en charge", syn.get("motif_pec")), + f("Observ. IDE Urg", syn.get("obs_ide_urg")), + "", + "Détails de la prise en charge", + f("Médecin de la prise en charge médicale", syn.get("medecin_pec")), + fdt("Date de la prise en charge médicale", syn.get("pec_date"), syn.get("pec_heure")), + f("CCMU", syn.get("ccmu")), + f("GEMSA", syn.get("gemsa")), + f("Diagnostics", syn.get("diagnostics_synthese")), + "", + "Décision médicale", + f("Médecin de la décision médicale", syn.get("medecin_decision")), + fdt("Date de décision médicale", syn.get("decision_date"), syn.get("decision_heure")), + f("Décision médicale", syn.get("decision")), + f("Orientation du patient", syn.get("orientation")), + f("US de destination", syn.get("us_destination")), + ] + return "\n".join(lignes) + + +def build_dpi_raw_from_dossier(dossier: Dict[str, Any]) -> str: + """Construit un dpi_raw plat simulant la sortie OCR scroll auto (5 onglets, + bandeau répété en tête de chaque). NE bypasse PAS build_dpi_enriched — + le résultat est un texte plat que build_dpi_enriched re-parsera. + """ + bandeau = _bandeau(dossier) + motif = dossier.get("motif", {}) or {} + examens = dossier.get("examens", {}) or {} + notes_med = dossier.get("notes_medicales", []) or [] + syn = dossier.get("synthese", {}) or {} + + # Onglet 1 : Motif & IDE + diags = motif.get("diagnostics", []) or [] + diag_principal = diags[0].get("code") if diags else "" + onglet1 = [ + bandeau, + "", + "Motif d'admission :", + f"Motif : {motif.get('symptomes_orientation', '')}".strip(), + "", + "Observ. IDE :", + strip_html(motif.get("obs_ide", "")), + "", + f"Diagnostic principal : {diag_principal}" if diag_principal else "", + ] + + # Onglet 2 : Examens cliniques + questionnaires = examens.get("questionnaires", []) or [] + notes_para = examens.get("notes_paramedicales", []) or [] + onglet2 = [bandeau, "", "Examens cliniques :"] + for q in questionnaires: + onglet2.append(f"--- {q.get('nom', '')} ---") + onglet2.append(strip_html(q.get("reponse", ""))) + for n in notes_para: + onglet2.append(f"--- {n.get('type', '')} ({n.get('par', '')}) ---") + onglet2.append(strip_html(n.get("contenu", ""))) + + # Onglet 3 : Imagerie (extrait depuis notes_medicales si présent) + imagerie_notes = [ + n for n in notes_med + if "imagerie" in str(n.get("type", "")).lower() + or "RESULTATS" in str(n.get("contenu", "")) + or "radiograph" in str(n.get("contenu", "")).lower() + ] + onglet3 = [bandeau, "", "Imagerie :"] + if imagerie_notes: + for n in imagerie_notes: + onglet3.append(strip_html(n.get("contenu", ""))) + else: + onglet3.append("(pas d'imagerie)") + + # Onglet 4 : Notes médicales (toutes, sauf imagerie déjà mise au-dessus) + onglet4 = [bandeau, "", "Notes médicales :"] + for n in notes_med: + onglet4.append(f"--- {n.get('type', '')} ({n.get('date', '')}) ---") + onglet4.append(strip_html(n.get("contenu", ""))) + + # Onglet 5 : Synthèse Urgences + onglet5 = [bandeau, "", _section_synthese(syn)] + + return "\n".join( + line for section in (onglet1, onglet2, onglet3, onglet4, onglet5) + for line in section + ) + + +# ── Filet data_quality_warning ───────────────────────────────────────────── + + +def check_data_quality(dossier: Dict[str, Any]) -> List[str]: + """Détecte des incohérences cliniques flagrantes (filet, pas exhaustif). + Ne corrige rien — signale uniquement, pour ne pas confondre une divergence + LLM↔terrain avec un défaut du data.js (anonymisation v1 incertaine). + """ + warnings: List[str] = [] + ident = dossier.get("identite", {}) or {} + motif = dossier.get("motif", {}) or {} + syn = dossier.get("synthese", {}) or {} + + # Cohérence âge déclaré vs date de naissance + date du passage + ne_le = ident.get("ne_le", "") + age_declare = ident.get("age", "") + if ne_le and age_declare: + m_age = re.match(r"(\d+)", age_declare) + if m_age: + try: + naissance = datetime.strptime(ne_le, "%d/%m/%Y") + arrivee_str = (dossier.get("passage", {}) or {}).get("arrivee", "") + if arrivee_str: + arrivee = datetime.strptime( + arrivee_str.split()[0], "%d/%m/%Y" + ) + age_calcule = (arrivee - naissance).days // 365 + age_dit = int(m_age.group(1)) + if abs(age_calcule - age_dit) > 1: + warnings.append( + f"âge incohérent : déclaré {age_dit} ans vs " + f"calculé ~{age_calcule} ans depuis {ne_le}" + ) + except ValueError: + pass + + # Cohérence motif court vs diagnostic principal (rapide) + motif_court = (dossier.get("passage", {}) or {}).get("motif_court", "").lower() + diags = motif.get("diagnostics", []) or [] + if motif_court and diags: + diag_code = str(diags[0].get("code", "")).lower() + # Détection grossière : motif "fracture" alors que diag pneumopathie, etc. + groupes_a_risque = [ + ("fracture", "pneumop"), + ("pneumop", "fracture"), + ("ophtalmo", "thora"), + ] + for m_key, d_key in groupes_a_risque: + if m_key in motif_court and d_key in diag_code: + warnings.append( + f"motif '{motif_court}' incohérent avec diagnostic " + f"principal '{diags[0].get('code', '')}'" + ) + break + + # Décision terrain vide + if syn and not syn.get("decision"): + warnings.append("synthese.decision vide (dossier en cours ?)") + + return warnings + + +# ── Appel LLM déterministe (temp=0, seed=42) ─────────────────────────────── + + +def call_llm_deterministic( + dpi_enriched: str, + model: str = MODEL, + seed: int = SEED, +) -> Tuple[Optional[Dict[str, Any]], Dict[str, Any], float, Optional[str]]: + """Appel Ollama avec température 0 + seed fixé pour reproductibilité. + + Returns: + (parsed_json, raw_response_dict, elapsed_s, error_msg) + parsed_json est None si parse_error. + raw_response_dict est le body Ollama brut (avec eval_count, etc.). + error_msg non-None si exception HTTP / timeout / parse. + """ + prompt = PROMPT_TEMPLATE.format(dpi=dpi_enriched) + payload = { + "model": model, + "prompt": prompt, + "stream": False, + "format": "json", + "keep_alive": "5m", + "options": { + "temperature": TEMPERATURE, + "seed": seed, + "num_predict": NUM_PREDICT, + "num_ctx": NUM_CTX, + }, + } + data = json.dumps(payload).encode("utf-8") + req = urllib.request.Request( + OLLAMA_URL, + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + t0 = time.time() + try: + with urllib.request.urlopen(req, timeout=TIMEOUT_SEC) as resp: + body = json.loads(resp.read().decode("utf-8")) + except (urllib.error.URLError, TimeoutError, ConnectionError) as e: + return None, {}, round(time.time() - t0, 1), f"HTTP/Network: {e}" + + elapsed = round(time.time() - t0, 1) + raw_response = body.get("response", "").strip() + + try: + # Nettoyer ```json ... ``` éventuels + cleaned = raw_response + if cleaned.startswith("```"): + cleaned = cleaned.split("\n", 1)[-1] + if cleaned.endswith("```"): + cleaned = cleaned.rsplit("```", 1)[0] + cleaned = cleaned.strip() + parsed = json.loads(cleaned) + return parsed, body, elapsed, None + except json.JSONDecodeError as e: + return None, body, elapsed, f"parse_error: {e}" + + +# ── Logging des 4 traces structurées ─────────────────────────────────────── + + +def _serialize_for_log(obj: Any) -> Any: + """Convertit datetime en str pour JSON serialization.""" + if isinstance(obj, datetime): + return obj.isoformat() + if isinstance(obj, dict): + return {k: _serialize_for_log(v) for k, v in obj.items()} + if isinstance(obj, list): + return [_serialize_for_log(v) for v in obj] + return obj + + +def write_dryrun_log( + ipp: str, + metadata: Dict[str, Any], + prompt: str, + response_body: Dict[str, Any], + parsed: Optional[Dict[str, Any]], + error_msg: Optional[str], + quality_warnings: List[str], + elapsed_s: float, +) -> Path: + """Écrit les 4 traces structurées (markdown) dans logs/t2a_dryrun/.""" + LOG_DIR.mkdir(parents=True, exist_ok=True) + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + log_path = LOG_DIR / f"{ipp}_{ts}.log" + + sections = [] + sections.append(f"# Mini-bench T2A dry-run — IPP {ipp} — {ts}\n") + sections.append(f"Modèle : `{MODEL}` | Seed : `{SEED}` | Température : `{TEMPERATURE}`\n") + if quality_warnings: + sections.append("## data_quality_warning") + for w in quality_warnings: + sections.append(f"- {w}") + sections.append("") + + sections.append("## [t2a_dryrun_metadata]\n") + sections.append("```json") + sections.append(json.dumps(_serialize_for_log(metadata), ensure_ascii=False, indent=2)) + sections.append("```\n") + + sections.append("## [t2a_dryrun_prompt]\n") + sections.append("```") + sections.append(prompt) + sections.append("```\n") + + if error_msg: + sections.append("## [t2a_dryrun_error]\n") + sections.append(f"- Erreur : `{error_msg}`") + sections.append(f"- Latence : {elapsed_s}s") + if response_body: + sections.append("- Body partiel :") + sections.append("```json") + sections.append(json.dumps(response_body, ensure_ascii=False, indent=2)[:2000]) + sections.append("```") + else: + sections.append("## [t2a_dryrun_response]\n") + sections.append(f"- Modèle : {response_body.get('model', MODEL)}") + sections.append(f"- Latence totale : {elapsed_s}s") + sections.append(f"- eval_count : {response_body.get('eval_count')}") + sections.append(f"- eval_duration : {response_body.get('eval_duration')}") + sections.append(f"- prompt_eval_count : {response_body.get('prompt_eval_count')}") + sections.append(f"- prompt_eval_duration : {response_body.get('prompt_eval_duration')}") + sections.append("\n### JSON parsé\n") + sections.append("```json") + sections.append(json.dumps(parsed, ensure_ascii=False, indent=2)) + sections.append("```") + + log_path.write_text("\n".join(sections), encoding="utf-8") + return log_path + + +# ── Vérification empirique du seed ────────────────────────────────────────── + + +def verify_seed_respect(sample_prompt: str) -> Optional[str]: + """2 appels successifs avec même prompt+seed. Compare les réponses. + Retourne None si seed respecté, sinon un message expliquant le bruit. + """ + print("[seed-check] Vérification empirique seed (2 appels) …", flush=True) + p1, _, t1, e1 = call_llm_deterministic(sample_prompt) + p2, _, t2, e2 = call_llm_deterministic(sample_prompt) + if e1 or e2: + return f"erreur lors du check seed : {e1 or e2}" + if p1 is None or p2 is None: + return "parse_error sur l'un des 2 appels seed-check" + # Compare les champs structurés (décision + durée + justification) + fields = ["decision", "decision_court", "duree_passage_heures", "justification"] + diffs = {f: (p1.get(f), p2.get(f)) for f in fields if p1.get(f) != p2.get(f)} + if diffs: + return ( + f"seed NON respecté ({MODEL}) — divergences sur " + f"{list(diffs.keys())} entre 2 appels successifs. " + f"Bruit résiduel attendu dans le bench." + ) + print(f"[seed-check] OK seed respecté ({t1}s + {t2}s)", flush=True) + return None + + +# ── Boucle principale ────────────────────────────────────────────────────── + + +def run_bench(filter_ipp: Optional[str] = None, skip_seed_check: bool = False) -> int: + """Exécute le bench complet. Retourne code de sortie (0 = succès).""" + if os.environ.get("T2A_DRYRUN") != "1": + print("ERREUR : T2A_DRYRUN=1 doit être défini pour exécuter le bench.", file=sys.stderr) + return 2 + + print(f"=== Mini-bench T2A — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===") + print(f"Source : {DATA_JS_PATH}") + print(f"Modèle : {MODEL} | Seed : {SEED} | Température : {TEMPERATURE}") + print(f"Logs : {LOG_DIR}/") + print() + + try: + dossiers = parse_data_js(DATA_JS_PATH) + except Exception as e: + print(f"ERREUR parsing data.js : {e}", file=sys.stderr) + return 3 + print(f"Dossiers parsés : {len(dossiers)}") + + # Filtrage éventuel + if filter_ipp: + if filter_ipp not in dossiers: + print(f"IPP {filter_ipp} introuvable.", file=sys.stderr) + return 4 + dossiers = {filter_ipp: dossiers[filter_ipp]} + + # Vérification seed (sur le 1er dossier) + seed_warning: Optional[str] = None + if not skip_seed_check and not filter_ipp: + first_ipp = next(iter(dossiers)) + sample_dpi = build_dpi_raw_from_dossier(dossiers[first_ipp]) + sample_enriched, _ = build_dpi_enriched(sample_dpi) + sample_prompt = PROMPT_TEMPLATE.format(dpi=sample_enriched) + seed_warning = verify_seed_respect(sample_prompt) + if seed_warning: + print(f"⚠️ {seed_warning}", flush=True) + print() + + rows: List[Dict[str, Any]] = [] + + for ipp, dossier in dossiers.items(): + if dossier.get("statut_attente") is True: + print(f"[{ipp}] SKIP (statut_attente=true)") + continue + + nom = (dossier.get("identite") or {}).get("nom", "") + print(f"[{ipp}] {nom} — construction dpi_raw …", flush=True) + + dpi_raw = build_dpi_raw_from_dossier(dossier) + dpi_enriched, metadata = build_dpi_enriched(dpi_raw) + quality_warnings = check_data_quality(dossier) + prompt = PROMPT_TEMPLATE.format(dpi=dpi_enriched) + + print(f"[{ipp}] LLM call …", flush=True) + parsed, body, elapsed, error = call_llm_deterministic(dpi_enriched) + + log_path = write_dryrun_log( + ipp=ipp, + metadata=metadata, + prompt=prompt, + response_body=body, + parsed=parsed, + error_msg=error, + quality_warnings=quality_warnings, + elapsed_s=elapsed, + ) + + # Comparaisons + duree_python = metadata.get("duree_heures_decimales") + duree_llm = parsed.get("duree_passage_heures") if parsed else None + conv_duree = None + if duree_python is not None and duree_llm is not None: + conv_duree = abs(duree_llm - duree_python) <= SEUIL_TOLERANCE_HEURES + + decision_terrain = metadata.get("decision_terrain") + decision_llm = parsed.get("decision") if parsed else None + mapping_attendu = TERRAIN_VERS_T2A.get(decision_terrain) if decision_terrain else None + conv_decision = None + if mapping_attendu is not None and decision_llm is not None: + conv_decision = (decision_llm == mapping_attendu) + + rows.append({ + "ipp": ipp, + "nom": nom, + "duree_python": duree_python, + "duree_llm": duree_llm, + "conv_duree": conv_duree, + "decision_llm": decision_llm, + "decision_terrain": decision_terrain, + "mapping_attendu": mapping_attendu if mapping_attendu else "(non mappé)", + "conv_decision": conv_decision, + "error": error, + "quality_warnings": quality_warnings, + "elapsed_s": elapsed, + "log_path": str(log_path.relative_to(ROOT)), + }) + print(f"[{ipp}] done — duree_py={duree_python} duree_llm={duree_llm} " + f"decision={decision_llm} ({elapsed}s) → {log_path.name}") + + print() + _print_recap(rows, seed_warning) + _write_csv(rows) + return 0 + + +def _print_recap(rows: List[Dict[str, Any]], seed_warning: Optional[str]) -> None: + """Affiche le tableau récap + stats agrégées.""" + print("=" * 130) + header = ( + "| IPP | Nom | duree_py | duree_llm | conv_dur | " + "decision_llm | decision_terrain | " + "mapping_attendu | conv_dec |" + ) + print(header) + print("|" + "-" * (len(header) - 2) + "|") + + for r in rows: + cd = "—" if r["conv_duree"] is None else ("OK" if r["conv_duree"] else "❌") + cdec = "—" if r["conv_decision"] is None else ("OK" if r["conv_decision"] else "❌") + print( + f"| {r['ipp']:<10} | {r['nom'][:11]:<11} | " + f"{str(r['duree_python'] or '—'):<8} | {str(r['duree_llm'] or '—'):<9} | " + f"{cd:<8} | {str(r['decision_llm'] or '—')[:28]:<28} | " + f"{str(r['decision_terrain'] or '—')[:30]:<30} | " + f"{str(r['mapping_attendu'])[:29]:<29} | {cdec:<8} |" + ) + print("=" * 130) + print() + + total = len(rows) + duree_eligible = [r for r in rows if r["conv_duree"] is not None] + duree_ok = sum(1 for r in duree_eligible if r["conv_duree"]) + dec_eligible = [r for r in rows if r["conv_decision"] is not None] + dec_ok = sum(1 for r in dec_eligible if r["conv_decision"]) + + print("STATS AGRÉGÉES :") + print(f" Dossiers traités : {total}") + print(f" Convergence durée : {duree_ok}/{len(duree_eligible)} " + f"(seuil tolérance {SEUIL_TOLERANCE_HEURES}h)") + print(f" Convergence décision : {dec_ok}/{len(dec_eligible)} (sur libellés mappés)") + non_mappes = [r for r in rows if r["mapping_attendu"] == "(non mappé)"] + if non_mappes: + print(f" Non mappés : {len(non_mappes)} " + f"({', '.join(r['decision_terrain'] for r in non_mappes)})") + errors = [r for r in rows if r["error"]] + if errors: + print(f" Erreurs API : {len(errors)}") + for r in errors: + print(f" [{r['ipp']}] {r['error']}") + + divergences = [r for r in rows if r["conv_duree"] is False or r["conv_decision"] is False] + if divergences: + print() + print("DIVERGENCES — détail :") + for r in divergences: + print(f" [{r['ipp']} {r['nom']}]") + if r["conv_duree"] is False: + print(f" durée : Python={r['duree_python']}h vs LLM={r['duree_llm']}h") + if r["conv_decision"] is False: + print(f" décision : LLM={r['decision_llm']} vs attendu={r['mapping_attendu']}") + print(f" log : {r['log_path']}") + + if seed_warning: + print() + print(f"⚠️ SEED : {seed_warning}") + print(" → les divergences peuvent venir du bruit cloud, pas du LLM.") + + print() + # Recommandation + if duree_eligible and duree_ok == len(duree_eligible): + print("RECO : FAITS_CALCULÉS bien lu par le LLM sur la durée — " + "PROMPT 3 actuel semble OK, pas besoin de le réécrire pour ce point.") + elif duree_eligible: + print("RECO : divergences durée détectées — réécriture PROMPT 3 recommandée " + "(forcer la lecture du bloc FAITS_CALCULÉS, ajouter avertissement explicite).") + else: + print("RECO : durée non comparable — vérifier le parsing build_dpi_enriched " + "(metadata duree_heures_decimales est None partout).") + + +def _write_csv(rows: List[Dict[str, Any]]) -> None: + """Écrit le récap en CSV pour analyse downstream.""" + if not rows: + return + ts = datetime.now().strftime("%Y%m%d_%H%M%S") + csv_path = SCRIPTS_DIR / f"bench_t2a_dryrun_{ts}.csv" + with csv_path.open("w", encoding="utf-8", newline="") as f: + writer = csv.DictWriter( + f, + fieldnames=[ + "ipp", "nom", "duree_python", "duree_llm", "conv_duree", + "decision_llm", "decision_terrain", "mapping_attendu", + "conv_decision", "error", "elapsed_s", "log_path", + ], + extrasaction="ignore", + ) + writer.writeheader() + for r in rows: + row = dict(r) + row["quality_warnings"] = "; ".join(r.get("quality_warnings", [])) + writer.writerow(row) + print(f"CSV : {csv_path.relative_to(ROOT)}") + + +# ── CLI ──────────────────────────────────────────────────────────────────── + + +def main() -> int: + p = argparse.ArgumentParser(description=__doc__) + p.add_argument("--ipp", help="N'exécuter que ce dossier (debug)") + p.add_argument( + "--skip-seed-check", + action="store_true", + help="Skipper la vérification empirique du seed (gagne 2 appels LLM)", + ) + args = p.parse_args() + return run_bench(filter_ipp=args.ipp, skip_seed_check=args.skip_seed_check) + + +if __name__ == "__main__": + sys.exit(main())