#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
QC Audit — post-anonymisation quality control
---------------------------------------------
Analyses ``.audit.jsonl`` and ``.pseudonymise.txt`` files to detect:
  - Residual false negatives (EMAIL, TEL, NIR left unmasked)
  - Over-masking (placeholder density too high)
  - False-positive candidates (NOM_EXTRACTED hits that are stop words
    or shorter than 3 characters)
  - False-negative candidates (unmasked ALL-CAPS words of 5+ characters
    that are not known medical words)
  - Audit statistics (count per kind, top 10)

Usage:
    python3 qc_audit.py path/to/*.audit.jsonl
    python3 qc_audit.py --batch-dir path/anonymise/
"""
from __future__ import annotations

import argparse
import csv
import json
import re
import sys
from pathlib import Path
from collections import Counter
from typing import List, Dict, Any, Optional

# Critical PII regexes (meant to mirror the patterns used by the core).
RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")

# NOTE(review): the original definitions of RE_TEL, RE_NIR, RE_PLACEHOLDER and
# _MEDICAL_STOP_WORDS_SET were not recoverable from the reviewed text (the
# span following ``RE_TEL = re.compile(r"(?`` was lost). The four values
# below are reconstructions — confirm each against the anonymisation core
# before relying on them.
RE_TEL = re.compile(
    r"(?<!\d)(?:(?:\+33|0033)\s?[1-9]|0[1-9])(?:[\s.\-]?\d{2}){4}(?!\d)"
)
RE_NIR = re.compile(
    r"(?<!\d)[12]\s?\d{2}\s?(?:0[1-9]|1[0-2])\s?(?:\d{2}|2[AB])"
    r"\s?\d{3}\s?\d{3}(?:\s?\d{2})?(?!\d)"
)
RE_PLACEHOLDER = re.compile(r"\[[A-Z_]+\]")
# TODO(review): restore the real medical stop-word set. Every use below is
# guarded by a truthiness check, so an empty set only disables the stop-word
# filters; it does not break anything.
_MEDICAL_STOP_WORDS_SET: frozenset = frozenset()

# ALL-CAPS words (accented capitals included) of at least 5 letters.
_RE_ALL_CAPS_WORD = re.compile(r"\b([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{5,})\b")

# Upper-case tokens that are never reported as false-negative candidates.
_FN_IGNORED_WORDS = frozenset({
    "TABLES", "FINESS", "EMAIL", "ADRESSE", "IBAN", "EPISODE",
    "ETABLISSEMENT", "DATE", "NAISSANCE", "POSTAL", "MASK", "DOSSIER",
    "RPPS", "GLOBAL", "EXTRACTED", "TRACKARE",
})

# (label, pattern) pairs scanned by scan_residual_pii, in reporting order.
_RESIDUAL_PATTERNS = (
    ("EMAIL", RE_EMAIL),
    ("TEL", RE_TEL),
    ("NIR", RE_NIR),
)


def _char_before_is(text: str, pos: int, char: str) -> bool:
    """Return True when the character immediately before *pos* is *char*."""
    return pos > 0 and text[pos - 1] == char


def _char_at_is(text: str, pos: int, char: str) -> bool:
    """Return True when *pos* is inside *text* and holds *char*."""
    return text[pos:pos + 1] == char


def _count_residual_hits(result: Dict[str, Any]) -> int:
    """Return the total number of residual PII matches in one analysis."""
    return sum(len(v) for v in result.get("residual_pii", {}).values())


def load_audit(audit_path: Path) -> List[Dict[str, Any]]:
    """Load a ``.audit.jsonl`` file.

    Blank lines are skipped; every other line is parsed as one JSON value.

    Raises:
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    entries = []
    with audit_path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:
                entries.append(json.loads(line))
    return entries


def load_text(txt_path: Path) -> str:
    """Load a ``.pseudonymise.txt`` file as UTF-8 text."""
    return txt_path.read_text(encoding="utf-8")


def scan_residual_pii(text: str) -> Dict[str, List[str]]:
    """Detect residual PII (false negatives) in pseudonymised text.

    Returns a dict mapping "EMAIL" / "TEL" / "NIR" to the list of matched
    strings. A label is present only when it has at least one match.
    """
    residuals: Dict[str, List[str]] = {}
    for label, pattern in _RESIDUAL_PATTERNS:
        # A match that directly follows "[" is treated as part of a
        # placeholder and ignored.
        hits = [
            m.group()
            for m in pattern.finditer(text)
            if not _char_before_is(text, m.start(), "[")
        ]
        if hits:
            residuals[label] = hits
    return residuals


def placeholder_density(text: str) -> Dict[str, Any]:
    """Compute placeholder density over whitespace-separated words.

    ``alert_overmasking`` is True when ``[NOM]`` occurrences exceed 5% of
    the word count. The same keys are returned for empty and non-empty
    text so consumers never have to special-case the schema.
    """
    words = text.split()
    total_words = len(words)
    if total_words == 0:
        return {
            "total_words": 0,
            "placeholders": 0,
            "density_pct": 0.0,
            "nom_count": 0,
            "nom_density_pct": 0.0,
            "alert_overmasking": False,
            # Legacy key: the empty-text result used to expose only "alert".
            "alert": False,
        }

    placeholder_count = sum(1 for w in words if RE_PLACEHOLDER.match(w))
    nom_count = text.count("[NOM]")
    density = placeholder_count / total_words * 100
    nom_density = nom_count / total_words * 100

    return {
        "total_words": total_words,
        "placeholders": placeholder_count,
        "density_pct": round(density, 2),
        "nom_count": nom_count,
        "nom_density_pct": round(nom_density, 2),
        "alert_overmasking": nom_density > 5.0,
    }


def audit_stats(entries: List[Dict[str, Any]]) -> Dict[str, int]:
    """Count entries per ``kind`` and keep the 10 most common.

    Entries without a ``kind`` key are counted under "UNKNOWN".
    """
    counter = Counter(e.get("kind", "UNKNOWN") for e in entries)
    return dict(counter.most_common(10))


def fp_candidates(entries: List[Dict[str, Any]]) -> List[Dict[str, str]]:
    """List false-positive candidates among NOM_EXTRACTED entries.

    An entry is a candidate when its ``original`` value is shorter than
    3 characters or, lower-cased, belongs to the medical stop-word set.
    """
    candidates = []
    for e in entries:
        kind = e.get("kind", "")
        if kind != "NOM_EXTRACTED":
            continue
        original = e.get("original", "")

        if len(original) < 3:
            reason = "trop court (< 3 chars)"
        elif _MEDICAL_STOP_WORDS_SET and original.lower() in _MEDICAL_STOP_WORDS_SET:
            reason = "stop word médical"
        else:
            continue

        candidates.append({"kind": kind, "original": original, "reason": reason})
    return candidates


def fn_candidates(text: str) -> List[str]:
    """List false-negative candidates: unmasked ALL-CAPS words (5+ letters).

    Each word is reported at most once, in order of first unmasked
    appearance. Occurrences directly preceded by "[" or followed by "]"
    are treated as placeholder content and skipped.
    """
    candidates = []
    seen = set()
    for m in _RE_ALL_CAPS_WORD.finditer(text):
        word = m.group(1)
        if word in seen:
            continue
        # The bracket test is per occurrence, so it must run before the word
        # is marked as seen: a first occurrence inside a placeholder must not
        # hide a later occurrence that is genuinely unmasked.
        if _char_before_is(text, m.start(), "[") or _char_at_is(text, m.end(), "]"):
            continue
        # Known medical words.
        if _MEDICAL_STOP_WORDS_SET and word.lower() in _MEDICAL_STOP_WORDS_SET:
            continue
        # Explicitly ignored upper-case tokens.
        if word in _FN_IGNORED_WORDS:
            continue
        seen.add(word)
        candidates.append(word)
    return candidates


def analyze_file(audit_path: Path, txt_path: Optional[Path] = None) -> Dict[str, Any]:
    """Run the full analysis for one audit.jsonl + pseudonymise.txt pair.

    When *txt_path* is None it is derived from *audit_path* by replacing
    ``.audit.jsonl`` in the file name with ``.pseudonymise.txt``. If the
    text file does not exist, the text-based sections are left empty.
    """
    result: Dict[str, Any] = {"file": str(audit_path)}

    # Audit-based sections.
    entries = load_audit(audit_path)
    result["total_hits"] = len(entries)
    result["stats"] = audit_stats(entries)
    result["fp_candidates"] = fp_candidates(entries)

    # Text-based sections, when the text is available.
    if txt_path is None:
        stem = audit_path.name.replace(".audit.jsonl", "")
        txt_path = audit_path.parent / f"{stem}.pseudonymise.txt"

    if txt_path.exists():
        text = load_text(txt_path)
        result["residual_pii"] = scan_residual_pii(text)
        result["density"] = placeholder_density(text)
        result["fn_candidates"] = fn_candidates(text)
    else:
        result["residual_pii"] = {}
        result["density"] = {}
        result["fn_candidates"] = []

    return result


def print_report(analysis: Dict[str, Any]) -> None:
    """Print a human-readable report for one analysed file.

    Lists are truncated for readability: 3 values per residual PII type,
    10 false-positive candidates, 15 false-negative candidates.
    """
    print(f"\n{'='*70}")
    print(f" QC Audit : {analysis['file']}")
    print(f"{'='*70}")

    print(f"\n Total hits audit : {analysis['total_hits']}")

    # Counts per kind.
    print("\n Top 10 kinds :")
    for kind, count in analysis.get("stats", {}).items():
        print(f" {kind:30s} : {count}")

    # Placeholder density (absent when no text file was found).
    density = analysis.get("density", {})
    if density:
        print(f"\n Densité placeholders : {density.get('density_pct', 0)}% "
              f"({density.get('placeholders', 0)}/{density.get('total_words', 0)} mots)")
        print(f" [NOM] : {density.get('nom_count', 0)} occurrences "
              f"({density.get('nom_density_pct', 0)}%)")
        if density.get("alert_overmasking"):
            print(" *** ALERTE : sur-masquage possible ([NOM] > 5% des mots) ***")

    # Residual PII.
    residuals = analysis.get("residual_pii", {})
    if residuals:
        print("\n PII résiduels (faux négatifs) :")
        for pii_type, values in residuals.items():
            print(f" {pii_type} : {len(values)} trouvé(s)")
            for v in values[:3]:
                print(f" - {v}")
    else:
        print("\n PII résiduels : aucun détecté")

    # False-positive candidates.
    fps = analysis.get("fp_candidates", [])
    if fps:
        print(f"\n FP candidats ({len(fps)}) :")
        for fp in fps[:10]:
            print(f" - {fp['original']:20s} ({fp['reason']})")

    # False-negative candidates.
    fns = analysis.get("fn_candidates", [])
    if fns:
        print(f"\n FN candidats ({len(fns)} mots ALL-CAPS non masqués) :")
        for fn in fns[:15]:
            print(f" - {fn}")

    print()


def batch_report(results: List[Dict[str, Any]], csv_path: Optional[Path] = None) -> None:
    """Print a summary over several analyses and optionally export a CSV.

    When *csv_path* is given, one row per analysed file is written to it
    (UTF-8, header row included), overwriting any existing file.
    """
    print(f"\n{'='*70}")
    print(f" RAPPORT BATCH — {len(results)} fichier(s)")
    print(f"{'='*70}")

    total_hits = sum(r.get("total_hits", 0) for r in results)
    total_residuals = sum(_count_residual_hits(r) for r in results)
    total_fps = sum(len(r.get("fp_candidates", [])) for r in results)
    total_fns = sum(len(r.get("fn_candidates", [])) for r in results)
    alerts = [r["file"] for r in results if r.get("density", {}).get("alert_overmasking")]

    print(f"\n Total hits audit : {total_hits}")
    print(f" PII résiduels (FN) : {total_residuals}")
    print(f" FP candidats : {total_fps}")
    print(f" FN candidats (ALL-CAPS) : {total_fns}")
    print(f" Alertes sur-masquage : {len(alerts)}")
    for a in alerts:
        print(f" - {a}")

    if csv_path:
        with csv_path.open("w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow([
                "fichier", "total_hits", "residual_pii", "density_pct",
                "nom_density_pct", "alert_overmasking", "fp_count", "fn_count",
            ])
            for r in results:
                d = r.get("density", {})
                writer.writerow([
                    Path(r["file"]).name,
                    r.get("total_hits", 0),
                    _count_residual_hits(r),
                    d.get("density_pct", ""),
                    d.get("nom_density_pct", ""),
                    d.get("alert_overmasking", ""),
                    len(r.get("fp_candidates", [])),
                    len(r.get("fn_candidates", [])),
                ])
        print(f"\n Rapport CSV : {csv_path}")

    print()


def main():
    """Command-line entry point: analyse the requested files and report.

    Exits with status 1 when no input is given or no audit file is found.
    """
    parser = argparse.ArgumentParser(description="QC Audit post-anonymisation")
    parser.add_argument("files", nargs="*", help="Fichiers .audit.jsonl à analyser")
    parser.add_argument("--batch-dir", type=str, help="Répertoire contenant les fichiers anonymisés")
    parser.add_argument("--csv", type=str, help="Chemin du rapport CSV résumé (mode batch)")
    args = parser.parse_args()

    audit_files: List[Path] = []

    if args.batch_dir:
        batch_dir = Path(args.batch_dir)
        audit_files = sorted(batch_dir.glob("**/*.audit.jsonl"))
    elif args.files:
        audit_files = [Path(f) for f in args.files]
    else:
        parser.print_help()
        sys.exit(1)

    if not audit_files:
        print("Aucun fichier .audit.jsonl trouvé.")
        sys.exit(1)

    results = []
    for af in audit_files:
        analysis = analyze_file(af)
        results.append(analysis)
        print_report(analysis)

    csv_path = Path(args.csv) if args.csv else None
    # An explicit --csv request is honoured even for a single file; without
    # it the batch summary is only useful when several files were analysed.
    if len(results) > 1 or csv_path is not None:
        batch_report(results, csv_path)


if __name__ == "__main__":
    main()