refactor: split cpam_response → cpam_rag, cpam_context, cpam_validation

Découpe le monolithe cpam_response.py (1207L) en 3 modules spécialisés :
- cpam_rag.py : recherche RAG ciblée (5 requêtes, dédup)
- cpam_context.py : construction prompt, définitions CIM-10, bio summary
- cpam_validation.py : grounding, références, codes fermée, adversariale

Le cpam_response.py reste orchestrateur (~230L) avec re-exports
backward-compat. Mocks des tests mis à jour pour cibler les bons modules.
Ajout RULE-CPAM-CORRECTION-LOOP dans base.yaml. 748 tests passent.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
dom
2026-02-20 10:06:26 +01:00
parent e760b12961
commit 3c070f3c1d
6 changed files with 1553 additions and 833 deletions

View File

@@ -54,6 +54,9 @@ packs:
RULE-DAS-TO-DP:
enabled: true
description: "DAS promu en DP si aucun DP extrait — sélection par pertinence/confiance/spécificité"
RULE-CPAM-CORRECTION-LOOP:
enabled: true
description: "Boucle de correction quand validation adversariale score ≤ 5/10"
bio_electrolytes:
enabled: true

532
src/control/cpam_context.py Normal file
View File

@@ -0,0 +1,532 @@
"""Construction du contexte et du prompt pour la contre-argumentation CPAM."""
from __future__ import annotations
import logging
import re
from ..config import ControleCPAM, DossierMedical
from ..medical.bio_normals import BIO_NORMALS
from ..medical.cim10_dict import normalize_code, validate_code
from ..prompts import CPAM_ARGUMENTATION
logger = logging.getLogger(__name__)
def _get_code_label(code_str: str) -> str:
"""Résout le libellé CIM-10 pour un ou plusieurs codes."""
codes = re.split(r"[,;\s]+", code_str.strip())
labels = []
for raw in codes:
raw = raw.strip()
if not raw:
continue
norm = normalize_code(raw)
is_valid, label = validate_code(norm)
if is_valid and label:
labels.append(f"{norm}{label}")
else:
labels.append(norm)
if not labels:
return ""
if len(labels) == 1:
parts = labels[0].split("", 1)
return f"{parts[1]}" if len(parts) > 1 else ""
return "\n " + "\n ".join(labels)
def _get_cim10_definitions(
dossier: DossierMedical,
controle: ControleCPAM,
) -> str:
"""Construit une section de définitions CIM-10 déterministes pour tous les codes en jeu.
Collecte les codes depuis :
- Le dossier : DP (cim10_suggestion) + DAS (cim10_suggestion)
- L'UCR : dp_ucr, da_ucr, dr_ucr
Returns:
Texte formaté pour injection dans le prompt, ou "" si aucun code résolu.
"""
codes_seen: dict[str, str] = {} # code normalisé → rôle (pour affichage)
# Codes du dossier (établissement)
if dossier.diagnostic_principal and dossier.diagnostic_principal.cim10_suggestion:
code = dossier.diagnostic_principal.cim10_suggestion
codes_seen[normalize_code(code)] = "DP établissement"
for das in dossier.diagnostics_associes:
if das.cim10_suggestion:
norm = normalize_code(das.cim10_suggestion)
if norm not in codes_seen:
codes_seen[norm] = "DAS établissement"
# Codes de l'UCR (CPAM)
for field, role in [
(controle.dp_ucr, "DP proposé UCR"),
(controle.da_ucr, "DA proposé UCR"),
(controle.dr_ucr, "DR proposé UCR"),
]:
if not field:
continue
for raw in re.split(r"[,;\s]+", field.strip()):
raw = raw.strip()
if not raw:
continue
norm = normalize_code(raw)
if norm not in codes_seen:
codes_seen[norm] = role
if not codes_seen:
return ""
# Résoudre les libellés
lines = []
for norm_code, role in codes_seen.items():
is_valid, label = validate_code(norm_code)
if is_valid and label:
lines.append(f" {norm_code}{label} [{role}]")
else:
lines.append(f" {norm_code} — (code non trouvé dans le dictionnaire) [{role}]")
if not lines:
return ""
return (
"\nDÉFINITIONS CIM-10 — RÉFÉRENCE (source : dictionnaire officiel) :\n"
+ "\n".join(lines)
)
def _build_tagged_context(dossier: DossierMedical) -> tuple[str, dict[str, str]]:
"""Construit un contexte clinique avec des tags de référence pour le grounding.
Chaque élément clinique reçoit un tag unique ([BIO-1], [IMG-1], [TRT-1], [ACTE-1])
que le LLM doit citer dans ses preuves pour garantir la traçabilité.
Returns:
(texte tagué pour injection dans le prompt, dict tag → contenu original)
"""
tag_map: dict[str, str] = {}
lines: list[str] = []
# Biologie (avec normes de référence pour éviter les hallucinations)
for i, b in enumerate(dossier.biologie_cle, 1):
if not b.valeur:
continue
tag = f"BIO-{i}"
# Interpréter la valeur par rapport aux normes connues
norm_info = ""
if b.test in BIO_NORMALS:
lo, hi = BIO_NORMALS[b.test]
try:
val = float(b.valeur.replace(",", ".").split()[0])
if val > hi:
norm_info = f" — ÉLEVÉ (norme {lo}-{hi})"
elif val < lo:
norm_info = f" — BAS (norme {lo}-{hi})"
else:
norm_info = f" — NORMAL (norme {lo}-{hi})"
except (ValueError, AttributeError):
pass
content = f"{b.test}: {b.valeur}{norm_info}"
tag_map[tag] = content
lines.append(f" [{tag}] {content}")
# Imagerie
for i, im in enumerate(dossier.imagerie, 1):
tag = f"IMG-{i}"
conclusion = f"{im.conclusion}" if im.conclusion else ""
content = f"{im.type}{conclusion}"
tag_map[tag] = content
lines.append(f" [{tag}] {content}")
# Traitements
for i, t in enumerate(dossier.traitements_sortie[:10], 1):
tag = f"TRT-{i}"
posologie = f" {t.posologie}" if t.posologie else ""
content = f"{t.medicament}{posologie}"
tag_map[tag] = content
lines.append(f" [{tag}] {content}")
# Actes CCAM
for i, a in enumerate(dossier.actes_ccam, 1):
tag = f"ACTE-{i}"
code = f" ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else ""
content = f"{a.texte}{code}"
tag_map[tag] = content
lines.append(f" [{tag}] {content}")
if not lines:
return "", tag_map
text = "ÉLÉMENTS CLINIQUES RÉFÉRENCÉS (cite le tag [XX-N] dans tes preuves) :\n" + "\n".join(lines)
return text, tag_map
# Interprétations cliniques pour le résumé bio déterministe
_BIO_INTERPRETATION: dict[str, dict[str, str]] = {
"CRP": {"high": "infection/inflammation active", "low": "normal", "normal": "pas d'inflammation"},
"Hémoglobine": {"high": "polyglobulie", "low": "anémie", "normal": "pas d'anémie"},
"Plaquettes": {"high": "thrombocytose", "low": "thrombopénie", "normal": "numération normale"},
"Leucocytes": {"high": "hyperleucocytose", "low": "leucopénie", "normal": "numération normale"},
"Créatinine": {"high": "insuffisance rénale", "low": "normal", "normal": "fonction rénale conservée"},
"Potassium": {"high": "hyperkaliémie", "low": "hypokaliémie", "normal": "kaliémie normale"},
"Sodium": {"high": "hypernatrémie", "low": "hyponatrémie", "normal": "natrémie normale"},
"Lipasémie": {"high": "pancréatite probable", "low": "normal", "normal": "pas de pancréatite"},
"ASAT": {"high": "cytolyse hépatique", "low": "normal", "normal": "pas de cytolyse"},
"ALAT": {"high": "cytolyse hépatique", "low": "normal", "normal": "pas de cytolyse"},
"GGT": {"high": "cholestase/atteinte hépatique", "low": "normal", "normal": "pas de cholestase"},
"Bilirubine totale": {"high": "ictère/cholestase", "low": "normal", "normal": "pas d'ictère"},
}
def _build_bio_summary(dossier: DossierMedical) -> str:
"""Construit un résumé biologique déterministe à injecter dans le prompt.
Chaque valeur bio est interprétée contre BIO_NORMALS avec une conclusion
non ambiguë que le LLM ne doit pas modifier.
Returns:
Texte formaté ou "" si aucune biologie exploitable.
"""
if not dossier.biologie_cle:
return ""
lines: list[str] = []
for b in dossier.biologie_cle:
if not b.valeur or b.test not in BIO_NORMALS:
continue
try:
val = float(b.valeur.replace(",", ".").split()[0])
except (ValueError, AttributeError):
continue
lo, hi = BIO_NORMALS[b.test]
if val > hi:
status = "ÉLEVÉ"
interp_key = "high"
elif val < lo:
status = "BAS"
interp_key = "low"
else:
status = "NORMAL"
interp_key = "normal"
interp = _BIO_INTERPRETATION.get(b.test, {}).get(interp_key, "")
interp_str = f"{interp}" if interp else ""
lines.append(f"{b.test} = {b.valeur}{status} (norme {lo}-{hi}){interp_str}")
if not lines:
return ""
return (
"FAITS BIOLOGIQUES VÉRIFIÉS (NE PAS MODIFIER ces interprétations) :\n"
+ "\n".join(lines)
+ "\n\nRÈGLE STRICTE : si tu cites une valeur biologique, tu DOIS utiliser "
"l'interprétation ci-dessus.\n"
"Ne qualifie JAMAIS une valeur NORMAL comme pathologique, "
"ni une valeur ÉLEVÉ/BAS comme normale."
)
def _check_das_bio_coherence(dossier: DossierMedical) -> list[str]:
"""Vérifie la cohérence entre les textes DAS et les valeurs biologiques.
Détecte les contradictions comme "leucocytose" dans un DAS alors que
les leucocytes sont bas, ou "anémie" alors que l'hémoglobine est normale.
Returns:
Liste de warnings pour les incohérences détectées.
"""
if not dossier.diagnostics_associes or not dossier.biologie_cle:
return []
# Patterns DAS → (test bio attendu, direction attendue)
_DAS_BIO_CHECKS: dict[str, tuple[str, str]] = {
"leucocytose": ("Leucocytes", "high"),
"leucopénie": ("Leucocytes", "low"),
"leucopenie": ("Leucocytes", "low"),
"thrombocytose": ("Plaquettes", "high"),
"thrombocytopénie": ("Plaquettes", "low"),
"thrombocytopenie": ("Plaquettes", "low"),
"thrombopénie": ("Plaquettes", "low"),
"thrombopenie": ("Plaquettes", "low"),
"anémie": ("Hémoglobine", "low"),
"anemie": ("Hémoglobine", "low"),
"polyglobulie": ("Hémoglobine", "high"),
"hyperkaliémie": ("Potassium", "high"),
"hypokaliémie": ("Potassium", "low"),
}
# Indexer les valeurs bio disponibles
bio_values: dict[str, float] = {}
for b in dossier.biologie_cle:
if b.test and b.valeur:
try:
bio_values[b.test] = float(b.valeur.replace(",", ".").split()[0])
except (ValueError, AttributeError):
pass
warnings: list[str] = []
for das in dossier.diagnostics_associes:
texte_lower = (das.texte or "").lower()
for pattern, (bio_test, direction) in _DAS_BIO_CHECKS.items():
if pattern not in texte_lower:
continue
if bio_test not in bio_values or bio_test not in BIO_NORMALS:
continue
val = bio_values[bio_test]
lo, hi = BIO_NORMALS[bio_test]
if direction == "high" and val <= hi:
warnings.append(
f"INCOHÉRENCE : DAS « {das.texte} » ({das.cim10_suggestion or '?'}) "
f"mais {bio_test} = {val} est NORMAL (norme {lo}-{hi})"
)
elif direction == "low" and val >= lo:
warnings.append(
f"INCOHÉRENCE : DAS « {das.texte} » ({das.cim10_suggestion or '?'}) "
f"mais {bio_test} = {val} est NORMAL (norme {lo}-{hi})"
)
if warnings:
for w in warnings:
logger.warning(" DAS/bio : %s", w)
return warnings
def _build_cpam_prompt(
dossier: DossierMedical,
controle: ControleCPAM,
sources: list[dict],
extraction: dict | None = None,
) -> tuple[str, dict[str, str]]:
"""Construit le prompt pour la contre-argumentation CPAM.
Args:
extraction: Résultat optionnel de la passe 1 (extraction structurée).
Returns:
(prompt texte, tag_map pour validation grounding)
"""
# Résumé du dossier médical
dossier_lines = []
if dossier.diagnostic_principal:
dp = dossier.diagnostic_principal
dp_code = f" ({dp.cim10_suggestion})" if dp.cim10_suggestion else ""
dossier_lines.append(f"- DP : {dp.texte}{dp_code}")
elif controle.dp_ucr:
dp_label = _get_code_label(controle.dp_ucr)
dossier_lines.append(
f"- DP : code {controle.dp_ucr}{dp_label} "
f"(codé par l'établissement, contesté par la CPAM)"
)
if dossier.diagnostics_associes:
das_parts = []
for das in dossier.diagnostics_associes:
code = f" ({das.cim10_suggestion})" if das.cim10_suggestion else ""
das_parts.append(f"{das.texte}{code}")
dossier_lines.append(f"- DAS : {', '.join(das_parts)}")
if dossier.actes_ccam:
actes = [f"{a.texte} ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else a.texte
for a in dossier.actes_ccam]
dossier_lines.append(f"- Actes CCAM : {', '.join(actes)}")
sejour = dossier.sejour
if sejour.duree_sejour is not None:
dossier_lines.append(f"- Durée séjour : {sejour.duree_sejour} jours")
if sejour.sexe or sejour.age is not None:
patient_info = []
if sejour.sexe:
patient_info.append(sejour.sexe)
if sejour.age is not None:
patient_info.append(f"{sejour.age} ans")
if sejour.age < 18:
patient_info.append("(PÉDIATRIE — codage pédiatrique applicable)")
elif sejour.age >= 80:
patient_info.append("(patient âgé — comorbidités fréquentes)")
dossier_lines.append(f"- Patient : {', '.join(patient_info)}")
if sejour.mode_entree:
mode_label = sejour.mode_entree
if "urgence" in mode_label.lower() or "urgent" in mode_label.lower():
dossier_lines.append(f"- Mode d'entrée : {mode_label} (ADMISSION EN URGENCE)")
else:
dossier_lines.append(f"- Mode d'entrée : {mode_label}")
if sejour.mode_sortie:
dossier_lines.append(f"- Mode de sortie : {sejour.mode_sortie}")
if sejour.imc is not None:
dossier_lines.append(f"- IMC : {sejour.imc}")
if dossier.biologie_cle:
bio = [f"{b.test}: {b.valeur}" for b in dossier.biologie_cle[:5] if b.valeur]
if bio:
dossier_lines.append(f"- Biologie clé : {', '.join(bio)}")
if dossier.imagerie:
img_parts = []
for im in dossier.imagerie:
conclusion = f"{im.conclusion}" if im.conclusion else ""
img_parts.append(f"{im.type}{conclusion}")
dossier_lines.append(f"- Imagerie : {', '.join(img_parts)}")
if dossier.traitements_sortie:
trt_parts = []
for t in dossier.traitements_sortie[:10]:
posologie = f" {t.posologie}" if t.posologie else ""
trt_parts.append(f"{t.medicament}{posologie}")
dossier_lines.append(f"- Traitements de sortie : {', '.join(trt_parts)}")
if dossier.antecedents:
dossier_lines.append(f"- Antécédents : {', '.join(a.texte for a in dossier.antecedents[:10])}")
if dossier.complications:
dossier_lines.append(f"- Complications : {', '.join(c.texte for c in dossier.complications)}")
dossier_str = "\n".join(dossier_lines) if dossier_lines else "Non disponible"
# Section asymétrie : éléments que la CPAM n'avait pas
asymetrie_lines = []
if dossier.biologie_cle:
bio_details = []
for b in dossier.biologie_cle if len(dossier.biologie_cle) <= 10 else dossier.biologie_cle[:10]:
anomalie = " (anormale)" if b.anomalie else ""
if b.valeur:
bio_details.append(f"{b.test}: {b.valeur}{anomalie}")
if bio_details:
asymetrie_lines.append(f"- Biologie : {', '.join(bio_details)}")
if dossier.imagerie:
img_details = []
for im in dossier.imagerie:
conclusion = f"{im.conclusion}" if im.conclusion else ""
img_details.append(f"{im.type}{conclusion}")
if img_details:
asymetrie_lines.append(f"- Imagerie : {', '.join(img_details)}")
if dossier.traitements_sortie:
trt_details = []
for t in dossier.traitements_sortie[:10]:
posologie = f" {t.posologie}" if t.posologie else ""
trt_details.append(f"{t.medicament}{posologie}")
if trt_details:
asymetrie_lines.append(f"- Traitements : {', '.join(trt_details)}")
if dossier.actes_ccam:
actes_details = [
f"{a.texte} ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else a.texte
for a in dossier.actes_ccam
]
if actes_details:
asymetrie_lines.append(f"- Actes CCAM : {', '.join(actes_details)}")
asymetrie_str = ""
if asymetrie_lines:
asymetrie_str = (
"\n\nÉLÉMENTS DU DOSSIER NON TRANSMIS À LA CPAM "
"(l'UCR n'a eu que le CRH et les codes) :\n"
+ "\n".join(asymetrie_lines)
)
# Codes contestés par la CPAM (avec libellés CIM-10 résolus)
codes_contestes = []
if controle.dp_ucr:
codes_contestes.append(f"DP proposé par UCR : {controle.dp_ucr}{_get_code_label(controle.dp_ucr)}")
if controle.da_ucr:
codes_contestes.append(f"DA proposés par UCR : {controle.da_ucr}{_get_code_label(controle.da_ucr)}")
if controle.dr_ucr:
codes_contestes.append(f"DR proposé par UCR : {controle.dr_ucr}{_get_code_label(controle.dr_ucr)}")
if controle.actes_ucr:
codes_contestes.append(f"Actes proposés par UCR : {controle.actes_ucr}")
codes_str = "\n".join(codes_contestes) if codes_contestes else "Aucun code spécifique proposé"
# Définitions CIM-10 déterministes (tous les codes en jeu)
definitions_str = _get_cim10_definitions(dossier, controle)
# Contexte clinique tagué pour le grounding
tagged_context, tag_map = _build_tagged_context(dossier)
if tagged_context:
tagged_str = f"\n\n{tagged_context}"
else:
tagged_str = (
"\n\nATTENTION — DOSSIER PAUVRE EN ÉLÉMENTS CLINIQUES :\n"
"Aucune biologie, imagerie, traitement ou acte CCAM disponible.\n"
"Ne spécule PAS sur des éléments absents. Signale explicitement "
"le manque de données au lieu d'inventer des preuves."
)
# Résumé biologique déterministe (interprétations non modifiables par le LLM)
bio_summary = _build_bio_summary(dossier)
if bio_summary:
tagged_str += f"\n\n{bio_summary}"
# Vérification cohérence DAS / biologie
das_bio_warnings = _check_das_bio_coherence(dossier)
if das_bio_warnings:
tagged_str += (
"\n\nALERTES COHÉRENCE DAS / BIOLOGIE (incohérences détectées dans le dossier) :\n"
+ "\n".join(f" - {w}" for w in das_bio_warnings)
+ "\n Prends en compte ces incohérences dans ton analyse."
)
# Sources RAG
sources_text = ""
for i, src in enumerate(sources, 1):
doc_name = {
"cim10": "CIM-10 FR 2026",
"cim10_alpha": "CIM-10 Index Alphabétique 2026",
"guide_methodo": "Guide Méthodologique MCO 2026",
"ccam": "CCAM PMSI V4 2025",
}.get(src.get("document", ""), src.get("document", ""))
code_info = f" (code: {src['code']})" if src.get("code") else ""
page_info = f" [page {src['page']}]" if src.get("page") else ""
sources_text += f"--- Source {i}: {doc_name}{code_info}{page_info} ---\n"
sources_text += (src.get("extrait", "")[:800]) + "\n\n"
# Section pré-analyse (résultat passe 1, si disponible)
extraction_str = ""
if extraction:
ext_lines = []
comp = extraction.get("comprehension_contestation")
if comp:
ext_lines.append(f"Compréhension : {comp}")
elems = extraction.get("elements_cliniques_pertinents", [])
if elems and isinstance(elems, list):
elem_strs = []
for e in elems:
if isinstance(e, dict):
elem_strs.append(f" - [{e.get('tag', '?')}] {e.get('pertinence', '')}")
if elem_strs:
ext_lines.append("Éléments pertinents :\n" + "\n".join(elem_strs))
accords = extraction.get("points_accord_potentiels", [])
if accords and isinstance(accords, list):
ext_lines.append("Points d'accord potentiels : " + " ; ".join(str(a) for a in accords))
codes = extraction.get("codes_en_jeu", {})
if codes and isinstance(codes, dict):
diff = codes.get("difference_cle", "")
if diff:
ext_lines.append(f"Différence clé entre les codages : {diff}")
if ext_lines:
extraction_str = (
"\nPRÉ-ANALYSE (extraction automatique — à utiliser comme base) :\n"
+ "\n".join(ext_lines)
)
prompt = CPAM_ARGUMENTATION.format(
dossier_str=dossier_str,
asymetrie_str=asymetrie_str,
tagged_str=tagged_str,
titre=controle.titre,
arg_ucr=controle.arg_ucr,
decision_ucr=controle.decision_ucr,
codes_str=codes_str,
definitions_str=definitions_str,
sources_text=sources_text,
extraction_str=extraction_str,
)
return prompt, tag_map

139
src/control/cpam_rag.py Normal file
View File

@@ -0,0 +1,139 @@
"""Recherche RAG ciblée pour la contre-argumentation CPAM."""
from __future__ import annotations
import logging
import re
from ..config import ControleCPAM, DossierMedical
from ..medical.cim10_dict import normalize_code, validate_code
logger = logging.getLogger(__name__)
def _search_rag_for_control(controle: ControleCPAM, dossier: DossierMedical) -> list[dict]:
"""Recherche RAG ciblée pour le sujet du désaccord.
Effectue 2-5 recherches ciblées au lieu d'une requête fourre-tout :
1. Codes contestés → règles de codage spécifiques
2. Argument CPAM → passages Guide Méthodo contradictoires
3. Contexte clinique (optionnel) → définitions CIM-10 des codes en jeu
4. Définitions CIM-10 des codes contestés
5. Règles explicitement citées dans l'argument CPAM
Retourne [] si le RAG est indisponible (index absent, modèle embedding
inaccessible, etc.) — la contre-argumentation sera générée sans sources.
"""
try:
from ..medical.rag_search import search_similar_cpam
except Exception:
logger.warning("Index RAG non disponible pour la contre-argumentation")
return []
try:
return _search_rag_queries(controle, dossier, search_similar_cpam)
except Exception:
logger.warning("Erreur RAG pour la contre-argumentation — génération sans sources",
exc_info=True)
return []
def _search_rag_queries(
controle: ControleCPAM,
dossier: DossierMedical,
search_similar_cpam,
) -> list[dict]:
"""Exécute les requêtes RAG (séparé pour permettre un try/except global)."""
all_results: list[dict] = []
# Requête 1 — Codes contestés (règles de codage)
if controle.dp_ucr or controle.da_ucr:
query_parts = []
if controle.dp_ucr:
query_parts.append(f"règles codage {controle.dp_ucr} diagnostic principal")
if controle.da_ucr:
query_parts.append(f"diagnostic associé significatif {controle.da_ucr} CMA")
query_codes = " ".join(query_parts)
results_codes = search_similar_cpam(query_codes, top_k=6)
logger.debug(" RAG requête codes : %d résultats", len(results_codes))
all_results.extend(results_codes)
# Requête 2 — Argument CPAM (recherche dans le Guide Méthodo)
query_parts_arg = []
if controle.titre:
query_parts_arg.append(controle.titre)
arg_short = controle.arg_ucr[:500] if controle.arg_ucr else ""
if arg_short:
query_parts_arg.append(arg_short)
query_arg = " ".join(query_parts_arg)
if query_arg.strip():
results_arg = search_similar_cpam(query_arg, top_k=6)
logger.debug(" RAG requête argument : %d résultats", len(results_arg))
all_results.extend(results_arg)
# Requête 3 — Contexte clinique (définitions CIM-10 des codes en jeu)
if controle.da_ucr and dossier.diagnostic_principal:
dp_text = dossier.diagnostic_principal.texte
das_texts = [
d.texte for d in dossier.diagnostics_associes
if d.cim10_suggestion and controle.da_ucr
and d.cim10_suggestion in controle.da_ucr
]
if das_texts:
query_clinique = f"{dp_text} {' '.join(das_texts)}"
results_clinique = search_similar_cpam(query_clinique, top_k=4)
logger.debug(" RAG requête clinique : %d résultats", len(results_clinique))
all_results.extend(results_clinique)
# Requête 4 — Définitions CIM-10 des codes contestés
contested_codes = []
for field in (controle.dp_ucr, controle.da_ucr, controle.dr_ucr):
if field:
contested_codes.extend(re.split(r"[,;\s]+", field.strip()))
for raw_code in contested_codes:
raw_code = raw_code.strip()
if not raw_code:
continue
norm = normalize_code(raw_code)
is_valid, label = validate_code(norm)
if is_valid and label:
query_def = f"CIM-10 {norm} {label} définition inclusion exclusion"
else:
query_def = f"CIM-10 {norm} définition codage"
results_def = search_similar_cpam(query_def, top_k=3)
logger.debug(" RAG requête CIM-10 %s : %d résultats", norm, len(results_def))
all_results.extend(results_def)
# Requête 5 — Règles explicitement citées dans l'argument CPAM
if controle.arg_ucr:
rule_patterns = [
r'(?:R[eè]gle\s*T?\s*\d+)',
r'(?:Annexe[\s-]*\d+[A-Za-z]*)',
r'(?:Situation de soins?\s+[^.]{5,40})',
]
rules_found = []
for pattern in rule_patterns:
rules_found.extend(re.findall(pattern, controle.arg_ucr, re.IGNORECASE))
if rules_found:
rules_unique = list(dict.fromkeys(rules_found))
query_rules = " ".join(rules_unique) + " guide méthodologique codage PMSI"
results_rules = search_similar_cpam(query_rules, top_k=4)
logger.debug(" RAG requête règles (%s) : %d résultats",
", ".join(rules_unique), len(results_rules))
all_results.extend(results_rules)
if not all_results:
return []
# Fusion : dédupliquer par (document, code, page), garder le meilleur score
seen: dict[tuple, dict] = {}
for r in all_results:
key = (r.get("document"), r.get("code"), r.get("page"))
if key in seen:
if r["score"] > seen[key]["score"]:
seen[key] = r
else:
seen[key] = r
merged = sorted(seen.values(), key=lambda r: r["score"], reverse=True)
return merged[:12]

View File

@@ -1,815 +1,48 @@
"""Génération de contre-argumentation pour les contrôles CPAM via RAG + Ollama."""
"""Génération de contre-argumentation pour les contrôles CPAM via RAG + Ollama.
Orchestrateur principal — délègue aux sous-modules :
- cpam_rag : _search_rag_for_control(), _search_rag_queries()
- cpam_context : _build_cpam_prompt(), _build_tagged_context(), _build_bio_summary(), etc.
- cpam_validation : _validate_adversarial(), _validate_grounding(), _format_response(), etc.
"""
from __future__ import annotations
import logging
import re
from ..config import ControleCPAM, DossierMedical, RAGSource
from ..medical.cim10_dict import normalize_code, validate_code
from ..medical.cim10_extractor import BIO_NORMALS
from ..config import ControleCPAM, DossierMedical, RAGSource, rule_enabled
from ..medical.ollama_client import call_anthropic, call_ollama
from ..prompts import CPAM_EXTRACTION, CPAM_ARGUMENTATION, CPAM_ADVERSARIAL
from ..prompts import CPAM_EXTRACTION
# --- Imports depuis les sous-modules ---
from .cpam_rag import _search_rag_for_control
from .cpam_context import (
_build_cpam_prompt,
_build_tagged_context,
)
from .cpam_validation import (
_validate_adversarial,
_validate_grounding,
_validate_references,
_validate_codes_in_response,
_build_correction_prompt,
_format_response,
)
# Backward compat — sera retiré dans un commit futur
from .cpam_rag import _search_rag_queries # noqa: F401
from .cpam_context import ( # noqa: F401
_get_code_label,
_get_cim10_definitions,
_BIO_INTERPRETATION,
_build_bio_summary,
_check_das_bio_coherence,
)
from .cpam_validation import _CIM10_CODE_RE, _validate_adversarial as _validate_adversarial # noqa: F401
logger = logging.getLogger(__name__)
def _search_rag_for_control(controle: ControleCPAM, dossier: DossierMedical) -> list[dict]:
"""Recherche RAG ciblée pour le sujet du désaccord.
Effectue 2-5 recherches ciblées au lieu d'une requête fourre-tout :
1. Codes contestés → règles de codage spécifiques
2. Argument CPAM → passages Guide Méthodo contradictoires
3. Contexte clinique (optionnel) → définitions CIM-10 des codes en jeu
4. Définitions CIM-10 des codes contestés
5. Règles explicitement citées dans l'argument CPAM
Retourne [] si le RAG est indisponible (index absent, modèle embedding
inaccessible, etc.) — la contre-argumentation sera générée sans sources.
"""
try:
from ..medical.rag_search import search_similar_cpam
except Exception:
logger.warning("Index RAG non disponible pour la contre-argumentation")
return []
try:
return _search_rag_queries(controle, dossier, search_similar_cpam)
except Exception:
logger.warning("Erreur RAG pour la contre-argumentation — génération sans sources",
exc_info=True)
return []
def _search_rag_queries(
controle: ControleCPAM,
dossier: DossierMedical,
search_similar_cpam,
) -> list[dict]:
"""Exécute les requêtes RAG (séparé pour permettre un try/except global)."""
all_results: list[dict] = []
# Requête 1 — Codes contestés (règles de codage)
if controle.dp_ucr or controle.da_ucr:
query_parts = []
if controle.dp_ucr:
query_parts.append(f"règles codage {controle.dp_ucr} diagnostic principal")
if controle.da_ucr:
query_parts.append(f"diagnostic associé significatif {controle.da_ucr} CMA")
query_codes = " ".join(query_parts)
results_codes = search_similar_cpam(query_codes, top_k=6)
logger.debug(" RAG requête codes : %d résultats", len(results_codes))
all_results.extend(results_codes)
# Requête 2 — Argument CPAM (recherche dans le Guide Méthodo)
query_parts_arg = []
if controle.titre:
query_parts_arg.append(controle.titre)
arg_short = controle.arg_ucr[:500] if controle.arg_ucr else ""
if arg_short:
query_parts_arg.append(arg_short)
query_arg = " ".join(query_parts_arg)
if query_arg.strip():
results_arg = search_similar_cpam(query_arg, top_k=6)
logger.debug(" RAG requête argument : %d résultats", len(results_arg))
all_results.extend(results_arg)
# Requête 3 — Contexte clinique (définitions CIM-10 des codes en jeu)
if controle.da_ucr and dossier.diagnostic_principal:
dp_text = dossier.diagnostic_principal.texte
das_texts = [
d.texte for d in dossier.diagnostics_associes
if d.cim10_suggestion and controle.da_ucr
and d.cim10_suggestion in controle.da_ucr
]
if das_texts:
query_clinique = f"{dp_text} {' '.join(das_texts)}"
results_clinique = search_similar_cpam(query_clinique, top_k=4)
logger.debug(" RAG requête clinique : %d résultats", len(results_clinique))
all_results.extend(results_clinique)
# Requête 4 — Définitions CIM-10 des codes contestés
contested_codes = []
for field in (controle.dp_ucr, controle.da_ucr, controle.dr_ucr):
if field:
contested_codes.extend(re.split(r"[,;\s]+", field.strip()))
for raw_code in contested_codes:
raw_code = raw_code.strip()
if not raw_code:
continue
norm = normalize_code(raw_code)
is_valid, label = validate_code(norm)
if is_valid and label:
query_def = f"CIM-10 {norm} {label} définition inclusion exclusion"
else:
query_def = f"CIM-10 {norm} définition codage"
results_def = search_similar_cpam(query_def, top_k=3)
logger.debug(" RAG requête CIM-10 %s : %d résultats", norm, len(results_def))
all_results.extend(results_def)
# Requête 5 — Règles explicitement citées dans l'argument CPAM
if controle.arg_ucr:
rule_patterns = [
r'(?:R[eè]gle\s*T?\s*\d+)',
r'(?:Annexe[\s-]*\d+[A-Za-z]*)',
r'(?:Situation de soins?\s+[^.]{5,40})',
]
rules_found = []
for pattern in rule_patterns:
rules_found.extend(re.findall(pattern, controle.arg_ucr, re.IGNORECASE))
if rules_found:
rules_unique = list(dict.fromkeys(rules_found))
query_rules = " ".join(rules_unique) + " guide méthodologique codage PMSI"
results_rules = search_similar_cpam(query_rules, top_k=4)
logger.debug(" RAG requête règles (%s) : %d résultats",
", ".join(rules_unique), len(results_rules))
all_results.extend(results_rules)
if not all_results:
return []
# Fusion : dédupliquer par (document, code, page), garder le meilleur score
seen: dict[tuple, dict] = {}
for r in all_results:
key = (r.get("document"), r.get("code"), r.get("page"))
if key in seen:
if r["score"] > seen[key]["score"]:
seen[key] = r
else:
seen[key] = r
merged = sorted(seen.values(), key=lambda r: r["score"], reverse=True)
return merged[:12]
def _get_code_label(code_str: str) -> str:
"""Résout le libellé CIM-10 pour un ou plusieurs codes."""
codes = re.split(r"[,;\s]+", code_str.strip())
labels = []
for raw in codes:
raw = raw.strip()
if not raw:
continue
norm = normalize_code(raw)
is_valid, label = validate_code(norm)
if is_valid and label:
labels.append(f"{norm}{label}")
else:
labels.append(norm)
if not labels:
return ""
if len(labels) == 1:
parts = labels[0].split("", 1)
return f"{parts[1]}" if len(parts) > 1 else ""
return "\n " + "\n ".join(labels)
def _get_cim10_definitions(
dossier: DossierMedical,
controle: ControleCPAM,
) -> str:
"""Construit une section de définitions CIM-10 déterministes pour tous les codes en jeu.
Collecte les codes depuis :
- Le dossier : DP (cim10_suggestion) + DAS (cim10_suggestion)
- L'UCR : dp_ucr, da_ucr, dr_ucr
Returns:
Texte formaté pour injection dans le prompt, ou "" si aucun code résolu.
"""
codes_seen: dict[str, str] = {} # code normalisé → rôle (pour affichage)
# Codes du dossier (établissement)
if dossier.diagnostic_principal and dossier.diagnostic_principal.cim10_suggestion:
code = dossier.diagnostic_principal.cim10_suggestion
codes_seen[normalize_code(code)] = "DP établissement"
for das in dossier.diagnostics_associes:
if das.cim10_suggestion:
norm = normalize_code(das.cim10_suggestion)
if norm not in codes_seen:
codes_seen[norm] = "DAS établissement"
# Codes de l'UCR (CPAM)
for field, role in [
(controle.dp_ucr, "DP proposé UCR"),
(controle.da_ucr, "DA proposé UCR"),
(controle.dr_ucr, "DR proposé UCR"),
]:
if not field:
continue
for raw in re.split(r"[,;\s]+", field.strip()):
raw = raw.strip()
if not raw:
continue
norm = normalize_code(raw)
if norm not in codes_seen:
codes_seen[norm] = role
if not codes_seen:
return ""
# Résoudre les libellés
lines = []
for norm_code, role in codes_seen.items():
is_valid, label = validate_code(norm_code)
if is_valid and label:
lines.append(f" {norm_code}{label} [{role}]")
else:
lines.append(f" {norm_code} — (code non trouvé dans le dictionnaire) [{role}]")
if not lines:
return ""
return (
"\nDÉFINITIONS CIM-10 — RÉFÉRENCE (source : dictionnaire officiel) :\n"
+ "\n".join(lines)
)
def _build_tagged_context(dossier: DossierMedical) -> tuple[str, dict[str, str]]:
"""Construit un contexte clinique avec des tags de référence pour le grounding.
Chaque élément clinique reçoit un tag unique ([BIO-1], [IMG-1], [TRT-1], [ACTE-1])
que le LLM doit citer dans ses preuves pour garantir la traçabilité.
Returns:
(texte tagué pour injection dans le prompt, dict tag → contenu original)
"""
tag_map: dict[str, str] = {}
lines: list[str] = []
# Biologie (avec normes de référence pour éviter les hallucinations)
for i, b in enumerate(dossier.biologie_cle, 1):
if not b.valeur:
continue
tag = f"BIO-{i}"
# Interpréter la valeur par rapport aux normes connues
norm_info = ""
if b.test in BIO_NORMALS:
lo, hi = BIO_NORMALS[b.test]
try:
val = float(b.valeur.replace(",", ".").split()[0])
if val > hi:
norm_info = f" — ÉLEVÉ (norme {lo}-{hi})"
elif val < lo:
norm_info = f" — BAS (norme {lo}-{hi})"
else:
norm_info = f" — NORMAL (norme {lo}-{hi})"
except (ValueError, AttributeError):
pass
content = f"{b.test}: {b.valeur}{norm_info}"
tag_map[tag] = content
lines.append(f" [{tag}] {content}")
# Imagerie
for i, im in enumerate(dossier.imagerie, 1):
tag = f"IMG-{i}"
conclusion = f"{im.conclusion}" if im.conclusion else ""
content = f"{im.type}{conclusion}"
tag_map[tag] = content
lines.append(f" [{tag}] {content}")
# Traitements
for i, t in enumerate(dossier.traitements_sortie[:10], 1):
tag = f"TRT-{i}"
posologie = f" {t.posologie}" if t.posologie else ""
content = f"{t.medicament}{posologie}"
tag_map[tag] = content
lines.append(f" [{tag}] {content}")
# Actes CCAM
for i, a in enumerate(dossier.actes_ccam, 1):
tag = f"ACTE-{i}"
code = f" ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else ""
content = f"{a.texte}{code}"
tag_map[tag] = content
lines.append(f" [{tag}] {content}")
if not lines:
return "", tag_map
text = "ÉLÉMENTS CLINIQUES RÉFÉRENCÉS (cite le tag [XX-N] dans tes preuves) :\n" + "\n".join(lines)
return text, tag_map
def _validate_grounding(response_data: dict, tag_map: dict[str, str]) -> list[str]:
"""Vérifie que les références dans preuves_dossier correspondent à des tags existants.
Returns:
Liste de warnings pour les références inventées.
"""
if not tag_map:
return []
warnings: list[str] = []
preuves = response_data.get("preuves_dossier")
if not preuves or not isinstance(preuves, list):
return warnings
for p in preuves:
if not isinstance(p, dict):
continue
ref = p.get("ref", "")
if not ref:
continue
if ref not in tag_map:
valeur = p.get("valeur", "?")
warnings.append(f"Preuve [{ref}] non traçable (« {valeur} »)")
logger.warning("Grounding : preuve [%s] introuvable dans les tags du dossier", ref)
return warnings
def _check_das_bio_coherence(dossier: DossierMedical) -> list[str]:
"""Vérifie la cohérence entre les textes DAS et les valeurs biologiques.
Détecte les contradictions comme "leucocytose" dans un DAS alors que
les leucocytes sont bas, ou "anémie" alors que l'hémoglobine est normale.
Returns:
Liste de warnings pour les incohérences détectées.
"""
if not dossier.diagnostics_associes or not dossier.biologie_cle:
return []
# Patterns DAS → (test bio attendu, direction attendue)
_DAS_BIO_CHECKS: dict[str, tuple[str, str]] = {
"leucocytose": ("Leucocytes", "high"),
"leucopénie": ("Leucocytes", "low"),
"leucopenie": ("Leucocytes", "low"),
"thrombocytose": ("Plaquettes", "high"),
"thrombocytopénie": ("Plaquettes", "low"),
"thrombocytopenie": ("Plaquettes", "low"),
"thrombopénie": ("Plaquettes", "low"),
"thrombopenie": ("Plaquettes", "low"),
"anémie": ("Hémoglobine", "low"),
"anemie": ("Hémoglobine", "low"),
"polyglobulie": ("Hémoglobine", "high"),
"hyperkaliémie": ("Potassium", "high"),
"hypokaliémie": ("Potassium", "low"),
}
# Indexer les valeurs bio disponibles
bio_values: dict[str, float] = {}
for b in dossier.biologie_cle:
if b.test and b.valeur:
try:
bio_values[b.test] = float(b.valeur.replace(",", ".").split()[0])
except (ValueError, AttributeError):
pass
warnings: list[str] = []
for das in dossier.diagnostics_associes:
texte_lower = (das.texte or "").lower()
for pattern, (bio_test, direction) in _DAS_BIO_CHECKS.items():
if pattern not in texte_lower:
continue
if bio_test not in bio_values or bio_test not in BIO_NORMALS:
continue
val = bio_values[bio_test]
lo, hi = BIO_NORMALS[bio_test]
if direction == "high" and val <= hi:
warnings.append(
f"INCOHÉRENCE : DAS « {das.texte} » ({das.cim10_suggestion or '?'}) "
f"mais {bio_test} = {val} est NORMAL (norme {lo}-{hi})"
)
elif direction == "low" and val >= lo:
warnings.append(
f"INCOHÉRENCE : DAS « {das.texte} » ({das.cim10_suggestion or '?'}) "
f"mais {bio_test} = {val} est NORMAL (norme {lo}-{hi})"
)
if warnings:
for w in warnings:
logger.warning(" DAS/bio : %s", w)
return warnings
def _build_cpam_prompt(
dossier: DossierMedical,
controle: ControleCPAM,
sources: list[dict],
extraction: dict | None = None,
) -> tuple[str, dict[str, str]]:
"""Construit le prompt pour la contre-argumentation CPAM.
Args:
extraction: Résultat optionnel de la passe 1 (extraction structurée).
Returns:
(prompt texte, tag_map pour validation grounding)
"""
# Résumé du dossier médical
dossier_lines = []
if dossier.diagnostic_principal:
dp = dossier.diagnostic_principal
dp_code = f" ({dp.cim10_suggestion})" if dp.cim10_suggestion else ""
dossier_lines.append(f"- DP : {dp.texte}{dp_code}")
elif controle.dp_ucr:
dp_label = _get_code_label(controle.dp_ucr)
dossier_lines.append(
f"- DP : code {controle.dp_ucr}{dp_label} "
f"(codé par l'établissement, contesté par la CPAM)"
)
if dossier.diagnostics_associes:
das_parts = []
for das in dossier.diagnostics_associes:
code = f" ({das.cim10_suggestion})" if das.cim10_suggestion else ""
das_parts.append(f"{das.texte}{code}")
dossier_lines.append(f"- DAS : {', '.join(das_parts)}")
if dossier.actes_ccam:
actes = [f"{a.texte} ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else a.texte
for a in dossier.actes_ccam]
dossier_lines.append(f"- Actes CCAM : {', '.join(actes)}")
sejour = dossier.sejour
if sejour.duree_sejour is not None:
dossier_lines.append(f"- Durée séjour : {sejour.duree_sejour} jours")
if sejour.sexe or sejour.age is not None:
patient_info = []
if sejour.sexe:
patient_info.append(sejour.sexe)
if sejour.age is not None:
patient_info.append(f"{sejour.age} ans")
if sejour.age < 18:
patient_info.append("(PÉDIATRIE — codage pédiatrique applicable)")
elif sejour.age >= 80:
patient_info.append("(patient âgé — comorbidités fréquentes)")
dossier_lines.append(f"- Patient : {', '.join(patient_info)}")
if sejour.mode_entree:
mode_label = sejour.mode_entree
if "urgence" in mode_label.lower() or "urgent" in mode_label.lower():
dossier_lines.append(f"- Mode d'entrée : {mode_label} (ADMISSION EN URGENCE)")
else:
dossier_lines.append(f"- Mode d'entrée : {mode_label}")
if sejour.mode_sortie:
dossier_lines.append(f"- Mode de sortie : {sejour.mode_sortie}")
if sejour.imc is not None:
dossier_lines.append(f"- IMC : {sejour.imc}")
if dossier.biologie_cle:
bio = [f"{b.test}: {b.valeur}" for b in dossier.biologie_cle[:5] if b.valeur]
if bio:
dossier_lines.append(f"- Biologie clé : {', '.join(bio)}")
if dossier.imagerie:
img_parts = []
for im in dossier.imagerie:
conclusion = f"{im.conclusion}" if im.conclusion else ""
img_parts.append(f"{im.type}{conclusion}")
dossier_lines.append(f"- Imagerie : {', '.join(img_parts)}")
if dossier.traitements_sortie:
trt_parts = []
for t in dossier.traitements_sortie[:10]:
posologie = f" {t.posologie}" if t.posologie else ""
trt_parts.append(f"{t.medicament}{posologie}")
dossier_lines.append(f"- Traitements de sortie : {', '.join(trt_parts)}")
if dossier.antecedents:
dossier_lines.append(f"- Antécédents : {', '.join(a.texte for a in dossier.antecedents[:10])}")
if dossier.complications:
dossier_lines.append(f"- Complications : {', '.join(c.texte for c in dossier.complications)}")
dossier_str = "\n".join(dossier_lines) if dossier_lines else "Non disponible"
# Section asymétrie : éléments que la CPAM n'avait pas
asymetrie_lines = []
if dossier.biologie_cle:
bio_details = []
for b in dossier.biologie_cle if len(dossier.biologie_cle) <= 10 else dossier.biologie_cle[:10]:
anomalie = " (anormale)" if b.anomalie else ""
if b.valeur:
bio_details.append(f"{b.test}: {b.valeur}{anomalie}")
if bio_details:
asymetrie_lines.append(f"- Biologie : {', '.join(bio_details)}")
if dossier.imagerie:
img_details = []
for im in dossier.imagerie:
conclusion = f"{im.conclusion}" if im.conclusion else ""
img_details.append(f"{im.type}{conclusion}")
if img_details:
asymetrie_lines.append(f"- Imagerie : {', '.join(img_details)}")
if dossier.traitements_sortie:
trt_details = []
for t in dossier.traitements_sortie[:10]:
posologie = f" {t.posologie}" if t.posologie else ""
trt_details.append(f"{t.medicament}{posologie}")
if trt_details:
asymetrie_lines.append(f"- Traitements : {', '.join(trt_details)}")
if dossier.actes_ccam:
actes_details = [
f"{a.texte} ({a.code_ccam_suggestion})" if a.code_ccam_suggestion else a.texte
for a in dossier.actes_ccam
]
if actes_details:
asymetrie_lines.append(f"- Actes CCAM : {', '.join(actes_details)}")
asymetrie_str = ""
if asymetrie_lines:
asymetrie_str = (
"\n\nÉLÉMENTS DU DOSSIER NON TRANSMIS À LA CPAM "
"(l'UCR n'a eu que le CRH et les codes) :\n"
+ "\n".join(asymetrie_lines)
)
# Codes contestés par la CPAM (avec libellés CIM-10 résolus)
codes_contestes = []
if controle.dp_ucr:
codes_contestes.append(f"DP proposé par UCR : {controle.dp_ucr}{_get_code_label(controle.dp_ucr)}")
if controle.da_ucr:
codes_contestes.append(f"DA proposés par UCR : {controle.da_ucr}{_get_code_label(controle.da_ucr)}")
if controle.dr_ucr:
codes_contestes.append(f"DR proposé par UCR : {controle.dr_ucr}{_get_code_label(controle.dr_ucr)}")
if controle.actes_ucr:
codes_contestes.append(f"Actes proposés par UCR : {controle.actes_ucr}")
codes_str = "\n".join(codes_contestes) if codes_contestes else "Aucun code spécifique proposé"
# Définitions CIM-10 déterministes (tous les codes en jeu)
definitions_str = _get_cim10_definitions(dossier, controle)
# Contexte clinique tagué pour le grounding
tagged_context, tag_map = _build_tagged_context(dossier)
if tagged_context:
tagged_str = f"\n\n{tagged_context}"
else:
tagged_str = (
"\n\nATTENTION — DOSSIER PAUVRE EN ÉLÉMENTS CLINIQUES :\n"
"Aucune biologie, imagerie, traitement ou acte CCAM disponible.\n"
"Ne spécule PAS sur des éléments absents. Signale explicitement "
"le manque de données au lieu d'inventer des preuves."
)
# Vérification cohérence DAS / biologie
das_bio_warnings = _check_das_bio_coherence(dossier)
if das_bio_warnings:
tagged_str += (
"\n\nALERTES COHÉRENCE DAS / BIOLOGIE (incohérences détectées dans le dossier) :\n"
+ "\n".join(f" - {w}" for w in das_bio_warnings)
+ "\n Prends en compte ces incohérences dans ton analyse."
)
# Sources RAG
sources_text = ""
for i, src in enumerate(sources, 1):
doc_name = {
"cim10": "CIM-10 FR 2026",
"cim10_alpha": "CIM-10 Index Alphabétique 2026",
"guide_methodo": "Guide Méthodologique MCO 2026",
"ccam": "CCAM PMSI V4 2025",
}.get(src.get("document", ""), src.get("document", ""))
code_info = f" (code: {src['code']})" if src.get("code") else ""
page_info = f" [page {src['page']}]" if src.get("page") else ""
sources_text += f"--- Source {i}: {doc_name}{code_info}{page_info} ---\n"
sources_text += (src.get("extrait", "")[:800]) + "\n\n"
# Section pré-analyse (résultat passe 1, si disponible)
extraction_str = ""
if extraction:
ext_lines = []
comp = extraction.get("comprehension_contestation")
if comp:
ext_lines.append(f"Compréhension : {comp}")
elems = extraction.get("elements_cliniques_pertinents", [])
if elems and isinstance(elems, list):
elem_strs = []
for e in elems:
if isinstance(e, dict):
elem_strs.append(f" - [{e.get('tag', '?')}] {e.get('pertinence', '')}")
if elem_strs:
ext_lines.append("Éléments pertinents :\n" + "\n".join(elem_strs))
accords = extraction.get("points_accord_potentiels", [])
if accords and isinstance(accords, list):
ext_lines.append("Points d'accord potentiels : " + " ; ".join(str(a) for a in accords))
codes = extraction.get("codes_en_jeu", {})
if codes and isinstance(codes, dict):
diff = codes.get("difference_cle", "")
if diff:
ext_lines.append(f"Différence clé entre les codages : {diff}")
if ext_lines:
extraction_str = (
"\nPRÉ-ANALYSE (extraction automatique — à utiliser comme base) :\n"
+ "\n".join(ext_lines)
)
prompt = CPAM_ARGUMENTATION.format(
dossier_str=dossier_str,
asymetrie_str=asymetrie_str,
tagged_str=tagged_str,
titre=controle.titre,
arg_ucr=controle.arg_ucr,
decision_ucr=controle.decision_ucr,
codes_str=codes_str,
definitions_str=definitions_str,
sources_text=sources_text,
extraction_str=extraction_str,
)
return prompt, tag_map
def _validate_references(parsed: dict, sources: list[dict]) -> list[str]:
"""Vérifie que les références citées correspondent aux sources RAG fournies.
Returns:
Liste d'avertissements pour les références non vérifiables.
"""
warnings = []
refs = parsed.get("references")
if not refs or not isinstance(refs, list):
return warnings
# Construire un set des documents sources disponibles
source_docs = set()
for src in sources:
doc_name = src.get("document", "")
source_docs.add(doc_name)
# Ajouter les noms lisibles aussi
readable = {
"cim10": "CIM-10 FR 2026",
"cim10_alpha": "CIM-10 Index Alphabétique 2026",
"guide_methodo": "Guide Méthodologique MCO 2026",
"ccam": "CCAM PMSI V4 2025",
}.get(doc_name, "")
if readable:
source_docs.add(readable)
source_docs.add(readable.lower())
if not source_docs:
return warnings
for ref in refs:
if not isinstance(ref, dict):
continue
doc = ref.get("document", "")
if doc and not any(sd in doc.lower() or doc.lower() in sd.lower() for sd in source_docs if sd):
warnings.append(f"Référence non vérifiable : {doc}")
logger.warning("CPAM : référence non vérifiable « %s »", doc)
return warnings
def _format_response(parsed: dict, ref_warnings: list[str] | None = None) -> str:
"""Formate la réponse LLM en texte lisible."""
sections = []
analyse = parsed.get("analyse_contestation")
if analyse:
sections.append(f"ANALYSE DE LA CONTESTATION\n{analyse}")
accord = parsed.get("points_accord")
if accord and accord.lower() not in ("aucun", "non applicable", "n/a", ""):
sections.append(f"POINTS D'ACCORD\n{accord}")
# Nouveaux champs structurés par axe
contre_med = parsed.get("contre_arguments_medicaux")
if contre_med:
sections.append(f"CONTRE-ARGUMENTS MÉDICAUX\n{contre_med}")
# Preuves du dossier (nouveau champ structuré)
preuves = parsed.get("preuves_dossier")
if preuves and isinstance(preuves, list):
preuves_lines = []
for p in preuves:
if isinstance(p, dict):
ref = p.get("ref", "")
elem = p.get("element", "")
valeur = p.get("valeur", "")
signif = p.get("signification", "")
ref_prefix = f"[{ref}] " if ref else ""
preuves_lines.append(f"- {ref_prefix}[{elem}] {valeur}{signif}")
if preuves_lines:
sections.append(f"PREUVES DU DOSSIER\n" + "\n".join(preuves_lines))
contre_asym = parsed.get("contre_arguments_asymetrie")
if contre_asym:
sections.append(f"ASYMÉTRIE D'INFORMATION\n{contre_asym}")
contre_regl = parsed.get("contre_arguments_reglementaires")
if contre_regl:
sections.append(f"CONTRE-ARGUMENTS RÉGLEMENTAIRES\n{contre_regl}")
# Fallback : ancien champ unique (réponses en cache existantes)
if not contre_med and not contre_asym and not contre_regl:
contre = parsed.get("contre_arguments")
if contre:
sections.append(f"CONTRE-ARGUMENTS\n{contre}")
# Références structurées (nouveau format liste) ou ancien format string
refs = parsed.get("references")
if refs:
if isinstance(refs, list):
ref_lines = []
for r in refs:
if isinstance(r, dict):
doc = r.get("document", "")
page = r.get("page", "")
citation = r.get("citation", "")
ref_lines.append(f"- [{doc}, p.{page}] {citation}")
else:
ref_lines.append(f"- {r}")
if ref_lines:
sections.append(f"REFERENCES\n" + "\n".join(ref_lines))
else:
sections.append(f"REFERENCES\n{refs}")
conclusion = parsed.get("conclusion")
if conclusion:
sections.append(f"CONCLUSION\n{conclusion}")
# Avertissements sur les références non vérifiables
if ref_warnings:
warning_text = "\n".join(f"- {w}" for w in ref_warnings)
sections.append(f"AVERTISSEMENT — REFERENCES NON VÉRIFIÉES\n{warning_text}")
return "\n\n".join(sections)
def _validate_adversarial(
response_data: dict,
tag_map: dict[str, str],
controle: ControleCPAM,
) -> dict | None:
"""Validation adversariale — vérifie la cohérence de la contre-argumentation.
Un appel LLM de relecture critique vérifie :
1. Les valeurs cliniques citées correspondent aux éléments tagués du dossier
2. La conclusion est cohérente avec l'argumentation
3. Les points d'accord ne contredisent pas la contre-argumentation
4. Les codes CIM-10 cités sont cohérents
Returns:
dict {"coherent": bool, "erreurs": list[str], "score_confiance": int} ou None si échec.
"""
import json as _json
# Construire le résumé des éléments factuels disponibles
if tag_map:
factual_lines = "\n".join(f" [{tag}] {content}" for tag, content in tag_map.items())
factual_section = f"ÉLÉMENTS FACTUELS DU DOSSIER :\n{factual_lines}"
else:
factual_section = "ÉLÉMENTS FACTUELS DU DOSSIER : aucun élément tagué disponible"
# Sérialiser la réponse LLM de façon compacte
try:
response_json = _json.dumps(response_data, ensure_ascii=False, indent=None)
# Tronquer si trop long pour le prompt de validation
if len(response_json) > 3000:
response_json = response_json[:3000] + "..."
except (TypeError, ValueError):
logger.warning("Validation adversariale : impossible de sérialiser la réponse")
return None
# Normes biologiques pour vérifier les interprétations
normes_lines = []
for test, (lo, hi) in BIO_NORMALS.items():
normes_lines.append(f" {test}: {lo}-{hi}")
normes_section = "NORMES BIOLOGIQUES DE RÉFÉRENCE :\n" + "\n".join(normes_lines)
dp_ucr_line = f"DP UCR : {controle.dp_ucr}" if controle.dp_ucr else ""
da_ucr_line = f"DA UCR : {controle.da_ucr}" if controle.da_ucr else ""
prompt = CPAM_ADVERSARIAL.format(
response_json=response_json,
factual_section=factual_section,
normes_section=normes_section,
dp_ucr_line=dp_ucr_line,
da_ucr_line=da_ucr_line,
)
logger.debug(" Validation adversariale")
result = call_ollama(prompt, temperature=0.0, max_tokens=800, role="validation")
if result is None:
result = call_anthropic(prompt, temperature=0.0, max_tokens=800)
if result is None:
logger.warning(" Validation adversariale échouée — LLM indisponible")
return None
coherent = result.get("coherent", True)
erreurs = result.get("erreurs", [])
score = result.get("score_confiance", -1)
if not coherent and erreurs:
logger.warning(" Validation adversariale : %d incohérence(s) détectée(s) (score %s/10)",
len(erreurs), score)
for e in erreurs:
logger.warning(" - %s", e)
else:
logger.info(" Validation adversariale OK (score %s/10)", score)
return result
def _extraction_pass(
dossier: DossierMedical,
controle: ControleCPAM,
@@ -926,6 +159,11 @@ def generate_cpam_response(
if grounding_warnings:
logger.warning(" CPAM : %d preuve(s) non traçable(s)", len(grounding_warnings))
# 7b. Validation codes fermée (périmètre dossier + UCR)
code_warnings = _validate_codes_in_response(result, dossier, controle)
if code_warnings:
logger.warning(" CPAM : %d code(s) hors périmètre", len(code_warnings))
# 8. Validation adversariale (cohérence factuelle)
adversarial_warnings: list[str] = []
validation = _validate_adversarial(result, tag_map, controle)
@@ -938,7 +176,49 @@ def generate_cpam_response(
if adversarial_warnings:
adversarial_warnings.append(f"Score de confiance : {score}/10")
all_warnings = ref_warnings + grounding_warnings + adversarial_warnings
# 8b. Boucle de correction (max 1 retry)
if (validation
and not validation.get("coherent", True)
and validation.get("score_confiance", 10) <= 5
and rule_enabled("RULE-CPAM-CORRECTION-LOOP")):
erreurs_v = validation.get("erreurs", [])
logger.warning(" Score adversarial %s/10 — correction en cours (%d erreur(s))",
validation.get("score_confiance"), len(erreurs_v))
correction_prompt = _build_correction_prompt(prompt, result, validation)
corrected = call_ollama(correction_prompt, temperature=0.0, max_tokens=6000, role="cpam")
if corrected is None:
corrected = call_anthropic(correction_prompt, temperature=0.0, max_tokens=6000)
if corrected:
# Re-valider la correction
validation2 = _validate_adversarial(corrected, tag_map, controle)
score2 = validation2.get("score_confiance", 0) if validation2 else 0
score1 = validation.get("score_confiance", 0)
if score2 > score1:
logger.info(" Correction acceptée (score %s%s)", score1, score2)
result = corrected
validation = validation2
# Recalculer les warnings
ref_warnings = _validate_references(result, sources)
grounding_warnings = _validate_grounding(result, tag_map)
code_warnings = _validate_codes_in_response(result, dossier, controle)
adversarial_warnings = []
if validation and not validation.get("coherent", True):
for e in validation.get("erreurs", []):
if isinstance(e, str) and e.strip():
adversarial_warnings.append(f"Incohérence détectée : {e}")
if adversarial_warnings:
adversarial_warnings.append(
f"Score de confiance : {validation.get('score_confiance', '?')}/10"
)
else:
logger.warning(" Correction rejetée (score %s%s) — conserve l'original",
score1, score2)
all_warnings = ref_warnings + grounding_warnings + code_warnings + adversarial_warnings
# 9. Formater la réponse
text = _format_response(result, all_warnings)

View File

@@ -0,0 +1,376 @@
"""Validation et formatage des réponses CPAM (grounding, adversariale, codes)."""
from __future__ import annotations
import logging
import re
from ..config import ControleCPAM, DossierMedical
from ..medical.bio_normals import BIO_NORMALS
from ..medical.cim10_dict import normalize_code, validate_code
from ..medical.ollama_client import call_anthropic, call_ollama
from ..prompts import CPAM_ADVERSARIAL
logger = logging.getLogger(__name__)
def _validate_grounding(response_data: dict, tag_map: dict[str, str]) -> list[str]:
"""Vérifie que les références dans preuves_dossier correspondent à des tags existants.
Returns:
Liste de warnings pour les références inventées.
"""
if not tag_map:
return []
warnings: list[str] = []
preuves = response_data.get("preuves_dossier")
if not preuves or not isinstance(preuves, list):
return warnings
for p in preuves:
if not isinstance(p, dict):
continue
ref = p.get("ref", "")
if not ref:
continue
if ref not in tag_map:
valeur = p.get("valeur", "?")
warnings.append(f"Preuve [{ref}] non traçable (« {valeur} »)")
logger.warning("Grounding : preuve [%s] introuvable dans les tags du dossier", ref)
return warnings
def _validate_references(parsed: dict, sources: list[dict]) -> list[str]:
"""Vérifie que les références citées correspondent aux sources RAG fournies.
Returns:
Liste d'avertissements pour les références non vérifiables.
"""
warnings = []
refs = parsed.get("references")
if not refs or not isinstance(refs, list):
return warnings
# Construire un set des documents sources disponibles
source_docs = set()
for src in sources:
doc_name = src.get("document", "")
source_docs.add(doc_name)
# Ajouter les noms lisibles aussi
readable = {
"cim10": "CIM-10 FR 2026",
"cim10_alpha": "CIM-10 Index Alphabétique 2026",
"guide_methodo": "Guide Méthodologique MCO 2026",
"ccam": "CCAM PMSI V4 2025",
}.get(doc_name, "")
if readable:
source_docs.add(readable)
source_docs.add(readable.lower())
if not source_docs:
return warnings
for ref in refs:
if not isinstance(ref, dict):
continue
doc = ref.get("document", "")
if doc and not any(sd in doc.lower() or doc.lower() in sd.lower() for sd in source_docs if sd):
warnings.append(f"Référence non vérifiable : {doc}")
logger.warning("CPAM : référence non vérifiable « %s »", doc)
return warnings
# Regex pour capturer les codes CIM-10 (ex: K81.0, E87, Z45.80)
_CIM10_CODE_RE = re.compile(r"\b([A-Z]\d{2}\.?\d{0,2})\b")
def _validate_codes_in_response(
parsed: dict,
dossier: DossierMedical,
controle: ControleCPAM,
) -> list[str]:
"""Vérifie que les codes CIM-10 cités dans la réponse sont dans le périmètre du dossier.
Construit une whitelist à partir du dossier (DP, DAS) et de l'UCR (dp_ucr, da_ucr, dr_ucr),
puis extrait tous les codes CIM-10 des champs textuels de la réponse LLM.
La comparaison se fait par préfixe 3 caractères (ex: K81 matche K81.0 et K81.09).
Returns:
Liste de warnings pour les codes hors périmètre.
"""
# 1. Construire la whitelist (préfixes 3 chars)
whitelist_prefixes: set[str] = set()
def _add_code(raw: str) -> None:
raw = raw.strip()
if not raw:
return
norm = normalize_code(raw)
if norm and len(norm) >= 3:
whitelist_prefixes.add(norm[:3])
# Codes du dossier
if dossier.diagnostic_principal and dossier.diagnostic_principal.cim10_suggestion:
_add_code(dossier.diagnostic_principal.cim10_suggestion)
for das in dossier.diagnostics_associes:
if das.cim10_suggestion:
_add_code(das.cim10_suggestion)
# Codes de l'UCR
for field in (controle.dp_ucr, controle.da_ucr, controle.dr_ucr):
if not field:
continue
for raw in re.split(r"[,;\s]+", field.strip()):
_add_code(raw)
if not whitelist_prefixes:
return []
# 2. Extraire les codes CIM-10 de la réponse LLM (hors citations RAG)
text_fields = []
for key in (
"analyse_contestation",
"contre_arguments_medicaux",
"contre_arguments_asymetrie",
"contre_arguments_reglementaires",
"conclusion",
):
val = parsed.get(key)
if val and isinstance(val, str):
text_fields.append(val)
# Preuves du dossier — valeurs
preuves = parsed.get("preuves_dossier")
if preuves and isinstance(preuves, list):
for p in preuves:
if isinstance(p, dict):
v = p.get("valeur", "")
if v and isinstance(v, str):
text_fields.append(v)
combined_text = "\n".join(text_fields)
found_codes = _CIM10_CODE_RE.findall(combined_text)
if not found_codes:
return []
# 3. Comparer par préfixe 3 chars
warnings: list[str] = []
seen_warned: set[str] = set()
for raw_code in found_codes:
norm = normalize_code(raw_code)
if not norm or len(norm) < 3:
continue
prefix = norm[:3]
if prefix in whitelist_prefixes:
continue
if norm in seen_warned:
continue
seen_warned.add(norm)
is_valid, label = validate_code(norm)
label_str = f" ({label})" if is_valid and label else ""
warnings.append(f"Code {norm}{label_str} hors périmètre dossier/UCR")
logger.warning("CPAM : code %s%s absent du dossier et de l'UCR", norm, label_str)
return warnings
def _validate_adversarial(
response_data: dict,
tag_map: dict[str, str],
controle: ControleCPAM,
) -> dict | None:
"""Validation adversariale — vérifie la cohérence de la contre-argumentation.
Un appel LLM de relecture critique vérifie :
1. Les valeurs cliniques citées correspondent aux éléments tagués du dossier
2. La conclusion est cohérente avec l'argumentation
3. Les points d'accord ne contredisent pas la contre-argumentation
4. Les codes CIM-10 cités sont cohérents
Returns:
dict {"coherent": bool, "erreurs": list[str], "score_confiance": int} ou None si échec.
"""
import json as _json
# Construire le résumé des éléments factuels disponibles
if tag_map:
factual_lines = "\n".join(f" [{tag}] {content}" for tag, content in tag_map.items())
factual_section = f"ÉLÉMENTS FACTUELS DU DOSSIER :\n{factual_lines}"
else:
factual_section = "ÉLÉMENTS FACTUELS DU DOSSIER : aucun élément tagué disponible"
# Sérialiser la réponse LLM de façon compacte
try:
response_json = _json.dumps(response_data, ensure_ascii=False, indent=None)
# Tronquer si trop long pour le prompt de validation
if len(response_json) > 3000:
response_json = response_json[:3000] + "..."
except (TypeError, ValueError):
logger.warning("Validation adversariale : impossible de sérialiser la réponse")
return None
# Normes biologiques pour vérifier les interprétations
normes_lines = []
for test, (lo, hi) in BIO_NORMALS.items():
normes_lines.append(f" {test}: {lo}-{hi}")
normes_section = "NORMES BIOLOGIQUES DE RÉFÉRENCE :\n" + "\n".join(normes_lines)
dp_ucr_line = f"DP UCR : {controle.dp_ucr}" if controle.dp_ucr else ""
da_ucr_line = f"DA UCR : {controle.da_ucr}" if controle.da_ucr else ""
prompt = CPAM_ADVERSARIAL.format(
response_json=response_json,
factual_section=factual_section,
normes_section=normes_section,
dp_ucr_line=dp_ucr_line,
da_ucr_line=da_ucr_line,
)
logger.debug(" Validation adversariale")
result = call_ollama(prompt, temperature=0.0, max_tokens=800, role="validation")
if result is None:
result = call_anthropic(prompt, temperature=0.0, max_tokens=800)
if result is None:
logger.warning(" Validation adversariale échouée — LLM indisponible")
return None
coherent = result.get("coherent", True)
erreurs = result.get("erreurs", [])
score = result.get("score_confiance", -1)
if not coherent and erreurs:
logger.warning(" Validation adversariale : %d incohérence(s) détectée(s) (score %s/10)",
len(erreurs), score)
for e in erreurs:
logger.warning(" - %s", e)
else:
logger.info(" Validation adversariale OK (score %s/10)", score)
return result
def _build_correction_prompt(
original_prompt: str,
original_response: dict,
adversarial_result: dict,
) -> str:
"""Construit un prompt de correction en injectant les erreurs détectées.
Args:
original_prompt: Le prompt d'argumentation initial.
original_response: La réponse LLM originale (dict).
adversarial_result: Le résultat de la validation adversariale.
Returns:
Prompt de correction prêt à envoyer au LLM.
"""
import json as _json
erreurs = adversarial_result.get("erreurs", [])
erreurs_text = "\n".join(f" {i}. {e}" for i, e in enumerate(erreurs, 1))
# Résumé compact de la réponse problématique
summary_fields = {}
for key in ("analyse_contestation", "contre_arguments_medicaux",
"contre_arguments_asymetrie", "contre_arguments_reglementaires",
"conclusion"):
val = original_response.get(key)
if val and isinstance(val, str):
# Tronquer chaque champ à 400 chars
summary_fields[key] = val[:400] + ("..." if len(val) > 400 else "")
try:
response_summary = _json.dumps(summary_fields, ensure_ascii=False, indent=2)
except (TypeError, ValueError):
response_summary = str(summary_fields)
correction_block = (
"\n\n=== CORRECTION REQUISE — ERREURS DÉTECTÉES DANS TA RÉPONSE PRÉCÉDENTE ===\n"
f"{erreurs_text}\n\n"
f"RÉPONSE PRÉCÉDENTE (À CORRIGER) :\n{response_summary}\n\n"
"Corrige UNIQUEMENT les erreurs ci-dessus. Conserve les parties correctes.\n"
"Réponds avec le même format JSON."
)
return original_prompt + correction_block
def _format_response(parsed: dict, ref_warnings: list[str] | None = None) -> str:
"""Formate la réponse LLM en texte lisible."""
sections = []
analyse = parsed.get("analyse_contestation")
if analyse:
sections.append(f"ANALYSE DE LA CONTESTATION\n{analyse}")
accord = parsed.get("points_accord")
if accord and accord.lower() not in ("aucun", "non applicable", "n/a", ""):
sections.append(f"POINTS D'ACCORD\n{accord}")
# Nouveaux champs structurés par axe
contre_med = parsed.get("contre_arguments_medicaux")
if contre_med:
sections.append(f"CONTRE-ARGUMENTS MÉDICAUX\n{contre_med}")
# Preuves du dossier (nouveau champ structuré)
preuves = parsed.get("preuves_dossier")
if preuves and isinstance(preuves, list):
preuves_lines = []
for p in preuves:
if isinstance(p, dict):
ref = p.get("ref", "")
elem = p.get("element", "")
valeur = p.get("valeur", "")
signif = p.get("signification", "")
ref_prefix = f"[{ref}] " if ref else ""
preuves_lines.append(f"- {ref_prefix}[{elem}] {valeur}{signif}")
if preuves_lines:
sections.append(f"PREUVES DU DOSSIER\n" + "\n".join(preuves_lines))
contre_asym = parsed.get("contre_arguments_asymetrie")
if contre_asym:
sections.append(f"ASYMÉTRIE D'INFORMATION\n{contre_asym}")
contre_regl = parsed.get("contre_arguments_reglementaires")
if contre_regl:
sections.append(f"CONTRE-ARGUMENTS RÉGLEMENTAIRES\n{contre_regl}")
# Fallback : ancien champ unique (réponses en cache existantes)
if not contre_med and not contre_asym and not contre_regl:
contre = parsed.get("contre_arguments")
if contre:
sections.append(f"CONTRE-ARGUMENTS\n{contre}")
# Références structurées (nouveau format liste) ou ancien format string
refs = parsed.get("references")
if refs:
if isinstance(refs, list):
ref_lines = []
for r in refs:
if isinstance(r, dict):
doc = r.get("document", "")
page = r.get("page", "")
citation = r.get("citation", "")
ref_lines.append(f"- [{doc}, p.{page}] {citation}")
else:
ref_lines.append(f"- {r}")
if ref_lines:
sections.append(f"REFERENCES\n" + "\n".join(ref_lines))
else:
sections.append(f"REFERENCES\n{refs}")
conclusion = parsed.get("conclusion")
if conclusion:
sections.append(f"CONCLUSION\n{conclusion}")
# Avertissements sur les références non vérifiables
if ref_warnings:
warning_text = "\n".join(f"- {w}" for w in ref_warnings)
sections.append(f"AVERTISSEMENT — REFERENCES NON VÉRIFIÉES\n{warning_text}")
return "\n\n".join(sections)

View File

@@ -16,6 +16,8 @@ from src.config import (
Traitement,
)
from src.control.cpam_response import (
_build_bio_summary,
_build_correction_prompt,
_build_cpam_prompt,
_build_tagged_context,
_check_das_bio_coherence,
@@ -25,6 +27,7 @@ from src.control.cpam_response import (
_get_code_label,
_search_rag_for_control,
_validate_adversarial,
_validate_codes_in_response,
_validate_grounding,
_validate_references,
generate_cpam_response,
@@ -206,8 +209,8 @@ class TestBuildPrompt:
assert "preuves_dossier" in prompt
@patch("src.control.cpam_response.validate_code", return_value=(True, "Iléus paralytique et obstruction intestinale"))
@patch("src.control.cpam_response.normalize_code", return_value="K56.0")
@patch("src.control.cpam_context.validate_code", return_value=(True, "Iléus paralytique et obstruction intestinale"))
@patch("src.control.cpam_context.normalize_code", return_value="K56.0")
def test_prompt_codes_with_cim10_labels(self, mock_norm, mock_valid):
"""Les codes contestés affichent le libellé CIM-10."""
dossier = _make_dossier()
@@ -217,8 +220,8 @@ class TestBuildPrompt:
assert "Iléus paralytique" in prompt
assert "DA proposés par UCR" in prompt
@patch("src.control.cpam_response.validate_code", return_value=(False, ""))
@patch("src.control.cpam_response.normalize_code", return_value="Z99.9")
@patch("src.control.cpam_context.validate_code", return_value=(False, ""))
@patch("src.control.cpam_context.normalize_code", return_value="Z99.9")
def test_prompt_codes_invalid_graceful(self, mock_norm, mock_valid):
"""Les codes invalides ne crashent pas, juste pas de libellé."""
dossier = _make_dossier()
@@ -231,8 +234,8 @@ class TestBuildPrompt:
assert "Z99.9" in prompt
# Pas de crash
@patch("src.control.cpam_response.validate_code", return_value=(True, "Ajustement et entretien d'un dispositif implantable"))
@patch("src.control.cpam_response.normalize_code", return_value="Z45.8")
@patch("src.control.cpam_context.validate_code", return_value=(True, "Ajustement et entretien d'un dispositif implantable"))
@patch("src.control.cpam_context.normalize_code", return_value="Z45.8")
def test_prompt_dp_fallback_from_ucr(self, mock_norm, mock_valid):
"""DP absent + dp_ucr → contexte injecté dans le prompt."""
dossier = DossierMedical(
@@ -397,10 +400,11 @@ class TestValidateReferences:
class TestGenerateResponse:
@patch("src.control.cpam_validation.call_ollama")
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_generate_success_ollama_cpam(self, mock_rag, mock_anthropic, mock_ollama):
def test_generate_success_ollama_cpam(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama):
"""Ollama disponible → 3 passes (extraction + argumentation + validation)."""
mock_rag.return_value = [
{"document": "guide_methodo", "page": 64, "extrait": "Texte guide"},
@@ -422,6 +426,7 @@ class TestGenerateResponse:
return {"coherent": True, "erreurs": [], "score_confiance": 9}
mock_ollama.side_effect = ollama_side_effect
mock_val_ollama.side_effect = ollama_side_effect
dossier = _make_dossier()
controle = _make_controle()
@@ -434,18 +439,21 @@ class TestGenerateResponse:
assert len(sources) == 1
assert sources[0].document == "guide_methodo"
# 3 appels Ollama : extraction + argumentation + validation
assert mock_ollama.call_count == 3
assert call_count["n"] == 3
mock_anthropic.assert_not_called()
@patch("src.control.cpam_validation.call_anthropic")
@patch("src.control.cpam_validation.call_ollama")
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_generate_fallback_haiku(self, mock_rag, mock_anthropic, mock_ollama):
def test_generate_fallback_haiku(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_val_anthropic):
"""Ollama indisponible → fallback Haiku pour les 3 passes."""
mock_rag.return_value = [
{"document": "guide_methodo", "page": 64, "extrait": "Texte guide"},
]
mock_ollama.return_value = None
mock_val_ollama.return_value = None
call_count = {"n": 0}
def anthropic_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
@@ -462,6 +470,7 @@ class TestGenerateResponse:
return {"coherent": True, "erreurs": [], "score_confiance": 8}
mock_anthropic.side_effect = anthropic_side_effect
mock_val_anthropic.side_effect = anthropic_side_effect
dossier = _make_dossier()
controle = _make_controle()
@@ -470,15 +479,15 @@ class TestGenerateResponse:
assert "Contre-args Haiku..." in text
assert response_data is not None
# Ollama appelé 3 fois mais retourne None
assert mock_ollama.call_count == 3
# Anthropic appelé 3 fois en fallback
assert mock_anthropic.call_count == 3
# 3 appels Ollama (retourne None) + 3 Anthropic en fallback
assert call_count["n"] == 3
@patch("src.control.cpam_validation.call_anthropic", return_value=None)
@patch("src.control.cpam_validation.call_ollama", return_value=None)
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_generate_all_unavailable(self, mock_rag, mock_anthropic, mock_ollama):
def test_generate_all_unavailable(self, mock_rag, mock_anthropic, mock_ollama, _mock_val_ollama, _mock_val_anthropic):
"""Tous LLMs indisponibles → texte vide, response_data None."""
mock_rag.return_value = []
mock_anthropic.return_value = None
@@ -657,8 +666,8 @@ class TestSearchRagForControl:
arg_call_query = mock_search.call_args_list[0][0][0]
assert len(arg_call_query) > 200
@patch("src.control.cpam_response.validate_code", return_value=(True, "Iléus paralytique"))
@patch("src.control.cpam_response.normalize_code", return_value="K56.0")
@patch("src.control.cpam_rag.validate_code", return_value=(True, "Iléus paralytique"))
@patch("src.control.cpam_rag.normalize_code", return_value="K56.0")
@patch("src.medical.rag_search.search_similar_cpam")
def test_query_cim10_definitions(self, mock_search, mock_norm, mock_valid):
"""Requête 4 exécutée quand codes contestés présents."""
@@ -722,8 +731,8 @@ class TestSearchRagForControl:
class TestGetCim10Definitions:
"""Tests pour l'injection déterministe des définitions CIM-10."""
@patch("src.control.cpam_response.validate_code")
@patch("src.control.cpam_response.normalize_code", side_effect=lambda c: c.upper())
@patch("src.control.cpam_context.validate_code")
@patch("src.control.cpam_context.normalize_code", side_effect=lambda c: c.upper())
def test_definitions_injected_in_prompt(self, mock_norm, mock_valid):
"""La section DÉFINITIONS CIM-10 apparaît dans le prompt avec les libellés."""
mock_valid.side_effect = lambda c: {
@@ -742,8 +751,8 @@ class TestGetCim10Definitions:
assert "Iléus paralytique" in prompt
assert "DP établissement" in prompt
@patch("src.control.cpam_response.validate_code")
@patch("src.control.cpam_response.normalize_code", side_effect=lambda c: c.upper())
@patch("src.control.cpam_context.validate_code")
@patch("src.control.cpam_context.normalize_code", side_effect=lambda c: c.upper())
def test_definitions_include_dp_and_ucr_codes(self, mock_norm, mock_valid):
"""Les codes du dossier ET de l'UCR sont tous inclus."""
mock_valid.side_effect = lambda c: {
@@ -769,8 +778,8 @@ class TestGetCim10Definitions:
assert "DP proposé UCR" in result
assert "DA proposé UCR" in result or "DAS établissement" in result
@patch("src.control.cpam_response.validate_code", return_value=(False, ""))
@patch("src.control.cpam_response.normalize_code", side_effect=lambda c: c.upper())
@patch("src.control.cpam_context.validate_code", return_value=(False, ""))
@patch("src.control.cpam_context.normalize_code", side_effect=lambda c: c.upper())
def test_definitions_graceful_when_code_unknown(self, mock_norm, mock_valid):
"""Un code inconnu ne crashe pas, affiche un message explicite."""
dossier = DossierMedical(
@@ -1149,9 +1158,10 @@ class TestExtractionPass:
assert "PRÉ-ANALYSE" not in prompt
@patch("src.control.cpam_validation.call_ollama")
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_response._search_rag_for_control")
def test_generate_calls_three_passes(self, mock_rag, mock_ollama):
def test_generate_calls_three_passes(self, mock_rag, mock_ollama, mock_val_ollama):
"""L'orchestrateur appelle extraction + argumentation + validation."""
call_count = {"n": 0}
@@ -1174,6 +1184,7 @@ class TestExtractionPass:
return {"coherent": True, "erreurs": [], "score_confiance": 9}
mock_ollama.side_effect = ollama_side_effect
mock_val_ollama.side_effect = ollama_side_effect
mock_rag.return_value = []
dossier = _make_dossier()
@@ -1181,7 +1192,7 @@ class TestExtractionPass:
text, response_data, sources = generate_cpam_response(dossier, controle)
# 3 appels Ollama : extraction + argumentation + validation
assert mock_ollama.call_count == 3
assert call_count["n"] == 3
assert response_data is not None
assert "Arguments..." in text
@@ -1189,7 +1200,7 @@ class TestExtractionPass:
class TestValidateAdversarial:
"""Tests pour la validation adversariale."""
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_validation.call_ollama")
def test_coherent_response_no_warnings(self, mock_ollama):
"""Réponse cohérente → coherent=true, pas de warnings dans le texte."""
mock_ollama.return_value = {"coherent": True, "erreurs": [], "score_confiance": 9}
@@ -1208,7 +1219,7 @@ class TestValidateAdversarial:
assert result["coherent"] is True
assert len(result["erreurs"]) == 0
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_validation.call_ollama")
def test_hallucinated_bio_detected(self, mock_ollama):
"""Valeur bio halluccinée → coherent=false avec erreur."""
mock_ollama.return_value = {
@@ -1231,8 +1242,8 @@ class TestValidateAdversarial:
assert len(result["erreurs"]) == 1
assert "CRP" in result["erreurs"][0]
@patch("src.control.cpam_response.call_anthropic", return_value=None)
@patch("src.control.cpam_response.call_ollama", return_value=None)
@patch("src.control.cpam_validation.call_anthropic", return_value=None)
@patch("src.control.cpam_validation.call_ollama", return_value=None)
def test_adversarial_failure_graceful(self, mock_ollama, mock_anthropic):
"""LLM indisponible → retourne None, pas de crash."""
tag_map = {"BIO-1": "CRP: 180 mg/L"}
@@ -1243,9 +1254,10 @@ class TestValidateAdversarial:
assert result is None
@patch("src.control.cpam_validation.call_ollama")
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_response._search_rag_for_control")
def test_adversarial_warnings_in_output(self, mock_rag, mock_ollama):
def test_adversarial_warnings_in_output(self, mock_rag, mock_ollama, mock_val_ollama):
"""Incohérences détectées → avertissements dans le texte formaté."""
call_count = {"n": 0}
@@ -1267,6 +1279,7 @@ class TestValidateAdversarial:
}
mock_ollama.side_effect = ollama_side_effect
mock_val_ollama.side_effect = ollama_side_effect
mock_rag.return_value = []
dossier = _make_dossier()
@@ -1278,7 +1291,7 @@ class TestValidateAdversarial:
def test_adversarial_empty_tag_map(self):
"""Dossier sans tags → validation fonctionne quand même."""
with patch("src.control.cpam_response.call_ollama") as mock_ollama:
with patch("src.control.cpam_validation.call_ollama") as mock_ollama:
mock_ollama.return_value = {"coherent": True, "erreurs": [], "score_confiance": 7}
result = _validate_adversarial(
@@ -1287,3 +1300,380 @@ class TestValidateAdversarial:
assert result is not None
assert result["coherent"] is True
class TestValidateCodesInResponse:
"""Tests pour la validation codes fermée (périmètre dossier + UCR)."""
def test_code_in_dossier_no_warning(self):
"""Code du dossier cité → pas de warning."""
parsed = {"conclusion": "Le code K81.0 est justifié par la cholécystite."}
dossier = _make_dossier() # DP K81.0, DAS K56.0
controle = _make_controle()
warnings = _validate_codes_in_response(parsed, dossier, controle)
assert len(warnings) == 0
def test_code_from_ucr_no_warning(self):
"""Code proposé par l'UCR cité → pas de warning."""
parsed = {"conclusion": "Le code K56.0 contesté par l'UCR est bien justifié."}
dossier = _make_dossier()
controle = _make_controle() # da_ucr="K56.0"
warnings = _validate_codes_in_response(parsed, dossier, controle)
assert len(warnings) == 0
def test_invented_code_detected(self):
"""Code absent du dossier et de l'UCR → warning."""
parsed = {"conclusion": "Le code Z45.8 confirme la nécessité du séjour."}
dossier = _make_dossier() # DP K81.0, DAS K56.0
controle = _make_controle() # da_ucr=K56.0
warnings = _validate_codes_in_response(parsed, dossier, controle)
assert len(warnings) >= 1
assert any("Z45" in w for w in warnings)
def test_subcode_tolerated(self):
"""K81.09 toléré quand K81.0 est dans la whitelist (même préfixe 3 chars)."""
parsed = {"contre_arguments_medicaux": "Le sous-code K81.09 est une précision de K81.0."}
dossier = _make_dossier() # DP K81.0
controle = _make_controle()
warnings = _validate_codes_in_response(parsed, dossier, controle)
# K81.09 partage le préfixe K81 avec K81.0 → toléré
assert len(warnings) == 0
def test_codes_in_citations_excluded(self):
"""Codes dans references[].citation → pas de validation."""
parsed = {
"conclusion": "Le codage est justifié.",
"references": [
{"document": "CIM-10", "citation": "Z45.8 — Ajustement d'un dispositif"},
],
}
dossier = _make_dossier()
controle = _make_controle()
warnings = _validate_codes_in_response(parsed, dossier, controle)
# Z45.8 est dans references, pas dans les champs textuels → pas flaggé
assert len(warnings) == 0
def test_no_codes_in_response_no_warning(self):
"""Réponse sans codes CIM-10 → 0 warnings."""
parsed = {"conclusion": "Le séjour est justifié par la gravité clinique."}
dossier = _make_dossier()
controle = _make_controle()
warnings = _validate_codes_in_response(parsed, dossier, controle)
assert len(warnings) == 0
def test_multiple_invented_codes(self):
"""Plusieurs codes hors périmètre → autant de warnings."""
parsed = {
"contre_arguments_medicaux": "Les codes Z45.8 et E11.9 confirment le diagnostic.",
}
dossier = _make_dossier() # K81.0, K56.0
controle = _make_controle()
warnings = _validate_codes_in_response(parsed, dossier, controle)
assert len(warnings) >= 2
def test_no_whitelist_no_validation(self):
"""Aucun code dans le dossier ni l'UCR → pas de validation (0 warnings)."""
parsed = {"conclusion": "Le code Z45.8 est justifié."}
dossier = DossierMedical(source_file="test.pdf", diagnostic_principal=None)
controle = ControleCPAM(
numero_ogc=1, titre="Test", arg_ucr="Test",
decision_ucr="Rejet", dp_ucr=None, da_ucr=None,
)
warnings = _validate_codes_in_response(parsed, dossier, controle)
assert len(warnings) == 0
class TestBuildBioSummary:
"""Tests pour le résumé biologique déterministe."""
def test_bio_summary_interpretation(self):
"""CRP élevée, Hb basse → résumé correct avec interprétations cliniques."""
dossier = DossierMedical(
source_file="test.pdf",
biologie_cle=[
BiologieCle(test="CRP", valeur="180 mg/L", anomalie=True),
BiologieCle(test="Hémoglobine", valeur="8.5 g/dL", anomalie=True),
],
)
summary = _build_bio_summary(dossier)
assert "CRP" in summary
assert "ÉLEVÉ" in summary
assert "infection/inflammation active" in summary
assert "Hémoglobine" in summary
assert "BAS" in summary
assert "anémie" in summary
def test_bio_summary_normal_values(self):
"""Valeurs normales → interprétation 'normal' affichée."""
dossier = DossierMedical(
source_file="test.pdf",
biologie_cle=[
BiologieCle(test="Plaquettes", valeur="250 G/L", anomalie=False),
],
)
summary = _build_bio_summary(dossier)
assert "NORMAL" in summary
assert "numération normale" in summary
def test_bio_summary_in_prompt(self):
"""Le résumé bio apparaît dans le prompt CPAM."""
dossier = _make_dossier_complet() # CRP 180, Créatinine 450
controle = _make_controle()
prompt, _ = _build_cpam_prompt(dossier, controle, [])
assert "FAITS BIOLOGIQUES VÉRIFIÉS" in prompt
assert "NE PAS MODIFIER" in prompt
assert "RÈGLE STRICTE" in prompt
def test_bio_summary_empty_no_bio(self):
"""Pas de biologie → résumé vide."""
dossier = DossierMedical(source_file="test.pdf")
summary = _build_bio_summary(dossier)
assert summary == ""
def test_bio_summary_unknown_test(self):
"""Test bio non reconnu (hors BIO_NORMALS) → omis du résumé."""
dossier = DossierMedical(
source_file="test.pdf",
biologie_cle=[
BiologieCle(test="Ferritine", valeur="15 µg/L", anomalie=True),
],
)
summary = _build_bio_summary(dossier)
assert summary == ""
def test_bio_summary_unparseable_value(self):
"""Valeur bio non parseable → omise sans crash."""
dossier = DossierMedical(
source_file="test.pdf",
biologie_cle=[
BiologieCle(test="CRP", valeur="positif", anomalie=True),
BiologieCle(test="Hémoglobine", valeur="8.5 g/dL", anomalie=True),
],
)
summary = _build_bio_summary(dossier)
# CRP "positif" non parseable → omis, mais Hb présente
assert "Hémoglobine" in summary
assert "CRP" not in summary
class TestCorrectionLoop:
"""Tests pour la boucle de correction adversariale."""
@patch("src.control.cpam_response.rule_enabled", return_value=True)
@patch("src.control.cpam_validation.call_ollama")
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_correction_triggered_when_score_low(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
"""Score adversarial ≤ 5 → correction relancée (5 appels LLM total)."""
mock_rag.return_value = []
call_count = {"n": 0}
def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
call_count["n"] += 1
if call_count["n"] == 1:
# Passe 1 extraction
return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}}
elif call_count["n"] == 2:
# Passe 2 argumentation
return {
"analyse_contestation": "Analyse...",
"contre_arguments_medicaux": "Arguments erronés...",
"conclusion": "Conclusion avec erreurs...",
}
elif call_count["n"] == 3:
# Passe 3 validation adversariale → score bas
return {"coherent": False, "erreurs": ["CRP citée à 250 mais vaut 180"], "score_confiance": 3}
elif call_count["n"] == 4:
# Passe 4 correction
return {
"analyse_contestation": "Analyse corrigée...",
"contre_arguments_medicaux": "Arguments corrigés...",
"conclusion": "Conclusion corrigée...",
}
else:
# Passe 5 re-validation
return {"coherent": True, "erreurs": [], "score_confiance": 8}
mock_ollama.side_effect = ollama_side_effect
mock_val_ollama.side_effect = ollama_side_effect
dossier = _make_dossier()
controle = _make_controle()
text, response_data, sources = generate_cpam_response(dossier, controle)
# 5 appels Ollama : extraction + argumentation + validation + correction + re-validation
assert call_count["n"] == 5
# La correction a été acceptée (score 8 > 3)
assert "corrigé" in text.lower()
@patch("src.control.cpam_response.rule_enabled", return_value=True)
@patch("src.control.cpam_validation.call_ollama")
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_no_correction_when_score_high(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
"""Score adversarial > 5 → pas de correction (3 appels LLM)."""
mock_rag.return_value = []
call_count = {"n": 0}
def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
call_count["n"] += 1
if call_count["n"] == 1:
return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}}
elif call_count["n"] == 2:
return {
"analyse_contestation": "Analyse...",
"contre_arguments_medicaux": "Arguments...",
"conclusion": "Conclusion...",
}
else:
return {"coherent": True, "erreurs": [], "score_confiance": 8}
mock_ollama.side_effect = ollama_side_effect
mock_val_ollama.side_effect = ollama_side_effect
dossier = _make_dossier()
controle = _make_controle()
text, response_data, sources = generate_cpam_response(dossier, controle)
# Seulement 3 appels : extraction + argumentation + validation
assert call_count["n"] == 3
@patch("src.control.cpam_response.rule_enabled", return_value=True)
@patch("src.control.cpam_validation.call_ollama")
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_correction_accepted_when_score_improves(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
"""Score passe de 3 à 7 → correction acceptée."""
mock_rag.return_value = []
call_count = {"n": 0}
def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
call_count["n"] += 1
if call_count["n"] == 1:
return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}}
elif call_count["n"] == 2:
return {
"analyse_contestation": "Analyse originale...",
"contre_arguments_medicaux": "Arguments originaux...",
"conclusion": "Conclusion originale...",
}
elif call_count["n"] == 3:
return {"coherent": False, "erreurs": ["Erreur bio"], "score_confiance": 3}
elif call_count["n"] == 4:
return {
"analyse_contestation": "Analyse améliorée...",
"contre_arguments_medicaux": "Arguments améliorés...",
"conclusion": "Conclusion améliorée...",
}
else:
return {"coherent": True, "erreurs": [], "score_confiance": 7}
mock_ollama.side_effect = ollama_side_effect
mock_val_ollama.side_effect = ollama_side_effect
dossier = _make_dossier()
controle = _make_controle()
text, response_data, sources = generate_cpam_response(dossier, controle)
# Le résultat final est la correction
assert response_data["conclusion"] == "Conclusion améliorée..."
@patch("src.control.cpam_response.rule_enabled", return_value=True)
@patch("src.control.cpam_validation.call_ollama")
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_correction_rejected_when_score_same(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
"""Score ne s'améliore pas → original conservé."""
mock_rag.return_value = []
call_count = {"n": 0}
def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
call_count["n"] += 1
if call_count["n"] == 1:
return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}}
elif call_count["n"] == 2:
return {
"analyse_contestation": "Analyse originale...",
"contre_arguments_medicaux": "Arguments originaux...",
"conclusion": "Conclusion originale...",
}
elif call_count["n"] == 3:
return {"coherent": False, "erreurs": ["Erreur bio"], "score_confiance": 4}
elif call_count["n"] == 4:
return {
"analyse_contestation": "Correction pire...",
"contre_arguments_medicaux": "Arguments pires...",
"conclusion": "Conclusion pire...",
}
else:
return {"coherent": False, "erreurs": ["Encore des erreurs"], "score_confiance": 3}
mock_ollama.side_effect = ollama_side_effect
mock_val_ollama.side_effect = ollama_side_effect
dossier = _make_dossier()
controle = _make_controle()
text, response_data, sources = generate_cpam_response(dossier, controle)
# Score correction (3) <= score original (4) → original conservé
assert response_data["conclusion"] == "Conclusion originale..."
@patch("src.control.cpam_response.rule_enabled", return_value=False)
@patch("src.control.cpam_validation.call_ollama")
@patch("src.control.cpam_response.call_ollama")
@patch("src.control.cpam_response.call_anthropic")
@patch("src.control.cpam_response._search_rag_for_control")
def test_correction_disabled_by_rule(self, mock_rag, mock_anthropic, mock_ollama, mock_val_ollama, mock_rule):
"""RULE-CPAM-CORRECTION-LOOP désactivée → pas de retry."""
mock_rag.return_value = []
call_count = {"n": 0}
def ollama_side_effect(prompt, temperature=0.1, max_tokens=4000, **kwargs):
call_count["n"] += 1
if call_count["n"] == 1:
return {"comprehension_contestation": "Extraction", "elements_cliniques_pertinents": [], "points_accord_potentiels": [], "codes_en_jeu": {}}
elif call_count["n"] == 2:
return {
"analyse_contestation": "Analyse...",
"contre_arguments_medicaux": "Arguments...",
"conclusion": "Conclusion...",
}
else:
return {"coherent": False, "erreurs": ["Erreur bio"], "score_confiance": 2}
mock_ollama.side_effect = ollama_side_effect
mock_val_ollama.side_effect = ollama_side_effect
dossier = _make_dossier()
controle = _make_controle()
text, response_data, sources = generate_cpam_response(dossier, controle)
# Seulement 3 appels, pas de correction (règle désactivée)
assert call_count["n"] == 3
def test_build_correction_prompt_format(self):
"""Le prompt de correction contient les erreurs et la réponse originale."""
original_prompt = "Prompt d'argumentation original..."
original_response = {
"analyse_contestation": "Analyse avec erreur CRP 250",
"conclusion": "Conclusion erronée",
}
adversarial_result = {
"coherent": False,
"erreurs": ["CRP citée à 250 mg/L mais le dossier indique 180 mg/L"],
"score_confiance": 3,
}
correction = _build_correction_prompt(original_prompt, original_response, adversarial_result)
assert "CORRECTION REQUISE" in correction
assert "CRP citée à 250" in correction
assert "Prompt d'argumentation original" in correction
assert "Corrige UNIQUEMENT" in correction