New module src/medical/dp_finalizer.py:
- 5 arbitration rules (R1-R5): CRH CONFIRMED override, corroborated Trackare, R* symptom override/review, ambiguous cases sent to REVIEW, Z-codes/R-codes barred from auto-confirm
- Traceability: dp_trackare, dp_crh_only, dp_final on DossierMedical
- quality_flags dict (merged without overwriting) + alertes_codage (appended)

config.py models:
- DPCandidate, DPSelection (NUKE-3)
- get_dp_ranker_llm_enabled(), check_adversarial_model_config()
- DossierMedical fields: dp_trackare, dp_crh_only, dp_final, quality_flags

Integration:
- main.py: finalize_dp() called after the veto/GHM stage (individual + merged dossier)
- benchmark: finalizer wired into _rebuild_and_select(), dp_final included in the output

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
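The quality_flags / alertes_codage semantics named above can be pictured with a small sketch: finalize_dp() runs after the veto/GHM stage, merges its flags without overwriting those set by earlier stages, and appends coding alerts rather than replacing them. The helpers below are illustrative only (the real logic lives in src/medical/dp_finalizer.py); the only names taken from this commit are finalize_dp, DossierMedical and its quality_flags / alertes_codage / dp_final fields.

# Illustrative sketch, not the dp_finalizer implementation: shows the
# "merge without overwriting" and "append" behaviour described in the commit message.
def merge_quality_flags(dossier, new_flags: dict) -> None:
    """Merge new flags into dossier.quality_flags; keys set by earlier stages keep their value."""
    flags = dict(dossier.quality_flags or {})
    for key, value in new_flags.items():
        flags.setdefault(key, value)
    dossier.quality_flags = flags

def append_alertes_codage(dossier, alertes: list) -> None:
    """Append coding alerts instead of replacing the existing list."""
    dossier.alertes_codage = list(dossier.alertes_codage or []) + list(alertes)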
875 lines
33 KiB
Python
#!/usr/bin/env python3
"""NUKE-3 benchmark — comparative report, LLM off vs LLM on.

Analyzes the existing JSON dossiers (output/structured/) to produce
DIM-like metrics on DP selection (NUKE-3).

Mode 1 (default): analyze the existing JSON files (no Ollama required).
Mode 2 (--rerun): re-run the pipeline twice (LLM off, then LLM on) —
    requires Ollama for the "on" pass.

Usage:
    python scripts/benchmark_nuke3_compare.py                    # offline analysis
    python scripts/benchmark_nuke3_compare.py --n 10             # top 10 dossiers
    python scripts/benchmark_nuke3_compare.py --rerun --n 5      # re-run the pipeline
    python scripts/benchmark_nuke3_compare.py --dossiers A,B,C   # specific dossiers
    python scripts/benchmark_nuke3_compare.py --gold data/gold_crh/gold_crh.jsonl
    python scripts/benchmark_nuke3_compare.py --offline --case-id 74_23141536
    python scripts/benchmark_nuke3_compare.py --offline --top-errors 20
    python scripts/benchmark_nuke3_compare.py --offline --dim-pack 20
"""

from __future__ import annotations

import argparse
import csv
import json
import os
import subprocess
import sys
import time
from datetime import datetime
from pathlib import Path
from statistics import mean

ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))

OUTPUT_DIR = ROOT / "output" / "structured"
INPUT_DIR = ROOT / "input"
REPORT_PATH = ROOT / "docs" / "NUKE3_BENCHMARK_REPORT.md"
PY = str(ROOT / ".venv" / "bin" / "python3")


# ---------------------------------------------------------------------------
# JSON loading
# ---------------------------------------------------------------------------

def find_merged_json(dossier_id: str) -> Path | None:
    """Find the merged JSON for a dossier."""
    d = OUTPUT_DIR / dossier_id
    if not d.exists():
        return None
    fusions = list(d.glob("*fusionne_cim10.json"))
    if fusions:
        return fusions[0]
    cim10s = list(d.glob("*_cim10.json"))
    return cim10s[0] if cim10s else None


def load_dossier_json(dossier_id: str) -> dict | None:
    """Load a dossier's JSON."""
    path = find_merged_json(dossier_id)
    if not path:
        return None
    try:
        return json.loads(path.read_text("utf-8"))
    except (json.JSONDecodeError, OSError):
        return None


def select_dossiers(n: int, specific: list[str] | None) -> list[str]:
    """Select the dossiers to analyze."""
    if specific:
        return [d.strip() for d in specific if d.strip()]

    all_dirs = sorted(
        d.name for d in OUTPUT_DIR.iterdir()
        if d.is_dir() and find_merged_json(d.name) is not None
    )
    return all_dirs[:n] if n > 0 else all_dirs


# ---------------------------------------------------------------------------
# Per-dossier NUKE-3 analysis
# ---------------------------------------------------------------------------

def analyze_dp_selection(data: dict) -> dict:
    """Extract the NUKE-3 metrics from a dossier."""
    dp_sel = data.get("dp_selection")

    result = {
        "has_dp_selection": dp_sel is not None,
        "verdict": None,
        "confidence": None,
        "chosen_code": None,
        "n_candidates": 0,
        "n_evidence": 0,
        "is_comorbidity_dp": False,
        "is_symptom_dp": False,
        "is_act_only_dp": False,
        "has_evidence": False,
        "delta": None,
        "reason": None,
    }

    if not dp_sel:
        return result

    result["verdict"] = dp_sel.get("verdict")
    result["confidence"] = dp_sel.get("confidence")
    result["chosen_code"] = dp_sel.get("chosen_code")

    candidates = dp_sel.get("candidates", [])
    result["n_candidates"] = len(candidates)

    evidence = dp_sel.get("evidence", [])
    result["n_evidence"] = len(evidence)
    result["has_evidence"] = len(evidence) > 0

    result["reason"] = dp_sel.get("reason")

    # Debug scores
    debug = dp_sel.get("debug_scores") or {}
    result["delta"] = debug.get("delta")

    # Flags of the winning candidate
    if candidates:
        winner = candidates[0]
        result["is_comorbidity_dp"] = winner.get("is_comorbidity_like", False)
        result["is_symptom_dp"] = winner.get("is_symptom_like", False)
        result["is_act_only_dp"] = winner.get("is_act_only", False)

    return result


# ---------------------------------------------------------------------------
# Aggregation
# ---------------------------------------------------------------------------

def compute_metrics(analyses: list[dict]) -> dict:
    """Compute aggregate DIM-like metrics."""
    n = len(analyses)
    if n == 0:
        return {"n": 0}

    with_selection = [a for a in analyses if a["has_dp_selection"]]
    n_sel = len(with_selection)

    confirmed = [a for a in with_selection if a["verdict"] == "CONFIRMED"]
    review = [a for a in with_selection if a["verdict"] == "REVIEW"]

    # Main metrics
    confirmed_rate = len(confirmed) / n_sel if n_sel else 0

    # Evidence
    confirmed_with_evidence = sum(1 for a in confirmed if a["has_evidence"])
    confirmed_evidence_rate = confirmed_with_evidence / len(confirmed) if confirmed else 0

    # Problematic codes as DP
    symptom_count = sum(1 for a in with_selection if a["is_symptom_dp"])
    comorbidity_count = sum(1 for a in with_selection if a["is_comorbidity_dp"])
    act_only_count = sum(1 for a in with_selection if a["is_act_only_dp"])

    # Confidence
    conf_high = sum(1 for a in with_selection if a["confidence"] == "high")
    conf_med = sum(1 for a in with_selection if a["confidence"] == "medium")
    conf_low = sum(1 for a in with_selection if a["confidence"] == "low")

    # R-codes as DP (symptoms)
    r_code_count = sum(
        1 for a in with_selection
        if a["chosen_code"] and a["chosen_code"].startswith("R")
    )

    return {
        "n_total": n,
        "n_with_selection": n_sel,
        "confirmed_count": len(confirmed),
        "review_count": len(review),
        "confirmed_rate": round(confirmed_rate, 3),
        "review_rate": round(1 - confirmed_rate, 3) if n_sel else 0,
        "confirmed_evidence_rate": round(confirmed_evidence_rate, 3),
        "dp_symptom_rate": round(symptom_count / n_sel, 3) if n_sel else 0,
        "dp_comorbidity_rate": round(comorbidity_count / n_sel, 3) if n_sel else 0,
        "dp_act_only_rate": round(act_only_count / n_sel, 3) if n_sel else 0,
        "dp_r_code_rate": round(r_code_count / n_sel, 3) if n_sel else 0,
        "confidence": {
            "high": conf_high,
            "medium": conf_med,
            "low": conf_low,
        },
        "confidence_high_rate": round(conf_high / n_sel, 3) if n_sel else 0,
    }


# ---------------------------------------------------------------------------
# Gold CRH evaluation
# ---------------------------------------------------------------------------

def load_gold(gold_path: str | Path) -> dict:
    """Load the gold JSONL and return a case_id → GoldCRHCase index."""
    from src.eval.gold_models import load_gold_index
    return load_gold_index(Path(gold_path))


def evaluate_gold_cases(
    dossier_details: list[dict],
    gold_index: dict,
) -> list[dict]:
    """Evaluate the dossiers present in the gold set; returns a list of evaluations."""
    from src.eval.gold_models import evaluate_dp

    evals: list[dict] = []
    for d in dossier_details:
        case_id = d["id"]
        if case_id not in gold_index:
            continue
        gold_case = gold_index[case_id]
        sel = d.get("dp_selection") or {}
        chosen_code = sel.get("chosen_code")
        verdict = sel.get("verdict")
        confidence = sel.get("confidence")

        ev = evaluate_dp(chosen_code, gold_case)
        ev["verdict"] = verdict
        ev["confidence_nuke3"] = confidence
        evals.append(ev)
    return evals


def compute_gold_metrics(evals: list[dict]) -> dict:
    """Compute aggregate metrics over the gold cases."""
    n = len(evals)
    if n == 0:
        return {"n": 0}

    strict = sum(1 for e in evals if e["exact_match_strict"])
    tolerant = sum(1 for e in evals if e["exact_match_tolerant_codes"])
    family3 = sum(1 for e in evals if e["family3_match_tolerant"])
    acceptable = sum(1 for e in evals if e["acceptable_match"])
    symptom_bad = sum(1 for e in evals if e["symptom_not_allowed"])

    # Confirmed-only accuracy
    confirmed_evals = [e for e in evals if e["verdict"] == "CONFIRMED"]
    n_conf = len(confirmed_evals)
    conf_acceptable = sum(1 for e in confirmed_evals if e["acceptable_match"])

    return {
        "n": n,
        "exact_match_strict": strict,
        "exact_match_strict_rate": round(strict / n, 3),
        "exact_match_tolerant": tolerant,
        "exact_match_tolerant_rate": round(tolerant / n, 3),
        "family3_match": family3,
        "family3_match_rate": round(family3 / n, 3),
        "acceptable_match": acceptable,
        "acceptable_match_rate": round(acceptable / n, 3),
        "confirmed_accuracy_tolerant": round(conf_acceptable / n_conf, 3) if n_conf else None,
        "confirmed_count": n_conf,
        "symptom_not_allowed": symptom_bad,
        "symptom_not_allowed_rate": round(symptom_bad / n, 3),
    }


def write_gold_eval_csv(evals: list[dict], csv_path: Path) -> None:
    """Write the gold evaluation CSV."""
    cols = [
        "case_id", "chosen_code", "verdict", "confidence_nuke3",
        "dp_expected_code", "acceptable_match", "exact_match_strict",
        "symptom_not_allowed", "allow_symptom_dp", "confidence_gold",
    ]
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=cols, extrasaction="ignore")
        writer.writeheader()
        for ev in evals:
            row = {
                "case_id": ev["case_id"],
                "chosen_code": ev["chosen_code"] or "",
                "verdict": ev["verdict"] or "",
                "confidence_nuke3": ev["confidence_nuke3"] or "",
                "dp_expected_code": ev["dp_expected_code"],
                "acceptable_match": ev["acceptable_match"],
                "exact_match_strict": ev["exact_match_strict"],
                "symptom_not_allowed": ev["symptom_not_allowed"],
                "allow_symptom_dp": ev["allow_symptom_dp"],
                "confidence_gold": ev["confidence_gold"],
            }
            writer.writerow(row)


# ---------------------------------------------------------------------------
# Pipeline re-run (--rerun mode)
# ---------------------------------------------------------------------------

def check_ollama() -> bool:
    """Check that Ollama is reachable."""
    try:
        import urllib.request
        url = os.environ.get("OLLAMA_URL", "http://localhost:11434")
        req = urllib.request.Request(f"{url}/api/tags", method="GET")
        urllib.request.urlopen(req, timeout=5)
        return True
    except Exception:
        return False


def run_pipeline_with_env(dossier_id: str, llm_flag: str) -> bool:
    """Run the pipeline on one dossier with T2A_DP_RANKER_LLM set to the given flag."""
    env = os.environ.copy()
    env["T2A_DP_RANKER_LLM"] = llm_flag

    try:
        result = subprocess.run(
            [PY, "-m", "src.main", str(INPUT_DIR / dossier_id)],
            capture_output=True, text=True, cwd=str(ROOT),
            timeout=600, env=env,
        )
        return result.returncode == 0
    except Exception as e:
        print(f" ERREUR: {e}")
        return False


# ---------------------------------------------------------------------------
# Markdown report
# ---------------------------------------------------------------------------

def _pct(v: float) -> str:
    return f"{v * 100:.1f}%"


def generate_report(
    metrics_off: dict,
    metrics_on: dict | None,
    dossier_details: list[dict],
    args: argparse.Namespace,
    gold_metrics: dict | None = None,
    gold_evals: list[dict] | None = None,
) -> str:
    """Generate the Markdown report."""
    lines: list[str] = []
    now = datetime.now().strftime("%Y-%m-%d %H:%M")

    # Commit hash
    try:
        commit = subprocess.check_output(
            ["git", "rev-parse", "--short", "HEAD"],
            cwd=str(ROOT), text=True, stderr=subprocess.DEVNULL,
        ).strip()
    except Exception:
        commit = "?"

    lines.append("# NUKE-3 — Benchmark Report")
    lines.append("")
    lines.append(f"**Date** : {now}  ")
    lines.append(f"**Commit** : `{commit}`  ")
    lines.append(f"**Dossiers analysés** : {metrics_off['n_total']}  ")
    lines.append(f"**Mode** : {'rerun pipeline' if args.rerun else 'analyse offline (JSON existants)'}  ")
    lines.append("")

    # Comparison table
    lines.append("## Métriques DIM-like")
    lines.append("")

    if metrics_on:
        lines.append("| Métrique | LLM OFF | LLM ON | Delta |")
        lines.append("|----------|---------|--------|-------|")

        rows = [
            ("CONFIRMED rate", "confirmed_rate"),
            ("REVIEW rate", "review_rate"),
            ("CONFIRMED + evidence", "confirmed_evidence_rate"),
            ("DP symptôme (R*)", "dp_symptom_rate"),
            ("DP comorbidité", "dp_comorbidity_rate"),
            ("DP acte-seul", "dp_act_only_rate"),
            ("DP R-code", "dp_r_code_rate"),
            ("Confidence high", "confidence_high_rate"),
        ]
        for label, key in rows:
            v_off = metrics_off.get(key, 0)
            v_on = metrics_on.get(key, 0)
            delta = v_on - v_off
            sign = "+" if delta > 0 else ""
            lines.append(
                f"| {label} | {_pct(v_off)} | {_pct(v_on)} | {sign}{_pct(delta)} |"
            )
    else:
        lines.append("| Métrique | Valeur |")
        lines.append("|----------|--------|")
        rows_single = [
            ("CONFIRMED rate", "confirmed_rate"),
            ("REVIEW rate", "review_rate"),
            ("CONFIRMED + evidence", "confirmed_evidence_rate"),
            ("DP symptôme (R*)", "dp_symptom_rate"),
            ("DP comorbidité", "dp_comorbidity_rate"),
            ("DP acte-seul", "dp_act_only_rate"),
            ("DP R-code", "dp_r_code_rate"),
            ("Confidence high", "confidence_high_rate"),
        ]
        for label, key in rows_single:
            v = metrics_off.get(key, 0)
            lines.append(f"| {label} | {_pct(v)} |")

    lines.append("")

    # Volumes
    lines.append("## Volumes")
    lines.append("")
    lines.append(f"- Dossiers avec dp_selection : {metrics_off['n_with_selection']}/{metrics_off['n_total']}")
    lines.append(f"- CONFIRMED : {metrics_off['confirmed_count']}")
    lines.append(f"- REVIEW : {metrics_off['review_count']}")
    c = metrics_off.get("confidence", {})
    lines.append(f"- Confidence — high: {c.get('high', 0)}, medium: {c.get('medium', 0)}, low: {c.get('low', 0)}")
    lines.append("")

    # Per-dossier detail
    lines.append("## Détail par dossier")
    lines.append("")
    lines.append("| Dossier | Verdict | Code | Confidence | Evidence | Candidats | Reason |")
    lines.append("|---------|---------|------|------------|----------|-----------|--------|")
    for d in dossier_details:
        sel = d.get("dp_selection", {})
        if not sel:
            lines.append(f"| {d['id']} | - | - | - | - | - | pas de dp_selection |")
            continue
        lines.append(
            f"| {d['id']} "
            f"| {sel.get('verdict') or '-'} "
            f"| {sel.get('chosen_code') or '-'} "
            f"| {sel.get('confidence') or '-'} "
            f"| {sel.get('n_evidence', 0)} "
            f"| {sel.get('n_candidates', 0)} "
            f"| {(sel.get('reason') or '-')[:60]} |"
        )

    # Gold CRH section
    if gold_metrics and gold_metrics.get("n", 0) > 0:
        gm = gold_metrics
        lines.append("")
        lines.append("## Évaluation Gold CRH")
        lines.append("")
        lines.append(f"**Cas gold évalués** : {gm['n']}  ")
        lines.append("")
        lines.append("| Métrique | Valeur |")
        lines.append("|----------|--------|")
        lines.append(f"| Exact match (strict) | {_pct(gm['exact_match_strict_rate'])} ({gm['exact_match_strict']}/{gm['n']}) |")
        lines.append(f"| Exact match (codes tolérants) | {_pct(gm['exact_match_tolerant_rate'])} ({gm['exact_match_tolerant']}/{gm['n']}) |")
        lines.append(f"| Family3 match (tolérant) | {_pct(gm['family3_match_rate'])} ({gm['family3_match']}/{gm['n']}) |")
        lines.append(f"| Acceptable match (codes OU family3) | {_pct(gm['acceptable_match_rate'])} ({gm['acceptable_match']}/{gm['n']}) |")
        if gm["confirmed_accuracy_tolerant"] is not None:
            lines.append(f"| Confirmed accuracy (tolérant) | {_pct(gm['confirmed_accuracy_tolerant'])} ({gm['confirmed_count']} CONFIRMED) |")
        lines.append(f"| Symptôme non autorisé | {gm['symptom_not_allowed']}/{gm['n']} |")
        lines.append("")

        # Per-gold-case detail
        if gold_evals:
            lines.append("### Détail par cas gold")
            lines.append("")
            lines.append("| Case ID | Choisi | Attendu | Strict | Acceptable | Symptôme interdit | Verdict |")
            lines.append("|---------|--------|---------|--------|------------|-------------------|---------|")
            for ev in gold_evals:
                ok_s = "OK" if ev["exact_match_strict"] else "FAIL"
                ok_a = "OK" if ev["acceptable_match"] else "FAIL"
                sym = "OUI" if ev["symptom_not_allowed"] else "-"
                lines.append(
                    f"| {ev['case_id']} "
                    f"| {ev['chosen_code'] or '-'} "
                    f"| {ev['dp_expected_code']} "
                    f"| {ok_s} "
                    f"| {ok_a} "
                    f"| {sym} "
                    f"| {ev['verdict'] or '-'} |"
                )
            lines.append("")

    lines.append("")
    lines.append("---")
    lines.append(f"*Généré par `scripts/benchmark_nuke3_compare.py` — {now}*")

    # DIM rule reminder
    lines.append("")
    lines.append("> **Règle DIM** : `CONFIRMED` ⇒ `evidence` obligatoirement non vide.")
    lines.append("> Un DP sans preuve exploitable est automatiquement `REVIEW`.")

    return "\n".join(lines)


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------

def _rebuild_and_select(data: dict) -> dict:
    """Rebuild a DossierMedical from its JSON and run select_dp() offline.

    Useful when the JSON files have no dp_selection field (generated before NUKE-3).
    """
    from src.config import DossierMedical, Diagnostic, Sejour
    from src.medical.dp_selector import select_dp

    dp_raw = data.get("diagnostic_principal", {})
    das_raw = data.get("diagnostics_associes", [])
    doc_type = data.get("document_type", "crh")
    sej_raw = data.get("sejour", {})

    dp_diag = None
    if dp_raw and dp_raw.get("texte"):
        dp_diag = Diagnostic(
            texte=dp_raw.get("texte", ""),
            cim10_suggestion=dp_raw.get("cim10_suggestion") or dp_raw.get("cim10_final"),
            cim10_confidence=dp_raw.get("cim10_confidence"),
            source=dp_raw.get("source"),
        )
    das_list = []
    for d_item in das_raw:
        code = d_item.get("cim10_suggestion") or d_item.get("cim10_final")
        if not code:
            continue
        das_list.append(Diagnostic(
            texte=d_item.get("texte", ""),
            cim10_suggestion=code,
            cim10_confidence=d_item.get("cim10_confidence"),
            source=d_item.get("source"),
            status=d_item.get("status"),
        ))

    safe_sej = {k: v for k, v in sej_raw.items() if k in Sejour.model_fields}
    dossier = DossierMedical(
        document_type=doc_type,
        sejour=Sejour(**safe_sej),
        diagnostic_principal=dp_diag,
        diagnostics_associes=das_list,
    )

    # Build the synthese from the available fields.
    # Pre-NUKE-3 JSON files do not store the CRH sections,
    # so recover the conclusion text from the source_excerpt fields if needed.
    conclusion = data.get("conclusion_medicale", "")
    if not conclusion:
        # Look for "CONCLUSION" in the source_excerpt of the DAS or discharge treatments.
        # Keep the longest excerpt (short ones are often truncated).
        best = ""
        for container in (das_raw, data.get("traitements_sortie", [])):
            for item in container:
                excerpt = item.get("source_excerpt", "")
                up = excerpt.upper()
                if "CONCLUSION" in up:
                    idx = up.index("CONCLUSION")
                    candidate = excerpt[idx:]
                    if len(candidate) > len(best):
                        best = candidate
        conclusion = best

    synthese = {
        "motif": data.get("motif_hospitalisation", ""),
        "conclusion": conclusion,
        "diag_sortie": data.get("synthese_medicale", {}).get("diag_sortie", ""),
        "diag_principal": data.get("synthese_medicale", {}).get("diag_principal", ""),
        "synthese": data.get("synthese_medicale", {}).get("synthese", ""),
    }

    selection = select_dp(dossier, synthese, config={"llm_enabled": False})
    dossier.dp_selection = selection

    # DP finalizer (Trackare vs CRH arbitration, traceability)
    try:
        from src.medical.dp_finalizer import finalize_dp
        finalize_dp(dossier)
    except Exception:
        pass

    # Use dp_final when available, otherwise dp_selection
    final = dossier.dp_final or selection

    # Convert to a dict compatible with analyze_dp_selection
    cands = [c.model_dump() for c in final.candidates]
    result = {
        "dp_selection": {
            "verdict": final.verdict,
            "confidence": final.confidence,
            "chosen_code": final.chosen_code,
            "chosen_term": final.chosen_term,
            "candidates": cands,
            "evidence": final.evidence,
            "reason": final.reason,
            "debug_scores": final.debug_scores,
        }
    }
    if dossier.dp_final:
        result["dp_final"] = dossier.dp_final.model_dump(exclude_none=True)
    if dossier.quality_flags:
        result["quality_flags"] = dossier.quality_flags
    return result


def _run_debug_reports(
    args: argparse.Namespace,
    dossier_ids: list[str],
    dossier_details: list[dict],
    gold_index: dict | None,
    gold_evals: list[dict] | None,
    out_dir: Path,
) -> None:
    """Run the --case-id, --top-errors and --dim-pack modes."""
    from src.eval.gold_debug import (
        build_case_report,
        write_case_report,
        build_error_entry,
        sort_error_entries,
        write_top_errors_csv,
        write_top_errors_md,
        write_top_errors_jsonl,
        select_dim_pack_cases,
        write_dim_pack,
    )
    from src.eval.gold_models import evaluate_dp

    has_debug = args.case_id or args.top_errors > 0 or args.dim_pack > 0
    if not has_debug:
        return

    # Helper: build the full report for one case
    def _build_report_for(case_id: str) -> dict | None:
        data = load_dossier_json(case_id)
        if not data:
            return None

        # Offline rebuild if needed
        if args.offline and not data.get("dp_selection"):
            rebuilt = _rebuild_and_select(data)
            data["dp_selection"] = rebuilt["dp_selection"]

        dp_sel = data.get("dp_selection")

        gold_case_dict = None
        eval_result = None
        if gold_index and case_id in gold_index:
            gc = gold_index[case_id]
            gold_case_dict = gc.model_dump()
            chosen_code = (dp_sel or {}).get("chosen_code")
            eval_result = evaluate_dp(chosen_code, gc)

        return build_case_report(case_id, data, dp_sel, gold_case_dict, eval_result)

    # --case-id
    if args.case_id:
        cid = args.case_id.strip()
        data = load_dossier_json(cid)
        if not data:
            print(f"ERREUR: output JSON introuvable pour {cid}")
            print(f" Suggestion : relancer le pipeline avec --rerun ou vérifier output/structured/{cid}/")
            sys.exit(1)
        if gold_index and cid not in gold_index:
            print(f"ERREUR: {cid} absent du gold ({len(gold_index)} cas chargés)")
            sys.exit(1)

        report = _build_report_for(cid)
        if report:
            jp, mp = write_case_report(report, out_dir)
            print(f"\n=== Case debug: {cid} ===")
            print(f" JSON : {jp}")
            print(f" MD : {mp}")

    # --top-errors
    if args.top_errors > 0:
        if not gold_index:
            print("ERREUR: --top-errors requiert --gold (ou auto-détection gold_crh.jsonl)")
            sys.exit(1)

        # Build reports for all gold cases
        all_reports: list[dict] = []
        gold_case_ids = set(gold_index.keys())
        for cid in dossier_ids:
            if cid not in gold_case_ids:
                continue
            r = _build_report_for(cid)
            if r:
                all_reports.append(r)

        entries = [build_error_entry(r) for r in all_reports]
        entries = sort_error_entries(entries)
        entries = entries[:args.top_errors]

        csv_p = out_dir / "NUKE3_GOLD_TOP_ERRORS.csv"
        md_p = out_dir / "NUKE3_GOLD_TOP_ERRORS.md"
        jsonl_p = out_dir / "NUKE3_GOLD_TOP_ERRORS.jsonl"

        write_top_errors_csv(entries, csv_p)
        write_top_errors_md(entries, md_p)
        write_top_errors_jsonl(entries, jsonl_p)

        print(f"\n=== Top {len(entries)} erreurs gold ===")
        print(f" CSV : {csv_p}")
        print(f" MD : {md_p}")
        print(f" JSONL : {jsonl_p}")

    # --dim-pack
    if args.dim_pack > 0:
        # Build reports for all CRH (non-trackare) dossiers
        all_reports_dim: list[dict] = []
        for cid in dossier_ids:
            r = _build_report_for(cid)
            if r and r["document_type"] != "trackare":
                all_reports_dim.append(r)
            elif r and r["prediction"]["verdict"] == "REVIEW":
                # Also include trackare cases without a DP (they go through scoring)
                all_reports_dim.append(r)

        selected = select_dim_pack_cases(all_reports_dim, args.dim_pack)
        csv_p, cases_dir = write_dim_pack(selected, out_dir)

        print(f"\n=== DIM Pack ({len(selected)} cas) ===")
        print(f" CSV : {csv_p}")
        print(f" Cas JSON : {cases_dir}/")


def main():
    parser = argparse.ArgumentParser(description="Benchmark NUKE-3 comparatif")
    parser.add_argument("--n", type=int, default=0, help="Nombre de dossiers (0=tous)")
    parser.add_argument("--dossiers", type=str, default="", help="IDs séparés par virgules")
    parser.add_argument("--rerun", action="store_true", help="Relancer le pipeline (nécessite Ollama pour LLM on)")
    parser.add_argument("--offline", action="store_true",
                        help="Exécuter NUKE-3 offline (reconstruit DossierMedical depuis JSON, LLM off)")
    parser.add_argument("--gold", type=str, default="",
                        help="Fichier JSONL gold CRH (évaluation tolérante)")
    parser.add_argument("--case-id", type=str, default="",
                        help="Rapport détaillé pour un cas (ex: 74_23141536)")
    parser.add_argument("--top-errors", type=int, default=0,
                        help="Top N erreurs gold (ex: 20)")
    parser.add_argument("--dim-pack", type=int, default=0,
                        help="Pack DIM de N cas CRH à annoter (ex: 20)")
    parser.add_argument("--out-dir", type=str, default=str(ROOT / "docs" / "gold_debug"),
                        help="Dossier de sortie pour debug reports")
    parser.add_argument("--output", type=str, default=str(REPORT_PATH), help="Chemin du rapport")
    args = parser.parse_args()

    specific = [d.strip() for d in args.dossiers.split(",") if d.strip()] if args.dossiers else None
    dossier_ids = select_dossiers(args.n, specific)

    if not dossier_ids:
        print("ERREUR: aucun dossier trouvé")
        sys.exit(1)

    print(f"NUKE-3 benchmark — {len(dossier_ids)} dossiers")

    # Rerun mode
    if args.rerun:
        ollama_ok = check_ollama()
        print(f" Ollama: {'OK' if ollama_ok else 'INDISPONIBLE'}")

        # Pass 1: LLM OFF
        print("\n=== Pass 1 : T2A_DP_RANKER_LLM=0 ===")
        for did in dossier_ids:
            ok = run_pipeline_with_env(did, "0")
            status = "OK" if ok else "FAIL"
            print(f" {did}: {status}")

    # Analyze the existing JSON files (or the pass 1 results)
    print("\n=== Analyse des dossiers ===")
    analyses_off: list[dict] = []
    dossier_details: list[dict] = []

    for did in dossier_ids:
        data = load_dossier_json(did)
        if not data:
            print(f" {did}: JSON introuvable")
            dossier_details.append({"id": did, "dp_selection": None})
            continue

        # Offline mode: rebuild the DossierMedical and run select_dp
        if args.offline and not data.get("dp_selection"):
            rebuilt = _rebuild_and_select(data)
            data["dp_selection"] = rebuilt["dp_selection"]

        analysis = analyze_dp_selection(data)
        analyses_off.append(analysis)
        dossier_details.append({"id": did, "dp_selection": analysis})

        verdict = analysis["verdict"] or "-"
        code = analysis["chosen_code"] or "-"
        print(f" {did}: {verdict} — {code} (evidence: {analysis['n_evidence']})")

    metrics_off = compute_metrics(analyses_off)

    # Pass 2: LLM ON (if rerun requested and Ollama is available)
    metrics_on = None
    if args.rerun:
        if not check_ollama():
            print("\nWARN: Ollama indisponible — pass LLM ON ignorée")
            print(" Le rapport ne contiendra que les métriques LLM OFF")
        else:
            print("\n=== Pass 2 : T2A_DP_RANKER_LLM=1 ===")
            for did in dossier_ids:
                ok = run_pipeline_with_env(did, "1")
                status = "OK" if ok else "FAIL"
                print(f" {did}: {status}")

            analyses_on: list[dict] = []
            for did in dossier_ids:
                data = load_dossier_json(did)
                if data:
                    analyses_on.append(analyze_dp_selection(data))
            metrics_on = compute_metrics(analyses_on)

    # Gold CRH
    gold_metrics = None
    gold_evals = None
    gold_index = None

    gold_path = args.gold
    if not gold_path:
        # Auto-detection
        default_gold = ROOT / "data" / "gold_crh" / "gold_crh.jsonl"
        if default_gold.exists():
            gold_path = str(default_gold)

    if gold_path:
        try:
            gold_index = load_gold(gold_path)
            print(f"\n=== Évaluation Gold CRH ({len(gold_index)} cas) ===")
            gold_evals = evaluate_gold_cases(dossier_details, gold_index)
            gold_metrics = compute_gold_metrics(gold_evals)

            for ev in gold_evals:
                match_str = "OK" if ev["acceptable_match"] else "FAIL"
                sym_str = " [R* interdit]" if ev["symptom_not_allowed"] else ""
                print(f" {ev['case_id']}: {ev['chosen_code'] or '-'} vs {ev['dp_expected_code']}"
                      f" → {match_str}{sym_str}")

            # Evaluation CSV
            csv_out = ROOT / "docs" / "NUKE3_GOLD_EVAL.csv"
            write_gold_eval_csv(gold_evals, csv_out)
            print(f"\nCSV évaluation : {csv_out}")
        except Exception as e:
            print(f"\nERREUR gold : {e}")
            gold_metrics = None
            gold_evals = None

    # --- Debug reports (--case-id, --top-errors, --dim-pack) ---
    out_dir = Path(args.out_dir)
    _run_debug_reports(args, dossier_ids, dossier_details, gold_index, gold_evals, out_dir)

    # Report
    report = generate_report(
        metrics_off, metrics_on, dossier_details, args,
        gold_metrics=gold_metrics, gold_evals=gold_evals,
    )
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(report, encoding="utf-8")
    print(f"\nRapport écrit : {output_path}")

    # Console summary
    print(f"\n{'='*50}")
    print(f"CONFIRMED : {metrics_off['confirmed_count']}/{metrics_off['n_with_selection']}"
          f" ({_pct(metrics_off['confirmed_rate'])})")
    print(f"REVIEW : {metrics_off['review_count']}/{metrics_off['n_with_selection']}"
          f" ({_pct(metrics_off['review_rate'])})")
    print(f"Evidence : {_pct(metrics_off['confirmed_evidence_rate'])} des CONFIRMED")
    print(f"DP symptôme : {_pct(metrics_off['dp_symptom_rate'])}")
    print(f"DP comorbidité: {_pct(metrics_off['dp_comorbidity_rate'])}")
    if gold_metrics and gold_metrics.get("n", 0) > 0:
        gm = gold_metrics
        print(f"\n--- Gold CRH ({gm['n']} cas) ---")
        print(f"Strict match : {_pct(gm['exact_match_strict_rate'])}")
        print(f"Acceptable match : {_pct(gm['acceptable_match_rate'])}")
        if gm['confirmed_accuracy_tolerant'] is not None:
            print(f"Confirmed acc. : {_pct(gm['confirmed_accuracy_tolerant'])}")
        print(f"Symptôme interdit: {gm['symptom_not_allowed']}")
    print(f"{'='*50}")


if __name__ == "__main__":
    main()