- Sélection et copie de 27 documents représentatifs (10 simples, 12 moyens, 5 complexes) - Outil d'annotation CLI complet (tools/annotation_tool.py) - Guide d'annotation détaillé (docs/annotation_guide.md) - Évaluateur de qualité (evaluation/quality_evaluator.py) * Calcul Précision, Rappel, F1-Score * Identification faux positifs/négatifs * Métriques par type de PII * Export JSON et rapports texte - Scanner de fuite (evaluation/leak_scanner.py) * Détection PII résiduels (CRITIQUE) * Détection nouveaux PII (HAUTE) * Scan métadonnées PDF (MOYENNE) - Benchmark de performance (evaluation/benchmark.py) * Mesure temps de traitement * Mesure CPU/RAM * Export JSON/CSV - Tests unitaires complets pour tous les composants - Documentation complète du module d'évaluation Tâches complétées: - 1.1.1 Sélection de 27 documents (au lieu de 30) - 1.1.2 Outil d'annotation CLI - 1.2.1 Évaluateur de qualité - 1.2.2 Scanner de fuite - 1.2.3 Benchmark de performance Prochaines étapes: - 1.1.3 Annotation des 27 documents (manuel) - 1.1.4 Enrichissement stopwords médicaux - 1.3 Mesure de la baseline
401 lines
13 KiB
Python
Executable File
401 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Outil d'annotation CLI pour créer le dataset de test annoté.
|
|
|
|
Usage:
|
|
python tools/annotation_tool.py <pdf_path>
|
|
python tools/annotation_tool.py --list
|
|
python tools/annotation_tool.py --resume
|
|
"""
|
|
import json
|
|
import sys
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
from typing import List, Dict, Optional, Tuple
|
|
import re
|
|
|
|
try:
|
|
import pymupdf as fitz
|
|
except ImportError:
|
|
import fitz
|
|
|
|
|
|
class AnnotationTool:
|
|
"""Outil d'annotation interactif pour les documents PDF."""
|
|
|
|
PII_TYPES = [
|
|
"NOM", "PRENOM", "DATE_NAISSANCE", "AGE",
|
|
"TEL", "EMAIL", "ADRESSE", "CODE_POSTAL", "VILLE",
|
|
"NIR", "IPP", "NDA", "RPPS", "FINESS", "OGC",
|
|
"NUMERO_PATIENT", "NUMERO_LOT", "NUMERO_ORDONNANCE", "NUMERO_SEJOUR",
|
|
"ETABLISSEMENT", "SERVICE", "DATE", "AUTRE"
|
|
]
|
|
|
|
def __init__(self, pdf_path: Path):
|
|
self.pdf_path = pdf_path
|
|
self.annotations_path = pdf_path.parent / f"{pdf_path.stem}.annotations.json"
|
|
self.doc = None
|
|
self.annotations = []
|
|
self.medical_terms = []
|
|
self.metadata = {
|
|
"annotator": "annotator_1",
|
|
"annotation_date": datetime.now().isoformat(),
|
|
"document_type": "unknown",
|
|
"page_count": 0,
|
|
"difficulty": "medium"
|
|
}
|
|
|
|
def load_pdf(self) -> bool:
|
|
"""Charge le PDF et extrait le texte."""
|
|
try:
|
|
self.doc = fitz.open(self.pdf_path)
|
|
self.metadata["page_count"] = len(self.doc)
|
|
print(f"✓ PDF chargé: {self.pdf_path.name}")
|
|
print(f" Pages: {len(self.doc)}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"✗ Erreur lors du chargement du PDF: {e}")
|
|
return False
|
|
|
|
def extract_text(self, page_num: int) -> str:
|
|
"""Extrait le texte d'une page."""
|
|
if not self.doc or page_num >= len(self.doc):
|
|
return ""
|
|
|
|
page = self.doc[page_num]
|
|
return page.get_text()
|
|
|
|
def display_page(self, page_num: int):
|
|
"""Affiche le texte d'une page."""
|
|
text = self.extract_text(page_num)
|
|
|
|
print(f"\n{'='*80}")
|
|
print(f"PAGE {page_num + 1}/{len(self.doc)}")
|
|
print(f"{'='*80}")
|
|
print(text)
|
|
print(f"{'='*80}\n")
|
|
|
|
def get_context(self, text: str, pii_text: str, window: int = 50) -> str:
|
|
"""Extrait le contexte autour d'un PII."""
|
|
pos = text.find(pii_text)
|
|
if pos == -1:
|
|
return ""
|
|
|
|
start = max(0, pos - window)
|
|
end = min(len(text), pos + len(pii_text) + window)
|
|
context = text[start:end]
|
|
|
|
# Nettoyer les retours à la ligne multiples
|
|
context = re.sub(r'\n+', ' ', context)
|
|
context = re.sub(r'\s+', ' ', context)
|
|
|
|
return context.strip()
|
|
|
|
def input_with_default(self, prompt: str, default: str = "") -> str:
|
|
"""Demande une entrée avec valeur par défaut."""
|
|
if default:
|
|
user_input = input(f"{prompt} [{default}]: ").strip()
|
|
return user_input if user_input else default
|
|
else:
|
|
return input(f"{prompt}: ").strip()
|
|
|
|
def select_from_list(self, prompt: str, options: List[str], default: Optional[str] = None) -> str:
|
|
"""Sélection dans une liste d'options."""
|
|
print(f"\n{prompt}")
|
|
for i, option in enumerate(options, 1):
|
|
marker = " (défaut)" if option == default else ""
|
|
print(f" {i}. {option}{marker}")
|
|
|
|
while True:
|
|
choice = input(f"Choix [1-{len(options)}]: ").strip()
|
|
|
|
if not choice and default:
|
|
return default
|
|
|
|
try:
|
|
idx = int(choice) - 1
|
|
if 0 <= idx < len(options):
|
|
return options[idx]
|
|
except ValueError:
|
|
pass
|
|
|
|
print(f"✗ Choix invalide. Entrez un nombre entre 1 et {len(options)}")
|
|
|
|
def annotate_pii(self, page_num: int, text: str) -> List[Dict]:
|
|
"""Annotation interactive des PII d'une page."""
|
|
page_annotations = []
|
|
|
|
print(f"\n--- Annotation de la page {page_num + 1} ---")
|
|
print("Commandes: 'q' pour terminer la page, 's' pour sauter")
|
|
|
|
ann_id = len(self.annotations) + 1
|
|
|
|
while True:
|
|
print(f"\n[Annotation #{ann_id}]")
|
|
|
|
# Texte du PII
|
|
pii_text = input("Texte du PII (ou 'q' pour terminer, 's' pour sauter): ").strip()
|
|
|
|
if pii_text.lower() == 'q':
|
|
break
|
|
if pii_text.lower() == 's':
|
|
continue
|
|
if not pii_text:
|
|
print("✗ Le texte ne peut pas être vide")
|
|
continue
|
|
|
|
# Type de PII
|
|
pii_type = self.select_from_list(
|
|
"Type de PII:",
|
|
self.PII_TYPES,
|
|
default="NOM"
|
|
)
|
|
|
|
# Contexte
|
|
context = self.get_context(text, pii_text)
|
|
if context:
|
|
print(f"Contexte détecté: {context[:100]}...")
|
|
use_context = input("Utiliser ce contexte? [O/n]: ").strip().lower()
|
|
if use_context == 'n':
|
|
context = input("Contexte manuel: ").strip()
|
|
else:
|
|
context = input("Contexte: ").strip()
|
|
|
|
# Obligatoire?
|
|
mandatory_input = input("PII obligatoire (RGPD)? [O/n]: ").strip().lower()
|
|
mandatory = mandatory_input != 'n'
|
|
|
|
# Difficulté
|
|
difficulty = self.select_from_list(
|
|
"Difficulté de détection:",
|
|
["easy", "medium", "hard"],
|
|
default="medium"
|
|
)
|
|
|
|
# Méthodes de détection attendues
|
|
print("\nMéthodes de détection attendues (séparées par des virgules):")
|
|
print(" Options: regex, vlm, ner, contextual, trackare")
|
|
methods_input = input("Méthodes [regex,ner]: ").strip()
|
|
if not methods_input:
|
|
methods = ["regex", "ner"]
|
|
else:
|
|
methods = [m.strip() for m in methods_input.split(',')]
|
|
|
|
# Créer l'annotation
|
|
annotation = {
|
|
"id": f"ann_{ann_id:03d}",
|
|
"page": page_num,
|
|
"type": pii_type,
|
|
"text": pii_text,
|
|
"bbox": None, # Pas de bbox pour l'instant (annotation manuelle)
|
|
"context": context,
|
|
"mandatory": mandatory,
|
|
"difficulty": difficulty,
|
|
"detection_method_expected": methods
|
|
}
|
|
|
|
page_annotations.append(annotation)
|
|
print(f"✓ Annotation ajoutée: {pii_type} = '{pii_text}'")
|
|
|
|
ann_id += 1
|
|
|
|
return page_annotations
|
|
|
|
def annotate_document(self):
|
|
"""Annotation complète du document."""
|
|
if not self.load_pdf():
|
|
return False
|
|
|
|
# Métadonnées du document
|
|
print("\n=== Métadonnées du document ===")
|
|
|
|
self.metadata["annotator"] = self.input_with_default(
|
|
"Nom de l'annotateur",
|
|
default="annotator_1"
|
|
)
|
|
|
|
self.metadata["document_type"] = self.select_from_list(
|
|
"Type de document:",
|
|
["compte_rendu", "trackare", "anapath", "bacterio", "consultation", "autre"],
|
|
default="compte_rendu"
|
|
)
|
|
|
|
self.metadata["difficulty"] = self.select_from_list(
|
|
"Difficulté globale du document:",
|
|
["simple", "moyen", "complexe"],
|
|
default="moyen"
|
|
)
|
|
|
|
# Annoter chaque page
|
|
for page_num in range(len(self.doc)):
|
|
self.display_page(page_num)
|
|
|
|
annotate_page = input(f"\nAnnoter cette page? [O/n]: ").strip().lower()
|
|
if annotate_page == 'n':
|
|
continue
|
|
|
|
text = self.extract_text(page_num)
|
|
page_annotations = self.annotate_pii(page_num, text)
|
|
self.annotations.extend(page_annotations)
|
|
|
|
# Termes médicaux à préserver
|
|
print("\n=== Termes médicaux à préserver ===")
|
|
print("Entrez les termes médicaux qui ne doivent PAS être masqués")
|
|
print("(un par ligne, ligne vide pour terminer)")
|
|
|
|
while True:
|
|
term = input("Terme médical: ").strip()
|
|
if not term:
|
|
break
|
|
self.medical_terms.append(term)
|
|
print(f"✓ Ajouté: {term}")
|
|
|
|
return True
|
|
|
|
def save_annotations(self):
|
|
"""Sauvegarde les annotations au format JSON."""
|
|
# Calculer les statistiques
|
|
stats = {
|
|
"total_pii": len(self.annotations),
|
|
"by_type": {}
|
|
}
|
|
|
|
for ann in self.annotations:
|
|
pii_type = ann["type"]
|
|
stats["by_type"][pii_type] = stats["by_type"].get(pii_type, 0) + 1
|
|
|
|
# Créer la structure finale
|
|
output = {
|
|
"pdf_path": str(self.pdf_path),
|
|
"metadata": self.metadata,
|
|
"annotations": self.annotations,
|
|
"medical_terms_to_preserve": self.medical_terms,
|
|
"statistics": stats
|
|
}
|
|
|
|
# Sauvegarder
|
|
with open(self.annotations_path, 'w', encoding='utf-8') as f:
|
|
json.dump(output, f, indent=2, ensure_ascii=False)
|
|
|
|
print(f"\n✓ Annotations sauvegardées: {self.annotations_path}")
|
|
print(f" Total PII: {stats['total_pii']}")
|
|
print(f" Types: {', '.join(f'{k}={v}' for k, v in stats['by_type'].items())}")
|
|
print(f" Termes médicaux: {len(self.medical_terms)}")
|
|
|
|
def load_existing_annotations(self) -> bool:
|
|
"""Charge les annotations existantes si disponibles."""
|
|
if not self.annotations_path.exists():
|
|
return False
|
|
|
|
try:
|
|
with open(self.annotations_path, 'r', encoding='utf-8') as f:
|
|
data = json.load(f)
|
|
|
|
self.metadata = data.get("metadata", self.metadata)
|
|
self.annotations = data.get("annotations", [])
|
|
self.medical_terms = data.get("medical_terms_to_preserve", [])
|
|
|
|
print(f"✓ Annotations existantes chargées: {len(self.annotations)} PII")
|
|
return True
|
|
except Exception as e:
|
|
print(f"✗ Erreur lors du chargement des annotations: {e}")
|
|
return False
|
|
|
|
def run(self):
|
|
"""Exécute l'outil d'annotation."""
|
|
print(f"\n{'='*80}")
|
|
print(f"OUTIL D'ANNOTATION - {self.pdf_path.name}")
|
|
print(f"{'='*80}")
|
|
|
|
# Vérifier si des annotations existent déjà
|
|
if self.annotations_path.exists():
|
|
overwrite = input(f"\n⚠ Des annotations existent déjà. Écraser? [o/N]: ").strip().lower()
|
|
if overwrite != 'o':
|
|
print("Annotation annulée.")
|
|
return False
|
|
|
|
# Annoter le document
|
|
if not self.annotate_document():
|
|
return False
|
|
|
|
# Sauvegarder
|
|
self.save_annotations()
|
|
|
|
return True
|
|
|
|
|
|
def list_documents():
|
|
"""Liste les documents disponibles pour annotation."""
|
|
pdfs_dir = Path("tests/ground_truth/pdfs")
|
|
|
|
if not pdfs_dir.exists():
|
|
print(f"✗ Répertoire introuvable: {pdfs_dir}")
|
|
return
|
|
|
|
pdfs = sorted(pdfs_dir.glob("*.pdf"))
|
|
|
|
if not pdfs:
|
|
print(f"✗ Aucun PDF trouvé dans {pdfs_dir}")
|
|
return
|
|
|
|
print(f"\n{'='*80}")
|
|
print(f"DOCUMENTS DISPONIBLES ({len(pdfs)})")
|
|
print(f"{'='*80}\n")
|
|
|
|
for pdf in pdfs:
|
|
annotation_file = pdf.parent / f"{pdf.stem}.annotations.json"
|
|
status = "✓ Annoté" if annotation_file.exists() else "○ À annoter"
|
|
print(f"{status} {pdf.name}")
|
|
|
|
|
|
def find_next_unannotated() -> Optional[Path]:
|
|
"""Trouve le prochain document non annoté."""
|
|
pdfs_dir = Path("tests/ground_truth/pdfs")
|
|
|
|
if not pdfs_dir.exists():
|
|
return None
|
|
|
|
for pdf in sorted(pdfs_dir.glob("*.pdf")):
|
|
annotation_file = pdf.parent / f"{pdf.stem}.annotations.json"
|
|
if not annotation_file.exists():
|
|
return pdf
|
|
|
|
return None
|
|
|
|
|
|
def main():
|
|
if len(sys.argv) > 1:
|
|
if sys.argv[1] == "--list":
|
|
list_documents()
|
|
return
|
|
elif sys.argv[1] == "--resume":
|
|
next_pdf = find_next_unannotated()
|
|
if next_pdf:
|
|
print(f"Prochain document à annoter: {next_pdf.name}")
|
|
tool = AnnotationTool(next_pdf)
|
|
tool.run()
|
|
else:
|
|
print("✓ Tous les documents sont annotés!")
|
|
return
|
|
else:
|
|
pdf_path = Path(sys.argv[1])
|
|
else:
|
|
print("Usage:")
|
|
print(" python tools/annotation_tool.py <pdf_path>")
|
|
print(" python tools/annotation_tool.py --list")
|
|
print(" python tools/annotation_tool.py --resume")
|
|
sys.exit(1)
|
|
|
|
if not pdf_path.exists():
|
|
print(f"✗ Fichier introuvable: {pdf_path}")
|
|
sys.exit(1)
|
|
|
|
tool = AnnotationTool(pdf_path)
|
|
success = tool.run()
|
|
|
|
sys.exit(0 if success else 1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|