#!/usr/bin/env python3
"""Gold CRH annotator — CLI tool to create and validate the gold standard.

3 modes:
--bootstrap : Generate a CSV template from the existing pipeline JSON files
--import-csv : Convert a filled-in CSV into gold JSONL + JSON index
--check : Validate a gold JSONL file (duplicates, formats, stats)

Usage:
    python tools/gold_crh_annotator.py --bootstrap --out data/gold_crh/gold_template.csv
    python tools/gold_crh_annotator.py --import-csv data/gold_crh/gold_template.csv
    python tools/gold_crh_annotator.py --check data/gold_crh/gold_crh.jsonl
"""

from __future__ import annotations

import argparse
import csv
import json
import sys
from pathlib import Path

# Repository root (this file lives one directory below it); prepended to
# sys.path so the lazy `from src.eval.gold_models import ...` imports below
# resolve when the script is run directly.
ROOT = Path(__file__).resolve().parent.parent
sys.path.insert(0, str(ROOT))

# Pipeline output (one sub-directory per dossier) and gold-standard locations.
OUTPUT_DIR = ROOT / "output" / "structured"
GOLD_DIR = ROOT / "data" / "gold_crh"
DEFAULT_JSONL = GOLD_DIR / "gold_crh.jsonl"
DEFAULT_INDEX = GOLD_DIR / "gold_crh_index.json"

# Column order of the annotation CSV: bootstrap writes it, import reads it.
CSV_COLUMNS = [
    "case_id",
    "dp_expected_code",
    "dp_expected_label",
    "dp_acceptable_codes",
    "dp_acceptable_family3",
    "allow_symptom_dp",
    "confidence",
    "evidence_1_section",
    "evidence_1_excerpt",
    "evidence_2_section",
    "evidence_2_excerpt",
    "notes",
]
# ---------------------------------------------------------------------------
# Bootstrap: pipeline JSON -> CSV template
# ---------------------------------------------------------------------------

def find_merged_json(dossier_id: str) -> Path | None:
    """Locate the pipeline JSON for a dossier, preferring the merged file.

    Searches ``OUTPUT_DIR/<dossier_id>`` for ``*fusionne_cim10.json`` first,
    then falls back to any ``*_cim10.json``. Returns None when the directory
    does not exist or no file matches.
    """
    dossier_dir = OUTPUT_DIR / dossier_id
    if not dossier_dir.exists():
        return None
    for pattern in ("*fusionne_cim10.json", "*_cim10.json"):
        matches = list(dossier_dir.glob(pattern))
        if matches:
            return matches[0]
    return None
def bootstrap_template(out_path: Path, case_ids: list[str] | None = None) -> int:
    """Generate a pre-filled CSV template from the pipeline JSON files.

    Each row is seeded with the pipeline's principal diagnosis (DP) as a
    suggestion for the human annotator. Returns the number of rows written.
    """
    if case_ids:
        dossier_ids = case_ids
    else:
        # Every output sub-directory that holds a usable pipeline JSON.
        dossier_ids = sorted(
            entry.name for entry in OUTPUT_DIR.iterdir()
            if entry.is_dir() and find_merged_json(entry.name)
        )

    rows: list[dict] = []
    for dossier_id in dossier_ids:
        json_path = find_merged_json(dossier_id)
        if not json_path:
            continue
        try:
            data = json.loads(json_path.read_text("utf-8"))
        except (json.JSONDecodeError, OSError):
            # Unreadable or malformed pipeline output: skip this dossier.
            continue

        dp = data.get("diagnostic_principal", {})
        dp_code = dp.get("cim10_final") or dp.get("cim10_suggestion") or ""

        # Seed every column as empty, then fill the pipeline suggestions in.
        row = dict.fromkeys(CSV_COLUMNS, "")
        row.update({
            "case_id": dossier_id,
            "dp_expected_code": dp_code,
            "dp_expected_label": dp.get("texte", ""),
            "dp_acceptable_family3": dp_code[:3] if dp_code else "",
            "allow_symptom_dp": "false",
            "confidence": "probable",
            "notes": f"pipeline: {dp_code} ({dp.get('source', '?')})",
        })
        rows.append(row)

    out_path.parent.mkdir(parents=True, exist_ok=True)
    with open(out_path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=CSV_COLUMNS)
        writer.writeheader()
        for row in rows:
            writer.writerow(row)

    return len(rows)
# ---------------------------------------------------------------------------
# Import CSV -> JSONL + index
# ---------------------------------------------------------------------------

def _parse_gold_row(line_no: int, row: dict, errors: list[str]) -> dict | None:
    """Validate one CSV row and convert it into a gold-case dict.

    Appends human-readable messages to *errors*; returns None when the row
    must be skipped (missing case_id, missing or malformed DP code).
    Note: csv.DictReader yields None for missing trailing fields, hence the
    ``or ""`` guards before every ``.strip()``.
    """
    from src.eval.gold_models import is_valid_cim10_format

    case_id = (row.get("case_id") or "").strip()
    if not case_id:
        errors.append(f"ligne {line_no}: case_id vide")
        return None

    dp_code = (row.get("dp_expected_code") or "").strip().upper()
    dp_label = (row.get("dp_expected_label") or "").strip()

    if not dp_code:
        errors.append(f"ligne {line_no} ({case_id}): dp_expected_code obligatoire")
        return None
    if not is_valid_cim10_format(dp_code):
        errors.append(f"ligne {line_no} ({case_id}): code CIM-10 invalide: {dp_code}")
        return None

    # Pipe-separated list of alternative acceptable codes.
    acceptable_codes = [
        c.strip().upper()
        for c in (row.get("dp_acceptable_codes") or "").split("|")
        if c.strip()
    ]
    for ac in acceptable_codes:
        if not is_valid_cim10_format(ac):
            errors.append(f"ligne {line_no} ({case_id}): code acceptable invalide: {ac}")

    # Pipe-separated list of acceptable 3-character CIM-10 families.
    acceptable_family3 = [
        f.strip().upper()
        for f in (row.get("dp_acceptable_family3") or "").split("|")
        if f.strip()
    ]

    allow_symptom = (row.get("allow_symptom_dp") or "false").strip().lower() in (
        "true", "1", "yes", "oui"
    )

    raw_confidence = row.get("confidence")
    confidence = ("probable" if raw_confidence is None else raw_confidence).strip().lower()
    if confidence not in ("certain", "probable", "ambiguous"):
        errors.append(f"ligne {line_no} ({case_id}): confidence invalide: {confidence}")
        confidence = "probable"

    # Evidence: up to 2 (section, excerpt) pairs; excerpts capped at 240 chars.
    evidence = []
    for idx in (1, 2):
        section = (row.get(f"evidence_{idx}_section") or "").strip()
        excerpt = (row.get(f"evidence_{idx}_excerpt") or "").strip()
        if section and excerpt:
            if len(excerpt) > 240:
                errors.append(
                    f"ligne {line_no} ({case_id}): evidence_{idx}_excerpt "
                    f"trop long ({len(excerpt)} > 240)"
                )
                excerpt = excerpt[:240]
            evidence.append({"section": section, "excerpt": excerpt})

    # Free-text notes, capped at 400 chars.
    notes = (row.get("notes") or "").strip()[:400]

    return {
        "case_id": case_id,
        "document_type": "crh",
        "dp_expected": {"code": dp_code, "label": dp_label},
        "dp_acceptable_codes": acceptable_codes,
        "dp_acceptable_family3": acceptable_family3,
        "allow_symptom_dp": allow_symptom,
        "confidence": confidence,
        "evidence": evidence,
        "notes": notes,
    }


def import_csv_to_jsonl(csv_path: Path, jsonl_path: Path | None = None,
                        index_path: Path | None = None) -> tuple[int, list[str]]:
    """Convert a filled-in CSV into gold JSONL + JSON index.

    Returns ``(case_count, errors)``. *errors* are non-fatal warnings unless
    no valid case remains, in which case nothing is written and the count is 0.
    """
    jsonl_path = jsonl_path or DEFAULT_JSONL
    index_path = index_path or DEFAULT_INDEX

    if not csv_path.exists():
        return 0, [f"Fichier CSV introuvable: {csv_path}"]

    rows: list[dict] = []
    errors: list[str] = []

    with open(csv_path, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader, 2):  # data starts on line 2 (header = 1)
            gold_case = _parse_gold_row(i, row, errors)
            if gold_case is not None:
                rows.append(gold_case)

    if not rows:
        return 0, errors + ["Aucun cas valide trouvé dans le CSV"]

    # Duplicate case_ids make the index ambiguous (the last occurrence wins),
    # so flag them explicitly instead of overwriting silently.
    index: dict[str, int] = {}
    for pos, gold_case in enumerate(rows):
        cid = gold_case["case_id"]
        if cid in index:
            errors.append(f"case_id dupliqué dans le CSV: {cid}")
        index[cid] = pos

    # Write JSONL: one gold case per line.
    jsonl_path.parent.mkdir(parents=True, exist_ok=True)
    with open(jsonl_path, "w", encoding="utf-8") as f:
        for gold_case in rows:
            f.write(json.dumps(gold_case, ensure_ascii=False) + "\n")

    # Write index: case_id -> 0-based line number in the JSONL.
    with open(index_path, "w", encoding="utf-8") as f:
        json.dump(index, f, ensure_ascii=False, indent=2)

    return len(rows), errors
# ---------------------------------------------------------------------------
# Check: JSONL validation
# ---------------------------------------------------------------------------

def check_gold(jsonl_path: Path) -> tuple[dict, list[str]]:
    """Validate a gold JSONL file. Returns (stats, errors)."""
    from src.eval.gold_models import load_gold_jsonl

    errors: list[str] = []

    try:
        cases = load_gold_jsonl(jsonl_path)
    except (FileNotFoundError, ValueError) as exc:
        return {}, [str(exc)]

    # Duplicate case_ids (each duplicate is reported against its first sighting).
    seen_ids: dict[str, int] = {}
    for pos, case in enumerate(cases):
        first = seen_ids.get(case.case_id)
        if first is not None:
            errors.append(
                f"case_id dupliqué: {case.case_id} (lignes {first + 1} et {pos + 1})"
            )
        seen_ids[case.case_id] = pos

    # Aggregate stats; confidence values are counted in a single pass.
    confidence_counts = {"certain": 0, "probable": 0, "ambiguous": 0}
    for case in cases:
        if case.confidence in confidence_counts:
            confidence_counts[case.confidence] += 1

    stats = {
        "total": len(cases),
        "certain": confidence_counts["certain"],
        "probable": confidence_counts["probable"],
        "ambiguous": confidence_counts["ambiguous"],
        "allow_symptom_dp": sum(1 for case in cases if case.allow_symptom_dp),
        "with_evidence": sum(1 for case in cases if case.evidence),
        "with_acceptable_codes": sum(1 for case in cases if case.dp_acceptable_codes),
        "with_family3": sum(1 for case in cases if case.dp_acceptable_family3),
    }

    # Missing / suspicious fields.
    for case in cases:
        if not case.dp_expected.label:
            errors.append(f"{case.case_id}: dp_expected.label vide")
        if not case.evidence:
            errors.append(f"{case.case_id}: pas d'evidence (recommandé)")

    return stats, errors
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Annotateur gold CRH — bootstrap / import / check"
|
|
)
|
|
parser.add_argument("--bootstrap", action="store_true",
|
|
help="Générer un CSV template depuis les JSON pipeline")
|
|
parser.add_argument("--import-csv", type=str, metavar="CSV",
|
|
help="Importer un CSV rempli → JSONL gold")
|
|
parser.add_argument("--check", type=str, metavar="JSONL",
|
|
help="Vérifier un fichier JSONL gold")
|
|
parser.add_argument("--out", type=str, default=str(GOLD_DIR / "gold_template.csv"),
|
|
help="Chemin de sortie (bootstrap)")
|
|
parser.add_argument("--cases", type=str, default="",
|
|
help="IDs de cas séparés par virgule (bootstrap)")
|
|
args = parser.parse_args()
|
|
|
|
if not any([args.bootstrap, args.import_csv, args.check]):
|
|
parser.print_help()
|
|
sys.exit(1)
|
|
|
|
if args.bootstrap:
|
|
case_ids = [c.strip() for c in args.cases.split(",") if c.strip()] if args.cases else None
|
|
out = Path(args.out)
|
|
n = bootstrap_template(out, case_ids)
|
|
print(f"Template généré : {out} ({n} cas)")
|
|
print(f" Colonnes : {', '.join(CSV_COLUMNS)}")
|
|
print(f" Remplir le CSV puis : python tools/gold_crh_annotator.py --import-csv {out}")
|
|
|
|
if args.import_csv:
|
|
csv_path = Path(args.import_csv)
|
|
n, errors = import_csv_to_jsonl(csv_path)
|
|
if errors:
|
|
print(f"ATTENTION — {len(errors)} avertissement(s) :")
|
|
for e in errors[:20]:
|
|
print(f" {e}")
|
|
print(f"\nImporté : {n} cas → {DEFAULT_JSONL}")
|
|
print(f"Index : {DEFAULT_INDEX}")
|
|
|
|
if args.check:
|
|
jsonl_path = Path(args.check)
|
|
stats, errors = check_gold(jsonl_path)
|
|
if errors:
|
|
print(f"ERREURS ({len(errors)}) :")
|
|
for e in errors[:20]:
|
|
print(f" {e}")
|
|
print()
|
|
if stats:
|
|
print(f"Gold CRH — {stats['total']} cas")
|
|
print(f" Confiance : certain={stats['certain']}, "
|
|
f"probable={stats['probable']}, ambiguous={stats['ambiguous']}")
|
|
print(f" allow_symptom_dp : {stats['allow_symptom_dp']}")
|
|
print(f" Avec evidence : {stats['with_evidence']}")
|
|
print(f" Avec codes alt : {stats['with_acceptable_codes']}")
|
|
print(f" Avec family3 : {stats['with_family3']}")
|
|
else:
|
|
print("Aucun cas chargé.")
|
|
sys.exit(1)
|
|
|
|
if not errors:
|
|
print("\nOK — aucune erreur détectée")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|