feat: règles métier T2A Phase 1 — exclusions diagnostiques, sévérité CMA et alertes codage

Ajout des règles d'exclusion symptôme (R00-R99) vs diagnostic précis (Chapitres I-XIV),
détection heuristique de sévérité CMA sur 25 racines CIM-10, et affichage des alertes
de codage dans le viewer Flask. 153 tests, 0 régression.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dom
2026-02-11 08:53:14 +01:00
parent 12f4479cd2
commit 9df4465fef
8 changed files with 911 additions and 42 deletions

View File

@@ -113,6 +113,12 @@ def extract_medical_info(
if use_rag:
_enrich_with_rag(dossier)
# Post-processing : exclusions symptôme vs diagnostic précis
_apply_exclusion_rules(dossier)
# Post-processing : enrichissement sévérité (CMA/CMS heuristique)
_apply_severity_rules(dossier)
return dossier
@@ -641,6 +647,34 @@ def _find_act_date(text: str, act_pattern: str) -> str | None:
return None
def _apply_exclusion_rules(dossier: DossierMedical) -> None:
"""Applique les règles d'exclusion symptôme vs diagnostic précis."""
try:
from .exclusion_rules import check_exclusions
result = check_exclusions(dossier.diagnostic_principal, dossier.diagnostics_associes)
dossier.diagnostics_associes = result.cleaned_das
dossier.alertes_codage.extend(result.warnings)
if result.excluded:
logger.info(
" Exclusions : %d DAS symptomatiques exclus",
len(result.excluded),
)
except Exception:
logger.warning("Erreur lors de l'application des règles d'exclusion", exc_info=True)
def _apply_severity_rules(dossier: DossierMedical) -> None:
"""Enrichit les diagnostics avec les informations de sévérité heuristique."""
try:
from .severity import enrich_dossier_severity
alertes = enrich_dossier_severity(
dossier.diagnostic_principal, dossier.diagnostics_associes,
)
dossier.alertes_codage.extend(alertes)
except Exception:
logger.warning("Erreur lors de l'évaluation de sévérité", exc_info=True)
def _lookup_cim10(text: str) -> str | None:
"""Cherche un code CIM-10 pour un texte donné.

View File

@@ -0,0 +1,169 @@
"""Règles d'exclusion diagnostique : symptôme (Chapitre XVIII) vs diagnostic précis.
Lorsqu'un symptôme (R00-R99) et un diagnostic précis (Chapitres I-XIV, A00-N99)
coexistent et que le symptôme est expliqué par le diagnostic précis, le symptôme
ne doit PAS être codé comme DAS (règle ATIH de non-redondance).
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
def is_symptom_code(code: str) -> bool:
"""Vérifie si un code CIM-10 appartient au Chapitre XVIII (R00-R99 = Symptômes)."""
if not code:
return False
return bool(re.match(r"^R\d{2}", code, re.IGNORECASE))
def is_precise_diagnosis(code: str) -> bool:
"""Vérifie si un code CIM-10 appartient aux Chapitres I-XIV (A00-N99)."""
if not code:
return False
return bool(re.match(r"^[A-N]\d{2}", code, re.IGNORECASE))
# Mapping R-code → set de codes précis qui excluent le symptôme.
# Chaque R-code est exclu si l'un des codes précis (ou un code commençant par
# l'une de ces racines) est présent parmi les diagnostics du séjour.
EXCLUSION_MAP: dict[str, set[str]] = {
# R10 — Douleur abdominale → exclu par pathologies digestives précises
"R10": {"K35", "K80", "K81", "K83", "K85", "K86", "K56", "K57", "K25", "K26", "K29"},
"R10.1": {"K80", "K81", "K83"}, # Douleur hypochondre droit
"R10.3": {"K35", "K36", "K37"}, # Douleur hypogastre
"R10.4": {"K35", "K80", "K85", "K56", "K57"}, # Douleur abdominale autre/non précisée
# R11 — Nausées et vomissements
"R11": {"K29", "K80", "K81", "K85", "K56", "K91"},
# R17 — Ictère → exclu par pathologies hépatobiliaires
"R17": {"K80", "K83", "K70", "K71", "K72", "K73", "K74", "B15", "B16", "B17", "B18", "B19", "C22"},
# R50 — Fièvre → exclu par infections précises
"R50": {"A41", "J18", "J15", "J13", "J14", "J06", "N10", "N39", "K81", "K83",
"L03", "T81", "A09", "A04"},
"R50.9": {"A41", "J18", "J15", "J13", "J14", "N10", "N39", "K81"},
# R07 — Douleur thoracique → exclu par pathologies cardiaques/pulmonaires
"R07": {"I20", "I21", "I22", "I23", "I24", "I25", "I26", "J18", "J93"},
"R07.4": {"I20", "I21", "I24", "I25"},
# R06 — Dyspnée → exclu par pathologies respiratoires/cardiaques
"R06": {"J18", "J44", "J45", "J96", "I50", "I26"},
"R06.0": {"J18", "J44", "J45", "J96", "I50", "I26"},
# R31 — Hématurie → exclu par pathologies urologiques/rénales
"R31": {"N20", "N13", "C64", "C67", "N02", "N00", "N01"},
# R04 — Hémoptysie → exclu par pathologies pulmonaires
"R04": {"J18", "C34", "I26", "A16"},
# R63.4 — Perte de poids → exclu par tumeurs, infections chroniques
"R63.4": {"C15", "C16", "C18", "C19", "C20", "C22", "C25", "C34", "C50",
"A15", "A16", "B20", "B21", "B22", "B23", "B24", "E46"},
# R00 — Anomalies du rythme cardiaque → exclu par troubles du rythme précis
"R00": {"I47", "I48", "I49"},
"R00.0": {"I47", "I48"}, # Tachycardie
"R00.1": {"I49.5", "I49.8"}, # Bradycardie
}
def _code_matches(code: str, roots: set[str]) -> bool:
"""Vérifie si un code CIM-10 commence par l'une des racines données."""
if not code:
return False
code_upper = code.upper()
for root in roots:
if code_upper.startswith(root.upper()):
return True
return False
@dataclass
class ExclusionResult:
"""Résultat de l'application des règles d'exclusion."""
cleaned_das: list # Diagnostics DAS conservés
excluded: list # Diagnostics DAS exclus
warnings: list[str] = field(default_factory=list)
def check_exclusions(dp, das_list: list) -> ExclusionResult:
"""Applique les règles d'exclusion symptôme vs diagnostic précis.
Args:
dp: Diagnostic principal (objet avec attribut cim10_suggestion).
das_list: Liste des diagnostics associés (même type).
Returns:
ExclusionResult avec les DAS nettoyés, exclus, et les warnings.
"""
# Collecter tous les codes du séjour (DP + DAS)
all_codes: list[str] = []
if dp and dp.cim10_suggestion:
all_codes.append(dp.cim10_suggestion)
for das in das_list:
if das.cim10_suggestion:
all_codes.append(das.cim10_suggestion)
# Identifier les codes précis présents (Chapitres I-XIV)
precise_codes = [c for c in all_codes if is_precise_diagnosis(c)]
cleaned = []
excluded = []
warnings = []
for das in das_list:
code = das.cim10_suggestion
if not code or not is_symptom_code(code):
# Non-symptôme : toujours conservé
cleaned.append(das)
continue
# Vérifier si ce symptôme est exclu par un diagnostic précis
should_exclude = False
excluding_code = None
# Chercher dans EXCLUSION_MAP : d'abord le code exact, puis la racine (3 chars)
exclusion_roots = EXCLUSION_MAP.get(code.upper())
if exclusion_roots is None:
# Essayer la racine 3 caractères (ex: R10.4 → R10)
root3 = code.upper()[:3]
exclusion_roots = EXCLUSION_MAP.get(root3)
if exclusion_roots:
for precise in precise_codes:
if _code_matches(precise, exclusion_roots):
should_exclude = True
excluding_code = precise
break
if should_exclude:
excluded.append(das)
warnings.append(
f"DAS '{das.texte}' ({code}) exclu : symptôme redondant avec "
f"le diagnostic précis {excluding_code}"
)
else:
cleaned.append(das)
# Vérifier aussi si le DP est un symptôme avec un diagnostic précis en DAS
if dp and dp.cim10_suggestion and is_symptom_code(dp.cim10_suggestion):
dp_code = dp.cim10_suggestion
exclusion_roots = EXCLUSION_MAP.get(dp_code.upper())
if exclusion_roots is None:
exclusion_roots = EXCLUSION_MAP.get(dp_code.upper()[:3])
if exclusion_roots:
for precise in precise_codes:
if _code_matches(precise, exclusion_roots):
warnings.append(
f"ALERTE DP : le DP '{dp.texte}' ({dp_code}) est un symptôme "
f"alors qu'un diagnostic précis {precise} est présent — "
f"vérifier si le DP devrait être changé"
)
break
return ExclusionResult(cleaned_das=cleaned, excluded=excluded, warnings=warnings)

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import json
import logging
import re
from typing import Optional
import requests
@@ -15,6 +16,12 @@ logger = logging.getLogger(__name__)
# Singleton pour le modèle d'embedding (chargé une seule fois)
_embed_model = None
# Score minimum de similarité FAISS pour retenir un résultat
_MIN_SCORE = 0.3
# Marqueur de fin de raisonnement dans la réponse Ollama
_RESULT_MARKER = "###RESULT###"
def _get_embed_model():
"""Charge le modèle d'embedding (singleton)."""
@@ -27,7 +34,7 @@ def _get_embed_model():
return _embed_model
def search_similar(query: str, top_k: int = 5) -> list[dict]:
def search_similar(query: str, top_k: int = 10) -> list[dict]:
"""Recherche les passages les plus similaires dans l'index FAISS.
Args:
@@ -35,7 +42,8 @@ def search_similar(query: str, top_k: int = 5) -> list[dict]:
top_k: Nombre de résultats à retourner.
Returns:
Liste de dicts avec les métadonnées + score de similarité.
Liste de dicts avec les métadonnées + score de similarité,
filtrés par score minimum et priorisant les sources CIM-10.
"""
from .rag_index import get_index
import numpy as np
@@ -51,21 +59,93 @@ def search_similar(query: str, top_k: int = 5) -> list[dict]:
query_vec = model.encode([query], normalize_embeddings=True)
query_vec = np.array(query_vec, dtype=np.float32)
scores, indices = faiss_index.search(query_vec, min(top_k, faiss_index.ntotal))
# Chercher plus de résultats que top_k pour pouvoir filtrer ensuite
fetch_k = min(top_k * 2, faiss_index.ntotal)
scores, indices = faiss_index.search(query_vec, fetch_k)
results = []
raw_results = []
for score, idx in zip(scores[0], indices[0]):
if idx < 0:
continue
if float(score) < _MIN_SCORE:
continue
meta = metadata[idx].copy()
meta["score"] = float(score)
results.append(meta)
raw_results.append(meta)
return results
# Prioriser les sources CIM-10 (au moins 6 sur top_k)
cim10_results = [r for r in raw_results if r["document"] == "cim10"]
other_results = [r for r in raw_results if r["document"] != "cim10"]
min_cim10 = min(6, len(cim10_results))
final = cim10_results[:min_cim10]
remaining_slots = top_k - len(final)
# Remplir le reste avec les meilleurs résultats (CIM-10 restants + autres)
remaining = cim10_results[min_cim10:] + other_results
remaining.sort(key=lambda r: r["score"], reverse=True)
final.extend(remaining[:remaining_slots])
return final
def _build_prompt(texte: str, sources: list[dict], contexte: dict) -> str:
"""Construit le prompt pour Ollama."""
def _format_contexte(contexte: dict) -> str:
"""Formate le contexte patient de manière structurée pour le prompt."""
lines = []
sexe = contexte.get("sexe")
age = contexte.get("age")
imc = contexte.get("imc")
patient_parts = []
if sexe:
patient_parts.append(sexe)
if age:
patient_parts.append(f"{age} ans")
if imc:
patient_parts.append(f"IMC {imc}")
if patient_parts:
lines.append(f"- Patient : {', '.join(patient_parts)}")
duree = contexte.get("duree_sejour")
if duree:
lines.append(f"- Durée séjour : {duree} jours")
antecedents = contexte.get("antecedents")
if antecedents:
lines.append(f"- Antécédents : {', '.join(antecedents[:5])}")
biologie = contexte.get("biologie_cle")
if biologie:
bio_parts = []
for b in biologie:
test, valeur, anomalie = b if isinstance(b, (list, tuple)) else (b.get("test"), b.get("valeur"), b.get("anomalie"))
marker = " (\u2191)" if anomalie else ""
bio_parts.append(f"{test} {valeur}{marker}")
lines.append(f"- Biologie : {', '.join(bio_parts)}")
imagerie = contexte.get("imagerie")
if imagerie:
for img in imagerie:
img_type, conclusion = img if isinstance(img, (list, tuple)) else (img.get("type"), img.get("conclusion"))
if conclusion:
lines.append(f"- Imagerie : {img_type}{conclusion[:200]}")
complications = contexte.get("complications")
if complications:
lines.append(f"- Complications : {', '.join(complications)}")
dp_texte = contexte.get("dp_texte")
if dp_texte:
lines.append(f"- DP du séjour : {dp_texte}")
das_codes = contexte.get("das_codes_existants")
if das_codes:
lines.append(f"- DAS déjà codés : {', '.join(das_codes)}")
return "\n".join(lines) if lines else "Non précisé"
def _build_prompt(texte: str, sources: list[dict], contexte: dict, est_dp: bool = True) -> str:
"""Construit le prompt expert DIM avec raisonnement structuré."""
sources_text = ""
for i, src in enumerate(sources, 1):
doc_name = {
@@ -80,24 +160,94 @@ def _build_prompt(texte: str, sources: list[dict], contexte: dict) -> str:
sources_text += f"--- Source {i}: {doc_name}{code_info}{page_info} ---\n"
sources_text += (src.get("extrait", "")[:800]) + "\n\n"
ctx_parts = []
if contexte.get("sexe"):
ctx_parts.append(f"sexe: {contexte['sexe']}")
if contexte.get("age"):
ctx_parts.append(f"âge: {contexte['age']} ans")
ctx_str = ", ".join(ctx_parts) if ctx_parts else "non précisé"
type_diag = "DP (diagnostic principal)" if est_dp else "DAS (diagnostic associé significatif)"
ctx_str = _format_contexte(contexte)
return f"""Tu es un expert en codage CIM-10 pour le PMSI en France. Suggère le code CIM-10 le plus précis pour le diagnostic suivant, en te basant UNIQUEMENT sur les sources officielles fournies.
return f"""Tu es un médecin DIM (Département d'Information Médicale) expert en codage PMSI.
Tu dois coder le diagnostic suivant en respectant STRICTEMENT les règles de l'ATIH.
Diagnostic à coder : "{texte}"
Contexte patient : {ctx_str}
RÈGLES IMPÉRATIVES :
- Le code doit provenir UNIQUEMENT des sources CIM-10 fournies
- Distingue la DESCRIPTION CLINIQUE (ce que le médecin écrit) de la LOGIQUE DE CODAGE (ce que l'ATIH impose)
- Privilégie le code le plus SPÉCIFIQUE disponible (4e ou 5e caractère)
- Vérifie les notes d'inclusion/exclusion de chaque code candidat
- Si le diagnostic est un DP, il doit refléter le motif principal de prise en charge du séjour
- Si c'est un DAS, il doit avoir mobilisé des ressources supplémentaires pendant le séjour
- EXCLUSION SYMPTÔME : Si le diagnostic est un symptôme (R00-R99) et qu'un diagnostic précis (Chapitres I-XIV, A00-N99) expliquant ce symptôme est présent, le symptôme ne doit PAS être codé comme DAS
Sources de référence :
DIAGNOSTIC À CODER : "{texte}"
TYPE : {type_diag}
CONTEXTE CLINIQUE :
{ctx_str}
SOURCES CIM-10 :
{sources_text}
Réponds UNIQUEMENT au format JSON suivant, sans texte avant ou après :
RAISONNE ÉTAPE PAR ÉTAPE :
1. ANALYSE CLINIQUE : Que signifie ce diagnostic sur le plan médical ?
2. CODES CANDIDATS : Quels codes des sources fournies sont compatibles ?
3. DISCRIMINATION : Pourquoi choisir un code plutôt qu'un autre ? (inclusions/exclusions, spécificité)
4. RÈGLE PMSI : Ce code est-il conforme pour un {type_diag} ? (guide méthodologique)
Après ton raisonnement, conclus OBLIGATOIREMENT par le JSON suivant sur une ligne séparée :
{_RESULT_MARKER}
{{"code": "X99.9", "confidence": "high|medium|low", "justification": "explication courte en français"}}"""
def _parse_ollama_response(raw: str) -> dict | None:
"""Parse la réponse Ollama en extrayant le JSON après le marqueur ###RESULT###.
Fallback sur la recherche d'accolades si le marqueur est absent.
Retourne un dict avec les clés code/confidence/justification + raisonnement.
"""
raisonnement = None
json_str = None
# Stratégie 1 : chercher le marqueur ###RESULT###
marker_pos = raw.find(_RESULT_MARKER)
if marker_pos != -1:
raisonnement = raw[:marker_pos].strip()
after_marker = raw[marker_pos + len(_RESULT_MARKER):]
brace_start = after_marker.find("{")
brace_end = after_marker.rfind("}")
if brace_start != -1 and brace_end != -1:
json_str = after_marker[brace_start:brace_end + 1]
else:
# Fallback : chercher le dernier bloc JSON dans la réponse
# (le raisonnement peut contenir des accolades intermédiaires)
last_brace = raw.rfind("}")
if last_brace != -1:
# Chercher l'accolade ouvrante correspondante en remontant
depth = 0
start = -1
for i in range(last_brace, -1, -1):
if raw[i] == "}":
depth += 1
elif raw[i] == "{":
depth -= 1
if depth == 0:
start = i
break
if start != -1:
json_str = raw[start:last_brace + 1]
raisonnement = raw[:start].strip()
if not json_str:
logger.warning("Ollama : réponse sans JSON valide : %s", raw[:200])
return None
try:
parsed = json.loads(json_str)
except json.JSONDecodeError:
logger.warning("Ollama : JSON invalide : %s", json_str[:200])
return None
if raisonnement:
parsed["raisonnement"] = raisonnement
return parsed
def _call_ollama(prompt: str) -> dict | None:
"""Appelle Ollama et parse la réponse JSON."""
try:
@@ -109,27 +259,14 @@ def _call_ollama(prompt: str) -> dict | None:
"stream": False,
"options": {
"temperature": 0.1,
"num_predict": 300,
"num_predict": 1200,
},
},
timeout=OLLAMA_TIMEOUT,
)
response.raise_for_status()
raw = response.json().get("response", "")
# Extraire le JSON de la réponse (peut contenir du texte autour)
json_match = None
# Chercher un bloc JSON entre accolades
brace_start = raw.find("{")
brace_end = raw.rfind("}")
if brace_start != -1 and brace_end != -1:
json_match = raw[brace_start:brace_end + 1]
if json_match:
return json.loads(json_match)
else:
logger.warning("Ollama : réponse sans JSON valide : %s", raw[:200])
return None
return _parse_ollama_response(raw)
except requests.ConnectionError:
logger.warning("Ollama non disponible (connexion refusée)")
@@ -145,13 +282,14 @@ def _call_ollama(prompt: str) -> dict | None:
def enrich_diagnostic(
diagnostic: Diagnostic,
contexte: dict,
est_dp: bool = True,
) -> None:
"""Enrichit un Diagnostic avec le RAG (FAISS + Ollama).
Modifie le diagnostic en place. Fallback gracieux si FAISS ou Ollama échouent.
"""
# 1. Recherche FAISS
sources = search_similar(diagnostic.texte, top_k=5)
sources = search_similar(diagnostic.texte, top_k=10)
if not sources:
logger.debug("Aucune source RAG trouvée pour : %s", diagnostic.texte)
@@ -168,14 +306,15 @@ def enrich_diagnostic(
for s in sources
]
# 3. Appel Ollama pour justification
prompt = _build_prompt(diagnostic.texte, sources, contexte)
# 3. Appel Ollama pour justification avec raisonnement structuré
prompt = _build_prompt(diagnostic.texte, sources, contexte, est_dp=est_dp)
llm_result = _call_ollama(prompt)
if llm_result:
code = llm_result.get("code")
confidence = llm_result.get("confidence")
justification = llm_result.get("justification")
raisonnement = llm_result.get("raisonnement")
if code:
diagnostic.cim10_suggestion = code
@@ -183,6 +322,8 @@ def enrich_diagnostic(
diagnostic.cim10_confidence = confidence
if justification:
diagnostic.justification = justification
if raisonnement:
diagnostic.raisonnement = raisonnement
else:
logger.info("Ollama non disponible — sources FAISS conservées sans justification LLM")
@@ -192,12 +333,27 @@ def enrich_dossier(dossier: DossierMedical) -> None:
contexte = {
"sexe": dossier.sejour.sexe,
"age": dossier.sejour.age,
"duree_sejour": dossier.sejour.duree_sejour,
"imc": dossier.sejour.imc,
"antecedents": dossier.antecedents[:5],
"biologie_cle": [(b.test, b.valeur, b.anomalie) for b in dossier.biologie_cle],
"imagerie": [(i.type, (i.conclusion or "")[:200]) for i in dossier.imagerie],
"complications": dossier.complications,
}
if dossier.diagnostic_principal:
logger.info("RAG enrichissement DP : %s", dossier.diagnostic_principal.texte)
enrich_diagnostic(dossier.diagnostic_principal, contexte)
enrich_diagnostic(dossier.diagnostic_principal, contexte, est_dp=True)
# Pour les DAS, ajouter le DP et les DAS existants au contexte pour cohérence
if dossier.diagnostic_principal:
contexte["dp_texte"] = dossier.diagnostic_principal.texte
contexte["das_codes_existants"] = [
f"{d.cim10_suggestion} ({d.texte})"
for d in dossier.diagnostics_associes
if d.cim10_suggestion
]
for das in dossier.diagnostics_associes:
logger.info("RAG enrichissement DAS : %s", das.texte)
enrich_diagnostic(das, contexte)
enrich_diagnostic(das, contexte, est_dp=False)

201
src/medical/severity.py Normal file
View File

@@ -0,0 +1,201 @@
"""Détection heuristique de sévérité et CMA/CMS pour le codage GHM.
Phase 1 : heuristique basée sur des marqueurs textuels et des racines CIM-10.
Phase 2 (future) : tables CMA/CMS officielles ATIH.
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Optional
from .cim10_dict import load_dict, normalize_text
# --- Marqueurs de sévérité dans le texte ---
_SEVERE_MARKERS = {
"aigu", "aigue", "severe", "grave", "maligne", "malin",
"foudroyant", "foudroyante", "necrosant", "necrosante",
"septique", "decompense", "decompensee", "choc",
"defaillance", "hemorragique",
"fulminant", "fulminante", "massif", "massive", "critique",
}
_MODERATE_MARKERS = {
"modere", "moderee", "moderes", "moderees",
"subaigu", "subaigue", "subaiguë",
"persistant", "persistante", "recidivant", "recidivante",
}
_MILD_MARKERS = {
"chronique", "leger", "legere",
"benin", "benigne", "mineur", "mineure",
"superficiel", "superficielle", "stable",
}
# --- Racines CIM-10 fréquemment CMA (heuristique Phase 1) ---
# Ces racines sont connues pour être souvent classées CMA dans les tables ATIH.
_HEURISTIC_CMA_ROOTS: set[str] = {
# Infectieux
"A41", # Sepsis
"A40", # Septicémie streptococcique
# Hématologie / nutrition
"D64", # Anémie
"D65", # CIVD
"E46", # Dénutrition
"E87", # Troubles hydro-électrolytiques
"E86", # Déshydratation
# Métabolique
"E11", # Diabète type 2 (avec complications)
"E10", # Diabète type 1 (avec complications)
# Cardiovasculaire
"I48", # Fibrillation auriculaire
"I50", # Insuffisance cardiaque
"I26", # Embolie pulmonaire
"I80", # Thrombose veineuse
# Respiratoire
"J18", # Pneumopathie
"J96", # Insuffisance respiratoire
"J69", # Pneumopathie d'inhalation
# Rénal
"N17", # Insuffisance rénale aiguë
"N18", # Insuffisance rénale chronique
"N39", # Infection urinaire
# Hépatique
"K72", # Insuffisance hépatique
# Infectieux nosocomial
"T81", # Complications d'actes (infection post-op)
"T80", # Complications post-perfusion
}
@dataclass
class SeverityInfo:
"""Résultat de l'évaluation de sévérité d'un diagnostic."""
est_cma_probable: bool = False
niveau_severite: str = "non_evalue" # "leger" | "modere" | "severe" | "non_evalue"
marqueurs_trouves: list[str] = field(default_factory=list)
def _detect_severity_markers(text: str) -> tuple[str, list[str]]:
"""Détecte les marqueurs de sévérité dans un texte normalisé.
Returns:
(niveau, marqueurs_trouves) où niveau est "severe", "modere", "leger" ou "non_evalue".
"""
text_norm = normalize_text(text)
words = set(text_norm.split())
found_severe = words & _SEVERE_MARKERS
found_moderate = words & _MODERATE_MARKERS
found_mild = words & _MILD_MARKERS
all_found = list(found_severe | found_moderate | found_mild)
if found_severe:
return "severe", all_found
if found_moderate:
return "modere", all_found
if found_mild:
return "leger", all_found
return "non_evalue", []
def _is_heuristic_cma(code: str) -> bool:
"""Vérifie si un code CIM-10 est probablement CMA selon les racines heuristiques."""
if not code:
return False
code_upper = code.upper()
for root in _HEURISTIC_CMA_ROOTS:
if code_upper.startswith(root):
return True
return False
def evaluate_severity(diagnostic) -> SeverityInfo:
"""Évalue la sévérité d'un diagnostic (texte + code CIM-10).
Args:
diagnostic: Objet avec attributs texte, cim10_suggestion.
Returns:
SeverityInfo avec est_cma_probable, niveau_severite, marqueurs_trouves.
"""
info = SeverityInfo()
# 1. Marqueurs textuels depuis le texte du diagnostic
texte = diagnostic.texte or ""
niveau, marqueurs = _detect_severity_markers(texte)
# 2. Chercher aussi dans le label du dictionnaire CIM-10
code = diagnostic.cim10_suggestion
if code:
cim10_dict = load_dict()
label = cim10_dict.get(code, "")
if label:
niveau_label, marqueurs_label = _detect_severity_markers(label)
# Prendre le niveau le plus sévère
severity_order = {"severe": 3, "modere": 2, "leger": 1, "non_evalue": 0}
if severity_order.get(niveau_label, 0) > severity_order.get(niveau, 0):
niveau = niveau_label
marqueurs = list(set(marqueurs + marqueurs_label))
info.niveau_severite = niveau
info.marqueurs_trouves = marqueurs
# 3. Heuristique CMA basée sur la racine CIM-10
if code and _is_heuristic_cma(code):
info.est_cma_probable = True
# Un diagnostic sévère avec un code CMA-probable = forte indication
if niveau == "severe" and info.est_cma_probable:
info.est_cma_probable = True
return info
def enrich_dossier_severity(dp, das_list: list) -> list[str]:
"""Enrichit les diagnostics d'un dossier avec les informations de sévérité.
Modifie les diagnostics en place (attributs est_cma, est_cms, niveau_severite).
Args:
dp: Diagnostic principal.
das_list: Liste des diagnostics associés.
Returns:
Liste d'alertes de sévérité générées.
"""
alertes = []
# Évaluer le DP
if dp and dp.cim10_suggestion:
info = evaluate_severity(dp)
dp.niveau_severite = info.niveau_severite
if info.est_cma_probable:
dp.est_cma = True
# Évaluer chaque DAS
cma_count = 0
for das in das_list:
if not das.cim10_suggestion:
continue
info = evaluate_severity(das)
das.niveau_severite = info.niveau_severite
if info.est_cma_probable:
das.est_cma = True
cma_count += 1
alertes.append(
f"CMA probable : '{das.texte}' ({das.cim10_suggestion}) — "
f"sévérité {info.niveau_severite}"
+ (f", marqueurs : {', '.join(info.marqueurs_trouves)}" if info.marqueurs_trouves else "")
)
if cma_count >= 2:
alertes.insert(0, f"{cma_count} CMA probables détectées — impact potentiel sur le niveau de sévérité GHM")
return alertes