#!/usr/bin/env python3 """Sélectionne 50 dossiers pour le gold standard de validation DIM. - 25 dossiers CPAM (cas complexes, déjà contrôlés) - 25 dossiers non-CPAM stratifiés par CMD, confiance DP, nombre de DAS Crée data/gold_standard/_selection.json et initialise les annotations vides. """ from __future__ import annotations import json import random import sys from pathlib import Path # Ajouter le répertoire racine au path sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from src.config import STRUCTURED_DIR, BASE_DIR, DossierMedical GOLD_DIR = BASE_DIR / "data" / "gold_standard" TARGET_TOTAL = 50 TARGET_CPAM = 25 def load_all_dossiers() -> list[dict]: """Charge tous les dossiers fusionnés depuis output/structured/.""" dossiers = [] for subdir in sorted(STRUCTURED_DIR.iterdir()): if not subdir.is_dir(): continue # Chercher le fichier fusionné fusionne = None for f in subdir.glob("*fusionne*.json"): fusionne = f break if not fusionne: # Prendre le premier JSON du dossier jsons = sorted(subdir.glob("*.json")) if jsons: fusionne = jsons[0] if not fusionne: continue try: data = json.loads(fusionne.read_text(encoding="utf-8")) dossier = DossierMedical.model_validate(data) rel_path = str(fusionne.relative_to(STRUCTURED_DIR)) group_name = subdir.name dossiers.append({ "dossier_id": f"{group_name}/{fusionne.stem}", "group_name": group_name, "path_rel": rel_path, "dossier": dossier, }) except Exception as e: print(f" Erreur chargement {fusionne.name}: {e}") return dossiers def select_dossiers(all_dossiers: list[dict]) -> list[dict]: """Sélectionne les 50 dossiers selon la stratégie définie.""" # Séparer CPAM / non-CPAM cpam = [d for d in all_dossiers if d["dossier"].controles_cpam] non_cpam = [d for d in all_dossiers if not d["dossier"].controles_cpam] print(f"Dossiers CPAM disponibles : {len(cpam)}") print(f"Dossiers non-CPAM disponibles : {len(non_cpam)}") # Prendre tous les CPAM (ou max TARGET_CPAM) selected_cpam = cpam[:TARGET_CPAM] remaining_target = TARGET_TOTAL - len(selected_cpam) # Stratifier les non-CPAM selected_non_cpam = stratified_sample(non_cpam, remaining_target) selected = selected_cpam + selected_non_cpam print(f"\nSélection finale : {len(selected)} dossiers") print(f" - CPAM : {len(selected_cpam)}") print(f" - Non-CPAM : {len(selected_non_cpam)}") return selected def stratified_sample(dossiers: list[dict], n: int) -> list[dict]: """Échantillonnage stratifié par CMD, confiance DP et nombre de DAS.""" if len(dossiers) <= n: return dossiers # Grouper par CMD by_cmd: dict[str, list[dict]] = {} for d in dossiers: ghm = d["dossier"].ghm_estimation cmd = ghm.cmd if ghm else "inconnu" by_cmd.setdefault(cmd or "inconnu", []).append(d) selected = [] seen_ids = set() # Phase 1 : 1 dossier par CMD (diversité maximale) cmds = sorted(by_cmd.keys()) random.seed(42) # Reproductible for cmd in cmds: if len(selected) >= n: break candidates = by_cmd[cmd] # Préférer un mix de confiances random.shuffle(candidates) d = candidates[0] selected.append(d) seen_ids.add(d["dossier_id"]) # Phase 2 : compléter avec diversité confiance DP if len(selected) < n: remaining = [d for d in dossiers if d["dossier_id"] not in seen_ids] # Trier par confiance DP (low > medium > high pour surreprésenter les cas difficiles) conf_order = {"low": 0, "medium": 1, "high": 2, None: 3} remaining.sort(key=lambda d: ( conf_order.get( d["dossier"].diagnostic_principal.cim10_confidence if d["dossier"].diagnostic_principal else None, 3 ), -len(d["dossier"].diagnostics_associes), # beaucoup de DAS d'abord )) for d in remaining: if len(selected) >= n: break selected.append(d) return selected[:n] def create_empty_annotation(dossier_id: str, dossier: DossierMedical) -> dict: """Crée une annotation vide pour un dossier.""" dp = dossier.diagnostic_principal das_list = [] for i, das in enumerate(dossier.diagnostics_associes): das_list.append({ "index": i, "texte_original": das.texte, "code_pipeline": das.cim10_suggestion or "", "confidence": das.cim10_confidence or "", "source": das.source or "", "statut": "correct", "code_corrige": None, "commentaire": "", }) return { "dossier_id": dossier_id, "validateur": "", "date_validation": "", "statut": "non_commence", "dp": { "texte_original": dp.texte if dp else "", "code_pipeline": dp.cim10_suggestion if dp else "", "confidence": dp.cim10_confidence if dp else "", "statut": "correct", "code_corrige": None, "commentaire": "", }, "das": das_list, "das_ajoutes": [], "commentaire_general": "", } def main(): print("=== Sélection des dossiers pour validation DIM ===\n") all_dossiers = load_all_dossiers() print(f"Total dossiers chargés : {len(all_dossiers)}\n") if not all_dossiers: print("Aucun dossier trouvé dans output/structured/") sys.exit(1) selected = select_dossiers(all_dossiers) # Créer le répertoire gold standard GOLD_DIR.mkdir(parents=True, exist_ok=True) # Sauvegarder la sélection selection = { "date_selection": __import__("datetime").datetime.now().isoformat(timespec="seconds"), "total": len(selected), "cpam": sum(1 for d in selected if d["dossier"].controles_cpam), "non_cpam": sum(1 for d in selected if not d["dossier"].controles_cpam), "dossiers": [d["dossier_id"] for d in selected], } selection_path = GOLD_DIR / "_selection.json" selection_path.write_text( json.dumps(selection, ensure_ascii=False, indent=2), encoding="utf-8", ) print(f"\nSélection sauvegardée : {selection_path}") # Initialiser les annotations vides created = 0 for d in selected: dossier_id = d["dossier_id"] safe_name = dossier_id.replace("/", "__") + ".json" annot_path = GOLD_DIR / safe_name if not annot_path.exists(): annotation = create_empty_annotation(dossier_id, d["dossier"]) annot_path.write_text( json.dumps(annotation, ensure_ascii=False, indent=2), encoding="utf-8", ) created += 1 print(f"Annotations vides créées : {created}") print(f"Annotations existantes préservées : {len(selected) - created}") # Résumé print(f"\n--- Résumé ---") for i, d in enumerate(selected, 1): dos = d["dossier"] dp_code = dos.diagnostic_principal.cim10_suggestion if dos.diagnostic_principal else "?" dp_conf = (dos.diagnostic_principal.cim10_confidence or "?") if dos.diagnostic_principal else "?" n_das = len(dos.diagnostics_associes) cpam_flag = " [CPAM]" if dos.controles_cpam else "" ghm = dos.ghm_estimation cmd = ghm.cmd if ghm else "?" print(f" {i:2d}. {d['group_name']:<20s} DP={dp_code:<6s} conf={dp_conf:<7s} DAS={n_das:2d} CMD={cmd}{cpam_flag}") if __name__ == "__main__": main()