Files
anonymisation/gui_v6/processing_runner.py
Domi31tls 8d683bc6d8 feat(ocr): migrer l'OCR de docTR (PyTorch) vers OnnxTR (ONNX Runtime)
OnnxTR exécute les MÊMES modèles que docTR (db_resnet50 + crnn_vgg16_bn) sur
ONNX Runtime, sans PyTorch. Corrige le crash torch/oneDNN « could not create a
primitive » sur CPU contraint (VM 2 cœurs collaborateur : OCR scan impossible →
quarantaine). Qualité identique validée empiriquement (CER 0,10-0,23 % vs docTR,
2 validations indépendantes Claude+Qwen), OCR ~2-3× plus rapide CPU.

- core : import OnnxTR, _get_ocr_model(), _OCR_AVAILABLE, boucle OCR inchangée
  (API miroir) ; ONNXTR_CACHE_DIR pour le frozen ; bandeau de logs ENV au démarrage
  (OS, CPU+AVX, cœurs, RAM, versions, providers) pour retours terrain auto-suffisants.
- 3 .spec : embarquent les poids ONNX OnnxTR (fail-closed) + hiddenimports onnxtr.
- requirements : onnxtr[cpu] (python-doctr conservé transitoirement).
- inclut le correctif quarantaine-visible du runner (GO Qwen).

Tests : test_ocr_onnxtr.py (RED→GREEN), 95 unit passed, e2e scan client OK
(OCR 5/5, PDF produit, plus de crash). Retrait torch du frozen + rebuild Windows
= étapes suivantes (gates Dom).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-19 17:07:00 +02:00

238 lines
8.5 KiB
Python

"""Runner de traitement pour la GUI V6 (testable sans display ni moteur lourd).
Le runner orchestre l'anonymisation document par document via une fonction de
traitement **injectable** :
- en production, le défaut appelle ``process_document`` du moteur (import paresseux,
aucun manager NER chargé à l'import de ce module) ;
- en test, on injecte une fausse fonction — aucun appel réseau, aucun modèle.
Il ne contient aucune logique de détection : il découvre les documents, construit
les dossiers de sortie comme la V5 (``anonymise/`` sous la source, arborescence
préservée), exécute le traitement, et expose progression / journal / résumé /
arrêt coopératif (entre deux documents).
"""
from __future__ import annotations
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional, Sequence
from gui_batch_paths import build_batch_output_dir, list_supported_documents
from gui_v6.usage_telemetry import page_count_for
# process_fn(doc_path, out_dir) -> dict de sortie (ignoré par le runner).
ProcessFn = Callable[[Path, Path], dict]
# Repli si format_converter indisponible à l'exécution (ne sert qu'au listing).
_FALLBACK_EXTENSIONS = (
".pdf", ".docx", ".odt", ".rtf", ".txt", ".html", ".htm",
".jpg", ".jpeg", ".png", ".tiff", ".tif", ".bmp",
)
def supported_extensions() -> tuple[str, ...]:
"""Extensions supportées : depuis ``format_converter`` si dispo, sinon repli."""
try:
from format_converter import SUPPORTED_EXTENSIONS
return tuple(sorted(SUPPORTED_EXTENSIONS))
except Exception:
return _FALLBACK_EXTENSIONS
def default_output_dir(input_path) -> Path:
"""Dossier de sortie par défaut : ``anonymise/`` sous la source."""
path = Path(input_path)
base = path if path.is_dir() else path.parent
return base / "anonymise"
def _delivered_pdf_paths(result: object) -> list[Path]:
"""Retourne les PDF effectivement produits par le moteur.
Le moteur retourne toujours des clés ``pdf_*`` pour une sortie livrable.
Les tests unitaires historiques injectent souvent ``{}`` comme succès factice ;
on ne les assimile donc pas à un échec ici.
"""
if not isinstance(result, dict):
return []
paths: list[Path] = []
for key, value in result.items():
if not str(key).startswith("pdf") or not isinstance(value, (str, Path)):
continue
path = Path(value)
if path.exists() and path.is_file():
paths.append(path)
return paths
def _engine_result_error(result: object) -> str | None:
"""Traduit un retour moteur non livrable en erreur visible GUI."""
if not isinstance(result, dict):
return None
if result.get("status") == "quarantined":
reason = result.get("reason") or "document mis en quarantaine"
return f"Document mis en quarantaine : {reason}"
has_real_engine_outputs = (
"text" in result
or "audit" in result
or any(str(key).startswith("pdf") for key in result)
)
if has_real_engine_outputs and not _delivered_pdf_paths(result):
return "Aucune sortie PDF anonymisée produite."
return None
def discover_documents(input_path, extensions: Optional[Sequence[str]] = None) -> list[Path]:
"""Liste les documents à traiter (fichier unique ou dossier récursif)."""
path = Path(input_path)
exts = tuple(extensions) if extensions is not None else supported_extensions()
normalized = {e.lower() for e in exts}
if path.is_file():
return [path] if path.suffix.lower() in normalized else []
if path.is_dir():
return list_supported_documents(path, exts)
return []
@dataclass
class DocResult:
"""Détail anonymisé d'un document traité (pour la télémétrie d'usage).
RGPD : aucun nom ni chemin de fichier — uniquement des métadonnées.
"""
ordinal: int
page_count: Optional[int]
status: str # "success" | "failed"
duration_ms: Optional[int]
extension: Optional[str]
@dataclass
class RunSummary:
"""Résultat d'un run : compteurs et erreurs par document."""
total: int = 0
succeeded: int = 0
failed: int = 0
stopped: bool = False
errors: list = field(default_factory=list) # list[tuple[str, str]] (nom, message)
documents: list = field(default_factory=list) # list[DocResult] (anonymisé)
@property
def ok(self) -> bool:
return self.failed == 0 and not self.stopped
def _default_process_fn(doc_path: Path, out_dir: Path) -> dict:
# Import paresseux : aucun manager NER chargé à l'import du runner.
from anonymizer_core_refactored_onnx import process_document
return process_document(doc_path, out_dir)
class ProcessingRunner:
"""Exécute le traitement document par document, arrêt coopératif inclus."""
def __init__(
self,
process_fn: Optional[ProcessFn] = None,
extensions: Optional[Sequence[str]] = None,
) -> None:
self._process_fn = process_fn or _default_process_fn
self._extensions = tuple(extensions) if extensions is not None else None
self._lock = threading.Lock()
self._running = False
@property
def is_running(self) -> bool:
return self._running
def discover(self, input_path) -> list[Path]:
return discover_documents(input_path, self._extensions)
def run(
self,
input_path,
output_dir=None,
*,
on_progress: Optional[Callable[[int, int, str], None]] = None,
on_log: Optional[Callable[[str], None]] = None,
stop_event: Optional[threading.Event] = None,
) -> RunSummary:
"""Traite les documents de ``input_path``. Synchrone (lancer dans un thread pour l'UI).
Lève ``RuntimeError`` si un run est déjà en cours (anti double-lancement).
"""
with self._lock:
if self._running:
raise RuntimeError("Un traitement est déjà en cours.")
self._running = True
try:
return self._run_impl(input_path, output_dir, on_progress, on_log, stop_event)
finally:
with self._lock:
self._running = False
def _run_impl(self, input_path, output_dir, on_progress, on_log, stop_event) -> RunSummary:
input_path = Path(input_path)
docs = self.discover(input_path)
out_root = Path(output_dir) if output_dir else default_output_dir(input_path)
root_dir = input_path if input_path.is_dir() else input_path.parent
summary = RunSummary(total=len(docs))
def log(message: str) -> None:
if on_log:
on_log(message)
if not docs:
log("Aucun document supporté détecté.")
return summary
for index, doc in enumerate(docs, start=1):
if stop_event is not None and stop_event.is_set():
summary.stopped = True
log("Arrêt demandé — traitement interrompu entre deux documents.")
break
if on_progress:
on_progress(index - 1, summary.total, doc.name)
# Détails anonymisés pour la télémétrie (jamais le nom/chemin).
extension = doc.suffix.lstrip(".").lower() or None
page_count = page_count_for(doc)
started = time.monotonic()
status = "success"
try:
if input_path.is_dir():
doc_out = build_batch_output_dir(root_dir, out_root, doc)
else:
doc_out = out_root
doc_out.mkdir(parents=True, exist_ok=True)
result = self._process_fn(doc, doc_out)
result_error = _engine_result_error(result)
if result_error is not None:
raise RuntimeError(result_error)
summary.succeeded += 1
log(f"OK : {doc.name}")
except Exception as exc: # un échec n'interrompt pas le lot
status = "failed"
summary.failed += 1
summary.errors.append((doc.name, str(exc)))
log(f"ÉCHEC : {doc.name}{exc}")
summary.documents.append(
DocResult(
ordinal=index - 1,
page_count=page_count,
status=status,
duration_ms=int((time.monotonic() - started) * 1000),
extension=extension,
)
)
if on_progress:
on_progress(index, summary.total, doc.name)
return summary