Files
anonymisation/tests/unit/test_gui_v6_processing_runner.py
Domi31tls 8d683bc6d8 feat(ocr): migrer l'OCR de docTR (PyTorch) vers OnnxTR (ONNX Runtime)
OnnxTR exécute les MÊMES modèles que docTR (db_resnet50 + crnn_vgg16_bn) sur
ONNX Runtime, sans PyTorch. Corrige le crash torch/oneDNN « could not create a
primitive » sur CPU contraint (VM 2 cœurs collaborateur : OCR scan impossible →
quarantaine). Qualité identique validée empiriquement (CER 0,10-0,23 % vs docTR,
2 validations indépendantes Claude+Qwen), OCR ~2-3× plus rapide CPU.

- core : import OnnxTR, _get_ocr_model(), _OCR_AVAILABLE, boucle OCR inchangée
  (API miroir) ; ONNXTR_CACHE_DIR pour le frozen ; bandeau de logs ENV au démarrage
  (OS, CPU+AVX, cœurs, RAM, versions, providers) pour retours terrain auto-suffisants.
- 3 .spec : embarquent les poids ONNX OnnxTR (fail-closed) + hiddenimports onnxtr.
- requirements : onnxtr[cpu] (python-doctr conservé transitoirement).
- inclut le correctif quarantaine-visible du runner (GO Qwen).

Tests : test_ocr_onnxtr.py (RED→GREEN), 95 unit passed, e2e scan client OK
(OCR 5/5, PDF produit, plus de crash). Retrait torch du frozen + rebuild Windows
= étapes suivantes (gates Dom).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-19 17:07:00 +02:00

247 lines
7.4 KiB
Python

"""Tests du runner G2 : process_fn injectée, vrais fichiers tmp, aucun moteur réel."""
from __future__ import annotations
import threading
from pathlib import Path
import pytest
from gui_v6.processing_runner import (
ProcessingRunner,
RunSummary,
default_output_dir,
discover_documents,
)
# Extensions handed to discover_documents() and ProcessingRunner(extensions=...) in this module.
_EXTS = (".pdf", ".txt")
def _touch(path: Path) -> Path:
    """Write a one-character placeholder file at ``path`` and return ``path``.

    Missing parent directories are created first.
    """
    parent_dir = path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    path.write_text("x", encoding="utf-8")
    return path
# -- découverte & chemins --------------------------------------------------
def test_discover_single_file(tmp_path):
    """Discovery on a single file with a listed extension returns just that file."""
    document = _touch(tmp_path / "doc.pdf")
    found = discover_documents(document, _EXTS)
    assert found == [document]
def test_discover_single_file_unsupported(tmp_path):
    """Discovery on a single file whose extension is not listed returns an empty list."""
    stray = _touch(tmp_path / "doc.xyz")
    found = discover_documents(stray, _EXTS)
    assert found == []
def test_discover_folder_sorted_and_skips_output(tmp_path):
    """Folder discovery lists a.pdf, b.pdf, note.txt in that order and skips ``anonymise/``."""
    # Created out of alphabetical order on purpose.
    for relative in ("b.pdf", "a.pdf", "note.txt"):
        _touch(tmp_path / relative)
    # Output subtree: expected to be ignored by discovery.
    _touch(tmp_path / "anonymise" / "already.pdf")

    discovered = discover_documents(tmp_path, _EXTS)

    assert [entry.name for entry in discovered] == ["a.pdf", "b.pdf", "note.txt"]
def test_default_output_dir_file_and_dir(tmp_path):
    """Both a file input and its folder resolve to the same ``anonymise`` directory."""
    expected = tmp_path / "anonymise"
    document = _touch(tmp_path / "doc.pdf")

    assert default_output_dir(document) == expected
    assert default_output_dir(tmp_path) == expected
# -- exécution -------------------------------------------------------------
def test_run_processes_all_docs(tmp_path):
    """Running on a folder with two documents calls process_fn twice and reports full success."""
    for name in ("a.pdf", "b.pdf"):
        _touch(tmp_path / name)
    calls = []

    def record(doc, out):
        # Remember each invocation and return an empty result.
        calls.append((doc, out))
        return {}

    runner = ProcessingRunner(process_fn=record, extensions=_EXTS)
    summary = runner.run(tmp_path)

    assert isinstance(summary, RunSummary)
    assert summary.total == 2
    assert summary.succeeded == 2
    assert summary.failed == 0
    assert summary.ok is True
    assert len(calls) == 2
    # The default output folder has been created.
    assert (tmp_path / "anonymise").is_dir()
def test_run_single_file_uses_output_dir(tmp_path):
    """An explicit ``output_dir`` is passed to process_fn and exists after the run."""
    document = _touch(tmp_path / "doc.pdf")
    target_dir = tmp_path / "sortie"
    seen = {}

    def capture(doc, out_dir):
        # Keep the arguments the runner handed over for later inspection.
        seen.update(doc=doc, out=out_dir)
        return {}

    runner = ProcessingRunner(process_fn=capture, extensions=_EXTS)
    summary = runner.run(document, output_dir=target_dir)

    assert summary.total == 1
    assert summary.succeeded == 1
    assert seen["doc"] == document
    assert seen["out"] == target_dir
    assert target_dir.is_dir()
def test_run_continues_after_failure(tmp_path):
    """An exception on one document is recorded and the other documents are still processed."""
    for name in ("a.pdf", "boom.pdf", "c.pdf"):
        _touch(tmp_path / name)

    def proc(doc, out):
        # Only the "boom" document blows up; the others succeed with an empty result.
        if doc.name != "boom.pdf":
            return {}
        raise RuntimeError("explosion")

    runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
    summary = runner.run(tmp_path)

    assert summary.total == 3
    assert summary.succeeded == 2
    assert summary.failed == 1
    assert summary.ok is False
    first_error = summary.errors[0]
    assert first_error[0] == "boom.pdf"
    assert "explosion" in first_error[1]
def test_run_marks_quarantined_engine_result_as_failure(tmp_path):
    """A ``quarantined`` result from process_fn counts as a failure and is logged as such."""
    scan = _touch(tmp_path / "scan.pdf")
    logs = []

    def proc(doc, out):
        # Result shape returned to the runner for a quarantined document.
        return {"status": "quarantined", "reason": "preflight_text_too_short"}

    runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
    summary = runner.run(scan, on_log=logs.append)

    assert summary.succeeded == 0
    assert summary.failed == 1
    assert summary.ok is False
    assert summary.documents[0].status == "failed"
    assert "preflight_text_too_short" in summary.errors[0][1]
    assert any("ÉCHEC : scan.pdf" in line for line in logs)
def test_run_marks_missing_pdf_output_as_failure(tmp_path):
    """A result that lists only text and audit outputs (no PDF) is reported as a failure."""
    document = _touch(tmp_path / "doc.pdf")
    target_dir = tmp_path / "sortie"

    def proc(doc, out_dir):
        # Produce the text and audit artefacts, but deliberately no PDF.
        text_file = out_dir / "doc.pseudonymise.txt"
        audit_file = out_dir / "doc.audit.jsonl"
        text_file.write_text("ok", encoding="utf-8")
        audit_file.write_text("{}", encoding="utf-8")
        return {"text": str(text_file), "audit": str(audit_file)}

    runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
    summary = runner.run(document, output_dir=target_dir)

    assert summary.succeeded == 0
    assert summary.failed == 1
    assert summary.documents[0].status == "failed"
    assert "Aucune sortie PDF" in summary.errors[0][1]
def test_run_accepts_existing_pdf_output(tmp_path):
    """A result pointing at a PDF file that exists on disk is reported as a success."""
    document = _touch(tmp_path / "doc.pdf")
    target_dir = tmp_path / "sortie"

    def proc(doc, out_dir):
        # Write a minimal PDF header so the referenced output really exists.
        raster_pdf = out_dir / "doc.redacted_raster.pdf"
        raster_pdf.write_bytes(b"%PDF-1.4\n")
        return {"pdf_raster": str(raster_pdf)}

    runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
    summary = runner.run(document, output_dir=target_dir)

    assert summary.succeeded == 1
    assert summary.failed == 0
    assert summary.documents[0].status == "success"
def test_run_empty_folder(tmp_path):
    """Running on an empty folder yields a zero total and an "Aucun document" log entry."""
    logs = []

    def noop(doc, out):
        return {}

    runner = ProcessingRunner(process_fn=noop, extensions=_EXTS)
    summary = runner.run(tmp_path, on_log=logs.append)

    assert summary.total == 0
    assert any("Aucun document" in message for message in logs)
def test_stop_event_interrupts_between_docs(tmp_path):
    """Setting the stop event while the first document is processed ends the run after it."""
    for name in ("a.pdf", "b.pdf", "c.pdf"):
        _touch(tmp_path / name)
    stop = threading.Event()
    handled = []

    def proc(doc, out):
        handled.append(doc.name)
        # Request a stop right after the first document.
        stop.set()
        return {}

    runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
    summary = runner.run(tmp_path, stop_event=stop)

    assert summary.stopped is True
    assert summary.succeeded == 1
    # The stop took effect between two documents.
    assert len(handled) == 1
def test_progress_callbacks(tmp_path):
    """The progress callback eventually reports (2, 2) for a two-document run."""
    for filename in ("a.pdf", "b.pdf"):
        _touch(tmp_path / filename)
    events = []

    def noop(doc, out):
        return {}

    def record_progress(done, total, name):
        # Only the counters matter here; the document name is ignored.
        events.append((done, total))

    runner = ProcessingRunner(process_fn=noop, extensions=_EXTS)
    runner.run(tmp_path, on_progress=record_progress)

    # Final progress step reached.
    assert (2, 2) in events
def test_no_double_run(tmp_path):
    """A second ``run()`` during an active run raises RuntimeError; ``is_running`` clears afterwards.

    The first run executes in a worker thread whose process_fn blocks on
    ``release`` (for at most two seconds), which keeps the runner busy while
    the main thread attempts the second launch.
    """
    _touch(tmp_path / "a.pdf")
    started = threading.Event()
    release = threading.Event()

    def proc(doc, out):
        # Signal that the run is in progress, then hold it until released.
        started.set()
        release.wait(timeout=2)
        return {}

    runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
    worker = threading.Thread(target=runner.run, args=(tmp_path,))
    worker.start()
    try:
        assert started.wait(timeout=2)
        # While the first run is active, a second launch is refused.
        with pytest.raises(RuntimeError):
            runner.run(tmp_path)
    finally:
        # Always unblock and reap the worker, even when an assertion above fails,
        # so no thread is left running into the following tests.
        release.set()
        worker.join(timeout=2)

    # join() returns silently on timeout; make a stuck worker an explicit failure.
    assert not worker.is_alive()
    assert runner.is_running is False
# -- détails par document (télémétrie) -------------------------------------
def test_run_records_per_document_details(tmp_path):
    """Each processed document gets a detail record with ordinal, status, extension and duration."""
    for filename in ("a.pdf", "b.pdf"):
        _touch(tmp_path / filename)

    def fake(doc, out):
        # The second document fails; the first succeeds with an empty result.
        if doc.name != "b.pdf":
            return {}
        raise RuntimeError("boom")

    runner = ProcessingRunner(process_fn=fake, extensions=_EXTS)
    summary = runner.run(tmp_path)

    assert len(summary.documents) == 2
    statuses = {entry.ordinal: entry.status for entry in summary.documents}
    assert statuses == {0: "success", 1: "failed"}
    for entry in summary.documents:
        assert entry.extension == "pdf"
        assert isinstance(entry.duration_ms, int)
        # GDPR: no file name or path may appear in the per-document details.
        for forbidden in ("path", "filename", "name"):
            assert not hasattr(entry, forbidden)