#!/usr/bin/env python3
"""Root-cause analysis of the anonymization quality regression.

Compares the test dataset (100% quality) against production output
(regressed quality) and prints statistics plus the identified causes.
"""
import json
import re
from collections import defaultdict
from pathlib import Path
from typing import Dict, List, Tuple


def _context(text: str, start: int, end: int, pad: int) -> str:
    """Return the slice of *text* around [start, end), widened by *pad* chars.

    Bounds are clamped to the text so matches near the edges are safe.
    """
    return text[max(0, start - pad):min(len(text), end + pad)]


def analyze_audit_file(audit_path: Path) -> Dict:
    """Parse one ``*.audit.jsonl`` file and return aggregate PII statistics.

    Each non-empty line is expected to be a JSON object with the optional
    keys ``kind``, ``page`` and ``original``.

    Returns a dict with the total PII count, per-type and per-page
    breakdowns, document-global tokens (``page == -1``) and the list of
    ``NOM_EXTRACTED`` originals.
    """
    stats = {
        "total_pii": 0,
        "by_type": defaultdict(int),
        "by_page": defaultdict(int),
        "global_tokens": [],
        "extracted_names": [],
        # NOTE(review): these flags are never set in this function; they are
        # kept so the returned dict shape stays compatible with callers.
        "has_ocr_artifacts": False,
        "has_medical_overmasking": False,
        "has_medication_masking": False,
        "has_date_overmasking": False,
    }
    with open(audit_path, 'r', encoding='utf-8') as f:
        for line in f:
            if not line.strip():
                continue
            entry = json.loads(line)
            stats["total_pii"] += 1
            pii_type = entry.get("kind", "UNKNOWN")
            stats["by_type"][pii_type] += 1
            page = entry.get("page", -1)
            stats["by_page"][page] += 1
            # page == -1 marks tokens that apply to the whole document.
            if page == -1:
                stats["global_tokens"].append({
                    "type": pii_type,
                    "value": entry.get("original", ""),
                })
            if pii_type == "NOM_EXTRACTED":
                stats["extracted_names"].append(entry.get("original", ""))
    return stats


def analyze_anonymized_text(text_path: Path) -> Dict:
    """Scan an anonymized text file for known over-masking problems.

    Detects spaced-letter OCR artifacts, over-masked medical terms,
    masked medication names, masked dates and over-masked city names.
    Returns a dict of problem lists (``date_overmasking`` is a summary
    dict with counts instead of a list).
    """
    problems = {
        "ocr_artifacts": [],
        "medical_overmasking": [],
        "medication_masking": [],
        "date_overmasking": [],  # unconditionally replaced by a summary dict below
        "city_overmasking": [],
    }

    with open(text_path, 'r', encoding='utf-8') as f:
        text = f.read()

    # OCR artifacts: isolated letters separated by spaces (e.g. "P A t i...").
    for match in re.finditer(r'\b[A-Z]\s+[A-Z]\s+[a-z]\s+[a-z]', text):
        problems["ocr_artifacts"].append({
            "text": match.group(0),
            "context": _context(text, match.start(), match.end(), 50),
        })

    # Medical over-masking: domain terms wrongly replaced by mask tokens.
    medical_patterns = [
        (r'Chef de \[MASK\]', "Chef de service"),
        (r'Chef de \[ETABLISSEMENT\]', "Chef de Clinique"),
        (r'Note \[NOM\]', "Note IDE"),
        (r'Avis \[NOM\]', "Avis ORL"),
    ]
    for pattern, expected in medical_patterns:
        for match in re.finditer(pattern, text):
            problems["medical_overmasking"].append({
                "masked": match.group(0),
                "expected": expected,
                "context": _context(text, match.start(), match.end(), 30),
            })

    # Medication names masked as person names ("[NOM] 500 mg").
    for match in re.finditer(r'\[NOM\]\s+\d+\s*mg', text):
        problems["medication_masking"].append({
            "text": match.group(0),
            "context": _context(text, match.start(), match.end(), 50),
        })

    # Date masking counts (birth dates vs generic dates).
    # "[DATE_NAISSANCE]" does not contain the substring "[DATE]", so the
    # two counts are disjoint.
    date_count = text.count("[DATE_NAISSANCE]")
    date_generic_count = text.count("[DATE]")
    problems["date_overmasking"] = {
        "date_naissance_count": date_count,
        "date_generic_count": date_generic_count,
        "total": date_count + date_generic_count,
    }

    # Over-masked city names ("originaire du [VILLE]").
    for match in re.finditer(r'originaire du \[VILLE\]', text):
        problems["city_overmasking"].append({
            "text": match.group(0),
            "context": _context(text, match.start(), match.end(), 30),
        })

    return problems


def compare_datasets():
    """Compare test dataset vs production and print a root-cause report."""
    test_dir = Path("tests/ground_truth/pdfs/baseline_anonymized")
    prod_dir = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs/anonymise")

    print("=" * 80)
    print("ANALYSE DES CAUSES RACINES - RÉGRESSION DE QUALITÉ")
    print("=" * 80)
    print()

    # Analyze the known-good test dataset (first 5 audit files only).
    print("📊 Analyse TEST DATASET (bonne qualité)...")
    test_stats = []
    for audit_file in sorted(test_dir.glob("*.audit.jsonl"))[:5]:
        stats = analyze_audit_file(audit_file)
        test_stats.append(stats)
        print(f" • {audit_file.name}: {stats['total_pii']} PII")

    # Analyze production output (first 5 files) plus the matching text.
    print()
    print("📊 Analyse PRODUCTION (régression)...")
    prod_stats = []
    prod_problems = []
    for audit_file in sorted(prod_dir.glob("*.audit.jsonl"))[:5]:
        stats = analyze_audit_file(audit_file)
        prod_stats.append(stats)
        print(f" • {audit_file.name}: {stats['total_pii']} PII")
        # Map "X.audit.jsonl" -> "X.pseudonymise.txt" in the same directory.
        # (The original also called .with_suffix('.txt') first; its result
        # was discarded by .with_name(), so it is dropped here.)
        text_file = audit_file.with_name(
            audit_file.name.replace('.audit.jsonl', '.pseudonymise.txt')
        )
        if text_file.exists():
            prod_problems.append(analyze_anonymized_text(text_file))

    # Per-document averages (0 when a directory has no audit files).
    test_avg = sum(s["total_pii"] for s in test_stats) / len(test_stats) if test_stats else 0
    prod_avg = sum(s["total_pii"] for s in prod_stats) / len(prod_stats) if prod_stats else 0

    print()
    print("=" * 80)
    print("RÉSULTATS")
    print("=" * 80)
    print(f" Test dataset: {test_avg:.1f} PII/doc")
    print(f" Production: {prod_avg:.1f} PII/doc")
    # Guard against ZeroDivisionError when the test dataset is empty.
    pct = (prod_avg - test_avg) / test_avg * 100 if test_avg else 0.0
    print(f" Différence: +{prod_avg - test_avg:.1f} PII/doc (+{pct:.1f}%)")
    print()

    # Summarize the problems found in production.
    print("=" * 80)
    print("PROBLÈMES DÉTECTÉS EN PRODUCTION")
    print("=" * 80)
    print()

    total_ocr = sum(len(p["ocr_artifacts"]) for p in prod_problems)
    total_medical = sum(len(p["medical_overmasking"]) for p in prod_problems)
    total_medication = sum(len(p["medication_masking"]) for p in prod_problems)
    total_city = sum(len(p["city_overmasking"]) for p in prod_problems)

    print(f"1. ⚠️ ARTEFACTS OCR: {total_ocr} détectés")
    if total_ocr > 0:
        print(" Exemple:", prod_problems[0]["ocr_artifacts"][0]["text"] if prod_problems[0]["ocr_artifacts"] else "N/A")
    print()
    print(f"2. ⚠️ SUR-MASQUAGE MÉDICAL: {total_medical} détectés")
    if total_medical > 0:
        for p in prod_problems:
            for item in p["medical_overmasking"][:2]:
                print(f" • {item['masked']} → devrait être '{item['expected']}'")
    print()
    print(f"3. ⚠️ MÉDICAMENTS MASQUÉS: {total_medication} détectés")
    if total_medication > 0:
        print(" Exemple:", prod_problems[0]["medication_masking"][0]["text"] if prod_problems[0]["medication_masking"] else "N/A")
    print()
    print("4. ⚠️ DATES SUR-MASQUÉES:")
    for i, p in enumerate(prod_problems):
        if p["date_overmasking"]["total"] > 0:
            print(f" Doc {i+1}: {p['date_overmasking']['total']} dates masquées")
    print()
    print(f"5. ⚠️ VILLES SUR-MASQUÉES: {total_city} détectés")
    print()

    # Per-type breakdown, test vs production.
    print("=" * 80)
    print("RÉPARTITION PAR TYPE")
    print("=" * 80)
    print()

    test_by_type = defaultdict(int)
    for s in test_stats:
        for t, count in s["by_type"].items():
            test_by_type[t] += count

    prod_by_type = defaultdict(int)
    for s in prod_stats:
        for t, count in s["by_type"].items():
            prod_by_type[t] += count

    all_types = sorted(set(list(test_by_type.keys()) + list(prod_by_type.keys())))
    print(f"{'Type':<25} {'Test':<10} {'Prod':<10} {'Diff':<10}")
    print("-" * 60)
    for pii_type in all_types:
        test_count = test_by_type[pii_type]
        prod_count = prod_by_type[pii_type]
        diff = prod_count - test_count
        diff_str = f"+{diff}" if diff > 0 else str(diff)
        print(f"{pii_type:<25} {test_count:<10} {prod_count:<10} {diff_str:<10}")
    print()

    # Root causes (static analysis conclusions).
    print("=" * 80)
    print("CAUSES RACINES IDENTIFIÉES")
    print("=" * 80)
    print()
    print("1. ❌ QUALITÉ D'EXTRACTION OCR")
    print(" Cause: Paramètres docTR non optimaux")
    print(" Impact: Texte fragmenté, illisible")
    print(" Solution: Optimiser résolution, post-traitement")
    print()
    print("2. ❌ SUR-DÉTECTION DE NOMS")
    print(" Cause: Termes médicaux détectés comme noms propres")
    print(" Impact: Faux positifs massifs")
    print(" Solution: Enrichir stopwords médicaux")
    print()
    print("3. ❌ MASQUAGE DE MÉDICAMENTS")
    print(" Cause: NER détecte médicaments comme noms")
    print(" Impact: Perte d'information thérapeutique")
    print(" Solution: Whitelist médicaments")
    print()
    print("4. ❌ SUR-MASQUAGE TERMES MÉDICAUX")
    print(" Cause: Regex trop larges (RE_SERVICE, RE_ETABLISSEMENT)")
    print(" Impact: Perte de contexte médical")
    print(" Solution: Raffiner regex, whitelist termes")
    print()
    print("5. ⚠️ DIFFÉRENCE TEST vs PRODUCTION")
    print(" Cause: Documents production plus complexes (scannés, multi-pages)")
    print(" Impact: Plus de répétitions, plus d'artefacts OCR")
    print(" Solution: Dédoplication intelligente, meilleur OCR")
    print()


if __name__ == "__main__":
    compare_datasets()