Files
anonymisation/evaluation/leak_scanner.py
Domi31tls 340348b820 feat: Phase 1 - Système d'évaluation de la qualité
- Sélection et copie de 27 documents représentatifs (10 simples, 12 moyens, 5 complexes)
- Outil d'annotation CLI complet (tools/annotation_tool.py)
- Guide d'annotation détaillé (docs/annotation_guide.md)
- Évaluateur de qualité (evaluation/quality_evaluator.py)
  * Calcul Précision, Rappel, F1-Score
  * Identification faux positifs/négatifs
  * Métriques par type de PII
  * Export JSON et rapports texte
- Scanner de fuite (evaluation/leak_scanner.py)
  * Détection PII résiduels (CRITIQUE)
  * Détection nouveaux PII (HAUTE)
  * Scan métadonnées PDF (MOYENNE)
- Benchmark de performance (evaluation/benchmark.py)
  * Mesure temps de traitement
  * Mesure CPU/RAM
  * Export JSON/CSV
- Tests unitaires complets pour tous les composants
- Documentation complète du module d'évaluation

Tâches complétées:
- 1.1.1 Sélection de 27 documents (au lieu de 30)
- 1.1.2 Outil d'annotation CLI
- 1.2.1 Évaluateur de qualité
- 1.2.2 Scanner de fuite
- 1.2.3 Benchmark de performance

Prochaines étapes:
- 1.1.3 Annotation des 27 documents (manuel)
- 1.1.4 Enrichissement stopwords médicaux
- 1.3 Mesure de la baseline
2026-03-02 10:07:41 +01:00

310 lines
10 KiB
Python

#!/usr/bin/env python3
"""
Scanner de fuite de PII.
Vérifie qu'aucun PII ne subsiste dans les documents anonymisés.
"""
import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Optional
try:
import pymupdf as fitz
except ImportError:
import fitz
@dataclass
class LeakReport:
"""Rapport de fuite de PII."""
is_safe: bool = True
leak_count: int = 0
leaks: List[Dict] = field(default_factory=list)
severity_counts: Dict[str, int] = field(default_factory=dict)
def to_dict(self) -> Dict:
"""Convertit en dictionnaire."""
return {
"is_safe": self.is_safe,
"leak_count": self.leak_count,
"leaks": self.leaks,
"severity_counts": self.severity_counts
}
class LeakScanner:
"""Scanner de fuite de PII dans les documents anonymisés."""
# Regex pour détecter les PII
REGEX_PATTERNS = {
"EMAIL": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
"TEL": re.compile(r'(?<!\d)(?:\+33|0033|0)[1-9](?:[\s.\-]?\d){8}(?!\d)'),
"NIR": re.compile(r'\b[12]\s?\d{2}\s?\d{2}\s?\d{2}\s?\d{3}\s?\d{3}\s?\d{2}\b'),
"IBAN": re.compile(r'\b[A-Z]{2}\d{2}[\s]?(?:\d{4}[\s]?){4,7}\d{1,4}\b'),
"CODE_POSTAL": re.compile(r'\b\d{5}\b'),
"IPP": re.compile(r'\b\d{8,10}\b'),
}
def __init__(self):
"""Initialise le scanner."""
pass
def extract_text_from_pdf(self, pdf_path: Path) -> str:
"""
Extrait le texte d'un PDF.
Args:
pdf_path: Chemin vers le PDF
Returns:
Texte extrait
"""
try:
doc = fitz.open(pdf_path)
text = ""
for page in doc:
text += page.get_text()
doc.close()
return text
except Exception as e:
print(f"✗ Erreur lors de l'extraction du texte de {pdf_path}: {e}")
return ""
def load_original_pii(self, audit_path: Path) -> List[Dict]:
"""
Charge les PII originaux depuis l'audit.
Args:
audit_path: Chemin vers le fichier .audit.jsonl
Returns:
Liste des PII originaux
"""
if not audit_path.exists():
return []
try:
pii_list = []
with open(audit_path, 'r', encoding='utf-8') as f:
for line in f:
if line.strip():
pii = json.loads(line)
pii_list.append(pii)
return pii_list
except Exception as e:
print(f"✗ Erreur lors du chargement de l'audit {audit_path}: {e}")
return []
def scan_text(self, text: str, original_pii: List[Dict]) -> List[Dict]:
"""
Scanne le texte pour détecter les fuites de PII.
Args:
text: Texte à scanner
original_pii: Liste des PII originaux
Returns:
Liste des fuites détectées
"""
leaks = []
# 1. Vérifier que les PII originaux ne sont plus présents
for pii in original_pii:
original_text = pii.get("original", "")
if not original_text:
continue
# Recherche insensible à la casse
if re.search(re.escape(original_text), text, re.IGNORECASE):
leaks.append({
"type": "original_pii_present",
"severity": "CRITIQUE",
"pii_type": pii.get("kind", "UNKNOWN"),
"text": original_text,
"message": f"PII original encore présent: {original_text}"
})
# 2. Détecter de nouveaux PII non masqués
for pii_type, pattern in self.REGEX_PATTERNS.items():
matches = pattern.finditer(text)
for match in matches:
matched_text = match.group()
# Vérifier si ce PII était dans l'audit original
is_known = any(
pii.get("original", "").lower() == matched_text.lower()
for pii in original_pii
)
if not is_known:
leaks.append({
"type": "new_pii_detected",
"severity": "HAUTE",
"pii_type": pii_type,
"text": matched_text,
"message": f"Nouveau PII détecté: {pii_type} = {matched_text}"
})
return leaks
def scan_metadata(self, pdf_path: Path) -> List[Dict]:
"""
Scanne les métadonnées du PDF.
Args:
pdf_path: Chemin vers le PDF
Returns:
Liste des fuites dans les métadonnées
"""
leaks = []
try:
doc = fitz.open(pdf_path)
metadata = doc.metadata
doc.close()
# Champs à vérifier
suspicious_fields = ["author", "creator", "producer", "subject", "title"]
for field in suspicious_fields:
value = metadata.get(field, "")
if value and value.strip():
# Vérifier si le champ contient des PII potentiels
# (noms, emails, etc.)
if "@" in value:
leaks.append({
"type": "metadata_leak",
"severity": "MOYENNE",
"field": field,
"text": value,
"message": f"Métadonnée suspecte ({field}): {value}"
})
elif any(c.isalpha() for c in value):
# Contient des lettres (potentiellement un nom)
leaks.append({
"type": "metadata_leak",
"severity": "MOYENNE",
"field": field,
"text": value,
"message": f"Métadonnée suspecte ({field}): {value}"
})
except Exception as e:
print(f"✗ Erreur lors du scan des métadonnées de {pdf_path}: {e}")
return leaks
def scan(self, anonymized_pdf: Path, original_audit: Path) -> LeakReport:
"""
Scanne un document anonymisé pour détecter les fuites.
Args:
anonymized_pdf: Chemin vers le PDF anonymisé
original_audit: Chemin vers l'audit original
Returns:
Rapport de fuite
"""
# Extraire le texte
text = self.extract_text_from_pdf(anonymized_pdf)
# Charger les PII originaux
original_pii = self.load_original_pii(original_audit)
# Scanner le texte
text_leaks = self.scan_text(text, original_pii)
# Scanner les métadonnées
metadata_leaks = self.scan_metadata(anonymized_pdf)
# Combiner les fuites
all_leaks = text_leaks + metadata_leaks
# Compter par sévérité
severity_counts = {}
for leak in all_leaks:
severity = leak.get("severity", "UNKNOWN")
severity_counts[severity] = severity_counts.get(severity, 0) + 1
# Créer le rapport
report = LeakReport(
is_safe=len(all_leaks) == 0,
leak_count=len(all_leaks),
leaks=all_leaks,
severity_counts=severity_counts
)
return report
def generate_report(self, report: LeakReport, pdf_path: Path) -> str:
"""
Génère un rapport texte.
Args:
report: Rapport de fuite
pdf_path: Chemin du PDF
Returns:
Rapport texte
"""
lines = []
lines.append("=" * 80)
lines.append(f"RAPPORT DE FUITE - {pdf_path.name}")
lines.append("=" * 80)
lines.append("")
if report.is_safe:
lines.append("✓ DOCUMENT SÛR - Aucune fuite détectée")
else:
lines.append(f"✗ DOCUMENT NON SÛR - {report.leak_count} fuite(s) détectée(s)")
lines.append("")
# Par sévérité
lines.append("FUITES PAR SÉVÉRITÉ:")
for severity, count in sorted(report.severity_counts.items()):
lines.append(f" {severity}: {count}")
lines.append("")
# Détails des fuites
lines.append("DÉTAILS DES FUITES:")
for i, leak in enumerate(report.leaks, 1):
lines.append(f"\n [{i}] {leak['severity']} - {leak['type']}")
lines.append(f" Type PII: {leak.get('pii_type', 'N/A')}")
lines.append(f" Texte: {leak.get('text', 'N/A')}")
lines.append(f" Message: {leak.get('message', 'N/A')}")
lines.append("")
lines.append("=" * 80)
return "\n".join(lines)
def export_json(self, report: LeakReport, output_path: Path):
"""
Exporte le rapport en JSON.
Args:
report: Rapport de fuite
output_path: Chemin du fichier de sortie
"""
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(report.to_dict(), f, indent=2, ensure_ascii=False)
print(f"✓ Rapport exporté: {output_path}")
if __name__ == "__main__":
# Test basique
scanner = LeakScanner()
# Exemple d'utilisation
anonymized_pdf = Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.redacted.pdf")
original_audit = Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.audit.jsonl")
if anonymized_pdf.exists() and original_audit.exists():
report = scanner.scan(anonymized_pdf, original_audit)
print(scanner.generate_report(report, anonymized_pdf))