"""Runner de traitement pour la GUI V6 (testable sans display ni moteur lourd). Le runner orchestre l'anonymisation document par document via une fonction de traitement **injectable** : - en production, le défaut appelle ``process_document`` du moteur (import paresseux, aucun manager NER chargé à l'import de ce module) ; - en test, on injecte une fausse fonction — aucun appel réseau, aucun modèle. Il ne contient aucune logique de détection : il découvre les documents, construit les dossiers de sortie comme la V5 (``anonymise/`` sous la source, arborescence préservée), exécute le traitement, et expose progression / journal / résumé / arrêt coopératif (entre deux documents). """ from __future__ import annotations import threading import time from dataclasses import dataclass, field from pathlib import Path from typing import Callable, Optional, Sequence from gui_batch_paths import build_batch_output_dir, list_supported_documents from gui_v6.usage_telemetry import page_count_for # process_fn(doc_path, out_dir) -> dict de sortie (ignoré par le runner). ProcessFn = Callable[[Path, Path], dict] # Repli si format_converter indisponible à l'exécution (ne sert qu'au listing). _FALLBACK_EXTENSIONS = ( ".pdf", ".docx", ".odt", ".rtf", ".txt", ".html", ".htm", ".jpg", ".jpeg", ".png", ".tiff", ".tif", ".bmp", ) def supported_extensions() -> tuple[str, ...]: """Extensions supportées : depuis ``format_converter`` si dispo, sinon repli.""" try: from format_converter import SUPPORTED_EXTENSIONS return tuple(sorted(SUPPORTED_EXTENSIONS)) except Exception: return _FALLBACK_EXTENSIONS def default_output_dir(input_path) -> Path: """Dossier de sortie par défaut : ``anonymise/`` sous la source.""" path = Path(input_path) base = path if path.is_dir() else path.parent return base / "anonymise" def _delivered_pdf_paths(result: object) -> list[Path]: """Retourne les PDF effectivement produits par le moteur. Le moteur retourne toujours des clés ``pdf_*`` pour une sortie livrable. Les tests unitaires historiques injectent souvent ``{}`` comme succès factice ; on ne les assimile donc pas à un échec ici. """ if not isinstance(result, dict): return [] paths: list[Path] = [] for key, value in result.items(): if not str(key).startswith("pdf") or not isinstance(value, (str, Path)): continue path = Path(value) if path.exists() and path.is_file(): paths.append(path) return paths def _engine_result_error(result: object) -> str | None: """Traduit un retour moteur non livrable en erreur visible GUI.""" if not isinstance(result, dict): return None if result.get("status") == "quarantined": reason = result.get("reason") or "document mis en quarantaine" return f"Document mis en quarantaine : {reason}" has_real_engine_outputs = ( "text" in result or "audit" in result or any(str(key).startswith("pdf") for key in result) ) if has_real_engine_outputs and not _delivered_pdf_paths(result): return "Aucune sortie PDF anonymisée produite." return None def discover_documents(input_path, extensions: Optional[Sequence[str]] = None) -> list[Path]: """Liste les documents à traiter (fichier unique ou dossier récursif).""" path = Path(input_path) exts = tuple(extensions) if extensions is not None else supported_extensions() normalized = {e.lower() for e in exts} if path.is_file(): return [path] if path.suffix.lower() in normalized else [] if path.is_dir(): return list_supported_documents(path, exts) return [] class OutputNotWritableError(RuntimeError): """Le dossier de sortie n'est pas inscriptible (échec amont, message clair).""" @dataclass class DocResult: """Détail anonymisé d'un document traité (pour la télémétrie d'usage). RGPD : aucun nom ni chemin de fichier — uniquement des métadonnées. """ ordinal: int page_count: Optional[int] status: str # "success" | "failed" duration_ms: Optional[int] extension: Optional[str] @dataclass class RunSummary: """Résultat d'un run : compteurs et erreurs par document.""" total: int = 0 succeeded: int = 0 failed: int = 0 stopped: bool = False errors: list = field(default_factory=list) # list[tuple[str, str]] (nom, message) documents: list = field(default_factory=list) # list[DocResult] (anonymisé) @property def ok(self) -> bool: return self.failed == 0 and not self.stopped def _default_process_fn(doc_path: Path, out_dir: Path) -> dict: # Passe par make_process_fn pour bénéficier du fail-close P0-1 (refus si le # NER obligatoire est indisponible), même sur ce chemin de repli. from gui_v6.engine_bridge import EngineSettings, make_process_fn return make_process_fn(EngineSettings())(doc_path, out_dir) class ProcessingRunner: """Exécute le traitement document par document, arrêt coopératif inclus.""" def __init__( self, process_fn: Optional[ProcessFn] = None, extensions: Optional[Sequence[str]] = None, ) -> None: self._process_fn = process_fn or _default_process_fn self._extensions = tuple(extensions) if extensions is not None else None self._lock = threading.Lock() self._running = False @property def is_running(self) -> bool: return self._running def discover(self, input_path) -> list[Path]: return discover_documents(input_path, self._extensions) def run( self, input_path, output_dir=None, *, on_progress: Optional[Callable[[int, int, str], None]] = None, on_log: Optional[Callable[[str], None]] = None, stop_event: Optional[threading.Event] = None, ) -> RunSummary: """Traite les documents de ``input_path``. Synchrone (lancer dans un thread pour l'UI). Lève ``RuntimeError`` si un run est déjà en cours (anti double-lancement). """ with self._lock: if self._running: raise RuntimeError("Un traitement est déjà en cours.") self._running = True try: return self._run_impl(input_path, output_dir, on_progress, on_log, stop_event) finally: with self._lock: self._running = False def _run_impl(self, input_path, output_dir, on_progress, on_log, stop_event) -> RunSummary: input_path = Path(input_path) docs = self.discover(input_path) out_root = Path(output_dir) if output_dir else default_output_dir(input_path) root_dir = input_path if input_path.is_dir() else input_path.parent summary = RunSummary(total=len(docs)) def log(message: str) -> None: if on_log: on_log(message) if not docs: log("Aucun document supporté détecté.") return summary # Sonde amont : on vérifie une seule fois que le dossier de sortie est # inscriptible AVANT la boucle, pour un échec clair et unique (P1-6) # plutôt qu'une erreur cryptique répétée à chaque document. try: out_root.mkdir(parents=True, exist_ok=True) probe = out_root / ".anon_write_test" probe.write_text("", encoding="utf-8") probe.unlink() except Exception as exc: raise OutputNotWritableError( f"Dossier de sortie non inscriptible : {out_root} ({exc})" ) from exc for index, doc in enumerate(docs, start=1): if stop_event is not None and stop_event.is_set(): summary.stopped = True log("Arrêt demandé — traitement interrompu entre deux documents.") break if on_progress: on_progress(index - 1, summary.total, doc.name) # Détails anonymisés pour la télémétrie (jamais le nom/chemin). extension = doc.suffix.lstrip(".").lower() or None page_count = page_count_for(doc) started = time.monotonic() status = "success" try: if input_path.is_dir(): doc_out = build_batch_output_dir(root_dir, out_root, doc) else: doc_out = out_root doc_out.mkdir(parents=True, exist_ok=True) result = self._process_fn(doc, doc_out) result_error = _engine_result_error(result) if result_error is not None: raise RuntimeError(result_error) summary.succeeded += 1 log(f"OK : {doc.name}") except Exception as exc: # un échec n'interrompt pas le lot status = "failed" summary.failed += 1 summary.errors.append((doc.name, str(exc))) log(f"ÉCHEC : {doc.name} — {exc}") summary.documents.append( DocResult( ordinal=index - 1, page_count=page_count, status=status, duration_ms=int((time.monotonic() - started) * 1000), extension=extension, ) ) if on_progress: on_progress(index, summary.total, doc.name) return summary