262 lines
10 KiB
Python
262 lines
10 KiB
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Analyse approfondie de la régression de qualité
Comparaison détaillée entre documents originaux et anonymisés
"""
import json
import re
from pathlib import Path
from typing import Dict, List, Tuple

import pdfplumber

def extract_original_text(pdf_path: str) -> str:
    """Return the text of every page of the original PDF, joined by newlines.

    Pages from which pdfplumber cannot extract text contribute an empty
    string, so the page count is preserved in the output.
    """
    pages: List[str] = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            pages.append(page.extract_text() or "")
    return "\n".join(pages)

def load_anonymized_text(txt_path: str) -> str:
    """Return the content of the anonymized text file, decoded as UTF-8."""
    with open(txt_path, 'r', encoding='utf-8') as handle:
        return handle.read()

def load_audit(audit_path: str) -> List[Dict]:
    """Parse a JSONL audit file into a list of dicts, skipping blank lines."""
    with open(audit_path, 'r', encoding='utf-8') as handle:
        return [json.loads(raw) for raw in handle if raw.strip()]

def analyze_masking_quality(original: str, anonymized: str, audit: List[Dict]) -> Dict:
    """Analyze masking quality by comparing original and anonymized text.

    Args:
        original: Raw text extracted from the source PDF.
        anonymized: Pseudonymized text containing mask tokens such as
            [NOM], [DATE], [DATE_NAISSANCE], [VILLE].
        audit: Audit-trail entries; each dict must have a 'kind' key.

    Returns:
        A dict of issue categories: OCR artifacts, over-masked medical
        terms / medications / dates / cities, suspected false positives,
        and overall text-quality degradation metrics.
    """

    issues = {
        "ocr_artifacts": [],
        "over_masked_medical_terms": [],
        "over_masked_medications": [],
        "over_masked_dates": [],
        "over_masked_cities": [],
        "legitimate_masking": [],
        "false_positives": [],
        "text_quality_degradation": []
    }

    # 1. Detect OCR artifacts (characters abnormally separated by spaces)
    ocr_patterns = [
        r'P Nr °a t Ric Pi Pen S',
        r'[A-Z]\s[a-z]\s[a-z]\s[a-z]',  # spaced-out letters
        r'\d\s\d\s\d\s\d',  # spaced-out digits
    ]
    for pattern in ocr_patterns:
        for match in re.finditer(pattern, anonymized):
            issues["ocr_artifacts"].append({
                "text": match.group(0),
                "position": match.start(),
                "context": anonymized[max(0, match.start()-30):match.end()+30]
            })

    # 2. Detect over-masked medical terms (role titles, note types, ...)
    # Each pair is (expected original wording, wrongly masked form).
    medical_terms_masked = [
        ("Chef de service", "Chef de [MASK]"),
        ("Chef de Clinique", "Chef de [ETABLISSEMENT]"),
        ("Note IDE", "[NOM] IDE"),
        ("Avis ORL", "[NOM] ORL"),
        ("Examen ORL", "[NOM] ORL"),
    ]
    for original_term, masked_term in medical_terms_masked:
        if masked_term in anonymized and original_term in original:
            issues["over_masked_medical_terms"].append({
                "original": original_term,
                "masked": masked_term,
                "count": anonymized.count(masked_term)
            })

    # 3. Detect over-masked medications ("[NOM] 500 mg"-style patterns)
    medication_pattern = r'\[NOM\]\s+\d+\s*mg'
    for match in re.finditer(medication_pattern, anonymized):
        # Keep surrounding context to help locate the original drug name.
        context_start = max(0, match.start() - 50)
        context_end = min(len(anonymized), match.end() + 50)
        context = anonymized[context_start:context_end]
        issues["over_masked_medications"].append({
            "masked": match.group(0),
            "context": context
        })

    # 4. Analyze masked dates
    date_masks = re.findall(r'\[DATE\]', anonymized)
    date_naissance_masks = re.findall(r'\[DATE_NAISSANCE\]', anonymized)

    # Count the dates present in the original document
    date_pattern = r'\b\d{1,2}[/.\-]\d{1,2}[/.\-]\d{2,4}\b'
    original_dates = re.findall(date_pattern, original)

    issues["over_masked_dates"] = {
        "total_date_masks": len(date_masks),
        "date_naissance_masks": len(date_naissance_masks),
        "original_dates_count": len(original_dates),
        "ratio": len(date_masks) / max(1, len(original_dates)),
        # Flag when generic [DATE] masks outnumber birth-date masks >5x
        "problem": len(date_masks) > len(date_naissance_masks) * 5
    }

    # 5. Analyze masked cities
    ville_masks = re.findall(r'\[VILLE\]', anonymized)
    issues["over_masked_cities"] = {
        "count": len(ville_masks),
        "contexts": []
    }
    for match in re.finditer(r'\[VILLE\]', anonymized):
        context_start = max(0, match.start() - 30)
        context_end = min(len(anonymized), match.end() + 30)
        issues["over_masked_cities"]["contexts"].append(
            anonymized[context_start:context_end]
        )

    # 6. Inspect the audit trail for suspected false positives
    nom_count = sum(1 for h in audit if h['kind'] == 'NOM')
    nom_global_count = sum(1 for h in audit if h['kind'] == 'NOM_GLOBAL')

    issues["false_positives"] = {
        "nom_count": nom_count,
        "nom_global_count": nom_global_count,
        "suspicious": nom_count > 50  # >50 names in one document is suspect
    }

    # 7. Compare overall text quality (whitespace-normalized lengths)
    original_clean = re.sub(r'\s+', ' ', original).strip()
    anonymized_clean = re.sub(r'\s+', ' ', anonymized).strip()

    # Ratio of preserved characters (excluding mask tokens).
    # BUG FIX: the previous code used len(re.findall(...)), which counts the
    # NUMBER of mask tokens, not the characters they occupy, making the
    # subtraction from a character length dimensionally wrong.
    mask_chars = sum(len(m) for m in re.findall(r'\[.*?\]', anonymized_clean))
    preserved_ratio = (len(anonymized_clean) - mask_chars) / max(1, len(original_clean))

    issues["text_quality_degradation"] = {
        "original_length": len(original_clean),
        "anonymized_length": len(anonymized_clean),
        "preserved_ratio": preserved_ratio,
        "degraded": preserved_ratio < 0.7  # less than 70% of the text preserved
    }

    return issues

def generate_report(issues: Dict, doc_name: str) -> str:
    """Render the issues produced by analyze_masking_quality as a text report.

    Args:
        issues: Issue dict as returned by analyze_masking_quality.
        doc_name: Document label shown in the report header.

    Returns:
        A human-readable multi-line report string (only non-empty issue
        categories are included).
    """
    report = []
    report.append(f"\n{'='*80}")
    report.append(f"ANALYSE DE RÉGRESSION - {doc_name}")
    report.append(f"{'='*80}\n")

    # OCR artifacts (show at most the first 3)
    if issues["ocr_artifacts"]:
        report.append(f"⚠️ ARTEFACTS OCR DÉTECTÉS: {len(issues['ocr_artifacts'])}")
        for i, artifact in enumerate(issues["ocr_artifacts"][:3], 1):
            report.append(f"  {i}. '{artifact['text']}'")
            report.append(f"     Contexte: ...{artifact['context']}...")
        report.append("")

    # Over-masked medical terms
    if issues["over_masked_medical_terms"]:
        report.append(f"⚠️ TERMES MÉDICAUX SUR-MASQUÉS: {len(issues['over_masked_medical_terms'])}")
        for term in issues["over_masked_medical_terms"]:
            report.append(f"  • '{term['original']}' → '{term['masked']}' ({term['count']}x)")
        report.append("")

    # Over-masked medications (show at most the first 3)
    if issues["over_masked_medications"]:
        report.append(f"⚠️ MÉDICAMENTS SUR-MASQUÉS: {len(issues['over_masked_medications'])}")
        for i, med in enumerate(issues["over_masked_medications"][:3], 1):
            report.append(f"  {i}. {med['masked']}")
            report.append(f"     Contexte: ...{med['context']}...")
        report.append("")

    # Over-masked dates
    if issues["over_masked_dates"]["problem"]:
        report.append(f"⚠️ DATES SUR-MASQUÉES:")
        report.append(f"  • Total [DATE]: {issues['over_masked_dates']['total_date_masks']}")
        report.append(f"  • [DATE_NAISSANCE]: {issues['over_masked_dates']['date_naissance_masks']}")
        report.append(f"  • Dates originales: {issues['over_masked_dates']['original_dates_count']}")
        report.append(f"  • Ratio: {issues['over_masked_dates']['ratio']:.1f}x")
        report.append(f"  • PROBLÈME: Toutes les dates sont masquées, pas seulement les dates de naissance!")
        report.append("")

    # Over-masked cities (show at most the first 3 contexts)
    if issues["over_masked_cities"]["count"] > 0:
        report.append(f"⚠️ VILLES SUR-MASQUÉES: {issues['over_masked_cities']['count']}")
        for i, ctx in enumerate(issues["over_masked_cities"]["contexts"][:3], 1):
            report.append(f"  {i}. ...{ctx}...")
        report.append("")

    # Suspected false positives
    if issues["false_positives"]["suspicious"]:
        report.append(f"⚠️ FAUX POSITIFS SUSPECTS:")
        report.append(f"  • NOM détectés: {issues['false_positives']['nom_count']}")
        # BUG FIX: the key was misspelled 'false_positifs', which raised a
        # KeyError whenever this branch was taken.
        report.append(f"  • NOM_GLOBAL: {issues['false_positives']['nom_global_count']}")
        report.append(f"  • PROBLÈME: Trop de noms détectés (>50), probablement des termes médicaux")
        report.append("")

    # Text-quality degradation
    if issues["text_quality_degradation"]["degraded"]:
        report.append(f"⚠️ DÉGRADATION QUALITÉ TEXTE:")
        report.append(f"  • Longueur originale: {issues['text_quality_degradation']['original_length']}")
        report.append(f"  • Longueur anonymisée: {issues['text_quality_degradation']['anonymized_length']}")
        report.append(f"  • Ratio préservé: {issues['text_quality_degradation']['preserved_ratio']:.1%}")
        report.append(f"  • PROBLÈME: Moins de 70% du texte préservé")
        report.append("")

    return "\n".join(report)

def main():
    """Run the regression analysis on a sample of documents and save a report."""
    # Source locations: original PDFs and their anonymized counterparts.
    original_dir = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)")
    anonymized_dir = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs/anonymise")

    # Documents to analyze: (relative path of the original PDF, base name).
    test_docs = [
        ("102_23056463/CRH 23056364.pdf", "CRH 23056364"),
    ]

    all_reports = []

    for original_rel, base_name in test_docs:
        print(f"\n🔍 Analyse de {base_name}...")

        original_path = original_dir / original_rel
        anonymized_txt = anonymized_dir / f"{base_name}.pseudonymise.txt"
        audit_file = anonymized_dir / f"{base_name}.audit.jsonl"

        # Skip the document as soon as one of its required artifacts is
        # missing, reporting only the first missing one (matches the
        # original check order: original, anonymized text, audit).
        missing_msg = None
        if not original_path.exists():
            missing_msg = f"  ❌ Original non trouvé: {original_path}"
        elif not anonymized_txt.exists():
            missing_msg = f"  ❌ Anonymisé non trouvé: {anonymized_txt}"
        elif not audit_file.exists():
            missing_msg = f"  ❌ Audit non trouvé: {audit_file}"
        if missing_msg is not None:
            print(missing_msg)
            continue

        # Extract, analyze and report.
        issues = analyze_masking_quality(
            extract_original_text(str(original_path)),
            load_anonymized_text(str(anonymized_txt)),
            load_audit(str(audit_file)),
        )
        report = generate_report(issues, base_name)

        all_reports.append(report)
        print(report)

    # Persist the combined report.
    output_file = Path(".kiro/specs/anonymization-quality-optimization/DEEP_REGRESSION_ANALYSIS.md")
    output_file.parent.mkdir(parents=True, exist_ok=True)

    output_file.write_text("\n\n".join(all_reports), encoding='utf-8')

    print(f"\n✅ Rapport sauvegardé: {output_file}")


if __name__ == "__main__":
    main()
|