#!/usr/bin/env python3 """ Scanner de fuite de PII. Vérifie qu'aucun PII ne subsiste dans les documents anonymisés. """ import json import re from dataclasses import dataclass, field from pathlib import Path from typing import List, Dict, Optional try: import pymupdf as fitz except ImportError: import fitz @dataclass class LeakReport: """Rapport de fuite de PII.""" is_safe: bool = True leak_count: int = 0 leaks: List[Dict] = field(default_factory=list) severity_counts: Dict[str, int] = field(default_factory=dict) def to_dict(self) -> Dict: """Convertit en dictionnaire.""" return { "is_safe": self.is_safe, "leak_count": self.leak_count, "leaks": self.leaks, "severity_counts": self.severity_counts } class LeakScanner: """Scanner de fuite de PII dans les documents anonymisés.""" # Regex pour détecter les PII REGEX_PATTERNS = { "EMAIL": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'), "TEL": re.compile(r'(? str: """ Extrait le texte d'un PDF. Args: pdf_path: Chemin vers le PDF Returns: Texte extrait """ try: doc = fitz.open(pdf_path) text = "" for page in doc: text += page.get_text() doc.close() return text except Exception as e: print(f"✗ Erreur lors de l'extraction du texte de {pdf_path}: {e}") return "" def load_original_pii(self, audit_path: Path) -> List[Dict]: """ Charge les PII originaux depuis l'audit. Args: audit_path: Chemin vers le fichier .audit.jsonl Returns: Liste des PII originaux """ if not audit_path.exists(): return [] try: pii_list = [] with open(audit_path, 'r', encoding='utf-8') as f: for line in f: if line.strip(): pii = json.loads(line) pii_list.append(pii) return pii_list except Exception as e: print(f"✗ Erreur lors du chargement de l'audit {audit_path}: {e}") return [] def scan_text(self, text: str, original_pii: List[Dict]) -> List[Dict]: """ Scanne le texte pour détecter les fuites de PII. Args: text: Texte à scanner original_pii: Liste des PII originaux Returns: Liste des fuites détectées """ leaks = [] # 1. Vérifier que les PII originaux ne sont plus présents for pii in original_pii: original_text = pii.get("original", "") if not original_text: continue # Recherche insensible à la casse if re.search(re.escape(original_text), text, re.IGNORECASE): leaks.append({ "type": "original_pii_present", "severity": "CRITIQUE", "pii_type": pii.get("kind", "UNKNOWN"), "text": original_text, "message": f"PII original encore présent: {original_text}" }) # 2. Détecter de nouveaux PII non masqués for pii_type, pattern in self.REGEX_PATTERNS.items(): matches = pattern.finditer(text) for match in matches: matched_text = match.group() # Vérifier si ce PII était dans l'audit original is_known = any( pii.get("original", "").lower() == matched_text.lower() for pii in original_pii ) if not is_known: leaks.append({ "type": "new_pii_detected", "severity": "HAUTE", "pii_type": pii_type, "text": matched_text, "message": f"Nouveau PII détecté: {pii_type} = {matched_text}" }) return leaks def scan_metadata(self, pdf_path: Path) -> List[Dict]: """ Scanne les métadonnées du PDF. Args: pdf_path: Chemin vers le PDF Returns: Liste des fuites dans les métadonnées """ leaks = [] try: doc = fitz.open(pdf_path) metadata = doc.metadata doc.close() # Champs à vérifier suspicious_fields = ["author", "creator", "producer", "subject", "title"] for field in suspicious_fields: value = metadata.get(field, "") if value and value.strip(): # Vérifier si le champ contient des PII potentiels # (noms, emails, etc.) if "@" in value: leaks.append({ "type": "metadata_leak", "severity": "MOYENNE", "field": field, "text": value, "message": f"Métadonnée suspecte ({field}): {value}" }) elif any(c.isalpha() for c in value): # Contient des lettres (potentiellement un nom) leaks.append({ "type": "metadata_leak", "severity": "MOYENNE", "field": field, "text": value, "message": f"Métadonnée suspecte ({field}): {value}" }) except Exception as e: print(f"✗ Erreur lors du scan des métadonnées de {pdf_path}: {e}") return leaks def scan(self, anonymized_pdf: Path, original_audit: Path) -> LeakReport: """ Scanne un document anonymisé pour détecter les fuites. Args: anonymized_pdf: Chemin vers le PDF anonymisé original_audit: Chemin vers l'audit original Returns: Rapport de fuite """ # Extraire le texte text = self.extract_text_from_pdf(anonymized_pdf) # Charger les PII originaux original_pii = self.load_original_pii(original_audit) # Scanner le texte text_leaks = self.scan_text(text, original_pii) # Scanner les métadonnées metadata_leaks = self.scan_metadata(anonymized_pdf) # Combiner les fuites all_leaks = text_leaks + metadata_leaks # Compter par sévérité severity_counts = {} for leak in all_leaks: severity = leak.get("severity", "UNKNOWN") severity_counts[severity] = severity_counts.get(severity, 0) + 1 # Créer le rapport report = LeakReport( is_safe=len(all_leaks) == 0, leak_count=len(all_leaks), leaks=all_leaks, severity_counts=severity_counts ) return report def generate_report(self, report: LeakReport, pdf_path: Path) -> str: """ Génère un rapport texte. Args: report: Rapport de fuite pdf_path: Chemin du PDF Returns: Rapport texte """ lines = [] lines.append("=" * 80) lines.append(f"RAPPORT DE FUITE - {pdf_path.name}") lines.append("=" * 80) lines.append("") if report.is_safe: lines.append("✓ DOCUMENT SÛR - Aucune fuite détectée") else: lines.append(f"✗ DOCUMENT NON SÛR - {report.leak_count} fuite(s) détectée(s)") lines.append("") # Par sévérité lines.append("FUITES PAR SÉVÉRITÉ:") for severity, count in sorted(report.severity_counts.items()): lines.append(f" {severity}: {count}") lines.append("") # Détails des fuites lines.append("DÉTAILS DES FUITES:") for i, leak in enumerate(report.leaks, 1): lines.append(f"\n [{i}] {leak['severity']} - {leak['type']}") lines.append(f" Type PII: {leak.get('pii_type', 'N/A')}") lines.append(f" Texte: {leak.get('text', 'N/A')}") lines.append(f" Message: {leak.get('message', 'N/A')}") lines.append("") lines.append("=" * 80) return "\n".join(lines) def export_json(self, report: LeakReport, output_path: Path): """ Exporte le rapport en JSON. Args: report: Rapport de fuite output_path: Chemin du fichier de sortie """ with open(output_path, 'w', encoding='utf-8') as f: json.dump(report.to_dict(), f, indent=2, ensure_ascii=False) print(f"✓ Rapport exporté: {output_path}") if __name__ == "__main__": # Test basique scanner = LeakScanner() # Exemple d'utilisation anonymized_pdf = Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.redacted.pdf") original_audit = Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.audit.jsonl") if anonymized_pdf.exists() and original_audit.exists(): report = scanner.scan(anonymized_pdf, original_audit) print(scanner.generate_report(report, anonymized_pdf))