- GUI v5 : vue unique épurée (tkinter), 2 étapes visuelles
- Core ONNX : anonymisation regex + NER optionnel
- Extraction globale des noms depuis champs structurés (Patient, Rédigé par, MME/Madame, DR)
- Génération simultanée PDF Image + PDF Anonymisé (structure préservée)
- Build Windows via Nuitka (script batch + GitHub Actions CI)
- install.sh pour setup/run Linux

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
628 lines
29 KiB
Python
628 lines
29 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
from __future__ import annotations
|
||
|
||
import os, re, sys, json, queue, hashlib, warnings, threading, subprocess, unicodedata
|
||
from dataclasses import dataclass, asdict
|
||
from pathlib import Path
|
||
from typing import List, Tuple, Optional, Dict
|
||
from datetime import datetime, timedelta
|
||
|
||
# GUI
|
||
import tkinter as tk
|
||
from tkinter import filedialog, messagebox, ttk
|
||
|
||
# Core
|
||
import pdfplumber
|
||
import requests
|
||
import spacy
|
||
from spacy.util import load_model_from_path
|
||
|
||
try:
|
||
import yaml
|
||
except Exception:
|
||
yaml = None
|
||
|
||
APP_TITLE = "Pseudonymisation (Robuste + Backbones)"
|
||
MODEL_DIR_NAME = "fr_core_news_lg"
|
||
|
||
# ----------- Utilitaires & Unicode -----------
|
||
|
||
def resolve_base_dir() -> Path:
    """Return the directory holding bundled resources.

    Inside a frozen bundle this is ``sys._MEIPASS``; otherwise it is the
    folder containing this script.
    """
    bundled = getattr(sys, "_MEIPASS", None)
    if bundled is not None:
        return Path(bundled)
    return Path(__file__).resolve().parent
def sha256(s: str) -> str:
    """Hex SHA-256 digest of *s*, encoded as UTF-8 (undecodable chars dropped)."""
    digest = hashlib.sha256(s.encode("utf-8", errors="ignore"))
    return digest.hexdigest()
def normalize_text(s: str) -> str:
    """Normalise text extracted from a PDF into plain single-line form.

    Steps: NFKC unicode normalisation, ligature expansion, typographic
    quote straightening, NBSP and control-character removal, and whitespace
    collapsing. Returns "" for falsy input.
    """
    if not s:
        return ""
    s = unicodedata.normalize("NFKC", s)
    # Defensive: expand the fi/fl typographic ligatures explicitly. NFKC
    # already decomposes U+FB01/U+FB02, so this is a safety net; the escapes
    # keep the intent visible even if the file is re-encoded.
    s = s.replace("\ufb01", "fi").replace("\ufb02", "fl")
    # Straighten curly quotes and French guillemets (NFKC leaves these alone).
    s = s.replace("\u201c", "\"").replace("\u201d", "\"").replace("\u2019", "'").replace("\u00ab", "\"").replace("\u00bb", "\"")
    s = s.replace("\u00A0", " ")  # non-breaking space
    s = re.sub(r"[\u0000-\u001f]", " ", s)  # C0 control characters
    s = re.sub(r"\s+", " ", s).strip()
    return s
def find_model_dir(root: Path) -> Optional[Path]:
    """Locate a spaCy model directory at or below *root*.

    A model directory is identified by containing both ``config.cfg`` and
    ``meta.json``. Returns the first match (root itself first), or None.
    """
    def _is_model(d: Path) -> bool:
        return (d / "meta.json").exists()

    if (root / "config.cfg").exists() and _is_model(root):
        return root
    return next(
        (cfg.parent for cfg in root.rglob("config.cfg") if _is_model(cfg.parent)),
        None,
    )
# ----------- Rules & Whitelist -----------

# Uppercase tokens that the all-caps name heuristic (NOMS_MAJ_RE) must NEVER
# treat as a person name: PMSI/billing vocabulary, care-unit types,
# imaging/lab abbreviations, severity scores, and agency acronyms.
DEFAULT_WHITELIST = {
    "PMSI","T2A","GHM","GHS","DP","DR","DAS","RUM","UM","UF","CMA","CMD","CIM","CIM-10","CCAM","NGAP","NABM","ICD","ICD-10",
    "CHU","CH","CLCC","SSR","USI","USC","USLD","UHCD","SAU","UCA","HDJ","HAD","EHPAD","CMP","SMUR","SAMU","DIM",
    "IRM","TDM","TEP","RX","ETT","ETO","ECG","EEG","EMG","EFR","BHC",
    "NFS","CRP","VS","HB","HT","TSH","T3","T4","ASAT","ALAT","GGT","LDH","BNP","NTPROBNP","DFG","INR","PAO2","PACO2","SPO2","TA","FC","IMC","BMI",
    "IGS2","SAPS2","APACHE","SOFA","NEWS","HAS","ARS",
    "FINESS","OGC",
}

# Direct identifiers.
EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
# French phone numbers: +33 or 0, then 9 digits grouped by pairs.
PHONE_RE = re.compile(r"(?:\+33|0)[1-9](?:[ .-]?\d{2}){4}\b")
# Hospital patient identifier ("IPP" label followed by 6-10 digits).
IPP_RE = re.compile(r"\bIPP[: ]?\d{6,10}\b", re.IGNORECASE)
IBAN_RE = re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{11,30}\b")
# 13-digit NIR (French social-security number) + 2-digit control key;
# only redacted after nir_is_valid() confirms the key.
NIR_RAW_RE = re.compile(r"\b(\d{13})(\d{2})\b")

# Whole administrative header lines, redacted as a unit.
FINESS_LINE_RE = re.compile(r"\bFINESS\s*:\s*\d{9}\b", re.IGNORECASE)
OGC_LINE_RE = re.compile(r"N[°º]?\s*OGC\s*:\s*\d+", re.IGNORECASE)
ETAB_LINE_RE = re.compile(r"Etablissement\s*:\s*.*", re.IGNORECASE)
PRATICIEN_LINE_RE = re.compile(r"Nom du praticien[- ]conseil\s*:\s*.*", re.IGNORECASE)
DIM_LINE_RE = re.compile(r"Nom du m[ée]decin du DIM\s*:\s*.*", re.IGNORECASE)

# "Dr DUPONT"-style mentions, and runs of two or more all-caps words
# (heuristic for unlabelled surnames; filtered by DEFAULT_WHITELIST).
DR_MAJ_RE = re.compile(r"Dr\s+[A-ZÀ-Ü' \-]{2,}")
NOMS_MAJ_RE = re.compile(r"(?<![A-Z])(?:[A-ZÀ-Ü’\-]{2,}\s+){1,}[A-ZÀ-Ü’\-]{2,}")

# (compiled regex, strptime format) pairs recognised by the date policy.
DATE_PATTERNS = [
    (re.compile(r"\b(\d{2})/(\d{2})/(\d{4})\b"), "%d/%m/%Y"),
    (re.compile(r"\b(\d{4})-(\d{2})-(\d{2})\b"), "%Y-%m-%d"),
]

# Table rows are kept only when they mention one of these field labels;
# everything else in a table is dropped as potentially identifying.
DEFAULT_KEEP_FIELDS = ["Etablissement", "FINESS", "N° OGC", "Dates de séjour", "Service", "RUM", "UM"]
def nir_is_valid(nir13: str, cle2: str) -> bool:
    """Return True if *cle2* is the valid control key for the 13-digit NIR *nir13*.

    The French NIR control key is defined as ``97 - (NIR mod 97)``.
    Non-numeric input returns False. (Corsican NIRs encode 2A/2B letters,
    but callers only pass all-digit matches from NIR_RAW_RE, so that case
    cannot reach this function.)
    """
    try:
        n = int(nir13)
        k = int(cle2)
    except ValueError:  # not purely numeric -> cannot be a NIR/key pair
        return False
    return (97 - (n % 97)) == k
# ----------- Modèle avancé HF (cascade) -----------
|
||
|
||
# GUI label -> Hugging Face hub model id.
# Only the first entry is a ready-to-use NER model; the other two are base
# language models kept for testing/caching and should be swapped for a
# biomedical NER fine-tune when available.
# NOTE(review): verify these hub ids still resolve (in particular
# "almanach/camembert-base-bio") — AdvancedHF.load() reports an error if not.
MODEL_PRESETS = {
    "CamemBERT NER (Jean-Baptiste)": "Jean-Baptiste/camembert-ner",  # ready-to-use French NER
    "CamemBERT-bio (base LM)": "almanach/camembert-base-bio",  # base LM, no NER head
    "DrBERT (base LM)": "Dr-BERT/DrBERT-7GB",  # base LM, no NER head
}
class AdvancedHF:
    """Optional Hugging Face token-classification (NER) pipeline.

    Downloads/loads a transformers model into a local cache directory and
    exposes apply(), which replaces PER/ORG/LOC spans with placeholders.
    All heavyweight imports (transformers, torch) happen lazily in load().
    """

    def __init__(self, model_id: str, cache_dir: Path, status_cb=None):
        # status_cb: callable taking one progress-message string; no-op by default.
        self.model_id = model_id
        self.cache_dir = cache_dir
        self.pipe = None  # transformers pipeline, populated by load()
        self.status_cb = status_cb or (lambda msg: None)

    def load(self) -> Tuple[bool, str]:
        """Load the model into a CPU pipeline.

        Returns (ok, user-facing message). Never raises: all failures are
        converted to (False, reason).
        """
        try:
            # Point the HF cache at our app-local directory before importing.
            os.environ["HF_HOME"] = str(self.cache_dir)
            self.status_cb("Initialisation Transformers…")
            from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline, AutoModel
            # sentencepiece is required by the camembert/drbert tokenizers.
            try:
                import sentencepiece  # noqa: F401
            except Exception:
                return False, "Dépendance 'sentencepiece' manquante. Installez-la puis rebuild."

            self.status_cb("Chargement tokenizer…")
            tok = AutoTokenizer.from_pretrained(self.model_id)

            self.status_cb("Chargement modèle (peut prendre 1–2 min la 1ère fois)…")
            mdl = None
            try:
                mdl = AutoModelForTokenClassification.from_pretrained(self.model_id)
                head_ok = True  # NOTE(review): assigned but never read
            except Exception as e:
                # Not a token-classification model: still download the base
                # weights so they land in the cache, then report the problem.
                self.status_cb("Le modèle semble être un 'base LM'. Téléchargement de la base pour cache…")
                try:
                    AutoModel.from_pretrained(self.model_id)
                except Exception:
                    pass  # best-effort cache warm-up only
                return False, ("Le modèle sélectionné ne semble pas être un modèle NER (token-classification). "
                               "Choisissez un ID fine-tuné pour le NER (ex. 'Jean-Baptiste/camembert-ner').")

            # Keep CPU usage modest; skip silently if torch is unavailable.
            try:
                import torch
                torch.set_num_threads(1)
            except Exception:
                pass

            # device=-1 forces CPU inference.
            self.pipe = pipeline("token-classification", model=mdl, tokenizer=tok,
                                 aggregation_strategy="simple", device=-1)
            return True, f"Modèle avancé prêt: {self.model_id}"
        except Exception as e:
            msg = str(e)
            if "sentencepiece" in msg.lower():
                return False, "Échec: 'sentencepiece' requis."
            return False, f"Échec modèle avancé: {e}"

    def apply(self, text: str) -> Tuple[str, List[Tuple[int,int,str,str]]]:
        """Replace PER/ORG/LOC entity spans in *text* with placeholders.

        Returns (new_text, audit) where each audit item is
        (start, end, placeholder, original_span_text). If no pipeline is
        loaded or no entity matches, *text* is returned unchanged.
        """
        if not self.pipe: return text, []
        res = self.pipe(text)
        spans=[]
        for r in res:
            # Aggregated pipelines report "entity_group"; fall back to "entity".
            grp = r.get("entity_group") or r.get("entity") or ""
            start, end = int(r["start"]), int(r["end"])
            if grp.startswith("PER"):
                rep = "[NOM]"
            elif grp.startswith("ORG"):
                rep = "[ETABLISSEMENT]"
            elif grp in ("LOC","GPE") or grp.startswith("LOC"):
                rep = "[VILLE]"
            else:
                continue  # any other label is left untouched
            spans.append((start,end,rep,text[start:end]))
        if not spans: return text, []
        spans.sort(key=lambda x:x[0])
        # Rebuild the text left-to-right, skipping spans that overlap an
        # already-consumed region.
        out=[]; last=0; audit=[]
        for s,e,rep,raw in spans:
            if s<last: continue
            out.append(text[last:s]); out.append(rep); last=e
            audit.append((s,e,rep,raw))
        out.append(text[last:])
        return "".join(out), audit
# ----------- Moteur Robuste -----------
|
||
|
||
@dataclass
class Replacement:
    """One pseudonymisation event recorded in the JSONL audit trail.

    Only a truncated hash of the original text is stored — never raw PII.
    """
    kind: str  # rule family: RULE, EMAIL, TEL, IPP, IBAN, NIR, NOM, NER, HF
    page: Optional[int]  # 1-based PDF page number, or None when unknown
    text_hash: str  # first 8 hex chars of sha256(original text)
    replacement: str  # placeholder written to the output, e.g. "[NOM]"
class RobustEngine:
    """Rule-based + NER pseudonymisation engine.

    Pipeline: targeted regex substitutions (administrative header lines,
    e-mails, phones, IPP, IBAN, validated NIR, all-caps name heuristic),
    an optional local spaCy NER pass, an optional Hugging Face NER pass,
    plus a final safety re-scan over the assembled document.
    """

    def __init__(self, config: Dict):
        self.nlp = None          # spaCy pipeline, populated by try_load_spacy()
        self.use_ner = False
        # Date policy: "keep" (default), "month_year" (generalise to mm/yyyy)
        # or "shift" (offset by shift_days).
        self.date_policy = config.get("policy",{}).get("dates","keep")
        self.date_shift_days = int(config.get("policy",{}).get("shift_days",0))
        self.whitelist = set(config.get("whitelist",{}).get("tokens", list(DEFAULT_WHITELIST)))
        self.keep_fields = config.get("tables",{}).get("keep_fields", list(DEFAULT_KEEP_FIELDS))
        self.apply_ner_on_narr = True
        # Hugging Face (advanced) model configuration; weights cached per-user.
        adv = config.get("advanced", {})
        self.adv_model_id = adv.get("hf_model_id", list(MODEL_PRESETS.values())[0])
        self.adv_cache_dir = Path(os.environ.get("LOCALAPPDATA", resolve_base_dir())) / "Pseudonymiseur" / "models" / "hf_cache"
        self.hf: Optional[AdvancedHF] = None

    # --- spaCy loading ---
    def try_load_spacy(self, custom_dir: Optional[Path]=None) -> Tuple[bool,str]:
        """Load the French spaCy model from a local directory, falling back to
        the installed package. Returns (ok, human-readable status)."""
        candidates = []
        if custom_dir:
            candidates.append(custom_dir)
        candidates.append(resolve_base_dir() / "models" / MODEL_DIR_NAME)
        for c in candidates:
            if c.exists():
                real = find_model_dir(c)
                if real:
                    try:
                        self.nlp = load_model_from_path(real)
                        self.use_ner = True
                        return True, f"Local: {real}"
                    except Exception as e:
                        warnings.warn(f"Echec load local {real}: {e}")
        try:
            self.nlp = spacy.load(MODEL_DIR_NAME)
            self.use_ner = True
            return True, f"spacy.load('{MODEL_DIR_NAME}')"
        except Exception as e:
            self.nlp = None
            self.use_ner = False
            return False, f"Indisponible: {e}"

    # --- Dates ---
    def transform_dates(self, text: str) -> str:
        """Apply the configured date policy to every date matched by DATE_PATTERNS."""
        if self.date_policy == "keep":
            return text

        def as_mo_year(m, fmt):
            # Generalise dd/mm/yyyy (or yyyy-mm-dd) to mm/yyyy.
            try:
                return datetime.strptime(m.group(0), fmt).strftime("%m/%Y")
            except ValueError:  # digits that are not a real calendar date
                return m.group(0)

        def shift(m, fmt):
            # Shift the date by the configured number of days, keeping the format.
            try:
                dt = datetime.strptime(m.group(0), fmt) + timedelta(days=self.date_shift_days)
                return dt.strftime(fmt)
            except (ValueError, OverflowError):
                return m.group(0)

        for rx, fmt in DATE_PATTERNS:
            if self.date_policy == "month_year":
                text = rx.sub(lambda m: as_mo_year(m, fmt), text)
            elif self.date_policy == "shift":
                text = rx.sub(lambda m: shift(m, fmt), text)
        return text

    # --- Targeted regexes ---
    def regex_pass(self, text: str, page: Optional[int]) -> Tuple[str, List[Replacement]]:
        """Replace structured identifiers in *text* and audit every hit.

        Returns (new_text, replacements); replacements store a truncated
        hash of the original value, never the value itself.
        """
        repls: List[Replacement] = []

        def add(kind, val, placeholder):
            repls.append(Replacement(kind, page, sha256(val)[:8], placeholder))

        def sub_line(rx, placeholder, s):
            # add() returns None, so the `or` makes the lambda yield `placeholder`.
            return rx.sub(lambda m: (add("RULE", m.group(0), placeholder) or placeholder), s)

        # Whole administrative header lines.
        text = sub_line(ETAB_LINE_RE, "[ETABLISSEMENT]", text)
        text = sub_line(FINESS_LINE_RE, "[FINESS]", text)
        text = sub_line(OGC_LINE_RE, "[OGC]", text)
        text = sub_line(PRATICIEN_LINE_RE, "[NOM_MEDECIN]", text)
        text = sub_line(DIM_LINE_RE, "[NOM_MEDECIN]", text)
        text = sub_line(DR_MAJ_RE, "[NOM_MEDECIN]", text)

        # Direct identifiers.
        for rx, ph, kind in [
            (EMAIL_RE, "[EMAIL]", "EMAIL"),
            (PHONE_RE, "[TEL]", "TEL"),
            (IPP_RE, "[IPP]", "IPP"),
            (IBAN_RE, "[IBAN]", "IBAN"),
        ]:
            text = rx.sub(lambda m: (repls.append(Replacement(kind, page, sha256(m.group(0))[:8], ph)) or ph), text)

        def _nir(m):
            # Only redact 15-digit runs whose control key validates as a NIR.
            nir13, cle2 = m.group(1), m.group(2)
            if nir_is_valid(nir13, cle2):
                repls.append(Replacement("NIR", page, sha256(m.group(0))[:8], "[NIR]"))
                return "[NIR]"
            return m.group(0)
        text = NIR_RAW_RE.sub(_nir, text)

        def repl_noms_maj(m):
            # Runs of >=2 all-caps words are treated as names unless every
            # token is a whitelisted acronym.
            cand = m.group(0)
            tokens = re.findall(r"[A-ZÀ-Ü’\-]{2,}", cand)
            if all(t in self.whitelist for t in tokens):
                return cand
            repls.append(Replacement("NOM", page, sha256(cand)[:8], "[NOM]"))
            return "[NOM]"
        text = NOMS_MAJ_RE.sub(repl_noms_maj, text)

        return text, repls

    # --- spaCy NER ---
    def ner_pass_spacy(self, text: str, page: Optional[int]) -> Tuple[str, List[Replacement]]:
        """Replace person/org/location entities found by the spaCy model."""
        if not self.use_ner or not self.nlp:
            return text, []
        doc = self.nlp(text)
        spans = []
        for ent in doc.ents:
            lab = ent.label_
            if lab in ("DATE", "TIME"):
                continue  # dates are handled by transform_dates()
            # BUGFIX: French fr_core_news_* models emit "PER", not "PERSON";
            # accept both so person entities are actually redacted.
            if lab in ("PER", "PERSON"):
                rep = "[NOM]"
            elif lab == "ORG":
                rep = "[ETABLISSEMENT]"
            elif lab in ("GPE", "LOC", "FAC"):
                rep = "[VILLE]"
            else:
                continue
            spans.append((ent.start_char, ent.end_char, rep, ent.text))
        if not spans:
            return text, []
        spans.sort(key=lambda x: x[0])
        # Rebuild left-to-right, skipping overlapping entities.
        out = []; last = 0; repls = []
        for s, e, rep, raw in spans:
            if s < last:
                continue
            out.append(text[last:s]); out.append(rep); last = e
            repls.append(Replacement("NER", page, sha256(raw)[:8], rep))
        out.append(text[last:])
        return "".join(out), repls

    # --- Hugging Face NER ---
    def ensure_hf(self, status_cb=None) -> Tuple[bool, str]:
        """Lazily load the advanced HF model.

        BUGFIX: previously any existing self.hf returned "Déjà prêt." even
        when the earlier load had failed or the model id had since changed.
        Now we reload whenever the pipeline is missing or the id differs,
        and keep self.hf only on success.
        """
        if self.hf and self.hf.pipe and self.hf.model_id == self.adv_model_id:
            return True, "Déjà prêt."
        hf = AdvancedHF(self.adv_model_id, self.adv_cache_dir, status_cb=status_cb)
        ok, msg = hf.load()
        self.hf = hf if ok else None
        return ok, msg

    def ner_pass_hf(self, text: str, page: Optional[int]) -> Tuple[str, List[Replacement]]:
        """Apply the advanced HF NER model (if loaded) and audit its replacements."""
        if not self.hf:
            return text, []
        t2, aud = self.hf.apply(text)
        repls = [Replacement("HF", page, sha256(raw)[:8], rep) for (_s, _e, rep, raw) in aud]
        return t2, repls

    # --- Safety net ---
    def safety_rescan(self, text: str) -> str:
        """Final pass re-applying every rule on the assembled document, in case
        a pattern was split across page or table boundaries. Audit-free."""
        for rx, ph in [(FINESS_LINE_RE, "[FINESS]"), (OGC_LINE_RE, "[OGC]"), (ETAB_LINE_RE, "[ETABLISSEMENT]"),
                       (PRATICIEN_LINE_RE, "[NOM_MEDECIN]"), (DIM_LINE_RE, "[NOM_MEDECIN]"), (DR_MAJ_RE, "[NOM_MEDECIN]")]:
            text = rx.sub(ph, text)
        text = EMAIL_RE.sub("[EMAIL]", text)
        text = PHONE_RE.sub("[TEL]", text)
        text = IPP_RE.sub("[IPP]", text)
        text = IBAN_RE.sub("[IBAN]", text)

        def _nir(m):
            return "[NIR]" if nir_is_valid(m.group(1), m.group(2)) else m.group(0)
        text = NIR_RAW_RE.sub(_nir, text)

        def _maj(m):
            cand = m.group(0)
            toks = re.findall(r"[A-ZÀ-Ü’\-]{2,}", cand)
            return cand if all(t in self.whitelist for t in toks) else "[NOM]"
        return NOMS_MAJ_RE.sub(_maj, text)
||
# ----------- PDF Processor -----------
|
||
|
||
class PDFProcessor:
    """Runs a RobustEngine over one PDF: tables first, then narrative text."""

    def __init__(self, engine: RobustEngine, options: Dict):
        self.engine = engine
        # options keys: keep_tables, apply_ner_on_narrative, aggressive_hf, safety_rescan
        self.options = options

    def process_pdf(self, pdf_path: Path) -> Tuple[str, List[Replacement], bool]:
        """Pseudonymise *pdf_path*.

        Returns (final_text, audit, scanned_like); scanned_like is True when
        neither text nor tables could be extracted from any page, which
        usually means the PDF is a scan (image-only).
        """
        chunks = []; audit = []; scanned_like = True
        with pdfplumber.open(str(pdf_path)) as pdf:
            for p_idx, page in enumerate(pdf.pages, start=1):
                page_chunks = []
                # --- Tables ---
                try:
                    tables = page.extract_tables()
                except Exception:
                    tables = []  # extraction failures degrade to "no tables"
                if tables:
                    scanned_like = False
                    lines_all = []
                    for t in tables:
                        rows = [[normalize_text(c or "") for c in row] for row in t]
                        text_lines, reps = self._handle_table(rows, p_idx)
                        audit += reps
                        lines_all += text_lines
                    if self.options.get("keep_tables", True) and lines_all:
                        page_chunks.append("[TABLES]\n" + "\n".join(lines_all) + "\n[/TABLES]")
                # --- Narrative text ---
                try:
                    txt = page.extract_text(x_tolerance=1.5, y_tolerance=3.0) or ""
                except Exception:
                    txt = ""
                txt = normalize_text(txt)
                if txt.strip():
                    scanned_like = False
                    # Cascade: dates -> targeted regexes -> spaCy NER -> HF NER.
                    txt = self.engine.transform_dates(txt)
                    t1, r1 = self.engine.regex_pass(txt, p_idx)
                    if self.options.get("apply_ner_on_narrative", True) and self.engine.use_ner:
                        t2, r2 = self.engine.ner_pass_spacy(t1, p_idx)
                    else:
                        t2, r2 = t1, []
                    if self.options.get("aggressive_hf", False) and self.engine.hf:
                        t3, r3 = self.engine.ner_pass_hf(t2, p_idx)
                    else:
                        t3, r3 = t2, []
                    audit += (r1 + r2 + r3)
                    page_chunks.append(t3)
                if page_chunks:
                    chunks.append(f"\n===== PAGE {p_idx} =====\n" + "\n\n".join(page_chunks))
        final_text = "\n\n".join(chunks).strip()
        if self.options.get("safety_rescan", True):
            final_text = self.engine.safety_rescan(final_text)
        return final_text, audit, scanned_like

    def _handle_table(self, rows: List[List[str]], page: int) -> Tuple[List[str], List[Replacement]]:
        """Flatten table rows to 'cell; cell' lines, pseudonymise them, and keep
        only lines mentioning one of the engine's keep_fields labels.

        Rows matching no keep_field are dropped entirely, since free-text
        table cells may contain identifying data.
        """
        out_lines = []; repls = []
        for row in rows:
            if not any(row):
                continue  # fully empty row
            line = "; ".join([c for c in row if c])
            if not line:
                continue
            t, rr = self.engine.regex_pass(self.engine.transform_dates(line), page)
            repls += rr
            # BUGFIX: keep_fields values are user-configurable and were
            # interpolated into the pattern unescaped; re.escape makes
            # labels containing regex metacharacters safe.
            if any(re.search(rf"(?i)\b{re.escape(k)}\b", t) for k in self.engine.keep_fields):
                out_lines.append(t)
        return out_lines, repls
||
# ----------- GUI -----------
|
||
|
||
def load_config() -> Dict:
    """Return the effective configuration.

    Starts from built-in defaults and shallow-merges an optional
    ``config.yaml`` sitting next to the application. Any error while
    reading the file (or a missing PyYAML) silently falls back to defaults.
    """
    merged: Dict = {
        "whitelist": {"tokens": list(DEFAULT_WHITELIST)},
        "tables": {"keep_fields": list(DEFAULT_KEEP_FIELDS)},
        "policy": {"dates": "keep", "shift_days": 0},
        "advanced": {"hf_model_id": list(MODEL_PRESETS.values())[0]},
    }
    cfg_path = resolve_base_dir() / "config.yaml"
    try:
        if yaml and cfg_path.exists():
            with cfg_path.open("r", encoding="utf-8") as fh:
                user_cfg = yaml.safe_load(fh) or {}
            for key, value in user_cfg.items():
                if isinstance(value, dict) and key in merged:
                    merged[key].update(value)  # section-level shallow merge
                else:
                    merged[key] = value
    except Exception:
        pass  # best-effort: keep defaults on any config error
    return merged
||
class App:
    """Single-window tkinter front-end driving the pseudonymisation pipeline.

    NOTE(review): worker threads update Tk variables and widgets directly
    (status_var, model_status_var, btn_run, pbar). Tkinter is not documented
    as thread-safe; this often works in practice but should ideally be
    routed through root.after or the existing log queue — confirm.
    """

    def __init__(self, root: tk.Tk):
        self.root=root; self.root.title(APP_TITLE); self.root.geometry("1100x780")
        # Tk variables backing the UI state.
        self.dir_var = tk.StringVar(); self.status_var = tk.StringVar(value="Prêt.")
        self.model_status_var = tk.StringVar(value="Vérification du modèle spaCy…")
        self.hf_status_var = tk.StringVar(value="Modèle avancé HF : inactif")
        self.regex_only = tk.BooleanVar(value=False)
        self.keep_tables = tk.BooleanVar(value=True)
        self.apply_ner_on_narr = tk.BooleanVar(value=True)
        self.safety_rescan = tk.BooleanVar(value=True)
        self.aggressive_hf = tk.BooleanVar(value=False)
        self.date_policy = tk.StringVar(value="keep")
        self.date_shift_days = tk.StringVar(value="0")
        self.hf_model_label = tk.StringVar(value=list(MODEL_PRESETS.keys())[0])
        self.hf_model_id = tk.StringVar(value=list(MODEL_PRESETS.values())[0])
        # Log messages produced by worker threads, drained by _pump_logs().
        self.queue: "queue.Queue[str]" = queue.Queue()

        self.config = load_config()
        self.engine = RobustEngine(self.config)
        self.engine.adv_cache_dir.mkdir(parents=True, exist_ok=True)

        self._build_ui()
        self._pump_logs()

        # Defer the spaCy availability check until after the window is shown.
        self.root.after(250, self._ensure_spacy)

    def _build_ui(self):
        """Create all widgets: folder row, spaCy card, HF card, options, log."""
        top = tk.Frame(self.root, padx=10, pady=10); top.pack(fill=tk.BOTH, expand=True)

        # Folder picker row + Run button (disabled until a model is ready).
        row1 = tk.Frame(top); row1.pack(fill=tk.X)
        tk.Label(row1, text="Dossier PDF :").pack(side=tk.LEFT)
        tk.Entry(row1, textvariable=self.dir_var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6)
        tk.Button(row1, text="Parcourir…", command=self._browse).pack(side=tk.LEFT, padx=3)
        self.btn_run = tk.Button(row1, text="Lancer", command=self._run, state=tk.DISABLED)
        self.btn_run.pack(side=tk.LEFT, padx=3)

        # spaCy model card.
        card = tk.LabelFrame(top, text="Modèle spaCy (FR)", padx=8, pady=8); card.pack(fill=tk.X, pady=6)
        tk.Label(card, textvariable=self.model_status_var, anchor="w").pack(fill=tk.X)
        pfrm = tk.Frame(card); pfrm.pack(fill=tk.X, pady=(6,0))
        self.pbar = ttk.Progressbar(pfrm, orient="horizontal", mode="indeterminate", length=300); self.pbar.pack(side=tk.LEFT)
        tk.Button(card, text="Télécharger", command=self._download_spacy).pack(side=tk.LEFT, padx=6)
        tk.Button(card, text="Choisir un dossier…", command=self._choose_model_dir).pack(side=tk.LEFT)
        tk.Checkbutton(card, text="Mode regex seul", variable=self.regex_only, command=self._toggle_regex).pack(side=tk.RIGHT)

        # Hugging Face advanced-model card.
        card2 = tk.LabelFrame(top, text="Modèle avancé (Hugging Face)", padx=8, pady=8); card2.pack(fill=tk.X, pady=6)
        rowhf = tk.Frame(card2); rowhf.pack(fill=tk.X)
        tk.Label(rowhf, text="Préréglage :").pack(side=tk.LEFT)
        self.cmb = ttk.Combobox(rowhf, values=list(MODEL_PRESETS.keys()), textvariable=self.hf_model_label, state="readonly", width=35)
        self.cmb.pack(side=tk.LEFT, padx=6)
        self.cmb.bind("<<ComboboxSelected>>", self._preset_changed)
        tk.Label(rowhf, text="Model ID :").pack(side=tk.LEFT)
        tk.Entry(rowhf, textvariable=self.hf_model_id, width=44).pack(side=tk.LEFT, padx=6)
        tk.Button(rowhf, text="Charger modèle avancé", command=self._load_hf).pack(side=tk.LEFT)
        tk.Checkbutton(card2, text="Re-scanner agressif (ajoute le modèle avancé au narratif)", variable=self.aggressive_hf).pack(side=tk.LEFT, padx=10)
        tk.Label(card2, textvariable=self.hf_status_var, anchor="w").pack(fill=tk.X, pady=(6,0))

        # Processing options.
        opt = tk.LabelFrame(top, text="Options", padx=8, pady=8); opt.pack(fill=tk.X, pady=6)
        tk.Checkbutton(opt, text="Garder tables utiles (réduit)", variable=self.keep_tables).pack(side=tk.LEFT, padx=6)
        tk.Checkbutton(opt, text="Appliquer NER (spaCy) sur narratif", variable=self.apply_ner_on_narr).pack(side=tk.LEFT, padx=6)
        tk.Checkbutton(opt, text="Re-scanner (sécurité) après traitement", variable=self.safety_rescan).pack(side=tk.LEFT, padx=6)

        # Date policy selector.
        pol = tk.LabelFrame(top, text="Politique Dates", padx=8, pady=8); pol.pack(fill=tk.X, pady=6)
        tk.Label(pol, text="Dates :").pack(side=tk.LEFT)
        ttk.Combobox(pol, textvariable=self.date_policy, values=["keep","month_year","shift"], width=12, state="readonly").pack(side=tk.LEFT, padx=6)
        tk.Label(pol, text="Décalage (+/- jours) :").pack(side=tk.LEFT)
        tk.Entry(pol, textvariable=self.date_shift_days, width=6).pack(side=tk.LEFT, padx=6)

        # Log area + status bar.
        tk.Label(top, text="Journal :").pack(anchor="w")
        self.txt = tk.Text(top, height=18); self.txt.pack(fill=tk.BOTH, expand=True, pady=(2,0))
        tk.Label(top, textvariable=self.status_var, anchor="w").pack(fill=tk.X, pady=(4,0))

    # --- Helpers ---
    def _pbar_mode(self, mode:str):
        """Switch the progressbar between indeterminate (spinning) and idle."""
        self.pbar.config(mode=mode)
        if mode=="indeterminate": self.pbar.start(60)
        else: self.pbar.stop(); self.pbar["value"]=0

    def log(self, msg:str):
        """Thread-safe logging: enqueue; _pump_logs() writes to the Text widget."""
        self.queue.put(msg)

    def _pump_logs(self):
        """Drain the log queue into the Text widget, then reschedule itself (60 ms)."""
        try:
            while True:
                msg = self.queue.get_nowait()
                self.txt.insert(tk.END, msg + "\n"); self.txt.see(tk.END)
        except queue.Empty:
            pass
        finally:
            self.root.after(60, self._pump_logs)

    # --- spaCy ---
    def _ensure_spacy(self):
        """Attempt to load the spaCy model; enable/disable Run accordingly."""
        self._pbar_mode("indeterminate")
        ok,msg = self.engine.try_load_spacy(resolve_base_dir()/ "models" / MODEL_DIR_NAME)
        if ok:
            self.model_status_var.set(f"Modèle prêt. {msg}")
            self.btn_run.config(state=tk.NORMAL)
        else:
            self.model_status_var.set(f"Modèle indisponible : {msg} — utilisez 'Télécharger' ou 'Mode regex seul'.")
            if not self.regex_only.get(): self.btn_run.config(state=tk.DISABLED)
        self._pbar_mode("determinate")

    def _download_spacy(self):
        """Download the spaCy model via 'python -m spacy download' in a background thread."""
        self._pbar_mode("indeterminate"); self.model_status_var.set("Téléchargement spaCy en cours…")
        def work():
            try:
                subprocess.check_call([sys.executable, "-m", "spacy", "download", MODEL_DIR_NAME])
                ok,msg = self.engine.try_load_spacy(resolve_base_dir()/ "models" / MODEL_DIR_NAME)
                if ok:
                    self.model_status_var.set(f"Modèle prêt. {msg}"); self.btn_run.config(state=tk.NORMAL)
                else:
                    self.model_status_var.set("Échec validation modèle. Essayez 'Choisir un dossier…' ou 'Mode regex seul'.")
                    if not self.regex_only.get(): self.btn_run.config(state=tk.DISABLED)
            except Exception as e:
                self.model_status_var.set(f"Erreur téléchargement spaCy : {e}")
                if not self.regex_only.get(): self.btn_run.config(state=tk.DISABLED)
            finally:
                self._pbar_mode("determinate")
        threading.Thread(target=work, daemon=True).start()

    def _choose_model_dir(self):
        """Let the user point at a local spaCy model directory."""
        d = filedialog.askdirectory(title="Choisir le dossier du modèle spaCy")
        if d:
            ok,msg = self.engine.try_load_spacy(Path(d))
            if ok: self.model_status_var.set(f"Modèle prêt. {msg}"); self.btn_run.config(state=tk.NORMAL)
            else: self.model_status_var.set("Échec chargement du modèle.");
            if not self.regex_only.get() and not ok: self.btn_run.config(state=tk.DISABLED)

    def _toggle_regex(self):
        """Toggle regex-only mode (disables NER but always allows running)."""
        if self.regex_only.get():
            self.engine.use_ner=False; self.apply_ner_on_narr.set(False); self.btn_run.config(state=tk.NORMAL)
            self.model_status_var.set("Mode regex seul : précision NER réduite.")
        else:
            self._ensure_spacy()

    # --- Hugging Face ---
    def _preset_changed(self, _evt=None):
        """Sync the model-id entry with the selected preset."""
        label = self.hf_model_label.get()
        self.hf_model_id.set(MODEL_PRESETS.get(label, list(MODEL_PRESETS.values())[0]))

    def _load_hf(self):
        """Load the advanced HF model in a background thread."""
        mid = self.hf_model_id.get().strip()
        self.hf_status_var.set(f"Chargement du modèle avancé : {mid} …")
        self._pbar_mode("indeterminate")
        def work():
            try:
                self.engine.adv_model_id = mid
                ok,msg = self.engine.ensure_hf(status_cb=lambda m: self.hf_status_var.set(m))
                self.hf_status_var.set(msg)
            finally:
                self._pbar_mode("determinate")
        threading.Thread(target=work, daemon=True).start()

    # --- Run ---
    def _browse(self):
        """Folder picker for the input directory."""
        d = filedialog.askdirectory()
        if d: self.dir_var.set(d)

    def _run(self):
        """Validate the folder, snapshot UI options, and start the worker thread."""
        folder = Path(self.dir_var.get().strip())
        if not folder.is_dir():
            messagebox.showwarning("Dossier invalide","Choisissez un dossier contenant des PDF.")
            return
        # Push current UI state into the engine before processing.
        self.engine.use_ner = (not self.regex_only.get()) and (self.engine.nlp is not None) and self.apply_ner_on_narr.get()
        self.engine.date_policy = self.date_policy.get()
        try: self.engine.date_shift_days = int(self.date_shift_days.get() or "0")
        except: self.engine.date_shift_days = 0

        opts = dict(
            keep_tables = self.keep_tables.get(),
            apply_ner_on_narrative = self.apply_ner_on_narr.get() and self.engine.use_ner,
            safety_rescan = self.safety_rescan.get(),
            aggressive_hf = self.aggressive_hf.get() and (self.engine.hf is not None),
        )
        self.btn_run.config(state=tk.DISABLED)
        threading.Thread(target=self._worker, args=(folder,opts), daemon=True).start()

    def _worker(self, folder: Path, options: Dict):
        """Process every *.pdf in *folder*; outputs go under folder/pseudonymise."""
        try:
            pdfs = sorted([p for p in folder.glob("*.pdf") if p.is_file()])
            if not pdfs: self.log("Aucun PDF trouvé."); return
            outdir = folder / "pseudonymise"; outdir.mkdir(exist_ok=True)
            ok=ko=0
            for i,pdf in enumerate(pdfs, start=1):
                self.status_var.set(f"{i}/{len(pdfs)} — {pdf.name}")
                try:
                    proc = PDFProcessor(self.engine, options)
                    text, audit, scanned = proc.process_pdf(pdf)
                    # Three outputs per PDF: pseudonymised text, JSONL audit, short log.
                    (outdir / f"{pdf.stem}.pseudonymise.txt").write_text(text, encoding="utf-8")
                    with (outdir / f"{pdf.stem}.pseudonymise.jsonl").open("w", encoding="utf-8") as f:
                        for rep in audit: f.write(json.dumps(asdict(rep), ensure_ascii=False) + "\n")
                    with (outdir / f"{pdf.stem}.log.txt").open("w", encoding="utf-8") as f:
                        f.write(f"Fichier: {pdf.name}\nScanneSuspect: {scanned}\nRemplacements: {len(audit)}\n")
                    self.log(f"✓ {pdf.name}"); ok+=1
                except Exception as e:
                    # Per-file failures are logged and do not abort the batch.
                    self.log(f"✗ {pdf.name} → ERREUR: {e}"); ko+=1
            self.status_var.set(f"Terminé : {ok} OK, {ko} erreurs. Sortie: {outdir}")
        finally:
            self.btn_run.config(state=tk.NORMAL)
||
# ----------- main -----------
|
||
|
||
def main():
    """Application entry point: build the Tk root, attach the App, run the loop."""
    window = tk.Tk()
    App(window)
    window.mainloop()


if __name__ == "__main__":
    main()