From 5ed2312d93373ce1b7cccbeca2cd5695a9f2bbeb Mon Sep 17 00:00:00 2001 From: Domi31tls Date: Thu, 26 Feb 2026 00:25:18 +0100 Subject: [PATCH] =?UTF-8?q?Am=C3=A9lioration=20majeure=20de=20l'anonymisat?= =?UTF-8?q?ion=20regex=20:=20trackare,=20noms=20compos=C3=A9s,=20faux=20po?= =?UTF-8?q?sitifs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Parseur trackare spécifique (détection par contenu, extraction structurée des PII) - Support format "Dr X. NOM" et "Mme X. NOM" (initiales + noms composés avec tiret) - Détection noms personnel médical (Aide, Cadre Infirmier, etc.) - Masquage RPPS, établissements (EHPAD/SSR/USLD standalone), lieux de naissance - Stop words médicaux enrichis (~270 entrées : DCI, spécialités, termes contextuels) - Détection compagnon (noms adjacents à des noms connus dans le texte brut) - Protection noms composés (JEAN-PIERRE traité comme un tout, pas JEAN + PIERRE) - Nettoyage codes postaux orphelins, téléphones fragmentés/partiels - Désactivation masquage dates génériques, AGE avec contexte obligatoire - GUI : extraction OGC depuis le nom du répertoire parent, incrustation sur les pages Co-Authored-By: Claude Opus 4.6 --- Pseudonymisation_Gui_V5.py | 45 +- anonymizer_core_refactored_onnx.py | 677 ++++++++++++++++++++++++++++- 2 files changed, 679 insertions(+), 43 deletions(-) diff --git a/Pseudonymisation_Gui_V5.py b/Pseudonymisation_Gui_V5.py index a29e237..8c39aa6 100644 --- a/Pseudonymisation_Gui_V5.py +++ b/Pseudonymisation_Gui_V5.py @@ -399,7 +399,7 @@ class App: self._folder_text_lbl = tk.Label( self._folder_inner, - text="Cliquez pour choisir un dossier contenant vos PDF", + text="Cliquez pour choisir un dossier (tous les PDF seront recherchés récursivement)", font=self._f_body, bg=CLR_CARD_BG, fg=CLR_TEXT_SECONDARY, ) self._folder_text_lbl.pack(pady=(4, 0)) @@ -427,14 +427,15 @@ class App: tk.Label( info_inner, - text="Les deux formats sont générés automatiquement :", + text="Paramètres de traitement :", font=self._f_body_bold, bg=CLR_BLUE_LIGHT, fg=CLR_TEXT, anchor="w", ).pack(fill=tk.X) tk.Label( info_inner, - text=("\u2022 PDF Image — sécurité maximale, chaque page en image, aucun texte résiduel\n" - "\u2022 PDF Anonymisé — structure préservée comme l'original, fichier léger"), + text=("\u2022 Recherche récursive de tous les PDF dans les sous-dossiers\n" + "\u2022 Sortie PDF Image (raster) — sécurité maximale, aucun texte résiduel\n" + "\u2022 Résultats dans le dossier « anonymise/ » à la racine"), font=self._f_card_desc, bg=CLR_BLUE_LIGHT, fg=CLR_TEXT_SECONDARY, anchor="w", justify=tk.LEFT, ).pack(fill=tk.X, pady=(4, 0)) @@ -589,10 +590,10 @@ class App: if not folder: return - # Compter les PDF + # Compter les PDF (récursif) pdf_count = 0 try: - pdf_count = len([p for p in Path(folder).glob("*.pdf") if p.is_file()]) + pdf_count = len([p for p in Path(folder).rglob("*.pdf") if p.is_file()]) except Exception: pass @@ -620,7 +621,7 @@ class App: bg=CLR_CARD_BG, fg=CLR_TEXT, anchor="w", ).pack(fill=tk.X) - suffix = "PDF trouvé" if pdf_count <= 1 else "PDF trouvés" + suffix = "PDF trouvé (récursif)" if pdf_count <= 1 else "PDF trouvés (récursif)" tk.Label( info_frame, text=f"{pdf_count} {suffix}", font=self._f_small, bg=CLR_CARD_BG, fg=CLR_TEXT_SECONDARY, anchor="w", @@ -648,11 +649,11 @@ class App: ) return - pdfs = sorted([p for p in folder.glob("*.pdf") if p.is_file()]) + pdfs = sorted([p for p in folder.rglob("*.pdf") if p.is_file()]) if not pdfs: messagebox.showwarning( "Aucun PDF", - "Le dossier sélectionné ne contient aucun fichier PDF.", + "Aucun fichier PDF trouvé\n(recherche récursive dans les sous-dossiers).", ) return @@ -663,7 +664,7 @@ class App: def _worker(self, folder: Path, pdfs: List[Path]): try: - outdir = folder / "pseudonymise" + outdir = folder / "anonymise" outdir.mkdir(exist_ok=True) ok = ko = 0 global_counts: Dict[str, int] = {} @@ -681,15 +682,21 @@ class App: if use_ner and NerThresholds and not (EdsPseudoManager and isinstance(active, EdsPseudoManager)): thresholds = NerThresholds(self.th_per, self.th_org, self.th_loc, 0.85) + # Extraire le numéro OGC du nom du répertoire parent + # Ex: "257_23209962" → OGC = "257" + parent_name = pdf.parent.name + ogc = parent_name.split("_")[0] if "_" in parent_name else None + outputs = core.process_pdf( pdf_path=pdf, out_dir=outdir, - make_vector_redaction=True, + make_vector_redaction=False, also_make_raster_burn=True, config_path=Path(self.cfg_path.get()), use_hf=use_ner, ner_manager=active, ner_thresholds=thresholds, + ogc_label=ogc, ) self.queue.put(UiMessage(kind=MsgType.LOG, text=f"\u2713 {pdf.name}")) for k, v in outputs.items(): @@ -822,15 +829,15 @@ class App: def _show_help(self): messagebox.showinfo( "Comment ça marche ?", - "1) Choisissez le dossier contenant vos fichiers PDF.\n\n" + "1) Choisissez le dossier racine contenant vos fichiers PDF.\n\n" "2) Cliquez sur « Lancer la pseudonymisation ».\n\n" - "Deux fichiers sont générés pour chaque PDF :\n" - " \u2022 PDF Image : chaque page devient une image avec les\n" - " données masquées. Sécurité maximale.\n" - " \u2022 PDF Anonymisé : structure préservée comme l'original,\n" - " fichier léger et texte sélectionnable.\n\n" - "Les résultats apparaissent dans un sous-dossier\n" - "« pseudonymise » à côté de vos originaux.", + "Tous les fichiers PDF sont traités\n" + "(recherche récursive dans les sous-dossiers).\n\n" + "Un PDF Image (raster) est généré pour chaque fichier :\n" + "chaque page devient une image avec les données masquées.\n" + "Sécurité maximale, aucun texte résiduel.\n\n" + "Les résultats sont regroupés à plat dans le dossier\n" + "« anonymise/ » à la racine du dossier sélectionné.", ) # --------------------------------------------------------------- diff --git a/anonymizer_core_refactored_onnx.py b/anonymizer_core_refactored_onnx.py index 3723f3a..72925e6 100644 --- a/anonymizer_core_refactored_onnx.py +++ b/anonymizer_core_refactored_onnx.py @@ -103,6 +103,8 @@ PLACEHOLDERS = { "AGE": "[AGE]", "DOSSIER": "[DOSSIER]", "NDA": "[NDA]", + "EPISODE": "[EPISODE]", + "RPPS": "[RPPS]", } CRITICAL_PII_KEYS = {"EMAIL", "TEL", "IBAN", "NIR", "IPP", "DATE_NAISSANCE"} @@ -114,6 +116,7 @@ RE_IBAN = re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{11,30}\b") RE_IPP = re.compile(r"\bIPP\s*[:\-]?\s*([A-Za-z0-9]{6,})\b", re.IGNORECASE) RE_FINESS = re.compile(r"\bFINESS\s*[:\-]?\s*(\d{9})\b", re.IGNORECASE) RE_OGC = re.compile(r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,})\b", re.IGNORECASE) +RE_RPPS = re.compile(r"\b(?:N°\s*)?RPPS\s*[:\-]?\s*(\d{8,11})\b", re.IGNORECASE) RE_NIR = re.compile( r"\b([12])\s*(\d{2})\s*(0[1-9]|1[0-2]|2[AB])\s*(\d{2,3})\s*(\d{3})\s*(\d{3})\s*(\d{2})\b", re.IGNORECASE, @@ -136,13 +139,155 @@ def validate_nir(nir_raw: str) -> bool: return False return key_int == (97 - (body_int % 97)) +# Mots médicaux/techniques/courants qui ne sont pas des noms de personnes +_MEDICAL_STOP_WORDS_SET = { + # Mots français courants (déterminants, prépositions, adverbes, etc.) + "pas", "mon", "bien", "ancien", "ancienne", "bon", "bonne", "tout", "tous", + "mais", "donc", "car", "que", "qui", "avec", "dans", "pour", "sur", "par", + "les", "des", "une", "est", "son", "ses", "nos", "aux", "cette", "ces", + "cher", "chez", "entre", "sans", "sous", "vers", "selon", "après", "avant", + "puis", "aussi", "très", "plus", "moins", "peu", "non", "oui", "quelques", + "mise", "début", "fin", "suite", "fait", "lieu", "cas", "jour", "jours", + "semaine", "semaines", "mois", "temps", "place", "nouvelle", "nouveau", + "franche", "légère", "quelque", "depuis", "comme", "encore", "votre", + "date", "note", "notes", "nom", "heure", "matin", "soir", "midi", + "signé", "réalisé", "courrier", "cabinet", "rue", + # Verbes / participes courants + "remontée", "associée", "réalisée", "débuté", "prolongé", "prolongée", + "prescrit", "prescrite", "présente", "présent", "absente", "absent", + "reprise", "introduction", "arrêt", "relais", + # Titres / rôles hospitaliers + "chef", "assistant", "assistante", "praticien", "praticienne", + "docteur", "professeur", "hospitalier", "hospitalière", "hospitaliers", + "spécialiste", "contractuel", "contractuelle", "titulaire", + "confrère", "consoeur", "coordonnateur", "coordonnatrice", + "médecin", "médical", "infirmier", "infirmière", + "praticiens", "patient", "patiente", + # Structure hospitalière + "service", "pôle", "clinique", "consultation", "secrétariat", + "hôpital", "hôpitaux", "centre", "établissement", "polyclinique", + # Villes / géographie (pas des noms de personnes) + "bordeaux", "bayonne", "paris", "lyon", "lille", "marseille", + "toulouse", "nantes", "montpellier", "pessac", "biarritz", "soustons", + "basque", "basques", "sud", "côte", + # Médicaments génériques et spécialités (DCI + noms commerciaux) + "colchicine", "aspirine", "cortancyl", "bisoprolol", "entresto", + "methotrexate", "eplerenone", "speciafoldine", "prednisone", + "corticoïdes", "cortisone", + "paracetamol", "metformine", "solupred", "novorapid", "abasaglar", + "lovenox", "methylprednisolone", "potassium", "humalog", "furosemide", + "insuline", "trulicity", "forxiga", "atorvastatine", "amlodipine", + "ondansetron", "eliquis", "nebivolol", "gaviscon", "loxen", + "morphine", "oxycodone", "kardegic", "tercian", "zopiclone", + "seresta", "tramadol", "alprazolam", "forlax", "levothyrox", + "bromazepam", "gliclazide", "zymad", "pravastatine", "spiriva", + "quetiapine", "sertraline", "crestor", "lercanidipine", "amoxicilline", + "opocalcium", "ferinject", "candesartan", "ceftriaxone", "calcidose", + "laroxyl", "brintellix", "ketoprofene", "adrenaline", "exacyl", + "terbutaline", "ipratropium", "actiskenan", "vialebex", "oxynormoro", + "lansoprazole", "perindopril", "sodium", "velmetia", + "doliprane", "dafalgan", "efferalgan", "spasfon", "vogalene", + "augmentin", "inexium", "omeprazole", "pantoprazole", "esomeprazole", + "ramipril", "lisinopril", "enalapril", "losartan", "valsartan", + "irbesartan", "olmesartan", "telmisartan", "hydrochlorothiazide", + "spironolactone", "furosemide", "lasilix", "aldactone", + "tahor", "crestor", "rosuvastatine", "simvastatine", "fluvastatine", + "xarelto", "pradaxa", "apixaban", "rivaroxaban", "dabigatran", + "plavix", "clopidogrel", "ticagrelor", "brilique", + "ventoline", "seretide", "symbicort", "salmeterol", "fluticasone", + "salbutamol", "tiotropium", "budesonide", "beclometasone", + "oxycodone", "oxynorm", "skenan", "actiskenan", "fentanyl", + "nubain", "nalbuphine", "nefopam", "acupan", "profenid", + "ibuprofene", "diclofenac", "naproxene", "celecoxib", + "gabapentine", "pregabaline", "lyrica", "neurontin", + "amitriptyline", "duloxetine", "venlafaxine", "fluoxetine", + "paroxetine", "escitalopram", "citalopram", "mirtazapine", + "olanzapine", "risperidone", "aripiprazole", "haloperidol", + "loxapine", "cyamemazine", "diazepam", "oxazepam", "lorazepam", + "clonazepam", "midazolam", "hydroxyzine", "atarax", "melatonine", + "stilnox", "zolpidem", "imovane", + "levothyroxine", "metformine", "glimepiride", "sitagliptine", + "januvia", "jardiance", "empagliflozine", "dapagliflozine", + "ozempic", "semaglutide", "dulaglutide", "liraglutide", "victoza", + "heparine", "enoxaparine", "tinzaparine", "innohep", + "warfarine", "coumadine", "fluindione", "previscan", + "ciprofloxacine", "levofloxacine", "ofloxacine", "metronidazole", + "vancomycine", "gentamicine", "tazocilline", "piperacilline", + "meropenem", "imipenem", "clindamycine", "doxycycline", + "azithromycine", "clarithromycine", "cotrimoxazole", "bactrim", + "polyionique", "propranolol", "apidra", "solostar", + # Suffixes laboratoires pharmaceutiques + "arw", "myl", "myp", "arg", "teva", "bga", "agt", + # Formes galéniques / voies d'administration + "cpr", "sachet", "orale", "oral", "sol", "buv", "stylo", "flexpen", + "flestouch", "kwikpen", "inj", "susp", "gelule", "comprime", + "unidose", "perf", "inh", "seringue", "aerosol", "sach", "pdr", + "orodisp", "capsule", "patch", "suppositoire", "gouttes", + # Termes de prescription / pharmacie + "prescription", "prescriptions", "dose", "fréquence", "statut", + "technique", "capteur", "bandelettes", "glycemiques", "glycemique", + "lancettes", "aiguilles", "fines", "micro", "pompe", "réserve", + "glycemie", "capillaire", + # Termes médicaux / cliniques + "myocardite", "myosite", "corticothérapie", "biopsie", "pathologie", + "dysimmunitaire", "récidive", "récidivante", "traitement", "diagnostic", + "antécédents", "examen", "bilan", "résultats", "analyse", + "interne", "externe", "médecine", "chirurgie", "rhumatologie", + "dermatologie", "immunologie", "cardiologie", "pneumologie", + "neurologie", "gynécologie", "radiologie", "sénologie", + "douleur", "douleurs", "douloureux", "musculaire", "musculaires", + "thoracique", "thoraciques", "membres", "supérieurs", "inférieurs", + "normale", "normaux", "habituelle", "habituelles", + "synthèse", "hospitalisation", "syndrome", "vaccination", "ophtalmo", + "pelvien", "diabétique", "sommeil", "régime", "diet", + "desinfection", "environnement", "identification", "bracelet", + "toilettes", "accompagner", "installer", "transfusion", + "signes", "vitaux", "alimentaire", "avis", "zone", + "calcémie", + # Abréviations médicales + "irm", "ett", "ecg", "mtx", "fevg", "bdc", "crp", "sfu", "hdj", + "bnp", "asat", "alat", "cpk", "ctc", "hba", "hba1c", + "saos", "tsh", "inr", "vgm", "pnn", "plq", "hb", + "poc", "bax", "act", "bic", "cfx", "acc", "ado", "acf", "vfo", + "qvl", "cci", "pse", "pca", "chl", "crt", "bbm", "pds", "ren", + "vit", "zen", + "scanner", "radio", "écho", "échographie", + # Spécialités médicales (éviter faux positifs NOM) + "hépato-gastro-entérologue", "gastro-entérologue", "gastro-entérologie", + "proctologue", "oncologue", "anesthésiste", "pneumologue", "gérontologue", + "cardiologue", "néphrologue", "urologue", "gériatre", + "hépatologue", "endocrinologue", "stomatologue", + # Mots clés de contexte document + "compétences", "maladies", "inflammatoires", "systémiques", "rares", + "fret", "fax", "contexte", "résultat", "resultat", "résultats", "resultats", + "haute", "maison", "aide", "rpps", "poste", "fonct", + "sante", "santé", "etxe", "ttipi", "gastro", "concha", + "endoscopie", "endoscopique", "fibroscopie", + "indication", "conclusion", "technique", "anesthésie", + "digestif", "digestive", "digestives", "nutritive", +} +_MEDICAL_STOP_WORDS = ( + r"(?:" + "|".join(re.escape(w) for w in _MEDICAL_STOP_WORDS_SET) + r")" +) +# Un token de nom : commence par majuscule, lettres/tirets/apostrophes (PAS d'espace ni de point) +_PERSON_TOKEN = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+" RE_PERSON_CONTEXT = re.compile( r"(?:(?:Dr\.?|DR\.?|Docteur|Mme|MME|Madame|M\.|Mr\.?|Monsieur" - r"|Nom\s*:\s*|Praticien|Médecin" + r"|Nom\s*:\s*" r"|Rédigé\s+par|Validé\s+par|Signé\s+par|Saisi\s+par" r")\s+)" - r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' .]+(?:\s+[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\'.]+)*)" + rf"({_PERSON_TOKEN}(?:\s+{_PERSON_TOKEN}){{0,2}})" # Max 3 mots ) + +# Noms en MAJUSCULES dans des listes virgulées (ex: "le Dr X, Y, LAZARO") +RE_DR_COMMA_LIST = re.compile( + r"(?:Dr\.?|DR\.?|Docteur)\s+" + r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' .]+" + r"(?:\s*,\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' .]+)+", + re.IGNORECASE, +) +# Token nom : mot commençant par une majuscule d'au moins 3 lettres +_NAME_TOKEN_RE = re.compile(r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']{2,}") SPLITTER = re.compile(r"\s*[:|;\t]\s*") # --- Extraction globale de noms depuis champs structurés --- @@ -153,18 +298,56 @@ RE_EXTRACT_PATIENT = re.compile( r"(?=\s+Né|\s+né|\s+N°|\s*$)", re.MULTILINE, ) +# Champs d'identité structurés (documents trackare / DPI) +RE_EXTRACT_NOM_NAISSANCE = re.compile( + r"Nom\s+de\s+naissance\s*:\s*" + r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s+IPP|\s*$)", + re.MULTILINE, +) +RE_EXTRACT_NOM_PRENOM = re.compile( + r"Nom\s+et\s+Pr[ée]nom\s*:\s*" + r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s+Date|\s+Né|\s*$)", + re.MULTILINE, +) +RE_EXTRACT_LIEU_NAISSANCE = re.compile( + r"Lieu\s+de\s+naissance\s*:\s*" + r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s*$)", + re.MULTILINE, +) +RE_EXTRACT_VILLE_RESIDENCE = re.compile( + r"Ville\s+de\s+r[ée]sidence\s*:\s*" + r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+?)(?:\s*$)", + re.MULTILINE, +) +# Contacts structurés : Conjoint/Concubin/Epoux/Epouse/Parent + NOM PRENOM +RE_EXTRACT_CONTACT = re.compile( + r"(?:Conjoint|Concubin|Epoux|Epouse|Parent|Père|Mère|Fils|Fille|Frère|Soeur|Tuteur)\s+" + r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+)" + r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+))?", +) RE_EXTRACT_REDIGE = re.compile( r"(?:Rédigé|Validé|Signé|Saisi)\s+par\s+" rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)", ) +# Token nom composé : JEAN-PIERRE, CAZELLES-BOUDIER, etc. +_UC_COMPOUND = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,}(?:-[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})*" RE_EXTRACT_MME_MR = re.compile( - r"(?:MME|Madame|Monsieur|Mr\.?)\s+" - r"((?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})(?:\s+[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})*)", + r"(?:MME|Mme|Madame|Monsieur|Mr?\.?)\s+" + r"(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*(?:-?\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*)?)?" + rf"((?:{_UC_COMPOUND})(?:\s+(?:{_UC_COMPOUND}))*)", ) +_INITIAL_OPT = r"(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*(?:-?\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]\.\s*)?)?" RE_EXTRACT_DR_DEST = re.compile( - r"(?:DR\.?|Docteur)\s+" + r"(?:DR\.?|Dr\.?|Docteur)\s+" + + _INITIAL_OPT + rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)", ) +# Noms du personnel médical après un rôle : "Aide : Marie-Paule BORDABERRY" +RE_EXTRACT_STAFF_ROLE = re.compile( + r"(?:Aide|Infirmière?|IDE|IADE|IBODE|ASH?|Cadre\s+Infirmier)\s*:\s*" + r"((?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][a-zéèàùâêîôûäëïöüç]+(?:\s*-\s*[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][a-zéèàùâêîôûäëïöüç]+)?\s+)?" + r"(?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,}[\-]?)(?:[\s\-]+[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})*)", +) CID_PATTERN = re.compile(r"\(cid:\d+\)") @@ -183,26 +366,46 @@ RE_DATE = re.compile( ) RE_ADRESSE = re.compile( r"\b\d{1,4}[\s,]*(?:bis|ter)?\s*,?\s*" - r"(?:rue|avenue|av\.|boulevard|bd|place|chemin|allée|impasse|route|cours|passage|square)" + r"(?:rue|avenue|av\.?|boulevard|bd\.?|place|chemin|all[ée]e|impasse|route|cours|passage|square|r[ée]sidence)" r"\s+[A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\s\-']{2,}", re.IGNORECASE, ) RE_CODE_POSTAL = re.compile( - r"(?:(?:code\s*postal|CP)\s*[:\-]?\s*(\d{5}))" + r"(?:(?:[Cc]ode\s*[Pp]ostal|CP)\s*[:\-]?\s*(\d{5}))" r"|" - r"(?:(\d{5})[ \t]+[A-ZÉÈÀÙ][a-zéèàùâêîôû]+(?:[\s\-][A-ZÉÈÀÙ][a-zéèàùâêîôû]+)*)", + # 5 chiffres + nom de ville (Title Case ou MAJUSCULES), pas précédé d'un chiffre (évite RPPS) + r"(?:(? str: if m: val = m.group(1); audit.append(PiiHit(page_idx, "IPP", val, PLACEHOLDERS["IPP"])) return RE_IPP.sub(lambda _: f"IPP : {PLACEHOLDERS['IPP']}", line) + m = RE_RPPS.search(line) + if m: + val = m.group(1); audit.append(PiiHit(page_idx, "RPPS", val, PLACEHOLDERS["RPPS"])) + return RE_RPPS.sub(lambda _: f"RPPS : {PLACEHOLDERS['RPPS']}", line) return line @@ -403,11 +610,11 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict return PLACEHOLDERS["DATE_NAISSANCE"] line = RE_DATE_NAISSANCE.sub(_repl_date_naissance, line) - # DATE générique - def _repl_date(m: re.Match) -> str: - audit.append(PiiHit(page_idx, "DATE", m.group(0), PLACEHOLDERS["DATE"])) - return PLACEHOLDERS["DATE"] - line = RE_DATE.sub(_repl_date, line) + # DATE générique — désactivé : seules les dates de naissance sont masquées + # def _repl_date(m: re.Match) -> str: + # audit.append(PiiHit(page_idx, "DATE", m.group(0), PLACEHOLDERS["DATE"])) + # return PLACEHOLDERS["DATE"] + # line = RE_DATE.sub(_repl_date, line) # ADRESSE def _repl_adresse(m: re.Match) -> str: @@ -415,6 +622,12 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict return PLACEHOLDERS["ADRESSE"] line = RE_ADRESSE.sub(_repl_adresse, line) + # BOITE POSTALE (BP) + def _repl_bp(m: re.Match) -> str: + audit.append(PiiHit(page_idx, "ADRESSE", m.group(0), PLACEHOLDERS["ADRESSE"])) + return PLACEHOLDERS["ADRESSE"] + line = RE_BP.sub(_repl_bp, line) + # CODE_POSTAL def _repl_code_postal(m: re.Match) -> str: audit.append(PiiHit(page_idx, "CODE_POSTAL", m.group(0), PLACEHOLDERS["CODE_POSTAL"])) @@ -433,34 +646,229 @@ def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict return PLACEHOLDERS["DOSSIER"] line = RE_NUMERO_DOSSIER.sub(_repl_dossier, line) + # N° EPISODE + def _repl_episode(m: re.Match) -> str: + audit.append(PiiHit(page_idx, "EPISODE", m.group(0), PLACEHOLDERS["EPISODE"])) + return PLACEHOLDERS["EPISODE"] + line = RE_EPISODE.sub(_repl_episode, line) + + # Établissements de santé (EHPAD Bayonne, SSR La Concha, Hôpital de Bayonne, etc.) + def _repl_etab(m: re.Match) -> str: + audit.append(PiiHit(page_idx, "ETAB", m.group(0), PLACEHOLDERS["ETAB"])) + return PLACEHOLDERS["ETAB"] + line = RE_ETABLISSEMENT.sub(_repl_etab, line) + line = RE_HOPITAL_VILLE.sub(_repl_etab, line) + + # Champs structurés : Lieu de naissance, Ville de résidence (masquage direct, sans filtre stop words) + _re_lieu = re.compile(r"(Lieu\s+de\s+naissance\s*:\s*)([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+)") + def _repl_lieu(m: re.Match) -> str: + audit.append(PiiHit(page_idx, "VILLE", m.group(2).strip(), PLACEHOLDERS["VILLE"])) + return m.group(1) + PLACEHOLDERS["VILLE"] + line = _re_lieu.sub(_repl_lieu, line) + + _re_ville_res = re.compile(r"(Ville\s+de\s+r[ée]sidence\s*:\s*)([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' ]+)") + def _repl_ville_res(m: re.Match) -> str: + audit.append(PiiHit(page_idx, "VILLE", m.group(2).strip(), PLACEHOLDERS["VILLE"])) + return m.group(1) + PLACEHOLDERS["VILLE"] + line = _re_ville_res.sub(_repl_ville_res, line) + # PERSON uppercase avec contexte, whitelist/acronymes courts wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or []) wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or []) + _stop_rx = re.compile(_MEDICAL_STOP_WORDS, re.IGNORECASE) + + def _clean_name_span(span: str) -> str: + """Tronque le span au premier mot médical/stop word.""" + tokens = span.split() + clean = [] + for t in tokens: + if _stop_rx.fullmatch(t): + break + clean.append(t) + return " ".join(clean).strip(" .-'") + def _repl_person_ctx(m: re.Match) -> str: span = m.group(1).strip(); raw = m.group(0) if span in wl_sections or raw in wl_phrases: return raw - tokens = [t for t in span.split() if t] + # Tronquer avant les mots médicaux + cleaned = _clean_name_span(span) + if not cleaned: + return raw + tokens = [t for t in cleaned.split() if t] if len(tokens) == 1 and len(tokens[0]) <= 3: return raw - audit.append(PiiHit(page_idx, "NOM", span, PLACEHOLDERS["NOM"])) - return raw.replace(span, PLACEHOLDERS["NOM"]) # conserve le préfixe Dr/Mme + audit.append(PiiHit(page_idx, "NOM", cleaned, PLACEHOLDERS["NOM"])) + return raw.replace(cleaned, PLACEHOLDERS["NOM"]) line = RE_PERSON_CONTEXT.sub(_repl_person_ctx, line) + + # Passe supplémentaire : noms dans des listes virgulées après "Dr" + # ex: "le Dr DUVAL, MACHELART, LAZARO" → masquer chaque nom + for m in RE_DR_COMMA_LIST.finditer(line): + fragment = m.group(0) + # Extraire les segments séparés par des virgules (sauf le premier qui inclut "Dr") + parts = [p.strip() for p in fragment.split(",")] + for part in parts: + # Extraire les tokens nom de chaque segment + for tok in _NAME_TOKEN_RE.findall(part): + if tok in wl_sections or len(tok) <= 2: + continue + if _stop_rx.fullmatch(tok): + continue + if tok not in line: + continue + # Vérifier qu'il n'est pas déjà masqué + if f"[{tok}]" in line or tok in {v for v in PLACEHOLDERS.values()}: + continue + audit.append(PiiHit(page_idx, "NOM", tok, PLACEHOLDERS["NOM"])) + line = re.sub(rf"\b{re.escape(tok)}\b", PLACEHOLDERS["NOM"], line) + return line +def _mask_critical_in_key(key: str, audit: List[PiiHit], page_idx: int) -> str: + """Masque les TEL et EMAIL même dans la partie 'clé' d'une ligne clé:valeur.""" + def _repl_tel(m: re.Match) -> str: + audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"])) + return PLACEHOLDERS["TEL"] + key = RE_TEL.sub(_repl_tel, key) + def _repl_email(m: re.Match) -> str: + audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"])) + return PLACEHOLDERS["EMAIL"] + key = RE_EMAIL.sub(_repl_email, key) + return key + + def _kv_value_only_mask(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str: line = _mask_admin_label(line, audit, page_idx) parts = SPLITTER.split(line, maxsplit=1) if len(parts) == 2: key, value = parts + masked_key = _mask_critical_in_key(key, audit, page_idx) masked_val = _mask_line_by_regex(value, audit, page_idx, cfg) - return f"{key.strip()} : {masked_val.strip()}" + return f"{masked_key.strip()} : {masked_val.strip()}" else: return _mask_line_by_regex(line, audit, page_idx, cfg) # ----------------- Extraction globale de noms ----------------- +def _is_trackare_document(text: str) -> bool: + """Détecte si le document est un export Trackare/TrakCare (DPI structuré).""" + markers = ["Détails des patients", "Nom de naissance", "Dossier Patient"] + t = text[:3000].lower() + return sum(1 for m in markers if m.lower() in t) >= 2 + + +def _extract_trackare_identity(full_text: str) -> Tuple[set, List[PiiHit]]: + """Parse les champs structurés d'un document Trackare pour extraire les PII. + Retourne (name_tokens, pii_hits) avec les noms à masquer et les hits additionnels.""" + names: set = set() + hits: List[PiiHit] = [] + + def _add_name(s: str): + for tok in s.split(): + tok = tok.strip(" .-'(),") + if len(tok) >= 2 and tok[0].isupper(): + names.add(tok) + + # --- Identité patient --- + # Nom de naissance: DIEGO + m = re.search(r"Nom\s+de\s+naissance\s*:\s*(.+?)(?:\s+IPP\b|\s*$)", full_text, re.MULTILINE) + if m: + _add_name(m.group(1).strip()) + + # Nom et Prénom: DIEGO PATRICIA + m = re.search(r"Nom\s+et\s+Pr[ée]nom\s*:\s*(.+?)(?:\s+Date\s+de\s+naissance|\s*$)", full_text, re.MULTILINE) + if m: + _add_name(m.group(1).strip()) + + # Lieu de naissance: BAYONNE → masquer comme VILLE + m = re.search(r"Lieu\s+de\s+naissance\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû\s\-']+?)(?:\s*$)", full_text, re.MULTILINE) + if m: + val = m.group(1).strip() + hits.append(PiiHit(-1, "VILLE", val, PLACEHOLDERS["VILLE"])) + names.add(val) + + # Ville de résidence: TARNOS → masquer comme VILLE + m = re.search(r"Ville\s+de\s+r[ée]sidence\s*:\s*([A-ZÉÈÀÙÂÊÎÔÛ][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû\s\-']+?)(?:\s*$)", full_text, re.MULTILINE) + if m: + val = m.group(1).strip() + hits.append(PiiHit(-1, "VILLE", val, PLACEHOLDERS["VILLE"])) + names.add(val) + + # Code Postal (seul sur la ligne "Nationalité: FRANCE Code Postal: 40220") + m = re.search(r"[Cc]ode\s*[Pp]ostal\s*:\s*(\d{5})", full_text) + if m: + hits.append(PiiHit(-1, "CODE_POSTAL", m.group(1), PLACEHOLDERS["CODE_POSTAL"])) + + # Adresse patient + m = re.search(r"Adresse\s*:\s*(.+?)(?:\s+Ville\s+de\s+r[ée]sidence|\s*$)", full_text, re.MULTILINE) + if m: + val = m.group(1).strip() + if len(val) > 3: + hits.append(PiiHit(-1, "ADRESSE", val, PLACEHOLDERS["ADRESSE"])) + + # --- Pied de page : "Patient : NOM PRENOM - Date de naissance..." --- + for m in re.finditer(r"Patient\s*:\s*(.+?)\s*-\s*Date\s+de\s+naissance", full_text): + _add_name(m.group(1).strip()) + + # --- Médecin courant --- + m = re.search(r"Médecin\s+courant\s*:\s*(?:DR\.?\s*)?(.+?)(?:\s*$)", full_text, re.MULTILINE) + if m: + _add_name(m.group(1).strip()) + + # --- Médecin traitant (ligne après "Nom Adresse Téléphone") --- + m = re.search(r"Médecin\s+traitant\s*\n.*?Nom\s+Adresse\s+Téléphone\s*\n\s*(?:DR\.?\s*)?(.+?)(?:\d{5}|\s*$)", full_text, re.MULTILINE) + if m: + _add_name(m.group(1).strip()) + + # --- Contacts structurés --- + # Pattern: Relation NOM PRENOM [ADRESSE] [TEL] + for m in re.finditer( + r"(?:Conjoint|Concubin|Epoux|Epouse|Parent|Père|Mère|Fils|Fille|Frère|Soeur|Tuteur)\s+" + r"([A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûä\-']+)" + r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôû][A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûä\-']+))?", + full_text, + ): + _add_name(m.group(1)) + if m.group(2): + _add_name(m.group(2)) + + # --- Médecins urgences (IAO, prise en charge, décision) --- + for m in re.finditer(r"IAO\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+)", full_text): + _add_name(m.group(1)) + for m in re.finditer( + r"Médecin\s+de\s+la\s+(?:prise\s+en\s+charge|décision)\s+médicale\s+" + r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+)" + r"(?:\s+([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-]+))?", + full_text, + ): + _add_name(m.group(1)) + if m.group(2): + _add_name(m.group(2)) + + # --- Noms soignants dans les Notes d'évolution --- + # Pattern: "Note d'évolution PRENOM NOM" ou "NOM HH:MM texte..." + for m in re.finditer(r"Note\s+d'[ée]volution\s+([A-ZÉÈÀÙÂÊÎÔÛ][a-zéèàùâêîôû]+)\s+([A-ZÉÈÀÙÂÊÎÔÛ]{2,})", full_text): + _add_name(m.group(1)) + _add_name(m.group(2)) + + # Filtrer les tokens trop courts ou stop words (sauf noms de villes extraits explicitement) + city_tokens = {h.original for h in hits if h.kind == "VILLE"} + filtered = set() + for tok in names: + if tok in city_tokens: + filtered.add(tok) + continue + if len(tok) < 3: + continue + if tok.lower() in _MEDICAL_STOP_WORDS_SET: + continue + filtered.add(tok) + + return filtered, hits + + def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set: """Pré-scan du document brut pour extraire les noms de personnes depuis les champs structurés (Patient, Rédigé par, etc.). @@ -472,7 +880,32 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set: def _add_tokens(match_str: str): for token in match_str.split(): token = token.strip(" .-'") - if len(token) >= 3 and token.upper() not in wl_sections and token not in wl_phrases: + if len(token) < 3: + continue + if token.upper() in wl_sections or token in wl_phrases: + continue + if token.lower() in _MEDICAL_STOP_WORDS_SET: + continue + names.add(token) + + def _add_tokens_force_first(match_str): + """Comme _add_tokens mais force le 1er token (contexte Dr/Mme fort).""" + tokens = match_str.split() + for i, token in enumerate(tokens): + token = token.strip(" .-'") + if len(token) < 2: + continue + if i == 0: + # Premier token après Dr/Mme : toujours un nom, bypass stop words + if token.upper() not in wl_sections: + names.add(token) + else: + if len(token) < 3: + continue + if token.upper() in wl_sections or token in wl_phrases: + continue + if token.lower() in _MEDICAL_STOP_WORDS_SET: + continue names.add(token) for m in RE_EXTRACT_PATIENT.finditer(full_text): @@ -480,9 +913,54 @@ def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set: for m in RE_EXTRACT_REDIGE.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_MME_MR.finditer(full_text): - _add_tokens(m.group(1)) + _add_tokens_force_first(m.group(1)) for m in RE_EXTRACT_DR_DEST.finditer(full_text): + _add_tokens_force_first(m.group(1)) + # Champs d'identité structurés (trackare / DPI) + for m in RE_EXTRACT_NOM_NAISSANCE.finditer(full_text): _add_tokens(m.group(1)) + for m in RE_EXTRACT_NOM_PRENOM.finditer(full_text): + _add_tokens(m.group(1)) + for m in RE_EXTRACT_LIEU_NAISSANCE.finditer(full_text): + _add_tokens(m.group(1)) + for m in RE_EXTRACT_VILLE_RESIDENCE.finditer(full_text): + _add_tokens(m.group(1)) + # Contacts structurés (conjoint, concubin, etc.) + for m in RE_EXTRACT_CONTACT.finditer(full_text): + _add_tokens(m.group(1)) + if m.group(2): + _add_tokens(m.group(2)) + # Personnel médical avec rôle (Aide, Cadre Infirmier, etc.) + for m in RE_EXTRACT_STAFF_ROLE.finditer(full_text): + _add_tokens(m.group(1)) + + # Extraction des noms dans les listes virgulées après Dr/Docteur + # ex: "le Dr DUVAL, MACHELART, CHARLANNE, LAZARO, il a été proposé" + for m in RE_DR_COMMA_LIST.finditer(full_text): + fragment = m.group(0) + parts = [p.strip() for p in fragment.split(",")] + for part in parts: + for tok in _NAME_TOKEN_RE.findall(part): + tok = tok.strip(" .-'") + if len(tok) < 3: + continue + if tok.upper() in wl_sections or tok in wl_phrases: + continue + if tok.lower() in _MEDICAL_STOP_WORDS_SET: + continue + names.add(tok) + + # Retirer les sous-parties de noms composés avec tiret + # Si "JEAN-PIERRE" est dans names, retirer "JEAN" et "PIERRE" individuels + compound_names = {n for n in names if "-" in n} + parts_to_remove = set() + for compound in compound_names: + for part in compound.split("-"): + part = part.strip() + if len(part) >= 2 and part in names: + parts_to_remove.add(part) + names -= parts_to_remove + return names @@ -491,14 +969,25 @@ def _apply_extracted_names(text: str, names: set, audit: List[PiiHit]) -> str: placeholder = PLACEHOLDERS["NOM"] for token in sorted(names, key=len, reverse=True): pattern = re.compile(rf"\b{re.escape(token)}\b", re.IGNORECASE) + new_text = [] + last_end = 0 for m in pattern.finditer(text): # Ne pas remplacer si déjà dans un placeholder ctx_start = max(0, m.start() - 1) ctx_end = min(len(text), m.end() + 1) if "[" in text[ctx_start:m.start()] or "]" in text[m.end():ctx_end]: continue + # Ne pas remplacer si le token fait partie d'un mot composé (tiret) + if m.start() > 0 and text[m.start() - 1] == "-": + continue + if m.end() < len(text) and text[m.end()] == "-": + continue audit.append(PiiHit(-1, "NOM_EXTRACTED", m.group(0), placeholder)) - text = pattern.sub(placeholder, text) + new_text.append(text[last_end:m.start()]) + new_text.append(placeholder) + last_end = m.end() + new_text.append(text[last_end:]) + text = "".join(new_text) return text @@ -513,6 +1002,12 @@ def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str] ) extracted_names = _extract_document_names(full_raw, cfg) + # Phase 0b : si document Trackare, extraction renforcée des PII structurés + if _is_trackare_document(full_raw): + trackare_names, trackare_hits = _extract_trackare_identity(full_raw) + extracted_names.update(trackare_names) + audit.extend(trackare_hits) + # Phase 1 : masquage ligne par ligne (regex classiques) out_pages: List[str] = [] for i, page_txt in enumerate(pages_text): @@ -696,9 +1191,17 @@ def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str: protected = RE_NIR.sub(_rescan_nir, protected) # Nouvelles regex : dates de naissance, dates, adresses, codes postaux protected = RE_DATE_NAISSANCE.sub(PLACEHOLDERS["DATE_NAISSANCE"], protected) - protected = RE_DATE.sub(PLACEHOLDERS["DATE"], protected) + # protected = RE_DATE.sub(PLACEHOLDERS["DATE"], protected) # désactivé protected = RE_ADRESSE.sub(PLACEHOLDERS["ADRESSE"], protected) + protected = RE_BP.sub(PLACEHOLDERS["ADRESSE"], protected) protected = RE_CODE_POSTAL.sub(PLACEHOLDERS["CODE_POSTAL"], protected) + # N° Episode + protected = RE_EPISODE.sub(PLACEHOLDERS["EPISODE"], protected) + # N° RPPS + protected = RE_RPPS.sub(PLACEHOLDERS["RPPS"], protected) + # Établissements + protected = RE_ETABLISSEMENT.sub(PLACEHOLDERS["ETAB"], protected) + protected = RE_HOPITAL_VILLE.sub(PLACEHOLDERS["ETAB"], protected) # Personnes contextuelles (avec whitelist) wl_sections = set() wl_phrases = set() @@ -743,6 +1246,15 @@ def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> compact = re.sub(r"\s+", "", token) if compact != token: rects = page.search_for(compact) + # Fallback : chercher chaque mot individuellement (uniquement pour les NOM) + if not rects and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}: + for word in token.split(): + word = word.strip(" .-'") + if len(word) < 3 or word.lower() in _MEDICAL_STOP_WORDS_SET: + continue + if not word[0].isupper(): + continue + rects.extend(page.search_for(word)) for r in rects: page.add_redact_annot(r, fill=(0,0,0)) try: @@ -753,7 +1265,7 @@ def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> doc.close() -def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 300) -> None: +def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 300, ogc_label: Optional[str] = None) -> None: if fitz is None: raise RuntimeError("PyMuPDF non disponible – installez pymupdf.") doc = fitz.open(str(original_pdf)); out = fitz.open() @@ -769,6 +1281,19 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp if not found and h.kind in {"NIR", "IBAN", "TEL"}: compact = re.sub(r"\s+", "", token) found = page.search_for(compact) + # Fallback : si la chaîne complète n'est pas trouvée, + # chercher chaque mot individuellement (uniquement pour les NOM) + if not found and " " in token and h.kind in {"NOM", "NOM_EXTRACTED", "NER_PER", "EDS_NOM"}: + for word in token.split(): + word = word.strip(" .-'") + if len(word) < 3: + continue + if word.lower() in _MEDICAL_STOP_WORDS_SET: + continue + # Ne garder que les mots qui ressemblent à des noms propres + if not word[0].isupper(): + continue + found.extend(page.search_for(word)) rects.extend(found) all_rects[pno] = rects for pno in range(len(doc)): @@ -779,6 +1304,23 @@ def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dp draw = ImageDraw.Draw(img) for r in all_rects.get(pno, []): draw.rectangle([r.x0 * zoom, r.y0 * zoom, r.x1 * zoom, r.y1 * zoom], fill=(0, 0, 0)) + # Incrustation OGC en haut à droite + if ogc_label: + from PIL import ImageFont + font_size = int(14 * zoom) + try: + font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", font_size) + except Exception: + font = ImageFont.load_default() + text = f"OGC: {ogc_label}" + bbox = draw.textbbox((0, 0), text, font=font) + tw, th = bbox[2] - bbox[0], bbox[3] - bbox[1] + margin = int(10 * zoom) + x = img.width - tw - margin + y = margin + # Fond blanc + texte noir + draw.rectangle([x - 4, y - 2, x + tw + 4, y + th + 2], fill=(255, 255, 255)) + draw.text((x, y), text, fill=(0, 0, 0), font=font) buf = io.BytesIO(); img.save(buf, format="PNG"); buf.seek(0) dst = out.new_page(width=rect.width, height=rect.height) dst.insert_image(rect, stream=buf.getvalue()) @@ -796,6 +1338,7 @@ def process_pdf( use_hf: bool = False, ner_manager=None, ner_thresholds=None, + ogc_label: Optional[str] = None, ) -> Dict[str, str]: out_dir.mkdir(parents=True, exist_ok=True) cfg = load_dictionaries(config_path) @@ -818,6 +1361,92 @@ def process_pdf( # 3) Rescan selectif final_text = selective_rescan(final_text, cfg=cfg) + # 3b) Nettoyage post-masquage : codes postaux orphelins (5 chiffres collés à un placeholder) + # et téléphones fragmentés sur plusieurs lignes + _re_cp_orphan = re.compile(r"(\[(?:ADRESSE|NOM|VILLE)\])\s*(\d{5})\b") + def _clean_cp_orphan(m): + anon.audit.append(PiiHit(-1, "CODE_POSTAL", m.group(2), PLACEHOLDERS["CODE_POSTAL"])) + return m.group(1) + PLACEHOLDERS["CODE_POSTAL"] + final_text = _re_cp_orphan.sub(_clean_cp_orphan, final_text) + + # Téléphones fragmentés : "0X XX XX XX\nXX" coupé en fin de ligne (ligne suivante immédiate) + _re_tel_frag = re.compile(r"((?:\+33\s?|0)\d(?:[ .-]?\d){6,7})\s*\n\s*(\d{2}(?!\d))") + def _clean_tel_frag(m): + full = m.group(1).replace(" ", "").replace(".", "").replace("-", "") + m.group(2) + if len(full.replace("+33", "0")) == 10: + anon.audit.append(PiiHit(-1, "TEL", m.group(0).strip(), PLACEHOLDERS["TEL"])) + return PLACEHOLDERS["TEL"] + "\n" + return m.group(0) + final_text = _re_tel_frag.sub(_clean_tel_frag, final_text) + + # Téléphones incomplets en fin de ligne (8 ou 9 chiffres au format 0X XX XX XX) : masquer la partie visible + _re_tel_partial = re.compile(r"(?= 2 and part in _global_name_tokens: + _parts_to_drop.add(part) + _global_name_tokens -= _parts_to_drop + + for token in _global_name_tokens: + anon.audit.append(PiiHit(page=-1, kind="NOM_GLOBAL", original=token, placeholder=PLACEHOLDERS["NOM"])) + + # 4b) TEL, EMAIL, ADRESSE, CODE_POSTAL : propager les valeurs uniques sur toutes les pages + _global_pii: Dict[str, set] = {} + for h in anon.audit: + if h.kind in {"TEL", "EMAIL", "ADRESSE", "CODE_POSTAL", "EPISODE", "RPPS", "VILLE", "ETAB"}: + _global_pii.setdefault(h.kind, set()).add(h.original.strip()) + for kind, values in _global_pii.items(): + placeholder = PLACEHOLDERS.get(kind, PLACEHOLDERS["MASK"]) + for val in values: + anon.audit.append(PiiHit(page=-1, kind=f"{kind}_GLOBAL", original=val, placeholder=placeholder)) + # Log OCR dans l'audit if ocr_used: anon.audit.insert(0, PiiHit(page=-1, kind="OCR_USED", original="docTR", placeholder="")) @@ -842,7 +1471,7 @@ def process_pdf( pass if also_make_raster_burn and fitz is not None: ras_path = out_dir / f"{base}.redacted_raster.pdf" - redact_pdf_raster(pdf_path, anon.audit, ras_path) + redact_pdf_raster(pdf_path, anon.audit, ras_path, ogc_label=ogc_label) outputs["pdf_raster"] = str(ras_path) return outputs