diff --git a/visual_workflow_builder/backend/actions/data/extraire_tableau.py b/visual_workflow_builder/backend/actions/data/extraire_tableau.py index d184e76fe..12ae1a6a9 100644 --- a/visual_workflow_builder/backend/actions/data/extraire_tableau.py +++ b/visual_workflow_builder/backend/actions/data/extraire_tableau.py @@ -14,6 +14,7 @@ Cas d'usage : from typing import Dict, Any, List, Optional, Tuple from datetime import datetime +import os import time import base64 import io @@ -26,9 +27,9 @@ from ...contracts.error import VWBErrorType, create_vwb_error from ...contracts.visual_anchor import VWBVisualAnchor -# Configuration par défaut -OLLAMA_DEFAULT_URL = "http://localhost:11434" -OLLAMA_DEFAULT_MODEL = "qwen2.5-vl:7b" +# Configuration par défaut (centralisée via variable d'environnement) +OLLAMA_DEFAULT_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434") +OLLAMA_DEFAULT_MODEL = os.environ.get("VLM_MODEL", "qwen2.5-vl:7b") class VWBExtraireTableauAction(BaseVWBAction): diff --git a/visual_workflow_builder/backend/actions/validation/verify_text_content.py b/visual_workflow_builder/backend/actions/validation/verify_text_content.py index dd7c45143..455636cea 100644 --- a/visual_workflow_builder/backend/actions/validation/verify_text_content.py +++ b/visual_workflow_builder/backend/actions/validation/verify_text_content.py @@ -12,6 +12,7 @@ Modes OCR disponibles: from typing import Dict, Any, List, Optional from datetime import datetime +import os import time import re import base64 @@ -36,9 +37,9 @@ class VWBVerifyTextContentAction(BaseVWBAction): - easyocr: OCR traditionnel (plus rapide, fallback) """ - # Configuration Ollama par défaut - OLLAMA_URL = "http://localhost:11434" - OLLAMA_MODEL = "qwen2.5-vl:7b" # Modèle de vision Qwen - excellent pour OCR + # Configuration Ollama par défaut (centralisée via variable d'environnement) + OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434") + OLLAMA_MODEL = os.environ.get("VLM_MODEL", "qwen2.5-vl:7b") # Modèle de vision Qwen - excellent pour OCR def __init__( self, diff --git a/visual_workflow_builder/backend/api_v3/execute.py b/visual_workflow_builder/backend/api_v3/execute.py index 9db8cfff6..c8d413d76 100644 --- a/visual_workflow_builder/backend/api_v3/execute.py +++ b/visual_workflow_builder/backend/api_v3/execute.py @@ -16,9 +16,68 @@ import threading import time import base64 import os +import logging import subprocess from . import api_v3_bp +logger = logging.getLogger(__name__) + + +def safe_type_text(text): + """Saisie de texte compatible avec tous les claviers (AZERTY/QWERTY). + + pyautogui.write() envoie des codes de touches QWERTY qui produisent + des caractères erronés sur AZERTY (* → µ, ( → 5, ) → °, etc.). + + Priorité : + 1. Presse-papier (xclip) + Ctrl+V → instantané, fiable pour tout texte + 2. xdotool type → respecte le layout clavier + 3. pyautogui.write() → dernier recours + """ + import shutil + import pyautogui + + # Méthode 1 : Presse-papier (le plus fiable, gère UTF-8/accents/CJK) + # xclip reste en vie comme daemon X11 pour servir le clipboard. + # On ne doit PAS attendre sa terminaison — juste envoyer les données + # et coller immédiatement. + xclip = shutil.which('xclip') + if xclip: + try: + p = subprocess.Popen( + ['xclip', '-selection', 'clipboard'], + stdin=subprocess.PIPE, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ) + p.stdin.write(text.encode('utf-8')) + p.stdin.close() + # Ne PAS attendre xclip — il reste vivant comme owner du clipboard + time.sleep(0.2) + pyautogui.hotkey('ctrl', 'v') + time.sleep(0.3) + print(f" ✅ Saisie via presse-papier ({len(text)} car.)") + return + except Exception as e: + print(f" ⚠️ xclip échoué: {e}") + + # Méthode 2 : xdotool type (respecte le layout clavier actif) + if shutil.which('xdotool'): + try: + subprocess.run( + ['xdotool', 'type', '--delay', '0', '--clearmodifiers', '--', text], + timeout=max(30, len(text) * 0.05), + check=True + ) + print(f" ✅ Saisie via xdotool type ({len(text)} car.)") + return + except Exception as e: + print(f" ⚠️ xdotool type échoué: {e}") + + # Méthode 3 : pyautogui (dernier recours — problèmes AZERTY possibles) + print(" ⚠️ Saisie via pyautogui.write() (AZERTY non garanti)") + pyautogui.write(text) + def minimize_active_window(): """Minimise la fenêtre active (Linux avec xdotool)""" @@ -37,7 +96,7 @@ def minimize_active_window(): print(f"⚠️ [Execute] Erreur minimisation: {e}") return False from db.models import db, Workflow, Step, Execution, ExecutionStep, VisualAnchor, get_session_state -from contracts.action_contracts import enforce_action_contract, ContractValidationError, get_required_params +from contracts.action_contracts import enforce_action_contract, ContractValidationError def generate_id(prefix: str) -> str: @@ -45,6 +104,14 @@ def generate_id(prefix: str) -> str: return f"{prefix}_{uuid.uuid4().hex[:12]}_{int(datetime.now().timestamp())}" +# Thread-safety : verrou pour protéger _execution_state (accès R/W depuis le +# thread d'exécution ET les handlers HTTP Flask en parallèle). +_execution_lock = threading.RLock() + +# Event pour remplacer le polling actif du self-healing : +# le thread d'exécution attend (.wait), le handler HTTP signale (.set). +_healing_event = threading.Event() + # État de l'exécution en cours (en mémoire) _execution_state = { 'is_running': False, @@ -53,6 +120,7 @@ _execution_state = { 'current_execution_id': None, 'thread': None, 'execution_mode': 'basic', # 'basic', 'intelligent', 'debug' + 'variables': {}, # Variables runtime du workflow (initialisé ici, plus de création dynamique) # Self-healing interactif 'waiting_for_choice': False, 'pending_action': None, # Action en attente de choix utilisateur @@ -75,7 +143,7 @@ def execute_workflow_thread(execution_id: str, workflow_id: str, app): workflow = Workflow.query.get(workflow_id) if not execution or not workflow: - print(f"❌ [Execute] Workflow ou exécution non trouvé") + logger.error("Workflow ou exécution non trouvé") return steps = workflow.steps.order_by(Step.order).all() @@ -84,7 +152,7 @@ def execute_workflow_thread(execution_id: str, workflow_id: str, app): execution.started_at = datetime.utcnow() db.session.commit() - print(f"🚀 [Execute] Démarrage workflow {workflow_id}: {len(steps)} étapes") + logger.info(f"Démarrage workflow {workflow_id}: {len(steps)} étapes") for index, step in enumerate(steps): # Vérifier si arrêt demandé @@ -155,7 +223,7 @@ def execute_workflow_thread(execution_id: str, workflow_id: str, app): try: enforce_action_contract(step.action_type, params) except ContractValidationError as e: - print(f"🚫 [Execute] CONTRAT VIOLÉ pour étape {step.id}: {e}") + logger.warning(f"CONTRAT VIOLÉ pour étape {step.id}: {e}") step_result.status = 'error' step_result.error_message = f"Contrat violé: {str(e)}" step_result.ended_at = datetime.utcnow() @@ -176,34 +244,31 @@ def execute_workflow_thread(execution_id: str, workflow_id: str, app): print(f"🔄 [Self-Healing] Attente choix utilisateur pour étape {index + 1}") # Stocker les informations pour le frontend - _execution_state['waiting_for_choice'] = True - _execution_state['pending_action'] = { - 'step_id': step.id, - 'step_index': index, - 'action_type': step.action_type, - 'params': params - } - _execution_state['candidates'] = result.get('candidates', []) - _execution_state['current_step_info'] = { - 'index': index, - 'total': len(steps), - 'original_bbox': result.get('original_bbox'), - 'error': result.get('error') - } - _execution_state['user_choice'] = None + with _execution_lock: + _healing_event.clear() + _execution_state['waiting_for_choice'] = True + _execution_state['pending_action'] = { + 'step_id': step.id, + 'step_index': index, + 'action_type': step.action_type, + 'params': params + } + _execution_state['candidates'] = result.get('candidates', []) + _execution_state['current_step_info'] = { + 'index': index, + 'total': len(steps), + 'original_bbox': result.get('original_bbox'), + 'error': result.get('error') + } + _execution_state['user_choice'] = None # Mettre à jour le status de l'exécution execution.status = 'waiting_user_choice' db.session.commit() - # Attendre le choix de l'utilisateur + # Attendre le choix de l'utilisateur (Event au lieu de polling) timeout_seconds = 120 # 2 minutes max - waited = 0 - while _execution_state['waiting_for_choice'] and waited < timeout_seconds: - if _execution_state['should_stop']: - break - time.sleep(0.5) - waited += 0.5 + _healing_event.wait(timeout=timeout_seconds) # Vérifier si on doit arrêter if _execution_state['should_stop']: @@ -211,10 +276,11 @@ def execute_workflow_thread(execution_id: str, workflow_id: str, app): break # Traiter le choix de l'utilisateur - user_choice = _execution_state['user_choice'] - _execution_state['waiting_for_choice'] = False - _execution_state['pending_action'] = None - _execution_state['candidates'] = [] + with _execution_lock: + user_choice = _execution_state['user_choice'] + _execution_state['waiting_for_choice'] = False + _execution_state['pending_action'] = None + _execution_state['candidates'] = [] if user_choice is None: # Timeout - aucun choix @@ -261,7 +327,7 @@ def execute_workflow_thread(execution_id: str, workflow_id: str, app): step_result.status = 'error' step_result.error_message = result.get('error', 'Erreur inconnue') execution.failed_steps += 1 - print(f"❌ [Execute] Étape {index + 1} échouée: {step_result.error_message}") + logger.warning(f"Étape {index + 1} échouée: {step_result.error_message}") # Arrêter sur erreur execution.status = 'error' @@ -272,7 +338,7 @@ def execute_workflow_thread(execution_id: str, workflow_id: str, app): db.session.commit() except Exception as e: - print(f"❌ [Execute] Exception étape {index + 1}: {e}") + logger.error(f"Exception étape {index + 1}: {e}", exc_info=True) step_result.status = 'error' step_result.error_message = str(e) step_result.ended_at = datetime.utcnow() @@ -289,11 +355,10 @@ def execute_workflow_thread(execution_id: str, workflow_id: str, app): execution.ended_at = datetime.utcnow() db.session.commit() - print(f"🏁 [Execute] Workflow terminé: {execution.status}") - print(f" Complétées: {execution.completed_steps}, Échouées: {execution.failed_steps}") + logger.info(f"Workflow terminé: {execution.status} (complétées: {execution.completed_steps}, échouées: {execution.failed_steps})") except Exception as e: - print(f"❌ [Execute] Erreur fatale: {e}") + logger.error(f"Erreur fatale: {e}", exc_info=True) try: execution = Execution.query.get(execution_id) if execution: @@ -301,74 +366,130 @@ def execute_workflow_thread(execution_id: str, workflow_id: str, app): execution.error_message = f"Erreur fatale: {str(e)}" execution.ended_at = datetime.utcnow() db.session.commit() - except: - pass + except Exception as db_err: + print(f"⚠️ [Execute] DB cleanup error: {db_err}") finally: - _execution_state['is_running'] = False - _execution_state['current_execution_id'] = None + with _execution_lock: + _execution_state['is_running'] = False + _execution_state['current_execution_id'] = None def execute_ai_analyze(params: dict) -> dict: """ Exécute une analyse IA avec Ollama. - Capture la zone de l'ancre et envoie à l'IA pour analyse. + Deux modes : + - Mode texte : envoie le texte brut directement (meilleure qualité) + - Mode image : capture la zone de l'ancre et envoie le screenshot """ import requests + import re + global _execution_state try: - # Récupérer les paramètres - anchor = params.get('visual_anchor', {}) prompt = params.get('analysis_prompt', params.get('prompt', '')) - model = params.get('model', params.get('ollama_model', 'qwen2.5-vl:7b')) + model = params.get('model', params.get('ollama_model', 'qwen3-vl:8b')) output_variable = params.get('output_variable', 'resultat_analyse') - timeout_ms = params.get('timeout_ms', 60000) - temperature = params.get('temperature', 0.3) + timeout_ms = params.get('timeout_ms', 120000) # 2 minutes par défaut + temperature = params.get('temperature', 0.7) # Même défaut que CLI Ollama + max_tokens = params.get('max_tokens', -1) # -1 = illimité (défaut Ollama) + input_text = params.get('input_text', '') - # Récupérer l'image de l'ancre - screenshot_base64 = anchor.get('screenshot') + # Résoudre les variables {{var}} dans input_text + variables = _execution_state.get('variables', {}) + if input_text and '{{' in input_text: + def replace_var(match): + var_name = match.group(1) + value = variables.get(var_name, match.group(0)) + print(f" 📌 Variable {{{{{var_name}}}}} → {str(value)[:50]}...") + return str(value) + input_text = re.sub(r'\{\{(\w+)\}\}', replace_var, input_text) - if not screenshot_base64: - # Capturer l'écran si pas d'image dans l'ancre - try: - from PIL import ImageGrab - import io + # Déterminer le mode : texte ou image + use_text_mode = bool(input_text) + anchor = params.get('visual_anchor', {}) - bbox = anchor.get('bounding_box', {}) - if bbox: - # Capturer la zone spécifique - x, y = int(bbox.get('x', 0)), int(bbox.get('y', 0)) - w, h = int(bbox.get('width', 100)), int(bbox.get('height', 100)) - screenshot = ImageGrab.grab(bbox=(x, y, x + w, y + h)) - else: - # Capturer tout l'écran - screenshot = ImageGrab.grab() + print(f"🤖 [IA] Mode: {'TEXTE' if use_text_mode else 'IMAGE'}") - buffer = io.BytesIO() - screenshot.save(buffer, format='PNG') - screenshot_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') - except Exception as cap_err: - return {'success': False, 'error': f"Erreur capture: {cap_err}"} + if use_text_mode: + # ═══ MODE TEXTE : envoyer le texte directement (comme en CLI) ═══ + print(f"📝 [IA] Texte brut: {len(input_text)} caractères") - if not prompt: - prompt = "Décris ce que tu vois dans cette image." + # Construire le prompt complet avec le texte en entrée + if prompt: + full_prompt = f"{prompt}\n\nVoici le texte :\n{input_text}" + else: + full_prompt = input_text - print(f"🤖 [IA] Analyse avec {model}...") - print(f" Prompt: {prompt[:80]}...") + # Pour les modèles Qwen, désactiver le thinking étendu + if 'qwen' in model.lower() and not full_prompt.startswith('/no_think'): + full_prompt = f"/no_think\n{full_prompt}" - # Appeler Ollama - ollama_url = params.get('ollama_url', 'http://localhost:11434') + print(f"🤖 [IA] Analyse texte avec {model}...") - payload = { - "model": model, - "prompt": prompt, - "images": [screenshot_base64], - "stream": False, - "options": { - "temperature": temperature, - "num_predict": 1000 + ollama_url = params.get('ollama_url', 'http://localhost:11434') + options = {"temperature": temperature} + if max_tokens > 0: + options["num_predict"] = max_tokens + payload = { + "model": model, + "prompt": full_prompt, + "stream": False, + "options": options } - } + + else: + # ═══ MODE IMAGE : capturer l'écran et envoyer le screenshot ═══ + screenshot_base64 = anchor.get('screenshot') if anchor else None + + if not screenshot_base64: + try: + from PIL import ImageGrab + import io + + bbox = anchor.get('bounding_box', {}) if anchor else {} + + if bbox: + x, y = int(bbox.get('x', 0)), int(bbox.get('y', 0)) + w, h = int(bbox.get('width', 100)), int(bbox.get('height', 100)) + print(f"📸 [IA] Capture zone: ({x}, {y}) -> ({x+w}, {y+h})") + screenshot = ImageGrab.grab(bbox=(x, y, x + w, y + h)) + else: + print(f"📸 [IA] Capture écran complet") + screenshot = ImageGrab.grab() + + buffer = io.BytesIO() + screenshot.save(buffer, format='PNG') + screenshot_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') + except Exception as cap_err: + return {'success': False, 'error': f"Erreur capture: {cap_err}"} + + if not screenshot_base64: + return {'success': False, 'error': "Pas d'image à analyser (ni ancre, ni capture)"} + + if not prompt: + prompt = "Décris ce que tu vois dans cette image." + + full_prompt = prompt + if 'qwen' in model.lower() and not full_prompt.startswith('/no_think'): + full_prompt = f"/no_think\n{full_prompt}" + + print(f"🤖 [IA] Analyse image avec {model}...") + + ollama_url = params.get('ollama_url', 'http://localhost:11434') + options = {"temperature": temperature} + if max_tokens > 0: + options["num_predict"] = max_tokens + payload = { + "model": model, + "prompt": full_prompt, + "images": [screenshot_base64], + "stream": False, + "options": options + } + + # ═══ APPEL OLLAMA ═══ + print(f" Prompt: {full_prompt[:100]}...") response = requests.post( f"{ollama_url}/api/generate", @@ -380,13 +501,15 @@ def execute_ai_analyze(params: dict) -> dict: result = response.json() analysis_text = result.get('response', '').strip() - print(f"✅ [IA] Analyse terminée ({len(analysis_text)} caractères)") - print(f" Résultat: {analysis_text[:150]}...") + # Fallback : extraire du champ thinking si response vide + if not analysis_text and result.get('thinking'): + analysis_text = result.get('thinking', '').strip() - # Stocker le résultat dans le contexte d'exécution pour les variables - global _execution_state - if 'variables' not in _execution_state: - _execution_state['variables'] = {} + print(f"✅ [IA] Analyse terminée ({len(analysis_text)} caractères)") + if analysis_text: + print(f" Résultat: {analysis_text[:150]}...") + + # Stocker dans les variables d'exécution _execution_state['variables'][output_variable] = analysis_text return { @@ -394,7 +517,8 @@ def execute_ai_analyze(params: dict) -> dict: 'output': { 'analysis': analysis_text, 'variable': output_variable, - 'model': model + 'model': model, + 'mode': 'text' if use_text_mode else 'image' } } else: @@ -582,27 +706,34 @@ def execute_action(action_type: str, params: dict) -> dict: } except Exception as vision_err: - print(f"❌ [Vision] Erreur: {vision_err}") - return { - 'success': False, - 'error': f"Erreur vision: {str(vision_err)}" - } + print(f"⚠️ [Vision] Erreur: {vision_err}") + if execution_mode in ['intelligent', 'debug']: + # En mode visuel, on NE fait PAS de fallback statique + return { + 'success': False, + 'error': f"Erreur vision: {vision_err}. Ancre introuvable à l'écran." + } + else: + print(f"🔄 [Fallback] Mode basic: utilisation des coordonnées statiques...") - # === MODE BASIC (ou fallback) === - # Calculer le centre depuis les coordonnées statiques - x = bbox.get('x', 0) + bbox.get('width', 0) / 2 - y = bbox.get('y', 0) + bbox.get('height', 0) / 2 + # === MODE BASIC uniquement === + if execution_mode not in ['intelligent', 'debug']: + x = bbox.get('x', 0) + bbox.get('width', 0) / 2 + y = bbox.get('y', 0) + bbox.get('height', 0) / 2 - print(f"🖱️ [Action] Clic {click_type} à ({x}, {y}) [mode: {execution_mode}]") + print(f"🖱️ [Action] Clic {click_type} à ({x}, {y}) [mode: basic, coordonnées statiques]") - if click_type == 'double': - pyautogui.doubleClick(x, y) - elif click_type == 'right': - pyautogui.rightClick(x, y) - else: - pyautogui.click(x, y) + if click_type == 'double': + pyautogui.doubleClick(x, y) + elif click_type == 'right': + pyautogui.rightClick(x, y) + else: + pyautogui.click(x, y) - return {'success': True, 'output': {'clicked_at': {'x': x, 'y': y}, 'mode': execution_mode}} + return {'success': True, 'output': {'clicked_at': {'x': x, 'y': y}, 'mode': 'basic'}} + + # En mode intelligent/debug, si on arrive ici c'est que la vision n'a pas été tentée + return {'success': False, 'error': 'Ancre non trouvée (aucune méthode visuelle disponible)'} elif action_type in ['type_text', 'type']: text = params.get('text', '') @@ -631,8 +762,14 @@ def execute_action(action_type: str, params: dict) -> dict: # Petit délai pour s'assurer que le focus est bon time.sleep(0.2) - # Utiliser write() pour supporter l'unicode (caractères français, etc.) - pyautogui.write(text) + # Saisie compatible AZERTY/QWERTY (presse-papier > xdotool > pyautogui) + safe_type_text(text) + + # Stocker le texte dans une variable si output_variable est défini + output_variable = params.get('output_variable') + if output_variable: + _execution_state['variables'][output_variable] = text + print(f" 📦 Texte stocké dans variable '{output_variable}' ({len(text)} caractères)") return {'success': True, 'output': {'typed': text[:100] + '...' if len(text) > 100 else text}} @@ -683,7 +820,7 @@ def start_execution(): data = request.get_json() or {} workflow_id = data.get('workflow_id') - execution_mode = data.get('execution_mode', 'basic') + execution_mode = data.get('execution_mode', 'intelligent') minimize_browser = data.get('minimize_browser', True) # Activé par défaut # Valider le mode @@ -727,12 +864,14 @@ def start_execution(): session = get_session_state() session.active_execution_id = execution.id - # Réinitialiser l'état - _execution_state['is_running'] = True - _execution_state['is_paused'] = False - _execution_state['should_stop'] = False - _execution_state['current_execution_id'] = execution.id - _execution_state['execution_mode'] = execution_mode + # Réinitialiser l'état (protégé par lock) + with _execution_lock: + _execution_state['is_running'] = True + _execution_state['is_paused'] = False + _execution_state['should_stop'] = False + _execution_state['current_execution_id'] = execution.id + _execution_state['execution_mode'] = execution_mode + _execution_state['variables'] = {} # Reset des variables print(f"🎯 [API v3] Mode d'exécution: {execution_mode}") @@ -762,7 +901,8 @@ def start_execution(): except Exception as e: db.session.rollback() - _execution_state['is_running'] = False + with _execution_lock: + _execution_state['is_running'] = False return jsonify({ 'success': False, 'error': str(e) @@ -832,14 +972,17 @@ def stop_execution(): """Arrête l'exécution""" global _execution_state - if not _execution_state['is_running']: - return jsonify({ - 'success': False, - 'error': "Aucune exécution en cours" - }), 400 + with _execution_lock: + if not _execution_state['is_running']: + return jsonify({ + 'success': False, + 'error': "Aucune exécution en cours" + }), 400 - _execution_state['should_stop'] = True - _execution_state['is_paused'] = False + _execution_state['should_stop'] = True + _execution_state['is_paused'] = False + # Débloquer le thread s'il attend un choix self-healing + _healing_event.set() print(f"⛔ [API v3] Arrêt demandé") @@ -876,6 +1019,8 @@ def get_execution_status(): 'execution_mode': _execution_state.get('execution_mode', 'basic'), 'execution': execution.to_dict() if execution else None, 'session': session.to_dict(), + # Variables runtime du workflow + 'variables': _execution_state.get('variables', {}), # Self-healing interactif 'waiting_for_choice': _execution_state.get('waiting_for_choice', False), 'candidates': _execution_state.get('candidates', []) if _execution_state.get('waiting_for_choice') else [], @@ -975,9 +1120,11 @@ def submit_healing_choice(): 'error': "Coordonnées invalides. Format attendu: {x: number, y: number}" }), 400 - # Stocker le choix - _execution_state['user_choice'] = choice - _execution_state['waiting_for_choice'] = False + # Stocker le choix et réveiller le thread d'exécution + with _execution_lock: + _execution_state['user_choice'] = choice + _execution_state['waiting_for_choice'] = False + _healing_event.set() print(f"✅ [Self-Healing] Choix reçu: {choice}") diff --git a/visual_workflow_builder/backend/app.py b/visual_workflow_builder/backend/app.py index 8f28329bd..0d011e5ea 100644 --- a/visual_workflow_builder/backend/app.py +++ b/visual_workflow_builder/backend/app.py @@ -12,6 +12,8 @@ from flask_socketio import SocketIO from flask_caching import Cache from flask_migrate import Migrate import os +import logging +from logging.handlers import RotatingFileHandler from dotenv import load_dotenv # Load environment variables @@ -20,6 +22,25 @@ load_dotenv() # Initialize Flask app app = Flask(__name__) +# ============================================================ +# Logging — fichier rotatif + console +# ============================================================ +_log_dir = os.path.join(os.path.dirname(__file__), 'logs') +os.makedirs(_log_dir, exist_ok=True) + +_file_handler = RotatingFileHandler( + os.path.join(_log_dir, 'vwb.log'), + maxBytes=5 * 1024 * 1024, # 5 MB + backupCount=3 +) +_file_handler.setLevel(logging.INFO) +_file_handler.setFormatter(logging.Formatter( + '%(asctime)s [%(levelname)s] %(name)s: %(message)s' +)) + +logging.getLogger().addHandler(_file_handler) +logging.getLogger().setLevel(logging.INFO) + # Configuration app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production') app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL', 'sqlite:///vwb_v3.db') @@ -134,18 +155,11 @@ except ImportError as e: # Catalogue VWB - actions VisionOnly # V2 avec VLM (Vision Language Model) pour détection intelligente try: - from catalog_routes_v2_vlm import catalog_bp + from catalog_routes_v2_vlm import catalog_bp, VLM_MODEL app.register_blueprint(catalog_bp) - print("✅ Blueprint catalog V2 VLM (Ollama qwen2.5vl) enregistré") + print(f"✅ Blueprint catalog V2 VLM (Ollama {VLM_MODEL}) enregistré") except ImportError as e: print(f"⚠️ Blueprint catalog V2 VLM désactivé: {e}") - # Fallback sur la version pyautogui - try: - from catalog_routes import catalog_bp - app.register_blueprint(catalog_bp) - print("✅ Blueprint catalog (fallback pyautogui) enregistré") - except ImportError as e2: - print(f"⚠️ Blueprint catalog désactivé: {e2}") # API Images Ancres Visuelles - stockage serveur try: @@ -245,13 +259,13 @@ def execute_workflow_step(): 'parameters': parameters } - # Call the internal catalog execute endpoint - from catalog_routes import catalog_bp + # Call the internal catalog execute endpoint (v2 VLM) + from catalog_routes_v2_vlm import catalog_bp # Direct execution via catalog try: # Import the execute function directly - from catalog_routes import execute_action as catalog_execute + from catalog_routes_v2_vlm import execute_action as catalog_execute # We need to simulate Flask request context - use internal call from flask import current_app with current_app.test_request_context( diff --git a/visual_workflow_builder/backend/app_lightweight.py b/visual_workflow_builder/backend/app_lightweight.py index fcd6e8c38..0c3659b35 100644 --- a/visual_workflow_builder/backend/app_lightweight.py +++ b/visual_workflow_builder/backend/app_lightweight.py @@ -50,7 +50,7 @@ except ImportError as e: # Import des routes du catalogue VWB try: - from catalog_routes import register_catalog_routes + from catalog_routes_v2_vlm import register_catalog_routes CATALOG_ROUTES_AVAILABLE = True print("✅ Routes du catalogue VWB disponibles") except ImportError as e: diff --git a/visual_workflow_builder/backend/catalog_routes.py b/visual_workflow_builder/backend/catalog_routes.py deleted file mode 100644 index de1f42478..000000000 --- a/visual_workflow_builder/backend/catalog_routes.py +++ /dev/null @@ -1,1832 +0,0 @@ -""" -Routes API Catalogue VWB - Endpoints pour Actions VisionOnly - -Auteur : Dom, Alice, Kiro - 09 janvier 2026 - -Ce module définit les routes Flask pour l'API du catalogue d'actions VisionOnly -du Visual Workflow Builder. - -Endpoints : -- GET /api/vwb/catalog/actions : Liste des actions disponibles -- POST /api/vwb/catalog/execute : Exécution d'une action -- GET /api/vwb/catalog/actions/{action_id} : Détails d'une action -- POST /api/vwb/catalog/validate : Validation d'une action -""" - -from flask import Blueprint, request, jsonify -from typing import Dict, Any, List, Optional -from datetime import datetime -import traceback - -# Import des actions et contrats VWB -try: - from visual_workflow_builder.backend.actions import ( - BaseVWBAction, VWBActionResult, VWBActionStatus, - VWBClickAnchorAction, VWBTypeTextAction, VWBWaitForAnchorAction, - VWBFocusAnchorAction, VWBTypeSecretAction, VWBScrollToAnchorAction, - VWBExtractTextAction - ) - from visual_workflow_builder.backend.contracts.error import VWBErrorType, VWBErrorSeverity, create_vwb_error - from visual_workflow_builder.backend.contracts.evidence import VWBEvidenceType - from visual_workflow_builder.backend.contracts.visual_anchor import VWBVisualAnchor, VWBVisualAnchorType - from visual_workflow_builder.backend.contracts.action_contracts import ( - enforce_action_contract, ContractValidationError, get_required_params - ) - ACTIONS_AVAILABLE = True - CONTRACTS_AVAILABLE = True - print("✅ Actions VWB importées avec succès") -except ImportError as e: - print(f"⚠️ Actions VWB non disponibles: {e}") - try: - # Essayer import relatif - from .actions import ( - BaseVWBAction, VWBActionResult, VWBActionStatus, - VWBClickAnchorAction, VWBTypeTextAction, VWBWaitForAnchorAction, - VWBFocusAnchorAction, VWBTypeSecretAction, VWBScrollToAnchorAction, - VWBExtractTextAction - ) - from .contracts.error import VWBErrorType, VWBErrorSeverity, create_vwb_error - from .contracts.evidence import VWBEvidenceType - from .contracts.visual_anchor import VWBVisualAnchor, VWBVisualAnchorType - from .contracts.action_contracts import ( - enforce_action_contract, ContractValidationError, get_required_params - ) - ACTIONS_AVAILABLE = True - CONTRACTS_AVAILABLE = True - print("✅ Actions VWB importées avec import relatif") - except ImportError as e2: - print(f"⚠️ Import relatif échoué aussi: {e2}") - ACTIONS_AVAILABLE = False - CONTRACTS_AVAILABLE = False - BaseVWBAction = None - -# Import du ScreenCapturer (Option A thread-safe) -try: - import sys - from pathlib import Path - sys.path.insert(0, str(Path(__file__).parent.parent.parent)) - from core.capture import ScreenCapturer - SCREEN_CAPTURER_AVAILABLE = True -except ImportError: - SCREEN_CAPTURER_AVAILABLE = False - ScreenCapturer = None - -# Import pour la recherche visuelle dynamique -import os -try: - import cv2 - import numpy as np - import base64 - from io import BytesIO - from PIL import Image - CV2_AVAILABLE = True -except ImportError: - CV2_AVAILABLE = False - cv2 = None - np = None - - -def find_visual_anchor_on_screen(anchor_image_base64: str, confidence_threshold: float = 0.7, bounding_box: Dict = None, expected_position: Dict = None) -> Optional[Dict[str, Any]]: - """ - Recherche dynamiquement une ancre visuelle sur l'écran actuel. - - Utilise le template matching OpenCV pour trouver l'image de l'ancre - sur la capture d'écran actuelle. - - Args: - anchor_image_base64: Image de l'ancre encodée en base64 - confidence_threshold: Seuil de confiance minimum (0.0-1.0) - bounding_box: Si fourni, extrait cette région de l'image de l'ancre avant recherche - expected_position: Position attendue (x, y) pour validation - si la position trouvée - est trop éloignée (>300px), on retourne None - - Returns: - Dict avec les coordonnées trouvées ou None si non trouvé - { - "found": True, - "x": 100, - "y": 200, - "width": 50, - "height": 30, - "confidence": 0.95, - "center_x": 125, - "center_y": 215 - } - """ - if not CV2_AVAILABLE: - print("⚠️ OpenCV non disponible pour la recherche visuelle") - return None - - try: - import pyautogui - - # 1. Capturer l'écran actuel - print("📸 [Visual Search] Capture de l'écran...") - screenshot = pyautogui.screenshot() - screen_array = np.array(screenshot) - screen_bgr = cv2.cvtColor(screen_array, cv2.COLOR_RGB2BGR) - screen_gray = cv2.cvtColor(screen_bgr, cv2.COLOR_BGR2GRAY) - screen_h, screen_w = screen_gray.shape[:2] - - # 2. Décoder l'image de l'ancre depuis base64 - print("🔍 [Visual Search] Décodage de l'image de l'ancre...") - - # Gérer les différents formats base64 - if ',' in anchor_image_base64: - # Format "data:image/png;base64,XXXX" - anchor_image_base64 = anchor_image_base64.split(',')[1] - - anchor_bytes = base64.b64decode(anchor_image_base64) - anchor_pil = Image.open(BytesIO(anchor_bytes)) - anchor_array = np.array(anchor_pil.convert('RGB')) - - # 3. Si bounding_box fourni et que l'image est grande, extraire la région - ref_h, ref_w = anchor_array.shape[:2] - print(f"📐 [Visual Search] Image référence: {ref_w}x{ref_h}") - - if bounding_box and ref_w > 200 and ref_h > 200: - # L'image de référence est l'écran capturé (ex: 1200x675) - # Le bounding_box contient les coordonnées de sélection EN PIXELS de cette image - # Donc on utilise directement ces coordonnées pour l'extraction (pas de scale!) - bbox_x = bounding_box.get('x', 0) - bbox_y = bounding_box.get('y', 0) - bbox_w = bounding_box.get('width', 100) - bbox_h = bounding_box.get('height', 100) - - print(f"📊 [Visual Search] Image référence: {ref_w}x{ref_h}, Écran actuel: {screen_w}x{screen_h}") - print(f"📊 [Visual Search] BBox (pixels image): x={bbox_x:.1f}, y={bbox_y:.1f}, w={bbox_w:.1f}, h={bbox_h:.1f}") - - # Les coordonnées du bounding_box sont DÉJÀ en pixels de l'image de référence - # Pas besoin de scale pour l'extraction! - crop_x = int(bbox_x) - crop_y = int(bbox_y) - crop_w = int(bbox_w) - crop_h = int(bbox_h) - - # S'assurer que les coordonnées sont valides - crop_x = max(0, min(crop_x, ref_w - 10)) - crop_y = max(0, min(crop_y, ref_h - 10)) - crop_w = max(20, min(crop_w, ref_w - crop_x)) - crop_h = max(20, min(crop_h, ref_h - crop_y)) - - print(f"✂️ [Visual Search] Extraction région: ({crop_x}, {crop_y}) - {crop_w}x{crop_h}") - - # Extraire la région de l'image de référence - anchor_array = anchor_array[crop_y:crop_y+crop_h, crop_x:crop_x+crop_w] - - anchor_bgr = cv2.cvtColor(anchor_array, cv2.COLOR_RGB2BGR) - anchor_gray = cv2.cvtColor(anchor_bgr, cv2.COLOR_BGR2GRAY) - - anchor_h, anchor_w = anchor_gray.shape[:2] - print(f"📐 [Visual Search] Taille ancre finale: {anchor_w}x{anchor_h}") - - # 3. Template matching avec plusieurs méthodes - methods = [ - (cv2.TM_CCOEFF_NORMED, "TM_CCOEFF_NORMED"), - (cv2.TM_CCORR_NORMED, "TM_CCORR_NORMED"), - ] - - best_result = None - best_confidence = 0 - - for method, method_name in methods: - result = cv2.matchTemplate(screen_gray, anchor_gray, method) - min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result) - - confidence = max_val - - if confidence > best_confidence: - best_confidence = confidence - best_result = { - "found": True, - "x": max_loc[0], - "y": max_loc[1], - "width": anchor_w, - "height": anchor_h, - "confidence": confidence, - "center_x": max_loc[0] + anchor_w // 2, - "center_y": max_loc[1] + anchor_h // 2, - "method": method_name - } - - if best_result and best_result["confidence"] >= confidence_threshold: - print(f"✅ [Visual Search] Ancre trouvée!") - print(f" Position: ({best_result['center_x']}, {best_result['center_y']})") - print(f" Confiance: {best_result['confidence']:.2%}") - print(f" Méthode: {best_result['method']}") - - # Validation de la position attendue - # Évite de cliquer sur une copie de l'image (ex: dans l'interface VWB) - if expected_position: - exp_x = expected_position.get('x', 0) - exp_y = expected_position.get('y', 0) - found_x = best_result['center_x'] - found_y = best_result['center_y'] - distance = ((found_x - exp_x) ** 2 + (found_y - exp_y) ** 2) ** 0.5 - - print(f"📍 [Visual Search] Validation position:") - print(f" Attendue: ({exp_x}, {exp_y})") - print(f" Trouvée: ({found_x}, {found_y})") - print(f" Distance: {distance:.0f}px") - - MAX_DISTANCE = 300 # Tolérance en pixels - HIGH_CONFIDENCE_THRESHOLD = 0.75 # Seuil de haute confiance (abaissé pour être plus permissif) - - if distance > MAX_DISTANCE: - # Si confiance suffisante, on fait confiance à Visual Search même si position éloignée - # Car les coordonnées statiques peuvent être incorrectes (capture faite à un autre moment) - if best_result['confidence'] >= HIGH_CONFIDENCE_THRESHOLD: - print(f"⚠️ [Visual Search] Position éloignée ({distance:.0f}px > {MAX_DISTANCE}px)") - print(f" MAIS confiance suffisante ({best_result['confidence']:.2%} >= {HIGH_CONFIDENCE_THRESHOLD:.0%})") - print(f" → Utilisation de la position Visual Search (coordonnées statiques ignorées)") - else: - print(f"⚠️ [Visual Search] Position trop éloignée ({distance:.0f}px > {MAX_DISTANCE}px)") - print(f" Confiance insuffisante ({best_result['confidence']:.2%} < {HIGH_CONFIDENCE_THRESHOLD:.0%})") - print(f" → Utilisation des coordonnées statiques") - return None - else: - print(f"✅ [Visual Search] Position validée (distance < {MAX_DISTANCE}px)") - - return best_result - else: - conf_found = best_confidence if best_result else 0 - print(f"❌ [Visual Search] Ancre non trouvée (confiance: {conf_found:.2%} < {confidence_threshold:.2%})") - return None - - except Exception as e: - print(f"❌ [Visual Search] Erreur: {e}") - traceback.print_exc() - return None - - -def find_anchor_multiscale(anchor_image_base64: str, scales: List[float] = None, confidence_threshold: float = 0.7) -> Optional[Dict[str, Any]]: - """ - Recherche visuelle multi-échelle pour trouver l'ancre même si elle a changé de taille. - - Args: - anchor_image_base64: Image de l'ancre encodée en base64 - scales: Liste des facteurs d'échelle à tester (ex: [0.8, 0.9, 1.0, 1.1, 1.2]) - confidence_threshold: Seuil de confiance minimum - - Returns: - Dict avec les coordonnées trouvées ou None - """ - if scales is None: - scales = [0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3] - - if not CV2_AVAILABLE: - return None - - try: - import pyautogui - - # Capturer l'écran - screenshot = pyautogui.screenshot() - screen_array = np.array(screenshot) - screen_gray = cv2.cvtColor(screen_array, cv2.COLOR_RGB2GRAY) - - # Décoder l'ancre - if ',' in anchor_image_base64: - anchor_image_base64 = anchor_image_base64.split(',')[1] - - anchor_bytes = base64.b64decode(anchor_image_base64) - anchor_pil = Image.open(BytesIO(anchor_bytes)) - anchor_array = np.array(anchor_pil.convert('RGB')) - anchor_gray = cv2.cvtColor(anchor_array, cv2.COLOR_RGB2GRAY) - - best_result = None - best_confidence = 0 - - for scale in scales: - # Redimensionner l'ancre - new_w = int(anchor_gray.shape[1] * scale) - new_h = int(anchor_gray.shape[0] * scale) - - if new_w < 10 or new_h < 10: - continue - if new_w > screen_gray.shape[1] or new_h > screen_gray.shape[0]: - continue - - resized = cv2.resize(anchor_gray, (new_w, new_h)) - - result = cv2.matchTemplate(screen_gray, resized, cv2.TM_CCOEFF_NORMED) - min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result) - - if max_val > best_confidence: - best_confidence = max_val - best_result = { - "found": True, - "x": max_loc[0], - "y": max_loc[1], - "width": new_w, - "height": new_h, - "confidence": max_val, - "center_x": max_loc[0] + new_w // 2, - "center_y": max_loc[1] + new_h // 2, - "scale": scale - } - - if best_result and best_result["confidence"] >= confidence_threshold: - print(f"✅ [MultiScale] Trouvé à l'échelle {best_result['scale']:.1f}x") - print(f" Position: ({best_result['center_x']}, {best_result['center_y']})") - print(f" Confiance: {best_result['confidence']:.2%}") - return best_result - - return None - - except Exception as e: - print(f"❌ [MultiScale] Erreur: {e}") - return None - -# Créer le blueprint pour les routes catalogue -catalog_bp = Blueprint('catalog', __name__, url_prefix='/api/vwb/catalog') - -# Instance globale du ScreenCapturer (initialisée à la demande) -_screen_capturer_instance = None - - -def get_screen_capturer(): - """ - Obtient l'instance du ScreenCapturer (initialisation paresseuse). - - Returns: - ScreenCapturer ou None si non disponible - """ - global _screen_capturer_instance - if _screen_capturer_instance is None and SCREEN_CAPTURER_AVAILABLE: - try: - _screen_capturer_instance = ScreenCapturer( - buffer_size=5, - detect_changes=False - ) - print("✅ ScreenCapturer initialisé pour le catalogue VWB") - except Exception as e: - print(f"⚠️ Erreur initialisation ScreenCapturer: {e}") - return None - return _screen_capturer_instance - - -def create_action_from_config(action_config: Dict[str, Any]) -> Optional[BaseVWBAction]: - """ - Crée une instance d'action à partir de la configuration. - - Args: - action_config: Configuration de l'action - - Returns: - Instance d'action ou None si erreur - """ - if not ACTIONS_AVAILABLE: - print("⚠️ Actions VWB non disponibles") - return None - - try: - action_type = action_config.get('type') - action_id = action_config.get('action_id', f"action_{datetime.now().strftime('%Y%m%d_%H%M%S')}") - parameters = action_config.get('parameters', {}) - - # Obtenir le ScreenCapturer - screen_capturer = get_screen_capturer() - - # Créer l'ancre visuelle si fournie - if 'visual_anchor' in parameters and isinstance(parameters['visual_anchor'], dict): - anchor_data = parameters['visual_anchor'] - visual_anchor = VWBVisualAnchor.from_dict(anchor_data) - parameters['visual_anchor'] = visual_anchor - - # Créer l'action selon le type - if action_type == 'click_anchor': - return VWBClickAnchorAction( - action_id=action_id, - parameters=parameters, - screen_capturer=screen_capturer - ) - elif action_type == 'type_text': - return VWBTypeTextAction( - action_id=action_id, - parameters=parameters, - screen_capturer=screen_capturer - ) - elif action_type == 'wait_for_anchor': - return VWBWaitForAnchorAction( - action_id=action_id, - parameters=parameters, - screen_capturer=screen_capturer - ) - elif action_type == 'focus_anchor': - return VWBFocusAnchorAction( - action_id=action_id, - parameters=parameters, - screen_capturer=screen_capturer - ) - elif action_type == 'type_secret': - return VWBTypeSecretAction( - action_id=action_id, - parameters=parameters, - screen_capturer=screen_capturer - ) - elif action_type == 'scroll_to_anchor': - return VWBScrollToAnchorAction( - action_id=action_id, - parameters=parameters, - screen_capturer=screen_capturer - ) - elif action_type == 'extract_text': - return VWBExtractTextAction( - action_id=action_id, - parameters=parameters, - screen_capturer=screen_capturer - ) - else: - print(f"⚠️ Type d'action non supporté: {action_type}") - return None - - except Exception as e: - print(f"❌ Erreur création action: {e}") - return None - - -@catalog_bp.route('/actions', methods=['GET']) -def list_actions(): - """ - Liste toutes les actions disponibles dans le catalogue VWB. - - Query Parameters: - - category: Filtrer par catégorie (vision_ui, control, data) - - search: Recherche textuelle dans nom/description - - Response: - { - "success": true, - "actions": [ - { - "id": "click_anchor", - "name": "Clic sur Ancre Visuelle", - "description": "...", - "category": "vision_ui", - "parameters": {...}, - "examples": [...] - } - ], - "total": 3, - "categories": ["vision_ui"] - } - """ - try: - # Paramètres de requête - category_filter = request.args.get('category') - search_query = request.args.get('search', '').lower() - - # Définir les actions disponibles - available_actions = [ - { - "id": "click_anchor", - "name": "Clic sur Ancre Visuelle", - "description": "Clique sur un élément UI identifié par une ancre visuelle", - "category": "vision_ui", - "icon": "🖱️", - "parameters": { - "visual_anchor": { - "type": "VWBVisualAnchor", - "required": True, - "description": "Ancre visuelle pour localiser l'élément" - }, - "click_type": { - "type": "string", - "required": False, - "default": "left", - "options": ["left", "right", "double"], - "description": "Type de clic à effectuer" - }, - "click_offset_x": { - "type": "number", - "required": False, - "default": 0, - "description": "Décalage horizontal en pixels" - }, - "click_offset_y": { - "type": "number", - "required": False, - "default": 0, - "description": "Décalage vertical en pixels" - }, - "confidence_threshold": { - "type": "number", - "required": False, - "default": 0.8, - "min": 0.0, - "max": 1.0, - "description": "Seuil de confiance pour la détection" - } - }, - "examples": [ - { - "name": "Clic sur bouton de validation", - "description": "Clique sur le bouton 'Valider' d'un formulaire", - "parameters": { - "click_type": "left", - "confidence_threshold": 0.9 - } - } - ] - }, - { - "id": "type_text", - "name": "Saisie de Texte", - "description": "Saisit du texte dans un champ identifié par une ancre visuelle", - "category": "vision_ui", - "icon": "⌨️", - "parameters": { - "visual_anchor": { - "type": "VWBVisualAnchor", - "required": True, - "description": "Ancre visuelle pour localiser le champ" - }, - "text_to_type": { - "type": "string", - "required": True, - "description": "Texte à saisir" - }, - "clear_field_first": { - "type": "boolean", - "required": False, - "default": True, - "description": "Vider le champ avant la saisie" - }, - "click_before_typing": { - "type": "boolean", - "required": False, - "default": True, - "description": "Cliquer sur le champ avant la saisie" - }, - "press_enter_after": { - "type": "boolean", - "required": False, - "default": False, - "description": "Appuyer sur Entrée après la saisie" - }, - "typing_speed_ms": { - "type": "number", - "required": False, - "default": 50, - "min": 0, - "description": "Délai entre caractères en millisecondes" - } - }, - "examples": [ - { - "name": "Saisie d'email", - "description": "Saisit une adresse email dans un champ", - "parameters": { - "text_to_type": "user@example.com", - "clear_field_first": True, - "press_enter_after": False - } - } - ] - }, - { - "id": "wait_for_anchor", - "name": "Attente d'Ancre Visuelle", - "description": "Attend qu'une ancre visuelle apparaisse ou disparaisse", - "category": "control", - "icon": "⏳", - "parameters": { - "visual_anchor": { - "type": "VWBVisualAnchor", - "required": True, - "description": "Ancre visuelle à surveiller" - }, - "wait_mode": { - "type": "string", - "required": False, - "default": "appear", - "options": ["appear", "disappear"], - "description": "Mode d'attente (apparition ou disparition)" - }, - "max_wait_time_ms": { - "type": "number", - "required": False, - "default": 30000, - "min": 1000, - "description": "Délai d'attente maximum en millisecondes" - }, - "check_interval_ms": { - "type": "number", - "required": False, - "default": 500, - "min": 100, - "description": "Intervalle de vérification en millisecondes" - } - }, - "examples": [ - { - "name": "Attendre chargement", - "description": "Attend qu'un indicateur de chargement disparaisse", - "parameters": { - "wait_mode": "disappear", - "max_wait_time_ms": 10000 - } - } - ] - }, - { - "id": "focus_anchor", - "name": "Donner le Focus à un Élément", - "description": "Donne le focus à un élément UI identifié par une ancre visuelle", - "category": "vision_ui", - "icon": "🎯", - "parameters": { - "visual_anchor": { - "type": "VWBVisualAnchor", - "required": True, - "description": "Ancre visuelle pour localiser l'élément" - }, - "focus_method": { - "type": "string", - "required": False, - "default": "click", - "options": ["click", "tab", "keyboard"], - "description": "Méthode pour donner le focus" - }, - "verify_focus": { - "type": "boolean", - "required": False, - "default": True, - "description": "Vérifier que le focus a été donné" - }, - "confidence_threshold": { - "type": "number", - "required": False, - "default": 0.8, - "min": 0.0, - "max": 1.0, - "description": "Seuil de confiance pour la détection" - } - }, - "examples": [ - { - "name": "Focus sur champ de saisie", - "description": "Donne le focus à un champ pour la saisie clavier", - "parameters": { - "focus_method": "click", - "verify_focus": True - } - } - ] - }, - { - "id": "type_secret", - "name": "Saisie Sécurisée", - "description": "Saisit de manière sécurisée des données sensibles (mots de passe, codes)", - "category": "vision_ui", - "icon": "🔐", - "parameters": { - "visual_anchor": { - "type": "VWBVisualAnchor", - "required": True, - "description": "Ancre visuelle pour localiser le champ de saisie" - }, - "secret_text": { - "type": "string", - "required": True, - "sensitive": True, - "description": "Texte secret à saisir (masqué dans les logs)" - }, - "clear_field_first": { - "type": "boolean", - "required": False, - "default": True, - "description": "Vider le champ avant la saisie" - }, - "mask_in_evidence": { - "type": "boolean", - "required": False, - "default": True, - "description": "Masquer le texte dans les Evidence" - }, - "typing_speed_ms": { - "type": "number", - "required": False, - "default": 30, - "min": 0, - "description": "Délai entre caractères (plus rapide pour sécurité)" - } - }, - "examples": [ - { - "name": "Saisie mot de passe", - "description": "Saisit un mot de passe dans un champ de connexion", - "parameters": { - "clear_field_first": True, - "mask_in_evidence": True - } - } - ] - }, - { - "id": "scroll_to_anchor", - "name": "Défiler vers un Élément", - "description": "Fait défiler la page jusqu'à ce qu'un élément soit visible", - "category": "vision_ui", - "icon": "📜", - "parameters": { - "visual_anchor": { - "type": "VWBVisualAnchor", - "required": True, - "description": "Ancre visuelle pour localiser l'élément cible" - }, - "scroll_direction": { - "type": "string", - "required": False, - "default": "vertical", - "options": ["vertical", "horizontal", "both"], - "description": "Direction du défilement" - }, - "scroll_speed": { - "type": "string", - "required": False, - "default": "medium", - "options": ["slow", "medium", "fast"], - "description": "Vitesse du défilement" - }, - "max_scroll_attempts": { - "type": "number", - "required": False, - "default": 10, - "min": 1, - "description": "Nombre maximum de tentatives de défilement" - }, - "target_position": { - "type": "string", - "required": False, - "default": "center", - "options": ["top", "center", "bottom"], - "description": "Position cible de l'élément dans la vue" - } - }, - "examples": [ - { - "name": "Défiler vers un bouton", - "description": "Fait défiler verticalement pour trouver un bouton", - "parameters": { - "scroll_direction": "vertical", - "target_position": "center" - } - } - ] - }, - { - "id": "extract_text", - "name": "Extraire du Texte", - "description": "Extrait du texte d'une zone identifiée par une ancre visuelle", - "category": "data", - "icon": "📝", - "parameters": { - "visual_anchor": { - "type": "VWBVisualAnchor", - "required": True, - "description": "Ancre visuelle pour localiser la zone de texte" - }, - "extraction_mode": { - "type": "string", - "required": False, - "default": "full", - "options": ["full", "lines", "words", "numbers", "custom"], - "description": "Mode d'extraction du texte" - }, - "text_filters": { - "type": "array", - "required": False, - "default": [], - "description": "Filtres à appliquer au texte extrait" - }, - "output_format": { - "type": "string", - "required": False, - "default": "text", - "options": ["text", "json", "structured"], - "description": "Format de sortie du texte extrait" - }, - "confidence_threshold": { - "type": "number", - "required": False, - "default": 0.8, - "min": 0.0, - "max": 1.0, - "description": "Seuil de confiance pour la détection" - } - }, - "examples": [ - { - "name": "Extraire un numéro de facture", - "description": "Extrait un numéro de facture d'un document", - "parameters": { - "extraction_mode": "numbers", - "text_filters": ["digits_only"] - } - } - ] - } - ] - - # Filtrer par catégorie - if category_filter: - available_actions = [ - action for action in available_actions - if action['category'] == category_filter - ] - - # Filtrer par recherche - if search_query: - available_actions = [ - action for action in available_actions - if (search_query in action['name'].lower() or - search_query in action['description'].lower()) - ] - - # Obtenir les catégories uniques - categories = list(set(action['category'] for action in available_actions)) - - return jsonify({ - "success": True, - "actions": available_actions, - "total": len(available_actions), - "categories": sorted(categories), - "screen_capturer_available": SCREEN_CAPTURER_AVAILABLE - }) - - except Exception as e: - return jsonify({ - "success": False, - "error": f"Erreur lors de la récupération des actions: {str(e)}", - "traceback": traceback.format_exc() - }), 500 - - -@catalog_bp.route('/actions/', methods=['GET']) -def get_action_details(action_id: str): - """ - Obtient les détails d'une action spécifique. - - Path Parameters: - - action_id: Identifiant de l'action - - Response: - { - "success": true, - "action": { - "id": "click_anchor", - "name": "...", - "description": "...", - "parameters": {...}, - "examples": [...], - "documentation": "..." - } - } - """ - try: - # Récupérer toutes les actions - all_actions_response = list_actions() - all_actions_data = all_actions_response.get_json() - - if not all_actions_data.get('success'): - return all_actions_response - - # Trouver l'action demandée - action = None - for act in all_actions_data['actions']: - if act['id'] == action_id: - action = act - break - - if not action: - return jsonify({ - "success": False, - "error": f"Action '{action_id}' non trouvée" - }), 404 - - # Ajouter de la documentation détaillée - documentation = { - "click_anchor": """ -# Clic sur Ancre Visuelle - -Cette action permet de cliquer sur un élément de l'interface utilisateur en utilisant une ancre visuelle pour le localiser. - -## Fonctionnement -1. Capture l'écran actuel -2. Recherche l'ancre visuelle dans l'image -3. Calcule les coordonnées de clic -4. Effectue le clic au type spécifié -5. Génère des preuves d'exécution (Evidence) - -## Cas d'usage -- Cliquer sur des boutons -- Sélectionner des éléments de menu -- Activer des contrôles UI -- Naviguer dans des interfaces - -## Bonnes pratiques -- Utilisez des ancres visuelles distinctives -- Ajustez le seuil de confiance selon la stabilité de l'UI -- Testez avec différentes résolutions d'écran - """, - "type_text": """ -# Saisie de Texte - -Cette action permet de saisir du texte dans un champ de saisie identifié par une ancre visuelle. - -## Fonctionnement -1. Localise le champ de saisie via l'ancre visuelle -2. Clique sur le champ (optionnel) -3. Vide le champ existant (optionnel) -4. Saisit le texte caractère par caractère -5. Appuie sur Entrée (optionnel) - -## Cas d'usage -- Remplir des formulaires -- Saisir des identifiants de connexion -- Entrer des données dans des champs -- Effectuer des recherches - -## Bonnes pratiques -- Vérifiez que le champ est actif avant la saisie -- Utilisez un délai de saisie approprié -- Testez avec différents types de champs - """, - "wait_for_anchor": """ -# Attente d'Ancre Visuelle - -Cette action permet d'attendre qu'un élément apparaisse ou disparaisse de l'écran. - -## Fonctionnement -1. Surveille l'écran à intervalles réguliers -2. Recherche l'ancre visuelle à chaque vérification -3. Attend que la condition soit remplie (apparition/disparition) -4. Retourne le résultat ou timeout - -## Cas d'usage -- Attendre la fin d'un chargement -- Synchroniser avec des animations -- Attendre l'apparition de dialogues -- Gérer les états transitoires - -## Bonnes pratiques -- Définissez des délais d'attente raisonnables -- Utilisez des intervalles de vérification appropriés -- Prévoyez des conditions de sortie - """ - } - - action['documentation'] = documentation.get(action_id, "Documentation non disponible") - - return jsonify({ - "success": True, - "action": action - }) - - except Exception as e: - return jsonify({ - "success": False, - "error": f"Erreur lors de la récupération de l'action: {str(e)}" - }), 500 - - -@catalog_bp.route('/execute', methods=['POST']) -def execute_action(): - """ - Exécute une action du catalogue VWB. - - Request Body: - { - "type": "click_anchor", - "action_id": "action_001", - "step_id": "step_001", - "parameters": { - "visual_anchor": {...}, - "click_type": "left" - }, - "workflow_id": "workflow_001", - "user_id": "user_001" - } - - Response: - { - "success": true, - "result": { - "action_id": "action_001", - "step_id": "step_001", - "status": "success", - "execution_time_ms": 1250.5, - "output_data": {...}, - "evidence_list": [...], - "error": null - } - } - """ - try: - # Valider le corps de la requête - data = request.get_json() - if not data: - return jsonify({ - "success": False, - "error": "Corps de requête JSON requis" - }), 400 - - # Extraire les paramètres (supporter les deux formats: type/stepType, step_id/stepId) - action_type = data.get('type') or data.get('stepType') - step_id = data.get('step_id') or data.get('stepId', f"step_{datetime.now().strftime('%Y%m%d_%H%M%S')}") - workflow_id = data.get('workflow_id') or data.get('workflowId') - user_id = data.get('user_id') or data.get('userId') - - if not action_type: - return jsonify({ - "success": False, - "error": "Paramètre 'type' ou 'stepType' requis" - }), 400 - - parameters = data.get('parameters', {}) - - # LOG DEBUG - Voir ce qui arrive du frontend - print(f"\n{'='*60}") - print(f"🔥 REQUÊTE EXECUTE REÇUE:") - print(f" stepType: {action_type}") - print(f" stepId: {step_id}") - print(f" parameters keys: {list(parameters.keys())}") - print(f"{'='*60}\n") - - # === VALIDATION CONTRAT STRICT === - # BLOQUE l'exécution si le contrat n'est pas respecté - if CONTRACTS_AVAILABLE: - try: - enforce_action_contract(action_type, parameters) - except ContractValidationError as e: - print(f"🚫 [CONTRAT VIOLÉ] Action '{action_type}' bloquée!") - print(f" Paramètres requis: {get_required_params(action_type)}") - print(f" Paramètres reçus: {list(parameters.keys())}") - return jsonify({ - "success": False, - "status": "error", - "should_stop": True, - "error": e.to_dict(), - "message": str(e) - }), 400 - else: - print(f"⚠️ [Contract] Validation de contrat non disponible") - - # === MODE EXÉCUTION DIRECTE AVEC PYAUTOGUI === - # Si on a des coordonnées bounding_box, on exécute directement sans passer par les classes VWB - direct_execution = data.get('direct_execution', True) # Par défaut, exécution directe - if direct_execution: - try: - import pyautogui - import time - - # Récupérer la résolution de l'écran - screen_width, screen_height = pyautogui.size() - print(f"📐 [Direct] Résolution: {screen_width}x{screen_height}") - - result_message = '' - execution_success = True - start_time = datetime.now() - - if action_type in ['click', 'click_anchor']: - # IMPORTANT: Minimiser la fenêtre active (navigateur VWB) pour éviter - # que la miniature de l'ancre soit détectée à la place de l'élément réel - try: - import subprocess - # Utiliser xdotool pour minimiser la fenêtre active - subprocess.run(['xdotool', 'getactivewindow', 'windowminimize'], - timeout=2, capture_output=True) - print("📉 [Prepare] Fenêtre active minimisée") - time.sleep(0.5) # Attendre que la fenêtre soit minimisée - except Exception as minimize_error: - print(f"⚠️ [Prepare] Impossible de minimiser la fenêtre: {minimize_error}") - - # Récupérer l'ancre visuelle - visual_anchor = parameters.get('visual_anchor', {}) - target = parameters.get('target', {}) - - # DEBUG: Écrire dans fichier de log - with open('/tmp/vwb_debug.log', 'a') as df: - df.write(f"\n--- catalog_routes execute ---\n") - df.write(f"parameters keys: {list(parameters.keys())}\n") - df.write(f"visual_anchor keys: {list(visual_anchor.keys()) if visual_anchor else 'None'}\n") - df.write(f"visual_anchor.anchor_id: {visual_anchor.get('anchor_id')}\n") - df.write(f"visual_anchor.id: {visual_anchor.get('id')}\n") - df.flush() - - # === RECHERCHE VISUELLE DYNAMIQUE === - # Si l'ancre contient une image (screenshot), on la cherche sur l'écran actuel - # Chercher dans tous les formats possibles - anchor_screenshot = ( - visual_anchor.get('screenshot') or - visual_anchor.get('image') or - visual_anchor.get('reference_image_base64') or - visual_anchor.get('referenceImage') or - target.get('screenshot') or - target.get('image') or - target.get('reference_image_base64') or - target.get('referenceImage') or - # Chercher dans metadata aussi - (visual_anchor.get('metadata', {}) or {}).get('reference_image') or - (target.get('metadata', {}) or {}).get('reference_image') - ) - - # Si pas d'image base64, essayer de charger depuis l'URL du serveur - if not anchor_screenshot: - original_url = ( - visual_anchor.get('reference_image_url') or - (visual_anchor.get('metadata', {}) or {}).get('reference_image_url') - ) - if original_url: - try: - # Extraire l'anchor_id depuis l'URL - # Format: /api/anchor-images/{anchor_id}/original - import re - match = re.search(r'/api/anchor-images/([^/]+)/original', original_url) - if match: - anchor_id = match.group(1) - # Charger l'image depuis le disque - from services.anchor_image_service import get_original_path - original_path = get_original_path(anchor_id) - if original_path and os.path.exists(original_path): - with open(original_path, 'rb') as f: - anchor_screenshot = base64.b64encode(f.read()).decode('utf-8') - print(f"✅ [Debug] Image chargée depuis le serveur: {anchor_id}") - except Exception as e: - print(f"⚠️ [Debug] Erreur chargement image serveur: {e}") - - # FALLBACK: Charger depuis anchor_id directement si présent - if not anchor_screenshot: - anchor_id = ( - visual_anchor.get('id') or - visual_anchor.get('anchor_id') or - target.get('id') or - target.get('anchor_id') - ) - - # Essayer aussi d'extraire anchor_id depuis thumbnail_url ou metadata.thumbnail_url - if not anchor_id: - thumbnail_url = ( - visual_anchor.get('thumbnail_url') or - (visual_anchor.get('metadata', {}) or {}).get('thumbnail_url') or - target.get('thumbnail_url') or - (target.get('metadata', {}) or {}).get('thumbnail_url') - ) - if thumbnail_url: - import re - match = re.search(r'/api/anchor-images/([^/]+)/', thumbnail_url) - if match: - anchor_id = match.group(1) - print(f"📎 [Debug] anchor_id extrait de thumbnail_url: {anchor_id}") - - with open('/tmp/vwb_debug.log', 'a') as df: - df.write(f"Fallback anchor_id trouvé: {anchor_id}\n") - df.flush() - - if anchor_id and anchor_id.startswith('anchor_'): - try: - import os as os_module - from services.anchor_image_service import get_original_path - original_path = get_original_path(anchor_id) - with open('/tmp/vwb_debug.log', 'a') as df: - df.write(f"original_path: {original_path}, exists: {os_module.path.exists(original_path) if original_path else False}\n") - df.flush() - if original_path and os_module.path.exists(original_path): - with open(original_path, 'rb') as f: - anchor_screenshot = base64.b64encode(f.read()).decode('utf-8') - with open('/tmp/vwb_debug.log', 'a') as df: - df.write(f"✅ Image chargée via anchor_id: {anchor_id}, len={len(anchor_screenshot)}\n") - df.flush() - except Exception as e: - with open('/tmp/vwb_debug.log', 'a') as df: - df.write(f"⚠️ Erreur chargement via anchor_id: {e}\n") - df.flush() - - print(f"🔍 [Debug] visual_anchor keys: {list(visual_anchor.keys()) if visual_anchor else 'None'}") - print(f"🔍 [Debug] target keys: {list(target.keys()) if target else 'None'}") - print(f"🔍 [Debug] anchor_screenshot trouvé: {bool(anchor_screenshot)} (longueur: {len(anchor_screenshot) if anchor_screenshot else 0})") - - # Mode de recherche visuelle DÉSACTIVÉ par défaut (retour à l'approche du 14 janvier) - # La Visual Search avec template matching trouve trop de faux positifs (miniatures, icônes similaires) - # On utilise les coordonnées statiques du bounding_box qui sont plus fiables - use_visual_search = data.get('use_visual_search', False) # Désactivé par défaut - print(f"🎯 [Mode Simple] Utilisation des coordonnées bounding_box directement (Visual Search: {use_visual_search})") - confidence_threshold = parameters.get('confidence_threshold', 0.7) - - # Extraire le bounding_box pour le cropping de l'image de référence - bounding_box = ( - visual_anchor.get('bounding_box') or - visual_anchor.get('boundingBox') or - target.get('boundingBox') or - target.get('bounding_box') or - {} - ) - print(f"🔍 [Debug] bounding_box: {bounding_box}") - - x, y = 0, 0 - search_method = "static" - - if use_visual_search: - print(f"🔎 [Visual Search] Recherche dynamique de l'ancre sur l'écran...") - - # Calculer la position attendue pour validation - # Évite les fausses détections (ex: copie dans l'interface VWB) - expected_position = None - if bounding_box: - bbox_x = bounding_box.get('x', 0) - bbox_y = bounding_box.get('y', 0) - bbox_w = bounding_box.get('width', 0) - bbox_h = bounding_box.get('height', 0) - - # Récupérer la résolution de l'image originale depuis les métadonnées - metadata = ( - visual_anchor.get('metadata') or - target.get('metadata') or - {} - ) - screen_resolution = metadata.get('screen_resolution') or {} - ref_width = screen_resolution.get('width') or screen_width - ref_height = screen_resolution.get('height') or screen_height - - # Calculer le scale pour convertir vers l'écran réel - if ref_width == screen_width and ref_height == screen_height: - scale_to_screen_x = 1.0 - scale_to_screen_y = 1.0 - else: - scale_to_screen_x = screen_width / ref_width - scale_to_screen_y = screen_height / ref_height - - exp_x = int((bbox_x + bbox_w / 2) * scale_to_screen_x) - exp_y = int((bbox_y + bbox_h / 2) * scale_to_screen_y) - expected_position = {'x': exp_x, 'y': exp_y} - print(f"📍 [Visual Search] Position attendue: ({exp_x}, {exp_y}) [ref: {ref_width}x{ref_height}]") - - # Essayer d'abord la recherche normale avec le bounding_box pour cropper - visual_result = find_visual_anchor_on_screen(anchor_screenshot, confidence_threshold, bounding_box, expected_position) - - # Note: On ne fait PAS de fallback multi-échelle quand on a une position attendue - # car si la validation a échoué, le multi-échelle trouvera probablement la même fausse copie - # Les coordonnées statiques sont plus fiables dans ce cas - if not visual_result and not expected_position: - # Fallback: recherche multi-échelle (seulement si pas de position attendue) - print(f"🔄 [Visual Search] Tentative multi-échelle...") - visual_result = find_anchor_multiscale(anchor_screenshot, confidence_threshold=confidence_threshold * 0.9) - - if visual_result: - x = visual_result['center_x'] - y = visual_result['center_y'] - search_method = f"visual ({visual_result.get('method', 'multiscale')}, conf={visual_result['confidence']:.2%})" - print(f"✅ [Visual Search] Élément trouvé à ({x}, {y})") - else: - print(f"⚠️ [Visual Search] Ancre non trouvée, fallback sur coordonnées statiques") - - # === MÉTHODE PYAUTOGUI AMÉLIORÉE === - # Priorité sur pyautogui car les coordonnées bounding_box du frontend sont incorrectes - with open('/tmp/vwb_debug.log', 'a') as df: - df.write(f"=== PYAUTOGUI CHECK ===\n") - df.write(f"x={x}, y={y}, anchor_screenshot={bool(anchor_screenshot)}, len={len(anchor_screenshot) if anchor_screenshot else 0}\n") - df.flush() - - if x == 0 and y == 0 and anchor_screenshot: - try: - import pyautogui - import tempfile - import os - - with open('/tmp/vwb_debug.log', 'a') as df: - df.write(f"🔍 PyAutoGUI: Démarrage recherche...\n") - df.flush() - - # Sauvegarder l'image de l'ancre temporairement - anchor_img_data = base64.b64decode(anchor_screenshot) - with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp_file: - tmp_file.write(anchor_img_data) - tmp_path = tmp_file.name - - try: - location = None - found_conf = 0 - found_region = "unknown" - - # Définir les régions de recherche prioritaires - # PRIORITÉ 1: Dock en bas (car c'est là que l'utilisateur a son dock Ubuntu) - # PRIORITÉ 2: Autres régions si pas trouvé en bas - dock_regions = [ - # Dock en bas (100 derniers pixels) - PRIORITAIRE - ("dock-bas", (0, screen_height - 100, screen_width, 100)), - # Dock à gauche (80 premiers pixels) - ("dock-gauche", (0, 0, 80, screen_height)), - # Dock à droite (80 derniers pixels) - ("dock-droite", (screen_width - 80, 0, 80, screen_height)), - ] - - # Stratégie: tester CHAQUE région avec TOUTES les confidences - # avant de passer à la suivante (priorité au dock-bas) - # Note: certaines images nécessitent conf=0.4 pour être trouvées - confidence_levels = [0.8, 0.7, 0.6, 0.5, 0.4] - - # Pour chaque région (en commençant par dock-bas) - for region_name, region in dock_regions: - if location: - break - # Tester toutes les confidences pour cette région - for conf in confidence_levels: - try: - loc = pyautogui.locateOnScreen( - tmp_path, - confidence=conf, - region=region, - grayscale=True # Réduire les faux positifs couleur - ) - if loc: - location = loc - found_conf = conf - found_region = region_name - print(f"🎯 [PyAutoGUI] Trouvé dans {region_name} avec confidence={conf}") - break - except Exception as e: - pass - - # Si pas trouvé dans les docks, chercher sur l'écran complet - if not location: - for conf in confidence_levels: - try: - loc = pyautogui.locateOnScreen( - tmp_path, - confidence=conf, - grayscale=True - ) - if loc: - location = loc - found_conf = conf - found_region = "écran complet" - print(f"🔍 [PyAutoGUI] Trouvé sur écran complet avec confidence={conf}") - break - except Exception as e: - pass - - if location: - # Obtenir le centre de la zone trouvée - center = pyautogui.center(location) - x, y = center.x, center.y - search_method = f"pyautogui ({found_region}, conf={found_conf})" - with open('/tmp/vwb_debug.log', 'a') as df: - df.write(f"✅ PyAutoGUI: Trouvé à ({x}, {y}) - région: {found_region}, conf={found_conf}\n") - df.flush() - else: - with open('/tmp/vwb_debug.log', 'a') as df: - df.write(f"⚠️ PyAutoGUI: Ancre NON TROUVÉE après toutes les tentatives\n") - df.flush() - finally: - # Nettoyer le fichier temporaire - import os as os_cleanup - os_cleanup.unlink(tmp_path) - - except Exception as e: - print(f"⚠️ [PyAutoGUI] Erreur: {e}") - - # === FALLBACK: COORDONNÉES STATIQUES === - # Note: Les coordonnées bounding_box du frontend peuvent être incorrectes - # (bug connu: x négatif, y décalé). Ce fallback n'est utilisé que si pyautogui échoue. - if x == 0 and y == 0: - bounding_box = ( - visual_anchor.get('bounding_box') or - visual_anchor.get('boundingBox') or - target.get('boundingBox') or - target.get('bounding_box') or - {} - ) - - if bounding_box: - bbox_x = bounding_box.get('x', 0) - bbox_y = bounding_box.get('y', 0) - bbox_w = bounding_box.get('width', 0) - bbox_h = bounding_box.get('height', 0) - - print(f"📦 [Static] BBox brute: x={bbox_x}, y={bbox_y}, w={bbox_w}, h={bbox_h}") - - # VALIDATION: Rejeter les coordonnées invalides - if bbox_x < 0 or bbox_y < 0: - print(f"⚠️ [Static] Coordonnées invalides (négatives): x={bbox_x}, y={bbox_y} - IGNORÉ") - elif bbox_w <= 0 or bbox_h <= 0: - print(f"⚠️ [Static] Dimensions invalides: w={bbox_w}, h={bbox_h} - IGNORÉ") - else: - # Récupérer la résolution de l'image originale depuis les métadonnées - # Les coordonnées du bounding_box sont en pixels de l'image ORIGINALE - # (pas une image redimensionnée) - metadata = ( - visual_anchor.get('metadata') or - target.get('metadata') or - {} - ) - screen_resolution = metadata.get('screen_resolution') or {} - ref_width = screen_resolution.get('width') or screen_width # Utiliser la résolution écran si non spécifié - ref_height = screen_resolution.get('height') or screen_height - - # Si la résolution originale correspond à l'écran actuel, pas besoin de scale - if ref_width == screen_width and ref_height == screen_height: - scale_to_screen_x = 1.0 - scale_to_screen_y = 1.0 - print(f"📐 [Static] Image originale = Écran actuel ({screen_width}x{screen_height}), pas de conversion") - else: - scale_to_screen_x = screen_width / ref_width - scale_to_screen_y = screen_height / ref_height - print(f"📐 [Static] Image originale: {ref_width}x{ref_height}, Écran: {screen_width}x{screen_height}") - print(f"📐 [Static] Scale: x={scale_to_screen_x:.2f}, y={scale_to_screen_y:.2f}") - - # Convertir vers coordonnées écran - screen_x = (bbox_x + bbox_w / 2) * scale_to_screen_x - screen_y = (bbox_y + bbox_h / 2) * scale_to_screen_y - - # Validation finale: les coordonnées doivent être dans l'écran - if 0 < screen_x < screen_width and 0 < screen_y < screen_height: - x = int(screen_x) - y = int(screen_y) - search_method = "static (coordonnées enregistrées)" - print(f"🎯 [Static] Centre image: ({bbox_x + bbox_w/2:.0f}, {bbox_y + bbox_h/2:.0f}) -> Écran: ({x}, {y})") - else: - print(f"⚠️ [Static] Coordonnées hors écran: ({screen_x:.0f}, {screen_y:.0f}) - IGNORÉ") - - # === EXÉCUTER LE CLIC === - with open('/tmp/vwb_debug.log', 'a') as df: - df.write(f"=== CLIC FINAL ===\n") - df.write(f"x={x}, y={y}, search_method={search_method}\n") - df.flush() - - if x > 0 and y > 0: - click_type = parameters.get('click_type', 'left') - with open('/tmp/vwb_debug.log', 'a') as df: - df.write(f"🖱️ Clic {click_type} à ({x}, {y}) via {search_method}\n") - df.flush() - - if click_type == 'right': - pyautogui.rightClick(x, y) - elif click_type == 'double': - pyautogui.doubleClick(x, y) - else: - pyautogui.click(x, y) - result_message = f'Clic {click_type} effectué à ({x}, {y}) [{search_method}]' - else: - result_message = 'Coordonnées invalides - ancre non trouvée' - execution_success = False - - elif action_type in ['type', 'type_text', 'saisir_texte']: - text = parameters.get('text', parameters.get('texte', '')) - if text: - print(f"⌨️ [Direct] Saisie: {text[:30]}...") - if text.isascii(): - pyautogui.typewrite(text, interval=0.05) - else: - pyautogui.write(text) - result_message = f'Texte saisi: {text[:50]}...' - else: - result_message = 'Aucun texte à saisir' - - elif action_type in ['wait', 'attendre', 'wait_for_anchor']: - wait_time = parameters.get('duration', parameters.get('timeout_ms', 1000)) / 1000.0 - print(f"⏳ [Direct] Attente {wait_time}s") - time.sleep(wait_time) - result_message = f'Attente de {wait_time}s terminée' - - elif action_type in ['hotkey', 'raccourci']: - keys = parameters.get('keys', parameters.get('touches', [])) - if keys: - print(f"⌨️ [Direct] Raccourci: {'+'.join(keys)}") - pyautogui.hotkey(*keys) - result_message = f'Raccourci {"+".join(keys)} exécuté' - else: - result_message = 'Aucune touche définie' - - elif action_type in ['scroll', 'defiler']: - direction = parameters.get('direction', 'down') - amount = parameters.get('amount', 3) - clicks = amount if direction == 'down' else -amount - print(f"📜 [Direct] Scroll {direction} de {amount}") - pyautogui.scroll(clicks) - result_message = f'Scroll {direction} effectué' - - else: - result_message = f'Type {action_type} non géré en mode direct' - execution_success = False - - end_time = datetime.now() - execution_time_ms = (end_time - start_time).total_seconds() * 1000 - - return jsonify({ - "success": True, - "result": { - "action_id": f"direct_{action_type}_{step_id}", - "step_id": step_id, - "status": "success" if execution_success else "failed", - "execution_time_ms": execution_time_ms, - "output_data": {"message": result_message}, - "evidence_list": [], - "error": None if execution_success else {"message": result_message} - } - }) - - except Exception as e: - print(f"❌ [Direct] Erreur: {e}") - traceback.print_exc() - return jsonify({ - "success": False, - "error": f"Erreur exécution directe: {str(e)}" - }), 500 - - # === MODE VWB CLASSIQUE (si direct_execution=False) === - # Créer la configuration de l'action - action_config = { - "type": action_type, - "action_id": data.get('action_id', f"action_{action_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"), - "parameters": parameters - } - - # Créer l'instance d'action - action = create_action_from_config(action_config) - if not action: - return jsonify({ - "success": False, - "error": { - "message": f"Impossible de créer l'action de type '{action_type}'", - "type": "action_creation_failed" - } - }), 400 - - print(f"🚀 Exécution de l'action {action_type} (ID: {action.action_id})") - - # Exécuter l'action - result = action.execute( - step_id=step_id, - workflow_id=workflow_id, - user_id=user_id - ) - - # Convertir le résultat en dictionnaire - result_dict = { - "action_id": result.action_id, - "step_id": result.step_id, - "status": result.status.value, - "start_time": result.start_time.isoformat(), - "end_time": result.end_time.isoformat(), - "execution_time_ms": result.execution_time_ms, - "output_data": result.output_data, - "evidence_list": [evidence.to_dict() for evidence in result.evidence_list], - "error": result.error.to_dict() if result.error else None, - "retry_count": result.retry_count, - "workflow_id": result.workflow_id, - "user_id": result.user_id, - "session_id": result.session_id - } - - status_emoji = "✅" if result.is_success() else "❌" - print(f"{status_emoji} Action {action_type} terminée en {result.execution_time_ms:.1f}ms") - - return jsonify({ - "success": True, - "result": result_dict - }) - - except Exception as e: - error_msg = f"Erreur lors de l'exécution de l'action: {str(e)}" - print(f"❌ {error_msg}") - - return jsonify({ - "success": False, - "error": error_msg, - "traceback": traceback.format_exc() - }), 500 - - -@catalog_bp.route('/validate', methods=['POST']) -def validate_action(): - """ - Valide la configuration d'une action sans l'exécuter. - - Request Body: - { - "type": "click_anchor", - "parameters": { - "visual_anchor": {...}, - "click_type": "left" - } - } - - Response: - { - "success": true, - "validation": { - "is_valid": true, - "errors": [], - "warnings": [], - "suggestions": [] - } - } - """ - try: - # Valider le corps de la requête - data = request.get_json() - if not data: - return jsonify({ - "success": False, - "error": "Corps de requête JSON requis" - }), 400 - - action_type = data.get('type') - if not action_type: - return jsonify({ - "success": False, - "error": "Paramètre 'type' requis" - }), 400 - - # Créer la configuration de l'action - action_config = { - "type": action_type, - "action_id": "validation_test", - "parameters": data.get('parameters', {}) - } - - # Créer l'instance d'action pour validation - action = create_action_from_config(action_config) - if not action: - return jsonify({ - "is_valid": False, - "errors": [{"parameter": "type", "message": f"Type d'action '{action_type}' non supporté", "severity": "error"}], - "warnings": [], - "suggestions": [{"type": "best_practice", "message": "Vérifiez que le type d'action est correct", "priority": "medium"}] - }) - - # Valider les paramètres - validation_errors = action.validate_parameters() - - # Générer des avertissements et suggestions - warnings = [] - suggestions = [] - - # Vérifications spécifiques selon le type d'action - if action_type == 'click_anchor': - if hasattr(action, 'confidence_threshold') and action.confidence_threshold < 0.7: - warnings.append("Seuil de confiance faible (< 0.7) - risque de faux positifs") - suggestions.append("Considérez augmenter le seuil de confiance à 0.8 ou plus") - - elif action_type == 'type_text': - if hasattr(action, 'text_to_type') and len(action.text_to_type) > 100: - warnings.append("Texte très long (> 100 caractères) - saisie lente") - suggestions.append("Considérez diviser en plusieurs actions de saisie") - - elif action_type == 'wait_for_anchor': - if hasattr(action, 'max_wait_time_ms') and action.max_wait_time_ms > 60000: - warnings.append("Délai d'attente très long (> 60s)") - suggestions.append("Considérez réduire le délai d'attente") - - # Vérifier la disponibilité du ScreenCapturer - if not SCREEN_CAPTURER_AVAILABLE: - warnings.append("ScreenCapturer non disponible - actions simulées uniquement") - suggestions.append("Installez les dépendances de capture d'écran pour un fonctionnement complet") - - validation_result = { - "is_valid": len(validation_errors) == 0, - "errors": [{"parameter": "validation", "message": error, "severity": "error"} for error in validation_errors], - "warnings": [{"parameter": "validation", "message": warning, "impact": "medium"} for warning in warnings], - "suggestions": [{"type": "best_practice", "message": suggestion, "priority": "medium"} for suggestion in suggestions] - } - - return jsonify(validation_result) - - except Exception as e: - return jsonify({ - "success": False, - "error": f"Erreur lors de la validation: {str(e)}" - }), 500 - - -@catalog_bp.route('/categories', methods=['GET']) -def list_categories(): - """ - Liste toutes les catégories d'actions disponibles dans le catalogue VWB. - - Response: - { - "success": true, - "categories": [ - { - "id": "vision_ui", - "name": "Interface Utilisateur", - "description": "Actions d'interaction avec les éléments visuels", - "icon": "🖱️", - "actionCount": 5 - } - ], - "total": 3 - } - """ - try: - # Définir les catégories disponibles avec métadonnées - available_categories = [ - { - "id": "vision_ui", - "name": "Interface Utilisateur", - "description": "Actions d'interaction avec les éléments visuels de l'interface", - "icon": "🖱️", - "actionCount": 5, - "color": "#2196f3", - "isEnabled": True - }, - { - "id": "control", - "name": "Contrôle de Flux", - "description": "Actions de contrôle et synchronisation des workflows", - "icon": "⏳", - "actionCount": 1, - "color": "#ff9800", - "isEnabled": True - }, - { - "id": "data", - "name": "Données", - "description": "Actions de manipulation et extraction de données", - "icon": "📊", - "actionCount": 1, - "color": "#4caf50", - "isEnabled": True - } - ] - - return jsonify({ - "success": True, - "categories": available_categories, - "total": len(available_categories), - "screen_capturer_available": SCREEN_CAPTURER_AVAILABLE - }) - - except Exception as e: - return jsonify({ - "success": False, - "error": f"Erreur lors de la récupération des catégories: {str(e)}", - "traceback": traceback.format_exc() - }), 500 - - -@catalog_bp.route('/health', methods=['GET']) -def catalog_health(): - """ - Vérifie la santé du service catalogue VWB. - - Response: - { - "success": true, - "status": "healthy", - "services": { - "screen_capturer": true, - "actions": 3 - }, - "timestamp": "2026-01-09T23:30:00" - } - """ - try: - # Vérifier les services - screen_capturer = get_screen_capturer() - - services_status = { - "screen_capturer": screen_capturer is not None, - "actions": 7, # Nombre d'actions disponibles - "screen_capturer_method": getattr(screen_capturer, 'method', 'unavailable') if screen_capturer else 'unavailable' - } - - # Déterminer le statut global - overall_status = "healthy" if services_status["screen_capturer"] else "degraded" - - return jsonify({ - "success": True, - "status": overall_status, - "services": services_status, - "timestamp": datetime.now().isoformat(), - "version": "1.0.0-catalog" - }) - - except Exception as e: - return jsonify({ - "success": False, - "status": "unhealthy", - "error": str(e), - "timestamp": datetime.now().isoformat() - }), 500 - - -# Fonction pour enregistrer les routes dans l'application Flask -def register_catalog_routes(app): - """ - Enregistre les routes du catalogue dans l'application Flask. - - Args: - app: Instance Flask - """ - app.register_blueprint(catalog_bp) - print("📋 Routes du catalogue VWB enregistrées") - print(" - GET /api/vwb/catalog/actions") - print(" - GET /api/vwb/catalog/categories") - print(" - GET /api/vwb/catalog/actions/") - print(" - POST /api/vwb/catalog/execute") - print(" - POST /api/vwb/catalog/validate") - print(" - GET /api/vwb/catalog/health") \ No newline at end of file diff --git a/visual_workflow_builder/backend/catalog_routes_v2_vlm.py b/visual_workflow_builder/backend/catalog_routes_v2_vlm.py index 9d1d7f37e..36a5e10a0 100644 --- a/visual_workflow_builder/backend/catalog_routes_v2_vlm.py +++ b/visual_workflow_builder/backend/catalog_routes_v2_vlm.py @@ -25,14 +25,12 @@ import traceback # Import des actions et contrats VWB try: from visual_workflow_builder.backend.actions import ( - BaseVWBAction, VWBActionResult, VWBActionStatus, + BaseVWBAction, VWBClickAnchorAction, VWBTypeTextAction, VWBWaitForAnchorAction, VWBFocusAnchorAction, VWBTypeSecretAction, VWBScrollToAnchorAction, VWBExtractTextAction ) - from visual_workflow_builder.backend.contracts.error import VWBErrorType, VWBErrorSeverity, create_vwb_error - from visual_workflow_builder.backend.contracts.evidence import VWBEvidenceType - from visual_workflow_builder.backend.contracts.visual_anchor import VWBVisualAnchor, VWBVisualAnchorType + from visual_workflow_builder.backend.contracts.visual_anchor import VWBVisualAnchor ACTIONS_AVAILABLE = True print("✅ Actions VWB importées avec succès") except ImportError as e: @@ -40,14 +38,12 @@ except ImportError as e: try: # Essayer import relatif from .actions import ( - BaseVWBAction, VWBActionResult, VWBActionStatus, + BaseVWBAction, VWBClickAnchorAction, VWBTypeTextAction, VWBWaitForAnchorAction, VWBFocusAnchorAction, VWBTypeSecretAction, VWBScrollToAnchorAction, VWBExtractTextAction ) - from .contracts.error import VWBErrorType, VWBErrorSeverity, create_vwb_error - from .contracts.evidence import VWBEvidenceType - from .contracts.visual_anchor import VWBVisualAnchor, VWBVisualAnchorType + from .contracts.visual_anchor import VWBVisualAnchor ACTIONS_AVAILABLE = True print("✅ Actions VWB importées avec import relatif") except ImportError as e2: @@ -97,7 +93,7 @@ try: sys.path.insert(0, '/home/dom/ai/rpa_vision_v3') if '/home/dom/ai/OmniParser' not in sys.path: sys.path.insert(0, '/home/dom/ai/OmniParser') - from core.detection.omniparser_adapter import get_omniparser, find_element as omniparser_find + from core.detection.omniparser_adapter import get_omniparser omniparser_adapter = get_omniparser() OMNIPARSER_AVAILABLE = omniparser_adapter.available print(f"✅ OmniParser disponible: {OMNIPARSER_AVAILABLE}") @@ -108,19 +104,18 @@ except Exception as e: OMNIPARSER_AVAILABLE = False # ============================================================================ -# VLM (Vision Language Model) - Ollama qwen2.5vl (fallback si OmniParser échoue) +# VLM (Vision Language Model) - Ollama (fallback si OmniParser échoue) +# Configurable via variable d'environnement VLM_MODEL # ============================================================================ -OLLAMA_URL = "http://localhost:11434" -VLM_MODEL = "qwen2.5vl:7b" # Modèle vision local - bon équilibre précision/vitesse +OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434") +VLM_MODEL = os.environ.get("VLM_MODEL", "qwen3-vl:8b") # qwen3-vl offre une meilleure qualité OCR # ============================================================================ # Pipeline VLM Coarse → Refine → Refine++ (Template Matching) # ============================================================================ from pydantic import BaseModel, Field -from typing import Literal, Optional, List -from dataclasses import dataclass # Ollama est optionnel - le template matching fonctionnera sans try: diff --git a/visual_workflow_builder/backend/services/ui_detection_service.py b/visual_workflow_builder/backend/services/ui_detection_service.py index f8a001b86..1c296df17 100644 --- a/visual_workflow_builder/backend/services/ui_detection_service.py +++ b/visual_workflow_builder/backend/services/ui_detection_service.py @@ -18,8 +18,12 @@ from dataclasses import dataclass import numpy as np from PIL import Image -# Configuration -MODEL_PATH = "/home/dom/ai/rpa_vision_v3/models/ui-detr-1/model.pth" +# Configuration — chemin relatif à la racine du projet, ou variable d'environnement +_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..')) +MODEL_PATH = os.environ.get( + "UI_DETR_MODEL_PATH", + os.path.join(_PROJECT_ROOT, "models", "ui-detr-1", "model.pth") +) CONFIDENCE_THRESHOLD = 0.35 RESOLUTION = 1600