docs: Analyse complète de la régression de qualité - Causes racines identifiées

This commit is contained in:
2026-03-02 23:09:25 +01:00
parent eb797a4761
commit dfa6e2957b
5 changed files with 930 additions and 3 deletions

View File

@@ -0,0 +1,172 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Comparaison entre test dataset (100% qualité) et production (régression)
Identifie les différences de traitement
"""
import json
from pathlib import Path
from typing import Dict, List
import re
def analyze_audit_file(audit_path: Path) -> Dict:
    """Compute summary statistics for one JSONL audit file.

    Every non-blank line is parsed as a JSON hit record. Hits are
    tallied per kind and per page (missing pages counted under -1),
    plus two special counters: hits whose kind ends in "_GLOBAL" and
    "NOM_EXTRACTED" hits.
    """
    hits = []
    with open(audit_path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            if raw_line.strip():
                hits.append(json.loads(raw_line))

    summary = {
        "total": len(hits),
        "by_kind": {},
        "by_page": {},
        "global_tokens": 0,
        "extracted_tokens": 0,
    }
    for hit in hits:
        kind = hit['kind']
        page = hit.get('page', -1)
        summary["by_kind"][kind] = summary["by_kind"].get(kind, 0) + 1
        summary["by_page"][page] = summary["by_page"].get(page, 0) + 1
        if kind.endswith("_GLOBAL"):
            summary["global_tokens"] += 1
        if kind == "NOM_EXTRACTED":
            summary["extracted_tokens"] += 1
    return summary
def compare_datasets():
    """Compare anonymization audit statistics: test dataset vs production.

    Reads up to five *.audit.jsonl files from each location, prints
    per-document and averaged PII counts, the deltas between the two
    runs, the top-10 per-kind differences, and a heuristic problem list.
    Paths are hard-coded for this one-off regression analysis.
    """
    # Test dataset (known-good quality)
    test_dir = Path("tests/ground_truth/pdfs/baseline_anonymized")
    # Production (suspected regression)
    prod_dir = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs/anonymise")
    print("\n" + "="*80)
    print("COMPARAISON TEST DATASET vs PRODUCTION")
    print("="*80 + "\n")
    # Analyse the test dataset
    print("📊 Analyse TEST DATASET (bonne qualité)...")
    test_audits = list(test_dir.glob("*.audit.jsonl"))
    test_stats_all = []
    for audit_file in test_audits[:5]:  # first 5 documents only
        stats = analyze_audit_file(audit_file)
        test_stats_all.append(stats)
        print(f"{audit_file.name}: {stats['total']} PII, {stats['global_tokens']} global, {stats['extracted_tokens']} extracted")
    # BUGFIX: an empty/missing directory previously crashed the averaging
    # below with ZeroDivisionError (division by len(test_stats_all)).
    if not test_stats_all:
        print(f"❌ Aucun fichier *.audit.jsonl trouvé dans {test_dir}")
        return
    # Test-dataset averages
    test_avg = {
        "total": sum(s["total"] for s in test_stats_all) / len(test_stats_all),
        "global": sum(s["global_tokens"] for s in test_stats_all) / len(test_stats_all),
        "extracted": sum(s["extracted_tokens"] for s in test_stats_all) / len(test_stats_all),
    }
    print(f"\n Moyennes TEST:")
    print(f" - PII/doc: {test_avg['total']:.1f}")
    print(f" - Global/doc: {test_avg['global']:.1f}")
    print(f" - Extracted/doc: {test_avg['extracted']:.1f}")
    # Analyse production
    print("\n📊 Analyse PRODUCTION (régression)...")
    prod_audits = list(prod_dir.glob("*.audit.jsonl"))
    prod_stats_all = []
    for audit_file in prod_audits[:5]:  # first 5 documents only
        stats = analyze_audit_file(audit_file)
        prod_stats_all.append(stats)
        print(f"{audit_file.name}: {stats['total']} PII, {stats['global_tokens']} global, {stats['extracted_tokens']} extracted")
    # Same empty-directory guard for the production side.
    if not prod_stats_all:
        print(f"❌ Aucun fichier *.audit.jsonl trouvé dans {prod_dir}")
        return
    # Production averages
    prod_avg = {
        "total": sum(s["total"] for s in prod_stats_all) / len(prod_stats_all),
        "global": sum(s["global_tokens"] for s in prod_stats_all) / len(prod_stats_all),
        "extracted": sum(s["extracted_tokens"] for s in prod_stats_all) / len(prod_stats_all),
    }
    print(f"\n Moyennes PRODUCTION:")
    print(f" - PII/doc: {prod_avg['total']:.1f}")
    print(f" - Global/doc: {prod_avg['global']:.1f}")
    print(f" - Extracted/doc: {prod_avg['extracted']:.1f}")
    # Differences between the two runs
    print("\n" + "="*80)
    print("DIFFÉRENCES")
    print("="*80)
    diff_total = prod_avg['total'] - test_avg['total']
    diff_global = prod_avg['global'] - test_avg['global']
    diff_extracted = prod_avg['extracted'] - test_avg['extracted']
    # CONSISTENCY: use the same max(1, ...) guard as the two lines below,
    # so a zero test average cannot raise ZeroDivisionError.
    print(f"\n PII/doc: {diff_total:+.1f} ({diff_total/max(1, test_avg['total'])*100:+.1f}%)")
    print(f" Global/doc: {diff_global:+.1f} ({diff_global/max(1,test_avg['global'])*100:+.1f}%)")
    print(f" Extracted/doc: {diff_extracted:+.1f} ({diff_extracted/max(1,test_avg['extracted'])*100:+.1f}%)")
    # Per-kind PII breakdown
    print("\n" + "="*80)
    print("RÉPARTITION PAR TYPE")
    print("="*80)
    # Aggregate kinds over the test dataset
    test_by_kind = {}
    for stats in test_stats_all:
        for kind, count in stats["by_kind"].items():
            test_by_kind[kind] = test_by_kind.get(kind, 0) + count
    # Aggregate kinds over production
    prod_by_kind = {}
    for stats in prod_stats_all:
        for kind, count in stats["by_kind"].items():
            prod_by_kind[kind] = prod_by_kind.get(kind, 0) + count
    # Top 10 kinds by absolute delta
    all_kinds = set(test_by_kind.keys()) | set(prod_by_kind.keys())
    kind_diffs = []
    for kind in all_kinds:
        test_count = test_by_kind.get(kind, 0)
        prod_count = prod_by_kind.get(kind, 0)
        diff = prod_count - test_count
        kind_diffs.append((kind, test_count, prod_count, diff))
    kind_diffs.sort(key=lambda x: abs(x[3]), reverse=True)
    print("\n Top 10 différences:")
    print(f" {'Type':<25} {'Test':<10} {'Prod':<10} {'Diff':<10}")
    print(f" {'-'*60}")
    for kind, test_c, prod_c, diff in kind_diffs[:10]:
        # BUGFIX: the spec was '+<10', which uses '+' as the FILL character
        # (e.g. '5+++++++++'); '<+10' is the intended left-aligned signed number.
        print(f" {kind:<25} {test_c:<10} {prod_c:<10} {diff:<+10}")
    # Heuristic problem detection
    print("\n" + "="*80)
    print("PROBLÈMES IDENTIFIÉS")
    print("="*80 + "\n")
    problems = []
    # NOM_EXTRACTED should be disabled in production
    if prod_avg['extracted'] > 0:
        problems.append("⚠️ NOM_EXTRACTED activé en production (devrait être désactivé)")
    # Production emits more than twice as many *_GLOBAL tokens
    if prod_avg['global'] > test_avg['global'] * 2:
        problems.append(f"⚠️ Trop de tokens _GLOBAL en production ({prod_avg['global']:.1f} vs {test_avg['global']:.1f})")
    # Production detects 1.5x+ more PII overall
    if prod_avg['total'] > test_avg['total'] * 1.5:
        problems.append(f"⚠️ Trop de PII détectés en production ({prod_avg['total']:.1f} vs {test_avg['total']:.1f})")
    if problems:
        for p in problems:
            print(f" {p}")
    else:
        print(" ✅ Aucun problème majeur détecté")


if __name__ == "__main__":
    compare_datasets()

View File

@@ -0,0 +1,261 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Analyse approfondie de la régression de qualité
Comparaison détaillée entre documents originaux et anonymisés
"""
import json
import re
from pathlib import Path
from typing import Dict, List, Tuple
import pdfplumber
def extract_original_text(pdf_path: str) -> str:
    """Return the text of all pages of the original PDF, newline-joined.

    Pages where pdfplumber extracts no text contribute an empty string.
    """
    page_texts = []
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            page_texts.append(page.extract_text() or "")
    return "\n".join(page_texts)
def load_anonymized_text(txt_path: str) -> str:
    """Read the anonymized text file as UTF-8 and return its contents."""
    with open(txt_path, 'r', encoding='utf-8') as handle:
        return handle.read()
def load_audit(audit_path: str) -> List[Dict]:
    """Parse a JSONL audit file into a list of dicts; blank lines are skipped."""
    with open(audit_path, 'r', encoding='utf-8') as handle:
        return [json.loads(line) for line in handle if line.strip()]
def analyze_masking_quality(original: str, anonymized: str, audit: List[Dict]) -> Dict:
    """Analyse masking quality of an anonymized document.

    Compares the original text, the anonymized text and the audit trail,
    and returns a dict of issue categories: OCR artifacts, over-masked
    medical terms / medications / dates / cities, suspected false
    positives, and overall text-quality degradation.
    """
    issues = {
        "ocr_artifacts": [],
        "over_masked_medical_terms": [],
        "over_masked_medications": [],
        "over_masked_dates": [],
        "over_masked_cities": [],
        "legitimate_masking": [],
        "false_positives": [],
        "text_quality_degradation": []
    }
    # 1. Detect OCR artifacts (characters split apart by stray spaces)
    ocr_patterns = [
        r'P Nr °a t Ric Pi Pen S',
        r'[A-Z]\s[a-z]\s[a-z]\s[a-z]',  # spaced-out letters
        r'\d\s\d\s\d\s\d',  # spaced-out digits
    ]
    for pattern in ocr_patterns:
        for match in re.finditer(pattern, anonymized):
            issues["ocr_artifacts"].append({
                "text": match.group(0),
                "position": match.start(),
                "context": anonymized[max(0, match.start()-30):match.end()+30]
            })
    # 2. Detect over-masked medical terms (job titles / note headers
    #    mistaken for person names or establishments)
    medical_terms_masked = [
        ("Chef de service", "Chef de [MASK]"),
        ("Chef de Clinique", "Chef de [ETABLISSEMENT]"),
        ("Note IDE", "[NOM] IDE"),
        ("Avis ORL", "[NOM] ORL"),
        ("Examen ORL", "[NOM] ORL"),
    ]
    for original_term, masked_term in medical_terms_masked:
        if masked_term in anonymized and original_term in original:
            issues["over_masked_medical_terms"].append({
                "original": original_term,
                "masked": masked_term,
                "count": anonymized.count(masked_term)
            })
    # 3. Detect over-masked medications ("[NOM] 500 mg" style)
    medication_pattern = r'\[NOM\]\s+\d+\s*mg'
    for match in re.finditer(medication_pattern, anonymized):
        context_start = max(0, match.start() - 50)
        context_end = min(len(anonymized), match.end() + 50)
        context = anonymized[context_start:context_end]
        issues["over_masked_medications"].append({
            "masked": match.group(0),
            "context": context
        })
    # 4. Analyse masked dates vs dates present in the original
    date_masks = re.findall(r'\[DATE\]', anonymized)
    date_naissance_masks = re.findall(r'\[DATE_NAISSANCE\]', anonymized)
    date_pattern = r'\b\d{1,2}[/.\-]\d{1,2}[/.\-]\d{2,4}\b'
    original_dates = re.findall(date_pattern, original)
    issues["over_masked_dates"] = {
        "total_date_masks": len(date_masks),
        "date_naissance_masks": len(date_naissance_masks),
        "original_dates_count": len(original_dates),
        "ratio": len(date_masks) / max(1, len(original_dates)),
        # flagged when generic [DATE] masks outnumber birth dates >5x
        "problem": len(date_masks) > len(date_naissance_masks) * 5
    }
    # 5. Analyse masked cities, keeping surrounding context for review
    ville_masks = re.findall(r'\[VILLE\]', anonymized)
    issues["over_masked_cities"] = {
        "count": len(ville_masks),
        "contexts": []
    }
    for match in re.finditer(r'\[VILLE\]', anonymized):
        context_start = max(0, match.start() - 30)
        context_end = min(len(anonymized), match.end() + 30)
        issues["over_masked_cities"]["contexts"].append(
            anonymized[context_start:context_end]
        )
    # 6. Mine the audit trail for suspected false positives
    nom_count = sum(1 for h in audit if h['kind'] == 'NOM')
    nom_global_count = sum(1 for h in audit if h['kind'] == 'NOM_GLOBAL')
    issues["false_positives"] = {
        "nom_count": nom_count,
        "nom_global_count": nom_global_count,
        "suspicious": nom_count > 50  # >50 names in one document is suspect
    }
    # 7. Compare overall text quality after whitespace normalization
    original_clean = re.sub(r'\s+', ' ', original).strip()
    anonymized_clean = re.sub(r'\s+', ' ', anonymized).strip()
    # Ratio of characters preserved (excluding the mask tokens).
    # BUGFIX: previously counted the NUMBER of mask tokens instead of the
    # characters they occupy, which wildly overstated the preserved ratio.
    mask_chars = sum(len(m) for m in re.findall(r'\[.*?\]', anonymized_clean))
    preserved_ratio = (len(anonymized_clean) - mask_chars) / max(1, len(original_clean))
    issues["text_quality_degradation"] = {
        "original_length": len(original_clean),
        "anonymized_length": len(anonymized_clean),
        "preserved_ratio": preserved_ratio,
        "degraded": preserved_ratio < 0.7  # less than 70% of the text kept
    }
    return issues
def generate_report(issues: Dict, doc_name: str) -> str:
    """Render the issue dict produced by analyze_masking_quality as a
    human-readable, sectioned text report for document *doc_name*.

    Only sections with findings are emitted; list-style sections show at
    most their first three items. Returns the joined report string.
    """
    report = []
    report.append(f"\n{'='*80}")
    report.append(f"ANALYSE DE RÉGRESSION - {doc_name}")
    report.append(f"{'='*80}\n")
    # OCR artifacts
    if issues["ocr_artifacts"]:
        report.append(f"⚠️ ARTEFACTS OCR DÉTECTÉS: {len(issues['ocr_artifacts'])}")
        for i, artifact in enumerate(issues["ocr_artifacts"][:3], 1):
            report.append(f" {i}. '{artifact['text']}'")
            report.append(f" Contexte: ...{artifact['context']}...")
        report.append("")
    # Over-masked medical terms
    if issues["over_masked_medical_terms"]:
        report.append(f"⚠️ TERMES MÉDICAUX SUR-MASQUÉS: {len(issues['over_masked_medical_terms'])}")
        for term in issues["over_masked_medical_terms"]:
            # NOTE(review): original and masked terms are concatenated with no
            # separator — looks like a stripped arrow; kept as-is to preserve output.
            report.append(f"'{term['original']}''{term['masked']}' ({term['count']}x)")
        report.append("")
    # Over-masked medications
    if issues["over_masked_medications"]:
        report.append(f"⚠️ MÉDICAMENTS SUR-MASQUÉS: {len(issues['over_masked_medications'])}")
        for i, med in enumerate(issues["over_masked_medications"][:3], 1):
            report.append(f" {i}. {med['masked']}")
            report.append(f" Contexte: ...{med['context']}...")
        report.append("")
    # Over-masked dates
    if issues["over_masked_dates"]["problem"]:
        report.append(f"⚠️ DATES SUR-MASQUÉES:")
        report.append(f" • Total [DATE]: {issues['over_masked_dates']['total_date_masks']}")
        report.append(f" • [DATE_NAISSANCE]: {issues['over_masked_dates']['date_naissance_masks']}")
        report.append(f" • Dates originales: {issues['over_masked_dates']['original_dates_count']}")
        report.append(f" • Ratio: {issues['over_masked_dates']['ratio']:.1f}x")
        report.append(f" • PROBLÈME: Toutes les dates sont masquées, pas seulement les dates de naissance!")
        report.append("")
    # Over-masked cities
    if issues["over_masked_cities"]["count"] > 0:
        report.append(f"⚠️ VILLES SUR-MASQUÉES: {issues['over_masked_cities']['count']}")
        for i, ctx in enumerate(issues["over_masked_cities"]["contexts"][:3], 1):
            report.append(f" {i}. ...{ctx}...")
        report.append("")
    # Suspected false positives
    if issues["false_positives"]["suspicious"]:
        report.append(f"⚠️ FAUX POSITIFS SUSPECTS:")
        report.append(f" • NOM détectés: {issues['false_positives']['nom_count']}")
        # BUGFIX: the key was misspelled 'false_positifs', so reaching this
        # branch raised KeyError instead of printing the NOM_GLOBAL line.
        report.append(f" • NOM_GLOBAL: {issues['false_positives']['nom_global_count']}")
        report.append(f" • PROBLÈME: Trop de noms détectés (>50), probablement des termes médicaux")
        report.append("")
    # Text-quality degradation
    if issues["text_quality_degradation"]["degraded"]:
        report.append(f"⚠️ DÉGRADATION QUALITÉ TEXTE:")
        report.append(f" • Longueur originale: {issues['text_quality_degradation']['original_length']}")
        report.append(f" • Longueur anonymisée: {issues['text_quality_degradation']['anonymized_length']}")
        report.append(f" • Ratio préservé: {issues['text_quality_degradation']['preserved_ratio']:.1%}")
        report.append(f" • PROBLÈME: Moins de 70% du texte préservé")
        report.append("")
    return "\n".join(report)
def main():
    """Run the regression analysis on a sample of documents.

    For each configured document: extract the original PDF text, load
    the anonymized text and audit trail, analyse masking quality and
    print the report. The combined report is then written to disk.
    """
    # One-off local input locations
    original_dir = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)")
    anonymized_dir = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs/anonymise")
    # (relative path of the original PDF, base name of the derived files)
    test_docs = [
        ("102_23056463/CRH 23056364.pdf", "CRH 23056364"),
    ]
    all_reports = []
    for original_rel, base_name in test_docs:
        print(f"\n🔍 Analyse de {base_name}...")
        original_path = original_dir / original_rel
        anonymized_txt = anonymized_dir / f"{base_name}.pseudonymise.txt"
        audit_file = anonymized_dir / f"{base_name}.audit.jsonl"
        # Skip this document at the first missing input file.
        required = (
            (original_path, f" ❌ Original non trouvé: {original_path}"),
            (anonymized_txt, f" ❌ Anonymisé non trouvé: {anonymized_txt}"),
            (audit_file, f" ❌ Audit non trouvé: {audit_file}"),
        )
        missing = next((msg for path, msg in required if not path.exists()), None)
        if missing is not None:
            print(missing)
            continue
        # Extract, analyse and report
        report = generate_report(
            analyze_masking_quality(
                extract_original_text(str(original_path)),
                load_anonymized_text(str(anonymized_txt)),
                load_audit(str(audit_file)),
            ),
            base_name,
        )
        all_reports.append(report)
        print(report)
    # Persist the combined report
    output_file = Path(".kiro/specs/anonymization-quality-optimization/DEEP_REGRESSION_ANALYSIS.md")
    output_file.parent.mkdir(parents=True, exist_ok=True)
    output_file.write_text("\n\n".join(all_reports), encoding='utf-8')
    print(f"\n✅ Rapport sauvegardé: {output_file}")


if __name__ == "__main__":
    main()