diff --git a/anonymizer_core_refactored_onnx.py b/anonymizer_core_refactored_onnx.py index ab1708d..45c09e5 100644 --- a/anonymizer_core_refactored_onnx.py +++ b/anonymizer_core_refactored_onnx.py @@ -1943,20 +1943,21 @@ def _mask_with_eds_pseudo(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, # Vérifier si c'est un médicament connu if w.lower() in _MEDICATION_WHITELIST: continue - # Chantier 3+4 : Confiance NER + vote croisé GLiNER + gazetteers INSEE + # Chantier 3+4+5 : Confiance NER + vote croisé GLiNER + CamemBERT-bio + gazetteers INSEE # Sécurité d'abord : haute confiance NER → toujours masquer - # GLiNER peut rejeter SEULEMENT si confiance NER basse - gliner_vote = e.get("gliner_confirmed") # True=PII, False=médical, None=neutre + # GLiNER/CamemBERT peuvent rejeter SEULEMENT si confiance NER basse + gliner_vote = e.get("gliner_confirmed") # True=PII, False=médical, None=neutre + camembert_vote = e.get("camembert_confirmed") # True=PII confirmé, False=non détecté, None=neutre if label in ("NOM", "PRENOM"): score = e.get("score", 1.0) # Gazetteer INSEE : prénom connu = renforcement confiance (ne pas filtrer) is_known_prenom = w.lower() in _INSEE_PRENOMS if isinstance(score, float) and score < 0.70 and not is_known_prenom: - # Basse confiance NER + pas un prénom connu : GLiNER peut trancher - if gliner_vote is False: - continue # NER pas sûr + GLiNER dit "médical" → skip - if score < 0.30: - continue # Très basse confiance → skip même sans GLiNER + # Basse confiance NER + pas un prénom connu + if gliner_vote is False and camembert_vote is not True: + continue # GLiNER dit "médical" + CamemBERT ne confirme pas → skip + if score < 0.30 and camembert_vote is not True: + continue # Très basse confiance + CamemBERT ne confirme pas → skip # Chantier 2 : Safe patterns contextuels (Philter-style) # Token suivi/précédé de dosages ou formes pharma → jamais un nom de personne pos = text.find(w) @@ -1994,7 +1995,8 @@ def _mask_with_eds_pseudo(text: str, ents: 
List[Dict[str, Any]], cfg: Dict[str, def apply_eds_pseudo_on_narrative(text_out: str, cfg: Dict[str, Any], manager: "EdsPseudoManager", - gliner_mgr: Any = None) -> Tuple[str, List[PiiHit]]: + gliner_mgr: Any = None, + camembert_mgr: Any = None) -> Tuple[str, List[PiiHit]]: """Applique EDS-Pseudo sur le narratif avec validation croisée GLiNER optionnelle.""" if manager is None or not manager.is_loaded(): return text_out, [] @@ -2021,6 +2023,10 @@ def apply_eds_pseudo_on_narrative(text_out: str, cfg: Dict[str, Any], manager: " if gliner_mgr is not None and hasattr(gliner_mgr, 'validate_entities') and gliner_mgr.is_loaded(): for i, (para, ents) in enumerate(zip(paras, ents_per_para)): ents_per_para[i] = gliner_mgr.validate_entities(para, ents, threshold=0.4) + # Chantier 5 : Validation croisée CamemBERT-bio (vote NER fine-tuné) + if camembert_mgr is not None and hasattr(camembert_mgr, 'validate_eds_entities') and camembert_mgr.is_loaded(): + for i, (para, ents) in enumerate(zip(paras, ents_per_para)): + ents_per_para[i] = camembert_mgr.validate_eds_entities(para, ents, threshold=0.3) buf = [] for para, ents in zip(paras, ents_per_para): masked = _mask_with_eds_pseudo(para, ents, cfg, hits) @@ -2465,6 +2471,7 @@ def process_pdf( ogc_label: Optional[str] = None, vlm_manager=None, gliner_manager=None, + camembert_manager=None, ) -> Dict[str, str]: out_dir.mkdir(parents=True, exist_ok=True) cfg = load_dictionaries(config_path) @@ -2487,7 +2494,7 @@ def process_pdf( if use_hf and ner_manager is not None and ner_manager.is_loaded(): # Détecter le type de manager et appeler la bonne fonction if EdsPseudoManager is not None and isinstance(ner_manager, EdsPseudoManager): - final_text, hf_hits = apply_eds_pseudo_on_narrative(final_text, cfg, ner_manager, gliner_mgr=gliner_manager) + final_text, hf_hits = apply_eds_pseudo_on_narrative(final_text, cfg, ner_manager, gliner_mgr=gliner_manager, camembert_mgr=camembert_manager) else: final_text, hf_hits = 
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
CamemBERT-bio NER Manager — ONNX inference for clinical de-identification.
================================================================================
Model fine-tuned from almanach/camembert-bio-base on silver annotations
taken from 29 French clinical documents (F1=89% on validation).

Used as an additional NER signal in the anonymisation pipeline, alongside
EDS-Pseudo and GLiNER (majority vote).

ONNX Runtime CPU inference: ~20 ms for 512 tokens.
"""
from __future__ import annotations

import json
import logging
import re
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np

log = logging.getLogger(__name__)

try:
    import onnxruntime as ort
    _ORT_AVAILABLE = True
except ImportError:
    ort = None  # type: ignore
    _ORT_AVAILABLE = False

try:
    from transformers import AutoTokenizer
    _TOKENIZERS_AVAILABLE = True
except ImportError:
    AutoTokenizer = None  # type: ignore
    _TOKENIZERS_AVAILABLE = False

DEFAULT_MODEL_DIR = Path(__file__).parent / "models" / "camembert-bio-deid" / "onnx"

# Maps the model's BIO categories to the PLACEHOLDERS keys (anonymizer_core).
CAMEMBERT_LABEL_MAP: Dict[str, str] = {
    "PER": "NOM",
    "TEL": "TEL",
    "EMAIL": "EMAIL",
    "NIR": "NIR",
    "IPP": "IPP",
    "NDA": "NDA",
    "RPPS": "RPPS",
    "DATE_NAISSANCE": "DATE_NAISSANCE",
    "ADRESSE": "ADRESSE",
    "ZIP": "CODE_POSTAL",
    "VILLE": "VILLE",
    "HOPITAL": "ETAB",
    "IBAN": "IBAN",
    "AGE": "AGE",
}

# Whitespace-delimited "words"; used to cut long texts into windows while
# keeping character offsets that index the ORIGINAL text.
_WORD_RE = re.compile(r"\S+")


class CamembertNerManager:
    """CamemBERT-bio ONNX manager for token-classification NER."""

    def __init__(self, model_dir: Optional[Path] = None):
        """Remember the model directory; nothing is loaded until ``load()``."""
        self._model_dir = Path(model_dir) if model_dir else DEFAULT_MODEL_DIR
        self._session: Optional[Any] = None
        self._tokenizer: Optional[Any] = None
        self._id2label: Dict[int, str] = {}
        self._loaded = False

    def is_loaded(self) -> bool:
        """Return True once ``load()`` has completed successfully."""
        return self._loaded

    def load(self) -> None:
        """Load the ONNX model, its label map and its tokenizer.

        Raises:
            RuntimeError: onnxruntime or transformers is not installed.
            FileNotFoundError: ``model.onnx`` (or ``config.json``) is missing.
        """
        if not _ORT_AVAILABLE:
            raise RuntimeError("onnxruntime non disponible. Installez : pip install onnxruntime")
        if not _TOKENIZERS_AVAILABLE:
            raise RuntimeError("transformers non disponible. Installez : pip install transformers")

        model_path = self._model_dir / "model.onnx"
        if not model_path.exists():
            raise FileNotFoundError(f"Modèle ONNX non trouvé: {model_path}")

        self.unload()

        try:
            # id2label comes from the exported config.json
            config_path = self._model_dir / "config.json"
            with open(config_path, encoding="utf-8") as f:
                cfg = json.load(f)
            self._id2label = {int(k): v for k, v in cfg.get("id2label", {}).items()}

            # ONNX session (CPU)
            opts = ort.SessionOptions()
            opts.inter_op_num_threads = 2
            opts.intra_op_num_threads = 4
            self._session = ort.InferenceSession(
                str(model_path),
                sess_options=opts,
                providers=["CPUExecutionProvider"],
            )

            # Tokenizer
            self._tokenizer = AutoTokenizer.from_pretrained(str(self._model_dir))
        except Exception:
            # Do not keep a half-initialised manager (e.g. a live session
            # without a tokenizer) around after a failed load.
            self.unload()
            raise

        self._loaded = True
        log.info(f"CamemBERT-bio ONNX chargé: {self._model_dir} ({len(self._id2label)} labels)")

    def unload(self) -> None:
        """Drop the session, tokenizer and label map; mark as not loaded."""
        self._session = None
        self._tokenizer = None
        self._id2label = {}
        self._loaded = False

    @staticmethod
    def _start_entity(text: str, category: str, score: float,
                      start: int, end: int) -> Dict[str, Any]:
        """Build the dict for an entity that begins at ``text[start:end]``."""
        return {
            "word": text[start:end],
            "label": category,
            "bio_label": f"B-{category}",
            "score": score,
            "start": start,
            "end": end,
            "_scores": [score],  # per-token scores, averaged at the end
        }

    def predict(self, text: str, threshold: float = 0.5) -> List[Dict[str, Any]]:
        """Predict NER entities in ``text``.

        Sub-tokens are merged into entities following their BIO labels. The
        input is truncated to 512 tokens by the tokenizer call; use
        ``predict_long`` for longer texts.

        Args:
            text: Text to analyse.
            threshold: Minimum mean token probability for an entity to be kept.

        Returns:
            List of dicts with keys: word, label, bio_label, score, start, end
            (label = category without B-/I-, bio_label = full B- label).
            Empty list when the model is not loaded or ``text`` is empty.
        """
        if not self._loaded or not text:
            return []

        # Tokenize
        encoding = self._tokenizer(
            text,
            return_tensors="np",
            truncation=True,
            max_length=512,
            return_offsets_mapping=True,
        )
        offsets = encoding.pop("offset_mapping")[0]  # (seq_len, 2)

        # Inference — only these two inputs are forwarded to the session
        inputs = {k: v for k, v in encoding.items() if k in ("input_ids", "attention_mask")}
        outputs = self._session.run(None, inputs)
        logits = outputs[0][0]  # (seq_len, num_labels)

        # Numerically stable softmax for the per-token scores
        exp_logits = np.exp(logits - np.max(logits, axis=-1, keepdims=True))
        probs = exp_logits / np.sum(exp_logits, axis=-1, keepdims=True)

        predictions = np.argmax(logits, axis=-1)
        scores = np.max(probs, axis=-1)

        entities: List[Dict[str, Any]] = []
        current: Optional[Dict[str, Any]] = None

        for pred_id, raw_score, (raw_start, raw_end) in zip(predictions, scores, offsets):
            start, end = int(raw_start), int(raw_end)
            score = float(raw_score)

            # Special tokens carry the (0, 0) offset; they end any open entity,
            # exactly like an "O" prediction.
            is_special = start == 0 and end == 0
            label = "O" if is_special else self._id2label.get(int(pred_id), "O")

            if label == "O":
                if current is not None:
                    entities.append(current)
                    current = None
                continue

            if label.startswith("B-"):
                if current is not None:
                    entities.append(current)
                current = self._start_entity(text, label[2:], score, start, end)
            elif label.startswith("I-"):
                category = label[2:]
                if current is not None and current["label"] == category:
                    # Extend the open entity up to this token
                    current["word"] = text[current["start"]:end]
                    current["end"] = end
                    current["_scores"].append(score)
                else:
                    # I- without a matching B- → treat as the start of an entity
                    if current is not None:
                        entities.append(current)
                    current = self._start_entity(text, category, score, start, end)

        if current is not None:
            entities.append(current)

        # Mean score per entity, then threshold filter
        result = []
        for ent in entities:
            token_scores = ent.pop("_scores")
            ent["score"] = sum(token_scores) / len(token_scores)
            if ent["score"] >= threshold:
                result.append(ent)

        return result

    def predict_long(self, text: str, threshold: float = 0.5,
                     window_size: int = 400, stride: int = 200) -> List[Dict[str, Any]]:
        """Predict on a long text using overlapping sliding windows.

        The text is cut into windows of ``window_size`` whitespace-delimited
        words, advancing by ``stride`` words. Windows are slices of the
        original text, so the returned ``start``/``end`` always index ``text``
        itself, whatever whitespace it contains. Entities found at the same
        span in several windows are reported once.

        Args:
            text: Text to analyse.
            threshold: Passed through to ``predict``.
            window_size: Number of words per window (word count is only an
                approximation of the token count).
            stride: Number of words between two window starts.

        Returns:
            Entities sorted by ``start``; empty list when not loaded.

        Raises:
            ValueError: ``window_size`` or ``stride`` is not positive.
        """
        if not self._loaded:
            return []
        if window_size <= 0 or stride <= 0:
            raise ValueError("window_size et stride doivent être strictement positifs")

        word_spans = [m.span() for m in _WORD_RE.finditer(text)]

        # Short text: a single direct prediction is enough
        if len(word_spans) <= window_size:
            return self.predict(text, threshold=threshold)

        merged: List[Dict[str, Any]] = []
        seen_spans = set()

        for first in range(0, len(word_spans), stride):
            last = min(first + window_size, len(word_spans))

            # Real character offset of the window inside the original text
            char_offset = word_spans[first][0]
            chunk = text[char_offset:word_spans[last - 1][1]]

            for ent in self.predict(chunk, threshold=threshold):
                span = (ent["start"] + char_offset, ent["end"] + char_offset)
                if span in seen_spans:
                    continue
                seen_spans.add(span)
                ent["start"], ent["end"] = span
                merged.append(ent)

            if last >= len(word_spans):
                break

        return sorted(merged, key=lambda e: e["start"])

    def validate_eds_entities(
        self,
        text: str,
        eds_entities: List[Dict[str, Any]],
        threshold: float = 0.4,
    ) -> List[Dict[str, Any]]:
        """Cross-validate EDS-Pseudo entities with CamemBERT-bio.

        Each entity dict is updated in place with a 'camembert_confirmed' key:
        - True  : a CamemBERT-bio prediction matches the entity's word
                  (case-insensitive, substring-tolerant in both directions)
        - False : no CamemBERT-bio prediction matches
        - None  : the entity has no usable 'word'

        When the model is not loaded (or the list is empty) the entities are
        returned untouched, i.e. without the key.

        Args:
            text: Text the entities were extracted from.
            eds_entities: Entity dicts; only their 'word' key is read here.
            threshold: Score threshold passed to ``predict_long``.

        Returns:
            The same ``eds_entities`` list.
        """
        if not self._loaded or not eds_entities:
            return eds_entities

        # Normalise predictions once. Blank words are dropped: an empty string
        # is a substring of everything and would confirm every entity.
        cam_words = []
        for pred in self.predict_long(text, threshold=threshold):
            cam_word = pred["word"].lower().strip()
            if cam_word:
                cam_words.append(cam_word)

        for ent in eds_entities:
            eds_word = (ent.get("word") or "").lower().strip()
            if not eds_word:
                ent["camembert_confirmed"] = None
                continue
            ent["camembert_confirmed"] = any(
                eds_word in cam_word or cam_word in eds_word
                for cam_word in cam_words
            )

        return eds_entities