Files
Dom 3ff36e3c79 refactor(audit): Nettoyage dette technique phases 1-4
Phase 1 — Code mort et duplication :
- Supprimer catalog_routes.py (-1832 lignes, doublon de v2_vlm)
- Mettre à jour app.py et app_lightweight.py vers catalog_routes_v2_vlm
- Nettoyer 9 imports inutilisés dans catalog_routes_v2_vlm.py
- Supprimer get_required_params inutilisé dans execute.py

Phase 2 — Centraliser la configuration :
- Ollama URL via os.environ.get() dans verify_text_content.py et extraire_tableau.py
- MODEL_PATH relatif au projet + var env UI_DETR_MODEL_PATH dans ui_detection_service.py

Phase 3 — Thread-safety de l'exécution :
- Ajouter _execution_lock (RLock) pour protéger _execution_state
- Remplacer le polling self-healing par threading.Event
- Initialiser 'variables' dans le dict initial (plus de création dynamique)
- Corriger bare except → except Exception as db_err avec message

Phase 4 — Logging minimal :
- Ajouter logger dans execute.py, remplacer print() critiques par logger
- Configurer RotatingFileHandler (5MB, 3 backups) dans app.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 08:11:45 +01:00

1452 lines
56 KiB
Python

#!/usr/bin/env python3
"""
Visual Workflow Builder - Backend Flask Application (Version Allégée)
Auteur : Dom, Alice, Kiro - 09 janvier 2026
Version optimisée pour un démarrage rapide avec uniquement les fonctionnalités essentielles.
Cette version évite les imports lourds et les dépendances optionnelles.
Fonctionnalités :
- API REST pour la gestion des workflows
- Capture d'écran via ScreenCapturer (core/capture)
- Création d'embeddings visuels via CLIPEmbedder (core/embedding)
"""
import json
import os
import sys
import base64
import io
from pathlib import Path
from datetime import datetime
from typing import Dict, Any, List, Optional
# Put the repository root and this file's directory at the front of sys.path
# so that the project-local imports used by this module can be resolved.
ROOT_DIR = Path(__file__).parent.parent.parent
sys.path.insert(0, str(ROOT_DIR))
sys.path.insert(0, str(Path(__file__).parent))
# Minimal imports, no heavy dependencies.
# NOTE(review): every module imported in this try block is part of the
# standard library, so the ImportError branch (USE_FLASK = True) looks
# unreachable in practice — confirm the intended server selection.
try:
    from http.server import HTTPServer, BaseHTTPRequestHandler
    from urllib.parse import urlparse, parse_qs
    import socketserver
    USE_FLASK = False
    print("⚡ Mode serveur HTTP natif (sans Flask)")
except ImportError:
    USE_FLASK = True
    print("🔄 Tentative d'utilisation de Flask...")
# Optional real screen-capture service; the flag records whether it loaded
# and the name is bound to None when it did not.
try:
    from services.real_screen_capture import real_capture_service
    REAL_CAPTURE_AVAILABLE = True
    print("✅ Service de capture d'écran réelle disponible")
except ImportError as e:
    REAL_CAPTURE_AVAILABLE = False
    real_capture_service = None
    print(f"⚠️ Service de capture d'écran réelle non disponible: {e}")
# Optional VWB catalogue routes; same availability-flag pattern as above.
try:
    from catalog_routes_v2_vlm import register_catalog_routes
    CATALOG_ROUTES_AVAILABLE = True
    print("✅ Routes du catalogue VWB disponibles")
except ImportError as e:
    CATALOG_ROUTES_AVAILABLE = False
    register_catalog_routes = None
    print(f"⚠️ Routes du catalogue VWB non disponibles: {e}")
# ============================================================================
# Screen-capture and embedding services
# ============================================================================
# Module-level singletons, initialised on demand; both stay None until their
# accessor function has built them successfully.
_screen_capturer = None
_clip_embedder = None
def get_screen_capturer():
    """
    Return the shared ScreenCapturer, building it on first use.

    Returns:
        The cached ScreenCapturer, or None when it cannot be imported or
        constructed.
    """
    global _screen_capturer
    if _screen_capturer is not None:
        return _screen_capturer
    try:
        # Report which capture back-ends are importable. A missing one is only
        # logged; it does not stop the initialisation.
        for backend in ("mss", "pyautogui"):
            try:
                __import__(backend)
            except ImportError:
                print(f"❌ {backend} non disponible")
            else:
                print(f"✅ {backend} disponible")
        from core.capture import ScreenCapturer
        _screen_capturer = ScreenCapturer(buffer_size=5, detect_changes=False)
        print(f"✅ ScreenCapturer initialisé avec succès - méthode: {_screen_capturer.method}")
    except ImportError as import_error:
        print(f"⚠️ ScreenCapturer non disponible: {import_error}")
        return None
    except Exception as init_error:
        print(f"❌ Erreur initialisation ScreenCapturer: {init_error}")
        return None
    return _screen_capturer
def get_clip_embedder():
    """
    Return the shared CLIP embedder, building it on first use.

    Returns:
        The cached object produced by create_clip_embedder(device="cpu"), or
        None when the module cannot be imported or the construction fails.
    """
    global _clip_embedder
    if _clip_embedder is not None:
        return _clip_embedder
    try:
        from core.embedding import create_clip_embedder
        _clip_embedder = create_clip_embedder(device="cpu")
        print("✅ CLIPEmbedder initialisé avec succès")
    except ImportError as import_error:
        print(f"⚠️ CLIPEmbedder non disponible: {import_error}")
        return None
    except Exception as init_error:
        print(f"❌ Erreur initialisation CLIPEmbedder: {init_error}")
        return None
    return _clip_embedder
def capture_screen_to_base64() -> Dict[str, Any]:
    """
    Capture the screen and return it as a base64-encoded PNG.

    Returns:
        On success: 'success' True, 'screenshot' (base64 PNG), 'width',
        'height', 'timestamp' (ISO 8601) and 'method'.
        On failure: 'success' False and an 'error' message. Every exception
        is converted into such an error result.
    """
    try:
        capturer = get_screen_capturer()
        if capturer is None:
            return {
                'success': False,
                'error': 'Service de capture d\'écran non disponible'
            }
        from PIL import Image
        img_array = capturer.capture()
        if img_array is None:
            return {
                'success': False,
                'error': 'Échec de la capture d\'écran'
            }
        pil_image = Image.fromarray(img_array)
        # getvalue() returns the whole buffer regardless of the stream
        # position, so no seek is needed before encoding.
        with io.BytesIO() as buffer:
            pil_image.save(buffer, format='PNG', optimize=True)
            screenshot_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
        return {
            'success': True,
            'screenshot': screenshot_base64,
            'width': pil_image.width,
            'height': pil_image.height,
            'timestamp': datetime.now().isoformat(),
            # NOTE(review): fixed label; it is not derived from the capturer's
            # actual capture method — confirm this is what clients expect.
            'method': 'ultra_stable_mss'
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Erreur lors de la capture: {str(e)}'
        }
def create_visual_embedding(screenshot_base64: str, bounding_box: Dict[str, int], step_id: str) -> Dict[str, Any]:
    """
    Build a visual embedding for a selected region of a screenshot.

    The region is cropped, embedded with the shared CLIP embedder, and both
    the embedding (.npy) and the cropped reference image (.png) are written
    under ROOT_DIR/data/visual_embeddings.

    Args:
        screenshot_base64: Base64-encoded image.
        bounding_box: Selected region with keys 'x', 'y', 'width', 'height'.
        step_id: Identifier of the workflow step; only letters, digits, '_'
            and '-' are kept when it is used in file names.

    Returns:
        On success: 'success' True, 'embedding' (list), 'embedding_id',
        'dimension', 'reference_image' and the clamped 'bounding_box'.
        On failure: 'success' False and an 'error' message.
    """
    embedder = get_clip_embedder()
    if embedder is None:
        return {
            'success': False,
            'error': 'Service d\'embedding non disponible'
        }
    try:
        from PIL import Image
        import numpy as np
        image_data = base64.b64decode(screenshot_base64)
        pil_image = Image.open(io.BytesIO(image_data))
        # JSON clients may send floats; work with integer pixel coordinates.
        x = int(round(bounding_box.get('x', 0)))
        y = int(round(bounding_box.get('y', 0)))
        width = int(round(bounding_box.get('width', 100)))
        height = int(round(bounding_box.get('height', 100)))
        # Keep the origin inside the image.
        x = max(0, min(x, pil_image.width - 1))
        y = max(0, min(y, pil_image.height - 1))
        # Apply the 10 px minimum first, then cap to the space that is left so
        # the crop box can never extend past the image edge.
        width = min(max(10, width), pil_image.width - x)
        height = min(max(10, height), pil_image.height - y)
        cropped_image = pil_image.crop((x, y, x + width, y + height))
        embedding = embedder.embed_image(cropped_image)
        # step_id ends up in file names: drop anything that could form a path.
        safe_step_id = "".join(c for c in str(step_id) if c.isalnum() or c in ("_", "-"))
        embedding_id = f"emb_{safe_step_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        embeddings_dir = ROOT_DIR / "data" / "visual_embeddings"
        embeddings_dir.mkdir(parents=True, exist_ok=True)
        embedding_path = embeddings_dir / f"{embedding_id}.npy"
        np.save(str(embedding_path), embedding)
        reference_path = embeddings_dir / f"{embedding_id}_ref.png"
        cropped_image.save(str(reference_path))
        return {
            'success': True,
            'embedding': embedding.tolist(),
            'embedding_id': embedding_id,
            'dimension': len(embedding),
            'reference_image': f"{embedding_id}_ref.png",
            'bounding_box': {
                'x': x,
                'y': y,
                'width': width,
                'height': height
            }
        }
    except Exception as e:
        return {
            'success': False,
            'error': f'Erreur lors de la création de l\'embedding: {str(e)}'
        }
class WorkflowHandler(BaseHTTPRequestHandler):
    """Minimal HTTP handler exposing the workflow REST endpoints."""

    def __init__(self, *args, **kwargs):
        # The base-class constructor processes the request, so the database
        # has to be attached before delegating to it.
        self.workflows_db = WorkflowDatabase()
        super().__init__(*args, **kwargs)

    def end_headers(self):
        """Add the CORS headers to every response, then close the header block.

        Hooking in here guarantees the headers come after the status line and
        are also present on replies produced by send_error().
        """
        self.send_cors_headers()
        super().end_headers()

    def do_GET(self):
        """Handle GET requests."""
        path = urlparse(self.path).path
        if path == '/health':
            self.send_health_check()
        elif path == '/':
            self.send_index()
        elif path.startswith('/api/workflows'):
            self.handle_workflows_get(path)
        else:
            self.send_error(404, "Not Found")

    def do_POST(self):
        """Handle POST requests."""
        path = urlparse(self.path).path
        if path.startswith('/api/workflows'):
            self.handle_workflows_post(path)
        else:
            self.send_error(404, "Not Found")

    def do_OPTIONS(self):
        """Answer CORS preflight requests."""
        # The status line must be emitted before any header.
        self.send_response(200)
        self.send_header('Content-Length', '0')
        self.end_headers()

    def send_cors_headers(self):
        """Queue the CORS headers; only valid after send_response()."""
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Access-Control-Allow-Methods', 'GET, POST, PUT, DELETE, OPTIONS')
        self.send_header('Access-Control-Allow-Headers', 'Content-Type, Authorization')

    def send_json_response(self, data: Any, status_code: int = 200):
        """Send *data* as a UTF-8 JSON body with the given status code."""
        # Serialise first so a serialisation error cannot leave a
        # half-written response behind.
        payload = json.dumps(data, ensure_ascii=False, indent=2).encode('utf-8')
        self.send_response(status_code)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def send_health_check(self):
        """Health endpoint."""
        self.send_json_response({
            'status': 'healthy',
            'version': '1.0.0-lightweight',
            'mode': 'native-http'
        })

    def send_index(self):
        """Landing page describing the service."""
        self.send_json_response({
            'message': 'Visual Workflow Builder Backend (Version Allégée)',
            'version': '1.0.0-lightweight',
            'mode': 'native-http',
            'endpoints': ['/health', '/api/workflows']
        })

    def handle_workflows_get(self, path: str):
        """Handle GET on /api/workflows (list) and /api/workflows/<id> (single)."""
        if path == '/api/workflows' or path == '/api/workflows/':
            try:
                workflows = self.workflows_db.list_workflows()
                self.send_json_response([w.to_dict() for w in workflows])
            except Exception as e:
                self.send_json_response({'error': str(e)}, 500)
        else:
            workflow_id = path.split('/')[-1]
            try:
                workflow = self.workflows_db.get_workflow(workflow_id)
                if workflow:
                    self.send_json_response(workflow.to_dict())
                else:
                    self.send_json_response({'error': 'Workflow not found'}, 404)
            except Exception as e:
                self.send_json_response({'error': str(e)}, 500)

    def handle_workflows_post(self, path: str):
        """Handle POST on /api/workflows (create a workflow)."""
        try:
            content_length = int(self.headers.get('Content-Length', 0))
            if content_length > 0:
                post_data = self.rfile.read(content_length)
                data = json.loads(post_data.decode('utf-8'))
            else:
                data = {}
            if path == '/api/workflows' or path == '/api/workflows/':
                workflow = self.workflows_db.create_workflow(data)
                self.send_json_response(workflow.to_dict(), 201)
            else:
                self.send_json_response({'error': 'Method not allowed'}, 405)
        except json.JSONDecodeError:
            # Must precede ValueError: JSONDecodeError is a subclass of it.
            self.send_json_response({'error': 'Invalid JSON'}, 400)
        except ValueError as e:
            # Invalid client input (missing name, bad Content-Length, bad
            # encoding) is a client error, not a server failure.
            self.send_json_response({'error': str(e)}, 400)
        except Exception as e:
            self.send_json_response({'error': str(e)}, 500)
class SimpleWorkflow:
    """Simplified workflow model that round-trips through plain dictionaries."""

    def __init__(self, id: str, name: str, description: str = "", created_by: str = "unknown"):
        """Create a workflow stamped with the current time and empty content."""
        self.id = id
        self.name = name
        self.description = description
        self.created_by = created_by
        self.created_at = datetime.now().isoformat()
        self.updated_at = self.created_at
        self.nodes = []
        self.edges = []
        self.variables = []
        self.settings = {}
        self.tags = []
        self.category = "default"
        self.is_template = False

    def to_dict(self) -> Dict[str, Any]:
        """Return the workflow as a JSON-serialisable dictionary."""
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'created_by': self.created_by,
            'created_at': self.created_at,
            'updated_at': self.updated_at,
            'nodes': self.nodes,
            'edges': self.edges,
            'variables': self.variables,
            'settings': self.settings,
            'tags': self.tags,
            'category': self.category,
            'is_template': self.is_template
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'SimpleWorkflow':
        """
        Build a workflow from a dictionary such as the one made by to_dict().

        A missing or empty 'id' is replaced by a generated "wf_<timestamp>"
        identifier. Stored 'created_at' / 'updated_at' values are kept so that
        loading a saved workflow does not reset its timestamps.
        """
        workflow = cls(
            id=str(data.get('id') or f"wf_{datetime.now().strftime('%Y%m%d_%H%M%S')}"),
            name=data.get('name', 'Sans titre'),
            description=data.get('description', ''),
            created_by=data.get('created_by', 'unknown')
        )
        workflow.nodes = data.get('nodes', [])
        workflow.edges = data.get('edges', [])
        workflow.variables = data.get('variables', [])
        workflow.settings = data.get('settings', {})
        workflow.tags = data.get('tags', [])
        workflow.category = data.get('category', 'default')
        workflow.is_template = data.get('is_template', False)
        # Restore persisted timestamps; fall back to the creation time set by
        # __init__ when the source dictionary has none.
        workflow.created_at = data.get('created_at') or workflow.created_at
        workflow.updated_at = data.get('updated_at') or workflow.created_at
        return workflow
class WorkflowDatabase:
    """File-backed workflow store: one JSON document per workflow."""

    def __init__(self, data_dir: Optional[Path] = None):
        """
        Open (and create if needed) the storage directory.

        Args:
            data_dir: Directory holding the JSON files. Defaults to
                "../../data/workflows".
        """
        # NOTE(review): the default is relative to the current working
        # directory, not to this file — confirm that is intended for every
        # way the server gets launched.
        self.data_dir = Path(data_dir) if data_dir is not None else Path("../../data/workflows")
        self.data_dir.mkdir(parents=True, exist_ok=True)
        print(f"📁 Base de données: {self.data_dir.absolute()}")

    def _get_file_path(self, workflow_id: str) -> Path:
        """
        Return the JSON file path for a workflow id.

        Only letters, digits, '_' and '-' are kept, so the id cannot escape
        the storage directory.

        Raises:
            ValueError: If nothing is left of the id after sanitising.
        """
        safe_id = "".join(c for c in str(workflow_id) if c.isalnum() or c in ("_", "-"))
        if not safe_id:
            raise ValueError(f"Identifiant de workflow invalide: {workflow_id!r}")
        return self.data_dir / f"{safe_id}.json"

    def create_workflow(self, data: Dict[str, Any]) -> "SimpleWorkflow":
        """
        Create and persist a new workflow.

        Raises:
            ValueError: If 'name' is missing or the id is unusable.
        """
        if 'name' not in data:
            raise ValueError("Le nom est requis")
        workflow = SimpleWorkflow.from_dict(data)
        self.save_workflow(workflow)
        return workflow

    def save_workflow(self, workflow: "SimpleWorkflow"):
        """Write the workflow to its JSON file, replacing any previous content."""
        file_path = self._get_file_path(workflow.id)
        with open(file_path, 'w', encoding='utf-8') as f:
            json.dump(workflow.to_dict(), f, ensure_ascii=False, indent=2)

    def get_workflow(self, workflow_id: str) -> Optional["SimpleWorkflow"]:
        """Return the workflow with this id, or None if absent or unreadable."""
        try:
            file_path = self._get_file_path(workflow_id)
        except ValueError:
            # An id with no usable characters cannot name a stored workflow.
            return None
        if not file_path.exists():
            return None
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            return SimpleWorkflow.from_dict(data)
        except Exception as e:
            print(f"Erreur lecture workflow {workflow_id}: {e}")
            return None

    def list_workflows(self) -> List["SimpleWorkflow"]:
        """Return every readable workflow, ordered by file name."""
        workflows = []
        # Sorting makes the listing order stable across calls and platforms.
        for file_path in sorted(self.data_dir.glob("*.json")):
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                workflows.append(SimpleWorkflow.from_dict(data))
            except Exception as e:
                # One corrupt file must not hide the others.
                print(f"Erreur lecture {file_path}: {e}")
        return workflows
def start_native_server(port: int = 5002):
    """
    Run the native HTTP server until interrupted.

    Serves WorkflowHandler on all interfaces. Ctrl+C stops the server; any
    other error is printed rather than raised.

    Args:
        port: TCP port to listen on.
    """
    print(f"🚀 Démarrage du serveur natif sur le port {port}")
    print(f"🌐 URL: http://localhost:{port}")
    print(f"❤️ Health check: http://localhost:{port}/health")
    print(f"📋 API Workflows: http://localhost:{port}/api/workflows")
    print("")
    print("Appuyez sur Ctrl+C pour arrêter")

    class _ReusableTCPServer(socketserver.TCPServer):
        # Lets the server be restarted right away on the same port instead of
        # failing with "Address already in use".
        allow_reuse_address = True

    try:
        with _ReusableTCPServer(("", port), WorkflowHandler) as httpd:
            httpd.serve_forever()
    except KeyboardInterrupt:
        print("\n🛑 Arrêt du serveur")
    except Exception as e:
        print(f"❌ Erreur serveur: {e}")
def start_flask_server(port: int = 5002):
"""Démarre le serveur Flask si disponible."""
try:
from flask import Flask, jsonify, request
from flask_cors import CORS
app = Flask(__name__)
CORS(app,
origins="*",
methods=["GET", "POST", "PUT", "DELETE", "OPTIONS"],
allow_headers=["Content-Type", "Authorization", "Accept", "X-Requested-With"],
supports_credentials=False)
# Enregistrer les routes du catalogue VWB si disponibles
if CATALOG_ROUTES_AVAILABLE and register_catalog_routes:
try:
register_catalog_routes(app)
print("✅ Routes du catalogue VWB enregistrées avec succès")
except Exception as e:
print(f"⚠️ Erreur lors de l'enregistrement des routes catalogue: {e}")
# Gestionnaire global pour les requêtes OPTIONS (CORS preflight)
@app.before_request
def handle_preflight():
    """Answer CORS preflight (OPTIONS) requests directly with permissive headers."""
    if request.method != "OPTIONS":
        return None
    preflight = jsonify({'status': 'OK'})
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ('Access-Control-Allow-Headers', "Content-Type,Authorization,Accept,X-Requested-With"),
        ('Access-Control-Allow-Methods', "GET,PUT,POST,DELETE,OPTIONS"),
    )
    for header_name, header_value in cors_headers:
        preflight.headers.add(header_name, header_value)
    return preflight
db = WorkflowDatabase()
@app.route('/health')
@app.route('/api/health')
def health_check():
    """Report service status and which optional features can be initialised."""
    capture_ready = get_screen_capturer() is not None
    embedding_ready = get_clip_embedder() is not None
    features = {
        'screen_capture': capture_ready,
        'visual_embedding': embedding_ready
    }
    return jsonify({
        'status': 'healthy',
        'version': '1.0.0-lightweight',
        'mode': 'flask',
        'features': features
    })
@app.route('/')
def index():
    """Describe the service: version, mode, endpoints and optional features."""
    core_endpoints = [
        '/health',
        '/api/workflows',
        '/api/screen-capture',
        '/api/visual-embedding',
        '/api/real-screen-capture',
        '/api/real-screen-capture/start',
        '/api/real-screen-capture/stop',
        '/api/real-screen-capture/status'
    ]
    # Catalogue endpoints are advertised only when their routes were imported.
    catalog_endpoints = [
        '/api/vwb/catalog/actions',
        '/api/vwb/catalog/execute',
        '/api/vwb/catalog/validate',
        '/api/vwb/catalog/health'
    ] if CATALOG_ROUTES_AVAILABLE else []
    return jsonify({
        'message': 'Visual Workflow Builder Backend (Version Allégée)',
        'version': '1.0.0-lightweight',
        'mode': 'flask',
        'endpoints': core_endpoints + catalog_endpoints,
        'features': {
            'catalog_routes': CATALOG_ROUTES_AVAILABLE,
            'real_capture': REAL_CAPTURE_AVAILABLE
        }
    })
@app.route('/api/workflows', methods=['GET'])
def list_workflows():
    """Return every stored workflow as a JSON array; 500 on any failure."""
    try:
        serialised = [stored.to_dict() for stored in db.list_workflows()]
        return jsonify(serialised)
    except Exception as failure:
        return jsonify({'error': str(failure)}), 500
@app.route('/api/workflows', methods=['POST'])
def create_workflow():
    """
    Create a workflow from the JSON body and return it with status 201.

    Any exception raised while creating or registering it yields a 400.
    """
    try:
        payload = request.get_json() or {}
        created = db.create_workflow(payload)
        # Register with the learning system when that optional module exists.
        # NOTE(review): a non-ImportError failure here returns 400 although
        # the workflow has already been saved — confirm this is intended.
        try:
            from services.learning_integration import register_workflow_for_learning
        except ImportError:
            pass  # Module not available; carry on without it.
        else:
            register_workflow_for_learning(created)
        return jsonify(created.to_dict()), 201
    except Exception as failure:
        return jsonify({'error': str(failure)}), 400
@app.route('/api/workflows/<workflow_id>', methods=['GET'])
def get_workflow(workflow_id):
    """
    Return a single workflow by id (404 when it does not exist).
    """
    try:
        found = db.get_workflow(workflow_id)
        if not found:
            return jsonify({'error': 'Workflow not found'}), 404
        return jsonify(found.to_dict())
    except Exception as failure:
        return jsonify({'error': str(failure)}), 500
@app.route('/api/workflows/<workflow_id>', methods=['PUT'])
def update_workflow(workflow_id):
    """
    Update an existing workflow.

    Only the fields present in the JSON body are replaced; 'updated_at' is
    always refreshed before saving.
    """
    editable_fields = (
        'name', 'description', 'nodes', 'edges', 'variables',
        'settings', 'tags', 'category', 'is_template',
    )
    try:
        changes = request.get_json() or {}
        workflow = db.get_workflow(workflow_id)
        if not workflow:
            return jsonify({'error': 'Workflow not found'}), 404
        for field_name in editable_fields:
            current_value = getattr(workflow, field_name)
            setattr(workflow, field_name, changes.get(field_name, current_value))
        workflow.updated_at = datetime.now().isoformat()
        db.save_workflow(workflow)
        return jsonify(workflow.to_dict())
    except Exception as failure:
        return jsonify({'error': str(failure)}), 500
@app.route('/api/workflows/<workflow_id>', methods=['DELETE'])
def delete_workflow(workflow_id):
    """
    Delete a workflow (404 when it does not exist).
    """
    try:
        if not db.get_workflow(workflow_id):
            return jsonify({'error': 'Workflow not found'}), 404
        # Remove the backing JSON file.
        stored_file = db._get_file_path(workflow_id)
        if stored_file.exists():
            stored_file.unlink()
        return jsonify({'success': True, 'message': 'Workflow supprimé'})
    except Exception as failure:
        return jsonify({'error': str(failure)}), 500
@app.route('/api/workflow/execute', methods=['POST'])
def execute_workflow():
    """
    Execute a complete workflow (currently a simulation).

    Request Body:
    {
        "workflowId": "workflow_id",
        "parameters": {...}
    }

    Every node of the stored workflow is reported as successfully executed;
    no real action is performed here.
    """
    try:
        payload = request.get_json() or {}
        workflow_id = payload.get('workflowId')
        if not workflow_id:
            return jsonify({
                'success': False,
                'error': 'workflowId requis'
            }), 400
        workflow = db.get_workflow(workflow_id)
        if not workflow:
            return jsonify({
                'success': False,
                'error': 'Workflow non trouvé'
            }), 404
        # Simulated execution: one synthetic result per node.
        simulated = [
            {
                'stepId': node.get('id', f'step_{position}'),
                'success': True,
                'output': f'Simulation - étape {position+1} exécutée',
                'timestamp': datetime.now().isoformat()
            }
            for position, node in enumerate(workflow.nodes)
        ]
        return jsonify({
            'success': True,
            'results': simulated,
            'workflowId': workflow_id,
            'executionTime': datetime.now().isoformat()
        })
    except Exception as failure:
        return jsonify({
            'success': False,
            'error': f'Erreur exécution: {str(failure)}'
        }), 500
@app.route('/api/workflow/execute-step', methods=['POST'])
def execute_step():
    """
    Execute a single step on the real desktop through pyautogui.

    Request Body:
    {
        "stepId": "step_id",
        "stepType": "click|type|wait|...",
        "parameters": {...},
        "workflowId": "workflow_id" (optional)
    }

    Returns 400 when stepId or stepType is missing, 500 on any execution
    error (including pyautogui being unavailable), otherwise a JSON object
    with 'success' and an 'output' description of what was done. Step types
    that are not recognised are only reported as a simulation.
    """
    try:
        import pyautogui
        import time
        data = request.get_json() or {}
        step_id = data.get('stepId')
        step_type = data.get('stepType')
        # An explicit null must behave like a missing "parameters" object.
        parameters = data.get('parameters') or {}
        if not step_id or not step_type:
            return jsonify({
                'success': False,
                'error': 'stepId et stepType requis'
            }), 400
        result_message = ''
        if step_type in ('click', 'click_anchor'):
            # The bounding box may come from several places:
            # 1. visual_anchor.bounding_box (snake_case)
            # 2. visual_anchor.boundingBox (camelCase)
            # 3. target.boundingBox (frontend format)
            # 4. target.bounding_box
            visual_anchor = parameters.get('visual_anchor') or {}
            target = parameters.get('target') or {}
            bounding_box = (
                visual_anchor.get('bounding_box') or
                visual_anchor.get('boundingBox') or
                target.get('boundingBox') or
                target.get('bounding_box') or
                {}
            )
            # Screen size is needed to convert normalised coordinates.
            screen_width, screen_height = pyautogui.size()
            print(f"📐 Résolution écran: {screen_width}x{screen_height}")
            if bounding_box:
                bbox_x = bounding_box.get('x', 0)
                bbox_y = bounding_box.get('y', 0)
                bbox_w = bounding_box.get('width', 0)
                bbox_h = bounding_box.get('height', 0)
                print(f"📦 BoundingBox brute: x={bbox_x}, y={bbox_y}, w={bbox_w}, h={bbox_h}")
                # Heuristic: a value strictly between 0 and 2 is treated as a
                # fraction of the screen size rather than a pixel count.
                if 0 < bbox_x < 2:
                    bbox_x = bbox_x * screen_width
                    print(f"↔️ x normalisé converti: {bbox_x}")
                if 0 < bbox_y < 2:
                    bbox_y = bbox_y * screen_height
                    print(f"↕️ y normalisé converti: {bbox_y}")
                if 0 < bbox_w < 2:
                    bbox_w = bbox_w * screen_width
                if 0 < bbox_h < 2:
                    bbox_h = bbox_h * screen_height
                # Click the centre of the box (true division: values may be floats).
                x = int(bbox_x + bbox_w / 2)
                y = int(bbox_y + bbox_h / 2)
                print(f"🎯 Centre calculé: ({x}, {y})")
            else:
                # Fall back to explicit coordinates when no box is given.
                x = parameters.get('x', parameters.get('click_x', 0))
                y = parameters.get('y', parameters.get('click_y', 0))
                print(f"📍 Coordonnées directes: ({x}, {y})")
            if x > 0 and y > 0:
                click_type = parameters.get('click_type', 'left')
                print(f"🖱️ Clic {click_type} à ({x}, {y})")
                if click_type == 'right':
                    pyautogui.rightClick(x, y)
                elif click_type == 'double':
                    pyautogui.doubleClick(x, y)
                else:
                    pyautogui.click(x, y)
                result_message = f'Clic effectué à ({x}, {y})'
            else:
                result_message = 'Coordonnées de clic non définies - simulation'
        elif step_type in ('type', 'type_text', 'saisir_texte'):
            text = parameters.get('text', parameters.get('texte', ''))
            if text:
                print(f"⌨️ Saisie de texte: {text[:30]}...")
                # NOTE(review): pyautogui's keyboard typing may not emit
                # non-ASCII characters — confirm accented text is really typed.
                if text.isascii():
                    pyautogui.typewrite(text, interval=0.05)
                else:
                    pyautogui.write(text)
                result_message = f'Texte saisi: {text[:50]}...'
            else:
                result_message = 'Aucun texte à saisir'
        elif step_type in ('wait', 'attendre', 'wait_for_anchor'):
            requested_ms = parameters.get('duration', parameters.get('timeout_ms', 1000))
            # time.sleep() rejects negative values; treat them as "no wait".
            wait_time = max(0.0, requested_ms / 1000.0)
            print(f"⏳ Attente de {wait_time}s")
            time.sleep(wait_time)
            result_message = f'Attente de {wait_time}s terminée'
        elif step_type in ('hotkey', 'raccourci'):
            keys = parameters.get('keys', parameters.get('touches', []))
            if isinstance(keys, str):
                # Accept "ctrl+c" as well as ["ctrl", "c"]; iterating the raw
                # string would press each character separately.
                keys = [key.strip() for key in keys.split('+') if key.strip()]
            if keys:
                print(f"⌨️ Raccourci: {'+'.join(keys)}")
                pyautogui.hotkey(*keys)
                result_message = f'Raccourci {"+".join(keys)} exécuté'
            else:
                result_message = 'Aucune touche définie'
        elif step_type in ('scroll', 'defiler'):
            direction = parameters.get('direction', 'down')
            amount = parameters.get('amount', 3)
            # pyautogui.scroll() scrolls UP for positive values, so "down"
            # needs a negative click count.
            clicks = -amount if direction == 'down' else amount
            print(f"📜 Scroll {direction} de {amount}")
            pyautogui.scroll(clicks)
            result_message = f'Scroll {direction} effectué'
        else:
            result_message = f'Type d\'étape {step_type} - simulation'
        output = {
            'stepId': step_id,
            'stepType': step_type,
            'result': result_message,
            'real_execution': True,
            'parameters': parameters,
            'timestamp': datetime.now().isoformat()
        }
        return jsonify({
            'success': True,
            'output': output
        })
    except Exception as e:
        import traceback
        print(f"❌ Erreur exécution: {traceback.format_exc()}")
        return jsonify({
            'success': False,
            'error': f'Erreur exécution étape: {str(e)}'
        }), 500
@app.route('/api/workflow/validate', methods=['POST'])
def validate_workflow():
    """
    Validate a workflow description.

    Request Body: WorkflowData
    Response:
    {
        "isValid": true,
        "errors": [],
        "warnings": []
    }

    Structural problems (wrong types for the body, 'steps', 'connections' or
    individual steps) are reported as validation errors rather than causing a
    server error.
    """
    try:
        data = request.get_json() or {}
        errors = []
        warnings = []
        if not isinstance(data, dict):
            return jsonify({
                'isValid': False,
                'errors': ['Le workflow doit être un objet JSON'],
                'warnings': warnings
            })
        # Basic validation
        if not data.get('name'):
            errors.append('Le nom du workflow est obligatoire')
        steps = data.get('steps', [])
        if not isinstance(steps, list):
            errors.append('Les étapes doivent être un tableau')
            # Continue with an empty list so the checks below stay safe.
            steps = []
        connections = data.get('connections', [])
        if not isinstance(connections, list):
            errors.append('Les connexions doivent être un tableau')
            connections = []
        # Advanced checks
        if len(steps) == 0:
            warnings.append('Le workflow ne contient aucune étape')
        if len(steps) > 1 and len(connections) == 0:
            warnings.append('Le workflow contient plusieurs étapes mais aucune connexion')
        if any(not isinstance(step, dict) for step in steps):
            errors.append('Chaque étape doit être un objet')
        # Step ids must be unique (steps without an id are ignored).
        step_ids = [
            step.get('id') for step in steps
            if isinstance(step, dict) and step.get('id')
        ]
        if len(step_ids) != len(set(step_ids)):
            errors.append('Les IDs des étapes doivent être uniques')
        return jsonify({
            'isValid': len(errors) == 0,
            'errors': errors,
            'warnings': warnings
        })
    except Exception as e:
        return jsonify({
            'isValid': False,
            'errors': [f'Erreur validation: {str(e)}'],
            'warnings': []
        }), 500
# ====================================================================
# Endpoints d'apprentissage (Learning Integration)
# ====================================================================
@app.route('/api/workflows/<workflow_id>/feedback', methods=['POST'])
def submit_feedback(workflow_id):
    """
    Record user feedback for the learning system.

    Request Body:
    {
        "success": true/false,
        "confidence": 0.0-1.0 (optional)
    }

    When the learning module cannot be imported, a simulated acknowledgement
    is returned instead of an error.
    """
    try:
        from services.learning_integration import (
            record_workflow_execution,
            get_workflow_learning_state,
            get_workflow_stats
        )
        payload = request.get_json() or {}
        if 'success' not in payload:
            return jsonify({'error': 'Le champ "success" est requis'}), 400
        succeeded = bool(payload['success'])
        # Default confidence depends on the reported outcome.
        fallback_confidence = 0.95 if succeeded else 0.3
        confidence = payload.get('confidence', fallback_confidence)
        recorded = record_workflow_execution(workflow_id, succeeded, confidence)
        # Read back the state after the feedback has been recorded.
        updated_state = get_workflow_learning_state(workflow_id)
        updated_stats = get_workflow_stats(workflow_id)
        return jsonify({
            'message': 'Feedback enregistré',
            'workflow_id': workflow_id,
            'feedback_recorded': recorded,
            'learning_state': updated_state,
            'stats': updated_stats
        })
    except ImportError as missing:
        return jsonify({
            'message': 'Feedback simulé (learning_integration non disponible)',
            'workflow_id': workflow_id,
            'learning_state': 'OBSERVATION',
            'note': str(missing)
        })
    except Exception as failure:
        return jsonify({'error': str(failure)}), 500
@app.route('/api/workflows/<workflow_id>/learning', methods=['GET'])
def get_learning_info(workflow_id):
    """
    Return the learning state and statistics of a workflow.

    Reports 'NOT_REGISTERED' when the learning system has no statistics for
    it and 'NOT_AVAILABLE' when the learning module cannot be imported.
    """
    try:
        from services.learning_integration import (
            get_workflow_learning_state,
            get_workflow_stats
        )
        state = get_workflow_learning_state(workflow_id)
        stats = get_workflow_stats(workflow_id)
        if stats is not None:
            return jsonify({
                'workflow_id': workflow_id,
                'learning_state': state,
                'stats': stats,
                'can_auto_execute': state in ('AUTO_CANDIDATE', 'AUTO_CONFIRMED')
            })
        return jsonify({
            'workflow_id': workflow_id,
            'learning_state': 'NOT_REGISTERED',
            'message': 'Workflow non enregistré dans le système d\'apprentissage',
            'stats': None
        })
    except ImportError:
        return jsonify({
            'workflow_id': workflow_id,
            'learning_state': 'NOT_AVAILABLE',
            'message': 'Module learning_integration non disponible',
            'stats': None
        })
    except Exception as failure:
        return jsonify({'error': str(failure)}), 500
@app.route('/api/stats', methods=['GET'])
def get_stats():
    """
    Return API statistics: workflow counts, feature availability, server info.
    """
    try:
        stored = db.list_workflows()
        workflow_stats = {
            'total': len(stored),
            'templates': sum(1 for wf in stored if wf.is_template),
            'categories': list({wf.category for wf in stored})
        }
        feature_stats = {
            'screen_capture': get_screen_capturer() is not None,
            'visual_embedding': get_clip_embedder() is not None
        }
        server_stats = {
            'version': '1.0.0-lightweight',
            'mode': 'flask',
            # NOTE(review): despite its name this is the current timestamp,
            # not an elapsed running time.
            'uptime': datetime.now().isoformat()
        }
        return jsonify({
            'workflows': workflow_stats,
            'features': feature_stats,
            'server': server_stats
        })
    except Exception as failure:
        return jsonify({'error': str(failure)}), 500
# ====================================================================
# Endpoints de capture d'écran et d'embedding visuel
# ====================================================================
@app.route('/api/screen-capture', methods=['POST'])
def screen_capture():
    """
    Capture the current screen and return it as a base64 image.

    Request Body (optional):
    {
        "format": "png",   // Image format (png by default)
        "quality": 90      // Quality (not used for PNG)
    }
    Response:
    {
        "success": true,
        "screenshot": "base64_encoded_image",
        "width": 1920,
        "height": 1080,
        "timestamp": "2026-01-09T...",
        "method": "ultra_stable_mss"
    }

    The request body is not read by this handler. A failed capture is
    returned with status 500.
    """
    try:
        print("🔧 Capture d'écran demandée via API Flask...")
        capture = capture_screen_to_base64()
        if not capture['success']:
            print(f"❌ Capture échouée: {capture.get('error')}")
            return jsonify(capture), 500
        print(f"✅ Capture réussie - {capture.get('width')}x{capture.get('height')} ({capture.get('method')})")
        return jsonify(capture)
    except Exception as failure:
        print(f"❌ Erreur lors de la capture d'écran: {failure}")
        return jsonify({
            'success': False,
            'error': f'Erreur capture d\'écran: {str(failure)}',
            'method': 'error_fallback'
        }), 500
@app.route('/api/real-screen-capture', methods=['POST'])
def real_screen_capture():
    """
    Capture the screen through the real capture service, optionally returning
    the UI elements it detected.

    Request Body (optional):
    {
        "monitor_id": 0,          // Monitor id (0 by default)
        "detect_elements": true   // Include detected UI elements
    }
    Response:
    {
        "success": true,
        "screenshot": "data:image/jpeg;base64,...",
        "elements": [...],   // Detected UI elements
        "monitors": [...],   // List of monitors
        "status": {...}      // Service status
    }

    Returns 503 when the service is unavailable, 400 for an invalid monitor
    and 500 when the capture fails.
    """
    if not REAL_CAPTURE_AVAILABLE:
        return jsonify({
            'success': False,
            'error': 'Service de capture d\'écran réelle non disponible'
        }), 503
    try:
        options = request.get_json() or {}
        monitor_id = options.get('monitor_id', 0)
        detect_elements = options.get('detect_elements', True)
        print(f"🔧 Capture d'écran réelle demandée (moniteur {monitor_id})...")
        # Switch monitor only when a different one is requested.
        needs_switch = monitor_id != real_capture_service.selected_monitor
        if needs_switch and not real_capture_service.select_monitor(monitor_id):
            return jsonify({
                'success': False,
                'error': f'Moniteur {monitor_id} non valide'
            }), 400
        # Always take a fresh capture (no cache) so the caller gets the
        # screen as it is now.
        print(f"📸 Exécution d'une nouvelle capture d'écran...")
        frame = real_capture_service._capture_screen()
        screenshot_b64 = None
        if frame is not None:
            real_capture_service.current_screenshot = frame
            screenshot_b64 = real_capture_service.get_current_screenshot_base64()
        if screenshot_b64 is None:
            return jsonify({
                'success': False,
                'error': 'Impossible de capturer l\'écran'
            }), 500
        elements = real_capture_service.get_detected_elements() if detect_elements else []
        response_body = {
            'success': True,
            'screenshot': screenshot_b64,
            'elements': elements,
            'monitors': real_capture_service.get_monitors(),
            'status': real_capture_service.get_status(),
            'timestamp': datetime.now().isoformat(),
            'method': 'real_screen_capture_service'
        }
        print(f"✅ Capture réelle réussie - {len(elements)} éléments détectés")
        return jsonify(response_body)
    except Exception as failure:
        print(f"❌ Erreur capture d'écran réelle: {failure}")
        return jsonify({
            'success': False,
            'error': f'Erreur capture d\'écran réelle: {str(failure)}'
        }), 500
@app.route('/api/real-screen-capture/start', methods=['POST'])
def start_real_capture():
    """
    Start the real-time screen capture loop.

    Request Body (optional JSON object):
    {
        "interval": 1.0   // Seconds between captures (defaults to 1.0)
    }

    Responses:
        200 - capture started; body carries a message and the service status
        400 - "interval" is not a finite, strictly positive number
        500 - the service refused to start, or an unexpected error occurred
        503 - the real capture service is not available
    """
    if not REAL_CAPTURE_AVAILABLE:
        return jsonify({
            'success': False,
            'error': 'Service de capture d\'écran réelle non disponible'
        }), 503

    try:
        # The body is optional: silent=True makes get_json() return None
        # instead of raising on a missing JSON content type or a malformed
        # body, which the broad handler below would report as a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            # No body, or a JSON value that is not an object: use defaults.
            data = {}

        interval = data.get('interval', 1.0)

        # bool is a subclass of int, so it is excluded explicitly. The chained
        # comparison also rejects NaN (all comparisons false) and infinity.
        is_number = isinstance(interval, (int, float)) and not isinstance(interval, bool)
        if not is_number or not (0 < interval < float('inf')):
            return jsonify({
                'success': False,
                'error': 'Paramètre "interval" invalide: nombre strictement positif requis'
            }), 400

        success = real_capture_service.start_capture(interval)
        if success:
            return jsonify({
                'success': True,
                'message': f'Capture temps réel démarrée (intervalle: {interval}s)',
                'status': real_capture_service.get_status()
            })

        return jsonify({
            'success': False,
            'error': 'Impossible de démarrer la capture'
        }), 500
    except Exception as e:
        # Route boundary: report any unexpected failure as a JSON 500.
        return jsonify({
            'success': False,
            'error': f'Erreur: {str(e)}'
        }), 500
@app.route('/api/real-screen-capture/stop', methods=['POST'])
def stop_real_capture():
    """
    Stop the real-time screen capture loop.

    Responds with 503 when the real capture service is unavailable. Otherwise
    returns the boolean reported by the service's stop_capture() as "success",
    a matching message, and the current service status. Any exception is
    reported as a JSON 500.
    """
    if not REAL_CAPTURE_AVAILABLE:
        unavailable = {
            'success': False,
            'error': "Service de capture d'écran réelle non disponible",
        }
        return jsonify(unavailable), 503

    try:
        stopped = real_capture_service.stop_capture()

        if stopped:
            message = 'Capture temps réel arrêtée'
        else:
            message = 'Aucune capture en cours'

        payload = {
            'success': stopped,
            'message': message,
            'status': real_capture_service.get_status(),
        }
        return jsonify(payload)
    except Exception as exc:
        failure = {
            'success': False,
            'error': f'Erreur: {exc!s}',
        }
        return jsonify(failure), 500
@app.route('/api/real-screen-capture/status', methods=['GET'])
def real_capture_status():
    """
    Report the status of the real capture service.

    Responds with 503 when the service is unavailable. Otherwise returns the
    service status together with the list of monitors. Any exception is
    reported as a JSON 500.
    """
    if not REAL_CAPTURE_AVAILABLE:
        unavailable = {
            'success': False,
            'error': "Service de capture d'écran réelle non disponible",
        }
        return jsonify(unavailable), 503

    try:
        # Query the status first, then the monitors (same order as the keys).
        service_status = real_capture_service.get_status()
        monitor_list = real_capture_service.get_monitors()
        payload = {
            'success': True,
            'status': service_status,
            'monitors': monitor_list,
        }
        return jsonify(payload)
    except Exception as exc:
        failure = {
            'success': False,
            'error': f'Erreur: {exc!s}',
        }
        return jsonify(failure), 500
@app.route('/api/visual-embedding', methods=['POST'])
def visual_embedding():
    """
    Create a visual embedding from a screenshot and a selected region.

    Request Body:
    {
        "screenshot": "base64_encoded_image",
        "boundingBox": {
            "x": 100,
            "y": 200,
            "width": 150,
            "height": 50
        },
        "stepId": "step_123"
    }

    Response:
    {
        "success": true,
        "embedding": [0.1, 0.2, ...],
        "embedding_id": "emb_step_123_20260109_...",
        "dimension": 512,
        "reference_image": "emb_step_123_..._ref.png",
        "bounding_box": {...}
    }

    Status codes:
        200 - create_visual_embedding reported success
        400 - body is missing, not valid JSON, not a JSON object, or lacks
              "screenshot" / "boundingBox"
        500 - create_visual_embedding reported failure, or an unexpected
              error occurred
    """
    try:
        # silent=True returns None instead of raising on a wrong content type
        # or malformed JSON, so those client errors reach the 400 branch below
        # rather than being reported as a server error by the broad handler.
        data = request.get_json(silent=True)

        # A non-object payload (e.g. a JSON array) has no .get(); reject it as
        # a bad request too.
        if not data or not isinstance(data, dict):
            return jsonify({
                'success': False,
                'error': 'Corps de requête JSON requis'
            }), 400

        # Validate the required parameters.
        screenshot = data.get('screenshot')
        bounding_box = data.get('boundingBox')
        step_id = data.get('stepId', 'unknown')

        if not screenshot:
            return jsonify({
                'success': False,
                'error': 'Paramètre "screenshot" requis'
            }), 400

        if not bounding_box:
            return jsonify({
                'success': False,
                'error': 'Paramètre "boundingBox" requis'
            }), 400

        # Create the embedding; the helper's result dict is returned as-is.
        result = create_visual_embedding(screenshot, bounding_box, step_id)
        if result['success']:
            return jsonify(result)

        return jsonify(result), 500
    except Exception as e:
        # Route boundary: report any unexpected failure as a JSON 500.
        return jsonify({
            'success': False,
            'error': f'Erreur serveur: {str(e)}'
        }), 500
@app.route('/api/visual-embedding/<embedding_id>', methods=['GET'])
def get_visual_embedding(embedding_id):
    """
    Return a stored visual embedding by its ID.

    The embedding is loaded from
    ROOT_DIR/data/visual_embeddings/<embedding_id>.npy.

    Response:
    {
        "success": true,
        "embedding_id": "emb_...",
        "embedding": [0.1, 0.2, ...],
        "dimension": 512,
        "reference_image_url": "/api/visual-embedding/emb_.../image"
    }

    Status codes:
        200 - embedding found and returned
        404 - no such embedding (also used for IDs that are not a plain
              file name)
        500 - unexpected error while loading the embedding
    """
    try:
        import numpy as np

        embeddings_dir = ROOT_DIR / "data" / "visual_embeddings"
        file_name = f"{embedding_id}.npy"
        embedding_path = embeddings_dir / file_name

        # embedding_id comes from the URL. The file name must be a single
        # path component: anything pathlib would split (e.g. backslashes or a
        # drive prefix on Windows) could make the lookup leave embeddings_dir,
        # so it is treated as not found.
        is_plain_name = Path(file_name).name == file_name

        if not is_plain_name or not embedding_path.exists():
            return jsonify({
                'success': False,
                'error': f'Embedding "{embedding_id}" non trouvé'
            }), 404

        embedding = np.load(str(embedding_path))

        return jsonify({
            'success': True,
            'embedding_id': embedding_id,
            'embedding': embedding.tolist(),
            'dimension': len(embedding),
            'reference_image_url': f'/api/visual-embedding/{embedding_id}/image'
        })
    except Exception as e:
        # Route boundary: report any unexpected failure as a JSON 500.
        return jsonify({
            'success': False,
            'error': f'Erreur: {str(e)}'
        }), 500
@app.route('/api/visual-embedding/<embedding_id>/image', methods=['GET'])
def get_embedding_reference_image(embedding_id):
    """
    Return the reference image (PNG) stored alongside an embedding.

    The image is served from
    ROOT_DIR/data/visual_embeddings/<embedding_id>_ref.png.

    Status codes:
        200 - PNG image body
        404 - no reference image for this ID (also used for IDs that are not
              a plain file name)
        500 - unexpected error while sending the file
    """
    try:
        from flask import send_file

        embeddings_dir = ROOT_DIR / "data" / "visual_embeddings"
        file_name = f"{embedding_id}_ref.png"
        image_path = embeddings_dir / file_name

        # embedding_id comes from the URL and the resulting path is handed to
        # send_file. The file name must be a single path component: anything
        # pathlib would split (e.g. backslashes or a drive prefix on Windows)
        # could point outside embeddings_dir, so it is treated as not found.
        is_plain_name = Path(file_name).name == file_name

        if not is_plain_name or not image_path.exists():
            return jsonify({
                'success': False,
                'error': 'Image de référence non trouvée'
            }), 404

        return send_file(str(image_path), mimetype='image/png')
    except Exception as e:
        # Route boundary: report any unexpected failure as a JSON 500.
        return jsonify({
            'success': False,
            'error': f'Erreur: {str(e)}'
        }), 500
print(f"🚀 Démarrage du serveur Flask sur le port {port}")
print(f"🌐 URL: http://localhost:{port}")
print(f"❤️ Health check: http://localhost:{port}/health")
print(f"📋 API Workflows: http://localhost:{port}/api/workflows")
print(f"📷 API Capture: http://localhost:{port}/api/screen-capture")
print(f"🎯 API Embedding: http://localhost:{port}/api/visual-embedding")
print(f"🔍 API Capture Réelle: http://localhost:{port}/api/real-screen-capture")
print(f"📊 API Statut Capture: http://localhost:{port}/api/real-screen-capture/status")
# Afficher les routes du catalogue si disponibles
if CATALOG_ROUTES_AVAILABLE:
print(f"📋 API Catalogue VWB: http://localhost:{port}/api/vwb/catalog/actions")
print(f"⚡ API Exécution VWB: http://localhost:{port}/api/vwb/catalog/execute")
print(f"✅ API Validation VWB: http://localhost:{port}/api/vwb/catalog/validate")
print(f"❤️ Health Catalogue: http://localhost:{port}/api/vwb/catalog/health")
app.run(host='0.0.0.0', port=port, debug=False)
except ImportError as e:
print(f"❌ Flask non disponible: {e}")
print("🔄 Basculement vers le serveur natif...")
start_native_server(port)
def main():
    """
    Entry point: print the startup banner and launch the backend server.

    The port is read from the PORT environment variable and defaults to 5003.
    flask and flask_cors are imported purely as an availability probe: when
    both import, start_flask_server(port) is used, otherwise
    start_native_server(port). Because start_flask_server runs inside the
    same try block, an ImportError escaping from it also triggers the native
    fallback.
    """
    separator = "=" * 60
    banner_lines = (
        separator,
        " VISUAL WORKFLOW BUILDER - BACKEND ALLÉGÉ",
        separator,
        "Auteur : Dom, Alice, Kiro - 09 janvier 2026",
        "",
    )
    for banner_line in banner_lines:
        print(banner_line)

    # Default to port 5003, which the frontend expects.
    port = int(os.environ.get('PORT', 5003))

    # Probe the dependencies and pick the matching server implementation.
    try:
        import flask
        import flask_cors
        print("✅ Flask disponible - utilisation du mode Flask")
        start_flask_server(port)
    except ImportError:
        print("⚡ Flask non disponible - utilisation du serveur natif")
        start_native_server(port)
if __name__ == '__main__':
main()