"""Runner de traitement pour la GUI V6 (testable sans display ni moteur lourd). Le runner orchestre l'anonymisation document par document via une fonction de traitement **injectable** : - en production, le défaut appelle ``process_document`` du moteur (import paresseux, aucun manager NER chargé à l'import de ce module) ; - en test, on injecte une fausse fonction — aucun appel réseau, aucun modèle. Il ne contient aucune logique de détection : il découvre les documents, construit les dossiers de sortie comme la V5 (``anonymise/`` sous la source, arborescence préservée), exécute le traitement, et expose progression / journal / résumé / arrêt coopératif (entre deux documents). """ from __future__ import annotations import threading from dataclasses import dataclass, field from pathlib import Path from typing import Callable, Optional, Sequence from gui_batch_paths import build_batch_output_dir, list_supported_documents # process_fn(doc_path, out_dir) -> dict de sortie (ignoré par le runner). ProcessFn = Callable[[Path, Path], dict] # Repli si format_converter indisponible à l'exécution (ne sert qu'au listing). _FALLBACK_EXTENSIONS = ( ".pdf", ".docx", ".odt", ".rtf", ".txt", ".html", ".htm", ".jpg", ".jpeg", ".png", ".tiff", ".tif", ".bmp", ) def supported_extensions() -> tuple[str, ...]: """Extensions supportées : depuis ``format_converter`` si dispo, sinon repli.""" try: from format_converter import SUPPORTED_EXTENSIONS return tuple(sorted(SUPPORTED_EXTENSIONS)) except Exception: return _FALLBACK_EXTENSIONS def default_output_dir(input_path) -> Path: """Dossier de sortie par défaut : ``anonymise/`` sous la source.""" path = Path(input_path) base = path if path.is_dir() else path.parent return base / "anonymise" def discover_documents(input_path, extensions: Optional[Sequence[str]] = None) -> list[Path]: """Liste les documents à traiter (fichier unique ou dossier récursif).""" path = Path(input_path) exts = tuple(extensions) if extensions is not None else supported_extensions() normalized = {e.lower() for e in exts} if path.is_file(): return [path] if path.suffix.lower() in normalized else [] if path.is_dir(): return list_supported_documents(path, exts) return [] @dataclass class RunSummary: """Résultat d'un run : compteurs et erreurs par document.""" total: int = 0 succeeded: int = 0 failed: int = 0 stopped: bool = False errors: list = field(default_factory=list) # list[tuple[str, str]] (nom, message) @property def ok(self) -> bool: return self.failed == 0 and not self.stopped def _default_process_fn(doc_path: Path, out_dir: Path) -> dict: # Import paresseux : aucun manager NER chargé à l'import du runner. from anonymizer_core_refactored_onnx import process_document return process_document(doc_path, out_dir) class ProcessingRunner: """Exécute le traitement document par document, arrêt coopératif inclus.""" def __init__( self, process_fn: Optional[ProcessFn] = None, extensions: Optional[Sequence[str]] = None, ) -> None: self._process_fn = process_fn or _default_process_fn self._extensions = tuple(extensions) if extensions is not None else None self._lock = threading.Lock() self._running = False @property def is_running(self) -> bool: return self._running def discover(self, input_path) -> list[Path]: return discover_documents(input_path, self._extensions) def run( self, input_path, output_dir=None, *, on_progress: Optional[Callable[[int, int, str], None]] = None, on_log: Optional[Callable[[str], None]] = None, stop_event: Optional[threading.Event] = None, ) -> RunSummary: """Traite les documents de ``input_path``. Synchrone (lancer dans un thread pour l'UI). Lève ``RuntimeError`` si un run est déjà en cours (anti double-lancement). """ with self._lock: if self._running: raise RuntimeError("Un traitement est déjà en cours.") self._running = True try: return self._run_impl(input_path, output_dir, on_progress, on_log, stop_event) finally: with self._lock: self._running = False def _run_impl(self, input_path, output_dir, on_progress, on_log, stop_event) -> RunSummary: input_path = Path(input_path) docs = self.discover(input_path) out_root = Path(output_dir) if output_dir else default_output_dir(input_path) root_dir = input_path if input_path.is_dir() else input_path.parent summary = RunSummary(total=len(docs)) def log(message: str) -> None: if on_log: on_log(message) if not docs: log("Aucun document supporté détecté.") return summary for index, doc in enumerate(docs, start=1): if stop_event is not None and stop_event.is_set(): summary.stopped = True log("Arrêt demandé — traitement interrompu entre deux documents.") break if on_progress: on_progress(index - 1, summary.total, doc.name) try: if input_path.is_dir(): doc_out = build_batch_output_dir(root_dir, out_root, doc) else: doc_out = out_root doc_out.mkdir(parents=True, exist_ok=True) self._process_fn(doc, doc_out) summary.succeeded += 1 log(f"OK : {doc.name}") except Exception as exc: # un échec n'interrompt pas le lot summary.failed += 1 summary.errors.append((doc.name, str(exc))) log(f"ÉCHEC : {doc.name} — {exc}") if on_progress: on_progress(index, summary.total, doc.name) return summary