Initial commit: PDF pseudonymisation v5

- GUI v5: streamlined single view (tkinter), 2 visual steps
- ONNX core: regex anonymisation + optional NER
- Global name extraction from structured fields
  (Patient, Rédigé par, MME/Madame, DR)
- Simultaneous generation of Image PDF + Anonymised PDF (structure preserved)
- Windows build via Nuitka (batch script + GitHub Actions CI)
- install.sh for Linux setup/run
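
Example invocation of the core CLI (file names are illustrative; the flags match
the argparse block at the end of anonymizer_core_refactored.py):

    python anonymizer_core_refactored.py compte_rendu.pdf --out out --raster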

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 15:03:37 +01:00
commit 8339069c83
18 changed files with 5127 additions and 0 deletions


@@ -0,0 +1,422 @@
# ==========================
# FILE 1/2 — anonymizer_core_refactored.py (FIXED)
# ==========================
from __future__ import annotations
import io
import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Any
import pdfplumber
from pdfminer.high_level import extract_text as pdfminer_extract_text
from pdfminer.layout import LAParams
from PIL import Image, ImageDraw
# Optional deps
try:
import fitz # PyMuPDF
except Exception:
fitz = None
try:
import yaml # PyYAML for dictionaries
except Exception:
yaml = None
# ----------------- Defaults & Config -----------------
DEFAULTS_CFG = {
"version": 1,
"encoding": "utf-8",
"normalization": "NFKC",
"whitelist": {
"sections_titres": ["DIM", "GHM", "GHS", "RUM", "COMPTE", "RENDU", "DIAGNOSTIC"],
"noms_maj_excepts": ["Médecin DIM", "Praticien conseil"],
"org_gpe_keep": True,
},
"blacklist": {
"force_mask_terms": [],
"force_mask_regex": [],
},
"kv_labels_preserve": ["FINESS", "IPP", "N° OGC", "Etablissement"],
"regex_overrides": [
{
"name": "OGC_court",
"pattern": r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,3})\b",
"placeholder": "[OGC]",
"flags": ["IGNORECASE"],
}
],
"flags": {
"case_insensitive": True,
"unicode_word_boundaries": True,
"regex_engine": "python",
},
}
PLACEHOLDERS = {
"EMAIL": "[EMAIL]",
"TEL": "[TEL]",
"IBAN": "[IBAN]",
"NIR": "[NIR]",
"IPP": "[IPP]",
"FINESS": "[FINESS]",
"OGC": "[OGC]",
"NOM": "[NOM]",
"VILLE": "[VILLE]",
"ETAB": "[ETABLISSEMENT]",
"MASK": "[MASK]",
}
CRITICAL_PII_KEYS = {"EMAIL", "TEL", "IBAN", "NIR", "IPP"}
# Baseline regex
RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
RE_TEL = re.compile(r"(?<!\d)(?:\+33\s?|0)\d(?:[ .-]?\d){8}(?!\d)")
RE_IBAN = re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{11,30}\b")
RE_IPP = re.compile(r"\bIPP\s*[:\-]?\s*([A-Za-z0-9]{6,})\b", re.IGNORECASE)
RE_FINESS = re.compile(r"\bFINESS\s*[:\-]?\s*(\d{9})\b", re.IGNORECASE)
RE_OGC = re.compile(r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,})\b", re.IGNORECASE)  # broadened: accepts values of any length
RE_NIR = re.compile(r"\b(\d{13})\s*([0-9]{2})\b")
RE_PERSON_CONTEXT = re.compile(
r"(?:(?:Dr\.?|Docteur|Mme|M\.|Monsieur|Nom\s*:\s*|Praticien|Médecin)\s*)([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ\-\' ]{2,})"
)
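# RE_PERSON_CONTEXT example: in "Dr MARTIN DUPONT" the civility/label prefix is kept and
# the uppercase capture "MARTIN DUPONT" (group 1) is what _repl_person_ctx masks below;
# short single tokens such as "DIM" are left alone by its length guard.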
SPLITTER = re.compile(r"\s*[:|;\t]\s*")
@dataclass
class PiiHit:
page: int
kind: str
original: str
placeholder: str
bbox_hint: Optional[Tuple[float, float, float, float]] = None
@dataclass
class AnonResult:
text_out: str
tables_block: str
audit: List[PiiHit] = field(default_factory=list)
# ----------------- Config loader -----------------
def load_dictionaries(config_path: Optional[Path]) -> Dict[str, Any]:
cfg = DEFAULTS_CFG.copy()
if config_path and config_path.exists() and yaml is not None:
try:
user = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
# shallow-merge for top-level keys
for k, v in user.items():
cfg[k] = v
except Exception:
pass
return cfg
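# Illustrative dictionnaires.yml accepted by load_dictionaries() (top-level keys are
# shallow-merged over DEFAULTS_CFG; the values below are made-up examples):
#
#   blacklist:
#     force_mask_terms: ["DUPONT"]
#     force_mask_regex: ['\bClinique\s+\w+\b']
#   regex_overrides:
#     - name: "dossier"
#       pattern: '\bDossier\s*[:\-]?\s*(\d{5,})\b'
#       placeholder: "[DOSSIER]"
#       flags: ["IGNORECASE"]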
# ----------------- Extraction -----------------
def extract_text_two_passes(pdf_path: Path):
pages_text: List[str] = []
tables_lines: List[List[str]] = []
with pdfplumber.open(pdf_path) as pdf:
for p in pdf.pages:
t = p.extract_text(x_tolerance=2.5, y_tolerance=4.0) or ""
pages_text.append(t)
rows: List[str] = []
try:
tables = p.extract_tables()
for tbl in tables or []:
for row in tbl:
clean = [c if c is not None else "" for c in row]
rows.append("\t".join(clean).strip())
except Exception:
pass
tables_lines.append(rows)
total_chars = sum(len(x or "") for x in pages_text)
if total_chars < 500:
text_all = pdfminer_extract_text(
str(pdf_path),
laparams=LAParams(char_margin=2.0, word_margin=0.1, line_margin=0.8, boxes_flow=0.5),
)
pages_text = [x for x in text_all.split("\f") if x]
return pages_text, tables_lines
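# extract_text_two_passes: pass 1 uses pdfplumber per page (text + tables); if the whole
# document yields fewer than 500 characters, pass 2 re-extracts with pdfminer and tuned
# LAParams and splits pages on form feeds.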
# ----------------- Helpers (with dictionaries) -----------------
def _compile_user_regex(pattern: str, flags_list: List[str]):
flags = 0
for f in flags_list or []:
if f.upper() == "IGNORECASE":
flags |= re.IGNORECASE
if f.upper() == "MULTILINE":
flags |= re.MULTILINE
if f.upper() == "DOTALL":
flags |= re.DOTALL
return re.compile(pattern, flags)
def _apply_overrides(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
for ov in cfg.get("regex_overrides", []) or []:
        pattern = ov.get("pattern")
        placeholder = ov.get("placeholder", PLACEHOLDERS["MASK"])
        name = ov.get("name", "override")
flags_list = ov.get("flags", [])
try:
rx = _compile_user_regex(pattern, flags_list)
except Exception:
continue
def _rep(m: re.Match):
audit.append(PiiHit(page_idx, name, m.group(0), placeholder))
return placeholder
line = rx.sub(_rep, line)
# force-mask literals
for term in (cfg.get("blacklist", {}).get("force_mask_terms", []) or []):
if not term:
continue
word_rx = re.compile(rf"\b{re.escape(term)}\b", re.IGNORECASE)
if word_rx.search(line):
audit.append(PiiHit(page_idx, "force_term", term, PLACEHOLDERS["MASK"]))
line = word_rx.sub(PLACEHOLDERS["MASK"], line)
# force-mask regex
for pat in (cfg.get("blacklist", {}).get("force_mask_regex", []) or []):
try:
rx = re.compile(pat, re.IGNORECASE)
except Exception:
continue
if rx.search(line):
audit.append(PiiHit(page_idx, "force_regex", pat, PLACEHOLDERS["MASK"]))
line = rx.sub(PLACEHOLDERS["MASK"], line)
return line
def _mask_admin_label(line: str, audit: List[PiiHit], page_idx: int) -> str:
m = RE_FINESS.search(line)
if m:
val = m.group(1); audit.append(PiiHit(page_idx, "FINESS", val, PLACEHOLDERS["FINESS"]))
return RE_FINESS.sub(lambda _: f"FINESS : {PLACEHOLDERS['FINESS']}", line)
m = RE_OGC.search(line)
if m:
val = m.group(1); audit.append(PiiHit(page_idx, "OGC", val, PLACEHOLDERS["OGC"]))
return RE_OGC.sub(lambda _: f"N° OGC : {PLACEHOLDERS['OGC']}", line)
m = RE_IPP.search(line)
if m:
val = m.group(1); audit.append(PiiHit(page_idx, "IPP", val, PLACEHOLDERS["IPP"]))
return RE_IPP.sub(lambda _: f"IPP : {PLACEHOLDERS['IPP']}", line)
return line
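# _mask_admin_label example: "FINESS : 123456789" -> "FINESS : [FINESS]"; the label is
# rewritten in a canonical "LABEL : [PLACEHOLDER]" form and at most one label per line
# (FINESS, then OGC, then IPP) is handled.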
def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
# Apply user overrides & force-masks first
line = _apply_overrides(line, audit, page_idx, cfg)
# EMAIL
def _repl_email(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"]))
return PLACEHOLDERS["EMAIL"]
line = RE_EMAIL.sub(_repl_email, line)
# TEL
def _repl_tel(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"]))
return PLACEHOLDERS["TEL"]
line = RE_TEL.sub(_repl_tel, line)
# IBAN
def _repl_iban(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "IBAN", m.group(0), PLACEHOLDERS["IBAN"]))
return PLACEHOLDERS["IBAN"]
line = RE_IBAN.sub(_repl_iban, line)
# NIR
def _repl_nir(m: re.Match) -> str:
audit.append(PiiHit(page_idx, "NIR", m.group(0), PLACEHOLDERS["NIR"]))
return PLACEHOLDERS["NIR"]
line = RE_NIR.sub(_repl_nir, line)
# PERSON uppercase with context, but with whitelist/short-token guards
wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or [])
wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or [])
def _repl_person_ctx(m: re.Match) -> str:
span = m.group(1).strip()
raw = m.group(0)
if span in wl_sections or raw in wl_phrases:
return raw
tokens = [t for t in span.split() if t]
if len(tokens) == 1 and len(tokens[0]) <= 3:
return raw # acronym short (DIM/DR/DP...)
# Otherwise mask
audit.append(PiiHit(page_idx, "NOM", span, PLACEHOLDERS["NOM"]))
return raw.replace(span, PLACEHOLDERS["NOM"]) # keep prefix (Dr/Mme/etc.)
line = RE_PERSON_CONTEXT.sub(_repl_person_ctx, line)
return line
def _kv_value_only_mask(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
line = _mask_admin_label(line, audit, page_idx)
parts = SPLITTER.split(line, maxsplit=1)
if len(parts) == 2:
key, value = parts
masked_val = _mask_line_by_regex(value, audit, page_idx, cfg)
return f"{key.strip()} : {masked_val.strip()}"
else:
return _mask_line_by_regex(line, audit, page_idx, cfg)
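# _kv_value_only_mask example: "Téléphone : 01 23 45 67 89" -> "Téléphone : [TEL]"
# (the label before the first ':', '|', ';' or tab is preserved and only the value is
# masked); lines without such a separator are masked as a whole.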
# ----------------- Anonymisation -----------------
def anonymise_document(pages_text: List[str], tables_lines: List[List[str]], cfg: Dict[str, Any]) -> AnonResult:
audit: List[PiiHit] = []
out_pages: List[str] = []
for i, page_txt in enumerate(pages_text):
lines = [ln for ln in (page_txt or "").splitlines()]
masked = [_kv_value_only_mask(ln, audit, i, cfg) for ln in lines]
out_pages.append("\n".join(masked))
table_blocks: List[str] = []
for i, rows in enumerate(tables_lines):
mbuf: List[str] = []
for r in rows:
masked = _kv_value_only_mask(r, audit, i, cfg)
mbuf.append(masked)
if mbuf:
table_blocks.append("\n".join(mbuf))
tables_block = "\n\n".join(table_blocks)
text_out = "\n\n".join(out_pages)
if tables_block.strip():
text_out += "\n\n[TABLES]\n" + tables_block + "\n[/TABLES]"
return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit)
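# anonymise_document output: page texts joined by blank lines, with the masked table rows
# appended once at the end inside a [TABLES] ... [/TABLES] block, which selective_rescan
# below treats as already-processed content.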
# ----------------- Selective safety rescan -----------------
def selective_rescan(text: str) -> str:
    # Safety pass: re-apply the critical PII regexes to the final text while leaving
    # [TABLES]...[/TABLES] blocks untouched (their rows were already masked above).
    tables_rx = re.compile(r"\[TABLES\].*?\[/TABLES\]", re.DOTALL)
    def _rescan(segment: str) -> str:
        segment = RE_EMAIL.sub(PLACEHOLDERS["EMAIL"], segment)
        segment = RE_TEL.sub(PLACEHOLDERS["TEL"], segment)
        segment = RE_IBAN.sub(PLACEHOLDERS["IBAN"], segment)
        segment = RE_NIR.sub(PLACEHOLDERS["NIR"], segment)
        return segment
    out: List[str] = []
    pos = 0
    for m in tables_rx.finditer(text):
        out.append(_rescan(text[pos:m.start()]))  # rescan prose outside the block
        out.append(m.group(0))  # keep the table block verbatim
        pos = m.end()
    out.append(_rescan(text[pos:]))
    return "".join(out)
# ----------------- PDF Redaction -----------------
def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> None:
if fitz is None:
        raise RuntimeError("PyMuPDF is not available; install pymupdf.")
doc = fitz.open(str(original_pdf))
by_page: Dict[int, List[PiiHit]] = {}
for h in audit:
by_page.setdefault(h.page, []).append(h)
for pno, hits in by_page.items():
if pno >= len(doc):
continue
page = doc[pno]
for h in hits:
token = h.original.strip()
if not token:
continue
rects = page.search_for(token)
if not rects and h.kind in {"NIR", "IBAN", "TEL"}:
compact = re.sub(r"\s+", "", token)
if compact != token:
rects = page.search_for(compact)
for r in rects:
page.add_redact_annot(r, fill=(0,0,0))
try:
page.apply_redactions()
except Exception:
pass
doc.save(str(out_pdf), deflate=True, garbage=4, clean=True, incremental=False)
doc.close()
def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 300) -> None:
if fitz is None:
        raise RuntimeError("PyMuPDF is not available; install pymupdf.")
doc = fitz.open(str(original_pdf))
out = fitz.open()
# search rects per page
all_rects: Dict[int, List["fitz.Rect"]] = {}
for pno in range(len(doc)):
page = doc[pno]
rects = []
for h in [x for x in audit if x.page == pno]:
token = h.original.strip()
if not token:
continue
found = page.search_for(token)
if not found and h.kind in {"NIR", "IBAN", "TEL"}:
compact = re.sub(r"\s+", "", token)
found = page.search_for(compact)
rects.extend(found)
all_rects[pno] = rects
# render + compose
for pno in range(len(doc)):
src_page = doc[pno]
page_rect = src_page.rect
zoom = dpi / 72.0
mat = fitz.Matrix(zoom, zoom)
pix = src_page.get_pixmap(matrix=mat, annots=False)
img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
draw = ImageDraw.Draw(img)
for r in all_rects.get(pno, []):
draw.rectangle([r.x0 * zoom, r.y0 * zoom, r.x1 * zoom, r.y1 * zoom], fill=(0, 0, 0))
buf = io.BytesIO(); img.save(buf, format="PNG"); buf.seek(0)
dst_page = out.new_page(width=page_rect.width, height=page_rect.height)
dst_page.insert_image(page_rect, stream=buf.getvalue())
out.save(str(out_pdf), deflate=True, garbage=4, clean=True)
out.close(); doc.close()
# ----------------- Orchestration -----------------
def process_pdf(pdf_path: Path, out_dir: Path, make_vector_redaction: bool = True, also_make_raster_burn: bool = False, config_path: Optional[Path] = None) -> Dict[str, str]:
out_dir.mkdir(parents=True, exist_ok=True)
cfg = load_dictionaries(config_path)
pages_text, tables_lines = extract_text_two_passes(pdf_path)
anon = anonymise_document(pages_text, tables_lines, cfg)
final_text = selective_rescan(anon.text_out)
base = pdf_path.stem
txt_path = out_dir / f"{base}.pseudonymise.txt"
audit_path = out_dir / f"{base}.audit.jsonl"
txt_path.write_text(final_text, encoding="utf-8")
with audit_path.open("w", encoding="utf-8") as f:
for hit in anon.audit:
f.write(json.dumps(hit.__dict__, ensure_ascii=False) + "\n")
outputs = {"text": str(txt_path), "audit": str(audit_path)}
if make_vector_redaction and fitz is not None:
vec_path = out_dir / f"{base}.redacted_vector.pdf"
try:
redact_pdf_vector(pdf_path, anon.audit, vec_path)
outputs["pdf_vector"] = str(vec_path)
except Exception:
pass
if also_make_raster_burn and fitz is not None:
ras_path = out_dir / f"{base}.redacted_raster.pdf"
redact_pdf_raster(pdf_path, anon.audit, ras_path)
outputs["pdf_raster"] = str(ras_path)
return outputs
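# Illustrative programmatic use (same behaviour as the CLI below; paths are examples):
#   outputs = process_pdf(Path("compte_rendu.pdf"), Path("out"),
#                         make_vector_redaction=True, also_make_raster_burn=False,
#                         config_path=Path("config/dictionnaires.yml"))
#   # -> {"text": ..., "audit": ..., "pdf_vector": ...} depending on flags and installed deps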
if __name__ == "__main__":
import argparse
    ap = argparse.ArgumentParser(description="Anonymise a PDF using YAML dictionaries + PDF redactions")
ap.add_argument("pdf", type=str)
ap.add_argument("--out", type=str, default="out")
ap.add_argument("--no-vector", action="store_true")
ap.add_argument("--raster", action="store_true")
ap.add_argument("--config", type=str, default=str(Path("config/dictionnaires.yml")))
args = ap.parse_args()
outs = process_pdf(Path(args.pdf), Path(args.out), make_vector_redaction=not args.no_vector, also_make_raster_burn=args.raster, config_path=Path(args.config))
print(json.dumps(outs, indent=2, ensure_ascii=False))