feat(pmsi): add DP candidate pool + pool rank LLM + benchmark

- DPPoolCandidate model (terme, section, preuve, score_initial)
- build_dp_candidate_pool() with filters (_is_pool_excluded, _dedup_pool)
- Pool exclusion: admin noise, bio values, vague symptoms, place names
- DP_POOL_RANK template for LLM-based ranking among pool candidates
- llm_dp_pool_rank() with guardrails (GF-1 evidence, GF-3 confidence)
- benchmark_quality.py: --dp-candidates, --use-dp-pool-rank flags
- 41 new tests (pool, exclusion, dedup, pool rank, synthese)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
dom
2026-02-24 00:06:44 +01:00
parent 56c38c3d98
commit da34bdc8d7
6 changed files with 1911 additions and 2 deletions

benchmark_quality.py (new file, 660 lines)

@@ -0,0 +1,660 @@
#!/usr/bin/env python3
"""Benchmark qualité DP scoring déterministe vs pipeline LLM.
Compare le DP trouvé par le nouveau scoring déterministe (± fallback LLM)
avec le DP de référence (gold) extrait par le pipeline complet (avec LLM).
Métriques (calculées sur dossiers avec gold_dp non-None uniquement) :
- exact_match : code identique
- family4 : 4 premiers chars sans point identiques (ex: M05.1 vs M05.12 → oui ; K85.1 vs K85.0 → non)
- family3 : 3 premiers chars identiques (ex: K85.1 vs K85.0 → oui ; K85.x vs K86.x → non)
- coverage_dp : % de dossiers où un DP est proposé (new_code non-None)
Usage:
.venv/bin/python3 benchmark_quality.py [--limit 50] [--verbose]
.venv/bin/python3 benchmark_quality.py --limit 50 --use-llm --verbose
"""
from __future__ import annotations
import argparse
import json
import sys
import time
from collections import Counter
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent))
from src.extraction.crh_parser import parse_crh
from src.config import DossierMedical, Diagnostic
from src.medical.cim10_extractor import (
_extract_sejour,
_extract_actes,
_extract_biologie,
_extract_imagerie,
)
from src.medical.dp_scoring import (
build_dp_shortlist,
build_dp_candidate_pool,
score_candidates,
select_dp,
llm_dp_fallback,
llm_dp_pool_rank,
generate_synthese_pmsi,
)
BASE = Path(__file__).resolve().parent
ANON_DIR = BASE / "output" / "anonymized"
STRUCT_DIR = BASE / "output" / "structured"
def find_crh_dossiers(limit: int = 50) -> list[dict]:
"""Trouve les dossiers avec CRH anonymisé ET JSON gold."""
dossiers = []
for anon_dir in sorted(ANON_DIR.iterdir()):
if not anon_dir.is_dir():
continue
dir_name = anon_dir.name
crh_files = list(anon_dir.glob("CRH_*_anonymized.txt"))
if not crh_files:
continue
crh_file = crh_files[0]
crh_name = crh_file.stem.replace("_anonymized", "")
gold_json = STRUCT_DIR / dir_name / f"{crh_name}_cim10.json"
if not gold_json.exists():
continue
dossiers.append({
"dir_name": dir_name,
"crh_name": crh_name,
"text_path": crh_file,
"gold_path": gold_json,
})
if len(dossiers) >= limit:
break
return dossiers
def load_gold_dp(gold_path: Path) -> dict:
"""Charge le DP de référence depuis le JSON gold."""
data = json.loads(gold_path.read_text(encoding="utf-8"))
dp = data.get("diagnostic_principal", {})
return {
"code": dp.get("cim10_suggestion"),
"label": dp.get("texte", ""),
"confidence": dp.get("cim10_confidence", ""),
"source": dp.get("source", ""),
}
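# Exemple (hypothétique) de JSON gold attendu par load_gold_dp() :
# {
#   "diagnostic_principal": {
#     "texte": "Pancréatite aiguë biliaire",
#     "cim10_suggestion": "K85.1",
#     "cim10_confidence": "high",
#     "source": "conclusion"
#   }
# }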
def run_dp_only(text_path: Path, use_llm: bool = False) -> dict:
"""Extraction DP ciblée : scoring déterministe + fallback LLM optionnel."""
text = text_path.read_text(encoding="utf-8")
parsed = parse_crh(text)
dossier = DossierMedical()
dossier.document_type = parsed.get("type", "")
_extract_sejour(parsed, dossier)
_extract_biologie(text, dossier)
_extract_actes(text, dossier)
_extract_imagerie(text, dossier)
edsnlp_result = None
try:
from src.medical.edsnlp_pipeline import run_edsnlp
edsnlp_result = run_edsnlp(text)
except Exception:
pass
candidates = build_dp_shortlist(parsed, text, edsnlp_result, dossier)
candidates = score_candidates(candidates, dossier, full_text=text)
selection = select_dp(candidates, dossier, use_llm=use_llm)
# Instrumentation : comorbidity fallback
comorbidity_fallback = (
selection.verdict == "review"
and "comorbidité banale" in (selection.winner_reason or "")
)
dp_pre_llm = None
if comorbidity_fallback and selection.candidates:
c0 = selection.candidates[0]
dp_pre_llm = {"code": c0.code, "section": c0.source_section}
if use_llm and selection.verdict == "review":
llm_selection = llm_dp_fallback(
parsed, text, dossier,
dp_candidates=candidates,
edsnlp_result=edsnlp_result,
)
if llm_selection.candidates:
all_candidates = list(llm_selection.candidates)
if selection.candidates:
all_candidates.extend(selection.candidates)
llm_selection.candidates = all_candidates
selection = llm_selection
dossier.dp_selection = selection
if selection.candidates:
winner = selection.candidates[0]
dossier.diagnostic_principal = Diagnostic(
texte=winner.label,
cim10_suggestion=winner.code,
source=winner.source_section,
source_page=winner.source_page,
source_excerpt=winner.source_excerpt,
)
result = {
"dp_code": None,
"dp_label": "",
"dp_source": "",
"verdict": None,
"winner_reason": None,
"candidates": [],
"comorbidity_fallback": comorbidity_fallback,
"dp_pre_llm": dp_pre_llm,
}
if dossier.diagnostic_principal:
result["dp_code"] = dossier.diagnostic_principal.cim10_suggestion
result["dp_label"] = dossier.diagnostic_principal.texte
result["dp_source"] = dossier.diagnostic_principal.source or ""
if dossier.dp_selection:
sel = dossier.dp_selection
result["verdict"] = sel.verdict
result["winner_reason"] = sel.winner_reason
result["candidates"] = [
{"code": c.code, "label": c.label, "section": c.source_section,
"score": c.score, "details": c.score_details}
for c in sel.candidates
]
return result
def run_dp_pool_rank(text_path: Path) -> dict:
"""DP Pool Rank : génère SynthesePMSI + pool, puis LLM choisit parmi le pool."""
text = text_path.read_text(encoding="utf-8")
parsed = parse_crh(text)
dossier = DossierMedical()
dossier.document_type = parsed.get("type", "")
_extract_sejour(parsed, dossier)
_extract_biologie(text, dossier)
_extract_actes(text, dossier)
_extract_imagerie(text, dossier)
edsnlp_result = None
try:
from src.medical.edsnlp_pipeline import run_edsnlp
edsnlp_result = run_edsnlp(text)
except Exception:
pass
# 1. Synthèse PMSI
synthese = generate_synthese_pmsi(parsed, text, dossier)
# 2. Pool de candidats
pool = build_dp_candidate_pool(parsed, text, edsnlp_result, dossier)
# 3. LLM pool rank
dp_shortlist = build_dp_shortlist(parsed, text, edsnlp_result, dossier)
dp_shortlist = score_candidates(dp_shortlist, dossier, full_text=text)
selection = llm_dp_pool_rank(
parsed, text, dossier,
pool_candidates=pool,
synthese=synthese,
fallback_oneshot=True,
dp_candidates=dp_shortlist,
edsnlp_result=edsnlp_result,
)
dossier.dp_selection = selection
if selection.candidates:
winner = selection.candidates[0]
dossier.diagnostic_principal = Diagnostic(
texte=winner.label,
cim10_suggestion=winner.code,
source=winner.source_section,
source_page=winner.source_page,
source_excerpt=winner.source_excerpt,
)
result = {
"dp_code": None,
"dp_label": "",
"dp_source": "",
"verdict": None,
"winner_reason": None,
"candidates": [],
"pool_size": len(pool),
"pool_top10": [
{"terme": c.terme, "section": c.section,
"preuve": c.preuve[:120], "score": round(c.score_initial, 2)}
for c in pool[:10]
],
"synthese": synthese.model_dump() if synthese else None,
}
if dossier.diagnostic_principal:
result["dp_code"] = dossier.diagnostic_principal.cim10_suggestion
result["dp_label"] = dossier.diagnostic_principal.texte
result["dp_source"] = dossier.diagnostic_principal.source or ""
if dossier.dp_selection:
sel = dossier.dp_selection
result["verdict"] = sel.verdict
result["winner_reason"] = sel.winner_reason
result["candidates"] = [
{"code": c.code, "label": c.label, "section": c.source_section,
"score": c.score, "details": c.score_details}
for c in sel.candidates
]
return result
# --- Matching helpers ---
def _norm(code: str) -> str:
"""Normalise un code CIM-10 pour comparaison : supprime le point."""
return code.replace(".", "")
def match_exact(a: str | None, b: str | None) -> bool:
if not a or not b:
return False
return a == b
def match_family4(a: str | None, b: str | None) -> bool:
"""4 premiers chars sans point identiques (ex: K851 vs K850 → True)."""
if not a or not b:
return False
return _norm(a)[:4] == _norm(b)[:4]
def match_family3(a: str | None, b: str | None) -> bool:
"""3 premiers chars identiques (ex: K85.x → K85)."""
if not a or not b:
return False
return a[:3] == b[:3]
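# Exemples (hypothétiques) illustrant la hiérarchie exact ⊂ family4 ⊂ family3 :
#   match_exact("K85.1", "K85.1")    -> True
#   match_exact("K85.1", "K85.0")    -> False
#   match_family4("M05.1", "M05.12") -> True   (_norm : "M051" == "M0512"[:4])
#   match_family4("K85.1", "K85.0")  -> False  ("K851" != "K850")
#   match_family3("K85.1", "K85.0")  -> True   ("K85" == "K85")
#   match_family3("K85.1", "K86.0")  -> False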
def main():
parser = argparse.ArgumentParser(description="Benchmark qualité DP scoring")
parser.add_argument("--limit", type=int, default=50, help="Nombre de dossiers")
parser.add_argument("--verbose", action="store_true", help="Afficher chaque dossier")
parser.add_argument("--use-llm", action="store_true",
help="Activer le fallback LLM DP sur les REVIEW (nécessite Ollama)")
parser.add_argument("--synthese-pmsi", action="store_true",
help="Générer la SynthesePMSI pour chaque dossier (nécessite Ollama)")
parser.add_argument("--dp-candidates", action="store_true",
help="Générer et afficher le DP Candidate Pool pour chaque dossier")
parser.add_argument("--use-dp-pool-rank", action="store_true",
help="Utiliser le mode DP Pool Rank (LLM choisit parmi le pool, nécessite Ollama)")
args = parser.parse_args()
mode = "déterministe + LLM fallback" if args.use_llm else "déterministe seul"
if args.use_dp_pool_rank:
mode = "DP Pool Rank (LLM choisit parmi pool)"
if args.synthese_pmsi:
mode += " + SynthesePMSI"
if args.dp_candidates:
mode += " + DPCandidatePool"
print(f"=== Benchmark DP scoring {mode} (n={args.limit}) ===\n")
dossiers = find_crh_dossiers(limit=args.limit)
print(f"Dossiers CRH trouvés : {len(dossiers)}\n")
if not dossiers:
print("ERREUR : aucun dossier CRH avec gold JSON trouvé")
return
# Compteurs
total = len(dossiers)
gold_none = 0 # gold_dp = None (exclus des métriques de match)
evaluable = 0 # gold_dp non-None → base pour exact/family
exact = 0
fam4 = 0
fam3 = 0
coverage_has_dp = 0 # new_code non-None (sur total)
review_count = 0
confirmed_count = 0
comorbidity_fallback_count = 0
errors: list[dict] = []
review_reasons: list[str] = []
syntheses: list[dict] = [] # (crh_name, gold_code, new_code, synthese_dict)
dp_pools: list[dict] = []
pool_rank_results: list[dict] = [] # résultats détaillés pool rank
gold_confidences = Counter()
verdicts = Counter()
source_sections = Counter()
timings: list[float] = []
for i, d in enumerate(dossiers):
gold = load_gold_dp(d["gold_path"])
gold_code = gold["code"]
gold_confidences[gold["confidence"] or "none"] += 1
t0 = time.time()
if args.use_dp_pool_rank:
result = run_dp_pool_rank(d["text_path"])
pool_rank_results.append({
"crh": d["crh_name"],
"gold_code": gold_code,
"dp_label": result["dp_label"],
"dp_source": result["dp_source"],
"verdict": result["verdict"],
"winner_reason": result["winner_reason"],
"pool_size": result.get("pool_size", 0),
"pool_top10": result.get("pool_top10", []),
"synthese": result.get("synthese"),
"candidates": result.get("candidates", []),
})
else:
result = run_dp_only(d["text_path"], use_llm=args.use_llm)
elapsed = time.time() - t0
timings.append(elapsed)
new_code = result["dp_code"]
# SynthesePMSI optionnelle
if args.synthese_pmsi:
text_synth = d["text_path"].read_text(encoding="utf-8")
parsed_synth = parse_crh(text_synth)
dossier_tmp = DossierMedical()
dossier_tmp.document_type = parsed_synth.get("type", "")
_extract_sejour(parsed_synth, dossier_tmp)
_extract_actes(text_synth, dossier_tmp)
synthese = generate_synthese_pmsi(parsed_synth, text_synth, dossier_tmp)
syntheses.append({
"crh": d["crh_name"],
"gold_code": gold_code,
"new_code": new_code,
"synthese": synthese.model_dump() if synthese else None,
})
# DP Candidate Pool optionnel
if args.dp_candidates:
text_pool = d["text_path"].read_text(encoding="utf-8")
parsed_pool = parse_crh(text_pool)
dossier_pool = DossierMedical()
dossier_pool.document_type = parsed_pool.get("type", "")
_extract_sejour(parsed_pool, dossier_pool)
_extract_actes(text_pool, dossier_pool)
edsnlp_pool = None
try:
from src.medical.edsnlp_pipeline import run_edsnlp
edsnlp_pool = run_edsnlp(text_pool)
except Exception:
pass
pool = build_dp_candidate_pool(parsed_pool, text_pool, edsnlp_pool, dossier_pool)
dp_pools.append({
"crh": d["crh_name"],
"gold_code": gold_code,
"new_code": new_code,
"pool_size": len(pool),
"candidates": [
{"terme": c.terme, "section": c.section,
"preuve": c.preuve[:120], "score": round(c.score_initial, 2)}
for c in pool
],
})
verdict = result["verdict"]
verdicts[verdict or "no_selection"] += 1
if result["dp_source"]:
source_sections[result["dp_source"]] += 1
# Coverage : new_code proposé (sur total)
if new_code:
coverage_has_dp += 1
# Métriques de match : uniquement si gold_dp non-None
if gold_code is None:
gold_none += 1
else:
evaluable += 1
is_exact = match_exact(new_code, gold_code)
is_f4 = match_family4(new_code, gold_code)
is_f3 = match_family3(new_code, gold_code)
if is_exact:
exact += 1
if is_f4:
fam4 += 1
if is_f3:
fam3 += 1
# Erreurs (non-exact avec gold)
if not is_exact:
errors.append({
"dir": d["dir_name"],
"crh": d["crh_name"],
"gold_code": gold_code,
"gold_label": gold["label"],
"gold_conf": gold["confidence"],
"new_code": new_code or "(aucun)",
"new_label": result["dp_label"] or "(aucun)",
"new_source": result["dp_source"],
"verdict": verdict,
"winner_reason": result["winner_reason"] or "",
"candidates": result["candidates"][:3],
"is_f4": is_f4,
"is_f3": is_f3,
})
if result.get("comorbidity_fallback"):
comorbidity_fallback_count += 1
if verdict == "review":
review_count += 1
if result["winner_reason"]:
review_reasons.append(result["winner_reason"])
elif verdict == "confirmed":
confirmed_count += 1
if args.verbose:
if gold_code is None:
tag = "SKIP"
elif match_exact(new_code, gold_code):
tag = "EXACT"
elif match_family4(new_code, gold_code):
tag = "FAM4"
elif match_family3(new_code, gold_code):
tag = "FAM3"
else:
tag = "MISS"
print(f" [{i+1:3d}] {d['crh_name']} : gold={gold_code} new={new_code} "
f"[{tag}] verdict={verdict} ({elapsed:.1f}s)")
# === Rapport ===
print(f"\n{'='*60}")
print(f"RESULTATS — {total} dossiers CRH ({mode})")
print(f"{'='*60}\n")
pct = lambda n, d: n / d * 100 if d else 0
print(f" Évaluables (gold non-None) : {evaluable}/{total} (excl. {gold_none} sans gold DP)")
print()
print(f" DP exact match : {exact}/{evaluable} ({pct(exact, evaluable):.1f}%)")
print(f" DP family4 : {fam4}/{evaluable} ({pct(fam4, evaluable):.1f}%)")
print(f" DP family3 : {fam3}/{evaluable} ({pct(fam3, evaluable):.1f}%)")
print(f" Coverage DP : {coverage_has_dp}/{total} ({pct(coverage_has_dp, total):.1f}%)")
print()
print(f" Verdict REVIEW : {review_count}/{total} ({pct(review_count, total):.1f}%)")
print(f" Verdict CONFIRM: {confirmed_count}/{total} ({pct(confirmed_count, total):.1f}%)")
print(f" Comorbidité FB : {comorbidity_fallback_count}/{total} ({pct(comorbidity_fallback_count, total):.1f}%)")
if timings:
avg_t = sum(timings) / len(timings)
print(f"\n Temps moyen : {avg_t:.1f}s/dossier")
print(f" Temps total : {sum(timings):.1f}s")
print(f"\n Gold confidence :")
for conf, cnt in gold_confidences.most_common():
print(f" {conf:8s} : {cnt}")
print(f"\n Sources DP (new) :")
for src, cnt in source_sections.most_common():
print(f" {src:35s} : {cnt}")
print(f"\n Verdicts :")
for v, cnt in verdicts.most_common():
print(f" {v:15s} : {cnt}")
if review_reasons:
print(f"\n Top 5 review reasons :")
reason_patterns = Counter()
for r in review_reasons:
if "aucun candidat" in r:
reason_patterns["aucun candidat DP trouvé"] += 1
elif "delta insuffisant" in r:
reason_patterns["delta insuffisant (ambiguïté)"] += 1
elif "evidence_excerpt vide" in r:
reason_patterns["LLM: evidence_excerpt vide"] += 1
elif "comorbidité" in r:
reason_patterns["LLM: comorbidité hors section forte"] += 1
elif "code invalide" in r:
reason_patterns["LLM: code CIM-10 invalide"] += 1
elif "LLM non disponible" in r or "erreur LLM" in r:
reason_patterns["LLM: erreur/indisponible"] += 1
elif "réponse LLM invalide" in r:
reason_patterns["LLM: réponse invalide"] += 1
elif "section faible" in r or "confidence" in r:
reason_patterns["LLM: garde-fou (section/confidence)"] += 1
else:
reason_patterns[r[:60]] += 1
for reason, cnt in reason_patterns.most_common(5):
print(f" [{cnt:2d}] {reason}")
if errors:
print(f"\n{'='*60}")
print(f"ERREURS DP — {len(errors)} dossiers (5 premiers)")
print(f"{'='*60}\n")
for e in errors[:5]:
fam_tag = " [fam4]" if e.get("is_f4") else (" [fam3]" if e.get("is_f3") else "")
print(f" {e['crh']} ({e['dir']}){fam_tag}")
print(f" Gold : {e['gold_code']}{e['gold_label'][:60]} (conf={e['gold_conf']})")
print(f" New : {e['new_code']}{e['new_label'][:60]}")
print(f" Source: {e['new_source']}, Verdict: {e['verdict']}")
if e.get('winner_reason'):
print(f" Reason: {e['winner_reason'][:80]}")
if e['candidates']:
print(f" Candidats :")
for c in e['candidates']:
print(f" {c['code']}{c['label'][:50]} "
f"(section={c['section']}, score={c['score']})")
print()
# Affichage des synthèses PMSI si activé
if args.synthese_pmsi and syntheses:
print(f"\n{'='*60}")
print(f"SYNTHESES PMSI — {len(syntheses)} dossiers")
print(f"{'='*60}")
for s in syntheses:
print(f"\n --- {s['crh']} (gold={s['gold_code']}, new={s['new_code']}) ---")
syn = s.get("synthese")
if not syn:
print(" (échec génération)")
continue
print(f" Motif admission : {syn.get('motif_admission', '')[:100]}")
print(f" Problème PEC : {syn.get('probleme_pris_en_charge', '')[:100]}")
print(f" Diagnostic retenu : {syn.get('diagnostic_retenu', '')[:100]}")
actes = syn.get("actes_ou_traitements_majeurs", [])
if actes:
print(f" Actes/traitements : {', '.join(a[:60] for a in actes[:4])}")
compli = syn.get("complications", [])
if compli:
print(f" Complications : {', '.join(c[:60] for c in compli[:3])}")
comor = syn.get("terrain_comorbidites", [])
if comor:
print(f" Terrain/comorbidités: {', '.join(c[:60] for c in comor[:5])}")
preuves = syn.get("preuves", [])
if preuves:
print(f" Preuves ({len(preuves)}) :")
for p in preuves[:3]:
print(f" [{p.get('section', '?')}] {p.get('excerpt', '')[:120]}")
# Affichage des résultats DP Pool Rank si activé
if args.use_dp_pool_rank and pool_rank_results:
print(f"\n{'='*60}")
print(f"DP POOL RANK — {len(pool_rank_results)} dossiers")
print(f"{'='*60}")
chosen_ok = sum(1 for r in pool_rank_results if r["dp_label"])
print(f"\n Choix effectué : {chosen_ok}/{len(pool_rank_results)} "
f"({chosen_ok/len(pool_rank_results)*100:.0f}%)")
for r in pool_rank_results:
print(f"\n --- {r['crh']} (gold={r['gold_code']}) ---")
# SynthesePMSI
syn = r.get("synthese")
if syn:
print(f" SynthesePMSI :")
print(f" Motif admission : {syn.get('motif_admission', '')[:80]}")
print(f" Problème PEC : {syn.get('probleme_pris_en_charge', '')[:80]}")
print(f" Diag retenu : {syn.get('diagnostic_retenu', '')[:80]}")
else:
print(f" SynthesePMSI : (non disponible)")
# Pool top 10
print(f" Pool ({r['pool_size']} candidats) :")
for j, c in enumerate(r.get("pool_top10", [])[:10]):
print(f" [{j}] {c['terme'][:55]:55s} ({c['section']}, {c['score']:.2f})")
# Résultat LLM
print(f" >>> DP choisi : {r['dp_label'][:70] or '(aucun)'}")
print(f" Source : {r['dp_source']}")
print(f" Verdict : {r['verdict']}")
print(f" Reason : {(r['winner_reason'] or '')[:100]}")
# Affichage des DP Candidate Pools si activé
if args.dp_candidates and dp_pools:
print(f"\n{'='*60}")
print(f"DP CANDIDATE POOL — {len(dp_pools)} dossiers")
print(f"{'='*60}")
pool_sizes = [p["pool_size"] for p in dp_pools]
print(f"\n Taille pool : min={min(pool_sizes)}, max={max(pool_sizes)}, "
f"moy={sum(pool_sizes)/len(pool_sizes):.1f}")
for p in dp_pools:
print(f"\n --- {p['crh']} (gold={p['gold_code']}, new={p['new_code']}) "
f"{p['pool_size']} candidats ---")
for i, c in enumerate(p["candidates"][:10], 1):
print(f" [{i:2d}] {c['terme'][:60]:60s} "
f"({c['section']}, score={c['score']:.2f})")
if c["preuve"]:
print(f" preuve: {c['preuve'][:100]}")
# JSON exportable
summary = {
"mode": mode,
"total": total,
"evaluable": evaluable,
"gold_none": gold_none,
"exact_match": exact,
"exact_match_pct": round(pct(exact, evaluable), 1),
"family4": fam4,
"family4_pct": round(pct(fam4, evaluable), 1),
"family3": fam3,
"family3_pct": round(pct(fam3, evaluable), 1),
"coverage_dp": coverage_has_dp,
"coverage_dp_pct": round(pct(coverage_has_dp, total), 1),
"review_count": review_count,
"review_pct": round(pct(review_count, total), 1),
"confirmed_count": confirmed_count,
"comorbidity_fallback_count": comorbidity_fallback_count,
"comorbidity_fallback_pct": round(pct(comorbidity_fallback_count, total), 1),
"errors": errors,
}
if args.synthese_pmsi:
summary["syntheses_pmsi"] = syntheses
if args.dp_candidates:
summary["dp_pools"] = dp_pools
if args.use_dp_pool_rank:
summary["pool_rank_results"] = pool_rank_results
suffix = "_llm" if args.use_llm else ""
if args.use_dp_pool_rank:
suffix = "_pool_rank"
if args.synthese_pmsi:
suffix += "_synthese"
if args.dp_candidates:
suffix += "_pool"
out_path = BASE / "output" / f"benchmark_dp_quality{suffix}.json"
out_path.write_text(json.dumps(summary, ensure_ascii=False, indent=2), encoding="utf-8")
print(f"\nRésultats exportés : {out_path}")
if __name__ == "__main__":
main()


@@ -169,6 +169,14 @@ class SynthesePMSI(BaseModel):
preuves: list[PreuveSynthese] = Field(default_factory=list)
class DPPoolCandidate(BaseModel):
"""Candidat du pool DP élargi (terme libre, pas forcément codé CIM-10)."""
terme: str # Texte du diagnostic candidat
section: str # Section source (conclusion, motif_hospitalisation, edsnlp, acte, etc.)
preuve: str = "" # Extrait du texte source (~200 chars)
score_initial: float = 0.0 # Score 0-1 (poids section + indicateurs)
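# Exemple d'instanciation (hypothétique) :
#   DPPoolCandidate(terme="Pancréatite aiguë", section="conclusion",
#                   preuve="Au total : pancréatite aiguë", score_initial=0.7)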
class DPCandidate(BaseModel):
code: Optional[str] = None
label: str
@@ -247,6 +255,7 @@ class DossierMedical(BaseModel):
sejour: Sejour = Field(default_factory=Sejour)
diagnostic_principal: Optional[Diagnostic] = None
dp_selection: Optional[DPSelection] = None
synthese_pmsi: Optional[SynthesePMSI] = None
diagnostics_associes: list[Diagnostic] = Field(default_factory=list)
actes_ccam: list[ActeCCAM] = Field(default_factory=list)
antecedents: list[Antecedent] = Field(default_factory=list)


@@ -17,6 +17,7 @@ from typing import Optional
from ..config import (
DossierMedical,
DPCandidate,
DPPoolCandidate,
DPSelection,
DP_REVIEW_THRESHOLD,
DP_SCORING_WEIGHTS,
@@ -893,7 +894,562 @@ def llm_dp_fallback(
# ---------------------------------------------------------------------------
# 5b. DP Pool Rank — sélection du DP parmi un pool de candidats via LLM
# ---------------------------------------------------------------------------
def _format_pool_for_prompt(candidates: list[DPPoolCandidate], max_items: int = 30) -> str:
"""Formate la liste de candidats pour le prompt LLM de ranking."""
lines = []
for i, c in enumerate(candidates[:max_items]):
preuve_short = c.preuve[:120].replace("\n", " ").strip() if c.preuve else ""
lines.append(
f" [{i}] terme=\"{c.terme}\" | section={c.section} "
f"| preuve=\"{preuve_short}\" | score={c.score_initial:.2f}"
)
return "\n".join(lines)
def _build_clinical_context(
parsed: dict,
dossier: DossierMedical,
text: str,
synthese: SynthesePMSI | None = None,
) -> str:
"""Construit le contexte clinique pour le prompt de ranking.
Priorité :
1. SynthesePMSI structurée (si disponible)
2. Fallback : motif + sections fortes + actes
"""
if synthese:
parts = []
if synthese.motif_admission:
parts.append(f"Motif d'admission : {synthese.motif_admission}")
if synthese.probleme_pris_en_charge:
parts.append(f"Problème pris en charge : {synthese.probleme_pris_en_charge}")
if synthese.diagnostic_retenu:
parts.append(f"Diagnostic retenu : {synthese.diagnostic_retenu}")
if synthese.actes_ou_traitements_majeurs:
parts.append(f"Actes : {', '.join(synthese.actes_ou_traitements_majeurs)}")
if synthese.complications:
parts.append(f"Complications : {', '.join(synthese.complications)}")
if synthese.terrain_comorbidites:
parts.append(f"Terrain : {', '.join(synthese.terrain_comorbidites)}")
if synthese.preuves:
for p in synthese.preuves[:3]:
parts.append(f"Preuve [{p.section}] : {p.excerpt[:150]}")
return "\n".join(parts)
# Fallback : sections fortes
motif = _build_motif(parsed, dossier, full_text=text)
sections_fortes = _build_strong_sections_text(parsed)
actes = _build_actes(dossier)
return (
f"Motif d'hospitalisation : {motif}\n"
f"Sections cliniques :\n{sections_fortes}\n"
f"Actes : {actes}"
)
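# Exemple (hypothétique) de contexte produit lorsque la SynthesePMSI est disponible :
#   Motif d'admission : Douleur thoracique
#   Problème pris en charge : Embolie pulmonaire
#   Diagnostic retenu : Embolie pulmonaire bilatérale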
def llm_dp_pool_rank(
parsed: dict,
text: str,
dossier: DossierMedical,
pool_candidates: list[DPPoolCandidate],
synthese: SynthesePMSI | None = None,
fallback_oneshot: bool = True,
dp_candidates: list[DPCandidate] | None = None,
edsnlp_result=None,
) -> DPSelection:
"""Sélectionne le DP en demandant au LLM de choisir parmi le pool de candidats.
Le LLM reçoit la SynthesePMSI (ou sections fortes en fallback) + la liste
des candidats du pool. Il doit choisir un index et recopier exactement le terme.
Args:
parsed: CRH parsé
text: texte brut du CRH
dossier: dossier médical en cours
pool_candidates: candidats issus de build_dp_candidate_pool()
synthese: SynthesePMSI si disponible (prioritaire pour le contexte)
fallback_oneshot: si True, fallback vers llm_dp_fallback quand chosen_index=-1
dp_candidates: candidats DP existants (pour le fallback oneshot)
edsnlp_result: résultat edsnlp (pour le fallback oneshot)
Returns:
DPSelection avec verdict confirmed ou review.
"""
if not pool_candidates:
logger.info("DP pool rank : aucun candidat dans le pool")
if fallback_oneshot:
return llm_dp_fallback(parsed, text, dossier, dp_candidates, edsnlp_result)
return DPSelection(verdict="review", winner_reason="pool vide, pas de fallback")
try:
from .ollama_client import call_ollama
from ..prompts import DP_POOL_RANK
except ImportError:
logger.warning("Module ollama_client non disponible pour le DP pool rank")
return DPSelection(verdict="review", winner_reason="LLM non disponible")
# Construire le contexte et la liste de candidats
contexte = _build_clinical_context(parsed, dossier, text, synthese)
candidates_text = _format_pool_for_prompt(pool_candidates)
prompt = DP_POOL_RANK.format(
contexte_clinique=contexte,
candidates_list=candidates_text,
)
try:
result = call_ollama(prompt, temperature=0.0, max_tokens=600, role="coding")
except Exception:
logger.warning("Erreur LLM DP pool rank", exc_info=True)
if fallback_oneshot:
return llm_dp_fallback(parsed, text, dossier, dp_candidates, edsnlp_result)
return DPSelection(verdict="review", winner_reason="erreur LLM pool rank")
if not result or not isinstance(result, dict):
logger.warning("Réponse LLM pool rank invalide : %s", type(result))
if fallback_oneshot:
return llm_dp_fallback(parsed, text, dossier, dp_candidates, edsnlp_result)
return DPSelection(verdict="review", winner_reason="réponse LLM pool rank invalide")
# Parser la réponse
chosen_index = result.get("chosen_index", -1)
chosen_terme = result.get("chosen_terme", "")
evidence_section_raw = result.get("evidence_section", "")
evidence_excerpt = result.get("evidence_excerpt", "")
confidence = result.get("confidence", "low")
reason = result.get("reason", "")
# Normaliser l'index
if not isinstance(chosen_index, int):
try:
chosen_index = int(chosen_index)
except (ValueError, TypeError):
chosen_index = -1
logger.info(
"LLM pool rank: index=%d terme='%s' section=%s confidence=%s reason='%s'",
chosen_index, (chosen_terme or "")[:60], evidence_section_raw, confidence, (reason or "")[:80],
)
# chosen_index == -1 → aucun candidat retenu
if chosen_index < 0 or chosen_index >= len(pool_candidates):
logger.info("LLM pool rank : chosen_index=%d hors plage (0-%d), fallback",
chosen_index, len(pool_candidates) - 1)
if fallback_oneshot:
return llm_dp_fallback(parsed, text, dossier, dp_candidates, edsnlp_result)
return DPSelection(
verdict="review",
winner_reason=f"LLM pool rank: aucun candidat retenu (index={chosen_index})",
)
# Candidat sélectionné
chosen = pool_candidates[chosen_index]
# Vérifier cohérence du terme (le LLM doit recopier exactement)
if chosen_terme and normalize_text(chosen_terme) != normalize_text(chosen.terme):
logger.warning(
"LLM pool rank : terme recopié '%s' ≠ candidat '%s' (index %d)",
chosen_terme[:60], chosen.terme[:60], chosen_index,
)
# On fait confiance à l'index, pas au terme recopié
# Normaliser la section
evidence_section = _normalize_evidence_section(evidence_section_raw)
if not evidence_section:
evidence_section = chosen.section
# Utiliser la preuve du candidat si le LLM n'en fournit pas
if not evidence_excerpt:
evidence_excerpt = chosen.preuve
source_tag = f"llm_pool_rank ({evidence_section})"
# Le pool ne contient pas de codes CIM-10 → on ne peut pas valider/normaliser ici.
# On crée un candidat DPCandidate sans code, qui sera codé en aval par CODING_CIM10.
candidate = DPCandidate(
code=None, # sera codé CIM-10 plus tard si nécessaire
label=chosen.terme,
source_section=source_tag,
source_excerpt=evidence_excerpt,
confidence_raw=confidence,
)
# Score synthétique
confidence_scores = {"high": 3, "medium": 2, "low": 1}
candidate.score = confidence_scores.get(confidence, 1)
candidate.score_details = {
"llm_confidence": candidate.score,
"pool_score": round(chosen.score_initial * 10),
"pool_index": chosen_index,
}
# Garde-fous simplifiés (pas de code → pas de GF-2 comorbidité)
has_evidence = bool(evidence_excerpt and evidence_excerpt.strip())
# GF-1 : evidence vide → REVIEW
if not has_evidence:
logger.info("LLM pool rank : pas de preuve pour '%s', REVIEW", chosen.terme[:60])
return DPSelection(
verdict="review", candidates=[candidate],
winner_reason=f"LLM pool rank: evidence vide pour '{chosen.terme[:40]}'",
)
# GF-3 : CONFIRMED uniquement si confidence high
if confidence != "high":
return DPSelection(
verdict="review", candidates=[candidate],
winner_reason=f"LLM pool rank: '{chosen.terme[:40]}' — confidence {confidence}",
)
return DPSelection(
verdict="confirmed", candidates=[candidate],
winner_reason=f"LLM pool rank: '{chosen.terme[:40]}' (confidence={confidence}, reason={reason[:60]})",
)
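# Récapitulatif (esquisse) des garde-fous ci-dessus :
#   chosen_index == -1 ou hors plage         -> fallback oneshot (si activé) sinon REVIEW
#   evidence vide (LLM ET candidat) : GF-1   -> REVIEW
#   confidence != "high" : GF-3              -> REVIEW
#   confidence == "high" + preuve présente   -> CONFIRMED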
# ---------------------------------------------------------------------------
# 6. DP Candidate Pool — extraction élargie de termes diagnostiques
# ---------------------------------------------------------------------------
# Phrases indicatives : signalent le diagnostic principal dans le texte.
# Capturer le texte APRÈS le marqueur, jusqu'à un point/newline/fin de phrase.
_INDICATIVE_PHRASES_RE = re.compile(
r"(?:"
r"diagnostic(?:\s+(?:principal|retenu|de\s+sortie|final))?\s*(?::|retenu\s*:)"
r"|diagnostics?\s+retenus?\s*:"
r"|au\s+total\s*[:,]"
r"|en\s+(?:résumé|resume|synthèse|synthese)\s*[:,]"
r"|hospitalis[ée]e?\s+pour\b"
r"|admise?\s+pour\b"
r"|adress[ée]e?\s+pour\b"
r"|prise?\s+en\s+charge\s+(?:pour|d[e'u])\b"
r"|motif\s+(?:d[e']?\s*)?(?:hospitalisation|admission|consultation)\s*:"
r")"
r"\s*(.{5,200}?)(?:[.\n]|$)",
re.IGNORECASE,
)
# Valeurs biologiques à exclure (termes isolés ou patterns numériques)
_BIO_EXCLUSION_RE = re.compile(
r"(?:"
r"\b(?:hb|hémoglobine|créatinine|crp|leucocytes|plaquettes|glycémie|"
r"natrémie|kaliémie|calcémie|bilirubine|albumine|fibrinogène|"
r"hématocrite|inr|tp|tca|pct|bnp|nt-?probnp|troponine|lactates?|"
r"ferritine|transferrine|vitamine|acide\s+urique|phosphatases?|"
r"transaminases?|gamma-?gt|ldh|cpk|lipase|amylase)\b"
r".*\d" # suivi d'une valeur numérique
r"|\d+[.,]?\d*\s*(?:g/[dl]|mg/[dl]|mmol/[l]|µmol/[l]|ui/[l]|%|ml/min)"
r")",
re.IGNORECASE,
)
# Symptômes isolés trop vagues pour être candidats DP (sans contexte)
_VAGUE_SYMPTOMS = frozenset({
"douleur", "fièvre", "toux", "fatigue", "asthénie",
"nausées", "vomissements", "céphalées", "malaise",
"vertiges", "dyspnée", "essoufflement",
})
# Fragments administratifs/structurels à exclure du pool
_ADMIN_NOISE_RE = re.compile(
r"(?:"
# Titres et fonctions hospitalières
r"praticiens?\s+hospitaliers?"
r"|assistant\s+sp[ée]cialiste"
r"|chef\s+de\s+(?:clinique|p[oô]le|service)"
r"|ancien\s+chef"
r"|cadre\s+(?:sant[ée]|infirmier|de\s+p[oô]le)"
r"|infirmi[eè]res?"
r"|secr[ée]tariat"
r"|assistantes?\s+sociales?"
r"|psychologues?"
r"|psychomotricienne"
r"|orthophonistes?"
r"|m[ée]decin\s+coordonnateur"
r"|m[ée]decin\s+du\s+sport"
r"|consultation\s+externes?"
r"|attach[ée]s?"
r"|cadres?\s+infirmiers?"
# Diplômes / formations
r"|desc\s+\w+"
r"|diu\s+\w+"
# Identifiants et en-têtes
r"|n°\s*(?:finess|rpps)"
r"|centre\s+(?:hospitalier|de\s+comp[ée]tences)"
r"|imprim[ée]\s+le"
r"|page\(?s?\)?\s*:"
r"|bien\s+confraternellement"
r"|les\s+consignes\s+d['']\s*usage"
r"|information\s+patient"
r"|h[oô]pitaux?\s+de"
r"|h[oô]pital\s+de\s+"
r"|v\d+\s*-\s*imprim"
r"|[a-z0-9_.+-]+@[a-z0-9-]+\.[a-z]"
r"|bp\s+\d+.*cedex"
r"|avenue|boulevard|rue\s"
# Services et pôles
r"|p[oô]le\s+(?:sp[ée]cialit|femme|m[eè]re|enfant|m[ée]dical)"
r"|service\s+d[eu]s?\s+"
r"|explorations?\s+fonct"
r"|oncologue\s+digestif"
r"|proctologue"
r"|h[ée]pato-gastro"
r"|m[ée]decine\s+interne"
r"|immunologie\s+clinique"
r"|dermatologie$"
r"|rhumato-immunologie"
r"|n[ée]onatologie"
r"|p[ée]diatrie\s+de\s+"
r"|urgences\s+p[ée]diatriques"
r"|reproduction$"
r"|maladies\s+auto"
r"|auto-inflammatoires"
r"|syst[ée]miques\s+rares"
r"|allergiques$"
r"|m[ée]taboliques$"
r"|digestives?$"
r"|__{3,}"
# Posologie / médicaments avec dosage
r"|\d+\s*mg\s*(?:\(|,|\s)"
r"|\b(?:orale|sous-cutan[ée]e|intraveineuse)\b.*\b(?:matin|midi|soir|jour)\b"
r"|cpr\s+\d|cprdis|comprim[ée]"
r"|\bmatin\s+midi\s+soi"
# Rendez-vous et logistique
r"|prochains?\s+rdv"
r"|hdj\s+protocolaire"
r"|pose\s+de\s+picc"
r"|bs\s+les\s+"
r"|prise\s+de\s+rendez"
# Examen clinique (observations, pas diagnostics)
r"|murmure\s+v[ée]siculaire"
r"|pouls\s+p[ée]riph[ée]riques"
r"|abdomen\s+souple"
r"|sans\s+bruits?\s+surajout"
r"|bha\s+per[çc]us"
r"|sans\s+tj\s+ni\s+rhj"
r"|examen\s+au\s+monofilament"
r"|rp\s+de\s+contr[oô]le"
# Poids et mesures
r"|poids\s+de\s+sortie"
# Allergie sans valeur diagnostique
r"|allergie\s*:\s*\d"
# Biologie / valeurs labo (en-têtes)
r"|biologie\s+d['']\s*entr[ée]e"
# Histoire / contexte (en-têtes)
r"|histoire\s+de\s+la\s+maladie"
r"|mode\s+de\s+vie"
# Noms de services (patterns additionnels)
r"|endocrinologie"
r"|diab[ée]tologie"
r"|nutrition$"
r"|f[ée]d[ée]ration$"
r")",
re.IGNORECASE,
)
# Fragments trop courts ou trop génériques (< 3 mots significatifs)
_PLACE_NOISE_RE = re.compile(
r"^(?:de\s+)?(?:bordeaux|toulouse|lille|paris|lyon|marseille|angers|tours"
r"|bayonne|montpellier|nantes|rennes|strasbourg|nancy)(?:\s+et\s+\w+)?$",
re.IGNORECASE,
)
# Poids de section pour le score_initial (0-1)
_POOL_SECTION_WEIGHTS: dict[str, float] = {
"diag_sortie": 1.0,
"diagnostics_retenus": 1.0,
"diag_principal": 1.0,
"indicative_phrase": 0.9,
"motif_hospitalisation": 0.8,
"conclusion": 0.7,
"synthese": 0.7,
"acte": 0.5,
"edsnlp": 0.4,
"cim10_map": 0.6,
"histoire_maladie": 0.3,
"evolution": 0.3,
}
_MAX_POOL_SIZE = 30
def build_dp_candidate_pool(
parsed: dict,
text: str,
edsnlp_result,
dossier: DossierMedical,
) -> list[DPPoolCandidate]:
"""Construit un pool élargi de candidats DP (termes libres, pas de codage).
Sources :
1. Phrases indicatives dans le texte complet
2. Diagnostics des sections fortes (phrases médicales significatives)
3. Entités edsnlp NER (non-niées)
4. Actes/traitements majeurs comme indices
5. CIM10_MAP matches dans les sections fortes
Dédup par terme normalisé, scoring 0-1, cap à 30 candidats.
"""
from .das_filter import is_valid_diagnostic_text, clean_diagnostic_text
from .cim10_extractor import CIM10_MAP
raw: list[DPPoolCandidate] = []
# --- 1. Phrases indicatives (texte complet) ---
for m in _INDICATIVE_PHRASES_RE.finditer(text):
phrase = m.group(1).strip().rstrip(",.;:!")
phrase = clean_diagnostic_text(phrase)
if _is_pool_excluded(phrase):
continue
if not is_valid_diagnostic_text(phrase):
continue
raw.append(DPPoolCandidate(
terme=phrase,
section="indicative_phrase",
preuve=_extract_excerpt(text, m.start()),
score_initial=_POOL_SECTION_WEIGHTS["indicative_phrase"],
))
# --- 2. Sections fortes : phrases médicales ---
sections = parsed.get("sections", {})
for section_key in ("diag_sortie", "diagnostics_retenus", "diag_principal",
"motif_hospitalisation", "conclusion", "synthese"):
section_text = sections.get(section_key, "")
if not section_text:
continue
weight = _POOL_SECTION_WEIGHTS.get(section_key, 0.3)
# Extraire les phrases/segments significatifs (séparés par ponctuation forte)
fragments = re.split(r"[.\n;]+", section_text)
for frag in fragments:
frag = clean_diagnostic_text(frag.strip())
if len(frag) < 5 or len(frag) > 200:
continue
if _is_pool_excluded(frag):
continue
if not is_valid_diagnostic_text(frag):
continue
raw.append(DPPoolCandidate(
terme=frag,
section=section_key,
preuve=section_text[:200].strip(),
score_initial=weight,
))
# --- 3. edsnlp NER entities ---
if edsnlp_result:
for ent in edsnlp_result.cim10_entities:
if ent.negation or ent.hypothese:
continue
terme = clean_diagnostic_text(ent.texte.capitalize())
if _is_pool_excluded(terme):
continue
if not is_valid_diagnostic_text(terme):
continue
raw.append(DPPoolCandidate(
terme=terme,
section="edsnlp",
preuve=f"code={ent.code}" if ent.code else "",
score_initial=_POOL_SECTION_WEIGHTS["edsnlp"],
))
# --- 4. Actes/traitements majeurs ---
for acte in dossier.actes_ccam[:10]:
terme = clean_diagnostic_text(acte.texte)
if len(terme) < 5:
continue
raw.append(DPPoolCandidate(
terme=terme,
section="acte",
preuve=f"CCAM={acte.code_ccam_suggestion}" if acte.code_ccam_suggestion else "",
score_initial=_POOL_SECTION_WEIGHTS["acte"],
))
# --- 5. CIM10_MAP matches dans sections fortes ---
for section_key in ("motif_hospitalisation", "conclusion", "synthese",
"diag_sortie", "diagnostics_retenus", "diag_principal"):
section_text = sections.get(section_key, "")
if not section_text:
continue
section_norm = normalize_text(section_text)
for terme_map, code in CIM10_MAP.items():
if normalize_text(terme_map) in section_norm:
raw.append(DPPoolCandidate(
terme=terme_map.capitalize(),
section="cim10_map",
preuve=f"CIM10_MAP→{code} (dans {section_key})",
score_initial=_POOL_SECTION_WEIGHTS["cim10_map"],
))
# --- Dédup par terme normalisé, garder le meilleur score ---
pool = _dedup_pool(raw)
# --- Cap à MAX_POOL_SIZE ---
pool.sort(key=lambda c: -c.score_initial)
return pool[:_MAX_POOL_SIZE]
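# Exemple (hypothétique) : pour un CRH contenant "hospitalisé pour pancréatite
# aiguë biliaire" et une section diag_sortie "Cholécystite aiguë", le pool
# contiendrait typiquement :
#   DPPoolCandidate(terme="Cholécystite aiguë", section="diag_sortie", score_initial=1.0)
#   DPPoolCandidate(terme="pancréatite aiguë biliaire", section="indicative_phrase", score_initial=0.9)
#   + d'éventuels matches cim10_map (0.6) et entités edsnlp (0.4)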
def _is_pool_excluded(terme: str) -> bool:
"""Vérifie si un terme doit être exclu du pool (bio, bruit admin, trop vague)."""
t_lower = terme.lower().strip()
# Trop court
if len(t_lower) < 5:
return True
# Valeur biologique
if _BIO_EXCLUSION_RE.search(terme):
return True
# Symptôme isolé trop vague (un seul mot)
words = t_lower.split()
if len(words) == 1 and t_lower in _VAGUE_SYMPTOMS:
return True
# Fragment administratif/structurel
if _ADMIN_NOISE_RE.search(terme):
return True
# Nom de ville / lieu seul
if _PLACE_NOISE_RE.match(t_lower):
return True
# Fragments purement numériques ou dates
if re.match(r"^[\d/\-:.\s,]+$", t_lower):
return True
# Fragment trop court ET non-médical
# Exclure : "de Bordeaux", "Kgs", "fédération" etc.
# Garder : "Pneumopathie", "Cholécystite", "Ictère" (diagnostics valides)
if len(words) <= 2 and len(t_lower) < 15:
# Si c'est un mot unique qui passe is_valid_diagnostic_text,
# le garder (il sera filtré plus tard si vague)
if len(words) == 1 and len(t_lower) >= 6:
pass # diagnostic potentiel (pneumopathie, cholécystite, etc.)
else:
return True
return False
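# Exemples (repris des tests) :
#   _is_pool_excluded("CRP 180 mg/L")               -> True  (valeur biologique)
#   _is_pool_excluded("douleur")                    -> True  (symptôme isolé vague)
#   _is_pool_excluded("Pancréatite aiguë biliaire") -> False (diagnostic valide)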
def _dedup_pool(candidates: list[DPPoolCandidate]) -> list[DPPoolCandidate]:
"""Déduplique par terme normalisé, garde le score le plus élevé."""
seen: dict[str, DPPoolCandidate] = {}
for c in candidates:
key = normalize_text(c.terme)
if not key:
continue
if key not in seen or c.score_initial > seen[key].score_initial:
seen[key] = c
return list(seen.values())
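# Exemple : deux entrées "Pancréatite aiguë" (conclusion à 0.7, diag_sortie à 1.0)
# -> une seule entrée conservée, celle de diag_sortie (score 1.0).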
# ---------------------------------------------------------------------------
# 7. Synthèse PMSI — raisonnement clinique structuré avant codage DP
# ---------------------------------------------------------------------------
# Comorbidités banales : NE DOIVENT PAS être probleme_pris_en_charge


@@ -9,6 +9,7 @@ from .templates import (
CPAM_ARGUMENTATION,
DP_TIEBREAK,
DP_LLM_ONESHOT,
DP_POOL_RANK,
SYNTHESE_PMSI,
CPAM_ADVERSARIAL,
)
@@ -22,6 +23,7 @@ __all__ = [
"CPAM_ARGUMENTATION",
"DP_TIEBREAK",
"DP_LLM_ONESHOT",
"DP_POOL_RANK",
"SYNTHESE_PMSI",
"CPAM_ADVERSARIAL",
]


@@ -372,7 +372,48 @@ Réponds UNIQUEMENT en JSON :
}}"""
# ---------------------------------------------------------------------------
# 8. DP Pool Rank — sélection du DP parmi un pool de candidats
# ---------------------------------------------------------------------------
# Rôle : coding | Temperature : 0.0 | Max tokens : 600
# Fichier d'origine : src/medical/dp_scoring.py → llm_dp_pool_rank()
# Variables : contexte_clinique, candidates_list
DP_POOL_RANK = """\
Tu es un médecin DIM (Département d'Information Médicale) expert en codage PMSI.
Tu dois choisir le Diagnostic Principal (DP) PARMI les candidats ci-dessous.
DÉFINITION DU DP (Guide méthodologique ATIH) :
Le DP est le problème de santé qui a mobilisé l'essentiel des ressources du séjour. C'est la pathologie ACTIVE, TRAITÉE, RETENUE en fin de séjour.
RÈGLES DE SÉLECTION :
1. Choisis le diagnostic correspondant au "problème pris en charge" — la pathologie AIGUË/ACTIVE qui justifie l'hospitalisation, PAS une comorbidité chronique de fond (HTA, diabète équilibré, obésité, dyslipidémie).
2. Ne choisis JAMAIS un candidat purement administratif, logistique ou géographique.
3. En cas de plusieurs diagnostics actifs, préfère :
a) Le diagnostic AIGU traité pendant ce séjour
b) Le diagnostic explicitement "retenu" / "au total" / "diagnostic de sortie"
c) Le diagnostic issu de la section la plus fiable (diag_sortie > conclusion > synthese)
4. Si AUCUN candidat n'est assez solide pour être le DP, retourne chosen_index = -1.
CONTEXTE CLINIQUE :
{contexte_clinique}
CANDIDATS (index, terme, section, preuve, score) :
{candidates_list}
INSTRUCTION CRITIQUE : le champ "chosen_terme" DOIT être la RECOPIE EXACTE du terme candidat (pas de paraphrase, pas de reformulation).
Réponds UNIQUEMENT en JSON :
{{
"chosen_index": 0,
"chosen_terme": "recopie EXACTE du terme du candidat choisi",
"evidence_section": "section du candidat choisi",
"evidence_excerpt": "extrait court du texte prouvant le choix (copié de la preuve ou du contexte)",
"confidence": "high ou medium ou low",
"reason": "1 phrase max justifiant le choix"
}}"""
# ---------------------------------------------------------------------------
# 9. Synthèse PMSI — raisonnement clinique structuré avant codage DP
# ---------------------------------------------------------------------------
# Rôle : coding | Temperature : 0.0 | Max tokens : 1200
# Fichier d'origine : src/medical/dp_scoring.py → generate_synthese_pmsi()


@@ -3,9 +3,11 @@
import pytest
from src.config import (
ActeCCAM,
DossierMedical,
Diagnostic,
DPCandidate,
DPPoolCandidate,
DPSelection,
DP_SCORING_WEIGHTS,
DP_REVIEW_THRESHOLD,
@@ -15,14 +17,20 @@ from src.config import (
)
from src.medical.dp_scoring import (
build_dp_shortlist,
build_dp_candidate_pool,
score_candidates,
select_dp,
generate_synthese_pmsi,
llm_dp_pool_rank,
_format_pool_for_prompt,
_build_clinical_context,
_get_context_window,
_is_z_code_whitelisted,
_is_comorbidity_code,
_has_explicit_pec_proof,
_dedup_by_code,
_dedup_pool,
_is_pool_excluded,
_normalize_evidence_section,
_is_comorbidite_banale,
_has_pec_marker,
@@ -718,6 +726,390 @@ class TestSectionNormalization:
assert _normalize_evidence_section("sections fortes du dossier") == "autres"
class TestSynthesePMSI:
"""Tests pour generate_synthese_pmsi()."""
def test_returns_synthese_on_valid_response(self, monkeypatch):
"""Réponse LLM valide → SynthesePMSI complète."""
mock_response = {
"motif_admission": "Douleur abdominale aiguë",
"probleme_pris_en_charge": "Pancréatite aiguë biliaire",
"diagnostic_retenu": "Pancréatite aiguë d'origine biliaire",
"actes_ou_traitements_majeurs": ["Scanner abdominal", "Mise à jeun"],
"complications": [],
"terrain_comorbidites": ["HTA traitée", "Diabète type 2"],
"preuves": [
{"section": "motif_hospitalisation", "excerpt": "douleur abdominale intense"},
{"section": "conclusion", "excerpt": "pancréatite aiguë biliaire confirmée"},
],
}
def mock_call_ollama(prompt, **kwargs):
return mock_response
import src.medical.dp_scoring as mod
monkeypatch.setattr(mod, "call_ollama", mock_call_ollama, raising=False)
# Forcer l'import inline à utiliser notre mock
import src.medical.ollama_client as oc_mod
monkeypatch.setattr(oc_mod, "call_ollama", mock_call_ollama)
parsed = _make_parsed(sections={"conclusion": "pancréatite aiguë biliaire confirmée"})
dossier = DossierMedical()
result = generate_synthese_pmsi(parsed, "texte complet", dossier)
assert result is not None
assert isinstance(result, SynthesePMSI)
assert result.probleme_pris_en_charge == "Pancréatite aiguë biliaire"
assert result.motif_admission == "Douleur abdominale aiguë"
assert "Scanner abdominal" in result.actes_ou_traitements_majeurs
assert len(result.terrain_comorbidites) == 2
assert result.complications == []
assert len(result.preuves) == 2
assert result.preuves[0].section == "motif_hospitalisation"
def test_returns_none_on_invalid_response(self, monkeypatch):
"""Réponse LLM non-dict → None."""
def mock_call_ollama(prompt, **kwargs):
return "texte brut"
import src.medical.ollama_client as oc_mod
monkeypatch.setattr(oc_mod, "call_ollama", mock_call_ollama)
parsed = _make_parsed()
dossier = DossierMedical()
result = generate_synthese_pmsi(parsed, "texte", dossier)
assert result is None
def test_returns_none_on_exception(self, monkeypatch):
"""Exception LLM → None."""
def mock_call_ollama(prompt, **kwargs):
raise ConnectionError("Ollama down")
import src.medical.ollama_client as oc_mod
monkeypatch.setattr(oc_mod, "call_ollama", mock_call_ollama)
parsed = _make_parsed()
dossier = DossierMedical()
result = generate_synthese_pmsi(parsed, "texte", dossier)
assert result is None
def test_robust_to_string_lists(self, monkeypatch):
"""Le LLM renvoie des strings au lieu de listes → toléré."""
mock_response = {
"motif_admission": "Fièvre",
"probleme_pris_en_charge": "Pneumopathie",
"diagnostic_retenu": "Pneumopathie bactérienne",
"actes_ou_traitements_majeurs": "Antibiothérapie IV", # string
"complications": "Insuffisance respiratoire", # string
"terrain_comorbidites": "BPCO", # string
"preuves": [],
}
def mock_call_ollama(prompt, **kwargs):
return mock_response
import src.medical.ollama_client as oc_mod
monkeypatch.setattr(oc_mod, "call_ollama", mock_call_ollama)
parsed = _make_parsed()
dossier = DossierMedical()
result = generate_synthese_pmsi(parsed, "texte", dossier)
assert result is not None
assert result.actes_ou_traitements_majeurs == ["Antibiothérapie IV"]
assert result.complications == ["Insuffisance respiratoire"]
assert result.terrain_comorbidites == ["BPCO"]
def test_preuves_malformed_skipped(self, monkeypatch):
"""Preuves sans section/excerpt → ignorées."""
mock_response = {
"motif_admission": "Test",
"probleme_pris_en_charge": "Test",
"diagnostic_retenu": "Test",
"preuves": [
{"section": "conclusion", "excerpt": "valide"},
{"section": "", "excerpt": "section vide"},
{"no_section": True},
"pas un dict",
],
}
def mock_call_ollama(prompt, **kwargs):
return mock_response
import src.medical.ollama_client as oc_mod
monkeypatch.setattr(oc_mod, "call_ollama", mock_call_ollama)
parsed = _make_parsed()
dossier = DossierMedical()
result = generate_synthese_pmsi(parsed, "texte", dossier)
assert result is not None
assert len(result.preuves) == 1
assert result.preuves[0].section == "conclusion"
def test_serialization_round_trip(self):
"""SynthesePMSI se sérialise/désérialise correctement."""
syn = SynthesePMSI(
motif_admission="Douleur thoracique",
probleme_pris_en_charge="Infarctus du myocarde",
diagnostic_retenu="IDM ST+ antérieur",
actes_ou_traitements_majeurs=["Coronarographie", "Angioplastie"],
complications=["Insuffisance cardiaque"],
terrain_comorbidites=["HTA", "Tabagisme"],
preuves=[PreuveSynthese(section="conclusion", excerpt="IDM confirmé")],
)
data = syn.model_dump()
restored = SynthesePMSI(**data)
assert restored.probleme_pris_en_charge == "Infarctus du myocarde"
assert len(restored.preuves) == 1
assert restored.preuves[0].section == "conclusion"
def test_dossier_medical_field(self):
"""Le champ synthese_pmsi est disponible sur DossierMedical."""
dossier = DossierMedical()
assert dossier.synthese_pmsi is None
dossier.synthese_pmsi = SynthesePMSI(
probleme_pris_en_charge="Test",
)
assert dossier.synthese_pmsi.probleme_pris_en_charge == "Test"
data = dossier.model_dump(exclude_none=True)
assert "synthese_pmsi" in data
# ===========================================================================
# DP Candidate Pool
# ===========================================================================
class TestDPPoolCandidate:
"""Tests du modèle DPPoolCandidate."""
def test_basic_creation(self):
c = DPPoolCandidate(terme="Pancréatite aiguë", section="conclusion")
assert c.terme == "Pancréatite aiguë"
assert c.section == "conclusion"
assert c.score_initial == 0.0
assert c.preuve == ""
def test_serialization(self):
c = DPPoolCandidate(
terme="Cholécystite aiguë",
section="diag_sortie",
preuve="cholécystite aiguë lithiasique",
score_initial=0.9,
)
data = c.model_dump()
restored = DPPoolCandidate(**data)
assert restored.terme == "Cholécystite aiguë"
assert restored.score_initial == 0.9
class TestIsPoolExcluded:
"""Tests du filtrage des candidats pool."""
def test_bio_value_excluded(self):
assert _is_pool_excluded("CRP 180 mg/L") is True
def test_bio_term_with_number_excluded(self):
assert _is_pool_excluded("Hémoglobine 7.2 g/dL") is True
def test_vague_symptom_excluded(self):
assert _is_pool_excluded("douleur") is True
assert _is_pool_excluded("fièvre") is True
def test_vague_symptom_with_context_kept(self):
"""Symptôme qualifié (multi-mots) → conservé."""
assert _is_pool_excluded("douleur abdominale aiguë") is False
def test_medical_diagnosis_kept(self):
assert _is_pool_excluded("Pancréatite aiguë biliaire") is False
def test_numeric_value_excluded(self):
assert _is_pool_excluded("12.5 g/dL") is True
class TestDedupPool:
"""Tests de la déduplication du pool."""
def test_dedup_keeps_highest_score(self):
candidates = [
DPPoolCandidate(terme="Pancréatite aiguë", section="conclusion", score_initial=0.7),
DPPoolCandidate(terme="Pancréatite aiguë", section="diag_sortie", score_initial=1.0),
]
result = _dedup_pool(candidates)
assert len(result) == 1
assert result[0].score_initial == 1.0
assert result[0].section == "diag_sortie"
def test_dedup_normalizes_text(self):
"""Variantes d'accents/espaces → même clé."""
candidates = [
DPPoolCandidate(terme="Pancréatite aiguë", section="a", score_initial=0.5),
DPPoolCandidate(terme="pancreatite aigue", section="b", score_initial=0.8),
]
result = _dedup_pool(candidates)
assert len(result) == 1
def test_distinct_terms_kept(self):
candidates = [
DPPoolCandidate(terme="Pancréatite aiguë", section="a", score_initial=0.7),
DPPoolCandidate(terme="Cholécystite aiguë", section="b", score_initial=0.9),
]
result = _dedup_pool(candidates)
assert len(result) == 2
class TestBuildDPCandidatePool:
"""Tests d'intégration de build_dp_candidate_pool()."""
def test_indicative_phrase_extraction(self):
"""Les phrases indicatives sont extraites du texte."""
text = "Le patient a été hospitalisé pour pancréatite aiguë biliaire. Suivi habituel."
parsed = _make_parsed(sections={"conclusion": "Pancréatite aiguë biliaire confirmée."})
dossier = DossierMedical()
pool = build_dp_candidate_pool(parsed, text, None, dossier)
termes = [c.terme.lower() for c in pool]
assert any("pancréatite" in t or "pancreatite" in t for t in termes)
def test_sections_fortes_extraction(self):
"""Les diagnostics des sections fortes apparaissent dans le pool."""
parsed = _make_parsed(sections={
"diag_sortie": "Cholécystite aiguë lithiasique",
"conclusion": "Évolution favorable après cholécystectomie",
})
dossier = DossierMedical()
pool = build_dp_candidate_pool(parsed, "texte complet", None, dossier)
termes = [c.terme.lower() for c in pool]
assert any("cholécystite" in t or "cholecystite" in t for t in termes)
def test_edsnlp_entities_included(self):
"""Les entités edsnlp non-niées apparaissent dans le pool."""
from dataclasses import dataclass
@dataclass
class MockEntity:
texte: str
code: str
negation: bool = False
hypothese: bool = False
@dataclass
class MockResult:
cim10_entities: list
edsnlp = MockResult(cim10_entities=[
MockEntity(texte="pneumopathie", code="J18.9"),
MockEntity(texte="HTA", code="I10", negation=True), # exclu
])
parsed = _make_parsed()
dossier = DossierMedical()
pool = build_dp_candidate_pool(parsed, "texte", edsnlp, dossier)
termes = [c.terme.lower() for c in pool]
assert any("pneumopathie" in t for t in termes)
# HTA niée ne doit pas apparaître
assert not any(t == "hta" for t in termes)
def test_actes_included(self):
"""Les actes CCAM du dossier apparaissent comme candidats."""
parsed = _make_parsed()
dossier = DossierMedical()
dossier.actes_ccam = [
ActeCCAM(texte="Cholécystectomie", code_ccam_suggestion="HMFC004"),
]
pool = build_dp_candidate_pool(parsed, "texte", None, dossier)
termes = [c.terme.lower() for c in pool]
assert any("cholécystectomie" in t or "cholecystectomie" in t for t in termes)
def test_cim10_map_matches(self):
"""Les termes CIM10_MAP matchés dans les sections fortes sont inclus."""
parsed = _make_parsed(sections={
"conclusion": "Patient avec pancréatite aiguë biliaire sévère.",
})
dossier = DossierMedical()
pool = build_dp_candidate_pool(parsed, "texte", None, dossier)
sections = [c.section for c in pool]
assert "cim10_map" in sections
def test_bio_values_excluded(self):
"""Les valeurs biologiques ne polluent pas le pool."""
parsed = _make_parsed(sections={
"conclusion": "CRP 180 mg/L. Hémoglobine 7.2 g/dL. Pancréatite aiguë.",
})
dossier = DossierMedical()
pool = build_dp_candidate_pool(parsed, "texte", None, dossier)
termes = [c.terme.lower() for c in pool]
assert not any("crp" in t and "mg" in t for t in termes)
def test_dedup_across_sources(self):
"""Un même terme de 2 sources → 1 seule entrée (meilleur score)."""
parsed = _make_parsed(sections={
"conclusion": "Pancréatite aiguë biliaire confirmée.",
"motif_hospitalisation": "Pancréatite aiguë biliaire.",
})
dossier = DossierMedical()
pool = build_dp_candidate_pool(parsed, "texte", None, dossier)
# Compter les variantes "pancréatite aiguë biliaire"
from src.medical.cim10_dict import normalize_text
keys = [normalize_text(c.terme) for c in pool]
pancreatite_keys = [k for k in keys if "pancreatite" in k and "biliaire" in k]
# Après dedup, devrait être au plus 1-2 (phrase complète vs segment)
assert len(pancreatite_keys) <= 2
def test_cap_at_30(self):
"""Le pool est plafonné à 30 candidats."""
# Créer un texte avec beaucoup de diagnostics
diagnostics = [f"diagnostic numéro {i}" for i in range(50)]
section_text = ". ".join(diagnostics) + "."
parsed = _make_parsed(sections={"conclusion": section_text})
dossier = DossierMedical()
pool = build_dp_candidate_pool(parsed, section_text, None, dossier)
assert len(pool) <= 30
def test_empty_input(self):
"""Entrée vide → pool vide."""
parsed = _make_parsed()
dossier = DossierMedical()
pool = build_dp_candidate_pool(parsed, "", None, dossier)
assert isinstance(pool, list)
def test_score_ordering(self):
"""Le pool est trié par score_initial décroissant."""
parsed = _make_parsed(sections={
"diag_sortie": "Cholécystite aiguë",
"conclusion": "Angiocholite associée",
})
dossier = DossierMedical()
pool = build_dp_candidate_pool(parsed, "texte", None, dossier)
if len(pool) >= 2:
scores = [c.score_initial for c in pool]
assert scores == sorted(scores, reverse=True)
# ===========================================================================
# Anti-comorbidité SynthesePMSI
# ===========================================================================
@@ -943,3 +1335,252 @@ class TestBuildMotifFallback:
parsed = _make_parsed()
dossier = DossierMedical()
assert _build_motif(parsed, dossier) == "Non renseigné"
# ===================================================================
# Tests DP Pool Rank
# ===================================================================
class TestFormatPoolForPrompt:
"""Tests pour _format_pool_for_prompt()."""
def test_basic_formatting(self):
"""Vérifie le format des candidats pour le prompt."""
pool = [
DPPoolCandidate(terme="Pneumopathie", section="conclusion",
preuve="Au total : pneumopathie", score_initial=0.7),
DPPoolCandidate(terme="Embolie pulmonaire", section="diag_sortie",
preuve="Diagnostic de sortie", score_initial=1.0),
]
text = _format_pool_for_prompt(pool)
assert "[0]" in text
assert "[1]" in text
assert "Pneumopathie" in text
assert "Embolie pulmonaire" in text
assert "conclusion" in text
assert "diag_sortie" in text
def test_max_items_cap(self):
"""Vérifie que max_items est respecté."""
pool = [
DPPoolCandidate(terme=f"Diag_{i}", section="conclusion", score_initial=0.5)
for i in range(10)
]
text = _format_pool_for_prompt(pool, max_items=3)
assert "[0]" in text
assert "[2]" in text
assert "[3]" not in text
def test_empty_pool(self):
"""Pool vide → texte vide."""
assert _format_pool_for_prompt([]) == ""
class TestBuildClinicalContext:
"""Tests pour _build_clinical_context()."""
def test_with_synthese(self):
"""Avec SynthesePMSI disponible."""
synthese = SynthesePMSI(
motif_admission="Douleur thoracique",
probleme_pris_en_charge="Embolie pulmonaire",
diagnostic_retenu="Embolie pulmonaire bilatérale",
)
parsed = _make_parsed()
dossier = DossierMedical()
ctx = _build_clinical_context(parsed, dossier, "", synthese)
assert "Embolie pulmonaire" in ctx
assert "Douleur thoracique" in ctx
def test_without_synthese(self):
"""Sans SynthesePMSI → fallback sections fortes."""
parsed = _make_parsed(sections={"conclusion": "Pneumopathie traitée"})
dossier = DossierMedical()
ctx = _build_clinical_context(parsed, dossier, "texte complet", None)
assert "Pneumopathie traitée" in ctx
assert "Motif" in ctx
class TestLlmDpPoolRank:
"""Tests unitaires pour llm_dp_pool_rank() — sans appel LLM réel."""
def test_empty_pool_fallback_off(self):
"""Pool vide + fallback OFF → REVIEW."""
parsed = _make_parsed()
dossier = DossierMedical()
selection = llm_dp_pool_rank(
parsed, "texte", dossier,
pool_candidates=[],
fallback_oneshot=False,
)
assert selection.verdict == "review"
assert "pool vide" in selection.winner_reason
def test_empty_pool_fallback_on(self, monkeypatch):
"""Pool vide + fallback ON → tente llm_dp_fallback."""
# Mock llm_dp_fallback pour retourner un résultat connu
from src.medical import dp_scoring
mock_selection = DPSelection(
verdict="review",
winner_reason="fallback activé",
)
monkeypatch.setattr(dp_scoring, "llm_dp_fallback", lambda *a, **kw: mock_selection)
parsed = _make_parsed()
dossier = DossierMedical()
selection = llm_dp_pool_rank(
parsed, "texte", dossier,
pool_candidates=[],
fallback_oneshot=True,
)
assert selection.verdict == "review"
assert "fallback" in selection.winner_reason
def test_valid_llm_response_high_confidence(self, monkeypatch):
"""Réponse LLM valide avec confidence high → CONFIRMED."""
pool = [
DPPoolCandidate(terme="Embolie pulmonaire", section="conclusion",
preuve="Au total : embolie pulmonaire", score_initial=0.7),
DPPoolCandidate(terme="HTA", section="conclusion",
preuve="terrain HTA", score_initial=0.3),
]
# Mock call_ollama
def mock_call_ollama(prompt, **kwargs):
return {
"chosen_index": 0,
"chosen_terme": "Embolie pulmonaire",
"evidence_section": "conclusion",
"evidence_excerpt": "Au total : embolie pulmonaire",
"confidence": "high",
"reason": "pathologie aiguë traitée",
}
from src.medical import dp_scoring
monkeypatch.setattr("src.medical.ollama_client.call_ollama", mock_call_ollama)
parsed = _make_parsed(sections={"conclusion": "Au total : embolie pulmonaire"})
dossier = DossierMedical()
selection = llm_dp_pool_rank(
parsed, "texte", dossier,
pool_candidates=pool,
fallback_oneshot=False,
)
assert selection.verdict == "confirmed"
assert len(selection.candidates) == 1
assert selection.candidates[0].label == "Embolie pulmonaire"
assert selection.candidates[0].source_section == "llm_pool_rank (conclusion)"
assert selection.candidates[0].code is None # pas de code CIM-10, sera codé en aval
def test_valid_llm_response_medium_confidence(self, monkeypatch):
"""Réponse LLM avec confidence medium → REVIEW."""
pool = [
DPPoolCandidate(terme="Insuffisance cardiaque", section="conclusion",
preuve="insuffisance cardiaque", score_initial=0.7),
]
def mock_call_ollama(prompt, **kwargs):
return {
"chosen_index": 0,
"chosen_terme": "Insuffisance cardiaque",
"evidence_section": "conclusion",
"evidence_excerpt": "insuffisance cardiaque globale",
"confidence": "medium",
"reason": "diagnostic probable",
}
monkeypatch.setattr("src.medical.ollama_client.call_ollama", mock_call_ollama)
parsed = _make_parsed()
dossier = DossierMedical()
selection = llm_dp_pool_rank(
parsed, "texte", dossier,
pool_candidates=pool,
fallback_oneshot=False,
)
assert selection.verdict == "review"
assert "confidence medium" in selection.winner_reason
def test_chosen_index_minus_one_fallback_off(self, monkeypatch):
"""chosen_index=-1 + fallback OFF → REVIEW."""
pool = [
DPPoolCandidate(terme="HTA", section="conclusion",
preuve="HTA", score_initial=0.3),
]
def mock_call_ollama(prompt, **kwargs):
return {
"chosen_index": -1,
"chosen_terme": "",
"confidence": "low",
"reason": "aucun candidat solide",
}
monkeypatch.setattr("src.medical.ollama_client.call_ollama", mock_call_ollama)
parsed = _make_parsed()
dossier = DossierMedical()
selection = llm_dp_pool_rank(
parsed, "texte", dossier,
pool_candidates=pool,
fallback_oneshot=False,
)
assert selection.verdict == "review"
assert "aucun candidat retenu" in selection.winner_reason
def test_index_out_of_range_fallback_off(self, monkeypatch):
"""Index hors plage → REVIEW."""
pool = [
DPPoolCandidate(terme="Pneumopathie", section="conclusion",
preuve="...", score_initial=0.7),
]
def mock_call_ollama(prompt, **kwargs):
return {
"chosen_index": 5,
"chosen_terme": "Fantôme",
"confidence": "high",
}
monkeypatch.setattr("src.medical.ollama_client.call_ollama", mock_call_ollama)
parsed = _make_parsed()
dossier = DossierMedical()
selection = llm_dp_pool_rank(
parsed, "texte", dossier,
pool_candidates=pool,
fallback_oneshot=False,
)
assert selection.verdict == "review"
def test_score_details_contain_pool_info(self, monkeypatch):
"""Les score_details du candidat contiennent les infos pool."""
pool = [
DPPoolCandidate(terme="Cholécystite aiguë", section="diag_sortie",
preuve="cholécystite aiguë lithiasique", score_initial=0.9),
]
def mock_call_ollama(prompt, **kwargs):
return {
"chosen_index": 0,
"chosen_terme": "Cholécystite aiguë",
"evidence_section": "diag_sortie",
"evidence_excerpt": "cholécystite aiguë lithiasique",
"confidence": "high",
"reason": "diagnostic chirurgical aigu",
}
monkeypatch.setattr("src.medical.ollama_client.call_ollama", mock_call_ollama)
parsed = _make_parsed()
dossier = DossierMedical()
selection = llm_dp_pool_rank(
parsed, "texte", dossier,
pool_candidates=pool,
fallback_oneshot=False,
)
assert selection.verdict == "confirmed"
details = selection.candidates[0].score_details
assert "pool_score" in details
assert "pool_index" in details
assert details["pool_index"] == 0