feat: Phase 1 - Système d'évaluation de la qualité

- Sélection et copie de 27 documents représentatifs (10 simples, 12 moyens, 5 complexes)
- Outil d'annotation CLI complet (tools/annotation_tool.py)
- Guide d'annotation détaillé (docs/annotation_guide.md)
- Évaluateur de qualité (evaluation/quality_evaluator.py)
  * Calcul Précision, Rappel, F1-Score
  * Identification faux positifs/négatifs
  * Métriques par type de PII
  * Export JSON et rapports texte
- Scanner de fuite (evaluation/leak_scanner.py)
  * Détection PII résiduels (CRITIQUE)
  * Détection nouveaux PII (HAUTE)
  * Scan métadonnées PDF (MOYENNE)
- Benchmark de performance (evaluation/benchmark.py)
  * Mesure temps de traitement
  * Mesure CPU/RAM
  * Export JSON/CSV
- Tests unitaires complets pour tous les composants
- Documentation complète du module d'évaluation

Tâches complétées:
- 1.1.1 Sélection de 27 documents (au lieu de 30)
- 1.1.2 Outil d'annotation CLI
- 1.2.1 Évaluateur de qualité
- 1.2.2 Scanner de fuite
- 1.2.3 Benchmark de performance

Prochaines étapes:
- 1.1.3 Annotation des 27 documents (manuel)
- 1.1.4 Enrichissement stopwords médicaux
- 1.3 Mesure de la baseline
This commit is contained in:
2026-03-02 10:07:41 +01:00
parent 0067738df6
commit 340348b820
86 changed files with 35587 additions and 40 deletions

262
evaluation/README.md Normal file
View File

@@ -0,0 +1,262 @@
# Module d'Évaluation de la Qualité d'Anonymisation
Ce module fournit des outils pour évaluer et valider la qualité de l'anonymisation des documents PDF médicaux.
## Composants
### 1. QualityEvaluator
Évalue la qualité d'anonymisation en comparant les annotations manuelles (ground truth) avec les détections automatiques.
**Métriques calculées** :
- Précision (Precision) : TP / (TP + FP)
- Rappel (Recall) : TP / (TP + FN)
- F1-Score : 2 × (Precision × Recall) / (Precision + Recall)
**Usage** :
```python
from evaluation import QualityEvaluator
from pathlib import Path
evaluator = QualityEvaluator(Path("tests/ground_truth/pdfs"))
# Évaluer un document
result = evaluator.evaluate(
pdf_path=Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.pdf"),
audit_path=Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.audit.jsonl")
)
print(f"Précision: {result.precision:.4f}")
print(f"Rappel: {result.recall:.4f}")
print(f"F1-Score: {result.f1_score:.4f}")
# Générer un rapport
report = evaluator.generate_report([result])
print(report)
# Exporter en JSON
evaluator.export_json([result], Path("evaluation_results.json"))
```
### 2. LeakScanner
Scanne les documents anonymisés pour détecter les fuites de PII (données personnelles résiduelles).
**Vérifications** :
- PII originaux encore présents (CRITIQUE)
- Nouveaux PII détectés (HAUTE)
- Métadonnées PDF suspectes (MOYENNE)
**Usage** :
```python
from evaluation import LeakScanner
from pathlib import Path
scanner = LeakScanner()
# Scanner un document anonymisé
report = scanner.scan(
anonymized_pdf=Path("output/document.redacted.pdf"),
original_audit=Path("output/document.audit.jsonl")
)
if report.is_safe:
print("✓ Document sûr - Aucune fuite détectée")
else:
print(f"{report.leak_count} fuite(s) détectée(s)")
for leak in report.leaks:
print(f" - {leak['severity']}: {leak['message']}")
# Générer un rapport
report_text = scanner.generate_report(report, Path("document.pdf"))
print(report_text)
# Exporter en JSON
scanner.export_json(report, Path("leak_report.json"))
```
### 3. Benchmark
Mesure les performances du système d'anonymisation (temps, CPU, RAM).
**Métriques collectées** :
- Temps de traitement (total, par page)
- Utilisation CPU (%)
- Utilisation RAM (MB)
- Nombre de PII détectés
**Usage** :
```python
from evaluation import Benchmark
from pathlib import Path
benchmark = Benchmark(Path("tests/ground_truth/pdfs"))
# Définir la fonction d'anonymisation à benchmarker
def anonymize_func(pdf_path):
# Votre code d'anonymisation ici
# Retourner le chemin vers le fichier .audit.jsonl
return pdf_path.parent / f"{pdf_path.stem}.audit.jsonl"
# Benchmarker des documents
pdf_list = list(Path("tests/ground_truth/pdfs").glob("*.pdf"))
results = benchmark.run(pdf_list, anonymize_func)
# Générer un rapport
report = benchmark.generate_report(results)
print(report)
# Exporter en JSON
benchmark.export_json(results, Path("benchmark_results.json"))
# Exporter en CSV
benchmark.export_csv(results, Path("benchmark_results.csv"))
```
## Installation
Dépendances requises :
```bash
pip install pymupdf psutil
```
## Tests
Exécuter les tests unitaires :
```bash
pytest tests/unit/test_quality_evaluator.py -v
pytest tests/unit/test_leak_scanner.py -v
pytest tests/unit/test_benchmark.py -v
```
## Format des Données
### Annotations (ground truth)
Format JSON :
```json
{
"pdf_path": "document.pdf",
"metadata": {
"annotator": "annotator_1",
"annotation_date": "2024-01-15T10:30:00",
"document_type": "compte_rendu",
"page_count": 3,
"difficulty": "medium"
},
"annotations": [
{
"id": "ann_001",
"page": 0,
"type": "NOM",
"text": "DUPONT",
"bbox": null,
"context": "Dr. DUPONT a examiné le patient",
"mandatory": true,
"difficulty": "easy",
"detection_method_expected": ["regex", "ner", "contextual"]
}
],
"medical_terms_to_preserve": [
"Médecin DIM",
"Service de cardiologie"
],
"statistics": {
"total_pii": 1,
"by_type": {
"NOM": 1
}
}
}
```
### Audit (détections)
Format JSONL (une ligne par PII détecté) :
```json
{"page": 0, "kind": "NOM", "original": "DUPONT", "placeholder": "[NOM]"}
{"page": 0, "kind": "TEL", "original": "01 23 45 67 89", "placeholder": "[TEL]"}
```
## Métriques Cibles
Pour garantir la conformité RGPD et la qualité d'anonymisation :
- **Rappel (Recall)** : ≥ 99.5% (maximum 0.5% de PII manqués)
- **Précision (Precision)** : ≥ 97% (maximum 3% de faux positifs)
- **F1-Score** : ≥ 0.98
- **Taux de documents sûrs** : ≥ 98% (documents avec 0 faux négatif)
## Workflow Complet
1. **Annoter les documents** : Utiliser `tools/annotation_tool.py`
2. **Anonymiser les documents** : Utiliser le système d'anonymisation
3. **Évaluer la qualité** : Utiliser `QualityEvaluator`
4. **Scanner les fuites** : Utiliser `LeakScanner`
5. **Benchmarker les performances** : Utiliser `Benchmark`
6. **Analyser les résultats** : Identifier les améliorations nécessaires
## Exemples de Rapports
### Rapport d'Évaluation
```
================================================================================
RAPPORT D'ÉVALUATION DE LA QUALITÉ D'ANONYMISATION
================================================================================
Documents évalués: 27
MÉTRIQUES GLOBALES:
True Positives: 245
False Positives: 8
False Negatives: 2
Précision moyenne: 0.9684 (96.84%)
Rappel moyen: 0.9919 (99.19%)
F1-Score moyen: 0.9800
RÉSULTATS PAR DOCUMENT:
001_simple_unknown_BACTERIO_23018396.pdf
Précision: 1.0000 Rappel: 1.0000 F1: 1.0000
TP: 10 FP: 0 FN: 0
```
### Rapport de Fuite
```
================================================================================
RAPPORT DE FUITE - document.redacted.pdf
================================================================================
✓ DOCUMENT SÛR - Aucune fuite détectée
================================================================================
```
### Rapport de Benchmark
```
================================================================================
RAPPORT DE BENCHMARK - PERFORMANCE D'ANONYMISATION
================================================================================
SYSTÈME:
OS: Linux 6.8.0
CPU: AMD Ryzen 9 9950X
Cœurs: 16 physiques / 32 logiques
RAM: 128.0 GB
Python: 3.12.0
RÉSUMÉ:
Documents: 27
Temps moyen: 8.5s
Temps min/max: 2.1s / 25.3s
CPU moyen: 45.2%
RAM moyenne: 1024.5 MB
PII détectés: 245 (moy: 9.1)
```
## Licence
Ce module fait partie du système d'anonymisation de documents PDF médicaux.

15
evaluation/__init__.py Normal file
View File

@@ -0,0 +1,15 @@
"""
Module d'évaluation de la qualité d'anonymisation.
"""
from .quality_evaluator import QualityEvaluator, EvaluationResult
from .leak_scanner import LeakScanner, LeakReport
from .benchmark import Benchmark, BenchmarkResult
__all__ = [
'QualityEvaluator',
'EvaluationResult',
'LeakScanner',
'LeakReport',
'Benchmark',
'BenchmarkResult',
]

339
evaluation/benchmark.py Normal file
View File

@@ -0,0 +1,339 @@
#!/usr/bin/env python3
"""
Benchmark de performance du système d'anonymisation.
Mesure les temps de traitement, l'utilisation CPU/RAM, et les métriques de qualité.
"""
import json
import time
import psutil
import platform
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime
@dataclass
class BenchmarkResult:
"""Résultat de benchmark pour un document."""
pdf_path: str
processing_time_s: float = 0.0
time_per_page_s: float = 0.0
cpu_usage_percent: float = 0.0
ram_usage_mb: float = 0.0
pii_detected: int = 0
quality_metrics: Dict = field(default_factory=dict)
def to_dict(self) -> Dict:
"""Convertit en dictionnaire."""
return {
"pdf_path": self.pdf_path,
"processing_time_s": round(self.processing_time_s, 2),
"time_per_page_s": round(self.time_per_page_s, 2),
"cpu_usage_percent": round(self.cpu_usage_percent, 2),
"ram_usage_mb": round(self.ram_usage_mb, 2),
"pii_detected": self.pii_detected,
"quality_metrics": self.quality_metrics
}
class Benchmark:
"""Benchmark de performance."""
def __init__(self, test_data_dir: Path):
"""
Initialise le benchmark.
Args:
test_data_dir: Répertoire contenant les données de test
"""
self.test_data_dir = Path(test_data_dir)
self.process = psutil.Process()
def get_system_info(self) -> Dict:
"""
Récupère les informations système.
Returns:
Dictionnaire des informations système
"""
return {
"os": platform.system(),
"os_version": platform.version(),
"cpu": platform.processor(),
"cpu_count": psutil.cpu_count(logical=False),
"cpu_count_logical": psutil.cpu_count(logical=True),
"ram_gb": round(psutil.virtual_memory().total / (1024**3), 2),
"python_version": platform.python_version()
}
def measure_cpu_ram(self, duration_s: float = 1.0) -> tuple:
"""
Mesure l'utilisation CPU et RAM pendant une durée.
Args:
duration_s: Durée de mesure en secondes
Returns:
Tuple (cpu_percent, ram_mb)
"""
# Mesurer le CPU sur une période
cpu_percent = self.process.cpu_percent(interval=duration_s)
# Mesurer la RAM
ram_mb = self.process.memory_info().rss / (1024 * 1024)
return cpu_percent, ram_mb
def benchmark_document(
self,
pdf_path: Path,
anonymize_func,
page_count: Optional[int] = None
) -> BenchmarkResult:
"""
Benchmark un document.
Args:
pdf_path: Chemin vers le PDF
anonymize_func: Fonction d'anonymisation à benchmarker
page_count: Nombre de pages (optionnel)
Returns:
Résultat du benchmark
"""
# Mesurer le temps de traitement
start_time = time.time()
start_cpu = self.process.cpu_percent()
start_ram = self.process.memory_info().rss / (1024 * 1024)
# Exécuter l'anonymisation
try:
audit_path = anonymize_func(pdf_path)
except Exception as e:
print(f"✗ Erreur lors de l'anonymisation de {pdf_path.name}: {e}")
return BenchmarkResult(pdf_path=str(pdf_path))
# Mesurer après traitement
end_time = time.time()
end_cpu = self.process.cpu_percent()
end_ram = self.process.memory_info().rss / (1024 * 1024)
processing_time = end_time - start_time
cpu_usage = (start_cpu + end_cpu) / 2
ram_usage = end_ram - start_ram
# Compter les PII détectés
pii_count = 0
if audit_path and audit_path.exists():
try:
with open(audit_path, 'r', encoding='utf-8') as f:
pii_count = sum(1 for line in f if line.strip())
except Exception:
pass
# Calculer le temps par page
time_per_page = processing_time / page_count if page_count and page_count > 0 else 0.0
# Créer le résultat
result = BenchmarkResult(
pdf_path=str(pdf_path),
processing_time_s=processing_time,
time_per_page_s=time_per_page,
cpu_usage_percent=cpu_usage,
ram_usage_mb=ram_usage,
pii_detected=pii_count
)
return result
def run(
self,
pdf_list: List[Path],
anonymize_func,
page_counts: Optional[List[int]] = None
) -> List[BenchmarkResult]:
"""
Exécute le benchmark sur une liste de documents.
Args:
pdf_list: Liste des PDFs à benchmarker
anonymize_func: Fonction d'anonymisation
page_counts: Liste des nombres de pages (optionnel)
Returns:
Liste des résultats
"""
results = []
if page_counts is None:
page_counts = [None] * len(pdf_list)
for i, (pdf_path, page_count) in enumerate(zip(pdf_list, page_counts), 1):
print(f"[{i}/{len(pdf_list)}] Benchmark: {pdf_path.name}")
result = self.benchmark_document(pdf_path, anonymize_func, page_count)
results.append(result)
print(f" Temps: {result.processing_time_s:.2f}s "
f"CPU: {result.cpu_usage_percent:.1f}% "
f"RAM: {result.ram_usage_mb:.1f}MB "
f"PII: {result.pii_detected}")
return results
def calculate_summary(self, results: List[BenchmarkResult]) -> Dict:
"""
Calcule les statistiques résumées.
Args:
results: Liste des résultats
Returns:
Dictionnaire des statistiques
"""
if not results:
return {}
processing_times = [r.processing_time_s for r in results]
cpu_usages = [r.cpu_usage_percent for r in results]
ram_usages = [r.ram_usage_mb for r in results]
pii_counts = [r.pii_detected for r in results]
return {
"documents_count": len(results),
"avg_time_per_doc": round(sum(processing_times) / len(processing_times), 2),
"min_time": round(min(processing_times), 2),
"max_time": round(max(processing_times), 2),
"avg_cpu_percent": round(sum(cpu_usages) / len(cpu_usages), 2),
"avg_ram_mb": round(sum(ram_usages) / len(ram_usages), 2),
"total_pii_detected": sum(pii_counts),
"avg_pii_per_doc": round(sum(pii_counts) / len(pii_counts), 2)
}
def generate_report(self, results: List[BenchmarkResult]) -> str:
"""
Génère un rapport texte.
Args:
results: Liste des résultats
Returns:
Rapport texte
"""
if not results:
return "Aucun résultat à afficher."
summary = self.calculate_summary(results)
system_info = self.get_system_info()
lines = []
lines.append("=" * 80)
lines.append("RAPPORT DE BENCHMARK - PERFORMANCE D'ANONYMISATION")
lines.append("=" * 80)
lines.append("")
# Informations système
lines.append("SYSTÈME:")
lines.append(f" OS: {system_info['os']} {system_info['os_version']}")
lines.append(f" CPU: {system_info['cpu']}")
lines.append(f" Cœurs: {system_info['cpu_count']} physiques / {system_info['cpu_count_logical']} logiques")
lines.append(f" RAM: {system_info['ram_gb']} GB")
lines.append(f" Python: {system_info['python_version']}")
lines.append("")
# Résumé
lines.append("RÉSUMÉ:")
lines.append(f" Documents: {summary['documents_count']}")
lines.append(f" Temps moyen: {summary['avg_time_per_doc']}s")
lines.append(f" Temps min/max: {summary['min_time']}s / {summary['max_time']}s")
lines.append(f" CPU moyen: {summary['avg_cpu_percent']}%")
lines.append(f" RAM moyenne: {summary['avg_ram_mb']} MB")
lines.append(f" PII détectés: {summary['total_pii_detected']} (moy: {summary['avg_pii_per_doc']})")
lines.append("")
# Détails par document
lines.append("DÉTAILS PAR DOCUMENT:")
lines.append("")
for result in results:
pdf_name = Path(result.pdf_path).name
lines.append(f" {pdf_name}")
lines.append(f" Temps: {result.processing_time_s:.2f}s "
f"CPU: {result.cpu_usage_percent:.1f}% "
f"RAM: {result.ram_usage_mb:.1f}MB "
f"PII: {result.pii_detected}")
lines.append("")
lines.append("=" * 80)
return "\n".join(lines)
def export_json(self, results: List[BenchmarkResult], output_path: Path):
"""
Exporte les résultats en JSON.
Args:
results: Liste des résultats
output_path: Chemin du fichier de sortie
"""
data = {
"benchmark_date": datetime.now().isoformat(),
"system_info": self.get_system_info(),
"results": [r.to_dict() for r in results],
"summary": self.calculate_summary(results)
}
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"✓ Résultats exportés: {output_path}")
def export_csv(self, results: List[BenchmarkResult], output_path: Path):
"""
Exporte les résultats en CSV.
Args:
results: Liste des résultats
output_path: Chemin du fichier de sortie
"""
import csv
with open(output_path, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
# En-tête
writer.writerow([
"pdf_path",
"processing_time_s",
"time_per_page_s",
"cpu_usage_percent",
"ram_usage_mb",
"pii_detected"
])
# Données
for result in results:
writer.writerow([
result.pdf_path,
result.processing_time_s,
result.time_per_page_s,
result.cpu_usage_percent,
result.ram_usage_mb,
result.pii_detected
])
print(f"✓ Résultats exportés: {output_path}")
if __name__ == "__main__":
# Test basique
benchmark = Benchmark(Path("tests/ground_truth/pdfs"))
# Afficher les informations système
system_info = benchmark.get_system_info()
print("Informations système:")
for key, value in system_info.items():
print(f" {key}: {value}")

309
evaluation/leak_scanner.py Normal file
View File

@@ -0,0 +1,309 @@
#!/usr/bin/env python3
"""
Scanner de fuite de PII.
Vérifie qu'aucun PII ne subsiste dans les documents anonymisés.
"""
import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Optional
try:
import pymupdf as fitz
except ImportError:
import fitz
@dataclass
class LeakReport:
"""Rapport de fuite de PII."""
is_safe: bool = True
leak_count: int = 0
leaks: List[Dict] = field(default_factory=list)
severity_counts: Dict[str, int] = field(default_factory=dict)
def to_dict(self) -> Dict:
"""Convertit en dictionnaire."""
return {
"is_safe": self.is_safe,
"leak_count": self.leak_count,
"leaks": self.leaks,
"severity_counts": self.severity_counts
}
class LeakScanner:
"""Scanner de fuite de PII dans les documents anonymisés."""
# Regex pour détecter les PII
REGEX_PATTERNS = {
"EMAIL": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b'),
"TEL": re.compile(r'(?<!\d)(?:\+33|0033|0)[1-9](?:[\s.\-]?\d){8}(?!\d)'),
"NIR": re.compile(r'\b[12]\s?\d{2}\s?\d{2}\s?\d{2}\s?\d{3}\s?\d{3}\s?\d{2}\b'),
"IBAN": re.compile(r'\b[A-Z]{2}\d{2}[\s]?(?:\d{4}[\s]?){4,7}\d{1,4}\b'),
"CODE_POSTAL": re.compile(r'\b\d{5}\b'),
"IPP": re.compile(r'\b\d{8,10}\b'),
}
def __init__(self):
"""Initialise le scanner."""
pass
def extract_text_from_pdf(self, pdf_path: Path) -> str:
"""
Extrait le texte d'un PDF.
Args:
pdf_path: Chemin vers le PDF
Returns:
Texte extrait
"""
try:
doc = fitz.open(pdf_path)
text = ""
for page in doc:
text += page.get_text()
doc.close()
return text
except Exception as e:
print(f"✗ Erreur lors de l'extraction du texte de {pdf_path}: {e}")
return ""
def load_original_pii(self, audit_path: Path) -> List[Dict]:
"""
Charge les PII originaux depuis l'audit.
Args:
audit_path: Chemin vers le fichier .audit.jsonl
Returns:
Liste des PII originaux
"""
if not audit_path.exists():
return []
try:
pii_list = []
with open(audit_path, 'r', encoding='utf-8') as f:
for line in f:
if line.strip():
pii = json.loads(line)
pii_list.append(pii)
return pii_list
except Exception as e:
print(f"✗ Erreur lors du chargement de l'audit {audit_path}: {e}")
return []
def scan_text(self, text: str, original_pii: List[Dict]) -> List[Dict]:
"""
Scanne le texte pour détecter les fuites de PII.
Args:
text: Texte à scanner
original_pii: Liste des PII originaux
Returns:
Liste des fuites détectées
"""
leaks = []
# 1. Vérifier que les PII originaux ne sont plus présents
for pii in original_pii:
original_text = pii.get("original", "")
if not original_text:
continue
# Recherche insensible à la casse
if re.search(re.escape(original_text), text, re.IGNORECASE):
leaks.append({
"type": "original_pii_present",
"severity": "CRITIQUE",
"pii_type": pii.get("kind", "UNKNOWN"),
"text": original_text,
"message": f"PII original encore présent: {original_text}"
})
# 2. Détecter de nouveaux PII non masqués
for pii_type, pattern in self.REGEX_PATTERNS.items():
matches = pattern.finditer(text)
for match in matches:
matched_text = match.group()
# Vérifier si ce PII était dans l'audit original
is_known = any(
pii.get("original", "").lower() == matched_text.lower()
for pii in original_pii
)
if not is_known:
leaks.append({
"type": "new_pii_detected",
"severity": "HAUTE",
"pii_type": pii_type,
"text": matched_text,
"message": f"Nouveau PII détecté: {pii_type} = {matched_text}"
})
return leaks
def scan_metadata(self, pdf_path: Path) -> List[Dict]:
"""
Scanne les métadonnées du PDF.
Args:
pdf_path: Chemin vers le PDF
Returns:
Liste des fuites dans les métadonnées
"""
leaks = []
try:
doc = fitz.open(pdf_path)
metadata = doc.metadata
doc.close()
# Champs à vérifier
suspicious_fields = ["author", "creator", "producer", "subject", "title"]
for field in suspicious_fields:
value = metadata.get(field, "")
if value and value.strip():
# Vérifier si le champ contient des PII potentiels
# (noms, emails, etc.)
if "@" in value:
leaks.append({
"type": "metadata_leak",
"severity": "MOYENNE",
"field": field,
"text": value,
"message": f"Métadonnée suspecte ({field}): {value}"
})
elif any(c.isalpha() for c in value):
# Contient des lettres (potentiellement un nom)
leaks.append({
"type": "metadata_leak",
"severity": "MOYENNE",
"field": field,
"text": value,
"message": f"Métadonnée suspecte ({field}): {value}"
})
except Exception as e:
print(f"✗ Erreur lors du scan des métadonnées de {pdf_path}: {e}")
return leaks
def scan(self, anonymized_pdf: Path, original_audit: Path) -> LeakReport:
"""
Scanne un document anonymisé pour détecter les fuites.
Args:
anonymized_pdf: Chemin vers le PDF anonymisé
original_audit: Chemin vers l'audit original
Returns:
Rapport de fuite
"""
# Extraire le texte
text = self.extract_text_from_pdf(anonymized_pdf)
# Charger les PII originaux
original_pii = self.load_original_pii(original_audit)
# Scanner le texte
text_leaks = self.scan_text(text, original_pii)
# Scanner les métadonnées
metadata_leaks = self.scan_metadata(anonymized_pdf)
# Combiner les fuites
all_leaks = text_leaks + metadata_leaks
# Compter par sévérité
severity_counts = {}
for leak in all_leaks:
severity = leak.get("severity", "UNKNOWN")
severity_counts[severity] = severity_counts.get(severity, 0) + 1
# Créer le rapport
report = LeakReport(
is_safe=len(all_leaks) == 0,
leak_count=len(all_leaks),
leaks=all_leaks,
severity_counts=severity_counts
)
return report
def generate_report(self, report: LeakReport, pdf_path: Path) -> str:
"""
Génère un rapport texte.
Args:
report: Rapport de fuite
pdf_path: Chemin du PDF
Returns:
Rapport texte
"""
lines = []
lines.append("=" * 80)
lines.append(f"RAPPORT DE FUITE - {pdf_path.name}")
lines.append("=" * 80)
lines.append("")
if report.is_safe:
lines.append("✓ DOCUMENT SÛR - Aucune fuite détectée")
else:
lines.append(f"✗ DOCUMENT NON SÛR - {report.leak_count} fuite(s) détectée(s)")
lines.append("")
# Par sévérité
lines.append("FUITES PAR SÉVÉRITÉ:")
for severity, count in sorted(report.severity_counts.items()):
lines.append(f" {severity}: {count}")
lines.append("")
# Détails des fuites
lines.append("DÉTAILS DES FUITES:")
for i, leak in enumerate(report.leaks, 1):
lines.append(f"\n [{i}] {leak['severity']} - {leak['type']}")
lines.append(f" Type PII: {leak.get('pii_type', 'N/A')}")
lines.append(f" Texte: {leak.get('text', 'N/A')}")
lines.append(f" Message: {leak.get('message', 'N/A')}")
lines.append("")
lines.append("=" * 80)
return "\n".join(lines)
def export_json(self, report: LeakReport, output_path: Path):
"""
Exporte le rapport en JSON.
Args:
report: Rapport de fuite
output_path: Chemin du fichier de sortie
"""
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(report.to_dict(), f, indent=2, ensure_ascii=False)
print(f"✓ Rapport exporté: {output_path}")
if __name__ == "__main__":
# Test basique
scanner = LeakScanner()
# Exemple d'utilisation
anonymized_pdf = Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.redacted.pdf")
original_audit = Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.audit.jsonl")
if anonymized_pdf.exists() and original_audit.exists():
report = scanner.scan(anonymized_pdf, original_audit)
print(scanner.generate_report(report, anonymized_pdf))

View File

@@ -0,0 +1,522 @@
#!/usr/bin/env python3
"""
Évaluateur de qualité d'anonymisation.
Compare les annotations manuelles (ground truth) avec les détections automatiques
pour calculer les métriques de qualité (Précision, Rappel, F1-Score).
"""
import json
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Tuple, Optional
import re
@dataclass
class EvaluationResult:
"""Résultat d'évaluation pour un document."""
pdf_path: str
true_positives: int = 0
false_positives: int = 0
false_negatives: int = 0
precision: float = 0.0
recall: float = 0.0
f1_score: float = 0.0
missed_pii: List[Dict] = field(default_factory=list) # Faux négatifs détaillés
false_detections: List[Dict] = field(default_factory=list) # Faux positifs détaillés
by_type: Dict[str, Dict] = field(default_factory=dict) # Métriques par type de PII
def to_dict(self) -> Dict:
"""Convertit en dictionnaire."""
return {
"pdf_path": self.pdf_path,
"true_positives": self.true_positives,
"false_positives": self.false_positives,
"false_negatives": self.false_negatives,
"precision": round(self.precision, 4),
"recall": round(self.recall, 4),
"f1_score": round(self.f1_score, 4),
"missed_pii": self.missed_pii,
"false_detections": self.false_detections,
"by_type": self.by_type
}
class QualityEvaluator:
"""Évaluateur de qualité d'anonymisation."""
# Mapping des types de PII entre annotations et détections
TYPE_MAPPING = {
# Annotations → Détections possibles
"NOM": ["NOM", "NOM_GLOBAL", "PRENOM", "PRENOM_GLOBAL"],
"PRENOM": ["PRENOM", "PRENOM_GLOBAL", "NOM", "NOM_GLOBAL"],
"TEL": ["TEL", "TEL_GLOBAL"],
"EMAIL": ["EMAIL", "EMAIL_GLOBAL"],
"ADRESSE": ["ADRESSE", "ADRESSE_GLOBAL"],
"CODE_POSTAL": ["CODE_POSTAL", "CODE_POSTAL_GLOBAL"],
"VILLE": ["VILLE", "VILLE_GLOBAL"],
"NIR": ["NIR", "NIR_GLOBAL"],
"IPP": ["IPP", "IPP_GLOBAL"],
"NDA": ["NDA", "NDA_GLOBAL"],
"RPPS": ["RPPS", "RPPS_GLOBAL"],
"FINESS": ["FINESS", "FINESS_GLOBAL"],
"OGC": ["OGC", "OGC_GLOBAL"],
"ETABLISSEMENT": ["ETAB", "ETAB_GLOBAL", "VLM_ETAB"],
"SERVICE": ["SERVICE", "SERVICE_GLOBAL", "VLM_SERVICE"],
"DATE": ["DATE", "DATE_GLOBAL"],
"DATE_NAISSANCE": ["DATE_NAISSANCE", "DATE_NAISSANCE_GLOBAL"],
"AGE": ["AGE", "AGE_GLOBAL"],
"NUMERO_PATIENT": ["VLM_NUM_PATIENT", "IPP"],
"NUMERO_LOT": ["VLM_NUM_LOT"],
"NUMERO_ORDONNANCE": ["VLM_NUM_ORD"],
"NUMERO_SEJOUR": ["VLM_NUM_SEJOUR", "NDA"],
}
def __init__(self, ground_truth_dir: Path):
"""
Initialise l'évaluateur.
Args:
ground_truth_dir: Répertoire contenant les annotations manuelles
"""
self.ground_truth_dir = Path(ground_truth_dir)
def normalize_text(self, text: str) -> str:
"""
Normalise un texte pour la comparaison.
Args:
text: Texte à normaliser
Returns:
Texte normalisé
"""
# Lowercase
text = text.lower()
# Supprimer les espaces multiples
text = re.sub(r'\s+', ' ', text)
# Strip
text = text.strip()
return text
def load_annotations(self, pdf_path: Path) -> Optional[Dict]:
"""
Charge les annotations manuelles d'un document.
Args:
pdf_path: Chemin vers le PDF
Returns:
Annotations ou None si non trouvées
"""
annotation_file = pdf_path.parent / f"{pdf_path.stem}.annotations.json"
if not annotation_file.exists():
return None
try:
with open(annotation_file, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"✗ Erreur lors du chargement des annotations {annotation_file}: {e}")
return None
def load_audit(self, audit_path: Path) -> Optional[List[Dict]]:
"""
Charge l'audit de détection automatique.
Args:
audit_path: Chemin vers le fichier .audit.jsonl
Returns:
Liste des détections ou None si non trouvé
"""
if not audit_path.exists():
return None
try:
detections = []
with open(audit_path, 'r', encoding='utf-8') as f:
for line in f:
if line.strip():
detections.append(json.loads(line))
return detections
except Exception as e:
print(f"✗ Erreur lors du chargement de l'audit {audit_path}: {e}")
return None
def types_match(self, ann_type: str, det_type: str) -> bool:
"""
Vérifie si deux types de PII correspondent.
Args:
ann_type: Type dans l'annotation
det_type: Type dans la détection
Returns:
True si les types correspondent
"""
# Mapping direct
if ann_type in self.TYPE_MAPPING:
return det_type in self.TYPE_MAPPING[ann_type]
# Correspondance exacte
return ann_type == det_type
def compare(self, annotations: List[Dict], detections: List[Dict]) -> Tuple[List, List, List]:
"""
Compare les annotations avec les détections.
Args:
annotations: Liste des annotations manuelles
detections: Liste des détections automatiques
Returns:
Tuple (true_positives, false_negatives, false_positives)
"""
true_positives = []
false_negatives = []
false_positives = []
# Créer des clés de comparaison pour les annotations
ann_keys = {}
for ann in annotations:
page = ann.get("page", 0)
pii_type = ann.get("type", "")
text = self.normalize_text(ann.get("text", ""))
key = (page, text)
if key not in ann_keys:
ann_keys[key] = []
ann_keys[key].append(ann)
# Créer des clés de comparaison pour les détections
det_keys = {}
for det in detections:
page = det.get("page", 0)
text = self.normalize_text(det.get("original", ""))
key = (page, text)
if key not in det_keys:
det_keys[key] = []
det_keys[key].append(det)
# Trouver les true positives et false negatives
matched_det_keys = set()
for key, anns in ann_keys.items():
page, text = key
if key in det_keys:
# Vérifier si au moins une détection correspond au type
dets = det_keys[key]
matched = False
for ann in anns:
ann_type = ann.get("type", "")
for det in dets:
det_type = det.get("kind", "")
if self.types_match(ann_type, det_type):
true_positives.append({
"page": page,
"type": ann_type,
"text": ann.get("text", ""),
"detected_as": det_type,
"context": ann.get("context", "")
})
matched = True
matched_det_keys.add(key)
break
if matched:
break
if not matched:
# Détecté mais type incorrect
for ann in anns:
false_negatives.append({
"page": page,
"type": ann.get("type", ""),
"text": ann.get("text", ""),
"context": ann.get("context", ""),
"reason": "type_mismatch",
"detected_as": [d.get("kind", "") for d in dets]
})
else:
# Non détecté
for ann in anns:
false_negatives.append({
"page": page,
"type": ann.get("type", ""),
"text": ann.get("text", ""),
"context": ann.get("context", ""),
"reason": "not_detected"
})
# Trouver les false positives
for key, dets in det_keys.items():
if key not in matched_det_keys:
page, text = key
for det in dets:
false_positives.append({
"page": page,
"type": det.get("kind", ""),
"text": det.get("original", ""),
"placeholder": det.get("placeholder", "")
})
return true_positives, false_negatives, false_positives
def calculate_metrics(self, tp: int, fp: int, fn: int) -> Tuple[float, float, float]:
"""
Calcule les métriques de qualité.
Args:
tp: True positives
fp: False positives
fn: False negatives
Returns:
Tuple (precision, recall, f1_score)
"""
# Précision
precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
# Rappel
recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
# F1-Score
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
return precision, recall, f1_score
def calculate_metrics_by_type(self, tp_list: List[Dict], fn_list: List[Dict], fp_list: List[Dict]) -> Dict[str, Dict]:
"""
Calcule les métriques par type de PII.
Args:
tp_list: Liste des true positives
fn_list: Liste des false negatives
fp_list: Liste des false positives
Returns:
Dictionnaire des métriques par type
"""
by_type = {}
# Compter par type
for tp in tp_list:
pii_type = tp["type"]
if pii_type not in by_type:
by_type[pii_type] = {"tp": 0, "fp": 0, "fn": 0}
by_type[pii_type]["tp"] += 1
for fn in fn_list:
pii_type = fn["type"]
if pii_type not in by_type:
by_type[pii_type] = {"tp": 0, "fp": 0, "fn": 0}
by_type[pii_type]["fn"] += 1
for fp in fp_list:
pii_type = fp["type"]
if pii_type not in by_type:
by_type[pii_type] = {"tp": 0, "fp": 0, "fn": 0}
by_type[pii_type]["fp"] += 1
# Calculer les métriques
for pii_type, counts in by_type.items():
tp = counts["tp"]
fp = counts["fp"]
fn = counts["fn"]
precision, recall, f1 = self.calculate_metrics(tp, fp, fn)
counts["precision"] = round(precision, 4)
counts["recall"] = round(recall, 4)
counts["f1_score"] = round(f1, 4)
return by_type
def evaluate(self, pdf_path: Path, audit_path: Path) -> Optional[EvaluationResult]:
"""
Évalue la qualité d'anonymisation d'un document.
Args:
pdf_path: Chemin vers le PDF original
audit_path: Chemin vers le fichier .audit.jsonl
Returns:
Résultat d'évaluation ou None si erreur
"""
# Charger les annotations
annotations_data = self.load_annotations(pdf_path)
if not annotations_data:
print(f"✗ Annotations introuvables pour {pdf_path.name}")
return None
annotations = annotations_data.get("annotations", [])
# Charger l'audit
detections = self.load_audit(audit_path)
if detections is None:
print(f"✗ Audit introuvable: {audit_path}")
return None
# Comparer
tp_list, fn_list, fp_list = self.compare(annotations, detections)
# Calculer les métriques globales
tp = len(tp_list)
fp = len(fp_list)
fn = len(fn_list)
precision, recall, f1_score = self.calculate_metrics(tp, fp, fn)
# Calculer les métriques par type
by_type = self.calculate_metrics_by_type(tp_list, fn_list, fp_list)
# Créer le résultat
result = EvaluationResult(
pdf_path=str(pdf_path),
true_positives=tp,
false_positives=fp,
false_negatives=fn,
precision=precision,
recall=recall,
f1_score=f1_score,
missed_pii=fn_list,
false_detections=fp_list,
by_type=by_type
)
return result
def evaluate_batch(self, pdf_list: List[Path], audit_list: List[Path]) -> List[EvaluationResult]:
"""
Évalue un batch de documents.
Args:
pdf_list: Liste des PDFs
audit_list: Liste des audits
Returns:
Liste des résultats d'évaluation
"""
results = []
for pdf_path, audit_path in zip(pdf_list, audit_list):
result = self.evaluate(pdf_path, audit_path)
if result:
results.append(result)
return results
def generate_report(self, results: List[EvaluationResult]) -> str:
"""
Génère un rapport texte des résultats.
Args:
results: Liste des résultats d'évaluation
Returns:
Rapport texte
"""
if not results:
return "Aucun résultat à afficher."
# Calculer les métriques globales
total_tp = sum(r.true_positives for r in results)
total_fp = sum(r.false_positives for r in results)
total_fn = sum(r.false_negatives for r in results)
avg_precision = sum(r.precision for r in results) / len(results)
avg_recall = sum(r.recall for r in results) / len(results)
avg_f1 = sum(r.f1_score for r in results) / len(results)
# Générer le rapport
report = []
report.append("=" * 80)
report.append("RAPPORT D'ÉVALUATION DE LA QUALITÉ D'ANONYMISATION")
report.append("=" * 80)
report.append("")
report.append(f"Documents évalués: {len(results)}")
report.append("")
report.append("MÉTRIQUES GLOBALES:")
report.append(f" True Positives: {total_tp}")
report.append(f" False Positives: {total_fp}")
report.append(f" False Negatives: {total_fn}")
report.append("")
report.append(f" Précision moyenne: {avg_precision:.4f} ({avg_precision*100:.2f}%)")
report.append(f" Rappel moyen: {avg_recall:.4f} ({avg_recall*100:.2f}%)")
report.append(f" F1-Score moyen: {avg_f1:.4f}")
report.append("")
# Résultats par document
report.append("RÉSULTATS PAR DOCUMENT:")
report.append("")
for result in results:
pdf_name = Path(result.pdf_path).name
report.append(f" {pdf_name}")
report.append(f" Précision: {result.precision:.4f} Rappel: {result.recall:.4f} F1: {result.f1_score:.4f}")
report.append(f" TP: {result.true_positives} FP: {result.false_positives} FN: {result.false_negatives}")
report.append("")
# Faux négatifs critiques
critical_fn = []
for result in results:
for fn in result.missed_pii:
if fn.get("reason") == "not_detected":
critical_fn.append((Path(result.pdf_path).name, fn))
if critical_fn:
report.append(f"FAUX NÉGATIFS CRITIQUES ({len(critical_fn)}):")
report.append("")
for pdf_name, fn in critical_fn[:10]: # Limiter à 10
report.append(f" {pdf_name} - Page {fn['page']+1}")
report.append(f" Type: {fn['type']}")
report.append(f" Texte: {fn['text']}")
report.append(f" Contexte: {fn['context'][:80]}...")
report.append("")
report.append("=" * 80)
return "\n".join(report)
def export_json(self, results: List[EvaluationResult], output_path: Path):
"""
Exporte les résultats en JSON.
Args:
results: Liste des résultats
output_path: Chemin du fichier de sortie
"""
data = {
"evaluation_date": Path(__file__).stat().st_mtime,
"documents_count": len(results),
"results": [r.to_dict() for r in results]
}
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
print(f"✓ Résultats exportés: {output_path}")
if __name__ == "__main__":
# Test basique
evaluator = QualityEvaluator(Path("tests/ground_truth/pdfs"))
# Exemple d'utilisation
pdf_path = Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.pdf")
audit_path = Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.audit.jsonl")
if pdf_path.exists() and audit_path.exists():
result = evaluator.evaluate(pdf_path, audit_path)
if result:
print(evaluator.generate_report([result]))