#!/usr/bin/env python3
"""Quick validation on a representative sample of the corpus.

Randomly selects 10% of the documents (around 135 PDFs) for a fast
validation pass.
"""
import sys
import json
import time
import random
from pathlib import Path
from collections import defaultdict
import re

sys.path.insert(0, str(Path(__file__).parent.parent))

from anonymizer_core_refactored_onnx import process_pdf


def _classify_doc(doc_name: str) -> str:
    """Return the document-type label inferred from a lowercased file name.

    The checks are ordered: the first matching substring wins
    ('crh' is tested before 'cro', etc.); unknown names fall back to 'AUTRE'.
    """
    if 'trackare' in doc_name:
        return 'trackare'
    if 'crh' in doc_name:
        return 'CRH'
    if 'cro' in doc_name:
        return 'CRO'
    if 'anapath' in doc_name:
        return 'ANAPATH'
    if 'bacterio' in doc_name:
        return 'BACTERIO'
    if 'lettre' in doc_name or 'sortie' in doc_name:
        return 'LETTRE'
    if 'consultation' in doc_name or 'anesth' in doc_name:
        return 'CONSULTATION'
    return 'AUTRE'


def validate_corpus_sample():
    """Validate anonymization on a random sample of the corpus.

    Processes ~10% of the PDFs (at least 50), collects per-document and
    per-PII-type statistics, writes them to ``validation_stats.json``,
    runs a leak check on the anonymized texts and extrapolates the
    results to the full corpus.

    Returns:
        0 on success, 1 when no PDF was found in the corpus directory.
    """
    # Directories
    corpus_dir = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)")
    output_dir = Path("corpus_validation_sample")
    output_dir.mkdir(exist_ok=True)

    print("=" * 80)
    print("VALIDATION SUR ÉCHANTILLON DU CORPUS")
    print("=" * 80)
    print(f"\n📁 Corpus: {corpus_dir}")
    print(f"📁 Sortie: {output_dir}")

    # List all PDFs
    all_pdfs = sorted(corpus_dir.glob("**/*.pdf"))

    if not all_pdfs:
        print(f"\n✗ Aucun PDF trouvé dans {corpus_dir}")
        return 1

    # Select a sample (10%, but at least 50 documents)
    sample_size = max(50, len(all_pdfs) // 10)
    random.seed(42)  # For reproducibility
    pdf_files = random.sample(all_pdfs, min(sample_size, len(all_pdfs)))

    print(f"\n📄 Documents totaux: {len(all_pdfs)}")
    print(f"📄 Échantillon sélectionné: {len(pdf_files)} ({len(pdf_files)/len(all_pdfs)*100:.1f}%)")

    # Statistics accumulator
    stats = {
        "total_documents": len(all_pdfs),
        "sample_size": len(pdf_files),
        "processed": 0,
        "failed": 0,
        "total_pii": 0,
        "total_time": 0.0,
        "by_type": defaultdict(int),
        "by_doc_type": defaultdict(lambda: {"count": 0, "pii": 0, "time": 0.0}),
        "errors": []
    }

    # Process each PDF
    start_time = time.time()

    for i, pdf_path in enumerate(pdf_files, 1):
        doc_type = _classify_doc(pdf_path.name.lower())

        print(f"\n[{i}/{len(pdf_files)}] {pdf_path.parent.name}/{pdf_path.name}")
        print(f"  Type: {doc_type}")

        try:
            # Anonymize
            doc_start = time.time()
            result = process_pdf(
                pdf_path,
                output_dir,
                make_vector_redaction=False,
                also_make_raster_burn=False,  # No PDF output, for speed
                config_path=Path("config/dictionnaires.yml")
            )
            doc_time = time.time() - doc_start

            # Read the audit trail (one JSON object per non-empty line)
            audit_path = Path(result["audit"])
            detections = []
            with open(audit_path, 'r', encoding='utf-8') as f:
                for line in f:
                    if line.strip():
                        detections.append(json.loads(line))

            pii_count = len(detections)

            # Update statistics
            stats["processed"] += 1
            stats["total_pii"] += pii_count
            stats["total_time"] += doc_time
            stats["by_doc_type"][doc_type]["count"] += 1
            stats["by_doc_type"][doc_type]["pii"] += pii_count
            stats["by_doc_type"][doc_type]["time"] += doc_time

            for det in detections:
                stats["by_type"][det["kind"]] += 1

            print(f"  ✓ {pii_count} PII détectés en {doc_time:.2f}s")

        except Exception as e:
            # Best-effort: record the failure and keep processing the sample
            stats["failed"] += 1
            stats["errors"].append({
                "file": str(pdf_path),
                "error": str(e)
            })
            print(f"  ✗ Erreur: {e}")

    total_time = time.time() - start_time

    # Pre-compute guarded averages: all documents may have failed, in which
    # case processed == 0 and the raw divisions would raise ZeroDivisionError.
    processed = stats["processed"]
    avg_pii_per_doc = stats["total_pii"] / processed if processed > 0 else 0.0
    avg_time_per_doc = stats["total_time"] / processed if processed > 0 else 0.0

    # Summary
    print("\n" + "=" * 80)
    print("RÉSUMÉ")
    print("=" * 80)

    print(f"\n📊 Documents:")
    print(f"  - Corpus total: {stats['total_documents']}")
    print(f"  - Échantillon: {stats['sample_size']}")
    print(f"  - Traités: {stats['processed']}")
    print(f"  - Échecs: {stats['failed']}")

    print(f"\n📊 Détections:")
    print(f"  - Total PII: {stats['total_pii']}")
    print(f"  - Moyenne par document: {avg_pii_per_doc:.1f}")

    print(f"\n📊 Performances:")
    print(f"  - Temps total: {total_time:.1f}s ({total_time/60:.1f}min)")
    print(f"  - Temps moyen: {avg_time_per_doc:.2f}s/doc")

    print(f"\n📊 Répartition par type de PII:")
    # Loop body only runs when at least one PII was found, so total_pii > 0 here.
    for pii_type, count in sorted(stats["by_type"].items(),
                                  key=lambda x: x[1], reverse=True):
        pct = count / stats['total_pii'] * 100
        print(f"  - {pii_type:25s}: {count:5d} ({pct:5.1f}%)")

    print(f"\n📊 Répartition par type de document:")
    for doc_type, data in sorted(stats["by_doc_type"].items(),
                                 key=lambda x: x[1]["count"], reverse=True):
        avg_pii = data['pii'] / data['count'] if data['count'] > 0 else 0
        avg_time = data['time'] / data['count'] if data['count'] > 0 else 0
        print(f"  - {doc_type:15s}: {data['count']:3d} docs, {avg_pii:5.1f} PII/doc, {avg_time:5.2f}s/doc")

    if stats["errors"]:
        print(f"\n⚠️ Erreurs ({len(stats['errors'])}):")
        for err in stats["errors"][:10]:
            print(f"  - {Path(err['file']).name}: {err['error']}")

    # Save the statistics (defaultdicts converted to plain dicts for JSON)
    stats_file = output_dir / "validation_stats.json"
    with open(stats_file, 'w', encoding='utf-8') as f:
        stats_json = {
            "total_documents": stats["total_documents"],
            "sample_size": stats["sample_size"],
            "processed": stats["processed"],
            "failed": stats["failed"],
            "total_pii": stats["total_pii"],
            "total_time": stats["total_time"],
            "avg_pii_per_doc": avg_pii_per_doc,
            "avg_time_per_doc": avg_time_per_doc,
            "by_type": dict(stats["by_type"]),
            "by_doc_type": {k: dict(v) for k, v in stats["by_doc_type"].items()},
            "errors": stats["errors"]
        }
        json.dump(stats_json, f, indent=2, ensure_ascii=False)

    print(f"\n📊 Statistiques sauvegardées: {stats_file}")

    # Leak check on the anonymized outputs
    print("\n" + "=" * 80)
    print("VÉRIFICATION DES FUITES")
    print("=" * 80)
    leak_check(output_dir)

    # Extrapolation to the full corpus
    print("\n" + "=" * 80)
    print("EXTRAPOLATION AU CORPUS COMPLET")
    print("=" * 80)

    if processed > 0:
        total_estimated_pii = int(avg_pii_per_doc * stats["total_documents"])
        total_estimated_time = avg_time_per_doc * stats["total_documents"]

        print(f"\n📊 Estimations pour les {stats['total_documents']} documents:")
        print(f"  - PII total estimé: {total_estimated_pii:,}")
        print(f"  - Temps total estimé: {total_estimated_time/60:.1f} minutes ({total_estimated_time/3600:.1f} heures)")
        print(f"  - Moyenne: {avg_pii_per_doc:.1f} PII/doc, {avg_time_per_doc:.2f}s/doc")

    print("\n" + "=" * 80)

    return 0


def leak_check(output_dir: Path):
    """Search the anonymized ``*.pseudonymise.txt`` files for PII leaks.

    Scans each text file in *output_dir* against a set of leak regexes
    (birth-date-in-context, 'CHCB' hospital acronym), prints a report and
    saves any findings to ``leaks_detected.json``.
    """
    print("\n🔍 Recherche de fuites dans les textes anonymisés...")

    # Patterns to check
    patterns = {
        "date_naissance_contexte": re.compile(r"(?:n[ée]+\s+le|DDN)\s*:?\s*\d{1,2}[/.\-]\d{1,2}[/.\-]\d{2,4}", re.IGNORECASE),
        "chcb": re.compile(r"\bCHCB\b", re.IGNORECASE),
    }

    leaks = defaultdict(list)

    # Check every .pseudonymise.txt file (top level of output_dir only)
    txt_files = list(output_dir.glob("*.pseudonymise.txt"))

    for txt_file in txt_files:
        with open(txt_file, 'r', encoding='utf-8') as f:
            content = f.read()

        for leak_type, pattern in patterns.items():
            matches = pattern.findall(content)
            if matches:
                leaks[leak_type].extend([
                    {"file": txt_file.name, "match": m}
                    for m in matches
                ])

    # Report results
    if not leaks:
        print("  ✅ Aucune fuite détectée!")
    else:
        print(f"  ⚠️ {sum(len(v) for v in leaks.values())} fuites potentielles détectées:")
        for leak_type, items in leaks.items():
            print(f"\n  {leak_type}: {len(items)} occurrences")
            for item in items[:5]:  # Show the first 5 only
                print(f"    - {item['file']}: {item['match']}")

    # Persist the findings
    if leaks:
        leak_file = output_dir / "leaks_detected.json"
        with open(leak_file, 'w', encoding='utf-8') as f:
            json.dump(dict(leaks), f, indent=2, ensure_ascii=False)
        print(f"\n  📄 Fuites sauvegardées: {leak_file}")


if __name__ == "__main__":
    sys.exit(validate_corpus_sample())