Intégration du modèle CamemBERT-bio-deid v3 (F1=0.96, Recall=0.97, 1112 docs)
et corrections qualité issues de l'audit approfondi sur 29 fichiers.
Détection des villes en texte libre :
- Automate Aho-Corasick sur 33K communes INSEE + 11.6K villes FINESS
- Stratégie contextuelle : exige un contexte géographique (à, de, vers,
habite, urgences de, etc.) sauf pour les villes composées (Saint-Palais)
- Blacklist de ~80 communes homonymes de mots courants (charge, signes, plan...)
- Normalisation SAINT↔ST pour les variantes orthographiques
- De 18 fuites de villes à 2 cas résiduels atypiques
Masquage des initiales de prénom :
- Post-traitement regex : "Dr T. [NOM]" → "Dr [NOM] [NOM]"
- Références initiales : "Ref : JF/VA" → "Ref : [NOM]/[NOM]"
Détection texte espacé d'en-tête :
- "C E N T R E H O S P I T A L I E R" → [ETABLISSEMENT]
Autres corrections :
- Fix regex RE_EXTRACT_MME_MR (Mr?.? → Mr.?, \s+ → [ \t]+, * → {0,4})
- Stop words médicaux : lever, coucher, services hospitaliers (viscérale, etc.)
- CamemBERT NER manager : version tracking, propriété version, log F1/Recall
- Script finetune : export ONNX automatique + mise à jour VERSION.json
- Évaluateur qualité : exclusion stop words médicaux des alertes INSEE
Documentation :
- Spécifications techniques CamemBERT-bio-deid v3
- Conformité RGPD + AI Act (caviardage PDF raster)
- AIPD (Analyse d'Impact Protection des Données)
Score qualité : 97.0/100 (Grade A), Leak score 100/100
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
655 lines
23 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Évaluation unifiée de la qualité d'anonymisation
|
|
=================================================
|
|
Produit un score reproductible en analysant les sorties d'anonymisation.
|
|
|
|
5 axes de vérification :
|
|
1. LEAK_AUDIT — Noms détectés (audit) encore présents dans le texte
|
|
2. LEAK_REGEX — Patterns PII (email, tel, NIR) non masqués
|
|
3. LEAK_INSEE — Mots ALL-CAPS qui sont des noms INSEE connus, non masqués
|
|
4. FP_DENSITY — Sur-masquage (densité de placeholders)
|
|
5. FP_MEDICAL — Termes médicaux masqués à tort
|
|
|
|
Produit un score global 0-100 et un rapport JSON pour suivi dans le temps.
|
|
|
|
Usage:
|
|
python scripts/evaluate_quality.py # audit_30
|
|
python scripts/evaluate_quality.py --dir /chemin/sortie # répertoire custom
|
|
python scripts/evaluate_quality.py --save # sauvegarder comme baseline
|
|
python scripts/evaluate_quality.py --compare # comparer avec baseline
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
import unicodedata
|
|
from collections import Counter, defaultdict
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, List, Set, Tuple
|
|
|
|
# === Default paths ===
# Project root = one level above the scripts/ directory holding this file.
PROJECT_DIR = Path(__file__).parent.parent
# Default corpus to evaluate when --dir is not given.
# NOTE(review): machine-specific absolute path — only valid on the original
# author's workstation; callers elsewhere must pass --dir explicitly.
DEFAULT_DIR = Path(
    "/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)"
    "/anonymise_audit_30"
)
# INSEE gazetteers: one name per line (family names / given names).
INSEE_NOMS = PROJECT_DIR / "data" / "insee" / "noms_famille_france.txt"
INSEE_PRENOMS = PROJECT_DIR / "data" / "insee" / "prenoms_france.txt"
# Reference scores read by --compare and written by --save.
BASELINE_PATH = PROJECT_DIR / "evaluation" / "baseline_scores.json"
|
|
|
|
# === PII regexes ===
# Email addresses (pragmatic RFC-lite pattern).
RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
# French phone numbers: "+33" or a leading 0, then 9 more digits with
# optional space/dot/dash separators; the lookarounds reject longer digit runs.
RE_TEL = re.compile(r"(?<!\d)(?:\+33\s?|0)\d(?:[ .\-]?\d){8}(?!\d)")
# French social-security number (NIR): sex digit [12], birth year, month
# (01-12, or 2A/2B for Corsican départements), département, commune,
# order number, control key.
RE_NIR = re.compile(
    r"\b[12]\s?\d{2}\s?(0[1-9]|1[0-2]|2[AB])\s?\d{2,3}\s?\d{3}\s?\d{3}\s?\d{2}\b"
)
# IBAN: 2-letter country code, 2 check digits, then digit groups.
# NOTE(review): digits-only body — IBANs whose BBAN contains letters (some
# countries) will not match; confirm a French-only scope is intended.
RE_IBAN = re.compile(r"\b[A-Z]{2}\d{2}[\s]?(?:\d{4}[\s]?){4,7}\d{1,4}\b")
# Anonymization placeholders such as [NOM], [DATE], [ETABLISSEMENT].
RE_PLACEHOLDER = re.compile(r"\[[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ_]+\]")
|
|
|
|
# Medical terms that must NOT be masked (known false positives).
# Each regex is run against the anonymized text; a hit means a placeholder
# most likely swallowed a medical term rather than a person's name.
MEDICAL_FP_PATTERNS = {
    # [NOM] on the SAME line followed by a medical term → probable false positive
    "ponction_lombaire": re.compile(r"\[NOM\][ \t]+lombaire", re.I),
    "hanche_context": re.compile(
        r"(?:de\s+la|de)\s+\[NOM\][ \t]+(?:profil|opérée|fémorale)", re.I
    ),
    # "IRM [NOM]" on the same line only (deliberately not cross-line)
    "IRM_NOM": re.compile(r"IRM[ \t]+\[NOM\](?![\s]*(?:médicale|cérébrale))", re.I),
    # [NOM] followed by stade/type/lymphome = false positive (masked pathology)
    "lymphome_context": re.compile(
        r"\[NOM\][ \t]*\.[ \t]*(?:stade|type|lymphome)", re.I
    ),
}
|
|
|
|
# Words ignored by the INSEE check (too ambiguous to flag as leaked names).
# All entries are uppercase and accent-free, matching the normalization
# applied before lookup (normalize_nfkd(...).upper()).
NAME_IGNORE = {
    "CENTRE", "SERVICE", "COMPTE", "RENDU", "LETTRE", "SORTIE",
    "CONSULTATION", "ANESTHESISTE", "BACTERIO", "OBSERVATION",
    "HOSPITALIER", "CLINIQUE", "HOPITAL", "PHARMACIE", "TABLES",
    "FINESS", "EMAIL", "ADRESSE", "EPISODE", "ETABLISSEMENT",
    "NAISSANCE", "POSTAL", "DOSSIER", "RPPS", "GLOBAL",
    "TRACKARE", "BIOLOGIE", "MEDICALE", "CHIRURGIE", "MEDECINE",
    "URGENCES", "ANALYSE", "RESULTATS", "DIAGNOSTIC", "ANTECEDENT",
    "TRAITEMENT", "INTERVENTION", "OPERATOIRE", "RAPPORT",
    "PATIENT", "MONSIEUR", "MADAME", "DOCTEUR",
    "NORMAL", "POSITIF", "NEGATIF", "PRESENT", "ABSENT",
    # Trackare care instructions (also INSEE surnames → evaluator false positives)
    "LEVER", "COUCHER", "MANGER", "MARCHER", "SORTIR", "POSE",
    "GAUCHE", "DROITE", "ANTERIEUR", "POSTERIEUR",
    # Month names. NOTE(review): 'MAI' is absent from this list — confirm
    # whether that omission is intentional.
    "JANVIER", "FEVRIER", "MARS", "AVRIL", "JUIN", "JUILLET",
    "AOUT", "SEPTEMBRE", "OCTOBRE", "NOVEMBRE", "DECEMBRE",
    # Frequent place names that also exist as surnames
    "FRANCE", "BAYONNE", "BORDEAUX", "PARIS", "TOULOUSE",
    "SAINT", "SAINTE",
}
|
|
|
|
# Titles/prefixes that appear inside NOM audit entries but are not PII.
# Case-sensitive exact matches against individual tokens (compare
# extract_name_tokens, which checks this set before NAME_IGNORE).
TITLE_PREFIXES = {
    "Dr", "DR", "Pr", "PR", "M", "Mme", "MME", "Mlle", "MLLE",
    "Docteur", "DOCTEUR", "Professeur", "PROFESSEUR",
    "Monsieur", "MONSIEUR", "Madame", "MADAME",
    "Nom", "NOM", "Prénom", "PRENOM", "PRÉNOM",
    "Date", "DATE", "Adresse", "ADRESSE",
    # Short French articles/prepositions that OCR or the NER sometimes
    # glues onto a captured name
    "Née", "NEE", "Le", "LE", "La", "LA", "De", "DE", "Du", "DU",
    "Des", "DES", "Les", "LES", "Au", "AU", "Aux", "AUX",
    "Et", "ET", "Ou", "OU", "En", "EN",
    "Ute",  # frequent OCR artifact
}
|
|
|
|
|
|
def normalize_nfkd(s: str) -> str:
    """Strip accents from *s*.

    Decomposes via NFD (despite the function's name, NFKD is not used)
    and drops every combining mark (Unicode category 'Mn').
    """
    kept = []
    for ch in unicodedata.normalize("NFD", s):
        if unicodedata.category(ch) != "Mn":
            kept.append(ch)
    return "".join(kept)
|
|
|
|
|
|
def _read_name_set(path: Path) -> Set[str]:
    """Read a one-name-per-line file into a normalized set.

    Names are uppercased with accents stripped; lines shorter than
    3 characters are skipped. A missing file yields an empty set.
    """
    names: Set[str] = set()
    if path.exists():
        for line in path.read_text(encoding="utf-8").splitlines():
            name = line.strip()
            if name and len(name) >= 3:
                names.add(normalize_nfkd(name).upper())
    return names


def load_insee_names() -> Tuple[Set[str], Set[str]]:
    """Load INSEE family names and given names.

    Returns:
        (noms, prenoms) — two sets of uppercase accent-free names; either
        set may be empty when its source file is absent.
    """
    # The two gazetteers share the exact same parsing rules, so the
    # previously duplicated loops are factored into _read_name_set.
    return _read_name_set(INSEE_NOMS), _read_name_set(INSEE_PRENOMS)
|
|
|
|
|
|
def extract_name_tokens(audit_entries: List[dict]) -> Set[str]:
    """Collect individual name tokens from NOM/PRENOM audit entries.

    Filters out titles (Dr, Pr, M., Mme, ...), tokens shorter than
    3 characters, tokens without an uppercase initial, and generic
    words from NAME_IGNORE.
    """
    tokens: Set[str] = set()
    for entry in audit_entries:
        entry_kind = entry.get("kind", "")
        if "NOM" not in entry_kind and "PRENOM" not in entry_kind:
            continue
        full_name = entry.get("original", "")
        if not full_name:
            continue
        # Split the full name on whitespace and hyphens into single tokens.
        for raw_token in re.split(r"[\s\-]+", full_name):
            candidate = raw_token.strip(".,;:()\"'")
            if (
                len(candidate) >= 3
                and candidate[0].isupper()
                # Drop titles/prefixes, then generic medical/admin words.
                and candidate not in TITLE_PREFIXES
                and normalize_nfkd(candidate).upper() not in NAME_IGNORE
            ):
                tokens.add(candidate)
    return tokens
|
|
|
|
|
|
def check_leak_audit(text: str, name_tokens: Set[str]) -> List[dict]:
    """Report audit-known name tokens still present in the anonymized text.

    Emits one entry per unique leaked token, with its total occurrence
    count and a short context excerpt around the first occurrence.
    """
    # Blank out placeholders first so tokens are never matched inside them.
    placeholder_re = re.compile(r"\[[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ_]+\]")
    scrubbed = placeholder_re.sub("___", text)

    leaks: List[dict] = []
    for token in name_tokens:
        # Whole-word, case-insensitive search for the token.
        word_re = re.compile(r"\b" + re.escape(token) + r"\b", re.IGNORECASE)
        hits = list(word_re.finditer(scrubbed))
        if not hits:
            continue
        # First hit supplies the ±30-character context window.
        first = hits[0]
        lo = max(0, first.start() - 30)
        hi = min(len(scrubbed), first.end() + 30)
        leaks.append({
            "type": "LEAK_AUDIT",
            "severity": "CRITIQUE",
            "token": token,
            "occurrences": len(hits),
            "context": scrubbed[lo:hi].strip(),
        })
    return leaks
|
|
|
|
|
|
def check_leak_regex(text: str) -> List[dict]:
    """Scan the anonymized text for unmasked PII patterns.

    Checks emails, French phone numbers, NIR (social-security numbers)
    and IBANs against the text with placeholders blanked out.

    Returns:
        One dict per raw match, all with severity "HAUTE".
    """
    leaks = []
    clean_text = RE_PLACEHOLDER.sub("___", text)

    for name, pattern in [
        ("EMAIL", RE_EMAIL),
        ("TEL", RE_TEL),
        ("NIR", RE_NIR),
        ("IBAN", RE_IBAN),
    ]:
        for m in pattern.finditer(clean_text):
            # Skip matches butting up against a placeholder remnant.
            # Window widened from 2 to 3 characters: with a 2-char window
            # the "___" test could never succeed ("___" is 3 chars long).
            before = clean_text[max(0, m.start() - 3):m.start()]
            if "[" in before or "___" in before:
                continue
            leaks.append({
                "type": "LEAK_REGEX",
                "severity": "HAUTE",
                "pii_type": name,
                "value": m.group(),
            })
    return leaks
|
|
|
|
|
|
def check_leak_insee(
    text: str,
    insee_noms: Set[str],
    insee_prenoms: Set[str],
    known_tokens: Set[str],
) -> List[dict]:
    """Flag unmasked ALL-CAPS words that match the INSEE name gazetteers.

    Each distinct word is reported at most once. Severity is "HAUTE" when
    a title (Dr, Mme, ...) immediately precedes the word, "MOYENNE"
    otherwise. Words already known from the audit are skipped.
    """
    leaks: List[dict] = []
    clean_text = RE_PLACEHOLDER.sub("___", text)
    seen: Set[str] = set()

    # Strong-context heuristic: an honorific right before the word.
    # (Hoisted out of the loop; behavior is identical.)
    honorific_re = re.compile(
        r"(?:Dr|Pr|M\.|Mme|Mlle|Docteur|Professeur|Monsieur|Madame)\s*$",
        re.I,
    )

    # ALL-CAPS words of 3+ characters (accented capitals included).
    for hit in re.finditer(r"\b([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{3,})\b", clean_text):
        word = hit.group(1)
        if word in seen:
            continue
        seen.add(word)

        # Skip known non-name words.
        normalized = normalize_nfkd(word).upper()
        if normalized in NAME_IGNORE:
            continue

        # Must be an INSEE name/first name AND not already a known token.
        is_nom = normalized in insee_noms
        is_prenom = normalized in insee_prenoms
        if not (is_nom or is_prenom) or word in known_tokens:
            continue

        pos = hit.start()
        before = clean_text[max(0, pos - 40):pos].strip()
        strong_ctx = bool(honorific_re.search(before))

        lo = max(0, pos - 30)
        hi = min(len(clean_text), hit.end() + 30)
        leaks.append({
            "type": "LEAK_INSEE",
            "severity": "HAUTE" if strong_ctx else "MOYENNE",
            "word": word,
            "is_nom": is_nom,
            "is_prenom": is_prenom,
            "strong_context": strong_ctx,
            "context": clean_text[lo:hi].strip(),
        })

    return leaks
|
|
|
|
|
|
def check_fp_medical(text: str) -> List[dict]:
    """List medical terms that were wrongly masked.

    Runs every known false-positive pattern over the text and reports each
    raw match (truncated to 80 characters).
    """
    return [
        {
            "type": "FP_MEDICAL",
            "pattern": pattern_name,
            "match": hit.group()[:80],
        }
        for pattern_name, rx in MEDICAL_FP_PATTERNS.items()
        for hit in rx.finditer(text)
    ]
|
|
|
|
|
|
def check_fp_density(text: str) -> dict:
    """Measure placeholder density and flag over-masking.

    The alert fires when [NOM] placeholders exceed 5% of all
    whitespace-separated words. Empty text yields all-zero metrics.
    """
    placeholder_re = re.compile(r"\[[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ_]+\]")
    words = text.split()
    total = len(words)
    if not words:
        return {"total_words": 0, "placeholders": 0, "density_pct": 0.0,
                "nom_count": 0, "nom_pct": 0.0, "alert": False}

    # A word "is" a placeholder when it starts with one (trailing
    # punctuation such as "[NOM]," still counts — match, not fullmatch).
    ph_count = sum(1 for w in words if placeholder_re.match(w))
    nom_count = text.count("[NOM]")
    nom_ratio = nom_count / total * 100

    return {
        "total_words": total,
        "placeholders": ph_count,
        "density_pct": round(ph_count / total * 100, 2),
        "nom_count": nom_count,
        "nom_pct": round(nom_ratio, 2),
        # Alert uses the unrounded ratio, like the reported percentages' source.
        "alert": nom_ratio > 5.0,
    }
|
|
|
|
|
|
def evaluate_file(
    audit_path: Path,
    txt_path: Path,
    insee_noms: Set[str],
    insee_prenoms: Set[str],
) -> dict:
    """Evaluate one (audit.jsonl, pseudonymise.txt) pair.

    Runs all leak and false-positive checks and returns a per-file result
    dict including raw findings plus a summary "counts" sub-dict.
    """
    # Load the JSONL audit trail, skipping blank lines.
    audit_entries = []
    for raw_line in audit_path.read_text(encoding="utf-8").splitlines():
        stripped = raw_line.strip()
        if stripped:
            audit_entries.append(json.loads(stripped))

    text = txt_path.read_text(encoding="utf-8")
    name_tokens = extract_name_tokens(audit_entries)

    # Run every check once.
    leak_audit = check_leak_audit(text, name_tokens)
    leak_regex = check_leak_regex(text)
    leak_insee = check_leak_insee(text, insee_noms, insee_prenoms, name_tokens)
    fp_medical = check_fp_medical(text)
    fp_density = check_fp_density(text)

    kind_counts = Counter(e.get("kind", "?") for e in audit_entries)
    insee_by_severity = Counter(leak["severity"] for leak in leak_insee)

    return {
        # Strip the ".pseudonymise" suffix left in the stem of "X.pseudonymise.txt".
        "file": txt_path.stem.replace(".pseudonymise", ""),
        "audit_hits": len(audit_entries),
        "audit_kinds": dict(kind_counts.most_common(10)),
        "name_tokens_known": len(name_tokens),
        "leak_audit": leak_audit,
        "leak_regex": leak_regex,
        "leak_insee": leak_insee,
        "fp_medical": fp_medical,
        "fp_density": fp_density,
        "counts": {
            "leak_audit": len(leak_audit),
            "leak_regex": len(leak_regex),
            "leak_insee_high": insee_by_severity.get("HAUTE", 0),
            "leak_insee_medium": insee_by_severity.get("MOYENNE", 0),
            "fp_medical": len(fp_medical),
            "fp_overmasking": 1 if fp_density.get("alert") else 0,
        },
    }
|
|
|
|
|
|
def compute_scores(results: List[dict]) -> dict:
    """Aggregate per-file results into global scores (0-100).

    leak_score: 100 means no audit-known name leaked; the rate of leaked
    unique names drives the base score, with extra penalties for regex
    PII hits (x2) and strong-context INSEE hits (x1). fp_score penalizes
    wrongly masked medical terms (x2) and over-masking alerts (x5).
    global_score is a 70/30 weighted mix — leaks weigh more than FPs.
    """
    def count_total(key: str) -> int:
        # Sum one counts[...] metric across all files.
        return sum(r["counts"][key] for r in results)

    total_name_tokens = sum(r["name_tokens_known"] for r in results)
    total_audit_hits = sum(r["audit_hits"] for r in results)
    # leak_audit counts UNIQUE leaked tokens; occurrences are tallied below.
    total_leak_audit = count_total("leak_audit")
    total_leak_regex = count_total("leak_regex")
    total_leak_insee_high = count_total("leak_insee_high")
    total_leak_insee_med = count_total("leak_insee_medium")
    total_fp_medical = count_total("fp_medical")
    total_fp_overmask = count_total("fp_overmasking")

    total_leak_occurrences = 0
    for r in results:
        for leak in r["leak_audit"]:
            total_leak_occurrences += leak.get("occurrences", 1)

    # Leak score (100 = no leak at all, 0 = catastrophic), proportional to
    # the total number of known names.
    if total_name_tokens > 0:
        leak_rate = total_leak_audit / total_name_tokens
        extra_penalty = total_leak_regex * 2 + total_leak_insee_high * 1
        leak_score = max(0, round(100 * (1 - leak_rate) - extra_penalty, 1))
    else:
        leak_score = 100 if total_leak_audit == 0 else 0

    # FP score (100 = no false positive, 0 = massive over-masking).
    fp_score = max(0, 100 - (total_fp_medical * 2 + total_fp_overmask * 5))

    global_score = round(leak_score * 0.7 + fp_score * 0.3, 1)

    return {
        "global_score": global_score,
        "leak_score": leak_score,
        "fp_score": fp_score,
        "totals": {
            "documents": len(results),
            "audit_hits": total_audit_hits,
            "name_tokens_known": total_name_tokens,
            "leak_audit": total_leak_audit,
            "leak_occurrences": total_leak_occurrences,
            "leak_regex": total_leak_regex,
            "leak_insee_high": total_leak_insee_high,
            "leak_insee_medium": total_leak_insee_med,
            "fp_medical": total_fp_medical,
            "fp_overmasking": total_fp_overmask,
        },
    }
|
|
|
|
|
|
def print_report(scores: dict, results: List[dict]) -> None:
    """Print the console report.

    Sections: global scores with a letter grade, leak (false-negative)
    summary, false-positive summary, then capped detail lists and the
    files that have critical problems. All user-facing strings are French.
    """
    t = scores["totals"]

    print(f"\n{'='*65}")
    print(f" ÉVALUATION QUALITÉ ANONYMISATION")
    print(f" {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print(f"{'='*65}")

    # Global score mapped to a letter grade (A+ >= 98 ... F < 60).
    gs = scores["global_score"]
    grade = (
        "A+" if gs >= 98 else "A" if gs >= 95 else "B" if gs >= 90
        else "C" if gs >= 80 else "D" if gs >= 60 else "F"
    )
    print(f"\n SCORE GLOBAL : {gs}/100 [{grade}]")
    print(f" Leak score : {scores['leak_score']}/100")
    print(f" FP score : {scores['fp_score']}/100")

    # Leak (false-negative) summary.
    print(f"\n --- FUITES (FAUX NÉGATIFS) ---")
    print(f" Documents analysés : {t['documents']}")
    print(f" Noms connus (audit) : {t['name_tokens_known']}")
    print(f" Fuites noms audit : {t['leak_audit']} noms uniques"
          f" ({t.get('leak_occurrences', '?')} occurrences)"
          f"{' CRITIQUE' if t['leak_audit'] > 0 else ' OK'}")
    print(f" Fuites regex (PII) : {t['leak_regex']}"
          f"{' HAUTE' if t['leak_regex'] > 0 else ' OK'}")
    print(f" Noms INSEE (contexte fort) : {t['leak_insee_high']}"
          f"{' HAUTE' if t['leak_insee_high'] > 0 else ' OK'}")
    print(f" Noms INSEE (contexte faible): {t['leak_insee_medium']}")

    # False-positive summary.
    print(f"\n --- FAUX POSITIFS ---")
    print(f" Termes médicaux masqués : {t['fp_medical']}")
    print(f" Alertes sur-masquage : {t['fp_overmasking']}")

    # Critical leak details: audit leaks, regex leaks, and only the
    # strong-context ("HAUTE") INSEE hits.
    all_leaks = []
    for r in results:
        for leak in r["leak_audit"]:
            all_leaks.append((r["file"], leak))
        for leak in r["leak_regex"]:
            all_leaks.append((r["file"], leak))
        for leak in r["leak_insee"]:
            if leak["severity"] == "HAUTE":
                all_leaks.append((r["file"], leak))

    if all_leaks:
        # Display capped at 30 entries.
        print(f"\n --- DÉTAIL FUITES ({len(all_leaks)}) ---")
        for fname, leak in all_leaks[:30]:
            sev = leak.get("severity", "?")
            if leak["type"] == "LEAK_AUDIT":
                print(f" [{sev}] {fname}: nom '{leak['token']}' "
                      f"encore présent")
                print(f" ...{leak['context']}...")
            elif leak["type"] == "LEAK_REGEX":
                print(f" [{sev}] {fname}: {leak['pii_type']} "
                      f"'{leak['value']}'")
            elif leak["type"] == "LEAK_INSEE":
                src = "nom" if leak["is_nom"] else "prénom"
                print(f" [{sev}] {fname}: '{leak['word']}' "
                      f"(INSEE {src}, non masqué)")
                print(f" ...{leak['context']}...")
        if len(all_leaks) > 30:
            print(f" ... et {len(all_leaks) - 30} autres")

    # False-positive details, capped at 15 entries.
    all_fps = []
    for r in results:
        for fp in r["fp_medical"]:
            all_fps.append((r["file"], fp))

    if all_fps:
        print(f"\n --- DÉTAIL FAUX POSITIFS ({len(all_fps)}) ---")
        for fname, fp in all_fps[:15]:
            print(f" {fname}: {fp['pattern']} → '{fp['match'][:60]}'")

    # Files with critical problems (audit or regex leaks only).
    problem_files = [
        r for r in results
        if r["counts"]["leak_audit"] > 0 or r["counts"]["leak_regex"] > 0
    ]
    if problem_files:
        print(f"\n --- FICHIERS PROBLÉMATIQUES ({len(problem_files)}) ---")
        for r in problem_files:
            c = r["counts"]
            print(f" {r['file']}: "
                  f"leak_audit={c['leak_audit']} "
                  f"leak_regex={c['leak_regex']}")

    print(f"\n{'='*65}\n")
|
|
|
|
|
|
def save_baseline(scores: dict, results: List[dict], path: Path) -> None:
    """Persist the current scores (plus per-file counts) as the baseline.

    Creates parent directories as needed and writes pretty-printed UTF-8
    JSON with a timestamp.
    """
    payload = {
        "date": datetime.now().isoformat(),
        "scores": scores,
        "per_file": {r["file"]: r["counts"] for r in results},
    }
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(
        json.dumps(payload, indent=2, ensure_ascii=False),
        encoding="utf-8",
    )
    print(f"Baseline sauvegardée : {path}")
|
|
|
|
|
|
def compare_baseline(scores: dict, baseline_path: Path) -> None:
    """Compare the current scores against the saved baseline.

    Prints an aligned table of score deltas (higher = better) followed by
    raw-count deltas (lower = better). A missing baseline just prints a
    hint and returns.
    """
    if not baseline_path.exists():
        print("Pas de baseline trouvée. Utilisez --save d'abord.")
        return

    baseline = json.loads(baseline_path.read_text(encoding="utf-8"))
    bs = baseline["scores"]

    print(f"\n --- COMPARAISON AVEC BASELINE ({baseline['date'][:10]}) ---")
    print(f" {'Métrique':<30} {'Baseline':>10} {'Actuel':>10} {'Delta':>10}")
    print(f" {'-'*62}")

    # Score rows: a positive delta is an improvement.
    for key in ["global_score", "leak_score", "fp_score"]:
        old = bs[key]
        new = scores[key]
        delta = new - old
        marker = " +" if delta > 0 else (" -" if delta < 0 else " ")
        print(f" {key:<30} {old:>10.1f} {new:>10.1f} {delta:>+10.1f}{marker}")

    # Raw-count rows: here a NEGATIVE delta is an improvement.
    for key in ["leak_audit", "leak_regex", "leak_insee_high", "fp_medical"]:
        old = bs["totals"].get(key, 0)
        new = scores["totals"].get(key, 0)
        delta = new - old
        better = delta < 0  # fewer leaks/FPs = better
        marker = " OK" if better else (" !!" if delta > 0 else "")
        print(f" {key:<30} {old:>10} {new:>10} {delta:>+10}{marker}")

    print()
|
|
|
|
|
|
def main():
    """CLI entry point: evaluate a directory of anonymized documents.

    Pairs each *.audit.jsonl with its *.pseudonymise.txt, runs all checks,
    prints the report, and optionally compares with / saves the baseline
    or exports JSON. Exits 1 when any audit-known name leaked (CI-friendly),
    0 otherwise.
    """
    parser = argparse.ArgumentParser(
        description="Évaluation qualité d'anonymisation"
    )
    parser.add_argument(
        "--dir", type=Path, default=DEFAULT_DIR,
        help="Répertoire contenant les fichiers anonymisés"
    )
    parser.add_argument(
        "--save", action="store_true",
        help="Sauvegarder les scores comme baseline"
    )
    parser.add_argument(
        "--compare", action="store_true",
        help="Comparer avec la baseline sauvegardée"
    )
    parser.add_argument(
        "--json", type=Path, default=None,
        help="Exporter le rapport complet en JSON"
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true",
        help="Afficher les détails par fichier"
    )
    args = parser.parse_args()

    output_dir = args.dir
    if not output_dir.exists():
        print(f"Répertoire non trouvé : {output_dir}")
        sys.exit(1)

    # Find the audit + text pairs; audit files without a matching
    # .pseudonymise.txt are silently skipped.
    audit_files = sorted(output_dir.glob("*.audit.jsonl"))
    if not audit_files:
        print(f"Aucun .audit.jsonl trouvé dans {output_dir}")
        sys.exit(1)

    pairs = []
    for af in audit_files:
        stem = af.name.replace(".audit.jsonl", "")
        txt = af.parent / f"{stem}.pseudonymise.txt"
        if txt.exists():
            pairs.append((af, txt))

    print(f"Chargement gazetteers INSEE...", end=" ", flush=True)
    insee_noms, insee_prenoms = load_insee_names()
    print(f"{len(insee_noms)} noms, {len(insee_prenoms)} prénoms")

    print(f"Analyse de {len(pairs)} documents...\n", flush=True)

    # Evaluate each file.
    results = []
    for af, txt in pairs:
        result = evaluate_file(af, txt, insee_noms, insee_prenoms)
        results.append(result)

        if args.verbose:
            c = result["counts"]
            status = "OK" if sum(c.values()) == 0 else "!!"
            print(f" [{status}] {result['file']}: "
                  f"leak_a={c['leak_audit']} "
                  f"leak_r={c['leak_regex']} "
                  f"leak_i={c['leak_insee_high']}+{c['leak_insee_medium']} "
                  f"fp_m={c['fp_medical']} "
                  f"fp_o={c['fp_overmasking']}")

    # Global scores.
    scores = compute_scores(results)

    # Console report.
    print_report(scores, results)

    # Baseline comparison (before a possible --save overwrites it).
    if args.compare:
        compare_baseline(scores, BASELINE_PATH)

    # Save as baseline.
    if args.save:
        save_baseline(scores, results, BASELINE_PATH)

    # Full JSON export.
    if args.json:
        report = {
            "date": datetime.now().isoformat(),
            "directory": str(output_dir),
            "scores": scores,
            "results": results,
        }
        args.json.write_text(
            json.dumps(report, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )
        print(f"Rapport JSON : {args.json}")

    # Exit code: non-zero when any audit-known name leaked.
    if scores["totals"]["leak_audit"] > 0:
        sys.exit(1)
    sys.exit(0)


if __name__ == "__main__":
    main()
|