#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Tkinter tool that pseudonymises the text extracted from PDF files.

Pipeline, per PDF page: table rows and narrative text are extracted with
pdfplumber, dates are optionally coarsened or shifted, targeted regex rules
replace identifiers with placeholders, then an optional spaCy NER pass and an
optional Hugging Face token-classification pass replace remaining names,
organisations and places.  Results are written next to the input as a text
file, a JSONL audit trail and a small log.

NOTE(review): the copy of this file that was reviewed had lost every span of
text located between a "<" and a later ">" (and all of its line breaks).
Every definition that had to be rebuilt from its call sites is tagged
"NOTE(review): RECONSTRUCTED" below and must be checked against the original
before this module is trusted for real data.
"""
from __future__ import annotations

import hashlib
import json
import os
import queue
import re
import subprocess
import sys
import threading
import unicodedata
import warnings
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from pathlib import Path
from typing import Callable, Dict, List, Optional, Tuple, Union

# GUI
import tkinter as tk
from tkinter import filedialog, messagebox, ttk

# Core
import pdfplumber
import requests
import spacy
from spacy.util import load_model_from_path

try:
    import yaml
except Exception:
    # YAML support is optional: load_config() falls back to built-in defaults.
    yaml = None

APP_TITLE = "Pseudonymisation (Robuste + Backbones)"
MODEL_DIR_NAME = "fr_core_news_lg"

# A span to splice into a text: (start offset, end offset, placeholder, raw text).
Span = Tuple[int, int, str, str]


# ----------- Utilities & Unicode -----------

def resolve_base_dir() -> Path:
    """Return ``sys._MEIPASS`` when that attribute exists, else this file's directory."""
    return Path(getattr(sys, "_MEIPASS", Path(__file__).resolve().parent))


def sha256(s: str) -> str:
    """Return the hex SHA-256 digest of ``s`` (UTF-8, unencodable characters dropped)."""
    return hashlib.sha256(s.encode("utf-8", errors="ignore")).hexdigest()


# Single-character clean-ups applied after NFKC normalisation.
# NOTE(review): the reviewed source showed the first two replacements as the
# identity "fi" -> "fi" / "fl" -> "fl"; presumably they targeted the U+FB01 /
# U+FB02 ligatures.  NFKC has already expanded those by the time the table is
# applied, so the two entries are no-ops either way.
_CHAR_TABLE = str.maketrans({
    "\ufb01": "fi",
    "\ufb02": "fl",
    "“": '"',
    "”": '"',
    "’": "'",
    "«": '"',
    "»": '"',
    "\u00A0": " ",
})


def normalize_text(s: str) -> str:
    """Normalise ``s`` to a single clean line.

    Applies NFKC, straightens typographic quotes, turns control characters
    (including line breaks) into spaces, collapses whitespace runs and strips
    the ends.  Falsy input yields ``""``.
    """
    if not s:
        return ""
    s = unicodedata.normalize("NFKC", s).translate(_CHAR_TABLE)
    s = re.sub(r"[\u0000-\u001f]", " ", s)
    return re.sub(r"\s+", " ", s).strip()


def _normalize_lines(s: str) -> str:
    """Normalise each line of ``s`` separately and drop the empty ones.

    Page text must keep its line breaks: several rules end in ``.*`` and are
    meant to consume one line, not the remainder of the page.
    """
    cleaned = (normalize_text(line) for line in s.splitlines())
    return "\n".join(line for line in cleaned if line)


def find_model_dir(root: Path) -> Optional[Path]:
    """Find a directory holding both ``config.cfg`` and ``meta.json``.

    ``root`` itself is checked first, then every ``config.cfg`` below it.
    Returns ``None`` when nothing matches.
    """
    if (root / "config.cfg").exists() and (root / "meta.json").exists():
        return root
    for cfg in root.rglob("config.cfg"):
        if (cfg.parent / "meta.json").exists():
            return cfg.parent
    return None


def _splice_spans(text: str, spans: List[Span]) -> Tuple[str, List[Span]]:
    """Replace each span of ``text`` with its placeholder, left to right.

    A span starting before the end of the previously applied one is skipped.
    Returns the new text and the spans that were actually applied.

    NOTE(review): RECONSTRUCTED - both NER passes were cut right after
    ``for s,e,rep,raw in spans: if s``; this is the conventional completion
    (skip overlaps, splice, collect) and matches the 4-tuples consumed by
    ``RobustEngine.ner_pass_hf``.
    """
    pieces: List[str] = []
    applied: List[Span] = []
    cursor = 0
    for start, end, placeholder, raw in sorted(spans, key=lambda sp: sp[0]):
        if start < cursor:
            continue
        pieces.append(text[cursor:start])
        pieces.append(placeholder)
        applied.append((start, end, placeholder, raw))
        cursor = end
    pieces.append(text[cursor:])
    return "".join(pieces), applied


# ----------- Rules & whitelist -----------

# Upper-case tokens that must never be mistaken for a person name.
DEFAULT_WHITELIST = {
    "PMSI", "T2A", "GHM", "GHS", "DP", "DR", "DAS", "RUM", "UM", "UF", "CMA", "CMD",
    "CIM", "CIM-10", "CCAM", "NGAP", "NABM", "ICD", "ICD-10",
    "CHU", "CH", "CLCC", "SSR", "USI", "USC", "USLD", "UHCD", "SAU", "UCA", "HDJ",
    "HAD", "EHPAD", "CMP", "SMUR", "SAMU", "DIM",
    "IRM", "TDM", "TEP", "RX", "ETT", "ETO", "ECG", "EEG", "EMG", "EFR", "BHC",
    "NFS", "CRP", "VS", "HB", "HT", "TSH", "T3", "T4", "ASAT", "ALAT", "GGT", "LDH",
    "BNP", "NTPROBNP", "DFG", "INR", "PAO2", "PACO2", "SPO2", "TA", "FC", "IMC", "BMI",
    "IGS2", "SAPS2", "APACHE", "SOFA", "NEWS", "HAS", "ARS",
    "FINESS", "OGC",
}

EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
PHONE_RE = re.compile(r"(?:\+33|0)[1-9](?:[ .-]?\d{2}){4}\b")
IPP_RE = re.compile(r"\bIPP[: ]?\d{6,10}\b", re.IGNORECASE)
IBAN_RE = re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{11,30}\b")
NIR_RAW_RE = re.compile(r"\b(\d{13})(\d{2})\b")
FINESS_LINE_RE = re.compile(r"\bFINESS\s*:\s*\d{9}\b", re.IGNORECASE)
OGC_LINE_RE = re.compile(r"N[°º]?\s*OGC\s*:\s*\d+", re.IGNORECASE)
ETAB_LINE_RE = re.compile(r"Etablissement\s*:\s*.*", re.IGNORECASE)
PRATICIEN_LINE_RE = re.compile(r"Nom du praticien[- ]conseil\s*:\s*.*", re.IGNORECASE)
DIM_LINE_RE = re.compile(r"Nom du m[ée]decin du DIM\s*:\s*.*", re.IGNORECASE)
DR_MAJ_RE = re.compile(r"Dr\s+[A-ZÀ-Ü' \-]{2,}")

# NOTE(review): RECONSTRUCTED - only the leading "(?" of this pattern survived.
# This version matches runs of upper-case words and refuses to match inside
# square brackets so that placeholders such as "[NOM]" are left alone.
# Replace it with the original pattern before production use.
NOMS_MAJ_RE = re.compile(
    r"(?<![\w\[])[A-ZÀ-Ü][A-ZÀ-Ü’\-]+(?:\s+[A-ZÀ-Ü][A-ZÀ-Ü’\-]+)*(?![\w\]])"
)

# Splits a NOMS_MAJ_RE match into the tokens that are checked against the whitelist.
_MAJ_TOKEN_RE = re.compile(r"[A-ZÀ-Ü’\-]{2,}")

# NOTE(review): RECONSTRUCTED - the original list is lost.  The only grounded
# facts are that it is iterated as (compiled regex, strptime format) pairs.
DATE_PATTERNS: List[Tuple["re.Pattern[str]", str]] = [
    (re.compile(r"\b\d{2}/\d{2}/\d{4}\b"), "%d/%m/%Y"),
    (re.compile(r"\b\d{2}-\d{2}-\d{4}\b"), "%d-%m-%Y"),
    (re.compile(r"\b\d{4}-\d{2}-\d{2}\b"), "%Y-%m-%d"),
]

# NOTE(review): RECONSTRUCTED - the original values are lost.  Each entry is
# interpolated into a case-insensitive ``\b...\b`` search that decides whether
# a table row is kept in the output.
DEFAULT_KEEP_FIELDS = ["GHM", "GHS", "DP", "DR", "DAS", "CCAM"]


@dataclass
class Replacement:
    """One audit entry, serialised with ``asdict`` into the JSONL trail.

    NOTE(review): RECONSTRUCTED - the class body is lost.  The four positional
    slots are grounded in the call sites; the field names (which become the
    JSON keys) are guesses.
    """

    kind: str             # rule family, e.g. "RULE", "EMAIL", "NIR", "NOM", "HF"
    page: Optional[int]   # 1-based page number, when known
    digest: str           # first 8 hex characters of sha256(raw text)
    placeholder: str      # text written in place of the raw value


def nir_is_valid(nir13: str, cle2: str) -> bool:
    """Check a 13-digit number against its 2-digit key: ``97 - (n % 97) == key``.

    Returns ``False`` when either argument is not an integer literal.

    NOTE(review): RECONSTRUCTED signature - the ``def`` line is lost; the
    parameter names are the ones the surviving body uses.
    """
    try:
        number, key = int(nir13), int(cle2)
    except (TypeError, ValueError):
        return False
    return (97 - (number % 97)) == key


# ----------- Advanced HF model (cascade) -----------

MODEL_PRESETS = {
    # Ready-to-use NER model.
    "CamemBERT NER (Jean-Baptiste)": "Jean-Baptiste/camembert-ner",
    # Base language models, not NER: for tests only; substitute a biomedical
    # NER checkpoint if one is available.
    "CamemBERT-bio (base LM)": "almanach/camembert-base-bio",
    "DrBERT (base LM)": "Dr-BERT/DrBERT-7GB",
}


class AdvancedHF:
    """Lazy wrapper around a Hugging Face token-classification pipeline."""

    def __init__(self, model_id: str, cache_dir: Path,
                 status_cb: Optional[Callable[[str], None]] = None):
        self.model_id = model_id
        self.cache_dir = cache_dir
        self.pipe = None  # set by load() on success
        self.status_cb = status_cb or (lambda msg: None)

    def load(self) -> Tuple[bool, str]:
        """Build the pipeline; return ``(ok, user-facing message)``.

        Never raises: every failure is turned into ``(False, message)``.
        """
        try:
            os.environ["HF_HOME"] = str(self.cache_dir)
            self.status_cb("Initialisation Transformers…")
            from transformers import (AutoModel, AutoModelForTokenClassification,
                                      AutoTokenizer, pipeline)
            # sentencepiece is required by the camembert/drbert tokenizers.
            try:
                import sentencepiece  # noqa: F401
            except Exception:
                return False, "Dépendance 'sentencepiece' manquante. Installez-la puis rebuild."

            self.status_cb("Chargement tokenizer…")
            tokenizer = AutoTokenizer.from_pretrained(self.model_id)
            self.status_cb("Chargement modèle (peut prendre 1–2 min la 1ère fois)…")
            try:
                model = AutoModelForTokenClassification.from_pretrained(self.model_id)
            except Exception:
                # Not a NER checkpoint: still fetch the base weights so that
                # they end up in the cache, then report the problem.
                self.status_cb("Le modèle semble être un 'base LM'. Téléchargement de la base pour cache…")
                try:
                    AutoModel.from_pretrained(self.model_id)
                except Exception:
                    pass
                return False, ("Le modèle sélectionné ne semble pas être un modèle NER (token-classification). "
                               "Choisissez un ID fine-tuné pour le NER (ex. 'Jean-Baptiste/camembert-ner').")

            try:
                import torch
                torch.set_num_threads(1)
            except Exception:
                pass  # best effort: thread tuning is optional

            self.pipe = pipeline("token-classification", model=model, tokenizer=tokenizer,
                                 aggregation_strategy="simple", device=-1)
            return True, f"Modèle avancé prêt: {self.model_id}"
        except Exception as e:
            if "sentencepiece" in str(e).lower():
                return False, "Échec: 'sentencepiece' requis."
            return False, f"Échec modèle avancé: {e}"

    def apply(self, text: str) -> Tuple[str, List[Tuple[int, int, str, str]]]:
        """Replace person/organisation/location entities found by the pipeline.

        Returns the rewritten text and the applied ``(start, end, placeholder,
        raw)`` spans; returns ``(text, [])`` when no pipeline is loaded or no
        entity qualifies.
        """
        if not self.pipe:
            return text, []
        spans: List[Span] = []
        for ent in self.pipe(text):
            group = ent.get("entity_group") or ent.get("entity") or ""
            start, end = int(ent["start"]), int(ent["end"])
            if group.startswith("PER"):
                placeholder = "[NOM]"
            elif group.startswith("ORG"):
                placeholder = "[ETABLISSEMENT]"
            elif group in ("LOC", "GPE") or group.startswith("LOC"):
                placeholder = "[VILLE]"
            else:
                continue
            spans.append((start, end, placeholder, text[start:end]))
        if not spans:
            return text, []
        return _splice_spans(text, spans)


# ----------- Engine -----------

class RobustEngine:
    """Holds the policy and models, and implements every text rewriting pass."""

    def __init__(self, config: Dict):
        """Read settings from the dict produced by ``load_config``.

        NOTE(review): RECONSTRUCTED - the class header and this constructor
        are lost.  The attribute names are the ones read elsewhere in the
        file; the config keys mirror ``load_config``; the location of
        ``adv_cache_dir`` is a guess.
        """
        self.config = config
        whitelist_cfg = config.get("whitelist") or {}
        tables_cfg = config.get("tables") or {}
        policy_cfg = config.get("policy") or {}
        advanced_cfg = config.get("advanced") or {}

        self.whitelist = set(whitelist_cfg.get("tokens") or DEFAULT_WHITELIST)
        self.keep_fields = list(tables_cfg.get("keep_fields") or DEFAULT_KEEP_FIELDS)
        self.date_policy = policy_cfg.get("dates", "keep")
        try:
            self.date_shift_days = int(policy_cfg.get("shift_days", 0) or 0)
        except (TypeError, ValueError):
            self.date_shift_days = 0

        self.nlp = None          # spaCy pipeline, set by try_load_spacy()
        self.use_ner = False
        self.hf: Optional[AdvancedHF] = None
        self.adv_model_id = advanced_cfg.get("hf_model_id") or list(MODEL_PRESETS.values())[0]
        self.adv_cache_dir = Path.home() / ".cache" / "pseudonymisation" / "hf"

    def try_load_spacy(self, custom_dir: Optional[Path] = None) -> Tuple[bool, str]:
        """Load the spaCy model; return ``(ok, description or error)``.

        Tries ``custom_dir``, then ``<base>/models/<MODEL_DIR_NAME>``, then the
        installed package.  On total failure ``nlp`` is cleared and NER is
        switched off.
        """
        candidates: List[Path] = []
        for cand in (custom_dir, resolve_base_dir() / "models" / MODEL_DIR_NAME):
            if cand is not None and cand not in candidates:
                candidates.append(cand)

        for cand in candidates:
            if not cand.exists():
                continue
            real = find_model_dir(cand)
            if not real:
                continue
            try:
                self.nlp = load_model_from_path(real)
            except Exception as e:
                warnings.warn(f"Echec load local {real}: {e}")
                continue
            self.use_ner = True
            return True, f"Local: {real}"

        try:
            self.nlp = spacy.load(MODEL_DIR_NAME)
        except Exception as e:
            self.nlp = None
            self.use_ner = False
            return False, f"Indisponible: {e}"
        self.use_ner = True
        return True, f"spacy.load('{MODEL_DIR_NAME}')"

    # Dates
    def transform_dates(self, text: str) -> str:
        """Apply ``date_policy`` to every date matched by ``DATE_PATTERNS``.

        ``"keep"`` leaves the text untouched, ``"month_year"`` rewrites dates as
        ``MM/YYYY`` and ``"shift"`` adds ``date_shift_days`` days while keeping
        the original format.  Matches that do not parse are left as they are.
        """
        if self.date_policy == "keep":
            return text

        def as_month_year(m, fmt):
            try:
                return datetime.strptime(m.group(0), fmt).strftime("%m/%Y")
            except ValueError:
                return m.group(0)

        def shifted(m, fmt):
            try:
                moved = datetime.strptime(m.group(0), fmt) + timedelta(days=self.date_shift_days)
                return moved.strftime(fmt)
            except (ValueError, OverflowError):
                return m.group(0)

        if self.date_policy == "month_year":
            convert = as_month_year
        elif self.date_policy == "shift":
            convert = shifted
        else:
            return text
        for rx, fmt in DATE_PATTERNS:
            text = rx.sub(lambda m, fmt=fmt: convert(m, fmt), text)
        return text

    # Targeted regexes
    def regex_pass(self, text: str, page: Optional[int]) -> Tuple[str, List[Replacement]]:
        """Replace identifiers matched by the rule regexes; return text and audit.

        Order matters and is kept as in the original: labelled lines first,
        then e-mail/phone/IPP/IBAN, then key-validated NIR numbers, and last
        the upper-case-name rule filtered by the whitelist.
        """
        repls: List[Replacement] = []

        def record(kind: str, raw: str, placeholder: str) -> str:
            repls.append(Replacement(kind, page, sha256(raw)[:8], placeholder))
            return placeholder

        for rx, placeholder in (
            (ETAB_LINE_RE, "[ETABLISSEMENT]"),
            (FINESS_LINE_RE, "[FINESS]"),
            (OGC_LINE_RE, "[OGC]"),
            (PRATICIEN_LINE_RE, "[NOM_MEDECIN]"),
            (DIM_LINE_RE, "[NOM_MEDECIN]"),
            (DR_MAJ_RE, "[NOM_MEDECIN]"),
        ):
            text = rx.sub(lambda m, ph=placeholder: record("RULE", m.group(0), ph), text)

        for rx, placeholder, kind in (
            (EMAIL_RE, "[EMAIL]", "EMAIL"),
            (PHONE_RE, "[TEL]", "TEL"),
            (IPP_RE, "[IPP]", "IPP"),
            (IBAN_RE, "[IBAN]", "IBAN"),
        ):
            text = rx.sub(lambda m, ph=placeholder, k=kind: record(k, m.group(0), ph), text)

        def replace_nir(m):
            if nir_is_valid(m.group(1), m.group(2)):
                return record("NIR", m.group(0), "[NIR]")
            return m.group(0)

        text = NIR_RAW_RE.sub(replace_nir, text)

        def replace_upper_name(m):
            cand = m.group(0)
            if all(tok in self.whitelist for tok in _MAJ_TOKEN_RE.findall(cand)):
                return cand
            return record("NOM", cand, "[NOM]")

        text = NOMS_MAJ_RE.sub(replace_upper_name, text)
        return text, repls

    # spaCy NER
    def ner_pass_spacy(self, text: str, page: Optional[int]) -> Tuple[str, List[Replacement]]:
        """Replace spaCy entities (person, organisation, place) with placeholders.

        Returns ``(text, [])`` when NER is disabled, no model is loaded or no
        entity qualifies.
        """
        if not self.use_ner or not self.nlp:
            return text, []
        spans: List[Span] = []
        for ent in self.nlp(text).ents:
            label = ent.label_
            if label in ("DATE", "TIME"):
                continue
            # spaCy's French pipelines label persons "PER"; "PERSON" is kept
            # for models that use the OntoNotes-style scheme.
            if label in ("PER", "PERSON"):
                placeholder = "[NOM]"
            elif label == "ORG":
                placeholder = "[ETABLISSEMENT]"
            elif label in ("GPE", "LOC", "FAC"):
                placeholder = "[VILLE]"
            else:
                continue
            spans.append((ent.start_char, ent.end_char, placeholder, ent.text))
        if not spans:
            return text, []
        new_text, applied = _splice_spans(text, spans)
        # NOTE(review): RECONSTRUCTED - the audit "kind" used by the original
        # for spaCy replacements is lost; "NER" is a placeholder value.
        repls = [Replacement("NER", page, sha256(raw)[:8], ph) for (_s, _e, ph, raw) in applied]
        return new_text, repls

    # HF NER
    def ensure_hf(self, status_cb: Optional[Callable[[str], None]] = None) -> Tuple[bool, str]:
        """Load the advanced model for ``adv_model_id`` unless it is already loaded.

        A failed load leaves ``self.hf`` as ``None`` so that a failure is never
        reported as "ready" on the next call, and a changed model id triggers
        a fresh load.

        NOTE(review): RECONSTRUCTED signature - the ``def`` line is lost; the
        keyword name comes from the call in ``App._load_hf``.
        """
        current = self.hf
        if current is not None and current.pipe is not None and current.model_id == self.adv_model_id:
            return True, "Déjà prêt."
        candidate = AdvancedHF(self.adv_model_id, self.adv_cache_dir, status_cb=status_cb)
        ok, msg = candidate.load()
        self.hf = candidate if ok else None
        return ok, msg

    def ner_pass_hf(self, text: str, page: Optional[int]) -> Tuple[str, List[Replacement]]:
        """Run the advanced model on ``text``; return rewritten text and audit."""
        if not self.hf:
            return text, []
        new_text, applied = self.hf.apply(text)
        repls = [Replacement("HF", page, sha256(raw)[:8], ph) for (_s, _e, ph, raw) in applied]
        return new_text, repls

    # Safety net
    def safety_rescan(self, text: str) -> str:
        """Re-apply every regex rule to already processed text (no audit)."""
        for rx, placeholder in (
            (FINESS_LINE_RE, "[FINESS]"),
            (OGC_LINE_RE, "[OGC]"),
            (ETAB_LINE_RE, "[ETABLISSEMENT]"),
            (PRATICIEN_LINE_RE, "[NOM_MEDECIN]"),
            (DIM_LINE_RE, "[NOM_MEDECIN]"),
            (DR_MAJ_RE, "[NOM_MEDECIN]"),
            (EMAIL_RE, "[EMAIL]"),
            (PHONE_RE, "[TEL]"),
            (IPP_RE, "[IPP]"),
            (IBAN_RE, "[IBAN]"),
        ):
            text = rx.sub(placeholder, text)

        def replace_nir(m):
            return "[NIR]" if nir_is_valid(m.group(1), m.group(2)) else m.group(0)

        text = NIR_RAW_RE.sub(replace_nir, text)

        def replace_upper_name(m):
            cand = m.group(0)
            whitelisted = all(tok in self.whitelist for tok in _MAJ_TOKEN_RE.findall(cand))
            return cand if whitelisted else "[NOM]"

        return NOMS_MAJ_RE.sub(replace_upper_name, text)


# ----------- PDF processor -----------

class PDFProcessor:
    """Extracts and pseudonymises the content of one PDF with a ``RobustEngine``."""

    def __init__(self, engine: RobustEngine, options: Dict):
        self.engine = engine
        self.options = options

    def process_pdf(self, pdf_path: Path) -> Tuple[str, List[Replacement], bool]:
        """Return ``(pseudonymised text, audit entries, scanned_like)``.

        ``scanned_like`` stays ``True`` only when no page yielded a table or
        any text.  Extraction errors on a page are treated as "nothing found".
        """
        chunks: List[str] = []
        audit: List[Replacement] = []
        scanned_like = True
        rescan = self.options.get("safety_rescan", True)

        with pdfplumber.open(str(pdf_path)) as pdf:
            for p_idx, page in enumerate(pdf.pages, start=1):
                page_chunks: List[str] = []

                # Tables
                try:
                    tables = page.extract_tables()
                except Exception:
                    tables = []
                if tables:
                    scanned_like = False
                    table_lines: List[str] = []
                    for table in tables:
                        rows = [[normalize_text(cell or "") for cell in row] for row in table]
                        kept_lines, reps = self._handle_table(rows, p_idx)
                        audit += reps
                        table_lines += kept_lines
                    if self.options.get("keep_tables", True) and table_lines:
                        page_chunks.append("[TABLES]\n" + "\n".join(table_lines) + "\n[/TABLES]")

                # Narrative
                try:
                    txt = page.extract_text(x_tolerance=1.5, y_tolerance=3.0) or ""
                except Exception:
                    txt = ""
                # Keep line breaks: normalize_text() would flatten the page to
                # one line and let the "label: .*" rules erase the rest of it.
                txt = _normalize_lines(txt)
                if txt:
                    scanned_like = False
                    txt = self.engine.transform_dates(txt)
                    t1, r1 = self.engine.regex_pass(txt, p_idx)
                    if self.options.get("apply_ner_on_narrative", True) and self.engine.use_ner:
                        t2, r2 = self.engine.ner_pass_spacy(t1, p_idx)
                    else:
                        t2, r2 = t1, []
                    if self.options.get("aggressive_hf", False) and self.engine.hf:
                        t3, r3 = self.engine.ner_pass_hf(t2, p_idx)
                    else:
                        t3, r3 = t2, []
                    audit += r1 + r2 + r3
                    page_chunks.append(t3)

                if page_chunks:
                    body = "\n\n".join(page_chunks)
                    if rescan:
                        # Rescan the page body only: "PAGE" is not whitelisted,
                        # so the upper-case-name rule could otherwise rewrite
                        # the header generated just below.
                        body = self.engine.safety_rescan(body)
                    chunks.append(f"\n===== PAGE {p_idx} =====\n" + body)

        return "\n\n".join(chunks).strip(), audit, scanned_like

    def _handle_table(self, rows: List[List[str]], page: int) -> Tuple[List[str], List[Replacement]]:
        """Pseudonymise table rows; keep only those mentioning a ``keep_fields`` entry.

        Every non-empty row goes through the date and regex passes (and feeds
        the audit) even when the row itself is then dropped from the output.
        """
        out_lines: List[str] = []
        repls: List[Replacement] = []
        for row in rows:
            if not any(row):
                continue
            line = "; ".join(cell for cell in row if cell)
            redacted, row_repls = self.engine.regex_pass(self.engine.transform_dates(line), page)
            repls += row_repls
            # Entries are interpolated unescaped, exactly as in the original.
            if any(re.search(rf"(?i)\b{field}\b", redacted) for field in self.engine.keep_fields):
                out_lines.append(redacted)
        return out_lines, repls


# ----------- GUI -----------

def load_config() -> Dict:
    """Return the default configuration merged with ``config.yaml`` when present.

    Top-level sections that are dicts on both sides are merged key by key;
    anything else from the file replaces the default.  A missing PyYAML, a
    missing file or an unreadable file all leave the defaults in place.
    """
    cfg = {
        "whitelist": {"tokens": list(DEFAULT_WHITELIST)},
        "tables": {"keep_fields": list(DEFAULT_KEEP_FIELDS)},
        "policy": {"dates": "keep", "shift_days": 0},
        "advanced": {"hf_model_id": list(MODEL_PRESETS.values())[0]},
    }
    cfg_path = resolve_base_dir() / "config.yaml"
    try:
        if yaml and cfg_path.exists():
            with cfg_path.open("r", encoding="utf-8") as f:
                user_cfg = yaml.safe_load(f) or {}
            for key, value in user_cfg.items():
                if isinstance(value, dict) and key in cfg:
                    cfg[key].update(value)
                else:
                    cfg[key] = value
    except Exception as exc:
        # Still best effort, but no longer silent.
        warnings.warn(f"Lecture de {cfg_path} impossible, valeurs par défaut conservées : {exc}")
    return cfg


class App:
    """Main window: model management, options and batch execution."""

    def __init__(self, root: tk.Tk):
        self.root = root
        self.root.title(APP_TITLE)
        self.root.geometry("1100x780")

        self.dir_var = tk.StringVar()
        self.status_var = tk.StringVar(value="Prêt.")
        self.model_status_var = tk.StringVar(value="Vérification du modèle spaCy…")
        self.hf_status_var = tk.StringVar(value="Modèle avancé HF : inactif")
        self.regex_only = tk.BooleanVar(value=False)
        self.keep_tables = tk.BooleanVar(value=True)
        self.apply_ner_on_narr = tk.BooleanVar(value=True)
        self.safety_rescan = tk.BooleanVar(value=True)
        self.aggressive_hf = tk.BooleanVar(value=False)
        self.date_policy = tk.StringVar(value="keep")
        self.date_shift_days = tk.StringVar(value="0")
        self.hf_model_label = tk.StringVar(value=list(MODEL_PRESETS.keys())[0])
        self.hf_model_id = tk.StringVar(value=list(MODEL_PRESETS.values())[0])

        # Carries log lines (str) and UI callbacks (callables) from worker
        # threads to the Tk thread; drained by _pump_logs().
        self.queue: "queue.Queue[Union[str, Callable[[], None]]]" = queue.Queue()
        self.config = load_config()
        self.engine = RobustEngine(self.config)
        self.engine.adv_cache_dir.mkdir(parents=True, exist_ok=True)

        self._build_ui()
        self._pump_logs()
        self.root.after(250, self._ensure_spacy)

    def _build_ui(self):
        """Create and lay out every widget of the main window."""
        top = tk.Frame(self.root, padx=10, pady=10)
        top.pack(fill=tk.BOTH, expand=True)

        # Folder row
        row1 = tk.Frame(top)
        row1.pack(fill=tk.X)
        tk.Label(row1, text="Dossier PDF :").pack(side=tk.LEFT)
        tk.Entry(row1, textvariable=self.dir_var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6)
        tk.Button(row1, text="Parcourir…", command=self._browse).pack(side=tk.LEFT, padx=3)
        self.btn_run = tk.Button(row1, text="Lancer", command=self._run, state=tk.DISABLED)
        self.btn_run.pack(side=tk.LEFT, padx=3)

        # spaCy card
        card = tk.LabelFrame(top, text="Modèle spaCy (FR)", padx=8, pady=8)
        card.pack(fill=tk.X, pady=6)
        tk.Label(card, textvariable=self.model_status_var, anchor="w").pack(fill=tk.X)
        pfrm = tk.Frame(card)
        pfrm.pack(fill=tk.X, pady=(6, 0))
        self.pbar = ttk.Progressbar(pfrm, orient="horizontal", mode="indeterminate", length=300)
        self.pbar.pack(side=tk.LEFT)
        tk.Button(card, text="Télécharger", command=self._download_spacy).pack(side=tk.LEFT, padx=6)
        tk.Button(card, text="Choisir un dossier…", command=self._choose_model_dir).pack(side=tk.LEFT)
        tk.Checkbutton(card, text="Mode regex seul", variable=self.regex_only,
                       command=self._toggle_regex).pack(side=tk.RIGHT)

        # HF card
        card2 = tk.LabelFrame(top, text="Modèle avancé (Hugging Face)", padx=8, pady=8)
        card2.pack(fill=tk.X, pady=6)
        rowhf = tk.Frame(card2)
        rowhf.pack(fill=tk.X)
        tk.Label(rowhf, text="Préréglage :").pack(side=tk.LEFT)
        self.cmb = ttk.Combobox(rowhf, values=list(MODEL_PRESETS.keys()),
                                textvariable=self.hf_model_label, state="readonly", width=35)
        self.cmb.pack(side=tk.LEFT, padx=6)
        # The reviewed source had the garbled sequence "<>" here; the handler
        # reacts to a preset being picked, i.e. the combobox selection event.
        self.cmb.bind("<<ComboboxSelected>>", self._preset_changed)
        tk.Label(rowhf, text="Model ID :").pack(side=tk.LEFT)
        tk.Entry(rowhf, textvariable=self.hf_model_id, width=44).pack(side=tk.LEFT, padx=6)
        tk.Button(rowhf, text="Charger modèle avancé", command=self._load_hf).pack(side=tk.LEFT)
        tk.Checkbutton(card2, text="Re-scanner agressif (ajoute le modèle avancé au narratif)",
                       variable=self.aggressive_hf).pack(side=tk.LEFT, padx=10)
        tk.Label(card2, textvariable=self.hf_status_var, anchor="w").pack(fill=tk.X, pady=(6, 0))

        # Options
        opt = tk.LabelFrame(top, text="Options", padx=8, pady=8)
        opt.pack(fill=tk.X, pady=6)
        tk.Checkbutton(opt, text="Garder tables utiles (réduit)",
                       variable=self.keep_tables).pack(side=tk.LEFT, padx=6)
        tk.Checkbutton(opt, text="Appliquer NER (spaCy) sur narratif",
                       variable=self.apply_ner_on_narr).pack(side=tk.LEFT, padx=6)
        tk.Checkbutton(opt, text="Re-scanner (sécurité) après traitement",
                       variable=self.safety_rescan).pack(side=tk.LEFT, padx=6)

        pol = tk.LabelFrame(top, text="Politique Dates", padx=8, pady=8)
        pol.pack(fill=tk.X, pady=6)
        tk.Label(pol, text="Dates :").pack(side=tk.LEFT)
        ttk.Combobox(pol, textvariable=self.date_policy, values=["keep", "month_year", "shift"],
                     width=12, state="readonly").pack(side=tk.LEFT, padx=6)
        tk.Label(pol, text="Décalage (+/- jours) :").pack(side=tk.LEFT)
        tk.Entry(pol, textvariable=self.date_shift_days, width=6).pack(side=tk.LEFT, padx=6)

        tk.Label(top, text="Journal :").pack(anchor="w")
        self.txt = tk.Text(top, height=18)
        self.txt.pack(fill=tk.BOTH, expand=True, pady=(2, 0))
        tk.Label(top, textvariable=self.status_var, anchor="w").pack(fill=tk.X, pady=(4, 0))

    # Helpers
    def _pbar_mode(self, mode: str):
        """Switch the progress bar mode; animate it only when indeterminate."""
        self.pbar.config(mode=mode)
        if mode == "indeterminate":
            self.pbar.start(60)
        else:
            self.pbar.stop()
            self.pbar["value"] = 0

    def log(self, msg: str):
        """Queue a line for the journal; safe to call from any thread."""
        self.queue.put(msg)

    def _ui(self, fn: Callable[[], None]):
        """Queue ``fn`` to run on the Tk thread; safe to call from any thread.

        Worker threads must not touch widgets or Tk variables directly.
        """
        self.queue.put(fn)

    def _pump_logs(self):
        """Drain the queue on the Tk thread, then re-arm itself every 60 ms."""
        try:
            while True:
                item = self.queue.get_nowait()
                if callable(item):
                    item()
                else:
                    self.txt.insert(tk.END, item + "\n")
                    self.txt.see(tk.END)
        except queue.Empty:
            pass
        finally:
            self.root.after(60, self._pump_logs)

    def _show_spacy_state(self, ok: bool, msg: str, failure_text: str):
        """Reflect a spaCy load attempt in the status line and the run button."""
        if ok:
            self.model_status_var.set(f"Modèle prêt. {msg}")
            self.btn_run.config(state=tk.NORMAL)
            return
        self.model_status_var.set(failure_text)
        if not self.regex_only.get():
            self.btn_run.config(state=tk.DISABLED)

    # spaCy
    def _ensure_spacy(self):
        """Try to load the spaCy model (on the Tk thread) and update the UI."""
        self._pbar_mode("indeterminate")
        ok, msg = self.engine.try_load_spacy(resolve_base_dir() / "models" / MODEL_DIR_NAME)
        self._show_spacy_state(
            ok, msg,
            f"Modèle indisponible : {msg} — utilisez 'Télécharger' ou 'Mode regex seul'.")
        self._pbar_mode("determinate")

    def _download_spacy(self):
        """Download the spaCy model in a background thread, then load it."""
        self._pbar_mode("indeterminate")
        self.model_status_var.set("Téléchargement spaCy en cours…")
        model_dir = resolve_base_dir() / "models" / MODEL_DIR_NAME

        def on_error(exc: Exception):
            self.model_status_var.set(f"Erreur téléchargement spaCy : {exc}")
            if not self.regex_only.get():
                self.btn_run.config(state=tk.DISABLED)

        def work():
            try:
                subprocess.check_call([sys.executable, "-m", "spacy", "download", MODEL_DIR_NAME])
                ok, msg = self.engine.try_load_spacy(model_dir)
            except Exception as exc:
                # Bind exc now: the name is cleared when the except block ends.
                self._ui(lambda exc=exc: on_error(exc))
            else:
                self._ui(lambda: self._show_spacy_state(
                    ok, msg,
                    "Échec validation modèle. Essayez 'Choisir un dossier…' ou 'Mode regex seul'."))
            finally:
                self._ui(lambda: self._pbar_mode("determinate"))

        threading.Thread(target=work, daemon=True).start()

    def _choose_model_dir(self):
        """Let the user pick a model directory and try to load it."""
        chosen = filedialog.askdirectory(title="Choisir le dossier du modèle spaCy")
        if not chosen:
            return
        ok, msg = self.engine.try_load_spacy(Path(chosen))
        self._show_spacy_state(ok, msg, "Échec chargement du modèle.")

    def _toggle_regex(self):
        """Enter or leave regex-only mode."""
        if self.regex_only.get():
            self.engine.use_ner = False
            self.apply_ner_on_narr.set(False)
            self.btn_run.config(state=tk.NORMAL)
            self.model_status_var.set("Mode regex seul : précision NER réduite.")
        else:
            self._ensure_spacy()

    # HF
    def _preset_changed(self, _evt=None):
        """Copy the model id of the selected preset into the id entry."""
        label = self.hf_model_label.get()
        self.hf_model_id.set(MODEL_PRESETS.get(label, list(MODEL_PRESETS.values())[0]))

    def _load_hf(self):
        """Load the advanced model named in the id entry in a background thread."""
        mid = self.hf_model_id.get().strip()
        self.hf_status_var.set(f"Chargement du modèle avancé : {mid} …")
        self._pbar_mode("indeterminate")

        def show(text: str):
            self._ui(lambda: self.hf_status_var.set(text))

        def work():
            try:
                self.engine.adv_model_id = mid
                _ok, msg = self.engine.ensure_hf(status_cb=show)
                show(msg)
            finally:
                self._ui(lambda: self._pbar_mode("determinate"))

        threading.Thread(target=work, daemon=True).start()

    # Run
    def _browse(self):
        """Ask for the input folder and store it in the entry."""
        chosen = filedialog.askdirectory()
        if chosen:
            self.dir_var.set(chosen)

    def _run(self):
        """Validate the inputs, freeze the options and start the worker thread."""
        raw = self.dir_var.get().strip()
        # An empty entry must be rejected explicitly: Path("") is ".", which
        # is a directory.
        if not raw or not Path(raw).is_dir():
            messagebox.showwarning("Dossier invalide", "Choisissez un dossier contenant des PDF.")
            return
        folder = Path(raw)

        self.engine.use_ner = (
            (not self.regex_only.get())
            and (self.engine.nlp is not None)
            and self.apply_ner_on_narr.get()
        )
        self.engine.date_policy = self.date_policy.get()
        try:
            self.engine.date_shift_days = int(self.date_shift_days.get() or "0")
        except ValueError:
            self.engine.date_shift_days = 0

        opts = dict(
            keep_tables=self.keep_tables.get(),
            apply_ner_on_narrative=self.apply_ner_on_narr.get() and self.engine.use_ner,
            safety_rescan=self.safety_rescan.get(),
            aggressive_hf=self.aggressive_hf.get() and (self.engine.hf is not None),
        )
        self.btn_run.config(state=tk.DISABLED)
        threading.Thread(target=self._worker, args=(folder, opts), daemon=True).start()

    def _worker(self, folder: Path, options: Dict):
        """Process every ``*.pdf`` of ``folder``; runs in a background thread.

        For each PDF three files are written to ``<folder>/pseudonymise``: the
        text, the JSONL audit and a short log.  A failure on one file is
        logged and does not stop the batch.
        """
        def set_status(text: str):
            self._ui(lambda: self.status_var.set(text))

        try:
            pdfs = sorted(p for p in folder.glob("*.pdf") if p.is_file())
            if not pdfs:
                self.log("Aucun PDF trouvé.")
                return
            outdir = folder / "pseudonymise"
            outdir.mkdir(exist_ok=True)
            ok = ko = 0
            for i, pdf in enumerate(pdfs, start=1):
                set_status(f"{i}/{len(pdfs)} — {pdf.name}")
                try:
                    text, audit, scanned = PDFProcessor(self.engine, options).process_pdf(pdf)
                    (outdir / f"{pdf.stem}.pseudonymise.txt").write_text(text, encoding="utf-8")
                    with (outdir / f"{pdf.stem}.pseudonymise.jsonl").open("w", encoding="utf-8") as f:
                        f.writelines(json.dumps(asdict(rep), ensure_ascii=False) + "\n" for rep in audit)
                    with (outdir / f"{pdf.stem}.log.txt").open("w", encoding="utf-8") as f:
                        f.write(f"Fichier: {pdf.name}\nScanneSuspect: {scanned}\nRemplacements: {len(audit)}\n")
                    self.log(f"✓ {pdf.name}")
                    ok += 1
                except Exception as e:
                    self.log(f"✗ {pdf.name} → ERREUR: {e}")
                    ko += 1
            set_status(f"Terminé : {ok} OK, {ko} erreurs. Sortie: {outdir}")
        finally:
            self._ui(lambda: self.btn_run.config(state=tk.NORMAL))


# ----------- main -----------

def main():
    """Create the Tk root window and run the application until it is closed."""
    root = tk.Tk()
    App(root)
    root.mainloop()


if __name__ == "__main__":
    main()