5 Commits

Author SHA1 Message Date
8f9107a27f feat(gui): câblage upload diagnostics en fin de run (E3)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 10:44:02 +02:00
8eb8cf9999 feat(gui): client diagnostics non bloquant + spool best-effort (E3)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 10:39:25 +02:00
4b7a31b9df feat(gui): module diagnostics — payload liste-blanche RGPD (E2)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 10:36:16 +02:00
4412512d4b test(gui): vérifier chaque branche de classify_error_code + anti-dérive (E2)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 10:34:11 +02:00
952a1c6ca0 feat(gui): DocResult porte type+catégorie d'erreur RGPD-safe (E2)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 10:28:42 +02:00
6 changed files with 479 additions and 0 deletions

View File

@@ -193,6 +193,7 @@ class AnonymisationApp(ctk.CTk):
on_theme_change=self.set_theme,
current_theme=self._theme_name,
usage_reporter=self._report_usage,
diag_reporter=self._report_diagnostics,
)
if key == "cfg":
return ConfigTab(self._content, palette=p, state=self._config, config_path=self._user_config_path)
@@ -243,6 +244,36 @@ class AnonymisationApp(ctk.CTk):
except Exception:
pass
def _report_diagnostics(self, summary) -> None:
"""Envoie les diagnostics en fin de run (non bloquant, best-effort).
N'envoie rien si aucune licence locale valide. Ne lève jamais.
"""
try:
from gui_v6 import __version__ as gui_version
from gui_v6 import diagnostics
from gui_v6.logging_setup import log_file_path
from gui_v6.machine_id import default_machine_id
session = self._usage_session()
if session is None:
return
status = self._safe_local_status()
base_url = getattr(self._license_client, "_base_url", "") or resolve_portal_url()
spool = log_file_path().parent / "diagnostics_spool.jsonl"
diagnostics.report_run_diagnostics(
summary,
base_url=base_url,
license_ref=getattr(status, "license_ref", None),
machine_id=default_machine_id(),
session=session,
app_name="gui_v6",
app_version=gui_version,
spool_path=spool,
)
except Exception:
pass
def _show(self, key: str) -> None:
self._active = key
self._refresh_tabbar()

190
gui_v6/diagnostics.py Normal file
View File

@@ -0,0 +1,190 @@
"""Diagnostics structurés de la GUI V6 (E2/E3) — RGPD strict.
On n'émet QUE des métadonnées techniques liste-blanche : type d'exception
(nom de classe), catégorie d'erreur d'un ensemble fermé, statut, ordinal,
durée. JAMAIS de nom/chemin/texte de document, ni de message d'exception brut.
L'envoi est non bloquant : un échec réseau n'interrompt jamais le traitement.
Patron : gui_v6/usage_telemetry.py (télémétrie d'usage).
"""
from __future__ import annotations
import json
import uuid
from pathlib import Path
from typing import Any, Callable, Iterable, Optional
# Clés autorisées par item de diagnostic (filtre RGPD appliqué à la construction).
_ALLOWED_ITEM_KEYS = {"ordinal", "status", "error_type", "error_code", "duration_ms"}
REPORT_PATH = "/api/v1/diagnostics/report"
def new_run_id() -> str:
return uuid.uuid4().hex
def items_from_summary(summary: Any) -> list[dict]:
"""Extrait les items de diagnostic (RGPD-safe) d'un ``RunSummary``.
Ne lit que les attributs autorisés ; aucun nom/chemin/message n'est lu.
"""
items: list[dict] = []
for item in getattr(summary, "documents", None) or []:
items.append(
{
"ordinal": getattr(item, "ordinal", 0),
"status": getattr(item, "status", "success"),
"error_type": getattr(item, "error_type", None),
"error_code": getattr(item, "error_code", None),
"duration_ms": getattr(item, "duration_ms", None),
}
)
return items
def build_diagnostics_payload(
*,
run_id: str,
app_name: str,
app_version: Optional[str],
license_ref: Optional[str],
machine_id: Optional[str],
duration_ms: Optional[int],
items: Iterable[dict],
) -> dict:
"""Construit le payload diagnostic. Chaque item est filtré aux seules clés
autorisées → aucun nom/chemin/message ne peut fuir, même fourni par erreur."""
clean_items: list[dict] = []
succeeded = failed = 0
for raw in items:
it = {k: raw[k] for k in _ALLOWED_ITEM_KEYS if k in raw}
status = it.get("status")
if status == "success":
succeeded += 1
elif status == "failed":
failed += 1
clean_items.append(it)
return {
"run_id": run_id,
"license_ref": license_ref,
"machine_id": machine_id,
"app_name": app_name,
"app_version": app_version,
"duration_ms": duration_ms,
"document_count": len(clean_items),
"succeeded_count": succeeded,
"failed_count": failed,
"items": clean_items,
}
class DiagnosticsClient:
"""Envoie un payload diagnostic au portail. Non bloquant : capture toute erreur."""
def __init__(
self,
base_url: str,
session: Any,
timeout: float = 4.0,
logger: Optional[Callable[[str], None]] = None,
) -> None:
self._url = base_url.rstrip("/") + REPORT_PATH
self._session = session
self._timeout = timeout
self._log = logger or (lambda _msg: None)
def report(self, payload: dict) -> bool:
try:
resp = self._session.post(self._url, json=payload, timeout=self._timeout)
status = getattr(resp, "status_code", 0)
ok = 200 <= int(status) < 300
if not ok:
self._log(f"diagnostics report refusé (HTTP {status})")
return ok
except Exception as exc: # réseau absent, timeout, etc.
self._log(f"diagnostics report échec (non bloquant) : {exc}")
return False
def report_run_diagnostics(
summary: Any,
*,
base_url: str,
license_ref: Optional[str],
machine_id: Optional[str],
session: Any,
app_name: str = "gui_v6",
app_version: Optional[str] = None,
duration_ms: Optional[int] = None,
run_id: Optional[str] = None,
spool_path: Any = None,
logger: Optional[Callable[[str], None]] = None,
) -> bool:
"""Construit le payload depuis un ``RunSummary`` et l'envoie (non bloquant).
N'envoie RIEN si ``license_ref`` est absent. En cas d'échec réseau, spoole
le payload (si ``spool_path``) pour un rejeu ultérieur. Ne lève jamais.
"""
log = logger or (lambda _msg: None)
if not license_ref:
log("diagnostics ignorés : aucune licence locale valide")
return False
payload = build_diagnostics_payload(
run_id=run_id or new_run_id(),
app_name=app_name,
app_version=app_version,
license_ref=license_ref,
machine_id=machine_id,
duration_ms=duration_ms,
items=items_from_summary(summary),
)
client = DiagnosticsClient(base_url, session=session, logger=log)
ok = client.report(payload)
if not ok and spool_path is not None:
spool_payload(spool_path, payload)
return ok
def spool_payload(path: Any, payload: dict) -> None:
"""Ajoute un payload à la file JSONL locale (ne lève pas)."""
try:
p = Path(path)
p.parent.mkdir(parents=True, exist_ok=True)
with p.open("a", encoding="utf-8") as fh:
fh.write(json.dumps(payload, ensure_ascii=False) + "\n")
except Exception:
pass
def flush_spool(path: Any, client: "DiagnosticsClient") -> int:
"""Tente d'envoyer chaque payload en file ; conserve ceux qui échouent.
Retourne le nombre de payloads envoyés. Ne lève jamais.
"""
p = Path(path)
if not p.exists():
return 0
try:
lines = [ln for ln in p.read_text(encoding="utf-8").splitlines() if ln.strip()]
except Exception:
return 0
remaining: list[str] = []
sent = 0
for line in lines:
try:
payload = json.loads(line)
except Exception:
continue
if client.report(payload):
sent += 1
else:
remaining.append(line)
try:
if remaining:
p.write_text("\n".join(remaining) + "\n", encoding="utf-8")
else:
p.unlink(missing_ok=True)
except Exception:
pass
return sent

View File

@@ -87,6 +87,32 @@ def _engine_result_error(result: object) -> str | None:
return None
# Ensemble FERMÉ de catégories d'erreur (aucune PII ne peut y entrer).
_ERROR_CODES = ("ner_unavailable", "quarantined", "no_output", "processing_error")
def classify_error_code(exc: Exception) -> str:
"""Catégorise une exception de run en une valeur de l'ensemble fermé _ERROR_CODES.
Lit le type et d'éventuels préfixes de message GÉNÉRÉS PAR NOUS pour classer ;
ne renvoie JAMAIS le message lui-même (RGPD). Inconnu → 'processing_error'.
"""
name = type(exc).__name__
if name == "EngineUnavailableError":
return "ner_unavailable"
msg = str(exc)
# ⚠ ANTI-DÉRIVE : ces littéraux DOIVENT rester synchronisés avec les messages
# produits par `_engine_result_error` ci-dessus ("Document mis en quarantaine :"
# et "Aucune sortie PDF anonymisée produite."). Si l'un est reformulé sans
# mettre à jour l'autre, l'erreur retombe silencieusement en 'processing_error'
# (couvert par les tests test_classify_error_code_*).
if "quarantaine" in msg:
return "quarantined"
if "Aucune sortie" in msg:
return "no_output"
return "processing_error"
def discover_documents(input_path, extensions: Optional[Sequence[str]] = None) -> list[Path]:
"""Liste les documents à traiter (fichier unique ou dossier récursif)."""
path = Path(input_path)
@@ -115,6 +141,10 @@ class DocResult:
status: str # "success" | "failed"
duration_ms: Optional[int]
extension: Optional[str]
# Diagnostics RGPD-safe : nom de classe d'exception + catégorie fermée.
# JAMAIS le message d'exception (str(exc)) ni nom/chemin de document.
error_type: Optional[str] = None
error_code: Optional[str] = None
@dataclass
@@ -224,6 +254,8 @@ class ProcessingRunner:
page_count = page_count_for(doc)
started = time.monotonic()
status = "success"
error_type = None
error_code = None
try:
if input_path.is_dir():
doc_out = build_batch_output_dir(root_dir, out_root, doc)
@@ -238,6 +270,8 @@ class ProcessingRunner:
log(f"OK : {doc.name}")
except Exception as exc: # un échec n'interrompt pas le lot
status = "failed"
error_type = type(exc).__name__
error_code = classify_error_code(exc)
summary.failed += 1
summary.errors.append((doc.name, str(exc)))
log(f"ÉCHEC : {doc.name}{exc}")
@@ -248,6 +282,8 @@ class ProcessingRunner:
status=status,
duration_ms=int((time.monotonic() - started) * 1000),
extension=extension,
error_type=error_type,
error_code=error_code,
)
)
if on_progress:

View File

@@ -68,6 +68,7 @@ class UsageTab(ctk.CTkFrame):
on_theme_change=None,
current_theme: str = theme_mod.DEFAULT_THEME,
usage_reporter=None,
diag_reporter=None,
**kwargs,
):
self._p = palette or theme_mod.get_palette(current_theme)
@@ -80,6 +81,9 @@ class UsageTab(ctk.CTkFrame):
# Callback(summary) appelé en fin de run pour la télémétrie d'usage
# (envoi non bloquant, injecté par l'app avec le contexte licence).
self._usage_reporter = usage_reporter
# Callback(summary) appelé en fin de run pour les diagnostics RGPD
# (envoi non bloquant, injecté par l'app avec le contexte licence).
self._diag_reporter = diag_reporter
self._input_path: Path | None = None
self._output_dir: Path | None = None
@@ -320,6 +324,7 @@ class UsageTab(ctk.CTkFrame):
self._show_results(summary)
self._show_failure_hint(summary)
self._send_usage_telemetry(summary)
self._send_diagnostics(summary)
def _send_usage_telemetry(self, summary) -> None:
"""Envoie la télémétrie d'usage en fin de run, sans bloquer l'UI ni le run."""
@@ -335,6 +340,20 @@ class UsageTab(ctk.CTkFrame):
threading.Thread(target=work, daemon=True).start()
def _send_diagnostics(self, summary) -> None:
"""Envoie les diagnostics en fin de run, sans bloquer l'UI ni le run."""
reporter = self._diag_reporter
if reporter is None:
return
def work():
try:
reporter(summary)
except Exception:
pass # un échec diagnostic ne doit jamais remonter
threading.Thread(target=work, daemon=True).start()
def _show_results(self, summary) -> None:
p = self._p
for w in self._stats_row.winfo_children():

View File

@@ -0,0 +1,136 @@
import json
from types import SimpleNamespace
from gui_v6 import diagnostics
def _doc(**kw):
base = dict(ordinal=0, status="success", error_type=None, error_code=None, duration_ms=12)
base.update(kw)
return SimpleNamespace(**base)
def test_new_run_id_is_hex():
rid = diagnostics.new_run_id()
assert isinstance(rid, str) and len(rid) >= 16
def test_items_from_summary_whitelist_only():
summary = SimpleNamespace(documents=[
_doc(ordinal=0, status="success"),
_doc(ordinal=1, status="failed", error_type="ValueError", error_code="processing_error"),
])
items = diagnostics.items_from_summary(summary)
assert items[1]["error_type"] == "ValueError"
assert set(items[0]) <= {"ordinal", "status", "error_type", "error_code", "duration_ms"}
def test_build_payload_counts_and_no_pii_leak():
# On INJECTE de la PII via des clés interdites + un faux message d'erreur :
raw_docs = [
{"ordinal": 0, "status": "success", "duration_ms": 5,
"filename": "LETTRE Dupont 1980.pdf", "path": "/home/dom/secret.pdf"},
{"ordinal": 1, "status": "failed", "error_type": "ValueError",
"error_code": "processing_error", "error_message": "patient Dupont Jean"},
]
payload = diagnostics.build_diagnostics_payload(
run_id="r" * 16, app_name="gui_v6", app_version="6.0.0-g1",
license_ref="LIC-1", machine_id="m" * 12, duration_ms=999, items=raw_docs,
)
assert payload["document_count"] == 2
assert payload["succeeded_count"] == 1 and payload["failed_count"] == 1
blob = json.dumps(payload).lower()
for forbidden in ("filename", "path", "secret", "dupont", "lettre", "error_message", "patient"):
assert forbidden not in blob, f"fuite RGPD : {forbidden}"
for item in payload["items"]:
assert set(item) <= {"ordinal", "status", "error_type", "error_code", "duration_ms"}
class _FakeResp:
def __init__(self, status_code):
self.status_code = status_code
class _FakeSession:
def __init__(self, status_code=200, raise_exc=None):
self.status_code = status_code
self.raise_exc = raise_exc
self.calls = []
def post(self, url, json=None, timeout=None):
self.calls.append((url, json, timeout))
if self.raise_exc:
raise self.raise_exc
return _FakeResp(self.status_code)
def test_client_report_ok_on_2xx():
sess = _FakeSession(status_code=200)
client = diagnostics.DiagnosticsClient("https://app.aivanov.eu/", session=sess)
assert client.report({"run_id": "r"}) is True
assert sess.calls[0][0] == "https://app.aivanov.eu/api/v1/diagnostics/report"
def test_client_report_false_on_network_error_without_raising():
sess = _FakeSession(raise_exc=RuntimeError("no network"))
client = diagnostics.DiagnosticsClient("https://app.aivanov.eu", session=sess)
assert client.report({"run_id": "r"}) is False # ne lève pas
def test_report_run_diagnostics_no_send_without_license(tmp_path):
sess = _FakeSession()
ok = diagnostics.report_run_diagnostics(
SimpleNamespace(documents=[]), base_url="https://app.aivanov.eu",
license_ref=None, machine_id="m" * 12, session=sess,
spool_path=tmp_path / "spool.jsonl",
)
assert ok is False and sess.calls == []
def test_report_run_diagnostics_network_down_spools(tmp_path):
sess = _FakeSession(raise_exc=RuntimeError("down"))
spool = tmp_path / "spool.jsonl"
summary = SimpleNamespace(documents=[_doc(ordinal=0, status="failed",
error_type="ValueError", error_code="processing_error")])
ok = diagnostics.report_run_diagnostics(
summary, base_url="https://app.aivanov.eu", license_ref="LIC-1",
machine_id="m" * 12, session=sess, spool_path=spool,
)
assert ok is False and spool.exists()
line = json.loads(spool.read_text(encoding="utf-8").splitlines()[0])
assert line["failed_count"] == 1
def test_flush_spool_sends_and_clears(tmp_path):
spool = tmp_path / "spool.jsonl"
diagnostics.spool_payload(spool, {"run_id": "r1"})
diagnostics.spool_payload(spool, {"run_id": "r2"})
sent = diagnostics.flush_spool(spool, diagnostics.DiagnosticsClient(
"https://app.aivanov.eu", session=_FakeSession(status_code=200)))
assert sent == 2 and not spool.exists()
def test_tab_send_diagnostics_calls_reporter():
import threading
from gui_v6.tabs.tab_usage import UsageTab
tab = object.__new__(UsageTab) # pas de Tk : on teste juste le helper
seen = {}
done = threading.Event()
def reporter(summary):
seen["summary"] = summary
done.set()
tab._diag_reporter = reporter
tab._send_diagnostics(SimpleNamespace(documents=[], failed=0))
assert done.wait(timeout=2.0)
assert seen["summary"] is not None
def test_tab_send_diagnostics_noop_without_reporter():
from gui_v6.tabs.tab_usage import UsageTab
tab = object.__new__(UsageTab)
tab._diag_reporter = None
tab._send_diagnostics(SimpleNamespace(documents=[])) # ne lève pas

View File

@@ -261,3 +261,70 @@ def test_run_records_per_document_details(tmp_path):
assert not hasattr(doc, "path")
assert not hasattr(doc, "filename")
assert not hasattr(doc, "name")
# -- diagnostics d'erreur RGPD-safe (E2) -----------------------------------
def test_failed_doc_carries_rgpd_safe_error_fields(tmp_path):
from gui_v6.processing_runner import ProcessingRunner
secret = "Dupont Jean 1980" # simulacre de PII dans un message d'exception
def boom(_inp, _out):
raise ValueError(f"échec sur patient {secret}")
inp = tmp_path / "in"; inp.mkdir()
(inp / "a.pdf").write_bytes(b"%PDF-1.4\n")
out = tmp_path / "out"; out.mkdir()
runner = ProcessingRunner(process_fn=boom)
summary = runner.run(inp, out)
assert summary.failed == 1
doc = summary.documents[0]
assert doc.error_type == "ValueError"
assert doc.error_code in {"ner_unavailable", "quarantined", "no_output", "processing_error"}
blob = repr(vars(doc)).lower()
assert "dupont" not in blob and "patient" not in blob and secret.lower() not in blob
def test_success_doc_has_no_error_fields(tmp_path):
from gui_v6.processing_runner import ProcessingRunner
def ok(_inp, out_dir):
# process_fn reçoit le DOSSIER de sortie : on y écrit un PDF livrable.
pdf = out_dir / "a.redacted_raster.pdf"
pdf.write_bytes(b"%PDF-1.4\n")
return {"status": "ok", "pdf_raster": str(pdf)}
inp = tmp_path / "in"; inp.mkdir()
(inp / "a.pdf").write_bytes(b"%PDF-1.4\n")
out = tmp_path / "out"; out.mkdir()
summary = ProcessingRunner(process_fn=ok).run(inp, out)
doc = summary.documents[0]
assert doc.status == "success"
assert doc.error_type is None and doc.error_code is None
# -- classification d'erreur : une assertion par branche (mapping vérifié) -
def test_classify_error_code_ner_unavailable():
from gui_v6.processing_runner import classify_error_code
from gui_v6.engine_bridge import EngineUnavailableError # import the REAL class
# importing the real class means a future rename breaks this test (intended guard)
assert classify_error_code(EngineUnavailableError("modèle indispo")) == "ner_unavailable"
def test_classify_error_code_quarantined():
from gui_v6.processing_runner import classify_error_code
assert classify_error_code(RuntimeError("Document mis en quarantaine : texte trop court")) == "quarantined"
def test_classify_error_code_no_output():
from gui_v6.processing_runner import classify_error_code
assert classify_error_code(RuntimeError("Aucune sortie PDF anonymisée produite")) == "no_output"
def test_classify_error_code_processing_error_default():
from gui_v6.processing_runner import classify_error_code, _ERROR_CODES
assert classify_error_code(ValueError("patient Dupont")) == "processing_error"
assert classify_error_code(ValueError("x")) in _ERROR_CODES