"""
workflow_replay.py — Bridge between the WorkflowRunner and the Agent V1 replay.

Converts an enriched Workflow (with CLIP embeddings + FAISS) into replay
actions for Agent V1, with a FAISS verification at each step.

Architecture:
    Workflow (nodes + edges + embeddings)
    → for each edge: action + embedding of the source node
    → FAISS checks that the current screen matches the expected node
    → if OK: execute the action normally
    → if MISMATCH: stop or adapt

Author: Dom + Claude
Date: April 5, 2026
"""

import base64
import json
import logging
import uuid
from pathlib import Path
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


def build_workflow_replay(
    workflow_path: str,
    session_dir: str,
    faiss_manager=None,
    clip_embedder=None,
) -> List[Dict[str, Any]]:
    """Convert an enriched Workflow into replay actions with FAISS verification.

    Each click action is enriched with:
    - The expected window title of the source node (for replay-time checks)
    - A target spec (OCR text / role) for grounding
    - The crop anchor image, base64-encoded, when available on disk

    Args:
        workflow_path: Path to the workflow JSON file.
        session_dir: Session directory (for screenshots/crops).
        faiss_manager: Pre-loaded FAISSManager (optional, created if None).
            NOTE(review): currently unused in this function; kept for
            interface compatibility — confirm against callers.
        clip_embedder: Pre-loaded CLIPEmbedder (optional, created if None).
            NOTE(review): currently unused — see above.

    Returns:
        List of action dicts ready for the Agent V1 replay queue. Empty
        list when the workflow has no nodes or no edges.
    """
    with open(workflow_path, encoding="utf-8") as f:
        wf_data = json.load(f)

    nodes = {n["node_id"]: n for n in wf_data.get("nodes", [])}
    edges = wf_data.get("edges", [])
    entry_nodes = wf_data.get("entry_nodes", [])

    if not nodes or not edges:
        logger.warning("Workflow vide : %d nodes, %d edges", len(nodes), len(edges))
        return []

    logger.info(
        "Workflow '%s' chargé : %d nodes, %d edges",
        wf_data.get("name", "?"), len(nodes), len(edges),
    )

    # Build the action sequence from the graph: a linear walk that follows
    # the first outgoing edge of each node until a node is revisited (cycle
    # guard) or has no successor.
    actions: List[Dict[str, Any]] = []
    visited = set()
    current_node_id = entry_nodes[0] if entry_nodes else next(iter(nodes))

    while current_node_id and current_node_id not in visited:
        visited.add(current_node_id)
        node = nodes.get(current_node_id)
        if not node:
            break

        # Only the first outgoing edge is replayed (linear workflow).
        outgoing = [e for e in edges if e.get("from_node") == current_node_id]
        if not outgoing:
            break
        edge = outgoing[0]

        action_data = edge.get("action", {})
        next_node_id = edge.get("to_node")

        # Source-node info used for replay-time verification.
        node_title = node.get("metadata", {}).get("window_title", "")

        action_type = action_data.get("type", "unknown")
        target = action_data.get("target", {})
        params = action_data.get("parameters", {})

        if action_type == "compound":
            # Compound actions: decompose into individual replay steps.
            for step in params.get("steps", []):
                actions.append(
                    _step_to_action(step, current_node_id, node_title,
                                    target, session_dir)
                )
        else:
            # BUG FIX: non-compound actions used to be dropped silently,
            # losing workflow steps without any trace. The replay queue
            # only understands decomposed steps, so the skip stays, but
            # it is now logged.
            logger.warning(
                "Skipping non-compound action '%s' on edge %s -> %s",
                action_type, current_node_id, next_node_id,
            )

        current_node_id = next_node_id

    # Post-verification: a click is expected to lead to the window of the
    # NEXT click's source node, so propagate that title backwards onto the
    # click that should produce it.
    click_indices = [i for i, a in enumerate(actions) if a.get("type") == "click"]
    for j, ci in enumerate(click_indices):
        if j + 1 < len(click_indices):
            next_ci = click_indices[j + 1]
            next_title = actions[next_ci].get("expected_window_title", "")
            if next_title:
                actions[ci]["expected_window_title"] = next_title

    logger.info("Workflow → %d actions de replay", len(actions))
    return actions


def _step_to_action(
    step: Dict[str, Any],
    node_id: str,
    node_title: str,
    target: Dict[str, Any],
    session_dir: str,
) -> Dict[str, Any]:
    """Convert one compound-action step into a single replay action dict.

    Args:
        step: One step from the compound action's ``parameters.steps``.
        node_id: Workflow node the step originates from.
        node_title: Window title of the source node.
        target: Target spec of the edge action (OCR text / role).
        session_dir: Session directory used to locate crop anchors.

    Returns:
        A replay action dict; steps of unknown type get only the base
        fields (id, mapped type, node, expected title).
    """
    step_type = step.get("type", "unknown")
    step_action: Dict[str, Any] = {
        "action_id": f"wf_{uuid.uuid4().hex[:8]}",
        "type": _map_action_type(step_type),
        "workflow_node": node_id,
        "expected_window_title": node_title,
    }

    if step_type == "mouse_click":
        step_action["x_pct"] = step.get("x_pct", 0)
        step_action["y_pct"] = step.get("y_pct", 0)
        step_action["button"] = step.get("button", "left")
        step_action["visual_mode"] = True
        # Target spec consumed by the grounding logic at replay time.
        step_action["target_spec"] = {
            "by_text": target.get("by_text", ""),
            "by_role": target.get("by_role", ""),
            "by_text_source": "ocr" if target.get("by_text") else "",
            "window_title": node_title,
            "original_position": {
                "y_relative": "",
                "x_relative": "",
            },
        }
        # Attach the crop anchor if one is available on disk.
        _attach_anchor(step_action, step, session_dir)

    elif step_type == "text_input":
        step_action["type"] = "type"
        step_action["text"] = step.get("text", "")

    elif step_type == "key_press":
        step_action["type"] = "key_combo"
        step_action["keys"] = step.get("keys", [])

    elif step_type == "wait":
        step_action["type"] = "wait"
        step_action["duration_ms"] = step.get("duration_ms", 500)

    return step_action


def _map_action_type(step_type: str) -> str:
    """Map workflow step types to Agent V1 replay action types.

    Unknown types pass through unchanged.
    """
    mapping = {
        "mouse_click": "click",
        "text_input": "type",
        "key_press": "key_combo",
        "wait": "wait",
        "scroll": "scroll",
    }
    return mapping.get(step_type, step_type)


def _attach_anchor(action: Dict[str, Any], step: Dict[str, Any], session_dir: str) -> None:
    """Attach the base64-encoded crop anchor to the action's target_spec.

    Looks for ``<session_dir>/shots/<screenshot_id>_crop.png``. Does nothing
    when the step has no ``screenshot_id``, no session_dir was given, or the
    crop file does not exist. Mutates ``action`` in place (expects
    ``action["target_spec"]`` to exist, as set by the mouse_click branch).
    """
    screenshot_id = step.get("screenshot_id", "")
    if screenshot_id and session_dir:
        crop_path = Path(session_dir) / "shots" / f"{screenshot_id}_crop.png"
        if crop_path.is_file():
            action["target_spec"]["anchor_image_base64"] = base64.b64encode(
                crop_path.read_bytes()
            ).decode()