chore: ajouter replay_failure_logger.py au tracking git
Ce fichier existe sur disque depuis le 4 avril mais n'a jamais été ajouté à git. Il est importé par api_stream.py (ligne 29) — un fresh clone sans ce fichier ne peut pas démarrer le serveur streaming. Découvert par le project-quality-guardian lors de l'audit global du 11 avril (item C1, priorité P0 bloquant absolu). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
143
agent_v0/server_v1/replay_failure_logger.py
Normal file
143
agent_v0/server_v1/replay_failure_logger.py
Normal file
@@ -0,0 +1,143 @@
|
||||
# agent_v0/server_v1/replay_failure_logger.py
|
||||
"""
|
||||
Logger des echecs de replay pour l'apprentissage futur.
|
||||
|
||||
Chaque echec de resolution visuelle (target_not_found) est sauvegarde dans un
|
||||
fichier JSONL par session, avec le screenshot de ce que l'agent voit au moment
|
||||
de l'echec. Ces donnees alimentent le learning loop : re-entrainement des
|
||||
embeddings, ajustement des seuils, enrichissement des target_spec.
|
||||
|
||||
Structure :
|
||||
data/training/replay_failures/{replay_id}/failures.jsonl
|
||||
data/training/replay_failures/{replay_id}/screenshots/{action_id}.jpg
|
||||
"""
|
||||
|
||||
import base64
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger("replay_failure_logger")
|
||||
|
||||
# Repertoire racine des echecs de replay
|
||||
_FAILURES_BASE_DIR = Path("data/training/replay_failures")
|
||||
|
||||
# Lock pour les ecritures concurrentes
|
||||
_write_lock = threading.Lock()
|
||||
|
||||
|
||||
def log_replay_failure(
|
||||
replay_id: str,
|
||||
action_id: str,
|
||||
target_spec: Optional[Dict[str, Any]],
|
||||
screenshot_b64: Optional[str],
|
||||
resolution_attempts: Optional[List[Dict[str, Any]]] = None,
|
||||
error: str = "target_not_found",
|
||||
extra: Optional[Dict[str, Any]] = None,
|
||||
) -> Optional[str]:
|
||||
"""Sauvegarder un echec de replay pour l'apprentissage futur.
|
||||
|
||||
Args:
|
||||
replay_id: Identifiant du replay en cours
|
||||
action_id: Identifiant de l'action echouee
|
||||
target_spec: Specification de la cible recherchee
|
||||
screenshot_b64: Screenshot JPEG base64 de ce que l'agent voit
|
||||
resolution_attempts: Liste des tentatives de resolution (methode, score, etc.)
|
||||
error: Type d'erreur (defaut: "target_not_found")
|
||||
extra: Champs supplementaires a stocker
|
||||
|
||||
Returns:
|
||||
Chemin du fichier JSONL cree, ou None en cas d'erreur.
|
||||
"""
|
||||
try:
|
||||
# Creer le repertoire de la session
|
||||
session_dir = _FAILURES_BASE_DIR / replay_id
|
||||
session_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Sauvegarder le screenshot si fourni
|
||||
screenshot_path = None
|
||||
if screenshot_b64:
|
||||
screenshots_dir = session_dir / "screenshots"
|
||||
screenshots_dir.mkdir(exist_ok=True)
|
||||
screenshot_path = str(screenshots_dir / f"{action_id}.jpg")
|
||||
try:
|
||||
img_bytes = base64.b64decode(screenshot_b64)
|
||||
with open(screenshot_path, "wb") as f:
|
||||
f.write(img_bytes)
|
||||
except Exception as e:
|
||||
logger.warning(f"Impossible de sauvegarder le screenshot : {e}")
|
||||
screenshot_path = None
|
||||
|
||||
# Construire l'entree JSONL
|
||||
entry = {
|
||||
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
|
||||
"replay_id": replay_id,
|
||||
"action_id": action_id,
|
||||
"target_spec": _sanitize_target_spec(target_spec) if target_spec else None,
|
||||
"screenshot_path": screenshot_path,
|
||||
"resolution_attempts": resolution_attempts or [],
|
||||
"error": error,
|
||||
}
|
||||
if extra:
|
||||
entry.update(extra)
|
||||
|
||||
# Ecrire dans le fichier JSONL (thread-safe)
|
||||
jsonl_path = session_dir / "failures.jsonl"
|
||||
with _write_lock:
|
||||
with open(jsonl_path, "a", encoding="utf-8") as f:
|
||||
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
|
||||
|
||||
logger.info(
|
||||
f"Echec replay loggue : replay={replay_id} action={action_id} "
|
||||
f"error={error} -> {jsonl_path}"
|
||||
)
|
||||
return str(jsonl_path)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Impossible de logger l'echec replay : {e}")
|
||||
return None
|
||||
|
||||
|
||||
def _sanitize_target_spec(target_spec: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Nettoyer le target_spec pour le stockage (retirer les images base64 volumineuses)."""
|
||||
cleaned = {}
|
||||
for key, value in target_spec.items():
|
||||
# Ne pas stocker les images base64 (trop volumineux pour le JSONL)
|
||||
if key.endswith("_base64") or key.endswith("_b64"):
|
||||
cleaned[key] = f"<{len(str(value))} chars>" if value else None
|
||||
else:
|
||||
cleaned[key] = value
|
||||
return cleaned
|
||||
|
||||
|
||||
def get_failure_count(replay_id: str) -> int:
|
||||
"""Compter le nombre d'echecs pour un replay donne."""
|
||||
jsonl_path = _FAILURES_BASE_DIR / replay_id / "failures.jsonl"
|
||||
if not jsonl_path.exists():
|
||||
return 0
|
||||
try:
|
||||
with open(jsonl_path, "r", encoding="utf-8") as f:
|
||||
return sum(1 for _ in f)
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
|
||||
def get_failures(replay_id: str) -> List[Dict[str, Any]]:
|
||||
"""Lire tous les echecs pour un replay donne."""
|
||||
jsonl_path = _FAILURES_BASE_DIR / replay_id / "failures.jsonl"
|
||||
if not jsonl_path.exists():
|
||||
return []
|
||||
failures = []
|
||||
try:
|
||||
with open(jsonl_path, "r", encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
failures.append(json.loads(line))
|
||||
except Exception as e:
|
||||
logger.warning(f"Erreur lecture echecs replay {replay_id} : {e}")
|
||||
return failures
|
||||
Reference in New Issue
Block a user