diff --git a/anonymizer_core_refactored_onnx.py b/anonymizer_core_refactored_onnx.py index 9626308..79ee75f 100644 --- a/anonymizer_core_refactored_onnx.py +++ b/anonymizer_core_refactored_onnx.py @@ -54,6 +54,27 @@ try: except Exception: EdsPseudoManager = None # type: ignore + +def _load_edsnlp_drug_names() -> set: + """Charge les noms de médicaments mono-mot depuis edsnlp/resources/drugs.json. + Retourne un set lowercase. Fallback silencieux si edsnlp absent.""" + try: + import edsnlp as _edsnlp + drugs_path = _edsnlp.BASE_DIR / "resources" / "drugs.json" + if not drugs_path.exists(): + return set() + import json as _json + data = _json.loads(drugs_path.read_text(encoding="utf-8")) + result = set() + for _code, names in data.items(): + for name in names: + if " " not in name and len(name) >= 4: + result.add(name.lower()) + return result + except Exception: + return set() + + # ----------------- Defaults & Config ----------------- DEFAULTS_CFG = { "version": 1, @@ -312,15 +333,18 @@ _MEDICAL_STOP_WORDS_SET = { "indication", "conclusion", "technique", "anesthésie", "digestif", "digestive", "digestives", "nutritive", } +# Enrichissement automatique avec les ~4000 noms de médicaments d'edsnlp +_MEDICAL_STOP_WORDS_SET.update(_load_edsnlp_drug_names()) + _MEDICAL_STOP_WORDS = ( r"(?:" + "|".join(re.escape(w) for w in _MEDICAL_STOP_WORDS_SET) + r")" ) # Un token de nom : commence par majuscule, lettres/tirets/apostrophes (PAS d'espace ni de point) _PERSON_TOKEN = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+" RE_PERSON_CONTEXT = re.compile( - r"(?:(?:Dr\.?|DR\.?|Docteur|Mme|MME|Madame|M\.|Mr\.?|Monsieur" + r"(?:(?:Dr\.?|DR\.?|Docteur|Pr\.?|Professeur|Mme|MME|Madame|M\.|Mr\.?|Monsieur" r"|Nom\s*:\s*" - r"|Rédigé\s+par|Validé\s+par|Signé\s+par|Saisi\s+par" + r"|Rédigé\s+par|Validé\s+par|Signé\s+par|Saisi\s+par|Réalisé\s+par" r")\s+)" rf"({_PERSON_TOKEN}(?:\s+{_PERSON_TOKEN}){{0,2}})" # Max 3 mots ) @@ -390,10 +414,17 @@ RE_EXTRACT_DR_DEST = re.compile( ) # Noms du personnel médical après un rôle : "Aide : Marie-Paule BORDABERRY" RE_EXTRACT_STAFF_ROLE = re.compile( - r"(?:Aide|Infirmière?|IDE|IADE|IBODE|ASH?|Cadre\s+Infirmier)\s*:\s*" + r"(?:Aide|Infirmière?|IDE|IADE|IBODE|ASH?|Cadre\s+Infirmier" + r"|Prescripteur|Prescrit\s+par|Exécut[ée]\s+par|Réalisé\s+par)\s*:?\s*" r"((?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][a-zéèàùâêîôûäëïöüç]+(?:\s*-\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][a-zéèàùâêîôûäëïöüç]+)?\s+)?" r"(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,}[\-]?)(?:[\s\-]+[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})*)", ) +# "Pr DUVAL", "Pr. J.-M. DUVAL", "Professeur DUVAL" +RE_EXTRACT_PR = re.compile( + r"(?:Pr\.?|Professeur)\s+" + + _INITIAL_OPT + + rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)", +) CID_PATTERN = re.compile(r"\(cid:\d+\)") @@ -467,6 +498,7 @@ class AnonResult: text_out: str tables_block: str audit: List[PiiHit] = field(default_factory=list) + is_trackare: bool = False # ----------------- Config loader ----------------- @@ -877,6 +909,18 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: if m.group(2): _add_name(m.group(2)) + # --- Prescripteurs / Exécutants (trackare) --- + for m in re.finditer( + r"(?:Prescripteur|Prescrit\s+par|Exécut[ée]\s+par|Réalisé\s+par)\s*:?\s*" + r"(?:(?:Dr|Pr)\.?\s+)?" + r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+)" + r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+))?", + full_text, + ): + _add_name(m.group(1)) + if m.group(2): + _add_name(m.group(2)) + # --- Médecins urgences (IAO, prise en charge, décision) --- for m in re.finditer(r"IAO\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+)", full_text): _add_name(m.group(1)) @@ -991,9 +1035,12 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set: _add_tokens(m.group(1)) if m.group(2): _add_tokens(m.group(2)) - # Personnel médical avec rôle (Aide, Cadre Infirmier, etc.) + # Personnel médical avec rôle (Aide, Cadre Infirmier, Prescripteur, etc.) for m in RE_EXTRACT_STAFF_ROLE.finditer(full_text): _add_tokens(m.group(1)) + # Pr / Professeur + nom(s) + for m in RE_EXTRACT_PR.finditer(full_text): + _add_tokens_force_first(m.group(1)) # Extraction des noms dans les listes virgulées après Dr/Docteur # ex: "le Dr DUVAL, MACHELART, CHARLANNE, LAZARO, il a été proposé" @@ -1066,7 +1113,8 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str] extracted_names = _extract_document_names(full_raw, cfg) # Phase 0b : si document Trackare, extraction renforcée des PII structurés - if _is_trackare_document(full_raw): + is_trackare = _is_trackare_document(full_raw) + if is_trackare: trackare_names, trackare_hits = _extract_trackare_identity(full_raw) extracted_names.update(trackare_names) audit.extend(trackare_hits) @@ -1094,7 +1142,7 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str] if extracted_names: text_out = _apply_extracted_names(text_out, extracted_names, audit) - return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit) + return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit, is_trackare=is_trackare) # ----------------- NER ONNX sur narratif ----------------- @@ -1193,6 +1241,20 @@ def _mask_with_eds_pseudo(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, # Filtrer les dosages détectés comme noms (ex: "10MG", "300UI", "1 000") if re.match(r"^\d[\d\s]*(?:mg|MG|ml|ML|UI|µg|mcg|g|kg|%)?$", w.strip()): continue + # Règles de validation heuristiques par type d'entité + if label in ("NOM", "PRENOM"): + # Rejeter si le contexte précédent (15 chars) contient un dosage + pos = text.find(w) + if pos > 0: + ctx_before = text[max(0, pos - 15):pos] + if re.search(r"\d+\s*(?:mg|UI|ml|µg|mcg)\b", ctx_before, re.IGNORECASE): + continue + elif label == "HOPITAL": + _STRUCTURAL_WORDS = {"SERVICE", "POLE", "PÔLE", "UNITE", "UNITÉ", "SECTEUR"} + if len(w) < 5: + continue + if w.upper() in _STRUCTURAL_WORDS: + continue placeholder = PLACEHOLDERS.get(mapped_key, PLACEHOLDERS["MASK"]) audit.append(PiiHit(-1, f"EDS_{label}", w, placeholder)) out = repl_once(out, w, placeholder) @@ -1571,6 +1633,9 @@ def process_pdf( token = h.original.strip() if not token or len(token) < 3: continue + # Garde trackare : NOM_GLOBAL très court (<=3) risque de masquer des codes diagnostics + if anon.is_trackare and h.kind == "NOM_GLOBAL" and len(token) <= 3: + continue try: final_text = re.sub(rf"\b{re.escape(token)}\b", h.placeholder, final_text) except re.error: diff --git a/qc_audit.py b/qc_audit.py new file mode 100644 index 0000000..b8e6ebc --- /dev/null +++ b/qc_audit.py @@ -0,0 +1,313 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +QC Audit — Contrôle qualité post-anonymisation +----------------------------------------------- +Analyse les fichiers .audit.jsonl et .pseudonymise.txt pour détecter : +- Faux négatifs résiduels (EMAIL, TEL, NIR non masqués) +- Sur-masquage (densité de placeholders trop élevée) +- Faux positifs candidats (NOM_EXTRACTED qui sont des stop words ou < 3 chars) +- Faux négatifs candidats (mots ALL-CAPS >= 5 chars non masqués, non médicaux) +- Stats audit (comptage par kind, top 10) + +Usage : + python3 qc_audit.py path/to/*.audit.jsonl + python3 qc_audit.py --batch-dir path/anonymise/ +""" +from __future__ import annotations + +import argparse +import csv +import json +import re +import sys +from pathlib import Path +from collections import Counter +from typing import List, Dict, Any, Optional + +# Regex PII critiques (mêmes patterns que le core) +RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}") +RE_TEL = re.compile(r"(? List[Dict[str, Any]]: + """Charge un fichier .audit.jsonl.""" + entries = [] + with audit_path.open("r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line: + entries.append(json.loads(line)) + return entries + + +def load_text(txt_path: Path) -> str: + """Charge un fichier .pseudonymise.txt.""" + return txt_path.read_text(encoding="utf-8") + + +def scan_residual_pii(text: str) -> Dict[str, List[str]]: + """Détecte les PII résiduels (faux négatifs) dans le texte pseudonymisé.""" + residuals: Dict[str, List[str]] = {} + for m in RE_EMAIL.finditer(text): + # Ignorer les emails dans les placeholders + if "[" not in text[max(0, m.start() - 1):m.start()]: + residuals.setdefault("EMAIL", []).append(m.group()) + for m in RE_TEL.finditer(text): + if "[" not in text[max(0, m.start() - 1):m.start()]: + residuals.setdefault("TEL", []).append(m.group()) + for m in RE_NIR.finditer(text): + if "[" not in text[max(0, m.start() - 1):m.start()]: + residuals.setdefault("NIR", []).append(m.group()) + return residuals + + +def placeholder_density(text: str) -> Dict[str, Any]: + """Calcule la densité de placeholders. Alerte si [NOM] > 5% des mots.""" + words = text.split() + total_words = len(words) + if total_words == 0: + return {"total_words": 0, "placeholders": 0, "density_pct": 0.0, "alert": False} + + placeholder_count = sum(1 for w in words if RE_PLACEHOLDER.match(w)) + nom_count = text.count("[NOM]") + density = placeholder_count / total_words * 100 + nom_density = nom_count / total_words * 100 + + return { + "total_words": total_words, + "placeholders": placeholder_count, + "density_pct": round(density, 2), + "nom_count": nom_count, + "nom_density_pct": round(nom_density, 2), + "alert_overmasking": nom_density > 5.0, + } + + +def audit_stats(entries: List[Dict[str, Any]]) -> Dict[str, int]: + """Comptage par kind (top 10).""" + counter = Counter(e.get("kind", "UNKNOWN") for e in entries) + return dict(counter.most_common(10)) + + +def fp_candidates(entries: List[Dict[str, Any]]) -> List[Dict[str, str]]: + """Faux positifs candidats : NOM_EXTRACTED qui sont des stop words ou < 3 chars.""" + candidates = [] + for e in entries: + kind = e.get("kind", "") + original = e.get("original", "") + if kind != "NOM_EXTRACTED": + continue + is_fp = False + reason = "" + if len(original) < 3: + is_fp = True + reason = "trop court (< 3 chars)" + elif _MEDICAL_STOP_WORDS_SET and original.lower() in _MEDICAL_STOP_WORDS_SET: + is_fp = True + reason = "stop word médical" + if is_fp: + candidates.append({"kind": kind, "original": original, "reason": reason}) + return candidates + + +def fn_candidates(text: str) -> List[str]: + """Faux négatifs candidats : mots ALL-CAPS >= 5 chars non masqués, non médicaux.""" + candidates = [] + seen = set() + for m in re.finditer(r"\b([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{5,})\b", text): + word = m.group(1) + if word in seen: + continue + seen.add(word) + # Ignorer les placeholders et mots dans les balises + if "[" in text[max(0, m.start() - 1):m.start()]: + continue + if "]" in text[m.end():min(len(text), m.end() + 1)]: + continue + # Ignorer les mots médicaux connus + if _MEDICAL_STOP_WORDS_SET and word.lower() in _MEDICAL_STOP_WORDS_SET: + continue + # Ignorer les abréviations médicales très courantes + if word in {"TABLES", "FINESS", "EMAIL", "ADRESSE", "IBAN", "EPISODE", + "ETABLISSEMENT", "DATE", "NAISSANCE", "POSTAL", "MASK", + "DOSSIER", "RPPS", "GLOBAL", "EXTRACTED", "TRACKARE"}: + continue + candidates.append(word) + return candidates + + +def analyze_file(audit_path: Path, txt_path: Optional[Path] = None) -> Dict[str, Any]: + """Analyse complète d'un couple audit.jsonl + pseudonymise.txt.""" + result: Dict[str, Any] = {"file": str(audit_path)} + + # Charger l'audit + entries = load_audit(audit_path) + result["total_hits"] = len(entries) + result["stats"] = audit_stats(entries) + result["fp_candidates"] = fp_candidates(entries) + + # Charger le texte si disponible + if txt_path is None: + # Déduire le chemin du .pseudonymise.txt + stem = audit_path.name.replace(".audit.jsonl", "") + txt_path = audit_path.parent / f"{stem}.pseudonymise.txt" + + if txt_path.exists(): + text = load_text(txt_path) + result["residual_pii"] = scan_residual_pii(text) + result["density"] = placeholder_density(text) + result["fn_candidates"] = fn_candidates(text) + else: + result["residual_pii"] = {} + result["density"] = {} + result["fn_candidates"] = [] + + return result + + +def print_report(analysis: Dict[str, Any]) -> None: + """Affiche un rapport lisible pour un fichier.""" + print(f"\n{'='*70}") + print(f" QC Audit : {analysis['file']}") + print(f"{'='*70}") + + print(f"\n Total hits audit : {analysis['total_hits']}") + + # Stats par kind + print("\n Top 10 kinds :") + for kind, count in analysis.get("stats", {}).items(): + print(f" {kind:30s} : {count}") + + # Densité + density = analysis.get("density", {}) + if density: + print(f"\n Densité placeholders : {density.get('density_pct', 0)}% " + f"({density.get('placeholders', 0)}/{density.get('total_words', 0)} mots)") + print(f" [NOM] : {density.get('nom_count', 0)} occurrences " + f"({density.get('nom_density_pct', 0)}%)") + if density.get("alert_overmasking"): + print(" *** ALERTE : sur-masquage possible ([NOM] > 5% des mots) ***") + + # PII résiduels + residuals = analysis.get("residual_pii", {}) + if residuals: + print("\n PII résiduels (faux négatifs) :") + for pii_type, values in residuals.items(): + print(f" {pii_type} : {len(values)} trouvé(s)") + for v in values[:3]: + print(f" - {v}") + else: + print("\n PII résiduels : aucun détecté") + + # FP candidats + fps = analysis.get("fp_candidates", []) + if fps: + print(f"\n FP candidats ({len(fps)}) :") + for fp in fps[:10]: + print(f" - {fp['original']:20s} ({fp['reason']})") + + # FN candidats + fns = analysis.get("fn_candidates", []) + if fns: + print(f"\n FN candidats ({len(fns)} mots ALL-CAPS non masqués) :") + for fn in fns[:15]: + print(f" - {fn}") + + print() + + +def batch_report(results: List[Dict[str, Any]], csv_path: Optional[Path] = None) -> None: + """Rapport batch résumé. Optionnel : export CSV.""" + print(f"\n{'='*70}") + print(f" RAPPORT BATCH — {len(results)} fichier(s)") + print(f"{'='*70}") + + total_hits = sum(r.get("total_hits", 0) for r in results) + total_residuals = sum( + sum(len(v) for v in r.get("residual_pii", {}).values()) + for r in results + ) + total_fps = sum(len(r.get("fp_candidates", [])) for r in results) + total_fns = sum(len(r.get("fn_candidates", [])) for r in results) + alerts = [r["file"] for r in results if r.get("density", {}).get("alert_overmasking")] + + print(f"\n Total hits audit : {total_hits}") + print(f" PII résiduels (FN) : {total_residuals}") + print(f" FP candidats : {total_fps}") + print(f" FN candidats (ALL-CAPS) : {total_fns}") + print(f" Alertes sur-masquage : {len(alerts)}") + if alerts: + for a in alerts: + print(f" - {a}") + + if csv_path: + with csv_path.open("w", newline="", encoding="utf-8") as f: + writer = csv.writer(f) + writer.writerow([ + "fichier", "total_hits", "residual_pii", "density_pct", + "nom_density_pct", "alert_overmasking", "fp_count", "fn_count", + ]) + for r in results: + d = r.get("density", {}) + writer.writerow([ + Path(r["file"]).name, + r.get("total_hits", 0), + sum(len(v) for v in r.get("residual_pii", {}).values()), + d.get("density_pct", ""), + d.get("nom_density_pct", ""), + d.get("alert_overmasking", ""), + len(r.get("fp_candidates", [])), + len(r.get("fn_candidates", [])), + ]) + print(f"\n Rapport CSV : {csv_path}") + + print() + + +def main(): + parser = argparse.ArgumentParser(description="QC Audit post-anonymisation") + parser.add_argument("files", nargs="*", help="Fichiers .audit.jsonl à analyser") + parser.add_argument("--batch-dir", type=str, help="Répertoire contenant les fichiers anonymisés") + parser.add_argument("--csv", type=str, help="Chemin du rapport CSV résumé (mode batch)") + args = parser.parse_args() + + audit_files: List[Path] = [] + + if args.batch_dir: + batch_dir = Path(args.batch_dir) + audit_files = sorted(batch_dir.glob("**/*.audit.jsonl")) + elif args.files: + audit_files = [Path(f) for f in args.files] + else: + parser.print_help() + sys.exit(1) + + if not audit_files: + print("Aucun fichier .audit.jsonl trouvé.") + sys.exit(1) + + results = [] + for af in audit_files: + analysis = analyze_file(af) + results.append(analysis) + print_report(analysis) + + if len(results) > 1: + csv_path = Path(args.csv) if args.csv else None + batch_report(results, csv_path) + + +if __name__ == "__main__": + main()