diff --git a/.gitignore b/.gitignore index afa741c..7ec37e5 100644 --- a/.gitignore +++ b/.gitignore @@ -6,10 +6,12 @@ __pycache__/ *.egg dist/ build/ +release/ *.whl # === Virtual environments === .venv/ +.venv_build_win/ venv/ venv_*/ env/ @@ -66,6 +68,9 @@ Thumbs.db # === Secrets === .env *.env +*.pfx +*.p12 +build_signing.local.ps1 credentials.json token.pickle diff --git a/Pseudonymisation_Gui_V5.py b/Pseudonymisation_Gui_V5.py index bd680f0..af0e9f6 100644 --- a/Pseudonymisation_Gui_V5.py +++ b/Pseudonymisation_Gui_V5.py @@ -22,13 +22,16 @@ import queue import re import subprocess import sys +import tempfile import threading +import unicodedata +from copy import deepcopy from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional import tkinter as tk -from tkinter import filedialog, messagebox, ttk +from tkinter import filedialog, messagebox, simpledialog, ttk # --------------------------------------------------------------------------- # Core @@ -63,6 +66,11 @@ try: except Exception: EdsPseudoManager = None # type: ignore +try: + from camembert_ner_manager import CamembertNerManager +except Exception: + CamembertNerManager = None # type: ignore + try: from vlm_manager import VlmManager, VlmConfig except Exception: @@ -75,9 +83,47 @@ except Exception: yaml = None from config_defaults import ( + deep_merge_dict, + load_effective_dictionaries_dict, + load_effective_param_lists, read_default_dictionaries_text, read_runtime_dictionaries_overlay_text, ) +from gui_batch_paths import ( + build_batch_output_dir, + iter_pseudonymized_texts, + list_supported_documents, +) +from manual_masking import ( + append_jsonl_file, + ensure_mask_templates_dir, + list_mask_templates, + mask_template_label, + resolve_manual_mask_pdf, +) +from profile_defaults import ( + delete_runtime_profile, + ensure_runtime_profiles_config, + get_default_profile_key, + list_default_profile_keys, + list_effective_profiles, + 
read_runtime_profiles_overlay_text, + save_runtime_profile, + set_runtime_default_profile, +) + +try: + from pdf_mask_designer import ( + MaskDesignerApp, + Template, + apply_template_vector, + load_template_yaml, + ) +except Exception: + MaskDesignerApp = None # type: ignore + Template = None # type: ignore + apply_template_vector = None # type: ignore + load_template_yaml = None # type: ignore # --------------------------------------------------------------------------- # Thème optionnel @@ -99,6 +145,7 @@ except Exception: # --------------------------------------------------------------------------- APP_TITLE = "Pseudonymisation de vos documents" APP_VERSION = "v5.5" +MANUAL_MASK_NONE_LABEL = "Aucun masque manuel" # Métadonnées de build — chargées depuis build_info.py (régénéré par rebuild_anon.ps1) try: @@ -154,7 +201,19 @@ def _resolve_config() -> Path: exe_cfg.write_text(read_runtime_dictionaries_overlay_text(), encoding="utf-8") return exe_cfg + +def _resolve_profiles_config() -> Path: + exe_cfg = _exe_dir() / "config" / "profiles.yml" + + if exe_cfg.exists(): + return exe_cfg + + exe_cfg.parent.mkdir(parents=True, exist_ok=True) + exe_cfg.write_text(read_runtime_profiles_overlay_text(), encoding="utf-8") + return exe_cfg + DEFAULT_CFG = _resolve_config() +DEFAULT_PROFILES_CFG = _resolve_profiles_config() MODELS_DIR = _app_dir() / "models" DEFAULTS_CFG_TEXT = read_default_dictionaries_text() @@ -335,7 +394,21 @@ class App: self.dir_var = tk.StringVar() self.status_var = tk.StringVar(value="Prêt.") self.cfg_path = tk.StringVar(value=str(DEFAULT_CFG)) + self.profiles_path = tk.StringVar(value=str(DEFAULT_PROFILES_CFG)) + self.processing_profile_label_var = tk.StringVar(value="") + self.manual_mask_template_var = tk.StringVar(value=MANUAL_MASK_NONE_LABEL) + self.profile_description_var = tk.StringVar(value="") + self.profile_require_manual_mask_var = tk.BooleanVar(value=False) + self.profile_force_disable_vlm_var = tk.BooleanVar(value=False) self.queue: 
"queue.Queue[UiMessage]" = queue.Queue() + self._processing_profiles: Dict[str, Dict[str, Any]] = {} + self._processing_profile_labels_to_keys: Dict[str, str] = {} + self._manual_mask_templates: Dict[str, Optional[Path]] = { + MANUAL_MASK_NONE_LABEL: None, + } + self._profile_base_description = "" + self._profile_manager_win: Optional[tk.Toplevel] = None + self._advanced_params_win: Optional[tk.Toplevel] = None # --- NER (interne) --- self.use_hf = False @@ -344,6 +417,7 @@ class App: self.th_loc = 0.90 self._onnx_manager: Optional[Any] = NerModelManager(cache_dir=MODELS_DIR) if NerModelManager else None self._eds_manager: Optional[Any] = EdsPseudoManager(cache_dir=MODELS_DIR) if EdsPseudoManager else None + self._camembert_manager: Optional[Any] = CamembertNerManager() if CamembertNerManager else None self._active_manager: Optional[Any] = None self.cfg_data: Dict[str, Any] = {} @@ -521,6 +595,7 @@ class App: _make_tab_button(tabs_bar, "anonym", "Anonymisation") _make_tab_button(tabs_bar, "params", "Paramètres") + _make_tab_button(tabs_bar, "profiles", "Profils") # Séparateur gris clair sous les onglets tk.Frame(self.root, bg=CLR_DIVIDER, height=1).pack(fill=tk.X) @@ -531,8 +606,10 @@ class App: tab_anonym_outer = tk.Frame(tabs_content, bg=CLR_BG) tab_params_outer = tk.Frame(tabs_content, bg=CLR_BG) + tab_profiles_outer = tk.Frame(tabs_content, bg=CLR_BG) self._tab_frames["anonym"] = tab_anonym_outer self._tab_frames["params"] = tab_params_outer + self._tab_frames["profiles"] = tab_profiles_outer # --- Scroll pour l'onglet Anonymisation --- canvas = tk.Canvas(tab_anonym_outer, bg=CLR_BG, highlightthickness=0) @@ -578,6 +655,22 @@ class App: canvas2.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) scrollbar2.pack(side=tk.RIGHT, fill=tk.Y) + # --- Scroll pour l'onglet Profils --- + canvas3 = tk.Canvas(tab_profiles_outer, bg=CLR_BG, highlightthickness=0) + scrollbar3 = ttk.Scrollbar(tab_profiles_outer, orient=tk.VERTICAL, command=canvas3.yview) + self._profiles_scroll = 
tk.Frame(canvas3, bg=CLR_BG) + self._profiles_scroll.bind( + "", + lambda e: canvas3.configure(scrollregion=canvas3.bbox("all")), + ) + canvas3_window = canvas3.create_window((0, 0), window=self._profiles_scroll, anchor="nw") + canvas3.configure(yscrollcommand=scrollbar3.set) + def _on_canvas3_configure(event): + canvas3.itemconfig(canvas3_window, width=event.width) + canvas3.bind("", _on_canvas3_configure) + canvas3.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + scrollbar3.pack(side=tk.RIGHT, fill=tk.Y) + # "main" pointe désormais sur le scroll de l'onglet Anonymisation. # Tout le contenu existant (étape 1, formats, boutons, progress, résultats) # reste inchangé — seul le parent implicite a changé. @@ -645,7 +738,7 @@ class App: info_inner, text=("\u2022 Recherche récursive de tous les documents dans les sous-dossiers\n" "\u2022 Sortie PDF Image (raster) — sécurité maximale, aucun texte résiduel\n" - "\u2022 Résultats dans le dossier « anonymise/ » à la racine"), + "\u2022 Résultats dans « anonymise/ » en conservant les sous-dossiers source"), font=self._f_card_desc, bg=CLR_BLUE_LIGHT, fg=CLR_TEXT_SECONDARY, anchor="w", justify=tk.LEFT, ).pack(fill=tk.X, pady=(4, 0)) @@ -718,9 +811,88 @@ class App: "Utile pour gérer les spécificités de votre établissement."), font=self._f_small, bg=CLR_BG, fg=CLR_TEXT_SECONDARY, anchor="w", justify=tk.LEFT, wraplength=700, + ).pack(fill=tk.X, padx=pad_x, pady=(0, 4)) + + self._params_summary = tk.Label( + self._params_frame, + text="", + font=self._f_small, + bg=CLR_BG, fg=CLR_TEXT, anchor="w", justify=tk.LEFT, wraplength=700, + ) + self._params_summary.pack(fill=tk.X, padx=pad_x, pady=(0, 4)) + + tk.Label( + self._params_frame, + text=("Les listes ci-dessous ne montrent que les paramètres manuels éditables. 
" + "Le moteur applique aussi des règles automatiques non listées ici " + "(regex, gazetteers FINESS/INSEE, dictionnaires et règles admin)."), + font=self._f_small, + bg=CLR_BG, fg=CLR_TEXT_SECONDARY, anchor="w", justify=tk.LEFT, wraplength=700, ).pack(fill=tk.X, padx=pad_x, pady=(0, 16)) - # Conteneur interne avec padding latéral pour les listboxes + tk.Label( + self._params_frame, + text="Masques PDF réutilisables", + font=(self._font_family, 12, "bold"), + bg=CLR_BG, fg=CLR_TEXT, anchor="w", + ).pack(fill=tk.X, padx=pad_x, pady=(0, 4)) + + tk.Label( + self._params_frame, + text=( + "Pour les formulaires toujours mis en page de la même façon, " + "ouvrez l'éditeur de masques PDF, dessinez les zones à caviarder " + "puis enregistrez un modèle réutilisable." + ), + font=self._f_small, + bg=CLR_BG, fg=CLR_TEXT_SECONDARY, anchor="w", justify=tk.LEFT, wraplength=700, + ).pack(fill=tk.X, padx=pad_x, pady=(0, 8)) + + manual_mask_row = tk.Frame(self._params_frame, bg=CLR_BG) + manual_mask_row.pack(fill=tk.X, padx=pad_x, pady=(0, 16)) + + manual_mask_btn = tk.Button( + manual_mask_row, text="Ouvrir l'éditeur de masques PDF", + font=self._f_small, bg=CLR_PRIMARY_LIGHT, fg=CLR_TEXT, + relief=tk.GROOVE, cursor="hand2", padx=10, pady=6, + command=self._open_manual_mask_designer, + ) + manual_mask_btn.pack(side=tk.LEFT) + + self._manual_mask_combo = ttk.Combobox( + manual_mask_row, + textvariable=self.manual_mask_template_var, + state="readonly", + width=34, + ) + self._manual_mask_combo.pack(side=tk.LEFT, padx=(6, 0)) + self._manual_mask_combo.bind("<>", lambda _e: self._refresh_manual_mask_hint()) + + refresh_templates_btn = tk.Button( + manual_mask_row, text="Actualiser les modèles", + font=self._f_small, bg=CLR_CARD_BG, fg=CLR_TEXT, + relief=tk.GROOVE, cursor="hand2", padx=10, pady=6, + command=self._refresh_manual_mask_templates, + ) + refresh_templates_btn.pack(side=tk.LEFT, padx=(6, 0)) + + templates_btn = tk.Button( + manual_mask_row, text="Ouvrir le dossier des 
modèles", + font=self._f_small, bg=CLR_ACCENT_LIGHT, fg=CLR_TEXT, + relief=tk.GROOVE, cursor="hand2", padx=10, pady=6, + command=self._open_manual_mask_templates_dir, + ) + templates_btn.pack(side=tk.LEFT, padx=(6, 0)) + + self._manual_mask_hint = tk.Label( + self._params_frame, + text="", + font=self._f_small, + bg=CLR_BG, fg=CLR_TEXT_SECONDARY, anchor="w", justify=tk.LEFT, wraplength=700, + ) + self._manual_mask_hint.pack(fill=tk.X, padx=pad_x, pady=(0, 12)) + + # Conteneur interne visible : réglages manuels éditables. params_inner = tk.Frame(self._params_frame, bg=CLR_BG) params_inner.pack(fill=tk.X, padx=pad_x, pady=(0, 12)) @@ -730,6 +902,7 @@ class App: title="\u2705 Phrases à ne PAS anonymiser :", placeholder="Ajouter une phrase à protéger...", color_tag=CLR_GREEN_LIGHT, + on_change=self._refresh_params_summary, ) # --- Blacklist (phrases à toujours masquer) --- @@ -738,6 +911,7 @@ class App: title="\u26d4 Mots/phrases à TOUJOURS masquer :", placeholder="Ajouter un mot ou phrase à masquer...", color_tag=CLR_PRIMARY_LIGHT, + on_change=self._refresh_params_summary, ) # --- Stop-words additionnels (mots à ne jamais identifier comme noms) --- @@ -748,6 +922,7 @@ class App: title="\u26a0 Mots à ne jamais identifier comme noms (sigles, acronymes...) 
:", placeholder="Ajouter un mot (ex: sigle local, acronyme métier)...", color_tag=CLR_ACCENT_LIGHT, + on_change=self._refresh_params_summary, ) # Boutons sauvegarder + exporter @@ -781,6 +956,253 @@ class App: # Charger les valeurs initiales depuis la config self._load_params() + self._refresh_manual_mask_templates() + + # ============================================================= + # ONGLET "PROFILS" + # ============================================================= + self._profiles_frame = self._profiles_scroll + + tk.Label( + self._profiles_frame, + text="Profils métier", + font=(self._font_family, 14, "bold"), + bg=CLR_BG, fg=CLR_TEXT, anchor="w", + ).pack(fill=tk.X, padx=pad_x, pady=(20, 4)) + + tk.Label( + self._profiles_frame, + text=( + "Un profil mémorise les réglages courants de l'application. " + "Utilise cet onglet pour choisir le profil actif, modifier sa description, " + "et enregistrer un nouveau profil utilisateur." + ), + font=self._f_small, + bg=CLR_BG, fg=CLR_TEXT_SECONDARY, anchor="w", justify=tk.LEFT, wraplength=700, + ).pack(fill=tk.X, padx=pad_x, pady=(0, 12)) + + profile_card = tk.Frame( + self._profiles_frame, + bg=CLR_CARD_BG, + highlightbackground=CLR_CARD_BORDER, + highlightthickness=1, + ) + profile_card.pack(fill=tk.X, padx=pad_x, pady=(0, 16)) + + profile_card_inner = tk.Frame(profile_card, bg=CLR_CARD_BG) + profile_card_inner.pack(fill=tk.X, padx=16, pady=14) + profile_card_inner.columnconfigure(0, weight=3) + profile_card_inner.columnconfigure(1, weight=2) + + profile_left = tk.Frame(profile_card_inner, bg=CLR_CARD_BG) + profile_left.grid(row=0, column=0, sticky="nsew", padx=(0, 10)) + + profile_right = tk.Frame(profile_card_inner, bg=CLR_BLUE_LIGHT) + profile_right.grid(row=0, column=1, sticky="nsew") + + tk.Label( + profile_left, + text="Profil actif", + font=self._f_body_bold, + bg=CLR_CARD_BG, fg=CLR_TEXT, anchor="w", + ).pack(fill=tk.X, pady=(0, 4)) + + profile_select_row = tk.Frame(profile_left, bg=CLR_CARD_BG) + 
profile_select_row.pack(fill=tk.X, pady=(0, 10)) + + self._profile_combo = ttk.Combobox( + profile_select_row, + textvariable=self.processing_profile_label_var, + state="readonly", + width=34, + ) + self._profile_combo.pack(side=tk.LEFT) + self._profile_combo.bind("<>", lambda _e: self._apply_selected_processing_profile()) + + refresh_profiles_btn = tk.Button( + profile_select_row, text="Actualiser", + font=self._f_small, bg=CLR_CARD_BG, fg=CLR_TEXT, + relief=tk.GROOVE, cursor="hand2", padx=10, pady=6, + command=self._refresh_processing_profiles, + ) + refresh_profiles_btn.pack(side=tk.LEFT, padx=(6, 0)) + + self._profile_kind_label = tk.Label( + profile_left, + text="", + font=self._f_small, + bg=CLR_CARD_BG, fg=CLR_TEXT_SECONDARY, anchor="w", + ) + self._profile_kind_label.pack(fill=tk.X, pady=(0, 8)) + + tk.Label( + profile_left, + text="Description", + font=self._f_small, + bg=CLR_CARD_BG, fg=CLR_TEXT, anchor="w", + ).pack(fill=tk.X, pady=(0, 4)) + + self._profile_description_entry = tk.Entry( + profile_left, + textvariable=self.profile_description_var, + font=self._f_small, + relief=tk.GROOVE, + bd=1, + ) + self._profile_description_entry.pack(fill=tk.X, pady=(0, 10)) + self.profile_description_var.trace_add("write", self._on_profile_description_change) + + flags_row = tk.Frame(profile_left, bg=CLR_CARD_BG) + flags_row.pack(fill=tk.X, pady=(0, 10)) + + self._profile_require_manual_mask_check = tk.Checkbutton( + flags_row, + text="Masque manuel obligatoire", + variable=self.profile_require_manual_mask_var, + font=self._f_small, + bg=CLR_CARD_BG, + activebackground=CLR_CARD_BG, + command=self._on_profile_editor_change, + ) + self._profile_require_manual_mask_check.pack(side=tk.LEFT) + + self._profile_force_disable_vlm_check = tk.Checkbutton( + flags_row, + text="Désactiver le VLM", + variable=self.profile_force_disable_vlm_var, + font=self._f_small, + bg=CLR_CARD_BG, + activebackground=CLR_CARD_BG, + command=self._on_profile_editor_change, + ) + 
self._profile_force_disable_vlm_check.pack(side=tk.LEFT, padx=(12, 0)) + + tk.Label( + profile_left, + text="Masque PDF mémorisé par ce profil", + font=self._f_small, + bg=CLR_CARD_BG, fg=CLR_TEXT, anchor="w", + ).pack(fill=tk.X, pady=(0, 4)) + + profile_mask_row = tk.Frame(profile_left, bg=CLR_CARD_BG) + profile_mask_row.pack(fill=tk.X, pady=(0, 10)) + + self._profile_manual_mask_combo = ttk.Combobox( + profile_mask_row, + textvariable=self.manual_mask_template_var, + state="readonly", + width=34, + ) + self._profile_manual_mask_combo.pack(side=tk.LEFT) + self._profile_manual_mask_combo.bind( + "<>", + lambda _e: self._refresh_manual_mask_hint(), + ) + + tk.Button( + profile_mask_row, text="Actualiser les modèles", + font=self._f_small, bg=CLR_CARD_BG, fg=CLR_TEXT, + relief=tk.GROOVE, cursor="hand2", padx=10, pady=6, + command=self._refresh_manual_mask_templates, + ).pack(side=tk.LEFT, padx=(6, 0)) + + self._profile_mask_explainer = tk.Label( + profile_left, + text=( + "Ce choix est enregistré dans le profil. " + "Quand tu recharges ce profil, ce masque est re-sélectionné automatiquement." 
+ ), + font=self._f_small, + bg=CLR_CARD_BG, fg=CLR_TEXT_SECONDARY, anchor="w", justify=tk.LEFT, wraplength=420, + ) + self._profile_mask_explainer.pack(fill=tk.X, pady=(0, 10)) + + profile_actions_row = tk.Frame(profile_left, bg=CLR_CARD_BG) + profile_actions_row.pack(fill=tk.X) + + tk.Button( + profile_actions_row, text="Nouveau profil...", + font=self._f_small, bg=CLR_PRIMARY_LIGHT, fg=CLR_TEXT, + relief=tk.GROOVE, cursor="hand2", padx=10, pady=6, + command=self._create_processing_profile, + ).pack(side=tk.LEFT) + + tk.Button( + profile_actions_row, text="Enregistrer", + font=self._f_small, bg=CLR_PRIMARY, fg="white", + activebackground=CLR_PRIMARY_DARK, activeforeground="white", + relief=tk.FLAT, cursor="hand2", padx=10, pady=6, + command=self._save_selected_processing_profile, + ).pack(side=tk.LEFT, padx=(6, 0)) + + tk.Button( + profile_actions_row, text="Renommer...", + font=self._f_small, bg=CLR_CARD_BG, fg=CLR_TEXT, + relief=tk.GROOVE, cursor="hand2", padx=10, pady=6, + command=self._rename_selected_processing_profile, + ).pack(side=tk.LEFT, padx=(6, 0)) + + tk.Button( + profile_actions_row, text="Définir par défaut", + font=self._f_small, bg=CLR_ACCENT_LIGHT, fg=CLR_TEXT, + relief=tk.GROOVE, cursor="hand2", padx=10, pady=6, + command=self._set_selected_processing_profile_default, + ).pack(side=tk.LEFT, padx=(6, 0)) + + tk.Button( + profile_actions_row, text="Supprimer", + font=self._f_small, bg=CLR_RED_LIGHT, fg=CLR_RED, + relief=tk.GROOVE, cursor="hand2", padx=10, pady=6, + command=self._delete_selected_processing_profile, + ).pack(side=tk.LEFT, padx=(6, 0)) + + profile_right_inner = tk.Frame(profile_right, bg=CLR_BLUE_LIGHT) + profile_right_inner.pack(fill=tk.BOTH, expand=True, padx=14, pady=14) + + tk.Label( + profile_right_inner, + text="Résumé du profil", + font=self._f_body_bold, + bg=CLR_BLUE_LIGHT, fg=CLR_TEXT, anchor="w", + ).pack(fill=tk.X, pady=(0, 6)) + + self._profile_description = tk.Label( + profile_right_inner, + text="", + 
font=self._f_small, + bg=CLR_BLUE_LIGHT, fg=CLR_TEXT_SECONDARY, anchor="w", justify=tk.LEFT, wraplength=300, + ) + self._profile_description.pack(fill=tk.X, pady=(0, 10)) + + self._profile_capture_summary = tk.Label( + profile_right_inner, + text="", + font=self._f_small, + bg=CLR_BLUE_LIGHT, fg=CLR_TEXT, anchor="w", justify=tk.LEFT, wraplength=300, + ) + self._profile_capture_summary.pack(fill=tk.X, pady=(0, 10)) + + tk.Label( + profile_right_inner, + text=( + "Sens de « masque manuel obligatoire » : le profil n'impose pas un masque précis, " + "mais il bloque le lancement si aucun masque PDF n'est sélectionné." + ), + font=self._f_small, + bg=CLR_BLUE_LIGHT, fg=CLR_TEXT_SECONDARY, anchor="w", justify=tk.LEFT, wraplength=300, + ).pack(fill=tk.X, pady=(0, 10)) + + tk.Label( + profile_right_inner, + text=( + "Lien profil ↔ masque : le masque actuellement choisi dans cet onglet " + "est mémorisé dans le profil lors de l'enregistrement." + ), + font=self._f_small, + bg=CLR_BLUE_LIGHT, fg=CLR_TEXT_SECONDARY, anchor="w", justify=tk.LEFT, wraplength=300, + ).pack(fill=tk.X) + + self._refresh_processing_profiles() # Retour dans l'onglet Anonymisation ttk.Separator(main).pack(fill=tk.X, padx=pad_x, pady=(0, 8)) @@ -986,10 +1408,7 @@ class App: SUPPORTED_EXTENSIONS = {".pdf"} doc_count = 0 try: - doc_count = len([ - p for p in Path(folder).rglob("*") - if p.is_file() and p.suffix.lower() in SUPPORTED_EXTENSIONS - ]) + doc_count = len(list_supported_documents(Path(folder), SUPPORTED_EXTENSIONS)) except Exception: pass display_label = folder @@ -1044,6 +1463,9 @@ class App: # --------------------------------------------------------------- def _run(self): is_single = getattr(self, '_single_file', None) is not None + profile_key = self._selected_processing_profile_key() + profile_spec = self._build_live_profile_spec() + manual_mask_template = self._selected_manual_mask_template_path() if is_single: # Mode fichier unique @@ -1065,17 +1487,46 @@ class App: from format_converter 
import SUPPORTED_EXTENSIONS except ImportError: SUPPORTED_EXTENSIONS = {".pdf"} - pdfs = sorted([ - p for p in folder.rglob("*") - if p.is_file() and p.suffix.lower() in SUPPORTED_EXTENSIONS - ]) + pdfs = list_supported_documents(folder, SUPPORTED_EXTENSIONS) if not pdfs: exts = ", ".join(sorted(SUPPORTED_EXTENSIONS)) messagebox.showwarning( "Aucun document", f"Aucun fichier supporté trouvé.\n" f"Formats acceptés : {exts}\n" - f"(recherche récursive dans les sous-dossiers)", + f"(recherche récursive dans les sous-dossiers, hors anonymise/)", + ) + return + + if profile_spec.get("require_manual_mask") and manual_mask_template is None: + messagebox.showwarning( + "Masque manuel requis", + "Le profil sélectionné exige un masque manuel.\n" + "Choisissez un modèle de masque avant de lancer le traitement.", + ) + return + + if manual_mask_template is not None: + if apply_template_vector is None or Template is None or load_template_yaml is None: + messagebox.showwarning( + "Masque manuel indisponible", + "Le template sélectionné ne peut pas être appliqué car " + "la bibliothèque PDF n'est pas disponible.", + ) + return + if not manual_mask_template.is_file(): + messagebox.showwarning( + "Masque manuel introuvable", + f"Le modèle sélectionné est introuvable :\n{manual_mask_template}", + ) + self._refresh_manual_mask_templates() + return + try: + self._load_manual_mask_template(manual_mask_template) + except Exception as e: + messagebox.showwarning( + "Masque manuel invalide", + f"Impossible de charger le modèle sélectionné :\n{e}", ) return @@ -1084,7 +1535,11 @@ class App: self.btn_stop.pack(fill=tk.X) self._show_progress(total=len(pdfs)) self._hide_results() - threading.Thread(target=self._worker, args=(folder, pdfs), daemon=True).start() + threading.Thread( + target=self._worker, + args=(folder, pdfs, manual_mask_template, profile_key, profile_spec), + daemon=True, + ).start() def _stop(self): """Demande l'arrêt du traitement en cours.""" @@ -1092,11 +1547,72 @@ class 
App: self.btn_stop.config(state=tk.DISABLED, bg="#fca5a5", text="Arrêt en cours...") self.status_var.set("Arrêt demandé, fin du document en cours...") - def _worker(self, folder: Path, pdfs: List[Path]): + def _worker( + self, + folder: Path, + pdfs: List[Path], + manual_mask_template_path: Optional[Path], + profile_key: str, + profile_spec: Dict[str, Any], + ): import time start_time = time.time() + manual_mask_template = None + temp_profile_cfg_path: Optional[Path] = None try: + config_path = Path(self.cfg_path.get()) + merged_cfg = load_effective_dictionaries_dict(config_path) + param_lists = profile_spec.get("param_lists") or {} + if isinstance(param_lists, dict): + merged_cfg["whitelist_phrases"] = list(param_lists.get("whitelist_phrases", [])) + if not isinstance(merged_cfg.get("blacklist"), dict): + merged_cfg["blacklist"] = {} + merged_cfg["blacklist"]["force_mask_terms"] = list( + param_lists.get("blacklist_force_mask_terms", []) + ) + merged_cfg["additional_stopwords"] = list( + param_lists.get("additional_stopwords", []) + ) + profile_overlay = profile_spec.get("dictionaries_overlay") or {} + if profile_overlay: + merged_cfg = deep_merge_dict(merged_cfg, profile_overlay) + if yaml is not None: + fd, temp_name = tempfile.mkstemp( + prefix="profile_", + suffix=".yml", + dir=str(config_path.parent), + ) + os.close(fd) + temp_profile_cfg_path = Path(temp_name) + temp_profile_cfg_path.write_text( + yaml.safe_dump( + merged_cfg, + allow_unicode=True, + default_flow_style=False, + sort_keys=False, + ), + encoding="utf-8", + ) + config_path = temp_profile_cfg_path + + if profile_spec: + label = profile_spec.get("label") or profile_key + self.queue.put( + UiMessage( + kind=MsgType.LOG, + text=f"~ profil métier actif : {label}", + ) + ) + + if manual_mask_template_path is not None: + manual_mask_template = self._load_manual_mask_template(manual_mask_template_path) + self.queue.put( + UiMessage( + kind=MsgType.LOG, + text=f"~ masque manuel actif : 
{manual_mask_template_path.name}", + ) + ) outdir = folder / "anonymise" outdir.mkdir(exist_ok=True) ok = ko = 0 @@ -1107,15 +1623,50 @@ class App: if self._stop_requested: self.queue.put(UiMessage(kind=MsgType.LOG, text=f"\n⚠️ Arrêt demandé par l'utilisateur")) break + + display_name = pdf.name + if folder in pdf.parents: + display_name = str(pdf.relative_to(folder)) self.queue.put(UiMessage( kind=MsgType.PROGRESS, current=i, total=len(pdfs), - filename=pdf.name, + filename=display_name, )) try: + source_doc = pdf + temp_dir_ctx = None + manual_mask_audit = None + if manual_mask_template is not None: + if pdf.suffix.lower() == ".pdf": + temp_dir_ctx = tempfile.TemporaryDirectory(prefix="manual-mask-") + temp_dir = Path(temp_dir_ctx.name) + source_doc = temp_dir / pdf.name + manual_mask_audit = temp_dir / f"{pdf.stem}.manual_mask.audit.jsonl" + apply_template_vector(pdf, source_doc, manual_mask_template, manual_mask_audit) + self.queue.put( + UiMessage( + kind=MsgType.LOG, + text=f" ~ masque manuel appliqué : {manual_mask_template.name}", + ) + ) + else: + self.queue.put( + UiMessage( + kind=MsgType.LOG, + text=" ~ masque manuel ignoré : format non PDF", + ) + ) + active = self._active_manager use_ner = bool(active and self.use_hf and hasattr(active, 'is_loaded') and active.is_loaded()) + camembert_active = ( + self._camembert_manager + if self._camembert_manager + and hasattr(self._camembert_manager, "is_loaded") + and self._camembert_manager.is_loaded() + else None + ) thresholds = None if use_ner and NerThresholds and not (EdsPseudoManager and isinstance(active, EdsPseudoManager)): thresholds = NerThresholds(self.th_per, self.th_org, self.th_loc, 0.85) @@ -1137,19 +1688,24 @@ class App: # sinon fallback sur process_pdf (PDF uniquement) _process_fn = getattr(core, 'process_document', None) or core.process_pdf _path_key = "doc_path" if _process_fn.__name__ == "process_document" else "pdf_path" + doc_outdir = build_batch_output_dir(folder, outdir, pdf) + 
doc_outdir.mkdir(parents=True, exist_ok=True) outputs = _process_fn( - **{_path_key: pdf}, - out_dir=outdir, + **{_path_key: source_doc}, + out_dir=doc_outdir, make_vector_redaction=False, also_make_raster_burn=True, - config_path=Path(self.cfg_path.get()), + config_path=config_path, use_hf=use_ner, ner_manager=active, ner_thresholds=thresholds, ogc_label=ogc, vlm_manager=self._vlm_manager if vlm_active else None, + camembert_manager=camembert_active, ) - self.queue.put(UiMessage(kind=MsgType.LOG, text=f"\u2713 {pdf.name}")) + if manual_mask_audit is not None and "audit" in outputs: + append_jsonl_file(Path(outputs["audit"]), manual_mask_audit) + self.queue.put(UiMessage(kind=MsgType.LOG, text=f"\u2713 {display_name}")) for k, v in outputs.items(): self.queue.put(UiMessage(kind=MsgType.LOG, text=f" - {k}: {v}")) @@ -1164,8 +1720,11 @@ class App: global_counts[k] = global_counts.get(k, 0) + v ok += 1 except Exception as e: - self.queue.put(UiMessage(kind=MsgType.LOG, text=f"\u2717 {pdf.name} \u2192 ERREUR: {e}")) + self.queue.put(UiMessage(kind=MsgType.LOG, text=f"\u2717 {display_name} \u2192 ERREUR: {e}")) ko += 1 + finally: + if temp_dir_ctx is not None: + temp_dir_ctx.cleanup() total_time = time.time() - start_time total_masked = sum(global_counts.values()) @@ -1195,6 +1754,12 @@ class App: self.queue.put(UiMessage(kind=MsgType.LOG, text=f"Erreur fatale : {e}")) total_time = time.time() - start_time self.queue.put(UiMessage(kind=MsgType.DONE, ok=0, ko=len(pdfs), masked=0, outdir="", total_time=total_time)) + finally: + if temp_profile_cfg_path is not None: + try: + temp_profile_cfg_path.unlink() + except Exception: + pass # --------------------------------------------------------------- # Pompe de messages @@ -1303,6 +1868,450 @@ class App: if self._last_outdir: open_folder(self._last_outdir) + def _manual_mask_templates_dir(self) -> Path: + return ensure_mask_templates_dir(_exe_dir()) + + def _selected_processing_profile_key(self) -> str: + label = 
self.processing_profile_label_var.get() + return self._processing_profile_labels_to_keys.get(label, "") + + def _selected_processing_profile_spec(self) -> Dict[str, Any]: + key = self._selected_processing_profile_key() + return self._processing_profiles.get(key, {}) + + def _set_listbox_values(self, listbox: tk.Listbox, values: List[str]): + listbox.delete(0, tk.END) + for value in values: + listbox.insert(tk.END, value) + + def _current_param_lists(self) -> Dict[str, List[str]]: + return { + "whitelist_phrases": list(self._wl_listbox.get(0, tk.END)), + "blacklist_force_mask_terms": list(self._bl_listbox.get(0, tk.END)), + "additional_stopwords": list(self._sw_listbox.get(0, tk.END)), + } + + def _apply_param_lists_to_widgets(self, param_lists: Dict[str, List[str]]): + self._set_listbox_values( + self._wl_listbox, + list(param_lists.get("whitelist_phrases", [])), + ) + self._set_listbox_values( + self._bl_listbox, + list(param_lists.get("blacklist_force_mask_terms", [])), + ) + self._set_listbox_values( + self._sw_listbox, + list(param_lists.get("additional_stopwords", [])), + ) + self._refresh_params_summary() + + def _current_manual_mask_template_setting(self) -> str: + selected = self._selected_manual_mask_template_path() + if selected is None: + return "" + return mask_template_label(selected, _exe_dir()) + + def _select_manual_mask_template_from_setting(self, template_name: str): + wanted = str(template_name or "").strip() + if not wanted: + self.manual_mask_template_var.set(MANUAL_MASK_NONE_LABEL) + return + template_path = self._manual_mask_templates_dir() / wanted + selected_label = MANUAL_MASK_NONE_LABEL + for label, path in self._manual_mask_templates.items(): + if path == template_path: + selected_label = label + break + self.manual_mask_template_var.set(selected_label) + + def _build_live_profile_spec( + self, + *, + label: Optional[str] = None, + description: Optional[str] = None, + base_spec: Optional[Dict[str, Any]] = None, + ) -> Dict[str, Any]: + 
spec = dict(base_spec or self._selected_processing_profile_spec()) + return { + "label": str(label if label is not None else spec.get("label") or self.processing_profile_label_var.get() or "Profil"), + "description": str( + description + if description is not None + else self.profile_description_var.get() or spec.get("description") or "" + ), + "require_manual_mask": bool(self.profile_require_manual_mask_var.get()), + "force_disable_vlm": bool(self.profile_force_disable_vlm_var.get()), + "dictionaries_overlay": deepcopy(spec.get("dictionaries_overlay") or {}), + "param_lists": self._current_param_lists(), + "has_param_lists": True, + "preferred_manual_mask_template": self._current_manual_mask_template_setting(), + "has_preferred_manual_mask_template": True, + } + + def _profile_key_from_label(self, label: str) -> str: + ascii_label = unicodedata.normalize("NFKD", label).encode("ascii", "ignore").decode("ascii") + slug = re.sub(r"[^a-zA-Z0-9]+", "_", ascii_label.lower()).strip("_") or "profil" + existing = set(self._processing_profiles.keys()) + candidate = slug + index = 2 + while candidate in existing: + candidate = f"{slug}_{index}" + index += 1 + return candidate + + def _refresh_profile_description(self): + description = self.profile_description_var.get().strip() + hints: list[str] = [] + if self.profile_require_manual_mask_var.get(): + hints.append("masque manuel requis") + if self.profile_force_disable_vlm_var.get(): + hints.append("VLM désactivé") + spec = self._selected_processing_profile_spec() + if spec.get("dictionaries_overlay"): + hints.append("règles de masquage renforcées") + if hints: + description = f"{description}\nOptions actives : {', '.join(hints)}." if description else f"Options actives : {', '.join(hints)}." 
+ self._profile_description.configure(text=description) + + def _on_profile_editor_change(self): + self._apply_processing_profile_gui_state() + self._refresh_profile_description() + self._refresh_manual_mask_hint() + self._refresh_profile_capture_summary() + + def _on_profile_description_change(self, *_args): + self._refresh_profile_description() + + def _builtin_processing_profile_keys(self) -> set[str]: + return list_default_profile_keys() + + def _open_profile_manager(self): + self._switch_tab("profiles") + + def _refresh_profile_capture_summary(self): + if not hasattr(self, "_profile_capture_summary"): + return + profile_key = self._selected_processing_profile_key() + param_lists = self._current_param_lists() + wl_count = len(param_lists.get("whitelist_phrases", [])) + bl_count = len(param_lists.get("blacklist_force_mask_terms", [])) + sw_count = len(param_lists.get("additional_stopwords", [])) + mask_label = self.manual_mask_template_var.get() + default_key = get_default_profile_key(Path(self.profiles_path.get())) + default_text = "profil par défaut" if profile_key and profile_key == default_key else "profil secondaire" + self._profile_capture_summary.configure( + text=( + f"Ce profil enregistrera : {wl_count} préservations, {bl_count} masquages forcés, " + f"{sw_count} stop-word additionnel. Masque PDF courant : {mask_label}. " + f"Statut : {default_text}." 
+ ) + ) + + def _refresh_profile_kind_label(self): + if not hasattr(self, "_profile_kind_label"): + return + profile_key = self._selected_processing_profile_key() + if not profile_key: + self._profile_kind_label.configure(text="") + return + profile_kind = "profil fourni" if profile_key in self._builtin_processing_profile_keys() else "profil utilisateur" + self._profile_kind_label.configure(text=f"Type : {profile_kind} ({profile_key})") + + def _rename_selected_processing_profile(self): + profile_key = self._selected_processing_profile_key() + if not profile_key: + messagebox.showwarning("Profils", "Aucun profil sélectionné.") + return + base_spec = self._selected_processing_profile_spec() + current_label_text = str(base_spec.get("label") or profile_key) + new_label = simpledialog.askstring( + "Renommer le profil", + "Nouveau nom visible du profil :", + initialvalue=current_label_text, + parent=self.root, + ) + if new_label is None: + return + new_label = new_label.strip() + if not new_label: + messagebox.showwarning("Profils", "Le nom du profil ne peut pas être vide.") + return + updated_spec = self._build_live_profile_spec(label=new_label, base_spec=base_spec) + save_runtime_profile(profile_key, updated_spec, Path(self.profiles_path.get())) + self._refresh_processing_profiles(preferred_key=profile_key) + messagebox.showinfo("Profils", f"Profil renommé : {new_label}") + + def _set_selected_processing_profile_default(self): + profile_key = self._selected_processing_profile_key() + if not profile_key: + messagebox.showwarning("Profils", "Aucun profil sélectionné.") + return + set_runtime_default_profile(profile_key, Path(self.profiles_path.get())) + self._refresh_processing_profiles(preferred_key=profile_key) + messagebox.showinfo("Profils", "Profil par défaut mis à jour.") + + def _delete_selected_processing_profile(self): + profile_key = self._selected_processing_profile_key() + spec = self._selected_processing_profile_spec() + profile_label = 
str(spec.get("label") or profile_key) + if not profile_key: + messagebox.showwarning("Profils", "Aucun profil sélectionné.") + return + if profile_key in self._builtin_processing_profile_keys(): + messagebox.showwarning( + "Profils", + "Les profils fournis par défaut ne peuvent pas être supprimés.\n" + "Crée un profil utilisateur si tu veux un profil spécifique.", + ) + return + confirmed = messagebox.askyesno( + "Supprimer le profil", + f"Supprimer définitivement le profil utilisateur « {profile_label} » ?", + parent=self.root, + ) + if not confirmed: + return + delete_runtime_profile(profile_key, Path(self.profiles_path.get())) + self._refresh_processing_profiles() + messagebox.showinfo("Profils", f"Profil supprimé : {profile_label}") + + def _create_processing_profile(self): + base_spec = self._selected_processing_profile_spec() + initial_label = f"{base_spec.get('label') or 'Profil'} copie" + label = simpledialog.askstring( + "Nouveau profil", + "Nom du nouveau profil :", + initialvalue=initial_label, + parent=self.root, + ) + if label is None: + return + label = label.strip() + if not label: + messagebox.showwarning("Profils", "Le nom du profil ne peut pas être vide.") + return + + description = simpledialog.askstring( + "Nouveau profil", + "Description du profil (optionnelle) :", + initialvalue=str(base_spec.get("description") or ""), + parent=self.root, + ) + if description is None: + description = str(base_spec.get("description") or "") + + profile_key = self._profile_key_from_label(label) + profile_spec = self._build_live_profile_spec( + label=label, + description=description.strip(), + base_spec=base_spec, + ) + set_default = messagebox.askyesno( + "Nouveau profil", + "Définir ce nouveau profil comme profil par défaut ?", + parent=self.root, + ) + save_runtime_profile( + profile_key, + profile_spec, + Path(self.profiles_path.get()), + set_default=set_default, + ) + self._refresh_processing_profiles(preferred_key=profile_key) + messagebox.showinfo( + 
"Profils", + f"Profil enregistré : {label}", + parent=self.root, + ) + + def _save_selected_processing_profile(self): + profile_key = self._selected_processing_profile_key() + if not profile_key: + messagebox.showwarning( + "Profils", + "Aucun profil sélectionné. Créez d'abord un nouveau profil.", + parent=self.root, + ) + return + base_spec = self._selected_processing_profile_spec() + profile_label = str(base_spec.get("label") or profile_key) + if profile_key in {"standard_local", "chcb_strict", "partage_recherche", "dossier_audit", "demo"}: + confirmed = messagebox.askyesno( + "Profils", + "Vous allez enregistrer une surcharge locale sur un profil fourni par défaut.\n\n" + f"Continuer pour « {profile_label} » ?", + parent=self.root, + ) + if not confirmed: + return + profile_spec = self._build_live_profile_spec(base_spec=base_spec) + save_runtime_profile( + profile_key, + profile_spec, + Path(self.profiles_path.get()), + ) + self._refresh_processing_profiles(preferred_key=profile_key) + messagebox.showinfo( + "Profils", + f"Profil mis à jour : {profile_label}", + parent=self.root, + ) + + def _refresh_processing_profiles(self, preferred_key: Optional[str] = None): + ensure_runtime_profiles_config(Path(self.profiles_path.get())) + current_key = preferred_key or self._selected_processing_profile_key() + profiles = list_effective_profiles(Path(self.profiles_path.get())) + self._processing_profiles = profiles + self._processing_profile_labels_to_keys = { + spec.get("label") or key: key + for key, spec in profiles.items() + } + labels = list(self._processing_profile_labels_to_keys.keys()) + self._profile_combo.configure(values=labels) + selected_key = current_key + if not selected_key or selected_key not in profiles: + selected_key = get_default_profile_key(Path(self.profiles_path.get())) + selected_label = next( + ( + label + for label, key in self._processing_profile_labels_to_keys.items() + if key == selected_key + ), + labels[0] if labels else "", + ) + if 
selected_label: + self.processing_profile_label_var.set(selected_label) + self._apply_selected_processing_profile() + + def _apply_selected_processing_profile(self): + spec = self._selected_processing_profile_spec() + if not spec: + self._profile_base_description = "" + self.profile_description_var.set("") + self._profile_description.configure(text="") + return + + self._profile_base_description = str(spec.get("description") or "") + self.profile_description_var.set(self._profile_base_description) + self.profile_require_manual_mask_var.set(bool(spec.get("require_manual_mask"))) + self.profile_force_disable_vlm_var.set(bool(spec.get("force_disable_vlm"))) + if spec.get("has_param_lists"): + self._apply_param_lists_to_widgets(spec.get("param_lists") or {}) + else: + self._load_params() + self._select_manual_mask_template_from_setting( + spec.get("preferred_manual_mask_template") or "" + ) + self._on_profile_editor_change() + self._refresh_profile_kind_label() + self._refresh_profile_description() + self._refresh_manual_mask_hint() + self._refresh_profile_capture_summary() + + def _apply_processing_profile_gui_state(self): + force_disable_vlm = bool(self.profile_force_disable_vlm_var.get()) + if not hasattr(self, "_vlm_check"): + return + if force_disable_vlm: + self.use_vlm.set(False) + self._vlm_available = False + self._vlm_check.configure(state=tk.DISABLED) + if hasattr(self, "_vlm_status_lbl"): + self._vlm_status_lbl.configure(text="Désactivé par profil", fg=CLR_TEXT_SECONDARY) + else: + self._vlm_check.configure(state=tk.NORMAL) + if hasattr(self, "_vlm_status_lbl") and self._vlm_status_lbl.cget("text") == "Désactivé par profil": + self._vlm_status_lbl.configure(text="", fg=CLR_TEXT_SECONDARY) + self._refresh_manual_mask_hint() + + def _selected_manual_mask_template_path(self) -> Optional[Path]: + return self._manual_mask_templates.get(self.manual_mask_template_var.get()) + + def _refresh_manual_mask_templates(self): + selected_path = 
self._selected_manual_mask_template_path() + templates = list_mask_templates(_exe_dir()) + options: Dict[str, Optional[Path]] = {MANUAL_MASK_NONE_LABEL: None} + for path in templates: + options[mask_template_label(path, _exe_dir())] = path + self._manual_mask_templates = options + labels = list(options.keys()) + self._manual_mask_combo.configure(values=labels) + if hasattr(self, "_profile_manual_mask_combo"): + self._profile_manual_mask_combo.configure(values=labels) + + selected_label = MANUAL_MASK_NONE_LABEL + if selected_path is not None: + for label, path in options.items(): + if path == selected_path: + selected_label = label + break + self.manual_mask_template_var.set(selected_label) + self._refresh_manual_mask_hint() + self._refresh_profile_capture_summary() + + def _refresh_manual_mask_hint(self): + selected = self._selected_manual_mask_template_path() + manual_mask_required = bool(self.profile_require_manual_mask_var.get()) + if selected is None: + if manual_mask_required: + text = ( + "Le profil sélectionné exige un masque manuel. " + "Choisissez un modèle avant de lancer le traitement." + ) + elif len(self._manual_mask_templates) == 1: + text = ( + "Aucun modèle enregistré. Crée un masque avec l'éditeur PDF, " + "puis clique sur « Actualiser les modèles »." + ) + else: + text = "Aucun masque manuel sélectionné pour ce lancement." + else: + text = ( + f"Masque sélectionné : {selected.name}. " + "Il sera appliqué à tous les PDF du lot avant l'anonymisation automatique." 
+ ) + self._manual_mask_hint.configure(text=text) + self._refresh_profile_capture_summary() + + def _load_manual_mask_template(self, path: Path): + if load_template_yaml is None or Template is None: + raise RuntimeError("bibliothèque de templates PDF indisponible") + if path.suffix.lower() in (".yml", ".yaml"): + return load_template_yaml(path) + return Template.from_dict(json.loads(path.read_text(encoding="utf-8"))) + + def _open_manual_mask_templates_dir(self): + open_folder(self._manual_mask_templates_dir()) + + def _open_manual_mask_designer(self): + if MaskDesignerApp is None: + messagebox.showerror( + "Masques PDF", + "L'éditeur de masques PDF n'a pas pu être chargé.\n" + "Vérifiez que PyMuPDF, Pillow et PyYAML sont disponibles.", + ) + return + + initial_pdf = resolve_manual_mask_pdf(getattr(self, "_single_file", None)) + win = tk.Toplevel(self.root) + if initial_pdf is None: + message = ( + "L'éditeur s'ouvre sans PDF préchargé.\n\n" + "Astuce : choisissez d'abord un fichier PDF dans l'onglet " + "Anonymisation pour l'ouvrir automatiquement ici." 
+ ) + self.status_var.set("Éditeur de masques PDF ouvert.") + messagebox.showinfo("Masques PDF", message) + else: + self.status_var.set(f"Éditeur de masques PDF ouvert pour {initial_pdf.name}.") + + MaskDesignerApp( + win, + initial_pdf=initial_pdf, + templates_dir=self._manual_mask_templates_dir(), + ) + # --------------------------------------------------------------- # Aide # --------------------------------------------------------------- @@ -1316,14 +2325,17 @@ class App: "Un PDF Image (raster) est généré pour chaque fichier :\n" "chaque page devient une image avec les données masquées.\n" "Sécurité maximale, aucun texte résiduel.\n\n" - "Les résultats sont regroupés à plat dans le dossier\n" - "« anonymise/ » à la racine du dossier sélectionné.", + "Les résultats sont écrits dans le dossier\n" + "« anonymise/ » à la racine du dossier sélectionné,\n" + "en conservant l'arborescence des sous-dossiers source.\n\n" + "Le sous-dossier « anonymise/ » est ignoré en entrée\n" + "pour éviter de retraiter d'anciennes sorties.", ) # --------------------------------------------------------------- # Paramètres avancés (whitelist/blacklist) # --------------------------------------------------------------- - def _build_phrase_list(self, parent, title: str, placeholder: str, color_tag: str): + def _build_phrase_list(self, parent, title: str, placeholder: str, color_tag: str, on_change=None): """Construit un widget liste + ajout/suppression pour les phrases.""" frame = tk.Frame(parent, bg=CLR_BG) frame.pack(fill=tk.X, pady=(4, 8)) @@ -1362,6 +2374,8 @@ class App: items = list(listbox.get(0, tk.END)) if text not in items: listbox.insert(tk.END, text) + if on_change: + on_change() entry.delete(0, tk.END) add_btn = tk.Button( @@ -1389,8 +2403,12 @@ class App: # Bouton supprimer def _remove(): sel = listbox.curselection() + removed = False for idx in reversed(sel): listbox.delete(idx) + removed = True + if removed and on_change: + on_change() rm_btn = tk.Button( frame, 
text="Supprimer la sélection", font=self._f_small, @@ -1401,42 +2419,62 @@ class App: return listbox, entry + def _refresh_params_summary(self): + wl_count = self._wl_listbox.size() + bl_count = self._bl_listbox.size() + sw_count = self._sw_listbox.size() + self._params_summary.configure( + text=( + f"Listes visibles chargées : {wl_count} préservations, " + f"{bl_count} masquages forcés, {sw_count} stop-word additionnel." + ) + ) + self._refresh_profile_capture_summary() + def _load_params(self): """Charge les whitelist/blacklist depuis la config YAML.""" try: cfg_path = Path(self.cfg_path.get()) - if cfg_path.exists() and yaml is not None: - data = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) or {} - # Whitelist - wl = data.get("whitelist_phrases", []) + if cfg_path.exists(): + param_lists = load_effective_param_lists(cfg_path) self._wl_listbox.delete(0, tk.END) - for phrase in wl: - if phrase and phrase.strip(): - self._wl_listbox.insert(tk.END, phrase.strip()) - # Blacklist - bl = data.get("blacklist", {}).get("force_mask_terms", []) + for phrase in param_lists["whitelist_phrases"]: + self._wl_listbox.insert(tk.END, phrase) self._bl_listbox.delete(0, tk.END) - for term in bl: - if term and str(term).strip(): - self._bl_listbox.insert(tk.END, str(term).strip()) - # Stop-words additionnels - sw = data.get("additional_stopwords", []) + for term in param_lists["blacklist_force_mask_terms"]: + self._bl_listbox.insert(tk.END, term) self._sw_listbox.delete(0, tk.END) - for term in sw: - if term and str(term).strip(): - self._sw_listbox.insert(tk.END, str(term).strip()) + for term in param_lists["additional_stopwords"]: + self._sw_listbox.insert(tk.END, term) + self._refresh_params_summary() except Exception: pass - def _export_params(self): - """Exporte les paramètres whitelist/blacklist dans un fichier JSON pour envoi par email.""" + def _listbox_values(self, listbox: tk.Listbox) -> List[str]: + return list(listbox.get(0, tk.END)) + + def 
_copy_param_listboxes( + self, + source_wl: tk.Listbox, + source_bl: tk.Listbox, + source_sw: tk.Listbox, + target_wl: tk.Listbox, + target_bl: tk.Listbox, + target_sw: tk.Listbox, + ): + self._set_listbox_values(target_wl, self._listbox_values(source_wl)) + self._set_listbox_values(target_bl, self._listbox_values(source_bl)) + self._set_listbox_values(target_sw, self._listbox_values(source_sw)) + + def _export_param_listboxes(self, wl_listbox: tk.Listbox, bl_listbox: tk.Listbox, sw_listbox: tk.Listbox): + """Exporte les paramètres visibles dans un fichier JSON pour envoi ou sauvegarde locale.""" try: import json as _json from datetime import datetime - wl = list(self._wl_listbox.get(0, tk.END)) - bl = list(self._bl_listbox.get(0, tk.END)) - sw = list(self._sw_listbox.get(0, tk.END)) + wl = self._listbox_values(wl_listbox) + bl = self._listbox_values(bl_listbox) + sw = self._listbox_values(sw_listbox) export_data = { "version": APP_VERSION, @@ -1479,7 +2517,10 @@ class App: except Exception as e: messagebox.showerror("Erreur", f"Erreur à l'export :\n{e}") - def _import_params(self): + def _export_params(self): + self._export_param_listboxes(self._wl_listbox, self._bl_listbox, self._sw_listbox) + + def _import_param_listboxes(self, wl_listbox: tk.Listbox, bl_listbox: tk.Listbox, sw_listbox: tk.Listbox): """Importe des paramètres depuis un fichier JSON (fusionne avec l'existant).""" try: import json as _json @@ -1495,29 +2536,29 @@ class App: # Fusionner whitelist new_wl = data.get("whitelist_phrases", []) - existing_wl = set(self._wl_listbox.get(0, tk.END)) + existing_wl = set(wl_listbox.get(0, tk.END)) added_wl = 0 for phrase in new_wl: if phrase and phrase.strip() and phrase.strip() not in existing_wl: - self._wl_listbox.insert(tk.END, phrase.strip()) + wl_listbox.insert(tk.END, phrase.strip()) added_wl += 1 # Fusionner blacklist new_bl = data.get("blacklist_force_mask_terms", []) - existing_bl = set(self._bl_listbox.get(0, tk.END)) + existing_bl = 
set(bl_listbox.get(0, tk.END)) added_bl = 0 for term in new_bl: if term and str(term).strip() and str(term).strip() not in existing_bl: - self._bl_listbox.insert(tk.END, str(term).strip()) + bl_listbox.insert(tk.END, str(term).strip()) added_bl += 1 # Fusionner stop-words additionnels new_sw = data.get("additional_stopwords", []) - existing_sw = set(self._sw_listbox.get(0, tk.END)) + existing_sw = set(sw_listbox.get(0, tk.END)) added_sw = 0 for term in new_sw: if term and str(term).strip() and str(term).strip() not in existing_sw: - self._sw_listbox.insert(tk.END, str(term).strip()) + sw_listbox.insert(tk.END, str(term).strip()) added_sw += 1 version = data.get("version", "?") @@ -1533,8 +2574,12 @@ class App: except Exception as e: messagebox.showerror("Erreur", f"Erreur à l'import :\n{e}") - def _save_params(self): - """Sauvegarde les whitelist/blacklist dans la config YAML.""" + def _import_params(self): + self._import_param_listboxes(self._wl_listbox, self._bl_listbox, self._sw_listbox) + self._refresh_params_summary() + + def _save_param_listboxes(self, wl_listbox: tk.Listbox, bl_listbox: tk.Listbox, sw_listbox: tk.Listbox): + """Sauvegarde les listes visibles dans la config YAML générale.""" try: cfg_path = Path(self.cfg_path.get()) if not cfg_path.exists() or yaml is None: @@ -1544,15 +2589,15 @@ class App: data = yaml.safe_load(cfg_path.read_text(encoding="utf-8")) or {} # Whitelist phrases - data["whitelist_phrases"] = list(self._wl_listbox.get(0, tk.END)) + data["whitelist_phrases"] = self._listbox_values(wl_listbox) # Blacklist terms if "blacklist" not in data: data["blacklist"] = {} - data["blacklist"]["force_mask_terms"] = list(self._bl_listbox.get(0, tk.END)) + data["blacklist"]["force_mask_terms"] = self._listbox_values(bl_listbox) # Stop-words additionnels (mots à ne jamais identifier comme noms) - data["additional_stopwords"] = list(self._sw_listbox.get(0, tk.END)) + data["additional_stopwords"] = self._listbox_values(sw_listbox) 
cfg_path.write_text( yaml.dump(data, allow_unicode=True, default_flow_style=False, sort_keys=False), @@ -1562,6 +2607,10 @@ class App: except Exception as e: messagebox.showerror("Erreur", f"Impossible de sauvegarder :\n{e}") + def _save_params(self): + self._save_param_listboxes(self._wl_listbox, self._bl_listbox, self._sw_listbox) + self._refresh_params_summary() + # --------------------------------------------------------------- # YAML (interne) # --------------------------------------------------------------- @@ -1572,13 +2621,9 @@ class App: p.write_text(RUNTIME_CFG_TEXT, encoding="utf-8") def _load_cfg(self): - if yaml is None: - return self._ensure_cfg_exists() try: - self.cfg_data = yaml.safe_load( - Path(self.cfg_path.get()).read_text(encoding="utf-8") - ) or {} + self.cfg_data = load_effective_dictionaries_dict(Path(self.cfg_path.get())) except Exception: pass @@ -1614,7 +2659,7 @@ class App: "chcb": re.compile(r"\bCHCB\b", re.IGNORECASE), } - for txt_file in output_dir.glob("*.pseudonymise.txt"): + for txt_file in iter_pseudonymized_texts(output_dir): try: with open(txt_file, 'r', encoding='utf-8') as f: content = f.read() @@ -1674,36 +2719,59 @@ class App: # --------------------------------------------------------------- def _auto_load_ner(self): """Charge le modèle NER par défaut en arrière-plan. - Priorité : EDS-Pseudo (meilleur sur données cliniques) → DistilCamemBERT-NER (fallback). + Priorité : EDS-Pseudo → CamemBERT-bio local → DistilCamemBERT-NER legacy. 
""" - if not self._eds_manager and not self._onnx_manager: + if not self._eds_manager and not self._camembert_manager and not self._onnx_manager: return self.status_var.set("Chargement du modèle NER...") threading.Thread(target=self._auto_load_ner_worker, daemon=True).start() def _auto_load_ner_worker(self): + camembert_loaded = False + # 1) Essayer EDS-Pseudo en priorité (F1=97.4% sur données cliniques) if self._eds_manager: try: self._eds_manager.load("AP-HP/eds-pseudo-public") self._active_manager = self._eds_manager self.use_hf = True - self.status_var.set("Prêt — EDS-Pseudo actif.") + if self._camembert_manager: + try: + self._camembert_manager.load() + camembert_loaded = True + except Exception as cam_err: + import logging + logging.getLogger(__name__).info("CamemBERT-bio local indisponible : %s", cam_err) + suffix = " + CamemBERT-bio local" if camembert_loaded else "" + self.status_var.set(f"Prêt — EDS-Pseudo actif{suffix}.") return except Exception as e: import logging logging.getLogger(__name__).info("EDS-Pseudo indisponible, fallback ONNX : %s", e) - # 2) Fallback : DistilCamemBERT-NER ONNX + # 2) Fallback local embarqué : CamemBERT-bio ONNX. + # Il est utilisé par le core comme signal NER-first séparé, pas comme + # ner_manager HuggingFace legacy. + if self._camembert_manager: + try: + self._camembert_manager.load() + self.use_hf = False + self.status_var.set("Prêt — CamemBERT-bio local actif.") + return + except Exception as cam_err: + import logging + logging.getLogger(__name__).info("CamemBERT-bio local indisponible : %s", cam_err) + + # 3) Fallback legacy : DistilCamemBERT-NER via optimum.onnxruntime. 
if self._onnx_manager: try: self._onnx_manager.load("cmarkea/distilcamembert-base-ner") self._active_manager = self._onnx_manager self.use_hf = True - self.status_var.set("Prêt — NER ONNX actif.") + self.status_var.set("Prêt — NER ONNX legacy actif.") return except Exception as e2: - self.status_var.set(f"Prêt (NER indisponible : {e2})") + self.status_var.set(f"Prêt (NER legacy indisponible : {e2})") return self.status_var.set("Prêt (aucun backend NER disponible).") @@ -1770,6 +2838,8 @@ class App: self._onnx_manager.unload() if self._eds_manager: self._eds_manager.unload() + if self._camembert_manager: + self._camembert_manager.unload() self._active_manager = None self.use_hf = False diff --git a/anonymisation_onefile.spec b/anonymisation_onefile.spec index edfa6a5..c903e4f 100644 --- a/anonymisation_onefile.spec +++ b/anonymisation_onefile.spec @@ -1,90 +1,128 @@ import os -block_cipher = None -app_dir = 'C:\\Users\\dom\\ai\\anonymisation' +from pathlib import Path -datas = [ - (os.path.join(app_dir, 'config'), 'config'), - (os.path.join(app_dir, 'data', 'bdpm'), os.path.join('data', 'bdpm')), - (os.path.join(app_dir, 'data', 'finess'), os.path.join('data', 'finess')), - (os.path.join(app_dir, 'data', 'insee'), os.path.join('data', 'insee')), - (os.path.join(app_dir, 'models', 'camembert-bio-deid', 'onnx'), os.path.join('models', 'camembert-bio-deid', 'onnx')), - (os.path.join(app_dir, 'detectors'), 'detectors'), - (os.path.join(app_dir, 'scripts'), 'scripts'), - # Assets UI : logo (header + splash), icônes fenêtre, splash image. - # Le launcher et la GUI y accèdent via _asset(name) qui résout sous - # sys._MEIPASS/assets en mode frozen. - (os.path.join(app_dir, 'assets'), 'assets'), -] -# Fichiers directs dans data/ — IMPÉRATIF pour fonctionnement correct du core. -# Sans eux : stop-words/villes/DPI labels/companion blacklist sont des sets vides, -# ce qui dégrade la qualité d'anonymisation et peut masquer/laisser passer des faux-positifs. 
-for data_file in [ - 'stopwords_manuels.txt', - 'villes_blacklist.txt', - 'dpi_labels_blacklist.txt', - 'companion_blacklist.txt', + +block_cipher = None + +project_dir = Path(globals().get("SPECPATH", os.getcwd())).resolve() + + +def _data_entry(relative_path: str, target_dir: str | None = None): + src = project_dir / relative_path + if not src.exists(): + return None + return (str(src), target_dir or relative_path) + + +datas = [] +for relative_path, target_dir in [ + ("config", "config"), + ("data/bdpm", "data/bdpm"), + ("data/finess", "data/finess"), + ("data/insee", "data/insee"), + ("models/camembert-bio-deid/onnx", "models/camembert-bio-deid/onnx"), + ("detectors", "detectors"), + ("scripts", "scripts"), + ("assets", "assets"), ]: - src = os.path.join(app_dir, 'data', data_file) - if os.path.exists(src): - datas.append((src, 'data')) -for pyfile in ['anonymizer_core_refactored_onnx.py', 'eds_pseudo_manager.py', - 'gliner_manager.py', 'camembert_ner_manager.py', - 'Pseudonymisation_Gui_V5.py', 'build_info.py']: - datas.append((os.path.join(app_dir, pyfile), '.')) + entry = _data_entry(relative_path, target_dir) + if entry is not None: + datas.append(entry) + +# Fichiers directs sous data/ requis par le core. 
+for relative_path in [ + "data/stopwords_manuels.txt", + "data/villes_blacklist.txt", + "data/dpi_labels_blacklist.txt", + "data/companion_blacklist.txt", +]: + entry = _data_entry(relative_path, "data") + if entry is not None: + datas.append(entry) + + +hiddenimports = [ + "Pseudonymisation_Gui_V5", + "anonymizer_core_refactored_onnx", + "admin_rules", + "config_defaults", + "profile_defaults", + "gui_batch_paths", + "manual_masking", + "pdf_mask_designer", + "format_converter", + "ner_manager_onnx", + "camembert_ner_manager", + "eds_pseudo_manager", + "gliner_manager", + "vlm_manager", + "build_info", + "doctr", + "doctr.io", + "doctr.models", + "doctr.models.detection", + "doctr.models.recognition", + "cv2", + "torchvision", + "edsnlp", + "edsnlp.pipes", + "edsnlp.pipes.ner", + "edsnlp.pipes.ner.pseudo", + "spacy", + "spacy.lang.fr", + "gliner", + "onnxruntime", + "transformers", + "tokenizers", + "torch", + "pdfplumber", + "fitz", + "PIL", + "yaml", + "loguru", + "regex", + "optimum", + "optimum.onnxruntime", + "optimum.pipelines", + "optimum.modeling_base", + "optimum.exporters.onnx", +] + a = Analysis( - [os.path.join(app_dir, 'launcher.py')], - pathex=[app_dir], + [str(project_dir / "launcher.py")], + pathex=[str(project_dir)], datas=datas, - hiddenimports=[ - 'anonymizer_core_refactored_onnx', 'eds_pseudo_manager', - 'gliner_manager', 'camembert_ner_manager', 'Pseudonymisation_Gui_V5', - 'edsnlp', 'edsnlp.pipes', 'edsnlp.pipes.ner', 'edsnlp.pipes.ner.pseudo', - 'spacy', 'spacy.lang.fr', 'gliner', 'onnxruntime', - 'transformers', 'tokenizers', 'torch', 'pdfplumber', - 'ahocorasick', 'sklearn', 'scipy', 'pydantic', 'yaml', 'PIL', - 'loguru', 'regex', - # optimum : utilisé par ner_manager_onnx.py (fallback NER legacy). - # Sans ça, la GUI affiche "NER indisponible : optimum.onnxruntime introuvable" - # si EDS-Pseudo échoue. 
Le pipeline principal (CamemBERT-bio ONNX + - # EDS-Pseudo + GLiNER) n'en dépend pas — mais l'absence du hiddenimport - # crée un message d'erreur cosmétique gênant. - 'optimum', 'optimum.onnxruntime', 'optimum.pipelines', - 'optimum.modeling_base', 'optimum.exporters.onnx', - ], + hiddenimports=hiddenimports, cipher=block_cipher, noarchive=False, ) pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher) -# Splash natif PyInstaller : image affichée AU LANCEMENT DE L'EXE, -# avant même que Python démarre. Couvre les ~15-30 s de décompression -# du bundle --onefile dans %TEMP% qui laissaient l'écran vide auparavant. -# Le launcher ferme le splash via pyi_splash.close() une fois la GUI prête. splash = Splash( - os.path.join(app_dir, 'assets', 'splash.png'), + str(project_dir / "assets" / "splash.png"), binaries=a.binaries, datas=a.datas, - # Texte dynamique PyInstaller positionné dans la zone libre du PNG - # (y=170-235). text_pos correspond au coin haut-gauche du texte. text_pos=(60, 195), text_size=10, - text_color='white', + text_color="white", minify_script=True, always_on_top=False, ) exe = EXE( - pyz, a.scripts, - splash, # image affichée immédiatement - splash.binaries, # bootloader splash - a.binaries, a.zipfiles, a.datas, [], - name='Anonymisation', + pyz, + a.scripts, + splash, + splash.binaries, + a.binaries, + a.zipfiles, + a.datas, + [], + name="Anonymisation", debug=False, strip=False, upx=False, console=False, - # Icône du fichier .exe visible dans l'Explorateur Windows et la taskbar - # (dérivée du logo aivanonym, multi-résolution 16→256 dans le .ico). 
- icon=os.path.join(app_dir, 'assets', 'icons', 'app.ico'), + icon=str(project_dir / "assets" / "icons" / "app.ico"), ) diff --git a/config_defaults.py b/config_defaults.py index 2762c95..58877ca 100644 --- a/config_defaults.py +++ b/config_defaults.py @@ -153,6 +153,29 @@ def load_effective_dictionaries_dict(path: Path | None = None) -> Dict[str, Any] ) +def _normalize_string_list(values: Any) -> list[str]: + if not isinstance(values, list): + return [] + normalized: list[str] = [] + for value in values: + text = str(value).strip() + if text: + normalized.append(text) + return normalized + + +def load_effective_param_lists(path: Path | None = None) -> Dict[str, list[str]]: + """Return the effective parameter lists shown in the GUI.""" + data = load_effective_dictionaries_dict(path) + return { + "whitelist_phrases": _normalize_string_list(data.get("whitelist_phrases", [])), + "blacklist_force_mask_terms": _normalize_string_list( + data.get("blacklist", {}).get("force_mask_terms", []) + ), + "additional_stopwords": _normalize_string_list(data.get("additional_stopwords", [])), + } + + def deep_merge_dict(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]: merged = deepcopy(base) for key, value in (override or {}).items(): diff --git a/launcher.py b/launcher.py index 13280e3..d9c7c62 100644 --- a/launcher.py +++ b/launcher.py @@ -8,6 +8,8 @@ from tkinter import ttk, messagebox from pathlib import Path import threading import logging +import contextlib +import time # pyi_splash : module injecté par PyInstaller quand --splash est utilisé. # Permet d'actualiser / fermer le splash natif affiché au démarrage de l'exe @@ -38,6 +40,216 @@ def _splash_close() -> None: except Exception: pass + +class BrandedSplash: + """Splash applicatif avec le visuel existant + progression détaillée. + + PyInstaller affiche d'abord le splash natif pendant l'extraction du onefile. 
+ Dès que Python est démarré, cette fenêtre prend le relais pour montrer des + étapes lisibles et un petit journal de chargement. + """ + + def __init__(self, total_steps: int = 6): + self.total_steps = max(total_steps, 1) + self.current_step = 0 + self.enabled = False + self.root = None + self.status_var = None + self.progress = None + self.log_box = None + self._image = None + self._lines = [] + + try: + self.root = tk.Tk() + self.root.withdraw() + self.root.title("aivanonym") + self.root.resizable(False, False) + self.root.overrideredirect(True) + self.root.configure(bg="white") + + container = tk.Frame( + self.root, + bg="white", + highlightthickness=1, + highlightbackground="#d8d8d8", + ) + container.pack(fill="both", expand=True) + + splash_path = APP_DIR / "assets" / "splash.png" + if splash_path.exists(): + self._image = tk.PhotoImage(file=str(splash_path)) + tk.Label(container, image=self._image, bg="white", bd=0).pack() + else: + fallback = tk.Frame(container, bg="white", width=500, height=170) + fallback.pack_propagate(False) + fallback.pack() + tk.Frame(fallback, bg="#cc0000", height=4).pack(fill="x") + tk.Label( + fallback, + text="aivanonym", + bg="white", + fg="#222222", + font=("Segoe UI", 28), + ).pack(expand=True) + + body = tk.Frame(container, bg="white", padx=24, pady=14) + body.pack(fill="x") + + self.status_var = tk.StringVar(value="Initialisation...") + tk.Label( + body, + textvariable=self.status_var, + bg="white", + fg="#222222", + font=("Segoe UI", 10, "bold"), + anchor="w", + ).pack(fill="x") + + self.progress = ttk.Progressbar( + body, + mode="determinate", + maximum=self.total_steps, + length=452, + ) + self.progress.pack(fill="x", pady=(8, 10)) + + tk.Label( + body, + text="Chargements en cours", + bg="white", + fg="#666666", + font=("Segoe UI", 8), + anchor="w", + ).pack(fill="x") + self.log_box = tk.Listbox( + body, + height=5, + activestyle="none", + bg="#f7f7f7", + fg="#333333", + bd=0, + highlightthickness=1, + 
def step(self, message: str) -> None:
    """Advance the splash by one step and show *message* as the status.

    The step counter is clamped to ``total_steps``. The status text is
    formatted as ``[current/total] message`` and sent through ``message()``.
    The progress bar is only touched while the splash is enabled, matching
    the guard used by ``message()`` and ``detail()``.

    Args:
        message: Human-readable label for the step that is starting.
    """
    self.current_step = min(self.current_step + 1, self.total_steps)
    status = f"[{self.current_step}/{self.total_steps}] {message}"
    self.message(status)
    if self.enabled and self.progress is not None:
        try:
            self.progress["value"] = self.current_step
        except Exception:
            # The widget is no longer usable (e.g. the window was destroyed).
            # The splash is best-effort: disable it rather than let a UI
            # error escape into the caller's startup sequence.
            self.enabled = False
            return
        self._pump()
class ModelProgressStream:
    """File-like sink that forwards tqdm-style console output to a UI callback.

    Written text is split into lines, with carriage returns treated as line
    breaks. Each line has its whitespace collapsed and is passed to
    ``callback`` as ``"<prefix> : <line>"``. Lines shorter than three
    characters are dropped, and a line identical to the previous one is
    suppressed if it arrives within one second.

    The object is meant to stand in for ``sys.stdout`` / ``sys.stderr``, so it
    also exposes the small part of the text-stream interface that callers
    commonly probe (``isatty``, ``writable``, ``readable``, ``encoding``).
    """

    # Attributes that code writing to a text stream may read.
    encoding = "utf-8"
    errors = "replace"

    def __init__(self, callback, prefix: str):
        """Store the target callback and the label prepended to every line."""
        self.callback = callback
        self.prefix = prefix
        self.buffer = ""       # text received after the last line break
        self.last_line = ""    # last line actually forwarded
        self.last_emit = 0.0   # time.monotonic() of the last forwarded line

    def write(self, data) -> int:
        """Buffer *data* and forward every completed line.

        Returns:
            The number of characters of ``str(data)``.
        """
        text = str(data)
        # tqdm redraws its bar with "\r"; treat it as a line break so each
        # refresh becomes a separate line.
        self.buffer += text.replace("\r", "\n")
        while "\n" in self.buffer:
            line, self.buffer = self.buffer.split("\n", 1)
            self._emit(line)
        return len(text)

    def flush(self) -> None:
        """Forward whatever is left in the buffer as a final line."""
        if self.buffer:
            self._emit(self.buffer)
            self.buffer = ""

    def isatty(self) -> bool:
        """Report that this stream is not an interactive terminal."""
        return False

    def writable(self) -> bool:
        """Report that the stream accepts ``write`` calls."""
        return True

    def readable(self) -> bool:
        """Report that the stream cannot be read from."""
        return False

    def _emit(self, line: str) -> None:
        """Normalize *line* and pass it to the callback unless filtered out."""
        clean = " ".join(line.split())
        if len(clean) < 3:
            return
        now = time.monotonic()
        # Skip rapid repeats of the same line.
        if clean == self.last_line and now - self.last_emit < 1.0:
            return
        self.last_line = clean
        self.last_emit = now
        self.callback(f"{self.prefix} : {clean}")
On intercepte les log.info() - du core via un logging.Handler et on pousse chaque étape traduite dans - le splash natif via pyi_splash.update_text(). L'utilisateur voit défiler - sous le logo : - "Chargement des prénoms français (INSEE)…" - "Chargement des noms de famille (INSEE)…" - "Chargement des numéros FINESS…" - … - Puis le splash se ferme et la GUI s'ouvre — pas de fenêtre intermédiaire. - - En mode dev (pas frozen), pyi_splash n'existe pas ; on ajoute un - mini-splash tkinter temporaire pour voir le même rendu pendant le test. - """ + """Launch the main GUI with visible startup progress.""" log.info("Launching GUI...") + progress = BrandedSplash(total_steps=5) + progress.step("Préparation de l'environnement") # Traductions log.info() → libellés "prod" lisibles pour l'utilisateur. _LOG_TRANSLATIONS = [ @@ -158,7 +357,7 @@ def launch_gui(): class _SplashHandler(logging.Handler): def emit(self, record): try: - _splash_update(_translate(record.getMessage())) + progress.detail(_translate(record.getMessage())) except Exception: pass @@ -167,17 +366,24 @@ def launch_gui(): logging.getLogger().addHandler(_handler) # Afficher tout de suite un message initial sous le logo - _splash_update("Démarrage…") + progress.detail("Démarrage du moteur applicatif") # Import du core et de la GUI (synchrone : pas besoin de thread puisque # le splash natif tourne dans son propre processus bootloader). 
result = {"error": None} try: - _splash_update("Chargement des dictionnaires médicaux…") + progress.step("Chargement des dictionnaires médicaux") import anonymizer_core_refactored_onnx # noqa log.info("Core imported OK") + progress.step("Chargement du moteur d'anonymisation") import Pseudonymisation_Gui_V5 # noqa log.info("GUI module imported OK") + progress.step("Vérification des modèles locaux") + if check_models_ready(): + progress.detail("CamemBERT-bio ONNX local disponible") + else: + progress.detail("CamemBERT-bio ONNX non trouvé dans le bundle") + progress.step("Ouverture de l'interface") except Exception as e: result["error"] = f"{e}\n{traceback.format_exc()}" log.error(f"Import error: {result['error']}") @@ -188,8 +394,8 @@ def launch_gui(): except Exception: pass - # Fermer le splash natif maintenant que tout est prêt - _splash_close() + # Fermer le splash maintenant que tout est prêt + progress.close() if result["error"]: try: @@ -239,12 +445,19 @@ class SetupWindow: def __init__(self): self.root = tk.Tk() self.root.title("Anonymisation — Configuration initiale") - self.root.geometry("620x450") + self.root.geometry("660x700") self.root.resizable(False, False) + self._logo_image = None + self._log_lines = [] - frame = ttk.Frame(self.root, padding=20) + frame = ttk.Frame(self.root, padding=18) frame.pack(fill="both", expand=True) + splash_path = APP_DIR / "assets" / "splash.png" + if splash_path.exists(): + self._logo_image = tk.PhotoImage(file=str(splash_path)) + ttk.Label(frame, image=self._logo_image).pack(pady=(0, 8)) + ttk.Label(frame, text="Préparation des modèles d'intelligence artificielle", font=("", 13, "bold")).pack(pady=(0, 4)) ttk.Label( @@ -278,6 +491,22 @@ class SetupWindow: font=("", 8)).pack(side="left") self.step_labels[key] = icon + log_frame = ttk.LabelFrame(frame, text=" Détail du chargement ", padding=8) + log_frame.pack(fill="x", pady=(0, 12)) + self.log_text = tk.Text( + log_frame, + height=7, + wrap="word", + state="disabled", + 
bg="#f7f7f7", + fg="#333333", + bd=0, + padx=8, + pady=6, + font=("Consolas", 8), + ) + self.log_text.pack(fill="x") + # Bouton relance (caché au début) self.btn = ttk.Button(frame, text="Relancer", command=self.start_download) self.btn.pack(pady=6) @@ -321,43 +550,54 @@ class SetupWindow: try: # 1. EDS-Pseudo self._update("Téléchargement d'EDS-Pseudo… (modèle CamemBERT clinique)") + self._append_log("EDS-Pseudo : téléchargement/chargement du modèle AP-HP") self._set_step("eds_pseudo", "running") log.info("Downloading EDS-Pseudo...") try: from eds_pseudo_manager import EdsPseudoManager mgr = EdsPseudoManager() - mgr.load() + with self._capture_model_output("EDS-Pseudo"): + mgr.load() self._set_step("eds_pseudo", "ok") + self._append_log("EDS-Pseudo : modèle prêt") log.info("EDS-Pseudo OK") except Exception as e: self._set_step("eds_pseudo", "fail") + self._append_log(f"EDS-Pseudo : échec - {e}") failures.append(("EDS-Pseudo", str(e))) log.warning(f"EDS-Pseudo failed: {e}") self._advance() # 2. GLiNER self._update("Téléchargement de GLiNER… (détection zero-shot)") + self._append_log("GLiNER : téléchargement/chargement du modèle PII") self._set_step("gliner", "running") log.info("Downloading GLiNER...") try: from gliner_manager import GlinerManager mgr = GlinerManager() - mgr.load() + with self._capture_model_output("GLiNER"): + mgr.load() self._set_step("gliner", "ok") + self._append_log("GLiNER : modèle prêt") log.info("GLiNER OK") except Exception as e: self._set_step("gliner", "fail") + self._append_log(f"GLiNER : échec - {e}") failures.append(("GLiNER", str(e))) log.warning(f"GLiNER failed: {e}") self._advance() # 3. 
@contextlib.contextmanager
def _capture_model_output(self, label):
    """Route stdout/stderr to the log panel while the ``with`` body runs.

    Both streams are redirected to a ``ModelProgressStream`` that forwards
    each line to ``self._append_log`` prefixed with *label*.

    Args:
        label: Prefix shown in front of every captured line.

    Yields:
        Nothing; the redirection lasts for the duration of the ``with`` body.
    """
    stream = ModelProgressStream(self._append_log, label)
    try:
        with contextlib.redirect_stdout(stream), contextlib.redirect_stderr(stream):
            yield
    finally:
        # Flush even when the body raises: the last partial line is often
        # the one that explains the failure.
        stream.flush()
def open_pdf_path(self, path: Path):
    """Open the PDF at *path* and reset the editor state for it.

    On success the document becomes the current one, the view returns to the
    first page, existing masks are cleared, the template name is derived from
    the file stem and the canvas is refreshed. Any failure is reported in an
    error dialog instead of being raised.

    Args:
        path: Location of the PDF to open.
    """
    # Remember the document being replaced so its handle can be released.
    previous_doc = getattr(self, "doc", None)
    try:
        self.doc = fitz.open(str(path))
        self.doc_path = Path(path)
        self.curr_page = 0
        self.masks.clear()
        self.template_name.set(self.doc_path.stem + "_template")
        self.refresh()
        self.status.set(f"PDF ouvert : {self.doc_path.name} — {len(self.doc)} page(s)")
    except Exception as e:
        messagebox.showerror("Erreur", f"Impossible d'ouvrir le PDF : {e}")

    # Close the old document only if it was actually replaced. When
    # fitz.open() itself failed, self.doc still points at the old document,
    # which must stay usable.
    if previous_doc is not None and previous_doc is not self.doc:
        try:
            previous_doc.close()
        except Exception:
            pass  # best-effort cleanup; the new document is already active
def on_down(self, ev):
    """Begin a rubber-band rectangle at the pointer position.

    Does nothing when no document is loaded. Otherwise the event position is
    converted to canvas coordinates, stored as the drag origin, and a
    zero-size preview rectangle is created there.

    Args:
        ev: Event object exposing the pointer position as ``x`` and ``y``.
    """
    if self.doc:
        self.is_drawing = True
        origin_x = self.canvas.canvasx(ev.x)
        origin_y = self.canvas.canvasy(ev.y)
        self.start_xy = (origin_x, origin_y)
        corners = (origin_x, origin_y, origin_x, origin_y)
        self._preview_rect = self.canvas.create_rectangle(
            *corners, outline="#000", width=2
        )
def _template_initialdir(self) -> Path:
    """Return the directory the template file dialogs should start in.

    Candidates, in order: the configured templates directory (created if
    missing), the directory of the currently opened PDF, and the current
    working directory.

    Returns:
        The first usable candidate directory.
    """
    if self.templates_dir is not None:
        try:
            self.templates_dir.mkdir(parents=True, exist_ok=True)
        except OSError:
            # The directory cannot be created (read-only location, a file in
            # the way, ...). Fall back to the next candidate instead of
            # aborting the dialog.
            pass
        else:
            return self.templates_dir
    if self.doc_path is not None:
        return self.doc_path.parent
    return Path.cwd()
def build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the standalone mask editor.

    Returns:
        A parser accepting three optional path arguments (``--pdf``,
        ``--template``, ``--templates-dir``) and two optional directory-name
        arguments (``--output-dir-name``, ``--preview-dir-name``) whose
        defaults come from the manual-masking constants.
    """
    # Options parsed as Path objects: (flag, help text).
    path_options = (
        ("--pdf", "PDF de reference a ouvrir au demarrage"),
        ("--template", "Template YAML/JSON a charger au demarrage"),
        ("--templates-dir", "Dossier par defaut pour sauver/charger les templates"),
    )
    # Plain string options with a default: (flag, default, help text).
    dirname_options = (
        (
            "--output-dir-name",
            DEFAULT_MASK_OUTPUT_DIRNAME,
            "Nom du dossier de sortie pour l'application des masques",
        ),
        (
            "--preview-dir-name",
            DEFAULT_MASK_PREVIEW_DIRNAME,
            "Nom du dossier de sortie pour les previsualisations",
        ),
    )

    cli = argparse.ArgumentParser(description="Editeur de masques PDF reutilisables")
    for flag, help_text in path_options:
        cli.add_argument(flag, type=Path, help=help_text)
    for flag, fallback, help_text in dirname_options:
        cli.add_argument(flag, default=fallback, help=help_text)
    return cli
params["blacklist_force_mask_terms"] + assert params["additional_stopwords"] == []