#!/usr/bin/env python3
"""
Anonymization quality evaluator.

Compares manual annotations (ground truth) with automatic detections
to compute quality metrics (precision, recall, F1-score).
"""

import json
import re
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple


@dataclass
class EvaluationResult:
    """Evaluation result for a single document."""

    pdf_path: str
    true_positives: int = 0
    false_positives: int = 0
    false_negatives: int = 0
    precision: float = 0.0
    recall: float = 0.0
    f1_score: float = 0.0
    missed_pii: List[Dict] = field(default_factory=list)        # Detailed false negatives
    false_detections: List[Dict] = field(default_factory=list)  # Detailed false positives
    by_type: Dict[str, Dict] = field(default_factory=dict)      # Metrics per PII type

    def to_dict(self) -> Dict:
        """Convert to a dictionary."""
        return {
            "pdf_path": self.pdf_path,
            "true_positives": self.true_positives,
            "false_positives": self.false_positives,
            "false_negatives": self.false_negatives,
            "precision": round(self.precision, 4),
            "recall": round(self.recall, 4),
            "f1_score": round(self.f1_score, 4),
            "missed_pii": self.missed_pii,
            "false_detections": self.false_detections,
            "by_type": self.by_type,
        }


class QualityEvaluator:
    """Anonymization quality evaluator."""

    # Mapping between annotation PII types and the detection types they may match
    TYPE_MAPPING = {
        # Annotation type -> acceptable detection types
        "NOM": ["NOM", "NOM_GLOBAL", "PRENOM", "PRENOM_GLOBAL"],
        "PRENOM": ["PRENOM", "PRENOM_GLOBAL", "NOM", "NOM_GLOBAL"],
        "TEL": ["TEL", "TEL_GLOBAL"],
        "EMAIL": ["EMAIL", "EMAIL_GLOBAL"],
        "ADRESSE": ["ADRESSE", "ADRESSE_GLOBAL"],
        "CODE_POSTAL": ["CODE_POSTAL", "CODE_POSTAL_GLOBAL"],
        "VILLE": ["VILLE", "VILLE_GLOBAL"],
        "NIR": ["NIR", "NIR_GLOBAL"],
        "IPP": ["IPP", "IPP_GLOBAL"],
        "NDA": ["NDA", "NDA_GLOBAL"],
        "RPPS": ["RPPS", "RPPS_GLOBAL"],
        "FINESS": ["FINESS", "FINESS_GLOBAL"],
        "OGC": ["OGC", "OGC_GLOBAL"],
        "ETABLISSEMENT": ["ETAB", "ETAB_GLOBAL", "VLM_ETAB"],
        "SERVICE": ["SERVICE", "SERVICE_GLOBAL", "VLM_SERVICE"],
        "DATE": ["DATE", "DATE_GLOBAL"],
        "DATE_NAISSANCE": ["DATE_NAISSANCE", "DATE_NAISSANCE_GLOBAL"],
        "AGE": ["AGE", "AGE_GLOBAL"],
        "NUMERO_PATIENT": ["VLM_NUM_PATIENT", "IPP"],
        "NUMERO_LOT": ["VLM_NUM_LOT"],
        "NUMERO_ORDONNANCE": ["VLM_NUM_ORD"],
        "NUMERO_SEJOUR": ["VLM_NUM_SEJOUR", "NDA"],
    }

    def __init__(self, ground_truth_dir: Path):
        """
        Initialize the evaluator.

        Args:
            ground_truth_dir: Directory containing the manual annotations
        """
        self.ground_truth_dir = Path(ground_truth_dir)

    def normalize_text(self, text: str) -> str:
        """
        Normalize a text for comparison.

        Args:
            text: Text to normalize

        Returns:
            Normalized text
        """
        # Lowercase, collapse runs of whitespace, strip leading/trailing spaces
        text = text.lower()
        text = re.sub(r'\s+', ' ', text)
        text = text.strip()
        return text
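    # Note: a sketch of the two annotation layouts accepted by load_annotations()
    # below, inferred from its conversion logic. Field values are purely
    # illustrative, not taken from any real ground-truth file.
    #
    #   Flat layout:   {"annotations": [{"page": 0, "type": "NOM",
    #                                    "text": "Dupont", "context": "..."}]}
    #   Paged layout:  {"pages": [{"page_number": 0,
    #                              "pii": {"NOM": ["Dupont"], "TEL": ["0102030405"]}}]}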
    def load_annotations(self, pdf_path: Path) -> Optional[Dict]:
        """
        Load the manual annotations of a document.

        Args:
            pdf_path: Path to the PDF

        Returns:
            Annotations, or None if not found
        """
        # Look in the configured ground-truth directory
        annotation_file = self.ground_truth_dir / f"{pdf_path.stem}.json"
        if not annotation_file.exists():
            # Fallback: look for the ".annotations.json" suffix
            annotation_file = self.ground_truth_dir / f"{pdf_path.stem}.annotations.json"
        if not annotation_file.exists():
            return None

        try:
            with open(annotation_file, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # Convert the "pages" layout into the flat "annotations" layout if needed
            if "pages" in data and "annotations" not in data:
                annotations = []
                for page in data["pages"]:
                    page_num = page["page_number"]
                    for pii_type, texts in page["pii"].items():
                        for text in texts:
                            annotations.append({
                                "page": page_num,
                                "type": pii_type,
                                "text": text,
                                "context": ""
                            })
                data["annotations"] = annotations

            return data
        except Exception as e:
            print(f"✗ Failed to load annotations {annotation_file}: {e}")
            return None

    def load_audit(self, audit_path: Path) -> Optional[List[Dict]]:
        """
        Load the automatic-detection audit trail.

        Args:
            audit_path: Path to the .audit.jsonl file

        Returns:
            List of detections, or None if not found
        """
        if not audit_path.exists():
            return None

        try:
            detections = []
            with open(audit_path, 'r', encoding='utf-8') as f:
                # One JSON object per non-empty line (JSON Lines format)
                for line in f:
                    if line.strip():
                        detections.append(json.loads(line))
            return detections
        except Exception as e:
            print(f"✗ Failed to load audit {audit_path}: {e}")
            return None

    def types_match(self, ann_type: str, det_type: str) -> bool:
        """
        Check whether two PII types correspond.

        Args:
            ann_type: Type from the annotation
            det_type: Type from the detection

        Returns:
            True if the types correspond
        """
        # Mapped equivalence
        if ann_type in self.TYPE_MAPPING:
            return det_type in self.TYPE_MAPPING[ann_type]
        # Exact match otherwise
        return ann_type == det_type
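    # Note: a sketch of how the two sides line up in compare() below. Both are
    # keyed by (page, normalized text); the detection fields read by compare()
    # are "kind", "original" and "placeholder". The sample values are
    # illustrative only:
    #
    #   audit.jsonl line:  {"page": 0, "kind": "NOM_GLOBAL",
    #                       "original": "DUPONT", "placeholder": "<NOM_1>"}
    #   annotation:        {"page": 0, "type": "NOM", "text": "Dupont"}
    #   -> same key (0, "dupont"); the types match via TYPE_MAPPING["NOM"].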
    def compare(self, annotations: List[Dict], detections: List[Dict]) -> Tuple[List, List, List]:
        """
        Compare the annotations with the detections.

        Args:
            annotations: List of manual annotations
            detections: List of automatic detections

        Returns:
            Tuple (true_positives, false_negatives, false_positives)
        """
        true_positives = []
        false_negatives = []
        false_positives = []

        # Build comparison keys for the annotations: (page, normalized text)
        ann_keys = {}
        for ann in annotations:
            page = ann.get("page", 0)
            text = self.normalize_text(ann.get("text", ""))
            key = (page, text)
            if key not in ann_keys:
                ann_keys[key] = []
            ann_keys[key].append(ann)

        # Build comparison keys for the detections
        det_keys = {}
        for det in detections:
            page = det.get("page", 0)
            text = self.normalize_text(det.get("original", ""))
            key = (page, text)
            if key not in det_keys:
                det_keys[key] = []
            det_keys[key].append(det)

        # Find the true positives and false negatives
        matched_det_keys = set()
        for key, anns in ann_keys.items():
            page, _ = key
            if key in det_keys:
                # Check whether at least one detection matches the annotated type.
                # Only the first matching annotation for a given key is recorded
                # as a true positive.
                dets = det_keys[key]
                matched = False
                for ann in anns:
                    ann_type = ann.get("type", "")
                    for det in dets:
                        det_type = det.get("kind", "")
                        if self.types_match(ann_type, det_type):
                            true_positives.append({
                                "page": page,
                                "type": ann_type,
                                "text": ann.get("text", ""),
                                "detected_as": det_type,
                                "context": ann.get("context", "")
                            })
                            matched = True
                            matched_det_keys.add(key)
                            break
                    if matched:
                        break
                if not matched:
                    # Detected, but with a non-matching type
                    for ann in anns:
                        false_negatives.append({
                            "page": page,
                            "type": ann.get("type", ""),
                            "text": ann.get("text", ""),
                            "context": ann.get("context", ""),
                            "reason": "type_mismatch",
                            "detected_as": [d.get("kind", "") for d in dets]
                        })
            else:
                # Not detected at all
                for ann in anns:
                    false_negatives.append({
                        "page": page,
                        "type": ann.get("type", ""),
                        "text": ann.get("text", ""),
                        "context": ann.get("context", ""),
                        "reason": "not_detected"
                    })

        # Find the false positives (detections never matched by an annotation)
        for key, dets in det_keys.items():
            if key not in matched_det_keys:
                page, _ = key
                for det in dets:
                    false_positives.append({
                        "page": page,
                        "type": det.get("kind", ""),
                        "text": det.get("original", ""),
                        "placeholder": det.get("placeholder", "")
                    })

        return true_positives, false_negatives, false_positives

    def calculate_metrics(self, tp: int, fp: int, fn: int) -> Tuple[float, float, float]:
        """
        Compute the quality metrics.

        Args:
            tp: True positives
            fp: False positives
            fn: False negatives

        Returns:
            Tuple (precision, recall, f1_score)
        """
        # Precision: share of detections that are correct
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        # Recall: share of annotated PII that was detected
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        # F1-score: harmonic mean of precision and recall
        f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
        return precision, recall, f1_score
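    # Note: a worked example of the formulas above (numbers chosen for illustration):
    # with tp=8, fp=2, fn=4:
    #   precision = 8 / (8 + 2) = 0.8
    #   recall    = 8 / (8 + 4) ≈ 0.6667
    #   f1_score  = 2 * 0.8 * 0.6667 / (0.8 + 0.6667) ≈ 0.7273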
    def calculate_metrics_by_type(self, tp_list: List[Dict], fn_list: List[Dict],
                                  fp_list: List[Dict]) -> Dict[str, Dict]:
        """
        Compute the metrics per PII type.

        Args:
            tp_list: List of true positives
            fn_list: List of false negatives
            fp_list: List of false positives

        Returns:
            Dictionary of metrics per type
        """
        by_type = {}

        # Count per type
        for tp in tp_list:
            pii_type = tp["type"]
            if pii_type not in by_type:
                by_type[pii_type] = {"tp": 0, "fp": 0, "fn": 0}
            by_type[pii_type]["tp"] += 1

        for fn in fn_list:
            pii_type = fn["type"]
            if pii_type not in by_type:
                by_type[pii_type] = {"tp": 0, "fp": 0, "fn": 0}
            by_type[pii_type]["fn"] += 1

        for fp in fp_list:
            pii_type = fp["type"]
            if pii_type not in by_type:
                by_type[pii_type] = {"tp": 0, "fp": 0, "fn": 0}
            by_type[pii_type]["fp"] += 1

        # Compute the metrics for each type
        for pii_type, counts in by_type.items():
            tp = counts["tp"]
            fp = counts["fp"]
            fn = counts["fn"]
            precision, recall, f1 = self.calculate_metrics(tp, fp, fn)
            counts["precision"] = round(precision, 4)
            counts["recall"] = round(recall, 4)
            counts["f1_score"] = round(f1, 4)

        return by_type

    def evaluate(self, pdf_path: Path, audit_path: Path) -> Optional[EvaluationResult]:
        """
        Evaluate the anonymization quality of one document.

        Args:
            pdf_path: Path to the original PDF
            audit_path: Path to the .audit.jsonl file

        Returns:
            Evaluation result, or None on error
        """
        # Load the annotations
        annotations_data = self.load_annotations(pdf_path)
        if not annotations_data:
            print(f"✗ Annotations not found for {pdf_path.name}")
            return None
        annotations = annotations_data.get("annotations", [])

        # Load the audit trail
        detections = self.load_audit(audit_path)
        if detections is None:
            print(f"✗ Audit not found: {audit_path}")
            return None

        # Compare annotations and detections
        tp_list, fn_list, fp_list = self.compare(annotations, detections)

        # Global metrics
        tp = len(tp_list)
        fp = len(fp_list)
        fn = len(fn_list)
        precision, recall, f1_score = self.calculate_metrics(tp, fp, fn)

        # Metrics per type
        by_type = self.calculate_metrics_by_type(tp_list, fn_list, fp_list)

        # Build the result
        result = EvaluationResult(
            pdf_path=str(pdf_path),
            true_positives=tp,
            false_positives=fp,
            false_negatives=fn,
            precision=precision,
            recall=recall,
            f1_score=f1_score,
            missed_pii=fn_list,
            false_detections=fp_list,
            by_type=by_type
        )
        return result

    def evaluate_batch(self, pdf_list: List[Path], audit_list: List[Path]) -> List[EvaluationResult]:
        """
        Evaluate a batch of documents.

        Args:
            pdf_list: List of PDFs
            audit_list: List of audit files (paired with pdf_list by position)

        Returns:
            List of evaluation results
        """
        results = []
        for pdf_path, audit_path in zip(pdf_list, audit_list):
            result = self.evaluate(pdf_path, audit_path)
            if result:
                results.append(result)
        return results
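    # Note: shape of one by_type entry as built by calculate_metrics_by_type()
    # above (values illustrative):
    #   by_type["NOM"] == {"tp": 12, "fp": 1, "fn": 2,
    #                      "precision": 0.9231, "recall": 0.8571, "f1_score": 0.8889}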
    def generate_report(self, results: List[EvaluationResult]) -> str:
        """
        Generate a plain-text report of the results.

        Args:
            results: List of evaluation results

        Returns:
            Text report
        """
        if not results:
            return "No results to display."

        # Aggregate metrics: raw counts are summed, averages are macro-averages
        # (mean of the per-document values)
        total_tp = sum(r.true_positives for r in results)
        total_fp = sum(r.false_positives for r in results)
        total_fn = sum(r.false_negatives for r in results)
        avg_precision = sum(r.precision for r in results) / len(results)
        avg_recall = sum(r.recall for r in results) / len(results)
        avg_f1 = sum(r.f1_score for r in results) / len(results)

        # Build the report
        report = []
        report.append("=" * 80)
        report.append("ANONYMIZATION QUALITY EVALUATION REPORT")
        report.append("=" * 80)
        report.append("")
        report.append(f"Documents evaluated: {len(results)}")
        report.append("")
        report.append("GLOBAL METRICS:")
        report.append(f"  True Positives:  {total_tp}")
        report.append(f"  False Positives: {total_fp}")
        report.append(f"  False Negatives: {total_fn}")
        report.append("")
        report.append(f"  Average precision: {avg_precision:.4f} ({avg_precision*100:.2f}%)")
        report.append(f"  Average recall:    {avg_recall:.4f} ({avg_recall*100:.2f}%)")
        report.append(f"  Average F1-score:  {avg_f1:.4f}")
        report.append("")

        # Per-document results
        report.append("RESULTS PER DOCUMENT:")
        report.append("")
        for result in results:
            pdf_name = Path(result.pdf_path).name
            report.append(f"  {pdf_name}")
            report.append(f"    Precision: {result.precision:.4f}  Recall: {result.recall:.4f}  F1: {result.f1_score:.4f}")
            report.append(f"    TP: {result.true_positives}  FP: {result.false_positives}  FN: {result.false_negatives}")
            report.append("")

        # Critical false negatives (PII that was not detected at all)
        critical_fn = []
        for result in results:
            for fn in result.missed_pii:
                if fn.get("reason") == "not_detected":
                    critical_fn.append((Path(result.pdf_path).name, fn))

        if critical_fn:
            report.append(f"CRITICAL FALSE NEGATIVES ({len(critical_fn)}):")
            report.append("")
            for pdf_name, fn in critical_fn[:10]:  # Cap at 10 entries
                report.append(f"  {pdf_name} - Page {fn['page'] + 1}")
                report.append(f"    Type: {fn['type']}")
                report.append(f"    Text: {fn['text']}")
                report.append(f"    Context: {fn['context'][:80]}...")
                report.append("")

        report.append("=" * 80)
        return "\n".join(report)

    def export_json(self, results: List[EvaluationResult], output_path: Path):
        """
        Export the results as JSON.

        Args:
            results: List of results
            output_path: Output file path
        """
        data = {
            "evaluation_date": datetime.now().isoformat(),  # timestamp of this evaluation run
            "documents_count": len(results),
            "results": [r.to_dict() for r in results]
        }
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        print(f"✓ Results exported: {output_path}")


if __name__ == "__main__":
    # Basic smoke test
    evaluator = QualityEvaluator(Path("tests/ground_truth/pdfs"))

    # Usage example
    pdf_path = Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.pdf")
    audit_path = Path("tests/ground_truth/pdfs/001_simple_unknown_BACTERIO_23018396.audit.jsonl")

    if pdf_path.exists() and audit_path.exists():
        result = evaluator.evaluate(pdf_path, audit_path)
        if result:
            print(evaluator.generate_report([result]))
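# Batch-evaluation sketch (paths are hypothetical; assumes each PDF has a
# "<stem>.audit.jsonl" sibling, as in the example above):
#
#   pdfs = sorted(Path("tests/ground_truth/pdfs").glob("*.pdf"))
#   audits = [p.parent / f"{p.stem}.audit.jsonl" for p in pdfs]
#   batch = evaluator.evaluate_batch(pdfs, audits)
#   print(evaluator.generate_report(batch))
#   evaluator.export_json(batch, Path("evaluation_results.json"))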