Files
anonymisation/tools/root_cause_analysis.py
Domi31tls 92557d4e74 chore(rgpd): replace CHCB/Bayonne/Saint-Denis/Réunion refs in source + configs (D-12)
Anonymise toutes les références à des entités réelles (CHCB, Bayonne, Saint-Denis,
Réunion, etc.) dans le code source, les configurations YAML, les scripts/outils,
et les tests unitaires. Conserve les tests synthétiques (cases) intentionnels.

- profile key chcb_strict → chuxx_strict
- CHCB → CHUXX, Bayonne → Chicago, Saint-Denis → Springfield,
  Réunion → Province Bêta, 64100/97400 → 12345, FINESS → 999999999,
  préfixe tél 05.59.44 → 0X.XX.XX
- renomme tools/test_chcb_leak.py → tools/test_force_term_leak.py

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-06-02 14:39:21 +02:00

274 lines
9.7 KiB
Python

#!/usr/bin/env python3
"""
Analyse des causes racines de la régression de qualité.
Compare le test dataset (100% qualité) vs production (régression).
"""
import json
import re
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Tuple
def analyze_audit_file(audit_path: Path) -> Dict:
"""Analyse un fichier audit et retourne les statistiques."""
stats = {
"total_pii": 0,
"by_type": defaultdict(int),
"by_page": defaultdict(int),
"global_tokens": [],
"extracted_names": [],
"has_ocr_artifacts": False,
"has_medical_overmasking": False,
"has_medication_masking": False,
"has_date_overmasking": False,
}
with open(audit_path, 'r', encoding='utf-8') as f:
for line in f:
if not line.strip():
continue
entry = json.loads(line)
stats["total_pii"] += 1
pii_type = entry.get("kind", "UNKNOWN")
stats["by_type"][pii_type] += 1
page = entry.get("page", -1)
stats["by_page"][page] += 1
# Collecter les tokens globaux
if page == -1:
stats["global_tokens"].append({
"type": pii_type,
"value": entry.get("original", "")
})
# Détecter NOM_EXTRACTED
if pii_type == "NOM_EXTRACTED":
stats["extracted_names"].append(entry.get("original", ""))
return stats
def analyze_anonymized_text(text_path: Path) -> Dict:
"""Analyse le texte anonymisé pour détecter les problèmes."""
problems = {
"ocr_artifacts": [],
"medical_overmasking": [],
"medication_masking": [],
"date_overmasking": [],
"city_overmasking": [],
}
with open(text_path, 'r', encoding='utf-8') as f:
text = f.read()
# Détecter artefacts OCR (lettres espacées)
ocr_pattern = r'\b[A-Z]\s+[A-Z]\s+[a-z]\s+[a-z]'
for match in re.finditer(ocr_pattern, text):
context_start = max(0, match.start() - 50)
context_end = min(len(text), match.end() + 50)
problems["ocr_artifacts"].append({
"text": match.group(0),
"context": text[context_start:context_end]
})
# Détecter sur-masquage médical
medical_patterns = [
(r'Chef de \[MASK\]', "Chef de service"),
(r'Chef de \[ETABLISSEMENT\]', "Chef de Clinique"),
(r'Note \[NOM\]', "Note IDE"),
(r'Avis \[NOM\]', "Avis ORL"),
]
for pattern, expected in medical_patterns:
for match in re.finditer(pattern, text):
context_start = max(0, match.start() - 30)
context_end = min(len(text), match.end() + 30)
problems["medical_overmasking"].append({
"masked": match.group(0),
"expected": expected,
"context": text[context_start:context_end]
})
# Détecter masquage de médicaments
med_pattern = r'\[NOM\]\s+\d+\s*mg'
for match in re.finditer(med_pattern, text):
context_start = max(0, match.start() - 50)
context_end = min(len(text), match.end() + 50)
problems["medication_masking"].append({
"text": match.group(0),
"context": text[context_start:context_end]
})
# Compter les dates masquées
date_count = text.count("[DATE_NAISSANCE]")
date_generic_count = text.count("[DATE]")
problems["date_overmasking"] = {
"date_naissance_count": date_count,
"date_generic_count": date_generic_count,
"total": date_count + date_generic_count
}
# Détecter sur-masquage des villes
city_pattern = r'originaire du \[VILLE\]'
for match in re.finditer(city_pattern, text):
context_start = max(0, match.start() - 30)
context_end = min(len(text), match.end() + 30)
problems["city_overmasking"].append({
"text": match.group(0),
"context": text[context_start:context_end]
})
return problems
def compare_datasets():
"""Compare test dataset vs production."""
test_dir = Path("tests/ground_truth/pdfs/baseline_anonymized")
prod_dir = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHUXX_DocJustificatifs/anonymise")
print("=" * 80)
print("ANALYSE DES CAUSES RACINES - RÉGRESSION DE QUALITÉ")
print("=" * 80)
print()
# Analyser test dataset
print("📊 Analyse TEST DATASET (bonne qualité)...")
test_stats = []
for audit_file in sorted(test_dir.glob("*.audit.jsonl"))[:5]:
stats = analyze_audit_file(audit_file)
test_stats.append(stats)
print(f"{audit_file.name}: {stats['total_pii']} PII")
# Analyser production
print()
print("📊 Analyse PRODUCTION (régression)...")
prod_stats = []
prod_problems = []
for audit_file in sorted(prod_dir.glob("*.audit.jsonl"))[:5]:
stats = analyze_audit_file(audit_file)
prod_stats.append(stats)
print(f"{audit_file.name}: {stats['total_pii']} PII")
# Analyser le texte correspondant
text_file = audit_file.with_suffix('.txt').with_name(
audit_file.name.replace('.audit.jsonl', '.pseudonymise.txt')
)
if text_file.exists():
problems = analyze_anonymized_text(text_file)
prod_problems.append(problems)
# Calculer moyennes
test_avg = sum(s["total_pii"] for s in test_stats) / len(test_stats) if test_stats else 0
prod_avg = sum(s["total_pii"] for s in prod_stats) / len(prod_stats) if prod_stats else 0
print()
print("=" * 80)
print("RÉSULTATS")
print("=" * 80)
print(f" Test dataset: {test_avg:.1f} PII/doc")
print(f" Production: {prod_avg:.1f} PII/doc")
print(f" Différence: +{prod_avg - test_avg:.1f} PII/doc (+{((prod_avg - test_avg) / test_avg * 100):.1f}%)")
print()
# Analyser les problèmes
print("=" * 80)
print("PROBLÈMES DÉTECTÉS EN PRODUCTION")
print("=" * 80)
print()
total_ocr = sum(len(p["ocr_artifacts"]) for p in prod_problems)
total_medical = sum(len(p["medical_overmasking"]) for p in prod_problems)
total_medication = sum(len(p["medication_masking"]) for p in prod_problems)
total_city = sum(len(p["city_overmasking"]) for p in prod_problems)
print(f"1. ⚠️ ARTEFACTS OCR: {total_ocr} détectés")
if total_ocr > 0:
print(" Exemple:", prod_problems[0]["ocr_artifacts"][0]["text"] if prod_problems[0]["ocr_artifacts"] else "N/A")
print()
print(f"2. ⚠️ SUR-MASQUAGE MÉDICAL: {total_medical} détectés")
if total_medical > 0:
for p in prod_problems:
for item in p["medical_overmasking"][:2]:
print(f"{item['masked']} → devrait être '{item['expected']}'")
print()
print(f"3. ⚠️ MÉDICAMENTS MASQUÉS: {total_medication} détectés")
if total_medication > 0:
print(" Exemple:", prod_problems[0]["medication_masking"][0]["text"] if prod_problems[0]["medication_masking"] else "N/A")
print()
print(f"4. ⚠️ DATES SUR-MASQUÉES:")
for i, p in enumerate(prod_problems):
if p["date_overmasking"]["total"] > 0:
print(f" Doc {i+1}: {p['date_overmasking']['total']} dates masquées")
print()
print(f"5. ⚠️ VILLES SUR-MASQUÉES: {total_city} détectés")
print()
# Analyser répartition par type
print("=" * 80)
print("RÉPARTITION PAR TYPE")
print("=" * 80)
print()
test_by_type = defaultdict(int)
for s in test_stats:
for t, count in s["by_type"].items():
test_by_type[t] += count
prod_by_type = defaultdict(int)
for s in prod_stats:
for t, count in s["by_type"].items():
prod_by_type[t] += count
all_types = sorted(set(list(test_by_type.keys()) + list(prod_by_type.keys())))
print(f"{'Type':<25} {'Test':<10} {'Prod':<10} {'Diff':<10}")
print("-" * 60)
for pii_type in all_types:
test_count = test_by_type[pii_type]
prod_count = prod_by_type[pii_type]
diff = prod_count - test_count
diff_str = f"+{diff}" if diff > 0 else str(diff)
print(f"{pii_type:<25} {test_count:<10} {prod_count:<10} {diff_str:<10}")
print()
# Causes racines
print("=" * 80)
print("CAUSES RACINES IDENTIFIÉES")
print("=" * 80)
print()
print("1. ❌ QUALITÉ D'EXTRACTION OCR")
print(" Cause: Paramètres docTR non optimaux")
print(" Impact: Texte fragmenté, illisible")
print(" Solution: Optimiser résolution, post-traitement")
print()
print("2. ❌ SUR-DÉTECTION DE NOMS")
print(" Cause: Termes médicaux détectés comme noms propres")
print(" Impact: Faux positifs massifs")
print(" Solution: Enrichir stopwords médicaux")
print()
print("3. ❌ MASQUAGE DE MÉDICAMENTS")
print(" Cause: NER détecte médicaments comme noms")
print(" Impact: Perte d'information thérapeutique")
print(" Solution: Whitelist médicaments")
print()
print("4. ❌ SUR-MASQUAGE TERMES MÉDICAUX")
print(" Cause: Regex trop larges (RE_SERVICE, RE_ETABLISSEMENT)")
print(" Impact: Perte de contexte médical")
print(" Solution: Raffiner regex, whitelist termes")
print()
print("5. ⚠️ DIFFÉRENCE TEST vs PRODUCTION")
print(" Cause: Documents production plus complexes (scannés, multi-pages)")
print(" Impact: Plus de répétitions, plus d'artefacts OCR")
print(" Solution: Dédoplication intelligente, meilleur OCR")
print()
if __name__ == "__main__":
compare_datasets()