# agent_v0/server_v1/replay_engine.py """ Replay Engine — Gestion des replays de workflows. Contient : - Setup environnement (préparation apps avant replay) - Validation des actions de replay (sécurité) - Conversion workflow → actions normalisées - Fonctions utilitaires de replay (session detection, state management, retry) - Pre-check écran par embedding CLIP - Détection popup Extrait de api_stream.py pour clarifier l'architecture. """ import json import logging import re import threading import time import uuid from collections import defaultdict from typing import Any, Dict, List, Optional logger = logging.getLogger("api_stream") # ========================================================================= # Validation des actions de replay (sécurité HIGH) # ========================================================================= _ALLOWED_ACTION_TYPES = { "click", "type", "key_combo", "scroll", "wait", "file_open", "file_save", "file_close", "file_new", "file_dialog", "double_click", "right_click", "drag", "verify_screen", # Replay hybride : vérification visuelle entre groupes } _MAX_ACTION_TEXT_LENGTH = 10000 _MAX_KEYS_PER_COMBO = 10 # Touches autorisées dans les key_combo (modificateurs + touches spéciales + caractères simples) _KNOWN_KEY_NAMES = { "enter", "return", "tab", "escape", "esc", "backspace", "delete", "space", "up", "down", "left", "right", "home", "end", "page_up", "page_down", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12", "ctrl", "ctrl_l", "ctrl_r", "alt", "alt_l", "alt_r", "shift", "shift_l", "shift_r", "cmd", "win", "super", "super_l", "super_r", "windows", "meta", "insert", "print_screen", "caps_lock", "num_lock", } def _validate_replay_action(action: dict) -> Optional[str]: """Valide une action de replay. Retourne un message d'erreur ou None si valide.""" action_type = action.get("type", "") # Vérifier le type d'action if action_type not in _ALLOWED_ACTION_TYPES: return f"Type d'action non autorisé : '{action_type}'. Autorisés : {sorted(_ALLOWED_ACTION_TYPES)}" # Vérifier la longueur du texte text = action.get("text", "") if isinstance(text, str) and len(text) > _MAX_ACTION_TEXT_LENGTH: return f"Texte trop long ({len(text)} > {_MAX_ACTION_TEXT_LENGTH} caractères)" # Vérifier les touches keys = action.get("keys", []) if isinstance(keys, list): if len(keys) > _MAX_KEYS_PER_COMBO: return f"Trop de touches ({len(keys)} > {_MAX_KEYS_PER_COMBO})" for key in keys: key_lower = str(key).lower() # Accepter les caractères simples (a-z, 0-9, ponctuation) et les noms connus if len(str(key)) == 1 or key_lower in _KNOWN_KEY_NAMES: continue return f"Touche inconnue : '{key}'" # Vérifier les coordonnées normalisées for coord_name in ("x_pct", "y_pct"): val = action.get(coord_name) if val is not None: try: val_f = float(val) if not (0.0 <= val_f <= 1.0): return f"Coordonnée {coord_name}={val_f} hors limites [0.0, 1.0]" except (TypeError, ValueError): return f"Coordonnée {coord_name} invalide : {val}" return None # Valide # ========================================================================= # Setup environnement — Préparation automatique avant le replay # ========================================================================= # Mapping des noms d'exécutables Windows courants vers la commande de lancement. # Utilisé comme fallback pour le texte de recherche dans le menu Démarrer. # Le format est : "processname.exe" (minuscule) -> commande shell _APP_LAUNCH_COMMANDS: Dict[str, str] = { "notepad.exe": "notepad", "explorer.exe": "explorer", "calc.exe": "calc", "mspaint.exe": "mspaint", "cmd.exe": "cmd", "powershell.exe": "powershell", "wordpad.exe": "wordpad", "charmap.exe": "charmap", "snippingtool.exe": "snippingtool", "taskmgr.exe": "taskmgr", "regedit.exe": "regedit", "mstsc.exe": "mstsc", "winword.exe": "winword", "excel.exe": "excel", "powerpnt.exe": "powerpnt", "outlook.exe": "outlook", "msedge.exe": "msedge", "chrome.exe": "chrome", "firefox.exe": "firefox", "code.exe": "code", } # Mapping des exécutables vers le nom visuel à chercher dans le menu Démarrer. # Contient le texte de recherche (souvent le nom français) et une description # pour le VLM afin d'identifier l'icône dans les résultats de recherche. # Format : "processname.exe" -> {"search_text": ..., "display_name": ..., "vlm_description": ...} _APP_VISUAL_SEARCH: Dict[str, Dict[str, str]] = { "notepad.exe": { "search_text": "Bloc-notes", "display_name": "Bloc-notes", "vlm_description": "L'application Bloc-notes (Notepad) dans les résultats de recherche", }, "calc.exe": { "search_text": "Calculatrice", "display_name": "Calculatrice", "vlm_description": "L'application Calculatrice dans les résultats de recherche", }, "mspaint.exe": { "search_text": "Paint", "display_name": "Paint", "vlm_description": "L'application Paint dans les résultats de recherche", }, "cmd.exe": { "search_text": "Invite de commandes", "display_name": "Invite de commandes", "vlm_description": "L'Invite de commandes (Command Prompt) dans les résultats", }, "powershell.exe": { "search_text": "PowerShell", "display_name": "PowerShell", "vlm_description": "Windows PowerShell dans les résultats de recherche", }, "wordpad.exe": { "search_text": "WordPad", "display_name": "WordPad", "vlm_description": "L'application WordPad dans les résultats de recherche", }, "winword.exe": { "search_text": "Word", "display_name": "Microsoft Word", "vlm_description": "Microsoft Word dans les résultats de recherche", }, "excel.exe": { "search_text": "Excel", "display_name": "Microsoft Excel", "vlm_description": "Microsoft Excel dans les résultats de recherche", }, "powerpnt.exe": { "search_text": "PowerPoint", "display_name": "Microsoft PowerPoint", "vlm_description": "Microsoft PowerPoint dans les résultats de recherche", }, "outlook.exe": { "search_text": "Outlook", "display_name": "Microsoft Outlook", "vlm_description": "Microsoft Outlook dans les résultats de recherche", }, "msedge.exe": { "search_text": "Edge", "display_name": "Microsoft Edge", "vlm_description": "Microsoft Edge dans les résultats de recherche", }, "chrome.exe": { "search_text": "Chrome", "display_name": "Google Chrome", "vlm_description": "Google Chrome dans les résultats de recherche", }, "firefox.exe": { "search_text": "Firefox", "display_name": "Mozilla Firefox", "vlm_description": "Mozilla Firefox dans les résultats de recherche", }, "code.exe": { "search_text": "Visual Studio Code", "display_name": "Visual Studio Code", "vlm_description": "Visual Studio Code dans les résultats de recherche", }, "taskmgr.exe": { "search_text": "Gestionnaire des tâches", "display_name": "Gestionnaire des tâches", "vlm_description": "Le Gestionnaire des tâches dans les résultats de recherche", }, "snippingtool.exe": { "search_text": "Outil Capture", "display_name": "Outil Capture d'écran", "vlm_description": "L'Outil Capture d'écran dans les résultats de recherche", }, "mstsc.exe": { "search_text": "Connexion Bureau à distance", "display_name": "Bureau à distance", "vlm_description": "La Connexion Bureau à distance dans les résultats", }, } # Applications Windows à ignorer pour le setup (processus système, agents, etc.) _SETUP_IGNORE_APPS = { "searchhost.exe", # Barre de recherche Windows "explorer.exe", # Explorer est toujours lancé (shell Windows) "pythonw.exe", # Agent Python (notre propre agent) "python.exe", # Idem "shellexperiencehost.exe", "startmenuexperiencehost.exe", "applicationframehost.exe", "systemsettings.exe", "textinputhost.exe", "runtimebroker.exe", } def _extract_required_apps_from_events(raw_events: list) -> Dict[str, Any]: """Extraire les applications requises depuis les événements bruts d'une session. Analyse les window_focus_change pour identifier : - L'application principale (la plus utilisée hors apps système) - La première fenêtre ciblée (pour le setup initial) Args: raw_events: Événements bruts depuis live_events.jsonl. Returns: Dict avec les clés : - primary_app: str (nom de l'exécutable principal, ex: "Notepad.exe") - primary_launch_cmd: str (commande Win+R, ex: "notepad") - first_window_title: str (titre de la première fenêtre applicative) - apps: dict[str, int] (app_name -> nombre d'occurrences) """ app_counts: Dict[str, int] = defaultdict(int) first_app = None first_window_title = None for raw_evt in raw_events: event_data = raw_evt.get("event", raw_evt) evt_type = event_data.get("type", "") if evt_type == "window_focus_change": to_info = event_data.get("to", {}) if not to_info: continue app_name = to_info.get("app_name", "") title = to_info.get("title", "") if app_name: app_counts[app_name] += 1 if first_app is None and app_name.lower() not in _SETUP_IGNORE_APPS: first_app = app_name first_window_title = title # Aussi extraire depuis les mouse_click qui ont un champ window elif evt_type == "mouse_click": window = event_data.get("window", {}) if isinstance(window, dict): app_name = window.get("app_name", "") if app_name: app_counts[app_name] += 1 if not app_counts: return {} # Déterminer l'application principale (la plus fréquente hors apps ignorées) filtered_apps = { k: v for k, v in app_counts.items() if k.lower() not in _SETUP_IGNORE_APPS } if not filtered_apps: return {} primary_app = max(filtered_apps, key=filtered_apps.get) # Résoudre la commande de lancement primary_launch_cmd = _resolve_launch_command(primary_app) return { "primary_app": primary_app, "primary_launch_cmd": primary_launch_cmd, "first_window_title": first_window_title or "", "apps": dict(app_counts), } def _extract_required_apps_from_workflow(workflow) -> Dict[str, Any]: """Extraire les applications requises depuis un workflow structuré. Analyse les nodes du workflow pour identifier les titres de fenêtres requis, puis infère l'application principale. Args: workflow: Objet Workflow ou dict brut. Returns: Même format que _extract_required_apps_from_events. """ # Accéder aux données (objet ou dict) if hasattr(workflow, 'nodes'): nodes = workflow.nodes metadata = workflow.metadata if hasattr(workflow, 'metadata') else {} elif isinstance(workflow, dict): nodes = workflow.get('nodes', []) metadata = workflow.get('metadata', {}) else: return {} if not nodes: return {} # Collecter les titres de fenêtres depuis les nodes window_titles = [] for node in nodes: template = node.template if hasattr(node, 'template') else node.get('template', {}) if isinstance(template, dict): window = template.get('window', {}) elif hasattr(template, 'window'): window = template.window if hasattr(template.window, '__dict__') else {} else: window = {} if isinstance(window, dict): title = window.get('title_pattern', '') or window.get('title_contains', '') elif hasattr(window, 'title_pattern'): title = getattr(window, 'title_pattern', '') or '' else: title = '' if title: window_titles.append(title) # Inférer l'app principale depuis les titres de fenêtres primary_app, primary_launch_cmd, matched_title = _infer_app_from_window_titles(window_titles) # Utiliser le titre qui a matché l'app (pas le premier node qui peut être "Rechercher") first_title = matched_title or (window_titles[0] if window_titles else "") if not primary_app: return {} source_session_id = metadata.get("source_session_id", "") if isinstance(metadata, dict) else "" machine_id = metadata.get("machine_id", "") if isinstance(metadata, dict) else "" return { "primary_app": primary_app, "primary_launch_cmd": primary_launch_cmd, "first_window_title": first_title, "apps": {}, "source_session_id": source_session_id, "machine_id": machine_id, } def _resolve_launch_command(app_name: str) -> str: """Résoudre la commande Win+R pour lancer une application. Si l'app n'est pas dans le mapping, utilise le nom de l'exécutable directement sans l'extension .exe (fonctionne pour la plupart des apps). """ app_lower = app_name.lower() if app_lower in _APP_LAUNCH_COMMANDS: return _APP_LAUNCH_COMMANDS[app_lower] # Fallback : utiliser le nom sans l'extension .exe if app_lower.endswith(".exe"): return app_name[:-4] return app_name def _infer_app_from_window_titles(titles: list) -> tuple: """Inférer le nom de l'application et la commande de lancement depuis des titres de fenêtres. Utilise des heuristiques basées sur les patterns de titres Windows courants. Returns: Tuple (app_name, launch_command, matched_title). ("", "", "") si non identifié. """ _TITLE_APP_PATTERNS = [ ("bloc-notes", "Notepad.exe", "notepad"), ("notepad", "Notepad.exe", "notepad"), ("word", "winword.exe", "winword"), ("excel", "excel.exe", "excel"), ("powerpoint", "powerpnt.exe", "powerpnt"), ("outlook", "outlook.exe", "outlook"), ("paint", "mspaint.exe", "mspaint"), ("calculatrice", "calc.exe", "calc"), ("calculator", "calc.exe", "calc"), ("explorateur de fichiers", "explorer.exe", "explorer"), ("file explorer", "explorer.exe", "explorer"), ("invite de commandes", "cmd.exe", "cmd"), ("command prompt", "cmd.exe", "cmd"), ("powershell", "powershell.exe", "powershell"), ("visual studio code", "code.exe", "code"), ("edge", "msedge.exe", "msedge"), ("chrome", "chrome.exe", "chrome"), ("firefox", "firefox.exe", "firefox"), ] for title in titles: title_lower = title.lower() for pattern, app_name, launch_cmd in _TITLE_APP_PATTERNS: if pattern in title_lower: # Ignorer les apps système (explorer, etc.) if app_name.lower() in _SETUP_IGNORE_APPS: continue return (app_name, launch_cmd, title) return ("", "", "") def _get_visual_search_info(app_name: str) -> Dict[str, str]: """Obtenir les informations de recherche visuelle pour une application. Consulte _APP_VISUAL_SEARCH, sinon construit un fallback à partir du nom de l'exécutable (ex: "MonApp.exe" -> search_text="MonApp"). Args: app_name: Nom de l'exécutable (ex: "Notepad.exe"). Returns: Dict avec search_text, display_name, vlm_description. """ app_lower = app_name.lower() if app_lower in _APP_VISUAL_SEARCH: return dict(_APP_VISUAL_SEARCH[app_lower]) # Fallback : utiliser le nom sans .exe base_name = app_name[:-4] if app_lower.endswith(".exe") else app_name return { "search_text": base_name, "display_name": base_name, "vlm_description": f"L'application {base_name} dans les résultats de recherche", } def _generate_setup_actions( app_info: Dict[str, Any], setup_id_prefix: str = "setup", ) -> List[Dict[str, Any]]: """Générer les actions 100% visuelles pour ouvrir l'application avant le replay. Approche entièrement visuelle -- JAMAIS de raccourcis clavier (Win, Win+R, Ctrl+X, etc.) qui n'ont pas été enregistrés par l'utilisateur. Tout passe par des clics visuels résolus par le VLM (Qwen2.5-VL). La séquence est : 1. Clic visuel sur le bouton Démarrer (coin bas-gauche de l'écran) 2. Attendre que le menu Démarrer s'ouvre (1s) 3. Clic visuel sur la barre de recherche du menu Démarrer 4. Attendre que la barre de recherche soit active (500ms) 5. Taper le nom de l'application (texte français, ex: "Bloc-notes") 6. Attendre les résultats de recherche (1.2s) 7. Clic visuel sur le résultat de l'application trouvée 8. Attendre que l'application s'ouvre (2-3s selon le poids) 9. verify_screen : vérifier que la fenêtre attendue est apparue Args: app_info: Dict retourné par _extract_required_apps_from_events ou _extract_required_apps_from_workflow. setup_id_prefix: Préfixe pour les action_id générés. Returns: Liste d'actions normalisées, prêtes à injecter dans la queue. Liste vide si aucune préparation n'est nécessaire. """ if not app_info: return [] launch_cmd = app_info.get("primary_launch_cmd", "") primary_app = app_info.get("primary_app", "") first_title = app_info.get("first_window_title", "") if not launch_cmd: logger.debug( "setup_actions : pas de commande de lancement pour '%s', skip", primary_app, ) return [] # Ne pas lancer les apps système (toujours présentes) if primary_app.lower() in _SETUP_IGNORE_APPS: logger.debug("setup_actions : app '%s' ignorée (système)", primary_app) return [] # Obtenir les informations de recherche visuelle pour cette app visual_info = _get_visual_search_info(primary_app) search_text = visual_info["search_text"] display_name = visual_info["display_name"] vlm_description = visual_info["vlm_description"] actions = [] logger.info( "Génération setup env 100%% visuel : lancement de '%s' via clic " "Démarrer → recherche visuelle '%s' (fenêtre attendue : '%s')", primary_app, search_text, first_title, ) # 1. Clic visuel sur le bouton Démarrer (toujours visible, bas-gauche) # Le VLM résout la position exacte ; x_pct/y_pct sont des fallbacks. actions.append({ "action_id": f"act_{setup_id_prefix}_click_start", "type": "click", "x_pct": 0.02, "y_pct": 0.98, "button": "left", "visual_mode": True, "target_spec": { "by_text": "Démarrer", "by_role": "start_button", "vlm_description": ( "Le bouton Démarrer de Windows (icône Windows), " "en bas à gauche de la barre des tâches" ), }, "_setup_phase": True, "_setup_step": "click_start_menu", }) # 2. Attendre que le menu Démarrer s'ouvre actions.append({ "action_id": f"act_{setup_id_prefix}_wait_start", "type": "wait", "duration_ms": 1000, "_setup_phase": True, "_setup_step": "wait_start_menu", }) # 3. Clic visuel sur la barre de recherche du menu Démarrer actions.append({ "action_id": f"act_{setup_id_prefix}_click_search", "type": "click", "x_pct": 0.20, "y_pct": 0.92, "button": "left", "visual_mode": True, "target_spec": { "by_text": "Rechercher", "by_role": "search_box", "vlm_description": ( "La barre ou le champ de recherche dans le menu Démarrer " "de Windows, souvent intitulé 'Tapez ici pour rechercher' " "ou 'Rechercher'" ), }, "_setup_phase": True, "_setup_step": "click_search_box", }) # 4. Attendre que la barre de recherche soit active et prête actions.append({ "action_id": f"act_{setup_id_prefix}_wait_search_ready", "type": "wait", "duration_ms": 500, "_setup_phase": True, "_setup_step": "wait_search_ready", }) # 5. Taper le nom visuel de l'application (texte français) actions.append({ "action_id": f"act_{setup_id_prefix}_type_search", "type": "type", "text": search_text, "_setup_phase": True, "_setup_step": "type_app_name", }) # 6. Attendre que la recherche Windows trouve l'application actions.append({ "action_id": f"act_{setup_id_prefix}_wait_results", "type": "wait", "duration_ms": 1200, "_setup_phase": True, "_setup_step": "wait_search_results", }) # 7. Clic visuel sur le résultat de l'application dans la liste actions.append({ "action_id": f"act_{setup_id_prefix}_click_result", "type": "click", "x_pct": 0.20, "y_pct": 0.50, "button": "left", "visual_mode": True, "target_spec": { "by_text": display_name, "by_role": "app_icon", "vlm_description": vlm_description, }, "_setup_phase": True, "_setup_step": "click_app_result", }) # 8. Attendre que l'application s'ouvre heavy_apps = {"winword.exe", "excel.exe", "powerpnt.exe", "outlook.exe", "code.exe"} wait_ms = 3000 if primary_app.lower() in heavy_apps else 2000 actions.append({ "action_id": f"act_{setup_id_prefix}_wait_launch", "type": "wait", "duration_ms": wait_ms, "_setup_phase": True, "_setup_step": "wait_app_launch", }) # 9. Vérification visuelle que la fenêtre attendue est apparue if first_title: actions.append({ "action_id": f"act_{setup_id_prefix}_verify", "type": "verify_screen", "expected_node": "setup_initial", "timeout_ms": 5000, "_setup_phase": True, "_setup_step": "verify_app_ready", "_expected_title": first_title, }) logger.info( "Setup env visuel généré : %d actions pour lancer '%s' " "(recherche visuelle : '%s')", len(actions), primary_app, search_text, ) return actions # ========================================================================= # Replay — Fonctions de conversion workflow → actions # ========================================================================= def _find_active_agent_session(session_manager, machine_id: Optional[str] = None) -> Optional[str]: """Trouver la dernière session Agent V1 pour le replay. Stratégie en 2 passes : 1. D'abord chercher une session non-finalisée (Agent V1 actif) 2. Sinon, prendre la plus récente même finalisée Args: session_manager: Instance LiveSessionManager. machine_id: Si fourni, ne chercher que les sessions de cette machine. """ with session_manager._lock: all_agent_sessions = [ s for s in session_manager._sessions.values() if s.session_id.startswith("sess_") and (machine_id is None or s.machine_id == machine_id) ] if not all_agent_sessions: return None # Trier par session_id (contient un timestamp) — plus récent d'abord all_agent_sessions.sort(key=lambda s: s.session_id, reverse=True) # Passe 1 : préférer une session non-finalisée for s in all_agent_sessions: if not s.finalized: return s.session_id # Passe 2 : fallback sur la plus récente (même finalisée) return all_agent_sessions[0].session_id def _workflow_to_actions( workflow, params: Optional[Dict[str, Any]] = None, processor=None, gesture_catalog=None, ) -> List[Dict[str, Any]]: """ Convertir un workflow (nodes + edges ordonnés) en liste d'actions normalisées. Parcourt le graphe depuis les entry_nodes en suivant les edges. Chaque edge produit une action normalisée avec coordonnées en pourcentage. Mode intelligent (workflows appris par Léa) : Si le workflow a des nodes avec des prototype_vectors, utilise le StreamProcessor.extract_enriched_actions() qui enrichit les actions avec les données de la session originale, le ciblage visuel et le pre-check/post-check par embedding CLIP. Mode classique (workflows VWB/manuels) : Parcours BFS classique avec _edge_to_normalized_actions(). """ params = params or {} # Détection d'un workflow appris (a des nodes avec prototype_vectors) # et qui a des edges structurés if _is_learned_workflow(workflow) and processor is not None: # Priorité 1 : replay hybride (événements bruts + structure workflow) hybrid = processor.build_hybrid_replay(workflow) if hybrid: logger.info( "Replay hybride : %d actions depuis events bruts + structure workflow", len(hybrid), ) # Optimisation par gestes clavier si disponible if gesture_catalog and hybrid: hybrid = gesture_catalog.optimize_replay_actions(hybrid) return hybrid # Priorité 2 : enrichissement classique (fallback si hybride échoue) enriched = processor.extract_enriched_actions(workflow, params) if enriched: logger.info( "Replay intelligent : %d actions enrichies depuis le workflow appris", len(enriched), ) if gesture_catalog and enriched: enriched = gesture_catalog.optimize_replay_actions(enriched) return enriched # Si l'enrichissement échoue aussi, fallback sur le mode classique logger.warning( "Enrichissement échoué pour le workflow appris, fallback mode classique" ) # Mode classique (VWB/manuels ou fallback) actions = [] # Construire un index des edges sortants par node outgoing: Dict[str, list] = defaultdict(list) for edge in workflow.edges: outgoing[edge.from_node].append(edge) # Parcours linéaire depuis le premier entry_node visited = set() current_nodes = list(workflow.entry_nodes) if workflow.entry_nodes else [] # Fallback : si pas d'entry_nodes, prendre le premier node if not current_nodes and workflow.nodes: current_nodes = [workflow.nodes[0].node_id] while current_nodes: node_id = current_nodes.pop(0) if node_id in visited: continue visited.add(node_id) edges = outgoing.get(node_id, []) for edge in edges: edge_actions = _edge_to_normalized_actions(edge, params) actions.extend(edge_actions) # Suivre le graphe vers le prochain node if edge.to_node not in visited: current_nodes.append(edge.to_node) # Optimisation : substituer les actions visuelles par des gestes clavier si possible if gesture_catalog and actions: actions = gesture_catalog.optimize_replay_actions(actions) return actions def _is_learned_workflow(workflow) -> bool: """Détecter si un workflow est un workflow appris (vs VWB/manuel). Un workflow appris a : - Des nodes avec _prototype_vector dans metadata - Des edges avec from_node/to_node - Un learning_state indicatif (OBSERVATION, COACHING, AUTO_CANDIDATE, etc.) Un workflow VWB/manuel a généralement : - Des edges avec des target_spec complets (by_text, by_role remplis) - Pas de prototype_vectors """ # Accéder aux données (objet ou dict) if hasattr(workflow, 'nodes'): nodes = workflow.nodes edges = workflow.edges elif isinstance(workflow, dict): nodes = workflow.get('nodes', []) edges = workflow.get('edges', []) else: return False if not nodes or not edges: return False # Vérifier si au moins un node a un prototype_vector has_prototype = False for node in nodes: metadata = node.metadata if hasattr(node, 'metadata') else node.get('metadata', {}) if isinstance(metadata, dict) and '_prototype_vector' in metadata: has_prototype = True break return has_prototype def _edge_to_normalized_actions(edge, params: Dict[str, Any]) -> List[Dict[str, Any]]: """ Convertir un WorkflowEdge en liste d'actions normalisées pour l'Agent V1. Un edge simple produit 1 action, un edge compound produit N actions (une par step). """ action = edge.action if action is None: logger.warning(f"Edge {edge.edge_id} sans action, skip") return [] action_type = action.type target = action.target action_params = action.parameters or {} # Extraire les coordonnées normalisées depuis TargetSpec.by_position x_pct = 0.0 y_pct = 0.0 if target and target.by_position: px, py = target.by_position if px <= 1.0 and py <= 1.0: x_pct = px y_pct = py else: ref_w = action_params.get("ref_width", 1920) or 1920 ref_h = action_params.get("ref_height", 1080) or 1080 x_pct = round(px / ref_w, 6) y_pct = round(py / ref_h, 6) base = {"edge_id": edge.edge_id, "from_node": edge.from_node, "to_node": edge.to_node} # Compound : décomposer en actions individuelles if action_type == "compound": return _expand_compound_steps(action_params.get("steps", []), base, params) # Actions simples normalized = {**base, "action_id": f"act_{uuid.uuid4().hex[:8]}"} if action_type == "mouse_click": normalized["type"] = "click" normalized["x_pct"] = x_pct normalized["y_pct"] = y_pct normalized["button"] = action_params.get("button", "left") elif action_type == "text_input": normalized["type"] = "type" text = action_params.get("text", "") text = _substitute_variables(text, params, action_params.get("defaults", {})) normalized["text"] = text normalized["x_pct"] = x_pct normalized["y_pct"] = y_pct elif action_type == "key_press": normalized["type"] = "key_combo" keys = action_params.get("keys", []) if not keys and action_params.get("key"): keys = [action_params["key"]] normalized["keys"] = keys else: logger.warning(f"Type d'action inconnu : {action_type}") return [] # Ajouter le target_spec complet pour la résolution visuelle target_spec = {} if target and target.by_role: target_spec["by_role"] = target.by_role normalized["target_role"] = target.by_role # Compat debug if target and target.by_text: target_spec["by_text"] = target.by_text normalized["target_text"] = target.by_text # Compat debug if target and hasattr(target, 'context_hints') and target.context_hints: target_spec["context_hints"] = target.context_hints if target_spec: normalized["target_spec"] = target_spec normalized["visual_mode"] = True # Signal à l'agent d'utiliser la résolution visuelle return [normalized] def _substitute_variables(text: str, params: Dict[str, Any], defaults: Dict[str, Any]) -> str: """Substituer les variables ${var} dans un texte. Priorité : params utilisateur > defaults du workflow > texte brut inchangé. Supporte ${var} dans un texte plus long (ex: "${expression}="). """ def replacer(match): var_name = match.group(1) return str(params.get(var_name, defaults.get(var_name, match.group(0)))) return re.sub(r'\$\{(\w+)\}', replacer, text) def _expand_compound_steps( steps: List[Dict[str, Any]], base: Dict[str, Any], params: Dict[str, Any] ) -> List[Dict[str, Any]]: """Décomposer les steps d'un compound en actions individuelles.""" actions = [] for step in steps: step_type = step.get("type", "unknown") action = { **base, "action_id": f"act_{uuid.uuid4().hex[:8]}", } if step_type == "key_press": action["type"] = "key_combo" keys = step.get("keys", []) if not keys and step.get("key"): keys = [step["key"]] action["keys"] = keys elif step_type == "text_input": action["type"] = "type" text = step.get("text", "") text = _substitute_variables(text, params, {}) action["text"] = text elif step_type == "wait": action["type"] = "wait" action["duration_ms"] = step.get("duration_ms", 500) elif step_type == "mouse_click": action["type"] = "click" action["x_pct"] = step.get("x_pct", 0.0) action["y_pct"] = step.get("y_pct", 0.0) action["button"] = step.get("button", "left") else: logger.debug(f"Step compound inconnu : {step_type}") continue actions.append(action) return actions # ========================================================================= # Pre-check écran — Vérification pré-action par embedding CLIP # ========================================================================= def _pre_check_screen_state( session_id: str, expected_node_id: str, current_screenshot_path: str, active_processor, replay_states: Dict[str, Dict[str, Any]], replay_lock: threading.Lock, precheck_threshold: float = 0.85, ) -> Dict[str, Any]: """Vérifier que l'écran actuel correspond à l'état attendu du node. Compare le screenshot actuel avec le prototype du node attendu via similarité d'embedding CLIP (rapide, ~200ms). Args: session_id: ID de la session de replay expected_node_id: ID du node source de l'action (from_node) current_screenshot_path: Chemin du screenshot heartbeat récent active_processor: Instance StreamProcessor avec le CLIPEmbedder chargé replay_states: Dict partagé des états de replay replay_lock: Lock pour l'accès concurrent aux replay_states precheck_threshold: Seuil de similarité cosine Returns: {"match": True/False, "similarity": float, "expected_node": str, "reason": str (si mismatch), "popup_detected": bool} """ result: Dict[str, Any] = { "match": True, "similarity": 1.0, "expected_node": expected_node_id, "popup_detected": False, } try: # 1. Trouver le workflow actif pour cette session replay_state = None workflow = None with replay_lock: for state in replay_states.values(): if state["session_id"] == session_id and state["status"] == "running": replay_state = state break if not replay_state: result["reason"] = "no_active_replay" return result workflow_id = replay_state.get("workflow_id", "") with active_processor._data_lock: workflow = active_processor._workflows.get(workflow_id) if workflow is None: result["reason"] = "workflow_not_found" return result # 2. Récupérer le prototype du node attendu # Supporter à la fois les objets Workflow et les dicts bruts node = None if hasattr(workflow, "get_node"): node = workflow.get_node(expected_node_id) elif isinstance(workflow, dict): # Format dict brut (workflows VWB/manuels) for n in workflow.get("nodes", []): if n.get("node_id") == expected_node_id: node = n break if node is None: result["reason"] = "node_not_found" return result # Extraire le prototype vector metadata = node.metadata if hasattr(node, "metadata") else node.get("metadata", {}) proto_list = metadata.get("_prototype_vector") if not proto_list or not isinstance(proto_list, (list, tuple)): result["reason"] = "no_prototype_vector" return result import numpy as np prototype_vector = np.array(proto_list, dtype=np.float32) # 3. Calculer l'embedding CLIP du screenshot actuel active_processor._ensure_initialized() if active_processor._clip_embedder is None: result["reason"] = "clip_embedder_unavailable" return result from PIL import Image pil_image = Image.open(current_screenshot_path) current_vector = active_processor._clip_embedder.embed_image(pil_image) if current_vector is None or len(current_vector) == 0: result["reason"] = "embedding_failed" return result # 4. Similarité cosine current_vector = current_vector.flatten().astype(np.float32) prototype_vector = prototype_vector.flatten().astype(np.float32) norm_current = np.linalg.norm(current_vector) norm_proto = np.linalg.norm(prototype_vector) if norm_current < 1e-8 or norm_proto < 1e-8: result["reason"] = "zero_norm_vector" result["match"] = False result["similarity"] = 0.0 return result similarity = float( np.dot(current_vector, prototype_vector) / (norm_current * norm_proto) ) result["similarity"] = round(similarity, 4) result["match"] = similarity >= precheck_threshold if not result["match"]: result["reason"] = "screen_mismatch" logger.warning( f"Pre-check MISMATCH pour session={session_id} " f"node={expected_node_id}: similarity={similarity:.4f} " f"< seuil={precheck_threshold}" ) # 5. Détection de popup par changement de titre de fenêtre result["popup_detected"] = _detect_popup_hint( session_id, workflow, expected_node_id, active_processor, ) except Exception as e: # Ne jamais bloquer le replay en cas d'erreur du pre-check logger.error(f"Pre-check échoué (non bloquant): {e}") result["match"] = True # Fallback permissif result["reason"] = f"precheck_error: {e}" return result def _detect_popup_hint( session_id: str, workflow: Any, expected_node_id: str, processor_instance=None, ) -> bool: """Détecter si une popup ou un dialogue modal est probable. Compare le titre de fenêtre actuel (via last_window_info de la session) avec le titre attendu du node dans le workflow. Un changement de titre suggère une popup/dialogue inattendu. Args: session_id: ID de la session workflow: Workflow object ou dict expected_node_id: ID du node attendu processor_instance: StreamProcessor pour accéder aux sessions Returns: True si un changement de titre suggère une popup """ try: if processor_instance is None: return False # Titre actuel depuis la session session = processor_instance.session_manager.get_session(session_id) if not session: return False current_title = session.last_window_info.get("title", "").strip().lower() if not current_title or current_title == "unknown": return False # Titre attendu depuis le node du workflow expected_title = "" if hasattr(workflow, "get_node"): node = workflow.get_node(expected_node_id) if node and hasattr(node, "template") and hasattr(node.template, "window"): window_spec = node.template.window if hasattr(window_spec, "title_contains") and window_spec.title_contains: expected_title = window_spec.title_contains.strip().lower() elif isinstance(workflow, dict): for n in workflow.get("nodes", []): if n.get("node_id") == expected_node_id: template = n.get("template", {}) window = template.get("window", {}) expected_title = (window.get("title_contains") or "").strip().lower() break if not expected_title: return False # Si le titre actuel ne contient plus le titre attendu, popup probable if expected_title not in current_title: logger.info( f"Popup détectée: titre actuel='{current_title}' " f"ne contient pas '{expected_title}'" ) return True except Exception as e: logger.debug(f"Détection popup échouée: {e}") return False # ========================================================================= # Replay — État et retry # ========================================================================= def _create_replay_state( replay_id: str, workflow_id: str, session_id: str, total_actions: int, params: Optional[Dict[str, Any]] = None, machine_id: Optional[str] = None, actions: Optional[List[Dict[str, Any]]] = None, ) -> Dict[str, Any]: """Créer un état de replay enrichi avec les champs de suivi d'erreur. Args: actions: Liste des actions du replay. Une copie slim (sans anchors base64) est stockée pour permettre à `/replay/result` de retrouver le `target_spec` de l'action courante — nécessaire pour l'apprentissage mémoire (Phase 1 plan Léa). """ # Copie slim des actions : on strip les anchor_image_base64 pour ne # pas gonfler la mémoire (anchors peuvent faire 50-200 KB chacun). # On conserve les champs utilisés par : # - la Phase 1 apprentissage (target_spec pour memory_record_success) # - le contrôle strict (success_strict, expected_window_*) # - les logs/audit (intention, action_id, type, coords) actions_slim: List[Dict[str, Any]] = [] if actions: for a in actions: a_copy = { "action_id": a.get("action_id"), "type": a.get("type"), "x_pct": a.get("x_pct"), "y_pct": a.get("y_pct"), # Contrôle strict des étapes (Dom, matin 10 avril 2026) "success_strict": a.get("success_strict", False), "expected_window_before": a.get("expected_window_before", ""), "expected_window_title": a.get("expected_window_title", ""), # Contexte métier utile pour logs et apprentissage "intention": a.get("intention", ""), } ts = a.get("target_spec") if isinstance(ts, dict): a_copy["target_spec"] = { k: v for k, v in ts.items() if k not in ("anchor_image_base64",) } actions_slim.append(a_copy) return { "replay_id": replay_id, "workflow_id": workflow_id, "session_id": session_id, "machine_id": machine_id or "default", # Machine cible du replay "status": "running", "total_actions": total_actions, "completed_actions": 0, "failed_actions": 0, "current_action_index": 0, "params": params or {}, "results": [], # Historique des résultats action par action "actions": actions_slim, # Copie slim pour lookup par index (Phase 1 mémoire) # Champs enrichis pour le suivi d'erreur (#7) "retried_actions": 0, "unverified_actions": 0, "error_log": [], # Liste des erreurs rencontrées "last_screenshot": None, # Path du dernier screenshot reçu "_last_screenshot_before": None, # Interne: screenshot avant la dernière action # Champs pour pause supervisée (target_not_found) "failed_action": None, # Contexte de l'action en echec (quand paused_need_help) "pause_message": None, # Message a afficher a l'utilisateur } def _schedule_retry( session_id: str, replay_state: Dict[str, Any], action: Dict[str, Any], current_retry: int, reason: str, replay_queues: Dict[str, List[Dict[str, Any]]], retry_pending: Dict[str, Dict[str, Any]], max_retries: int = 3, ): """ Programmer un retry pour une action échouée. Stratégie : - Retry 1 : réinjecter l'action directement (re-résolution visuelle par l'agent) - Retry 2 : injecter un wait de 2s avant l'action (possible loading en cours) - Retry 3 : dernier essai direct L'action est réinsérée en tête de la queue pour être la prochaine exécutée. Le lock de replay doit être acquis par l'appelant. """ next_retry = current_retry + 1 replay_state["retried_actions"] += 1 # Créer une copie de l'action avec un nouveau action_id pour le tracking retry_action = dict(action) retry_action_id = f"{action.get('action_id', 'unknown')}_retry{next_retry}" retry_action["action_id"] = retry_action_id # Stocker l'info de retry pour le prochain report_action_result retry_pending[retry_action_id] = { "action": action, "retry_count": next_retry, "replay_id": replay_state["replay_id"], "reason": reason, } # Stratégie de retry selon le numéro actions_to_insert = [] if next_retry == 2: # Retry 2 : injecter un wait de 2s avant l'action wait_action = { "action_id": f"wait_retry_{uuid.uuid4().hex[:6]}", "type": "wait", "duration_ms": 2000, } actions_to_insert.append(wait_action) actions_to_insert.append(retry_action) # Insérer en tête de la queue (prochaine action à exécuter) queue = replay_queues.get(session_id, []) replay_queues[session_id] = actions_to_insert + queue logger.info( f"Retry {next_retry}/{max_retries} programmé pour {action.get('action_id')} " f"(raison: {reason}) | nouveau id: {retry_action_id}" ) def _notify_error_callback( replay_state: Dict[str, Any], action_id: str, error: Optional[str], error_callbacks: Dict[str, str], ): """ Notifier le callback d'erreur si configuré pour ce replay. Appel HTTP POST non-bloquant vers l'URL de callback. En cas d'échec de notification, on log mais on ne bloque pas. """ replay_id = replay_state["replay_id"] callback_url = error_callbacks.get(replay_id) if not callback_url: return def _send_callback(): try: import urllib.request payload = json.dumps({ "replay_id": replay_id, "workflow_id": replay_state.get("workflow_id"), "session_id": replay_state.get("session_id"), "action_id": action_id, "error": error or "Erreur inconnue", "retried_actions": replay_state.get("retried_actions", 0), "error_log": replay_state.get("error_log", []), "status": replay_state.get("status"), }).encode("utf-8") req = urllib.request.Request( callback_url, data=payload, headers={"Content-Type": "application/json"}, method="POST", ) with urllib.request.urlopen(req, timeout=5) as resp: logger.info( f"Error callback envoyé à {callback_url}: {resp.status}" ) except Exception as e: logger.warning( f"Échec envoi error callback à {callback_url}: {e}" ) # Envoyer en arrière-plan pour ne pas bloquer threading.Thread(target=_send_callback, daemon=True).start()