"""Tests for the human-pause behaviour of the ``/api/vwb/catalog/execute`` endpoint."""
from flask import Flask
|
|
|
|
from visual_workflow_builder.backend.catalog_routes_v2_vlm import catalog_bp
|
|
|
|
|
|
class _WaitResult:
|
|
matched = False
|
|
|
|
def to_dict(self):
|
|
return {
|
|
"matched": False,
|
|
"timed_out": True,
|
|
"match": {
|
|
"expected_state": {"window_title_in": ["Executer"]},
|
|
"observed_state": {"window_title": "Enregistrer sous"},
|
|
},
|
|
}
|
|
|
|
|
|
def test_catalog_pause_for_human_returns_paused_status():
    """Post a ``pause_for_human`` step and check that the endpoint pauses.

    The response must be HTTP 200 with a ``paused`` result whose output data
    flags ``needs_human`` as true and ``write_back_enabled`` as false.
    """
    flask_app = Flask(__name__)
    flask_app.register_blueprint(catalog_bp)

    # Build the request body up front so the POST call stays readable.
    step_parameters = {
        "message": "Valider le resultat",
        "phase": "after",
        "verdict_required": True,
        "competence_id": "key_win_r_wait_explorer_exe",
    }
    payload = {
        "type": "pause_for_human",
        "step_id": "pause_1",
        "parameters": step_parameters,
    }

    with flask_app.test_client() as http:
        reply = http.post("/api/vwb/catalog/execute", json=payload)
        body = reply.get_json()

        assert reply.status_code == 200

        result = body["result"]
        assert result["status"] == "paused"

        output = result["output_data"]
        assert output["needs_human"] is True
        assert output["write_back_enabled"] is False
|
|
|
|
|
|
def test_catalog_wait_for_state_popup_pauses_without_auto_resolution(monkeypatch):
    """Check that a timed-out ``wait_for_state`` with a popup pauses for a human.

    ``wait_for_expected_state`` is patched to return the timed-out
    ``_WaitResult`` stub, and ``check_screen_for_patterns`` is patched to
    report a ``save_as`` pattern titled "Enregistrer sous". The endpoint must
    answer HTTP 200 with a ``paused`` result whose ``human_pause`` entry has
    reason ``unexpected_popup`` and ``auto_resolution`` set to false.
    """

    def _timed_out_wait(**_kwargs):
        # Accept and ignore any keyword arguments; always report a timeout.
        return _WaitResult()

    def _detected_popup():
        # Build a fresh dict on every call.
        return {"pattern": "save_as", "title": "Enregistrer sous"}

    monkeypatch.setattr(
        "visual_workflow_builder.backend.services.wait_for_state.wait_for_expected_state",
        _timed_out_wait,
    )
    monkeypatch.setattr(
        "core.execution.input_handler.check_screen_for_patterns",
        _detected_popup,
    )

    flask_app = Flask(__name__)
    flask_app.register_blueprint(catalog_bp)

    step_parameters = {
        "expected_state": {"window_title_in": ["Executer"]},
        "timeout_ms": 1,
        "poll_interval_ms": 1,
        "supervised_popup_detection": True,
        "competence_id": "key_win_r_wait_explorer_exe",
        "source_method_id": "step_2_wait_state",
    }
    payload = {
        "type": "wait_for_state",
        "step_id": "wait_1",
        "parameters": step_parameters,
    }

    with flask_app.test_client() as http:
        reply = http.post("/api/vwb/catalog/execute", json=payload)
        body = reply.get_json()

        assert reply.status_code == 200

        result = body["result"]
        assert result["status"] == "paused"

        human_pause = result["output_data"]["human_pause"]
        assert human_pause["pause_reason"] == "unexpected_popup"
        assert human_pause["auto_resolution"] is False
|