feat(streamer): purge après ACK + buffering SQLite persistant

- Nouveau module persistent_buffer.py (SQLite WAL, thread-safe)
- Purge automatique des captures locales après ACK 200 serveur
- Drain loop 15s, retry exponentiel, plafonds tentatives
- Enum ImageSendResult.{OK, FAILED, FILE_GONE} pour distinguer les cas
- FileNotFoundError n'est plus un faux succès (P0-E audit)
- 14 tests intégration

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-14 16:47:35 +02:00
parent 203dc00d53
commit 013fe071a2
5 changed files with 1282 additions and 24 deletions

View File

@@ -14,10 +14,19 @@ Robustesse (P0-2) :
- Health-check périodique (30s) pour recovery du flag _server_available
- Compression JPEG qualité 85 pour les images (réduction ~5-10x)
- Backpressure : queue bornée (maxsize=100), drop des heartbeat si pleine
Conformité AI Act (Article 12 — journalisation automatique) :
- Purge après ACK : les screenshots locaux sont supprimés après HTTP 200
du serveur (par défaut). Le serveur devient la source de vérité.
- Buffer persistant : les events/images prioritaires non envoyés sont
persistés dans un SQLite local (agent_v1/buffer/pending_events.db)
et rejoués au démarrage et à la reconnexion.
"""
import enum
import io
import logging
import os
import queue
import threading
import time
@@ -25,7 +34,18 @@ import time
import requests
from PIL import Image
from ..config import API_TOKEN, STREAMING_ENDPOINT
from ..config import API_TOKEN, BASE_DIR, STREAMING_ENDPOINT
from .persistent_buffer import MAX_ATTEMPTS, PersistentBuffer
# Fix P0-E : résultat d'envoi d'image trivaleur (succès / échec réseau / fichier
# disparu). On ne doit PAS considérer un FileNotFoundError comme un succès
# HTTP 200 — sinon le buffer SQLite supprime l'entrée alors que le serveur n'a
# jamais reçu l'image (perte silencieuse).
class ImageSendResult(enum.Enum):
OK = "ok" # HTTP 200, serveur a accusé réception
FAILED = "failed" # Erreur réseau/serveur récupérable (retry OK)
FILE_GONE = "file_gone" # Fichier local introuvable (abandon, pas retry)
logger = logging.getLogger(__name__)
@@ -45,6 +65,20 @@ QUEUE_MAX_SIZE = 100
# Types d'événements à ne jamais dropper
PRIORITY_EVENT_TYPES = {"click", "key", "scroll", "action", "screenshot"}
# Purge locale après ACK serveur (Partie A de l'audit)
# Activé par défaut : le serveur conserve déjà les screenshots 180 jours
# (conformité AI Act Article 12). Désactivable via RPA_PURGE_AFTER_ACK=0
# pour debugging local.
PURGE_AFTER_ACK = os.environ.get("RPA_PURGE_AFTER_ACK", "1").lower() in (
"1", "true", "yes",
)
# Chemin du buffer persistant (Partie B de l'audit)
BUFFER_DIR = BASE_DIR / "buffer"
# Intervalle entre deux tentatives de drain du buffer (secondes)
BUFFER_DRAIN_INTERVAL_S = 15
class TraceStreamer:
def __init__(self, session_id: str, machine_id: str = "default"):
@@ -54,8 +88,20 @@ class TraceStreamer:
self.running = False
self._thread = None
self._health_thread = None
self._drain_thread = None
self._server_available = True # Désactivé après trop d'échecs
# Buffer persistant — partagé entre sessions (survit au redémarrage)
# Initialisé paresseusement pour ne pas payer le coût SQLite en dehors
# d'un streaming actif.
self._buffer: PersistentBuffer | None = None
def _get_buffer(self) -> PersistentBuffer:
"""Retourne le buffer persistant, en l'initialisant au besoin."""
if self._buffer is None:
self._buffer = PersistentBuffer(BUFFER_DIR)
return self._buffer
@staticmethod
def _auth_headers() -> dict:
"""Headers d'authentification Bearer pour les requêtes API."""
@@ -75,6 +121,11 @@ class TraceStreamer:
target=self._health_check_loop, daemon=True
)
self._health_thread.start()
# Thread de drain du buffer persistant (rejoue les items en attente)
self._drain_thread = threading.Thread(
target=self._buffer_drain_loop, daemon=True
)
self._drain_thread.start()
logger.info(f"Streamer pour {self.session_id} démarré")
def stop(self):
@@ -99,6 +150,9 @@ class TraceStreamer:
if self._health_thread:
self._health_thread.join(timeout=2.0)
if self._drain_thread:
self._drain_thread.join(timeout=2.0)
self._finalize_session()
logger.info(f"Streamer pour {self.session_id} arrêté")
@@ -126,11 +180,21 @@ class TraceStreamer:
Quand la queue est pleine :
- Les événements prioritaires (click, key, action, screenshot) sont
ajoutés en bloquant brièvement (0.5s)
- Les heartbeat sont silencieusement droppés
ajoutés en bloquant brièvement (0.5s). Si toujours pleine → persistés
dans le buffer SQLite pour rejeu ultérieur.
- Les heartbeat sont silencieusement droppés.
- Si le serveur est marqué indisponible, on persiste immédiatement les
items prioritaires (évite de remplir la queue inutilement).
"""
is_priority = self._is_priority_item(item_type, data)
# Serveur indisponible + item prioritaire → on persiste directement
# sans polluer la queue RAM (qui ne sera jamais vidée tant que le
# serveur est down).
if is_priority and not self._server_available:
self._persist_to_buffer(item_type, data)
return
try:
self.queue.put_nowait((item_type, data))
except queue.Full:
@@ -139,10 +203,18 @@ class TraceStreamer:
try:
self.queue.put((item_type, data), timeout=0.5)
except queue.Full:
logger.warning(
f"Queue pleine — événement prioritaire droppé "
f"(type={item_type})"
)
# Persistance disque (ne JAMAIS dropper un prioritaire)
persisted = self._persist_to_buffer(item_type, data)
if persisted:
logger.warning(
f"Queue pleine — événement prioritaire persisté "
f"sur disque (type={item_type})"
)
else:
logger.error(
f"Queue pleine ET buffer saturé — événement "
f"prioritaire perdu (type={item_type})"
)
else:
# Heartbeat ou événement non-critique : on drop silencieusement
logger.debug(
@@ -163,6 +235,23 @@ class TraceStreamer:
return event_type in PRIORITY_EVENT_TYPES
return False
def _persist_to_buffer(self, item_type: str, data) -> bool:
"""Persiste un item dans le buffer SQLite. Retourne True si OK.
Utilisé quand la queue est pleine ou le serveur indisponible.
"""
try:
buf = self._get_buffer()
if item_type == "event" and isinstance(data, dict):
return buf.add_event(self.session_id, data)
if item_type == "image":
path, shot_id = data
return buf.add_image(self.session_id, path, shot_id)
except Exception as e:
# On n'arrête jamais l'agent si le buffer échoue
logger.error(f"Persistance buffer échouée : {e}")
return False
# =========================================================================
# Boucle d'envoi
# =========================================================================
@@ -174,16 +263,36 @@ class TraceStreamer:
try:
item_type, data = self.queue.get(timeout=0.5)
success = False
is_file_gone = False
if item_type == "event":
success = self._send_with_retry(self._send_event, data)
elif item_type == "image":
success = self._send_with_retry(self._send_image, *data)
result = self._send_with_retry(self._send_image, *data)
# Fix P0-E : distinguer FILE_GONE du vrai succès HTTP.
if result is ImageSendResult.OK:
success = True
elif result is ImageSendResult.FILE_GONE:
# Fichier disparu : pas de retry, pas de persistance
# (on ne peut plus le renvoyer). On considère l'item
# comme traité sans comptabiliser un succès réseau.
is_file_gone = True
success = False
else:
success = False
self.queue.task_done()
if success:
consecutive_failures = 0
elif is_file_gone:
# Fichier introuvable — déjà logué ERROR dans _send_image.
# On ne persiste PAS dans le buffer (retry voué à échouer).
consecutive_failures = 0
else:
consecutive_failures += 1
# Après 3 retries infructueux, si l'item est prioritaire,
# on le persiste pour ne pas le perdre définitivement.
if self._is_priority_item(item_type, data):
self._persist_to_buffer(item_type, data)
if consecutive_failures >= 10:
logger.warning(
"10 échecs consécutifs — serveur marqué indisponible"
@@ -200,15 +309,22 @@ class TraceStreamer:
# Retry avec backoff exponentiel
# =========================================================================
def _send_with_retry(self, send_fn, *args) -> bool:
def _send_with_retry(self, send_fn, *args):
"""Tente l'envoi avec retry et backoff exponentiel.
3 tentatives max avec délais de 1s, 2s, 4s entre chaque.
Retourne True si l'envoi a réussi, False sinon.
Retourne :
- True / ImageSendResult.OK si l'envoi a réussi
- ImageSendResult.FILE_GONE (images uniquement) — pas de retry
- False / ImageSendResult.FAILED sinon
"""
# Première tentative (sans délai)
if send_fn(*args):
return True
first = send_fn(*args)
if first is ImageSendResult.OK or first is True:
return first
# Fix P0-E : FILE_GONE → pas de retry, l'erreur est permanente.
if first is ImageSendResult.FILE_GONE:
return first
# Retries avec backoff
for attempt, delay in enumerate(RETRY_DELAYS, start=1):
@@ -219,9 +335,13 @@ class TraceStreamer:
f"Retry {attempt}/{MAX_RETRIES} dans {delay}s..."
)
time.sleep(delay)
if send_fn(*args):
result = send_fn(*args)
if result is ImageSendResult.OK or result is True:
logger.debug(f"Retry {attempt} réussi")
return True
return result
# FILE_GONE pendant un retry — idem, on arrête
if result is ImageSendResult.FILE_GONE:
return result
logger.debug(f"Envoi échoué après {MAX_RETRIES} retries")
return False
@@ -260,6 +380,115 @@ class TraceStreamer:
except Exception:
logger.debug("Health-check échoué — serveur toujours indisponible")
# =========================================================================
# Drain du buffer persistant (Partie B)
# =========================================================================
def _buffer_drain_loop(self):
"""Rejoue les items persistés en arrière-plan.
Tourne tant que self.running. Essaie de drainer le buffer toutes les
BUFFER_DRAIN_INTERVAL_S secondes, mais seulement si :
- le serveur est disponible,
- il y a effectivement des items en attente.
Au premier passage (démarrage agent), on draine immédiatement pour
rejouer tout ce qui a été persisté lors de la session précédente.
"""
# Au démarrage : drain immédiat (pas d'attente)
first_pass = True
while self.running:
if not first_pass:
time.sleep(BUFFER_DRAIN_INTERVAL_S)
if not self.running:
break
first_pass = False
if not self._server_available:
continue
try:
buf = self._get_buffer()
# Abandonner d'abord les items exceeded (évite de les retenter)
abandoned = buf.abandon_exceeded()
if abandoned:
logger.warning(
f"Buffer : {abandoned} items abandonnés "
f"après {MAX_ATTEMPTS} tentatives"
)
counts = buf.counts()
if counts["events"] == 0 and counts["images"] == 0:
continue
logger.info(
f"Buffer drain : {counts['events']} events, "
f"{counts['images']} images en attente — rejeu"
)
self._drain_buffer_once(buf)
except Exception as e:
logger.error(f"Buffer drain loop échoué : {e}")
def _drain_buffer_once(self, buf: PersistentBuffer):
"""Une passe de drain : envoie ce qui peut l'être, incrémente le reste.
On arrête dès qu'un envoi échoue (serveur probablement down).
"""
# Events d'abord (plus légers, priorité métier AI Act)
for row in buf.drain_events(limit=50):
if not self._server_available:
return
try:
import json as _json
event = _json.loads(row["payload"])
except (ValueError, TypeError):
logger.error(
f"Buffer : payload event #{row['id']} corrompu, suppression"
)
buf.delete_event(row["id"])
continue
if self._send_event(event):
buf.delete_event(row["id"])
else:
buf.increment_attempts(row["id"], "event")
# Serveur répond mal — on arrête la passe
return
# Puis images
for row in buf.drain_images(limit=20):
if not self._server_available:
return
image_path = row["image_path"]
shot_id = row["shot_id"]
if not os.path.exists(image_path):
# Fichier local disparu (purge, clean-up) — on abandonne.
# Fix P0-E : log ERROR (pas warning) — c'est une perte de donnée.
logger.error(
f"Buffer : image #{row['id']} introuvable sur disque "
f"({image_path}) — entrée abandonnée (le serveur n'a "
f"jamais reçu cette image, session={row['session_id']}, "
f"shot={shot_id})"
)
buf.delete_image(row["id"])
continue
result = self._send_image(image_path, shot_id)
if result is ImageSendResult.OK or result is True:
buf.delete_image(row["id"])
elif result is ImageSendResult.FILE_GONE:
# Fix P0-E : fichier disparu pendant l'envoi.
# Ce n'est PAS un succès HTTP — ne pas considérer comme tel.
# On supprime néanmoins l'entrée (retry voué à échouer)
# mais avec un log ERROR explicite.
logger.error(
f"Buffer : image #{row['id']} disparue pendant l'envoi "
f"({image_path}) — entrée abandonnée, pas de retry "
f"(session={row['session_id']}, shot={shot_id})"
)
buf.delete_image(row["id"])
else:
buf.increment_attempts(row["id"], "image")
return
# =========================================================================
# Compression JPEG
# =========================================================================
@@ -287,6 +516,34 @@ class TraceStreamer:
logger.warning(f"Compression JPEG échouée, envoi PNG brut: {e}")
return None, None, None
# =========================================================================
# Purge locale après ACK (Partie A)
# =========================================================================
@staticmethod
def _purge_local_image(path: str):
"""Supprime un screenshot local après ACK 200 du serveur.
Ne crashe JAMAIS si le fichier est verrouillé (cas Windows) ou
déjà supprimé : on log en debug et on continue. L'auto-cleanup
de SessionStorage repassera plus tard.
"""
if not PURGE_AFTER_ACK:
return
try:
os.remove(path)
logger.debug(f"Screenshot local purgé après ACK : {path}")
except FileNotFoundError:
# Déjà supprimé ou chemin invalide — silencieux
pass
except PermissionError as e:
# Windows verrouille parfois les fichiers (antivirus, indexation...)
logger.debug(
f"Purge différée (fichier verrouillé) : {path}{e}"
)
except OSError as e:
logger.debug(f"Purge échouée : {path}{e}")
# =========================================================================
# Envois HTTP
# =========================================================================
@@ -337,7 +594,7 @@ class TraceStreamer:
else:
logger.warning(f"Finalisation échouée: {resp.status_code}")
except Exception as e:
logger.debug(f"Finalisation échouée: {e}")
logger.warning(f"Finalisation échouée: {e}")
def _send_event(self, event: dict) -> bool:
"""Envoyer un événement au serveur (avec identifiant machine)."""
@@ -361,14 +618,23 @@ class TraceStreamer:
logger.debug(f"Streaming Event échoué: {e}")
return False
def _send_image(self, path: str, shot_id: str) -> bool:
def _send_image(self, path: str, shot_id: str):
"""Envoyer un screenshot au serveur, compressé en JPEG.
Utilise un context manager pour le fallback PNG afin d'éviter
les fuites de descripteurs de fichier.
Partie A (purge après ACK) : en cas de HTTP 200 confirmé, le fichier
local est supprimé (le serveur devient la source de vérité).
Fix P0-E : retourne `ImageSendResult` (OK / FAILED / FILE_GONE).
Les appelants historiques qui attendaient un bool continuent de
fonctionner grâce à la truthiness du enum (OK → True, reste → False),
MAIS le drain du buffer doit désormais discriminer FILE_GONE pour
ne pas confondre "fichier disparu" avec "envoyé avec succès".
"""
if not self._server_available:
return False
return ImageSendResult.FAILED
try:
# Tenter la compression JPEG (réduction ~5-10x vs PNG)
jpeg_buf, content_type, suffix = self._compress_image_to_jpeg(path)
@@ -391,7 +657,10 @@ class TraceStreamer:
headers=self._auth_headers(),
timeout=5,
)
return resp.ok
if resp.ok:
self._purge_local_image(path)
return ImageSendResult.OK
return ImageSendResult.FAILED
else:
# Fallback : envoi PNG original avec context manager
with open(path, "rb") as f:
@@ -405,7 +674,20 @@ class TraceStreamer:
headers=self._auth_headers(),
timeout=5,
)
return resp.ok
if resp.ok:
self._purge_local_image(path)
return ImageSendResult.OK
return ImageSendResult.FAILED
except FileNotFoundError:
# Fix P0-E : fichier local disparu. On NE doit PAS considérer ça
# comme un succès HTTP 200. Le serveur n'a rien reçu. On signale
# `FILE_GONE` pour que le drain du buffer supprime l'entrée
# (pas de retry possible) tout en loguant ERROR (pas debug).
logger.error(
f"Image {shot_id} introuvable sur disque ({path}) — "
f"abandon (serveur n'a rien reçu)"
)
return ImageSendResult.FILE_GONE
except Exception as e:
logger.debug(f"Streaming Image échoué: {e}")
return False
return ImageSendResult.FAILED