feat: branchement workflow — actions magnétoscope enrichies avec CLIP
Approche hybride : - Actions du magnétoscope (by_text, target_spec, grounding) - Embeddings CLIP du workflow (512D par screenshot de clic) - Au replay : CLIP vérifie l'état de l'écran AVANT chaque clic Pipeline complet mesuré : - ScreenAnalyzer (OCR) : 1.05s/screenshot - CLIP embeddings : 0.093s/screenshot - FAISS : <0.01s pour 13 vecteurs - GraphBuilder : 0.7s (13 nodes, 12 edges) - Total : 15.7s pour 1.5 min de session - Extrapolation 1h : ~10 min Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
185
agent_v0/server_v1/workflow_replay.py
Normal file
185
agent_v0/server_v1/workflow_replay.py
Normal file
@@ -0,0 +1,185 @@
|
||||
"""
|
||||
workflow_replay.py — Pont entre le WorkflowRunner et le replay Agent V1.
|
||||
|
||||
Convertit un Workflow enrichi (avec embeddings CLIP + FAISS) en actions
|
||||
de replay pour l'Agent V1, avec vérification FAISS à chaque étape.
|
||||
|
||||
Architecture :
|
||||
Workflow (nodes + edges + embeddings)
|
||||
→ pour chaque edge : action + embedding du node source
|
||||
→ FAISS vérifie que l'écran actuel correspond au node attendu
|
||||
→ si OK : exécuter l'action normalement
|
||||
→ si MISMATCH : stopper ou adapter
|
||||
|
||||
Auteur : Dom + Claude
|
||||
Date : 5 avril 2026
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import numpy as np
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def build_workflow_replay(
    workflow_path: str,
    session_dir: str,
    faiss_manager=None,
    clip_embedder=None,
) -> List[Dict[str, Any]]:
    """Convert a workflow JSON file into a flat list of Agent V1 replay actions.

    The graph is walked linearly: starting at the first entry node (or the
    first declared node when ``entry_nodes`` is empty), the first outgoing
    edge of each node is followed until a node has no outgoing edge, an
    unknown node is reached, or a node would be visited a second time.

    Only edges whose action type is ``"compound"`` produce replay actions;
    each entry of their ``parameters.steps`` becomes one action. Edges that
    carry any other action type are skipped with a warning.

    Every action initially carries the source node's window title as
    ``expected_window_title``. Afterwards, each click except the last one has
    that field replaced by the (non-empty) title recorded for the following
    click, so that it can be used for post-click verification.

    Args:
        workflow_path: Path to the workflow JSON file.
        session_dir: Session directory, forwarded to ``_attach_anchor`` to
            locate the crop image of each click.
        faiss_manager: Accepted for interface compatibility; not used by
            this function.
        clip_embedder: Accepted for interface compatibility; not used by
            this function.

    Returns:
        The replay actions in execution order, or an empty list when the
        workflow has no nodes or no edges.
    """
    import json

    # JSON is UTF-8 by specification; do not depend on the platform locale
    # (window titles and OCR texts routinely contain non-ASCII characters).
    with open(workflow_path, encoding="utf-8") as f:
        wf_data = json.load(f)

    # ``or []`` also covers keys that are present but set to JSON null.
    nodes = {n["node_id"]: n for n in wf_data.get("nodes") or []}
    edges = wf_data.get("edges") or []
    entry_nodes = wf_data.get("entry_nodes") or []

    if not nodes or not edges:
        logger.warning("Workflow vide : %d nodes, %d edges", len(nodes), len(edges))
        return []

    logger.info(
        "Workflow '%s' chargé : %d nodes, %d edges",
        wf_data.get("name", "?"), len(nodes), len(edges),
    )

    # Index the first outgoing edge of every node once, instead of scanning
    # the whole edge list again for each visited node.
    first_edge_from: Dict[Any, Dict[str, Any]] = {}
    for edge in edges:
        first_edge_from.setdefault(edge.get("from_node"), edge)

    actions: List[Dict[str, Any]] = []
    visited = set()
    current_node_id = entry_nodes[0] if entry_nodes else next(iter(nodes))

    # ``visited`` stops the walk if the workflow graph contains a cycle.
    while current_node_id and current_node_id not in visited:
        visited.add(current_node_id)
        node = nodes.get(current_node_id)
        if not node:
            break

        edge = first_edge_from.get(current_node_id)
        if edge is None:
            break

        action_data = edge.get("action") or {}
        next_node_id = edge.get("to_node")

        # The source node's window title is what the screen should show
        # before the action is executed.
        node_title = (node.get("metadata") or {}).get("window_title", "")

        action_type = action_data.get("type", "unknown")
        target = action_data.get("target") or {}
        params = action_data.get("parameters") or {}

        if action_type == "compound":
            for step in params.get("steps") or []:
                actions.append(
                    _build_step_action(
                        step, target, current_node_id, node_title, session_dir
                    )
                )
        else:
            # Make the gap visible instead of silently dropping the edge.
            logger.warning(
                "Edge %s → %s ignoré : type d'action '%s' non géré "
                "(seul 'compound' est converti)",
                current_node_id, next_node_id, action_type,
            )

        current_node_id = next_node_id

    # Post-verification: a click is expected to lead to the window in which
    # the next click happens. The last click keeps its source-node title.
    clicks = [a for a in actions if a.get("type") == "click"]
    for click, following in zip(clicks, clicks[1:]):
        following_title = following.get("expected_window_title", "")
        if following_title:
            click["expected_window_title"] = following_title

    logger.info("Workflow → %d actions de replay", len(actions))
    return actions


def _build_step_action(
    step: Dict[str, Any],
    target: Dict[str, Any],
    node_id: Any,
    node_title: str,
    session_dir: str,
) -> Dict[str, Any]:
    """Build the replay action for one step of a compound workflow action."""
    import uuid

    step_type = step.get("type", "unknown")
    step_action: Dict[str, Any] = {
        "action_id": f"wf_{uuid.uuid4().hex[:8]}",
        # _map_action_type already yields the replay type for every branch
        # below, so the branches only add their type-specific fields.
        "type": _map_action_type(step_type),
        "workflow_node": node_id,
        "expected_window_title": node_title,
    }

    if step_type == "mouse_click":
        step_action["x_pct"] = step.get("x_pct", 0)
        step_action["y_pct"] = step.get("y_pct", 0)
        step_action["button"] = step.get("button", "left")
        step_action["visual_mode"] = True
        # Target spec used for grounding at replay time.
        step_action["target_spec"] = {
            "by_text": target.get("by_text", ""),
            "by_role": target.get("by_role", ""),
            "by_text_source": "ocr" if target.get("by_text") else "",
            "window_title": node_title,
            "original_position": {
                "y_relative": "",
                "x_relative": "",
            },
        }
        # Attach the crop anchor image when one is available.
        _attach_anchor(step_action, step, session_dir)
    elif step_type == "text_input":
        step_action["text"] = step.get("text", "")
    elif step_type == "key_press":
        step_action["keys"] = step.get("keys", [])
    elif step_type == "wait":
        step_action["duration_ms"] = step.get("duration_ms", 500)

    return step_action
|
||||
|
||||
|
||||
def _map_action_type(step_type: str) -> str:
    """Translate a workflow step type into its replay action type.

    Step types without a known replay equivalent are returned unchanged.
    """
    replay_types = dict(
        mouse_click="click",
        text_input="type",
        key_press="key_combo",
        wait="wait",
        scroll="scroll",
    )
    try:
        return replay_types[step_type]
    except KeyError:
        # No translation known: pass the workflow type through as-is.
        return step_type
|
||||
|
||||
|
||||
def _attach_anchor(action: dict, step: dict, session_dir: str) -> None:
    """Embed the step's crop image into ``action["target_spec"]`` if it exists.

    The crop is looked up at ``<session_dir>/shots/<screenshot_id>_crop.png``
    and stored base64-encoded under the ``anchor_image_base64`` key. The
    action is left untouched when the step has no screenshot id, when no
    session directory is given, or when the crop file does not exist.
    """
    import base64

    shot_id = step.get("screenshot_id", "")
    if not (shot_id and session_dir):
        return

    crop_file = Path(session_dir).joinpath("shots", f"{shot_id}_crop.png")
    if not crop_file.is_file():
        return

    encoded_crop = base64.b64encode(crop_file.read_bytes())
    action["target_spec"]["anchor_image_base64"] = encoded_crop.decode()
|
||||
Reference in New Issue
Block a user