""" API v3 - Exécution DAG de Workflows avec étapes LLM Convertit un workflow VWB (nœuds + edges) en DAGExecutor steps et lance l'exécution parallèle (UI séquentiel, LLM parallèle). POST /api/v3/workflow//execute-dag → Lance l'exécution DAG GET /api/v3/workflow//dag-status → Statut de l'exécution en cours Auteur : Dom, Claude — 16 mars 2026 """ import json import logging import os import re import sys import traceback from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from flask import jsonify, request from . import api_v3_bp logger = logging.getLogger(__name__) # Ajouter le répertoire racine pour les imports core _ROOT = str(Path(__file__).resolve().parent.parent.parent.parent) if _ROOT not in sys.path: sys.path.insert(0, _ROOT) from core.execution.dag_executor import ( DAGExecutionResult, DAGExecutor, StepStatus, StepType, WorkflowStep, ) from core.execution.llm_actions import LLMActionHandler # Data loop — import Excel et itération sur tables try: from core.data.excel_importer import ExcelImporter from core.data.db_iterator import DBIterator _DATA_LOOP_AVAILABLE = True except ImportError as _e: logger.warning("Module core.data indisponible : %s", _e) _DATA_LOOP_AVAILABLE = False # --------------------------------------------------------------------------- # Types d'actions VWB → StepType du DAGExecutor # --------------------------------------------------------------------------- # Les action_types VWB qui correspondent à des appels LLM _LLM_ACTION_TYPES = { "llm_analyze", "llm_translate", "llm_extract_data", "llm_generate", } # Mapping action_type VWB → llm_action du LLMActionHandler _LLM_ACTION_MAP = { "llm_analyze": "analyze_text", "llm_translate": "translate", "llm_extract_data": "extract_data", "llm_generate": "generate_text", } # Actions VWB de type attente _WAIT_ACTION_TYPES = {"wait_for_anchor"} # Actions VWB de type condition _CONDITION_ACTION_TYPES = {"visual_condition"} # Actions VWB de type data loop _DATA_LOOP_ACTION_TYPES = {"import_excel", "db_foreach"} # Actions VWB de gestion de fichiers _FILE_ACTION_TYPES = { "file_list_dir", "file_create_dir", "file_move", "file_copy", "file_sort_by_ext", } def _classify_step_type(action_type: str) -> StepType: """Détermine le StepType DAG à partir du action_type VWB.""" if action_type in _LLM_ACTION_TYPES: return StepType.LLM_CALL if action_type in _WAIT_ACTION_TYPES: return StepType.WAIT if action_type in _CONDITION_ACTION_TYPES: return StepType.CONDITION # import_excel et db_foreach sont des UI_ACTION traitées spécialement return StepType.UI_ACTION def _build_llm_action(action_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]: """Construit le dict d'action LLM attendu par le LLMActionHandler. Ajoute la clé 'llm_action' et recopie les paramètres pertinents. """ llm_action = _LLM_ACTION_MAP.get(action_type) if not llm_action: raise ValueError(f"Type d'action LLM inconnu : {action_type}") action = {"llm_action": llm_action} # Copier les paramètres pertinents sans modification for key in ("text", "instruction", "model", "temperature", "target_lang", "source_lang", "schema", "prompt", "context"): if key in parameters and parameters[key]: val = parameters[key] # Le schema peut être une chaîne JSON — le parser if key == "schema" and isinstance(val, str): try: val = json.loads(val) except json.JSONDecodeError: pass action[key] = val return action # --------------------------------------------------------------------------- # Conversion VWB workflow → DAG steps # --------------------------------------------------------------------------- def _convert_vwb_to_dag_steps( steps_data: List[Dict[str, Any]], edges_data: List[Dict[str, Any]], ) -> List[WorkflowStep]: """Convertit les nœuds et edges VWB en liste de WorkflowStep DAG. Les edges définissent les dépendances : si un edge va de A vers B, alors B dépend de A. Args: steps_data: Liste de dicts (step.to_dict() depuis le modèle SQLAlchemy) edges_data: Liste de dicts {"source": "step_id_A", "target": "step_id_B"} Returns: Liste de WorkflowStep prêtes à être chargées dans le DAGExecutor """ # Construire le mapping des dépendances depuis les edges depends_map: Dict[str, List[str]] = {} for edge in edges_data: source = edge.get("source", "") target = edge.get("target", "") if source and target: depends_map.setdefault(target, []).append(source) dag_steps = [] for step_data in steps_data: step_id = step_data["id"] action_type = step_data["action_type"] parameters = step_data.get("parameters", {}) step_type = _classify_step_type(action_type) depends_on = depends_map.get(step_id, []) # Construire l'action selon le type if step_type == StepType.LLM_CALL: action = _build_llm_action(action_type, parameters) else: # Pour les actions UI, passer les paramètres tels quels action = {"type": action_type, **parameters} dag_step = WorkflowStep( step_id=step_id, step_type=step_type, action=action, depends_on=depends_on, ) dag_steps.append(dag_step) return dag_steps # --------------------------------------------------------------------------- # Exécution des étapes Data Loop (import_excel, db_foreach) # --------------------------------------------------------------------------- _CURRENT_ROW_PATTERN = re.compile(r"\$\{current_row\.(\w+)\}") def _execute_import_excel(parameters: Dict[str, Any]) -> Dict[str, Any]: """Exécute l'import d'un fichier Excel dans la base SQLite. Args: parameters: Doit contenir 'file_path', optionnellement 'table_name', 'sheet_name' Returns: Dict avec les infos d'import (table_name, row_count, columns) """ if not _DATA_LOOP_AVAILABLE: raise RuntimeError( "Module core.data non disponible. " "Vérifiez que core/data/excel_importer.py existe." ) file_path = parameters.get("file_path", "") if not file_path: raise ValueError("Paramètre 'file_path' requis pour import_excel") table_name = parameters.get("table_name") or None sheet_name = parameters.get("sheet_name") or None # Convertir sheet_name en int si c'est un index numérique if sheet_name and sheet_name.isdigit(): sheet_name = int(sheet_name) importer = ExcelImporter() result = importer.import_file( file_path=file_path, table_name=table_name, sheet_name=sheet_name, ) logger.info( "Import Excel terminé : %s → table '%s' (%d lignes, colonnes: %s)", file_path, result.table_name, result.row_count, result.columns, ) return { "table_name": result.table_name, "row_count": result.row_count, "columns": result.columns, "db_path": str(result.db_path) if hasattr(result, "db_path") else None, } def _inject_current_row( action: Dict[str, Any], row: Dict[str, Any] ) -> Dict[str, Any]: """Remplace les références ${current_row.column} dans les paramètres d'une action. Args: action: Dict des paramètres de l'étape row: Dict représentant la ligne courante (colonne → valeur) Returns: Copie de l'action avec les références remplacées """ import copy resolved = copy.deepcopy(action) def _resolve(obj: Any) -> Any: if isinstance(obj, str): # Cas spécial : la chaîne entière est une seule référence m = _CURRENT_ROW_PATTERN.fullmatch(obj) if m: col = m.group(1) return row.get(col, obj) # Cas général : remplacement inline def _replacer(match: re.Match) -> str: col = match.group(1) val = row.get(col) return str(val) if val is not None else match.group(0) return _CURRENT_ROW_PATTERN.sub(_replacer, obj) elif isinstance(obj, dict): return {k: _resolve(v) for k, v in obj.items()} elif isinstance(obj, list): return [_resolve(item) for item in obj] return obj return _resolve(resolved) def _execute_db_foreach( foreach_step_data: Dict[str, Any], all_steps_data: List[Dict[str, Any]], edges_data: List[Dict[str, Any]], executor_kwargs: Dict[str, Any], ) -> Dict[str, Any]: """Exécute une boucle db_foreach : lit les lignes puis ré-exécute les étapes dépendantes. Algorithme : 1. Lire toutes les lignes de la table (avec filtres optionnels) 2. Identifier les étapes qui dépendent du foreach (via les edges) 3. Pour chaque ligne, injecter ${current_row.column} et exécuter ces étapes Args: foreach_step_data: Les données de l'étape db_foreach all_steps_data: Toutes les étapes du workflow edges_data: Les edges du workflow executor_kwargs: Paramètres pour le DAGExecutor (model, ollama_endpoint, timeout) Returns: Dict avec le résumé de l'exécution de la boucle """ if not _DATA_LOOP_AVAILABLE: raise RuntimeError( "Module core.data non disponible. " "Vérifiez que core/data/db_iterator.py existe." ) parameters = foreach_step_data.get("parameters", {}) table_name = parameters.get("table_name", "") if not table_name: raise ValueError("Paramètre 'table_name' requis pour db_foreach") where_clause = parameters.get("where_clause") or None order_by = parameters.get("order_by") or None limit = parameters.get("limit") if limit is not None: try: limit = int(limit) except (ValueError, TypeError): limit = None # 1. Lire les lignes iterator = DBIterator() rows = list(iterator.iterate( table_name=table_name, where=where_clause, order_by=order_by, limit=limit, )) if not rows: logger.info("db_foreach : table '%s' vide, aucune itération", table_name) return {"row_count": 0, "iterations": [], "table_name": table_name} logger.info( "db_foreach : %d lignes à traiter dans '%s'", len(rows), table_name ) # 2. Identifier les étapes dépendantes du foreach foreach_id = foreach_step_data["id"] dependent_ids = set() for edge in edges_data: if edge.get("source") == foreach_id: dependent_ids.add(edge["target"]) # Aussi inclure les étapes qui dépendent des dépendants directs (chaîne) changed = True while changed: changed = False for edge in edges_data: if edge.get("source") in dependent_ids and edge["target"] not in dependent_ids: dependent_ids.add(edge["target"]) changed = True if not dependent_ids: logger.warning("db_foreach '%s' : aucune étape dépendante trouvée", foreach_id) return {"row_count": len(rows), "iterations": [], "table_name": table_name} # Filtrer les étapes et edges pour ne garder que le sous-graphe sub_steps = [s for s in all_steps_data if s["id"] in dependent_ids] sub_edges = [ e for e in edges_data if e.get("source") in dependent_ids and e.get("target") in dependent_ids ] # 3. Pour chaque ligne, injecter et exécuter iteration_results = [] model = executor_kwargs.get("model", os.environ.get("RPA_VLM_MODEL", os.environ.get("VLM_MODEL", "gemma4:e4b"))) ollama_endpoint = executor_kwargs.get("ollama_endpoint", "http://localhost:11434") timeout = executor_kwargs.get("timeout", 300) for row_idx, row in enumerate(rows): logger.info( "db_foreach iteration %d/%d : %s", row_idx + 1, len(rows), {k: str(v)[:50] for k, v in row.items()} ) # Injecter ${current_row.xxx} dans les paramètres de chaque étape injected_steps = [] for step_data in sub_steps: injected = dict(step_data) injected["parameters"] = _inject_current_row( step_data.get("parameters", {}), row ) injected_steps.append(injected) # Reconstruire les edges internes (sans la dépendance vers foreach) internal_edges = [] for e in sub_edges: internal_edges.append(e) # Pour les étapes qui dépendaient du foreach, supprimer cette dépendance # (elles deviennent des racines du sous-DAG) root_ids = set() for edge in edges_data: if edge.get("source") == foreach_id and edge["target"] in dependent_ids: root_ids.add(edge["target"]) # Convertir en DAG steps dag_steps = _convert_vwb_to_dag_steps(injected_steps, internal_edges) # Supprimer la dépendance vers foreach_id dans les étapes racines for ds in dag_steps: ds.depends_on = [d for d in ds.depends_on if d != foreach_id] # Vérifier s'il y a des étapes LLM has_llm = any(s.step_type == StepType.LLM_CALL for s in dag_steps) llm_handler = None if has_llm: llm_handler = LLMActionHandler( ollama_endpoint=ollama_endpoint, model=model, ) executor = DAGExecutor( max_llm_workers=2, max_ui_workers=1, llm_handler=llm_handler, ) executor.load_workflow(dag_steps) result = executor.execute(timeout=timeout) iteration_results.append({ "row_index": row_idx, "row_data": row, "success": result.success, "results": result.results, "errors": result.errors, "duration": result.duration_seconds, }) if not result.success: logger.warning( "db_foreach iteration %d échouée : %s", row_idx, result.errors, ) # Résumé success_count = sum(1 for r in iteration_results if r["success"]) return { "table_name": table_name, "row_count": len(rows), "success_count": success_count, "error_count": len(rows) - success_count, "iterations": iteration_results, } # --------------------------------------------------------------------------- # Instance globale du dernier exécuteur (pour le status polling) # --------------------------------------------------------------------------- _current_executor: Optional[DAGExecutor] = None _last_result: Optional[DAGExecutionResult] = None # --------------------------------------------------------------------------- # Endpoints # --------------------------------------------------------------------------- @api_v3_bp.route('/workflow//execute-dag', methods=['POST']) def execute_dag(workflow_id: str): """ Lance l'exécution DAG d'un workflow VWB. Les étapes LLM (llm_analyze, llm_translate, llm_extract_data, llm_generate) sont exécutées en parallèle via Ollama. Les étapes UI restent séquentielles. Body (optionnel) : { "edges": [{"source": "step_A", "target": "step_B"}, ...], "timeout": 300, "model": "qwen3-vl:8b", "ollama_endpoint": "http://localhost:11434" } Response : { "success": true/false, "execution": { "success": ..., "steps": {...}, "results": {...}, ... } } """ global _current_executor, _last_result try: from db.models import Workflow, Step workflow = Workflow.query.get(workflow_id) if not workflow: return jsonify({ 'success': False, 'error': f"Workflow '{workflow_id}' non trouvé" }), 404 # Récupérer les étapes depuis la BDD steps_db = Step.query.filter_by( workflow_id=workflow_id ).order_by(Step.order).all() if not steps_db: return jsonify({ 'success': False, 'error': "Le workflow n'a aucune étape" }), 400 steps_data = [s.to_dict() for s in steps_db] # Récupérer les edges depuis le body (le frontend les envoie) data = request.get_json() or {} edges_data = data.get("edges", []) # Si pas d'edges fournis, créer une chaîne linéaire par défaut if not edges_data: for i in range(len(steps_data) - 1): edges_data.append({ "source": steps_data[i]["id"], "target": steps_data[i + 1]["id"], }) # Paramètres optionnels timeout = data.get("timeout", 300) model = data.get("model", os.environ.get("RPA_VLM_MODEL", os.environ.get("VLM_MODEL", "gemma4:e4b"))) ollama_endpoint = data.get("ollama_endpoint", "http://localhost:11434") executor_kwargs = { "timeout": timeout, "model": model, "ollama_endpoint": ollama_endpoint, } # --------------------------------------------------------------- # Pré-traitement des étapes Data Loop (import_excel, db_foreach) # --------------------------------------------------------------- data_loop_results: Dict[str, Any] = {} # 1. Exécuter les imports Excel en premier (avant le DAG principal) import_steps = [s for s in steps_data if s["action_type"] == "import_excel"] for imp_step in import_steps: try: imp_result = _execute_import_excel(imp_step.get("parameters", {})) data_loop_results[imp_step["id"]] = imp_result logger.info( "Import Excel '%s' terminé : %s", imp_step["id"], imp_result, ) except Exception as exc: logger.error("Import Excel '%s' échoué : %s", imp_step["id"], exc) return jsonify({ 'success': False, 'error': f"Import Excel échoué : {exc}", }), 500 # 2. Traiter les étapes db_foreach séparément foreach_steps = [s for s in steps_data if s["action_type"] == "db_foreach"] for fe_step in foreach_steps: try: fe_result = _execute_db_foreach( fe_step, steps_data, edges_data, executor_kwargs, ) data_loop_results[fe_step["id"]] = fe_result logger.info( "db_foreach '%s' terminé : %d lignes, %d succès", fe_step["id"], fe_result.get("row_count", 0), fe_result.get("success_count", 0), ) except Exception as exc: logger.error("db_foreach '%s' échoué : %s", fe_step["id"], exc) return jsonify({ 'success': False, 'error': f"Boucle données échouée : {exc}", }), 500 # 3. Filtrer les étapes data loop + leurs dépendants (déjà exécutés) # pour le DAG principal foreach_ids = {s["id"] for s in foreach_steps} already_executed_ids = set() for fe_id in foreach_ids: already_executed_ids.add(fe_id) # Trouver récursivement tous les dépendants changed = True while changed: changed = False for edge in edges_data: if (edge.get("source") in already_executed_ids and edge["target"] not in already_executed_ids): already_executed_ids.add(edge["target"]) changed = True # Ajouter les import_excel aux déjà exécutés for imp_step in import_steps: already_executed_ids.add(imp_step["id"]) # Filtrer les étapes restantes pour le DAG principal remaining_steps = [s for s in steps_data if s["id"] not in already_executed_ids] remaining_edges = [ e for e in edges_data if e.get("source") not in already_executed_ids and e.get("target") not in already_executed_ids ] # Si toutes les étapes sont des data loop, retourner directement if not remaining_steps: return jsonify({ 'success': True, 'execution': { "success": True, "data_loop_results": data_loop_results, "steps": {}, "results": {}, "errors": [], "duration_seconds": 0, }, }) # --------------------------------------------------------------- # Exécution DAG normale pour les étapes restantes # --------------------------------------------------------------- # Convertir en étapes DAG dag_steps = _convert_vwb_to_dag_steps(remaining_steps, remaining_edges) # Supprimer les dépendances vers des étapes data loop déjà exécutées for ds in dag_steps: ds.depends_on = [d for d in ds.depends_on if d not in already_executed_ids] # Vérifier s'il y a des étapes LLM has_llm_steps = any(s.step_type == StepType.LLM_CALL for s in dag_steps) # Créer le handler LLM si nécessaire llm_handler = None if has_llm_steps: llm_handler = LLMActionHandler( ollama_endpoint=ollama_endpoint, model=model, ) # Créer et configurer l'exécuteur executor = DAGExecutor( max_llm_workers=2, max_ui_workers=1, llm_handler=llm_handler, ) # Injecter les résultats des étapes data loop dans l'exécuteur # pour qu'ils soient disponibles via ${step_id.result} for step_id, dl_result in data_loop_results.items(): executor._results[step_id] = dl_result # Charger le workflow dans le DAG executor.load_workflow(dag_steps) # Garder une référence pour le status _current_executor = executor _last_result = None logger.info( "Lancement exécution DAG pour workflow '%s' : %d étapes (%d LLM, %d data loop pré-traités)", workflow_id, len(dag_steps), sum(1 for s in dag_steps if s.step_type == StepType.LLM_CALL), len(data_loop_results), ) # Exécuter (bloquant — le timeout protège) result = executor.execute(timeout=timeout) _last_result = result # Fusionner les résultats data loop result_dict = result.to_dict() if data_loop_results: result_dict["data_loop_results"] = data_loop_results logger.info( "Exécution DAG terminée : success=%s, durée=%.2fs", result.success, result.duration_seconds, ) return jsonify({ 'success': True, 'execution': result_dict, }) except ValueError as e: return jsonify({ 'success': False, 'error': f"Erreur de validation : {str(e)}" }), 400 except Exception as e: traceback.print_exc() return jsonify({ 'success': False, 'error': f"Erreur d'exécution : {str(e)}" }), 500 @api_v3_bp.route('/workflow//dag-status', methods=['GET']) def get_dag_status(workflow_id: str): """ Retourne le statut de la dernière exécution DAG. Response : { "success": true, "status": { "steps": {...}, "results": {...}, "summary": {...} } } """ global _current_executor, _last_result try: # Si une exécution est terminée, retourner le résultat if _last_result is not None: return jsonify({ 'success': True, 'completed': True, 'status': _last_result.to_dict(), }) # Si un exécuteur est en cours, retourner son état if _current_executor is not None: return jsonify({ 'success': True, 'completed': False, 'status': _current_executor.get_status(), }) return jsonify({ 'success': True, 'completed': False, 'status': None, 'message': "Aucune exécution DAG en cours", }) except Exception as e: return jsonify({ 'success': False, 'error': str(e) }), 500 # --------------------------------------------------------------------------- # Upload Excel — explorateur de fichier côté navigateur # --------------------------------------------------------------------------- UPLOAD_DIR = os.path.join( os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'data', 'uploads' ) @api_v3_bp.route('/upload-excel', methods=['POST']) def upload_excel(): """Reçoit un fichier Excel uploadé depuis le navigateur. Sauvegarde dans data/uploads/ et retourne le chemin serveur + un nom de table suggéré basé sur le nom du fichier. """ if 'file' not in request.files: return jsonify({'error': 'Aucun fichier reçu'}), 400 file = request.files['file'] if not file.filename: return jsonify({'error': 'Nom de fichier vide'}), 400 # Vérifier l'extension ext = os.path.splitext(file.filename)[1].lower() if ext not in ('.xlsx', '.xls'): return jsonify({'error': f'Format non supporté : {ext}'}), 400 os.makedirs(UPLOAD_DIR, exist_ok=True) save_path = os.path.join(UPLOAD_DIR, file.filename) file.save(save_path) # Nom de table suggéré à partir du nom du fichier import re as _re base = os.path.splitext(file.filename)[0] suggested = _re.sub(r'[^a-zA-Z0-9_]', '_', base).strip('_').lower() if suggested and suggested[0].isdigit(): suggested = 't_' + suggested logger.info(f"Excel uploadé : {save_path} → table suggérée : {suggested}") return jsonify({ 'path': save_path, 'filename': file.filename, 'suggested_table': suggested, }) # --------------------------------------------------------------------------- # Exécution sur Windows — proxy vers le streaming server (port 5005) # --------------------------------------------------------------------------- def _load_anchor_image_b64(anchor_id: str) -> Optional[str]: """Charger l'image d'une ancre et la retourner en base64. Cherche dans 3 emplacements possibles : 1. data/anchors/{id}_full.png (nouveau format V3) 2. data/anchor_images/{id}/original.png (ancien format) 3. SQLite visual_anchors.image_path (chemin absolu en BDD) """ import base64 as b64 backend_dir = Path(__file__).resolve().parent.parent # 1. Nouveau format : data/anchors/{id}_thumb.png (crop de l'ancre, pas le screenshot complet) new_path = backend_dir / 'data' / 'anchors' / f'{anchor_id}_thumb.png' if new_path.exists(): try: with open(new_path, 'rb') as f: return b64.b64encode(f.read()).decode('utf-8') except Exception as e: logger.error("Erreur lecture ancre %s : %s", new_path, e) # 2. Ancien format : data/anchor_images/{id}/original.png old_path = backend_dir / 'data' / 'anchor_images' / anchor_id / 'original.png' if old_path.exists(): try: with open(old_path, 'rb') as f: return b64.b64encode(f.read()).decode('utf-8') except Exception as e: logger.error("Erreur lecture ancre %s : %s", old_path, e) # 3. Chemin depuis la BDD try: import sqlite3 db_path = backend_dir / 'instance' / 'workflows.db' conn = sqlite3.connect(str(db_path)) row = conn.execute("SELECT image_path FROM visual_anchors WHERE id=?", (anchor_id,)).fetchone() conn.close() if row and row[0] and Path(row[0]).exists(): with open(row[0], 'rb') as f: return b64.b64encode(f.read()).decode('utf-8') except Exception as e: logger.error("Erreur lecture ancre BDD %s : %s", anchor_id, e) logger.warning("Image ancre introuvable pour %s", anchor_id) return None def _load_anchor_metadata(anchor_id: str) -> Optional[Dict]: """Charger les métadonnées d'une ancre (bounding_box, taille, etc.).""" backend_dir = Path(__file__).resolve().parent.parent # 1. Ancien format : metadata.json meta_path = backend_dir / 'data' / 'anchor_images' / anchor_id / 'metadata.json' if meta_path.exists(): try: with open(meta_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception: pass # 2. Depuis la BDD visual_anchors try: import sqlite3 db_path = backend_dir / 'instance' / 'workflows.db' conn = sqlite3.connect(str(db_path)) row = conn.execute( "SELECT bbox_x, bbox_y, bbox_width, bbox_height, screen_width, screen_height " "FROM visual_anchors WHERE id=?", (anchor_id,) ).fetchone() conn.close() if row: return { 'bounding_box': {'x': row[0], 'y': row[1], 'width': row[2], 'height': row[3]}, 'original_size': {'width': row[4] or 1920, 'height': row[5] or 1080}, } except Exception: pass return None def _inject_anchor_targeting(action: Dict, anchor_id: str) -> None: """Enrichit une action avec la cible visuelle (x_pct/y_pct + visual_mode/target_spec). Mutation in-place de `action`. Utilisé pour click_anchor*, type_text et type_secret — toute action qui doit cibler une zone visuelle précise avant d'agir (clic ou frappe avec focus). Sans cette injection, l'agent côté Windows ne peut pas faire le pre-click de focus avant `_type_text`, et le texte tape dans le vide. """ if not anchor_id: return anchor_meta = _load_anchor_metadata(anchor_id) # Coordonnées du centre du bbox (fallback si template matching échoue) if anchor_meta: bbox = anchor_meta.get('bounding_box', {}) orig = anchor_meta.get('original_size', {}) orig_w = orig.get('width', 1920) orig_h = orig.get('height', 1080) if bbox.get('x') is not None and orig_w > 0 and orig_h > 0: cx = (bbox['x'] + bbox.get('width', 0) / 2) / orig_w cy = (bbox['y'] + bbox.get('height', 0) / 2) / orig_h action['x_pct'] = round(cx, 4) action['y_pct'] = round(cy, 4) # Image de l'ancre pour template matching côté agent anchor_b64 = _load_anchor_image_b64(anchor_id) if anchor_b64: target_spec = { 'anchor_image_base64': anchor_b64, 'anchor_id': anchor_id, } if anchor_meta: target_spec['anchor_bbox'] = anchor_meta.get('bounding_box', {}) target_spec['original_size'] = anchor_meta.get('original_size', {}) action['visual_mode'] = True action['target_spec'] = target_spec logger.info( "Action %s : ancre '%s' chargée (%d Ko), visual_mode activé", action.get('action_id', '?'), anchor_id, len(anchor_b64) // 1024, ) else: logger.warning( "Action %s : ancre '%s' introuvable, fallback blind mode", action.get('action_id', '?'), anchor_id, ) @api_v3_bp.route('/execute-windows', methods=['POST']) def execute_windows(): """Proxy les actions du workflow vers le streaming server pour exécution sur Windows. Le navigateur ne peut pas contacter le port 5005 directement (CORS/réseau), donc le backend VWB sert de proxy. Pour les actions click_anchor, charge l'image de l'ancre visuelle depuis le disque et l'inclut en base64 dans target_spec afin que l'exécuteur Windows puisse résoudre la position par template matching (visual_mode). """ import requests as req data = request.get_json() if not data: return jsonify({'error': 'Aucune donnée'}), 400 # Vérifier si ce sont uniquement des actions fichiers (pas besoin de wait ni replay) all_file_actions = all( a.get('type', '') in _FILE_ACTION_TYPES for a in data.get('actions', []) ) if data.get('actions') else False # Injecter un délai de 5s SEULEMENT pour les actions UI (pas les fichiers) if not all_file_actions and 'actions' in data and data['actions']: data['actions'].insert(0, { 'type': 'wait', 'action_id': 'wait_before_start', 'parameters': {'duration_ms': 5000}, 'text': '', }) # Mapper les types VWB → types executor Windows TYPE_MAP = { 'click_anchor': 'click', 'double_click_anchor': 'click', 'right_click_anchor': 'click', 'type_text': 'type', 'type_secret': 'type', 'keyboard_shortcut': 'key_combo', 'hotkey': 'key_combo', 'scroll_to_anchor': 'scroll', 'wait_for_anchor': 'wait', 'visual_condition': 'wait', } # Types d'actions basées sur une ancre visuelle (nécessitent visual_mode) _ANCHOR_CLICK_TYPES = {'click_anchor', 'double_click_anchor', 'right_click_anchor'} if 'actions' in data: for action in data['actions']: vwb_type = action.get('type', '') params = action.get('parameters', {}) # Mapper le type VWB → type executor mapped_type = TYPE_MAP.get(vwb_type, vwb_type) action['type'] = mapped_type # --------------------------------------------------------------- # Actions basées sur ancre visuelle → injecter visual_mode # --------------------------------------------------------------- if vwb_type in _ANCHOR_CLICK_TYPES: anchor_id = action.get('anchor_id') if anchor_id: _inject_anchor_targeting(action, anchor_id) # Propagation du by_text (ciblage textuel prioritaire sur template) _by_text = params.get('by_text', '') if _by_text: action['by_text'] = _by_text if 'target_spec' in action: action['target_spec']['by_text'] = _by_text # Mapper le bouton selon le type de clic VWB if vwb_type == 'double_click_anchor': action['button'] = 'double' elif vwb_type == 'right_click_anchor': action['button'] = 'right' # --------------------------------------------------------------- # type_text / type_secret → extraire le texte + cibler la zone # de saisie si une ancre visuelle est associée au step. # Sans ancre, l'agent tape là où le focus se trouve déjà # (compatibilité avec les workflows historiques sans anchor). # --------------------------------------------------------------- if vwb_type in ('type_text', 'type_secret') and 'text' in params: action['text'] = params['text'] anchor_id = action.get('anchor_id') or ( params.get('visual_anchor') or {} ).get('anchor_id') if anchor_id: _inject_anchor_targeting(action, anchor_id) # --------------------------------------------------------------- # keyboard_shortcut / hotkey → extraire les touches # --------------------------------------------------------------- if vwb_type in ('keyboard_shortcut', 'hotkey') and 'keys' in params: action['keys'] = params['keys'] # --------------------------------------------------------------- # Actions fichiers → proxy vers /file-action de l'agent (port 5006) # --------------------------------------------------------------- if 'actions' in data: file_actions = [a for a in data['actions'] if a.get('type', '') in _FILE_ACTION_TYPES] if file_actions: # Exécuter les actions fichiers via l'agent Windows # Auth : Bearer token obligatoire (capture_server.py exige RPA_API_TOKEN) _agent_host = os.environ.get('RPA_WINDOWS_AGENT_HOST', '192.168.1.11') _agent_port = int(os.environ.get('RPA_WINDOWS_AGENT_PORT', '5006')) _agent_url = f'http://{_agent_host}:{_agent_port}/file-action' _api_token = os.environ.get('RPA_API_TOKEN', '') _file_headers = {'Authorization': f'Bearer {_api_token}'} if _api_token else {} file_results = [] for fa in file_actions: try: fa_resp = req.post( _agent_url, json={ 'action': fa['type'], 'params': fa.get('parameters', {}), }, headers=_file_headers, timeout=30, ) if fa_resp.status_code == 401: file_results.append({ 'error': "Agent Windows : auth refusee (verifier RPA_API_TOKEN)", }) else: file_results.append(fa_resp.json()) except req.ConnectionError: file_results.append({ 'error': f"Agent Windows ({_agent_host}:{_agent_port}) non disponible pour l'action fichier" }) except Exception as e: file_results.append({'error': str(e)}) # Si TOUTES les actions sont des actions fichiers, retourner directement non_file_actions = [a for a in data['actions'] if a.get('type', '') not in _FILE_ACTION_TYPES] if not non_file_actions: return jsonify({ 'success': all('error' not in r for r in file_results), 'file_results': file_results, }) # Sinon, retirer les actions fichiers du flux principal data['actions'] = non_file_actions # Token Bearer pour le streaming server (auth obligatoire) _stream_token = os.environ.get('RPA_API_TOKEN', '') _stream_headers = {'Authorization': f'Bearer {_stream_token}'} if _stream_token else {} # L'agent Windows poll sous session "agent_demo_user" (= agent_{user_id}, user_id="demo_user") # On injecte directement dans cette session pour éviter le transfer cross-session # et pour que /replay/raw ne tente pas l'auto-détection d'une session "sess_*" # (qui échoue avec "Aucune session Agent V1 active" si l'agent n'a pas créé de session V1). if not data.get('session_id'): data['session_id'] = 'agent_demo_user' # Forcer le mode supervisé : pause_for_human DÉCLENCHE au lieu d'être # skippée. Le médecin valide la décision Léa avant que les saisies # type_text ne s'exécutent dans l'onglet Codage. Crucial pour la démo # GHT : Léa propose, humain valide, Léa finalise (cf. workflow Urgence). # Sans ça, mode "autonomous" par défaut → pause skippée → saisies # tentées sans validation → désordre visuel. data.setdefault('params', {}) data['params'].setdefault('execution_mode', 'supervised') # Injecter le machine_id pour le ciblage multi-machine. # Cibler la machine Windows la plus récemment active (heartbeat last_activity) # plutôt que la première dans l'ordre arbitraire renvoyé par /machines : # un workflow enregistré sur PC A doit pouvoir être rejoué sur PC B (vision # 100 % visuelle, recalcul anchors+coords selon la résolution courante). # Le workflow.machine_id signale l'origine d'enregistrement, pas la cible # d'exécution — la cible doit être l'agent qui POLLE actuellement. if 'machine_id' not in data or not data.get('machine_id'): try: machines_resp = req.get( 'http://localhost:5005/api/v1/traces/stream/machines', headers=_stream_headers, timeout=3, ) if machines_resp.ok: machines = machines_resp.json().get('machines', []) # Filtrer Windows + non default, trier par last_activity desc windows_machines = [ m for m in machines if m.get('machine_id') and m['machine_id'] != 'default' and 'windows' in m['machine_id'].lower() ] windows_machines.sort( key=lambda m: m.get('last_activity', ''), reverse=True, ) if windows_machines: data['machine_id'] = windows_machines[0]['machine_id'] except Exception: pass try: resp = req.post( 'http://localhost:5005/api/v1/traces/stream/replay/raw', json=data, headers=_stream_headers, timeout=30, ) return jsonify(resp.json()), resp.status_code except req.ConnectionError: return jsonify({'error': 'Streaming server (port 5005) non disponible'}), 503 except Exception as e: return jsonify({'error': str(e)}), 500 # --------------------------------------------------------------------------- # QW4 — Proxy /api/v3/replay/resume → streaming /replay/{id}/resume # Forward Bearer token + body { replay_id, acknowledged_check_ids }. # Le frontend (PauseDialog) appelle /api/v3/replay/resume via le VWB ; # on relaye au streaming server pour valider les acquittements safety_checks. # --------------------------------------------------------------------------- @api_v3_bp.route('/replay/resume', methods=['POST']) def replay_resume_proxy(): """Proxy QW4 vers le serveur streaming pour la reprise avec safety_checks.""" import requests as req data = request.get_json() or {} replay_id = data.get('replay_id') if not replay_id: return jsonify({'error': 'replay_id manquant'}), 400 streaming_url = os.environ.get('RPA_STREAMING_URL', 'http://localhost:5005') token = os.environ.get('RPA_API_TOKEN', '') headers = {'Content-Type': 'application/json'} if token: headers['Authorization'] = f'Bearer {token}' # Body forwardé : uniquement acknowledged_check_ids (replay_id est dans l'URL) forward_body = { 'acknowledged_check_ids': data.get('acknowledged_check_ids') or [], } try: resp = req.post( f'{streaming_url}/api/v1/traces/stream/replay/{replay_id}/resume', json=forward_body, headers=headers, timeout=10, ) return resp.content, resp.status_code, {'Content-Type': 'application/json'} except req.ConnectionError: return jsonify({'error': 'streaming_unreachable', 'detail': f'Streaming server non disponible ({streaming_url})'}), 502 except req.RequestException as e: return jsonify({'error': 'streaming_unreachable', 'detail': str(e)}), 502 # --------------------------------------------------------------------------- # QW4 — Proxy GET /api/v3/replay/state/ → streaming /replay/{id} # Forward Bearer token vers le serveur streaming. # Permet à App.tsx de récupérer le state du replay actif (Agent V1 Windows) # pour afficher PauseDialog quand status = paused_need_help avec safety_checks. # --------------------------------------------------------------------------- @api_v3_bp.route('/replay/state/', methods=['GET']) def replay_state_proxy(replay_id): """Proxy QW4 vers le serveur streaming pour récupérer le state replay actif.""" import requests as req streaming_url = os.environ.get('RPA_STREAMING_URL', 'http://localhost:5005') token = os.environ.get('RPA_API_TOKEN', '') headers = {} if token: headers['Authorization'] = f'Bearer {token}' try: resp = req.get( f'{streaming_url}/api/v1/traces/stream/replay/{replay_id}', headers=headers, timeout=5, ) return resp.content, resp.status_code, {'Content-Type': 'application/json'} except req.ConnectionError: return jsonify({'error': 'streaming_unreachable', 'detail': f'Streaming server non disponible ({streaming_url})'}), 502 except req.RequestException as e: return jsonify({'error': 'streaming_unreachable', 'detail': str(e)}), 502