"""Usage telemetry for the V6 GUI (license/customer tracking, never a medical audit).

GDPR: ONLY counters and non-sensitive metadata are emitted. Never a file name
or path, extracted text, entities, or patient names.

Sending is non-blocking: a network failure never interrupts processing.
"""
from __future__ import annotations

import json
import uuid
from pathlib import Path
from typing import Any, Callable, Iterable, Optional

# Keys allowed per document (GDPR filter applied when the payload is built).
_ALLOWED_DOC_KEYS = {"ordinal", "page_count", "status", "duration_ms", "extension"}

# Extensions treated as single-page images by ``page_count_for``.
_IMAGE_EXTS = {".jpg", ".jpeg", ".png", ".tif", ".tiff", ".bmp", ".gif"}

# Path appended to the portal base URL to form the reporting endpoint.
REPORT_PATH = "/api/v1/usage/report"


def new_run_id() -> str:
    """Return a fresh random run identifier (32 hex characters, UUID4)."""
    return uuid.uuid4().hex


def _default_pdf_counter(path: Any) -> Optional[int]:
    """Return the page count of the PDF at *path*, or ``None`` on any failure.

    The ``fitz`` module (PyMuPDF) is imported lazily so that its absence, like
    an unreadable file, simply yields ``None``.
    """
    try:
        import fitz

        with fitz.open(str(path)) as doc:
            return len(doc)
    except Exception:
        return None


def page_count_for(
    path: Any, pdf_counter: Callable[[Any], Optional[int]] = _default_pdf_counter
) -> Optional[int]:
    """Return a best-effort page count for the file at *path*.

    PDF -> result of *pdf_counter*, known image extension -> 1, anything
    else -> ``None``. Never raises. Only the extension is inspected here; the
    file name itself is not used.
    """
    try:
        ext = Path(str(path)).suffix.lower()
    except Exception:
        return None
    if ext == ".pdf":
        try:
            return pdf_counter(path)
        except Exception:
            # A custom counter may raise; the contract is to never propagate.
            return None
    if ext in _IMAGE_EXTS:
        return 1
    return None


def build_usage_payload(
    *,
    run_id: str,
    app_name: str,
    app_version: Optional[str],
    license_ref: Optional[str],
    machine_id: Optional[str],
    documents: Iterable[dict],
) -> dict:
    """Build the usage payload sent to the portal.

    Each document dict is reduced to the keys in ``_ALLOWED_DOC_KEYS``, so a
    key carrying a name or path that was supplied by mistake is dropped.

    Returns a dict with the run/license/application identifiers, the filtered
    ``documents`` list and the aggregates ``document_count``,
    ``succeeded_count``, ``failed_count`` and ``total_pages`` (sum of the
    integer ``page_count`` values).
    """
    clean_docs: list[dict] = []
    succeeded = failed = total_pages = 0
    for raw in documents:
        doc = {k: raw[k] for k in _ALLOWED_DOC_KEYS if k in raw}
        status = doc.get("status")
        if status == "success":
            succeeded += 1
        elif status == "failed":
            failed += 1
        page_count = doc.get("page_count")
        if isinstance(page_count, int):
            total_pages += page_count
        clean_docs.append(doc)
    return {
        "run_id": run_id,
        "license_ref": license_ref,
        "machine_id": machine_id,
        "app_name": app_name,
        "app_version": app_version,
        "document_count": len(clean_docs),
        "succeeded_count": succeeded,
        "failed_count": failed,
        "total_pages": total_pages,
        "documents": clean_docs,
    }


class UsageTelemetryClient:
    """Send a usage payload to the portal. Non-blocking: every error is caught."""

    def __init__(
        self,
        base_url: str,
        session: Any,
        timeout: float = 4.0,
        logger: Optional[Callable[[str], None]] = None,
    ) -> None:
        """Store the endpoint, the HTTP session and the optional logger.

        *session* must expose ``post(url, json=..., timeout=...)`` returning an
        object with a ``status_code`` attribute. *logger* receives one message
        string per failure; when omitted, messages are discarded.
        """
        self._url = base_url.rstrip("/") + REPORT_PATH
        self._session = session
        self._timeout = timeout
        self._log = logger or (lambda _msg: None)

    def report(self, payload: dict) -> bool:
        """POST *payload*; return ``True`` only for a 2xx response.

        Any exception (no network, timeout, malformed response, ...) is logged
        and reported as ``False`` instead of being raised.
        """
        try:
            resp = self._session.post(self._url, json=payload, timeout=self._timeout)
            status = getattr(resp, "status_code", 0)
            ok = 200 <= int(status) < 300
            if not ok:
                self._log(f"usage report refusé (HTTP {status})")
            return ok
        except Exception as exc:  # no network, timeout, etc.
            self._log(f"usage report échec (non bloquant) : {exc}")
            return False


# --- RunSummary reporting and local JSONL queue (best-effort replay of failures) ---


def documents_from_summary(summary: Any) -> list[dict]:
    """Extract the GDPR-safe document list from a ``RunSummary``-like object.

    Only the allowed attributes are read from each item of
    ``summary.documents``; no name or path attribute is accessed. A missing
    or empty ``documents`` attribute yields an empty list.
    """
    docs: list[dict] = []
    for item in getattr(summary, "documents", None) or []:
        docs.append(
            {
                "ordinal": getattr(item, "ordinal", 0),
                "page_count": getattr(item, "page_count", None),
                "status": getattr(item, "status", "success"),
                "duration_ms": getattr(item, "duration_ms", None),
                "extension": getattr(item, "extension", None),
            }
        )
    return docs


def report_run_summary(
    summary: Any,
    *,
    base_url: str,
    license_ref: Optional[str],
    machine_id: Optional[str],
    session: Any,
    app_name: str = "gui_v6",
    app_version: Optional[str] = None,
    run_id: Optional[str] = None,
    spool_path: Any = None,
    logger: Optional[Callable[[str], None]] = None,
) -> bool:
    """Build the payload from a ``RunSummary`` and send it (non-blocking).

    Sends NOTHING when ``license_ref`` is missing. When the send fails and
    ``spool_path`` is given, the payload is appended to the local queue for a
    later replay.

    Returns ``True`` only when the portal accepted the report. Never raises:
    an unusable ``base_url`` or a malformed ``summary`` is logged and reported
    as ``False``.
    """
    log = logger or (lambda _msg: None)
    if not license_ref:
        log("télémétrie ignorée : aucune licence locale valide")
        return False
    try:
        payload = build_usage_payload(
            run_id=run_id or new_run_id(),
            app_name=app_name,
            app_version=app_version,
            license_ref=license_ref,
            machine_id=machine_id,
            documents=documents_from_summary(summary),
        )
        # The constructor touches ``base_url`` and can raise on a bad value,
        # so it is kept inside the guard.
        client = UsageTelemetryClient(base_url, session=session, logger=log)
        ok = client.report(payload)
        if not ok and spool_path is not None:
            spool_payload(spool_path, payload)
        return ok
    except Exception as exc:
        # Telemetry must never interrupt the caller's processing.
        log(f"usage report échec (non bloquant) : {exc}")
        return False


def spool_payload(path: Any, payload: dict) -> None:
    """Append *payload* as one JSON line to the local queue file (never raises).

    Parent directories are created as needed. Any failure is deliberately
    ignored: spooling is best-effort.
    """
    try:
        p = Path(path)
        p.parent.mkdir(parents=True, exist_ok=True)
        with p.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(payload, ensure_ascii=False) + "\n")
    except Exception:
        pass


def flush_spool(path: Any, client: "UsageTelemetryClient") -> int:
    """Try to send every queued payload; keep the ones that fail.

    Returns the number of payloads sent successfully. Never raises: an
    unusable *path* or unreadable file yields ``0``, and a payload whose send
    raises is kept in the queue like any other failed send.
    """
    try:
        p = Path(path)
        if not p.exists():
            return 0
        lines = [ln for ln in p.read_text(encoding="utf-8").splitlines() if ln.strip()]
    except Exception:
        return 0
    remaining: list[str] = []
    sent = 0
    for line in lines:
        try:
            payload = json.loads(line)
        except Exception:
            continue  # corrupted line: drop it
        try:
            delivered = client.report(payload)
        except Exception:
            # A misbehaving client must not lose the payload or abort the flush.
            delivered = False
        if delivered:
            sent += 1
        else:
            remaining.append(line)
    try:
        if remaining:
            p.write_text("\n".join(remaining) + "\n", encoding="utf-8")
        else:
            p.unlink(missing_ok=True)
    except Exception:
        pass
    return sent