#!/usr/bin/env python3 """Annotateur gold CRH — outil CLI pour créer et valider le gold standard. 3 modes : --bootstrap : Génère un CSV template depuis les JSON pipeline existants --import-csv : Convertit un CSV rempli en JSONL gold + index JSON --check : Valide un fichier JSONL gold (doublons, formats, stats) Usage: python tools/gold_crh_annotator.py --bootstrap --out data/gold_crh/gold_template.csv python tools/gold_crh_annotator.py --import-csv data/gold_crh/gold_template.csv python tools/gold_crh_annotator.py --check data/gold_crh/gold_crh.jsonl """ from __future__ import annotations import argparse import csv import json import sys from pathlib import Path ROOT = Path(__file__).resolve().parent.parent sys.path.insert(0, str(ROOT)) OUTPUT_DIR = ROOT / "output" / "structured" GOLD_DIR = ROOT / "data" / "gold_crh" DEFAULT_JSONL = GOLD_DIR / "gold_crh.jsonl" DEFAULT_INDEX = GOLD_DIR / "gold_crh_index.json" CSV_COLUMNS = [ "case_id", "dp_expected_code", "dp_expected_label", "dp_acceptable_codes", "dp_acceptable_family3", "allow_symptom_dp", "confidence", "evidence_1_section", "evidence_1_excerpt", "evidence_2_section", "evidence_2_excerpt", "notes", ] # --------------------------------------------------------------------------- # Bootstrap : JSON pipeline → CSV template # --------------------------------------------------------------------------- def find_merged_json(dossier_id: str) -> Path | None: d = OUTPUT_DIR / dossier_id if not d.exists(): return None fusions = list(d.glob("*fusionne_cim10.json")) if fusions: return fusions[0] cim10s = list(d.glob("*_cim10.json")) return cim10s[0] if cim10s else None def bootstrap_template(out_path: Path, case_ids: list[str] | None = None) -> int: """Génère un CSV template pré-rempli depuis les JSON pipeline.""" if case_ids: dossier_ids = case_ids else: # Trouver tous les dossiers, filtrer les CRH ou sans DP trackare dossier_ids = sorted( d.name for d in OUTPUT_DIR.iterdir() if d.is_dir() and find_merged_json(d.name) ) rows: list[dict] = [] for did in dossier_ids: path = find_merged_json(did) if not path: continue try: data = json.loads(path.read_text("utf-8")) except (json.JSONDecodeError, OSError): continue dp = data.get("diagnostic_principal", {}) dp_code = dp.get("cim10_final") or dp.get("cim10_suggestion") or "" dp_label = dp.get("texte", "") # Pré-remplir avec le DP pipeline comme suggestion rows.append({ "case_id": did, "dp_expected_code": dp_code, "dp_expected_label": dp_label, "dp_acceptable_codes": "", "dp_acceptable_family3": dp_code[:3] if dp_code else "", "allow_symptom_dp": "false", "confidence": "probable", "evidence_1_section": "", "evidence_1_excerpt": "", "evidence_2_section": "", "evidence_2_excerpt": "", "notes": f"pipeline: {dp_code} ({dp.get('source', '?')})", }) out_path.parent.mkdir(parents=True, exist_ok=True) with open(out_path, "w", newline="", encoding="utf-8") as f: writer = csv.DictWriter(f, fieldnames=CSV_COLUMNS) writer.writeheader() writer.writerows(rows) return len(rows) # --------------------------------------------------------------------------- # Import CSV → JSONL + index # --------------------------------------------------------------------------- def import_csv_to_jsonl(csv_path: Path, jsonl_path: Path | None = None, index_path: Path | None = None) -> tuple[int, list[str]]: """Convertit un CSV rempli en JSONL gold + index JSON. Retourne (nb_cas, erreurs). """ from src.eval.gold_models import is_valid_cim10_format jsonl_path = jsonl_path or DEFAULT_JSONL index_path = index_path or DEFAULT_INDEX if not csv_path.exists(): return 0, [f"Fichier CSV introuvable: {csv_path}"] rows: list[dict] = [] errors: list[str] = [] with open(csv_path, "r", encoding="utf-8") as f: reader = csv.DictReader(f) for i, row in enumerate(reader, 2): # ligne 2+ (header = 1) case_id = row.get("case_id", "").strip() if not case_id: errors.append(f"ligne {i}: case_id vide") continue dp_code = row.get("dp_expected_code", "").strip().upper() dp_label = row.get("dp_expected_label", "").strip() if not dp_code: errors.append(f"ligne {i} ({case_id}): dp_expected_code obligatoire") continue if not is_valid_cim10_format(dp_code): errors.append(f"ligne {i} ({case_id}): code CIM-10 invalide: {dp_code}") continue # Parse listes séparées par | acceptable_codes = [ c.strip().upper() for c in row.get("dp_acceptable_codes", "").split("|") if c.strip() ] for ac in acceptable_codes: if not is_valid_cim10_format(ac): errors.append(f"ligne {i} ({case_id}): code acceptable invalide: {ac}") acceptable_family3 = [ f.strip().upper() for f in row.get("dp_acceptable_family3", "").split("|") if f.strip() ] allow_symptom = row.get("allow_symptom_dp", "false").strip().lower() in ( "true", "1", "yes", "oui" ) confidence = row.get("confidence", "probable").strip().lower() if confidence not in ("certain", "probable", "ambiguous"): errors.append(f"ligne {i} ({case_id}): confidence invalide: {confidence}") confidence = "probable" # Evidence evidence = [] for idx in (1, 2): section = row.get(f"evidence_{idx}_section", "").strip() excerpt = row.get(f"evidence_{idx}_excerpt", "").strip() if section and excerpt: if len(excerpt) > 240: errors.append( f"ligne {i} ({case_id}): evidence_{idx}_excerpt " f"trop long ({len(excerpt)} > 240)" ) excerpt = excerpt[:240] evidence.append({"section": section, "excerpt": excerpt}) notes = row.get("notes", "").strip() if len(notes) > 400: notes = notes[:400] gold_case = { "case_id": case_id, "document_type": "crh", "dp_expected": {"code": dp_code, "label": dp_label}, "dp_acceptable_codes": acceptable_codes, "dp_acceptable_family3": acceptable_family3, "allow_symptom_dp": allow_symptom, "confidence": confidence, "evidence": evidence, "notes": notes, } rows.append(gold_case) if not rows: return 0, errors + ["Aucun cas valide trouvé dans le CSV"] # Écrire JSONL jsonl_path.parent.mkdir(parents=True, exist_ok=True) with open(jsonl_path, "w", encoding="utf-8") as f: for gold_case in rows: f.write(json.dumps(gold_case, ensure_ascii=False) + "\n") # Écrire index (case_id → numéro de ligne 0-indexed) index = {row["case_id"]: i for i, row in enumerate(rows)} with open(index_path, "w", encoding="utf-8") as f: json.dump(index, f, ensure_ascii=False, indent=2) return len(rows), errors # --------------------------------------------------------------------------- # Check : validation JSONL # --------------------------------------------------------------------------- def check_gold(jsonl_path: Path) -> tuple[dict, list[str]]: """Valide un fichier JSONL gold. Retourne (stats, erreurs).""" from src.eval.gold_models import load_gold_jsonl errors: list[str] = [] try: cases = load_gold_jsonl(jsonl_path) except FileNotFoundError as e: return {}, [str(e)] except ValueError as e: return {}, [str(e)] # Doublons seen_ids: dict[str, int] = {} for i, c in enumerate(cases): if c.case_id in seen_ids: errors.append(f"case_id dupliqué: {c.case_id} (lignes {seen_ids[c.case_id]+1} et {i+1})") seen_ids[c.case_id] = i # Stats stats = { "total": len(cases), "certain": sum(1 for c in cases if c.confidence == "certain"), "probable": sum(1 for c in cases if c.confidence == "probable"), "ambiguous": sum(1 for c in cases if c.confidence == "ambiguous"), "allow_symptom_dp": sum(1 for c in cases if c.allow_symptom_dp), "with_evidence": sum(1 for c in cases if c.evidence), "with_acceptable_codes": sum(1 for c in cases if c.dp_acceptable_codes), "with_family3": sum(1 for c in cases if c.dp_acceptable_family3), } # Champs manquants / suspects for c in cases: if not c.dp_expected.label: errors.append(f"{c.case_id}: dp_expected.label vide") if not c.evidence: errors.append(f"{c.case_id}: pas d'evidence (recommandé)") return stats, errors # --------------------------------------------------------------------------- # CLI # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser( description="Annotateur gold CRH — bootstrap / import / check" ) parser.add_argument("--bootstrap", action="store_true", help="Générer un CSV template depuis les JSON pipeline") parser.add_argument("--import-csv", type=str, metavar="CSV", help="Importer un CSV rempli → JSONL gold") parser.add_argument("--check", type=str, metavar="JSONL", help="Vérifier un fichier JSONL gold") parser.add_argument("--out", type=str, default=str(GOLD_DIR / "gold_template.csv"), help="Chemin de sortie (bootstrap)") parser.add_argument("--cases", type=str, default="", help="IDs de cas séparés par virgule (bootstrap)") args = parser.parse_args() if not any([args.bootstrap, args.import_csv, args.check]): parser.print_help() sys.exit(1) if args.bootstrap: case_ids = [c.strip() for c in args.cases.split(",") if c.strip()] if args.cases else None out = Path(args.out) n = bootstrap_template(out, case_ids) print(f"Template généré : {out} ({n} cas)") print(f" Colonnes : {', '.join(CSV_COLUMNS)}") print(f" Remplir le CSV puis : python tools/gold_crh_annotator.py --import-csv {out}") if args.import_csv: csv_path = Path(args.import_csv) n, errors = import_csv_to_jsonl(csv_path) if errors: print(f"ATTENTION — {len(errors)} avertissement(s) :") for e in errors[:20]: print(f" {e}") print(f"\nImporté : {n} cas → {DEFAULT_JSONL}") print(f"Index : {DEFAULT_INDEX}") if args.check: jsonl_path = Path(args.check) stats, errors = check_gold(jsonl_path) if errors: print(f"ERREURS ({len(errors)}) :") for e in errors[:20]: print(f" {e}") print() if stats: print(f"Gold CRH — {stats['total']} cas") print(f" Confiance : certain={stats['certain']}, " f"probable={stats['probable']}, ambiguous={stats['ambiguous']}") print(f" allow_symptom_dp : {stats['allow_symptom_dp']}") print(f" Avec evidence : {stats['with_evidence']}") print(f" Avec codes alt : {stats['with_acceptable_codes']}") print(f" Avec family3 : {stats['with_family3']}") else: print("Aucun cas chargé.") sys.exit(1) if not errors: print("\nOK — aucune erreur détectée") if __name__ == "__main__": main()