diff --git a/src/medical/bio_extraction.py b/src/medical/bio_extraction.py new file mode 100644 index 0000000..163d470 --- /dev/null +++ b/src/medical/bio_extraction.py @@ -0,0 +1,184 @@ +"""Extraction des résultats biologiques depuis le texte médical.""" + +from __future__ import annotations + +import re +import unicodedata + +from ..config import BiologieCle, DossierMedical, load_lab_value_sanity +from .bio_normals import BIO_NORMALS, _is_abnormal + + +def _norm_key(s: str) -> str: + """Normalise une clé (minuscules, sans accents) pour index YAML.""" + s = (s or "").strip().lower() + s = unicodedata.normalize("NFKD", s) + s = "".join(ch for ch in s if not unicodedata.combining(ch)) + return re.sub(r"\s+", " ", s) + + +def _parse_float_and_token(raw: str) -> tuple[float | None, str | None]: + """Parse un float et renvoie aussi le token numérique normalisé (avec '.').""" + if raw is None: + return None, None + s = str(raw).strip() + m = re.search(r"(-?\d+(?:[\.,]\d+)?)", s) + if not m: + return None, None + token = m.group(1).replace(",", ".") + try: + return float(token), token + except ValueError: + return None, None + + +def _sanitize_bio_value(test_name: str, raw_value: str, sanity_cfg: dict) -> tuple[str, float, str, str | None] | None: + """Applique des garde-fous anti-artefacts (OCR/PDF). + + Retour: + (token, value_float, quality, reason) ou None si non parsable. + quality: ok | suspect | discarded + """ + val, token = _parse_float_and_token(raw_value) + if val is None or token is None: + return None + + key = _norm_key(test_name) + tests_cfg = (sanity_cfg or {}).get("tests") or {} + cfg = tests_cfg.get(key) or {} + hard_min = cfg.get("hard_min") + hard_max = cfg.get("hard_max") + + if hard_min is not None and val < float(hard_min): + return token, val, "discarded", f"Valeur hors bornes plausibles (<{hard_min})" + if hard_max is not None and val > float(hard_max): + return token, val, "discarded", f"Valeur hors bornes plausibles (>{hard_max})" + + quality = "ok" + reason: str | None = None + + suspect_cfg = cfg.get("suspect") or {} + single_digit_over = suspect_cfg.get("single_digit_over") + if single_digit_over is not None: + # Ex: potassium '8' au lieu de '4.8' (décimale perdue) + if re.fullmatch(r"\d", str(raw_value).strip()) and val >= float(single_digit_over): + quality = "suspect" + reason = f"Valeur à 1 chiffre (possible décimale perdue) : vérifier dans le CR" + + return token, val, quality, reason + + +def _extract_biologie(text: str, dossier: DossierMedical) -> None: + """Extrait des résultats biologiques clés. + + Notes: + - Supporte des aliases (TGO/TGP, Hb, Na/K…) + - Capte plusieurs occurrences (utile pour valider/infirmer des diagnostics) + - Reste volontairement *simple* (regex sur texte extrait) : si une valeur est + uniquement dans un tableau PDF mal extrait, elle peut manquer. + """ + # (pattern, test_name) + bio_patterns: list[tuple[str, str]] = [ + (r"[Ll]ipas[ée]mie\s*(?:[àa=:])?\s*(\d+)\s*(?:UI/L|U/L)?", "Lipasémie"), + (r"\bCRP\b\s*[=:àa]?\s*(\d+(?:[.,]\d+)?)\s*(?:mg/[Ll])?", "CRP"), + (r"(?:\bASAT\b|\bTGO\b)\s*[=:àa]?\s*([\d.,]+)\s*(?:N|U(?:I)?/L)?", "ASAT"), + (r"(?:\bALAT\b|\bTGP\b)\s*[=:àa]?\s*([\d.,]+)\s*(?:N|U(?:I)?/L)?", "ALAT"), + (r"\bGGT\b\s*[=:àa]?\s*(\d+)\s*(?:U(?:I)?/L)?", "GGT"), + (r"\bPAL\b\s*[=:àa]?\s*(\d+)\s*(?:U(?:I)?/L)?", "PAL"), + (r"[Bb]ilirubine\s+(?:totale\s+)?[àa=:]\s*(\d+(?:[.,]\d+)?)\s*(?:µmol/L|mg/dL)?", "Bilirubine totale"), + + # Ionogramme / électrolytes + (r"(?:[Ss]odium|[Nn]atr[ée]mie|(? max_per_test: + break + anomalie = _is_abnormal(test_name, raw_value) + dossier.biologie_cle.append( + BiologieCle( + test=test_name, + valeur=raw_value, + valeur_num=None, + anomalie=anomalie, + quality="ok", + discard_reason=None, + ) + ) + continue + + sanitized = _sanitize_bio_value(test_name, raw_value, sanity_cfg) + if sanitized is None: + continue + token, val_num, quality, reason = sanitized + + if quality == "suspect" and not keep_suspect: + quality = "discarded" + reason = reason or "Valeur suspecte (policy keep_suspect=false)" + + # Déduplication sur la valeur normalisée + key = (test_name, token) + if key in seen: + continue + seen.add(key) + + counts[test_name] = counts.get(test_name, 0) + 1 + if counts[test_name] > max_per_test: + break + + if quality == "discarded": + # On garde la trace pour audit, sans polluer les règles qualité. + dossier.biologie_discarded.append( + { + "test": test_name, + "raw": raw_value, + "valeur": token, + "valeur_num": val_num, + "reason": reason, + } + ) + if drop_out_of_range: + continue + + anomalie = _is_abnormal(test_name, token) + dossier.biologie_cle.append( + BiologieCle( + test=test_name, + valeur=token, + valeur_num=val_num, + anomalie=anomalie, + quality=quality, + discard_reason=reason, + ) + ) diff --git a/src/medical/bio_normals.py b/src/medical/bio_normals.py new file mode 100644 index 0000000..b0f6d3d --- /dev/null +++ b/src/medical/bio_normals.py @@ -0,0 +1,40 @@ +"""Plages de référence biologiques et fonction d'interprétation.""" + +from __future__ import annotations + + +# Plages de référence biologiques (min, max) — utilisées par _is_abnormal() +# et exportées pour le formatage du contexte LLM dans rag_search.py +BIO_NORMALS: dict[str, tuple[float, float]] = { + "Lipasémie": (0, 60), + "CRP": (0, 5), + "ASAT": (0, 40), + "ALAT": (0, 40), + "GGT": (0, 60), + "PAL": (0, 150), + "Bilirubine totale": (0, 17), + # Ionogramme (fallback adulte ; les règles de décision utilisent reference_ranges.yaml) + "Sodium": (135, 145), + "Potassium": (3.5, 5.0), + "Hémoglobine": (12, 17), + "Plaquettes": (150, 400), + "Leucocytes": (4, 10), + "Créatinine": (50, 120), +} + + +def _is_abnormal(test: str, value: str) -> bool | None: + """Détermine si un résultat biologique est anormal.""" + try: + val = float(value.replace(",", ".")) + except (ValueError, AttributeError): + if value.lower() in ("négative", "negative", "normale", "normal"): + return False + if value.lower() in ("positive", "positif", "élevée", "elevee"): + return True + return None + + if test in BIO_NORMALS: + lo, hi = BIO_NORMALS[test] + return val > hi or val < lo + return None diff --git a/src/medical/cim10_extractor.py b/src/medical/cim10_extractor.py index f5230e4..bc6d6c1 100644 --- a/src/medical/cim10_extractor.py +++ b/src/medical/cim10_extractor.py @@ -1,26 +1,28 @@ -"""Extraction d'informations médicales structurées pour le codage CIM-10.""" +"""Extraction d'informations médicales structurées pour le codage CIM-10. + +Orchestrateur principal — délègue aux sous-modules : + - bio_normals : constantes BIO_NORMALS, _is_abnormal() + - bio_extraction : _extract_biologie() et helpers parsing + - diagnostic_extraction : _extract_diagnostics(), _extract_actes(), CIM10_MAP, CCAM_MAP + - validation_pipeline : _validate_cim10(), _validate_ccam(), _apply_*_rules() +""" from __future__ import annotations import logging import re -import unicodedata from datetime import datetime from typing import Optional logger = logging.getLogger(__name__) -from .cim10_dict import lookup as dict_lookup, normalize_text, normalize_code, validate_code as cim10_validate -from .ccam_dict import lookup as ccam_lookup, validate_code as ccam_validate -from .das_filter import clean_diagnostic_text, is_valid_diagnostic_text, correct_known_miscodes, expand_medical_abbreviations +from .cim10_dict import normalize_code, validate_code as cim10_validate +from .das_filter import clean_diagnostic_text, is_valid_diagnostic_text, expand_medical_abbreviations from ..config import ( - ActeCCAM, Antecedent, - BiologieCle, Complication, Diagnostic, DossierMedical, - load_lab_value_sanity, Imagerie, Sejour, Traitement, @@ -31,70 +33,31 @@ try: except ImportError: EdsnlpResult = None # type: ignore[assignment,misc] -# Mapping diagnostics fréquents → codes CIM-10 -CIM10_MAP: dict[str, str] = { - # Pancréatite - "pancréatite aiguë biliaire": "K85.1", - "pancréatite aigue biliaire": "K85.1", - "pancréatite aiguë lithiasique": "K85.1", - "pancréatite aigue lithiasique": "K85.1", - "pancréatite aiguë": "K85.9", - "pancréatite aigue": "K85.9", - "pancréatite": "K85.9", - # Lithiases biliaires - "lithiase cholédoque": "K80.5", - "lithiase du cholédoque": "K80.5", - "calcul des canaux biliaires": "K80.5", - "lithiase vésiculaire": "K80.2", - "lithiases vésiculaires": "K80.2", - "vésicule lithiasique": "K80.2", - "colique hépatique": "K80.2", - # Cholécystite - "cholécystite aiguë": "K81.0", - "cholecystite aigue": "K81.0", - "angiocholite": "K83.0", - # Obésité - "obésité": "E66.0", - "obesite": "E66.0", - "surpoids": "E66.0", - # Réactions médicamenteuses - "éruption médicamenteuse": "L27.0", - "eruption medicamenteuse": "L27.0", - "éruption cutanée médicamenteuse": "L27.0", - "toxidermie": "L27.0", - "réaction au tramadol": "L27.0", - "allergie médicamenteuse": "T88.7", - # Douleur - "douleur abdominale": "R10.4", - "douleur hypochondre droit": "R10.1", - # Ictère - "ictère": "R17", - "jaunisse": "R17", - # HTA - "hypertension artérielle": "I10", - "hta": "I10", - # Diabète - "diabète type 2": "E11.9", - "diabète de type 2": "E11.9", - "diabète type 1": "E10.9", -} +# --- Imports depuis les sous-modules --- +from .bio_extraction import _extract_biologie +from .diagnostic_extraction import ( + _extract_diagnostics, + _extract_actes, + CIM10_MAP, + CCAM_MAP, +) +from .validation_pipeline import ( + _validate_ccam, + _validate_cim10, + _apply_code_corrections, + _apply_exclusion_rules, + _apply_severity_rules, + _apply_noncumul_rules, + _remove_das_equal_dp, + _apply_source_tracking, + _validate_justifications, +) -# Mapping actes → codes CCAM -CCAM_MAP: dict[str, str] = { - "cholécystectomie": "HMFC004", - "cholecystectomie": "HMFC004", - "cholécystectomie par cœlioscopie": "HMFC004", - "cholecystectomie par coelioscopie": "HMFC004", - "cholangiographie": "HHHE002", - "cholangiographie peropératoire": "HHHE002", - "cpre": "HHHE002", - "sphinctérotomie endoscopique": "HHHE003", - "scanner abdominal": "ZCQK002", - "tdm abdominal": "ZCQK002", - "échographie abdominale": "ZCQJ001", - "echo abdominale": "ZCQJ001", - "irm abdominale": "ZCQN001", -} +# Backward compat — sera retiré dans un commit futur +from .bio_normals import BIO_NORMALS, _is_abnormal # noqa: F401 +from .validation_pipeline import _is_dp_family_redundant # noqa: F401 +from .diagnostic_extraction import _lookup_cim10 # noqa: F401 +from .diagnostic_extraction import _DAS_PATTERNS # noqa: F401 def extract_medical_info( @@ -304,239 +267,6 @@ def _extract_sejour(parsed: dict, dossier: DossierMedical) -> None: dossier.sejour.taille = patient["taille_cm"] -def _extract_diagnostics( - parsed: dict, - text: str, - dossier: DossierMedical, - edsnlp_result: Optional[EdsnlpResult] = None, -) -> None: - """Extrait le diagnostic principal et les diagnostics associés.""" - text_lower = text.lower() - - # Diagnostics codés depuis Trackare (prioritaires) - for diag in parsed.get("diagnostics", []): - texte = clean_diagnostic_text(diag.get("libelle", "")) - texte = expand_medical_abbreviations(texte) - is_principal = diag.get("type", "").lower() == "principal" - # Le DP Trackare est toujours accepté (pré-codé avec CIM-10 validé). - # Seuls les DAS passent le filtre anti-bruit. - if not is_principal and not is_valid_diagnostic_text(texte): - continue - d = Diagnostic( - texte=texte, - cim10_suggestion=diag.get("code_cim10"), - source="trackare", - ) - if is_principal: - dossier.diagnostic_principal = d - else: - dossier.diagnostics_associes.append(d) - - # Extraction du texte "Au total:" ou conclusion - conclusion = "" - m = re.search( - r"Au total\s*[::]?\s*(.*?)(?=\n\s*(?:Devenir|TTT|Sortie|$))", - text, - re.DOTALL | re.IGNORECASE, - ) - if m: - conclusion = m.group(1).strip() - - # Enrichissement via edsnlp (CIM-10) - edsnlp_codes: dict[str, str] = {} - if edsnlp_result: - for ent in edsnlp_result.cim10_entities: - if not ent.negation and not ent.hypothese: - edsnlp_codes[ent.code] = ent.texte - - # Si pas de DP depuis le codage, chercher dans le texte - if not dossier.diagnostic_principal: - # D'abord essayer le fallback regex (plus précis pour les patterns spécifiques) - dp = _find_diagnostic_principal(text_lower, conclusion) - if dp: - dossier.diagnostic_principal = dp - elif edsnlp_codes: - # Utiliser la première entité CIM-10 edsnlp comme DP - code, texte = next(iter(edsnlp_codes.items())) - texte_clean = texte.capitalize() - if is_valid_diagnostic_text(texte_clean): - dossier.diagnostic_principal = Diagnostic( - texte=texte_clean, cim10_suggestion=code, - source="edsnlp", - ) - - # Diagnostics associés depuis le texte (regex) - das = _find_diagnostics_associes(text_lower, conclusion, dossier) - das = [d for d in das if is_valid_diagnostic_text(d.texte)] - dossier.diagnostics_associes.extend(das) - - # Enrichissement DAS depuis edsnlp - if edsnlp_result: - existing_codes = set() - if dossier.diagnostic_principal: - existing_codes.add(dossier.diagnostic_principal.cim10_suggestion) - for d in dossier.diagnostics_associes: - existing_codes.add(d.cim10_suggestion) - - for ent in edsnlp_result.cim10_entities: - if ent.negation or ent.hypothese: - continue - texte = clean_diagnostic_text(ent.texte.capitalize()) - if not is_valid_diagnostic_text(texte): - continue - if ent.code not in existing_codes: - dossier.diagnostics_associes.append(Diagnostic( - texte=texte, - cim10_suggestion=ent.code, - source="edsnlp", - )) - existing_codes.add(ent.code) - - -def _find_diagnostic_principal(text_lower: str, conclusion: str) -> Diagnostic | None: - """Trouve le diagnostic principal dans le texte. - - Normalise le texte avant matching pour gérer les variations d'accents/casse. - """ - conclusion_norm = normalize_text(conclusion) - - # Chercher dans la conclusion d'abord via CIM10_MAP (domain override) - for terme, code in CIM10_MAP.items(): - if normalize_text(terme) in conclusion_norm: - return Diagnostic(texte=terme.capitalize(), cim10_suggestion=code, source="regex") - - text_norm = normalize_text(text_lower) - - # Patterns courants pour le DP (normalisés, sans accents) - dp_patterns = [ - r"pancreatite\s+aigue\s+(?:d'origine\s+)?lithiasique", - r"pancreatite\s+aigue\s+biliaire", - r"pancreatite\s+aigue", - ] - for pat in dp_patterns: - m = re.search(pat, text_norm) - if m: - matched = m.group(0) - code = _lookup_cim10(matched) - return Diagnostic(texte=matched.capitalize(), cim10_suggestion=code, source="regex") - - return None - - -# Patterns DAS : (pattern_normalisé, label, code_fallback) -# Les patterns sont appliqués sur du texte normalisé (sans accents, lowercase) -_DAS_PATTERNS: list[tuple[str, str, str]] = [ - # Lithiases biliaires - (r"lithiase\s+(?:du\s+)?(?:bas\s+)?choledoque", "Lithiase du cholédoque", "K80.5"), - (r"vesicule\s+lithiasique|lithiases?\s+vesiculaire", "Lithiase vésiculaire", "K80.2"), - # Inflammation biliaire - (r"cholecystite\s+aigue", "Cholécystite aiguë", "K81.0"), - (r"angiocholite|cholangite", "Angiocholite", "K83.0"), - # Réactions médicamenteuses - (r"eruption\s+cutanee|toxidermie|reaction\s+au\s+tramadol", "Éruption cutanée médicamenteuse", "L27.0"), - # Cardiovasculaire - (r"hypertension\s+arterielle|\bhta\b", "Hypertension artérielle", "I10"), - (r"fibrillation\s+auriculaire|\bfa\b(?:\s+paroxystique)?|\bacfa\b", "Fibrillation auriculaire", "I48.9"), - (r"embolie\s+pulmonaire", "Embolie pulmonaire", "I26.9"), - (r"thrombose\s+veineuse\s+profonde|\btvp\b", "Thrombose veineuse profonde", "I80.2"), - # Métabolique - (r"diabete\s+(?:sucre\s+)?(?:de\s+)?type\s+2|diabete\s+type\s*2", "Diabète de type 2", "E11.9"), - (r"diabete\s+(?:sucre\s+)?(?:de\s+)?type\s+1|diabete\s+type\s*1", "Diabète de type 1", "E10.9"), - (r"dyslipidemie|hypercholesterolemie", "Dyslipidémie", "E78.5"), - (r"denutrition|malnutrition", "Dénutrition", "E46"), - # Infectieux - (r"pneumopathie|pneumonie", "Pneumopathie", "J18.9"), - (r"infection\s+urinaire|pyelonephrite", "Infection urinaire", "N39.0"), - (r"\bsepsis\b|septicemie|choc\s+septique", "Sepsis", "A41.9"), - # Rénal - (r"insuffisance\s+renale", "Insuffisance rénale", "N19"), - # Hématologique - (r"anemie", "Anémie", "D64.9"), - # Addictions - (r"tabagisme|tabac\s+actif", "Tabagisme", "F17.2"), - (r"ethylisme|alcoolisme|intoxication\s+ethylique", "Éthylisme", "F10.1"), -] - - -def _find_diagnostics_associes( - text_lower: str, conclusion: str, dossier: DossierMedical -) -> list[Diagnostic]: - """Trouve les diagnostics associés. - - Utilise des patterns normalisés (sans accents) pour une détection robuste. - """ - das: list[Diagnostic] = [] - existing_codes = set() - if dossier.diagnostic_principal: - existing_codes.add(dossier.diagnostic_principal.cim10_suggestion) - for d in dossier.diagnostics_associes: - existing_codes.add(d.cim10_suggestion) - - text_norm = normalize_text(text_lower) - - # Patterns DAS - for pat, label, code in _DAS_PATTERNS: - if re.search(pat, text_norm) and code not in existing_codes: - das.append(Diagnostic(texte=label, cim10_suggestion=code, source="regex")) - existing_codes.add(code) - - # Obésité (IMC >= 30) — pattern spécial avec extraction de valeur - m = re.search(r"imc\s*[:=]?\s*(\d{2,3}[.,]\d+)", text_norm) - if m: - imc_val = float(m.group(1).replace(",", ".")) - if imc_val >= 30 and "E66.0" not in existing_codes: - das.append(Diagnostic(texte=f"Obésité (IMC {imc_val})", cim10_suggestion="E66.0", source="regex")) - existing_codes.add("E66.0") - - return das - - -def _extract_actes(text: str, dossier: DossierMedical) -> None: - """Extrait les actes CCAM.""" - text_lower = text.lower() - - # Cholécystectomie par cœlioscopie - if re.search(r"chol[ée]cystectomie\s+par\s+c[oœ][ea]lioscopie", text_lower): - date = _find_act_date(text, r"chol[ée]cystectomie") - dossier.actes_ccam.append(ActeCCAM( - texte="Cholécystectomie par cœlioscopie", - code_ccam_suggestion="HMFC004", - date=date, - )) - elif re.search(r"chol[ée]cystectomie|cholecystectomie", text_lower): - date = _find_act_date(text, r"chol[ée]cystectomie|cholecystectomie") - dossier.actes_ccam.append(ActeCCAM( - texte="Cholécystectomie", - code_ccam_suggestion="HMFC004", - date=date, - )) - - # Cholangiographie - if re.search(r"cholangiographie", text_lower): - date = _find_act_date(text, r"cholangiographie") - dossier.actes_ccam.append(ActeCCAM( - texte="Cholangiographie peropératoire", - code_ccam_suggestion="HHHE002", - date=date, - )) - - # TDM - if re.search(r"(?:tdm|scanner|tomodensitométrie)", text_lower): - date = _find_act_date(text, r"(?:TDM|scanner)") - dossier.actes_ccam.append(ActeCCAM( - texte="TDM abdominal", - code_ccam_suggestion="ZCQK002", - date=date, - )) - - # Fallback : tenter le lookup CCAM dict pour les actes sans code - for acte in dossier.actes_ccam: - if not acte.code_ccam_suggestion: - code = ccam_lookup(acte.texte, domain_overrides=CCAM_MAP) - if code: - acte.code_ccam_suggestion = code - - _ANTECEDENT_NOISE = ( "item de", "surveillance", "température", "signes vitaux", "pouls", "type de note", "aucune donnée", "renseignée", @@ -691,183 +421,6 @@ def _match_drug_atc(med_name: str, drug_atc: dict[str, str]) -> Optional[str]: return None - -def _norm_key(s: str) -> str: - """Normalise une clé (minuscules, sans accents) pour index YAML.""" - s = (s or "").strip().lower() - s = unicodedata.normalize("NFKD", s) - s = "".join(ch for ch in s if not unicodedata.combining(ch)) - return re.sub(r"\s+", " ", s) - - -def _parse_float_and_token(raw: str) -> tuple[float | None, str | None]: - """Parse un float et renvoie aussi le token numérique normalisé (avec '.').""" - if raw is None: - return None, None - s = str(raw).strip() - m = re.search(r"(-?\d+(?:[\.,]\d+)?)", s) - if not m: - return None, None - token = m.group(1).replace(",", ".") - try: - return float(token), token - except ValueError: - return None, None - - -def _sanitize_bio_value(test_name: str, raw_value: str, sanity_cfg: dict) -> tuple[str, float, str, str | None] | None: - """Applique des garde-fous anti-artefacts (OCR/PDF). - - Retour: - (token, value_float, quality, reason) ou None si non parsable. - quality: ok | suspect | discarded - """ - val, token = _parse_float_and_token(raw_value) - if val is None or token is None: - return None - - key = _norm_key(test_name) - tests_cfg = (sanity_cfg or {}).get("tests") or {} - cfg = tests_cfg.get(key) or {} - hard_min = cfg.get("hard_min") - hard_max = cfg.get("hard_max") - - if hard_min is not None and val < float(hard_min): - return token, val, "discarded", f"Valeur hors bornes plausibles (<{hard_min})" - if hard_max is not None and val > float(hard_max): - return token, val, "discarded", f"Valeur hors bornes plausibles (>{hard_max})" - - quality = "ok" - reason: str | None = None - - suspect_cfg = cfg.get("suspect") or {} - single_digit_over = suspect_cfg.get("single_digit_over") - if single_digit_over is not None: - # Ex: potassium '8' au lieu de '4.8' (décimale perdue) - if re.fullmatch(r"\d", str(raw_value).strip()) and val >= float(single_digit_over): - quality = "suspect" - reason = f"Valeur à 1 chiffre (possible décimale perdue) : vérifier dans le CR" - - return token, val, quality, reason - - -def _extract_biologie(text: str, dossier: DossierMedical) -> None: - """Extrait des résultats biologiques clés. - - Notes: - - Supporte des aliases (TGO/TGP, Hb, Na/K…) - - Capte plusieurs occurrences (utile pour valider/infirmer des diagnostics) - - Reste volontairement *simple* (regex sur texte extrait) : si une valeur est - uniquement dans un tableau PDF mal extrait, elle peut manquer. - """ - # (pattern, test_name) - bio_patterns: list[tuple[str, str]] = [ - (r"[Ll]ipas[ée]mie\s*(?:[àa=:])?\s*(\d+)\s*(?:UI/L|U/L)?", "Lipasémie"), - (r"\bCRP\b\s*[=:àa]?\s*(\d+(?:[.,]\d+)?)\s*(?:mg/[Ll])?", "CRP"), - (r"(?:\bASAT\b|\bTGO\b)\s*[=:àa]?\s*([\d.,]+)\s*(?:N|U(?:I)?/L)?", "ASAT"), - (r"(?:\bALAT\b|\bTGP\b)\s*[=:àa]?\s*([\d.,]+)\s*(?:N|U(?:I)?/L)?", "ALAT"), - (r"\bGGT\b\s*[=:àa]?\s*(\d+)\s*(?:U(?:I)?/L)?", "GGT"), - (r"\bPAL\b\s*[=:àa]?\s*(\d+)\s*(?:U(?:I)?/L)?", "PAL"), - (r"[Bb]ilirubine\s+(?:totale\s+)?[àa=:]\s*(\d+(?:[.,]\d+)?)\s*(?:µmol/L|mg/dL)?", "Bilirubine totale"), - - # Ionogramme / électrolytes - (r"(?:[Ss]odium|[Nn]atr[ée]mie|(? max_per_test: - break - anomalie = _is_abnormal(test_name, raw_value) - dossier.biologie_cle.append( - BiologieCle( - test=test_name, - valeur=raw_value, - valeur_num=None, - anomalie=anomalie, - quality="ok", - discard_reason=None, - ) - ) - continue - - sanitized = _sanitize_bio_value(test_name, raw_value, sanity_cfg) - if sanitized is None: - continue - token, val_num, quality, reason = sanitized - - if quality == "suspect" and not keep_suspect: - quality = "discarded" - reason = reason or "Valeur suspecte (policy keep_suspect=false)" - - # Déduplication sur la valeur normalisée - key = (test_name, token) - if key in seen: - continue - seen.add(key) - - counts[test_name] = counts.get(test_name, 0) + 1 - if counts[test_name] > max_per_test: - break - - if quality == "discarded": - # On garde la trace pour audit, sans polluer les règles qualité. - dossier.biologie_discarded.append( - { - "test": test_name, - "raw": raw_value, - "valeur": token, - "valeur_num": val_num, - "reason": reason, - } - ) - if drop_out_of_range: - continue - - anomalie = _is_abnormal(test_name, token) - dossier.biologie_cle.append( - BiologieCle( - test=test_name, - valeur=token, - valeur_num=val_num, - anomalie=anomalie, - quality=quality, - discard_reason=reason, - ) - ) - - - def _extract_imagerie(text: str, dossier: DossierMedical) -> None: """Extrait les résultats d'imagerie.""" # TDM @@ -948,408 +501,3 @@ def _is_negated_by_edsnlp(term: str, negated_terms: set[str]) -> bool: if term_lower in neg_term or neg_term in term_lower: return True return False - - -def _validate_ccam(dossier: DossierMedical) -> None: - """Valide les codes CCAM suggérés contre le dictionnaire officiel.""" - for acte in dossier.actes_ccam: - if not acte.code_ccam_suggestion: - acte.validite = "non_verifie" - continue - is_valid, desc = ccam_validate(acte.code_ccam_suggestion) - if is_valid: - acte.validite = "valide" - else: - acte.validite = "non_verifie" - dossier.alertes_codage.append( - f"CCAM {acte.code_ccam_suggestion} ({acte.texte}) : code absent du dictionnaire CCAM V81" - ) - - -_INVALID_CODE_PATTERNS = {"aucun", "none", "n/a", "non_codable", "aucun_code_valide", "inconnu"} - - -def _fallback_cim10(texte: str) -> str | None: - """Tente de trouver un code CIM-10 via le dictionnaire à partir du texte diagnostic.""" - code = dict_lookup(texte, domain_overrides=CIM10_MAP) - if code: - is_valid, _ = cim10_validate(code) - if is_valid: - return code - return None - - -def _validate_cim10(dossier: DossierMedical) -> None: - """Valide les codes CIM-10 suggérés par Ollama contre le dictionnaire.""" - diags: list[tuple[str, Diagnostic]] = [] - if dossier.diagnostic_principal: - diags.append(("DP", dossier.diagnostic_principal)) - for das in dossier.diagnostics_associes: - diags.append(("DAS", das)) - - for type_diag, diag in diags: - if not diag.cim10_suggestion: - continue - - # Rejeter les hallucinations - if diag.cim10_suggestion.lower().strip() in _INVALID_CODE_PATTERNS: - fallback = _fallback_cim10(diag.texte) - if fallback: - dossier.alertes_codage.append( - f"CIM-10 {type_diag} ({diag.texte}) : code rejeté « {diag.cim10_suggestion} » → fallback {fallback}" - ) - diag.cim10_suggestion = fallback - diag.cim10_confidence = "medium" - else: - dossier.alertes_codage.append( - f"CIM-10 {type_diag} ({diag.texte}) : code rejeté « {diag.cim10_suggestion} »" - ) - diag.cim10_suggestion = None - diag.cim10_confidence = None - continue - - # Normaliser le format (K810 → K81.0) - diag.cim10_suggestion = normalize_code(diag.cim10_suggestion) - - # Valider contre le dictionnaire - is_valid, label = cim10_validate(diag.cim10_suggestion) - if not is_valid: - fallback = _fallback_cim10(diag.texte) - if fallback: - dossier.alertes_codage.append( - f"CIM-10 {type_diag} {diag.cim10_suggestion} ({diag.texte}) : code invalide → fallback {fallback}" - ) - diag.cim10_suggestion = fallback - diag.cim10_confidence = "medium" - else: - dossier.alertes_codage.append( - f"CIM-10 {type_diag} {diag.cim10_suggestion} ({diag.texte}) : code absent du dictionnaire CIM-10" - ) - diag.cim10_confidence = "low" - - -def _find_act_date(text: str, act_pattern: str) -> str | None: - """Trouve la date associée à un acte.""" - # Chercher "acte le DD/MM" ou "acte le DD/MM/YYYY" - m = re.search( - rf"{act_pattern}.*?(?:le\s+)?(\d{{2}}/\d{{2}}(?:/\d{{4}})?)", - text, - re.IGNORECASE, - ) - if m: - return m.group(1) - - # Chercher dans la ligne d'observation juste avant - m = re.search( - rf"(\d{{2}}/\d{{2}}/\d{{4}}).*?{act_pattern}", - text, - re.IGNORECASE, - ) - if m: - return m.group(1) - return None - - -def _apply_exclusion_rules(dossier: DossierMedical) -> None: - """Applique les règles d'exclusion symptôme vs diagnostic précis.""" - try: - from .exclusion_rules import check_exclusions - result = check_exclusions(dossier.diagnostic_principal, dossier.diagnostics_associes) - dossier.diagnostics_associes = result.cleaned_das - dossier.alertes_codage.extend(result.warnings) - if result.excluded: - logger.info( - " Exclusions : %d DAS symptomatiques exclus", - len(result.excluded), - ) - except Exception: - logger.warning("Erreur lors de l'application des règles d'exclusion", exc_info=True) - - -def _apply_severity_rules(dossier: DossierMedical) -> None: - """Enrichit les diagnostics avec les informations de sévérité heuristique.""" - try: - from .severity import enrich_dossier_severity - alertes, _cma_count, _cms_count = enrich_dossier_severity( - dossier.diagnostic_principal, dossier.diagnostics_associes, - ) - dossier.alertes_codage.extend(alertes) - except Exception: - logger.warning("Erreur lors de l'évaluation de sévérité", exc_info=True) - - -def _apply_code_corrections(dossier: DossierMedical) -> None: - """Corrige les codes CIM-10 systématiquement mal attribués par le LLM.""" - all_diags = [] - if dossier.diagnostic_principal: - all_diags.append(dossier.diagnostic_principal) - all_diags.extend(dossier.diagnostics_associes) - - for diag in all_diags: - if not diag.cim10_suggestion: - continue - corrected = correct_known_miscodes(diag.cim10_suggestion, diag.texte) - if corrected: - logger.info(" Code corrigé : %s → %s pour « %s »", diag.cim10_suggestion, corrected, diag.texte) - diag.cim10_suggestion = corrected - - -def _is_dp_family_redundant(das_code: str, dp_code: str) -> bool: - """True si le DAS est redondant avec le DP (même code, parent/enfant, ou même famille).""" - if das_code == dp_code: - return True - # Relation parent/enfant → toujours redondant - das_norm = das_code.replace(".", "") - dp_norm = dp_code.replace(".", "") - if das_norm.startswith(dp_norm) or dp_norm.startswith(das_norm): - return True - # Même famille 3 chars, sauf exceptions - dp_family = dp_code[:3] - if das_code[:3] == dp_family: - # S/T (trauma) : sites différents → garder - if dp_family[0] in ("S", "T"): - return False - # E10-E14 (diabète) : complications différentes → garder - if dp_family[0] == "E" and dp_family[1:].isdigit() and 10 <= int(dp_family[1:]) <= 14: - return False - return True - return False - - -def _remove_das_equal_dp(dossier: DossierMedical) -> None: - """Retire les DAS redondants avec le DP (même code, famille, ou sémantique).""" - from .das_filter import apply_semantic_dedup - - dp_code = dossier.diagnostic_principal.cim10_suggestion if dossier.diagnostic_principal else None - if not dp_code: - return - before = len(dossier.diagnostics_associes) - dossier.diagnostics_associes = [ - d for d in dossier.diagnostics_associes - if not d.cim10_suggestion or not _is_dp_family_redundant(d.cim10_suggestion, dp_code) - ] - removed = before - len(dossier.diagnostics_associes) - if removed: - logger.info(" DAS≈DP : %d DAS retiré(s) (famille %s du DP)", removed, dp_code[:3]) - - # Redondances sémantiques entre DAS - dossier.diagnostics_associes = apply_semantic_dedup(dossier.diagnostics_associes) - - -def _apply_noncumul_rules(dossier: DossierMedical) -> None: - """Détecte les incompatibilités de non-cumul entre actes CCAM.""" - try: - from .ccam_noncumul import check_noncumul - alertes = check_noncumul(dossier.actes_ccam) - dossier.alertes_codage.extend(alertes) - except Exception: - logger.warning("Erreur lors de la vérification du non-cumul CCAM", exc_info=True) - - -def _lookup_cim10(text: str) -> str | None: - """Cherche un code CIM-10 pour un texte donné. - - Utilise le dictionnaire complet (10 893 codes) avec CIM10_MAP en override prioritaire. - """ - return dict_lookup(text, domain_overrides=CIM10_MAP) - - -# Plages de référence biologiques (min, max) — utilisées par _is_abnormal() -# et exportées pour le formatage du contexte LLM dans rag_search.py -BIO_NORMALS: dict[str, tuple[float, float]] = { - "Lipasémie": (0, 60), - "CRP": (0, 5), - "ASAT": (0, 40), - "ALAT": (0, 40), - "GGT": (0, 60), - "PAL": (0, 150), - "Bilirubine totale": (0, 17), - # Ionogramme (fallback adulte ; les règles de décision utilisent reference_ranges.yaml) - "Sodium": (135, 145), - "Potassium": (3.5, 5.0), - "Hémoglobine": (12, 17), - "Plaquettes": (150, 400), - "Leucocytes": (4, 10), - "Créatinine": (50, 120), -} - - -def _is_abnormal(test: str, value: str) -> bool | None: - """Détermine si un résultat biologique est anormal.""" - try: - val = float(value.replace(",", ".")) - except (ValueError, AttributeError): - if value.lower() in ("négative", "negative", "normale", "normal"): - return False - if value.lower() in ("positive", "positif", "élevée", "elevee"): - return True - return None - - if test in BIO_NORMALS: - lo, hi = BIO_NORMALS[test] - return val > hi or val < lo - return None - - -def _track_item(item, search_key: str, page_tracker, search_text: str) -> bool: - """Cherche la page source et l'extrait pour un item avec source_page/source_excerpt.""" - if item.source_page is not None: - return False - if not search_key: - return False - page = page_tracker.find_page_for_text(search_key, search_text) - if page: - item.source_page = page - item.source_excerpt = page_tracker.extract_excerpt(search_key, search_text) - return True - return False - - -def _apply_source_tracking(dossier: DossierMedical, page_tracker, search_text: str) -> None: - """Ajoute la traçabilité source (page + extrait) à tous les éléments du dossier. - - Cherche le texte de chaque élément dans le texte source pour retrouver - la page d'origine et extraire un passage contextualisé. - """ - tracked = 0 - total = 0 - - # Diagnostics (DP + DAS) - all_diags: list[Diagnostic] = [] - if dossier.diagnostic_principal: - all_diags.append(dossier.diagnostic_principal) - all_diags.extend(dossier.diagnostics_associes) - - for diag in all_diags: - total += 1 - if _track_item(diag, diag.texte, page_tracker, search_text): - tracked += 1 - - # Biologie - for b in dossier.biologie_cle: - total += 1 - search_key = f"{b.test}: {b.valeur}" if b.valeur else b.test - if _track_item(b, search_key, page_tracker, search_text): - tracked += 1 - elif b.valeur and _track_item(b, b.test, page_tracker, search_text): - tracked += 1 - - # Imagerie - for img in dossier.imagerie: - total += 1 - search_key = img.type - if _track_item(img, search_key, page_tracker, search_text): - tracked += 1 - elif img.conclusion and _track_item(img, img.conclusion[:50], page_tracker, search_text): - tracked += 1 - - # Traitements - for t in dossier.traitements_sortie: - total += 1 - if _track_item(t, t.medicament, page_tracker, search_text): - tracked += 1 - - # Actes CCAM - for a in dossier.actes_ccam: - total += 1 - if _track_item(a, a.texte, page_tracker, search_text): - tracked += 1 - - # Antécédents - for ant in dossier.antecedents: - total += 1 - if _track_item(ant, ant.texte, page_tracker, search_text): - tracked += 1 - - # Complications - for comp in dossier.complications: - total += 1 - if _track_item(comp, comp.texte, page_tracker, search_text): - tracked += 1 - - if tracked: - logger.info(" Traçabilité source : %d/%d éléments localisés", tracked, total) - - -def _validate_justifications(dossier: DossierMedical) -> None: - """Validation croisée de tous les diagnostics via un appel LLM unique. - - Vérifie la cohérence, les preuves cliniques et la spécificité des codes. - Ajuste la confiance si la justification est faible et ajoute des alertes QC. - """ - try: - from .ollama_client import call_ollama - from .clinical_context import build_enriched_context, format_enriched_context - except ImportError: - logger.warning("Module clinical_context non disponible pour la validation QC") - return - - all_diags: list[tuple[str, Diagnostic]] = [] - if dossier.diagnostic_principal: - all_diags.append(("DP", dossier.diagnostic_principal)) - for das in dossier.diagnostics_associes: - all_diags.append(("DAS", das)) - - if not all_diags: - return - - # Construire le résumé des codes à valider - codes_section = "" - for i, (type_diag, diag) in enumerate(all_diags, 1): - code = diag.cim10_suggestion or "?" - justif = (diag.justification or "")[:150] - preuves = ", ".join(p.element for p in diag.preuves_cliniques[:3]) or "aucune" - codes_section += f"{i}. [{type_diag}] {code} — {diag.texte}\n" - codes_section += f" Justification: {justif}\n" - codes_section += f" Preuves: {preuves}\n\n" - - ctx = build_enriched_context(dossier) - ctx_str = format_enriched_context(ctx) - - from ..prompts import QC_VALIDATION - prompt = QC_VALIDATION.format(ctx_str=ctx_str, codes_section=codes_section) - - try: - result = call_ollama(prompt, temperature=0.1, max_tokens=2500, role="qc") - except Exception: - logger.warning("Erreur lors de l'appel Ollama pour validation QC", exc_info=True) - return - - if result is None: - return - - # Appliquer les ajustements - validations = result.get("validations", []) - for v in validations: - if not isinstance(v, dict): - continue - num = v.get("numero") - if not isinstance(num, int) or num < 1 or num > len(all_diags): - continue - type_diag, diag = all_diags[num - 1] - conf = v.get("confidence_recommandee") - verdict = v.get("verdict") - commentaire = v.get("commentaire", "") - - if conf in ("high", "medium", "low") and conf != diag.cim10_confidence: - old = diag.cim10_confidence - diag.cim10_confidence = conf - if old and conf != old: - dossier.alertes_codage.append( - f"QC: {type_diag} {diag.cim10_suggestion} confiance {old}\u2192{conf} \u2014 {commentaire}" - ) - - if verdict == "supprimer" and type_diag == "DAS": - dossier.alertes_codage.append( - f"QC: DAS {diag.cim10_suggestion} ({diag.texte}) à reconsidérer \u2014 {commentaire}" - ) - - alertes_globales = result.get("alertes_globales", []) - if isinstance(alertes_globales, str): - alertes_globales = [alertes_globales] - for a in alertes_globales: - if isinstance(a, str) and a.strip(): - dossier.alertes_codage.append(f"QC: {a}") - - logger.info(" QC batch : %d validations, %d alertes globales", - len(validations), len(alertes_globales)) diff --git a/src/medical/clinical_context.py b/src/medical/clinical_context.py index 5ba8bde..22d96f4 100644 --- a/src/medical/clinical_context.py +++ b/src/medical/clinical_context.py @@ -7,7 +7,7 @@ cliniques structurées pour améliorer la qualité du codage CIM-10. from __future__ import annotations from ..config import DossierMedical -from .cim10_extractor import BIO_NORMALS +from .bio_normals import BIO_NORMALS # Seuils d'interprétation biologique (test → liste de (seuil, direction, interprétation)) # Ordre décroissant : le premier seuil franchi donne l'interprétation diff --git a/src/medical/diagnostic_extraction.py b/src/medical/diagnostic_extraction.py new file mode 100644 index 0000000..c1e3612 --- /dev/null +++ b/src/medical/diagnostic_extraction.py @@ -0,0 +1,347 @@ +"""Extraction des diagnostics (DP, DAS) et actes CCAM depuis le texte médical.""" + +from __future__ import annotations + +import logging +import re +from typing import Optional + +from .cim10_dict import lookup as dict_lookup, normalize_text, validate_code as cim10_validate +from .ccam_dict import lookup as ccam_lookup, validate_code as ccam_validate +from .das_filter import clean_diagnostic_text, is_valid_diagnostic_text, expand_medical_abbreviations +from ..config import ActeCCAM, Diagnostic, DossierMedical + +try: + from .edsnlp_pipeline import EdsnlpResult +except ImportError: + EdsnlpResult = None # type: ignore[assignment,misc] + +logger = logging.getLogger(__name__) + +# Mapping diagnostics fréquents → codes CIM-10 +CIM10_MAP: dict[str, str] = { + # Pancréatite + "pancréatite aiguë biliaire": "K85.1", + "pancréatite aigue biliaire": "K85.1", + "pancréatite aiguë lithiasique": "K85.1", + "pancréatite aigue lithiasique": "K85.1", + "pancréatite aiguë": "K85.9", + "pancréatite aigue": "K85.9", + "pancréatite": "K85.9", + # Lithiases biliaires + "lithiase cholédoque": "K80.5", + "lithiase du cholédoque": "K80.5", + "calcul des canaux biliaires": "K80.5", + "lithiase vésiculaire": "K80.2", + "lithiases vésiculaires": "K80.2", + "vésicule lithiasique": "K80.2", + "colique hépatique": "K80.2", + # Cholécystite + "cholécystite aiguë": "K81.0", + "cholecystite aigue": "K81.0", + "angiocholite": "K83.0", + # Obésité + "obésité": "E66.0", + "obesite": "E66.0", + "surpoids": "E66.0", + # Réactions médicamenteuses + "éruption médicamenteuse": "L27.0", + "eruption medicamenteuse": "L27.0", + "éruption cutanée médicamenteuse": "L27.0", + "toxidermie": "L27.0", + "réaction au tramadol": "L27.0", + "allergie médicamenteuse": "T88.7", + # Douleur + "douleur abdominale": "R10.4", + "douleur hypochondre droit": "R10.1", + # Ictère + "ictère": "R17", + "jaunisse": "R17", + # HTA + "hypertension artérielle": "I10", + "hta": "I10", + # Diabète + "diabète type 2": "E11.9", + "diabète de type 2": "E11.9", + "diabète type 1": "E10.9", +} + +# Mapping actes → codes CCAM +CCAM_MAP: dict[str, str] = { + "cholécystectomie": "HMFC004", + "cholecystectomie": "HMFC004", + "cholécystectomie par cœlioscopie": "HMFC004", + "cholecystectomie par coelioscopie": "HMFC004", + "cholangiographie": "HHHE002", + "cholangiographie peropératoire": "HHHE002", + "cpre": "HHHE002", + "sphinctérotomie endoscopique": "HHHE003", + "scanner abdominal": "ZCQK002", + "tdm abdominal": "ZCQK002", + "échographie abdominale": "ZCQJ001", + "echo abdominale": "ZCQJ001", + "irm abdominale": "ZCQN001", +} + + +# Patterns DAS : (pattern_normalisé, label, code_fallback) +# Les patterns sont appliqués sur du texte normalisé (sans accents, lowercase) +_DAS_PATTERNS: list[tuple[str, str, str]] = [ + # Lithiases biliaires + (r"lithiase\s+(?:du\s+)?(?:bas\s+)?choledoque", "Lithiase du cholédoque", "K80.5"), + (r"vesicule\s+lithiasique|lithiases?\s+vesiculaire", "Lithiase vésiculaire", "K80.2"), + # Inflammation biliaire + (r"cholecystite\s+aigue", "Cholécystite aiguë", "K81.0"), + (r"angiocholite|cholangite", "Angiocholite", "K83.0"), + # Réactions médicamenteuses + (r"eruption\s+cutanee|toxidermie|reaction\s+au\s+tramadol", "Éruption cutanée médicamenteuse", "L27.0"), + # Cardiovasculaire + (r"hypertension\s+arterielle|\bhta\b", "Hypertension artérielle", "I10"), + (r"fibrillation\s+auriculaire|\bfa\b(?:\s+paroxystique)?|\bacfa\b", "Fibrillation auriculaire", "I48.9"), + (r"embolie\s+pulmonaire", "Embolie pulmonaire", "I26.9"), + (r"thrombose\s+veineuse\s+profonde|\btvp\b", "Thrombose veineuse profonde", "I80.2"), + # Métabolique + (r"diabete\s+(?:sucre\s+)?(?:de\s+)?type\s+2|diabete\s+type\s*2", "Diabète de type 2", "E11.9"), + (r"diabete\s+(?:sucre\s+)?(?:de\s+)?type\s+1|diabete\s+type\s*1", "Diabète de type 1", "E10.9"), + (r"dyslipidemie|hypercholesterolemie", "Dyslipidémie", "E78.5"), + (r"denutrition|malnutrition", "Dénutrition", "E46"), + # Infectieux + (r"pneumopathie|pneumonie", "Pneumopathie", "J18.9"), + (r"infection\s+urinaire|pyelonephrite", "Infection urinaire", "N39.0"), + (r"\bsepsis\b|septicemie|choc\s+septique", "Sepsis", "A41.9"), + # Rénal + (r"insuffisance\s+renale", "Insuffisance rénale", "N19"), + # Hématologique + (r"anemie", "Anémie", "D64.9"), + # Addictions + (r"tabagisme|tabac\s+actif", "Tabagisme", "F17.2"), + (r"ethylisme|alcoolisme|intoxication\s+ethylique", "Éthylisme", "F10.1"), +] + + +def _extract_diagnostics( + parsed: dict, + text: str, + dossier: DossierMedical, + edsnlp_result: Optional[EdsnlpResult] = None, +) -> None: + """Extrait le diagnostic principal et les diagnostics associés.""" + text_lower = text.lower() + + # Diagnostics codés depuis Trackare (prioritaires) + for diag in parsed.get("diagnostics", []): + texte = clean_diagnostic_text(diag.get("libelle", "")) + texte = expand_medical_abbreviations(texte) + is_principal = diag.get("type", "").lower() == "principal" + # Le DP Trackare est toujours accepté (pré-codé avec CIM-10 validé). + # Seuls les DAS passent le filtre anti-bruit. + if not is_principal and not is_valid_diagnostic_text(texte): + continue + d = Diagnostic( + texte=texte, + cim10_suggestion=diag.get("code_cim10"), + source="trackare", + ) + if is_principal: + dossier.diagnostic_principal = d + else: + dossier.diagnostics_associes.append(d) + + # Extraction du texte "Au total:" ou conclusion + conclusion = "" + m = re.search( + r"Au total\s*[::]?\s*(.*?)(?=\n\s*(?:Devenir|TTT|Sortie|$))", + text, + re.DOTALL | re.IGNORECASE, + ) + if m: + conclusion = m.group(1).strip() + + # Enrichissement via edsnlp (CIM-10) + edsnlp_codes: dict[str, str] = {} + if edsnlp_result: + for ent in edsnlp_result.cim10_entities: + if not ent.negation and not ent.hypothese: + edsnlp_codes[ent.code] = ent.texte + + # Si pas de DP depuis le codage, chercher dans le texte + if not dossier.diagnostic_principal: + # D'abord essayer le fallback regex (plus précis pour les patterns spécifiques) + dp = _find_diagnostic_principal(text_lower, conclusion) + if dp: + dossier.diagnostic_principal = dp + elif edsnlp_codes: + # Utiliser la première entité CIM-10 edsnlp comme DP + code, texte = next(iter(edsnlp_codes.items())) + texte_clean = texte.capitalize() + if is_valid_diagnostic_text(texte_clean): + dossier.diagnostic_principal = Diagnostic( + texte=texte_clean, cim10_suggestion=code, + source="edsnlp", + ) + + # Diagnostics associés depuis le texte (regex) + das = _find_diagnostics_associes(text_lower, conclusion, dossier) + das = [d for d in das if is_valid_diagnostic_text(d.texte)] + dossier.diagnostics_associes.extend(das) + + # Enrichissement DAS depuis edsnlp + if edsnlp_result: + existing_codes = set() + if dossier.diagnostic_principal: + existing_codes.add(dossier.diagnostic_principal.cim10_suggestion) + for d in dossier.diagnostics_associes: + existing_codes.add(d.cim10_suggestion) + + for ent in edsnlp_result.cim10_entities: + if ent.negation or ent.hypothese: + continue + texte = clean_diagnostic_text(ent.texte.capitalize()) + if not is_valid_diagnostic_text(texte): + continue + if ent.code not in existing_codes: + dossier.diagnostics_associes.append(Diagnostic( + texte=texte, + cim10_suggestion=ent.code, + source="edsnlp", + )) + existing_codes.add(ent.code) + + +def _find_diagnostic_principal(text_lower: str, conclusion: str) -> Diagnostic | None: + """Trouve le diagnostic principal dans le texte. + + Normalise le texte avant matching pour gérer les variations d'accents/casse. + """ + conclusion_norm = normalize_text(conclusion) + + # Chercher dans la conclusion d'abord via CIM10_MAP (domain override) + for terme, code in CIM10_MAP.items(): + if normalize_text(terme) in conclusion_norm: + return Diagnostic(texte=terme.capitalize(), cim10_suggestion=code, source="regex") + + text_norm = normalize_text(text_lower) + + # Patterns courants pour le DP (normalisés, sans accents) + dp_patterns = [ + r"pancreatite\s+aigue\s+(?:d'origine\s+)?lithiasique", + r"pancreatite\s+aigue\s+biliaire", + r"pancreatite\s+aigue", + ] + for pat in dp_patterns: + m = re.search(pat, text_norm) + if m: + matched = m.group(0) + code = _lookup_cim10(matched) + return Diagnostic(texte=matched.capitalize(), cim10_suggestion=code, source="regex") + + return None + + +def _find_diagnostics_associes( + text_lower: str, conclusion: str, dossier: DossierMedical +) -> list[Diagnostic]: + """Trouve les diagnostics associés. + + Utilise des patterns normalisés (sans accents) pour une détection robuste. + """ + das: list[Diagnostic] = [] + existing_codes = set() + if dossier.diagnostic_principal: + existing_codes.add(dossier.diagnostic_principal.cim10_suggestion) + for d in dossier.diagnostics_associes: + existing_codes.add(d.cim10_suggestion) + + text_norm = normalize_text(text_lower) + + # Patterns DAS + for pat, label, code in _DAS_PATTERNS: + if re.search(pat, text_norm) and code not in existing_codes: + das.append(Diagnostic(texte=label, cim10_suggestion=code, source="regex")) + existing_codes.add(code) + + # Obésité (IMC >= 30) — pattern spécial avec extraction de valeur + m = re.search(r"imc\s*[:=]?\s*(\d{2,3}[.,]\d+)", text_norm) + if m: + imc_val = float(m.group(1).replace(",", ".")) + if imc_val >= 30 and "E66.0" not in existing_codes: + das.append(Diagnostic(texte=f"Obésité (IMC {imc_val})", cim10_suggestion="E66.0", source="regex")) + existing_codes.add("E66.0") + + return das + + +def _extract_actes(text: str, dossier: DossierMedical) -> None: + """Extrait les actes CCAM.""" + text_lower = text.lower() + + # Cholécystectomie par cœlioscopie + if re.search(r"chol[ée]cystectomie\s+par\s+c[oœ][ea]lioscopie", text_lower): + date = _find_act_date(text, r"chol[ée]cystectomie") + dossier.actes_ccam.append(ActeCCAM( + texte="Cholécystectomie par cœlioscopie", + code_ccam_suggestion="HMFC004", + date=date, + )) + elif re.search(r"chol[ée]cystectomie|cholecystectomie", text_lower): + date = _find_act_date(text, r"chol[ée]cystectomie|cholecystectomie") + dossier.actes_ccam.append(ActeCCAM( + texte="Cholécystectomie", + code_ccam_suggestion="HMFC004", + date=date, + )) + + # Cholangiographie + if re.search(r"cholangiographie", text_lower): + date = _find_act_date(text, r"cholangiographie") + dossier.actes_ccam.append(ActeCCAM( + texte="Cholangiographie peropératoire", + code_ccam_suggestion="HHHE002", + date=date, + )) + + # TDM + if re.search(r"(?:tdm|scanner|tomodensitométrie)", text_lower): + date = _find_act_date(text, r"(?:TDM|scanner)") + dossier.actes_ccam.append(ActeCCAM( + texte="TDM abdominal", + code_ccam_suggestion="ZCQK002", + date=date, + )) + + # Fallback : tenter le lookup CCAM dict pour les actes sans code + for acte in dossier.actes_ccam: + if not acte.code_ccam_suggestion: + code = ccam_lookup(acte.texte, domain_overrides=CCAM_MAP) + if code: + acte.code_ccam_suggestion = code + + +def _find_act_date(text: str, act_pattern: str) -> str | None: + """Trouve la date associée à un acte.""" + # Chercher "acte le DD/MM" ou "acte le DD/MM/YYYY" + m = re.search( + rf"{act_pattern}.*?(?:le\s+)?(\d{{2}}/\d{{2}}(?:/\d{{4}})?)", + text, + re.IGNORECASE, + ) + if m: + return m.group(1) + + # Chercher dans la ligne d'observation juste avant + m = re.search( + rf"(\d{{2}}/\d{{2}}/\d{{4}}).*?{act_pattern}", + text, + re.IGNORECASE, + ) + if m: + return m.group(1) + return None + + +def _lookup_cim10(text: str) -> str | None: + """Cherche un code CIM-10 pour un texte donné. + + Utilise le dictionnaire complet (10 893 codes) avec CIM10_MAP en override prioritaire. + """ + return dict_lookup(text, domain_overrides=CIM10_MAP) diff --git a/src/medical/fusion.py b/src/medical/fusion.py index 3b727cf..b7af6d8 100644 --- a/src/medical/fusion.py +++ b/src/medical/fusion.py @@ -20,7 +20,7 @@ from ..config import ( Traitement, ) from ..medical.das_filter import is_valid_diagnostic_text, apply_semantic_dedup -from ..medical.cim10_extractor import _is_dp_family_redundant +from ..medical.validation_pipeline import _is_dp_family_redundant logger = logging.getLogger(__name__) diff --git a/src/medical/rag_search.py b/src/medical/rag_search.py index d57c25c..5d75828 100644 --- a/src/medical/rag_search.py +++ b/src/medical/rag_search.py @@ -12,7 +12,7 @@ from ..config import ( EMBEDDING_MODEL, RERANKER_MODEL, ) from .cim10_dict import normalize_code, validate_code as cim10_validate, fallback_parent_code -from .cim10_extractor import BIO_NORMALS +from .bio_normals import BIO_NORMALS from .clinical_context import build_enriched_context, format_enriched_context from .ccam_dict import validate_code as ccam_validate from .ollama_client import call_ollama, parse_json_response diff --git a/src/medical/validation_pipeline.py b/src/medical/validation_pipeline.py new file mode 100644 index 0000000..b9ac03b --- /dev/null +++ b/src/medical/validation_pipeline.py @@ -0,0 +1,349 @@ +"""Pipeline de validation et post-traitement des codes CIM-10 et CCAM.""" + +from __future__ import annotations + +import logging + +from .cim10_dict import lookup as dict_lookup, normalize_code, validate_code as cim10_validate +from .ccam_dict import validate_code as ccam_validate +from .das_filter import correct_known_miscodes, apply_semantic_dedup +from ..config import Diagnostic, DossierMedical +from .diagnostic_extraction import CIM10_MAP + +logger = logging.getLogger(__name__) + + +_INVALID_CODE_PATTERNS = {"aucun", "none", "n/a", "non_codable", "aucun_code_valide", "inconnu"} + + +def _fallback_cim10(texte: str) -> str | None: + """Tente de trouver un code CIM-10 via le dictionnaire à partir du texte diagnostic.""" + code = dict_lookup(texte, domain_overrides=CIM10_MAP) + if code: + is_valid, _ = cim10_validate(code) + if is_valid: + return code + return None + + +def _validate_ccam(dossier: DossierMedical) -> None: + """Valide les codes CCAM suggérés contre le dictionnaire officiel.""" + for acte in dossier.actes_ccam: + if not acte.code_ccam_suggestion: + acte.validite = "non_verifie" + continue + is_valid, desc = ccam_validate(acte.code_ccam_suggestion) + if is_valid: + acte.validite = "valide" + else: + acte.validite = "non_verifie" + dossier.alertes_codage.append( + f"CCAM {acte.code_ccam_suggestion} ({acte.texte}) : code absent du dictionnaire CCAM V81" + ) + + +def _validate_cim10(dossier: DossierMedical) -> None: + """Valide les codes CIM-10 suggérés par Ollama contre le dictionnaire.""" + diags: list[tuple[str, Diagnostic]] = [] + if dossier.diagnostic_principal: + diags.append(("DP", dossier.diagnostic_principal)) + for das in dossier.diagnostics_associes: + diags.append(("DAS", das)) + + for type_diag, diag in diags: + if not diag.cim10_suggestion: + continue + + # Rejeter les hallucinations + if diag.cim10_suggestion.lower().strip() in _INVALID_CODE_PATTERNS: + fallback = _fallback_cim10(diag.texte) + if fallback: + dossier.alertes_codage.append( + f"CIM-10 {type_diag} ({diag.texte}) : code rejeté « {diag.cim10_suggestion} » → fallback {fallback}" + ) + diag.cim10_suggestion = fallback + diag.cim10_confidence = "medium" + else: + dossier.alertes_codage.append( + f"CIM-10 {type_diag} ({diag.texte}) : code rejeté « {diag.cim10_suggestion} »" + ) + diag.cim10_suggestion = None + diag.cim10_confidence = None + continue + + # Normaliser le format (K810 → K81.0) + diag.cim10_suggestion = normalize_code(diag.cim10_suggestion) + + # Valider contre le dictionnaire + is_valid, label = cim10_validate(diag.cim10_suggestion) + if not is_valid: + fallback = _fallback_cim10(diag.texte) + if fallback: + dossier.alertes_codage.append( + f"CIM-10 {type_diag} {diag.cim10_suggestion} ({diag.texte}) : code invalide → fallback {fallback}" + ) + diag.cim10_suggestion = fallback + diag.cim10_confidence = "medium" + else: + dossier.alertes_codage.append( + f"CIM-10 {type_diag} {diag.cim10_suggestion} ({diag.texte}) : code absent du dictionnaire CIM-10" + ) + diag.cim10_confidence = "low" + + +def _apply_code_corrections(dossier: DossierMedical) -> None: + """Corrige les codes CIM-10 systématiquement mal attribués par le LLM.""" + all_diags = [] + if dossier.diagnostic_principal: + all_diags.append(dossier.diagnostic_principal) + all_diags.extend(dossier.diagnostics_associes) + + for diag in all_diags: + if not diag.cim10_suggestion: + continue + corrected = correct_known_miscodes(diag.cim10_suggestion, diag.texte) + if corrected: + logger.info(" Code corrigé : %s → %s pour « %s »", diag.cim10_suggestion, corrected, diag.texte) + diag.cim10_suggestion = corrected + + +def _apply_exclusion_rules(dossier: DossierMedical) -> None: + """Applique les règles d'exclusion symptôme vs diagnostic précis.""" + try: + from .exclusion_rules import check_exclusions + result = check_exclusions(dossier.diagnostic_principal, dossier.diagnostics_associes) + dossier.diagnostics_associes = result.cleaned_das + dossier.alertes_codage.extend(result.warnings) + if result.excluded: + logger.info( + " Exclusions : %d DAS symptomatiques exclus", + len(result.excluded), + ) + except Exception: + logger.warning("Erreur lors de l'application des règles d'exclusion", exc_info=True) + + +def _apply_severity_rules(dossier: DossierMedical) -> None: + """Enrichit les diagnostics avec les informations de sévérité heuristique.""" + try: + from .severity import enrich_dossier_severity + alertes, _cma_count, _cms_count = enrich_dossier_severity( + dossier.diagnostic_principal, dossier.diagnostics_associes, + ) + dossier.alertes_codage.extend(alertes) + except Exception: + logger.warning("Erreur lors de l'évaluation de sévérité", exc_info=True) + + +def _apply_noncumul_rules(dossier: DossierMedical) -> None: + """Détecte les incompatibilités de non-cumul entre actes CCAM.""" + try: + from .ccam_noncumul import check_noncumul + alertes = check_noncumul(dossier.actes_ccam) + dossier.alertes_codage.extend(alertes) + except Exception: + logger.warning("Erreur lors de la vérification du non-cumul CCAM", exc_info=True) + + +def _is_dp_family_redundant(das_code: str, dp_code: str) -> bool: + """True si le DAS est redondant avec le DP (même code, parent/enfant, ou même famille).""" + if das_code == dp_code: + return True + # Relation parent/enfant → toujours redondant + das_norm = das_code.replace(".", "") + dp_norm = dp_code.replace(".", "") + if das_norm.startswith(dp_norm) or dp_norm.startswith(das_norm): + return True + # Même famille 3 chars, sauf exceptions + dp_family = dp_code[:3] + if das_code[:3] == dp_family: + # S/T (trauma) : sites différents → garder + if dp_family[0] in ("S", "T"): + return False + # E10-E14 (diabète) : complications différentes → garder + if dp_family[0] == "E" and dp_family[1:].isdigit() and 10 <= int(dp_family[1:]) <= 14: + return False + return True + return False + + +def _remove_das_equal_dp(dossier: DossierMedical) -> None: + """Retire les DAS redondants avec le DP (même code, famille, ou sémantique).""" + dp_code = dossier.diagnostic_principal.cim10_suggestion if dossier.diagnostic_principal else None + if not dp_code: + return + before = len(dossier.diagnostics_associes) + dossier.diagnostics_associes = [ + d for d in dossier.diagnostics_associes + if not d.cim10_suggestion or not _is_dp_family_redundant(d.cim10_suggestion, dp_code) + ] + removed = before - len(dossier.diagnostics_associes) + if removed: + logger.info(" DAS≈DP : %d DAS retiré(s) (famille %s du DP)", removed, dp_code[:3]) + + # Redondances sémantiques entre DAS + dossier.diagnostics_associes = apply_semantic_dedup(dossier.diagnostics_associes) + + +def _track_item(item, search_key: str, page_tracker, search_text: str) -> bool: + """Cherche la page source et l'extrait pour un item avec source_page/source_excerpt.""" + if item.source_page is not None: + return False + if not search_key: + return False + page = page_tracker.find_page_for_text(search_key, search_text) + if page: + item.source_page = page + item.source_excerpt = page_tracker.extract_excerpt(search_key, search_text) + return True + return False + + +def _apply_source_tracking(dossier: DossierMedical, page_tracker, search_text: str) -> None: + """Ajoute la traçabilité source (page + extrait) à tous les éléments du dossier. + + Cherche le texte de chaque élément dans le texte source pour retrouver + la page d'origine et extraire un passage contextualisé. + """ + tracked = 0 + total = 0 + + # Diagnostics (DP + DAS) + all_diags: list[Diagnostic] = [] + if dossier.diagnostic_principal: + all_diags.append(dossier.diagnostic_principal) + all_diags.extend(dossier.diagnostics_associes) + + for diag in all_diags: + total += 1 + if _track_item(diag, diag.texte, page_tracker, search_text): + tracked += 1 + + # Biologie + for b in dossier.biologie_cle: + total += 1 + search_key = f"{b.test}: {b.valeur}" if b.valeur else b.test + if _track_item(b, search_key, page_tracker, search_text): + tracked += 1 + elif b.valeur and _track_item(b, b.test, page_tracker, search_text): + tracked += 1 + + # Imagerie + for img in dossier.imagerie: + total += 1 + search_key = img.type + if _track_item(img, search_key, page_tracker, search_text): + tracked += 1 + elif img.conclusion and _track_item(img, img.conclusion[:50], page_tracker, search_text): + tracked += 1 + + # Traitements + for t in dossier.traitements_sortie: + total += 1 + if _track_item(t, t.medicament, page_tracker, search_text): + tracked += 1 + + # Actes CCAM + for a in dossier.actes_ccam: + total += 1 + if _track_item(a, a.texte, page_tracker, search_text): + tracked += 1 + + # Antécédents + for ant in dossier.antecedents: + total += 1 + if _track_item(ant, ant.texte, page_tracker, search_text): + tracked += 1 + + # Complications + for comp in dossier.complications: + total += 1 + if _track_item(comp, comp.texte, page_tracker, search_text): + tracked += 1 + + if tracked: + logger.info(" Traçabilité source : %d/%d éléments localisés", tracked, total) + + +def _validate_justifications(dossier: DossierMedical) -> None: + """Validation croisée de tous les diagnostics via un appel LLM unique. + + Vérifie la cohérence, les preuves cliniques et la spécificité des codes. + Ajuste la confiance si la justification est faible et ajoute des alertes QC. + """ + try: + from .ollama_client import call_ollama + from .clinical_context import build_enriched_context, format_enriched_context + except ImportError: + logger.warning("Module clinical_context non disponible pour la validation QC") + return + + all_diags: list[tuple[str, Diagnostic]] = [] + if dossier.diagnostic_principal: + all_diags.append(("DP", dossier.diagnostic_principal)) + for das in dossier.diagnostics_associes: + all_diags.append(("DAS", das)) + + if not all_diags: + return + + # Construire le résumé des codes à valider + codes_section = "" + for i, (type_diag, diag) in enumerate(all_diags, 1): + code = diag.cim10_suggestion or "?" + justif = (diag.justification or "")[:150] + preuves = ", ".join(p.element for p in diag.preuves_cliniques[:3]) or "aucune" + codes_section += f"{i}. [{type_diag}] {code} — {diag.texte}\n" + codes_section += f" Justification: {justif}\n" + codes_section += f" Preuves: {preuves}\n\n" + + ctx = build_enriched_context(dossier) + ctx_str = format_enriched_context(ctx) + + from ..prompts import QC_VALIDATION + prompt = QC_VALIDATION.format(ctx_str=ctx_str, codes_section=codes_section) + + try: + result = call_ollama(prompt, temperature=0.1, max_tokens=2500, role="qc") + except Exception: + logger.warning("Erreur lors de l'appel Ollama pour validation QC", exc_info=True) + return + + if result is None: + return + + # Appliquer les ajustements + validations = result.get("validations", []) + for v in validations: + if not isinstance(v, dict): + continue + num = v.get("numero") + if not isinstance(num, int) or num < 1 or num > len(all_diags): + continue + type_diag, diag = all_diags[num - 1] + conf = v.get("confidence_recommandee") + verdict = v.get("verdict") + commentaire = v.get("commentaire", "") + + if conf in ("high", "medium", "low") and conf != diag.cim10_confidence: + old = diag.cim10_confidence + diag.cim10_confidence = conf + if old and conf != old: + dossier.alertes_codage.append( + f"QC: {type_diag} {diag.cim10_suggestion} confiance {old}\u2192{conf} \u2014 {commentaire}" + ) + + if verdict == "supprimer" and type_diag == "DAS": + dossier.alertes_codage.append( + f"QC: DAS {diag.cim10_suggestion} ({diag.texte}) à reconsidérer \u2014 {commentaire}" + ) + + alertes_globales = result.get("alertes_globales", []) + if isinstance(alertes_globales, str): + alertes_globales = [alertes_globales] + for a in alertes_globales: + if isinstance(a, str) and a.strip(): + dossier.alertes_codage.append(f"QC: {a}") + + logger.info(" QC batch : %d validations, %d alertes globales", + len(validations), len(alertes_globales))