Files
anonymisation/gui_v6/usage_telemetry.py
Domi31tls 1bbe70a911 feat(gui): câbler l'envoi de la télémétrie d'usage en fin de run
Le module usage_telemetry est maintenant réellement branché : la GUI V6
envoie les statistiques au portail après chaque run (les stats web
restaient vides sans cela).

- processing_runner : RunSummary porte une liste DocResult (ordinal,
  page_count via page_count_for, status, duration_ms, extension) — peuplée
  dans la boucle. Aucun nom/chemin de fichier.
- usage_telemetry : report_run_summary(summary, base_url, license_ref,
  machine_id, session, ...) construit le payload depuis le RunSummary et
  l'envoie (non bloquant). N'envoie RIEN sans license_ref. Spool JSONL si
  échec réseau.
- tab_usage : _finish() déclenche l'envoi en thread daemon (jamais bloquant
  pour l'UI ni le run).
- app : fournit le reporter à UsageTab avec le contexte licence (base_url du
  LicenseClient, license_ref via local_status, machine_id, app_version).

Tests : RunSummary.documents peuplé (0 chemin) ; report_run_summary (payload
correct, réseau KO → spool sans crash, pas d'envoi sans licence) ; _finish
appelle le reporter. 252 tests unit OK (0 régression), self-test OK.
V5/moteur/app_aivanov intacts, 0 dépendance. Aucun build/push sans GO Dom.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-15 21:24:43 +02:00

224 lines
6.8 KiB
Python

"""Télémétrie d'usage de la GUI V6 (suivi licence/client, jamais audit médical).
RGPD : on n'émet QUE des compteurs et métadonnées non sensibles. Jamais de nom
ou de chemin de fichier, de texte extrait, d'entités ni de noms patients.
L'envoi est non bloquant : un échec réseau n'interrompt jamais le traitement.
"""
from __future__ import annotations
import json
import uuid
from pathlib import Path
from typing import Any, Callable, Iterable, Optional
# Clés autorisées par document (filtre RGPD appliqué à la construction).
_ALLOWED_DOC_KEYS = {"ordinal", "page_count", "status", "duration_ms", "extension"}
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".bmp", ".gif"}
REPORT_PATH = "/api/v1/usage/report"
def new_run_id() -> str:
return uuid.uuid4().hex
def _default_pdf_counter(path: Any) -> Optional[int]:
try:
import fitz
with fitz.open(str(path)) as doc:
return len(doc)
except Exception:
return None
def page_count_for(
path: Any, pdf_counter: Callable[[Any], Optional[int]] = _default_pdf_counter
) -> Optional[int]:
"""Nombre de pages best-effort. PDF→compteur, image→1, autres→None.
Ne lève jamais et ne lit pas le nom du fichier (seulement l'extension).
"""
try:
ext = Path(str(path)).suffix.lower()
except Exception:
return None
if ext == ".pdf":
try:
return pdf_counter(path)
except Exception:
return None
if ext in _IMAGE_EXTS:
return 1
return None
def build_usage_payload(
*,
run_id: str,
app_name: str,
app_version: str,
license_ref: Optional[str],
machine_id: Optional[str],
documents: Iterable[dict],
) -> dict:
"""Construit le payload d'usage. Les documents sont filtrés aux seules clés
autorisées → aucun nom/chemin ne peut fuir, même fourni par erreur."""
clean_docs: list[dict] = []
succeeded = failed = total_pages = 0
for raw in documents:
doc = {k: raw[k] for k in _ALLOWED_DOC_KEYS if k in raw}
status = doc.get("status")
if status == "success":
succeeded += 1
elif status == "failed":
failed += 1
page_count = doc.get("page_count")
if isinstance(page_count, int):
total_pages += page_count
clean_docs.append(doc)
return {
"run_id": run_id,
"license_ref": license_ref,
"machine_id": machine_id,
"app_name": app_name,
"app_version": app_version,
"document_count": len(clean_docs),
"succeeded_count": succeeded,
"failed_count": failed,
"total_pages": total_pages,
"documents": clean_docs,
}
class UsageTelemetryClient:
"""Envoie un payload d'usage au portail. Non bloquant : capture toute erreur."""
def __init__(
self,
base_url: str,
session: Any,
timeout: float = 4.0,
logger: Optional[Callable[[str], None]] = None,
) -> None:
self._url = base_url.rstrip("/") + REPORT_PATH
self._session = session
self._timeout = timeout
self._log = logger or (lambda _msg: None)
def report(self, payload: dict) -> bool:
try:
resp = self._session.post(self._url, json=payload, timeout=self._timeout)
status = getattr(resp, "status_code", 0)
ok = 200 <= int(status) < 300
if not ok:
self._log(f"usage report refusé (HTTP {status})")
return ok
except Exception as exc: # réseau absent, timeout, etc.
self._log(f"usage report échec (non bloquant) : {exc}")
return False
# --- file locale JSONL (rejeu best-effort des échecs) -----------------------
def documents_from_summary(summary: Any) -> list[dict]:
"""Extrait la liste de documents (RGPD-safe) d'un ``RunSummary``.
Ne lit que les attributs autorisés ; aucun nom/chemin n'est récupéré.
"""
docs: list[dict] = []
for item in getattr(summary, "documents", None) or []:
docs.append(
{
"ordinal": getattr(item, "ordinal", 0),
"page_count": getattr(item, "page_count", None),
"status": getattr(item, "status", "success"),
"duration_ms": getattr(item, "duration_ms", None),
"extension": getattr(item, "extension", None),
}
)
return docs
def report_run_summary(
summary: Any,
*,
base_url: str,
license_ref: Optional[str],
machine_id: Optional[str],
session: Any,
app_name: str = "gui_v6",
app_version: Optional[str] = None,
run_id: Optional[str] = None,
spool_path: Any = None,
logger: Optional[Callable[[str], None]] = None,
) -> bool:
"""Construit le payload depuis un ``RunSummary`` et l'envoie (non bloquant).
N'envoie RIEN si ``license_ref`` est absent. En cas d'échec réseau, spoole le
payload (si ``spool_path``) pour un rejeu ultérieur. Ne lève jamais.
"""
log = logger or (lambda _msg: None)
if not license_ref:
log("télémétrie ignorée : aucune licence locale valide")
return False
payload = build_usage_payload(
run_id=run_id or new_run_id(),
app_name=app_name,
app_version=app_version,
license_ref=license_ref,
machine_id=machine_id,
documents=documents_from_summary(summary),
)
client = UsageTelemetryClient(base_url, session=session, logger=log)
ok = client.report(payload)
if not ok and spool_path is not None:
spool_payload(spool_path, payload)
return ok
def spool_payload(path: Any, payload: dict) -> None:
"""Ajoute un payload à la file JSONL locale (ne lève pas)."""
try:
p = Path(path)
p.parent.mkdir(parents=True, exist_ok=True)
with p.open("a", encoding="utf-8") as fh:
fh.write(json.dumps(payload, ensure_ascii=False) + "\n")
except Exception:
pass
def flush_spool(path: Any, client: "UsageTelemetryClient") -> int:
"""Tente d'envoyer chaque payload en file ; conserve ceux qui échouent.
Retourne le nombre de payloads envoyés avec succès. Ne lève jamais.
"""
p = Path(path)
if not p.exists():
return 0
try:
lines = [ln for ln in p.read_text(encoding="utf-8").splitlines() if ln.strip()]
except Exception:
return 0
remaining: list[str] = []
sent = 0
for line in lines:
try:
payload = json.loads(line)
except Exception:
continue # ligne corrompue : on l'abandonne
if client.report(payload):
sent += 1
else:
remaining.append(line)
try:
if remaining:
p.write_text("\n".join(remaining) + "\n", encoding="utf-8")
else:
p.unlink(missing_ok=True)
except Exception:
pass
return sent