feat: 8 optimisations vitesse + qualité pipeline CIM-10

1. Parallélisation intra-dossier (RAG + DP selector en parallèle)
2. Cache embeddings FAISS (_embed_cached avec LRU)
3. Lazy loading edsnlp (déjà singleton, vérifié)
4. Prompt DP amélioré avec règles PMSI/ATIH
5. Validation croisée Bio↔DAS (cohérence biologie/diagnostics)
6. Resélection DP après vetos/exclusions (reselect_dp_after_vetos)
7. Pré-filtrage R-codes (déjà implémenté dans exclusion_rules)
8. Cache embeddings texte (intégré dans rag_search)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dom
2026-03-07 22:18:07 +01:00
parent e6bd7406a4
commit 63f61f196b
5 changed files with 268 additions and 30 deletions

View File

@@ -52,6 +52,7 @@ from .validation_pipeline import (
_remove_das_equal_dp, _remove_das_equal_dp,
_apply_source_tracking, _apply_source_tracking,
_validate_justifications, _validate_justifications,
_validate_bio_das_coherence,
) )
@@ -90,18 +91,42 @@ def extract_medical_info(
if use_rag: if use_rag:
_extract_das_llm(anonymized_text, dossier) _extract_das_llm(anonymized_text, dossier)
if use_rag: # Optimisation #1 : paralléliser enrichissement RAG et sélection DP
_enrich_with_rag(dossier) _dp_selection_needed = use_rag and dossier.document_type != "trackare"
# NUKE-3 : sélection DP type DIM (CRH uniquement) if use_rag or _dp_selection_needed:
if dossier.document_type != "trackare": from concurrent.futures import ThreadPoolExecutor, as_completed
try:
def _task_enrich():
if use_rag:
_enrich_with_rag(dossier)
def _task_select_dp():
if not _dp_selection_needed:
return None
from .dp_selector import select_dp, build_synthese from .dp_selector import select_dp, build_synthese
synthese = build_synthese(dossier, parsed_data) synthese = build_synthese(dossier, parsed_data)
selection = select_dp( return select_dp(dossier, synthese, config={"llm_enabled": use_rag})
dossier, synthese, config={"llm_enabled": use_rag},
) dp_selection_result = None
with ThreadPoolExecutor(max_workers=2) as pool:
fut_enrich = pool.submit(_task_enrich)
fut_dp = pool.submit(_task_select_dp)
# Attendre les deux tâches
for fut in as_completed([fut_enrich, fut_dp]):
exc = fut.exception()
if exc and fut is fut_dp:
logger.error("NUKE-3: erreur sélection DP", exc_info=exc)
dossier.quality_flags["dp_selection_status"] = "error"
dossier.alertes_codage.append("QUALITE DEGRADEE : sélection DP (NUKE-3) en erreur")
elif exc:
logger.error("RAG enrichissement échoué", exc_info=exc)
if not fut_dp.exception():
dp_selection_result = fut_dp.result()
# Appliquer la sélection DP après parallélisation
if dp_selection_result is not None:
selection = dp_selection_result
dossier.dp_selection = selection dossier.dp_selection = selection
if selection.chosen_code: if selection.chosen_code:
@@ -110,10 +135,6 @@ def extract_medical_info(
if dossier.diagnostic_principal else None if dossier.diagnostic_principal else None
) )
has_multiple = len(selection.candidates) >= 2 has_multiple = len(selection.candidates) >= 2
# MAJ DP si :
# - DP existant et NUKE-3 sélectionne un code différent
# - Pas de DP mais plusieurs candidats (choix non trivial)
# Le cas "1 seul candidat, pas de DP" est géré par RULE-DAS-TO-DP
should_update = ( should_update = (
(current_code and selection.chosen_code != current_code) (current_code and selection.chosen_code != current_code)
or (not current_code and has_multiple) or (not current_code and has_multiple)
@@ -130,6 +151,34 @@ def extract_medical_info(
dossier.alertes_codage.append( dossier.alertes_codage.append(
f"NUKE-3 REVIEW: DP ambigu — {selection.reason}" f"NUKE-3 REVIEW: DP ambigu — {selection.reason}"
) )
elif dossier.document_type != "trackare":
# Fallback sans RAG : sélection DP seule
try:
from .dp_selector import select_dp, build_synthese
synthese = build_synthese(dossier, parsed_data)
selection = select_dp(dossier, synthese, config={"llm_enabled": False})
dossier.dp_selection = selection
if selection.chosen_code:
current_code = (
dossier.diagnostic_principal.cim10_suggestion
if dossier.diagnostic_principal else None
)
has_multiple = len(selection.candidates) >= 2
should_update = (
(current_code and selection.chosen_code != current_code)
or (not current_code and has_multiple)
)
if should_update:
dossier.diagnostic_principal = Diagnostic(
texte=selection.chosen_term or "",
cim10_suggestion=selection.chosen_code,
cim10_confidence=selection.confidence,
source="nuke3",
)
if selection.verdict == "REVIEW":
dossier.alertes_codage.append(
f"NUKE-3 REVIEW: DP ambigu — {selection.reason}"
)
except Exception: except Exception:
logger.error("NUKE-3: erreur sélection DP", exc_info=True) logger.error("NUKE-3: erreur sélection DP", exc_info=True)
dossier.quality_flags["dp_selection_status"] = "error" dossier.quality_flags["dp_selection_status"] = "error"
@@ -156,6 +205,17 @@ def extract_medical_info(
# Post-processing : retirer DAS dont le code est identique au DP # Post-processing : retirer DAS dont le code est identique au DP
_remove_das_equal_dp(dossier) _remove_das_equal_dp(dossier)
# Post-processing : cohérence DAS ↔ biologie
_validate_bio_das_coherence(dossier)
# Post-processing : resélection DP si exclu par vetos/exclusions
if dossier.document_type != "trackare":
try:
from .dp_selector import reselect_dp_after_vetos
reselect_dp_after_vetos(dossier, parsed_data)
except Exception:
logger.error("NUKE-3 reselect après vetos échouée", exc_info=True)
# Post-processing : validation justifications (QC batch) # Post-processing : validation justifications (QC batch)
if use_rag: if use_rag:
_validate_justifications(dossier) _validate_justifications(dossier)

View File

@@ -650,3 +650,59 @@ def select_dp(
debug_scores={"top1": candidates[0].score, "top2": candidates[1].score, "delta": delta}, debug_scores={"top1": candidates[0].score, "top2": candidates[1].score, "delta": delta},
) )
return _enforce_confirmed_rules(selection, synthese) return _enforce_confirmed_rules(selection, synthese)
# ---------------------------------------------------------------------------
# Optimisation #6 : resélection DP après vetos / exclusions
# ---------------------------------------------------------------------------
def reselect_dp_after_vetos(dossier: DossierMedical, parsed_data: dict) -> None:
"""Re-lance la sélection DP si le DP actuel a été supprimé par les vetos/exclusions.
Appelé après _apply_exclusion_rules / _remove_das_equal_dp : si le code DP
ne figure plus dans les candidats valides, on relance select_dp() pour
choisir un nouveau DP parmi les candidats restants.
"""
if not dossier.dp_selection or not dossier.diagnostic_principal:
return
current_dp_code = dossier.diagnostic_principal.cim10_suggestion
if not current_dp_code:
return
# Vérifier si le DP actuel est toujours présent dans les DAS ou le DP lui-même
das_codes = {d.cim10_suggestion for d in dossier.diagnostics_associes if d.cim10_suggestion}
all_codes = das_codes | {current_dp_code}
# Si le DP a été retiré (par exclusion ou veto), resélectionner
vetoed_codes = set()
for alerte in dossier.alertes_codage:
if "exclu" in alerte.lower() or "retiré" in alerte.lower() or "veto" in alerte.lower():
# Extraire le code CIM-10 de l'alerte (format courant: "... CODE ...")
import re as _re
codes_in_alert = _re.findall(r'\b([A-Z]\d{2}(?:\.\d{1,2})?)\b', alerte)
vetoed_codes.update(codes_in_alert)
if current_dp_code not in vetoed_codes:
return
logger.info("NUKE-3 resélection DP : %s a été exclu/vetoé, relance", current_dp_code)
try:
synthese = build_synthese(dossier, parsed_data)
new_selection = select_dp(dossier, synthese, config={"llm_enabled": False})
if new_selection.chosen_code and new_selection.chosen_code != current_dp_code:
dossier.dp_selection = new_selection
dossier.diagnostic_principal = Diagnostic(
texte=new_selection.chosen_term or "",
cim10_suggestion=new_selection.chosen_code,
cim10_confidence=new_selection.confidence,
source="nuke3-reselect",
)
dossier.alertes_codage.append(
f"NUKE-3 RESELECT: DP changé {current_dp_code}{new_selection.chosen_code} "
f"(ancien DP exclu/vetoé)"
)
except Exception:
logger.error("NUKE-3 resélection DP échouée", exc_info=True)

View File

@@ -31,6 +31,11 @@ _embed_failed = False # Sentinelle pour éviter les retries infinis
_reranker_model = None _reranker_model = None
_reranker_lock = threading.Lock() _reranker_lock = threading.Lock()
# Cache d'embeddings : évite de recalculer les vecteurs pour les mêmes textes
_embedding_cache: dict[str, "numpy.ndarray"] = {}
_embedding_cache_lock = threading.Lock()
_EMBEDDING_CACHE_MAX = 5000
# Score minimum de similarité FAISS pour retenir un résultat # Score minimum de similarité FAISS pour retenir un résultat
_MIN_SCORE = 0.3 _MIN_SCORE = 0.3
# Seuil rehaussé pour le contexte CPAM (filtrage plus agressif du bruit) # Seuil rehaussé pour le contexte CPAM (filtrage plus agressif du bruit)
@@ -132,6 +137,41 @@ def _rerank(query: str, results: list[dict], top_k: int) -> list[dict]:
return results[:top_k] return results[:top_k]
def _embed_cached(texts: list[str]) -> "numpy.ndarray":
"""Calcule les embeddings avec cache. Retourne un array (N, dim)."""
import numpy as np
model = _get_embed_model()
results = [None] * len(texts)
to_compute: list[tuple[int, str]] = []
with _embedding_cache_lock:
for i, t in enumerate(texts):
cached = _embedding_cache.get(t)
if cached is not None:
results[i] = cached
else:
to_compute.append((i, t))
if to_compute:
new_texts = [t for _, t in to_compute]
new_vecs = model.encode(new_texts, normalize_embeddings=True, batch_size=64)
new_vecs = np.array(new_vecs, dtype=np.float32)
with _embedding_cache_lock:
for j, (i, t) in enumerate(to_compute):
vec = new_vecs[j]
results[i] = vec
_embedding_cache[t] = vec
# Eviction simple si trop d'entrées
if len(_embedding_cache) > _EMBEDDING_CACHE_MAX:
keys = list(_embedding_cache.keys())
for k in keys[:len(keys) // 5]:
del _embedding_cache[k]
return np.array(results, dtype=np.float32)
def search_similar(query: str, top_k: int = 10) -> list[dict]: def search_similar(query: str, top_k: int = 10) -> list[dict]:
"""Recherche les passages les plus similaires dans l'index FAISS. """Recherche les passages les plus similaires dans l'index FAISS.
@@ -154,9 +194,7 @@ def search_similar(query: str, top_k: int = 10) -> list[dict]:
faiss_index, metadata = result faiss_index, metadata = result
model = _get_embed_model() query_vec = _embed_cached([query])
query_vec = model.encode([query], normalize_embeddings=True)
query_vec = np.array(query_vec, dtype=np.float32)
# Chercher plus de résultats que top_k pour pouvoir filtrer ensuite # Chercher plus de résultats que top_k pour pouvoir filtrer ensuite
fetch_k = min(top_k * 2, faiss_index.ntotal) fetch_k = min(top_k * 2, faiss_index.ntotal)
@@ -218,9 +256,7 @@ def search_similar_ccam(query: str, top_k: int = 8) -> list[dict]:
faiss_index, metadata = result faiss_index, metadata = result
model = _get_embed_model() query_vec = _embed_cached([query])
query_vec = model.encode([query], normalize_embeddings=True)
query_vec = np.array(query_vec, dtype=np.float32)
fetch_k = min(top_k * 2, faiss_index.ntotal) fetch_k = min(top_k * 2, faiss_index.ntotal)
scores, indices = faiss_index.search(query_vec, fetch_k) scores, indices = faiss_index.search(query_vec, fetch_k)
@@ -268,9 +304,7 @@ def search_similar_cpam(query: str, top_k: int = 8) -> list[dict]:
logger.warning("Index FAISS non disponible") logger.warning("Index FAISS non disponible")
return [] return []
model = _get_embed_model() query_vec = _embed_cached([query])
query_vec = model.encode([query], normalize_embeddings=True)
query_vec = np.array(query_vec, dtype=np.float32)
def _search_one(result_tuple, fetch_mult: int) -> list[dict]: def _search_one(result_tuple, fetch_mult: int) -> list[dict]:
if result_tuple is None: if result_tuple is None:

View File

@@ -212,6 +212,82 @@ def _remove_das_equal_dp(dossier: DossierMedical) -> None:
dossier.diagnostics_associes = apply_semantic_dedup(dossier.diagnostics_associes) dossier.diagnostics_associes = apply_semantic_dedup(dossier.diagnostics_associes)
# Mapping code CIM-10 → analytes biologiques attendus comme anormaux
# Si le code est codé en DAS mais que la biologie ne montre pas d'anomalie,
# on ajoute une alerte qualité.
_BIO_DAS_COHERENCE: dict[str, list[tuple[str, str]]] = {
# Anémie → Hémoglobine basse
"D64": [("Hémoglobine", "low")],
"D50": [("Hémoglobine", "low"), ("Ferritine", "low")],
# Insuffisance rénale aiguë → Créatinine élevée
"N17": [("Créatinine", "high")],
"N18": [("Créatinine", "high")],
"N19": [("Créatinine", "high")],
# Hyperkaliémie → Potassium élevé
"E87.5": [("Potassium", "high")],
# Hyponatrémie
"E87.1": [("Sodium", "low")],
# Hypokaliémie
"E87.6": [("Potassium", "low")],
# Thrombopénie
"D69.6": [("Plaquettes", "low")],
"D69.5": [("Plaquettes", "low")],
# Hyperglycémie / Diabète décompensé
"E10": [("Glycémie", "high")],
"E11": [("Glycémie", "high")],
"R73": [("Glycémie", "high")],
# Syndrome inflammatoire
"R65": [("CRP", "high")],
# Hypothyroïdie
"E03": [("TSH", "high")],
# Hyperthyroïdie
"E05": [("TSH", "low")],
# Insuffisance hépatique
"K72": [("ALAT", "high")],
"K71": [("ALAT", "high")],
}
def _validate_bio_das_coherence(dossier: DossierMedical) -> None:
"""Alerte quand un DAS implique une anomalie biologique non retrouvée."""
from .bio_normals import BIO_NORMALS, _is_abnormal
if not dossier.biologie_cle:
return
# Indexer la biologie du dossier : analyte → (valeur, anomalie)
bio_index: dict[str, tuple[str, bool | None]] = {}
for bio in dossier.biologie_cle:
abnormal = _is_abnormal(bio.test, bio.valeur)
bio_index[bio.test] = (bio.valeur, abnormal)
all_diags = []
if dossier.diagnostic_principal:
all_diags.append(("DP", dossier.diagnostic_principal))
for das in dossier.diagnostics_associes:
all_diags.append(("DAS", das))
for role, diag in all_diags:
code = diag.cim10_suggestion
if not code:
continue
# Chercher par code exact puis par racine 3 chars
expected = _BIO_DAS_COHERENCE.get(code) or _BIO_DAS_COHERENCE.get(code[:3])
if not expected:
continue
for analyte, direction in expected:
if analyte not in bio_index:
continue
valeur, is_abnormal = bio_index[analyte]
if is_abnormal is False:
lo, hi = BIO_NORMALS.get(analyte, (None, None))
range_str = f" (N: {lo}-{hi})" if lo is not None else ""
dossier.alertes_codage.append(
f"BIO-COHERENCE: {role} {code} ({diag.texte}) attend {analyte} "
f"anormal mais valeur={valeur}{range_str} est normale"
)
def _track_item(item, search_key: str, page_tracker, search_text: str) -> bool: def _track_item(item, search_key: str, page_tracker, search_text: str) -> bool:
"""Cherche la page source et l'extrait pour un item avec source_page/source_excerpt.""" """Cherche la page source et l'extrait pour un item avec source_page/source_excerpt."""
if item.source_page is not None: if item.source_page is not None:

View File

@@ -433,15 +433,27 @@ Réponds UNIQUEMENT en JSON :
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
DP_RANKER_CONSTRAINED = """\ DP_RANKER_CONSTRAINED = """\
Tu es un médecin DIM expert en codage PMSI. Tu dois choisir le Diagnostic Principal (DP) \ Tu es un médecin DIM expert en codage PMSI MCO. Tu dois choisir le Diagnostic Principal (DP) \
parmi la liste FERMÉE de {n_candidates} candidats ci-dessous. parmi la liste FERMÉE de {n_candidates} candidats ci-dessous.
RÈGLES STRICTES : DÉFINITION OFFICIELLE DU DP (Guide méthodologique ATIH) :
1. Le DP reflète le MOTIF PRINCIPAL de prise en charge pendant ce séjour Le DP est le motif de prise en charge qui a mobilisé l'essentiel de l'effort médical et \
2. Un acte seul (cholécystectomie, biopsie…) NE PEUT PAS être DP s'il existe un candidat textuel soignant au cours du séjour. Ce n'est PAS nécessairement le diagnostic le plus grave.
3. Un symptôme (R00-R99) NE PEUT PAS être DP si une étiologie candidate existe dans la liste
4. Une comorbidité chronique (HTA, diabète, BPCO) NE PEUT PAS être DP sauf prise en charge ACTIVE RÈGLES PMSI STRICTES :
5. Tu DOIS choisir un index de la liste — JAMAIS de réponse hors liste 1. Le DP = le problème de santé qui a motivé l'admission ET mobilisé l'essentiel des soins
2. Si le patient est admis pour une pathologie aiguë (infection, fracture, embolie...), \
c'est cette pathologie aiguë qui est DP, même si le patient a des comorbidités plus sévères
3. Si le séjour est chirurgical, le DP = la pathologie justifiant l'intervention (ex: \
cholécystite pour cholécystectomie, PAS "cholécystectomie" comme DP)
4. Un symptôme (R00-R99) NE PEUT être DP QUE si aucune étiologie n'a été identifiée
5. Une comorbidité chronique (HTA, diabète, BPCO, FA) NE PEUT PAS être DP sauf si elle \
est le motif DIRECT de l'hospitalisation (ex: décompensation aiguë, poussée)
6. Les codes Z ne sont DP que pour : bilan (Z03/Z04), surveillance (Z08/Z09), \
chimiothérapie (Z51), rééducation (Z50), appareillage (Z43/Z45)
7. En cas de doute entre deux candidats : privilégier celui mentionné dans le diagnostic \
de sortie ou la conclusion du CRH
8. Tu DOIS choisir un index de la liste — JAMAIS de réponse hors liste
CANDIDATS : CANDIDATS :
{candidates_str} {candidates_str}
@@ -455,5 +467,5 @@ Réponds UNIQUEMENT en JSON :
"confidence": "high|medium|low", "confidence": "high|medium|low",
"verdict": "CONFIRMED|REVIEW", "verdict": "CONFIRMED|REVIEW",
"evidence": ["raison 1", "raison 2"], "evidence": ["raison 1", "raison 2"],
"reason": "explication courte justifiant le choix" "reason": "explication courte justifiant le choix selon les règles PMSI"
}}""" }}"""