Le module usage_telemetry est maintenant réellement branché : la GUI V6 envoie les statistiques au portail après chaque run (les stats web restaient vides sans cela). - processing_runner : RunSummary porte une liste DocResult (ordinal, page_count via page_count_for, status, duration_ms, extension) — peuplée dans la boucle. Aucun nom/chemin de fichier. - usage_telemetry : report_run_summary(summary, base_url, license_ref, machine_id, session, ...) construit le payload depuis le RunSummary et l'envoie (non bloquant). N'envoie RIEN sans license_ref. Spool JSONL si échec réseau. - tab_usage : _finish() déclenche l'envoi en thread daemon (jamais bloquant pour l'UI ni le run). - app : fournit le reporter à UsageTab avec le contexte licence (base_url du LicenseClient, license_ref via local_status, machine_id, app_version). Tests : RunSummary.documents peuplé (0 chemin) ; report_run_summary (payload correct, réseau KO → spool sans crash, pas d'envoi sans licence) ; _finish appelle le reporter. 252 tests unit OK (0 régression), self-test OK. V5/moteur/app_aivanov intacts, 0 dépendance. Aucun build/push sans GO Dom. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
192 lines
5.6 KiB
Python
192 lines
5.6 KiB
Python
"""Tests du runner G2 : process_fn injectée, vrais fichiers tmp, aucun moteur réel."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from gui_v6.processing_runner import (
|
|
ProcessingRunner,
|
|
RunSummary,
|
|
default_output_dir,
|
|
discover_documents,
|
|
)
|
|
|
|
_EXTS = (".pdf", ".txt")
|
|
|
|
|
|
def _touch(path: Path) -> Path:
    """Create a small text file at *path* and return *path*.

    Missing parent directories are created first. The file content is the
    single character ``"x"`` written as UTF-8; an existing file is overwritten.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text("x", encoding="utf-8")
    return path
|
|
|
|
|
|
# -- découverte & chemins --------------------------------------------------
|
|
|
|
def test_discover_single_file(tmp_path):
    """A single file with a supported extension is returned as a one-item list."""
    f = _touch(tmp_path / "doc.pdf")
    assert discover_documents(f, _EXTS) == [f]
|
|
|
|
|
|
def test_discover_single_file_unsupported(tmp_path):
    """A single file whose extension is not in ``_EXTS`` yields an empty list."""
    f = _touch(tmp_path / "doc.xyz")
    assert discover_documents(f, _EXTS) == []
|
|
|
|
|
|
def test_discover_folder_sorted_and_skips_output(tmp_path):
    """Folder discovery returns names in sorted order and skips ``anonymise/``."""
    # Created out of order on purpose so the sort is actually exercised.
    _touch(tmp_path / "b.pdf")
    _touch(tmp_path / "a.pdf")
    _touch(tmp_path / "note.txt")
    _touch(tmp_path / "anonymise" / "already.pdf")  # output subtree, ignored
    found = discover_documents(tmp_path, _EXTS)
    names = [p.name for p in found]
    assert names == ["a.pdf", "b.pdf", "note.txt"]
|
|
|
|
|
|
def test_default_output_dir_file_and_dir(tmp_path):
    """The default output dir is ``<folder>/anonymise`` for a file or its folder."""
    f = _touch(tmp_path / "doc.pdf")
    assert default_output_dir(f) == tmp_path / "anonymise"
    assert default_output_dir(tmp_path) == tmp_path / "anonymise"
|
|
|
|
|
|
# -- exécution -------------------------------------------------------------
|
|
|
|
def test_run_processes_all_docs(tmp_path):
    """Running on a folder processes every document and reports full success."""
    _touch(tmp_path / "a.pdf")
    _touch(tmp_path / "b.pdf")
    calls = []
    # ``list.append`` returns None, so ``or {}`` makes the lambda return a dict.
    runner = ProcessingRunner(
        process_fn=lambda d, o: calls.append((d, o)) or {},
        extensions=_EXTS,
    )

    summary = runner.run(tmp_path)

    assert isinstance(summary, RunSummary)
    assert summary.total == 2
    assert summary.succeeded == 2
    assert summary.failed == 0
    assert summary.ok is True
    assert len(calls) == 2
    # The default output folder has been created.
    assert (tmp_path / "anonymise").is_dir()
|
|
|
|
|
|
def test_run_single_file_uses_output_dir(tmp_path):
    """An explicit ``output_dir`` is created and forwarded to ``process_fn``."""
    f = _touch(tmp_path / "doc.pdf")
    out = tmp_path / "sortie"
    seen = {}
    # ``dict.update`` returns None, so ``or {}`` makes the lambda return a dict.
    runner = ProcessingRunner(
        process_fn=lambda d, o: seen.update(doc=d, out=o) or {},
        extensions=_EXTS,
    )

    summary = runner.run(f, output_dir=out)

    assert summary.total == 1 and summary.succeeded == 1
    assert seen["doc"] == f
    assert seen["out"] == out
    assert out.is_dir()
|
|
|
|
|
|
def test_run_continues_after_failure(tmp_path):
    """One failing document does not abort the run; it is recorded in ``errors``."""
    _touch(tmp_path / "a.pdf")
    _touch(tmp_path / "boom.pdf")
    _touch(tmp_path / "c.pdf")

    def proc(doc, out):
        # Fail only for the document named "boom.pdf".
        if doc.name == "boom.pdf":
            raise RuntimeError("explosion")
        return {}

    runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
    summary = runner.run(tmp_path)

    assert summary.total == 3
    assert summary.succeeded == 2
    assert summary.failed == 1
    assert summary.ok is False
    # First error entry: index 0 holds the file name, index 1 the message text.
    assert summary.errors[0][0] == "boom.pdf"
    assert "explosion" in summary.errors[0][1]
|
|
|
|
|
|
def test_run_empty_folder(tmp_path):
    """An empty folder gives ``total == 0`` and logs an "Aucun document" message."""
    logs = []
    runner = ProcessingRunner(process_fn=lambda d, o: {}, extensions=_EXTS)
    summary = runner.run(tmp_path, on_log=logs.append)
    assert summary.total == 0
    assert any("Aucun document" in m for m in logs)
|
|
|
|
|
|
def test_stop_event_interrupts_between_docs(tmp_path):
    """Setting the stop event during a document stops the run before the next one."""
    for name in ("a.pdf", "b.pdf", "c.pdf"):
        _touch(tmp_path / name)
    stop = threading.Event()
    processed = []

    def proc(doc, out):
        processed.append(doc.name)
        stop.set()  # request the stop once the first document is being processed
        return {}

    runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
    summary = runner.run(tmp_path, stop_event=stop)

    assert summary.stopped is True
    assert summary.succeeded == 1
    assert len(processed) == 1  # the stop took effect between two documents
|
|
|
|
|
|
def test_progress_callbacks(tmp_path):
    """``on_progress`` is eventually called with ``done == total == 2``."""
    _touch(tmp_path / "a.pdf")
    _touch(tmp_path / "b.pdf")
    events = []
    runner = ProcessingRunner(process_fn=lambda d, o: {}, extensions=_EXTS)
    # Only (done, total) is recorded; the third callback argument is ignored.
    runner.run(
        tmp_path,
        on_progress=lambda done, total, name: events.append((done, total)),
    )
    assert (2, 2) in events  # final progress reached
|
|
|
|
|
|
def test_no_double_run(tmp_path):
    """A second ``run`` while one is in progress raises ``RuntimeError``."""
    _touch(tmp_path / "a.pdf")
    started = threading.Event()
    release = threading.Event()

    def proc(doc, out):
        # Signal that the run is in progress, then hold it open until released
        # (bounded by a timeout so the worker can never hang forever).
        started.set()
        release.wait(timeout=2)
        return {}

    runner = ProcessingRunner(process_fn=proc, extensions=_EXTS)
    worker = threading.Thread(target=lambda: runner.run(tmp_path))
    worker.start()
    try:
        assert started.wait(timeout=2)
        # While the first run is active, a second launch is refused.
        with pytest.raises(RuntimeError):
            runner.run(tmp_path)
    finally:
        # Always unblock and reap the worker, even if an assertion above
        # failed, so a failing test does not leave a thread waiting.
        release.set()
        worker.join(timeout=2)
    assert runner.is_running is False
|
|
|
|
|
|
# -- détails par document (télémétrie) -------------------------------------
|
|
|
|
def test_run_records_per_document_details(tmp_path):
    """``summary.documents`` has one entry per document and no file identity."""
    _touch(tmp_path / "a.pdf")
    _touch(tmp_path / "b.pdf")

    def fake(doc, out):
        # Fail only for "b.pdf" so both statuses appear in the summary.
        if doc.name == "b.pdf":
            raise RuntimeError("boom")
        return {}

    runner = ProcessingRunner(process_fn=fake, extensions=_EXTS)
    summary = runner.run(tmp_path)

    assert len(summary.documents) == 2
    statuses = {doc.ordinal: doc.status for doc in summary.documents}
    assert statuses == {0: "success", 1: "failed"}
    for doc in summary.documents:
        assert doc.extension == "pdf"
        assert isinstance(doc.duration_ms, int)
        # GDPR: no file name or path in the per-document details
        assert not hasattr(doc, "path")
        assert not hasattr(doc, "filename")
        assert not hasattr(doc, "name")
|