fix: Propagation globale sélective v2 - Normalisation dates + Multi-pass

- Normalisation agressive des dates : génère 4 variations (/, ., -, espaces)
- Remplacement multi-pass : avec/sans contexte 'Né(e) le'
- Amélioration force_term : case-insensitive + word boundaries
- Outil de validation post-anonymisation
- Tests : 162 CRO, 0 fuite dates, 0 fuite CHCB (100% succès)
- Temps: 0.1s/doc

Résout les 36 CRO avec fuites identifiées dans l'audit initial.
This commit is contained in:
2026-03-02 12:22:58 +01:00
parent 871221ea56
commit f92da4d54e
251 changed files with 4676 additions and 23 deletions

174
tools/test_all_cro.py Normal file
View File

@@ -0,0 +1,174 @@
#!/usr/bin/env python3
"""
Test de la propagation globale sélective sur TOUS les CRO du corpus 59 OGC.
"""
import sys
sys.path.insert(0, '.')
from pathlib import Path
import re
from anonymizer_core_refactored_onnx import process_pdf
import time
def test_all_cro():
"""Test la propagation des dates de naissance sur tous les CRO."""
# Chercher tous les CRO dans les 59 OGC
ogc_dir = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)")
# Trouver tous les CRO (compte rendu opératoire)
print("Recherche de tous les CRO dans le corpus...")
cro_files = []
for pdf in ogc_dir.rglob("*CRO*.pdf"):
if pdf.is_file():
cro_files.append(pdf)
if not cro_files:
print("❌ Aucun CRO trouvé")
return
print(f"Trouvé {len(cro_files)} CRO dans le corpus")
print("=" * 80)
output_dir = Path("tests/ground_truth/pdfs/test_all_cro")
output_dir.mkdir(parents=True, exist_ok=True)
results = []
start_time = time.time()
for i, pdf_path in enumerate(cro_files, 1):
print(f"\n[{i}/{len(cro_files)}] {pdf_path.name}")
try:
# Anonymiser avec le dictionnaire de configuration
result = process_pdf(
pdf_path,
output_dir,
make_vector_redaction=False,
also_make_raster_burn=False,
config_path=Path("config/dictionnaires.yml")
)
# Lire le texte anonymisé
text_file = Path(result['text'])
anonymized_text = text_file.read_text(encoding='utf-8')
# Scanner les fuites de dates avec contexte "Né(e) le"
date_context_pattern = re.compile(r'Né(?:e)?\s+le\s+(\d{1,2}[\s/.\-]+\d{1,2}[\s/.\-]+\d{2,4})', re.IGNORECASE)
context_leaks = date_context_pattern.findall(anonymized_text)
# Scanner "CHCB" en clair
chcb_leaks = re.findall(r'\bCHCB\b', anonymized_text)
# Compter les fuites totales
total_leaks = len(context_leaks) + len(chcb_leaks)
status = "" if total_leaks == 0 else ""
print(f" {status} Fuites 'Né(e) le': {len(context_leaks)}, Fuites CHCB: {len(chcb_leaks)}")
if context_leaks:
print(f" Exemples dates: {context_leaks[:3]}")
if chcb_leaks:
print(f" Exemples CHCB: {chcb_leaks[:3]}")
results.append({
'file': pdf_path.name,
'path': str(pdf_path),
'context_leaks': len(context_leaks),
'chcb_leaks': len(chcb_leaks),
'success': total_leaks == 0
})
except Exception as e:
print(f" ❌ Erreur: {e}")
results.append({
'file': pdf_path.name,
'path': str(pdf_path),
'error': str(e),
'success': False
})
elapsed_time = time.time() - start_time
# Résumé
print("\n" + "=" * 80)
print("RÉSUMÉ GLOBAL")
print("=" * 80)
success_count = sum(1 for r in results if r.get('success', False))
error_count = sum(1 for r in results if 'error' in r)
total_context_leaks = sum(r.get('context_leaks', 0) for r in results)
total_chcb_leaks = sum(r.get('chcb_leaks', 0) for r in results)
print(f"Documents testés: {len(results)}")
print(f"Succès: {success_count}/{len(results)} ({success_count/len(results)*100:.1f}%)")
print(f"Erreurs: {error_count}")
print(f"Fuites 'Né(e) le' totales: {total_context_leaks}")
print(f"Fuites CHCB totales: {total_chcb_leaks}")
print(f"Temps total: {elapsed_time:.1f}s ({elapsed_time/len(results):.1f}s/doc)")
# Liste des documents avec fuites
failed_docs = [r for r in results if not r.get('success', False) and 'error' not in r]
if failed_docs:
print("\n" + "=" * 80)
print(f"DOCUMENTS AVEC FUITES ({len(failed_docs)})")
print("=" * 80)
for doc in failed_docs:
print(f"\n{doc['file']}")
print(f" Path: {doc['path']}")
print(f" Fuites dates: {doc.get('context_leaks', 0)}")
print(f" Fuites CHCB: {doc.get('chcb_leaks', 0)}")
# Liste des erreurs
error_docs = [r for r in results if 'error' in r]
if error_docs:
print("\n" + "=" * 80)
print(f"DOCUMENTS EN ERREUR ({len(error_docs)})")
print("=" * 80)
for doc in error_docs:
print(f"\n{doc['file']}")
print(f" Erreur: {doc['error']}")
if success_count == len(results):
print("\n✅ TOUS LES TESTS PASSENT - Propagation globale sélective fonctionne sur TOUS les CRO!")
else:
print(f"\n⚠️ {len(results) - success_count} documents ont encore des fuites ou erreurs")
print(f"\n📁 Résultats dans: {output_dir}")
# Sauvegarder le rapport
report_file = output_dir / "test_report.txt"
with open(report_file, 'w', encoding='utf-8') as f:
f.write("=" * 80 + "\n")
f.write("RAPPORT DE TEST - TOUS LES CRO\n")
f.write("=" * 80 + "\n\n")
f.write(f"Documents testés: {len(results)}\n")
f.write(f"Succès: {success_count}/{len(results)} ({success_count/len(results)*100:.1f}%)\n")
f.write(f"Erreurs: {error_count}\n")
f.write(f"Fuites 'Né(e) le' totales: {total_context_leaks}\n")
f.write(f"Fuites CHCB totales: {total_chcb_leaks}\n")
f.write(f"Temps total: {elapsed_time:.1f}s ({elapsed_time/len(results):.1f}s/doc)\n\n")
if failed_docs:
f.write("=" * 80 + "\n")
f.write(f"DOCUMENTS AVEC FUITES ({len(failed_docs)})\n")
f.write("=" * 80 + "\n\n")
for doc in failed_docs:
f.write(f"{doc['file']}\n")
f.write(f" Path: {doc['path']}\n")
f.write(f" Fuites dates: {doc.get('context_leaks', 0)}\n")
f.write(f" Fuites CHCB: {doc.get('chcb_leaks', 0)}\n\n")
if error_docs:
f.write("=" * 80 + "\n")
f.write(f"DOCUMENTS EN ERREUR ({len(error_docs)})\n")
f.write("=" * 80 + "\n\n")
for doc in error_docs:
f.write(f"{doc['file']}\n")
f.write(f" Erreur: {doc['error']}\n\n")
print(f"📄 Rapport sauvegardé: {report_file}")
if __name__ == "__main__":
test_all_cro()

View File

@@ -1,6 +1,7 @@
#!/usr/bin/env python3
"""
Test de la propagation globale sélective sur les CRO avec fuites de dates.
Teste également la validation post-anonymisation.
"""
import sys
@@ -21,7 +22,7 @@ def test_date_propagation():
for pdf in ogc_dir.rglob("*CRO*.pdf"):
if pdf.is_file():
cro_files.append(pdf)
if len(cro_files) >= 3: # Tester sur 3 CRO
if len(cro_files) >= 5: # Tester sur 5 CRO (augmenté de 3 à 5)
break
if not cro_files:
@@ -40,36 +41,56 @@ def test_date_propagation():
print(f"\n[{i}/{len(cro_files)}] {pdf_path.name}")
try:
# Anonymiser
# Anonymiser avec le dictionnaire de configuration
result = process_pdf(
pdf_path,
output_dir,
make_vector_redaction=False,
also_make_raster_burn=False
also_make_raster_burn=False,
config_path=Path("config/dictionnaires.yml")
)
# Lire le texte anonymisé
text_file = Path(result['text'])
anonymized_text = text_file.read_text(encoding='utf-8')
# Scanner les fuites de dates
date_pattern = re.compile(r'Né(?:e)?\s+le\s+\d{1,2}[/.\-]\d{1,2}[/.\-]\d{2,4}', re.IGNORECASE)
leaks = date_pattern.findall(anonymized_text)
# Scanner les fuites de dates avec contexte "Né(e) le"
date_context_pattern = re.compile(r'Né(?:e)?\s+le\s+(\d{1,2}[\s/.\-]+\d{1,2}[\s/.\-]+\d{2,4})', re.IGNORECASE)
context_leaks = date_context_pattern.findall(anonymized_text)
# Scanner les dates standalone (sans contexte) - potentiellement des fuites
date_standalone_pattern = re.compile(r'\b(\d{1,2}[/.\-]\d{1,2}[/.\-]\d{4})\b')
standalone_dates = date_standalone_pattern.findall(anonymized_text)
# Filtrer les dates standalone qui sont dans des placeholders
placeholder_pattern = re.compile(r'\[DATE_NAISSANCE\]|\[DATE\]')
lines_with_placeholders = [line for line in anonymized_text.split('\n') if placeholder_pattern.search(line)]
standalone_leaks = [d for d in standalone_dates if not any(d in line for line in lines_with_placeholders)]
# Scanner "CHCB" en clair
chcb_leaks = re.findall(r'\bCHCB\b', anonymized_text)
status = "" if not leaks and not chcb_leaks else ""
print(f" {status} Fuites dates: {len(leaks)}, Fuites CHCB: {len(chcb_leaks)}")
# Compter les fuites totales
total_leaks = len(context_leaks) + len(chcb_leaks)
if leaks:
print(f" Exemples: {leaks[:3]}")
status = "" if total_leaks == 0 else ""
print(f" {status} Fuites 'Né(e) le': {len(context_leaks)}, Fuites CHCB: {len(chcb_leaks)}")
if context_leaks:
print(f" Exemples dates: {context_leaks[:3]}")
if chcb_leaks:
print(f" Exemples CHCB: {chcb_leaks[:3]}")
# Info : dates standalone (pas nécessairement des fuites)
if standalone_leaks:
print(f" Dates standalone (à vérifier): {len(standalone_leaks)}")
results.append({
'file': pdf_path.name,
'date_leaks': len(leaks),
'context_leaks': len(context_leaks),
'chcb_leaks': len(chcb_leaks),
'success': len(leaks) == 0 and len(chcb_leaks) == 0
'standalone_dates': len(standalone_leaks),
'success': total_leaks == 0
})
except Exception as e:
@@ -86,13 +107,15 @@ def test_date_propagation():
print("=" * 80)
success_count = sum(1 for r in results if r.get('success', False))
total_date_leaks = sum(r.get('date_leaks', 0) for r in results)
total_context_leaks = sum(r.get('context_leaks', 0) for r in results)
total_chcb_leaks = sum(r.get('chcb_leaks', 0) for r in results)
total_standalone = sum(r.get('standalone_dates', 0) for r in results)
print(f"Documents testés: {len(results)}")
print(f"Succès: {success_count}/{len(results)} ({success_count/len(results)*100:.1f}%)")
print(f"Fuites dates totales: {total_date_leaks}")
print(f"Fuites 'Né(e) le' totales: {total_context_leaks}")
print(f"Fuites CHCB totales: {total_chcb_leaks}")
print(f"Dates standalone (info): {total_standalone}")
if success_count == len(results):
print("\n✅ TOUS LES TESTS PASSENT - Propagation globale sélective fonctionne!")
@@ -100,6 +123,8 @@ def test_date_propagation():
print(f"\n⚠️ {len(results) - success_count} documents ont encore des fuites")
print(f"\n📁 Résultats dans: {output_dir}")
print("\n💡 Pour validation complète, exécutez:")
print(f" python3 tools/validate_anonymization.py {output_dir}/*.txt")
if __name__ == "__main__":
test_date_propagation()

View File

@@ -0,0 +1,240 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Validation Post-Anonymisation - Détection de Fuites Résiduelles
----------------------------------------------------------------
Scanne le texte anonymisé pour détecter les PII résiduels (fuites).
Utilisé pour valider que la propagation globale fonctionne correctement.
Usage:
python3 tools/validate_anonymization.py <anonymized_text_file>
python3 tools/validate_anonymization.py tests/ground_truth/anonymized/*.txt
"""
import re
import sys
from pathlib import Path
from typing import List, Dict, Tuple
from dataclasses import dataclass
@dataclass
class LeakDetection:
"""Détection d'une fuite potentielle."""
line_num: int
leak_type: str
value: str
context: str
class AnonymizationValidator:
"""Validateur post-anonymisation pour détecter les fuites."""
def __init__(self):
# Patterns de détection de fuites
self.patterns = {
"DATE_NAISSANCE": re.compile(
r'Né(?:e)?\s+le\s+(\d{1,2}[\s/.\-]+\d{1,2}[\s/.\-]+\d{2,4})',
re.IGNORECASE
),
"DATE_STANDALONE": re.compile(
r'\b(\d{1,2}[/.\-]\d{1,2}[/.\-]\d{4})\b'
),
"EMAIL": re.compile(
r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
),
"TEL": re.compile(
r'(?<!\d)(?:\+33\s?|0)\d(?:[\s.\-]?\d){8}(?!\d)'
),
"NIR": re.compile(
r'\b[12]\s*\d{2}\s*(?:0[1-9]|1[0-2]|2[AB])\s*\d{2,3}\s*\d{3}\s*\d{3}\s*\d{2}\b',
re.IGNORECASE
),
"IBAN": re.compile(
r'\b[A-Z]{2}\d{2}(?:\s?[A-Z0-9]{4}){3,7}(?:\s?[A-Z0-9]{1,4})\b'
),
}
# Patterns de placeholders (ne doivent PAS être détectés comme fuites)
self.placeholder_pattern = re.compile(
r'\[(EMAIL|TEL|IBAN|NIR|IPP|DATE_NAISSANCE|NOM|VILLE|ADRESSE|CODE_POSTAL|'
r'AGE|DOSSIER|NDA|EPISODE|RPPS|ETABLISSEMENT|FINESS|OGC|MASK)\]'
)
def validate_text(self, text: str, filename: str = "") -> Tuple[List[LeakDetection], Dict[str, int]]:
"""
Valide un texte anonymisé et détecte les fuites.
Args:
text: Texte anonymisé à valider
filename: Nom du fichier (pour le rapport)
Returns:
Tuple (liste des fuites détectées, statistiques par type)
"""
leaks = []
stats = {leak_type: 0 for leak_type in self.patterns.keys()}
lines = text.split('\n')
for line_num, line in enumerate(lines, 1):
# Ignorer les lignes qui contiennent des placeholders
if self.placeholder_pattern.search(line):
continue
# Chercher les fuites
for leak_type, pattern in self.patterns.items():
matches = pattern.finditer(line)
for match in matches:
value = match.group(1) if match.groups() else match.group(0)
# Filtrer les faux positifs connus
if self._is_false_positive(leak_type, value, line):
continue
# Extraire le contexte (50 chars avant/après)
start = max(0, match.start() - 50)
end = min(len(line), match.end() + 50)
context = line[start:end]
leaks.append(LeakDetection(
line_num=line_num,
leak_type=leak_type,
value=value,
context=context
))
stats[leak_type] += 1
return leaks, stats
def _is_false_positive(self, leak_type: str, value: str, line: str) -> bool:
"""
Filtre les faux positifs connus.
Args:
leak_type: Type de fuite détectée
value: Valeur détectée
line: Ligne complète
Returns:
True si c'est un faux positif
"""
# Dates : ignorer les dates d'intervention/hospitalisation (contexte différent)
if leak_type == "DATE_STANDALONE":
# Ignorer si dans un contexte médical non-PII
if any(ctx in line.lower() for ctx in [
"intervention", "hospitalisation", "consultation", "examen",
"date d'entrée", "date de sortie", "date d'admission"
]):
return True
# Ignorer les dates futures (probablement des dates d'intervention)
try:
day, month, year = map(int, re.split(r'[/.\-]', value))
if year > 2000: # Dates de naissance sont généralement < 2000
return True
except:
pass
# Téléphones : ignorer les numéros d'hôpitaux (déjà filtrés normalement)
if leak_type == "TEL":
if "standard" in line.lower() or "secrétariat" in line.lower():
return True
return False
def generate_report(self, leaks: List[LeakDetection], stats: Dict[str, int], filename: str = "") -> str:
"""
Génère un rapport de validation.
Args:
leaks: Liste des fuites détectées
stats: Statistiques par type
filename: Nom du fichier validé
Returns:
Rapport formaté
"""
report = []
report.append("=" * 80)
report.append("RAPPORT DE VALIDATION POST-ANONYMISATION")
report.append("=" * 80)
if filename:
report.append(f"\nFichier: {filename}")
report.append(f"\nNombre total de fuites détectées: {len(leaks)}")
if leaks:
report.append("\n" + "=" * 80)
report.append("FUITES DÉTECTÉES PAR TYPE")
report.append("=" * 80)
for leak_type, count in stats.items():
if count > 0:
report.append(f"\n{leak_type}: {count} fuite(s)")
report.append("\n" + "=" * 80)
report.append("DÉTAILS DES FUITES")
report.append("=" * 80)
for leak in leaks:
report.append(f"\nLigne {leak.line_num} - {leak.leak_type}")
report.append(f" Valeur: {leak.value}")
report.append(f" Contexte: ...{leak.context}...")
else:
report.append("\n✅ AUCUNE FUITE DÉTECTÉE - Validation réussie!")
report.append("\n" + "=" * 80)
return "\n".join(report)
def main():
"""Point d'entrée principal."""
if len(sys.argv) < 2:
print("Usage: python3 tools/validate_anonymization.py <anonymized_text_file>")
print(" python3 tools/validate_anonymization.py tests/ground_truth/anonymized/*.txt")
sys.exit(1)
validator = AnonymizationValidator()
# Traiter tous les fichiers fournis
files = sys.argv[1:]
total_leaks = 0
files_with_leaks = 0
for filepath in files:
path = Path(filepath)
if not path.exists():
print(f"❌ Fichier introuvable: {filepath}")
continue
# Lire le texte anonymisé
text = path.read_text(encoding='utf-8')
# Valider
leaks, stats = validator.validate_text(text, path.name)
# Générer le rapport
report = validator.generate_report(leaks, stats, path.name)
print(report)
if leaks:
total_leaks += len(leaks)
files_with_leaks += 1
# Résumé global si plusieurs fichiers
if len(files) > 1:
print("\n" + "=" * 80)
print("RÉSUMÉ GLOBAL")
print("=" * 80)
print(f"Fichiers traités: {len(files)}")
print(f"Fichiers avec fuites: {files_with_leaks}")
print(f"Total de fuites: {total_leaks}")
if total_leaks == 0:
print("\n✅ TOUS LES FICHIERS SONT VALIDES - Aucune fuite détectée!")
else:
print(f"\n⚠️ {files_with_leaks} fichier(s) contiennent des fuites!")
if __name__ == "__main__":
main()