anonymisation/anonymizer_core_refactored_onnx.py
Domi31tls 8339069c83 Initial commit — Pseudonymisation de PDF v5
- GUI v5: single streamlined view (tkinter), 2 visual steps
- ONNX core: regex anonymisation + optional NER
- Global extraction of names from structured fields
  (Patient, Rédigé par, MME/Madame, DR)
- Simultaneous generation of Image PDF + Anonymised PDF (structure preserved)
- Windows build via Nuitka (batch script + GitHub Actions CI)
- install.sh for Linux setup/run

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 15:03:37 +01:00

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Core d'anonymisation (v2.1) + NER ONNX (optionnel, narratif uniquement)
------------------------------------------------------------------------
- Extraction 2 passes (pdfplumber -> pdfminer) + fallback 3e passe PyMuPDF si texte pauvre ou (cid:xx)
- Règles regex (PII critiques) + clé:valeur (masquer valeur seulement) + overrides YAML
- Rescan sécurité **sélectif** (EMAIL/TEL/IBAN/NIR), jamais dans [TABLES]
- Redaction PDF (vector/raster) via PyMuPDF
- NER ONNX **optionnel** (CamemBERT family) appliqué **après** les règles, sur le narratif
Dépendances : pdfplumber, pdfminer.six, pillow, pymupdf, pyyaml (optionnel), transformers, optimum, onnxruntime
"""
from __future__ import annotations
import io
import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Any
import pdfplumber
from pdfminer.high_level import extract_text as pdfminer_extract_text
from pdfminer.layout import LAParams
from PIL import Image, ImageDraw
try:
    import fitz  # PyMuPDF
except Exception:
    fitz = None
try:
    import yaml  # PyYAML for dictionaries
except Exception:
    yaml = None
try:
    from doctr.models import ocr_predictor as _doctr_ocr_predictor
    _DOCTR_AVAILABLE = True
except Exception:
    _doctr_ocr_predictor = None  # type: ignore
    _DOCTR_AVAILABLE = False
# NER manager (optional)
try:
    from ner_manager_onnx import NerModelManager, NerThresholds
except Exception:
    NerModelManager = None  # type: ignore
    NerThresholds = None  # type: ignore
# EDS-Pseudo manager (optional)
try:
    from eds_pseudo_manager import EdsPseudoManager
except Exception:
    EdsPseudoManager = None  # type: ignore
# ----------------- Defaults & Config -----------------
DEFAULTS_CFG = {
    "version": 1,
    "encoding": "utf-8",
    "normalization": "NFKC",
    "whitelist": {
        "sections_titres": ["DIM", "GHM", "GHS", "RUM", "COMPTE", "RENDU", "DIAGNOSTIC"],
        "noms_maj_excepts": ["Médecin DIM", "Praticien conseil"],
        "org_gpe_keep": True,
    },
    "blacklist": {
        "force_mask_terms": [],
        "force_mask_regex": [],
    },
    "kv_labels_preserve": ["FINESS", "IPP", "N° OGC", "Etablissement"],
    "regex_overrides": [
        {
            "name": "OGC_court",
            "pattern": r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,3})\b",
            "placeholder": "[OGC]",
            "flags": ["IGNORECASE"],
        }
    ],
    "flags": {
        "case_insensitive": True,
        "unicode_word_boundaries": True,
        "regex_engine": "python",
    },
}
PLACEHOLDERS = {
    "EMAIL": "[EMAIL]",
    "TEL": "[TEL]",
    "IBAN": "[IBAN]",
    "NIR": "[NIR]",
    "IPP": "[IPP]",
    "FINESS": "[FINESS]",
    "OGC": "[OGC]",
    "NOM": "[NOM]",
    "VILLE": "[VILLE]",
    "ETAB": "[ETABLISSEMENT]",
    "MASK": "[MASK]",
    "DATE": "[DATE]",
    "DATE_NAISSANCE": "[DATE_NAISSANCE]",
    "ADRESSE": "[ADRESSE]",
    "CODE_POSTAL": "[CODE_POSTAL]",
    "AGE": "[AGE]",
    "DOSSIER": "[DOSSIER]",
    "NDA": "[NDA]",
}
CRITICAL_PII_KEYS = {"EMAIL", "TEL", "IBAN", "NIR", "IPP", "DATE_NAISSANCE"}
# Baseline regex
RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
RE_TEL = re.compile(r"(?<!\d)(?:\+33\s?|0)\d(?:[ .-]?\d){8}(?!\d)")
RE_IBAN = re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{11,30}\b")
RE_IPP = re.compile(r"\bIPP\s*[:\-]?\s*([A-Za-z0-9]{6,})\b", re.IGNORECASE)
RE_FINESS = re.compile(r"\bFINESS\s*[:\-]?\s*(\d{9})\b", re.IGNORECASE)
RE_OGC = re.compile(r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,})\b", re.IGNORECASE)
RE_NIR = re.compile(
    r"\b([12])\s*(\d{2})\s*(0[1-9]|1[0-2]|2[AB])\s*(\d{2,3})\s*(\d{3})\s*(\d{3})\s*(\d{2})\b",
    re.IGNORECASE,
)
def validate_nir(nir_raw: str) -> bool:
    """Check the modulo-97 key of a NIR (13 digits + 2-digit key). Handles Corsica (2A/2B)."""
    digits_only = re.sub(r"\s+", "", nir_raw)
    if len(digits_only) < 15:
        return False
    body_str = digits_only[:13]
    key_str = digits_only[13:15]
    # Corsica: 2A -> 19, 2B -> 18 (for the computation)
    body_str_calc = body_str.upper().replace("2A", "19").replace("2B", "18")
    try:
        body_int = int(body_str_calc)
        key_int = int(key_str)
    except ValueError:
        return False
    return key_int == (97 - (body_int % 97))
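# Worked example (sketch): build a synthetic NIR whose key is valid by
# construction, to sanity-check validate_nir. The body digits are arbitrary
# test values, not a real person's number.
def _make_test_nir(body: str = "1850578006048") -> str:
    key = 97 - (int(body) % 97)
    return f"{body}{key:02d}"
# validate_nir(_make_test_nir())  # -> True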
RE_PERSON_CONTEXT = re.compile(
    r"(?:(?:Dr\.?|DR\.?|Docteur|Mme|MME|Madame|M\.|Mr\.?|Monsieur"
    r"|Nom\s*:\s*|Praticien|Médecin"
    r"|Rédigé\s+par|Validé\s+par|Signé\s+par|Saisi\s+par"
    r")\s+)"
    r"([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\' .]+(?:\s+[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\'.]+)*)"
)
SPLITTER = re.compile(r"\s*[:|;\t]\s*")
# --- Global extraction of names from structured fields ---
_UC_NAME_TOKEN = r"[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ][A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇa-zéèàùâêîôûäëïöüç\-\']+"
RE_EXTRACT_PATIENT = re.compile(
    r"Patient\(?e?\)?\s*:\s*"
    rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)"
    r"(?=\s+Né|\s+né|\s+N°|\s*$)",
    re.MULTILINE,
)
RE_EXTRACT_REDIGE = re.compile(
    r"(?:Rédigé|Validé|Signé|Saisi)\s+par\s+"
    rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)",
)
RE_EXTRACT_MME_MR = re.compile(
    r"(?:MME|Madame|Monsieur|Mr\.?)\s+"
    r"((?:[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})(?:\s+[A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ]{2,})*)",
)
RE_EXTRACT_DR_DEST = re.compile(
    r"(?:DR\.?|Docteur)\s+"
    rf"((?:{_UC_NAME_TOKEN})(?:\s+(?:{_UC_NAME_TOKEN}))*)",
)
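# Examples (sketch) of lines these extractors are designed to catch
# (hypothetical values, for illustration only):
#   "Patient : DUPONT Jean Né le ..."  -> RE_EXTRACT_PATIENT captures "DUPONT Jean"
#   "Rédigé par MARTIN Sophie"         -> RE_EXTRACT_REDIGE captures "MARTIN Sophie"
#   "MME BERNARD"                      -> RE_EXTRACT_MME_MR captures "BERNARD"
#   "DR PETIT"                         -> RE_EXTRACT_DR_DEST captures "PETIT"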
CID_PATTERN = re.compile(r"\(cid:\d+\)")
# --- Additional rules: dates, addresses, ages, record numbers ---
_MOIS_FR = r"(?:janvier|février|mars|avril|mai|juin|juillet|août|septembre|octobre|novembre|décembre)"
RE_DATE_NAISSANCE = re.compile(
    r"(?:n[ée]+\s+le|date\s+de\s+naissance|DDN)\s*[:\-]?\s*"
    r"(\d{1,2}[\s/.\-]\d{1,2}[\s/.\-]\d{2,4}|\d{1,2}\s+" + _MOIS_FR + r"\s+\d{4})",
    re.IGNORECASE,
)
RE_DATE = re.compile(
    r"\b(\d{1,2})\s*[/.\-]\s*(\d{1,2})\s*[/.\-]\s*(\d{4})\b"
    r"|"
    r"\b(\d{1,2})\s+" + _MOIS_FR + r"\s+(\d{4})\b",
    re.IGNORECASE,
)
RE_ADRESSE = re.compile(
    r"\b\d{1,4}[\s,]*(?:bis|ter)?\s*,?\s*"
    r"(?:rue|avenue|av\.|boulevard|bd|place|chemin|allée|impasse|route|cours|passage|square)"
    r"\s+[A-ZÉÈÀÙÂÊÎÔÛa-zéèàùâêîôûäëïöüç\s\-']{2,}",
    re.IGNORECASE,
)
RE_CODE_POSTAL = re.compile(
    r"(?:(?:code\s*postal|CP)\s*[:\-]?\s*(\d{5}))"
    r"|"
    r"(?:(\d{5})[ \t]+[A-ZÉÈÀÙ][a-zéèàùâêîôû]+(?:[\s\-][A-ZÉÈÀÙ][a-zéèàùâêîôû]+)*)",
    re.IGNORECASE,
)
RE_AGE = re.compile(
    r"(?:âg[ée]+\s+de\s+|patient(?:e)?\s+de\s+)?(\d{1,3})\s*ans\b",
    re.IGNORECASE,
)
RE_NUMERO_DOSSIER = re.compile(
    r"(?:dossier|n°\s*dossier|NDA)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})"
    r"|"
    r"(?:référence|réf\.)\s*[:\-n°]+\s*([A-Za-z0-9\-/]{4,})",
    re.IGNORECASE,
)
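# Examples (sketch) of strings the additional rules target (made-up values):
#   "née le 12/03/1958"      -> RE_DATE_NAISSANCE
#   "le 05/11/2024"          -> RE_DATE
#   "12 rue des Lilas"       -> RE_ADRESSE
#   "31000 Toulouse"         -> RE_CODE_POSTAL
#   "patiente de 67 ans"     -> RE_AGE
#   "dossier : 2024-0042"    -> RE_NUMERO_DOSSIER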
@dataclass
class PiiHit:
    page: int
    kind: str
    original: str
    placeholder: str
    bbox_hint: Optional[Tuple[float, float, float, float]] = None

@dataclass
class AnonResult:
    text_out: str
    tables_block: str
    audit: List[PiiHit] = field(default_factory=list)
# ----------------- Config loader -----------------
def load_dictionaries(config_path: Optional[Path]) -> Dict[str, Any]:
    cfg = DEFAULTS_CFG.copy()
    if config_path and config_path.exists() and yaml is not None:
        try:
            user = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
            for k, v in user.items():
                cfg[k] = v
        except Exception:
            pass
    return cfg
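# Example config/dictionnaires.yml (sketch; keys mirror DEFAULTS_CFG, values are
# illustrative). Note that a user-supplied top-level key replaces the default wholesale:
#
#   whitelist:
#     sections_titres: ["DIM", "GHM", "GHS"]
#     noms_maj_excepts: ["Médecin DIM"]
#     org_gpe_keep: true
#   blacklist:
#     force_mask_terms: ["DUPONT"]
#   regex_overrides:
#     - name: "matricule"
#       pattern: '\bMAT-\d{6}\b'
#       placeholder: "[MATRICULE]"
#       flags: ["IGNORECASE"]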
# ----------------- Extraction -----------------
def extract_text_with_fallback_ocr(pdf_path: Path) -> Tuple[List[str], List[List[str]], bool]:
    """Multi-pass text extraction with OCR fallback (docTR).
    Returns (pages_text, tables_lines, ocr_used).
    """
    pages_text: List[str] = []
    tables_lines: List[List[str]] = []
    ocr_used = False
    with pdfplumber.open(pdf_path) as pdf:
        for p in pdf.pages:
            t = p.extract_text(x_tolerance=2.5, y_tolerance=4.0) or ""
            pages_text.append(t)
            rows: List[str] = []
            try:
                tables = p.extract_tables()
                for tbl in tables or []:
                    for row in tbl:
                        clean = [c if c is not None else "" for c in row]
                        rows.append("\t".join(clean).strip())
            except Exception:
                pass
            tables_lines.append(rows)
    total_chars = sum(len(x or "") for x in pages_text)
    need_fallback = total_chars < 500
    if not need_fallback:
        need_fallback = any(CID_PATTERN.search(x or "") for x in pages_text)
    if need_fallback:
        text_all = pdfminer_extract_text(
            str(pdf_path),
            laparams=LAParams(char_margin=2.0, word_margin=0.1, line_margin=0.8, boxes_flow=0.5),
        )
        split = [x for x in text_all.split("\f") if x]
        if split:
            pages_text = split
    # Third pass: PyMuPDF if the text is still sparse or contains (cid:xx)
    total_chars = sum(len(x or "") for x in pages_text)
    if (total_chars < 500 or any(CID_PATTERN.search(x or "") for x in pages_text)) and fitz is not None:
        try:
            doc = fitz.open(str(pdf_path))
            pages_text = [doc[i].get_text("text") or "" for i in range(len(doc))]
            doc.close()
        except Exception:
            pass
    # Fourth pass: docTR OCR if there is still almost no text (scanned PDF)
    total_chars = sum(len(x or "") for x in pages_text)
    if total_chars < 200 and _DOCTR_AVAILABLE and fitz is not None:
        try:
            import numpy as np
            model = _doctr_ocr_predictor(det_arch="db_resnet50", reco_arch="crnn_vgg16_bn", pretrained=True)
            doc = fitz.open(str(pdf_path))
            ocr_pages: List[str] = []
            for i in range(len(doc)):
                pix = doc[i].get_pixmap(dpi=300)
                img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
                result = model([np.array(img)])
                page_text = ""
                for block in result.pages[0].blocks:
                    for line in block.lines:
                        words = [w.value for w in line.words]
                        page_text += " ".join(words) + "\n"
                ocr_pages.append(page_text)
            doc.close()
            if sum(len(p) for p in ocr_pages) > total_chars:
                pages_text = ocr_pages
                ocr_used = True
        except Exception:
            pass
    return pages_text, tables_lines, ocr_used
# Backward-compatibility alias
def extract_text_three_passes(pdf_path: Path):
    pages_text, tables_lines, _ = extract_text_with_fallback_ocr(pdf_path)
    return pages_text, tables_lines
# ----------------- Helpers -----------------
def _compile_user_regex(pattern: str, flags_list: List[str]):
    flags = 0
    for f in flags_list or []:
        u = f.upper()
        if u == "IGNORECASE":
            flags |= re.IGNORECASE
        if u == "MULTILINE":
            flags |= re.MULTILINE
        if u == "DOTALL":
            flags |= re.DOTALL
    return re.compile(pattern, flags)
def _apply_overrides(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
    for ov in cfg.get("regex_overrides", []) or []:
        pattern = ov.get("pattern")
        placeholder = ov.get("placeholder", PLACEHOLDERS["MASK"])
        name = ov.get("name", "override")
        flags_list = ov.get("flags", [])
        try:
            rx = _compile_user_regex(pattern, flags_list)
        except Exception:
            continue
        def _rep(m: re.Match):
            audit.append(PiiHit(page_idx, name, m.group(0), placeholder))
            return placeholder
        line = rx.sub(_rep, line)
    # Force-mask literal terms
    for term in (cfg.get("blacklist", {}).get("force_mask_terms", []) or []):
        if not term:
            continue
        word_rx = re.compile(rf"\b{re.escape(term)}\b", re.IGNORECASE)
        if word_rx.search(line):
            audit.append(PiiHit(page_idx, "force_term", term, PLACEHOLDERS["MASK"]))
            line = word_rx.sub(PLACEHOLDERS["MASK"], line)
    # Force-mask regex patterns
    for pat in (cfg.get("blacklist", {}).get("force_mask_regex", []) or []):
        try:
            rx = re.compile(pat, re.IGNORECASE)
        except Exception:
            continue
        if rx.search(line):
            audit.append(PiiHit(page_idx, "force_regex", pat, PLACEHOLDERS["MASK"]))
            line = rx.sub(PLACEHOLDERS["MASK"], line)
    return line
def _mask_admin_label(line: str, audit: List[PiiHit], page_idx: int) -> str:
    m = RE_FINESS.search(line)
    if m:
        audit.append(PiiHit(page_idx, "FINESS", m.group(1), PLACEHOLDERS["FINESS"]))
        return RE_FINESS.sub(lambda _: f"FINESS : {PLACEHOLDERS['FINESS']}", line)
    m = RE_OGC.search(line)
    if m:
        audit.append(PiiHit(page_idx, "OGC", m.group(1), PLACEHOLDERS["OGC"]))
        return RE_OGC.sub(lambda _: f"N° OGC : {PLACEHOLDERS['OGC']}", line)
    m = RE_IPP.search(line)
    if m:
        audit.append(PiiHit(page_idx, "IPP", m.group(1), PLACEHOLDERS["IPP"]))
        return RE_IPP.sub(lambda _: f"IPP : {PLACEHOLDERS['IPP']}", line)
    return line
def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
    # User overrides & force-masks first
    line = _apply_overrides(line, audit, page_idx, cfg)
    # EMAIL
    def _repl_email(m: re.Match) -> str:
        audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"]))
        return PLACEHOLDERS["EMAIL"]
    line = RE_EMAIL.sub(_repl_email, line)
    # TEL
    def _repl_tel(m: re.Match) -> str:
        audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"]))
        return PLACEHOLDERS["TEL"]
    line = RE_TEL.sub(_repl_tel, line)
    # IBAN
    def _repl_iban(m: re.Match) -> str:
        audit.append(PiiHit(page_idx, "IBAN", m.group(0), PLACEHOLDERS["IBAN"]))
        return PLACEHOLDERS["IBAN"]
    line = RE_IBAN.sub(_repl_iban, line)
    # NIR (with modulo-97 key validation)
    def _repl_nir(m: re.Match) -> str:
        raw = m.group(0)
        if not validate_nir(raw):
            return raw  # false positive, leave unmasked
        audit.append(PiiHit(page_idx, "NIR", raw, PLACEHOLDERS["NIR"]))
        return PLACEHOLDERS["NIR"]
    line = RE_NIR.sub(_repl_nir, line)
    # DATE_NAISSANCE (more specific, applied before the generic DATE rule)
    def _repl_date_naissance(m: re.Match) -> str:
        audit.append(PiiHit(page_idx, "DATE_NAISSANCE", m.group(0), PLACEHOLDERS["DATE_NAISSANCE"]))
        return PLACEHOLDERS["DATE_NAISSANCE"]
    line = RE_DATE_NAISSANCE.sub(_repl_date_naissance, line)
    # Generic DATE
    def _repl_date(m: re.Match) -> str:
        audit.append(PiiHit(page_idx, "DATE", m.group(0), PLACEHOLDERS["DATE"]))
        return PLACEHOLDERS["DATE"]
    line = RE_DATE.sub(_repl_date, line)
    # ADRESSE
    def _repl_adresse(m: re.Match) -> str:
        audit.append(PiiHit(page_idx, "ADRESSE", m.group(0), PLACEHOLDERS["ADRESSE"]))
        return PLACEHOLDERS["ADRESSE"]
    line = RE_ADRESSE.sub(_repl_adresse, line)
    # CODE_POSTAL
    def _repl_code_postal(m: re.Match) -> str:
        audit.append(PiiHit(page_idx, "CODE_POSTAL", m.group(0), PLACEHOLDERS["CODE_POSTAL"]))
        return PLACEHOLDERS["CODE_POSTAL"]
    line = RE_CODE_POSTAL.sub(_repl_code_postal, line)
    # AGE
    def _repl_age(m: re.Match) -> str:
        audit.append(PiiHit(page_idx, "AGE", m.group(0), PLACEHOLDERS["AGE"]))
        return PLACEHOLDERS["AGE"]
    line = RE_AGE.sub(_repl_age, line)
    # Record number / NDA
    def _repl_dossier(m: re.Match) -> str:
        audit.append(PiiHit(page_idx, "DOSSIER", m.group(0), PLACEHOLDERS["DOSSIER"]))
        return PLACEHOLDERS["DOSSIER"]
    line = RE_NUMERO_DOSSIER.sub(_repl_dossier, line)
    # Uppercase PERSON with context; whitelist and short acronyms are kept
    wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or [])
    wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or [])
    def _repl_person_ctx(m: re.Match) -> str:
        span = m.group(1).strip()
        raw = m.group(0)
        if span in wl_sections or raw in wl_phrases:
            return raw
        tokens = [t for t in span.split() if t]
        if len(tokens) == 1 and len(tokens[0]) <= 3:
            return raw
        audit.append(PiiHit(page_idx, "NOM", span, PLACEHOLDERS["NOM"]))
        return raw.replace(span, PLACEHOLDERS["NOM"])  # keep the Dr/Mme prefix
    line = RE_PERSON_CONTEXT.sub(_repl_person_ctx, line)
    return line
def _kv_value_only_mask(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
    line = _mask_admin_label(line, audit, page_idx)
    parts = SPLITTER.split(line, maxsplit=1)
    if len(parts) == 2:
        key, value = parts
        masked_val = _mask_line_by_regex(value, audit, page_idx, cfg)
        return f"{key.strip()} : {masked_val.strip()}"
    return _mask_line_by_regex(line, audit, page_idx, cfg)
# ----------------- Global name extraction -----------------
def _extract_document_names(full_text: str, cfg: Dict[str, Any]) -> set:
    """Pre-scan the raw document to extract person names from structured
    fields (Patient, Rédigé par, etc.).
    Returns a set of tokens (words) to mask globally."""
    wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or [])
    wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or [])
    names: set = set()
    def _add_tokens(match_str: str):
        for token in match_str.split():
            token = token.strip(" .-'")
            if len(token) >= 3 and token.upper() not in wl_sections and token not in wl_phrases:
                names.add(token)
    for m in RE_EXTRACT_PATIENT.finditer(full_text):
        _add_tokens(m.group(1))
    for m in RE_EXTRACT_REDIGE.finditer(full_text):
        _add_tokens(m.group(1))
    for m in RE_EXTRACT_MME_MR.finditer(full_text):
        _add_tokens(m.group(1))
    for m in RE_EXTRACT_DR_DEST.finditer(full_text):
        _add_tokens(m.group(1))
    return names
def _apply_extracted_names(text: str, names: set, audit: List[PiiHit]) -> str:
    """Globally replace each extracted name in the text. (The original code
    audited skipped occurrences correctly but then replaced them anyway via an
    unconditional sub; the skip is now applied to the replacement as well.)"""
    placeholder = PLACEHOLDERS["NOM"]
    for token in sorted(names, key=len, reverse=True):
        pattern = re.compile(rf"\b{re.escape(token)}\b", re.IGNORECASE)
        def _repl(m: re.Match) -> str:
            # Do not replace occurrences already sitting inside a placeholder
            s = m.string
            if s[max(0, m.start() - 1):m.start()] == "[" or s[m.end():m.end() + 1] == "]":
                return m.group(0)
            audit.append(PiiHit(-1, "NOM_EXTRACTED", m.group(0), placeholder))
            return placeholder
        text = pattern.sub(_repl, text)
    return text
# ----------------- Anonymisation (regex) -----------------
def anonymise_document_regex(pages_text: List[str], tables_lines: List[List[str]], cfg: Dict[str, Any]) -> AnonResult:
    audit: List[PiiHit] = []
    # Phase 0: global name extraction from structured fields
    full_raw = "\n".join(pages_text) + "\n" + "\n".join(
        "\n".join(rows) for rows in tables_lines
    )
    extracted_names = _extract_document_names(full_raw, cfg)
    # Phase 1: line-by-line masking (classic regex rules)
    out_pages: List[str] = []
    for i, page_txt in enumerate(pages_text):
        lines = (page_txt or "").splitlines()
        masked = [_kv_value_only_mask(ln, audit, i, cfg) for ln in lines]
        out_pages.append("\n".join(masked))
    table_blocks: List[str] = []
    for i, rows in enumerate(tables_lines):
        mbuf: List[str] = []
        for r in rows:
            mbuf.append(_kv_value_only_mask(r, audit, i, cfg))
        if mbuf:
            table_blocks.append("\n".join(mbuf))
    tables_block = "\n\n".join(table_blocks)
    text_out = "\f".join(out_pages)  # page separator
    if tables_block.strip():
        text_out += "\n\n[TABLES]\n" + tables_block + "\n[/TABLES]"
    # Phase 2: global application of extracted names (catch-all)
    if extracted_names:
        text_out = _apply_extracted_names(text_out, extracted_names, audit)
    return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit)
# ----------------- ONNX NER on the narrative -----------------
def _mask_with_hf(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, Any], audit: List[PiiHit]) -> str:
    # Replace detected 'word' values via word-boundary regex (pragmatic approach);
    # note that every occurrence of a detected word is masked, not just one span.
    keep_org_gpe = bool((cfg.get("whitelist", {}) or {}).get("org_gpe_keep", True))
    def repl_word(s: str, old: str, new: str) -> str:
        return re.sub(rf"\b{re.escape(old)}\b", new, s)
    out = text
    for e in ents:
        w = e.get("word") or ""
        grp = (e.get("entity_group") or e.get("entity") or "").upper()
        if not w or "[" in w or "]" in w:  # ignore placeholders
            continue
        if len(w) <= 2:  # too short
            continue
        if grp in {"PER", "PERSON"}:
            audit.append(PiiHit(-1, "NER_PER", w, PLACEHOLDERS["NOM"]))
            out = repl_word(out, w, PLACEHOLDERS["NOM"])
        elif grp == "ORG":
            if keep_org_gpe:
                continue
            audit.append(PiiHit(-1, "NER_ORG", w, PLACEHOLDERS["ETAB"]))
            out = repl_word(out, w, PLACEHOLDERS["ETAB"])
        elif grp == "LOC":
            if keep_org_gpe:
                continue
            audit.append(PiiHit(-1, "NER_LOC", w, PLACEHOLDERS["VILLE"]))
            out = repl_word(out, w, PLACEHOLDERS["VILLE"])
        elif grp == "DATE":
            # Optional: dates are already masked by the rules, so leave them alone
            continue
    return out
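# Expected entity shape (assumption, mirroring Hugging Face token-classification
# pipeline output; the exact keys depend on ner_manager_onnx):
#   {"word": "Dupont", "entity_group": "PER", "score": 0.99}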
def apply_hf_ner_on_narrative(text_out: str, cfg: Dict[str, Any], manager: Optional[NerModelManager], thresholds: Optional[NerThresholds]) -> Tuple[str, List[PiiHit]]:
    if manager is None or not manager.is_loaded():
        return text_out, []
    # Isolate [TABLES] blocks. With a capturing group, re.split places the
    # blocks at odd indices, so they can be kept verbatim. (The previous
    # NUL-padding approach reinserted blocks at pre-masking offsets, which
    # corrupted the output once a replacement changed the text length.)
    pattern = re.compile(r"(\[TABLES\].*?\[/TABLES\])", re.DOTALL)
    segments = pattern.split(text_out)
    hits: List[PiiHit] = []
    out_segments: List[str] = []
    for seg_idx, seg in enumerate(segments):
        if seg_idx % 2 == 1:  # table block: never run NER here
            out_segments.append(seg)
            continue
        # Per page (separated by \f), then per paragraph
        rebuilt_pages: List[str] = []
        for pg in seg.split("\f"):
            paras = [p for p in re.split(r"\n\s*\n", pg) if p.strip()]
            ents_per_para = manager.infer_paragraphs(paras, thresholds=thresholds)
            buf = [_mask_with_hf(para, ents, cfg, hits) for para, ents in zip(paras, ents_per_para)]
            rebuilt_pages.append("\n\n".join(buf))
        out_segments.append("\f".join(rebuilt_pages))
    return "".join(out_segments), hits
# ----------------- EDS-Pseudo NER on the narrative -----------------
def _mask_with_eds_pseudo(text: str, ents: List[Dict[str, Any]], cfg: Dict[str, Any], audit: List[PiiHit]) -> str:
    """Mask entities detected by EDS-Pseudo using the eds_mapped_key mapping."""
    def repl_word(s: str, old: str, new: str) -> str:
        return re.sub(rf"\b{re.escape(old)}\b", new, s)
    out = text
    for e in ents:
        w = e.get("word") or ""
        mapped_key = e.get("eds_mapped_key", "")
        if not w or "[" in w or "]" in w:
            continue
        if len(w) <= 2:
            continue
        placeholder = PLACEHOLDERS.get(mapped_key, PLACEHOLDERS["MASK"])
        label = e.get("entity_group", "EDS")
        audit.append(PiiHit(-1, f"EDS_{label}", w, placeholder))
        out = repl_word(out, w, placeholder)
    return out
def apply_eds_pseudo_on_narrative(text_out: str, cfg: Dict[str, Any], manager: "EdsPseudoManager") -> Tuple[str, List[PiiHit]]:
    """Apply EDS-Pseudo to the narrative (same structure as apply_hf_ner_on_narrative)."""
    if manager is None or not manager.is_loaded():
        return text_out, []
    # Isolate [TABLES] blocks (odd indices after the capturing split, kept verbatim)
    pattern = re.compile(r"(\[TABLES\].*?\[/TABLES\])", re.DOTALL)
    segments = pattern.split(text_out)
    hits: List[PiiHit] = []
    out_segments: List[str] = []
    for seg_idx, seg in enumerate(segments):
        if seg_idx % 2 == 1:  # table block
            out_segments.append(seg)
            continue
        # Per page, then per paragraph
        rebuilt_pages: List[str] = []
        for pg in seg.split("\f"):
            paras = [p for p in re.split(r"\n\s*\n", pg) if p.strip()]
            ents_per_para = manager.infer_paragraphs(paras)
            buf = [_mask_with_eds_pseudo(para, ents, cfg, hits) for para, ents in zip(paras, ents_per_para)]
            rebuilt_pages.append("\n\n".join(buf))
        out_segments.append("\f".join(rebuilt_pages))
    return "".join(out_segments), hits
# ----------------- Selective safety rescan -----------------
def selective_rescan(text: str, cfg: Dict[str, Any] | None = None) -> str:
    """Safety rescan: re-detect critical PII that slipped through the first pass.
    [TABLES] blocks are excluded from the scope (kept verbatim via the capturing
    split, which avoids the offset corruption of the old NUL-padding scheme)."""
    pattern = re.compile(r"(\[TABLES\].*?\[/TABLES\])", re.DOTALL)
    segments = pattern.split(text)  # odd indices are the table blocks
    wl_sections: set = set()
    wl_phrases: set = set()
    if cfg:
        wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or [])
        wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or [])
    def _rescan_nir(m: re.Match) -> str:
        # Mask a NIR only when its modulo-97 key validates
        return PLACEHOLDERS["NIR"] if validate_nir(m.group(0)) else m.group(0)
    def _rescan_person(m: re.Match) -> str:
        span = m.group(1).strip()
        raw = m.group(0)
        if span in wl_sections or raw in wl_phrases:
            return raw
        tokens = [t for t in span.split() if t]
        if len(tokens) == 1 and len(tokens[0]) <= 3:
            return raw
        return raw.replace(span, PLACEHOLDERS["NOM"])
    for seg_idx in range(0, len(segments), 2):  # narrative segments only
        seg = segments[seg_idx]
        # Critical PII (as before)
        seg = RE_EMAIL.sub(PLACEHOLDERS["EMAIL"], seg)
        seg = RE_TEL.sub(PLACEHOLDERS["TEL"], seg)
        seg = RE_IBAN.sub(PLACEHOLDERS["IBAN"], seg)
        seg = RE_NIR.sub(_rescan_nir, seg)
        # Additional rules: birth dates, dates, addresses, postal codes
        seg = RE_DATE_NAISSANCE.sub(PLACEHOLDERS["DATE_NAISSANCE"], seg)
        seg = RE_DATE.sub(PLACEHOLDERS["DATE"], seg)
        seg = RE_ADRESSE.sub(PLACEHOLDERS["ADRESSE"], seg)
        seg = RE_CODE_POSTAL.sub(PLACEHOLDERS["CODE_POSTAL"], seg)
        # Contextual person names (with whitelist)
        seg = RE_PERSON_CONTEXT.sub(_rescan_person, seg)
        segments[seg_idx] = seg
    return "".join(segments)
# ----------------- PDF Redaction -----------------
def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> None:
    if fitz is None:
        raise RuntimeError("PyMuPDF not available; install pymupdf.")
    doc = fitz.open(str(original_pdf))
    # Index hits per page; page == -1 means search on every page
    by_page: Dict[int, List[PiiHit]] = {}
    for h in audit:
        by_page.setdefault(h.page, []).append(h)
    for pno in range(len(doc)):
        page = doc[pno]
        hits = by_page.get(pno, []) + by_page.get(-1, [])
        if not hits:
            continue
        for h in hits:
            token = h.original.strip()
            if not token:
                continue
            rects = page.search_for(token)
            if not rects and h.kind in {"NIR", "IBAN", "TEL"}:
                compact = re.sub(r"\s+", "", token)
                if compact != token:
                    rects = page.search_for(compact)
            for r in rects:
                page.add_redact_annot(r, fill=(0, 0, 0))
        try:
            page.apply_redactions()
        except Exception:
            pass
    doc.save(str(out_pdf), deflate=True, garbage=4, clean=True, incremental=False)
    doc.close()
def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 300) -> None:
    if fitz is None:
        raise RuntimeError("PyMuPDF not available; install pymupdf.")
    doc = fitz.open(str(original_pdf))
    out = fitz.open()
    all_rects: Dict[int, List["fitz.Rect"]] = {}
    for pno in range(len(doc)):
        page = doc[pno]
        rects = []
        hits = [x for x in audit if x.page in {pno, -1}]
        for h in hits:
            token = h.original.strip()
            if not token:
                continue
            found = page.search_for(token)
            if not found and h.kind in {"NIR", "IBAN", "TEL"}:
                compact = re.sub(r"\s+", "", token)
                found = page.search_for(compact)
            rects.extend(found)
        all_rects[pno] = rects
    for pno in range(len(doc)):
        src = doc[pno]
        rect = src.rect
        zoom = dpi / 72.0
        mat = fitz.Matrix(zoom, zoom)
        pix = src.get_pixmap(matrix=mat, annots=False)
        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
        draw = ImageDraw.Draw(img)
        for r in all_rects.get(pno, []):
            draw.rectangle([r.x0 * zoom, r.y0 * zoom, r.x1 * zoom, r.y1 * zoom], fill=(0, 0, 0))
        buf = io.BytesIO()
        img.save(buf, format="PNG")
        buf.seek(0)
        dst = out.new_page(width=rect.width, height=rect.height)
        dst.insert_image(rect, stream=buf.getvalue())
    out.save(str(out_pdf), deflate=True, garbage=4, clean=True)
    out.close()
    doc.close()
# ----------------- Orchestration -----------------
def process_pdf(
    pdf_path: Path,
    out_dir: Path,
    make_vector_redaction: bool = True,
    also_make_raster_burn: bool = False,
    config_path: Optional[Path] = None,
    use_hf: bool = False,
    ner_manager=None,
    ner_thresholds=None,
) -> Dict[str, str]:
    out_dir.mkdir(parents=True, exist_ok=True)
    cfg = load_dictionaries(config_path)
    pages_text, tables_lines, ocr_used = extract_text_with_fallback_ocr(pdf_path)
    # 1) Regex rules
    anon = anonymise_document_regex(pages_text, tables_lines, cfg)
    # 2) Optional NER, applied to the narrative
    final_text = anon.text_out
    hf_hits: List[PiiHit] = []
    if use_hf and ner_manager is not None and ner_manager.is_loaded():
        # Detect the manager type and dispatch to the matching function
        if EdsPseudoManager is not None and isinstance(ner_manager, EdsPseudoManager):
            final_text, hf_hits = apply_eds_pseudo_on_narrative(final_text, cfg, ner_manager)
        else:
            final_text, hf_hits = apply_hf_ner_on_narrative(final_text, cfg, ner_manager, ner_thresholds)
        anon.audit.extend(hf_hits)
    # 3) Selective rescan
    final_text = selective_rescan(final_text, cfg=cfg)
    # Record OCR use in the audit
    if ocr_used:
        anon.audit.insert(0, PiiHit(page=-1, kind="OCR_USED", original="docTR", placeholder=""))
    # Outputs
    base = pdf_path.stem
    txt_path = out_dir / f"{base}.pseudonymise.txt"
    audit_path = out_dir / f"{base}.audit.jsonl"
    txt_path.write_text(final_text, encoding="utf-8")
    with audit_path.open("w", encoding="utf-8") as f:
        for hit in anon.audit:
            f.write(json.dumps(hit.__dict__, ensure_ascii=False) + "\n")
    outputs = {"text": str(txt_path), "audit": str(audit_path)}
    # PDFs
    if make_vector_redaction and fitz is not None:
        vec_path = out_dir / f"{base}.redacted_vector.pdf"
        try:
            redact_pdf_vector(pdf_path, anon.audit, vec_path)
            outputs["pdf_vector"] = str(vec_path)
        except Exception:
            pass
    if also_make_raster_burn and fitz is not None:
        ras_path = out_dir / f"{base}.redacted_raster.pdf"
        redact_pdf_raster(pdf_path, anon.audit, ras_path)
        outputs["pdf_raster"] = str(ras_path)
    return outputs
if __name__ == "__main__":
    import argparse
    ap = argparse.ArgumentParser(description="Anonymise a PDF (regex + optional ONNX NER)")
    ap.add_argument("pdf", type=str)
    ap.add_argument("--out", type=str, default="out")
    ap.add_argument("--no-vector", action="store_true")
    ap.add_argument("--raster", action="store_true")
    ap.add_argument("--config", type=str, default=str(Path("config/dictionnaires.yml")))
    ap.add_argument("--hf", action="store_true", help="Enable ONNX NER on the narrative (requires ner_manager_onnx)")
    ap.add_argument("--model", type=str, default="cmarkea/distilcamembert-base-ner")
    args = ap.parse_args()
    manager = None
    if args.hf and NerModelManager is not None:
        manager = NerModelManager(cache_dir=Path("models"))
        manager.load(args.model)
    outs = process_pdf(
        Path(args.pdf),
        Path(args.out),
        make_vector_redaction=not args.no_vector,
        also_make_raster_burn=args.raster,
        config_path=Path(args.config),
        use_hf=bool(args.hf),
        ner_manager=manager,
        ner_thresholds=NerThresholds() if NerThresholds else None,
    )
    print(json.dumps(outs, indent=2, ensure_ascii=False))
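# Usage sketch (hypothetical file names):
#   python anonymizer_core_refactored_onnx.py report.pdf --out out --raster
# or programmatically:
#   outs = process_pdf(Path("report.pdf"), Path("out"))
#   print(outs["text"], outs["audit"])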