feat(phase2): Intégration CamemBERT-bio ONNX comme 3e signal NER (vote triple)

- camembert_ner_manager.py : inférence ONNX CPU (~10ms), predict/predict_long/validate_eds_entities
- Vote triple NER : EDS-Pseudo (confiance) + GLiNER (zero-shot) + CamemBERT-bio (fine-tuné F1=89%)
- CamemBERT-bio peut sauver un vrai nom à basse confiance EDS (camembert_confirmed=True)
- CamemBERT-bio confirme le rejet des FP médicaux (Paracétamol, Tramadol → False)
- Intégré dans process_pdf via paramètre camembert_manager
- run_batch_30_audit.py mis à jour pour charger le modèle

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-03-09 13:42:56 +01:00
parent 26b210607c
commit 19e089ea38
3 changed files with 326 additions and 10 deletions

298
camembert_ner_manager.py Normal file
View File

@@ -0,0 +1,298 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CamemBERT-bio NER Manager — Inférence ONNX pour la désidentification clinique.
================================================================================
Modèle fine-tuné sur almanach/camembert-bio-base avec des annotations silver
issues de 29 documents cliniques français (F1=89% sur validation).
Utilisé comme signal NER supplémentaire dans le pipeline d'anonymisation,
en complément d'EDS-Pseudo et GLiNER (vote majoritaire).
Inférence ONNX Runtime CPU : ~20 ms pour 512 tokens.
"""
from __future__ import annotations

import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np
log = logging.getLogger(__name__)
# Optional dependency: ONNX Runtime (CPU inference). A missing install is
# tolerated at import time; load() raises an explicit RuntimeError instead.
try:
    import onnxruntime as ort
    _ORT_AVAILABLE = True
except ImportError:
    ort = None  # type: ignore
    _ORT_AVAILABLE = False
# Optional dependency: HuggingFace `transformers`, needed for the tokenizer.
try:
    from transformers import AutoTokenizer
    _TOKENIZERS_AVAILABLE = True
except ImportError:
    AutoTokenizer = None  # type: ignore
    _TOKENIZERS_AVAILABLE = False
# Default location of the exported ONNX model, relative to this file.
DEFAULT_MODEL_DIR = Path(__file__).parent / "models" / "camembert-bio-deid" / "onnx"
# Mapping from the model's BIO label categories to the PLACEHOLDERS keys
# used by anonymizer_core (e.g. model label "PER" -> placeholder key "NOM").
CAMEMBERT_LABEL_MAP: Dict[str, str] = {
    "PER": "NOM",
    "TEL": "TEL",
    "EMAIL": "EMAIL",
    "NIR": "NIR",
    "IPP": "IPP",
    "NDA": "NDA",
    "RPPS": "RPPS",
    "DATE_NAISSANCE": "DATE_NAISSANCE",
    "ADRESSE": "ADRESSE",
    "ZIP": "CODE_POSTAL",
    "VILLE": "VILLE",
    "HOPITAL": "ETAB",
    "IBAN": "IBAN",
    "AGE": "AGE",
}
class CamembertNerManager:
    """CamemBERT-bio ONNX manager for NER token classification.

    Wraps an ONNX-exported token-classification model and its HuggingFace
    tokenizer. Call ``load()`` first; the prediction methods degrade
    gracefully (empty list / unchanged input) when the model is not loaded.
    """

    def __init__(self, model_dir: Optional[Path] = None):
        # Directory expected to contain model.onnx, config.json and tokenizer files.
        self._model_dir = Path(model_dir) if model_dir else DEFAULT_MODEL_DIR
        self._session: Optional[Any] = None      # onnxruntime.InferenceSession
        self._tokenizer: Optional[Any] = None    # transformers tokenizer
        self._id2label: Dict[int, str] = {}      # model output index -> BIO label
        self._loaded = False

    def is_loaded(self) -> bool:
        """Return True if ``load()`` completed successfully."""
        return self._loaded

    def load(self) -> None:
        """Load the ONNX model and the tokenizer.

        Raises:
            RuntimeError: if onnxruntime or transformers is not installed.
            FileNotFoundError: if model.onnx is missing from the model dir.
        """
        if not _ORT_AVAILABLE:
            raise RuntimeError("onnxruntime non disponible. Installez : pip install onnxruntime")
        if not _TOKENIZERS_AVAILABLE:
            raise RuntimeError("transformers non disponible. Installez : pip install transformers")
        model_path = self._model_dir / "model.onnx"
        if not model_path.exists():
            raise FileNotFoundError(f"Modèle ONNX non trouvé: {model_path}")
        self.unload()
        # Read the id -> label mapping from the HF config.json.
        config_path = self._model_dir / "config.json"
        with open(config_path, encoding="utf-8") as f:
            cfg = json.load(f)
        self._id2label = {int(k): v for k, v in cfg.get("id2label", {}).items()}
        # CPU-only ONNX session; modest thread counts for a small model.
        opts = ort.SessionOptions()
        opts.inter_op_num_threads = 2
        opts.intra_op_num_threads = 4
        self._session = ort.InferenceSession(
            str(model_path),
            sess_options=opts,
            providers=["CPUExecutionProvider"],
        )
        self._tokenizer = AutoTokenizer.from_pretrained(str(self._model_dir))
        self._loaded = True
        log.info("CamemBERT-bio ONNX chargé: %s (%d labels)", self._model_dir, len(self._id2label))

    def unload(self) -> None:
        """Release the session/tokenizer and reset the loaded state."""
        self._session = None
        self._tokenizer = None
        self._id2label = {}
        self._loaded = False

    @staticmethod
    def _new_entity(text: str, start: int, end: int, category: str,
                    bio_label: str, score: float) -> Dict[str, Any]:
        """Build a fresh entity dict for a span beginning at (start, end)."""
        return {
            "word": text[start:end],
            "label": category,
            "bio_label": bio_label,
            "score": score,
            "start": start,
            "end": end,
            "_scores": [score],  # per-subtoken scores, averaged at the end
        }

    def predict(self, text: str, threshold: float = 0.5) -> List[Dict[str, Any]]:
        """Predict NER entities in a (short) text.

        Sub-tokens are aggregated into word-level entities following the
        BIO scheme; each entity's score is the mean of its sub-token
        probabilities, and entities below ``threshold`` are dropped.

        Args:
            text: input text (tokenization is truncated to 512 tokens).
            threshold: minimum average probability to keep an entity.

        Returns:
            List of dicts with keys: word, label (category without B-/I-),
            bio_label (BIO label of the first token), score, and start/end
            character offsets into ``text``.
        """
        if not self._loaded:
            return []
        encoding = self._tokenizer(
            text,
            return_tensors="np",
            truncation=True,
            max_length=512,
            return_offsets_mapping=True,
        )
        offsets = encoding.pop("offset_mapping")[0]  # (seq_len, 2) char spans
        # Inference: only the model's declared inputs are forwarded.
        inputs = {k: v for k, v in encoding.items() if k in ("input_ids", "attention_mask")}
        outputs = self._session.run(None, inputs)
        logits = outputs[0][0]  # (seq_len, num_labels)
        # Numerically stable softmax for per-token confidence scores.
        exp_logits = np.exp(logits - np.max(logits, axis=-1, keepdims=True))
        probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)
        predictions = np.argmax(logits, axis=-1)
        scores = np.max(probs, axis=-1)
        # Aggregate sub-tokens into word-level entities.
        entities: List[Dict[str, Any]] = []
        current: Optional[Dict[str, Any]] = None
        for pred_id, score, (start, end) in zip(predictions, scores, offsets):
            start, end = int(start), int(end)
            label = self._id2label.get(int(pred_id), "O")
            # Special tokens ((0, 0) offsets) and "O" both close any open entity.
            if (start == 0 and end == 0) or label == "O":
                if current is not None:
                    entities.append(current)
                current = None
                continue
            if label.startswith("B-"):
                # Explicit beginning: close the previous entity, open a new one.
                if current is not None:
                    entities.append(current)
                current = self._new_entity(text, start, end, label[2:], label, float(score))
            elif label.startswith("I-"):
                category = label[2:]
                if current is not None and current["label"] == category:
                    # Continuation: extend the current span.
                    current["word"] = text[current["start"]:end]
                    current["end"] = end
                    current["_scores"].append(float(score))
                else:
                    # Orphan I- without a matching B- -> treat it as a beginning.
                    if current is not None:
                        entities.append(current)
                    current = self._new_entity(text, start, end, category, f"B-{category}", float(score))
        if current is not None:
            entities.append(current)
        # Average sub-token scores and apply the threshold.
        result = []
        for e in entities:
            avg_score = sum(e["_scores"]) / len(e["_scores"])
            e["score"] = avg_score
            del e["_scores"]
            if avg_score >= threshold:
                result.append(e)
        return result

    def predict_long(self, text: str, threshold: float = 0.5,
                     window_size: int = 400, stride: int = 200) -> List[Dict[str, Any]]:
        """Predict entities in a long text with overlapping sliding windows.

        Windows are sliced directly from ``text`` using word-boundary
        offsets, so the returned start/end positions index into the
        ORIGINAL text even when it contains newlines or repeated spaces
        (the previous implementation rebuilt windows with single spaces,
        which shifted offsets). Results from overlapping windows are
        de-duplicated by character span.

        Args:
            text: full document text.
            threshold: minimum average probability to keep an entity.
            window_size: window length, in whitespace-separated words.
            stride: step between window starts, in words (must be > 0).

        Returns:
            Entities sorted by start offset (same schema as ``predict``).

        Raises:
            ValueError: if ``stride`` is not positive.
        """
        if not self._loaded:
            return []
        if stride <= 0:
            raise ValueError("stride must be a positive number of words")
        # Word spans in the original text: window boundaries in characters.
        word_spans = [(m.start(), m.end()) for m in re.finditer(r"\S+", text)]
        # Short text: one direct prediction is enough.
        if len(word_spans) <= window_size:
            return self.predict(text, threshold=threshold)
        all_entities: List[Dict[str, Any]] = []
        seen_spans = set()
        for start_word in range(0, len(word_spans), stride):
            end_word = min(start_word + window_size, len(word_spans))
            chunk_start = word_spans[start_word][0]
            chunk_end = word_spans[end_word - 1][1]
            chunk = text[chunk_start:chunk_end]
            for e in self.predict(chunk, threshold=threshold):
                # Map window-relative offsets back to the full text.
                abs_start = e["start"] + chunk_start
                abs_end = e["end"] + chunk_start
                span_key = (abs_start, abs_end)
                if span_key not in seen_spans:
                    seen_spans.add(span_key)
                    e["start"] = abs_start
                    e["end"] = abs_end
                    all_entities.append(e)
            if end_word >= len(word_spans):
                break
        return sorted(all_entities, key=lambda e: e["start"])

    def validate_eds_entities(
        self,
        text: str,
        eds_entities: List[Dict[str, Any]],
        threshold: float = 0.4,
    ) -> List[Dict[str, Any]]:
        """Cross-validate EDS-Pseudo entities against CamemBERT-bio.

        Each entity dict is annotated IN PLACE with 'camembert_confirmed':
          - True : CamemBERT-bio also detects this text as PII
          - False: CamemBERT-bio detects nothing matching it
          - None : no prediction possible (empty word)

        Args:
            text: full document text.
            eds_entities: entity dicts from EDS-Pseudo (read via 'word' key).
            threshold: score threshold forwarded to ``predict_long``.

        Returns:
            The same list, returned unchanged if the model is not loaded
            or the list is empty.
        """
        if not self._loaded or not eds_entities:
            return eds_entities
        cam_preds = self.predict_long(text, threshold=threshold)
        # Normalize CamemBERT words once; matching is case-insensitive and
        # tolerant to substrings in either direction.
        cam_words = [c["word"].lower().strip() for c in cam_preds]
        for e in eds_entities:
            e_word = (e.get("word") or "").lower().strip()
            if not e_word:
                e["camembert_confirmed"] = None
                continue
            e["camembert_confirmed"] = any(
                c_word == e_word or e_word in c_word or c_word in e_word
                for c_word in cam_words
            )
        return eds_entities