#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Core d'anonymisation (v2.1) + NER ONNX (optionnel, narratif uniquement) ------------------------------------------------------------------------ - Extraction 2 passes (pdfplumber -> pdfminer) + fallback 3e passe PyMuPDF si texte pauvre ou (cid:xx) - Règles regex (PII critiques) + clé:valeur (masquer valeur seulement) + overrides YAML - Rescan sécurité **sélectif** (EMAIL/TEL/IBAN/NIR), jamais dans [TABLES] - Redaction PDF (vector/raster) via PyMuPDF - NER ONNX **optionnel** (CamemBERT family) appliqué **après** les règles, sur le narratif Dépendances : pdfplumber, pdfminer.six, pillow, pymupdf, pyyaml (optionnel), transformers, optimum, onnxruntime """ from __future__ import annotations import io import json import logging import os import re from concurrent.futures import ProcessPoolExecutor log = logging.getLogger(__name__) from dataclasses import dataclass, field from pathlib import Path from typing import List, Dict, Tuple, Optional, Any # {page_idx: [(word_text, x0_norm, y0_norm, x1_norm, y1_norm), ...]} # Coordonnées normalisées 0→1 (format natif docTR word.geometry) OcrWordMap = Dict[int, List[Tuple[str, float, float, float, float]]] import pdfplumber from pdfminer.high_level import extract_text as pdfminer_extract_text from pdfminer.layout import LAParams from PIL import Image, ImageDraw try: import fitz # PyMuPDF except Exception: fitz = None try: import yaml # PyYAML for dictionaries except Exception: yaml = None try: from doctr.models import ocr_predictor as _doctr_ocr_predictor _DOCTR_AVAILABLE = True except Exception: _doctr_ocr_predictor = None # type: ignore _DOCTR_AVAILABLE = False # NER manager (facultatif) try: from ner_manager_onnx import NerModelManager, NerThresholds except Exception: NerModelManager = None # type: ignore NerThresholds = None # type: ignore # EDS-Pseudo manager (facultatif) try: from eds_pseudo_manager import EdsPseudoManager except Exception: EdsPseudoManager = None # type: ignore # VLM manager (facultatif) try: from vlm_manager import VlmManager except Exception: VlmManager = None # type: ignore def _load_edsnlp_drug_names() -> set: """Charge les noms de médicaments mono-mot depuis edsnlp/resources/drugs.json. Retourne un set lowercase. Fallback silencieux si edsnlp absent.""" try: import edsnlp as _edsnlp drugs_path = _edsnlp.BASE_DIR / "resources" / "drugs.json" if not drugs_path.exists(): return set() import json as _json data = _json.loads(drugs_path.read_text(encoding="utf-8")) result = set() for _code, names in data.items(): for name in names: if " " not in name and len(name) >= 4: result.add(name.lower()) return result except Exception: return set() # ----------------- Defaults & Config ----------------- DEFAULTS_CFG = { "version": 1, "encoding": "utf-8", "normalization": "NFKC", "whitelist": { "sections_titres": ["DIM", "GHM", "GHS", "RUM", "COMPTE", "RENDU", "DIAGNOSTIC"], "noms_maj_excepts": ["Médecin DIM", "Praticien conseil"], "org_gpe_keep": False, }, "blacklist": { "force_mask_terms": [], "force_mask_regex": [], }, "kv_labels_preserve": ["FINESS", "IPP", "N° OGC", "Etablissement"], "regex_overrides": [ { "name": "OGC_court", "pattern": r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,3})\b", "placeholder": "[OGC]", "flags": ["IGNORECASE"], } ], "flags": { "case_insensitive": True, "unicode_word_boundaries": True, "regex_engine": "python", }, } PLACEHOLDERS = { "EMAIL": "[EMAIL]", "TEL": "[TEL]", "IBAN": "[IBAN]", "NIR": "[NIR]", "IPP": "[IPP]", "FINESS": "[FINESS]", "OGC": "[OGC]", "NOM": "[NOM]", "VILLE": "[VILLE]", "ETAB": "[ETABLISSEMENT]", "MASK": "[MASK]", "DATE": "[DATE]", "DATE_NAISSANCE": "[DATE_NAISSANCE]", "ADRESSE": "[ADRESSE]", "CODE_POSTAL": "[CODE_POSTAL]", "AGE": "[AGE]", "DOSSIER": "[DOSSIER]", "NDA": "[NDA]", "EPISODE": "[EPISODE]", "RPPS": "[RPPS]", } CRITICAL_PII_KEYS = {"EMAIL", "TEL", "IBAN", "NIR", "IPP", "DATE_NAISSANCE"} # Baseline regex RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}") RE_TEL = re.compile(r"(? bool: """Vérifie la clé modulo 97 d'un NIR (13 chiffres + 2 clé). Supporte la Corse (2A/2B).""" digits_only = re.sub(r"\s+", "", nir_raw) if len(digits_only) < 15: return False body_str = digits_only[:13] key_str = digits_only[13:15] # Corse : 2A → 19, 2B → 18 (pour le calcul) body_str_calc = body_str.upper().replace("2A", "19").replace("2B", "18") try: body_int = int(body_str_calc) key_int = int(key_str) except ValueError: return False return key_int == (97 - (body_int % 97)) # Mots médicaux/techniques/courants qui ne sont pas des noms de personnes _MEDICAL_STOP_WORDS_SET = { # Mots français courants (déterminants, prépositions, adverbes, etc.) "pas", "mon", "bien", "ancien", "ancienne", "bon", "bonne", "tout", "tous", "mais", "donc", "car", "que", "qui", "avec", "dans", "pour", "sur", "par", "les", "des", "une", "est", "son", "ses", "nos", "aux", "cette", "ces", "cher", "chez", "entre", "sans", "sous", "vers", "selon", "après", "avant", "puis", "aussi", "très", "plus", "moins", "peu", "non", "oui", "quelques", "mise", "début", "fin", "suite", "fait", "lieu", "cas", "jour", "jours", "semaine", "semaines", "mois", "temps", "place", "nouvelle", "nouveau", "franche", "légère", "quelque", "depuis", "comme", "encore", "votre", "date", "note", "notes", "nom", "heure", "matin", "soir", "midi", "signé", "réalisé", "courrier", "cabinet", "rue", # Verbes / participes courants "remontée", "associée", "réalisée", "débuté", "prolongé", "prolongée", "prescrit", "prescrite", "présente", "présent", "absente", "absent", "reprise", "introduction", "arrêt", "relais", # Titres / rôles hospitaliers "chef", "assistant", "assistante", "praticien", "praticienne", "docteur", "professeur", "hospitalier", "hospitalière", "hospitaliers", "spécialiste", "contractuel", "contractuelle", "titulaire", "confrère", "consoeur", "coordonnateur", "coordonnatrice", "médecin", "médical", "infirmier", "infirmière", "praticiens", "patient", "patiente", # Structure hospitalière "service", "pôle", "clinique", "consultation", "secrétariat", "hôpital", "hôpitaux", "centre", "établissement", "polyclinique", # Villes / géographie (pas des noms de personnes) "bordeaux", "bayonne", "paris", "lyon", "lille", "marseille", "toulouse", "nantes", "montpellier", "pessac", "biarritz", "soustons", "basque", "basques", "sud", "côte", # Médicaments génériques et spécialités (DCI + noms commerciaux) "colchicine", "aspirine", "cortancyl", "bisoprolol", "entresto", "methotrexate", "eplerenone", "speciafoldine", "prednisone", "corticoïdes", "cortisone", "paracetamol", "metformine", "solupred", "novorapid", "abasaglar", "lovenox", "methylprednisolone", "potassium", "humalog", "furosemide", "insuline", "trulicity", "forxiga", "atorvastatine", "amlodipine", "ondansetron", "eliquis", "nebivolol", "gaviscon", "loxen", "morphine", "oxycodone", "kardegic", "tercian", "zopiclone", "seresta", "tramadol", "alprazolam", "forlax", "levothyrox", "bromazepam", "gliclazide", "zymad", "pravastatine", "spiriva", "quetiapine", "sertraline", "crestor", "lercanidipine", "amoxicilline", "opocalcium", "ferinject", "candesartan", "ceftriaxone", "calcidose", "laroxyl", "brintellix", "ketoprofene", "adrenaline", "exacyl", "terbutaline", "ipratropium", "actiskenan", "vialebex", "oxynormoro", "lansoprazole", "perindopril", "sodium", "velmetia", "doliprane", "dafalgan", "efferalgan", "spasfon", "vogalene", "augmentin", "inexium", "omeprazole", "pantoprazole", "esomeprazole", "ramipril", "lisinopril", "enalapril", "losartan", "valsartan", "irbesartan", "olmesartan", "telmisartan", "hydrochlorothiazide", "spironolactone", "furosemide", "lasilix", "aldactone", "tahor", "crestor", "rosuvastatine", "simvastatine", "fluvastatine", "xarelto", "pradaxa", "apixaban", "rivaroxaban", "dabigatran", "plavix", "clopidogrel", "ticagrelor", "brilique", "ventoline", "seretide", "symbicort", "salmeterol", "fluticasone", "salbutamol", "tiotropium", "budesonide", "beclometasone", "oxycodone", "oxynorm", "skenan", "actiskenan", "fentanyl", "nubain", "nalbuphine", "nefopam", "acupan", "profenid", "ibuprofene", "diclofenac", "naproxene", "celecoxib", "gabapentine", "pregabaline", "lyrica", "neurontin", "amitriptyline", "duloxetine", "venlafaxine", "fluoxetine", "paroxetine", "escitalopram", "citalopram", "mirtazapine", "olanzapine", "risperidone", "aripiprazole", "haloperidol", "loxapine", "cyamemazine", "diazepam", "oxazepam", "lorazepam", "clonazepam", "midazolam", "hydroxyzine", "atarax", "melatonine", "stilnox", "zolpidem", "imovane", "levothyroxine", "metformine", "glimepiride", "sitagliptine", "januvia", "jardiance", "empagliflozine", "dapagliflozine", "ozempic", "semaglutide", "dulaglutide", "liraglutide", "victoza", "heparine", "enoxaparine", "tinzaparine", "innohep", "warfarine", "coumadine", "fluindione", "previscan", "ciprofloxacine", "levofloxacine", "ofloxacine", "metronidazole", "vancomycine", "gentamicine", "tazocilline", "piperacilline", "meropenem", "imipenem", "clindamycine", "doxycycline", "azithromycine", "clarithromycine", "cotrimoxazole", "bactrim", "polyionique", "propranolol", "apidra", "solostar", # Suffixes laboratoires pharmaceutiques "arw", "myl", "myp", "arg", "teva", "bga", "agt", # Formes galéniques / voies d'administration "cpr", "sachet", "orale", "oral", "sol", "buv", "stylo", "flexpen", "flestouch", "kwikpen", "inj", "susp", "gelule", "comprime", "unidose", "perf", "inh", "seringue", "aerosol", "sach", "pdr", "orodisp", "capsule", "patch", "suppositoire", "gouttes", # Termes de prescription / pharmacie "prescription", "prescriptions", "dose", "fréquence", "statut", "technique", "capteur", "bandelettes", "glycemiques", "glycemique", "lancettes", "aiguilles", "fines", "micro", "pompe", "réserve", "glycemie", "capillaire", "hgt", # Termes médicaux / cliniques "myocardite", "myosite", "corticothérapie", "biopsie", "pathologie", "dysimmunitaire", "récidive", "récidivante", "traitement", "diagnostic", "antécédents", "examen", "bilan", "résultats", "analyse", "interne", "externe", "médecine", "chirurgie", "rhumatologie", "dermatologie", "immunologie", "cardiologie", "pneumologie", "neurologie", "gynécologie", "radiologie", "sénologie", "douleur", "douleurs", "douloureux", "musculaire", "musculaires", "thoracique", "thoraciques", "membres", "supérieurs", "inférieurs", "normale", "normaux", "habituelle", "habituelles", "synthèse", "hospitalisation", "syndrome", "vaccination", "ophtalmo", "pelvien", "diabétique", "sommeil", "régime", "diet", "desinfection", "environnement", "identification", "bracelet", "toilettes", "accompagner", "installer", "transfusion", "signes", "vitaux", "alimentaire", "avis", "zone", "calcémie", # Abréviations médicales "irm", "ett", "ecg", "mtx", "fevg", "bdc", "crp", "sfu", "hdj", "bnp", "asat", "alat", "cpk", "ctc", "hba", "hba1c", "saos", "tsh", "inr", "vgm", "pnn", "plq", "hb", "poc", "bax", "act", "bic", "cfx", "acc", "ado", "acf", "vfo", "qvl", "cci", "pse", "pca", "chl", "crt", "bbm", "pds", "ren", "vit", "zen", "scanner", "radio", "écho", "échographie", # Spécialités médicales (éviter faux positifs NOM) "hépato-gastro-entérologue", "gastro-entérologue", "gastro-entérologie", "proctologue", "oncologue", "anesthésiste", "pneumologue", "gérontologue", "cardiologue", "néphrologue", "urologue", "gériatre", "hépatologue", "endocrinologue", "stomatologue", # Termes médicaux / titres fréquemment détectés comme NOM par le NER "supplémentation", "supplementation", "endocrinologie", "monsieur", "madame", "suivi", "sortie", "emog", "ophtalmo", # Médicaments détectés comme NOM/PRENOM par EDS-Pseudo "eliquis", "trulicity", "saos", "wind", "taxotere", "eupantol", "ezetimibe", "lansoyl", "xatral", "xenetix", "trimbow", "buspirone", "cetirizine", "depakote", "versatis", "durogesic", "montelukast", "metformine", "viatris", "rosuvastatine", "gliclazide", "amlodipine", "perindopril", "nebivolol", "pravastatine", "bisoprolol", "amoxicilline", "kardegic", "lovenox", # Termes médicaux / soins / actes détectés comme NOM "partielle", "cutanee", "cutané", "cutanée", "osseuse", "diabetique", "diabétique", "transdermique", "transderm", "diarrhees", "diarrhées", "ionogramme", "scintigraphie", "thoraco", "thorax", "négative", "negative", "diététicienne", "pressurise", "pressuriser", "inhalee", "inhalée", "inhal", # Mots courants français détectés comme NOM dans les trackare "toilette", "repas", "poche", "installation", "education", "éducation", "refection", "réfection", "complete", "complète", "regime", "régime", "normal", "traité", "traite", "arrêté", "arrete", "volume", "commentaires", "france", "covid", "framboise", "epoux", "époux", # Abréviations médicales courtes (3-4 chars) détectées comme NOM "ide", "ipp", "pcr", "tap", "gel", "ahl", "ssr", "hds", "tca", "etp", "mcg", "sdz", "iao", "ser", "orod", "clav", "disp", "cart", "atcd", "mdrd", "amox", "endoc", "microg", "item", "pyélo", "néphro", # En-têtes de colonnes / mots structurels trackare "observations", "observation", "commentaires", "commentaire", "surveillance", "température", "temperature", "glycémie", "glycemie", "diurèse", "diurese", "balance", "pouls", "systolique", "diastolique", "saturation", "fréquence", "frequence", "respiratoire", "douleur", "alertes", "alerte", "antécédents", "antecedents", "habitus", "allergies", "prescriptions", "prescription", "administration", "catégorie", "categorie", "expiration", "message", "destination", "diagnostique", "diagnostiques", "date", "note", "nom", "heure", "type", "code", "etat", "comprime", "comprimé", "gelule", "gélule", "solution", "injectable", # Médicaments supplémentaires détectés dans les trackare "depakote", "versatis", "humalog", "forxiga", "durogesic", "montelukast", "rosuvastatine", # Abréviations pharma courtes "cpr", "sol", "bic", "agt", "poche", "inhal", # Faux positifs EDS supplémentaires "psy", "inhales", "inhalés", "kwikpen", "lansoprazole", "tiorfan", "smecta", "axa", "ttt", "anionique", "abdomino", "cod", "omi", "urg", "med", "10mg", "20mg", "40mg", "100mg", "300ui", "500ml", "innohep", "coaprovel", "actiskenan", "simvastatine", "forlax", # Mots temporels / contextuels détectés comme EDS_HOPITAL "semaine", "jour", "matin", "soir", "nuit", "midi", # Mots clés de contexte document "compétences", "maladies", "inflammatoires", "systémiques", "rares", "fret", "fax", "contexte", "résultat", "resultat", "résultats", "resultats", "haute", "maison", "aide", "rpps", "poste", "fonct", "sante", "santé", "etxe", "ttipi", "gastro", "concha", "endoscopie", "endoscopique", "fibroscopie", "indication", "conclusion", "technique", "anesthésie", "digestif", "digestive", "digestives", "nutritive", # Abréviations soins trackare détectées comme NOM (batch 20 OGC) "soins", "lit", "jeun", "lever", "pose", "surv", "ggt", "vvp", "verif", "crop", "evs", "maco", "pan", "cet", "trou", "nit", "ute", "nfs", # Mots narratifs CRH capturés par fusion sidebar 2-colonnes "evolution", "évolution", "explorations", "fermeture", "allergie", "allergies", "lotissement", "cholangiographie", "cholecystectomie", "cholécystectomie", "paracetamol", "paracétamol", "unité", "unite", # FP résiduels batch 10 OGC (termes médicaux/instructions soins) "glyc", "glycosurie", "vider", "forte", # FP audit batch 59 OGC (mots courants/médicaux flagués comme NOM) "oncologie", "confrères", "confrere", "doubles", "chers", "motif", "responsable", "autre", "autres", "autonome", "autonomes", "préparations", "preparations", "prévenir", "prevenir", "acétylsalicylique", "acetylsalicylique", "angio", "desc", "diu", "cambo", "bains", "dogue", "barreau", "haitz", "alde", # FP audit OGC 21 — termes médicaux/courants flagués NOM_GLOBAL "alimentation", "augmentation", "amelioration", "amélioration", "biliaire", "biliaires", "bili", "voies", "voie", "apyrexie", "apyrétique", "apyretique", "clavulanique", "mecillinam", "sulfamides", "sulfamide", "tazobactam", "temocilline", "ecoflac", "furanes", "furane", "exilar", "lipruzet", "mopral", "sensible", "sensibles", "dossier", "dossiers", "entero", "entéro", "medecine", "bio", "aviation", "contention", "isolement", "elimination", "élimination", "infectieux", "hémodynamique", "hemodynamique", "pancréatite", "pancreatite", "cholecystite", "cholécystite", "cholécystectomie", "cholecystectomie", "appendicectomie", "néoplasie", "neoplasie", "ovarienne", "prandial", "fébrile", "febrile", "eupnéique", "eupneique", "normocarde", "normotendue", "variable", "dosage", "posologie", # Abréviations diététiques/soins trackare "bcy", "po2", "po1", "po3", "bha", "atg", "ras", "cat", # Spécialités/services récurrents comme FP NOM "cancérologie", "cancerologie", "réanimation", "reanimation", "urologie", "néphrologie", "nephrologie", "hématologie", "hematologie", "gériatrie", "geriatrie", "pédiatrie", "pediatrie", "ophtalmologie", "stomatologie", "allergologie", "kinésithérapie", "kinesitherapie", "ergothérapie", "ergotherapie", "orthopédie", "orthopedie", "traumatologie", "palliatifs", "palliative", "palliatif", "addictologie", "alcoologie", "tabacologie", # Termes structurels trackare "transmissions", "transmission", "releve", "relevé", "objectif", "objectifs", "evaluation", "évaluation", "planification", "planifié", "planifiee", } # Enrichissement automatique avec les ~4000 noms de médicaments d'edsnlp _MEDICAL_STOP_WORDS_SET.update(_load_edsnlp_drug_names()) _MEDICAL_STOP_WORDS = ( r"(?:" + "|".join(re.escape(w) for w in _MEDICAL_STOP_WORDS_SET) + r")" ) # Un token de nom : commence par majuscule, lettres/tirets/apostrophes (PAS d'espace ni de point) _PERSON_TOKEN = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+" RE_PERSON_CONTEXT = re.compile( r"(?:(?:Dr\.?|DR\.?|Docteur|Pr\.?|Professeur|Mme|MME|Madame|M\.|Mr\.?|Monsieur" r"|Nom\s*:\s*" r"|Rédigé\s+par|Validé\s+par|Signé\s+par|Saisi\s+par|Réalisé\s+par" r")\s+)" rf"({_PERSON_TOKEN}(?:\s+{_PERSON_TOKEN}){{0,2}})" # Max 3 mots ) # Noms en MAJUSCULES dans des listes virgulées (ex: "le Dr X, Y, LAZARO") RE_DR_COMMA_LIST = re.compile( r"(?:Dr\.?|DR\.?|Docteur)\s+" r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' .]+" r"(?:\s*,\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' .]+)+", re.IGNORECASE, ) # Token nom : mot commençant par une majuscule d'au moins 3 lettres _NAME_TOKEN_RE = re.compile(r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']{2,}") SPLITTER = re.compile(r"\s*[:|;\t]\s*") # --- Extraction globale de noms depuis champs structurés --- _UC_NAME_TOKEN = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+" RE_EXTRACT_PATIENT = re.compile( r"Patient\(?e?\)?\s*:\s*" rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)" r"(?=\s+Né|\s+né|\s+N°|\s*$)", re.MULTILINE, ) # Champs d'identité structurés (documents trackare / DPI) RE_EXTRACT_NOM_NAISSANCE = re.compile( r"Nom\s+de\s+naissance\s*:\s*" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s+IPP|\s*$)", re.MULTILINE, ) RE_EXTRACT_NOM_PRENOM = re.compile( r"Nom\s+et\s+Pr[ée]nom\s*:\s*" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s+Date|\s+Né|\s*$)", re.MULTILINE, ) RE_EXTRACT_LIEU_NAISSANCE = re.compile( r"Lieu\s+de\s+naissance\s*:\s*" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s*$)", re.MULTILINE, ) RE_EXTRACT_VILLE_RESIDENCE = re.compile( r"Ville\s+de\s+r[ée]sidence\s*:\s*" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s*$)", re.MULTILINE, ) # Contacts structurés : Conjoint/Concubin/Epoux/Epouse/Parent + NOM PRENOM RE_EXTRACT_CONTACT = re.compile( r"(?:Conjoint|Concubin|Epoux|Epouse|Parent|Père|Mère|Fils|Fille|Frère|Soeur|Tuteur)\s+" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+)" r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+))?", ) RE_EXTRACT_REDIGE = re.compile( r"(?:Rédigé|Validé|Signé|Saisi)\s+par\s+" rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)", ) # Token nom composé : JEAN-PIERRE, CAZELLES-BOUDIER, etc. _UC_COMPOUND = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,}(?:-[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})*" RE_EXTRACT_MME_MR = re.compile( r"(?:MME|Mme|Madame|Monsieur|Mr?\.?)\s+" r"(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*(?:-?\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*)?)?" rf"((?:{_UC_COMPOUND})(?:\s+(?:{_UC_COMPOUND}))*)", ) _INITIAL_OPT = r"(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*(?:-?\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*)?)?" RE_EXTRACT_DR_DEST = re.compile( r"(?:DR\.?|Dr\.?|Docteur)\s+" + _INITIAL_OPT + rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)", ) # Noms du personnel médical après un rôle : "Aide : Marie-Paule BORDABERRY" RE_EXTRACT_STAFF_ROLE = re.compile( r"(?:Aide|Infirmière?|IDE|IADE|IBODE|ASH?|Cadre\s+Infirmier" r"|Prescripteur|Prescrit\s+par|Exécut[ée]\s+par|Réalisé\s+par)\s*:?\s*" r"((?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][a-zéèàùâêîôûäëïöüç]+(?:\s*-\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][a-zéèàùâêîôûäëïöüç]+)?\s+)?" r"(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,}[\-]?)(?:[\s\-]+[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,}){0,2})", ) # "Pr DUVAL", "Pr. J.-M. DUVAL", "Professeur DUVAL" RE_EXTRACT_PR = re.compile( r"(?:Pr\.?|Professeur)\s+" + _INITIAL_OPT + rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)", ) CID_PATTERN = re.compile(r"\(cid:\d+\)") # --- Nouvelles regex : dates, adresses, âges, dossiers --- _MOIS_FR = r"(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)" RE_DATE_NAISSANCE = re.compile( r"(?:n[ée]+\s+le|date\s+de\s+naissance|DDN)\s*[:\-]?\s*" r"(\d{1,2}[\s/.\-]\d{1,2}[\s/.\-]\d{2,4}|\d{1,2}\s+" + _MOIS_FR + r"\s+\d{4})", re.IGNORECASE, ) RE_DATE = re.compile( r"\b(\d{1,2})\s*[/.\-]\s*(\d{1,2})\s*[/.\-]\s*(\d{4})\b" r"|" r"\b(\d{1,2})\s+" + _MOIS_FR + r"\s+(\d{4})\b", re.IGNORECASE, ) RE_ADRESSE = re.compile( r"\b\d{1,4}[\s,]*(?:bis|ter)?\s*,?\s*" r"(?:rue|avenue|av\.?|boulevard|bd\.?|place|chemin|all[ée]e|impasse|route|cours|passage|square|r[ée]sidence" r"|lotissement|lot\.?|cit[ée]|hameau|quartier|voie|parvis|esplanade|promenade|côte)" r"\s+[A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\s\-']{2,}", re.IGNORECASE, ) RE_CODE_POSTAL = re.compile( r"(?:(?:[Cc]ode\s*[Pp]ostal|CP)\s*[:\-]?\s*(\d{5}))" r"|" # 5 chiffres + nom de ville (Title Case ou MAJUSCULES), pas précédé d'un chiffre (évite RPPS) # Exclure les unités médicales (UI, mg, ml, etc.) via negative lookahead r"(?:(? Dict[str, Any]: cfg = DEFAULTS_CFG.copy() if config_path and config_path.exists() and yaml is not None: try: user = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} for k, v in user.items(): cfg[k] = v except Exception: pass return cfg # ----------------- Extraction ----------------- _doctr_model_cache = None def _get_doctr_model(): global _doctr_model_cache if _doctr_model_cache is None: _doctr_model_cache = _doctr_ocr_predictor( det_arch="db_resnet50", reco_arch="crnn_vgg16_bn", pretrained=True ) return _doctr_model_cache def extract_text_with_fallback_ocr(pdf_path: Path) -> Tuple[List[str], List[List[str]], bool, OcrWordMap]: """Extraction texte multi-passes avec fallback OCR (docTR). Retourne (pages_text, tables_lines, ocr_used, ocr_word_map). """ pages_text: List[str] = [] tables_lines: List[List[str]] = [] ocr_used = False with pdfplumber.open(pdf_path) as pdf: for p in pdf.pages: t = p.extract_text(x_tolerance=2.5, y_tolerance=4.0) or "" pages_text.append(t) rows: List[str] = [] try: tables = p.extract_tables() for tbl in tables or []: for row in tbl: clean = [c if c is not None else "" for c in row] rows.append("\t".join(clean).strip()) except Exception: pass tables_lines.append(rows) total_chars = sum(len(x or "") for x in pages_text) need_fallback = total_chars < 500 if not need_fallback: need_fallback = any(CID_PATTERN.search(x or "") for x in pages_text) if need_fallback: text_all = pdfminer_extract_text( str(pdf_path), laparams=LAParams(char_margin=2.0, word_margin=0.1, line_margin=0.8, boxes_flow=0.5), ) split = [x for x in text_all.split("\f") if x] if split: pages_text = split # 3e passe PyMuPDF si toujours pauvre/cid total_chars = sum(len(x or "") for x in pages_text) if (total_chars < 500 or any(CID_PATTERN.search(x or "") for x in pages_text)) and fitz is not None: try: doc = fitz.open(str(pdf_path)) pages_text = [doc[i].get_text("text") or "" for i in range(len(doc))] doc.close() except Exception: pass # 4e passe : OCR docTR si toujours très peu de texte (PDF scanné) total_chars = sum(len(x or "") for x in pages_text) ocr_word_map: OcrWordMap = {} if total_chars < 200 and _DOCTR_AVAILABLE and fitz is not None: try: model = _get_doctr_model() doc = fitz.open(str(pdf_path)) ocr_pages: List[str] = [] import numpy as np for i in range(len(doc)): pix = doc[i].get_pixmap(dpi=300) img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) result = model([np.array(img)]) page_text = "" page_words: List[Tuple[str, float, float, float, float]] = [] for block in result.pages[0].blocks: for line in block.lines: for w in line.words: (x0, y0), (x1, y1) = w.geometry page_words.append((w.value, x0, y0, x1, y1)) page_text += " ".join(w.value for w in line.words) + "\n" ocr_word_map[i] = page_words ocr_pages.append(page_text) doc.close() if sum(len(p) for p in ocr_pages) > total_chars: pages_text = ocr_pages ocr_used = True else: ocr_word_map = {} except Exception: ocr_word_map = {} return pages_text, tables_lines, ocr_used, ocr_word_map # Alias pour compatibilité ascendante def extract_text_three_passes(pdf_path: Path): pages_text, tables_lines, _, _ = extract_text_with_fallback_ocr(pdf_path) return pages_text, tables_lines # ----------------- Helpers ----------------- def _compile_user_regex(pattern: str, flags_list: List[str]): flags = 0 for f in flags_list or []: u = f.upper() if u == "IGNORECASE": flags |= re.IGNORECASE if u == "MULTILINE": flags |= re.MULTILINE if u == "DOTALL": flags |= re.DOTALL return re.compile(pattern, flags) def _apply_overrides(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str: for ov in cfg.get("regex_overrides", []) or []: pattern = ov.get("pattern"); placeholder = ov.get("placeholder", PLACEHOLDERS["MASK"]) ; name = ov.get("name", "override") flags_list = ov.get("flags", []) try: rx = _compile_user_regex(pattern, flags_list) except Exception: continue def _rep(m: re.Match): audit.append(PiiHit(page_idx, name, m.group(0), placeholder)) return placeholder line = rx.sub(_rep, line) # force-mask literals for term in (cfg.get("blacklist", {}).get("force_mask_terms", []) or []): if not term: continue word_rx = re.compile(rf"\b{re.escape(term)}\b", re.IGNORECASE) if word_rx.search(line): audit.append(PiiHit(page_idx, "force_term", term, PLACEHOLDERS["MASK"])) line = word_rx.sub(PLACEHOLDERS["MASK"], line) # force-mask regex for pat in (cfg.get("blacklist", {}).get("force_mask_regex", []) or []): try: rx = re.compile(pat, re.IGNORECASE) except Exception: continue if rx.search(line): audit.append(PiiHit(page_idx, "force_regex", pat, PLACEHOLDERS["MASK"])) line = rx.sub(PLACEHOLDERS["MASK"], line) return line def _mask_admin_label(line: str, audit: List[PiiHit], page_idx: int) -> str: m = RE_FINESS.search(line) if m: val = m.group(1); audit.append(PiiHit(page_idx, "FINESS", val, PLACEHOLDERS["FINESS"])) return RE_FINESS.sub(lambda _: f"FINESS : {PLACEHOLDERS['FINESS']}", line) m = RE_OGC.search(line) if m: val = m.group(1); audit.append(PiiHit(page_idx, "OGC", val, PLACEHOLDERS["OGC"])) return RE_OGC.sub(lambda _: f"N° OGC : {PLACEHOLDERS['OGC']}", line) m = RE_IPP.search(line) if m: val = m.group(1); audit.append(PiiHit(page_idx, "IPP", val, PLACEHOLDERS["IPP"])) return RE_IPP.sub(lambda _: f"IPP : {PLACEHOLDERS['IPP']}", line) m = RE_RPPS.search(line) if m: val = m.group(1); audit.append(PiiHit(page_idx, "RPPS", val, PLACEHOLDERS["RPPS"])) return RE_RPPS.sub(lambda _: f"RPPS : {PLACEHOLDERS['RPPS']}", line) return line def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str: # user overrides & force-masks d'abord line = _apply_overrides(line, audit, page_idx, cfg) # EMAIL def _repl_email(m: re.Match) -> str: audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"])) return PLACEHOLDERS["EMAIL"] line = RE_EMAIL.sub(_repl_email, line) # TEL def _repl_tel(m: re.Match) -> str: audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"])) return PLACEHOLDERS["TEL"] line = RE_TEL.sub(_repl_tel, line) line = RE_TEL_COMPACT.sub(_repl_tel, line) # IBAN def _repl_iban(m: re.Match) -> str: audit.append(PiiHit(page_idx, "IBAN", m.group(0), PLACEHOLDERS["IBAN"])) return PLACEHOLDERS["IBAN"] line = RE_IBAN.sub(_repl_iban, line) # NIR (avec validation clé modulo 97) def _repl_nir(m: re.Match) -> str: raw = m.group(0) if not validate_nir(raw): return raw # faux positif, on ne masque pas audit.append(PiiHit(page_idx, "NIR", raw, PLACEHOLDERS["NIR"])) return PLACEHOLDERS["NIR"] line = RE_NIR.sub(_repl_nir, line) # DATE_NAISSANCE (plus spécifique, avant DATE générique) def _repl_date_naissance(m: re.Match) -> str: audit.append(PiiHit(page_idx, "DATE_NAISSANCE", m.group(0), PLACEHOLDERS["DATE_NAISSANCE"])) return PLACEHOLDERS["DATE_NAISSANCE"] line = RE_DATE_NAISSANCE.sub(_repl_date_naissance, line) # DATE générique — désactivé : seules les dates de naissance sont masquées # def _repl_date(m: re.Match) -> str: # audit.append(PiiHit(page_idx, "DATE", m.group(0), PLACEHOLDERS["DATE"])) # return PLACEHOLDERS["DATE"] # line = RE_DATE.sub(_repl_date, line) # ADRESSE def _repl_adresse(m: re.Match) -> str: audit.append(PiiHit(page_idx, "ADRESSE", m.group(0), PLACEHOLDERS["ADRESSE"])) return PLACEHOLDERS["ADRESSE"] line = RE_ADRESSE.sub(_repl_adresse, line) # BOITE POSTALE (BP) def _repl_bp(m: re.Match) -> str: audit.append(PiiHit(page_idx, "ADRESSE", m.group(0), PLACEHOLDERS["ADRESSE"])) return PLACEHOLDERS["ADRESSE"] line = RE_BP.sub(_repl_bp, line) # CODE_POSTAL def _repl_code_postal(m: re.Match) -> str: audit.append(PiiHit(page_idx, "CODE_POSTAL", m.group(0), PLACEHOLDERS["CODE_POSTAL"])) return PLACEHOLDERS["CODE_POSTAL"] line = RE_CODE_POSTAL.sub(_repl_code_postal, line) # AGE def _repl_age(m: re.Match) -> str: audit.append(PiiHit(page_idx, "AGE", m.group(0), PLACEHOLDERS["AGE"])) return PLACEHOLDERS["AGE"] line = RE_AGE.sub(_repl_age, line) # NUMERO DOSSIER / NDA def _repl_dossier(m: re.Match) -> str: audit.append(PiiHit(page_idx, "DOSSIER", m.group(0), PLACEHOLDERS["DOSSIER"])) return PLACEHOLDERS["DOSSIER"] line = RE_NUMERO_DOSSIER.sub(_repl_dossier, line) # N° EPISODE def _repl_episode(m: re.Match) -> str: audit.append(PiiHit(page_idx, "EPISODE", m.group(0), PLACEHOLDERS["EPISODE"])) return PLACEHOLDERS["EPISODE"] line = RE_EPISODE.sub(_repl_episode, line) # Établissements de santé (EHPAD Bayonne, SSR La Concha, Hôpital de Bayonne, etc.) def _repl_etab(m: re.Match) -> str: audit.append(PiiHit(page_idx, "ETAB", m.group(0), PLACEHOLDERS["ETAB"])) return PLACEHOLDERS["ETAB"] line = RE_ETABLISSEMENT.sub(_repl_etab, line) line = RE_HOPITAL_VILLE.sub(_repl_etab, line) # Services hospitaliers (service de Cardiologie, unité de soins palliatifs, etc.) def _repl_service(m: re.Match) -> str: audit.append(PiiHit(page_idx, "ETAB", m.group(0), PLACEHOLDERS["MASK"])) return PLACEHOLDERS["MASK"] line = RE_SERVICE.sub(_repl_service, line) # Champs structurés : Lieu de naissance, Ville de résidence (masquage direct, sans filtre stop words) _re_lieu = re.compile(r"(Lieu\s+de\s+naissance\s*:\s*)([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+)") def _repl_lieu(m: re.Match) -> str: audit.append(PiiHit(page_idx, "VILLE", m.group(2).strip(), PLACEHOLDERS["VILLE"])) return m.group(1) + PLACEHOLDERS["VILLE"] line = _re_lieu.sub(_repl_lieu, line) _re_ville_res = re.compile(r"(Ville\s+de\s+r[ée]sidence\s*:\s*)([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+)") def _repl_ville_res(m: re.Match) -> str: audit.append(PiiHit(page_idx, "VILLE", m.group(2).strip(), PLACEHOLDERS["VILLE"])) return m.group(1) + PLACEHOLDERS["VILLE"] line = _re_ville_res.sub(_repl_ville_res, line) # PERSON uppercase avec contexte, whitelist/acronymes courts wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or []) wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or []) _stop_rx = re.compile(_MEDICAL_STOP_WORDS, re.IGNORECASE) def _clean_name_span(span: str) -> str: """Tronque le span au premier mot médical/stop word.""" tokens = span.split() clean = [] for t in tokens: if _stop_rx.fullmatch(t): break clean.append(t) return " ".join(clean).strip(" .-'") def _repl_person_ctx(m: re.Match) -> str: span = m.group(1).strip(); raw = m.group(0) if span in wl_sections or raw in wl_phrases: return raw # Tronquer avant les mots médicaux cleaned = _clean_name_span(span) if not cleaned: return raw tokens = [t for t in cleaned.split() if t] if len(tokens) == 1 and len(tokens[0]) <= 3: return raw audit.append(PiiHit(page_idx, "NOM", cleaned, PLACEHOLDERS["NOM"])) return raw.replace(cleaned, PLACEHOLDERS["NOM"]) line = RE_PERSON_CONTEXT.sub(_repl_person_ctx, line) # Passe supplémentaire : noms dans des listes virgulées après "Dr" # ex: "le Dr DUVAL, MACHELART, LAZARO" → masquer chaque nom for m in RE_DR_COMMA_LIST.finditer(line): fragment = m.group(0) # Extraire les segments séparés par des virgules (sauf le premier qui inclut "Dr") parts = [p.strip() for p in fragment.split(",")] for part in parts: # Extraire les tokens nom de chaque segment for tok in _NAME_TOKEN_RE.findall(part): if tok in wl_sections or len(tok) <= 2: continue if _stop_rx.fullmatch(tok): continue if tok not in line: continue # Vérifier qu'il n'est pas déjà masqué if f"[{tok}]" in line or tok in {v for v in PLACEHOLDERS.values()}: continue audit.append(PiiHit(page_idx, "NOM", tok, PLACEHOLDERS["NOM"])) line = re.sub(rf"\b{re.escape(tok)}\b", PLACEHOLDERS["NOM"], line) return line def _mask_critical_in_key(key: str, audit: List[PiiHit], page_idx: int) -> str: """Masque les TEL et EMAIL même dans la partie 'clé' d'une ligne clé:valeur.""" def _repl_tel(m: re.Match) -> str: audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"])) return PLACEHOLDERS["TEL"] key = RE_TEL.sub(_repl_tel, key) key = RE_TEL_COMPACT.sub(_repl_tel, key) def _repl_email(m: re.Match) -> str: audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"])) return PLACEHOLDERS["EMAIL"] key = RE_EMAIL.sub(_repl_email, key) return key def _kv_value_only_mask(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str: line = _mask_admin_label(line, audit, page_idx) parts = SPLITTER.split(line, maxsplit=1) if len(parts) == 2: key, value = parts masked_key = _mask_critical_in_key(key, audit, page_idx) masked_val = _mask_line_by_regex(value, audit, page_idx, cfg) return f"{masked_key.strip()} : {masked_val.strip()}" else: return _mask_line_by_regex(line, audit, page_idx, cfg) # ----------------- Extraction globale de noms ----------------- def _is_trackare_document(text: str) -> bool: """Détecte si le document est un export Trackare/TrakCare (DPI structuré).""" markers = ["Détails des patients", "Nom de naissance", "Dossier Patient"] t = text[:3000].lower() return sum(1 for m in markers if m.lower() in t) >= 2 def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: """Parse les champs structurés d'un document Trackare pour extraire les PII. Retourne (name_tokens, pii_hits) avec les noms à masquer et les hits additionnels.""" names: set = set() hits: List[PiiHit] = [] def _add_name(s: str): for tok in s.split(): tok = tok.strip(" .-'(),") if len(tok) >= 2 and tok[0].isupper(): names.add(tok) # --- Identité patient --- # Nom de naissance: DIEGO (peut apparaître 2x : en-tête + récap tabulaire) for m in re.finditer(r"Nom\s+de\s+naissance\s*:\s*(.+?)(?:\s+IPP\b|\s*$)", full_text, re.MULTILINE): _add_name(m.group(1).strip()) # Nom et Prénom: DIEGO PATRICIA for m in re.finditer(r"Nom\s+et\s+Pr[ée]nom\s*:\s*(.+?)(?:\s+Date\s+de\s+naissance|\s*$)", full_text, re.MULTILINE): _add_name(m.group(1).strip()) # Prénom de naissance / Prénom utilisé : REGINA for m in re.finditer(r"Pr[ée]nom\s+(?:de\s+naissance|utilis[ée])\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\s\-']+?)(?:\s*$)", full_text, re.MULTILINE): _add_name(m.group(1).strip()) # Lieu de naissance: BAYONNE → masquer comme VILLE for m in re.finditer(r"Lieu\s+de\s+naissance\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû\s\-']+?)(?:\s*$)", full_text, re.MULTILINE): val = m.group(1).strip() hits.append(PiiHit(-1, "VILLE", val, PLACEHOLDERS["VILLE"])) names.add(val) # Ville de résidence: TARNOS → masquer comme VILLE for m in re.finditer(r"Ville\s+de\s+r[ée]sidence\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû\s\-']+?)(?:\s*$)", full_text, re.MULTILINE): val = m.group(1).strip() hits.append(PiiHit(-1, "VILLE", val, PLACEHOLDERS["VILLE"])) names.add(val) # Code Postal (toutes occurrences) for m in re.finditer(r"[Cc]ode\s*[Pp]ostal\s*:\s*(\d{5})", full_text): hits.append(PiiHit(-1, "CODE_POSTAL", m.group(1), PLACEHOLDERS["CODE_POSTAL"])) # N° épisode (= NDA, identifiant de séjour) for m in re.finditer(r"Episode\s*N[o°.]?\s*\.?\s*:\s*(\d{5,})", full_text): hits.append(PiiHit(-1, "EPISODE", m.group(1), PLACEHOLDERS.get("NDA", "[NDA]"))) # Adresse patient (toutes les occurrences) for m in re.finditer(r"Adresse\s*:\s*(.+?)(?:\s+Ville\s+de\s+r[ée]sidence|\s*$)", full_text, re.MULTILINE): val = m.group(1).strip() if len(val) > 3: hits.append(PiiHit(-1, "ADRESSE", val, PLACEHOLDERS["ADRESSE"])) # --- Pied de page : "Patient : NOM PRENOM - Date de naissance..." --- for m in re.finditer(r"Patient\s*:\s*(.+?)\s*-\s*Date\s+de\s+naissance", full_text): _add_name(m.group(1).strip()) # --- Médecin courant (toutes occurrences) --- for m in re.finditer(r"Médecin\s+courant\s*:\s*(?:DR\.?\s*)?(.+?)(?:\s*$)", full_text, re.MULTILINE): _add_name(m.group(1).strip()) # --- Médecin traitant (ligne après "Nom Adresse Téléphone") --- for m in re.finditer(r"Médecin\s+traitant\s*\n.*?Nom\s+Adresse\s+Téléphone\s*\n\s*(?:DR\.?\s*)?(.+?)(?:\d{5}|\s*$)", full_text, re.MULTILINE): _add_name(m.group(1).strip()) # --- Contacts structurés --- # Pattern: Relation NOM PRENOM [ADRESSE] [TEL] for m in re.finditer( r"(?:Conjoint|Concubin|Epoux|Epouse|Parent|Père|Mère|Fils|Fille|Frère|Soeur|Tuteur)\s+" r"([A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûä\-']+)" r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûä\-']+))?", full_text, ): _add_name(m.group(1)) if m.group(2): _add_name(m.group(2)) # --- Prescripteurs / Exécutants (trackare) --- for m in re.finditer( r"(?:Prescripteur|Prescrit\s+par|Exécut[ée]\s+par|Réalisé\s+par)\s*:?\s*" r"(?:(?:Dr|Pr)\.?\s+)?" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+)" r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+))?", full_text, ): _add_name(m.group(1)) if m.group(2): _add_name(m.group(2)) # --- Médecins urgences (IAO, prise en charge, décision) --- for m in re.finditer(r"IAO\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+)", full_text): _add_name(m.group(1)) for m in re.finditer( r"Médecin\s+de\s+la\s+(?:prise\s+en\s+charge|décision)\s+médicale\s+" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+)" r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+))?", full_text, ): _add_name(m.group(1)) if m.group(2): _add_name(m.group(2)) # --- Noms soignants dans les Notes d'évolution / Notes IDE / Notes médicales --- # Pattern: "Note IDE\nPrenom NOM" ou "Note d'évolution\nPrenom NOM" for m in re.finditer( r"Note\s+(?:IDE|AS|d'[ée]volution|m[ée]dicale|kin[ée])\s*\n\s*" r"([A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû][a-zéèàùâêîôûäëïöüç]+)\s+" r"([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\-]+)", full_text ): prenom, nom = m.group(1), m.group(2) if prenom.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(prenom) if nom.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(nom) # --- Noms soignants multi-lignes : "Prénom\nNOM" dans les tableaux de prescriptions/soins --- for m in re.finditer( r'\b([A-ZÉÈÀÙÂÊÎÔÛ][a-zéèàùâêîôûäëïöüç]{2,})\s*\n\s*([A-ZÉÈÀÙÂÊÎÔÛ]{3,})\b', full_text ): prenom, nom = m.group(1), m.group(2) if prenom.lower() not in _MEDICAL_STOP_WORDS_SET and nom.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(prenom) _add_name(nom) # Filtrer les tokens trop courts ou stop words (sauf noms de villes extraits explicitement) city_tokens = {h.original for h in hits if h.kind == "VILLE"} filtered = set() for tok in names: if tok in city_tokens: filtered.add(tok) continue if len(tok) < 3: continue if tok.lower() in _MEDICAL_STOP_WORDS_SET: continue filtered.add(tok) return filtered, hits def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set: """Pré-scan du document brut pour extraire les noms de personnes depuis les champs structurés (Patient, Rédigé par, etc.). Retourne un ensemble de tokens (mots) à masquer globalement.""" wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or []) wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or []) names: set = set() def _add_tokens(match_str: str): for token in match_str.split(): token = token.strip(" .-'") if len(token) < 3: continue if token.upper() in wl_sections or token in wl_phrases: continue if token.lower() in _MEDICAL_STOP_WORDS_SET: continue names.add(token) def _add_tokens_force_first(match_str): """Comme _add_tokens mais force le 1er token (contexte Dr/Mme fort).""" tokens = match_str.split() for i, token in enumerate(tokens): token = token.strip(" .-'") if len(token) < 2: continue if i == 0: # Premier token après Dr/Mme : toujours un nom, bypass stop words if token.upper() not in wl_sections: names.add(token) else: if len(token) < 3: continue if token.upper() in wl_sections or token in wl_phrases: continue if token.lower() in _MEDICAL_STOP_WORDS_SET: continue names.add(token) for m in RE_EXTRACT_PATIENT.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_REDIGE.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_MME_MR.finditer(full_text): _add_tokens_force_first(m.group(1)) for m in RE_EXTRACT_DR_DEST.finditer(full_text): _add_tokens_force_first(m.group(1)) # Champs d'identité structurés (trackare / DPI) for m in RE_EXTRACT_NOM_NAISSANCE.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_NOM_PRENOM.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_LIEU_NAISSANCE.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_VILLE_RESIDENCE.finditer(full_text): _add_tokens(m.group(1)) # Contacts structurés (conjoint, concubin, etc.) for m in RE_EXTRACT_CONTACT.finditer(full_text): _add_tokens(m.group(1)) if m.group(2): _add_tokens(m.group(2)) # Personnel médical avec rôle (Aide, Cadre Infirmier, Prescripteur, etc.) for m in RE_EXTRACT_STAFF_ROLE.finditer(full_text): _add_tokens(m.group(1)) # Pr / Professeur + nom(s) for m in RE_EXTRACT_PR.finditer(full_text): _add_tokens_force_first(m.group(1)) # Extraction des noms dans les listes virgulées après Dr/Docteur # ex: "le Dr DUVAL, MACHELART, CHARLANNE, LAZARO, il a été proposé" for m in RE_DR_COMMA_LIST.finditer(full_text): fragment = m.group(0) parts = [p.strip() for p in fragment.split(",")] for part in parts: for tok in _NAME_TOKEN_RE.findall(part): tok = tok.strip(" .-'") if len(tok) < 3: continue if tok.upper() in wl_sections or tok in wl_phrases: continue if tok.lower() in _MEDICAL_STOP_WORDS_SET: continue names.add(tok) # Retirer les sous-parties de noms composés avec tiret # Si "JEAN-PIERRE" est dans names, retirer "JEAN" et "PIERRE" individuels compound_names = {n for n in names if "-" in n} parts_to_remove = set() for compound in compound_names: for part in compound.split("-"): part = part.strip() if len(part) >= 2 and part in names: parts_to_remove.add(part) names -= parts_to_remove return names def _apply_extracted_names(text: str, names: set, audit: List[PiiHit]) -> str: """Remplace globalement chaque nom extrait dans le texte.""" placeholder = PLACEHOLDERS["NOM"] # Filtrer les stop words et tokens trop courts en dernière ligne de défense safe_names = {n for n in names if len(n) >= 3 and n.lower() not in _MEDICAL_STOP_WORDS_SET} for token in sorted(safe_names, key=len, reverse=True): pattern = re.compile(rf"\b{re.escape(token)}\b", re.IGNORECASE) new_text = [] last_end = 0 for m in pattern.finditer(text): # Ne pas remplacer si déjà dans un placeholder ctx_start = max(0, m.start() - 1) ctx_end = min(len(text), m.end() + 1) if "[" in text[ctx_start:m.start()] or "]" in text[m.end():ctx_end]: continue # Ne pas remplacer si le token fait partie d'un mot composé (tiret) if m.start() > 0 and text[m.start() - 1] == "-": continue if m.end() < len(text) and text[m.end()] == "-": continue audit.append(PiiHit(-1, "NOM_EXTRACTED", m.group(0), placeholder)) new_text.append(text[last_end:m.start()]) new_text.append(placeholder) last_end = m.end() new_text.append(text[last_end:]) text = "".join(new_text) return text # ----------------- Anonymisation (regex) ----------------- def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]], cfg: Dict[str, Any]) -> AnonResult: audit: List[PiiHit] = [] # Phase 0 : extraction globale des noms depuis les champs structurés full_raw = "\n".join(pages_text) + "\n" + "\n".join( "\n".join(rows) for rows in tables_lines ) extracted_names = _extract_document_names(full_raw, cfg) # Phase 0b : si document Trackare, extraction renforcée des PII structurés is_trackare = _is_trackare_document(full_raw) if is_trackare: trackare_names, trackare_hits = _extract_trackare_identity(full_raw) extracted_names.update(trackare_names) audit.extend(trackare_hits) # Phase 1 : masquage ligne par ligne (regex classiques) out_pages: List[str] = [] for i, page_txt in enumerate(pages_text): lines = [ln for ln in (page_txt or "").splitlines()] masked = [_kv_value_only_mask(ln, audit, i, cfg) for ln in lines] out_pages.append("\n".join(masked)) table_blocks: List[str] = [] for i, rows in enumerate(tables_lines): mbuf: List[str] = [] for r in rows: masked = _kv_value_only_mask(r, audit, i, cfg) mbuf.append(masked) if mbuf: table_blocks.append("\n".join(mbuf)) tables_block = "\n\n".join(table_blocks) text_out = "\f".join(out_pages) # séparateur de pages if tables_block.strip(): text_out += "\n\n[TABLES]\n" + tables_block + "\n[/TABLES]" # Phase 2 : application globale des noms extraits (rattrapage) if extracted_names: text_out = _apply_extracted_names(text_out, extracted_names, audit) return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit, is_trackare=is_trackare) # ----------------- NER ONNX sur narratif ----------------- def _mask_with_hf(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, Any], audit: List[PiiHit]) -> str: # remplace via regex sur les 'word' détectés (approche pragmatique) keep_org_gpe = bool((cfg.get("whitelist", {}) or {}).get("org_gpe_keep", False)) def repl_once(s: str, old: str, new: str) -> str: return re.sub(rf"\b{re.escape(old)}\b", new, s) out = text for e in ents: w = e.get("word") or ""; grp = (e.get("entity_group") or e.get("entity") or "").upper() if not w or "[" in w or "]" in w: # ignore placeholders continue if len(w) <= 2: # trop court continue if grp in {"PER", "PERSON"}: audit.append(PiiHit(-1, "NER_PER", w, PLACEHOLDERS["NOM"])) out = repl_once(out, w, PLACEHOLDERS["NOM"]) elif grp in {"ORG"}: if keep_org_gpe: continue audit.append(PiiHit(-1, "NER_ORG", w, PLACEHOLDERS["ETAB"])) out = repl_once(out, w, PLACEHOLDERS["ETAB"]) elif grp in {"LOC"}: if keep_org_gpe: continue audit.append(PiiHit(-1, "NER_LOC", w, PLACEHOLDERS["VILLE"])) out = repl_once(out, w, PLACEHOLDERS["VILLE"]) elif grp in {"DATE"}: # facultatif : si vous masquez déjà les dates via règles, laissez tel quel continue return out def apply_hf_ner_on_narrative(text_out: str, cfg: Dict[str, Any], manager: Optional[NerModelManager], thresholds: Optional[NerThresholds]) -> Tuple[str, List[PiiHit]]: if manager is None or not manager.is_loaded(): return text_out, [] # isoler [TABLES] pattern = re.compile(r"\[TABLES\](.*?)\[/TABLES\]", re.DOTALL) tables: List[Tuple[int,int,str]] = [] keep = [] last = 0 cleaned = "" for m in pattern.finditer(text_out): cleaned += text_out[last:m.start()] keep.append((len(cleaned), len(cleaned) + len(m.group(0)), m.group(0))) cleaned += "\x00" * len(m.group(0)) last = m.end() cleaned += text_out[last:] # par pages (séparées par \f) → par paragraphes pages = cleaned.split("\f") hits: List[PiiHit] = [] rebuilt_pages: List[str] = [] for pg in pages: paras = [p for p in re.split(r"\n\s*\n", pg) if p.strip()] ents_per_para = manager.infer_paragraphs(paras, thresholds=thresholds) # remplace entités idx = 0 buf = [] for para, ents in zip(paras, ents_per_para): masked = _mask_with_hf(para, ents, cfg, hits) buf.append(masked) rebuilt_pages.append("\n\n".join(buf)) rebuilt = "\f".join(rebuilt_pages) # réinsérer [TABLES] rebuilt_list = list(rebuilt) for start, end, payload in keep: rebuilt_list[start:end] = list(payload) final = "".join(rebuilt_list) return final, hits # ----------------- NER EDS-Pseudo sur narratif ----------------- def _mask_with_eds_pseudo(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, Any], audit: List[PiiHit]) -> str: """Masque les entités détectées par EDS-Pseudo en utilisant le mapping eds_mapped_key.""" def repl_once(s: str, old: str, new: str) -> str: return re.sub(rf"\b{re.escape(old)}\b", new, s) out = text for e in ents: w = e.get("word") or "" mapped_key = e.get("eds_mapped_key", "") if not w or "[" in w or "]" in w: continue if len(w) <= 2: continue # Filtrer les faux positifs NOM/PRENOM (médicaments, acronymes médicaux) label = e.get("entity_group", "EDS") if label in ("NOM", "PRENOM", "HOPITAL", "VILLE"): if w.lower() in _MEDICAL_STOP_WORDS_SET: continue # Filtrer aussi les tokens multi-mots dont un composant est un stop word if " " in w and any(part.lower() in _MEDICAL_STOP_WORDS_SET for part in w.split()): continue # Filtrer les dosages détectés comme noms (ex: "10MG", "300UI", "1 000") if re.match(r"^\d[\d\s]*(?:mg|MG|ml|ML|UI|µg|mcg|g|kg|%)?$", w.strip()): continue # Règles de validation heuristiques par type d'entité if label in ("NOM", "PRENOM"): # Rejeter si le contexte précédent (15 chars) contient un dosage pos = text.find(w) if pos > 0: ctx_before = text[max(0, pos - 15):pos] if re.search(r"\d+\s*(?:mg|UI|ml|µg|mcg)\b", ctx_before, re.IGNORECASE): continue elif label == "HOPITAL": _STRUCTURAL_WORDS = {"SERVICE", "POLE", "PÔLE", "UNITE", "UNITÉ", "SECTEUR"} if len(w) < 5: continue if w.upper() in _STRUCTURAL_WORDS: continue placeholder = PLACEHOLDERS.get(mapped_key, PLACEHOLDERS["MASK"]) audit.append(PiiHit(-1, f"EDS_{label}", w, placeholder)) out = repl_once(out, w, placeholder) return out def apply_eds_pseudo_on_narrative(text_out: str, cfg: Dict[str, Any], manager: "EdsPseudoManager") -> Tuple[str, List[PiiHit]]: """Applique EDS-Pseudo sur le narratif (même structure que apply_hf_ner_on_narrative).""" if manager is None or not manager.is_loaded(): return text_out, [] # isoler [TABLES] pattern = re.compile(r"\[TABLES\](.*?)\[/TABLES\]", re.DOTALL) keep = [] last = 0 cleaned = "" for m in pattern.finditer(text_out): cleaned += text_out[last:m.start()] keep.append((len(cleaned), len(cleaned) + len(m.group(0)), m.group(0))) cleaned += "\x00" * len(m.group(0)) last = m.end() cleaned += text_out[last:] # par pages → par paragraphes pages = cleaned.split("\f") hits: List[PiiHit] = [] rebuilt_pages: List[str] = [] for pg in pages: paras = [p for p in re.split(r"\n\s*\n", pg) if p.strip()] ents_per_para = manager.infer_paragraphs(paras) buf = [] for para, ents in zip(paras, ents_per_para): masked = _mask_with_eds_pseudo(para, ents, cfg, hits) buf.append(masked) rebuilt_pages.append("\n\n".join(buf)) rebuilt = "\f".join(rebuilt_pages) # réinsérer [TABLES] rebuilt_list = list(rebuilt) for start, end, payload in keep: rebuilt_list[start:end] = list(payload) final = "".join(rebuilt_list) return final, hits # ----------------- Selective safety rescan ----------------- def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str: """Rescan de sécurité : re-détecte les PII critiques qui auraient échappé au premier passage.""" # enlève TABLES du scope def strip_tables(s: str): kept = [] out = [] i = 0 pattern = re.compile(r"\[TABLES\](.*?)\[/TABLES\]", re.DOTALL) for m in pattern.finditer(s): out.append(s[i:m.start()]) kept.append((len("".join(out)), len("".join(out)) + len(m.group(1)), m.group(1))) out.append("\x00" * (m.end() - m.start())) i = m.end() out.append(s[i:]) return "".join(out), kept protected, kept = strip_tables(text) # PII critiques (comme avant) protected = RE_EMAIL.sub(PLACEHOLDERS["EMAIL"], protected) protected = RE_TEL.sub(PLACEHOLDERS["TEL"], protected) protected = RE_TEL_COMPACT.sub(PLACEHOLDERS["TEL"], protected) protected = RE_IBAN.sub(PLACEHOLDERS["IBAN"], protected) # NIR avec validation def _rescan_nir(m: re.Match) -> str: return PLACEHOLDERS["NIR"] if validate_nir(m.group(0)) else m.group(0) protected = RE_NIR.sub(_rescan_nir, protected) # Nouvelles regex : dates de naissance, dates, adresses, codes postaux protected = RE_DATE_NAISSANCE.sub(PLACEHOLDERS["DATE_NAISSANCE"], protected) # protected = RE_DATE.sub(PLACEHOLDERS["DATE"], protected) # désactivé protected = RE_ADRESSE.sub(PLACEHOLDERS["ADRESSE"], protected) protected = RE_BP.sub(PLACEHOLDERS["ADRESSE"], protected) protected = RE_CODE_POSTAL.sub(PLACEHOLDERS["CODE_POSTAL"], protected) # N° Episode protected = RE_EPISODE.sub(PLACEHOLDERS["EPISODE"], protected) # N° RPPS protected = RE_RPPS.sub(PLACEHOLDERS["RPPS"], protected) # Établissements protected = RE_ETABLISSEMENT.sub(PLACEHOLDERS["ETAB"], protected) protected = RE_HOPITAL_VILLE.sub(PLACEHOLDERS["ETAB"], protected) # Services hospitaliers protected = RE_SERVICE.sub(PLACEHOLDERS["MASK"], protected) # Personnes contextuelles (avec whitelist) wl_sections = set() wl_phrases = set() if cfg: wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or []) wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or []) def _rescan_person(m: re.Match) -> str: span = m.group(1).strip(); raw = m.group(0) if span in wl_sections or raw in wl_phrases: return raw tokens = [t for t in span.split() if t] if len(tokens) == 1 and len(tokens[0]) <= 3: return raw return raw.replace(span, PLACEHOLDERS["NOM"]) protected = RE_PERSON_CONTEXT.sub(_rescan_person, protected) res = list(protected) for start, end, payload in kept: res[start:end] = list(payload) return "".join(res) # ----------------- PDF Redaction ----------------- def _search_ocr_words_fuzzy_digits(ocr_words: List[Tuple[str, float, float, float, float]], token: str, page_rect, min_ratio: float = 0.7) -> list: """Matching flou pour identifiants numériques manuscrits. Compare les séquences de chiffres entre le token VLM et les mots OCR. Accepte une correspondance si ≥ min_ratio des chiffres matchent.""" token_digits = re.sub(r"[^0-9]", "", token) if len(token_digits) < 4: return [] rects = [] for (word, x0n, y0n, x1n, y1n) in ocr_words: word_digits = re.sub(r"[^0-9]", "", word) if len(word_digits) < 3: continue # Match exact des chiffres (après nettoyage) if word_digits == token_digits: rects.append(fitz.Rect( x0n * page_rect.width, y0n * page_rect.height, x1n * page_rect.width, y1n * page_rect.height, )) continue # Match partiel : le token est contenu dans le mot OCR ou vice-versa if token_digits in word_digits or word_digits in token_digits: if min(len(token_digits), len(word_digits)) / max(len(token_digits), len(word_digits)) >= min_ratio: rects.append(fitz.Rect( x0n * page_rect.width, y0n * page_rect.height, x1n * page_rect.width, y1n * page_rect.height, )) continue # Match par distance : comparer caractère par caractère (Hamming-like) if abs(len(word_digits) - len(token_digits)) <= 2: shorter, longer = (word_digits, token_digits) if len(word_digits) <= len(token_digits) else (token_digits, word_digits) matches = sum(1 for a, b in zip(shorter, longer) if a == b) if matches / len(longer) >= min_ratio: rects.append(fitz.Rect( x0n * page_rect.width, y0n * page_rect.height, x1n * page_rect.width, y1n * page_rect.height, )) return rects def _search_ocr_words(ocr_words: List[Tuple[str, float, float, float, float]], token: str, page_rect) -> list: """Cherche un token dans les mots OCR d'une page. Pour les tokens multi-mots, cherche chaque mot individuellement. Retourne des fitz.Rect en coordonnées PDF points.""" rects = [] tokens_to_search = token.split() if " " in token else [token] for t in tokens_to_search: t_lower = t.lower().strip() if not t_lower: continue for (word, x0n, y0n, x1n, y1n) in ocr_words: if word.lower().strip(".,;:!?()") == t_lower: rects.append(fitz.Rect( x0n * page_rect.width, y0n * page_rect.height, x1n * page_rect.width, y1n * page_rect.height, )) return rects def _search_whole_word(page, token: str) -> list: """Cherche un token comme mot entier (pas substring) via get_text('words'). Évite les faux positifs de page.search_for() qui fait du substring matching.""" rects = [] token_lower = token.lower().strip() for w in page.get_text("words"): # w = (x0, y0, x1, y1, word, block_no, line_no, word_no) word_text = w[4].strip(".,;:!?()[]{}\"'«»-–—/\\") if word_text.lower() == token_lower: rects.append(fitz.Rect(w[0], w[1], w[2], w[3])) return rects def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, ocr_word_map: OcrWordMap = None) -> None: if fitz is None: raise RuntimeError("PyMuPDF non disponible – installez pymupdf.") doc = fitz.open(str(original_pdf)) # index hits par page; page==-1 → rechercher sur toutes pages by_page: Dict[int, List[PiiHit]] = {} for h in audit: by_page.setdefault(h.page, []).append(h) # Kinds à ne pas chercher dans le PDF (dates masquées uniquement dans le texte, # pas dans le PDF où elles rendent les tableaux illisibles) _VECTOR_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL"} # Kinds dont les tokens courts (< 5) risquent le substring matching via page.search_for() _VECTOR_SHORT_TOKEN_KINDS = {"NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM", "EDS_HOPITAL", "EDS_VILLE", "ETAB", "ETAB_GLOBAL"} for pno in range(len(doc)): page = doc[pno] hits = by_page.get(pno, []) + by_page.get(-1, []) if not hits: continue # Dédupliquer les tokens : (token, kind) → rechercher une seule fois par page seen_tokens: set = set() all_rects = [] for h in hits: token = h.original.strip() if not token: continue if h.kind in _VECTOR_SKIP_KINDS: continue # Clé de déduplication : le token lui-même (même token cherché une seule fois) dedup_key = token if dedup_key in seen_tokens: continue seen_tokens.add(dedup_key) if h.kind in _VECTOR_SHORT_TOKEN_KINDS and len(token) < 5: if token.lower() not in _MEDICAL_STOP_WORDS_SET: rects = _search_whole_word(page, token) if not rects and ocr_word_map and pno in ocr_word_map: rects = _search_ocr_words(ocr_word_map[pno], token, page.rect) all_rects.extend(rects) continue rects = page.search_for(token) if not rects and h.kind in {"NIR", "IBAN", "TEL"}: compact = re.sub(r"\s+", "", token) if compact != token: rects = page.search_for(compact) if not rects and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}: for word in token.split(): word = word.strip(" .-'") if len(word) < 5 or word.lower() in _MEDICAL_STOP_WORDS_SET: continue if not word[0].isupper(): continue rects.extend(page.search_for(word)) if not rects and ocr_word_map and pno in ocr_word_map: rects = _search_ocr_words(ocr_word_map[pno], token, page.rect) all_rects.extend(rects) # Appliquer toutes les annotations d'un coup (évite de ralentir search_for) for r in all_rects: page.add_redact_annot(r, fill=(0, 0, 0)) try: page.apply_redactions() except Exception: pass doc.save(str(out_pdf), deflate=True, garbage=4, clean=True, incremental=False) doc.close() def _rasterize_page(args): """Worker parallèle : rasterise une page + dessine les rectangles noirs.""" pdf_path_str, pno, rects_tuples, dpi, ogc_label = args doc = fitz.open(pdf_path_str) src = doc[pno] rect_w, rect_h = src.rect.width, src.rect.height zoom = dpi / 72.0 pix = src.get_pixmap(matrix=fitz.Matrix(zoom, zoom), annots=False) img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) draw = ImageDraw.Draw(img) shrink = 1.5 for (x0, y0, x1, y1) in rects_tuples: rx0 = x0 * zoom + shrink ry0 = y0 * zoom rx1 = x1 * zoom - shrink ry1 = y1 * zoom if rx1 > rx0: draw.rectangle([rx0, ry0, rx1, ry1], fill=(0, 0, 0)) if ogc_label: from PIL import ImageFont font_size = int(14 * zoom) try: font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size) except Exception: font = ImageFont.load_default() text = ogc_label if ogc_label.upper().startswith("OGC") else f"OGC: {ogc_label}" bbox = draw.textbbox((0, 0), text, font=font) tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] margin = int(10 * zoom) x = img.width - tw - margin y = margin draw.rectangle([x - 4, y - 2, x + tw + 4, y + th + 2], fill=(255, 255, 255)) draw.text((x, y), text, fill=(0, 0, 0), font=font) buf = io.BytesIO() img.save(buf, format="PNG") doc.close() return pno, buf.getvalue(), rect_w, rect_h def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 300, ogc_label: Optional[str] = None, ocr_word_map: OcrWordMap = None) -> None: if fitz is None: raise RuntimeError("PyMuPDF non disponible – installez pymupdf.") doc = fitz.open(str(original_pdf)) all_rects: Dict[int, List["fitz.Rect"]] = {} _RASTER_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL"} _RASTER_SHORT_TOKEN_KINDS = {"NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM", "EDS_HOPITAL", "EDS_VILLE", "ETAB", "ETAB_GLOBAL"} _VLM_NUMERIC_KINDS = {"VLM_NUM_PATIENT", "VLM_NUM_LOT", "VLM_NUM_ORD", "VLM_NDA", "VLM_NIR", "VLM_IPP", "VLM_RPPS"} by_page: Dict[int, List[PiiHit]] = {} for h in audit: by_page.setdefault(h.page, []).append(h) for pno in range(len(doc)): page = doc[pno] rects = [] seen_tokens: set = set() hits = by_page.get(pno, []) + by_page.get(-1, []) # Masquage total si FULL_PAGE_MASK détecté (page manuscrite non déchiffrable) if any(h.kind == "FULL_PAGE_MASK" and h.page == pno for h in hits): margin = 5 # points — liseré fin autour du masque rects.append(fitz.Rect(margin, margin, page.rect.width - margin, page.rect.height - margin)) all_rects[pno] = rects continue for h in hits: token = h.original.strip() if not token or h.kind in _RASTER_SKIP_KINDS: continue if token in seen_tokens: continue seen_tokens.add(token) if h.kind in _RASTER_SHORT_TOKEN_KINDS and len(token) < 5: if token.lower() not in _MEDICAL_STOP_WORDS_SET: found_short = _search_whole_word(page, token) if not found_short and ocr_word_map and pno in ocr_word_map: found_short = _search_ocr_words(ocr_word_map[pno], token, page.rect) rects.extend(found_short) continue found = page.search_for(token) if not found and h.kind in {"NIR", "IBAN", "TEL", "VLM_TEL", "VLM_NIR"}: compact = re.sub(r"\s+", "", token) found = page.search_for(compact) if not found and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM", "VLM_NOM", "VLM_ETAB", "VLM_SERVICE"}: for word in token.split(): word = word.strip(" .-'") if len(word) < 3 or word.lower() in _MEDICAL_STOP_WORDS_SET: continue found.extend(page.search_for(word)) # Fallback OCR pour chaque mot if not found and ocr_word_map and pno in ocr_word_map: found.extend(_search_ocr_words(ocr_word_map[pno], word, page.rect)) if not found and ocr_word_map and pno in ocr_word_map: found = _search_ocr_words(ocr_word_map[pno], token, page.rect) # Matching flou pour identifiants numériques VLM (manuscrit) if not found and h.kind in _VLM_NUMERIC_KINDS and ocr_word_map and pno in ocr_word_map: found = _search_ocr_words_fuzzy_digits(ocr_word_map[pno], token, page.rect) rects.extend(found) all_rects[pno] = rects # Phase 2 : rasterisation parallèle (ProcessPoolExecutor) n_pages = len(doc) rects_as_tuples = { pno: [(r.x0, r.y0, r.x1, r.y1) for r in rects] for pno, rects in all_rects.items() } doc.close() # fermer AVANT le fork n_workers = min(n_pages, os.cpu_count() or 4) tasks = [ (str(original_pdf), pno, rects_as_tuples.get(pno, []), dpi, ogc_label) for pno in range(n_pages) ] with ProcessPoolExecutor(max_workers=n_workers) as pool: results = sorted(pool.map(_rasterize_page, tasks), key=lambda x: x[0]) # Assemblage final (séquentiel, rapide) out = fitz.open() for pno, png_bytes, w, h in results: dst = out.new_page(width=w, height=h) dst.insert_image(fitz.Rect(0, 0, w, h), stream=png_bytes) out.save(str(out_pdf), deflate=True, garbage=4, clean=True) out.close() # ----------------- VLM pour PDFs scannés ----------------- def _apply_vlm_on_scanned_pdf(pdf_path: Path, anon: AnonResult, ocr_word_map: OcrWordMap, vlm_manager) -> None: """Utilise un VLM (Ollama) pour détecter visuellement les PII sur chaque page d'un PDF scanné. Les entités détectées sont ajoutées à anon.audit et au texte pseudonymisé. Auto-rotation : si une page a peu de mots OCR, essaie 4 orientations.""" from vlm_manager import VLM_CATEGORY_MAP doc = fitz.open(str(pdf_path)) # Collecter les PII déjà détectés pour contexte VLM existing_pii = list({h.original.strip() for h in anon.audit if h.original.strip()}) # Catégories contenant des identifiants numériques (matching flou) _NUMERIC_CATS = {"NUMERO_PATIENT", "NUMERO_LOT", "NUMERO_ORDONNANCE", "NUMERO_SEJOUR", "NDA", "NIR", "IPP", "RPPS"} # Catégories à splitter en mots (noms, services, établissements) _SPLIT_CATS = {"NOM", "PRENOM", "ETABLISSEMENT", "SERVICE"} for pno in range(len(doc)): pix = doc[pno].get_pixmap(dpi=150) img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) # Détection page manuscrite : peu de mots OCR = scan manuscrit / mal orienté ocr_count = len(ocr_word_map.get(pno, [])) is_handwritten_page = ocr_count < 100 # Pages manuscrites : masquage total direct (VLM trop lent/hallucinatoire) if is_handwritten_page and ocr_count > 0: anon.audit.append(PiiHit(page=pno, kind="FULL_PAGE_MASK", original="page manuscrite", placeholder=PLACEHOLDERS["MASK"])) log.info("VLM page %d : masquage total direct (OCR=%d mots)", pno, ocr_count) continue # Pages lisibles : analyse VLM best_entities = [] try: best_entities = vlm_manager.analyze_page_image(img, page_number=pno, existing_pii=existing_pii[:20]) except Exception: best_entities = [] for ent in best_entities: cat = ent.get("categorie", "").upper() texte = ent.get("texte", "").strip() conf = ent.get("confiance", 0.0) if not texte or conf < 0.3: continue if cat not in VLM_CATEGORY_MAP: continue kind, placeholder_key = VLM_CATEGORY_MAP[cat] placeholder = PLACEHOLDERS.get(placeholder_key, PLACEHOLDERS["MASK"]) if cat in _SPLIT_CATS: # Splitter en mots pour meilleur matching OCR for word in texte.split(): word = word.strip(" .-'(),") if len(word) < 2 or word.lower() in _MEDICAL_STOP_WORDS_SET: continue anon.audit.append(PiiHit(page=pno, kind=kind, original=word, placeholder=placeholder)) else: anon.audit.append(PiiHit(page=pno, kind=kind, original=texte, placeholder=placeholder)) # Pour les identifiants numériques, ajouter aussi le token nettoyé (chiffres seuls) if cat in _NUMERIC_CATS: digits_only = re.sub(r"[^0-9]", "", texte) if digits_only and digits_only != texte: anon.audit.append(PiiHit(page=pno, kind=kind, original=digits_only, placeholder=placeholder)) # Remplacer dans le texte pseudonymisé si trouvé try: anon.text_out = re.sub(rf"\b{re.escape(texte)}\b", placeholder, anon.text_out) except re.error: anon.text_out = anon.text_out.replace(texte, placeholder) doc.close() # ----------------- Orchestration ----------------- def process_pdf( pdf_path: Path, out_dir: Path, make_vector_redaction: bool = True, also_make_raster_burn: bool = False, config_path: Optional[Path] = None, use_hf: bool = False, ner_manager=None, ner_thresholds=None, ogc_label: Optional[str] = None, vlm_manager=None, ) -> Dict[str, str]: out_dir.mkdir(parents=True, exist_ok=True) cfg = load_dictionaries(config_path) pages_text, tables_lines, ocr_used, ocr_word_map = extract_text_with_fallback_ocr(pdf_path) # 1) Regex rules anon = anonymise_document_regex(pages_text, tables_lines, cfg) # 1b) VLM (optionnel) — sur les PDFs scannés uniquement if ocr_used and vlm_manager is not None and VlmManager is not None: try: if vlm_manager.is_loaded(): _apply_vlm_on_scanned_pdf(pdf_path, anon, ocr_word_map, vlm_manager) except Exception: pass # dégradation gracieuse # 2) NER (optionnel) — sur le narratif final_text = anon.text_out hf_hits: List[PiiHit] = [] if use_hf and ner_manager is not None and ner_manager.is_loaded(): # Détecter le type de manager et appeler la bonne fonction if EdsPseudoManager is not None and isinstance(ner_manager, EdsPseudoManager): final_text, hf_hits = apply_eds_pseudo_on_narrative(final_text, cfg, ner_manager) else: final_text, hf_hits = apply_hf_ner_on_narrative(final_text, cfg, ner_manager, ner_thresholds) anon.audit.extend(hf_hits) # 3) Rescan selectif final_text = selective_rescan(final_text, cfg=cfg) # 3b) Nettoyage post-masquage : codes postaux orphelins (5 chiffres collés à un placeholder) # et téléphones fragmentés sur plusieurs lignes _re_cp_orphan = re.compile(r"(\[(?:ADRESSE|NOM|VILLE)\])\s*(\d{5})\b") def _clean_cp_orphan(m): anon.audit.append(PiiHit(-1, "CODE_POSTAL", m.group(2), PLACEHOLDERS["CODE_POSTAL"])) return m.group(1) + PLACEHOLDERS["CODE_POSTAL"] final_text = _re_cp_orphan.sub(_clean_cp_orphan, final_text) # Téléphones fragmentés : "0X XX XX XX\nXX" coupé en fin de ligne (ligne suivante immédiate) _re_tel_frag = re.compile(r"((?:\+33\s?|0)\d(?:[ .-]?\d){6,7})\s*\n\s*(\d{2}(?!\d))") def _clean_tel_frag(m): full = m.group(1).replace(" ", "").replace(".", "").replace("-", "") + m.group(2) if len(full.replace("+33", "0")) == 10: anon.audit.append(PiiHit(-1, "TEL", m.group(0).strip(), PLACEHOLDERS["TEL"])) return PLACEHOLDERS["TEL"] + "\n" return m.group(0) final_text = _re_tel_frag.sub(_clean_tel_frag, final_text) # Téléphones incomplets en fin de ligne (8 ou 9 chiffres au format 0X XX XX XX) : masquer la partie visible _re_tel_partial = re.compile(r"(?=5 chars) car le texte peut les scinder sur des lignes séparées _compound = {t for t in _global_name_tokens if "-" in t} _parts_to_drop = set() for comp in _compound: for part in comp.split("-"): part = part.strip() if len(part) >= 2 and len(part) < 5 and part in _global_name_tokens: _parts_to_drop.add(part) _global_name_tokens -= _parts_to_drop # 4a-ter) Filtrage final des tokens globaux : rejeter les mots qui ne ressemblent pas à des noms propres # - Mots courants français (minuscule initiale déjà filtrés en amont) # - ALL-CAPS <= 4 chars confirmés par une seule source seulement _nom_kind_counts: Dict[str, set] = {} for h in anon.audit: if h.kind in _nom_kinds: for word in h.original.split(): word = word.strip(" .-'") if word: _nom_kind_counts.setdefault(word, set()).add(h.kind) _filtered_global: set = set() for token in _global_name_tokens: # ALL-CAPS court (<=4) avec une seule source → probablement une abréviation if token.isupper() and len(token) <= 4 and len(_nom_kind_counts.get(token, set())) < 2: continue _filtered_global.add(token) _global_name_tokens = _filtered_global for token in _global_name_tokens: anon.audit.append(PiiHit(page=-1, kind="NOM_GLOBAL", original=token, placeholder=PLACEHOLDERS["NOM"])) # 4b) TEL, EMAIL, ADRESSE, CODE_POSTAL : propager les valeurs uniques sur toutes les pages _global_pii: Dict[str, set] = {} for h in anon.audit: if h.kind in {"TEL", "EMAIL", "ADRESSE", "CODE_POSTAL", "EPISODE", "RPPS", "VILLE", "ETAB", "VLM_SERVICE", "VLM_ETAB", "DATE_NAISSANCE", "force_term", "force_regex"}: _global_pii.setdefault(h.kind, set()).add(h.original.strip()) for kind, values in _global_pii.items(): placeholder = PLACEHOLDERS.get(kind, PLACEHOLDERS["MASK"]) for val in values: anon.audit.append(PiiHit(page=-1, kind=f"{kind}_GLOBAL", original=val, placeholder=placeholder)) # 4e) Appliquer les tokens globaux sur le texte pseudonymisé _GLOBAL_SKIP_KINDS = {"EDS_DATE_GLOBAL"} for h in anon.audit: if h.page != -1: continue if not (h.kind == "NOM_GLOBAL" or h.kind.endswith("_GLOBAL")): continue if h.kind in _GLOBAL_SKIP_KINDS: continue token = h.original.strip() if not token or len(token) < 3: continue # Garde trackare : NOM_GLOBAL très court (<=3) risque de masquer des codes diagnostics if anon.is_trackare and h.kind == "NOM_GLOBAL" and len(token) <= 3: continue try: pat = re.escape(token) # Noms composés : tolérer les sauts de ligne/espaces autour du tiret if "-" in token: pat = pat.replace(r"\-", r"\-\s*") final_text = re.sub(rf"\b{pat}\b", h.placeholder, final_text) except re.error: final_text = final_text.replace(token, h.placeholder) # Log OCR dans l'audit if ocr_used: anon.audit.insert(0, PiiHit(page=-1, kind="OCR_USED", original="docTR", placeholder="")) # Sauvegardes base = pdf_path.stem txt_path = out_dir / f"{base}.pseudonymise.txt" audit_path = out_dir / f"{base}.audit.jsonl" txt_path.write_text(final_text, encoding="utf-8") with audit_path.open("w", encoding="utf-8") as f: for hit in anon.audit: f.write(json.dumps(hit.__dict__, ensure_ascii=False) + "\n") outputs = {"text": str(txt_path), "audit": str(audit_path)} # PDFs if make_vector_redaction and fitz is not None: vec_path = out_dir / f"{base}.redacted_vector.pdf" try: redact_pdf_vector(pdf_path, anon.audit, vec_path, ocr_word_map=ocr_word_map) outputs["pdf_vector"] = str(vec_path) except Exception: pass if also_make_raster_burn and fitz is not None: ras_path = out_dir / f"{base}.redacted_raster.pdf" redact_pdf_raster(pdf_path, anon.audit, ras_path, ogc_label=ogc_label, ocr_word_map=ocr_word_map) outputs["pdf_raster"] = str(ras_path) return outputs def process_pdfs_batch( pdf_paths: List[Path], out_dir: Path, max_workers: int = None, **kwargs, ) -> List[Dict[str, str]]: """Traite plusieurs PDFs en parallèle (ProcessPoolExecutor). Ne fonctionne que quand ner_manager=None (les modèles NER ne sont pas picklables). Quand NER est actif, les PDFs restent séquentiels mais bénéficient de la parallélisation page-level de redact_pdf_raster(). """ if not pdf_paths: return [] if max_workers is None: max_workers = min(len(pdf_paths), os.cpu_count() or 4) out_dir.mkdir(parents=True, exist_ok=True) def _one(pdf_path): return process_pdf(pdf_path, out_dir, **kwargs) with ProcessPoolExecutor(max_workers=max_workers) as pool: return list(pool.map(_one, pdf_paths)) if __name__ == "__main__": import argparse ap = argparse.ArgumentParser(description="Anonymiser PDF (regex + NER ONNX optionnel)") ap.add_argument("pdf", type=str) ap.add_argument("--out", type=str, default="out") ap.add_argument("--no-vector", action="store_true") ap.add_argument("--raster", action="store_true") ap.add_argument("--config", type=str, default=str(Path("config/dictionnaires.yml"))) ap.add_argument("--hf", action="store_true", help="Activer NER ONNX sur narratif (nécessite ner_manager_onnx)") ap.add_argument("--model", type=str, default="cmarkea/distilcamembert-base-ner") args = ap.parse_args() manager = None if args.hf and NerModelManager is not None: manager = NerModelManager(cache_dir=Path("models")) manager.load(args.model) outs = process_pdf( Path(args.pdf), Path(args.out), make_vector_redaction=not args.no_vector, also_make_raster_burn=args.raster, config_path=Path(args.config), use_hf=bool(args.hf), ner_manager=manager, ner_thresholds=NerThresholds() if NerThresholds else None, ) print(json.dumps(outs, indent=2, ensure_ascii=False))