#!/usr/bin/env python3
"""
Validation on the full corpus (59 OGC / 130 PDFs).

This script anonymizes every document in the corpus and checks:
- absence of leaks (birth dates, CHCB, etc.)
- detection statistics per PII type
- performance (processing time)
"""

import sys
import json
import time
from pathlib import Path
from collections import defaultdict
import re

sys.path.insert(0, str(Path(__file__).parent.parent))

from anonymizer_core_refactored_onnx import process_pdf


def validate_full_corpus():
    """Validate anonymization on the full corpus."""
    # Directories
    corpus_dir = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)")
    output_dir = Path("corpus_validation")
    output_dir.mkdir(exist_ok=True)

    print("=" * 80)
    print("FULL CORPUS VALIDATION")
    print("=" * 80)
    print(f"\n📁 Corpus: {corpus_dir}")
    print(f"📁 Output: {output_dir}")

    # List all PDFs (recursively)
    pdf_files = sorted(corpus_dir.glob("**/*.pdf"))
    if not pdf_files:
        print(f"\n✗ No PDF found in {corpus_dir}")
        return 1
    print(f"\n📄 Documents found: {len(pdf_files)}")

    # Aggregated statistics
    stats = {
        "total_documents": len(pdf_files),
        "processed": 0,
        "failed": 0,
        "total_pii": 0,
        "total_time": 0.0,
        "by_type": defaultdict(int),
        "by_folder": defaultdict(lambda: {"count": 0, "pii": 0, "time": 0.0}),
        "errors": []
    }

    # Process each PDF
    start_time = time.time()
    for i, pdf_path in enumerate(pdf_files, 1):
        folder_name = pdf_path.parent.name
        print(f"\n[{i}/{len(pdf_files)}] {folder_name}/{pdf_path.name}")
        try:
            # Anonymize
            doc_start = time.time()
            result = process_pdf(
                pdf_path,
                output_dir,
                make_vector_redaction=False,
                also_make_raster_burn=True,
                config_path=Path("config/dictionnaires.yml")
            )
            doc_time = time.time() - doc_start

            # Read the audit trail (JSONL: one detection per line)
            audit_path = Path(result["audit"])
            detections = []
            with open(audit_path, 'r', encoding='utf-8') as f:
                for line in f:
                    if line.strip():
                        detections.append(json.loads(line))
            pii_count = len(detections)

            # Update statistics
            stats["processed"] += 1
            stats["total_pii"] += pii_count
            stats["total_time"] += doc_time
            stats["by_folder"][folder_name]["count"] += 1
            stats["by_folder"][folder_name]["pii"] += pii_count
            stats["by_folder"][folder_name]["time"] += doc_time
            for det in detections:
                stats["by_type"][det["kind"]] += 1

            print(f"  ✓ {pii_count} PII detected in {doc_time:.2f}s")
        except Exception as e:
            stats["failed"] += 1
            stats["errors"].append({
                "file": str(pdf_path),
                "error": str(e)
            })
            print(f"  ✗ Error: {e}")

    total_time = time.time() - start_time

    # Summary
    print("\n" + "=" * 80)
    print("SUMMARY")
    print("=" * 80)
    print("\n📊 Documents:")
    print(f"  - Total: {stats['total_documents']}")
    print(f"  - Processed: {stats['processed']}")
    print(f"  - Failed: {stats['failed']}")
    print("\n📊 Detections:")
    print(f"  - Total PII: {stats['total_pii']}")
    if stats["processed"]:  # guard against ZeroDivisionError when every document failed
        print(f"  - Average per document: {stats['total_pii'] / stats['processed']:.1f}")
    print("\n📊 Performance:")
    print(f"  - Total time: {total_time:.1f}s ({total_time / 60:.1f}min)")
    if stats["processed"]:
        print(f"  - Average time: {stats['total_time'] / stats['processed']:.2f}s/doc")
    print("\n📊 Top 10 PII types:")
    for pii_type, count in sorted(stats["by_type"].items(),
                                  key=lambda x: x[1], reverse=True)[:10]:
        print(f"  - {pii_type}: {count}")
    print("\n📊 Top 10 folders:")
    for folder, data in sorted(stats["by_folder"].items(),
                               key=lambda x: x[1]["pii"], reverse=True)[:10]:
        print(f"  - {folder}: {data['count']} docs, {data['pii']} PII, {data['time']:.1f}s")

    if stats["errors"]:
        print(f"\n⚠️ Errors ({len(stats['errors'])}):")
        for err in stats["errors"][:5]:
            print(f"  - {Path(err['file']).name}: {err['error']}")

    # Save statistics
    stats_file = output_dir / "validation_stats.json"
    with open(stats_file, 'w', encoding='utf-8') as f:
        # Convert defaultdicts to plain dicts for JSON serialization
        stats_json = {
            "total_documents": stats["total_documents"],
            "processed": stats["processed"],
            "failed": stats["failed"],
            "total_pii": stats["total_pii"],
            "total_time": stats["total_time"],
            "by_type": dict(stats["by_type"]),
            "by_folder": {k: dict(v) for k, v in stats["by_folder"].items()},
            "errors": stats["errors"]
        }
        json.dump(stats_json, f, indent=2, ensure_ascii=False)
    print(f"\n📊 Statistics saved: {stats_file}")

    # Leak check
    print("\n" + "=" * 80)
    print("LEAK CHECK")
    print("=" * 80)
    leak_check(output_dir)

    print("\n" + "=" * 80)
    return 0
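

# Sketch: the JSONL audit-reading loop above, factored out as a reusable
# helper. It is not called anywhere in this script; it mainly documents the
# assumed audit format (one JSON object per line, produced by process_pdf,
# of which only the "kind" field is consumed here). Any field other than
# "kind" is an assumption about anonymizer_core_refactored_onnx's output.
def load_audit(audit_path: Path) -> list:
    """Return all detection records from a JSONL audit file."""
    detections = []
    with open(audit_path, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():  # skip blank lines
                detections.append(json.loads(line))
    return detections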


def leak_check(output_dir: Path):
    """Scan the anonymized texts for leaked PII."""
    print("\n🔍 Searching for leaks in the anonymized texts...")

    # Patterns to check. The trigger words target the French source
    # documents ("né(e) le", "DDN"), so they stay in French.
    patterns = {
        "date_naissance": re.compile(
            r"(?:n[ée]+\s+le|DDN)\s*:?\s*\d{1,2}[/.\-]\d{1,2}[/.\-]\d{2,4}",
            re.IGNORECASE),
        "chcb": re.compile(r"\bCHCB\b", re.IGNORECASE),
        "date_format": re.compile(r"\b\d{2}[/.\-]\d{2}[/.\-]\d{4}\b"),
    }

    leaks = defaultdict(list)

    # Check every anonymized text file
    txt_files = list(output_dir.glob("*.pseudonymise.txt"))
    for txt_file in txt_files:
        with open(txt_file, 'r', encoding='utf-8') as f:
            content = f.read()
        for leak_type, pattern in patterns.items():
            matches = pattern.findall(content)
            if matches:
                leaks[leak_type].extend([
                    {"file": txt_file.name, "match": m}
                    for m in matches
                ])

    # Report the results
    if not leaks:
        print("  ✅ No leaks detected!")
    else:
        print(f"  ⚠️ {sum(len(v) for v in leaks.values())} potential leaks detected:")
        for leak_type, items in leaks.items():
            print(f"\n  {leak_type}: {len(items)} occurrences")
            for item in items[:3]:  # show the first 3
                print(f"    - {item['file']}: {item['match']}")

    # Save the leaks, if any
    if leaks:
        leak_file = output_dir / "leaks_detected.json"
        with open(leak_file, 'w', encoding='utf-8') as f:
            json.dump(dict(leaks), f, indent=2, ensure_ascii=False)
        print(f"\n  📄 Leaks saved: {leak_file}")


if __name__ == "__main__":
    sys.exit(validate_full_corpus())
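
# --- Usage sketch (assumptions, not part of the original script) -------------
# Run from the repository root so that config/dictionnaires.yml resolves:
#   python3 validate_full_corpus.py
# Expected outputs under corpus_validation/:
#   validation_stats.json   aggregated statistics (always written)
#   leaks_detected.json     written only when the leak check finds matches
# The invocation filename above is illustrative; the corpus path is
# hard-coded in validate_full_corpus() and must exist for any PDFs to be found.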