Files
anonymisation/pseudonymisation_pipeline_robuste.py
Domi31tls 8339069c83 Initial commit — Pseudonymisation de PDF v5
- GUI v5 : vue unique épurée (tkinter), 2 étapes visuelles
- Core ONNX : anonymisation regex + NER optionnel
- Extraction globale des noms depuis champs structurés
  (Patient, Rédigé par, MME/Madame, DR)
- Génération simultanée PDF Image + PDF Anonymisé (structure préservée)
- Build Windows via Nuitka (script batch + GitHub Actions CI)
- install.sh pour setup/run Linux

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-16 15:03:37 +01:00

628 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from __future__ import annotations
import os, re, sys, json, queue, hashlib, warnings, threading, subprocess, unicodedata
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import List, Tuple, Optional, Dict
from datetime import datetime, timedelta
# GUI
import tkinter as tk
from tkinter import filedialog, messagebox, ttk
# Core
import pdfplumber
import requests
import spacy
from spacy.util import load_model_from_path
try:
import yaml
except Exception:
yaml = None
# Title given to the Tk root window by App.__init__.
APP_TITLE = "Pseudonymisation (Robuste + Backbones)"
# Name of the spaCy French pipeline: used as the folder name under "models/",
# as the argument to spacy.load(), and as the package passed to "spacy download".
MODEL_DIR_NAME = "fr_core_news_lg"
# ----------- Utilitaires & Unicode -----------
def resolve_base_dir() -> Path:
    """Return the directory that bundled resources are resolved against.

    ``sys._MEIPASS`` is used when that attribute exists; otherwise the
    directory containing this source file is returned.
    """
    source_dir = Path(__file__).resolve().parent
    bundle_dir = getattr(sys, "_MEIPASS", source_dir)
    return Path(bundle_dir)
def sha256(s: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``s`` encoded as UTF-8.

    Characters that cannot be encoded are dropped (``errors="ignore"``).
    """
    payload = s.encode("utf-8", errors="ignore")
    return hashlib.sha256(payload).hexdigest()
# Character fixes applied after NFKC normalisation.  Every code point is
# written as an escape so that no invisible character lives in the source.
_NORMALIZE_TABLE = str.maketrans({
    # Private-use code points that some PDF fonts use for the fi / fl
    # ligatures (assumed to be what the original invisible literals
    # targeted -- TODO confirm against the repository bytes).  The standard
    # ligatures U+FB01 / U+FB02 are already expanded by NFKC.
    "\uf001": "fi",
    "\uf002": "fl",
    # Double quotes: typographic, guillemets, and the C1 leftovers of a
    # cp1252 -> latin-1 mis-decoding.
    "\u201c": '"',
    "\u201d": '"',
    "\u00ab": '"',
    "\u00bb": '"',
    "\u0093": '"',
    "\u0094": '"',
    # cp1252 right single quote mis-decoded as a C1 control.  The real
    # U+2019 is deliberately left alone: the upper-case name patterns of
    # this module list it in their character classes.
    "\u0092": "'",
})


def normalize_text(s: str) -> str:
    """Return ``s`` normalised for the regex / NER passes.

    Steps, in order: NFKC normalisation, ligature and quote clean-up,
    non-breaking space and C0 control characters turned into spaces, then
    every whitespace run collapsed to a single space and the result stripped.
    Line breaks are therefore NOT preserved.

    Args:
        s: Raw text (``None`` or an empty string is accepted).

    Returns:
        The cleaned single-line string, or ``""`` for falsy input.
    """
    if not s:
        return ""
    s = unicodedata.normalize("NFKC", s)
    s = s.translate(_NORMALIZE_TABLE)
    s = s.replace("\u00A0", " ")
    s = re.sub(r"[\u0000-\u001f]", " ", s)
    return re.sub(r"\s+", " ", s).strip()
def find_model_dir(root: Path) -> Optional[Path]:
    """Locate a directory holding both ``config.cfg`` and ``meta.json``.

    ``root`` itself is checked first; otherwise the tree below it is walked
    and the parent of the first ``config.cfg`` that has a sibling
    ``meta.json`` is returned.  Returns ``None`` when nothing qualifies.
    """
    def has_meta(folder: Path) -> bool:
        return (folder / "meta.json").exists()

    if (root / "config.cfg").exists() and has_meta(root):
        return root
    matches = (cfg.parent for cfg in root.rglob("config.cfg") if has_meta(cfg.parent))
    return next(matches, None)
# ----------- Règles & Whitelist -----------
# Upper-case tokens (mostly medical / administrative acronyms) that must not
# be treated as names: a run of capitalised words is left untouched only when
# every one of its tokens belongs to the active whitelist.
DEFAULT_WHITELIST = {
    "PMSI","T2A","GHM","GHS","DP","DR","DAS","RUM","UM","UF","CMA","CMD","CIM","CIM-10","CCAM","NGAP","NABM","ICD","ICD-10",
    "CHU","CH","CLCC","SSR","USI","USC","USLD","UHCD","SAU","UCA","HDJ","HAD","EHPAD","CMP","SMUR","SAMU","DIM",
    "IRM","TDM","TEP","RX","ETT","ETO","ECG","EEG","EMG","EFR","BHC",
    "NFS","CRP","VS","HB","HT","TSH","T3","T4","ASAT","ALAT","GGT","LDH","BNP","NTPROBNP","DFG","INR","PAO2","PACO2","SPO2","TA","FC","IMC","BMI",
    "IGS2","SAPS2","APACHE","SOFA","NEWS","HAS","ARS",
    "FINESS","OGC",
}
# E-mail address: local part, "@", domain, and a TLD of at least two letters.
EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
# Phone number: "+33" or "0", one digit 1-9, then four digit pairs, each
# optionally preceded by a space, dot or hyphen.
PHONE_RE = re.compile(r"(?:\+33|0)[1-9](?:[ .-]?\d{2}){4}\b")
# "IPP", an optional ":" or space, then 6 to 10 digits (case-insensitive).
IPP_RE = re.compile(r"\bIPP[: ]?\d{6,10}\b", re.IGNORECASE)
# IBAN-shaped token: two capitals, two digits, 11 to 30 capital alphanumerics.
IBAN_RE = re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{11,30}\b")
# 15 contiguous digits captured as (13-digit number, 2-digit key); the key is
# checked with nir_is_valid before anything is masked.
NIR_RAW_RE = re.compile(r"\b(\d{13})(\d{2})\b")
# Labelled fields.  The patterns ending in ".*" consume everything up to the
# end of the line.
# NOTE(review): normalize_text collapses all whitespace (newlines included)
# into single spaces before these rules run on page text, so on such text
# ".*" appears to extend to the end of the whole page text -- confirm this
# over-masking is intended.
FINESS_LINE_RE = re.compile(r"\bFINESS\s*:\s*\d{9}\b", re.IGNORECASE)
OGC_LINE_RE = re.compile(r"N[°º]?\s*OGC\s*:\s*\d+", re.IGNORECASE)
ETAB_LINE_RE = re.compile(r"Etablissement\s*:\s*.*", re.IGNORECASE)
PRATICIEN_LINE_RE = re.compile(r"Nom du praticien[- ]conseil\s*:\s*.*", re.IGNORECASE)
DIM_LINE_RE = re.compile(r"Nom du m[ée]decin du DIM\s*:\s*.*", re.IGNORECASE)
# "Dr" followed by whitespace and at least two characters among capitals,
# apostrophe, space and hyphen.
DR_MAJ_RE = re.compile(r"Dr\s+[A-ZÀ-Ü' \-]{2,}")
# Two or more whitespace-separated tokens, each made of at least two capitals
# (accented capitals, the typographic apostrophe and the hyphen included),
# not immediately preceded by an ASCII capital.
NOMS_MAJ_RE = re.compile(r"(?<![A-Z])(?:[A-ZÀ-Ü’\-]{2,}\s+){1,}[A-ZÀ-Ü’\-]{2,}")
# (pattern, strptime/strftime format) pairs for the date shapes that
# RobustEngine.transform_dates rewrites.
DATE_PATTERNS = [
    (re.compile(r"\b(\d{2})/(\d{2})/(\d{4})\b"), "%d/%m/%Y"),
    (re.compile(r"\b(\d{4})-(\d{2})-(\d{2})\b"), "%Y-%m-%d"),
]
# Table rows are kept in the output only when they mention one of these labels.
DEFAULT_KEEP_FIELDS = ["Etablissement", "FINESS", "N° OGC", "Dates de séjour", "Service", "RUM", "UM"]
def nir_is_valid(nir13: str, cle2: str) -> bool:
    """Tell whether ``cle2`` is the control key of the number ``nir13``.

    The key is valid when it equals ``97 - (number mod 97)``.  Any input that
    cannot be converted to integers yields ``False``.
    """
    try:
        number, key = int(nir13), int(cle2)
    except Exception:
        return False
    return key == 97 - (number % 97)
# ----------- Modèle avancé HF (cascade) -----------
# Display label -> Hugging Face model id.  The labels fill the preset combobox
# of the GUI and the first entry is the default advanced model.
MODEL_PRESETS = {
    "CamemBERT NER (Jean-Baptiste)": "Jean-Baptiste/camembert-ner", # ready-to-use NER model
    "CamemBERT-bio (base LM)": "almanach/camembert-base-bio", # base LM, not NER -> for tests / replace with a biomedical NER model if you have one
    "DrBERT (base LM)": "Dr-BERT/DrBERT-7GB", # base LM, not NER -> same remark
}
class AdvancedHF:
    """Optional Hugging Face token-classification pass.

    ``load`` builds a CPU ``transformers`` pipeline for ``model_id``;
    ``apply`` then replaces person / organisation / location entities with
    placeholders.  Until ``load`` succeeds, ``apply`` is a no-op.
    """

    def __init__(self, model_id: str, cache_dir: Path, status_cb=None):
        self.model_id = model_id
        self.cache_dir = cache_dir
        self.pipe = None
        # Progress messages go to this callback; default is to discard them.
        self.status_cb = status_cb if status_cb else (lambda msg: None)

    @staticmethod
    def _placeholder_for(label: str) -> Optional[str]:
        """Map an entity label to its placeholder, or ``None`` to ignore it."""
        if label.startswith("PER"):
            return "[NOM]"
        if label.startswith("ORG"):
            return "[ETABLISSEMENT]"
        if label.startswith("LOC") or label == "GPE":
            return "[VILLE]"
        return None

    def load(self) -> Tuple[bool, str]:
        """Load tokenizer and model and build the pipeline.

        Returns:
            ``(True, message)`` when the pipeline is ready, otherwise
            ``(False, reason)``.  No exception escapes this method.
        """
        try:
            # Point the Hugging Face cache at the application folder before
            # transformers is imported.
            os.environ["HF_HOME"] = str(self.cache_dir)
            self.status_cb("Initialisation Transformers…")
            from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline, AutoModel

            # sentencepiece is required by the camembert / drbert tokenizers.
            try:
                import sentencepiece  # noqa: F401
            except Exception:
                return False, "Dépendance 'sentencepiece' manquante. Installez-la puis rebuild."

            self.status_cb("Chargement tokenizer…")
            tokenizer = AutoTokenizer.from_pretrained(self.model_id)
            self.status_cb("Chargement modèle (peut prendre 12 min la 1ère fois)…")
            try:
                model = AutoModelForTokenClassification.from_pretrained(self.model_id)
            except Exception:
                # Not a token-classification checkpoint: still fetch the base
                # weights so they end up in the cache, then report failure.
                self.status_cb("Le modèle semble être un 'base LM'. Téléchargement de la base pour cache…")
                try:
                    AutoModel.from_pretrained(self.model_id)
                except Exception:
                    pass
                return False, ("Le modèle sélectionné ne semble pas être un modèle NER (token-classification). "
                               "Choisissez un ID fine-tuné pour le NER (ex. 'Jean-Baptiste/camembert-ner').")

            # torch is optional here; when present, restrict it to one thread.
            try:
                import torch
                torch.set_num_threads(1)
            except Exception:
                pass

            self.pipe = pipeline("token-classification", model=model, tokenizer=tokenizer,
                                 aggregation_strategy="simple", device=-1)
            return True, f"Modèle avancé prêt: {self.model_id}"
        except Exception as exc:
            if "sentencepiece" in str(exc).lower():
                return False, "Échec: 'sentencepiece' requis."
            return False, f"Échec modèle avancé: {exc}"

    def apply(self, text: str) -> Tuple[str, List[Tuple[int,int,str,str]]]:
        """Mask the entities found in ``text``.

        Returns:
            ``(masked_text, audit)`` where each audit entry is
            ``(start, end, placeholder, original_text)`` with offsets relative
            to the input.  Entities overlapping an earlier accepted one are
            skipped.
        """
        if not self.pipe:
            return text, []

        candidates = []
        for entity in self.pipe(text):
            label = entity.get("entity_group") or entity.get("entity") or ""
            begin, stop = int(entity["start"]), int(entity["end"])
            placeholder = self._placeholder_for(label)
            if placeholder is None:
                continue
            candidates.append((begin, stop, placeholder, text[begin:stop]))
        if not candidates:
            return text, []

        candidates.sort(key=lambda item: item[0])
        pieces = []
        accepted = []
        cursor = 0
        for begin, stop, placeholder, raw in candidates:
            if begin < cursor:
                continue
            pieces.append(text[cursor:begin])
            pieces.append(placeholder)
            accepted.append((begin, stop, placeholder, raw))
            cursor = stop
        pieces.append(text[cursor:])
        return "".join(pieces), accepted
# ----------- Moteur Robuste -----------
@dataclass
class Replacement:
    """Audit record for one masked span; the original text is kept only as a hash."""

    # Rule family that produced the replacement (e.g. "RULE", "EMAIL", "NER", "HF").
    kind: str
    # 1-based page number, or None when unknown.
    page: Optional[int]
    # Truncated hash of the original text.
    text_hash: str
    # Placeholder written in place of the original text.
    replacement: str
class RobustEngine:
    """Pseudonymisation engine: regex rules plus optional spaCy / Hugging Face NER.

    The engine is configured from a dict with the optional sections
    ``policy`` (``dates``, ``shift_days``), ``whitelist`` (``tokens``),
    ``tables`` (``keep_fields``) and ``advanced`` (``hf_model_id``).
    """

    def __init__(self, config: Dict):
        self.nlp = None
        self.use_ner = False
        policy = config.get("policy", {})
        self.date_policy = policy.get("dates", "keep")
        self.date_shift_days = int(policy.get("shift_days", 0))
        self.whitelist = set(config.get("whitelist", {}).get("tokens", list(DEFAULT_WHITELIST)))
        self.keep_fields = config.get("tables", {}).get("keep_fields", list(DEFAULT_KEEP_FIELDS))
        self.apply_ner_on_narr = True
        # Hugging Face settings; the cache lives under %LOCALAPPDATA% when
        # that variable exists, otherwise under the application base dir.
        adv = config.get("advanced", {})
        self.adv_model_id = adv.get("hf_model_id", list(MODEL_PRESETS.values())[0])
        self.adv_cache_dir = (
            Path(os.environ.get("LOCALAPPDATA", resolve_base_dir()))
            / "Pseudonymiseur" / "models" / "hf_cache"
        )
        self.hf: Optional[AdvancedHF] = None

    # ---- spaCy ----
    def try_load_spacy(self, custom_dir: Optional[Path]=None) -> Tuple[bool,str]:
        """Load the spaCy pipeline from a local folder, then from the installed package.

        Args:
            custom_dir: Folder tried before the bundled ``models/`` folder.

        Returns:
            ``(True, origin)`` on success (``use_ner`` is switched on), or
            ``(False, reason)`` with ``nlp`` reset to ``None``.
        """
        candidates = []
        if custom_dir:
            candidates.append(custom_dir)
        candidates.append(resolve_base_dir() / "models" / MODEL_DIR_NAME)
        for folder in candidates:
            if not folder.exists():
                continue
            real = find_model_dir(folder)
            if not real:
                continue
            try:
                self.nlp = load_model_from_path(real)
                self.use_ner = True
                return True, f"Local: {real}"
            except Exception as e:
                # A broken local copy must not prevent the package fallback.
                warnings.warn(f"Echec load local {real}: {e}")
        try:
            self.nlp = spacy.load(MODEL_DIR_NAME)
            self.use_ner = True
            return True, f"spacy.load('{MODEL_DIR_NAME}')"
        except Exception as e:
            self.nlp = None
            self.use_ner = False
            return False, f"Indisponible: {e}"

    # ---- Dates ----
    def transform_dates(self, text: str) -> str:
        """Rewrite dates according to ``date_policy``.

        ``"keep"`` returns the text unchanged, ``"month_year"`` reduces each
        date to ``MM/YYYY`` and ``"shift"`` moves it by ``date_shift_days``.
        Strings that look like dates but are not valid calendar dates are
        left as they are.
        """
        if self.date_policy == "keep":
            return text

        def as_month_year(m, fmt):
            try:
                return datetime.strptime(m.group(0), fmt).strftime("%m/%Y")
            except ValueError:
                return m.group(0)

        def shifted(m, fmt):
            try:
                moved = datetime.strptime(m.group(0), fmt) + timedelta(days=self.date_shift_days)
                return moved.strftime(fmt)
            except (ValueError, OverflowError):
                # Invalid date, or a shift that leaves the supported range.
                return m.group(0)

        for rx, fmt in DATE_PATTERNS:
            if self.date_policy == "month_year":
                text = rx.sub(lambda m, fmt=fmt: as_month_year(m, fmt), text)
            elif self.date_policy == "shift":
                text = rx.sub(lambda m, fmt=fmt: shifted(m, fmt), text)
        return text

    # ---- Targeted regexes ----
    def regex_pass(self, text: str, page: Optional[int]) -> Tuple[str, List[Replacement]]:
        """Apply every regex rule and return ``(masked_text, audit_records)``.

        Order: labelled-field rules, then e-mail / phone / IPP / IBAN, then
        key-checked NIR numbers, then runs of upper-case words that are not
        entirely whitelisted.
        """
        repls: List[Replacement] = []

        def record(kind, raw, placeholder):
            repls.append(Replacement(kind, page, sha256(raw)[:8], placeholder))
            return placeholder

        for rx, placeholder in [
            (ETAB_LINE_RE, "[ETABLISSEMENT]"),
            (FINESS_LINE_RE, "[FINESS]"),
            (OGC_LINE_RE, "[OGC]"),
            (PRATICIEN_LINE_RE, "[NOM_MEDECIN]"),
            (DIM_LINE_RE, "[NOM_MEDECIN]"),
            (DR_MAJ_RE, "[NOM_MEDECIN]"),
        ]:
            text = rx.sub(lambda m, ph=placeholder: record("RULE", m.group(0), ph), text)

        for rx, placeholder, kind in [
            (EMAIL_RE, "[EMAIL]", "EMAIL"),
            (PHONE_RE, "[TEL]", "TEL"),
            (IPP_RE, "[IPP]", "IPP"),
            (IBAN_RE, "[IBAN]", "IBAN"),
        ]:
            text = rx.sub(lambda m, ph=placeholder, k=kind: record(k, m.group(0), ph), text)

        def mask_nir(m):
            # Only 15-digit numbers whose key checks out are treated as NIR.
            if nir_is_valid(m.group(1), m.group(2)):
                return record("NIR", m.group(0), "[NIR]")
            return m.group(0)
        text = NIR_RAW_RE.sub(mask_nir, text)

        def mask_upper_names(m):
            cand = m.group(0)
            tokens = re.findall(r"[A-ZÀ-Ü’\-]{2,}", cand)
            if all(t in self.whitelist for t in tokens):
                return cand
            return record("NOM", cand, "[NOM]")
        text = NOMS_MAJ_RE.sub(mask_upper_names, text)
        return text, repls

    # ---- spaCy NER ----
    def ner_pass_spacy(self, text: str, page: Optional[int]) -> Tuple[str, List[Replacement]]:
        """Mask person / organisation / place entities found by spaCy.

        Both label schemes are accepted: ``PER`` (used by the French
        ``fr_core_news_*`` pipelines) and ``PERSON`` (OntoNotes-style
        pipelines).  Returns the text unchanged when NER is disabled.
        """
        if not self.use_ner or not self.nlp:
            return text, []
        doc = self.nlp(text)
        spans = []
        for ent in doc.ents:
            label = ent.label_
            if label in ("DATE", "TIME"):
                continue
            if label in ("PER", "PERSON"):
                rep = "[NOM]"
            elif label == "ORG":
                rep = "[ETABLISSEMENT]"
            elif label in ("GPE", "LOC", "FAC"):
                rep = "[VILLE]"
            else:
                continue
            spans.append((ent.start_char, ent.end_char, rep, ent.text))
        if not spans:
            return text, []
        spans.sort(key=lambda x: x[0])
        out = []
        repls = []
        last = 0
        for s, e, rep, raw in spans:
            if s < last:
                # Overlaps a span that was already replaced.
                continue
            out.append(text[last:s])
            out.append(rep)
            last = e
            repls.append(Replacement("NER", page, sha256(raw)[:8], rep))
        out.append(text[last:])
        return "".join(out), repls

    # ---- Hugging Face ----
    def ensure_hf(self, status_cb=None) -> Tuple[bool,str]:
        """Make sure the Hugging Face model ``adv_model_id`` is loaded.

        A backend is stored in ``self.hf`` only once it has loaded, so a
        failed attempt can be retried and a different model id triggers a new
        load.  When a new load fails, a previously loaded backend is kept.

        Returns:
            ``(ok, message)``.
        """
        current = self.hf
        if current is not None and current.pipe is not None and current.model_id == self.adv_model_id:
            return True, "Déjà prêt."
        candidate = AdvancedHF(self.adv_model_id, self.adv_cache_dir, status_cb=status_cb)
        ok, msg = candidate.load()
        if ok:
            self.hf = candidate
        return ok, msg

    def ner_pass_hf(self, text: str, page: Optional[int]) -> Tuple[str, List[Replacement]]:
        """Run the Hugging Face pass; a no-op when no backend is loaded."""
        if not self.hf:
            return text, []
        masked, audit = self.hf.apply(text)
        repls = [Replacement("HF", page, sha256(raw)[:8], rep) for (_s, _e, rep, raw) in audit]
        return masked, repls

    # ---- Safety net ----
    def safety_rescan(self, text: str) -> str:
        """Re-apply every regex rule to the final text, without audit records."""
        for rx, ph in [
            (FINESS_LINE_RE, "[FINESS]"),
            (OGC_LINE_RE, "[OGC]"),
            (ETAB_LINE_RE, "[ETABLISSEMENT]"),
            (PRATICIEN_LINE_RE, "[NOM_MEDECIN]"),
            (DIM_LINE_RE, "[NOM_MEDECIN]"),
            (DR_MAJ_RE, "[NOM_MEDECIN]"),
        ]:
            text = rx.sub(ph, text)
        text = EMAIL_RE.sub("[EMAIL]", text)
        text = PHONE_RE.sub("[TEL]", text)
        text = IPP_RE.sub("[IPP]", text)
        text = IBAN_RE.sub("[IBAN]", text)

        def mask_nir(m):
            return "[NIR]" if nir_is_valid(m.group(1), m.group(2)) else m.group(0)
        text = NIR_RAW_RE.sub(mask_nir, text)

        def mask_upper_names(m):
            cand = m.group(0)
            toks = re.findall(r"[A-ZÀ-Ü’\-]{2,}", cand)
            return cand if all(t in self.whitelist for t in toks) else "[NOM]"
        return NOMS_MAJ_RE.sub(mask_upper_names, text)
# ----------- PDF Processor -----------
class PDFProcessor:
    """Extract tables and text from a PDF and run them through the engine.

    ``options`` keys read here: ``keep_tables``, ``apply_ner_on_narrative``,
    ``aggressive_hf`` and ``safety_rescan``.
    """

    def __init__(self, engine: RobustEngine, options: Dict):
        self.engine = engine
        self.options = options

    def process_pdf(self, pdf_path: Path) -> Tuple[str, List[Replacement], bool]:
        """Pseudonymise one PDF.

        Returns:
            ``(text, audit, scanned_like)``.  ``scanned_like`` stays ``True``
            when no page yielded any table or text, which suggests an
            image-only document.
        """
        chunks = []
        audit = []
        scanned_like = True
        with pdfplumber.open(str(pdf_path)) as pdf:
            for p_idx, page in enumerate(pdf.pages, start=1):
                page_chunks = []

                # Tables
                try:
                    tables = page.extract_tables()
                except Exception:
                    tables = []
                if tables:
                    scanned_like = False
                    lines_all = []
                    for table in tables:
                        rows = [[normalize_text(c or "") for c in row] for row in table]
                        text_lines, reps = self._handle_table(rows, p_idx)
                        audit += reps
                        lines_all += text_lines
                    if self.options.get("keep_tables", True) and lines_all:
                        page_chunks.append("[TABLES]\n" + "\n".join(lines_all) + "\n[/TABLES]")

                # Narrative text
                try:
                    txt = page.extract_text(x_tolerance=1.5, y_tolerance=3.0) or ""
                except Exception:
                    txt = ""
                txt = normalize_text(txt)
                if txt.strip():
                    scanned_like = False
                    txt = self.engine.transform_dates(txt)
                    t1, r1 = self.engine.regex_pass(txt, p_idx)
                    if self.options.get("apply_ner_on_narrative", True) and self.engine.use_ner:
                        t2, r2 = self.engine.ner_pass_spacy(t1, p_idx)
                    else:
                        t2, r2 = t1, []
                    if self.options.get("aggressive_hf", False) and self.engine.hf:
                        t3, r3 = self.engine.ner_pass_hf(t2, p_idx)
                    else:
                        t3, r3 = t2, []
                    audit += (r1 + r2 + r3)
                    page_chunks.append(t3)

                if page_chunks:
                    chunks.append(f"\n===== PAGE {p_idx} =====\n" + "\n\n".join(page_chunks))

        final_text = "\n\n".join(chunks).strip()
        if self.options.get("safety_rescan", True):
            final_text = self.engine.safety_rescan(final_text)
        return final_text, audit, scanned_like

    def _handle_table(self, rows: List[List[str]], page: int) -> Tuple[List[str], List[Replacement]]:
        """Mask table rows and keep those mentioning a configured field label.

        Each non-empty row is joined with ``"; "``, date-transformed and
        passed through the regex rules.  The row is kept only when it
        contains, as a whole word and case-insensitively, one of
        ``engine.keep_fields``.  Replacements are recorded for every
        processed row, kept or not.

        Returns:
            ``(kept_lines, audit_records)``.
        """
        # Field labels are literal text, not regexes: escape them so that a
        # label such as "U.M" or "Code (A)" cannot over-match or fail to compile.
        keep_patterns = [
            re.compile(rf"\b{re.escape(str(field))}\b", re.IGNORECASE)
            for field in self.engine.keep_fields
        ]
        out_lines = []
        repls = []
        for row in rows:
            line = "; ".join(c for c in row if c)
            if not line:
                continue
            masked, row_repls = self.engine.regex_pass(self.engine.transform_dates(line), page)
            repls += row_repls
            if any(p.search(masked) for p in keep_patterns):
                out_lines.append(masked)
        return out_lines, repls
# ----------- GUI -----------
def load_config() -> Dict:
    """Return the default configuration merged with an optional ``config.yaml``.

    The file is looked up in the application base directory and is read only
    when PyYAML is available.  Top-level sections that are mappings are
    merged key by key into the defaults; any other value replaces the default
    outright.  A missing file is silent; an unreadable or malformed file
    emits a warning and the defaults are returned.
    """
    cfg = {
        "whitelist": {"tokens": list(DEFAULT_WHITELIST)},
        "tables": {"keep_fields": list(DEFAULT_KEEP_FIELDS)},
        "policy": {"dates": "keep", "shift_days": 0},
        "advanced": {"hf_model_id": list(MODEL_PRESETS.values())[0]},
    }
    cfg_path = resolve_base_dir() / "config.yaml"
    try:
        if not (yaml and cfg_path.exists()):
            return cfg
        with cfg_path.open("r", encoding="utf-8") as f:
            user_cfg = yaml.safe_load(f) or {}
    except Exception as exc:
        # Best effort: a bad config file must not prevent start-up, but the
        # user should know that their settings were not applied.
        warnings.warn(f"Lecture de {cfg_path} impossible, configuration par défaut utilisée : {exc}")
        return cfg

    if not isinstance(user_cfg, dict):
        warnings.warn(f"{cfg_path} ignoré : la racine du fichier doit être un dictionnaire.")
        return cfg

    for key, value in user_cfg.items():
        if isinstance(value, dict) and isinstance(cfg.get(key), dict):
            cfg[key].update(value)
        else:
            cfg[key] = value
    return cfg
class App:
    """Tkinter front-end: pick a folder of PDFs, configure the engine, run it.

    Long operations (spaCy download, Hugging Face load, PDF batch) run in
    daemon threads.  Those threads never touch Tk objects directly: log lines
    and UI callbacks are put on ``self.queue`` and executed by ``_pump_logs``
    on the Tk thread.
    """

    def __init__(self, root: tk.Tk):
        self.root = root
        self.root.title(APP_TITLE)
        self.root.geometry("1100x780")
        self.dir_var = tk.StringVar()
        self.status_var = tk.StringVar(value="Prêt.")
        self.model_status_var = tk.StringVar(value="Vérification du modèle spaCy…")
        self.hf_status_var = tk.StringVar(value="Modèle avancé HF : inactif")
        self.regex_only = tk.BooleanVar(value=False)
        self.keep_tables = tk.BooleanVar(value=True)
        self.apply_ner_on_narr = tk.BooleanVar(value=True)
        self.safety_rescan = tk.BooleanVar(value=True)
        self.aggressive_hf = tk.BooleanVar(value=False)
        self.date_policy = tk.StringVar(value="keep")
        self.date_shift_days = tk.StringVar(value="0")
        self.hf_model_label = tk.StringVar(value=list(MODEL_PRESETS.keys())[0])
        self.hf_model_id = tk.StringVar(value=list(MODEL_PRESETS.values())[0])
        # Items are either log lines (str) or zero-argument callables to run
        # on the Tk thread.
        self.queue: "queue.Queue[object]" = queue.Queue()
        self.config = load_config()
        self.engine = RobustEngine(self.config)
        self.engine.adv_cache_dir.mkdir(parents=True, exist_ok=True)
        self._build_ui()
        self._pump_logs()
        self.root.after(250, self._ensure_spacy)

    def _build_ui(self):
        """Create and pack every widget of the main window."""
        top = tk.Frame(self.root, padx=10, pady=10)
        top.pack(fill=tk.BOTH, expand=True)

        # Folder row
        row1 = tk.Frame(top)
        row1.pack(fill=tk.X)
        tk.Label(row1, text="Dossier PDF :").pack(side=tk.LEFT)
        tk.Entry(row1, textvariable=self.dir_var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6)
        tk.Button(row1, text="Parcourir…", command=self._browse).pack(side=tk.LEFT, padx=3)
        self.btn_run = tk.Button(row1, text="Lancer", command=self._run, state=tk.DISABLED)
        self.btn_run.pack(side=tk.LEFT, padx=3)

        # spaCy card
        card = tk.LabelFrame(top, text="Modèle spaCy (FR)", padx=8, pady=8)
        card.pack(fill=tk.X, pady=6)
        tk.Label(card, textvariable=self.model_status_var, anchor="w").pack(fill=tk.X)
        pfrm = tk.Frame(card)
        pfrm.pack(fill=tk.X, pady=(6, 0))
        self.pbar = ttk.Progressbar(pfrm, orient="horizontal", mode="indeterminate", length=300)
        self.pbar.pack(side=tk.LEFT)
        tk.Button(card, text="Télécharger", command=self._download_spacy).pack(side=tk.LEFT, padx=6)
        tk.Button(card, text="Choisir un dossier…", command=self._choose_model_dir).pack(side=tk.LEFT)
        tk.Checkbutton(card, text="Mode regex seul", variable=self.regex_only,
                       command=self._toggle_regex).pack(side=tk.RIGHT)

        # Hugging Face card
        card2 = tk.LabelFrame(top, text="Modèle avancé (Hugging Face)", padx=8, pady=8)
        card2.pack(fill=tk.X, pady=6)
        rowhf = tk.Frame(card2)
        rowhf.pack(fill=tk.X)
        tk.Label(rowhf, text="Préréglage :").pack(side=tk.LEFT)
        self.cmb = ttk.Combobox(rowhf, values=list(MODEL_PRESETS.keys()),
                                textvariable=self.hf_model_label, state="readonly", width=35)
        self.cmb.pack(side=tk.LEFT, padx=6)
        self.cmb.bind("<<ComboboxSelected>>", self._preset_changed)
        tk.Label(rowhf, text="Model ID :").pack(side=tk.LEFT)
        tk.Entry(rowhf, textvariable=self.hf_model_id, width=44).pack(side=tk.LEFT, padx=6)
        tk.Button(rowhf, text="Charger modèle avancé", command=self._load_hf).pack(side=tk.LEFT)
        tk.Checkbutton(card2, text="Re-scanner agressif (ajoute le modèle avancé au narratif)",
                       variable=self.aggressive_hf).pack(side=tk.LEFT, padx=10)
        tk.Label(card2, textvariable=self.hf_status_var, anchor="w").pack(fill=tk.X, pady=(6, 0))

        # Options
        opt = tk.LabelFrame(top, text="Options", padx=8, pady=8)
        opt.pack(fill=tk.X, pady=6)
        tk.Checkbutton(opt, text="Garder tables utiles (réduit)", variable=self.keep_tables).pack(side=tk.LEFT, padx=6)
        tk.Checkbutton(opt, text="Appliquer NER (spaCy) sur narratif", variable=self.apply_ner_on_narr).pack(side=tk.LEFT, padx=6)
        tk.Checkbutton(opt, text="Re-scanner (sécurité) après traitement", variable=self.safety_rescan).pack(side=tk.LEFT, padx=6)

        # Date policy
        pol = tk.LabelFrame(top, text="Politique Dates", padx=8, pady=8)
        pol.pack(fill=tk.X, pady=6)
        tk.Label(pol, text="Dates :").pack(side=tk.LEFT)
        ttk.Combobox(pol, textvariable=self.date_policy, values=["keep", "month_year", "shift"],
                     width=12, state="readonly").pack(side=tk.LEFT, padx=6)
        tk.Label(pol, text="Décalage (+/- jours) :").pack(side=tk.LEFT)
        tk.Entry(pol, textvariable=self.date_shift_days, width=6).pack(side=tk.LEFT, padx=6)

        # Log area and status line
        tk.Label(top, text="Journal :").pack(anchor="w")
        self.txt = tk.Text(top, height=18)
        self.txt.pack(fill=tk.BOTH, expand=True, pady=(2, 0))
        tk.Label(top, textvariable=self.status_var, anchor="w").pack(fill=tk.X, pady=(4, 0))

    # ---- Helpers ----
    def _pbar_mode(self, mode: str):
        """Switch the progress bar mode; "indeterminate" starts the animation."""
        self.pbar.config(mode=mode)
        if mode == "indeterminate":
            self.pbar.start(60)
        else:
            self.pbar.stop()
            self.pbar["value"] = 0

    def log(self, msg: str):
        """Queue a line for the log widget (safe to call from any thread)."""
        self.queue.put(msg)

    def _ui(self, fn, *args, **kwargs):
        """Queue ``fn(*args, **kwargs)`` for execution on the Tk thread.

        Worker threads use this for every widget / variable update; items are
        processed in FIFO order together with log lines.
        """
        self.queue.put(lambda: fn(*args, **kwargs))

    def _pump_logs(self):
        """Drain the queue on the Tk thread, then re-arm itself after 60 ms."""
        try:
            while True:
                item = self.queue.get_nowait()
                if callable(item):
                    item()
                else:
                    self.txt.insert(tk.END, item + "\n")
                    self.txt.see(tk.END)
        except queue.Empty:
            pass
        finally:
            self.root.after(60, self._pump_logs)

    def _spacy_ready(self, origin: str):
        """Show the 'model ready' status and enable the run button."""
        self.model_status_var.set(f"Modèle prêt. {origin}")
        self.btn_run.config(state=tk.NORMAL)

    def _spacy_failed(self, status: str):
        """Show ``status`` and disable the run button unless regex-only mode is on."""
        self.model_status_var.set(status)
        if not self.regex_only.get():
            self.btn_run.config(state=tk.DISABLED)

    # ---- spaCy ----
    def _ensure_spacy(self):
        """Try to load the spaCy model (runs on the Tk thread)."""
        self._pbar_mode("indeterminate")
        ok, msg = self.engine.try_load_spacy(resolve_base_dir() / "models" / MODEL_DIR_NAME)
        if ok:
            self._spacy_ready(msg)
        else:
            self._spacy_failed(f"Modèle indisponible : {msg} — utilisez 'Télécharger' ou 'Mode regex seul'.")
        self._pbar_mode("determinate")

    def _download_spacy(self):
        """Download the spaCy model in a background thread, then load it."""
        self._pbar_mode("indeterminate")
        self.model_status_var.set("Téléchargement spaCy en cours…")

        def work():
            try:
                # NOTE(review): in a frozen build sys.executable is presumably
                # the application binary, not a Python interpreter -- confirm
                # that this command works there.
                subprocess.check_call([sys.executable, "-m", "spacy", "download", MODEL_DIR_NAME])
                ok, msg = self.engine.try_load_spacy(resolve_base_dir() / "models" / MODEL_DIR_NAME)
                if ok:
                    self._ui(self._spacy_ready, msg)
                else:
                    self._ui(self._spacy_failed,
                             "Échec validation modèle. Essayez 'Choisir un dossier…' ou 'Mode regex seul'.")
            except Exception as e:
                self._ui(self._spacy_failed, f"Erreur téléchargement spaCy : {e}")
            finally:
                self._ui(self._pbar_mode, "determinate")

        threading.Thread(target=work, daemon=True).start()

    def _choose_model_dir(self):
        """Let the user pick a local spaCy model folder and try to load it."""
        d = filedialog.askdirectory(title="Choisir le dossier du modèle spaCy")
        if not d:
            return
        ok, msg = self.engine.try_load_spacy(Path(d))
        if ok:
            self._spacy_ready(msg)
        else:
            self._spacy_failed("Échec chargement du modèle.")

    def _toggle_regex(self):
        """React to the 'regex only' checkbox."""
        if self.regex_only.get():
            self.engine.use_ner = False
            self.apply_ner_on_narr.set(False)
            self.btn_run.config(state=tk.NORMAL)
            self.model_status_var.set("Mode regex seul : précision NER réduite.")
        else:
            self._ensure_spacy()

    # ---- Hugging Face ----
    def _preset_changed(self, _evt=None):
        """Copy the model id of the selected preset into the id entry."""
        label = self.hf_model_label.get()
        self.hf_model_id.set(MODEL_PRESETS.get(label, list(MODEL_PRESETS.values())[0]))

    def _load_hf(self):
        """Load the Hugging Face model named in the id entry, in the background."""
        mid = self.hf_model_id.get().strip()
        self.hf_status_var.set(f"Chargement du modèle avancé : {mid}")
        self._pbar_mode("indeterminate")

        def work():
            try:
                self.engine.adv_model_id = mid
                ok, msg = self.engine.ensure_hf(
                    status_cb=lambda m: self._ui(self.hf_status_var.set, m))
                self._ui(self.hf_status_var.set, msg)
            finally:
                self._ui(self._pbar_mode, "determinate")

        threading.Thread(target=work, daemon=True).start()

    # ---- Run ----
    def _browse(self):
        """Ask for the folder containing the PDFs."""
        d = filedialog.askdirectory()
        if d:
            self.dir_var.set(d)

    def _run(self):
        """Validate the inputs, snapshot the options and start the batch thread."""
        folder = Path(self.dir_var.get().strip())
        if not folder.is_dir():
            messagebox.showwarning("Dossier invalide", "Choisissez un dossier contenant des PDF.")
            return
        self.engine.use_ner = (
            (not self.regex_only.get())
            and (self.engine.nlp is not None)
            and self.apply_ner_on_narr.get()
        )
        self.engine.date_policy = self.date_policy.get()
        try:
            self.engine.date_shift_days = int(self.date_shift_days.get() or "0")
        except ValueError:
            # Non-numeric entry: fall back to no shift.
            self.engine.date_shift_days = 0
        # Tk variables are read here, on the Tk thread; the worker only sees
        # this plain dict.
        opts = dict(
            keep_tables=self.keep_tables.get(),
            apply_ner_on_narrative=self.apply_ner_on_narr.get() and self.engine.use_ner,
            safety_rescan=self.safety_rescan.get(),
            aggressive_hf=self.aggressive_hf.get() and (self.engine.hf is not None),
        )
        self.btn_run.config(state=tk.DISABLED)
        threading.Thread(target=self._worker, args=(folder, opts), daemon=True).start()

    def _worker(self, folder: Path, options: Dict):
        """Process every ``*.pdf`` of ``folder`` (runs in a background thread).

        For each PDF three files are written to ``<folder>/pseudonymise``:
        the masked text, a JSONL audit trail and a short log.  A failure on
        one file is logged and does not stop the batch.
        """
        try:
            pdfs = sorted([p for p in folder.glob("*.pdf") if p.is_file()])
            if not pdfs:
                self.log("Aucun PDF trouvé.")
                return
            outdir = folder / "pseudonymise"
            outdir.mkdir(exist_ok=True)
            ok = ko = 0
            for i, pdf in enumerate(pdfs, start=1):
                self._ui(self.status_var.set, f"{i}/{len(pdfs)} — {pdf.name}")
                try:
                    proc = PDFProcessor(self.engine, options)
                    text, audit, scanned = proc.process_pdf(pdf)
                    (outdir / f"{pdf.stem}.pseudonymise.txt").write_text(text, encoding="utf-8")
                    with (outdir / f"{pdf.stem}.pseudonymise.jsonl").open("w", encoding="utf-8") as f:
                        for rep in audit:
                            f.write(json.dumps(asdict(rep), ensure_ascii=False) + "\n")
                    with (outdir / f"{pdf.stem}.log.txt").open("w", encoding="utf-8") as f:
                        f.write(f"Fichier: {pdf.name}\nScanneSuspect: {scanned}\nRemplacements: {len(audit)}\n")
                    self.log(f"{pdf.name}")
                    ok += 1
                except Exception as e:
                    self.log(f"{pdf.name} → ERREUR: {e}")
                    ko += 1
            self._ui(self.status_var.set, f"Terminé : {ok} OK, {ko} erreurs. Sortie: {outdir}")
        finally:
            self._ui(self.btn_run.config, state=tk.NORMAL)
# ----------- main -----------
def main():
    """Create the Tk root window, attach the application and run the event loop."""
    window = tk.Tk()
    App(window)
    window.mainloop()
if __name__ == "__main__":
main()