#!/usr/bin/env python3 """ Évaluation unifiée de la qualité d'anonymisation ================================================= Produit un score reproductible en analysant les sorties d'anonymisation. 5 axes de vérification : 1. LEAK_AUDIT — Noms détectés (audit) encore présents dans le texte 2. LEAK_REGEX — Patterns PII (email, tel, NIR) non masqués 3. LEAK_INSEE — Mots ALL-CAPS qui sont des noms INSEE connus, non masqués 4. FP_DENSITY — Sur-masquage (densité de placeholders) 5. FP_MEDICAL — Termes médicaux masqués à tort Produit un score global 0-100 et un rapport JSON pour suivi dans le temps. Usage: python scripts/evaluate_quality.py # audit_30 python scripts/evaluate_quality.py --dir /chemin/sortie # répertoire custom python scripts/evaluate_quality.py --save # sauvegarder comme baseline python scripts/evaluate_quality.py --compare # comparer avec baseline """ from __future__ import annotations import argparse import json import re import sys import unicodedata from collections import Counter, defaultdict from datetime import datetime from pathlib import Path from typing import Dict, List, Set, Tuple # === Chemins par défaut === PROJECT_DIR = Path(__file__).parent.parent DEFAULT_DIR = Path( "/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)" "/anonymise_audit_30" ) INSEE_NOMS = PROJECT_DIR / "data" / "insee" / "noms_famille_france.txt" INSEE_PRENOMS = PROJECT_DIR / "data" / "insee" / "prenoms_france.txt" BASELINE_PATH = PROJECT_DIR / "evaluation" / "baseline_scores.json" # === Regex PII === RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}") RE_TEL = re.compile(r"(? str: """Supprime les accents.""" return "".join( c for c in unicodedata.normalize("NFD", s) if unicodedata.category(c) != "Mn" ) def load_insee_names() -> Tuple[Set[str], Set[str]]: """Charge les noms et prénoms INSEE (normalisés uppercase sans accents).""" noms = set() prenoms = set() if INSEE_NOMS.exists(): for line in INSEE_NOMS.read_text(encoding="utf-8").splitlines(): name = line.strip() if name and len(name) >= 3: noms.add(normalize_nfkd(name).upper()) if INSEE_PRENOMS.exists(): for line in INSEE_PRENOMS.read_text(encoding="utf-8").splitlines(): name = line.strip() if name and len(name) >= 3: prenoms.add(normalize_nfkd(name).upper()) return noms, prenoms def extract_name_tokens(audit_entries: List[dict]) -> Set[str]: """Extrait les tokens de noms individuels depuis les entrées audit NOM. Filtre les titres (Dr, Pr, M., Mme...) et tokens trop courts/génériques. """ tokens = set() for entry in audit_entries: kind = entry.get("kind", "") if "NOM" not in kind and "PRENOM" not in kind: continue original = entry.get("original", "") if not original: continue # Découper le nom complet en tokens individuels for token in re.split(r"[\s\-]+", original): clean = token.strip(".,;:()\"'") if len(clean) < 3: continue if not clean[0].isupper(): continue # Exclure titres et préfixes if clean in TITLE_PREFIXES: continue # Exclure mots génériques if normalize_nfkd(clean).upper() in NAME_IGNORE: continue tokens.add(clean) return tokens def check_leak_audit(text: str, name_tokens: Set[str]) -> List[dict]: """Vérifie si des noms de l'audit sont encore dans le texte. Retourne une entrée par token unique trouvé (avec le nombre d'occurrences). """ leaks = [] # Retirer les placeholders du texte pour ne pas matcher dedans clean_text = RE_PLACEHOLDER.sub("___", text) for token in name_tokens: # Chercher le token comme mot entier (insensible à la casse) pattern = re.compile(r"\b" + re.escape(token) + r"\b", re.IGNORECASE) matches = list(pattern.finditer(clean_text)) if matches: # Premier match pour le contexte m = matches[0] context_start = max(0, m.start() - 30) context_end = min(len(clean_text), m.end() + 30) context = clean_text[context_start:context_end].strip() leaks.append({ "type": "LEAK_AUDIT", "severity": "CRITIQUE", "token": token, "occurrences": len(matches), "context": context, }) return leaks def check_leak_regex(text: str) -> List[dict]: """Cherche des patterns PII non masqués dans le texte.""" leaks = [] clean_text = RE_PLACEHOLDER.sub("___", text) for name, pattern in [ ("EMAIL", RE_EMAIL), ("TEL", RE_TEL), ("NIR", RE_NIR), ("IBAN", RE_IBAN), ]: for m in pattern.finditer(clean_text): # Ignorer si dans un contexte de placeholder before = clean_text[max(0, m.start() - 2):m.start()] if "[" in before or "___" in before: continue leaks.append({ "type": "LEAK_REGEX", "severity": "HAUTE", "pii_type": name, "value": m.group(), }) return leaks def check_leak_insee( text: str, insee_noms: Set[str], insee_prenoms: Set[str], known_tokens: Set[str], ) -> List[dict]: """Cherche des mots ALL-CAPS qui sont des noms INSEE non masqués.""" leaks = [] clean_text = RE_PLACEHOLDER.sub("___", text) seen = set() # Mots ALL-CAPS de 3+ caractères for m in re.finditer(r"\b([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{3,})\b", clean_text): word = m.group(1) if word in seen: continue seen.add(word) # Ignorer mots connus non-noms normalized = normalize_nfkd(word).upper() if normalized in NAME_IGNORE: continue # Vérifier si c'est un nom INSEE ET pas déjà dans les tokens connus is_nom = normalized in insee_noms is_prenom = normalized in insee_prenoms if (is_nom or is_prenom) and word not in known_tokens: # Vérifier le contexte — indicateurs que c'est un vrai nom pos = m.start() before = clean_text[max(0, pos - 40):pos].strip() # Heuristiques de contexte fort (Dr, M., Mme, etc.) strong_ctx = bool(re.search( r"(?:Dr|Pr|M\.|Mme|Mlle|Docteur|Professeur|Monsieur|Madame)\s*$", before, re.I )) context_start = max(0, pos - 30) context_end = min(len(clean_text), m.end() + 30) context = clean_text[context_start:context_end].strip() leaks.append({ "type": "LEAK_INSEE", "severity": "HAUTE" if strong_ctx else "MOYENNE", "word": word, "is_nom": is_nom, "is_prenom": is_prenom, "strong_context": strong_ctx, "context": context, }) return leaks def check_fp_medical(text: str) -> List[dict]: """Détecte les termes médicaux masqués à tort.""" fps = [] for name, pattern in MEDICAL_FP_PATTERNS.items(): for m in pattern.finditer(text): fps.append({ "type": "FP_MEDICAL", "pattern": name, "match": m.group()[:80], }) return fps def check_fp_density(text: str) -> dict: """Calcule la densité de placeholders et détecte le sur-masquage.""" words = text.split() total = len(words) if total == 0: return {"total_words": 0, "placeholders": 0, "density_pct": 0.0, "nom_count": 0, "nom_pct": 0.0, "alert": False} ph_count = sum(1 for w in words if RE_PLACEHOLDER.match(w)) nom_count = text.count("[NOM]") density = ph_count / total * 100 nom_pct = nom_count / total * 100 return { "total_words": total, "placeholders": ph_count, "density_pct": round(density, 2), "nom_count": nom_count, "nom_pct": round(nom_pct, 2), "alert": nom_pct > 8.0, # seuil relevé : CRO/CRH courts listent 8-10 soignants = légitime } def evaluate_file( audit_path: Path, txt_path: Path, insee_noms: Set[str], insee_prenoms: Set[str], ) -> dict: """Évalue un couple audit.jsonl + pseudonymise.txt.""" # Charger les données audit_entries = [] with audit_path.open("r", encoding="utf-8") as f: for line in f: line = line.strip() if line: audit_entries.append(json.loads(line)) text = txt_path.read_text(encoding="utf-8") name_tokens = extract_name_tokens(audit_entries) # Vérifications leak_audit = check_leak_audit(text, name_tokens) leak_regex = check_leak_regex(text) leak_insee = check_leak_insee(text, insee_noms, insee_prenoms, name_tokens) fp_medical = check_fp_medical(text) fp_density = check_fp_density(text) # Comptages audit_kinds = Counter(e.get("kind", "?") for e in audit_entries) return { "file": txt_path.stem.replace(".pseudonymise", ""), "audit_hits": len(audit_entries), "audit_kinds": dict(audit_kinds.most_common(10)), "name_tokens_known": len(name_tokens), "leak_audit": leak_audit, "leak_regex": leak_regex, "leak_insee": leak_insee, "fp_medical": fp_medical, "fp_density": fp_density, "counts": { "leak_audit": len(leak_audit), "leak_regex": len(leak_regex), "leak_insee_high": sum( 1 for l in leak_insee if l["severity"] == "HAUTE" ), "leak_insee_medium": sum( 1 for l in leak_insee if l["severity"] == "MOYENNE" ), "fp_medical": len(fp_medical), "fp_overmasking": 1 if fp_density.get("alert") else 0, }, } def compute_scores(results: List[dict]) -> dict: """Calcule les scores globaux.""" total_name_tokens = sum(r["name_tokens_known"] for r in results) # leak_audit = nombre de tokens UNIQUES qui fuient total_leak_audit = sum(r["counts"]["leak_audit"] for r in results) total_leak_occurrences = sum( sum(l.get("occurrences", 1) for l in r["leak_audit"]) for r in results ) total_leak_regex = sum(r["counts"]["leak_regex"] for r in results) total_leak_insee_high = sum(r["counts"]["leak_insee_high"] for r in results) total_leak_insee_med = sum(r["counts"]["leak_insee_medium"] for r in results) total_fp_medical = sum(r["counts"]["fp_medical"] for r in results) total_fp_overmask = sum(r["counts"]["fp_overmasking"] for r in results) total_audit_hits = sum(r["audit_hits"] for r in results) # Score leak (100 = aucune fuite, 0 = catastrophique) # Proportionnel au nombre total de noms connus if total_name_tokens > 0: # Taux de fuite = noms uniques qui fuient / total noms connus leak_rate = total_leak_audit / total_name_tokens # Pénalité additionnelle pour regex et INSEE (contexte fort) extra_penalty = (total_leak_regex * 2 + total_leak_insee_high * 1) leak_score = max(0, round(100 * (1 - leak_rate) - extra_penalty, 1)) else: leak_score = 100 if total_leak_audit == 0 else 0 # Score FP (100 = aucun faux positif, 0 = sur-masquage massif) fp_penalty = total_fp_medical * 2 + total_fp_overmask * 5 fp_score = max(0, 100 - fp_penalty) # Score global pondéré (leak plus important que FP) global_score = round(leak_score * 0.7 + fp_score * 0.3, 1) return { "global_score": global_score, "leak_score": leak_score, "fp_score": fp_score, "totals": { "documents": len(results), "audit_hits": total_audit_hits, "name_tokens_known": total_name_tokens, "leak_audit": total_leak_audit, "leak_occurrences": total_leak_occurrences, "leak_regex": total_leak_regex, "leak_insee_high": total_leak_insee_high, "leak_insee_medium": total_leak_insee_med, "fp_medical": total_fp_medical, "fp_overmasking": total_fp_overmask, }, } def print_report(scores: dict, results: List[dict]) -> None: """Affiche le rapport console.""" t = scores["totals"] print(f"\n{'='*65}") print(f" ÉVALUATION QUALITÉ ANONYMISATION") print(f" {datetime.now().strftime('%Y-%m-%d %H:%M')}") print(f"{'='*65}") # Score global gs = scores["global_score"] grade = ( "A+" if gs >= 98 else "A" if gs >= 95 else "B" if gs >= 90 else "C" if gs >= 80 else "D" if gs >= 60 else "F" ) print(f"\n SCORE GLOBAL : {gs}/100 [{grade}]") print(f" Leak score : {scores['leak_score']}/100") print(f" FP score : {scores['fp_score']}/100") # Résumé des fuites print(f"\n --- FUITES (FAUX NÉGATIFS) ---") print(f" Documents analysés : {t['documents']}") print(f" Noms connus (audit) : {t['name_tokens_known']}") print(f" Fuites noms audit : {t['leak_audit']} noms uniques" f" ({t.get('leak_occurrences', '?')} occurrences)" f"{' CRITIQUE' if t['leak_audit'] > 0 else ' OK'}") print(f" Fuites regex (PII) : {t['leak_regex']}" f"{' HAUTE' if t['leak_regex'] > 0 else ' OK'}") print(f" Noms INSEE (contexte fort) : {t['leak_insee_high']}" f"{' HAUTE' if t['leak_insee_high'] > 0 else ' OK'}") print(f" Noms INSEE (contexte faible): {t['leak_insee_medium']}") # Résumé FP print(f"\n --- FAUX POSITIFS ---") print(f" Termes médicaux masqués : {t['fp_medical']}") print(f" Alertes sur-masquage : {t['fp_overmasking']}") # Détail des fuites critiques all_leaks = [] for r in results: for leak in r["leak_audit"]: all_leaks.append((r["file"], leak)) for leak in r["leak_regex"]: all_leaks.append((r["file"], leak)) for leak in r["leak_insee"]: if leak["severity"] == "HAUTE": all_leaks.append((r["file"], leak)) if all_leaks: print(f"\n --- DÉTAIL FUITES ({len(all_leaks)}) ---") for fname, leak in all_leaks[:30]: sev = leak.get("severity", "?") if leak["type"] == "LEAK_AUDIT": print(f" [{sev}] {fname}: nom '{leak['token']}' " f"encore présent") print(f" ...{leak['context']}...") elif leak["type"] == "LEAK_REGEX": print(f" [{sev}] {fname}: {leak['pii_type']} " f"'{leak['value']}'") elif leak["type"] == "LEAK_INSEE": src = "nom" if leak["is_nom"] else "prénom" print(f" [{sev}] {fname}: '{leak['word']}' " f"(INSEE {src}, non masqué)") print(f" ...{leak['context']}...") if len(all_leaks) > 30: print(f" ... et {len(all_leaks) - 30} autres") # Détail FP all_fps = [] for r in results: for fp in r["fp_medical"]: all_fps.append((r["file"], fp)) if all_fps: print(f"\n --- DÉTAIL FAUX POSITIFS ({len(all_fps)}) ---") for fname, fp in all_fps[:15]: print(f" {fname}: {fp['pattern']} → '{fp['match'][:60]}'") # Fichiers avec problèmes problem_files = [ r for r in results if r["counts"]["leak_audit"] > 0 or r["counts"]["leak_regex"] > 0 ] if problem_files: print(f"\n --- FICHIERS PROBLÉMATIQUES ({len(problem_files)}) ---") for r in problem_files: c = r["counts"] print(f" {r['file']}: " f"leak_audit={c['leak_audit']} " f"leak_regex={c['leak_regex']}") print(f"\n{'='*65}\n") def save_baseline(scores: dict, results: List[dict], path: Path) -> None: """Sauvegarde les scores comme baseline.""" path.parent.mkdir(parents=True, exist_ok=True) data = { "date": datetime.now().isoformat(), "scores": scores, "per_file": { r["file"]: r["counts"] for r in results }, } path.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") print(f"Baseline sauvegardée : {path}") def compare_baseline(scores: dict, baseline_path: Path) -> None: """Compare les scores actuels avec la baseline.""" if not baseline_path.exists(): print("Pas de baseline trouvée. Utilisez --save d'abord.") return baseline = json.loads(baseline_path.read_text(encoding="utf-8")) bs = baseline["scores"] print(f"\n --- COMPARAISON AVEC BASELINE ({baseline['date'][:10]}) ---") print(f" {'Métrique':<30} {'Baseline':>10} {'Actuel':>10} {'Delta':>10}") print(f" {'-'*62}") for key in ["global_score", "leak_score", "fp_score"]: old = bs[key] new = scores[key] delta = new - old marker = " +" if delta > 0 else (" -" if delta < 0 else " ") print(f" {key:<30} {old:>10.1f} {new:>10.1f} {delta:>+10.1f}{marker}") # Comparer les totaux for key in ["leak_audit", "leak_regex", "leak_insee_high", "fp_medical"]: old = bs["totals"].get(key, 0) new = scores["totals"].get(key, 0) delta = new - old better = delta < 0 # moins de fuites/FP = mieux marker = " OK" if better else (" !!" if delta > 0 else "") print(f" {key:<30} {old:>10} {new:>10} {delta:>+10}{marker}") print() def main(): parser = argparse.ArgumentParser( description="Évaluation qualité d'anonymisation" ) parser.add_argument( "--dir", type=Path, default=DEFAULT_DIR, help="Répertoire contenant les fichiers anonymisés" ) parser.add_argument( "--save", action="store_true", help="Sauvegarder les scores comme baseline" ) parser.add_argument( "--compare", action="store_true", help="Comparer avec la baseline sauvegardée" ) parser.add_argument( "--json", type=Path, default=None, help="Exporter le rapport complet en JSON" ) parser.add_argument( "--verbose", "-v", action="store_true", help="Afficher les détails par fichier" ) args = parser.parse_args() output_dir = args.dir if not output_dir.exists(): print(f"Répertoire non trouvé : {output_dir}") sys.exit(1) # Trouver les paires audit + texte audit_files = sorted(output_dir.glob("*.audit.jsonl")) if not audit_files: print(f"Aucun .audit.jsonl trouvé dans {output_dir}") sys.exit(1) pairs = [] for af in audit_files: stem = af.name.replace(".audit.jsonl", "") txt = af.parent / f"{stem}.pseudonymise.txt" if txt.exists(): pairs.append((af, txt)) print(f"Chargement gazetteers INSEE...", end=" ", flush=True) insee_noms, insee_prenoms = load_insee_names() print(f"{len(insee_noms)} noms, {len(insee_prenoms)} prénoms") print(f"Analyse de {len(pairs)} documents...\n", flush=True) # Évaluer chaque fichier results = [] for af, txt in pairs: result = evaluate_file(af, txt, insee_noms, insee_prenoms) results.append(result) if args.verbose: c = result["counts"] status = "OK" if sum(c.values()) == 0 else "!!" print(f" [{status}] {result['file']}: " f"leak_a={c['leak_audit']} " f"leak_r={c['leak_regex']} " f"leak_i={c['leak_insee_high']}+{c['leak_insee_medium']} " f"fp_m={c['fp_medical']} " f"fp_o={c['fp_overmasking']}") # Scores globaux scores = compute_scores(results) # Rapport console print_report(scores, results) # Comparaison baseline if args.compare: compare_baseline(scores, BASELINE_PATH) # Sauvegarde baseline if args.save: save_baseline(scores, results, BASELINE_PATH) # Export JSON if args.json: report = { "date": datetime.now().isoformat(), "directory": str(output_dir), "scores": scores, "results": results, } args.json.write_text( json.dumps(report, indent=2, ensure_ascii=False), encoding="utf-8", ) print(f"Rapport JSON : {args.json}") # Exit code if scores["totals"]["leak_audit"] > 0: sys.exit(1) sys.exit(0) if __name__ == "__main__": main()