diff --git a/agent_chat/app.py b/agent_chat/app.py index 47952e4e6..7a1ffbe84 100644 --- a/agent_chat/app.py +++ b/agent_chat/app.py @@ -370,15 +370,22 @@ def api_status(): @app.route('/api/workflows') def api_workflows(): - """Liste des workflows (tous répertoires confondus). + """Liste unifiée des workflows (appris + VWB). - Enrichit les workflows avec le machine_id source quand disponible - (attribut _machine_id ajouté par le StreamProcessor). + Sources fusionnées : + 1. Workflows appris (SemanticMatcher — data/training/workflows/) + 2. Workflows VWB (port 5002 — SQLite, édités par l'humain) + + Dédupliqués par nom : si un workflow appris a été importé dans le VWB, + seule la version VWB est retournée (c'est la version validée/corrigée). """ if not matcher: return jsonify({"workflows": [], "directories": []}) + seen_ids = set() workflows = [] + + # Source 1 : workflows appris (core JSON) for wf in matcher.get_all_workflows(): wf_data = { "id": wf.workflow_id, @@ -386,12 +393,31 @@ def api_workflows(): "description": wf.description, "tags": wf.tags, "source": wf.source_dir, + "origin": "learned", } # Ajouter le machine_id source si disponible (workflows appris en streaming) machine_id = getattr(wf, '_machine_id', None) if machine_id: wf_data["machine_id"] = machine_id workflows.append(wf_data) + seen_ids.add(wf.workflow_id) + + # Source 2 : workflows VWB (édités par l'humain) + vwb_workflows = _fetch_vwb_workflows() + for vwb_wf in vwb_workflows: + vwb_id = vwb_wf.get("id", "") + vwb_wf["origin"] = "vwb" + # Si un workflow VWB a été importé depuis un appris, marquer le doublon + desc = vwb_wf.get("description", "") or "" + # Détecter les doublons par nom similaire ou description contenant l'ID core + is_duplicate = False + for core_id in seen_ids: + if core_id in desc: + is_duplicate = True + break + if vwb_id not in seen_ids and not is_duplicate: + workflows.append(vwb_wf) + seen_ids.add(vwb_id) # Récupérer la liste des machines connectées depuis le streaming server machines = _fetch_connected_machines() @@ -403,6 +429,35 @@ def api_workflows(): }) +def _fetch_vwb_workflows(): + """Récupère les workflows depuis le VWB backend (port 5002).""" + try: + resp = http_requests.get( + "http://localhost:5002/api/v3/session/state", + timeout=3, + ) + if resp.ok: + data = resp.json() + wf_list = data.get("workflows_list", []) + result = [] + for wf in wf_list: + result.append({ + "id": wf.get("id", ""), + "name": wf.get("name", ""), + "description": wf.get("description", ""), + "tags": wf.get("tags", []), + "source": "vwb", + "step_count": wf.get("step_count", 0), + "review_status": wf.get("review_status"), + }) + return result + except http_requests.ConnectionError: + logger.debug("VWB backend (port 5002) indisponible") + except Exception as e: + logger.warning("Erreur récupération workflows VWB: %s", e) + return [] + + @app.route('/api/workflows/refresh', methods=['POST']) def api_workflows_refresh(): """ diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index 2d794cb61..487490c27 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -678,6 +678,33 @@ async def list_workflows(machine_id: Optional[str] = None): return result +@app.post("/api/v1/traces/stream/reload-workflows") +async def reload_workflows(): + """Recharger les workflows depuis le disque. + + Appelé par le VWB après un export-for-lea pour que le streaming server + voie immédiatement les nouveaux workflows sans redémarrage. 
+ """ + count = processor.reload_workflows() + return {"success": True, "workflows_count": count} + + +@app.get("/api/v1/traces/stream/workflow/{workflow_id}") +async def get_workflow_detail(workflow_id: str): + """Retourne le détail complet d'un workflow (format core JSON). + + Utilisé par le VWB pour importer un workflow appris qui n'est pas + encore sur disque (seulement en mémoire dans le streaming server). + """ + with processor._data_lock: + wf = processor._workflows.get(workflow_id) + + if not wf: + raise HTTPException(status_code=404, detail=f"Workflow '{workflow_id}' non trouvé") + + return wf.to_dict() + + @app.get("/api/v1/traces/stream/session/{session_id}") async def get_session(session_id: str): """État d'une session.""" diff --git a/agent_v0/server_v1/stream_processor.py b/agent_v0/server_v1/stream_processor.py index 892a01291..efec5c259 100644 --- a/agent_v0/server_v1/stream_processor.py +++ b/agent_v0/server_v1/stream_processor.py @@ -950,6 +950,20 @@ class StreamProcessor: }) return result + def reload_workflows(self) -> int: + """Recharger les workflows depuis le disque. + + Utile après qu'un nouveau workflow a été exporté depuis le VWB + ou appris par le streaming. Retourne le nombre de workflows chargés. + """ + with self._data_lock: + self._workflows.clear() + self._load_persisted_workflows() + with self._data_lock: + count = len(self._workflows) + logger.info("Workflows rechargés depuis le disque : %d", count) + return count + @property def stats(self) -> Dict[str, Any]: """Statistiques du processeur.""" diff --git a/visual_workflow_builder/backend/api_v3/__init__.py b/visual_workflow_builder/backend/api_v3/__init__.py index 6f7f8b6db..d1148d301 100644 --- a/visual_workflow_builder/backend/api_v3/__init__.py +++ b/visual_workflow_builder/backend/api_v3/__init__.py @@ -23,4 +23,10 @@ try: except ImportError as e: print(f"⚠️ Module dag_execute désactivé: {e}") +# Pont workflows appris (Léa) <-> VWB +try: + from . import learned_workflows # noqa: F401 +except ImportError as e: + print(f"⚠️ Module learned_workflows désactivé: {e}") + __all__ = ['api_v3_bp'] diff --git a/visual_workflow_builder/backend/api_v3/learned_workflows.py b/visual_workflow_builder/backend/api_v3/learned_workflows.py new file mode 100644 index 000000000..c1d48a9bf --- /dev/null +++ b/visual_workflow_builder/backend/api_v3/learned_workflows.py @@ -0,0 +1,448 @@ +""" +API v3 - Pont workflows appris (Léa) <-> VWB + +Endpoints : + GET /api/v3/learned-workflows → liste les workflows appris (disque + streaming server) + POST /api/v3/learned-workflows//import → importe un workflow appris dans le VWB + POST /api/v3/workflow//export-for-lea → exporte un workflow VWB pour Léa + +Le but : UN seul format de workflow, UN stockage (SQLite VWB), UN système de replay. +Les workflows appris arrivent via import, l'humain les corrige dans le VWB, +puis l'exécution utilise le même pipeline (execute-windows → streaming server). + +Auteur : Dom, Claude — 18 mars 2026 +""" + +import json +import logging +import sys +import traceback +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, Optional + +import requests as http_requests + +from flask import jsonify, request + +from . 
import api_v3_bp +from .workflow import generate_id +from db.models import db, Workflow, Step + +logger = logging.getLogger(__name__) + +# Chemin racine pour les imports core +_ROOT = str(Path(__file__).resolve().parent.parent.parent.parent) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +# URL du streaming server +STREAMING_SERVER_URL = "http://localhost:5005" + + +# --------------------------------------------------------------------------- +# GET /api/v3/learned-workflows +# --------------------------------------------------------------------------- + +@api_v3_bp.route('/learned-workflows', methods=['GET']) +def list_learned_workflows(): + """ + Liste les workflows appris par Léa, depuis deux sources : + 1. Fichiers JSON sur disque (data/training/workflows/) + 2. Streaming server (port 5005) si disponible + + Indique pour chaque workflow s'il a déjà été importé dans le VWB. + + Query params: + machine_id: Filtrer par machine (optionnel) + + Response: + { + "success": true, + "workflows": [ + { + "workflow_id": "...", + "name": "...", + "nodes": 5, + "edges": 4, + "machine_id": "windows_pc_01", + "learning_state": "OBSERVATION", + "source": "learned", + "already_imported": false, + "vwb_workflow_id": null + } + ], + "streaming_server_available": true + } + """ + machine_id = request.args.get('machine_id') + + from services.learned_workflow_bridge import list_learned_workflows_from_disk + + # Source 1 : fichiers sur disque + disk_workflows = list_learned_workflows_from_disk() + + # Source 2 : streaming server (plus à jour, inclut les workflows en mémoire) + streaming_workflows = [] + streaming_available = False + + try: + params = {} + if machine_id: + params["machine_id"] = machine_id + + resp = http_requests.get( + f"{STREAMING_SERVER_URL}/api/v1/traces/stream/workflows", + params=params, + timeout=3, + ) + if resp.ok: + streaming_available = True + streaming_data = resp.json().get("workflows", []) + for sw in streaming_data: + streaming_workflows.append({ + "workflow_id": sw.get("workflow_id", ""), + "name": sw.get("name", sw.get("workflow_id", "")), + "description": "", + "machine_id": sw.get("machine_id", "default"), + "nodes": sw.get("nodes", 0), + "edges": sw.get("edges", 0), + "learning_state": "OBSERVATION", + "source": "streaming", + }) + except http_requests.ConnectionError: + logger.debug("Streaming server indisponible (port 5005)") + except Exception as e: + logger.warning("Erreur streaming server : %s", e) + + # Fusionner : streaming a priorité (plus à jour), ajouter ceux qui sont seulement sur disque + seen_ids = set() + merged = [] + + for wf in streaming_workflows: + seen_ids.add(wf["workflow_id"]) + merged.append(wf) + + for wf in disk_workflows: + if wf["workflow_id"] not in seen_ids: + merged.append(wf) + seen_ids.add(wf["workflow_id"]) + + # Filtrer par machine si demandé + if machine_id: + merged = [w for w in merged if w.get("machine_id") == machine_id] + + # Enrichir : vérifier si déjà importé dans le VWB + for wf in merged: + existing = Workflow.query.filter( + Workflow.description.contains(wf["workflow_id"]), + Workflow.source.in_(["learned_import", "graph_to_visual_converter"]) + ).first() + + if existing: + wf["already_imported"] = True + wf["vwb_workflow_id"] = existing.id + else: + wf["already_imported"] = False + wf["vwb_workflow_id"] = None + + return jsonify({ + "success": True, + "workflows": merged, + "streaming_server_available": streaming_available, + }) + + +# --------------------------------------------------------------------------- +# 
POST /api/v3/learned-workflows/<workflow_id>/import
+# ---------------------------------------------------------------------------
+
+@api_v3_bp.route('/learned-workflows/<workflow_id>/import', methods=['POST'])
+def import_learned_workflow(workflow_id: str):
+    """
+    Importe un workflow appris dans le VWB pour review/édition.
+
+    1. Charge le workflow core JSON (depuis disque ou streaming server)
+    2. Convertit les edges en Steps VWB
+    3. Crée un Workflow SQLAlchemy avec source='learned_import'
+    4. Retourne le workflow VWB créé
+
+    Body (optionnel):
+        {
+            "name": "Nom personnalisé",   // Surcharge le nom
+            "machine_id": "windows_pc"    // Pour charger depuis streaming server
+        }
+
+    Response:
+        {
+            "success": true,
+            "workflow": { ... },
+            "warnings": [...],
+            "message": "..."
+        }
+    """
+    try:
+        data = request.get_json() or {}
+
+        # Charger le workflow core
+        core_dict = _load_core_workflow(workflow_id, data.get("machine_id"))
+        if core_dict is None:
+            return jsonify({
+                "success": False,
+                "error": f"Workflow '{workflow_id}' non trouvé "
+                         "(ni sur disque, ni sur le streaming server)"
+            }), 404
+
+        # Convertir en steps VWB
+        from services.learned_workflow_bridge import convert_learned_to_vwb_steps
+
+        wf_meta, steps_list, warnings = convert_learned_to_vwb_steps(core_dict)
+
+        # Surcharger le nom si fourni
+        if data.get("name"):
+            wf_meta["name"] = data["name"]
+
+        # Créer le workflow VWB
+        wf_id = generate_id("wf")
+        workflow = Workflow(
+            id=wf_id,
+            name=wf_meta["name"],
+            description=(
+                f"{wf_meta.get('description', '')}\n\n"
+                f"[Importé depuis workflow appris: {workflow_id}]"
+            ).strip(),
+            source="learned_import",
+            review_status="pending_review",
+        )
+
+        if wf_meta.get("tags"):
+            workflow.tags = wf_meta["tags"]
+
+        db.session.add(workflow)
+
+        # Créer les steps
+        for step_data in steps_list:
+            step = Step(
+                id=generate_id("step"),
+                workflow_id=wf_id,
+                action_type=step_data["action_type"],
+                order=step_data["order"],
+                position_x=step_data.get("position_x", 400),
+                position_y=step_data.get("position_y", 200),
+                label=step_data.get("label", step_data["action_type"]),
+            )
+            step.parameters = step_data.get("parameters", {})
+            db.session.add(step)
+
+        db.session.commit()
+
+        logger.info(
+            "Workflow appris importé : %s → %s (%d étapes, %d warnings)",
+            workflow_id, wf_id, len(steps_list), len(warnings),
+        )
+
+        return jsonify({
+            "success": True,
+            "workflow": workflow.to_dict(),
+            "warnings": warnings,
+            "message": (
+                f"Workflow '{wf_meta['name']}' importé avec {len(steps_list)} étapes. "
+                "En attente de validation."
+            ),
+        }), 201
+
+    except Exception as e:
+        db.session.rollback()
+        traceback.print_exc()
+        logger.error("Erreur import workflow appris %s : %s", workflow_id, e)
+        return jsonify({
+            "success": False,
+            "error": str(e),
+        }), 500
+
+
+# ---------------------------------------------------------------------------
+# POST /api/v3/workflow/<workflow_id>/export-for-lea
+# ---------------------------------------------------------------------------
+
+@api_v3_bp.route('/workflow/<workflow_id>/export-for-lea', methods=['POST'])
+def export_for_lea(workflow_id: str):
+    """
+    Exporte un workflow VWB au format core JSON pour exécution par Léa.
+
+    1. Lit le workflow VWB depuis SQLite
+    2. Convertit les Steps VWB → core Workflow JSON
+    3. Sauvegarde dans data/training/workflows/{machine_id}/
+    4.
Le workflow est maintenant disponible pour le streaming server + + Body (optionnel): + { + "machine_id": "windows_pc_01" // Sous-dossier cible (default: "vwb_export") + } + + Response: + { + "success": true, + "core_workflow_id": "...", + "export_path": "data/training/workflows/...", + "message": "..." + } + """ + try: + workflow = Workflow.query.get(workflow_id) + if not workflow: + return jsonify({ + "success": False, + "error": f"Workflow '{workflow_id}' non trouvé" + }), 404 + + # Récupérer les étapes + steps = Step.query.filter_by( + workflow_id=workflow_id + ).order_by(Step.order).all() + + if not steps: + return jsonify({ + "success": False, + "error": "Le workflow n'a aucune étape" + }), 400 + + # Préparer les données + workflow_data = { + "id": workflow.id, + "name": workflow.name, + "description": workflow.description or "", + "tags": workflow.tags or [], + "created_at": workflow.created_at.isoformat() if workflow.created_at else None, + } + steps_data = [s.to_dict() for s in steps] + + # Convertir + from services.learned_workflow_bridge import ( + convert_vwb_to_core_workflow, + save_core_workflow_to_disk, + ) + + core_wf = convert_vwb_to_core_workflow(workflow_data, steps_data) + + # Sauvegarder sur disque + data = request.get_json() or {} + machine_id = data.get("machine_id", "vwb_export") + + filepath = save_core_workflow_to_disk(core_wf, machine_id=machine_id) + + logger.info( + "Workflow VWB exporté pour Léa : %s → %s (%d nodes, %d edges)", + workflow_id, filepath, len(core_wf["nodes"]), len(core_wf["edges"]), + ) + + # Notifier le streaming server si disponible (pour rechargement) + _notify_streaming_reload() + + return jsonify({ + "success": True, + "core_workflow_id": core_wf["workflow_id"], + "export_path": str(filepath), + "nodes_count": len(core_wf["nodes"]), + "edges_count": len(core_wf["edges"]), + "message": ( + f"Workflow '{workflow.name}' exporté avec " + f"{len(core_wf['nodes'])} nœuds et {len(core_wf['edges'])} transitions. " + f"Disponible pour Léa dans {filepath.parent.name}/." + ), + }) + + except Exception as e: + traceback.print_exc() + logger.error("Erreur export workflow %s pour Léa : %s", workflow_id, e) + return jsonify({ + "success": False, + "error": str(e), + }), 500 + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _load_core_workflow( + workflow_id: str, machine_id: Optional[str] = None +) -> Optional[Dict[str, Any]]: + """ + Charge un workflow core depuis le disque ou le streaming server. + + Priorité : + 1. Fichiers sur disque (data/training/workflows/) + 2. Streaming server (GET workflow → to_dict) + """ + from services.learned_workflow_bridge import load_learned_workflow + + # 1. Essayer le disque + disk_wf = load_learned_workflow(workflow_id) + if disk_wf: + return disk_wf + + # 2. Essayer le streaming server — récupérer le workflow complet + # Le streaming server n'a pas d'endpoint GET /workflow/, + # mais on peut essayer de lister et trouver par ID + try: + params = {} + if machine_id: + params["machine_id"] = machine_id + + resp = http_requests.get( + f"{STREAMING_SERVER_URL}/api/v1/traces/stream/workflows", + params=params, + timeout=3, + ) + if resp.ok: + workflows = resp.json().get("workflows", []) + for wf in workflows: + if wf.get("workflow_id") == workflow_id: + # Le listing ne contient que les métadonnées, + # on a besoin du workflow complet. 
+ # Essayer l'endpoint de détail si disponible + try: + detail_resp = http_requests.get( + f"{STREAMING_SERVER_URL}/api/v1/traces/stream/workflow/{workflow_id}", + timeout=5, + ) + if detail_resp.ok: + return detail_resp.json() + except Exception: + pass + + # Fallback : charger depuis le disque avec le machine_id trouvé + wf_machine = wf.get("machine_id", "default") + wf_path = ( + Path(_ROOT) / "data" / "training" / "workflows" + / wf_machine / f"{workflow_id}.json" + ) + if wf_path.exists(): + import json as json_mod + with open(wf_path, "r", encoding="utf-8") as f: + return json_mod.load(f) + + except http_requests.ConnectionError: + pass + except Exception as e: + logger.warning("Erreur chargement workflow streaming : %s", e) + + return None + + +def _notify_streaming_reload(): + """ + Notifie le streaming server de recharger ses workflows. + + Appel non-bloquant : si le serveur est indisponible, on ignore. + """ + try: + http_requests.post( + f"{STREAMING_SERVER_URL}/api/v1/traces/stream/reload-workflows", + timeout=2, + ) + logger.debug("Streaming server notifié pour rechargement des workflows") + except Exception: + # Non-critique : le streaming server rechargera au prochain démarrage + pass diff --git a/visual_workflow_builder/backend/services/learned_workflow_bridge.py b/visual_workflow_builder/backend/services/learned_workflow_bridge.py new file mode 100644 index 000000000..28bc20b0f --- /dev/null +++ b/visual_workflow_builder/backend/services/learned_workflow_bridge.py @@ -0,0 +1,599 @@ +""" +Pont entre les workflows appris par Léa (core Workflow JSON) et les workflows VWB (SQLite). + +Deux directions : +1. Import : core Workflow → VWB Steps (pour review/édition humaine) +2. Export : VWB Steps → core Workflow JSON (pour exécution par Léa/streaming server) + +Le format unifié est le VWB SQLite ; les workflows core JSON servent de format d'échange +avec le streaming server. 
+ +Auteur : Dom, Claude — 18 mars 2026 +""" + +import json +import logging +import os +import sys +import uuid +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +logger = logging.getLogger(__name__) + +# Chemin racine du projet pour les imports core +_ROOT = str(Path(__file__).resolve().parent.parent.parent.parent) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + + +# --------------------------------------------------------------------------- +# Mapping action core → VWB +# --------------------------------------------------------------------------- + +# Action types dans les edges du core Workflow → action_type VWB +CORE_ACTION_TO_VWB = { + "mouse_click": "click_anchor", + "text_input": "type_text", + "key_press": "keyboard_shortcut", + "compound": "click_anchor", # Sera décomposé en sous-étapes + "wait": "wait_for_anchor", + "scroll": "scroll_to_anchor", + "unknown": "click_anchor", +} + +# action_type VWB → action type core +VWB_ACTION_TO_CORE = { + "click_anchor": "mouse_click", + "double_click_anchor": "mouse_click", + "right_click_anchor": "mouse_click", + "type_text": "text_input", + "type_secret": "text_input", + "keyboard_shortcut": "key_press", + "hotkey": "key_press", + "wait_for_anchor": "wait", + "scroll_to_anchor": "scroll", + "visual_condition": "evaluate_condition", + "screenshot_evidence": "screenshot", + "extract_text": "extract_data", +} + + +# --------------------------------------------------------------------------- +# Import : core Workflow JSON → VWB steps data +# --------------------------------------------------------------------------- + +def convert_learned_to_vwb_steps( + workflow_dict: Dict[str, Any], +) -> Tuple[Dict[str, Any], List[Dict[str, Any]], List[str]]: + """ + Convertit un workflow appris (format core JSON) en données VWB. + + Le workflow core stocke les actions dans les EDGES (transition entre nœuds), + tandis que le VWB stocke les actions dans les STEPS (séquence linéaire). + + On parcourt le graphe depuis les entry_nodes en suivant les edges, + et on crée un Step VWB par edge. 
+ + Args: + workflow_dict: Le dict JSON du workflow core (tel que sauvegardé sur disque) + + Returns: + Tuple (workflow_meta, steps_list, warnings) + - workflow_meta: dict avec name, description, tags, source + - steps_list: liste de dicts compatibles VWB Step + - warnings: liste de messages d'avertissement + """ + warnings = [] + + # Extraire les métadonnées + workflow_meta = { + "name": workflow_dict.get("name", "Workflow importé"), + "description": workflow_dict.get("description", ""), + "tags": (workflow_dict.get("metadata") or {}).get("tags", []), + "source": "learned_import", + "core_workflow_id": workflow_dict.get("workflow_id", ""), + "learning_state": workflow_dict.get("learning_state", "OBSERVATION"), + } + + nodes = workflow_dict.get("nodes", []) + edges = workflow_dict.get("edges", []) + entry_nodes = workflow_dict.get("entry_nodes", []) + + if not edges and not nodes: + warnings.append("Le workflow ne contient ni nœuds ni edges") + return workflow_meta, [], warnings + + # Index des edges sortants par node + outgoing = {} + for edge in edges: + from_node = edge.get("from_node") or edge.get("source_node", "") + outgoing.setdefault(from_node, []).append(edge) + + # Index des nodes par ID + nodes_by_id = {n["node_id"]: n for n in nodes} + + # Parcours linéaire du graphe (BFS) + visited = set() + queue = list(entry_nodes) if entry_nodes else [] + if not queue and nodes: + queue = [nodes[0]["node_id"]] + + ordered_edges = [] + while queue: + node_id = queue.pop(0) + if node_id in visited: + continue + visited.add(node_id) + + for edge in outgoing.get(node_id, []): + ordered_edges.append(edge) + to_node = edge.get("to_node") or edge.get("target_node", "") + if to_node and to_node not in visited: + queue.append(to_node) + + # Convertir chaque edge en Step VWB + steps = [] + for idx, edge in enumerate(ordered_edges): + action = edge.get("action", {}) + action_type = action.get("type", "unknown") + action_params = action.get("parameters", {}) + target = action.get("target", {}) + + # Déterminer le type VWB + vwb_action_type = CORE_ACTION_TO_VWB.get(action_type, "click_anchor") + + # Construire les paramètres VWB + vwb_params = {} + + if action_type == "mouse_click": + # Extraire la position en pourcentage si disponible + by_position = target.get("by_position") + if by_position: + vwb_params["x_pct"] = by_position[0] if isinstance(by_position, list) else 0 + vwb_params["y_pct"] = by_position[1] if isinstance(by_position, list) else 0 + button = action_params.get("button", "left") + if button == "double": + vwb_action_type = "double_click_anchor" + elif button == "right": + vwb_action_type = "right_click_anchor" + + elif action_type == "text_input": + vwb_params["text"] = action_params.get("text", "") + + elif action_type == "key_press": + keys = action_params.get("keys", []) + if not keys and action_params.get("key"): + keys = [action_params["key"]] + vwb_params["keys"] = keys + + elif action_type == "compound": + # Stocker les sous-étapes dans les paramètres pour référence + vwb_params["compound_steps"] = action_params.get("steps", []) + warnings.append( + f"Étape {idx + 1} : action compound décomposée — vérifier manuellement" + ) + + # Ajouter des infos de ciblage pour la review humaine + if target.get("by_role"): + vwb_params["target_role"] = target["by_role"] + if target.get("by_text"): + vwb_params["target_text"] = target["by_text"] + + # Construire le label + from_node = edge.get("from_node", "") + to_node = edge.get("to_node") or edge.get("target_node", "") + from_name = 
nodes_by_id.get(from_node, {}).get("name", from_node) + to_name = nodes_by_id.get(to_node, {}).get("name", to_node) + label = _build_step_label(vwb_action_type, vwb_params, from_name, to_name) + + step = { + "action_type": vwb_action_type, + "order": idx, + "position_x": 400, + "position_y": 80 + idx * 120, + "parameters": vwb_params, + "label": label, + # Métadonnées d'origine pour traçabilité + "metadata": { + "core_edge_id": edge.get("edge_id", ""), + "core_from_node": from_node, + "core_to_node": to_node, + }, + } + steps.append(step) + + if not steps and nodes: + # Pas d'edges mais des nodes → créer des étapes basiques depuis les nodes + warnings.append("Aucun edge trouvé — création d'étapes depuis les nœuds") + for idx, node in enumerate(nodes): + node_name = node.get("name", node.get("node_id", f"node_{idx}")) + steps.append({ + "action_type": "click_anchor", + "order": idx, + "position_x": 400, + "position_y": 80 + idx * 120, + "parameters": { + "window_title": (node.get("template", {}).get("window", {}) or {}).get("title_pattern", ""), + }, + "label": f"Écran : {node_name}", + "metadata": {"core_node_id": node.get("node_id", "")}, + }) + + return workflow_meta, steps, warnings + + +def _build_step_label( + action_type: str, params: Dict[str, Any], from_name: str, to_name: str +) -> str: + """Construire un label lisible pour un step VWB.""" + if action_type == "type_text": + text = params.get("text", "") + if text: + return f"Saisir : {text[:40]}{'...' if len(text) > 40 else ''}" + return f"Saisir du texte ({from_name} → {to_name})" + + if action_type == "keyboard_shortcut": + keys = params.get("keys", []) + if keys: + return f"Raccourci : {'+'.join(keys)}" + return f"Raccourci clavier ({from_name} → {to_name})" + + if action_type in ("click_anchor", "double_click_anchor", "right_click_anchor"): + role = params.get("target_role", "") + text = params.get("target_text", "") + hint = role or text or "" + prefix = { + "click_anchor": "Clic", + "double_click_anchor": "Double-clic", + "right_click_anchor": "Clic droit", + }.get(action_type, "Clic") + if hint: + return f"{prefix} : {hint}" + return f"{prefix} ({from_name} → {to_name})" + + if action_type == "wait_for_anchor": + return f"Attente ({from_name} → {to_name})" + + return f"{action_type} ({from_name} → {to_name})" + + +# --------------------------------------------------------------------------- +# Export : VWB Steps → core Workflow JSON +# --------------------------------------------------------------------------- + +def convert_vwb_to_core_workflow( + workflow_data: Dict[str, Any], + steps_data: List[Dict[str, Any]], +) -> Dict[str, Any]: + """ + Convertit un workflow VWB (méta + steps) en format core Workflow JSON. + + Le résultat peut être sauvegardé dans data/training/workflows/ pour + être chargé par le streaming server. + + Args: + workflow_data: dict du workflow VWB (id, name, description, tags) + steps_data: liste de dicts des steps VWB (action_type, parameters, etc.) 
+ + Returns: + Dict au format core Workflow (compatible Workflow.from_dict / save_to_file) + """ + now = datetime.now().isoformat() + wf_id = workflow_data.get("id", f"wf_{uuid.uuid4().hex[:12]}") + + # Créer les nodes : un node par étape (chaque étape = un état écran) + nodes = [] + edges = [] + + for idx, step in enumerate(steps_data): + node_id = f"node_{idx:03d}" + action_type = step.get("action_type", "click_anchor") + params = step.get("parameters", {}) + label = step.get("label", action_type) + + # Créer le node (template minimal) + node = { + "node_id": node_id, + "name": label, + "description": f"Étape {idx + 1} : {label}", + "template": { + "window": { + "title_pattern": params.get("window_title"), + "title_contains": params.get("window_title"), + "process_name": None, + }, + "text": { + "required_texts": params.get("text_patterns", []), + "forbidden_texts": [], + }, + "ui": { + "required_roles": [], + "required_types": [], + "min_element_count": 0, + }, + "embedding": { + "provider": "none", + "vector_id": "", + "min_cosine_similarity": 0.85, + "sample_count": 0, + }, + }, + "is_entry": idx == 0, + "is_end": idx == len(steps_data) - 1, + "variants": [], + "primary_variant_id": None, + "max_variants": 5, + "quality_score": 0.0, + "cluster_metrics": {}, + "spatial_relations": [], + "container_type": None, + "metadata": { + "vwb_step_id": step.get("id", ""), + "visual_type": _action_type_to_visual(action_type), + }, + } + nodes.append(node) + + # Créer l'edge vers le node suivant (sauf pour le dernier) + if idx < len(steps_data) - 1: + next_node_id = f"node_{idx + 1:03d}" + + # Convertir l'action VWB → action core + core_action_type = VWB_ACTION_TO_CORE.get(action_type, "mouse_click") + core_params = _vwb_params_to_core(action_type, params) + target_spec = _vwb_params_to_target_spec(action_type, params) + + edge = { + "edge_id": f"{node_id}_to_{next_node_id}", + "from_node": node_id, + "to_node": next_node_id, + "action": { + "type": core_action_type, + "target": target_spec, + "parameters": core_params, + }, + "constraints": { + "pre_conditions": {}, + "required_confidence": 0.8, + "max_wait_time_ms": 5000, + }, + "post_conditions": { + "success_mode": "all", + "timeout_ms": 3000, + "poll_ms": 200, + "success": [], + "fail_fast": [], + "retries": 2, + "backoff_ms": 150, + "expected_node": next_node_id, + "window_change_expected": False, + "new_ui_elements_expected": [], + }, + "stats": { + "execution_count": 0, + "success_count": 0, + "failure_count": 0, + "avg_duration_ms": 0.0, + }, + "metadata": { + "created_from_vwb": True, + "vwb_workflow_id": wf_id, + }, + } + edges.append(edge) + + # Construire le workflow complet + core_workflow = { + "workflow_id": wf_id, + "name": workflow_data.get("name", "Workflow exporté"), + "description": workflow_data.get("description", ""), + "version": 1, + "learning_state": "COACHING", # Exporté depuis VWB = validé par l'humain + "created_at": workflow_data.get("created_at", now), + "updated_at": now, + "entry_nodes": [nodes[0]["node_id"]] if nodes else [], + "end_nodes": [nodes[-1]["node_id"]] if nodes else [], + "nodes": nodes, + "edges": edges, + "safety_rules": { + "require_confirmation_for": [], + "forbidden_windows": [], + "execution_timeout_minutes": 0, + }, + "stats": { + "total_executions": 0, + "success_count": 0, + "failure_count": 0, + "observed_runs": 0, + "assist_runs": 0, + "auto_candidate_runs": 0, + "auto_confirmed_runs": 0, + }, + "learning": { + "observation_count": 0, + "confidence_threshold": 0.8, + "promotion_rules": {}, + 
}, + "metadata": { + "exported_from_vwb": True, + "vwb_workflow_id": wf_id, + "tags": workflow_data.get("tags", []), + }, + "loops": {}, + "conditionals": {}, + "references": [], + } + + return core_workflow + + +def _action_type_to_visual(action_type: str) -> str: + """Convertit un action_type VWB en type visuel.""" + mapping = { + "click_anchor": "click", + "double_click_anchor": "click", + "right_click_anchor": "click", + "type_text": "type", + "type_secret": "type", + "keyboard_shortcut": "validate", + "hotkey": "validate", + "wait_for_anchor": "wait", + "scroll_to_anchor": "scroll", + "visual_condition": "condition", + "screenshot_evidence": "screenshot", + } + return mapping.get(action_type, "click") + + +def _vwb_params_to_core(action_type: str, params: Dict[str, Any]) -> Dict[str, Any]: + """Convertit les paramètres VWB en paramètres core.""" + core_params = {} + + if action_type in ("type_text", "type_secret"): + core_params["text"] = params.get("text", "") + + elif action_type in ("keyboard_shortcut", "hotkey"): + core_params["keys"] = params.get("keys", []) + + elif action_type in ("click_anchor", "double_click_anchor", "right_click_anchor"): + button = "left" + if action_type == "double_click_anchor": + button = "double" + elif action_type == "right_click_anchor": + button = "right" + core_params["button"] = button + + elif action_type == "wait_for_anchor": + core_params["duration_ms"] = params.get("duration_ms", 2000) + + return core_params + + +def _vwb_params_to_target_spec(action_type: str, params: Dict[str, Any]) -> Dict[str, Any]: + """Construit un TargetSpec core depuis les paramètres VWB.""" + target = { + "by_role": params.get("target_role", "unknown_element"), + "by_text": params.get("target_text"), + "by_position": None, + "selection_policy": "first", + "fallback_strategy": "visual_similarity", + } + + # Injecter la position en pourcentage si disponible + x_pct = params.get("x_pct") + y_pct = params.get("y_pct") + if x_pct is not None and y_pct is not None: + target["by_position"] = [x_pct, y_pct] + + return target + + +# --------------------------------------------------------------------------- +# Utilitaires fichiers +# --------------------------------------------------------------------------- + +def save_core_workflow_to_disk( + workflow_dict: Dict[str, Any], + machine_id: str = "vwb_export", +) -> Path: + """ + Sauvegarde un workflow core sur disque dans le format attendu par le streaming server. + + Emplacement : data/training/workflows/{machine_id}/{workflow_id}.json + + Returns: + Path du fichier sauvegardé + """ + data_dir = Path(_ROOT) / "data" / "training" / "workflows" / machine_id + data_dir.mkdir(parents=True, exist_ok=True) + + wf_id = workflow_dict.get("workflow_id", f"wf_{uuid.uuid4().hex[:12]}") + filepath = data_dir / f"{wf_id}.json" + + with open(filepath, "w", encoding="utf-8") as f: + json.dump(workflow_dict, f, indent=2, ensure_ascii=False) + + logger.info("Workflow core sauvegardé : %s", filepath) + return filepath + + +def load_learned_workflow(workflow_id: str) -> Optional[Dict[str, Any]]: + """ + Charge un workflow appris depuis le disque. + + Cherche dans data/training/workflows/ (racine et sous-dossiers machine). 
+ + Returns: + Dict JSON du workflow, ou None si non trouvé + """ + base_dir = Path(_ROOT) / "data" / "training" / "workflows" + if not base_dir.exists(): + return None + + # Chercher dans la racine + direct = base_dir / f"{workflow_id}.json" + if direct.exists(): + return _load_json(direct) + + # Chercher dans les sous-dossiers machine + for machine_dir in base_dir.iterdir(): + if machine_dir.is_dir(): + candidate = machine_dir / f"{workflow_id}.json" + if candidate.exists(): + return _load_json(candidate) + + return None + + +def list_learned_workflows_from_disk() -> List[Dict[str, Any]]: + """ + Liste tous les workflows appris disponibles sur disque. + + Retourne une liste de dicts avec les métadonnées de base. + """ + base_dir = Path(_ROOT) / "data" / "training" / "workflows" + if not base_dir.exists(): + return [] + + workflows = [] + + def _scan_dir(d: Path, machine_id: str = "default"): + for f in sorted(d.glob("*.json")): + try: + data = _load_json(f) + if data: + workflows.append({ + "workflow_id": data.get("workflow_id", f.stem), + "name": data.get("name", f.stem), + "description": data.get("description", ""), + "machine_id": machine_id, + "nodes": len(data.get("nodes", [])), + "edges": len(data.get("edges", [])), + "learning_state": data.get("learning_state", "OBSERVATION"), + "created_at": data.get("created_at"), + "source": "learned", + }) + except Exception as e: + logger.warning("Erreur lecture %s : %s", f, e) + + # Racine + _scan_dir(base_dir) + + # Sous-dossiers machine + for machine_dir in sorted(base_dir.iterdir()): + if machine_dir.is_dir(): + _scan_dir(machine_dir, machine_id=machine_dir.name) + + return workflows + + +def _load_json(filepath: Path) -> Optional[Dict[str, Any]]: + """Charge un fichier JSON.""" + try: + with open(filepath, "r", encoding="utf-8") as f: + return json.load(f) + except Exception as e: + logger.error("Erreur lecture JSON %s : %s", filepath, e) + return None diff --git a/visual_workflow_builder/frontend_v4/src/components/WorkflowSelector.tsx b/visual_workflow_builder/frontend_v4/src/components/WorkflowSelector.tsx index 9f3f6eabb..3ecebaf90 100644 --- a/visual_workflow_builder/frontend_v4/src/components/WorkflowSelector.tsx +++ b/visual_workflow_builder/frontend_v4/src/components/WorkflowSelector.tsx @@ -1,5 +1,7 @@ -import { useState, useRef, useEffect } from 'react'; +import { useState, useRef, useEffect, useCallback } from 'react'; import type { WorkflowSummary } from '../types'; +import * as api from '../services/api'; +import type { LearnedWorkflow } from '../services/api'; interface Props { workflows: WorkflowSummary[]; @@ -22,6 +24,9 @@ export default function WorkflowSelector({ const [search, setSearch] = useState(''); const [editingId, setEditingId] = useState(null); const [editName, setEditName] = useState(''); + const [learnedWorkflows, setLearnedWorkflows] = useState([]); + const [learnedLoading, setLearnedLoading] = useState(false); + const [importingId, setImportingId] = useState(null); const dropdownRef = useRef(null); const inputRef = useRef(null); @@ -45,12 +50,39 @@ export default function WorkflowSelector({ } }, [editingId]); - // Filtrer les workflows + // Charger les workflows appris quand le dropdown s'ouvre + const loadLearnedWorkflows = useCallback(async () => { + setLearnedLoading(true); + try { + const data = await api.getLearnedWorkflows(); + // Ne garder que ceux qui ne sont pas encore importés + setLearnedWorkflows(data.workflows.filter(w => !w.already_imported)); + } catch { + // Silencieux : le streaming server 
n'est peut-être pas lancé + setLearnedWorkflows([]); + } finally { + setLearnedLoading(false); + } + }, []); + + useEffect(() => { + if (isOpen) { + loadLearnedWorkflows(); + } + }, [isOpen, loadLearnedWorkflows]); + + // Filtrer les workflows VWB const filteredWorkflows = workflows.filter(wf => wf.name.toLowerCase().includes(search.toLowerCase()) || (wf.tags || []).some(tag => tag.toLowerCase().includes(search.toLowerCase())) ); + // Filtrer les workflows appris + const filteredLearned = learnedWorkflows.filter(wf => + wf.name.toLowerCase().includes(search.toLowerCase()) || + wf.workflow_id.toLowerCase().includes(search.toLowerCase()) + ); + // Workflows récents (les 8 premiers) const recentWorkflows = filteredWorkflows.slice(0, 8); const hasMore = filteredWorkflows.length > 8; @@ -81,6 +113,27 @@ export default function WorkflowSelector({ } }; + const handleImportLearned = async (wf: LearnedWorkflow, e: React.MouseEvent) => { + e.stopPropagation(); + setImportingId(wf.workflow_id); + try { + const result = await api.importLearnedWorkflow(wf.workflow_id, { + machine_id: wf.machine_id, + }); + // Sélectionner le workflow importé + if (result.workflow?.id) { + onSelect(result.workflow.id); + setIsOpen(false); + } + // Retirer de la liste des appris + setLearnedWorkflows(prev => prev.filter(w => w.workflow_id !== wf.workflow_id)); + } catch (err) { + console.error('Erreur import workflow appris:', err); + } finally { + setImportingId(null); + } + }; + return (
{/* Bouton principal */} @@ -120,7 +173,7 @@ export default function WorkflowSelector({
- {/* Liste des workflows */} + {/* Liste des workflows VWB */}
{recentWorkflows.length === 0 ? (

@@ -177,6 +230,42 @@ export default function WorkflowSelector({ )}

+          {/* Section workflows appris par Léa */}
+          {(filteredLearned.length > 0 || learnedLoading) && (
+            <>
+              <div className="dropdown-section-header">
+                Appris par Léa
+                {learnedLoading && <span className="loading-dot">...</span>}
+              </div>
+              <div className="learned-list">
+                {filteredLearned.map(wf => (
+                  <div key={wf.workflow_id} className="learned-item">
+                    <span>{wf.name}</span>
+                    <span className="learned-badge">appris</span>
+                    <span>
+                      {wf.nodes} noeuds, {wf.edges} transitions
+                    </span>
+                    <button
+                      className="import-btn"
+                      disabled={importingId === wf.workflow_id}
+                      onClick={(e) => handleImportLearned(wf, e)}
+                    >
+                      {importingId === wf.workflow_id ? 'Import...' : 'Importer'}
+                    </button>
+                  </div>
+                ))}
+              </div>
+            </>
+          )}
+
           {/* Lien vers le gestionnaire */}
           {(hasMore || workflows.length > 0) && (
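
Taken together, the selector above drives the two read/import endpoints added in `api_v3/learned_workflows.py`. The same flow can be exercised without the UI; below is a minimal smoke-test sketch. The base URL (port 5002, the port `agent_chat/app.py` queries) is an assumption; routes, field names, and status codes follow the diff above.

```python
import requests  # same HTTP client the backend itself uses

VWB_URL = "http://localhost:5002"  # assumption: default VWB backend port

# 1. List learned workflows that have not been imported into the VWB yet.
resp = requests.get(f"{VWB_URL}/api/v3/learned-workflows", timeout=5)
resp.raise_for_status()
payload = resp.json()
pending = [w for w in payload["workflows"] if not w["already_imported"]]
print(f"{len(pending)} pending import(s), "
      f"streaming server up: {payload['streaming_server_available']}")

# 2. Import the first one; machine_id lets the backend fall back to the
#    streaming server when the core JSON is not on disk yet.
if pending:
    wf = pending[0]
    resp = requests.post(
        f"{VWB_URL}/api/v3/learned-workflows/{wf['workflow_id']}/import",
        json={"machine_id": wf["machine_id"]},
        timeout=10,
    )
    resp.raise_for_status()  # the endpoint returns 201 on success
    result = resp.json()
    print("imported as", result["workflow"]["id"])
    for warning in result["warnings"]:
        print("  warning:", warning)
```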
diff --git a/visual_workflow_builder/frontend_v4/src/components/WorkflowValidation.tsx b/visual_workflow_builder/frontend_v4/src/components/WorkflowValidation.tsx
index a3bfef361..2f996a437 100644
--- a/visual_workflow_builder/frontend_v4/src/components/WorkflowValidation.tsx
+++ b/visual_workflow_builder/frontend_v4/src/components/WorkflowValidation.tsx
@@ -18,6 +18,8 @@ export default function WorkflowValidation({ workflowId }: WorkflowValidationPro
   const [result, setResult] = useState(null);
   const [exportPath, setExportPath] = useState(null);
   const [exportError, setExportError] = useState(null);
+  const [leaExportMessage, setLeaExportMessage] = useState<string | null>(null);
+  const [leaExportError, setLeaExportError] = useState<string | null>(null);

   const handleValidate = async () => {
     if (!workflowId) return;
@@ -54,11 +56,26 @@ export default function WorkflowValidation({ workflowId }: WorkflowValidationPro
     }
   };

+  const handleExportForLea = async () => {
+    if (!workflowId) return;
+    setLeaExportError(null);
+    setLeaExportMessage(null);
+
+    try {
+      const data = await api.exportForLea(workflowId);
+      setLeaExportMessage(data.message);
+    } catch (err) {
+      setLeaExportError((err as Error).message);
+    }
+  };
+
   const handleClose = () => {
     setShowModal(false);
     setResult(null);
     setExportPath(null);
     setExportError(null);
+    setLeaExportMessage(null);
+    setLeaExportError(null);
   };

   return (
@@ -154,6 +171,34 @@ export default function WorkflowValidation({ workflowId }: WorkflowValidationPro
           )}

+          {/* Export pour Léa */}
+          {result.is_valid && !leaExportMessage && (
+            <div className="validation-export">
+              <button className="btn-secondary" onClick={handleExportForLea}>
+                Exporter pour Léa
+              </button>
+            </div>
+          )}
+
+          {leaExportError && (
+            <div className="validation-errors">
+              <div>Erreur export Léa</div>
+              <div>
+                • {leaExportError}
+              </div>
+            </div>
+          )}
+
+          {leaExportMessage && (
+            <div className="validation-success">
+              <span>🚀</span>
+              <div>
+                Export Léa réussi
+                <div>{leaExportMessage}</div>
+              </div>
+            </div>
+          )}
+
           {/* Message si invalide */}
           {!result.is_valid && (
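
On the other side of the bridge, the "Exporter pour Léa" button lands in `export_for_lea`, which writes the core JSON under `data/training/workflows/` and pings the streaming server's new reload endpoint. The round trip can also be driven directly over HTTP; a sketch assuming the ports hard-coded in this PR (5002 for the VWB backend, 5005 for `STREAMING_SERVER_URL`) and a hypothetical workflow id:

```python
import requests

VWB_URL = "http://localhost:5002"        # assumption: VWB backend port
STREAMING_URL = "http://localhost:5005"  # STREAMING_SERVER_URL in the diff

# Export a validated VWB workflow to core JSON (workflow id is invented).
resp = requests.post(
    f"{VWB_URL}/api/v3/workflow/wf_abc123def456/export-for-lea",
    json={"machine_id": "windows_pc_01"},
    timeout=10,
)
resp.raise_for_status()
data = resp.json()
print("exported", data["core_workflow_id"], "->", data["export_path"])
print(data["nodes_count"], "nodes,", data["edges_count"], "edges")

# export_for_lea already calls _notify_streaming_reload(), but the reload
# endpoint can also be hit by hand, e.g. after copying a JSON file manually.
resp = requests.post(
    f"{STREAMING_URL}/api/v1/traces/stream/reload-workflows", timeout=2
)
print("streaming server now holds", resp.json()["workflows_count"], "workflows")
```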
diff --git a/visual_workflow_builder/frontend_v4/src/services/api.ts b/visual_workflow_builder/frontend_v4/src/services/api.ts index 6e419615e..1cdc28fe7 100644 --- a/visual_workflow_builder/frontend_v4/src/services/api.ts +++ b/visual_workflow_builder/frontend_v4/src/services/api.ts @@ -306,3 +306,52 @@ export async function getDagStatus( }> { return request('GET', `/workflow/${workflowId}/dag-status`); } + +// Workflows appris par Léa — pont avec le streaming server +export interface LearnedWorkflow { + workflow_id: string; + name: string; + description: string; + machine_id: string; + nodes: number; + edges: number; + learning_state: string; + source: string; + already_imported: boolean; + vwb_workflow_id: string | null; + created_at?: string; +} + +export async function getLearnedWorkflows(machineId?: string): Promise<{ + workflows: LearnedWorkflow[]; + streaming_server_available: boolean; +}> { + const params = machineId ? `?machine_id=${encodeURIComponent(machineId)}` : ''; + return request('GET', `/learned-workflows${params}`); +} + +export async function importLearnedWorkflow( + workflowId: string, + options?: { name?: string; machine_id?: string } +): Promise<{ + workflow: Workflow; + warnings: string[]; + message: string; +}> { + return request('POST', `/learned-workflows/${workflowId}/import`, options); +} + +export async function exportForLea( + workflowId: string, + machineId?: string +): Promise<{ + core_workflow_id: string; + export_path: string; + nodes_count: number; + edges_count: number; + message: string; +}> { + return request('POST', `/workflow/${workflowId}/export-for-lea`, { + machine_id: machineId, + }); +} diff --git a/visual_workflow_builder/frontend_v4/src/styles.css b/visual_workflow_builder/frontend_v4/src/styles.css index e2280a4f4..a2674c96d 100644 --- a/visual_workflow_builder/frontend_v4/src/styles.css +++ b/visual_workflow_builder/frontend_v4/src/styles.css @@ -2513,6 +2513,70 @@ body { text-decoration: underline; } +/* Section workflows appris par Léa */ +.dropdown-section-header { + padding: 0.4rem 0.75rem; + font-size: 0.75rem; + font-weight: 600; + text-transform: uppercase; + color: var(--text-muted); + background: var(--bg-sidebar); + border-top: 1px solid var(--border); + display: flex; + align-items: center; + gap: 0.3rem; +} + +.dropdown-section-header .loading-dot { + color: var(--primary); + animation: pulse-dots 1s infinite; +} + +@keyframes pulse-dots { + 0%, 100% { opacity: 1; } + 50% { opacity: 0.3; } +} + +.learned-list .learned-item { + display: flex; + flex-wrap: wrap; + align-items: center; + gap: 0.3rem; +} + +.learned-badge { + display: inline-block; + font-size: 0.65rem; + padding: 0.1rem 0.4rem; + border-radius: 3px; + background: #dbeafe; + color: #1d4ed8; + margin-left: 0.3rem; + font-weight: 500; +} + +.import-btn { + margin-left: auto; + padding: 0.2rem 0.6rem; + font-size: 0.75rem; + background: var(--primary); + color: white; + border: none; + border-radius: 4px; + cursor: pointer; + transition: background 0.15s; + flex-shrink: 0; +} + +.import-btn:hover:not(:disabled) { + background: var(--primary-hover, #2563eb); +} + +.import-btn:disabled { + opacity: 0.5; + cursor: wait; +} + /* =========================================== WorkflowManagerModal =========================================== */ @@ -3610,6 +3674,11 @@ body { font-size: 0.9rem; } +.validation-export .btn-secondary { + padding: 0.6rem 1.5rem; + font-size: 0.9rem; +} + .validation-success { display: flex; align-items: center;
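
One property of the bridge worth keeping in mind: `convert_vwb_to_core_workflow` emits one node per step but only creates an edge for `idx < len(steps) - 1`, attaching step `idx`'s action to that edge. A round trip through `convert_learned_to_vwb_steps` (one VWB step per edge) therefore comes back with N-1 steps, and the final step's action never lands on any edge. A minimal sanity-check sketch of that behavior, assuming it runs with `visual_workflow_builder/backend` on `sys.path`; the step data is invented:

```python
from services.learned_workflow_bridge import (
    convert_learned_to_vwb_steps,
    convert_vwb_to_core_workflow,
)

workflow_data = {"id": "wf_demo", "name": "Demo", "description": "", "tags": []}
steps_data = [
    {"action_type": "click_anchor", "order": 0,
     "parameters": {"target_text": "Fichier"}, "label": "Clic : Fichier"},
    {"action_type": "type_text", "order": 1,
     "parameters": {"text": "rapport.xlsx"}, "label": "Saisir : rapport.xlsx"},
    {"action_type": "keyboard_shortcut", "order": 2,
     "parameters": {"keys": ["enter"]}, "label": "Raccourci : enter"},
]

core = convert_vwb_to_core_workflow(workflow_data, steps_data)
assert len(core["nodes"]) == 3   # one node per step
assert len(core["edges"]) == 2   # edges only between consecutive steps
assert core["learning_state"] == "COACHING"  # human-validated export

# Round trip: one VWB step per edge, so the last step's action is dropped.
meta, steps, warnings = convert_learned_to_vwb_steps(core)
assert len(steps) == 2
assert steps[0]["action_type"] == "click_anchor"
assert steps[1]["action_type"] == "type_text"
print(meta["name"], [s["label"] for s in steps], warnings)
```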