feat(gui): add GUI V6 G2 — onglet Utilisation + runner injectable

Onglet Utilisation fonctionnel (couche présentation only) :
- processing_runner: runner testable sans display/moteur lourd, process_fn
  injectable (défaut = process_document en import paresseux), découverte
  fichier/dossier, sorties anonymise/ comme V5 (arbo préservée), progression,
  journal, résumé OK/KO, arrêt coopératif entre documents, anti double-lancement
- tabs/tab_usage: sélection fichier/dossier + nb PDF détectés, dossier sortie
  (défaut anonymise/), Lancer/Arrêter, barre de progression, statut, journal,
  résumé ; worker threadé, file d'événements drainée par after() ; aucun réseau
- app.py: onglet Utilisation câblé (placeholder G2 retiré)
- self-test: couvre processing_runner + tab_usage

Tests: +11 (runner) — discovery, sorties, échec partiel, arrêt, anti-double-run,
callbacks. self-test exit 0, 32 tests gui_v6, 179 tests/unit (0 régression).
Moteur/V5/managers/specs intacts.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-11 18:58:10 +02:00
parent a6ee68a8a3
commit 9bc6537233
5 changed files with 534 additions and 7 deletions

View File

@@ -16,15 +16,17 @@ import sys
def _self_test() -> int:
"""Importe les modules du socle GUI V6 sans créer de fenêtre."""
from gui_v6 import app, license_client, license_store, theme # noqa: F401
from gui_v6.tabs import tab_about # noqa: F401
from gui_v6 import app, license_client, license_store, processing_runner, theme # noqa: F401
from gui_v6.tabs import tab_about, tab_usage # noqa: F401
# Sanity check des contrats publics du socle.
assert hasattr(app, "AnonymisationApp")
assert hasattr(license_client, "LicenseClient")
assert hasattr(license_client, "LicenseStatus")
assert hasattr(license_store, "LicenseStore")
assert hasattr(processing_runner, "ProcessingRunner")
assert hasattr(tab_about, "AboutTab")
assert hasattr(tab_usage, "UsageTab")
print("GUI V6 self-test OK")
return 0

View File

@@ -17,6 +17,7 @@ import customtkinter as ctk
from gui_v6 import theme as theme_mod
from gui_v6.license_client import LicenseClient, LicenseStatus
from gui_v6.tabs.tab_about import AboutTab
from gui_v6.tabs.tab_usage import UsageTab
_TABS = ("Utilisation", "Configuration", "À propos")
@@ -77,16 +78,15 @@ class AnonymisationApp(ctk.CTk):
for name in _TABS:
tabview.add(name)
self._usage = UsageTab(tabview.tab("Utilisation"))
self._usage.pack(fill="both", expand=True)
self._about = AboutTab(
tabview.tab("À propos"), status=status, theme_name=self._theme_name
)
self._about.pack(fill="both", expand=True)
# Placeholders G2/G3.
ctk.CTkLabel(
tabview.tab("Utilisation"),
text="Onglet Utilisation — disponible au lot G2.",
).pack(padx=16, pady=16, anchor="w")
# Placeholder G3.
ctk.CTkLabel(
tabview.tab("Configuration"),
text="Onglet Configuration — disponible au lot G3.",

166
gui_v6/processing_runner.py Normal file
View File

@@ -0,0 +1,166 @@
"""Runner de traitement pour la GUI V6 (testable sans display ni moteur lourd).
Le runner orchestre l'anonymisation document par document via une fonction de
traitement **injectable** :
- en production, le défaut appelle ``process_document`` du moteur (import paresseux,
aucun manager NER chargé à l'import de ce module) ;
- en test, on injecte une fausse fonction — aucun appel réseau, aucun modèle.
Il ne contient aucune logique de détection : il découvre les documents, construit
les dossiers de sortie comme la V5 (``anonymise/`` sous la source, arborescence
préservée), exécute le traitement, et expose progression / journal / résumé /
arrêt coopératif (entre deux documents).
"""
from __future__ import annotations
import threading
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional, Sequence
from gui_batch_paths import build_batch_output_dir, list_supported_documents
# process_fn(doc_path, out_dir) -> dict de sortie (ignoré par le runner).
ProcessFn = Callable[[Path, Path], dict]
# Repli si format_converter indisponible à l'exécution (ne sert qu'au listing).
_FALLBACK_EXTENSIONS = (
".pdf", ".docx", ".odt", ".rtf", ".txt", ".html", ".htm",
".jpg", ".jpeg", ".png", ".tiff", ".tif", ".bmp",
)
def supported_extensions() -> tuple[str, ...]:
"""Extensions supportées : depuis ``format_converter`` si dispo, sinon repli."""
try:
from format_converter import SUPPORTED_EXTENSIONS
return tuple(sorted(SUPPORTED_EXTENSIONS))
except Exception:
return _FALLBACK_EXTENSIONS
def default_output_dir(input_path) -> Path:
"""Dossier de sortie par défaut : ``anonymise/`` sous la source."""
path = Path(input_path)
base = path if path.is_dir() else path.parent
return base / "anonymise"
def discover_documents(input_path, extensions: Optional[Sequence[str]] = None) -> list[Path]:
"""Liste les documents à traiter (fichier unique ou dossier récursif)."""
path = Path(input_path)
exts = tuple(extensions) if extensions is not None else supported_extensions()
normalized = {e.lower() for e in exts}
if path.is_file():
return [path] if path.suffix.lower() in normalized else []
if path.is_dir():
return list_supported_documents(path, exts)
return []
@dataclass
class RunSummary:
"""Résultat d'un run : compteurs et erreurs par document."""
total: int = 0
succeeded: int = 0
failed: int = 0
stopped: bool = False
errors: list = field(default_factory=list) # list[tuple[str, str]] (nom, message)
@property
def ok(self) -> bool:
return self.failed == 0 and not self.stopped
def _default_process_fn(doc_path: Path, out_dir: Path) -> dict:
# Import paresseux : aucun manager NER chargé à l'import du runner.
from anonymizer_core_refactored_onnx import process_document
return process_document(doc_path, out_dir)
class ProcessingRunner:
"""Exécute le traitement document par document, arrêt coopératif inclus."""
def __init__(
self,
process_fn: Optional[ProcessFn] = None,
extensions: Optional[Sequence[str]] = None,
) -> None:
self._process_fn = process_fn or _default_process_fn
self._extensions = tuple(extensions) if extensions is not None else None
self._lock = threading.Lock()
self._running = False
@property
def is_running(self) -> bool:
return self._running
def discover(self, input_path) -> list[Path]:
return discover_documents(input_path, self._extensions)
def run(
self,
input_path,
output_dir=None,
*,
on_progress: Optional[Callable[[int, int, str], None]] = None,
on_log: Optional[Callable[[str], None]] = None,
stop_event: Optional[threading.Event] = None,
) -> RunSummary:
"""Traite les documents de ``input_path``. Synchrone (lancer dans un thread pour l'UI).
Lève ``RuntimeError`` si un run est déjà en cours (anti double-lancement).
"""
with self._lock:
if self._running:
raise RuntimeError("Un traitement est déjà en cours.")
self._running = True
try:
return self._run_impl(input_path, output_dir, on_progress, on_log, stop_event)
finally:
with self._lock:
self._running = False
def _run_impl(self, input_path, output_dir, on_progress, on_log, stop_event) -> RunSummary:
input_path = Path(input_path)
docs = self.discover(input_path)
out_root = Path(output_dir) if output_dir else default_output_dir(input_path)
root_dir = input_path if input_path.is_dir() else input_path.parent
summary = RunSummary(total=len(docs))
def log(message: str) -> None:
if on_log:
on_log(message)
if not docs:
log("Aucun document supporté détecté.")
return summary
for index, doc in enumerate(docs, start=1):
if stop_event is not None and stop_event.is_set():
summary.stopped = True
log("Arrêt demandé — traitement interrompu entre deux documents.")
break
if on_progress:
on_progress(index - 1, summary.total, doc.name)
try:
if input_path.is_dir():
doc_out = build_batch_output_dir(root_dir, out_root, doc)
else:
doc_out = out_root
doc_out.mkdir(parents=True, exist_ok=True)
self._process_fn(doc, doc_out)
summary.succeeded += 1
log(f"OK : {doc.name}")
except Exception as exc: # un échec n'interrompt pas le lot
summary.failed += 1
summary.errors.append((doc.name, str(exc)))
log(f"ÉCHEC : {doc.name}{exc}")
if on_progress:
on_progress(index, summary.total, doc.name)
return summary

194
gui_v6/tabs/tab_usage.py Normal file
View File

@@ -0,0 +1,194 @@
"""Onglet « Utilisation » de la GUI V6.
Sélection fichier/dossier → choix sortie → lancement via le runner (dans un
thread) → progression / journal / résumé. Aucun appel réseau au démarrage,
aucune logique de détection : tout passe par :class:`ProcessingRunner`.
Les widgets ne sont créés qu'à l'instanciation (import sûr pour ``--self-test``).
La communication thread worker → UI passe par une file drainée via ``after``.
"""
from __future__ import annotations
import queue
import threading
from pathlib import Path
from tkinter import filedialog
import customtkinter as ctk
from gui_v6.processing_runner import ProcessingRunner, default_output_dir
class UsageTab(ctk.CTkFrame):
def __init__(self, master, runner: ProcessingRunner | None = None, **kwargs):
super().__init__(master, **kwargs)
self._runner = runner or ProcessingRunner()
self._input_path: Path | None = None
self._output_dir: Path | None = None
self._stop_event: threading.Event | None = None
self._worker: threading.Thread | None = None
self._events: "queue.Queue[tuple]" = queue.Queue()
self._build()
self.after(120, self._drain_events)
# -- construction UI --------------------------------------------------
def _build(self) -> None:
bar = ctk.CTkFrame(self)
bar.pack(fill="x", padx=12, pady=(12, 6))
ctk.CTkButton(bar, text="Choisir un fichier…", command=self._pick_file).pack(
side="left", padx=(0, 8)
)
ctk.CTkButton(bar, text="Choisir un dossier…", command=self._pick_folder).pack(
side="left"
)
self._target_label = ctk.CTkLabel(self, text="Aucune source sélectionnée", anchor="w")
self._target_label.pack(fill="x", padx=12, pady=(0, 4))
out_bar = ctk.CTkFrame(self)
out_bar.pack(fill="x", padx=12, pady=(0, 6))
ctk.CTkButton(out_bar, text="Dossier de sortie…", command=self._pick_output).pack(
side="left", padx=(0, 8)
)
self._output_label = ctk.CTkLabel(out_bar, text="Sortie : (défaut anonymise/)", anchor="w")
self._output_label.pack(side="left")
action = ctk.CTkFrame(self)
action.pack(fill="x", padx=12, pady=(0, 6))
self._run_btn = ctk.CTkButton(action, text="Lancer", command=self._start, state="disabled")
self._run_btn.pack(side="left", padx=(0, 8))
self._stop_btn = ctk.CTkButton(action, text="Arrêter", command=self._request_stop, state="disabled")
self._stop_btn.pack(side="left")
self._progress = ctk.CTkProgressBar(self)
self._progress.set(0.0)
self._progress.pack(fill="x", padx=12, pady=(6, 4))
self._status_label = ctk.CTkLabel(self, text="Prêt.", anchor="w")
self._status_label.pack(fill="x", padx=12)
self._log = ctk.CTkTextbox(self, height=180)
self._log.pack(fill="both", expand=True, padx=12, pady=(6, 12))
self._log.configure(state="disabled")
# -- sélection --------------------------------------------------------
def _pick_file(self) -> None:
path = filedialog.askopenfilename(title="Choisir un document")
if path:
self._set_input(Path(path))
def _pick_folder(self) -> None:
path = filedialog.askdirectory(title="Choisir un dossier")
if path:
self._set_input(Path(path))
def _pick_output(self) -> None:
path = filedialog.askdirectory(title="Dossier de sortie")
if path:
self._output_dir = Path(path)
self._output_label.configure(text=f"Sortie : {self._output_dir}")
def _set_input(self, path: Path) -> None:
self._input_path = path
count = len(self._runner.discover(path))
if path.is_dir():
self._target_label.configure(text=f"Dossier : {path} · {count} document(s) détecté(s)")
else:
self._target_label.configure(text=f"Fichier : {path.name}")
self._output_label.configure(text=f"Sortie : (défaut {default_output_dir(path)})")
self._run_btn.configure(state="normal" if count > 0 else "disabled")
# -- exécution --------------------------------------------------------
def _start(self) -> None:
if self._input_path is None or self._runner.is_running:
return
self._stop_event = threading.Event()
self._run_btn.configure(state="disabled")
self._stop_btn.configure(state="normal")
self._progress.set(0.0)
self._clear_log()
self._set_status("Traitement en cours…")
input_path, output_dir, stop = self._input_path, self._output_dir, self._stop_event
def work() -> None:
try:
summary = self._runner.run(
input_path,
output_dir,
on_progress=lambda done, total, name: self._events.put(("progress", done, total, name)),
on_log=lambda msg: self._events.put(("log", msg)),
stop_event=stop,
)
self._events.put(("done", summary))
except Exception as exc: # garde-fou : ne jamais laisser le thread tuer l'UI
self._events.put(("error", str(exc)))
self._worker = threading.Thread(target=work, daemon=True)
self._worker.start()
def _request_stop(self) -> None:
if self._stop_event is not None:
self._stop_event.set()
self._set_status("Arrêt demandé…")
self._stop_btn.configure(state="disabled")
# -- file d'événements worker → UI ------------------------------------
def _drain_events(self) -> None:
try:
while True:
event = self._events.get_nowait()
self._handle_event(event)
except queue.Empty:
pass
self.after(120, self._drain_events)
def _handle_event(self, event: tuple) -> None:
kind = event[0]
if kind == "progress":
_, done, total, name = event
self._progress.set(done / total if total else 0.0)
self._set_status(f"{done}/{total}{name}")
elif kind == "log":
self._append_log(event[1])
elif kind == "done":
self._finish(event[1])
elif kind == "error":
self._append_log(f"Erreur : {event[1]}")
self._finish(None)
def _finish(self, summary) -> None:
self._stop_btn.configure(state="disabled")
self._run_btn.configure(state="normal")
if summary is None:
self._set_status("Terminé avec erreur.")
return
if summary.stopped:
self._set_status(f"Arrêté : {summary.succeeded}/{summary.total} traités.")
else:
self._progress.set(1.0)
self._set_status(
f"Terminé : {summary.succeeded} OK, {summary.failed} échec(s) sur {summary.total}."
)
# -- helpers widgets --------------------------------------------------
def _set_status(self, text: str) -> None:
self._status_label.configure(text=text)
def _clear_log(self) -> None:
self._log.configure(state="normal")
self._log.delete("1.0", "end")
self._log.configure(state="disabled")
def _append_log(self, message: str) -> None:
self._log.configure(state="normal")
self._log.insert("end", message + "\n")
self._log.see("end")
self._log.configure(state="disabled")

View File

@@ -0,0 +1,165 @@
"""Tests du runner G2 : process_fn injectée, vrais fichiers tmp, aucun moteur réel."""
from __future__ import annotations
import threading
from pathlib import Path
import pytest
from gui_v6.processing_runner import (
ProcessingRunner,
RunSummary,
default_output_dir,
discover_documents,
)
_EXTS = (".pdf", ".txt")
def _touch(path: Path) -> Path:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text("x", encoding="utf-8")
return path
# -- découverte & chemins --------------------------------------------------
def test_discover_single_file(tmp_path):
f = _touch(tmp_path / "doc.pdf")
assert discover_documents(f, _EXTS) == [f]
def test_discover_single_file_unsupported(tmp_path):
f = _touch(tmp_path / "doc.xyz")
assert discover_documents(f, _EXTS) == []
def test_discover_folder_sorted_and_skips_output(tmp_path):
_touch(tmp_path / "b.pdf")
_touch(tmp_path / "a.pdf")
_touch(tmp_path / "note.txt")
_touch(tmp_path / "anonymise" / "already.pdf") # sous-arbre de sortie ignoré
found = discover_documents(tmp_path, _EXTS)
names = [p.name for p in found]
assert names == ["a.pdf", "b.pdf", "note.txt"]
def test_default_output_dir_file_and_dir(tmp_path):
f = _touch(tmp_path / "doc.pdf")
assert default_output_dir(f) == tmp_path / "anonymise"
assert default_output_dir(tmp_path) == tmp_path / "anonymise"
# -- exécution -------------------------------------------------------------
def test_run_processes_all_docs(tmp_path):
_touch(tmp_path / "a.pdf")
_touch(tmp_path / "b.pdf")
calls = []
runner = ProcessingRunner(process_fn=lambda d, o: calls.append((d, o)) or {}, extensions=_EXTS)
summary = runner.run(tmp_path)
assert isinstance(summary, RunSummary)
assert summary.total == 2
assert summary.succeeded == 2
assert summary.failed == 0
assert summary.ok is True
assert len(calls) == 2
# Le dossier de sortie par défaut a été créé.
assert (tmp_path / "anonymise").is_dir()
def test_run_single_file_uses_output_dir(tmp_path):
f = _touch(tmp_path / "doc.pdf")
out = tmp_path / "sortie"
seen = {}
runner = ProcessingRunner(process_fn=lambda d, o: seen.update(doc=d, out=o) or {}, extensions=_EXTS)
summary = runner.run(f, output_dir=out)
assert summary.total == 1 and summary.succeeded == 1
assert seen["doc"] == f
assert seen["out"] == out
assert out.is_dir()
def test_run_continues_after_failure(tmp_path):
_touch(tmp_path / "a.pdf")
_touch(tmp_path / "boom.pdf")
_touch(tmp_path / "c.pdf")
def proc(doc, out):
if doc.name == "boom.pdf":
raise RuntimeError("explosion")
return {}
runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
summary = runner.run(tmp_path)
assert summary.total == 3
assert summary.succeeded == 2
assert summary.failed == 1
assert summary.ok is False
assert summary.errors[0][0] == "boom.pdf"
assert "explosion" in summary.errors[0][1]
def test_run_empty_folder(tmp_path):
logs = []
runner = ProcessingRunner(process_fn=lambda d, o: {}, extensions=_EXTS)
summary = runner.run(tmp_path, on_log=logs.append)
assert summary.total == 0
assert any("Aucun document" in m for m in logs)
def test_stop_event_interrupts_between_docs(tmp_path):
for name in ("a.pdf", "b.pdf", "c.pdf"):
_touch(tmp_path / name)
stop = threading.Event()
processed = []
def proc(doc, out):
processed.append(doc.name)
stop.set() # demande l'arrêt après le 1er document
return {}
runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
summary = runner.run(tmp_path, stop_event=stop)
assert summary.stopped is True
assert summary.succeeded == 1
assert len(processed) == 1 # arrêt effectif entre deux documents
def test_progress_callbacks(tmp_path):
_touch(tmp_path / "a.pdf")
_touch(tmp_path / "b.pdf")
events = []
runner = ProcessingRunner(process_fn=lambda d, o: {}, extensions=_EXTS)
runner.run(tmp_path, on_progress=lambda done, total, name: events.append((done, total)))
assert (2, 2) in events # progression finale atteinte
def test_no_double_run(tmp_path):
_touch(tmp_path / "a.pdf")
started = threading.Event()
release = threading.Event()
result = {}
def proc(doc, out):
started.set()
release.wait(timeout=2)
return {}
runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
worker = threading.Thread(target=lambda: runner.run(tmp_path))
worker.start()
assert started.wait(timeout=2)
# Pendant le run, un second lancement est refusé.
with pytest.raises(RuntimeError):
runner.run(tmp_path)
release.set()
worker.join(timeout=2)
assert runner.is_running is False