#!/usr/bin/env python3
"""CPAM quality test: TIM format (mémoire en défense, i.e. defense brief) on real dossiers.

Loads existing JSON dossiers and calls generate_cpam_response() to validate
the new TIM format without re-running the full pipeline.
"""

import json
import logging
import sys
import time
from pathlib import Path

# Add the project root directory to the import path
sys.path.insert(0, str(Path(__file__).parent))

from src.config import DossierMedical, ControleCPAM
from src.control.cpam_response import generate_cpam_response
from src.control.cpam_validation import _is_new_tim_format

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-5s %(name)s — %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("test_cpam_quality")

# Dossiers to test (a variety of cases)
DOSSIERS_TEST = [
    "183_23087212",  # DP+DAS disagreement
    "116_23065570",  # DAS
    "143_23096917",  # DP+DAS
    "132_23080179",  # Billing
]


def load_dossier(name: str) -> DossierMedical | None:
    """Load a dossier's JSON from output/structured/."""
    base = Path(__file__).parent / "output" / "structured" / name
    # Prefer the merged CIM-10 file when present
    fusionne = list(base.glob("*_fusionne_cim10.json"))
    json_files = fusionne if fusionne else sorted(base.glob("*.json"))
    if not json_files:
        logger.error("No JSON found for %s", name)
        return None
    with open(json_files[0], encoding="utf-8") as f:
        data = json.load(f)
    return DossierMedical(**data)
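
# Illustrative only: a minimal sketch of the TIM result dict that
# test_dossier() inspects below. The keys are inferred from the reads in this
# script; the real schema produced by generate_cpam_response() may differ, and
# the placeholder values ("…", 0, 10) are assumptions.
_EXAMPLE_TIM_RESULT = {
    "rappel_faits": "…",
    "reponse_points_cpam": "…",
    "moyens_defense": [
        {"preuves": [{"ref": "…"}]},  # each preuve may carry a "ref" tag
    ],
    "confrontation_bio": [
        {"diagnostic": "…", "test": "…", "valeur": "…", "verdict": "…"},
    ],
    "codes_non_defendables": [
        {"code": "…", "raison": "…"},
    ],
    "references": [],
    "conclusion_dispositive": "…",
    "guardian_report": {
        "bio_corrections": [
            {"test": "…", "valeur_llm": "…", "valeur_reelle": "…"},
        ],
        "codes_moved_to_nd": [],
        "text_replacements": 0,
        "score_factuel": 10,
    },
}
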
def test_dossier(name: str) -> dict:
    """Run generate_cpam_response on one dossier and return quality metrics."""
    logger.info("=" * 70)
    logger.info("DOSSIER: %s", name)
    logger.info("=" * 70)

    dossier = load_dossier(name)
    if not dossier:
        return {"name": name, "error": "Dossier not found"}
    if not dossier.controles_cpam:
        return {"name": name, "error": "No CPAM control"}

    controle = dossier.controles_cpam[0]
    logger.info("Control: OGC %d - %s", controle.numero_ogc, controle.titre)
    logger.info("DP UCR: %s | DA UCR: %s", controle.dp_ucr or "-", controle.da_ucr or "-")

    # Call generate_cpam_response and time it
    t0 = time.time()
    text, result, rag_sources = generate_cpam_response(dossier, controle)
    elapsed = time.time() - t0

    metrics = {
        "name": name,
        "titre": controle.titre,
        "elapsed_s": round(elapsed, 1),
        "text_len": len(text),
        "rag_sources": len(rag_sources),
        "tier": controle.quality_tier or "?",
    }

    if result:
        is_tim = _is_new_tim_format(result)
        metrics["format"] = "TIM" if is_tim else "legacy"

        if is_tim:
            # New TIM format
            moyens = result.get("moyens_defense", [])
            confrontation = result.get("confrontation_bio", [])
            codes_nd = result.get("codes_non_defendables", [])
            refs = result.get("references", [])
            conclusion = result.get("conclusion_dispositive") or ""

            # Count the evidence items (preuves) across the defense grounds
            total_preuves = 0
            preuves_with_ref = 0
            for m in moyens:
                if isinstance(m, dict):
                    for p in m.get("preuves", []):
                        if isinstance(p, dict):
                            total_preuves += 1
                            if p.get("ref"):
                                preuves_with_ref += 1

            metrics["moyens_count"] = len(moyens)
            metrics["preuves_count"] = total_preuves
            metrics["preuves_with_ref"] = preuves_with_ref
            metrics["confrontation_count"] = len(confrontation)
            metrics["codes_nd_count"] = len(codes_nd)
            metrics["refs_count"] = len(refs) if isinstance(refs, list) else 0
            metrics["conclusion_len"] = len(conclusion)
            metrics["has_rappel_faits"] = bool(result.get("rappel_faits"))
            metrics["has_reponse_cpam"] = bool(result.get("reponse_points_cpam"))

            logger.info("-" * 40)
            logger.info("FORMAT: TIM (mémoire en défense)")
            logger.info("RESULT: %d chars, %.1fs, tier %s", len(text), elapsed, metrics["tier"])
            logger.info("  Defense grounds (moyens): %d", len(moyens))
            logger.info("  Evidence items: %d (of which %d with a ref tag)",
                        total_preuves, preuves_with_ref)
            logger.info("  Bio confrontation: %d entries", len(confrontation))
            logger.info("  Non-defensible codes: %d", len(codes_nd))
            logger.info("  References: %d", metrics["refs_count"])
            logger.info("  RAG sources: %d", len(rag_sources))

            if confrontation:
                for row in confrontation:
                    if isinstance(row, dict):
                        logger.info("    Bio: %s → %s = %s → %s",
                                    row.get("diagnostic", "?"), row.get("test", "?"),
                                    row.get("valeur", "?"), row.get("verdict", "?"))
            if codes_nd:
                for nd in codes_nd:
                    if isinstance(nd, dict):
                        logger.info("    ⚠ Non-defensible: %s - %s",
                                    nd.get("code", "?"), nd.get("raison", "?")[:80])

            # --- Guardian report ---
            guardian = result.get("guardian_report", {})
            if guardian:
                bio_corr = guardian.get("bio_corrections", [])
                codes_moved = guardian.get("codes_moved_to_nd", [])
                text_repl = guardian.get("text_replacements", 0)
                score_f = guardian.get("score_factuel", "?")
                metrics["guardian_bio_corrections"] = len(bio_corr)
                metrics["guardian_codes_moved"] = len(codes_moved)
                metrics["guardian_text_replacements"] = int(text_repl) if text_repl else 0
                metrics["guardian_score_factuel"] = score_f
                logger.info("  --- GUARDIAN REPORT ---")
                logger.info("  Factual score: %s/10", score_f)
                logger.info("  Bio corrections: %d", len(bio_corr))
                for c in bio_corr:
                    # Tolerate both French and English key variants
                    logger.info("    %s: LLM=%s → actual=%s",
                                c.get("test", "?"),
                                c.get("valeur_llm", c.get("llm_value", "?")),
                                c.get("valeur_reelle", c.get("real_value", "?")))
                if codes_moved:
                    logger.info("  Codes moved to non-defensible: %s", ", ".join(codes_moved))
                if text_repl:
                    logger.info("  Text replacements: %s", text_repl)
            else:
                metrics["guardian_bio_corrections"] = 0
                metrics["guardian_codes_moved"] = 0
                metrics["guardian_text_replacements"] = 0
                metrics["guardian_score_factuel"] = "N/A"
        else:
            # Legacy format (fallback)
            preuves = result.get("preuves_dossier", [])
            refs = result.get("references", [])
            conclusion = result.get("conclusion") or ""
            metrics["moyens_count"] = 0
            metrics["preuves_count"] = len(preuves) if isinstance(preuves, list) else 0
            metrics["preuves_with_ref"] = sum(
                1 for p in (preuves or []) if isinstance(p, dict) and p.get("ref")
            )
            metrics["confrontation_count"] = 0
            metrics["codes_nd_count"] = 0
            metrics["refs_count"] = len(refs) if isinstance(refs, list) else 0
            metrics["conclusion_len"] = len(conclusion)
            logger.info("-" * 40)
            logger.info("FORMAT: legacy (old)")
            logger.info("RESULT: %d chars, %.1fs, tier %s", len(text), elapsed, metrics["tier"])
    else:
        metrics["error"] = "LLM returned None"
        metrics["format"] = "N/A"
        logger.error("LLM returned no result!")

    # Print the full counter-argumentation
    print("\n" + "~" * 70)
    print("COUNTER-ARGUMENTATION:")
    print("~" * 70)
    print(text[:5000] if text else "(empty)")
    if len(text) > 5000:
        print(f"\n... [truncated, {len(text)} chars total]")

    return metrics
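
# Legend for the summary table printed by main(), mapped to the metrics
# collected in test_dossier():
#   Fmt    = format (TIM / legacy)        Tier  = quality tier
#   Time   = generation time (s)          Chars = counter-argument length
#   Moyens = moyens_count                 Bio   = confrontation_count
#   ND     = codes_nd_count               Refs  = refs_count
#   RAG    = rag_sources                  G.Fix = guardian_bio_corrections
#   G.Mv   = guardian_codes_moved         G.Txt = guardian_text_replacements
#   G.Sc   = guardian_score_factuel (score out of 10)
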
def main():
    dossiers = sys.argv[1:] if len(sys.argv) > 1 else DOSSIERS_TEST
    results = []
    for name in dossiers:
        try:
            metrics = test_dossier(name)
            results.append(metrics)
        except Exception as e:
            logger.exception("Error on %s", name)
            results.append({"name": name, "error": str(e)})

    # Final summary
    print("\n" + "=" * 70)
    print("SUMMARY - TIM FORMAT")
    print("=" * 70)
    print(f"{'Dossier':<20} {'Fmt':>5} {'Tier':>4} {'Time':>6} {'Chars':>6} {'Moyens':>7} "
          f"{'Bio':>4} {'ND':>3} {'Refs':>5} {'RAG':>4} "
          f"{'G.Fix':>5} {'G.Mv':>4} {'G.Txt':>5} {'G.Sc':>4}")
    print("-" * 105)
    for r in results:
        if "error" in r:
            print(f"{r['name']:<20} ERROR: {r['error']}")
        else:
            print(
                f"{r['name']:<20} "
                f"{r.get('format', '?'):>5} "
                f"{r.get('tier', '?'):>4} "
                f"{r['elapsed_s']:>5.1f}s "
                f"{r['text_len']:>6} "
                f"{r.get('moyens_count', 0):>7} "
                f"{r.get('confrontation_count', 0):>4} "
                f"{r.get('codes_nd_count', 0):>3} "
                f"{r.get('refs_count', 0):>5} "
                f"{r['rag_sources']:>4} "
                f"{r.get('guardian_bio_corrections', 0):>5} "
                f"{r.get('guardian_codes_moved', 0):>4} "
                f"{r.get('guardian_text_replacements', 0):>5} "
                f"{str(r.get('guardian_score_factuel', 'N/A')):>4}"
            )


if __name__ == "__main__":
    main()
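
# Example invocations (assumes this file is named test_cpam_quality.py, matching
# the logger name above, and that the dossier JSON lives under output/structured/):
#
#   python test_cpam_quality.py                  # run the default DOSSIERS_TEST set
#   python test_cpam_quality.py 183_23087212     # run a single dossier
#   python test_cpam_quality.py 116_23065570 143_23096917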