Files
anonymisation/anonymizer_core_refactored_onnx.py
Domi31tls ac62a722bb Fix FP résiduels (Glyc, VIDER, FORTE) + rétrécissement rectangles masquage
- Ajout glyc, glycosurie, vider, forte aux stop words médicaux
- Shrink horizontal de 1.5px sur les rectangles raster pour éviter
  le débordement sur le texte adjacent (issue rectangles trop larges)
- Batch 10 OGC : 21 OK, 0 PII résiduel, 0 FP

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 20:25:13 +01:00

1732 lines
78 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Core d'anonymisation (v2.1) + NER ONNX (optionnel, narratif uniquement)
------------------------------------------------------------------------
- Extraction 2 passes (pdfplumber -> pdfminer) + fallback 3e passe PyMuPDF si texte pauvre ou (cid:xx)
- Règles regex (PII critiques) + clé:valeur (masquer valeur seulement) + overrides YAML
- Rescan sécurité **sélectif** (EMAIL/TEL/IBAN/NIR), jamais dans [TABLES]
- Redaction PDF (vector/raster) via PyMuPDF
- NER ONNX **optionnel** (CamemBERT family) appliqué **après** les règles, sur le narratif
Dépendances : pdfplumber, pdfminer.six, pillow, pymupdf, pyyaml (optionnel), transformers, optimum, onnxruntime
"""
from __future__ import annotations
import io
import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Any
import pdfplumber
from pdfminer.high_level import extract_text as pdfminer_extract_text
from pdfminer.layout import LAParams
from PIL import Image, ImageDraw
try:
import fitz # PyMuPDF
except Exception:
fitz = None
try:
import yaml # PyYAML for dictionaries
except Exception:
yaml = None
try:
from doctr.models import ocr_predictor as _doctr_ocr_predictor
_DOCTR_AVAILABLE = True
except Exception:
_doctr_ocr_predictor = None # type: ignore
_DOCTR_AVAILABLE = False
# NER manager (facultatif)
try:
from ner_manager_onnx import NerModelManager, NerThresholds
except Exception:
NerModelManager = None # type: ignore
NerThresholds = None # type: ignore
# EDS-Pseudo manager (facultatif)
try:
from eds_pseudo_manager import EdsPseudoManager
except Exception:
EdsPseudoManager = None # type: ignore
def _load_edsnlp_drug_names() -> set:
"""Charge les noms de médicaments mono-mot depuis edsnlp/resources/drugs.json.
Retourne un set lowercase. Fallback silencieux si edsnlp absent."""
try:
import edsnlp as _edsnlp
drugs_path = _edsnlp.BASE_DIR / "resources" / "drugs.json"
if not drugs_path.exists():
return set()
import json as _json
data = _json.loads(drugs_path.read_text(encoding="utf-8"))
result = set()
for _code, names in data.items():
for name in names:
if " " not in name and len(name) >= 4:
result.add(name.lower())
return result
except Exception:
return set()
# ----------------- Defaults & Config -----------------
DEFAULTS_CFG = {
"version": 1,
"encoding": "utf-8",
"normalization": "NFKC",
"whitelist": {
"sections_titres": ["DIM", "GHM", "GHS", "RUM", "COMPTE", "RENDU", "DIAGNOSTIC"],
"noms_maj_excepts": ["Médecin DIM", "Praticien conseil"],
"org_gpe_keep": True,
},
"blacklist": {
"force_mask_terms": [],
"force_mask_regex": [],
},
"kv_labels_preserve": ["FINESS", "IPP", "N° OGC", "Etablissement"],
"regex_overrides": [
{
"name": "OGC_court",
"pattern": r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,3})\b",
"placeholder": "[OGC]",
"flags": ["IGNORECASE"],
}
],
"flags": {
"case_insensitive": True,
"unicode_word_boundaries": True,
"regex_engine": "python",
},
}
PLACEHOLDERS = {
"EMAIL": "[EMAIL]",
"TEL": "[TEL]",
"IBAN": "[IBAN]",
"NIR": "[NIR]",
"IPP": "[IPP]",
"FINESS": "[FINESS]",
"OGC": "[OGC]",
"NOM": "[NOM]",
"VILLE": "[VILLE]",
"ETAB": "[ETABLISSEMENT]",
"MASK": "[MASK]",
"DATE": "[DATE]",
"DATE_NAISSANCE": "[DATE_NAISSANCE]",
"ADRESSE": "[ADRESSE]",
"CODE_POSTAL": "[CODE_POSTAL]",
"AGE": "[AGE]",
"DOSSIER": "[DOSSIER]",
"NDA": "[NDA]",
"EPISODE": "[EPISODE]",
"RPPS": "[RPPS]",
}
CRITICAL_PII_KEYS = {"EMAIL", "TEL", "IBAN", "NIR", "IPP", "DATE_NAISSANCE"}
# Baseline regex
RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
RE_TEL = re.compile(r"(?<!\d)(?:\+33\s?|0)\d(?:[ .-]?\d){8}(?!\d)")
RE_IBAN = re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{11,30}\b")
RE_IPP = re.compile(r"\bIPP\s*[:\-]?\s*([A-Za-z0-9]{6,})\b", re.IGNORECASE)
RE_FINESS = re.compile(r"\bFINESS\s*[:\-]?\s*(\d{9})\b", re.IGNORECASE)
RE_OGC = re.compile(r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,})\b", re.IGNORECASE)
RE_RPPS = re.compile(r"\b(?:N°\s*)?RPPS\s*[:\-]?\s*(\d{8,11})\b", re.IGNORECASE)
RE_NIR = re.compile(
r"\b([12])\s*(\d{2})\s*(0[1-9]|1[0-2]|2[AB])\s*(\d{2,3})\s*(\d{3})\s*(\d{3})\s*(\d{2})\b",
re.IGNORECASE,
)
def validate_nir(nir_raw: str) -> bool:
"""Vérifie la clé modulo 97 d'un NIR (13 chiffres + 2 clé). Supporte la Corse (2A/2B)."""
digits_only = re.sub(r"\s+", "", nir_raw)
if len(digits_only) < 15:
return False
body_str = digits_only[:13]
key_str = digits_only[13:15]
# Corse : 2A → 19, 2B → 18 (pour le calcul)
body_str_calc = body_str.upper().replace("2A", "19").replace("2B", "18")
try:
body_int = int(body_str_calc)
key_int = int(key_str)
except ValueError:
return False
return key_int == (97 - (body_int % 97))
# Mots médicaux/techniques/courants qui ne sont pas des noms de personnes
_MEDICAL_STOP_WORDS_SET = {
# Mots français courants (déterminants, prépositions, adverbes, etc.)
"pas", "mon", "bien", "ancien", "ancienne", "bon", "bonne", "tout", "tous",
"mais", "donc", "car", "que", "qui", "avec", "dans", "pour", "sur", "par",
"les", "des", "une", "est", "son", "ses", "nos", "aux", "cette", "ces",
"cher", "chez", "entre", "sans", "sous", "vers", "selon", "après", "avant",
"puis", "aussi", "très", "plus", "moins", "peu", "non", "oui", "quelques",
"mise", "début", "fin", "suite", "fait", "lieu", "cas", "jour", "jours",
"semaine", "semaines", "mois", "temps", "place", "nouvelle", "nouveau",
"franche", "légère", "quelque", "depuis", "comme", "encore", "votre",
"date", "note", "notes", "nom", "heure", "matin", "soir", "midi",
"signé", "réalisé", "courrier", "cabinet", "rue",
# Verbes / participes courants
"remontée", "associée", "réalisée", "débuté", "prolongé", "prolongée",
"prescrit", "prescrite", "présente", "présent", "absente", "absent",
"reprise", "introduction", "arrêt", "relais",
# Titres / rôles hospitaliers
"chef", "assistant", "assistante", "praticien", "praticienne",
"docteur", "professeur", "hospitalier", "hospitalière", "hospitaliers",
"spécialiste", "contractuel", "contractuelle", "titulaire",
"confrère", "consoeur", "coordonnateur", "coordonnatrice",
"médecin", "médical", "infirmier", "infirmière",
"praticiens", "patient", "patiente",
# Structure hospitalière
"service", "pôle", "clinique", "consultation", "secrétariat",
"hôpital", "hôpitaux", "centre", "établissement", "polyclinique",
# Villes / géographie (pas des noms de personnes)
"bordeaux", "bayonne", "paris", "lyon", "lille", "marseille",
"toulouse", "nantes", "montpellier", "pessac", "biarritz", "soustons",
"basque", "basques", "sud", "côte",
# Médicaments génériques et spécialités (DCI + noms commerciaux)
"colchicine", "aspirine", "cortancyl", "bisoprolol", "entresto",
"methotrexate", "eplerenone", "speciafoldine", "prednisone",
"corticoïdes", "cortisone",
"paracetamol", "metformine", "solupred", "novorapid", "abasaglar",
"lovenox", "methylprednisolone", "potassium", "humalog", "furosemide",
"insuline", "trulicity", "forxiga", "atorvastatine", "amlodipine",
"ondansetron", "eliquis", "nebivolol", "gaviscon", "loxen",
"morphine", "oxycodone", "kardegic", "tercian", "zopiclone",
"seresta", "tramadol", "alprazolam", "forlax", "levothyrox",
"bromazepam", "gliclazide", "zymad", "pravastatine", "spiriva",
"quetiapine", "sertraline", "crestor", "lercanidipine", "amoxicilline",
"opocalcium", "ferinject", "candesartan", "ceftriaxone", "calcidose",
"laroxyl", "brintellix", "ketoprofene", "adrenaline", "exacyl",
"terbutaline", "ipratropium", "actiskenan", "vialebex", "oxynormoro",
"lansoprazole", "perindopril", "sodium", "velmetia",
"doliprane", "dafalgan", "efferalgan", "spasfon", "vogalene",
"augmentin", "inexium", "omeprazole", "pantoprazole", "esomeprazole",
"ramipril", "lisinopril", "enalapril", "losartan", "valsartan",
"irbesartan", "olmesartan", "telmisartan", "hydrochlorothiazide",
"spironolactone", "furosemide", "lasilix", "aldactone",
"tahor", "crestor", "rosuvastatine", "simvastatine", "fluvastatine",
"xarelto", "pradaxa", "apixaban", "rivaroxaban", "dabigatran",
"plavix", "clopidogrel", "ticagrelor", "brilique",
"ventoline", "seretide", "symbicort", "salmeterol", "fluticasone",
"salbutamol", "tiotropium", "budesonide", "beclometasone",
"oxycodone", "oxynorm", "skenan", "actiskenan", "fentanyl",
"nubain", "nalbuphine", "nefopam", "acupan", "profenid",
"ibuprofene", "diclofenac", "naproxene", "celecoxib",
"gabapentine", "pregabaline", "lyrica", "neurontin",
"amitriptyline", "duloxetine", "venlafaxine", "fluoxetine",
"paroxetine", "escitalopram", "citalopram", "mirtazapine",
"olanzapine", "risperidone", "aripiprazole", "haloperidol",
"loxapine", "cyamemazine", "diazepam", "oxazepam", "lorazepam",
"clonazepam", "midazolam", "hydroxyzine", "atarax", "melatonine",
"stilnox", "zolpidem", "imovane",
"levothyroxine", "metformine", "glimepiride", "sitagliptine",
"januvia", "jardiance", "empagliflozine", "dapagliflozine",
"ozempic", "semaglutide", "dulaglutide", "liraglutide", "victoza",
"heparine", "enoxaparine", "tinzaparine", "innohep",
"warfarine", "coumadine", "fluindione", "previscan",
"ciprofloxacine", "levofloxacine", "ofloxacine", "metronidazole",
"vancomycine", "gentamicine", "tazocilline", "piperacilline",
"meropenem", "imipenem", "clindamycine", "doxycycline",
"azithromycine", "clarithromycine", "cotrimoxazole", "bactrim",
"polyionique", "propranolol", "apidra", "solostar",
# Suffixes laboratoires pharmaceutiques
"arw", "myl", "myp", "arg", "teva", "bga", "agt",
# Formes galéniques / voies d'administration
"cpr", "sachet", "orale", "oral", "sol", "buv", "stylo", "flexpen",
"flestouch", "kwikpen", "inj", "susp", "gelule", "comprime",
"unidose", "perf", "inh", "seringue", "aerosol", "sach", "pdr",
"orodisp", "capsule", "patch", "suppositoire", "gouttes",
# Termes de prescription / pharmacie
"prescription", "prescriptions", "dose", "fréquence", "statut",
"technique", "capteur", "bandelettes", "glycemiques", "glycemique",
"lancettes", "aiguilles", "fines", "micro", "pompe", "réserve",
"glycemie", "capillaire", "hgt",
# Termes médicaux / cliniques
"myocardite", "myosite", "corticothérapie", "biopsie", "pathologie",
"dysimmunitaire", "récidive", "récidivante", "traitement", "diagnostic",
"antécédents", "examen", "bilan", "résultats", "analyse",
"interne", "externe", "médecine", "chirurgie", "rhumatologie",
"dermatologie", "immunologie", "cardiologie", "pneumologie",
"neurologie", "gynécologie", "radiologie", "sénologie",
"douleur", "douleurs", "douloureux", "musculaire", "musculaires",
"thoracique", "thoraciques", "membres", "supérieurs", "inférieurs",
"normale", "normaux", "habituelle", "habituelles",
"synthèse", "hospitalisation", "syndrome", "vaccination", "ophtalmo",
"pelvien", "diabétique", "sommeil", "régime", "diet",
"desinfection", "environnement", "identification", "bracelet",
"toilettes", "accompagner", "installer", "transfusion",
"signes", "vitaux", "alimentaire", "avis", "zone",
"calcémie",
# Abréviations médicales
"irm", "ett", "ecg", "mtx", "fevg", "bdc", "crp", "sfu", "hdj",
"bnp", "asat", "alat", "cpk", "ctc", "hba", "hba1c",
"saos", "tsh", "inr", "vgm", "pnn", "plq", "hb",
"poc", "bax", "act", "bic", "cfx", "acc", "ado", "acf", "vfo",
"qvl", "cci", "pse", "pca", "chl", "crt", "bbm", "pds", "ren",
"vit", "zen",
"scanner", "radio", "écho", "échographie",
# Spécialités médicales (éviter faux positifs NOM)
"hépato-gastro-entérologue", "gastro-entérologue", "gastro-entérologie",
"proctologue", "oncologue", "anesthésiste", "pneumologue", "gérontologue",
"cardiologue", "néphrologue", "urologue", "gériatre",
"hépatologue", "endocrinologue", "stomatologue",
# Termes médicaux / titres fréquemment détectés comme NOM par le NER
"supplémentation", "supplementation", "endocrinologie", "monsieur", "madame",
"suivi", "sortie", "emog", "ophtalmo",
# Médicaments détectés comme NOM/PRENOM par EDS-Pseudo
"eliquis", "trulicity", "saos", "wind", "taxotere", "eupantol", "ezetimibe",
"lansoyl", "xatral", "xenetix", "trimbow", "buspirone", "cetirizine",
"depakote", "versatis", "durogesic", "montelukast", "metformine", "viatris",
"rosuvastatine", "gliclazide", "amlodipine", "perindopril", "nebivolol",
"pravastatine", "bisoprolol", "amoxicilline", "kardegic", "lovenox",
# Termes médicaux / soins / actes détectés comme NOM
"partielle", "cutanee", "cutané", "cutanée", "osseuse", "diabetique",
"diabétique", "transdermique", "transderm", "diarrhees", "diarrhées",
"ionogramme", "scintigraphie", "thoraco", "thorax", "négative", "negative",
"diététicienne", "pressurise", "pressuriser", "inhalee", "inhalée", "inhal",
# Mots courants français détectés comme NOM dans les trackare
"toilette", "repas", "poche", "installation", "education", "éducation",
"refection", "réfection", "complete", "complète", "regime", "régime",
"normal", "traité", "traite", "arrêté", "arrete", "volume",
"commentaires", "france", "covid", "framboise", "epoux", "époux",
# Abréviations médicales courtes (3-4 chars) détectées comme NOM
"ide", "ipp", "pcr", "tap", "gel", "ahl", "ssr", "hds", "tca", "etp",
"mcg", "sdz", "iao", "ser", "orod", "clav", "disp", "cart", "atcd", "mdrd",
"amox", "endoc", "microg", "item", "pyélo", "néphro",
# En-têtes de colonnes / mots structurels trackare
"observations", "observation", "commentaires", "commentaire",
"surveillance", "température", "temperature", "glycémie", "glycemie",
"diurèse", "diurese", "balance", "pouls", "systolique", "diastolique",
"saturation", "fréquence", "frequence", "respiratoire", "douleur",
"alertes", "alerte", "antécédents", "antecedents", "habitus",
"allergies", "prescriptions", "prescription", "administration",
"catégorie", "categorie", "expiration", "message",
"destination", "diagnostique", "diagnostiques",
"date", "note", "nom", "heure", "type", "code", "etat",
"comprime", "comprimé", "gelule", "gélule", "solution", "injectable",
# Médicaments supplémentaires détectés dans les trackare
"depakote", "versatis", "humalog", "forxiga", "durogesic",
"montelukast", "rosuvastatine",
# Abréviations pharma courtes
"cpr", "sol", "bic", "agt", "poche", "inhal", "regina",
# Faux positifs EDS supplémentaires
"psy", "inhales", "inhalés", "kwikpen", "lansoprazole", "tiorfan", "smecta",
"axa", "ttt", "anionique", "abdomino", "cod", "omi", "urg", "med",
"10mg", "20mg", "40mg", "100mg", "300ui", "500ml", "innohep", "coaprovel",
"actiskenan", "simvastatine", "forlax",
# Mots temporels / contextuels détectés comme EDS_HOPITAL
"semaine", "jour", "matin", "soir", "nuit", "midi",
# Mots clés de contexte document
"compétences", "maladies", "inflammatoires", "systémiques", "rares",
"fret", "fax", "contexte", "résultat", "resultat", "résultats", "resultats",
"haute", "maison", "aide", "rpps", "poste", "fonct",
"sante", "santé", "etxe", "ttipi", "gastro", "concha",
"endoscopie", "endoscopique", "fibroscopie",
"indication", "conclusion", "technique", "anesthésie",
"digestif", "digestive", "digestives", "nutritive",
# Abréviations soins trackare détectées comme NOM (batch 20 OGC)
"soins", "lit", "jeun", "lever", "pose", "surv", "ggt", "vvp",
"verif", "crop", "evs", "maco", "pan", "cet", "trou", "nit", "ute", "nfs",
# Mots narratifs CRH capturés par fusion sidebar 2-colonnes
"evolution", "évolution", "explorations", "fermeture", "allergie", "allergies",
"lotissement", "cholangiographie", "cholecystectomie", "cholécystectomie",
"paracetamol", "paracétamol", "unité", "unite",
# FP résiduels batch 10 OGC (termes médicaux/instructions soins)
"glyc", "glycosurie", "vider", "forte",
}
# Enrichissement automatique avec les ~4000 noms de médicaments d'edsnlp
_MEDICAL_STOP_WORDS_SET.update(_load_edsnlp_drug_names())
_MEDICAL_STOP_WORDS = (
r"(?:" + "|".join(re.escape(w) for w in _MEDICAL_STOP_WORDS_SET) + r")"
)
# Un token de nom : commence par majuscule, lettres/tirets/apostrophes (PAS d'espace ni de point)
_PERSON_TOKEN = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+"
RE_PERSON_CONTEXT = re.compile(
r"(?:(?:Dr\.?|DR\.?|Docteur|Pr\.?|Professeur|Mme|MME|Madame|M\.|Mr\.?|Monsieur"
r"|Nom\s*:\s*"
r"|Rédigé\s+par|Validé\s+par|Signé\s+par|Saisi\s+par|Réalisé\s+par"
r")\s+)"
rf"({_PERSON_TOKEN}(?:\s+{_PERSON_TOKEN}){{0,2}})" # Max 3 mots
)
# Noms en MAJUSCULES dans des listes virgulées (ex: "le Dr X, Y, LAZARO")
RE_DR_COMMA_LIST = re.compile(
r"(?:Dr\.?|DR\.?|Docteur)\s+"
r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' .]+"
r"(?:\s*,\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' .]+)+",
re.IGNORECASE,
)
# Token nom : mot commençant par une majuscule d'au moins 3 lettres
_NAME_TOKEN_RE = re.compile(r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']{2,}")
SPLITTER = re.compile(r"\s*[:|;\t]\s*")
# --- Extraction globale de noms depuis champs structurés ---
_UC_NAME_TOKEN = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+"
RE_EXTRACT_PATIENT = re.compile(
r"Patient\(?e?\)?\s*:\s*"
rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)"
r"(?=\s+Né|\s+né|\s+N°|\s*$)",
re.MULTILINE,
)
# Champs d'identité structurés (documents trackare / DPI)
RE_EXTRACT_NOM_NAISSANCE = re.compile(
r"Nom\s+de\s+naissance\s*:\s*"
r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s+IPP|\s*$)",
re.MULTILINE,
)
RE_EXTRACT_NOM_PRENOM = re.compile(
r"Nom\s+et\s+Pr[ée]nom\s*:\s*"
r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s+Date|\s+Né|\s*$)",
re.MULTILINE,
)
RE_EXTRACT_LIEU_NAISSANCE = re.compile(
r"Lieu\s+de\s+naissance\s*:\s*"
r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s*$)",
re.MULTILINE,
)
RE_EXTRACT_VILLE_RESIDENCE = re.compile(
r"Ville\s+de\s+r[ée]sidence\s*:\s*"
r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s*$)",
re.MULTILINE,
)
# Contacts structurés : Conjoint/Concubin/Epoux/Epouse/Parent + NOM PRENOM
RE_EXTRACT_CONTACT = re.compile(
r"(?:Conjoint|Concubin|Epoux|Epouse|Parent|Père|Mère|Fils|Fille|Frère|Soeur|Tuteur)\s+"
r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+)"
r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+))?",
)
RE_EXTRACT_REDIGE = re.compile(
r"(?:Rédigé|Validé|Signé|Saisi)\s+par\s+"
rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)",
)
# Token nom composé : JEAN-PIERRE, CAZELLES-BOUDIER, etc.
_UC_COMPOUND = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,}(?:-[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})*"
RE_EXTRACT_MME_MR = re.compile(
r"(?:MME|Mme|Madame|Monsieur|Mr?\.?)\s+"
r"(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*(?:-?\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*)?)?"
rf"((?:{_UC_COMPOUND})(?:\s+(?:{_UC_COMPOUND}))*)",
)
_INITIAL_OPT = r"(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*(?:-?\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*)?)?"
RE_EXTRACT_DR_DEST = re.compile(
r"(?:DR\.?|Dr\.?|Docteur)\s+"
+ _INITIAL_OPT +
rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)",
)
# Noms du personnel médical après un rôle : "Aide : Marie-Paule BORDABERRY"
RE_EXTRACT_STAFF_ROLE = re.compile(
r"(?:Aide|Infirmière?|IDE|IADE|IBODE|ASH?|Cadre\s+Infirmier"
r"|Prescripteur|Prescrit\s+par|Exécut[ée]\s+par|Réalisé\s+par)\s*:?\s*"
r"((?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][a-zéèàùâêîôûäëïöüç]+(?:\s*-\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][a-zéèàùâêîôûäëïöüç]+)?\s+)?"
r"(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,}[\-]?)(?:[\s\-]+[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})*)",
)
# "Pr DUVAL", "Pr. J.-M. DUVAL", "Professeur DUVAL"
RE_EXTRACT_PR = re.compile(
r"(?:Pr\.?|Professeur)\s+"
+ _INITIAL_OPT +
rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)",
)
CID_PATTERN = re.compile(r"\(cid:\d+\)")
# --- Nouvelles regex : dates, adresses, âges, dossiers ---
_MOIS_FR = r"(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)"
RE_DATE_NAISSANCE = re.compile(
r"(?:n[ée]+\s+le|date\s+de\s+naissance|DDN)\s*[:\-]?\s*"
r"(\d{1,2}[\s/.\-]\d{1,2}[\s/.\-]\d{2,4}|\d{1,2}\s+" + _MOIS_FR + r"\s+\d{4})",
re.IGNORECASE,
)
RE_DATE = re.compile(
r"\b(\d{1,2})\s*[/.\-]\s*(\d{1,2})\s*[/.\-]\s*(\d{4})\b"
r"|"
r"\b(\d{1,2})\s+" + _MOIS_FR + r"\s+(\d{4})\b",
re.IGNORECASE,
)
RE_ADRESSE = re.compile(
r"\b\d{1,4}[\s,]*(?:bis|ter)?\s*,?\s*"
r"(?:rue|avenue|av\.?|boulevard|bd\.?|place|chemin|all[ée]e|impasse|route|cours|passage|square|r[ée]sidence)"
r"\s+[A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\s\-']{2,}",
re.IGNORECASE,
)
RE_CODE_POSTAL = re.compile(
r"(?:(?:[Cc]ode\s*[Pp]ostal|CP)\s*[:\-]?\s*(\d{5}))"
r"|"
# 5 chiffres + nom de ville (Title Case ou MAJUSCULES), pas précédé d'un chiffre (évite RPPS)
# Exclure les unités médicales (UI, mg, ml, etc.) via negative lookahead
r"(?:(?<!\d)(\d{5})[ \t]+(?!UI\b|mg\b|ml\b|µg\b)[A-ZÉÈÀÙ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû]+"
r"(?:[\s\-][A-ZÉÈÀÙ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû]+)*"
r"(?:\s+CEDEX)?)",
)
RE_BP = re.compile(
r"(?:[A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû\.\-]+\s+)?BP\s+\d+",
re.IGNORECASE,
)
RE_AGE = re.compile(
r"(?:âg[ée]+\s+de\s+|patient(?:e)?\s+de\s+)(\d{1,3})\s*ans\b",
re.IGNORECASE,
)
# Établissements de santé : avec nom (EHPAD Bayonne) ou seuls (EHPAD, SSR, USLD)
RE_ETABLISSEMENT = re.compile(
r"\b((?:EHPAD|SSR|USLD|HAD|SSR/USLD|CSAPA|CMP|CMPP|UGA)"
r"(?:\s+(?:de\s+|d['']\s*)?[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+)*)",
)
RE_HOPITAL_VILLE = re.compile(
r"\b((?:[Hh]ôpital|[Cc]linique|[Pp]olyclinique|[Cc]entre\s+[Hh]ospitalier)"
r"\s+(?:de\s+|d['']\s*)?(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+)"
r"(?:\s+(?:de\s+|d['']\s*)?[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+)*)",
)
RE_NUMERO_DOSSIER = re.compile(
r"(?:dossier|n°\s*dossier|NDA)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})"
r"|"
r"(?:référence|réf\.)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})",
re.IGNORECASE,
)
RE_EPISODE = re.compile(
r"\s*[ÉéEe]pisode\s*[:\-]?\s*([A-Za-z0-9\-]{4,})",
re.IGNORECASE,
)
@dataclass
class PiiHit:
page: int
kind: str
original: str
placeholder: str
bbox_hint: Optional[Tuple[float, float, float, float]] = None
@dataclass
class AnonResult:
text_out: str
tables_block: str
audit: List[PiiHit] = field(default_factory=list)
is_trackare: bool = False
# ----------------- Config loader -----------------
def load_dictionaries(config_path: Optional[Path]) -> Dict[str, Any]:
cfg = DEFAULTS_CFG.copy()
if config_path and config_path.exists() and yaml is not None:
try:
user = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
for k, v in user.items():
cfg[k] = v
except Exception:
pass
return cfg
# ----------------- Extraction -----------------
def extract_text_with_fallback_ocr(pdf_path: Path) -> Tuple[List[str], List[List[str]], bool]:
"""Extraction texte multi-passes avec fallback OCR (docTR).
Retourne (pages_text, tables_lines, ocr_used).
"""
pages_text: List[str] = []
tables_lines: List[List[str]] = []
ocr_used = False
with pdfplumber.open(pdf_path) as pdf:
for p in pdf.pages:
t = p.extract_text(x_tolerance=2.5, y_tolerance=4.0) or ""
pages_text.append(t)
rows: List[str] = []
try:
tables = p.extract_tables()
for tbl in tables or []:
for row in tbl:
clean = [c if c is not None else "" for c in row]
rows.append("\t".join(clean).strip())
except Exception:
pass
tables_lines.append(rows)
total_chars = sum(len(x or "") for x in pages_text)
need_fallback = total_chars < 500
if not need_fallback:
need_fallback = any(CID_PATTERN.search(x or "") for x in pages_text)
if need_fallback:
text_all = pdfminer_extract_text(
str(pdf_path),
laparams=LAParams(char_margin=2.0, word_margin=0.1, line_margin=0.8, boxes_flow=0.5),
)
split = [x for x in text_all.split("\f") if x]
if split:
pages_text = split
# 3e passe PyMuPDF si toujours pauvre/cid
total_chars = sum(len(x or "") for x in pages_text)
if (total_chars < 500 or any(CID_PATTERN.search(x or "") for x in pages_text)) and fitz is not None:
try:
doc = fitz.open(str(pdf_path))
pages_text = [doc[i].get_text("text") or "" for i in range(len(doc))]
doc.close()
except Exception:
pass
# 4e passe : OCR docTR si toujours très peu de texte (PDF scanné)
total_chars = sum(len(x or "") for x in pages_text)
if total_chars < 200 and _DOCTR_AVAILABLE and fitz is not None:
try:
model = _doctr_ocr_predictor(det_arch="db_resnet50", reco_arch="crnn_vgg16_bn", pretrained=True)
doc = fitz.open(str(pdf_path))
ocr_pages: List[str] = []
for i in range(len(doc)):
pix = doc[i].get_pixmap(dpi=300)
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
import numpy as np
result = model([np.array(img)])
page_text = ""
for block in result.pages[0].blocks:
for line in block.lines:
words = [w.value for w in line.words]
page_text += " ".join(words) + "\n"
ocr_pages.append(page_text)
doc.close()
if sum(len(p) for p in ocr_pages) > total_chars:
pages_text = ocr_pages
ocr_used = True
except Exception:
pass
return pages_text, tables_lines, ocr_used
# Alias pour compatibilité ascendante
def extract_text_three_passes(pdf_path: Path):
pages_text, tables_lines, _ = extract_text_with_fallback_ocr(pdf_path)
return pages_text, tables_lines
# ----------------- Helpers -----------------
def _compile_user_regex(pattern: str, flags_list: List[str]):
flags = 0
for f in flags_list or []:
u = f.upper()
if u == "IGNORECASE": flags |= re.IGNORECASE
if u == "MULTILINE": flags |= re.MULTILINE
if u == "DOTALL": flags |= re.DOTALL
return re.compile(pattern, flags)
def _apply_overrides(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
for ov in cfg.get("regex_overrides", []) or []:
pattern = ov.get("pattern"); placeholder = ov.get("placeholder", PLACEHOLDERS["MASK"]) ; name = ov.get("name", "override")
flags_list = ov.get("flags", [])
try:
rx = _compile_user_regex(pattern, flags_list)
except Exception:
continue
def _rep(m: re.Match):
audit.append(PiiHit(page_idx, name, m.group(0), placeholder))
return placeholder
line = rx.sub(_rep, line)
# force-mask literals
for term in (cfg.get("blacklist", {}).get("force_mask_terms", []) or []):
if not term: continue
word_rx = re.compile(rf"\b{re.escape(term)}\b", re.IGNORECASE)
if word_rx.search(line):
audit.append(PiiHit(page_idx, "force_term", term, PLACEHOLDERS["MASK"]))
line = word_rx.sub(PLACEHOLDERS["MASK"], line)
# force-mask regex
for pat in (cfg.get("blacklist", {}).get("force_mask_regex", []) or []):
try:
rx = re.compile(pat, re.IGNORECASE)
except Exception:
continue
if rx.search(line):
audit.append(PiiHit(page_idx, "force_regex", pat, PLACEHOLDERS["MASK"]))
line = rx.sub(PLACEHOLDERS["MASK"], line)
return line
def _mask_admin_label(line: str, audit: List[PiiHit], page_idx: int) -> str:
m = RE_FINESS.search(line)
if m:
val = m.group(1); audit.append(PiiHit(page_idx, "FINESS", val, PLACEHOLDERS["FINESS"]))
return RE_FINESS.sub(lambda _: f"FINESS : {PLACEHOLDERS['FINESS']}", line)
m = RE_OGC.search(line)
if m:
val = m.group(1); audit.append(PiiHit(page_idx, "OGC", val, PLACEHOLDERS["OGC"]))
return RE_OGC.sub(lambda _: f"N° OGC : {PLACEHOLDERS['OGC']}", line)
m = RE_IPP.search(line)
if m:
val = m.group(1); audit.append(PiiHit(page_idx, "IPP", val, PLACEHOLDERS["IPP"]))
return RE_IPP.sub(lambda _: f"IPP : {PLACEHOLDERS['IPP']}", line)
m = RE_RPPS.search(line)
if m:
val = m.group(1); audit.append(PiiHit(page_idx, "RPPS", val, PLACEHOLDERS["RPPS"]))
return RE_RPPS.sub(lambda _: f"RPPS : {PLACEHOLDERS['RPPS']}", line)
return line
def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
# user overrides & force-masks d'abord
line = _apply_overrides(line, audit, page_idx, cfg)
# EMAIL
def _repl_email(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"]))
return PLACEHOLDERS["EMAIL"]
line = RE_EMAIL.sub(_repl_email, line)
# TEL
def _repl_tel(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"]))
return PLACEHOLDERS["TEL"]
line = RE_TEL.sub(_repl_tel, line)
# IBAN
def _repl_iban(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "IBAN", m.group(0), PLACEHOLDERS["IBAN"]))
return PLACEHOLDERS["IBAN"]
line = RE_IBAN.sub(_repl_iban, line)
# NIR (avec validation clé modulo 97)
def _repl_nir(m: re.Match) -> str:
raw = m.group(0)
if not validate_nir(raw):
return raw # faux positif, on ne masque pas
audit.append(PiiHit(page_idx, "NIR", raw, PLACEHOLDERS["NIR"]))
return PLACEHOLDERS["NIR"]
line = RE_NIR.sub(_repl_nir, line)
# DATE_NAISSANCE (plus spécifique, avant DATE générique)
def _repl_date_naissance(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "DATE_NAISSANCE", m.group(0), PLACEHOLDERS["DATE_NAISSANCE"]))
return PLACEHOLDERS["DATE_NAISSANCE"]
line = RE_DATE_NAISSANCE.sub(_repl_date_naissance, line)
# DATE générique — désactivé : seules les dates de naissance sont masquées
# def _repl_date(m: re.Match) -> str:
# audit.append(PiiHit(page_idx, "DATE", m.group(0), PLACEHOLDERS["DATE"]))
# return PLACEHOLDERS["DATE"]
# line = RE_DATE.sub(_repl_date, line)
# ADRESSE
def _repl_adresse(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "ADRESSE", m.group(0), PLACEHOLDERS["ADRESSE"]))
return PLACEHOLDERS["ADRESSE"]
line = RE_ADRESSE.sub(_repl_adresse, line)
# BOITE POSTALE (BP)
def _repl_bp(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "ADRESSE", m.group(0), PLACEHOLDERS["ADRESSE"]))
return PLACEHOLDERS["ADRESSE"]
line = RE_BP.sub(_repl_bp, line)
# CODE_POSTAL
def _repl_code_postal(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "CODE_POSTAL", m.group(0), PLACEHOLDERS["CODE_POSTAL"]))
return PLACEHOLDERS["CODE_POSTAL"]
line = RE_CODE_POSTAL.sub(_repl_code_postal, line)
# AGE
def _repl_age(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "AGE", m.group(0), PLACEHOLDERS["AGE"]))
return PLACEHOLDERS["AGE"]
line = RE_AGE.sub(_repl_age, line)
# NUMERO DOSSIER / NDA
def _repl_dossier(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "DOSSIER", m.group(0), PLACEHOLDERS["DOSSIER"]))
return PLACEHOLDERS["DOSSIER"]
line = RE_NUMERO_DOSSIER.sub(_repl_dossier, line)
# N° EPISODE
def _repl_episode(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "EPISODE", m.group(0), PLACEHOLDERS["EPISODE"]))
return PLACEHOLDERS["EPISODE"]
line = RE_EPISODE.sub(_repl_episode, line)
# Établissements de santé (EHPAD Bayonne, SSR La Concha, Hôpital de Bayonne, etc.)
def _repl_etab(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "ETAB", m.group(0), PLACEHOLDERS["ETAB"]))
return PLACEHOLDERS["ETAB"]
line = RE_ETABLISSEMENT.sub(_repl_etab, line)
line = RE_HOPITAL_VILLE.sub(_repl_etab, line)
# Champs structurés : Lieu de naissance, Ville de résidence (masquage direct, sans filtre stop words)
_re_lieu = re.compile(r"(Lieu\s+de\s+naissance\s*:\s*)([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+)")
def _repl_lieu(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "VILLE", m.group(2).strip(), PLACEHOLDERS["VILLE"]))
return m.group(1) + PLACEHOLDERS["VILLE"]
line = _re_lieu.sub(_repl_lieu, line)
_re_ville_res = re.compile(r"(Ville\s+de\s+r[ée]sidence\s*:\s*)([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+)")
def _repl_ville_res(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "VILLE", m.group(2).strip(), PLACEHOLDERS["VILLE"]))
return m.group(1) + PLACEHOLDERS["VILLE"]
line = _re_ville_res.sub(_repl_ville_res, line)
# PERSON uppercase avec contexte, whitelist/acronymes courts
wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or [])
wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or [])
_stop_rx = re.compile(_MEDICAL_STOP_WORDS, re.IGNORECASE)
def _clean_name_span(span: str) -> str:
"""Tronque le span au premier mot médical/stop word."""
tokens = span.split()
clean = []
for t in tokens:
if _stop_rx.fullmatch(t):
break
clean.append(t)
return " ".join(clean).strip(" .-'")
def _repl_person_ctx(m: re.Match) -> str:
span = m.group(1).strip(); raw = m.group(0)
if span in wl_sections or raw in wl_phrases: return raw
# Tronquer avant les mots médicaux
cleaned = _clean_name_span(span)
if not cleaned:
return raw
tokens = [t for t in cleaned.split() if t]
if len(tokens) == 1 and len(tokens[0]) <= 3: return raw
audit.append(PiiHit(page_idx, "NOM", cleaned, PLACEHOLDERS["NOM"]))
return raw.replace(cleaned, PLACEHOLDERS["NOM"])
line = RE_PERSON_CONTEXT.sub(_repl_person_ctx, line)
# Passe supplémentaire : noms dans des listes virgulées après "Dr"
# ex: "le Dr DUVAL, MACHELART, LAZARO" → masquer chaque nom
for m in RE_DR_COMMA_LIST.finditer(line):
fragment = m.group(0)
# Extraire les segments séparés par des virgules (sauf le premier qui inclut "Dr")
parts = [p.strip() for p in fragment.split(",")]
for part in parts:
# Extraire les tokens nom de chaque segment
for tok in _NAME_TOKEN_RE.findall(part):
if tok in wl_sections or len(tok) <= 2:
continue
if _stop_rx.fullmatch(tok):
continue
if tok not in line:
continue
# Vérifier qu'il n'est pas déjà masqué
if f"[{tok}]" in line or tok in {v for v in PLACEHOLDERS.values()}:
continue
audit.append(PiiHit(page_idx, "NOM", tok, PLACEHOLDERS["NOM"]))
line = re.sub(rf"\b{re.escape(tok)}\b", PLACEHOLDERS["NOM"], line)
return line
def _mask_critical_in_key(key: str, audit: List[PiiHit], page_idx: int) -> str:
"""Masque les TEL et EMAIL même dans la partie 'clé' d'une ligne clé:valeur."""
def _repl_tel(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"]))
return PLACEHOLDERS["TEL"]
key = RE_TEL.sub(_repl_tel, key)
def _repl_email(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"]))
return PLACEHOLDERS["EMAIL"]
key = RE_EMAIL.sub(_repl_email, key)
return key
def _kv_value_only_mask(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
line = _mask_admin_label(line, audit, page_idx)
parts = SPLITTER.split(line, maxsplit=1)
if len(parts) == 2:
key, value = parts
masked_key = _mask_critical_in_key(key, audit, page_idx)
masked_val = _mask_line_by_regex(value, audit, page_idx, cfg)
return f"{masked_key.strip()} : {masked_val.strip()}"
else:
return _mask_line_by_regex(line, audit, page_idx, cfg)
# ----------------- Extraction globale de noms -----------------
def _is_trackare_document(text: str) -> bool:
"""Détecte si le document est un export Trackare/TrakCare (DPI structuré)."""
markers = ["Détails des patients", "Nom de naissance", "Dossier Patient"]
t = text[:3000].lower()
return sum(1 for m in markers if m.lower() in t) >= 2
def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]:
"""Parse les champs structurés d'un document Trackare pour extraire les PII.
Retourne (name_tokens, pii_hits) avec les noms à masquer et les hits additionnels."""
names: set = set()
hits: List[PiiHit] = []
def _add_name(s: str):
for tok in s.split():
tok = tok.strip(" .-'(),")
if len(tok) >= 2 and tok[0].isupper():
names.add(tok)
# --- Identité patient ---
# Nom de naissance: DIEGO (peut apparaître 2x : en-tête + récap tabulaire)
for m in re.finditer(r"Nom\s+de\s+naissance\s*:\s*(.+?)(?:\s+IPP\b|\s*$)", full_text, re.MULTILINE):
_add_name(m.group(1).strip())
# Nom et Prénom: DIEGO PATRICIA
for m in re.finditer(r"Nom\s+et\s+Pr[ée]nom\s*:\s*(.+?)(?:\s+Date\s+de\s+naissance|\s*$)", full_text, re.MULTILINE):
_add_name(m.group(1).strip())
# Lieu de naissance: BAYONNE → masquer comme VILLE
for m in re.finditer(r"Lieu\s+de\s+naissance\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû\s\-']+?)(?:\s*$)", full_text, re.MULTILINE):
val = m.group(1).strip()
hits.append(PiiHit(-1, "VILLE", val, PLACEHOLDERS["VILLE"]))
names.add(val)
# Ville de résidence: TARNOS → masquer comme VILLE
for m in re.finditer(r"Ville\s+de\s+r[ée]sidence\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû\s\-']+?)(?:\s*$)", full_text, re.MULTILINE):
val = m.group(1).strip()
hits.append(PiiHit(-1, "VILLE", val, PLACEHOLDERS["VILLE"]))
names.add(val)
# Code Postal (toutes occurrences)
for m in re.finditer(r"[Cc]ode\s*[Pp]ostal\s*:\s*(\d{5})", full_text):
hits.append(PiiHit(-1, "CODE_POSTAL", m.group(1), PLACEHOLDERS["CODE_POSTAL"]))
# N° épisode (= NDA, identifiant de séjour)
for m in re.finditer(r"Episode\s*N[o°.]?\s*\.?\s*:\s*(\d{5,})", full_text):
hits.append(PiiHit(-1, "EPISODE", m.group(1), PLACEHOLDERS.get("NDA", "[NDA]")))
# Adresse patient (toutes les occurrences)
for m in re.finditer(r"Adresse\s*:\s*(.+?)(?:\s+Ville\s+de\s+r[ée]sidence|\s*$)", full_text, re.MULTILINE):
val = m.group(1).strip()
if len(val) > 3:
hits.append(PiiHit(-1, "ADRESSE", val, PLACEHOLDERS["ADRESSE"]))
# --- Pied de page : "Patient : NOM PRENOM - Date de naissance..." ---
for m in re.finditer(r"Patient\s*:\s*(.+?)\s*-\s*Date\s+de\s+naissance", full_text):
_add_name(m.group(1).strip())
# --- Médecin courant (toutes occurrences) ---
for m in re.finditer(r"Médecin\s+courant\s*:\s*(?:DR\.?\s*)?(.+?)(?:\s*$)", full_text, re.MULTILINE):
_add_name(m.group(1).strip())
# --- Médecin traitant (ligne après "Nom Adresse Téléphone") ---
for m in re.finditer(r"Médecin\s+traitant\s*\n.*?Nom\s+Adresse\s+Téléphone\s*\n\s*(?:DR\.?\s*)?(.+?)(?:\d{5}|\s*$)", full_text, re.MULTILINE):
_add_name(m.group(1).strip())
# --- Contacts structurés ---
# Pattern: Relation NOM PRENOM [ADRESSE] [TEL]
for m in re.finditer(
r"(?:Conjoint|Concubin|Epoux|Epouse|Parent|Père|Mère|Fils|Fille|Frère|Soeur|Tuteur)\s+"
r"([A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûä\-']+)"
r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûä\-']+))?",
full_text,
):
_add_name(m.group(1))
if m.group(2):
_add_name(m.group(2))
# --- Prescripteurs / Exécutants (trackare) ---
for m in re.finditer(
r"(?:Prescripteur|Prescrit\s+par|Exécut[ée]\s+par|Réalisé\s+par)\s*:?\s*"
r"(?:(?:Dr|Pr)\.?\s+)?"
r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+)"
r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+))?",
full_text,
):
_add_name(m.group(1))
if m.group(2):
_add_name(m.group(2))
# --- Médecins urgences (IAO, prise en charge, décision) ---
for m in re.finditer(r"IAO\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+)", full_text):
_add_name(m.group(1))
for m in re.finditer(
r"Médecin\s+de\s+la\s+(?:prise\s+en\s+charge|décision)\s+médicale\s+"
r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+)"
r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+))?",
full_text,
):
_add_name(m.group(1))
if m.group(2):
_add_name(m.group(2))
# --- Noms soignants dans les Notes d'évolution / Notes IDE / Notes médicales ---
# Pattern: "Note IDE\nPrenom NOM" ou "Note d'évolution\nPrenom NOM"
for m in re.finditer(
r"Note\s+(?:IDE|AS|d'[ée]volution|m[ée]dicale|kin[ée])\s*\n\s*"
r"([A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû][a-zéèàùâêîôûäëïöüç]+)\s+"
r"([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\-]+)",
full_text
):
prenom, nom = m.group(1), m.group(2)
if prenom.lower() not in _MEDICAL_STOP_WORDS_SET:
_add_name(prenom)
if nom.lower() not in _MEDICAL_STOP_WORDS_SET:
_add_name(nom)
# --- Noms soignants multi-lignes : "Prénom\nNOM" dans les tableaux de prescriptions/soins ---
for m in re.finditer(
r'\b([A-ZÉÈÀÙÂÊÎÔÛ][a-zéèàùâêîôûäëïöüç]{2,})\s*\n\s*([A-ZÉÈÀÙÂÊÎÔÛ]{3,})\b',
full_text
):
prenom, nom = m.group(1), m.group(2)
if prenom.lower() not in _MEDICAL_STOP_WORDS_SET and nom.lower() not in _MEDICAL_STOP_WORDS_SET:
_add_name(prenom)
_add_name(nom)
# Filtrer les tokens trop courts ou stop words (sauf noms de villes extraits explicitement)
city_tokens = {h.original for h in hits if h.kind == "VILLE"}
filtered = set()
for tok in names:
if tok in city_tokens:
filtered.add(tok)
continue
if len(tok) < 3:
continue
if tok.lower() in _MEDICAL_STOP_WORDS_SET:
continue
filtered.add(tok)
return filtered, hits
def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set:
"""Pré-scan du document brut pour extraire les noms de personnes
depuis les champs structurés (Patient, Rédigé par, etc.).
Retourne un ensemble de tokens (mots) à masquer globalement."""
wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or [])
wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or [])
names: set = set()
def _add_tokens(match_str: str):
for token in match_str.split():
token = token.strip(" .-'")
if len(token) < 3:
continue
if token.upper() in wl_sections or token in wl_phrases:
continue
if token.lower() in _MEDICAL_STOP_WORDS_SET:
continue
names.add(token)
def _add_tokens_force_first(match_str):
"""Comme _add_tokens mais force le 1er token (contexte Dr/Mme fort)."""
tokens = match_str.split()
for i, token in enumerate(tokens):
token = token.strip(" .-'")
if len(token) < 2:
continue
if i == 0:
# Premier token après Dr/Mme : toujours un nom, bypass stop words
if token.upper() not in wl_sections:
names.add(token)
else:
if len(token) < 3:
continue
if token.upper() in wl_sections or token in wl_phrases:
continue
if token.lower() in _MEDICAL_STOP_WORDS_SET:
continue
names.add(token)
for m in RE_EXTRACT_PATIENT.finditer(full_text):
_add_tokens(m.group(1))
for m in RE_EXTRACT_REDIGE.finditer(full_text):
_add_tokens(m.group(1))
for m in RE_EXTRACT_MME_MR.finditer(full_text):
_add_tokens_force_first(m.group(1))
for m in RE_EXTRACT_DR_DEST.finditer(full_text):
_add_tokens_force_first(m.group(1))
# Champs d'identité structurés (trackare / DPI)
for m in RE_EXTRACT_NOM_NAISSANCE.finditer(full_text):
_add_tokens(m.group(1))
for m in RE_EXTRACT_NOM_PRENOM.finditer(full_text):
_add_tokens(m.group(1))
for m in RE_EXTRACT_LIEU_NAISSANCE.finditer(full_text):
_add_tokens(m.group(1))
for m in RE_EXTRACT_VILLE_RESIDENCE.finditer(full_text):
_add_tokens(m.group(1))
# Contacts structurés (conjoint, concubin, etc.)
for m in RE_EXTRACT_CONTACT.finditer(full_text):
_add_tokens(m.group(1))
if m.group(2):
_add_tokens(m.group(2))
# Personnel médical avec rôle (Aide, Cadre Infirmier, Prescripteur, etc.)
for m in RE_EXTRACT_STAFF_ROLE.finditer(full_text):
_add_tokens(m.group(1))
# Pr / Professeur + nom(s)
for m in RE_EXTRACT_PR.finditer(full_text):
_add_tokens_force_first(m.group(1))
# Extraction des noms dans les listes virgulées après Dr/Docteur
# ex: "le Dr DUVAL, MACHELART, CHARLANNE, LAZARO, il a été proposé"
for m in RE_DR_COMMA_LIST.finditer(full_text):
fragment = m.group(0)
parts = [p.strip() for p in fragment.split(",")]
for part in parts:
for tok in _NAME_TOKEN_RE.findall(part):
tok = tok.strip(" .-'")
if len(tok) < 3:
continue
if tok.upper() in wl_sections or tok in wl_phrases:
continue
if tok.lower() in _MEDICAL_STOP_WORDS_SET:
continue
names.add(tok)
# Retirer les sous-parties de noms composés avec tiret
# Si "JEAN-PIERRE" est dans names, retirer "JEAN" et "PIERRE" individuels
compound_names = {n for n in names if "-" in n}
parts_to_remove = set()
for compound in compound_names:
for part in compound.split("-"):
part = part.strip()
if len(part) >= 2 and part in names:
parts_to_remove.add(part)
names -= parts_to_remove
return names
def _apply_extracted_names(text: str, names: set, audit: List[PiiHit]) -> str:
"""Remplace globalement chaque nom extrait dans le texte."""
placeholder = PLACEHOLDERS["NOM"]
# Filtrer les stop words et tokens trop courts en dernière ligne de défense
safe_names = {n for n in names if len(n) >= 3 and n.lower() not in _MEDICAL_STOP_WORDS_SET}
for token in sorted(safe_names, key=len, reverse=True):
pattern = re.compile(rf"\b{re.escape(token)}\b", re.IGNORECASE)
new_text = []
last_end = 0
for m in pattern.finditer(text):
# Ne pas remplacer si déjà dans un placeholder
ctx_start = max(0, m.start() - 1)
ctx_end = min(len(text), m.end() + 1)
if "[" in text[ctx_start:m.start()] or "]" in text[m.end():ctx_end]:
continue
# Ne pas remplacer si le token fait partie d'un mot composé (tiret)
if m.start() > 0 and text[m.start() - 1] == "-":
continue
if m.end() < len(text) and text[m.end()] == "-":
continue
audit.append(PiiHit(-1, "NOM_EXTRACTED", m.group(0), placeholder))
new_text.append(text[last_end:m.start()])
new_text.append(placeholder)
last_end = m.end()
new_text.append(text[last_end:])
text = "".join(new_text)
return text
# ----------------- Anonymisation (regex) -----------------
def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]], cfg: Dict[str, Any]) -> AnonResult:
audit: List[PiiHit] = []
# Phase 0 : extraction globale des noms depuis les champs structurés
full_raw = "\n".join(pages_text) + "\n" + "\n".join(
"\n".join(rows) for rows in tables_lines
)
extracted_names = _extract_document_names(full_raw, cfg)
# Phase 0b : si document Trackare, extraction renforcée des PII structurés
is_trackare = _is_trackare_document(full_raw)
if is_trackare:
trackare_names, trackare_hits = _extract_trackare_identity(full_raw)
extracted_names.update(trackare_names)
audit.extend(trackare_hits)
# Phase 1 : masquage ligne par ligne (regex classiques)
out_pages: List[str] = []
for i, page_txt in enumerate(pages_text):
lines = [ln for ln in (page_txt or "").splitlines()]
masked = [_kv_value_only_mask(ln, audit, i, cfg) for ln in lines]
out_pages.append("\n".join(masked))
table_blocks: List[str] = []
for i, rows in enumerate(tables_lines):
mbuf: List[str] = []
for r in rows:
masked = _kv_value_only_mask(r, audit, i, cfg)
mbuf.append(masked)
if mbuf:
table_blocks.append("\n".join(mbuf))
tables_block = "\n\n".join(table_blocks)
text_out = "\f".join(out_pages) # séparateur de pages
if tables_block.strip():
text_out += "\n\n[TABLES]\n" + tables_block + "\n[/TABLES]"
# Phase 2 : application globale des noms extraits (rattrapage)
if extracted_names:
text_out = _apply_extracted_names(text_out, extracted_names, audit)
return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit, is_trackare=is_trackare)
# ----------------- NER ONNX sur narratif -----------------
def _mask_with_hf(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, Any], audit: List[PiiHit]) -> str:
# remplace via regex sur les 'word' détectés (approche pragmatique)
keep_org_gpe = bool((cfg.get("whitelist", {}) or {}).get("org_gpe_keep", True))
def repl_once(s: str, old: str, new: str) -> str:
return re.sub(rf"\b{re.escape(old)}\b", new, s)
out = text
for e in ents:
w = e.get("word") or ""; grp = (e.get("entity_group") or e.get("entity") or "").upper()
if not w or "[" in w or "]" in w: # ignore placeholders
continue
if len(w) <= 2: # trop court
continue
if grp in {"PER", "PERSON"}:
audit.append(PiiHit(-1, "NER_PER", w, PLACEHOLDERS["NOM"]))
out = repl_once(out, w, PLACEHOLDERS["NOM"])
elif grp in {"ORG"}:
if keep_org_gpe:
continue
audit.append(PiiHit(-1, "NER_ORG", w, PLACEHOLDERS["ETAB"]))
out = repl_once(out, w, PLACEHOLDERS["ETAB"])
elif grp in {"LOC"}:
if keep_org_gpe:
continue
audit.append(PiiHit(-1, "NER_LOC", w, PLACEHOLDERS["VILLE"]))
out = repl_once(out, w, PLACEHOLDERS["VILLE"])
elif grp in {"DATE"}:
# facultatif : si vous masquez déjà les dates via règles, laissez tel quel
continue
return out
def apply_hf_ner_on_narrative(text_out: str, cfg: Dict[str, Any], manager: Optional[NerModelManager], thresholds: Optional[NerThresholds]) -> Tuple[str, List[PiiHit]]:
if manager is None or not manager.is_loaded():
return text_out, []
# isoler [TABLES]
pattern = re.compile(r"\[TABLES\](.*?)\[/TABLES\]", re.DOTALL)
tables: List[Tuple[int,int,str]] = []
keep = []
last = 0
cleaned = ""
for m in pattern.finditer(text_out):
cleaned += text_out[last:m.start()]
keep.append((len(cleaned), len(cleaned) + len(m.group(0)), m.group(0)))
cleaned += "\x00" * len(m.group(0))
last = m.end()
cleaned += text_out[last:]
# par pages (séparées par \f) → par paragraphes
pages = cleaned.split("\f")
hits: List[PiiHit] = []
rebuilt_pages: List[str] = []
for pg in pages:
paras = [p for p in re.split(r"\n\s*\n", pg) if p.strip()]
ents_per_para = manager.infer_paragraphs(paras, thresholds=thresholds)
# remplace entités
idx = 0
buf = []
for para, ents in zip(paras, ents_per_para):
masked = _mask_with_hf(para, ents, cfg, hits)
buf.append(masked)
rebuilt_pages.append("\n\n".join(buf))
rebuilt = "\f".join(rebuilt_pages)
# réinsérer [TABLES]
rebuilt_list = list(rebuilt)
for start, end, payload in keep:
rebuilt_list[start:end] = list(payload)
final = "".join(rebuilt_list)
return final, hits
# ----------------- NER EDS-Pseudo sur narratif -----------------
def _mask_with_eds_pseudo(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, Any], audit: List[PiiHit]) -> str:
"""Masque les entités détectées par EDS-Pseudo en utilisant le mapping eds_mapped_key."""
def repl_once(s: str, old: str, new: str) -> str:
return re.sub(rf"\b{re.escape(old)}\b", new, s)
out = text
for e in ents:
w = e.get("word") or ""
mapped_key = e.get("eds_mapped_key", "")
if not w or "[" in w or "]" in w:
continue
if len(w) <= 2:
continue
# Filtrer les faux positifs NOM/PRENOM (médicaments, acronymes médicaux)
label = e.get("entity_group", "EDS")
if label in ("NOM", "PRENOM", "HOPITAL", "VILLE"):
if w.lower() in _MEDICAL_STOP_WORDS_SET:
continue
# Filtrer aussi les tokens multi-mots dont un composant est un stop word
if " " in w and any(part.lower() in _MEDICAL_STOP_WORDS_SET for part in w.split()):
continue
# Filtrer les dosages détectés comme noms (ex: "10MG", "300UI", "1 000")
if re.match(r"^\d[\d\s]*(?:mg|MG|ml|ML|UI|µg|mcg|g|kg|%)?$", w.strip()):
continue
# Règles de validation heuristiques par type d'entité
if label in ("NOM", "PRENOM"):
# Rejeter si le contexte précédent (15 chars) contient un dosage
pos = text.find(w)
if pos > 0:
ctx_before = text[max(0, pos - 15):pos]
if re.search(r"\d+\s*(?:mg|UI|ml|µg|mcg)\b", ctx_before, re.IGNORECASE):
continue
elif label == "HOPITAL":
_STRUCTURAL_WORDS = {"SERVICE", "POLE", "PÔLE", "UNITE", "UNITÉ", "SECTEUR"}
if len(w) < 5:
continue
if w.upper() in _STRUCTURAL_WORDS:
continue
placeholder = PLACEHOLDERS.get(mapped_key, PLACEHOLDERS["MASK"])
audit.append(PiiHit(-1, f"EDS_{label}", w, placeholder))
out = repl_once(out, w, placeholder)
return out
def apply_eds_pseudo_on_narrative(text_out: str, cfg: Dict[str, Any], manager: "EdsPseudoManager") -> Tuple[str, List[PiiHit]]:
"""Applique EDS-Pseudo sur le narratif (même structure que apply_hf_ner_on_narrative)."""
if manager is None or not manager.is_loaded():
return text_out, []
# isoler [TABLES]
pattern = re.compile(r"\[TABLES\](.*?)\[/TABLES\]", re.DOTALL)
keep = []
last = 0
cleaned = ""
for m in pattern.finditer(text_out):
cleaned += text_out[last:m.start()]
keep.append((len(cleaned), len(cleaned) + len(m.group(0)), m.group(0)))
cleaned += "\x00" * len(m.group(0))
last = m.end()
cleaned += text_out[last:]
# par pages → par paragraphes
pages = cleaned.split("\f")
hits: List[PiiHit] = []
rebuilt_pages: List[str] = []
for pg in pages:
paras = [p for p in re.split(r"\n\s*\n", pg) if p.strip()]
ents_per_para = manager.infer_paragraphs(paras)
buf = []
for para, ents in zip(paras, ents_per_para):
masked = _mask_with_eds_pseudo(para, ents, cfg, hits)
buf.append(masked)
rebuilt_pages.append("\n\n".join(buf))
rebuilt = "\f".join(rebuilt_pages)
# réinsérer [TABLES]
rebuilt_list = list(rebuilt)
for start, end, payload in keep:
rebuilt_list[start:end] = list(payload)
final = "".join(rebuilt_list)
return final, hits
# ----------------- Selective safety rescan -----------------
def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str:
"""Rescan de sécurité : re-détecte les PII critiques qui auraient échappé au premier passage."""
# enlève TABLES du scope
def strip_tables(s: str):
kept = []
out = []
i = 0
pattern = re.compile(r"\[TABLES\](.*?)\[/TABLES\]", re.DOTALL)
for m in pattern.finditer(s):
out.append(s[i:m.start()])
kept.append((len("".join(out)), len("".join(out)) + len(m.group(1)), m.group(1)))
out.append("\x00" * (m.end() - m.start()))
i = m.end()
out.append(s[i:])
return "".join(out), kept
protected, kept = strip_tables(text)
# PII critiques (comme avant)
protected = RE_EMAIL.sub(PLACEHOLDERS["EMAIL"], protected)
protected = RE_TEL.sub(PLACEHOLDERS["TEL"], protected)
protected = RE_IBAN.sub(PLACEHOLDERS["IBAN"], protected)
# NIR avec validation
def _rescan_nir(m: re.Match) -> str:
return PLACEHOLDERS["NIR"] if validate_nir(m.group(0)) else m.group(0)
protected = RE_NIR.sub(_rescan_nir, protected)
# Nouvelles regex : dates de naissance, dates, adresses, codes postaux
protected = RE_DATE_NAISSANCE.sub(PLACEHOLDERS["DATE_NAISSANCE"], protected)
# protected = RE_DATE.sub(PLACEHOLDERS["DATE"], protected) # désactivé
protected = RE_ADRESSE.sub(PLACEHOLDERS["ADRESSE"], protected)
protected = RE_BP.sub(PLACEHOLDERS["ADRESSE"], protected)
protected = RE_CODE_POSTAL.sub(PLACEHOLDERS["CODE_POSTAL"], protected)
# N° Episode
protected = RE_EPISODE.sub(PLACEHOLDERS["EPISODE"], protected)
# N° RPPS
protected = RE_RPPS.sub(PLACEHOLDERS["RPPS"], protected)
# Établissements
protected = RE_ETABLISSEMENT.sub(PLACEHOLDERS["ETAB"], protected)
protected = RE_HOPITAL_VILLE.sub(PLACEHOLDERS["ETAB"], protected)
# Personnes contextuelles (avec whitelist)
wl_sections = set()
wl_phrases = set()
if cfg:
wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or [])
wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or [])
def _rescan_person(m: re.Match) -> str:
span = m.group(1).strip(); raw = m.group(0)
if span in wl_sections or raw in wl_phrases:
return raw
tokens = [t for t in span.split() if t]
if len(tokens) == 1 and len(tokens[0]) <= 3:
return raw
return raw.replace(span, PLACEHOLDERS["NOM"])
protected = RE_PERSON_CONTEXT.sub(_rescan_person, protected)
res = list(protected)
for start, end, payload in kept:
res[start:end] = list(payload)
return "".join(res)
# ----------------- PDF Redaction -----------------
def _search_whole_word(page, token: str) -> list:
"""Cherche un token comme mot entier (pas substring) via get_text('words').
Évite les faux positifs de page.search_for() qui fait du substring matching."""
rects = []
token_lower = token.lower().strip()
for w in page.get_text("words"):
# w = (x0, y0, x1, y1, word, block_no, line_no, word_no)
word_text = w[4].strip(".,;:!?()[]{}\"'«»-–—/\\")
if word_text.lower() == token_lower:
rects.append(fitz.Rect(w[0], w[1], w[2], w[3]))
return rects
def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> None:
if fitz is None:
raise RuntimeError("PyMuPDF non disponible installez pymupdf.")
doc = fitz.open(str(original_pdf))
# index hits par page; page==-1 → rechercher sur toutes pages
by_page: Dict[int, List[PiiHit]] = {}
for h in audit:
by_page.setdefault(h.page, []).append(h)
# Kinds à ne pas chercher dans le PDF (dates masquées uniquement dans le texte,
# pas dans le PDF où elles rendent les tableaux illisibles)
_VECTOR_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL"}
# Kinds dont les tokens courts (< 5) risquent le substring matching via page.search_for()
_VECTOR_SHORT_TOKEN_KINDS = {"NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM",
"EDS_HOPITAL", "EDS_VILLE", "ETAB", "ETAB_GLOBAL"}
for pno in range(len(doc)):
page = doc[pno]
hits = by_page.get(pno, []) + by_page.get(-1, [])
if not hits:
continue
for h in hits:
token = h.original.strip()
if not token:
continue
# Sauter toutes les dates EDS (masquées dans le texte, pas dans le PDF)
if h.kind in _VECTOR_SKIP_KINDS:
continue
# Tokens NOM courts (< 5 chars) : matching par mots entiers pour éviter
# les faux positifs substring ("AXa" dans "laxatifs", "SER" dans "Observations")
if h.kind in _VECTOR_SHORT_TOKEN_KINDS and len(token) < 5:
if token.lower() not in _MEDICAL_STOP_WORDS_SET:
rects = _search_whole_word(page, token)
for r in rects:
page.add_redact_annot(r, fill=(0,0,0))
continue
rects = page.search_for(token)
if not rects and h.kind in {"NIR", "IBAN", "TEL"}:
compact = re.sub(r"\s+", "", token)
if compact != token:
rects = page.search_for(compact)
# Fallback : chercher chaque mot individuellement (uniquement pour les NOM)
if not rects and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}:
for word in token.split():
word = word.strip(" .-'")
if len(word) < 5 or word.lower() in _MEDICAL_STOP_WORDS_SET:
continue
if not word[0].isupper():
continue
rects.extend(page.search_for(word))
for r in rects:
page.add_redact_annot(r, fill=(0,0,0))
try:
page.apply_redactions()
except Exception:
pass
doc.save(str(out_pdf), deflate=True, garbage=4, clean=True, incremental=False)
doc.close()
def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 300, ogc_label: Optional[str] = None) -> None:
if fitz is None:
raise RuntimeError("PyMuPDF non disponible installez pymupdf.")
doc = fitz.open(str(original_pdf)); out = fitz.open()
all_rects: Dict[int, List["fitz.Rect"]] = {}
for pno in range(len(doc)):
page = doc[pno]
rects = []
_RASTER_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL"}
_RASTER_SHORT_TOKEN_KINDS = {"NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM",
"EDS_HOPITAL", "EDS_VILLE", "ETAB", "ETAB_GLOBAL"}
hits = [x for x in audit if x.page in {pno, -1}]
for h in hits:
token = h.original.strip()
if not token: continue
# Sauter toutes les dates EDS (masquées dans le texte, pas dans le PDF)
if h.kind in _RASTER_SKIP_KINDS:
continue
# Tokens NOM courts (< 5 chars) : matching par mots entiers pour éviter
# les faux positifs substring ("AXa" dans "laxatifs", "SER" dans "Observations")
if h.kind in _RASTER_SHORT_TOKEN_KINDS and len(token) < 5:
if token.lower() not in _MEDICAL_STOP_WORDS_SET:
rects.extend(_search_whole_word(page, token))
continue
found = page.search_for(token)
if not found and h.kind in {"NIR", "IBAN", "TEL"}:
compact = re.sub(r"\s+", "", token)
found = page.search_for(compact)
# Fallback : si la chaîne complète n'est pas trouvée,
# chercher chaque mot individuellement (uniquement pour les NOM)
if not found and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}:
for word in token.split():
word = word.strip(" .-'")
if len(word) < 5 or word.lower() in _MEDICAL_STOP_WORDS_SET:
continue
# Ne garder que les mots qui ressemblent à des noms propres
if not word[0].isupper():
continue
found.extend(page.search_for(word))
rects.extend(found)
all_rects[pno] = rects
for pno in range(len(doc)):
src = doc[pno]; rect = src.rect
zoom = dpi / 72.0; mat = fitz.Matrix(zoom, zoom)
pix = src.get_pixmap(matrix=mat, annots=False)
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
draw = ImageDraw.Draw(img)
for r in all_rects.get(pno, []):
# Rétrécir légèrement les rectangles pour éviter le débordement sur le texte adjacent
shrink = 1.5 # pixels à retirer de chaque côté
x0 = r.x0 * zoom + shrink
y0 = r.y0 * zoom
x1 = r.x1 * zoom - shrink
y1 = r.y1 * zoom
if x1 > x0:
draw.rectangle([x0, y0, x1, y1], fill=(0, 0, 0))
# Incrustation OGC en haut à droite
if ogc_label:
from PIL import ImageFont
font_size = int(14 * zoom)
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size)
except Exception:
font = ImageFont.load_default()
text = f"OGC: {ogc_label}"
bbox = draw.textbbox((0, 0), text, font=font)
tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1]
margin = int(10 * zoom)
x = img.width - tw - margin
y = margin
# Fond blanc + texte noir
draw.rectangle([x - 4, y - 2, x + tw + 4, y + th + 2], fill=(255, 255, 255))
draw.text((x, y), text, fill=(0, 0, 0), font=font)
buf = io.BytesIO(); img.save(buf, format="PNG"); buf.seek(0)
dst = out.new_page(width=rect.width, height=rect.height)
dst.insert_image(rect, stream=buf.getvalue())
out.save(str(out_pdf), deflate=True, garbage=4, clean=True)
out.close(); doc.close()
# ----------------- Orchestration -----------------
def process_pdf(
pdf_path: Path,
out_dir: Path,
make_vector_redaction: bool = True,
also_make_raster_burn: bool = False,
config_path: Optional[Path] = None,
use_hf: bool = False,
ner_manager=None,
ner_thresholds=None,
ogc_label: Optional[str] = None,
) -> Dict[str, str]:
out_dir.mkdir(parents=True, exist_ok=True)
cfg = load_dictionaries(config_path)
pages_text, tables_lines, ocr_used = extract_text_with_fallback_ocr(pdf_path)
# 1) Regex rules
anon = anonymise_document_regex(pages_text, tables_lines, cfg)
# 2) NER (optionnel) — sur le narratif
final_text = anon.text_out
hf_hits: List[PiiHit] = []
if use_hf and ner_manager is not None and ner_manager.is_loaded():
# Détecter le type de manager et appeler la bonne fonction
if EdsPseudoManager is not None and isinstance(ner_manager, EdsPseudoManager):
final_text, hf_hits = apply_eds_pseudo_on_narrative(final_text, cfg, ner_manager)
else:
final_text, hf_hits = apply_hf_ner_on_narrative(final_text, cfg, ner_manager, ner_thresholds)
anon.audit.extend(hf_hits)
# 3) Rescan selectif
final_text = selective_rescan(final_text, cfg=cfg)
# 3b) Nettoyage post-masquage : codes postaux orphelins (5 chiffres collés à un placeholder)
# et téléphones fragmentés sur plusieurs lignes
_re_cp_orphan = re.compile(r"(\[(?:ADRESSE|NOM|VILLE)\])\s*(\d{5})\b")
def _clean_cp_orphan(m):
anon.audit.append(PiiHit(-1, "CODE_POSTAL", m.group(2), PLACEHOLDERS["CODE_POSTAL"]))
return m.group(1) + PLACEHOLDERS["CODE_POSTAL"]
final_text = _re_cp_orphan.sub(_clean_cp_orphan, final_text)
# Téléphones fragmentés : "0X XX XX XX\nXX" coupé en fin de ligne (ligne suivante immédiate)
_re_tel_frag = re.compile(r"((?:\+33\s?|0)\d(?:[ .-]?\d){6,7})\s*\n\s*(\d{2}(?!\d))")
def _clean_tel_frag(m):
full = m.group(1).replace(" ", "").replace(".", "").replace("-", "") + m.group(2)
if len(full.replace("+33", "0")) == 10:
anon.audit.append(PiiHit(-1, "TEL", m.group(0).strip(), PLACEHOLDERS["TEL"]))
return PLACEHOLDERS["TEL"] + "\n"
return m.group(0)
final_text = _re_tel_frag.sub(_clean_tel_frag, final_text)
# Téléphones incomplets en fin de ligne (8 ou 9 chiffres au format 0X XX XX XX) : masquer la partie visible
_re_tel_partial = re.compile(r"(?<!\d)((?:\+33\s?|0)\d(?:[ .-]?\d){5,7})(?!\d)\s*$", re.MULTILINE)
def _clean_tel_partial(m):
digits = re.sub(r"[ .\-]", "", m.group(1))
if 8 <= len(digits) <= 9:
anon.audit.append(PiiHit(-1, "TEL", m.group(0).strip(), PLACEHOLDERS["TEL"]))
return PLACEHOLDERS["TEL"]
return m.group(0)
final_text = _re_tel_partial.sub(_clean_tel_partial, final_text)
# 4) Consolidation : propager les PII détectés sur toutes les pages (page=-1)
# pour que la redaction PDF les cherche partout (sidebar répété, etc.)
# 4a) Noms : extraire les tokens individuels
_nom_kinds = {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}
_global_name_tokens: set = set()
for h in anon.audit:
if h.kind not in _nom_kinds:
continue
for word in h.original.split():
word = word.strip(" .-'")
if len(word) < 3:
continue
if word.lower() in _MEDICAL_STOP_WORDS_SET:
continue
if not word[0].isupper():
continue
_global_name_tokens.add(word)
# 4a-bis) Noms compagnons : si un token connu est suivi/précédé d'un mot majuscule inconnu
# dans le texte brut, c'est aussi un nom (ex: "Diego OLIVER" → OLIVER est un nom)
raw_full = "\n\n".join(pages_text)
_companion_tokens: set = set()
for token in _global_name_tokens:
# Token connu suivi d'un mot ALL-CAPS
for m in re.finditer(rf"\b{re.escape(token)}\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{{3,}})\b", raw_full):
candidate = m.group(1)
if candidate.lower() not in _MEDICAL_STOP_WORDS_SET and candidate not in _global_name_tokens:
_companion_tokens.add(candidate)
# Mot ALL-CAPS suivi du token connu
for m in re.finditer(rf"\b([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{{3,}})\s+{re.escape(token)}\b", raw_full):
candidate = m.group(1)
if candidate.lower() not in _MEDICAL_STOP_WORDS_SET and candidate not in _global_name_tokens:
_companion_tokens.add(candidate)
_global_name_tokens.update(_companion_tokens)
# Retirer les sous-parties de noms composés (JEAN, PIERRE si JEAN-PIERRE existe)
_compound = {t for t in _global_name_tokens if "-" in t}
_parts_to_drop = set()
for comp in _compound:
for part in comp.split("-"):
part = part.strip()
if len(part) >= 2 and part in _global_name_tokens:
_parts_to_drop.add(part)
_global_name_tokens -= _parts_to_drop
# 4a-ter) Filtrage final des tokens globaux : rejeter les mots qui ne ressemblent pas à des noms propres
# - Mots courants français (minuscule initiale déjà filtrés en amont)
# - ALL-CAPS <= 4 chars confirmés par une seule source seulement
_nom_kind_counts: Dict[str, set] = {}
for h in anon.audit:
if h.kind in _nom_kinds:
for word in h.original.split():
word = word.strip(" .-'")
if word:
_nom_kind_counts.setdefault(word, set()).add(h.kind)
_filtered_global: set = set()
for token in _global_name_tokens:
# ALL-CAPS court (<=4) avec une seule source → probablement une abréviation
if token.isupper() and len(token) <= 4 and len(_nom_kind_counts.get(token, set())) < 2:
continue
_filtered_global.add(token)
_global_name_tokens = _filtered_global
for token in _global_name_tokens:
anon.audit.append(PiiHit(page=-1, kind="NOM_GLOBAL", original=token, placeholder=PLACEHOLDERS["NOM"]))
# 4b) TEL, EMAIL, ADRESSE, CODE_POSTAL : propager les valeurs uniques sur toutes les pages
_global_pii: Dict[str, set] = {}
for h in anon.audit:
if h.kind in {"TEL", "EMAIL", "ADRESSE", "CODE_POSTAL", "EPISODE", "RPPS", "VILLE", "ETAB"}:
_global_pii.setdefault(h.kind, set()).add(h.original.strip())
for kind, values in _global_pii.items():
placeholder = PLACEHOLDERS.get(kind, PLACEHOLDERS["MASK"])
for val in values:
anon.audit.append(PiiHit(page=-1, kind=f"{kind}_GLOBAL", original=val, placeholder=placeholder))
# 4e) Appliquer les tokens globaux sur le texte pseudonymisé
_GLOBAL_SKIP_KINDS = {"EDS_DATE_GLOBAL", "DATE_NAISSANCE_GLOBAL"}
for h in anon.audit:
if h.page != -1:
continue
if not (h.kind == "NOM_GLOBAL" or h.kind.endswith("_GLOBAL")):
continue
if h.kind in _GLOBAL_SKIP_KINDS:
continue
token = h.original.strip()
if not token or len(token) < 3:
continue
# Garde trackare : NOM_GLOBAL très court (<=3) risque de masquer des codes diagnostics
if anon.is_trackare and h.kind == "NOM_GLOBAL" and len(token) <= 3:
continue
try:
final_text = re.sub(rf"\b{re.escape(token)}\b", h.placeholder, final_text)
except re.error:
final_text = final_text.replace(token, h.placeholder)
# Log OCR dans l'audit
if ocr_used:
anon.audit.insert(0, PiiHit(page=-1, kind="OCR_USED", original="docTR", placeholder=""))
# Sauvegardes
base = pdf_path.stem
txt_path = out_dir / f"{base}.pseudonymise.txt"
audit_path = out_dir / f"{base}.audit.jsonl"
txt_path.write_text(final_text, encoding="utf-8")
with audit_path.open("w", encoding="utf-8") as f:
for hit in anon.audit:
f.write(json.dumps(hit.__dict__, ensure_ascii=False) + "\n")
outputs = {"text": str(txt_path), "audit": str(audit_path)}
# PDFs
if make_vector_redaction and fitz is not None:
vec_path = out_dir / f"{base}.redacted_vector.pdf"
try:
redact_pdf_vector(pdf_path, anon.audit, vec_path)
outputs["pdf_vector"] = str(vec_path)
except Exception:
pass
if also_make_raster_burn and fitz is not None:
ras_path = out_dir / f"{base}.redacted_raster.pdf"
redact_pdf_raster(pdf_path, anon.audit, ras_path, ogc_label=ogc_label)
outputs["pdf_raster"] = str(ras_path)
return outputs
if __name__ == "__main__":
import argparse
ap = argparse.ArgumentParser(description="Anonymiser PDF (regex + NER ONNX optionnel)")
ap.add_argument("pdf", type=str)
ap.add_argument("--out", type=str, default="out")
ap.add_argument("--no-vector", action="store_true")
ap.add_argument("--raster", action="store_true")
ap.add_argument("--config", type=str, default=str(Path("config/dictionnaires.yml")))
ap.add_argument("--hf", action="store_true", help="Activer NER ONNX sur narratif (nécessite ner_manager_onnx)")
ap.add_argument("--model", type=str, default="cmarkea/distilcamembert-base-ner")
args = ap.parse_args()
manager = None
if args.hf and NerModelManager is not None:
manager = NerModelManager(cache_dir=Path("models"))
manager.load(args.model)
outs = process_pdf(
Path(args.pdf),
Path(args.out),
make_vector_redaction=not args.no_vector,
also_make_raster_burn=args.raster,
config_path=Path(args.config),
use_hf=bool(args.hf),
ner_manager=manager,
ner_thresholds=NerThresholds() if NerThresholds else None,
)
print(json.dumps(outs, indent=2, ensure_ascii=False))