Files
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

731 lines
29 KiB
Python

"""
Visual to WorkflowGraph Converter - Visual Workflow Builder
Convertit les workflows visuels en WorkflowGraph exécutables.
Exigences: 6.1, 18.1
"""
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime
# Ajouter le chemin racine pour importer les modules core
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
from core.models.workflow_graph import (
Workflow,
WorkflowNode,
WorkflowEdge,
Action,
TargetSpec,
ScreenTemplate,
WindowConstraint,
TextConstraint,
UIConstraint,
EmbeddingPrototype,
EdgeConstraints,
PostConditions,
EdgeStats,
SafetyRules,
WorkflowStats,
LearningConfig
)
from models.visual_workflow import (
VisualWorkflow,
VisualNode,
VisualEdge
)
# Optional Self-Healing integration: when the sibling module cannot be
# imported, the feature flag is switched off and a stub accessor is exposed so
# callers can still call get_self_healing_converter() unconditionally.
try:
    from .self_healing_converter import get_self_healing_converter
except ImportError:
    SELF_HEALING_AVAILABLE = False

    def get_self_healing_converter():
        """Fallback accessor used when the Self-Healing module is missing."""
        return None
else:
    SELF_HEALING_AVAILABLE = True
class ConversionError(Exception):
    """Raised when a visual workflow cannot be converted."""
class VisualToGraphConverter:
    """
    Convert visual workflows into executable WorkflowGraph workflows.

    The converter collects human-readable ``errors`` and ``warnings`` while it
    works; both lists are reset at the start of every :meth:`convert` call.

    Requirements: 6.1, 18.1
    """

    # Maps a visual node type to the action type emitted on its outgoing edges.
    NODE_TYPE_TO_ACTION = {
        'click': 'mouse_click',
        'type': 'text_input',
        'wait': 'wait',
        'navigate': 'navigate',
        'extract': 'extract_data',
        'variable': 'set_variable',
        'condition': 'evaluate_condition',
        'loop': 'execute_loop',
        'validate': 'key_press',
        'scroll': 'scroll',
        'screenshot': 'screenshot',
        'transform': 'transform_data',
        'api': 'api_call',
        'database': 'database_query',
        'start': 'workflow_start',
        'end': 'workflow_end'
    }

    # Visual node types that carry control-flow logic.
    LOGIC_NODE_TYPES = {'condition', 'loop'}

    def __init__(self):
        """Create a converter with empty error and warning lists."""
        self.errors: List[str] = []
        self.warnings: List[str] = []

    def convert(self, visual_workflow: "VisualWorkflow") -> "Workflow":
        """
        Convert a VisualWorkflow into an executable Workflow.

        Args:
            visual_workflow: The visual workflow to convert.

        Returns:
            The executable Workflow.

        Raises:
            ConversionError: If the workflow fails validation, has no nodes,
                carries an unparsable version, or a node/edge cannot be
                converted.
        """
        self.errors = []
        self.warnings = []

        # Validate the structure before converting anything.
        validation_errors = visual_workflow.validate()
        if validation_errors:
            raise ConversionError(f"Workflow invalide: {', '.join(validation_errors)}")

        # At least one node is required.
        if not visual_workflow.nodes:
            raise ConversionError("Le workflow ne contient aucun node")

        workflow_nodes = self._convert_nodes(visual_workflow)
        workflow_edges = self._convert_edges(visual_workflow, workflow_nodes)

        # Entry/end detection also flags the nodes themselves.
        entry_nodes, end_nodes = self._determine_entry_exit_nodes(
            visual_workflow, workflow_nodes, workflow_edges
        )

        # Detect and configure loops and conditionals.
        loops, conditionals = self._detect_logic_structures(
            visual_workflow, workflow_nodes, workflow_edges
        )

        workflow = Workflow(
            workflow_id=visual_workflow.id,
            name=visual_workflow.name,
            description=visual_workflow.description or "",
            version=self._parse_major_version(visual_workflow.version),  # "1.0.0" -> 1
            learning_state="OBSERVATION",
            created_at=visual_workflow.created_at,
            updated_at=visual_workflow.updated_at,
            entry_nodes=entry_nodes,
            end_nodes=end_nodes,
            nodes=workflow_nodes,
            edges=workflow_edges,
            safety_rules=self._create_safety_rules(visual_workflow),
            stats=WorkflowStats(),
            learning=LearningConfig(),
            loops=loops,
            conditionals=conditionals,
            metadata={
                'created_by': visual_workflow.created_by,
                'tags': visual_workflow.tags,
                'category': visual_workflow.category,
                'is_template': visual_workflow.is_template,
                'source': 'visual_workflow_builder'
            }
        )

        # Merge the workflow-level Self-Healing settings when available.
        if SELF_HEALING_AVAILABLE:
            self_healing_converter = get_self_healing_converter()
            if self_healing_converter:
                workflow = self_healing_converter.convert_workflow_settings(visual_workflow, workflow)

        return workflow

    @staticmethod
    def _parse_major_version(version: Any) -> int:
        """
        Return the major component of a dotted version ("1.0.0" -> 1).

        Raises:
            ConversionError: If the leading component is not an integer, so a
                malformed version surfaces as the documented error type
                instead of a bare ValueError/AttributeError.
        """
        try:
            return int(str(version).split('.')[0])
        except ValueError as exc:
            raise ConversionError(f"Version de workflow invalide: {version!r}") from exc

    @staticmethod
    def _port_name(port: Any) -> str:
        """
        Return a lower-cased port name, or '' when the port is not a string.

        Edge metadata always contains the 'source_port' key, so an edge without
        a port yields None rather than a missing key; this keeps substring
        checks on the port name from crashing.
        """
        return port.lower() if isinstance(port, str) else ''

    def _detect_logic_structures(
        self,
        visual_workflow: "VisualWorkflow",
        workflow_nodes: List["WorkflowNode"],
        workflow_edges: List["WorkflowEdge"]
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Detect and configure the logic structures (loops and conditionals).

        Returns:
            A ``(loops, conditionals)`` pair of dicts keyed by node id.

        Requirements: 8.1, 8.2, 9.1, 9.2
        """
        loops: Dict[str, Any] = {}
        conditionals: Dict[str, Any] = {}

        for node in workflow_nodes:
            visual_type = node.metadata.get('visual_type')
            parameters = node.metadata.get('parameters', {})

            if visual_type == 'condition':
                # Conditional node configuration (Requirements 8.1, 8.2).
                # NOTE(review): the condition type is read from 'type' here but
                # from 'condition_type' in _create_action_parameters - confirm
                # which key the builder actually emits.
                conditionals[node.node_id] = {
                    'expression': parameters.get('expression', ''),
                    'true_branch': self._find_branch_target(node.node_id, 'true', workflow_edges),
                    'false_branch': self._find_branch_target(node.node_id, 'false', workflow_edges),
                    'metadata': {
                        'visual_position': node.metadata.get('visual_position'),
                        'condition_type': parameters.get('type', 'expression')
                    }
                }

            elif visual_type == 'loop':
                # Loop node configuration (Requirements 9.1, 9.2).
                loop_type = parameters.get('type', 'repeat')
                loop_config = {
                    'loop_type': loop_type,
                    'body_nodes': self._find_loop_body(node.node_id, workflow_edges),
                    'exit_node': self._find_loop_exit(node.node_id, workflow_edges),
                    'metadata': {
                        'visual_position': node.metadata.get('visual_position')
                    }
                }

                # Parameters specific to the loop flavour.
                if loop_type == 'repeat':
                    loop_config['count'] = parameters.get('count', 1)
                elif loop_type == 'while':
                    loop_config['condition'] = parameters.get('condition', '')
                    loop_config['max_iterations'] = parameters.get('max_iterations', 100)
                elif loop_type == 'for-each':
                    loop_config['collection'] = parameters.get('collection', '')
                    loop_config['item_variable'] = parameters.get('item_variable', 'item')

                loops[node.node_id] = loop_config

        return loops, conditionals

    def _find_branch_target(
        self,
        node_id: str,
        branch_type: str,
        edges: List["WorkflowEdge"]
    ) -> Optional[str]:
        """
        Return the target node of the 'true' or 'false' branch of a condition.

        An outgoing edge matches when its source port name contains
        ``branch_type``, or when its ``condition_result`` pre-condition equals
        the expected boolean. Returns None when no edge matches.
        """
        expected_result = branch_type == 'true'
        for edge in edges:
            if edge.from_node != node_id:
                continue

            # First look at the source port recorded in the edge metadata.
            if branch_type in self._port_name(edge.metadata.get('source_port')):
                return edge.to_node

            # Otherwise fall back to the pre-conditions.
            pre_conditions = edge.constraints.pre_conditions
            if 'condition_result' in pre_conditions:
                if pre_conditions['condition_result'] == expected_result:
                    return edge.to_node

        return None

    def _find_loop_body(
        self,
        loop_node_id: str,
        edges: List["WorkflowEdge"]
    ) -> List[str]:
        """
        Return the nodes of the loop body.

        Only the first node reached through a 'body'/'loop' port is returned.
        """
        # NOTE(review): a port name such as 'loop_exit' matches 'loop' here and
        # 'exit' in _find_loop_exit - confirm the port naming convention.
        for edge in edges:
            if edge.from_node != loop_node_id:
                continue
            port = self._port_name(edge.metadata.get('source_port'))
            if 'body' in port or 'loop' in port:
                # TODO: walk the graph to collect every node of the body
                return [edge.to_node]
        return []

    def _find_loop_exit(
        self,
        loop_node_id: str,
        edges: List["WorkflowEdge"]
    ) -> Optional[str]:
        """
        Return the node reached when the loop exits, or None if there is none.

        An outgoing edge whose port contains 'exit' wins; otherwise the first
        outgoing edge that is neither a body/loop port nor pointing at the
        body is treated as the exit.
        """
        # The body does not depend on the edge being inspected: compute it once.
        body_targets = self._find_loop_body(loop_node_id, edges)

        for edge in edges:
            if edge.from_node != loop_node_id:
                continue
            port = self._port_name(edge.metadata.get('source_port'))

            # Explicit exit port (exit, out_exit, ...).
            if 'exit' in port:
                return edge.to_node

            # Without a body/loop port this is probably the exit edge.
            if 'body' not in port and 'loop' not in port:
                if edge.to_node not in body_targets:
                    return edge.to_node

        return None

    def _convert_nodes(self, visual_workflow: "VisualWorkflow") -> List["WorkflowNode"]:
        """
        Convert every visual node into a WorkflowNode.

        Raises:
            ConversionError: If at least one node failed to convert; the
                message lists every failure.
        """
        workflow_nodes = []
        for vnode in visual_workflow.nodes:
            try:
                workflow_nodes.append(self._convert_node(vnode))
            except Exception as e:
                # Deliberately broad: keep going so that all failing nodes are
                # reported together below.
                self.errors.append(f"Erreur conversion node {vnode.id}: {str(e)}")

        if self.errors:
            raise ConversionError(f"Erreurs lors de la conversion des nodes: {', '.join(self.errors)}")

        return workflow_nodes

    def _convert_node(self, vnode: "VisualNode") -> "WorkflowNode":
        """Convert a single VisualNode into a WorkflowNode."""
        # Basic screen template; a full implementation would use real
        # embeddings and constraints.
        template = ScreenTemplate(
            window=WindowConstraint(),
            text=TextConstraint(),
            ui=UIConstraint(),
            embedding=EmbeddingPrototype(
                provider="visual_workflow_builder",
                vector_id=f"node_{vnode.id}",
                min_cosine_similarity=0.85,
                sample_count=0
            )
        )

        wnode = WorkflowNode(
            node_id=vnode.id,
            name=vnode.label or vnode.type,
            description=vnode.description or f"Node de type {vnode.type}",
            template=template,
            is_entry=False,  # set later by _determine_entry_exit_nodes
            is_end=False,  # set later by _determine_entry_exit_nodes
            metadata={
                'visual_type': vnode.type,
                'visual_position': vnode.position.to_dict(),
                'visual_size': vnode.size.to_dict(),
                'parameters': vnode.parameters,
                'color': vnode.color
            }
        )

        # Merge the node-level Self-Healing configuration when available.
        if SELF_HEALING_AVAILABLE:
            self_healing_converter = get_self_healing_converter()
            if self_healing_converter:
                wnode = self_healing_converter.convert_node_config(vnode, wnode)

        return wnode

    def _convert_edges(
        self,
        visual_workflow: "VisualWorkflow",
        workflow_nodes: List["WorkflowNode"]
    ) -> List["WorkflowEdge"]:
        """
        Convert every visual edge into a WorkflowEdge.

        The action attached to an edge is derived from its *source* node.

        Raises:
            ConversionError: If at least one edge failed to convert; the
                message lists every failure.
        """
        workflow_edges = []

        # node_id -> node, used to validate edge endpoints.
        node_map = {node.node_id: node for node in workflow_nodes}

        for vedge in visual_workflow.edges:
            try:
                # Both endpoints must exist.
                if vedge.source not in node_map:
                    raise ConversionError(f"Node source {vedge.source} introuvable")
                if vedge.target not in node_map:
                    raise ConversionError(f"Node target {vedge.target} introuvable")

                source_node = node_map[vedge.source]
                target_node = node_map[vedge.target]

                action = self._create_action_from_node(source_node, visual_workflow)
                constraints = self._create_edge_constraints(vedge, source_node)
                post_conditions = PostConditions(
                    expected_node=target_node.node_id,
                    timeout_ms=3000
                )

                workflow_edges.append(WorkflowEdge(
                    edge_id=vedge.id,
                    from_node=vedge.source,
                    to_node=vedge.target,
                    action=action,
                    constraints=constraints,
                    post_conditions=post_conditions,
                    stats=EdgeStats(),
                    metadata={
                        'visual_condition': vedge.condition.to_dict() if vedge.condition else None,
                        'visual_style': vedge.style.to_dict() if vedge.style else None,
                        'source_port': vedge.source_port,
                        'target_port': vedge.target_port
                    }
                ))
            except Exception as e:
                # Deliberately broad: keep going so that all failing edges are
                # reported together below.
                self.errors.append(f"Erreur conversion edge {vedge.id}: {str(e)}")

        if self.errors:
            raise ConversionError(f"Erreurs lors de la conversion des edges: {', '.join(self.errors)}")

        return workflow_edges

    def _create_edge_constraints(
        self,
        vedge: "VisualEdge",
        source_node: "WorkflowNode"
    ) -> "EdgeConstraints":
        """Build the edge constraints, including condition pre-conditions."""
        constraints = EdgeConstraints(
            required_confidence=0.8,
            max_wait_time_ms=5000
        )

        # Edges leaving a condition node record which branch they represent,
        # based on the source port name (e.g. 'out_true' / 'out_false').
        if source_node.metadata.get('visual_type') == 'condition':
            port = self._port_name(vedge.source_port)
            if 'true' in port:
                constraints.pre_conditions['condition_result'] = True
            elif 'false' in port:
                constraints.pre_conditions['condition_result'] = False

        # An explicit condition on the edge is carried over as well.
        if vedge.condition:
            if vedge.condition.type == 'expression' and vedge.condition.expression:
                constraints.pre_conditions['expression'] = vedge.condition.expression
            elif vedge.condition.type in ['success', 'failure']:
                constraints.pre_conditions['execution_status'] = vedge.condition.type

        return constraints

    def _create_action_from_node(
        self,
        node: "WorkflowNode",
        visual_workflow: "VisualWorkflow"
    ) -> "Action":
        """Build an Action from the visual type and parameters of a node."""
        visual_type = node.metadata.get('visual_type', 'unknown')
        parameters = node.metadata.get('parameters', {})

        # Unknown visual types fall back to a mouse click.
        action_type = self.NODE_TYPE_TO_ACTION.get(visual_type, 'mouse_click')
        target_spec = self._create_target_spec(visual_type, parameters)
        action_params = self._create_action_parameters(visual_type, parameters, visual_workflow)

        return Action(
            type=action_type,
            target=target_spec,
            parameters=action_params
        )

    def _create_target_spec(self, node_type: str, parameters: Dict[str, Any]) -> "TargetSpec":
        """
        Build a TargetSpec from the 'target' entry of the node parameters.

        A string target is matched by text, a dict target supplies role, text,
        position and selection policy, and anything else yields a generic
        button target.
        """
        target_info = parameters.get('target', {})

        # A plain string is a simple text selector.
        if isinstance(target_info, str):
            return TargetSpec(
                by_text=target_info,
                selection_policy="first"
            )

        # A dict carries the detailed target description.
        if isinstance(target_info, dict):
            # A present-but-null position means "no position", not tuple(None).
            position = target_info.get('position')
            return TargetSpec(
                by_role=target_info.get('role'),
                by_text=target_info.get('text'),
                by_position=tuple(position) if position is not None else None,
                selection_policy=target_info.get('selection_policy', 'first')
            )

        # Default: generic target.
        return TargetSpec(
            by_role="button",
            selection_policy="first"
        )

    def _create_action_parameters(
        self,
        node_type: str,
        parameters: Dict[str, Any],
        visual_workflow: "VisualWorkflow"
    ) -> Dict[str, Any]:
        """
        Build the action parameter dict for a node type.

        Text-like values go through :meth:`_substitute_variables`. Node types
        without a dedicated branch yield an empty dict. For condition nodes an
        expression that fails validation adds a warning (once per distinct
        message).
        """
        action_params: Dict[str, Any] = {}

        if node_type == 'click':
            action_params['click_type'] = parameters.get('click_type', 'left')
            action_params['timeout_ms'] = parameters.get('timeout', 5000)
            action_params['retries'] = parameters.get('retries', 3)
            action_params['wait_after_ms'] = parameters.get('wait_after', 500)

        elif node_type == 'type':
            # Text input.
            text = parameters.get('text', '')
            text = self._substitute_variables(text, visual_workflow)
            action_params['text'] = text
            action_params['clear_first'] = parameters.get('clear_first', False)
            action_params['typing_speed'] = parameters.get('typing_speed', 'normal')
            action_params['press_enter'] = parameters.get('press_enter', False)

        elif node_type == 'wait':
            duration = parameters.get('duration', 1000)
            action_params['duration_ms'] = int(duration)
            action_params['wait_type'] = parameters.get('wait_type', 'fixed')

        elif node_type == 'navigate':
            url = parameters.get('url', '')
            url = self._substitute_variables(url, visual_workflow)
            action_params['url'] = url
            action_params['wait_for_load'] = parameters.get('wait_for_load', True)
            action_params['timeout_ms'] = parameters.get('timeout', 10000)

        elif node_type == 'validate':
            # Validation is performed by pressing the Return key.
            action_params['key'] = 'Return'
            action_params['validation_type'] = parameters.get('validation_type', 'exists')
            action_params['expected_text'] = parameters.get('expected_text', '')

        elif node_type == 'scroll':
            action_params['direction'] = parameters.get('direction', 'down')
            action_params['amount'] = parameters.get('amount', 3)

        elif node_type == 'screenshot':
            action_params['filename'] = parameters.get('filename', '')
            # Full screen unless a region was provided.
            action_params['full_screen'] = not parameters.get('region')

        elif node_type == 'extract':
            # Data extraction.
            variable_name = parameters.get('variable', '')
            action_params['variable_name'] = variable_name
            action_params['extraction_type'] = parameters.get('extraction_type', 'text')
            action_params['attribute_name'] = parameters.get('attribute_name', '')

        elif node_type == 'variable':
            # Variable definition.
            var_name = parameters.get('name', '')
            var_value = parameters.get('value', '')
            var_value = self._substitute_variables(str(var_value), visual_workflow)
            action_params['variable_name'] = var_name
            action_params['variable_value'] = var_value
            action_params['variable_type'] = parameters.get('variable_type', 'string')

        elif node_type == 'transform':
            # Data transformation.
            action_params['transformation_type'] = parameters.get('transformation_type', 'format')
            action_params['input_variable'] = parameters.get('input_variable', '')
            action_params['output_variable'] = parameters.get('output_variable', '')
            action_params['transformation_rule'] = parameters.get('transformation_rule', '')

        elif node_type == 'api':
            # API call.
            action_params['method'] = parameters.get('method', 'GET')
            action_params['url'] = self._substitute_variables(parameters.get('url', ''), visual_workflow)
            action_params['headers'] = parameters.get('headers', {})
            action_params['body'] = parameters.get('body', '')
            action_params['response_variable'] = parameters.get('response_variable', '')

        elif node_type == 'database':
            # Database query.
            action_params['connection_string'] = parameters.get('connection_string', '')
            action_params['query'] = self._substitute_variables(parameters.get('query', ''), visual_workflow)
            action_params['result_variable'] = parameters.get('result_variable', '')

        elif node_type == 'condition':
            # Conditions (Requirements 8.1, 8.2, 8.5).
            expression = parameters.get('expression', '')
            expression = self._substitute_variables(expression, visual_workflow)
            action_params['expression'] = expression
            action_params['condition_type'] = parameters.get('condition_type', 'expression')

            # Validate the expression syntax (Requirement 8.5).
            validation_result = self._validate_expression(expression)
            if not validation_result['valid']:
                message = (
                    f"Expression de condition potentiellement invalide: {expression} - {validation_result['message']}"
                )
                # This method runs once per outgoing edge of the node; avoid
                # recording the same warning for every branch.
                if message not in self.warnings:
                    self.warnings.append(message)

        elif node_type == 'loop':
            # Loops (Requirements 9.1, 9.2, 9.5).
            loop_type = parameters.get('type', 'repeat')  # for-each, while, repeat
            action_params['loop_type'] = loop_type

            if loop_type == 'repeat':
                # Fixed number of iterations.
                count = parameters.get('count', 1)
                action_params['count'] = int(count)

            elif loop_type == 'while':
                # Condition-driven loop.
                condition = parameters.get('condition', '')
                condition = self._substitute_variables(condition, visual_workflow)
                action_params['condition'] = condition
                action_params['max_iterations'] = parameters.get('max_iterations', 100)

            elif loop_type == 'for-each':
                # Loop over a collection.
                collection = parameters.get('collection', '')
                collection = self._substitute_variables(collection, visual_workflow)
                action_params['collection'] = collection
                action_params['item_variable'] = parameters.get('item_variable', 'item')
                action_params['max_iterations'] = parameters.get('max_iterations', 100)

        return action_params

    def _validate_expression(self, expression: str) -> Dict[str, Any]:
        """
        Run a basic syntax check on a condition expression.

        This is a heuristic, not a parser: the expression must be non-empty,
        contain at least one comparison symbol or a logical keyword appearing
        as a whole word, and have correctly nested parentheses.

        Returns:
            ``{'valid': bool, 'message': str}``.

        Requirement: 8.5
        """
        import re  # local import: only this method needs it

        if not expression or not expression.strip():
            return {'valid': False, 'message': 'Expression vide'}

        # Symbol operators can be matched anywhere; keyword operators must be
        # whole words, otherwise "color" (or) or "winner" (in) would pass.
        has_symbol = any(op in expression for op in ('==', '!=', '<=', '>=', '<', '>'))
        has_keyword = re.search(r'\b(?:and|or|not|in)\b', expression) is not None
        if not (has_symbol or has_keyword):
            return {'valid': False, 'message': 'Aucun opérateur de comparaison trouvé'}

        # Track nesting depth so that ")(" is rejected, not just unequal counts.
        depth = 0
        for char in expression:
            if char == '(':
                depth += 1
            elif char == ')':
                depth -= 1
                if depth < 0:
                    break
        if depth != 0:
            return {'valid': False, 'message': 'Parenthèses non équilibrées'}

        return {'valid': True, 'message': 'OK'}

    def _substitute_variables(self, text: str, visual_workflow: "VisualWorkflow") -> str:
        """
        Return ``text`` unchanged.

        ``${var}`` references are kept as-is for now; the substitution is left
        to the execution stage.
        """
        return text

    def _determine_entry_exit_nodes(
        self,
        visual_workflow: "VisualWorkflow",
        workflow_nodes: List["WorkflowNode"],
        workflow_edges: List["WorkflowEdge"]
    ) -> Tuple[List[str], List[str]]:
        """
        Determine the entry and end nodes of the workflow.

        Entry nodes have no incoming edge and end nodes have no outgoing edge.
        When none is found, the first (respectively last) node is used and a
        warning is recorded. The matching nodes get ``is_entry`` / ``is_end``
        set to True.

        Returns:
            An ``(entry_node_ids, end_node_ids)`` pair.
        """
        nodes_with_incoming = {edge.to_node for edge in workflow_edges}
        nodes_with_outgoing = {edge.from_node for edge in workflow_edges}

        entry_nodes = [
            node.node_id for node in workflow_nodes
            if node.node_id not in nodes_with_incoming
        ]
        end_nodes = [
            node.node_id for node in workflow_nodes
            if node.node_id not in nodes_with_outgoing
        ]

        # Fallbacks (e.g. fully cyclic graphs).
        if not entry_nodes and workflow_nodes:
            entry_nodes = [workflow_nodes[0].node_id]
            self.warnings.append("Aucun node d'entrée détecté, utilisation du premier node")
        if not end_nodes and workflow_nodes:
            end_nodes = [workflow_nodes[-1].node_id]
            self.warnings.append("Aucun node de sortie détecté, utilisation du dernier node")

        # Flag the nodes; sets keep the membership tests O(1).
        entry_ids = set(entry_nodes)
        end_ids = set(end_nodes)
        for node in workflow_nodes:
            if node.node_id in entry_ids:
                node.is_entry = True
            if node.node_id in end_ids:
                node.is_end = True

        return entry_nodes, end_nodes

    def _create_safety_rules(self, visual_workflow: "VisualWorkflow") -> "SafetyRules":
        """
        Build the safety rules from the workflow settings.

        The settings timeout is divided by 60000 to obtain minutes (i.e. it is
        treated as milliseconds) and rounded *up*, so a positive sub-minute
        timeout does not collapse to 0 - the value also used when no timeout
        is configured.
        """
        settings = visual_workflow.settings
        timeout_ms = settings.timeout or 0  # a missing (None) timeout counts as 0
        # Ceiling division without importing math.
        timeout_minutes = -(-timeout_ms // 60000) if timeout_ms > 0 else 0

        return SafetyRules(
            require_confirmation_for=[],
            forbidden_windows=[],
            execution_timeout_minutes=timeout_minutes
        )

    def get_errors(self) -> List[str]:
        """Return the conversion errors recorded so far."""
        return self.errors

    def get_warnings(self) -> List[str]:
        """Return the conversion warnings recorded so far."""
        return self.warnings
def convert_visual_to_graph(visual_workflow: VisualWorkflow) -> Workflow:
    """
    Convert a visual workflow with a throwaway converter instance.

    Args:
        visual_workflow: The visual workflow to convert.

    Returns:
        The executable Workflow.

    Raises:
        ConversionError: If the conversion fails.
    """
    return VisualToGraphConverter().convert(visual_workflow)