Files
rpa_vision_v3/tests/unit/test_workflow_execution_result.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

348 lines
13 KiB
Python

"""
Tests unitaires pour WorkflowExecutionResult avec métadonnées complètes
Auteur: Dom, Alice Kiro - 20 décembre 2024
"""
import pytest
import uuid
from datetime import datetime
from unittest.mock import Mock, patch
from core.models.execution_result import (
WorkflowExecutionResult,
PerformanceMetrics,
RecoveryInfo,
StepExecutionStatus
)
from core.models.screen_state import ScreenState
from core.execution.target_resolver import ResolvedTarget
from core.models.ui_element import UIElement
class TestWorkflowExecutionResult:
    """Tests for WorkflowExecutionResult and the metadata it carries."""

    def test_success_result_has_complete_metadata(self):
        """A result built by ``success`` exposes every required metadata field."""
        # Arrange: keep the factory arguments in one mapping so each one can be
        # compared against the attribute of the same name afterwards.
        factory_args = {
            "execution_id": str(uuid.uuid4()),
            "workflow_id": "test_workflow",
            "current_node": "node_1",
            "target_node": "node_2",
            "action_executed": {"type": "click", "target": "button"},
            "performance_metrics": PerformanceMetrics(
                total_execution_time_ms=150.0,
                state_matching_time_ms=50.0,
                target_resolution_time_ms=30.0,
                action_execution_time_ms=70.0,
            ),
        }

        # Act
        outcome = WorkflowExecutionResult.success(**factory_args)

        # Assert: every argument is readable back under the same name.
        for attribute, expected in factory_args.items():
            assert getattr(outcome, attribute) == expected
        assert outcome.correlation_id is not None  # UUID generated automatically
        assert outcome.success is True
        assert outcome.status == StepExecutionStatus.SUCCESS
        assert outcome.created_at is not None
        assert isinstance(outcome.created_at, datetime)

    def test_error_result_has_complete_metadata(self):
        """A result built by ``error`` exposes every required metadata field."""
        # Arrange
        run_id = str(uuid.uuid4())
        failed_recovery = RecoveryInfo(
            strategy="spatial_fallback",
            message="Applied spatial fallback strategy",
            success=False,
            attempts=2,
            duration_ms=100.0,
        )
        timings = PerformanceMetrics(
            total_execution_time_ms=200.0,
            error_handling_time_ms=100.0,
        )

        # Act
        outcome = WorkflowExecutionResult.error(
            execution_id=run_id,
            workflow_id="test_workflow",
            error_message="Target not found",
            recovery_info=failed_recovery,
            performance_metrics=timings,
        )

        # Assert: the error factory stores some arguments under other names
        # (error_message -> error, recovery_info -> recovery_applied).
        assert outcome.execution_id == run_id
        assert outcome.workflow_id == "test_workflow"
        assert outcome.correlation_id is not None
        assert outcome.success is False
        assert outcome.status == StepExecutionStatus.EXECUTION_ERROR
        assert outcome.error == "Target not found"
        assert outcome.recovery_applied == failed_recovery
        assert outcome.performance_metrics == timings
        assert outcome.created_at is not None

    def test_no_match_result_has_complete_metadata(self):
        """A result built by ``no_match`` exposes every required metadata field."""
        # Arrange
        run_id = str(uuid.uuid4())
        screen = Mock(spec=ScreenState)
        failed_recovery = RecoveryInfo(
            strategy="hierarchical_matching",
            message="Applied hierarchical matching fallback",
            success=False,
            attempts=1,
            duration_ms=50.0,
        )
        timings = PerformanceMetrics(
            total_execution_time_ms=100.0,
            state_matching_time_ms=80.0,
            error_handling_time_ms=50.0,
        )

        # Act
        outcome = WorkflowExecutionResult.no_match(
            execution_id=run_id,
            workflow_id="test_workflow",
            current_state=screen,
            recovery_info=failed_recovery,
            performance_metrics=timings,
        )

        # Assert
        assert outcome.correlation_id is not None
        assert outcome.success is False
        expected_by_attribute = {
            "execution_id": run_id,
            "workflow_id": "test_workflow",
            "status": StepExecutionStatus.NO_MATCH,
            "current_state": screen,
            "recovery_applied": failed_recovery,
            "performance_metrics": timings,
            "message": "No matching state found in workflow",
            "error": "State matching failed",
        }
        for attribute, expected in expected_by_attribute.items():
            assert getattr(outcome, attribute) == expected

    def test_workflow_complete_result_has_complete_metadata(self):
        """A result built by ``workflow_complete`` exposes every required metadata field."""
        # Arrange
        factory_args = {
            "execution_id": str(uuid.uuid4()),
            "workflow_id": "test_workflow",
            "current_node": "final_node",
            "performance_metrics": PerformanceMetrics(
                total_execution_time_ms=50.0,
                state_matching_time_ms=30.0,
            ),
        }

        # Act
        outcome = WorkflowExecutionResult.workflow_complete(**factory_args)

        # Assert
        for attribute, expected in factory_args.items():
            assert getattr(outcome, attribute) == expected
        assert outcome.correlation_id is not None
        assert outcome.success is True
        assert outcome.status == StepExecutionStatus.WORKFLOW_COMPLETE
        assert outcome.message == "Workflow completed - no more actions"

    def test_to_dict_serialization_includes_all_metadata(self):
        """``to_dict`` output contains every piece of metadata set on the result."""
        # Arrange
        run_id = str(uuid.uuid4())
        trace_id = str(uuid.uuid4())

        # Mocked UIElement wrapped in a mocked ResolvedTarget; the attributes
        # are configured directly through the Mock constructor.
        button = Mock(
            spec=UIElement,
            element_id="button_1",
            bbox={"x": 100, "y": 200, "width": 50, "height": 30},
        )
        resolved = Mock(spec=ResolvedTarget, element=button, confidence=0.95)

        recovery_info = RecoveryInfo(
            strategy="semantic_variant",
            message="Applied semantic variant strategy",
            success=True,
            attempts=1,
            duration_ms=25.0,
        )
        expected_timings = {
            "total_execution_time_ms": 175.0,
            "state_matching_time_ms": 40.0,
            "target_resolution_time_ms": 35.0,
            "action_execution_time_ms": 75.0,
            "error_handling_time_ms": 25.0,
        }

        outcome = WorkflowExecutionResult.success(
            execution_id=run_id,
            workflow_id="test_workflow",
            current_node="node_1",
            target_node="node_2",
            action_executed={"type": "click", "target": "button"},
            target_resolved=resolved,
            performance_metrics=PerformanceMetrics(**expected_timings),
        )
        # These fields are set after construction rather than via the factory.
        outcome.correlation_id = trace_id
        outcome.recovery_applied = recovery_info
        outcome.match_result = {"node_id": "node_1", "confidence": 0.92}
        outcome.add_execution_detail("custom_metric", "test_value")

        # Act
        serialized = outcome.to_dict()

        # Assert: identifiers and status
        assert serialized["execution_id"] == run_id
        assert serialized["workflow_id"] == "test_workflow"
        assert serialized["correlation_id"] == trace_id
        assert serialized["success"] is True
        assert serialized["status"] == StepExecutionStatus.SUCCESS.value

        # Performance metrics
        assert "performance_metrics" in serialized
        timings_dict = serialized["performance_metrics"]
        for metric_name, expected in expected_timings.items():
            assert timings_dict[metric_name] == expected

        # Recovery information
        assert "recovery_applied" in serialized
        recovery_dict = serialized["recovery_applied"]
        assert recovery_dict["strategy"] == "semantic_variant"
        assert recovery_dict["success"] is True
        assert recovery_dict["attempts"] == 1
        assert recovery_dict["duration_ms"] == 25.0

        # Resolved target
        assert "target_resolved" in serialized
        target_dict = serialized["target_resolved"]
        assert target_dict["element_id"] == "button_1"
        assert target_dict["confidence"] == 0.95

        # Execution details
        assert "execution_details" in serialized
        assert serialized["execution_details"]["custom_metric"] == "test_value"

        # Matching result
        assert "match_result" in serialized
        matching_dict = serialized["match_result"]
        assert matching_dict["node_id"] == "node_1"
        assert matching_dict["confidence"] == 0.92

    def test_add_execution_detail_stores_custom_metadata(self):
        """``add_execution_detail`` stores arbitrary custom metadata on the result."""
        # Arrange
        outcome = WorkflowExecutionResult.success(
            execution_id="test_id",
            workflow_id="test_workflow",
            current_node="node_1",
            target_node="node_2",
            action_executed={"type": "click"},
        )

        # Act: one string, one integer and one nested mapping.
        custom_details = (
            ("custom_key", "custom_value"),
            ("retry_count", 3),
            ("user_context", {"user_id": "123", "session": "abc"}),
        )
        for detail_key, detail_value in custom_details:
            outcome.add_execution_detail(detail_key, detail_value)

        # Assert
        stored = outcome.execution_details
        assert stored["custom_key"] == "custom_value"
        assert stored["retry_count"] == 3
        assert stored["user_context"]["user_id"] == "123"

    def test_set_performance_metric_updates_metrics(self):
        """``set_performance_metric`` updates known metrics and keeps unknown ones."""
        # Arrange
        outcome = WorkflowExecutionResult.success(
            execution_id="test_id",
            workflow_id="test_workflow",
            current_node="node_1",
            target_node="node_2",
            action_executed={"type": "click"},
        )

        # Act
        outcome.set_performance_metric("state_matching_time_ms", 45.0)
        outcome.set_performance_metric("custom_metric", 123.0)  # non-standard metric

        # Assert
        assert outcome.performance_metrics.state_matching_time_ms == 45.0
        # Non-standard metrics end up in execution_details under a "metric_" prefix.
        assert outcome.execution_details["metric_custom_metric"] == 123.0
class TestPerformanceMetrics:
    """Tests for PerformanceMetrics."""

    def test_performance_metrics_initialization(self):
        """Every timing passed to the constructor is readable back as an attribute."""
        timing_values = {
            "total_execution_time_ms": 100.0,
            "state_matching_time_ms": 25.0,
            "target_resolution_time_ms": 30.0,
            "action_execution_time_ms": 40.0,
            "error_handling_time_ms": 5.0,
        }

        # Act
        metrics = PerformanceMetrics(**timing_values)

        # Assert
        for field_name, expected in timing_values.items():
            assert getattr(metrics, field_name) == expected
class TestRecoveryInfo:
    """Tests for RecoveryInfo."""

    def test_recovery_info_initialization(self):
        """Every value passed to the constructor is readable back as an attribute."""
        compared_by_equality = {
            "strategy": "spatial_fallback",
            "message": "Applied spatial fallback due to target not found",
            "attempts": 2,
            "duration_ms": 150.0,
        }

        # Act
        recovery = RecoveryInfo(success=True, **compared_by_equality)

        # Assert
        assert recovery.strategy == compared_by_equality["strategy"]
        assert recovery.message == compared_by_equality["message"]
        # The flag is checked by identity, so it stays out of the equality mapping.
        assert recovery.success is True
        assert recovery.attempts == compared_by_equality["attempts"]
        assert recovery.duration_ms == compared_by_equality["duration_ms"]