#!/usr/bin/env python3
"""Quality test of CPAM solutions 1+2+3+6 on real dossiers.

Loads existing JSON dossiers and calls generate_cpam_response()
to validate the new features without re-running the full pipeline.
"""
|
|
|
|
import json
import logging
import sys
import time
from pathlib import Path

# Add the project root to sys.path so the "src" package imports below
# resolve when this script is executed directly.
sys.path.insert(0, str(Path(__file__).parent))

from src.config import DossierMedical, ControleCPAM
from src.control.cpam_response import generate_cpam_response

# Configure logging for the test run (timestamped, INFO level).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-5s %(name)s — %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("test_cpam_quality")

# Dossiers to test (a variety of cases)
DOSSIERS_TEST = [
    "183_23087212",  # Disagreement DP+DAS
    "116_23065570",  # DAS
    "143_23096917",  # DP+DAS
    "132_23080179",  # Billing
]
|
|
|
|
|
|
def load_dossier(name: str) -> DossierMedical | None:
    """Load a dossier from its JSON file under output/structured/<name>/.

    Picks the first *.json file (sorted order) in the dossier folder and
    deserializes it into a DossierMedical. Returns None (after logging an
    error) when no JSON file is present.
    """
    folder = Path(__file__).parent / "output" / "structured" / name
    candidates = sorted(folder.glob("*.json"))
    if not candidates:
        logger.error("Aucun JSON trouvé pour %s", name)
        return None
    payload = json.loads(candidates[0].read_text(encoding="utf-8"))
    return DossierMedical(**payload)
|
|
|
|
|
|
def test_dossier(name: str) -> dict:
    """Run generate_cpam_response() on one dossier and return quality metrics.

    The returned dict always contains "name"; on failure it carries an
    "error" key, otherwise timing, text length, RAG source count and — when
    the LLM returned a structured result — evidence/reference counts.
    Also logs a human-readable report and prints the generated text
    (truncated to 3000 chars).
    """
    logger.info("=" * 70)
    logger.info("DOSSIER : %s", name)
    logger.info("=" * 70)

    dossier = load_dossier(name)
    if not dossier:
        return {"name": name, "error": "Dossier non trouvé"}

    if not dossier.controles_cpam:
        return {"name": name, "error": "Pas de contrôle CPAM"}

    # Only the first CPAM control of the dossier is exercised here.
    controle = dossier.controles_cpam[0]
    logger.info("Contrôle : OGC %d — %s", controle.numero_ogc, controle.titre)
    logger.info("DP UCR : %s | DA UCR : %s", controle.dp_ucr or "-", controle.da_ucr or "-")

    # Call generate_cpam_response and measure wall-clock time.
    t0 = time.time()
    text, result, rag_sources = generate_cpam_response(dossier, controle)
    elapsed = time.time() - t0

    metrics = {
        "name": name,
        "titre": controle.titre,
        "elapsed_s": round(elapsed, 1),
        "text_len": len(text),
        "rag_sources": len(rag_sources),
    }

    if result:
        # Inspect the new features in the structured result.
        preuves = result.get("preuves_dossier", [])
        refs = result.get("references", [])
        accord = result.get("points_accord", "")
        conclusion = result.get("conclusion", "")

        # Non-list values (malformed LLM output) count as zero.
        metrics["preuves_count"] = len(preuves) if isinstance(preuves, list) else 0
        metrics["refs_count"] = len(refs) if isinstance(refs, list) else 0
        metrics["accord"] = accord[:100] if accord else "(vide)"
        metrics["conclusion_len"] = len(conclusion)

        # Check grounding: count evidence items carrying a non-empty
        # "ref" tag (BIO-N, IMG-N, etc.).
        refs_with_tags = 0
        if isinstance(preuves, list):
            for p in preuves:
                if isinstance(p, dict) and p.get("ref", ""):
                    refs_with_tags += 1
        metrics["preuves_with_ref"] = refs_with_tags

        logger.info("-" * 40)
        logger.info("RÉSULTAT : %d chars, %.1fs", len(text), elapsed)
        logger.info(" Preuves : %d (dont %d avec tag)", metrics["preuves_count"], refs_with_tags)
        logger.info(" Références : %d", metrics["refs_count"])
        logger.info(" Sources RAG : %d", len(rag_sources))
        logger.info(" Points d'accord : %s", accord[:80] if accord else "(vide)")
    else:
        metrics["error"] = "LLM a retourné None"
        logger.error("LLM n'a retourné aucun résultat !")

    # Show the full counter-argumentation (truncated at 3000 chars).
    logger.info("\n" + "~" * 70)
    logger.info("CONTRE-ARGUMENTATION :")
    logger.info("~" * 70)
    print(text[:3000] if text else "(vide)")
    if len(text) > 3000:
        print(f"\n... [tronqué, {len(text)} chars au total]")

    return metrics
|
|
|
|
|
|
def main():
    """Test each requested dossier (CLI args or DOSSIERS_TEST) and print a summary."""
    targets = sys.argv[1:] if len(sys.argv) > 1 else DOSSIERS_TEST
    outcomes = []

    for target in targets:
        try:
            outcomes.append(test_dossier(target))
        except Exception as exc:
            logger.exception("Erreur sur %s", target)
            outcomes.append({"name": target, "error": str(exc)})

    # Final summary table.
    print("\n" + "=" * 70)
    print("RÉSUMÉ")
    print("=" * 70)
    print(f"{'Dossier':<20} {'Temps':>6} {'Chars':>6} {'Preuves':>8} {'Refs':>5} {'RAG':>4} {'Tags':>5}")
    print("-" * 70)
    for row in outcomes:
        if "error" in row:
            print(f"{row['name']:<20} ERREUR: {row['error']}")
            continue
        line = (
            f"{row['name']:<20} "
            f"{row['elapsed_s']:>5.1f}s "
            f"{row['text_len']:>6} "
            f"{row.get('preuves_count', 0):>8} "
            f"{row.get('refs_count', 0):>5} "
            f"{row['rag_sources']:>4} "
            f"{row.get('preuves_with_ref', 0):>5}"
        )
        print(line)
|
|
|
|
|
|
# Script entry point: run the quality tests when executed directly.
if __name__ == "__main__":
    main()
|