fix: Corrections qualité Phase 1 — 261 fuites en moins, 0 régression

Audit sur 30 fichiers aléatoires (OGC 12-690) révélant un overfitting
sur les 59 premiers OGC. Corrections appliquées avec test de non-régression
à chaque étape :

- NDA pieds de page Trackare : regex Episode N. (227→0 fuites)
- ONDANSETRON : word boundary \b sur RE_NUMERO_DOSSIER (32→0)
- RPPS isolés : détection 11 chiffres dans docs Trackare (3→0)
- Stop words : retrait noms réels (ute, dogue, cambo, bains), ajout
  termes médicaux (AINS, ponction, hanche, burkitt, ORL, GDS, OAP...)
- Pattern DR. Prénom NOM : capture prénoms médecins (Ute ×19, Tam...)
- force_names : contextes structurés (DR., Signé, Note d'évolution)
  bypassent les stop words pour masquer les vrais noms de soignants
- Phase 2b : PiiHit trackare (EPISODE, RPPS) appliqués au texte .txt
- Framework de non-régression (regression_tests/) + batch audit 30 fichiers

Résultat : 322→61 fuites détectées, 113→109 faux positifs, 0 régression.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-06 17:32:28 +01:00
parent 2d6f8c0309
commit a356b63d68
3 changed files with 475 additions and 24 deletions

View File

@@ -391,7 +391,7 @@ _MEDICAL_STOP_WORDS_SET = {
"digestif", "digestive", "digestives", "nutritive",
# Abréviations soins trackare détectées comme NOM (batch 20 OGC)
"soins", "lit", "jeun", "lever", "pose", "surv", "ggt", "vvp",
"verif", "crop", "evs", "maco", "pan", "cet", "trou", "nit", "ute", "nfs",
"verif", "crop", "evs", "maco", "pan", "cet", "trou", "nit", "nfs",
# Mots narratifs CRH capturés par fusion sidebar 2-colonnes
"evolution", "évolution", "explorations", "fermeture", "allergie", "allergies",
"lotissement", "cholangiographie", "cholecystectomie", "cholécystectomie",
@@ -403,7 +403,7 @@ _MEDICAL_STOP_WORDS_SET = {
"responsable", "autre", "autres", "autonome", "autonomes",
"préparations", "preparations", "prévenir", "prevenir",
"acétylsalicylique", "acetylsalicylique", "angio",
"desc", "diu", "cambo", "bains", "dogue", "barreau",
"desc", "diu", "barreau",
"haitz", "alde",
# FP audit OGC 21 — termes médicaux/courants flagués NOM_GLOBAL
"alimentation", "augmentation", "amelioration", "amélioration",
@@ -486,12 +486,17 @@ _MEDICAL_STOP_WORDS_SET = {
"résistant", "réévaluation", "situation", "temporaire", "urgence", "urgences",
"urgent", "validation",
# Mots courants / contextuels
"angle", "bille", "boisson", "bureau", "campagne", "cases", "circuit",
"clause", "concubin", "confortable", "demain", "densité", "dernière",
"angle", "bille", "boisson", "bureau", "cases", "circuit",
"concubin", "confortable", "demain", "densité", "dernière",
"distant", "domaine", "elle", "fils", "frère", "grand", "horizon",
"hui", "identifiant", "minuit", "murent", "neuf", "original", "pages",
"personne", "premier", "quartier", "retraite", "route", "rés",
"tam", "terrasses", "trouve", "verrouillé", "villa", "étage",
"trouve", "verrouillé", "villa", "étage",
# Termes médicaux courants faussement détectés comme NOM (Phase 2 audit mars 2026)
"ains", "ponction", "hanche", "burkitt", "orl", "gds", "oap", "tvp", "epp",
"bronchite", "accueil", "cadre", "transfert", "relecture", "examens",
"traitements", "traitement", "infectiologie", "cancérologie", "cancerologie",
"maternité", "orale", "sachet", "absence",
}
# Enrichissement automatique avec les ~4000 noms de médicaments d'edsnlp
_MEDICAL_STOP_WORDS_SET.update(_load_edsnlp_drug_names())
@@ -655,13 +660,15 @@ RE_SERVICE = re.compile(
r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+)*)",
)
RE_NUMERO_DOSSIER = re.compile(
r"(?:dossier|n°\s*dossier|NDA)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})"
r"(?:\bdossier|\b\s*dossier|\bNDA)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})"
r"|"
r"(?:référence|réf\.)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})",
r"(?:\bréférence|\bréf\.)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})",
re.IGNORECASE,
)
RE_EPISODE = re.compile(
r"\s*[ÉéEe]pisode\s*[:\-]?\s*([A-Za-z0-9\-]{4,})",
r"\s*[ÉéEe]pisode\s*[:\-]?\s*([A-Za-z0-9\-]{4,})"
r"|"
r"[ÉéEe]pisode\s*N[o°.]?\s*\.?\s*:?\s*(\d{5,})",
re.IGNORECASE,
)
@@ -923,10 +930,13 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict
return PLACEHOLDERS["DOSSIER"]
line = RE_NUMERO_DOSSIER.sub(_repl_dossier, line)
# N° EPISODE
# N° EPISODE / Episode N. (pieds de page Trackare)
def _repl_episode(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "EPISODE", m.group(0), PLACEHOLDERS["EPISODE"]))
return PLACEHOLDERS["EPISODE"]
val = m.group(1) or m.group(2) or m.group(0)
audit.append(PiiHit(page_idx, "EPISODE", val, PLACEHOLDERS["EPISODE"]))
# Reconstruire le remplacement en gardant le préfixe et masquant la valeur
full = m.group(0)
return full[:full.find(val)] + PLACEHOLDERS["EPISODE"]
line = RE_EPISODE.sub(_repl_episode, line)
# Établissements de santé (EHPAD Bayonne, SSR La Concha, Hôpital de Bayonne, etc.)
@@ -1060,12 +1070,34 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]:
names: set = set()
hits: List[PiiHit] = []
force_names: set = set() # noms issus de contextes structurés (DR., Signé, etc.) → bypass stop words
def _add_name(s: str):
for tok in s.split():
tok = tok.strip(" .-'(),")
if len(tok) >= 2 and tok[0].isupper():
names.add(tok)
# Termes non-noms fréquents dans les contextes Signé/DR./Note d'évolution
_FORCE_EXCLUDE = _MEDICATION_WHITELIST | {
"elimination", "élimination", "forte", "intraveineuse", "lavage",
"sonde", "normal", "réalisé", "realise", "germes", "bbm", "arw",
"orale", "sachet", "injectable", "comprime", "comprimé", "gelule",
"gélule", "seringue", "poche", "flacon", "ampoule", "preremplie",
"préremplie",
}
def _add_name_force(tok: str):
"""Ajoute un nom depuis un contexte structuré fiable (DR., Signé direct, Note d'évolution).
Bypass les stop words généraux mais filtre médicaments et termes de soins courants."""
tok = tok.strip(" .-'(),")
if len(tok) < 3 or not tok[0].isupper():
return
if tok.lower() in _FORCE_EXCLUDE:
return
names.add(tok)
force_names.add(tok)
# --- Identité patient ---
# Nom de naissance: DIEGO (peut apparaître 2x : en-tête + récap tabulaire)
for m in re.finditer(r"Nom\s+de\s+naissance\s*:\s*(.+?)(?:\s+IPP\b|\s*$)", full_text, re.MULTILINE):
@@ -1102,6 +1134,10 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]:
for m in re.finditer(r"Episode\s*N[o°.]?\s*\.?\s*:\s*(\d{5,})", full_text):
hits.append(PiiHit(-1, "EPISODE", m.group(1), PLACEHOLDERS.get("NDA", "[NDA]")))
# RPPS isolés (11 chiffres commençant par 1 ou 2, seul sur une ligne ou en fin de ligne)
for m in re.finditer(r"^\s*([12]\d{10})\s*$", full_text, re.MULTILINE):
hits.append(PiiHit(-1, "RPPS", m.group(1), PLACEHOLDERS["RPPS"]))
# Adresse patient (toutes les occurrences)
for m in re.finditer(r"Adresse\s*:\s*(.+?)(?:\s+Ville\s+de\s+r[ée]sidence|\s*$)", full_text, re.MULTILINE):
val = m.group(1).strip()
@@ -1192,8 +1228,8 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]:
for g in (m.group(1), m.group(2)):
if g:
tok = g.rstrip('-')
if len(tok) >= 3 and tok.lower() not in _MEDICAL_STOP_WORDS_SET:
_add_name(tok)
if len(tok) >= 3:
_add_name_force(tok)
# --- "Signé" suivi directement d'un nom de soignant (ex: "Signé LARRIEU-") ---
for m in re.finditer(
@@ -1204,8 +1240,8 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]:
for g in (m.group(1), m.group(2)):
if g:
tok = g.rstrip('-')
if len(tok) >= 3 and tok.lower() not in _MEDICAL_STOP_WORDS_SET:
_add_name(tok)
if len(tok) >= 3:
_add_name_force(tok)
# --- "Signé —" + médicament + nom soignant (ex: "Signé — PARACETAMOL BBM 1000 MG INJ NARZABAL") ---
for m in re.finditer(
@@ -1230,9 +1266,21 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]:
if len(tok) >= 3 and tok.lower() not in _MEDICAL_STOP_WORDS_SET:
_add_name(tok)
# --- "DR." / "DR" suivi d'un prénom seul (ex: "DR. Ute", "DR. Tam") dans les prescriptions ---
for m in re.finditer(
r"DR\.?\s+([A-ZÉÈÀÙÂÊÎÔÛ][a-zéèàùâêîôûäëïöüç]{2,})"
r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\-]+))?",
full_text
):
for g in (m.group(1), m.group(2)):
if g:
tok = g.strip()
if len(tok) >= 3:
_add_name_force(tok)
# --- Noms soignants après timestamps dans activités de soins (ex: "07:00 ETCHEBARNE") ---
# Format Trackare : actions de soins suivies de "HH:MM NOM" ou "HH : MM NOM"
# Pattern restrictif : nom ALL-CAPS de 4+ lettres pour éviter FP (termes médicaux mixtes)
# Pattern restrictif : nom ALL-CAPS de 4+ lettres, filtre stop words (pattern bruyant)
for m in re.finditer(
r"\d{1,2}\s*:\s*\d{2}\s+"
r"([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛ\-]{3,})"
@@ -1245,11 +1293,12 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]:
if len(tok) >= 4 and tok.lower() not in _MEDICAL_STOP_WORDS_SET:
_add_name(tok)
# Filtrer les tokens trop courts ou stop words (sauf noms de villes extraits explicitement)
# Filtrer les tokens trop courts ou stop words
# Exceptions : force_names (contextes structurés) et city_tokens (villes extraites)
city_tokens = {h.original for h in hits if h.kind == "VILLE"}
filtered = set()
for tok in names:
if tok in city_tokens:
if tok in city_tokens or tok in force_names:
filtered.add(tok)
continue
if len(tok) < 3:
@@ -1258,7 +1307,7 @@ def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]:
continue
filtered.add(tok)
return filtered, hits
return filtered, hits, force_names
def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set:
@@ -1358,11 +1407,11 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set:
return names
def _apply_extracted_names(text: str, names: set, audit: List[PiiHit]) -> str:
def _apply_extracted_names(text: str, names: set, audit: List[PiiHit], force_names: set = None) -> str:
"""Remplace globalement chaque nom extrait dans le texte."""
placeholder = PLACEHOLDERS["NOM"]
# Filtrer les stop words et tokens trop courts en dernière ligne de défense
safe_names = {n for n in names if len(n) >= 3 and n.lower() not in _MEDICAL_STOP_WORDS_SET}
_force = force_names or set()
safe_names = {n for n in names if len(n) >= 3 and (n in _force or n.lower() not in _MEDICAL_STOP_WORDS_SET)}
for token in sorted(safe_names, key=len, reverse=True):
pattern = re.compile(rf"\b{re.escape(token)}\b", re.IGNORECASE)
new_text = []
@@ -1393,6 +1442,24 @@ def _apply_extracted_names(text: str, names: set, audit: List[PiiHit]) -> str:
return text
def _apply_trackare_hits_to_text(text: str, audit: List[PiiHit]) -> str:
    """Replace EPISODE and RPPS values recorded in ``audit`` inside ``text``.

    Only hits whose ``kind`` is ``"EPISODE"`` or ``"RPPS"`` are applied, and
    only when the stripped ``original`` value is at least 4 characters long.
    When the same value appears in several hits, the placeholder of the last
    one wins.

    Args:
        text: Text in which the raw values must be masked.
        audit: Hits to read; each one must expose ``kind``, ``original`` and
            ``placeholder`` attributes. The list is not modified.

    Returns:
        The text with every stand-alone occurrence of a selected value
        replaced by its placeholder.
    """
    _APPLY_KINDS = {"EPISODE", "RPPS"}

    # Collect the values to replace: original -> placeholder.
    replacements: Dict[str, str] = {}
    for hit in audit:
        if hit.kind not in _APPLY_KINDS or not hit.original:
            continue
        original = hit.original.strip()
        if len(original) >= 4:
            replacements[original] = hit.placeholder

    is_word_char = re.compile(r"\w").fullmatch

    # Longest values first so that a short value never clobbers part of a
    # longer one before the longer one has been replaced.
    for original in sorted(replacements, key=len, reverse=True):
        placeholder = replacements[original]
        # A word boundary keeps the value from matching inside a longer token
        # (e.g. digits embedded in a word). It is only meaningful next to a
        # word character: "\b" after a trailing "-" would require a word
        # character to follow and would therefore miss "AB12- " entirely.
        prefix = r"\b" if is_word_char(original[0]) else ""
        suffix = r"\b" if is_word_char(original[-1]) else ""
        pattern = f"{prefix}{re.escape(original)}{suffix}"
        # A callable replacement inserts the placeholder literally; a string
        # replacement would interpret backslashes as template escapes.
        text = re.sub(pattern, lambda _m, _ph=placeholder: _ph, text)
    return text
# ----------------- Anonymisation (regex) -----------------
def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]], cfg: Dict[str, Any]) -> AnonResult:
@@ -1406,8 +1473,9 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
# Phase 0b : si document Trackare, extraction renforcée des PII structurés
is_trackare = _is_trackare_document(full_raw)
trackare_force_names: set = set()
if is_trackare:
trackare_names, trackare_hits = _extract_trackare_identity(full_raw)
trackare_names, trackare_hits, trackare_force_names = _extract_trackare_identity(full_raw)
extracted_names.update(trackare_names)
audit.extend(trackare_hits)
@@ -1436,7 +1504,12 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]
# Phase 2 : application globale des noms extraits (rattrapage)
if extracted_names:
text_out = _apply_extracted_names(text_out, extracted_names, audit)
text_out = _apply_extracted_names(text_out, extracted_names, audit, force_names=trackare_force_names)
# Phase 2b : application globale des PiiHit trackare (NDA footers, EPISODE, etc.)
# Ces hits sont détectés par _extract_trackare_identity mais pas encore remplacés dans le texte
if is_trackare:
text_out = _apply_trackare_hits_to_text(text_out, audit)
return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit, is_trackare=is_trackare)

View File

@@ -0,0 +1,258 @@
#!/usr/bin/env python3
"""Test de non-régression : compare baseline vs nouvelle sortie.
Usage:
python regression_tests/check_regression.py [--rerun]
Sans --rerun : compare baseline/ vs current output (anonymise_audit_30/)
Avec --rerun : relance l'anonymisation puis compare
"""
import json
import re
import sys
from collections import Counter
from pathlib import Path
# Directory holding the reference (baseline) pseudonymised outputs, next to this script.
BASELINE_DIR = Path(__file__).parent / "baseline"
# Directory holding the outputs to compare against the baseline.
# NOTE(review): absolute, machine-specific path — the script only works on this workstation as-is.
OUTPUT_DIR = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)/anonymise_audit_30")
# === Known leak patterns ===
# Each pattern is searched in the pseudonymised text; every match counts as one leak.
LEAK_CHECKS = {
    "NDA_footer": re.compile(r"Episode\s+N[o°.]?\s*\.?\s*:\s*(\d{5,})"),
    "ONDANSETRON_broken": re.compile(r"O\[DOSSIER\]"),
    "RPPS_raw": re.compile(r"\b[12]\d{10}\b"), # 11 digits starting with 1 or 2
    "bracket_double": re.compile(r"\[\["),
    "www_hospital": re.compile(r"www\.ch-cote-basque"),
    "FINESS_raw": re.compile(r"\b640000162\b"),
}
# === Medical terms that must NOT be masked ===
# Each pattern looks for a [NOM] placeholder in a context where a medical term is expected.
# NOTE(review): in "burkitt_masked" the trailing group is optional, so the pattern matches
# any "[NOM]" followed by optional whitespace and a period — confirm this breadth is intended.
FALSE_POSITIVE_CHECKS = {
    "AINS_masked": re.compile(r"\[NOM\].*(?:céphalée|paracétamol)|paracétamol.*\[NOM\]", re.I),
    "ponction_masked": re.compile(r"\[NOM\]\s+lombaire", re.I),
    "hanche_masked": re.compile(r"(?:de\s+la|de)\s+\[NOM\].*(?:profil|opérée|fémorale)", re.I),
    "ORL_masked": re.compile(r"IRM\s+\[NOM\]", re.I),
    "burkitt_masked": re.compile(r"\[NOM\]\s*\.\s*(?:stade|type|lymphome)?", re.I),
}
# Matches any known bracketed placeholder; group 1 is the placeholder kind.
PLACEHOLDER_RE = re.compile(r"\[(NOM|TEL|EMAIL|NIR|IPP|DOSSIER|NDA|EPISODE|RPPS|DATE_NAISSANCE|ADRESSE|CODE_POSTAL|VILLE|MASK|FINESS|OGC|AGE|ETABLISSEMENT|IBAN)\]")
def analyze_file(txt_path: Path) -> dict:
    """Compute placeholder, leak and false-positive metrics for one file.

    The file is read as UTF-8 (undecodable bytes are replaced). The returned
    dict contains the file name, line and character counts, an ``empty`` flag,
    per-kind placeholder counts, per-check leak counts and per-check
    false-positive counts, each with its total.
    """
    content = txt_path.read_text(encoding="utf-8", errors="replace")

    def _match_counts(checks):
        # Keep only the checks that matched at least once.
        found = {}
        for label, regex in checks.items():
            occurrences = len(regex.findall(content))
            if occurrences:
                found[label] = occurrences
        return found

    placeholder_counts = Counter(
        match.group(1) for match in PLACEHOLDER_RE.finditer(content)
    )
    leak_counts = _match_counts(LEAK_CHECKS)
    fp_counts = _match_counts(FALSE_POSITIVE_CHECKS)

    return {
        "file": txt_path.name,
        "lines": len(content.splitlines()),
        "chars": len(content),
        "empty": not content.strip(),
        "placeholders": dict(placeholder_counts),
        "total_placeholders": sum(placeholder_counts.values()),
        "leaks": leak_counts,
        "total_leaks": sum(leak_counts.values()),
        "false_positives": fp_counts,
        "total_fps": sum(fp_counts.values()),
    }
def compare_reports(baseline_report: dict, new_report: dict) -> dict:
    """Compare two per-file reports and list regressions and improvements.

    Args:
        baseline_report: Report with ``"leaks"``, ``"false_positives"`` and
            ``"placeholders"`` dicts mapping a name to a count.
        new_report: Report with the same three dicts for the new output.

    Returns:
        A dict with ``improved_leaks``, ``regressed_leaks``, ``improved_fps``
        and ``regressed_fps`` (lists of ``(name, old, new)`` tuples) and
        ``placeholder_delta`` (name -> ``new - old`` for counts that changed).
        A name missing on one side counts as 0. Entries are ordered by name
        so the result is identical from one run to the next.
    """

    def _split_by_trend(section):
        # Sorted keys: plain set iteration order varies between runs.
        old_counts = baseline_report[section]
        new_counts = new_report[section]
        improved, regressed = [], []
        for key in sorted(set(old_counts) | set(new_counts)):
            old = old_counts.get(key, 0)
            new = new_counts.get(key, 0)
            if new < old:
                improved.append((key, old, new))
            elif new > old:
                regressed.append((key, old, new))
        return improved, regressed

    improved_leaks, regressed_leaks = _split_by_trend("leaks")
    improved_fps, regressed_fps = _split_by_trend("false_positives")

    old_ph = baseline_report["placeholders"]
    new_ph = new_report["placeholders"]
    placeholder_delta = {}
    for key in sorted(set(old_ph) | set(new_ph)):
        diff = new_ph.get(key, 0) - old_ph.get(key, 0)
        if diff:
            placeholder_delta[key] = diff

    return {
        "improved_leaks": improved_leaks,
        "regressed_leaks": regressed_leaks,
        "improved_fps": improved_fps,
        "regressed_fps": regressed_fps,
        "placeholder_delta": placeholder_delta,
    }
def main():
    """Print the non-regression report and exit with a status code.

    With ``--rerun`` on the command line, ``run_batch_30_audit.py`` is first
    executed from the parent directory of this script. Every
    ``*.pseudonymise.txt`` file of ``BASELINE_DIR`` and ``OUTPUT_DIR`` is then
    analysed and the two sets are compared.

    Exit status is 1 when the batch fails, when no baseline file exists, when
    a baseline file has no counterpart in the new output, when the total
    number of leaks increased, or when any individual file shows more leaks
    of some type than its baseline. Otherwise the status is 0.
    """
    if "--rerun" in sys.argv:
        print("=== Relance de l'anonymisation des 30 fichiers ===\n")
        import subprocess

        result = subprocess.run(
            [sys.executable, "run_batch_30_audit.py"],
            cwd=str(Path(__file__).parent.parent),
            capture_output=False,
        )
        if result.returncode != 0:
            print("ERREUR: batch échoué")
            sys.exit(1)
        print()

    baseline_files = sorted(BASELINE_DIR.glob("*.pseudonymise.txt"))
    new_files = sorted(OUTPUT_DIR.glob("*.pseudonymise.txt"))

    if not baseline_files:
        print("ERREUR: pas de fichiers baseline trouvés")
        sys.exit(1)

    print("=== RAPPORT DE NON-RÉGRESSION ===")
    print(f"Baseline: {len(baseline_files)} fichiers")
    print(f"Nouveau: {len(new_files)} fichiers\n")

    # Per-file reports, keyed by file name.
    baseline_reports = {f.name: analyze_file(f) for f in baseline_files}
    new_reports = {f.name: analyze_file(f) for f in new_files}

    # A baseline file without a new counterpart would silently lower the new
    # leak total and be reported as an improvement, so it is tracked and
    # treated as a failure in the final verdict.
    missing_in_new = sorted(set(baseline_reports) - set(new_reports))
    if missing_in_new:
        print(f"⚠ {len(missing_in_new)} fichiers baseline absents de la nouvelle sortie:")
        for fname in missing_in_new:
            print(f"  - {fname}")
        print()

    def total(reports, field):
        return sum(r[field] for r in reports.values())

    def delta_str(old, new):
        d = new - old
        if d > 0:
            return f"+{d}"
        return str(d)

    total_leaks_baseline = total(baseline_reports, "total_leaks")
    total_fps_baseline = total(baseline_reports, "total_fps")
    total_ph_baseline = total(baseline_reports, "total_placeholders")
    empty_baseline = sum(1 for r in baseline_reports.values() if r["empty"])

    total_leaks_new = total(new_reports, "total_leaks")
    total_fps_new = total(new_reports, "total_fps")
    total_ph_new = total(new_reports, "total_placeholders")
    empty_new = sum(1 for r in new_reports.values() if r["empty"])

    print("--- MÉTRIQUES GLOBALES ---")
    print(f"{'Métrique':<30} {'Baseline':>10} {'Nouveau':>10} {'Delta':>10}")
    print("-" * 62)
    print(f"{'Fuites détectées':<30} {total_leaks_baseline:>10} {total_leaks_new:>10} {delta_str(total_leaks_baseline, total_leaks_new):>10}")
    print(f"{'Faux positifs détectés':<30} {total_fps_baseline:>10} {total_fps_new:>10} {delta_str(total_fps_baseline, total_fps_new):>10}")
    print(f"{'Total placeholders':<30} {total_ph_baseline:>10} {total_ph_new:>10} {delta_str(total_ph_baseline, total_ph_new):>10}")
    print(f"{'Fichiers vides':<30} {empty_baseline:>10} {empty_new:>10} {delta_str(empty_baseline, empty_new):>10}")

    def print_type_table(title, field):
        # One row per check name seen in either set of reports.
        kinds = set()
        for report in list(baseline_reports.values()) + list(new_reports.values()):
            kinds.update(report[field].keys())
        if not kinds:
            return
        print(f"\n--- {title} ---")
        print(f"{'Type':<30} {'Baseline':>10} {'Nouveau':>10} {'Delta':>10}")
        print("-" * 62)
        for kind in sorted(kinds):
            old = sum(r[field].get(kind, 0) for r in baseline_reports.values())
            new = sum(r[field].get(kind, 0) for r in new_reports.values())
            # The original conditional produced an empty marker in every
            # branch; flag the direction of the change explicitly.
            marker = " ✓" if new < old else (" ⚠" if new > old else "")
            print(f"{kind:<30} {old:>10} {new:>10} {delta_str(old, new):>10}{marker}")

    print_type_table("FUITES PAR TYPE", "leaks")
    print_type_table("FAUX POSITIFS PAR TYPE", "false_positives")

    # Per-file comparison, restricted to files present on both sides.
    regressions = []
    improvements = []
    for fname in sorted(set(baseline_reports) & set(new_reports)):
        changes = compare_reports(baseline_reports[fname], new_reports[fname])
        if changes["regressed_leaks"]:
            regressions.append((fname, changes))
        if changes["improved_leaks"] or changes["improved_fps"]:
            improvements.append((fname, changes))

    if regressions:
        print(f"\n⚠ RÉGRESSIONS ({len(regressions)} fichiers):")
        for fname, changes in regressions:
            for k, old, new in changes["regressed_leaks"]:
                # "→" separates the two counts; without it 3 and 2 print as "32".
                print(f" {fname}: {k} {old}→{new} (+{new-old})")

    if improvements:
        print(f"\n✓ AMÉLIORATIONS ({len(improvements)} fichiers):")
        for fname, changes in improvements:
            for k, old, new in changes["improved_leaks"]:
                print(f" {fname}: {k} {old}→{new} (-{old-new})")
            for k, old, new in changes["improved_fps"]:
                print(f" {fname}: FP {k} {old}→{new} (-{old-new})")

    # Final verdict.
    print("\n" + "=" * 62)
    if missing_in_new:
        print(f"❌ RÉGRESSION : {len(missing_in_new)} fichiers baseline absents de la nouvelle sortie")
        sys.exit(1)
    if total_leaks_new > total_leaks_baseline:
        print("❌ RÉGRESSION : plus de fuites qu'avant")
        sys.exit(1)
    if regressions:
        # Totals can hide a file that got worse while others improved.
        print(f"❌ RÉGRESSION : fuites en hausse dans {len(regressions)} fichiers")
        sys.exit(1)
    if total_leaks_new < total_leaks_baseline:
        print(f"✅ AMÉLIORATION : {total_leaks_baseline - total_leaks_new} fuites en moins")
    else:
        print("➡ NEUTRE : même nombre de fuites")

    if total_fps_new < total_fps_baseline:
        print(f"✅ AMÉLIORATION : {total_fps_baseline - total_fps_new} faux positifs en moins")
    elif total_fps_new > total_fps_baseline:
        print(f"⚠ ATTENTION : {total_fps_new - total_fps_baseline} faux positifs en plus")

    sys.exit(0)


if __name__ == "__main__":
    main()

120
run_batch_30_audit.py Normal file
View File

@@ -0,0 +1,120 @@
#!/usr/bin/env python3
"""Batch 30 fichiers aléatoires pour contrôle humain."""
import sys
import time
import json
from pathlib import Path
from collections import Counter
sys.path.insert(0, str(Path(__file__).parent))
import anonymizer_core_refactored_onnx as core
from eds_pseudo_manager import EdsPseudoManager
# Root folder containing one sub-folder per case, each holding the source PDFs.
# NOTE(review): absolute, machine-specific paths — the script only runs on this workstation as-is.
SRC = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)")
# Output folder for the anonymised files and their audit logs.
OUTDIR = SRC / "anonymise_audit_30"
# Configuration file passed to core.process_pdf as config_path.
CONFIG = Path("/home/dom/ai/anonymisation/config/dictionnaires.yml")
# PDFs submitted to the batch. The parent folder name is "<OGC>_<number>";
# main() uses the part before the first "_" as the OGC label.
PDFS = [
    SRC / "110_23061319/trackare-07026002-23061319_07026002_23061319.pdf",
    SRC / "115_23066188/CRH 23066188.pdf",
    SRC / "161_23098838/CRO 23098838.pdf",
    SRC / "179_23126805/trackare-23005591-23126805_23005591_23126805.pdf",
    SRC / "181_23127286/CRH 23127286.pdf",
    SRC / "192_23132490/CRH 23132490.pdf",
    SRC / "208_23151988/trackare-23020064-23151988_23020064_23151988.pdf",
    SRC / "215_23158603/trackare-22028007-23158603_22028007_23158603.pdf",
    SRC / "227_23173599/CRH 23173599.pdf",
    SRC / "236_23116794/trackare-BA054633-23116794_BA054633_23116794.pdf",
    SRC / "248_23194278/CRH 23194278.pdf",
    SRC / "263_23203642/CRO 23203642.pdf",
    SRC / "28_23135549/trackare-15021750-23135549_15021750_23135549.pdf",
    SRC / "321_23043929/CRH 321_23066387.pdf",
    SRC / "379_23098754/trackare-18009635-23098754_18009635_23098754.pdf",
    SRC / "39_23167029/trackare-23022121-23167029_23022121_23167029.pdf",
    SRC / "444_23141032/trackare-BA102259-23141032_BA102259_23141032.pdf",
    SRC / "478_23161697/cro 478_23161697.pdf",
    SRC / "50_23219173/trackare-07019278-23219173_07019278_23219173.pdf",
    SRC / "520_23177582/trackare-99252128-23177582_99252128_23177582.pdf",
    SRC / "556_23220878/trackare-21041742-23220878_21041742_23220878.pdf",
    SRC / "602_23070052/trackare-20028293-23070052_20028293_23070052.pdf",
    SRC / "604_23070704/trackare-23008170-23070704_23008170_23070704.pdf",
    SRC / "655_23163458/trackare-01296746-23163458_01296746_23163458.pdf",
    SRC / "684_23207941/CRH 684_23207941.pdf",
    SRC / "79_23187785/79_23187785 Dossier.pdf",
    # NOTE(review): when the first path does not exist, this entry falls back to the
    # same file as the next entry, so that file appears twice in the list.
    SRC / "12_23084754/CRO 23084754.pdf" if (SRC / "12_23084754/CRO 23084754.pdf").exists() else SRC / "122_23070126/LETTRE DE SORTIE 23070126.pdf",
    SRC / "122_23070126/LETTRE DE SORTIE 23070126.pdf",
    SRC / "131_23079402/CRH 23079402.pdf",
    SRC / "290_23025988/cr anesth 290_23025988.pdf",
]
def main():
    """Anonymise every PDF listed in ``PDFS`` and print a summary.

    The EDS-Pseudo model is loaded once, then each existing PDF is passed to
    ``core.process_pdf`` with ``OUTDIR`` as output folder. When the call
    returns an audit file, its lines are parsed as JSON objects and tallied by
    their ``"kind"`` field. A failure on one PDF is reported and counted but
    does not stop the batch.

    Raises:
        RuntimeError: If the EDS-Pseudo model is not loaded after ``load()``.
    """
    print("Chargement EDS-Pseudo...", flush=True)
    ner = EdsPseudoManager()
    ner.load()
    # Explicit check instead of ``assert``, which is stripped under ``-O``.
    if not ner.is_loaded():
        raise RuntimeError("EDS-Pseudo non chargé")
    print("EDS-Pseudo chargé.\n", flush=True)

    # The list may contain the same path twice (conditional fallback entry);
    # keep the first occurrence so no file is processed and counted twice.
    unique_pdfs = list(dict.fromkeys(PDFS))

    # Check which files actually exist.
    existing = [p for p in unique_pdfs if p.exists()]
    missing = [p for p in unique_pdfs if not p.exists()]
    if missing:
        print(f"ATTENTION: {len(missing)} fichiers manquants:")
        for p in missing:
            print(f" - {p.name}")
        print()

    print(f"Fichiers à traiter: {len(existing)}/{len(unique_pdfs)}\n")
    OUTDIR.mkdir(exist_ok=True)

    ok = ko = skip_encrypted = 0
    global_counts = Counter()
    t0 = time.time()

    for i, pdf in enumerate(existing, 1):
        ogc = pdf.parent.name.split("_")[0]
        print(f"[{i}/{len(existing)}] {pdf.name} (OGC {ogc})...", end=" ", flush=True)
        try:
            outputs = core.process_pdf(
                pdf_path=pdf,
                out_dir=OUTDIR,
                make_vector_redaction=False,
                also_make_raster_burn=True,
                config_path=CONFIG,
                use_hf=True,
                ner_manager=ner,
                ner_thresholds=None,
                ogc_label=ogc,
            )
            # Path("") is the current directory, which exists; reading it
            # would raise and wrongly count a successful file as an error.
            audit_value = outputs.get("audit")
            audit_path = Path(audit_value) if audit_value else None
            if audit_path is not None and audit_path.is_file():
                for line in audit_path.read_text().splitlines():
                    try:
                        hit = json.loads(line)
                        global_counts[hit["kind"]] += 1
                    except (ValueError, KeyError, TypeError):
                        # Best effort: skip lines that are not JSON objects
                        # carrying a usable "kind" field.
                        continue
            print("OK", flush=True)
            ok += 1
        except Exception as e:
            # Batch boundary: one failing PDF must not abort the others.
            err = str(e).lower()
            if "encrypted" in err or "password" in err:
                print("SKIP (chiffré)", flush=True)
                skip_encrypted += 1
            else:
                print(f"ERREUR: {e}", flush=True)
                ko += 1

    elapsed = time.time() - t0
    print(f"\n{'='*60}")
    print(f"Terminé en {elapsed:.0f}s — OK: {ok}, Chiffrés: {skip_encrypted}, Erreurs: {ko}")
    print(f"Total PII détectés: {sum(global_counts.values())}")
    print("\nDétail par type:")
    for k, v in global_counts.most_common():
        print(f" {k:30s} {v:6d}")
    print(f"\nSortie: {OUTDIR}")


if __name__ == "__main__":
    main()