diff --git a/agent_v0/server_v1/replay_failure_logger.py b/agent_v0/server_v1/replay_failure_logger.py new file mode 100644 index 000000000..3ef0f8320 --- /dev/null +++ b/agent_v0/server_v1/replay_failure_logger.py @@ -0,0 +1,143 @@ +# agent_v0/server_v1/replay_failure_logger.py +""" +Logger des echecs de replay pour l'apprentissage futur. + +Chaque echec de resolution visuelle (target_not_found) est sauvegarde dans un +fichier JSONL par session, avec le screenshot de ce que l'agent voit au moment +de l'echec. Ces donnees alimentent le learning loop : re-entrainement des +embeddings, ajustement des seuils, enrichissement des target_spec. + +Structure : + data/training/replay_failures/{replay_id}/failures.jsonl + data/training/replay_failures/{replay_id}/screenshots/{action_id}.jpg +""" + +import base64 +import json +import logging +import os +import threading +import time +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger("replay_failure_logger") + +# Repertoire racine des echecs de replay +_FAILURES_BASE_DIR = Path("data/training/replay_failures") + +# Lock pour les ecritures concurrentes +_write_lock = threading.Lock() + + +def log_replay_failure( + replay_id: str, + action_id: str, + target_spec: Optional[Dict[str, Any]], + screenshot_b64: Optional[str], + resolution_attempts: Optional[List[Dict[str, Any]]] = None, + error: str = "target_not_found", + extra: Optional[Dict[str, Any]] = None, +) -> Optional[str]: + """Sauvegarder un echec de replay pour l'apprentissage futur. + + Args: + replay_id: Identifiant du replay en cours + action_id: Identifiant de l'action echouee + target_spec: Specification de la cible recherchee + screenshot_b64: Screenshot JPEG base64 de ce que l'agent voit + resolution_attempts: Liste des tentatives de resolution (methode, score, etc.) + error: Type d'erreur (defaut: "target_not_found") + extra: Champs supplementaires a stocker + + Returns: + Chemin du fichier JSONL cree, ou None en cas d'erreur. + """ + try: + # Creer le repertoire de la session + session_dir = _FAILURES_BASE_DIR / replay_id + session_dir.mkdir(parents=True, exist_ok=True) + + # Sauvegarder le screenshot si fourni + screenshot_path = None + if screenshot_b64: + screenshots_dir = session_dir / "screenshots" + screenshots_dir.mkdir(exist_ok=True) + screenshot_path = str(screenshots_dir / f"{action_id}.jpg") + try: + img_bytes = base64.b64decode(screenshot_b64) + with open(screenshot_path, "wb") as f: + f.write(img_bytes) + except Exception as e: + logger.warning(f"Impossible de sauvegarder le screenshot : {e}") + screenshot_path = None + + # Construire l'entree JSONL + entry = { + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), + "replay_id": replay_id, + "action_id": action_id, + "target_spec": _sanitize_target_spec(target_spec) if target_spec else None, + "screenshot_path": screenshot_path, + "resolution_attempts": resolution_attempts or [], + "error": error, + } + if extra: + entry.update(extra) + + # Ecrire dans le fichier JSONL (thread-safe) + jsonl_path = session_dir / "failures.jsonl" + with _write_lock: + with open(jsonl_path, "a", encoding="utf-8") as f: + f.write(json.dumps(entry, ensure_ascii=False) + "\n") + + logger.info( + f"Echec replay loggue : replay={replay_id} action={action_id} " + f"error={error} -> {jsonl_path}" + ) + return str(jsonl_path) + + except Exception as e: + logger.error(f"Impossible de logger l'echec replay : {e}") + return None + + +def _sanitize_target_spec(target_spec: Dict[str, Any]) -> Dict[str, Any]: + """Nettoyer le target_spec pour le stockage (retirer les images base64 volumineuses).""" + cleaned = {} + for key, value in target_spec.items(): + # Ne pas stocker les images base64 (trop volumineux pour le JSONL) + if key.endswith("_base64") or key.endswith("_b64"): + cleaned[key] = f"<{len(str(value))} chars>" if value else None + else: + cleaned[key] = value + return cleaned + + +def get_failure_count(replay_id: str) -> int: + """Compter le nombre d'echecs pour un replay donne.""" + jsonl_path = _FAILURES_BASE_DIR / replay_id / "failures.jsonl" + if not jsonl_path.exists(): + return 0 + try: + with open(jsonl_path, "r", encoding="utf-8") as f: + return sum(1 for _ in f) + except Exception: + return 0 + + +def get_failures(replay_id: str) -> List[Dict[str, Any]]: + """Lire tous les echecs pour un replay donne.""" + jsonl_path = _FAILURES_BASE_DIR / replay_id / "failures.jsonl" + if not jsonl_path.exists(): + return [] + failures = [] + try: + with open(jsonl_path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if line: + failures.append(json.loads(line)) + except Exception as e: + logger.warning(f"Erreur lecture echecs replay {replay_id} : {e}") + return failures