chore(archives): move 6 legacy GUI/pipeline files to archives/legacy_gui/
## Fichiers déplacés (git mv, historique préservé) - Pseudonymisation_Gui_Models_V4.py (V4 obsolète) - pseudonymisation_pipeline_gui_v3.py (V3 obsolète) - Pseudonymisation_Pipeline_Robuste_Patch.py (oct 2025, abandonné) - pseudonymisation_pipeline_robuste.py (oct 2025, abandonné) - test_gui_error.py (test orphelin V4) - test_gui_fixed.py (test orphelin V4) ## Pourquoi Pour éviter toute confusion avec la GUI active (Pseudonymisation_Gui_V5.py) maintenant que le stash WIP 2026-04-27 (profils + masques + build windows) a été appliqué et que Dom va y faire des modifications avant le MVP. ## README ajouté archives/legacy_gui/README.md documente le contenu, les raisons d'archivage, les fichiers actifs en production, et la procédure de restauration. ## Restauration Réversible via : git mv archives/legacy_gui/<file> . Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
390
archives/legacy_gui/Pseudonymisation_Gui_Models_V4.py
Normal file
390
archives/legacy_gui/Pseudonymisation_Gui_Models_V4.py
Normal file
@@ -0,0 +1,390 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Pseudonymisation – GUI v4 (Gestionnaire de modèles ONNX + mode Simple/Avancé)
|
||||
-----------------------------------------------------------------------------
|
||||
- Onglet Simple : parcours en 3 clics + choix "PDF anonymisé (léger)" / "PDF image (très sûr)"
|
||||
- Onglet Avancé : gestion des règles YAML + Créateur de règle + Gestionnaire de modèles ONNX
|
||||
- Chargement paresseux du modèle NER (CamemBERT family, ONNX Runtime via Optimum)
|
||||
- Application du NER uniquement au narratif, avec seuils par type
|
||||
|
||||
Fichiers requis à côté :
|
||||
- anonymizer_core_refactored_onnx.py
|
||||
- ner_manager_onnx.py
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import json
|
||||
import os
|
||||
import platform
|
||||
import queue
|
||||
import re
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict
|
||||
|
||||
import tkinter as tk
|
||||
from tkinter import filedialog, messagebox, ttk
|
||||
|
||||
# Core
|
||||
try:
|
||||
import anonymizer_core_refactored_onnx as core
|
||||
except Exception as e:
|
||||
raise SystemExit(f"Impossible d'importer le core ONNX : {e}")
|
||||
|
||||
# NER manager
|
||||
try:
|
||||
from ner_manager_onnx import NerModelManager, NerThresholds
|
||||
except Exception as e:
|
||||
NerModelManager = None # type: ignore
|
||||
NerThresholds = None # type: ignore
|
||||
|
||||
try:
|
||||
from eds_pseudo_manager import EdsPseudoManager
|
||||
except Exception:
|
||||
EdsPseudoManager = None # type: ignore
|
||||
|
||||
try:
|
||||
import yaml
|
||||
except Exception:
|
||||
yaml = None
|
||||
|
||||
from config_defaults import (
|
||||
RUNTIME_DICTIONARIES_CONFIG_PATH,
|
||||
read_default_dictionaries_text,
|
||||
read_runtime_dictionaries_overlay_text,
|
||||
)
|
||||
|
||||
APP_TITLE = "Pseudonymisation de PDF"
|
||||
DEFAULT_CFG = RUNTIME_DICTIONARIES_CONFIG_PATH
|
||||
DEFAULTS_CFG_TEXT = read_default_dictionaries_text()
|
||||
RUNTIME_CFG_TEXT = read_runtime_dictionaries_overlay_text()
|
||||
|
||||
|
||||
class ToolTip:
    """Small hover tooltip attached to a Tk widget.

    The popup is created when the pointer enters the widget and destroyed
    when it leaves; at most one popup exists at a time.
    """

    def __init__(self, widget, text: str):
        """Remember the widget/text pair and bind the enter/leave handlers."""
        self.widget = widget
        self.text = text
        self.tip = None  # the live Toplevel popup, or None when hidden
        for sequence, handler in (("<Enter>", self.show), ("<Leave>", self.hide)):
            widget.bind(sequence, handler)

    def show(self, *_):
        """Display the popup just below the widget (no-op if already shown)."""
        if self.tip:
            return
        anchor = self.widget
        left = anchor.winfo_rootx() + 20
        top = anchor.winfo_rooty() + anchor.winfo_height() + 4
        popup = tk.Toplevel(anchor)
        self.tip = popup
        # Borderless window positioned in absolute screen coordinates.
        popup.wm_overrideredirect(True)
        popup.wm_geometry(f"+{left}+{top}")
        label = tk.Label(
            popup,
            text=self.text,
            justify=tk.LEFT,
            relief=tk.SOLID,
            borderwidth=1,
            padx=6,
            pady=4,
        )
        label.pack(ipadx=1)

    def hide(self, *_):
        """Destroy the popup if one is currently displayed."""
        popup = self.tip
        if not popup:
            return
        popup.destroy()
        self.tip = None
|
||||
|
||||
def open_folder(path: Path):
    """Open *path* in the platform's file manager (best effort).

    Windows uses ``os.startfile``; macOS and other systems launch ``open`` /
    ``xdg-open``. The launcher is invoked with an argument list instead of a
    shell command line, so a path containing quotes, spaces or shell
    metacharacters is passed through verbatim and cannot be interpreted by a
    shell.

    Failures (missing launcher, invalid path) are deliberately ignored: this
    is a convenience action and must never break the caller.
    """
    import subprocess  # local import: keeps the file's import block untouched

    system = platform.system()
    try:
        if system == "Windows":
            os.startfile(str(path))  # type: ignore
        elif system == "Darwin":
            subprocess.run(["open", str(path)], check=False)
        else:
            subprocess.run(["xdg-open", str(path)], check=False)
    except (OSError, ValueError):
        # OSError: launcher not found / cannot be started;
        # ValueError: path not representable (e.g. embedded NUL).
        pass
|
||||
|
||||
class App:
    """Tkinter front-end (GUI v4) for batch PDF pseudonymisation.

    The window has two tabs: "Simple" (folder choice, output format, run
    button, execution report) and "Avancé" (YAML rules file, quick rule
    creator, NER model manager). PDFs are processed on a background thread;
    log lines travel to the text widget through ``self.queue``, which is
    drained periodically by ``_pump_logs``.
    """

    def __init__(self, root: tk.Tk):
        """Create the Tk state, the optional NER managers, the widgets, then load the rules."""
        self.root = root
        self.root.title(APP_TITLE)
        self.root.geometry("1280x900")
        self.dir_var = tk.StringVar()
        self.status_var = tk.StringVar(value="Prêt.")
        self.cfg_path = tk.StringVar(value=str(DEFAULT_CFG))
        self.queue: "queue.Queue[str]" = queue.Queue()
        self.format_var = tk.StringVar(value="raster")

        # NER state
        self.use_hf = tk.BooleanVar(value=False)
        self.model_choice = tk.StringVar(value="DistilCamemBERT-NER (ONNX)")
        self.model_id = tk.StringVar(value="")
        self.th_per = tk.DoubleVar(value=0.90)
        self.th_org = tk.DoubleVar(value=0.90)
        self.th_loc = tk.DoubleVar(value=0.90)
        self.model_status = tk.StringVar(value="Aucun modèle chargé.")
        # Managers are None when their optional import failed at module load.
        self._onnx_manager: NerModelManager | None = NerModelManager(cache_dir=Path("models")) if NerModelManager else None
        self._eds_manager: EdsPseudoManager | None = EdsPseudoManager(cache_dir=Path("models")) if EdsPseudoManager else None
        self._active_manager = None  # the manager currently loaded

        self.cfg_data: Dict[str, Any] = {}

        self._build_ui()
        self._pump_logs()
        self._ensure_cfg_exists()
        self._load_cfg()

    def _build_ui(self):
        """Build the notebook with its "Simple" and "Avancé" tabs."""
        wrap = tk.Frame(self.root, padx=10, pady=10)
        wrap.pack(fill=tk.BOTH, expand=True)
        nb = ttk.Notebook(wrap)
        nb.pack(fill=tk.BOTH, expand=True)

        # --- Simple ---
        simple = tk.Frame(nb, padx=12, pady=12)
        nb.add(simple, text="Simple")
        row = tk.Frame(simple)
        row.pack(fill=tk.X)
        tk.Label(row, text="Répertoire documents :").pack(side=tk.LEFT)
        tk.Entry(row, textvariable=self.dir_var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6)
        tk.Button(row, text="Choisir…", command=self._browse).pack(side=tk.LEFT, padx=3)

        fmt = tk.LabelFrame(simple, text="Format du document final")
        fmt.pack(fill=tk.X, pady=10)
        rb_ras = tk.Radiobutton(fmt, text="PDF image (très sûr — recommandé)", variable=self.format_var, value="raster")
        rb_ras.pack(anchor="w", padx=6)
        ToolTip(rb_ras, "Convertit chaque page en image avec boîtes noires. Aucun texte résiduel. Fichier plus lourd, non sélectionnable.")
        rb_vec = tk.Radiobutton(fmt, text="PDF anonymisé (léger)", variable=self.format_var, value="vector")
        rb_vec.pack(anchor="w", padx=6)
        ToolTip(rb_vec, "⚠ Le texte sous-jacent reste potentiellement récupérable par copier-coller. Utilisez le mode image pour une sécurité maximale.")

        actions = tk.Frame(simple)
        actions.pack(fill=tk.X, pady=(6, 2))
        self.btn_run = tk.Button(actions, text="Anonymiser", command=self._run)
        self.btn_run.pack(side=tk.LEFT)
        tk.Button(actions, text="Aide (2 min)", command=self._show_help).pack(side=tk.LEFT, padx=6)
        self.btn_open_out = tk.Button(actions, text="Ouvrir le dossier de résultats", command=self._open_out, state=tk.DISABLED)
        self.btn_open_out.pack(side=tk.RIGHT)

        tk.Label(simple, text="Rapport d’exécution :").pack(anchor="w")
        self.txt = tk.Text(simple, height=22)
        self.txt.pack(fill=tk.BOTH, expand=True, pady=(2, 0))
        tk.Label(simple, textvariable=self.status_var, anchor="w").pack(fill=tk.X, pady=(4, 0))

        # --- Advanced ---
        adv = tk.Frame(nb, padx=12, pady=12)
        nb.add(adv, text="Avancé")
        # YAML rules file
        cfg = tk.LabelFrame(adv, text="Règles & dictionnaires (YAML)", padx=8, pady=8)
        cfg.pack(fill=tk.X, pady=6)
        tk.Label(cfg, text="Fichier YAML :").grid(row=0, column=0, sticky="w")
        tk.Entry(cfg, textvariable=self.cfg_path, width=60).grid(row=0, column=1, sticky="we", padx=6)
        tk.Button(cfg, text="Parcourir", command=self._cfg_browse).grid(row=0, column=2)
        tk.Button(cfg, text="Créer/Charger", command=self._load_cfg).grid(row=0, column=3, padx=4)
        tk.Button(cfg, text="Sauver", command=self._save_cfg).grid(row=0, column=4)
        tk.Button(cfg, text="Recharger", command=self._reload_cfg).grid(row=0, column=5, padx=4)
        tk.Button(cfg, text="Restaurer défauts", command=self._restore_defaults).grid(row=0, column=6)
        cfg.grid_columnconfigure(1, weight=1)

        # Quick rule creator
        rc = tk.LabelFrame(adv, text="Créer rapidement une règle", padx=8, pady=8)
        rc.pack(fill=tk.X, pady=6)
        tk.Label(rc, text="Exemple (copiez une ligne du PDF) :").grid(row=0, column=0, sticky="w")
        self.rule_example = tk.Entry(rc, width=80)
        self.rule_example.grid(row=0, column=1, columnspan=4, sticky="we", padx=6)
        tk.Label(rc, text="Type :").grid(row=1, column=0, sticky="e")
        self.rule_type = ttk.Combobox(rc, values=["Mot exact", "Forme proche", "Modèle avancé"], state="readonly")
        self.rule_type.set("Mot exact")
        self.rule_type.grid(row=1, column=1, sticky="w")
        tk.Label(rc, text="Remplacer par :").grid(row=1, column=2, sticky="e")
        self.rule_placeholder = tk.Entry(rc, width=18)
        self.rule_placeholder.insert(0, "[MASK]")
        self.rule_placeholder.grid(row=1, column=3, sticky="w")
        tk.Label(rc, text="Où :").grid(row=1, column=4, sticky="e")
        self.rule_scope = ttk.Combobox(rc, values=["partout", "narratif", "tables_valeur", "entetes_pieds"], state="readonly")
        self.rule_scope.set("partout")
        self.rule_scope.grid(row=1, column=5, sticky="w")
        self.flag_ic = tk.BooleanVar(value=True)
        self.flag_bow = tk.BooleanVar(value=True)
        tk.Checkbutton(rc, text="Ignorer la casse (A=a)", variable=self.flag_ic).grid(row=2, column=1, sticky="w")
        tk.Checkbutton(rc, text="Respecter les mots entiers", variable=self.flag_bow).grid(row=2, column=2, sticky="w")
        tk.Button(rc, text="Prévisualiser", command=self._preview_rule).grid(row=2, column=4)
        tk.Button(rc, text="Enregistrer la règle", command=self._save_rule).grid(row=2, column=5)

        # NER model manager
        mm = tk.LabelFrame(adv, text="Renforcement NER (ONNX – narratif uniquement)", padx=8, pady=8)
        mm.pack(fill=tk.X, pady=6)
        tk.Checkbutton(mm, text="Activer le renforcement NER", variable=self.use_hf).grid(row=0, column=0, sticky="w")
        tk.Label(mm, text="Modèle :").grid(row=1, column=0, sticky="e")
        # Merge the ONNX and EDS-Pseudo catalogs (display name -> model id).
        catalog = {}
        if self._onnx_manager:
            catalog.update(self._onnx_manager.models_catalog())
        if self._eds_manager:
            catalog.update(self._eds_manager.models_catalog())
        self._merged_catalog = catalog
        self.model_combo = ttk.Combobox(mm, values=list(catalog.keys()), state="readonly")
        if self.model_combo["values"]:
            self.model_combo.set(self.model_combo["values"][0])
        self.model_combo.grid(row=1, column=1, sticky="w")
        tk.Label(mm, text="ou ID/chemin :").grid(row=1, column=2, sticky="e")
        tk.Entry(mm, textvariable=self.model_id, width=36).grid(row=1, column=3, sticky="w")
        tk.Button(mm, text="Charger", command=self._load_model).grid(row=1, column=4, padx=4)
        tk.Button(mm, text="Décharger", command=self._unload_model).grid(row=1, column=5)
        tk.Label(mm, textvariable=self.model_status).grid(row=2, column=0, columnspan=6, sticky="w", pady=(4, 2))
        ToolTip(mm, "Le modèle détecte les noms propres dans le texte libre. Les tableaux (clé : valeur) ne sont pas modifiés.")

        tk.Label(mm, text="Seuils (0–1)").grid(row=3, column=0, sticky="e")
        tk.Label(mm, text="PERSON").grid(row=3, column=1, sticky="w")
        tk.Entry(mm, textvariable=self.th_per, width=6).grid(row=3, column=2, sticky="w")
        tk.Label(mm, text="ORG").grid(row=3, column=3, sticky="w")
        tk.Entry(mm, textvariable=self.th_org, width=6).grid(row=3, column=4, sticky="w")
        tk.Label(mm, text="LOC").grid(row=3, column=5, sticky="w")
        tk.Entry(mm, textvariable=self.th_loc, width=6).grid(row=3, column=6, sticky="w")

        mm.grid_columnconfigure(1, weight=1)

    # YAML helpers
    def _ensure_cfg_exists(self):
        """Create the rules file (and its parent folder) from the runtime overlay text if absent."""
        p = Path(self.cfg_path.get())
        p.parent.mkdir(parents=True, exist_ok=True)
        if not p.exists():
            p.write_text(RUNTIME_CFG_TEXT, encoding="utf-8")

    def _cfg_browse(self):
        """Let the user pick the YAML rules file path."""
        d = filedialog.asksaveasfilename(defaultextension=".yml", filetypes=[("YAML", "*.yml *.yaml"), ("Tous", "*.*")])
        if d:
            self.cfg_path.set(d)

    def _load_cfg(self):
        """Load the YAML rules file into ``self.cfg_data``.

        Returns:
            True when the rules were loaded, False otherwise (an error dialog
            has then been shown and ``self.cfg_data`` is left unchanged).
        """
        if yaml is None:
            messagebox.showerror("PyYAML manquant", "Installez PyYAML (pip install pyyaml).")
            return False
        self._ensure_cfg_exists()
        try:
            loaded = yaml.safe_load(Path(self.cfg_path.get()).read_text(encoding="utf-8")) or {}
            # The rule helpers index the configuration by key; reject a YAML
            # document whose root is a list/scalar instead of failing later.
            if not isinstance(loaded, dict):
                raise ValueError("La racine du fichier YAML doit être un dictionnaire (clé : valeur).")
            self.cfg_data = loaded
            self._log(f"Règles chargées: {self.cfg_path.get()}")
            return True
        except Exception as e:
            messagebox.showerror("Fichier de règles invalide", str(e))
            return False

    def _save_cfg(self):
        """Write ``self.cfg_data`` to the YAML rules file.

        Returns:
            True when the file was written, False otherwise (an error dialog
            has then been shown).
        """
        if yaml is None:
            messagebox.showerror("PyYAML manquant", "Installez PyYAML (pip install pyyaml).")
            return False
        try:
            Path(self.cfg_path.get()).write_text(yaml.safe_dump(self.cfg_data or {}, allow_unicode=True, sort_keys=False), encoding="utf-8")
            self._log("Règles sauvegardées.")
            return True
        except Exception as e:
            messagebox.showerror("Erreur", f"Impossible d'écrire le YAML: {e}")
            return False

    def _reload_cfg(self):
        """Reload the rules file; the confirmation is logged only on success."""
        if self._load_cfg():
            self._log("Règles rechargées.")

    def _restore_defaults(self):
        """Overwrite the rules file with the runtime overlay text, then reload it."""
        try:
            Path(self.cfg_path.get()).write_text(RUNTIME_CFG_TEXT, encoding="utf-8")
            self._log("Surcharge locale réinitialisée.")
            self._load_cfg()
        except Exception as e:
            messagebox.showerror("Erreur", f"Impossible d'écrire le YAML par défaut: {e}")

    # Quick rules
    def _build_simple_regex(self, sample: str, bow: bool) -> str:
        """Turn a literal sample into a whitespace-tolerant regex.

        Each whitespace-separated token is escaped and tokens are joined with
        ``\\s+`` so that any run of spaces, tabs or line breaks matches.
        Escaping per token matters: ``re.escape`` escapes whitespace itself
        (``" "`` becomes ``"\\ "``), so substituting ``\\s+`` after escaping the
        whole string would leave a stray backslash and yield a pattern that
        matches a literal backslash followed by the letter ``s``.

        Args:
            sample: text copied by the user.
            bow: when True, wrap the pattern in ``\\b`` word boundaries.

        Returns:
            The regex source string.
        """
        # NOTE(review): \b only matches next to a word character, so a sample
        # starting or ending with punctuation may not match when bow is True.
        s = r"\s+".join(re.escape(token) for token in sample.split())
        return rf"\b{s}\b" if bow else s

    def _preview_rule(self):
        """Count the matches of the rule being edited in the first PDF of the chosen folder."""
        sample = self.rule_example.get().strip()
        if not sample:
            messagebox.showinfo("Info", "Exemple vide.")
            return
        rtype = self.rule_type.get()
        ic = self.flag_ic.get()
        bow = self.flag_bow.get()
        # "Modèle avancé" means the user typed a regex; otherwise build one.
        pattern = sample if rtype == "Modèle avancé" else self._build_simple_regex(sample, bow)
        try:
            rx = re.compile(pattern, re.IGNORECASE if ic else 0)
        except Exception as e:
            messagebox.showerror("Modèle invalide", str(e))
            return
        folder = Path(self.dir_var.get().strip())
        pdfs = sorted([p for p in folder.glob("*.pdf") if p.is_file()]) if folder.is_dir() else []
        if not pdfs:
            messagebox.showinfo("Info", "Aucun PDF pour prévisualiser.")
            return
        try:
            pages_text, tables_lines = core.extract_text_three_passes(pdfs[0])
            text = "\n".join(pages_text) + "\n\n" + "\n".join("\n".join(r) for r in tables_lines)
            hits = len(rx.findall(text))
            self._log(f"Prévisualisation: {hits} occurences sur {pdfs[0].name}")
        except Exception as e:
            self._log(f"Prévisualisation indisponible: {e}")

    @staticmethod
    def _ensure_section(container: Dict[str, Any], key: str, factory):
        """Return ``container[key]``, creating it with ``factory()`` when missing or None.

        A YAML key written without a value loads as None; treating it as an
        empty section avoids an AttributeError when a rule is appended.
        """
        if container.get(key) is None:
            container[key] = factory()
        return container[key]

    def _save_rule(self):
        """Append the rule being edited to the configuration and save the YAML file."""
        if yaml is None:
            messagebox.showerror("PyYAML manquant", "Installez PyYAML (pip install pyyaml).")
            return
        sample = self.rule_example.get().strip()
        if not sample:
            messagebox.showinfo("Info", "Exemple vide.")
            return
        rtype = self.rule_type.get()
        ic = self.flag_ic.get()
        bow = self.flag_bow.get()
        placeholder = self.rule_placeholder.get().strip() or "[MASK]"
        scope = self.rule_scope.get()

        cfg = self.cfg_data or {}
        blacklist = self._ensure_section(cfg, "blacklist", dict)
        overrides = self._ensure_section(cfg, "regex_overrides", list)
        if rtype == "Mot exact":
            terms = self._ensure_section(blacklist, "force_mask_terms", list)
            if sample not in terms:
                terms.append(sample)
        elif rtype == "Forme proche":
            pattern = self._build_simple_regex(sample, bow)
            patterns = self._ensure_section(blacklist, "force_mask_regex", list)
            if pattern not in patterns:
                patterns.append(pattern)
        else:
            entry = {"name": f"custom_{len(overrides)+1}", "pattern": sample, "placeholder": placeholder, "flags": ["IGNORECASE"] if ic else [], "scope": scope}
            overrides.append(entry)
        self.cfg_data = cfg
        # Only confirm when the file was actually written.
        if self._save_cfg():
            self._log("Règle ajoutée au YAML.")

    # Model manager
    def _load_model(self):
        """Load the selected NER model with the manager that owns it.

        A model id typed by the user takes precedence over the combo box
        choice; with neither, a default DistilCamemBERT id is used. The
        EDS-Pseudo manager is used when the id belongs to its catalog, the
        ONNX manager otherwise.
        """
        choice = self.model_combo.get().strip()
        mid = self.model_id.get().strip()
        model_id = self._merged_catalog.get(choice) if choice else None
        model_id = mid or model_id or "cmarkea/distilcamembert-base-ner"

        # The id can only be an EDS one when the EDS manager exists, so no
        # separate "EDS unavailable" branch is needed here.
        is_eds = bool(self._eds_manager) and model_id in self._eds_manager.models_catalog().values()
        if is_eds:
            manager = self._eds_manager
        else:
            if not self._onnx_manager:
                messagebox.showerror("ONNX indisponible", "Installez 'onnxruntime' et 'optimum'.")
                return
            manager = self._onnx_manager
        try:
            self.model_status.set("Chargement du modèle…")
            self.root.update_idletasks()  # show the status before the blocking load
            manager.load(model_id)
            self._active_manager = manager
            label = "EDS-Pseudo" if is_eds else "ONNX"
            self.model_status.set(f"Modèle chargé ({label}) : {model_id}")
            self.use_hf.set(True)
        except Exception as e:
            self.model_status.set(f"Échec : {e}")
            self.use_hf.set(False)

    def _unload_model(self):
        """Unload every available manager and disable NER reinforcement."""
        if self._onnx_manager:
            self._onnx_manager.unload()
        if self._eds_manager:
            self._eds_manager.unload()
        self._active_manager = None
        self.model_status.set("Aucun modèle chargé.")
        self.use_hf.set(False)

    # Actions
    def _browse(self):
        """Let the user pick the folder that contains the PDFs."""
        d = filedialog.askdirectory()
        if d:
            self.dir_var.set(d)

    def _run(self):
        """Validate the folder and start the batch on a daemon thread."""
        folder = Path(self.dir_var.get().strip())
        if not folder.is_dir():
            messagebox.showwarning("Dossier invalide", "Choisissez un dossier contenant des PDF.")
            return
        self.btn_run.config(state=tk.DISABLED)
        threading.Thread(target=self._worker, args=(folder,), daemon=True).start()

    def _worker(self, folder: Path):
        """Process every ``*.pdf`` of *folder* into ``<folder>/pseudonymise``.

        Runs on the background thread started by ``_run``. A failure on one
        PDF is logged and counted; the batch continues with the next file.
        """
        # NOTE(review): this method reads Tk variables and updates widgets
        # from a non-GUI thread; only log lines go through the queue. Confirm
        # this is acceptable for the targeted Tcl/Tk builds.
        try:
            pdfs = sorted([p for p in folder.glob("*.pdf") if p.is_file()])
            if not pdfs:
                self._log("Aucun PDF trouvé.")
                return
            outdir = folder / "pseudonymise"
            outdir.mkdir(exist_ok=True)
            ok = ko = 0
            global_counts: Dict[str, int] = {}
            for i, pdf in enumerate(pdfs, start=1):
                self.status_var.set(f"{i}/{len(pdfs)} — {pdf.name}")
                make_vec = (self.format_var.get() == "vector")
                make_ras = (self.format_var.get() == "raster")
                try:
                    active = self._active_manager
                    use_ner = bool(active and self.use_hf.get() and active.is_loaded())
                    # Thresholds are only built for a non-EDS manager.
                    is_eds = bool(EdsPseudoManager) and isinstance(active, EdsPseudoManager)
                    thresholds = None
                    if use_ner and NerThresholds and not is_eds:
                        thresholds = NerThresholds(self.th_per.get(), self.th_org.get(), self.th_loc.get(), 0.85)
                    outputs = core.process_pdf(
                        pdf_path=pdf,
                        out_dir=outdir,
                        make_vector_redaction=make_vec,
                        also_make_raster_burn=make_ras,
                        config_path=Path(self.cfg_path.get()),
                        use_hf=use_ner,
                        ner_manager=active,
                        ner_thresholds=thresholds,
                    )
                    self._log("✓ " + pdf.name)
                    for k, v in outputs.items():
                        self._log(f" - {k}: {v}")
                    # Per-document summary built from the audit file.
                    audit_path = Path(outputs.get("audit", ""))
                    counts = self._count_audit(audit_path)
                    if counts:
                        self._log(" ~ résumé : " + ", ".join(f"{k}={v}" for k, v in sorted(counts.items())))
                        for k, v in counts.items():
                            global_counts[k] = global_counts.get(k, 0) + v
                    ok += 1
                except Exception as e:
                    self._log(f"✗ {pdf.name} → ERREUR: {e}")
                    ko += 1
            self.status_var.set(f"Terminé : {ok} OK, {ko} erreurs. Sortie: {outdir}")
            if ok:
                self.btn_open_out.config(state=tk.NORMAL)
                self._last_outdir = outdir
            if ok:
                self._log("RÉSUMÉ DU LOT : " + ", ".join(f"{k}={v}" for k, v in sorted(global_counts.items())))
        finally:
            self.btn_run.config(state=tk.NORMAL)

    def _count_audit(self, audit_path: Path) -> Dict[str, int]:
        """Count the records of a JSON-lines audit file by their ``"kind"``.

        Best effort: an unreadable file yields the counts gathered so far
        (possibly empty); blank lines, invalid JSON and non-object records
        are skipped. Records without a ``"kind"`` are counted under ``"?"``.
        Keys are always strings so callers can sort them.

        Args:
            audit_path: path of the audit file.

        Returns:
            Mapping kind -> number of records.
        """
        d: Dict[str, int] = {}
        try:
            with open(audit_path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        obj = json.loads(line)
                    except ValueError:  # includes json.JSONDecodeError
                        continue
                    if not isinstance(obj, dict):
                        continue
                    k = str(obj.get("kind", "?"))
                    d[k] = d.get(k, 0) + 1
        except (OSError, UnicodeDecodeError):
            # Missing/unreadable/undecodable audit file: keep what was counted.
            pass
        return d

    def _open_out(self):
        """Open the output folder of the last successful batch, if any."""
        p = getattr(self, "_last_outdir", None)
        if p:
            open_folder(p)

    def _pump_logs(self):
        """Move queued log lines into the text widget, then re-arm itself in 60 ms."""
        try:
            while True:
                msg = self.queue.get_nowait()
                self.txt.insert(tk.END, msg + "\n")
                self.txt.see(tk.END)
        except queue.Empty:
            pass
        finally:
            self.root.after(60, self._pump_logs)

    def _log(self, msg: str):
        """Queue *msg* for display in the report area."""
        self.queue.put(msg)

    def _show_help(self):
        """Show the short usage help dialog."""
        # NOTE(review): the help text describes the "léger" format as "texte
        # supprimé", while the radio-button tooltip warns that the underlying
        # text may remain recoverable — confirm which statement is correct.
        messagebox.showinfo(
            "Aide (2 minutes)",
            "1) Choisissez un dossier avec vos PDF.\n"
            "2) Choisissez le format du document final.\n"
            " - PDF anonymisé (léger) : texte supprimé + boîtes noires (sélection possible).\n"
            " - PDF image (très sûr) : chaque page en image, aucun texte résiduel.\n"
            "3) (Option) Chargez un modèle pour renforcer la détection des noms dans le texte libre.\n"
            "4) Cliquez sur Anonymiser, puis ouvrez le dossier de résultats.",
        )
|
||||
|
||||
if __name__ == "__main__":
|
||||
root = tk.Tk(); App(root); root.mainloop()
|
||||
167
archives/legacy_gui/Pseudonymisation_Pipeline_Robuste_Patch.py
Normal file
167
archives/legacy_gui/Pseudonymisation_Pipeline_Robuste_Patch.py
Normal file
@@ -0,0 +1,167 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
GUI Pseudonymisation – Patch d'intégration du Core refactorisé (P0)
|
||||
-------------------------------------------------------------------
|
||||
Ce patch remplace le moteur interne d'extraction/anonymisation par le module
|
||||
`anonymizer_core_refactored.py` livré précédemment, et ajoute la génération
|
||||
optionnelle de PDF anonymisés avec **boîtes noires** (vector redaction et raster burn).
|
||||
|
||||
Points clés :
|
||||
- Appel unique : core.process_pdf(pdf_path, out_dir, make_vector_redaction, also_make_raster_burn)
|
||||
- Sorties : .pseudonymise.txt, .audit.jsonl, .redacted_vector.pdf (option), .redacted_raster.pdf (option)
|
||||
- UI : ajout de cases à cocher pour activer la sortie PDF vector/raster ;
|
||||
désactivation du bouton « Télécharger » spaCy après succès.
|
||||
|
||||
Dépendances : pdfplumber, pdfminer.six, pymupdf, pillow, spacy (optionnel pour l'UI), transformers (optionnel)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import queue
|
||||
import threading
|
||||
from dataclasses import asdict
|
||||
from pathlib import Path
|
||||
from typing import Dict
|
||||
|
||||
# GUI
|
||||
import tkinter as tk
|
||||
from tkinter import filedialog, messagebox, ttk
|
||||
|
||||
# Core refactorisé
|
||||
try:
|
||||
import anonymizer_core_refactored as core
|
||||
except Exception as e:
|
||||
raise SystemExit("Impossible d'importer anonymizer_core_refactored.py. Placez-le à côté de ce script.")
|
||||
|
||||
APP_TITLE = "Pseudonymisation (Refactor P0 + PDF Redaction)"
|
||||
|
||||
# ---------------- Utilitaires ----------------
|
||||
|
||||
def resolve_base_dir() -> Path:
    """Return the directory used as the application's base.

    When ``sys._MEIPASS`` is set, that directory is returned; otherwise the
    directory containing this script is used.
    """
    script_dir = Path(__file__).resolve().parent
    bundle_dir = getattr(sys, "_MEIPASS", script_dir)
    return Path(bundle_dir)
|
||||
|
||||
# ---------------- Application ----------------
|
||||
|
||||
class App:
    """Minimal Tkinter front-end driving ``core.process_pdf`` over a folder of PDFs.

    The batch runs on a background thread; log lines reach the journal
    widget through ``self.queue``, drained periodically by ``_pump_logs``.
    """

    def __init__(self, root: tk.Tk):
        """Set up the window, the Tk state variables and the widgets."""
        self.root = root
        root.title(APP_TITLE)
        root.geometry("1100x780")

        # State/UI vars
        self.dir_var = tk.StringVar()
        self.status_var = tk.StringVar(value="Prêt.")
        self.model_status_var = tk.StringVar(value="Modèle spaCy : optionnel (désactivez si absent)")
        self.queue: "queue.Queue[str]" = queue.Queue()

        # Output options
        self.opt_vector_pdf = tk.BooleanVar(value=True)
        self.opt_raster_pdf = tk.BooleanVar(value=False)

        # spaCy is optional: its UI slot is kept but never blocks the run.
        self._build_ui()
        self._pump_logs()

    # ---------------- UI ----------------
    def _build_ui(self):
        """Create the folder row, the spaCy card, the PDF options and the journal."""
        boxed = {"padx": 8, "pady": 8}

        container = tk.Frame(self.root, padx=10, pady=10)
        container.pack(fill=tk.BOTH, expand=True)

        # Folder row
        folder_row = tk.Frame(container)
        folder_row.pack(fill=tk.X)
        tk.Label(folder_row, text="Dossier PDF :").pack(side=tk.LEFT)
        tk.Entry(folder_row, textvariable=self.dir_var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6)
        tk.Button(folder_row, text="Parcourir…", command=self._browse).pack(side=tk.LEFT, padx=3)
        self.btn_run = tk.Button(folder_row, text="Lancer", command=self._run)
        self.btn_run.pack(side=tk.LEFT, padx=3)

        # spaCy card (informative only)
        spacy_card = tk.LabelFrame(container, text="Modèle spaCy (FR) — optionnel", **boxed)
        spacy_card.pack(fill=tk.X, pady=6)
        self.btn_download = tk.Button(
            spacy_card,
            text="Télécharger (wheel recommandé)",
            command=self._download_spacy_disabled,
            state=tk.DISABLED,
        )
        self.btn_download.pack(side=tk.RIGHT)
        tk.Label(spacy_card, textvariable=self.model_status_var, anchor="w").pack(fill=tk.X)

        # PDF output options
        outputs_box = tk.LabelFrame(container, text="Sorties PDF anonymisées", **boxed)
        outputs_box.pack(fill=tk.X, pady=6)
        for caption, variable in (
            ("PDF vectoriel (redaction réelle)", self.opt_vector_pdf),
            ("PDF raster (sécurité maximale)", self.opt_raster_pdf),
        ):
            tk.Checkbutton(outputs_box, text=caption, variable=variable).pack(side=tk.LEFT, padx=6)

        # Journal
        tk.Label(container, text="Journal :").pack(anchor="w")
        self.txt = tk.Text(container, height=22)
        self.txt.pack(fill=tk.BOTH, expand=True, pady=(2, 0))
        tk.Label(container, textvariable=self.status_var, anchor="w").pack(fill=tk.X, pady=(4, 0))

    def _download_spacy_disabled(self):
        """Explain that the spaCy model is installed outside the application."""
        messagebox.showinfo("Info", "L'installation via wheel est recommandée et gérée hors app. Bouton désactivé.")

    def _pump_logs(self):
        """Flush queued log lines into the journal, then re-arm itself in 60 ms."""
        fetch = self.queue.get_nowait
        journal = self.txt
        try:
            # get_nowait raises queue.Empty once the queue is drained; the
            # sentinel is a fresh object and therefore never matches a line.
            for line in iter(fetch, object()):
                journal.insert(tk.END, line + "\n")
                journal.see(tk.END)
        except queue.Empty:
            pass
        finally:
            self.root.after(60, self._pump_logs)

    # ---------------- Actions ----------------
    def _browse(self):
        """Let the user pick the folder that contains the PDFs."""
        chosen = filedialog.askdirectory()
        if not chosen:
            return
        self.dir_var.set(chosen)

    def _run(self):
        """Validate the folder and start the batch on a daemon thread."""
        folder = Path(self.dir_var.get().strip())
        if folder.is_dir():
            self.btn_run.config(state=tk.DISABLED)
            job = threading.Thread(target=self._worker, args=(folder,), daemon=True)
            job.start()
        else:
            messagebox.showwarning("Dossier invalide", "Choisissez un dossier contenant des PDF.")

    def _worker(self, folder: Path):
        """Process every ``*.pdf`` of *folder* into ``<folder>/pseudonymise``.

        A failure on one PDF is logged and counted; the batch continues. The
        run button is re-enabled whatever happens.
        """
        try:
            documents = sorted(entry for entry in folder.glob("*.pdf") if entry.is_file())
            if not documents:
                self._log("Aucun PDF trouvé.")
                return
            target = folder / "pseudonymise"
            target.mkdir(exist_ok=True)
            succeeded, failed = 0, 0
            total = len(documents)
            for position, document in enumerate(documents, start=1):
                self.status_var.set(f"{position}/{total} — {document.name}")
                try:
                    artefacts = core.process_pdf(
                        pdf_path=document,
                        out_dir=target,
                        make_vector_redaction=self.opt_vector_pdf.get(),
                        also_make_raster_burn=self.opt_raster_pdf.get(),
                    )
                    # Short log of the produced artefacts
                    self._log("✓ " + document.name)
                    for label, location in artefacts.items():
                        self._log(f" - {label}: {location}")
                    succeeded += 1
                except Exception as exc:
                    self._log(f"✗ {document.name} → ERREUR: {exc}")
                    failed += 1
            self.status_var.set(f"Terminé : {succeeded} OK, {failed} erreurs. Sortie: {target}")
        finally:
            self.btn_run.config(state=tk.NORMAL)

    def _log(self, msg: str):
        """Queue *msg* for display in the journal."""
        self.queue.put(msg)
|
||||
|
||||
|
||||
# ---------------- main ----------------
|
||||
|
||||
def main():
    """Create the Tk root window, attach the application and run the event loop."""
    window = tk.Tk()
    App(window)
    window.mainloop()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
35
archives/legacy_gui/README.md
Normal file
35
archives/legacy_gui/README.md
Normal file
@@ -0,0 +1,35 @@
|
||||
# Archives — Anciennes GUIs et pipelines
|
||||
|
||||
Ce dossier contient les fichiers obsolètes mis de côté en juin 2026 lors du
|
||||
sprint MVP Q-1 / déploiement bêta Réunion.
|
||||
|
||||
**Aucun fichier ici n'est utilisé en production.** L'historique git est
|
||||
préservé — restauration possible via `git mv archives/legacy_gui/<file> .`.
|
||||
|
||||
## Contenu
|
||||
|
||||
| Fichier | Dernière modif | Statut | Pourquoi archivé |
|
||||
|---|---|---|---|
|
||||
| `Pseudonymisation_Gui_Models_V4.py` | 2026-04-20 | obsolète | Remplacée par `Pseudonymisation_Gui_V5.py` |
|
||||
| `pseudonymisation_pipeline_gui_v3.py` | 2026-04-20 | obsolète | V3 antérieure à V4 |
|
||||
| `Pseudonymisation_Pipeline_Robuste_Patch.py` | 2025-10-03 | abandonné | Patch obsolète du pipeline RobustEngine |
|
||||
| `pseudonymisation_pipeline_robuste.py` | 2025-10-02 | abandonné | RobustEngine non utilisé dans le pipeline principal |
|
||||
| `test_gui_error.py` | 2026-04-20 | orphelin | Test de la V4, qui n'est plus pertinent |
|
||||
| `test_gui_fixed.py` | 2026-04-20 | orphelin | Test de la V4, qui n'est plus pertinent |
|
||||
|
||||
## Pipeline / GUI actifs en production
|
||||
|
||||
- **GUI active** : `Pseudonymisation_Gui_V5.py` (à la racine du projet)
|
||||
- **Pipeline / core** : `anonymizer_core_refactored_onnx.py`
|
||||
- **Launcher EXE** : `launcher.py`
|
||||
- **Quarantaine Q-1** : `quarantine.py`
|
||||
|
||||
## Restauration
|
||||
|
||||
Pour remettre un fichier en place :
|
||||
|
||||
```bash
|
||||
git mv archives/legacy_gui/<fichier> .
|
||||
```
|
||||
|
||||
L'historique git complet de chaque fichier est intact (`git log --follow`).
|
||||
439
archives/legacy_gui/pseudonymisation_pipeline_gui_v3.py
Normal file
439
archives/legacy_gui/pseudonymisation_pipeline_gui_v3.py
Normal file
@@ -0,0 +1,439 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Pseudonymisation – GUI v3 (UX simplifiée + infobulles + créateur de règle)
|
||||
--------------------------------------------------------------------------
|
||||
- Mode "Simple" par défaut (vocabulaire non-tech) + Mode "Avancé" (règles YAML)
|
||||
- Options de sortie claires : "PDF anonymisé (léger)" et "PDF image (très sûr)" avec infobulles
|
||||
- Gestion de dictionnaires YAML (whitelist/blacklist/overrides)
|
||||
- Créateur de règle (Mot exact / Forme proche / Modèle avancé) avec prévisualisation
|
||||
- Résumé par document (compte des remplacements) + bouton "Ouvrir dossier des résultats"
|
||||
- Auto-fix YAML : conversion automatique des patterns en bloc littéral si le YAML est mal cité
|
||||
|
||||
Dépendances : tkinter, PyYAML, PyMuPDF, pdfplumber, pdfminer.six, Pillow
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import platform
|
||||
import re
|
||||
import queue
|
||||
import threading
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any, List
|
||||
|
||||
import tkinter as tk
|
||||
from tkinter import filedialog, messagebox, ttk
|
||||
|
||||
# Core anonymisation (laisse ce fichier à côté de ce script)
|
||||
try:
|
||||
import anonymizer_core_refactored as core
|
||||
except Exception as e:
|
||||
raise SystemExit(f"Impossible d'importer anonymizer_core_refactored: {e}")
|
||||
|
||||
try:
|
||||
import yaml
|
||||
except Exception:
|
||||
yaml = None
|
||||
|
||||
from config_defaults import (
|
||||
RUNTIME_DICTIONARIES_CONFIG_PATH,
|
||||
read_default_dictionaries_text,
|
||||
read_runtime_dictionaries_overlay_text,
|
||||
)
|
||||
|
||||
# Window title shown by the Tk root.
APP_TITLE = "Pseudonymisation de PDF"
# Default path of the runtime (user-editable) dictionaries YAML.
DEFAULT_CFG = RUNTIME_DICTIONARIES_CONFIG_PATH

# Default YAML is externalized in config/dictionnaires.default.yml
DEFAULTS_CFG_TEXT = read_default_dictionaries_text()
# Text written to the runtime YAML when it is created or reset.
RUNTIME_CFG_TEXT = read_runtime_dictionaries_overlay_text()
|
||||
|
||||
# ---------- util : ToolTip & helpers ----------
|
||||
class ToolTip:
    """Hover tooltip for a Tk widget.

    The popup is a borderless ``Toplevel`` created when the pointer enters
    the widget and destroyed when it leaves.
    """

    def __init__(self, widget, text: str):
        self.widget, self.text, self.tip = widget, text, None
        for event, handler in (("<Enter>", self.show), ("<Leave>", self.hide)):
            widget.bind(event, handler)

    def show(self, *_):
        """Create the popup just below the widget, unless one is already shown."""
        if self.tip is not None:
            return
        owner = self.widget
        left = owner.winfo_rootx() + 20
        top = owner.winfo_rooty() + owner.winfo_height() + 6
        popup = tk.Toplevel(owner)
        self.tip = popup
        popup.wm_overrideredirect(True)  # no title bar / window decorations
        popup.wm_geometry(f"+{left}+{top}")
        label = tk.Label(
            popup,
            text=self.text,
            justify=tk.LEFT,
            relief=tk.SOLID,
            borderwidth=1,
            padx=8,
            pady=6,
        )
        label.pack(ipadx=1)

    def hide(self, *_):
        """Destroy the popup if one is currently displayed."""
        if not self.tip:
            return
        self.tip.destroy()
        self.tip = None
|
||||
|
||||
def open_folder(path: Path):
    """Open *path* in the platform file manager (best effort).

    Uses ``os.startfile`` on Windows, ``open`` on macOS and ``xdg-open``
    elsewhere. The path is passed as a separate argv element instead of being
    interpolated into a shell string, so folders whose name contains quotes,
    spaces or shell metacharacters are handled correctly.

    Failures to launch the file manager are ignored on purpose: this is a
    convenience action and must never break the GUI.
    """
    # Local import: this module does not import subprocess at file level.
    import subprocess

    target = str(path)
    system = platform.system()
    try:
        if system == "Windows":
            os.startfile(target)  # type: ignore[attr-defined]
        elif system == "Darwin":
            subprocess.run(["open", target], check=False)
        else:
            subprocess.run(["xdg-open", target], check=False)
    except (OSError, ValueError):
        # Launcher missing or path unusable: nothing useful to report.
        pass
|
||||
|
||||
# ---------- App ----------
|
||||
class App:
|
||||
def __init__(self, root: tk.Tk):
    # Build the whole application inside the given Tk root window.
    self.root = root
    self.root.title(APP_TITLE)
    self.root.geometry("1250x880")

    # State
    self.dir_var = tk.StringVar()  # folder containing the PDFs to process
    self.status_var = tk.StringVar(value="Prêt.")  # status line text
    self.cfg_path = tk.StringVar(value=str(DEFAULT_CFG))  # rules YAML path
    # Log messages are queued here by _log() and drained by _pump_logs().
    self.queue: "queue.Queue[str]" = queue.Queue()

    # Output format choice
    self.format_var = tk.StringVar(value="vector")  # "vector" or "raster"

    # In-memory copy of the YAML configuration
    self.cfg_data: Dict[str, Any] = {}

    # UI (variables above must exist before the widgets bind to them)
    self._build_ui()
    self._pump_logs()

    # Prepare the YAML file and load it
    self._ensure_cfg_exists()
    self._load_cfg()
|
||||
|
||||
# ----- UI -----
|
||||
def _build_ui(self):
    # Create every widget of the window: a two-tab notebook ("Simple" and
    # "Avancé"). Widgets are packed/gridded in creation order, so statement
    # order defines the layout.
    wrap = tk.Frame(self.root, padx=10, pady=10)
    wrap.pack(fill=tk.BOTH, expand=True)

    # Notebook tabs: Simple / Advanced
    self.nb = ttk.Notebook(wrap)
    self.nb.pack(fill=tk.BOTH, expand=True)

    # --- "Simple" tab ---
    simple = tk.Frame(self.nb, padx=12, pady=12)
    self.nb.add(simple, text="Simple")

    # Input folder row
    row = tk.Frame(simple); row.pack(fill=tk.X)
    tk.Label(row, text="Vos documents :").pack(side=tk.LEFT)
    tk.Entry(row, textvariable=self.dir_var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6)
    tk.Button(row, text="Choisir…", command=self._browse).pack(side=tk.LEFT, padx=3)

    # Output format choice
    fmt = tk.LabelFrame(simple, text="Format du document final")
    fmt.pack(fill=tk.X, pady=10)

    # Redacted PDF (lightweight)
    rb_vec = tk.Radiobutton(fmt, text="PDF anonymisé (léger)", variable=self.format_var, value="vector")
    rb_vec.pack(anchor="w", padx=6, pady=2)
    ToolTip(rb_vec, "Supprime le texte et applique des boîtes noires.\nFichier léger. Le texte n’est plus lisible mais la sélection reste possible.")

    # Image PDF (very safe)
    rb_ras = tk.Radiobutton(fmt, text="PDF image (très sûr)", variable=self.format_var, value="raster")
    rb_ras.pack(anchor="w", padx=6, pady=2)
    ToolTip(rb_ras, "Convertit chaque page en image puis ajoute des boîtes noires.\nAucun texte résiduel. Fichier plus lourd et non sélectionnable.")

    # Action buttons
    actions = tk.Frame(simple); actions.pack(fill=tk.X, pady=(6,2))
    self.btn_run = tk.Button(actions, text="Anonymiser", command=self._run, height=1)
    self.btn_run.pack(side=tk.LEFT)
    tk.Button(actions, text="Aide (2 min)", command=self._show_help).pack(side=tk.LEFT, padx=6)
    # Starts disabled; _worker enables it.
    self.btn_open_out = tk.Button(actions, text="Ouvrir le dossier de résultats", command=self._open_out, state=tk.DISABLED)
    self.btn_open_out.pack(side=tk.RIGHT)

    # Execution report (filled by _pump_logs)
    tk.Label(simple, text="Rapport d’exécution :").pack(anchor="w")
    self.txt = tk.Text(simple, height=22)
    self.txt.pack(fill=tk.BOTH, expand=True, pady=(2,0))
    tk.Label(simple, textvariable=self.status_var, anchor="w").pack(fill=tk.X, pady=(4,0))

    # --- "Avancé" tab ---
    adv = tk.Frame(self.nb, padx=12, pady=12)
    self.nb.add(adv, text="Avancé")

    # YAML dictionaries block
    cfg = tk.LabelFrame(adv, text="Règles & dictionnaires (YAML)", padx=8, pady=8)
    cfg.pack(fill=tk.X, pady=6)
    tk.Label(cfg, text="Fichier YAML :").grid(row=0, column=0, sticky="w")
    tk.Entry(cfg, textvariable=self.cfg_path, width=60).grid(row=0, column=1, sticky="we", padx=6)
    tk.Button(cfg, text="Parcourir", command=self._cfg_browse).grid(row=0, column=2)
    tk.Button(cfg, text="Créer/Charger", command=self._load_cfg).grid(row=0, column=3, padx=4)
    tk.Button(cfg, text="Sauver", command=self._save_cfg).grid(row=0, column=4)
    tk.Button(cfg, text="Recharger", command=self._reload_cfg).grid(row=0, column=5, padx=4)
    tk.Button(cfg, text="Restaurer défauts", command=self._restore_defaults).grid(row=0, column=6)
    cfg.grid_columnconfigure(1, weight=1)  # let the path entry absorb extra width
    ToolTip(cfg, "Les règles définissent ce qu’il faut masquer (blacklist), ce qu’il faut garder (whitelist) et les modèles personnalisés.")

    # Quick rule creator
    rc = tk.LabelFrame(adv, text="Créer rapidement une règle", padx=8, pady=8)
    rc.pack(fill=tk.X, pady=6)
    tk.Label(rc, text="Exemple (copiez/collez une ligne du PDF) :").grid(row=0, column=0, sticky="w")
    self.rule_example = tk.Entry(rc, width=80); self.rule_example.grid(row=0, column=1, columnspan=4, sticky="we", padx=6)
    tk.Label(rc, text="Type de modèle :").grid(row=1, column=0, sticky="e")
    self.rule_type = ttk.Combobox(rc, values=["Mot exact", "Forme proche", "Modèle avancé"], state="readonly"); self.rule_type.set("Mot exact")
    self.rule_type.grid(row=1, column=1, sticky="w")
    ToolTip(self.rule_type, "Mot exact : masque exactement ce que vous tapez.\nForme proche : tolère espaces/variantes.\nModèle avancé : expression régulière (pour experts).")
    tk.Label(rc, text="Remplacer par :").grid(row=1, column=2, sticky="e")
    self.rule_placeholder = tk.Entry(rc, width=18); self.rule_placeholder.insert(0, "[MASK]"); self.rule_placeholder.grid(row=1, column=3, sticky="w")
    tk.Label(rc, text="Où appliquer :").grid(row=1, column=4, sticky="e")
    self.rule_scope = ttk.Combobox(rc, values=["partout", "narratif", "tables_valeur", "entetes_pieds"], state="readonly"); self.rule_scope.set("partout"); self.rule_scope.grid(row=1, column=5, sticky="w")
    # Regex flags: ignore case / whole words only
    self.flag_ic = tk.BooleanVar(value=True); self.flag_bow = tk.BooleanVar(value=True)
    tk.Checkbutton(rc, text="Ignorer la casse (A=a)", variable=self.flag_ic).grid(row=2, column=1, sticky="w")
    tk.Checkbutton(rc, text="Respecter les mots entiers", variable=self.flag_bow).grid(row=2, column=2, sticky="w")
    tk.Button(rc, text="Prévisualiser", command=self._preview_rule).grid(row=2, column=4)
    tk.Button(rc, text="Enregistrer la règle", command=self._save_rule).grid(row=2, column=5)
|
||||
|
||||
# ----- YAML helpers -----
|
||||
def _ensure_cfg_exists(self):
|
||||
p = Path(self.cfg_path.get())
|
||||
p.parent.mkdir(parents=True, exist_ok=True)
|
||||
if not p.exists():
|
||||
p.write_text(RUNTIME_CFG_TEXT, encoding="utf-8")
|
||||
|
||||
def _cfg_browse(self):
    """Ask for a YAML path via a save dialog and store it in ``cfg_path``."""
    chosen = filedialog.asksaveasfilename(
        defaultextension=".yml",
        filetypes=[("YAML","*.yml *.yaml"), ("Tous","*.*")],
    )
    if not chosen:
        # Dialog cancelled: keep the current path.
        return
    self.cfg_path.set(chosen)
|
||||
|
||||
def _load_cfg(self):
    """Load the rules YAML into ``self.cfg_data``.

    If parsing fails, try one automatic repair: every ``pattern: "..."``
    line is rewritten as a literal block scalar. The repaired text is parsed
    in memory first and written back to disk only when it is valid, so a
    failed repair never overwrites the user's file. When nothing works, an
    error dialog shows the original parse error and ``self.cfg_data`` is
    left unchanged.
    """
    if yaml is None:
        messagebox.showerror("PyYAML manquant", "Installez PyYAML (pip install pyyaml).")
        return
    self._ensure_cfg_exists()
    cfg_file = Path(self.cfg_path.get())
    try:
        with open(cfg_file, "r", encoding="utf-8") as f:
            self.cfg_data = yaml.safe_load(f) or {}
        self._log(f"Règles chargées depuis : {self.cfg_path.get()}")
        return
    except Exception as e:
        # Keep the original error: it is the one reported to the user.
        first_error = e

    # Auto-fix: convert pattern: "..." into a literal block
    repaired = None
    try:
        raw = cfg_file.read_text(encoding="utf-8")
        fixed = re.sub(r"(^\s*pattern\s*:\s*)(\"[^\n]*\")", r"\1|-\n \2", raw, flags=re.MULTILINE)
        if fixed != raw:
            # Validate before touching the file on disk.
            repaired = yaml.safe_load(fixed) or {}
            cfg_file.write_text(fixed, encoding="utf-8")
    except Exception:
        repaired = None

    if repaired is None:
        messagebox.showerror("Fichier de règles invalide", f"Impossible de charger le YAML:\n{first_error}\n\nEssayez de restaurer les valeurs par défaut.")
        return
    self.cfg_data = repaired
    self._log("Le fichier YAML contenait des guillemets problématiques. Correction automatique appliquée.")
|
||||
|
||||
def _save_cfg(self):
    """Write ``self.cfg_data`` to the rules YAML file.

    Shows an error dialog when PyYAML is missing or the write fails.
    """
    if yaml is None:
        messagebox.showerror("PyYAML manquant", "Installez PyYAML (pip install pyyaml).")
        return
    destination = self.cfg_path.get()
    try:
        with open(destination, "w", encoding="utf-8") as stream:
            yaml.safe_dump(
                self.cfg_data or {},
                stream,
                allow_unicode=True,
                sort_keys=False,
            )
        self._log("Règles sauvegardées.")
    except Exception as e:
        messagebox.showerror("Erreur", f"Impossible d'écrire le fichier de règles: {e}")
|
||||
|
||||
def _reload_cfg(self):
|
||||
self._load_cfg(); self._log("Règles rechargées.")
|
||||
|
||||
def _restore_defaults(self):
    """Overwrite the rules file with ``RUNTIME_CFG_TEXT`` and reload it."""
    try:
        target = Path(self.cfg_path.get())
        target.write_text(RUNTIME_CFG_TEXT, encoding="utf-8")
        self._log("Surcharge locale réinitialisée.")
        self._load_cfg()
    except Exception as e:
        messagebox.showerror("Erreur", f"Impossible d'écrire le YAML par défaut: {e}")
|
||||
|
||||
# ----- Règles rapides -----
|
||||
def _build_simple_regex(self, sample: str, bow: bool) -> str:
|
||||
s = sample.strip()
|
||||
s = re.sub(r"\s+", r"\\s+", re.escape(s))
|
||||
return rf"\b{s}\b" if bow else s
|
||||
|
||||
def _preview_rule(self):
    """Count matches of the rule being edited in the first PDF of the folder.

    "Mot exact" and "Forme proche" both go through ``_build_simple_regex``;
    "Modèle avancé" uses the sample as a raw regular expression. The match
    count is written to the log.

    An empty folder field is treated as "no folder": ``Path("")`` would
    otherwise resolve to the current working directory and preview an
    unrelated PDF.
    """
    sample = self.rule_example.get().strip()
    if not sample:
        messagebox.showinfo("Info", "Exemple vide.")
        return
    rtype = self.rule_type.get()
    ic = self.flag_ic.get()
    bow = self.flag_bow.get()

    if rtype in ("Mot exact", "Forme proche"):
        pattern = self._build_simple_regex(sample, bow)
    else:
        pattern = sample  # advanced model: the sample is already a regex

    try:
        rx = re.compile(pattern, re.IGNORECASE if ic else 0)
    except Exception as e:
        messagebox.showerror("Modèle invalide", str(e))
        return

    # Preview on the first PDF of the selected folder
    folder_text = self.dir_var.get().strip()
    folder = Path(folder_text)
    if folder_text and folder.is_dir():
        pdfs = sorted([p for p in folder.glob("*.pdf") if p.is_file()])
    else:
        pdfs = []
    if not pdfs:
        messagebox.showinfo("Info", "Aucun PDF pour prévisualiser.")
        return
    try:
        pages_text, tables_lines = core.extract_text_two_passes(pdfs[0])  # type: ignore[attr-defined]
        text = "\n".join(pages_text) + "\n\n" + "\n".join("\n".join(r) for r in tables_lines)
        hits = len(rx.findall(text))
        self._log(f"Prévisualisation : {hits} occurrence(s) sur {pdfs[0].name}")
    except Exception as e:
        self._log(f"Prévisualisation indisponible: {e}")
|
||||
|
||||
def _save_rule(self):
    """Add the rule being edited to the configuration and save it.

    * "Mot exact" appends the sample to ``blacklist.force_mask_terms``.
    * "Forme proche" appends the generated regex to
      ``blacklist.force_mask_regex``.
    * Any other type appends an entry to ``regex_overrides`` carrying the
      placeholder, flags and scope.

    Duplicates are not added to the blacklist lists.
    """
    if yaml is None:
        messagebox.showerror("PyYAML manquant", "Installez PyYAML (pip install pyyaml).")
        return
    sample = self.rule_example.get().strip()
    if not sample:
        messagebox.showinfo("Info", "Exemple vide.")
        return
    rtype = self.rule_type.get()
    ic = self.flag_ic.get()
    bow = self.flag_bow.get()
    placeholder = self.rule_placeholder.get().strip() or "[MASK]"
    scope = self.rule_scope.get()

    rules = self.cfg_data or {}
    blacklist = rules.setdefault("blacklist", {})
    overrides = rules.setdefault("regex_overrides", [])

    if rtype == "Mot exact":
        # Plain term: stored in the simple blacklist.
        terms = blacklist.setdefault("force_mask_terms", [])
        if sample not in terms:
            terms.append(sample)
    elif rtype == "Forme proche":
        pattern = self._build_simple_regex(sample, bow)
        patterns = blacklist.setdefault("force_mask_regex", [])
        if pattern not in patterns:
            patterns.append(pattern)
    else:
        # Advanced model: override with an explicit placeholder
        overrides.append({
            "name": f"custom_{len(overrides)+1}",
            "pattern": sample,
            "placeholder": placeholder,
            "flags": ["IGNORECASE"] if ic else [],
            "scope": scope,
        })

    self.cfg_data = rules
    self._save_cfg()
    self._log("Règle ajoutée. Cliquez sur Recharger pour l'appliquer.")
|
||||
|
||||
# ----- Actions -----
|
||||
def _browse(self):
    """Let the user pick the input folder and store it in ``dir_var``."""
    picked = filedialog.askdirectory()
    if not picked:
        # Dialog cancelled: keep the current folder.
        return
    self.dir_var.set(picked)
|
||||
|
||||
def _run(self):
    """Validate the chosen folder and start the batch in a worker thread.

    An empty folder field is rejected explicitly: ``Path("")`` is the current
    working directory, whose ``is_dir()`` is true, so without this check the
    batch would silently run on whatever directory the program was started
    from.
    """
    folder_text = self.dir_var.get().strip()
    folder = Path(folder_text)
    if not folder_text or not folder.is_dir():
        messagebox.showwarning("Dossier invalide", "Choisissez un dossier contenant des PDF.")
        return
    # Re-enabled by _worker when the batch ends.
    self.btn_run.config(state=tk.DISABLED)
    threading.Thread(target=self._worker, args=(folder,), daemon=True).start()
|
||||
|
||||
def _worker(self, folder: Path):
    # Batch job run in the background thread started by _run(): process every
    # *.pdf directly inside `folder`, write results under
    # `folder/pseudonymise`, and report progress through _log() and
    # status_var.
    # NOTE(review): status_var / button updates below happen on this worker
    # thread while log lines go through the queue — confirm this is safe
    # with the Tk build in use.
    try:
        pdfs = sorted([p for p in folder.glob("*.pdf") if p.is_file()])
        if not pdfs:
            self._log("Aucun PDF trouvé.")
            return
        outdir = folder / "pseudonymise"
        outdir.mkdir(exist_ok=True)
        ok = ko = 0  # documents processed successfully / with an error
        global_counts: Dict[str,int] = {}  # replacement counts for the batch
        for i, pdf in enumerate(pdfs, start=1):
            self.status_var.set(f"{i}/{len(pdfs)} — {pdf.name}")
            # The format is re-read for every document.
            make_vec = (self.format_var.get() == "vector")
            make_ras = (self.format_var.get() == "raster")
            try:
                outputs = core.process_pdf(
                    pdf_path=pdf,
                    out_dir=outdir,
                    make_vector_redaction=make_vec,
                    also_make_raster_burn=make_ras,
                    config_path=Path(self.cfg_path.get()),
                )
                self._log("✓ " + pdf.name)
                for k, v in outputs.items():
                    self._log(f" - {k}: {v}")
                # Per-document summary (count of replacements)
                audit_path = Path(outputs.get("audit", ""))
                counts = self._count_audit(audit_path)
                if counts:
                    self._log(" ~ résumé : " + ", ".join(f"{k}={v}" for k, v in sorted(counts.items())))
                    for k,v in counts.items():
                        global_counts[k] = global_counts.get(k,0)+v
                ok += 1
            except Exception as e:
                # One failing document must not stop the batch.
                self._log(f"✗ {pdf.name} → ERREUR: {e}")
                ko += 1
        self.status_var.set(f"Terminé : {ok} OK, {ko} erreurs. Sortie: {outdir}")
        if ok:
            self._log("—")
            self._log("RÉSUMÉ DU LOT : " + ", ".join(f"{k}={v}" for k, v in sorted(global_counts.items())))
            self.btn_open_out.config(state=tk.NORMAL)
            # Remembered for _open_out().
            self._last_outdir = outdir
    finally:
        # Always give the "Anonymiser" button back, even on early return.
        self.btn_run.config(state=tk.NORMAL)
|
||||
|
||||
def _count_audit(self, audit_path: Path) -> Dict[str,int]:
|
||||
d: Dict[str,int] = {}
|
||||
try:
|
||||
with open(audit_path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
k = obj.get("kind", "?")
|
||||
d[k] = d.get(k,0)+1
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
return d
|
||||
|
||||
def _open_out(self):
|
||||
p = getattr(self, "_last_outdir", None)
|
||||
if p:
|
||||
open_folder(p)
|
||||
|
||||
def _pump_logs(self):
    """Move queued log lines into the report widget, then reschedule itself.

    Drains ``self.queue`` without blocking and always re-arms a 60 ms timer,
    so the report keeps refreshing for the lifetime of the window.
    """
    try:
        while True:
            try:
                line = self.queue.get_nowait()
            except queue.Empty:
                break
            self.txt.insert(tk.END, line + "\n")
            self.txt.see(tk.END)
    finally:
        self.root.after(60, self._pump_logs)
|
||||
|
||||
def _log(self, msg: str):
    # Queue a report line; _pump_logs() inserts it into the text widget.
    self.queue.put(msg)
|
||||
|
||||
def _show_help(self):
    """Show the short step-by-step help in an information dialog."""
    steps = (
        "1) Choisissez un dossier avec vos PDF.\n",
        "2) Choisissez le format du document final.\n",
        " - PDF anonymisé (léger) : texte supprimé + boîtes noires (sélection possible).\n",
        " - PDF image (très sûr) : chaque page en image, aucun texte résiduel.\n",
        "3) Cliquez sur Anonymiser.\n",
        "4) Ouvrez le dossier de résultats pour vérifier.\n",
        "5) Onglet Avancé : ajustez les règles si besoin (mots à garder, à masquer, modèles).",
    )
    messagebox.showinfo("Aide (2 minutes)", "".join(steps))
|
||||
|
||||
# ---------- main ----------
|
||||
# Script entry point: create the Tk root, build the App on it, run the loop.
if __name__ == "__main__":
    root = tk.Tk()
    App(root)
    root.mainloop()
|
||||
627
archives/legacy_gui/pseudonymisation_pipeline_robuste.py
Normal file
627
archives/legacy_gui/pseudonymisation_pipeline_robuste.py
Normal file
@@ -0,0 +1,627 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os, re, sys, json, queue, hashlib, warnings, threading, subprocess, unicodedata
|
||||
from dataclasses import dataclass, asdict
|
||||
from pathlib import Path
|
||||
from typing import List, Tuple, Optional, Dict
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
# GUI
|
||||
import tkinter as tk
|
||||
from tkinter import filedialog, messagebox, ttk
|
||||
|
||||
# Core
|
||||
import pdfplumber
|
||||
import requests
|
||||
import spacy
|
||||
from spacy.util import load_model_from_path
|
||||
|
||||
try:
|
||||
import yaml
|
||||
except Exception:
|
||||
yaml = None
|
||||
|
||||
# Application title string.
APP_TITLE = "Pseudonymisation (Robuste + Backbones)"
# spaCy model name; also the sub-folder looked up under <base>/models.
MODEL_DIR_NAME = "fr_core_news_lg"
|
||||
|
||||
# ----------- Utilitaires & Unicode -----------
|
||||
|
||||
def resolve_base_dir() -> Path:
    """Return ``sys._MEIPASS`` when that attribute exists, else this file's folder."""
    script_dir = Path(__file__).resolve().parent
    base = getattr(sys, "_MEIPASS", script_dir)
    return Path(base)
|
||||
|
||||
def sha256(s: str) -> str:
    """Return the hex SHA-256 digest of *s* encoded as UTF-8.

    Characters that cannot be encoded are dropped.
    """
    payload = s.encode("utf-8", errors="ignore")
    return hashlib.sha256(payload).hexdigest()
|
||||
|
||||
# Typographic quotes mapped to their ASCII equivalents in a single pass.
_ASCII_QUOTES = str.maketrans({
    "“": "\"",
    "”": "\"",
    "’": "'",
    "«": "\"",
    "»": "\"",
})


def normalize_text(s: str) -> str:
    """Normalize text extracted from a PDF.

    Steps:
      1. NFKC normalization. This already expands ligatures such as
         "ﬁ"/"ﬂ" and turns a no-break space into a plain space, so no
         separate replacement is needed for them.
      2. Typographic quotes become ASCII quotes.
      3. Every run of control characters and/or whitespace collapses to one
         space, and the result is stripped.

    Returns "" for an empty or falsy input.
    """
    if not s:
        return ""
    s = unicodedata.normalize("NFKC", s).translate(_ASCII_QUOTES)
    return re.sub(r"[\u0000-\u001f\s]+", " ", s).strip()
|
||||
|
||||
def find_model_dir(root: Path) -> Optional[Path]:
    """Locate a model folder at or below *root*.

    A model folder is one that contains both ``config.cfg`` and
    ``meta.json``. *root* itself is checked first, then its descendants;
    ``None`` is returned when nothing qualifies.
    """
    def has_meta(folder: Path) -> bool:
        return (folder / "meta.json").exists()

    if (root / "config.cfg").exists() and has_meta(root):
        return root
    matches = (cfg.parent for cfg in root.rglob("config.cfg") if has_meta(cfg.parent))
    return next(matches, None)
|
||||
|
||||
# ----------- Règles & Whitelist -----------
|
||||
|
||||
# Upper-case tokens never treated as names by the upper-case-name rule:
# a candidate is kept as-is when all of its tokens are in the whitelist.
# Used as the default when the config has no whitelist.tokens entry.
DEFAULT_WHITELIST = {
"PMSI","T2A","GHM","GHS","DP","DR","DAS","RUM","UM","UF","CMA","CMD","CIM","CIM-10","CCAM","NGAP","NABM","ICD","ICD-10",
"CHU","CH","CLCC","SSR","USI","USC","USLD","UHCD","SAU","UCA","HDJ","HAD","EHPAD","CMP","SMUR","SAMU","DIM",
"IRM","TDM","TEP","RX","ETT","ETO","ECG","EEG","EMG","EFR","BHC",
"NFS","CRP","VS","HB","HT","TSH","T3","T4","ASAT","ALAT","GGT","LDH","BNP","NTPROBNP","DFG","INR","PAO2","PACO2","SPO2","TA","FC","IMC","BMI",
"IGS2","SAPS2","APACHE","SOFA","NEWS","HAS","ARS",
"FINESS","OGC",
}
|
||||
|
||||
# --- Identifier patterns ---
# E-mail address.
EMAIL_RE = re.compile(r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b")
# Phone number: "+33" or "0", one digit 1-9, then four pairs of digits
# optionally separated by space, dot or hyphen.
PHONE_RE = re.compile(r"(?:\+33|0)[1-9](?:[ .-]?\d{2}){4}\b")
# "IPP" followed by 6 to 10 digits.
IPP_RE = re.compile(r"\bIPP[: ]?\d{6,10}\b", re.IGNORECASE)
# IBAN-shaped token: 2 letters, 2 digits, 11 to 30 alphanumerics.
IBAN_RE = re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{11,30}\b")
# 15 consecutive digits split as 13 + 2; the two groups are checked by
# nir_is_valid() before masking.
NIR_RAW_RE = re.compile(r"\b(\d{13})(\d{2})\b")
# --- Labelled header lines ---
FINESS_LINE_RE = re.compile(r"\bFINESS\s*:\s*\d{9}\b", re.IGNORECASE)
OGC_LINE_RE = re.compile(r"N[°º]?\s*OGC\s*:\s*\d+", re.IGNORECASE)
ETAB_LINE_RE = re.compile(r"Etablissement\s*:\s*.*", re.IGNORECASE)
PRATICIEN_LINE_RE = re.compile(r"Nom du praticien[- ]conseil\s*:\s*.*", re.IGNORECASE)
DIM_LINE_RE = re.compile(r"Nom du m[ée]decin du DIM\s*:\s*.*", re.IGNORECASE)
# "Dr" followed by an upper-case name.
DR_MAJ_RE = re.compile(r"Dr\s+[A-ZÀ-Ü' \-]{2,}")
# Two or more consecutive upper-case words (candidate full names).
NOMS_MAJ_RE = re.compile(r"(?<![A-Z])(?:[A-ZÀ-Ü’\-]{2,}\s+){1,}[A-ZÀ-Ü’\-]{2,}")

# Date regex paired with the strptime/strftime format used to parse it.
DATE_PATTERNS = [
(re.compile(r"\b(\d{2})/(\d{2})/(\d{4})\b"), "%d/%m/%Y"),
(re.compile(r"\b(\d{4})-(\d{2})-(\d{2})\b"), "%Y-%m-%d"),
]

# Default for the config's tables.keep_fields entry.
DEFAULT_KEEP_FIELDS = ["Etablissement", "FINESS", "N° OGC", "Dates de séjour", "Service", "RUM", "UM"]
|
||||
|
||||
def nir_is_valid(nir13: str, cle2: str) -> bool:
    """Check a NIR control key.

    The key is valid when it equals ``97 - (number % 97)`` where *number* is
    the 13-digit part. Non-numeric input yields ``False``.
    """
    try:
        number, key = int(nir13), int(cle2)
    except Exception:
        return False
    expected_key = 97 - (number % 97)
    return expected_key == key
|
||||
|
||||
# ----------- Modèle avancé HF (cascade) -----------
|
||||
|
||||
# Display name -> Hugging Face model id. The first entry is the default
# used when the config has no advanced.hf_model_id.
MODEL_PRESETS = {
"CamemBERT NER (Jean-Baptiste)": "Jean-Baptiste/camembert-ner", # ready-to-use NER model
"CamemBERT-bio (base LM)": "almanach/camembert-base-bio", # base LM, not NER -> for tests / replace with a biomedical NER model if you have one
"DrBERT (base LM)": "Dr-BERT/DrBERT-7GB", # base LM, not NER -> same remark
}
|
||||
|
||||
class AdvancedHF:
    """Optional Hugging Face token-classification pass (CPU only).

    ``load()`` builds a ``transformers`` pipeline for ``model_id``;
    ``apply()`` replaces the person / organisation / location entities it
    detects with placeholders.
    """

    def __init__(self, model_id: str, cache_dir: Path, status_cb=None):
        self.model_id = model_id
        self.cache_dir = cache_dir
        self.pipe = None  # set by load() on success
        self.status_cb = status_cb or (lambda msg: None)

    def load(self) -> Tuple[bool, str]:
        """Load tokenizer and model; return ``(ok, message)``.

        Never raises: every failure is reported through the returned message.
        """
        try:
            os.environ["HF_HOME"] = str(self.cache_dir)
            self.status_cb("Initialisation Transformers…")
            from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline, AutoModel

            # sentencepiece is required for camembert/drbert
            try:
                import sentencepiece  # noqa: F401
            except Exception:
                return False, "Dépendance 'sentencepiece' manquante. Installez-la puis rebuild."

            self.status_cb("Chargement tokenizer…")
            tokenizer = AutoTokenizer.from_pretrained(self.model_id)

            self.status_cb("Chargement modèle (peut prendre 1–2 min la 1ère fois)…")
            try:
                model = AutoModelForTokenClassification.from_pretrained(self.model_id)
            except Exception:
                # Not a NER model: still fetch the base weights for the cache.
                self.status_cb("Le modèle semble être un 'base LM'. Téléchargement de la base pour cache…")
                try:
                    AutoModel.from_pretrained(self.model_id)
                except Exception:
                    pass
                return False, ("Le modèle sélectionné ne semble pas être un modèle NER (token-classification). "
                               "Choisissez un ID fine-tuné pour le NER (ex. 'Jean-Baptiste/camembert-ner').")

            try:
                import torch
                torch.set_num_threads(1)
            except Exception:
                pass

            self.pipe = pipeline(
                "token-classification",
                model=model,
                tokenizer=tokenizer,
                aggregation_strategy="simple",
                device=-1,
            )
            return True, f"Modèle avancé prêt: {self.model_id}"
        except Exception as e:
            if "sentencepiece" in str(e).lower():
                return False, "Échec: 'sentencepiece' requis."
            return False, f"Échec modèle avancé: {e}"

    def apply(self, text: str) -> Tuple[str, List[Tuple[int,int,str,str]]]:
        """Replace detected entities in *text*.

        Returns the rewritten text and an audit list of
        ``(start, end, placeholder, original_text)`` tuples, using offsets in
        the input text. Labels starting with PER/ORG/LOC (and GPE) are
        replaced; everything else is left alone. A span that starts before
        the end of the previously accepted one is skipped.
        """
        if not self.pipe:
            return text, []

        prefix_to_placeholder = (
            ("PER", "[NOM]"),
            ("ORG", "[ETABLISSEMENT]"),
            ("LOC", "[VILLE]"),
        )
        candidates = []
        for item in self.pipe(text):
            label = item.get("entity_group") or item.get("entity") or ""
            begin, stop = int(item["start"]), int(item["end"])
            placeholder = next(
                (ph for prefix, ph in prefix_to_placeholder if label.startswith(prefix)),
                None,
            )
            if placeholder is None and label == "GPE":
                placeholder = "[VILLE]"
            if placeholder is None:
                continue
            candidates.append((begin, stop, placeholder, text[begin:stop]))

        if not candidates:
            return text, []

        pieces = []
        audit = []
        cursor = 0
        for begin, stop, placeholder, raw in sorted(candidates, key=lambda c: c[0]):
            if begin < cursor:
                continue
            pieces.extend((text[cursor:begin], placeholder))
            cursor = stop
            audit.append((begin, stop, placeholder, raw))
        pieces.append(text[cursor:])
        return "".join(pieces), audit
|
||||
|
||||
# ----------- Moteur Robuste -----------
|
||||
|
||||
# Audit record for one masked span. The original text is never stored,
# only a short hash of it.
@dataclass
class Replacement:
    # Origin of the replacement: "RULE", "EMAIL", "TEL", "IPP", "IBAN",
    # "NIR", "NOM", "NER" or "HF".
    kind: str
    # 1-based page number, or None when not tied to a page.
    page: Optional[int]
    # First 8 hex characters of the SHA-256 of the masked text.
    text_hash: str
    # Placeholder written in place of the text, e.g. "[NOM]".
    replacement: str
|
||||
|
||||
class RobustEngine:
    """Text pseudonymisation engine: date policy, regex rules, optional NER.

    The passes are independent and are chained by the caller:
    ``transform_dates`` -> ``regex_pass`` -> ``ner_pass_spacy`` ->
    ``ner_pass_hf`` -> ``safety_rescan``.
    """

    def __init__(self, config: Dict):
        """Read policy, whitelist, table and advanced-model settings from *config*."""
        policy = config.get("policy", {})
        self.nlp = None
        self.use_ner = False
        self.date_policy = policy.get("dates", "keep")
        self.date_shift_days = int(policy.get("shift_days", 0))
        self.whitelist = set(config.get("whitelist", {}).get("tokens", list(DEFAULT_WHITELIST)))
        self.keep_fields = config.get("tables", {}).get("keep_fields", list(DEFAULT_KEEP_FIELDS))
        self.apply_ner_on_narr = True
        # HF
        adv = config.get("advanced", {})
        self.adv_model_id = adv.get("hf_model_id", list(MODEL_PRESETS.values())[0])
        self.adv_cache_dir = Path(os.environ.get("LOCALAPPDATA", resolve_base_dir())) / "Pseudonymiseur" / "models" / "hf_cache"
        self.hf: Optional[AdvancedHF] = None

    # spaCy
    def try_load_spacy(self, custom_dir: Optional[Path] = None) -> Tuple[bool, str]:
        """Load the spaCy model; return ``(ok, description)``.

        Tries *custom_dir* (when given), then ``<base>/models/<MODEL_DIR_NAME>``,
        then an installed package of the same name.
        """
        candidates = []
        if custom_dir:
            candidates.append(custom_dir)
        candidates.append(resolve_base_dir() / "models" / MODEL_DIR_NAME)
        for c in candidates:
            if not c.exists():
                continue
            real = find_model_dir(c)
            if not real:
                continue
            try:
                self.nlp = load_model_from_path(real)
                self.use_ner = True
                return True, f"Local: {real}"
            except Exception as e:
                warnings.warn(f"Echec load local {real}: {e}")
        try:
            self.nlp = spacy.load(MODEL_DIR_NAME)
            self.use_ner = True
            return True, f"spacy.load('{MODEL_DIR_NAME}')"
        except Exception as e:
            self.nlp = None
            self.use_ner = False
            return False, f"Indisponible: {e}"

    # Dates
    def transform_dates(self, text: str) -> str:
        """Apply the date policy to every date matched by ``DATE_PATTERNS``.

        ``keep`` returns the text unchanged, ``month_year`` rewrites dates as
        ``MM/YYYY`` and ``shift`` moves them by ``date_shift_days``. Strings
        that look like dates but are not valid (e.g. 31/02/2020), or that
        cannot be shifted, are left as they are.
        """
        if self.date_policy == "keep":
            return text

        def as_mo_year(m, fmt):
            try:
                return datetime.strptime(m.group(0), fmt).strftime("%m/%Y")
            except ValueError:
                return m.group(0)

        def shift(m, fmt):
            try:
                dt = datetime.strptime(m.group(0), fmt) + timedelta(days=self.date_shift_days)
                return dt.strftime(fmt)
            except (ValueError, OverflowError):
                return m.group(0)

        for rx, fmt in DATE_PATTERNS:
            if self.date_policy == "month_year":
                text = rx.sub(lambda m, fmt=fmt: as_mo_year(m, fmt), text)
            elif self.date_policy == "shift":
                text = rx.sub(lambda m, fmt=fmt: shift(m, fmt), text)
        return text

    # Targeted regexes
    def regex_pass(self, text: str, page: Optional[int]) -> Tuple[str, List[Replacement]]:
        """Mask identifiers with the module regexes; return text and audit records."""
        repls: List[Replacement] = []

        def record(kind, val, placeholder):
            repls.append(Replacement(kind, page, sha256(val)[:8], placeholder))
            return placeholder

        def sub_line(rx, placeholder, s):
            return rx.sub(lambda m: record("RULE", m.group(0), placeholder), s)

        text = sub_line(ETAB_LINE_RE, "[ETABLISSEMENT]", text)
        text = sub_line(FINESS_LINE_RE, "[FINESS]", text)
        text = sub_line(OGC_LINE_RE, "[OGC]", text)
        text = sub_line(PRATICIEN_LINE_RE, "[NOM_MEDECIN]", text)
        text = sub_line(DIM_LINE_RE, "[NOM_MEDECIN]", text)
        text = sub_line(DR_MAJ_RE, "[NOM_MEDECIN]", text)

        for rx, ph, kind in [
            (EMAIL_RE, "[EMAIL]", "EMAIL"),
            (PHONE_RE, "[TEL]", "TEL"),
            (IPP_RE, "[IPP]", "IPP"),
            (IBAN_RE, "[IBAN]", "IBAN"),
        ]:
            text = rx.sub(lambda m, ph=ph, kind=kind: record(kind, m.group(0), ph), text)

        def _nir(m):
            # Only digit runs with a valid control key are masked.
            if nir_is_valid(m.group(1), m.group(2)):
                return record("NIR", m.group(0), "[NIR]")
            return m.group(0)

        text = NIR_RAW_RE.sub(_nir, text)

        def repl_noms_maj(m):
            cand = m.group(0)
            tokens = re.findall(r"[A-ZÀ-Ü’\-]{2,}", cand)
            # Runs made only of whitelisted acronyms are not names.
            if all(t in self.whitelist for t in tokens):
                return cand
            return record("NOM", cand, "[NOM]")

        text = NOMS_MAJ_RE.sub(repl_noms_maj, text)

        return text, repls

    # NER spaCy
    def ner_pass_spacy(self, text: str, page: Optional[int]) -> Tuple[str, List[Replacement]]:
        """Mask person / organisation / place entities found by spaCy.

        Both label schemes are accepted for persons: ``PER`` (used by the
        French pipelines such as ``fr_core_news_lg``) and ``PERSON`` (used by
        OntoNotes-style pipelines). Matching only ``PERSON`` would leave
        every person detected by the French model unmasked.
        """
        if not self.use_ner or not self.nlp:
            return text, []
        doc = self.nlp(text)
        spans = []
        for ent in doc.ents:
            lab = ent.label_
            if lab in ("DATE", "TIME"):
                continue
            if lab in ("PER", "PERSON"):
                rep = "[NOM]"
            elif lab == "ORG":
                rep = "[ETABLISSEMENT]"
            elif lab in ("GPE", "LOC", "FAC"):
                rep = "[VILLE]"
            else:
                continue
            spans.append((ent.start_char, ent.end_char, rep, ent.text))
        if not spans:
            return text, []
        spans.sort(key=lambda x: x[0])
        out = []
        last = 0
        repls = []
        for s, e, rep, raw in spans:
            if s < last:
                # Overlaps the previously replaced span.
                continue
            out.append(text[last:s])
            out.append(rep)
            last = e
            repls.append(Replacement("NER", page, sha256(raw)[:8], rep))
        out.append(text[last:])
        return "".join(out), repls

    # HF
    def ensure_hf(self, status_cb=None) -> Tuple[bool, str]:
        """Load the Hugging Face model once; return ``(ok, message)``.

        The wrapper is kept in ``self.hf`` only when loading succeeded, so a
        failed attempt is neither reported as "already ready" on the next
        call nor treated as available by callers that test ``self.hf``.
        """
        if self.hf and self.hf.pipe:
            return True, "Déjà prêt."
        candidate = AdvancedHF(self.adv_model_id, self.adv_cache_dir, status_cb=status_cb)
        ok, message = candidate.load()
        self.hf = candidate if ok else None
        return ok, message

    def ner_pass_hf(self, text: str, page: Optional[int]) -> Tuple[str, List[Replacement]]:
        """Run the Hugging Face pass when a model is loaded."""
        if not self.hf:
            return text, []
        t2, aud = self.hf.apply(text)
        repls = [Replacement("HF", page, sha256(raw)[:8], rep) for (_s, _e, rep, raw) in aud]
        return t2, repls

    # Safety net
    def safety_rescan(self, text: str) -> str:
        """Re-apply every regex rule to *text*, without audit records."""
        for rx, ph in [
            (FINESS_LINE_RE, "[FINESS]"),
            (OGC_LINE_RE, "[OGC]"),
            (ETAB_LINE_RE, "[ETABLISSEMENT]"),
            (PRATICIEN_LINE_RE, "[NOM_MEDECIN]"),
            (DIM_LINE_RE, "[NOM_MEDECIN]"),
            (DR_MAJ_RE, "[NOM_MEDECIN]"),
        ]:
            text = rx.sub(ph, text)
        text = EMAIL_RE.sub("[EMAIL]", text)
        text = PHONE_RE.sub("[TEL]", text)
        text = IPP_RE.sub("[IPP]", text)
        text = IBAN_RE.sub("[IBAN]", text)

        def _nir(m):
            return "[NIR]" if nir_is_valid(m.group(1), m.group(2)) else m.group(0)

        text = NIR_RAW_RE.sub(_nir, text)

        def _maj(m):
            cand = m.group(0)
            toks = re.findall(r"[A-ZÀ-Ü’\-]{2,}", cand)
            return cand if all(t in self.whitelist for t in toks) else "[NOM]"

        return NOMS_MAJ_RE.sub(_maj, text)
|
||||
|
||||
# ----------- PDF Processor -----------
|
||||
|
||||
class PDFProcessor:
|
||||
def __init__(self, engine: RobustEngine, options: Dict):
    # engine: runs the text passes; options: dict of switches read with
    # .get() in process_pdf (keep_tables, apply_ner_on_narrative,
    # aggressive_hf, safety_rescan).
    self.engine=engine; self.options=options
|
||||
|
||||
def process_pdf(self, pdf_path: Path) -> Tuple[str, List[Replacement], bool]:
|
||||
chunks=[]; audit=[]; scanned_like=True
|
||||
with pdfplumber.open(str(pdf_path)) as pdf:
|
||||
for p_idx, page in enumerate(pdf.pages, start=1):
|
||||
page_chunks=[]
|
||||
# Tables
|
||||
try: tables = page.extract_tables()
|
||||
except Exception: tables=[]
|
||||
if tables:
|
||||
scanned_like=False
|
||||
lines_all=[]
|
||||
for t in tables:
|
||||
rows=[[normalize_text(c or "") for c in row] for row in t]
|
||||
text_lines, reps = self._handle_table(rows, p_idx)
|
||||
audit += reps; lines_all += text_lines
|
||||
if self.options.get("keep_tables", True) and lines_all:
|
||||
page_chunks.append("[TABLES]\n" + "\n".join(lines_all) + "\n[/TABLES]")
|
||||
# Narratif
|
||||
try:
|
||||
txt = page.extract_text(x_tolerance=1.5, y_tolerance=3.0) or ""
|
||||
except Exception:
|
||||
txt=""
|
||||
txt=normalize_text(txt)
|
||||
if txt.strip():
|
||||
scanned_like=False
|
||||
txt = self.engine.transform_dates(txt)
|
||||
t1, r1 = self.engine.regex_pass(txt, p_idx)
|
||||
if self.options.get("apply_ner_on_narrative", True) and self.engine.use_ner:
|
||||
t2, r2 = self.engine.ner_pass_spacy(t1, p_idx)
|
||||
else:
|
||||
t2, r2 = t1, []
|
||||
if self.options.get("aggressive_hf", False) and self.engine.hf:
|
||||
t3, r3 = self.engine.ner_pass_hf(t2, p_idx)
|
||||
else:
|
||||
t3, r3 = t2, []
|
||||
audit += (r1+r2+r3)
|
||||
page_chunks.append(t3)
|
||||
if page_chunks:
|
||||
chunks.append(f"\n===== PAGE {p_idx} =====\n" + "\n\n".join(page_chunks))
|
||||
final_text=("\n\n").join(chunks).strip()
|
||||
if self.options.get("safety_rescan", True):
|
||||
final_text=self.engine.safety_rescan(final_text)
|
||||
return final_text, audit, scanned_like
|
||||
|
||||
def _handle_table(self, rows: List[List[str]], page: int) -> Tuple[List[str], List[Replacement]]:
|
||||
out_lines=[]; repls=[]
|
||||
for row in rows:
|
||||
if not any(row): continue
|
||||
line = "; ".join([c for c in row if c]);
|
||||
if not line: continue
|
||||
t, rr = self.engine.regex_pass(self.engine.transform_dates(line), page); repls += rr
|
||||
kept=False
|
||||
for k in self.engine.keep_fields:
|
||||
if re.search(rf"(?i)\b{k}\b", t):
|
||||
out_lines.append(t); kept=True; break
|
||||
if not kept:
|
||||
pass
|
||||
return out_lines, repls
|
||||
|
||||
# ----------- GUI -----------
|
||||
|
||||
def load_config() -> Dict:
    """Build the configuration: built-in defaults overlaid with ``config.yaml``.

    ``config.yaml`` is looked up in the directory returned by
    ``resolve_base_dir()`` and read only when the ``yaml`` module is
    available. For a top-level key already present in the defaults whose
    user value is a mapping, the user mapping is merged into the default
    one; any other key replaces or adds the entry wholesale. Any error while
    reading or merging is ignored and the configuration built so far is
    returned.

    Returns:
        The resulting configuration dict.
    """
    cfg = {
        "whitelist": {"tokens": list(DEFAULT_WHITELIST)},
        "tables": {"keep_fields": list(DEFAULT_KEEP_FIELDS)},
        "policy": {"dates":"keep", "shift_days":0},
        "advanced": {"hf_model_id": list(MODEL_PRESETS.values())[0]},
    }
    cfg_path = resolve_base_dir() / "config.yaml"
    try:
        if not (yaml and cfg_path.exists()):
            return cfg
        with cfg_path.open("r", encoding="utf-8") as handle:
            user_cfg = yaml.safe_load(handle) or {}
        for section, value in user_cfg.items():
            mergeable = isinstance(value, dict) and section in cfg
            if mergeable:
                cfg[section].update(value)
            else:
                cfg[section] = value
    except Exception:
        # Deliberate best-effort: a broken user file must not block start-up.
        pass
    return cfg
|
||||
|
||||
class App:
    """Tkinter front-end driving ``RobustEngine`` and ``PDFProcessor`` on a folder of PDFs.

    Background threads (spaCy download, HF model load, batch processing) never
    touch Tk objects directly: they post log lines to ``self.queue`` and UI
    callbacks to ``self._ui_calls``; both are drained on the Tk thread by
    ``_pump_logs``.
    """

    def __init__(self, root: tk.Tk):
        self.root = root
        self.root.title(APP_TITLE)
        self.root.geometry("1100x780")
        self.dir_var = tk.StringVar()
        self.status_var = tk.StringVar(value="Prêt.")
        self.model_status_var = tk.StringVar(value="Vérification du modèle spaCy…")
        self.hf_status_var = tk.StringVar(value="Modèle avancé HF : inactif")
        self.regex_only = tk.BooleanVar(value=False)
        self.keep_tables = tk.BooleanVar(value=True)
        self.apply_ner_on_narr = tk.BooleanVar(value=True)
        self.safety_rescan = tk.BooleanVar(value=True)
        self.aggressive_hf = tk.BooleanVar(value=False)
        self.date_policy = tk.StringVar(value="keep")
        self.date_shift_days = tk.StringVar(value="0")
        self.hf_model_label = tk.StringVar(value=list(MODEL_PRESETS.keys())[0])
        self.hf_model_id = tk.StringVar(value=list(MODEL_PRESETS.values())[0])
        self.queue: "queue.Queue[str]" = queue.Queue()
        # (callable, args, kwargs) tuples to execute on the Tk thread.
        self._ui_calls = queue.Queue()

        self.config = load_config()
        self.engine = RobustEngine(self.config)
        self.engine.adv_cache_dir.mkdir(parents=True, exist_ok=True)

        self._build_ui()
        self._pump_logs()

        self.root.after(250, self._ensure_spacy)

    def _build_ui(self):
        """Create and lay out every widget of the main window."""
        top = tk.Frame(self.root, padx=10, pady=10)
        top.pack(fill=tk.BOTH, expand=True)

        # Folder row
        row1 = tk.Frame(top)
        row1.pack(fill=tk.X)
        tk.Label(row1, text="Dossier PDF :").pack(side=tk.LEFT)
        tk.Entry(row1, textvariable=self.dir_var).pack(side=tk.LEFT, fill=tk.X, expand=True, padx=6)
        tk.Button(row1, text="Parcourir…", command=self._browse).pack(side=tk.LEFT, padx=3)
        self.btn_run = tk.Button(row1, text="Lancer", command=self._run, state=tk.DISABLED)
        self.btn_run.pack(side=tk.LEFT, padx=3)

        # spaCy card
        card = tk.LabelFrame(top, text="Modèle spaCy (FR)", padx=8, pady=8)
        card.pack(fill=tk.X, pady=6)
        tk.Label(card, textvariable=self.model_status_var, anchor="w").pack(fill=tk.X)
        pfrm = tk.Frame(card)
        pfrm.pack(fill=tk.X, pady=(6,0))
        self.pbar = ttk.Progressbar(pfrm, orient="horizontal", mode="indeterminate", length=300)
        self.pbar.pack(side=tk.LEFT)
        tk.Button(card, text="Télécharger", command=self._download_spacy).pack(side=tk.LEFT, padx=6)
        tk.Button(card, text="Choisir un dossier…", command=self._choose_model_dir).pack(side=tk.LEFT)
        tk.Checkbutton(card, text="Mode regex seul", variable=self.regex_only, command=self._toggle_regex).pack(side=tk.RIGHT)

        # Hugging Face card
        card2 = tk.LabelFrame(top, text="Modèle avancé (Hugging Face)", padx=8, pady=8)
        card2.pack(fill=tk.X, pady=6)
        rowhf = tk.Frame(card2)
        rowhf.pack(fill=tk.X)
        tk.Label(rowhf, text="Préréglage :").pack(side=tk.LEFT)
        self.cmb = ttk.Combobox(rowhf, values=list(MODEL_PRESETS.keys()), textvariable=self.hf_model_label, state="readonly", width=35)
        self.cmb.pack(side=tk.LEFT, padx=6)
        self.cmb.bind("<<ComboboxSelected>>", self._preset_changed)
        tk.Label(rowhf, text="Model ID :").pack(side=tk.LEFT)
        tk.Entry(rowhf, textvariable=self.hf_model_id, width=44).pack(side=tk.LEFT, padx=6)
        tk.Button(rowhf, text="Charger modèle avancé", command=self._load_hf).pack(side=tk.LEFT)
        tk.Checkbutton(card2, text="Re-scanner agressif (ajoute le modèle avancé au narratif)", variable=self.aggressive_hf).pack(side=tk.LEFT, padx=10)
        tk.Label(card2, textvariable=self.hf_status_var, anchor="w").pack(fill=tk.X, pady=(6,0))

        # Options
        opt = tk.LabelFrame(top, text="Options", padx=8, pady=8)
        opt.pack(fill=tk.X, pady=6)
        tk.Checkbutton(opt, text="Garder tables utiles (réduit)", variable=self.keep_tables).pack(side=tk.LEFT, padx=6)
        tk.Checkbutton(opt, text="Appliquer NER (spaCy) sur narratif", variable=self.apply_ner_on_narr).pack(side=tk.LEFT, padx=6)
        tk.Checkbutton(opt, text="Re-scanner (sécurité) après traitement", variable=self.safety_rescan).pack(side=tk.LEFT, padx=6)

        pol = tk.LabelFrame(top, text="Politique Dates", padx=8, pady=8)
        pol.pack(fill=tk.X, pady=6)
        tk.Label(pol, text="Dates :").pack(side=tk.LEFT)
        ttk.Combobox(pol, textvariable=self.date_policy, values=["keep","month_year","shift"], width=12, state="readonly").pack(side=tk.LEFT, padx=6)
        tk.Label(pol, text="Décalage (+/- jours) :").pack(side=tk.LEFT)
        tk.Entry(pol, textvariable=self.date_shift_days, width=6).pack(side=tk.LEFT, padx=6)

        tk.Label(top, text="Journal :").pack(anchor="w")
        self.txt = tk.Text(top, height=18)
        self.txt.pack(fill=tk.BOTH, expand=True, pady=(2,0))
        tk.Label(top, textvariable=self.status_var, anchor="w").pack(fill=tk.X, pady=(4,0))

    # Helpers
    def _pbar_mode(self, mode:str):
        """Switch the progress bar mode; ``"indeterminate"`` starts the animation, anything else stops and resets it."""
        self.pbar.config(mode=mode)
        if mode == "indeterminate":
            self.pbar.start(60)
        else:
            self.pbar.stop()
            self.pbar["value"] = 0

    def log(self, msg:str):
        """Queue a line for the journal widget; safe to call from any thread."""
        self.queue.put(msg)

    def _on_ui(self, func, *args, **kwargs):
        """Queue ``func(*args, **kwargs)`` for execution on the Tk thread.

        Worker threads use this instead of calling Tk variables or widgets
        directly; the call is executed by the next ``_pump_logs`` tick.
        """
        self._ui_calls.put((func, args, kwargs))

    def _pump_logs(self):
        """Drain pending log lines and UI callbacks, then reschedule itself in 60 ms."""
        try:
            while True:
                try:
                    msg = self.queue.get_nowait()
                except queue.Empty:
                    break
                self.txt.insert(tk.END, msg + "\n")
                self.txt.see(tk.END)
            while True:
                try:
                    func, args, kwargs = self._ui_calls.get_nowait()
                except queue.Empty:
                    break
                func(*args, **kwargs)
        finally:
            # Always reschedule, even if a queued callback raised.
            self.root.after(60, self._pump_logs)

    def _apply_spacy_result(self, ok: bool, status: str):
        """Show ``status`` and enable or disable the run button (Tk thread only).

        On failure the button is disabled unless regex-only mode is active.
        """
        self.model_status_var.set(status)
        if ok:
            self.btn_run.config(state=tk.NORMAL)
        elif not self.regex_only.get():
            self.btn_run.config(state=tk.DISABLED)

    # spaCy
    def _ensure_spacy(self):
        """Try to load the spaCy model from the local ``models`` directory and update the UI."""
        self._pbar_mode("indeterminate")
        ok, msg = self.engine.try_load_spacy(resolve_base_dir() / "models" / MODEL_DIR_NAME)
        if ok:
            self._apply_spacy_result(True, f"Modèle prêt. {msg}")
        else:
            self._apply_spacy_result(False, f"Modèle indisponible : {msg} — utilisez 'Télécharger' ou 'Mode regex seul'.")
        self._pbar_mode("determinate")

    def _download_spacy(self):
        """Run ``spacy download`` in a background thread, then try to load the model."""
        self._pbar_mode("indeterminate")
        self.model_status_var.set("Téléchargement spaCy en cours…")

        def work():
            try:
                subprocess.check_call([sys.executable, "-m", "spacy", "download", MODEL_DIR_NAME])
                ok, msg = self.engine.try_load_spacy(resolve_base_dir() / "models" / MODEL_DIR_NAME)
                if ok:
                    self._on_ui(self._apply_spacy_result, True, f"Modèle prêt. {msg}")
                else:
                    self._on_ui(self._apply_spacy_result, False, "Échec validation modèle. Essayez 'Choisir un dossier…' ou 'Mode regex seul'.")
            except Exception as e:
                self._on_ui(self._apply_spacy_result, False, f"Erreur téléchargement spaCy : {e}")
            finally:
                self._on_ui(self._pbar_mode, "determinate")

        threading.Thread(target=work, daemon=True).start()

    def _choose_model_dir(self):
        """Let the user pick a spaCy model directory and try to load it."""
        d = filedialog.askdirectory(title="Choisir le dossier du modèle spaCy")
        if d:
            ok, msg = self.engine.try_load_spacy(Path(d))
            if ok:
                self._apply_spacy_result(True, f"Modèle prêt. {msg}")
            else:
                self._apply_spacy_result(False, "Échec chargement du modèle.")

    def _toggle_regex(self):
        """React to the regex-only checkbox: disable NER, or re-check the spaCy model."""
        if self.regex_only.get():
            self.engine.use_ner = False
            self.apply_ner_on_narr.set(False)
            self.btn_run.config(state=tk.NORMAL)
            self.model_status_var.set("Mode regex seul : précision NER réduite.")
        else:
            self._ensure_spacy()

    # HF
    def _preset_changed(self, _evt=None):
        """Copy the model id of the selected preset into the Model ID field."""
        label = self.hf_model_label.get()
        self.hf_model_id.set(MODEL_PRESETS.get(label, list(MODEL_PRESETS.values())[0]))

    def _load_hf(self):
        """Load the advanced model named in the Model ID field in a background thread."""
        mid = self.hf_model_id.get().strip()
        self.hf_status_var.set(f"Chargement du modèle avancé : {mid} …")
        self._pbar_mode("indeterminate")

        def work():
            try:
                self.engine.adv_model_id = mid
                _ok, msg = self.engine.ensure_hf(
                    status_cb=lambda m: self._on_ui(self.hf_status_var.set, m)
                )
                self._on_ui(self.hf_status_var.set, msg)
            except Exception as e:
                # Without this the status would stay on "Chargement…" forever.
                self._on_ui(self.hf_status_var.set, f"Échec du chargement du modèle avancé : {e}")
            finally:
                self._on_ui(self._pbar_mode, "determinate")

        threading.Thread(target=work, daemon=True).start()

    # Run
    def _browse(self):
        """Ask for the PDF folder and store it in the folder field."""
        d = filedialog.askdirectory()
        if d:
            self.dir_var.set(d)

    def _run(self):
        """Validate the inputs, push the settings to the engine and start the batch thread."""
        raw_dir = self.dir_var.get().strip()
        folder = Path(raw_dir)
        # An empty field must be rejected explicitly: Path("") is "." and
        # would silently select the current working directory.
        if not raw_dir or not folder.is_dir():
            messagebox.showwarning("Dossier invalide","Choisissez un dossier contenant des PDF.")
            return
        self.engine.use_ner = (not self.regex_only.get()) and (self.engine.nlp is not None) and self.apply_ner_on_narr.get()
        self.engine.date_policy = self.date_policy.get()
        try:
            self.engine.date_shift_days = int(self.date_shift_days.get() or "0")
        except ValueError:
            # Non-numeric entry: fall back to no shift.
            self.engine.date_shift_days = 0

        opts = dict(
            keep_tables = self.keep_tables.get(),
            apply_ner_on_narrative = self.apply_ner_on_narr.get() and self.engine.use_ner,
            safety_rescan = self.safety_rescan.get(),
            aggressive_hf = self.aggressive_hf.get() and (self.engine.hf is not None),
        )
        self.btn_run.config(state=tk.DISABLED)
        threading.Thread(target=self._worker, args=(folder,opts), daemon=True).start()

    def _worker(self, folder: Path, options: Dict):
        """Process every ``*.pdf`` of ``folder`` (runs in a background thread).

        For each PDF three files are written to ``folder / "pseudonymise"``:
        the pseudonymised text, a JSONL audit and a small log. A failure on
        one PDF is logged and counted without stopping the batch. The run
        button is re-enabled at the end in all cases.
        """
        try:
            pdfs = sorted([p for p in folder.glob("*.pdf") if p.is_file()])
            if not pdfs:
                self.log("Aucun PDF trouvé.")
                return
            outdir = folder / "pseudonymise"
            outdir.mkdir(exist_ok=True)
            ok = ko = 0
            for i, pdf in enumerate(pdfs, start=1):
                self._on_ui(self.status_var.set, f"{i}/{len(pdfs)} — {pdf.name}")
                try:
                    proc = PDFProcessor(self.engine, options)
                    text, audit, scanned = proc.process_pdf(pdf)
                    (outdir / f"{pdf.stem}.pseudonymise.txt").write_text(text, encoding="utf-8")
                    with (outdir / f"{pdf.stem}.pseudonymise.jsonl").open("w", encoding="utf-8") as f:
                        for rep in audit:
                            f.write(json.dumps(asdict(rep), ensure_ascii=False) + "\n")
                    with (outdir / f"{pdf.stem}.log.txt").open("w", encoding="utf-8") as f:
                        f.write(f"Fichier: {pdf.name}\nScanneSuspect: {scanned}\nRemplacements: {len(audit)}\n")
                    self.log(f"✓ {pdf.name}")
                    ok += 1
                except Exception as e:
                    self.log(f"✗ {pdf.name} → ERREUR: {e}")
                    ko += 1
            self._on_ui(self.status_var.set, f"Terminé : {ok} OK, {ko} erreurs. Sortie: {outdir}")
        finally:
            self._on_ui(self.btn_run.config, state=tk.NORMAL)
|
||||
|
||||
# ----------- main -----------
|
||||
|
||||
def main():
    """Create the Tk root window, attach the application to it and run the event loop."""
    window = tk.Tk()
    App(window)
    window.mainloop()


if __name__ == "__main__":
    main()
|
||||
29
archives/legacy_gui/test_gui_error.py
Normal file
29
archives/legacy_gui/test_gui_error.py
Normal file
@@ -0,0 +1,29 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Test pour reproduire l'erreur du GUI."""
|
||||
|
||||
from pathlib import Path
|
||||
import anonymizer_core_refactored_onnx as core
|
||||
from config_defaults import RUNTIME_DICTIONARIES_CONFIG_PATH
|
||||
|
||||
# Try with a single PDF: the first one found under the downloads folder.
pdf_candidates = Path("/home/dom/Téléchargements").rglob("*.pdf")
test_pdf = next(pdf_candidates, None)

if test_pdf is None:
    print("Aucun PDF trouvé")
else:
    print(f"Test avec: {test_pdf}")
    # Keyword options passed to the core alongside the input and output paths.
    call_options = dict(
        make_vector_redaction=False,
        also_make_raster_burn=True,
        config_path=RUNTIME_DICTIONARIES_CONFIG_PATH,
        use_hf=False,
    )
    try:
        result = core.process_pdf(test_pdf, Path("/tmp/test_gui"), **call_options)
        print(f"✅ Succès: {result}")
    except Exception as e:
        print(f"❌ Erreur: {e}")
        import traceback
        traceback.print_exc()
|
||||
47
archives/legacy_gui/test_gui_fixed.py
Normal file
47
archives/legacy_gui/test_gui_fixed.py
Normal file
@@ -0,0 +1,47 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Test rapide pour vérifier que le GUI peut anonymiser correctement."""
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent))
|
||||
|
||||
import anonymizer_core_refactored_onnx as core
|
||||
from config_defaults import RUNTIME_DICTIONARIES_CONFIG_PATH
|
||||
|
||||
# Test with a simple PDF taken from a fixed fixture directory.
pdf_dir = Path("/tmp/test_gui_pdfs")
if not pdf_dir.exists():
    print("❌ Répertoire de test non trouvé:", pdf_dir)
    sys.exit(1)

candidates = list(pdf_dir.glob("*.pdf"))
if not candidates:
    print("❌ Aucun PDF trouvé dans:", pdf_dir)
    sys.exit(1)

pdf = candidates[0]
print(f"Test avec: {pdf}")

out_dir = Path("/tmp/test_gui_fixed")
out_dir.mkdir(exist_ok=True)

# Mirror the GUI call (without use_vlm).
gui_call_kwargs = dict(
    pdf_path=pdf,
    out_dir=out_dir,
    make_vector_redaction=False,
    also_make_raster_burn=True,
    config_path=RUNTIME_DICTIONARIES_CONFIG_PATH,
    use_hf=False,
    ner_manager=None,
    ner_thresholds=None,
    ogc_label=None,
    vlm_manager=None,
)
try:
    outputs = core.process_pdf(**gui_call_kwargs)
    print(f"✅ Succès: {outputs}")
except Exception as e:
    print(f"❌ Erreur: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
|
||||
Reference in New Issue
Block a user