Files
t2a_v2/src/medical/severity.py
dom 01d47f3c4b feat: mode hybride Ollama — gemma3:27b pour CPAM, 12b pour codage
Le pipeline utilise désormais gemma3:12b (rapide) pour le codage CIM-10
et gemma3:27b (meilleur raisonnement) pour la contre-argumentation CPAM.
Configurable via OLLAMA_MODEL_CPAM et OLLAMA_TIMEOUT_CPAM.

Inclut aussi : traçabilité source/page DAS, niveaux CMA ATIH, sévérité,
page tracker PDF, améliorations fusion et filtres DAS.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 17:53:53 +01:00

243 lines
7.7 KiB
Python

"""Détection heuristique de sévérité et CMA/CMS pour le codage GHM.
Phase 1 : heuristique basée sur des marqueurs textuels et des racines CIM-10.
Phase 2 (future) : tables CMA/CMS officielles ATIH.
"""
from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass, field
from typing import Optional
from .cim10_dict import load_dict, normalize_text
logger = logging.getLogger(__name__)
# --- Marqueurs de sévérité dans le texte ---
_SEVERE_MARKERS = {
"aigu", "aigue", "severe", "grave", "maligne", "malin",
"foudroyant", "foudroyante", "necrosant", "necrosante",
"septique", "decompense", "decompensee", "choc",
"defaillance", "hemorragique",
"fulminant", "fulminante", "massif", "massive", "critique",
}
_MODERATE_MARKERS = {
"modere", "moderee", "moderes", "moderees",
"subaigu", "subaigue", "subaiguë",
"persistant", "persistante", "recidivant", "recidivante",
}
_MILD_MARKERS = {
"chronique", "leger", "legere",
"benin", "benigne", "mineur", "mineure",
"superficiel", "superficielle", "stable",
}
# --- Racines CIM-10 fréquemment CMA (heuristique Phase 1) ---
# Ces racines sont connues pour être souvent classées CMA dans les tables ATIH.
_HEURISTIC_CMA_ROOTS: set[str] = {
# Infectieux
"A41", # Sepsis
"A40", # Septicémie streptococcique
# Hématologie / nutrition
"D64", # Anémie
"D65", # CIVD
"E46", # Dénutrition
"E87", # Troubles hydro-électrolytiques
"E86", # Déshydratation
# Métabolique
"E11", # Diabète type 2 (avec complications)
"E10", # Diabète type 1 (avec complications)
# Cardiovasculaire
"I48", # Fibrillation auriculaire
"I50", # Insuffisance cardiaque
"I26", # Embolie pulmonaire
"I80", # Thrombose veineuse
# Respiratoire
"J18", # Pneumopathie
"J96", # Insuffisance respiratoire
"J69", # Pneumopathie d'inhalation
# Rénal
"N17", # Insuffisance rénale aiguë
"N18", # Insuffisance rénale chronique
"N39", # Infection urinaire
# Hépatique
"K72", # Insuffisance hépatique
# Infectieux nosocomial
"T81", # Complications d'actes (infection post-op)
"T80", # Complications post-perfusion
}
_cma_levels: dict[str, int] | None = None
def _load_cma_levels() -> dict[str, int]:
"""Charge les niveaux CMA officiels depuis data/cma_levels.json (lazy-loaded)."""
global _cma_levels
if _cma_levels is not None:
return _cma_levels
from ..config import CMA_LEVELS_PATH
try:
data = json.loads(CMA_LEVELS_PATH.read_text(encoding="utf-8"))
_cma_levels = {k: int(v) for k, v in data.items()}
logger.debug("CMA levels chargés : %d codes", len(_cma_levels))
except FileNotFoundError:
logger.warning("Fichier CMA levels non trouvé : %s", CMA_LEVELS_PATH)
_cma_levels = {}
except Exception:
logger.warning("Erreur chargement CMA levels", exc_info=True)
_cma_levels = {}
return _cma_levels
@dataclass
class SeverityInfo:
"""Résultat de l'évaluation de sévérité d'un diagnostic."""
est_cma_probable: bool = False
niveau_severite: str = "non_evalue" # "leger" | "modere" | "severe" | "non_evalue"
niveau_cma: int = 1 # 1 (pas CMA), 2, 3 ou 4 (officiel ATIH)
marqueurs_trouves: list[str] = field(default_factory=list)
def _detect_severity_markers(text: str) -> tuple[str, list[str]]:
"""Détecte les marqueurs de sévérité dans un texte normalisé.
Returns:
(niveau, marqueurs_trouves) où niveau est "severe", "modere", "leger" ou "non_evalue".
"""
text_norm = normalize_text(text)
words = set(text_norm.split())
found_severe = words & _SEVERE_MARKERS
found_moderate = words & _MODERATE_MARKERS
found_mild = words & _MILD_MARKERS
all_found = list(found_severe | found_moderate | found_mild)
if found_severe:
return "severe", all_found
if found_moderate:
return "modere", all_found
if found_mild:
return "leger", all_found
return "non_evalue", []
def _is_heuristic_cma(code: str) -> bool:
"""Vérifie si un code CIM-10 est probablement CMA selon les racines heuristiques."""
if not code:
return False
code_upper = code.upper()
for root in _HEURISTIC_CMA_ROOTS:
if code_upper.startswith(root):
return True
return False
def evaluate_severity(diagnostic) -> SeverityInfo:
"""Évalue la sévérité d'un diagnostic (texte + code CIM-10).
Utilise en priorité les niveaux CMA officiels ATIH (2/3/4),
avec fallback sur l'heuristique par racines CIM-10.
Args:
diagnostic: Objet avec attributs texte, cim10_suggestion.
Returns:
SeverityInfo avec est_cma_probable, niveau_cma, niveau_severite, marqueurs_trouves.
"""
info = SeverityInfo()
# 1. Marqueurs textuels depuis le texte du diagnostic
texte = diagnostic.texte or ""
niveau, marqueurs = _detect_severity_markers(texte)
# 2. Chercher aussi dans le label du dictionnaire CIM-10
code = diagnostic.cim10_suggestion
if code:
cim10_dict = load_dict()
label = cim10_dict.get(code, "")
if label:
niveau_label, marqueurs_label = _detect_severity_markers(label)
# Prendre le niveau le plus sévère
severity_order = {"severe": 3, "modere": 2, "leger": 1, "non_evalue": 0}
if severity_order.get(niveau_label, 0) > severity_order.get(niveau, 0):
niveau = niveau_label
marqueurs = list(set(marqueurs + marqueurs_label))
info.niveau_severite = niveau
info.marqueurs_trouves = marqueurs
# 3. Lookup officiel CMA ATIH (prioritaire)
if code:
cma_levels = _load_cma_levels()
official_level = cma_levels.get(code)
if official_level:
info.niveau_cma = official_level
info.est_cma_probable = True
elif _is_heuristic_cma(code):
# Fallback heuristique → niveau 2
info.niveau_cma = 2
info.est_cma_probable = True
return info
def enrich_dossier_severity(dp, das_list: list) -> tuple[list[str], int, int]:
"""Enrichit les diagnostics d'un dossier avec les informations de sévérité.
Modifie les diagnostics en place (attributs est_cma, est_cms, niveau_severite).
Args:
dp: Diagnostic principal.
das_list: Liste des diagnostics associés.
Returns:
(alertes, cma_count, cms_count).
"""
alertes = []
# Évaluer le DP
if dp and dp.cim10_suggestion:
info = evaluate_severity(dp)
dp.niveau_severite = info.niveau_severite
dp.niveau_cma = info.niveau_cma
if info.est_cma_probable:
dp.est_cma = True
# Évaluer chaque DAS
cma_count = 0
cms_count = 0
for das in das_list:
if not das.cim10_suggestion:
continue
info = evaluate_severity(das)
das.niveau_severite = info.niveau_severite
das.niveau_cma = info.niveau_cma
if info.est_cma_probable:
das.est_cma = True
cma_count += 1
# CMS = CMA niveau 4 ou CMA sévère
if info.niveau_cma >= 4 or info.niveau_severite == "severe":
das.est_cms = True
cms_count += 1
alertes.append(
f"CMA niveau {info.niveau_cma} : '{das.texte}' ({das.cim10_suggestion}) — "
f"sévérité {info.niveau_severite}"
+ (f", marqueurs : {', '.join(info.marqueurs_trouves)}" if info.marqueurs_trouves else "")
)
if cma_count >= 2:
alertes.insert(0, f"{cma_count} CMA probables détectées — impact potentiel sur le niveau de sévérité GHM")
return alertes, cma_count, cms_count