191 lines
6.0 KiB
Python
191 lines
6.0 KiB
Python
"""Diagnostics structurés de la GUI V6 (E2/E3) — RGPD strict.
|
|
|
|
On n'émet QUE des métadonnées techniques liste-blanche : type d'exception
|
|
(nom de classe), catégorie d'erreur d'un ensemble fermé, statut, ordinal,
|
|
durée. JAMAIS de nom/chemin/texte de document, ni de message d'exception brut.
|
|
L'envoi est non bloquant : un échec réseau n'interrompt jamais le traitement.
|
|
Patron : gui_v6/usage_telemetry.py (télémétrie d'usage).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import uuid
|
|
from pathlib import Path
|
|
from typing import Any, Callable, Iterable, Optional
|
|
|
|
# Whitelist of keys allowed on each diagnostic item (GDPR filter applied
# when the payload is built).
_ALLOWED_ITEM_KEYS = {
    "ordinal",
    "status",
    "error_type",
    "error_code",
    "duration_ms",
}

# Path of the diagnostics report endpoint on the portal.
REPORT_PATH = "/api/v1/diagnostics/report"
|
|
|
|
|
|
def new_run_id() -> str:
    """Return a fresh random run identifier (hex form of a version-4 UUID)."""
    run_uuid = uuid.uuid4()
    return run_uuid.hex
|
|
|
|
|
|
def items_from_summary(summary: Any) -> list[dict]:
    """Extract GDPR-safe diagnostic items from a ``RunSummary``-like object.

    Only the whitelisted attributes listed below are read from each entry of
    ``summary.documents``; no name, path or message attribute is accessed.
    A missing, ``None`` or empty ``documents`` attribute yields an empty list.
    """
    # (attribute name, fallback used when the attribute is absent)
    fields = (
        ("ordinal", 0),
        ("status", "success"),
        ("error_type", None),
        ("error_code", None),
        ("duration_ms", None),
    )
    documents = getattr(summary, "documents", None) or []
    return [
        {name: getattr(doc, name, fallback) for name, fallback in fields}
        for doc in documents
    ]
|
|
|
|
|
|
def build_diagnostics_payload(
    *,
    run_id: str,
    app_name: str,
    app_version: Optional[str],
    license_ref: Optional[str],
    machine_id: Optional[str],
    duration_ms: Optional[int],
    items: Iterable[dict],
) -> dict:
    """Build the diagnostics payload sent to the portal.

    Every item is reduced to the keys listed in ``_ALLOWED_ITEM_KEYS``, so a
    key outside that whitelist is dropped even if it was supplied by mistake.
    ``document_count`` is the number of items; ``succeeded_count`` and
    ``failed_count`` count the items whose ``status`` is exactly ``"success"``
    or ``"failed"`` (any other status is counted in neither).
    """
    filtered = [
        {key: entry[key] for key in _ALLOWED_ITEM_KEYS if key in entry}
        for entry in items
    ]
    statuses = [entry.get("status") for entry in filtered]
    succeeded = sum(1 for status in statuses if status == "success")
    failed = sum(1 for status in statuses if status == "failed")

    payload: dict = {
        "run_id": run_id,
        "license_ref": license_ref,
        "machine_id": machine_id,
        "app_name": app_name,
        "app_version": app_version,
        "duration_ms": duration_ms,
    }
    payload["document_count"] = len(filtered)
    payload["succeeded_count"] = succeeded
    payload["failed_count"] = failed
    payload["items"] = filtered
    return payload
|
|
|
|
|
|
class DiagnosticsClient:
    """Send a diagnostics payload to the portal without ever raising.

    Any failure (non-2xx answer, missing network, timeout, ...) is reported
    through the optional logger and turned into a ``False`` return value.
    """

    def __init__(
        self,
        base_url: str,
        session: Any,
        timeout: float = 4.0,
        logger: Optional[Callable[[str], None]] = None,
    ) -> None:
        """Store the transport settings.

        ``session`` only needs a ``post(url, json=..., timeout=...)`` method
        (presumably a ``requests.Session`` — verify against the caller).
        Trailing slashes of ``base_url`` are dropped before ``REPORT_PATH``
        is appended. Without a logger, messages are discarded.
        """
        endpoint_root = base_url.rstrip("/")
        self._url = endpoint_root + REPORT_PATH
        self._session = session
        self._timeout = timeout
        self._log = logger if logger else (lambda _msg: None)

    def report(self, payload: dict) -> bool:
        """POST ``payload`` as JSON; return True only for a 2xx response."""
        try:
            response = self._session.post(
                self._url,
                json=payload,
                timeout=self._timeout,
            )
            # A response object without ``status_code`` counts as a refusal.
            code = getattr(response, "status_code", 0)
            if 200 <= int(code) < 300:
                return True
            self._log(f"diagnostics report refusé (HTTP {code})")
            return False
        except Exception as exc:  # no network, timeout, etc.
            self._log(f"diagnostics report échec (non bloquant) : {exc}")
            return False
|
|
|
|
|
|
def report_run_diagnostics(
    summary: Any,
    *,
    base_url: str,
    license_ref: Optional[str],
    machine_id: Optional[str],
    session: Any,
    app_name: str = "gui_v6",
    app_version: Optional[str] = None,
    duration_ms: Optional[int] = None,
    run_id: Optional[str] = None,
    spool_path: Any = None,
    logger: Optional[Callable[[str], None]] = None,
) -> bool:
    """Build the payload from a ``RunSummary`` and send it (non-blocking).

    Nothing is sent when ``license_ref`` is missing or empty. When the send
    fails and ``spool_path`` is given, the payload is appended to the local
    spool for a later replay. A fresh run id is generated when ``run_id`` is
    not supplied.

    Returns True when the portal accepted the report, False otherwise.
    Never raises: a failure while building the payload or the client (for
    example a ``base_url`` that is not a string, or a malformed summary) is
    logged and reported as False instead of propagating to the caller.
    """
    log = logger or (lambda _msg: None)
    if not license_ref:
        log("diagnostics ignorés : aucune licence locale valide")
        return False

    try:
        payload = build_diagnostics_payload(
            run_id=run_id or new_run_id(),
            app_name=app_name,
            app_version=app_version,
            license_ref=license_ref,
            machine_id=machine_id,
            duration_ms=duration_ms,
            items=items_from_summary(summary),
        )
        client = DiagnosticsClient(base_url, session=session, logger=log)
    except Exception as exc:
        # Only the exception class name is logged: the raw message is kept
        # out of the log, in line with the module's whitelist policy.
        log(f"diagnostics ignorés : erreur interne ({type(exc).__name__})")
        return False

    ok = client.report(payload)
    if not ok and spool_path is not None:
        spool_payload(spool_path, payload)
    return ok
|
|
|
|
|
|
def spool_payload(path: Any, payload: dict) -> None:
    """Append ``payload`` as one JSON line to the local spool file.

    Best effort: missing parent directories are created, and any error
    (invalid path, I/O failure, unserializable payload) is silently ignored
    so that this function never raises.
    """
    try:
        target = Path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        with target.open("a", encoding="utf-8") as spool:
            record = json.dumps(payload, ensure_ascii=False)
            spool.write(record + "\n")
    except Exception:
        # Spooling is optional; a failure here must not disturb the caller.
        pass
|
|
|
|
|
|
def flush_spool(path: Any, client: "DiagnosticsClient") -> int:
    """Try to send every spooled payload; keep the ones that fail.

    The spool is a JSONL file (one payload per line). Lines that are not
    valid JSON are dropped, since they can never be delivered. When nothing
    remains the file is deleted, otherwise it is rewritten with the payloads
    still pending.

    Returns the number of payloads sent. Never raises: an invalid or
    unreadable path returns 0, and an exception raised by ``client.report``
    counts as a failed delivery (the payload stays in the spool).
    """
    try:
        spool = Path(path)
        if not spool.exists():
            return 0
        text = spool.read_text(encoding="utf-8")
    except Exception:
        return 0

    # Split on "\n" only. str.splitlines() also breaks on U+2028, U+2029 and
    # U+0085, which json.dumps(..., ensure_ascii=False) writes unescaped; a
    # payload containing one would be cut in two and lost.
    pending = [line for line in text.split("\n") if line.strip()]

    kept: list[str] = []
    sent = 0
    for line in pending:
        try:
            payload = json.loads(line)
        except Exception:
            continue  # corrupt entry: drop it
        try:
            delivered = bool(client.report(payload))
        except Exception:
            delivered = False
        if delivered:
            sent += 1
        else:
            kept.append(line)

    try:
        if kept:
            spool.write_text("\n".join(kept) + "\n", encoding="utf-8")
        else:
            spool.unlink(missing_ok=True)
    except Exception:
        # Best effort: if the rewrite fails the spool is left as it was.
        pass
    return sent
|