feat(t2a): bench_t2a_dryrun.py + t2a_mappings.py - mini-bench standalone 11 dossiers POC

Bench standalone qui exécute build_dpi_enriched + appel LLM sur les 11
dossiers POC GHT Sud 95 (docs/clients/ght_sud_95/mockup_easily_assure/data.js),
sans passer par Demo_urgence_2 ni Léa/Windows. Permet de mesurer la
convergence durée/décision Python ↔ LLM sur un panel représentatif AVANT
d'écrire le garde-fou serveur du commit 2.

core/llm/t2a_mappings.py :
- Module partagé TERRAIN_VERS_T2A (4 entrées validées par Dom 12/05)
- Importé par le bench, sera importé aussi par le garde-fou serveur commit 2
- Cas non mappés volontairement documentés (Retour structure d'origine,
  chaîne vide pour statut_attente)

scripts/bench_t2a_dryrun.py :
- Parsing data.js via node (vm.runInContext) → 11 dossiers en JSON
- Reconstruction d'un dpi_raw plat simulant l'OCR scroll auto :
  bandeau Easily Assure répété 5x (1 par onglet) + sections motif /
  examens / imagerie / notes médicales / Synthèse Urgences au format
  LIBELLÉ    VALEUR
- NE bypasse PAS build_dpi_enriched : le dpi_raw est texte plat re-parsé
  par la fonction (test de robustesse réel du parser regex)
- Appel LLM déterministe : temperature=0, seed=42, model=gemma4:31b-cloud
- Vérification empirique du respect du seed (2 appels successifs sur 1er
  dossier, comparaison decision/durée/justif) → warning si bruit cloud
- 4 traces structurées par dossier dans logs/t2a_dryrun/<IPP>_<ts>.log :
  [t2a_dryrun_metadata] / [t2a_dryrun_prompt] / [t2a_dryrun_response]
  ou [t2a_dryrun_error] en cas d'échec API
- Filet data_quality_warning (incohérence âge déclaré vs date naissance,
  motif vs diagnostic principal, décision vide) — filet, pas analyse
  exhaustive ; signale sans corriger (anonymisation v1 incertaine)
- Tableau récap stdout 9 colonnes + CSV scripts/bench_t2a_dryrun_<ts>.csv
- Stats agrégées : convergence durée X/N, convergence décision X/N
  mappés, liste détaillée des divergences avec pointeurs vers logs
- Recommandation auto : réécrire PROMPT 3 ou non selon convergence durée

Activation : T2A_DRYRUN=1 python scripts/bench_t2a_dryrun.py
Options : --ipp <IPP> (1 dossier), --skip-seed-check

Smoke test pré-commit (sans LLM) : parsing + dpi_raw + build_dpi_enriched
sur les 11 dossiers → 11/11 metadata complets, 0 parsing_warning,
durées calculées de 2.0h à 12.02h, décompo décisions terrain conforme
(7 Consultation + 1 Hosp + 1 UHCD + 1 Transfert + 1 Retour structure).

Brief complet : docs/handoffs/2026-05-12_brief_S1_build_dpi_enriched.md

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-05-12 18:55:57 +02:00
parent 9872f4510c
commit f2212e77e3
2 changed files with 715 additions and 0 deletions

686
scripts/bench_t2a_dryrun.py Normal file
View File

@@ -0,0 +1,686 @@
"""Mini-bench standalone T2A — 11 dossiers POC GHT Sud 95.
Exécute `build_dpi_enriched` + appel LLM sur chaque dossier de `data.js`,
sans passer par Demo_urgence_2 ni Léa/Windows. Mesure la convergence
durée Python ↔ LLM et décision terrain ↔ LLM.
Voir docs/handoffs/2026-05-12_brief_S1_build_dpi_enriched.md.
Usage :
T2A_DRYRUN=1 python scripts/bench_t2a_dryrun.py
T2A_DRYRUN=1 python scripts/bench_t2a_dryrun.py --skip-seed-check
T2A_DRYRUN=1 python scripts/bench_t2a_dryrun.py --ipp 25003284 # 1 seul
Logs :
logs/t2a_dryrun/<IPP>_<timestamp>.log (4 traces structurées par dossier)
scripts/bench_t2a_dryrun_<timestamp>.csv (récap pour analyse)
"""
from __future__ import annotations
import argparse
import csv
import html
import json
import os
import re
import subprocess
import sys
import time
import urllib.error
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))
from core.llm.t2a_decision import build_dpi_enriched, PROMPT_TEMPLATE # noqa: E402
from core.llm.t2a_mappings import TERRAIN_VERS_T2A # noqa: E402
# ── Configuration ───────────────────────────────────────────────────────────
DATA_JS_PATH = ROOT / "docs" / "clients" / "ght_sud_95" / "mockup_easily_assure" / "data.js"
LOG_DIR = ROOT / "logs" / "t2a_dryrun"
SCRIPTS_DIR = ROOT / "scripts"
OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434/api/generate")
MODEL = os.environ.get("T2A_BENCH_MODEL", "gemma4:31b-cloud")
SEED = 42
TEMPERATURE = 0.0
NUM_CTX = 16384
NUM_PREDICT = 1500
TIMEOUT_SEC = 180
SEUIL_TOLERANCE_HEURES = 0.5 # ≈ 30 min, distingue hallucination grossière
# ── Parsing data.js ─────────────────────────────────────────────────────────
def parse_data_js(path: Path) -> Dict[str, Dict[str, Any]]:
"""Parse data.js via node (vm.runInContext) → dict {IPP: dossier}."""
if not path.exists():
raise FileNotFoundError(f"data.js introuvable : {path}")
script = f"""
const fs = require('fs');
const vm = require('vm');
const src = fs.readFileSync({json.dumps(str(path))}, 'utf8');
const ctx = {{}};
vm.createContext(ctx);
vm.runInContext(src + '\\nthis.DOSSIERS = DOSSIERS;', ctx);
process.stdout.write(JSON.stringify(ctx.DOSSIERS));
"""
result = subprocess.run(
["node", "-e", script],
capture_output=True,
text=True,
timeout=15,
)
if result.returncode != 0:
raise RuntimeError(f"node parse failed: {result.stderr}")
return json.loads(result.stdout)
# ── Construction dpi_raw plat (simulation OCR scroll auto) ─────────────────
_HTML_TAG_RE = re.compile(r"<[^>]+>")
def strip_html(text: Any) -> str:
"""Retire les balises HTML simples et déspécifie les entités."""
if text is None:
return ""
s = str(text)
s = _HTML_TAG_RE.sub("", s)
return html.unescape(s).strip()
def _bandeau(dossier: Dict[str, Any]) -> str:
"""Reconstruit le bandeau Easily Assure (header fixe répété par onglet)."""
ident = dossier.get("identite", {}) or {}
pas = dossier.get("passage", {}) or {}
parts = [
f"IPP : {ident.get('ipp', '')}",
f"{ident.get('nom', '')} {ident.get('prenom', '')}".strip(),
f"Né(e) le {ident.get('ne_le', '')}",
f"{ident.get('age', '')}",
f"Sexe : {ident.get('sexe', '')}",
f"Arrivée : {pas.get('arrivee', '')}",
f"IAO : {pas.get('iao', '')} ({pas.get('iao_heure', '')})",
f"Médecin : {pas.get('medecin', '')}",
f"Sortie : {pas.get('sortie', '')}" if pas.get('sortie') else "",
]
return " | ".join(p for p in parts if p)
def _section_synthese(syn: Dict[str, Any]) -> str:
"""Reproduit le bloc Synthèse Urgences au format LIBELLÉ VALEUR."""
if not syn:
return ""
def f(label: str, value: Any) -> str:
v = strip_html(value).strip() if value not in (None, "") else ""
return f"{label} {v}" if v else label
def fdt(label: str, date: Any, heure: Any) -> str:
if date and heure:
return f"{label} {date} à {heure}"
if date:
return f"{label} {date}"
return label
lignes = [
"Synthèse Urgences",
"",
"Détails de l'épisode",
fdt("Episode - Date", syn.get("episode_date"), syn.get("episode_heure")),
f("Mode de transport à l'arrivée", syn.get("mode_transport")),
f("Médicalisation du transport", syn.get("medicalisation_transport")),
f("Mode d'entrée", syn.get("mode_entree")),
f("Origine du transfert", syn.get("origine_transfert")),
"",
"Détails de l'orientation aux Urgences",
fdt("Date d'orientation", syn.get("orientation_date"), syn.get("orientation_heure")),
f("IAO", syn.get("iao")),
f("Priorité", syn.get("priorite")),
f("Episode - Sous-type", syn.get("sous_type")),
f("Circonstances", syn.get("circonstances")),
f("Motif de prise en charge", syn.get("motif_pec")),
f("Observ. IDE Urg", syn.get("obs_ide_urg")),
"",
"Détails de la prise en charge",
f("Médecin de la prise en charge médicale", syn.get("medecin_pec")),
fdt("Date de la prise en charge médicale", syn.get("pec_date"), syn.get("pec_heure")),
f("CCMU", syn.get("ccmu")),
f("GEMSA", syn.get("gemsa")),
f("Diagnostics", syn.get("diagnostics_synthese")),
"",
"Décision médicale",
f("Médecin de la décision médicale", syn.get("medecin_decision")),
fdt("Date de décision médicale", syn.get("decision_date"), syn.get("decision_heure")),
f("Décision médicale", syn.get("decision")),
f("Orientation du patient", syn.get("orientation")),
f("US de destination", syn.get("us_destination")),
]
return "\n".join(lignes)
def build_dpi_raw_from_dossier(dossier: Dict[str, Any]) -> str:
"""Construit un dpi_raw plat simulant la sortie OCR scroll auto (5 onglets,
bandeau répété en tête de chaque). NE bypasse PAS build_dpi_enriched —
le résultat est un texte plat que build_dpi_enriched re-parsera.
"""
bandeau = _bandeau(dossier)
motif = dossier.get("motif", {}) or {}
examens = dossier.get("examens", {}) or {}
notes_med = dossier.get("notes_medicales", []) or []
syn = dossier.get("synthese", {}) or {}
# Onglet 1 : Motif & IDE
diags = motif.get("diagnostics", []) or []
diag_principal = diags[0].get("code") if diags else ""
onglet1 = [
bandeau,
"",
"Motif d'admission :",
f"Motif : {motif.get('symptomes_orientation', '')}".strip(),
"",
"Observ. IDE :",
strip_html(motif.get("obs_ide", "")),
"",
f"Diagnostic principal : {diag_principal}" if diag_principal else "",
]
# Onglet 2 : Examens cliniques
questionnaires = examens.get("questionnaires", []) or []
notes_para = examens.get("notes_paramedicales", []) or []
onglet2 = [bandeau, "", "Examens cliniques :"]
for q in questionnaires:
onglet2.append(f"--- {q.get('nom', '')} ---")
onglet2.append(strip_html(q.get("reponse", "")))
for n in notes_para:
onglet2.append(f"--- {n.get('type', '')} ({n.get('par', '')}) ---")
onglet2.append(strip_html(n.get("contenu", "")))
# Onglet 3 : Imagerie (extrait depuis notes_medicales si présent)
imagerie_notes = [
n for n in notes_med
if "imagerie" in str(n.get("type", "")).lower()
or "RESULTATS" in str(n.get("contenu", ""))
or "radiograph" in str(n.get("contenu", "")).lower()
]
onglet3 = [bandeau, "", "Imagerie :"]
if imagerie_notes:
for n in imagerie_notes:
onglet3.append(strip_html(n.get("contenu", "")))
else:
onglet3.append("(pas d'imagerie)")
# Onglet 4 : Notes médicales (toutes, sauf imagerie déjà mise au-dessus)
onglet4 = [bandeau, "", "Notes médicales :"]
for n in notes_med:
onglet4.append(f"--- {n.get('type', '')} ({n.get('date', '')}) ---")
onglet4.append(strip_html(n.get("contenu", "")))
# Onglet 5 : Synthèse Urgences
onglet5 = [bandeau, "", _section_synthese(syn)]
return "\n".join(
line for section in (onglet1, onglet2, onglet3, onglet4, onglet5)
for line in section
)
# ── Filet data_quality_warning ─────────────────────────────────────────────
def check_data_quality(dossier: Dict[str, Any]) -> List[str]:
"""Détecte des incohérences cliniques flagrantes (filet, pas exhaustif).
Ne corrige rien — signale uniquement, pour ne pas confondre une divergence
LLM↔terrain avec un défaut du data.js (anonymisation v1 incertaine).
"""
warnings: List[str] = []
ident = dossier.get("identite", {}) or {}
motif = dossier.get("motif", {}) or {}
syn = dossier.get("synthese", {}) or {}
# Cohérence âge déclaré vs date de naissance + date du passage
ne_le = ident.get("ne_le", "")
age_declare = ident.get("age", "")
if ne_le and age_declare:
m_age = re.match(r"(\d+)", age_declare)
if m_age:
try:
naissance = datetime.strptime(ne_le, "%d/%m/%Y")
arrivee_str = (dossier.get("passage", {}) or {}).get("arrivee", "")
if arrivee_str:
arrivee = datetime.strptime(
arrivee_str.split()[0], "%d/%m/%Y"
)
age_calcule = (arrivee - naissance).days // 365
age_dit = int(m_age.group(1))
if abs(age_calcule - age_dit) > 1:
warnings.append(
f"âge incohérent : déclaré {age_dit} ans vs "
f"calculé ~{age_calcule} ans depuis {ne_le}"
)
except ValueError:
pass
# Cohérence motif court vs diagnostic principal (rapide)
motif_court = (dossier.get("passage", {}) or {}).get("motif_court", "").lower()
diags = motif.get("diagnostics", []) or []
if motif_court and diags:
diag_code = str(diags[0].get("code", "")).lower()
# Détection grossière : motif "fracture" alors que diag pneumopathie, etc.
groupes_a_risque = [
("fracture", "pneumop"),
("pneumop", "fracture"),
("ophtalmo", "thora"),
]
for m_key, d_key in groupes_a_risque:
if m_key in motif_court and d_key in diag_code:
warnings.append(
f"motif '{motif_court}' incohérent avec diagnostic "
f"principal '{diags[0].get('code', '')}'"
)
break
# Décision terrain vide
if syn and not syn.get("decision"):
warnings.append("synthese.decision vide (dossier en cours ?)")
return warnings
# ── Appel LLM déterministe (temp=0, seed=42) ───────────────────────────────
def call_llm_deterministic(
dpi_enriched: str,
model: str = MODEL,
seed: int = SEED,
) -> Tuple[Optional[Dict[str, Any]], Dict[str, Any], float, Optional[str]]:
"""Appel Ollama avec température 0 + seed fixé pour reproductibilité.
Returns:
(parsed_json, raw_response_dict, elapsed_s, error_msg)
parsed_json est None si parse_error.
raw_response_dict est le body Ollama brut (avec eval_count, etc.).
error_msg non-None si exception HTTP / timeout / parse.
"""
prompt = PROMPT_TEMPLATE.format(dpi=dpi_enriched)
payload = {
"model": model,
"prompt": prompt,
"stream": False,
"format": "json",
"keep_alive": "5m",
"options": {
"temperature": TEMPERATURE,
"seed": seed,
"num_predict": NUM_PREDICT,
"num_ctx": NUM_CTX,
},
}
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(
OLLAMA_URL,
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
t0 = time.time()
try:
with urllib.request.urlopen(req, timeout=TIMEOUT_SEC) as resp:
body = json.loads(resp.read().decode("utf-8"))
except (urllib.error.URLError, TimeoutError, ConnectionError) as e:
return None, {}, round(time.time() - t0, 1), f"HTTP/Network: {e}"
elapsed = round(time.time() - t0, 1)
raw_response = body.get("response", "").strip()
try:
# Nettoyer ```json ... ``` éventuels
cleaned = raw_response
if cleaned.startswith("```"):
cleaned = cleaned.split("\n", 1)[-1]
if cleaned.endswith("```"):
cleaned = cleaned.rsplit("```", 1)[0]
cleaned = cleaned.strip()
parsed = json.loads(cleaned)
return parsed, body, elapsed, None
except json.JSONDecodeError as e:
return None, body, elapsed, f"parse_error: {e}"
# ── Logging des 4 traces structurées ───────────────────────────────────────
def _serialize_for_log(obj: Any) -> Any:
"""Convertit datetime en str pour JSON serialization."""
if isinstance(obj, datetime):
return obj.isoformat()
if isinstance(obj, dict):
return {k: _serialize_for_log(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_serialize_for_log(v) for v in obj]
return obj
def write_dryrun_log(
ipp: str,
metadata: Dict[str, Any],
prompt: str,
response_body: Dict[str, Any],
parsed: Optional[Dict[str, Any]],
error_msg: Optional[str],
quality_warnings: List[str],
elapsed_s: float,
) -> Path:
"""Écrit les 4 traces structurées (markdown) dans logs/t2a_dryrun/."""
LOG_DIR.mkdir(parents=True, exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
log_path = LOG_DIR / f"{ipp}_{ts}.log"
sections = []
sections.append(f"# Mini-bench T2A dry-run — IPP {ipp}{ts}\n")
sections.append(f"Modèle : `{MODEL}` | Seed : `{SEED}` | Température : `{TEMPERATURE}`\n")
if quality_warnings:
sections.append("## data_quality_warning")
for w in quality_warnings:
sections.append(f"- {w}")
sections.append("")
sections.append("## [t2a_dryrun_metadata]\n")
sections.append("```json")
sections.append(json.dumps(_serialize_for_log(metadata), ensure_ascii=False, indent=2))
sections.append("```\n")
sections.append("## [t2a_dryrun_prompt]\n")
sections.append("```")
sections.append(prompt)
sections.append("```\n")
if error_msg:
sections.append("## [t2a_dryrun_error]\n")
sections.append(f"- Erreur : `{error_msg}`")
sections.append(f"- Latence : {elapsed_s}s")
if response_body:
sections.append("- Body partiel :")
sections.append("```json")
sections.append(json.dumps(response_body, ensure_ascii=False, indent=2)[:2000])
sections.append("```")
else:
sections.append("## [t2a_dryrun_response]\n")
sections.append(f"- Modèle : {response_body.get('model', MODEL)}")
sections.append(f"- Latence totale : {elapsed_s}s")
sections.append(f"- eval_count : {response_body.get('eval_count')}")
sections.append(f"- eval_duration : {response_body.get('eval_duration')}")
sections.append(f"- prompt_eval_count : {response_body.get('prompt_eval_count')}")
sections.append(f"- prompt_eval_duration : {response_body.get('prompt_eval_duration')}")
sections.append("\n### JSON parsé\n")
sections.append("```json")
sections.append(json.dumps(parsed, ensure_ascii=False, indent=2))
sections.append("```")
log_path.write_text("\n".join(sections), encoding="utf-8")
return log_path
# ── Vérification empirique du seed ──────────────────────────────────────────
def verify_seed_respect(sample_prompt: str) -> Optional[str]:
"""2 appels successifs avec même prompt+seed. Compare les réponses.
Retourne None si seed respecté, sinon un message expliquant le bruit.
"""
print("[seed-check] Vérification empirique seed (2 appels) …", flush=True)
p1, _, t1, e1 = call_llm_deterministic(sample_prompt)
p2, _, t2, e2 = call_llm_deterministic(sample_prompt)
if e1 or e2:
return f"erreur lors du check seed : {e1 or e2}"
if p1 is None or p2 is None:
return "parse_error sur l'un des 2 appels seed-check"
# Compare les champs structurés (décision + durée + justification)
fields = ["decision", "decision_court", "duree_passage_heures", "justification"]
diffs = {f: (p1.get(f), p2.get(f)) for f in fields if p1.get(f) != p2.get(f)}
if diffs:
return (
f"seed NON respecté ({MODEL}) — divergences sur "
f"{list(diffs.keys())} entre 2 appels successifs. "
f"Bruit résiduel attendu dans le bench."
)
print(f"[seed-check] OK seed respecté ({t1}s + {t2}s)", flush=True)
return None
# ── Boucle principale ──────────────────────────────────────────────────────
def run_bench(filter_ipp: Optional[str] = None, skip_seed_check: bool = False) -> int:
"""Exécute le bench complet. Retourne code de sortie (0 = succès)."""
if os.environ.get("T2A_DRYRUN") != "1":
print("ERREUR : T2A_DRYRUN=1 doit être défini pour exécuter le bench.", file=sys.stderr)
return 2
print(f"=== Mini-bench T2A — {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ===")
print(f"Source : {DATA_JS_PATH}")
print(f"Modèle : {MODEL} | Seed : {SEED} | Température : {TEMPERATURE}")
print(f"Logs : {LOG_DIR}/")
print()
try:
dossiers = parse_data_js(DATA_JS_PATH)
except Exception as e:
print(f"ERREUR parsing data.js : {e}", file=sys.stderr)
return 3
print(f"Dossiers parsés : {len(dossiers)}")
# Filtrage éventuel
if filter_ipp:
if filter_ipp not in dossiers:
print(f"IPP {filter_ipp} introuvable.", file=sys.stderr)
return 4
dossiers = {filter_ipp: dossiers[filter_ipp]}
# Vérification seed (sur le 1er dossier)
seed_warning: Optional[str] = None
if not skip_seed_check and not filter_ipp:
first_ipp = next(iter(dossiers))
sample_dpi = build_dpi_raw_from_dossier(dossiers[first_ipp])
sample_enriched, _ = build_dpi_enriched(sample_dpi)
sample_prompt = PROMPT_TEMPLATE.format(dpi=sample_enriched)
seed_warning = verify_seed_respect(sample_prompt)
if seed_warning:
print(f"⚠️ {seed_warning}", flush=True)
print()
rows: List[Dict[str, Any]] = []
for ipp, dossier in dossiers.items():
if dossier.get("statut_attente") is True:
print(f"[{ipp}] SKIP (statut_attente=true)")
continue
nom = (dossier.get("identite") or {}).get("nom", "")
print(f"[{ipp}] {nom} — construction dpi_raw …", flush=True)
dpi_raw = build_dpi_raw_from_dossier(dossier)
dpi_enriched, metadata = build_dpi_enriched(dpi_raw)
quality_warnings = check_data_quality(dossier)
prompt = PROMPT_TEMPLATE.format(dpi=dpi_enriched)
print(f"[{ipp}] LLM call …", flush=True)
parsed, body, elapsed, error = call_llm_deterministic(dpi_enriched)
log_path = write_dryrun_log(
ipp=ipp,
metadata=metadata,
prompt=prompt,
response_body=body,
parsed=parsed,
error_msg=error,
quality_warnings=quality_warnings,
elapsed_s=elapsed,
)
# Comparaisons
duree_python = metadata.get("duree_heures_decimales")
duree_llm = parsed.get("duree_passage_heures") if parsed else None
conv_duree = None
if duree_python is not None and duree_llm is not None:
conv_duree = abs(duree_llm - duree_python) <= SEUIL_TOLERANCE_HEURES
decision_terrain = metadata.get("decision_terrain")
decision_llm = parsed.get("decision") if parsed else None
mapping_attendu = TERRAIN_VERS_T2A.get(decision_terrain) if decision_terrain else None
conv_decision = None
if mapping_attendu is not None and decision_llm is not None:
conv_decision = (decision_llm == mapping_attendu)
rows.append({
"ipp": ipp,
"nom": nom,
"duree_python": duree_python,
"duree_llm": duree_llm,
"conv_duree": conv_duree,
"decision_llm": decision_llm,
"decision_terrain": decision_terrain,
"mapping_attendu": mapping_attendu if mapping_attendu else "(non mappé)",
"conv_decision": conv_decision,
"error": error,
"quality_warnings": quality_warnings,
"elapsed_s": elapsed,
"log_path": str(log_path.relative_to(ROOT)),
})
print(f"[{ipp}] done — duree_py={duree_python} duree_llm={duree_llm} "
f"decision={decision_llm} ({elapsed}s) → {log_path.name}")
print()
_print_recap(rows, seed_warning)
_write_csv(rows)
return 0
def _print_recap(rows: List[Dict[str, Any]], seed_warning: Optional[str]) -> None:
"""Affiche le tableau récap + stats agrégées."""
print("=" * 130)
header = (
"| IPP | Nom | duree_py | duree_llm | conv_dur | "
"decision_llm | decision_terrain | "
"mapping_attendu | conv_dec |"
)
print(header)
print("|" + "-" * (len(header) - 2) + "|")
for r in rows:
cd = "" if r["conv_duree"] is None else ("OK" if r["conv_duree"] else "")
cdec = "" if r["conv_decision"] is None else ("OK" if r["conv_decision"] else "")
print(
f"| {r['ipp']:<10} | {r['nom'][:11]:<11} | "
f"{str(r['duree_python'] or ''):<8} | {str(r['duree_llm'] or ''):<9} | "
f"{cd:<8} | {str(r['decision_llm'] or '')[:28]:<28} | "
f"{str(r['decision_terrain'] or '')[:30]:<30} | "
f"{str(r['mapping_attendu'])[:29]:<29} | {cdec:<8} |"
)
print("=" * 130)
print()
total = len(rows)
duree_eligible = [r for r in rows if r["conv_duree"] is not None]
duree_ok = sum(1 for r in duree_eligible if r["conv_duree"])
dec_eligible = [r for r in rows if r["conv_decision"] is not None]
dec_ok = sum(1 for r in dec_eligible if r["conv_decision"])
print("STATS AGRÉGÉES :")
print(f" Dossiers traités : {total}")
print(f" Convergence durée : {duree_ok}/{len(duree_eligible)} "
f"(seuil tolérance {SEUIL_TOLERANCE_HEURES}h)")
print(f" Convergence décision : {dec_ok}/{len(dec_eligible)} (sur libellés mappés)")
non_mappes = [r for r in rows if r["mapping_attendu"] == "(non mappé)"]
if non_mappes:
print(f" Non mappés : {len(non_mappes)} "
f"({', '.join(r['decision_terrain'] for r in non_mappes)})")
errors = [r for r in rows if r["error"]]
if errors:
print(f" Erreurs API : {len(errors)}")
for r in errors:
print(f" [{r['ipp']}] {r['error']}")
divergences = [r for r in rows if r["conv_duree"] is False or r["conv_decision"] is False]
if divergences:
print()
print("DIVERGENCES — détail :")
for r in divergences:
print(f" [{r['ipp']} {r['nom']}]")
if r["conv_duree"] is False:
print(f" durée : Python={r['duree_python']}h vs LLM={r['duree_llm']}h")
if r["conv_decision"] is False:
print(f" décision : LLM={r['decision_llm']} vs attendu={r['mapping_attendu']}")
print(f" log : {r['log_path']}")
if seed_warning:
print()
print(f"⚠️ SEED : {seed_warning}")
print(" → les divergences peuvent venir du bruit cloud, pas du LLM.")
print()
# Recommandation
if duree_eligible and duree_ok == len(duree_eligible):
print("RECO : FAITS_CALCULÉS bien lu par le LLM sur la durée — "
"PROMPT 3 actuel semble OK, pas besoin de le réécrire pour ce point.")
elif duree_eligible:
print("RECO : divergences durée détectées — réécriture PROMPT 3 recommandée "
"(forcer la lecture du bloc FAITS_CALCULÉS, ajouter avertissement explicite).")
else:
print("RECO : durée non comparable — vérifier le parsing build_dpi_enriched "
"(metadata duree_heures_decimales est None partout).")
def _write_csv(rows: List[Dict[str, Any]]) -> None:
"""Écrit le récap en CSV pour analyse downstream."""
if not rows:
return
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
csv_path = SCRIPTS_DIR / f"bench_t2a_dryrun_{ts}.csv"
with csv_path.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(
f,
fieldnames=[
"ipp", "nom", "duree_python", "duree_llm", "conv_duree",
"decision_llm", "decision_terrain", "mapping_attendu",
"conv_decision", "error", "elapsed_s", "log_path",
],
extrasaction="ignore",
)
writer.writeheader()
for r in rows:
row = dict(r)
row["quality_warnings"] = "; ".join(r.get("quality_warnings", []))
writer.writerow(row)
print(f"CSV : {csv_path.relative_to(ROOT)}")
# ── CLI ────────────────────────────────────────────────────────────────────
def main() -> int:
p = argparse.ArgumentParser(description=__doc__)
p.add_argument("--ipp", help="N'exécuter que ce dossier (debug)")
p.add_argument(
"--skip-seed-check",
action="store_true",
help="Skipper la vérification empirique du seed (gagne 2 appels LLM)",
)
args = p.parse_args()
return run_bench(filter_ipp=args.ipp, skip_seed_check=args.skip_seed_check)
if __name__ == "__main__":
sys.exit(main())