Files
t2a_v2/src/medical/diagnostic_extraction.py
dom e760b12961 refactor: split cim10_extractor → bio_normals, bio_extraction, diagnostic_extraction, validation_pipeline
Découpe le monolithe cim10_extractor.py (1356L) en 4 modules spécialisés :
- bio_normals.py : constante BIO_NORMALS + _is_abnormal() (feuille)
- bio_extraction.py : extraction biologie structurée
- diagnostic_extraction.py : extraction DP/DAS/actes CCAM
- validation_pipeline.py : validation CIM-10/CCAM + règles métier

Le cim10_extractor.py reste orchestrateur (~450L) avec re-exports
backward-compat. Imports mis à jour dans clinical_context, rag_search,
fusion. 748 tests passent.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 10:06:18 +01:00

348 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Extraction des diagnostics (DP, DAS) et actes CCAM depuis le texte médical."""
from __future__ import annotations
import logging
import re
from typing import Optional
from .cim10_dict import lookup as dict_lookup, normalize_text, validate_code as cim10_validate
from .ccam_dict import lookup as ccam_lookup, validate_code as ccam_validate
from .das_filter import clean_diagnostic_text, is_valid_diagnostic_text, expand_medical_abbreviations
from ..config import ActeCCAM, Diagnostic, DossierMedical
try:
from .edsnlp_pipeline import EdsnlpResult
except ImportError:
EdsnlpResult = None # type: ignore[assignment,misc]
logger = logging.getLogger(__name__)
# Mapping diagnostics fréquents → codes CIM-10
CIM10_MAP: dict[str, str] = {
# Pancréatite
"pancréatite aiguë biliaire": "K85.1",
"pancréatite aigue biliaire": "K85.1",
"pancréatite aiguë lithiasique": "K85.1",
"pancréatite aigue lithiasique": "K85.1",
"pancréatite aiguë": "K85.9",
"pancréatite aigue": "K85.9",
"pancréatite": "K85.9",
# Lithiases biliaires
"lithiase cholédoque": "K80.5",
"lithiase du cholédoque": "K80.5",
"calcul des canaux biliaires": "K80.5",
"lithiase vésiculaire": "K80.2",
"lithiases vésiculaires": "K80.2",
"vésicule lithiasique": "K80.2",
"colique hépatique": "K80.2",
# Cholécystite
"cholécystite aiguë": "K81.0",
"cholecystite aigue": "K81.0",
"angiocholite": "K83.0",
# Obésité
"obésité": "E66.0",
"obesite": "E66.0",
"surpoids": "E66.0",
# Réactions médicamenteuses
"éruption médicamenteuse": "L27.0",
"eruption medicamenteuse": "L27.0",
"éruption cutanée médicamenteuse": "L27.0",
"toxidermie": "L27.0",
"réaction au tramadol": "L27.0",
"allergie médicamenteuse": "T88.7",
# Douleur
"douleur abdominale": "R10.4",
"douleur hypochondre droit": "R10.1",
# Ictère
"ictère": "R17",
"jaunisse": "R17",
# HTA
"hypertension artérielle": "I10",
"hta": "I10",
# Diabète
"diabète type 2": "E11.9",
"diabète de type 2": "E11.9",
"diabète type 1": "E10.9",
}
# Mapping actes → codes CCAM
CCAM_MAP: dict[str, str] = {
"cholécystectomie": "HMFC004",
"cholecystectomie": "HMFC004",
"cholécystectomie par cœlioscopie": "HMFC004",
"cholecystectomie par coelioscopie": "HMFC004",
"cholangiographie": "HHHE002",
"cholangiographie peropératoire": "HHHE002",
"cpre": "HHHE002",
"sphinctérotomie endoscopique": "HHHE003",
"scanner abdominal": "ZCQK002",
"tdm abdominal": "ZCQK002",
"échographie abdominale": "ZCQJ001",
"echo abdominale": "ZCQJ001",
"irm abdominale": "ZCQN001",
}
# Patterns DAS : (pattern_normalisé, label, code_fallback)
# Les patterns sont appliqués sur du texte normalisé (sans accents, lowercase)
_DAS_PATTERNS: list[tuple[str, str, str]] = [
# Lithiases biliaires
(r"lithiase\s+(?:du\s+)?(?:bas\s+)?choledoque", "Lithiase du cholédoque", "K80.5"),
(r"vesicule\s+lithiasique|lithiases?\s+vesiculaire", "Lithiase vésiculaire", "K80.2"),
# Inflammation biliaire
(r"cholecystite\s+aigue", "Cholécystite aiguë", "K81.0"),
(r"angiocholite|cholangite", "Angiocholite", "K83.0"),
# Réactions médicamenteuses
(r"eruption\s+cutanee|toxidermie|reaction\s+au\s+tramadol", "Éruption cutanée médicamenteuse", "L27.0"),
# Cardiovasculaire
(r"hypertension\s+arterielle|\bhta\b", "Hypertension artérielle", "I10"),
(r"fibrillation\s+auriculaire|\bfa\b(?:\s+paroxystique)?|\bacfa\b", "Fibrillation auriculaire", "I48.9"),
(r"embolie\s+pulmonaire", "Embolie pulmonaire", "I26.9"),
(r"thrombose\s+veineuse\s+profonde|\btvp\b", "Thrombose veineuse profonde", "I80.2"),
# Métabolique
(r"diabete\s+(?:sucre\s+)?(?:de\s+)?type\s+2|diabete\s+type\s*2", "Diabète de type 2", "E11.9"),
(r"diabete\s+(?:sucre\s+)?(?:de\s+)?type\s+1|diabete\s+type\s*1", "Diabète de type 1", "E10.9"),
(r"dyslipidemie|hypercholesterolemie", "Dyslipidémie", "E78.5"),
(r"denutrition|malnutrition", "Dénutrition", "E46"),
# Infectieux
(r"pneumopathie|pneumonie", "Pneumopathie", "J18.9"),
(r"infection\s+urinaire|pyelonephrite", "Infection urinaire", "N39.0"),
(r"\bsepsis\b|septicemie|choc\s+septique", "Sepsis", "A41.9"),
# Rénal
(r"insuffisance\s+renale", "Insuffisance rénale", "N19"),
# Hématologique
(r"anemie", "Anémie", "D64.9"),
# Addictions
(r"tabagisme|tabac\s+actif", "Tabagisme", "F17.2"),
(r"ethylisme|alcoolisme|intoxication\s+ethylique", "Éthylisme", "F10.1"),
]
def _extract_diagnostics(
parsed: dict,
text: str,
dossier: DossierMedical,
edsnlp_result: Optional[EdsnlpResult] = None,
) -> None:
"""Extrait le diagnostic principal et les diagnostics associés."""
text_lower = text.lower()
# Diagnostics codés depuis Trackare (prioritaires)
for diag in parsed.get("diagnostics", []):
texte = clean_diagnostic_text(diag.get("libelle", ""))
texte = expand_medical_abbreviations(texte)
is_principal = diag.get("type", "").lower() == "principal"
# Le DP Trackare est toujours accepté (pré-codé avec CIM-10 validé).
# Seuls les DAS passent le filtre anti-bruit.
if not is_principal and not is_valid_diagnostic_text(texte):
continue
d = Diagnostic(
texte=texte,
cim10_suggestion=diag.get("code_cim10"),
source="trackare",
)
if is_principal:
dossier.diagnostic_principal = d
else:
dossier.diagnostics_associes.append(d)
# Extraction du texte "Au total:" ou conclusion
conclusion = ""
m = re.search(
r"Au total\s*[:]?\s*(.*?)(?=\n\s*(?:Devenir|TTT|Sortie|$))",
text,
re.DOTALL | re.IGNORECASE,
)
if m:
conclusion = m.group(1).strip()
# Enrichissement via edsnlp (CIM-10)
edsnlp_codes: dict[str, str] = {}
if edsnlp_result:
for ent in edsnlp_result.cim10_entities:
if not ent.negation and not ent.hypothese:
edsnlp_codes[ent.code] = ent.texte
# Si pas de DP depuis le codage, chercher dans le texte
if not dossier.diagnostic_principal:
# D'abord essayer le fallback regex (plus précis pour les patterns spécifiques)
dp = _find_diagnostic_principal(text_lower, conclusion)
if dp:
dossier.diagnostic_principal = dp
elif edsnlp_codes:
# Utiliser la première entité CIM-10 edsnlp comme DP
code, texte = next(iter(edsnlp_codes.items()))
texte_clean = texte.capitalize()
if is_valid_diagnostic_text(texte_clean):
dossier.diagnostic_principal = Diagnostic(
texte=texte_clean, cim10_suggestion=code,
source="edsnlp",
)
# Diagnostics associés depuis le texte (regex)
das = _find_diagnostics_associes(text_lower, conclusion, dossier)
das = [d for d in das if is_valid_diagnostic_text(d.texte)]
dossier.diagnostics_associes.extend(das)
# Enrichissement DAS depuis edsnlp
if edsnlp_result:
existing_codes = set()
if dossier.diagnostic_principal:
existing_codes.add(dossier.diagnostic_principal.cim10_suggestion)
for d in dossier.diagnostics_associes:
existing_codes.add(d.cim10_suggestion)
for ent in edsnlp_result.cim10_entities:
if ent.negation or ent.hypothese:
continue
texte = clean_diagnostic_text(ent.texte.capitalize())
if not is_valid_diagnostic_text(texte):
continue
if ent.code not in existing_codes:
dossier.diagnostics_associes.append(Diagnostic(
texte=texte,
cim10_suggestion=ent.code,
source="edsnlp",
))
existing_codes.add(ent.code)
def _find_diagnostic_principal(text_lower: str, conclusion: str) -> Diagnostic | None:
"""Trouve le diagnostic principal dans le texte.
Normalise le texte avant matching pour gérer les variations d'accents/casse.
"""
conclusion_norm = normalize_text(conclusion)
# Chercher dans la conclusion d'abord via CIM10_MAP (domain override)
for terme, code in CIM10_MAP.items():
if normalize_text(terme) in conclusion_norm:
return Diagnostic(texte=terme.capitalize(), cim10_suggestion=code, source="regex")
text_norm = normalize_text(text_lower)
# Patterns courants pour le DP (normalisés, sans accents)
dp_patterns = [
r"pancreatite\s+aigue\s+(?:d'origine\s+)?lithiasique",
r"pancreatite\s+aigue\s+biliaire",
r"pancreatite\s+aigue",
]
for pat in dp_patterns:
m = re.search(pat, text_norm)
if m:
matched = m.group(0)
code = _lookup_cim10(matched)
return Diagnostic(texte=matched.capitalize(), cim10_suggestion=code, source="regex")
return None
def _find_diagnostics_associes(
text_lower: str, conclusion: str, dossier: DossierMedical
) -> list[Diagnostic]:
"""Trouve les diagnostics associés.
Utilise des patterns normalisés (sans accents) pour une détection robuste.
"""
das: list[Diagnostic] = []
existing_codes = set()
if dossier.diagnostic_principal:
existing_codes.add(dossier.diagnostic_principal.cim10_suggestion)
for d in dossier.diagnostics_associes:
existing_codes.add(d.cim10_suggestion)
text_norm = normalize_text(text_lower)
# Patterns DAS
for pat, label, code in _DAS_PATTERNS:
if re.search(pat, text_norm) and code not in existing_codes:
das.append(Diagnostic(texte=label, cim10_suggestion=code, source="regex"))
existing_codes.add(code)
# Obésité (IMC >= 30) — pattern spécial avec extraction de valeur
m = re.search(r"imc\s*[:=]?\s*(\d{2,3}[.,]\d+)", text_norm)
if m:
imc_val = float(m.group(1).replace(",", "."))
if imc_val >= 30 and "E66.0" not in existing_codes:
das.append(Diagnostic(texte=f"Obésité (IMC {imc_val})", cim10_suggestion="E66.0", source="regex"))
existing_codes.add("E66.0")
return das
def _extract_actes(text: str, dossier: DossierMedical) -> None:
"""Extrait les actes CCAM."""
text_lower = text.lower()
# Cholécystectomie par cœlioscopie
if re.search(r"chol[ée]cystectomie\s+par\s+c[oœ][ea]lioscopie", text_lower):
date = _find_act_date(text, r"chol[ée]cystectomie")
dossier.actes_ccam.append(ActeCCAM(
texte="Cholécystectomie par cœlioscopie",
code_ccam_suggestion="HMFC004",
date=date,
))
elif re.search(r"chol[ée]cystectomie|cholecystectomie", text_lower):
date = _find_act_date(text, r"chol[ée]cystectomie|cholecystectomie")
dossier.actes_ccam.append(ActeCCAM(
texte="Cholécystectomie",
code_ccam_suggestion="HMFC004",
date=date,
))
# Cholangiographie
if re.search(r"cholangiographie", text_lower):
date = _find_act_date(text, r"cholangiographie")
dossier.actes_ccam.append(ActeCCAM(
texte="Cholangiographie peropératoire",
code_ccam_suggestion="HHHE002",
date=date,
))
# TDM
if re.search(r"(?:tdm|scanner|tomodensitométrie)", text_lower):
date = _find_act_date(text, r"(?:TDM|scanner)")
dossier.actes_ccam.append(ActeCCAM(
texte="TDM abdominal",
code_ccam_suggestion="ZCQK002",
date=date,
))
# Fallback : tenter le lookup CCAM dict pour les actes sans code
for acte in dossier.actes_ccam:
if not acte.code_ccam_suggestion:
code = ccam_lookup(acte.texte, domain_overrides=CCAM_MAP)
if code:
acte.code_ccam_suggestion = code
def _find_act_date(text: str, act_pattern: str) -> str | None:
"""Trouve la date associée à un acte."""
# Chercher "acte le DD/MM" ou "acte le DD/MM/YYYY"
m = re.search(
rf"{act_pattern}.*?(?:le\s+)?(\d{{2}}/\d{{2}}(?:/\d{{4}})?)",
text,
re.IGNORECASE,
)
if m:
return m.group(1)
# Chercher dans la ligne d'observation juste avant
m = re.search(
rf"(\d{{2}}/\d{{2}}/\d{{4}}).*?{act_pattern}",
text,
re.IGNORECASE,
)
if m:
return m.group(1)
return None
def _lookup_cim10(text: str) -> str | None:
"""Cherche un code CIM-10 pour un texte donné.
Utilise le dictionnaire complet (10 893 codes) avec CIM10_MAP en override prioritaire.
"""
return dict_lookup(text, domain_overrides=CIM10_MAP)