fix: Phase 1 quality corrections (261 fewer leaks, 0 regressions)

An audit of 30 random files (OGC 12-690) revealed overfitting on the
first 59 OGCs. Fixes were applied with a non-regression test at each
step:

- Trackare footer NDAs: "Episode N." regex (227→0 leaks)
- ONDANSETRON: word boundary \b on RE_NUMERO_DOSSIER (32→0)
- Isolated RPPS: detection of 11-digit numbers in Trackare docs (3→0)
- Stop words: removed real names (ute, dogue, cambo, bains), added
  medical terms (AINS, ponction, hanche, burkitt, ORL, GDS, OAP...)
- "DR. Prénom NOM" pattern: captures physician first names (Ute ×19, Tam...)
- force_names: structured contexts (DR., Signé, Note d'évolution)
  bypass the stop words so real caregiver names still get masked
  (see the sketch after this list)
- Phase 2b: Trackare PiiHits (EPISODE, RPPS) applied to the .txt text
- Non-regression framework (regression_tests/) + 30-file audit batch
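
A minimal sketch of the force_names rule, assuming a hypothetical context
pattern (the production patterns and stop-word list live in the anonymizer
module and are not part of this commit):

    import re

    # Hypothetical stand-in for the anonymizer's force_names contexts.
    FORCE_CONTEXT = re.compile(
        r"(DR\.?|Signé|Note d'évolution)\s+"           # structured prefix
        r"([A-ZÀ-Ü][\w'-]+(?:\s+[A-ZÀ-Ü][\w'-]+)*)"    # the Prénom NOM that follows
    )

    def mask_caregivers(text: str) -> str:
        # Inside a structured context the name is always masked, even when
        # the first name ("Ute") also appears in the stop-word list.
        return FORCE_CONTEXT.sub(lambda m: f"{m.group(1)} [NOM]", text)

    assert mask_caregivers("DR. Ute EXEMPLE, avis ORL") == "DR. [NOM], avis ORL"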

Result: 322→61 detected leaks, 113→109 false positives, 0 regressions.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 17:32:28 +01:00
parent f9532d5543
commit bc2fe667a0
63 changed files with 30187 additions and 24 deletions


@@ -0,0 +1,258 @@
#!/usr/bin/env python3
"""Test de non-régression : compare baseline vs nouvelle sortie.
Usage:
python regression_tests/check_regression.py [--rerun]
Sans --rerun : compare baseline/ vs current output (anonymise_audit_30/)
Avec --rerun : relance l'anonymisation puis compare
"""
import json
import re
import sys
from collections import Counter
from pathlib import Path
BASELINE_DIR = Path(__file__).parent / "baseline"
OUTPUT_DIR = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)/anonymise_audit_30")
# === Known leak patterns ===
LEAK_CHECKS = {
    "NDA_footer": re.compile(r"Episode\s+N[o°.]?\s*\.?\s*:\s*(\d{5,})"),
    "ONDANSETRON_broken": re.compile(r"O\[DOSSIER\]"),
    "RPPS_raw": re.compile(r"\b[12]\d{10}\b"),  # 11 digits starting with 1 or 2
    "bracket_double": re.compile(r"\[\["),
    "www_hospital": re.compile(r"www\.ch-cote-basque"),
    "FINESS_raw": re.compile(r"\b640000162\b"),
}

# === Medical terms that must NOT be masked ===
FALSE_POSITIVE_CHECKS = {
    "AINS_masked": re.compile(r"\[NOM\].*(?:céphalée|paracétamol)|paracétamol.*\[NOM\]", re.I),
    "ponction_masked": re.compile(r"\[NOM\]\s+lombaire", re.I),
    "hanche_masked": re.compile(r"(?:de\s+la|de)\s+\[NOM\].*(?:profil|opérée|fémorale)", re.I),
    "ORL_masked": re.compile(r"IRM\s+\[NOM\]", re.I),
    "burkitt_masked": re.compile(r"\[NOM\]\s*\.\s*(?:stade|type|lymphome)?", re.I),
}
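
# Placeholder tags emitted by the pseudonymizer; occurrences are counted per file.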
PLACEHOLDER_RE = re.compile(r"\[(NOM|TEL|EMAIL|NIR|IPP|DOSSIER|NDA|EPISODE|RPPS|DATE_NAISSANCE|ADRESSE|CODE_POSTAL|VILLE|MASK|FINESS|OGC|AGE|ETABLISSEMENT|IBAN)\]")


def analyze_file(txt_path: Path) -> dict:
    """Analyse a pseudonymised file and return its metrics."""
    text = txt_path.read_text(encoding="utf-8", errors="replace")
    lines = text.splitlines()
    result = {
        "file": txt_path.name,
        "lines": len(lines),
        "chars": len(text),
        "empty": len(text.strip()) == 0,
    }

    # Placeholder counts
    ph_counts = Counter()
    for m in PLACEHOLDER_RE.finditer(text):
        ph_counts[m.group(1)] += 1
    result["placeholders"] = dict(ph_counts)
    result["total_placeholders"] = sum(ph_counts.values())

    # Leak detection
    leaks = {}
    for name, pattern in LEAK_CHECKS.items():
        matches = pattern.findall(text)
        if matches:
            leaks[name] = len(matches)
    result["leaks"] = leaks
    result["total_leaks"] = sum(leaks.values())

    # False-positive detection
    fps = {}
    for name, pattern in FALSE_POSITIVE_CHECKS.items():
        matches = pattern.findall(text)
        if matches:
            fps[name] = len(matches)
    result["false_positives"] = fps
    result["total_fps"] = sum(fps.values())
    return result


def compare_reports(baseline_report: dict, new_report: dict) -> dict:
    """Compare two reports and identify regressions/improvements."""
    changes = {
        "improved_leaks": [],
        "regressed_leaks": [],
        "improved_fps": [],
        "regressed_fps": [],
        "placeholder_delta": {},
    }

    # Compare leaks
    all_leak_keys = set(baseline_report["leaks"].keys()) | set(new_report["leaks"].keys())
    for k in all_leak_keys:
        old = baseline_report["leaks"].get(k, 0)
        new = new_report["leaks"].get(k, 0)
        if new < old:
            changes["improved_leaks"].append((k, old, new))
        elif new > old:
            changes["regressed_leaks"].append((k, old, new))

    # Compare false positives
    all_fp_keys = set(baseline_report["false_positives"].keys()) | set(new_report["false_positives"].keys())
    for k in all_fp_keys:
        old = baseline_report["false_positives"].get(k, 0)
        new = new_report["false_positives"].get(k, 0)
        if new < old:
            changes["improved_fps"].append((k, old, new))
        elif new > old:
            changes["regressed_fps"].append((k, old, new))

    # Compare placeholders
    all_ph = set(baseline_report["placeholders"].keys()) | set(new_report["placeholders"].keys())
    for k in all_ph:
        old = baseline_report["placeholders"].get(k, 0)
        new = new_report["placeholders"].get(k, 0)
        if old != new:
            changes["placeholder_delta"][k] = new - old
    return changes


def main():
    rerun = "--rerun" in sys.argv
    if rerun:
        print("=== Re-running the anonymisation of the 30 files ===\n")
        import subprocess
        result = subprocess.run(
            [sys.executable, "run_batch_30_audit.py"],
            cwd=str(Path(__file__).parent.parent),
            capture_output=False,
        )
        if result.returncode != 0:
            print("ERROR: batch run failed")
            sys.exit(1)
        print()

    # Analyse the baseline and the new output
    baseline_files = sorted(BASELINE_DIR.glob("*.pseudonymise.txt"))
    new_files = sorted(OUTPUT_DIR.glob("*.pseudonymise.txt"))
    if not baseline_files:
        print("ERROR: no baseline files found")
        sys.exit(1)

    print("=== NON-REGRESSION REPORT ===")
    print(f"Baseline: {len(baseline_files)} files")
    print(f"New:      {len(new_files)} files\n")

    # Per-file reports
    baseline_reports = {}
    new_reports = {}
    for f in baseline_files:
        baseline_reports[f.name] = analyze_file(f)
    for f in new_files:
        new_reports[f.name] = analyze_file(f)

    # === Global metrics ===
    total_leaks_baseline = sum(r["total_leaks"] for r in baseline_reports.values())
    total_fps_baseline = sum(r["total_fps"] for r in baseline_reports.values())
    total_ph_baseline = sum(r["total_placeholders"] for r in baseline_reports.values())
    empty_baseline = sum(1 for r in baseline_reports.values() if r["empty"])
    total_leaks_new = sum(r["total_leaks"] for r in new_reports.values())
    total_fps_new = sum(r["total_fps"] for r in new_reports.values())
    total_ph_new = sum(r["total_placeholders"] for r in new_reports.values())
    empty_new = sum(1 for r in new_reports.values() if r["empty"])

    print("--- GLOBAL METRICS ---")
    print(f"{'Metric':<30} {'Baseline':>10} {'New':>10} {'Delta':>10}")
    print("-" * 62)

    def delta_str(old, new):
        d = new - old
        if d > 0:
            return f"+{d}"
        return str(d)

    print(f"{'Detected leaks':<30} {total_leaks_baseline:>10} {total_leaks_new:>10} {delta_str(total_leaks_baseline, total_leaks_new):>10}")
    print(f"{'Detected false positives':<30} {total_fps_baseline:>10} {total_fps_new:>10} {delta_str(total_fps_baseline, total_fps_new):>10}")
    print(f"{'Total placeholders':<30} {total_ph_baseline:>10} {total_ph_new:>10} {delta_str(total_ph_baseline, total_ph_new):>10}")
    print(f"{'Empty files':<30} {empty_baseline:>10} {empty_new:>10} {delta_str(empty_baseline, empty_new):>10}")

    # Leak breakdown by type
    all_leak_types = set()
    for r in list(baseline_reports.values()) + list(new_reports.values()):
        all_leak_types.update(r["leaks"].keys())
    if all_leak_types:
        print("\n--- LEAKS BY TYPE ---")
        print(f"{'Type':<30} {'Baseline':>10} {'New':>10} {'Delta':>10}")
        print("-" * 62)
        for lt in sorted(all_leak_types):
            old = sum(r["leaks"].get(lt, 0) for r in baseline_reports.values())
            new = sum(r["leaks"].get(lt, 0) for r in new_reports.values())
            marker = " ✓" if new < old else (" ⚠" if new > old else "")
            print(f"{lt:<30} {old:>10} {new:>10} {delta_str(old, new):>10}{marker}")

    # False-positive breakdown by type
    all_fp_types = set()
    for r in list(baseline_reports.values()) + list(new_reports.values()):
        all_fp_types.update(r["false_positives"].keys())
    if all_fp_types:
        print("\n--- FALSE POSITIVES BY TYPE ---")
        print(f"{'Type':<30} {'Baseline':>10} {'New':>10} {'Delta':>10}")
        print("-" * 62)
        for ft in sorted(all_fp_types):
            old = sum(r["false_positives"].get(ft, 0) for r in baseline_reports.values())
            new = sum(r["false_positives"].get(ft, 0) for r in new_reports.values())
            marker = " ✓" if new < old else (" ⚠" if new > old else "")
            print(f"{ft:<30} {old:>10} {new:>10} {delta_str(old, new):>10}{marker}")

    # Files with regressions / improvements
    regressions = []
    improvements = []
    for fname in sorted(set(baseline_reports.keys()) & set(new_reports.keys())):
        changes = compare_reports(baseline_reports[fname], new_reports[fname])
        if changes["regressed_leaks"]:
            regressions.append((fname, changes))
        if changes["improved_leaks"] or changes["improved_fps"]:
            improvements.append((fname, changes))

    if regressions:
        print(f"\n⚠ REGRESSIONS ({len(regressions)} files):")
        for fname, changes in regressions:
            for k, old, new in changes["regressed_leaks"]:
                print(f"  {fname}: {k} {old}→{new} (+{new - old})")

    if improvements:
        print(f"\n✓ IMPROVEMENTS ({len(improvements)} files):")
        for fname, changes in improvements:
            for k, old, new in changes["improved_leaks"]:
                print(f"  {fname}: {k} {old}→{new} (-{old - new})")
            for k, old, new in changes["improved_fps"]:
                print(f"  {fname}: FP {k} {old}→{new} (-{old - new})")

    # Final verdict
    print("\n" + "=" * 62)
    if total_leaks_new > total_leaks_baseline:
        print("❌ REGRESSION: more leaks than before")
        sys.exit(1)
    elif total_leaks_new < total_leaks_baseline:
        print(f"✅ IMPROVEMENT: {total_leaks_baseline - total_leaks_new} fewer leaks")
    else:
        print("➡ NEUTRAL: same number of leaks")

    if total_fps_new < total_fps_baseline:
        print(f"✅ IMPROVEMENT: {total_fps_baseline - total_fps_new} fewer false positives")
    elif total_fps_new > total_fps_baseline:
        print(f"⚠ WARNING: {total_fps_new - total_fps_baseline} more false positives")
    sys.exit(0)


if __name__ == "__main__":
    main()