chore: add .gitignore

This commit is contained in:
dom
2026-03-05 00:37:41 +01:00
parent 542797a124
commit 2578afb6ff
1716 changed files with 1905609 additions and 18 deletions

View File

@@ -0,0 +1,191 @@
"""Dictionnaire CCAM complet extrait depuis le fichier XLS officiel (CNAM).
Fournit un lookup intelligent avec normalisation Unicode pour la recherche
de codes CCAM à partir de textes d'actes médicaux en français.
"""
from __future__ import annotations
import json
import logging
import re
import unicodedata
from pathlib import Path
from typing import Optional
from ..config import CCAM_DICT_PATH
logger = logging.getLogger(__name__)
# Singleton : dictionnaire chargé une seule fois
_dict_cache: dict[str, dict] | None = None
# Cache des labels normalisés pour le substring matching
_normalized_cache: list[tuple[str, str, str]] | None = None
_CCAM_CODE_RE = re.compile(r"^[A-Z]{4}\d{3}$")
def normalize_text(text: str) -> str:
"""Normalise un texte : accent folding, lowercase, collapse whitespace."""
text = text.replace("\u2019", "'").replace("\u2018", "'").replace("\u02BC", "'")
nfkd = unicodedata.normalize("NFKD", text)
stripped = "".join(c for c in nfkd if unicodedata.category(c) != "Mn")
return re.sub(r"\s+", " ", stripped.lower()).strip()
def build_dict(source_path: str | Path) -> dict[str, dict]:
"""Construit le dictionnaire CCAM depuis un fichier XLS et l'écrit en JSON.
Format JSON : {code: {description, activite, tarif_s1, regroupement}}
Args:
source_path: Chemin vers le fichier XLS CCAM (ex: CCAM_V81.xls).
Returns:
Le dictionnaire code → infos.
"""
import xlrd
source_path = Path(source_path)
if not source_path.exists():
logger.error("Fichier XLS non trouvé : %s", source_path)
return {}
wb = xlrd.open_workbook(str(source_path))
sheet = wb.sheet_by_index(0)
result: dict[str, dict] = {}
for r in range(sheet.nrows):
code = str(sheet.cell_value(r, 0)).strip()
if not _CCAM_CODE_RE.match(code):
continue
description = str(sheet.cell_value(r, 2)).strip()
activite_raw = sheet.cell_value(r, 3)
activite = int(activite_raw) if isinstance(activite_raw, float) else None
tarif_raw = sheet.cell_value(r, 5)
tarif_s1 = round(tarif_raw, 2) if isinstance(tarif_raw, (int, float)) else None
regroupement = str(sheet.cell_value(r, 10)).strip() or None
result[code] = {
"description": description,
"activite": activite,
"tarif_s1": tarif_s1,
"regroupement": regroupement,
}
# Écrire le fichier JSON
CCAM_DICT_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(CCAM_DICT_PATH, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
logger.info("Dictionnaire CCAM généré : %d codes → %s", len(result), CCAM_DICT_PATH)
return result
def load_dict() -> dict[str, dict]:
"""Charge le dictionnaire CCAM (singleton lazy-loaded).
Si le fichier JSON n'existe pas, retourne un dict vide avec un warning.
"""
global _dict_cache
if _dict_cache is not None:
return _dict_cache
if CCAM_DICT_PATH.exists():
with open(CCAM_DICT_PATH, encoding="utf-8") as f:
_dict_cache = json.load(f)
else:
logger.warning("Dictionnaire CCAM absent : %s — lancez --build-ccam-dict", CCAM_DICT_PATH)
_dict_cache = {}
return _dict_cache
def _get_normalized_entries() -> list[tuple[str, str, str]]:
"""Retourne une liste de (code, description, description_normalisée) triée par longueur."""
global _normalized_cache
if _normalized_cache is not None:
return _normalized_cache
d = load_dict()
entries = []
for code, info in d.items():
desc = info.get("description", "") if isinstance(info, dict) else str(info)
norm = normalize_text(desc)
entries.append((code, desc, norm))
# Trier par longueur de description décroissante (plus spécifique d'abord)
entries.sort(key=lambda e: -len(e[2]))
_normalized_cache = entries
return _normalized_cache
def lookup(
text: str,
domain_overrides: dict[str, str] | None = None,
) -> str | None:
"""Recherche un code CCAM pour un texte donné.
Stratégie en 3 niveaux :
1. Match substring dans domain_overrides (prioritaire, ex: CCAM_MAP existant)
2. Match exact normalisé dans le dictionnaire complet
3. Match substring normalisé avec scoring par spécificité
Args:
text: Le texte de l'acte médical à rechercher.
domain_overrides: Dictionnaire terme→code prioritaire.
Returns:
Le code CCAM trouvé ou None.
"""
if not text:
return None
text_norm = normalize_text(text)
# Niveau 1 : domain overrides (substring match)
if domain_overrides:
for terme, code in domain_overrides.items():
if normalize_text(terme) in text_norm:
return code
entries = _get_normalized_entries()
# Niveau 2 : match exact normalisé
for code, _desc, norm_desc in entries:
if norm_desc == text_norm:
return code
# Niveau 3 : substring match normalisé (plus spécifique d'abord)
for code, _desc, norm_desc in entries:
if not norm_desc or len(norm_desc) < 4:
continue
if norm_desc in text_norm or text_norm in norm_desc:
return code
return None
def validate_code(code: str) -> tuple[bool, str]:
"""Vérifie si un code CCAM existe dans le dictionnaire.
Returns:
(is_valid, description) — description vide si invalide.
"""
d = load_dict()
if code in d:
info = d[code]
desc = info.get("description", "") if isinstance(info, dict) else str(info)
return True, desc
return False, ""
def reset_cache() -> None:
"""Réinitialise les caches (utile pour les tests)."""
global _dict_cache, _normalized_cache
_dict_cache = None
_normalized_cache = None

View File

@@ -0,0 +1,122 @@
"""Détection des incompatibilités de non-cumul entre actes CCAM.
Implémente 3 règles heuristiques basées sur les principes T2A :
1. Même code de base (7 caractères) avec activités différentes
2. Même regroupement chirurgical le même jour
3. Paires de regroupements incompatibles connues
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..config import ActeCCAM
logger = logging.getLogger(__name__)
# Regroupements chirurgicaux soumis à cumul restreint (un seul par jour)
REGROUPEMENT_UNIQUE_PAR_JOUR: set[str] = {
"ADC", # Actes de chirurgie
"ACO", # Actes de chirurgie orthopédique
"ADO", # Actes de chirurgie ORL
"ADA", # Actes de chirurgie abdominale/digestive
"ADE", # Actes de chirurgie endoscopique
}
# Paires de regroupements incompatibles
NONCUMUL_REGROUPEMENT_PAIRS: set[frozenset[str]] = {
frozenset({"ADC", "ADE"}),
frozenset({"ADC", "ADO"}),
frozenset({"ACO", "ADE"}),
}
def _get_regroupement(acte: ActeCCAM) -> str | None:
"""Récupère le regroupement d'un acte depuis le dictionnaire CCAM."""
if not acte.code_ccam_suggestion:
return None
try:
from .ccam_dict import load_dict
d = load_dict()
info = d.get(acte.code_ccam_suggestion)
if info and isinstance(info, dict):
return info.get("regroupement")
except Exception:
pass
return None
def check_noncumul(actes: list[ActeCCAM]) -> list[str]:
"""Vérifie les règles de non-cumul entre actes CCAM.
Args:
actes: Liste d'actes CCAM d'un dossier médical.
Returns:
Liste d'alertes de non-cumul détectées.
"""
if len(actes) < 2:
return []
alertes: list[str] = []
# Enrichir les actes avec leur regroupement
actes_info: list[tuple[ActeCCAM, str | None]] = [
(acte, _get_regroupement(acte)) for acte in actes
]
# Règle 1 : même code de base (7 premiers caractères), activités différentes
codes_base: dict[str, list[ActeCCAM]] = {}
for acte in actes:
code = acte.code_ccam_suggestion
if code and len(code) >= 7:
base = code[:7]
codes_base.setdefault(base, []).append(acte)
for base, group in codes_base.items():
if len(group) > 1:
codes_full = [a.code_ccam_suggestion for a in group]
alertes.append(
f"NON-CUMUL: codes de même base {base} avec variantes "
f"({', '.join(codes_full)}) — vérifier la facturation"
)
# Règle 2 : même regroupement chirurgical le même jour
regroup_par_jour: dict[tuple[str, str | None], list[ActeCCAM]] = {}
for acte, regroup in actes_info:
if regroup and regroup in REGROUPEMENT_UNIQUE_PAR_JOUR:
key = (regroup, acte.date)
regroup_par_jour.setdefault(key, []).append(acte)
for (regroup, date), group in regroup_par_jour.items():
if len(group) > 1:
codes = [a.code_ccam_suggestion or "?" for a in group]
jour = f" le {date}" if date else ""
alertes.append(
f"NON-CUMUL: {len(group)} actes du regroupement {regroup}{jour} "
f"({', '.join(codes)}) — cumul restreint"
)
# Règle 3 : paires de regroupements incompatibles
regroups_seen: list[tuple[str, ActeCCAM]] = [
(r, a) for a, r in actes_info if r
]
checked: set[frozenset[int]] = set()
for i, (r1, a1) in enumerate(regroups_seen):
for j, (r2, a2) in enumerate(regroups_seen):
if i >= j:
continue
pair_key = frozenset({i, j})
if pair_key in checked:
continue
checked.add(pair_key)
pair = frozenset({r1, r2})
if pair in NONCUMUL_REGROUPEMENT_PAIRS:
alertes.append(
f"NON-CUMUL: regroupements incompatibles {r1}/{r2} "
f"({a1.code_ccam_suggestion or '?'} + {a2.code_ccam_suggestion or '?'})"
)
return alertes

View File

@@ -0,0 +1,243 @@
"""Dictionnaire CIM-10 complet extrait depuis les métadonnées FAISS.
Fournit un lookup intelligent avec normalisation Unicode pour la recherche
de codes CIM-10 à partir de textes médicaux en français.
"""
from __future__ import annotations
import json
import logging
import re
import unicodedata
from pathlib import Path
from typing import Optional
from ..config import CIM10_DICT_PATH, CIM10_SUPPLEMENTS_PATH, RAG_INDEX_DIR
logger = logging.getLogger(__name__)
# Singleton : dictionnaire chargé une seule fois
_dict_cache: dict[str, str] | None = None
# Cache des labels normalisés pour le substring matching
_normalized_cache: list[tuple[str, str, str]] | None = None
def normalize_text(text: str) -> str:
"""Normalise un texte : accent folding, lowercase, collapse whitespace.
Utilise unicodedata pour supprimer les accents (NFD → suppression des
combining marks), puis met en minuscules et collapse les espaces multiples.
"""
# Normaliser les apostrophes Unicode → ASCII
text = text.replace("\u2019", "'").replace("\u2018", "'").replace("\u02BC", "'")
# NFD decomposition puis suppression des combining marks (accents)
nfkd = unicodedata.normalize("NFKD", text)
stripped = "".join(c for c in nfkd if unicodedata.category(c) != "Mn")
# Lowercase + collapse whitespace
return re.sub(r"\s+", " ", stripped.lower()).strip()
def build_dict() -> dict[str, str]:
"""Construit le dictionnaire CIM-10 depuis les métadonnées RAG.
Extrait le code et le label (première ligne de l'extrait, sans le préfixe code)
depuis chaque entrée CIM-10 du metadata.json existant.
Returns:
Le dictionnaire code → label.
"""
# Nouveau format : metadata_ref.json (fallback legacy : metadata.json)
metadata_path = RAG_INDEX_DIR / "metadata_ref.json"
if not metadata_path.exists():
legacy = RAG_INDEX_DIR / "metadata.json"
if legacy.exists():
metadata_path = legacy
else:
logger.error("Métadonnées RAG non trouvées : %s", metadata_path)
return {}
with open(metadata_path, encoding="utf-8") as f:
metadata = json.load(f)
result: dict[str, str] = {}
for entry in metadata:
if entry.get("document") != "cim10":
continue
code = entry.get("code")
extrait = entry.get("extrait", "")
if not code or not extrait:
continue
# Extraire le label : première ligne, sans le préfixe "CODE "
first_line = extrait.split("\n")[0].strip()
# Retirer le préfixe code (ex: "K85.1 Pancréatite aigüe...")
prefix = f"{code} "
if first_line.startswith(prefix):
label = first_line[len(prefix):]
else:
label = first_line
# Garder l'entrée la plus spécifique (avec point > sans point)
if code not in result or not label:
result[code] = label
# Écrire le fichier JSON
CIM10_DICT_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(CIM10_DICT_PATH, "w", encoding="utf-8") as f:
json.dump(result, f, ensure_ascii=False, indent=2)
logger.info("Dictionnaire CIM-10 généré : %d codes → %s", len(result), CIM10_DICT_PATH)
return result
def load_dict() -> dict[str, str]:
"""Charge le dictionnaire CIM-10 (singleton lazy-loaded).
Si le fichier JSON n'existe pas, tente de le construire depuis metadata.json.
Fusionne ensuite les suppléments (sous-codes manquants) sans écraser le dict principal.
"""
global _dict_cache
if _dict_cache is not None:
return _dict_cache
if CIM10_DICT_PATH.exists():
with open(CIM10_DICT_PATH, encoding="utf-8") as f:
_dict_cache = json.load(f)
else:
logger.info("Dictionnaire CIM-10 absent, construction depuis metadata.json...")
_dict_cache = build_dict()
# Fusionner les suppléments (ne remplace pas les entrées existantes)
if CIM10_SUPPLEMENTS_PATH.exists():
with open(CIM10_SUPPLEMENTS_PATH, encoding="utf-8") as f:
supplements = json.load(f)
added = 0
for code, label in supplements.items():
if code not in _dict_cache:
_dict_cache[code] = label
added += 1
if added:
logger.info("Suppléments CIM-10 : %d codes ajoutés depuis %s", added, CIM10_SUPPLEMENTS_PATH.name)
return _dict_cache
def _get_normalized_entries() -> list[tuple[str, str, str]]:
"""Retourne une liste de (code, label_original, label_normalisé) triée par spécificité.
Les codes avec point (sous-codes, plus spécifiques) sont en premier.
"""
global _normalized_cache
if _normalized_cache is not None:
return _normalized_cache
d = load_dict()
entries = []
for code, label in d.items():
norm = normalize_text(label)
entries.append((code, label, norm))
# Trier : sous-codes (avec point) d'abord, puis par longueur de label décroissante
# pour préférer les matchs les plus spécifiques
entries.sort(key=lambda e: (0 if "." in e[0] else 1, -len(e[2])))
_normalized_cache = entries
return _normalized_cache
def lookup(
text: str,
domain_overrides: dict[str, str] | None = None,
) -> str | None:
"""Recherche un code CIM-10 pour un texte donné.
Stratégie en 3 niveaux :
1. Match substring dans domain_overrides (prioritaire, ex: CIM10_MAP existant)
2. Match exact normalisé dans le dictionnaire complet
3. Match substring normalisé avec scoring par spécificité (préfère sous-codes)
Args:
text: Le texte médical à rechercher.
domain_overrides: Dictionnaire terme→code prioritaire (ex: CIM10_MAP).
Returns:
Le code CIM-10 trouvé ou None.
"""
if not text:
return None
text_norm = normalize_text(text)
# Niveau 1 : domain overrides (substring match)
if domain_overrides:
for terme, code in domain_overrides.items():
if normalize_text(terme) in text_norm:
return code
# Niveau 2 : match exact normalisé dans le dictionnaire complet
d = load_dict()
for code, label in d.items():
if normalize_text(label) == text_norm:
return code
# Niveau 3 : substring match normalisé (plus spécifique d'abord)
entries = _get_normalized_entries()
for code, _label, norm_label in entries:
if not norm_label or len(norm_label) < 4:
continue
if norm_label in text_norm:
return code
return None
def normalize_code(code: str) -> str:
"""Normalise un code CIM-10 : K810 → K81.0, k85.1 → K85.1."""
code = code.strip().upper()
# Insérer le point si absent : K810 → K81.0
if len(code) > 3 and "." not in code:
code = code[:3] + "." + code[3:]
return code
def validate_code(code: str) -> tuple[bool, str]:
"""Vérifie si un code CIM-10 existe dans le dictionnaire.
Returns:
(is_valid, label) — label vide si invalide.
"""
d = load_dict()
normalized = normalize_code(code)
if normalized in d:
return True, d[normalized]
# Tenter aussi le code brut (3 caractères sans point)
raw = code.upper().strip()
if raw in d:
return True, d[raw]
return False, ""
def fallback_parent_code(code: str) -> str | None:
"""Tente de corriger un code invalide en remontant au code parent.
Le LLM hallucine souvent des sous-codes (.8, .9) sur des codes
standalone à 3 caractères (ex: D71.9 → D71, R69.8 → R69).
Returns:
Le code parent valide, ou None si aucun fallback trouvé.
"""
normalized = normalize_code(code)
# Extraire le code parent (3 caractères avant le point)
if "." in normalized:
parent = normalized.split(".")[0]
is_valid, _ = validate_code(parent)
if is_valid:
return parent
return None
def reset_cache() -> None:
"""Réinitialise les caches (utile pour les tests)."""
global _dict_cache, _normalized_cache
_dict_cache = None
_normalized_cache = None

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,315 @@
"""Enrichissement du contexte clinique pour les prompts LLM.
Interprète les données brutes (biologie, traitements, séjour) en informations
cliniques structurées pour améliorer la qualité du codage CIM-10.
"""
from __future__ import annotations
from ..config import DossierMedical
from .cim10_extractor import BIO_NORMALS
# Seuils d'interprétation biologique (test → liste de (seuil, direction, interprétation))
# Ordre décroissant : le premier seuil franchi donne l'interprétation
BIO_INTERPRETATIONS: dict[str, list[tuple[float, str, str]]] = {
"CRP": [
(100, "high", "syndrome inflammatoire majeur"),
(20, "high", "syndrome inflammatoire modéré"),
(5, "high", "syndrome inflammatoire mineur"),
],
"Lipasémie": [
(180, "high", "pancréatite biologique (>3N)"),
(60, "high", "élévation modérée de la lipase"),
],
"ASAT": [
(200, "high", "cytolyse hépatique majeure (>5N)"),
(80, "high", "cytolyse hépatique modérée (>2N)"),
],
"ALAT": [
(200, "high", "cytolyse hépatique majeure (>5N)"),
(80, "high", "cytolyse hépatique modérée (>2N)"),
],
"Bilirubine totale": [
(50, "high", "ictère franc"),
(17, "high", "hyperbilirubinémie modérée"),
],
"Hémoglobine": [
(7, "low", "anémie sévère (transfusion probable)"),
(10, "low", "anémie modérée"),
],
"Créatinine": [
(300, "high", "insuffisance rénale sévère"),
(150, "high", "insuffisance rénale modérée"),
],
"Plaquettes": [
(50, "low", "thrombopénie sévère"),
(100, "low", "thrombopénie modérée"),
],
"Leucocytes": [
(20, "high", "hyperleucocytose majeure (infection, inflammation)"),
(2, "low", "leucopénie sévère (aplasie, immunodépression)"),
],
}
# Médicaments → condition implicite (clé en lowercase)
TREATMENT_INDICATORS: dict[str, str] = {
"insuline": "diabète insulino-traité",
"metformine": "diabète type 2",
"héparine": "anticoagulation (risque thromboembolique)",
"enoxaparine": "anticoagulation (HBPM)",
"lovenox": "anticoagulation (HBPM)",
"warfarine": "anticoagulation au long cours (AVK)",
"fluindione": "anticoagulation au long cours (AVK)",
"amoxicilline": "antibiothérapie",
"ceftriaxone": "antibiothérapie IV",
"tazocilline": "antibiothérapie large spectre IV",
"morphine": "analgésie palier 3 (douleur sévère)",
"oxycodone": "analgésie palier 3 (douleur sévère)",
"oxygène": "oxygénothérapie (insuffisance respiratoire)",
"furosémide": "insuffisance cardiaque / rétention hydrique",
"lasilix": "insuffisance cardiaque / rétention hydrique",
}
def interpret_bio_value(test: str, value_str: str, is_abnormal: bool | None) -> str | None:
"""Retourne l'interprétation clinique d'une valeur bio, ou None si normale."""
if test not in BIO_INTERPRETATIONS:
return None
try:
val = float(value_str.replace(",", ".").replace(" ", ""))
except (ValueError, AttributeError):
return None
# Si la valeur est normale (pas anormale), pas d'interprétation
if is_abnormal is False:
return None
thresholds = BIO_INTERPRETATIONS[test]
for seuil, direction, interpretation in thresholds:
if direction == "high" and val >= seuil:
return interpretation
if direction == "low" and val <= seuil:
return interpretation
return None
def detect_treatment_indicators(traitements: list) -> list[dict]:
"""Retourne les conditions implicites détectées via les traitements.
Args:
traitements: Liste d'objets Traitement ou de dicts avec clé 'medicament'.
Returns:
Liste de dicts {medicament, condition}.
"""
results = []
seen_conditions: set[str] = set()
for t in traitements:
med = t.medicament if hasattr(t, "medicament") else t.get("medicament", "")
med_lower = med.lower().strip()
for keyword, condition in TREATMENT_INDICATORS.items():
if keyword in med_lower and condition not in seen_conditions:
results.append({"medicament": med, "condition": condition})
seen_conditions.add(condition)
break
return results
def detect_severity_markers(dossier: DossierMedical) -> list[str]:
"""Détecte les marqueurs de sévérité globaux."""
markers = []
duree = dossier.sejour.duree_sejour
if duree is not None:
if duree > 14:
markers.append(f"séjour prolongé ({duree} jours)")
elif duree > 7:
markers.append(f"séjour >7 jours ({duree} jours)")
age = dossier.sejour.age
if age is not None:
if age >= 80:
markers.append(f"patient très âgé ({age} ans)")
elif age >= 70:
markers.append(f"patient âgé ({age} ans)")
imc = dossier.sejour.imc
if imc is not None:
if imc >= 40:
markers.append(f"obésité morbide (IMC {imc})")
elif imc >= 30:
markers.append(f"obésité (IMC {imc})")
if dossier.complications:
markers.append(f"{len(dossier.complications)} complication(s)")
return markers
def build_enriched_context(dossier: DossierMedical) -> dict:
"""Construit le contexte clinique enrichi (appel unique par dossier).
Returns:
Dict avec les clés : patient, duree_sejour, antecedents,
biologie (avec interprétations), imagerie, complications,
dp_texte, das_codes_existants, interpretations_bio,
conditions_traitements, marqueurs_severite.
"""
# Données de base (compatibles avec l'ancien format)
ctx: dict = {
"sexe": dossier.sejour.sexe,
"age": dossier.sejour.age,
"duree_sejour": dossier.sejour.duree_sejour,
"imc": dossier.sejour.imc,
"antecedents": [a.texte for a in dossier.antecedents[:5]],
"biologie_cle": [(b.test, b.valeur, b.anomalie) for b in dossier.biologie_cle],
"imagerie": [(i.type, (i.conclusion or "")[:200]) for i in dossier.imagerie],
"complications": [c.texte for c in dossier.complications],
}
# Interprétations biologiques
interpretations = []
for b in dossier.biologie_cle:
interp = interpret_bio_value(b.test, b.valeur or "", b.anomalie)
if interp:
# Ajouter l'unité si connue
unit = ""
if b.test in ("CRP",):
unit = " mg/L"
elif b.test in ("Lipasémie", "ASAT", "ALAT", "GGT", "PAL"):
unit = " UI/L"
elif b.test in ("Bilirubine totale", "Créatinine"):
unit = " µmol/L"
elif b.test in ("Hémoglobine",):
unit = " g/dL"
elif b.test in ("Plaquettes", "Leucocytes"):
unit = " G/L"
interpretations.append({
"test": b.test,
"valeur": f"{b.valeur}{unit}",
"interpretation": interp,
})
ctx["interpretations_bio"] = interpretations
# Conditions implicites via traitements
ctx["conditions_traitements"] = detect_treatment_indicators(dossier.traitements_sortie)
# Marqueurs de sévérité
ctx["marqueurs_severite"] = detect_severity_markers(dossier)
return ctx
def format_enriched_context(context: dict) -> str:
"""Formate le contexte enrichi en texte structuré pour le prompt.
Inclut les mêmes sections que l'ancien _format_contexte() PLUS :
interprétations bio, conditions implicites traitements, marqueurs sévérité.
"""
lines = []
# Patient
sexe = context.get("sexe")
age = context.get("age")
imc = context.get("imc")
patient_parts = []
if sexe:
patient_parts.append(sexe)
if age:
patient_parts.append(f"{age} ans")
if imc:
patient_parts.append(f"IMC {imc}")
if patient_parts:
lines.append(f"- Patient : {', '.join(str(p) for p in patient_parts)}")
# Durée de séjour
duree = context.get("duree_sejour")
if duree:
lines.append(f"- Durée séjour : {duree} jours")
# Antécédents
antecedents = context.get("antecedents")
if antecedents:
lines.append(f"- Antécédents : {', '.join(antecedents[:5])}")
# Biologie (avec normes)
biologie = context.get("biologie_cle")
if biologie:
bio_parts = []
for b in biologie:
test, valeur, anomalie = (
b if isinstance(b, (list, tuple))
else (b.get("test"), b.get("valeur"), b.get("anomalie"))
)
norme_str = ""
if test in BIO_NORMALS:
lo, hi = BIO_NORMALS[test]
lo_s = int(lo) if lo == int(lo) else lo
hi_s = int(hi) if hi == int(hi) else hi
norme_str = f" [N: {lo_s}-{hi_s}]"
marker = " (\u2191)" if anomalie else ""
bio_parts.append(f"{test} {valeur}{norme_str}{marker}")
lines.append(f"- Biologie : {', '.join(bio_parts)}")
# Imagerie
imagerie = context.get("imagerie")
if imagerie:
for img in imagerie:
img_type, conclusion = (
img if isinstance(img, (list, tuple))
else (img.get("type"), img.get("conclusion"))
)
if conclusion:
lines.append(f"- Imagerie : {img_type}{conclusion[:200]}")
# Complications
complications = context.get("complications")
if complications:
lines.append(f"- Complications : {', '.join(complications)}")
# DP du séjour
dp_texte = context.get("dp_texte")
if dp_texte:
lines.append(f"- DP du séjour : {dp_texte}")
# DAS déjà codés
das_codes = context.get("das_codes_existants")
if das_codes:
lines.append(f"- DAS déjà codés : {', '.join(das_codes)}")
# --- Sections enrichies ---
# Interprétations biologiques
interpretations = context.get("interpretations_bio", [])
if interpretations:
interp_parts = [
f"{i['test']} {i['valeur']} \u2192 {i['interpretation']}"
for i in interpretations
]
lines.append(f"\nINTERPRÉTATION CLINIQUE :")
lines.append(f"- Biologie : {' ; '.join(interp_parts)}")
# Conditions implicites via traitements
conditions = context.get("conditions_traitements", [])
if conditions:
cond_parts = [
f"{c['medicament']} \u2192 {c['condition']}"
for c in conditions
]
if not interpretations:
lines.append(f"\nINTERPRÉTATION CLINIQUE :")
lines.append(f"- Traitements indicatifs : {' ; '.join(cond_parts)}")
# Marqueurs de sévérité
marqueurs = context.get("marqueurs_severite", [])
if marqueurs:
if not interpretations and not conditions:
lines.append(f"\nINTERPRÉTATION CLINIQUE :")
lines.append(f"- Marqueurs de sévérité : {', '.join(marqueurs)}")
return "\n".join(lines) if lines else "Non précisé"

View File

@@ -0,0 +1,152 @@
"""Filtrage des diagnostics associés parasites (artefacts OCR trackare)."""
import re
import unicodedata
# Corrections de codes CIM-10 systématiquement mal attribués par le LLM
# D55.9 (anémie enzymatique) est proposé pour "Anémie" non qualifiée → D64.9
CODE_CORRECTIONS: dict[str, dict] = {
"D55.9": {
"correct_code": "D64.9",
"condition_texte": r"^an[ée]mie$", # uniquement si texte = "Anémie" seul
"reason": "Anémie non qualifiée → D64.9 (sans précision), pas D55.9 (enzymatique)",
},
}
def clean_diagnostic_text(text: str) -> str:
"""Nettoie un texte de diagnostic (newlines, ponctuation trailing, espaces)."""
text = text.replace("\n", " ")
text = re.sub(r"\s+", " ", text).strip()
text = text.rstrip(",.;:!")
return text
def is_valid_diagnostic_text(text: str) -> bool:
"""Retourne True si le texte ressemble à un diagnostic médical légitime."""
t = text.strip()
# 1. Trop court
if len(t) < 3:
return False
# 2. Chiffres purs (>= 50% de chiffres)
digits = sum(c.isdigit() for c in t)
if digits >= len(t) * 0.5:
return False
# 3. Lettre + chiffres OCR : "H 51", "À 08", "H\n10", "K 3.6", "B 12,5"
if re.match(r"^[A-ZÀ-Ú]\s*\d{1,3}([.,]\d+)?$", t):
return False
# 4. Mots concaténés et/ou répétés avec espaces : "VentilationVentilation Ventilation..."
if re.match(r"^([a-zà-ÿ]{3,})(\s*\1)+\s*$", t, re.IGNORECASE):
return False
# 5. Mots répétés : tous identiques ("Absence absence", "Anticoagulant anticoagulant")
# ou ≥ 3 occurrences du même mot
words = t.lower().split()
if len(words) >= 2:
if len(set(words)) == 1:
return False
from collections import Counter
counts = Counter(words)
if counts.most_common(1)[0][1] >= 3:
return False
# 6. Fragments non-médicaux
if re.match(r"^(De |Du |Des |]\s)", t):
return False
if t in {"Isolement", "Pp 500"}:
return False
# 7. Ponctuation initiale (artefacts OCR) : ", sans précision"
if re.match(r'^[,.\-;:!)\]]\s', t):
return False
# 8. Pattern "À X.X" / "A X.X" (valeurs numériques OCR)
if re.match(r'^[ÀA]\s+\d+([.,]\d+)?$', t):
return False
# 9. Crochets (artefacts OCR) : "Episode [episode"
if '[' in t or ']' in t:
return False
# 10. Termes de laboratoire isolés (un seul mot ≠ diagnostic)
_LAB_TERMS = {"hémoglobine", "créatinine", "plaquettes", "leucocytes", "glycémie",
"natrémie", "kaliémie", "calcémie", "bilirubine", "albumine",
"fibrinogène", "hématocrite", "cétonurie", "glycosurie"}
if t.lower() in _LAB_TERMS:
return False
# 11. Fragments anatomiques courts sans pathologie : "Dans la vessie", "Le rein"
if re.match(r'^(Dans |La |Le |Les |Au |Aux )', t) and len(t) < 30:
return False
# 12. En-têtes de systèmes anatomiques (catégories sans pathologie)
_ANATOMICAL_HEADERS = {
"musculaire", "squelettique", "cardiovasculaire", "pulmonaire",
"neurologique", "digestif", "digestive", "hépatique", "rénal",
"rénale", "urinaire", "cutané", "cutanée", "articulaire",
"osseux", "osseuse", "gastrique", "intestinal", "intestinale",
"cérébral", "thoracique", "abdominal", "abdominale",
}
if len(words) == 1 and t.lower() in _ANATOMICAL_HEADERS:
return False
# Catégorie + description vague : "Musculaire - masse musculaire"
if re.match(r'^[A-ZÀ-Ú][a-zà-ÿ]+ - (masse|zone|région|état|bilan)', t, re.IGNORECASE):
return False
return True
# Paires de redondance sémantique CIM-10 en PMSI
# Format: (dominated_prefix, dominant_prefixes)
# Si un code commençant par dominated_prefix ET un code commençant par un dominant_prefix
# sont tous deux en DAS, le dominated est supprimé.
SEMANTIC_REDUNDANCIES: list[tuple[str, list[str]]] = [
# I10 (HTA essentielle) redondant si I11/I12/I13 présent (cardio/néphropathie hypertensive)
("I10", ["I11", "I12", "I13"]),
# N30 (cystite) redondant si N39.0 présent (infection urinaire)
("N30", ["N39"]),
# J18 (pneumonie SAI) redondant si J15/J16 présent (pneumonie spécifique)
("J18", ["J15", "J16"]),
]
def apply_semantic_dedup(das_list: list) -> list:
"""Retire les DAS rendus redondants par la présence d'un code plus spécifique.
Utilise SEMANTIC_REDUNDANCIES pour déterminer les paires dominé/dominant.
Accepte une liste de Diagnostic (avec attribut cim10_suggestion).
"""
codes_present = {d.cim10_suggestion for d in das_list if d.cim10_suggestion}
to_remove: set[str] = set()
for dominated_prefix, dominant_prefixes in SEMANTIC_REDUNDANCIES:
dominated_codes = [c for c in codes_present if c.startswith(dominated_prefix)]
if not dominated_codes:
continue
has_dominant = any(
c.startswith(dp) for c in codes_present for dp in dominant_prefixes
)
if has_dominant:
to_remove.update(dominated_codes)
if not to_remove:
return das_list
return [d for d in das_list if d.cim10_suggestion not in to_remove]
def correct_known_miscodes(code: str, texte: str) -> str | None:
"""Corrige les codes CIM-10 systématiquement mal attribués par le LLM.
Returns:
Le code corrigé, ou None si pas de correction nécessaire.
"""
correction = CODE_CORRECTIONS.get(code)
if not correction:
return None
if re.match(correction["condition_texte"], texte.strip(), re.IGNORECASE):
return correction["correct_code"]
return None

View File

@@ -0,0 +1,140 @@
"""Pipeline edsnlp pour l'extraction médicale (CIM-10, médicaments, négation)."""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Optional
logger = logging.getLogger(__name__)
_nlp = None
_available = None
@dataclass
class CIM10Entity:
texte: str
code: str
negation: bool = False
hypothese: bool = False
@dataclass
class DrugEntity:
texte: str
code_atc: Optional[str] = None
negation: bool = False
@dataclass
class DateEntity:
texte: str
value: Optional[str] = None
@dataclass
class EdsnlpResult:
cim10_entities: list[CIM10Entity] = field(default_factory=list)
drug_entities: list[DrugEntity] = field(default_factory=list)
date_entities: list[DateEntity] = field(default_factory=list)
def is_available() -> bool:
"""Vérifie si edsnlp est installé et utilisable."""
global _available
if _available is not None:
return _available
try:
import edsnlp # noqa: F401
_available = True
except ImportError:
_available = False
return _available
def get_pipeline():
"""Retourne le pipeline edsnlp (singleton lazy-loaded)."""
global _nlp
if _nlp is not None:
return _nlp
if not is_available():
raise RuntimeError("edsnlp n'est pas installé")
import edsnlp
logger.info("Initialisation du pipeline edsnlp...")
nlp = edsnlp.blank("eds")
nlp.add_pipe("eds.normalizer")
nlp.add_pipe("eds.sentences")
nlp.add_pipe("eds.cim10", config=dict(attr="NORM", term_matcher="simstring"))
nlp.add_pipe("eds.drugs", config=dict(attr="NORM", term_matcher="exact"))
nlp.add_pipe("eds.negation")
nlp.add_pipe("eds.hypothesis")
nlp.add_pipe("eds.dates")
_nlp = nlp
logger.info("Pipeline edsnlp initialisé avec succès")
return _nlp
def analyze(text: str) -> EdsnlpResult:
"""Analyse un texte médical avec edsnlp.
Retourne les entités CIM-10, médicaments et dates détectées.
"""
result = EdsnlpResult()
if not is_available():
return result
try:
nlp = get_pipeline()
doc = nlp(text)
except Exception:
logger.exception("Erreur lors de l'analyse edsnlp")
return result
for ent in doc.ents:
negation = getattr(ent._, "negation", False) or False
hypothese = getattr(ent._, "hypothesis", False) or False
if ent.label_ == "cim10":
code = ent.kb_id_ or ""
if code:
result.cim10_entities.append(CIM10Entity(
texte=ent.text,
code=code,
negation=negation,
hypothese=hypothese,
))
elif ent.label_ == "drug":
code_atc = ent.kb_id_ or None
result.drug_entities.append(DrugEntity(
texte=ent.text,
code_atc=code_atc,
negation=negation,
))
# Dates
for span in doc.spans.get("dates", []):
date_value = None
if hasattr(span._, "date"):
date_obj = span._.date
if date_obj is not None:
date_value = str(date_obj)
result.date_entities.append(DateEntity(
texte=span.text,
value=date_value,
))
return result
def reset():
"""Réinitialise le pipeline (utile pour les tests)."""
global _nlp, _available
_nlp = None
_available = None

View File

@@ -0,0 +1,169 @@
"""Règles d'exclusion diagnostique : symptôme (Chapitre XVIII) vs diagnostic précis.
Lorsqu'un symptôme (R00-R99) et un diagnostic précis (Chapitres I-XIV, A00-N99)
coexistent et que le symptôme est expliqué par le diagnostic précis, le symptôme
ne doit PAS être codé comme DAS (règle ATIH de non-redondance).
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
def is_symptom_code(code: str) -> bool:
"""Vérifie si un code CIM-10 appartient au Chapitre XVIII (R00-R99 = Symptômes)."""
if not code:
return False
return bool(re.match(r"^R\d{2}", code, re.IGNORECASE))
def is_precise_diagnosis(code: str) -> bool:
"""Vérifie si un code CIM-10 appartient aux Chapitres I-XIV (A00-N99)."""
if not code:
return False
return bool(re.match(r"^[A-N]\d{2}", code, re.IGNORECASE))
# Mapping R-code → set de codes précis qui excluent le symptôme.
# Chaque R-code est exclu si l'un des codes précis (ou un code commençant par
# l'une de ces racines) est présent parmi les diagnostics du séjour.
EXCLUSION_MAP: dict[str, set[str]] = {
# R10 — Douleur abdominale → exclu par pathologies digestives précises
"R10": {"K35", "K80", "K81", "K83", "K85", "K86", "K56", "K57", "K25", "K26", "K29"},
"R10.1": {"K80", "K81", "K83"}, # Douleur hypochondre droit
"R10.3": {"K35", "K36", "K37"}, # Douleur hypogastre
"R10.4": {"K35", "K80", "K85", "K56", "K57"}, # Douleur abdominale autre/non précisée
# R11 — Nausées et vomissements
"R11": {"K29", "K80", "K81", "K85", "K56", "K91"},
# R17 — Ictère → exclu par pathologies hépatobiliaires
"R17": {"K80", "K83", "K70", "K71", "K72", "K73", "K74", "B15", "B16", "B17", "B18", "B19", "C22"},
# R50 — Fièvre → exclu par infections précises
"R50": {"A41", "J18", "J15", "J13", "J14", "J06", "N10", "N39", "K81", "K83",
"L03", "T81", "A09", "A04"},
"R50.9": {"A41", "J18", "J15", "J13", "J14", "N10", "N39", "K81"},
# R07 — Douleur thoracique → exclu par pathologies cardiaques/pulmonaires
"R07": {"I20", "I21", "I22", "I23", "I24", "I25", "I26", "J18", "J93"},
"R07.4": {"I20", "I21", "I24", "I25"},
# R06 — Dyspnée → exclu par pathologies respiratoires/cardiaques
"R06": {"J18", "J44", "J45", "J96", "I50", "I26"},
"R06.0": {"J18", "J44", "J45", "J96", "I50", "I26"},
# R31 — Hématurie → exclu par pathologies urologiques/rénales
"R31": {"N20", "N13", "C64", "C67", "N02", "N00", "N01"},
# R04 — Hémoptysie → exclu par pathologies pulmonaires
"R04": {"J18", "C34", "I26", "A16"},
# R63.4 — Perte de poids → exclu par tumeurs, infections chroniques
"R63.4": {"C15", "C16", "C18", "C19", "C20", "C22", "C25", "C34", "C50",
"A15", "A16", "B20", "B21", "B22", "B23", "B24", "E46"},
# R00 — Anomalies du rythme cardiaque → exclu par troubles du rythme précis
"R00": {"I47", "I48", "I49"},
"R00.0": {"I47", "I48"}, # Tachycardie
"R00.1": {"I49.5", "I49.8"}, # Bradycardie
}
def _code_matches(code: str, roots: set[str]) -> bool:
"""Vérifie si un code CIM-10 commence par l'une des racines données."""
if not code:
return False
code_upper = code.upper()
for root in roots:
if code_upper.startswith(root.upper()):
return True
return False
@dataclass
class ExclusionResult:
"""Résultat de l'application des règles d'exclusion."""
cleaned_das: list # Diagnostics DAS conservés
excluded: list # Diagnostics DAS exclus
warnings: list[str] = field(default_factory=list)
def check_exclusions(dp, das_list: list) -> ExclusionResult:
"""Applique les règles d'exclusion symptôme vs diagnostic précis.
Args:
dp: Diagnostic principal (objet avec attribut cim10_suggestion).
das_list: Liste des diagnostics associés (même type).
Returns:
ExclusionResult avec les DAS nettoyés, exclus, et les warnings.
"""
# Collecter tous les codes du séjour (DP + DAS)
all_codes: list[str] = []
if dp and dp.cim10_suggestion:
all_codes.append(dp.cim10_suggestion)
for das in das_list:
if das.cim10_suggestion:
all_codes.append(das.cim10_suggestion)
# Identifier les codes précis présents (Chapitres I-XIV)
precise_codes = [c for c in all_codes if is_precise_diagnosis(c)]
cleaned = []
excluded = []
warnings = []
for das in das_list:
code = das.cim10_suggestion
if not code or not is_symptom_code(code):
# Non-symptôme : toujours conservé
cleaned.append(das)
continue
# Vérifier si ce symptôme est exclu par un diagnostic précis
should_exclude = False
excluding_code = None
# Chercher dans EXCLUSION_MAP : d'abord le code exact, puis la racine (3 chars)
exclusion_roots = EXCLUSION_MAP.get(code.upper())
if exclusion_roots is None:
# Essayer la racine 3 caractères (ex: R10.4 → R10)
root3 = code.upper()[:3]
exclusion_roots = EXCLUSION_MAP.get(root3)
if exclusion_roots:
for precise in precise_codes:
if _code_matches(precise, exclusion_roots):
should_exclude = True
excluding_code = precise
break
if should_exclude:
excluded.append(das)
warnings.append(
f"DAS '{das.texte}' ({code}) exclu : symptôme redondant avec "
f"le diagnostic précis {excluding_code}"
)
else:
cleaned.append(das)
# Vérifier aussi si le DP est un symptôme avec un diagnostic précis en DAS
if dp and dp.cim10_suggestion and is_symptom_code(dp.cim10_suggestion):
dp_code = dp.cim10_suggestion
exclusion_roots = EXCLUSION_MAP.get(dp_code.upper())
if exclusion_roots is None:
exclusion_roots = EXCLUSION_MAP.get(dp_code.upper()[:3])
if exclusion_roots:
for precise in precise_codes:
if _code_matches(precise, exclusion_roots):
warnings.append(
f"ALERTE DP : le DP '{dp.texte}' ({dp_code}) est un symptôme "
f"alors qu'un diagnostic précis {precise} est présent — "
f"vérifier si le DP devrait être changé"
)
break
return ExclusionResult(cleaned_das=cleaned, excluded=excluded, warnings=warnings)

View File

@@ -0,0 +1,294 @@
"""Fusion de dossiers médicaux multi-PDFs pour un même patient.
Combine les informations de plusieurs documents (Trackare, CRH, CRO) en un
dossier unique avec des règles de priorité et de déduplication.
"""
from __future__ import annotations
import logging
from ..config import (
ActeCCAM,
Antecedent,
BiologieCle,
Complication,
Diagnostic,
DossierMedical,
Imagerie,
Sejour,
Traitement,
)
from ..medical.das_filter import is_valid_diagnostic_text, apply_semantic_dedup
from ..medical.cim10_extractor import _is_dp_family_redundant
logger = logging.getLogger(__name__)
# Priorité des types de documents pour les données de séjour
_DOC_PRIORITY = {"trackare": 0, "crh": 1, "cro": 2}
def _cim10_specificity(code: str | None) -> int:
"""Score de spécificité d'un code CIM-10 : longueur sans le point."""
if not code:
return 0
return len(code.replace(".", ""))
def _prefer_most_specific_dp(dossiers: list[DossierMedical]) -> Diagnostic | None:
"""Sélectionne le DP le plus spécifique parmi tous les dossiers."""
candidates: list[tuple[Diagnostic, int]] = []
for d in dossiers:
if d.diagnostic_principal:
spec = _cim10_specificity(d.diagnostic_principal.cim10_suggestion)
candidates.append((d.diagnostic_principal, spec))
if not candidates:
return None
# Tri : spécificité décroissante, puis confiance (high > medium > low)
conf_order = {"high": 0, "medium": 1, "low": 2}
candidates.sort(
key=lambda x: (-x[1], conf_order.get(x[0].cim10_confidence or "", 3))
)
return candidates[0][0]
def _merge_sejour(dossiers: list[DossierMedical]) -> Sejour:
"""Fusionne les informations de séjour avec priorité Trackare > CRH > CRO."""
# Trier par priorité de type de document
sorted_dossiers = sorted(
dossiers,
key=lambda d: _DOC_PRIORITY.get(d.document_type, 99),
)
merged = Sejour()
for d in sorted_dossiers:
s = d.sejour
if s.sexe and not merged.sexe:
merged.sexe = s.sexe
if s.age is not None and merged.age is None:
merged.age = s.age
if s.date_entree and not merged.date_entree:
merged.date_entree = s.date_entree
if s.date_sortie and not merged.date_sortie:
merged.date_sortie = s.date_sortie
if s.duree_sejour is not None and merged.duree_sejour is None:
merged.duree_sejour = s.duree_sejour
if s.mode_entree and not merged.mode_entree:
merged.mode_entree = s.mode_entree
if s.mode_sortie and not merged.mode_sortie:
merged.mode_sortie = s.mode_sortie
if s.imc is not None and merged.imc is None:
merged.imc = s.imc
if s.poids is not None and merged.poids is None:
merged.poids = s.poids
if s.taille is not None and merged.taille is None:
merged.taille = s.taille
return merged
def _is_enriched(d: Diagnostic) -> bool:
"""Retourne True si le diagnostic a une justification RAG."""
return bool(d.justification or d.sources_rag)
def _dedup_diagnostics(all_das: list[Diagnostic]) -> list[Diagnostic]:
"""Déduplique les diagnostics associés par code CIM-10, garde la meilleure confiance."""
conf_order = {"high": 0, "medium": 1, "low": 2}
seen: dict[str | None, Diagnostic] = {}
for d in all_das:
key = d.cim10_suggestion
if key is None:
# Sans code, dédup par texte normalisé
key = f"__text__{d.texte.lower().strip()}"
if key not in seen:
seen[key] = d
else:
existing = seen[key]
new_conf = conf_order.get(d.cim10_confidence or "", 3)
old_conf = conf_order.get(existing.cim10_confidence or "", 3)
# Garder celui avec la meilleure confiance, ou à confiance égale celui enrichi
if new_conf < old_conf or (new_conf == old_conf and _is_enriched(d) and not _is_enriched(existing)):
seen[key] = d
# Supprimer les codes parents quand un code plus spécifique existe
# Ex: K85 retiré si K85.9 présent (K85 est préfixe strict de K859)
codes = {k for k in seen if k and not k.startswith("__text__")}
normalized = {c: c.replace(".", "") for c in codes}
parents_to_remove: set[str] = set()
for code_a in codes:
norm_a = normalized[code_a]
for code_b in codes:
if code_a == code_b:
continue
norm_b = normalized[code_b]
if norm_b.startswith(norm_a) and len(norm_b) > len(norm_a):
parents_to_remove.add(code_a)
break
for parent in parents_to_remove:
del seen[parent]
return list(seen.values())
def _dedup_actes(all_actes: list[ActeCCAM]) -> list[ActeCCAM]:
"""Déduplique les actes CCAM par code."""
seen: dict[str | None, ActeCCAM] = {}
for a in all_actes:
key = a.code_ccam_suggestion
if key is None:
key = f"__text__{a.texte.lower().strip()}"
if key not in seen:
seen[key] = a
else:
existing = seen[key]
# Garder celui avec date si possible
if a.date and not existing.date:
seen[key] = a
return list(seen.values())
def merge_dossiers(dossiers: list[DossierMedical]) -> DossierMedical:
"""Fusionne plusieurs dossiers médicaux d'un même patient.
Args:
dossiers: Liste de DossierMedical issus de PDFs différents.
Returns:
Un DossierMedical fusionné.
"""
if len(dossiers) == 1:
result = dossiers[0].model_copy(deep=True)
result.source_files = [result.source_file]
# Appliquer la dédup famille DP + sémantique même pour un seul dossier
dp_code = result.diagnostic_principal.cim10_suggestion if result.diagnostic_principal else None
if dp_code:
result.diagnostics_associes = [
d for d in result.diagnostics_associes
if not d.cim10_suggestion or not _is_dp_family_redundant(d.cim10_suggestion, dp_code)
]
result.diagnostics_associes = apply_semantic_dedup(result.diagnostics_associes)
return result
merged = DossierMedical()
# Source files
merged.source_files = [d.source_file for d in dossiers if d.source_file]
# Séjour
merged.sejour = _merge_sejour(dossiers)
# Diagnostic principal : le plus spécifique
merged.diagnostic_principal = _prefer_most_specific_dp(dossiers)
# Collecter tous les DAS + DP non retenus comme DAS
all_das: list[Diagnostic] = []
for d in dossiers:
all_das.extend(d.diagnostics_associes)
# Si le DP de ce dossier est différent du DP fusionné, l'ajouter comme DAS
# mais seulement si le texte est un diagnostic valide (filtre artefacts OCR)
if (
d.diagnostic_principal
and merged.diagnostic_principal
and d.diagnostic_principal.cim10_suggestion
!= merged.diagnostic_principal.cim10_suggestion
and is_valid_diagnostic_text(d.diagnostic_principal.texte)
):
all_das.append(d.diagnostic_principal)
merged.diagnostics_associes = _dedup_diagnostics(all_das)
# Retirer les DAS redondants avec le DP (même code, famille, parent/enfant)
dp_code = merged.diagnostic_principal.cim10_suggestion if merged.diagnostic_principal else None
if dp_code:
merged.diagnostics_associes = [
d for d in merged.diagnostics_associes
if not d.cim10_suggestion or not _is_dp_family_redundant(d.cim10_suggestion, dp_code)
]
# Redondances sémantiques entre DAS
merged.diagnostics_associes = apply_semantic_dedup(merged.diagnostics_associes)
# Actes CCAM
all_actes: list[ActeCCAM] = []
for d in dossiers:
all_actes.extend(d.actes_ccam)
merged.actes_ccam = _dedup_actes(all_actes)
# Biologie : union, dédup par (test, valeur)
bio_seen: set[tuple[str, str | None]] = set()
for d in dossiers:
for b in d.biologie_cle:
key = (b.test, b.valeur)
if key not in bio_seen:
merged.biologie_cle.append(b)
bio_seen.add(key)
# Imagerie : union, dédup par (type, conclusion)
img_seen: set[tuple[str, str | None]] = set()
for d in dossiers:
for i in d.imagerie:
key = (i.type, i.conclusion)
if key not in img_seen:
merged.imagerie.append(i)
img_seen.add(key)
# Traitements : union, dédup par médicament (normalisé)
med_seen: set[str] = set()
for d in dossiers:
for t in d.traitements_sortie:
key = t.medicament.lower().strip()
if key not in med_seen:
merged.traitements_sortie.append(t)
med_seen.add(key)
# Antécédents : union, dédup par texte normalisé
ant_seen: set[str] = set()
for d in dossiers:
for a in d.antecedents:
key = a.texte.lower().strip()
if key not in ant_seen:
merged.antecedents.append(a)
ant_seen.add(key)
# Complications : union, dédup par texte normalisé
comp_seen: set[str] = set()
for d in dossiers:
for c in d.complications:
key = c.texte.lower().strip()
if key not in comp_seen:
merged.complications.append(c)
comp_seen.add(key)
# Alertes : alerte de fusion en tête + union
merged.alertes_codage = [f"FUSION: {len(dossiers)} documents fusionnés"]
alert_seen: set[str] = set()
for d in dossiers:
for a in d.alertes_codage:
if a not in alert_seen:
merged.alertes_codage.append(a)
alert_seen.add(a)
# Document type : le type prioritaire
sorted_by_prio = sorted(
dossiers,
key=lambda d: _DOC_PRIORITY.get(d.document_type, 99),
)
merged.document_type = sorted_by_prio[0].document_type
logger.info(
"Fusion de %d dossiers : DP=%s, %d DAS, %d actes",
len(dossiers),
merged.diagnostic_principal.cim10_suggestion if merged.diagnostic_principal else "aucun",
len(merged.diagnostics_associes),
len(merged.actes_ccam),
)
return merged

View File

@@ -0,0 +1,225 @@
"""Estimation heuristique du GHM (Groupe Homogène de Malades).
L'algorithme officiel (ATIH FG-MCO) est propriétaire. Ce module fournit une
estimation approximative utile comme pré-codage / aide au DIM :
1. CMD depuis le DP (table de plages CIM-10)
2. Type de prise en charge depuis les actes CCAM
3. Sévérité depuis les CMA/CMS
4. Construction du code GHM approximatif
"""
from __future__ import annotations
import bisect
from typing import Optional
from ..config import DossierMedical, GHMEstimation
# ---------------------------------------------------------------------------
# Table CIM-10 → CMD (Catégorie Majeure de Diagnostic)
# Triée par borne inférieure pour lookup par bisect.
# Format : (debut, fin, cmd, libelle)
# ---------------------------------------------------------------------------
_CMD_RANGES: list[tuple[str, str, str, str]] = [
("A00", "A99", "18", "Maladies infectieuses et parasitaires"),
("B00", "B19", "18", "Maladies infectieuses et parasitaires"),
("B20", "B24", "25", "Maladies dues au VIH"),
("B25", "B99", "18", "Maladies infectieuses et parasitaires"),
("C00", "C97", "17", "Tumeurs malignes"),
("D00", "D09", "17", "Tumeurs malignes"),
("D10", "D48", "16", "Tumeurs bénignes, hémopathies"),
("D50", "D89", "16", "Tumeurs bénignes, hémopathies"),
("E00", "E07", "10", "Maladies endocriniennes"),
("E10", "E14", "10", "Maladies endocriniennes"),
("E15", "E46", "10", "Maladies endocriniennes"),
("E47", "E90", "10", "Maladies endocriniennes"),
("F00", "F09", "19", "Maladies mentales"),
("F10", "F19", "20", "Troubles mentaux liés à l'alcool et aux toxiques"),
("F20", "F99", "19", "Maladies mentales"),
("G00", "G99", "01", "Affections du système nerveux"),
("H00", "H59", "02", "Affections de l'oeil"),
("H60", "H95", "03", "Affections ORL"),
("I00", "I99", "05", "Affections de l'appareil circulatoire"),
("J00", "J99", "04", "Affections de l'appareil respiratoire"),
("K00", "K67", "06", "Affections du tube digestif"),
("K70", "K87", "07", "Affections hépatobiliaires et pancréatiques"),
("K90", "K93", "06", "Affections du tube digestif"),
("L00", "L99", "09", "Affections de la peau"),
("M00", "M99", "08", "Affections du système ostéo-articulaire"),
("N00", "N39", "11", "Affections du rein et des voies urinaires"),
("N40", "N51", "12", "Affections de l'appareil génital masculin"),
("N60", "N98", "13", "Affections de l'appareil génital féminin"),
("N99", "N99", "11", "Affections du rein et des voies urinaires"),
("O00", "O99", "14", "Grossesses, accouchements, post-partum"),
("P00", "P96", "15", "Nouveau-nés, période périnatale"),
("Q00", "Q99", "15", "Nouveau-nés, période périnatale"),
("R00", "R99", "23", "Facteurs influençant l'état de santé (symptômes)"),
("S00", "S99", "21", "Traumatismes"),
("T00", "T19", "21", "Traumatismes"),
("T20", "T32", "22", "Brûlures"),
("T33", "T98", "21", "Traumatismes"),
("U00", "U99", "26", "Catégories spéciales"),
("V00", "Y98", "24", "Causes externes"),
("Z00", "Z99", "23", "Facteurs influençant l'état de santé"),
]
# Pré-calcul : liste triée des bornes inférieures pour bisect
_CMD_STARTS = [r[0] for r in _CMD_RANGES]
def find_cmd(code_cim10: str) -> tuple[Optional[str], Optional[str]]:
"""Trouve la CMD correspondant à un code CIM-10.
Returns:
(cmd, libelle) ou (None, None) si non trouvé.
"""
if not code_cim10:
return None, None
# Normaliser : majuscules, retirer le point
code = code_cim10.upper().replace(".", "").strip()
if len(code) < 3:
return None, None
# Prendre les 3 premiers caractères pour le lookup
code3 = code[:3]
# bisect pour trouver la plage candidate
idx = bisect.bisect_right(_CMD_STARTS, code3) - 1
if idx < 0:
return None, None
debut, fin, cmd, libelle = _CMD_RANGES[idx]
if debut <= code3 <= fin:
return cmd, libelle
return None, None
# ---------------------------------------------------------------------------
# Préfixes CCAM classants (chirurgicaux)
# Les codes CCAM commençant par ces lettres correspondent à des organes
# et sont considérés chirurgicaux quand ils désignent un acte opératoire.
# ---------------------------------------------------------------------------
_CCAM_CHIRURGICAL_PREFIXES = {"H", "J", "K", "L", "N", "P", "Q"}
# Préfixes interventionnels (imagerie, endoscopie)
_CCAM_INTERVENTIONNEL_PREFIXES = {"Z", "Y"}
def _detect_type_ghm(actes_ccam: list) -> str:
"""Détermine le type de prise en charge depuis les actes CCAM.
Returns:
"C" (chirurgical), "K" (interventionnel) ou "M" (médical).
"""
has_chirurgical = False
has_interventionnel = False
for acte in actes_ccam:
code = acte.code_ccam_suggestion
if not code or len(code) < 4:
continue
prefix = code[0].upper()
if prefix in _CCAM_CHIRURGICAL_PREFIXES:
has_chirurgical = True
break
if prefix in _CCAM_INTERVENTIONNEL_PREFIXES:
has_interventionnel = True
if has_chirurgical:
return "C"
if has_interventionnel:
return "K"
return "M"
def _compute_severity(das_list: list) -> tuple[int, int, int]:
"""Calcule le niveau de sévérité à partir des DAS.
Utilise le max des niveau_cma officiels ATIH quand disponibles,
avec fallback sur le comptage CMA/CMS.
Returns:
(niveau, cma_count, cms_count)
"""
cma_count = 0
cms_count = 0
max_cma_level = 1
for das in das_list:
niveau_cma = getattr(das, "niveau_cma", None)
if niveau_cma and niveau_cma > 1:
max_cma_level = max(max_cma_level, niveau_cma)
if getattr(das, "est_cma", False):
cma_count += 1
if getattr(das, "est_cms", False):
cms_count += 1
# Priorité au niveau CMA officiel ATIH
if max_cma_level > 1:
niveau = max_cma_level
elif cms_count >= 2:
niveau = 4
elif cms_count >= 1 or cma_count >= 3:
niveau = 3
elif cma_count >= 2:
niveau = 2
else:
niveau = 1
return niveau, cma_count, cms_count
def estimate_ghm(dossier: DossierMedical) -> GHMEstimation:
"""Estime le GHM d'un dossier médical.
Heuristique en 4 étapes :
1. CMD depuis le DP
2. Type de prise en charge depuis les actes CCAM
3. Sévérité depuis les CMA/CMS
4. Construction du code approximatif
"""
estimation = GHMEstimation()
# 1. CMD depuis le DP
dp = dossier.diagnostic_principal
dp_code = dp.cim10_suggestion if dp else None
if not dp:
estimation.alertes.append("DP absent — CMD non déterminable")
elif not dp_code:
estimation.alertes.append("DP sans code CIM-10 — CMD non déterminable")
else:
cmd, libelle = find_cmd(dp_code)
if cmd:
estimation.cmd = cmd
estimation.cmd_libelle = libelle
else:
estimation.alertes.append(f"CMD inconnue pour le code {dp_code}")
# Alerte DP symptomatique
code_letter = dp_code.upper().replace(".", "").strip()[:1]
if code_letter in ("R", "Z"):
estimation.alertes.append(
f"DP symptomatique ({dp_code}) — risque de CMD 23, impact tarif"
)
# 2. Type de prise en charge
estimation.type_ghm = _detect_type_ghm(dossier.actes_ccam)
# 3. Sévérité
niveau, cma_count, cms_count = _compute_severity(dossier.diagnostics_associes)
estimation.severite = niveau
estimation.cma_count = cma_count
estimation.cms_count = cms_count
# 4. Code approximatif
if estimation.cmd and estimation.type_ghm:
estimation.ghm_approx = f"{estimation.cmd}{estimation.type_ghm}??{estimation.severite}"
return estimation

View File

@@ -0,0 +1,85 @@
"""Cache persistant thread-safe pour les résultats Ollama."""
from __future__ import annotations
import json
import logging
import threading
from pathlib import Path
logger = logging.getLogger(__name__)
class OllamaCache:
"""Cache JSON persistant pour éviter les appels Ollama redondants.
Clé = (texte_diagnostic_normalisé, type).
Le modèle Ollama est stocké dans les métadonnées : si le modèle change,
le cache est automatiquement invalidé.
"""
def __init__(self, cache_path: Path, model: str):
self._path = cache_path
self._model = model
self._lock = threading.Lock()
self._data: dict[str, dict] = {}
self._dirty = False
self._load()
def _load(self) -> None:
"""Charge le cache depuis le disque."""
if not self._path.exists():
logger.info("Cache Ollama : nouveau cache (%s)", self._path)
return
try:
raw = json.loads(self._path.read_text(encoding="utf-8"))
if raw.get("model") != self._model:
logger.info(
"Cache Ollama : modèle changé (%s%s), cache invalidé",
raw.get("model"), self._model,
)
return
self._data = raw.get("entries", {})
logger.info("Cache Ollama : %d entrées chargées", len(self._data))
except (json.JSONDecodeError, KeyError) as e:
logger.warning("Cache Ollama : fichier corrompu (%s), réinitialisé", e)
self._data = {}
@staticmethod
def _make_key(texte: str, diag_type: str) -> str:
"""Construit une clé normalisée."""
return f"{diag_type}::{texte.strip().lower()}"
def get(self, texte: str, diag_type: str) -> dict | None:
"""Récupère un résultat caché, ou None si absent."""
key = self._make_key(texte, diag_type)
with self._lock:
return self._data.get(key)
def put(self, texte: str, diag_type: str, result: dict) -> None:
"""Stocke un résultat dans le cache."""
key = self._make_key(texte, diag_type)
with self._lock:
self._data[key] = result
self._dirty = True
def save(self) -> None:
"""Persiste le cache sur disque si modifié."""
with self._lock:
if not self._dirty:
return
self._path.parent.mkdir(parents=True, exist_ok=True)
payload = {
"model": self._model,
"entries": self._data,
}
self._path.write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
self._dirty = False
logger.info("Cache Ollama : %d entrées sauvegardées", len(self._data))
def __len__(self) -> int:
with self._lock:
return len(self._data)

View File

@@ -0,0 +1,135 @@
"""Client LLM partagé — Ollama (local) avec fallback Anthropic Haiku."""
from __future__ import annotations
import json
import logging
import os
import requests
from ..config import OLLAMA_URL, OLLAMA_MODEL, OLLAMA_TIMEOUT
logger = logging.getLogger(__name__)
# --- Fallback Anthropic ---
_ANTHROPIC_MODEL = os.environ.get("ANTHROPIC_FALLBACK_MODEL", "claude-haiku-4-5-20251001")
_anthropic_client = None
def _get_anthropic_client():
"""Lazy-init du client Anthropic (uniquement si clé API présente)."""
global _anthropic_client
if _anthropic_client is not None:
return _anthropic_client
api_key = os.environ.get("ANTHROPIC_API_KEY")
if not api_key:
return None
try:
import anthropic
_anthropic_client = anthropic.Anthropic(api_key=api_key)
return _anthropic_client
except Exception as e:
logger.warning("Anthropic SDK non disponible : %s", e)
return None
def call_anthropic(
prompt: str,
temperature: float = 0.1,
max_tokens: int = 2500,
) -> dict | None:
"""Appelle l'API Anthropic (Haiku)."""
client = _get_anthropic_client()
if client is None:
return None
try:
response = client.messages.create(
model=_ANTHROPIC_MODEL,
max_tokens=max_tokens,
temperature=temperature,
messages=[{"role": "user", "content": prompt}],
)
raw = response.content[0].text
result = parse_json_response(raw)
if result is not None:
logger.debug("Anthropic fallback OK (%s)", _ANTHROPIC_MODEL)
return result
except Exception as e:
logger.warning("Anthropic fallback erreur : %s", e)
return None
def parse_json_response(raw: str) -> dict | None:
"""Parse une réponse JSON, en gérant les blocs markdown."""
text = raw.strip()
if text.startswith("```"):
first_nl = text.find("\n")
if first_nl != -1:
text = text[first_nl + 1:]
if text.rstrip().endswith("```"):
text = text.rstrip()[:-3]
text = text.strip()
try:
return json.loads(text)
except json.JSONDecodeError:
logger.warning("LLM : JSON invalide : %s", raw[:200])
return None
def call_ollama(
prompt: str,
temperature: float = 0.1,
max_tokens: int = 2500,
model: str | None = None,
timeout: int | None = None,
) -> dict | None:
"""Appelle Ollama en mode JSON natif, avec fallback Anthropic si indisponible.
Args:
prompt: Le prompt à envoyer.
temperature: Température de génération (défaut: 0.1).
max_tokens: Nombre max de tokens (défaut: 2500).
model: Modèle Ollama à utiliser (défaut: OLLAMA_MODEL global).
timeout: Timeout en secondes (défaut: OLLAMA_TIMEOUT global).
Returns:
Le dict JSON parsé, ou None en cas d'erreur.
"""
use_model = model or OLLAMA_MODEL
use_timeout = timeout or OLLAMA_TIMEOUT
for attempt in range(2):
try:
response = requests.post(
f"{OLLAMA_URL}/api/generate",
json={
"model": use_model,
"prompt": prompt,
"stream": False,
"format": "json",
"options": {
"temperature": temperature,
"num_predict": max_tokens,
},
},
timeout=use_timeout,
)
response.raise_for_status()
raw = response.json().get("response", "")
result = parse_json_response(raw)
if result is not None:
return result
if attempt == 0:
logger.info("Ollama (%s) : retry après échec de parsing", use_model)
except requests.ConnectionError:
logger.info("Ollama indisponible → fallback Anthropic (%s)", _ANTHROPIC_MODEL)
return call_anthropic(prompt, temperature, max_tokens)
except requests.Timeout:
logger.warning("Ollama (%s) timeout après %ds → fallback Anthropic",
use_model, use_timeout)
return call_anthropic(prompt, temperature, max_tokens)
except (requests.RequestException, json.JSONDecodeError) as e:
logger.warning("Ollama erreur : %s", e)
return None
return None

View File

@@ -0,0 +1,725 @@
"""Indexation FAISS des documents de référence.
Objectif : éviter que des documents "procédure/méthodo" influencent le codage.
On maintient donc 2 index FAISS :
- ref : référentiels (CIM-10, CCAM, référentiels uploadés en ref:...)
- proc : procédures / guide méthodologique (guide_methodo + uploadés en proc:...)
Backwards compat : si les nouveaux fichiers n'existent pas, on retombe sur faiss.index.
"""
from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Optional
import pdfplumber
from ..config import RAG_INDEX_DIR, CIM10_PDF, GUIDE_METHODO_PDF, CCAM_PDF, CCAM_DICT_PATH, REFERENTIELS_DIR, EMBEDDING_MODEL
logger = logging.getLogger(__name__)
# Singletons pour les index chargés en mémoire
_loaded: dict[str, tuple] = {}
@dataclass
class Chunk:
text: str
document: str # "cim10", "guide_methodo", "ccam"
page: Optional[int] = None
code: Optional[str] = None
def _paths(kind: str) -> tuple[Path, Path]:
"""Retourne (index_path, meta_path) pour un type d'index.
kind:
- "ref" : référentiels
- "proc" : procédures
- "all" : legacy (faiss.index)
"""
kind = (kind or "ref").lower()
if kind == "proc":
return (RAG_INDEX_DIR / "faiss_proc.index", RAG_INDEX_DIR / "metadata_proc.json")
if kind == "all":
return (RAG_INDEX_DIR / "faiss.index", RAG_INDEX_DIR / "metadata.json")
# ref (default)
return (RAG_INDEX_DIR / "faiss_ref.index", RAG_INDEX_DIR / "metadata_ref.json")
def _kind_for_chunk(chunk: Chunk) -> str:
"""Détermine le type d'index cible pour un chunk."""
doc = (chunk.document or "").lower()
if doc == "guide_methodo" or doc.startswith("proc:"):
return "proc"
return "ref"
# ---------------------------------------------------------------------------
# Chunking CIM-10
# ---------------------------------------------------------------------------
def _chunk_cim10(pdf_path: Path) -> list[Chunk]:
"""Découpe le PDF CIM-10 en double chunking : sous-codes individuels + parents 3-char."""
chunks: list[Chunk] = []
current_code3: str | None = None
current_code3_text: list[str] = []
current_code3_page: int | None = None
# Sous-codes en cours d'accumulation
current_subcode: str | None = None
current_subcode_text: list[str] = []
current_subcode_page: int | None = None
code3_pattern = re.compile(r"^([A-Z]\d{2})\s+(.+)")
subcode_pattern = re.compile(r"^([A-Z]\d{2}\.\d+)\s+(.+)")
logger.info("Extraction des chunks CIM-10 (double chunking) depuis %s", pdf_path.name)
def _flush_subcode():
"""Sauvegarde le chunk sous-code en cours."""
if current_subcode and current_subcode_text:
chunk_text = "\n".join(current_subcode_text)
if len(chunk_text.split()) >= 3:
chunks.append(Chunk(
text=chunk_text,
document="cim10",
page=current_subcode_page,
code=current_subcode,
))
def _flush_code3():
"""Sauvegarde le chunk parent 3-char en cours."""
_flush_subcode()
if current_code3 and current_code3_text:
chunk_text = "\n".join(current_code3_text)
if len(chunk_text.split()) >= 5:
chunks.append(Chunk(
text=chunk_text,
document="cim10",
page=current_code3_page,
code=current_code3,
))
with pdfplumber.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if not text:
continue
for line in text.split("\n"):
line = line.strip()
if not line:
continue
m_sub = subcode_pattern.match(line)
m3 = code3_pattern.match(line)
if m_sub:
# Nouveau sous-code → flush le sous-code précédent
_flush_subcode()
current_subcode = m_sub.group(1)
current_subcode_text = [line]
current_subcode_page = page_num
# Ajouter aussi au chunk parent
if current_code3:
current_code3_text.append(line)
elif m3 and not m_sub:
# Nouveau code 3-char → flush tout le bloc précédent
_flush_code3()
current_code3 = m3.group(1)
current_code3_text = [line]
current_code3_page = page_num
current_subcode = None
current_subcode_text = []
current_subcode_page = None
else:
# Ligne de continuation
if current_subcode:
current_subcode_text.append(line)
if current_code3:
current_code3_text.append(line)
# Flush final
_flush_code3()
logger.info("CIM-10 : %d chunks extraits (double chunking sous-codes + parents)", len(chunks))
return chunks
# ---------------------------------------------------------------------------
# Chunking Guide Méthodologique MCO
# ---------------------------------------------------------------------------
def _chunk_guide_methodo(pdf_path: Path) -> list[Chunk]:
"""Découpe le Guide Méthodologique MCO par sections/titres."""
chunks: list[Chunk] = []
current_title: str | None = None
current_text: list[str] = []
current_page: int | None = None
# Patterns de titres de sections (chapitres, sous-chapitres)
title_patterns = [
re.compile(r"^((?:CHAPITRE|TITRE|PARTIE)\s+[IVXLCDM0-9]+.*)$", re.IGNORECASE),
re.compile(r"^(\d+\.\d*\s+[A-ZÉÈÊÀÂÔÙÛÜ].{5,})$"),
re.compile(r"^([A-ZÉÈÊÀÂÔÙÛÜ][A-ZÉÈÊÀÂÔÙÛÜ\s]{10,})$"),
]
logger.info("Extraction des chunks Guide Métho depuis %s", pdf_path.name)
with pdfplumber.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if not text:
continue
for line in text.split("\n"):
line = line.strip()
if not line:
continue
is_title = False
for pat in title_patterns:
if pat.match(line):
is_title = True
break
if is_title and len(line) > 8:
# Sauvegarder le chunk précédent
if current_title and current_text:
chunk_text = current_title + "\n" + "\n".join(current_text)
if len(chunk_text.split()) >= 20:
chunks.append(Chunk(
text=chunk_text,
document="guide_methodo",
page=current_page,
))
current_title = line
current_text = []
current_page = page_num
else:
current_text.append(line)
# Dernier chunk
if current_title and current_text:
chunk_text = current_title + "\n" + "\n".join(current_text)
if len(chunk_text.split()) >= 20:
chunks.append(Chunk(
text=chunk_text,
document="guide_methodo",
page=current_page,
))
# Si trop peu de chunks (le PDF ne suit pas les patterns de titre),
# fallback : découper par pages groupées par 3
if len(chunks) < 10:
logger.info("Guide Métho : fallback découpe par pages (peu de titres détectés)")
chunks = []
with pdfplumber.open(pdf_path) as pdf:
page_texts: list[str] = []
start_page = 1
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if text:
page_texts.append(text)
if len(page_texts) >= 3:
combined = "\n".join(page_texts)
if len(combined.split()) >= 20:
chunks.append(Chunk(
text=combined,
document="guide_methodo",
page=start_page,
))
page_texts = []
start_page = page_num + 1
if page_texts:
combined = "\n".join(page_texts)
if len(combined.split()) >= 20:
chunks.append(Chunk(
text=combined,
document="guide_methodo",
page=start_page,
))
logger.info("Guide Métho : %d chunks extraits", len(chunks))
return chunks
# ---------------------------------------------------------------------------
# Chunking CCAM
# ---------------------------------------------------------------------------
def _chunk_ccam(pdf_path: Path) -> list[Chunk]:
"""Découpe le PDF CCAM en chunks par code d'acte."""
chunks: list[Chunk] = []
ccam_pattern = re.compile(r"([A-Z]{4}\d{3})\s+(.*)")
logger.info("Extraction des chunks CCAM depuis %s", pdf_path.name)
with pdfplumber.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if not text:
continue
current_code: str | None = None
current_lines: list[str] = []
for line in text.split("\n"):
line = line.strip()
if not line:
continue
m = ccam_pattern.match(line)
if m:
if current_code and current_lines:
chunks.append(Chunk(
text="\n".join(current_lines),
document="ccam",
page=page_num,
code=current_code,
))
current_code = m.group(1)
current_lines = [line]
elif current_code:
current_lines.append(line)
if current_code and current_lines:
chunks.append(Chunk(
text="\n".join(current_lines),
document="ccam",
page=page_num,
code=current_code,
))
# Fallback : si aucun code CCAM détecté, indexer par page
if not chunks:
logger.info("CCAM : aucun code détecté, fallback par page")
with pdfplumber.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if text and len(text.split()) >= 10:
chunks.append(Chunk(
text=text,
document="ccam",
page=page_num,
))
logger.info("CCAM : %d chunks extraits", len(chunks))
return chunks
# ---------------------------------------------------------------------------
# Chunking CCAM depuis le dictionnaire JSON
# ---------------------------------------------------------------------------
def _chunk_ccam_from_dict() -> list[Chunk]:
"""Génère des chunks CCAM depuis ccam_dict.json (un chunk par code+description).
Prioritaire sur les chunks PDF si le dictionnaire existe.
"""
if not CCAM_DICT_PATH.exists():
return []
import json as _json
with open(CCAM_DICT_PATH, encoding="utf-8") as f:
ccam_dict = _json.load(f)
chunks: list[Chunk] = []
for code, info in ccam_dict.items():
desc = info.get("description", "") if isinstance(info, dict) else str(info)
if not desc:
continue
regroupement = info.get("regroupement", "") if isinstance(info, dict) else ""
tarif = info.get("tarif_s1") if isinstance(info, dict) else None
text_parts = [f"{code} {desc}"]
if regroupement:
text_parts.append(f"Regroupement: {regroupement}")
if tarif is not None:
text_parts.append(f"Tarif S1: {tarif}")
chunks.append(Chunk(
text="\n".join(text_parts),
document="ccam",
code=code,
))
logger.info("CCAM dict : %d chunks générés depuis %s", len(chunks), CCAM_DICT_PATH)
return chunks
# ---------------------------------------------------------------------------
# Chunking CIM-10 Index Alphabétique
# ---------------------------------------------------------------------------
def _chunk_cim10_alpha(pdf_path: Path) -> list[Chunk]:
"""Parse la section INDEX ALPHABÉTIQUE du PDF CIM-10.
Détecte les entrées de type "terme → code" et génère des chunks
avec document="cim10_alpha".
"""
chunks: list[Chunk] = []
# Pattern : ligne avec un terme suivi d'un code CIM-10 en fin de ligne
entry_pattern = re.compile(r"^(.+?)\s+([A-Z]\d{2}(?:\.\d+)?)\s*$")
logger.info("Extraction de l'index alphabétique CIM-10 depuis %s", pdf_path.name)
in_alpha_section = False
with pdfplumber.open(pdf_path) as pdf:
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if not text:
continue
# Détecter le début de la section index alphabétique
text_upper = text.upper()
if "INDEX ALPHAB" in text_upper:
in_alpha_section = True
# Certaines pages avant l'index : ne pas parser
if not in_alpha_section:
continue
for line in text.split("\n"):
line = line.strip()
if not line:
continue
m = entry_pattern.match(line)
if m:
terme = m.group(1).strip()
code = m.group(2)
if len(terme) >= 3:
chunks.append(Chunk(
text=f"{terme}{code}",
document="cim10_alpha",
page=page_num,
code=code,
))
logger.info("CIM-10 index alphabétique : %d entrées extraites", len(chunks))
return chunks
# ---------------------------------------------------------------------------
# Construction de l'index FAISS
# ---------------------------------------------------------------------------
def build_index(force: bool = False) -> None:
"""Construit les index FAISS à partir des PDFs de référence.
- ref : CIM-10 (+ index alpha) + CCAM
- proc : Guide méthodologique
Args:
force: Si True, reconstruit même si l'index existe déjà.
"""
import faiss
import numpy as np
from sentence_transformers import SentenceTransformer
ref_index_path, ref_meta_path = _paths("ref")
proc_index_path, proc_meta_path = _paths("proc")
# Si tout existe déjà et pas de force
ref_ok = ref_index_path.exists() and ref_meta_path.exists()
proc_ok = proc_index_path.exists() and proc_meta_path.exists()
guide_expected = GUIDE_METHODO_PDF.exists()
if not force and ref_ok and ((not guide_expected) or proc_ok):
logger.info("Index FAISS déjà existants dans %s (use force=True pour reconstruire)", RAG_INDEX_DIR)
return
# Collecter les chunks
ref_chunks: list[Chunk] = []
proc_chunks: list[Chunk] = []
# CIM-10 (référentiel)
if CIM10_PDF.exists():
ref_chunks.extend(_chunk_cim10(CIM10_PDF))
ref_chunks.extend(_chunk_cim10_alpha(CIM10_PDF))
else:
logger.warning("PDF non trouvé : %s", CIM10_PDF)
# Guide méthodologique (procédures)
if GUIDE_METHODO_PDF.exists():
proc_chunks.extend(_chunk_guide_methodo(GUIDE_METHODO_PDF))
else:
logger.warning("PDF non trouvé : %s", GUIDE_METHODO_PDF)
# CCAM (référentiel)
ccam_dict_chunks = _chunk_ccam_from_dict()
if ccam_dict_chunks:
ref_chunks.extend(ccam_dict_chunks)
elif CCAM_PDF.exists():
ref_chunks.extend(_chunk_ccam(CCAM_PDF))
else:
logger.warning("Ni dictionnaire CCAM ni PDF CCAM trouvé")
if not ref_chunks and not proc_chunks:
logger.error("Aucun chunk extrait — vérifiez les chemins des PDFs")
return
logger.info("Total ref : %d chunks | total proc : %d chunks", len(ref_chunks), len(proc_chunks))
# Embeddings — GPU si disponible
import torch
_device = "cuda" if torch.cuda.is_available() else "cpu"
logger.info("Chargement du modèle d'embedding %s (%s)...", EMBEDDING_MODEL, _device)
model = SentenceTransformer(EMBEDDING_MODEL, device=_device)
model.max_seq_length = 512 # CamemBERT max position embeddings
def _write_index(chunks: list[Chunk], idx_path: Path, meta_path: Path, label: str) -> None:
if not chunks:
return
texts = [c.text[:2000] for c in chunks]
logger.info("Calcul des embeddings (%s) pour %d chunks...", label, len(texts))
embeddings = model.encode(texts, show_progress_bar=True, normalize_embeddings=True, batch_size=64)
embeddings = np.array(embeddings, dtype=np.float32)
dim = embeddings.shape[1]
index = faiss.IndexFlatIP(dim)
index.add(embeddings)
RAG_INDEX_DIR.mkdir(parents=True, exist_ok=True)
faiss.write_index(index, str(idx_path))
metadata = [asdict(c) for c in chunks]
for m in metadata:
m["extrait"] = m.pop("text")[:800]
meta_path.write_text(json.dumps(metadata, ensure_ascii=False, indent=2), encoding="utf-8")
logger.info("Index FAISS sauvegardé (%s) : %s (%d vecteurs, dim=%d)", label, idx_path, len(chunks), dim)
_write_index(ref_chunks, ref_index_path, ref_meta_path, "ref")
_write_index(proc_chunks, proc_index_path, proc_meta_path, "proc")
# Invalider les singletons
reset_index()
def get_index(kind: str = "ref") -> tuple | None:
"""Charge un index FAISS et ses métadonnées (singleton lazy-loaded).
Args:
kind: "ref" | "proc" | "all".
Returns:
Tuple (faiss_index, metadata_list) ou None si l'index n'existe pas.
"""
kind = (kind or "ref").lower()
if kind in _loaded:
return _loaded[kind]
index_path, meta_path = _paths(kind)
# Backwards compat : si ref/proc absent, fallback sur all
if kind in ("ref", "proc") and (not index_path.exists() or not meta_path.exists()):
legacy_idx, legacy_meta = _paths("all")
if legacy_idx.exists() and legacy_meta.exists():
logger.warning("Index %s absent — fallback legacy faiss.index", kind)
index_path, meta_path = legacy_idx, legacy_meta
else:
logger.warning("Index FAISS non trouvé dans %s — lancez build_index() d'abord", RAG_INDEX_DIR)
return None
if not index_path.exists() or not meta_path.exists():
logger.warning("Index FAISS non trouvé (%s) dans %s — lancez build_index() d'abord", kind, RAG_INDEX_DIR)
return None
import faiss
faiss_index = faiss.read_index(str(index_path))
metadata = json.loads(meta_path.read_text(encoding="utf-8"))
logger.info("Index FAISS chargé (%s) : %d vecteurs", kind, faiss_index.ntotal)
_loaded[kind] = (faiss_index, metadata)
return _loaded[kind]
# ---------------------------------------------------------------------------
# Chunking générique pour fichiers utilisateur (référentiels)
# ---------------------------------------------------------------------------
def chunk_user_file(file_path: Path, doc_name: str) -> list[Chunk]:
"""Découpe un fichier utilisateur en chunks pour indexation FAISS.
Dispatch selon l'extension :
- PDF : pages groupées par 2
- CSV/Excel : une ligne = un chunk
- TXT : paragraphes (blocs séparés par lignes vides)
Args:
file_path: Chemin du fichier.
doc_name: Nom du document (utilisé comme identifiant dans les métadonnées).
Returns:
Liste de Chunk prêts pour l'indexation.
"""
suffix = file_path.suffix.lower()
if suffix == ".pdf":
return _chunk_user_pdf(file_path, doc_name)
elif suffix in (".csv", ".xlsx", ".xls"):
return _chunk_user_tabular(file_path, doc_name)
elif suffix == ".txt":
return _chunk_user_txt(file_path, doc_name)
else:
logger.warning("Extension non supportée pour chunking : %s", suffix)
return []
def _chunk_user_pdf(file_path: Path, doc_name: str) -> list[Chunk]:
"""Découpe un PDF utilisateur en chunks de 2 pages."""
chunks: list[Chunk] = []
try:
with pdfplumber.open(file_path) as pdf:
page_texts: list[str] = []
start_page = 1
for page_num, page in enumerate(pdf.pages, start=1):
text = page.extract_text()
if text:
page_texts.append(text)
if len(page_texts) >= 2:
combined = "\n".join(page_texts)
if len(combined.split()) >= 10:
chunks.append(Chunk(
text=combined,
document=doc_name,
page=start_page,
))
page_texts = []
start_page = page_num + 1
if page_texts:
combined = "\n".join(page_texts)
if len(combined.split()) >= 10:
chunks.append(Chunk(
text=combined,
document=doc_name,
page=start_page,
))
except Exception:
logger.warning("Erreur lors du chunking PDF %s", file_path, exc_info=True)
logger.info("Référentiel PDF %s : %d chunks", doc_name, len(chunks))
return chunks
def _chunk_user_tabular(file_path: Path, doc_name: str) -> list[Chunk]:
"""Découpe un CSV/Excel : une ligne = un chunk."""
chunks: list[Chunk] = []
try:
import pandas as pd
suffix = file_path.suffix.lower()
if suffix == ".csv":
df = pd.read_csv(file_path, encoding="utf-8", on_bad_lines="skip")
else:
df = pd.read_excel(file_path)
for idx, row in df.iterrows():
text = " | ".join(str(v) for v in row.values if pd.notna(v))
if len(text.split()) >= 3:
chunks.append(Chunk(
text=text,
document=doc_name,
page=int(idx) + 1,
))
except Exception:
logger.warning("Erreur lors du chunking tabular %s", file_path, exc_info=True)
logger.info("Référentiel tabular %s : %d chunks", doc_name, len(chunks))
return chunks
def _chunk_user_txt(file_path: Path, doc_name: str) -> list[Chunk]:
"""Découpe un fichier TXT en paragraphes (blocs séparés par lignes vides)."""
chunks: list[Chunk] = []
try:
text = file_path.read_text(encoding="utf-8")
paragraphs = re.split(r"\n\s*\n", text)
for i, para in enumerate(paragraphs):
para = para.strip()
if len(para.split()) >= 5:
chunks.append(Chunk(
text=para,
document=doc_name,
page=i + 1,
))
except Exception:
logger.warning("Erreur lors du chunking TXT %s", file_path, exc_info=True)
logger.info("Référentiel TXT %s : %d chunks", doc_name, len(chunks))
return chunks
def add_chunks_to_index(chunks: list[Chunk]) -> int:
"""Ajoute des chunks à l'index FAISS existant (incrémental).
Charge l'index si nécessaire, encode les chunks, ajoute les vecteurs,
et sauvegarde le tout.
Args:
chunks: Liste de Chunk à ajouter.
Returns:
Nombre de chunks effectivement ajoutés.
"""
if not chunks:
return 0
import faiss
import numpy as np
from .rag_search import _get_embed_model
# Dans 99% des cas, on veut éviter de mélanger : on route vers ref/proc selon le préfixe.
# Si l'appelant veut forcer, il peut passer des chunks avec document="proc:...".
kind = _kind_for_chunk(chunks[0])
index_path, meta_path = _paths(kind)
# Backwards compat : si on n'a que l'ancien index, on l'utilise.
if not index_path.exists() or not meta_path.exists():
legacy_idx, legacy_meta = _paths("all")
if legacy_idx.exists() and legacy_meta.exists():
index_path, meta_path = legacy_idx, legacy_meta
# Charger l'index existant ou en créer un nouveau
if index_path.exists() and meta_path.exists():
faiss_idx = faiss.read_index(str(index_path))
metadata = json.loads(meta_path.read_text(encoding="utf-8"))
else:
model = _get_embed_model()
# Obtenir la dimension via un encodage test
test_vec = model.encode(["test"], normalize_embeddings=True)
dim = test_vec.shape[1]
faiss_idx = faiss.IndexFlatIP(dim)
metadata = []
# Encoder les nouveaux chunks
model = _get_embed_model()
texts = [c.text[:2000] for c in chunks]
embeddings = model.encode(texts, normalize_embeddings=True, batch_size=64)
embeddings = np.array(embeddings, dtype=np.float32)
# Ajouter à l'index
faiss_idx.add(embeddings)
# Ajouter les métadonnées
from dataclasses import asdict
for chunk in chunks:
meta = asdict(chunk)
meta["extrait"] = meta.pop("text")[:800]
metadata.append(meta)
# Sauvegarder
RAG_INDEX_DIR.mkdir(parents=True, exist_ok=True)
faiss.write_index(faiss_idx, str(index_path))
meta_path.write_text(json.dumps(metadata, ensure_ascii=False, indent=2), encoding="utf-8")
# Invalider le singleton pour forcer le rechargement
reset_index()
logger.info("Index FAISS : %d chunks ajoutés (total : %d)", len(chunks), faiss_idx.ntotal)
return len(chunks)
def reset_index() -> None:
"""Invalide les singletons FAISS pour forcer le rechargement au prochain accès."""
_loaded.clear()

View File

@@ -0,0 +1,837 @@
"""Recherche RAG (FAISS) + génération via Ollama pour le codage CIM-10."""
from __future__ import annotations
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from ..config import (
ActeCCAM, Diagnostic, DossierMedical, PreuveClinique, RAGSource,
OLLAMA_CACHE_PATH, OLLAMA_MAX_PARALLEL, OLLAMA_MODEL,
EMBEDDING_MODEL, RERANKER_MODEL,
)
from .cim10_dict import normalize_code, validate_code as cim10_validate, fallback_parent_code
from .cim10_extractor import BIO_NORMALS
from .clinical_context import build_enriched_context, format_enriched_context
from .ccam_dict import validate_code as ccam_validate
from .ollama_client import call_ollama, parse_json_response
from .ollama_cache import OllamaCache
logger = logging.getLogger(__name__)
# Singleton pour le modèle d'embedding (chargé une seule fois)
_embed_model = None
_embed_lock = threading.Lock()
_embed_failed = False # Sentinelle pour éviter les retries infinis
# Singleton pour le cross-encoder de re-ranking (CPU uniquement)
_reranker_model = None
# Score minimum de similarité FAISS pour retenir un résultat
_MIN_SCORE = 0.3
# Seuil rehaussé pour le contexte CPAM (filtrage plus agressif du bruit)
_MIN_SCORE_CPAM = 0.40
def _get_embed_model():
"""Charge le modèle d'embedding (singleton thread-safe).
Tente CUDA d'abord, fallback CPU si OOM (Ollama peut occuper la VRAM).
low_cpu_mem_usage=False évite les meta tensors (accelerate + sentence-transformers 5.x).
Un Lock empêche les chargements concurrents depuis le ThreadPool.
"""
global _embed_model, _embed_failed
if _embed_model is not None:
return _embed_model
if _embed_failed:
raise RuntimeError("Modèle d'embedding indisponible (échec précédent)")
with _embed_lock:
# Double-check après acquisition du lock
if _embed_model is not None:
return _embed_model
if _embed_failed:
raise RuntimeError("Modèle d'embedding indisponible (échec précédent)")
from sentence_transformers import SentenceTransformer
import torch
_device = "cuda" if torch.cuda.is_available() else "cpu"
_model_kwargs = {"low_cpu_mem_usage": False}
try:
logger.info("Chargement du modèle d'embedding (%s)...", _device)
_embed_model = SentenceTransformer(
EMBEDDING_MODEL, device=_device, model_kwargs=_model_kwargs,
)
except (torch.OutOfMemoryError, torch.cuda.CudaError, torch.AcceleratorError,
RuntimeError, NotImplementedError) as exc:
exc_msg = str(exc).lower()
if _device == "cuda" and ("memory" in exc_msg or "meta tensor" in exc_msg):
logger.warning("CUDA erreur pour l'embedding — fallback CPU : %s", exc)
torch.cuda.empty_cache()
try:
_embed_model = SentenceTransformer(
EMBEDDING_MODEL, device="cpu", model_kwargs=_model_kwargs,
)
except Exception as exc2:
logger.error("Fallback CPU aussi en échec : %s", exc2)
_embed_failed = True
raise
else:
_embed_failed = True
raise
_embed_model.max_seq_length = 512
return _embed_model
def _get_reranker():
"""Charge le cross-encoder de re-ranking (singleton, CPU uniquement).
Forcé sur CPU pour ne pas interférer avec Ollama sur GPU.
"""
global _reranker_model
if _reranker_model is None:
from sentence_transformers import CrossEncoder
logger.info("Chargement du cross-encoder de re-ranking (cpu)...")
_reranker_model = CrossEncoder(RERANKER_MODEL, device="cpu")
return _reranker_model
def _rerank(query: str, results: list[dict], top_k: int) -> list[dict]:
"""Re-classe les résultats FAISS via un cross-encoder.
Args:
query: Texte de la requête originale.
results: Résultats FAISS avec clé 'extrait'.
top_k: Nombre de résultats à retourner.
Returns:
Résultats re-classés par score cross-encoder, limités à top_k.
"""
if not results:
return results
reranker = _get_reranker()
# Construire les paires (query, passage) pour le cross-encoder
pairs = [(query, r.get("extrait", "")) for r in results]
ce_scores = reranker.predict(pairs)
# Injecter le score cross-encoder et trier
for r, ce_score in zip(results, ce_scores):
r["score_faiss"] = r["score"]
r["score"] = float(ce_score)
results.sort(key=lambda r: r["score"], reverse=True)
return results[:top_k]
def search_similar(query: str, top_k: int = 10) -> list[dict]:
"""Recherche les passages les plus similaires dans l'index FAISS.
Args:
query: Texte du diagnostic à rechercher.
top_k: Nombre de résultats à retourner.
Returns:
Liste de dicts avec les métadonnées + score de similarité,
filtrés par score minimum et priorisant les sources CIM-10.
"""
from .rag_index import get_index
import numpy as np
# Codage CIM-10 : on interroge l'index "ref" (pas le guide méthodo).
result = get_index(kind="ref")
if result is None:
logger.warning("Index FAISS non disponible")
return []
faiss_index, metadata = result
model = _get_embed_model()
query_vec = model.encode([query], normalize_embeddings=True)
query_vec = np.array(query_vec, dtype=np.float32)
# Chercher plus de résultats que top_k pour pouvoir filtrer ensuite
fetch_k = min(top_k * 2, faiss_index.ntotal)
scores, indices = faiss_index.search(query_vec, fetch_k)
raw_results = []
for score, idx in zip(scores[0], indices[0]):
if idx < 0:
continue
if float(score) < _MIN_SCORE:
continue
meta = metadata[idx].copy()
meta["score"] = float(score)
raw_results.append(meta)
# Codage : on garde uniquement CIM-10 + index alpha + éventuels référentiels uploadés en ref:...
cim10_results = [r for r in raw_results if r["document"] in ("cim10", "cim10_alpha")]
ref_uploads = [r for r in raw_results if str(r.get("document", "")).startswith("ref:")]
# Ne pas laisser les procédures/méthodo contaminer la sélection.
other_results = ref_uploads
min_cim10 = min(6, len(cim10_results))
final = cim10_results[:min_cim10]
remaining_slots = top_k - len(final)
# Remplir le reste avec les meilleurs résultats (CIM-10 restants + autres)
remaining = cim10_results[min_cim10:] + other_results
remaining.sort(key=lambda r: r["score"], reverse=True)
final.extend(remaining[:remaining_slots])
return final
def search_similar_ccam(query: str, top_k: int = 8) -> list[dict]:
"""Recherche les passages CCAM les plus similaires dans l'index FAISS.
Même logique que search_similar() mais priorise les sources CCAM.
"""
from .rag_index import get_index
import numpy as np
# CCAM : index "ref".
result = get_index(kind="ref")
if result is None:
logger.warning("Index FAISS non disponible")
return []
faiss_index, metadata = result
model = _get_embed_model()
query_vec = model.encode([query], normalize_embeddings=True)
query_vec = np.array(query_vec, dtype=np.float32)
fetch_k = min(top_k * 2, faiss_index.ntotal)
scores, indices = faiss_index.search(query_vec, fetch_k)
raw_results = []
for score, idx in zip(scores[0], indices[0]):
if idx < 0:
continue
if float(score) < _MIN_SCORE:
continue
meta = metadata[idx].copy()
meta["score"] = float(score)
raw_results.append(meta)
# Prioriser les sources CCAM (au moins 5 sur top_k)
ccam_results = [r for r in raw_results if r["document"] == "ccam"]
other_results = [r for r in raw_results if r["document"] != "ccam"]
min_ccam = min(5, len(ccam_results))
final = ccam_results[:min_ccam]
remaining_slots = top_k - len(final)
remaining = ccam_results[min_ccam:] + other_results
remaining.sort(key=lambda r: r["score"], reverse=True)
final.extend(remaining[:remaining_slots])
return final
def search_similar_cpam(query: str, top_k: int = 8) -> list[dict]:
"""Recherche RAG spécifique au contexte CPAM (contre-argumentation).
Différences avec search_similar() :
- Priorité Guide Méthodologique (min 3 résultats) plutôt que CIM-10
- Seuil de score rehaussé (0.40 vs 0.30) pour éliminer le bruit
- Fetch élargi (top_k * 3) car filtrage plus agressif
- Déduplication par code CIM-10 (garde le meilleur score par code)
"""
from .rag_index import get_index
import numpy as np
# Contexte CPAM : on veut des procédures (guide) + définitions référentielles (CIM-10).
proc = get_index(kind="proc")
ref = get_index(kind="ref")
if proc is None and ref is None:
logger.warning("Index FAISS non disponible")
return []
model = _get_embed_model()
query_vec = model.encode([query], normalize_embeddings=True)
query_vec = np.array(query_vec, dtype=np.float32)
def _search_one(result_tuple, fetch_mult: int) -> list[dict]:
if result_tuple is None:
return []
faiss_index, metadata = result_tuple
fetch_k = min(top_k * fetch_mult, faiss_index.ntotal)
scores, indices = faiss_index.search(query_vec, fetch_k)
out = []
for score, idx in zip(scores[0], indices[0]):
if idx < 0:
continue
if float(score) < _MIN_SCORE_CPAM:
continue
meta = metadata[idx].copy()
meta["score"] = float(score)
out.append(meta)
return out
raw_proc = _search_one(proc, fetch_mult=3)
raw_ref = _search_one(ref, fetch_mult=3)
# Filtrer clairement :
# - proc : guide_methodo + uploads proc:
raw_proc = [r for r in raw_proc if r.get("document") == "guide_methodo" or str(r.get("document", "")).startswith("proc:")]
# - ref : CIM-10 + index alpha + uploads ref:
raw_ref = [r for r in raw_ref if r.get("document") in ("cim10", "cim10_alpha") or str(r.get("document", "")).startswith("ref:")]
raw_results = raw_proc + raw_ref
# Dédupliquer par code CIM-10 (garder meilleur score par code)
seen_codes: dict[str, dict] = {}
deduped = []
for r in raw_results:
code = r.get("code")
if code:
if code in seen_codes:
if r["score"] > seen_codes[code]["score"]:
seen_codes[code] = r
else:
seen_codes[code] = r
else:
deduped.append(r) # pas de code → garder (guide_methodo, etc.)
deduped.extend(seen_codes.values())
deduped.sort(key=lambda r: r["score"], reverse=True)
# Re-ranking cross-encoder (CPU) pour affiner le classement
reranked = _rerank(query, deduped, top_k=len(deduped))
# Prioriser le Guide Méthodologique (min 3 résultats)
guide_results = [r for r in reranked if r.get("document") == "guide_methodo" or str(r.get("document", "")).startswith("proc:")]
other_results = [
r for r in reranked
if not (r.get("document") == "guide_methodo" or str(r.get("document", "")).startswith("proc:"))
]
min_guide = min(3, len(guide_results))
final = guide_results[:min_guide]
remaining_slots = top_k - len(final)
remaining = guide_results[min_guide:] + other_results
remaining.sort(key=lambda r: r["score"], reverse=True)
final.extend(remaining[:remaining_slots])
return final
def _format_contexte(contexte: dict) -> str:
"""Formate le contexte patient de manière structurée pour le prompt."""
lines = []
sexe = contexte.get("sexe")
age = contexte.get("age")
imc = contexte.get("imc")
patient_parts = []
if sexe:
patient_parts.append(sexe)
if age:
patient_parts.append(f"{age} ans")
if imc:
patient_parts.append(f"IMC {imc}")
if patient_parts:
lines.append(f"- Patient : {', '.join(patient_parts)}")
duree = contexte.get("duree_sejour")
if duree:
lines.append(f"- Durée séjour : {duree} jours")
antecedents = contexte.get("antecedents")
if antecedents:
lines.append(f"- Antécédents : {', '.join(antecedents[:5])}")
biologie = contexte.get("biologie_cle")
if biologie:
bio_parts = []
for b in biologie:
test, valeur, anomalie = b if isinstance(b, (list, tuple)) else (b.get("test"), b.get("valeur"), b.get("anomalie"))
# Ajouter la plage de référence si connue
norme_str = ""
if test in BIO_NORMALS:
lo, hi = BIO_NORMALS[test]
lo_s = int(lo) if lo == int(lo) else lo
hi_s = int(hi) if hi == int(hi) else hi
norme_str = f" [N: {lo_s}-{hi_s}]"
marker = " (\u2191)" if anomalie else ""
bio_parts.append(f"{test} {valeur}{norme_str}{marker}")
lines.append(f"- Biologie : {', '.join(bio_parts)}")
imagerie = contexte.get("imagerie")
if imagerie:
for img in imagerie:
img_type, conclusion = img if isinstance(img, (list, tuple)) else (img.get("type"), img.get("conclusion"))
if conclusion:
lines.append(f"- Imagerie : {img_type}{conclusion[:200]}")
complications = contexte.get("complications")
if complications:
lines.append(f"- Complications : {', '.join(complications)}")
dp_texte = contexte.get("dp_texte")
if dp_texte:
lines.append(f"- DP du séjour : {dp_texte}")
das_codes = contexte.get("das_codes_existants")
if das_codes:
lines.append(f"- DAS déjà codés : {', '.join(das_codes)}")
return "\n".join(lines) if lines else "Non précisé"
def _build_prompt(texte: str, sources: list[dict], contexte: dict, est_dp: bool = True) -> str:
"""Construit le prompt expert DIM avec raisonnement structuré."""
sources_text = ""
for i, src in enumerate(sources, 1):
doc_raw = str(src.get("document", ""))
if doc_raw.startswith("ref:"):
doc_name = f"Référentiel uploadé : {doc_raw[4:]}"
elif doc_raw.startswith("proc:"):
doc_name = f"Procédure uploadée : {doc_raw[5:]}"
else:
doc_name = {
"cim10": "CIM-10 FR 2026",
"cim10_alpha": "CIM-10 Index Alphabétique 2026",
"guide_methodo": "Guide Méthodologique MCO 2026",
"ccam": "CCAM PMSI V4 2025",
}.get(doc_raw, doc_raw)
code_info = f" (code: {src['code']})" if src.get("code") else ""
page_info = f" [page {src['page']}]" if src.get("page") else ""
sources_text += f"--- Source {i}: {doc_name}{code_info}{page_info} ---\n"
sources_text += (src.get("extrait", "")[:800]) + "\n\n"
type_diag = "DP (diagnostic principal)" if est_dp else "DAS (diagnostic associé significatif)"
ctx_str = format_enriched_context(contexte)
return f"""Tu es un médecin DIM (Département d'Information Médicale) expert en codage PMSI.
Tu dois coder le diagnostic suivant en respectant STRICTEMENT les règles de l'ATIH.
RÈGLES IMPÉRATIVES :
- Le code doit provenir UNIQUEMENT des sources CIM-10 fournies
- Distingue la DESCRIPTION CLINIQUE (ce que le médecin écrit) de la LOGIQUE DE CODAGE (ce que l'ATIH impose)
- Privilégie le code le plus SPÉCIFIQUE disponible (4e ou 5e caractère)
- Vérifie les notes d'inclusion/exclusion de chaque code candidat
- Si le diagnostic est un DP, il doit refléter le motif principal de prise en charge du séjour
- Si c'est un DAS, il doit avoir mobilisé des ressources supplémentaires pendant le séjour
- EXCLUSION SYMPTÔME : Si le diagnostic est un symptôme (R00-R99) et qu'un diagnostic précis (Chapitres I-XIV, A00-N99) expliquant ce symptôme est présent, le symptôme ne doit PAS être codé comme DAS
DIAGNOSTIC À CODER : "{texte}"
TYPE : {type_diag}
CONTEXTE CLINIQUE :
{ctx_str}
SOURCES DE RÉFÉRENCE :
{sources_text}
Réponds UNIQUEMENT avec un objet JSON au format suivant, sans aucun texte avant ou après :
{{
"analyse_clinique": "que signifie ce diagnostic sur le plan médical",
"codes_candidats": "quels codes CIM-10 des sources sont compatibles",
"discrimination": "pourquoi choisir ce code plutôt qu'un autre (inclusions/exclusions, spécificité)",
"regle_pmsi": "conformité aux règles PMSI pour un {type_diag} (guide méthodologique)",
"code": "X99.9",
"confidence": "high ou medium ou low",
"justification": "explication courte en français",
"preuves_cliniques": [
{{"type": "biologie|imagerie|traitement|acte|clinique", "element": "élément concret du dossier", "interpretation": "signification clinique justifiant le code"}}
]
}}"""
def _build_prompt_ccam(texte: str, sources: list[dict], contexte: dict) -> str:
"""Construit le prompt expert DIM pour le codage CCAM avec raisonnement structuré."""
sources_text = ""
for i, src in enumerate(sources, 1):
doc_raw = str(src.get("document", ""))
if doc_raw.startswith("ref:"):
doc_name = f"Référentiel uploadé : {doc_raw[4:]}"
elif doc_raw.startswith("proc:"):
doc_name = f"Procédure uploadée : {doc_raw[5:]}"
else:
doc_name = {
"cim10": "CIM-10 FR 2026",
"cim10_alpha": "CIM-10 Index Alphabétique 2026",
"guide_methodo": "Guide Méthodologique MCO 2026",
"ccam": "CCAM PMSI V4 2025",
}.get(doc_raw, doc_raw)
code_info = f" (code: {src['code']})" if src.get("code") else ""
page_info = f" [page {src['page']}]" if src.get("page") else ""
sources_text += f"--- Source {i}: {doc_name}{code_info}{page_info} ---\n"
sources_text += (src.get("extrait", "")[:800]) + "\n\n"
ctx_str = format_enriched_context(contexte)
return f"""Tu es un médecin DIM (Département d'Information Médicale) expert en codage CCAM PMSI.
Tu dois coder l'acte chirurgical/médical suivant en respectant STRICTEMENT la nomenclature CCAM.
RÈGLES IMPÉRATIVES :
- Le code doit provenir UNIQUEMENT des sources CCAM fournies
- Un code CCAM est composé de 4 lettres + 3 chiffres (ex: HMFC004)
- Vérifie l'activité (1=acte technique, 4=anesthésie) et le regroupement
- Tiens compte du tarif secteur 1 pour valider la cohérence
- Si plusieurs codes sont possibles, choisis le plus spécifique à l'acte décrit
- En cas de doute, indique confidence "low" plutôt que de proposer un code inadapté
ACTE À CODER : "{texte}"
CONTEXTE CLINIQUE :
{ctx_str}
SOURCES CCAM :
{sources_text}
Réponds UNIQUEMENT avec un objet JSON au format suivant, sans aucun texte avant ou après :
{{
"analyse_acte": "que décrit cet acte sur le plan technique/chirurgical",
"codes_candidats": "quels codes CCAM des sources sont compatibles",
"discrimination": "pourquoi choisir ce code plutôt qu'un autre (activité, regroupement, tarif)",
"code": "ABCD123",
"confidence": "high ou medium ou low",
"justification": "explication courte en français"
}}"""
def _parse_ollama_response(raw: str) -> dict | None:
"""Parse la réponse JSON d'Ollama et reconstitue le raisonnement structuré."""
parsed = parse_json_response(raw)
if parsed is None:
return None
# Reconstituer le raisonnement à partir des champs structurés
reasoning_parts = []
for key in ("analyse_clinique", "analyse_acte", "codes_candidats", "discrimination", "regle_pmsi"):
val = parsed.pop(key, None)
if val:
titre = key.replace("_", " ").upper()
reasoning_parts.append(f"{titre} :\n{val}")
if reasoning_parts:
parsed["raisonnement"] = "\n\n".join(reasoning_parts)
return parsed
def _call_ollama(prompt: str) -> dict | None:
"""Appelle Ollama (mode JSON) et parse la réponse avec reconstitution du raisonnement."""
result = call_ollama(prompt, temperature=0.1, max_tokens=2500)
if result is None:
return None
# Reconstituer le raisonnement structuré
reasoning_parts = []
for key in ("analyse_clinique", "analyse_acte", "codes_candidats", "discrimination", "regle_pmsi"):
val = result.pop(key, None)
if val:
titre = key.replace("_", " ").upper()
reasoning_parts.append(f"{titre} :\n{val}")
if reasoning_parts:
result["raisonnement"] = "\n\n".join(reasoning_parts)
return result
def _apply_llm_result_diagnostic(diagnostic: Diagnostic, llm_result: dict) -> None:
"""Applique un résultat LLM (frais ou caché) à un Diagnostic."""
code = llm_result.get("code")
confidence = llm_result.get("confidence")
justification = llm_result.get("justification")
raisonnement = llm_result.get("raisonnement")
if code:
code = normalize_code(code)
is_valid, _ = cim10_validate(code)
if is_valid:
diagnostic.cim10_suggestion = code
else:
# Tenter fallback vers le code parent (D71.9 → D71)
parent = fallback_parent_code(code)
if parent:
logger.info(
"RAG : code Ollama %s invalide → fallback parent %s pour « %s »",
code, parent, diagnostic.texte,
)
diagnostic.cim10_suggestion = parent
else:
logger.warning(
"RAG : code Ollama %s invalide pour « %s », code ignoré",
code, diagnostic.texte,
)
if confidence in ("high", "medium", "low"):
diagnostic.cim10_confidence = confidence
if justification:
diagnostic.justification = justification
if raisonnement:
diagnostic.raisonnement = raisonnement
# Stocker les preuves cliniques
preuves = llm_result.get("preuves_cliniques", [])
if preuves and isinstance(preuves, list):
for p in preuves:
if isinstance(p, dict) and p.get("element"):
try:
diagnostic.preuves_cliniques.append(PreuveClinique(
type=p.get("type", "clinique"),
element=p["element"],
interpretation=p.get("interpretation", ""),
))
except Exception:
pass
def enrich_diagnostic(
diagnostic: Diagnostic,
contexte: dict,
est_dp: bool = True,
cache: OllamaCache | None = None,
) -> None:
"""Enrichit un Diagnostic avec le RAG (FAISS + Ollama).
Modifie le diagnostic en place. Fallback gracieux si FAISS ou Ollama échouent.
"""
diag_type = "dp" if est_dp else "das"
# 1. Vérifier le cache
cached = cache.get(diagnostic.texte, diag_type) if cache else None
# 2. Recherche FAISS (toujours, pour les sources_rag fraîches)
sources = search_similar(diagnostic.texte, top_k=10)
if not sources:
logger.debug("Aucune source RAG trouvée pour : %s", diagnostic.texte)
return
# 3. Stocker les sources RAG
diagnostic.sources_rag = [
RAGSource(
document=s["document"],
page=s.get("page"),
code=s.get("code"),
extrait=s.get("extrait", "")[:200],
)
for s in sources
]
# 4. Si cache hit, appliquer et court-circuiter Ollama
if cached is not None:
logger.info("Cache hit pour %s : « %s »", diag_type.upper(), diagnostic.texte)
_apply_llm_result_diagnostic(diagnostic, cached)
return
# 5. Appel Ollama pour justification avec raisonnement structuré
prompt = _build_prompt(diagnostic.texte, sources, contexte, est_dp=est_dp)
llm_result = _call_ollama(prompt)
if llm_result:
_apply_llm_result_diagnostic(diagnostic, llm_result)
if cache:
cache.put(diagnostic.texte, diag_type, llm_result)
else:
logger.info("Ollama non disponible — sources FAISS conservées sans justification LLM")
def _apply_llm_result_acte(acte: ActeCCAM, llm_result: dict) -> None:
"""Applique un résultat LLM (frais ou caché) à un ActeCCAM."""
code = llm_result.get("code")
confidence = llm_result.get("confidence")
justification = llm_result.get("justification")
raisonnement = llm_result.get("raisonnement")
if code:
code = code.strip().upper()
is_valid, _ = ccam_validate(code)
if is_valid:
acte.code_ccam_suggestion = code
else:
logger.warning(
"RAG : code CCAM Ollama %s invalide pour « %s », code ignoré",
code, acte.texte,
)
if confidence in ("high", "medium", "low"):
acte.ccam_confidence = confidence
if justification:
acte.justification = justification
if raisonnement:
acte.raisonnement = raisonnement
def enrich_acte(acte: ActeCCAM, contexte: dict, cache: OllamaCache | None = None) -> None:
"""Enrichit un ActeCCAM avec le RAG (FAISS + Ollama).
Modifie l'acte en place. Fallback gracieux si FAISS ou Ollama échouent.
"""
# 1. Vérifier le cache
cached = cache.get(acte.texte, "ccam") if cache else None
# 2. Recherche FAISS (sources CCAM priorisées)
sources = search_similar_ccam(acte.texte, top_k=8)
if not sources:
logger.debug("Aucune source RAG CCAM trouvée pour : %s", acte.texte)
return
# 3. Stocker les sources RAG
acte.sources_rag = [
RAGSource(
document=s["document"],
page=s.get("page"),
code=s.get("code"),
extrait=s.get("extrait", "")[:200],
)
for s in sources
]
# 4. Si cache hit, appliquer et court-circuiter Ollama
if cached is not None:
logger.info("Cache hit pour CCAM : « %s »", acte.texte)
_apply_llm_result_acte(acte, cached)
return
# 5. Appel Ollama pour justification avec raisonnement structuré
prompt = _build_prompt_ccam(acte.texte, sources, contexte)
llm_result = _call_ollama(prompt)
if llm_result:
_apply_llm_result_acte(acte, llm_result)
if cache:
cache.put(acte.texte, "ccam", llm_result)
else:
logger.info("Ollama non disponible — sources FAISS CCAM conservées sans justification LLM")
def _build_prompt_das_extraction(text: str, contexte: dict, existing_das: list[str], dp_texte: str) -> str:
"""Construit le prompt pour l'extraction LLM de DAS supplémentaires."""
ctx_str = format_enriched_context(contexte)
existing_str = "\n".join(f"- {d}" for d in existing_das) if existing_das else "Aucun"
return f"""Tu es un médecin DIM (Département d'Information Médicale) expert en codage PMSI.
Analyse le texte médical suivant et identifie les diagnostics associés significatifs (DAS) qui n'ont PAS encore été codés.
RÈGLES IMPÉRATIVES :
- Un DAS doit avoir mobilisé des ressources supplémentaires pendant le séjour
- Ne PAS proposer de doublons avec les DAS déjà codés ci-dessous
- Ne PAS proposer le diagnostic principal comme DAS
- Ne PAS coder les symptômes (R00-R99) si un diagnostic précis les explique
- Ne PAS coder les antécédents non pertinents pour le séjour
- Privilégie les codes CIM-10 les plus SPÉCIFIQUES (4e ou 5e caractère)
- Ne propose que des diagnostics CLAIREMENT mentionnés dans le texte
- ATTENTION aux valeurs biologiques : ne code PAS un diagnostic si les valeurs sont dans les normes indiquées entre crochets [N: min-max]. Exemple : Créatinine 76 [N: 50-120] = NORMAL, pas d'insuffisance rénale.
DIAGNOSTIC PRINCIPAL : {dp_texte or "Non identifié"}
DAS DÉJÀ CODÉS :
{existing_str}
CONTEXTE CLINIQUE :
{ctx_str}
TEXTE MÉDICAL :
{text[:4000]}
Réponds UNIQUEMENT avec un objet JSON au format suivant, sans aucun texte avant ou après :
{{
"diagnostics_supplementaires": [
{{
"texte": "description du diagnostic",
"code_cim10": "X99.9",
"justification": "pourquoi ce DAS est pertinent pour le séjour"
}}
]
}}
Si aucun DAS supplémentaire n'est pertinent, retourne : {{"diagnostics_supplementaires": []}}"""
def extract_das_llm(
text: str,
contexte: dict,
existing_das: list[str],
dp_texte: str,
cache: OllamaCache | None = None,
) -> list[dict]:
"""Extrait des DAS supplémentaires via un pass LLM.
Args:
text: Texte médical complet.
contexte: Contexte patient (sexe, age, etc.).
existing_das: Liste des DAS déjà codés (texte + code).
dp_texte: Texte du diagnostic principal.
cache: Cache Ollama optionnel.
Returns:
Liste de dicts {texte, code_cim10, justification} pour les DAS détectés.
"""
import hashlib
# Clé de cache basée sur le hash du texte
text_hash = hashlib.md5(text[:4000].encode()).hexdigest()[:16]
cache_key_text = f"das_extract::{text_hash}"
# Vérifier le cache
if cache is not None:
cached = cache.get(cache_key_text, "das_llm")
if cached is not None:
logger.info("Cache hit pour extraction DAS LLM")
return cached.get("diagnostics_supplementaires", [])
# Construire le prompt et appeler Ollama
prompt = _build_prompt_das_extraction(text, contexte, existing_das, dp_texte)
result = call_ollama(prompt, temperature=0.1, max_tokens=2000)
if result is None:
logger.warning("Extraction DAS LLM : Ollama non disponible")
return []
das_list = result.get("diagnostics_supplementaires", [])
if not isinstance(das_list, list):
logger.warning("Extraction DAS LLM : format inattendu")
return []
# Stocker dans le cache
if cache is not None:
cache.put(cache_key_text, "das_llm", result)
logger.info("Extraction DAS LLM : %d diagnostics supplémentaires détectés", len(das_list))
return das_list
def enrich_dossier(dossier: DossierMedical) -> None:
"""Enrichit le DP et tous les DAS d'un dossier via le RAG.
Utilise un cache persistant et parallélise les appels Ollama
pour les DAS et actes CCAM (max_workers = OLLAMA_MAX_PARALLEL).
"""
cache = OllamaCache(OLLAMA_CACHE_PATH, OLLAMA_MODEL)
contexte = build_enriched_context(dossier)
# Phase 1 : DP seul (le contexte DAS en dépend)
if dossier.diagnostic_principal:
logger.info("RAG enrichissement DP : %s", dossier.diagnostic_principal.texte)
enrich_diagnostic(dossier.diagnostic_principal, contexte, est_dp=True, cache=cache)
# Mettre à jour le contexte avec le DP pour les DAS
if dossier.diagnostic_principal:
contexte["dp_texte"] = dossier.diagnostic_principal.texte
contexte["das_codes_existants"] = [
f"{d.cim10_suggestion} ({d.texte})"
for d in dossier.diagnostics_associes
if d.cim10_suggestion
]
# Phase 2 : DAS + Actes en parallèle
das_list = dossier.diagnostics_associes
actes_list = dossier.actes_ccam
if das_list or actes_list:
with ThreadPoolExecutor(max_workers=OLLAMA_MAX_PARALLEL) as executor:
futures = []
for das in das_list:
logger.info("RAG enrichissement DAS : %s", das.texte)
futures.append(executor.submit(enrich_diagnostic, das, contexte, False, cache))
for acte in actes_list:
logger.info("RAG enrichissement CCAM : %s", acte.texte)
futures.append(executor.submit(enrich_acte, acte, contexte, cache))
for f in as_completed(futures):
f.result() # propage les exceptions
cache.save()

View File

@@ -0,0 +1,242 @@
"""Détection heuristique de sévérité et CMA/CMS pour le codage GHM.
Phase 1 : heuristique basée sur des marqueurs textuels et des racines CIM-10.
Phase 2 (future) : tables CMA/CMS officielles ATIH.
"""
from __future__ import annotations
import json
import logging
import re
from dataclasses import dataclass, field
from typing import Optional
from .cim10_dict import load_dict, normalize_text
logger = logging.getLogger(__name__)
# --- Marqueurs de sévérité dans le texte ---
_SEVERE_MARKERS = {
"aigu", "aigue", "severe", "grave", "maligne", "malin",
"foudroyant", "foudroyante", "necrosant", "necrosante",
"septique", "decompense", "decompensee", "choc",
"defaillance", "hemorragique",
"fulminant", "fulminante", "massif", "massive", "critique",
}
_MODERATE_MARKERS = {
"modere", "moderee", "moderes", "moderees",
"subaigu", "subaigue", "subaiguë",
"persistant", "persistante", "recidivant", "recidivante",
}
_MILD_MARKERS = {
"chronique", "leger", "legere",
"benin", "benigne", "mineur", "mineure",
"superficiel", "superficielle", "stable",
}
# --- Racines CIM-10 fréquemment CMA (heuristique Phase 1) ---
# Ces racines sont connues pour être souvent classées CMA dans les tables ATIH.
_HEURISTIC_CMA_ROOTS: set[str] = {
# Infectieux
"A41", # Sepsis
"A40", # Septicémie streptococcique
# Hématologie / nutrition
"D64", # Anémie
"D65", # CIVD
"E46", # Dénutrition
"E87", # Troubles hydro-électrolytiques
"E86", # Déshydratation
# Métabolique
"E11", # Diabète type 2 (avec complications)
"E10", # Diabète type 1 (avec complications)
# Cardiovasculaire
"I48", # Fibrillation auriculaire
"I50", # Insuffisance cardiaque
"I26", # Embolie pulmonaire
"I80", # Thrombose veineuse
# Respiratoire
"J18", # Pneumopathie
"J96", # Insuffisance respiratoire
"J69", # Pneumopathie d'inhalation
# Rénal
"N17", # Insuffisance rénale aiguë
"N18", # Insuffisance rénale chronique
"N39", # Infection urinaire
# Hépatique
"K72", # Insuffisance hépatique
# Infectieux nosocomial
"T81", # Complications d'actes (infection post-op)
"T80", # Complications post-perfusion
}
_cma_levels: dict[str, int] | None = None
def _load_cma_levels() -> dict[str, int]:
"""Charge les niveaux CMA officiels depuis data/cma_levels.json (lazy-loaded)."""
global _cma_levels
if _cma_levels is not None:
return _cma_levels
from ..config import CMA_LEVELS_PATH
try:
data = json.loads(CMA_LEVELS_PATH.read_text(encoding="utf-8"))
_cma_levels = {k: int(v) for k, v in data.items()}
logger.debug("CMA levels chargés : %d codes", len(_cma_levels))
except FileNotFoundError:
logger.warning("Fichier CMA levels non trouvé : %s", CMA_LEVELS_PATH)
_cma_levels = {}
except Exception:
logger.warning("Erreur chargement CMA levels", exc_info=True)
_cma_levels = {}
return _cma_levels
@dataclass
class SeverityInfo:
"""Résultat de l'évaluation de sévérité d'un diagnostic."""
est_cma_probable: bool = False
niveau_severite: str = "non_evalue" # "leger" | "modere" | "severe" | "non_evalue"
niveau_cma: int = 1 # 1 (pas CMA), 2, 3 ou 4 (officiel ATIH)
marqueurs_trouves: list[str] = field(default_factory=list)
def _detect_severity_markers(text: str) -> tuple[str, list[str]]:
"""Détecte les marqueurs de sévérité dans un texte normalisé.
Returns:
(niveau, marqueurs_trouves) où niveau est "severe", "modere", "leger" ou "non_evalue".
"""
text_norm = normalize_text(text)
words = set(text_norm.split())
found_severe = words & _SEVERE_MARKERS
found_moderate = words & _MODERATE_MARKERS
found_mild = words & _MILD_MARKERS
all_found = list(found_severe | found_moderate | found_mild)
if found_severe:
return "severe", all_found
if found_moderate:
return "modere", all_found
if found_mild:
return "leger", all_found
return "non_evalue", []
def _is_heuristic_cma(code: str) -> bool:
"""Vérifie si un code CIM-10 est probablement CMA selon les racines heuristiques."""
if not code:
return False
code_upper = code.upper()
for root in _HEURISTIC_CMA_ROOTS:
if code_upper.startswith(root):
return True
return False
def evaluate_severity(diagnostic) -> SeverityInfo:
"""Évalue la sévérité d'un diagnostic (texte + code CIM-10).
Utilise en priorité les niveaux CMA officiels ATIH (2/3/4),
avec fallback sur l'heuristique par racines CIM-10.
Args:
diagnostic: Objet avec attributs texte, cim10_suggestion.
Returns:
SeverityInfo avec est_cma_probable, niveau_cma, niveau_severite, marqueurs_trouves.
"""
info = SeverityInfo()
# 1. Marqueurs textuels depuis le texte du diagnostic
texte = diagnostic.texte or ""
niveau, marqueurs = _detect_severity_markers(texte)
# 2. Chercher aussi dans le label du dictionnaire CIM-10
code = diagnostic.cim10_suggestion
if code:
cim10_dict = load_dict()
label = cim10_dict.get(code, "")
if label:
niveau_label, marqueurs_label = _detect_severity_markers(label)
# Prendre le niveau le plus sévère
severity_order = {"severe": 3, "modere": 2, "leger": 1, "non_evalue": 0}
if severity_order.get(niveau_label, 0) > severity_order.get(niveau, 0):
niveau = niveau_label
marqueurs = list(set(marqueurs + marqueurs_label))
info.niveau_severite = niveau
info.marqueurs_trouves = marqueurs
# 3. Lookup officiel CMA ATIH (prioritaire)
if code:
cma_levels = _load_cma_levels()
official_level = cma_levels.get(code)
if official_level:
info.niveau_cma = official_level
info.est_cma_probable = True
elif _is_heuristic_cma(code):
# Fallback heuristique → niveau 2
info.niveau_cma = 2
info.est_cma_probable = True
return info
def enrich_dossier_severity(dp, das_list: list) -> tuple[list[str], int, int]:
"""Enrichit les diagnostics d'un dossier avec les informations de sévérité.
Modifie les diagnostics en place (attributs est_cma, est_cms, niveau_severite).
Args:
dp: Diagnostic principal.
das_list: Liste des diagnostics associés.
Returns:
(alertes, cma_count, cms_count).
"""
alertes = []
# Évaluer le DP
if dp and dp.cim10_suggestion:
info = evaluate_severity(dp)
dp.niveau_severite = info.niveau_severite
dp.niveau_cma = info.niveau_cma
if info.est_cma_probable:
dp.est_cma = True
# Évaluer chaque DAS
cma_count = 0
cms_count = 0
for das in das_list:
if not das.cim10_suggestion:
continue
info = evaluate_severity(das)
das.niveau_severite = info.niveau_severite
das.niveau_cma = info.niveau_cma
if info.est_cma_probable:
das.est_cma = True
cma_count += 1
# CMS = CMA niveau 4 ou CMA sévère
if info.niveau_cma >= 4 or info.niveau_severite == "severe":
das.est_cms = True
cms_count += 1
alertes.append(
f"CMA niveau {info.niveau_cma} : '{das.texte}' ({das.cim10_suggestion}) — "
f"sévérité {info.niveau_severite}"
+ (f", marqueurs : {', '.join(info.marqueurs_trouves)}" if info.marqueurs_trouves else "")
)
if cma_count >= 2:
alertes.insert(0, f"{cma_count} CMA probables détectées — impact potentiel sur le niveau de sévérité GHM")
return alertes, cma_count, cms_count