"""Télémétrie d'usage GUI V6 — payload RGPD-safe + envoi non bloquant. Aucun nom/chemin de fichier ne doit sortir. L'échec réseau ne doit jamais faire échouer le traitement. """ from __future__ import annotations import json import pytest from gui_v6.usage_telemetry import ( UsageTelemetryClient, build_usage_payload, flush_spool, new_run_id, page_count_for, spool_payload, ) # --- page_count_for --------------------------------------------------------- def test_page_count_image_is_one(): assert page_count_for("scan.PNG") == 1 assert page_count_for("photo.jpeg") == 1 def test_page_count_unknown_is_none(): assert page_count_for("note.txt") is None assert page_count_for("doc.docx") is None def test_page_count_pdf_uses_counter_and_never_raises(): assert page_count_for("x.pdf", pdf_counter=lambda p: 7) == 7 def boom(_p): raise RuntimeError("corrompu") assert page_count_for("x.pdf", pdf_counter=boom) is None # --- build_usage_payload ---------------------------------------------------- def test_payload_counts_and_no_filename_leak(): documents = [ {"ordinal": 0, "page_count": 5, "status": "success", "extension": "pdf", "filename": "LETTRE 23070126.pdf", "path": "/home/dom/secret.pdf"}, {"ordinal": 1, "page_count": 3, "status": "success"}, {"ordinal": 2, "page_count": None, "status": "failed"}, ] payload = build_usage_payload( run_id="r1", app_name="gui_v6", app_version="6.0.0-g1", license_ref="LIC-1", machine_id="m1", documents=documents, ) assert payload["document_count"] == 3 assert payload["succeeded_count"] == 2 assert payload["failed_count"] == 1 assert payload["total_pages"] == 8 assert payload["license_ref"] == "LIC-1" # RGPD : aucun nom/chemin ne doit survivre, à aucun niveau blob = json.dumps(payload, ensure_ascii=False).lower() assert "filename" not in blob assert "secret" not in blob assert "lettre" not in blob for doc in payload["documents"]: assert set(doc).issubset({"ordinal", "page_count", "status", "duration_ms", "extension"}) def test_new_run_id_unique(): assert new_run_id() != new_run_id() # --- UsageTelemetryClient --------------------------------------------------- class _FakeResp: def __init__(self, status_code): self.status_code = status_code class _FakeSession: def __init__(self, status_code=200, raise_exc=None): self.status_code = status_code self.raise_exc = raise_exc self.calls = [] def post(self, url, json=None, timeout=None): self.calls.append({"url": url, "json": json, "timeout": timeout}) if self.raise_exc is not None: raise self.raise_exc return _FakeResp(self.status_code) def test_report_ok_on_2xx(): sess = _FakeSession(status_code=200) client = UsageTelemetryClient("http://localhost:8000", session=sess) assert client.report({"run_id": "r1"}) is True assert sess.calls[0]["url"].endswith("/api/v1/usage/report") assert sess.calls[0]["json"] == {"run_id": "r1"} def test_report_false_on_network_error_without_raising(): sess = _FakeSession(raise_exc=ConnectionError("réseau coupé")) client = UsageTelemetryClient("http://localhost:8000", session=sess) assert client.report({"run_id": "r1"}) is False # ne lève pas def test_report_false_on_non_2xx(): sess = _FakeSession(status_code=403) client = UsageTelemetryClient("http://localhost:8000", session=sess) assert client.report({"run_id": "r1"}) is False # --- spool JSONL (rejeu des échecs) ----------------------------------------- def test_spool_and_flush(tmp_path): spool = tmp_path / "usage_spool.jsonl" spool_payload(spool, {"run_id": "a"}) spool_payload(spool, {"run_id": "b"}) assert spool.read_text(encoding="utf-8").count("\n") == 2 # tout part : le spool est vidé sent = [] ok_client = UsageTelemetryClient("http://x", session=_FakeSession(200)) flush_spool(spool, ok_client) assert not spool.exists() or spool.read_text(encoding="utf-8").strip() == "" def test_flush_keeps_failures(tmp_path): spool = tmp_path / "usage_spool.jsonl" spool_payload(spool, {"run_id": "a"}) down_client = UsageTelemetryClient("http://x", session=_FakeSession(raise_exc=OSError("down"))) flush_spool(spool, down_client) # l'échec reste en file pour un prochain essai assert spool.exists() assert "a" in spool.read_text(encoding="utf-8") # --- report_run_summary (câblage fin de run) -------------------------------- class _FakeDoc: def __init__(self, ordinal, page_count, status, duration_ms=None, extension=None): self.ordinal = ordinal self.page_count = page_count self.status = status self.duration_ms = duration_ms self.extension = extension class _FakeSummary: def __init__(self, documents): self.documents = documents def test_report_run_summary_builds_and_sends(): from gui_v6.usage_telemetry import report_run_summary sess = _FakeSession(status_code=200) summary = _FakeSummary([ _FakeDoc(0, 5, "success", extension="pdf"), _FakeDoc(1, None, "failed"), ]) ok = report_run_summary( summary, base_url="http://localhost:8088", license_ref="LIC-1", machine_id="machine-0001", session=sess, app_version="6.0.0-g1", ) assert ok is True payload = sess.calls[0]["json"] assert payload["license_ref"] == "LIC-1" assert payload["app_name"] == "gui_v6" assert payload["document_count"] == 2 assert payload["total_pages"] == 5 blob = json.dumps(payload, ensure_ascii=False).lower() assert "filename" not in blob and "path" not in blob def test_report_run_summary_no_send_without_license(): from gui_v6.usage_telemetry import report_run_summary sess = _FakeSession(status_code=200) summary = _FakeSummary([_FakeDoc(0, 1, "success")]) ok = report_run_summary( summary, base_url="http://x", license_ref=None, machine_id="m1", session=sess ) assert ok is False assert sess.calls == [] # aucun appel réseau sans licence def test_report_run_summary_network_down_spools(tmp_path): from gui_v6.usage_telemetry import report_run_summary sess = _FakeSession(raise_exc=OSError("down")) summary = _FakeSummary([_FakeDoc(0, 1, "success")]) spool = tmp_path / "spool.jsonl" ok = report_run_summary( summary, base_url="http://x", license_ref="LIC-1", machine_id="m1", session=sess, spool_path=spool, ) assert ok is False # ne lève pas assert spool.exists() # conservé pour rejeu