feat: unification VWB ↔ Léa — import/export bidirectionnel

- Workflows appris par Léa visibles dans le VWB ("Appris par Léa")
- Bouton "Importer" pour éditer un workflow appris
- Bouton "Exporter pour Léa" pour rendre un workflow VWB exécutable
- Conversion bidirectionnelle core ↔ VWB via learned_workflow_bridge
- Liste unifiée dans le chat Léa (merged + dédupliquée)
- reload_workflows() sur le streaming server (pas de redémarrage)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-03-18 22:41:34 +01:00
parent aa39af327f
commit 5973058f08
10 changed files with 1407 additions and 6 deletions

View File

@@ -0,0 +1,599 @@
"""
Pont entre les workflows appris par Léa (core Workflow JSON) et les workflows VWB (SQLite).
Deux directions :
1. Import : core Workflow → VWB Steps (pour review/édition humaine)
2. Export : VWB Steps → core Workflow JSON (pour exécution par Léa/streaming server)
Le format unifié est le VWB SQLite ; les workflows core JSON servent de format d'échange
avec le streaming server.
Auteur : Dom, Claude — 18 mars 2026
"""
import json
import logging
import os
import sys
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# Chemin racine du projet pour les imports core
_ROOT = str(Path(__file__).resolve().parent.parent.parent.parent)
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
# ---------------------------------------------------------------------------
# Mapping action core → VWB
# ---------------------------------------------------------------------------
# Action types dans les edges du core Workflow → action_type VWB
CORE_ACTION_TO_VWB = {
"mouse_click": "click_anchor",
"text_input": "type_text",
"key_press": "keyboard_shortcut",
"compound": "click_anchor", # Sera décomposé en sous-étapes
"wait": "wait_for_anchor",
"scroll": "scroll_to_anchor",
"unknown": "click_anchor",
}
# action_type VWB → action type core
VWB_ACTION_TO_CORE = {
"click_anchor": "mouse_click",
"double_click_anchor": "mouse_click",
"right_click_anchor": "mouse_click",
"type_text": "text_input",
"type_secret": "text_input",
"keyboard_shortcut": "key_press",
"hotkey": "key_press",
"wait_for_anchor": "wait",
"scroll_to_anchor": "scroll",
"visual_condition": "evaluate_condition",
"screenshot_evidence": "screenshot",
"extract_text": "extract_data",
}
# ---------------------------------------------------------------------------
# Import : core Workflow JSON → VWB steps data
# ---------------------------------------------------------------------------
def convert_learned_to_vwb_steps(
workflow_dict: Dict[str, Any],
) -> Tuple[Dict[str, Any], List[Dict[str, Any]], List[str]]:
"""
Convertit un workflow appris (format core JSON) en données VWB.
Le workflow core stocke les actions dans les EDGES (transition entre nœuds),
tandis que le VWB stocke les actions dans les STEPS (séquence linéaire).
On parcourt le graphe depuis les entry_nodes en suivant les edges,
et on crée un Step VWB par edge.
Args:
workflow_dict: Le dict JSON du workflow core (tel que sauvegardé sur disque)
Returns:
Tuple (workflow_meta, steps_list, warnings)
- workflow_meta: dict avec name, description, tags, source
- steps_list: liste de dicts compatibles VWB Step
- warnings: liste de messages d'avertissement
"""
warnings = []
# Extraire les métadonnées
workflow_meta = {
"name": workflow_dict.get("name", "Workflow importé"),
"description": workflow_dict.get("description", ""),
"tags": (workflow_dict.get("metadata") or {}).get("tags", []),
"source": "learned_import",
"core_workflow_id": workflow_dict.get("workflow_id", ""),
"learning_state": workflow_dict.get("learning_state", "OBSERVATION"),
}
nodes = workflow_dict.get("nodes", [])
edges = workflow_dict.get("edges", [])
entry_nodes = workflow_dict.get("entry_nodes", [])
if not edges and not nodes:
warnings.append("Le workflow ne contient ni nœuds ni edges")
return workflow_meta, [], warnings
# Index des edges sortants par node
outgoing = {}
for edge in edges:
from_node = edge.get("from_node") or edge.get("source_node", "")
outgoing.setdefault(from_node, []).append(edge)
# Index des nodes par ID
nodes_by_id = {n["node_id"]: n for n in nodes}
# Parcours linéaire du graphe (BFS)
visited = set()
queue = list(entry_nodes) if entry_nodes else []
if not queue and nodes:
queue = [nodes[0]["node_id"]]
ordered_edges = []
while queue:
node_id = queue.pop(0)
if node_id in visited:
continue
visited.add(node_id)
for edge in outgoing.get(node_id, []):
ordered_edges.append(edge)
to_node = edge.get("to_node") or edge.get("target_node", "")
if to_node and to_node not in visited:
queue.append(to_node)
# Convertir chaque edge en Step VWB
steps = []
for idx, edge in enumerate(ordered_edges):
action = edge.get("action", {})
action_type = action.get("type", "unknown")
action_params = action.get("parameters", {})
target = action.get("target", {})
# Déterminer le type VWB
vwb_action_type = CORE_ACTION_TO_VWB.get(action_type, "click_anchor")
# Construire les paramètres VWB
vwb_params = {}
if action_type == "mouse_click":
# Extraire la position en pourcentage si disponible
by_position = target.get("by_position")
if by_position:
vwb_params["x_pct"] = by_position[0] if isinstance(by_position, list) else 0
vwb_params["y_pct"] = by_position[1] if isinstance(by_position, list) else 0
button = action_params.get("button", "left")
if button == "double":
vwb_action_type = "double_click_anchor"
elif button == "right":
vwb_action_type = "right_click_anchor"
elif action_type == "text_input":
vwb_params["text"] = action_params.get("text", "")
elif action_type == "key_press":
keys = action_params.get("keys", [])
if not keys and action_params.get("key"):
keys = [action_params["key"]]
vwb_params["keys"] = keys
elif action_type == "compound":
# Stocker les sous-étapes dans les paramètres pour référence
vwb_params["compound_steps"] = action_params.get("steps", [])
warnings.append(
f"Étape {idx + 1} : action compound décomposée — vérifier manuellement"
)
# Ajouter des infos de ciblage pour la review humaine
if target.get("by_role"):
vwb_params["target_role"] = target["by_role"]
if target.get("by_text"):
vwb_params["target_text"] = target["by_text"]
# Construire le label
from_node = edge.get("from_node", "")
to_node = edge.get("to_node") or edge.get("target_node", "")
from_name = nodes_by_id.get(from_node, {}).get("name", from_node)
to_name = nodes_by_id.get(to_node, {}).get("name", to_node)
label = _build_step_label(vwb_action_type, vwb_params, from_name, to_name)
step = {
"action_type": vwb_action_type,
"order": idx,
"position_x": 400,
"position_y": 80 + idx * 120,
"parameters": vwb_params,
"label": label,
# Métadonnées d'origine pour traçabilité
"metadata": {
"core_edge_id": edge.get("edge_id", ""),
"core_from_node": from_node,
"core_to_node": to_node,
},
}
steps.append(step)
if not steps and nodes:
# Pas d'edges mais des nodes → créer des étapes basiques depuis les nodes
warnings.append("Aucun edge trouvé — création d'étapes depuis les nœuds")
for idx, node in enumerate(nodes):
node_name = node.get("name", node.get("node_id", f"node_{idx}"))
steps.append({
"action_type": "click_anchor",
"order": idx,
"position_x": 400,
"position_y": 80 + idx * 120,
"parameters": {
"window_title": (node.get("template", {}).get("window", {}) or {}).get("title_pattern", ""),
},
"label": f"Écran : {node_name}",
"metadata": {"core_node_id": node.get("node_id", "")},
})
return workflow_meta, steps, warnings
def _build_step_label(
action_type: str, params: Dict[str, Any], from_name: str, to_name: str
) -> str:
"""Construire un label lisible pour un step VWB."""
if action_type == "type_text":
text = params.get("text", "")
if text:
return f"Saisir : {text[:40]}{'...' if len(text) > 40 else ''}"
return f"Saisir du texte ({from_name}{to_name})"
if action_type == "keyboard_shortcut":
keys = params.get("keys", [])
if keys:
return f"Raccourci : {'+'.join(keys)}"
return f"Raccourci clavier ({from_name}{to_name})"
if action_type in ("click_anchor", "double_click_anchor", "right_click_anchor"):
role = params.get("target_role", "")
text = params.get("target_text", "")
hint = role or text or ""
prefix = {
"click_anchor": "Clic",
"double_click_anchor": "Double-clic",
"right_click_anchor": "Clic droit",
}.get(action_type, "Clic")
if hint:
return f"{prefix} : {hint}"
return f"{prefix} ({from_name}{to_name})"
if action_type == "wait_for_anchor":
return f"Attente ({from_name}{to_name})"
return f"{action_type} ({from_name}{to_name})"
# ---------------------------------------------------------------------------
# Export : VWB Steps → core Workflow JSON
# ---------------------------------------------------------------------------
def convert_vwb_to_core_workflow(
workflow_data: Dict[str, Any],
steps_data: List[Dict[str, Any]],
) -> Dict[str, Any]:
"""
Convertit un workflow VWB (méta + steps) en format core Workflow JSON.
Le résultat peut être sauvegardé dans data/training/workflows/ pour
être chargé par le streaming server.
Args:
workflow_data: dict du workflow VWB (id, name, description, tags)
steps_data: liste de dicts des steps VWB (action_type, parameters, etc.)
Returns:
Dict au format core Workflow (compatible Workflow.from_dict / save_to_file)
"""
now = datetime.now().isoformat()
wf_id = workflow_data.get("id", f"wf_{uuid.uuid4().hex[:12]}")
# Créer les nodes : un node par étape (chaque étape = un état écran)
nodes = []
edges = []
for idx, step in enumerate(steps_data):
node_id = f"node_{idx:03d}"
action_type = step.get("action_type", "click_anchor")
params = step.get("parameters", {})
label = step.get("label", action_type)
# Créer le node (template minimal)
node = {
"node_id": node_id,
"name": label,
"description": f"Étape {idx + 1} : {label}",
"template": {
"window": {
"title_pattern": params.get("window_title"),
"title_contains": params.get("window_title"),
"process_name": None,
},
"text": {
"required_texts": params.get("text_patterns", []),
"forbidden_texts": [],
},
"ui": {
"required_roles": [],
"required_types": [],
"min_element_count": 0,
},
"embedding": {
"provider": "none",
"vector_id": "",
"min_cosine_similarity": 0.85,
"sample_count": 0,
},
},
"is_entry": idx == 0,
"is_end": idx == len(steps_data) - 1,
"variants": [],
"primary_variant_id": None,
"max_variants": 5,
"quality_score": 0.0,
"cluster_metrics": {},
"spatial_relations": [],
"container_type": None,
"metadata": {
"vwb_step_id": step.get("id", ""),
"visual_type": _action_type_to_visual(action_type),
},
}
nodes.append(node)
# Créer l'edge vers le node suivant (sauf pour le dernier)
if idx < len(steps_data) - 1:
next_node_id = f"node_{idx + 1:03d}"
# Convertir l'action VWB → action core
core_action_type = VWB_ACTION_TO_CORE.get(action_type, "mouse_click")
core_params = _vwb_params_to_core(action_type, params)
target_spec = _vwb_params_to_target_spec(action_type, params)
edge = {
"edge_id": f"{node_id}_to_{next_node_id}",
"from_node": node_id,
"to_node": next_node_id,
"action": {
"type": core_action_type,
"target": target_spec,
"parameters": core_params,
},
"constraints": {
"pre_conditions": {},
"required_confidence": 0.8,
"max_wait_time_ms": 5000,
},
"post_conditions": {
"success_mode": "all",
"timeout_ms": 3000,
"poll_ms": 200,
"success": [],
"fail_fast": [],
"retries": 2,
"backoff_ms": 150,
"expected_node": next_node_id,
"window_change_expected": False,
"new_ui_elements_expected": [],
},
"stats": {
"execution_count": 0,
"success_count": 0,
"failure_count": 0,
"avg_duration_ms": 0.0,
},
"metadata": {
"created_from_vwb": True,
"vwb_workflow_id": wf_id,
},
}
edges.append(edge)
# Construire le workflow complet
core_workflow = {
"workflow_id": wf_id,
"name": workflow_data.get("name", "Workflow exporté"),
"description": workflow_data.get("description", ""),
"version": 1,
"learning_state": "COACHING", # Exporté depuis VWB = validé par l'humain
"created_at": workflow_data.get("created_at", now),
"updated_at": now,
"entry_nodes": [nodes[0]["node_id"]] if nodes else [],
"end_nodes": [nodes[-1]["node_id"]] if nodes else [],
"nodes": nodes,
"edges": edges,
"safety_rules": {
"require_confirmation_for": [],
"forbidden_windows": [],
"execution_timeout_minutes": 0,
},
"stats": {
"total_executions": 0,
"success_count": 0,
"failure_count": 0,
"observed_runs": 0,
"assist_runs": 0,
"auto_candidate_runs": 0,
"auto_confirmed_runs": 0,
},
"learning": {
"observation_count": 0,
"confidence_threshold": 0.8,
"promotion_rules": {},
},
"metadata": {
"exported_from_vwb": True,
"vwb_workflow_id": wf_id,
"tags": workflow_data.get("tags", []),
},
"loops": {},
"conditionals": {},
"references": [],
}
return core_workflow
def _action_type_to_visual(action_type: str) -> str:
"""Convertit un action_type VWB en type visuel."""
mapping = {
"click_anchor": "click",
"double_click_anchor": "click",
"right_click_anchor": "click",
"type_text": "type",
"type_secret": "type",
"keyboard_shortcut": "validate",
"hotkey": "validate",
"wait_for_anchor": "wait",
"scroll_to_anchor": "scroll",
"visual_condition": "condition",
"screenshot_evidence": "screenshot",
}
return mapping.get(action_type, "click")
def _vwb_params_to_core(action_type: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""Convertit les paramètres VWB en paramètres core."""
core_params = {}
if action_type in ("type_text", "type_secret"):
core_params["text"] = params.get("text", "")
elif action_type in ("keyboard_shortcut", "hotkey"):
core_params["keys"] = params.get("keys", [])
elif action_type in ("click_anchor", "double_click_anchor", "right_click_anchor"):
button = "left"
if action_type == "double_click_anchor":
button = "double"
elif action_type == "right_click_anchor":
button = "right"
core_params["button"] = button
elif action_type == "wait_for_anchor":
core_params["duration_ms"] = params.get("duration_ms", 2000)
return core_params
def _vwb_params_to_target_spec(action_type: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""Construit un TargetSpec core depuis les paramètres VWB."""
target = {
"by_role": params.get("target_role", "unknown_element"),
"by_text": params.get("target_text"),
"by_position": None,
"selection_policy": "first",
"fallback_strategy": "visual_similarity",
}
# Injecter la position en pourcentage si disponible
x_pct = params.get("x_pct")
y_pct = params.get("y_pct")
if x_pct is not None and y_pct is not None:
target["by_position"] = [x_pct, y_pct]
return target
# ---------------------------------------------------------------------------
# Utilitaires fichiers
# ---------------------------------------------------------------------------
def save_core_workflow_to_disk(
workflow_dict: Dict[str, Any],
machine_id: str = "vwb_export",
) -> Path:
"""
Sauvegarde un workflow core sur disque dans le format attendu par le streaming server.
Emplacement : data/training/workflows/{machine_id}/{workflow_id}.json
Returns:
Path du fichier sauvegardé
"""
data_dir = Path(_ROOT) / "data" / "training" / "workflows" / machine_id
data_dir.mkdir(parents=True, exist_ok=True)
wf_id = workflow_dict.get("workflow_id", f"wf_{uuid.uuid4().hex[:12]}")
filepath = data_dir / f"{wf_id}.json"
with open(filepath, "w", encoding="utf-8") as f:
json.dump(workflow_dict, f, indent=2, ensure_ascii=False)
logger.info("Workflow core sauvegardé : %s", filepath)
return filepath
def load_learned_workflow(workflow_id: str) -> Optional[Dict[str, Any]]:
"""
Charge un workflow appris depuis le disque.
Cherche dans data/training/workflows/ (racine et sous-dossiers machine).
Returns:
Dict JSON du workflow, ou None si non trouvé
"""
base_dir = Path(_ROOT) / "data" / "training" / "workflows"
if not base_dir.exists():
return None
# Chercher dans la racine
direct = base_dir / f"{workflow_id}.json"
if direct.exists():
return _load_json(direct)
# Chercher dans les sous-dossiers machine
for machine_dir in base_dir.iterdir():
if machine_dir.is_dir():
candidate = machine_dir / f"{workflow_id}.json"
if candidate.exists():
return _load_json(candidate)
return None
def list_learned_workflows_from_disk() -> List[Dict[str, Any]]:
"""
Liste tous les workflows appris disponibles sur disque.
Retourne une liste de dicts avec les métadonnées de base.
"""
base_dir = Path(_ROOT) / "data" / "training" / "workflows"
if not base_dir.exists():
return []
workflows = []
def _scan_dir(d: Path, machine_id: str = "default"):
for f in sorted(d.glob("*.json")):
try:
data = _load_json(f)
if data:
workflows.append({
"workflow_id": data.get("workflow_id", f.stem),
"name": data.get("name", f.stem),
"description": data.get("description", ""),
"machine_id": machine_id,
"nodes": len(data.get("nodes", [])),
"edges": len(data.get("edges", [])),
"learning_state": data.get("learning_state", "OBSERVATION"),
"created_at": data.get("created_at"),
"source": "learned",
})
except Exception as e:
logger.warning("Erreur lecture %s : %s", f, e)
# Racine
_scan_dir(base_dir)
# Sous-dossiers machine
for machine_dir in sorted(base_dir.iterdir()):
if machine_dir.is_dir():
_scan_dir(machine_dir, machine_id=machine_dir.name)
return workflows
def _load_json(filepath: Path) -> Optional[Dict[str, Any]]:
"""Charge un fichier JSON."""
try:
with open(filepath, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
logger.error("Erreur lecture JSON %s : %s", filepath, e)
return None