# agent_v0/server_v1/session_worker.py
"""SessionWorker — Asynchronous background processing of finalized sessions.

Solves the problem of finalize returning "insufficient_data": the VLM
analysis takes several minutes and the client no longer waits. The worker
processes sessions at its own pace and notifies when the workflow is ready.

Runs in a daemon thread and processes one session at a time. For each session:
1. Analyzes the screenshots via ScreenAnalyzer + VLM
2. Builds the workflow via GraphBuilder
3. Saves the workflow
4. Notifies completion (callback)
"""

import logging
import threading
import time
from collections import deque
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Deque, Dict, List, Optional

if TYPE_CHECKING:
    # Imported only for static typing; a runtime import here risks a
    # circular import with stream_processor.
    from .stream_processor import StreamProcessor

logger = logging.getLogger("session_worker")


class SessionWorker:
    """Worker that processes finalized sessions in the background.

    Runs in a daemon thread and processes one session at a time. For each
    session it:
    1. Analyzes the screenshots via ScreenAnalyzer + VLM
    2. Builds the workflow via GraphBuilder
    3. Saves the workflow
    4. Notifies completion (on_complete callback)
    """

    def __init__(self, processor, poll_interval: int = 10):
        """
        Args:
            processor: StreamProcessor instance shared with the API.
            poll_interval: Polling interval in seconds when the queue is empty.
        """
        self._processor: "StreamProcessor" = processor
        self._queue: Deque[str] = deque()  # session_ids awaiting processing
        self._lock = threading.Lock()
        self._running = False
        # Set by stop() so the idle loop wakes immediately instead of
        # sleeping through a full poll interval.
        self._stop_event = threading.Event()
        self._current_session: Optional[str] = None
        self._current_progress: Optional[Dict[str, Any]] = None
        self._on_complete: Optional[Callable[[str, Dict[str, Any]], None]] = None
        self._poll_interval = poll_interval
        # Processing history (successes and failures), most recent last.
        self._completed: List[Dict[str, Any]] = []
        self._failed: List[Dict[str, Any]] = []
        self._thread: Optional[threading.Thread] = None

    def start(self):
        """Start the worker in a daemon thread. No-op if already running."""
        if self._running:
            logger.warning("[WORKER] Déjà en cours d'exécution")
            return
        self._running = True
        self._stop_event.clear()
        self._thread = threading.Thread(
            target=self._process_loop,
            name="SessionWorker",
            daemon=True,
        )
        self._thread.start()
        logger.info("[WORKER] Démarré — traitement asynchrone des sessions finalisées")

    def stop(self):
        """Stop the worker cleanly, waking it if it is idle-waiting."""
        self._running = False
        self._stop_event.set()  # interrupt the idle wait in _process_loop
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=10)
        logger.info("[WORKER] Arrêté")

    def enqueue(self, session_id: str):
        """Add a session to the processing queue.

        Deduplicates: a session already queued, currently being processed,
        or already processed successfully is not re-added.
        """
        with self._lock:
            if session_id in self._queue:
                logger.info(f"[WORKER] Session {session_id} déjà dans la queue, skip")
                return
            if self._current_session == session_id:
                logger.info(f"[WORKER] Session {session_id} en cours de traitement, skip")
                return
            # Skip sessions that already completed successfully.
            if any(item.get("session_id") == session_id for item in self._completed):
                logger.info(f"[WORKER] Session {session_id} déjà traitée avec succès, skip")
                return
            self._queue.append(session_id)
            logger.info(
                f"[WORKER] Session {session_id} ajoutée à la queue "
                f"(position {len(self._queue)})"
            )

    def get_status(self) -> Dict[str, Any]:
        """Return a consistent snapshot of the worker's full state."""
        with self._lock:
            return {
                "running": self._running,
                "queue_length": len(self._queue),
                "queue": list(self._queue),
                "current_session": self._current_session,
                "current_progress": dict(self._current_progress) if self._current_progress else None,
                "completed_count": len(self._completed),
                "completed": list(self._completed[-10:]),  # last 10
                "failed_count": len(self._failed),
                "failed": list(self._failed[-10:]),  # last 10
            }

    def _dequeue(self) -> Optional[str]:
        """Pop and return the next session_id, or None when the queue is empty."""
        with self._lock:
            if self._queue:
                # deque.popleft() is O(1), unlike list.pop(0).
                return self._queue.popleft()
            return None

    def _process_loop(self):
        """Main loop — take the next session from the queue and process it."""
        logger.info("[WORKER] Boucle de traitement démarrée")
        while self._running:
            session_id = self._dequeue()
            if session_id:
                self._process_session(session_id)
            else:
                # Idle wait; returns early as soon as stop() sets the event.
                self._stop_event.wait(self._poll_interval)

    def _process_session(self, session_id: str):
        """Process one session end to end (screenshot analysis + workflow build).

        Uses StreamProcessor.reprocess_session() which:
        1. Lists the shot_*_full.png screenshots on disk
        2. Calls process_screenshot() for each (VLM + CLIP)
        3. Calls finalize_session() to build the workflow
        """
        with self._lock:
            self._current_session = session_id
            self._current_progress = {
                "session_id": session_id,
                "status": "starting",
                "started_at": datetime.now().isoformat(),
                "screenshots_total": 0,
                "screenshots_processed": 0,
            }
        logger.info(f"[WORKER] === Début traitement session {session_id} ===")
        start_time = time.time()
        try:
            result = self._processor.reprocess_session(
                session_id,
                progress_callback=self._update_progress,
            )
            elapsed = time.time() - start_time
            if result.get("error"):
                # Error reported by the processor during processing.
                logger.error(
                    f"[WORKER] Échec traitement session {session_id} "
                    f"après {elapsed:.1f}s : {result['error']}"
                )
                with self._lock:
                    self._failed.append({
                        "session_id": session_id,
                        "error": result["error"],
                        "elapsed_seconds": round(elapsed, 1),
                        "timestamp": datetime.now().isoformat(),
                    })
            elif result.get("status") == "insufficient_data":
                # Not enough valid screenshots to build a workflow.
                logger.warning(
                    f"[WORKER] Session {session_id} : données insuffisantes "
                    f"({result.get('states_count', 0)} states) après {elapsed:.1f}s"
                )
                with self._lock:
                    self._failed.append({
                        "session_id": session_id,
                        "error": "insufficient_data",
                        "states_count": result.get("states_count", 0),
                        "elapsed_seconds": round(elapsed, 1),
                        "timestamp": datetime.now().isoformat(),
                    })
            else:
                # Success.
                logger.info(
                    f"[WORKER] Session {session_id} traitée avec succès en {elapsed:.1f}s | "
                    f"workflow={result.get('workflow_id', '?')} | "
                    f"{result.get('nodes', 0)} nodes, {result.get('edges', 0)} edges"
                )
                with self._lock:
                    self._completed.append({
                        "session_id": session_id,
                        "workflow_id": result.get("workflow_id"),
                        "workflow_name": result.get("workflow_name"),
                        "nodes": result.get("nodes", 0),
                        "edges": result.get("edges", 0),
                        "states_analyzed": result.get("states_analyzed", 0),
                        "elapsed_seconds": round(elapsed, 1),
                        "timestamp": datetime.now().isoformat(),
                    })
                # Notification callback — fired on success only; a failing
                # callback must not mark the session as failed.
                if self._on_complete:
                    try:
                        self._on_complete(session_id, result)
                    except Exception as e:
                        logger.error(f"[WORKER] Erreur callback on_complete: {e}")
        except Exception as e:
            elapsed = time.time() - start_time
            logger.error(
                f"[WORKER] Exception inattendue pour session {session_id} "
                f"après {elapsed:.1f}s : {e}",
                exc_info=True,
            )
            with self._lock:
                self._failed.append({
                    "session_id": session_id,
                    "error": f"exception: {e}",
                    "elapsed_seconds": round(elapsed, 1),
                    "timestamp": datetime.now().isoformat(),
                })
        finally:
            # Always clear the "in progress" markers, even on crash.
            with self._lock:
                self._current_session = None
                self._current_progress = None
            logger.info(f"[WORKER] === Fin traitement session {session_id} ===")

    def _update_progress(self, session_id: str, current: int, total: int, shot_id: str = ""):
        """Progress callback invoked by reprocess_session for each screenshot."""
        with self._lock:
            if self._current_progress:
                self._current_progress["screenshots_total"] = total
                self._current_progress["screenshots_processed"] = current
                self._current_progress["status"] = "processing"
                self._current_progress["current_shot"] = shot_id
        logger.info(
            f"[WORKER] Session {session_id} : screenshot {current}/{total}"
            + (f" ({shot_id})" if shot_id else "")
        )