diff --git a/anonymizer_core_refactored_onnx.py b/anonymizer_core_refactored_onnx.py index 60e362e..ae98411 100644 --- a/anonymizer_core_refactored_onnx.py +++ b/anonymizer_core_refactored_onnx.py @@ -391,7 +391,7 @@ _MEDICAL_STOP_WORDS_SET = { "digestif", "digestive", "digestives", "nutritive", # Abréviations soins trackare détectées comme NOM (batch 20 OGC) "soins", "lit", "jeun", "lever", "pose", "surv", "ggt", "vvp", - "verif", "crop", "evs", "maco", "pan", "cet", "trou", "nit", "ute", "nfs", + "verif", "crop", "evs", "maco", "pan", "cet", "trou", "nit", "nfs", # Mots narratifs CRH capturés par fusion sidebar 2-colonnes "evolution", "évolution", "explorations", "fermeture", "allergie", "allergies", "lotissement", "cholangiographie", "cholecystectomie", "cholécystectomie", @@ -403,7 +403,7 @@ _MEDICAL_STOP_WORDS_SET = { "responsable", "autre", "autres", "autonome", "autonomes", "préparations", "preparations", "prévenir", "prevenir", "acétylsalicylique", "acetylsalicylique", "angio", - "desc", "diu", "cambo", "bains", "dogue", "barreau", + "desc", "diu", "barreau", "haitz", "alde", # FP audit OGC 21 — termes médicaux/courants flagués NOM_GLOBAL "alimentation", "augmentation", "amelioration", "amélioration", @@ -486,12 +486,17 @@ _MEDICAL_STOP_WORDS_SET = { "résistant", "réévaluation", "situation", "temporaire", "urgence", "urgences", "urgent", "validation", # Mots courants / contextuels - "angle", "bille", "boisson", "bureau", "campagne", "cases", "circuit", - "clause", "concubin", "confortable", "demain", "densité", "dernière", + "angle", "bille", "boisson", "bureau", "cases", "circuit", + "concubin", "confortable", "demain", "densité", "dernière", "distant", "domaine", "elle", "fils", "frère", "grand", "horizon", "hui", "identifiant", "minuit", "murent", "neuf", "original", "pages", "personne", "premier", "quartier", "retraite", "route", "rés", - "tam", "terrasses", "trouve", "verrouillé", "villa", "étage", + "trouve", "verrouillé", "villa", "étage", + # Termes 
médicaux courants faussement détectés comme NOM (Phase 2 audit mars 2026)
+    "ains", "ponction", "hanche", "burkitt", "orl", "gds", "oap", "tvp", "epp",
+    "bronchite", "accueil", "cadre", "transfert", "relecture", "examens",
+    "traitements", "traitement", "infectiologie", "cancérologie", "cancerologie",
+    "maternité", "orale", "sachet", "absence",
 }
 # Enrichissement automatique avec les ~4000 noms de médicaments d'edsnlp
 _MEDICAL_STOP_WORDS_SET.update(_load_edsnlp_drug_names())
@@ -655,13 +660,15 @@ RE_SERVICE = re.compile(
     r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+)*)",
 )
 RE_NUMERO_DOSSIER = re.compile(
-    r"(?:dossier|n°\s*dossier|NDA)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})"
+    r"(?:\bdossier|\bn°\s*dossier|\bNDA)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})"
     r"|"
-    r"(?:référence|réf\.)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})",
+    r"(?:\bréférence|\bréf\.)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})",
     re.IGNORECASE,
 )
 RE_EPISODE = re.compile(
-    r"N°\s*[ÉéEe]pisode\s*[:\-]?\s*([A-Za-z0-9\-]{4,})",
+    r"N°\s*[ÉéEe]pisode\s*[:\-]?\s*([A-Za-z0-9\-]{4,})"
+    r"|"
+    r"[ÉéEe]pisode\s*N[o°.]?\s*\.?\s*:?\s*(\d{5,})",
     re.IGNORECASE,
 )
 
@@ -923,10 +930,13 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
         return PLACEHOLDERS["DOSSIER"]
     line = RE_NUMERO_DOSSIER.sub(_repl_dossier, line)
 
-    # N° EPISODE
+    # N° EPISODE / Episode N. (Trackare page footers): keep the label, mask only the value
     def _repl_episode(m: re.Match) -> str:
-        audit.append(PiiHit(page_idx, "EPISODE", m.group(0), PLACEHOLDERS["EPISODE"]))
-        return PLACEHOLDERS["EPISODE"]
+        # Exactly one capture group participates, depending on which alternative matched.
+        grp = 1 if m.group(1) is not None else 2
+        audit.append(PiiHit(page_idx, "EPISODE", m.group(grp), PLACEHOLDERS["EPISODE"]))
+        # Slice by match offset: str.find() could hit an earlier copy of the value inside the label.
+        return m.group(0)[:m.start(grp) - m.start()] + PLACEHOLDERS["EPISODE"]
     line = RE_EPISODE.sub(_repl_episode, line)
 
     # Établissements de santé (EHPAD Bayonne, SSR La Concha, Hôpital de Bayonne, etc.)
@@ -1060,12 +1070,34 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: names: set = set() hits: List[PiiHit] = [] + force_names: set = set() # noms issus de contextes structurés (DR., Signé, etc.) → bypass stop words + def _add_name(s: str): for tok in s.split(): tok = tok.strip(" .-'(),") if len(tok) >= 2 and tok[0].isupper(): names.add(tok) + # Termes non-noms fréquents dans les contextes Signé/DR./Note d'évolution + _FORCE_EXCLUDE = _MEDICATION_WHITELIST | { + "elimination", "élimination", "forte", "intraveineuse", "lavage", + "sonde", "normal", "réalisé", "realise", "germes", "bbm", "arw", + "orale", "sachet", "injectable", "comprime", "comprimé", "gelule", + "gélule", "seringue", "poche", "flacon", "ampoule", "preremplie", + "préremplie", + } + + def _add_name_force(tok: str): + """Ajoute un nom depuis un contexte structuré fiable (DR., Signé direct, Note d'évolution). + Bypass les stop words généraux mais filtre médicaments et termes de soins courants.""" + tok = tok.strip(" .-'(),") + if len(tok) < 3 or not tok[0].isupper(): + return + if tok.lower() in _FORCE_EXCLUDE: + return + names.add(tok) + force_names.add(tok) + # --- Identité patient --- # Nom de naissance: DIEGO (peut apparaître 2x : en-tête + récap tabulaire) for m in re.finditer(r"Nom\s+de\s+naissance\s*:\s*(.+?)(?:\s+IPP\b|\s*$)", full_text, re.MULTILINE): @@ -1102,6 +1134,10 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: for m in re.finditer(r"Episode\s*N[o°.]?\s*\.?\s*:\s*(\d{5,})", full_text): hits.append(PiiHit(-1, "EPISODE", m.group(1), PLACEHOLDERS.get("NDA", "[NDA]"))) + # RPPS isolés (11 chiffres commençant par 1 ou 2, seul sur une ligne ou en fin de ligne) + for m in re.finditer(r"^\s*([12]\d{10})\s*$", full_text, re.MULTILINE): + hits.append(PiiHit(-1, "RPPS", m.group(1), PLACEHOLDERS["RPPS"])) + # Adresse patient (toutes les occurrences) for m in re.finditer(r"Adresse\s*:\s*(.+?)(?:\s+Ville\s+de\s+r[ée]sidence|\s*$)", 
full_text, re.MULTILINE): val = m.group(1).strip() @@ -1192,8 +1228,8 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: for g in (m.group(1), m.group(2)): if g: tok = g.rstrip('-') - if len(tok) >= 3 and tok.lower() not in _MEDICAL_STOP_WORDS_SET: - _add_name(tok) + if len(tok) >= 3: + _add_name_force(tok) # --- "Signé" suivi directement d'un nom de soignant (ex: "Signé LARRIEU-") --- for m in re.finditer( @@ -1204,8 +1240,8 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: for g in (m.group(1), m.group(2)): if g: tok = g.rstrip('-') - if len(tok) >= 3 and tok.lower() not in _MEDICAL_STOP_WORDS_SET: - _add_name(tok) + if len(tok) >= 3: + _add_name_force(tok) # --- "Signé —" + médicament + nom soignant (ex: "Signé — PARACETAMOL BBM 1000 MG INJ NARZABAL") --- for m in re.finditer( @@ -1230,9 +1266,21 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: if len(tok) >= 3 and tok.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(tok) + # --- "DR." / "DR" suivi d'un prénom seul (ex: "DR. Ute", "DR. 
Tam") dans les prescriptions --- + for m in re.finditer( + r"DR\.?\s+([A-ZÉÈÀÙÂÊÎÔÛ][a-zéèàùâêîôûäëïöüç]{2,})" + r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\-]+))?", + full_text + ): + for g in (m.group(1), m.group(2)): + if g: + tok = g.strip() + if len(tok) >= 3: + _add_name_force(tok) + # --- Noms soignants après timestamps dans activités de soins (ex: "07:00 ETCHEBARNE") --- # Format Trackare : actions de soins suivies de "HH:MM NOM" ou "HH : MM NOM" - # Pattern restrictif : nom ALL-CAPS de 4+ lettres pour éviter FP (termes médicaux mixtes) + # Pattern restrictif : nom ALL-CAPS de 4+ lettres, filtre stop words (pattern bruyant) for m in re.finditer( r"\d{1,2}\s*:\s*\d{2}\s+" r"([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛ\-]{3,})" @@ -1245,11 +1293,12 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: if len(tok) >= 4 and tok.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(tok) - # Filtrer les tokens trop courts ou stop words (sauf noms de villes extraits explicitement) + # Filtrer les tokens trop courts ou stop words + # Exceptions : force_names (contextes structurés) et city_tokens (villes extraites) city_tokens = {h.original for h in hits if h.kind == "VILLE"} filtered = set() for tok in names: - if tok in city_tokens: + if tok in city_tokens or tok in force_names: filtered.add(tok) continue if len(tok) < 3: @@ -1258,7 +1307,7 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: continue filtered.add(tok) - return filtered, hits + return filtered, hits, force_names def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set: @@ -1358,11 +1407,11 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set: return names -def _apply_extracted_names(text: str, names: set, audit: List[PiiHit]) -> str: +def _apply_extracted_names(text: str, names: set, audit: List[PiiHit], force_names: set = None) -> str: """Remplace globalement chaque nom extrait dans le texte.""" placeholder = 
PLACEHOLDERS["NOM"]
-    # Filtrer les stop words et tokens trop courts en dernière ligne de défense
-    safe_names = {n for n in names if len(n) >= 3 and n.lower() not in _MEDICAL_STOP_WORDS_SET}
+    _force = force_names or set()
+    safe_names = {n for n in names if len(n) >= 3 and (n in _force or n.lower() not in _MEDICAL_STOP_WORDS_SET)}
     for token in sorted(safe_names, key=len, reverse=True):
         pattern = re.compile(rf"\b{re.escape(token)}\b", re.IGNORECASE)
         new_text = []
@@ -1393,6 +1442,24 @@ def _apply_extracted_names(text: str, names: set, audit: List[PiiHit]) -> str:
     return text
 
 
+def _apply_trackare_hits_to_text(text: str, audit: List[PiiHit]) -> str:
+    """Mask every EPISODE / RPPS value recorded in ``audit`` wherever it occurs in ``text``.
+
+    Only hits whose stripped value is at least 4 characters long are applied; when the same
+    value was recorded more than once, the placeholder of the last hit is used."""
+    replacements: Dict[str, str] = {  # original value -> placeholder
+        h.original.strip(): h.placeholder
+        for h in audit
+        if h.kind in {"EPISODE", "RPPS"} and h.original and len(h.original.strip()) >= 4
+    }
+    if not replacements:
+        return text
+    # Longest first so a value cannot pre-empt a longer one that starts with it. Lookarounds, not
+    # \b: \b never matches beside a value that starts or ends with "-" (RE_EPISODE allows it).
+    alternation = "|".join(re.escape(v) for v in sorted(replacements, key=len, reverse=True))
+    return re.sub(rf"(?<!\w)(?:{alternation})(?!\w)", lambda m: replacements[m.group(0)], text)
+
+
 # ----------------- Anonymisation (regex) -----------------
 
 def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]], cfg: Dict[str, Any]) -> AnonResult:
@@ -1406,8 +1473,9 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
 
     # Phase 0b : si document Trackare, extraction renforcée des PII structurés
     is_trackare = _is_trackare_document(full_raw)
+    trackare_force_names: set = set()
     if is_trackare:
-        trackare_names, trackare_hits = 
_extract_trackare_identity(full_raw)
+        trackare_names, trackare_hits, trackare_force_names = _extract_trackare_identity(full_raw)
         extracted_names.update(trackare_names)
         audit.extend(trackare_hits)
 
@@ -1436,7 +1504,12 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
 
     # Phase 2 : application globale des noms extraits (rattrapage)
     if extracted_names:
-        text_out = _apply_extracted_names(text_out, extracted_names, audit)
+        text_out = _apply_extracted_names(text_out, extracted_names, audit, force_names=trackare_force_names)
+
+    # Phase 2b : application globale des PiiHit trackare (NDA footers, EPISODE, etc.)
+    # Ces hits sont détectés par _extract_trackare_identity mais pas encore remplacés dans le texte
+    if is_trackare:
+        text_out = _apply_trackare_hits_to_text(text_out, audit)
 
     return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit, is_trackare=is_trackare)
 
diff --git a/regression_tests/check_regression.py b/regression_tests/check_regression.py
new file mode 100644
index 0000000..e72327a
--- /dev/null
+++ b/regression_tests/check_regression.py
@@ -0,0 +1,268 @@
+#!/usr/bin/env python3
+"""Non-regression test: compare the baseline against the new output.
+
+Usage:
+    python regression_tests/check_regression.py [--rerun]
+
+Without --rerun: compare baseline/ with the current output (anonymise_audit_30/)
+With --rerun: re-run the anonymisation, then compare
+"""
+import json
+import re
+import sys
+from collections import Counter
+from pathlib import Path
+
+BASELINE_DIR = Path(__file__).parent / "baseline"
+OUTPUT_DIR = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)/anonymise_audit_30")
+
+# === Known leak patterns ===
+LEAK_CHECKS = {
+    "NDA_footer": re.compile(r"[ÉéEe]pisode\s*N[o°.]?\s*\.?\s*:?\s*(\d{5,})"),  # as tolerant as the masker's footer pattern
+    "ONDANSETRON_broken": re.compile(r"O\[DOSSIER\]"),
+    "RPPS_raw": re.compile(r"\b[12]\d{10}\b"),  # 11 digits starting with 1 or 2
+    "bracket_double": re.compile(r"\[\["),
+    "www_hospital": re.compile(r"www\.ch-cote-basque"),
+    "FINESS_raw": re.compile(r"\b640000162\b"),
+}
+
+# === Medical terms that must NOT be masked ===
+FALSE_POSITIVE_CHECKS = {
+    "AINS_masked": re.compile(r"\[NOM\].*(?:céphalée|paracétamol)|paracétamol.*\[NOM\]", re.I),
+    "ponction_masked": re.compile(r"\[NOM\]\s+lombaire", re.I),
+    "hanche_masked": re.compile(r"(?:de\s+la|de)\s+\[NOM\].*(?:profil|opérée|fémorale)", re.I),
+    "ORL_masked": re.compile(r"IRM\s+\[NOM\]", re.I),
+    "burkitt_masked": re.compile(r"\[NOM\]\s*\.\s*(?:stade|type|lymphome)", re.I),  # keyword required: an optional group counted every "[NOM]."
+}
+
+PLACEHOLDER_RE = re.compile(r"\[(NOM|TEL|EMAIL|NIR|IPP|DOSSIER|NDA|EPISODE|RPPS|DATE_NAISSANCE|ADRESSE|CODE_POSTAL|VILLE|MASK|FINESS|OGC|AGE|ETABLISSEMENT|IBAN)\]")
+
+
+def analyze_file(txt_path: Path) -> dict:
+    """Return the metrics of one pseudonymised text file: size, placeholder counts, leak and false-positive hits."""
+    text = txt_path.read_text(encoding="utf-8", errors="replace")
+    lines = text.splitlines()
+
+    result = {
+        "file": txt_path.name,
+        "lines": len(lines),
+        "chars": len(text),
+        "empty": len(text.strip()) == 0,
+    }
+
+    # Placeholder counts, per placeholder kind
+    ph_counts = Counter()
+    for m in PLACEHOLDER_RE.finditer(text):
+        ph_counts[m.group(1)] += 1
+    result["placeholders"] = dict(ph_counts)
+    result["total_placeholders"] = sum(ph_counts.values())
+
+    # Leak detection: only checks with at least one hit are recorded
+    leaks = {}
+    for name, pattern in LEAK_CHECKS.items():
+        matches = pattern.findall(text)
+        if matches:
+            leaks[name] = len(matches)
+    result["leaks"] = leaks
+    result["total_leaks"] = sum(leaks.values())
+
+    # False-positive detection: only checks with at least one hit are recorded
+    fps = {}
+    for name, pattern in FALSE_POSITIVE_CHECKS.items():
+        matches = pattern.findall(text)
+        if matches:
+            fps[name] = len(matches)
+    result["false_positives"] = fps
+    result["total_fps"] = sum(fps.values())
+
+    return result
+
+
+def compare_reports(baseline_report: dict, new_report: dict) -> dict:
+    """Compare two ``analyze_file`` reports; return improved/regressed leaks and FPs as (key, old, new) plus placeholder deltas."""
+    changes = {
+        "improved_leaks": [],
+        "regressed_leaks": [],
+        "improved_fps": [],
+        "regressed_fps": [],
+        "placeholder_delta": {},
+    }
+
+    # Compare leaks
+    all_leak_keys = set(baseline_report["leaks"].keys()) | set(new_report["leaks"].keys())
+    for k in all_leak_keys:
+        old = baseline_report["leaks"].get(k, 0)
+        new = new_report["leaks"].get(k, 0)
+        if new < old:
+            changes["improved_leaks"].append((k, old, new))
+        elif new > old:
+            changes["regressed_leaks"].append((k, old, new))
+
+    # Compare false positives
+    all_fp_keys = set(baseline_report["false_positives"].keys()) | set(new_report["false_positives"].keys())
+    for k in all_fp_keys:
+        old = baseline_report["false_positives"].get(k, 0)
+        new = new_report["false_positives"].get(k, 0)
+        if new < old:
+            changes["improved_fps"].append((k, old, new))
+        elif new > old:
+            changes["regressed_fps"].append((k, old, new))
+
+    # Compare placeholder counts
+    all_ph = set(baseline_report["placeholders"].keys()) | set(new_report["placeholders"].keys())
+    for k in all_ph:
+        old = baseline_report["placeholders"].get(k, 0)
+        new = new_report["placeholders"].get(k, 0)
+        if old != new:
+            changes["placeholder_delta"][k] = new - old
+
+    return changes
+
+
+def main():
+    """Compare baseline and new outputs, print the report, and exit with status 1 on a leak regression.
+
+    With ``--rerun`` on the command line, the 30-file batch is executed first."""
+    rerun = "--rerun" in sys.argv
+
+    if rerun:
+        print("=== Relance de l'anonymisation des 30 fichiers ===\n")
+        import subprocess
+        result = subprocess.run(
+            [sys.executable, "run_batch_30_audit.py"],
+            cwd=str(Path(__file__).parent.parent),
+            capture_output=False,
+        )
+        if result.returncode != 0:
+            print("ERREUR: batch échoué")
+            sys.exit(1)
+        print()
+
+    # Collect the baseline and new files
+    baseline_files = sorted(BASELINE_DIR.glob("*.pseudonymise.txt"))
+    new_files = sorted(OUTPUT_DIR.glob("*.pseudonymise.txt"))
+
+    if not baseline_files:
+        print("ERREUR: pas de fichiers baseline trouvés")
+        sys.exit(1)
+
+    # A baseline file absent from the new output takes its leaks out of the totals, so a partial
+    # or empty run would be reported as an improvement: fail before comparing anything.
+    missing = sorted({f.name for f in baseline_files} - {f.name for f in new_files})
+    if missing:
+        print(f"ERREUR: {len(missing)} fichiers baseline absents de la nouvelle sortie: {', '.join(missing)}")
+        sys.exit(1)
+
+    print("=== RAPPORT DE NON-RÉGRESSION ===")
+    print(f"Baseline: {len(baseline_files)} fichiers")
+    print(f"Nouveau: {len(new_files)} fichiers\n")
+
+    # Per-file reports
+    baseline_reports = {}
+    new_reports = {}
+
+    for f in baseline_files:
+        baseline_reports[f.name] = analyze_file(f)
+    for f in new_files:
+        new_reports[f.name] = analyze_file(f)
+
+    # === Global metrics, baseline then new ===
+    total_leaks_baseline = sum(r["total_leaks"] for r in baseline_reports.values())
+    total_fps_baseline = sum(r["total_fps"] for r in baseline_reports.values())
+    total_ph_baseline = sum(r["total_placeholders"] for r in baseline_reports.values())
+    empty_baseline = sum(1 for r in baseline_reports.values() if r["empty"])
+
+    total_leaks_new = sum(r["total_leaks"] for r in new_reports.values())
+    total_fps_new = sum(r["total_fps"] for r in new_reports.values())
+    total_ph_new = sum(r["total_placeholders"] for r in new_reports.values())
+    empty_new = sum(1 for r in new_reports.values() if r["empty"])
+
+    print("--- MÉTRIQUES GLOBALES ---")
+    print(f"{'Métrique':<30} {'Baseline':>10} {'Nouveau':>10} {'Delta':>10}")
+    print("-" * 62)
+
+    def delta_str(old, new):
+        d = new - old
+        if d > 0:
+            return f"+{d}"
+        return str(d)
+
+    print(f"{'Fuites détectées':<30} {total_leaks_baseline:>10} {total_leaks_new:>10} {delta_str(total_leaks_baseline, total_leaks_new):>10}")
+    print(f"{'Faux positifs détectés':<30} {total_fps_baseline:>10} {total_fps_new:>10} {delta_str(total_fps_baseline, total_fps_new):>10}")
+    print(f"{'Total placeholders':<30} {total_ph_baseline:>10} {total_ph_new:>10} {delta_str(total_ph_baseline, total_ph_new):>10}")
+    print(f"{'Fichiers vides':<30} {empty_baseline:>10} {empty_new:>10} {delta_str(empty_baseline, empty_new):>10}")
+
+    # Leak breakdown by type
+    all_leak_types = set()
+    for r in list(baseline_reports.values()) + list(new_reports.values()):
+        all_leak_types.update(r["leaks"].keys())
+
+    if all_leak_types:
+        print("\n--- FUITES PAR TYPE ---")
+        print(f"{'Type':<30} {'Baseline':>10} {'Nouveau':>10} {'Delta':>10}")
+        print("-" * 62)
+        for lt in sorted(all_leak_types):
+            old = sum(r["leaks"].get(lt, 0) for r in baseline_reports.values())
+            new = sum(r["leaks"].get(lt, 0) for r in new_reports.values())
+            marker = " ✓" if new < old else (" ✗" if new > old else "")
+            print(f"{lt:<30} {old:>10} {new:>10} {delta_str(old, new):>10}{marker}")
+
+    # False-positive breakdown by type
+    all_fp_types = set()
+    for r in list(baseline_reports.values()) + list(new_reports.values()):
+        all_fp_types.update(r["false_positives"].keys())
+
+    if all_fp_types:
+        print("\n--- FAUX POSITIFS PAR TYPE ---")
+        print(f"{'Type':<30} {'Baseline':>10} {'Nouveau':>10} {'Delta':>10}")
+        print("-" * 62)
+        for ft in sorted(all_fp_types):
+            old = sum(r["false_positives"].get(ft, 0) for r in baseline_reports.values())
+            new = sum(r["false_positives"].get(ft, 0) for r in new_reports.values())
+            marker = " ✓" if new < old else (" ✗" if new > old else "")
+            print(f"{ft:<30} {old:>10} {new:>10} {delta_str(old, new):>10}{marker}")
+
+    # Files with regressions / improvements (files present on both sides only)
+    regressions = []
+    improvements = []
+    for fname in sorted(set(baseline_reports.keys()) & set(new_reports.keys())):
+        changes = compare_reports(baseline_reports[fname], new_reports[fname])
+        if changes["regressed_leaks"]:
+            regressions.append((fname, changes))
+        if changes["improved_leaks"] or changes["improved_fps"]:
+            improvements.append((fname, changes))
+
+    if regressions:
+        print(f"\n⚠ RÉGRESSIONS ({len(regressions)} fichiers):")
+        for fname, changes in regressions:
+            for k, old, new in changes["regressed_leaks"]:
+                print(f" {fname}: {k} {old}→{new} (+{new-old})")
+
+    if improvements:
+        print(f"\n✓ AMÉLIORATIONS ({len(improvements)} fichiers):")
+        for fname, changes in improvements:
+            for k, old, new in changes["improved_leaks"]:
+                print(f" {fname}: {k} {old}→{new} (-{old-new})")
+            for k, old, new in changes["improved_fps"]:
+                print(f" {fname}: FP {k} {old}→{new} (-{old-new})")
+
+    # Final verdict: only a higher leak total fails the run; false positives only warn
+    print("\n" + "=" * 62)
+    if total_leaks_new > total_leaks_baseline:
+        print("❌ RÉGRESSION : plus de fuites qu'avant")
+        sys.exit(1)
+    elif total_leaks_new < total_leaks_baseline:
+        print(f"✅ AMÉLIORATION : {total_leaks_baseline - total_leaks_new} fuites en moins")
+    else:
+        print("➡ NEUTRE : même nombre de fuites")
+
+    if total_fps_new < total_fps_baseline:
+        print(f"✅ AMÉLIORATION : {total_fps_baseline - total_fps_new} faux positifs en moins")
+    elif total_fps_new > total_fps_baseline:
+        print(f"⚠ ATTENTION : {total_fps_new - total_fps_baseline} faux positifs en plus")
+
+    sys.exit(0)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/run_batch_30_audit.py b/run_batch_30_audit.py
new file mode 100644
index 0000000..199acfd
--- /dev/null
+++ b/run_batch_30_audit.py
@@ -0,0 +1,120 @@
+#!/usr/bin/env python3
+"""Batch run over 30 randomly picked files, for human review."""
+import sys
+import time
+import json
+from pathlib import Path
+from collections import Counter
+
+sys.path.insert(0, str(Path(__file__).parent))
+
+import anonymizer_core_refactored_onnx as core
+from eds_pseudo_manager import EdsPseudoManager
+
+SRC = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)")
+OUTDIR = SRC / "anonymise_audit_30"
+CONFIG = Path("/home/dom/ai/anonymisation/config/dictionnaires.yml")
+
+PDFS = [
+    SRC / "110_23061319/trackare-07026002-23061319_07026002_23061319.pdf",
+    SRC / "115_23066188/CRH 23066188.pdf",
+    
SRC / "161_23098838/CRO 23098838.pdf",
+    SRC / "179_23126805/trackare-23005591-23126805_23005591_23126805.pdf",
+    SRC / "181_23127286/CRH 23127286.pdf",
+    SRC / "192_23132490/CRH 23132490.pdf",
+    SRC / "208_23151988/trackare-23020064-23151988_23020064_23151988.pdf",
+    SRC / "215_23158603/trackare-22028007-23158603_22028007_23158603.pdf",
+    SRC / "227_23173599/CRH 23173599.pdf",
+    SRC / "236_23116794/trackare-BA054633-23116794_BA054633_23116794.pdf",
+    SRC / "248_23194278/CRH 23194278.pdf",
+    SRC / "263_23203642/CRO 23203642.pdf",
+    SRC / "28_23135549/trackare-15021750-23135549_15021750_23135549.pdf",
+    SRC / "321_23043929/CRH 321_23066387.pdf",
+    SRC / "379_23098754/trackare-18009635-23098754_18009635_23098754.pdf",
+    SRC / "39_23167029/trackare-23022121-23167029_23022121_23167029.pdf",
+    SRC / "444_23141032/trackare-BA102259-23141032_BA102259_23141032.pdf",
+    SRC / "478_23161697/cro 478_23161697.pdf",
+    SRC / "50_23219173/trackare-07019278-23219173_07019278_23219173.pdf",
+    SRC / "520_23177582/trackare-99252128-23177582_99252128_23177582.pdf",
+    SRC / "556_23220878/trackare-21041742-23220878_21041742_23220878.pdf",
+    SRC / "602_23070052/trackare-20028293-23070052_20028293_23070052.pdf",
+    SRC / "604_23070704/trackare-23008170-23070704_23008170_23070704.pdf",
+    SRC / "655_23163458/trackare-01296746-23163458_01296746_23163458.pdf",
+    SRC / "684_23207941/CRH 684_23207941.pdf",
+    SRC / "79_23187785/79_23187785 Dossier.pdf",
+    SRC / "12_23084754/CRO 23084754.pdf" if (SRC / "12_23084754/CRO 23084754.pdf").exists() else SRC / "122_23070126/LETTRE DE SORTIE 23070126.pdf",
+    SRC / "122_23070126/LETTRE DE SORTIE 23070126.pdf",
+    SRC / "131_23079402/CRH 23079402.pdf",
+    SRC / "290_23025988/cr anesth 290_23025988.pdf",
+]
+
+
+def main():
+    """Run the anonymiser on every existing PDF listed in ``PDFS`` and print per-kind PII totals.
+
+    Raises:
+        RuntimeError: if the EDS-Pseudo model is not loaded after ``load()``."""
+    print("Chargement EDS-Pseudo...", flush=True)
+    ner = EdsPseudoManager()
+    ner.load()
+    if not ner.is_loaded():  # not an assert: asserts are stripped under ``python -O``
+        raise RuntimeError("EDS-Pseudo non chargé")
+    print("EDS-Pseudo chargé.\n", flush=True)
+
+    # Ordered de-duplication: the conditional PDFS entry falls back to the same path as its neighbour.
+    candidates = list(dict.fromkeys(PDFS))
+    existing = [p for p in candidates if p.exists()]
+    missing = [p for p in candidates if not p.exists()]
+    if missing:
+        print(f"ATTENTION: {len(missing)} fichiers manquants:")
+        for p in missing:
+            print(f" - {p.name}")
+        print()
+
+    print(f"Fichiers à traiter: {len(existing)}/{len(candidates)}\n")
+    OUTDIR.mkdir(exist_ok=True)
+
+    ok = ko = skip_encrypted = 0
+    global_counts = Counter()
+    t0 = time.time()
+
+    for i, pdf in enumerate(existing, 1):
+        ogc = pdf.parent.name.split("_")[0]
+        print(f"[{i}/{len(existing)}] {pdf.name} (OGC {ogc})...", end=" ", flush=True)
+        try:
+            outputs = core.process_pdf(
+                pdf_path=pdf, out_dir=OUTDIR, config_path=CONFIG, ogc_label=ogc,
+                make_vector_redaction=False, also_make_raster_burn=True,
+                use_hf=True, ner_manager=ner, ner_thresholds=None,
+            )
+            # Path("") is "." and exists(): require a real file so a missing "audit" entry is skipped.
+            audit_file = outputs.get("audit")
+            if audit_file and Path(audit_file).is_file():
+                for line in Path(audit_file).read_text().splitlines():
+                    try:
+                        global_counts[json.loads(line)["kind"]] += 1
+                    except (ValueError, KeyError, TypeError):
+                        continue  # malformed audit line: leave it out of the totals
+            print("OK", flush=True)
+            ok += 1
+        except Exception as e:  # batch boundary: one failing PDF must not abort the run
+            err = str(e).lower()
+            if "encrypted" in err or "password" in err:
+                print("SKIP (chiffré)", flush=True)
+                skip_encrypted += 1
+            else:
+                print(f"ERREUR: {e}", flush=True)
+                ko += 1
+
+    elapsed = time.time() - t0
+    print(f"\n{'='*60}")
+    print(f"Terminé en {elapsed:.0f}s — OK: {ok}, Chiffrés: {skip_encrypted}, Erreurs: {ko}")
+    print(f"Total PII détectés: {sum(global_counts.values())}")
+    print("\nDétail par type:")
+    for k, v in sorted(global_counts.items(), key=lambda x: -x[1]):
+        print(f" {k:30s} {v:6d}")
+    print(f"\nSortie: {OUTDIR}")
+
+
+if __name__ == "__main__":
+    main()