- Frontend v4 accessible sur réseau local (192.168.1.40) - Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard) - Ollama GPU fonctionnel - Self-healing interactif - Dashboard confiance Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
268 lines
9.7 KiB
Python
268 lines
9.7 KiB
Python
"""
|
|
Modèles de résultats d'exécution pour WorkflowPipeline
|
|
|
|
Auteur: Dom, Alice Kiro - 20 décembre 2024
|
|
"""
|
|
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from typing import Optional, Dict, Any, List
|
|
from enum import Enum
|
|
|
|
from .screen_state import ScreenState
|
|
from .workflow_graph import WorkflowEdge, Action
|
|
from ..execution.target_resolver import ResolvedTarget
|
|
|
|
|
|
class StepExecutionStatus(Enum):
    """Outcome of executing a single workflow step.

    Each member's value is the lowercase string form of its name; that
    string is what ends up in serialized results (``status.value``).
    """

    # Step ran to completion.
    SUCCESS = "success"
    # Generic failure.
    FAILED = "failed"
    # No workflow state matched.
    NO_MATCH = "no_match"
    # Workflow finished, nothing left to execute.
    WORKFLOW_COMPLETE = "workflow_complete"
    # NOTE(review): presumably the action target could not be resolved - confirm with producers.
    TARGET_NOT_FOUND = "target_not_found"
    # NOTE(review): presumably a post-action check failed - confirm with producers.
    POSTCONDITION_FAILED = "postcondition_failed"
    # An error occurred while executing the step.
    EXECUTION_ERROR = "execution_error"
|
|
|
|
|
|
@dataclass
class RecoveryInfo:
    """Description of a recovery attempt applied during a step.

    The first three fields are required (positional order: ``strategy``,
    ``message``, ``success``); the counters default to zero.
    """

    # Name/identifier of the recovery strategy that was applied.
    strategy: str
    # Human-readable description of what happened.
    message: str
    # Whether the recovery succeeded.
    success: bool
    # Number of attempts made (defaults to 0).
    attempts: int = 0
    # Duration of the recovery; per the ``_ms`` suffix, presumably milliseconds.
    duration_ms: float = 0.0
|
|
|
|
|
|
@dataclass
class PerformanceMetrics:
    """Timing breakdown for one step execution.

    Only the total is required; every per-phase timing defaults to ``0.0``.
    Per the ``_ms`` suffixes the values are presumably milliseconds.
    """

    # Overall execution time (required, first positional argument).
    total_execution_time_ms: float
    # Time spent matching the current state.
    state_matching_time_ms: float = 0.0
    # Time spent resolving the action target.
    target_resolution_time_ms: float = 0.0
    # Time spent executing the action.
    action_execution_time_ms: float = 0.0
    # Time spent handling errors.
    error_handling_time_ms: float = 0.0
|
|
|
|
|
|
@dataclass
class WorkflowExecutionResult:
    """
    Complete result of executing one workflow step.

    Carries the metadata needed for audit, learning and debugging.
    Instances are normally created through the factory classmethods
    ``success``, ``no_match``, ``workflow_complete`` and ``error``.

    Note:
        ``success`` and ``error`` are both dataclass fields *and* public
        factory names. If the factories were defined under those names inside
        the class body, they would overwrite the field defaults before
        ``@dataclass`` reads them, and the generated ``__init__`` would use
        the bound classmethod as the default value (truthy, and serialized
        as a bogus error string). The factories are therefore implemented as
        ``_success_result`` / ``_error_result`` and bound to their public
        names immediately after the class definition. Instance attributes
        set by ``__init__`` take precedence over those class-level
        classmethods, so ``result.success`` / ``result.error`` still return
        the field values.
    """

    # Identifiers
    execution_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    workflow_id: str = ""
    correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    # Execution status
    status: StepExecutionStatus = StepExecutionStatus.FAILED
    success: bool = False
    step_type: str = "unknown"

    # Execution context
    current_node: Optional[str] = None
    target_node: Optional[str] = None
    current_state: Optional[ScreenState] = None

    # Executed action
    action_executed: Optional[Dict[str, Any]] = None
    target_resolved: Optional[ResolvedTarget] = None

    # Results and errors
    message: str = ""
    error: Optional[str] = None
    recovery_applied: Optional[RecoveryInfo] = None

    # Metrics
    performance_metrics: PerformanceMetrics = field(default_factory=lambda: PerformanceMetrics(0.0))

    # Audit metadata
    created_at: datetime = field(default_factory=datetime.now)
    match_result: Optional[Dict[str, Any]] = None
    execution_details: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def _success_result(
        cls,
        execution_id: str,
        workflow_id: str,
        current_node: str,
        target_node: str,
        action_executed: Dict[str, Any],
        target_resolved: Optional[ResolvedTarget] = None,
        match_result: Optional[Dict[str, Any]] = None,
        performance_metrics: Optional[PerformanceMetrics] = None
    ) -> 'WorkflowExecutionResult':
        """Create a success result (exposed publicly as ``success``).

        Args:
            execution_id: Identifier of this execution.
            workflow_id: Identifier of the workflow being run.
            current_node: Node the step started from.
            target_node: Node the step transitions to.
            action_executed: Description of the executed action.
            target_resolved: Resolved target of the action, if any.
            match_result: State-matching details, if any.
            performance_metrics: Timings; zeroed metrics are used when omitted.

        Returns:
            A result with status ``SUCCESS`` and ``success=True``.
        """
        return cls(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status=StepExecutionStatus.SUCCESS,
            success=True,
            step_type="action_execution",
            current_node=current_node,
            target_node=target_node,
            action_executed=action_executed,
            target_resolved=target_resolved,
            match_result=match_result,
            performance_metrics=performance_metrics or PerformanceMetrics(0.0),
            message="Workflow step executed successfully"
        )

    @classmethod
    def no_match(
        cls,
        execution_id: str,
        workflow_id: str,
        current_state: ScreenState,
        recovery_info: Optional[RecoveryInfo] = None,
        performance_metrics: Optional[PerformanceMetrics] = None
    ) -> 'WorkflowExecutionResult':
        """Create a result for a state-matching failure.

        Args:
            execution_id: Identifier of this execution.
            workflow_id: Identifier of the workflow being run.
            current_state: Screen state that could not be matched.
            recovery_info: Recovery attempt applied, if any.
            performance_metrics: Timings; zeroed metrics are used when omitted.

        Returns:
            A result with status ``NO_MATCH`` and ``success=False``.
        """
        return cls(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status=StepExecutionStatus.NO_MATCH,
            success=False,
            step_type="state_matching",
            current_state=current_state,
            recovery_applied=recovery_info,
            performance_metrics=performance_metrics or PerformanceMetrics(0.0),
            message="No matching state found in workflow",
            error="State matching failed"
        )

    @classmethod
    def workflow_complete(
        cls,
        execution_id: str,
        workflow_id: str,
        current_node: str,
        performance_metrics: Optional[PerformanceMetrics] = None
    ) -> 'WorkflowExecutionResult':
        """Create a result signalling that the workflow has no more actions.

        Args:
            execution_id: Identifier of this execution.
            workflow_id: Identifier of the workflow being run.
            current_node: Node at which the workflow ended.
            performance_metrics: Timings; zeroed metrics are used when omitted.

        Returns:
            A result with status ``WORKFLOW_COMPLETE`` and ``success=True``.
        """
        return cls(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status=StepExecutionStatus.WORKFLOW_COMPLETE,
            success=True,
            step_type="workflow_complete",
            current_node=current_node,
            performance_metrics=performance_metrics or PerformanceMetrics(0.0),
            message="Workflow completed - no more actions"
        )

    @classmethod
    def _error_result(
        cls,
        execution_id: str,
        workflow_id: str,
        error_message: str,
        step_type: str = "execution_error",
        current_node: Optional[str] = None,
        recovery_info: Optional[RecoveryInfo] = None,
        performance_metrics: Optional[PerformanceMetrics] = None
    ) -> 'WorkflowExecutionResult':
        """Create an error result (exposed publicly as ``error``).

        Args:
            execution_id: Identifier of this execution.
            workflow_id: Identifier of the workflow being run.
            error_message: Description of the failure; stored in ``error``
                and embedded in ``message``.
            step_type: Step type label recorded on the result.
            current_node: Node being processed when the error occurred.
            recovery_info: Recovery attempt applied, if any.
            performance_metrics: Timings; zeroed metrics are used when omitted.

        Returns:
            A result with status ``EXECUTION_ERROR`` and ``success=False``.
        """
        return cls(
            execution_id=execution_id,
            workflow_id=workflow_id,
            status=StepExecutionStatus.EXECUTION_ERROR,
            success=False,
            step_type=step_type,
            current_node=current_node,
            recovery_applied=recovery_info,
            performance_metrics=performance_metrics or PerformanceMetrics(0.0),
            message=f"Execution failed: {error_message}",
            error=error_message
        )

    @staticmethod
    def _serialize_bbox(bbox_data: Any) -> Any:
        """Turn a bounding box into a serializable value.

        Objects exposing ``to_tuple`` are read through their
        ``x``/``y``/``width``/``height`` attributes, dicts are passed through
        unchanged, sequences of at least four items are mapped positionally,
        and anything else falls back to ``str(bbox_data)``.
        """
        if hasattr(bbox_data, 'to_tuple'):
            return {
                "x": bbox_data.x,
                "y": bbox_data.y,
                "width": bbox_data.width,
                "height": bbox_data.height
            }
        if isinstance(bbox_data, dict):
            return bbox_data
        if isinstance(bbox_data, (list, tuple)) and len(bbox_data) >= 4:
            return {
                "x": bbox_data[0],
                "y": bbox_data[1],
                "width": bbox_data[2],
                "height": bbox_data[3]
            }
        return str(bbox_data)  # Fallback to string

    def to_dict(self) -> Dict[str, Any]:
        """Convert the result to a dictionary for serialization.

        Mandatory keys are always present; optional keys are added only when
        the corresponding field is truthy.

        Returns:
            A dictionary built from plain values, except that
            ``action_executed``, ``match_result`` and ``execution_details``
            are included as-is.
        """
        metrics = self.performance_metrics
        result: Dict[str, Any] = {
            "execution_id": self.execution_id,
            "workflow_id": self.workflow_id,
            "correlation_id": self.correlation_id,
            "status": self.status.value,
            "success": self.success,
            "step_type": self.step_type,
            "message": self.message,
            "created_at": self.created_at.isoformat(),
            "performance_metrics": {
                "total_execution_time_ms": metrics.total_execution_time_ms,
                "state_matching_time_ms": metrics.state_matching_time_ms,
                "target_resolution_time_ms": metrics.target_resolution_time_ms,
                "action_execution_time_ms": metrics.action_execution_time_ms,
                "error_handling_time_ms": metrics.error_handling_time_ms
            }
        }

        # Add optional fields when they are set
        if self.current_node:
            result["current_node"] = self.current_node
        if self.target_node:
            result["target_node"] = self.target_node
        if self.action_executed:
            result["action_executed"] = self.action_executed
        if self.target_resolved:
            element = self.target_resolved.element
            result["target_resolved"] = {
                "element_id": element.element_id,
                "confidence": self.target_resolved.confidence,
                # 'method' may be missing on the resolved target; default to 'standard'
                "method": getattr(self.target_resolved, 'method', 'standard'),
                "bbox": self._serialize_bbox(element.bbox)
            }
        if self.error:
            # str() keeps the payload serializable even if a non-string was stored
            result["error"] = str(self.error)
        if self.recovery_applied:
            result["recovery_applied"] = {
                "strategy": self.recovery_applied.strategy,
                "message": self.recovery_applied.message,
                "success": self.recovery_applied.success,
                "attempts": self.recovery_applied.attempts,
                "duration_ms": self.recovery_applied.duration_ms
            }
        if self.match_result:
            result["match_result"] = self.match_result
        if self.execution_details:
            result["execution_details"] = self.execution_details

        return result

    def add_execution_detail(self, key: str, value: Any) -> None:
        """Record an arbitrary execution detail under ``key``."""
        self.execution_details[key] = value

    def set_performance_metric(self, metric_name: str, value: float) -> None:
        """Set a performance metric.

        If ``metric_name`` is an attribute of ``performance_metrics`` it is
        updated in place; otherwise the value is stored in
        ``execution_details`` under ``metric_<metric_name>``.
        """
        if hasattr(self.performance_metrics, metric_name):
            setattr(self.performance_metrics, metric_name, value)
        else:
            # Not a standard metric: keep it as an execution detail instead
            self.execution_details[f"metric_{metric_name}"] = value


# Bind the public factory names only now, after @dataclass has captured the
# real defaults of the ``success`` and ``error`` fields (False / None).
# The raw classmethod objects are taken from the class namespace so that
# subclasses still receive themselves as ``cls``.
WorkflowExecutionResult.success = vars(WorkflowExecutionResult)["_success_result"]
WorkflowExecutionResult.error = vars(WorkflowExecutionResult)["_error_result"]