Guardian déterministe post-LLM (0 appel modèle, <1ms) :
- Corrige les valeurs bio hallucinées via confrontation dossier
- Step 1b : vérifie l'association test↔diagnostic via _BIO_THRESHOLDS
- Chemin bidirectionnel : CONFIRMÉ↔NON CONFIRMÉ selon bio réelle
- Force R3 : codes bio-infirmés → codes_non_defendables
- Step 2b : retire les codes bio-confirmés de codes_non_defendables
- Retire les moyens défendant des codes bio-contredits
- _safe_bio_replace() : regex protégeant les normes [X-Y]
- Nettoyage texte libre (conclusion, rappel, codes_nd, raisonnement)
- Score factuel déterministe avec pénalités

Config modèles pour déploiement local (DGX Spark) :
- CPAM : mistral-small3.2:24b (TIM complet, bonne précision bio)
- Validation : qwen3:32b (rapide, LOGIC-3 actif)
- Timeout : 120s → 600s pour modèles locaux

Ollama : migration /api/generate → /api/chat (messages format)

Prompt CPAM_ARGUMENTATION restructuré :
- R1-R6 non-négociables en tête (avant données)
- Champ raisonnement_interne (chain-of-thought structuré)
- 5 passes TIM avec références explicites aux règles

Test cpam_quality : métriques guardian dans le résumé

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
240 lines
9.6 KiB
Python
240 lines
9.6 KiB
Python
#!/usr/bin/env python3
|
|
"""CPAM quality test — TIM format (statement of defence) on real dossiers.

Loads existing JSON dossiers and calls generate_cpam_response()
to validate the new TIM format without re-running the full pipeline.
"""
|
|
|
|
import json
|
|
import logging
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
# Ajouter le répertoire racine au path
|
|
sys.path.insert(0, str(Path(__file__).parent))
|
|
|
|
from src.config import DossierMedical, ControleCPAM
|
|
from src.control.cpam_response import generate_cpam_response
|
|
from src.control.cpam_validation import _is_new_tim_format
|
|
|
|
# Logging setup: INFO level, time-only timestamps, one named logger for the script.
_LOG_FORMAT = "%(asctime)s %(levelname)-5s %(name)s — %(message)s"
_LOG_DATEFMT = "%H:%M:%S"

logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATEFMT)

logger = logging.getLogger("test_cpam_quality")
|
|
|
|
# Default dossiers exercised when no name is given on the command line
# (chosen to cover a variety of control types).
DOSSIERS_TEST = [
    "183_23087212",  # DP+DAS disagreement
    "116_23065570",  # DAS
    "143_23096917",  # DP+DAS
    "132_23080179",  # Billing
]
|
|
|
|
|
|
def load_dossier(name: str) -> DossierMedical | None:
    """Load a dossier from ``output/structured/<name>/`` next to this script.

    A file matching ``*_fusionne_cim10.json`` (the merged dossier) is preferred;
    otherwise any ``*.json`` file in the directory is used. Candidates are
    sorted by path so that the selected file is the same on every run and
    every filesystem when several files match.

    Args:
        name: Name of the dossier sub-directory.

    Returns:
        A ``DossierMedical`` built from the JSON content, or ``None`` (after
        logging an error) when the directory contains no JSON file.
    """
    base = Path(__file__).parent / "output" / "structured" / name

    # glob() yields entries in filesystem order; sort so that taking the first
    # candidate is deterministic.
    merged = sorted(base.glob("*_fusionne_cim10.json"))
    candidates = merged or sorted(base.glob("*.json"))
    if not candidates:
        logger.error("Aucun JSON trouvé pour %s", name)
        return None

    with open(candidates[0], encoding="utf-8") as f:
        data = json.load(f)
    return DossierMedical(**data)
|
|
|
|
|
|
# Maximum number of characters of the generated text echoed to stdout.
_MAX_PRINTED_CHARS = 5000


def _as_list(value) -> list:
    """Return *value* when it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _str_len(value) -> int:
    """Return the length of *value* when it is a string, otherwise 0."""
    return len(value) if isinstance(value, str) else 0


def _count_preuves(moyens: list) -> tuple[int, int]:
    """Count dict-shaped ``preuves`` entries in *moyens* and those with a ``ref``."""
    total = 0
    with_ref = 0
    for moyen in moyens:
        if not isinstance(moyen, dict):
            continue
        for preuve in _as_list(moyen.get("preuves")):
            if isinstance(preuve, dict):
                total += 1
                if preuve.get("ref"):
                    with_ref += 1
    return total, with_ref


def _record_guardian_metrics(result: dict, metrics: dict) -> None:
    """Copy the ``guardian_report`` figures of *result* into *metrics* and log them."""
    guardian = result.get("guardian_report")
    if not isinstance(guardian, dict) or not guardian:
        # No usable report: keep the summary columns populated with neutral values.
        metrics["guardian_bio_corrections"] = 0
        metrics["guardian_codes_moved"] = 0
        metrics["guardian_text_replacements"] = 0
        metrics["guardian_score_factuel"] = "N/A"
        return

    bio_corr = _as_list(guardian.get("bio_corrections"))
    codes_moved = _as_list(guardian.get("codes_moved_to_nd"))
    text_repl = guardian.get("text_replacements", 0)
    score_f = guardian.get("score_factuel", "?")

    try:
        text_repl_count = int(text_repl) if text_repl else 0
    except (TypeError, ValueError):
        # A non-numeric value must not abort the whole run.
        text_repl_count = 0

    metrics["guardian_bio_corrections"] = len(bio_corr)
    metrics["guardian_codes_moved"] = len(codes_moved)
    metrics["guardian_text_replacements"] = text_repl_count
    metrics["guardian_score_factuel"] = score_f

    logger.info(" --- GUARDIAN REPORT ---")
    logger.info(" Score factuel : %s/10", score_f)
    logger.info(" Bio corrections : %d", len(bio_corr))
    for corr in bio_corr:
        if not isinstance(corr, dict):
            continue
        # Two key spellings are accepted for the LLM value and the real value.
        logger.info(" %s : LLM=%s → réel=%s",
                    corr.get("test", "?"),
                    corr.get("valeur_llm", corr.get("llm_value", "?")),
                    corr.get("valeur_reelle", corr.get("real_value", "?")))
    if codes_moved:
        logger.info(" Codes déplacés vers non-défendables : %s",
                    ", ".join(str(code) for code in codes_moved))
    if text_repl:
        logger.info(" Remplacements texte : %s", text_repl)


def _record_tim_metrics(result: dict, metrics: dict, text_len: int,
                        elapsed: float, rag_count: int) -> None:
    """Fill *metrics* from a TIM-format *result* and log a per-section summary."""
    moyens = _as_list(result.get("moyens_defense"))
    confrontation = _as_list(result.get("confrontation_bio"))
    codes_nd = _as_list(result.get("codes_non_defendables"))
    refs = _as_list(result.get("references"))
    total_preuves, preuves_with_ref = _count_preuves(moyens)

    metrics["moyens_count"] = len(moyens)
    metrics["preuves_count"] = total_preuves
    metrics["preuves_with_ref"] = preuves_with_ref
    metrics["confrontation_count"] = len(confrontation)
    metrics["codes_nd_count"] = len(codes_nd)
    metrics["refs_count"] = len(refs)
    metrics["conclusion_len"] = _str_len(result.get("conclusion_dispositive", ""))
    metrics["has_rappel_faits"] = bool(result.get("rappel_faits"))
    metrics["has_reponse_cpam"] = bool(result.get("reponse_points_cpam"))

    logger.info("-" * 40)
    logger.info("FORMAT : TIM (mémoire en défense)")
    logger.info("RÉSULTAT : %d chars, %.1fs, tier %s", text_len, elapsed, metrics["tier"])
    logger.info(" Moyens de défense : %d", len(moyens))
    logger.info(" Preuves : %d (dont %d avec tag)", total_preuves, preuves_with_ref)
    logger.info(" Confrontation bio : %d entrées", len(confrontation))
    logger.info(" Codes non défendables : %d", len(codes_nd))
    logger.info(" Références : %d", metrics["refs_count"])
    logger.info(" Sources RAG : %d", rag_count)

    for row in confrontation:
        if isinstance(row, dict):
            logger.info(" Bio: %s → %s = %s → %s",
                        row.get("diagnostic", "?"), row.get("test", "?"),
                        row.get("valeur", "?"), row.get("verdict", "?"))

    for nd in codes_nd:
        if isinstance(nd, dict):
            raison = nd.get("raison", "?")
            # The key may be present with a null value; never slice None.
            raison = "?" if raison is None else str(raison)
            logger.info(" ⚠ Non défendable: %s — %s",
                        nd.get("code", "?"), raison[:80])

    _record_guardian_metrics(result, metrics)


def _record_legacy_metrics(result: dict, metrics: dict, text_len: int,
                           elapsed: float) -> None:
    """Fill *metrics* from a legacy-format *result* (fallback path) and log it."""
    preuves = _as_list(result.get("preuves_dossier"))
    refs = _as_list(result.get("references"))

    metrics["moyens_count"] = 0
    metrics["preuves_count"] = len(preuves)
    metrics["preuves_with_ref"] = sum(
        1 for p in preuves if isinstance(p, dict) and p.get("ref")
    )
    metrics["confrontation_count"] = 0
    metrics["codes_nd_count"] = 0
    metrics["refs_count"] = len(refs)
    metrics["conclusion_len"] = _str_len(result.get("conclusion", ""))

    logger.info("-" * 40)
    logger.info("FORMAT : legacy (ancien)")
    logger.info("RÉSULTAT : %d chars, %.1fs, tier %s", text_len, elapsed, metrics["tier"])


def test_dossier(name: str) -> dict:
    """Run generate_cpam_response on one dossier and return its metrics.

    The first CPAM control of the dossier is used. The generated text is
    printed to stdout (truncated to ``_MAX_PRINTED_CHARS`` characters) and a
    summary is logged.

    Malformed model output (``None`` text or sources, missing or non-list
    sections, null fields) is counted as empty instead of raising, so one bad
    answer does not hide the metrics that could still be collected.

    Args:
        name: Name of the dossier directory, as accepted by ``load_dossier``.

    Returns:
        A metrics dict. It always contains ``"name"``; it contains ``"error"``
        when the dossier could not be loaded, has no CPAM control, or the
        generation returned no structured result.
    """
    logger.info("=" * 70)
    logger.info("DOSSIER : %s", name)
    logger.info("=" * 70)

    dossier = load_dossier(name)
    if dossier is None:
        return {"name": name, "error": "Dossier non trouvé"}

    if not dossier.controles_cpam:
        return {"name": name, "error": "Pas de contrôle CPAM"}

    controle = dossier.controles_cpam[0]
    logger.info("Contrôle : OGC %d — %s", controle.numero_ogc, controle.titre)
    logger.info("DP UCR : %s | DA UCR : %s", controle.dp_ucr or "-", controle.da_ucr or "-")

    # perf_counter() is monotonic, so the duration cannot be skewed by clock changes.
    t0 = time.perf_counter()
    text, result, rag_sources = generate_cpam_response(dossier, controle)
    elapsed = time.perf_counter() - t0

    # Normalise once so every later len()/slice is safe.
    text = text or ""
    rag_count = len(rag_sources) if rag_sources else 0

    # Built after the call on purpose: quality_tier is read once generation is done.
    metrics = {
        "name": name,
        "titre": controle.titre,
        "elapsed_s": round(elapsed, 1),
        "text_len": len(text),
        "rag_sources": rag_count,
        "tier": controle.quality_tier or "?",
    }

    if result:
        is_tim = _is_new_tim_format(result)
        metrics["format"] = "TIM" if is_tim else "legacy"
        if is_tim:
            _record_tim_metrics(result, metrics, len(text), elapsed, rag_count)
        else:
            _record_legacy_metrics(result, metrics, len(text), elapsed)
    else:
        metrics["error"] = "LLM a retourné None"
        metrics["format"] = "N/A"
        logger.error("LLM n'a retourné aucun résultat !")

    # Print the (possibly truncated) counter-argumentation.
    print("\n" + "~" * 70)
    print("CONTRE-ARGUMENTATION :")
    print("~" * 70)
    print(text[:_MAX_PRINTED_CHARS] if text else "(vide)")
    if len(text) > _MAX_PRINTED_CHARS:
        print(f"\n... [tronqué, {len(text)} chars au total]")

    return metrics
|
|
|
|
|
|
def main():
    """Run test_dossier on each requested dossier and print a summary table.

    Dossier names are taken from the command-line arguments; when none are
    given, the names in ``DOSSIERS_TEST`` are used. An exception raised for one
    dossier is logged and recorded as an error row, and the run continues.
    """
    cli_names = sys.argv[1:]
    dossiers = cli_names if cli_names else DOSSIERS_TEST

    results = []
    for name in dossiers:
        try:
            outcome = test_dossier(name)
        except Exception as exc:
            logger.exception("Erreur sur %s", name)
            outcome = {"name": name, "error": str(exc)}
        results.append(outcome)

    # Final summary
    banner = "=" * 70
    print("\n" + banner)
    print("RÉSUMÉ — FORMAT TIM")
    print(banner)
    print(f"{'Dossier':<20} {'Fmt':>5} {'Tier':>4} {'Temps':>6} {'Chars':>6} {'Moyens':>7} {'Bio':>4} {'ND':>3} {'Refs':>5} {'RAG':>4} {'G.Fix':>5} {'G.Mv':>4} {'G.Txt':>5} {'G.Sc':>4}")
    print("-" * 105)

    for r in results:
        # Error rows carry only the dossier name and the message.
        if "error" in r:
            print(f"{r['name']:<20} ERREUR: {r['error']}")
            continue

        line = (
            f"{r['name']:<20} "
            f"{r.get('format', '?'):>5} "
            f"{r.get('tier', '?'):>4} "
            f"{r['elapsed_s']:>5.1f}s "
            f"{r['text_len']:>6} "
            f"{r.get('moyens_count', 0):>7} "
            f"{r.get('confrontation_count', 0):>4} "
            f"{r.get('codes_nd_count', 0):>3} "
            f"{r.get('refs_count', 0):>5} "
            f"{r['rag_sources']:>4} "
            f"{r.get('guardian_bio_corrections', 0):>5} "
            f"{r.get('guardian_codes_moved', 0):>4} "
            f"{r.get('guardian_text_replacements', 0):>5} "
            f"{str(r.get('guardian_score_factuel', 'N/A')):>4}"
        )
        print(line)
|
|
|
|
|
|
# Script entry point: run the quality test only when executed directly.
if __name__ == "__main__":
    main()
|