"""Integration tests for the replay orphan watchdog."""

from __future__ import annotations

import asyncio
import contextlib
import importlib
import time
from typing import Any, Dict, List

import pytest


@contextlib.asynccontextmanager
async def fake_lock():
    """No-op async context manager passed to ``ReplayWatchdog`` as its lock."""
    yield


def _click_action(action_id: str) -> Dict[str, Any]:
    """Return a minimal ``click`` action payload carrying *action_id*."""
    return {"action_id": action_id, "type": "click"}


def _pending_entry(
    dispatched_action: Dict[str, Any],
    *,
    age_s: float = 0.0,
    first_age_s: float | None = None,
    resent_count: int = 0,
    action: Dict[str, Any] | None = None,
) -> Dict[str, Any]:
    """Build one ``retry_pending`` entry for session ``sess1`` / machine ``m1``.

    Args:
        dispatched_action: Stored as-is under ``dispatched_action``.
        age_s: Seconds elapsed since ``dispatched_at``.
        first_age_s: Seconds elapsed since ``first_dispatched_at``; defaults
            to *age_s* when omitted.
        resent_count: Value stored under ``resent_count``.
        action: Stored under ``action``; defaults to a copy of
            *dispatched_action* so both keys never alias the same dict.
    """
    now = time.time()
    if first_age_s is None:
        first_age_s = age_s
    return {
        "action": dict(dispatched_action) if action is None else action,
        "dispatched_action": dispatched_action,
        "session_id": "sess1",
        "machine_id": "m1",
        "dispatched_at": now - age_s,
        "first_dispatched_at": now - first_age_s,
        "resent_count": resent_count,
    }


@contextlib.contextmanager
def _isolated_api_stream_state(api_stream):
    """Empty the ``api_stream`` module-level stores, restoring them on exit.

    The three dicts are cleared and refilled in place (never rebound), so any
    other reference to the same dict objects keeps seeing the current content.
    """
    stores = (
        api_stream._replay_states,
        api_stream._replay_queues,
        api_stream._retry_pending,
    )
    snapshots = [dict(store) for store in stores]
    for store in stores:
        store.clear()
    try:
        yield
    finally:
        for store, snapshot in zip(stores, snapshots):
            store.clear()
            store.update(snapshot)


@pytest.fixture(autouse=True)
def reset_watchdog_singleton():
    """Reset watchdog module state around every test.

    Before the test: drop the singleton and zero every numeric metric.
    After the test: reload the module so that constants computed from
    ``RPA_WATCHDOG_*`` variables during a test-time reload do not leak into
    later tests.
    """
    import agent_v0.server_v1.replay_watchdog as wd_mod

    wd_mod._singleton = None
    for key, value in list(wd_mod._metrics.items()):
        if isinstance(value, (int, float)):
            wd_mod._metrics[key] = 0
    yield
    # pytest sets up autouse fixtures before explicitly requested ones and
    # tears fixtures down in reverse order, so ``monkeypatch`` has already
    # restored the environment when this reload runs.
    importlib.reload(wd_mod)


@pytest.fixture
def env_short_timeout(monkeypatch):
    """Enable the watchdog with sub-second timings and reload its module."""
    monkeypatch.setenv("RPA_WATCHDOG_ENABLED", "1")
    monkeypatch.setenv("RPA_WATCHDOG_SCAN_INTERVAL_S", "0.1")
    monkeypatch.setenv("RPA_WATCHDOG_ORPHAN_TIMEOUT_S", "0.2")
    monkeypatch.setenv("RPA_WATCHDOG_MAX_RESENDS", "2")
    import agent_v0.server_v1.replay_watchdog as wd_mod

    # The module is reloaded so it is re-executed under the patched env.
    importlib.reload(wd_mod)
    yield


@pytest.mark.asyncio
async def test_no_orphan_below_timeout(env_short_timeout):
    """An action dispatched just now is counted in flight and left alone."""
    from agent_v0.server_v1.replay_watchdog import ReplayWatchdog

    retry_pending: Dict[str, Dict[str, Any]] = {
        "act1": _pending_entry(_click_action("act1")),
    }
    replay_queues: Dict[str, List[Dict[str, Any]]] = {"sess1": []}
    watchdog = ReplayWatchdog(retry_pending, replay_queues, fake_lock)

    result = await watchdog._scan_once()

    assert result == {
        "orphans": 0,
        "resent": 0,
        "gaveup": 0,
        "skipped": 0,
        "in_flight": 1,
    }
    assert replay_queues["sess1"] == []
    assert retry_pending["act1"]["resent_count"] == 0


@pytest.mark.asyncio
async def test_orphan_above_timeout_resent_in_head(env_short_timeout):
    """An orphaned action is pushed back at the head of its session queue."""
    from agent_v0.server_v1.replay_watchdog import ReplayWatchdog

    action = _click_action("act1")
    other = _click_action("act_next")
    retry_pending = {
        "act1": _pending_entry(
            action,
            age_s=5.0,
            # A different id here shows that ``dispatched_action`` is what
            # gets re-queued, not ``action``.
            action=_click_action("original"),
        ),
    }
    replay_queues = {"sess1": [other]}
    watchdog = ReplayWatchdog(retry_pending, replay_queues, fake_lock)

    result = await watchdog._scan_once()

    assert result["resent"] == 1
    assert replay_queues["sess1"] == [action, other]
    assert retry_pending["act1"]["resent_count"] == 1
    assert retry_pending["act1"]["dispatched_at"] == 0.0


@pytest.mark.asyncio
async def test_giveup_after_max_resends(env_short_timeout):
    """With ``resent_count`` at the configured maximum the entry is dropped."""
    from agent_v0.server_v1.replay_watchdog import ReplayWatchdog

    retry_pending = {
        "act1": _pending_entry(
            _click_action("act1"),
            age_s=5.0,
            first_age_s=90.0,
            resent_count=2,
        ),
    }
    replay_queues = {"sess1": []}
    watchdog = ReplayWatchdog(retry_pending, replay_queues, fake_lock)

    result = await watchdog._scan_once()

    assert result["gaveup"] == 1
    assert result["resent"] == 0
    assert "act1" not in retry_pending
    assert replay_queues["sess1"] == []


@pytest.mark.asyncio
async def test_race_report_arrives_during_scan(env_short_timeout):
    """An entry removed between two lock acquisitions is not re-queued."""
    from agent_v0.server_v1.replay_watchdog import ReplayWatchdog

    retry_pending = {
        "act1": _pending_entry(_click_action("act1"), age_s=5.0),
    }
    replay_queues = {"sess1": []}
    lock_entries = 0

    @contextlib.asynccontextmanager
    async def lock_that_pops_before_resend():
        # Remove the pending entry on the second acquisition, as if the
        # action had been reported between the two.
        nonlocal lock_entries
        lock_entries += 1
        if lock_entries == 2:
            retry_pending.pop("act1", None)
        yield

    watchdog = ReplayWatchdog(
        retry_pending, replay_queues, lock_that_pops_before_resend
    )

    result = await watchdog._scan_once()

    assert result["orphans"] == 1
    assert result["resent"] == 0
    assert replay_queues["sess1"] == []


@pytest.mark.asyncio
async def test_disabled_via_env(monkeypatch):
    """With ``RPA_WATCHDOG_ENABLED=0``, ``start()`` creates no task."""
    monkeypatch.setenv("RPA_WATCHDOG_ENABLED", "0")
    import agent_v0.server_v1.replay_watchdog as wd_mod

    importlib.reload(wd_mod)
    watchdog = wd_mod.ReplayWatchdog({}, {}, fake_lock)

    await watchdog.start()

    assert watchdog._task is None
    # stop() must also be callable on a watchdog that never started.
    await watchdog.stop()


@pytest.mark.asyncio
async def test_lifecycle_start_stop_clean(env_short_timeout):
    """``start()`` creates a live task and ``stop()`` clears it."""
    from agent_v0.server_v1.replay_watchdog import ReplayWatchdog

    watchdog = ReplayWatchdog({}, {}, fake_lock)

    await watchdog.start()
    assert watchdog._task is not None
    assert not watchdog._task.done()

    # Longer than two of the 0.1 s scan intervals set by env_short_timeout.
    await asyncio.sleep(0.25)

    await watchdog.stop(timeout_s=2.0)
    assert watchdog._task is None


@pytest.mark.asyncio
async def test_orphan_with_repush_tail(monkeypatch, env_short_timeout):
    """``RPA_WATCHDOG_REPUSH_POSITION=tail`` re-queues the orphan at the end."""
    monkeypatch.setenv("RPA_WATCHDOG_REPUSH_POSITION", "tail")
    import agent_v0.server_v1.replay_watchdog as wd_mod

    # Second reload: env_short_timeout reloaded before this variable was set.
    importlib.reload(wd_mod)

    action = _click_action("act1")
    other = _click_action("act_next")
    retry_pending = {
        "act1": _pending_entry(
            action,
            age_s=5.0,
            action=_click_action("original"),
        ),
    }
    replay_queues = {"sess1": [other]}
    watchdog = wd_mod.ReplayWatchdog(retry_pending, replay_queues, fake_lock)

    await watchdog._scan_once()

    assert replay_queues["sess1"] == [other, action]


@pytest.mark.asyncio
async def test_metrics_snapshot(env_short_timeout):
    """A scan that resends one orphan is reflected in the metrics snapshot."""
    from agent_v0.server_v1.replay_watchdog import (
        ReplayWatchdog,
        get_metrics_snapshot,
    )

    retry_pending = {
        "act1": _pending_entry(_click_action("act1"), age_s=5.0),
    }
    watchdog = ReplayWatchdog(retry_pending, {"sess1": []}, fake_lock)

    await watchdog._scan_once()

    snapshot = get_metrics_snapshot()
    assert snapshot["scans_total"] >= 1
    assert snapshot["orphans_detected_total"] >= 1
    assert snapshot["orphans_resent_total"] >= 1


def test_default_orphan_timeout_matches_spec(monkeypatch):
    """With the env var unset, the orphan timeout is 45.0 seconds."""
    monkeypatch.delenv("RPA_WATCHDOG_ORPHAN_TIMEOUT_S", raising=False)
    import agent_v0.server_v1.replay_watchdog as wd_mod

    importlib.reload(wd_mod)

    assert wd_mod.WATCHDOG_ORPHAN_TIMEOUT_S == 45.0


@pytest.mark.asyncio
async def test_late_report_clears_resent_duplicate_from_queue(monkeypatch):
    """A report for an already-resent action removes its queued duplicate.

    The action sits both in ``_retry_pending`` (``resent_count`` 1) and at
    the head of the session queue. After the success report, only the next
    action may remain queued and the replay must have advanced by one.
    """
    monkeypatch.setenv("RPA_API_TOKEN", "test_replay_watchdog_token")
    from agent_v0.server_v1 import api_stream

    monkeypatch.setattr(api_stream, "API_TOKEN", "test_replay_watchdog_token")

    with _isolated_api_stream_state(api_stream):
        action = {
            "action_id": "act_setup_sess_click_start",
            "type": "click",
            "visual_mode": True,
            "x_pct": 0.387891,
            "y_pct": 0.974375,
            "_setup_phase": True,
            "target_spec": {"by_role": "start_button"},
        }
        next_action = {
            "action_id": "act_setup_sess_wait_start",
            "type": "wait",
        }
        replay_id = "replay_watchdog_dup"
        session_id = "sess_watchdog_dup"
        now = time.time()

        api_stream._replay_states[replay_id] = {
            "replay_id": replay_id,
            "workflow_id": "session_replay:test",
            "session_id": session_id,
            "machine_id": "pc-watchdog",
            "status": "running",
            "total_actions": 2,
            "completed_actions": 0,
            "failed_actions": 0,
            "current_action_index": 0,
            "params": {},
            "results": [],
            "actions": [action, next_action],
            "retried_actions": 0,
            "unverified_actions": 0,
            "error_log": [],
            "last_screenshot": None,
            "failed_action": None,
            "pause_message": None,
            "variables": {},
            "safety_checks": [],
            "checks_acknowledged": [],
            "pause_reason": "",
            "pause_payload": None,
        }
        # Copies, so changes to queued entries cannot alter the dicts used
        # in the assertions below.
        api_stream._replay_queues[session_id] = [
            dict(action),
            dict(next_action),
        ]
        api_stream._retry_pending[action["action_id"]] = {
            "action": dict(action),
            "dispatched_action": dict(action),
            "retry_count": 0,
            "replay_id": replay_id,
            "session_id": session_id,
            "machine_id": "pc-watchdog",
            "dispatched_at": now,
            "first_dispatched_at": now - 5.0,
            "resent_count": 1,
            "last_resent_at": now - 1.0,
        }

        report = api_stream.ReplayResultReport(
            session_id=session_id,
            action_id=action["action_id"],
            success=True,
            warning="start_button_hotkey_fallback",
            resolution_method="semantic_start_button_hotkey",
            resolution_score=1.0,
        )

        result = await api_stream.report_action_result(report)

        assert result["status"] == "recorded"
        queued_ids = [
            queued["action_id"]
            for queued in api_stream._replay_queues[session_id]
        ]
        assert queued_ids == ["act_setup_sess_wait_start"]
        assert action["action_id"] not in api_stream._retry_pending
        state = api_stream._replay_states[replay_id]
        assert state["completed_actions"] == 1
        assert state["current_action_index"] == 1