diff --git a/src/config.py b/src/config.py index 4b1680f..fbf3663 100644 --- a/src/config.py +++ b/src/config.py @@ -82,6 +82,32 @@ EMBEDDING_MODEL = os.environ.get("T2A_EMBEDDING_MODEL", "dangvantuan/sentence-ca RERANKER_MODEL = os.environ.get("T2A_RERANKER_MODEL", "cross-encoder/ms-marco-MiniLM-L-6-v2") +# --- Scoring DP (Diagnostic Principal) --- + +DP_SCORING_WEIGHTS: dict[str, int] = { + "section_diag_sortie": 4, + "section_diag_principal": 4, + "section_motif_hospitalisation": 3, + "section_conclusion": 2, + "section_synthese": 2, + "section_edsnlp": 1, + "section_regex": 1, + "proof_excerpt": 2, # excerpt non-vide + page + "negation": -4, # "pas de", "absence de", "éliminé" + "conditional": -3, # "suspect", "probable", "?" + "z_code_dp": -2, # sauf whitelist + "r_code_dp": -2, # symptôme en DP + "comorbidity_weak": -3, # comorbidité banale (toutes sections, sauf preuve PEC) +} +DP_REVIEW_THRESHOLD: int = 2 # delta minimum top1-top2 pour éviter REVIEW + +# Z-codes admis en DP (soins itératifs, surveillance, nouveau-né, rééducation, etc.) +DP_Z_CODE_WHITELIST: frozenset[str] = frozenset({ + "Z51.1", "Z51.0", "Z38", "Z50.1", "Z43", "Z45", "Z09", "Z54", + "Z75", "Z03", "Z04", "Z08", +}) + + # --- Modèles de données CIM-10 --- @@ -128,6 +154,28 @@ class Diagnostic(BaseModel): source_excerpt: Optional[str] = None # extrait du texte source (~200 chars) +class DPCandidate(BaseModel): + code: Optional[str] = None + label: str + source_section: str # "diag_sortie" | "diag_principal" | "conclusion" | "synthese" | "motif_hospitalisation" | "edsnlp" | "regex" + source_excerpt: Optional[str] = None + source_page: Optional[int] = None + confidence_raw: Optional[str] = None # "high" | "medium" | "low" + score: int = 0 + score_details: dict[str, int] = Field(default_factory=dict) + is_negated: bool = False + is_conditional: bool = False + dp_code_original_llm: Optional[str] = None # code original proposé par LLM (avant normalisation) + dp_code_normalized: bool = False # True si le code a été normalisé (parent/fallback) + + +class DPSelection(BaseModel): + verdict: str = "confirmed" # "confirmed" | "review" + candidates: list[DPCandidate] = Field(default_factory=list) + winner_reason: Optional[str] = None + llm_tiebreak: Optional[dict] = None + + class ActeCCAM(BaseModel): texte: str code_ccam_suggestion: Optional[str] = None @@ -183,6 +231,7 @@ class DossierMedical(BaseModel): document_type: str = "" sejour: Sejour = Field(default_factory=Sejour) diagnostic_principal: Optional[Diagnostic] = None + dp_selection: Optional[DPSelection] = None diagnostics_associes: list[Diagnostic] = Field(default_factory=list) actes_ccam: list[ActeCCAM] = Field(default_factory=list) antecedents: list[Antecedent] = Field(default_factory=list) @@ -248,6 +297,13 @@ class ControleCPAM(BaseModel): da_ucr: Optional[str] = None dr_ucr: Optional[str] = None actes_ucr: Optional[str] = None + # Champs enrichis (format ucr_extract) + codes_etablissement: Optional[str] = None + libelle_etablissement: Optional[str] = None + codes_controleurs: Optional[str] = None + libelle_controleurs: Optional[str] = None + codes_retenus: Optional[str] = None + ghm_ghs: Optional[str] = None contre_argumentation: Optional[str] = None response_data: Optional[dict] = None sources_reponse: list[RAGSource] = Field(default_factory=list) diff --git a/src/control/cpam_parser.py b/src/control/cpam_parser.py index 1ed7147..002a86c 100644 --- a/src/control/cpam_parser.py +++ b/src/control/cpam_parser.py @@ -1,4 +1,12 @@ -"""Parsing du fichier Excel de contrôle CPAM (UCR) et matching OGC.""" +"""Parsing du fichier Excel de contrôle CPAM (UCR) et matching OGC. + +Supporte deux formats : +- **Ancien** (ogc_structure) : colonnes N° OGC, Titre, Arg_UCR, Décision_UCR, DP_UCR, DA_UCR, DR_UCR, Actes_UCR +- **Nouveau** (ucr_extract) : colonnes N° OGC, Type désaccord, Codes Établissement, Codes Contrôleurs, + Décision UCR, Codes retenus, GHM / GHS, Texte décision, etc. + +Le format est auto-détecté à partir des en-têtes de la première ligne. +""" from __future__ import annotations @@ -12,18 +20,15 @@ from ..config import ControleCPAM logger = logging.getLogger(__name__) -# Colonnes attendues dans le fichier Excel -_EXPECTED_COLUMNS = ("N° OGC", "Titre", "Arg_UCR", "Décision_UCR", "DP_UCR", "DA_UCR", "DR_UCR", "Actes_UCR") +# Colonnes obligatoires par format +_LEGACY_REQUIRED = ("N° OGC", "Titre", "Arg_UCR", "Décision_UCR") +_NEW_REQUIRED = ("N° OGC", "Type désaccord", "Décision UCR", "Texte décision") def parse_cpam_excel(path: str | Path) -> dict[int, list[ControleCPAM]]: """Lit le fichier Excel de contrôle CPAM et retourne un dict OGC -> liste de contrôles. - Args: - path: Chemin vers le fichier .xlsx CPAM. - - Returns: - Dict avec le numéro OGC comme clé et la liste des contrôles associés. + Auto-détecte le format (ancien ogc_structure vs nouveau ucr_extract). """ path = Path(path) if not path.exists(): @@ -33,33 +38,53 @@ def parse_cpam_excel(path: str | Path) -> dict[int, list[ControleCPAM]]: wb = openpyxl.load_workbook(path, read_only=True) ws = wb[wb.sheetnames[0]] - # Lire l'en-tête rows = ws.iter_rows(values_only=True) header = next(rows, None) if header is None: logger.error("Fichier CPAM vide : %s", path) + wb.close() return {} - # Construire le mapping colonne -> index col_map = {} for i, col_name in enumerate(header): if col_name: - col_map[col_name.strip()] = i + col_map[str(col_name).strip()] = i - # Vérifier les colonnes requises - missing = [c for c in _EXPECTED_COLUMNS[:4] if c not in col_map] - if missing: - logger.error("Colonnes manquantes dans le fichier CPAM : %s", missing) + # Auto-détection du format + is_new = all(c in col_map for c in _NEW_REQUIRED) + is_legacy = all(c in col_map for c in _LEGACY_REQUIRED) + + if is_new: + logger.info("CPAM : format ucr_extract détecté") + result = _parse_new_format(rows, col_map) + elif is_legacy: + logger.info("CPAM : format ogc_structure (ancien) détecté") + result = _parse_legacy_format(rows, col_map) + else: + missing_new = [c for c in _NEW_REQUIRED if c not in col_map] + missing_leg = [c for c in _LEGACY_REQUIRED if c not in col_map] + logger.error( + "Format CPAM non reconnu. Colonnes trouvées : %s. " + "Manquantes (nouveau) : %s, (ancien) : %s", + list(col_map.keys()), missing_new, missing_leg, + ) + wb.close() return {} + wb.close() + total = sum(len(v) for v in result.values()) + logger.info("CPAM : %d contrôles chargés pour %d OGC distincts", total, len(result)) + return result + + +def _parse_legacy_format(rows, col_map: dict[str, int]) -> dict[int, list[ControleCPAM]]: + """Parse l'ancien format ogc_structure.""" result: dict[int, list[ControleCPAM]] = {} - count = 0 for row in rows: ogc_val = row[col_map["N° OGC"]] if ogc_val is None: continue - try: numero_ogc = int(ogc_val) except (ValueError, TypeError): @@ -76,11 +101,104 @@ def parse_cpam_excel(path: str | Path) -> dict[int, list[ControleCPAM]]: dr_ucr=_clean_optional(row, col_map.get("DR_UCR")), actes_ucr=_clean_optional(row, col_map.get("Actes_UCR")), ) - result.setdefault(numero_ogc, []).append(controle) - count += 1 - logger.info("CPAM : %d contrôles chargés pour %d OGC distincts", count, len(result)) + return result + + +def _parse_new_format(rows, col_map: dict[str, int]) -> dict[int, list[ControleCPAM]]: + """Parse le nouveau format ucr_extract. + + Mapping colonnes : + N° OGC → numero_ogc + Type désaccord → titre (ex: "Désaccord sur le DP") + Texte décision → arg_ucr + Décision UCR → decision_ucr (Favorable / Défavorable) + Codes Contrôleurs → dp_ucr / da_ucr selon Type désaccord + Codes Établissement → codes_etablissement + Libellé Établissement → libelle_etablissement + Libellé Contrôleurs → libelle_controleurs + Codes retenus → codes_retenus + GHM / GHS → ghm_ghs + """ + result: dict[int, list[ControleCPAM]] = {} + + idx_ogc = col_map["N° OGC"] + idx_type = col_map["Type désaccord"] + idx_decision = col_map["Décision UCR"] + idx_texte = col_map["Texte décision"] + idx_codes_etab = col_map.get("Codes Établissement") + idx_lib_etab = col_map.get("Libellé Établissement") + idx_codes_ctrl = col_map.get("Codes Contrôleurs") + idx_lib_ctrl = col_map.get("Libellé Contrôleurs") + idx_codes_ret = col_map.get("Codes retenus") + idx_ghm = col_map.get("GHM / GHS") + + for row in rows: + ogc_val = row[idx_ogc] + if ogc_val is None: + continue + try: + numero_ogc = int(ogc_val) + except (ValueError, TypeError): + logger.warning("N° OGC invalide ignoré : %s", ogc_val) + continue + + type_desaccord = str(row[idx_type] or "").strip() + decision = str(row[idx_decision] or "").strip() + texte_decision = str(row[idx_texte] or "").strip() + codes_ctrl = _clean_optional(row, idx_codes_ctrl) + codes_etab = _clean_optional(row, idx_codes_etab) + + # Construire le titre lisible + if type_desaccord == "DP": + titre = "Désaccord sur le DP" + elif type_desaccord == "DAS": + titre = "Désaccord sur les DAS" + elif type_desaccord == "DP+DAS": + titre = "Désaccord sur le DP et les DAS" + else: + titre = f"Désaccord : {type_desaccord}" if type_desaccord else "" + + # Mapper la décision vers le format attendu par cpam_response + if decision.lower().startswith("favorable"): + decision_ucr = "UCR retient" + elif decision.lower().startswith("défavorable") or decision.lower().startswith("defavorable"): + decision_ucr = "UCR confirme avis médecins contrôleurs" + else: + decision_ucr = decision + + # Distribuer les codes selon le type de désaccord + dp_ucr = None + da_ucr = None + if type_desaccord == "DP": + dp_ucr = codes_ctrl + elif type_desaccord == "DAS": + da_ucr = codes_ctrl + elif type_desaccord == "DP+DAS": + # Les codes contrôleurs peuvent mélanger DP et DAS. + # Convention : le premier code est le DP, le reste DAS. + if codes_ctrl: + parts = [c.strip() for c in codes_ctrl.split(",") if c.strip()] + dp_ucr = parts[0] if parts else None + da_ucr = ",".join(parts[1:]) if len(parts) > 1 else None + + controle = ControleCPAM( + numero_ogc=numero_ogc, + titre=titre, + arg_ucr=texte_decision, + decision_ucr=decision_ucr, + dp_ucr=dp_ucr, + da_ucr=da_ucr, + codes_etablissement=codes_etab, + libelle_etablissement=_clean_optional(row, idx_lib_etab), + codes_controleurs=codes_ctrl, + libelle_controleurs=_clean_optional(row, idx_lib_ctrl), + codes_retenus=_clean_optional(row, idx_codes_ret), + ghm_ghs=_clean_optional(row, idx_ghm), + ) + result.setdefault(numero_ogc, []).append(controle) + return result diff --git a/src/extraction/crh_parser.py b/src/extraction/crh_parser.py index 0445faf..27a9933 100644 --- a/src/extraction/crh_parser.py +++ b/src/extraction/crh_parser.py @@ -113,12 +113,19 @@ def _extract_medical_content(text: str, result: dict) -> None: result["contenu_medical"] = m.group(1).strip() # Sections spécifiques + # Note : les terminaisons incluent les en-têtes des sections suivantes + # pour éviter la capture excessive (une section s'arrête quand la suivante commence). + _DIAG_HEADERS = r"Diagnostic(?:s)?\s+(?:de\s+sortie|retenu|principal)|Problème\s+principal|Synthèse|En\s+résumé|En\s+synthèse" section_patterns = [ - ("motif_hospitalisation", r"(?:motif\s+(?:d'hospitalisation|suivant))\s*[:\s]*\n?(.*?)(?=\n\s*(?:Antécédents|Histoire|Examen|Au total|Devenir|TTT)|$)"), - ("antecedents", r"(?:Antécédents?)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Histoire|Examen|Traitement|Au total|Devenir)|$)"), - ("histoire_maladie", r"(?:Histoire de la maladie)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Examen|Biologie|Au total|Devenir)|$)"), - ("examen_clinique", r"(?:Examen clinique)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Biologie|Imagerie|Au total|Devenir)|$)"), - ("conclusion", r"(?:Au total|Conclusion)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Devenir|TTT|Traitement)|$)"), + ("motif_hospitalisation", r"(?:motif\s+(?:d'hospitalisation|suivant))\s*[:\s]*\n?(.*?)(?=\n\s*(?:Antécédents|Histoire|Examen|Au total|Devenir|TTT|" + _DIAG_HEADERS + r")|$)"), + ("antecedents", r"(?:Antécédents?)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Histoire|Examen|Traitement|Au total|Devenir|" + _DIAG_HEADERS + r")|$)"), + ("histoire_maladie", r"(?:Histoire de la maladie)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Examen|Biologie|Au total|Devenir|" + _DIAG_HEADERS + r")|$)"), + ("examen_clinique", r"(?:Examen clinique)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Biologie|Imagerie|Au total|Devenir|" + _DIAG_HEADERS + r")|$)"), + ("conclusion", r"(?:Au total|Conclusion)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Devenir|TTT|Traitement|" + _DIAG_HEADERS + r")|$)"), + # Sections à fort signal DP (avant traitement_sortie pour priorité) + ("diag_sortie", r"(?:Diagnostic(?:s)?\s+de\s+sortie|Diagnostic(?:s)?\s+retenu(?:s)?(?:\s+(?:à\s+la\s+sortie))?)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Devenir|TTT|Traitement|Synthèse|En\s+résumé|Rédigé|Cordialement)|$)"), + ("diag_principal", r"(?:Diagnostic\s+principal|Problème\s+principal)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Diagnostic(?:s)?\s+(?:de\s+sortie|retenu|associé)|Devenir|TTT|Traitement|Synthèse|En\s+résumé|Rédigé|Cordialement)|$)"), + ("synthese", r"(?:Synthèse|En\s+résumé|En\s+synthèse)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Devenir|TTT|Traitement|Rédigé|Cordialement)|$)"), ("traitement_sortie", r"(?:TTT de sortie|Traitement de sortie)\s*[:\s]*\n?(.*?)(?=\n\s*(?:Devenir|Rédigé|Cordialement)|$)"), ("devenir", r"(?:Devenir)\s*[:\s]*\n?(.*?)(?=\n\s*(?:TTT|Traitement|Rédigé|Cordialement)|$)"), ] diff --git a/src/medical/cim10_extractor.py b/src/medical/cim10_extractor.py index cb131e0..90b5927 100644 --- a/src/medical/cim10_extractor.py +++ b/src/medical/cim10_extractor.py @@ -116,7 +116,7 @@ def extract_medical_info( search_text = raw_text or anonymized_text _extract_sejour(parsed_data, dossier) - _extract_diagnostics(parsed_data, anonymized_text, dossier, edsnlp_result) + _extract_diagnostics(parsed_data, anonymized_text, dossier, edsnlp_result, use_rag=use_rag) _extract_actes(anonymized_text, dossier) _extract_antecedents(anonymized_text, dossier) _extract_traitements(parsed_data, anonymized_text, dossier, edsnlp_result) @@ -306,6 +306,7 @@ def _extract_diagnostics( text: str, dossier: DossierMedical, edsnlp_result: Optional[EdsnlpResult] = None, + use_rag: bool = False, ) -> None: """Extrait le diagnostic principal et les diagnostics associés.""" text_lower = text.lower() @@ -342,21 +343,52 @@ def _extract_diagnostics( if not ent.negation and not ent.hypothese: edsnlp_codes[ent.code] = ent.texte - # Si pas de DP depuis le codage, chercher dans le texte + # Si pas de DP depuis le codage, utiliser le scoring multi-candidats if not dossier.diagnostic_principal: - # D'abord essayer le fallback regex (plus précis pour les patterns spécifiques) - dp = _find_diagnostic_principal(text_lower, conclusion) - if dp: - dossier.diagnostic_principal = dp - elif edsnlp_codes: - # Utiliser la première entité CIM-10 edsnlp comme DP - code, texte = next(iter(edsnlp_codes.items())) - texte_clean = texte.capitalize() - if is_valid_diagnostic_text(texte_clean): - dossier.diagnostic_principal = Diagnostic( - texte=texte_clean, cim10_suggestion=code, - source="edsnlp", - ) + from .dp_scoring import build_dp_shortlist, score_candidates, select_dp, llm_dp_fallback + + candidates = build_dp_shortlist(parsed, text, edsnlp_result, dossier) + candidates = score_candidates(candidates, dossier, full_text=text) + selection = select_dp(candidates, dossier, use_llm=use_rag) + + # Fallback LLM : si scoring déterministe → REVIEW et LLM autorisé + if use_rag and selection.verdict == "review": + # Instrumentation : dp_pre_llm + pre_code = selection.candidates[0].code if selection.candidates else None + pre_section = selection.candidates[0].source_section if selection.candidates else None + is_comorbidity_trigger = "comorbidité banale" in (selection.winner_reason or "") + logger.info( + "DP pre-LLM: code=%s section=%s trigger_comorbidity_fallback=%s", + pre_code, pre_section, is_comorbidity_trigger, + ) + + llm_selection = llm_dp_fallback( + parsed, text, dossier, + dp_candidates=candidates, + edsnlp_result=edsnlp_result, + ) + # Fusionner candidats LLM + déterministes (LLM en tête) + if llm_selection.candidates: + all_candidates = list(llm_selection.candidates) + if selection.candidates: + all_candidates.extend(selection.candidates) + llm_selection.candidates = all_candidates + selection = llm_selection + + # Instrumentation : dp_post_llm + post_code = selection.candidates[0].code if selection.candidates else None + logger.info("DP post-LLM: code=%s verdict=%s", post_code, selection.verdict) + + dossier.dp_selection = selection + if selection.candidates: + winner = selection.candidates[0] + dossier.diagnostic_principal = Diagnostic( + texte=winner.label, + cim10_suggestion=winner.code, + source=winner.source_section, + source_page=winner.source_page, + source_excerpt=winner.source_excerpt, + ) # Diagnostics associés depuis le texte (regex) das = _find_diagnostics_associes(text_lower, conclusion, dossier) diff --git a/src/medical/dp_scoring.py b/src/medical/dp_scoring.py new file mode 100644 index 0000000..b344a2f --- /dev/null +++ b/src/medical/dp_scoring.py @@ -0,0 +1,844 @@ +"""Scoring déterministe du Diagnostic Principal (DP) pour les CRH. + +Collecte les candidats DP depuis les sections CRH parsées, les entités edsnlp +et les regex, puis applique un scoring multi-critères pour sélectionner le +meilleur candidat ou signaler une ambiguïté (verdict REVIEW). + +Fallback LLM one-shot : si use_llm=True et verdict REVIEW, un appel unique +au LLM voit les sections fortes et propose dp_code + evidence en un seul pass. +""" + +from __future__ import annotations + +import logging +import re +from typing import Optional + +from ..config import ( + DossierMedical, + DPCandidate, + DPSelection, + DP_REVIEW_THRESHOLD, + DP_SCORING_WEIGHTS, + DP_Z_CODE_WHITELIST, +) +from .cim10_dict import normalize_code, normalize_text, validate_code as cim10_validate + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Patterns de négation et conditionnel +# --------------------------------------------------------------------------- + +_NEGATION_PATTERNS = re.compile( + r"(?:pas\s+de|absence\s+d[e']|non\s+retenu|exclu[es]?|" + r"[ée]limin[ée]|n[ée]gatif|aucun[e]?\s|sans\s)", + re.IGNORECASE, +) + +_CONDITIONAL_PATTERNS = re.compile( + r"(?:suspect[ée]?|probable|hypothèse|hypothese|\?\s*$|" + r"[àa]\s+confirmer|[ée]ventuel(?:le)?|possiblement|" + r"ne\s+peut\s+(?:pas\s+)?[êe]tre\s+exclu)", + re.IGNORECASE, +) + +# Regex pour extraire des codes CIM-10 explicites dans du texte +# Exige le format avec point (X##.#) pour éviter les faux positifs 3-char : +# P02 (diététique), N34 (mutation N34S), T36 (T36.7°C = température) +# Les codes 3-char sans point sont trop ambigus en texte libre. +# CIM10_MAP gère les correspondances terme→code pour les diagnostics courants. +_CIM10_CODE_RE = re.compile(r"\b([A-Z]\d{2}\.\d{1,2})(?![A-Za-z°])") + +# Codes de comorbidité banals : pénalisés en DP (toutes sections) +# Presque toujours DAS, même s'ils apparaissent en conclusion/motif +_COMORBIDITY_PREFIXES = ("I10", "E66.", "E78.", "E11.", "D64.9") + +# Patterns de preuve explicite de PEC principale (exception comorbidité) +# Ex: "hospitalisé pour HTA maligne", "prise en charge de l'obésité morbide" +_PEC_PROOF_RE = re.compile( + r"(?:hospitalis[ée]e?\s+pour" + r"|prise\s+en\s+charge" + r"|admission\s+pour" + r"|adress[ée]e?\s+pour)", + re.IGNORECASE, +) + + +# Sections à fort signal DP +# NB : dans ce corpus CRH, "diag_sortie"/"diag_principal" n'existent quasiment +# jamais. "conclusion" et "synthese" SONT les sections diagnostiques de fait. +_STRONG_SECTIONS = frozenset({ + "motif", "motif_hospitalisation", + "diag_sortie", "diagnostics_retenus", "diag_principal", + "conclusion", "synthese", +}) + +# Mapping de normalisation : noms libres renvoyés par le LLM → clés de section +_SECTION_NORMALIZE_MAP = { + # conclusion + "conclusion": "conclusion", + "conclusions": "conclusion", + "au total": "conclusion", + # synthese + "synthèse": "synthese", + "synthese": "synthese", + "synthèse du séjour": "synthese", + "synthese du sejour": "synthese", + "synthèse du dossier": "synthese", + "synthese du dossier": "synthese", + "synthèse clinique": "synthese", + "synthese clinique": "synthese", + "en résumé": "synthese", + "en resume": "synthese", + "en synthèse": "synthese", + "en synthese": "synthese", + "résumé": "synthese", + "resume": "synthese", + # motif_hospitalisation + "motif": "motif_hospitalisation", + "motif d'hospitalisation": "motif_hospitalisation", + "motif d'admission": "motif_hospitalisation", + "motif de consultation": "motif_hospitalisation", + "motif_hospitalisation": "motif_hospitalisation", + "motif hospitalisation": "motif_hospitalisation", + "admission": "motif_hospitalisation", + "motif d'entrée": "motif_hospitalisation", + "motif d'entree": "motif_hospitalisation", + # diag_sortie + "diagnostic de sortie": "diag_sortie", + "diagnostics de sortie": "diag_sortie", + "diag_sortie": "diag_sortie", + "diag sortie": "diag_sortie", + # diagnostics_retenus + "diagnostic retenu": "diagnostics_retenus", + "diagnostics retenus": "diagnostics_retenus", + "diagnostic retenu à la sortie": "diagnostics_retenus", + "diagnostics retenus à la sortie": "diagnostics_retenus", + "diagnostics_retenus": "diagnostics_retenus", + # diag_principal + "diagnostic principal": "diag_principal", + "diag_principal": "diag_principal", + "diag principal": "diag_principal", + "problème principal": "diag_principal", + "probleme principal": "diag_principal", + # histoire_maladie + "histoire de la maladie": "histoire_maladie", + "histoire_maladie": "histoire_maladie", + "histoire maladie": "histoire_maladie", + "hdm": "histoire_maladie", + # evolution + "evolution dans le service": "evolution", + "évolution dans le service": "evolution", + "evolution": "evolution", + "évolution": "evolution", + # examen + "examen clinique": "examen_clinique", + "examen_clinique": "examen_clinique", + # actes + "indication opératoire": "indication_operatoire", + "indication operatoire": "indication_operatoire", + "prise en charge chirurgicale": "indication_operatoire", + "actes réalisés": "actes", + "actes realises": "actes", + "actes": "actes", + # administratif / bruit → "autres" + "sections cliniques": "autres", + "sections_cliniques": "autres", + "sections fortes du dossier": "autres", + "secrétariat": "autres", + "secretariat": "autres", + "médecine interne": "autres", + "medecine interne": "autres", + "médecin": "autres", + "medecin": "autres", + "courrier": "autres", + "courrier de sortie": "autres", + "compte rendu": "autres", + "compte-rendu": "autres", + "dossier médical": "autres", + "dossier medical": "autres", + "observations": "autres", +} + +# Fallback par mots-clés quand la correspondance exacte échoue. +# Paires (mot-clé(s), section_normalisée) testées dans l'ordre — premier match gagne. +_SECTION_KEYWORD_FALLBACKS: list[tuple[tuple[str, ...], str]] = [ + # diagnostic + sortie/retenu → diag_sortie / diagnostics_retenus + (("diagnostic", "sortie"), "diag_sortie"), + (("diagnostic", "retenu"), "diagnostics_retenus"), + # conclusion / synthese + (("conclusion",), "conclusion"), + (("synthese",), "synthese"), + (("synthèse",), "synthese"), + (("au total",), "synthese"), + (("en résumé",), "synthese"), + # motif / admission + (("motif",), "motif_hospitalisation"), + (("admission",), "motif_hospitalisation"), +] + + +def _normalize_evidence_section(raw_section: str) -> str: + """Normalise le nom de section renvoyé par le LLM vers une clé standard. + + 1. Nettoyage : lower, strip, retrait crochets/deux-points/guillemets. + 2. Correspondance exacte dans _SECTION_NORMALIZE_MAP. + 3. Fallback par mots-clés (_SECTION_KEYWORD_FALLBACKS). + """ + if not raw_section: + return "" + # Nettoyage agressif : crochets, guillemets, deux-points, underscores → espaces + key = raw_section.lower().strip() + key = re.sub(r"[\[\]\"':]+", "", key).strip() + + # 1. Exact match + result = _SECTION_NORMALIZE_MAP.get(key) + if result: + return result + + # 1b. Tenter aussi avec underscores → espaces + key_spaces = key.replace("_", " ") + result = _SECTION_NORMALIZE_MAP.get(key_spaces) + if result: + return result + + # 2. Fallback par mots-clés + for keywords, section in _SECTION_KEYWORD_FALLBACKS: + if all(kw in key for kw in keywords): + return section + + return key + + +def _is_comorbidity_code(code: str) -> bool: + """Vérifie si un code est une comorbidité banale (I10, E66.x, E78.x, E11.x, D64.9).""" + return any(code.startswith(prefix) for prefix in _COMORBIDITY_PREFIXES) + + +def _has_explicit_pec_proof(label: str, full_text: str) -> bool: + """Vérifie si le texte contient une preuve explicite que cette comorbidité + est le motif PRINCIPAL de prise en charge. + + Cherche "hospitalisé pour", "prise en charge de", "admission pour", etc. + suivis du label de la comorbidité dans une fenêtre de 100 caractères. + """ + if not full_text or not label: + return False + text_lower = full_text.lower() + label_lower = label.lower() + for m in _PEC_PROOF_RE.finditer(text_lower): + window = text_lower[m.end():m.end() + 100] + if label_lower in window: + return True + return False + + +# --------------------------------------------------------------------------- +# 1. Construction de la shortlist +# --------------------------------------------------------------------------- + +def build_dp_shortlist( + parsed: dict, + text: str, + edsnlp_result, + dossier: DossierMedical, +) -> list[DPCandidate]: + """Collecte les candidats DP depuis les sections CRH, edsnlp et regex. + + Déduplique par code CIM-10 en gardant la section la plus forte. + """ + from .cim10_extractor import CIM10_MAP, _find_diagnostic_principal + from .das_filter import is_valid_diagnostic_text, clean_diagnostic_text + + candidates: list[DPCandidate] = [] + sections = parsed.get("sections", {}) + + # Ordre de priorité des sections (décroissant) + section_priority = [ + "diag_sortie", "diag_principal", "motif_hospitalisation", + "conclusion", "synthese", + ] + + # 1. Sections CRH à fort signal + for section_key in section_priority: + section_text = sections.get(section_key, "") + if not section_text: + continue + + section_norm = normalize_text(section_text) + + # 1a. Codes CIM-10 explicites dans le texte de section + for m in _CIM10_CODE_RE.finditer(section_text): + code = normalize_code(m.group(1)) + is_valid, label = cim10_validate(code) + if is_valid: + excerpt = _extract_excerpt(section_text, m.start()) + candidates.append(DPCandidate( + code=code, + label=label, + source_section=section_key, + source_excerpt=excerpt, + )) + + # 1b. CIM10_MAP uniquement (curé pour les DP courants) + # On n'utilise PAS dict_lookup car le dictionnaire complet (10K+ entrées) + # produit des faux positifs par substring match sur du texte libre. + for terme, code in CIM10_MAP.items(): + if normalize_text(terme) in section_norm: + candidates.append(DPCandidate( + code=code, + label=terme.capitalize(), + source_section=section_key, + source_excerpt=section_text[:200].strip(), + )) + break # plus-long-match : CIM10_MAP est ordonné spécifique→générique + + # 2. edsnlp entities + if edsnlp_result: + for ent in edsnlp_result.cim10_entities: + if ent.negation or ent.hypothese: + continue + texte = clean_diagnostic_text(ent.texte.capitalize()) + if not is_valid_diagnostic_text(texte): + continue + candidates.append(DPCandidate( + code=ent.code, + label=texte, + source_section="edsnlp", + )) + + # 3. Regex fallback (_find_diagnostic_principal sur texte complet) + text_lower = text.lower() + conclusion = sections.get("conclusion", "") + dp_regex = _find_diagnostic_principal(text_lower, conclusion) + if dp_regex: + candidates.append(DPCandidate( + code=dp_regex.cim10_suggestion, + label=dp_regex.texte, + source_section="regex", + source_excerpt=dp_regex.source_excerpt, + )) + + # 4. Dédup par code CIM-10 : garder la section la plus forte + candidates = _dedup_by_code(candidates, section_priority) + + return candidates + + +def _extract_excerpt(text: str, pos: int, window: int = 100) -> str: + """Extrait ~200 chars autour d'une position dans le texte.""" + start = max(0, pos - window) + end = min(len(text), pos + window) + return text[start:end].strip() + + +def _dedup_by_code( + candidates: list[DPCandidate], + section_priority: list[str], +) -> list[DPCandidate]: + """Déduplique par code CIM-10, garde la section la plus forte.""" + priority_map = {s: i for i, s in enumerate(section_priority)} + # Ajouter edsnlp et regex en bas de priorité + priority_map.setdefault("edsnlp", len(section_priority)) + priority_map.setdefault("regex", len(section_priority) + 1) + + seen: dict[str, DPCandidate] = {} + for c in candidates: + key = c.code or c.label.lower() + if key not in seen: + seen[key] = c + else: + existing = seen[key] + existing_prio = priority_map.get(existing.source_section, 99) + new_prio = priority_map.get(c.source_section, 99) + if new_prio < existing_prio: + seen[key] = c + + return list(seen.values()) + + +# --------------------------------------------------------------------------- +# 2. Scoring des candidats +# --------------------------------------------------------------------------- + +def score_candidates( + candidates: list[DPCandidate], + dossier: DossierMedical, + full_text: str = "", +) -> list[DPCandidate]: + """Applique le scoring déterministe sur chaque candidat. + + Args: + full_text: Texte complet du document pour la détection négation/conditionnel. + """ + for c in candidates: + details: dict[str, int] = {} + + # 1. Bonus section + section_key = f"section_{c.source_section}" + section_bonus = DP_SCORING_WEIGHTS.get(section_key, 0) + if section_bonus: + details["section"] = section_bonus + + # 2. Bonus preuve (excerpt + page) + if c.source_excerpt: + proof = DP_SCORING_WEIGHTS.get("proof_excerpt", 0) + if proof: + details["proof_excerpt"] = proof + + # 3. Pénalité négation (fenêtre étroite AVANT le label) + if full_text and c.label: + prefix = _get_prefix_window(full_text, c.label, chars_before=60) + if prefix and _NEGATION_PATTERNS.search(prefix): + c.is_negated = True + penalty = DP_SCORING_WEIGHTS.get("negation", 0) + if penalty: + details["negation"] = penalty + + # 4. Pénalité conditionnel (fenêtre étroite AVANT + APRÈS le label) + if full_text and c.label: + window = _get_context_window(full_text, c.label, radius=80) + if window and _CONDITIONAL_PATTERNS.search(window): + c.is_conditional = True + penalty = DP_SCORING_WEIGHTS.get("conditional", 0) + if penalty: + details["conditional"] = penalty + + # 5. Pénalité Z-code en DP + if c.code and c.code.startswith("Z"): + if not _is_z_code_whitelisted(c.code): + penalty = DP_SCORING_WEIGHTS.get("z_code_dp", 0) + if penalty: + details["z_code_dp"] = penalty + + # 6. Pénalité R-code (symptôme) en DP + if c.code and c.code.startswith("R"): + penalty = DP_SCORING_WEIGHTS.get("r_code_dp", 0) + if penalty: + details["r_code_dp"] = penalty + + # 7. Pénalité comorbidité banale (toutes sections) + if c.code and _is_comorbidity_code(c.code): + penalty = DP_SCORING_WEIGHTS.get("comorbidity_weak", 0) + if penalty: + details["comorbidity_weak"] = penalty + # Exception : preuve explicite de PEC principale → compense + if full_text and _has_explicit_pec_proof(c.label, full_text): + details["comorbidity_pec_proof"] = abs(penalty) if penalty else 3 + + c.score_details = details + c.score = sum(details.values()) + + # Trier par score décroissant + candidates.sort(key=lambda c: -c.score) + return candidates + + +def _get_prefix_window(text: str, label: str, chars_before: int = 60) -> str: + """Retourne les N caractères AVANT la première occurrence du label. + + Sert à détecter les négations qui précèdent directement le diagnostic + ("pas de pancréatite" vs "pancréatite ... pas de complication"). + """ + text_lower = text.lower() + label_lower = label.lower() + pos = text_lower.find(label_lower) + if pos < 0: + text_norm = normalize_text(text) + label_norm = normalize_text(label) + pos = text_norm.find(label_norm) + if pos < 0: + return "" + start = max(0, pos - chars_before) + return text_norm[start:pos] + start = max(0, pos - chars_before) + return text_lower[start:pos] + + +def _get_context_window(text: str, label: str, radius: int = 200) -> str: + """Retourne une fenêtre de texte autour de la première occurrence du label.""" + text_lower = text.lower() + label_lower = label.lower() + pos = text_lower.find(label_lower) + if pos < 0: + # Essayer avec le texte normalisé + text_norm = normalize_text(text) + label_norm = normalize_text(label) + pos = text_norm.find(label_norm) + if pos < 0: + return "" + start = max(0, pos - radius) + end = min(len(text_norm), pos + len(label_norm) + radius) + return text_norm[start:end] + start = max(0, pos - radius) + end = min(len(text), pos + len(label) + radius) + return text[start:end].lower() + + +def _is_z_code_whitelisted(code: str) -> bool: + """Vérifie si un Z-code est dans la whitelist (match préfixe).""" + for prefix in DP_Z_CODE_WHITELIST: + if code.startswith(prefix): + return True + return False + + +# --------------------------------------------------------------------------- +# 3. Sélection du DP +# --------------------------------------------------------------------------- + +def select_dp( + candidates: list[DPCandidate], + dossier: DossierMedical, + use_llm: bool = False, +) -> DPSelection: + """Sélectionne le DP parmi les candidats scorés. + + Retourne verdict="confirmed" si le delta est suffisant, + "review" si ambiguïté. + """ + if not candidates: + return DPSelection(verdict="review", winner_reason="aucun candidat DP trouvé") + + # Anti-comorbidité universelle : comorbidité banale en DP → REVIEW + # sauf preuve explicite de PEC principale (hospitalisé pour, prise en charge de) + top = candidates[0] + if top.code and _is_comorbidity_code(top.code): + has_pec = "comorbidity_pec_proof" in top.score_details + if not has_pec: + logger.info( + "Comorbidité-banale DP : %s (%s, section=%s) → REVIEW + fallback LLM", + top.code, top.label, top.source_section, + ) + return DPSelection( + verdict="review", + candidates=candidates[:3], + winner_reason=f"comorbidité banale {top.code} sans preuve PEC ({top.source_section})", + ) + + if len(candidates) == 1: + return DPSelection( + verdict="confirmed", + candidates=candidates, + winner_reason="candidat unique", + ) + + top1 = candidates[0] + top2 = candidates[1] + delta = top1.score - top2.score + + if delta >= DP_REVIEW_THRESHOLD: + return DPSelection( + verdict="confirmed", + candidates=candidates, + winner_reason=f"score {top1.score} vs {top2.score} (delta {delta})", + ) + + # Delta trop faible — tenter tiebreaker LLM si autorisé + if use_llm and top1.score == top2.score: + tiebreak = _llm_tiebreak(top1, top2, dossier) + if tiebreak and tiebreak.get("winner") in ("A", "B"): + if tiebreak["winner"] == "B": + # Swap pour que le gagnant soit en premier + candidates[0], candidates[1] = candidates[1], candidates[0] + return DPSelection( + verdict="confirmed", + candidates=candidates, + winner_reason=f"LLM tiebreak: {tiebreak.get('reason', '')}", + llm_tiebreak=tiebreak, + ) + + return DPSelection( + verdict="review", + candidates=candidates[:3], + winner_reason=f"delta insuffisant: {top1.score} vs {top2.score} (delta {delta} < seuil {DP_REVIEW_THRESHOLD})", + ) + + +# --------------------------------------------------------------------------- +# 4. Tiebreaker LLM (optionnel) +# --------------------------------------------------------------------------- + +def _llm_tiebreak( + candidate_a: DPCandidate, + candidate_b: DPCandidate, + dossier: DossierMedical, +) -> dict | None: + """Appelle le LLM pour départager deux candidats DP à scores identiques.""" + try: + from .ollama_client import call_ollama + from ..prompts import DP_TIEBREAK + except ImportError: + logger.warning("Module ollama_client non disponible pour le tiebreaker DP") + return None + + motif = "" + if dossier.sejour and dossier.sejour.mode_entree: + motif = dossier.sejour.mode_entree + + def _format_candidate(c: DPCandidate) -> str: + parts = [c.label] + if c.code: + parts.append(f"({c.code})") + parts.append(f"[section: {c.source_section}, score: {c.score}]") + if c.source_excerpt: + parts.append(f'extrait: "{c.source_excerpt[:150]}"') + return " — ".join(parts) + + candidat_a_str = _format_candidate(candidate_a) + candidat_b_str = _format_candidate(candidate_b) + + sections_fortes = "Non disponible" + + prompt = DP_TIEBREAK.format( + motif=motif or "Non renseigné", + candidat_a=candidat_a_str, + candidat_b=candidat_b_str, + sections_fortes=sections_fortes, + ) + + try: + result = call_ollama(prompt, temperature=0.0, max_tokens=500, role="coding") + except Exception: + logger.warning("Erreur LLM tiebreaker DP", exc_info=True) + return None + + if not result or not isinstance(result, dict): + return None + + winner = result.get("winner") + if winner not in ("A", "B"): + return None + + return {"winner": winner, "reason": result.get("reason", "")} + + +# --------------------------------------------------------------------------- +# 5. LLM Fallback one-shot — proposition DP quand le scoring déterministe échoue +# --------------------------------------------------------------------------- + + +def _build_strong_sections_text(parsed: dict) -> str: + """Construit le texte des sections fortes pour le prompt LLM one-shot. + + Sections fortes : motif, diag_sortie, diag_principal, diagnostics_retenus, + conclusion, synthese. PAS histoire_maladie ni examen_clinique (= bruit). + """ + sections = parsed.get("sections", {}) + _STRONG_ORDER = [ + ("motif_hospitalisation", 500), + ("diag_sortie", 600), ("diagnostics_retenus", 600), + ("diag_principal", 600), + ("conclusion", 600), ("synthese", 600), + ] + parts = [] + for key, max_len in _STRONG_ORDER: + val = sections.get(key, "") + if val: + parts.append(f"[{key}] {val[:max_len]}") + return "\n".join(parts) or "Aucune section forte" + + +def _build_motif(parsed: dict, dossier: DossierMedical) -> str: + """Extrait le motif d'hospitalisation pour le prompt LLM.""" + motif = "" + if dossier.sejour and dossier.sejour.mode_entree: + motif = dossier.sejour.mode_entree + if not motif: + motif = parsed.get("sections", {}).get("motif_hospitalisation", "")[:300] or "Non renseigné" + return motif + + +def _build_actes(dossier: DossierMedical) -> str: + """Construit la liste des actes pour le prompt LLM.""" + parts = [] + for a in dossier.actes_ccam[:5]: + label = a.texte + if a.code_ccam_suggestion: + label += f" ({a.code_ccam_suggestion})" + parts.append(label) + return ", ".join(parts) or "Non renseignés" + + +def _validate_and_normalize_code(dp_code: str, pool_codes: set[str] | None = None) -> tuple[str, str | None, bool]: + """Valide et normalise un code CIM-10. Retourne (code, original_si_normalisé, is_valid).""" + dp_code = normalize_code(dp_code) + dp_code_original = None + + # Si pool fourni, vérifier appartenance + if pool_codes is not None and dp_code in pool_codes: + return dp_code, None, True + + parent3 = dp_code[:3] + parent9 = f"{parent3}.9" + + # Tenter match pool par parent + if pool_codes is not None: + if parent3 in pool_codes: + return parent3, dp_code, True + if parent9 in pool_codes: + return parent9, dp_code, True + + # Validation CIM-10 directe + is_valid, _ = cim10_validate(dp_code) + if is_valid: + return dp_code, None, True + + # Tenter parent + is_valid_p, _ = cim10_validate(parent3) + if is_valid_p: + return parent3, dp_code, True + + is_valid_9, _ = cim10_validate(parent9) + if is_valid_9: + return parent9, dp_code, True + + return dp_code, None, False + + +def _apply_guardrails( + dp_code: str, + candidate: DPCandidate, + evidence_section: str, + evidence_excerpt: str, + confidence: str, +) -> DPSelection: + """Applique les garde-fous déterministes sur un candidat LLM. + + Retourne DPSelection avec verdict confirmed ou review. + """ + is_strong_section = evidence_section in _STRONG_SECTIONS + has_evidence = bool(evidence_excerpt and evidence_excerpt.strip()) + is_high_conf = confidence == "high" + + # Score synthétique + confidence_scores = {"high": 3, "medium": 2, "low": 1} + candidate.score = confidence_scores.get(confidence, 1) + candidate.score_details = {"llm_confidence": candidate.score} + + # GF-1 : evidence_excerpt vide → REVIEW + if not has_evidence: + logger.info("LLM fallback DP : pas d'extrait preuve pour %s, REVIEW", dp_code) + return DPSelection( + verdict="review", candidates=[candidate], + winner_reason="LLM fallback: evidence_excerpt vide", + ) + + # GF-2 : comorbidité banale ET section non-forte → REVIEW + if _is_comorbidity_code(dp_code) and not is_strong_section: + logger.info("LLM fallback DP : comorbidité %s hors section forte (%s), REVIEW", dp_code, evidence_section) + return DPSelection( + verdict="review", candidates=[candidate], + winner_reason=f"LLM fallback: comorbidité {dp_code} hors section forte", + ) + + # GF-3 : CONFIRMED uniquement si section forte + confidence high + if not is_strong_section or not is_high_conf: + reasons = [] + if not is_strong_section: + reasons.append(f"section faible ({evidence_section})") + if not is_high_conf: + reasons.append(f"confidence {confidence}") + reason_str = " + ".join(reasons) + logger.info("LLM fallback DP : %s pour %s, REVIEW", reason_str, dp_code) + return DPSelection( + verdict="review", candidates=[candidate], + winner_reason=f"LLM fallback: {dp_code} — {reason_str}", + ) + + # Toutes les conditions réunies → CONFIRMED + return DPSelection( + verdict="confirmed", candidates=[candidate], + winner_reason=f"LLM fallback: {dp_code} ({confidence}, {evidence_section})", + ) + + + +def llm_dp_fallback( + parsed: dict, + text: str, + dossier: DossierMedical, + dp_candidates: list[DPCandidate] | None = None, + edsnlp_result=None, +) -> DPSelection: + """Appelle le LLM en one-shot pour identifier et coder le DP. + + Le LLM voit directement les sections fortes du CRH et doit fournir + en un seul appel : dp_code, dp_label, evidence_section, evidence_excerpt, confidence. + + Ne doit être appelé que si use_llm=True ET verdict="review". + """ + try: + from .ollama_client import call_ollama + from ..prompts import DP_LLM_ONESHOT + except ImportError: + logger.warning("Module ollama_client non disponible pour le fallback DP LLM") + return DPSelection(verdict="review", winner_reason="LLM non disponible") + + # Contexte + motif = _build_motif(parsed, dossier) + sections_fortes = _build_strong_sections_text(parsed) + actes = _build_actes(dossier) + + prompt = DP_LLM_ONESHOT.format( + motif=motif, sections_fortes=sections_fortes, actes=actes, + ) + + try: + result = call_ollama(prompt, temperature=0.0, max_tokens=800, role="coding") + except Exception: + logger.warning("Erreur LLM fallback DP", exc_info=True) + return DPSelection(verdict="review", winner_reason="erreur LLM fallback DP") + + if not result or not isinstance(result, dict): + return DPSelection(verdict="review", winner_reason="réponse LLM invalide") + + dp_code = result.get("dp_code", "") + dp_label = result.get("dp_label", "") + confidence = result.get("confidence", "low") + evidence_section_raw = result.get("evidence_section", "") + evidence_excerpt = result.get("evidence_excerpt", "") + + # Normaliser la section + evidence_section = _normalize_evidence_section(evidence_section_raw) + + logger.info( + "LLM oneshot: code=%s label='%s' section=%s confidence=%s", + dp_code, dp_label[:60], evidence_section, confidence, + ) + + if not dp_code: + return DPSelection( + verdict="review", + winner_reason="LLM: aucun code DP proposé", + ) + + # Validation et normalisation du code CIM-10 + dp_code, dp_code_original, is_valid = _validate_and_normalize_code(dp_code) + if not is_valid: + return DPSelection( + verdict="review", + winner_reason=f"code invalide {dp_code}", + ) + if dp_code_original: + logger.info("LLM oneshot: normalisation %s → %s", dp_code_original, dp_code) + + # Résoudre le label final + _, dict_label = cim10_validate(dp_code) + + # Construire le candidat + source_tag = f"llm_oneshot ({evidence_section})" if evidence_section else "llm_oneshot" + + candidate = DPCandidate( + code=dp_code, + label=dp_label or dict_label or "", + source_section=source_tag, + source_excerpt=evidence_excerpt, + confidence_raw=confidence, + dp_code_original_llm=dp_code_original, + dp_code_normalized=dp_code_original is not None, + ) + + # Appliquer les garde-fous déterministes + return _apply_guardrails(dp_code, candidate, evidence_section, evidence_excerpt, confidence) diff --git a/src/medical/fusion.py b/src/medical/fusion.py index 3b727cf..adffffe 100644 --- a/src/medical/fusion.py +++ b/src/medical/fusion.py @@ -188,6 +188,17 @@ def merge_dossiers(dossiers: list[DossierMedical]) -> DossierMedical: # Diagnostic principal : le plus spécifique merged.diagnostic_principal = _prefer_most_specific_dp(dossiers) + # Propager dp_selection depuis le dossier source du DP retenu + if merged.diagnostic_principal: + for d in dossiers: + if ( + d.diagnostic_principal + and d.diagnostic_principal.cim10_suggestion == merged.diagnostic_principal.cim10_suggestion + and d.dp_selection is not None + ): + merged.dp_selection = d.dp_selection + break + # Collecter tous les DAS + DP non retenus comme DAS all_das: list[Diagnostic] = [] for d in dossiers: diff --git a/src/prompts/__init__.py b/src/prompts/__init__.py index 92501ca..ced4388 100644 --- a/src/prompts/__init__.py +++ b/src/prompts/__init__.py @@ -7,6 +7,8 @@ from .templates import ( QC_VALIDATION, CPAM_EXTRACTION, CPAM_ARGUMENTATION, + DP_TIEBREAK, + DP_LLM_ONESHOT, CPAM_ADVERSARIAL, ) @@ -17,5 +19,7 @@ __all__ = [ "QC_VALIDATION", "CPAM_EXTRACTION", "CPAM_ARGUMENTATION", + "DP_TIEBREAK", + "DP_LLM_ONESHOT", "CPAM_ADVERSARIAL", ] diff --git a/src/prompts/templates.py b/src/prompts/templates.py index 220f8fd..aaa112b 100644 --- a/src/prompts/templates.py +++ b/src/prompts/templates.py @@ -300,7 +300,79 @@ Réponds UNIQUEMENT avec un objet JSON au format suivant : }}""" # --------------------------------------------------------------------------- -# 7. CPAM passe 3 — validation adversariale (relecture critique) +# 7. DP Tiebreaker — départage entre deux candidats DP à scores proches +# --------------------------------------------------------------------------- +# Rôle : coding | Temperature : 0.0 | Max tokens : 500 +# Fichier d'origine : src/medical/dp_scoring.py → _llm_tiebreak() +# Variables : motif, candidat_a, candidat_b, sections_fortes + +DP_TIEBREAK = """\ +Tu es un médecin DIM expert. Deux diagnostics sont candidats au poste de Diagnostic Principal (DP). +Le DP doit refléter le motif principal de prise en charge qui a mobilisé le plus de ressources pendant le séjour. + +MOTIF D'HOSPITALISATION : {motif} + +CANDIDAT A : {candidat_a} +CANDIDAT B : {candidat_b} + +SECTIONS DU CRH À FORT SIGNAL : +{sections_fortes} + +Choisis le candidat le plus approprié comme DP selon les critères ATIH : +1. Motif principal de prise en charge du séjour +2. Ressources mobilisées (actes, biologie, traitement) +3. Spécificité du code CIM-10 (préférer le plus spécifique) + +Réponds UNIQUEMENT en JSON : +{{ + "winner": "A" ou "B", + "reason": "explication courte en français" +}}""" + +# --------------------------------------------------------------------------- +# 7b. DP LLM One-shot — identification + codage CIM-10 du DP en un appel +# --------------------------------------------------------------------------- +# Rôle : coding | Temperature : 0.0 | Max tokens : 800 +# Fichier d'origine : src/medical/dp_scoring.py → llm_dp_fallback() +# Variables : motif, sections_fortes, actes + +DP_LLM_ONESHOT = """\ +Tu es un médecin DIM (Département d'Information Médicale) expert en codage PMSI. +Identifie le Diagnostic Principal (DP) et code-le en CIM-10 avec le code le plus SPÉCIFIQUE (4e ou 5e caractère). + +DÉFINITION DU DP (Guide méthodologique ATIH) : +Le DP est le diagnostic qui a mobilisé l'essentiel des ressources du séjour. C'est la pathologie ACTIVE, TRAITÉE, RETENUE en fin de séjour — pas le symptôme d'entrée si un diagnostic étiologique a été posé. + +CE QUE TU NE CHERCHES PAS : +- Les comorbidités chroniques de fond (hypertension, obésité, diabète équilibré, dyslipidémie, anémie chronique) SAUF si elles sont DÉCOMPENSÉES et constituent le motif d'hospitalisation +- Les antécédents stables non traités activement pendant ce séjour +- Les facteurs de risque (tabac, alcool, sédentarité) + +MÉTHODE : +1. Lis le motif d'hospitalisation → pourquoi le patient est arrivé +2. Lis la conclusion/synthèse → quel diagnostic a été retenu après le séjour +3. Identifie la pathologie ACTIVE traitée, puis code-la en CIM-10 +4. Préfère le code le plus spécifique (ex: K85.1 > K85.9 > K85) +5. Cite la SECTION et l'EXTRAIT exact qui prouvent ton choix + +MOTIF D'HOSPITALISATION : {motif} + +SECTIONS CLINIQUES (fortes uniquement) : +{sections_fortes} + +ACTES RÉALISÉS : {actes} + +Réponds UNIQUEMENT en JSON : +{{ + "dp_code": "X00.0", + "dp_label": "libellé officiel CIM-10 en français", + "evidence_section": "nom exact de la section source", + "evidence_excerpt": "extrait EXACT copié du texte (2-3 lignes max)", + "confidence": "high ou medium ou low" +}}""" + +# --------------------------------------------------------------------------- +# 8. CPAM passe 3 — validation adversariale (relecture critique) # --------------------------------------------------------------------------- # Rôle : validation | Temperature : 0.0 | Max tokens : 800 # Fichier d'origine : src/control/cpam_response.py → _validate_adversarial() diff --git a/tests/test_cpam_parser.py b/tests/test_cpam_parser.py index 6cd7483..56c052e 100644 --- a/tests/test_cpam_parser.py +++ b/tests/test_cpam_parser.py @@ -9,13 +9,32 @@ import pytest from src.config import ControleCPAM from src.control.cpam_parser import match_dossier_ogc, parse_cpam_excel +# En-têtes +_LEGACY_HEADER = ("N° OGC", "Titre", "Arg_UCR", "Décision_UCR", "DP_UCR", "DA_UCR", "DR_UCR", "Actes_UCR") +_NEW_HEADER = ( + "N° OGC", "Type désaccord", "Codes Établissement", "Libellé Établissement", + "Codes Contrôleurs", "Libellé Contrôleurs", "Décision UCR", "Codes retenus", + "GHM / GHS", "Texte décision", +) + def _create_test_xlsx(rows: list[tuple], path: Path) -> None: - """Crée un fichier xlsx de test avec les lignes données.""" + """Crée un fichier xlsx de test au format legacy.""" wb = openpyxl.Workbook() ws = wb.active ws.title = "OGC Contrôle T2A" - ws.append(("N° OGC", "Titre", "Arg_UCR", "Décision_UCR", "DP_UCR", "DA_UCR", "DR_UCR", "Actes_UCR")) + ws.append(_LEGACY_HEADER) + for row in rows: + ws.append(row) + wb.save(path) + + +def _create_new_format_xlsx(rows: list[tuple], path: Path) -> None: + """Crée un fichier xlsx de test au format ucr_extract (nouveau).""" + wb = openpyxl.Workbook() + ws = wb.active + ws.title = "UCR Extract" + ws.append(_NEW_HEADER) for row in rows: ws.append(row) wb.save(path) @@ -128,3 +147,292 @@ class TestControleCPAMModel: assert ctrl.numero_ogc == 21 assert ctrl.contre_argumentation == "Ma réponse" assert ctrl.sources_reponse == [] + + def test_new_fields_defaults(self): + """Les 6 nouveaux champs ucr_extract sont None par défaut.""" + ctrl = ControleCPAM(numero_ogc=1) + assert ctrl.codes_etablissement is None + assert ctrl.libelle_etablissement is None + assert ctrl.codes_controleurs is None + assert ctrl.libelle_controleurs is None + assert ctrl.codes_retenus is None + assert ctrl.ghm_ghs is None + + def test_new_fields_serialization(self): + """Les champs ucr_extract apparaissent dans model_dump.""" + ctrl = ControleCPAM( + numero_ogc=10, + titre="Désaccord sur le DP", + codes_etablissement="K85.1", + libelle_etablissement="Pancréatite aiguë biliaire", + codes_controleurs="K85.9", + libelle_controleurs="Pancréatite aiguë, sans précision", + codes_retenus="K85.1", + ghm_ghs="06M091 / 1854", + ) + data = ctrl.model_dump() + assert data["codes_etablissement"] == "K85.1" + assert data["libelle_etablissement"] == "Pancréatite aiguë biliaire" + assert data["codes_controleurs"] == "K85.9" + assert data["libelle_controleurs"] == "Pancréatite aiguë, sans précision" + assert data["codes_retenus"] == "K85.1" + assert data["ghm_ghs"] == "06M091 / 1854" + + +class TestParseNewFormat: + """Tests pour le format ucr_extract (nouveau).""" + + def test_parse_basic_dp(self, tmp_path): + """Parsing basique — désaccord DP avec Codes Contrôleurs.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + # N° OGC, Type, Codes Étab, Lib Étab, Codes Ctrl, Lib Ctrl, Décision, Codes ret, GHM, Texte + (17, "DP", "K85.1", "Pancréatite aiguë biliaire", "K85.9", + "Pancréatite aiguë SAI", "Défavorable", "K85.9", "06M091 / 1854", + "Le contrôleur ne retient pas K85.1"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + + assert 17 in result + ctrl = result[17][0] + assert ctrl.numero_ogc == 17 + assert ctrl.titre == "Désaccord sur le DP" + assert ctrl.dp_ucr == "K85.9" + assert ctrl.da_ucr is None + assert ctrl.arg_ucr == "Le contrôleur ne retient pas K85.1" + assert ctrl.decision_ucr == "UCR confirme avis médecins contrôleurs" + + def test_parse_basic_das(self, tmp_path): + """Parsing — désaccord DAS.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + (21, "DAS", "E11.40,G63.2", "Diabète+neuropathie", "E11.40", + "Diabète type 2", "Favorable", "E11.40,G63.2", None, + "L'UCR retient les codes initiaux"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + + ctrl = result[21][0] + assert ctrl.titre == "Désaccord sur les DAS" + assert ctrl.dp_ucr is None + assert ctrl.da_ucr == "E11.40" + assert ctrl.decision_ucr == "UCR retient" + + def test_parse_dp_plus_das(self, tmp_path): + """DP+DAS : premier code → dp_ucr, reste → da_ucr.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + (30, "DP+DAS", "K85.1,E11.40", "...", "K85.9,G63.2,I10", + "...", "Défavorable", "K85.9,G63.2,I10", None, "Texte"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + + ctrl = result[30][0] + assert ctrl.titre == "Désaccord sur le DP et les DAS" + assert ctrl.dp_ucr == "K85.9" + assert ctrl.da_ucr == "G63.2,I10" + + def test_parse_dp_plus_das_single_code(self, tmp_path): + """DP+DAS avec un seul code → tout en dp_ucr, pas de da_ucr.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + (31, "DP+DAS", "K85.1", "...", "K85.9", + "...", "Favorable", None, None, "Texte"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + + ctrl = result[31][0] + assert ctrl.dp_ucr == "K85.9" + assert ctrl.da_ucr is None + + def test_new_fields_populated(self, tmp_path): + """Les 6 champs enrichis sont bien remplis depuis les colonnes.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + (42, "DP", "E11.40", "Diabète type 2 avec complications", + "E11.9", "Diabète type 2 sans complication", + "Défavorable", "E11.9", "05M092 / 1780", "Argumentation contrôleur"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + + ctrl = result[42][0] + assert ctrl.codes_etablissement == "E11.40" + assert ctrl.libelle_etablissement == "Diabète type 2 avec complications" + assert ctrl.codes_controleurs == "E11.9" + assert ctrl.libelle_controleurs == "Diabète type 2 sans complication" + assert ctrl.codes_retenus == "E11.9" + assert ctrl.ghm_ghs == "05M092 / 1780" + + def test_decision_favorable(self, tmp_path): + """Favorable → 'UCR retient'.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + (10, "DP", None, None, None, None, "Favorable", None, None, "OK"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + assert result[10][0].decision_ucr == "UCR retient" + + def test_decision_defavorable(self, tmp_path): + """Défavorable → 'UCR confirme avis médecins contrôleurs'.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + (11, "DAS", None, None, None, None, "Défavorable", None, None, "KO"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + assert result[11][0].decision_ucr == "UCR confirme avis médecins contrôleurs" + + def test_decision_defavorable_no_accent(self, tmp_path): + """Defavorable (sans accent) → même mapping.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + (12, "DP", None, None, None, None, "Defavorable", None, None, "KO"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + assert result[12][0].decision_ucr == "UCR confirme avis médecins contrôleurs" + + def test_decision_unknown_passthrough(self, tmp_path): + """Décision inconnue → passée telle quelle.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + (13, "DP", None, None, None, None, "Partielle", None, None, "Texte"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + assert result[13][0].decision_ucr == "Partielle" + + def test_type_desaccord_unknown(self, tmp_path): + """Type désaccord inconnu → titre 'Désaccord : XXX'.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + (14, "Actes", None, None, None, None, "Favorable", None, None, "Texte"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + assert result[14][0].titre == "Désaccord : Actes" + + def test_type_desaccord_empty(self, tmp_path): + """Type désaccord vide → titre vide.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + (15, "", None, None, None, None, "Favorable", None, None, "Texte"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + assert result[15][0].titre == "" + + def test_multiple_ogc_new_format(self, tmp_path): + """Plusieurs OGC dans le nouveau format.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + (10, "DP", None, None, "K85.9", None, "Favorable", None, None, "Arg 1"), + (20, "DAS", None, None, "E11.40", None, "Défavorable", None, None, "Arg 2"), + (10, "DAS", None, None, "G63.2", None, "Favorable", None, None, "Arg 3"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + + assert len(result) == 2 + assert len(result[10]) == 2 + assert len(result[20]) == 1 + assert result[10][0].dp_ucr == "K85.9" + assert result[10][1].da_ucr == "G63.2" + + def test_empty_new_format(self, tmp_path): + """Fichier nouveau format vide (seulement en-têtes).""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([], xlsx) + + result = parse_cpam_excel(xlsx) + assert result == {} + + def test_ogc_none_skipped(self, tmp_path): + """Lignes avec N° OGC None sont ignorées.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + (None, "DP", None, None, None, None, "Favorable", None, None, "Texte"), + (10, "DP", None, None, "K85.1", None, "Favorable", None, None, "OK"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + assert len(result) == 1 + assert 10 in result + + def test_ogc_invalid_skipped(self, tmp_path): + """N° OGC non-numérique est ignoré.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + ("ABC", "DP", None, None, None, None, "Favorable", None, None, "Texte"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + assert result == {} + + +class TestAutoDetection: + """Tests pour l'auto-détection du format.""" + + def test_detects_legacy(self, tmp_path): + """Format legacy détecté par ses en-têtes.""" + xlsx = tmp_path / "legacy.xlsx" + _create_test_xlsx([ + (17, "Titre", "Arg", "Décision", None, None, None, None), + ], xlsx) + + result = parse_cpam_excel(xlsx) + assert 17 in result + assert result[17][0].titre == "Titre" + + def test_detects_new(self, tmp_path): + """Format nouveau détecté par ses en-têtes.""" + xlsx = tmp_path / "new.xlsx" + _create_new_format_xlsx([ + (17, "DP", "K85.1", "Label", "K85.9", "Label2", + "Favorable", "K85.1", None, "Texte"), + ], xlsx) + + result = parse_cpam_excel(xlsx) + assert 17 in result + assert result[17][0].titre == "Désaccord sur le DP" + + def test_unknown_format_returns_empty(self, tmp_path): + """En-têtes non reconnues → dict vide.""" + xlsx = tmp_path / "unknown.xlsx" + wb = openpyxl.Workbook() + ws = wb.active + ws.append(("Col1", "Col2", "Col3")) + ws.append((1, "val", "val")) + wb.save(xlsx) + + result = parse_cpam_excel(xlsx) + assert result == {} + + def test_new_format_priority_over_legacy(self, tmp_path): + """Si les deux jeux de colonnes sont présents, le nouveau format prime.""" + xlsx = tmp_path / "both.xlsx" + wb = openpyxl.Workbook() + ws = wb.active + # En-têtes contenant les deux formats + ws.append(( + "N° OGC", "Titre", "Arg_UCR", "Décision_UCR", + "Type désaccord", "Décision UCR", "Texte décision", + "DP_UCR", "DA_UCR", "DR_UCR", "Actes_UCR", + )) + ws.append((17, "Titre", "Arg", "Déc legacy", "DP", "Favorable", "Texte nouveau", + "K85.1", None, None, None)) + wb.save(xlsx) + + result = parse_cpam_excel(xlsx) + + assert 17 in result + # Le nouveau format est prioritaire → titre construit depuis Type désaccord + assert result[17][0].titre == "Désaccord sur le DP" + # arg_ucr vient de Texte décision (nouveau), pas de Arg_UCR (legacy) + assert result[17][0].arg_ucr == "Texte nouveau" diff --git a/tests/test_dp_scoring.py b/tests/test_dp_scoring.py new file mode 100644 index 0000000..0ff8849 --- /dev/null +++ b/tests/test_dp_scoring.py @@ -0,0 +1,710 @@ +"""Tests pour le module de scoring DP (Diagnostic Principal).""" + +import pytest + +from src.config import ( + DossierMedical, + Diagnostic, + DPCandidate, + DPSelection, + DP_SCORING_WEIGHTS, + DP_REVIEW_THRESHOLD, + Sejour, +) +from src.medical.dp_scoring import ( + build_dp_shortlist, + score_candidates, + select_dp, + _get_context_window, + _is_z_code_whitelisted, + _is_comorbidity_code, + _has_explicit_pec_proof, + _dedup_by_code, + _normalize_evidence_section, +) + + +# --- Helpers --- + +def _make_parsed(sections: dict | None = None, diagnostics: list | None = None) -> dict: + return { + "type": "crh", + "patient": {"sexe": "M"}, + "sejour": {}, + "diagnostics": diagnostics or [], + "sections": sections or {}, + } + + +def _make_candidate( + code: str = "K85.1", + label: str = "Pancréatite aiguë biliaire", + source_section: str = "diag_sortie", + **kwargs, +) -> DPCandidate: + return DPCandidate(code=code, label=label, source_section=source_section, **kwargs) + + +# === Tests build_dp_shortlist === + +class TestBuildDPShortlist: + def test_from_diag_sortie_with_cim10_code(self): + parsed = _make_parsed(sections={ + "diag_sortie": "Pancréatite aiguë biliaire K85.1", + }) + dossier = DossierMedical() + candidates = build_dp_shortlist(parsed, "", None, dossier) + codes = [c.code for c in candidates] + assert "K85.1" in codes + + def test_from_diag_principal_section(self): + parsed = _make_parsed(sections={ + "diag_principal": "Embolie pulmonaire I26.9", + }) + dossier = DossierMedical() + candidates = build_dp_shortlist(parsed, "", None, dossier) + codes = [c.code for c in candidates] + assert "I26.9" in codes + + def test_from_conclusion_via_cim10_map(self): + parsed = _make_parsed(sections={ + "conclusion": "pancréatite aiguë biliaire, bonne évolution", + }) + dossier = DossierMedical() + candidates = build_dp_shortlist(parsed, "", None, dossier) + codes = [c.code for c in candidates] + assert "K85.1" in codes + + def test_from_regex_fallback(self): + parsed = _make_parsed(sections={}) + text = "Au total : pancréatite aiguë biliaire.\nDevenir : retour." + dossier = DossierMedical() + candidates = build_dp_shortlist(parsed, text, None, dossier) + codes = [c.code for c in candidates] + assert "K85.1" in codes + + def test_from_edsnlp(self): + from src.medical.edsnlp_pipeline import EdsnlpResult, CIM10Entity + + parsed = _make_parsed(sections={}) + edsnlp = EdsnlpResult(cim10_entities=[ + CIM10Entity(texte="douleur abdominale", code="R10.4", negation=False), + ]) + dossier = DossierMedical() + candidates = build_dp_shortlist(parsed, "", edsnlp, dossier) + codes = [c.code for c in candidates] + assert "R10.4" in codes + + def test_edsnlp_negated_excluded(self): + from src.medical.edsnlp_pipeline import EdsnlpResult, CIM10Entity + + parsed = _make_parsed(sections={}) + edsnlp = EdsnlpResult(cim10_entities=[ + CIM10Entity(texte="fièvre", code="R50.9", negation=True), + ]) + dossier = DossierMedical() + candidates = build_dp_shortlist(parsed, "", edsnlp, dossier) + codes = [c.code for c in candidates] + assert "R50.9" not in codes + + def test_dedup_keeps_strongest_section(self): + """Si le même code vient de diag_sortie et conclusion, garder diag_sortie.""" + parsed = _make_parsed(sections={ + "diag_sortie": "Pancréatite K85.1", + "conclusion": "pancréatite K85.1 bonne évolution", + }) + dossier = DossierMedical() + candidates = build_dp_shortlist(parsed, "", None, dossier) + k85_candidates = [c for c in candidates if c.code == "K85.1"] + assert len(k85_candidates) == 1 + assert k85_candidates[0].source_section == "diag_sortie" + + def test_empty_sections_returns_empty(self): + parsed = _make_parsed(sections={}) + dossier = DossierMedical() + candidates = build_dp_shortlist(parsed, "Patient en bon état.", None, dossier) + assert candidates == [] + + +# === Tests score_candidates === + +class TestScoreCandidates: + def test_section_bonus_diag_sortie(self): + c = _make_candidate(source_section="diag_sortie") + scored = score_candidates([c], DossierMedical()) + assert scored[0].score_details.get("section") == DP_SCORING_WEIGHTS["section_diag_sortie"] + + def test_section_bonus_conclusion(self): + c = _make_candidate(source_section="conclusion") + scored = score_candidates([c], DossierMedical()) + assert scored[0].score_details.get("section") == DP_SCORING_WEIGHTS["section_conclusion"] + + def test_section_bonus_edsnlp(self): + c = _make_candidate(source_section="edsnlp") + scored = score_candidates([c], DossierMedical()) + assert scored[0].score_details.get("section") == DP_SCORING_WEIGHTS["section_edsnlp"] + + def test_proof_excerpt_bonus(self): + c = _make_candidate(source_excerpt="Pancréatite aiguë biliaire confirmée au scanner") + scored = score_candidates([c], DossierMedical()) + assert scored[0].score_details.get("proof_excerpt") == DP_SCORING_WEIGHTS["proof_excerpt"] + + def test_no_proof_bonus_without_excerpt(self): + c = _make_candidate(source_excerpt=None) + scored = score_candidates([c], DossierMedical()) + assert "proof_excerpt" not in scored[0].score_details + + def test_negation_penalty(self): + c = _make_candidate(label="Fièvre") + text = "Pas de fièvre constatée." + scored = score_candidates([c], DossierMedical(), full_text=text) + assert scored[0].is_negated is True + assert scored[0].score_details.get("negation") == DP_SCORING_WEIGHTS["negation"] + + def test_conditional_penalty(self): + c = _make_candidate(label="Embolie pulmonaire", code="I26.9") + text = "Embolie pulmonaire suspectée, à confirmer par angioscanner." + scored = score_candidates([c], DossierMedical(), full_text=text) + assert scored[0].is_conditional is True + assert scored[0].score_details.get("conditional") == DP_SCORING_WEIGHTS["conditional"] + + def test_z_code_penalty(self): + c = _make_candidate(code="Z76.0", label="Bilan de santé", source_section="conclusion") + scored = score_candidates([c], DossierMedical()) + assert scored[0].score_details.get("z_code_dp") == DP_SCORING_WEIGHTS["z_code_dp"] + + def test_z_code_whitelist_no_penalty(self): + c = _make_candidate(code="Z51.1", label="Chimiothérapie", source_section="conclusion") + scored = score_candidates([c], DossierMedical()) + assert "z_code_dp" not in scored[0].score_details + + def test_r_code_penalty(self): + c = _make_candidate(code="R10.4", label="Douleur abdominale", source_section="edsnlp") + scored = score_candidates([c], DossierMedical()) + assert scored[0].score_details.get("r_code_dp") == DP_SCORING_WEIGHTS["r_code_dp"] + + def test_sort_by_score_descending(self): + c1 = _make_candidate(code="K85.1", source_section="diag_sortie") + c2 = _make_candidate(code="R10.4", label="Douleur", source_section="edsnlp") + scored = score_candidates([c2, c1], DossierMedical()) + assert scored[0].code == "K85.1" # diag_sortie score > edsnlp + + def test_combined_scoring(self): + """Score = section bonus + proof - negation penalties.""" + c = _make_candidate( + code="K85.1", + source_section="diag_sortie", + source_excerpt="Pancréatite aiguë", + ) + scored = score_candidates([c], DossierMedical()) + expected = DP_SCORING_WEIGHTS["section_diag_sortie"] + DP_SCORING_WEIGHTS["proof_excerpt"] + assert scored[0].score == expected + + +# === Tests select_dp === + +class TestSelectDP: + def test_no_candidates_returns_review(self): + sel = select_dp([], DossierMedical()) + assert sel.verdict == "review" + + def test_single_candidate_confirmed(self): + c = _make_candidate() + c.score = 6 + sel = select_dp([c], DossierMedical()) + assert sel.verdict == "confirmed" + assert sel.winner_reason == "candidat unique" + + def test_clear_winner_confirmed(self): + c1 = _make_candidate(code="K85.1") + c1.score = 6 + c2 = _make_candidate(code="R10.4", label="Douleur", source_section="edsnlp") + c2.score = 1 + sel = select_dp([c1, c2], DossierMedical()) + assert sel.verdict == "confirmed" + assert "delta" in sel.winner_reason + + def test_close_scores_returns_review(self): + c1 = _make_candidate(code="K85.1") + c1.score = 3 + c2 = _make_candidate(code="K80.5", label="Lithiase", source_section="conclusion") + c2.score = 2 + sel = select_dp([c1, c2], DossierMedical()) + assert sel.verdict == "review" + + def test_review_returns_top3(self): + candidates = [ + _make_candidate(code=f"K8{i}.{i}", label=f"Diag {i}") + for i in range(5) + ] + for i, c in enumerate(candidates): + c.score = 5 - i + # delta between top1 and top2 = 1, < DP_REVIEW_THRESHOLD + sel = select_dp(candidates, DossierMedical()) + assert sel.verdict == "review" + assert len(sel.candidates) <= 3 + + +# === Tests utilitaires === + +class TestContextWindow: + def test_finds_label_in_text(self): + text = "Patient admis pour pancréatite aiguë biliaire confirmée." + window = _get_context_window(text, "pancréatite aiguë", radius=50) + assert "pancréatite" in window.lower() + + def test_returns_empty_when_not_found(self): + text = "Patient en bon état." + window = _get_context_window(text, "embolie pulmonaire") + assert window == "" + + +class TestZCodeWhitelist: + def test_z51_1_whitelisted(self): + assert _is_z_code_whitelisted("Z51.1") is True + + def test_z45_prefix_whitelisted(self): + assert _is_z_code_whitelisted("Z45.80") is True + + def test_z76_not_whitelisted(self): + assert _is_z_code_whitelisted("Z76.0") is False + + +class TestDedupByCode: + def test_dedup_same_code_keeps_strongest(self): + c1 = _make_candidate(code="K85.1", source_section="conclusion") + c2 = _make_candidate(code="K85.1", source_section="diag_sortie") + priority = ["diag_sortie", "diag_principal", "motif_hospitalisation", "conclusion", "synthese"] + result = _dedup_by_code([c1, c2], priority) + assert len(result) == 1 + assert result[0].source_section == "diag_sortie" + + def test_dedup_different_codes_kept(self): + c1 = _make_candidate(code="K85.1") + c2 = _make_candidate(code="K80.5", label="Lithiase") + priority = ["diag_sortie"] + result = _dedup_by_code([c1, c2], priority) + assert len(result) == 2 + + +# === Tests intégration légère === + +class TestDPScoringIntegration: + def test_crh_with_diag_sortie_section(self): + """Un CRH avec section 'Diagnostic de sortie' produit un dp_selection.""" + from src.medical.cim10_extractor import extract_medical_info + + parsed = { + "type": "crh", + "patient": {"sexe": "M"}, + "sejour": {}, + "diagnostics": [], + "sections": { + "diag_sortie": "Pancréatite aiguë biliaire K85.1", + }, + } + text = "Diagnostic de sortie :\nPancréatite aiguë biliaire K85.1\n\nTraitement de sortie :\nParacétamol" + + dossier = extract_medical_info(parsed, text) + assert dossier.diagnostic_principal is not None + assert dossier.diagnostic_principal.cim10_suggestion == "K85.1" + assert dossier.dp_selection is not None + assert dossier.dp_selection.verdict == "confirmed" + + def test_llm_fallback_confirmed_high_strong_section(self): + """LLM one-shot CONFIRMED : high confidence + section forte.""" + from unittest.mock import patch + from src.medical.cim10_extractor import extract_medical_info + + parsed = { + "type": "crh", + "patient": {"sexe": "M"}, + "sejour": {}, + "diagnostics": [], + "sections": { + "conclusion": "Pancréatite aiguë biliaire avec HTA connue.", + }, + } + text = "Conclusion : Pancréatite aiguë biliaire avec HTA connue." + + mock_result = { + "dp_code": "K85.1", + "dp_label": "Pancréatite aiguë biliaire", + "evidence_section": "conclusion", + "evidence_excerpt": "Pancréatite aiguë biliaire", + "confidence": "high", + } + with patch("src.medical.ollama_client.call_ollama", return_value=mock_result): + dossier = extract_medical_info(parsed, text, use_rag=True) + + assert dossier.dp_selection is not None + assert dossier.dp_selection.verdict == "confirmed" + assert dossier.diagnostic_principal is not None + assert dossier.diagnostic_principal.cim10_suggestion == "K85.1" + + def test_llm_fallback_confirmed_conclusion_section(self): + """LLM one-shot CONFIRMED : conclusion est section forte.""" + from unittest.mock import patch + from src.medical.cim10_extractor import extract_medical_info + + parsed = { + "type": "crh", + "patient": {"sexe": "M"}, + "sejour": {}, + "diagnostics": [], + "sections": {"conclusion": "Pneumopathie avec insuffisance rénale aiguë."}, + } + text = "Conclusion : Pneumopathie avec insuffisance rénale aiguë." + + mock_result = { + "dp_code": "J18.9", + "dp_label": "Pneumopathie, sans précision", + "evidence_section": "conclusion", + "evidence_excerpt": "Pneumopathie avec insuffisance rénale aiguë", + "confidence": "high", + } + with patch("src.medical.ollama_client.call_ollama", return_value=mock_result): + dossier = extract_medical_info(parsed, text, use_rag=True) + + assert dossier.dp_selection is not None + assert dossier.dp_selection.verdict == "confirmed" + assert dossier.diagnostic_principal is not None + + def test_llm_fallback_review_weak_section(self): + """LLM one-shot REVIEW : evidence de histoire_maladie (section faible) → guardrail.""" + from unittest.mock import patch + from src.medical.dp_scoring import llm_dp_fallback + from src.config import DossierMedical, DPCandidate + + parsed = {"type": "crh", "sections": {"histoire_maladie": "Dyspnée aiguë."}} + text = "Histoire de la maladie : Dyspnée aiguë." + dossier = DossierMedical() + dp_candidates = [DPCandidate(code="R06.0", label="Dyspnée", source_section="edsnlp")] + + mock_result = { + "dp_code": "R06.0", + "dp_label": "Dyspnée", + "evidence_section": "histoire_maladie", + "evidence_excerpt": "Dyspnée aiguë", + "confidence": "high", + } + with patch("src.medical.ollama_client.call_ollama", return_value=mock_result): + selection = llm_dp_fallback(parsed, text, dossier, dp_candidates=dp_candidates) + + assert selection.verdict == "review" + assert len(selection.candidates) >= 1 + + def test_llm_fallback_review_low_confidence(self): + """LLM one-shot REVIEW : confidence=medium → guardrail.""" + from unittest.mock import patch + from src.medical.dp_scoring import llm_dp_fallback + from src.config import DossierMedical, DPCandidate + + parsed = {"type": "crh", "sections": {"conclusion": "HTA connue, diabète équilibré."}} + text = "Conclusion : HTA connue, diabète équilibré." + dossier = DossierMedical() + dp_candidates = [DPCandidate(code="I10", label="HTA", source_section="edsnlp")] + + mock_result = { + "dp_code": "I10", + "dp_label": "Hypertension essentielle", + "evidence_section": "conclusion", + "evidence_excerpt": "HTA connue", + "confidence": "medium", + } + with patch("src.medical.ollama_client.call_ollama", return_value=mock_result): + selection = llm_dp_fallback(parsed, text, dossier, dp_candidates=dp_candidates) + + assert selection.verdict == "review" + assert "confidence medium" in selection.winner_reason + + def test_llm_fallback_guardrail_no_evidence(self): + """Garde-fou : LLM renvoie evidence vide → REVIEW.""" + from unittest.mock import patch + from src.medical.dp_scoring import llm_dp_fallback + from src.config import DossierMedical, DPCandidate + + parsed = {"type": "crh", "sections": {"conclusion": "Pancréatite."}} + text = "Conclusion : Pancréatite." + dossier = DossierMedical() + dp_candidates = [DPCandidate(code="K85.9", label="Pancréatite", source_section="edsnlp")] + + mock_result = { + "dp_code": "K85.9", + "dp_label": "Pancréatite aiguë", + "evidence_section": "conclusion", + "evidence_excerpt": "", + "confidence": "high", + } + with patch("src.medical.ollama_client.call_ollama", return_value=mock_result): + selection = llm_dp_fallback(parsed, text, dossier, dp_candidates=dp_candidates) + + assert selection.verdict == "review" + + def test_llm_fallback_guardrail_comorbidity_weak_section(self): + """Garde-fou : HTA en section non-forte → REVIEW.""" + from unittest.mock import patch + from src.medical.dp_scoring import llm_dp_fallback + from src.config import DossierMedical, DPCandidate + + parsed = {"type": "crh", "sections": {"histoire_maladie": "Patient hypertendu."}} + text = "Histoire de la maladie : Patient hypertendu." + dossier = DossierMedical() + dp_candidates = [DPCandidate(code="I10", label="HTA", source_section="edsnlp")] + + mock_result = { + "dp_code": "I10", + "dp_label": "Hypertension essentielle", + "evidence_section": "histoire_maladie", + "evidence_excerpt": "Patient hypertendu", + "confidence": "high", + } + with patch("src.medical.ollama_client.call_ollama", return_value=mock_result): + selection = llm_dp_fallback(parsed, text, dossier, dp_candidates=dp_candidates) + + assert selection.verdict == "review" + + def test_llm_fallback_comorbidity_in_strong_section(self): + """I10 en section forte + high confidence → CONFIRMED (garde-fou GF-2 ne bloque pas).""" + from unittest.mock import patch + from src.medical.dp_scoring import llm_dp_fallback + from src.config import DossierMedical, DPCandidate + + parsed = {"type": "crh", "sections": {"motif_hospitalisation": "HTA maligne."}} + text = "Motif d'hospitalisation : HTA maligne." + dossier = DossierMedical() + dp_candidates = [DPCandidate(code="I10", label="HTA", source_section="edsnlp")] + + mock_result = { + "dp_code": "I10", + "dp_label": "Hypertension essentielle", + "evidence_section": "motif_hospitalisation", + "evidence_excerpt": "HTA maligne", + "confidence": "high", + } + with patch("src.medical.ollama_client.call_ollama", return_value=mock_result): + selection = llm_dp_fallback(parsed, text, dossier, dp_candidates=dp_candidates) + + assert selection.verdict == "confirmed" + assert selection.candidates[0].code == "I10" + + def test_no_llm_fallback_without_use_rag(self): + """Sans use_rag, le fallback LLM ne se déclenche PAS.""" + from src.medical.cim10_extractor import extract_medical_info + + parsed = { + "type": "crh", + "patient": {"sexe": "M"}, + "sejour": {}, + "diagnostics": [], + "sections": {"conclusion": "Bonne évolution."}, + } + text = "Conclusion : Bonne évolution." + + dossier = extract_medical_info(parsed, text, use_rag=False) + # Sans use_rag → pas de fallback LLM → verdict review + assert dossier.dp_selection is not None + assert dossier.dp_selection.verdict == "review" + + def test_trackare_dp_bypasses_scoring(self): + """Un Trackare avec DP codé ne déclenche PAS le scoring.""" + from src.medical.cim10_extractor import extract_medical_info + + parsed = { + "type": "trackare", + "patient": {"sexe": "F"}, + "sejour": {"date_entree": "01/01/2024", "date_sortie": "05/01/2024"}, + "diagnostics": [ + {"type": "Principal", "code_cim10": "K80.5", "libelle": "Calcul des canaux biliaires"}, + ], + } + text = "Calcul des canaux biliaires." + + dossier = extract_medical_info(parsed, text) + assert dossier.diagnostic_principal is not None + assert dossier.diagnostic_principal.cim10_suggestion == "K80.5" + assert dossier.dp_selection is None # Trackare DP, pas de scoring + + +# === Tests comorbidité-banale DP === + +class TestComorbidityGuard: + """Règle comorbidité-banale : I10/E66.x/E78.x/E11.x/D64.9 en DP → REVIEW + sauf preuve explicite de PEC principale.""" + + def test_is_comorbidity_expanded(self): + """La liste élargie couvre I10, E66.*, E78.*, E11.*, D64.9.""" + assert _is_comorbidity_code("I10") is True + assert _is_comorbidity_code("E66.0") is True + assert _is_comorbidity_code("E66.9") is True + assert _is_comorbidity_code("E78.0") is True + assert _is_comorbidity_code("E11.9") is True + assert _is_comorbidity_code("E11.0") is True + assert _is_comorbidity_code("D64.9") is True + # Pas comorbidité + assert _is_comorbidity_code("D64.0") is False + assert _is_comorbidity_code("E10.9") is False + assert _is_comorbidity_code("K85.1") is False + + def test_sole_comorbidity_review(self): + """Candidat unique comorbidité → REVIEW (même section forte).""" + c = _make_candidate(code="E66.0", label="Obésité", source_section="conclusion") + c.score = 4 + c.score_details = {"section": 2, "proof_excerpt": 2, "comorbidity_weak": -3} + sel = select_dp([c], DossierMedical()) + assert sel.verdict == "review" + assert "comorbidité banale" in sel.winner_reason + + def test_comorbidity_top1_multi_review(self): + """Comorbidité top1 parmi plusieurs → REVIEW.""" + c1 = _make_candidate(code="I10", label="Hta", source_section="motif_hospitalisation") + c1.score = 3 + c1.score_details = {"section": 3, "comorbidity_weak": -3} + c2 = _make_candidate(code="K85.1", label="Pancréatite", source_section="edsnlp") + c2.score = 1 + sel = select_dp([c1, c2], DossierMedical()) + assert sel.verdict == "review" + assert "comorbidité banale" in sel.winner_reason + + def test_comorbidity_with_pec_proof_confirmed(self): + """Comorbidité + preuve PEC → CONFIRMED.""" + c = _make_candidate(code="I10", label="Hta", source_section="motif_hospitalisation") + c.score = 3 + c.score_details = {"section": 3, "comorbidity_weak": -3, "comorbidity_pec_proof": 3} + sel = select_dp([c], DossierMedical()) + assert sel.verdict == "confirmed" + assert sel.winner_reason == "candidat unique" + + def test_non_comorbidity_sole_confirmed(self): + """Candidat unique non-comorbidité → CONFIRMED (pas affecté).""" + c = _make_candidate(code="K85.1", label="Pancréatite", source_section="conclusion") + c.score = 4 + sel = select_dp([c], DossierMedical()) + assert sel.verdict == "confirmed" + + def test_score_comorbidity_penalty_strong_section(self): + """Comorbidité pénalisée même en section forte (conclusion).""" + c = _make_candidate(code="E66.0", label="Obésité", source_section="conclusion") + scored = score_candidates([c], DossierMedical()) + assert "comorbidity_weak" in scored[0].score_details + assert scored[0].score_details["comorbidity_weak"] == DP_SCORING_WEIGHTS["comorbidity_weak"] + + def test_score_comorbidity_penalty_motif(self): + """Comorbidité pénalisée en motif_hospitalisation.""" + c = _make_candidate(code="I10", label="Hta", source_section="motif_hospitalisation") + scored = score_candidates([c], DossierMedical()) + assert "comorbidity_weak" in scored[0].score_details + + def test_pec_proof_detected(self): + """PEC proof détectée dans le texte → bonus dans score_details.""" + c = _make_candidate(code="I10", label="Hta", source_section="motif_hospitalisation") + text = "Patient hospitalisé pour hta maligne résistante au traitement." + scored = score_candidates([c], DossierMedical(), full_text=text) + assert "comorbidity_pec_proof" in scored[0].score_details + assert scored[0].score_details["comorbidity_pec_proof"] > 0 + + def test_pec_proof_not_found(self): + """Pas de PEC proof → pas de bonus.""" + c = _make_candidate(code="E66.0", label="Obésité", source_section="conclusion") + text = "Patient obèse, pneumopathie communautaire." + scored = score_candidates([c], DossierMedical(), full_text=text) + assert "comorbidity_pec_proof" not in scored[0].score_details + + def test_has_explicit_pec_proof_hospitalized(self): + """Détection 'hospitalisé pour' + label.""" + assert _has_explicit_pec_proof("hta", "Patient hospitalisé pour HTA maligne.") is True + + def test_has_explicit_pec_proof_prise_en_charge(self): + """Détection 'prise en charge' + label.""" + assert _has_explicit_pec_proof("obésité", "Prise en charge de l'obésité morbide.") is True + + def test_has_explicit_pec_proof_absent(self): + """Pas de PEC proof pour un label non mentionné.""" + assert _has_explicit_pec_proof("hta", "Patient admis pour douleur thoracique.") is False + + def test_has_explicit_pec_proof_admission(self): + """Détection 'admission pour' + label.""" + assert _has_explicit_pec_proof("diabète", "Admission pour diabète déséquilibré.") is True + + +class TestSectionNormalization: + """Tests pour _normalize_evidence_section — normalisation robuste.""" + + # --- Correspondances exactes existantes --- + + def test_exact_conclusion(self): + assert _normalize_evidence_section("conclusion") == "conclusion" + + def test_exact_synthese(self): + assert _normalize_evidence_section("synthèse") == "synthese" + + def test_exact_motif_hospitalisation(self): + assert _normalize_evidence_section("motif_hospitalisation") == "motif_hospitalisation" + + # --- Nouveaux alias exacts --- + + def test_synthese_du_sejour(self): + assert _normalize_evidence_section("synthèse du séjour") == "synthese" + + def test_synthese_du_sejour_ascii(self): + assert _normalize_evidence_section("synthese du sejour") == "synthese" + + def test_conclusions_pluriel(self): + assert _normalize_evidence_section("conclusions") == "conclusion" + + def test_secretariat_to_autres(self): + assert _normalize_evidence_section("secrétariat") == "autres" + + def test_medecine_interne_to_autres(self): + assert _normalize_evidence_section("médecine interne") == "autres" + + def test_sections_cliniques_to_autres(self): + assert _normalize_evidence_section("sections cliniques") == "autres" + + # --- Nettoyage crochets/guillemets --- + + def test_brackets_conclusion(self): + assert _normalize_evidence_section("[conclusion]") == "conclusion" + + def test_brackets_motif(self): + assert _normalize_evidence_section("[motif_hospitalisation]") == "motif_hospitalisation" + + def test_colon_conclusion(self): + assert _normalize_evidence_section("conclusion:") == "conclusion" + + def test_quotes_synthese(self): + assert _normalize_evidence_section('"synthèse"') == "synthese" + + # --- Fallback par mots-clés --- + + def test_keyword_conclusion_du_sejour(self): + assert _normalize_evidence_section("conclusion du séjour") == "conclusion" + + def test_keyword_synthese_medicale(self): + assert _normalize_evidence_section("synthèse médicale du dossier") == "synthese" + + def test_keyword_diagnostic_de_sortie_variant(self): + assert _normalize_evidence_section("diagnostic(s) de sortie") == "diag_sortie" + + def test_keyword_diagnostic_retenu_variant(self): + assert _normalize_evidence_section("diagnostics retenus à la sortie") == "diagnostics_retenus" + + def test_keyword_motif_admission(self): + assert _normalize_evidence_section("motif d'admission aux urgences") == "motif_hospitalisation" + + # --- Cas limites --- + + def test_empty_string(self): + assert _normalize_evidence_section("") == "" + + def test_none_like_empty(self): + assert _normalize_evidence_section(" ") == "" + + def test_unknown_section_passthrough(self): + """Section inconnue sans mot-clé → passthrough nettoyé.""" + result = _normalize_evidence_section("biologie") + assert result == "biologie" + + def test_sections_fortes_du_dossier(self): + """Alias administratif observé en benchmark.""" + assert _normalize_evidence_section("sections fortes du dossier") == "autres" diff --git a/tests/test_extraction.py b/tests/test_extraction.py index 25c3440..cb45cbe 100644 --- a/tests/test_extraction.py +++ b/tests/test_extraction.py @@ -109,6 +109,139 @@ de masse 34.370""" assert result["signes_vitaux"]["imc"] == 34.370 +class TestCRHParserDiagSections: + """Tests pour les nouvelles sections à fort signal DP.""" + + def test_parse_diag_sortie(self): + text = """Mon cher confrère, +Votre patient a été hospitalisé du 01/01/2024 au 05/01/2024. + +Diagnostic de sortie : +Pancréatite aiguë biliaire (K85.1) + +Traitement de sortie : +Paracétamol""" + result = parse_crh(text) + assert "diag_sortie" in result["sections"] + assert "K85.1" in result["sections"]["diag_sortie"] + + def test_parse_diagnostics_retenus(self): + text = """Conclusion : +Bonne évolution. + +Diagnostics retenus : +- Cholécystite aiguë lithiasique +- Lithiase vésiculaire + +Traitement de sortie : +Paracétamol""" + result = parse_crh(text) + assert "diag_sortie" in result["sections"] + assert "Cholécystite" in result["sections"]["diag_sortie"] + + def test_parse_diag_principal(self): + text = """Examen clinique : +Abdomen souple. + +Diagnostic principal : +Embolie pulmonaire segmentaire droite + +Diagnostics de sortie : +EP + TVP""" + result = parse_crh(text) + assert "diag_principal" in result["sections"] + assert "Embolie pulmonaire" in result["sections"]["diag_principal"] + + def test_parse_probleme_principal(self): + text = """Examen clinique : +Patient stable. + +Problème principal : +Insuffisance cardiaque décompensée + +Devenir : retour à domicile.""" + result = parse_crh(text) + assert "diag_principal" in result["sections"] + assert "Insuffisance cardiaque" in result["sections"]["diag_principal"] + + def test_parse_synthese(self): + text = """Examen clinique : +RAS. + +Synthèse : +Patient de 75 ans hospitalisé pour AVC ischémique sylvien droit. + +Traitement de sortie : +Aspirine""" + result = parse_crh(text) + assert "synthese" in result["sections"] + assert "AVC" in result["sections"]["synthese"] + + def test_existing_sections_preserved(self): + """Les 7 sections existantes sont toujours capturées.""" + text = """pour le motif suivant: +Pancréatite aiguë + +Antécédents : +HTA, diabète + +Histoire de la maladie +Douleur abdominale brutale + +Examen clinique +Abdomen défense en HCD + +Au total : +Pancréatite aiguë biliaire + +TTT de sortie : +Paracétamol + +Devenir : +Retour à domicile""" + result = parse_crh(text) + assert "motif_hospitalisation" in result["sections"] + assert "antecedents" in result["sections"] + assert "histoire_maladie" in result["sections"] + assert "examen_clinique" in result["sections"] + assert "conclusion" in result["sections"] + assert "traitement_sortie" in result["sections"] + assert "devenir" in result["sections"] + + def test_diag_sortie_multiline(self): + text = """Au total : +Bonne évolution. + +Diagnostic de sortie : +- Pancréatite aiguë biliaire K85.1 +- Lithiase vésiculaire K80.2 +- Obésité E66.0 + +Traitement de sortie : +Paracétamol""" + result = parse_crh(text) + assert "diag_sortie" in result["sections"] + section = result["sections"]["diag_sortie"] + assert "K85.1" in section + assert "K80.2" in section + assert "E66.0" in section + + def test_conclusion_does_not_overflow_into_diag_sortie(self): + text = """Au total : +Pancréatite aiguë biliaire, évolution favorable. + +Diagnostic de sortie : +Pancréatite aiguë biliaire K85.1 + +Traitement de sortie : +Paracétamol""" + result = parse_crh(text) + assert "conclusion" in result["sections"] + assert "diag_sortie" in result["sections"] + # La conclusion ne doit PAS contenir le texte de diag_sortie + assert "K85.1" not in result["sections"]["conclusion"] + + class TestCleanPersonName: def test_clean_simple(self): assert _clean_person_name("Sarah DUTREY") == "Sarah DUTREY" diff --git a/tests/test_medical.py b/tests/test_medical.py index e43d70e..3bca045 100644 --- a/tests/test_medical.py +++ b/tests/test_medical.py @@ -653,6 +653,38 @@ class TestBackwardCompatAntecedent: assert all(isinstance(c, Complication) for c in dossier.complications) +class TestDPSelectionIntegration: + """Tests d'intégration du scoring DP dans le pipeline d'extraction.""" + + def test_crh_dp_selection_populated(self): + """Un CRH sans DP Trackare déclenche le scoring et peuple dp_selection.""" + parsed = { + "type": "crh", + "patient": {"sexe": "M"}, + "sejour": {}, + "diagnostics": [], + } + text = "Pancréatite aiguë biliaire.\nTTT de sortie :\nParacétamol\n\nDevenir : retour." + dossier = extract_medical_info(parsed, text) + assert dossier.diagnostic_principal is not None + assert dossier.diagnostic_principal.cim10_suggestion == "K85.1" + assert dossier.dp_selection is not None + assert len(dossier.dp_selection.candidates) >= 1 + + def test_dp_selection_serialization(self): + """dp_selection est sérialisable en JSON via model_dump().""" + from src.config import DPCandidate, DPSelection + sel = DPSelection( + verdict="confirmed", + candidates=[DPCandidate(code="K85.1", label="Test", source_section="regex")], + winner_reason="candidat unique", + ) + data = sel.model_dump() + assert data["verdict"] == "confirmed" + assert len(data["candidates"]) == 1 + assert data["candidates"][0]["code"] == "K85.1" + + class TestSourceTrackingFields: """Tests que les champs source_page/source_excerpt existent sur les modèles."""