"""Diagnostics structurés de la GUI V6 (E2/E3) — RGPD strict. On n'émet QUE des métadonnées techniques liste-blanche : type d'exception (nom de classe), catégorie d'erreur d'un ensemble fermé, statut, ordinal, durée. JAMAIS de nom/chemin/texte de document, ni de message d'exception brut. L'envoi est non bloquant : un échec réseau n'interrompt jamais le traitement. Patron : gui_v6/usage_telemetry.py (télémétrie d'usage). """ from __future__ import annotations import json import uuid from pathlib import Path from typing import Any, Callable, Iterable, Optional # Clés autorisées par item de diagnostic (filtre RGPD appliqué à la construction). _ALLOWED_ITEM_KEYS = {"ordinal", "status", "error_type", "error_code", "duration_ms"} REPORT_PATH = "/api/v1/diagnostics/report" def new_run_id() -> str: return uuid.uuid4().hex def items_from_summary(summary: Any) -> list[dict]: """Extrait les items de diagnostic (RGPD-safe) d'un ``RunSummary``. Ne lit que les attributs autorisés ; aucun nom/chemin/message n'est lu. """ items: list[dict] = [] for item in getattr(summary, "documents", None) or []: items.append( { "ordinal": getattr(item, "ordinal", 0), "status": getattr(item, "status", "success"), "error_type": getattr(item, "error_type", None), "error_code": getattr(item, "error_code", None), "duration_ms": getattr(item, "duration_ms", None), } ) return items def build_diagnostics_payload( *, run_id: str, app_name: str, app_version: Optional[str], license_ref: Optional[str], machine_id: Optional[str], duration_ms: Optional[int], items: Iterable[dict], ) -> dict: """Construit le payload diagnostic. Chaque item est filtré aux seules clés autorisées → aucun nom/chemin/message ne peut fuir, même fourni par erreur.""" clean_items: list[dict] = [] succeeded = failed = 0 for raw in items: it = {k: raw[k] for k in _ALLOWED_ITEM_KEYS if k in raw} status = it.get("status") if status == "success": succeeded += 1 elif status == "failed": failed += 1 clean_items.append(it) return { "run_id": run_id, "license_ref": license_ref, "machine_id": machine_id, "app_name": app_name, "app_version": app_version, "duration_ms": duration_ms, "document_count": len(clean_items), "succeeded_count": succeeded, "failed_count": failed, "items": clean_items, } class DiagnosticsClient: """Envoie un payload diagnostic au portail. Non bloquant : capture toute erreur.""" def __init__( self, base_url: str, session: Any, timeout: float = 4.0, logger: Optional[Callable[[str], None]] = None, ) -> None: self._url = base_url.rstrip("/") + REPORT_PATH self._session = session self._timeout = timeout self._log = logger or (lambda _msg: None) def report(self, payload: dict) -> bool: try: resp = self._session.post(self._url, json=payload, timeout=self._timeout) status = getattr(resp, "status_code", 0) ok = 200 <= int(status) < 300 if not ok: self._log(f"diagnostics report refusé (HTTP {status})") return ok except Exception as exc: # réseau absent, timeout, etc. self._log(f"diagnostics report échec (non bloquant) : {exc}") return False def report_run_diagnostics( summary: Any, *, base_url: str, license_ref: Optional[str], machine_id: Optional[str], session: Any, app_name: str = "gui_v6", app_version: Optional[str] = None, duration_ms: Optional[int] = None, run_id: Optional[str] = None, spool_path: Any = None, logger: Optional[Callable[[str], None]] = None, ) -> bool: """Construit le payload depuis un ``RunSummary`` et l'envoie (non bloquant). N'envoie RIEN si ``license_ref`` est absent. En cas d'échec réseau, spoole le payload (si ``spool_path``) pour un rejeu ultérieur. Ne lève jamais. """ log = logger or (lambda _msg: None) if not license_ref: log("diagnostics ignorés : aucune licence locale valide") return False payload = build_diagnostics_payload( run_id=run_id or new_run_id(), app_name=app_name, app_version=app_version, license_ref=license_ref, machine_id=machine_id, duration_ms=duration_ms, items=items_from_summary(summary), ) client = DiagnosticsClient(base_url, session=session, logger=log) ok = client.report(payload) if not ok and spool_path is not None: spool_payload(spool_path, payload) return ok def spool_payload(path: Any, payload: dict) -> None: """Ajoute un payload à la file JSONL locale (ne lève pas).""" try: p = Path(path) p.parent.mkdir(parents=True, exist_ok=True) with p.open("a", encoding="utf-8") as fh: fh.write(json.dumps(payload, ensure_ascii=False) + "\n") except Exception: pass def flush_spool(path: Any, client: "DiagnosticsClient") -> int: """Tente d'envoyer chaque payload en file ; conserve ceux qui échouent. Retourne le nombre de payloads envoyés. Ne lève jamais. """ p = Path(path) if not p.exists(): return 0 try: lines = [ln for ln in p.read_text(encoding="utf-8").splitlines() if ln.strip()] except Exception: return 0 remaining: list[str] = [] sent = 0 for line in lines: try: payload = json.loads(line) except Exception: continue if client.report(payload): sent += 1 else: remaining.append(line) try: if remaining: p.write_text("\n".join(remaining) + "\n", encoding="utf-8") else: p.unlink(missing_ok=True) except Exception: pass return sent