Files
anonymisation/launcher.py

698 lines
25 KiB
Python

#!/usr/bin/env python3
"""Launcher Windows — single-instance + download models on first run + launch GUI."""
import os
import sys
import traceback
import tkinter as tk
from tkinter import ttk, messagebox
from pathlib import Path
import threading
import logging
import contextlib
import time
# pyi_splash: module injected by PyInstaller when the build uses --splash.
# It lets the program update / close the native splash shown at exe start-up
# while the --onefile archive is being unpacked (~15-30 s on Windows). In dev
# mode (not frozen) the module does not exist, so fall back silently.
try:
    import pyi_splash  # type: ignore
except Exception:
    pyi_splash = None
# True only when the import above succeeded.
_HAS_PYI_SPLASH = pyi_splash is not None
def _splash_update(text: str) -> None:
    """Set the caption shown under PyInstaller's native splash, if active.

    Best effort: a no-op when ``pyi_splash`` could not be imported, and any
    error raised by the splash API is ignored.
    """
    if not _HAS_PYI_SPLASH:
        return
    with contextlib.suppress(Exception):
        pyi_splash.update_text(text)
def _splash_close() -> None:
    """Close PyInstaller's native splash, if active.

    Best effort: a no-op when ``pyi_splash`` could not be imported, and any
    error raised by the splash API is ignored.
    """
    if not _HAS_PYI_SPLASH:
        return
    with contextlib.suppress(Exception):
        pyi_splash.close()
class BrandedSplash:
    """Application splash showing the existing artwork plus detailed progress.

    PyInstaller first shows its native splash while the onefile archive is
    extracted. Once Python is running, this window takes over to display
    readable steps and a short loading journal.

    Every method is best effort: when the Tk window cannot be built, or a
    widget update fails later on, the instance turns itself off (``enabled``
    becomes ``False``) and subsequent calls only forward their text to the
    native splash instead of raising.
    """

    def __init__(self, total_steps: int = 6):
        """Build and show the splash window.

        Args:
            total_steps: Number of ``step()`` calls expected; values below 1
                are clamped to 1.
        """
        self.total_steps = max(total_steps, 1)
        self.current_step = 0
        self.enabled = False
        self.root = None
        self.status_var = None
        self.progress = None
        self.log_box = None
        # Reference kept on the instance for as long as the Label uses it.
        self._image = None
        # Most recent journal lines mirrored into ``log_box``.
        self._lines = []
        try:
            self.root = tk.Tk()
            self.root.withdraw()
            self.root.title("aivanonym")
            self.root.resizable(False, False)
            self.root.overrideredirect(True)
            self.root.configure(bg="white")

            container = tk.Frame(
                self.root,
                bg="white",
                highlightthickness=1,
                highlightbackground="#d8d8d8",
            )
            container.pack(fill="both", expand=True)

            splash_path = APP_DIR / "assets" / "splash.png"
            if splash_path.exists():
                self._image = tk.PhotoImage(file=str(splash_path))
                tk.Label(container, image=self._image, bg="white", bd=0).pack()
            else:
                # No artwork available: draw a plain text banner instead.
                fallback = tk.Frame(container, bg="white", width=500, height=170)
                fallback.pack_propagate(False)
                fallback.pack()
                tk.Frame(fallback, bg="#cc0000", height=4).pack(fill="x")
                tk.Label(
                    fallback,
                    text="aivanonym",
                    bg="white",
                    fg="#222222",
                    font=("Segoe UI", 28),
                ).pack(expand=True)

            body = tk.Frame(container, bg="white", padx=24, pady=14)
            body.pack(fill="x")

            self.status_var = tk.StringVar(value="Initialisation...")
            tk.Label(
                body,
                textvariable=self.status_var,
                bg="white",
                fg="#222222",
                font=("Segoe UI", 10, "bold"),
                anchor="w",
            ).pack(fill="x")

            self.progress = ttk.Progressbar(
                body,
                mode="determinate",
                maximum=self.total_steps,
                length=452,
            )
            self.progress.pack(fill="x", pady=(8, 10))

            tk.Label(
                body,
                text="Chargements en cours",
                bg="white",
                fg="#666666",
                font=("Segoe UI", 8),
                anchor="w",
            ).pack(fill="x")

            self.log_box = tk.Listbox(
                body,
                height=5,
                activestyle="none",
                bg="#f7f7f7",
                fg="#333333",
                bd=0,
                highlightthickness=1,
                highlightbackground="#e7e7e7",
                font=("Consolas", 8),
            )
            self.log_box.pack(fill="x", pady=(4, 0))

            self._center()
            self.root.deiconify()
            self.root.lift()
            self.root.update_idletasks()
            self.root.update()
            self.enabled = True
            # The native PyInstaller splash only has one line of text. Once
            # this window is ready it takes over without changing the visual.
            _splash_close()
        except Exception as exc:
            try:
                if self.root is not None:
                    self.root.destroy()
            except Exception:
                pass
            # Drop every widget reference too: they belong to the root that
            # was just destroyed and must not be touched by later calls.
            self._forget_widgets()
            log.warning(f"Branded splash unavailable: {exc}")

    def _forget_widgets(self) -> None:
        """Disable the splash and release all references to Tk objects."""
        self.enabled = False
        self.root = None
        self.status_var = None
        self.progress = None
        self.log_box = None
        self._image = None

    def _center(self) -> None:
        """Place the window in the middle of the screen at its requested size."""
        if self.root is None:
            return
        self.root.update_idletasks()
        width = self.root.winfo_reqwidth()
        height = self.root.winfo_reqheight()
        screen_width = self.root.winfo_screenwidth()
        screen_height = self.root.winfo_screenheight()
        x = max(0, int((screen_width - width) / 2))
        y = max(0, int((screen_height - height) / 2))
        self.root.geometry(f"{width}x{height}+{x}+{y}")

    def step(self, message: str) -> None:
        """Advance the progress bar by one step and show *message* as status.

        The counter is capped at ``total_steps``; the status line is prefixed
        with ``[current/total]``.
        """
        self.current_step = min(self.current_step + 1, self.total_steps)
        status = f"[{self.current_step}/{self.total_steps}] {message}"
        self.message(status)
        # Only touch the bar while the window is known to be alive; a failed
        # or closed splash must not raise into the start-up sequence.
        if not self.enabled or self.progress is None:
            return
        try:
            self.progress["value"] = self.current_step
        except Exception:
            self.enabled = False
            return
        self._pump()

    def message(self, message: str) -> None:
        """Show *message* on the status line (and on the native splash)."""
        _splash_update(message)
        if not self.enabled or self.status_var is None:
            return
        try:
            self.status_var.set(message)
        except Exception:
            self.enabled = False
            return
        self._pump()

    def detail(self, message: str) -> None:
        """Append *message* to the loading journal.

        Whitespace is collapsed, blank messages are ignored, text longer than
        150 characters is truncated with ``...`` and only the 7 most recent
        lines are kept.
        """
        _splash_update(message)
        clean = " ".join(str(message).split())
        if not clean:
            return
        if len(clean) > 150:
            clean = clean[:147] + "..."
        if not self.enabled or self.log_box is None:
            return
        self._lines.append(clean)
        self._lines = self._lines[-7:]
        try:
            self.log_box.delete(0, tk.END)
            for line in self._lines:
                self.log_box.insert(tk.END, line)
            self.log_box.see(tk.END)
        except Exception:
            self.enabled = False
            return
        self._pump()

    def close(self) -> None:
        """Destroy the window; the instance stays usable as a silent no-op."""
        _splash_close()
        if self.root is not None:
            try:
                self.root.destroy()
            except Exception:
                pass
        self._forget_widgets()

    def _pump(self) -> None:
        """Process pending Tk events so the window repaints without a mainloop."""
        if self.root is None:
            return
        try:
            self.root.update_idletasks()
            self.root.update()
        except Exception:
            self.enabled = False
class ModelProgressStream:
    """Write-only, file-like sink forwarding tqdm-style output to a UI callback.

    Meant to stand in for ``sys.stdout`` / ``sys.stderr`` while a model loads.
    Text is split into lines (``\\r`` counts as a line break so progress-bar
    refreshes become separate lines), whitespace is collapsed, lines shorter
    than 3 characters are dropped, and a line identical to the previous one is
    suppressed when it arrives less than one second later.

    Args:
        callback: Called with ``"<prefix> : <line>"`` for every emitted line.
        prefix: Label placed in front of each forwarded line.
    """

    def __init__(self, callback, prefix: str):
        self.callback = callback
        self.prefix = prefix
        self.buffer = ""        # text received after the last line break
        self.last_line = ""     # last line handed to the callback
        self.last_emit = 0.0    # time.monotonic() of that hand-over

    def isatty(self) -> bool:
        """Report that this stream is not a terminal."""
        return False

    def writable(self) -> bool:
        """Report that this stream accepts ``write()`` calls."""
        return True

    def write(self, data) -> int:
        """Buffer *data* and emit every completed line.

        ``bytes`` input is decoded as UTF-8 (invalid sequences replaced)
        rather than being rendered as its ``b'...'`` repr.

        Returns:
            The length of *data* as received.
        """
        if isinstance(data, (bytes, bytearray)):
            text = bytes(data).decode("utf-8", errors="replace")
            written = len(data)
        else:
            text = str(data)
            written = len(text)
        self.buffer += text.replace("\r", "\n")
        pieces = self.buffer.split("\n")
        # The last piece has no terminator yet: keep it for the next write.
        self.buffer = pieces.pop()
        for line in pieces:
            self._emit(line)
        return written

    def flush(self) -> None:
        """Emit whatever is buffered, even without a trailing line break."""
        if self.buffer:
            self._emit(self.buffer)
            self.buffer = ""

    def _emit(self, line: str) -> None:
        """Normalise *line* and forward it unless it is noise or a repeat."""
        clean = " ".join(line.split())
        if len(clean) < 3:
            return
        now = time.monotonic()
        if clean == self.last_line and now - self.last_emit < 1.0:
            return
        self.last_line = clean
        self.last_emit = now
        self.callback(f"{self.prefix} : {clean}")
# ---------------------------------------------------------------------------
# Single-instance guard (lock file in user's temp directory)
# ---------------------------------------------------------------------------
_lock_file = None
_lock_fd = None


def _ensure_single_instance():
    """Try to become the only running instance by locking a temp-dir file.

    The lock is taken without blocking (``msvcrt.locking`` on Windows,
    ``fcntl.flock`` elsewhere) on ``anonymisation_chcb.lock`` in the system
    temp directory. On success the open handle is stored in the module-level
    ``_lock_fd`` so the lock stays held while the process lives.

    Returns:
        ``True`` when this process holds the lock (or already held it), and
        also when an unexpected non-OS error occurs (fail open so the app can
        still start). ``False`` when opening or locking the file raised
        ``OSError``, which is treated as "another instance is running".
    """
    global _lock_file, _lock_fd
    import tempfile

    # Already holding the lock in this process: nothing to do.
    if _lock_fd is not None and not _lock_fd.closed:
        return True

    _lock_file = Path(tempfile.gettempdir()) / "anonymisation_chcb.lock"
    # Work on a local handle and publish it only once the lock is held, so a
    # refused attempt never leaks an open file nor drops a previous handle.
    handle = None
    try:
        handle = open(_lock_file, "w")
        if sys.platform == "win32":
            import msvcrt
            msvcrt.locking(handle.fileno(), msvcrt.LK_NBLCK, 1)
        else:
            import fcntl
            fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        if handle is not None:
            handle.close()
        return False
    except Exception:
        if handle is not None:
            handle.close()
        return True
    _lock_fd = handle
    return True
# ---------------------------------------------------------------------------
# Path resolution for PyInstaller frozen exe
# ---------------------------------------------------------------------------
if getattr(sys, 'frozen', False):
    EXE_DIR = Path(sys.executable).parent
    # _MEIPASS is read defensively: fall back to the exe directory when a
    # frozen build does not define it.
    APP_DIR = Path(getattr(sys, '_MEIPASS', EXE_DIR))
else:
    APP_DIR = Path(__file__).resolve().parent
    EXE_DIR = APP_DIR

# Log file next to the exe
LOG_FILE = EXE_DIR / "anonymisation.log"
_LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s"
try:
    logging.basicConfig(
        filename=str(LOG_FILE),
        level=logging.INFO,
        format=_LOG_FORMAT,
    )
except OSError:
    # The exe directory may be read-only; opening the log file there would
    # otherwise abort the import before any UI exists. Retry in the temp
    # directory and keep LOG_FILE accurate, since error dialogs display it.
    import tempfile

    LOG_FILE = Path(tempfile.gettempdir()) / "anonymisation.log"
    try:
        logging.basicConfig(
            filename=str(LOG_FILE),
            level=logging.INFO,
            format=_LOG_FORMAT,
        )
    except OSError:
        # No writable location at all: run without a log file.
        logging.basicConfig(handlers=[logging.NullHandler()], level=logging.INFO)
log = logging.getLogger("launcher")

# Make embedded modules importable
sys.path.insert(0, str(APP_DIR))
os.chdir(str(APP_DIR))
log.info(f"APP_DIR={APP_DIR}")
log.info(f"EXE_DIR={EXE_DIR}")
log.info(f"frozen={getattr(sys, 'frozen', False)}")

MODELS_DIR = APP_DIR / "models"
def check_models_ready():
    """Tell whether the CamemBERT-bio ONNX model file exists under MODELS_DIR.

    The outcome and the inspected path are written to the launcher log.
    """
    model_file = MODELS_DIR.joinpath("camembert-bio-deid", "onnx", "model.onnx")
    present = model_file.exists()
    log.info(f"CamemBERT ONNX present: {present} ({model_file})")
    return present
def launch_gui():
    """Launch the main GUI with visible startup progress.

    Phase 1 shows a ``BrandedSplash``, imports the anonymisation core and the
    GUI module, and mirrors the log records emitted meanwhile into the splash
    journal. Phase 2 closes the splash and runs the main window's Tk mainloop.

    Failures are logged and reported in an error dialog; nothing is raised to
    the caller for ordinary exceptions.
    """
    log.info("Launching GUI...")
    progress = BrandedSplash(total_steps=5)

    # log.info() fragments -> readable "production" captions for the user.
    # The first entry whose key occurs in the message wins.
    log_translations = [
        ("Gazetteers INSEE prénoms", "Chargement des prénoms français (INSEE)…"),
        ("Gazetteers INSEE communes", "Chargement des communes françaises (INSEE)…"),
        ("Gazetteers INSEE noms de famille", "Chargement des noms de famille (INSEE)…"),
        ("Villes blacklist", "Chargement de la blacklist des villes…"),
        ("Gazetteer FINESS numéros", "Chargement des numéros FINESS…"),
        ("Gazetteer FINESS villes", "Chargement des villes FINESS…"),
        ("Gazetteer FINESS téléphones", "Chargement des téléphones FINESS…"),
        ("Gazetteer FINESS Aho-Corasick", "Indexation des établissements de santé…"),
        ("Gazetteer FINESS adresses", "Chargement des adresses FINESS…"),
        ("Gazetteer VILLE Aho-Corasick", "Indexation des villes…"),
        ("Whitelist termes médicaux", "Chargement du lexique médical…"),
        ("Whitelist médicaments", "Chargement de la base médicamenteuse (BDPM)…"),
        ("Stop-words manuels", "Chargement des stop-words…"),
        ("BDPM stop-words", "Chargement des médicaments BDPM…"),
        ("DPI labels blacklist", "Chargement des libellés DPI…"),
        ("Companion blacklist", "Chargement du vocabulaire clinique…"),
        ("Whitelist phrases", "Chargement des phrases protégées…"),
        ("FINESS mono-mots", "Chargement des sigles d'établissement…"),
        ("Core imported OK", "Moteur d'anonymisation prêt…"),
        ("GUI module imported OK", "Interface prête — finalisation…"),
    ]

    def _translate(msg: str) -> str:
        """Return the caption for *msg*, or *msg* itself when none matches."""
        for key, human in log_translations:
            if key in msg:
                return human
        return msg

    class _SplashHandler(logging.Handler):
        """Forward log records to the splash journal (never raises)."""

        def emit(self, record):
            try:
                progress.detail(_translate(record.getMessage()))
            except Exception:
                pass

    # Installed on the root logger to capture the log.info() calls of every
    # module loaded during the imports below.
    splash_handler = _SplashHandler()
    splash_handler.setLevel(logging.INFO)
    root_logger = logging.getLogger()

    failure = None  # one-line summary for the dialog when phase 1 fails
    try:
        progress.step("Préparation de l'environnement")
        root_logger.addHandler(splash_handler)
        # Show an initial message under the logo straight away.
        progress.detail("Démarrage du moteur applicatif")

        # Core and GUI are imported synchronously on this thread.
        progress.step("Chargement des dictionnaires médicaux")
        import anonymizer_core_refactored_onnx  # noqa
        log.info("Core imported OK")
        progress.step("Chargement du moteur d'anonymisation")
        import Pseudonymisation_Gui_V5  # noqa
        log.info("GUI module imported OK")
        progress.step("Vérification des modèles locaux")
        if check_models_ready():
            progress.detail("CamemBERT-bio ONNX local disponible")
        else:
            progress.detail("CamemBERT-bio ONNX non trouvé dans le bundle")
        progress.step("Ouverture de l'interface")
    except Exception as exc:
        details = f"{exc}\n{traceback.format_exc()}"
        log.error(f"Import error: {details}")
        # First non-blank line of the message; fall back to the exception
        # type so the dialog is never empty when the message is.
        failure = next(
            (line for line in str(exc).splitlines() if line.strip()),
            type(exc).__name__,
        )
    finally:
        # Always detach the handler (the main GUI uses its own logs) and
        # close the splash, whatever happened above.
        root_logger.removeHandler(splash_handler)
        progress.close()

    if failure is not None:
        try:
            messagebox.showerror(
                "Erreur",
                f"Erreur au lancement :\n\n{failure}\n\n"
                f"Voir {LOG_FILE} pour les détails.",
            )
        except Exception:
            pass
        return

    # Start the main window.
    try:
        import Pseudonymisation_Gui_V5  # already imported above; rebinds the name
        log.info("Starting mainloop…")
        root = tk.Tk()
        Pseudonymisation_Gui_V5.App(root)
        root.mainloop()
    except Exception as e:
        log.error(f"GUI error: {e}\n{traceback.format_exc()}")
        try:
            messagebox.showerror(
                "Erreur",
                f"Erreur de l'interface :\n\n{e}\n\nVoir {LOG_FILE}",
            )
        except Exception:
            pass
class SetupWindow:
    """First-launch setup window that starts the model download by itself.

    Shows per-model progress (EDS-Pseudo, GLiNER, CamemBERT-bio) with a visual
    indicator (⏳ running, ✓ success, ✗ failure) and a detailed log. On failure
    the user may retry or continue anyway; when every model is ready the main
    GUI is launched automatically.

    The models are loaded on a worker thread; every widget update issued from
    that thread is scheduled through ``root.after``.
    """

    # Ordered list of loading steps. Each entry:
    #   (internal key, label, approximate size)
    STEPS = [
        ("eds_pseudo", "EDS-Pseudo (CamemBERT clinique)", "~450 Mo"),
        ("gliner", "GLiNER (détection PII zero-shot)", "~300 Mo"),
        ("camembert_onnx", "CamemBERT-bio ONNX (embarqué)", "local"),
    ]

    # Glyph and colour displayed in front of a model for each state.
    _STEP_STYLES = {
        "pending": ("", "#999999"),
        "running": ("⏳", "#f57c00"),
        "ok": ("✓", "#2e7d32"),
        "fail": ("✗", "#c62828"),
    }

    def __init__(self):
        """Build the window and schedule the download 500 ms after start."""
        self.root = tk.Tk()
        self.root.title("Anonymisation — Configuration initiale")
        self.root.geometry("660x700")
        self.root.resizable(False, False)
        self._logo_image = None
        self._log_lines = []
        # Number of finished steps, mirrored into the progress bar.
        self._steps_done = 0

        frame = ttk.Frame(self.root, padding=18)
        frame.pack(fill="both", expand=True)

        splash_path = APP_DIR / "assets" / "splash.png"
        if splash_path.exists():
            self._logo_image = tk.PhotoImage(file=str(splash_path))
            ttk.Label(frame, image=self._logo_image).pack(pady=(0, 8))

        ttk.Label(frame, text="Préparation des modèles d'intelligence artificielle",
                  font=("", 13, "bold")).pack(pady=(0, 4))
        ttk.Label(
            frame,
            text=("Au premier lancement, les modèles de détection doivent être téléchargés\n"
                  "depuis HuggingFace. Cette opération est unique — durée 3 à 10 minutes\n"
                  "selon votre connexion internet. Merci de patienter."),
            justify="center", foreground="#555555",
        ).pack(pady=(0, 12))

        # Overall progress bar
        self.progress = ttk.Progressbar(frame, mode="determinate",
                                        length=560, maximum=len(self.STEPS))
        self.progress.pack(pady=(0, 4))
        self.status_var = tk.StringVar(value="Démarrage…")
        ttk.Label(frame, textvariable=self.status_var, foreground="#1a568e").pack(pady=(0, 12))

        # Per-model detail area
        detail_frame = ttk.LabelFrame(frame, text=" Modèles ", padding=10)
        detail_frame.pack(fill="x", pady=(0, 12))
        self.step_labels = {}
        for key, title, size in self.STEPS:
            row = ttk.Frame(detail_frame)
            row.pack(fill="x", pady=3)
            icon = ttk.Label(row, text="", width=3, font=("", 12))
            icon.pack(side="left")
            ttk.Label(row, text=title).pack(side="left")
            ttk.Label(row, text=f" ({size})", foreground="#999999",
                      font=("", 8)).pack(side="left")
            self.step_labels[key] = icon

        log_frame = ttk.LabelFrame(frame, text=" Détail du chargement ", padding=8)
        log_frame.pack(fill="x", pady=(0, 12))
        self.log_text = tk.Text(
            log_frame,
            height=7,
            wrap="word",
            state="disabled",
            bg="#f7f7f7",
            fg="#333333",
            bd=0,
            padx=8,
            pady=6,
            font=("Consolas", 8),
        )
        self.log_text.pack(fill="x")

        # Retry button (hidden at first)
        self.btn = ttk.Button(frame, text="Relancer", command=self.start_download)
        self.btn.pack(pady=6)
        self.btn.pack_forget()

        # Skip/continue button (shown after a partial failure)
        self.btn_skip = ttk.Button(
            frame, text="Continuer malgré tout",
            command=self._finish,
        )
        self.btn_skip.pack(pady=(0, 4))
        self.btn_skip.pack_forget()

        # Start the download automatically (no click required)
        self.root.after(500, self.start_download)

    def start_download(self):
        """Reset the display and run the download on a daemon thread."""
        self.btn.pack_forget()
        self.btn_skip.pack_forget()
        self._steps_done = 0
        self.progress["value"] = 0
        self.status_var.set("Démarrage du téléchargement…")
        for icon in self.step_labels.values():
            icon.configure(text="", foreground="#999999")
        threading.Thread(target=self._download_thread, daemon=True).start()

    def _set_step(self, key, state):
        """Show *state* for step *key*.

        state : 'pending' | 'running' | 'ok' | 'fail'. Unknown states are
        displayed like 'pending'; unknown keys are ignored.
        """
        char, color = self._STEP_STYLES.get(state, self._STEP_STYLES["pending"])
        icon = self.step_labels.get(key)
        if icon is not None:
            self.root.after(0, lambda: icon.configure(text=char, foreground=color))

    def _download_thread(self):
        """Worker: load each model in turn and report the outcome in the UI."""
        failures = []
        try:
            # 1. EDS-Pseudo
            self._update("Téléchargement d'EDS-Pseudo… (modèle CamemBERT clinique)")
            self._append_log("EDS-Pseudo : téléchargement/chargement du modèle AP-HP")
            self._set_step("eds_pseudo", "running")
            log.info("Downloading EDS-Pseudo...")
            try:
                from eds_pseudo_manager import EdsPseudoManager
                mgr = EdsPseudoManager()
                with self._capture_model_output("EDS-Pseudo"):
                    mgr.load()
                self._set_step("eds_pseudo", "ok")
                self._append_log("EDS-Pseudo : modèle prêt")
                log.info("EDS-Pseudo OK")
            except Exception as e:
                self._set_step("eds_pseudo", "fail")
                self._append_log(f"EDS-Pseudo : échec - {e}")
                failures.append(("EDS-Pseudo", str(e)))
                log.warning(f"EDS-Pseudo failed: {e}")
            self._advance()

            # 2. GLiNER
            self._update("Téléchargement de GLiNER… (détection zero-shot)")
            self._append_log("GLiNER : téléchargement/chargement du modèle PII")
            self._set_step("gliner", "running")
            log.info("Downloading GLiNER...")
            try:
                from gliner_manager import GlinerManager
                mgr = GlinerManager()
                with self._capture_model_output("GLiNER"):
                    mgr.load()
                self._set_step("gliner", "ok")
                self._append_log("GLiNER : modèle prêt")
                log.info("GLiNER OK")
            except Exception as e:
                self._set_step("gliner", "fail")
                self._append_log(f"GLiNER : échec - {e}")
                failures.append(("GLiNER", str(e)))
                log.warning(f"GLiNER failed: {e}")
            self._advance()

            # 3. CamemBERT-bio ONNX
            self._update("Vérification CamemBERT-bio ONNX (modèle embarqué)…")
            self._append_log("CamemBERT-bio ONNX : vérification du modèle embarqué")
            self._set_step("camembert_onnx", "running")
            if check_models_ready():
                self._set_step("camembert_onnx", "ok")
                self._append_log("CamemBERT-bio ONNX : modèle local présent")
            else:
                self._set_step("camembert_onnx", "fail")
                self._append_log("CamemBERT-bio ONNX : fichier ONNX introuvable")
                failures.append(("CamemBERT-bio ONNX", "fichier ONNX introuvable dans le bundle"))
                log.error("CamemBERT-bio ONNX not found")
            self._advance()

            if failures:
                lines = "\n".join(f"{name} : {err[:60]}" for name, err in failures)
                self._update(f"Certains modèles ont échoué ({len(failures)}/{len(self.STEPS)}).")
                log.warning(f"Setup partial failure: {len(failures)} model(s) failed\n{lines}")
                self.root.after(0, lambda: self.btn.pack(pady=6))
                self.root.after(0, lambda: self.btn_skip.pack(pady=(0, 4)))
            else:
                self._update("Tous les modèles sont prêts. Lancement de l'interface…")
                log.info("Setup complete, launching GUI in 1.5s")
                self.root.after(1500, self._finish)
        except Exception as e:
            log.error(f"Setup error: {e}\n{traceback.format_exc()}")
            self._update(f"Erreur inattendue : {e}")
            self.root.after(0, lambda: self.btn.pack(pady=6))

    def _advance(self):
        """Mark one more step as finished and reflect it in the progress bar.

        The value is assigned explicitly instead of calling
        ``Progressbar.step()``: in determinate mode ``step()`` wraps back to 0
        when the value reaches ``maximum``, which would leave the bar empty
        once the last step completes.
        """
        self._steps_done = min(self._steps_done + 1, len(self.STEPS))
        done = self._steps_done

        def _apply():
            self.progress["value"] = done

        self.root.after(0, _apply)

    def _update(self, msg):
        """Schedule *msg* to be shown on the status line."""
        self.root.after(0, lambda: self.status_var.set(msg))

    def _append_log(self, msg):
        """Schedule *msg* to be appended to the detail log.

        Whitespace is collapsed, blank messages are ignored, text longer than
        180 characters is truncated with ``...`` and only the 80 most recent
        lines are kept.
        """
        clean = " ".join(str(msg).split())
        if not clean:
            return
        if len(clean) > 180:
            clean = clean[:177] + "..."

        def _apply():
            self._log_lines.append(clean)
            self._log_lines = self._log_lines[-80:]
            self.log_text.configure(state="normal")
            self.log_text.delete("1.0", tk.END)
            self.log_text.insert("end", "\n".join(self._log_lines))
            self.log_text.configure(state="disabled")
            self.log_text.see("end")

        self.root.after(0, _apply)

    @contextlib.contextmanager
    def _capture_model_output(self, label):
        """Redirect stdout/stderr into the detail log, prefixed with *label*."""
        stream = ModelProgressStream(self._append_log, label)
        try:
            with contextlib.redirect_stdout(stream), contextlib.redirect_stderr(stream):
                yield
        finally:
            # Flush even when loading raised, so the last partial line
            # written before the failure still reaches the log.
            stream.flush()

    def _finish(self):
        """Close the setup window and hand over to the main GUI."""
        try:
            self.root.destroy()
        except Exception:
            pass
        launch_gui()

    def run(self):
        """Run the Tk mainloop of the setup window."""
        self.root.mainloop()
def main():
    """Entry point: enforce single instance, then start setup or the GUI.

    When the bundled ONNX model is present the main GUI is launched directly;
    otherwise the first-run ``SetupWindow`` is shown. If another instance
    already holds the lock, a warning is displayed and the process exits with
    status 0. Any other exception is logged and reported in a dialog.
    """
    log.info("=== Demarrage Anonymisation ===")

    # Single-instance check
    if not _ensure_single_instance():
        log.warning("Another instance is already running. Exiting.")
        # Close the native splash first so it cannot sit on top of the dialog
        # (PyInstaller's splash is always-on-top by default).
        _splash_close()
        try:
            messagebox.showwarning(
                "Anonymisation",
                "L'application est deja en cours d'execution.\n\n"
                "Regardez dans la barre des taches.",
            )
        except Exception:
            pass
        sys.exit(0)

    try:
        if check_models_ready():
            _splash_update("Modèles déjà installés — chargement…")
            launch_gui()
        else:
            _splash_update("Premier lancement — configuration initiale")
            _splash_close()  # make room for SetupWindow, which has its own UI
            setup = SetupWindow()
            setup.run()
    except Exception as e:
        log.error(f"Fatal error: {e}\n{traceback.format_exc()}")
        # Same reason as above: never leave the splash over the error dialog.
        _splash_close()
        try:
            messagebox.showerror("Erreur fatale", f"{e}\n\nVoir {LOG_FILE}")
        except Exception:
            pass


if __name__ == "__main__":
    main()