Compare commits
5 Commits
675e328d8c
...
8f9107a27f
| Author | SHA1 | Date | |
|---|---|---|---|
| 8f9107a27f | |||
| 8eb8cf9999 | |||
| 4b7a31b9df | |||
| 4412512d4b | |||
| 952a1c6ca0 |
@@ -193,6 +193,7 @@ class AnonymisationApp(ctk.CTk):
|
||||
on_theme_change=self.set_theme,
|
||||
current_theme=self._theme_name,
|
||||
usage_reporter=self._report_usage,
|
||||
diag_reporter=self._report_diagnostics,
|
||||
)
|
||||
if key == "cfg":
|
||||
return ConfigTab(self._content, palette=p, state=self._config, config_path=self._user_config_path)
|
||||
@@ -243,6 +244,36 @@ class AnonymisationApp(ctk.CTk):
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _report_diagnostics(self, summary) -> None:
|
||||
"""Envoie les diagnostics en fin de run (non bloquant, best-effort).
|
||||
|
||||
N'envoie rien si aucune licence locale valide. Ne lève jamais.
|
||||
"""
|
||||
try:
|
||||
from gui_v6 import __version__ as gui_version
|
||||
from gui_v6 import diagnostics
|
||||
from gui_v6.logging_setup import log_file_path
|
||||
from gui_v6.machine_id import default_machine_id
|
||||
|
||||
session = self._usage_session()
|
||||
if session is None:
|
||||
return
|
||||
status = self._safe_local_status()
|
||||
base_url = getattr(self._license_client, "_base_url", "") or resolve_portal_url()
|
||||
spool = log_file_path().parent / "diagnostics_spool.jsonl"
|
||||
diagnostics.report_run_diagnostics(
|
||||
summary,
|
||||
base_url=base_url,
|
||||
license_ref=getattr(status, "license_ref", None),
|
||||
machine_id=default_machine_id(),
|
||||
session=session,
|
||||
app_name="gui_v6",
|
||||
app_version=gui_version,
|
||||
spool_path=spool,
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def _show(self, key: str) -> None:
|
||||
self._active = key
|
||||
self._refresh_tabbar()
|
||||
|
||||
190
gui_v6/diagnostics.py
Normal file
190
gui_v6/diagnostics.py
Normal file
@@ -0,0 +1,190 @@
|
||||
"""Diagnostics structurés de la GUI V6 (E2/E3) — RGPD strict.
|
||||
|
||||
On n'émet QUE des métadonnées techniques liste-blanche : type d'exception
|
||||
(nom de classe), catégorie d'erreur d'un ensemble fermé, statut, ordinal,
|
||||
durée. JAMAIS de nom/chemin/texte de document, ni de message d'exception brut.
|
||||
L'envoi est non bloquant : un échec réseau n'interrompt jamais le traitement.
|
||||
Patron : gui_v6/usage_telemetry.py (télémétrie d'usage).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from typing import Any, Callable, Iterable, Optional
|
||||
|
||||
# Clés autorisées par item de diagnostic (filtre RGPD appliqué à la construction).
|
||||
_ALLOWED_ITEM_KEYS = {"ordinal", "status", "error_type", "error_code", "duration_ms"}
|
||||
|
||||
REPORT_PATH = "/api/v1/diagnostics/report"
|
||||
|
||||
|
||||
def new_run_id() -> str:
|
||||
return uuid.uuid4().hex
|
||||
|
||||
|
||||
def items_from_summary(summary: Any) -> list[dict]:
|
||||
"""Extrait les items de diagnostic (RGPD-safe) d'un ``RunSummary``.
|
||||
|
||||
Ne lit que les attributs autorisés ; aucun nom/chemin/message n'est lu.
|
||||
"""
|
||||
items: list[dict] = []
|
||||
for item in getattr(summary, "documents", None) or []:
|
||||
items.append(
|
||||
{
|
||||
"ordinal": getattr(item, "ordinal", 0),
|
||||
"status": getattr(item, "status", "success"),
|
||||
"error_type": getattr(item, "error_type", None),
|
||||
"error_code": getattr(item, "error_code", None),
|
||||
"duration_ms": getattr(item, "duration_ms", None),
|
||||
}
|
||||
)
|
||||
return items
|
||||
|
||||
|
||||
def build_diagnostics_payload(
|
||||
*,
|
||||
run_id: str,
|
||||
app_name: str,
|
||||
app_version: Optional[str],
|
||||
license_ref: Optional[str],
|
||||
machine_id: Optional[str],
|
||||
duration_ms: Optional[int],
|
||||
items: Iterable[dict],
|
||||
) -> dict:
|
||||
"""Construit le payload diagnostic. Chaque item est filtré aux seules clés
|
||||
autorisées → aucun nom/chemin/message ne peut fuir, même fourni par erreur."""
|
||||
clean_items: list[dict] = []
|
||||
succeeded = failed = 0
|
||||
for raw in items:
|
||||
it = {k: raw[k] for k in _ALLOWED_ITEM_KEYS if k in raw}
|
||||
status = it.get("status")
|
||||
if status == "success":
|
||||
succeeded += 1
|
||||
elif status == "failed":
|
||||
failed += 1
|
||||
clean_items.append(it)
|
||||
return {
|
||||
"run_id": run_id,
|
||||
"license_ref": license_ref,
|
||||
"machine_id": machine_id,
|
||||
"app_name": app_name,
|
||||
"app_version": app_version,
|
||||
"duration_ms": duration_ms,
|
||||
"document_count": len(clean_items),
|
||||
"succeeded_count": succeeded,
|
||||
"failed_count": failed,
|
||||
"items": clean_items,
|
||||
}
|
||||
|
||||
|
||||
class DiagnosticsClient:
|
||||
"""Envoie un payload diagnostic au portail. Non bloquant : capture toute erreur."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
base_url: str,
|
||||
session: Any,
|
||||
timeout: float = 4.0,
|
||||
logger: Optional[Callable[[str], None]] = None,
|
||||
) -> None:
|
||||
self._url = base_url.rstrip("/") + REPORT_PATH
|
||||
self._session = session
|
||||
self._timeout = timeout
|
||||
self._log = logger or (lambda _msg: None)
|
||||
|
||||
def report(self, payload: dict) -> bool:
|
||||
try:
|
||||
resp = self._session.post(self._url, json=payload, timeout=self._timeout)
|
||||
status = getattr(resp, "status_code", 0)
|
||||
ok = 200 <= int(status) < 300
|
||||
if not ok:
|
||||
self._log(f"diagnostics report refusé (HTTP {status})")
|
||||
return ok
|
||||
except Exception as exc: # réseau absent, timeout, etc.
|
||||
self._log(f"diagnostics report échec (non bloquant) : {exc}")
|
||||
return False
|
||||
|
||||
|
||||
def report_run_diagnostics(
|
||||
summary: Any,
|
||||
*,
|
||||
base_url: str,
|
||||
license_ref: Optional[str],
|
||||
machine_id: Optional[str],
|
||||
session: Any,
|
||||
app_name: str = "gui_v6",
|
||||
app_version: Optional[str] = None,
|
||||
duration_ms: Optional[int] = None,
|
||||
run_id: Optional[str] = None,
|
||||
spool_path: Any = None,
|
||||
logger: Optional[Callable[[str], None]] = None,
|
||||
) -> bool:
|
||||
"""Construit le payload depuis un ``RunSummary`` et l'envoie (non bloquant).
|
||||
|
||||
N'envoie RIEN si ``license_ref`` est absent. En cas d'échec réseau, spoole
|
||||
le payload (si ``spool_path``) pour un rejeu ultérieur. Ne lève jamais.
|
||||
"""
|
||||
log = logger or (lambda _msg: None)
|
||||
if not license_ref:
|
||||
log("diagnostics ignorés : aucune licence locale valide")
|
||||
return False
|
||||
payload = build_diagnostics_payload(
|
||||
run_id=run_id or new_run_id(),
|
||||
app_name=app_name,
|
||||
app_version=app_version,
|
||||
license_ref=license_ref,
|
||||
machine_id=machine_id,
|
||||
duration_ms=duration_ms,
|
||||
items=items_from_summary(summary),
|
||||
)
|
||||
client = DiagnosticsClient(base_url, session=session, logger=log)
|
||||
ok = client.report(payload)
|
||||
if not ok and spool_path is not None:
|
||||
spool_payload(spool_path, payload)
|
||||
return ok
|
||||
|
||||
|
||||
def spool_payload(path: Any, payload: dict) -> None:
|
||||
"""Ajoute un payload à la file JSONL locale (ne lève pas)."""
|
||||
try:
|
||||
p = Path(path)
|
||||
p.parent.mkdir(parents=True, exist_ok=True)
|
||||
with p.open("a", encoding="utf-8") as fh:
|
||||
fh.write(json.dumps(payload, ensure_ascii=False) + "\n")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def flush_spool(path: Any, client: "DiagnosticsClient") -> int:
|
||||
"""Tente d'envoyer chaque payload en file ; conserve ceux qui échouent.
|
||||
|
||||
Retourne le nombre de payloads envoyés. Ne lève jamais.
|
||||
"""
|
||||
p = Path(path)
|
||||
if not p.exists():
|
||||
return 0
|
||||
try:
|
||||
lines = [ln for ln in p.read_text(encoding="utf-8").splitlines() if ln.strip()]
|
||||
except Exception:
|
||||
return 0
|
||||
remaining: list[str] = []
|
||||
sent = 0
|
||||
for line in lines:
|
||||
try:
|
||||
payload = json.loads(line)
|
||||
except Exception:
|
||||
continue
|
||||
if client.report(payload):
|
||||
sent += 1
|
||||
else:
|
||||
remaining.append(line)
|
||||
try:
|
||||
if remaining:
|
||||
p.write_text("\n".join(remaining) + "\n", encoding="utf-8")
|
||||
else:
|
||||
p.unlink(missing_ok=True)
|
||||
except Exception:
|
||||
pass
|
||||
return sent
|
||||
@@ -87,6 +87,32 @@ def _engine_result_error(result: object) -> str | None:
|
||||
return None
|
||||
|
||||
|
||||
# Ensemble FERMÉ de catégories d'erreur (aucune PII ne peut y entrer).
|
||||
_ERROR_CODES = ("ner_unavailable", "quarantined", "no_output", "processing_error")
|
||||
|
||||
|
||||
def classify_error_code(exc: Exception) -> str:
|
||||
"""Catégorise une exception de run en une valeur de l'ensemble fermé _ERROR_CODES.
|
||||
|
||||
Lit le type et d'éventuels préfixes de message GÉNÉRÉS PAR NOUS pour classer ;
|
||||
ne renvoie JAMAIS le message lui-même (RGPD). Inconnu → 'processing_error'.
|
||||
"""
|
||||
name = type(exc).__name__
|
||||
if name == "EngineUnavailableError":
|
||||
return "ner_unavailable"
|
||||
msg = str(exc)
|
||||
# ⚠ ANTI-DÉRIVE : ces littéraux DOIVENT rester synchronisés avec les messages
|
||||
# produits par `_engine_result_error` ci-dessus ("Document mis en quarantaine :"
|
||||
# et "Aucune sortie PDF anonymisée produite."). Si l'un est reformulé sans
|
||||
# mettre à jour l'autre, l'erreur retombe silencieusement en 'processing_error'
|
||||
# (couvert par les tests test_classify_error_code_*).
|
||||
if "quarantaine" in msg:
|
||||
return "quarantined"
|
||||
if "Aucune sortie" in msg:
|
||||
return "no_output"
|
||||
return "processing_error"
|
||||
|
||||
|
||||
def discover_documents(input_path, extensions: Optional[Sequence[str]] = None) -> list[Path]:
|
||||
"""Liste les documents à traiter (fichier unique ou dossier récursif)."""
|
||||
path = Path(input_path)
|
||||
@@ -115,6 +141,10 @@ class DocResult:
|
||||
status: str # "success" | "failed"
|
||||
duration_ms: Optional[int]
|
||||
extension: Optional[str]
|
||||
# Diagnostics RGPD-safe : nom de classe d'exception + catégorie fermée.
|
||||
# JAMAIS le message d'exception (str(exc)) ni nom/chemin de document.
|
||||
error_type: Optional[str] = None
|
||||
error_code: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -224,6 +254,8 @@ class ProcessingRunner:
|
||||
page_count = page_count_for(doc)
|
||||
started = time.monotonic()
|
||||
status = "success"
|
||||
error_type = None
|
||||
error_code = None
|
||||
try:
|
||||
if input_path.is_dir():
|
||||
doc_out = build_batch_output_dir(root_dir, out_root, doc)
|
||||
@@ -238,6 +270,8 @@ class ProcessingRunner:
|
||||
log(f"OK : {doc.name}")
|
||||
except Exception as exc: # un échec n'interrompt pas le lot
|
||||
status = "failed"
|
||||
error_type = type(exc).__name__
|
||||
error_code = classify_error_code(exc)
|
||||
summary.failed += 1
|
||||
summary.errors.append((doc.name, str(exc)))
|
||||
log(f"ÉCHEC : {doc.name} — {exc}")
|
||||
@@ -248,6 +282,8 @@ class ProcessingRunner:
|
||||
status=status,
|
||||
duration_ms=int((time.monotonic() - started) * 1000),
|
||||
extension=extension,
|
||||
error_type=error_type,
|
||||
error_code=error_code,
|
||||
)
|
||||
)
|
||||
if on_progress:
|
||||
|
||||
@@ -68,6 +68,7 @@ class UsageTab(ctk.CTkFrame):
|
||||
on_theme_change=None,
|
||||
current_theme: str = theme_mod.DEFAULT_THEME,
|
||||
usage_reporter=None,
|
||||
diag_reporter=None,
|
||||
**kwargs,
|
||||
):
|
||||
self._p = palette or theme_mod.get_palette(current_theme)
|
||||
@@ -80,6 +81,9 @@ class UsageTab(ctk.CTkFrame):
|
||||
# Callback(summary) appelé en fin de run pour la télémétrie d'usage
|
||||
# (envoi non bloquant, injecté par l'app avec le contexte licence).
|
||||
self._usage_reporter = usage_reporter
|
||||
# Callback(summary) appelé en fin de run pour les diagnostics RGPD
|
||||
# (envoi non bloquant, injecté par l'app avec le contexte licence).
|
||||
self._diag_reporter = diag_reporter
|
||||
|
||||
self._input_path: Path | None = None
|
||||
self._output_dir: Path | None = None
|
||||
@@ -320,6 +324,7 @@ class UsageTab(ctk.CTkFrame):
|
||||
self._show_results(summary)
|
||||
self._show_failure_hint(summary)
|
||||
self._send_usage_telemetry(summary)
|
||||
self._send_diagnostics(summary)
|
||||
|
||||
def _send_usage_telemetry(self, summary) -> None:
|
||||
"""Envoie la télémétrie d'usage en fin de run, sans bloquer l'UI ni le run."""
|
||||
@@ -335,6 +340,20 @@ class UsageTab(ctk.CTkFrame):
|
||||
|
||||
threading.Thread(target=work, daemon=True).start()
|
||||
|
||||
def _send_diagnostics(self, summary) -> None:
|
||||
"""Envoie les diagnostics en fin de run, sans bloquer l'UI ni le run."""
|
||||
reporter = self._diag_reporter
|
||||
if reporter is None:
|
||||
return
|
||||
|
||||
def work():
|
||||
try:
|
||||
reporter(summary)
|
||||
except Exception:
|
||||
pass # un échec diagnostic ne doit jamais remonter
|
||||
|
||||
threading.Thread(target=work, daemon=True).start()
|
||||
|
||||
def _show_results(self, summary) -> None:
|
||||
p = self._p
|
||||
for w in self._stats_row.winfo_children():
|
||||
|
||||
136
tests/unit/test_gui_v6_diagnostics.py
Normal file
136
tests/unit/test_gui_v6_diagnostics.py
Normal file
@@ -0,0 +1,136 @@
|
||||
import json
|
||||
from types import SimpleNamespace
|
||||
|
||||
from gui_v6 import diagnostics
|
||||
|
||||
|
||||
def _doc(**kw):
|
||||
base = dict(ordinal=0, status="success", error_type=None, error_code=None, duration_ms=12)
|
||||
base.update(kw)
|
||||
return SimpleNamespace(**base)
|
||||
|
||||
|
||||
def test_new_run_id_is_hex():
|
||||
rid = diagnostics.new_run_id()
|
||||
assert isinstance(rid, str) and len(rid) >= 16
|
||||
|
||||
|
||||
def test_items_from_summary_whitelist_only():
|
||||
summary = SimpleNamespace(documents=[
|
||||
_doc(ordinal=0, status="success"),
|
||||
_doc(ordinal=1, status="failed", error_type="ValueError", error_code="processing_error"),
|
||||
])
|
||||
items = diagnostics.items_from_summary(summary)
|
||||
assert items[1]["error_type"] == "ValueError"
|
||||
assert set(items[0]) <= {"ordinal", "status", "error_type", "error_code", "duration_ms"}
|
||||
|
||||
|
||||
def test_build_payload_counts_and_no_pii_leak():
|
||||
# On INJECTE de la PII via des clés interdites + un faux message d'erreur :
|
||||
raw_docs = [
|
||||
{"ordinal": 0, "status": "success", "duration_ms": 5,
|
||||
"filename": "LETTRE Dupont 1980.pdf", "path": "/home/dom/secret.pdf"},
|
||||
{"ordinal": 1, "status": "failed", "error_type": "ValueError",
|
||||
"error_code": "processing_error", "error_message": "patient Dupont Jean"},
|
||||
]
|
||||
payload = diagnostics.build_diagnostics_payload(
|
||||
run_id="r" * 16, app_name="gui_v6", app_version="6.0.0-g1",
|
||||
license_ref="LIC-1", machine_id="m" * 12, duration_ms=999, items=raw_docs,
|
||||
)
|
||||
assert payload["document_count"] == 2
|
||||
assert payload["succeeded_count"] == 1 and payload["failed_count"] == 1
|
||||
blob = json.dumps(payload).lower()
|
||||
for forbidden in ("filename", "path", "secret", "dupont", "lettre", "error_message", "patient"):
|
||||
assert forbidden not in blob, f"fuite RGPD : {forbidden}"
|
||||
for item in payload["items"]:
|
||||
assert set(item) <= {"ordinal", "status", "error_type", "error_code", "duration_ms"}
|
||||
|
||||
|
||||
class _FakeResp:
|
||||
def __init__(self, status_code):
|
||||
self.status_code = status_code
|
||||
|
||||
|
||||
class _FakeSession:
|
||||
def __init__(self, status_code=200, raise_exc=None):
|
||||
self.status_code = status_code
|
||||
self.raise_exc = raise_exc
|
||||
self.calls = []
|
||||
|
||||
def post(self, url, json=None, timeout=None):
|
||||
self.calls.append((url, json, timeout))
|
||||
if self.raise_exc:
|
||||
raise self.raise_exc
|
||||
return _FakeResp(self.status_code)
|
||||
|
||||
|
||||
def test_client_report_ok_on_2xx():
|
||||
sess = _FakeSession(status_code=200)
|
||||
client = diagnostics.DiagnosticsClient("https://app.aivanov.eu/", session=sess)
|
||||
assert client.report({"run_id": "r"}) is True
|
||||
assert sess.calls[0][0] == "https://app.aivanov.eu/api/v1/diagnostics/report"
|
||||
|
||||
|
||||
def test_client_report_false_on_network_error_without_raising():
|
||||
sess = _FakeSession(raise_exc=RuntimeError("no network"))
|
||||
client = diagnostics.DiagnosticsClient("https://app.aivanov.eu", session=sess)
|
||||
assert client.report({"run_id": "r"}) is False # ne lève pas
|
||||
|
||||
|
||||
def test_report_run_diagnostics_no_send_without_license(tmp_path):
|
||||
sess = _FakeSession()
|
||||
ok = diagnostics.report_run_diagnostics(
|
||||
SimpleNamespace(documents=[]), base_url="https://app.aivanov.eu",
|
||||
license_ref=None, machine_id="m" * 12, session=sess,
|
||||
spool_path=tmp_path / "spool.jsonl",
|
||||
)
|
||||
assert ok is False and sess.calls == []
|
||||
|
||||
|
||||
def test_report_run_diagnostics_network_down_spools(tmp_path):
|
||||
sess = _FakeSession(raise_exc=RuntimeError("down"))
|
||||
spool = tmp_path / "spool.jsonl"
|
||||
summary = SimpleNamespace(documents=[_doc(ordinal=0, status="failed",
|
||||
error_type="ValueError", error_code="processing_error")])
|
||||
ok = diagnostics.report_run_diagnostics(
|
||||
summary, base_url="https://app.aivanov.eu", license_ref="LIC-1",
|
||||
machine_id="m" * 12, session=sess, spool_path=spool,
|
||||
)
|
||||
assert ok is False and spool.exists()
|
||||
line = json.loads(spool.read_text(encoding="utf-8").splitlines()[0])
|
||||
assert line["failed_count"] == 1
|
||||
|
||||
|
||||
def test_flush_spool_sends_and_clears(tmp_path):
|
||||
spool = tmp_path / "spool.jsonl"
|
||||
diagnostics.spool_payload(spool, {"run_id": "r1"})
|
||||
diagnostics.spool_payload(spool, {"run_id": "r2"})
|
||||
sent = diagnostics.flush_spool(spool, diagnostics.DiagnosticsClient(
|
||||
"https://app.aivanov.eu", session=_FakeSession(status_code=200)))
|
||||
assert sent == 2 and not spool.exists()
|
||||
|
||||
|
||||
def test_tab_send_diagnostics_calls_reporter():
|
||||
import threading
|
||||
from gui_v6.tabs.tab_usage import UsageTab
|
||||
|
||||
tab = object.__new__(UsageTab) # pas de Tk : on teste juste le helper
|
||||
seen = {}
|
||||
done = threading.Event()
|
||||
|
||||
def reporter(summary):
|
||||
seen["summary"] = summary
|
||||
done.set()
|
||||
|
||||
tab._diag_reporter = reporter
|
||||
tab._send_diagnostics(SimpleNamespace(documents=[], failed=0))
|
||||
assert done.wait(timeout=2.0)
|
||||
assert seen["summary"] is not None
|
||||
|
||||
|
||||
def test_tab_send_diagnostics_noop_without_reporter():
|
||||
from gui_v6.tabs.tab_usage import UsageTab
|
||||
|
||||
tab = object.__new__(UsageTab)
|
||||
tab._diag_reporter = None
|
||||
tab._send_diagnostics(SimpleNamespace(documents=[])) # ne lève pas
|
||||
@@ -261,3 +261,70 @@ def test_run_records_per_document_details(tmp_path):
|
||||
assert not hasattr(doc, "path")
|
||||
assert not hasattr(doc, "filename")
|
||||
assert not hasattr(doc, "name")
|
||||
|
||||
|
||||
# -- diagnostics d'erreur RGPD-safe (E2) -----------------------------------
|
||||
|
||||
def test_failed_doc_carries_rgpd_safe_error_fields(tmp_path):
|
||||
from gui_v6.processing_runner import ProcessingRunner
|
||||
|
||||
secret = "Dupont Jean 1980" # simulacre de PII dans un message d'exception
|
||||
|
||||
def boom(_inp, _out):
|
||||
raise ValueError(f"échec sur patient {secret}")
|
||||
|
||||
inp = tmp_path / "in"; inp.mkdir()
|
||||
(inp / "a.pdf").write_bytes(b"%PDF-1.4\n")
|
||||
out = tmp_path / "out"; out.mkdir()
|
||||
runner = ProcessingRunner(process_fn=boom)
|
||||
summary = runner.run(inp, out)
|
||||
|
||||
assert summary.failed == 1
|
||||
doc = summary.documents[0]
|
||||
assert doc.error_type == "ValueError"
|
||||
assert doc.error_code in {"ner_unavailable", "quarantined", "no_output", "processing_error"}
|
||||
blob = repr(vars(doc)).lower()
|
||||
assert "dupont" not in blob and "patient" not in blob and secret.lower() not in blob
|
||||
|
||||
|
||||
def test_success_doc_has_no_error_fields(tmp_path):
|
||||
from gui_v6.processing_runner import ProcessingRunner
|
||||
|
||||
def ok(_inp, out_dir):
|
||||
# process_fn reçoit le DOSSIER de sortie : on y écrit un PDF livrable.
|
||||
pdf = out_dir / "a.redacted_raster.pdf"
|
||||
pdf.write_bytes(b"%PDF-1.4\n")
|
||||
return {"status": "ok", "pdf_raster": str(pdf)}
|
||||
|
||||
inp = tmp_path / "in"; inp.mkdir()
|
||||
(inp / "a.pdf").write_bytes(b"%PDF-1.4\n")
|
||||
out = tmp_path / "out"; out.mkdir()
|
||||
summary = ProcessingRunner(process_fn=ok).run(inp, out)
|
||||
doc = summary.documents[0]
|
||||
assert doc.status == "success"
|
||||
assert doc.error_type is None and doc.error_code is None
|
||||
|
||||
|
||||
# -- classification d'erreur : une assertion par branche (mapping vérifié) -
|
||||
|
||||
def test_classify_error_code_ner_unavailable():
|
||||
from gui_v6.processing_runner import classify_error_code
|
||||
from gui_v6.engine_bridge import EngineUnavailableError # import the REAL class
|
||||
# importing the real class means a future rename breaks this test (intended guard)
|
||||
assert classify_error_code(EngineUnavailableError("modèle indispo")) == "ner_unavailable"
|
||||
|
||||
|
||||
def test_classify_error_code_quarantined():
|
||||
from gui_v6.processing_runner import classify_error_code
|
||||
assert classify_error_code(RuntimeError("Document mis en quarantaine : texte trop court")) == "quarantined"
|
||||
|
||||
|
||||
def test_classify_error_code_no_output():
|
||||
from gui_v6.processing_runner import classify_error_code
|
||||
assert classify_error_code(RuntimeError("Aucune sortie PDF anonymisée produite")) == "no_output"
|
||||
|
||||
|
||||
def test_classify_error_code_processing_error_default():
|
||||
from gui_v6.processing_runner import classify_error_code, _ERROR_CODES
|
||||
assert classify_error_code(ValueError("patient Dupont")) == "processing_error"
|
||||
assert classify_error_code(ValueError("x")) in _ERROR_CODES
|
||||
|
||||
Reference in New Issue
Block a user