Gazetteers FINESS (data.gouv.fr open data): - 102K numéros FINESS → détection par lookup exact dans _mask_admin_label + selective_rescan - 122K noms d'établissements, 113K téléphones, 76K adresses (disponibles) - Un nombre 9 chiffres matchant un vrai FINESS est masqué même sans label "FINESS" Fine-tuning CamemBERT-bio (almanach/camembert-bio-base): - Export silver annotations réécrit : alignement original↔pseudonymisé (difflib) → 6862 entités B- (vs 3344 avec l'ancien audit-only) sur 222K tokens - Sliding windows (200 tokens, stride 100) pour documents longs - WeightedNERTrainer avec class weights cappés (max 10x) + label smoothing - Résultat: Precision=88.1%, Recall=89.8%, F1=88.9% (20 epochs, lr=1e-5) - Modèle sauvegardé dans models/camembert-bio-deid/best (non commité) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
252 lines
9.5 KiB
Python
252 lines
9.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Export silver annotations — BIO via alignement texte original ↔ pseudonymisé.
|
|
=============================================================================
|
|
Aligne le texte extrait du PDF original avec le texte pseudonymisé (.pseudonymise.txt)
|
|
pour créer des annotations BIO fiables. Les placeholders [NOM], [TEL], etc. dans le
|
|
texte pseudonymisé indiquent exactement quels tokens ont été masqués.
|
|
|
|
Usage:
|
|
python scripts/export_silver_annotations.py [--limit N] [--out-dir DIR]
|
|
|
|
Output: data/silver_annotations/ avec un fichier .bio par document
|
|
Format BIO: TOKEN\tLABEL (un token par ligne, lignes vides entre phrases)
|
|
"""
|
|
import sys
|
|
import re
|
|
import difflib
|
|
import argparse
|
|
from pathlib import Path
|
|
from typing import Dict, List, Tuple
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
# Mapping placeholder → label BIO
|
|
PLACEHOLDER_TO_BIO: Dict[str, str] = {
|
|
"NOM": "PER",
|
|
"TEL": "TEL",
|
|
"EMAIL": "EMAIL",
|
|
"NIR": "NIR",
|
|
"IPP": "IPP",
|
|
"DOSSIER": "NDA",
|
|
"NDA": "NDA",
|
|
"EPISODE": "NDA",
|
|
"RPPS": "RPPS",
|
|
"DATE_NAISSANCE": "DATE_NAISSANCE",
|
|
"ADRESSE": "ADRESSE",
|
|
"CODE_POSTAL": "ZIP",
|
|
"VILLE": "VILLE",
|
|
"HOPITAL": "HOPITAL",
|
|
"MASK": "HOPITAL", # [MASK] = hôpital masqué par force_regex
|
|
"IBAN": "IBAN",
|
|
"AGE": "AGE",
|
|
}
|
|
|
|
RE_PLACEHOLDER = re.compile(r"^\[([A-Z_]+)\]$")
|
|
|
|
SRC = Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)")
|
|
AUDIT_DIR = SRC / "anonymise_audit_30"
|
|
|
|
|
|
def extract_original_text(pdf_path: Path) -> str:
|
|
"""Extrait le texte brut d'un PDF (même méthode que le pipeline)."""
|
|
import anonymizer_core_refactored_onnx as core
|
|
pages_text, _, _, _ = core.extract_text_with_fallback_ocr(pdf_path)
|
|
return "\f".join(pages_text)
|
|
|
|
|
|
def tokenize_text(text: str) -> List[str]:
|
|
"""Split en tokens whitespace, en nettoyant les caractères de contrôle."""
|
|
# Remplacer \f et \r par \n pour l'alignement
|
|
text = text.replace("\f", "\n").replace("\r", "")
|
|
tokens = []
|
|
for line in text.split("\n"):
|
|
line_toks = line.split()
|
|
if line_toks:
|
|
tokens.extend(line_toks)
|
|
return tokens
|
|
|
|
|
|
def align_and_annotate(original_text: str, pseudo_text: str) -> List[Tuple[str, str]]:
|
|
"""Aligne texte original et pseudonymisé pour créer les annotations BIO.
|
|
|
|
Utilise SequenceMatcher pour trouver les différences.
|
|
Quand le pseudo contient [PLACEHOLDER], les tokens originaux correspondants
|
|
reçoivent le label BIO approprié.
|
|
"""
|
|
orig_tokens = tokenize_text(original_text)
|
|
pseudo_tokens = tokenize_text(pseudo_text)
|
|
|
|
# Normaliser pour l'alignement (lowercase, sans accents pour meilleur matching)
|
|
def normalize(tok):
|
|
return tok.lower().strip(".,;:!?()[]{}\"'")
|
|
|
|
orig_norm = [normalize(t) for t in orig_tokens]
|
|
pseudo_norm = [normalize(t) for t in pseudo_tokens]
|
|
|
|
sm = difflib.SequenceMatcher(None, orig_norm, pseudo_norm, autojunk=False)
|
|
opcodes = sm.get_opcodes()
|
|
|
|
bio_tokens: List[Tuple[str, str]] = []
|
|
|
|
for tag, i1, i2, j1, j2 in opcodes:
|
|
if tag == "equal":
|
|
# Tokens identiques → O
|
|
for t in orig_tokens[i1:i2]:
|
|
bio_tokens.append((t, "O"))
|
|
|
|
elif tag == "replace":
|
|
# Analyser le côté pseudo : quels tokens sont des placeholders ?
|
|
pseudo_chunk = pseudo_tokens[j1:j2]
|
|
placeholder_labels = [] # (index_in_pseudo, bio_label) pour chaque placeholder
|
|
non_placeholder_norms = set()
|
|
for pi, pt in enumerate(pseudo_chunk):
|
|
m = RE_PLACEHOLDER.match(pt)
|
|
if m:
|
|
bio_label = PLACEHOLDER_TO_BIO.get(m.group(1))
|
|
if bio_label:
|
|
placeholder_labels.append((pi, bio_label))
|
|
else:
|
|
non_placeholder_norms.add(normalize(pt))
|
|
|
|
if not placeholder_labels:
|
|
# Pas de placeholder → O
|
|
for t in orig_tokens[i1:i2]:
|
|
bio_tokens.append((t, "O"))
|
|
elif len(placeholder_labels) == 1:
|
|
# Un seul placeholder : tous les tokens originaux (sauf ceux
|
|
# qui matchent un token non-placeholder du pseudo) prennent ce label
|
|
label = placeholder_labels[0][1]
|
|
first = True
|
|
for t in orig_tokens[i1:i2]:
|
|
if normalize(t) in non_placeholder_norms:
|
|
bio_tokens.append((t, "O"))
|
|
first = True
|
|
else:
|
|
prefix = "B-" if first else "I-"
|
|
bio_tokens.append((t, f"{prefix}{label}"))
|
|
first = False
|
|
else:
|
|
# Plusieurs placeholders : distribuer les tokens originaux
|
|
# Stratégie : répartir proportionnellement, chaque groupe commence par B-
|
|
n_orig = i2 - i1
|
|
n_placeholders = len(placeholder_labels)
|
|
# Exclure d'abord les tokens qui matchent des non-placeholders
|
|
orig_assignments = []
|
|
for t in orig_tokens[i1:i2]:
|
|
if normalize(t) in non_placeholder_norms:
|
|
orig_assignments.append(("O", None))
|
|
else:
|
|
orig_assignments.append(("PII", None))
|
|
|
|
# Distribuer les tokens PII entre les placeholders
|
|
pii_indices = [k for k, (tp, _) in enumerate(orig_assignments) if tp == "PII"]
|
|
n_pii = len(pii_indices)
|
|
if n_pii > 0 and n_placeholders > 0:
|
|
chunk_size = max(1, n_pii // n_placeholders)
|
|
for pi_idx, (_, label) in enumerate(placeholder_labels):
|
|
start_pii = pi_idx * chunk_size
|
|
end_pii = (pi_idx + 1) * chunk_size if pi_idx < n_placeholders - 1 else n_pii
|
|
for k in range(start_pii, min(end_pii, n_pii)):
|
|
orig_assignments[pii_indices[k]] = ("PII", label)
|
|
|
|
# Générer les BIO tokens
|
|
prev_label = None
|
|
for k, (t, (tp, label)) in enumerate(zip(orig_tokens[i1:i2], orig_assignments)):
|
|
if tp == "O" or label is None:
|
|
bio_tokens.append((t, "O"))
|
|
prev_label = None
|
|
else:
|
|
prefix = "B-" if label != prev_label else "I-"
|
|
bio_tokens.append((t, f"{prefix}{label}"))
|
|
prev_label = label
|
|
|
|
elif tag == "delete":
|
|
# Tokens présents uniquement dans l'original → O
|
|
for t in orig_tokens[i1:i2]:
|
|
bio_tokens.append((t, "O"))
|
|
|
|
elif tag == "insert":
|
|
# Tokens ajoutés dans le pseudo (rare) → ignorer
|
|
pass
|
|
|
|
return bio_tokens
|
|
|
|
|
|
def export_document(pdf_path: Path, pseudo_path: Path, out_dir: Path) -> Tuple[int, int]:
|
|
"""Exporte un document en format BIO. Retourne (nb_tokens, nb_entités)."""
|
|
# Extraire le texte original
|
|
original_text = extract_original_text(pdf_path)
|
|
if not original_text.strip():
|
|
return 0, 0
|
|
|
|
# Lire le texte pseudonymisé
|
|
pseudo_text = pseudo_path.read_text(encoding="utf-8")
|
|
if not pseudo_text.strip():
|
|
return 0, 0
|
|
|
|
# Aligner et annoter
|
|
bio_tokens = align_and_annotate(original_text, pseudo_text)
|
|
|
|
# Écrire en format CoNLL
|
|
out_name = pdf_path.stem + ".bio"
|
|
out_path = out_dir / out_name
|
|
lines = []
|
|
for token, label in bio_tokens:
|
|
# Séparer les phrases par des lignes vides (ponctuation finale)
|
|
if token in (".", "!", "?") and label == "O":
|
|
lines.append(f"{token}\t{label}")
|
|
lines.append("")
|
|
else:
|
|
lines.append(f"{token}\t{label}")
|
|
|
|
out_path.write_text("\n".join(lines), encoding="utf-8")
|
|
|
|
n_ents = sum(1 for _, l in bio_tokens if l.startswith("B-"))
|
|
return len(bio_tokens), n_ents
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Export silver annotations BIO (alignement original ↔ pseudo)")
|
|
parser.add_argument("--out-dir", type=Path,
|
|
default=Path(__file__).parent.parent / "data" / "silver_annotations",
|
|
help="Répertoire de sortie")
|
|
parser.add_argument("--limit", type=int, default=0, help="Limiter à N fichiers (0=tous)")
|
|
args = parser.parse_args()
|
|
|
|
args.out_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Trouver les paires PDF + pseudo
|
|
pseudo_files = sorted(AUDIT_DIR.glob("*.pseudonymise.txt"))
|
|
pairs = []
|
|
for pseudo_path in pseudo_files:
|
|
# Retrouver le PDF source
|
|
base_name = pseudo_path.name.replace(".pseudonymise.txt", ".pdf")
|
|
# Chercher dans les sous-dossiers OGC
|
|
found = list(SRC.glob(f"*/{base_name}"))
|
|
if found:
|
|
pairs.append((found[0], pseudo_path))
|
|
|
|
if args.limit > 0:
|
|
pairs = pairs[:args.limit]
|
|
|
|
print(f"Export silver annotations: {len(pairs)} documents → {args.out_dir}")
|
|
|
|
total_tokens = 0
|
|
total_entities = 0
|
|
for pdf_path, pseudo_path in pairs:
|
|
try:
|
|
n_tok, n_ent = export_document(pdf_path, pseudo_path, args.out_dir)
|
|
total_tokens += n_tok
|
|
total_entities += n_ent
|
|
print(f" {pdf_path.name}: {n_tok} tokens, {n_ent} entités")
|
|
except Exception as e:
|
|
print(f" {pdf_path.name}: ERREUR {e}")
|
|
|
|
print(f"\nTotal: {total_tokens} tokens, {total_entities} entités B-")
|
|
print(f"Sortie: {args.out_dir}")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|