256 lines
9.3 KiB
Python
256 lines
9.3 KiB
Python
"""Runner de traitement pour la GUI V6 (testable sans display ni moteur lourd).
|
|
|
|
Le runner orchestre l'anonymisation document par document via une fonction de
|
|
traitement **injectable** :
|
|
|
|
- en production, le défaut appelle ``process_document`` du moteur (import paresseux,
|
|
aucun manager NER chargé à l'import de ce module) ;
|
|
- en test, on injecte une fausse fonction — aucun appel réseau, aucun modèle.
|
|
|
|
Il ne contient aucune logique de détection : il découvre les documents, construit
|
|
les dossiers de sortie comme la V5 (``anonymise/`` sous la source, arborescence
|
|
préservée), exécute le traitement, et expose progression / journal / résumé /
|
|
arrêt coopératif (entre deux documents).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Callable, Optional, Sequence
|
|
|
|
from gui_batch_paths import build_batch_output_dir, list_supported_documents
|
|
from gui_v6.usage_telemetry import page_count_for
|
|
|
|
# process_fn(doc_path, out_dir) -> dict de sortie (ignoré par le runner).
|
|
ProcessFn = Callable[[Path, Path], dict]
|
|
|
|
# Repli si format_converter indisponible à l'exécution (ne sert qu'au listing).
|
|
_FALLBACK_EXTENSIONS = (
|
|
".pdf", ".docx", ".odt", ".rtf", ".txt", ".html", ".htm",
|
|
".jpg", ".jpeg", ".png", ".tiff", ".tif", ".bmp",
|
|
)
|
|
|
|
|
|
def supported_extensions() -> tuple[str, ...]:
|
|
"""Extensions supportées : depuis ``format_converter`` si dispo, sinon repli."""
|
|
try:
|
|
from format_converter import SUPPORTED_EXTENSIONS
|
|
|
|
return tuple(sorted(SUPPORTED_EXTENSIONS))
|
|
except Exception:
|
|
return _FALLBACK_EXTENSIONS
|
|
|
|
|
|
def default_output_dir(input_path) -> Path:
|
|
"""Dossier de sortie par défaut : ``anonymise/`` sous la source."""
|
|
path = Path(input_path)
|
|
base = path if path.is_dir() else path.parent
|
|
return base / "anonymise"
|
|
|
|
|
|
def _delivered_pdf_paths(result: object) -> list[Path]:
|
|
"""Retourne les PDF effectivement produits par le moteur.
|
|
|
|
Le moteur retourne toujours des clés ``pdf_*`` pour une sortie livrable.
|
|
Les tests unitaires historiques injectent souvent ``{}`` comme succès factice ;
|
|
on ne les assimile donc pas à un échec ici.
|
|
"""
|
|
if not isinstance(result, dict):
|
|
return []
|
|
paths: list[Path] = []
|
|
for key, value in result.items():
|
|
if not str(key).startswith("pdf") or not isinstance(value, (str, Path)):
|
|
continue
|
|
path = Path(value)
|
|
if path.exists() and path.is_file():
|
|
paths.append(path)
|
|
return paths
|
|
|
|
|
|
def _engine_result_error(result: object) -> str | None:
|
|
"""Traduit un retour moteur non livrable en erreur visible GUI."""
|
|
if not isinstance(result, dict):
|
|
return None
|
|
if result.get("status") == "quarantined":
|
|
reason = result.get("reason") or "document mis en quarantaine"
|
|
return f"Document mis en quarantaine : {reason}"
|
|
has_real_engine_outputs = (
|
|
"text" in result
|
|
or "audit" in result
|
|
or any(str(key).startswith("pdf") for key in result)
|
|
)
|
|
if has_real_engine_outputs and not _delivered_pdf_paths(result):
|
|
return "Aucune sortie PDF anonymisée produite."
|
|
return None
|
|
|
|
|
|
def discover_documents(input_path, extensions: Optional[Sequence[str]] = None) -> list[Path]:
|
|
"""Liste les documents à traiter (fichier unique ou dossier récursif)."""
|
|
path = Path(input_path)
|
|
exts = tuple(extensions) if extensions is not None else supported_extensions()
|
|
normalized = {e.lower() for e in exts}
|
|
if path.is_file():
|
|
return [path] if path.suffix.lower() in normalized else []
|
|
if path.is_dir():
|
|
return list_supported_documents(path, exts)
|
|
return []
|
|
|
|
|
|
class OutputNotWritableError(RuntimeError):
|
|
"""Le dossier de sortie n'est pas inscriptible (échec amont, message clair)."""
|
|
|
|
|
|
@dataclass
|
|
class DocResult:
|
|
"""Détail anonymisé d'un document traité (pour la télémétrie d'usage).
|
|
|
|
RGPD : aucun nom ni chemin de fichier — uniquement des métadonnées.
|
|
"""
|
|
|
|
ordinal: int
|
|
page_count: Optional[int]
|
|
status: str # "success" | "failed"
|
|
duration_ms: Optional[int]
|
|
extension: Optional[str]
|
|
|
|
|
|
@dataclass
|
|
class RunSummary:
|
|
"""Résultat d'un run : compteurs et erreurs par document."""
|
|
|
|
total: int = 0
|
|
succeeded: int = 0
|
|
failed: int = 0
|
|
stopped: bool = False
|
|
errors: list = field(default_factory=list) # list[tuple[str, str]] (nom, message)
|
|
documents: list = field(default_factory=list) # list[DocResult] (anonymisé)
|
|
|
|
@property
|
|
def ok(self) -> bool:
|
|
return self.failed == 0 and not self.stopped
|
|
|
|
|
|
def _default_process_fn(doc_path: Path, out_dir: Path) -> dict:
|
|
# Passe par make_process_fn pour bénéficier du fail-close P0-1 (refus si le
|
|
# NER obligatoire est indisponible), même sur ce chemin de repli.
|
|
from gui_v6.engine_bridge import EngineSettings, make_process_fn
|
|
|
|
return make_process_fn(EngineSettings())(doc_path, out_dir)
|
|
|
|
|
|
class ProcessingRunner:
|
|
"""Exécute le traitement document par document, arrêt coopératif inclus."""
|
|
|
|
def __init__(
|
|
self,
|
|
process_fn: Optional[ProcessFn] = None,
|
|
extensions: Optional[Sequence[str]] = None,
|
|
) -> None:
|
|
self._process_fn = process_fn or _default_process_fn
|
|
self._extensions = tuple(extensions) if extensions is not None else None
|
|
self._lock = threading.Lock()
|
|
self._running = False
|
|
|
|
@property
|
|
def is_running(self) -> bool:
|
|
return self._running
|
|
|
|
def discover(self, input_path) -> list[Path]:
|
|
return discover_documents(input_path, self._extensions)
|
|
|
|
def run(
|
|
self,
|
|
input_path,
|
|
output_dir=None,
|
|
*,
|
|
on_progress: Optional[Callable[[int, int, str], None]] = None,
|
|
on_log: Optional[Callable[[str], None]] = None,
|
|
stop_event: Optional[threading.Event] = None,
|
|
) -> RunSummary:
|
|
"""Traite les documents de ``input_path``. Synchrone (lancer dans un thread pour l'UI).
|
|
|
|
Lève ``RuntimeError`` si un run est déjà en cours (anti double-lancement).
|
|
"""
|
|
with self._lock:
|
|
if self._running:
|
|
raise RuntimeError("Un traitement est déjà en cours.")
|
|
self._running = True
|
|
try:
|
|
return self._run_impl(input_path, output_dir, on_progress, on_log, stop_event)
|
|
finally:
|
|
with self._lock:
|
|
self._running = False
|
|
|
|
def _run_impl(self, input_path, output_dir, on_progress, on_log, stop_event) -> RunSummary:
|
|
input_path = Path(input_path)
|
|
docs = self.discover(input_path)
|
|
out_root = Path(output_dir) if output_dir else default_output_dir(input_path)
|
|
root_dir = input_path if input_path.is_dir() else input_path.parent
|
|
summary = RunSummary(total=len(docs))
|
|
|
|
def log(message: str) -> None:
|
|
if on_log:
|
|
on_log(message)
|
|
|
|
if not docs:
|
|
log("Aucun document supporté détecté.")
|
|
return summary
|
|
|
|
# Sonde amont : on vérifie une seule fois que le dossier de sortie est
|
|
# inscriptible AVANT la boucle, pour un échec clair et unique (P1-6)
|
|
# plutôt qu'une erreur cryptique répétée à chaque document.
|
|
try:
|
|
out_root.mkdir(parents=True, exist_ok=True)
|
|
probe = out_root / ".anon_write_test"
|
|
probe.write_text("", encoding="utf-8")
|
|
probe.unlink()
|
|
except Exception as exc:
|
|
raise OutputNotWritableError(
|
|
f"Dossier de sortie non inscriptible : {out_root} ({exc})"
|
|
) from exc
|
|
|
|
for index, doc in enumerate(docs, start=1):
|
|
if stop_event is not None and stop_event.is_set():
|
|
summary.stopped = True
|
|
log("Arrêt demandé — traitement interrompu entre deux documents.")
|
|
break
|
|
if on_progress:
|
|
on_progress(index - 1, summary.total, doc.name)
|
|
# Détails anonymisés pour la télémétrie (jamais le nom/chemin).
|
|
extension = doc.suffix.lstrip(".").lower() or None
|
|
page_count = page_count_for(doc)
|
|
started = time.monotonic()
|
|
status = "success"
|
|
try:
|
|
if input_path.is_dir():
|
|
doc_out = build_batch_output_dir(root_dir, out_root, doc)
|
|
else:
|
|
doc_out = out_root
|
|
doc_out.mkdir(parents=True, exist_ok=True)
|
|
result = self._process_fn(doc, doc_out)
|
|
result_error = _engine_result_error(result)
|
|
if result_error is not None:
|
|
raise RuntimeError(result_error)
|
|
summary.succeeded += 1
|
|
log(f"OK : {doc.name}")
|
|
except Exception as exc: # un échec n'interrompt pas le lot
|
|
status = "failed"
|
|
summary.failed += 1
|
|
summary.errors.append((doc.name, str(exc)))
|
|
log(f"ÉCHEC : {doc.name} — {exc}")
|
|
summary.documents.append(
|
|
DocResult(
|
|
ordinal=index - 1,
|
|
page_count=page_count,
|
|
status=status,
|
|
duration_ms=int((time.monotonic() - started) * 1000),
|
|
extension=extension,
|
|
)
|
|
)
|
|
if on_progress:
|
|
on_progress(index, summary.total, doc.name)
|
|
return summary
|