diff --git a/src/config.py b/src/config.py index bfbaa8e..4cca12c 100644 --- a/src/config.py +++ b/src/config.py @@ -74,12 +74,17 @@ class Diagnostic(BaseModel): justification: Optional[str] = None raisonnement: Optional[str] = None sources_rag: list[RAGSource] = Field(default_factory=list) + est_cma: Optional[bool] = None + est_cms: Optional[bool] = None + niveau_severite: Optional[str] = None # "leger" | "modere" | "severe" | "non_evalue" class ActeCCAM(BaseModel): texte: str code_ccam_suggestion: Optional[str] = None date: Optional[str] = None + validite: Optional[str] = None # "valide" | "obsolete" | "non_verifie" + alertes: list[str] = Field(default_factory=list) class Traitement(BaseModel): @@ -112,6 +117,7 @@ class DossierMedical(BaseModel): biologie_cle: list[BiologieCle] = Field(default_factory=list) imagerie: list[Imagerie] = Field(default_factory=list) complications: list[str] = Field(default_factory=list) + alertes_codage: list[str] = Field(default_factory=list) processing_time_s: float | None = None diff --git a/src/medical/cim10_extractor.py b/src/medical/cim10_extractor.py index 6478e82..f41d012 100644 --- a/src/medical/cim10_extractor.py +++ b/src/medical/cim10_extractor.py @@ -113,6 +113,12 @@ def extract_medical_info( if use_rag: _enrich_with_rag(dossier) + # Post-processing : exclusions symptôme vs diagnostic précis + _apply_exclusion_rules(dossier) + + # Post-processing : enrichissement sévérité (CMA/CMS heuristique) + _apply_severity_rules(dossier) + return dossier @@ -641,6 +647,34 @@ def _find_act_date(text: str, act_pattern: str) -> str | None: return None +def _apply_exclusion_rules(dossier: DossierMedical) -> None: + """Applique les règles d'exclusion symptôme vs diagnostic précis.""" + try: + from .exclusion_rules import check_exclusions + result = check_exclusions(dossier.diagnostic_principal, dossier.diagnostics_associes) + dossier.diagnostics_associes = result.cleaned_das + dossier.alertes_codage.extend(result.warnings) + if result.excluded: + logger.info( + " Exclusions : %d DAS symptomatiques exclus", + len(result.excluded), + ) + except Exception: + logger.warning("Erreur lors de l'application des règles d'exclusion", exc_info=True) + + +def _apply_severity_rules(dossier: DossierMedical) -> None: + """Enrichit les diagnostics avec les informations de sévérité heuristique.""" + try: + from .severity import enrich_dossier_severity + alertes = enrich_dossier_severity( + dossier.diagnostic_principal, dossier.diagnostics_associes, + ) + dossier.alertes_codage.extend(alertes) + except Exception: + logger.warning("Erreur lors de l'évaluation de sévérité", exc_info=True) + + def _lookup_cim10(text: str) -> str | None: """Cherche un code CIM-10 pour un texte donné. diff --git a/src/medical/exclusion_rules.py b/src/medical/exclusion_rules.py new file mode 100644 index 0000000..9b62901 --- /dev/null +++ b/src/medical/exclusion_rules.py @@ -0,0 +1,169 @@ +"""Règles d'exclusion diagnostique : symptôme (Chapitre XVIII) vs diagnostic précis. + +Lorsqu'un symptôme (R00-R99) et un diagnostic précis (Chapitres I-XIV, A00-N99) +coexistent et que le symptôme est expliqué par le diagnostic précis, le symptôme +ne doit PAS être codé comme DAS (règle ATIH de non-redondance). +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field + + +def is_symptom_code(code: str) -> bool: + """Vérifie si un code CIM-10 appartient au Chapitre XVIII (R00-R99 = Symptômes).""" + if not code: + return False + return bool(re.match(r"^R\d{2}", code, re.IGNORECASE)) + + +def is_precise_diagnosis(code: str) -> bool: + """Vérifie si un code CIM-10 appartient aux Chapitres I-XIV (A00-N99).""" + if not code: + return False + return bool(re.match(r"^[A-N]\d{2}", code, re.IGNORECASE)) + + +# Mapping R-code → set de codes précis qui excluent le symptôme. +# Chaque R-code est exclu si l'un des codes précis (ou un code commençant par +# l'une de ces racines) est présent parmi les diagnostics du séjour. +EXCLUSION_MAP: dict[str, set[str]] = { + # R10 — Douleur abdominale → exclu par pathologies digestives précises + "R10": {"K35", "K80", "K81", "K83", "K85", "K86", "K56", "K57", "K25", "K26", "K29"}, + "R10.1": {"K80", "K81", "K83"}, # Douleur hypochondre droit + "R10.3": {"K35", "K36", "K37"}, # Douleur hypogastre + "R10.4": {"K35", "K80", "K85", "K56", "K57"}, # Douleur abdominale autre/non précisée + + # R11 — Nausées et vomissements + "R11": {"K29", "K80", "K81", "K85", "K56", "K91"}, + + # R17 — Ictère → exclu par pathologies hépatobiliaires + "R17": {"K80", "K83", "K70", "K71", "K72", "K73", "K74", "B15", "B16", "B17", "B18", "B19", "C22"}, + + # R50 — Fièvre → exclu par infections précises + "R50": {"A41", "J18", "J15", "J13", "J14", "J06", "N10", "N39", "K81", "K83", + "L03", "T81", "A09", "A04"}, + "R50.9": {"A41", "J18", "J15", "J13", "J14", "N10", "N39", "K81"}, + + # R07 — Douleur thoracique → exclu par pathologies cardiaques/pulmonaires + "R07": {"I20", "I21", "I22", "I23", "I24", "I25", "I26", "J18", "J93"}, + "R07.4": {"I20", "I21", "I24", "I25"}, + + # R06 — Dyspnée → exclu par pathologies respiratoires/cardiaques + "R06": {"J18", "J44", "J45", "J96", "I50", "I26"}, + "R06.0": {"J18", "J44", "J45", "J96", "I50", "I26"}, + + # R31 — Hématurie → exclu par pathologies urologiques/rénales + "R31": {"N20", "N13", "C64", "C67", "N02", "N00", "N01"}, + + # R04 — Hémoptysie → exclu par pathologies pulmonaires + "R04": {"J18", "C34", "I26", "A16"}, + + # R63.4 — Perte de poids → exclu par tumeurs, infections chroniques + "R63.4": {"C15", "C16", "C18", "C19", "C20", "C22", "C25", "C34", "C50", + "A15", "A16", "B20", "B21", "B22", "B23", "B24", "E46"}, + + # R00 — Anomalies du rythme cardiaque → exclu par troubles du rythme précis + "R00": {"I47", "I48", "I49"}, + "R00.0": {"I47", "I48"}, # Tachycardie + "R00.1": {"I49.5", "I49.8"}, # Bradycardie +} + + +def _code_matches(code: str, roots: set[str]) -> bool: + """Vérifie si un code CIM-10 commence par l'une des racines données.""" + if not code: + return False + code_upper = code.upper() + for root in roots: + if code_upper.startswith(root.upper()): + return True + return False + + +@dataclass +class ExclusionResult: + """Résultat de l'application des règles d'exclusion.""" + cleaned_das: list # Diagnostics DAS conservés + excluded: list # Diagnostics DAS exclus + warnings: list[str] = field(default_factory=list) + + +def check_exclusions(dp, das_list: list) -> ExclusionResult: + """Applique les règles d'exclusion symptôme vs diagnostic précis. + + Args: + dp: Diagnostic principal (objet avec attribut cim10_suggestion). + das_list: Liste des diagnostics associés (même type). + + Returns: + ExclusionResult avec les DAS nettoyés, exclus, et les warnings. + """ + # Collecter tous les codes du séjour (DP + DAS) + all_codes: list[str] = [] + if dp and dp.cim10_suggestion: + all_codes.append(dp.cim10_suggestion) + for das in das_list: + if das.cim10_suggestion: + all_codes.append(das.cim10_suggestion) + + # Identifier les codes précis présents (Chapitres I-XIV) + precise_codes = [c for c in all_codes if is_precise_diagnosis(c)] + + cleaned = [] + excluded = [] + warnings = [] + + for das in das_list: + code = das.cim10_suggestion + if not code or not is_symptom_code(code): + # Non-symptôme : toujours conservé + cleaned.append(das) + continue + + # Vérifier si ce symptôme est exclu par un diagnostic précis + should_exclude = False + excluding_code = None + + # Chercher dans EXCLUSION_MAP : d'abord le code exact, puis la racine (3 chars) + exclusion_roots = EXCLUSION_MAP.get(code.upper()) + if exclusion_roots is None: + # Essayer la racine 3 caractères (ex: R10.4 → R10) + root3 = code.upper()[:3] + exclusion_roots = EXCLUSION_MAP.get(root3) + + if exclusion_roots: + for precise in precise_codes: + if _code_matches(precise, exclusion_roots): + should_exclude = True + excluding_code = precise + break + + if should_exclude: + excluded.append(das) + warnings.append( + f"DAS '{das.texte}' ({code}) exclu : symptôme redondant avec " + f"le diagnostic précis {excluding_code}" + ) + else: + cleaned.append(das) + + # Vérifier aussi si le DP est un symptôme avec un diagnostic précis en DAS + if dp and dp.cim10_suggestion and is_symptom_code(dp.cim10_suggestion): + dp_code = dp.cim10_suggestion + exclusion_roots = EXCLUSION_MAP.get(dp_code.upper()) + if exclusion_roots is None: + exclusion_roots = EXCLUSION_MAP.get(dp_code.upper()[:3]) + + if exclusion_roots: + for precise in precise_codes: + if _code_matches(precise, exclusion_roots): + warnings.append( + f"ALERTE DP : le DP '{dp.texte}' ({dp_code}) est un symptôme " + f"alors qu'un diagnostic précis {precise} est présent — " + f"vérifier si le DP devrait être changé" + ) + break + + return ExclusionResult(cleaned_das=cleaned, excluded=excluded, warnings=warnings) diff --git a/src/medical/rag_search.py b/src/medical/rag_search.py index 963f12c..b028fba 100644 --- a/src/medical/rag_search.py +++ b/src/medical/rag_search.py @@ -4,6 +4,7 @@ from __future__ import annotations import json import logging +import re from typing import Optional import requests @@ -15,6 +16,12 @@ logger = logging.getLogger(__name__) # Singleton pour le modèle d'embedding (chargé une seule fois) _embed_model = None +# Score minimum de similarité FAISS pour retenir un résultat +_MIN_SCORE = 0.3 + +# Marqueur de fin de raisonnement dans la réponse Ollama +_RESULT_MARKER = "###RESULT###" + def _get_embed_model(): """Charge le modèle d'embedding (singleton).""" @@ -27,7 +34,7 @@ def _get_embed_model(): return _embed_model -def search_similar(query: str, top_k: int = 5) -> list[dict]: +def search_similar(query: str, top_k: int = 10) -> list[dict]: """Recherche les passages les plus similaires dans l'index FAISS. Args: @@ -35,7 +42,8 @@ def search_similar(query: str, top_k: int = 5) -> list[dict]: top_k: Nombre de résultats à retourner. Returns: - Liste de dicts avec les métadonnées + score de similarité. + Liste de dicts avec les métadonnées + score de similarité, + filtrés par score minimum et priorisant les sources CIM-10. """ from .rag_index import get_index import numpy as np @@ -51,21 +59,93 @@ def search_similar(query: str, top_k: int = 5) -> list[dict]: query_vec = model.encode([query], normalize_embeddings=True) query_vec = np.array(query_vec, dtype=np.float32) - scores, indices = faiss_index.search(query_vec, min(top_k, faiss_index.ntotal)) + # Chercher plus de résultats que top_k pour pouvoir filtrer ensuite + fetch_k = min(top_k * 2, faiss_index.ntotal) + scores, indices = faiss_index.search(query_vec, fetch_k) - results = [] + raw_results = [] for score, idx in zip(scores[0], indices[0]): if idx < 0: continue + if float(score) < _MIN_SCORE: + continue meta = metadata[idx].copy() meta["score"] = float(score) - results.append(meta) + raw_results.append(meta) - return results + # Prioriser les sources CIM-10 (au moins 6 sur top_k) + cim10_results = [r for r in raw_results if r["document"] == "cim10"] + other_results = [r for r in raw_results if r["document"] != "cim10"] + + min_cim10 = min(6, len(cim10_results)) + final = cim10_results[:min_cim10] + remaining_slots = top_k - len(final) + # Remplir le reste avec les meilleurs résultats (CIM-10 restants + autres) + remaining = cim10_results[min_cim10:] + other_results + remaining.sort(key=lambda r: r["score"], reverse=True) + final.extend(remaining[:remaining_slots]) + + return final -def _build_prompt(texte: str, sources: list[dict], contexte: dict) -> str: - """Construit le prompt pour Ollama.""" +def _format_contexte(contexte: dict) -> str: + """Formate le contexte patient de manière structurée pour le prompt.""" + lines = [] + + sexe = contexte.get("sexe") + age = contexte.get("age") + imc = contexte.get("imc") + patient_parts = [] + if sexe: + patient_parts.append(sexe) + if age: + patient_parts.append(f"{age} ans") + if imc: + patient_parts.append(f"IMC {imc}") + if patient_parts: + lines.append(f"- Patient : {', '.join(patient_parts)}") + + duree = contexte.get("duree_sejour") + if duree: + lines.append(f"- Durée séjour : {duree} jours") + + antecedents = contexte.get("antecedents") + if antecedents: + lines.append(f"- Antécédents : {', '.join(antecedents[:5])}") + + biologie = contexte.get("biologie_cle") + if biologie: + bio_parts = [] + for b in biologie: + test, valeur, anomalie = b if isinstance(b, (list, tuple)) else (b.get("test"), b.get("valeur"), b.get("anomalie")) + marker = " (\u2191)" if anomalie else "" + bio_parts.append(f"{test} {valeur}{marker}") + lines.append(f"- Biologie : {', '.join(bio_parts)}") + + imagerie = contexte.get("imagerie") + if imagerie: + for img in imagerie: + img_type, conclusion = img if isinstance(img, (list, tuple)) else (img.get("type"), img.get("conclusion")) + if conclusion: + lines.append(f"- Imagerie : {img_type} — {conclusion[:200]}") + + complications = contexte.get("complications") + if complications: + lines.append(f"- Complications : {', '.join(complications)}") + + dp_texte = contexte.get("dp_texte") + if dp_texte: + lines.append(f"- DP du séjour : {dp_texte}") + + das_codes = contexte.get("das_codes_existants") + if das_codes: + lines.append(f"- DAS déjà codés : {', '.join(das_codes)}") + + return "\n".join(lines) if lines else "Non précisé" + + +def _build_prompt(texte: str, sources: list[dict], contexte: dict, est_dp: bool = True) -> str: + """Construit le prompt expert DIM avec raisonnement structuré.""" sources_text = "" for i, src in enumerate(sources, 1): doc_name = { @@ -80,24 +160,94 @@ def _build_prompt(texte: str, sources: list[dict], contexte: dict) -> str: sources_text += f"--- Source {i}: {doc_name}{code_info}{page_info} ---\n" sources_text += (src.get("extrait", "")[:800]) + "\n\n" - ctx_parts = [] - if contexte.get("sexe"): - ctx_parts.append(f"sexe: {contexte['sexe']}") - if contexte.get("age"): - ctx_parts.append(f"âge: {contexte['age']} ans") - ctx_str = ", ".join(ctx_parts) if ctx_parts else "non précisé" + type_diag = "DP (diagnostic principal)" if est_dp else "DAS (diagnostic associé significatif)" + ctx_str = _format_contexte(contexte) - return f"""Tu es un expert en codage CIM-10 pour le PMSI en France. Suggère le code CIM-10 le plus précis pour le diagnostic suivant, en te basant UNIQUEMENT sur les sources officielles fournies. + return f"""Tu es un médecin DIM (Département d'Information Médicale) expert en codage PMSI. +Tu dois coder le diagnostic suivant en respectant STRICTEMENT les règles de l'ATIH. -Diagnostic à coder : "{texte}" -Contexte patient : {ctx_str} +RÈGLES IMPÉRATIVES : +- Le code doit provenir UNIQUEMENT des sources CIM-10 fournies +- Distingue la DESCRIPTION CLINIQUE (ce que le médecin écrit) de la LOGIQUE DE CODAGE (ce que l'ATIH impose) +- Privilégie le code le plus SPÉCIFIQUE disponible (4e ou 5e caractère) +- Vérifie les notes d'inclusion/exclusion de chaque code candidat +- Si le diagnostic est un DP, il doit refléter le motif principal de prise en charge du séjour +- Si c'est un DAS, il doit avoir mobilisé des ressources supplémentaires pendant le séjour +- EXCLUSION SYMPTÔME : Si le diagnostic est un symptôme (R00-R99) et qu'un diagnostic précis (Chapitres I-XIV, A00-N99) expliquant ce symptôme est présent, le symptôme ne doit PAS être codé comme DAS -Sources de référence : +DIAGNOSTIC À CODER : "{texte}" +TYPE : {type_diag} + +CONTEXTE CLINIQUE : +{ctx_str} + +SOURCES CIM-10 : {sources_text} -Réponds UNIQUEMENT au format JSON suivant, sans texte avant ou après : +RAISONNE ÉTAPE PAR ÉTAPE : +1. ANALYSE CLINIQUE : Que signifie ce diagnostic sur le plan médical ? +2. CODES CANDIDATS : Quels codes des sources fournies sont compatibles ? +3. DISCRIMINATION : Pourquoi choisir un code plutôt qu'un autre ? (inclusions/exclusions, spécificité) +4. RÈGLE PMSI : Ce code est-il conforme pour un {type_diag} ? (guide méthodologique) + +Après ton raisonnement, conclus OBLIGATOIREMENT par le JSON suivant sur une ligne séparée : +{_RESULT_MARKER} {{"code": "X99.9", "confidence": "high|medium|low", "justification": "explication courte en français"}}""" +def _parse_ollama_response(raw: str) -> dict | None: + """Parse la réponse Ollama en extrayant le JSON après le marqueur ###RESULT###. + + Fallback sur la recherche d'accolades si le marqueur est absent. + Retourne un dict avec les clés code/confidence/justification + raisonnement. + """ + raisonnement = None + json_str = None + + # Stratégie 1 : chercher le marqueur ###RESULT### + marker_pos = raw.find(_RESULT_MARKER) + if marker_pos != -1: + raisonnement = raw[:marker_pos].strip() + after_marker = raw[marker_pos + len(_RESULT_MARKER):] + brace_start = after_marker.find("{") + brace_end = after_marker.rfind("}") + if brace_start != -1 and brace_end != -1: + json_str = after_marker[brace_start:brace_end + 1] + else: + # Fallback : chercher le dernier bloc JSON dans la réponse + # (le raisonnement peut contenir des accolades intermédiaires) + last_brace = raw.rfind("}") + if last_brace != -1: + # Chercher l'accolade ouvrante correspondante en remontant + depth = 0 + start = -1 + for i in range(last_brace, -1, -1): + if raw[i] == "}": + depth += 1 + elif raw[i] == "{": + depth -= 1 + if depth == 0: + start = i + break + if start != -1: + json_str = raw[start:last_brace + 1] + raisonnement = raw[:start].strip() + + if not json_str: + logger.warning("Ollama : réponse sans JSON valide : %s", raw[:200]) + return None + + try: + parsed = json.loads(json_str) + except json.JSONDecodeError: + logger.warning("Ollama : JSON invalide : %s", json_str[:200]) + return None + + if raisonnement: + parsed["raisonnement"] = raisonnement + + return parsed + + def _call_ollama(prompt: str) -> dict | None: """Appelle Ollama et parse la réponse JSON.""" try: @@ -109,27 +259,14 @@ def _call_ollama(prompt: str) -> dict | None: "stream": False, "options": { "temperature": 0.1, - "num_predict": 300, + "num_predict": 1200, }, }, timeout=OLLAMA_TIMEOUT, ) response.raise_for_status() raw = response.json().get("response", "") - - # Extraire le JSON de la réponse (peut contenir du texte autour) - json_match = None - # Chercher un bloc JSON entre accolades - brace_start = raw.find("{") - brace_end = raw.rfind("}") - if brace_start != -1 and brace_end != -1: - json_match = raw[brace_start:brace_end + 1] - - if json_match: - return json.loads(json_match) - else: - logger.warning("Ollama : réponse sans JSON valide : %s", raw[:200]) - return None + return _parse_ollama_response(raw) except requests.ConnectionError: logger.warning("Ollama non disponible (connexion refusée)") @@ -145,13 +282,14 @@ def _call_ollama(prompt: str) -> dict | None: def enrich_diagnostic( diagnostic: Diagnostic, contexte: dict, + est_dp: bool = True, ) -> None: """Enrichit un Diagnostic avec le RAG (FAISS + Ollama). Modifie le diagnostic en place. Fallback gracieux si FAISS ou Ollama échouent. """ # 1. Recherche FAISS - sources = search_similar(diagnostic.texte, top_k=5) + sources = search_similar(diagnostic.texte, top_k=10) if not sources: logger.debug("Aucune source RAG trouvée pour : %s", diagnostic.texte) @@ -168,14 +306,15 @@ def enrich_diagnostic( for s in sources ] - # 3. Appel Ollama pour justification - prompt = _build_prompt(diagnostic.texte, sources, contexte) + # 3. Appel Ollama pour justification avec raisonnement structuré + prompt = _build_prompt(diagnostic.texte, sources, contexte, est_dp=est_dp) llm_result = _call_ollama(prompt) if llm_result: code = llm_result.get("code") confidence = llm_result.get("confidence") justification = llm_result.get("justification") + raisonnement = llm_result.get("raisonnement") if code: diagnostic.cim10_suggestion = code @@ -183,6 +322,8 @@ def enrich_diagnostic( diagnostic.cim10_confidence = confidence if justification: diagnostic.justification = justification + if raisonnement: + diagnostic.raisonnement = raisonnement else: logger.info("Ollama non disponible — sources FAISS conservées sans justification LLM") @@ -192,12 +333,27 @@ def enrich_dossier(dossier: DossierMedical) -> None: contexte = { "sexe": dossier.sejour.sexe, "age": dossier.sejour.age, + "duree_sejour": dossier.sejour.duree_sejour, + "imc": dossier.sejour.imc, + "antecedents": dossier.antecedents[:5], + "biologie_cle": [(b.test, b.valeur, b.anomalie) for b in dossier.biologie_cle], + "imagerie": [(i.type, (i.conclusion or "")[:200]) for i in dossier.imagerie], + "complications": dossier.complications, } if dossier.diagnostic_principal: logger.info("RAG enrichissement DP : %s", dossier.diagnostic_principal.texte) - enrich_diagnostic(dossier.diagnostic_principal, contexte) + enrich_diagnostic(dossier.diagnostic_principal, contexte, est_dp=True) + + # Pour les DAS, ajouter le DP et les DAS existants au contexte pour cohérence + if dossier.diagnostic_principal: + contexte["dp_texte"] = dossier.diagnostic_principal.texte + contexte["das_codes_existants"] = [ + f"{d.cim10_suggestion} ({d.texte})" + for d in dossier.diagnostics_associes + if d.cim10_suggestion + ] for das in dossier.diagnostics_associes: logger.info("RAG enrichissement DAS : %s", das.texte) - enrich_diagnostic(das, contexte) + enrich_diagnostic(das, contexte, est_dp=False) diff --git a/src/medical/severity.py b/src/medical/severity.py new file mode 100644 index 0000000..7f97909 --- /dev/null +++ b/src/medical/severity.py @@ -0,0 +1,201 @@ +"""Détection heuristique de sévérité et CMA/CMS pour le codage GHM. + +Phase 1 : heuristique basée sur des marqueurs textuels et des racines CIM-10. +Phase 2 (future) : tables CMA/CMS officielles ATIH. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from typing import Optional + +from .cim10_dict import load_dict, normalize_text + + +# --- Marqueurs de sévérité dans le texte --- + +_SEVERE_MARKERS = { + "aigu", "aigue", "severe", "grave", "maligne", "malin", + "foudroyant", "foudroyante", "necrosant", "necrosante", + "septique", "decompense", "decompensee", "choc", + "defaillance", "hemorragique", + "fulminant", "fulminante", "massif", "massive", "critique", +} + +_MODERATE_MARKERS = { + "modere", "moderee", "moderes", "moderees", + "subaigu", "subaigue", "subaiguë", + "persistant", "persistante", "recidivant", "recidivante", +} + +_MILD_MARKERS = { + "chronique", "leger", "legere", + "benin", "benigne", "mineur", "mineure", + "superficiel", "superficielle", "stable", +} + + +# --- Racines CIM-10 fréquemment CMA (heuristique Phase 1) --- +# Ces racines sont connues pour être souvent classées CMA dans les tables ATIH. + +_HEURISTIC_CMA_ROOTS: set[str] = { + # Infectieux + "A41", # Sepsis + "A40", # Septicémie streptococcique + # Hématologie / nutrition + "D64", # Anémie + "D65", # CIVD + "E46", # Dénutrition + "E87", # Troubles hydro-électrolytiques + "E86", # Déshydratation + # Métabolique + "E11", # Diabète type 2 (avec complications) + "E10", # Diabète type 1 (avec complications) + # Cardiovasculaire + "I48", # Fibrillation auriculaire + "I50", # Insuffisance cardiaque + "I26", # Embolie pulmonaire + "I80", # Thrombose veineuse + # Respiratoire + "J18", # Pneumopathie + "J96", # Insuffisance respiratoire + "J69", # Pneumopathie d'inhalation + # Rénal + "N17", # Insuffisance rénale aiguë + "N18", # Insuffisance rénale chronique + "N39", # Infection urinaire + # Hépatique + "K72", # Insuffisance hépatique + # Infectieux nosocomial + "T81", # Complications d'actes (infection post-op) + "T80", # Complications post-perfusion +} + + +@dataclass +class SeverityInfo: + """Résultat de l'évaluation de sévérité d'un diagnostic.""" + est_cma_probable: bool = False + niveau_severite: str = "non_evalue" # "leger" | "modere" | "severe" | "non_evalue" + marqueurs_trouves: list[str] = field(default_factory=list) + + +def _detect_severity_markers(text: str) -> tuple[str, list[str]]: + """Détecte les marqueurs de sévérité dans un texte normalisé. + + Returns: + (niveau, marqueurs_trouves) où niveau est "severe", "modere", "leger" ou "non_evalue". + """ + text_norm = normalize_text(text) + words = set(text_norm.split()) + + found_severe = words & _SEVERE_MARKERS + found_moderate = words & _MODERATE_MARKERS + found_mild = words & _MILD_MARKERS + + all_found = list(found_severe | found_moderate | found_mild) + + if found_severe: + return "severe", all_found + if found_moderate: + return "modere", all_found + if found_mild: + return "leger", all_found + return "non_evalue", [] + + +def _is_heuristic_cma(code: str) -> bool: + """Vérifie si un code CIM-10 est probablement CMA selon les racines heuristiques.""" + if not code: + return False + code_upper = code.upper() + for root in _HEURISTIC_CMA_ROOTS: + if code_upper.startswith(root): + return True + return False + + +def evaluate_severity(diagnostic) -> SeverityInfo: + """Évalue la sévérité d'un diagnostic (texte + code CIM-10). + + Args: + diagnostic: Objet avec attributs texte, cim10_suggestion. + + Returns: + SeverityInfo avec est_cma_probable, niveau_severite, marqueurs_trouves. + """ + info = SeverityInfo() + + # 1. Marqueurs textuels depuis le texte du diagnostic + texte = diagnostic.texte or "" + niveau, marqueurs = _detect_severity_markers(texte) + + # 2. Chercher aussi dans le label du dictionnaire CIM-10 + code = diagnostic.cim10_suggestion + if code: + cim10_dict = load_dict() + label = cim10_dict.get(code, "") + if label: + niveau_label, marqueurs_label = _detect_severity_markers(label) + # Prendre le niveau le plus sévère + severity_order = {"severe": 3, "modere": 2, "leger": 1, "non_evalue": 0} + if severity_order.get(niveau_label, 0) > severity_order.get(niveau, 0): + niveau = niveau_label + marqueurs = list(set(marqueurs + marqueurs_label)) + + info.niveau_severite = niveau + info.marqueurs_trouves = marqueurs + + # 3. Heuristique CMA basée sur la racine CIM-10 + if code and _is_heuristic_cma(code): + info.est_cma_probable = True + + # Un diagnostic sévère avec un code CMA-probable = forte indication + if niveau == "severe" and info.est_cma_probable: + info.est_cma_probable = True + + return info + + +def enrich_dossier_severity(dp, das_list: list) -> list[str]: + """Enrichit les diagnostics d'un dossier avec les informations de sévérité. + + Modifie les diagnostics en place (attributs est_cma, est_cms, niveau_severite). + + Args: + dp: Diagnostic principal. + das_list: Liste des diagnostics associés. + + Returns: + Liste d'alertes de sévérité générées. + """ + alertes = [] + + # Évaluer le DP + if dp and dp.cim10_suggestion: + info = evaluate_severity(dp) + dp.niveau_severite = info.niveau_severite + if info.est_cma_probable: + dp.est_cma = True + + # Évaluer chaque DAS + cma_count = 0 + for das in das_list: + if not das.cim10_suggestion: + continue + info = evaluate_severity(das) + das.niveau_severite = info.niveau_severite + if info.est_cma_probable: + das.est_cma = True + cma_count += 1 + alertes.append( + f"CMA probable : '{das.texte}' ({das.cim10_suggestion}) — " + f"sévérité {info.niveau_severite}" + + (f", marqueurs : {', '.join(info.marqueurs_trouves)}" if info.marqueurs_trouves else "") + ) + + if cma_count >= 2: + alertes.insert(0, f"{cma_count} CMA probables détectées — impact potentiel sur le niveau de sévérité GHM") + + return alertes diff --git a/src/viewer/templates/detail.html b/src/viewer/templates/detail.html index 6ffc2a0..08bae8e 100644 --- a/src/viewer/templates/detail.html +++ b/src/viewer/templates/detail.html @@ -4,6 +4,9 @@ {% block sidebar %}
| Texte | CIM-10 | Confiance | Justification | ||
|---|---|---|---|---|---|
| Texte | CIM-10 | Confiance | Sévérité | Justification | |
| {{ das.texte }} | ++ {{ das.texte }} + {% if das.est_cma %}CMA{% endif %} + | {% if das.cim10_suggestion %}{{ das.cim10_suggestion }}{% endif %} | {{ das.cim10_confidence | confidence_badge }} | ++ {% if das.niveau_severite == 'severe' %}Sévère + {% elif das.niveau_severite == 'modere' %}Modéré + {% elif das.niveau_severite == 'leger' %}Léger + {% else %}—{% endif %} + | {{ das.justification or '' }} |
| Texte | Code CCAM | Date | |
|---|---|---|---|
| Texte | Code CCAM | Date | Validité |
| {{ a.texte }} | {% if a.code_ccam_suggestion %}{{ a.code_ccam_suggestion }}{% endif %} | {{ a.date or '' }} | +
+ {% if a.validite == 'valide' %}Valide
+ {% elif a.validite == 'obsolete' %}Obsolète
+ {% else %}—{% endif %}
+ {% for alerte in a.alertes %}
+ {{ alerte }}
+ {% endfor %}
+ |