#!/usr/bin/env python3
"""Non-regression check: compare the baseline against the newly produced output.

Usage:
    python regression_tests/check_regression.py [--rerun]

Without --rerun: compare baseline/ against the current output
                 (anonymise_audit_30/).
With --rerun:    re-run the anonymisation batch first, then compare.

The process exits with status 1 when the batch fails, when either side has no
files, when baseline files are missing from the new output, or when the new
output contains more detected leaks than the baseline; otherwise it exits 0.
"""

import json
import re
import subprocess
import sys
from collections import Counter
from pathlib import Path

BASELINE_DIR = Path(__file__).parent / "baseline"
OUTPUT_DIR = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)/anonymise_audit_30")

# === Known leak patterns ===
LEAK_CHECKS = {
    "NDA_footer": re.compile(r"Episode\s+N[o°.]?\s*\.?\s*:\s*(\d{5,})"),
    "ONDANSETRON_broken": re.compile(r"O\[DOSSIER\]"),
    "RPPS_raw": re.compile(r"\b[12]\d{10}\b"),  # 11 digits starting with 1 or 2
    "bracket_double": re.compile(r"\[\["),
    "www_hospital": re.compile(r"www\.ch-cote-basque"),
    "FINESS_raw": re.compile(r"\b640000162\b"),
}

# === Medical terms that must NOT be masked ===
FALSE_POSITIVE_CHECKS = {
    "AINS_masked": re.compile(r"\[NOM\].*(?:céphalée|paracétamol)|paracétamol.*\[NOM\]", re.I),
    "ponction_masked": re.compile(r"\[NOM\]\s+lombaire", re.I),
    "hanche_masked": re.compile(r"(?:de\s+la|de)\s+\[NOM\].*(?:profil|opérée|fémorale)", re.I),
    "ORL_masked": re.compile(r"IRM\s+\[NOM\]", re.I),
    # NOTE(review): the trailing `?` makes the keyword group optional, so this
    # matches any "[NOM]" followed by a period -- confirm that is intended.
    "burkitt_masked": re.compile(r"\[NOM\]\s*\.\s*(?:stade|type|lymphome)?", re.I),
}

PLACEHOLDER_RE = re.compile(r"\[(NOM|TEL|EMAIL|NIR|IPP|DOSSIER|NDA|EPISODE|RPPS|DATE_NAISSANCE|ADRESSE|CODE_POSTAL|VILLE|MASK|FINESS|OGC|AGE|ETABLISSEMENT|IBAN)\]")


def _count_matches(checks: dict, text: str) -> dict:
    """Return ``{check name: match count}`` for each pattern that matches *text*.

    Checks with zero matches are omitted from the result.
    """
    counts = {}
    for name, pattern in checks.items():
        hits = sum(1 for _ in pattern.finditer(text))
        if hits:
            counts[name] = hits
    return counts


def analyze_file(txt_path: Path) -> dict:
    """Analyse one pseudonymised text file and return its metrics.

    Args:
        txt_path: Path of the file to read (decoded as UTF-8, undecodable
            bytes replaced).

    Returns:
        A dict with the keys ``file``, ``lines``, ``chars``, ``empty``,
        ``placeholders``, ``total_placeholders``, ``leaks``, ``total_leaks``,
        ``false_positives`` and ``total_fps``.
    """
    text = txt_path.read_text(encoding="utf-8", errors="replace")

    placeholder_counts = Counter(m.group(1) for m in PLACEHOLDER_RE.finditer(text))
    leaks = _count_matches(LEAK_CHECKS, text)
    false_positives = _count_matches(FALSE_POSITIVE_CHECKS, text)

    return {
        "file": txt_path.name,
        "lines": len(text.splitlines()),
        "chars": len(text),
        "empty": not text.strip(),
        "placeholders": dict(placeholder_counts),
        "total_placeholders": sum(placeholder_counts.values()),
        "leaks": leaks,
        "total_leaks": sum(leaks.values()),
        "false_positives": false_positives,
        "total_fps": sum(false_positives.values()),
    }


def _split_changes(old_counts: dict, new_counts: dict) -> tuple:
    """Return ``(improved, regressed)`` lists of ``(key, old, new)`` tuples.

    A key counts as improved when its new count is lower, regressed when it is
    higher. Keys are visited in sorted order so the result is deterministic.
    """
    improved = []
    regressed = []
    for key in sorted(old_counts.keys() | new_counts.keys()):
        old = old_counts.get(key, 0)
        new = new_counts.get(key, 0)
        if new < old:
            improved.append((key, old, new))
        elif new > old:
            regressed.append((key, old, new))
    return improved, regressed


def compare_reports(baseline_report: dict, new_report: dict) -> dict:
    """Compare two ``analyze_file`` reports and list regressions/improvements.

    Args:
        baseline_report: Report of the baseline file.
        new_report: Report of the corresponding new file.

    Returns:
        A dict with ``improved_leaks``, ``regressed_leaks``, ``improved_fps``
        and ``regressed_fps`` (lists of ``(key, old, new)`` tuples) plus
        ``placeholder_delta`` (``{placeholder: new - old}`` for every
        placeholder whose count changed).
    """
    improved_leaks, regressed_leaks = _split_changes(
        baseline_report["leaks"], new_report["leaks"]
    )
    improved_fps, regressed_fps = _split_changes(
        baseline_report["false_positives"], new_report["false_positives"]
    )

    old_ph = baseline_report["placeholders"]
    new_ph = new_report["placeholders"]
    placeholder_delta = {}
    for key in sorted(old_ph.keys() | new_ph.keys()):
        diff = new_ph.get(key, 0) - old_ph.get(key, 0)
        if diff:
            placeholder_delta[key] = diff

    return {
        "improved_leaks": improved_leaks,
        "regressed_leaks": regressed_leaks,
        "improved_fps": improved_fps,
        "regressed_fps": regressed_fps,
        "placeholder_delta": placeholder_delta,
    }


def _delta_str(old: int, new: int) -> str:
    """Format ``new - old`` with an explicit ``+`` sign for positive deltas."""
    delta = new - old
    return f"+{delta}" if delta > 0 else str(delta)


def _print_row(label: str, old: int, new: int, marker: str = "") -> None:
    """Print one aligned baseline/new/delta table row."""
    print(f"{label:<30} {old:>10} {new:>10} {_delta_str(old, new):>10}{marker}")


def _total(reports: dict, key: str) -> int:
    """Sum the numeric metric *key* over every report in *reports*."""
    return sum(report[key] for report in reports.values())


def _print_by_type(title: str, field: str, baseline_reports: dict, new_reports: dict) -> None:
    """Print the per-type table for *field* (``leaks`` or ``false_positives``).

    Nothing is printed when no type was detected on either side.
    """
    types = set()
    for reports in (baseline_reports, new_reports):
        for report in reports.values():
            types.update(report[field])
    if not types:
        return

    print(f"\n--- {title} ---")
    print(f"{'Type':<30} {'Baseline':>10} {'Nouveau':>10} {'Delta':>10}")
    print("-" * 62)
    for name in sorted(types):
        old = sum(r[field].get(name, 0) for r in baseline_reports.values())
        new = sum(r[field].get(name, 0) for r in new_reports.values())
        marker = " ✓" if new < old else (" ✗" if new > old else "")
        _print_row(name, old, new, marker)


def main():
    """Run the non-regression comparison and exit with the resulting status."""
    if "--rerun" in sys.argv:
        print("=== Relance de l'anonymisation des 30 fichiers ===\n")
        result = subprocess.run(
            [sys.executable, "run_batch_30_audit.py"],
            cwd=str(Path(__file__).parent.parent),
            capture_output=False,
        )
        if result.returncode != 0:
            print("ERREUR: batch échoué")
            sys.exit(1)
        print()

    baseline_files = sorted(BASELINE_DIR.glob("*.pseudonymise.txt"))
    new_files = sorted(OUTPUT_DIR.glob("*.pseudonymise.txt"))

    if not baseline_files:
        print("ERREUR: pas de fichiers baseline trouvés")
        sys.exit(1)
    # Without this guard an empty output directory yields zero leaks and would
    # be reported as an improvement.
    if not new_files:
        print("ERREUR: pas de fichiers de sortie trouvés")
        sys.exit(1)

    print("=== RAPPORT DE NON-RÉGRESSION ===")
    print(f"Baseline: {len(baseline_files)} fichiers")
    print(f"Nouveau: {len(new_files)} fichiers\n")

    # Per-file reports, keyed by file name
    baseline_reports = {f.name: analyze_file(f) for f in baseline_files}
    new_reports = {f.name: analyze_file(f) for f in new_files}

    # === Global metrics ===
    total_leaks_baseline = _total(baseline_reports, "total_leaks")
    total_fps_baseline = _total(baseline_reports, "total_fps")
    total_ph_baseline = _total(baseline_reports, "total_placeholders")
    empty_baseline = sum(1 for r in baseline_reports.values() if r["empty"])

    total_leaks_new = _total(new_reports, "total_leaks")
    total_fps_new = _total(new_reports, "total_fps")
    total_ph_new = _total(new_reports, "total_placeholders")
    empty_new = sum(1 for r in new_reports.values() if r["empty"])

    print("--- MÉTRIQUES GLOBALES ---")
    print(f"{'Métrique':<30} {'Baseline':>10} {'Nouveau':>10} {'Delta':>10}")
    print("-" * 62)
    _print_row("Fuites détectées", total_leaks_baseline, total_leaks_new)
    _print_row("Faux positifs détectés", total_fps_baseline, total_fps_new)
    _print_row("Total placeholders", total_ph_baseline, total_ph_new)
    _print_row("Fichiers vides", empty_baseline, empty_new)

    # Per-type detail
    _print_by_type("FUITES PAR TYPE", "leaks", baseline_reports, new_reports)
    _print_by_type("FAUX POSITIFS PAR TYPE", "false_positives", baseline_reports, new_reports)

    # Baseline files with no counterpart in the new output cannot be checked
    missing = sorted(set(baseline_reports) - set(new_reports))
    if missing:
        print(f"\n⚠ FICHIERS MANQUANTS ({len(missing)} fichiers):")
        for fname in missing:
            print(f" {fname}")

    # Per-file regressions and improvements (files present on both sides)
    regressions = []
    improvements = []
    for fname in sorted(set(baseline_reports) & set(new_reports)):
        changes = compare_reports(baseline_reports[fname], new_reports[fname])
        if changes["regressed_leaks"] or changes["regressed_fps"]:
            regressions.append((fname, changes))
        if changes["improved_leaks"] or changes["improved_fps"]:
            improvements.append((fname, changes))

    if regressions:
        print(f"\n⚠ RÉGRESSIONS ({len(regressions)} fichiers):")
        for fname, changes in regressions:
            for k, old, new in changes["regressed_leaks"]:
                print(f" {fname}: {k} {old}→{new} (+{new-old})")
            for k, old, new in changes["regressed_fps"]:
                print(f" {fname}: FP {k} {old}→{new} (+{new-old})")

    if improvements:
        print(f"\n✓ AMÉLIORATIONS ({len(improvements)} fichiers):")
        for fname, changes in improvements:
            for k, old, new in changes["improved_leaks"]:
                print(f" {fname}: {k} {old}→{new} (-{old-new})")
            for k, old, new in changes["improved_fps"]:
                print(f" {fname}: FP {k} {old}→{new} (-{old-new})")

    # Final verdict
    print("\n" + "=" * 62)
    if missing:
        # Fewer files means fewer detected leaks, so the totals below would be
        # misleading; fail explicitly instead.
        print(f"❌ RÉGRESSION : {len(missing)} fichiers baseline absents de la nouvelle sortie")
        sys.exit(1)

    if total_leaks_new > total_leaks_baseline:
        print("❌ RÉGRESSION : plus de fuites qu'avant")
        sys.exit(1)
    elif total_leaks_new < total_leaks_baseline:
        print(f"✅ AMÉLIORATION : {total_leaks_baseline - total_leaks_new} fuites en moins")
    else:
        print("➡ NEUTRE : même nombre de fuites")

    if total_fps_new < total_fps_baseline:
        print(f"✅ AMÉLIORATION : {total_fps_baseline - total_fps_new} faux positifs en moins")
    elif total_fps_new > total_fps_baseline:
        print(f"⚠ ATTENTION : {total_fps_new - total_fps_baseline} faux positifs en plus")

    sys.exit(0)


if __name__ == "__main__":
    main()