refactor: split cim10_extractor → bio_normals, bio_extraction, diagnostic_extraction, validation_pipeline

Découpe le monolithe cim10_extractor.py (1356L) en 4 modules spécialisés :
- bio_normals.py : constante BIO_NORMALS + _is_abnormal() (feuille)
- bio_extraction.py : extraction biologie structurée
- diagnostic_extraction.py : extraction DP/DAS/actes CCAM
- validation_pipeline.py : validation CIM-10/CCAM + règles métier

Le cim10_extractor.py reste orchestrateur (~450L) avec re-exports
backward-compat. Imports mis à jour dans clinical_context, rag_search,
fusion. 748 tests passent.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dom
2026-02-20 10:06:18 +01:00
parent 5823eb6b53
commit e760b12961
8 changed files with 957 additions and 889 deletions

View File

@@ -0,0 +1,347 @@
"""Extraction des diagnostics (DP, DAS) et actes CCAM depuis le texte médical."""
from __future__ import annotations
import logging
import re
from typing import Optional
from .cim10_dict import lookup as dict_lookup, normalize_text, validate_code as cim10_validate
from .ccam_dict import lookup as ccam_lookup, validate_code as ccam_validate
from .das_filter import clean_diagnostic_text, is_valid_diagnostic_text, expand_medical_abbreviations
from ..config import ActeCCAM, Diagnostic, DossierMedical
try:
from .edsnlp_pipeline import EdsnlpResult
except ImportError:
EdsnlpResult = None # type: ignore[assignment,misc]
logger = logging.getLogger(__name__)
# Mapping diagnostics fréquents → codes CIM-10
CIM10_MAP: dict[str, str] = {
# Pancréatite
"pancréatite aiguë biliaire": "K85.1",
"pancréatite aigue biliaire": "K85.1",
"pancréatite aiguë lithiasique": "K85.1",
"pancréatite aigue lithiasique": "K85.1",
"pancréatite aiguë": "K85.9",
"pancréatite aigue": "K85.9",
"pancréatite": "K85.9",
# Lithiases biliaires
"lithiase cholédoque": "K80.5",
"lithiase du cholédoque": "K80.5",
"calcul des canaux biliaires": "K80.5",
"lithiase vésiculaire": "K80.2",
"lithiases vésiculaires": "K80.2",
"vésicule lithiasique": "K80.2",
"colique hépatique": "K80.2",
# Cholécystite
"cholécystite aiguë": "K81.0",
"cholecystite aigue": "K81.0",
"angiocholite": "K83.0",
# Obésité
"obésité": "E66.0",
"obesite": "E66.0",
"surpoids": "E66.0",
# Réactions médicamenteuses
"éruption médicamenteuse": "L27.0",
"eruption medicamenteuse": "L27.0",
"éruption cutanée médicamenteuse": "L27.0",
"toxidermie": "L27.0",
"réaction au tramadol": "L27.0",
"allergie médicamenteuse": "T88.7",
# Douleur
"douleur abdominale": "R10.4",
"douleur hypochondre droit": "R10.1",
# Ictère
"ictère": "R17",
"jaunisse": "R17",
# HTA
"hypertension artérielle": "I10",
"hta": "I10",
# Diabète
"diabète type 2": "E11.9",
"diabète de type 2": "E11.9",
"diabète type 1": "E10.9",
}
# Mapping actes → codes CCAM
CCAM_MAP: dict[str, str] = {
"cholécystectomie": "HMFC004",
"cholecystectomie": "HMFC004",
"cholécystectomie par cœlioscopie": "HMFC004",
"cholecystectomie par coelioscopie": "HMFC004",
"cholangiographie": "HHHE002",
"cholangiographie peropératoire": "HHHE002",
"cpre": "HHHE002",
"sphinctérotomie endoscopique": "HHHE003",
"scanner abdominal": "ZCQK002",
"tdm abdominal": "ZCQK002",
"échographie abdominale": "ZCQJ001",
"echo abdominale": "ZCQJ001",
"irm abdominale": "ZCQN001",
}
# Patterns DAS : (pattern_normalisé, label, code_fallback)
# Les patterns sont appliqués sur du texte normalisé (sans accents, lowercase)
_DAS_PATTERNS: list[tuple[str, str, str]] = [
# Lithiases biliaires
(r"lithiase\s+(?:du\s+)?(?:bas\s+)?choledoque", "Lithiase du cholédoque", "K80.5"),
(r"vesicule\s+lithiasique|lithiases?\s+vesiculaire", "Lithiase vésiculaire", "K80.2"),
# Inflammation biliaire
(r"cholecystite\s+aigue", "Cholécystite aiguë", "K81.0"),
(r"angiocholite|cholangite", "Angiocholite", "K83.0"),
# Réactions médicamenteuses
(r"eruption\s+cutanee|toxidermie|reaction\s+au\s+tramadol", "Éruption cutanée médicamenteuse", "L27.0"),
# Cardiovasculaire
(r"hypertension\s+arterielle|\bhta\b", "Hypertension artérielle", "I10"),
(r"fibrillation\s+auriculaire|\bfa\b(?:\s+paroxystique)?|\bacfa\b", "Fibrillation auriculaire", "I48.9"),
(r"embolie\s+pulmonaire", "Embolie pulmonaire", "I26.9"),
(r"thrombose\s+veineuse\s+profonde|\btvp\b", "Thrombose veineuse profonde", "I80.2"),
# Métabolique
(r"diabete\s+(?:sucre\s+)?(?:de\s+)?type\s+2|diabete\s+type\s*2", "Diabète de type 2", "E11.9"),
(r"diabete\s+(?:sucre\s+)?(?:de\s+)?type\s+1|diabete\s+type\s*1", "Diabète de type 1", "E10.9"),
(r"dyslipidemie|hypercholesterolemie", "Dyslipidémie", "E78.5"),
(r"denutrition|malnutrition", "Dénutrition", "E46"),
# Infectieux
(r"pneumopathie|pneumonie", "Pneumopathie", "J18.9"),
(r"infection\s+urinaire|pyelonephrite", "Infection urinaire", "N39.0"),
(r"\bsepsis\b|septicemie|choc\s+septique", "Sepsis", "A41.9"),
# Rénal
(r"insuffisance\s+renale", "Insuffisance rénale", "N19"),
# Hématologique
(r"anemie", "Anémie", "D64.9"),
# Addictions
(r"tabagisme|tabac\s+actif", "Tabagisme", "F17.2"),
(r"ethylisme|alcoolisme|intoxication\s+ethylique", "Éthylisme", "F10.1"),
]
def _extract_diagnostics(
parsed: dict,
text: str,
dossier: DossierMedical,
edsnlp_result: Optional[EdsnlpResult] = None,
) -> None:
"""Extrait le diagnostic principal et les diagnostics associés."""
text_lower = text.lower()
# Diagnostics codés depuis Trackare (prioritaires)
for diag in parsed.get("diagnostics", []):
texte = clean_diagnostic_text(diag.get("libelle", ""))
texte = expand_medical_abbreviations(texte)
is_principal = diag.get("type", "").lower() == "principal"
# Le DP Trackare est toujours accepté (pré-codé avec CIM-10 validé).
# Seuls les DAS passent le filtre anti-bruit.
if not is_principal and not is_valid_diagnostic_text(texte):
continue
d = Diagnostic(
texte=texte,
cim10_suggestion=diag.get("code_cim10"),
source="trackare",
)
if is_principal:
dossier.diagnostic_principal = d
else:
dossier.diagnostics_associes.append(d)
# Extraction du texte "Au total:" ou conclusion
conclusion = ""
m = re.search(
r"Au total\s*[:]?\s*(.*?)(?=\n\s*(?:Devenir|TTT|Sortie|$))",
text,
re.DOTALL | re.IGNORECASE,
)
if m:
conclusion = m.group(1).strip()
# Enrichissement via edsnlp (CIM-10)
edsnlp_codes: dict[str, str] = {}
if edsnlp_result:
for ent in edsnlp_result.cim10_entities:
if not ent.negation and not ent.hypothese:
edsnlp_codes[ent.code] = ent.texte
# Si pas de DP depuis le codage, chercher dans le texte
if not dossier.diagnostic_principal:
# D'abord essayer le fallback regex (plus précis pour les patterns spécifiques)
dp = _find_diagnostic_principal(text_lower, conclusion)
if dp:
dossier.diagnostic_principal = dp
elif edsnlp_codes:
# Utiliser la première entité CIM-10 edsnlp comme DP
code, texte = next(iter(edsnlp_codes.items()))
texte_clean = texte.capitalize()
if is_valid_diagnostic_text(texte_clean):
dossier.diagnostic_principal = Diagnostic(
texte=texte_clean, cim10_suggestion=code,
source="edsnlp",
)
# Diagnostics associés depuis le texte (regex)
das = _find_diagnostics_associes(text_lower, conclusion, dossier)
das = [d for d in das if is_valid_diagnostic_text(d.texte)]
dossier.diagnostics_associes.extend(das)
# Enrichissement DAS depuis edsnlp
if edsnlp_result:
existing_codes = set()
if dossier.diagnostic_principal:
existing_codes.add(dossier.diagnostic_principal.cim10_suggestion)
for d in dossier.diagnostics_associes:
existing_codes.add(d.cim10_suggestion)
for ent in edsnlp_result.cim10_entities:
if ent.negation or ent.hypothese:
continue
texte = clean_diagnostic_text(ent.texte.capitalize())
if not is_valid_diagnostic_text(texte):
continue
if ent.code not in existing_codes:
dossier.diagnostics_associes.append(Diagnostic(
texte=texte,
cim10_suggestion=ent.code,
source="edsnlp",
))
existing_codes.add(ent.code)
def _find_diagnostic_principal(text_lower: str, conclusion: str) -> Diagnostic | None:
"""Trouve le diagnostic principal dans le texte.
Normalise le texte avant matching pour gérer les variations d'accents/casse.
"""
conclusion_norm = normalize_text(conclusion)
# Chercher dans la conclusion d'abord via CIM10_MAP (domain override)
for terme, code in CIM10_MAP.items():
if normalize_text(terme) in conclusion_norm:
return Diagnostic(texte=terme.capitalize(), cim10_suggestion=code, source="regex")
text_norm = normalize_text(text_lower)
# Patterns courants pour le DP (normalisés, sans accents)
dp_patterns = [
r"pancreatite\s+aigue\s+(?:d'origine\s+)?lithiasique",
r"pancreatite\s+aigue\s+biliaire",
r"pancreatite\s+aigue",
]
for pat in dp_patterns:
m = re.search(pat, text_norm)
if m:
matched = m.group(0)
code = _lookup_cim10(matched)
return Diagnostic(texte=matched.capitalize(), cim10_suggestion=code, source="regex")
return None
def _find_diagnostics_associes(
text_lower: str, conclusion: str, dossier: DossierMedical
) -> list[Diagnostic]:
"""Trouve les diagnostics associés.
Utilise des patterns normalisés (sans accents) pour une détection robuste.
"""
das: list[Diagnostic] = []
existing_codes = set()
if dossier.diagnostic_principal:
existing_codes.add(dossier.diagnostic_principal.cim10_suggestion)
for d in dossier.diagnostics_associes:
existing_codes.add(d.cim10_suggestion)
text_norm = normalize_text(text_lower)
# Patterns DAS
for pat, label, code in _DAS_PATTERNS:
if re.search(pat, text_norm) and code not in existing_codes:
das.append(Diagnostic(texte=label, cim10_suggestion=code, source="regex"))
existing_codes.add(code)
# Obésité (IMC >= 30) — pattern spécial avec extraction de valeur
m = re.search(r"imc\s*[:=]?\s*(\d{2,3}[.,]\d+)", text_norm)
if m:
imc_val = float(m.group(1).replace(",", "."))
if imc_val >= 30 and "E66.0" not in existing_codes:
das.append(Diagnostic(texte=f"Obésité (IMC {imc_val})", cim10_suggestion="E66.0", source="regex"))
existing_codes.add("E66.0")
return das
def _extract_actes(text: str, dossier: DossierMedical) -> None:
"""Extrait les actes CCAM."""
text_lower = text.lower()
# Cholécystectomie par cœlioscopie
if re.search(r"chol[ée]cystectomie\s+par\s+c[oœ][ea]lioscopie", text_lower):
date = _find_act_date(text, r"chol[ée]cystectomie")
dossier.actes_ccam.append(ActeCCAM(
texte="Cholécystectomie par cœlioscopie",
code_ccam_suggestion="HMFC004",
date=date,
))
elif re.search(r"chol[ée]cystectomie|cholecystectomie", text_lower):
date = _find_act_date(text, r"chol[ée]cystectomie|cholecystectomie")
dossier.actes_ccam.append(ActeCCAM(
texte="Cholécystectomie",
code_ccam_suggestion="HMFC004",
date=date,
))
# Cholangiographie
if re.search(r"cholangiographie", text_lower):
date = _find_act_date(text, r"cholangiographie")
dossier.actes_ccam.append(ActeCCAM(
texte="Cholangiographie peropératoire",
code_ccam_suggestion="HHHE002",
date=date,
))
# TDM
if re.search(r"(?:tdm|scanner|tomodensitométrie)", text_lower):
date = _find_act_date(text, r"(?:TDM|scanner)")
dossier.actes_ccam.append(ActeCCAM(
texte="TDM abdominal",
code_ccam_suggestion="ZCQK002",
date=date,
))
# Fallback : tenter le lookup CCAM dict pour les actes sans code
for acte in dossier.actes_ccam:
if not acte.code_ccam_suggestion:
code = ccam_lookup(acte.texte, domain_overrides=CCAM_MAP)
if code:
acte.code_ccam_suggestion = code
def _find_act_date(text: str, act_pattern: str) -> str | None:
"""Trouve la date associée à un acte."""
# Chercher "acte le DD/MM" ou "acte le DD/MM/YYYY"
m = re.search(
rf"{act_pattern}.*?(?:le\s+)?(\d{{2}}/\d{{2}}(?:/\d{{4}})?)",
text,
re.IGNORECASE,
)
if m:
return m.group(1)
# Chercher dans la ligne d'observation juste avant
m = re.search(
rf"(\d{{2}}/\d{{2}}/\d{{4}}).*?{act_pattern}",
text,
re.IGNORECASE,
)
if m:
return m.group(1)
return None
def _lookup_cim10(text: str) -> str | None:
"""Cherche un code CIM-10 pour un texte donné.
Utilise le dictionnaire complet (10 893 codes) avec CIM10_MAP en override prioritaire.
"""
return dict_lookup(text, domain_overrides=CIM10_MAP)