Files
anonymisation/tools/deep_quality_regression_analysis.py

262 lines
10 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Analyse approfondie de la régression de qualité
Comparaison détaillée entre documents originaux et anonymisés
"""
import json
import re
from pathlib import Path
from typing import Dict, List, Tuple
import pdfplumber
def extract_original_text(pdf_path: str) -> str:
    """Extract the text of the original PDF, one extracted page per line.

    Pages where pdfplumber returns ``None`` contribute an empty string.
    """
    pages: List[str] = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            pages.append(page.extract_text() or "")
    return "\n".join(pages)
def load_anonymized_text(txt_path: str) -> str:
    """Return the full contents of the anonymised text file (UTF-8)."""
    with open(txt_path, 'r', encoding='utf-8') as fh:
        return fh.read()
def load_audit(audit_path: str) -> List[Dict]:
    """Load the audit file: one JSON object per non-blank line (JSONL)."""
    with open(audit_path, 'r', encoding='utf-8') as f:
        return [json.loads(line) for line in f if line.strip()]
def analyze_masking_quality(original: str, anonymized: str, audit: List[Dict]) -> Dict:
    """Analyse masking quality by comparing original and anonymised texts.

    Args:
        original: Raw text extracted from the source PDF.
        anonymized: Text produced by the anonymisation pipeline.
        audit: Audit trail entries; each dict is expected to carry a 'kind' key.

    Returns:
        A dict keyed by issue category: OCR artifacts, over-masked medical
        terms / medications / dates / cities, suspected false positives, and
        overall text-quality degradation.
    """
    issues = {
        "ocr_artifacts": [],
        "over_masked_medical_terms": [],
        "over_masked_medications": [],
        "over_masked_dates": [],
        "over_masked_cities": [],
        "legitimate_masking": [],
        "false_positives": [],
        "text_quality_degradation": []
    }
    # 1. Detect OCR artifacts (spaced-out letters/digits typical of bad OCR)
    ocr_patterns = [
        r'P Nr °a t Ric Pi Pen S',
        r'[A-Z]\s[a-z]\s[a-z]\s[a-z]',  # Spaced-out letters
        r'\d\s\d\s\d\s\d',  # Spaced-out digits
    ]
    for pattern in ocr_patterns:
        for match in re.finditer(pattern, anonymized):
            issues["ocr_artifacts"].append({
                "text": match.group(0),
                "position": match.start(),
                "context": anonymized[max(0, match.start()-30):match.end()+30]
            })
    # 2. Detect over-masked medical terms (role/section labels hit by NER)
    medical_terms_masked = [
        ("Chef de service", "Chef de [MASK]"),
        ("Chef de Clinique", "Chef de [ETABLISSEMENT]"),
        ("Note IDE", "[NOM] IDE"),
        ("Avis ORL", "[NOM] ORL"),
        ("Examen ORL", "[NOM] ORL"),
    ]
    for original_term, masked_term in medical_terms_masked:
        if masked_term in anonymized and original_term in original:
            issues["over_masked_medical_terms"].append({
                "original": original_term,
                "masked": masked_term,
                "count": anonymized.count(masked_term)
            })
    # 3. Detect over-masked medication names, e.g. "[NOM] 500 mg"
    medication_pattern = r'\[NOM\]\s+\d+\s*mg'
    for match in re.finditer(medication_pattern, anonymized):
        # Keep surrounding context to help locate the original drug name
        context_start = max(0, match.start() - 50)
        context_end = min(len(anonymized), match.end() + 50)
        context = anonymized[context_start:context_end]
        issues["over_masked_medications"].append({
            "masked": match.group(0),
            "context": context
        })
    # 4. Analyse masked dates
    date_masks = re.findall(r'\[DATE\]', anonymized)
    date_naissance_masks = re.findall(r'\[DATE_NAISSANCE\]', anonymized)
    # Count the dates present in the original text
    date_pattern = r'\b\d{1,2}[/.\-]\d{1,2}[/.\-]\d{2,4}\b'
    original_dates = re.findall(date_pattern, original)
    issues["over_masked_dates"] = {
        "total_date_masks": len(date_masks),
        "date_naissance_masks": len(date_naissance_masks),
        "original_dates_count": len(original_dates),
        "ratio": len(date_masks) / max(1, len(original_dates)),
        "problem": len(date_masks) > len(date_naissance_masks) * 5  # >5x more dates than birth dates
    }
    # 5. Analyse masked cities
    ville_masks = re.findall(r'\[VILLE\]', anonymized)
    issues["over_masked_cities"] = {
        "count": len(ville_masks),
        "contexts": []
    }
    for match in re.finditer(r'\[VILLE\]', anonymized):
        context_start = max(0, match.start() - 30)
        context_end = min(len(anonymized), match.end() + 30)
        issues["over_masked_cities"]["contexts"].append(
            anonymized[context_start:context_end]
        )
    # 6. Mine the audit trail for suspected false positives
    nom_count = sum(1 for h in audit if h['kind'] == 'NOM')
    nom_global_count = sum(1 for h in audit if h['kind'] == 'NOM_GLOBAL')
    issues["false_positives"] = {
        "nom_count": nom_count,
        "nom_global_count": nom_global_count,
        "suspicious": nom_count > 50  # More than 50 names in one document is suspect
    }
    # 7. Compare overall text quality
    original_clean = re.sub(r'\s+', ' ', original).strip()
    anonymized_clean = re.sub(r'\s+', ' ', anonymized).strip()
    # Ratio of preserved characters, excluding mask tokens.
    # BUGFIX: len(re.findall(...)) counted the NUMBER of mask tokens, not the
    # characters they occupy, which overstated the preserved ratio; sum the
    # token lengths so mask text is actually subtracted.
    mask_chars = sum(len(m) for m in re.findall(r'\[.*?\]', anonymized_clean))
    preserved_ratio = (len(anonymized_clean) - mask_chars) / max(1, len(original_clean))
    issues["text_quality_degradation"] = {
        "original_length": len(original_clean),
        "anonymized_length": len(anonymized_clean),
        "preserved_ratio": preserved_ratio,
        "degraded": preserved_ratio < 0.7  # Flag when <70% of the text is preserved
    }
    return issues
def generate_report(issues: Dict, doc_name: str) -> str:
    """Build a human-readable regression report from the issues dict.

    Args:
        issues: Output of ``analyze_masking_quality``.
        doc_name: Document label used in the report header.

    Returns:
        The report as a single newline-joined string.
    """
    report = []
    report.append(f"\n{'='*80}")
    report.append(f"ANALYSE DE RÉGRESSION - {doc_name}")
    report.append(f"{'='*80}\n")
    # OCR artifacts
    if issues["ocr_artifacts"]:
        report.append(f"⚠️ ARTEFACTS OCR DÉTECTÉS: {len(issues['ocr_artifacts'])}")
        for i, artifact in enumerate(issues["ocr_artifacts"][:3], 1):
            report.append(f" {i}. '{artifact['text']}'")
            report.append(f" Contexte: ...{artifact['context']}...")
        report.append("")
    # Over-masked medical terms
    if issues["over_masked_medical_terms"]:
        report.append(f"⚠️ TERMES MÉDICAUX SUR-MASQUÉS: {len(issues['over_masked_medical_terms'])}")
        for term in issues["over_masked_medical_terms"]:
            # BUGFIX: the original/masked pair was printed with no separator
            # ("'a''b'"); restore the arrow between the two forms.
            report.append(f"'{term['original']}' → '{term['masked']}' ({term['count']}x)")
        report.append("")
    # Over-masked medications
    if issues["over_masked_medications"]:
        report.append(f"⚠️ MÉDICAMENTS SUR-MASQUÉS: {len(issues['over_masked_medications'])}")
        for i, med in enumerate(issues["over_masked_medications"][:3], 1):
            report.append(f" {i}. {med['masked']}")
            report.append(f" Contexte: ...{med['context']}...")
        report.append("")
    # Over-masked dates
    if issues["over_masked_dates"]["problem"]:
        report.append(f"⚠️ DATES SUR-MASQUÉES:")
        report.append(f" • Total [DATE]: {issues['over_masked_dates']['total_date_masks']}")
        report.append(f" • [DATE_NAISSANCE]: {issues['over_masked_dates']['date_naissance_masks']}")
        report.append(f" • Dates originales: {issues['over_masked_dates']['original_dates_count']}")
        report.append(f" • Ratio: {issues['over_masked_dates']['ratio']:.1f}x")
        report.append(f" • PROBLÈME: Toutes les dates sont masquées, pas seulement les dates de naissance!")
        report.append("")
    # Over-masked cities
    if issues["over_masked_cities"]["count"] > 0:
        report.append(f"⚠️ VILLES SUR-MASQUÉES: {issues['over_masked_cities']['count']}")
        for i, ctx in enumerate(issues["over_masked_cities"]["contexts"][:3], 1):
            report.append(f" {i}. ...{ctx}...")
        report.append("")
    # Suspected false positives
    if issues["false_positives"]["suspicious"]:
        report.append(f"⚠️ FAUX POSITIFS SUSPECTS:")
        report.append(f" • NOM détectés: {issues['false_positives']['nom_count']}")
        # BUGFIX: this lookup used the misspelled key 'false_positifs' and
        # raised KeyError whenever the 'suspicious' flag was set.
        report.append(f" • NOM_GLOBAL: {issues['false_positives']['nom_global_count']}")
        report.append(f" • PROBLÈME: Trop de noms détectés (>50), probablement des termes médicaux")
        report.append("")
    # Text-quality degradation
    if issues["text_quality_degradation"]["degraded"]:
        report.append(f"⚠️ DÉGRADATION QUALITÉ TEXTE:")
        report.append(f" • Longueur originale: {issues['text_quality_degradation']['original_length']}")
        report.append(f" • Longueur anonymisée: {issues['text_quality_degradation']['anonymized_length']}")
        report.append(f" • Ratio préservé: {issues['text_quality_degradation']['preserved_ratio']:.1%}")
        report.append(f" • PROBLÈME: Moins de 70% du texte préservé")
        report.append("")
    return "\n".join(report)
def main():
    """Analyse a sample of documents and save a combined report."""
    # Input locations (hard-coded sample paths)
    original_dir = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)")
    anonymized_dir = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs/anonymise")
    # Documents to analyse: (relative path of the original, base name)
    test_docs = [
        ("102_23056463/CRH 23056364.pdf", "CRH 23056364"),
    ]
    all_reports = []
    for original_rel, base_name in test_docs:
        print(f"\n🔍 Analyse de {base_name}...")
        original_path = original_dir / original_rel
        anonymized_txt = anonymized_dir / f"{base_name}.pseudonymise.txt"
        audit_file = anonymized_dir / f"{base_name}.audit.jsonl"
        # Verify all three inputs exist; report the first one missing and
        # skip this document (for/else: the else runs only if no break).
        required = (
            (original_path, "Original"),
            (anonymized_txt, "Anonymisé"),
            (audit_file, "Audit"),
        )
        for path, label in required:
            if not path.exists():
                print(f" ❌ {label} non trouvé: {path}")
                break
        else:
            # Extract, analyse and report
            issues = analyze_masking_quality(
                extract_original_text(str(original_path)),
                load_anonymized_text(str(anonymized_txt)),
                load_audit(str(audit_file)),
            )
            report = generate_report(issues, base_name)
            all_reports.append(report)
            print(report)
    # Persist the combined report
    output_file = Path(".kiro/specs/anonymization-quality-optimization/DEEP_REGRESSION_ANALYSIS.md")
    output_file.parent.mkdir(parents=True, exist_ok=True)
    output_file.write_text("\n\n".join(all_reports), encoding='utf-8')
    print(f"\n✅ Rapport sauvegardé: {output_file}")


if __name__ == "__main__":
    main()