feat(vwb): execute wait for state
This commit is contained in:
96
tests/unit/test_vwb_wait_for_state.py
Normal file
96
tests/unit/test_vwb_wait_for_state.py
Normal file
@@ -0,0 +1,96 @@
|
||||
from visual_workflow_builder.backend.actions.base_action import VWBActionStatus
|
||||
from visual_workflow_builder.backend.actions.control.wait_for_state import (
|
||||
VWBWaitForStateAction,
|
||||
)
|
||||
from visual_workflow_builder.backend.services.wait_for_state import (
|
||||
match_expected_state,
|
||||
wait_for_expected_state,
|
||||
)
|
||||
|
||||
|
||||
def test_match_expected_state_accepts_window_title_and_process_alias():
|
||||
match = match_expected_state(
|
||||
expected_state={
|
||||
"window_title_contains": ["Executer"],
|
||||
"process_active": "explorer.exe",
|
||||
},
|
||||
observed_state={
|
||||
"window_title": "Executer",
|
||||
"process_active": "explorer",
|
||||
},
|
||||
)
|
||||
|
||||
assert match.matched is True
|
||||
assert match.matched_signals == ["window_title_contains", "process_active"]
|
||||
assert match.failed_signals == []
|
||||
|
||||
|
||||
def test_wait_for_expected_state_polls_until_match():
|
||||
states = iter(
|
||||
[
|
||||
{"window_title": "Visual Workflow Builder", "process_active": "chrome"},
|
||||
{"window_title": "Executer", "process_active": "explorer.exe"},
|
||||
]
|
||||
)
|
||||
|
||||
result = wait_for_expected_state(
|
||||
expected_state={"window_title_in": ["Executer"]},
|
||||
timeout_ms=500,
|
||||
poll_interval_ms=50,
|
||||
state_provider=lambda: next(states),
|
||||
)
|
||||
|
||||
assert result.matched is True
|
||||
assert result.timed_out is False
|
||||
assert result.polls == 2
|
||||
assert result.match["matched_signals"] == ["window_title_in"]
|
||||
|
||||
|
||||
def test_wait_for_state_action_returns_success_evidence_contract():
|
||||
action = VWBWaitForStateAction(
|
||||
action_id="wait_state_001",
|
||||
parameters={
|
||||
"expected_state": {
|
||||
"window_title_in": ["Executer"],
|
||||
"process_active": "explorer.exe",
|
||||
},
|
||||
"timeout_ms": 500,
|
||||
"poll_interval_ms": 50,
|
||||
"_state_provider": lambda: {
|
||||
"window_title": "Executer",
|
||||
"process_active": "explorer",
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
result = action.execute("step_001")
|
||||
|
||||
assert result.status == VWBActionStatus.SUCCESS
|
||||
assert result.output_data["matched"] is True
|
||||
assert result.output_data["match"]["matched_signals"] == [
|
||||
"window_title_in",
|
||||
"process_active",
|
||||
]
|
||||
|
||||
|
||||
def test_wait_for_state_action_returns_timeout_with_observed_state():
|
||||
action = VWBWaitForStateAction(
|
||||
action_id="wait_state_002",
|
||||
parameters={
|
||||
"expected_state": {"window_title_contains": ["Enregistrer sous"]},
|
||||
"timeout_ms": 100,
|
||||
"poll_interval_ms": 50,
|
||||
"_state_provider": lambda: {
|
||||
"window_title": "Executer",
|
||||
"process_active": "explorer.exe",
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
result = action.execute("step_002")
|
||||
|
||||
assert result.status == VWBActionStatus.TIMEOUT
|
||||
assert result.error is not None
|
||||
assert result.output_data["matched"] is False
|
||||
assert result.output_data["match"]["failed_signals"] == ["window_title_contains"]
|
||||
assert result.output_data["match"]["observed_state"]["window_title"] == "Executer"
|
||||
Reference in New Issue
Block a user