"""Génération de contre-argumentation pour les contrôles CPAM via RAG + Ollama.""" from __future__ import annotations import logging import re from ..config import ControleCPAM, DossierMedical, RAGSource from ..medical.cim10_dict import normalize_code, validate_code from ..medical.cim10_extractor import BIO_NORMALS from ..medical.ollama_client import call_anthropic, call_ollama logger = logging.getLogger(__name__) def _search_rag_for_control(controle: ControleCPAM, dossier: DossierMedical) -> list[dict]: """Recherche RAG ciblée pour le sujet du désaccord. Effectue 2-5 recherches ciblées au lieu d'une requête fourre-tout : 1. Codes contestés → règles de codage spécifiques 2. Argument CPAM → passages Guide Méthodo contradictoires 3. Contexte clinique (optionnel) → définitions CIM-10 des codes en jeu 4. Définitions CIM-10 des codes contestés 5. Règles explicitement citées dans l'argument CPAM Retourne [] si le RAG est indisponible (index absent, modèle embedding inaccessible, etc.) — la contre-argumentation sera générée sans sources. """ try: from ..medical.rag_search import search_similar_cpam except Exception: logger.warning("Index RAG non disponible pour la contre-argumentation") return [] try: return _search_rag_queries(controle, dossier, search_similar_cpam) except Exception: logger.warning("Erreur RAG pour la contre-argumentation — génération sans sources", exc_info=True) return [] def _search_rag_queries( controle: ControleCPAM, dossier: DossierMedical, search_similar_cpam, ) -> list[dict]: """Exécute les requêtes RAG (séparé pour permettre un try/except global).""" all_results: list[dict] = [] # Requête 1 — Codes contestés (règles de codage) if controle.dp_ucr or controle.da_ucr: query_parts = [] if controle.dp_ucr: query_parts.append(f"règles codage {controle.dp_ucr} diagnostic principal") if controle.da_ucr: query_parts.append(f"diagnostic associé significatif {controle.da_ucr} CMA") query_codes = " ".join(query_parts) results_codes = search_similar_cpam(query_codes, top_k=6) logger.debug(" RAG requête codes : %d résultats", len(results_codes)) all_results.extend(results_codes) # Requête 2 — Argument CPAM (recherche dans le Guide Méthodo) query_parts_arg = [] if controle.titre: query_parts_arg.append(controle.titre) arg_short = controle.arg_ucr[:500] if controle.arg_ucr else "" if arg_short: query_parts_arg.append(arg_short) query_arg = " ".join(query_parts_arg) if query_arg.strip(): results_arg = search_similar_cpam(query_arg, top_k=6) logger.debug(" RAG requête argument : %d résultats", len(results_arg)) all_results.extend(results_arg) # Requête 3 — Contexte clinique (définitions CIM-10 des codes en jeu) if controle.da_ucr and dossier.diagnostic_principal: dp_text = dossier.diagnostic_principal.texte das_texts = [ d.texte for d in dossier.diagnostics_associes if d.cim10_suggestion and controle.da_ucr and d.cim10_suggestion in controle.da_ucr ] if das_texts: query_clinique = f"{dp_text} {' '.join(das_texts)}" results_clinique = search_similar_cpam(query_clinique, top_k=4) logger.debug(" RAG requête clinique : %d résultats", len(results_clinique)) all_results.extend(results_clinique) # Requête 4 — Définitions CIM-10 des codes contestés contested_codes = [] for field in (controle.dp_ucr, controle.da_ucr, controle.dr_ucr): if field: contested_codes.extend(re.split(r"[,;\s]+", field.strip())) for raw_code in contested_codes: raw_code = raw_code.strip() if not raw_code: continue norm = normalize_code(raw_code) is_valid, label = validate_code(norm) if is_valid and label: query_def = f"CIM-10 {norm} {label} définition inclusion exclusion" else: query_def = f"CIM-10 {norm} définition codage" results_def = search_similar_cpam(query_def, top_k=3) logger.debug(" RAG requête CIM-10 %s : %d résultats", norm, len(results_def)) all_results.extend(results_def) # Requête 5 — Règles explicitement citées dans l'argument CPAM if controle.arg_ucr: rule_patterns = [ r'(?:R[eè]gle\s*T?\s*\d+)', r'(?:Annexe[\s-]*\d+[A-Za-z]*)', r'(?:Situation de soins?\s+[^.]{5,40})', ] rules_found = [] for pattern in rule_patterns: rules_found.extend(re.findall(pattern, controle.arg_ucr, re.IGNORECASE)) if rules_found: rules_unique = list(dict.fromkeys(rules_found)) query_rules = " ".join(rules_unique) + " guide méthodologique codage PMSI" results_rules = search_similar_cpam(query_rules, top_k=4) logger.debug(" RAG requête règles (%s) : %d résultats", ", ".join(rules_unique), len(results_rules)) all_results.extend(results_rules) if not all_results: return [] # Fusion : dédupliquer par (document, code, page), garder le meilleur score seen: dict[tuple, dict] = {} for r in all_results: key = (r.get("document"), r.get("code"), r.get("page")) if key in seen: if r["score"] > seen[key]["score"]: seen[key] = r else: seen[key] = r merged = sorted(seen.values(), key=lambda r: r["score"], reverse=True) return merged[:12] def _get_code_label(code_str: str) -> str: """Résout le libellé CIM-10 pour un ou plusieurs codes.""" codes = re.split(r"[,;\s]+", code_str.strip()) labels = [] for raw in codes: raw = raw.strip() if not raw: continue norm = normalize_code(raw) is_valid, label = validate_code(norm) if is_valid and label: labels.append(f"{norm} — {label}") else: labels.append(norm) if not labels: return "" if len(labels) == 1: parts = labels[0].split(" — ", 1) return f" — {parts[1]}" if len(parts) > 1 else "" return "\n " + "\n ".join(labels) def _get_cim10_definitions( dossier: DossierMedical, controle: ControleCPAM, ) -> str: """Construit une section de définitions CIM-10 déterministes pour tous les codes en jeu. Collecte les codes depuis : - Le dossier : DP (cim10_suggestion) + DAS (cim10_suggestion) - L'UCR : dp_ucr, da_ucr, dr_ucr Returns: Texte formaté pour injection dans le prompt, ou "" si aucun code résolu. """ codes_seen: dict[str, str] = {} # code normalisé → rôle (pour affichage) # Codes du dossier (établissement) if dossier.diagnostic_principal and dossier.diagnostic_principal.cim10_suggestion: code = dossier.diagnostic_principal.cim10_suggestion codes_seen[normalize_code(code)] = "DP établissement" for das in dossier.diagnostics_associes: if das.cim10_suggestion: norm = normalize_code(das.cim10_suggestion) if norm not in codes_seen: codes_seen[norm] = "DAS établissement" # Codes de l'UCR (CPAM) for field, role in [ (controle.dp_ucr, "DP proposé UCR"), (controle.da_ucr, "DA proposé UCR"), (controle.dr_ucr, "DR proposé UCR"), ]: if not field: continue for raw in re.split(r"[,;\s]+", field.strip()): raw = raw.strip() if not raw: continue norm = normalize_code(raw) if norm not in codes_seen: codes_seen[norm] = role if not codes_seen: return "" # Résoudre les libellés lines = [] for norm_code, role in codes_seen.items(): is_valid, label = validate_code(norm_code) if is_valid and label: lines.append(f" {norm_code} — {label} [{role}]") else: lines.append(f" {norm_code} — (code non trouvé dans le dictionnaire) [{role}]") if not lines: return "" return ( "\nDÉFINITIONS CIM-10 — RÉFÉRENCE (source : dictionnaire officiel) :\n" + "\n".join(lines) ) def _build_tagged_context(dossier: DossierMedical) -> tuple[str, dict[str, str]]: """Construit un contexte clinique avec des tags de référence pour le grounding. Chaque élément clinique reçoit un tag unique ([BIO-1], [IMG-1], [TRT-1], [ACTE-1]) que le LLM doit citer dans ses preuves pour garantir la traçabilité. Returns: (texte tagué pour injection dans le prompt, dict tag → contenu original) """ tag_map: dict[str, str] = {} lines: list[str] = [] # Biologie (avec normes de référence pour éviter les hallucinations) for i, b in enumerate(dossier.biologie_cle, 1): if not b.valeur: continue tag = f"BIO-{i}" # Interpréter la valeur par rapport aux normes connues norm_info = "" if b.test in BIO_NORMALS: lo, hi = BIO_NORMALS[b.test] try: val = float(b.valeur.replace(",", ".").split()[0]) if val > hi: norm_info = f" — ÉLEVÉ (norme {lo}-{hi})" elif val < lo: norm_info = f" — BAS (norme {lo}-{hi})" else: norm_info = f" — NORMAL (norme {lo}-{hi})" except (ValueError, AttributeError): pass content = f"{b.test}: {b.valeur}{norm_info}" tag_map[tag] = content lines.append(f" [{tag}] {content}") # Imagerie for i, im in enumerate(dossier.imagerie, 1): tag = f"IMG-{i}" conclusion = f" — {im.conclusion}" if im.conclusion else "" content = f"{im.type}{conclusion}" tag_map[tag] = content lines.append(f" [{tag}] {content}") # Traitements for i, t in enumerate(dossier.traitements_sortie[:10], 1): tag = f"TRT-{i}" posologie = f" {t.posologie}" if t.posologie else "" content = f"{t.medicament}{posologie}" tag_map[tag] = content lines.append(f" [{tag}] {content}") # Actes CCAM for i, a in enumerate(dossier.actes_ccam, 1): tag = f"ACTE-{i}" code = f" ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else "" content = f"{a.texte}{code}" tag_map[tag] = content lines.append(f" [{tag}] {content}") if not lines: return "", tag_map text = "ÉLÉMENTS CLINIQUES RÉFÉRENCÉS (cite le tag [XX-N] dans tes preuves) :\n" + "\n".join(lines) return text, tag_map def _validate_grounding(response_data: dict, tag_map: dict[str, str]) -> list[str]: """Vérifie que les références dans preuves_dossier correspondent à des tags existants. Returns: Liste de warnings pour les références inventées. """ if not tag_map: return [] warnings: list[str] = [] preuves = response_data.get("preuves_dossier") if not preuves or not isinstance(preuves, list): return warnings for p in preuves: if not isinstance(p, dict): continue ref = p.get("ref", "") if not ref: continue if ref not in tag_map: valeur = p.get("valeur", "?") warnings.append(f"Preuve [{ref}] non traçable (« {valeur} »)") logger.warning("Grounding : preuve [%s] introuvable dans les tags du dossier", ref) return warnings def _check_das_bio_coherence(dossier: DossierMedical) -> list[str]: """Vérifie la cohérence entre les textes DAS et les valeurs biologiques. Détecte les contradictions comme "leucocytose" dans un DAS alors que les leucocytes sont bas, ou "anémie" alors que l'hémoglobine est normale. Returns: Liste de warnings pour les incohérences détectées. """ if not dossier.diagnostics_associes or not dossier.biologie_cle: return [] # Patterns DAS → (test bio attendu, direction attendue) _DAS_BIO_CHECKS: dict[str, tuple[str, str]] = { "leucocytose": ("Leucocytes", "high"), "leucopénie": ("Leucocytes", "low"), "leucopenie": ("Leucocytes", "low"), "thrombocytose": ("Plaquettes", "high"), "thrombocytopénie": ("Plaquettes", "low"), "thrombocytopenie": ("Plaquettes", "low"), "thrombopénie": ("Plaquettes", "low"), "thrombopenie": ("Plaquettes", "low"), "anémie": ("Hémoglobine", "low"), "anemie": ("Hémoglobine", "low"), "polyglobulie": ("Hémoglobine", "high"), "hyperkaliémie": ("Potassium", "high"), "hypokaliémie": ("Potassium", "low"), } # Indexer les valeurs bio disponibles bio_values: dict[str, float] = {} for b in dossier.biologie_cle: if b.test and b.valeur: try: bio_values[b.test] = float(b.valeur.replace(",", ".").split()[0]) except (ValueError, AttributeError): pass warnings: list[str] = [] for das in dossier.diagnostics_associes: texte_lower = (das.texte or "").lower() for pattern, (bio_test, direction) in _DAS_BIO_CHECKS.items(): if pattern not in texte_lower: continue if bio_test not in bio_values or bio_test not in BIO_NORMALS: continue val = bio_values[bio_test] lo, hi = BIO_NORMALS[bio_test] if direction == "high" and val <= hi: warnings.append( f"INCOHÉRENCE : DAS « {das.texte} » ({das.cim10_suggestion or '?'}) " f"mais {bio_test} = {val} est NORMAL (norme {lo}-{hi})" ) elif direction == "low" and val >= lo: warnings.append( f"INCOHÉRENCE : DAS « {das.texte} » ({das.cim10_suggestion or '?'}) " f"mais {bio_test} = {val} est NORMAL (norme {lo}-{hi})" ) if warnings: for w in warnings: logger.warning(" DAS/bio : %s", w) return warnings def _build_cpam_prompt( dossier: DossierMedical, controle: ControleCPAM, sources: list[dict], extraction: dict | None = None, ) -> tuple[str, dict[str, str]]: """Construit le prompt pour la contre-argumentation CPAM. Args: extraction: Résultat optionnel de la passe 1 (extraction structurée). Returns: (prompt texte, tag_map pour validation grounding) """ # Résumé du dossier médical dossier_lines = [] if dossier.diagnostic_principal: dp = dossier.diagnostic_principal dp_code = f" ({dp.cim10_suggestion})" if dp.cim10_suggestion else "" dossier_lines.append(f"- DP : {dp.texte}{dp_code}") elif controle.dp_ucr: dp_label = _get_code_label(controle.dp_ucr) dossier_lines.append( f"- DP : code {controle.dp_ucr}{dp_label} " f"(codé par l'établissement, contesté par la CPAM)" ) if dossier.diagnostics_associes: das_parts = [] for das in dossier.diagnostics_associes: code = f" ({das.cim10_suggestion})" if das.cim10_suggestion else "" das_parts.append(f"{das.texte}{code}") dossier_lines.append(f"- DAS : {', '.join(das_parts)}") if dossier.actes_ccam: actes = [f"{a.texte} ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else a.texte for a in dossier.actes_ccam] dossier_lines.append(f"- Actes CCAM : {', '.join(actes)}") sejour = dossier.sejour if sejour.duree_sejour is not None: dossier_lines.append(f"- Durée séjour : {sejour.duree_sejour} jours") if sejour.sexe or sejour.age is not None: patient_info = [] if sejour.sexe: patient_info.append(sejour.sexe) if sejour.age is not None: patient_info.append(f"{sejour.age} ans") if sejour.age < 18: patient_info.append("(PÉDIATRIE — codage pédiatrique applicable)") elif sejour.age >= 80: patient_info.append("(patient âgé — comorbidités fréquentes)") dossier_lines.append(f"- Patient : {', '.join(patient_info)}") if sejour.mode_entree: mode_label = sejour.mode_entree if "urgence" in mode_label.lower() or "urgent" in mode_label.lower(): dossier_lines.append(f"- Mode d'entrée : {mode_label} (ADMISSION EN URGENCE)") else: dossier_lines.append(f"- Mode d'entrée : {mode_label}") if sejour.mode_sortie: dossier_lines.append(f"- Mode de sortie : {sejour.mode_sortie}") if sejour.imc is not None: dossier_lines.append(f"- IMC : {sejour.imc}") if dossier.biologie_cle: bio = [f"{b.test}: {b.valeur}" for b in dossier.biologie_cle[:5] if b.valeur] if bio: dossier_lines.append(f"- Biologie clé : {', '.join(bio)}") if dossier.imagerie: img_parts = [] for im in dossier.imagerie: conclusion = f" — {im.conclusion}" if im.conclusion else "" img_parts.append(f"{im.type}{conclusion}") dossier_lines.append(f"- Imagerie : {', '.join(img_parts)}") if dossier.traitements_sortie: trt_parts = [] for t in dossier.traitements_sortie[:10]: posologie = f" {t.posologie}" if t.posologie else "" trt_parts.append(f"{t.medicament}{posologie}") dossier_lines.append(f"- Traitements de sortie : {', '.join(trt_parts)}") if dossier.antecedents: dossier_lines.append(f"- Antécédents : {', '.join(a.texte for a in dossier.antecedents[:10])}") if dossier.complications: dossier_lines.append(f"- Complications : {', '.join(c.texte for c in dossier.complications)}") dossier_str = "\n".join(dossier_lines) if dossier_lines else "Non disponible" # Section asymétrie : éléments que la CPAM n'avait pas asymetrie_lines = [] if dossier.biologie_cle: bio_details = [] for b in dossier.biologie_cle if len(dossier.biologie_cle) <= 10 else dossier.biologie_cle[:10]: anomalie = " (anormale)" if b.anomalie else "" if b.valeur: bio_details.append(f"{b.test}: {b.valeur}{anomalie}") if bio_details: asymetrie_lines.append(f"- Biologie : {', '.join(bio_details)}") if dossier.imagerie: img_details = [] for im in dossier.imagerie: conclusion = f" — {im.conclusion}" if im.conclusion else "" img_details.append(f"{im.type}{conclusion}") if img_details: asymetrie_lines.append(f"- Imagerie : {', '.join(img_details)}") if dossier.traitements_sortie: trt_details = [] for t in dossier.traitements_sortie[:10]: posologie = f" {t.posologie}" if t.posologie else "" trt_details.append(f"{t.medicament}{posologie}") if trt_details: asymetrie_lines.append(f"- Traitements : {', '.join(trt_details)}") if dossier.actes_ccam: actes_details = [ f"{a.texte} ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else a.texte for a in dossier.actes_ccam ] if actes_details: asymetrie_lines.append(f"- Actes CCAM : {', '.join(actes_details)}") asymetrie_str = "" if asymetrie_lines: asymetrie_str = ( "\n\nÉLÉMENTS DU DOSSIER NON TRANSMIS À LA CPAM " "(l'UCR n'a eu que le CRH et les codes) :\n" + "\n".join(asymetrie_lines) ) # Codes contestés par la CPAM (avec libellés CIM-10 résolus) codes_contestes = [] if controle.dp_ucr: codes_contestes.append(f"DP proposé par UCR : {controle.dp_ucr}{_get_code_label(controle.dp_ucr)}") if controle.da_ucr: codes_contestes.append(f"DA proposés par UCR : {controle.da_ucr}{_get_code_label(controle.da_ucr)}") if controle.dr_ucr: codes_contestes.append(f"DR proposé par UCR : {controle.dr_ucr}{_get_code_label(controle.dr_ucr)}") if controle.actes_ucr: codes_contestes.append(f"Actes proposés par UCR : {controle.actes_ucr}") codes_str = "\n".join(codes_contestes) if codes_contestes else "Aucun code spécifique proposé" # Définitions CIM-10 déterministes (tous les codes en jeu) definitions_str = _get_cim10_definitions(dossier, controle) # Contexte clinique tagué pour le grounding tagged_context, tag_map = _build_tagged_context(dossier) if tagged_context: tagged_str = f"\n\n{tagged_context}" else: tagged_str = ( "\n\nATTENTION — DOSSIER PAUVRE EN ÉLÉMENTS CLINIQUES :\n" "Aucune biologie, imagerie, traitement ou acte CCAM disponible.\n" "Ne spécule PAS sur des éléments absents. Signale explicitement " "le manque de données au lieu d'inventer des preuves." ) # Vérification cohérence DAS / biologie das_bio_warnings = _check_das_bio_coherence(dossier) if das_bio_warnings: tagged_str += ( "\n\nALERTES COHÉRENCE DAS / BIOLOGIE (incohérences détectées dans le dossier) :\n" + "\n".join(f" - {w}" for w in das_bio_warnings) + "\n Prends en compte ces incohérences dans ton analyse." ) # Sources RAG sources_text = "" for i, src in enumerate(sources, 1): doc_name = { "cim10": "CIM-10 FR 2026", "cim10_alpha": "CIM-10 Index Alphabétique 2026", "guide_methodo": "Guide Méthodologique MCO 2026", "ccam": "CCAM PMSI V4 2025", }.get(src.get("document", ""), src.get("document", "")) code_info = f" (code: {src['code']})" if src.get("code") else "" page_info = f" [page {src['page']}]" if src.get("page") else "" sources_text += f"--- Source {i}: {doc_name}{code_info}{page_info} ---\n" sources_text += (src.get("extrait", "")[:800]) + "\n\n" # Section pré-analyse (résultat passe 1, si disponible) extraction_str = "" if extraction: ext_lines = [] comp = extraction.get("comprehension_contestation") if comp: ext_lines.append(f"Compréhension : {comp}") elems = extraction.get("elements_cliniques_pertinents", []) if elems and isinstance(elems, list): elem_strs = [] for e in elems: if isinstance(e, dict): elem_strs.append(f" - [{e.get('tag', '?')}] {e.get('pertinence', '')}") if elem_strs: ext_lines.append("Éléments pertinents :\n" + "\n".join(elem_strs)) accords = extraction.get("points_accord_potentiels", []) if accords and isinstance(accords, list): ext_lines.append("Points d'accord potentiels : " + " ; ".join(str(a) for a in accords)) codes = extraction.get("codes_en_jeu", {}) if codes and isinstance(codes, dict): diff = codes.get("difference_cle", "") if diff: ext_lines.append(f"Différence clé entre les codages : {diff}") if ext_lines: extraction_str = ( "\nPRÉ-ANALYSE (extraction automatique — à utiliser comme base) :\n" + "\n".join(ext_lines) ) prompt = f"""Tu es un médecin DIM (Département d'Information Médicale) expert en contentieux T2A. Tu dois produire une analyse ÉQUILIBRÉE ET CRÉDIBLE de la contestation CPAM, puis contre-argumenter en mobilisant trois axes : médical, asymétrie d'information, et réglementaire. IMPORTANT — CRÉDIBILITÉ DE L'ANALYSE : Une contre-argumentation crédible reconnaît TOUJOURS au moins un point valide dans le raisonnement adverse. Répondre "Aucun point d'accord" décrédibilise l'ensemble de l'argumentation. Tu DOIS identifier au moins un élément où la CPAM a un point légitime (même partiel), puis expliquer pourquoi cela ne suffit pas à invalider le codage. IMPORTANT — CODES CIM-10 : Ne parle JAMAIS de « codage initial » ou « codage contesté » sans citer explicitement le code CIM-10 et son libellé (ex: Z45.80 — Ajustement et entretien d'un dispositif implantable). Chaque argument doit désigner précisément quel code est défendu ou contesté, avec son libellé complet. DOSSIER MÉDICAL DE L'ÉTABLISSEMENT : {dossier_str} {asymetrie_str} {tagged_str} OBJET DU DÉSACCORD : {controle.titre} ARGUMENTATION DE LA CPAM (UCR) : {controle.arg_ucr} DÉCISION UCR : {controle.decision_ucr} CODES CONTESTÉS : {codes_str} {definitions_str} SOURCES RÉGLEMENTAIRES (Guide méthodologique, CIM-10) : {sources_text} {extraction_str} CONSIGNES : CONTEXTE CLINIQUE : - Prends en compte l'ÂGE du patient (pédiatrie < 18 ans, personne âgée >= 80 ans), le MODE D'ENTRÉE (urgence vs programmé), et la DURÉE DE SÉJOUR pour contextualiser ton analyse - En pédiatrie, les normes biologiques et les codages peuvent différer de l'adulte - Une admission en urgence implique un contexte clinique aigu qui influence le choix du DP ÉTAPE 1 — ANALYSE HONNÊTE (avant de contre-argumenter) : - Identifie ce que la CPAM a compris correctement dans le dossier - Reconnais les points où leur raisonnement est fondé, même partiellement - Explique ENSUITE pourquoi ces points ne justifient pas leur conclusion AXE MÉDICAL : - Analyse le bien-fondé médical du codage de l'établissement - CITE les éléments cliniques EXACTS du dossier en utilisant les tags [XX-N] fournis (ex: [BIO-1] CRP 180 mg/L) - Confronte l'argumentation CPAM aux sources CIM-10 et Guide Méthodologique fournies - Ne mentionne AUCUN élément qui ne figure pas dans les éléments référencés ci-dessus AXE ASYMÉTRIE D'INFORMATION : - La CPAM a fondé son analyse uniquement sur le CRH et les codes transmis - Pour CHAQUE élément clinique pertinent, cite les VALEURS EXACTES et explique leur signification clinique - Démontre en quoi ces éléments complémentaires (biologie, imagerie, traitements, actes) justifient le codage contesté - Ne mentionne AUCUN élément qui n'est pas dans le dossier fourni MISE EN FORME : - Structure chaque section avec des tirets pour lister les arguments distincts - Un argument par puce, avec la preuve ou la référence associée AXE RÉGLEMENTAIRE : - Identifie si l'UCR fait une interprétation restrictive non fondée d'une règle - Confronte le raisonnement CPAM au texte EXACT des sources fournies - Format OBLIGATOIRE pour chaque référence : [Document - page N] suivi d'une CITATION VERBATIM du passage pertinent - INTERDICTION ABSOLUE de citer une référence qui ne figure pas dans les sources fournies ci-dessus - Si aucune source pertinente n'est disponible → écrire explicitement "Pas de source réglementaire disponible" - Relève les contradictions entre l'argumentation CPAM et les règles officielles Réponds UNIQUEMENT avec un objet JSON au format suivant : {{ "analyse_contestation": "Résumé de ce que conteste la CPAM et sur quelle base", "points_accord": "Points CONCRETS où la CPAM a raison ou partiellement raison (JAMAIS 'Aucun' — il y a toujours au moins un point légitime à reconnaître)", "contre_arguments_medicaux": "Argumentation médicale en faveur du codage, en expliquant pourquoi les points d'accord ne suffisent pas à invalider le codage", "preuves_dossier": [ {{"ref": "BIO-1", "element": "biologie|imagerie|traitement|acte|clinique", "valeur": "valeur exacte du dossier", "signification": "explication clinique"}} ], "contre_arguments_asymetrie": "Éléments cliniques que la CPAM n'avait pas et qui justifient le codage", "contre_arguments_reglementaires": "Erreurs d'interprétation réglementaire de la CPAM, avec citations verbatim des sources", "references": [ {{"document": "nom du document source", "page": "numéro de page", "citation": "citation verbatim du passage"}} ], "conclusion": "Synthèse en citant EXPLICITEMENT les codes CIM-10 défendus (ex: DP Z45.80 — libellé) : points reconnus à la CPAM, puis pourquoi ce codage précis est néanmoins justifié" }}""" return prompt, tag_map def _validate_references(parsed: dict, sources: list[dict]) -> list[str]: """Vérifie que les références citées correspondent aux sources RAG fournies. Returns: Liste d'avertissements pour les références non vérifiables. """ warnings = [] refs = parsed.get("references") if not refs or not isinstance(refs, list): return warnings # Construire un set des documents sources disponibles source_docs = set() for src in sources: doc_name = src.get("document", "") source_docs.add(doc_name) # Ajouter les noms lisibles aussi readable = { "cim10": "CIM-10 FR 2026", "cim10_alpha": "CIM-10 Index Alphabétique 2026", "guide_methodo": "Guide Méthodologique MCO 2026", "ccam": "CCAM PMSI V4 2025", }.get(doc_name, "") if readable: source_docs.add(readable) source_docs.add(readable.lower()) if not source_docs: return warnings for ref in refs: if not isinstance(ref, dict): continue doc = ref.get("document", "") if doc and not any(sd in doc.lower() or doc.lower() in sd.lower() for sd in source_docs if sd): warnings.append(f"Référence non vérifiable : {doc}") logger.warning("CPAM : référence non vérifiable « %s »", doc) return warnings def _format_response(parsed: dict, ref_warnings: list[str] | None = None) -> str: """Formate la réponse LLM en texte lisible.""" sections = [] analyse = parsed.get("analyse_contestation") if analyse: sections.append(f"ANALYSE DE LA CONTESTATION\n{analyse}") accord = parsed.get("points_accord") if accord and accord.lower() not in ("aucun", "non applicable", "n/a", ""): sections.append(f"POINTS D'ACCORD\n{accord}") # Nouveaux champs structurés par axe contre_med = parsed.get("contre_arguments_medicaux") if contre_med: sections.append(f"CONTRE-ARGUMENTS MÉDICAUX\n{contre_med}") # Preuves du dossier (nouveau champ structuré) preuves = parsed.get("preuves_dossier") if preuves and isinstance(preuves, list): preuves_lines = [] for p in preuves: if isinstance(p, dict): ref = p.get("ref", "") elem = p.get("element", "") valeur = p.get("valeur", "") signif = p.get("signification", "") ref_prefix = f"[{ref}] " if ref else "" preuves_lines.append(f"- {ref_prefix}[{elem}] {valeur} → {signif}") if preuves_lines: sections.append(f"PREUVES DU DOSSIER\n" + "\n".join(preuves_lines)) contre_asym = parsed.get("contre_arguments_asymetrie") if contre_asym: sections.append(f"ASYMÉTRIE D'INFORMATION\n{contre_asym}") contre_regl = parsed.get("contre_arguments_reglementaires") if contre_regl: sections.append(f"CONTRE-ARGUMENTS RÉGLEMENTAIRES\n{contre_regl}") # Fallback : ancien champ unique (réponses en cache existantes) if not contre_med and not contre_asym and not contre_regl: contre = parsed.get("contre_arguments") if contre: sections.append(f"CONTRE-ARGUMENTS\n{contre}") # Références structurées (nouveau format liste) ou ancien format string refs = parsed.get("references") if refs: if isinstance(refs, list): ref_lines = [] for r in refs: if isinstance(r, dict): doc = r.get("document", "") page = r.get("page", "") citation = r.get("citation", "") ref_lines.append(f"- [{doc}, p.{page}] {citation}") else: ref_lines.append(f"- {r}") if ref_lines: sections.append(f"REFERENCES\n" + "\n".join(ref_lines)) else: sections.append(f"REFERENCES\n{refs}") conclusion = parsed.get("conclusion") if conclusion: sections.append(f"CONCLUSION\n{conclusion}") # Avertissements sur les références non vérifiables if ref_warnings: warning_text = "\n".join(f"- {w}" for w in ref_warnings) sections.append(f"AVERTISSEMENT — REFERENCES NON VÉRIFIÉES\n{warning_text}") return "\n\n".join(sections) def _validate_adversarial( response_data: dict, tag_map: dict[str, str], controle: ControleCPAM, ) -> dict | None: """Validation adversariale — vérifie la cohérence de la contre-argumentation. Un appel LLM de relecture critique vérifie : 1. Les valeurs cliniques citées correspondent aux éléments tagués du dossier 2. La conclusion est cohérente avec l'argumentation 3. Les points d'accord ne contredisent pas la contre-argumentation 4. Les codes CIM-10 cités sont cohérents Returns: dict {"coherent": bool, "erreurs": list[str], "score_confiance": int} ou None si échec. """ import json as _json # Construire le résumé des éléments factuels disponibles if tag_map: factual_lines = "\n".join(f" [{tag}] {content}" for tag, content in tag_map.items()) factual_section = f"ÉLÉMENTS FACTUELS DU DOSSIER :\n{factual_lines}" else: factual_section = "ÉLÉMENTS FACTUELS DU DOSSIER : aucun élément tagué disponible" # Sérialiser la réponse LLM de façon compacte try: response_json = _json.dumps(response_data, ensure_ascii=False, indent=None) # Tronquer si trop long pour le prompt de validation if len(response_json) > 3000: response_json = response_json[:3000] + "..." except (TypeError, ValueError): logger.warning("Validation adversariale : impossible de sérialiser la réponse") return None # Normes biologiques pour vérifier les interprétations normes_lines = [] for test, (lo, hi) in BIO_NORMALS.items(): normes_lines.append(f" {test}: {lo}-{hi}") normes_section = "NORMES BIOLOGIQUES DE RÉFÉRENCE :\n" + "\n".join(normes_lines) prompt = f"""Tu es un relecteur critique. Vérifie la cohérence de cette contre-argumentation CPAM. RÉPONSE GÉNÉRÉE : {response_json} {factual_section} {normes_section} CODES CONTESTÉS : {f"DP UCR : {controle.dp_ucr}" if controle.dp_ucr else ""} {f"DA UCR : {controle.da_ucr}" if controle.da_ucr else ""} Vérifie STRICTEMENT : 1. Chaque valeur bio/imagerie/traitement citée dans les preuves existe dans les éléments factuels 2. Si une valeur bio est qualifiée de "élevée", "basse" ou "anormale", vérifie qu'elle est RÉELLEMENT hors normes selon les normes ci-dessus (ex: CRP 5 = NORMAL, pas élevé) 3. La conclusion est cohérente avec l'argumentation développée 4. Les points d'accord ne contredisent pas les contre-arguments 5. Les codes CIM-10 mentionnés dans la conclusion sont cohérents avec le reste Réponds UNIQUEMENT en JSON : {{ "coherent": true ou false, "erreurs": ["description précise de chaque incohérence trouvée"], "score_confiance": 0 à 10 }}""" logger.debug(" Validation adversariale") result = call_ollama(prompt, temperature=0.0, max_tokens=800) if result is None: result = call_anthropic(prompt, temperature=0.0, max_tokens=800) if result is None: logger.warning(" Validation adversariale échouée — LLM indisponible") return None coherent = result.get("coherent", True) erreurs = result.get("erreurs", []) score = result.get("score_confiance", -1) if not coherent and erreurs: logger.warning(" Validation adversariale : %d incohérence(s) détectée(s) (score %s/10)", len(erreurs), score) for e in erreurs: logger.warning(" - %s", e) else: logger.info(" Validation adversariale OK (score %s/10)", score) return result def _extraction_pass( dossier: DossierMedical, controle: ControleCPAM, ) -> dict | None: """Passe 1 — Extraction structurée du contexte avant argumentation. Prompt court centré sur la compréhension de la contestation et l'extraction des éléments cliniques pertinents. Pas de rédaction argumentative. Returns: dict structuré ou None si le LLM échoue. """ # Résumé dossier compact dp_str = "" if dossier.diagnostic_principal: dp = dossier.diagnostic_principal code = f" ({dp.cim10_suggestion})" if dp.cim10_suggestion else "" dp_str = f"{dp.texte}{code}" elif controle.dp_ucr: dp_str = f"code {controle.dp_ucr} (codé par l'établissement)" das_str = ", ".join( f"{d.texte} ({d.cim10_suggestion})" if d.cim10_suggestion else d.texte for d in dossier.diagnostics_associes ) # Contexte tagué (réutilise la même fonction) tagged_text, _ = _build_tagged_context(dossier) prompt = f"""Tu es un médecin DIM expert. Analyse cette contestation CPAM sans argumenter. DOSSIER : - DP : {dp_str or "Non extrait"} - DAS : {das_str or "Aucun"} {tagged_text} CONTESTATION CPAM : Titre : {controle.titre} Argument : {controle.arg_ucr} Décision : {controle.decision_ucr} {f"DP proposé UCR : {controle.dp_ucr}" if controle.dp_ucr else ""} {f"DA proposés UCR : {controle.da_ucr}" if controle.da_ucr else ""} Réponds UNIQUEMENT en JSON : {{ "comprehension_contestation": "Résumé factuel : que conteste la CPAM et pourquoi", "elements_cliniques_pertinents": [ {{"tag": "BIO-1 ou texte libre", "pertinence": "en quoi cet élément est pertinent pour le codage contesté"}} ], "points_accord_potentiels": ["points où la CPAM a partiellement raison"], "codes_en_jeu": {{ "dp_etablissement": "code + libellé", "dp_ucr": "code + libellé si proposé", "difference_cle": "explication de la différence entre les deux codages" }} }}""" logger.debug(" Passe 1 — extraction structurée") result = call_ollama(prompt, temperature=0.0, max_tokens=1500) if result is None: result = call_anthropic(prompt, temperature=0.0, max_tokens=1500) if result is not None: logger.info(" Passe 1 OK : %d éléments cliniques extraits", len(result.get("elements_cliniques_pertinents", []))) else: logger.warning(" Passe 1 échouée — fallback single-pass") return result def generate_cpam_response( dossier: DossierMedical, controle: ControleCPAM, ) -> tuple[str, dict | None, list[RAGSource]]: """Génère une contre-argumentation pour un contrôle CPAM. Args: dossier: Le dossier médical analysé. controle: Le contrôle CPAM à contester. Returns: Tuple (texte de contre-argumentation, dict LLM structuré ou None, sources RAG utilisées). """ logger.info("CPAM : génération contre-argumentation pour OGC %d — %s", controle.numero_ogc, controle.titre) # 1. Passe 1 — Extraction structurée (compréhension avant argumentation) extraction = _extraction_pass(dossier, controle) # 2. Recherche RAG ciblée sources = _search_rag_for_control(controle, dossier) logger.info(" RAG : %d sources trouvées", len(sources)) # 3. Construction du prompt (passe 2 — argumentation) prompt, tag_map = _build_cpam_prompt(dossier, controle, sources, extraction) # 4. Appel LLM — Ollama (modèle par défaut) > Haiku fallback result = call_ollama(prompt, temperature=0.1, max_tokens=4000) if result is not None: logger.info(" Contre-argumentation via Ollama") else: logger.info(" Ollama indisponible → fallback Anthropic Haiku") result = call_anthropic(prompt, temperature=0.1, max_tokens=4000) if result is not None: logger.info(" Contre-argumentation via Anthropic Haiku") # 5. Conversion des sources RAG rag_sources = [ RAGSource( document=s.get("document", ""), page=s.get("page"), code=s.get("code"), extrait=s.get("extrait", "")[:200], ) for s in sources ] if result is None: logger.warning(" LLM non disponible — contre-argumentation non générée") return "", None, rag_sources # 6. Validation des références RAG ref_warnings = _validate_references(result, sources) if ref_warnings: logger.warning(" CPAM : %d référence(s) non vérifiable(s)", len(ref_warnings)) # 7. Validation grounding (preuves traçables vers le dossier) grounding_warnings = _validate_grounding(result, tag_map) if grounding_warnings: logger.warning(" CPAM : %d preuve(s) non traçable(s)", len(grounding_warnings)) # 8. Validation adversariale (cohérence factuelle) adversarial_warnings: list[str] = [] validation = _validate_adversarial(result, tag_map, controle) if validation and not validation.get("coherent", True): erreurs = validation.get("erreurs", []) score = validation.get("score_confiance", "?") for e in erreurs: if isinstance(e, str) and e.strip(): adversarial_warnings.append(f"Incohérence détectée : {e}") if adversarial_warnings: adversarial_warnings.append(f"Score de confiance : {score}/10") all_warnings = ref_warnings + grounding_warnings + adversarial_warnings # 9. Formater la réponse text = _format_response(result, all_warnings) logger.info(" Contre-argumentation générée (%d caractères)", len(text)) return text, result, rag_sources