feat: paralléliser DAS_LLM + RAG DP + DP selector

Restructure le pipeline dans extract_medical_info() :

AVANT : DAS_LLM séquentiel → ThreadPool(RAG complet + DP_selector)

APRÈS :
  Groupe 1 (ThreadPool max_workers=3) :
    - DAS_LLM : extraction DAS supplémentaires
    - RAG DP : enrichissement DP seul (via enrich_dp)
    - DP selector : sélection NUKE-3
  Groupe 2 :
    - enrichissement DAS + actes (via enrich_das_and_actes)

Le RAG DP ne dépend pas du DAS_LLM, donc les deux peuvent
s'exécuter en parallèle. Le Groupe 2 attend le DAS_LLM car
il enrichit les DAS trouvés par celui-ci.

Ajoute aussi des timings sur les groupes et la validation QC.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dom
2026-03-08 14:17:18 +01:00
parent f5a6122495
commit 355a33acde

View File

@@ -11,6 +11,7 @@ from __future__ import annotations
import logging
import re
import time
from datetime import datetime
from typing import Optional
@@ -87,43 +88,71 @@ def extract_medical_info(
_extract_imagerie(anonymized_text, dossier)
_extract_complications(anonymized_text, dossier, edsnlp_result)
# Phase 4 : pass LLM pour détecter des DAS supplémentaires
# Phase 4 : DAS LLM + RAG DP + DP selector en parallèle
_dp_selection_needed = dossier.document_type != "trackare"
if use_rag:
_extract_das_llm(anonymized_text, dossier)
# Optimisation #1 : paralléliser enrichissement RAG et sélection DP
_dp_selection_needed = use_rag and dossier.document_type != "trackare"
if use_rag or _dp_selection_needed:
from concurrent.futures import ThreadPoolExecutor, as_completed
def _task_enrich():
if use_rag:
_enrich_with_rag(dossier)
# --- Groupe 1 : 3 tâches indépendantes en parallèle ---
# - DAS LLM : détecte des DAS supplémentaires (ne dépend pas du RAG DP)
# - RAG DP : enrichit seulement le DP (ne dépend pas du DAS LLM)
# - DP selector : sélectionne le DP optimal (indépendant des deux autres)
def _task_das_llm():
t0 = time.monotonic()
_extract_das_llm(anonymized_text, dossier)
elapsed = time.monotonic() - t0
logger.info("⏱ [DAS-LLM-TASK] %.1fs — extraction DAS LLM terminée", elapsed)
def _task_rag_dp():
t0 = time.monotonic()
_enrich_dp_only(dossier)
elapsed = time.monotonic() - t0
logger.info("⏱ [RAG-DP-TASK] %.1fs — enrichissement DP terminé", elapsed)
def _task_select_dp():
if not _dp_selection_needed:
return None
t0 = time.monotonic()
from .dp_selector import select_dp, build_synthese
synthese = build_synthese(dossier, parsed_data)
return select_dp(dossier, synthese, config={"llm_enabled": use_rag})
result = select_dp(dossier, synthese, config={"llm_enabled": use_rag})
elapsed = time.monotonic() - t0
logger.info("⏱ [DP-SELECT] %.1fs — sélection DP terminée", elapsed)
return result
dp_selection_result = None
with ThreadPoolExecutor(max_workers=2) as pool:
fut_enrich = pool.submit(_task_enrich)
t_group1 = time.monotonic()
with ThreadPoolExecutor(max_workers=3) as pool:
fut_das = pool.submit(_task_das_llm)
fut_rag_dp = pool.submit(_task_rag_dp)
fut_dp = pool.submit(_task_select_dp)
# Attendre les deux tâches
for fut in as_completed([fut_enrich, fut_dp]):
for fut in as_completed([fut_das, fut_rag_dp, fut_dp]):
exc = fut.exception()
if exc and fut is fut_dp:
logger.error("NUKE-3: erreur sélection DP", exc_info=exc)
dossier.quality_flags["dp_selection_status"] = "error"
dossier.alertes_codage.append("QUALITE DEGRADEE : sélection DP (NUKE-3) en erreur")
elif exc:
logger.error("RAG enrichissement échoué", exc_info=exc)
elif exc and fut is fut_rag_dp:
logger.error("RAG enrichissement DP échoué", exc_info=exc)
elif exc and fut is fut_das:
logger.error("DAS LLM extraction échouée", exc_info=exc)
if not fut_dp.exception():
dp_selection_result = fut_dp.result()
elapsed_group1 = time.monotonic() - t_group1
logger.info("⏱ [GROUPE-1] %.1fs — DAS_LLM + RAG_DP + DP_SELECT terminés", elapsed_group1)
# --- Groupe 2 : enrichir les DAS (existants + nouveaux du DAS LLM) + actes ---
t_group2 = time.monotonic()
_enrich_das_and_actes(dossier)
elapsed_group2 = time.monotonic() - t_group2
logger.info("⏱ [GROUPE-2] %.1fs — enrichissement DAS + actes terminé", elapsed_group2)
# Appliquer la sélection DP après parallélisation
if dp_selection_result is not None:
selection = dp_selection_result
@@ -151,12 +180,15 @@ def extract_medical_info(
dossier.alertes_codage.append(
f"NUKE-3 REVIEW: DP ambigu — {selection.reason}"
)
elif dossier.document_type != "trackare":
elif _dp_selection_needed:
# Fallback sans RAG : sélection DP seule
try:
t_dp_norag = time.monotonic()
from .dp_selector import select_dp, build_synthese
synthese = build_synthese(dossier, parsed_data)
selection = select_dp(dossier, synthese, config={"llm_enabled": False})
elapsed_dp = time.monotonic() - t_dp_norag
logger.info("⏱ [DP-SELECT] %.1fs — sélection DP (sans RAG)", elapsed_dp)
dossier.dp_selection = selection
if selection.chosen_code:
current_code = (
@@ -240,7 +272,10 @@ def extract_medical_info(
# Post-processing : validation justifications (QC batch)
if use_rag:
t_qc = time.monotonic()
_validate_justifications(dossier)
elapsed_qc = time.monotonic() - t_qc
logger.info("⏱ [QC-VALIDATION] %.1fs — validation justifications terminée", elapsed_qc)
# Post-processing : traçabilité source (page + extrait)
if page_tracker:
@@ -457,7 +492,7 @@ def _extract_das_llm(text: str, dossier: DossierMedical) -> None:
def _enrich_with_rag(dossier: DossierMedical) -> None:
"""Enrichit les diagnostics via le RAG (FAISS + Ollama)."""
"""Enrichit les diagnostics via le RAG (FAISS + Ollama) — wrapper rétro-compatible."""
try:
from .rag_search import enrich_dossier
enrich_dossier(dossier)
@@ -471,6 +506,34 @@ def _enrich_with_rag(dossier: DossierMedical) -> None:
dossier.alertes_codage.append("QUALITE DEGRADEE : erreur RAG — codage sans référentiels")
def _enrich_dp_only(dossier: DossierMedical) -> None:
"""Enrichit SEULEMENT le DP via le RAG (Phase 1, parallélisable)."""
try:
from .rag_search import enrich_dp
enrich_dp(dossier)
except ImportError:
logger.error("RAG INDISPONIBLE : faiss-cpu ou sentence-transformers manquant")
dossier.quality_flags["rag_status"] = "unavailable"
dossier.alertes_codage.append("QUALITE DEGRADEE : RAG indisponible — codage sans référentiels")
except Exception:
logger.error("RAG EN ERREUR : enrichissement DP échoué", exc_info=True)
dossier.quality_flags["rag_status"] = "error"
dossier.alertes_codage.append("QUALITE DEGRADEE : erreur RAG DP — codage sans référentiels")
def _enrich_das_and_actes(dossier: DossierMedical) -> None:
"""Enrichit les DAS et actes CCAM via le RAG (Phase 2, après DAS LLM)."""
try:
from .rag_search import enrich_das_and_actes
enrich_das_and_actes(dossier)
except ImportError:
logger.error("RAG INDISPONIBLE : faiss-cpu ou sentence-transformers manquant")
dossier.quality_flags["rag_status"] = "unavailable"
except Exception:
logger.error("RAG EN ERREUR : enrichissement DAS/actes échoué", exc_info=True)
dossier.quality_flags["rag_status"] = "error"
def _extract_sejour(parsed: dict, dossier: DossierMedical) -> None:
"""Extrait les informations de séjour."""
patient = parsed.get("patient", {})