#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CamemBERT-bio NER Manager — ONNX inference for clinical de-identification.
================================================================================

Model fine-tuned on almanach/camembert-bio-base with silver annotations.

Versions:
    v2 (2026-03-09): 29 docs, 7K examples — F1=0.90, Recall=0.93
    v3 (2026-03-11): 1112 docs, 198K examples — F1=0.96, Recall=0.97

Used as an additional NER signal in the anonymization pipeline, alongside
EDS-Pseudo and GLiNER (majority vote).

ONNX Runtime CPU inference: ~10-20 ms for 512 tokens.
"""
from __future__ import annotations

import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np

log = logging.getLogger(__name__)

# Both heavy dependencies are optional: the module stays importable without
# them, and load() raises a clear error if they are missing.
try:
    import onnxruntime as ort

    _ORT_AVAILABLE = True
except ImportError:
    ort = None  # type: ignore
    _ORT_AVAILABLE = False

try:
    from transformers import AutoTokenizer

    _TOKENIZERS_AVAILABLE = True
except ImportError:
    AutoTokenizer = None  # type: ignore
    _TOKENIZERS_AVAILABLE = False

DEFAULT_MODEL_DIR = Path(__file__).parent / "models" / "camembert-bio-deid" / "onnx"

# Mapping of the model's BIO labels → PLACEHOLDERS keys (anonymizer_core)
CAMEMBERT_LABEL_MAP: Dict[str, str] = {
    "PER": "NOM",
    "TEL": "TEL",
    "EMAIL": "EMAIL",
    "NIR": "NIR",
    "IPP": "IPP",
    "NDA": "NDA",
    "RPPS": "RPPS",
    "DATE_NAISSANCE": "DATE_NAISSANCE",
    "ADRESSE": "ADRESSE",
    "ZIP": "CODE_POSTAL",
    "VILLE": "VILLE",
    "HOPITAL": "ETAB",
    "IBAN": "IBAN",
    "AGE": "AGE",
}


class CamembertNerManager:
    """CamemBERT-bio ONNX manager for NER token classification."""

    def __init__(self, model_dir: Optional[Path] = None):
        """
        Args:
            model_dir: Directory containing model.onnx, config.json and the
                tokenizer files. Defaults to DEFAULT_MODEL_DIR.
        """
        self._model_dir = Path(model_dir) if model_dir else DEFAULT_MODEL_DIR
        self._session: Optional[Any] = None     # ort.InferenceSession once loaded
        self._tokenizer: Optional[Any] = None   # HF fast tokenizer once loaded
        self._id2label: Dict[int, str] = {}     # model output index → BIO label
        self._loaded = False
        self._version: str = "?"                # read from VERSION.json in load()

    def is_loaded(self) -> bool:
        """Return True once load() has completed successfully."""
        return self._loaded

    @property
    def version(self) -> str:
        """Model version string (e.g. 'v3'), or '?' before load()."""
        return self._version

    def load(self) -> None:
        """Charge le modèle ONNX et le tokenizer.

        Raises:
            RuntimeError: if onnxruntime or transformers is not installed.
            FileNotFoundError: if model.onnx is absent from the model dir.
        """
        if not _ORT_AVAILABLE:
            raise RuntimeError("onnxruntime non disponible. Installez : pip install onnxruntime")
        if not _TOKENIZERS_AVAILABLE:
            raise RuntimeError("transformers non disponible. Installez : pip install transformers")

        model_path = self._model_dir / "model.onnx"
        if not model_path.exists():
            raise FileNotFoundError(f"Modèle ONNX non trouvé: {model_path}")

        self.unload()

        # id2label mapping comes from the HF config.json next to the model.
        config_path = self._model_dir / "config.json"
        with open(config_path, encoding="utf-8") as f:
            cfg = json.load(f)
        self._id2label = {int(k): v for k, v in cfg.get("id2label", {}).items()}

        # CPU-only session; thread counts tuned for small-batch latency.
        opts = ort.SessionOptions()
        opts.inter_op_num_threads = 2
        opts.intra_op_num_threads = 4
        self._session = ort.InferenceSession(
            str(model_path),
            sess_options=opts,
            providers=["CPUExecutionProvider"],
        )

        self._tokenizer = AutoTokenizer.from_pretrained(str(self._model_dir))
        self._loaded = True

        # Read version/metrics from VERSION.json if available (best effort).
        self._version = "?"
        logged = False
        version_path = self._model_dir.parent / "VERSION.json"
        if version_path.exists():
            try:
                with open(version_path, encoding="utf-8") as vf:
                    vinfo = json.load(vf)
                self._version = vinfo.get("current_version", "?")
                v_meta = vinfo.get("versions", {}).get(self._version, {})
                log.info(
                    "CamemBERT-bio ONNX %s chargé (F1=%s, R=%s, %s labels)",
                    self._version,
                    v_meta.get("f1", "?"),
                    v_meta.get("recall", "?"),
                    len(self._id2label),
                )
                logged = True
            except Exception:
                # Malformed VERSION.json must not prevent use of the model.
                pass
        if not logged:
            log.info(
                "CamemBERT-bio ONNX chargé: %s (%s labels)",
                self._model_dir,
                len(self._id2label),
            )

    def unload(self) -> None:
        """Release the session/tokenizer and reset state."""
        self._session = None
        self._tokenizer = None
        self._id2label = {}
        self._loaded = False

    @staticmethod
    def _softmax(logits: np.ndarray) -> np.ndarray:
        """Numerically stable row-wise softmax over the last axis."""
        shifted = np.exp(logits - np.max(logits, axis=-1, keepdims=True))
        return shifted / np.sum(shifted, axis=-1, keepdims=True)

    def _aggregate_tokens(
        self,
        text: str,
        predictions: Any,
        scores: Any,
        offsets: Any,
    ) -> List[Dict[str, Any]]:
        """Merge per-sub-token BIO predictions into contiguous entities.

        Returned dicts carry a temporary '_scores' list (one score per
        sub-token) that predict() averages and removes.
        """
        entities: List[Dict[str, Any]] = []
        current: Optional[Dict[str, Any]] = None

        def flush() -> None:
            nonlocal current
            if current is not None:
                entities.append(current)
                current = None

        for pred_id, score, (start, end) in zip(predictions, scores, offsets):
            # Special tokens (<s>, </s>, padding) have a (0, 0) offset.
            if start == 0 and end == 0:
                flush()
                continue

            label = self._id2label.get(int(pred_id), "O")
            if label == "O":
                flush()
                continue

            if label.startswith("I-") and current is not None and current["label"] == label[2:]:
                # Continuation of the current entity: extend span and word.
                current["word"] = text[current["start"]:int(end)]
                current["end"] = int(end)
                current["_scores"].append(float(score))
            elif label.startswith("B-") or label.startswith("I-"):
                # New entity; an orphan I- (no matching B-) is treated as B-.
                flush()
                category = label[2:]
                current = {
                    "word": text[int(start):int(end)],
                    "label": category,
                    "bio_label": label if label.startswith("B-") else f"B-{category}",
                    "score": float(score),
                    "start": int(start),
                    "end": int(end),
                    "_scores": [float(score)],
                }
            # Labels without a B-/I- prefix are ignored, as in the original
            # aggregation logic (they should not occur with a BIO model).

        flush()
        return entities

    def predict(self, text: str, threshold: float = 0.5) -> List[Dict[str, Any]]:
        """Predict NER entities in a text (truncated at 512 tokens).

        Aggregates sub-tokens into word-level entities with a BIO label.

        Args:
            text: Input text.
            threshold: Minimum average sub-token score to keep an entity.

        Returns:
            List of dicts with: word, label, bio_label, score, start, end
            (label = category without B-/I-, bio_label = full label).
            Empty list if the model is not loaded.
        """
        if not self._loaded:
            return []

        encoding = self._tokenizer(
            text,
            return_tensors="np",
            truncation=True,
            max_length=512,
            return_offsets_mapping=True,
        )
        offsets = encoding.pop("offset_mapping")[0]  # (seq_len, 2)

        # The ONNX graph only takes input_ids / attention_mask.
        inputs = {k: v for k, v in encoding.items() if k in ("input_ids", "attention_mask")}
        logits = self._session.run(None, inputs)[0][0]  # (seq_len, num_labels)

        probs = self._softmax(logits)
        predictions = np.argmax(logits, axis=-1)
        scores = np.max(probs, axis=-1)

        entities = self._aggregate_tokens(text, predictions, scores, offsets)

        # Average the sub-token scores and filter by threshold.
        result = []
        for e in entities:
            avg_score = sum(e["_scores"]) / len(e["_scores"])
            e["score"] = avg_score
            del e["_scores"]
            if avg_score >= threshold:
                result.append(e)
        return result

    def predict_long(self, text: str, threshold: float = 0.5,
                     window_size: int = 400, stride: int = 200) -> List[Dict[str, Any]]:
        """Predict on a long text using overlapping sliding windows.

        Documents longer than `window_size` words are split into overlapping
        word windows; per-window results are merged and deduplicated by
        absolute character span.

        Fix: windows are sliced directly out of `text` using the word spans
        found by a regex, so entity offsets stay exact even when the original
        whitespace is not single spaces (the previous `" ".join(split())`
        reconstruction drifted on newlines and repeated spaces).

        Args:
            text: Full document text.
            threshold: Score threshold forwarded to predict().
            window_size: Window length in words.
            stride: Window step in words (overlap = window_size - stride).
        """
        if not self._loaded:
            return []

        # Word spans in the ORIGINAL text — keeps character offsets exact.
        word_spans = [m.span() for m in re.finditer(r"\S+", text)]

        # Short text: a single direct prediction is enough.
        if len(word_spans) <= window_size:
            return self.predict(text, threshold=threshold)

        stride = max(1, stride)  # guard against an infinite loop on stride<=0
        all_entities: List[Dict[str, Any]] = []
        seen_spans: set = set()

        for start_word in range(0, len(word_spans), stride):
            end_word = min(start_word + window_size, len(word_spans))
            chunk_start = word_spans[start_word][0]
            chunk_end = word_spans[end_word - 1][1]
            chunk = text[chunk_start:chunk_end]

            for e in self.predict(chunk, threshold=threshold):
                # Rebase positions onto the full text; first occurrence wins.
                abs_start = e["start"] + chunk_start
                abs_end = e["end"] + chunk_start
                span_key = (abs_start, abs_end)
                if span_key not in seen_spans:
                    seen_spans.add(span_key)
                    e["start"] = abs_start
                    e["end"] = abs_end
                    all_entities.append(e)

            if end_word >= len(word_spans):
                break

        return sorted(all_entities, key=lambda e: e["start"])

    def validate_eds_entities(
        self,
        text: str,
        eds_entities: List[Dict[str, Any]],
        threshold: float = 0.4,
    ) -> List[Dict[str, Any]]:
        """Cross-validate EDS-Pseudo entities with CamemBERT-bio.

        Each EDS entity receives a 'camembert_confirmed' field:
        - True : CamemBERT-bio also detects this text as PII
        - False: CamemBERT-bio detects nothing matching it
        - None : entity has no usable 'word' text

        If the model is not loaded (or the list is empty), the input list is
        returned unmodified, with no field added.

        Args:
            text: Full document text.
            eds_entities: Entities from EDS-Pseudo (mutated in place).
            threshold: Score threshold for the CamemBERT predictions.

        Returns:
            The same list, with 'camembert_confirmed' set on each entity.
        """
        if not self._loaded or not eds_entities:
            return eds_entities

        cam_preds = self.predict_long(text, threshold=threshold)
        # Normalize once; matching is by text, tolerant to substrings in
        # either direction (handles partial spans on both sides).
        cam_words = [c["word"].lower().strip() for c in cam_preds]

        for e in eds_entities:
            e_word = (e.get("word") or "").lower().strip()
            if not e_word:
                e["camembert_confirmed"] = None
                continue
            e["camembert_confirmed"] = any(
                c_word == e_word or e_word in c_word or c_word in e_word
                for c_word in cam_words
            )
        return eds_entities