#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Core d'anonymisation (v2.1) + NER ONNX (optionnel, narratif uniquement) ------------------------------------------------------------------------ - Extraction 2 passes (pdfplumber -> pdfminer) + fallback 3e passe PyMuPDF si texte pauvre ou (cid:xx) - Règles regex (PII critiques) + clé:valeur (masquer valeur seulement) + overrides YAML - Rescan sécurité **sélectif** (EMAIL/TEL/IBAN/NIR), jamais dans [TABLES] - Redaction PDF (vector/raster) via PyMuPDF - NER ONNX **optionnel** (CamemBERT family) appliqué **après** les règles, sur le narratif Dépendances : pdfplumber, pdfminer.six, pillow, pymupdf, pyyaml (optionnel), transformers, optimum, onnxruntime """ from __future__ import annotations import io import json import re from dataclasses import dataclass, field from pathlib import Path from typing import List, Dict, Tuple, Optional, Any import pdfplumber from pdfminer.high_level import extract_text as pdfminer_extract_text from pdfminer.layout import LAParams from PIL import Image, ImageDraw try: import fitz # PyMuPDF except Exception: fitz = None try: import yaml # PyYAML for dictionaries except Exception: yaml = None try: from doctr.models import ocr_predictor as _doctr_ocr_predictor _DOCTR_AVAILABLE = True except Exception: _doctr_ocr_predictor = None # type: ignore _DOCTR_AVAILABLE = False # NER manager (facultatif) try: from ner_manager_onnx import NerModelManager, NerThresholds except Exception: NerModelManager = None # type: ignore NerThresholds = None # type: ignore # EDS-Pseudo manager (facultatif) try: from eds_pseudo_manager import EdsPseudoManager except Exception: EdsPseudoManager = None # type: ignore # ----------------- Defaults & Config ----------------- DEFAULTS_CFG = { "version": 1, "encoding": "utf-8", "normalization": "NFKC", "whitelist": { "sections_titres": ["DIM", "GHM", "GHS", "RUM", "COMPTE", "RENDU", "DIAGNOSTIC"], "noms_maj_excepts": ["Médecin DIM", "Praticien conseil"], "org_gpe_keep": True, }, "blacklist": { "force_mask_terms": [], "force_mask_regex": [], }, "kv_labels_preserve": ["FINESS", "IPP", "N° OGC", "Etablissement"], "regex_overrides": [ { "name": "OGC_court", "pattern": r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,3})\b", "placeholder": "[OGC]", "flags": ["IGNORECASE"], } ], "flags": { "case_insensitive": True, "unicode_word_boundaries": True, "regex_engine": "python", }, } PLACEHOLDERS = { "EMAIL": "[EMAIL]", "TEL": "[TEL]", "IBAN": "[IBAN]", "NIR": "[NIR]", "IPP": "[IPP]", "FINESS": "[FINESS]", "OGC": "[OGC]", "NOM": "[NOM]", "VILLE": "[VILLE]", "ETAB": "[ETABLISSEMENT]", "MASK": "[MASK]", "DATE": "[DATE]", "DATE_NAISSANCE": "[DATE_NAISSANCE]", "ADRESSE": "[ADRESSE]", "CODE_POSTAL": "[CODE_POSTAL]", "AGE": "[AGE]", "DOSSIER": "[DOSSIER]", "NDA": "[NDA]", } CRITICAL_PII_KEYS = {"EMAIL", "TEL", "IBAN", "NIR", "IPP", "DATE_NAISSANCE"} # Baseline regex RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}") RE_TEL = re.compile(r"(? bool: """Vérifie la clé modulo 97 d'un NIR (13 chiffres + 2 clé). Supporte la Corse (2A/2B).""" digits_only = re.sub(r"\s+", "", nir_raw) if len(digits_only) < 15: return False body_str = digits_only[:13] key_str = digits_only[13:15] # Corse : 2A → 19, 2B → 18 (pour le calcul) body_str_calc = body_str.upper().replace("2A", "19").replace("2B", "18") try: body_int = int(body_str_calc) key_int = int(key_str) except ValueError: return False return key_int == (97 - (body_int % 97)) RE_PERSON_CONTEXT = re.compile( r"(?:(?:Dr\.?|DR\.?|Docteur|Mme|MME|Madame|M\.|Mr\.?|Monsieur" r"|Nom\s*:\s*|Praticien|Médecin" r"|Rédigé\s+par|Validé\s+par|Signé\s+par|Saisi\s+par" r")\s+)" r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' .]+(?:\s+[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\'.]+)*)" ) SPLITTER = re.compile(r"\s*[:|;\t]\s*") # --- Extraction globale de noms depuis champs structurés --- _UC_NAME_TOKEN = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+" RE_EXTRACT_PATIENT = re.compile( r"Patient\(?e?\)?\s*:\s*" rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)" r"(?=\s+Né|\s+né|\s+N°|\s*$)", re.MULTILINE, ) RE_EXTRACT_REDIGE = re.compile( r"(?:Rédigé|Validé|Signé|Saisi)\s+par\s+" rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)", ) RE_EXTRACT_MME_MR = re.compile( r"(?:MME|Madame|Monsieur|Mr\.?)\s+" r"((?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})(?:\s+[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})*)", ) RE_EXTRACT_DR_DEST = re.compile( r"(?:DR\.?|Docteur)\s+" rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)", ) CID_PATTERN = re.compile(r"\(cid:\d+\)") # --- Nouvelles regex : dates, adresses, âges, dossiers --- _MOIS_FR = r"(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)" RE_DATE_NAISSANCE = re.compile( r"(?:n[ée]+\s+le|date\s+de\s+naissance|DDN)\s*[:\-]?\s*" r"(\d{1,2}[\s/.\-]\d{1,2}[\s/.\-]\d{2,4}|\d{1,2}\s+" + _MOIS_FR + r"\s+\d{4})", re.IGNORECASE, ) RE_DATE = re.compile( r"\b(\d{1,2})\s*[/.\-]\s*(\d{1,2})\s*[/.\-]\s*(\d{4})\b" r"|" r"\b(\d{1,2})\s+" + _MOIS_FR + r"\s+(\d{4})\b", re.IGNORECASE, ) RE_ADRESSE = re.compile( r"\b\d{1,4}[\s,]*(?:bis|ter)?\s*,?\s*" r"(?:rue|avenue|av\.|boulevard|bd|place|chemin|allée|impasse|route|cours|passage|square)" r"\s+[A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\s\-']{2,}", re.IGNORECASE, ) RE_CODE_POSTAL = re.compile( r"(?:(?:code\s*postal|CP)\s*[:\-]?\s*(\d{5}))" r"|" r"(?:(\d{5})[ \t]+[A-ZÉÈÀÙ][a-zéèàùâêîôû]+(?:[\s\-][A-ZÉÈÀÙ][a-zéèàùâêîôû]+)*)", re.IGNORECASE, ) RE_AGE = re.compile( r"(?:âg[ée]+\s+de\s+|patient(?:e)?\s+de\s+)?(\d{1,3})\s*ans\b", re.IGNORECASE, ) RE_NUMERO_DOSSIER = re.compile( r"(?:dossier|n°\s*dossier|NDA)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})" r"|" r"(?:référence|réf\.)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})", re.IGNORECASE, ) @dataclass class PiiHit: page: int kind: str original: str placeholder: str bbox_hint: Optional[Tuple[float, float, float, float]] = None @dataclass class AnonResult: text_out: str tables_block: str audit: List[PiiHit] = field(default_factory=list) # ----------------- Config loader ----------------- def load_dictionaries(config_path: Optional[Path]) -> Dict[str, Any]: cfg = DEFAULTS_CFG.copy() if config_path and config_path.exists() and yaml is not None: try: user = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} for k, v in user.items(): cfg[k] = v except Exception: pass return cfg # ----------------- Extraction ----------------- def extract_text_with_fallback_ocr(pdf_path: Path) -> Tuple[List[str], List[List[str]], bool]: """Extraction texte multi-passes avec fallback OCR (docTR). Retourne (pages_text, tables_lines, ocr_used). """ pages_text: List[str] = [] tables_lines: List[List[str]] = [] ocr_used = False with pdfplumber.open(pdf_path) as pdf: for p in pdf.pages: t = p.extract_text(x_tolerance=2.5, y_tolerance=4.0) or "" pages_text.append(t) rows: List[str] = [] try: tables = p.extract_tables() for tbl in tables or []: for row in tbl: clean = [c if c is not None else "" for c in row] rows.append("\t".join(clean).strip()) except Exception: pass tables_lines.append(rows) total_chars = sum(len(x or "") for x in pages_text) need_fallback = total_chars < 500 if not need_fallback: need_fallback = any(CID_PATTERN.search(x or "") for x in pages_text) if need_fallback: text_all = pdfminer_extract_text( str(pdf_path), laparams=LAParams(char_margin=2.0, word_margin=0.1, line_margin=0.8, boxes_flow=0.5), ) split = [x for x in text_all.split("\f") if x] if split: pages_text = split # 3e passe PyMuPDF si toujours pauvre/cid total_chars = sum(len(x or "") for x in pages_text) if (total_chars < 500 or any(CID_PATTERN.search(x or "") for x in pages_text)) and fitz is not None: try: doc = fitz.open(str(pdf_path)) pages_text = [doc[i].get_text("text") or "" for i in range(len(doc))] doc.close() except Exception: pass # 4e passe : OCR docTR si toujours très peu de texte (PDF scanné) total_chars = sum(len(x or "") for x in pages_text) if total_chars < 200 and _DOCTR_AVAILABLE and fitz is not None: try: model = _doctr_ocr_predictor(det_arch="db_resnet50", reco_arch="crnn_vgg16_bn", pretrained=True) doc = fitz.open(str(pdf_path)) ocr_pages: List[str] = [] for i in range(len(doc)): pix = doc[i].get_pixmap(dpi=300) img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) import numpy as np result = model([np.array(img)]) page_text = "" for block in result.pages[0].blocks: for line in block.lines: words = [w.value for w in line.words] page_text += " ".join(words) + "\n" ocr_pages.append(page_text) doc.close() if sum(len(p) for p in ocr_pages) > total_chars: pages_text = ocr_pages ocr_used = True except Exception: pass return pages_text, tables_lines, ocr_used # Alias pour compatibilité ascendante def extract_text_three_passes(pdf_path: Path): pages_text, tables_lines, _ = extract_text_with_fallback_ocr(pdf_path) return pages_text, tables_lines # ----------------- Helpers ----------------- def _compile_user_regex(pattern: str, flags_list: List[str]): flags = 0 for f in flags_list or []: u = f.upper() if u == "IGNORECASE": flags |= re.IGNORECASE if u == "MULTILINE": flags |= re.MULTILINE if u == "DOTALL": flags |= re.DOTALL return re.compile(pattern, flags) def _apply_overrides(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str: for ov in cfg.get("regex_overrides", []) or []: pattern = ov.get("pattern"); placeholder = ov.get("placeholder", PLACEHOLDERS["MASK"]) ; name = ov.get("name", "override") flags_list = ov.get("flags", []) try: rx = _compile_user_regex(pattern, flags_list) except Exception: continue def _rep(m: re.Match): audit.append(PiiHit(page_idx, name, m.group(0), placeholder)) return placeholder line = rx.sub(_rep, line) # force-mask literals for term in (cfg.get("blacklist", {}).get("force_mask_terms", []) or []): if not term: continue word_rx = re.compile(rf"\b{re.escape(term)}\b", re.IGNORECASE) if word_rx.search(line): audit.append(PiiHit(page_idx, "force_term", term, PLACEHOLDERS["MASK"])) line = word_rx.sub(PLACEHOLDERS["MASK"], line) # force-mask regex for pat in (cfg.get("blacklist", {}).get("force_mask_regex", []) or []): try: rx = re.compile(pat, re.IGNORECASE) except Exception: continue if rx.search(line): audit.append(PiiHit(page_idx, "force_regex", pat, PLACEHOLDERS["MASK"])) line = rx.sub(PLACEHOLDERS["MASK"], line) return line def _mask_admin_label(line: str, audit: List[PiiHit], page_idx: int) -> str: m = RE_FINESS.search(line) if m: val = m.group(1); audit.append(PiiHit(page_idx, "FINESS", val, PLACEHOLDERS["FINESS"])) return RE_FINESS.sub(lambda _: f"FINESS : {PLACEHOLDERS['FINESS']}", line) m = RE_OGC.search(line) if m: val = m.group(1); audit.append(PiiHit(page_idx, "OGC", val, PLACEHOLDERS["OGC"])) return RE_OGC.sub(lambda _: f"N° OGC : {PLACEHOLDERS['OGC']}", line) m = RE_IPP.search(line) if m: val = m.group(1); audit.append(PiiHit(page_idx, "IPP", val, PLACEHOLDERS["IPP"])) return RE_IPP.sub(lambda _: f"IPP : {PLACEHOLDERS['IPP']}", line) return line def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str: # user overrides & force-masks d'abord line = _apply_overrides(line, audit, page_idx, cfg) # EMAIL def _repl_email(m: re.Match) -> str: audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"])) return PLACEHOLDERS["EMAIL"] line = RE_EMAIL.sub(_repl_email, line) # TEL def _repl_tel(m: re.Match) -> str: audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"])) return PLACEHOLDERS["TEL"] line = RE_TEL.sub(_repl_tel, line) # IBAN def _repl_iban(m: re.Match) -> str: audit.append(PiiHit(page_idx, "IBAN", m.group(0), PLACEHOLDERS["IBAN"])) return PLACEHOLDERS["IBAN"] line = RE_IBAN.sub(_repl_iban, line) # NIR (avec validation clé modulo 97) def _repl_nir(m: re.Match) -> str: raw = m.group(0) if not validate_nir(raw): return raw # faux positif, on ne masque pas audit.append(PiiHit(page_idx, "NIR", raw, PLACEHOLDERS["NIR"])) return PLACEHOLDERS["NIR"] line = RE_NIR.sub(_repl_nir, line) # DATE_NAISSANCE (plus spécifique, avant DATE générique) def _repl_date_naissance(m: re.Match) -> str: audit.append(PiiHit(page_idx, "DATE_NAISSANCE", m.group(0), PLACEHOLDERS["DATE_NAISSANCE"])) return PLACEHOLDERS["DATE_NAISSANCE"] line = RE_DATE_NAISSANCE.sub(_repl_date_naissance, line) # DATE générique def _repl_date(m: re.Match) -> str: audit.append(PiiHit(page_idx, "DATE", m.group(0), PLACEHOLDERS["DATE"])) return PLACEHOLDERS["DATE"] line = RE_DATE.sub(_repl_date, line) # ADRESSE def _repl_adresse(m: re.Match) -> str: audit.append(PiiHit(page_idx, "ADRESSE", m.group(0), PLACEHOLDERS["ADRESSE"])) return PLACEHOLDERS["ADRESSE"] line = RE_ADRESSE.sub(_repl_adresse, line) # CODE_POSTAL def _repl_code_postal(m: re.Match) -> str: audit.append(PiiHit(page_idx, "CODE_POSTAL", m.group(0), PLACEHOLDERS["CODE_POSTAL"])) return PLACEHOLDERS["CODE_POSTAL"] line = RE_CODE_POSTAL.sub(_repl_code_postal, line) # AGE def _repl_age(m: re.Match) -> str: audit.append(PiiHit(page_idx, "AGE", m.group(0), PLACEHOLDERS["AGE"])) return PLACEHOLDERS["AGE"] line = RE_AGE.sub(_repl_age, line) # NUMERO DOSSIER / NDA def _repl_dossier(m: re.Match) -> str: audit.append(PiiHit(page_idx, "DOSSIER", m.group(0), PLACEHOLDERS["DOSSIER"])) return PLACEHOLDERS["DOSSIER"] line = RE_NUMERO_DOSSIER.sub(_repl_dossier, line) # PERSON uppercase avec contexte, whitelist/acronymes courts wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or []) wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or []) def _repl_person_ctx(m: re.Match) -> str: span = m.group(1).strip(); raw = m.group(0) if span in wl_sections or raw in wl_phrases: return raw tokens = [t for t in span.split() if t] if len(tokens) == 1 and len(tokens[0]) <= 3: return raw audit.append(PiiHit(page_idx, "NOM", span, PLACEHOLDERS["NOM"])) return raw.replace(span, PLACEHOLDERS["NOM"]) # conserve le préfixe Dr/Mme line = RE_PERSON_CONTEXT.sub(_repl_person_ctx, line) return line def _kv_value_only_mask(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str: line = _mask_admin_label(line, audit, page_idx) parts = SPLITTER.split(line, maxsplit=1) if len(parts) == 2: key, value = parts masked_val = _mask_line_by_regex(value, audit, page_idx, cfg) return f"{key.strip()} : {masked_val.strip()}" else: return _mask_line_by_regex(line, audit, page_idx, cfg) # ----------------- Extraction globale de noms ----------------- def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set: """Pré-scan du document brut pour extraire les noms de personnes depuis les champs structurés (Patient, Rédigé par, etc.). Retourne un ensemble de tokens (mots) à masquer globalement.""" wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or []) wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or []) names: set = set() def _add_tokens(match_str: str): for token in match_str.split(): token = token.strip(" .-'") if len(token) >= 3 and token.upper() not in wl_sections and token not in wl_phrases: names.add(token) for m in RE_EXTRACT_PATIENT.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_REDIGE.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_MME_MR.finditer(full_text): _add_tokens(m.group(1)) for m in RE_EXTRACT_DR_DEST.finditer(full_text): _add_tokens(m.group(1)) return names def _apply_extracted_names(text: str, names: set, audit: List[PiiHit]) -> str: """Remplace globalement chaque nom extrait dans le texte.""" placeholder = PLACEHOLDERS["NOM"] for token in sorted(names, key=len, reverse=True): pattern = re.compile(rf"\b{re.escape(token)}\b", re.IGNORECASE) for m in pattern.finditer(text): # Ne pas remplacer si déjà dans un placeholder ctx_start = max(0, m.start() - 1) ctx_end = min(len(text), m.end() + 1) if "[" in text[ctx_start:m.start()] or "]" in text[m.end():ctx_end]: continue audit.append(PiiHit(-1, "NOM_EXTRACTED", m.group(0), placeholder)) text = pattern.sub(placeholder, text) return text # ----------------- Anonymisation (regex) ----------------- def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]], cfg: Dict[str, Any]) -> AnonResult: audit: List[PiiHit] = [] # Phase 0 : extraction globale des noms depuis les champs structurés full_raw = "\n".join(pages_text) + "\n" + "\n".join( "\n".join(rows) for rows in tables_lines ) extracted_names = _extract_document_names(full_raw, cfg) # Phase 1 : masquage ligne par ligne (regex classiques) out_pages: List[str] = [] for i, page_txt in enumerate(pages_text): lines = [ln for ln in (page_txt or "").splitlines()] masked = [_kv_value_only_mask(ln, audit, i, cfg) for ln in lines] out_pages.append("\n".join(masked)) table_blocks: List[str] = [] for i, rows in enumerate(tables_lines): mbuf: List[str] = [] for r in rows: masked = _kv_value_only_mask(r, audit, i, cfg) mbuf.append(masked) if mbuf: table_blocks.append("\n".join(mbuf)) tables_block = "\n\n".join(table_blocks) text_out = "\f".join(out_pages) # séparateur de pages if tables_block.strip(): text_out += "\n\n[TABLES]\n" + tables_block + "\n[/TABLES]" # Phase 2 : application globale des noms extraits (rattrapage) if extracted_names: text_out = _apply_extracted_names(text_out, extracted_names, audit) return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit) # ----------------- NER ONNX sur narratif ----------------- def _mask_with_hf(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, Any], audit: List[PiiHit]) -> str: # remplace via regex sur les 'word' détectés (approche pragmatique) keep_org_gpe = bool((cfg.get("whitelist", {}) or {}).get("org_gpe_keep", True)) def repl_once(s: str, old: str, new: str) -> str: return re.sub(rf"\b{re.escape(old)}\b", new, s) out = text for e in ents: w = e.get("word") or ""; grp = (e.get("entity_group") or e.get("entity") or "").upper() if not w or "[" in w or "]" in w: # ignore placeholders continue if len(w) <= 2: # trop court continue if grp in {"PER", "PERSON"}: audit.append(PiiHit(-1, "NER_PER", w, PLACEHOLDERS["NOM"])) out = repl_once(out, w, PLACEHOLDERS["NOM"]) elif grp in {"ORG"}: if keep_org_gpe: continue audit.append(PiiHit(-1, "NER_ORG", w, PLACEHOLDERS["ETAB"])) out = repl_once(out, w, PLACEHOLDERS["ETAB"]) elif grp in {"LOC"}: if keep_org_gpe: continue audit.append(PiiHit(-1, "NER_LOC", w, PLACEHOLDERS["VILLE"])) out = repl_once(out, w, PLACEHOLDERS["VILLE"]) elif grp in {"DATE"}: # facultatif : si vous masquez déjà les dates via règles, laissez tel quel continue return out def apply_hf_ner_on_narrative(text_out: str, cfg: Dict[str, Any], manager: Optional[NerModelManager], thresholds: Optional[NerThresholds]) -> Tuple[str, List[PiiHit]]: if manager is None or not manager.is_loaded(): return text_out, [] # isoler [TABLES] pattern = re.compile(r"\[TABLES\](.*?)\[/TABLES\]", re.DOTALL) tables: List[Tuple[int,int,str]] = [] keep = [] last = 0 cleaned = "" for m in pattern.finditer(text_out): cleaned += text_out[last:m.start()] keep.append((len(cleaned), len(cleaned) + len(m.group(0)), m.group(0))) cleaned += "\x00" * len(m.group(0)) last = m.end() cleaned += text_out[last:] # par pages (séparées par \f) → par paragraphes pages = cleaned.split("\f") hits: List[PiiHit] = [] rebuilt_pages: List[str] = [] for pg in pages: paras = [p for p in re.split(r"\n\s*\n", pg) if p.strip()] ents_per_para = manager.infer_paragraphs(paras, thresholds=thresholds) # remplace entités idx = 0 buf = [] for para, ents in zip(paras, ents_per_para): masked = _mask_with_hf(para, ents, cfg, hits) buf.append(masked) rebuilt_pages.append("\n\n".join(buf)) rebuilt = "\f".join(rebuilt_pages) # réinsérer [TABLES] rebuilt_list = list(rebuilt) for start, end, payload in keep: rebuilt_list[start:end] = list(payload) final = "".join(rebuilt_list) return final, hits # ----------------- NER EDS-Pseudo sur narratif ----------------- def _mask_with_eds_pseudo(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, Any], audit: List[PiiHit]) -> str: """Masque les entités détectées par EDS-Pseudo en utilisant le mapping eds_mapped_key.""" def repl_once(s: str, old: str, new: str) -> str: return re.sub(rf"\b{re.escape(old)}\b", new, s) out = text for e in ents: w = e.get("word") or "" mapped_key = e.get("eds_mapped_key", "") if not w or "[" in w or "]" in w: continue if len(w) <= 2: continue placeholder = PLACEHOLDERS.get(mapped_key, PLACEHOLDERS["MASK"]) label = e.get("entity_group", "EDS") audit.append(PiiHit(-1, f"EDS_{label}", w, placeholder)) out = repl_once(out, w, placeholder) return out def apply_eds_pseudo_on_narrative(text_out: str, cfg: Dict[str, Any], manager: "EdsPseudoManager") -> Tuple[str, List[PiiHit]]: """Applique EDS-Pseudo sur le narratif (même structure que apply_hf_ner_on_narrative).""" if manager is None or not manager.is_loaded(): return text_out, [] # isoler [TABLES] pattern = re.compile(r"\[TABLES\](.*?)\[/TABLES\]", re.DOTALL) keep = [] last = 0 cleaned = "" for m in pattern.finditer(text_out): cleaned += text_out[last:m.start()] keep.append((len(cleaned), len(cleaned) + len(m.group(0)), m.group(0))) cleaned += "\x00" * len(m.group(0)) last = m.end() cleaned += text_out[last:] # par pages → par paragraphes pages = cleaned.split("\f") hits: List[PiiHit] = [] rebuilt_pages: List[str] = [] for pg in pages: paras = [p for p in re.split(r"\n\s*\n", pg) if p.strip()] ents_per_para = manager.infer_paragraphs(paras) buf = [] for para, ents in zip(paras, ents_per_para): masked = _mask_with_eds_pseudo(para, ents, cfg, hits) buf.append(masked) rebuilt_pages.append("\n\n".join(buf)) rebuilt = "\f".join(rebuilt_pages) # réinsérer [TABLES] rebuilt_list = list(rebuilt) for start, end, payload in keep: rebuilt_list[start:end] = list(payload) final = "".join(rebuilt_list) return final, hits # ----------------- Selective safety rescan ----------------- def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str: """Rescan de sécurité : re-détecte les PII critiques qui auraient échappé au premier passage.""" # enlève TABLES du scope def strip_tables(s: str): kept = [] out = [] i = 0 pattern = re.compile(r"\[TABLES\](.*?)\[/TABLES\]", re.DOTALL) for m in pattern.finditer(s): out.append(s[i:m.start()]) kept.append((len("".join(out)), len("".join(out)) + len(m.group(1)), m.group(1))) out.append("\x00" * (m.end() - m.start())) i = m.end() out.append(s[i:]) return "".join(out), kept protected, kept = strip_tables(text) # PII critiques (comme avant) protected = RE_EMAIL.sub(PLACEHOLDERS["EMAIL"], protected) protected = RE_TEL.sub(PLACEHOLDERS["TEL"], protected) protected = RE_IBAN.sub(PLACEHOLDERS["IBAN"], protected) # NIR avec validation def _rescan_nir(m: re.Match) -> str: return PLACEHOLDERS["NIR"] if validate_nir(m.group(0)) else m.group(0) protected = RE_NIR.sub(_rescan_nir, protected) # Nouvelles regex : dates de naissance, dates, adresses, codes postaux protected = RE_DATE_NAISSANCE.sub(PLACEHOLDERS["DATE_NAISSANCE"], protected) protected = RE_DATE.sub(PLACEHOLDERS["DATE"], protected) protected = RE_ADRESSE.sub(PLACEHOLDERS["ADRESSE"], protected) protected = RE_CODE_POSTAL.sub(PLACEHOLDERS["CODE_POSTAL"], protected) # Personnes contextuelles (avec whitelist) wl_sections = set() wl_phrases = set() if cfg: wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or []) wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or []) def _rescan_person(m: re.Match) -> str: span = m.group(1).strip(); raw = m.group(0) if span in wl_sections or raw in wl_phrases: return raw tokens = [t for t in span.split() if t] if len(tokens) == 1 and len(tokens[0]) <= 3: return raw return raw.replace(span, PLACEHOLDERS["NOM"]) protected = RE_PERSON_CONTEXT.sub(_rescan_person, protected) res = list(protected) for start, end, payload in kept: res[start:end] = list(payload) return "".join(res) # ----------------- PDF Redaction ----------------- def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> None: if fitz is None: raise RuntimeError("PyMuPDF non disponible – installez pymupdf.") doc = fitz.open(str(original_pdf)) # index hits par page; page==-1 → rechercher sur toutes pages by_page: Dict[int, List[PiiHit]] = {} for h in audit: by_page.setdefault(h.page, []).append(h) for pno in range(len(doc)): page = doc[pno] hits = by_page.get(pno, []) + by_page.get(-1, []) if not hits: continue for h in hits: token = h.original.strip() if not token: continue rects = page.search_for(token) if not rects and h.kind in {"NIR", "IBAN", "TEL"}: compact = re.sub(r"\s+", "", token) if compact != token: rects = page.search_for(compact) for r in rects: page.add_redact_annot(r, fill=(0,0,0)) try: page.apply_redactions() except Exception: pass doc.save(str(out_pdf), deflate=True, garbage=4, clean=True, incremental=False) doc.close() def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 300) -> None: if fitz is None: raise RuntimeError("PyMuPDF non disponible – installez pymupdf.") doc = fitz.open(str(original_pdf)); out = fitz.open() all_rects: Dict[int, List["fitz.Rect"]] = {} for pno in range(len(doc)): page = doc[pno] rects = [] hits = [x for x in audit if x.page in {pno, -1}] for h in hits: token = h.original.strip() if not token: continue found = page.search_for(token) if not found and h.kind in {"NIR", "IBAN", "TEL"}: compact = re.sub(r"\s+", "", token) found = page.search_for(compact) rects.extend(found) all_rects[pno] = rects for pno in range(len(doc)): src = doc[pno]; rect = src.rect zoom = dpi / 72.0; mat = fitz.Matrix(zoom, zoom) pix = src.get_pixmap(matrix=mat, annots=False) img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) draw = ImageDraw.Draw(img) for r in all_rects.get(pno, []): draw.rectangle([r.x0 * zoom, r.y0 * zoom, r.x1 * zoom, r.y1 * zoom], fill=(0, 0, 0)) buf = io.BytesIO(); img.save(buf, format="PNG"); buf.seek(0) dst = out.new_page(width=rect.width, height=rect.height) dst.insert_image(rect, stream=buf.getvalue()) out.save(str(out_pdf), deflate=True, garbage=4, clean=True) out.close(); doc.close() # ----------------- Orchestration ----------------- def process_pdf( pdf_path: Path, out_dir: Path, make_vector_redaction: bool = True, also_make_raster_burn: bool = False, config_path: Optional[Path] = None, use_hf: bool = False, ner_manager=None, ner_thresholds=None, ) -> Dict[str, str]: out_dir.mkdir(parents=True, exist_ok=True) cfg = load_dictionaries(config_path) pages_text, tables_lines, ocr_used = extract_text_with_fallback_ocr(pdf_path) # 1) Regex rules anon = anonymise_document_regex(pages_text, tables_lines, cfg) # 2) NER (optionnel) — sur le narratif final_text = anon.text_out hf_hits: List[PiiHit] = [] if use_hf and ner_manager is not None and ner_manager.is_loaded(): # Détecter le type de manager et appeler la bonne fonction if EdsPseudoManager is not None and isinstance(ner_manager, EdsPseudoManager): final_text, hf_hits = apply_eds_pseudo_on_narrative(final_text, cfg, ner_manager) else: final_text, hf_hits = apply_hf_ner_on_narrative(final_text, cfg, ner_manager, ner_thresholds) anon.audit.extend(hf_hits) # 3) Rescan selectif final_text = selective_rescan(final_text, cfg=cfg) # Log OCR dans l'audit if ocr_used: anon.audit.insert(0, PiiHit(page=-1, kind="OCR_USED", original="docTR", placeholder="")) # Sauvegardes base = pdf_path.stem txt_path = out_dir / f"{base}.pseudonymise.txt" audit_path = out_dir / f"{base}.audit.jsonl" txt_path.write_text(final_text, encoding="utf-8") with audit_path.open("w", encoding="utf-8") as f: for hit in anon.audit: f.write(json.dumps(hit.__dict__, ensure_ascii=False) + "\n") outputs = {"text": str(txt_path), "audit": str(audit_path)} # PDFs if make_vector_redaction and fitz is not None: vec_path = out_dir / f"{base}.redacted_vector.pdf" try: redact_pdf_vector(pdf_path, anon.audit, vec_path) outputs["pdf_vector"] = str(vec_path) except Exception: pass if also_make_raster_burn and fitz is not None: ras_path = out_dir / f"{base}.redacted_raster.pdf" redact_pdf_raster(pdf_path, anon.audit, ras_path) outputs["pdf_raster"] = str(ras_path) return outputs if __name__ == "__main__": import argparse ap = argparse.ArgumentParser(description="Anonymiser PDF (regex + NER ONNX optionnel)") ap.add_argument("pdf", type=str) ap.add_argument("--out", type=str, default="out") ap.add_argument("--no-vector", action="store_true") ap.add_argument("--raster", action="store_true") ap.add_argument("--config", type=str, default=str(Path("config/dictionnaires.yml"))) ap.add_argument("--hf", action="store_true", help="Activer NER ONNX sur narratif (nécessite ner_manager_onnx)") ap.add_argument("--model", type=str, default="cmarkea/distilcamembert-base-ner") args = ap.parse_args() manager = None if args.hf and NerModelManager is not None: manager = NerModelManager(cache_dir=Path("models")) manager.load(args.model) outs = process_pdf( Path(args.pdf), Path(args.out), make_vector_redaction=not args.no_vector, also_make_raster_burn=args.raster, config_path=Path(args.config), use_hf=bool(args.hf), ner_manager=manager, ner_thresholds=NerThresholds() if NerThresholds else None, ) print(json.dumps(outs, indent=2, ensure_ascii=False))