#!/usr/bin/env python3 """ Visual Workflow Builder - Backend Flask Application (Version Allégée) Auteur : Dom, Alice, Kiro - 09 janvier 2026 Version optimisée pour un démarrage rapide avec uniquement les fonctionnalités essentielles. Cette version évite les imports lourds et les dépendances optionnelles. Fonctionnalités : - API REST pour la gestion des workflows - Capture d'écran via ScreenCapturer (core/capture) - Création d'embeddings visuels via CLIPEmbedder (core/embedding) """ import json import os import sys import base64 import io from pathlib import Path from datetime import datetime from typing import Dict, Any, List, Optional # Ajouter le répertoire racine au path pour les imports core ROOT_DIR = Path(__file__).parent.parent.parent sys.path.insert(0, str(ROOT_DIR)) sys.path.insert(0, str(Path(__file__).parent)) # Import minimal sans dépendances lourdes try: from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import urlparse, parse_qs import socketserver USE_FLASK = False print("⚡ Mode serveur HTTP natif (sans Flask)") except ImportError: USE_FLASK = True print("🔄 Tentative d'utilisation de Flask...") # Import du service de capture d'écran réelle try: from services.real_screen_capture import real_capture_service REAL_CAPTURE_AVAILABLE = True print("✅ Service de capture d'écran réelle disponible") except ImportError as e: REAL_CAPTURE_AVAILABLE = False real_capture_service = None print(f"⚠️ Service de capture d'écran réelle non disponible: {e}") # Import des routes du catalogue VWB try: from catalog_routes import register_catalog_routes CATALOG_ROUTES_AVAILABLE = True print("✅ Routes du catalogue VWB disponibles") except ImportError as e: CATALOG_ROUTES_AVAILABLE = False register_catalog_routes = None print(f"⚠️ Routes du catalogue VWB non disponibles: {e}") # ============================================================================ # Services de capture d'écran et d'embedding # ============================================================================ # Instance globale du capturer (initialisée à la demande) _screen_capturer = None _clip_embedder = None def get_screen_capturer(): """ Obtenir l'instance du ScreenCapturer (initialisation paresseuse). Returns: ScreenCapturer ou None si non disponible """ global _screen_capturer if _screen_capturer is None: try: # Vérifier les dépendances de capture d'écran try: import mss print("✅ mss disponible") except ImportError: print("❌ mss non disponible") try: import pyautogui print("✅ pyautogui disponible") except ImportError: print("❌ pyautogui non disponible") from core.capture import ScreenCapturer _screen_capturer = ScreenCapturer(buffer_size=5, detect_changes=False) print(f"✅ ScreenCapturer initialisé avec succès - méthode: {_screen_capturer.method}") except ImportError as e: print(f"⚠️ ScreenCapturer non disponible: {e}") return None except Exception as e: print(f"❌ Erreur initialisation ScreenCapturer: {e}") return None return _screen_capturer def get_clip_embedder(): """ Obtenir l'instance du CLIPEmbedder (initialisation paresseuse). Returns: CLIPEmbedder ou None si non disponible """ global _clip_embedder if _clip_embedder is None: try: from core.embedding import create_clip_embedder _clip_embedder = create_clip_embedder(device="cpu") print("✅ CLIPEmbedder initialisé avec succès") except ImportError as e: print(f"⚠️ CLIPEmbedder non disponible: {e}") return None except Exception as e: print(f"❌ Erreur initialisation CLIPEmbedder: {e}") return None return _clip_embedder def capture_screen_to_base64() -> Dict[str, Any]: """ Capture l'écran et retourne l'image en base64 (version ultra stable - Option A). Returns: Dict avec 'success', 'screenshot' (base64), 'width', 'height', ou 'error' """ try: # Utiliser directement le ScreenCapturer corrigé (Option A - ultra stable) capturer = get_screen_capturer() if capturer is None: return { 'success': False, 'error': 'Service de capture d\'écran non disponible' } from PIL import Image import numpy as np # Capturer l'écran avec la méthode ultra stable img_array = capturer.capture() if img_array is None: return { 'success': False, 'error': 'Échec de la capture d\'écran' } # Convertir en PIL Image pil_image = Image.fromarray(img_array) # Convertir en base64 buffer = io.BytesIO() pil_image.save(buffer, format='PNG', optimize=True) buffer.seek(0) screenshot_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8') return { 'success': True, 'screenshot': screenshot_base64, 'width': pil_image.width, 'height': pil_image.height, 'timestamp': datetime.now().isoformat(), 'method': 'ultra_stable_mss' } except Exception as e: return { 'success': False, 'error': f'Erreur lors de la capture: {str(e)}' } def create_visual_embedding(screenshot_base64: str, bounding_box: Dict[str, int], step_id: str) -> Dict[str, Any]: """ Crée un embedding visuel à partir d'une capture d'écran et d'une zone sélectionnée. Args: screenshot_base64: Image en base64 bounding_box: Zone sélectionnée {'x', 'y', 'width', 'height'} step_id: Identifiant de l'étape Returns: Dict avec 'success', 'embedding', 'embedding_id', ou 'error' """ embedder = get_clip_embedder() if embedder is None: return { 'success': False, 'error': 'Service d\'embedding non disponible' } try: from PIL import Image import numpy as np # Décoder l'image base64 image_data = base64.b64decode(screenshot_base64) pil_image = Image.open(io.BytesIO(image_data)) # Extraire la zone sélectionnée x = bounding_box.get('x', 0) y = bounding_box.get('y', 0) width = bounding_box.get('width', 100) height = bounding_box.get('height', 100) # Valider les coordonnées x = max(0, min(x, pil_image.width - 1)) y = max(0, min(y, pil_image.height - 1)) width = max(10, min(width, pil_image.width - x)) height = max(10, min(height, pil_image.height - y)) # Découper la zone cropped_image = pil_image.crop((x, y, x + width, y + height)) # Créer l'embedding embedding = embedder.embed_image(cropped_image) # Générer un ID unique pour l'embedding embedding_id = f"emb_{step_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" # Sauvegarder l'embedding et l'image de référence embeddings_dir = ROOT_DIR / "data" / "visual_embeddings" embeddings_dir.mkdir(parents=True, exist_ok=True) # Sauvegarder l'embedding en numpy embedding_path = embeddings_dir / f"{embedding_id}.npy" np.save(str(embedding_path), embedding) # Sauvegarder l'image de référence reference_path = embeddings_dir / f"{embedding_id}_ref.png" cropped_image.save(str(reference_path)) return { 'success': True, 'embedding': embedding.tolist(), 'embedding_id': embedding_id, 'dimension': len(embedding), 'reference_image': f"{embedding_id}_ref.png", 'bounding_box': { 'x': x, 'y': y, 'width': width, 'height': height } } except Exception as e: return { 'success': False, 'error': f'Erreur lors de la création de l\'embedding: {str(e)}' } class WorkflowHandler(BaseHTTPRequestHandler): """Gestionnaire HTTP simple pour les workflows.""" def __init__(self, *args, **kwargs): self.workflows_db = WorkflowDatabase() super().__init__(*args, **kwargs) def do_GET(self): """Gère les requêtes GET.""" parsed_path = urlparse(self.path) path = parsed_path.path # Headers CORS self.send_cors_headers() if path == '/health': self.send_health_check() elif path == '/': self.send_index() elif path.startswith('/api/workflows'): self.handle_workflows_get(path) else: self.send_error(404, "Not Found") def do_POST(self): """Gère les requêtes POST.""" parsed_path = urlparse(self.path) path = parsed_path.path self.send_cors_headers() if path.startswith('/api/workflows'): self.handle_workflows_post(path) else: self.send_error(404, "Not Found") def do_OPTIONS(self): """Gère les requêtes OPTIONS pour CORS.""" self.send_cors_headers() self.send_response(200) self.end_headers() def send_cors_headers(self): """Envoie les headers CORS.""" self.send_header('Access-Control-Allow-Origin', '*') self.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS') self.send_header('Access-Control-Allow-Headers', 'Content-Type, Authorization') def send_json_response(self, data: Any, status_code: int = 200): """Envoie une réponse JSON.""" self.send_response(status_code) self.send_header('Content-Type', 'application/json') self.send_cors_headers() self.end_headers() json_data = json.dumps(data, ensure_ascii=False, indent=2) self.wfile.write(json_data.encode('utf-8')) def send_health_check(self): """Endpoint de santé.""" self.send_json_response({ 'status': 'healthy', 'version': '1.0.0-lightweight', 'mode': 'native-http' }) def send_index(self): """Page d'accueil.""" self.send_json_response({ 'message': 'Visual Workflow Builder Backend (Version Allégée)', 'version': '1.0.0-lightweight', 'mode': 'native-http', 'endpoints': ['/health', '/api/workflows'] }) def handle_workflows_get(self, path: str): """Gère les GET sur /api/workflows.""" if path == '/api/workflows' or path == '/api/workflows/': # Liste des workflows try: workflows = self.workflows_db.list_workflows() self.send_json_response([w.to_dict() for w in workflows]) except Exception as e: self.send_json_response({'error': str(e)}, 500) else: # Workflow spécifique workflow_id = path.split('/')[-1] try: workflow = self.workflows_db.get_workflow(workflow_id) if workflow: self.send_json_response(workflow.to_dict()) else: self.send_json_response({'error': 'Workflow not found'}, 404) except Exception as e: self.send_json_response({'error': str(e)}, 500) def handle_workflows_post(self, path: str): """Gère les POST sur /api/workflows.""" try: content_length = int(self.headers.get('Content-Length', 0)) if content_length > 0: post_data = self.rfile.read(content_length) data = json.loads(post_data.decode('utf-8')) else: data = {} if path == '/api/workflows' or path == '/api/workflows/': # Créer un nouveau workflow workflow = self.workflows_db.create_workflow(data) self.send_json_response(workflow.to_dict(), 201) else: self.send_json_response({'error': 'Method not allowed'}, 405) except json.JSONDecodeError: self.send_json_response({'error': 'Invalid JSON'}, 400) except Exception as e: self.send_json_response({'error': str(e)}, 500) class SimpleWorkflow: """Modèle de workflow simplifié.""" def __init__(self, id: str, name: str, description: str = "", created_by: str = "unknown"): self.id = id self.name = name self.description = description self.created_by = created_by self.created_at = datetime.now().isoformat() self.updated_at = self.created_at self.nodes = [] self.edges = [] self.variables = [] self.settings = {} self.tags = [] self.category = "default" self.is_template = False def to_dict(self) -> Dict[str, Any]: """Convertit en dictionnaire.""" return { 'id': self.id, 'name': self.name, 'description': self.description, 'created_by': self.created_by, 'created_at': self.created_at, 'updated_at': self.updated_at, 'nodes': self.nodes, 'edges': self.edges, 'variables': self.variables, 'settings': self.settings, 'tags': self.tags, 'category': self.category, 'is_template': self.is_template } @classmethod def from_dict(cls, data: Dict[str, Any]) -> 'SimpleWorkflow': """Crée depuis un dictionnaire.""" workflow = cls( id=data.get('id', f"wf_{datetime.now().strftime('%Y%m%d_%H%M%S')}"), name=data.get('name', 'Sans titre'), description=data.get('description', ''), created_by=data.get('created_by', 'unknown') ) workflow.nodes = data.get('nodes', []) workflow.edges = data.get('edges', []) workflow.variables = data.get('variables', []) workflow.settings = data.get('settings', {}) workflow.tags = data.get('tags', []) workflow.category = data.get('category', 'default') workflow.is_template = data.get('is_template', False) return workflow class WorkflowDatabase: """Base de données simple pour les workflows.""" def __init__(self): self.data_dir = Path("../../data/workflows") self.data_dir.mkdir(parents=True, exist_ok=True) print(f"📁 Base de données: {self.data_dir.absolute()}") def _get_file_path(self, workflow_id: str) -> Path: """Retourne le chemin du fichier pour un workflow.""" safe_id = "".join(c for c in workflow_id if c.isalnum() or c in ("_", "-")) return self.data_dir / f"{safe_id}.json" def create_workflow(self, data: Dict[str, Any]) -> SimpleWorkflow: """Crée un nouveau workflow.""" if 'name' not in data: raise ValueError("Le nom est requis") workflow = SimpleWorkflow.from_dict(data) self.save_workflow(workflow) return workflow def save_workflow(self, workflow: SimpleWorkflow): """Sauvegarde un workflow.""" file_path = self._get_file_path(workflow.id) with open(file_path, 'w', encoding='utf-8') as f: json.dump(workflow.to_dict(), f, ensure_ascii=False, indent=2) def get_workflow(self, workflow_id: str) -> Optional[SimpleWorkflow]: """Récupère un workflow par ID.""" file_path = self._get_file_path(workflow_id) if not file_path.exists(): return None try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) return SimpleWorkflow.from_dict(data) except Exception as e: print(f"Erreur lecture workflow {workflow_id}: {e}") return None def list_workflows(self) -> List[SimpleWorkflow]: """Liste tous les workflows.""" workflows = [] for file_path in self.data_dir.glob("*.json"): try: with open(file_path, 'r', encoding='utf-8') as f: data = json.load(f) workflows.append(SimpleWorkflow.from_dict(data)) except Exception as e: print(f"Erreur lecture {file_path}: {e}") return workflows def start_native_server(port: int = 5002): """Démarre le serveur HTTP natif.""" print(f"🚀 Démarrage du serveur natif sur le port {port}") print(f"🌐 URL: http://localhost:{port}") print(f"❤️ Health check: http://localhost:{port}/health") print(f"📋 API Workflows: http://localhost:{port}/api/workflows") print("") print("Appuyez sur Ctrl+C pour arrêter") try: with socketserver.TCPServer(("", port), WorkflowHandler) as httpd: httpd.serve_forever() except KeyboardInterrupt: print("\n🛑 Arrêt du serveur") except Exception as e: print(f"❌ Erreur serveur: {e}") def start_flask_server(port: int = 5002): """Démarre le serveur Flask si disponible.""" try: from flask import Flask, jsonify, request from flask_cors import CORS app = Flask(__name__) CORS(app, origins="*", methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"], allow_headers=["Content-Type", "Authorization", "Accept", "X-Requested-With"], supports_credentials=False) # Enregistrer les routes du catalogue VWB si disponibles if CATALOG_ROUTES_AVAILABLE and register_catalog_routes: try: register_catalog_routes(app) print("✅ Routes du catalogue VWB enregistrées avec succès") except Exception as e: print(f"⚠️ Erreur lors de l'enregistrement des routes catalogue: {e}") # Gestionnaire global pour les requêtes OPTIONS (CORS preflight) @app.before_request def handle_preflight(): if request.method == "OPTIONS": response = jsonify({'status': 'OK'}) response.headers.add("Access-Control-Allow-Origin", "*") response.headers.add('Access-Control-Allow-Headers', "Content-Type,Authorization,Accept,X-Requested-With") response.headers.add('Access-Control-Allow-Methods', "GET,PUT,POST,DELETE,OPTIONS") return response db = WorkflowDatabase() @app.route('/health') @app.route('/api/health') def health_check(): return jsonify({ 'status': 'healthy', 'version': '1.0.0-lightweight', 'mode': 'flask', 'features': { 'screen_capture': get_screen_capturer() is not None, 'visual_embedding': get_clip_embedder() is not None } }) @app.route('/') def index(): endpoints = [ '/health', '/api/workflows', '/api/screen-capture', '/api/visual-embedding', '/api/real-screen-capture', '/api/real-screen-capture/start', '/api/real-screen-capture/stop', '/api/real-screen-capture/status' ] # Ajouter les endpoints du catalogue si disponibles if CATALOG_ROUTES_AVAILABLE: endpoints.extend([ '/api/vwb/catalog/actions', '/api/vwb/catalog/execute', '/api/vwb/catalog/validate', '/api/vwb/catalog/health' ]) return jsonify({ 'message': 'Visual Workflow Builder Backend (Version Allégée)', 'version': '1.0.0-lightweight', 'mode': 'flask', 'endpoints': endpoints, 'features': { 'catalog_routes': CATALOG_ROUTES_AVAILABLE, 'real_capture': REAL_CAPTURE_AVAILABLE } }) @app.route('/api/workflows', methods=['GET']) def list_workflows(): try: workflows = db.list_workflows() return jsonify([w.to_dict() for w in workflows]) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/workflows', methods=['POST']) def create_workflow(): try: data = request.get_json() or {} workflow = db.create_workflow(data) # Enregistrer dans le système d'apprentissage try: from services.learning_integration import register_workflow_for_learning register_workflow_for_learning(workflow) except ImportError: pass # Module non disponible, on continue sans return jsonify(workflow.to_dict()), 201 except Exception as e: return jsonify({'error': str(e)}), 400 @app.route('/api/workflows/', methods=['GET']) def get_workflow(workflow_id): """ Récupère un workflow spécifique par son ID. """ try: workflow = db.get_workflow(workflow_id) if workflow: return jsonify(workflow.to_dict()) else: return jsonify({'error': 'Workflow not found'}), 404 except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/workflows/', methods=['PUT']) def update_workflow(workflow_id): """ Met à jour un workflow existant. """ try: data = request.get_json() or {} workflow = db.get_workflow(workflow_id) if not workflow: return jsonify({'error': 'Workflow not found'}), 404 # Mettre à jour les champs workflow.name = data.get('name', workflow.name) workflow.description = data.get('description', workflow.description) workflow.nodes = data.get('nodes', workflow.nodes) workflow.edges = data.get('edges', workflow.edges) workflow.variables = data.get('variables', workflow.variables) workflow.settings = data.get('settings', workflow.settings) workflow.tags = data.get('tags', workflow.tags) workflow.category = data.get('category', workflow.category) workflow.is_template = data.get('is_template', workflow.is_template) workflow.updated_at = datetime.now().isoformat() db.save_workflow(workflow) return jsonify(workflow.to_dict()) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/workflows/', methods=['DELETE']) def delete_workflow(workflow_id): """ Supprime un workflow. """ try: workflow = db.get_workflow(workflow_id) if not workflow: return jsonify({'error': 'Workflow not found'}), 404 # Supprimer le fichier file_path = db._get_file_path(workflow_id) if file_path.exists(): file_path.unlink() return jsonify({'success': True, 'message': 'Workflow supprimé'}) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/workflow/execute', methods=['POST']) def execute_workflow(): """ Exécute un workflow complet. Request Body: { "workflowId": "workflow_id", "parameters": {...} } """ try: data = request.get_json() or {} workflow_id = data.get('workflowId') parameters = data.get('parameters', {}) if not workflow_id: return jsonify({ 'success': False, 'error': 'workflowId requis' }), 400 workflow = db.get_workflow(workflow_id) if not workflow: return jsonify({ 'success': False, 'error': 'Workflow non trouvé' }), 404 # Simulation d'exécution (à implémenter selon les besoins) results = [] for i, node in enumerate(workflow.nodes): results.append({ 'stepId': node.get('id', f'step_{i}'), 'success': True, 'output': f'Simulation - étape {i+1} exécutée', 'timestamp': datetime.now().isoformat() }) return jsonify({ 'success': True, 'results': results, 'workflowId': workflow_id, 'executionTime': datetime.now().isoformat() }) except Exception as e: return jsonify({ 'success': False, 'error': f'Erreur exécution: {str(e)}' }), 500 @app.route('/api/workflow/execute-step', methods=['POST']) def execute_step(): """ Exécute une étape individuelle avec pyautogui pour les actions réelles. Request Body: { "stepId": "step_id", "stepType": "click|type|wait|...", "parameters": {...}, "workflowId": "workflow_id" (optionnel) } """ try: import pyautogui import time data = request.get_json() or {} step_id = data.get('stepId') step_type = data.get('stepType') parameters = data.get('parameters', {}) workflow_id = data.get('workflowId') if not step_id or not step_type: return jsonify({ 'success': False, 'error': 'stepId et stepType requis' }), 400 result_message = '' execution_success = True # Exécution réelle basée sur le type d'étape if step_type in ['click', 'click_anchor']: # Récupérer les coordonnées depuis plusieurs sources possibles # 1. visual_anchor.bounding_box (snake_case) # 2. visual_anchor.boundingBox (camelCase) # 3. target.boundingBox (format frontend) # 4. target.bounding_box visual_anchor = parameters.get('visual_anchor', {}) target = parameters.get('target', {}) bounding_box = ( visual_anchor.get('bounding_box') or visual_anchor.get('boundingBox') or target.get('boundingBox') or target.get('bounding_box') or {} ) # Récupérer la résolution de l'écran pour conversion des coordonnées normalisées screen_width, screen_height = pyautogui.size() print(f"📐 Résolution écran: {screen_width}x{screen_height}") if bounding_box: # Calculer le centre de la bounding box bbox_x = bounding_box.get('x', 0) bbox_y = bounding_box.get('y', 0) bbox_w = bounding_box.get('width', 0) bbox_h = bounding_box.get('height', 0) print(f"📦 BoundingBox brute: x={bbox_x}, y={bbox_y}, w={bbox_w}, h={bbox_h}") # Détecter si les coordonnées sont normalisées (0-1) # Si x ou y est < 2, on considère que c'est normalisé if 0 < bbox_x < 2: bbox_x = bbox_x * screen_width print(f"↔️ x normalisé converti: {bbox_x}") if 0 < bbox_y < 2: bbox_y = bbox_y * screen_height print(f"↕️ y normalisé converti: {bbox_y}") if 0 < bbox_w < 2: bbox_w = bbox_w * screen_width if 0 < bbox_h < 2: bbox_h = bbox_h * screen_height # Calculer le centre (utiliser / au lieu de // pour les floats) x = int(bbox_x + bbox_w / 2) y = int(bbox_y + bbox_h / 2) print(f"🎯 Centre calculé: ({x}, {y})") else: # Utiliser les coordonnées directes si disponibles x = parameters.get('x', parameters.get('click_x', 0)) y = parameters.get('y', parameters.get('click_y', 0)) print(f"📍 Coordonnées directes: ({x}, {y})") if x > 0 and y > 0: click_type = parameters.get('click_type', 'left') print(f"🖱️ Clic {click_type} à ({x}, {y})") if click_type == 'right': pyautogui.rightClick(x, y) elif click_type == 'double': pyautogui.doubleClick(x, y) else: pyautogui.click(x, y) result_message = f'Clic effectué à ({x}, {y})' else: result_message = 'Coordonnées de clic non définies - simulation' elif step_type in ['type', 'type_text', 'saisir_texte']: text = parameters.get('text', parameters.get('texte', '')) if text: print(f"⌨️ Saisie de texte: {text[:30]}...") pyautogui.typewrite(text, interval=0.05) if text.isascii() else pyautogui.write(text) result_message = f'Texte saisi: {text[:50]}...' else: result_message = 'Aucun texte à saisir' elif step_type in ['wait', 'attendre', 'wait_for_anchor']: wait_time = parameters.get('duration', parameters.get('timeout_ms', 1000)) / 1000.0 print(f"⏳ Attente de {wait_time}s") time.sleep(wait_time) result_message = f'Attente de {wait_time}s terminée' elif step_type in ['hotkey', 'raccourci']: keys = parameters.get('keys', parameters.get('touches', [])) if keys: print(f"⌨️ Raccourci: {'+'.join(keys)}") pyautogui.hotkey(*keys) result_message = f'Raccourci {"+".join(keys)} exécuté' else: result_message = 'Aucune touche définie' elif step_type in ['scroll', 'defiler']: direction = parameters.get('direction', 'down') amount = parameters.get('amount', 3) clicks = amount if direction == 'down' else -amount print(f"📜 Scroll {direction} de {amount}") pyautogui.scroll(clicks) result_message = f'Scroll {direction} effectué' else: result_message = f'Type d\'étape {step_type} - simulation' output = { 'stepId': step_id, 'stepType': step_type, 'result': result_message, 'real_execution': True, 'parameters': parameters, 'timestamp': datetime.now().isoformat() } return jsonify({ 'success': execution_success, 'output': output }) except Exception as e: import traceback print(f"❌ Erreur exécution: {traceback.format_exc()}") return jsonify({ 'success': False, 'error': f'Erreur exécution étape: {str(e)}' }), 500 @app.route('/api/workflow/validate', methods=['POST']) def validate_workflow(): """ Valide un workflow. Request Body: WorkflowData Response: { "isValid": true, "errors": [], "warnings": [] } """ try: data = request.get_json() or {} errors = [] warnings = [] # Validation basique if not data.get('name'): errors.append('Le nom du workflow est obligatoire') if not isinstance(data.get('steps', []), list): errors.append('Les étapes doivent être un tableau') if not isinstance(data.get('connections', []), list): errors.append('Les connexions doivent être un tableau') # Vérifications avancées steps = data.get('steps', []) connections = data.get('connections', []) if len(steps) == 0: warnings.append('Le workflow ne contient aucune étape') if len(steps) > 1 and len(connections) == 0: warnings.append('Le workflow contient plusieurs étapes mais aucune connexion') # Vérifier les IDs uniques step_ids = [step.get('id') for step in steps if step.get('id')] if len(step_ids) != len(set(step_ids)): errors.append('Les IDs des étapes doivent être uniques') return jsonify({ 'isValid': len(errors) == 0, 'errors': errors, 'warnings': warnings }) except Exception as e: return jsonify({ 'isValid': False, 'errors': [f'Erreur validation: {str(e)}'], 'warnings': [] }), 500 # ==================================================================== # Endpoints d'apprentissage (Learning Integration) # ==================================================================== @app.route('/api/workflows//feedback', methods=['POST']) def submit_feedback(workflow_id): """ Soumet un feedback utilisateur pour l'apprentissage. Request Body: { "success": true/false, "confidence": 0.0-1.0 (optionnel) } """ try: from services.learning_integration import ( record_workflow_execution, get_workflow_learning_state, get_workflow_stats ) data = request.get_json() or {} if 'success' not in data: return jsonify({'error': 'Le champ "success" est requis'}), 400 success = bool(data['success']) confidence = data.get('confidence', 0.95 if success else 0.3) # Enregistrer le feedback recorded = record_workflow_execution(workflow_id, success, confidence) # Récupérer l'état mis à jour new_state = get_workflow_learning_state(workflow_id) stats = get_workflow_stats(workflow_id) return jsonify({ 'message': 'Feedback enregistré', 'workflow_id': workflow_id, 'feedback_recorded': recorded, 'learning_state': new_state, 'stats': stats }) except ImportError as e: return jsonify({ 'message': 'Feedback simulé (learning_integration non disponible)', 'workflow_id': workflow_id, 'learning_state': 'OBSERVATION', 'note': str(e) }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/workflows//learning', methods=['GET']) def get_learning_info(workflow_id): """ Récupère les informations d'apprentissage d'un workflow. """ try: from services.learning_integration import ( get_workflow_learning_state, get_workflow_stats ) state = get_workflow_learning_state(workflow_id) stats = get_workflow_stats(workflow_id) if stats is None: return jsonify({ 'workflow_id': workflow_id, 'learning_state': 'NOT_REGISTERED', 'message': 'Workflow non enregistré dans le système d\'apprentissage', 'stats': None }) return jsonify({ 'workflow_id': workflow_id, 'learning_state': state, 'stats': stats, 'can_auto_execute': state in ['AUTO_CANDIDATE', 'AUTO_CONFIRMED'] }) except ImportError: return jsonify({ 'workflow_id': workflow_id, 'learning_state': 'NOT_AVAILABLE', 'message': 'Module learning_integration non disponible', 'stats': None }) except Exception as e: return jsonify({'error': str(e)}), 500 @app.route('/api/stats', methods=['GET']) def get_stats(): """ Retourne les statistiques de l'API. """ try: workflows = db.list_workflows() stats = { 'workflows': { 'total': len(workflows), 'templates': len([w for w in workflows if w.is_template]), 'categories': list(set(w.category for w in workflows)) }, 'features': { 'screen_capture': get_screen_capturer() is not None, 'visual_embedding': get_clip_embedder() is not None }, 'server': { 'version': '1.0.0-lightweight', 'mode': 'flask', 'uptime': datetime.now().isoformat() } } return jsonify(stats) except Exception as e: return jsonify({'error': str(e)}), 500 # ==================================================================== # Endpoints de capture d'écran et d'embedding visuel # ==================================================================== @app.route('/api/screen-capture', methods=['POST']) def screen_capture(): """ Capture l'écran actuel et retourne l'image en base64 (Option A - ultra stable). Request Body (optionnel): { "format": "png", // Format de l'image (png par défaut) "quality": 90 // Qualité (non utilisé pour PNG) } Response: { "success": true, "screenshot": "base64_encoded_image", "width": 1920, "height": 1080, "timestamp": "2026-01-09T...", "method": "ultra_stable_mss" } """ try: print("🔧 Capture d'écran demandée via API Flask...") # Utiliser directement la méthode ultra stable (Option A) result = capture_screen_to_base64() if result['success']: print(f"✅ Capture réussie - {result.get('width')}x{result.get('height')} ({result.get('method')})") return jsonify(result) else: print(f"❌ Capture échouée: {result.get('error')}") return jsonify(result), 500 except Exception as e: print(f"❌ Erreur lors de la capture d'écran: {e}") return jsonify({ 'success': False, 'error': f'Erreur capture d\'écran: {str(e)}', 'method': 'error_fallback' }), 500 @app.route('/api/real-screen-capture', methods=['POST']) def real_screen_capture(): """ Service de capture d'écran réelle avec détection d'éléments UI. Request Body (optionnel): { "monitor_id": 0, // ID du moniteur (0 par défaut) "detect_elements": true // Détecter les éléments UI } Response: { "success": true, "screenshot": "data:image/jpeg;base64,...", "elements": [...], // Éléments UI détectés "monitors": [...], // Liste des moniteurs "status": {...} // Statut du service } """ if not REAL_CAPTURE_AVAILABLE: return jsonify({ 'success': False, 'error': 'Service de capture d\'écran réelle non disponible' }), 503 try: data = request.get_json() or {} monitor_id = data.get('monitor_id', 0) detect_elements = data.get('detect_elements', True) print(f"🔧 Capture d'écran réelle demandée (moniteur {monitor_id})...") # Sélectionner le moniteur si spécifié if monitor_id != real_capture_service.selected_monitor: if not real_capture_service.select_monitor(monitor_id): return jsonify({ 'success': False, 'error': f'Moniteur {monitor_id} non valide' }), 400 # TOUJOURS faire une nouvelle capture (pas de cache) # Cela garantit que l'utilisateur obtient l'écran actuel print(f"📸 Exécution d'une nouvelle capture d'écran...") screenshot_array = real_capture_service._capture_screen() if screenshot_array is not None: real_capture_service.current_screenshot = screenshot_array screenshot_b64 = real_capture_service.get_current_screenshot_base64() else: screenshot_b64 = None if screenshot_b64 is None: return jsonify({ 'success': False, 'error': 'Impossible de capturer l\'écran' }), 500 # Obtenir les éléments détectés si demandé elements = [] if detect_elements: elements = real_capture_service.get_detected_elements() result = { 'success': True, 'screenshot': screenshot_b64, 'elements': elements, 'monitors': real_capture_service.get_monitors(), 'status': real_capture_service.get_status(), 'timestamp': datetime.now().isoformat(), 'method': 'real_screen_capture_service' } print(f"✅ Capture réelle réussie - {len(elements)} éléments détectés") return jsonify(result) except Exception as e: print(f"❌ Erreur capture d'écran réelle: {e}") return jsonify({ 'success': False, 'error': f'Erreur capture d\'écran réelle: {str(e)}' }), 500 @app.route('/api/real-screen-capture/start', methods=['POST']) def start_real_capture(): """ Démarre la capture d'écran en temps réel. Request Body (optionnel): { "interval": 1.0 // Intervalle en secondes } """ if not REAL_CAPTURE_AVAILABLE: return jsonify({ 'success': False, 'error': 'Service de capture d\'écran réelle non disponible' }), 503 try: data = request.get_json() or {} interval = data.get('interval', 1.0) success = real_capture_service.start_capture(interval) if success: return jsonify({ 'success': True, 'message': f'Capture temps réel démarrée (intervalle: {interval}s)', 'status': real_capture_service.get_status() }) else: return jsonify({ 'success': False, 'error': 'Impossible de démarrer la capture' }), 500 except Exception as e: return jsonify({ 'success': False, 'error': f'Erreur: {str(e)}' }), 500 @app.route('/api/real-screen-capture/stop', methods=['POST']) def stop_real_capture(): """Arrête la capture d'écran en temps réel.""" if not REAL_CAPTURE_AVAILABLE: return jsonify({ 'success': False, 'error': 'Service de capture d\'écran réelle non disponible' }), 503 try: success = real_capture_service.stop_capture() return jsonify({ 'success': success, 'message': 'Capture temps réel arrêtée' if success else 'Aucune capture en cours', 'status': real_capture_service.get_status() }) except Exception as e: return jsonify({ 'success': False, 'error': f'Erreur: {str(e)}' }), 500 @app.route('/api/real-screen-capture/status', methods=['GET']) def real_capture_status(): """Obtient le statut du service de capture réelle.""" if not REAL_CAPTURE_AVAILABLE: return jsonify({ 'success': False, 'error': 'Service de capture d\'écran réelle non disponible' }), 503 try: return jsonify({ 'success': True, 'status': real_capture_service.get_status(), 'monitors': real_capture_service.get_monitors() }) except Exception as e: return jsonify({ 'success': False, 'error': f'Erreur: {str(e)}' }), 500 @app.route('/api/visual-embedding', methods=['POST']) def visual_embedding(): """ Crée un embedding visuel à partir d'une capture d'écran et d'une zone sélectionnée. Request Body: { "screenshot": "base64_encoded_image", "boundingBox": { "x": 100, "y": 200, "width": 150, "height": 50 }, "stepId": "step_123" } Response: { "success": true, "embedding": [0.1, 0.2, ...], "embedding_id": "emb_step_123_20260109_...", "dimension": 512, "reference_image": "emb_step_123_..._ref.png", "bounding_box": {...} } """ try: data = request.get_json() if not data: return jsonify({ 'success': False, 'error': 'Corps de requête JSON requis' }), 400 # Valider les paramètres requis screenshot = data.get('screenshot') bounding_box = data.get('boundingBox') step_id = data.get('stepId', 'unknown') if not screenshot: return jsonify({ 'success': False, 'error': 'Paramètre "screenshot" requis' }), 400 if not bounding_box: return jsonify({ 'success': False, 'error': 'Paramètre "boundingBox" requis' }), 400 # Créer l'embedding result = create_visual_embedding(screenshot, bounding_box, step_id) if result['success']: return jsonify(result) else: return jsonify(result), 500 except Exception as e: return jsonify({ 'success': False, 'error': f'Erreur serveur: {str(e)}' }), 500 @app.route('/api/visual-embedding/', methods=['GET']) def get_visual_embedding(embedding_id): """ Récupère un embedding visuel existant par son ID. Response: { "success": true, "embedding_id": "emb_...", "embedding": [0.1, 0.2, ...], "reference_image_url": "/api/visual-embedding/emb_.../image" } """ try: import numpy as np embeddings_dir = ROOT_DIR / "data" / "visual_embeddings" embedding_path = embeddings_dir / f"{embedding_id}.npy" if not embedding_path.exists(): return jsonify({ 'success': False, 'error': f'Embedding "{embedding_id}" non trouvé' }), 404 embedding = np.load(str(embedding_path)) return jsonify({ 'success': True, 'embedding_id': embedding_id, 'embedding': embedding.tolist(), 'dimension': len(embedding), 'reference_image_url': f'/api/visual-embedding/{embedding_id}/image' }) except Exception as e: return jsonify({ 'success': False, 'error': f'Erreur: {str(e)}' }), 500 @app.route('/api/visual-embedding//image', methods=['GET']) def get_embedding_reference_image(embedding_id): """ Récupère l'image de référence d'un embedding. """ try: from flask import send_file embeddings_dir = ROOT_DIR / "data" / "visual_embeddings" image_path = embeddings_dir / f"{embedding_id}_ref.png" if not image_path.exists(): return jsonify({ 'success': False, 'error': f'Image de référence non trouvée' }), 404 return send_file(str(image_path), mimetype='image/png') except Exception as e: return jsonify({ 'success': False, 'error': f'Erreur: {str(e)}' }), 500 print(f"🚀 Démarrage du serveur Flask sur le port {port}") print(f"🌐 URL: http://localhost:{port}") print(f"❤️ Health check: http://localhost:{port}/health") print(f"📋 API Workflows: http://localhost:{port}/api/workflows") print(f"📷 API Capture: http://localhost:{port}/api/screen-capture") print(f"🎯 API Embedding: http://localhost:{port}/api/visual-embedding") print(f"🔍 API Capture Réelle: http://localhost:{port}/api/real-screen-capture") print(f"📊 API Statut Capture: http://localhost:{port}/api/real-screen-capture/status") # Afficher les routes du catalogue si disponibles if CATALOG_ROUTES_AVAILABLE: print(f"📋 API Catalogue VWB: http://localhost:{port}/api/vwb/catalog/actions") print(f"⚡ API Exécution VWB: http://localhost:{port}/api/vwb/catalog/execute") print(f"✅ API Validation VWB: http://localhost:{port}/api/vwb/catalog/validate") print(f"❤️ Health Catalogue: http://localhost:{port}/api/vwb/catalog/health") app.run(host='0.0.0.0', port=port, debug=False) except ImportError as e: print(f"❌ Flask non disponible: {e}") print("🔄 Basculement vers le serveur natif...") start_native_server(port) def main(): """Fonction principale.""" print("=" * 60) print(" VISUAL WORKFLOW BUILDER - BACKEND ALLÉGÉ") print("=" * 60) print("Auteur : Dom, Alice, Kiro - 09 janvier 2026") print("") # Déterminer le port - utiliser 5003 par défaut (compatible frontend) port = int(os.getenv('PORT', 5003)) # Vérifier les dépendances try: import flask import flask_cors print("✅ Flask disponible - utilisation du mode Flask") start_flask_server(port) except ImportError: print("⚡ Flask non disponible - utilisation du serveur natif") start_native_server(port) if __name__ == '__main__': main()