# agent_v0/server_v1/run_worker.py """ Worker VLM autonome — tourne dans un process Python SEPARE du serveur HTTP. Résout le problème du GIL : le serveur HTTP (FastAPI) reste réactif car le VLM (ScreenAnalyzer, CLIP, FAISS, GraphBuilder) tourne dans ce process dédié. Usage: python -m agent_v0.server_v1.run_worker Architecture : Process 1 : Serveur HTTP (FastAPI, port 5005) — distribue les replays, reçoit events/images Process 2 : Ce worker — analyse VLM des sessions finalisées Process 3 : Ollama (port 11434) — LLM local Communication inter-process par fichiers (pas de Redis) : - _worker_queue.txt : liste des session_ids à traiter (ajoutés par le serveur HTTP) - _replay_active.lock : quand présent, le worker se suspend (le GPU est utilisé par le replay) Le worker : 1. Scanne _worker_queue.txt pour trouver les sessions à traiter 2. Vérifie _replay_active.lock avant chaque screenshot (priorité au replay) 3. Traite les sessions une par une (VLM + CLIP + GraphBuilder) 4. Sauvegarde les workflows JSON sur disque 5. Se suspend quand un replay est actif (libère le GPU) """ import logging import os import signal import sys import time from datetime import datetime from pathlib import Path from typing import Dict, List, Optional logger = logging.getLogger("vlm_worker") # Chemins de base (relatifs au working directory = racine du projet) ROOT_DIR = Path(__file__).parent.parent.parent DATA_DIR = ROOT_DIR / "data" / "training" LIVE_SESSIONS_DIR = DATA_DIR / "live_sessions" QUEUE_FILE = DATA_DIR / "_worker_queue.txt" REPLAY_LOCK = DATA_DIR / "_replay_active.lock" # Intervalle de polling quand la queue est vide (secondes) POLL_INTERVAL = 10 # Intervalle de vérification du replay lock (secondes) REPLAY_CHECK_INTERVAL = 2 # Timeout max d'attente du replay lock avant reprise forcée (secondes) REPLAY_WAIT_TIMEOUT = 120 class VLMWorker: """Worker VLM autonome qui traite les sessions finalisées. Tourne en boucle infinie dans un process séparé du serveur HTTP. Communique via le filesystem : - Lit les session_ids depuis _worker_queue.txt - Vérifie _replay_active.lock pour se suspendre - Écrit les workflows dans data/training/workflows/ """ def __init__(self): self._running = False self._processor = None # Initialisé au premier besoin (lazy loading GPU) self._current_session: Optional[str] = None # Stats self._stats: Dict[str, int] = { "sessions_processed": 0, "sessions_failed": 0, "sessions_skipped": 0, "total_screenshots_analyzed": 0, } self._completed: List[Dict] = [] self._failed: List[Dict] = [] def _get_processor(self): """Lazy init du StreamProcessor (charge les modèles GPU au premier appel).""" if self._processor is None: logger.info("Initialisation du StreamProcessor (chargement GPU)...") from .stream_processor import StreamProcessor self._processor = StreamProcessor(data_dir=str(LIVE_SESSIONS_DIR)) logger.info("StreamProcessor initialisé.") return self._processor def start(self): """Boucle principale du worker.""" self._running = True logger.info( "VLM Worker démarré — surveillance de %s", QUEUE_FILE, ) logger.info(" Replay lock : %s", REPLAY_LOCK) logger.info(" Sessions dir : %s", LIVE_SESSIONS_DIR) logger.info(" Poll interval : %ds", POLL_INTERVAL) while self._running: try: # Vérifier si un replay est actif if self._is_replay_active(): self._wait_for_replay_end() continue # Lire la prochaine session de la queue session_id = self._read_next_session() if session_id: self._process_session(session_id) else: time.sleep(POLL_INTERVAL) except KeyboardInterrupt: logger.info("Interruption clavier, arrêt du worker.") self._running = False except Exception as e: logger.error("Erreur dans la boucle principale : %s", e, exc_info=True) time.sleep(5) # Éviter une boucle d'erreurs rapide logger.info("VLM Worker arrêté.") def stop(self): """Arrêt propre du worker.""" self._running = False logger.info("Arrêt demandé.") # ========================================================================= # Queue management (fichier _worker_queue.txt) # ========================================================================= def _read_next_session(self) -> Optional[str]: """Lit et retire le premier session_id de la queue. Format du fichier : une ligne par session_id. Retire la ligne traitée de façon atomique (réécriture complète). """ if not QUEUE_FILE.exists(): return None try: lines = QUEUE_FILE.read_text(encoding="utf-8").strip().splitlines() if not lines: return None # Prendre le premier session_id non vide session_id = None remaining = [] for line in lines: line = line.strip() if not line: continue if session_id is None: session_id = line else: remaining.append(line) # Réécrire le fichier sans la première ligne (atomique via rename) tmp_file = QUEUE_FILE.with_suffix(".tmp") if remaining: tmp_file.write_text( "\n".join(remaining) + "\n", encoding="utf-8", ) else: tmp_file.write_text("", encoding="utf-8") tmp_file.rename(QUEUE_FILE) if session_id: logger.info( "Session déqueuée : %s (%d restantes dans la queue)", session_id, len(remaining), ) return session_id except Exception as e: logger.error("Erreur lecture queue %s : %s", QUEUE_FILE, e) return None # ========================================================================= # Replay lock (_replay_active.lock) # ========================================================================= def _is_replay_active(self) -> bool: """Vérifie si un replay est en cours (fichier lock présent).""" return REPLAY_LOCK.exists() def _wait_for_replay_end(self): """Attend que le replay se termine (suppression du fichier lock). Timeout de sécurité : REPLAY_WAIT_TIMEOUT secondes max. """ start = time.time() logger.info( "Replay actif détecté (%s), worker en pause...", REPLAY_LOCK, ) while self._running and REPLAY_LOCK.exists(): elapsed = time.time() - start if elapsed > REPLAY_WAIT_TIMEOUT: logger.warning( "Timeout d'attente du replay (%ds), reprise forcée.", REPLAY_WAIT_TIMEOUT, ) break time.sleep(REPLAY_CHECK_INTERVAL) elapsed = time.time() - start if elapsed > 0.5: logger.info("Replay terminé, worker reprend après %.1fs de pause.", elapsed) # ========================================================================= # Traitement d'une session # ========================================================================= def _process_session(self, session_id: str): """Traite une session complète (analyse VLM + construction workflow).""" self._current_session = session_id logger.info("=== Début traitement session %s ===", session_id) start_time = time.time() try: proc = self._get_processor() # Vérifier que le dossier session existe session_dir = proc._find_session_dir(session_id) if not session_dir: logger.error( "Dossier session %s introuvable, skip.", session_id, ) self._stats["sessions_skipped"] += 1 return shots_dir = session_dir / "shots" full_shots = sorted(shots_dir.glob("shot_*_full.png")) if shots_dir.exists() else [] if not full_shots: logger.warning( "Aucun screenshot full dans %s, skip.", shots_dir, ) self._stats["sessions_skipped"] += 1 return logger.info( "Session %s : %d screenshots full à analyser dans %s", session_id, len(full_shots), shots_dir, ) # Utiliser reprocess_session du StreamProcessor # qui fait : ScreenAnalyzer + CLIP + FAISS + GraphBuilder result = proc.reprocess_session( session_id, progress_callback=self._progress_callback, ) elapsed = time.time() - start_time if result.get("error"): logger.error( "Échec session %s après %.1fs : %s", session_id, elapsed, result["error"], ) self._stats["sessions_failed"] += 1 self._failed.append({ "session_id": session_id, "error": result["error"], "elapsed_seconds": round(elapsed, 1), "timestamp": datetime.now().isoformat(), }) elif result.get("status") == "insufficient_data": logger.warning( "Session %s : données insuffisantes (%d states) après %.1fs", session_id, result.get("states_count", 0), elapsed, ) self._stats["sessions_failed"] += 1 self._failed.append({ "session_id": session_id, "error": "insufficient_data", "states_count": result.get("states_count", 0), "elapsed_seconds": round(elapsed, 1), "timestamp": datetime.now().isoformat(), }) else: logger.info( "Session %s traitée en %.1fs | workflow=%s | %d nodes, %d edges", session_id, elapsed, result.get("workflow_id", "?"), result.get("nodes", 0), result.get("edges", 0), ) self._stats["sessions_processed"] += 1 self._stats["total_screenshots_analyzed"] += result.get("states_analyzed", 0) self._completed.append({ "session_id": session_id, "workflow_id": result.get("workflow_id"), "workflow_name": result.get("workflow_name"), "nodes": result.get("nodes", 0), "edges": result.get("edges", 0), "states_analyzed": result.get("states_analyzed", 0), "elapsed_seconds": round(elapsed, 1), "timestamp": datetime.now().isoformat(), }) except Exception as e: elapsed = time.time() - start_time logger.error( "Exception inattendue pour session %s après %.1fs : %s", session_id, elapsed, e, exc_info=True, ) self._stats["sessions_failed"] += 1 self._failed.append({ "session_id": session_id, "error": f"exception: {e}", "elapsed_seconds": round(elapsed, 1), "timestamp": datetime.now().isoformat(), }) finally: self._current_session = None logger.info("=== Fin traitement session %s ===", session_id) def _progress_callback(self, session_id: str, current: int, total: int, shot_id: str = ""): """Callback de progression appelé par reprocess_session. Vérifie aussi le replay lock entre chaque screenshot. """ logger.info( "Session %s : screenshot %d/%d%s", session_id, current, total, f" ({shot_id})" if shot_id else "", ) # Vérifier si un replay est devenu actif pendant le traitement if self._is_replay_active(): logger.info( "Replay détecté pendant l'analyse de %s, pause...", session_id, ) self._wait_for_replay_end() def main(): """Point d'entrée du worker VLM autonome.""" logging.basicConfig( level=logging.INFO, format="%(asctime)s [VLM-WORKER] %(levelname)s %(message)s", ) # Réduire le bruit des loggers tiers logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("httpcore").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) # Créer les dossiers nécessaires DATA_DIR.mkdir(parents=True, exist_ok=True) LIVE_SESSIONS_DIR.mkdir(parents=True, exist_ok=True) worker = VLMWorker() # Gestion propre des signaux def _handle_signal(signum, frame): logger.info("Signal %s reçu, arrêt en cours...", signal.Signals(signum).name) worker.stop() signal.signal(signal.SIGTERM, _handle_signal) signal.signal(signal.SIGINT, _handle_signal) # Afficher l'état au démarrage print(f"\n{'='*60}") print(f" VLM Worker — Process séparé du serveur HTTP") print(f" Queue : {QUEUE_FILE}") print(f" Lock : {REPLAY_LOCK}") print(f" PID : {os.getpid()}") print(f"{'='*60}\n") worker.start() if __name__ == "__main__": main()