- Sélection et copie de 27 documents représentatifs (10 simples, 12 moyens, 5 complexes) - Outil d'annotation CLI complet (tools/annotation_tool.py) - Guide d'annotation détaillé (docs/annotation_guide.md) - Évaluateur de qualité (evaluation/quality_evaluator.py) * Calcul Précision, Rappel, F1-Score * Identification faux positifs/négatifs * Métriques par type de PII * Export JSON et rapports texte - Scanner de fuite (evaluation/leak_scanner.py) * Détection PII résiduels (CRITIQUE) * Détection nouveaux PII (HAUTE) * Scan métadonnées PDF (MOYENNE) - Benchmark de performance (evaluation/benchmark.py) * Mesure temps de traitement * Mesure CPU/RAM * Export JSON/CSV - Tests unitaires complets pour tous les composants - Documentation complète du module d'évaluation Tâches complétées: - 1.1.1 Sélection de 27 documents (au lieu de 30) - 1.1.2 Outil d'annotation CLI - 1.2.1 Évaluateur de qualité - 1.2.2 Scanner de fuite - 1.2.3 Benchmark de performance Prochaines étapes: - 1.1.3 Annotation des 27 documents (manuel) - 1.1.4 Enrichissement stopwords médicaux - 1.3 Mesure de la baseline
310 lines
10 KiB
Python
310 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Scanner de fuite de PII.
|
|
|
|
Vérifie qu'aucun PII ne subsiste dans les documents anonymisés.
|
|
"""
|
|
import json
|
|
import re
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import List, Dict, Optional
|
|
|
|
try:
|
|
import pymupdf as fitz
|
|
except ImportError:
|
|
import fitz
|
|
|
|
|
|
@dataclass
|
|
class LeakReport:
|
|
"""Rapport de fuite de PII."""
|
|
|
|
is_safe: bool = True
|
|
leak_count: int = 0
|
|
leaks: List[Dict] = field(default_factory=list)
|
|
severity_counts: Dict[str, int] = field(default_factory=dict)
|
|
|
|
def to_dict(self) -> Dict:
|
|
"""Convertit en dictionnaire."""
|
|
return {
|
|
"is_safe": self.is_safe,
|
|
"leak_count": self.leak_count,
|
|
"leaks": self.leaks,
|
|
"severity_counts": self.severity_counts
|
|
}
|
|
|
|
|
|
class LeakScanner:
|
|
"""Scanner de fuite de PII dans les documents anonymisés."""
|
|
|
|
# Regex pour détecter les PII
|
|
REGEX_PATTERNS = {
|
|
"EMAIL": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
|
|
"TEL": re.compile(r'(?<!\d)(?:\+33|0033|0)[1-9](?:[\s.\-]?\d){8}(?!\d)'),
|
|
"NIR": re.compile(r'\b[12]\s?\d{2}\s?\d{2}\s?\d{2}\s?\d{3}\s?\d{3}\s?\d{2}\b'),
|
|
"IBAN": re.compile(r'\b[A-Z]{2}\d{2}[\s]?(?:\d{4}[\s]?){4,7}\d{1,4}\b'),
|
|
"CODE_POSTAL": re.compile(r'\b\d{5}\b'),
|
|
"IPP": re.compile(r'\b\d{8,10}\b'),
|
|
}
|
|
|
|
def __init__(self):
|
|
"""Initialise le scanner."""
|
|
pass
|
|
|
|
def extract_text_from_pdf(self, pdf_path: Path) -> str:
|
|
"""
|
|
Extrait le texte d'un PDF.
|
|
|
|
Args:
|
|
pdf_path: Chemin vers le PDF
|
|
|
|
Returns:
|
|
Texte extrait
|
|
"""
|
|
try:
|
|
doc = fitz.open(pdf_path)
|
|
text = ""
|
|
for page in doc:
|
|
text += page.get_text()
|
|
doc.close()
|
|
return text
|
|
except Exception as e:
|
|
print(f"✗ Erreur lors de l'extraction du texte de {pdf_path}: {e}")
|
|
return ""
|
|
|
|
def load_original_pii(self, audit_path: Path) -> List[Dict]:
|
|
"""
|
|
Charge les PII originaux depuis l'audit.
|
|
|
|
Args:
|
|
audit_path: Chemin vers le fichier .audit.jsonl
|
|
|
|
Returns:
|
|
Liste des PII originaux
|
|
"""
|
|
if not audit_path.exists():
|
|
return []
|
|
|
|
try:
|
|
pii_list = []
|
|
with open(audit_path, 'r', encoding='utf-8') as f:
|
|
for line in f:
|
|
if line.strip():
|
|
pii = json.loads(line)
|
|
pii_list.append(pii)
|
|
return pii_list
|
|
except Exception as e:
|
|
print(f"✗ Erreur lors du chargement de l'audit {audit_path}: {e}")
|
|
return []
|
|
|
|
def scan_text(self, text: str, original_pii: List[Dict]) -> List[Dict]:
|
|
"""
|
|
Scanne le texte pour détecter les fuites de PII.
|
|
|
|
Args:
|
|
text: Texte à scanner
|
|
original_pii: Liste des PII originaux
|
|
|
|
Returns:
|
|
Liste des fuites détectées
|
|
"""
|
|
leaks = []
|
|
|
|
# 1. Vérifier que les PII originaux ne sont plus présents
|
|
for pii in original_pii:
|
|
original_text = pii.get("original", "")
|
|
if not original_text:
|
|
continue
|
|
|
|
# Recherche insensible à la casse
|
|
if re.search(re.escape(original_text), text, re.IGNORECASE):
|
|
leaks.append({
|
|
"type": "original_pii_present",
|
|
"severity": "CRITIQUE",
|
|
"pii_type": pii.get("kind", "UNKNOWN"),
|
|
"text": original_text,
|
|
"message": f"PII original encore présent: {original_text}"
|
|
})
|
|
|
|
# 2. Détecter de nouveaux PII non masqués
|
|
for pii_type, pattern in self.REGEX_PATTERNS.items():
|
|
matches = pattern.finditer(text)
|
|
for match in matches:
|
|
matched_text = match.group()
|
|
|
|
# Vérifier si ce PII était dans l'audit original
|
|
is_known = any(
|
|
pii.get("original", "").lower() == matched_text.lower()
|
|
for pii in original_pii
|
|
)
|
|
|
|
if not is_known:
|
|
leaks.append({
|
|
"type": "new_pii_detected",
|
|
"severity": "HAUTE",
|
|
"pii_type": pii_type,
|
|
"text": matched_text,
|
|
"message": f"Nouveau PII détecté: {pii_type} = {matched_text}"
|
|
})
|
|
|
|
return leaks
|
|
|
|
def scan_metadata(self, pdf_path: Path) -> List[Dict]:
|
|
"""
|
|
Scanne les métadonnées du PDF.
|
|
|
|
Args:
|
|
pdf_path: Chemin vers le PDF
|
|
|
|
Returns:
|
|
Liste des fuites dans les métadonnées
|
|
"""
|
|
leaks = []
|
|
|
|
try:
|
|
doc = fitz.open(pdf_path)
|
|
metadata = doc.metadata
|
|
doc.close()
|
|
|
|
# Champs à vérifier
|
|
suspicious_fields = ["author", "creator", "producer", "subject", "title"]
|
|
|
|
for field in suspicious_fields:
|
|
value = metadata.get(field, "")
|
|
if value and value.strip():
|
|
# Vérifier si le champ contient des PII potentiels
|
|
# (noms, emails, etc.)
|
|
if "@" in value:
|
|
leaks.append({
|
|
"type": "metadata_leak",
|
|
"severity": "MOYENNE",
|
|
"field": field,
|
|
"text": value,
|
|
"message": f"Métadonnée suspecte ({field}): {value}"
|
|
})
|
|
elif any(c.isalpha() for c in value):
|
|
# Contient des lettres (potentiellement un nom)
|
|
leaks.append({
|
|
"type": "metadata_leak",
|
|
"severity": "MOYENNE",
|
|
"field": field,
|
|
"text": value,
|
|
"message": f"Métadonnée suspecte ({field}): {value}"
|
|
})
|
|
|
|
except Exception as e:
|
|
print(f"✗ Erreur lors du scan des métadonnées de {pdf_path}: {e}")
|
|
|
|
return leaks
|
|
|
|
def scan(self, anonymized_pdf: Path, original_audit: Path) -> LeakReport:
|
|
"""
|
|
Scanne un document anonymisé pour détecter les fuites.
|
|
|
|
Args:
|
|
anonymized_pdf: Chemin vers le PDF anonymisé
|
|
original_audit: Chemin vers l'audit original
|
|
|
|
Returns:
|
|
Rapport de fuite
|
|
"""
|
|
# Extraire le texte
|
|
text = self.extract_text_from_pdf(anonymized_pdf)
|
|
|
|
# Charger les PII originaux
|
|
original_pii = self.load_original_pii(original_audit)
|
|
|
|
# Scanner le texte
|
|
text_leaks = self.scan_text(text, original_pii)
|
|
|
|
# Scanner les métadonnées
|
|
metadata_leaks = self.scan_metadata(anonymized_pdf)
|
|
|
|
# Combiner les fuites
|
|
all_leaks = text_leaks + metadata_leaks
|
|
|
|
# Compter par sévérité
|
|
severity_counts = {}
|
|
for leak in all_leaks:
|
|
severity = leak.get("severity", "UNKNOWN")
|
|
severity_counts[severity] = severity_counts.get(severity, 0) + 1
|
|
|
|
# Créer le rapport
|
|
report = LeakReport(
|
|
is_safe=len(all_leaks) == 0,
|
|
leak_count=len(all_leaks),
|
|
leaks=all_leaks,
|
|
severity_counts=severity_counts
|
|
)
|
|
|
|
return report
|
|
|
|
def generate_report(self, report: LeakReport, pdf_path: Path) -> str:
|
|
"""
|
|
Génère un rapport texte.
|
|
|
|
Args:
|
|
report: Rapport de fuite
|
|
pdf_path: Chemin du PDF
|
|
|
|
Returns:
|
|
Rapport texte
|
|
"""
|
|
lines = []
|
|
lines.append("=" * 80)
|
|
lines.append(f"RAPPORT DE FUITE - {pdf_path.name}")
|
|
lines.append("=" * 80)
|
|
lines.append("")
|
|
|
|
if report.is_safe:
|
|
lines.append("✓ DOCUMENT SÛR - Aucune fuite détectée")
|
|
else:
|
|
lines.append(f"✗ DOCUMENT NON SÛR - {report.leak_count} fuite(s) détectée(s)")
|
|
lines.append("")
|
|
|
|
# Par sévérité
|
|
lines.append("FUITES PAR SÉVÉRITÉ:")
|
|
for severity, count in sorted(report.severity_counts.items()):
|
|
lines.append(f" {severity}: {count}")
|
|
lines.append("")
|
|
|
|
# Détails des fuites
|
|
lines.append("DÉTAILS DES FUITES:")
|
|
for i, leak in enumerate(report.leaks, 1):
|
|
lines.append(f"\n [{i}] {leak['severity']} - {leak['type']}")
|
|
lines.append(f" Type PII: {leak.get('pii_type', 'N/A')}")
|
|
lines.append(f" Texte: {leak.get('text', 'N/A')}")
|
|
lines.append(f" Message: {leak.get('message', 'N/A')}")
|
|
|
|
lines.append("")
|
|
lines.append("=" * 80)
|
|
|
|
return "\n".join(lines)
|
|
|
|
def export_json(self, report: LeakReport, output_path: Path):
|
|
"""
|
|
Exporte le rapport en JSON.
|
|
|
|
Args:
|
|
report: Rapport de fuite
|
|
output_path: Chemin du fichier de sortie
|
|
"""
|
|
with open(output_path, 'w', encoding='utf-8') as f:
|
|
json.dump(report.to_dict(), f, indent=2, ensure_ascii=False)
|
|
|
|
print(f"✓ Rapport exporté: {output_path}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
# Test basique
|
|
scanner = LeakScanner()
|
|
|
|
# Exemple d'utilisation
|
|
anonymized_pdf = Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.redacted.pdf")
|
|
original_audit = Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.audit.jsonl")
|
|
|
|
if anonymized_pdf.exists() and original_audit.exists():
|
|
report = scanner.scan(anonymized_pdf, original_audit)
|
|
print(scanner.generate_report(report, anonymized_pdf))
|