Files
anonymisation/anonymizer_core_refactored.py
Domi31tls 8339069c83 Initial commit — PDF pseudonymisation v5
- GUI v5: single streamlined view (tkinter), two visual steps
- ONNX core: regex anonymisation + optional NER
- Global extraction of names from structured fields
  (Patient, Rédigé par, MME/Madame, DR)
- Simultaneous generation of an image PDF and an anonymised PDF (structure preserved)
- Windows build via Nuitka (batch script + GitHub Actions CI)
- install.sh for Linux setup/run

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 15:03:37 +01:00

423 lines
16 KiB
Python

# ==========================
# FILE 1/2 — anonymizer_core_refactored.py (FIXED)
# ==========================
from __future__ import annotations
import io
import json
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Dict, Tuple, Optional, Any
import pdfplumber
from pdfminer.high_level import extract_text as pdfminer_extract_text
from pdfminer.layout import LAParams
from PIL import Image, ImageDraw
# Optional deps
try:
    import fitz  # PyMuPDF
except Exception:
    fitz = None
try:
    import yaml  # PyYAML for dictionaries
except Exception:
    yaml = None
# ----------------- Defaults & Config -----------------
DEFAULTS_CFG = {
    "version": 1,
    "encoding": "utf-8",
    "normalization": "NFKC",
    "whitelist": {
        "sections_titres": ["DIM", "GHM", "GHS", "RUM", "COMPTE", "RENDU", "DIAGNOSTIC"],
        "noms_maj_excepts": ["Médecin DIM", "Praticien conseil"],
        "org_gpe_keep": True,
    },
    "blacklist": {
        "force_mask_terms": [],
        "force_mask_regex": [],
    },
    "kv_labels_preserve": ["FINESS", "IPP", "N° OGC", "Etablissement"],
    "regex_overrides": [
        {
            "name": "OGC_court",
            "pattern": r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,3})\b",
            "placeholder": "[OGC]",
            "flags": ["IGNORECASE"],
        }
    ],
    "flags": {
        "case_insensitive": True,
        "unicode_word_boundaries": True,
        "regex_engine": "python",
    },
}
PLACEHOLDERS = {
    "EMAIL": "[EMAIL]",
    "TEL": "[TEL]",
    "IBAN": "[IBAN]",
    "NIR": "[NIR]",
    "IPP": "[IPP]",
    "FINESS": "[FINESS]",
    "OGC": "[OGC]",
    "NOM": "[NOM]",
    "VILLE": "[VILLE]",
    "ETAB": "[ETABLISSEMENT]",
    "MASK": "[MASK]",
}
CRITICAL_PII_KEYS = {"EMAIL", "TEL", "IBAN", "NIR", "IPP"}
# Baseline regex
RE_EMAIL = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
RE_TEL = re.compile(r"(?<!\d)(?:\+33\s?|0)\d(?:[ .-]?\d){8}(?!\d)")
RE_IBAN = re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{11,30}\b")
RE_IPP = re.compile(r"\bIPP\s*[:\-]?\s*([A-Za-z0-9]{6,})\b", re.IGNORECASE)
RE_FINESS = re.compile(r"\bFINESS\s*[:\-]?\s*(\d{9})\b", re.IGNORECASE)
RE_OGC = re.compile(r"\b(?:N°\s*)?OGC\s*[:\-]?\s*([A-Za-z0-9\-]{1,})\b", re.IGNORECASE)  # broadened
RE_NIR = re.compile(r"\b(\d{13})\s*([0-9]{2})\b")
RE_PERSON_CONTEXT = re.compile(
    r"(?:(?:Dr\.?|Docteur|Mme|M\.|Monsieur|Nom\s*:\s*|Praticien|Médecin)\s*)([A-ZÉÈÀÙÂÊÎÔÛÄËÏÖÜÇ\-\' ]{2,})"
)
SPLITTER = re.compile(r"\s*[:|;\t]\s*")
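# Illustrative split (value hypothetical): SPLITTER.split("IPP : 12345678", maxsplit=1)
# -> ["IPP", "12345678"]; ':', '|', ';' and tabs all act as key/value separators.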
@dataclass
class PiiHit:
    page: int
    kind: str
    original: str
    placeholder: str
    bbox_hint: Optional[Tuple[float, float, float, float]] = None

@dataclass
class AnonResult:
    text_out: str
    tables_block: str
    audit: List[PiiHit] = field(default_factory=list)
# ----------------- Config loader -----------------
def load_dictionaries(config_path: Optional[Path]) -> Dict[str, Any]:
    cfg = DEFAULTS_CFG.copy()
    if config_path and config_path.exists() and yaml is not None:
        try:
            user = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
            # shallow-merge for top-level keys
            for k, v in user.items():
                cfg[k] = v
        except Exception:
            pass
    return cfg
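# Illustrative config/dictionnaires.yml (hypothetical values). Because the merge
# is shallow, a top-level key replaces the whole DEFAULTS_CFG entry of that name:
#
#   blacklist:
#     force_mask_terms: ["CLINIQUE EXEMPLE"]
#     force_mask_regex: ['\bCHAMBRE\s+\d+\b']
#   kv_labels_preserve: ["FINESS", "IPP", "N° OGC", "Etablissement"]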
# ----------------- Extraction -----------------
def extract_text_two_passes(pdf_path: Path):
    pages_text: List[str] = []
    tables_lines: List[List[str]] = []
    with pdfplumber.open(pdf_path) as pdf:
        for p in pdf.pages:
            t = p.extract_text(x_tolerance=2.5, y_tolerance=4.0) or ""
            pages_text.append(t)
            rows: List[str] = []
            try:
                tables = p.extract_tables()
                for tbl in tables or []:
                    for row in tbl:
                        clean = [c if c is not None else "" for c in row]
                        rows.append("\t".join(clean).strip())
            except Exception:
                pass
            tables_lines.append(rows)
    total_chars = sum(len(x or "") for x in pages_text)
    if total_chars < 500:
        # Second pass: fall back to pdfminer when pdfplumber extracted almost nothing
        text_all = pdfminer_extract_text(
            str(pdf_path),
            laparams=LAParams(char_margin=2.0, word_margin=0.1, line_margin=0.8, boxes_flow=0.5),
        )
        pages_text = [x for x in text_all.split("\f") if x]
    return pages_text, tables_lines
# ----------------- Helpers (with dictionaries) -----------------
def _compile_user_regex(pattern: str, flags_list: List[str]):
    flags = 0
    for f in flags_list or []:
        if f.upper() == "IGNORECASE":
            flags |= re.IGNORECASE
        if f.upper() == "MULTILINE":
            flags |= re.MULTILINE
        if f.upper() == "DOTALL":
            flags |= re.DOTALL
    return re.compile(pattern, flags)
def _apply_overrides(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
    for ov in cfg.get("regex_overrides", []) or []:
        pattern = ov.get("pattern")
        placeholder = ov.get("placeholder", PLACEHOLDERS["MASK"])
        name = ov.get("name", "override")
        flags_list = ov.get("flags", [])
        try:
            rx = _compile_user_regex(pattern, flags_list)
        except Exception:
            continue

        def _rep(m: re.Match):
            audit.append(PiiHit(page_idx, name, m.group(0), placeholder))
            return placeholder

        line = rx.sub(_rep, line)
    # force-mask literals
    for term in (cfg.get("blacklist", {}).get("force_mask_terms", []) or []):
        if not term:
            continue
        word_rx = re.compile(rf"\b{re.escape(term)}\b", re.IGNORECASE)
        if word_rx.search(line):
            audit.append(PiiHit(page_idx, "force_term", term, PLACEHOLDERS["MASK"]))
            line = word_rx.sub(PLACEHOLDERS["MASK"], line)
    # force-mask regex
    for pat in (cfg.get("blacklist", {}).get("force_mask_regex", []) or []):
        try:
            rx = re.compile(pat, re.IGNORECASE)
        except Exception:
            continue
        if rx.search(line):
            audit.append(PiiHit(page_idx, "force_regex", pat, PLACEHOLDERS["MASK"]))
            line = rx.sub(PLACEHOLDERS["MASK"], line)
    return line
def _mask_admin_label(line: str, audit: List[PiiHit], page_idx: int) -> str:
    m = RE_FINESS.search(line)
    if m:
        val = m.group(1)
        audit.append(PiiHit(page_idx, "FINESS", val, PLACEHOLDERS["FINESS"]))
        return RE_FINESS.sub(lambda _: f"FINESS : {PLACEHOLDERS['FINESS']}", line)
    m = RE_OGC.search(line)
    if m:
        val = m.group(1)
        audit.append(PiiHit(page_idx, "OGC", val, PLACEHOLDERS["OGC"]))
        return RE_OGC.sub(lambda _: f"N° OGC : {PLACEHOLDERS['OGC']}", line)
    m = RE_IPP.search(line)
    if m:
        val = m.group(1)
        audit.append(PiiHit(page_idx, "IPP", val, PLACEHOLDERS["IPP"]))
        return RE_IPP.sub(lambda _: f"IPP : {PLACEHOLDERS['IPP']}", line)
    return line
def _mask_line_by_regex(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
    # Apply user overrides & force-masks first
    line = _apply_overrides(line, audit, page_idx, cfg)

    # EMAIL
    def _repl_email(m: re.Match) -> str:
        audit.append(PiiHit(page_idx, "EMAIL", m.group(0), PLACEHOLDERS["EMAIL"]))
        return PLACEHOLDERS["EMAIL"]

    line = RE_EMAIL.sub(_repl_email, line)

    # TEL
    def _repl_tel(m: re.Match) -> str:
        audit.append(PiiHit(page_idx, "TEL", m.group(0), PLACEHOLDERS["TEL"]))
        return PLACEHOLDERS["TEL"]

    line = RE_TEL.sub(_repl_tel, line)

    # IBAN
    def _repl_iban(m: re.Match) -> str:
        audit.append(PiiHit(page_idx, "IBAN", m.group(0), PLACEHOLDERS["IBAN"]))
        return PLACEHOLDERS["IBAN"]

    line = RE_IBAN.sub(_repl_iban, line)

    # NIR
    def _repl_nir(m: re.Match) -> str:
        audit.append(PiiHit(page_idx, "NIR", m.group(0), PLACEHOLDERS["NIR"]))
        return PLACEHOLDERS["NIR"]

    line = RE_NIR.sub(_repl_nir, line)

    # PERSON: uppercase names with a title/label context, guarded by whitelist and short tokens
    wl_sections = set((cfg.get("whitelist", {}) or {}).get("sections_titres", []) or [])
    wl_phrases = set((cfg.get("whitelist", {}) or {}).get("noms_maj_excepts", []) or [])

    def _repl_person_ctx(m: re.Match) -> str:
        span = m.group(1).strip()
        raw = m.group(0)
        if span in wl_sections or raw in wl_phrases:
            return raw
        tokens = [t for t in span.split() if t]
        if len(tokens) == 1 and len(tokens[0]) <= 3:
            return raw  # short acronym (DIM/DR/DP...)
        # Otherwise mask
        audit.append(PiiHit(page_idx, "NOM", span, PLACEHOLDERS["NOM"]))
        return raw.replace(span, PLACEHOLDERS["NOM"])  # keep prefix (Dr/Mme/etc.)

    line = RE_PERSON_CONTEXT.sub(_repl_person_ctx, line)
    return line
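# Illustrative pass over one line (name and number hypothetical):
#   "Dr DUPONT, tél : 06 12 34 56 78" -> "Dr [NOM], tél : [TEL]"
# The Dr/Mme prefix survives; only the uppercase name span is replaced.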
def _kv_value_only_mask(line: str, audit: List[PiiHit], page_idx: int, cfg: Dict[str, Any]) -> str:
    line = _mask_admin_label(line, audit, page_idx)
    parts = SPLITTER.split(line, maxsplit=1)
    if len(parts) == 2:
        key, value = parts
        masked_val = _mask_line_by_regex(value, audit, page_idx, cfg)
        return f"{key.strip()} : {masked_val.strip()}"
    else:
        return _mask_line_by_regex(line, audit, page_idx, cfg)
# ----------------- Anonymisation -----------------
def anonymise_document(pages_text: List[str], tables_lines: List[List[str]], cfg: Dict[str, Any]) -> AnonResult:
    audit: List[PiiHit] = []
    out_pages: List[str] = []
    for i, page_txt in enumerate(pages_text):
        lines = (page_txt or "").splitlines()
        masked = [_kv_value_only_mask(ln, audit, i, cfg) for ln in lines]
        out_pages.append("\n".join(masked))
    table_blocks: List[str] = []
    for i, rows in enumerate(tables_lines):
        mbuf: List[str] = []
        for r in rows:
            mbuf.append(_kv_value_only_mask(r, audit, i, cfg))
        if mbuf:
            table_blocks.append("\n".join(mbuf))
    tables_block = "\n\n".join(table_blocks)
    text_out = "\n\n".join(out_pages)
    if tables_block.strip():
        text_out += "\n\n[TABLES]\n" + tables_block + "\n[/TABLES]"
    return AnonResult(text_out=text_out, tables_block=tables_block, audit=audit)
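# The combined output is the masked page texts joined by blank lines, with any
# masked table rows appended inside a [TABLES] ... [/TABLES] envelope.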
# ----------------- Selective safety rescan -----------------
def selective_rescan(text: str) -> str:
    # Safety net: re-apply the critical regexes to free text while leaving the
    # [TABLES] ... [/TABLES] blocks (already masked row by row) untouched.
    def _mask(chunk: str) -> str:
        chunk = RE_EMAIL.sub(PLACEHOLDERS["EMAIL"], chunk)
        chunk = RE_TEL.sub(PLACEHOLDERS["TEL"], chunk)
        chunk = RE_IBAN.sub(PLACEHOLDERS["IBAN"], chunk)
        return RE_NIR.sub(PLACEHOLDERS["NIR"], chunk)

    # Capturing split keeps the table blocks as their own list items, so they
    # can be passed through verbatim instead of being padded and re-spliced.
    parts = re.split(r"(\[TABLES\].*?\[/TABLES\])", text, flags=re.DOTALL)
    return "".join(p if p.startswith("[TABLES]") else _mask(p) for p in parts)
# ----------------- PDF Redaction -----------------
def redact_pdf_vector(original_pdf: Path, audit: List[PiiHit], out_pdf: Path) -> None:
    if fitz is None:
        raise RuntimeError("PyMuPDF is not available; install pymupdf.")
    doc = fitz.open(str(original_pdf))
    by_page: Dict[int, List[PiiHit]] = {}
    for h in audit:
        by_page.setdefault(h.page, []).append(h)
    for pno, hits in by_page.items():
        if pno >= len(doc):
            continue
        page = doc[pno]
        for h in hits:
            token = h.original.strip()
            if not token:
                continue
            rects = page.search_for(token)
            if not rects and h.kind in {"NIR", "IBAN", "TEL"}:
                # Retry without whitespace: numbers are often spaced in print
                compact = re.sub(r"\s+", "", token)
                if compact != token:
                    rects = page.search_for(compact)
            for r in rects:
                page.add_redact_annot(r, fill=(0, 0, 0))
        try:
            page.apply_redactions()
        except Exception:
            pass
    doc.save(str(out_pdf), deflate=True, garbage=4, clean=True, incremental=False)
    doc.close()
def redact_pdf_raster(original_pdf: Path, audit: List[PiiHit], out_pdf: Path, dpi: int = 300) -> None:
    if fitz is None:
        raise RuntimeError("PyMuPDF is not available; install pymupdf.")
    doc = fitz.open(str(original_pdf))
    out = fitz.open()
    # search rects per page
    all_rects: Dict[int, List["fitz.Rect"]] = {}
    for pno in range(len(doc)):
        page = doc[pno]
        rects = []
        for h in [x for x in audit if x.page == pno]:
            token = h.original.strip()
            if not token:
                continue
            found = page.search_for(token)
            if not found and h.kind in {"NIR", "IBAN", "TEL"}:
                compact = re.sub(r"\s+", "", token)
                found = page.search_for(compact)
            rects.extend(found)
        all_rects[pno] = rects
    # render + compose
    for pno in range(len(doc)):
        src_page = doc[pno]
        page_rect = src_page.rect
        zoom = dpi / 72.0
        mat = fitz.Matrix(zoom, zoom)
        pix = src_page.get_pixmap(matrix=mat, annots=False)
        img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
        draw = ImageDraw.Draw(img)
        for r in all_rects.get(pno, []):
            draw.rectangle([r.x0 * zoom, r.y0 * zoom, r.x1 * zoom, r.y1 * zoom], fill=(0, 0, 0))
        buf = io.BytesIO()
        img.save(buf, format="PNG")
        buf.seek(0)
        dst_page = out.new_page(width=page_rect.width, height=page_rect.height)
        dst_page.insert_image(page_rect, stream=buf.getvalue())
    out.save(str(out_pdf), deflate=True, garbage=4, clean=True)
    out.close()
    doc.close()
# ----------------- Orchestration -----------------
def process_pdf(
    pdf_path: Path,
    out_dir: Path,
    make_vector_redaction: bool = True,
    also_make_raster_burn: bool = False,
    config_path: Optional[Path] = None,
) -> Dict[str, str]:
    out_dir.mkdir(parents=True, exist_ok=True)
    cfg = load_dictionaries(config_path)
    pages_text, tables_lines = extract_text_two_passes(pdf_path)
    anon = anonymise_document(pages_text, tables_lines, cfg)
    final_text = selective_rescan(anon.text_out)
    base = pdf_path.stem
    txt_path = out_dir / f"{base}.pseudonymise.txt"
    audit_path = out_dir / f"{base}.audit.jsonl"
    txt_path.write_text(final_text, encoding="utf-8")
    with audit_path.open("w", encoding="utf-8") as f:
        for hit in anon.audit:
            f.write(json.dumps(hit.__dict__, ensure_ascii=False) + "\n")
    outputs = {"text": str(txt_path), "audit": str(audit_path)}
    if make_vector_redaction and fitz is not None:
        vec_path = out_dir / f"{base}.redacted_vector.pdf"
        try:
            redact_pdf_vector(pdf_path, anon.audit, vec_path)
            outputs["pdf_vector"] = str(vec_path)
        except Exception:
            pass
    if also_make_raster_burn and fitz is not None:
        ras_path = out_dir / f"{base}.redacted_raster.pdf"
        redact_pdf_raster(pdf_path, anon.audit, ras_path)
        outputs["pdf_raster"] = str(ras_path)
    return outputs
if __name__ == "__main__":
    import argparse

    ap = argparse.ArgumentParser(description="Anonymise a PDF using YAML dictionaries + PDF redactions")
    ap.add_argument("pdf", type=str)
    ap.add_argument("--out", type=str, default="out")
    ap.add_argument("--no-vector", action="store_true")
    ap.add_argument("--raster", action="store_true")
    ap.add_argument("--config", type=str, default=str(Path("config/dictionnaires.yml")))
    args = ap.parse_args()
    outs = process_pdf(
        Path(args.pdf),
        Path(args.out),
        make_vector_redaction=not args.no_vector,
        also_make_raster_burn=args.raster,
        config_path=Path(args.config),
    )
    print(json.dumps(outs, indent=2, ensure_ascii=False))
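
# Example run (file names illustrative):
#   python anonymizer_core_refactored.py rapport.pdf --out sortie --raster
# writes sortie/rapport.pseudonymise.txt, sortie/rapport.audit.jsonl and, when
# PyMuPDF is installed, sortie/rapport.redacted_vector.pdf (+ .redacted_raster.pdf).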