feat(phase2): Multi-signal NER — BDPM gazetteers, confiance EDS, safe patterns, GLiNER

Chantier 1: Intégration BDPM (5737 médicaments officiels) dans medication whitelist
Chantier 2: Safe patterns contextuels (dosages mg/mL/cpr, formes pharma, même ligne)
Chantier 3: Scores de confiance NER réels (edsnlp 0.20 ner_confidence_score)
Chantier 4: GLiNER zero-shot (urchade/gliner_multi_pii-v1) en vote croisé
Chantier 5: Scripts export silver annotations + fine-tuning CamemBERT-bio

0 fuite, 0 régression, -18 FP supplémentaires éliminés.
Sécurité: GLiNER ne peut rejeter que si confiance NER < 0.70.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-09 12:01:46 +01:00
parent 782551c1c6
commit 26ac02b0cb
16 changed files with 6431 additions and 41 deletions

View File

@@ -0,0 +1,145 @@
#!/usr/bin/env python3
"""
Export silver annotations — Génère des données d'entraînement BIO à partir du pipeline existant.
================================================================================================
Utilise le pipeline regex+NER+VLM actuel pour produire des annotations "silver standard"
sur les 706 OGC. Ces annotations servent de base pour fine-tuner CamemBERT-bio.
Usage:
python scripts/export_silver_annotations.py [--limit N] [--out-dir DIR]
Output: data/silver_annotations/ avec un fichier .bio par document
Format BIO: TOKEN\tLABEL (un token par ligne, lignes vides entre phrases)
"""
import sys
import re
import json
import argparse
from pathlib import Path
from typing import List, Tuple
sys.path.insert(0, str(Path(__file__).parent.parent))
# Regex pour détecter les placeholders et reconstruire l'alignement
PLACEHOLDER_RE = re.compile(
r"\[(NOM|TEL|EMAIL|NIR|IPP|DOSSIER|NDA|EPISODE|RPPS|DATE_NAISSANCE|"
r"ADRESSE|CODE_POSTAL|VILLE|MASK|FINESS|OGC|AGE|ETAB|IBAN)\]"
)
# Mapping placeholder → label BIO
PH_TO_BIO = {
"NOM": "PER",
"TEL": "TEL",
"EMAIL": "EMAIL",
"NIR": "NIR",
"IPP": "IPP",
"DOSSIER": "NDA",
"NDA": "NDA",
"EPISODE": "NDA",
"RPPS": "RPPS",
"DATE_NAISSANCE": "DATE_NAISSANCE",
"ADRESSE": "ADRESSE",
"CODE_POSTAL": "ZIP",
"VILLE": "VILLE",
"ETAB": "HOPITAL",
"FINESS": "HOPITAL",
"IBAN": "IBAN",
"AGE": "AGE",
"OGC": "NDA",
"MASK": "O", # MASK générique = pas d'annotation spécifique
}
def text_to_bio(pseudonymised_text: str) -> List[Tuple[str, str]]:
"""Convertit un texte pseudonymisé en séquence BIO.
Les tokens [PLACEHOLDER] deviennent B-TYPE / I-TYPE.
Les tokens normaux deviennent O.
"""
bio_tokens: List[Tuple[str, str]] = []
# Split le texte en segments : alternance texte normal / placeholder
parts = PLACEHOLDER_RE.split(pseudonymised_text)
# parts = [texte, label, texte, label, texte, ...]
i = 0
while i < len(parts):
if i % 2 == 0:
# Texte normal
text_part = parts[i]
for word in text_part.split():
word = word.strip()
if word:
bio_tokens.append((word, "O"))
else:
# Label de placeholder
label = parts[i]
bio_label = PH_TO_BIO.get(label, "O")
if bio_label != "O":
# Le placeholder remplace un ou plusieurs tokens
bio_tokens.append((f"[{label}]", f"B-{bio_label}"))
else:
bio_tokens.append((f"[{label}]", "O"))
i += 1
return bio_tokens
def export_document(pseudo_path: Path, out_dir: Path) -> int:
"""Exporte un fichier pseudonymisé en format BIO. Retourne le nombre de tokens."""
text = pseudo_path.read_text(encoding="utf-8", errors="replace")
bio_tokens = text_to_bio(text)
if not bio_tokens:
return 0
# Écrire en format CoNLL (TOKEN\tLABEL)
out_path = out_dir / pseudo_path.name.replace(".pseudonymise.txt", ".bio")
lines = []
for token, label in bio_tokens:
# Séparer les "phrases" par des lignes vides (heuristique: point final ou retour ligne)
if token in (".", "!", "?") and label == "O":
lines.append(f"{token}\t{label}")
lines.append("") # séparateur de phrase
else:
lines.append(f"{token}\t{label}")
out_path.write_text("\n".join(lines), encoding="utf-8")
return len(bio_tokens)
def main():
parser = argparse.ArgumentParser(description="Export silver annotations BIO")
parser.add_argument("--input-dir", type=Path,
default=Path("/home/dom/Téléchargements/II-1 Ctrl_T2A_2025_CHCB_DocJustificatifs (1)/anonymise_audit_30"),
help="Répertoire contenant les .pseudonymise.txt")
parser.add_argument("--out-dir", type=Path,
default=Path(__file__).parent.parent / "data" / "silver_annotations",
help="Répertoire de sortie")
parser.add_argument("--limit", type=int, default=0, help="Limiter à N fichiers (0=tous)")
args = parser.parse_args()
args.out_dir.mkdir(parents=True, exist_ok=True)
pseudo_files = sorted(args.input_dir.glob("*.pseudonymise.txt"))
if args.limit > 0:
pseudo_files = pseudo_files[:args.limit]
print(f"Export silver annotations: {len(pseudo_files)} fichiers → {args.out_dir}")
total_tokens = 0
total_entities = 0
for f in pseudo_files:
n = export_document(f, args.out_dir)
ent_count = sum(1 for line in (args.out_dir / f.name.replace(".pseudonymise.txt", ".bio")).read_text().splitlines()
if line and not line.endswith("\tO"))
total_tokens += n
total_entities += ent_count
print(f" {f.name}: {n} tokens, {ent_count} entités")
print(f"\nTotal: {total_tokens} tokens, {total_entities} entités annotées")
print(f"Sortie: {args.out_dir}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,256 @@
#!/usr/bin/env python3
"""
Fine-tune CamemBERT-bio pour la désidentification clinique française.
=====================================================================
Entraîne almanach/camembert-bio-base sur les annotations silver/gold
exportées par export_silver_annotations.py.
Usage:
python scripts/finetune_camembert_bio.py [--epochs 5] [--batch-size 8] [--lr 2e-5]
Prérequis: pip install transformers datasets seqeval accelerate
Export ONNX post-training: python scripts/export_onnx.py
"""
import sys
import argparse
from pathlib import Path
from typing import Dict, List
import numpy as np
# Vérifier les dépendances
try:
from transformers import (
AutoTokenizer,
AutoModelForTokenClassification,
TrainingArguments,
Trainer,
DataCollatorForTokenClassification,
)
from datasets import Dataset, DatasetDict
import evaluate
except ImportError as e:
print(f"Dépendance manquante: {e}")
print("Installez: pip install transformers datasets seqeval accelerate")
sys.exit(1)
# Labels BIO pour la désidentification
LABEL_LIST = [
"O",
"B-PER", "I-PER",
"B-TEL", "I-TEL",
"B-EMAIL", "I-EMAIL",
"B-NIR", "I-NIR",
"B-IPP", "I-IPP",
"B-NDA", "I-NDA",
"B-RPPS", "I-RPPS",
"B-DATE_NAISSANCE", "I-DATE_NAISSANCE",
"B-ADRESSE", "I-ADRESSE",
"B-ZIP", "I-ZIP",
"B-VILLE", "I-VILLE",
"B-HOPITAL", "I-HOPITAL",
"B-IBAN", "I-IBAN",
"B-AGE", "I-AGE",
]
LABEL2ID = {l: i for i, l in enumerate(LABEL_LIST)}
ID2LABEL = {i: l for l, i in LABEL2ID.items()}
MODEL_NAME = "almanach/camembert-bio-base"
def load_bio_files(data_dir: Path) -> Dict[str, List]:
"""Charge les fichiers .bio en format HuggingFace datasets."""
tokens_list: List[List[str]] = []
labels_list: List[List[int]] = []
for bio_file in sorted(data_dir.glob("*.bio")):
text = bio_file.read_text(encoding="utf-8")
current_tokens: List[str] = []
current_labels: List[int] = []
for line in text.splitlines():
line = line.strip()
if not line:
# Fin de phrase
if current_tokens:
tokens_list.append(current_tokens)
labels_list.append(current_labels)
current_tokens = []
current_labels = []
continue
parts = line.split("\t")
if len(parts) != 2:
continue
token, label = parts
label_id = LABEL2ID.get(label, LABEL2ID["O"])
current_tokens.append(token)
current_labels.append(label_id)
if current_tokens:
tokens_list.append(current_tokens)
labels_list.append(current_labels)
return {"tokens": tokens_list, "ner_tags": labels_list}
def tokenize_and_align(examples, tokenizer):
"""Tokenize et aligne les labels avec les sous-tokens."""
tokenized = tokenizer(
examples["tokens"],
truncation=True,
is_split_into_words=True,
max_length=512,
padding=False,
)
all_labels = []
for i, labels in enumerate(examples["ner_tags"]):
word_ids = tokenized.word_ids(batch_index=i)
label_ids = []
prev_word_id = None
for word_id in word_ids:
if word_id is None:
label_ids.append(-100)
elif word_id != prev_word_id:
label_ids.append(labels[word_id])
else:
# Sous-token : I- si le premier est B-, sinon même label
orig = labels[word_id]
if orig > 0 and LABEL_LIST[orig].startswith("B-"):
# Convertir B- en I-
i_label = LABEL_LIST[orig].replace("B-", "I-")
label_ids.append(LABEL2ID.get(i_label, orig))
else:
label_ids.append(orig)
prev_word_id = word_id
all_labels.append(label_ids)
tokenized["labels"] = all_labels
return tokenized
def main():
parser = argparse.ArgumentParser(description="Fine-tune CamemBERT-bio pour désidentification")
parser.add_argument("--data-dir", type=Path,
default=Path(__file__).parent.parent / "data" / "silver_annotations",
help="Répertoire des fichiers .bio")
parser.add_argument("--output-dir", type=Path,
default=Path(__file__).parent.parent / "models" / "camembert-bio-deid",
help="Répertoire de sortie du modèle")
parser.add_argument("--epochs", type=int, default=5)
parser.add_argument("--batch-size", type=int, default=8)
parser.add_argument("--lr", type=float, default=2e-5)
parser.add_argument("--val-split", type=float, default=0.15, help="Fraction pour validation")
args = parser.parse_args()
# Charger les données
print(f"Chargement des données depuis {args.data_dir}...")
raw_data = load_bio_files(args.data_dir)
n_sentences = len(raw_data["tokens"])
n_entities = sum(1 for labels in raw_data["ner_tags"] for l in labels if l != 0)
print(f" {n_sentences} phrases, {n_entities} entités annotées")
if n_sentences < 10:
print("ERREUR: pas assez de données. Lancez d'abord export_silver_annotations.py")
sys.exit(1)
# Split train/val
dataset = Dataset.from_dict(raw_data)
split = dataset.train_test_split(test_size=args.val_split, seed=42)
datasets = DatasetDict({"train": split["train"], "validation": split["test"]})
print(f" Train: {len(datasets['train'])}, Validation: {len(datasets['validation'])}")
# Tokenizer + modèle
print(f"\nChargement du modèle {MODEL_NAME}...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModelForTokenClassification.from_pretrained(
MODEL_NAME,
num_labels=len(LABEL_LIST),
id2label=ID2LABEL,
label2id=LABEL2ID,
)
# Tokenization
tokenized = datasets.map(
lambda ex: tokenize_and_align(ex, tokenizer),
batched=True,
remove_columns=datasets["train"].column_names,
)
# Métriques
seqeval = evaluate.load("seqeval")
def compute_metrics(eval_pred):
logits, labels = eval_pred
predictions = np.argmax(logits, axis=-1)
true_labels = []
true_preds = []
for pred_seq, label_seq in zip(predictions, labels):
t_labels = []
t_preds = []
for p, l in zip(pred_seq, label_seq):
if l != -100:
t_labels.append(LABEL_LIST[l])
t_preds.append(LABEL_LIST[p])
true_labels.append(t_labels)
true_preds.append(t_preds)
results = seqeval.compute(predictions=true_preds, references=true_labels)
return {
"precision": results["overall_precision"],
"recall": results["overall_recall"],
"f1": results["overall_f1"],
}
# Training
args.output_dir.mkdir(parents=True, exist_ok=True)
training_args = TrainingArguments(
output_dir=str(args.output_dir),
num_train_epochs=args.epochs,
per_device_train_batch_size=args.batch_size,
per_device_eval_batch_size=args.batch_size * 2,
learning_rate=args.lr,
weight_decay=0.01,
warmup_ratio=0.1,
eval_strategy="epoch",
save_strategy="epoch",
load_best_model_at_end=True,
metric_for_best_model="f1",
logging_steps=50,
fp16=False, # CPU training
report_to="none",
save_total_limit=2,
)
data_collator = DataCollatorForTokenClassification(tokenizer)
trainer = Trainer(
model=model,
args=training_args,
train_dataset=tokenized["train"],
eval_dataset=tokenized["validation"],
data_collator=data_collator,
compute_metrics=compute_metrics,
tokenizer=tokenizer,
)
print(f"\nDémarrage du fine-tuning ({args.epochs} epochs, batch={args.batch_size}, lr={args.lr})...")
trainer.train()
# Sauvegarder
trainer.save_model(str(args.output_dir / "best"))
tokenizer.save_pretrained(str(args.output_dir / "best"))
print(f"\nModèle sauvegardé: {args.output_dir / 'best'}")
# Évaluation finale
results = trainer.evaluate()
print(f"\nRésultats finaux:")
print(f" Precision: {results['eval_precision']:.4f}")
print(f" Recall: {results['eval_recall']:.4f}")
print(f" F1: {results['eval_f1']:.4f}")
print(f"\nPour exporter en ONNX:")
print(f" python -m optimum.exporters.onnx --model {args.output_dir / 'best'} {args.output_dir / 'onnx'}")
if __name__ == "__main__":
main()