feat(vwb-v3): Architecture Thin Client fonctionnelle

API = Source de vérité unique (SQLite + Flask)
- Backend: API v3 avec session, workflow, capture, execute
- Frontend: Vanilla TypeScript, pas de state local
- Contrats stricts pour les actions RPA
- Drag & drop pour réorganiser les étapes
- Insertion d'étapes entre deux existantes
- Bibliothèque de captures (sessionStorage)
- Exécution avec coordonnées statiques (pyautogui)

Fonctionne mais fragile (coordonnées fixes, pas de détection visuelle)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Dom
2026-01-23 12:07:13 +01:00
parent a9a53991bc
commit 858e6007f9
23 changed files with 6813 additions and 6 deletions

View File

@@ -0,0 +1,18 @@
"""
API v3 - RPA Vision Thin Client
Source de vérité unique, pas de logique frontend
Auteur: Dom, Alice, Kiro - 23 janvier 2026
"""
from flask import Blueprint
api_v3_bp = Blueprint('api_v3', __name__, url_prefix='/api/v3')
# Import des routes après la création du blueprint pour éviter les imports circulaires
from . import session
from . import workflow
from . import capture
from . import execute
__all__ = ['api_v3_bp']

View File

@@ -0,0 +1,318 @@
"""
API v3 - Capture et Sélection Visuelle
Gestion des captures d'écran et création d'ancres visuelles
POST /api/v3/capture/screen → Capture écran
POST /api/v3/capture/select → Crée ancre depuis sélection
GET /api/v3/anchor/{id}/image → Image de l'ancre
"""
from flask import jsonify, request, send_file
from datetime import datetime
import uuid
import os
import base64
from io import BytesIO
from PIL import Image
from . import api_v3_bp
from db.models import db, Step, VisualAnchor, get_session_state
# Dossier pour stocker les images d'ancres
ANCHORS_DIR = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'data', 'anchors')
os.makedirs(ANCHORS_DIR, exist_ok=True)
def generate_id(prefix: str) -> str:
"""Génère un ID unique"""
return f"{prefix}_{uuid.uuid4().hex[:12]}_{int(datetime.now().timestamp())}"
@api_v3_bp.route('/capture/screen', methods=['POST'])
def capture_screen():
"""
Capture l'écran actuel.
Response:
{
"success": true,
"capture": {
"screenshot_base64": "...",
"width": 1920,
"height": 1080,
"timestamp": "..."
}
}
"""
try:
import pyautogui
# Capture écran
screenshot = pyautogui.screenshot()
width, height = screenshot.size
# Convertir en base64
buffer = BytesIO()
screenshot.save(buffer, format='PNG')
screenshot_base64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
# Stocker dans la session
session = get_session_state()
session.last_capture = {
'screenshot_base64': screenshot_base64,
'width': width,
'height': height,
'timestamp': datetime.now().isoformat()
}
print(f"📸 [API v3] Capture écran: {width}x{height}")
return jsonify({
'success': True,
'capture': session.last_capture
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/capture/select', methods=['POST'])
def select_anchor():
"""
Crée une ancre visuelle à partir d'une sélection utilisateur.
Request:
{
"step_id": "step_123", // Étape à laquelle associer l'ancre
"bbox": {
"x": 100,
"y": 200,
"width": 50,
"height": 30
},
"description": "Bouton Valider", // Optionnel
"screenshot_base64": "..." // Optionnel - si fourni, utilise cette capture au lieu de session.last_capture
}
L'image est extraite de la capture fournie ou de la dernière capture (session.last_capture).
Response:
{
"success": true,
"workflow": { ... },
"step": { ... },
"anchor": { ... }
}
"""
try:
data = request.get_json() or {}
step_id = data.get('step_id')
bbox = data.get('bbox')
description = data.get('description', '')
screenshot_base64 = data.get('screenshot_base64') # Capture optionnelle directe
if not step_id:
return jsonify({
'success': False,
'error': "step_id requis"
}), 400
if not bbox:
return jsonify({
'success': False,
'error': "bbox requis"
}), 400
# Vérifier que l'étape existe
step = Step.query.get(step_id)
if not step:
return jsonify({
'success': False,
'error': f"Étape '{step_id}' non trouvée"
}), 404
# Récupérer la capture (fournie ou depuis la session)
session = get_session_state()
if screenshot_base64:
# Utiliser la capture fournie directement
print(f"📸 [API v3] Utilisation de la capture fournie dans la requête")
elif session.last_capture:
screenshot_base64 = session.last_capture['screenshot_base64']
else:
return jsonify({
'success': False,
'error': "Aucune capture disponible. Fournissez screenshot_base64 ou appelez /capture/screen d'abord."
}), 400
# Décoder l'image
img_data = base64.b64decode(screenshot_base64)
img = Image.open(BytesIO(img_data))
# Extraire la zone sélectionnée
x = int(bbox.get('x', 0))
y = int(bbox.get('y', 0))
w = int(bbox.get('width', 100))
h = int(bbox.get('height', 100))
# Valider les coordonnées
x = max(0, min(x, img.width - 1))
y = max(0, min(y, img.height - 1))
w = max(10, min(w, img.width - x))
h = max(10, min(h, img.height - y))
# Créer l'ancre
anchor_id = generate_id('anchor')
# Sauvegarder l'image complète de référence
image_path = os.path.join(ANCHORS_DIR, f"{anchor_id}_full.png")
img.save(image_path, 'PNG')
# Créer et sauvegarder la miniature (zone sélectionnée)
thumbnail = img.crop((x, y, x + w, y + h))
thumbnail_path = os.path.join(ANCHORS_DIR, f"{anchor_id}_thumb.png")
thumbnail.save(thumbnail_path, 'PNG')
# Créer l'enregistrement en base
# Utiliser les dimensions de l'image décodée (pas de session.last_capture qui peut être None)
anchor = VisualAnchor(
id=anchor_id,
image_path=image_path,
thumbnail_path=thumbnail_path,
bbox_x=x,
bbox_y=y,
bbox_width=w,
bbox_height=h,
screen_width=img.width,
screen_height=img.height,
description=description
)
db.session.add(anchor)
# Associer l'ancre à l'étape
step.anchor_id = anchor_id
# Mettre à jour les paramètres de l'étape avec l'ancre
params = step.parameters or {}
params['visual_anchor'] = {
'anchor_id': anchor_id,
'bounding_box': {'x': x, 'y': y, 'width': w, 'height': h}
}
step.parameters = params
db.session.commit()
from db.models import Workflow
workflow = Workflow.query.get(step.workflow_id)
print(f"✅ [API v3] Ancre créée: {anchor_id} pour étape {step_id}")
return jsonify({
'success': True,
'workflow': workflow.to_dict(),
'step': step.to_dict(),
'anchor': anchor.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/anchor/<anchor_id>/image', methods=['GET'])
def get_anchor_image(anchor_id: str):
"""Retourne l'image complète de l'ancre"""
try:
anchor = VisualAnchor.query.get(anchor_id)
if not anchor or not anchor.image_path:
return jsonify({
'success': False,
'error': f"Ancre '{anchor_id}' non trouvée"
}), 404
if not os.path.exists(anchor.image_path):
return jsonify({
'success': False,
'error': "Fichier image non trouvé"
}), 404
return send_file(anchor.image_path, mimetype='image/png')
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/anchor/<anchor_id>/thumbnail', methods=['GET'])
def get_anchor_thumbnail(anchor_id: str):
"""Retourne la miniature de l'ancre (zone sélectionnée)"""
try:
anchor = VisualAnchor.query.get(anchor_id)
if not anchor or not anchor.thumbnail_path:
return jsonify({
'success': False,
'error': f"Ancre '{anchor_id}' non trouvée"
}), 404
if not os.path.exists(anchor.thumbnail_path):
return jsonify({
'success': False,
'error': "Fichier miniature non trouvé"
}), 404
return send_file(anchor.thumbnail_path, mimetype='image/png')
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/anchor/<anchor_id>/base64', methods=['GET'])
def get_anchor_base64(anchor_id: str):
"""Retourne l'image de l'ancre en base64 (pour exécution)"""
try:
anchor = VisualAnchor.query.get(anchor_id)
if not anchor or not anchor.image_path:
return jsonify({
'success': False,
'error': f"Ancre '{anchor_id}' non trouvée"
}), 404
if not os.path.exists(anchor.image_path):
return jsonify({
'success': False,
'error': "Fichier image non trouvé"
}), 404
with open(anchor.image_path, 'rb') as f:
image_base64 = base64.b64encode(f.read()).decode('utf-8')
return jsonify({
'success': True,
'anchor_id': anchor_id,
'image_base64': image_base64,
'bounding_box': {
'x': anchor.bbox_x,
'y': anchor.bbox_y,
'width': anchor.bbox_width,
'height': anchor.bbox_height
}
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500

View File

@@ -0,0 +1,504 @@
"""
API v3 - Exécution de Workflows
Gestion de l'exécution avec validation de contrats
POST /api/v3/execute/start → Lance l'exécution
POST /api/v3/execute/pause → Met en pause
POST /api/v3/execute/resume → Reprend
POST /api/v3/execute/stop → Arrête
GET /api/v3/execute/status → État actuel
"""
from flask import jsonify, request
from datetime import datetime
import uuid
import threading
import time
import base64
import os
from . import api_v3_bp
from db.models import db, Workflow, Step, Execution, ExecutionStep, VisualAnchor, get_session_state
from contracts.action_contracts import enforce_action_contract, ContractValidationError, get_required_params
def generate_id(prefix: str) -> str:
"""Génère un ID unique"""
return f"{prefix}_{uuid.uuid4().hex[:12]}_{int(datetime.now().timestamp())}"
# État de l'exécution en cours (en mémoire)
_execution_state = {
'is_running': False,
'is_paused': False,
'should_stop': False,
'current_execution_id': None,
'thread': None
}
def execute_workflow_thread(execution_id: str, workflow_id: str, app):
"""
Thread d'exécution du workflow.
Exécute chaque étape séquentiellement avec validation de contrat.
"""
global _execution_state
with app.app_context():
try:
execution = Execution.query.get(execution_id)
workflow = Workflow.query.get(workflow_id)
if not execution or not workflow:
print(f"❌ [Execute] Workflow ou exécution non trouvé")
return
steps = workflow.steps.order_by(Step.order).all()
execution.total_steps = len(steps)
execution.status = 'running'
execution.started_at = datetime.utcnow()
db.session.commit()
print(f"🚀 [Execute] Démarrage workflow {workflow_id}: {len(steps)} étapes")
for index, step in enumerate(steps):
# Vérifier si arrêt demandé
if _execution_state['should_stop']:
print(f"⛔ [Execute] Arrêt demandé")
execution.status = 'cancelled'
break
# Attendre si en pause
while _execution_state['is_paused'] and not _execution_state['should_stop']:
time.sleep(0.1)
if _execution_state['should_stop']:
execution.status = 'cancelled'
break
# Mettre à jour la progression
execution.current_step_index = index
db.session.commit()
# Créer l'enregistrement de résultat
step_result = ExecutionStep(
execution_id=execution_id,
step_id=step.id,
status='running',
started_at=datetime.utcnow()
)
db.session.add(step_result)
db.session.commit()
print(f"📋 [Execute] Étape {index + 1}/{len(steps)}: {step.action_type} (id={step.id})")
try:
# === VALIDATION CONTRAT STRICT ===
params = step.parameters or {}
# Si l'étape a une ancre, charger ses données
if step.anchor_id:
anchor = VisualAnchor.query.get(step.anchor_id)
if anchor:
# Charger l'image base64 depuis le fichier
if anchor.image_path and os.path.exists(anchor.image_path):
with open(anchor.image_path, 'rb') as f:
image_base64 = base64.b64encode(f.read()).decode('utf-8')
else:
image_base64 = None
params['visual_anchor'] = {
'anchor_id': anchor.id,
'screenshot': image_base64,
'bounding_box': {
'x': anchor.bbox_x,
'y': anchor.bbox_y,
'width': anchor.bbox_width,
'height': anchor.bbox_height
},
'metadata': {
'screen_resolution': {
'width': anchor.screen_width,
'height': anchor.screen_height
}
}
}
# Valider le contrat
try:
enforce_action_contract(step.action_type, params)
except ContractValidationError as e:
print(f"🚫 [Execute] CONTRAT VIOLÉ pour étape {step.id}: {e}")
step_result.status = 'error'
step_result.error_message = f"Contrat violé: {str(e)}"
step_result.ended_at = datetime.utcnow()
execution.failed_steps += 1
db.session.commit()
# Arrêter sur violation de contrat
execution.status = 'error'
execution.error_message = f"Contrat violé à l'étape {index + 1}: {str(e)}"
break
# === EXÉCUTION DE L'ACTION ===
result = execute_action(step.action_type, params)
step_result.ended_at = datetime.utcnow()
step_result.duration_ms = int((step_result.ended_at - step_result.started_at).total_seconds() * 1000)
if result.get('success'):
step_result.status = 'success'
step_result.output = result.get('output', {})
execution.completed_steps += 1
print(f"✅ [Execute] Étape {index + 1} réussie")
else:
step_result.status = 'error'
step_result.error_message = result.get('error', 'Erreur inconnue')
execution.failed_steps += 1
print(f"❌ [Execute] Étape {index + 1} échouée: {step_result.error_message}")
# Arrêter sur erreur
execution.status = 'error'
execution.error_message = f"Erreur à l'étape {index + 1}: {step_result.error_message}"
db.session.commit()
break
db.session.commit()
except Exception as e:
print(f"❌ [Execute] Exception étape {index + 1}: {e}")
step_result.status = 'error'
step_result.error_message = str(e)
step_result.ended_at = datetime.utcnow()
execution.failed_steps += 1
execution.status = 'error'
execution.error_message = f"Exception à l'étape {index + 1}: {str(e)}"
db.session.commit()
break
# Finaliser l'exécution
if execution.status == 'running':
execution.status = 'completed'
execution.ended_at = datetime.utcnow()
db.session.commit()
print(f"🏁 [Execute] Workflow terminé: {execution.status}")
print(f" Complétées: {execution.completed_steps}, Échouées: {execution.failed_steps}")
except Exception as e:
print(f"❌ [Execute] Erreur fatale: {e}")
try:
execution = Execution.query.get(execution_id)
if execution:
execution.status = 'error'
execution.error_message = f"Erreur fatale: {str(e)}"
execution.ended_at = datetime.utcnow()
db.session.commit()
except:
pass
finally:
_execution_state['is_running'] = False
_execution_state['current_execution_id'] = None
def execute_action(action_type: str, params: dict) -> dict:
"""
Exécute une action RPA.
Utilise pyautogui pour les interactions.
"""
import pyautogui
import time
try:
if action_type in ['click_anchor', 'click', 'double_click_anchor', 'right_click_anchor']:
# Récupérer les coordonnées depuis l'ancre
anchor = params.get('visual_anchor', {})
bbox = anchor.get('bounding_box', {})
if not bbox:
return {'success': False, 'error': 'Pas de bounding_box dans visual_anchor'}
# Calculer le centre
x = bbox.get('x', 0) + bbox.get('width', 0) / 2
y = bbox.get('y', 0) + bbox.get('height', 0) / 2
# TODO: Utiliser la détection visuelle (OmniParser/VLM) ici
# Pour l'instant, on utilise les coordonnées statiques
print(f"🖱️ [Action] Clic à ({x}, {y})")
if action_type == 'double_click_anchor':
pyautogui.doubleClick(x, y)
elif action_type == 'right_click_anchor':
pyautogui.rightClick(x, y)
else:
pyautogui.click(x, y)
return {'success': True, 'output': {'clicked_at': {'x': x, 'y': y}}}
elif action_type in ['type_text', 'type']:
text = params.get('text', '')
if not text:
return {'success': False, 'error': 'Pas de texte à saisir'}
print(f"⌨️ [Action] Saisie: {text[:30]}...")
# Petit délai pour s'assurer que le focus est bon
time.sleep(0.2)
if text.isascii():
pyautogui.typewrite(text, interval=0.05)
else:
pyautogui.write(text)
return {'success': True, 'output': {'typed': text}}
elif action_type in ['wait_for_anchor', 'wait']:
timeout_ms = params.get('timeout_ms', params.get('timeout', 5000))
print(f"⏳ [Action] Attente {timeout_ms}ms")
time.sleep(timeout_ms / 1000)
return {'success': True, 'output': {'waited_ms': timeout_ms}}
elif action_type == 'keyboard_shortcut':
keys = params.get('keys', [])
if not keys:
return {'success': False, 'error': 'Pas de touches définies'}
print(f"⌨️ [Action] Raccourci: {'+'.join(keys)}")
pyautogui.hotkey(*keys)
return {'success': True, 'output': {'hotkey': keys}}
else:
return {'success': False, 'error': f"Type d'action non supporté: {action_type}"}
except Exception as e:
return {'success': False, 'error': str(e)}
@api_v3_bp.route('/execute/start', methods=['POST'])
def start_execution():
"""
Lance l'exécution d'un workflow.
Request:
{
"workflow_id": "wf_123" // Optionnel, utilise le workflow actif sinon
}
"""
global _execution_state
try:
if _execution_state['is_running']:
return jsonify({
'success': False,
'error': "Une exécution est déjà en cours"
}), 400
data = request.get_json() or {}
workflow_id = data.get('workflow_id')
# Utiliser le workflow actif si non spécifié
if not workflow_id:
session = get_session_state()
workflow_id = session.active_workflow_id
if not workflow_id:
return jsonify({
'success': False,
'error': "Aucun workflow spécifié ou actif"
}), 400
workflow = Workflow.query.get(workflow_id)
if not workflow:
return jsonify({
'success': False,
'error': f"Workflow '{workflow_id}' non trouvé"
}), 404
if workflow.steps.count() == 0:
return jsonify({
'success': False,
'error': "Le workflow n'a aucune étape"
}), 400
# Créer l'exécution
execution = Execution(
id=generate_id('exec'),
workflow_id=workflow_id,
status='pending'
)
db.session.add(execution)
db.session.commit()
# Mettre à jour la session
session = get_session_state()
session.active_execution_id = execution.id
# Réinitialiser l'état
_execution_state['is_running'] = True
_execution_state['is_paused'] = False
_execution_state['should_stop'] = False
_execution_state['current_execution_id'] = execution.id
# Lancer le thread d'exécution
from flask import current_app
app = current_app._get_current_object()
thread = threading.Thread(
target=execute_workflow_thread,
args=(execution.id, workflow_id, app)
)
thread.daemon = True
thread.start()
_execution_state['thread'] = thread
print(f"🚀 [API v3] Exécution lancée: {execution.id}")
return jsonify({
'success': True,
'execution': execution.to_dict(),
'session': session.to_dict()
})
except Exception as e:
db.session.rollback()
_execution_state['is_running'] = False
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/execute/pause', methods=['POST'])
def pause_execution():
"""Met en pause l'exécution"""
global _execution_state
if not _execution_state['is_running']:
return jsonify({
'success': False,
'error': "Aucune exécution en cours"
}), 400
_execution_state['is_paused'] = True
execution = Execution.query.get(_execution_state['current_execution_id'])
if execution:
execution.status = 'paused'
db.session.commit()
print(f"⏸️ [API v3] Exécution en pause")
return jsonify({
'success': True,
'execution': execution.to_dict() if execution else None
})
@api_v3_bp.route('/execute/resume', methods=['POST'])
def resume_execution():
"""Reprend l'exécution"""
global _execution_state
if not _execution_state['is_running']:
return jsonify({
'success': False,
'error': "Aucune exécution en cours"
}), 400
if not _execution_state['is_paused']:
return jsonify({
'success': False,
'error': "L'exécution n'est pas en pause"
}), 400
_execution_state['is_paused'] = False
execution = Execution.query.get(_execution_state['current_execution_id'])
if execution:
execution.status = 'running'
db.session.commit()
print(f"▶️ [API v3] Exécution reprise")
return jsonify({
'success': True,
'execution': execution.to_dict() if execution else None
})
@api_v3_bp.route('/execute/stop', methods=['POST'])
def stop_execution():
"""Arrête l'exécution"""
global _execution_state
if not _execution_state['is_running']:
return jsonify({
'success': False,
'error': "Aucune exécution en cours"
}), 400
_execution_state['should_stop'] = True
_execution_state['is_paused'] = False
print(f"⛔ [API v3] Arrêt demandé")
# Attendre un peu que le thread réagisse
time.sleep(0.5)
execution = Execution.query.get(_execution_state['current_execution_id'])
session = get_session_state()
session.active_execution_id = None
return jsonify({
'success': True,
'execution': execution.to_dict() if execution else None,
'session': session.to_dict()
})
@api_v3_bp.route('/execute/status', methods=['GET'])
def get_execution_status():
"""Retourne l'état de l'exécution en cours"""
global _execution_state
session = get_session_state()
execution = None
if session.active_execution_id:
execution = Execution.query.get(session.active_execution_id)
return jsonify({
'success': True,
'is_running': _execution_state['is_running'],
'is_paused': _execution_state['is_paused'],
'execution': execution.to_dict() if execution else None,
'session': session.to_dict()
})
@api_v3_bp.route('/execute/history', methods=['GET'])
def get_execution_history():
"""Retourne l'historique des exécutions"""
try:
workflow_id = request.args.get('workflow_id')
query = Execution.query.order_by(Execution.started_at.desc())
if workflow_id:
query = query.filter_by(workflow_id=workflow_id)
executions = query.limit(50).all()
return jsonify({
'success': True,
'executions': [e.to_dict() for e in executions]
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500

View File

@@ -0,0 +1,157 @@
"""
API v3 - Session State
Endpoint principal qui retourne l'état complet de l'UI
GET /api/v3/session/state → Tout ce que le frontend doit afficher
"""
from flask import jsonify
from . import api_v3_bp
from db.models import db, Workflow, Step, Execution, get_session_state
@api_v3_bp.route('/session/state', methods=['GET'])
def get_state():
"""
Retourne l'état complet de l'UI.
Le frontend thin client appelle cet endpoint et affiche ce qu'il reçoit.
C'est LA source de vérité.
Response:
{
"session": {
"active_workflow_id": "...",
"selected_step_id": "...",
"active_execution_id": "..."
},
"workflow": { ... } ou null,
"execution": { ... } ou null,
"workflows_list": [ {id, name, step_count, updated_at}, ... ]
}
"""
try:
session = get_session_state()
# Workflow actif
active_workflow = None
if session.active_workflow_id:
wf = Workflow.query.get(session.active_workflow_id)
if wf:
active_workflow = wf.to_dict()
# Exécution active
active_execution = None
if session.active_execution_id:
exe = Execution.query.get(session.active_execution_id)
if exe:
active_execution = exe.to_dict()
# Liste des workflows (résumé)
workflows_list = []
for wf in Workflow.query.filter_by(is_active=True).order_by(Workflow.updated_at.desc()).all():
workflows_list.append({
'id': wf.id,
'name': wf.name,
'step_count': wf.steps.count(),
'updated_at': wf.updated_at.isoformat() if wf.updated_at else None
})
return jsonify({
'success': True,
'session': session.to_dict(),
'workflow': active_workflow,
'execution': active_execution,
'workflows_list': workflows_list
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/session/select-workflow/<workflow_id>', methods=['POST'])
def select_workflow(workflow_id: str):
"""Sélectionne un workflow comme actif"""
try:
session = get_session_state()
# Vérifier que le workflow existe
wf = Workflow.query.get(workflow_id)
if not wf:
return jsonify({
'success': False,
'error': f"Workflow '{workflow_id}' non trouvé"
}), 404
session.active_workflow_id = workflow_id
session.selected_step_id = None
return jsonify({
'success': True,
'session': session.to_dict(),
'workflow': wf.to_dict()
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/session/select-step/<step_id>', methods=['POST'])
def select_step(step_id: str):
"""Sélectionne une étape"""
try:
session = get_session_state()
# Vérifier que l'étape existe
step = Step.query.get(step_id)
if not step:
return jsonify({
'success': False,
'error': f"Étape '{step_id}' non trouvée"
}), 404
session.selected_step_id = step_id
# S'assurer que le workflow parent est actif
if session.active_workflow_id != step.workflow_id:
session.active_workflow_id = step.workflow_id
return jsonify({
'success': True,
'session': session.to_dict(),
'step': step.to_dict()
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/session/clear', methods=['POST'])
def clear_session():
"""Réinitialise la session"""
try:
session = get_session_state()
session.active_workflow_id = None
session.selected_step_id = None
session.active_execution_id = None
session.last_capture = None
return jsonify({
'success': True,
'session': session.to_dict()
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500

View File

@@ -0,0 +1,403 @@
"""
API v3 - Workflow CRUD
Gestion des workflows et étapes
POST /api/v3/workflow/create
GET /api/v3/workflow/{id}
POST /api/v3/workflow/{id}/step
PUT /api/v3/workflow/{id}/step/{step_id}
DELETE /api/v3/workflow/{id}/step/{step_id}
"""
from flask import jsonify, request
from datetime import datetime
import uuid
from . import api_v3_bp
from db.models import db, Workflow, Step, VisualAnchor, get_session_state
from contracts.action_contracts import enforce_action_contract, ContractValidationError, get_required_params
def generate_id(prefix: str) -> str:
"""Génère un ID unique"""
return f"{prefix}_{uuid.uuid4().hex[:12]}_{int(datetime.now().timestamp())}"
@api_v3_bp.route('/workflow/create', methods=['POST'])
def create_workflow():
"""
Crée un nouveau workflow.
Request:
{
"name": "Mon workflow",
"description": "Description optionnelle"
}
Response:
{
"success": true,
"workflow": { ... },
"session": { ... }
}
"""
try:
data = request.get_json() or {}
name = data.get('name', f"Workflow {datetime.now().strftime('%Y-%m-%d %H:%M')}")
description = data.get('description', '')
workflow = Workflow(
id=generate_id('wf'),
name=name,
description=description
)
db.session.add(workflow)
db.session.commit()
# Activer ce workflow dans la session
session = get_session_state()
session.active_workflow_id = workflow.id
session.selected_step_id = None
print(f"✅ [API v3] Workflow créé: {workflow.id}")
return jsonify({
'success': True,
'workflow': workflow.to_dict(),
'session': session.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/workflow/<workflow_id>', methods=['GET'])
def get_workflow(workflow_id: str):
"""Récupère un workflow complet"""
try:
workflow = Workflow.query.get(workflow_id)
if not workflow:
return jsonify({
'success': False,
'error': f"Workflow '{workflow_id}' non trouvé"
}), 404
return jsonify({
'success': True,
'workflow': workflow.to_dict()
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/workflow/<workflow_id>', methods=['DELETE'])
def delete_workflow(workflow_id: str):
"""Supprime un workflow"""
try:
workflow = Workflow.query.get(workflow_id)
if not workflow:
return jsonify({
'success': False,
'error': f"Workflow '{workflow_id}' non trouvé"
}), 404
# Désactiver si c'est le workflow actif
session = get_session_state()
if session.active_workflow_id == workflow_id:
session.active_workflow_id = None
session.selected_step_id = None
db.session.delete(workflow)
db.session.commit()
print(f"🗑️ [API v3] Workflow supprimé: {workflow_id}")
return jsonify({
'success': True,
'deleted_id': workflow_id,
'session': session.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/workflow/<workflow_id>/step', methods=['POST'])
def add_step(workflow_id: str):
"""
Ajoute une étape au workflow.
Request:
{
"action_type": "click_anchor",
"position": {"x": 100, "y": 200},
"parameters": {},
"label": "Clic sur bouton",
"insert_after": "step_123" // Optionnel: insérer après cette étape
}
Response:
{
"success": true,
"workflow": { ... }, // Workflow complet mis à jour
"step": { ... }, // Nouvelle étape
"needs_anchor": true // Si l'action requiert une ancre visuelle
}
"""
try:
workflow = Workflow.query.get(workflow_id)
if not workflow:
return jsonify({
'success': False,
'error': f"Workflow '{workflow_id}' non trouvé"
}), 404
data = request.get_json() or {}
action_type = data.get('action_type')
if not action_type:
return jsonify({
'success': False,
'error': "action_type requis"
}), 400
position = data.get('position', {'x': 100, 'y': 100})
parameters = data.get('parameters', {})
label = data.get('label', action_type)
insert_after = data.get('insert_after') # ID de l'étape après laquelle insérer
# Calculer l'ordre
if insert_after:
# Insérer après une étape spécifique
prev_step = Step.query.filter_by(id=insert_after, workflow_id=workflow_id).first()
if not prev_step:
return jsonify({
'success': False,
'error': f"Étape '{insert_after}' non trouvée"
}), 404
new_order = prev_step.order + 1
# Décaler toutes les étapes suivantes
Step.query.filter(
Step.workflow_id == workflow_id,
Step.order >= new_order
).update({Step.order: Step.order + 1})
else:
# Ajouter à la fin
max_order = db.session.query(db.func.max(Step.order)).filter(
Step.workflow_id == workflow_id
).scalar() or -1
new_order = max_order + 1
step = Step(
id=generate_id('step'),
workflow_id=workflow_id,
action_type=action_type,
order=new_order,
position_x=position.get('x', 100),
position_y=position.get('y', 100),
label=label
)
step.parameters = parameters
db.session.add(step)
db.session.commit()
# Sélectionner cette étape
session = get_session_state()
session.selected_step_id = step.id
# Vérifier si l'action requiert une ancre visuelle
required_params = get_required_params(action_type)
needs_anchor = 'visual_anchor' in required_params
print(f"✅ [API v3] Étape ajoutée: {step.id} ({action_type}) au workflow {workflow_id}")
return jsonify({
'success': True,
'workflow': workflow.to_dict(),
'step': step.to_dict(),
'needs_anchor': needs_anchor,
'session': session.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/workflow/<workflow_id>/step/<step_id>', methods=['PUT'])
def update_step(workflow_id: str, step_id: str):
"""
Met à jour une étape.
Request:
{
"action_type": "...", // Optionnel
"position": {"x": ..., "y": ...}, // Optionnel
"parameters": {...}, // Optionnel
"label": "...", // Optionnel
"anchor_id": "..." // Optionnel
}
"""
try:
step = Step.query.filter_by(id=step_id, workflow_id=workflow_id).first()
if not step:
return jsonify({
'success': False,
'error': f"Étape '{step_id}' non trouvée dans le workflow '{workflow_id}'"
}), 404
data = request.get_json() or {}
# Mettre à jour les champs fournis
if 'action_type' in data:
step.action_type = data['action_type']
if 'position' in data:
step.position_x = data['position'].get('x', step.position_x)
step.position_y = data['position'].get('y', step.position_y)
if 'parameters' in data:
step.parameters = data['parameters']
if 'label' in data:
step.label = data['label']
if 'anchor_id' in data:
# Vérifier que l'ancre existe
if data['anchor_id']:
anchor = VisualAnchor.query.get(data['anchor_id'])
if not anchor:
return jsonify({
'success': False,
'error': f"Ancre '{data['anchor_id']}' non trouvée"
}), 404
step.anchor_id = data['anchor_id']
step.updated_at = datetime.utcnow()
db.session.commit()
workflow = Workflow.query.get(workflow_id)
print(f"✅ [API v3] Étape mise à jour: {step_id}")
return jsonify({
'success': True,
'workflow': workflow.to_dict(),
'step': step.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/workflow/<workflow_id>/step/<step_id>', methods=['DELETE'])
def delete_step(workflow_id: str, step_id: str):
"""Supprime une étape"""
try:
step = Step.query.filter_by(id=step_id, workflow_id=workflow_id).first()
if not step:
return jsonify({
'success': False,
'error': f"Étape '{step_id}' non trouvée"
}), 404
deleted_order = step.order
db.session.delete(step)
# Réordonner les étapes suivantes
Step.query.filter(
Step.workflow_id == workflow_id,
Step.order > deleted_order
).update({Step.order: Step.order - 1})
db.session.commit()
# Désélectionner si c'était l'étape sélectionnée
session = get_session_state()
if session.selected_step_id == step_id:
session.selected_step_id = None
workflow = Workflow.query.get(workflow_id)
print(f"🗑️ [API v3] Étape supprimée: {step_id}")
return jsonify({
'success': True,
'workflow': workflow.to_dict(),
'session': session.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'error': str(e)
}), 500
@api_v3_bp.route('/workflow/<workflow_id>/reorder', methods=['POST'])
def reorder_steps(workflow_id: str):
"""
Réordonne les étapes d'un workflow.
Request:
{
"step_ids": ["step_1", "step_2", "step_3"] // Nouvel ordre
}
"""
try:
workflow = Workflow.query.get(workflow_id)
if not workflow:
return jsonify({
'success': False,
'error': f"Workflow '{workflow_id}' non trouvé"
}), 404
data = request.get_json() or {}
step_ids = data.get('step_ids', [])
# Mettre à jour l'ordre de chaque étape
for index, step_id in enumerate(step_ids):
step = Step.query.filter_by(id=step_id, workflow_id=workflow_id).first()
if step:
step.order = index
db.session.commit()
print(f"🔄 [API v3] Étapes réordonnées pour workflow {workflow_id}")
return jsonify({
'success': True,
'workflow': workflow.to_dict()
})
except Exception as e:
db.session.rollback()
return jsonify({
'success': False,
'error': str(e)
}), 500

View File

@@ -9,7 +9,6 @@ for real-time execution updates.
from flask import Flask
from flask_cors import CORS
from flask_socketio import SocketIO
from flask_sqlalchemy import SQLAlchemy
from flask_caching import Cache
import os
from dotenv import load_dotenv
@@ -22,14 +21,15 @@ app = Flask(__name__)
# Configuration
app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'dev-secret-key-change-in-production')
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL', 'sqlite:///workflows.db')
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL', 'sqlite:///vwb_v3.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['MAX_CONTENT_LENGTH'] = 10 * 1024 * 1024 # 10MB max upload
app.config['CACHE_TYPE'] = 'redis' if os.getenv('REDIS_URL') else 'simple'
app.config['CACHE_REDIS_URL'] = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
# Initialize extensions
db = SQLAlchemy(app)
# Initialize extensions - Use db from v3 models (source of truth)
from db.models import db
db.init_app(app)
cache = Cache(app)
socketio = SocketIO(
app,
@@ -126,6 +126,40 @@ try:
except ImportError as e:
print(f"⚠️ Blueprint coaching_sessions désactivé: {e}")
# Catalogue VWB - actions VisionOnly
# V2 avec VLM (Vision Language Model) pour détection intelligente
try:
from catalog_routes_v2_vlm import catalog_bp
app.register_blueprint(catalog_bp)
print("✅ Blueprint catalog V2 VLM (Ollama qwen2.5vl) enregistré")
except ImportError as e:
print(f"⚠️ Blueprint catalog V2 VLM désactivé: {e}")
# Fallback sur la version pyautogui
try:
from catalog_routes import catalog_bp
app.register_blueprint(catalog_bp)
print("✅ Blueprint catalog (fallback pyautogui) enregistré")
except ImportError as e2:
print(f"⚠️ Blueprint catalog désactivé: {e2}")
# API Images Ancres Visuelles - stockage serveur
try:
from api.anchor_images import anchor_images_bp
app.register_blueprint(anchor_images_bp)
print("✅ Blueprint anchor_images enregistré")
except ImportError as e:
print(f"⚠️ Blueprint anchor_images désactivé: {e}")
# ============================================================
# API V3 - Thin Client Architecture (Source de Vérité Unique)
# ============================================================
try:
from api_v3 import api_v3_bp
app.register_blueprint(api_v3_bp)
print("✅ Blueprint API v3 (Thin Client) enregistré - /api/v3/*")
except ImportError as e:
print(f"⚠️ Blueprint API v3 désactivé: {e}")
# Import WebSocket handlers (optional)
try:
@@ -158,9 +192,92 @@ def handle_exception(error):
# Health check endpoint
@app.route('/health')
@app.route('/api/health')
def health_check():
"""Health check endpoint for monitoring"""
return {'status': 'healthy', 'version': '1.0.0'}
from flask import jsonify
return jsonify({'status': 'healthy', 'version': '1.0.0'})
# Workflow execution endpoint (proxy to catalog execute)
@app.route('/api/workflow/execute-step', methods=['POST'])
def execute_workflow_step():
"""Execute a workflow step via the catalog execute endpoint"""
from flask import jsonify, request
import requests
try:
data = request.get_json() or {}
step_id = data.get('stepId', f'step_{int(__import__("time").time() * 1000)}')
step_type = data.get('stepType', 'click_anchor')
parameters = data.get('parameters', {})
# DEBUG: Écrire les données reçues dans un fichier
import json as json_module
with open('/tmp/vwb_debug.log', 'a') as debug_file:
debug_file.write(f"\n{'='*60}\n")
debug_file.write(f"[execute-step] stepType={step_type}, stepId={step_id}\n")
debug_file.write(f"[execute-step] parameters keys: {list(parameters.keys())}\n")
if 'visual_anchor' in parameters:
va = parameters['visual_anchor']
debug_file.write(f"[execute-step] visual_anchor keys: {list(va.keys()) if va else 'None'}\n")
debug_file.write(f"[execute-step] visual_anchor.id: {va.get('id')}\n")
debug_file.write(f"[execute-step] visual_anchor.thumbnail_url: {va.get('thumbnail_url') or (va.get('metadata', {}) or {}).get('thumbnail_url')}\n")
debug_file.write(f"[execute-step] FULL visual_anchor: {json_module.dumps(va, default=str)[:500]}\n")
debug_file.flush()
# Convert to catalog execute format
catalog_request = {
'type': step_type,
'step_id': step_id,
'parameters': parameters
}
# Call the internal catalog execute endpoint
from catalog_routes import catalog_bp
# Direct execution via catalog
try:
# Import the execute function directly
from catalog_routes import execute_action as catalog_execute
# We need to simulate Flask request context - use internal call
from flask import current_app
with current_app.test_request_context(
'/api/vwb/catalog/execute',
method='POST',
data=__import__('json').dumps(catalog_request),
content_type='application/json'
):
response = catalog_execute()
if hasattr(response, 'get_json'):
result = response.get_json()
else:
result = __import__('json').loads(response[0].get_data(as_text=True))
# Convert to expected format
if result.get('success') and result.get('result'):
return jsonify({
'success': result['result'].get('status') == 'success',
'output': result['result'].get('output_data', {}),
'error': result['result'].get('error', {}).get('message') if result['result'].get('error') else None
})
else:
return jsonify({
'success': False,
'error': result.get('error', 'Échec de l\'exécution')
})
except Exception as inner_e:
print(f"❌ Erreur exécution interne: {inner_e}")
return jsonify({
'success': False,
'error': str(inner_e)
})
except Exception as e:
print(f"❌ Erreur execute-step: {e}")
return jsonify({
'success': False,
'error': str(e)
}), 500
# Create database tables
with app.app_context():
@@ -195,7 +312,7 @@ except Exception as e:
print(f"❌ Erreur lors de l'initialisation des services visuels: {e}")
if __name__ == '__main__':
port = int(os.getenv('PORT', 5002))
port = int(os.getenv('PORT', 5000))
debug = os.getenv('FLASK_ENV') == 'development'
socketio.run(

View File

@@ -0,0 +1,54 @@
"""
Contrats de Données VWB - Module d'initialisation
Auteur : Dom, Alice, Kiro - 09 janvier 2026
Ce module contient les contrats de données spécifiques au Visual Workflow Builder
pour les actions VisionOnly RPA.
Contrats disponibles :
- VWBActionError : Gestion des erreurs d'actions
- VWBEvidence : Preuves d'exécution avec screenshots
- VWBVisualAnchor : Ancres visuelles pour sélection d'éléments UI
"""
from .error import VWBActionError, VWBErrorType, VWBErrorSeverity
from .evidence import VWBEvidence, VWBEvidenceType
from .visual_anchor import VWBVisualAnchor, VWBVisualAnchorType
from .action_contracts import (
ActionContract,
ContractViolation,
ContractViolationType,
ContractValidationError,
VWB_ACTION_CONTRACTS,
validate_action_contract,
enforce_action_contract,
get_action_contract,
get_required_params,
list_all_action_types
)
__all__ = [
'VWBActionError',
'VWBErrorType',
'VWBErrorSeverity',
'VWBEvidence',
'VWBEvidenceType',
'VWBVisualAnchor',
'VWBVisualAnchorType',
# Contrats d'actions
'ActionContract',
'ContractViolation',
'ContractViolationType',
'ContractValidationError',
'VWB_ACTION_CONTRACTS',
'validate_action_contract',
'enforce_action_contract',
'get_action_contract',
'get_required_params',
'list_all_action_types'
]
__version__ = '1.0.0'
__author__ = 'Dom, Alice, Kiro'
__date__ = '09 janvier 2026'

View File

@@ -0,0 +1,403 @@
"""
Contrats Stricts des Actions VWB - Définition et Validation
Auteur : Dom, Alice, Kiro - 23 janvier 2026
Ce module définit les contrats stricts pour chaque action VWB.
Chaque action a des paramètres OBLIGATOIRES qui doivent être présents
pour que l'exécution soit autorisée.
PRINCIPE CLÉ: Si le contrat n'est pas respecté → BLOQUER l'exécution
avec un message d'erreur clair.
"""
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional, Set, Callable
from enum import Enum
class ContractViolationType(Enum):
"""Types de violation de contrat."""
MISSING_REQUIRED = "missing_required" # Paramètre obligatoire manquant
INVALID_TYPE = "invalid_type" # Mauvais type de valeur
INVALID_VALUE = "invalid_value" # Valeur invalide
FORBIDDEN_PARAM = "forbidden_param" # Paramètre interdit présent
INCOMPATIBLE_ACTION = "incompatible_action" # Type d'action incompatible avec params
@dataclass
class ContractViolation:
"""Représente une violation de contrat."""
violation_type: ContractViolationType
parameter: str
message: str
expected: Optional[str] = None
received: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
"type": self.violation_type.value,
"parameter": self.parameter,
"message": self.message,
"expected": self.expected,
"received": self.received
}
@dataclass
class ActionContract:
"""Définition du contrat d'une action VWB."""
action_type: str
description: str
required_params: List[str]
optional_params: List[str] = field(default_factory=list)
param_validators: Dict[str, Callable[[Any], bool]] = field(default_factory=dict)
# Actions qui ne peuvent PAS avoir certains paramètres
forbidden_if_missing: Dict[str, str] = field(default_factory=dict)
def validate(self, parameters: Dict[str, Any]) -> List[ContractViolation]:
"""
Valide les paramètres contre le contrat.
Returns:
Liste de violations (vide si tout est OK)
"""
violations = []
# 1. Vérifier les paramètres obligatoires
for param in self.required_params:
if param not in parameters or parameters[param] is None:
violations.append(ContractViolation(
violation_type=ContractViolationType.MISSING_REQUIRED,
parameter=param,
message=f"Paramètre obligatoire '{param}' manquant pour l'action '{self.action_type}'",
expected=f"'{param}' doit être fourni",
received="absent ou None"
))
elif param in self.param_validators:
# Valider le contenu du paramètre
if not self.param_validators[param](parameters[param]):
violations.append(ContractViolation(
violation_type=ContractViolationType.INVALID_VALUE,
parameter=param,
message=f"Valeur invalide pour '{param}' dans l'action '{self.action_type}'",
expected="valeur valide selon les règles du contrat",
received=str(type(parameters[param]).__name__)
))
return violations
def has_visual_anchor(params: Dict[str, Any]) -> bool:
"""Vérifie si visual_anchor est présent et valide."""
anchor = params.get('visual_anchor') or params.get('target')
if not anchor:
return False
if not isinstance(anchor, dict):
return False
# Doit avoir soit une image, soit des coordonnées
has_image = bool(
anchor.get('screenshot') or
anchor.get('image') or
anchor.get('reference_image_base64') or
anchor.get('id') # ID d'ancre stockée sur le serveur
)
has_coords = bool(
anchor.get('bounding_box') or
anchor.get('boundingBox')
)
return has_image or has_coords
def has_text(params: Dict[str, Any]) -> bool:
"""Vérifie si text est présent et non vide."""
text = params.get('text') or params.get('text_to_type') or params.get('texte')
return bool(text and isinstance(text, str) and len(text.strip()) > 0)
def has_timeout(params: Dict[str, Any]) -> bool:
"""Vérifie si timeout est présent et valide."""
timeout = params.get('timeout') or params.get('timeout_ms') or params.get('max_wait_time_ms')
if timeout is None:
return True # Optionnel, une valeur par défaut sera utilisée
try:
return int(timeout) > 0
except (ValueError, TypeError):
return False
# =============================================================================
# DÉFINITION DES CONTRATS POUR CHAQUE ACTION VWB
# =============================================================================
VWB_ACTION_CONTRACTS: Dict[str, ActionContract] = {
# --- ACTIONS DE CLIC ---
"click_anchor": ActionContract(
action_type="click_anchor",
description="Clic sur un élément identifié par ancre visuelle",
required_params=["visual_anchor"],
optional_params=["click_type", "click_offset_x", "click_offset_y", "confidence_threshold"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
"double_click_anchor": ActionContract(
action_type="double_click_anchor",
description="Double-clic sur un élément identifié par ancre visuelle",
required_params=["visual_anchor"],
optional_params=["click_offset_x", "click_offset_y", "confidence_threshold"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
"right_click_anchor": ActionContract(
action_type="right_click_anchor",
description="Clic droit sur un élément identifié par ancre visuelle",
required_params=["visual_anchor"],
optional_params=["click_offset_x", "click_offset_y", "confidence_threshold"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
"hover_anchor": ActionContract(
action_type="hover_anchor",
description="Survol d'un élément identifié par ancre visuelle",
required_params=["visual_anchor"],
optional_params=["hover_duration_ms", "confidence_threshold"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
# --- ACTIONS DE SAISIE ---
"type_text": ActionContract(
action_type="type_text",
description="Saisie de texte (PAS de clic automatique, le focus doit être déjà fait)",
required_params=["text"],
optional_params=["typing_speed_ms", "clear_field_first", "press_enter_after"],
param_validators={"text": lambda p: bool(p and isinstance(p, str))}
),
"type_secret": ActionContract(
action_type="type_secret",
description="Saisie sécurisée de texte sensible (mot de passe)",
required_params=["secret_text"],
optional_params=["typing_speed_ms", "clear_field_first", "mask_in_evidence"],
param_validators={"secret_text": lambda p: bool(p and isinstance(p, str))}
),
# --- ACTIONS DE FOCUS ---
"focus_anchor": ActionContract(
action_type="focus_anchor",
description="Donne le focus à un élément (clic pour activer)",
required_params=["visual_anchor"],
optional_params=["focus_method", "verify_focus", "confidence_threshold"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
# --- ACTIONS D'ATTENTE ---
"wait_for_anchor": ActionContract(
action_type="wait_for_anchor",
description="Attendre qu'un élément apparaisse ou disparaisse",
required_params=["visual_anchor"],
optional_params=["wait_mode", "max_wait_time_ms", "check_interval_ms"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
# --- ACTIONS DE SCROLL ---
"scroll_to_anchor": ActionContract(
action_type="scroll_to_anchor",
description="Défiler jusqu'à ce qu'un élément soit visible",
required_params=["visual_anchor"],
optional_params=["scroll_direction", "scroll_speed", "max_scroll_attempts", "target_position"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
"drag_drop_anchor": ActionContract(
action_type="drag_drop_anchor",
description="Glisser-déposer d'un élément vers un autre",
required_params=["source_anchor", "target_anchor"],
optional_params=["drag_speed", "hold_duration_ms"],
param_validators={
"source_anchor": lambda p: has_visual_anchor({"visual_anchor": p}),
"target_anchor": lambda p: has_visual_anchor({"visual_anchor": p})
}
),
# --- ACTIONS CLAVIER ---
"keyboard_shortcut": ActionContract(
action_type="keyboard_shortcut",
description="Exécuter un raccourci clavier",
required_params=["keys"],
optional_params=["hold_duration_ms"],
param_validators={"keys": lambda p: isinstance(p, list) and len(p) > 0}
),
# --- ACTIONS D'EXTRACTION ---
"extract_text": ActionContract(
action_type="extract_text",
description="Extraire du texte d'une zone identifiée par ancre",
required_params=["visual_anchor"],
optional_params=["extraction_mode", "text_filters", "output_format", "output_variable"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
"extract_table": ActionContract(
action_type="extract_table",
description="Extraire un tableau d'une zone identifiée par ancre",
required_params=["visual_anchor"],
optional_params=["table_format", "output_variable"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
"screenshot_evidence": ActionContract(
action_type="screenshot_evidence",
description="Capturer une preuve visuelle (screenshot)",
required_params=[], # Aucun paramètre obligatoire
optional_params=["region", "label", "include_timestamp"]
),
# --- ACTIONS CONDITIONNELLES ---
"visual_condition": ActionContract(
action_type="visual_condition",
description="Condition basée sur présence/absence d'élément visuel",
required_params=["visual_anchor"],
optional_params=["condition_type", "timeout_ms"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
"loop_visual": ActionContract(
action_type="loop_visual",
description="Boucle tant qu'un élément est visible",
required_params=["visual_anchor"],
optional_params=["max_iterations", "timeout_ms"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
# --- ACTIONS DONNÉES ---
"download_to_folder": ActionContract(
action_type="download_to_folder",
description="Télécharger un fichier vers un dossier",
required_params=["target_folder"],
optional_params=["filename_pattern", "timeout_ms"]
),
"ai_analyze_text": ActionContract(
action_type="ai_analyze_text",
description="Analyser du texte avec IA",
required_params=["visual_anchor", "analysis_prompt"],
optional_params=["model", "output_variable"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
"db_save_data": ActionContract(
action_type="db_save_data",
description="Sauvegarder des données en base",
required_params=["data", "table_name"],
optional_params=["connection_id"]
),
"db_read_data": ActionContract(
action_type="db_read_data",
description="Lire des données depuis la base",
required_params=["query"],
optional_params=["connection_id", "output_variable"]
),
# --- ACTIONS DE VÉRIFICATION ---
"verify_element_exists": ActionContract(
action_type="verify_element_exists",
description="Vérifier qu'un élément existe à l'écran",
required_params=["visual_anchor"],
optional_params=["timeout_ms", "should_exist"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
"verify_text_content": ActionContract(
action_type="verify_text_content",
description="Vérifier le contenu textuel d'un élément",
required_params=["visual_anchor", "expected_text"],
optional_params=["match_mode", "case_sensitive"],
param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})}
),
}
class ContractValidationError(Exception):
"""Exception levée quand un contrat n'est pas respecté."""
def __init__(self, violations: List[ContractViolation], action_type: str):
self.violations = violations
self.action_type = action_type
messages = [v.message for v in violations]
super().__init__(f"Contrat violé pour '{action_type}': {'; '.join(messages)}")
def to_dict(self) -> Dict[str, Any]:
return {
"error": "contract_violation",
"action_type": self.action_type,
"violations": [v.to_dict() for v in self.violations],
"message": str(self)
}
def validate_action_contract(action_type: str, parameters: Dict[str, Any]) -> List[ContractViolation]:
"""
Valide les paramètres d'une action contre son contrat.
Args:
action_type: Type de l'action (ex: "click_anchor", "type_text")
parameters: Paramètres fournis pour l'action
Returns:
Liste de violations (vide si tout est OK)
Raises:
ContractValidationError si le contrat n'est pas respecté
"""
# Normaliser le type d'action
action_type_normalized = action_type.lower().strip()
# Vérifier si l'action existe
if action_type_normalized not in VWB_ACTION_CONTRACTS:
# Action non reconnue - on laisse passer avec un warning
print(f"⚠️ [Contract] Action '{action_type}' non reconnue dans les contrats")
return []
contract = VWB_ACTION_CONTRACTS[action_type_normalized]
violations = contract.validate(parameters)
return violations
def enforce_action_contract(action_type: str, parameters: Dict[str, Any]) -> None:
"""
Valide et BLOQUE si le contrat n'est pas respecté.
Args:
action_type: Type de l'action
parameters: Paramètres fournis
Raises:
ContractValidationError si le contrat n'est pas respecté
"""
violations = validate_action_contract(action_type, parameters)
if violations:
print(f"🚫 [Contract] VIOLATION DÉTECTÉE pour '{action_type}':")
for v in violations:
print(f" - {v.parameter}: {v.message}")
raise ContractValidationError(violations, action_type)
print(f"✅ [Contract] Contrat respecté pour '{action_type}'")
def get_action_contract(action_type: str) -> Optional[ActionContract]:
"""Retourne le contrat d'une action."""
return VWB_ACTION_CONTRACTS.get(action_type.lower().strip())
def get_required_params(action_type: str) -> List[str]:
"""Retourne la liste des paramètres obligatoires pour une action."""
contract = get_action_contract(action_type)
return contract.required_params if contract else []
def list_all_action_types() -> List[str]:
"""Retourne la liste de tous les types d'actions avec contrat."""
return list(VWB_ACTION_CONTRACTS.keys())

View File

@@ -0,0 +1,291 @@
"""
Contrat de Données VWB - Gestion des Erreurs d'Actions
Auteur : Dom, Alice, Kiro - 09 janvier 2026
Ce module définit les contrats pour la gestion des erreurs dans les actions
VisionOnly du Visual Workflow Builder.
Classes :
- VWBErrorType : Types d'erreurs possibles
- VWBErrorSeverity : Niveaux de gravité
- VWBActionError : Contrat principal pour les erreurs d'actions
"""
from enum import Enum
from dataclasses import dataclass, asdict
from typing import Dict, Any, Optional, List
from datetime import datetime
import json
class VWBErrorType(Enum):
"""Types d'erreurs possibles dans les actions VWB."""
# Erreurs de capture d'écran
SCREEN_CAPTURE_FAILED = "screen_capture_failed"
SCREEN_CAPTURE_TIMEOUT = "screen_capture_timeout"
# Erreurs de détection d'éléments
ELEMENT_NOT_FOUND = "element_not_found"
ELEMENT_NOT_VISIBLE = "element_not_visible"
ELEMENT_NOT_CLICKABLE = "element_not_clickable"
MULTIPLE_ELEMENTS_FOUND = "multiple_elements_found"
# Erreurs d'interaction
CLICK_FAILED = "click_failed"
TYPE_TEXT_FAILED = "type_text_failed"
WAIT_TIMEOUT = "wait_timeout"
# Erreurs de validation
VALIDATION_FAILED = "validation_failed"
PARAMETER_INVALID = "parameter_invalid"
ANCHOR_INVALID = "anchor_invalid"
# Erreurs système
SYSTEM_ERROR = "system_error"
NETWORK_ERROR = "network_error"
PERMISSION_DENIED = "permission_denied"
# Erreurs de configuration
CONFIG_ERROR = "config_error"
DEPENDENCY_MISSING = "dependency_missing"
class VWBErrorSeverity(Enum):
"""Niveaux de gravité des erreurs VWB."""
INFO = "info" # Information, pas d'impact
WARNING = "warning" # Avertissement, impact mineur
ERROR = "error" # Erreur, impact modéré
CRITICAL = "critical" # Erreur critique, arrêt nécessaire
FATAL = "fatal" # Erreur fatale, système compromis
@dataclass
class VWBActionError:
"""
Contrat de données pour les erreurs d'actions VWB.
Cette classe encapsule toutes les informations nécessaires pour
diagnostiquer et résoudre les erreurs survenant lors de l'exécution
des actions VisionOnly dans le Visual Workflow Builder.
"""
# Identification de l'erreur
error_id: str
error_type: VWBErrorType
severity: VWBErrorSeverity
# Message et description
message: str
description: str
# Contexte d'exécution
action_id: str
step_id: str
# Informations temporelles
timestamp: datetime
# Détails techniques
technical_details: Dict[str, Any]
# Suggestions de résolution
suggestions: List[str]
# Paramètres optionnels avec valeurs par défaut
workflow_id: Optional[str] = None
execution_time_ms: Optional[float] = None
stack_trace: Optional[str] = None
retry_possible: bool = True
user_id: Optional[str] = None
session_id: Optional[str] = None
environment: str = "development"
def __post_init__(self):
"""Validation et initialisation post-création."""
if not self.error_id:
self.error_id = f"err_{self.action_id}_{int(self.timestamp.timestamp())}"
if not self.technical_details:
self.technical_details = {}
if not self.suggestions:
self.suggestions = self._generate_default_suggestions()
def _generate_default_suggestions(self) -> List[str]:
"""Génère des suggestions par défaut selon le type d'erreur."""
suggestions_map = {
VWBErrorType.SCREEN_CAPTURE_FAILED: [
"Vérifiez que l'écran est accessible",
"Redémarrez le service de capture d'écran",
"Vérifiez les permissions d'accès à l'écran"
],
VWBErrorType.ELEMENT_NOT_FOUND: [
"Vérifiez que l'élément est visible à l'écran",
"Ajustez les paramètres de détection",
"Attendez que la page soit complètement chargée"
],
VWBErrorType.CLICK_FAILED: [
"Vérifiez que l'élément est cliquable",
"Attendez que l'interface soit stable",
"Réessayez avec un délai plus long"
],
VWBErrorType.VALIDATION_FAILED: [
"Vérifiez les paramètres de l'action",
"Consultez la documentation de l'action",
"Contactez l'administrateur si le problème persiste"
]
}
return suggestions_map.get(self.error_type, [
"Consultez les logs pour plus de détails",
"Réessayez l'opération",
"Contactez le support technique si nécessaire"
])
def to_dict(self) -> Dict[str, Any]:
"""Convertit l'erreur en dictionnaire pour sérialisation JSON."""
data = asdict(self)
# Convertir les enums en strings
data['error_type'] = self.error_type.value
data['severity'] = self.severity.value
# Convertir le timestamp en ISO string
data['timestamp'] = self.timestamp.isoformat()
return data
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'VWBActionError':
"""Crée une instance depuis un dictionnaire."""
# Convertir les strings en enums
data['error_type'] = VWBErrorType(data['error_type'])
data['severity'] = VWBErrorSeverity(data['severity'])
# Convertir le timestamp
if isinstance(data['timestamp'], str):
data['timestamp'] = datetime.fromisoformat(data['timestamp'])
return cls(**data)
def to_json(self) -> str:
"""Sérialise l'erreur en JSON."""
return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)
@classmethod
def from_json(cls, json_str: str) -> 'VWBActionError':
"""Désérialise depuis JSON."""
data = json.loads(json_str)
return cls.from_dict(data)
def is_retryable(self) -> bool:
"""Détermine si l'erreur permet un retry."""
non_retryable_types = {
VWBErrorType.PARAMETER_INVALID,
VWBErrorType.ANCHOR_INVALID,
VWBErrorType.PERMISSION_DENIED,
VWBErrorType.CONFIG_ERROR,
VWBErrorType.DEPENDENCY_MISSING
}
return (
self.retry_possible and
self.error_type not in non_retryable_types and
self.severity not in {VWBErrorSeverity.FATAL}
)
def get_user_friendly_message(self) -> str:
"""Retourne un message convivial pour l'utilisateur."""
friendly_messages = {
VWBErrorType.SCREEN_CAPTURE_FAILED: "Impossible de capturer l'écran",
VWBErrorType.ELEMENT_NOT_FOUND: "Élément non trouvé sur l'écran",
VWBErrorType.CLICK_FAILED: "Impossible de cliquer sur l'élément",
VWBErrorType.TYPE_TEXT_FAILED: "Impossible de saisir le texte",
VWBErrorType.WAIT_TIMEOUT: "Délai d'attente dépassé",
VWBErrorType.VALIDATION_FAILED: "Validation échouée",
VWBErrorType.SYSTEM_ERROR: "Erreur système",
VWBErrorType.NETWORK_ERROR: "Erreur réseau",
VWBErrorType.PERMISSION_DENIED: "Permissions insuffisantes"
}
base_message = friendly_messages.get(self.error_type, self.message)
if self.severity == VWBErrorSeverity.CRITICAL:
return f"🚨 {base_message}"
elif self.severity == VWBErrorSeverity.ERROR:
return f"{base_message}"
elif self.severity == VWBErrorSeverity.WARNING:
return f"⚠️ {base_message}"
else:
return f" {base_message}"
def add_technical_detail(self, key: str, value: Any) -> None:
"""Ajoute un détail technique."""
self.technical_details[key] = value
def add_suggestion(self, suggestion: str) -> None:
"""Ajoute une suggestion de résolution."""
if suggestion not in self.suggestions:
self.suggestions.append(suggestion)
def __str__(self) -> str:
"""Représentation string de l'erreur."""
return f"VWBActionError({self.error_type.value}, {self.severity.value}): {self.message}"
def __repr__(self) -> str:
"""Représentation détaillée de l'erreur."""
return (
f"VWBActionError("
f"error_id='{self.error_id}', "
f"error_type={self.error_type.value}, "
f"severity={self.severity.value}, "
f"action_id='{self.action_id}', "
f"message='{self.message}'"
f")"
)
def create_vwb_error(
error_type: VWBErrorType,
message: str,
action_id: str,
step_id: str,
severity: VWBErrorSeverity = VWBErrorSeverity.ERROR,
**kwargs
) -> VWBActionError:
"""
Fonction utilitaire pour créer rapidement une erreur VWB.
Args:
error_type: Type d'erreur
message: Message d'erreur
action_id: ID de l'action
step_id: ID de l'étape
severity: Gravité (ERROR par défaut)
**kwargs: Paramètres additionnels
Returns:
Instance de VWBActionError
"""
return VWBActionError(
error_id=kwargs.get('error_id', ''),
error_type=error_type,
severity=severity,
message=message,
description=kwargs.get('description', message),
action_id=action_id,
step_id=step_id,
workflow_id=kwargs.get('workflow_id'),
timestamp=kwargs.get('timestamp', datetime.now()),
execution_time_ms=kwargs.get('execution_time_ms'),
technical_details=kwargs.get('technical_details', {}),
stack_trace=kwargs.get('stack_trace'),
suggestions=kwargs.get('suggestions', []),
retry_possible=kwargs.get('retry_possible', True),
user_id=kwargs.get('user_id'),
session_id=kwargs.get('session_id'),
environment=kwargs.get('environment', 'development')
)

View File

@@ -0,0 +1,375 @@
"""
Contrat de Données VWB - Evidence d'Exécution
Auteur : Dom, Alice, Kiro - 09 janvier 2026
Ce module définit les contrats pour les preuves d'exécution (Evidence) des actions
VisionOnly dans le Visual Workflow Builder.
Classes :
- VWBEvidenceType : Types d'evidence possibles
- VWBEvidence : Contrat principal pour les preuves d'exécution
"""
from enum import Enum
from dataclasses import dataclass, asdict
from typing import Dict, Any, Optional, List
from datetime import datetime
import json
import base64
from pathlib import Path
class VWBEvidenceType(Enum):
"""Types d'evidence possibles dans le VWB."""
# Evidence visuelles
SCREENSHOT_BEFORE = "screenshot_before" # Capture avant action
SCREENSHOT_AFTER = "screenshot_after" # Capture après action
SCREENSHOT_ERROR = "screenshot_error" # Capture lors d'erreur
ELEMENT_HIGHLIGHT = "element_highlight" # Élément surligné
# Evidence d'interaction
CLICK_EVIDENCE = "click_evidence" # Preuve de clic
TYPE_EVIDENCE = "type_evidence" # Preuve de saisie
WAIT_EVIDENCE = "wait_evidence" # Preuve d'attente
# Evidence de validation
VALIDATION_SUCCESS = "validation_success" # Validation réussie
VALIDATION_FAILURE = "validation_failure" # Validation échouée
# Evidence système
SYSTEM_STATE = "system_state" # État système
PERFORMANCE_METRICS = "performance_metrics" # Métriques de performance
# Evidence de débogage
DEBUG_INFO = "debug_info" # Informations de débogage
LOG_ENTRY = "log_entry" # Entrée de log
@dataclass
class VWBEvidence:
"""
Contrat de données pour les preuves d'exécution VWB.
Cette classe encapsule toutes les informations nécessaires pour
documenter et tracer l'exécution des actions VisionOnly dans le
Visual Workflow Builder.
"""
# Identification de l'evidence
evidence_id: str
evidence_type: VWBEvidenceType
# Contexte d'exécution
action_id: str
step_id: str
# Informations temporelles
timestamp: datetime
# Contenu de l'evidence
title: str
description: str
# Données structurées
data: Dict[str, Any]
# Liens vers autres evidence
related_evidence_ids: List[str]
# Paramètres optionnels avec valeurs par défaut
workflow_id: Optional[str] = None
execution_time_ms: Optional[float] = None
screenshot_base64: Optional[str] = None
screenshot_width: Optional[int] = None
screenshot_height: Optional[int] = None
highlight_box: Optional[Dict[str, int]] = None # {x, y, width, height}
success: bool = True
confidence_score: Optional[float] = None
user_id: Optional[str] = None
session_id: Optional[str] = None
parent_evidence_id: Optional[str] = None
def __post_init__(self):
"""Validation et initialisation post-création."""
if not self.evidence_id:
self.evidence_id = f"ev_{self.action_id}_{int(self.timestamp.timestamp())}"
if not self.data:
self.data = {}
if not self.related_evidence_ids:
self.related_evidence_ids = []
# Validation des dimensions screenshot
if self.screenshot_base64:
if not self.screenshot_width or not self.screenshot_height:
self._extract_screenshot_dimensions()
def _extract_screenshot_dimensions(self):
"""Extrait les dimensions du screenshot depuis les données base64."""
try:
if self.screenshot_base64:
from PIL import Image
import io
# Décoder le base64
image_data = base64.b64decode(self.screenshot_base64)
image = Image.open(io.BytesIO(image_data))
self.screenshot_width = image.width
self.screenshot_height = image.height
except Exception:
# En cas d'erreur, utiliser des valeurs par défaut
self.screenshot_width = 1920
self.screenshot_height = 1080
def to_dict(self) -> Dict[str, Any]:
"""Convertit l'evidence en dictionnaire pour sérialisation JSON."""
data = asdict(self)
# Convertir l'enum en string
data['evidence_type'] = self.evidence_type.value
# Convertir le timestamp en ISO string
data['timestamp'] = self.timestamp.isoformat()
return data
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'VWBEvidence':
"""Crée une instance depuis un dictionnaire."""
# Convertir le string en enum
data['evidence_type'] = VWBEvidenceType(data['evidence_type'])
# Convertir le timestamp
if isinstance(data['timestamp'], str):
data['timestamp'] = datetime.fromisoformat(data['timestamp'])
return cls(**data)
def to_json(self) -> str:
"""Sérialise l'evidence en JSON."""
return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)
@classmethod
def from_json(cls, json_str: str) -> 'VWBEvidence':
"""Désérialise depuis JSON."""
data = json.loads(json_str)
return cls.from_dict(data)
def has_screenshot(self) -> bool:
"""Vérifie si l'evidence contient un screenshot."""
return self.screenshot_base64 is not None and len(self.screenshot_base64) > 0
def has_highlight(self) -> bool:
"""Vérifie si l'evidence contient une zone surlignée."""
return (
self.highlight_box is not None and
all(key in self.highlight_box for key in ['x', 'y', 'width', 'height'])
)
def get_screenshot_data_url(self) -> Optional[str]:
"""Retourne l'URL data du screenshot pour affichage web."""
if not self.has_screenshot():
return None
# Déterminer le format (PNG par défaut)
format_prefix = "data:image/png;base64,"
if self.screenshot_base64.startswith('/9j/'): # JPEG magic bytes en base64
format_prefix = "data:image/jpeg;base64,"
return f"{format_prefix}{self.screenshot_base64}"
def save_screenshot(self, file_path: str) -> bool:
"""Sauvegarde le screenshot sur disque."""
if not self.has_screenshot():
return False
try:
image_data = base64.b64decode(self.screenshot_base64)
with open(file_path, 'wb') as f:
f.write(image_data)
return True
except Exception:
return False
def add_data(self, key: str, value: Any) -> None:
"""Ajoute une donnée à l'evidence."""
self.data[key] = value
def get_data(self, key: str, default: Any = None) -> Any:
"""Récupère une donnée de l'evidence."""
return self.data.get(key, default)
def set_highlight_box(self, x: int, y: int, width: int, height: int) -> None:
"""Définit la zone de surbrillance."""
self.highlight_box = {
'x': max(0, x),
'y': max(0, y),
'width': max(1, width),
'height': max(1, height)
}
def add_related_evidence(self, evidence_id: str) -> None:
"""Ajoute une evidence liée."""
if evidence_id not in self.related_evidence_ids:
self.related_evidence_ids.append(evidence_id)
def get_file_size_mb(self) -> float:
"""Calcule la taille approximative en MB."""
size_bytes = 0
# Taille du screenshot
if self.screenshot_base64:
size_bytes += len(self.screenshot_base64.encode('utf-8'))
# Taille des données JSON
size_bytes += len(json.dumps(self.data).encode('utf-8'))
return size_bytes / (1024 * 1024)
def is_visual_evidence(self) -> bool:
"""Vérifie si c'est une evidence visuelle."""
visual_types = {
VWBEvidenceType.SCREENSHOT_BEFORE,
VWBEvidenceType.SCREENSHOT_AFTER,
VWBEvidenceType.SCREENSHOT_ERROR,
VWBEvidenceType.ELEMENT_HIGHLIGHT
}
return self.evidence_type in visual_types
def is_interaction_evidence(self) -> bool:
"""Vérifie si c'est une evidence d'interaction."""
interaction_types = {
VWBEvidenceType.CLICK_EVIDENCE,
VWBEvidenceType.TYPE_EVIDENCE,
VWBEvidenceType.WAIT_EVIDENCE
}
return self.evidence_type in interaction_types
def get_summary(self) -> Dict[str, Any]:
"""Retourne un résumé de l'evidence pour affichage."""
return {
'evidence_id': self.evidence_id,
'type': self.evidence_type.value,
'title': self.title,
'timestamp': self.timestamp.isoformat(),
'success': self.success,
'has_screenshot': self.has_screenshot(),
'has_highlight': self.has_highlight(),
'file_size_mb': round(self.get_file_size_mb(), 2),
'confidence_score': self.confidence_score
}
def __str__(self) -> str:
"""Représentation string de l'evidence."""
status = "" if self.success else ""
return f"{status} VWBEvidence({self.evidence_type.value}): {self.title}"
def __repr__(self) -> str:
"""Représentation détaillée de l'evidence."""
return (
f"VWBEvidence("
f"evidence_id='{self.evidence_id}', "
f"evidence_type={self.evidence_type.value}, "
f"action_id='{self.action_id}', "
f"success={self.success}, "
f"has_screenshot={self.has_screenshot()}"
f")"
)
def create_screenshot_evidence(
action_id: str,
step_id: str,
screenshot_base64: str,
evidence_type: VWBEvidenceType = VWBEvidenceType.SCREENSHOT_BEFORE,
title: str = "Capture d'écran",
**kwargs
) -> VWBEvidence:
"""
Fonction utilitaire pour créer rapidement une evidence de screenshot.
Args:
action_id: ID de l'action
step_id: ID de l'étape
screenshot_base64: Screenshot en base64
evidence_type: Type d'evidence
title: Titre de l'evidence
**kwargs: Paramètres additionnels
Returns:
Instance de VWBEvidence
"""
return VWBEvidence(
evidence_id=kwargs.get('evidence_id', ''),
evidence_type=evidence_type,
action_id=action_id,
step_id=step_id,
workflow_id=kwargs.get('workflow_id'),
timestamp=kwargs.get('timestamp', datetime.now()),
execution_time_ms=kwargs.get('execution_time_ms'),
title=title,
description=kwargs.get('description', title),
screenshot_base64=screenshot_base64,
screenshot_width=kwargs.get('screenshot_width'),
screenshot_height=kwargs.get('screenshot_height'),
highlight_box=kwargs.get('highlight_box'),
data=kwargs.get('data', {}),
success=kwargs.get('success', True),
confidence_score=kwargs.get('confidence_score'),
user_id=kwargs.get('user_id'),
session_id=kwargs.get('session_id'),
related_evidence_ids=kwargs.get('related_evidence_ids', []),
parent_evidence_id=kwargs.get('parent_evidence_id')
)
def create_interaction_evidence(
action_id: str,
step_id: str,
evidence_type: VWBEvidenceType,
title: str,
interaction_data: Dict[str, Any],
**kwargs
) -> VWBEvidence:
"""
Fonction utilitaire pour créer une evidence d'interaction.
Args:
action_id: ID de l'action
step_id: ID de l'étape
evidence_type: Type d'evidence d'interaction
title: Titre de l'evidence
interaction_data: Données de l'interaction
**kwargs: Paramètres additionnels
Returns:
Instance de VWBEvidence
"""
return VWBEvidence(
evidence_id=kwargs.get('evidence_id', ''),
evidence_type=evidence_type,
action_id=action_id,
step_id=step_id,
workflow_id=kwargs.get('workflow_id'),
timestamp=kwargs.get('timestamp', datetime.now()),
execution_time_ms=kwargs.get('execution_time_ms'),
title=title,
description=kwargs.get('description', title),
screenshot_base64=kwargs.get('screenshot_base64'),
screenshot_width=kwargs.get('screenshot_width'),
screenshot_height=kwargs.get('screenshot_height'),
highlight_box=kwargs.get('highlight_box'),
data=interaction_data,
success=kwargs.get('success', True),
confidence_score=kwargs.get('confidence_score'),
user_id=kwargs.get('user_id'),
session_id=kwargs.get('session_id'),
related_evidence_ids=kwargs.get('related_evidence_ids', []),
parent_evidence_id=kwargs.get('parent_evidence_id')
)

View File

@@ -0,0 +1,474 @@
"""
Contrat de Données VWB - Ancres Visuelles
Auteur : Dom, Alice, Kiro - 09 janvier 2026
Ce module définit les contrats pour les ancres visuelles utilisées dans les actions
VisionOnly du Visual Workflow Builder pour la sélection et l'identification
d'éléments UI.
Classes :
- VWBVisualAnchorType : Types d'ancres visuelles
- VWBVisualAnchor : Contrat principal pour les ancres visuelles
"""
from enum import Enum
from dataclasses import dataclass, asdict
from typing import Dict, Any, Optional, List, Tuple
from datetime import datetime
import json
import base64
import hashlib
class VWBVisualAnchorType(Enum):
"""Types d'ancres visuelles possibles dans le VWB."""
# Type générique (par défaut pour les captures VWB)
GENERIC = "generic" # Type par défaut, utilise template matching
# Ancres basées sur l'image
IMAGE_TEMPLATE = "image_template" # Template d'image exact
IMAGE_FUZZY = "image_fuzzy" # Template d'image avec tolérance
# Ancres basées sur le texte
TEXT_EXACT = "text_exact" # Texte exact
TEXT_PARTIAL = "text_partial" # Texte partiel
TEXT_REGEX = "text_regex" # Expression régulière
# Ancres basées sur la position
COORDINATES = "coordinates" # Coordonnées absolues
RELATIVE_POSITION = "relative_position" # Position relative à un autre élément
# Ancres basées sur les propriétés UI
UI_ELEMENT = "ui_element" # Élément UI spécifique
BUTTON = "button" # Bouton
INPUT_FIELD = "input_field" # Champ de saisie
DROPDOWN = "dropdown" # Liste déroulante
CHECKBOX = "checkbox" # Case à cocher
RADIO_BUTTON = "radio_button" # Bouton radio
# Ancres composites
MULTI_CRITERIA = "multi_criteria" # Plusieurs critères combinés
CONTEXTUAL = "contextual" # Basée sur le contexte
@dataclass
class VWBVisualAnchor:
"""
Contrat de données pour les ancres visuelles VWB.
Cette classe encapsule toutes les informations nécessaires pour
identifier et localiser des éléments UI dans les actions VisionOnly
du Visual Workflow Builder.
"""
# Identification de l'ancre
anchor_id: str
anchor_type: VWBVisualAnchorType
# Métadonnées de base
name: str
description: str
# Critères de recherche
search_criteria: Dict[str, Any]
# Contexte d'utilisation
created_by: str
created_at: datetime
# Paramètres optionnels avec valeurs par défaut
reference_image_base64: Optional[str] = None
reference_width: Optional[int] = None
reference_height: Optional[int] = None
bounding_box: Optional[Dict[str, int]] = None # {x, y, width, height}
confidence_threshold: float = 0.8
max_search_time_ms: int = 5000
retry_count: int = 3
visual_embedding: Optional[List[float]] = None
embedding_model: Optional[str] = None
last_used_at: Optional[datetime] = None
usage_count: int = 0
success_rate: float = 0.0
average_match_time_ms: float = 0.0
screen_resolution: Optional[Tuple[int, int]] = None
application_context: Optional[str] = None
is_active: bool = True
validation_hash: Optional[str] = None
def __post_init__(self):
"""Validation et initialisation post-création."""
if not self.anchor_id:
self.anchor_id = f"anchor_{self.name.lower().replace(' ', '_')}_{int(self.created_at.timestamp())}"
if not self.search_criteria:
self.search_criteria = {}
# Générer le hash de validation
self._update_validation_hash()
# Valider les dimensions de l'image de référence
if self.reference_image_base64:
if not self.reference_width or not self.reference_height:
self._extract_reference_dimensions()
def _extract_reference_dimensions(self):
"""Extrait les dimensions de l'image de référence."""
try:
if self.reference_image_base64:
from PIL import Image
import io
# Décoder le base64
image_data = base64.b64decode(self.reference_image_base64)
image = Image.open(io.BytesIO(image_data))
self.reference_width = image.width
self.reference_height = image.height
except Exception:
# En cas d'erreur, utiliser des valeurs par défaut
self.reference_width = 100
self.reference_height = 50
def _update_validation_hash(self):
"""Met à jour le hash de validation basé sur les critères principaux."""
hash_data = {
'anchor_type': self.anchor_type.value,
'search_criteria': self.search_criteria,
'bounding_box': self.bounding_box,
'confidence_threshold': self.confidence_threshold
}
hash_string = json.dumps(hash_data, sort_keys=True)
self.validation_hash = hashlib.md5(hash_string.encode()).hexdigest()
def to_dict(self) -> Dict[str, Any]:
"""Convertit l'ancre en dictionnaire pour sérialisation JSON."""
data = asdict(self)
# Convertir l'enum en string
data['anchor_type'] = self.anchor_type.value
# Convertir les timestamps en ISO strings
data['created_at'] = self.created_at.isoformat()
if self.last_used_at:
data['last_used_at'] = self.last_used_at.isoformat()
return data
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'VWBVisualAnchor':
"""Crée une instance depuis un dictionnaire.
Gère les données partielles du frontend en fournissant des valeurs par défaut
pour les champs obligatoires manquants.
"""
# Copier pour ne pas modifier l'original
data = data.copy()
# Convertir le string en enum (avec fallback sur GENERIC)
anchor_type_val = data.get('anchor_type', 'generic')
if isinstance(anchor_type_val, str):
try:
data['anchor_type'] = VWBVisualAnchorType(anchor_type_val)
except ValueError:
data['anchor_type'] = VWBVisualAnchorType.GENERIC
elif not isinstance(anchor_type_val, VWBVisualAnchorType):
data['anchor_type'] = VWBVisualAnchorType.GENERIC
# Fournir des valeurs par défaut pour les champs obligatoires manquants
if 'anchor_id' not in data:
data['anchor_id'] = f"anchor_{datetime.now().timestamp()}"
if 'name' not in data:
data['name'] = data.get('description', 'Ancre visuelle')[:50]
if 'description' not in data:
data['description'] = 'Ancre visuelle VWB'
if 'search_criteria' not in data:
data['search_criteria'] = {}
if 'created_by' not in data:
data['created_by'] = 'vwb_frontend'
if 'created_at' not in data:
data['created_at'] = datetime.now()
# Convertir les timestamps string en datetime
if isinstance(data.get('created_at'), str):
try:
data['created_at'] = datetime.fromisoformat(data['created_at'])
except ValueError:
data['created_at'] = datetime.now()
if data.get('last_used_at') and isinstance(data['last_used_at'], str):
try:
data['last_used_at'] = datetime.fromisoformat(data['last_used_at'])
except ValueError:
data['last_used_at'] = None
# Filtrer les clés non reconnues par le dataclass
valid_fields = {
'anchor_id', 'anchor_type', 'name', 'description', 'search_criteria',
'created_by', 'created_at', 'reference_image_base64', 'reference_width',
'reference_height', 'bounding_box', 'confidence_threshold', 'max_search_time_ms',
'retry_count', 'visual_embedding', 'embedding_model', 'last_used_at',
'usage_count', 'success_rate', 'average_match_time_ms', 'screen_resolution',
'application_context', 'is_active', 'validation_hash'
}
filtered_data = {k: v for k, v in data.items() if k in valid_fields}
return cls(**filtered_data)
def to_json(self) -> str:
"""Sérialise l'ancre en JSON."""
return json.dumps(self.to_dict(), ensure_ascii=False, indent=2)
@classmethod
def from_json(cls, json_str: str) -> 'VWBVisualAnchor':
"""Désérialise depuis JSON."""
data = json.loads(json_str)
return cls.from_dict(data)
def has_reference_image(self) -> bool:
"""Vérifie si l'ancre a une image de référence."""
return self.reference_image_base64 is not None and len(self.reference_image_base64) > 0
def has_bounding_box(self) -> bool:
"""Vérifie si l'ancre a une bounding box définie."""
return (
self.bounding_box is not None and
all(key in self.bounding_box for key in ['x', 'y', 'width', 'height'])
)
def has_visual_embedding(self) -> bool:
"""Vérifie si l'ancre a un embedding visuel."""
return self.visual_embedding is not None and len(self.visual_embedding) > 0
def get_reference_data_url(self) -> Optional[str]:
"""Retourne l'URL data de l'image de référence pour affichage web."""
if not self.has_reference_image():
return None
# Déterminer le format (PNG par défaut)
format_prefix = "data:image/png;base64,"
if self.reference_image_base64.startswith('/9j/'): # JPEG magic bytes en base64
format_prefix = "data:image/jpeg;base64,"
return f"{format_prefix}{self.reference_image_base64}"
def update_usage_stats(self, match_time_ms: float, success: bool):
"""Met à jour les statistiques d'utilisation."""
self.usage_count += 1
self.last_used_at = datetime.now()
# Mettre à jour le temps moyen de matching
if self.average_match_time_ms == 0:
self.average_match_time_ms = match_time_ms
else:
# Moyenne mobile
self.average_match_time_ms = (
(self.average_match_time_ms * (self.usage_count - 1) + match_time_ms) /
self.usage_count
)
# Mettre à jour le taux de succès
if success:
success_count = int(self.success_rate * (self.usage_count - 1)) + 1
else:
success_count = int(self.success_rate * (self.usage_count - 1))
self.success_rate = success_count / self.usage_count
def is_reliable(self) -> bool:
"""Détermine si l'ancre est fiable basé sur les statistiques."""
return (
self.usage_count >= 3 and
self.success_rate >= 0.8 and
self.average_match_time_ms <= self.max_search_time_ms
)
def needs_optimization(self) -> bool:
"""Détermine si l'ancre a besoin d'optimisation."""
return (
self.usage_count >= 5 and
(self.success_rate < 0.7 or self.average_match_time_ms > self.max_search_time_ms * 0.8)
)
def add_search_criterion(self, key: str, value: Any):
"""Ajoute un critère de recherche."""
self.search_criteria[key] = value
self._update_validation_hash()
def remove_search_criterion(self, key: str):
"""Supprime un critère de recherche."""
if key in self.search_criteria:
del self.search_criteria[key]
self._update_validation_hash()
def set_bounding_box(self, x: int, y: int, width: int, height: int):
"""Définit la bounding box."""
self.bounding_box = {
'x': max(0, x),
'y': max(0, y),
'width': max(1, width),
'height': max(1, height)
}
self._update_validation_hash()
def set_visual_embedding(self, embedding: List[float], model: str):
"""Définit l'embedding visuel."""
self.visual_embedding = embedding
self.embedding_model = model
def is_compatible_with_resolution(self, width: int, height: int) -> bool:
"""Vérifie si l'ancre est compatible avec une résolution donnée."""
if not self.screen_resolution:
return True # Pas de contrainte de résolution
# Tolérance de 10% sur les dimensions
tolerance = 0.1
ref_width, ref_height = self.screen_resolution
width_ok = abs(width - ref_width) / ref_width <= tolerance
height_ok = abs(height - ref_height) / ref_height <= tolerance
return width_ok and height_ok
def get_search_area(self, screen_width: int, screen_height: int) -> Optional[Dict[str, int]]:
"""Calcule la zone de recherche sur l'écran actuel."""
if not self.has_bounding_box():
return None
# Si pas de résolution de référence, utiliser les coordonnées telles quelles
if not self.screen_resolution:
return self.bounding_box.copy()
# Adapter les coordonnées à la résolution actuelle
ref_width, ref_height = self.screen_resolution
scale_x = screen_width / ref_width
scale_y = screen_height / ref_height
return {
'x': int(self.bounding_box['x'] * scale_x),
'y': int(self.bounding_box['y'] * scale_y),
'width': int(self.bounding_box['width'] * scale_x),
'height': int(self.bounding_box['height'] * scale_y)
}
def validate_integrity(self) -> bool:
"""Valide l'intégrité de l'ancre."""
current_hash = self.validation_hash
self._update_validation_hash()
return current_hash == self.validation_hash
def get_summary(self) -> Dict[str, Any]:
"""Retourne un résumé de l'ancre pour affichage."""
return {
'anchor_id': self.anchor_id,
'name': self.name,
'type': self.anchor_type.value,
'confidence_threshold': self.confidence_threshold,
'usage_count': self.usage_count,
'success_rate': round(self.success_rate, 2),
'average_match_time_ms': round(self.average_match_time_ms, 1),
'is_reliable': self.is_reliable(),
'needs_optimization': self.needs_optimization(),
'has_reference_image': self.has_reference_image(),
'has_embedding': self.has_visual_embedding(),
'is_active': self.is_active
}
def __str__(self) -> str:
"""Représentation string de l'ancre."""
status = "🟢" if self.is_reliable() else "🟡" if self.is_active else "🔴"
return f"{status} VWBVisualAnchor({self.anchor_type.value}): {self.name}"
def __repr__(self) -> str:
"""Représentation détaillée de l'ancre."""
return (
f"VWBVisualAnchor("
f"anchor_id='{self.anchor_id}', "
f"anchor_type={self.anchor_type.value}, "
f"name='{self.name}', "
f"success_rate={self.success_rate:.2f}, "
f"usage_count={self.usage_count}"
f")"
)
def create_image_anchor(
name: str,
reference_image_base64: str,
created_by: str,
bounding_box: Optional[Dict[str, int]] = None,
confidence_threshold: float = 0.8,
**kwargs
) -> VWBVisualAnchor:
"""
Fonction utilitaire pour créer une ancre basée sur une image.
Args:
name: Nom de l'ancre
reference_image_base64: Image de référence en base64
created_by: Créateur de l'ancre
bounding_box: Zone de capture
confidence_threshold: Seuil de confiance
**kwargs: Paramètres additionnels
Returns:
Instance de VWBVisualAnchor
"""
return VWBVisualAnchor(
anchor_id=kwargs.get('anchor_id', ''),
anchor_type=VWBVisualAnchorType.IMAGE_TEMPLATE,
name=name,
description=kwargs.get('description', f"Ancre image: {name}"),
reference_image_base64=reference_image_base64,
bounding_box=bounding_box,
search_criteria=kwargs.get('search_criteria', {}),
confidence_threshold=confidence_threshold,
max_search_time_ms=kwargs.get('max_search_time_ms', 5000),
retry_count=kwargs.get('retry_count', 3),
visual_embedding=kwargs.get('visual_embedding'),
embedding_model=kwargs.get('embedding_model'),
created_by=created_by,
created_at=kwargs.get('created_at', datetime.now()),
screen_resolution=kwargs.get('screen_resolution'),
application_context=kwargs.get('application_context')
)
def create_text_anchor(
name: str,
text_pattern: str,
created_by: str,
anchor_type: VWBVisualAnchorType = VWBVisualAnchorType.TEXT_EXACT,
**kwargs
) -> VWBVisualAnchor:
"""
Fonction utilitaire pour créer une ancre basée sur du texte.
Args:
name: Nom de l'ancre
text_pattern: Motif de texte à rechercher
created_by: Créateur de l'ancre
anchor_type: Type d'ancre texte
**kwargs: Paramètres additionnels
Returns:
Instance de VWBVisualAnchor
"""
search_criteria = {'text_pattern': text_pattern}
search_criteria.update(kwargs.get('search_criteria', {}))
return VWBVisualAnchor(
anchor_id=kwargs.get('anchor_id', ''),
anchor_type=anchor_type,
name=name,
description=kwargs.get('description', f"Ancre texte: {name}"),
search_criteria=search_criteria,
confidence_threshold=kwargs.get('confidence_threshold', 0.9),
max_search_time_ms=kwargs.get('max_search_time_ms', 3000),
retry_count=kwargs.get('retry_count', 2),
created_by=created_by,
created_at=kwargs.get('created_at', datetime.now()),
application_context=kwargs.get('application_context')
)

View File

@@ -0,0 +1,18 @@
"""
Database Module - RPA Vision v3
Source de vérité unique pour les workflows VWB
"""
from .models import db, Workflow, Step, VisualAnchor, Execution, ExecutionStep
from .models import init_db, get_db_session
__all__ = [
'db',
'Workflow',
'Step',
'VisualAnchor',
'Execution',
'ExecutionStep',
'init_db',
'get_db_session'
]

View File

@@ -0,0 +1,306 @@
"""
Modèles SQLAlchemy - RPA Vision v3
Source de vérité unique pour les workflows VWB
Auteur: Dom, Alice, Kiro - 23 janvier 2026
"""
from flask_sqlalchemy import SQLAlchemy
from datetime import datetime
from typing import Dict, Any, List, Optional
import json
# Instance SQLAlchemy partagée
db = SQLAlchemy()
class Workflow(db.Model):
"""Workflow VWB - Conteneur d'étapes ordonnées"""
__tablename__ = 'workflows'
id = db.Column(db.String(64), primary_key=True)
name = db.Column(db.String(255), nullable=False)
description = db.Column(db.Text, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
is_active = db.Column(db.Boolean, default=True)
# Relations
steps = db.relationship('Step', backref='workflow', lazy='dynamic',
order_by='Step.order', cascade='all, delete-orphan')
executions = db.relationship('Execution', backref='workflow', lazy='dynamic',
cascade='all, delete-orphan')
def to_dict(self) -> Dict[str, Any]:
"""Sérialise le workflow complet"""
return {
'id': self.id,
'name': self.name,
'description': self.description,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None,
'steps': [step.to_dict() for step in self.steps.order_by(Step.order).all()],
'step_count': self.steps.count()
}
def __repr__(self):
return f'<Workflow {self.id}: {self.name}>'
class Step(db.Model):
"""Étape d'un workflow - Action VWB avec type et paramètres"""
__tablename__ = 'steps'
id = db.Column(db.String(64), primary_key=True)
workflow_id = db.Column(db.String(64), db.ForeignKey('workflows.id'), nullable=False)
# Type d'action - SOURCE DE VÉRITÉ UNIQUE
action_type = db.Column(db.String(64), nullable=False)
# Ordre dans le workflow (0-indexed)
order = db.Column(db.Integer, nullable=False, default=0)
# Position sur le canvas (pour l'affichage)
position_x = db.Column(db.Float, default=0)
position_y = db.Column(db.Float, default=0)
# Paramètres de l'action (JSON)
parameters_json = db.Column(db.Text, default='{}')
# Référence vers l'ancre visuelle (si applicable)
anchor_id = db.Column(db.String(64), db.ForeignKey('visual_anchors.id'), nullable=True)
# Métadonnées
label = db.Column(db.String(255), nullable=True)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relations
anchor = db.relationship('VisualAnchor', backref='steps')
@property
def parameters(self) -> Dict[str, Any]:
"""Retourne les paramètres comme dict"""
try:
return json.loads(self.parameters_json) if self.parameters_json else {}
except json.JSONDecodeError:
return {}
@parameters.setter
def parameters(self, value: Dict[str, Any]):
"""Stocke les paramètres comme JSON"""
self.parameters_json = json.dumps(value) if value else '{}'
def to_dict(self) -> Dict[str, Any]:
"""Sérialise l'étape"""
return {
'id': self.id,
'workflow_id': self.workflow_id,
'action_type': self.action_type, # SOURCE DE VÉRITÉ
'order': self.order,
'position': {'x': self.position_x, 'y': self.position_y},
'parameters': self.parameters,
'anchor_id': self.anchor_id,
'anchor': self.anchor.to_dict() if self.anchor else None,
'label': self.label or self.action_type,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}
def __repr__(self):
return f'<Step {self.id}: {self.action_type} (order={self.order})>'
class VisualAnchor(db.Model):
"""Ancre visuelle - Image de référence pour localiser un élément UI"""
__tablename__ = 'visual_anchors'
id = db.Column(db.String(64), primary_key=True)
# Image de référence (chemin fichier, pas base64 en DB)
image_path = db.Column(db.String(512), nullable=True)
thumbnail_path = db.Column(db.String(512), nullable=True)
# Bounding box de la sélection
bbox_x = db.Column(db.Float, nullable=True)
bbox_y = db.Column(db.Float, nullable=True)
bbox_width = db.Column(db.Float, nullable=True)
bbox_height = db.Column(db.Float, nullable=True)
# Résolution de l'écran lors de la capture
screen_width = db.Column(db.Integer, nullable=True)
screen_height = db.Column(db.Integer, nullable=True)
# Description pour l'utilisateur
description = db.Column(db.Text, nullable=True)
# Seuil de confiance pour la détection
confidence_threshold = db.Column(db.Float, default=0.8)
# Métadonnées
created_at = db.Column(db.DateTime, default=datetime.utcnow)
capture_method = db.Column(db.String(64), default='screen_capture')
def to_dict(self) -> Dict[str, Any]:
"""Sérialise l'ancre visuelle"""
return {
'id': self.id,
'image_url': f'/api/v3/anchor/{self.id}/image' if self.image_path else None,
'thumbnail_url': f'/api/v3/anchor/{self.id}/thumbnail' if self.thumbnail_path else None,
'bounding_box': {
'x': self.bbox_x,
'y': self.bbox_y,
'width': self.bbox_width,
'height': self.bbox_height
} if self.bbox_x is not None else None,
'screen_resolution': {
'width': self.screen_width,
'height': self.screen_height
} if self.screen_width else None,
'description': self.description,
'confidence_threshold': self.confidence_threshold,
'created_at': self.created_at.isoformat() if self.created_at else None
}
def __repr__(self):
return f'<VisualAnchor {self.id}>'
class Execution(db.Model):
"""Exécution d'un workflow - Historique et état"""
__tablename__ = 'executions'
id = db.Column(db.String(64), primary_key=True)
workflow_id = db.Column(db.String(64), db.ForeignKey('workflows.id'), nullable=False)
# État de l'exécution
status = db.Column(db.String(32), default='pending') # pending, running, paused, completed, error, cancelled
# Timestamps
started_at = db.Column(db.DateTime, nullable=True)
ended_at = db.Column(db.DateTime, nullable=True)
# Progression
current_step_index = db.Column(db.Integer, default=0)
total_steps = db.Column(db.Integer, default=0)
# Résumé
completed_steps = db.Column(db.Integer, default=0)
failed_steps = db.Column(db.Integer, default=0)
# Erreur globale (si applicable)
error_message = db.Column(db.Text, nullable=True)
# Relations
step_results = db.relationship('ExecutionStep', backref='execution', lazy='dynamic',
cascade='all, delete-orphan')
def to_dict(self) -> Dict[str, Any]:
"""Sérialise l'exécution"""
return {
'id': self.id,
'workflow_id': self.workflow_id,
'status': self.status,
'started_at': self.started_at.isoformat() if self.started_at else None,
'ended_at': self.ended_at.isoformat() if self.ended_at else None,
'current_step_index': self.current_step_index,
'total_steps': self.total_steps,
'completed_steps': self.completed_steps,
'failed_steps': self.failed_steps,
'error_message': self.error_message,
'progress': (self.current_step_index / self.total_steps * 100) if self.total_steps > 0 else 0,
'step_results': [sr.to_dict() for sr in self.step_results.all()]
}
def __repr__(self):
return f'<Execution {self.id}: {self.status}>'
class ExecutionStep(db.Model):
"""Résultat d'exécution d'une étape"""
__tablename__ = 'execution_steps'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
execution_id = db.Column(db.String(64), db.ForeignKey('executions.id'), nullable=False)
step_id = db.Column(db.String(64), nullable=False)
# Résultat
status = db.Column(db.String(32), default='pending') # pending, running, success, error, skipped
# Timestamps
started_at = db.Column(db.DateTime, nullable=True)
ended_at = db.Column(db.DateTime, nullable=True)
duration_ms = db.Column(db.Integer, nullable=True)
# Erreur (si applicable)
error_message = db.Column(db.Text, nullable=True)
# Evidence (chemin vers screenshot)
evidence_path = db.Column(db.String(512), nullable=True)
# Output data (JSON)
output_json = db.Column(db.Text, default='{}')
@property
def output(self) -> Dict[str, Any]:
try:
return json.loads(self.output_json) if self.output_json else {}
except json.JSONDecodeError:
return {}
@output.setter
def output(self, value: Dict[str, Any]):
self.output_json = json.dumps(value) if value else '{}'
def to_dict(self) -> Dict[str, Any]:
return {
'step_id': self.step_id,
'status': self.status,
'started_at': self.started_at.isoformat() if self.started_at else None,
'ended_at': self.ended_at.isoformat() if self.ended_at else None,
'duration_ms': self.duration_ms,
'error_message': self.error_message,
'evidence_url': f'/api/v3/evidence/{self.id}' if self.evidence_path else None,
'output': self.output
}
# Session active (en mémoire, pas en DB)
class SessionState:
"""État de la session utilisateur (en mémoire)"""
def __init__(self):
self.active_workflow_id: Optional[str] = None
self.selected_step_id: Optional[str] = None
self.active_execution_id: Optional[str] = None
self.last_capture: Optional[Dict[str, Any]] = None
def to_dict(self) -> Dict[str, Any]:
return {
'active_workflow_id': self.active_workflow_id,
'selected_step_id': self.selected_step_id,
'active_execution_id': self.active_execution_id,
'has_capture': self.last_capture is not None
}
# Instance globale de session
_session_state = SessionState()
def get_session_state() -> SessionState:
"""Retourne l'état de session global"""
return _session_state
def init_db(app):
"""Initialise la base de données avec l'application Flask"""
db.init_app(app)
with app.app_context():
db.create_all()
print("✅ [DB] Base de données SQLite initialisée")
def get_db_session():
"""Retourne la session DB actuelle"""
return db.session