From 3c070f3c1df3961219ebcc86b6314aecf4245b20 Mon Sep 17 00:00:00 2001 From: dom Date: Fri, 20 Feb 2026 10:06:26 +0100 Subject: [PATCH] =?UTF-8?q?refactor:=20split=20cpam=5Fresponse=20=E2=86=92?= =?UTF-8?q?=20cpam=5Frag,=20cpam=5Fcontext,=20cpam=5Fvalidation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Découpe le monolithe cpam_response.py (1207L) en 3 modules spécialisés : - cpam_rag.py : recherche RAG ciblée (5 requêtes, dédup) - cpam_context.py : construction prompt, définitions CIM-10, bio summary - cpam_validation.py : grounding, références, codes fermée, adversariale Le cpam_response.py reste orchestrateur (~230L) avec re-exports backward-compat. Mocks des tests mis à jour pour cibler les bons modules. Ajout RULE-CPAM-CORRECTION-LOOP dans base.yaml. 748 tests passent. Co-Authored-By: Claude Opus 4.6 --- config/rules/base.yaml | 3 + src/control/cpam_context.py | 532 ++++++++++++++++++++ src/control/cpam_rag.py | 139 ++++++ src/control/cpam_response.py | 886 +++------------------------------ src/control/cpam_validation.py | 376 ++++++++++++++ tests/test_cpam_response.py | 450 +++++++++++++++-- 6 files changed, 1553 insertions(+), 833 deletions(-) create mode 100644 src/control/cpam_context.py create mode 100644 src/control/cpam_rag.py create mode 100644 src/control/cpam_validation.py diff --git a/config/rules/base.yaml b/config/rules/base.yaml index e383fc7..c4a3edf 100644 --- a/config/rules/base.yaml +++ b/config/rules/base.yaml @@ -54,6 +54,9 @@ packs: RULE-DAS-TO-DP: enabled: true description: "DAS promu en DP si aucun DP extrait — sélection par pertinence/confiance/spécificité" + RULE-CPAM-CORRECTION-LOOP: + enabled: true + description: "Boucle de correction quand validation adversariale score ≤ 5/10" bio_electrolytes: enabled: true diff --git a/src/control/cpam_context.py b/src/control/cpam_context.py new file mode 100644 index 0000000..4b93c6f --- /dev/null +++ b/src/control/cpam_context.py @@ -0,0 +1,532 @@ +"""Construction du contexte et du prompt pour la contre-argumentation CPAM.""" + +from __future__ import annotations + +import logging +import re + +from ..config import ControleCPAM, DossierMedical +from ..medical.bio_normals import BIO_NORMALS +from ..medical.cim10_dict import normalize_code, validate_code +from ..prompts import CPAM_ARGUMENTATION + +logger = logging.getLogger(__name__) + + +def _get_code_label(code_str: str) -> str: + """Résout le libellé CIM-10 pour un ou plusieurs codes.""" + codes = re.split(r"[,;\s]+", code_str.strip()) + labels = [] + for raw in codes: + raw = raw.strip() + if not raw: + continue + norm = normalize_code(raw) + is_valid, label = validate_code(norm) + if is_valid and label: + labels.append(f"{norm} — {label}") + else: + labels.append(norm) + if not labels: + return "" + if len(labels) == 1: + parts = labels[0].split(" — ", 1) + return f" — {parts[1]}" if len(parts) > 1 else "" + return "\n " + "\n ".join(labels) + + +def _get_cim10_definitions( + dossier: DossierMedical, + controle: ControleCPAM, +) -> str: + """Construit une section de définitions CIM-10 déterministes pour tous les codes en jeu. + + Collecte les codes depuis : + - Le dossier : DP (cim10_suggestion) + DAS (cim10_suggestion) + - L'UCR : dp_ucr, da_ucr, dr_ucr + + Returns: + Texte formaté pour injection dans le prompt, ou "" si aucun code résolu. + """ + codes_seen: dict[str, str] = {} # code normalisé → rôle (pour affichage) + + # Codes du dossier (établissement) + if dossier.diagnostic_principal and dossier.diagnostic_principal.cim10_suggestion: + code = dossier.diagnostic_principal.cim10_suggestion + codes_seen[normalize_code(code)] = "DP établissement" + for das in dossier.diagnostics_associes: + if das.cim10_suggestion: + norm = normalize_code(das.cim10_suggestion) + if norm not in codes_seen: + codes_seen[norm] = "DAS établissement" + + # Codes de l'UCR (CPAM) + for field, role in [ + (controle.dp_ucr, "DP proposé UCR"), + (controle.da_ucr, "DA proposé UCR"), + (controle.dr_ucr, "DR proposé UCR"), + ]: + if not field: + continue + for raw in re.split(r"[,;\s]+", field.strip()): + raw = raw.strip() + if not raw: + continue + norm = normalize_code(raw) + if norm not in codes_seen: + codes_seen[norm] = role + + if not codes_seen: + return "" + + # Résoudre les libellés + lines = [] + for norm_code, role in codes_seen.items(): + is_valid, label = validate_code(norm_code) + if is_valid and label: + lines.append(f" {norm_code} — {label} [{role}]") + else: + lines.append(f" {norm_code} — (code non trouvé dans le dictionnaire) [{role}]") + + if not lines: + return "" + + return ( + "\nDÉFINITIONS CIM-10 — RÉFÉRENCE (source : dictionnaire officiel) :\n" + + "\n".join(lines) + ) + + +def _build_tagged_context(dossier: DossierMedical) -> tuple[str, dict[str, str]]: + """Construit un contexte clinique avec des tags de référence pour le grounding. + + Chaque élément clinique reçoit un tag unique ([BIO-1], [IMG-1], [TRT-1], [ACTE-1]) + que le LLM doit citer dans ses preuves pour garantir la traçabilité. + + Returns: + (texte tagué pour injection dans le prompt, dict tag → contenu original) + """ + tag_map: dict[str, str] = {} + lines: list[str] = [] + + # Biologie (avec normes de référence pour éviter les hallucinations) + for i, b in enumerate(dossier.biologie_cle, 1): + if not b.valeur: + continue + tag = f"BIO-{i}" + # Interpréter la valeur par rapport aux normes connues + norm_info = "" + if b.test in BIO_NORMALS: + lo, hi = BIO_NORMALS[b.test] + try: + val = float(b.valeur.replace(",", ".").split()[0]) + if val > hi: + norm_info = f" — ÉLEVÉ (norme {lo}-{hi})" + elif val < lo: + norm_info = f" — BAS (norme {lo}-{hi})" + else: + norm_info = f" — NORMAL (norme {lo}-{hi})" + except (ValueError, AttributeError): + pass + content = f"{b.test}: {b.valeur}{norm_info}" + tag_map[tag] = content + lines.append(f" [{tag}] {content}") + + # Imagerie + for i, im in enumerate(dossier.imagerie, 1): + tag = f"IMG-{i}" + conclusion = f" — {im.conclusion}" if im.conclusion else "" + content = f"{im.type}{conclusion}" + tag_map[tag] = content + lines.append(f" [{tag}] {content}") + + # Traitements + for i, t in enumerate(dossier.traitements_sortie[:10], 1): + tag = f"TRT-{i}" + posologie = f" {t.posologie}" if t.posologie else "" + content = f"{t.medicament}{posologie}" + tag_map[tag] = content + lines.append(f" [{tag}] {content}") + + # Actes CCAM + for i, a in enumerate(dossier.actes_ccam, 1): + tag = f"ACTE-{i}" + code = f" ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else "" + content = f"{a.texte}{code}" + tag_map[tag] = content + lines.append(f" [{tag}] {content}") + + if not lines: + return "", tag_map + + text = "ÉLÉMENTS CLINIQUES RÉFÉRENCÉS (cite le tag [XX-N] dans tes preuves) :\n" + "\n".join(lines) + return text, tag_map + + +# Interprétations cliniques pour le résumé bio déterministe +_BIO_INTERPRETATION: dict[str, dict[str, str]] = { + "CRP": {"high": "infection/inflammation active", "low": "normal", "normal": "pas d'inflammation"}, + "Hémoglobine": {"high": "polyglobulie", "low": "anémie", "normal": "pas d'anémie"}, + "Plaquettes": {"high": "thrombocytose", "low": "thrombopénie", "normal": "numération normale"}, + "Leucocytes": {"high": "hyperleucocytose", "low": "leucopénie", "normal": "numération normale"}, + "Créatinine": {"high": "insuffisance rénale", "low": "normal", "normal": "fonction rénale conservée"}, + "Potassium": {"high": "hyperkaliémie", "low": "hypokaliémie", "normal": "kaliémie normale"}, + "Sodium": {"high": "hypernatrémie", "low": "hyponatrémie", "normal": "natrémie normale"}, + "Lipasémie": {"high": "pancréatite probable", "low": "normal", "normal": "pas de pancréatite"}, + "ASAT": {"high": "cytolyse hépatique", "low": "normal", "normal": "pas de cytolyse"}, + "ALAT": {"high": "cytolyse hépatique", "low": "normal", "normal": "pas de cytolyse"}, + "GGT": {"high": "cholestase/atteinte hépatique", "low": "normal", "normal": "pas de cholestase"}, + "Bilirubine totale": {"high": "ictère/cholestase", "low": "normal", "normal": "pas d'ictère"}, +} + + +def _build_bio_summary(dossier: DossierMedical) -> str: + """Construit un résumé biologique déterministe à injecter dans le prompt. + + Chaque valeur bio est interprétée contre BIO_NORMALS avec une conclusion + non ambiguë que le LLM ne doit pas modifier. + + Returns: + Texte formaté ou "" si aucune biologie exploitable. + """ + if not dossier.biologie_cle: + return "" + + lines: list[str] = [] + for b in dossier.biologie_cle: + if not b.valeur or b.test not in BIO_NORMALS: + continue + try: + val = float(b.valeur.replace(",", ".").split()[0]) + except (ValueError, AttributeError): + continue + + lo, hi = BIO_NORMALS[b.test] + if val > hi: + status = "ÉLEVÉ" + interp_key = "high" + elif val < lo: + status = "BAS" + interp_key = "low" + else: + status = "NORMAL" + interp_key = "normal" + + interp = _BIO_INTERPRETATION.get(b.test, {}).get(interp_key, "") + interp_str = f" — {interp}" if interp else "" + lines.append(f" ✓ {b.test} = {b.valeur} → {status} (norme {lo}-{hi}){interp_str}") + + if not lines: + return "" + + return ( + "FAITS BIOLOGIQUES VÉRIFIÉS (NE PAS MODIFIER ces interprétations) :\n" + + "\n".join(lines) + + "\n\nRÈGLE STRICTE : si tu cites une valeur biologique, tu DOIS utiliser " + "l'interprétation ci-dessus.\n" + "Ne qualifie JAMAIS une valeur NORMAL comme pathologique, " + "ni une valeur ÉLEVÉ/BAS comme normale." + ) + + +def _check_das_bio_coherence(dossier: DossierMedical) -> list[str]: + """Vérifie la cohérence entre les textes DAS et les valeurs biologiques. + + Détecte les contradictions comme "leucocytose" dans un DAS alors que + les leucocytes sont bas, ou "anémie" alors que l'hémoglobine est normale. + + Returns: + Liste de warnings pour les incohérences détectées. + """ + if not dossier.diagnostics_associes or not dossier.biologie_cle: + return [] + + # Patterns DAS → (test bio attendu, direction attendue) + _DAS_BIO_CHECKS: dict[str, tuple[str, str]] = { + "leucocytose": ("Leucocytes", "high"), + "leucopénie": ("Leucocytes", "low"), + "leucopenie": ("Leucocytes", "low"), + "thrombocytose": ("Plaquettes", "high"), + "thrombocytopénie": ("Plaquettes", "low"), + "thrombocytopenie": ("Plaquettes", "low"), + "thrombopénie": ("Plaquettes", "low"), + "thrombopenie": ("Plaquettes", "low"), + "anémie": ("Hémoglobine", "low"), + "anemie": ("Hémoglobine", "low"), + "polyglobulie": ("Hémoglobine", "high"), + "hyperkaliémie": ("Potassium", "high"), + "hypokaliémie": ("Potassium", "low"), + } + + # Indexer les valeurs bio disponibles + bio_values: dict[str, float] = {} + for b in dossier.biologie_cle: + if b.test and b.valeur: + try: + bio_values[b.test] = float(b.valeur.replace(",", ".").split()[0]) + except (ValueError, AttributeError): + pass + + warnings: list[str] = [] + for das in dossier.diagnostics_associes: + texte_lower = (das.texte or "").lower() + for pattern, (bio_test, direction) in _DAS_BIO_CHECKS.items(): + if pattern not in texte_lower: + continue + if bio_test not in bio_values or bio_test not in BIO_NORMALS: + continue + val = bio_values[bio_test] + lo, hi = BIO_NORMALS[bio_test] + if direction == "high" and val <= hi: + warnings.append( + f"INCOHÉRENCE : DAS « {das.texte} » ({das.cim10_suggestion or '?'}) " + f"mais {bio_test} = {val} est NORMAL (norme {lo}-{hi})" + ) + elif direction == "low" and val >= lo: + warnings.append( + f"INCOHÉRENCE : DAS « {das.texte} » ({das.cim10_suggestion or '?'}) " + f"mais {bio_test} = {val} est NORMAL (norme {lo}-{hi})" + ) + + if warnings: + for w in warnings: + logger.warning(" DAS/bio : %s", w) + + return warnings + + +def _build_cpam_prompt( + dossier: DossierMedical, + controle: ControleCPAM, + sources: list[dict], + extraction: dict | None = None, +) -> tuple[str, dict[str, str]]: + """Construit le prompt pour la contre-argumentation CPAM. + + Args: + extraction: Résultat optionnel de la passe 1 (extraction structurée). + + Returns: + (prompt texte, tag_map pour validation grounding) + """ + # Résumé du dossier médical + dossier_lines = [] + + if dossier.diagnostic_principal: + dp = dossier.diagnostic_principal + dp_code = f" ({dp.cim10_suggestion})" if dp.cim10_suggestion else "" + dossier_lines.append(f"- DP : {dp.texte}{dp_code}") + elif controle.dp_ucr: + dp_label = _get_code_label(controle.dp_ucr) + dossier_lines.append( + f"- DP : code {controle.dp_ucr}{dp_label} " + f"(codé par l'établissement, contesté par la CPAM)" + ) + + if dossier.diagnostics_associes: + das_parts = [] + for das in dossier.diagnostics_associes: + code = f" ({das.cim10_suggestion})" if das.cim10_suggestion else "" + das_parts.append(f"{das.texte}{code}") + dossier_lines.append(f"- DAS : {', '.join(das_parts)}") + + if dossier.actes_ccam: + actes = [f"{a.texte} ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else a.texte + for a in dossier.actes_ccam] + dossier_lines.append(f"- Actes CCAM : {', '.join(actes)}") + + sejour = dossier.sejour + if sejour.duree_sejour is not None: + dossier_lines.append(f"- Durée séjour : {sejour.duree_sejour} jours") + if sejour.sexe or sejour.age is not None: + patient_info = [] + if sejour.sexe: + patient_info.append(sejour.sexe) + if sejour.age is not None: + patient_info.append(f"{sejour.age} ans") + if sejour.age < 18: + patient_info.append("(PÉDIATRIE — codage pédiatrique applicable)") + elif sejour.age >= 80: + patient_info.append("(patient âgé — comorbidités fréquentes)") + dossier_lines.append(f"- Patient : {', '.join(patient_info)}") + if sejour.mode_entree: + mode_label = sejour.mode_entree + if "urgence" in mode_label.lower() or "urgent" in mode_label.lower(): + dossier_lines.append(f"- Mode d'entrée : {mode_label} (ADMISSION EN URGENCE)") + else: + dossier_lines.append(f"- Mode d'entrée : {mode_label}") + if sejour.mode_sortie: + dossier_lines.append(f"- Mode de sortie : {sejour.mode_sortie}") + if sejour.imc is not None: + dossier_lines.append(f"- IMC : {sejour.imc}") + + if dossier.biologie_cle: + bio = [f"{b.test}: {b.valeur}" for b in dossier.biologie_cle[:5] if b.valeur] + if bio: + dossier_lines.append(f"- Biologie clé : {', '.join(bio)}") + + if dossier.imagerie: + img_parts = [] + for im in dossier.imagerie: + conclusion = f" — {im.conclusion}" if im.conclusion else "" + img_parts.append(f"{im.type}{conclusion}") + dossier_lines.append(f"- Imagerie : {', '.join(img_parts)}") + + if dossier.traitements_sortie: + trt_parts = [] + for t in dossier.traitements_sortie[:10]: + posologie = f" {t.posologie}" if t.posologie else "" + trt_parts.append(f"{t.medicament}{posologie}") + dossier_lines.append(f"- Traitements de sortie : {', '.join(trt_parts)}") + + if dossier.antecedents: + dossier_lines.append(f"- Antécédents : {', '.join(a.texte for a in dossier.antecedents[:10])}") + + if dossier.complications: + dossier_lines.append(f"- Complications : {', '.join(c.texte for c in dossier.complications)}") + + dossier_str = "\n".join(dossier_lines) if dossier_lines else "Non disponible" + + # Section asymétrie : éléments que la CPAM n'avait pas + asymetrie_lines = [] + + if dossier.biologie_cle: + bio_details = [] + for b in dossier.biologie_cle if len(dossier.biologie_cle) <= 10 else dossier.biologie_cle[:10]: + anomalie = " (anormale)" if b.anomalie else "" + if b.valeur: + bio_details.append(f"{b.test}: {b.valeur}{anomalie}") + if bio_details: + asymetrie_lines.append(f"- Biologie : {', '.join(bio_details)}") + + if dossier.imagerie: + img_details = [] + for im in dossier.imagerie: + conclusion = f" — {im.conclusion}" if im.conclusion else "" + img_details.append(f"{im.type}{conclusion}") + if img_details: + asymetrie_lines.append(f"- Imagerie : {', '.join(img_details)}") + + if dossier.traitements_sortie: + trt_details = [] + for t in dossier.traitements_sortie[:10]: + posologie = f" {t.posologie}" if t.posologie else "" + trt_details.append(f"{t.medicament}{posologie}") + if trt_details: + asymetrie_lines.append(f"- Traitements : {', '.join(trt_details)}") + + if dossier.actes_ccam: + actes_details = [ + f"{a.texte} ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else a.texte + for a in dossier.actes_ccam + ] + if actes_details: + asymetrie_lines.append(f"- Actes CCAM : {', '.join(actes_details)}") + + asymetrie_str = "" + if asymetrie_lines: + asymetrie_str = ( + "\n\nÉLÉMENTS DU DOSSIER NON TRANSMIS À LA CPAM " + "(l'UCR n'a eu que le CRH et les codes) :\n" + + "\n".join(asymetrie_lines) + ) + + # Codes contestés par la CPAM (avec libellés CIM-10 résolus) + codes_contestes = [] + if controle.dp_ucr: + codes_contestes.append(f"DP proposé par UCR : {controle.dp_ucr}{_get_code_label(controle.dp_ucr)}") + if controle.da_ucr: + codes_contestes.append(f"DA proposés par UCR : {controle.da_ucr}{_get_code_label(controle.da_ucr)}") + if controle.dr_ucr: + codes_contestes.append(f"DR proposé par UCR : {controle.dr_ucr}{_get_code_label(controle.dr_ucr)}") + if controle.actes_ucr: + codes_contestes.append(f"Actes proposés par UCR : {controle.actes_ucr}") + codes_str = "\n".join(codes_contestes) if codes_contestes else "Aucun code spécifique proposé" + + # Définitions CIM-10 déterministes (tous les codes en jeu) + definitions_str = _get_cim10_definitions(dossier, controle) + + # Contexte clinique tagué pour le grounding + tagged_context, tag_map = _build_tagged_context(dossier) + if tagged_context: + tagged_str = f"\n\n{tagged_context}" + else: + tagged_str = ( + "\n\nATTENTION — DOSSIER PAUVRE EN ÉLÉMENTS CLINIQUES :\n" + "Aucune biologie, imagerie, traitement ou acte CCAM disponible.\n" + "Ne spécule PAS sur des éléments absents. Signale explicitement " + "le manque de données au lieu d'inventer des preuves." + ) + + # Résumé biologique déterministe (interprétations non modifiables par le LLM) + bio_summary = _build_bio_summary(dossier) + if bio_summary: + tagged_str += f"\n\n{bio_summary}" + + # Vérification cohérence DAS / biologie + das_bio_warnings = _check_das_bio_coherence(dossier) + if das_bio_warnings: + tagged_str += ( + "\n\nALERTES COHÉRENCE DAS / BIOLOGIE (incohérences détectées dans le dossier) :\n" + + "\n".join(f" - {w}" for w in das_bio_warnings) + + "\n Prends en compte ces incohérences dans ton analyse." + ) + + # Sources RAG + sources_text = "" + for i, src in enumerate(sources, 1): + doc_name = { + "cim10": "CIM-10 FR 2026", + "cim10_alpha": "CIM-10 Index Alphabétique 2026", + "guide_methodo": "Guide Méthodologique MCO 2026", + "ccam": "CCAM PMSI V4 2025", + }.get(src.get("document", ""), src.get("document", "")) + + code_info = f" (code: {src['code']})" if src.get("code") else "" + page_info = f" [page {src['page']}]" if src.get("page") else "" + + sources_text += f"--- Source {i}: {doc_name}{code_info}{page_info} ---\n" + sources_text += (src.get("extrait", "")[:800]) + "\n\n" + + # Section pré-analyse (résultat passe 1, si disponible) + extraction_str = "" + if extraction: + ext_lines = [] + comp = extraction.get("comprehension_contestation") + if comp: + ext_lines.append(f"Compréhension : {comp}") + elems = extraction.get("elements_cliniques_pertinents", []) + if elems and isinstance(elems, list): + elem_strs = [] + for e in elems: + if isinstance(e, dict): + elem_strs.append(f" - [{e.get('tag', '?')}] {e.get('pertinence', '')}") + if elem_strs: + ext_lines.append("Éléments pertinents :\n" + "\n".join(elem_strs)) + accords = extraction.get("points_accord_potentiels", []) + if accords and isinstance(accords, list): + ext_lines.append("Points d'accord potentiels : " + " ; ".join(str(a) for a in accords)) + codes = extraction.get("codes_en_jeu", {}) + if codes and isinstance(codes, dict): + diff = codes.get("difference_cle", "") + if diff: + ext_lines.append(f"Différence clé entre les codages : {diff}") + if ext_lines: + extraction_str = ( + "\nPRÉ-ANALYSE (extraction automatique — à utiliser comme base) :\n" + + "\n".join(ext_lines) + ) + + prompt = CPAM_ARGUMENTATION.format( + dossier_str=dossier_str, + asymetrie_str=asymetrie_str, + tagged_str=tagged_str, + titre=controle.titre, + arg_ucr=controle.arg_ucr, + decision_ucr=controle.decision_ucr, + codes_str=codes_str, + definitions_str=definitions_str, + sources_text=sources_text, + extraction_str=extraction_str, + ) + return prompt, tag_map diff --git a/src/control/cpam_rag.py b/src/control/cpam_rag.py new file mode 100644 index 0000000..2c6deb2 --- /dev/null +++ b/src/control/cpam_rag.py @@ -0,0 +1,139 @@ +"""Recherche RAG ciblée pour la contre-argumentation CPAM.""" + +from __future__ import annotations + +import logging +import re + +from ..config import ControleCPAM, DossierMedical +from ..medical.cim10_dict import normalize_code, validate_code + +logger = logging.getLogger(__name__) + + +def _search_rag_for_control(controle: ControleCPAM, dossier: DossierMedical) -> list[dict]: + """Recherche RAG ciblée pour le sujet du désaccord. + + Effectue 2-5 recherches ciblées au lieu d'une requête fourre-tout : + 1. Codes contestés → règles de codage spécifiques + 2. Argument CPAM → passages Guide Méthodo contradictoires + 3. Contexte clinique (optionnel) → définitions CIM-10 des codes en jeu + 4. Définitions CIM-10 des codes contestés + 5. Règles explicitement citées dans l'argument CPAM + + Retourne [] si le RAG est indisponible (index absent, modèle embedding + inaccessible, etc.) — la contre-argumentation sera générée sans sources. + """ + try: + from ..medical.rag_search import search_similar_cpam + except Exception: + logger.warning("Index RAG non disponible pour la contre-argumentation") + return [] + + try: + return _search_rag_queries(controle, dossier, search_similar_cpam) + except Exception: + logger.warning("Erreur RAG pour la contre-argumentation — génération sans sources", + exc_info=True) + return [] + + +def _search_rag_queries( + controle: ControleCPAM, + dossier: DossierMedical, + search_similar_cpam, +) -> list[dict]: + """Exécute les requêtes RAG (séparé pour permettre un try/except global).""" + all_results: list[dict] = [] + + # Requête 1 — Codes contestés (règles de codage) + if controle.dp_ucr or controle.da_ucr: + query_parts = [] + if controle.dp_ucr: + query_parts.append(f"règles codage {controle.dp_ucr} diagnostic principal") + if controle.da_ucr: + query_parts.append(f"diagnostic associé significatif {controle.da_ucr} CMA") + query_codes = " ".join(query_parts) + results_codes = search_similar_cpam(query_codes, top_k=6) + logger.debug(" RAG requête codes : %d résultats", len(results_codes)) + all_results.extend(results_codes) + + # Requête 2 — Argument CPAM (recherche dans le Guide Méthodo) + query_parts_arg = [] + if controle.titre: + query_parts_arg.append(controle.titre) + arg_short = controle.arg_ucr[:500] if controle.arg_ucr else "" + if arg_short: + query_parts_arg.append(arg_short) + query_arg = " ".join(query_parts_arg) + if query_arg.strip(): + results_arg = search_similar_cpam(query_arg, top_k=6) + logger.debug(" RAG requête argument : %d résultats", len(results_arg)) + all_results.extend(results_arg) + + # Requête 3 — Contexte clinique (définitions CIM-10 des codes en jeu) + if controle.da_ucr and dossier.diagnostic_principal: + dp_text = dossier.diagnostic_principal.texte + das_texts = [ + d.texte for d in dossier.diagnostics_associes + if d.cim10_suggestion and controle.da_ucr + and d.cim10_suggestion in controle.da_ucr + ] + if das_texts: + query_clinique = f"{dp_text} {' '.join(das_texts)}" + results_clinique = search_similar_cpam(query_clinique, top_k=4) + logger.debug(" RAG requête clinique : %d résultats", len(results_clinique)) + all_results.extend(results_clinique) + + # Requête 4 — Définitions CIM-10 des codes contestés + contested_codes = [] + for field in (controle.dp_ucr, controle.da_ucr, controle.dr_ucr): + if field: + contested_codes.extend(re.split(r"[,;\s]+", field.strip())) + for raw_code in contested_codes: + raw_code = raw_code.strip() + if not raw_code: + continue + norm = normalize_code(raw_code) + is_valid, label = validate_code(norm) + if is_valid and label: + query_def = f"CIM-10 {norm} {label} définition inclusion exclusion" + else: + query_def = f"CIM-10 {norm} définition codage" + results_def = search_similar_cpam(query_def, top_k=3) + logger.debug(" RAG requête CIM-10 %s : %d résultats", norm, len(results_def)) + all_results.extend(results_def) + + # Requête 5 — Règles explicitement citées dans l'argument CPAM + if controle.arg_ucr: + rule_patterns = [ + r'(?:R[eè]gle\s*T?\s*\d+)', + r'(?:Annexe[\s-]*\d+[A-Za-z]*)', + r'(?:Situation de soins?\s+[^.]{5,40})', + ] + rules_found = [] + for pattern in rule_patterns: + rules_found.extend(re.findall(pattern, controle.arg_ucr, re.IGNORECASE)) + if rules_found: + rules_unique = list(dict.fromkeys(rules_found)) + query_rules = " ".join(rules_unique) + " guide méthodologique codage PMSI" + results_rules = search_similar_cpam(query_rules, top_k=4) + logger.debug(" RAG requête règles (%s) : %d résultats", + ", ".join(rules_unique), len(results_rules)) + all_results.extend(results_rules) + + if not all_results: + return [] + + # Fusion : dédupliquer par (document, code, page), garder le meilleur score + seen: dict[tuple, dict] = {} + for r in all_results: + key = (r.get("document"), r.get("code"), r.get("page")) + if key in seen: + if r["score"] > seen[key]["score"]: + seen[key] = r + else: + seen[key] = r + + merged = sorted(seen.values(), key=lambda r: r["score"], reverse=True) + return merged[:12] diff --git a/src/control/cpam_response.py b/src/control/cpam_response.py index 259f864..39c9343 100644 --- a/src/control/cpam_response.py +++ b/src/control/cpam_response.py @@ -1,815 +1,48 @@ -"""Génération de contre-argumentation pour les contrôles CPAM via RAG + Ollama.""" +"""Génération de contre-argumentation pour les contrôles CPAM via RAG + Ollama. + +Orchestrateur principal — délègue aux sous-modules : + - cpam_rag : _search_rag_for_control(), _search_rag_queries() + - cpam_context : _build_cpam_prompt(), _build_tagged_context(), _build_bio_summary(), etc. + - cpam_validation : _validate_adversarial(), _validate_grounding(), _format_response(), etc. +""" from __future__ import annotations import logging -import re -from ..config import ControleCPAM, DossierMedical, RAGSource -from ..medical.cim10_dict import normalize_code, validate_code -from ..medical.cim10_extractor import BIO_NORMALS +from ..config import ControleCPAM, DossierMedical, RAGSource, rule_enabled from ..medical.ollama_client import call_anthropic, call_ollama -from ..prompts import CPAM_EXTRACTION, CPAM_ARGUMENTATION, CPAM_ADVERSARIAL +from ..prompts import CPAM_EXTRACTION + +# --- Imports depuis les sous-modules --- +from .cpam_rag import _search_rag_for_control +from .cpam_context import ( + _build_cpam_prompt, + _build_tagged_context, +) +from .cpam_validation import ( + _validate_adversarial, + _validate_grounding, + _validate_references, + _validate_codes_in_response, + _build_correction_prompt, + _format_response, +) + +# Backward compat — sera retiré dans un commit futur +from .cpam_rag import _search_rag_queries # noqa: F401 +from .cpam_context import ( # noqa: F401 + _get_code_label, + _get_cim10_definitions, + _BIO_INTERPRETATION, + _build_bio_summary, + _check_das_bio_coherence, +) +from .cpam_validation import _CIM10_CODE_RE, _validate_adversarial as _validate_adversarial # noqa: F401 logger = logging.getLogger(__name__) -def _search_rag_for_control(controle: ControleCPAM, dossier: DossierMedical) -> list[dict]: - """Recherche RAG ciblée pour le sujet du désaccord. - - Effectue 2-5 recherches ciblées au lieu d'une requête fourre-tout : - 1. Codes contestés → règles de codage spécifiques - 2. Argument CPAM → passages Guide Méthodo contradictoires - 3. Contexte clinique (optionnel) → définitions CIM-10 des codes en jeu - 4. Définitions CIM-10 des codes contestés - 5. Règles explicitement citées dans l'argument CPAM - - Retourne [] si le RAG est indisponible (index absent, modèle embedding - inaccessible, etc.) — la contre-argumentation sera générée sans sources. - """ - try: - from ..medical.rag_search import search_similar_cpam - except Exception: - logger.warning("Index RAG non disponible pour la contre-argumentation") - return [] - - try: - return _search_rag_queries(controle, dossier, search_similar_cpam) - except Exception: - logger.warning("Erreur RAG pour la contre-argumentation — génération sans sources", - exc_info=True) - return [] - - -def _search_rag_queries( - controle: ControleCPAM, - dossier: DossierMedical, - search_similar_cpam, -) -> list[dict]: - """Exécute les requêtes RAG (séparé pour permettre un try/except global).""" - all_results: list[dict] = [] - - # Requête 1 — Codes contestés (règles de codage) - if controle.dp_ucr or controle.da_ucr: - query_parts = [] - if controle.dp_ucr: - query_parts.append(f"règles codage {controle.dp_ucr} diagnostic principal") - if controle.da_ucr: - query_parts.append(f"diagnostic associé significatif {controle.da_ucr} CMA") - query_codes = " ".join(query_parts) - results_codes = search_similar_cpam(query_codes, top_k=6) - logger.debug(" RAG requête codes : %d résultats", len(results_codes)) - all_results.extend(results_codes) - - # Requête 2 — Argument CPAM (recherche dans le Guide Méthodo) - query_parts_arg = [] - if controle.titre: - query_parts_arg.append(controle.titre) - arg_short = controle.arg_ucr[:500] if controle.arg_ucr else "" - if arg_short: - query_parts_arg.append(arg_short) - query_arg = " ".join(query_parts_arg) - if query_arg.strip(): - results_arg = search_similar_cpam(query_arg, top_k=6) - logger.debug(" RAG requête argument : %d résultats", len(results_arg)) - all_results.extend(results_arg) - - # Requête 3 — Contexte clinique (définitions CIM-10 des codes en jeu) - if controle.da_ucr and dossier.diagnostic_principal: - dp_text = dossier.diagnostic_principal.texte - das_texts = [ - d.texte for d in dossier.diagnostics_associes - if d.cim10_suggestion and controle.da_ucr - and d.cim10_suggestion in controle.da_ucr - ] - if das_texts: - query_clinique = f"{dp_text} {' '.join(das_texts)}" - results_clinique = search_similar_cpam(query_clinique, top_k=4) - logger.debug(" RAG requête clinique : %d résultats", len(results_clinique)) - all_results.extend(results_clinique) - - # Requête 4 — Définitions CIM-10 des codes contestés - contested_codes = [] - for field in (controle.dp_ucr, controle.da_ucr, controle.dr_ucr): - if field: - contested_codes.extend(re.split(r"[,;\s]+", field.strip())) - for raw_code in contested_codes: - raw_code = raw_code.strip() - if not raw_code: - continue - norm = normalize_code(raw_code) - is_valid, label = validate_code(norm) - if is_valid and label: - query_def = f"CIM-10 {norm} {label} définition inclusion exclusion" - else: - query_def = f"CIM-10 {norm} définition codage" - results_def = search_similar_cpam(query_def, top_k=3) - logger.debug(" RAG requête CIM-10 %s : %d résultats", norm, len(results_def)) - all_results.extend(results_def) - - # Requête 5 — Règles explicitement citées dans l'argument CPAM - if controle.arg_ucr: - rule_patterns = [ - r'(?:R[eè]gle\s*T?\s*\d+)', - r'(?:Annexe[\s-]*\d+[A-Za-z]*)', - r'(?:Situation de soins?\s+[^.]{5,40})', - ] - rules_found = [] - for pattern in rule_patterns: - rules_found.extend(re.findall(pattern, controle.arg_ucr, re.IGNORECASE)) - if rules_found: - rules_unique = list(dict.fromkeys(rules_found)) - query_rules = " ".join(rules_unique) + " guide méthodologique codage PMSI" - results_rules = search_similar_cpam(query_rules, top_k=4) - logger.debug(" RAG requête règles (%s) : %d résultats", - ", ".join(rules_unique), len(results_rules)) - all_results.extend(results_rules) - - if not all_results: - return [] - - # Fusion : dédupliquer par (document, code, page), garder le meilleur score - seen: dict[tuple, dict] = {} - for r in all_results: - key = (r.get("document"), r.get("code"), r.get("page")) - if key in seen: - if r["score"] > seen[key]["score"]: - seen[key] = r - else: - seen[key] = r - - merged = sorted(seen.values(), key=lambda r: r["score"], reverse=True) - return merged[:12] - - -def _get_code_label(code_str: str) -> str: - """Résout le libellé CIM-10 pour un ou plusieurs codes.""" - codes = re.split(r"[,;\s]+", code_str.strip()) - labels = [] - for raw in codes: - raw = raw.strip() - if not raw: - continue - norm = normalize_code(raw) - is_valid, label = validate_code(norm) - if is_valid and label: - labels.append(f"{norm} — {label}") - else: - labels.append(norm) - if not labels: - return "" - if len(labels) == 1: - parts = labels[0].split(" — ", 1) - return f" — {parts[1]}" if len(parts) > 1 else "" - return "\n " + "\n ".join(labels) - - -def _get_cim10_definitions( - dossier: DossierMedical, - controle: ControleCPAM, -) -> str: - """Construit une section de définitions CIM-10 déterministes pour tous les codes en jeu. - - Collecte les codes depuis : - - Le dossier : DP (cim10_suggestion) + DAS (cim10_suggestion) - - L'UCR : dp_ucr, da_ucr, dr_ucr - - Returns: - Texte formaté pour injection dans le prompt, ou "" si aucun code résolu. - """ - codes_seen: dict[str, str] = {} # code normalisé → rôle (pour affichage) - - # Codes du dossier (établissement) - if dossier.diagnostic_principal and dossier.diagnostic_principal.cim10_suggestion: - code = dossier.diagnostic_principal.cim10_suggestion - codes_seen[normalize_code(code)] = "DP établissement" - for das in dossier.diagnostics_associes: - if das.cim10_suggestion: - norm = normalize_code(das.cim10_suggestion) - if norm not in codes_seen: - codes_seen[norm] = "DAS établissement" - - # Codes de l'UCR (CPAM) - for field, role in [ - (controle.dp_ucr, "DP proposé UCR"), - (controle.da_ucr, "DA proposé UCR"), - (controle.dr_ucr, "DR proposé UCR"), - ]: - if not field: - continue - for raw in re.split(r"[,;\s]+", field.strip()): - raw = raw.strip() - if not raw: - continue - norm = normalize_code(raw) - if norm not in codes_seen: - codes_seen[norm] = role - - if not codes_seen: - return "" - - # Résoudre les libellés - lines = [] - for norm_code, role in codes_seen.items(): - is_valid, label = validate_code(norm_code) - if is_valid and label: - lines.append(f" {norm_code} — {label} [{role}]") - else: - lines.append(f" {norm_code} — (code non trouvé dans le dictionnaire) [{role}]") - - if not lines: - return "" - - return ( - "\nDÉFINITIONS CIM-10 — RÉFÉRENCE (source : dictionnaire officiel) :\n" - + "\n".join(lines) - ) - - -def _build_tagged_context(dossier: DossierMedical) -> tuple[str, dict[str, str]]: - """Construit un contexte clinique avec des tags de référence pour le grounding. - - Chaque élément clinique reçoit un tag unique ([BIO-1], [IMG-1], [TRT-1], [ACTE-1]) - que le LLM doit citer dans ses preuves pour garantir la traçabilité. - - Returns: - (texte tagué pour injection dans le prompt, dict tag → contenu original) - """ - tag_map: dict[str, str] = {} - lines: list[str] = [] - - # Biologie (avec normes de référence pour éviter les hallucinations) - for i, b in enumerate(dossier.biologie_cle, 1): - if not b.valeur: - continue - tag = f"BIO-{i}" - # Interpréter la valeur par rapport aux normes connues - norm_info = "" - if b.test in BIO_NORMALS: - lo, hi = BIO_NORMALS[b.test] - try: - val = float(b.valeur.replace(",", ".").split()[0]) - if val > hi: - norm_info = f" — ÉLEVÉ (norme {lo}-{hi})" - elif val < lo: - norm_info = f" — BAS (norme {lo}-{hi})" - else: - norm_info = f" — NORMAL (norme {lo}-{hi})" - except (ValueError, AttributeError): - pass - content = f"{b.test}: {b.valeur}{norm_info}" - tag_map[tag] = content - lines.append(f" [{tag}] {content}") - - # Imagerie - for i, im in enumerate(dossier.imagerie, 1): - tag = f"IMG-{i}" - conclusion = f" — {im.conclusion}" if im.conclusion else "" - content = f"{im.type}{conclusion}" - tag_map[tag] = content - lines.append(f" [{tag}] {content}") - - # Traitements - for i, t in enumerate(dossier.traitements_sortie[:10], 1): - tag = f"TRT-{i}" - posologie = f" {t.posologie}" if t.posologie else "" - content = f"{t.medicament}{posologie}" - tag_map[tag] = content - lines.append(f" [{tag}] {content}") - - # Actes CCAM - for i, a in enumerate(dossier.actes_ccam, 1): - tag = f"ACTE-{i}" - code = f" ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else "" - content = f"{a.texte}{code}" - tag_map[tag] = content - lines.append(f" [{tag}] {content}") - - if not lines: - return "", tag_map - - text = "ÉLÉMENTS CLINIQUES RÉFÉRENCÉS (cite le tag [XX-N] dans tes preuves) :\n" + "\n".join(lines) - return text, tag_map - - -def _validate_grounding(response_data: dict, tag_map: dict[str, str]) -> list[str]: - """Vérifie que les références dans preuves_dossier correspondent à des tags existants. - - Returns: - Liste de warnings pour les références inventées. - """ - if not tag_map: - return [] - - warnings: list[str] = [] - preuves = response_data.get("preuves_dossier") - if not preuves or not isinstance(preuves, list): - return warnings - - for p in preuves: - if not isinstance(p, dict): - continue - ref = p.get("ref", "") - if not ref: - continue - if ref not in tag_map: - valeur = p.get("valeur", "?") - warnings.append(f"Preuve [{ref}] non traçable (« {valeur} »)") - logger.warning("Grounding : preuve [%s] introuvable dans les tags du dossier", ref) - - return warnings - - -def _check_das_bio_coherence(dossier: DossierMedical) -> list[str]: - """Vérifie la cohérence entre les textes DAS et les valeurs biologiques. - - Détecte les contradictions comme "leucocytose" dans un DAS alors que - les leucocytes sont bas, ou "anémie" alors que l'hémoglobine est normale. - - Returns: - Liste de warnings pour les incohérences détectées. - """ - if not dossier.diagnostics_associes or not dossier.biologie_cle: - return [] - - # Patterns DAS → (test bio attendu, direction attendue) - _DAS_BIO_CHECKS: dict[str, tuple[str, str]] = { - "leucocytose": ("Leucocytes", "high"), - "leucopénie": ("Leucocytes", "low"), - "leucopenie": ("Leucocytes", "low"), - "thrombocytose": ("Plaquettes", "high"), - "thrombocytopénie": ("Plaquettes", "low"), - "thrombocytopenie": ("Plaquettes", "low"), - "thrombopénie": ("Plaquettes", "low"), - "thrombopenie": ("Plaquettes", "low"), - "anémie": ("Hémoglobine", "low"), - "anemie": ("Hémoglobine", "low"), - "polyglobulie": ("Hémoglobine", "high"), - "hyperkaliémie": ("Potassium", "high"), - "hypokaliémie": ("Potassium", "low"), - } - - # Indexer les valeurs bio disponibles - bio_values: dict[str, float] = {} - for b in dossier.biologie_cle: - if b.test and b.valeur: - try: - bio_values[b.test] = float(b.valeur.replace(",", ".").split()[0]) - except (ValueError, AttributeError): - pass - - warnings: list[str] = [] - for das in dossier.diagnostics_associes: - texte_lower = (das.texte or "").lower() - for pattern, (bio_test, direction) in _DAS_BIO_CHECKS.items(): - if pattern not in texte_lower: - continue - if bio_test not in bio_values or bio_test not in BIO_NORMALS: - continue - val = bio_values[bio_test] - lo, hi = BIO_NORMALS[bio_test] - if direction == "high" and val <= hi: - warnings.append( - f"INCOHÉRENCE : DAS « {das.texte} » ({das.cim10_suggestion or '?'}) " - f"mais {bio_test} = {val} est NORMAL (norme {lo}-{hi})" - ) - elif direction == "low" and val >= lo: - warnings.append( - f"INCOHÉRENCE : DAS « {das.texte} » ({das.cim10_suggestion or '?'}) " - f"mais {bio_test} = {val} est NORMAL (norme {lo}-{hi})" - ) - - if warnings: - for w in warnings: - logger.warning(" DAS/bio : %s", w) - - return warnings - - -def _build_cpam_prompt( - dossier: DossierMedical, - controle: ControleCPAM, - sources: list[dict], - extraction: dict | None = None, -) -> tuple[str, dict[str, str]]: - """Construit le prompt pour la contre-argumentation CPAM. - - Args: - extraction: Résultat optionnel de la passe 1 (extraction structurée). - - Returns: - (prompt texte, tag_map pour validation grounding) - """ - # Résumé du dossier médical - dossier_lines = [] - - if dossier.diagnostic_principal: - dp = dossier.diagnostic_principal - dp_code = f" ({dp.cim10_suggestion})" if dp.cim10_suggestion else "" - dossier_lines.append(f"- DP : {dp.texte}{dp_code}") - elif controle.dp_ucr: - dp_label = _get_code_label(controle.dp_ucr) - dossier_lines.append( - f"- DP : code {controle.dp_ucr}{dp_label} " - f"(codé par l'établissement, contesté par la CPAM)" - ) - - if dossier.diagnostics_associes: - das_parts = [] - for das in dossier.diagnostics_associes: - code = f" ({das.cim10_suggestion})" if das.cim10_suggestion else "" - das_parts.append(f"{das.texte}{code}") - dossier_lines.append(f"- DAS : {', '.join(das_parts)}") - - if dossier.actes_ccam: - actes = [f"{a.texte} ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else a.texte - for a in dossier.actes_ccam] - dossier_lines.append(f"- Actes CCAM : {', '.join(actes)}") - - sejour = dossier.sejour - if sejour.duree_sejour is not None: - dossier_lines.append(f"- Durée séjour : {sejour.duree_sejour} jours") - if sejour.sexe or sejour.age is not None: - patient_info = [] - if sejour.sexe: - patient_info.append(sejour.sexe) - if sejour.age is not None: - patient_info.append(f"{sejour.age} ans") - if sejour.age < 18: - patient_info.append("(PÉDIATRIE — codage pédiatrique applicable)") - elif sejour.age >= 80: - patient_info.append("(patient âgé — comorbidités fréquentes)") - dossier_lines.append(f"- Patient : {', '.join(patient_info)}") - if sejour.mode_entree: - mode_label = sejour.mode_entree - if "urgence" in mode_label.lower() or "urgent" in mode_label.lower(): - dossier_lines.append(f"- Mode d'entrée : {mode_label} (ADMISSION EN URGENCE)") - else: - dossier_lines.append(f"- Mode d'entrée : {mode_label}") - if sejour.mode_sortie: - dossier_lines.append(f"- Mode de sortie : {sejour.mode_sortie}") - if sejour.imc is not None: - dossier_lines.append(f"- IMC : {sejour.imc}") - - if dossier.biologie_cle: - bio = [f"{b.test}: {b.valeur}" for b in dossier.biologie_cle[:5] if b.valeur] - if bio: - dossier_lines.append(f"- Biologie clé : {', '.join(bio)}") - - if dossier.imagerie: - img_parts = [] - for im in dossier.imagerie: - conclusion = f" — {im.conclusion}" if im.conclusion else "" - img_parts.append(f"{im.type}{conclusion}") - dossier_lines.append(f"- Imagerie : {', '.join(img_parts)}") - - if dossier.traitements_sortie: - trt_parts = [] - for t in dossier.traitements_sortie[:10]: - posologie = f" {t.posologie}" if t.posologie else "" - trt_parts.append(f"{t.medicament}{posologie}") - dossier_lines.append(f"- Traitements de sortie : {', '.join(trt_parts)}") - - if dossier.antecedents: - dossier_lines.append(f"- Antécédents : {', '.join(a.texte for a in dossier.antecedents[:10])}") - - if dossier.complications: - dossier_lines.append(f"- Complications : {', '.join(c.texte for c in dossier.complications)}") - - dossier_str = "\n".join(dossier_lines) if dossier_lines else "Non disponible" - - # Section asymétrie : éléments que la CPAM n'avait pas - asymetrie_lines = [] - - if dossier.biologie_cle: - bio_details = [] - for b in dossier.biologie_cle if len(dossier.biologie_cle) <= 10 else dossier.biologie_cle[:10]: - anomalie = " (anormale)" if b.anomalie else "" - if b.valeur: - bio_details.append(f"{b.test}: {b.valeur}{anomalie}") - if bio_details: - asymetrie_lines.append(f"- Biologie : {', '.join(bio_details)}") - - if dossier.imagerie: - img_details = [] - for im in dossier.imagerie: - conclusion = f" — {im.conclusion}" if im.conclusion else "" - img_details.append(f"{im.type}{conclusion}") - if img_details: - asymetrie_lines.append(f"- Imagerie : {', '.join(img_details)}") - - if dossier.traitements_sortie: - trt_details = [] - for t in dossier.traitements_sortie[:10]: - posologie = f" {t.posologie}" if t.posologie else "" - trt_details.append(f"{t.medicament}{posologie}") - if trt_details: - asymetrie_lines.append(f"- Traitements : {', '.join(trt_details)}") - - if dossier.actes_ccam: - actes_details = [ - f"{a.texte} ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else a.texte - for a in dossier.actes_ccam - ] - if actes_details: - asymetrie_lines.append(f"- Actes CCAM : {', '.join(actes_details)}") - - asymetrie_str = "" - if asymetrie_lines: - asymetrie_str = ( - "\n\nÉLÉMENTS DU DOSSIER NON TRANSMIS À LA CPAM " - "(l'UCR n'a eu que le CRH et les codes) :\n" - + "\n".join(asymetrie_lines) - ) - - # Codes contestés par la CPAM (avec libellés CIM-10 résolus) - codes_contestes = [] - if controle.dp_ucr: - codes_contestes.append(f"DP proposé par UCR : {controle.dp_ucr}{_get_code_label(controle.dp_ucr)}") - if controle.da_ucr: - codes_contestes.append(f"DA proposés par UCR : {controle.da_ucr}{_get_code_label(controle.da_ucr)}") - if controle.dr_ucr: - codes_contestes.append(f"DR proposé par UCR : {controle.dr_ucr}{_get_code_label(controle.dr_ucr)}") - if controle.actes_ucr: - codes_contestes.append(f"Actes proposés par UCR : {controle.actes_ucr}") - codes_str = "\n".join(codes_contestes) if codes_contestes else "Aucun code spécifique proposé" - - # Définitions CIM-10 déterministes (tous les codes en jeu) - definitions_str = _get_cim10_definitions(dossier, controle) - - # Contexte clinique tagué pour le grounding - tagged_context, tag_map = _build_tagged_context(dossier) - if tagged_context: - tagged_str = f"\n\n{tagged_context}" - else: - tagged_str = ( - "\n\nATTENTION — DOSSIER PAUVRE EN ÉLÉMENTS CLINIQUES :\n" - "Aucune biologie, imagerie, traitement ou acte CCAM disponible.\n" - "Ne spécule PAS sur des éléments absents. Signale explicitement " - "le manque de données au lieu d'inventer des preuves." - ) - - # Vérification cohérence DAS / biologie - das_bio_warnings = _check_das_bio_coherence(dossier) - if das_bio_warnings: - tagged_str += ( - "\n\nALERTES COHÉRENCE DAS / BIOLOGIE (incohérences détectées dans le dossier) :\n" - + "\n".join(f" - {w}" for w in das_bio_warnings) - + "\n Prends en compte ces incohérences dans ton analyse." - ) - - # Sources RAG - sources_text = "" - for i, src in enumerate(sources, 1): - doc_name = { - "cim10": "CIM-10 FR 2026", - "cim10_alpha": "CIM-10 Index Alphabétique 2026", - "guide_methodo": "Guide Méthodologique MCO 2026", - "ccam": "CCAM PMSI V4 2025", - }.get(src.get("document", ""), src.get("document", "")) - - code_info = f" (code: {src['code']})" if src.get("code") else "" - page_info = f" [page {src['page']}]" if src.get("page") else "" - - sources_text += f"--- Source {i}: {doc_name}{code_info}{page_info} ---\n" - sources_text += (src.get("extrait", "")[:800]) + "\n\n" - - # Section pré-analyse (résultat passe 1, si disponible) - extraction_str = "" - if extraction: - ext_lines = [] - comp = extraction.get("comprehension_contestation") - if comp: - ext_lines.append(f"Compréhension : {comp}") - elems = extraction.get("elements_cliniques_pertinents", []) - if elems and isinstance(elems, list): - elem_strs = [] - for e in elems: - if isinstance(e, dict): - elem_strs.append(f" - [{e.get('tag', '?')}] {e.get('pertinence', '')}") - if elem_strs: - ext_lines.append("Éléments pertinents :\n" + "\n".join(elem_strs)) - accords = extraction.get("points_accord_potentiels", []) - if accords and isinstance(accords, list): - ext_lines.append("Points d'accord potentiels : " + " ; ".join(str(a) for a in accords)) - codes = extraction.get("codes_en_jeu", {}) - if codes and isinstance(codes, dict): - diff = codes.get("difference_cle", "") - if diff: - ext_lines.append(f"Différence clé entre les codages : {diff}") - if ext_lines: - extraction_str = ( - "\nPRÉ-ANALYSE (extraction automatique — à utiliser comme base) :\n" - + "\n".join(ext_lines) - ) - - prompt = CPAM_ARGUMENTATION.format( - dossier_str=dossier_str, - asymetrie_str=asymetrie_str, - tagged_str=tagged_str, - titre=controle.titre, - arg_ucr=controle.arg_ucr, - decision_ucr=controle.decision_ucr, - codes_str=codes_str, - definitions_str=definitions_str, - sources_text=sources_text, - extraction_str=extraction_str, - ) - return prompt, tag_map - - -def _validate_references(parsed: dict, sources: list[dict]) -> list[str]: - """Vérifie que les références citées correspondent aux sources RAG fournies. - - Returns: - Liste d'avertissements pour les références non vérifiables. - """ - warnings = [] - refs = parsed.get("references") - if not refs or not isinstance(refs, list): - return warnings - - # Construire un set des documents sources disponibles - source_docs = set() - for src in sources: - doc_name = src.get("document", "") - source_docs.add(doc_name) - # Ajouter les noms lisibles aussi - readable = { - "cim10": "CIM-10 FR 2026", - "cim10_alpha": "CIM-10 Index Alphabétique 2026", - "guide_methodo": "Guide Méthodologique MCO 2026", - "ccam": "CCAM PMSI V4 2025", - }.get(doc_name, "") - if readable: - source_docs.add(readable) - source_docs.add(readable.lower()) - - if not source_docs: - return warnings - - for ref in refs: - if not isinstance(ref, dict): - continue - doc = ref.get("document", "") - if doc and not any(sd in doc.lower() or doc.lower() in sd.lower() for sd in source_docs if sd): - warnings.append(f"Référence non vérifiable : {doc}") - logger.warning("CPAM : référence non vérifiable « %s »", doc) - - return warnings - - -def _format_response(parsed: dict, ref_warnings: list[str] | None = None) -> str: - """Formate la réponse LLM en texte lisible.""" - sections = [] - - analyse = parsed.get("analyse_contestation") - if analyse: - sections.append(f"ANALYSE DE LA CONTESTATION\n{analyse}") - - accord = parsed.get("points_accord") - if accord and accord.lower() not in ("aucun", "non applicable", "n/a", ""): - sections.append(f"POINTS D'ACCORD\n{accord}") - - # Nouveaux champs structurés par axe - contre_med = parsed.get("contre_arguments_medicaux") - if contre_med: - sections.append(f"CONTRE-ARGUMENTS MÉDICAUX\n{contre_med}") - - # Preuves du dossier (nouveau champ structuré) - preuves = parsed.get("preuves_dossier") - if preuves and isinstance(preuves, list): - preuves_lines = [] - for p in preuves: - if isinstance(p, dict): - ref = p.get("ref", "") - elem = p.get("element", "") - valeur = p.get("valeur", "") - signif = p.get("signification", "") - ref_prefix = f"[{ref}] " if ref else "" - preuves_lines.append(f"- {ref_prefix}[{elem}] {valeur} → {signif}") - if preuves_lines: - sections.append(f"PREUVES DU DOSSIER\n" + "\n".join(preuves_lines)) - - contre_asym = parsed.get("contre_arguments_asymetrie") - if contre_asym: - sections.append(f"ASYMÉTRIE D'INFORMATION\n{contre_asym}") - - contre_regl = parsed.get("contre_arguments_reglementaires") - if contre_regl: - sections.append(f"CONTRE-ARGUMENTS RÉGLEMENTAIRES\n{contre_regl}") - - # Fallback : ancien champ unique (réponses en cache existantes) - if not contre_med and not contre_asym and not contre_regl: - contre = parsed.get("contre_arguments") - if contre: - sections.append(f"CONTRE-ARGUMENTS\n{contre}") - - # Références structurées (nouveau format liste) ou ancien format string - refs = parsed.get("references") - if refs: - if isinstance(refs, list): - ref_lines = [] - for r in refs: - if isinstance(r, dict): - doc = r.get("document", "") - page = r.get("page", "") - citation = r.get("citation", "") - ref_lines.append(f"- [{doc}, p.{page}] {citation}") - else: - ref_lines.append(f"- {r}") - if ref_lines: - sections.append(f"REFERENCES\n" + "\n".join(ref_lines)) - else: - sections.append(f"REFERENCES\n{refs}") - - conclusion = parsed.get("conclusion") - if conclusion: - sections.append(f"CONCLUSION\n{conclusion}") - - # Avertissements sur les références non vérifiables - if ref_warnings: - warning_text = "\n".join(f"- {w}" for w in ref_warnings) - sections.append(f"AVERTISSEMENT — REFERENCES NON VÉRIFIÉES\n{warning_text}") - - return "\n\n".join(sections) - - -def _validate_adversarial( - response_data: dict, - tag_map: dict[str, str], - controle: ControleCPAM, -) -> dict | None: - """Validation adversariale — vérifie la cohérence de la contre-argumentation. - - Un appel LLM de relecture critique vérifie : - 1. Les valeurs cliniques citées correspondent aux éléments tagués du dossier - 2. La conclusion est cohérente avec l'argumentation - 3. Les points d'accord ne contredisent pas la contre-argumentation - 4. Les codes CIM-10 cités sont cohérents - - Returns: - dict {"coherent": bool, "erreurs": list[str], "score_confiance": int} ou None si échec. - """ - import json as _json - - # Construire le résumé des éléments factuels disponibles - if tag_map: - factual_lines = "\n".join(f" [{tag}] {content}" for tag, content in tag_map.items()) - factual_section = f"ÉLÉMENTS FACTUELS DU DOSSIER :\n{factual_lines}" - else: - factual_section = "ÉLÉMENTS FACTUELS DU DOSSIER : aucun élément tagué disponible" - - # Sérialiser la réponse LLM de façon compacte - try: - response_json = _json.dumps(response_data, ensure_ascii=False, indent=None) - # Tronquer si trop long pour le prompt de validation - if len(response_json) > 3000: - response_json = response_json[:3000] + "..." - except (TypeError, ValueError): - logger.warning("Validation adversariale : impossible de sérialiser la réponse") - return None - - # Normes biologiques pour vérifier les interprétations - normes_lines = [] - for test, (lo, hi) in BIO_NORMALS.items(): - normes_lines.append(f" {test}: {lo}-{hi}") - normes_section = "NORMES BIOLOGIQUES DE RÉFÉRENCE :\n" + "\n".join(normes_lines) - - dp_ucr_line = f"DP UCR : {controle.dp_ucr}" if controle.dp_ucr else "" - da_ucr_line = f"DA UCR : {controle.da_ucr}" if controle.da_ucr else "" - - prompt = CPAM_ADVERSARIAL.format( - response_json=response_json, - factual_section=factual_section, - normes_section=normes_section, - dp_ucr_line=dp_ucr_line, - da_ucr_line=da_ucr_line, - ) - - logger.debug(" Validation adversariale") - result = call_ollama(prompt, temperature=0.0, max_tokens=800, role="validation") - if result is None: - result = call_anthropic(prompt, temperature=0.0, max_tokens=800) - if result is None: - logger.warning(" Validation adversariale échouée — LLM indisponible") - return None - - coherent = result.get("coherent", True) - erreurs = result.get("erreurs", []) - score = result.get("score_confiance", -1) - - if not coherent and erreurs: - logger.warning(" Validation adversariale : %d incohérence(s) détectée(s) (score %s/10)", - len(erreurs), score) - for e in erreurs: - logger.warning(" - %s", e) - else: - logger.info(" Validation adversariale OK (score %s/10)", score) - - return result - - def _extraction_pass( dossier: DossierMedical, controle: ControleCPAM, @@ -926,6 +159,11 @@ def generate_cpam_response( if grounding_warnings: logger.warning(" CPAM : %d preuve(s) non traçable(s)", len(grounding_warnings)) + # 7b. Validation codes fermée (périmètre dossier + UCR) + code_warnings = _validate_codes_in_response(result, dossier, controle) + if code_warnings: + logger.warning(" CPAM : %d code(s) hors périmètre", len(code_warnings)) + # 8. Validation adversariale (cohérence factuelle) adversarial_warnings: list[str] = [] validation = _validate_adversarial(result, tag_map, controle) @@ -938,7 +176,49 @@ def generate_cpam_response( if adversarial_warnings: adversarial_warnings.append(f"Score de confiance : {score}/10") - all_warnings = ref_warnings + grounding_warnings + adversarial_warnings + # 8b. Boucle de correction (max 1 retry) + if (validation + and not validation.get("coherent", True) + and validation.get("score_confiance", 10) <= 5 + and rule_enabled("RULE-CPAM-CORRECTION-LOOP")): + + erreurs_v = validation.get("erreurs", []) + logger.warning(" Score adversarial %s/10 — correction en cours (%d erreur(s))", + validation.get("score_confiance"), len(erreurs_v)) + + correction_prompt = _build_correction_prompt(prompt, result, validation) + corrected = call_ollama(correction_prompt, temperature=0.0, max_tokens=6000, role="cpam") + if corrected is None: + corrected = call_anthropic(correction_prompt, temperature=0.0, max_tokens=6000) + + if corrected: + # Re-valider la correction + validation2 = _validate_adversarial(corrected, tag_map, controle) + score2 = validation2.get("score_confiance", 0) if validation2 else 0 + score1 = validation.get("score_confiance", 0) + + if score2 > score1: + logger.info(" Correction acceptée (score %s → %s)", score1, score2) + result = corrected + validation = validation2 + # Recalculer les warnings + ref_warnings = _validate_references(result, sources) + grounding_warnings = _validate_grounding(result, tag_map) + code_warnings = _validate_codes_in_response(result, dossier, controle) + adversarial_warnings = [] + if validation and not validation.get("coherent", True): + for e in validation.get("erreurs", []): + if isinstance(e, str) and e.strip(): + adversarial_warnings.append(f"Incohérence détectée : {e}") + if adversarial_warnings: + adversarial_warnings.append( + f"Score de confiance : {validation.get('score_confiance', '?')}/10" + ) + else: + logger.warning(" Correction rejetée (score %s → %s) — conserve l'original", + score1, score2) + + all_warnings = ref_warnings + grounding_warnings + code_warnings + adversarial_warnings # 9. Formater la réponse text = _format_response(result, all_warnings) diff --git a/src/control/cpam_validation.py b/src/control/cpam_validation.py new file mode 100644 index 0000000..fa85acc --- /dev/null +++ b/src/control/cpam_validation.py @@ -0,0 +1,376 @@ +"""Validation et formatage des réponses CPAM (grounding, adversariale, codes).""" + +from __future__ import annotations + +import logging +import re + +from ..config import ControleCPAM, DossierMedical +from ..medical.bio_normals import BIO_NORMALS +from ..medical.cim10_dict import normalize_code, validate_code +from ..medical.ollama_client import call_anthropic, call_ollama +from ..prompts import CPAM_ADVERSARIAL + +logger = logging.getLogger(__name__) + + +def _validate_grounding(response_data: dict, tag_map: dict[str, str]) -> list[str]: + """Vérifie que les références dans preuves_dossier correspondent à des tags existants. + + Returns: + Liste de warnings pour les références inventées. + """ + if not tag_map: + return [] + + warnings: list[str] = [] + preuves = response_data.get("preuves_dossier") + if not preuves or not isinstance(preuves, list): + return warnings + + for p in preuves: + if not isinstance(p, dict): + continue + ref = p.get("ref", "") + if not ref: + continue + if ref not in tag_map: + valeur = p.get("valeur", "?") + warnings.append(f"Preuve [{ref}] non traçable (« {valeur} »)") + logger.warning("Grounding : preuve [%s] introuvable dans les tags du dossier", ref) + + return warnings + + +def _validate_references(parsed: dict, sources: list[dict]) -> list[str]: + """Vérifie que les références citées correspondent aux sources RAG fournies. + + Returns: + Liste d'avertissements pour les références non vérifiables. + """ + warnings = [] + refs = parsed.get("references") + if not refs or not isinstance(refs, list): + return warnings + + # Construire un set des documents sources disponibles + source_docs = set() + for src in sources: + doc_name = src.get("document", "") + source_docs.add(doc_name) + # Ajouter les noms lisibles aussi + readable = { + "cim10": "CIM-10 FR 2026", + "cim10_alpha": "CIM-10 Index Alphabétique 2026", + "guide_methodo": "Guide Méthodologique MCO 2026", + "ccam": "CCAM PMSI V4 2025", + }.get(doc_name, "") + if readable: + source_docs.add(readable) + source_docs.add(readable.lower()) + + if not source_docs: + return warnings + + for ref in refs: + if not isinstance(ref, dict): + continue + doc = ref.get("document", "") + if doc and not any(sd in doc.lower() or doc.lower() in sd.lower() for sd in source_docs if sd): + warnings.append(f"Référence non vérifiable : {doc}") + logger.warning("CPAM : référence non vérifiable « %s »", doc) + + return warnings + + +# Regex pour capturer les codes CIM-10 (ex: K81.0, E87, Z45.80) +_CIM10_CODE_RE = re.compile(r"\b([A-Z]\d{2}\.?\d{0,2})\b") + + +def _validate_codes_in_response( + parsed: dict, + dossier: DossierMedical, + controle: ControleCPAM, +) -> list[str]: + """Vérifie que les codes CIM-10 cités dans la réponse sont dans le périmètre du dossier. + + Construit une whitelist à partir du dossier (DP, DAS) et de l'UCR (dp_ucr, da_ucr, dr_ucr), + puis extrait tous les codes CIM-10 des champs textuels de la réponse LLM. + La comparaison se fait par préfixe 3 caractères (ex: K81 matche K81.0 et K81.09). + + Returns: + Liste de warnings pour les codes hors périmètre. + """ + # 1. Construire la whitelist (préfixes 3 chars) + whitelist_prefixes: set[str] = set() + + def _add_code(raw: str) -> None: + raw = raw.strip() + if not raw: + return + norm = normalize_code(raw) + if norm and len(norm) >= 3: + whitelist_prefixes.add(norm[:3]) + + # Codes du dossier + if dossier.diagnostic_principal and dossier.diagnostic_principal.cim10_suggestion: + _add_code(dossier.diagnostic_principal.cim10_suggestion) + for das in dossier.diagnostics_associes: + if das.cim10_suggestion: + _add_code(das.cim10_suggestion) + + # Codes de l'UCR + for field in (controle.dp_ucr, controle.da_ucr, controle.dr_ucr): + if not field: + continue + for raw in re.split(r"[,;\s]+", field.strip()): + _add_code(raw) + + if not whitelist_prefixes: + return [] + + # 2. Extraire les codes CIM-10 de la réponse LLM (hors citations RAG) + text_fields = [] + for key in ( + "analyse_contestation", + "contre_arguments_medicaux", + "contre_arguments_asymetrie", + "contre_arguments_reglementaires", + "conclusion", + ): + val = parsed.get(key) + if val and isinstance(val, str): + text_fields.append(val) + + # Preuves du dossier — valeurs + preuves = parsed.get("preuves_dossier") + if preuves and isinstance(preuves, list): + for p in preuves: + if isinstance(p, dict): + v = p.get("valeur", "") + if v and isinstance(v, str): + text_fields.append(v) + + combined_text = "\n".join(text_fields) + found_codes = _CIM10_CODE_RE.findall(combined_text) + + if not found_codes: + return [] + + # 3. Comparer par préfixe 3 chars + warnings: list[str] = [] + seen_warned: set[str] = set() + + for raw_code in found_codes: + norm = normalize_code(raw_code) + if not norm or len(norm) < 3: + continue + prefix = norm[:3] + if prefix in whitelist_prefixes: + continue + if norm in seen_warned: + continue + seen_warned.add(norm) + is_valid, label = validate_code(norm) + label_str = f" ({label})" if is_valid and label else "" + warnings.append(f"Code {norm}{label_str} hors périmètre dossier/UCR") + logger.warning("CPAM : code %s%s absent du dossier et de l'UCR", norm, label_str) + + return warnings + + +def _validate_adversarial( + response_data: dict, + tag_map: dict[str, str], + controle: ControleCPAM, +) -> dict | None: + """Validation adversariale — vérifie la cohérence de la contre-argumentation. + + Un appel LLM de relecture critique vérifie : + 1. Les valeurs cliniques citées correspondent aux éléments tagués du dossier + 2. La conclusion est cohérente avec l'argumentation + 3. Les points d'accord ne contredisent pas la contre-argumentation + 4. Les codes CIM-10 cités sont cohérents + + Returns: + dict {"coherent": bool, "erreurs": list[str], "score_confiance": int} ou None si échec. + """ + import json as _json + + # Construire le résumé des éléments factuels disponibles + if tag_map: + factual_lines = "\n".join(f" [{tag}] {content}" for tag, content in tag_map.items()) + factual_section = f"ÉLÉMENTS FACTUELS DU DOSSIER :\n{factual_lines}" + else: + factual_section = "ÉLÉMENTS FACTUELS DU DOSSIER : aucun élément tagué disponible" + + # Sérialiser la réponse LLM de façon compacte + try: + response_json = _json.dumps(response_data, ensure_ascii=False, indent=None) + # Tronquer si trop long pour le prompt de validation + if len(response_json) > 3000: + response_json = response_json[:3000] + "..." + except (TypeError, ValueError): + logger.warning("Validation adversariale : impossible de sérialiser la réponse") + return None + + # Normes biologiques pour vérifier les interprétations + normes_lines = [] + for test, (lo, hi) in BIO_NORMALS.items(): + normes_lines.append(f" {test}: {lo}-{hi}") + normes_section = "NORMES BIOLOGIQUES DE RÉFÉRENCE :\n" + "\n".join(normes_lines) + + dp_ucr_line = f"DP UCR : {controle.dp_ucr}" if controle.dp_ucr else "" + da_ucr_line = f"DA UCR : {controle.da_ucr}" if controle.da_ucr else "" + + prompt = CPAM_ADVERSARIAL.format( + response_json=response_json, + factual_section=factual_section, + normes_section=normes_section, + dp_ucr_line=dp_ucr_line, + da_ucr_line=da_ucr_line, + ) + + logger.debug(" Validation adversariale") + result = call_ollama(prompt, temperature=0.0, max_tokens=800, role="validation") + if result is None: + result = call_anthropic(prompt, temperature=0.0, max_tokens=800) + if result is None: + logger.warning(" Validation adversariale échouée — LLM indisponible") + return None + + coherent = result.get("coherent", True) + erreurs = result.get("erreurs", []) + score = result.get("score_confiance", -1) + + if not coherent and erreurs: + logger.warning(" Validation adversariale : %d incohérence(s) détectée(s) (score %s/10)", + len(erreurs), score) + for e in erreurs: + logger.warning(" - %s", e) + else: + logger.info(" Validation adversariale OK (score %s/10)", score) + + return result + + +def _build_correction_prompt( + original_prompt: str, + original_response: dict, + adversarial_result: dict, +) -> str: + """Construit un prompt de correction en injectant les erreurs détectées. + + Args: + original_prompt: Le prompt d'argumentation initial. + original_response: La réponse LLM originale (dict). + adversarial_result: Le résultat de la validation adversariale. + + Returns: + Prompt de correction prêt à envoyer au LLM. + """ + import json as _json + + erreurs = adversarial_result.get("erreurs", []) + erreurs_text = "\n".join(f" {i}. {e}" for i, e in enumerate(erreurs, 1)) + + # Résumé compact de la réponse problématique + summary_fields = {} + for key in ("analyse_contestation", "contre_arguments_medicaux", + "contre_arguments_asymetrie", "contre_arguments_reglementaires", + "conclusion"): + val = original_response.get(key) + if val and isinstance(val, str): + # Tronquer chaque champ à 400 chars + summary_fields[key] = val[:400] + ("..." if len(val) > 400 else "") + + try: + response_summary = _json.dumps(summary_fields, ensure_ascii=False, indent=2) + except (TypeError, ValueError): + response_summary = str(summary_fields) + + correction_block = ( + "\n\n=== CORRECTION REQUISE — ERREURS DÉTECTÉES DANS TA RÉPONSE PRÉCÉDENTE ===\n" + f"{erreurs_text}\n\n" + f"RÉPONSE PRÉCÉDENTE (À CORRIGER) :\n{response_summary}\n\n" + "Corrige UNIQUEMENT les erreurs ci-dessus. Conserve les parties correctes.\n" + "Réponds avec le même format JSON." + ) + + return original_prompt + correction_block + + +def _format_response(parsed: dict, ref_warnings: list[str] | None = None) -> str: + """Formate la réponse LLM en texte lisible.""" + sections = [] + + analyse = parsed.get("analyse_contestation") + if analyse: + sections.append(f"ANALYSE DE LA CONTESTATION\n{analyse}") + + accord = parsed.get("points_accord") + if accord and accord.lower() not in ("aucun", "non applicable", "n/a", ""): + sections.append(f"POINTS D'ACCORD\n{accord}") + + # Nouveaux champs structurés par axe + contre_med = parsed.get("contre_arguments_medicaux") + if contre_med: + sections.append(f"CONTRE-ARGUMENTS MÉDICAUX\n{contre_med}") + + # Preuves du dossier (nouveau champ structuré) + preuves = parsed.get("preuves_dossier") + if preuves and isinstance(preuves, list): + preuves_lines = [] + for p in preuves: + if isinstance(p, dict): + ref = p.get("ref", "") + elem = p.get("element", "") + valeur = p.get("valeur", "") + signif = p.get("signification", "") + ref_prefix = f"[{ref}] " if ref else "" + preuves_lines.append(f"- {ref_prefix}[{elem}] {valeur} → {signif}") + if preuves_lines: + sections.append(f"PREUVES DU DOSSIER\n" + "\n".join(preuves_lines)) + + contre_asym = parsed.get("contre_arguments_asymetrie") + if contre_asym: + sections.append(f"ASYMÉTRIE D'INFORMATION\n{contre_asym}") + + contre_regl = parsed.get("contre_arguments_reglementaires") + if contre_regl: + sections.append(f"CONTRE-ARGUMENTS RÉGLEMENTAIRES\n{contre_regl}") + + # Fallback : ancien champ unique (réponses en cache existantes) + if not contre_med and not contre_asym and not contre_regl: + contre = parsed.get("contre_arguments") + if contre: + sections.append(f"CONTRE-ARGUMENTS\n{contre}") + + # Références structurées (nouveau format liste) ou ancien format string + refs = parsed.get("references") + if refs: + if isinstance(refs, list): + ref_lines = [] + for r in refs: + if isinstance(r, dict): + doc = r.get("document", "") + page = r.get("page", "") + citation = r.get("citation", "") + ref_lines.append(f"- [{doc}, p.{page}] {citation}") + else: + ref_lines.append(f"- {r}") + if ref_lines: + sections.append(f"REFERENCES\n" + "\n".join(ref_lines)) + else: + sections.append(f"REFERENCES\n{refs}") + + conclusion = parsed.get("conclusion") + if conclusion: + sections.append(f"CONCLUSION\n{conclusion}") + + # Avertissements sur les références non vérifiables + if ref_warnings: + warning_text = "\n".join(f"- {w}" for w in ref_warnings) + sections.append(f"AVERTISSEMENT — REFERENCES NON VÉRIFIÉES\n{warning_text}") + + return "\n\n".join(sections) diff --git a/tests/test_cpam_response.py b/tests/test_cpam_response.py index 049c02f..f39901d 100644 --- a/tests/test_cpam_response.py +++ b/tests/test_cpam_response.py @@ -16,6 +16,8 @@ from src.config import ( Traitement, ) from src.control.cpam_response import ( + _build_bio_summary, + _build_correction_prompt, _build_cpam_prompt, _build_tagged_context, _check_das_bio_coherence, @@ -25,6 +27,7 @@ from src.control.cpam_response import ( _get_code_label, _search_rag_for_control, _validate_adversarial, + _validate_codes_in_response, _validate_grounding, _validate_references, generate_cpam_response, @@ -206,8 +209,8 @@ class TestBuildPrompt: assert "preuves_dossier" in prompt - @patch("src.control.cpam_response.validate_code", return_value=(True, "Iléus paralytique et obstruction intestinale")) - @patch("src.control.cpam_response.normalize_code", return_value="K56.0") + @patch("src.control.cpam_context.validate_code", return_value=(True, "Iléus paralytique et obstruction intestinale")) + @patch("src.control.cpam_context.normalize_code", return_value="K56.0") def test_prompt_codes_with_cim10_labels(self, mock_norm, mock_valid): """Les codes contestés affichent le libellé CIM-10.""" dossier = _make_dossier() @@ -217,8 +220,8 @@ class TestBuildPrompt: assert "Iléus paralytique" in prompt assert "DA proposés par UCR" in prompt - @patch("src.control.cpam_response.validate_code", return_value=(False, "")) - @patch("src.control.cpam_response.normalize_code", return_value="Z99.9") + @patch("src.control.cpam_context.validate_code", return_value=(False, "")) + @patch("src.control.cpam_context.normalize_code", return_value="Z99.9") def test_prompt_codes_invalid_graceful(self, mock_norm, mock_valid): """Les codes invalides ne crashent pas, juste pas de libellé.""" dossier = _make_dossier() @@ -231,8 +234,8 @@ class TestBuildPrompt: assert "Z99.9" in prompt # Pas de crash - @patch("src.control.cpam_response.validate_code", return_value=(True, "Ajustement et entretien d'un dispositif implantable")) - @patch("src.control.cpam_response.normalize_code", return_value="Z45.8") + @patch("src.control.cpam_context.validate_code", return_value=(True, "Ajustement et entretien d'un dispositif implantable")) + @patch("src.control.cpam_context.normalize_code", return_value="Z45.8") def test_prompt_dp_fallback_from_ucr(self, mock_norm, mock_valid): """DP absent + dp_ucr → contexte injecté dans le prompt.""" dossier = DossierMedical( @@ -397,10 +400,11 @@ class TestValidateReferences: class TestGenerateResponse: + @patch("src.control.cpam_validation.call_ollama") @patch("src.control.cpam_response.call_ollama") @patch("src.control.cpam_response.call_anthropic") @patch("src.control.cpam_response._search_rag_for_control") - def test_generate_success_ollama_cpam(self, mock_rag, mock_anthropic, mock_ollama): + def test_generate_success_ollama_cpam(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama): """Ollama disponible → 3 passes (extraction + argumentation + validation).""" mock_rag.return_value = [ {"document": "guide_methodo", "page": 64, "extrait": "Texte guide"}, @@ -422,6 +426,7 @@ class TestGenerateResponse: return {"coherent": True, "erreurs": [], "score_confiance": 9} mock_ollama.side_effect = ollama_side_effect + mock_val_ollama.side_effect = ollama_side_effect dossier = _make_dossier() controle = _make_controle() @@ -434,18 +439,21 @@ class TestGenerateResponse: assert len(sources) == 1 assert sources[0].document == "guide_methodo" # 3 appels Ollama : extraction + argumentation + validation - assert mock_ollama.call_count == 3 + assert call_count["n"] == 3 mock_anthropic.assert_not_called() + @patch("src.control.cpam_validation.call_anthropic") + @patch("src.control.cpam_validation.call_ollama") @patch("src.control.cpam_response.call_ollama") @patch("src.control.cpam_response.call_anthropic") @patch("src.control.cpam_response._search_rag_for_control") - def test_generate_fallback_haiku(self, mock_rag, mock_anthropic, mock_ollama): + def test_generate_fallback_haiku(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_val_anthropic): """Ollama indisponible → fallback Haiku pour les 3 passes.""" mock_rag.return_value = [ {"document": "guide_methodo", "page": 64, "extrait": "Texte guide"}, ] mock_ollama.return_value = None + mock_val_ollama.return_value = None call_count = {"n": 0} def anthropic_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs): @@ -462,6 +470,7 @@ class TestGenerateResponse: return {"coherent": True, "erreurs": [], "score_confiance": 8} mock_anthropic.side_effect = anthropic_side_effect + mock_val_anthropic.side_effect = anthropic_side_effect dossier = _make_dossier() controle = _make_controle() @@ -470,15 +479,15 @@ class TestGenerateResponse: assert "Contre-args Haiku..." in text assert response_data is not None - # Ollama appelé 3 fois mais retourne None - assert mock_ollama.call_count == 3 - # Anthropic appelé 3 fois en fallback - assert mock_anthropic.call_count == 3 + # 3 appels Ollama (retourne None) + 3 Anthropic en fallback + assert call_count["n"] == 3 + @patch("src.control.cpam_validation.call_anthropic", return_value=None) + @patch("src.control.cpam_validation.call_ollama", return_value=None) @patch("src.control.cpam_response.call_ollama") @patch("src.control.cpam_response.call_anthropic") @patch("src.control.cpam_response._search_rag_for_control") - def test_generate_all_unavailable(self, mock_rag, mock_anthropic, mock_ollama): + def test_generate_all_unavailable(self, mock_rag, mock_anthropic, mock_ollama, _mock_val_ollama, _mock_val_anthropic): """Tous LLMs indisponibles → texte vide, response_data None.""" mock_rag.return_value = [] mock_anthropic.return_value = None @@ -657,8 +666,8 @@ class TestSearchRagForControl: arg_call_query = mock_search.call_args_list[0][0][0] assert len(arg_call_query) > 200 - @patch("src.control.cpam_response.validate_code", return_value=(True, "Iléus paralytique")) - @patch("src.control.cpam_response.normalize_code", return_value="K56.0") + @patch("src.control.cpam_rag.validate_code", return_value=(True, "Iléus paralytique")) + @patch("src.control.cpam_rag.normalize_code", return_value="K56.0") @patch("src.medical.rag_search.search_similar_cpam") def test_query_cim10_definitions(self, mock_search, mock_norm, mock_valid): """Requête 4 exécutée quand codes contestés présents.""" @@ -722,8 +731,8 @@ class TestSearchRagForControl: class TestGetCim10Definitions: """Tests pour l'injection déterministe des définitions CIM-10.""" - @patch("src.control.cpam_response.validate_code") - @patch("src.control.cpam_response.normalize_code", side_effect=lambda c: c.upper()) + @patch("src.control.cpam_context.validate_code") + @patch("src.control.cpam_context.normalize_code", side_effect=lambda c: c.upper()) def test_definitions_injected_in_prompt(self, mock_norm, mock_valid): """La section DÉFINITIONS CIM-10 apparaît dans le prompt avec les libellés.""" mock_valid.side_effect = lambda c: { @@ -742,8 +751,8 @@ class TestGetCim10Definitions: assert "Iléus paralytique" in prompt assert "DP établissement" in prompt - @patch("src.control.cpam_response.validate_code") - @patch("src.control.cpam_response.normalize_code", side_effect=lambda c: c.upper()) + @patch("src.control.cpam_context.validate_code") + @patch("src.control.cpam_context.normalize_code", side_effect=lambda c: c.upper()) def test_definitions_include_dp_and_ucr_codes(self, mock_norm, mock_valid): """Les codes du dossier ET de l'UCR sont tous inclus.""" mock_valid.side_effect = lambda c: { @@ -769,8 +778,8 @@ class TestGetCim10Definitions: assert "DP proposé UCR" in result assert "DA proposé UCR" in result or "DAS établissement" in result - @patch("src.control.cpam_response.validate_code", return_value=(False, "")) - @patch("src.control.cpam_response.normalize_code", side_effect=lambda c: c.upper()) + @patch("src.control.cpam_context.validate_code", return_value=(False, "")) + @patch("src.control.cpam_context.normalize_code", side_effect=lambda c: c.upper()) def test_definitions_graceful_when_code_unknown(self, mock_norm, mock_valid): """Un code inconnu ne crashe pas, affiche un message explicite.""" dossier = DossierMedical( @@ -1149,9 +1158,10 @@ class TestExtractionPass: assert "PRÉ-ANALYSE" not in prompt + @patch("src.control.cpam_validation.call_ollama") @patch("src.control.cpam_response.call_ollama") @patch("src.control.cpam_response._search_rag_for_control") - def test_generate_calls_three_passes(self, mock_rag, mock_ollama): + def test_generate_calls_three_passes(self, mock_rag, mock_ollama, mock_val_ollama): """L'orchestrateur appelle extraction + argumentation + validation.""" call_count = {"n": 0} @@ -1174,6 +1184,7 @@ class TestExtractionPass: return {"coherent": True, "erreurs": [], "score_confiance": 9} mock_ollama.side_effect = ollama_side_effect + mock_val_ollama.side_effect = ollama_side_effect mock_rag.return_value = [] dossier = _make_dossier() @@ -1181,7 +1192,7 @@ class TestExtractionPass: text, response_data, sources = generate_cpam_response(dossier, controle) # 3 appels Ollama : extraction + argumentation + validation - assert mock_ollama.call_count == 3 + assert call_count["n"] == 3 assert response_data is not None assert "Arguments..." in text @@ -1189,7 +1200,7 @@ class TestExtractionPass: class TestValidateAdversarial: """Tests pour la validation adversariale.""" - @patch("src.control.cpam_response.call_ollama") + @patch("src.control.cpam_validation.call_ollama") def test_coherent_response_no_warnings(self, mock_ollama): """Réponse cohérente → coherent=true, pas de warnings dans le texte.""" mock_ollama.return_value = {"coherent": True, "erreurs": [], "score_confiance": 9} @@ -1208,7 +1219,7 @@ class TestValidateAdversarial: assert result["coherent"] is True assert len(result["erreurs"]) == 0 - @patch("src.control.cpam_response.call_ollama") + @patch("src.control.cpam_validation.call_ollama") def test_hallucinated_bio_detected(self, mock_ollama): """Valeur bio halluccinée → coherent=false avec erreur.""" mock_ollama.return_value = { @@ -1231,8 +1242,8 @@ class TestValidateAdversarial: assert len(result["erreurs"]) == 1 assert "CRP" in result["erreurs"][0] - @patch("src.control.cpam_response.call_anthropic", return_value=None) - @patch("src.control.cpam_response.call_ollama", return_value=None) + @patch("src.control.cpam_validation.call_anthropic", return_value=None) + @patch("src.control.cpam_validation.call_ollama", return_value=None) def test_adversarial_failure_graceful(self, mock_ollama, mock_anthropic): """LLM indisponible → retourne None, pas de crash.""" tag_map = {"BIO-1": "CRP: 180 mg/L"} @@ -1243,9 +1254,10 @@ class TestValidateAdversarial: assert result is None + @patch("src.control.cpam_validation.call_ollama") @patch("src.control.cpam_response.call_ollama") @patch("src.control.cpam_response._search_rag_for_control") - def test_adversarial_warnings_in_output(self, mock_rag, mock_ollama): + def test_adversarial_warnings_in_output(self, mock_rag, mock_ollama, mock_val_ollama): """Incohérences détectées → avertissements dans le texte formaté.""" call_count = {"n": 0} @@ -1267,6 +1279,7 @@ class TestValidateAdversarial: } mock_ollama.side_effect = ollama_side_effect + mock_val_ollama.side_effect = ollama_side_effect mock_rag.return_value = [] dossier = _make_dossier() @@ -1278,7 +1291,7 @@ class TestValidateAdversarial: def test_adversarial_empty_tag_map(self): """Dossier sans tags → validation fonctionne quand même.""" - with patch("src.control.cpam_response.call_ollama") as mock_ollama: + with patch("src.control.cpam_validation.call_ollama") as mock_ollama: mock_ollama.return_value = {"coherent": True, "erreurs": [], "score_confiance": 7} result = _validate_adversarial( @@ -1287,3 +1300,380 @@ class TestValidateAdversarial: assert result is not None assert result["coherent"] is True + + +class TestValidateCodesInResponse: + """Tests pour la validation codes fermée (périmètre dossier + UCR).""" + + def test_code_in_dossier_no_warning(self): + """Code du dossier cité → pas de warning.""" + parsed = {"conclusion": "Le code K81.0 est justifié par la cholécystite."} + dossier = _make_dossier() # DP K81.0, DAS K56.0 + controle = _make_controle() + warnings = _validate_codes_in_response(parsed, dossier, controle) + assert len(warnings) == 0 + + def test_code_from_ucr_no_warning(self): + """Code proposé par l'UCR cité → pas de warning.""" + parsed = {"conclusion": "Le code K56.0 contesté par l'UCR est bien justifié."} + dossier = _make_dossier() + controle = _make_controle() # da_ucr="K56.0" + warnings = _validate_codes_in_response(parsed, dossier, controle) + assert len(warnings) == 0 + + def test_invented_code_detected(self): + """Code absent du dossier et de l'UCR → warning.""" + parsed = {"conclusion": "Le code Z45.8 confirme la nécessité du séjour."} + dossier = _make_dossier() # DP K81.0, DAS K56.0 + controle = _make_controle() # da_ucr=K56.0 + warnings = _validate_codes_in_response(parsed, dossier, controle) + assert len(warnings) >= 1 + assert any("Z45" in w for w in warnings) + + def test_subcode_tolerated(self): + """K81.09 toléré quand K81.0 est dans la whitelist (même préfixe 3 chars).""" + parsed = {"contre_arguments_medicaux": "Le sous-code K81.09 est une précision de K81.0."} + dossier = _make_dossier() # DP K81.0 + controle = _make_controle() + warnings = _validate_codes_in_response(parsed, dossier, controle) + # K81.09 partage le préfixe K81 avec K81.0 → toléré + assert len(warnings) == 0 + + def test_codes_in_citations_excluded(self): + """Codes dans references[].citation → pas de validation.""" + parsed = { + "conclusion": "Le codage est justifié.", + "references": [ + {"document": "CIM-10", "citation": "Z45.8 — Ajustement d'un dispositif"}, + ], + } + dossier = _make_dossier() + controle = _make_controle() + warnings = _validate_codes_in_response(parsed, dossier, controle) + # Z45.8 est dans references, pas dans les champs textuels → pas flaggé + assert len(warnings) == 0 + + def test_no_codes_in_response_no_warning(self): + """Réponse sans codes CIM-10 → 0 warnings.""" + parsed = {"conclusion": "Le séjour est justifié par la gravité clinique."} + dossier = _make_dossier() + controle = _make_controle() + warnings = _validate_codes_in_response(parsed, dossier, controle) + assert len(warnings) == 0 + + def test_multiple_invented_codes(self): + """Plusieurs codes hors périmètre → autant de warnings.""" + parsed = { + "contre_arguments_medicaux": "Les codes Z45.8 et E11.9 confirment le diagnostic.", + } + dossier = _make_dossier() # K81.0, K56.0 + controle = _make_controle() + warnings = _validate_codes_in_response(parsed, dossier, controle) + assert len(warnings) >= 2 + + def test_no_whitelist_no_validation(self): + """Aucun code dans le dossier ni l'UCR → pas de validation (0 warnings).""" + parsed = {"conclusion": "Le code Z45.8 est justifié."} + dossier = DossierMedical(source_file="test.pdf", diagnostic_principal=None) + controle = ControleCPAM( + numero_ogc=1, titre="Test", arg_ucr="Test", + decision_ucr="Rejet", dp_ucr=None, da_ucr=None, + ) + warnings = _validate_codes_in_response(parsed, dossier, controle) + assert len(warnings) == 0 + + +class TestBuildBioSummary: + """Tests pour le résumé biologique déterministe.""" + + def test_bio_summary_interpretation(self): + """CRP élevée, Hb basse → résumé correct avec interprétations cliniques.""" + dossier = DossierMedical( + source_file="test.pdf", + biologie_cle=[ + BiologieCle(test="CRP", valeur="180 mg/L", anomalie=True), + BiologieCle(test="Hémoglobine", valeur="8.5 g/dL", anomalie=True), + ], + ) + summary = _build_bio_summary(dossier) + + assert "CRP" in summary + assert "ÉLEVÉ" in summary + assert "infection/inflammation active" in summary + assert "Hémoglobine" in summary + assert "BAS" in summary + assert "anémie" in summary + + def test_bio_summary_normal_values(self): + """Valeurs normales → interprétation 'normal' affichée.""" + dossier = DossierMedical( + source_file="test.pdf", + biologie_cle=[ + BiologieCle(test="Plaquettes", valeur="250 G/L", anomalie=False), + ], + ) + summary = _build_bio_summary(dossier) + + assert "NORMAL" in summary + assert "numération normale" in summary + + def test_bio_summary_in_prompt(self): + """Le résumé bio apparaît dans le prompt CPAM.""" + dossier = _make_dossier_complet() # CRP 180, Créatinine 450 + controle = _make_controle() + prompt, _ = _build_cpam_prompt(dossier, controle, []) + + assert "FAITS BIOLOGIQUES VÉRIFIÉS" in prompt + assert "NE PAS MODIFIER" in prompt + assert "RÈGLE STRICTE" in prompt + + def test_bio_summary_empty_no_bio(self): + """Pas de biologie → résumé vide.""" + dossier = DossierMedical(source_file="test.pdf") + summary = _build_bio_summary(dossier) + assert summary == "" + + def test_bio_summary_unknown_test(self): + """Test bio non reconnu (hors BIO_NORMALS) → omis du résumé.""" + dossier = DossierMedical( + source_file="test.pdf", + biologie_cle=[ + BiologieCle(test="Ferritine", valeur="15 µg/L", anomalie=True), + ], + ) + summary = _build_bio_summary(dossier) + assert summary == "" + + def test_bio_summary_unparseable_value(self): + """Valeur bio non parseable → omise sans crash.""" + dossier = DossierMedical( + source_file="test.pdf", + biologie_cle=[ + BiologieCle(test="CRP", valeur="positif", anomalie=True), + BiologieCle(test="Hémoglobine", valeur="8.5 g/dL", anomalie=True), + ], + ) + summary = _build_bio_summary(dossier) + # CRP "positif" non parseable → omis, mais Hb présente + assert "Hémoglobine" in summary + assert "CRP" not in summary + + +class TestCorrectionLoop: + """Tests pour la boucle de correction adversariale.""" + + @patch("src.control.cpam_response.rule_enabled", return_value=True) + @patch("src.control.cpam_validation.call_ollama") + @patch("src.control.cpam_response.call_ollama") + @patch("src.control.cpam_response.call_anthropic") + @patch("src.control.cpam_response._search_rag_for_control") + def test_correction_triggered_when_score_low(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule): + """Score adversarial ≤ 5 → correction relancée (5 appels LLM total).""" + mock_rag.return_value = [] + call_count = {"n": 0} + + def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 1: + # Passe 1 extraction + return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}} + elif call_count["n"] == 2: + # Passe 2 argumentation + return { + "analyse_contestation": "Analyse...", + "contre_arguments_medicaux": "Arguments erronés...", + "conclusion": "Conclusion avec erreurs...", + } + elif call_count["n"] == 3: + # Passe 3 validation adversariale → score bas + return {"coherent": False, "erreurs": ["CRP citée à 250 mais vaut 180"], "score_confiance": 3} + elif call_count["n"] == 4: + # Passe 4 correction + return { + "analyse_contestation": "Analyse corrigée...", + "contre_arguments_medicaux": "Arguments corrigés...", + "conclusion": "Conclusion corrigée...", + } + else: + # Passe 5 re-validation + return {"coherent": True, "erreurs": [], "score_confiance": 8} + + mock_ollama.side_effect = ollama_side_effect + mock_val_ollama.side_effect = ollama_side_effect + + dossier = _make_dossier() + controle = _make_controle() + text, response_data, sources = generate_cpam_response(dossier, controle) + + # 5 appels Ollama : extraction + argumentation + validation + correction + re-validation + assert call_count["n"] == 5 + # La correction a été acceptée (score 8 > 3) + assert "corrigé" in text.lower() + + @patch("src.control.cpam_response.rule_enabled", return_value=True) + @patch("src.control.cpam_validation.call_ollama") + @patch("src.control.cpam_response.call_ollama") + @patch("src.control.cpam_response.call_anthropic") + @patch("src.control.cpam_response._search_rag_for_control") + def test_no_correction_when_score_high(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule): + """Score adversarial > 5 → pas de correction (3 appels LLM).""" + mock_rag.return_value = [] + call_count = {"n": 0} + + def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 1: + return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}} + elif call_count["n"] == 2: + return { + "analyse_contestation": "Analyse...", + "contre_arguments_medicaux": "Arguments...", + "conclusion": "Conclusion...", + } + else: + return {"coherent": True, "erreurs": [], "score_confiance": 8} + + mock_ollama.side_effect = ollama_side_effect + mock_val_ollama.side_effect = ollama_side_effect + + dossier = _make_dossier() + controle = _make_controle() + text, response_data, sources = generate_cpam_response(dossier, controle) + + # Seulement 3 appels : extraction + argumentation + validation + assert call_count["n"] == 3 + + @patch("src.control.cpam_response.rule_enabled", return_value=True) + @patch("src.control.cpam_validation.call_ollama") + @patch("src.control.cpam_response.call_ollama") + @patch("src.control.cpam_response.call_anthropic") + @patch("src.control.cpam_response._search_rag_for_control") + def test_correction_accepted_when_score_improves(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule): + """Score passe de 3 à 7 → correction acceptée.""" + mock_rag.return_value = [] + call_count = {"n": 0} + + def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 1: + return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}} + elif call_count["n"] == 2: + return { + "analyse_contestation": "Analyse originale...", + "contre_arguments_medicaux": "Arguments originaux...", + "conclusion": "Conclusion originale...", + } + elif call_count["n"] == 3: + return {"coherent": False, "erreurs": ["Erreur bio"], "score_confiance": 3} + elif call_count["n"] == 4: + return { + "analyse_contestation": "Analyse améliorée...", + "contre_arguments_medicaux": "Arguments améliorés...", + "conclusion": "Conclusion améliorée...", + } + else: + return {"coherent": True, "erreurs": [], "score_confiance": 7} + + mock_ollama.side_effect = ollama_side_effect + mock_val_ollama.side_effect = ollama_side_effect + + dossier = _make_dossier() + controle = _make_controle() + text, response_data, sources = generate_cpam_response(dossier, controle) + + # Le résultat final est la correction + assert response_data["conclusion"] == "Conclusion améliorée..." + + @patch("src.control.cpam_response.rule_enabled", return_value=True) + @patch("src.control.cpam_validation.call_ollama") + @patch("src.control.cpam_response.call_ollama") + @patch("src.control.cpam_response.call_anthropic") + @patch("src.control.cpam_response._search_rag_for_control") + def test_correction_rejected_when_score_same(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule): + """Score ne s'améliore pas → original conservé.""" + mock_rag.return_value = [] + call_count = {"n": 0} + + def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 1: + return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}} + elif call_count["n"] == 2: + return { + "analyse_contestation": "Analyse originale...", + "contre_arguments_medicaux": "Arguments originaux...", + "conclusion": "Conclusion originale...", + } + elif call_count["n"] == 3: + return {"coherent": False, "erreurs": ["Erreur bio"], "score_confiance": 4} + elif call_count["n"] == 4: + return { + "analyse_contestation": "Correction pire...", + "contre_arguments_medicaux": "Arguments pires...", + "conclusion": "Conclusion pire...", + } + else: + return {"coherent": False, "erreurs": ["Encore des erreurs"], "score_confiance": 3} + + mock_ollama.side_effect = ollama_side_effect + mock_val_ollama.side_effect = ollama_side_effect + + dossier = _make_dossier() + controle = _make_controle() + text, response_data, sources = generate_cpam_response(dossier, controle) + + # Score correction (3) <= score original (4) → original conservé + assert response_data["conclusion"] == "Conclusion originale..." + + @patch("src.control.cpam_response.rule_enabled", return_value=False) + @patch("src.control.cpam_validation.call_ollama") + @patch("src.control.cpam_response.call_ollama") + @patch("src.control.cpam_response.call_anthropic") + @patch("src.control.cpam_response._search_rag_for_control") + def test_correction_disabled_by_rule(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule): + """RULE-CPAM-CORRECTION-LOOP désactivée → pas de retry.""" + mock_rag.return_value = [] + call_count = {"n": 0} + + def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 1: + return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}} + elif call_count["n"] == 2: + return { + "analyse_contestation": "Analyse...", + "contre_arguments_medicaux": "Arguments...", + "conclusion": "Conclusion...", + } + else: + return {"coherent": False, "erreurs": ["Erreur bio"], "score_confiance": 2} + + mock_ollama.side_effect = ollama_side_effect + mock_val_ollama.side_effect = ollama_side_effect + + dossier = _make_dossier() + controle = _make_controle() + text, response_data, sources = generate_cpam_response(dossier, controle) + + # Seulement 3 appels, pas de correction (règle désactivée) + assert call_count["n"] == 3 + + def test_build_correction_prompt_format(self): + """Le prompt de correction contient les erreurs et la réponse originale.""" + original_prompt = "Prompt d'argumentation original..." + original_response = { + "analyse_contestation": "Analyse avec erreur CRP 250", + "conclusion": "Conclusion erronée", + } + adversarial_result = { + "coherent": False, + "erreurs": ["CRP citée à 250 mg/L mais le dossier indique 180 mg/L"], + "score_confiance": 3, + } + + correction = _build_correction_prompt(original_prompt, original_response, adversarial_result) + + assert "CORRECTION REQUISE" in correction + assert "CRP citée à 250" in correction + assert "Prompt d'argumentation original" in correction + assert "Corrige UNIQUEMENT" in correction