Files
t2a_v2/src/medical/validation_pipeline.py
dom 77ffbc56d4 feat: CODE_CORRECTIONS 12 règles déterministes + sentinel REJECT
- CODE_CORRECTIONS passe de 1 à 12 règles (corrections + rejets)
- REJECT_SENTINEL pour codes trop vagues (R69, R69.8, Z53.9, D71.9) ou inexistants
- Corrections : J96.0→J96.00, I50.9→I50.1 (IC gauche), N17.9→N17.0 (NTA),
  E11.9→E11.65 (DT2 insuline), K92.2→K92.0 (hématémèse), G40.9→G40.3 (épilepsie)
- _apply_code_corrections() gère REJECT : DP→None, DAS→supprimé + alerte
- 21 tests paramétrés (corrections, rejets, non-corrections)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-20 11:01:06 +01:00

377 lines
14 KiB
Python

"""Pipeline de validation et post-traitement des codes CIM-10 et CCAM."""
from __future__ import annotations
import logging
from .cim10_dict import lookup as dict_lookup, normalize_code, validate_code as cim10_validate
from .ccam_dict import validate_code as ccam_validate
from .das_filter import correct_known_miscodes, apply_semantic_dedup, REJECT_SENTINEL
from ..config import Diagnostic, DossierMedical
from .diagnostic_extraction import CIM10_MAP
logger = logging.getLogger(__name__)
_INVALID_CODE_PATTERNS = {"aucun", "none", "n/a", "non_codable", "aucun_code_valide", "inconnu"}
def _fallback_cim10(texte: str) -> str | None:
"""Tente de trouver un code CIM-10 via le dictionnaire à partir du texte diagnostic."""
code = dict_lookup(texte, domain_overrides=CIM10_MAP)
if code:
is_valid, _ = cim10_validate(code)
if is_valid:
return code
return None
def _validate_ccam(dossier: DossierMedical) -> None:
"""Valide les codes CCAM suggérés contre le dictionnaire officiel."""
for acte in dossier.actes_ccam:
if not acte.code_ccam_suggestion:
acte.validite = "non_verifie"
continue
is_valid, desc = ccam_validate(acte.code_ccam_suggestion)
if is_valid:
acte.validite = "valide"
else:
acte.validite = "non_verifie"
dossier.alertes_codage.append(
f"CCAM {acte.code_ccam_suggestion} ({acte.texte}) : code absent du dictionnaire CCAM V81"
)
def _validate_cim10(dossier: DossierMedical) -> None:
"""Valide les codes CIM-10 suggérés par Ollama contre le dictionnaire."""
diags: list[tuple[str, Diagnostic]] = []
if dossier.diagnostic_principal:
diags.append(("DP", dossier.diagnostic_principal))
for das in dossier.diagnostics_associes:
diags.append(("DAS", das))
for type_diag, diag in diags:
if not diag.cim10_suggestion:
continue
# Rejeter les hallucinations
if diag.cim10_suggestion.lower().strip() in _INVALID_CODE_PATTERNS:
fallback = _fallback_cim10(diag.texte)
if fallback:
dossier.alertes_codage.append(
f"CIM-10 {type_diag} ({diag.texte}) : code rejeté « {diag.cim10_suggestion} » → fallback {fallback}"
)
diag.cim10_suggestion = fallback
diag.cim10_confidence = "medium"
else:
dossier.alertes_codage.append(
f"CIM-10 {type_diag} ({diag.texte}) : code rejeté « {diag.cim10_suggestion} »"
)
diag.cim10_suggestion = None
diag.cim10_confidence = None
continue
# Normaliser le format (K810 → K81.0)
diag.cim10_suggestion = normalize_code(diag.cim10_suggestion)
# Valider contre le dictionnaire
is_valid, label = cim10_validate(diag.cim10_suggestion)
if not is_valid:
fallback = _fallback_cim10(diag.texte)
if fallback:
dossier.alertes_codage.append(
f"CIM-10 {type_diag} {diag.cim10_suggestion} ({diag.texte}) : code invalide → fallback {fallback}"
)
diag.cim10_suggestion = fallback
diag.cim10_confidence = "medium"
else:
dossier.alertes_codage.append(
f"CIM-10 {type_diag} {diag.cim10_suggestion} ({diag.texte}) : code absent du dictionnaire CIM-10"
)
diag.cim10_confidence = "low"
def _apply_code_corrections(dossier: DossierMedical) -> None:
"""Corrige les codes CIM-10 systématiquement mal attribués par le LLM.
Si un code est marqué REJECT_SENTINEL, il est retiré :
- DP : code mis à None (pas de suppression du diagnostic principal)
- DAS : diagnostic supprimé de la liste
"""
# DP
if dossier.diagnostic_principal and dossier.diagnostic_principal.cim10_suggestion:
dp = dossier.diagnostic_principal
corrected = correct_known_miscodes(dp.cim10_suggestion, dp.texte)
if corrected == REJECT_SENTINEL:
logger.info(" Code rejeté : %s pour « %s » (DP) — code supprimé", dp.cim10_suggestion, dp.texte)
dossier.alertes_codage.append(
f"Code rejeté : {dp.cim10_suggestion} ({dp.texte}) — trop vague ou inexistant"
)
dp.cim10_suggestion = None
dp.cim10_confidence = None
elif corrected:
logger.info(" Code corrigé : %s%s pour « %s »", dp.cim10_suggestion, corrected, dp.texte)
dp.cim10_suggestion = corrected
# DAS
das_to_keep = []
for diag in dossier.diagnostics_associes:
if not diag.cim10_suggestion:
das_to_keep.append(diag)
continue
corrected = correct_known_miscodes(diag.cim10_suggestion, diag.texte)
if corrected == REJECT_SENTINEL:
logger.info(" Code rejeté : %s pour « %s » (DAS) — diagnostic supprimé", diag.cim10_suggestion, diag.texte)
dossier.alertes_codage.append(
f"Code rejeté : {diag.cim10_suggestion} ({diag.texte}) — trop vague ou inexistant"
)
continue # ne pas ajouter à das_to_keep
if corrected:
logger.info(" Code corrigé : %s%s pour « %s »", diag.cim10_suggestion, corrected, diag.texte)
diag.cim10_suggestion = corrected
das_to_keep.append(diag)
dossier.diagnostics_associes = das_to_keep
def _apply_exclusion_rules(dossier: DossierMedical) -> None:
"""Applique les règles d'exclusion symptôme vs diagnostic précis."""
try:
from .exclusion_rules import check_exclusions
result = check_exclusions(dossier.diagnostic_principal, dossier.diagnostics_associes)
dossier.diagnostics_associes = result.cleaned_das
dossier.alertes_codage.extend(result.warnings)
if result.excluded:
logger.info(
" Exclusions : %d DAS symptomatiques exclus",
len(result.excluded),
)
except Exception:
logger.warning("Erreur lors de l'application des règles d'exclusion", exc_info=True)
def _apply_severity_rules(dossier: DossierMedical) -> None:
"""Enrichit les diagnostics avec les informations de sévérité heuristique."""
try:
from .severity import enrich_dossier_severity
alertes, _cma_count, _cms_count = enrich_dossier_severity(
dossier.diagnostic_principal, dossier.diagnostics_associes,
)
dossier.alertes_codage.extend(alertes)
except Exception:
logger.warning("Erreur lors de l'évaluation de sévérité", exc_info=True)
def _apply_noncumul_rules(dossier: DossierMedical) -> None:
"""Détecte les incompatibilités de non-cumul entre actes CCAM."""
try:
from .ccam_noncumul import check_noncumul
alertes = check_noncumul(dossier.actes_ccam)
dossier.alertes_codage.extend(alertes)
except Exception:
logger.warning("Erreur lors de la vérification du non-cumul CCAM", exc_info=True)
def _is_dp_family_redundant(das_code: str, dp_code: str) -> bool:
"""True si le DAS est redondant avec le DP (même code, parent/enfant, ou même famille)."""
if das_code == dp_code:
return True
# Relation parent/enfant → toujours redondant
das_norm = das_code.replace(".", "")
dp_norm = dp_code.replace(".", "")
if das_norm.startswith(dp_norm) or dp_norm.startswith(das_norm):
return True
# Même famille 3 chars, sauf exceptions
dp_family = dp_code[:3]
if das_code[:3] == dp_family:
# S/T (trauma) : sites différents → garder
if dp_family[0] in ("S", "T"):
return False
# E10-E14 (diabète) : complications différentes → garder
if dp_family[0] == "E" and dp_family[1:].isdigit() and 10 <= int(dp_family[1:]) <= 14:
return False
return True
return False
def _remove_das_equal_dp(dossier: DossierMedical) -> None:
"""Retire les DAS redondants avec le DP (même code, famille, ou sémantique)."""
dp_code = dossier.diagnostic_principal.cim10_suggestion if dossier.diagnostic_principal else None
if not dp_code:
return
before = len(dossier.diagnostics_associes)
dossier.diagnostics_associes = [
d for d in dossier.diagnostics_associes
if not d.cim10_suggestion or not _is_dp_family_redundant(d.cim10_suggestion, dp_code)
]
removed = before - len(dossier.diagnostics_associes)
if removed:
logger.info(" DAS≈DP : %d DAS retiré(s) (famille %s du DP)", removed, dp_code[:3])
# Redondances sémantiques entre DAS
dossier.diagnostics_associes = apply_semantic_dedup(dossier.diagnostics_associes)
def _track_item(item, search_key: str, page_tracker, search_text: str) -> bool:
"""Cherche la page source et l'extrait pour un item avec source_page/source_excerpt."""
if item.source_page is not None:
return False
if not search_key:
return False
page = page_tracker.find_page_for_text(search_key, search_text)
if page:
item.source_page = page
item.source_excerpt = page_tracker.extract_excerpt(search_key, search_text)
return True
return False
def _apply_source_tracking(dossier: DossierMedical, page_tracker, search_text: str) -> None:
"""Ajoute la traçabilité source (page + extrait) à tous les éléments du dossier.
Cherche le texte de chaque élément dans le texte source pour retrouver
la page d'origine et extraire un passage contextualisé.
"""
tracked = 0
total = 0
# Diagnostics (DP + DAS)
all_diags: list[Diagnostic] = []
if dossier.diagnostic_principal:
all_diags.append(dossier.diagnostic_principal)
all_diags.extend(dossier.diagnostics_associes)
for diag in all_diags:
total += 1
if _track_item(diag, diag.texte, page_tracker, search_text):
tracked += 1
# Biologie
for b in dossier.biologie_cle:
total += 1
search_key = f"{b.test}: {b.valeur}" if b.valeur else b.test
if _track_item(b, search_key, page_tracker, search_text):
tracked += 1
elif b.valeur and _track_item(b, b.test, page_tracker, search_text):
tracked += 1
# Imagerie
for img in dossier.imagerie:
total += 1
search_key = img.type
if _track_item(img, search_key, page_tracker, search_text):
tracked += 1
elif img.conclusion and _track_item(img, img.conclusion[:50], page_tracker, search_text):
tracked += 1
# Traitements
for t in dossier.traitements_sortie:
total += 1
if _track_item(t, t.medicament, page_tracker, search_text):
tracked += 1
# Actes CCAM
for a in dossier.actes_ccam:
total += 1
if _track_item(a, a.texte, page_tracker, search_text):
tracked += 1
# Antécédents
for ant in dossier.antecedents:
total += 1
if _track_item(ant, ant.texte, page_tracker, search_text):
tracked += 1
# Complications
for comp in dossier.complications:
total += 1
if _track_item(comp, comp.texte, page_tracker, search_text):
tracked += 1
if tracked:
logger.info(" Traçabilité source : %d/%d éléments localisés", tracked, total)
def _validate_justifications(dossier: DossierMedical) -> None:
"""Validation croisée de tous les diagnostics via un appel LLM unique.
Vérifie la cohérence, les preuves cliniques et la spécificité des codes.
Ajuste la confiance si la justification est faible et ajoute des alertes QC.
"""
try:
from .ollama_client import call_ollama
from .clinical_context import build_enriched_context, format_enriched_context
except ImportError:
logger.warning("Module clinical_context non disponible pour la validation QC")
return
all_diags: list[tuple[str, Diagnostic]] = []
if dossier.diagnostic_principal:
all_diags.append(("DP", dossier.diagnostic_principal))
for das in dossier.diagnostics_associes:
all_diags.append(("DAS", das))
if not all_diags:
return
# Construire le résumé des codes à valider
codes_section = ""
for i, (type_diag, diag) in enumerate(all_diags, 1):
code = diag.cim10_suggestion or "?"
justif = (diag.justification or "")[:150]
preuves = ", ".join(p.element for p in diag.preuves_cliniques[:3]) or "aucune"
codes_section += f"{i}. [{type_diag}] {code}{diag.texte}\n"
codes_section += f" Justification: {justif}\n"
codes_section += f" Preuves: {preuves}\n\n"
ctx = build_enriched_context(dossier)
ctx_str = format_enriched_context(ctx)
from ..prompts import QC_VALIDATION
prompt = QC_VALIDATION.format(ctx_str=ctx_str, codes_section=codes_section)
try:
result = call_ollama(prompt, temperature=0.1, max_tokens=2500, role="qc")
except Exception:
logger.warning("Erreur lors de l'appel Ollama pour validation QC", exc_info=True)
return
if result is None:
return
# Appliquer les ajustements
validations = result.get("validations", [])
for v in validations:
if not isinstance(v, dict):
continue
num = v.get("numero")
if not isinstance(num, int) or num < 1 or num > len(all_diags):
continue
type_diag, diag = all_diags[num - 1]
conf = v.get("confidence_recommandee")
verdict = v.get("verdict")
commentaire = v.get("commentaire", "")
if conf in ("high", "medium", "low") and conf != diag.cim10_confidence:
old = diag.cim10_confidence
diag.cim10_confidence = conf
if old and conf != old:
dossier.alertes_codage.append(
f"QC: {type_diag} {diag.cim10_suggestion} confiance {old}\u2192{conf} \u2014 {commentaire}"
)
if verdict == "supprimer" and type_diag == "DAS":
dossier.alertes_codage.append(
f"QC: DAS {diag.cim10_suggestion} ({diag.texte}) à reconsidérer \u2014 {commentaire}"
)
alertes_globales = result.get("alertes_globales", [])
if isinstance(alertes_globales, str):
alertes_globales = [alertes_globales]
for a in alertes_globales:
if isinstance(a, str) and a.strip():
dossier.alertes_codage.append(f"QC: {a}")
logger.info(" QC batch : %d validations, %d alertes globales",
len(validations), len(alertes_globales))