Files
rpa_vision_v3/tests/integration/test_streamer_buffer_and_purge.py
Dom 013fe071a2 feat(streamer): purge après ACK + buffering SQLite persistant
- Nouveau module persistent_buffer.py (SQLite WAL, thread-safe)
- Purge automatique des captures locales après ACK 200 serveur
- Drain loop 15s, retry exponentiel, plafonds tentatives
- Enum ImageSendResult.{OK, FAILED, FILE_GONE} pour distinguer les cas
- FileNotFoundError n'est plus un faux succès (P0-E audit)
- 14 tests intégration

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 16:47:35 +02:00

379 lines
14 KiB
Python

"""
Tests pour les fonctionnalités Partie A (purge après ACK) et Partie B
(buffer persistant) du TraceStreamer — bloquants audit AI Act.
Aucun réseau : on mocke requests.post.
"""
from __future__ import annotations
import json
import sys
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Put the directory found at parents[2] of this file's resolved path at the
# front of sys.path (only once), so the lazy `agent_v0...` imports inside the
# tests resolve when the suite is launched from another working directory.
_ROOT = str(Path(__file__).resolve().parents[2])
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_png(path: Path, size: int = 100) -> Path:
    """Write a small PNG-looking file at *path* and return *path*.

    The file contains the 8-byte PNG signature followed by ``size`` zero
    bytes. It is not a decodable image: the tests only need a file that
    exists on disk and can be opened and read as bytes.

    Args:
        path: Destination file; overwritten if it already exists.
        size: Number of zero padding bytes appended after the signature.

    Returns:
        The same *path* object, so the call can be used inline.
    """
    path.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * size)
    return path
@pytest.fixture
def isolated_buffer(tmp_path, monkeypatch):
    """Redirect the streamer's persistent buffer to a per-test directory.

    The buffer location is normally shared (BASE_DIR / "buffer"). Replacing
    the streamer module's ``BUFFER_DIR`` with a throwaway path under
    ``tmp_path`` prevents tests from polluting each other; monkeypatch
    restores the original value at teardown.

    Returns:
        The ``tmp_path / "buffer"`` path (this fixture does not create it).
    """
    target_dir = tmp_path / "buffer"
    # Dotted-string form: pytest imports the module and swaps the attribute.
    monkeypatch.setattr(
        "agent_v0.agent_v1.network.streamer.BUFFER_DIR", target_dir
    )
    return target_dir
# ---------------------------------------------------------------------------
# Partie A — Purge après ACK
# ---------------------------------------------------------------------------
class TestPurgeAfterAck:
    """Part A: local screenshots are deleted once the server acknowledges them."""

    @staticmethod
    def _send_with_response(session_id, img_path, response):
        """Call ``_send_image`` on *img_path* with the HTTP layer mocked.

        ``requests`` is patched inside the streamer module so that ``post``
        returns *response*. A fresh ``TraceStreamer`` is created under that
        patch, flagged as having an available server, and asked to send the
        image with ``"shot_test"`` as second argument.

        Returns:
            Whatever ``_send_image`` returns.
        """
        from agent_v0.agent_v1.network import streamer as streamer_mod

        with patch("agent_v0.agent_v1.network.streamer.requests") as mock_req:
            mock_req.post.return_value = response
            streamer = streamer_mod.TraceStreamer(session_id)
            streamer._server_available = True
            return streamer._send_image(str(img_path), "shot_test")

    def test_image_purged_after_ack(self, tmp_path, isolated_buffer):
        """After a successful (``ok=True``) response the local file is removed."""
        from agent_v0.agent_v1.network.streamer import ImageSendResult

        img_path = _make_png(tmp_path / "to_purge.png")
        assert img_path.exists()

        result = self._send_with_response(
            "sess_purge_001", img_path, MagicMock(ok=True)
        )

        assert result is ImageSendResult.OK
        assert not img_path.exists(), "Fichier local doit être supprimé après ACK"

    def test_image_not_purged_if_server_rejects(self, tmp_path, isolated_buffer):
        """When the server answers 500 the local file is kept."""
        from agent_v0.agent_v1.network.streamer import ImageSendResult

        img_path = _make_png(tmp_path / "keep_me.png")

        result = self._send_with_response(
            "sess_purge_002", img_path, MagicMock(ok=False, status_code=500)
        )

        assert result is ImageSendResult.FAILED
        assert img_path.exists(), "Fichier doit rester si le serveur rejette"

    def test_purge_disabled_via_env(
        self, tmp_path, isolated_buffer, monkeypatch
    ):
        """With purging switched off, a successful send keeps the local file.

        ``RPA_PURGE_AFTER_ACK`` is read when the streamer module is loaded,
        so the resulting module constant ``PURGE_AFTER_ACK`` is patched
        directly instead of the environment variable.
        """
        from agent_v0.agent_v1.network import streamer as streamer_mod

        monkeypatch.setattr(streamer_mod, "PURGE_AFTER_ACK", False)
        img_path = _make_png(tmp_path / "keep.png")

        result = self._send_with_response(
            "sess_purge_003", img_path, MagicMock(ok=True)
        )

        # Without this check a failed send (which also leaves the file in
        # place) would make the test pass for the wrong reason.
        assert result is streamer_mod.ImageSendResult.OK
        assert img_path.exists(), "Purge doit être désactivée"

    def test_purge_does_not_crash_on_locked_file(
        self, tmp_path, isolated_buffer, monkeypatch
    ):
        """A failing ``os.remove`` (locked file) must not make the send fail."""
        from agent_v0.agent_v1.network import streamer as streamer_mod

        img_path = _make_png(tmp_path / "locked.png")

        def _raise_permission(*_args, **_kwargs):
            raise PermissionError("Fichier verrouillé (simulé)")

        # NOTE(review): streamer_mod.os is presumably the stdlib os module, so
        # this swaps os.remove process-wide until monkeypatch teardown.
        monkeypatch.setattr(streamer_mod.os, "remove", _raise_permission)

        # Must NOT raise even though the deletion step errors out.
        result = self._send_with_response(
            "sess_purge_004", img_path, MagicMock(ok=True)
        )

        assert result is streamer_mod.ImageSendResult.OK
# ---------------------------------------------------------------------------
# Partie B — Buffer persistant SQLite
# ---------------------------------------------------------------------------
class TestPersistentBuffer:
    """Part B: unsent events/images are persisted on disk (SQLite buffer)."""

    def test_priority_event_persisted_when_server_down(
        self, tmp_path, isolated_buffer
    ):
        """A priority event (click) is persisted while the server is down."""
        from agent_v0.agent_v1.network.streamer import TraceStreamer

        streamer = TraceStreamer("sess_buf_001")
        streamer._server_available = False

        streamer.push_event({"type": "click", "pos": [100, 200]})

        counts = streamer._get_buffer().counts()
        assert counts["events"] == 1, "Click doit être persisté"

    def test_heartbeat_not_persisted_when_server_down(
        self, tmp_path, isolated_buffer
    ):
        """A heartbeat (non-priority) is NOT written to the persistent buffer."""
        from agent_v0.agent_v1.network.streamer import TraceStreamer

        streamer = TraceStreamer("sess_buf_002")
        streamer._server_available = False

        # The RAM queue is not full, so the heartbeat is expected to stay
        # there rather than being persisted.
        streamer.push_event({"type": "heartbeat", "image": "/tmp/h.png"})

        assert streamer._get_buffer().counts()["events"] == 0

    def test_image_persisted_when_server_down(
        self, tmp_path, isolated_buffer
    ):
        """An image is persisted while the server is down."""
        from agent_v0.agent_v1.network.streamer import TraceStreamer

        img = _make_png(tmp_path / "img.png")
        streamer = TraceStreamer("sess_buf_003")
        streamer._server_available = False

        streamer.push_image(str(img), "shot_001")

        assert streamer._get_buffer().counts()["images"] == 1

    def test_buffer_persists_when_queue_full(
        self, tmp_path, isolated_buffer
    ):
        """With the RAM queue full, a priority event lands in the buffer."""
        import queue as _q

        from agent_v0.agent_v1.network import streamer as streamer_mod

        streamer = streamer_mod.TraceStreamer("sess_buf_004")
        streamer._server_available = True

        # Saturate the in-memory queue with filler entries. The loop is
        # capped so that an unbounded queue makes the test fail explicitly
        # instead of spinning forever and exhausting memory.
        fill_cap = 100_000
        for _ in range(fill_cap):
            try:
                streamer.queue.put_nowait(("event", {"type": "noise"}))
            except _q.Full:
                break
        else:
            pytest.fail(
                f"streamer.queue accepted {fill_cap} items without raising "
                "queue.Full; cannot exercise the overflow path"
            )

        # Queue is now full: a click has to overflow into the buffer.
        streamer.push_event({"type": "click", "pos": [1, 2]})

        assert streamer._get_buffer().counts()["events"] >= 1

    def test_drain_replays_events_when_server_recovers(
        self, tmp_path, isolated_buffer
    ):
        """Draining re-sends persisted events once the server is back."""
        from agent_v0.agent_v1.network.streamer import TraceStreamer

        streamer = TraceStreamer("sess_buf_005")

        # Persist one event while the server is down.
        streamer._server_available = False
        streamer.push_event({"type": "click", "pos": [50, 50]})
        assert streamer._get_buffer().counts()["events"] == 1

        # Server is back: run a single drain pass by hand.
        streamer._server_available = True
        with patch(
            "agent_v0.agent_v1.network.streamer.requests"
        ) as mock_req:
            mock_req.post.return_value = MagicMock(ok=True)
            streamer._drain_buffer_once(streamer._get_buffer())

        # The event must have been posted AND removed from the buffer.
        event_calls = [
            c for c in mock_req.post.call_args_list if "/event" in str(c)
        ]
        assert len(event_calls) == 1
        assert streamer._get_buffer().counts()["events"] == 0

    def test_drain_increments_attempts_on_failure(
        self, tmp_path, isolated_buffer
    ):
        """A failed drain bumps ``attempts`` and keeps the row."""
        from agent_v0.agent_v1.network.streamer import TraceStreamer

        streamer = TraceStreamer("sess_buf_006")
        streamer._server_available = False
        streamer.push_event({"type": "click"})
        buf = streamer._get_buffer()
        assert buf.counts()["events"] == 1

        # Simulate a send rejected by the server (HTTP 500).
        streamer._server_available = True
        with patch("agent_v0.agent_v1.network.streamer.requests") as mock_req:
            mock_req.post.return_value = MagicMock(ok=False, status_code=500)
            streamer._drain_buffer_once(buf)

        # The event is still buffered, now with attempts == 1.
        rows = buf.drain_events()
        assert len(rows) == 1
        assert rows[0]["attempts"] == 1

    def test_event_abandoned_after_max_attempts(
        self, tmp_path, isolated_buffer
    ):
        """After MAX_ATTEMPTS increments, ``abandon_exceeded`` drops the event."""
        from agent_v0.agent_v1.network.persistent_buffer import (
            MAX_ATTEMPTS,
            PersistentBuffer,
        )

        buf = PersistentBuffer(tmp_path / "buf")
        buf.add_event("sess_aband", {"type": "click"})

        # Push the attempts counter up to the maximum.
        rows = buf.drain_events()
        for _ in range(MAX_ATTEMPTS):
            buf.increment_attempts(rows[0]["id"], "event")

        abandoned = buf.abandon_exceeded()

        assert abandoned == 1
        assert buf.counts()["events"] == 0

    def test_buffer_survives_corrupted_db(self, tmp_path):
        """A corrupted DB file must not prevent the buffer from working."""
        from agent_v0.agent_v1.network.persistent_buffer import (
            PersistentBuffer,
        )

        buffer_dir = tmp_path / "buf"
        buffer_dir.mkdir()

        # Plant a file with the DB's name that is not a SQLite database.
        db_path = buffer_dir / "pending_events.db"
        db_path.write_bytes(b"this is not a valid sqlite db file\x00\x01")

        # Construction must not crash...
        buf = PersistentBuffer(buffer_dir)

        # ...and the buffer must be usable afterwards.
        assert buf.add_event("sess_recover", {"type": "click"}) is True
        assert buf.counts()["events"] == 1

    def test_drain_skips_image_with_missing_file(
        self, tmp_path, isolated_buffer
    ):
        """If the image file is gone, draining removes its buffer entry."""
        from agent_v0.agent_v1.network.streamer import TraceStreamer

        streamer = TraceStreamer("sess_buf_missing")
        streamer._server_available = False

        # Buffer an image whose path does not exist. The path lives under
        # tmp_path so it is portable and cannot collide with a real file
        # (a hard-coded /tmp path gives neither guarantee).
        missing_path = tmp_path / "does_not_exist_xyz.png"
        streamer.push_image(str(missing_path), "shot_missing")
        buf = streamer._get_buffer()
        assert buf.counts()["images"] == 1

        # Drain: the entry must be dropped because the file cannot be found.
        streamer._server_available = True
        with patch("agent_v0.agent_v1.network.streamer.requests") as mock_req:
            mock_req.post.return_value = MagicMock(ok=True)
            streamer._drain_buffer_once(buf)

        assert buf.counts()["images"] == 0
# ---------------------------------------------------------------------------
# Scénarios complets (reprise, coupure réseau)
# ---------------------------------------------------------------------------
class TestScenarios:
    """End-to-end scenarios checking recovery after an incident."""

    def test_scenario_server_offline_then_recover(
        self, tmp_path, isolated_buffer
    ):
        """Offline server -> events buffered -> server back -> drain -> empty.

        The drain is triggered by hand with ``_drain_buffer_once`` as a
        stand-in for the periodic drain loop.
        """
        from agent_v0.agent_v1.network.streamer import TraceStreamer

        click_count = 5
        streamer = TraceStreamer("sess_scenario_001")

        # 1) The server is unreachable from the start.
        streamer._server_available = False

        # 2) The user clicks five times; each click carries its index as pos.
        for idx in range(click_count):
            streamer.push_event({"type": "click", "pos": [idx, idx]})

        buf = streamer._get_buffer()
        assert buf.counts()["events"] == click_count, "5 clicks doivent être persistés"

        # 3) The server comes back.
        streamer._server_available = True

        # 4) One manual drain pass with the HTTP layer mocked.
        with patch(
            "agent_v0.agent_v1.network.streamer.requests"
        ) as mock_req:
            mock_req.post.return_value = MagicMock(ok=True)
            streamer._drain_buffer_once(buf)

        # 5) Every event was posted, in insertion order.
        event_calls = [
            recorded
            for recorded in mock_req.post.call_args_list
            if "/event" in str(recorded)
        ]
        assert len(event_calls) == click_count

        # Each recorded call unpacks to (args, kwargs); the first coordinate
        # of the posted event reveals the order the clicks were replayed in.
        positions = []
        for _args, kwargs in event_calls:
            positions.append(kwargs["json"]["event"]["pos"][0])
        assert positions == list(range(click_count))

        assert buf.counts()["events"] == 0