Permet aux médecins DIM de valider/corriger les codes CIM-10 extraits par le pipeline pour construire un gold standard (50 dossiers). - ValidationManager : gestion annotations JSON dans data/gold_standard/ - Script sélection 50 dossiers (25 CPAM + 25 stratifiés CMD/confiance) - Routes /validation, /api/cim10/search, /api/validation/save, /validation/metrics - Formulaire avec autocomplete CIM-10, boutons Correct/Modifier/Supprimer - Dashboard métriques : precision, recall, F1, hallucination par confiance/source Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
232 lines
7.7 KiB
Python
232 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
"""Sélectionne 50 dossiers pour le gold standard de validation DIM.
|
|
|
|
- 25 dossiers CPAM (cas complexes, déjà contrôlés)
|
|
- 25 dossiers non-CPAM stratifiés par CMD, confiance DP, nombre de DAS
|
|
|
|
Crée data/gold_standard/_selection.json et initialise les annotations vides.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import random
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Ajouter le répertoire racine au path
|
|
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
|
|
|
from src.config import STRUCTURED_DIR, BASE_DIR, DossierMedical
|
|
|
|
# Directory that receives _selection.json and one annotation file per dossier.
GOLD_DIR = BASE_DIR / "data" / "gold_standard"
# Overall number of dossiers the selection aims for.
TARGET_TOTAL = 50
# Upper bound on the number of CPAM-controlled dossiers in the selection.
TARGET_CPAM = 25
|
|
|
|
|
|
def load_all_dossiers() -> list[dict]:
    """Load one dossier per sub-directory of ``STRUCTURED_DIR``.

    Sub-directories are visited in sorted order. Inside each one, the first
    file (in sorted order) matching ``*fusionne*.json`` is used; when no such
    file exists, the first ``*.json`` file (in sorted order) is used instead.
    Sorting the candidates makes the choice reproducible, because glob
    results come back in no guaranteed order.

    Returns:
        A list of dicts with the keys ``dossier_id``
        (``"<sub-directory name>/<file stem>"``), ``group_name``
        (sub-directory name), ``path_rel`` (file path relative to
        ``STRUCTURED_DIR``) and ``dossier`` (the result of
        ``DossierMedical.model_validate``). Files that cannot be read or
        validated are reported on stdout and skipped. The list is empty when
        ``STRUCTURED_DIR`` is not an existing directory.
    """
    if not STRUCTURED_DIR.is_dir():
        # Nothing to scan: let the caller report "no dossier found" rather
        # than crashing inside iterdir().
        return []

    dossiers = []
    for subdir in sorted(STRUCTURED_DIR.iterdir()):
        if not subdir.is_dir():
            continue

        # Prefer the merged file; otherwise fall back to any JSON file.
        candidates = sorted(subdir.glob("*fusionne*.json")) or sorted(subdir.glob("*.json"))
        if not candidates:
            continue
        fusionne = candidates[0]

        try:
            data = json.loads(fusionne.read_text(encoding="utf-8"))
            dossier = DossierMedical.model_validate(data)
        except Exception as e:
            # Best effort: a single unreadable/invalid file must not abort
            # the whole selection run.
            print(f" Erreur chargement {fusionne.name}: {e}")
            continue

        group_name = subdir.name
        dossiers.append({
            "dossier_id": f"{group_name}/{fusionne.stem}",
            "group_name": group_name,
            "path_rel": str(fusionne.relative_to(STRUCTURED_DIR)),
            "dossier": dossier,
        })
    return dossiers
|
|
|
|
|
|
def select_dossiers(all_dossiers: list[dict]) -> list[dict]:
    """Build the final selection from CPAM and non-CPAM dossiers.

    Dossiers whose ``controles_cpam`` is truthy form the CPAM pool; at most
    ``TARGET_CPAM`` of them are kept, in input order. The remaining quota up
    to ``TARGET_TOTAL`` is filled by ``stratified_sample`` over the other
    dossiers. Pool sizes and the final breakdown are printed to stdout.

    Returns:
        The kept CPAM dossiers followed by the sampled non-CPAM dossiers.
    """
    # Single pass: route each dossier to its pool.
    cpam_pool: list[dict] = []
    other_pool: list[dict] = []
    for entry in all_dossiers:
        if entry["dossier"].controles_cpam:
            cpam_pool.append(entry)
        else:
            other_pool.append(entry)

    print(f"Dossiers CPAM disponibles : {len(cpam_pool)}")
    print(f"Dossiers non-CPAM disponibles : {len(other_pool)}")

    # Keep every CPAM dossier, capped at TARGET_CPAM.
    kept_cpam = cpam_pool[:TARGET_CPAM]

    # Whatever quota is left goes to the stratified non-CPAM sample.
    quota = TARGET_TOTAL - len(kept_cpam)
    kept_other = stratified_sample(other_pool, quota)

    result = kept_cpam + kept_other
    print(f"\nSélection finale : {len(result)} dossiers")
    print(f" - CPAM : {len(kept_cpam)}")
    print(f" - Non-CPAM : {len(kept_other)}")

    return result
|
|
|
|
|
|
def stratified_sample(dossiers: list[dict], n: int) -> list[dict]:
    """Pick at most *n* dossiers, spread over CMDs and biased to hard cases.

    When there are no more than *n* dossiers the input list itself is
    returned. Otherwise the selection runs in two phases:

    1. One dossier per CMD (``ghm_estimation.cmd``; a missing estimation or
       falsy CMD is grouped under ``"inconnu"``), visiting CMDs in sorted
       order and picking the dossier with a fixed-seed shuffle.
    2. If fewer than *n* were picked, the remaining dossiers are appended in
       order of DP confidence (``low``, then ``medium``, then ``high``, then
       anything else / no DP) and, within the same confidence, by decreasing
       number of associated diagnoses.

    A private ``random.Random(42)`` is used so the result is reproducible
    without reseeding or consuming the process-wide ``random`` generator.

    Args:
        dossiers: Dicts with at least ``dossier_id`` and ``dossier`` keys.
        n: Maximum number of dossiers to return.

    Returns:
        The selected dossiers, phase-1 picks first.
    """
    if len(dossiers) <= n:
        return dossiers

    # Group by CMD.
    by_cmd: dict[str, list[dict]] = {}
    for d in dossiers:
        ghm = d["dossier"].ghm_estimation
        cmd = ghm.cmd if ghm else None
        by_cmd.setdefault(cmd or "inconnu", []).append(d)

    # Local generator: same sequence as random.seed(42) on the module, but
    # without touching global RNG state.
    rng = random.Random(42)

    selected = []
    seen_ids = set()

    # Phase 1: one dossier per CMD (maximum diversity).
    for cmd in sorted(by_cmd):
        if len(selected) >= n:
            break
        candidates = by_cmd[cmd]
        rng.shuffle(candidates)
        pick = candidates[0]
        selected.append(pick)
        seen_ids.add(pick["dossier_id"])

    # Phase 2: fill up, over-representing difficult cases.
    if len(selected) < n:
        conf_order = {"low": 0, "medium": 1, "high": 2, None: 3}

        def difficulty_key(d: dict) -> tuple[int, int]:
            dossier = d["dossier"]
            dp = dossier.diagnostic_principal
            confidence = dp.cim10_confidence if dp else None
            # Negative DAS count: dossiers with many DAS come first.
            return (conf_order.get(confidence, 3), -len(dossier.diagnostics_associes))

        remaining = sorted(
            (d for d in dossiers if d["dossier_id"] not in seen_ids),
            key=difficulty_key,
        )
        selected.extend(remaining[: n - len(selected)])

    return selected[:n]
|
|
|
|
|
|
def create_empty_annotation(dossier_id: str, dossier: DossierMedical) -> dict:
    """Build the initial, not-yet-reviewed annotation for one dossier.

    The annotation mirrors what the pipeline produced: one ``dp`` entry for
    the principal diagnosis and one ``das`` entry per associated diagnosis,
    each pre-filled with the original text, the pipeline code and its
    confidence. Missing pipeline codes/confidences are stored as ``""`` for
    both DP and DAS so the two kinds of entries have the same shape.

    Args:
        dossier_id: Identifier stored in the annotation.
        dossier: Object exposing ``diagnostic_principal`` (may be falsy) and
            ``diagnostics_associes``.

    Returns:
        A JSON-serialisable dict with ``statut`` set to ``"non_commence"``,
        every diagnosis ``statut`` set to ``"correct"``, and empty validator,
        date, comments and ``das_ajoutes``.
    """
    dp = dossier.diagnostic_principal

    das_list = [
        {
            "index": i,
            "texte_original": das.texte,
            "code_pipeline": das.cim10_suggestion or "",
            "confidence": das.cim10_confidence or "",
            "source": das.source or "",
            "statut": "correct",
            "code_corrige": None,
            "commentaire": "",
        }
        for i, das in enumerate(dossier.diagnostics_associes)
    ]

    return {
        "dossier_id": dossier_id,
        "validateur": "",
        "date_validation": "",
        "statut": "non_commence",
        "dp": {
            "texte_original": dp.texte if dp else "",
            # Normalise None to "" exactly like the DAS entries above.
            "code_pipeline": (dp.cim10_suggestion or "") if dp else "",
            "confidence": (dp.cim10_confidence or "") if dp else "",
            "statut": "correct",
            "code_corrige": None,
            "commentaire": "",
        },
        "das": das_list,
        "das_ajoutes": [],
        "commentaire_general": "",
    }
|
|
|
|
|
|
def main():
    """Select the dossiers, persist the selection and seed empty annotations.

    Steps:
        1. Load every dossier; exit with status 1 when none is found.
        2. Run ``select_dossiers`` and write ``_selection.json`` in
           ``GOLD_DIR`` (timestamp, counts and selected dossier ids).
        3. Create one annotation file per selected dossier, leaving files
           that already exist untouched.
        4. Print a one-line summary per selected dossier.
    """
    # Local import: only this entry point needs the timestamp.
    from datetime import datetime

    print("=== Sélection des dossiers pour validation DIM ===\n")

    all_dossiers = load_all_dossiers()
    print(f"Total dossiers chargés : {len(all_dossiers)}\n")

    if not all_dossiers:
        print("Aucun dossier trouvé dans output/structured/")
        sys.exit(1)

    selected = select_dossiers(all_dossiers)

    # Create the gold-standard directory.
    GOLD_DIR.mkdir(parents=True, exist_ok=True)

    # Save the selection.
    n_cpam = sum(1 for d in selected if d["dossier"].controles_cpam)
    selection = {
        "date_selection": datetime.now().isoformat(timespec="seconds"),
        "total": len(selected),
        "cpam": n_cpam,
        "non_cpam": len(selected) - n_cpam,
        "dossiers": [d["dossier_id"] for d in selected],
    }
    selection_path = GOLD_DIR / "_selection.json"
    selection_path.write_text(
        json.dumps(selection, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    print(f"\nSélection sauvegardée : {selection_path}")

    # Initialise empty annotations; never overwrite an existing file.
    created = 0
    for d in selected:
        dossier_id = d["dossier_id"]
        # "/" is not valid inside a file name, so flatten the id.
        safe_name = dossier_id.replace("/", "__") + ".json"
        annot_path = GOLD_DIR / safe_name
        if annot_path.exists():
            continue
        annotation = create_empty_annotation(dossier_id, d["dossier"])
        annot_path.write_text(
            json.dumps(annotation, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )
        created += 1

    print(f"Annotations vides créées : {created}")
    print(f"Annotations existantes préservées : {len(selected) - created}")

    # Summary.
    print("\n--- Résumé ---")
    for i, d in enumerate(selected, 1):
        dos = d["dossier"]
        dp = dos.diagnostic_principal
        # Fall back to "?" for a missing value: formatting None with an
        # "s" format spec raises TypeError.
        dp_code = (dp.cim10_suggestion or "?") if dp else "?"
        dp_conf = (dp.cim10_confidence or "?") if dp else "?"
        n_das = len(dos.diagnostics_associes)
        cpam_flag = " [CPAM]" if dos.controles_cpam else ""
        ghm = dos.ghm_estimation
        cmd = (ghm.cmd or "?") if ghm else "?"
        print(f" {i:2d}. {d['group_name']:<20s} DP={dp_code:<6s} conf={dp_conf:<7s} DAS={n_das:2d} CMD={cmd}{cpam_flag}")
|
|
|
|
|
|
# Run the selection only when this file is executed as a script.
if __name__ == "__main__":
    main()
|