feat: Phase 4 — viewer enrichi, non-cumul CCAM, fusion multi-PDFs + rebuild FAISS (21 141 vecteurs)

- Viewer : badges compteurs (DAS, actes, alertes, CMA), raisonnement LLM pliable, regroupement CCAM, navigation patient, alertes NON-CUMUL en rouge
- Non-cumul CCAM : 3 règles heuristiques (même base, même regroupement/jour, paires incompatibles)
- Fusion multi-PDFs : merge_dossiers() avec priorité Trackare, spécificité CIM-10, déduplication, champ source_files
- Index FAISS reconstruit : 21 141 vecteurs (CCAM dict 8 257 + CIM-10 alpha 306)
- 192 tests unitaires passent

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dom
2026-02-11 12:43:34 +01:00
parent 7e69f994b0
commit 9d07894c6f
12 changed files with 1013 additions and 26 deletions

View File

@@ -0,0 +1,122 @@
"""Détection des incompatibilités de non-cumul entre actes CCAM.
Implémente 3 règles heuristiques basées sur les principes T2A :
1. Même code de base (7 caractères) avec activités différentes
2. Même regroupement chirurgical le même jour
3. Paires de regroupements incompatibles connues
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..config import ActeCCAM
logger = logging.getLogger(__name__)
# Regroupements chirurgicaux soumis à cumul restreint (un seul par jour)
REGROUPEMENT_UNIQUE_PAR_JOUR: set[str] = {
"ADC", # Actes de chirurgie
"ACO", # Actes de chirurgie orthopédique
"ADO", # Actes de chirurgie ORL
"ADA", # Actes de chirurgie abdominale/digestive
"ADE", # Actes de chirurgie endoscopique
}
# Paires de regroupements incompatibles
NONCUMUL_REGROUPEMENT_PAIRS: set[frozenset[str]] = {
frozenset({"ADC", "ADE"}),
frozenset({"ADC", "ADO"}),
frozenset({"ACO", "ADE"}),
}
def _get_regroupement(acte: ActeCCAM) -> str | None:
"""Récupère le regroupement d'un acte depuis le dictionnaire CCAM."""
if not acte.code_ccam_suggestion:
return None
try:
from .ccam_dict import load_dict
d = load_dict()
info = d.get(acte.code_ccam_suggestion)
if info and isinstance(info, dict):
return info.get("regroupement")
except Exception:
pass
return None
def check_noncumul(actes: list[ActeCCAM]) -> list[str]:
"""Vérifie les règles de non-cumul entre actes CCAM.
Args:
actes: Liste d'actes CCAM d'un dossier médical.
Returns:
Liste d'alertes de non-cumul détectées.
"""
if len(actes) < 2:
return []
alertes: list[str] = []
# Enrichir les actes avec leur regroupement
actes_info: list[tuple[ActeCCAM, str | None]] = [
(acte, _get_regroupement(acte)) for acte in actes
]
# Règle 1 : même code de base (7 premiers caractères), activités différentes
codes_base: dict[str, list[ActeCCAM]] = {}
for acte in actes:
code = acte.code_ccam_suggestion
if code and len(code) >= 7:
base = code[:7]
codes_base.setdefault(base, []).append(acte)
for base, group in codes_base.items():
if len(group) > 1:
codes_full = [a.code_ccam_suggestion for a in group]
alertes.append(
f"NON-CUMUL: codes de même base {base} avec variantes "
f"({', '.join(codes_full)}) — vérifier la facturation"
)
# Règle 2 : même regroupement chirurgical le même jour
regroup_par_jour: dict[tuple[str, str | None], list[ActeCCAM]] = {}
for acte, regroup in actes_info:
if regroup and regroup in REGROUPEMENT_UNIQUE_PAR_JOUR:
key = (regroup, acte.date)
regroup_par_jour.setdefault(key, []).append(acte)
for (regroup, date), group in regroup_par_jour.items():
if len(group) > 1:
codes = [a.code_ccam_suggestion or "?" for a in group]
jour = f" le {date}" if date else ""
alertes.append(
f"NON-CUMUL: {len(group)} actes du regroupement {regroup}{jour} "
f"({', '.join(codes)}) — cumul restreint"
)
# Règle 3 : paires de regroupements incompatibles
regroups_seen: list[tuple[str, ActeCCAM]] = [
(r, a) for a, r in actes_info if r
]
checked: set[frozenset[int]] = set()
for i, (r1, a1) in enumerate(regroups_seen):
for j, (r2, a2) in enumerate(regroups_seen):
if i >= j:
continue
pair_key = frozenset({i, j})
if pair_key in checked:
continue
checked.add(pair_key)
pair = frozenset({r1, r2})
if pair in NONCUMUL_REGROUPEMENT_PAIRS:
alertes.append(
f"NON-CUMUL: regroupements incompatibles {r1}/{r2} "
f"({a1.code_ccam_suggestion or '?'} + {a2.code_ccam_suggestion or '?'})"
)
return alertes

View File

@@ -123,6 +123,9 @@ def extract_medical_info(
# Post-processing : enrichissement sévérité (CMA/CMS heuristique)
_apply_severity_rules(dossier)
# Post-processing : détection non-cumul actes CCAM
_apply_noncumul_rules(dossier)
return dossier
@@ -702,6 +705,16 @@ def _apply_severity_rules(dossier: DossierMedical) -> None:
logger.warning("Erreur lors de l'évaluation de sévérité", exc_info=True)
def _apply_noncumul_rules(dossier: DossierMedical) -> None:
"""Détecte les incompatibilités de non-cumul entre actes CCAM."""
try:
from .ccam_noncumul import check_noncumul
alertes = check_noncumul(dossier.actes_ccam)
dossier.alertes_codage.extend(alertes)
except Exception:
logger.warning("Erreur lors de la vérification du non-cumul CCAM", exc_info=True)
def _lookup_cim10(text: str) -> str | None:
"""Cherche un code CIM-10 pour un texte donné.

246
src/medical/fusion.py Normal file
View File

@@ -0,0 +1,246 @@
"""Fusion de dossiers médicaux multi-PDFs pour un même patient.
Combine les informations de plusieurs documents (Trackare, CRH, CRO) en un
dossier unique avec des règles de priorité et de déduplication.
"""
from __future__ import annotations
import logging
from ..config import (
ActeCCAM,
BiologieCle,
Diagnostic,
DossierMedical,
Imagerie,
Sejour,
Traitement,
)
logger = logging.getLogger(__name__)
# Priorité des types de documents pour les données de séjour
_DOC_PRIORITY = {"trackare": 0, "crh": 1, "cro": 2}
def _cim10_specificity(code: str | None) -> int:
"""Score de spécificité d'un code CIM-10 : longueur sans le point."""
if not code:
return 0
return len(code.replace(".", ""))
def _prefer_most_specific_dp(dossiers: list[DossierMedical]) -> Diagnostic | None:
"""Sélectionne le DP le plus spécifique parmi tous les dossiers."""
candidates: list[tuple[Diagnostic, int]] = []
for d in dossiers:
if d.diagnostic_principal:
spec = _cim10_specificity(d.diagnostic_principal.cim10_suggestion)
candidates.append((d.diagnostic_principal, spec))
if not candidates:
return None
# Tri : spécificité décroissante, puis confiance (high > medium > low)
conf_order = {"high": 0, "medium": 1, "low": 2}
candidates.sort(
key=lambda x: (-x[1], conf_order.get(x[0].cim10_confidence or "", 3))
)
return candidates[0][0]
def _merge_sejour(dossiers: list[DossierMedical]) -> Sejour:
"""Fusionne les informations de séjour avec priorité Trackare > CRH > CRO."""
# Trier par priorité de type de document
sorted_dossiers = sorted(
dossiers,
key=lambda d: _DOC_PRIORITY.get(d.document_type, 99),
)
merged = Sejour()
for d in sorted_dossiers:
s = d.sejour
if s.sexe and not merged.sexe:
merged.sexe = s.sexe
if s.age is not None and merged.age is None:
merged.age = s.age
if s.date_entree and not merged.date_entree:
merged.date_entree = s.date_entree
if s.date_sortie and not merged.date_sortie:
merged.date_sortie = s.date_sortie
if s.duree_sejour is not None and merged.duree_sejour is None:
merged.duree_sejour = s.duree_sejour
if s.mode_entree and not merged.mode_entree:
merged.mode_entree = s.mode_entree
if s.mode_sortie and not merged.mode_sortie:
merged.mode_sortie = s.mode_sortie
if s.imc is not None and merged.imc is None:
merged.imc = s.imc
if s.poids is not None and merged.poids is None:
merged.poids = s.poids
if s.taille is not None and merged.taille is None:
merged.taille = s.taille
return merged
def _dedup_diagnostics(all_das: list[Diagnostic]) -> list[Diagnostic]:
"""Déduplique les diagnostics associés par code CIM-10, garde la meilleure confiance."""
conf_order = {"high": 0, "medium": 1, "low": 2}
seen: dict[str | None, Diagnostic] = {}
for d in all_das:
key = d.cim10_suggestion
if key is None:
# Sans code, dédup par texte normalisé
key = f"__text__{d.texte.lower().strip()}"
if key not in seen:
seen[key] = d
else:
existing = seen[key]
# Garder celui avec la meilleure confiance
if conf_order.get(d.cim10_confidence or "", 3) < conf_order.get(
existing.cim10_confidence or "", 3
):
seen[key] = d
return list(seen.values())
def _dedup_actes(all_actes: list[ActeCCAM]) -> list[ActeCCAM]:
"""Déduplique les actes CCAM par code."""
seen: dict[str | None, ActeCCAM] = {}
for a in all_actes:
key = a.code_ccam_suggestion
if key is None:
key = f"__text__{a.texte.lower().strip()}"
if key not in seen:
seen[key] = a
else:
existing = seen[key]
# Garder celui avec date si possible
if a.date and not existing.date:
seen[key] = a
return list(seen.values())
def merge_dossiers(dossiers: list[DossierMedical]) -> DossierMedical:
"""Fusionne plusieurs dossiers médicaux d'un même patient.
Args:
dossiers: Liste de DossierMedical issus de PDFs différents.
Returns:
Un DossierMedical fusionné.
"""
if len(dossiers) == 1:
result = dossiers[0].model_copy(deep=True)
result.source_files = [result.source_file]
return result
merged = DossierMedical()
# Source files
merged.source_files = [d.source_file for d in dossiers if d.source_file]
# Séjour
merged.sejour = _merge_sejour(dossiers)
# Diagnostic principal : le plus spécifique
merged.diagnostic_principal = _prefer_most_specific_dp(dossiers)
# Collecter tous les DAS + DP non retenus comme DAS
all_das: list[Diagnostic] = []
for d in dossiers:
all_das.extend(d.diagnostics_associes)
# Si le DP de ce dossier est différent du DP fusionné, l'ajouter comme DAS
if (
d.diagnostic_principal
and merged.diagnostic_principal
and d.diagnostic_principal.cim10_suggestion
!= merged.diagnostic_principal.cim10_suggestion
):
all_das.append(d.diagnostic_principal)
merged.diagnostics_associes = _dedup_diagnostics(all_das)
# Actes CCAM
all_actes: list[ActeCCAM] = []
for d in dossiers:
all_actes.extend(d.actes_ccam)
merged.actes_ccam = _dedup_actes(all_actes)
# Biologie : union, dédup par (test, valeur)
bio_seen: set[tuple[str, str | None]] = set()
for d in dossiers:
for b in d.biologie_cle:
key = (b.test, b.valeur)
if key not in bio_seen:
merged.biologie_cle.append(b)
bio_seen.add(key)
# Imagerie : union, dédup par (type, conclusion)
img_seen: set[tuple[str, str | None]] = set()
for d in dossiers:
for i in d.imagerie:
key = (i.type, i.conclusion)
if key not in img_seen:
merged.imagerie.append(i)
img_seen.add(key)
# Traitements : union, dédup par médicament (normalisé)
med_seen: set[str] = set()
for d in dossiers:
for t in d.traitements_sortie:
key = t.medicament.lower().strip()
if key not in med_seen:
merged.traitements_sortie.append(t)
med_seen.add(key)
# Antécédents : union, dédup par texte normalisé
ant_seen: set[str] = set()
for d in dossiers:
for a in d.antecedents:
key = a.lower().strip()
if key not in ant_seen:
merged.antecedents.append(a)
ant_seen.add(key)
# Complications : union, dédup par texte normalisé
comp_seen: set[str] = set()
for d in dossiers:
for c in d.complications:
key = c.lower().strip()
if key not in comp_seen:
merged.complications.append(c)
comp_seen.add(key)
# Alertes : alerte de fusion en tête + union
merged.alertes_codage = [f"FUSION: {len(dossiers)} documents fusionnés"]
alert_seen: set[str] = set()
for d in dossiers:
for a in d.alertes_codage:
if a not in alert_seen:
merged.alertes_codage.append(a)
alert_seen.add(a)
# Document type : le type prioritaire
sorted_by_prio = sorted(
dossiers,
key=lambda d: _DOC_PRIORITY.get(d.document_type, 99),
)
merged.document_type = sorted_by_prio[0].document_type
logger.info(
"Fusion de %d dossiers : DP=%s, %d DAS, %d actes",
len(dossiers),
merged.diagnostic_principal.cim10_suggestion if merged.diagnostic_principal else "aucun",
len(merged.diagnostics_associes),
len(merged.actes_ccam),
)
return merged