#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Core d'anonymisation (v2.1) + NER ONNX (optionnel, narratif uniquement) ------------------------------------------------------------------------ - Extraction 2 passes (pdfplumber -> pdfminer) + fallback 3e passe PyMuPDF si texte pauvre ou (cid:xx) - Règles regex (PII critiques) + clé:valeur (masquer valeur seulement) + overrides YAML - Rescan sécurité **sélectif** (EMAIL/TEL/IBAN/NIR), jamais dans [TABLES] - Redaction PDF (vector/raster) via PyMuPDF - NER ONNX **optionnel** (CamemBERT family) appliqué **après** les règles, sur le narratif Dépendances : pdfplumber, pdfminer.six, pillow, pymupdf, pyyaml (optionnel), transformers, optimum, onnxruntime """ from __future__ import annotations import io import json import re from dataclasses import dataclass, field from pathlib import Path from typing import List, Dict, Tuple, Optional, Any import pdfplumber from pdfminer.high_level import extract_text as pdfminer_extract_text from pdfminer.layout import LAParams from PIL import Image, ImageDraw try: import fitz # PyMuPDF except Exception: fitz = None try: import yaml # PyYAML for dictionaries except Exception: yaml = None try: from doctr.models import ocr_predictor as _doctr_ocr_predictor _DOCTR_AVAILABLE = True except Exception: _doctr_ocr_predictor = None # type: ignore _DOCTR_AVAILABLE = False # NER manager (facultatif) try: from ner_manager_onnx import NerModelManager, NerThresholds except Exception: NerModelManager = None # type: ignore NerThresholds = None # type: ignore # EDS-Pseudo manager (facultatif) try: from eds_pseudo_manager import EdsPseudoManager except Exception: EdsPseudoManager = None # type: ignore def _load_edsnlp_drug_names() -> set: """Charge les noms de médicaments mono-mot depuis edsnlp/resources/drugs.json. Retourne un set lowercase. Fallback silencieux si edsnlp absent.""" try: import edsnlp as _edsnlp drugs_path = _edsnlp.BASE_DIR / "resources" / "drugs.json" if not drugs_path.exists(): return set() import json as _json data = _json.loads(drugs_path.read_text(encoding="utf-8")) result = set() for _code, names in data.items(): for name in names: if " " not in name and len(name) >= 4: result.add(name.lower()) return result except Exception: return set() # ----------------- Defaults & Config ----------------- DEFAULTS_CFG = { "version": 1, "encoding": "utf-8", "normalization": "NFKC", "whitelist": { "sections_titres": ["DIM", "GHM", "GHS", "RUM", "COMPTE", "RENDU", "DIAGNOSTIC"], "noms_maj_excepts": ["Médecin DIM", "Praticien conseil"], "org_gpe_keep": True, }, "blacklist": { "force_mask_terms": [], "force_mask_regex": [], }, "kv_labels_preserve": ["FINESS", "IPP", "N° OGC", "Etablissement"], "regex_overrides": [ { "name": "OGC_court", "pattern": r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,3})\b", "placeholder": "[OGC]", "flags": ["IGNORECASE"], } ], "flags": { "case_insensitive": True, "unicode_word_boundaries": True, "regex_engine": "python", }, } PLACEHOLDERS = { "EMAIL": "[EMAIL]", "TEL": "[TEL]", "IBAN": "[IBAN]", "NIR": "[NIR]", "IPP": "[IPP]", "FINESS": "[FINESS]", "OGC": "[OGC]", "NOM": "[NOM]", "VILLE": "[VILLE]", "ETAB": "[ETABLISSEMENT]", "MASK": "[MASK]", "DATE": "[DATE]", "DATE_NAISSANCE": "[DATE_NAISSANCE]", "ADRESSE": "[ADRESSE]", "CODE_POSTAL": "[CODE_POSTAL]", "AGE": "[AGE]", "DOSSIER": "[DOSSIER]", "NDA": "[NDA]", "EPISODE": "[EPISODE]", "RPPS": "[RPPS]", } CRITICAL_PII_KEYS = {"EMAIL", "TEL", "IBAN", "NIR", "IPP", "DATE_NAISSANCE"} # Baseline regex RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}") RE_TEL = re.compile(r"(? bool: """Vérifie la clé modulo 97 d'un NIR (13 chiffres + 2 clé). Supporte la Corse (2A/2B).""" digits_only = re.sub(r"\s+", "", nir_raw) if len(digits_only) < 15: return False body_str = digits_only[:13] key_str = digits_only[13:15] # Corse : 2A → 19, 2B → 18 (pour le calcul) body_str_calc = body_str.upper().replace("2A", "19").replace("2B", "18") try: body_int = int(body_str_calc) key_int = int(key_str) except ValueError: return False return key_int == (97 - (body_int % 97)) # Mots médicaux/techniques/courants qui ne sont pas des noms de personnes _MEDICAL_STOP_WORDS_SET = { # Mots français courants (déterminants, prépositions, adverbes, etc.) "pas", "mon", "bien", "ancien", "ancienne", "bon", "bonne", "tout", "tous", "mais", "donc", "car", "que", "qui", "avec", "dans", "pour", "sur", "par", "les", "des", "une", "est", "son", "ses", "nos", "aux", "cette", "ces", "cher", "chez", "entre", "sans", "sous", "vers", "selon", "après", "avant", "puis", "aussi", "très", "plus", "moins", "peu", "non", "oui", "quelques", "mise", "début", "fin", "suite", "fait", "lieu", "cas", "jour", "jours", "semaine", "semaines", "mois", "temps", "place", "nouvelle", "nouveau", "franche", "légère", "quelque", "depuis", "comme", "encore", "votre", "date", "note", "notes", "nom", "heure", "matin", "soir", "midi", "signé", "réalisé", "courrier", "cabinet", "rue", # Verbes / participes courants "remontée", "associée", "réalisée", "débuté", "prolongé", "prolongée", "prescrit", "prescrite", "présente", "présent", "absente", "absent", "reprise", "introduction", "arrêt", "relais", # Titres / rôles hospitaliers "chef", "assistant", "assistante", "praticien", "praticienne", "docteur", "professeur", "hospitalier", "hospitalière", "hospitaliers", "spécialiste", "contractuel", "contractuelle", "titulaire", "confrère", "consoeur", "coordonnateur", "coordonnatrice", "médecin", "médical", "infirmier", "infirmière", "praticiens", "patient", "patiente", # Structure hospitalière "service", "pôle", "clinique", "consultation", "secrétariat", "hôpital", "hôpitaux", "centre", "établissement", "polyclinique", # Villes / géographie (pas des noms de personnes) "bordeaux", "bayonne", "paris", "lyon", "lille", "marseille", "toulouse", "nantes", "montpellier", "pessac", "biarritz", "soustons", "basque", "basques", "sud", "côte", # Médicaments génériques et spécialités (DCI + noms commerciaux) "colchicine", "aspirine", "cortancyl", "bisoprolol", "entresto", "methotrexate", "eplerenone", "speciafoldine", "prednisone", "corticoïdes", "cortisone", "paracetamol", "metformine", "solupred", "novorapid", "abasaglar", "lovenox", "methylprednisolone", "potassium", "humalog", "furosemide", "insuline", "trulicity", "forxiga", "atorvastatine", "amlodipine", "ondansetron", "eliquis", "nebivolol", "gaviscon", "loxen", "morphine", "oxycodone", "kardegic", "tercian", "zopiclone", "seresta", "tramadol", "alprazolam", "forlax", "levothyrox", "bromazepam", "gliclazide", "zymad", "pravastatine", "spiriva", "quetiapine", "sertraline", "crestor", "lercanidipine", "amoxicilline", "opocalcium", "ferinject", "candesartan", "ceftriaxone", "calcidose", "laroxyl", "brintellix", "ketoprofene", "adrenaline", "exacyl", "terbutaline", "ipratropium", "actiskenan", "vialebex", "oxynormoro", "lansoprazole", "perindopril", "sodium", "velmetia", "doliprane", "dafalgan", "efferalgan", "spasfon", "vogalene", "augmentin", "inexium", "omeprazole", "pantoprazole", "esomeprazole", "ramipril", "lisinopril", "enalapril", "losartan", "valsartan", "irbesartan", "olmesartan", "telmisartan", "hydrochlorothiazide", "spironolactone", "furosemide", "lasilix", "aldactone", "tahor", "crestor", "rosuvastatine", "simvastatine", "fluvastatine", "xarelto", "pradaxa", "apixaban", "rivaroxaban", "dabigatran", "plavix", "clopidogrel", "ticagrelor", "brilique", "ventoline", "seretide", "symbicort", "salmeterol", "fluticasone", "salbutamol", "tiotropium", "budesonide", "beclometasone", "oxycodone", "oxynorm", "skenan", "actiskenan", "fentanyl", "nubain", "nalbuphine", "nefopam", "acupan", "profenid", "ibuprofene", "diclofenac", "naproxene", "celecoxib", "gabapentine", "pregabaline", "lyrica", "neurontin", "amitriptyline", "duloxetine", "venlafaxine", "fluoxetine", "paroxetine", "escitalopram", "citalopram", "mirtazapine", "olanzapine", "risperidone", "aripiprazole", "haloperidol", "loxapine", "cyamemazine", "diazepam", "oxazepam", "lorazepam", "clonazepam", "midazolam", "hydroxyzine", "atarax", "melatonine", "stilnox", "zolpidem", "imovane", "levothyroxine", "metformine", "glimepiride", "sitagliptine", "januvia", "jardiance", "empagliflozine", "dapagliflozine", "ozempic", "semaglutide", "dulaglutide", "liraglutide", "victoza", "heparine", "enoxaparine", "tinzaparine", "innohep", "warfarine", "coumadine", "fluindione", "previscan", "ciprofloxacine", "levofloxacine", "ofloxacine", "metronidazole", "vancomycine", "gentamicine", "tazocilline", "piperacilline", "meropenem", "imipenem", "clindamycine", "doxycycline", "azithromycine", "clarithromycine", "cotrimoxazole", "bactrim", "polyionique", "propranolol", "apidra", "solostar", # Suffixes laboratoires pharmaceutiques "arw", "myl", "myp", "arg", "teva", "bga", "agt", # Formes galéniques / voies d'administration "cpr", "sachet", "orale", "oral", "sol", "buv", "stylo", "flexpen", "flestouch", "kwikpen", "inj", "susp", "gelule", "comprime", "unidose", "perf", "inh", "seringue", "aerosol", "sach", "pdr", "orodisp", "capsule", "patch", "suppositoire", "gouttes", # Termes de prescription / pharmacie "prescription", "prescriptions", "dose", "fréquence", "statut", "technique", "capteur", "bandelettes", "glycemiques", "glycemique", "lancettes", "aiguilles", "fines", "micro", "pompe", "réserve", "glycemie", "capillaire", "hgt", # Termes médicaux / cliniques "myocardite", "myosite", "corticothérapie", "biopsie", "pathologie", "dysimmunitaire", "récidive", "récidivante", "traitement", "diagnostic", "antécédents", "examen", "bilan", "résultats", "analyse", "interne", "externe", "médecine", "chirurgie", "rhumatologie", "dermatologie", "immunologie", "cardiologie", "pneumologie", "neurologie", "gynécologie", "radiologie", "sénologie", "douleur", "douleurs", "douloureux", "musculaire", "musculaires", "thoracique", "thoraciques", "membres", "supérieurs", "inférieurs", "normale", "normaux", "habituelle", "habituelles", "synthèse", "hospitalisation", "syndrome", "vaccination", "ophtalmo", "pelvien", "diabétique", "sommeil", "régime", "diet", "desinfection", "environnement", "identification", "bracelet", "toilettes", "accompagner", "installer", "transfusion", "signes", "vitaux", "alimentaire", "avis", "zone", "calcémie", # Abréviations médicales "irm", "ett", "ecg", "mtx", "fevg", "bdc", "crp", "sfu", "hdj", "bnp", "asat", "alat", "cpk", "ctc", "hba", "hba1c", "saos", "tsh", "inr", "vgm", "pnn", "plq", "hb", "poc", "bax", "act", "bic", "cfx", "acc", "ado", "acf", "vfo", "qvl", "cci", "pse", "pca", "chl", "crt", "bbm", "pds", "ren", "vit", "zen", "scanner", "radio", "écho", "échographie", # Spécialités médicales (éviter faux positifs NOM) "hépato-gastro-entérologue", "gastro-entérologue", "gastro-entérologie", "proctologue", "oncologue", "anesthésiste", "pneumologue", "gérontologue", "cardiologue", "néphrologue", "urologue", "gériatre", "hépatologue", "endocrinologue", "stomatologue", # Termes médicaux / titres fréquemment détectés comme NOM par le NER "supplémentation", "supplementation", "endocrinologie", "monsieur", "madame", "suivi", "sortie", "emog", "ophtalmo", # Médicaments détectés comme NOM/PRENOM par EDS-Pseudo "eliquis", "trulicity", "saos", "wind", "taxotere", "eupantol", "ezetimibe", "lansoyl", "xatral", "xenetix", "trimbow", "buspirone", "cetirizine", "depakote", "versatis", "durogesic", "montelukast", "metformine", "viatris", "rosuvastatine", "gliclazide", "amlodipine", "perindopril", "nebivolol", "pravastatine", "bisoprolol", "amoxicilline", "kardegic", "lovenox", # Termes médicaux / soins / actes détectés comme NOM "partielle", "cutanee", "cutané", "cutanée", "osseuse", "diabetique", "diabétique", "transdermique", "transderm", "diarrhees", "diarrhées", "ionogramme", "scintigraphie", "thoraco", "thorax", "négative", "negative", "diététicienne", "pressurise", "pressuriser", "inhalee", "inhalée", "inhal", # Mots courants français détectés comme NOM dans les trackare "toilette", "repas", "poche", "installation", "education", "éducation", "refection", "réfection", "complete", "complète", "regime", "régime", "normal", "traité", "traite", "arrêté", "arrete", "volume", "commentaires", "france", "covid", "framboise", "epoux", "époux", # Abréviations médicales courtes (3-4 chars) détectées comme NOM "ide", "ipp", "pcr", "tap", "gel", "ahl", "ssr", "hds", "tca", "etp", "mcg", "sdz", "iao", "ser", "orod", "clav", "disp", "cart", "atcd", "mdrd", "amox", "endoc", "microg", "item", "pyélo", "néphro", # En-têtes de colonnes / mots structurels trackare "observations", "observation", "commentaires", "commentaire", "surveillance", "température", "temperature", "glycémie", "glycemie", "diurèse", "diurese", "balance", "pouls", "systolique", "diastolique", "saturation", "fréquence", "frequence", "respiratoire", "douleur", "alertes", "alerte", "antécédents", "antecedents", "habitus", "allergies", "prescriptions", "prescription", "administration", "catégorie", "categorie", "expiration", "message", "destination", "diagnostique", "diagnostiques", "date", "note", "nom", "heure", "type", "code", "etat", "comprime", "comprimé", "gelule", "gélule", "solution", "injectable", # Médicaments supplémentaires détectés dans les trackare "depakote", "versatis", "humalog", "forxiga", "durogesic", "montelukast", "rosuvastatine", # Abréviations pharma courtes "cpr", "sol", "bic", "agt", "poche", "inhal", "regina", # Faux positifs EDS supplémentaires "psy", "inhales", "inhalés", "kwikpen", "lansoprazole", "tiorfan", "smecta", "axa", "ttt", "anionique", "abdomino", "cod", "omi", "urg", "med", "10mg", "20mg", "40mg", "100mg", "300ui", "500ml", "innohep", "coaprovel", "actiskenan", "simvastatine", "forlax", # Mots temporels / contextuels détectés comme EDS_HOPITAL "semaine", "jour", "matin", "soir", "nuit", "midi", # Mots clés de contexte document "compétences", "maladies", "inflammatoires", "systémiques", "rares", "fret", "fax", "contexte", "résultat", "resultat", "résultats", "resultats", "haute", "maison", "aide", "rpps", "poste", "fonct", "sante", "santé", "etxe", "ttipi", "gastro", "concha", "endoscopie", "endoscopique", "fibroscopie", "indication", "conclusion", "technique", "anesthésie", "digestif", "digestive", "digestives", "nutritive", } # Enrichissement automatique avec les ~4000 noms de médicaments d'edsnlp _MEDICAL_STOP_WORDS_SET.update(_load_edsnlp_drug_names()) _MEDICAL_STOP_WORDS = ( r"(?:" + "|".join(re.escape(w) for w in _MEDICAL_STOP_WORDS_SET) + r")" ) # Un token de nom : commence par majuscule, lettres/tirets/apostrophes (PAS d'espace ni de point) _PERSON_TOKEN = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+" RE_PERSON_CONTEXT = re.compile( r"(?:(?:Dr\.?|DR\.?|Docteur|Pr\.?|Professeur|Mme|MME|Madame|M\.|Mr\.?|Monsieur" r"|Nom\s*:\s*" r"|Rédigé\s+par|Validé\s+par|Signé\s+par|Saisi\s+par|Réalisé\s+par" r")\s+)" rf"({_PERSON_TOKEN}(?:\s+{_PERSON_TOKEN}){{0,2}})" # Max 3 mots ) # Noms en MAJUSCULES dans des listes virgulées (ex: "le Dr X, Y, LAZARO") RE_DR_COMMA_LIST = re.compile( r"(?:Dr\.?|DR\.?|Docteur)\s+" r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' .]+" r"(?:\s*,\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' .]+)+", re.IGNORECASE, ) # Token nom : mot commençant par une majuscule d'au moins 3 lettres _NAME_TOKEN_RE = re.compile(r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']{2,}") SPLITTER = re.compile(r"\s*[:|;\t]\s*") # --- Extraction globale de noms depuis champs structurés --- _UC_NAME_TOKEN = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+" RE_EXTRACT_PATIENT = re.compile( r"Patient\(?e?\)?\s*:\s*" rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)" r"(?=\s+Né|\s+né|\s+N°|\s*$)", re.MULTILINE, ) # Champs d'identité structurés (documents trackare / DPI) RE_EXTRACT_NOM_NAISSANCE = re.compile( r"Nom\s+de\s+naissance\s*:\s*" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s+IPP|\s*$)", re.MULTILINE, ) RE_EXTRACT_NOM_PRENOM = re.compile( r"Nom\s+et\s+Pr[ée]nom\s*:\s*" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s+Date|\s+Né|\s*$)", re.MULTILINE, ) RE_EXTRACT_LIEU_NAISSANCE = re.compile( r"Lieu\s+de\s+naissance\s*:\s*" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s*$)", re.MULTILINE, ) RE_EXTRACT_VILLE_RESIDENCE = re.compile( r"Ville\s+de\s+r[ée]sidence\s*:\s*" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s*$)", re.MULTILINE, ) # Contacts structurés : Conjoint/Concubin/Epoux/Epouse/Parent + NOM PRENOM RE_EXTRACT_CONTACT = re.compile( r"(?:Conjoint|Concubin|Epoux|Epouse|Parent|Père|Mère|Fils|Fille|Frère|Soeur|Tuteur)\s+" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+)" r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+))?", ) RE_EXTRACT_REDIGE = re.compile( r"(?:Rédigé|Validé|Signé|Saisi)\s+par\s+" rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)", ) # Token nom composé : JEAN-PIERRE, CAZELLES-BOUDIER, etc. _UC_COMPOUND = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,}(?:-[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})*" RE_EXTRACT_MME_MR = re.compile( r"(?:MME|Mme|Madame|Monsieur|Mr?\.?)\s+" r"(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*(?:-?\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*)?)?" rf"((?:{_UC_COMPOUND})(?:\s+(?:{_UC_COMPOUND}))*)", ) _INITIAL_OPT = r"(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*(?:-?\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*)?)?" RE_EXTRACT_DR_DEST = re.compile( r"(?:DR\.?|Dr\.?|Docteur)\s+" + _INITIAL_OPT + rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)", ) # Noms du personnel médical après un rôle : "Aide : Marie-Paule BORDABERRY" RE_EXTRACT_STAFF_ROLE = re.compile( r"(?:Aide|Infirmière?|IDE|IADE|IBODE|ASH?|Cadre\s+Infirmier" r"|Prescripteur|Prescrit\s+par|Exécut[ée]\s+par|Réalisé\s+par)\s*:?\s*" r"((?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][a-zéèàùâêîôûäëïöüç]+(?:\s*-\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][a-zéèàùâêîôûäëïöüç]+)?\s+)?" r"(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,}[\-]?)(?:[\s\-]+[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})*)", ) # "Pr DUVAL", "Pr. J.-M. DUVAL", "Professeur DUVAL" RE_EXTRACT_PR = re.compile( r"(?:Pr\.?|Professeur)\s+" + _INITIAL_OPT + rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)", ) CID_PATTERN = re.compile(r"\(cid:\d+\)") # --- Nouvelles regex : dates, adresses, âges, dossiers --- _MOIS_FR = r"(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)" RE_DATE_NAISSANCE = re.compile( r"(?:n[ée]+\s+le|date\s+de\s+naissance|DDN)\s*[:\-]?\s*" r"(\d{1,2}[\s/.\-]\d{1,2}[\s/.\-]\d{2,4}|\d{1,2}\s+" + _MOIS_FR + r"\s+\d{4})", re.IGNORECASE, ) RE_DATE = re.compile( r"\b(\d{1,2})\s*[/.\-]\s*(\d{1,2})\s*[/.\-]\s*(\d{4})\b" r"|" r"\b(\d{1,2})\s+" + _MOIS_FR + r"\s+(\d{4})\b", re.IGNORECASE, ) RE_ADRESSE = re.compile( r"\b\d{1,4}[\s,]*(?:bis|ter)?\s*,?\s*" r"(?:rue|avenue|av\.?|boulevard|bd\.?|place|chemin|all[ée]e|impasse|route|cours|passage|square|r[ée]sidence)" r"\s+[A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\s\-']{2,}", re.IGNORECASE, ) RE_CODE_POSTAL = re.compile( r"(?:(?:[Cc]ode\s*[Pp]ostal|CP)\s*[:\-]?\s*(\d{5}))" r"|" # 5 chiffres + nom de ville (Title Case ou MAJUSCULES), pas précédé d'un chiffre (évite RPPS) # Exclure les unités médicales (UI, mg, ml, etc.) via negative lookahead r"(?:(? Dict[str, Any]: cfg = DEFAULTS_CFG.copy() if config_path and config_path.exists() and yaml is not None: try: user = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} for k, v in user.items(): cfg[k] = v except Exception: pass return cfg # ----------------- Extraction ----------------- def extract_text_with_fallback_ocr(pdf_path: Path) -> Tuple[List[str], List[List[str]], bool]: """Extraction texte multi-passes avec fallback OCR (docTR). Retourne (pages_text, tables_lines, ocr_used). """ pages_text: List[str] = [] tables_lines: List[List[str]] = [] ocr_used = False with pdfplumber.open(pdf_path) as pdf: for p in pdf.pages: t = p.extract_text(x_tolerance=2.5, y_tolerance=4.0) or "" pages_text.append(t) rows: List[str] = [] try: tables = p.extract_tables() for tbl in tables or []: for row in tbl: clean = [c if c is not None else "" for c in row] rows.append("\t".join(clean).strip()) except Exception: pass tables_lines.append(rows) total_chars = sum(len(x or "") for x in pages_text) need_fallback = total_chars < 500 if not need_fallback: need_fallback = any(CID_PATTERN.search(x or "") for x in pages_text) if need_fallback: text_all = pdfminer_extract_text( str(pdf_path), laparams=LAParams(char_margin=2.0, word_margin=0.1, line_margin=0.8, boxes_flow=0.5), ) split = [x for x in text_all.split("\f") if x] if split: pages_text = split # 3e passe PyMuPDF si toujours pauvre/cid total_chars = sum(len(x or "") for x in pages_text) if (total_chars < 500 or any(CID_PATTERN.search(x or "") for x in pages_text)) and fitz is not None: try: doc = fitz.open(str(pdf_path)) pages_text = [doc[i].get_text("text") or "" for i in range(len(doc))] doc.close() except Exception: pass # 4e passe : OCR docTR si toujours très peu de texte (PDF scanné) total_chars = sum(len(x or "") for x in pages_text) if total_chars < 200 and _DOCTR_AVAILABLE and fitz is not None: try: model = _doctr_ocr_predictor(det_arch="db_resnet50", reco_arch="crnn_vgg16_bn", pretrained=True) doc = fitz.open(str(pdf_path)) ocr_pages: List[str] = [] for i in range(len(doc)): pix = doc[i].get_pixmap(dpi=300) img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) import numpy as np result = model([np.array(img)]) page_text = "" for block in result.pages[0].blocks: for line in block.lines: words = [w.value for w in line.words] page_text += " ".join(words) + "\n" ocr_pages.append(page_text) doc.close() if sum(len(p) for p in ocr_pages) > total_chars: pages_text = ocr_pages ocr_used = True except Exception: pass return pages_text, tables_lines, ocr_used # Alias pour compatibilité ascendante def extract_text_three_passes(pdf_path: Path): pages_text, tables_lines, _ = extract_text_with_fallback_ocr(pdf_path) return pages_text, tables_lines # ----------------- Helpers ----------------- def _compile_user_regex(pattern: str, flags_list: List[str]): flags = 0 for f in flags_list or []: u = f.upper() if u == "IGNORECASE": flags |= re.IGNORECASE if u == "MULTILINE": flags |= re.MULTILINE if u == "DOTALL": flags |= re.DOTALL return re.compile(pattern, flags) def _apply_overrides(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str: for ov in cfg.get("regex_overrides", []) or []: pattern = ov.get("pattern"); placeholder = ov.get("placeholder", PLACEHOLDERS["MASK"]) ; name = ov.get("name", "override") flags_list = ov.get("flags", []) try: rx = _compile_user_regex(pattern, flags_list) except Exception: continue def _rep(m: re.Match): audit.append(PiiHit(page_idx, name, m.group(0), placeholder)) return placeholder line = rx.sub(_rep, line) # force-mask literals for term in (cfg.get("blacklist", {}).get("force_mask_terms", []) or []): if not term: continue word_rx = re.compile(rf"\b{re.escape(term)}\b", re.IGNORECASE) if word_rx.search(line): audit.append(PiiHit(page_idx, "force_term", term, PLACEHOLDERS["MASK"])) line = word_rx.sub(PLACEHOLDERS["MASK"], line) # force-mask regex for pat in (cfg.get("blacklist", {}).get("force_mask_regex", []) or []): try: rx = re.compile(pat, re.IGNORECASE) except Exception: continue if rx.search(line): audit.append(PiiHit(page_idx, "force_regex", pat, PLACEHOLDERS["MASK"])) line = rx.sub(PLACEHOLDERS["MASK"], line) return line def _mask_admin_label(line: str, audit: List[PiiHit], page_idx: int) -> str: m = RE_FINESS.search(line) if m: val = m.group(1); audit.append(PiiHit(page_idx, "FINESS", val, PLACEHOLDERS["FINESS"])) return RE_FINESS.sub(lambda _: f"FINESS : {PLACEHOLDERS['FINESS']}", line) m = RE_OGC.search(line) if m: val = m.group(1); audit.append(PiiHit(page_idx, "OGC", val, PLACEHOLDERS["OGC"])) return RE_OGC.sub(lambda _: f"N° OGC : {PLACEHOLDERS['OGC']}", line) m = RE_IPP.search(line) if m: val = m.group(1); audit.append(PiiHit(page_idx, "IPP", val, PLACEHOLDERS["IPP"])) return RE_IPP.sub(lambda _: f"IPP : {PLACEHOLDERS['IPP']}", line) m = RE_RPPS.search(line) if m: val = m.group(1); audit.append(PiiHit(page_idx, "RPPS", val, PLACEHOLDERS["RPPS"])) return RE_RPPS.sub(lambda _: f"RPPS : {PLACEHOLDERS['RPPS']}", line) return line def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str: # user overrides & force-masks d'abord line = _apply_overrides(line, audit, page_idx, cfg) # EMAIL def _repl_email(m: re.Match) -> str: audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"])) return PLACEHOLDERS["EMAIL"] line = RE_EMAIL.sub(_repl_email, line) # TEL def _repl_tel(m: re.Match) -> str: audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"])) return PLACEHOLDERS["TEL"] line = RE_TEL.sub(_repl_tel, line) # IBAN def _repl_iban(m: re.Match) -> str: audit.append(PiiHit(page_idx, "IBAN", m.group(0), PLACEHOLDERS["IBAN"])) return PLACEHOLDERS["IBAN"] line = RE_IBAN.sub(_repl_iban, line) # NIR (avec validation clé modulo 97) def _repl_nir(m: re.Match) -> str: raw = m.group(0) if not validate_nir(raw): return raw # faux positif, on ne masque pas audit.append(PiiHit(page_idx, "NIR", raw, PLACEHOLDERS["NIR"])) return PLACEHOLDERS["NIR"] line = RE_NIR.sub(_repl_nir, line) # DATE_NAISSANCE (plus spécifique, avant DATE générique) def _repl_date_naissance(m: re.Match) -> str: audit.append(PiiHit(page_idx, "DATE_NAISSANCE", m.group(0), PLACEHOLDERS["DATE_NAISSANCE"])) return PLACEHOLDERS["DATE_NAISSANCE"] line = RE_DATE_NAISSANCE.sub(_repl_date_naissance, line) # DATE générique — désactivé : seules les dates de naissance sont masquées # def _repl_date(m: re.Match) -> str: # audit.append(PiiHit(page_idx, "DATE", m.group(0), PLACEHOLDERS["DATE"])) # return PLACEHOLDERS["DATE"] # line = RE_DATE.sub(_repl_date, line) # ADRESSE def _repl_adresse(m: re.Match) -> str: audit.append(PiiHit(page_idx, "ADRESSE", m.group(0), PLACEHOLDERS["ADRESSE"])) return PLACEHOLDERS["ADRESSE"] line = RE_ADRESSE.sub(_repl_adresse, line) # BOITE POSTALE (BP) def _repl_bp(m: re.Match) -> str: audit.append(PiiHit(page_idx, "ADRESSE", m.group(0), PLACEHOLDERS["ADRESSE"])) return PLACEHOLDERS["ADRESSE"] line = RE_BP.sub(_repl_bp, line) # CODE_POSTAL def _repl_code_postal(m: re.Match) -> str: audit.append(PiiHit(page_idx, "CODE_POSTAL", m.group(0), PLACEHOLDERS["CODE_POSTAL"])) return PLACEHOLDERS["CODE_POSTAL"] line = RE_CODE_POSTAL.sub(_repl_code_postal, line) # AGE def _repl_age(m: re.Match) -> str: audit.append(PiiHit(page_idx, "AGE", m.group(0), PLACEHOLDERS["AGE"])) return PLACEHOLDERS["AGE"] line = RE_AGE.sub(_repl_age, line) # NUMERO DOSSIER / NDA def _repl_dossier(m: re.Match) -> str: audit.append(PiiHit(page_idx, "DOSSIER", m.group(0), PLACEHOLDERS["DOSSIER"])) return PLACEHOLDERS["DOSSIER"] line = RE_NUMERO_DOSSIER.sub(_repl_dossier, line) # N° EPISODE def _repl_episode(m: re.Match) -> str: audit.append(PiiHit(page_idx, "EPISODE", m.group(0), PLACEHOLDERS["EPISODE"])) return PLACEHOLDERS["EPISODE"] line = RE_EPISODE.sub(_repl_episode, line) # Établissements de santé (EHPAD Bayonne, SSR La Concha, Hôpital de Bayonne, etc.) def _repl_etab(m: re.Match) -> str: audit.append(PiiHit(page_idx, "ETAB", m.group(0), PLACEHOLDERS["ETAB"])) return PLACEHOLDERS["ETAB"] line = RE_ETABLISSEMENT.sub(_repl_etab, line) line = RE_HOPITAL_VILLE.sub(_repl_etab, line) # Champs structurés : Lieu de naissance, Ville de résidence (masquage direct, sans filtre stop words) _re_lieu = re.compile(r"(Lieu\s+de\s+naissance\s*:\s*)([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+)") def _repl_lieu(m: re.Match) -> str: audit.append(PiiHit(page_idx, "VILLE", m.group(2).strip(), PLACEHOLDERS["VILLE"])) return m.group(1) + PLACEHOLDERS["VILLE"] line = _re_lieu.sub(_repl_lieu, line) _re_ville_res = re.compile(r"(Ville\s+de\s+r[ée]sidence\s*:\s*)([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+)") def _repl_ville_res(m: re.Match) -> str: audit.append(PiiHit(page_idx, "VILLE", m.group(2).strip(), PLACEHOLDERS["VILLE"])) return m.group(1) + PLACEHOLDERS["VILLE"] line = _re_ville_res.sub(_repl_ville_res, line) # PERSON uppercase avec contexte, whitelist/acronymes courts wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or []) wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or []) _stop_rx = re.compile(_MEDICAL_STOP_WORDS, re.IGNORECASE) def _clean_name_span(span: str) -> str: """Tronque le span au premier mot médical/stop word.""" tokens = span.split() clean = [] for t in tokens: if _stop_rx.fullmatch(t): break clean.append(t) return " ".join(clean).strip(" .-'") def _repl_person_ctx(m: re.Match) -> str: span = m.group(1).strip(); raw = m.group(0) if span in wl_sections or raw in wl_phrases: return raw # Tronquer avant les mots médicaux cleaned = _clean_name_span(span) if not cleaned: return raw tokens = [t for t in cleaned.split() if t] if len(tokens) == 1 and len(tokens[0]) <= 3: return raw audit.append(PiiHit(page_idx, "NOM", cleaned, PLACEHOLDERS["NOM"])) return raw.replace(cleaned, PLACEHOLDERS["NOM"]) line = RE_PERSON_CONTEXT.sub(_repl_person_ctx, line) # Passe supplémentaire : noms dans des listes virgulées après "Dr" # ex: "le Dr DUVAL, MACHELART, LAZARO" → masquer chaque nom for m in RE_DR_COMMA_LIST.finditer(line): fragment = m.group(0) # Extraire les segments séparés par des virgules (sauf le premier qui inclut "Dr") parts = [p.strip() for p in fragment.split(",")] for part in parts: # Extraire les tokens nom de chaque segment for tok in _NAME_TOKEN_RE.findall(part): if tok in wl_sections or len(tok) <= 2: continue if _stop_rx.fullmatch(tok): continue if tok not in line: continue # Vérifier qu'il n'est pas déjà masqué if f"[{tok}]" in line or tok in {v for v in PLACEHOLDERS.values()}: continue audit.append(PiiHit(page_idx, "NOM", tok, PLACEHOLDERS["NOM"])) line = re.sub(rf"\b{re.escape(tok)}\b", PLACEHOLDERS["NOM"], line) return line def _mask_critical_in_key(key: str, audit: List[PiiHit], page_idx: int) -> str: """Masque les TEL et EMAIL même dans la partie 'clé' d'une ligne clé:valeur.""" def _repl_tel(m: re.Match) -> str: audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"])) return PLACEHOLDERS["TEL"] key = RE_TEL.sub(_repl_tel, key) def _repl_email(m: re.Match) -> str: audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"])) return PLACEHOLDERS["EMAIL"] key = RE_EMAIL.sub(_repl_email, key) return key def _kv_value_only_mask(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str: line = _mask_admin_label(line, audit, page_idx) parts = SPLITTER.split(line, maxsplit=1) if len(parts) == 2: key, value = parts masked_key = _mask_critical_in_key(key, audit, page_idx) masked_val = _mask_line_by_regex(value, audit, page_idx, cfg) return f"{masked_key.strip()} : {masked_val.strip()}" else: return _mask_line_by_regex(line, audit, page_idx, cfg) # ----------------- Extraction globale de noms ----------------- def _is_trackare_document(text: str) -> bool: """Détecte si le document est un export Trackare/TrakCare (DPI structuré).""" markers = ["Détails des patients", "Nom de naissance", "Dossier Patient"] t = text[:3000].lower() return sum(1 for m in markers if m.lower() in t) >= 2 def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: """Parse les champs structurés d'un document Trackare pour extraire les PII. Retourne (name_tokens, pii_hits) avec les noms à masquer et les hits additionnels.""" names: set = set() hits: List[PiiHit] = [] def _add_name(s: str): for tok in s.split(): tok = tok.strip(" .-'(),") if len(tok) >= 2 and tok[0].isupper(): names.add(tok) # --- Identité patient --- # Nom de naissance: DIEGO (peut apparaître 2x : en-tête + récap tabulaire) for m in re.finditer(r"Nom\s+de\s+naissance\s*:\s*(.+?)(?:\s+IPP\b|\s*$)", full_text, re.MULTILINE): _add_name(m.group(1).strip()) # Nom et Prénom: DIEGO PATRICIA for m in re.finditer(r"Nom\s+et\s+Pr[ée]nom\s*:\s*(.+?)(?:\s+Date\s+de\s+naissance|\s*$)", full_text, re.MULTILINE): _add_name(m.group(1).strip()) # Lieu de naissance: BAYONNE → masquer comme VILLE for m in re.finditer(r"Lieu\s+de\s+naissance\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû\s\-']+?)(?:\s*$)", full_text, re.MULTILINE): val = m.group(1).strip() hits.append(PiiHit(-1, "VILLE", val, PLACEHOLDERS["VILLE"])) names.add(val) # Ville de résidence: TARNOS → masquer comme VILLE for m in re.finditer(r"Ville\s+de\s+r[ée]sidence\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû\s\-']+?)(?:\s*$)", full_text, re.MULTILINE): val = m.group(1).strip() hits.append(PiiHit(-1, "VILLE", val, PLACEHOLDERS["VILLE"])) names.add(val) # Code Postal (toutes occurrences) for m in re.finditer(r"[Cc]ode\s*[Pp]ostal\s*:\s*(\d{5})", full_text): hits.append(PiiHit(-1, "CODE_POSTAL", m.group(1), PLACEHOLDERS["CODE_POSTAL"])) # N° épisode (= NDA, identifiant de séjour) for m in re.finditer(r"Episode\s*N[o°.]?\s*\.?\s*:\s*(\d{5,})", full_text): hits.append(PiiHit(-1, "EPISODE", m.group(1), PLACEHOLDERS.get("NDA", "[NDA]"))) # Adresse patient (toutes les occurrences) for m in re.finditer(r"Adresse\s*:\s*(.+?)(?:\s+Ville\s+de\s+r[ée]sidence|\s*$)", full_text, re.MULTILINE): val = m.group(1).strip() if len(val) > 3: hits.append(PiiHit(-1, "ADRESSE", val, PLACEHOLDERS["ADRESSE"])) # --- Pied de page : "Patient : NOM PRENOM - Date de naissance..." --- for m in re.finditer(r"Patient\s*:\s*(.+?)\s*-\s*Date\s+de\s+naissance", full_text): _add_name(m.group(1).strip()) # --- Médecin courant (toutes occurrences) --- for m in re.finditer(r"Médecin\s+courant\s*:\s*(?:DR\.?\s*)?(.+?)(?:\s*$)", full_text, re.MULTILINE): _add_name(m.group(1).strip()) # --- Médecin traitant (ligne après "Nom Adresse Téléphone") --- for m in re.finditer(r"Médecin\s+traitant\s*\n.*?Nom\s+Adresse\s+Téléphone\s*\n\s*(?:DR\.?\s*)?(.+?)(?:\d{5}|\s*$)", full_text, re.MULTILINE): _add_name(m.group(1).strip()) # --- Contacts structurés --- # Pattern: Relation NOM PRENOM [ADRESSE] [TEL] for m in re.finditer( r"(?:Conjoint|Concubin|Epoux|Epouse|Parent|Père|Mère|Fils|Fille|Frère|Soeur|Tuteur)\s+" r"([A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûä\-']+)" r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûä\-']+))?", full_text, ): _add_name(m.group(1)) if m.group(2): _add_name(m.group(2)) # --- Prescripteurs / Exécutants (trackare) --- for m in re.finditer( r"(?:Prescripteur|Prescrit\s+par|Exécut[ée]\s+par|Réalisé\s+par)\s*:?\s*" r"(?:(?:Dr|Pr)\.?\s+)?" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+)" r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-']+))?", full_text, ): _add_name(m.group(1)) if m.group(2): _add_name(m.group(2)) # --- Médecins urgences (IAO, prise en charge, décision) --- for m in re.finditer(r"IAO\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+)", full_text): _add_name(m.group(1)) for m in re.finditer( r"Médecin\s+de\s+la\s+(?:prise\s+en\s+charge|décision)\s+médicale\s+" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+)" r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+))?", full_text, ): _add_name(m.group(1)) if m.group(2): _add_name(m.group(2)) # --- Noms soignants dans les Notes d'évolution / Notes IDE / Notes médicales --- # Pattern: "Note IDE\nPrenom NOM" ou "Note d'évolution\nPrenom NOM" for m in re.finditer( r"Note\s+(?:IDE|AS|d'[ée]volution|m[ée]dicale|kin[ée])\s*\n\s*" r"([A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû][a-zéèàùâêîôûäëïöüç]+)\s+" r"([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\-]+)", full_text ): prenom, nom = m.group(1), m.group(2) if prenom.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(prenom) if nom.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(nom) # --- Noms soignants multi-lignes : "Prénom\nNOM" dans les tableaux de prescriptions/soins --- for m in re.finditer( r'\b([A-ZÉÈÀÙÂÊÎÔÛ][a-zéèàùâêîôûäëïöüç]{2,})\s*\n\s*([A-ZÉÈÀÙÂÊÎÔÛ]{3,})\b', full_text ): prenom, nom = m.group(1), m.group(2) if prenom.lower() not in _MEDICAL_STOP_WORDS_SET and nom.lower() not in _MEDICAL_STOP_WORDS_SET: _add_name(prenom) _add_name(nom) # Filtrer les tokens trop courts ou stop words (sauf noms de villes extraits explicitement) city_tokens = {h.original for h in hits if h.kind == "VILLE"} filtered = set() for tok in names: if tok in city_tokens: filtered.add(tok) continue if len(tok) < 3: continue if tok.lower() in _MEDICAL_STOP_WORDS_SET: continue filtered.add(tok) return filtered, hits def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set: """Pré-scan du document brut pour extraire les noms de personnes depuis les champs structurés (Patient, Rédigé par, etc.). Retourne un ensemble de tokens (mots) à masquer globalement.""" wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or []) wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or []) names: set = set() def _add_tokens(match_str: str): for token in match_str.split(): token = token.strip(" .-'") if len(token) < 3: continue if token.upper() in wl_sections or token in wl_phrases: continue if token.lower() in _MEDICAL_STOP_WORDS_SET: continue names.add(token) def _add_tokens_force_first(match_str): """Comme _add_tokens mais force le 1er token (contexte Dr/Mme fort).""" tokens = match_str.split() for i, token in enumerate(tokens): token = token.strip(" .-'") if len(token) < 2: continue if i == 0: # Premier token après Dr/Mme : toujours un nom, bypass stop words if token.upper() not in wl_sections: names.add(token) else: if len(token) < 3: continue if token.upper() in wl_sections or token in wl_phrases: continue if token.lower() in _MEDICAL_STOP_WORDS_SET: continue names.add(token) for m in RE_EXTRACT_PATIENT.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_REDIGE.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_MME_MR.finditer(full_text): _add_tokens_force_first(m.group(1)) for m in RE_EXTRACT_DR_DEST.finditer(full_text): _add_tokens_force_first(m.group(1)) # Champs d'identité structurés (trackare / DPI) for m in RE_EXTRACT_NOM_NAISSANCE.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_NOM_PRENOM.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_LIEU_NAISSANCE.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_VILLE_RESIDENCE.finditer(full_text): _add_tokens(m.group(1)) # Contacts structurés (conjoint, concubin, etc.) for m in RE_EXTRACT_CONTACT.finditer(full_text): _add_tokens(m.group(1)) if m.group(2): _add_tokens(m.group(2)) # Personnel médical avec rôle (Aide, Cadre Infirmier, Prescripteur, etc.) for m in RE_EXTRACT_STAFF_ROLE.finditer(full_text): _add_tokens(m.group(1)) # Pr / Professeur + nom(s) for m in RE_EXTRACT_PR.finditer(full_text): _add_tokens_force_first(m.group(1)) # Extraction des noms dans les listes virgulées après Dr/Docteur # ex: "le Dr DUVAL, MACHELART, CHARLANNE, LAZARO, il a été proposé" for m in RE_DR_COMMA_LIST.finditer(full_text): fragment = m.group(0) parts = [p.strip() for p in fragment.split(",")] for part in parts: for tok in _NAME_TOKEN_RE.findall(part): tok = tok.strip(" .-'") if len(tok) < 3: continue if tok.upper() in wl_sections or tok in wl_phrases: continue if tok.lower() in _MEDICAL_STOP_WORDS_SET: continue names.add(tok) # Retirer les sous-parties de noms composés avec tiret # Si "JEAN-PIERRE" est dans names, retirer "JEAN" et "PIERRE" individuels compound_names = {n for n in names if "-" in n} parts_to_remove = set() for compound in compound_names: for part in compound.split("-"): part = part.strip() if len(part) >= 2 and part in names: parts_to_remove.add(part) names -= parts_to_remove return names def _apply_extracted_names(text: str, names: set, audit: List[PiiHit]) -> str: """Remplace globalement chaque nom extrait dans le texte.""" placeholder = PLACEHOLDERS["NOM"] # Filtrer les stop words et tokens trop courts en dernière ligne de défense safe_names = {n for n in names if len(n) >= 3 and n.lower() not in _MEDICAL_STOP_WORDS_SET} for token in sorted(safe_names, key=len, reverse=True): pattern = re.compile(rf"\b{re.escape(token)}\b", re.IGNORECASE) new_text = [] last_end = 0 for m in pattern.finditer(text): # Ne pas remplacer si déjà dans un placeholder ctx_start = max(0, m.start() - 1) ctx_end = min(len(text), m.end() + 1) if "[" in text[ctx_start:m.start()] or "]" in text[m.end():ctx_end]: continue # Ne pas remplacer si le token fait partie d'un mot composé (tiret) if m.start() > 0 and text[m.start() - 1] == "-": continue if m.end() < len(text) and text[m.end()] == "-": continue audit.append(PiiHit(-1, "NOM_EXTRACTED", m.group(0), placeholder)) new_text.append(text[last_end:m.start()]) new_text.append(placeholder) last_end = m.end() new_text.append(text[last_end:]) text = "".join(new_text) return text # ----------------- Anonymisation (regex) ----------------- def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]], cfg: Dict[str, Any]) -> AnonResult: audit: List[PiiHit] = [] # Phase 0 : extraction globale des noms depuis les champs structurés full_raw = "\n".join(pages_text) + "\n" + "\n".join( "\n".join(rows) for rows in tables_lines ) extracted_names = _extract_document_names(full_raw, cfg) # Phase 0b : si document Trackare, extraction renforcée des PII structurés is_trackare = _is_trackare_document(full_raw) if is_trackare: trackare_names, trackare_hits = _extract_trackare_identity(full_raw) extracted_names.update(trackare_names) audit.extend(trackare_hits) # Phase 1 : masquage ligne par ligne (regex classiques) out_pages: List[str] = [] for i, page_txt in enumerate(pages_text): lines = [ln for ln in (page_txt or "").splitlines()] masked = [_kv_value_only_mask(ln, audit, i, cfg) for ln in lines] out_pages.append("\n".join(masked)) table_blocks: List[str] = [] for i, rows in enumerate(tables_lines): mbuf: List[str] = [] for r in rows: masked = _kv_value_only_mask(r, audit, i, cfg) mbuf.append(masked) if mbuf: table_blocks.append("\n".join(mbuf)) tables_block = "\n\n".join(table_blocks) text_out = "\f".join(out_pages) # séparateur de pages if tables_block.strip(): text_out += "\n\n[TABLES]\n" + tables_block + "\n[/TABLES]" # Phase 2 : application globale des noms extraits (rattrapage) if extracted_names: text_out = _apply_extracted_names(text_out, extracted_names, audit) return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit, is_trackare=is_trackare) # ----------------- NER ONNX sur narratif ----------------- def _mask_with_hf(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, Any], audit: List[PiiHit]) -> str: # remplace via regex sur les 'word' détectés (approche pragmatique) keep_org_gpe = bool((cfg.get("whitelist", {}) or {}).get("org_gpe_keep", True)) def repl_once(s: str, old: str, new: str) -> str: return re.sub(rf"\b{re.escape(old)}\b", new, s) out = text for e in ents: w = e.get("word") or ""; grp = (e.get("entity_group") or e.get("entity") or "").upper() if not w or "[" in w or "]" in w: # ignore placeholders continue if len(w) <= 2: # trop court continue if grp in {"PER", "PERSON"}: audit.append(PiiHit(-1, "NER_PER", w, PLACEHOLDERS["NOM"])) out = repl_once(out, w, PLACEHOLDERS["NOM"]) elif grp in {"ORG"}: if keep_org_gpe: continue audit.append(PiiHit(-1, "NER_ORG", w, PLACEHOLDERS["ETAB"])) out = repl_once(out, w, PLACEHOLDERS["ETAB"]) elif grp in {"LOC"}: if keep_org_gpe: continue audit.append(PiiHit(-1, "NER_LOC", w, PLACEHOLDERS["VILLE"])) out = repl_once(out, w, PLACEHOLDERS["VILLE"]) elif grp in {"DATE"}: # facultatif : si vous masquez déjà les dates via règles, laissez tel quel continue return out def apply_hf_ner_on_narrative(text_out: str, cfg: Dict[str, Any], manager: Optional[NerModelManager], thresholds: Optional[NerThresholds]) -> Tuple[str, List[PiiHit]]: if manager is None or not manager.is_loaded(): return text_out, [] # isoler [TABLES] pattern = re.compile(r"\[TABLES\](.*?)\[/TABLES\]", re.DOTALL) tables: List[Tuple[int,int,str]] = [] keep = [] last = 0 cleaned = "" for m in pattern.finditer(text_out): cleaned += text_out[last:m.start()] keep.append((len(cleaned), len(cleaned) + len(m.group(0)), m.group(0))) cleaned += "\x00" * len(m.group(0)) last = m.end() cleaned += text_out[last:] # par pages (séparées par \f) → par paragraphes pages = cleaned.split("\f") hits: List[PiiHit] = [] rebuilt_pages: List[str] = [] for pg in pages: paras = [p for p in re.split(r"\n\s*\n", pg) if p.strip()] ents_per_para = manager.infer_paragraphs(paras, thresholds=thresholds) # remplace entités idx = 0 buf = [] for para, ents in zip(paras, ents_per_para): masked = _mask_with_hf(para, ents, cfg, hits) buf.append(masked) rebuilt_pages.append("\n\n".join(buf)) rebuilt = "\f".join(rebuilt_pages) # réinsérer [TABLES] rebuilt_list = list(rebuilt) for start, end, payload in keep: rebuilt_list[start:end] = list(payload) final = "".join(rebuilt_list) return final, hits # ----------------- NER EDS-Pseudo sur narratif ----------------- def _mask_with_eds_pseudo(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, Any], audit: List[PiiHit]) -> str: """Masque les entités détectées par EDS-Pseudo en utilisant le mapping eds_mapped_key.""" def repl_once(s: str, old: str, new: str) -> str: return re.sub(rf"\b{re.escape(old)}\b", new, s) out = text for e in ents: w = e.get("word") or "" mapped_key = e.get("eds_mapped_key", "") if not w or "[" in w or "]" in w: continue if len(w) <= 2: continue # Filtrer les faux positifs NOM/PRENOM (médicaments, acronymes médicaux) label = e.get("entity_group", "EDS") if label in ("NOM", "PRENOM", "HOPITAL", "VILLE"): if w.lower() in _MEDICAL_STOP_WORDS_SET: continue # Filtrer aussi les tokens multi-mots dont un composant est un stop word if " " in w and any(part.lower() in _MEDICAL_STOP_WORDS_SET for part in w.split()): continue # Filtrer les dosages détectés comme noms (ex: "10MG", "300UI", "1 000") if re.match(r"^\d[\d\s]*(?:mg|MG|ml|ML|UI|µg|mcg|g|kg|%)?$", w.strip()): continue # Règles de validation heuristiques par type d'entité if label in ("NOM", "PRENOM"): # Rejeter si le contexte précédent (15 chars) contient un dosage pos = text.find(w) if pos > 0: ctx_before = text[max(0, pos - 15):pos] if re.search(r"\d+\s*(?:mg|UI|ml|µg|mcg)\b", ctx_before, re.IGNORECASE): continue elif label == "HOPITAL": _STRUCTURAL_WORDS = {"SERVICE", "POLE", "PÔLE", "UNITE", "UNITÉ", "SECTEUR"} if len(w) < 5: continue if w.upper() in _STRUCTURAL_WORDS: continue placeholder = PLACEHOLDERS.get(mapped_key, PLACEHOLDERS["MASK"]) audit.append(PiiHit(-1, f"EDS_{label}", w, placeholder)) out = repl_once(out, w, placeholder) return out def apply_eds_pseudo_on_narrative(text_out: str, cfg: Dict[str, Any], manager: "EdsPseudoManager") -> Tuple[str, List[PiiHit]]: """Applique EDS-Pseudo sur le narratif (même structure que apply_hf_ner_on_narrative).""" if manager is None or not manager.is_loaded(): return text_out, [] # isoler [TABLES] pattern = re.compile(r"\[TABLES\](.*?)\[/TABLES\]", re.DOTALL) keep = [] last = 0 cleaned = "" for m in pattern.finditer(text_out): cleaned += text_out[last:m.start()] keep.append((len(cleaned), len(cleaned) + len(m.group(0)), m.group(0))) cleaned += "\x00" * len(m.group(0)) last = m.end() cleaned += text_out[last:] # par pages → par paragraphes pages = cleaned.split("\f") hits: List[PiiHit] = [] rebuilt_pages: List[str] = [] for pg in pages: paras = [p for p in re.split(r"\n\s*\n", pg) if p.strip()] ents_per_para = manager.infer_paragraphs(paras) buf = [] for para, ents in zip(paras, ents_per_para): masked = _mask_with_eds_pseudo(para, ents, cfg, hits) buf.append(masked) rebuilt_pages.append("\n\n".join(buf)) rebuilt = "\f".join(rebuilt_pages) # réinsérer [TABLES] rebuilt_list = list(rebuilt) for start, end, payload in keep: rebuilt_list[start:end] = list(payload) final = "".join(rebuilt_list) return final, hits # ----------------- Selective safety rescan ----------------- def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str: """Rescan de sécurité : re-détecte les PII critiques qui auraient échappé au premier passage.""" # enlève TABLES du scope def strip_tables(s: str): kept = [] out = [] i = 0 pattern = re.compile(r"\[TABLES\](.*?)\[/TABLES\]", re.DOTALL) for m in pattern.finditer(s): out.append(s[i:m.start()]) kept.append((len("".join(out)), len("".join(out)) + len(m.group(1)), m.group(1))) out.append("\x00" * (m.end() - m.start())) i = m.end() out.append(s[i:]) return "".join(out), kept protected, kept = strip_tables(text) # PII critiques (comme avant) protected = RE_EMAIL.sub(PLACEHOLDERS["EMAIL"], protected) protected = RE_TEL.sub(PLACEHOLDERS["TEL"], protected) protected = RE_IBAN.sub(PLACEHOLDERS["IBAN"], protected) # NIR avec validation def _rescan_nir(m: re.Match) -> str: return PLACEHOLDERS["NIR"] if validate_nir(m.group(0)) else m.group(0) protected = RE_NIR.sub(_rescan_nir, protected) # Nouvelles regex : dates de naissance, dates, adresses, codes postaux protected = RE_DATE_NAISSANCE.sub(PLACEHOLDERS["DATE_NAISSANCE"], protected) # protected = RE_DATE.sub(PLACEHOLDERS["DATE"], protected) # désactivé protected = RE_ADRESSE.sub(PLACEHOLDERS["ADRESSE"], protected) protected = RE_BP.sub(PLACEHOLDERS["ADRESSE"], protected) protected = RE_CODE_POSTAL.sub(PLACEHOLDERS["CODE_POSTAL"], protected) # N° Episode protected = RE_EPISODE.sub(PLACEHOLDERS["EPISODE"], protected) # N° RPPS protected = RE_RPPS.sub(PLACEHOLDERS["RPPS"], protected) # Établissements protected = RE_ETABLISSEMENT.sub(PLACEHOLDERS["ETAB"], protected) protected = RE_HOPITAL_VILLE.sub(PLACEHOLDERS["ETAB"], protected) # Personnes contextuelles (avec whitelist) wl_sections = set() wl_phrases = set() if cfg: wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or []) wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or []) def _rescan_person(m: re.Match) -> str: span = m.group(1).strip(); raw = m.group(0) if span in wl_sections or raw in wl_phrases: return raw tokens = [t for t in span.split() if t] if len(tokens) == 1 and len(tokens[0]) <= 3: return raw return raw.replace(span, PLACEHOLDERS["NOM"]) protected = RE_PERSON_CONTEXT.sub(_rescan_person, protected) res = list(protected) for start, end, payload in kept: res[start:end] = list(payload) return "".join(res) # ----------------- PDF Redaction ----------------- def _search_whole_word(page, token: str) -> list: """Cherche un token comme mot entier (pas substring) via get_text('words'). Évite les faux positifs de page.search_for() qui fait du substring matching.""" rects = [] token_lower = token.lower().strip() for w in page.get_text("words"): # w = (x0, y0, x1, y1, word, block_no, line_no, word_no) word_text = w[4].strip(".,;:!?()[]{}\"'«»-–—/\\") if word_text.lower() == token_lower: rects.append(fitz.Rect(w[0], w[1], w[2], w[3])) return rects def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> None: if fitz is None: raise RuntimeError("PyMuPDF non disponible – installez pymupdf.") doc = fitz.open(str(original_pdf)) # index hits par page; page==-1 → rechercher sur toutes pages by_page: Dict[int, List[PiiHit]] = {} for h in audit: by_page.setdefault(h.page, []).append(h) # Kinds à ne pas chercher dans le PDF (dates masquées uniquement dans le texte, # pas dans le PDF où elles rendent les tableaux illisibles) _VECTOR_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL"} # Kinds dont les tokens courts (< 5) risquent le substring matching via page.search_for() _VECTOR_SHORT_TOKEN_KINDS = {"NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM", "EDS_HOPITAL", "EDS_VILLE", "ETAB", "ETAB_GLOBAL"} for pno in range(len(doc)): page = doc[pno] hits = by_page.get(pno, []) + by_page.get(-1, []) if not hits: continue for h in hits: token = h.original.strip() if not token: continue # Sauter toutes les dates EDS (masquées dans le texte, pas dans le PDF) if h.kind in _VECTOR_SKIP_KINDS: continue # Tokens NOM courts (< 5 chars) : matching par mots entiers pour éviter # les faux positifs substring ("AXa" dans "laxatifs", "SER" dans "Observations") if h.kind in _VECTOR_SHORT_TOKEN_KINDS and len(token) < 5: if token.lower() not in _MEDICAL_STOP_WORDS_SET: rects = _search_whole_word(page, token) for r in rects: page.add_redact_annot(r, fill=(0,0,0)) continue rects = page.search_for(token) if not rects and h.kind in {"NIR", "IBAN", "TEL"}: compact = re.sub(r"\s+", "", token) if compact != token: rects = page.search_for(compact) # Fallback : chercher chaque mot individuellement (uniquement pour les NOM) if not rects and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}: for word in token.split(): word = word.strip(" .-'") if len(word) < 5 or word.lower() in _MEDICAL_STOP_WORDS_SET: continue if not word[0].isupper(): continue rects.extend(page.search_for(word)) for r in rects: page.add_redact_annot(r, fill=(0,0,0)) try: page.apply_redactions() except Exception: pass doc.save(str(out_pdf), deflate=True, garbage=4, clean=True, incremental=False) doc.close() def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 300, ogc_label: Optional[str] = None) -> None: if fitz is None: raise RuntimeError("PyMuPDF non disponible – installez pymupdf.") doc = fitz.open(str(original_pdf)); out = fitz.open() all_rects: Dict[int, List["fitz.Rect"]] = {} for pno in range(len(doc)): page = doc[pno] rects = [] _RASTER_SKIP_KINDS = {"EDS_DATE", "EDS_DATE_NAISSANCE", "EDS_SECU", "EDS_TEL"} _RASTER_SHORT_TOKEN_KINDS = {"NOM_GLOBAL", "NOM_EXTRACTED", "EDS_NOM", "EDS_PRENOM", "EDS_HOPITAL", "EDS_VILLE", "ETAB", "ETAB_GLOBAL"} hits = [x for x in audit if x.page in {pno, -1}] for h in hits: token = h.original.strip() if not token: continue # Sauter toutes les dates EDS (masquées dans le texte, pas dans le PDF) if h.kind in _RASTER_SKIP_KINDS: continue # Tokens NOM courts (< 5 chars) : matching par mots entiers pour éviter # les faux positifs substring ("AXa" dans "laxatifs", "SER" dans "Observations") if h.kind in _RASTER_SHORT_TOKEN_KINDS and len(token) < 5: if token.lower() not in _MEDICAL_STOP_WORDS_SET: rects.extend(_search_whole_word(page, token)) continue found = page.search_for(token) if not found and h.kind in {"NIR", "IBAN", "TEL"}: compact = re.sub(r"\s+", "", token) found = page.search_for(compact) # Fallback : si la chaîne complète n'est pas trouvée, # chercher chaque mot individuellement (uniquement pour les NOM) if not found and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}: for word in token.split(): word = word.strip(" .-'") if len(word) < 5 or word.lower() in _MEDICAL_STOP_WORDS_SET: continue # Ne garder que les mots qui ressemblent à des noms propres if not word[0].isupper(): continue found.extend(page.search_for(word)) rects.extend(found) all_rects[pno] = rects for pno in range(len(doc)): src = doc[pno]; rect = src.rect zoom = dpi / 72.0; mat = fitz.Matrix(zoom, zoom) pix = src.get_pixmap(matrix=mat, annots=False) img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) draw = ImageDraw.Draw(img) for r in all_rects.get(pno, []): draw.rectangle([r.x0 * zoom, r.y0 * zoom, r.x1 * zoom, r.y1 * zoom], fill=(0, 0, 0)) # Incrustation OGC en haut à droite if ogc_label: from PIL import ImageFont font_size = int(14 * zoom) try: font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size) except Exception: font = ImageFont.load_default() text = f"OGC: {ogc_label}" bbox = draw.textbbox((0, 0), text, font=font) tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] margin = int(10 * zoom) x = img.width - tw - margin y = margin # Fond blanc + texte noir draw.rectangle([x - 4, y - 2, x + tw + 4, y + th + 2], fill=(255, 255, 255)) draw.text((x, y), text, fill=(0, 0, 0), font=font) buf = io.BytesIO(); img.save(buf, format="PNG"); buf.seek(0) dst = out.new_page(width=rect.width, height=rect.height) dst.insert_image(rect, stream=buf.getvalue()) out.save(str(out_pdf), deflate=True, garbage=4, clean=True) out.close(); doc.close() # ----------------- Orchestration ----------------- def process_pdf( pdf_path: Path, out_dir: Path, make_vector_redaction: bool = True, also_make_raster_burn: bool = False, config_path: Optional[Path] = None, use_hf: bool = False, ner_manager=None, ner_thresholds=None, ogc_label: Optional[str] = None, ) -> Dict[str, str]: out_dir.mkdir(parents=True, exist_ok=True) cfg = load_dictionaries(config_path) pages_text, tables_lines, ocr_used = extract_text_with_fallback_ocr(pdf_path) # 1) Regex rules anon = anonymise_document_regex(pages_text, tables_lines, cfg) # 2) NER (optionnel) — sur le narratif final_text = anon.text_out hf_hits: List[PiiHit] = [] if use_hf and ner_manager is not None and ner_manager.is_loaded(): # Détecter le type de manager et appeler la bonne fonction if EdsPseudoManager is not None and isinstance(ner_manager, EdsPseudoManager): final_text, hf_hits = apply_eds_pseudo_on_narrative(final_text, cfg, ner_manager) else: final_text, hf_hits = apply_hf_ner_on_narrative(final_text, cfg, ner_manager, ner_thresholds) anon.audit.extend(hf_hits) # 3) Rescan selectif final_text = selective_rescan(final_text, cfg=cfg) # 3b) Nettoyage post-masquage : codes postaux orphelins (5 chiffres collés à un placeholder) # et téléphones fragmentés sur plusieurs lignes _re_cp_orphan = re.compile(r"(\[(?:ADRESSE|NOM|VILLE)\])\s*(\d{5})\b") def _clean_cp_orphan(m): anon.audit.append(PiiHit(-1, "CODE_POSTAL", m.group(2), PLACEHOLDERS["CODE_POSTAL"])) return m.group(1) + PLACEHOLDERS["CODE_POSTAL"] final_text = _re_cp_orphan.sub(_clean_cp_orphan, final_text) # Téléphones fragmentés : "0X XX XX XX\nXX" coupé en fin de ligne (ligne suivante immédiate) _re_tel_frag = re.compile(r"((?:\+33\s?|0)\d(?:[ .-]?\d){6,7})\s*\n\s*(\d{2}(?!\d))") def _clean_tel_frag(m): full = m.group(1).replace(" ", "").replace(".", "").replace("-", "") + m.group(2) if len(full.replace("+33", "0")) == 10: anon.audit.append(PiiHit(-1, "TEL", m.group(0).strip(), PLACEHOLDERS["TEL"])) return PLACEHOLDERS["TEL"] + "\n" return m.group(0) final_text = _re_tel_frag.sub(_clean_tel_frag, final_text) # Téléphones incomplets en fin de ligne (8 ou 9 chiffres au format 0X XX XX XX) : masquer la partie visible _re_tel_partial = re.compile(r"(?= 2 and part in _global_name_tokens: _parts_to_drop.add(part) _global_name_tokens -= _parts_to_drop for token in _global_name_tokens: anon.audit.append(PiiHit(page=-1, kind="NOM_GLOBAL", original=token, placeholder=PLACEHOLDERS["NOM"])) # 4b) TEL, EMAIL, ADRESSE, CODE_POSTAL : propager les valeurs uniques sur toutes les pages _global_pii: Dict[str, set] = {} for h in anon.audit: if h.kind in {"TEL", "EMAIL", "ADRESSE", "CODE_POSTAL", "EPISODE", "RPPS", "VILLE", "ETAB"}: _global_pii.setdefault(h.kind, set()).add(h.original.strip()) for kind, values in _global_pii.items(): placeholder = PLACEHOLDERS.get(kind, PLACEHOLDERS["MASK"]) for val in values: anon.audit.append(PiiHit(page=-1, kind=f"{kind}_GLOBAL", original=val, placeholder=placeholder)) # 4e) Appliquer les tokens globaux sur le texte pseudonymisé _GLOBAL_SKIP_KINDS = {"EDS_DATE_GLOBAL", "DATE_NAISSANCE_GLOBAL"} for h in anon.audit: if h.page != -1: continue if not (h.kind == "NOM_GLOBAL" or h.kind.endswith("_GLOBAL")): continue if h.kind in _GLOBAL_SKIP_KINDS: continue token = h.original.strip() if not token or len(token) < 3: continue # Garde trackare : NOM_GLOBAL très court (<=3) risque de masquer des codes diagnostics if anon.is_trackare and h.kind == "NOM_GLOBAL" and len(token) <= 3: continue try: final_text = re.sub(rf"\b{re.escape(token)}\b", h.placeholder, final_text) except re.error: final_text = final_text.replace(token, h.placeholder) # Log OCR dans l'audit if ocr_used: anon.audit.insert(0, PiiHit(page=-1, kind="OCR_USED", original="docTR", placeholder="")) # Sauvegardes base = pdf_path.stem txt_path = out_dir / f"{base}.pseudonymise.txt" audit_path = out_dir / f"{base}.audit.jsonl" txt_path.write_text(final_text, encoding="utf-8") with audit_path.open("w", encoding="utf-8") as f: for hit in anon.audit: f.write(json.dumps(hit.__dict__, ensure_ascii=False) + "\n") outputs = {"text": str(txt_path), "audit": str(audit_path)} # PDFs if make_vector_redaction and fitz is not None: vec_path = out_dir / f"{base}.redacted_vector.pdf" try: redact_pdf_vector(pdf_path, anon.audit, vec_path) outputs["pdf_vector"] = str(vec_path) except Exception: pass if also_make_raster_burn and fitz is not None: ras_path = out_dir / f"{base}.redacted_raster.pdf" redact_pdf_raster(pdf_path, anon.audit, ras_path, ogc_label=ogc_label) outputs["pdf_raster"] = str(ras_path) return outputs if __name__ == "__main__": import argparse ap = argparse.ArgumentParser(description="Anonymiser PDF (regex + NER ONNX optionnel)") ap.add_argument("pdf", type=str) ap.add_argument("--out", type=str, default="out") ap.add_argument("--no-vector", action="store_true") ap.add_argument("--raster", action="store_true") ap.add_argument("--config", type=str, default=str(Path("config/dictionnaires.yml"))) ap.add_argument("--hf", action="store_true", help="Activer NER ONNX sur narratif (nécessite ner_manager_onnx)") ap.add_argument("--model", type=str, default="cmarkea/distilcamembert-base-ner") args = ap.parse_args() manager = None if args.hf and NerModelManager is not None: manager = NerModelManager(cache_dir=Path("models")) manager.load(args.model) outs = process_pdf( Path(args.pdf), Path(args.out), make_vector_redaction=not args.no_vector, also_make_raster_burn=args.raster, config_path=Path(args.config), use_hf=bool(args.hf), ner_manager=manager, ner_thresholds=NerThresholds() if NerThresholds else None, ) print(json.dumps(outs, indent=2, ensure_ascii=False))