diff --git a/scripts/benchmark_nuke3_compare.py b/scripts/benchmark_nuke3_compare.py
index 18fb853..dc3509a 100644
--- a/scripts/benchmark_nuke3_compare.py
+++ b/scripts/benchmark_nuke3_compare.py
@@ -560,21 +560,37 @@ def _rebuild_and_select(data: dict) -> dict:
     }
 
     selection = select_dp(dossier, synthese, config={"llm_enabled": False})
+    dossier.dp_selection = selection
+
+    # DP finalizer (Trackare vs CRH arbitration, traceability)
+    try:
+        from src.medical.dp_finalizer import finalize_dp
+        finalize_dp(dossier)
+    except Exception:
+        pass  # best-effort: if the finalizer fails, dp_selection is used below
+
+    # Use dp_final when available, otherwise dp_selection
+    final = dossier.dp_final or selection
 
     # Convertir en dict compatible analyze_dp_selection
-    cands = [c.model_dump() for c in selection.candidates]
-    return {
+    cands = [c.model_dump() for c in final.candidates]
+    result = {
         "dp_selection": {
-            "verdict": selection.verdict,
-            "confidence": selection.confidence,
-            "chosen_code": selection.chosen_code,
-            "chosen_term": selection.chosen_term,
+            "verdict": final.verdict,
+            "confidence": final.confidence,
+            "chosen_code": final.chosen_code,
+            "chosen_term": final.chosen_term,
             "candidates": cands,
-            "evidence": selection.evidence,
-            "reason": selection.reason,
-            "debug_scores": selection.debug_scores,
+            "evidence": final.evidence,
+            "reason": final.reason,
+            "debug_scores": final.debug_scores,
         }
     }
+    if dossier.dp_final:
+        result["dp_final"] = dossier.dp_final.model_dump(exclude_none=True)
+    if dossier.quality_flags:
+        result["quality_flags"] = dossier.quality_flags
+    return result
 
 
 def _run_debug_reports(
diff --git a/src/config.py b/src/config.py
index 48dc712..711ec24 100644
--- a/src/config.py
+++ b/src/config.py
@@ -68,6 +68,52 @@ def get_model(role: str) -> str:
     return OLLAMA_MODELS.get(role, OLLAMA_MODEL)
 
 
+# --- LLM flag for the DP selector (NUKE-3) ---
+# Canonical name: T2A_DP_RANKER_LLM (0/1) — the only variable read by the constant below
+# Legacy name DP_RANKER_LLM_ENABLED: honored by get_dp_ranker_llm_enabled() only
+DP_RANKER_LLM_ENABLED = os.environ.get("T2A_DP_RANKER_LLM", "1").lower() in ("1", "true", "yes")
+
+
+def get_dp_ranker_llm_enabled() -> bool:
+    """Return the NUKE-3 LLM flag, read from the environment on every call.
+
+    Canonical name: T2A_DP_RANKER_LLM ("1", "true" or "yes" enable it; any other value disables it).
+    The legacy name DP_RANKER_LLM_ENABLED is used when the canonical one is unset, with a warning.
+    """
+    canonical = os.environ.get("T2A_DP_RANKER_LLM")
+    legacy = os.environ.get("DP_RANKER_LLM_ENABLED")
+
+    if canonical is not None:
+        return canonical.lower() in ("1", "true", "yes")
+
+    if legacy is not None:
+        import logging as _logging
+        _logging.getLogger(__name__).warning(
+            "Env var DP_RANKER_LLM_ENABLED est dépréciée — utiliser T2A_DP_RANKER_LLM"
+        )
+        return legacy.lower() in ("1", "true", "yes")
+
+    # Default: enabled
+    return True
+
+
+def check_adversarial_model_config() -> tuple[bool, str]:
+    """LOGIC-3 — Check whether the CPAM and validation models are identical.
+
+    Returns:
+        (same_model, warning_message); the message is "" when they differ or are unset.
+    """
+    cpam = OLLAMA_MODELS.get("cpam", "")
+    validation = OLLAMA_MODELS.get("validation", "")
+    if cpam and validation and cpam == validation:
+        msg = (
+            f"Modèles CPAM et validation identiques ({cpam}) "
+            "— validation adversariale dégradée"
+        )
+        return True, msg
+    return False, ""
+
+
 # --- Configuration RUM / établissement ---
 
 FINESS = os.environ.get("T2A_FINESS", "000000000")
@@ -553,6 +599,37 @@ class CodeDecision(BaseModel):
     applied_rules: list[str] = Field(default_factory=list)
 
 
+class DPCandidate(BaseModel):
+    """DP candidate for the NUKE-3 selection."""
+
+    index: int
+    term: str
+    code: Optional[str] = None
+    confidence: Optional[str] = None
+    source: Optional[str] = None
+    is_comorbidity_like: bool = False
+    is_symptom_like: bool = False
+    is_act_only: bool = False
+    section_strength: int = 0
+    num_occurrences: int = 1
+    score: float = 0.0
+    score_details: dict = Field(default_factory=dict)
+
+
+class DPSelection(BaseModel):
+    """Result of the NUKE-3 DP selection."""
+
+    chosen_index: Optional[int] = None
+    chosen_term: Optional[str] = None
+    chosen_code: Optional[str] = None
+    confidence: Optional[str] = None
+    verdict: str = "REVIEW"  # CONFIRMED | REVIEW
+    evidence: list[str] = Field(default_factory=list)
+    reason: Optional[str] = None
+    candidates: list[DPCandidate] = Field(default_factory=list)
+    debug_scores: Optional[dict] = None
+
+
 class Diagnostic(BaseModel):
     texte: str
     cim10_suggestion: Optional[str] = None
@@ -656,6 +733,12 @@ class DossierMedical(BaseModel):
     document_type: str = ""
     sejour: Sejour = Field(default_factory=Sejour)
     diagnostic_principal: Optional[Diagnostic] = None
+    dp_selection: Optional[DPSelection] = None
+    # DP traceability (finalizer) — DIM audit
+    dp_trackare: Optional[DPSelection] = None  # DP coming from Trackare (if any)
+    dp_crh_only: Optional[DPSelection] = None  # DP coming from the CRH-only pipeline
+    dp_final: Optional[DPSelection] = None  # final DP after finalizer arbitration
+    quality_flags: dict = Field(default_factory=dict)
     diagnostics_associes: list[Diagnostic] = Field(default_factory=list)
     actes_ccam: list[ActeCCAM] = Field(default_factory=list)
     antecedents: list[Antecedent] = Field(default_factory=list)
diff --git a/src/main.py b/src/main.py
index cb66880..bf7b830 100644
--- a/src/main.py
+++ b/src/main.py
@@ -262,6 +262,13 @@ def process_pdf(pdf_path: Path) -> list[tuple[str, DossierMedical, Anonymization
         except Exception:
             logger.warning(" Erreur estimation GHM/metrics", exc_info=True)
 
+        # 10. DP finalizer (Trackare vs CRH arbitration, traceability)
+        try:
+            from .medical.dp_finalizer import finalize_dp
+            finalize_dp(dossier)
+        except Exception:
+            logger.warning(" Finalizer DP : erreur", exc_info=True)
+
         dossier.processing_time_s = round(time.time() - t0, 2)
         results.append((anonymized_text, dossier, report))
 
@@ -629,6 +636,13 @@ def main(input_path: str | None = None) -> None:
             except Exception:
                 logger.warning(" Erreur estimation GHM/metrics final", exc_info=True)
 
+            # DP finalizer (Trackare vs CRH arbitration, traceability)
+            try:
+                from .medical.dp_finalizer import finalize_dp
+                finalize_dp(merged)
+            except Exception:
+                logger.warning(" Finalizer DP fusionné : erreur", exc_info=True)
+
             struct_dir = STRUCTURED_DIR / subdir
             struct_dir.mkdir(parents=True, exist_ok=True)
             merged_path = struct_dir / f"{subdir}_fusionne_cim10.json"
diff --git a/src/medical/dp_finalizer.py b/src/medical/dp_finalizer.py
new file mode 100644
index 0000000..13a46ab
--- /dev/null
+++ b/src/medical/dp_finalizer.py
@@ -0,0 +1,360 @@
+"""DP finalizer — Trackare vs CRH-only arbitration.
+
+Last step of the DP pipeline: produces ``dp_final`` with full traceability
+(``dp_trackare``, ``dp_crh_only``) and audit ``quality_flags``.
+
+Principles:
+- Clean architecture: isolated business logic, no Ollama dependency.
+- Traceability: every decision is justified (reason + evidence + flags).
+- Caution: when in doubt -> REVIEW, never CONFIRMED.
+"""
+
+from __future__ import annotations
+
+from src.config import DossierMedical, DPSelection
+
+# Z-codes allowed as a CONFIRMED DP (same list as dp_selector)
+_Z_CODE_DP_WHITELIST = frozenset({
+    "Z03", "Z04", "Z08", "Z09",
+    "Z38", "Z43", "Z45", "Z50", "Z51", "Z54", "Z75",
+    "Z99",
+})
+
+
+def _family3(code: str | None) -> str:
+    """Return the upper-cased 3-character ICD-10 family of *code*.
+
+    Dotted ("K81.0") and undotted ("K810") spellings both give "K81";
+    ``None`` or an empty string gives "".
+    """
+    if not code:
+        return ""
+    # Truncate to 3 characters: splitting on "." alone would return the
+    # whole code when it is written without a dot (e.g. "Z511").
+    return code.strip().split(".")[0][:3].upper()
+
+
+def _code_in_candidates(code: str | None, selection: DPSelection) -> bool:
+    """Tell whether *code* matches a candidate of *selection* (exact or family3)."""
+    if not code or not selection.candidates:
+        return False
+    code_up = code.upper()
+    fam = _family3(code)
+    for cand in selection.candidates:
+        cand_code = (cand.code or "").upper()
+        if not cand_code:
+            # A candidate without a code can never corroborate anything.
+            continue
+        if cand_code == code_up or _family3(cand_code) == fam:
+            return True
+    return False
+
+
+def _has_strong_evidence(sel: DPSelection) -> bool:
+    """Tell whether *sel* carries at least one non-trivial evidence item."""
+    # An item mentioning "Trackare" (e.g. "Source: Trackare") is not strong
+    # evidence on its own.
+    return any("Trackare" not in item for item in sel.evidence or ())
+
+
+def _make_selection(
+    code: str | None,
+    term: str | None,
+    verdict: str,
+    confidence: str,
+    evidence: list[str],
+    reason: str,
+    source_sel: DPSelection | None = None,
+) -> DPSelection:
+    """Build a final DPSelection, carrying over the source's candidates.
+
+    ``candidates``, ``debug_scores`` and ``chosen_index`` are taken from
+    *source_sel* when it is given; otherwise they are left empty.
+    """
+    return DPSelection(
+        chosen_code=code,
+        chosen_term=term,
+        verdict=verdict,
+        confidence=confidence,
+        evidence=evidence,
+        reason=reason,
+        candidates=source_sel.candidates if source_sel else [],
+        debug_scores=source_sel.debug_scores if source_sel else None,
+        chosen_index=source_sel.chosen_index if source_sel else None,
+    )
+
+
+# ── Rules R1-R5 ───────────────────────────────────────────────────────
+
+
+def decide_dp_final(
+    trackare_dp: DPSelection | None,
+    crh_dp: DPSelection | None,
+    allow_symptom_dp: bool = False,
+) -> tuple[DPSelection, dict, list[str]]:
+    """Arbitrate between the Trackare DP and the CRH-only DP.
+
+    Args:
+        trackare_dp: DP coded in Trackare, or ``None``.
+        crh_dp: DP selected from the CRH alone, or ``None``.
+        allow_symptom_dp: when true, R5 no longer downgrades an R-code DP
+            that has a non-R alternative among the CRH candidates.
+
+    Returns:
+        (dp_final, quality_flags_additions, alertes)
+    """
+    flags: dict = {}
+    alertes: list[str] = []
+
+    # ── Degenerate cases ───────────────────────────────────────────
+
+    if not trackare_dp and not crh_dp:
+        return (
+            DPSelection(verdict="REVIEW", reason="Aucun DP disponible"),
+            {"no_dp_source": True},
+            ["Aucun DP extrait (ni Trackare ni CRH)"],
+        )
+
+    if not trackare_dp and crh_dp:
+        # CRH-only mode — pass-through, then R5 post-hoc
+        dp = crh_dp.model_copy(deep=True)
+        flags["crh_only_mode"] = True
+        dp, r5_flags, r5_alertes = _apply_r5(dp, crh_dp, allow_symptom_dp)
+        flags.update(r5_flags)
+        alertes.extend(r5_alertes)
+        return dp, flags, alertes
+
+    if trackare_dp and not crh_dp:
+        # Trackare-only mode
+        # Upper-case first so that the Z-code test agrees with _family3().
+        code = (trackare_dp.chosen_code or "").upper()
+        # R5: non-whitelisted Z-code -> REVIEW
+        if code.startswith("Z") and _family3(code) not in _Z_CODE_DP_WHITELIST:
+            dp = _make_selection(
+                code=trackare_dp.chosen_code,
+                term=trackare_dp.chosen_term,
+                verdict="REVIEW",
+                confidence="medium",
+                evidence=trackare_dp.evidence + ["Z-code en DP : vérification DIM requise"],
+                reason="R5 — Z-code non whitelisté en DP",
+                source_sel=trackare_dp,
+            )
+            flags["trackare_only_mode"] = True
+            flags["z_code_dp_review"] = True
+            return dp, flags, ["DP Trackare Z-code non whitelisté → REVIEW"]
+
+        # The copy keeps any REVIEW verdict already present on the input.
+        dp = trackare_dp.model_copy(deep=True)
+        if not dp.evidence:
+            dp.evidence = ["Source: Trackare (codage établissement)"]
+        flags["trackare_only_mode"] = True
+        return dp, flags, alertes
+
+    # ── Both sources exist ─────────────────────────────────────────
+    assert trackare_dp is not None and crh_dp is not None
+
+    t_code = (trackare_dp.chosen_code or "").upper()
+    c_code = (crh_dp.chosen_code or "").upper()
+    t_fam = _family3(t_code)
+    c_fam = _family3(c_code)
+
+    # ── R3 — Trackare symptom (R*) + etiological CRH ───────────────
+    # (evaluated before R1: specific case > general case)
+    if t_code.startswith("R"):
+        if crh_dp.verdict == "CONFIRMED" and not c_code.startswith("R") and _has_strong_evidence(crh_dp):
+            # Override: etiological CRH diagnosis is CONFIRMED
+            dp = crh_dp.model_copy(deep=True)
+            dp.reason = "R3 — Trackare symptôme écarté au profit du CRH étiologique CONFIRMED"
+            flags["trackare_symptom_overridden"] = True
+            alertes.append(
+                f"Trackare symptôme ({t_code}) remplacé par CRH ({c_code}) — "
+                f"diagnostic étiologique CONFIRMED"
+            )
+            # R5 post-hoc
+            dp, r5_flags, r5_alertes = _apply_r5(dp, crh_dp, allow_symptom_dp)
+            flags.update(r5_flags)
+            alertes.extend(r5_alertes)
+            return dp, flags, alertes
+
+        # Otherwise: cautious REVIEW that keeps the Trackare code
+        dp = _make_selection(
+            code=trackare_dp.chosen_code,
+            term=trackare_dp.chosen_term,
+            verdict="REVIEW",
+            confidence="medium",
+            evidence=[
+                "Source: Trackare (codage établissement)",
+                "Alerte: Trackare code un symptôme (R*) mais le CRH mentionne un diagnostic étiologique",
+            ],
+            reason="R3 — Trackare symptôme vs CRH diagnostic : vérification DIM requise",
+            source_sel=trackare_dp,
+        )
+        flags["trackare_symptom_vs_crh_diagnosis"] = True
+        alertes.append(
+            f"Trackare symptôme ({t_code}) vs CRH ({c_code}) — vérification DIM requise"
+        )
+        return dp, flags, alertes
+
+    # ── R1 — CRH CONFIRMED with strong evidence -> CRH wins ────────
+    if crh_dp.verdict == "CONFIRMED" and _has_strong_evidence(crh_dp):
+        dp = crh_dp.model_copy(deep=True)
+        dp.reason = "R1 — CRH-only CONFIRMED avec preuves fortes"
+        if t_code and t_code != c_code and t_fam != c_fam:
+            flags["override_trackare_by_crh_confirmed"] = True
+            alertes.append(
+                f"DP final basé CRH-only CONFIRMED ({c_code}) — "
+                f"Trackare ({t_code}) écarté (preuves CRH supérieures)"
+            )
+        else:
+            flags["crh_confirmed_coherent"] = True
+        # R5 post-hoc
+        dp, r5_flags, r5_alertes = _apply_r5(dp, crh_dp, allow_symptom_dp)
+        flags.update(r5_flags)
+        alertes.extend(r5_alertes)
+        return dp, flags, alertes
+
+    # ── R2 — Non-symptom Trackare code consistent with CRH -> confirm ──
+    # An empty Trackare code must never be auto-confirmed: without the
+    # ``t_code`` guard, two empty codes would compare equal and yield a
+    # CONFIRMED selection with no code at all.
+    if (
+        t_code
+        and not t_code.startswith(("R", "Z"))
+        and (t_code == c_code or t_fam == c_fam or _code_in_candidates(t_code, crh_dp))
+    ):
+        dp = _make_selection(
+            code=trackare_dp.chosen_code,
+            term=trackare_dp.chosen_term,
+            verdict="CONFIRMED",
+            confidence="high",
+            evidence=[
+                "Source: Trackare (codage établissement)",
+                f"Trackare {t_code} corroboré par CRH (candidat {c_code})",
+            ],
+            reason="R2 — Trackare non-symptôme corroboré par CRH",
+            source_sel=crh_dp,
+        )
+        flags["trackare_confirmed_by_crh"] = True
+        return dp, flags, alertes
+
+    # ── R4 — Ambiguous / weak evidence -> REVIEW ───────────────────
+    # Both sources exist here, so the Trackare selection is always the base.
+    dp = _make_selection(
+        code=trackare_dp.chosen_code,
+        term=trackare_dp.chosen_term,
+        verdict="REVIEW",
+        confidence="medium",
+        evidence=trackare_dp.evidence[:2] + ["Preuves insuffisantes pour confirmation automatique"],
+        reason="R4 — Ambigu / preuves faibles",
+        source_sel=crh_dp,
+    )
+    flags["review_ambiguous"] = True
+    alertes.append(f"DP ambigu (Trackare {trackare_dp.chosen_code or '?'}) — REVIEW")
+    return dp, flags, alertes
+
+
+# ── R5 — No auto-confirm (post-hoc) ───────────────────────────────
+
+
+def _apply_r5(
+    dp: DPSelection,
+    crh_dp: DPSelection | None,
+    allow_symptom_dp: bool,
+) -> tuple[DPSelection, dict, list[str]]:
+    """Apply R5: Z-codes and R-codes are never auto-CONFIRMED (whitelist aside).
+
+    *dp* is modified in place and also returned, together with the flags
+    and alerts raised by the rule.
+    """
+    flags: dict = {}
+    alertes: list[str] = []
+    code = (dp.chosen_code or "").upper()
+
+    # Non-whitelisted Z-code -> force REVIEW
+    if code.startswith("Z") and _family3(code) not in _Z_CODE_DP_WHITELIST:
+        if dp.verdict == "CONFIRMED":
+            dp.verdict = "REVIEW"
+            dp.confidence = "medium"
+            dp.evidence.append("R5 — Z-code non whitelisté en DP → REVIEW")
+            dp.reason = (dp.reason or "") + " | R5 Z-code"
+        flags["z_code_dp_review"] = True
+        alertes.append(f"Z-code {code} en DP → REVIEW (R5)")
+
+    # R-code with a non-R candidate available -> REVIEW unless allow_symptom_dp
+    if (
+        code.startswith("R")
+        and not allow_symptom_dp
+        and crh_dp
+        and any(
+            not (c.code or "").upper().startswith("R")
+            for c in crh_dp.candidates
+            if c.code
+        )
+    ):
+        if dp.verdict == "CONFIRMED":
+            dp.verdict = "REVIEW"
+            dp.confidence = "medium"
+            dp.evidence.append("R5 — Symptôme R-code en DP avec candidat non-R disponible → REVIEW")
+            dp.reason = (dp.reason or "") + " | R5 R-code"
+        flags["r_code_dp_with_non_r_candidate"] = True
+        alertes.append(f"R-code {code} en DP avec alternative non-R → REVIEW (R5)")
+
+    return dp, flags, alertes
+
+
+# ── Public entry point ─────────────────────────────────────────────
+
+
+def finalize_dp(dossier: DossierMedical) -> DossierMedical:
+    """Single entry point of the finalizer.
+
+    Reads ``dp_selection`` and ``document_type`` and writes:
+    - ``dp_trackare`` (if the selection comes from Trackare)
+    - ``dp_crh_only`` (otherwise)
+    - ``dp_final`` (arbitration result)
+    - ``quality_flags`` (merged into the existing dict)
+    - ``alertes_codage`` (appended, without duplicates)
+
+    The dossier is modified in place and returned.
+    """
+    # ── 1. Identify the DP sources ─────────────────────────────────
+    trackare_dp: DPSelection | None = None
+    crh_dp: DPSelection | None = None
+
+    if dossier.dp_selection:
+        sel = dossier.dp_selection
+        reason = (sel.reason or "").lower()
+        is_trackare_source = (
+            dossier.document_type == "trackare"
+            or "trackare" in reason
+            or any("Trackare" in e for e in sel.evidence)
+        )
+        if is_trackare_source:
+            trackare_dp = sel
+        else:
+            crh_dp = sel
+
+    # NOTE(review): a single ``dp_selection`` is routed to exactly one
+    # source, so this entry point never reaches the two-source rules
+    # (R1-R4) of decide_dp_final() — confirm that this is intended.
+
+    # ── 2. Store the sources for traceability ──────────────────────
+    dossier.dp_trackare = trackare_dp
+    dossier.dp_crh_only = crh_dp
+
+    # ── 3. Arbitration ─────────────────────────────────────────────
+    dp_final, flags, alertes = decide_dp_final(trackare_dp, crh_dp)
+
+    # ── 4. Write the results ───────────────────────────────────────
+    dossier.dp_final = dp_final
+
+    # Merge into quality_flags: unrelated keys are kept, keys produced
+    # here take the new value.
+    dossier.quality_flags.update(flags)
+
+    # Append alertes_codage, skipping alerts already present
+    for alerte in alertes:
+        if alerte not in dossier.alertes_codage:
+            dossier.alertes_codage.append(alerte)
+
+    return dossier