Le module usage_telemetry est maintenant réellement branché : la GUI V6 envoie les statistiques au portail après chaque run (les stats web restaient vides sans cela). - processing_runner : RunSummary porte une liste DocResult (ordinal, page_count via page_count_for, status, duration_ms, extension) — peuplée dans la boucle. Aucun nom/chemin de fichier. - usage_telemetry : report_run_summary(summary, base_url, license_ref, machine_id, session, ...) construit le payload depuis le RunSummary et l'envoie (non bloquant). N'envoie RIEN sans license_ref. Spool JSONL si échec réseau. - tab_usage : _finish() déclenche l'envoi en thread daemon (jamais bloquant pour l'UI ni le run). - app : fournit le reporter à UsageTab avec le contexte licence (base_url du LicenseClient, license_ref via local_status, machine_id, app_version). Tests : RunSummary.documents peuplé (0 chemin) ; report_run_summary (payload correct, réseau KO → spool sans crash, pas d'envoi sans licence) ; _finish appelle le reporter. 252 tests unit OK (0 régression), self-test OK. V5/moteur/app_aivanov intacts, 0 dépendance. Aucun build/push sans GO Dom. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
202 lines
6.6 KiB
Python
202 lines
6.6 KiB
Python
"""Télémétrie d'usage GUI V6 — payload RGPD-safe + envoi non bloquant.
|
|
|
|
Aucun nom/chemin de fichier ne doit sortir. L'échec réseau ne doit jamais
|
|
faire échouer le traitement.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
|
|
import pytest
|
|
|
|
from gui_v6.usage_telemetry import (
|
|
UsageTelemetryClient,
|
|
build_usage_payload,
|
|
flush_spool,
|
|
new_run_id,
|
|
page_count_for,
|
|
spool_payload,
|
|
)
|
|
|
|
|
|
# --- page_count_for ---------------------------------------------------------
|
|
|
|
def test_page_count_image_is_one():
|
|
assert page_count_for("scan.PNG") == 1
|
|
assert page_count_for("photo.jpeg") == 1
|
|
|
|
|
|
def test_page_count_unknown_is_none():
|
|
assert page_count_for("note.txt") is None
|
|
assert page_count_for("doc.docx") is None
|
|
|
|
|
|
def test_page_count_pdf_uses_counter_and_never_raises():
|
|
assert page_count_for("x.pdf", pdf_counter=lambda p: 7) == 7
|
|
|
|
def boom(_p):
|
|
raise RuntimeError("corrompu")
|
|
|
|
assert page_count_for("x.pdf", pdf_counter=boom) is None
|
|
|
|
|
|
# --- build_usage_payload ----------------------------------------------------
|
|
|
|
def test_payload_counts_and_no_filename_leak():
|
|
documents = [
|
|
{"ordinal": 0, "page_count": 5, "status": "success", "extension": "pdf",
|
|
"filename": "LETTRE 23070126.pdf", "path": "/home/dom/secret.pdf"},
|
|
{"ordinal": 1, "page_count": 3, "status": "success"},
|
|
{"ordinal": 2, "page_count": None, "status": "failed"},
|
|
]
|
|
payload = build_usage_payload(
|
|
run_id="r1", app_name="gui_v6", app_version="6.0.0-g1",
|
|
license_ref="LIC-1", machine_id="m1", documents=documents,
|
|
)
|
|
assert payload["document_count"] == 3
|
|
assert payload["succeeded_count"] == 2
|
|
assert payload["failed_count"] == 1
|
|
assert payload["total_pages"] == 8
|
|
assert payload["license_ref"] == "LIC-1"
|
|
# RGPD : aucun nom/chemin ne doit survivre, à aucun niveau
|
|
blob = json.dumps(payload, ensure_ascii=False).lower()
|
|
assert "filename" not in blob
|
|
assert "secret" not in blob
|
|
assert "lettre" not in blob
|
|
for doc in payload["documents"]:
|
|
assert set(doc).issubset({"ordinal", "page_count", "status", "duration_ms", "extension"})
|
|
|
|
|
|
def test_new_run_id_unique():
|
|
assert new_run_id() != new_run_id()
|
|
|
|
|
|
# --- UsageTelemetryClient ---------------------------------------------------
|
|
|
|
class _FakeResp:
|
|
def __init__(self, status_code):
|
|
self.status_code = status_code
|
|
|
|
|
|
class _FakeSession:
|
|
def __init__(self, status_code=200, raise_exc=None):
|
|
self.status_code = status_code
|
|
self.raise_exc = raise_exc
|
|
self.calls = []
|
|
|
|
def post(self, url, json=None, timeout=None):
|
|
self.calls.append({"url": url, "json": json, "timeout": timeout})
|
|
if self.raise_exc is not None:
|
|
raise self.raise_exc
|
|
return _FakeResp(self.status_code)
|
|
|
|
|
|
def test_report_ok_on_2xx():
|
|
sess = _FakeSession(status_code=200)
|
|
client = UsageTelemetryClient("http://localhost:8000", session=sess)
|
|
assert client.report({"run_id": "r1"}) is True
|
|
assert sess.calls[0]["url"].endswith("/api/v1/usage/report")
|
|
assert sess.calls[0]["json"] == {"run_id": "r1"}
|
|
|
|
|
|
def test_report_false_on_network_error_without_raising():
|
|
sess = _FakeSession(raise_exc=ConnectionError("réseau coupé"))
|
|
client = UsageTelemetryClient("http://localhost:8000", session=sess)
|
|
assert client.report({"run_id": "r1"}) is False # ne lève pas
|
|
|
|
|
|
def test_report_false_on_non_2xx():
|
|
sess = _FakeSession(status_code=403)
|
|
client = UsageTelemetryClient("http://localhost:8000", session=sess)
|
|
assert client.report({"run_id": "r1"}) is False
|
|
|
|
|
|
# --- spool JSONL (rejeu des échecs) -----------------------------------------
|
|
|
|
def test_spool_and_flush(tmp_path):
|
|
spool = tmp_path / "usage_spool.jsonl"
|
|
spool_payload(spool, {"run_id": "a"})
|
|
spool_payload(spool, {"run_id": "b"})
|
|
assert spool.read_text(encoding="utf-8").count("\n") == 2
|
|
|
|
# tout part : le spool est vidé
|
|
sent = []
|
|
ok_client = UsageTelemetryClient("http://x", session=_FakeSession(200))
|
|
flush_spool(spool, ok_client)
|
|
assert not spool.exists() or spool.read_text(encoding="utf-8").strip() == ""
|
|
|
|
|
|
def test_flush_keeps_failures(tmp_path):
|
|
spool = tmp_path / "usage_spool.jsonl"
|
|
spool_payload(spool, {"run_id": "a"})
|
|
down_client = UsageTelemetryClient("http://x", session=_FakeSession(raise_exc=OSError("down")))
|
|
flush_spool(spool, down_client)
|
|
# l'échec reste en file pour un prochain essai
|
|
assert spool.exists()
|
|
assert "a" in spool.read_text(encoding="utf-8")
|
|
|
|
|
|
# --- report_run_summary (câblage fin de run) --------------------------------
|
|
|
|
class _FakeDoc:
|
|
def __init__(self, ordinal, page_count, status, duration_ms=None, extension=None):
|
|
self.ordinal = ordinal
|
|
self.page_count = page_count
|
|
self.status = status
|
|
self.duration_ms = duration_ms
|
|
self.extension = extension
|
|
|
|
|
|
class _FakeSummary:
|
|
def __init__(self, documents):
|
|
self.documents = documents
|
|
|
|
|
|
def test_report_run_summary_builds_and_sends():
|
|
from gui_v6.usage_telemetry import report_run_summary
|
|
|
|
sess = _FakeSession(status_code=200)
|
|
summary = _FakeSummary([
|
|
_FakeDoc(0, 5, "success", extension="pdf"),
|
|
_FakeDoc(1, None, "failed"),
|
|
])
|
|
ok = report_run_summary(
|
|
summary, base_url="http://localhost:8088", license_ref="LIC-1",
|
|
machine_id="machine-0001", session=sess, app_version="6.0.0-g1",
|
|
)
|
|
assert ok is True
|
|
payload = sess.calls[0]["json"]
|
|
assert payload["license_ref"] == "LIC-1"
|
|
assert payload["app_name"] == "gui_v6"
|
|
assert payload["document_count"] == 2
|
|
assert payload["total_pages"] == 5
|
|
blob = json.dumps(payload, ensure_ascii=False).lower()
|
|
assert "filename" not in blob and "path" not in blob
|
|
|
|
|
|
def test_report_run_summary_no_send_without_license():
|
|
from gui_v6.usage_telemetry import report_run_summary
|
|
|
|
sess = _FakeSession(status_code=200)
|
|
summary = _FakeSummary([_FakeDoc(0, 1, "success")])
|
|
ok = report_run_summary(
|
|
summary, base_url="http://x", license_ref=None, machine_id="m1", session=sess
|
|
)
|
|
assert ok is False
|
|
assert sess.calls == [] # aucun appel réseau sans licence
|
|
|
|
|
|
def test_report_run_summary_network_down_spools(tmp_path):
|
|
from gui_v6.usage_telemetry import report_run_summary
|
|
|
|
sess = _FakeSession(raise_exc=OSError("down"))
|
|
summary = _FakeSummary([_FakeDoc(0, 1, "success")])
|
|
spool = tmp_path / "spool.jsonl"
|
|
ok = report_run_summary(
|
|
summary, base_url="http://x", license_ref="LIC-1", machine_id="m1",
|
|
session=sess, spool_path=spool,
|
|
)
|
|
assert ok is False # ne lève pas
|
|
assert spool.exists() # conservé pour rejeu
|