- Frontend v4 accessible sur réseau local (192.168.1.40) - Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard) - Ollama GPU fonctionnel - Self-healing interactif - Dashboard confiance Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
731 lines
29 KiB
Python
731 lines
29 KiB
Python
"""
|
|
Visual to WorkflowGraph Converter - Visual Workflow Builder
|
|
|
|
Convertit les workflows visuels en WorkflowGraph exécutables.
|
|
|
|
Exigences: 6.1, 18.1
|
|
"""
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
from datetime import datetime
|
|
|
|
# Ajouter le chemin racine pour importer les modules core
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
|
|
|
|
from core.models.workflow_graph import (
|
|
Workflow,
|
|
WorkflowNode,
|
|
WorkflowEdge,
|
|
Action,
|
|
TargetSpec,
|
|
ScreenTemplate,
|
|
WindowConstraint,
|
|
TextConstraint,
|
|
UIConstraint,
|
|
EmbeddingPrototype,
|
|
EdgeConstraints,
|
|
PostConditions,
|
|
EdgeStats,
|
|
SafetyRules,
|
|
WorkflowStats,
|
|
LearningConfig
|
|
)
|
|
|
|
from models.visual_workflow import (
|
|
VisualWorkflow,
|
|
VisualNode,
|
|
VisualEdge
|
|
)
|
|
|
|
try:
|
|
from .self_healing_converter import get_self_healing_converter
|
|
SELF_HEALING_AVAILABLE = True
|
|
except ImportError:
|
|
SELF_HEALING_AVAILABLE = False
|
|
def get_self_healing_converter():
|
|
return None
|
|
|
|
|
|
class ConversionError(Exception):
|
|
"""Erreur lors de la conversion"""
|
|
pass
|
|
|
|
|
|
class VisualToGraphConverter:
|
|
"""
|
|
Convertisseur de workflows visuels en WorkflowGraph.
|
|
|
|
Exigences: 6.1, 18.1
|
|
"""
|
|
|
|
# Mapping des types de nodes visuels vers les types d'actions
|
|
NODE_TYPE_TO_ACTION = {
|
|
'click': 'mouse_click',
|
|
'type': 'text_input',
|
|
'wait': 'wait',
|
|
'navigate': 'navigate',
|
|
'extract': 'extract_data',
|
|
'variable': 'set_variable',
|
|
'condition': 'evaluate_condition',
|
|
'loop': 'execute_loop',
|
|
'validate': 'key_press',
|
|
'scroll': 'scroll',
|
|
'screenshot': 'screenshot',
|
|
'transform': 'transform_data',
|
|
'api': 'api_call',
|
|
'database': 'database_query',
|
|
'start': 'workflow_start',
|
|
'end': 'workflow_end'
|
|
}
|
|
|
|
# Types de nodes de logique
|
|
LOGIC_NODE_TYPES = {'condition', 'loop'}
|
|
|
|
def __init__(self):
|
|
"""Initialise le convertisseur"""
|
|
self.errors: List[str] = []
|
|
self.warnings: List[str] = []
|
|
|
|
def convert(self, visual_workflow: VisualWorkflow) -> Workflow:
|
|
"""
|
|
Convertit un VisualWorkflow en Workflow exécutable.
|
|
|
|
Args:
|
|
visual_workflow: Le workflow visuel à convertir
|
|
|
|
Returns:
|
|
Workflow exécutable
|
|
|
|
Raises:
|
|
ConversionError: Si la conversion échoue
|
|
"""
|
|
self.errors = []
|
|
self.warnings = []
|
|
|
|
# Valider la structure avant conversion
|
|
validation_errors = visual_workflow.validate()
|
|
if validation_errors:
|
|
raise ConversionError(f"Workflow invalide: {', '.join(validation_errors)}")
|
|
|
|
# Vérifier qu'il y a au moins un node
|
|
if not visual_workflow.nodes:
|
|
raise ConversionError("Le workflow ne contient aucun node")
|
|
|
|
# Convertir les nodes
|
|
workflow_nodes = self._convert_nodes(visual_workflow)
|
|
|
|
# Convertir les edges
|
|
workflow_edges = self._convert_edges(visual_workflow, workflow_nodes)
|
|
|
|
# Déterminer les nodes d'entrée et de sortie
|
|
entry_nodes, end_nodes = self._determine_entry_exit_nodes(
|
|
visual_workflow, workflow_nodes, workflow_edges
|
|
)
|
|
|
|
# Détecter et configurer les boucles et conditions
|
|
loops, conditionals = self._detect_logic_structures(
|
|
visual_workflow, workflow_nodes, workflow_edges
|
|
)
|
|
|
|
# Créer le workflow
|
|
workflow = Workflow(
|
|
workflow_id=visual_workflow.id,
|
|
name=visual_workflow.name,
|
|
description=visual_workflow.description or "",
|
|
version=int(visual_workflow.version.split('.')[0]), # "1.0.0" -> 1
|
|
learning_state="OBSERVATION",
|
|
created_at=visual_workflow.created_at,
|
|
updated_at=visual_workflow.updated_at,
|
|
entry_nodes=entry_nodes,
|
|
end_nodes=end_nodes,
|
|
nodes=workflow_nodes,
|
|
edges=workflow_edges,
|
|
safety_rules=self._create_safety_rules(visual_workflow),
|
|
stats=WorkflowStats(),
|
|
learning=LearningConfig(),
|
|
loops=loops,
|
|
conditionals=conditionals,
|
|
metadata={
|
|
'created_by': visual_workflow.created_by,
|
|
'tags': visual_workflow.tags,
|
|
'category': visual_workflow.category,
|
|
'is_template': visual_workflow.is_template,
|
|
'source': 'visual_workflow_builder'
|
|
}
|
|
)
|
|
|
|
# Intégrer les paramètres Self-Healing au niveau workflow
|
|
if SELF_HEALING_AVAILABLE:
|
|
self_healing_converter = get_self_healing_converter()
|
|
if self_healing_converter:
|
|
workflow = self_healing_converter.convert_workflow_settings(visual_workflow, workflow)
|
|
|
|
return workflow
|
|
|
|
def _detect_logic_structures(
|
|
self,
|
|
visual_workflow: VisualWorkflow,
|
|
workflow_nodes: List[WorkflowNode],
|
|
workflow_edges: List[WorkflowEdge]
|
|
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
|
|
"""
|
|
Détecte et configure les structures de logique (boucles et conditions).
|
|
|
|
Exigences: 8.1, 8.2, 9.1, 9.2
|
|
"""
|
|
loops = {}
|
|
conditionals = {}
|
|
|
|
for node in workflow_nodes:
|
|
visual_type = node.metadata.get('visual_type')
|
|
parameters = node.metadata.get('parameters', {})
|
|
|
|
if visual_type == 'condition':
|
|
# Configuration d'un node conditionnel (Exigences 8.1, 8.2)
|
|
conditionals[node.node_id] = {
|
|
'expression': parameters.get('expression', ''),
|
|
'true_branch': self._find_branch_target(node.node_id, 'true', workflow_edges),
|
|
'false_branch': self._find_branch_target(node.node_id, 'false', workflow_edges),
|
|
'metadata': {
|
|
'visual_position': node.metadata.get('visual_position'),
|
|
'condition_type': parameters.get('type', 'expression')
|
|
}
|
|
}
|
|
|
|
elif visual_type == 'loop':
|
|
# Configuration d'un node de boucle (Exigences 9.1, 9.2)
|
|
loop_type = parameters.get('type', 'repeat')
|
|
loop_config = {
|
|
'loop_type': loop_type,
|
|
'body_nodes': self._find_loop_body(node.node_id, workflow_edges),
|
|
'exit_node': self._find_loop_exit(node.node_id, workflow_edges),
|
|
'metadata': {
|
|
'visual_position': node.metadata.get('visual_position')
|
|
}
|
|
}
|
|
|
|
# Ajouter les paramètres spécifiques au type de boucle
|
|
if loop_type == 'repeat':
|
|
loop_config['count'] = parameters.get('count', 1)
|
|
elif loop_type == 'while':
|
|
loop_config['condition'] = parameters.get('condition', '')
|
|
loop_config['max_iterations'] = parameters.get('max_iterations', 100)
|
|
elif loop_type == 'for-each':
|
|
loop_config['collection'] = parameters.get('collection', '')
|
|
loop_config['item_variable'] = parameters.get('item_variable', 'item')
|
|
|
|
loops[node.node_id] = loop_config
|
|
|
|
return loops, conditionals
|
|
|
|
def _find_branch_target(
|
|
self,
|
|
node_id: str,
|
|
branch_type: str,
|
|
edges: List[WorkflowEdge]
|
|
) -> Optional[str]:
|
|
"""Trouve le node cible d'une branche de condition"""
|
|
for edge in edges:
|
|
if edge.from_node == node_id:
|
|
# Vérifier le port source ou les métadonnées
|
|
source_port = edge.metadata.get('source_port', '')
|
|
if branch_type in source_port.lower():
|
|
return edge.to_node
|
|
|
|
# Vérifier les pre-conditions
|
|
if 'condition_result' in edge.constraints.pre_conditions:
|
|
expected_result = branch_type == 'true'
|
|
if edge.constraints.pre_conditions['condition_result'] == expected_result:
|
|
return edge.to_node
|
|
|
|
return None
|
|
|
|
def _find_loop_body(
|
|
self,
|
|
loop_node_id: str,
|
|
edges: List[WorkflowEdge]
|
|
) -> List[str]:
|
|
"""Trouve les nodes du corps de la boucle"""
|
|
body_nodes = []
|
|
|
|
# Trouver le premier node du corps (edge avec port 'body' ou 'loop')
|
|
for edge in edges:
|
|
if edge.from_node == loop_node_id:
|
|
source_port = edge.metadata.get('source_port', '')
|
|
if 'body' in source_port.lower() or 'loop' in source_port.lower():
|
|
body_nodes.append(edge.to_node)
|
|
# TODO: Suivre le graphe pour trouver tous les nodes du corps
|
|
break
|
|
|
|
return body_nodes
|
|
|
|
def _find_loop_exit(
|
|
self,
|
|
loop_node_id: str,
|
|
edges: List[WorkflowEdge]
|
|
) -> Optional[str]:
|
|
"""Trouve le node de sortie de la boucle"""
|
|
for edge in edges:
|
|
if edge.from_node == loop_node_id:
|
|
source_port = edge.metadata.get('source_port', '')
|
|
# Chercher le port de sortie (exit, out_exit, etc.)
|
|
if 'exit' in source_port.lower():
|
|
return edge.to_node
|
|
# Si pas de port body, c'est probablement la sortie
|
|
if 'body' not in source_port.lower() and 'loop' not in source_port.lower():
|
|
# Vérifier si ce n'est pas déjà le corps
|
|
body_targets = self._find_loop_body(loop_node_id, edges)
|
|
if edge.to_node not in body_targets:
|
|
return edge.to_node
|
|
|
|
return None
|
|
|
|
def _convert_nodes(self, visual_workflow: VisualWorkflow) -> List[WorkflowNode]:
|
|
"""Convertit les nodes visuels en WorkflowNodes"""
|
|
workflow_nodes = []
|
|
|
|
for vnode in visual_workflow.nodes:
|
|
try:
|
|
wnode = self._convert_node(vnode)
|
|
workflow_nodes.append(wnode)
|
|
except Exception as e:
|
|
self.errors.append(f"Erreur conversion node {vnode.id}: {str(e)}")
|
|
|
|
if self.errors:
|
|
raise ConversionError(f"Erreurs lors de la conversion des nodes: {', '.join(self.errors)}")
|
|
|
|
return workflow_nodes
|
|
|
|
def _convert_node(self, vnode: VisualNode) -> WorkflowNode:
|
|
"""Convertit un VisualNode en WorkflowNode"""
|
|
|
|
# Créer un template d'écran basique
|
|
# Dans une vraie implémentation, on utiliserait les embeddings et contraintes
|
|
template = ScreenTemplate(
|
|
window=WindowConstraint(),
|
|
text=TextConstraint(),
|
|
ui=UIConstraint(),
|
|
embedding=EmbeddingPrototype(
|
|
provider="visual_workflow_builder",
|
|
vector_id=f"node_{vnode.id}",
|
|
min_cosine_similarity=0.85,
|
|
sample_count=0
|
|
)
|
|
)
|
|
|
|
# Créer le WorkflowNode
|
|
wnode = WorkflowNode(
|
|
node_id=vnode.id,
|
|
name=vnode.label or vnode.type,
|
|
description=vnode.description or f"Node de type {vnode.type}",
|
|
template=template,
|
|
is_entry=False, # Sera déterminé plus tard
|
|
is_end=False, # Sera déterminé plus tard
|
|
metadata={
|
|
'visual_type': vnode.type,
|
|
'visual_position': vnode.position.to_dict(),
|
|
'visual_size': vnode.size.to_dict(),
|
|
'parameters': vnode.parameters,
|
|
'color': vnode.color
|
|
}
|
|
)
|
|
|
|
# Intégrer la configuration Self-Healing
|
|
if SELF_HEALING_AVAILABLE:
|
|
self_healing_converter = get_self_healing_converter()
|
|
if self_healing_converter:
|
|
wnode = self_healing_converter.convert_node_config(vnode, wnode)
|
|
|
|
return wnode
|
|
|
|
def _convert_edges(
|
|
self,
|
|
visual_workflow: VisualWorkflow,
|
|
workflow_nodes: List[WorkflowNode]
|
|
) -> List[WorkflowEdge]:
|
|
"""Convertit les edges visuels en WorkflowEdges"""
|
|
workflow_edges = []
|
|
|
|
# Créer un mapping node_id -> node pour validation
|
|
node_map = {node.node_id: node for node in workflow_nodes}
|
|
|
|
for vedge in visual_workflow.edges:
|
|
try:
|
|
# Vérifier que les nodes source et target existent
|
|
if vedge.source not in node_map:
|
|
raise ConversionError(f"Node source {vedge.source} introuvable")
|
|
if vedge.target not in node_map:
|
|
raise ConversionError(f"Node target {vedge.target} introuvable")
|
|
|
|
source_node = node_map[vedge.source]
|
|
target_node = node_map[vedge.target]
|
|
|
|
# Créer l'action basée sur le type du node source
|
|
action = self._create_action_from_node(source_node, visual_workflow)
|
|
|
|
# Créer les contraintes (avec gestion des conditions)
|
|
constraints = self._create_edge_constraints(vedge, source_node)
|
|
|
|
# Créer les post-conditions
|
|
post_conditions = PostConditions(
|
|
expected_node=target_node.node_id,
|
|
timeout_ms=3000
|
|
)
|
|
|
|
# Créer le WorkflowEdge
|
|
wedge = WorkflowEdge(
|
|
edge_id=vedge.id,
|
|
from_node=vedge.source,
|
|
to_node=vedge.target,
|
|
action=action,
|
|
constraints=constraints,
|
|
post_conditions=post_conditions,
|
|
stats=EdgeStats(),
|
|
metadata={
|
|
'visual_condition': vedge.condition.to_dict() if vedge.condition else None,
|
|
'visual_style': vedge.style.to_dict() if vedge.style else None,
|
|
'source_port': vedge.source_port,
|
|
'target_port': vedge.target_port
|
|
}
|
|
)
|
|
|
|
workflow_edges.append(wedge)
|
|
|
|
except Exception as e:
|
|
self.errors.append(f"Erreur conversion edge {vedge.id}: {str(e)}")
|
|
|
|
if self.errors:
|
|
raise ConversionError(f"Erreurs lors de la conversion des edges: {', '.join(self.errors)}")
|
|
|
|
return workflow_edges
|
|
|
|
def _create_edge_constraints(
|
|
self,
|
|
vedge: VisualEdge,
|
|
source_node: WorkflowNode
|
|
) -> EdgeConstraints:
|
|
"""Crée les contraintes d'edge avec support des conditions"""
|
|
|
|
constraints = EdgeConstraints(
|
|
required_confidence=0.8,
|
|
max_wait_time_ms=5000
|
|
)
|
|
|
|
# Si le node source est une condition, ajouter la condition à l'edge
|
|
visual_type = source_node.metadata.get('visual_type')
|
|
if visual_type == 'condition':
|
|
# Déterminer si c'est la branche true ou false basé sur le port
|
|
source_port = vedge.source_port
|
|
if 'true' in source_port.lower() or source_port == 'out_true':
|
|
constraints.pre_conditions['condition_result'] = True
|
|
elif 'false' in source_port.lower() or source_port == 'out_false':
|
|
constraints.pre_conditions['condition_result'] = False
|
|
|
|
# Si l'edge a une condition explicite, l'ajouter
|
|
if vedge.condition:
|
|
if vedge.condition.type == 'expression' and vedge.condition.expression:
|
|
constraints.pre_conditions['expression'] = vedge.condition.expression
|
|
elif vedge.condition.type in ['success', 'failure']:
|
|
constraints.pre_conditions['execution_status'] = vedge.condition.type
|
|
|
|
return constraints
|
|
|
|
def _create_action_from_node(
|
|
self,
|
|
node: WorkflowNode,
|
|
visual_workflow: VisualWorkflow
|
|
) -> Action:
|
|
"""Crée une Action basée sur le type et les paramètres du node"""
|
|
|
|
visual_type = node.metadata.get('visual_type', 'unknown')
|
|
parameters = node.metadata.get('parameters', {})
|
|
|
|
# Déterminer le type d'action
|
|
action_type = self.NODE_TYPE_TO_ACTION.get(visual_type, 'mouse_click')
|
|
|
|
# Créer le TargetSpec
|
|
target_spec = self._create_target_spec(visual_type, parameters)
|
|
|
|
# Créer les paramètres d'action
|
|
action_params = self._create_action_parameters(visual_type, parameters, visual_workflow)
|
|
|
|
return Action(
|
|
type=action_type,
|
|
target=target_spec,
|
|
parameters=action_params
|
|
)
|
|
|
|
def _create_target_spec(self, node_type: str, parameters: Dict[str, Any]) -> TargetSpec:
|
|
"""Crée un TargetSpec basé sur les paramètres du node"""
|
|
|
|
# Extraire les informations de cible
|
|
target_info = parameters.get('target', {})
|
|
|
|
# Si target est une string, c'est un sélecteur simple
|
|
if isinstance(target_info, str):
|
|
return TargetSpec(
|
|
by_text=target_info,
|
|
selection_policy="first"
|
|
)
|
|
|
|
# Si target est un dict, extraire les détails
|
|
if isinstance(target_info, dict):
|
|
return TargetSpec(
|
|
by_role=target_info.get('role'),
|
|
by_text=target_info.get('text'),
|
|
by_position=tuple(target_info['position']) if 'position' in target_info else None,
|
|
selection_policy=target_info.get('selection_policy', 'first')
|
|
)
|
|
|
|
# Par défaut, créer un target générique
|
|
return TargetSpec(
|
|
by_role="button",
|
|
selection_policy="first"
|
|
)
|
|
|
|
def _create_action_parameters(
|
|
self,
|
|
node_type: str,
|
|
parameters: Dict[str, Any],
|
|
visual_workflow: VisualWorkflow
|
|
) -> Dict[str, Any]:
|
|
"""Crée les paramètres d'action avec substitution de variables"""
|
|
|
|
action_params = {}
|
|
|
|
if node_type == 'click':
|
|
# Pour les actions de clic
|
|
action_params['click_type'] = parameters.get('click_type', 'left')
|
|
action_params['timeout_ms'] = parameters.get('timeout', 5000)
|
|
action_params['retries'] = parameters.get('retries', 3)
|
|
action_params['wait_after_ms'] = parameters.get('wait_after', 500)
|
|
|
|
elif node_type == 'type':
|
|
# Pour les actions de saisie de texte
|
|
text = parameters.get('text', '')
|
|
text = self._substitute_variables(text, visual_workflow)
|
|
action_params['text'] = text
|
|
action_params['clear_first'] = parameters.get('clear_first', False)
|
|
action_params['typing_speed'] = parameters.get('typing_speed', 'normal')
|
|
action_params['press_enter'] = parameters.get('press_enter', False)
|
|
|
|
elif node_type == 'wait':
|
|
# Pour les actions d'attente
|
|
duration = parameters.get('duration', 1000)
|
|
action_params['duration_ms'] = int(duration)
|
|
action_params['wait_type'] = parameters.get('wait_type', 'fixed')
|
|
|
|
elif node_type == 'navigate':
|
|
# Pour la navigation
|
|
url = parameters.get('url', '')
|
|
url = self._substitute_variables(url, visual_workflow)
|
|
action_params['url'] = url
|
|
action_params['wait_for_load'] = parameters.get('wait_for_load', True)
|
|
action_params['timeout_ms'] = parameters.get('timeout', 10000)
|
|
|
|
elif node_type == 'validate':
|
|
# Pour la validation (touche Entrée)
|
|
action_params['key'] = 'Return'
|
|
action_params['validation_type'] = parameters.get('validation_type', 'exists')
|
|
action_params['expected_text'] = parameters.get('expected_text', '')
|
|
|
|
elif node_type == 'scroll':
|
|
# Pour le défilement
|
|
action_params['direction'] = parameters.get('direction', 'down')
|
|
action_params['amount'] = parameters.get('amount', 3)
|
|
|
|
elif node_type == 'screenshot':
|
|
# Pour les captures d'écran
|
|
action_params['filename'] = parameters.get('filename', '')
|
|
action_params['full_screen'] = not parameters.get('region')
|
|
|
|
elif node_type == 'extract':
|
|
# Pour l'extraction de données
|
|
variable_name = parameters.get('variable', '')
|
|
action_params['variable_name'] = variable_name
|
|
action_params['extraction_type'] = parameters.get('extraction_type', 'text')
|
|
action_params['attribute_name'] = parameters.get('attribute_name', '')
|
|
|
|
elif node_type == 'variable':
|
|
# Pour la définition de variables
|
|
var_name = parameters.get('name', '')
|
|
var_value = parameters.get('value', '')
|
|
var_value = self._substitute_variables(str(var_value), visual_workflow)
|
|
action_params['variable_name'] = var_name
|
|
action_params['variable_value'] = var_value
|
|
action_params['variable_type'] = parameters.get('variable_type', 'string')
|
|
|
|
elif node_type == 'transform':
|
|
# Pour la transformation de données
|
|
action_params['transformation_type'] = parameters.get('transformation_type', 'format')
|
|
action_params['input_variable'] = parameters.get('input_variable', '')
|
|
action_params['output_variable'] = parameters.get('output_variable', '')
|
|
action_params['transformation_rule'] = parameters.get('transformation_rule', '')
|
|
|
|
elif node_type == 'api':
|
|
# Pour les appels API
|
|
action_params['method'] = parameters.get('method', 'GET')
|
|
action_params['url'] = self._substitute_variables(parameters.get('url', ''), visual_workflow)
|
|
action_params['headers'] = parameters.get('headers', {})
|
|
action_params['body'] = parameters.get('body', '')
|
|
action_params['response_variable'] = parameters.get('response_variable', '')
|
|
|
|
elif node_type == 'database':
|
|
# Pour les requêtes base de données
|
|
action_params['connection_string'] = parameters.get('connection_string', '')
|
|
action_params['query'] = self._substitute_variables(parameters.get('query', ''), visual_workflow)
|
|
action_params['result_variable'] = parameters.get('result_variable', '')
|
|
|
|
elif node_type == 'condition':
|
|
# Pour les conditions (Exigences 8.1, 8.2, 8.5)
|
|
expression = parameters.get('expression', '')
|
|
expression = self._substitute_variables(expression, visual_workflow)
|
|
action_params['expression'] = expression
|
|
action_params['condition_type'] = parameters.get('condition_type', 'expression')
|
|
|
|
# Valider la syntaxe de l'expression (Exigence 8.5)
|
|
validation_result = self._validate_expression(expression)
|
|
if not validation_result['valid']:
|
|
self.warnings.append(
|
|
f"Expression de condition potentiellement invalide: {expression} - {validation_result['message']}"
|
|
)
|
|
|
|
elif node_type == 'loop':
|
|
# Pour les boucles (Exigences 9.1, 9.2, 9.5)
|
|
loop_type = parameters.get('type', 'repeat') # for-each, while, repeat
|
|
action_params['loop_type'] = loop_type
|
|
|
|
if loop_type == 'repeat':
|
|
# Boucle avec nombre d'itérations fixe
|
|
count = parameters.get('count', 1)
|
|
action_params['count'] = int(count)
|
|
|
|
elif loop_type == 'while':
|
|
# Boucle avec condition
|
|
condition = parameters.get('condition', '')
|
|
condition = self._substitute_variables(condition, visual_workflow)
|
|
action_params['condition'] = condition
|
|
action_params['max_iterations'] = parameters.get('max_iterations', 100)
|
|
|
|
elif loop_type == 'for-each':
|
|
# Boucle sur une collection
|
|
collection = parameters.get('collection', '')
|
|
collection = self._substitute_variables(collection, visual_workflow)
|
|
action_params['collection'] = collection
|
|
action_params['item_variable'] = parameters.get('item_variable', 'item')
|
|
|
|
action_params['max_iterations'] = parameters.get('max_iterations', 100)
|
|
|
|
return action_params
|
|
|
|
def _validate_expression(self, expression: str) -> Dict[str, Any]:
|
|
"""
|
|
Valide la syntaxe d'une expression de condition.
|
|
|
|
Exigence: 8.5
|
|
"""
|
|
# Validation basique - dans une vraie implémentation, on utiliserait un parser
|
|
if not expression or not expression.strip():
|
|
return {'valid': False, 'message': 'Expression vide'}
|
|
|
|
# Vérifier les opérateurs de base
|
|
valid_operators = ['==', '!=', '<', '>', '<=', '>=', 'and', 'or', 'not', 'in']
|
|
has_operator = any(op in expression for op in valid_operators)
|
|
|
|
if not has_operator:
|
|
return {'valid': False, 'message': 'Aucun opérateur de comparaison trouvé'}
|
|
|
|
# Vérifier les parenthèses équilibrées
|
|
if expression.count('(') != expression.count(')'):
|
|
return {'valid': False, 'message': 'Parenthèses non équilibrées'}
|
|
|
|
return {'valid': True, 'message': 'OK'}
|
|
|
|
def _substitute_variables(self, text: str, visual_workflow: VisualWorkflow) -> str:
|
|
"""Substitue les références de variables ${var} dans le texte"""
|
|
|
|
# Pour l'instant, on garde les références telles quelles
|
|
# L'exécution fera la substitution réelle
|
|
return text
|
|
|
|
def _determine_entry_exit_nodes(
|
|
self,
|
|
visual_workflow: VisualWorkflow,
|
|
workflow_nodes: List[WorkflowNode],
|
|
workflow_edges: List[WorkflowEdge]
|
|
) -> Tuple[List[str], List[str]]:
|
|
"""Détermine les nodes d'entrée et de sortie du workflow"""
|
|
|
|
# Créer des sets pour les nodes avec edges entrants/sortants
|
|
nodes_with_incoming = {edge.to_node for edge in workflow_edges}
|
|
nodes_with_outgoing = {edge.from_node for edge in workflow_edges}
|
|
|
|
# Entry nodes = nodes sans edges entrants
|
|
entry_nodes = [
|
|
node.node_id for node in workflow_nodes
|
|
if node.node_id not in nodes_with_incoming
|
|
]
|
|
|
|
# End nodes = nodes sans edges sortants
|
|
end_nodes = [
|
|
node.node_id for node in workflow_nodes
|
|
if node.node_id not in nodes_with_outgoing
|
|
]
|
|
|
|
# Si pas de entry nodes, prendre le premier node
|
|
if not entry_nodes and workflow_nodes:
|
|
entry_nodes = [workflow_nodes[0].node_id]
|
|
self.warnings.append("Aucun node d'entrée détecté, utilisation du premier node")
|
|
|
|
# Si pas de end nodes, prendre le dernier node
|
|
if not end_nodes and workflow_nodes:
|
|
end_nodes = [workflow_nodes[-1].node_id]
|
|
self.warnings.append("Aucun node de sortie détecté, utilisation du dernier node")
|
|
|
|
# Marquer les nodes
|
|
for node in workflow_nodes:
|
|
if node.node_id in entry_nodes:
|
|
node.is_entry = True
|
|
if node.node_id in end_nodes:
|
|
node.is_end = True
|
|
|
|
return entry_nodes, end_nodes
|
|
|
|
def _create_safety_rules(self, visual_workflow: VisualWorkflow) -> SafetyRules:
|
|
"""Crée les règles de sécurité basées sur les settings du workflow"""
|
|
|
|
settings = visual_workflow.settings
|
|
|
|
return SafetyRules(
|
|
require_confirmation_for=[],
|
|
forbidden_windows=[],
|
|
execution_timeout_minutes=settings.timeout // 60000 if settings.timeout > 0 else 0
|
|
)
|
|
|
|
def get_errors(self) -> List[str]:
|
|
"""Retourne les erreurs de conversion"""
|
|
return self.errors
|
|
|
|
def get_warnings(self) -> List[str]:
|
|
"""Retourne les avertissements de conversion"""
|
|
return self.warnings
|
|
|
|
|
|
def convert_visual_to_graph(visual_workflow: VisualWorkflow) -> Workflow:
|
|
"""
|
|
Fonction utilitaire pour convertir un workflow visuel.
|
|
|
|
Args:
|
|
visual_workflow: Le workflow visuel à convertir
|
|
|
|
Returns:
|
|
Workflow exécutable
|
|
|
|
Raises:
|
|
ConversionError: Si la conversion échoue
|
|
"""
|
|
converter = VisualToGraphConverter()
|
|
return converter.convert(visual_workflow)
|