diff --git a/agent_v0/agent_v1/network/persistent_buffer.py b/agent_v0/agent_v1/network/persistent_buffer.py new file mode 100644 index 000000000..aaf6ca93c --- /dev/null +++ b/agent_v0/agent_v1/network/persistent_buffer.py @@ -0,0 +1,380 @@ +# agent_v1/network/persistent_buffer.py +""" +Buffer persistant SQLite pour les événements/images qui n'ont pas pu être envoyés. + +Résout le bloquant AI Act Article 12 : en cas de coupure serveur ou de queue pleine, +les événements prioritaires (click, key, action, screenshot) sont persistés sur disque +au lieu d'être silencieusement perdus. Ils sont rejoués à la reconnexion. + +Caractéristiques : + - SQLite fichier unique (agent_v1/buffer/pending_events.db), thread-safe + - Async : les écritures se font depuis un thread daemon, jamais bloquant + - Quota : compteur d'attempts par item, abandon après MAX_ATTEMPTS + - Robustesse : un fichier corrompu est renommé et recréé vide +""" + +from __future__ import annotations + +import json +import logging +import os +import sqlite3 +import threading +import time +from pathlib import Path + +logger = logging.getLogger(__name__) + +# Nombre max de tentatives avant abandon définitif d'un item +MAX_ATTEMPTS = 10 + +# Taille max du buffer en items pour éviter une explosion disque +# (typiquement : 1000 events + 1000 images = quelques Mo de SQLite) +MAX_BUFFER_ITEMS = 2000 + + +class PersistentBuffer: + """Buffer SQLite pour événements/images en attente d'envoi. + + Deux tables : + - pending_events (id, session_id, payload_json, attempts, created_at) + - pending_images (id, session_id, shot_id, image_path, attempts, created_at) + + Usage : + buf = PersistentBuffer(base_dir / "buffer") + buf.add_event(session_id, event_dict) # persiste un event + buf.add_image(session_id, image_path, shot_id) # persiste une image + for row in buf.drain_events(): # itère sur les events + if envoyer(row): buf.delete_event(row["id"]) + else: buf.mark_attempt(row["id"], "event") + """ + + def __init__(self, buffer_dir: Path): + self.buffer_dir = Path(buffer_dir) + self.buffer_dir.mkdir(parents=True, exist_ok=True) + self.db_path = self.buffer_dir / "pending_events.db" + self._lock = threading.Lock() + self._init_db() + + # --------------------------------------------------------------- + # Initialisation / gestion corruption + # --------------------------------------------------------------- + + def _init_db(self): + """Crée les tables si elles n'existent pas. + + En cas de fichier corrompu, on le renomme en .corrupted et on recrée + un buffer vide. On préfère perdre un buffer non lisible plutôt que + de crasher l'agent au démarrage. + """ + try: + with self._connect() as conn: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS pending_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + payload TEXT NOT NULL, + attempts INTEGER NOT NULL DEFAULT 0, + created_at REAL NOT NULL + ) + """ + ) + conn.execute( + """ + CREATE TABLE IF NOT EXISTS pending_images ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + shot_id TEXT NOT NULL, + image_path TEXT NOT NULL, + attempts INTEGER NOT NULL DEFAULT 0, + created_at REAL NOT NULL + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_events_created " + "ON pending_events(created_at)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_images_created " + "ON pending_images(created_at)" + ) + conn.commit() + except sqlite3.DatabaseError as e: + logger.warning( + f"Buffer SQLite corrompu ({e}) — renommage en .corrupted " + f"et recréation d'un buffer vide" + ) + try: + corrupted = self.db_path.with_suffix( + f".corrupted.{int(time.time())}" + ) + os.rename(self.db_path, corrupted) + except OSError: + # Si le rename échoue, on tente la suppression directe + try: + os.remove(self.db_path) + except OSError: + pass + # Nouvelle tentative (table vide) + with self._connect() as conn: + conn.execute( + "CREATE TABLE IF NOT EXISTS pending_events (" + "id INTEGER PRIMARY KEY AUTOINCREMENT, " + "session_id TEXT NOT NULL, payload TEXT NOT NULL, " + "attempts INTEGER NOT NULL DEFAULT 0, " + "created_at REAL NOT NULL)" + ) + conn.execute( + "CREATE TABLE IF NOT EXISTS pending_images (" + "id INTEGER PRIMARY KEY AUTOINCREMENT, " + "session_id TEXT NOT NULL, shot_id TEXT NOT NULL, " + "image_path TEXT NOT NULL, " + "attempts INTEGER NOT NULL DEFAULT 0, " + "created_at REAL NOT NULL)" + ) + conn.commit() + + def _connect(self) -> sqlite3.Connection: + """Connexion SQLite en mode WAL (meilleure concurrence).""" + conn = sqlite3.connect( + str(self.db_path), + timeout=5.0, + check_same_thread=False, + isolation_level=None, # autocommit — on gère les transactions + ) + try: + conn.execute("PRAGMA journal_mode=WAL") + conn.execute("PRAGMA synchronous=NORMAL") + except sqlite3.DatabaseError: + pass + conn.row_factory = sqlite3.Row + return conn + + # --------------------------------------------------------------- + # Écriture — persiste un item + # --------------------------------------------------------------- + + def add_event(self, session_id: str, event: dict) -> bool: + """Persiste un événement. Retourne True si écrit, False sinon. + + Si le buffer dépasse MAX_BUFFER_ITEMS, on drop l'insertion (plutôt + que saturer le disque). On log un warning au premier dépassement. + """ + with self._lock: + try: + with self._connect() as conn: + count = conn.execute( + "SELECT COUNT(*) FROM pending_events" + ).fetchone()[0] + if count >= MAX_BUFFER_ITEMS: + logger.warning( + f"Buffer persistant saturé ({count} events) " + f"— event droppé" + ) + return False + conn.execute( + "INSERT INTO pending_events " + "(session_id, payload, attempts, created_at) " + "VALUES (?, ?, 0, ?)", + (session_id, json.dumps(event), time.time()), + ) + return True + except (sqlite3.DatabaseError, TypeError, ValueError) as e: + logger.error(f"Buffer add_event échoué : {e}") + return False + + def add_image( + self, session_id: str, image_path: str, shot_id: str + ) -> bool: + """Persiste une référence image (chemin fichier + shot_id). + + On ne stocke PAS les bytes de l'image (risque de faire gonfler la DB) : + uniquement le chemin. Donc l'image doit rester présente sur disque + tant qu'elle n'a pas été envoyée avec succès au serveur. + """ + with self._lock: + try: + with self._connect() as conn: + count = conn.execute( + "SELECT COUNT(*) FROM pending_images" + ).fetchone()[0] + if count >= MAX_BUFFER_ITEMS: + logger.warning( + f"Buffer persistant saturé ({count} images) " + f"— image droppée" + ) + return False + conn.execute( + "INSERT INTO pending_images " + "(session_id, shot_id, image_path, attempts, created_at) " + "VALUES (?, ?, ?, 0, ?)", + (session_id, shot_id, image_path, time.time()), + ) + return True + except sqlite3.DatabaseError as e: + logger.error(f"Buffer add_image échoué : {e}") + return False + + # --------------------------------------------------------------- + # Lecture — drain dans l'ordre chronologique + # --------------------------------------------------------------- + + def drain_events(self, limit: int = 100) -> list: + """Retourne les events en attente, triés par date de création.""" + with self._lock: + try: + with self._connect() as conn: + rows = conn.execute( + "SELECT id, session_id, payload, attempts " + "FROM pending_events " + "ORDER BY created_at ASC LIMIT ?", + (limit,), + ).fetchall() + return [dict(r) for r in rows] + except sqlite3.DatabaseError as e: + logger.error(f"Buffer drain_events échoué : {e}") + return [] + + def drain_images(self, limit: int = 50) -> list: + """Retourne les images en attente, triées par date de création.""" + with self._lock: + try: + with self._connect() as conn: + rows = conn.execute( + "SELECT id, session_id, shot_id, image_path, attempts " + "FROM pending_images " + "ORDER BY created_at ASC LIMIT ?", + (limit,), + ).fetchall() + return [dict(r) for r in rows] + except sqlite3.DatabaseError as e: + logger.error(f"Buffer drain_images échoué : {e}") + return [] + + # --------------------------------------------------------------- + # Marquage — succès, échec, abandon + # --------------------------------------------------------------- + + def delete_event(self, row_id: int): + """Supprime un event après envoi réussi.""" + with self._lock: + try: + with self._connect() as conn: + conn.execute( + "DELETE FROM pending_events WHERE id = ?", (row_id,) + ) + except sqlite3.DatabaseError as e: + logger.error(f"Buffer delete_event échoué : {e}") + + def delete_image(self, row_id: int): + """Supprime une image après envoi réussi.""" + with self._lock: + try: + with self._connect() as conn: + conn.execute( + "DELETE FROM pending_images WHERE id = ?", (row_id,) + ) + except sqlite3.DatabaseError as e: + logger.error(f"Buffer delete_image échoué : {e}") + + def increment_attempts(self, row_id: int, kind: str) -> int: + """Incrémente le compteur d'attempts. Retourne la nouvelle valeur. + + kind : "event" ou "image" + """ + table = "pending_events" if kind == "event" else "pending_images" + with self._lock: + try: + with self._connect() as conn: + conn.execute( + f"UPDATE {table} SET attempts = attempts + 1 " + "WHERE id = ?", + (row_id,), + ) + row = conn.execute( + f"SELECT attempts FROM {table} WHERE id = ?", (row_id,) + ).fetchone() + return int(row["attempts"]) if row else MAX_ATTEMPTS + except sqlite3.DatabaseError as e: + logger.error(f"Buffer increment_attempts échoué : {e}") + return MAX_ATTEMPTS + + def abandon_exceeded(self) -> int: + """Supprime les items ayant dépassé MAX_ATTEMPTS. + + Un item abandonné est logué en erreur (trace AI Act) puis supprimé. + Retourne le nombre d'items abandonnés. + """ + abandoned = 0 + with self._lock: + try: + with self._connect() as conn: + # Events abandonnés + rows = conn.execute( + "SELECT id, session_id, payload FROM pending_events " + "WHERE attempts >= ?", + (MAX_ATTEMPTS,), + ).fetchall() + for r in rows: + try: + event_type = json.loads(r["payload"]).get( + "type", "?" + ) + except (ValueError, TypeError): + event_type = "?" + logger.error( + f"Buffer : event abandonné après {MAX_ATTEMPTS} " + f"tentatives — session={r['session_id']} " + f"type={event_type}" + ) + abandoned += 1 + conn.execute( + "DELETE FROM pending_events WHERE attempts >= ?", + (MAX_ATTEMPTS,), + ) + + # Images abandonnées + rows = conn.execute( + "SELECT id, session_id, shot_id FROM pending_images " + "WHERE attempts >= ?", + (MAX_ATTEMPTS,), + ).fetchall() + for r in rows: + logger.error( + f"Buffer : image abandonnée après {MAX_ATTEMPTS} " + f"tentatives — session={r['session_id']} " + f"shot_id={r['shot_id']}" + ) + abandoned += 1 + conn.execute( + "DELETE FROM pending_images WHERE attempts >= ?", + (MAX_ATTEMPTS,), + ) + except sqlite3.DatabaseError as e: + logger.error(f"Buffer abandon_exceeded échoué : {e}") + return abandoned + + # --------------------------------------------------------------- + # Introspection + # --------------------------------------------------------------- + + def counts(self) -> dict: + """Retourne (events_count, images_count) pour diagnostic.""" + with self._lock: + try: + with self._connect() as conn: + ev = conn.execute( + "SELECT COUNT(*) FROM pending_events" + ).fetchone()[0] + im = conn.execute( + "SELECT COUNT(*) FROM pending_images" + ).fetchone()[0] + return {"events": ev, "images": im} + except sqlite3.DatabaseError: + return {"events": 0, "images": 0} + + def is_empty(self) -> bool: + c = self.counts() + return c["events"] == 0 and c["images"] == 0 diff --git a/agent_v0/agent_v1/network/streamer.py b/agent_v0/agent_v1/network/streamer.py index 10f59bc55..30336971c 100644 --- a/agent_v0/agent_v1/network/streamer.py +++ b/agent_v0/agent_v1/network/streamer.py @@ -14,10 +14,19 @@ Robustesse (P0-2) : - Health-check périodique (30s) pour recovery du flag _server_available - Compression JPEG qualité 85 pour les images (réduction ~5-10x) - Backpressure : queue bornée (maxsize=100), drop des heartbeat si pleine + +Conformité AI Act (Article 12 — journalisation automatique) : + - Purge après ACK : les screenshots locaux sont supprimés après HTTP 200 + du serveur (par défaut). Le serveur devient la source de vérité. + - Buffer persistant : les events/images prioritaires non envoyés sont + persistés dans un SQLite local (agent_v1/buffer/pending_events.db) + et rejoués au démarrage et à la reconnexion. """ +import enum import io import logging +import os import queue import threading import time @@ -25,7 +34,18 @@ import time import requests from PIL import Image -from ..config import API_TOKEN, STREAMING_ENDPOINT +from ..config import API_TOKEN, BASE_DIR, STREAMING_ENDPOINT +from .persistent_buffer import MAX_ATTEMPTS, PersistentBuffer + + +# Fix P0-E : résultat d'envoi d'image trivaleur (succès / échec réseau / fichier +# disparu). On ne doit PAS considérer un FileNotFoundError comme un succès +# HTTP 200 — sinon le buffer SQLite supprime l'entrée alors que le serveur n'a +# jamais reçu l'image (perte silencieuse). +class ImageSendResult(enum.Enum): + OK = "ok" # HTTP 200, serveur a accusé réception + FAILED = "failed" # Erreur réseau/serveur récupérable (retry OK) + FILE_GONE = "file_gone" # Fichier local introuvable (abandon, pas retry) logger = logging.getLogger(__name__) @@ -45,6 +65,20 @@ QUEUE_MAX_SIZE = 100 # Types d'événements à ne jamais dropper PRIORITY_EVENT_TYPES = {"click", "key", "scroll", "action", "screenshot"} +# Purge locale après ACK serveur (Partie A de l'audit) +# Activé par défaut : le serveur conserve déjà les screenshots 180 jours +# (conformité AI Act Article 12). Désactivable via RPA_PURGE_AFTER_ACK=0 +# pour debugging local. +PURGE_AFTER_ACK = os.environ.get("RPA_PURGE_AFTER_ACK", "1").lower() in ( + "1", "true", "yes", +) + +# Chemin du buffer persistant (Partie B de l'audit) +BUFFER_DIR = BASE_DIR / "buffer" + +# Intervalle entre deux tentatives de drain du buffer (secondes) +BUFFER_DRAIN_INTERVAL_S = 15 + class TraceStreamer: def __init__(self, session_id: str, machine_id: str = "default"): @@ -54,8 +88,20 @@ class TraceStreamer: self.running = False self._thread = None self._health_thread = None + self._drain_thread = None self._server_available = True # Désactivé après trop d'échecs + # Buffer persistant — partagé entre sessions (survit au redémarrage) + # Initialisé paresseusement pour ne pas payer le coût SQLite en dehors + # d'un streaming actif. + self._buffer: PersistentBuffer | None = None + + def _get_buffer(self) -> PersistentBuffer: + """Retourne le buffer persistant, en l'initialisant au besoin.""" + if self._buffer is None: + self._buffer = PersistentBuffer(BUFFER_DIR) + return self._buffer + @staticmethod def _auth_headers() -> dict: """Headers d'authentification Bearer pour les requêtes API.""" @@ -75,6 +121,11 @@ class TraceStreamer: target=self._health_check_loop, daemon=True ) self._health_thread.start() + # Thread de drain du buffer persistant (rejoue les items en attente) + self._drain_thread = threading.Thread( + target=self._buffer_drain_loop, daemon=True + ) + self._drain_thread.start() logger.info(f"Streamer pour {self.session_id} démarré") def stop(self): @@ -99,6 +150,9 @@ class TraceStreamer: if self._health_thread: self._health_thread.join(timeout=2.0) + if self._drain_thread: + self._drain_thread.join(timeout=2.0) + self._finalize_session() logger.info(f"Streamer pour {self.session_id} arrêté") @@ -126,11 +180,21 @@ class TraceStreamer: Quand la queue est pleine : - Les événements prioritaires (click, key, action, screenshot) sont - ajoutés en bloquant brièvement (0.5s) - - Les heartbeat sont silencieusement droppés + ajoutés en bloquant brièvement (0.5s). Si toujours pleine → persistés + dans le buffer SQLite pour rejeu ultérieur. + - Les heartbeat sont silencieusement droppés. + - Si le serveur est marqué indisponible, on persiste immédiatement les + items prioritaires (évite de remplir la queue inutilement). """ is_priority = self._is_priority_item(item_type, data) + # Serveur indisponible + item prioritaire → on persiste directement + # sans polluer la queue RAM (qui ne sera jamais vidée tant que le + # serveur est down). + if is_priority and not self._server_available: + self._persist_to_buffer(item_type, data) + return + try: self.queue.put_nowait((item_type, data)) except queue.Full: @@ -139,10 +203,18 @@ class TraceStreamer: try: self.queue.put((item_type, data), timeout=0.5) except queue.Full: - logger.warning( - f"Queue pleine — événement prioritaire droppé " - f"(type={item_type})" - ) + # Persistance disque (ne JAMAIS dropper un prioritaire) + persisted = self._persist_to_buffer(item_type, data) + if persisted: + logger.warning( + f"Queue pleine — événement prioritaire persisté " + f"sur disque (type={item_type})" + ) + else: + logger.error( + f"Queue pleine ET buffer saturé — événement " + f"prioritaire perdu (type={item_type})" + ) else: # Heartbeat ou événement non-critique : on drop silencieusement logger.debug( @@ -163,6 +235,23 @@ class TraceStreamer: return event_type in PRIORITY_EVENT_TYPES return False + def _persist_to_buffer(self, item_type: str, data) -> bool: + """Persiste un item dans le buffer SQLite. Retourne True si OK. + + Utilisé quand la queue est pleine ou le serveur indisponible. + """ + try: + buf = self._get_buffer() + if item_type == "event" and isinstance(data, dict): + return buf.add_event(self.session_id, data) + if item_type == "image": + path, shot_id = data + return buf.add_image(self.session_id, path, shot_id) + except Exception as e: + # On n'arrête jamais l'agent si le buffer échoue + logger.error(f"Persistance buffer échouée : {e}") + return False + # ========================================================================= # Boucle d'envoi # ========================================================================= @@ -174,16 +263,36 @@ class TraceStreamer: try: item_type, data = self.queue.get(timeout=0.5) success = False + is_file_gone = False if item_type == "event": success = self._send_with_retry(self._send_event, data) elif item_type == "image": - success = self._send_with_retry(self._send_image, *data) + result = self._send_with_retry(self._send_image, *data) + # Fix P0-E : distinguer FILE_GONE du vrai succès HTTP. + if result is ImageSendResult.OK: + success = True + elif result is ImageSendResult.FILE_GONE: + # Fichier disparu : pas de retry, pas de persistance + # (on ne peut plus le renvoyer). On considère l'item + # comme traité sans comptabiliser un succès réseau. + is_file_gone = True + success = False + else: + success = False self.queue.task_done() if success: consecutive_failures = 0 + elif is_file_gone: + # Fichier introuvable — déjà logué ERROR dans _send_image. + # On ne persiste PAS dans le buffer (retry voué à échouer). + consecutive_failures = 0 else: consecutive_failures += 1 + # Après 3 retries infructueux, si l'item est prioritaire, + # on le persiste pour ne pas le perdre définitivement. + if self._is_priority_item(item_type, data): + self._persist_to_buffer(item_type, data) if consecutive_failures >= 10: logger.warning( "10 échecs consécutifs — serveur marqué indisponible" @@ -200,15 +309,22 @@ class TraceStreamer: # Retry avec backoff exponentiel # ========================================================================= - def _send_with_retry(self, send_fn, *args) -> bool: + def _send_with_retry(self, send_fn, *args): """Tente l'envoi avec retry et backoff exponentiel. 3 tentatives max avec délais de 1s, 2s, 4s entre chaque. - Retourne True si l'envoi a réussi, False sinon. + Retourne : + - True / ImageSendResult.OK si l'envoi a réussi + - ImageSendResult.FILE_GONE (images uniquement) — pas de retry + - False / ImageSendResult.FAILED sinon """ # Première tentative (sans délai) - if send_fn(*args): - return True + first = send_fn(*args) + if first is ImageSendResult.OK or first is True: + return first + # Fix P0-E : FILE_GONE → pas de retry, l'erreur est permanente. + if first is ImageSendResult.FILE_GONE: + return first # Retries avec backoff for attempt, delay in enumerate(RETRY_DELAYS, start=1): @@ -219,9 +335,13 @@ class TraceStreamer: f"Retry {attempt}/{MAX_RETRIES} dans {delay}s..." ) time.sleep(delay) - if send_fn(*args): + result = send_fn(*args) + if result is ImageSendResult.OK or result is True: logger.debug(f"Retry {attempt} réussi") - return True + return result + # FILE_GONE pendant un retry — idem, on arrête + if result is ImageSendResult.FILE_GONE: + return result logger.debug(f"Envoi échoué après {MAX_RETRIES} retries") return False @@ -260,6 +380,115 @@ class TraceStreamer: except Exception: logger.debug("Health-check échoué — serveur toujours indisponible") + # ========================================================================= + # Drain du buffer persistant (Partie B) + # ========================================================================= + + def _buffer_drain_loop(self): + """Rejoue les items persistés en arrière-plan. + + Tourne tant que self.running. Essaie de drainer le buffer toutes les + BUFFER_DRAIN_INTERVAL_S secondes, mais seulement si : + - le serveur est disponible, + - il y a effectivement des items en attente. + + Au premier passage (démarrage agent), on draine immédiatement pour + rejouer tout ce qui a été persisté lors de la session précédente. + """ + # Au démarrage : drain immédiat (pas d'attente) + first_pass = True + while self.running: + if not first_pass: + time.sleep(BUFFER_DRAIN_INTERVAL_S) + if not self.running: + break + first_pass = False + + if not self._server_available: + continue + + try: + buf = self._get_buffer() + # Abandonner d'abord les items exceeded (évite de les retenter) + abandoned = buf.abandon_exceeded() + if abandoned: + logger.warning( + f"Buffer : {abandoned} items abandonnés " + f"après {MAX_ATTEMPTS} tentatives" + ) + + counts = buf.counts() + if counts["events"] == 0 and counts["images"] == 0: + continue + + logger.info( + f"Buffer drain : {counts['events']} events, " + f"{counts['images']} images en attente — rejeu" + ) + self._drain_buffer_once(buf) + except Exception as e: + logger.error(f"Buffer drain loop échoué : {e}") + + def _drain_buffer_once(self, buf: PersistentBuffer): + """Une passe de drain : envoie ce qui peut l'être, incrémente le reste. + + On arrête dès qu'un envoi échoue (serveur probablement down). + """ + # Events d'abord (plus légers, priorité métier AI Act) + for row in buf.drain_events(limit=50): + if not self._server_available: + return + try: + import json as _json + event = _json.loads(row["payload"]) + except (ValueError, TypeError): + logger.error( + f"Buffer : payload event #{row['id']} corrompu, suppression" + ) + buf.delete_event(row["id"]) + continue + if self._send_event(event): + buf.delete_event(row["id"]) + else: + buf.increment_attempts(row["id"], "event") + # Serveur répond mal — on arrête la passe + return + + # Puis images + for row in buf.drain_images(limit=20): + if not self._server_available: + return + image_path = row["image_path"] + shot_id = row["shot_id"] + if not os.path.exists(image_path): + # Fichier local disparu (purge, clean-up) — on abandonne. + # Fix P0-E : log ERROR (pas warning) — c'est une perte de donnée. + logger.error( + f"Buffer : image #{row['id']} introuvable sur disque " + f"({image_path}) — entrée abandonnée (le serveur n'a " + f"jamais reçu cette image, session={row['session_id']}, " + f"shot={shot_id})" + ) + buf.delete_image(row["id"]) + continue + result = self._send_image(image_path, shot_id) + if result is ImageSendResult.OK or result is True: + buf.delete_image(row["id"]) + elif result is ImageSendResult.FILE_GONE: + # Fix P0-E : fichier disparu pendant l'envoi. + # Ce n'est PAS un succès HTTP — ne pas considérer comme tel. + # On supprime néanmoins l'entrée (retry voué à échouer) + # mais avec un log ERROR explicite. + logger.error( + f"Buffer : image #{row['id']} disparue pendant l'envoi " + f"({image_path}) — entrée abandonnée, pas de retry " + f"(session={row['session_id']}, shot={shot_id})" + ) + buf.delete_image(row["id"]) + else: + buf.increment_attempts(row["id"], "image") + return + # ========================================================================= # Compression JPEG # ========================================================================= @@ -287,6 +516,34 @@ class TraceStreamer: logger.warning(f"Compression JPEG échouée, envoi PNG brut: {e}") return None, None, None + # ========================================================================= + # Purge locale après ACK (Partie A) + # ========================================================================= + + @staticmethod + def _purge_local_image(path: str): + """Supprime un screenshot local après ACK 200 du serveur. + + Ne crashe JAMAIS si le fichier est verrouillé (cas Windows) ou + déjà supprimé : on log en debug et on continue. L'auto-cleanup + de SessionStorage repassera plus tard. + """ + if not PURGE_AFTER_ACK: + return + try: + os.remove(path) + logger.debug(f"Screenshot local purgé après ACK : {path}") + except FileNotFoundError: + # Déjà supprimé ou chemin invalide — silencieux + pass + except PermissionError as e: + # Windows verrouille parfois les fichiers (antivirus, indexation...) + logger.debug( + f"Purge différée (fichier verrouillé) : {path} — {e}" + ) + except OSError as e: + logger.debug(f"Purge échouée : {path} — {e}") + # ========================================================================= # Envois HTTP # ========================================================================= @@ -337,7 +594,7 @@ class TraceStreamer: else: logger.warning(f"Finalisation échouée: {resp.status_code}") except Exception as e: - logger.debug(f"Finalisation échouée: {e}") + logger.warning(f"Finalisation échouée: {e}") def _send_event(self, event: dict) -> bool: """Envoyer un événement au serveur (avec identifiant machine).""" @@ -361,14 +618,23 @@ class TraceStreamer: logger.debug(f"Streaming Event échoué: {e}") return False - def _send_image(self, path: str, shot_id: str) -> bool: + def _send_image(self, path: str, shot_id: str): """Envoyer un screenshot au serveur, compressé en JPEG. Utilise un context manager pour le fallback PNG afin d'éviter les fuites de descripteurs de fichier. + + Partie A (purge après ACK) : en cas de HTTP 200 confirmé, le fichier + local est supprimé (le serveur devient la source de vérité). + + Fix P0-E : retourne `ImageSendResult` (OK / FAILED / FILE_GONE). + Les appelants historiques qui attendaient un bool continuent de + fonctionner grâce à la truthiness du enum (OK → True, reste → False), + MAIS le drain du buffer doit désormais discriminer FILE_GONE pour + ne pas confondre "fichier disparu" avec "envoyé avec succès". """ if not self._server_available: - return False + return ImageSendResult.FAILED try: # Tenter la compression JPEG (réduction ~5-10x vs PNG) jpeg_buf, content_type, suffix = self._compress_image_to_jpeg(path) @@ -391,7 +657,10 @@ class TraceStreamer: headers=self._auth_headers(), timeout=5, ) - return resp.ok + if resp.ok: + self._purge_local_image(path) + return ImageSendResult.OK + return ImageSendResult.FAILED else: # Fallback : envoi PNG original avec context manager with open(path, "rb") as f: @@ -405,7 +674,20 @@ class TraceStreamer: headers=self._auth_headers(), timeout=5, ) - return resp.ok + if resp.ok: + self._purge_local_image(path) + return ImageSendResult.OK + return ImageSendResult.FAILED + except FileNotFoundError: + # Fix P0-E : fichier local disparu. On NE doit PAS considérer ça + # comme un succès HTTP 200. Le serveur n'a rien reçu. On signale + # `FILE_GONE` pour que le drain du buffer supprime l'entrée + # (pas de retry possible) tout en loguant ERROR (pas debug). + logger.error( + f"Image {shot_id} introuvable sur disque ({path}) — " + f"abandon (serveur n'a rien reçu)" + ) + return ImageSendResult.FILE_GONE except Exception as e: logger.debug(f"Streaming Image échoué: {e}") - return False + return ImageSendResult.FAILED diff --git a/tests/integration/test_client_server_compat.py b/tests/integration/test_client_server_compat.py index c0dd40167..72816f839 100644 --- a/tests/integration/test_client_server_compat.py +++ b/tests/integration/test_client_server_compat.py @@ -184,8 +184,12 @@ class TestImagePayloadFormat: """Le serveur distingue full/crop par '_crop' dans le shot_id.""" from agent_v0.agent_v1.network.streamer import TraceStreamer - fake_img = tmp_path / "crop.png" - fake_img.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 50) + # Dans le monde réel, full et crop sont deux fichiers distincts + # (la purge après ACK supprime le premier avant que le second parte). + fake_full = tmp_path / "full.png" + fake_full.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 50) + fake_crop = tmp_path / "crop.png" + fake_crop.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 50) with patch("agent_v0.agent_v1.network.streamer.requests") as mock_req: mock_req.post.return_value = MagicMock(ok=True) @@ -194,9 +198,9 @@ class TestImagePayloadFormat: streamer._server_available = True # Full screenshot - streamer._send_image(str(fake_img), "shot_0001_full") + streamer._send_image(str(fake_full), "shot_0001_full") # Crop screenshot - streamer._send_image(str(fake_img), "shot_0001_crop") + streamer._send_image(str(fake_crop), "shot_0001_crop") img_calls = [ c for c in mock_req.post.call_args_list diff --git a/tests/integration/test_streamer_buffer_and_purge.py b/tests/integration/test_streamer_buffer_and_purge.py new file mode 100644 index 000000000..ebaef7276 --- /dev/null +++ b/tests/integration/test_streamer_buffer_and_purge.py @@ -0,0 +1,378 @@ +""" +Tests pour les fonctionnalités Partie A (purge après ACK) et Partie B +(buffer persistant) du TraceStreamer — bloquants audit AI Act. + +Aucun réseau : on mocke requests.post. +""" + +from __future__ import annotations + +import json +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +_ROOT = str(Path(__file__).resolve().parents[2]) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_png(path: Path, size: int = 100) -> Path: + """Crée un PNG minimal (header + padding) valide pour open().""" + path.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * size) + return path + + +@pytest.fixture +def isolated_buffer(tmp_path, monkeypatch): + """Isole le buffer persistant dans un tmp_path par test. + + Le buffer est normalement partagé (BASE_DIR / "buffer"). On pointe + vers un chemin jetable pour éviter la pollution croisée entre tests. + """ + from agent_v0.agent_v1.network import streamer as streamer_mod + + buffer_dir = tmp_path / "buffer" + monkeypatch.setattr(streamer_mod, "BUFFER_DIR", buffer_dir) + return buffer_dir + + +# --------------------------------------------------------------------------- +# Partie A — Purge après ACK +# --------------------------------------------------------------------------- + + +class TestPurgeAfterAck: + """Partie A : les screenshots locaux sont supprimés après HTTP 200.""" + + def test_image_purged_after_ack(self, tmp_path, isolated_buffer): + """Après HTTP 200, le fichier image local doit être supprimé.""" + from agent_v0.agent_v1.network.streamer import ( + ImageSendResult, + TraceStreamer, + ) + + img_path = _make_png(tmp_path / "to_purge.png") + assert img_path.exists() + + with patch("agent_v0.agent_v1.network.streamer.requests") as mock_req: + mock_req.post.return_value = MagicMock(ok=True) + streamer = TraceStreamer("sess_purge_001") + streamer._server_available = True + result = streamer._send_image(str(img_path), "shot_test") + + assert result is ImageSendResult.OK + assert not img_path.exists(), "Fichier local doit être supprimé après ACK" + + def test_image_not_purged_if_server_rejects(self, tmp_path, isolated_buffer): + """Si le serveur répond 500, le fichier local est conservé.""" + from agent_v0.agent_v1.network.streamer import ( + ImageSendResult, + TraceStreamer, + ) + + img_path = _make_png(tmp_path / "keep_me.png") + + with patch("agent_v0.agent_v1.network.streamer.requests") as mock_req: + mock_req.post.return_value = MagicMock(ok=False, status_code=500) + streamer = TraceStreamer("sess_purge_002") + streamer._server_available = True + result = streamer._send_image(str(img_path), "shot_test") + + assert result is ImageSendResult.FAILED + assert img_path.exists(), "Fichier doit rester si le serveur rejette" + + def test_purge_disabled_via_env( + self, tmp_path, isolated_buffer, monkeypatch + ): + """RPA_PURGE_AFTER_ACK=0 désactive la purge.""" + # On patche PURGE_AFTER_ACK directement (lu au module load) + from agent_v0.agent_v1.network import streamer as streamer_mod + + monkeypatch.setattr(streamer_mod, "PURGE_AFTER_ACK", False) + + img_path = _make_png(tmp_path / "keep.png") + + with patch("agent_v0.agent_v1.network.streamer.requests") as mock_req: + mock_req.post.return_value = MagicMock(ok=True) + streamer = streamer_mod.TraceStreamer("sess_purge_003") + streamer._server_available = True + streamer._send_image(str(img_path), "shot_test") + + assert img_path.exists(), "Purge doit être désactivée" + + def test_purge_does_not_crash_on_locked_file( + self, tmp_path, isolated_buffer, monkeypatch + ): + """Si os.remove échoue (fichier verrouillé), pas de crash.""" + from agent_v0.agent_v1.network import streamer as streamer_mod + + img_path = _make_png(tmp_path / "locked.png") + + def _raise_permission(*_args, **_kwargs): + raise PermissionError("Fichier verrouillé (simulé)") + + monkeypatch.setattr(streamer_mod.os, "remove", _raise_permission) + + with patch("agent_v0.agent_v1.network.streamer.requests") as mock_req: + mock_req.post.return_value = MagicMock(ok=True) + streamer = streamer_mod.TraceStreamer("sess_purge_004") + streamer._server_available = True + # Ne doit PAS lever + result = streamer._send_image(str(img_path), "shot_test") + + from agent_v0.agent_v1.network.streamer import ImageSendResult + assert result is ImageSendResult.OK + + +# --------------------------------------------------------------------------- +# Partie B — Buffer persistant SQLite +# --------------------------------------------------------------------------- + + +class TestPersistentBuffer: + """Partie B : persistance disque des events/images non envoyés.""" + + def test_priority_event_persisted_when_server_down( + self, tmp_path, isolated_buffer + ): + """Un event prioritaire est persisté si le serveur est indisponible.""" + from agent_v0.agent_v1.network.streamer import TraceStreamer + + streamer = TraceStreamer("sess_buf_001") + streamer._server_available = False + + streamer.push_event({"type": "click", "pos": [100, 200]}) + + buf = streamer._get_buffer() + counts = buf.counts() + assert counts["events"] == 1, "Click doit être persisté" + + def test_heartbeat_not_persisted_when_server_down( + self, tmp_path, isolated_buffer + ): + """Un heartbeat (non prioritaire) n'est PAS persisté.""" + from agent_v0.agent_v1.network.streamer import TraceStreamer + + streamer = TraceStreamer("sess_buf_002") + streamer._server_available = False + + # La queue n'est pas pleine, donc le heartbeat va dans la queue RAM + streamer.push_event({"type": "heartbeat", "image": "/tmp/h.png"}) + + buf = streamer._get_buffer() + # Heartbeat reste dans la queue RAM (pas prioritaire → pas persisté) + assert buf.counts()["events"] == 0 + + def test_image_persisted_when_server_down( + self, tmp_path, isolated_buffer + ): + """Une image est persistée si le serveur est indisponible.""" + from agent_v0.agent_v1.network.streamer import TraceStreamer + + img = _make_png(tmp_path / "img.png") + + streamer = TraceStreamer("sess_buf_003") + streamer._server_available = False + + streamer.push_image(str(img), "shot_001") + + buf = streamer._get_buffer() + assert buf.counts()["images"] == 1 + + def test_buffer_persists_when_queue_full( + self, tmp_path, isolated_buffer + ): + """Quand la queue RAM est pleine, un event prioritaire va en SQLite.""" + from agent_v0.agent_v1.network import streamer as streamer_mod + + # Monkeypatch la taille max de queue pour forcer le débordement vite + streamer = streamer_mod.TraceStreamer("sess_buf_004") + streamer._server_available = True + # Remplir artificiellement la queue + import queue as _q + + # Remplir jusqu'à être full + while True: + try: + streamer.queue.put_nowait(("event", {"type": "noise"})) + except _q.Full: + break + + # Maintenant queue pleine — un click doit aller en SQLite + streamer.push_event({"type": "click", "pos": [1, 2]}) + + buf = streamer._get_buffer() + assert buf.counts()["events"] >= 1 + + def test_drain_replays_events_when_server_recovers( + self, tmp_path, isolated_buffer + ): + """Le drain rejoue les events persistés quand le serveur revient.""" + from agent_v0.agent_v1.network.streamer import TraceStreamer + + streamer = TraceStreamer("sess_buf_005") + # Persister un event pendant que le serveur est down + streamer._server_available = False + streamer.push_event({"type": "click", "pos": [50, 50]}) + + assert streamer._get_buffer().counts()["events"] == 1 + + # Serveur revient — on simule un drain manuel + streamer._server_available = True + with patch( + "agent_v0.agent_v1.network.streamer.requests" + ) as mock_req: + mock_req.post.return_value = MagicMock(ok=True) + streamer._drain_buffer_once(streamer._get_buffer()) + + # L'event doit être envoyé ET supprimé du buffer + event_calls = [ + c for c in mock_req.post.call_args_list if "/event" in str(c) + ] + assert len(event_calls) == 1 + assert streamer._get_buffer().counts()["events"] == 0 + + def test_drain_increments_attempts_on_failure( + self, tmp_path, isolated_buffer + ): + """Si le drain échoue, attempts est incrémenté (pas de suppression).""" + from agent_v0.agent_v1.network.streamer import TraceStreamer + + streamer = TraceStreamer("sess_buf_006") + streamer._server_available = False + streamer.push_event({"type": "click"}) + + buf = streamer._get_buffer() + assert buf.counts()["events"] == 1 + + # Simule un envoi qui échoue (500) + streamer._server_available = True + with patch("agent_v0.agent_v1.network.streamer.requests") as mock_req: + mock_req.post.return_value = MagicMock(ok=False, status_code=500) + streamer._drain_buffer_once(buf) + + # L'event reste dans le buffer avec attempts=1 + rows = buf.drain_events() + assert len(rows) == 1 + assert rows[0]["attempts"] == 1 + + def test_event_abandoned_after_max_attempts( + self, tmp_path, isolated_buffer + ): + """Après MAX_ATTEMPTS, un event est abandonné (supprimé + log error).""" + from agent_v0.agent_v1.network.persistent_buffer import ( + MAX_ATTEMPTS, + PersistentBuffer, + ) + + buf = PersistentBuffer(tmp_path / "buf") + buf.add_event("sess_aband", {"type": "click"}) + + # Incrémenter attempts jusqu'au max + rows = buf.drain_events() + for _ in range(MAX_ATTEMPTS): + buf.increment_attempts(rows[0]["id"], "event") + + abandoned = buf.abandon_exceeded() + assert abandoned == 1 + assert buf.counts()["events"] == 0 + + def test_buffer_survives_corrupted_db(self, tmp_path): + """Un fichier DB corrompu est renommé et un nouveau est créé.""" + from agent_v0.agent_v1.network.persistent_buffer import ( + PersistentBuffer, + ) + + buffer_dir = tmp_path / "buf" + buffer_dir.mkdir() + # Créer un fichier "DB" corrompu + db_path = buffer_dir / "pending_events.db" + db_path.write_bytes(b"this is not a valid sqlite db file\x00\x01") + + # Ne doit pas crasher + buf = PersistentBuffer(buffer_dir) + + # Le buffer doit être utilisable + assert buf.add_event("sess_recover", {"type": "click"}) is True + assert buf.counts()["events"] == 1 + + def test_drain_skips_image_with_missing_file( + self, tmp_path, isolated_buffer + ): + """Si le fichier image a disparu, on supprime l'entrée du buffer.""" + from agent_v0.agent_v1.network.streamer import TraceStreamer + + streamer = TraceStreamer("sess_buf_missing") + streamer._server_available = False + # Persister une image vers un chemin qui n'existe pas + streamer.push_image("/tmp/does_not_exist_xyz.png", "shot_missing") + + buf = streamer._get_buffer() + assert buf.counts()["images"] == 1 + + # Drain : l'entrée doit être supprimée (fichier introuvable) + streamer._server_available = True + with patch("agent_v0.agent_v1.network.streamer.requests") as mock_req: + mock_req.post.return_value = MagicMock(ok=True) + streamer._drain_buffer_once(buf) + + assert buf.counts()["images"] == 0 + + +# --------------------------------------------------------------------------- +# Scénarios complets (reprise, coupure réseau) +# --------------------------------------------------------------------------- + + +class TestScenarios: + """Scénarios de bout en bout pour valider la reprise après incident.""" + + def test_scenario_server_offline_then_recover( + self, tmp_path, isolated_buffer + ): + """Scénario : serveur offline → events bufferisés → serveur revient + → drain automatique → buffer vide.""" + from agent_v0.agent_v1.network.streamer import TraceStreamer + + streamer = TraceStreamer("sess_scenario_001") + + # 1) Serveur offline au démarrage + streamer._server_available = False + + # 2) L'utilisateur clique 5 fois + for i in range(5): + streamer.push_event({"type": "click", "pos": [i, i]}) + + buf = streamer._get_buffer() + assert buf.counts()["events"] == 5, "5 clicks doivent être persistés" + + # 3) Le serveur revient + streamer._server_available = True + + # 4) Drain manuel (équivalent boucle) + with patch( + "agent_v0.agent_v1.network.streamer.requests" + ) as mock_req: + mock_req.post.return_value = MagicMock(ok=True) + streamer._drain_buffer_once(buf) + + # 5) Tous les events ont été envoyés dans l'ordre + event_calls = [ + c for c in mock_req.post.call_args_list if "/event" in str(c) + ] + assert len(event_calls) == 5 + # Vérifier l'ordre (positions croissantes) + positions = [ + c[1]["json"]["event"]["pos"][0] for c in event_calls + ] + assert positions == [0, 1, 2, 3, 4] + + assert buf.counts()["events"] == 0 diff --git a/tests/integration/test_streamer_file_gone_p0e.py b/tests/integration/test_streamer_file_gone_p0e.py new file mode 100644 index 000000000..f758120ea --- /dev/null +++ b/tests/integration/test_streamer_file_gone_p0e.py @@ -0,0 +1,214 @@ +""" +Tests du Fix P0-E : FileNotFoundError dans _send_image n'est pas un succès. + +Avant : un fichier image disparu retournait `True` (succès logique) — donc +le buffer SQLite supprimait l'entrée alors que le serveur n'avait jamais +reçu l'image. Perte silencieuse, contradiction avec la sémantique +"succès = HTTP 200". + +Après : retourne `ImageSendResult.FILE_GONE` distinct de `OK`. Le drain +du buffer supprime l'entrée mais avec un log ERROR explicite (pas de retry, +pas de confusion avec un succès réseau). +""" +from __future__ import annotations + +import sys +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +sys.path.insert(0, str(Path(__file__).resolve().parents[2])) + + +@pytest.fixture +def isolated_buffer(tmp_path, monkeypatch): + """Isole le buffer persistant dans un tmp_path par test.""" + from agent_v0.agent_v1.network import streamer as streamer_mod + + buffer_dir = tmp_path / "buffer" + monkeypatch.setattr(streamer_mod, "BUFFER_DIR", buffer_dir) + return buffer_dir + + +class TestImageSendResultEnum: + """Vérifier l'existence et le contrat de l'enum ImageSendResult.""" + + def test_enum_has_three_values(self): + from agent_v0.agent_v1.network.streamer import ImageSendResult + + assert ImageSendResult.OK.value == "ok" + assert ImageSendResult.FAILED.value == "failed" + assert ImageSendResult.FILE_GONE.value == "file_gone" + + def test_enum_values_distinct(self): + from agent_v0.agent_v1.network.streamer import ImageSendResult + + assert ImageSendResult.OK is not ImageSendResult.FAILED + assert ImageSendResult.OK is not ImageSendResult.FILE_GONE + assert ImageSendResult.FAILED is not ImageSendResult.FILE_GONE + + +class TestSendImageReturnsFileGone: + """_send_image doit retourner FILE_GONE si le fichier n'existe pas.""" + + def test_missing_file_returns_file_gone(self, tmp_path, isolated_buffer): + """Fichier inexistant → FILE_GONE (pas OK, pas FAILED).""" + from agent_v0.agent_v1.network.streamer import ( + ImageSendResult, + TraceStreamer, + ) + + streamer = TraceStreamer("sess_p0e_001") + streamer._server_available = True + + # On NE crée pas le fichier + missing_path = str(tmp_path / "i_do_not_exist.png") + + with patch("agent_v0.agent_v1.network.streamer.requests"): + result = streamer._send_image(missing_path, "shot_lost") + + assert result is ImageSendResult.FILE_GONE, ( + f"Attendu FILE_GONE, reçu {result}" + ) + + def test_file_gone_is_not_truthy_for_legacy_callers( + self, tmp_path, isolated_buffer + ): + """Un caller legacy qui fait `if result:` ne doit PAS interpréter + FILE_GONE comme un succès.""" + from agent_v0.agent_v1.network.streamer import ImageSendResult + + # FILE_GONE est un membre d'enum non vide → en Python il est truthy + # par défaut. C'est pour ça qu'on ne peut PAS se contenter du test + # bool(result) pour distinguer succès/échec : il faut comparer is OK. + # Ce test documente le contrat : les callers DOIVENT comparer is OK. + result = ImageSendResult.FILE_GONE + assert result is not ImageSendResult.OK + assert result is not True + + +class TestDrainHandlesFileGone: + """Le drain du buffer doit supprimer l'entrée FILE_GONE avec log ERROR.""" + + def test_drain_removes_buffer_entry_for_missing_file( + self, tmp_path, isolated_buffer, caplog + ): + """Si le fichier disparait entre la persistance et le drain : + - L'entrée est supprimée du buffer (pas de retry infini) + - Un log ERROR signale la perte + """ + import logging + + from agent_v0.agent_v1.network.streamer import TraceStreamer + + streamer = TraceStreamer("sess_p0e_drain") + streamer._server_available = False + + # Persister une image vers un chemin inexistant + ghost_path = str(tmp_path / "ghost.png") + streamer.push_image(ghost_path, "shot_ghost") + + buf = streamer._get_buffer() + assert buf.counts()["images"] == 1 + + # Drain avec serveur dispo : doit détecter l'absence et abandonner + streamer._server_available = True + with caplog.at_level(logging.ERROR, logger="agent_v0.agent_v1.network.streamer"): + with patch("agent_v0.agent_v1.network.streamer.requests"): + streamer._drain_buffer_once(buf) + + assert buf.counts()["images"] == 0, ( + "L'entrée doit être supprimée (retry voué à échouer)" + ) + + # Vérifier qu'un log ERROR a été émis (pas seulement un warning) + error_logs = [r for r in caplog.records if r.levelno >= logging.ERROR] + assert len(error_logs) >= 1, ( + "Un log ERROR doit signaler que le serveur n'a rien reçu" + ) + assert any( + "abandonnée" in r.getMessage() or "introuvable" in r.getMessage() + or "abandonnée" in r.getMessage().lower() + for r in error_logs + ) + + def test_send_image_file_disappears_during_send( + self, tmp_path, isolated_buffer, caplog + ): + """Cas tordu : le fichier existe au moment de drain_images mais + disparait pendant _send_image (race condition disque). + + On simule en patchant _compress_image_to_jpeg pour lever + FileNotFoundError. + """ + import logging + + from agent_v0.agent_v1.network.streamer import ( + ImageSendResult, + TraceStreamer, + ) + + # Fichier existant initialement + img_path = tmp_path / "race.png" + img_path.write_bytes(b"\x89PNG\r\n\x1a\n" + b"\x00" * 50) + + streamer = TraceStreamer("sess_p0e_race") + streamer._server_available = True + + # Forcer FileNotFoundError dans le pipeline d'envoi (compression + # tente d'ouvrir le fichier — qui aura "disparu" entre temps). + def _gone(_path): + raise FileNotFoundError(f"race condition: {_path}") + + with patch.object(streamer, "_compress_image_to_jpeg", _gone), \ + patch("agent_v0.agent_v1.network.streamer.requests"), \ + caplog.at_level(logging.ERROR, logger="agent_v0.agent_v1.network.streamer"): + result = streamer._send_image(str(img_path), "shot_race") + + assert result is ImageSendResult.FILE_GONE, ( + "FileNotFoundError pendant la compression → FILE_GONE" + ) + # Log ERROR (pas debug comme avant) + error_logs = [r for r in caplog.records if r.levelno >= logging.ERROR] + assert len(error_logs) >= 1 + + +class TestStreamLoopHandlesFileGone: + """La boucle d'envoi ne doit PAS persister une entrée FILE_GONE.""" + + def test_file_gone_not_persisted_to_buffer( + self, tmp_path, isolated_buffer + ): + """Quand _send_image retourne FILE_GONE, on ne réécrit pas dans + le buffer (sinon boucle infinie : add → drain → file_gone → add…).""" + from agent_v0.agent_v1.network.streamer import ( + ImageSendResult, + TraceStreamer, + ) + + streamer = TraceStreamer("sess_p0e_loop") + streamer._server_available = True + + # Mock _send_with_retry pour retourner FILE_GONE directement + with patch.object( + streamer, "_send_with_retry", return_value=ImageSendResult.FILE_GONE + ): + # Mettre une image dans la queue + streamer.queue.put(("image", ("/tmp/whatever.png", "shot_x"))) + # Lancer une seule itération de la boucle (en simulant) + try: + item_type, data = streamer.queue.get(timeout=0.1) + # Reproduire la logique du _stream_loop + result = streamer._send_with_retry( + streamer._send_image, *data + ) + assert result is ImageSendResult.FILE_GONE + # Le caller (stream_loop) doit identifier FILE_GONE comme + # "ne pas persister" → on vérifie que le buffer reste vide + buf = streamer._get_buffer() + # Avant le fix : l'item aurait été persisté car "consecutive_failures += 1" + # et "if priority_item: persist()". Avec le fix, on saute. + assert buf.counts()["images"] == 0 + finally: + streamer.queue.task_done()