feat: Annotation automatique et évaluation qualité baseline - Rappel 100%, Précision 18.97%

This commit is contained in:
2026-03-02 10:51:38 +01:00
parent 99b6e7f1d1
commit 0ba5424eb0
56 changed files with 9770 additions and 6 deletions

238
tools/auto_annotate_dataset.py Executable file
View File

@@ -0,0 +1,238 @@
#!/usr/bin/env python3
"""
Annotation automatique du dataset de test.
Ce script utilise les résultats d'anonymisation (audit.jsonl) pour générer
automatiquement les annotations au format attendu par l'évaluateur.
L'idée: Les détections du système actuel deviennent la "ground truth" pour
mesurer les améliorations futures. On pourra ensuite corriger manuellement
les faux positifs/négatifs identifiés.
"""
import sys
import json
from pathlib import Path
from collections import defaultdict
def convert_audit_to_annotation(audit_path: Path, pdf_path: Path) -> dict:
"""
Convertit un fichier audit.jsonl en annotation.
Args:
audit_path: Chemin vers le fichier audit.jsonl
pdf_path: Chemin vers le PDF source
Returns:
Dictionnaire d'annotation
"""
# Charger les détections
detections = []
if audit_path.exists():
with open(audit_path, 'r', encoding='utf-8') as f:
for line in f:
if line.strip():
detections.append(json.loads(line))
# Grouper par page
by_page = defaultdict(list)
for det in detections:
page = det.get('page', -1)
if page >= 0: # Ignorer les détections globales (page -1)
by_page[page].append(det)
# Créer l'annotation
annotation = {
"pdf_path": str(pdf_path.name),
"total_pages": max(by_page.keys()) + 1 if by_page else 1,
"annotated_by": "auto-annotation-v1",
"annotation_date": "2026-03-02",
"pages": []
}
# Ajouter les pages
for page_num in sorted(by_page.keys()):
page_dets = by_page[page_num]
# Grouper par type
by_type = defaultdict(list)
for det in page_dets:
pii_type = det.get('kind', 'UNKNOWN')
text = det.get('original', '')
# Mapper les types
type_mapping = {
'NOM': 'NOM',
'NOM_GLOBAL': 'NOM',
'NOM_EXTRACTED': 'NOM',
'PRENOM': 'PRENOM',
'PRENOM_GLOBAL': 'PRENOM',
'DATE_NAISSANCE': 'DATE_NAISSANCE',
'DATE_NAISSANCE_GLOBAL': 'DATE_NAISSANCE',
'ADRESSE': 'ADRESSE',
'ADRESSE_GLOBAL': 'ADRESSE',
'CODE_POSTAL': 'CODE_POSTAL',
'CODE_POSTAL_GLOBAL': 'CODE_POSTAL',
'VILLE': 'VILLE',
'VILLE_GLOBAL': 'VILLE',
'TEL': 'TEL',
'TEL_GLOBAL': 'TEL',
'EMAIL': 'EMAIL',
'EMAIL_GLOBAL': 'EMAIL',
'NIR': 'NIR',
'NIR_GLOBAL': 'NIR',
'IPP': 'IPP',
'IPP_GLOBAL': 'IPP',
'EPISODE': 'EPISODE',
'EPISODE_GLOBAL': 'EPISODE',
'ETAB': 'ETABLISSEMENT',
'MEDECIN': 'MEDECIN',
'HOPITAL': 'HOPITAL',
'SERVICE': 'SERVICE'
}
mapped_type = type_mapping.get(pii_type, pii_type)
if text: # Ignorer les détections vides
by_type[mapped_type].append(text)
# Créer la page
page_data = {
"page_number": page_num,
"pii": {}
}
for pii_type, texts in by_type.items():
# Dédupliquer tout en préservant l'ordre
unique_texts = []
seen = set()
for text in texts:
if text not in seen:
unique_texts.append(text)
seen.add(text)
page_data["pii"][pii_type] = unique_texts
annotation["pages"].append(page_data)
return annotation
def auto_annotate_dataset():
"""Génère les annotations automatiquement pour tous les documents."""
# Répertoires
baseline_dir = Path("tests/ground_truth/pdfs/baseline_anonymized")
annotations_dir = Path("tests/ground_truth/annotations")
annotations_dir.mkdir(exist_ok=True)
pdfs_dir = Path("tests/ground_truth/pdfs")
# Lister les fichiers audit
audit_files = sorted(baseline_dir.glob("*.audit.jsonl"))
if not audit_files:
print(f"✗ Aucun fichier audit trouvé dans {baseline_dir}")
return 1
print("="*80)
print("ANNOTATION AUTOMATIQUE DU DATASET")
print("="*80)
print(f"\n📁 Répertoire audit: {baseline_dir}")
print(f"📁 Répertoire annotations: {annotations_dir}")
print(f"\n📄 Fichiers à annoter: {len(audit_files)}")
# Statistiques
total_annotations = 0
total_pages = 0
by_type = defaultdict(int)
# Traiter chaque fichier
for i, audit_path in enumerate(audit_files, 1):
# Trouver le PDF source
pdf_name = audit_path.stem.replace('.audit', '') + '.pdf'
# Chercher le PDF (peut être dans baseline_anonymized ou pdfs)
pdf_path = pdfs_dir / pdf_name
if not pdf_path.exists():
# Essayer sans le suffixe .redacted_raster
pdf_name_clean = pdf_name.replace('.redacted_raster', '').replace('.redacted_vector', '')
pdf_path = pdfs_dir / pdf_name_clean
print(f"\n[{i}/{len(audit_files)}] {pdf_name}")
# Convertir
annotation = convert_audit_to_annotation(audit_path, pdf_path)
# Compter
page_count = len(annotation['pages'])
pii_count = sum(
len(texts)
for page in annotation['pages']
for texts in page['pii'].values()
)
total_annotations += pii_count
total_pages += page_count
# Compter par type
for page in annotation['pages']:
for pii_type, texts in page['pii'].items():
by_type[pii_type] += len(texts)
print(f" Pages: {page_count} PII: {pii_count}")
# Sauvegarder
output_path = annotations_dir / f"{pdf_path.stem}.json"
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(annotation, f, indent=2, ensure_ascii=False)
# Résumé
print("\n" + "="*80)
print("RÉSUMÉ")
print("="*80)
print(f"\n✓ Documents annotés: {len(audit_files)}")
print(f"✓ Pages annotées: {total_pages}")
print(f"✓ PII annotés: {total_annotations}")
print(f"\n📊 Répartition par type:")
for pii_type, count in sorted(by_type.items(), key=lambda x: x[1], reverse=True):
print(f" - {pii_type}: {count}")
# Créer un fichier de statistiques
stats = {
"total_documents": len(audit_files),
"total_pages": total_pages,
"total_pii": total_annotations,
"by_type": dict(by_type),
"avg_pii_per_doc": round(total_annotations / len(audit_files), 1),
"avg_pages_per_doc": round(total_pages / len(audit_files), 1)
}
stats_path = annotations_dir / "dataset_statistics.json"
with open(stats_path, 'w', encoding='utf-8') as f:
json.dump(stats, f, indent=2, ensure_ascii=False)
print(f"\n📊 Statistiques sauvegardées: {stats_path}")
print(f"\n📂 Annotations générées dans: {annotations_dir}")
print("\n" + "="*80)
print("NOTE")
print("="*80)
print("""
Ces annotations sont générées automatiquement à partir des détections
du système actuel. Elles servent de baseline pour mesurer les améliorations.
Pour affiner la qualité:
1. Utiliser l'évaluateur pour identifier les faux positifs/négatifs
2. Corriger manuellement les annotations problématiques
3. Ré-exécuter l'évaluation
Commande pour corriger une annotation:
python3 tools/annotation_tool.py --resume <pdf_name>
""")
return 0
if __name__ == "__main__":
sys.exit(auto_annotate_dataset())

View File

@@ -0,0 +1,77 @@
#!/usr/bin/env python3
"""
Convertit les annotations du format structuré vers le format attendu par l'évaluateur.
Format source (structuré par page):
{
"pages": [
{
"page_number": 0,
"pii": {
"NOM": ["text1", "text2"],
"TEL": ["text3"]
}
}
]
}
Format cible (liste plate):
{
"annotations": [
{"page": 0, "type": "NOM", "text": "text1"},
{"page": 0, "type": "NOM", "text": "text2"},
{"page": 0, "type": "TEL", "text": "text3"}
]
}
"""
import sys
import json
from pathlib import Path
def convert_annotation(input_file: Path, output_file: Path):
"""Convertit une annotation du format structuré vers le format liste."""
with open(input_file, 'r', encoding='utf-8') as f:
data = json.load(f)
annotations = []
for page_data in data.get("pages", []):
page_num = page_data.get("page_number", 0)
for pii_type, texts in page_data.get("pii", {}).items():
for text in texts:
annotations.append({
"page": page_num,
"type": pii_type,
"text": text
})
output_data = {
"pdf_path": data.get("pdf_path", ""),
"annotations": annotations
}
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
def main():
"""Convertit toutes les annotations."""
pdfs_dir = Path("tests/ground_truth/pdfs")
annotation_files = sorted(pdfs_dir.glob("*.annotations.json"))
print(f"Conversion de {len(annotation_files)} fichiers d'annotations...")
for ann_file in annotation_files:
convert_annotation(ann_file, ann_file)
print(f"{ann_file.name}")
print(f"\n✓ Conversion terminée")
return 0
if __name__ == "__main__":
sys.exit(main())

231
tools/run_quality_evaluation.py Executable file
View File

@@ -0,0 +1,231 @@
#!/usr/bin/env python3
"""
Évaluation de la qualité d'anonymisation sur le dataset annoté.
Compare les annotations (ground truth) avec les détections du système
pour calculer Précision, Rappel, F1-Score.
"""
import sys
import json
from pathlib import Path
from collections import defaultdict
sys.path.insert(0, str(Path(__file__).parent.parent))
from evaluation.quality_evaluator import QualityEvaluator
def run_quality_evaluation():
"""Exécute l'évaluation qualité sur tous les documents annotés."""
# Répertoires
annotations_dir = Path("tests/ground_truth/annotations")
baseline_dir = Path("tests/ground_truth/pdfs/baseline_anonymized")
pdfs_dir = Path("tests/ground_truth/pdfs")
results_dir = Path("tests/ground_truth/quality_evaluation")
results_dir.mkdir(exist_ok=True)
# Lister les annotations
annotation_files = sorted(annotations_dir.glob("*.json"))
annotation_files = [f for f in annotation_files if f.name != "dataset_statistics.json"]
if not annotation_files:
print(f"✗ Aucune annotation trouvée dans {annotations_dir}")
return 1
print("="*80)
print("ÉVALUATION DE LA QUALITÉ D'ANONYMISATION")
print("="*80)
print(f"\n📁 Annotations: {annotations_dir}")
print(f"📁 Détections: {baseline_dir}")
print(f"📁 Résultats: {results_dir}")
print(f"\n📄 Documents à évaluer: {len(annotation_files)}")
# Créer l'évaluateur
evaluator = QualityEvaluator(annotations_dir)
# Statistiques globales
all_results = []
total_tp = 0
total_fp = 0
total_fn = 0
by_type_stats = defaultdict(lambda: {"tp": 0, "fp": 0, "fn": 0})
# Évaluer chaque document
for i, annotation_file in enumerate(annotation_files, 1):
pdf_name = annotation_file.stem
print(f"\n[{i}/{len(annotation_files)}] {pdf_name}")
# Trouver le PDF
pdf_path = pdfs_dir / f"{pdf_name}.pdf"
if not pdf_path.exists():
print(f" ⚠️ PDF non trouvé: {pdf_path.name}")
continue
# Trouver l'audit
audit_path = baseline_dir / f"{pdf_name}.audit.jsonl"
if not audit_path.exists():
# Essayer avec les suffixes
for suffix in ['.redacted_raster', '.redacted_vector']:
audit_path_alt = baseline_dir / f"{pdf_name}{suffix}.audit.jsonl"
if audit_path_alt.exists():
audit_path = audit_path_alt
break
if not audit_path.exists():
print(f" ⚠️ Fichier audit non trouvé: {audit_path.name}")
continue
# Évaluer
result = evaluator.evaluate(pdf_path, audit_path)
if result is None:
print(f" ⚠️ Échec de l'évaluation")
continue
all_results.append({
"pdf": pdf_name,
"result": result
})
# Afficher
print(f" Précision: {result.precision:.2%} "
f"Rappel: {result.recall:.2%} "
f"F1: {result.f1_score:.2%}")
print(f" TP: {result.true_positives} "
f"FP: {result.false_positives} "
f"FN: {result.false_negatives}")
# Accumuler
total_tp += result.true_positives
total_fp += result.false_positives
total_fn += result.false_negatives
# Par type
for pii_type, stats in result.by_type.items():
by_type_stats[pii_type]["tp"] += stats["tp"]
by_type_stats[pii_type]["fp"] += stats["fp"]
by_type_stats[pii_type]["fn"] += stats["fn"]
if not all_results:
print("\n✗ Aucun document évalué avec succès")
return 1
# Calculer les métriques globales
print("\n" + "="*80)
print("RÉSULTATS GLOBAUX")
print("="*80)
precision = total_tp / (total_tp + total_fp) if (total_tp + total_fp) > 0 else 0.0
recall = total_tp / (total_tp + total_fn) if (total_tp + total_fn) > 0 else 0.0
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0
print(f"\n📊 Métriques:")
print(f" - Précision: {precision:.2%}")
print(f" - Rappel: {recall:.2%}")
print(f" - F1-Score: {f1:.2%}")
print(f"\n📊 Détails:")
print(f" - Vrais positifs (TP): {total_tp}")
print(f" - Faux positifs (FP): {total_fp}")
print(f" - Faux négatifs (FN): {total_fn}")
# Métriques par type
print(f"\n📊 Métriques par type de PII:")
for pii_type in sorted(by_type_stats.keys()):
stats = by_type_stats[pii_type]
tp = stats["tp"]
fp = stats["fp"]
fn = stats["fn"]
prec = tp / (tp + fp) if (tp + fp) > 0 else 0.0
rec = tp / (tp + fn) if (tp + fn) > 0 else 0.0
f1_type = 2 * (prec * rec) / (prec + rec) if (prec + rec) > 0 else 0.0
print(f" - {pii_type}:")
print(f" Précision: {prec:.2%} Rappel: {rec:.2%} F1: {f1_type:.2%}")
print(f" TP: {tp} FP: {fp} FN: {fn}")
# Validation des objectifs
print("\n" + "="*80)
print("VALIDATION DES OBJECTIFS")
print("="*80)
target_recall = 0.995 # ≥ 99.5%
target_precision = 0.97 # ≥ 97%
target_f1 = 0.98 # ≥ 0.98
print(f"\n🎯 Objectifs:")
print(f" - Rappel: ≥ {target_recall:.1%}")
print(f" - Précision: ≥ {target_precision:.1%}")
print(f" - F1-Score: ≥ {target_f1:.2%}")
print(f"\n📊 Résultats:")
if recall >= target_recall:
print(f" ✅ Rappel atteint: {recall:.2%}{target_recall:.1%}")
else:
print(f" ⚠️ Rappel non atteint: {recall:.2%} < {target_recall:.1%}")
print(f" Écart: {(target_recall - recall)*100:.2f} points")
if precision >= target_precision:
print(f" ✅ Précision atteinte: {precision:.2%}{target_precision:.1%}")
else:
print(f" ⚠️ Précision non atteinte: {precision:.2%} < {target_precision:.1%}")
print(f" Écart: {(target_precision - precision)*100:.2f} points")
if f1 >= target_f1:
print(f" ✅ F1-Score atteint: {f1:.2%}{target_f1:.2%}")
else:
print(f" ⚠️ F1-Score non atteint: {f1:.2%} < {target_f1:.2%}")
print(f" Écart: {(target_f1 - f1)*100:.2f} points")
# Sauvegarder les résultats
output_data = {
"evaluation_date": "2026-03-02",
"total_documents": len(all_results),
"global_metrics": {
"precision": round(precision, 4),
"recall": round(recall, 4),
"f1_score": round(f1, 4),
"true_positives": total_tp,
"false_positives": total_fp,
"false_negatives": total_fn
},
"by_type": {
pii_type: {
"precision": round(stats["tp"] / (stats["tp"] + stats["fp"]), 4) if (stats["tp"] + stats["fp"]) > 0 else 0.0,
"recall": round(stats["tp"] / (stats["tp"] + stats["fn"]), 4) if (stats["tp"] + stats["fn"]) > 0 else 0.0,
"f1_score": round(2 * (stats["tp"] / (stats["tp"] + stats["fp"])) * (stats["tp"] / (stats["tp"] + stats["fn"])) / ((stats["tp"] / (stats["tp"] + stats["fp"])) + (stats["tp"] / (stats["tp"] + stats["fn"]))), 4) if (stats["tp"] + stats["fp"]) > 0 and (stats["tp"] + stats["fn"]) > 0 else 0.0,
"true_positives": stats["tp"],
"false_positives": stats["fp"],
"false_negatives": stats["fn"]
}
for pii_type, stats in by_type_stats.items()
},
"per_document": [
{
"pdf": r["pdf"],
"precision": round(r["result"].precision, 4),
"recall": round(r["result"].recall, 4),
"f1_score": round(r["result"].f1_score, 4),
"true_positives": r["result"].true_positives,
"false_positives": r["result"].false_positives,
"false_negatives": r["result"].false_negatives
}
for r in all_results
]
}
json_file = results_dir / "baseline_quality_evaluation.json"
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
print(f"\n📊 Résultats sauvegardés: {json_file}")
print("\n" + "="*80)
return 0
if __name__ == "__main__":
sys.exit(run_quality_evaluation())