From 355a33acde6b19098b275d4d74edaac4c5c46c9b Mon Sep 17 00:00:00 2001 From: dom Date: Sun, 8 Mar 2026 14:17:18 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20parall=C3=A9liser=20DAS=5FLLM=20+=20RAG?= =?UTF-8?q?=20DP=20+=20DP=20selector?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Restructure le pipeline dans extract_medical_info() : AVANT : DAS_LLM séquentiel → ThreadPool(RAG complet + DP_selector) APRÈS : Groupe 1 (ThreadPool max_workers=3) : - DAS_LLM : extraction DAS supplémentaires - RAG DP : enrichissement DP seul (via enrich_dp) - DP selector : sélection NUKE-3 Groupe 2 : - enrichissement DAS + actes (via enrich_das_and_actes) Le RAG DP ne dépend pas du DAS_LLM, donc les deux peuvent s'exécuter en parallèle. Le Groupe 2 attend le DAS_LLM car il enrichit les DAS trouvés par celui-ci. Ajoute aussi des timings sur les groupes et la validation QC. Co-Authored-By: Claude Opus 4.6 --- src/medical/cim10_extractor.py | 101 ++++++++++++++++++++++++++------- 1 file changed, 82 insertions(+), 19 deletions(-) diff --git a/src/medical/cim10_extractor.py b/src/medical/cim10_extractor.py index 57f7745..4c6f3d5 100644 --- a/src/medical/cim10_extractor.py +++ b/src/medical/cim10_extractor.py @@ -11,6 +11,7 @@ from __future__ import annotations import logging import re +import time from datetime import datetime from typing import Optional @@ -87,43 +88,71 @@ def extract_medical_info( _extract_imagerie(anonymized_text, dossier) _extract_complications(anonymized_text, dossier, edsnlp_result) - # Phase 4 : pass LLM pour détecter des DAS supplémentaires + # Phase 4 : DAS LLM + RAG DP + DP selector en parallèle + _dp_selection_needed = dossier.document_type != "trackare" + if use_rag: - _extract_das_llm(anonymized_text, dossier) - - # Optimisation #1 : paralléliser enrichissement RAG et sélection DP - _dp_selection_needed = use_rag and dossier.document_type != "trackare" - - if use_rag or _dp_selection_needed: from concurrent.futures import ThreadPoolExecutor, as_completed - def _task_enrich(): - if use_rag: - _enrich_with_rag(dossier) + # --- Groupe 1 : 3 tâches indépendantes en parallèle --- + # - DAS LLM : détecte des DAS supplémentaires (ne dépend pas du RAG DP) + # - RAG DP : enrichit seulement le DP (ne dépend pas du DAS LLM) + # - DP selector : sélectionne le DP optimal (indépendant des deux autres) + + def _task_das_llm(): + t0 = time.monotonic() + _extract_das_llm(anonymized_text, dossier) + elapsed = time.monotonic() - t0 + logger.info("⏱ [DAS-LLM-TASK] %.1fs — extraction DAS LLM terminée", elapsed) + + def _task_rag_dp(): + t0 = time.monotonic() + _enrich_dp_only(dossier) + elapsed = time.monotonic() - t0 + logger.info("⏱ [RAG-DP-TASK] %.1fs — enrichissement DP terminé", elapsed) def _task_select_dp(): if not _dp_selection_needed: return None + t0 = time.monotonic() from .dp_selector import select_dp, build_synthese synthese = build_synthese(dossier, parsed_data) - return select_dp(dossier, synthese, config={"llm_enabled": use_rag}) + result = select_dp(dossier, synthese, config={"llm_enabled": use_rag}) + elapsed = time.monotonic() - t0 + logger.info("⏱ [DP-SELECT] %.1fs — sélection DP terminée", elapsed) + return result dp_selection_result = None - with ThreadPoolExecutor(max_workers=2) as pool: - fut_enrich = pool.submit(_task_enrich) + t_group1 = time.monotonic() + + with ThreadPoolExecutor(max_workers=3) as pool: + fut_das = pool.submit(_task_das_llm) + fut_rag_dp = pool.submit(_task_rag_dp) fut_dp = pool.submit(_task_select_dp) - # Attendre les deux tâches - for fut in as_completed([fut_enrich, fut_dp]): + + for fut in as_completed([fut_das, fut_rag_dp, fut_dp]): exc = fut.exception() if exc and fut is fut_dp: logger.error("NUKE-3: erreur sélection DP", exc_info=exc) dossier.quality_flags["dp_selection_status"] = "error" dossier.alertes_codage.append("QUALITE DEGRADEE : sélection DP (NUKE-3) en erreur") - elif exc: - logger.error("RAG enrichissement échoué", exc_info=exc) + elif exc and fut is fut_rag_dp: + logger.error("RAG enrichissement DP échoué", exc_info=exc) + elif exc and fut is fut_das: + logger.error("DAS LLM extraction échouée", exc_info=exc) + if not fut_dp.exception(): dp_selection_result = fut_dp.result() + elapsed_group1 = time.monotonic() - t_group1 + logger.info("⏱ [GROUPE-1] %.1fs — DAS_LLM + RAG_DP + DP_SELECT terminés", elapsed_group1) + + # --- Groupe 2 : enrichir les DAS (existants + nouveaux du DAS LLM) + actes --- + t_group2 = time.monotonic() + _enrich_das_and_actes(dossier) + elapsed_group2 = time.monotonic() - t_group2 + logger.info("⏱ [GROUPE-2] %.1fs — enrichissement DAS + actes terminé", elapsed_group2) + # Appliquer la sélection DP après parallélisation if dp_selection_result is not None: selection = dp_selection_result @@ -151,12 +180,15 @@ def extract_medical_info( dossier.alertes_codage.append( f"NUKE-3 REVIEW: DP ambigu — {selection.reason}" ) - elif dossier.document_type != "trackare": + elif _dp_selection_needed: # Fallback sans RAG : sélection DP seule try: + t_dp_norag = time.monotonic() from .dp_selector import select_dp, build_synthese synthese = build_synthese(dossier, parsed_data) selection = select_dp(dossier, synthese, config={"llm_enabled": False}) + elapsed_dp = time.monotonic() - t_dp_norag + logger.info("⏱ [DP-SELECT] %.1fs — sélection DP (sans RAG)", elapsed_dp) dossier.dp_selection = selection if selection.chosen_code: current_code = ( @@ -240,7 +272,10 @@ def extract_medical_info( # Post-processing : validation justifications (QC batch) if use_rag: + t_qc = time.monotonic() _validate_justifications(dossier) + elapsed_qc = time.monotonic() - t_qc + logger.info("⏱ [QC-VALIDATION] %.1fs — validation justifications terminée", elapsed_qc) # Post-processing : traçabilité source (page + extrait) if page_tracker: @@ -457,7 +492,7 @@ def _extract_das_llm(text: str, dossier: DossierMedical) -> None: def _enrich_with_rag(dossier: DossierMedical) -> None: - """Enrichit les diagnostics via le RAG (FAISS + Ollama).""" + """Enrichit les diagnostics via le RAG (FAISS + Ollama) — wrapper rétro-compatible.""" try: from .rag_search import enrich_dossier enrich_dossier(dossier) @@ -471,6 +506,34 @@ def _enrich_with_rag(dossier: DossierMedical) -> None: dossier.alertes_codage.append("QUALITE DEGRADEE : erreur RAG — codage sans référentiels") +def _enrich_dp_only(dossier: DossierMedical) -> None: + """Enrichit SEULEMENT le DP via le RAG (Phase 1, parallélisable).""" + try: + from .rag_search import enrich_dp + enrich_dp(dossier) + except ImportError: + logger.error("RAG INDISPONIBLE : faiss-cpu ou sentence-transformers manquant") + dossier.quality_flags["rag_status"] = "unavailable" + dossier.alertes_codage.append("QUALITE DEGRADEE : RAG indisponible — codage sans référentiels") + except Exception: + logger.error("RAG EN ERREUR : enrichissement DP échoué", exc_info=True) + dossier.quality_flags["rag_status"] = "error" + dossier.alertes_codage.append("QUALITE DEGRADEE : erreur RAG DP — codage sans référentiels") + + +def _enrich_das_and_actes(dossier: DossierMedical) -> None: + """Enrichit les DAS et actes CCAM via le RAG (Phase 2, après DAS LLM).""" + try: + from .rag_search import enrich_das_and_actes + enrich_das_and_actes(dossier) + except ImportError: + logger.error("RAG INDISPONIBLE : faiss-cpu ou sentence-transformers manquant") + dossier.quality_flags["rag_status"] = "unavailable" + except Exception: + logger.error("RAG EN ERREUR : enrichissement DAS/actes échoué", exc_info=True) + dossier.quality_flags["rag_status"] = "error" + + def _extract_sejour(parsed: dict, dossier: DossierMedical) -> None: """Extrait les informations de séjour.""" patient = parsed.get("patient", {})