feat(gui): module télémétrie d'usage (client, RGPD-safe, non bloquant)

Phase A de la mission télémétrie d'usage par client.

- gui_v6/usage_telemetry.py :
  - page_count_for(path) : PDF→fitz, image→1, autres→None ; best-effort, ne
    lève jamais, ne lit que l'extension (jamais le nom).
  - build_usage_payload(...) : compteurs (document_count/succeeded_count/failed_count/total_pages)
    + documents filtrés aux seules clés autorisées (ordinal/page_count/status/
    duration_ms/extension) → aucun nom/chemin de fichier ne peut fuir.
  - UsageTelemetryClient(session injectée) : report() non bloquant (capture
    tout, False en cas d'échec réseau) vers POST /api/v1/usage/report.
  - spool JSONL local (spool_payload/flush_spool) pour rejouer les échecs.

Module isolé, non câblé au runner pour l'instant (le branchement fin-de-run
viendra après le backend, hors validation visuelle GUI en cours). Aucun
build/push sans GO Dom. 10 tests unitaires (payload sans nom de fichier,
réseau indispo ne crashe pas, compteurs, page_count PDF mockable).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-15 17:13:57 +02:00
parent a9e8b2c2e6
commit ab2ca8a552
2 changed files with 304 additions and 0 deletions

167
gui_v6/usage_telemetry.py Normal file
View File

@@ -0,0 +1,167 @@
"""Télémétrie d'usage de la GUI V6 (suivi licence/client, jamais audit médical).
RGPD : on n'émet QUE des compteurs et métadonnées non sensibles. Jamais de nom
ou de chemin de fichier, de texte extrait, d'entités ni de noms patients.
L'envoi est non bloquant : un échec réseau n'interrompt jamais le traitement.
"""
from __future__ import annotations
import json
import uuid
from pathlib import Path
from typing import Any, Callable, Iterable, Optional
# Per-document keys allowed into the payload (GDPR filter applied when the
# payload is built). Frozen so the whitelist cannot be widened at runtime.
_ALLOWED_DOC_KEYS = frozenset(
    {"ordinal", "page_count", "status", "duration_ms", "extension"}
)
# Lower-cased file suffixes that are counted as single-page images.
_IMAGE_EXTS = frozenset(
    {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".bmp", ".gif"}
)
# Path of the usage-report endpoint, appended to the portal base URL.
REPORT_PATH = "/api/v1/usage/report"
def new_run_id() -> str:
    """Return a fresh random run identifier (UUID4 as 32 lowercase hex digits)."""
    run_uuid = uuid.uuid4()
    # Same rendering as ``UUID.hex``: the 128-bit value, zero-padded to 32 digits.
    return format(run_uuid.int, "032x")
def _default_pdf_counter(path: Any) -> Optional[int]:
    """Count the pages of a PDF with PyMuPDF (``fitz``), best-effort.

    Returns the page count, or ``None`` when ``fitz`` cannot be imported or
    the document cannot be opened/read. Never raises.
    """
    # Imported lazily so the module stays importable without PyMuPDF.
    try:
        import fitz
    except Exception:
        return None
    try:
        with fitz.open(str(path)) as pdf:
            pages = len(pdf)
    except Exception:
        return None
    return pages
def page_count_for(
    path: Any, pdf_counter: Callable[[Any], Optional[int]] = _default_pdf_counter
) -> Optional[int]:
    """Best-effort page count for a file, decided from its suffix.

    ``.pdf`` files are delegated to *pdf_counter*, known image suffixes count
    as one page, and anything else yields ``None``. Never raises: a failing
    suffix extraction or a failing *pdf_counter* also yields ``None``.
    """
    try:
        suffix = Path(str(path)).suffix.lower()
    except Exception:
        return None

    if suffix in _IMAGE_EXTS:
        return 1
    if suffix != ".pdf":
        return None

    # PDF: the injected counter may fail on corrupted files; swallow that.
    try:
        return pdf_counter(path)
    except Exception:
        return None
def build_usage_payload(
    *,
    run_id: str,
    app_name: str,
    app_version: str,
    license_ref: Optional[str],
    machine_id: Optional[str],
    documents: Iterable[dict],
) -> dict:
    """Build the usage payload sent to the portal.

    Each document dict is reduced to the whitelisted keys, so a ``filename``
    or ``path`` key supplied by mistake is dropped. Counters are derived from
    the filtered documents:

    * ``document_count``: number of documents received;
    * ``succeeded_count`` / ``failed_count``: documents whose ``status`` is
      exactly ``"success"`` / ``"failed"`` (other statuses count in neither);
    * ``total_pages``: sum of the valid ``page_count`` values.

    NOTE(review): only keys are filtered; the values of allowed keys are
    copied as-is, so callers must not put a file name in e.g. ``extension``.
    """
    clean_docs: list[dict] = []
    succeeded = failed = total_pages = 0
    for raw in documents:
        # Iterate the caller's dict rather than the whitelist set so the key
        # order of the serialized document is deterministic across runs.
        doc = {key: value for key, value in raw.items() if key in _ALLOWED_DOC_KEYS}

        status = doc.get("status")
        if status == "success":
            succeeded += 1
        elif status == "failed":
            failed += 1

        page_count = doc.get("page_count")
        # bool is a subclass of int: exclude it, and ignore negative counts,
        # so a stray True/False or -1 cannot skew the total.
        if (
            isinstance(page_count, int)
            and not isinstance(page_count, bool)
            and page_count >= 0
        ):
            total_pages += page_count

        clean_docs.append(doc)

    return {
        "run_id": run_id,
        "license_ref": license_ref,
        "machine_id": machine_id,
        "app_name": app_name,
        "app_version": app_version,
        "document_count": len(clean_docs),
        "succeeded_count": succeeded,
        "failed_count": failed,
        "total_pages": total_pages,
        "documents": clean_docs,
    }
class UsageTelemetryClient:
    """Send usage payloads to the portal without ever blocking the caller.

    The HTTP session is injected; it only needs a
    ``post(url, json=..., timeout=...)`` method returning an object with a
    ``status_code`` attribute. Every error (network, timeout, bad response,
    or a failing logger) is swallowed and reported as ``False``.
    """

    def __init__(
        self,
        base_url: str,
        session: Any,
        timeout: float = 4.0,
        logger: Optional[Callable[[str], None]] = None,
    ) -> None:
        """Store the target URL (``base_url`` + report path), session and options.

        ``logger`` is an optional callable receiving one message string; when
        omitted, messages are discarded.
        """
        self._url = base_url.rstrip("/") + REPORT_PATH
        self._session = session
        self._timeout = timeout
        self._log = logger or (lambda _msg: None)

    def _safe_log(self, message: str) -> None:
        """Forward *message* to the injected logger, ignoring logger failures."""
        try:
            self._log(message)
        except Exception:
            # The logger is caller-supplied; it must not be able to turn a
            # non-blocking telemetry call into a crash.
            pass

    def report(self, payload: dict) -> bool:
        """POST *payload* as JSON; return ``True`` only on an HTTP 2xx answer.

        Never raises: transport errors and unusable responses are logged and
        reported as ``False``.
        """
        try:
            resp = self._session.post(self._url, json=payload, timeout=self._timeout)
            # A response without status_code is treated as status 0 (failure).
            status = getattr(resp, "status_code", 0)
            ok = 200 <= int(status) < 300
        except Exception as exc:  # network down, timeout, malformed status...
            self._safe_log(f"usage report échec (non bloquant) : {exc}")
            return False
        if not ok:
            self._safe_log(f"usage report refusé (HTTP {status})")
        return ok
# --- file locale JSONL (rejeu best-effort des échecs) -----------------------
def spool_payload(path: Any, payload: dict) -> None:
    """Append *payload* as one JSON line to the local spool file.

    Missing parent directories are created. Best-effort: any error (I/O or
    serialization) is swallowed and the function returns ``None``.
    """
    try:
        spool_file = Path(path)
        spool_file.parent.mkdir(parents=True, exist_ok=True)
        with open(spool_file, "a", encoding="utf-8") as handle:
            record = json.dumps(payload, ensure_ascii=False)
            handle.write(f"{record}\n")
    except Exception:
        # Deliberately silent: spooling must never interrupt processing.
        return None
def flush_spool(path: Any, client: "UsageTelemetryClient") -> int:
    """Replay every payload queued in the local JSONL spool.

    Each non-blank line is parsed as JSON and handed to ``client.report``.
    Lines that are delivered are removed; lines whose delivery fails are kept
    for a later attempt; lines that are not valid JSON are dropped. The spool
    file is deleted once nothing remains.

    Returns the number of payloads delivered. Never raises: an unusable
    *path*, an unreadable file, a raising *client* or a failed rewrite are
    all absorbed.
    """
    try:
        spool = Path(path)
        if not spool.exists():
            return 0
        lines = [
            ln for ln in spool.read_text(encoding="utf-8").splitlines() if ln.strip()
        ]
    except Exception:
        return 0

    remaining: list[str] = []
    sent = 0
    for line in lines:
        try:
            payload = json.loads(line)
        except Exception:
            continue  # corrupted line: drop it
        try:
            delivered = bool(client.report(payload))
        except Exception:
            # A misbehaving client counts as a failed delivery; keep the line.
            delivered = False
        if delivered:
            sent += 1
        else:
            remaining.append(line)

    try:
        if remaining:
            spool.write_text("\n".join(remaining) + "\n", encoding="utf-8")
        else:
            spool.unlink(missing_ok=True)
    except Exception:
        # Best-effort rewrite: on failure the old spool content stays on disk.
        pass
    return sent

View File

@@ -0,0 +1,137 @@
"""Télémétrie d'usage GUI V6 — payload RGPD-safe + envoi non bloquant.
Aucun nom/chemin de fichier ne doit sortir. L'échec réseau ne doit jamais
faire échouer le traitement.
"""
from __future__ import annotations
import json
import pytest
from gui_v6.usage_telemetry import (
UsageTelemetryClient,
build_usage_payload,
flush_spool,
new_run_id,
page_count_for,
spool_payload,
)
# --- page_count_for ---------------------------------------------------------
def test_page_count_image_is_one():
    """Image files count as exactly one page, whatever the suffix case."""
    for image_name in ("scan.PNG", "photo.jpeg"):
        assert page_count_for(image_name) == 1
def test_page_count_unknown_is_none():
    """Suffixes that are neither PDF nor image yield no page count."""
    for other_name in ("note.txt", "doc.docx"):
        assert page_count_for(other_name) is None
def test_page_count_pdf_uses_counter_and_never_raises():
    """PDFs go through the injected counter; a raising counter gives None."""

    def seven_pages(_path):
        return 7

    def boom(_p):
        raise RuntimeError("corrompu")

    assert page_count_for("x.pdf", pdf_counter=seven_pages) == 7
    assert page_count_for("x.pdf", pdf_counter=boom) is None
# --- build_usage_payload ----------------------------------------------------
def test_payload_counts_and_no_filename_leak():
    """Counters are correct and no file name/path survives in the payload."""
    allowed_keys = {"ordinal", "page_count", "status", "duration_ms", "extension"}
    leaky_doc = {
        "ordinal": 0,
        "page_count": 5,
        "status": "success",
        "extension": "pdf",
        "filename": "LETTRE 23070126.pdf",
        "path": "/home/dom/secret.pdf",
    }
    payload = build_usage_payload(
        run_id="r1",
        app_name="gui_v6",
        app_version="6.0.0-g1",
        license_ref="LIC-1",
        machine_id="m1",
        documents=[
            leaky_doc,
            {"ordinal": 1, "page_count": 3, "status": "success"},
            {"ordinal": 2, "page_count": None, "status": "failed"},
        ],
    )

    expected_fields = {
        "document_count": 3,
        "succeeded_count": 2,
        "failed_count": 1,
        "total_pages": 8,
        "license_ref": "LIC-1",
    }
    for field, value in expected_fields.items():
        assert payload[field] == value

    # GDPR: no name/path may survive, at any nesting level.
    serialized = json.dumps(payload, ensure_ascii=False).lower()
    for forbidden in ("filename", "secret", "lettre"):
        assert forbidden not in serialized
    for doc in payload["documents"]:
        assert set(doc) <= allowed_keys
def test_new_run_id_unique():
    """Two consecutive run ids must differ."""
    first, second = new_run_id(), new_run_id()
    assert first != second
# --- UsageTelemetryClient ---------------------------------------------------
class _FakeResp:
    """Minimal HTTP response stand-in exposing only ``status_code``."""

    def __init__(self, status_code):
        # The only attribute the telemetry client reads from a response.
        self.status_code = status_code
class _FakeSession:
    """Fake HTTP session: records every ``post`` call, then answers or raises.

    With ``raise_exc`` set, ``post`` raises it (after recording the call);
    otherwise it returns a ``_FakeResp`` carrying ``status_code``.
    """

    def __init__(self, status_code=200, raise_exc=None):
        self.calls = []
        self.raise_exc = raise_exc
        self.status_code = status_code

    def post(self, url, json=None, timeout=None):
        self.calls.append(dict(url=url, json=json, timeout=timeout))
        if self.raise_exc is None:
            return _FakeResp(self.status_code)
        raise self.raise_exc
def test_report_ok_on_2xx():
    """A 2xx answer makes report() return True and hits the report endpoint."""
    session = _FakeSession(status_code=200)
    client = UsageTelemetryClient("http://localhost:8000", session=session)
    delivered = client.report({"run_id": "r1"})
    assert delivered is True
    first_call = session.calls[0]
    assert first_call["url"].endswith("/api/v1/usage/report")
    assert first_call["json"] == {"run_id": "r1"}
def test_report_false_on_network_error_without_raising():
    """A transport error is swallowed: report() returns False, never raises."""
    broken_session = _FakeSession(raise_exc=ConnectionError("réseau coupé"))
    client = UsageTelemetryClient("http://localhost:8000", session=broken_session)
    outcome = client.report({"run_id": "r1"})  # must not raise
    assert outcome is False
def test_report_false_on_non_2xx():
    """A non-2xx HTTP status is reported as a failed delivery."""
    forbidden_session = _FakeSession(status_code=403)
    client = UsageTelemetryClient("http://localhost:8000", session=forbidden_session)
    outcome = client.report({"run_id": "r1"})
    assert outcome is False
# --- spool JSONL (rejeu des échecs) -----------------------------------------
def test_spool_and_flush(tmp_path):
    """Spooled payloads are all replayed and the spool is emptied on success."""
    spool = tmp_path / "usage_spool.jsonl"
    spool_payload(spool, {"run_id": "a"})
    spool_payload(spool, {"run_id": "b"})
    assert spool.read_text(encoding="utf-8").count("\n") == 2
    # Everything is delivered: flush reports both payloads as sent and the
    # spool ends up removed (or empty).
    ok_client = UsageTelemetryClient("http://x", session=_FakeSession(200))
    assert flush_spool(spool, ok_client) == 2
    assert not spool.exists() or spool.read_text(encoding="utf-8").strip() == ""
def test_flush_keeps_failures(tmp_path):
    """A payload whose delivery fails stays in the spool, intact, for a retry."""
    spool = tmp_path / "usage_spool.jsonl"
    spool_payload(spool, {"run_id": "a"})
    down_client = UsageTelemetryClient(
        "http://x", session=_FakeSession(raise_exc=OSError("down"))
    )
    # Nothing could be sent.
    assert flush_spool(spool, down_client) == 0
    # The failed payload is still queued, byte-for-byte parseable.
    assert spool.exists()
    queued = [
        json.loads(line)
        for line in spool.read_text(encoding="utf-8").splitlines()
        if line.strip()
    ]
    assert queued == [{"run_id": "a"}]