Intégration du modèle CamemBERT-bio-deid v3 (F1=0.96, Recall=0.97, 1112 docs)
et corrections qualité issues de l'audit approfondi sur 29 fichiers.
Détection des villes en texte libre :
- Automate Aho-Corasick sur 33K communes INSEE + 11.6K villes FINESS
- Stratégie contextuelle : exige un contexte géographique (à, de, vers,
habite, urgences de, etc.) sauf pour les villes composées (Saint-Palais)
- Blacklist de ~80 communes homonymes de mots courants (charge, signes, plan...)
- Normalisation SAINT↔ST pour les variantes orthographiques
- De 18 fuites de villes à 2 cas résiduels atypiques
Masquage des initiales de prénom :
- Post-traitement regex : "Dr T. [NOM]" → "Dr [NOM] [NOM]"
- Références initiales : "Ref : JF/VA" → "Ref : [NOM]/[NOM]"
Détection texte espacé d'en-tête :
- "C E N T R E H O S P I T A L I E R" → [ETABLISSEMENT]
Autres corrections :
- Fix regex RE_EXTRACT_MME_MR (Mr?.? → Mr.?, \s+ → [ \t]+, * → {0,4})
- Stop words médicaux : lever, coucher, services hospitaliers (viscérale, etc.)
- CamemBERT NER manager : version tracking, propriété version, log F1/Recall
- Script finetune : export ONNX automatique + mise à jour VERSION.json
- Évaluateur qualité : exclusion stop words médicaux des alertes INSEE
Documentation :
- Spécifications techniques CamemBERT-bio-deid v3
- Conformité RGPD + AI Act (caviardage PDF raster)
- AIPD (Analyse d'Impact Protection des Données)
Score qualité : 97.0/100 (Grade A), Leak score 100/100
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
322 lines
11 KiB
Python
322 lines
11 KiB
Python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CamemBERT-bio NER Manager — ONNX inference for clinical de-identification.
================================================================================

Model fine-tuned on almanach/camembert-bio-base with silver annotations.

Versions:
    v2 (2026-03-09): 29 docs, 7K examples — F1=0.90, Recall=0.93
    v3 (2026-03-11): 1112 docs, 198K examples — F1=0.96, Recall=0.97

Used as an additional NER signal in the anonymization pipeline,
complementing EDS-Pseudo and GLiNER (majority vote).

ONNX Runtime CPU inference: ~10-20 ms for 512 tokens.
"""
from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np

# Module-level logger; handlers/levels are configured by the application.
log = logging.getLogger(__name__)
|
|
|
# Optional dependency: onnxruntime provides the CPU inference backend.
# The import is guarded so this module can be imported without it;
# CamembertNerManager.load() raises RuntimeError when the flag is False.
try:
    import onnxruntime as ort
    _ORT_AVAILABLE = True
except ImportError:
    ort = None  # type: ignore
    _ORT_AVAILABLE = False
# Optional dependency: transformers supplies the HuggingFace tokenizer
# matching the fine-tuned model. Same guarded-import pattern as above.
try:
    from transformers import AutoTokenizer
    _TOKENIZERS_AVAILABLE = True
except ImportError:
    AutoTokenizer = None  # type: ignore
    _TOKENIZERS_AVAILABLE = False
# Default location of the exported ONNX model, resolved relative to this file.
DEFAULT_MODEL_DIR = Path(__file__).parent / "models" / "camembert-bio-deid" / "onnx"
# Mapping from the model's BIO label categories (without the B-/I- prefix)
# to PLACEHOLDERS keys used by anonymizer_core.
CAMEMBERT_LABEL_MAP: Dict[str, str] = {
    "PER": "NOM",
    "TEL": "TEL",
    "EMAIL": "EMAIL",
    "NIR": "NIR",
    "IPP": "IPP",
    "NDA": "NDA",
    "RPPS": "RPPS",
    "DATE_NAISSANCE": "DATE_NAISSANCE",
    "ADRESSE": "ADRESSE",
    "ZIP": "CODE_POSTAL",
    "VILLE": "VILLE",
    "HOPITAL": "ETAB",
    "IBAN": "IBAN",
    "AGE": "AGE",
}
class CamembertNerManager:
    """CamemBERT-bio ONNX manager for NER token classification.

    Wraps an ONNX-exported token-classification model and its HuggingFace
    tokenizer. Used as an extra NER signal in the de-identification
    pipeline (majority vote with EDS-Pseudo and GLiNER).
    """

    def __init__(self, model_dir: Optional[Path] = None):
        """Prepare the manager without touching the filesystem.

        Args:
            model_dir: Directory containing ``model.onnx``, ``config.json``
                and the tokenizer files. Defaults to ``DEFAULT_MODEL_DIR``.
        """
        self._model_dir = Path(model_dir) if model_dir else DEFAULT_MODEL_DIR
        self._session: Optional[Any] = None
        self._tokenizer: Optional[Any] = None
        self._id2label: Dict[int, str] = {}
        self._loaded = False
        # Fix: initialize the version here instead of relying on a
        # getattr() fallback in the property (it was only set in load()).
        self._version = "?"

    def is_loaded(self) -> bool:
        """Return True once load() has completed successfully."""
        return self._loaded

    @property
    def version(self) -> str:
        """Model version label ("?" until load() reads VERSION.json)."""
        return self._version

    def load(self) -> None:
        """Load the ONNX model and tokenizer.

        Raises:
            RuntimeError: if onnxruntime or transformers is not installed.
            FileNotFoundError: if the ONNX model file is missing.
        """
        if not _ORT_AVAILABLE:
            raise RuntimeError("onnxruntime non disponible. Installez : pip install onnxruntime")
        if not _TOKENIZERS_AVAILABLE:
            raise RuntimeError("transformers non disponible. Installez : pip install transformers")

        model_path = self._model_dir / "model.onnx"
        if not model_path.exists():
            raise FileNotFoundError(f"Modèle ONNX non trouvé: {model_path}")

        # Drop any previously loaded model before (re)loading.
        self.unload()

        # Read the id -> BIO label mapping from the HF config.
        config_path = self._model_dir / "config.json"
        with open(config_path, encoding="utf-8") as f:
            cfg = json.load(f)
        self._id2label = {int(k): v for k, v in cfg.get("id2label", {}).items()}

        # CPU-only ONNX session; thread counts tuned for small batch latency.
        opts = ort.SessionOptions()
        opts.inter_op_num_threads = 2
        opts.intra_op_num_threads = 4
        self._session = ort.InferenceSession(
            str(model_path),
            sess_options=opts,
            providers=["CPUExecutionProvider"],
        )

        self._tokenizer = AutoTokenizer.from_pretrained(str(self._model_dir))
        self._loaded = True

        # Version metadata lives next to the onnx/ directory (optional file).
        self._version = "?"
        version_path = self._model_dir.parent / "VERSION.json"
        if version_path.exists():
            try:
                with open(version_path, encoding="utf-8") as vf:
                    vinfo = json.load(vf)
                self._version = vinfo.get("current_version", "?")
                v_meta = vinfo.get("versions", {}).get(self._version, {})
                log.info(
                    "CamemBERT-bio ONNX %s chargé (F1=%s, R=%s, %s labels)",
                    self._version,
                    v_meta.get("f1", "?"),
                    v_meta.get("recall", "?"),
                    len(self._id2label),
                )
                return
            except Exception:
                # Malformed VERSION.json is non-fatal: fall through to the
                # generic log line below.
                pass
        log.info(
            "CamemBERT-bio ONNX chargé: %s (%s labels)",
            self._model_dir, len(self._id2label),
        )

    def unload(self) -> None:
        """Release the session/tokenizer and reset all derived state."""
        self._session = None
        self._tokenizer = None
        self._id2label = {}
        self._loaded = False
        # Consistency fix: the version belongs to the loaded model, so it is
        # reset together with the rest of the state.
        self._version = "?"

    @staticmethod
    def _new_entity(text: str, start: int, end: int, category: str,
                    bio_label: str, score: float) -> Dict[str, Any]:
        """Build a fresh entity accumulator for the span [start, end)."""
        return {
            "word": text[start:end],
            "label": category,
            "bio_label": bio_label,
            "score": score,
            "start": start,
            "end": end,
            "_scores": [score],
        }

    def predict(self, text: str, threshold: float = 0.5) -> List[Dict[str, Any]]:
        """Predict NER entities in a text (truncated at 512 tokens).

        Aggregates sub-tokens into word-level entities with BIO labels.

        Args:
            text: Input text.
            threshold: Minimum average softmax score to keep an entity.

        Returns:
            List of dicts with: word, label, bio_label, score, start, end
            (label = category without B-/I-, bio_label = full BIO label).
            Empty list when the model is not loaded.
        """
        if not self._loaded:
            return []

        encoding = self._tokenizer(
            text,
            return_tensors="np",
            truncation=True,
            max_length=512,
            return_offsets_mapping=True,
        )
        offsets = encoding.pop("offset_mapping")[0]  # (seq_len, 2)

        # Inference: only the model's declared inputs are forwarded.
        inputs = {k: v for k, v in encoding.items() if k in ("input_ids", "attention_mask")}
        outputs = self._session.run(None, inputs)
        logits = outputs[0][0]  # (seq_len, num_labels)

        # Numerically-stable softmax to obtain per-token confidence scores.
        exp_logits = np.exp(logits - np.max(logits, axis=-1, keepdims=True))
        probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)
        predictions = np.argmax(logits, axis=-1)
        scores = np.max(probs, axis=-1)

        # Aggregate sub-tokens into entities.
        entities: List[Dict[str, Any]] = []
        current: Optional[Dict[str, Any]] = None

        for pred_id, score, (start, end) in zip(predictions, scores, offsets):
            # Special tokens carry a (0, 0) offset: flush and skip.
            if start == 0 and end == 0:
                if current is not None:
                    entities.append(current)
                current = None
                continue

            label = self._id2label.get(int(pred_id), "O")
            if label == "O":
                if current is not None:
                    entities.append(current)
                current = None
                continue

            if label.startswith("B-"):
                # Start of a new entity.
                if current is not None:
                    entities.append(current)
                current = self._new_entity(
                    text, int(start), int(end), label[2:], label, float(score)
                )
            elif label.startswith("I-"):
                category = label[2:]
                if current is not None and current["label"] == category:
                    # Continuation: extend the running entity's span.
                    current["word"] = text[current["start"]:int(end)]
                    current["end"] = int(end)
                    current["_scores"].append(float(score))
                else:
                    # Orphan I- without a matching B-: treat as a start.
                    if current is not None:
                        entities.append(current)
                    current = self._new_entity(
                        text, int(start), int(end), category,
                        f"B-{category}", float(score),
                    )
            # Any other non-O label shape is ignored (should not occur with
            # a BIO-labelled model).

        if current is not None:
            entities.append(current)

        # Average sub-token scores and filter by threshold.
        result: List[Dict[str, Any]] = []
        for e in entities:
            sub_scores = e.pop("_scores")
            e["score"] = sum(sub_scores) / len(sub_scores)
            if e["score"] >= threshold:
                result.append(e)

        return result

    def predict_long(self, text: str, threshold: float = 0.5,
                     window_size: int = 400, stride: int = 200) -> List[Dict[str, Any]]:
        """Predict on a long text with overlapping sliding windows.

        Documents longer than ``window_size`` words are split into
        overlapping windows and the results are merged, deduplicated by
        absolute character span.

        Args:
            text: Input text of any length.
            threshold: Minimum entity score (forwarded to predict()).
            window_size: Window length, in whitespace-separated words.
            stride: Step between consecutive windows, in words.

        Returns:
            Entities with start/end relative to the full text, sorted by
            start position. Empty list when the model is not loaded.
        """
        if not self._loaded:
            return []

        # Short text: direct prediction (word count approximates tokens).
        # Fix: use the window_size parameter instead of a hard-coded 400.
        words = text.split()
        if len(words) <= window_size:
            return self.predict(text, threshold=threshold)

        # Fix: locate each word in the ORIGINAL text instead of assuming
        # single-space separators — the previous len(" ".join(...)) offset
        # arithmetic produced wrong positions on newlines/multiple spaces.
        word_spans: List[tuple] = []
        cursor = 0
        for w in words:
            cursor = text.index(w, cursor)
            word_spans.append((cursor, cursor + len(w)))
            cursor += len(w)

        all_entities: List[Dict[str, Any]] = []
        seen_spans = set()

        for start_word in range(0, len(words), stride):
            end_word = min(start_word + window_size, len(words))
            chunk_start = word_spans[start_word][0]
            chunk_end = word_spans[end_word - 1][1]
            # Feed the original slice (not a re-joined copy) so intra-chunk
            # offsets line up with the source text exactly.
            chunk = text[chunk_start:chunk_end]

            for e in self.predict(chunk, threshold=threshold):
                abs_start = e["start"] + chunk_start
                abs_end = e["end"] + chunk_start
                span_key = (abs_start, abs_end)
                if span_key in seen_spans:
                    continue  # already reported by an overlapping window
                seen_spans.add(span_key)
                e["start"] = abs_start
                e["end"] = abs_end
                all_entities.append(e)

            if end_word >= len(words):
                break

        return sorted(all_entities, key=lambda e: e["start"])

    def validate_eds_entities(
        self,
        text: str,
        eds_entities: List[Dict[str, Any]],
        threshold: float = 0.4,
    ) -> List[Dict[str, Any]]:
        """Cross-validate EDS-Pseudo entities with CamemBERT-bio.

        Each EDS entity is annotated in place with 'camembert_confirmed':
        - True:  CamemBERT-bio also detects this span as PII
        - False: CamemBERT-bio detects nothing at this position
        - None:  no prediction possible (empty word)

        When the model is not loaded, the list is returned unchanged.
        """
        if not self._loaded or not eds_entities:
            return eds_entities

        # CamemBERT-bio predictions over the whole document, normalized once.
        cam_words = [
            c["word"].lower().strip()
            for c in self.predict_long(text, threshold=threshold)
        ]

        for e in eds_entities:
            e_word = (e.get("word") or "").lower().strip()
            if not e_word:
                e["camembert_confirmed"] = None
                continue
            # Text match, tolerant to substrings in either direction.
            e["camembert_confirmed"] = any(
                c == e_word or e_word in c or c in e_word for c in cam_words
            )

        return eds_entities