# agent_v1/main.py
"""
Agent V1 entry point - context intelligence, heartbeat and replay.

Background loops (all daemon threads):

- _replay_poll_loop          : permanent; polls the server for replay
                               actions (P0-5)
- _background_heartbeat_loop : permanent; sends a screenshot every 5s while
                               no recording session is active
- _heartbeat_loop            : per session; periodic capture every 5s
- _command_watchdog_loop     : per session; watches command.json (legacy)

Lifecycle: ``AgentV1.running`` is the agent-lifetime flag watched by the
permanent loops. Each recording session owns its own ``threading.Event``;
``stop_session`` sets that event, which stops only the session loops.
"""

import sys
import os
import uuid
import time
import json
import logging
import platform
import threading

from .config import SESSIONS_ROOT, AGENT_VERSION, SERVER_URL, MACHINE_ID
from .core.captor import EventCaptorV1
from .core.executor import ActionExecutorV1
from .network.streamer import TraceStreamer
from .ui.shared_state import AgentState
from .ui.smart_tray import SmartTrayV1
from .ui.chat_window import ChatWindow
from .ui.capture_server import CaptureServer
from .session.storage import SessionStorage
from .vision.capturer import VisionCapturer

# Optional import of the server client (used by the chat and the workflows).
# Two paths: relative (from agent_v0.agent_v1) or absolute
# (from C:\rpa_vision\agent_v1).
try:
    from ..lea_ui.server_client import LeaServerClient
except (ImportError, ValueError):
    try:
        from lea_ui.server_client import LeaServerClient
    except ImportError:
        LeaServerClient = None

# Logging configuration
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

# Replay polling interval (seconds)
REPLAY_POLL_INTERVAL = 1.0

# Delay between two heartbeat captures (seconds)
HEARTBEAT_INTERVAL = 5.0


class AgentV1:
    """Desktop agent: records user sessions, streams captures, replays actions."""

    # Hash of the last screenshot sent by each heartbeat loop (deduplication).
    _last_bg_hash: str = ""
    _last_heartbeat_hash: str = ""

    def __init__(self, user_id="demo_user"):
        """Build every component and start the permanent background loops.

        Args:
            user_id: identifier used to build the default replay session id
                when no recording session is active.
        """
        self.user_id = user_id
        self.machine_id = MACHINE_ID
        self.session_id = None
        self.session_dir = None

        # Local storage management and cleanup
        self.storage = SessionStorage(SESSIONS_ROOT)
        threading.Thread(target=self._delayed_cleanup, daemon=True).start()

        self.vision = None
        self.streamer = None
        self.captor = None
        self.shot_counter = 0
        self.running = False
        # Defined early so background code can test for a missing tray.
        self.ui = None

        # Stop signal of the current recording session (None before the first one)
        self._session_stop = None
        # Handles on the permanent threads, so they are never started twice
        self._replay_thread = None
        self._bg_heartbeat_thread = None

        # Executor shared between the watchdog and the replay loop
        self._executor = None
        # Flag telling that a replay is in progress (avoids conflicts)
        self._replay_active = False

        # State shared between the systray and the chat (single source of truth)
        self._state = AgentState()
        self._state.set_on_start(self.start_session)
        self._state.set_on_stop(self.stop_session)

        # Server client for the chat and the workflows
        self._server_client = None
        if LeaServerClient is not None:
            self._server_client = LeaServerClient()

        # Lea chat window (native tkinter)
        server_host = (
            self._server_client.server_host
            if self._server_client is not None
            else os.getenv("RPA_SERVER_HOST", "localhost")
        )
        self._chat_window = ChatWindow(
            server_client=self._server_client,
            on_start_callback=self.start_session,
            server_host=server_host,
            chat_port=5004,
            shared_state=self._state,
        )

        # Executor for the replay (must exist before polling starts)
        self._executor = ActionExecutorV1()
        self._bg_vision = VisionCapturer(str(SESSIONS_ROOT / "_background"))

        # Mini HTTP server for on-demand captures (port 5006)
        self._capture_server = CaptureServer()
        self._capture_server.start()

        # Smart tray UI (replaces TrayAppV1, no more PyQt5)
        self.ui = SmartTrayV1(
            self.start_session,
            self.stop_session,
            server_client=self._server_client,
            chat_window=self._chat_window,
            machine_id=self.machine_id,
            shared_state=self._state,
        )

        # Permanent loops (no active session needed). Started last so that
        # the replay loop never runs before self.ui exists.
        self.running = True
        self._ensure_permanent_loops()

    def _ensure_permanent_loops(self):
        """Start the replay poll and background heartbeat threads if not alive.

        Safe to call repeatedly: a loop that is already running is left alone,
        so there is never more than one thread polling the replay queue.
        """
        if self._replay_thread is None or not self._replay_thread.is_alive():
            self._replay_thread = threading.Thread(
                target=self._replay_poll_loop, daemon=True
            )
            self._replay_thread.start()
        if self._bg_heartbeat_thread is None or not self._bg_heartbeat_thread.is_alive():
            self._bg_heartbeat_thread = threading.Thread(
                target=self._background_heartbeat_loop, daemon=True
            )
            self._bg_heartbeat_thread.start()

    def _session_is_active(self):
        """Return True while a recording session has been started and not stopped."""
        stop_event = self._session_stop
        return stop_event is not None and not stop_event.is_set()

    def _delayed_cleanup(self):
        """Run the storage auto-cleanup after 30s so startup is not blocked."""
        time.sleep(30)
        self.storage.run_auto_cleanup()

    def start_session(self, workflow_name):
        """Start a new recording session.

        Creates the session directory, the capturer, the streamer and the
        event captor, then launches the session-scoped loops (heartbeat and
        legacy command watchdog).

        Args:
            workflow_name: name of the recorded workflow (only logged here).
        """
        # Starting over a live session would leak its captor and streamer.
        if self._session_is_active():
            logger.warning(
                f"Session {self.session_id} deja active — arret avant nouvelle session"
            )
            self.stop_session()

        self.session_id = f"sess_{time.strftime('%Y%m%dT%H%M%S')}_{uuid.uuid4().hex[:6]}"
        self.session_dir = self.storage.get_session_dir(self.session_id)

        self.vision = VisionCapturer(str(self.session_dir))
        self.streamer = TraceStreamer(self.session_id, machine_id=self.machine_id)
        self.captor = EventCaptorV1(self._on_event_bridge)

        # Initialise the shared executor
        self._executor = ActionExecutorV1()

        self.shot_counter = 0
        self.running = True
        self._replay_active = False
        # Force the first heartbeat of the session to be sent.
        self._last_heartbeat_hash = ""

        # One stop signal per session: threads of a previous session can
        # never be revived by the start of a new one.
        stop_event = threading.Event()
        self._session_stop = stop_event

        self.streamer.start()
        self.captor.start()

        # Contextual heartbeat (every 5s by default)
        threading.Thread(
            target=self._heartbeat_loop, args=(stop_event,), daemon=True
        ).start()

        # Command watchdog (GHOST replay — legacy file)
        threading.Thread(
            target=self._command_watchdog_loop, args=(stop_event,), daemon=True
        ).start()

        # Replay polling (P0-5) is a permanent loop: only restart it if it died.
        self._ensure_permanent_loops()

        logger.info(f"Session {self.session_id} ({workflow_name}) sur machine {self.machine_id} en cours...")

    def _command_watchdog_loop(self, stop_event=None):
        """Watch a command file and execute the orders it contains (legacy).

        Args:
            stop_event: session stop signal; defaults to the current session's.
        """
        from .config import BASE_DIR

        if stop_event is None:
            stop_event = self._session_stop
        if stop_event is None:
            return

        # Command file path depends on the OS
        if platform.system() == "Windows":
            cmd_path = "C:\\rpa_vision\\command.json"
        else:
            cmd_path = str(BASE_DIR / "command.json")

        while self.running and not stop_event.is_set():
            # Do not process file commands during a server replay
            if self._replay_active:
                stop_event.wait(1)
                continue

            try:
                with open(cmd_path, "r", encoding="utf-8") as f:
                    order = json.load(f)
            except FileNotFoundError:
                pass  # no pending order
            except Exception as e:
                logger.error(f"Erreur Watchdog: {e}")
            else:
                try:
                    os.remove(cmd_path)  # the order is consumed
                    if self._executor:
                        self._executor.execute_normalized_order(order)
                except Exception as e:
                    logger.error(f"Erreur Watchdog: {e}")
            stop_event.wait(1)

    def _set_replay_active(self, active):
        """Propagate the replay flag to the agent, the tray and the shared state."""
        self._replay_active = active
        ui = self.ui
        if ui is not None:
            try:
                ui.set_replay_active(active)
            except Exception as e:
                # The tray is only an indicator: never let it break the replay loop.
                logger.error(f"Erreur mise a jour tray (replay) : {e}")
        self._state.set_replay_active(active)

    @staticmethod
    def _replay_idle_delay(executor):
        """Return the idle sleep: the executor backoff, at least the poll interval."""
        backoff = getattr(executor, '_poll_backoff', REPLAY_POLL_INTERVAL)
        return max(backoff, REPLAY_POLL_INTERVAL)

    def _replay_poll_loop(self):
        """Poll the server for replay actions (P0-5).

        Runs for the whole agent lifetime, in parallel with the heartbeat and
        the watchdog. Calls the executor's ``poll_and_execute`` every
        REPLAY_POLL_INTERVAL seconds; when it reports that an action was
        handled, the next poll happens after 0.2s.
        """
        msg = (
            f"[REPLAY] Boucle replay demarree — poll toutes les "
            f"{REPLAY_POLL_INTERVAL}s sur {SERVER_URL}"
        )
        print(msg)
        logger.info(msg)

        # Polls between two "still alive" traces (about 60s). Clamped to 1 so
        # an interval above 60s cannot produce a modulo by zero.
        log_every = max(1, int(60 / REPLAY_POLL_INTERVAL))
        poll_count = 0

        while self.running:
            executor = self._executor
            if not executor:
                time.sleep(REPLAY_POLL_INTERVAL)
                continue

            # Use the active session, or a default id for the replay
            poll_session = self.session_id or f"agent_{self.user_id}"

            # Periodic trace confirming the loop is running
            poll_count += 1
            if poll_count % log_every == 0:
                print(
                    f"[REPLAY] Poll #{poll_count} — session={poll_session} "
                    f"— serveur={SERVER_URL}"
                )

            try:
                # Try to fetch and execute one action
                had_action = executor.poll_and_execute(
                    session_id=poll_session,
                    server_url=SERVER_URL,
                    machine_id=self.machine_id,
                )

                if had_action:
                    if not self._replay_active:
                        self._set_replay_active(True)
                    # An action was executed: poll faster to chain the
                    # actions of the workflow
                    time.sleep(0.2)
                else:
                    # Nothing pending — use the executor backoff
                    if self._replay_active:
                        print("[REPLAY] Replay termine — retour en mode capture")
                        logger.info("Replay termine — retour en mode capture")
                        self._set_replay_active(False)
                    time.sleep(self._replay_idle_delay(executor))

            except Exception as e:
                print(f"[REPLAY] ERREUR boucle replay : {e}")
                logger.error(f"Erreur replay poll loop : {e}")
                # Reset tray AND shared state so the UI does not stay in replay mode.
                self._set_replay_active(False)
                time.sleep(self._replay_idle_delay(executor))

    def _background_heartbeat_loop(self):
        """Permanent heartbeat: send a screenshot to the server every 5s.

        Runs even without an active session, so that the VWB can capture
        Windows. Stays silent while a recording session is active.
        """
        try:
            import requests as req
        except ImportError:
            logger.warning("[HEARTBEAT] Module requests indisponible — heartbeat permanent desactive")
            return

        bg_session = f"bg_{self.machine_id}"
        logger.info(f"[HEARTBEAT] Boucle permanente démarrée (session={bg_session})")

        while self.running:
            try:
                self._send_background_heartbeat(req, bg_session)
            except Exception as e:
                logger.debug(f"[HEARTBEAT] Erreur: {e}")
            time.sleep(HEARTBEAT_INTERVAL)

    def _send_background_heartbeat(self, req, bg_session):
        """Capture the screen and post it, unless recording or unchanged.

        Args:
            req: the ``requests`` module, imported lazily by the caller.
            bg_session: session id under which background shots are sent.
        """
        # Do not send while recording (the session heartbeat handles it)
        if self.session_id:
            return

        full_path = self._bg_vision.capture_full_context("heartbeat")
        if not full_path:
            return

        # Dedup: skip when the screen is identical
        img_hash = self._quick_hash(full_path)
        if img_hash and img_hash == self._last_bg_hash:
            return
        self._last_bg_hash = img_hash

        # Send to the streaming server
        with open(full_path, 'rb') as f:
            req.post(
                f"{SERVER_URL}/traces/stream/image",
                params={
                    "session_id": bg_session,
                    "shot_id": f"heartbeat_{int(time.time())}",
                    "machine_id": self.machine_id,
                },
                files={"file": ("screenshot.png", f, "image/png")},
                timeout=10,
            )

    def stop_session(self):
        """Stop the current recording session.

        Only the session-scoped loops are stopped; the permanent loops
        (replay polling, background heartbeat) keep running. The session id
        is cleared so the background heartbeat resumes.
        """
        stop_event = self._session_stop
        if stop_event is not None:
            stop_event.set()
        if self.captor:
            self.captor.stop()
        if self.streamer:
            self.streamer.stop()
        logger.info(f"Session {self.session_id} terminée.")
        self.session_id = None

    def _heartbeat_loop(self, stop_event=None):
        """Periodic capture giving context during a recording session.

        Deduplication: a screenshot is sent only when the screen changed,
        or when no hash could be computed.

        Args:
            stop_event: session stop signal; defaults to the current session's.
        """
        if stop_event is None:
            stop_event = self._session_stop
        if stop_event is None:
            return

        # Bind to this session's objects: a later session replaces the attributes.
        vision = self.vision
        streamer = self.streamer

        while self.running and not stop_event.is_set():
            try:
                full_path = vision.capture_full_context("heartbeat")
                if full_path:
                    # Quick hash to detect screen changes
                    img_hash = self._quick_hash(full_path)
                    if not img_hash or img_hash != self._last_heartbeat_hash:
                        self._last_heartbeat_hash = img_hash
                        streamer.push_image(full_path, f"heartbeat_{int(time.time())}")
                        streamer.push_event({
                            "type": "heartbeat",
                            "image": full_path,
                            "timestamp": time.time(),
                            "machine_id": self.machine_id,
                        })
            except Exception as e:
                logger.error(f"Heartbeat error: {e}")
            # Event.wait returns as soon as the session is stopped.
            stop_event.wait(HEARTBEAT_INTERVAL)

    @staticmethod
    def _quick_hash(image_path: str) -> str:
        """Return a quick hash of the image (MD5 of a 16x16 grayscale thumbnail).

        Returns an empty string when the hash cannot be computed (for example
        Pillow missing or unreadable file); callers treat "" as "unknown".
        """
        try:
            from PIL import Image
            import hashlib

            # Context manager guarantees the underlying file is closed.
            with Image.open(image_path) as img:
                thumb = img.resize((16, 16)).convert('L')
            return hashlib.md5(thumb.tobytes()).hexdigest()
        except Exception:
            return ""

    def _on_event_bridge(self, event):
        """Enrich a captured event with screenshots and stream it.

        Ignored when no session is active. Adds the machine id and the last
        known window, captures the screen on focus changes, and performs a
        dual capture plus a delayed result capture on clicks and key combos.

        Args:
            event: event dict emitted by the captor; mutated in place.
        """
        if not self.session_id:
            return

        # Inject the machine identifier in every event (multi-machine)
        event["machine_id"] = self.machine_id

        # Inject the window context in every event (needed so the server
        # can maintain last_window_info)
        if self.captor and self.captor.last_window:
            event["window"] = self.captor.last_window

        event_type = event.get("type")

        # Proactive capture on window change
        if event_type == "window_focus_change":
            full_path = self.vision.capture_full_context("focus_change")
            event["screenshot_context"] = full_path
            if full_path:
                self.streamer.push_image(full_path, f"focus_{int(time.time())}")

        # Interactive (dual) capture
        if event_type in ("mouse_click", "key_combo"):
            self.shot_counter += 1
            shot_id = f"shot_{self.shot_counter:04d}"

            pos = event.get("pos", (0, 0))
            capture_info = self.vision.capture_dual(pos[0], pos[1], shot_id)
            event["screenshot_id"] = shot_id
            event["vision_info"] = capture_info

            self._stream_capture_info(capture_info, shot_id)

            # POST-ACTION: capture the result 1s later (to see the click's effect)
            threading.Timer(1.0, self._capture_result, args=(shot_id,)).start()

            self.ui.update_stats(self.shot_counter)
            self._state.update_actions_count(self.shot_counter)

        print(f"📸 Action capturée : {event_type}")
        self.streamer.push_event(event)

    def _capture_result(self, base_shot_id: str):
        """Capture the screen 1s after an action to record its effect.

        Runs on a timer thread; does nothing once the session has ended.

        Args:
            base_shot_id: shot id of the action whose result is captured.
        """
        if not self.running or not self.session_id:
            return
        try:
            res_path = self.vision.capture_full_context(f"result_of_{base_shot_id}")
            if res_path:
                self.streamer.push_image(res_path, f"res_{base_shot_id}")
            self.streamer.push_event({
                "type": "action_result",
                "base_shot_id": base_shot_id,
                "image": res_path,
            })
        except Exception as e:
            logger.error(f"Erreur capture resultat {base_shot_id}: {e}")

    def _stream_capture_info(self, capture_info, shot_id):
        """Stream the "full" and "crop" images of a dual capture, when present.

        Args:
            capture_info: mapping returned by the dual capture; may be empty.
            shot_id: shot id used as prefix for the streamed image ids.
        """
        if not capture_info:
            return
        if "full" in capture_info:
            self.streamer.push_image(capture_info["full"], f"{shot_id}_full")
        if "crop" in capture_info:
            self.streamer.push_image(capture_info["crop"], f"{shot_id}_crop")

    def run(self):
        """Run the tray UI (delegates to the tray's own ``run``)."""
        self.ui.run()


def main():
    """Create the agent and run it."""
    agent = AgentV1()
    agent.run()


if __name__ == "__main__":
    main()