Files
rpa_vision_v3/tests/integration/test_replay_watchdog.py
Dom 7df51d2c79 snapshot: WIP 5j replay reliability (B1 watchdog + dialog handlers + grounding drift)
Snapshot avant correction du blocage relance Léa (3 incidents 24h: SSH refusé,
polls morts ×2). Point de rollback stable.

Contenu:
- agent_v1/core/executor.py: 5 patchs dialog handling (saveas drift, close_tab
  hotkey fallback, confirm_save Unicode apostrophe, foreground dialog
  recontextualization, runtime_dialog in-loop) + helpers normalize_window_hint,
  requires_post_verify_window_transition
- agent_v1/core/grounding.py: garde drift template fix (fallback_x/y plumbed)
- server_v1/replay_watchdog.py (NEW): orphan watchdog B1, scan 10s timeout 30s
- server_v1/api_stream.py: dispatched_action plumbing, watchdog lifespan,
  metrics endpoint
- server_v1/replay_engine.py: _schedule_retry préserve original_action +
  dispatched_action
- stream_processor.py: gardes _infer_tab_switch_target (no false switch_tab
  on save_as dialog open) + _attach_expected_window_before
- tests/integration: test_replay_watchdog.py (8 cas), test_stream_processor.py
- tests/unit: test_executor_verify_window_guard.py (start_button, close_tab,
  runtime_dialog, post_verify, transition fallbacks)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 16:48:37 +02:00

353 lines
11 KiB
Python

"""Integration tests for the replay orphan watchdog."""
from __future__ import annotations
import asyncio
import contextlib
import importlib
import time
from typing import Any, Dict, List
import pytest
@contextlib.asynccontextmanager
async def fake_lock():
    """Stand-in for the server lock that performs no synchronization.

    Entering the context yields ``None`` and exiting it does nothing, so the
    watchdog under test can "acquire" it any number of times.
    """
    yield None
@pytest.fixture(autouse=True)
def reset_watchdog_singleton():
    """Give every test a clean replay-watchdog module.

    Before the test: drop the cached singleton and zero every numeric metric.

    After the test: reload the module. Several tests set ``RPA_WATCHDOG_*``
    environment variables and call ``importlib.reload`` so that module-level
    settings (e.g. ``WATCHDOG_ORPHAN_TIMEOUT_S``) pick them up. ``monkeypatch``
    restores the environment afterwards, but the reloaded module would keep the
    test-only values and leak them into later tests.
    """
    import agent_v0.server_v1.replay_watchdog as wd_mod

    wd_mod._singleton = None
    for key, value in list(wd_mod._metrics.items()):
        # Only numeric counters are reset; other entries are left untouched.
        if isinstance(value, (int, float)):
            wd_mod._metrics[key] = 0
    yield
    # Autouse fixtures are set up before, and therefore torn down after, the
    # function-scoped ``monkeypatch`` fixture. The environment has already
    # been restored here, so this reload re-reads the default configuration.
    # ``importlib.reload`` updates the same module object in place.
    importlib.reload(wd_mod)
@pytest.fixture
def env_short_timeout(monkeypatch):
    """Reload the watchdog module with aggressive timings so tests run fast.

    Scans every 0.1 s, treats an action as orphaned after 0.2 s and allows at
    most two resends. ``monkeypatch`` restores the environment after the test.
    """
    short_settings = {
        "RPA_WATCHDOG_ENABLED": "1",
        "RPA_WATCHDOG_SCAN_INTERVAL_S": "0.1",
        "RPA_WATCHDOG_ORPHAN_TIMEOUT_S": "0.2",
        "RPA_WATCHDOG_MAX_RESENDS": "2",
    }
    for env_name, env_value in short_settings.items():
        monkeypatch.setenv(env_name, env_value)
    import agent_v0.server_v1.replay_watchdog as watchdog_module
    importlib.reload(watchdog_module)
    yield
@pytest.mark.asyncio
async def test_no_orphan_below_timeout(env_short_timeout):
    """An action dispatched just now is counted as in flight and never resent."""
    from agent_v0.server_v1.replay_watchdog import ReplayWatchdog

    dispatched_now = time.time()
    pending_entry: Dict[str, Any] = {
        "action": {"action_id": "act1", "type": "click"},
        "dispatched_action": {"action_id": "act1", "type": "click"},
        "session_id": "sess1",
        "machine_id": "m1",
        "dispatched_at": dispatched_now,
        "first_dispatched_at": dispatched_now,
        "resent_count": 0,
    }
    retry_pending: Dict[str, Dict[str, Any]] = {"act1": pending_entry}
    replay_queues: Dict[str, List[Dict[str, Any]]] = {"sess1": []}

    wd = ReplayWatchdog(retry_pending, replay_queues, fake_lock)
    scan = await wd._scan_once()

    expected = dict(orphans=0, resent=0, gaveup=0, skipped=0, in_flight=1)
    assert scan == expected
    assert replay_queues["sess1"] == []
    assert retry_pending["act1"]["resent_count"] == 0
@pytest.mark.asyncio
async def test_orphan_above_timeout_resent_in_head(env_short_timeout):
    """A stale dispatch is re-queued ahead of the already-pending work.

    The payload pushed back is ``dispatched_action``, not the stored ``action``.
    """
    from agent_v0.server_v1.replay_watchdog import ReplayWatchdog

    resent_payload = {"action_id": "act1", "type": "click"}
    queued_next = {"action_id": "act_next", "type": "click"}
    five_seconds_ago = time.time() - 5.0
    retry_pending = {
        "act1": {
            "action": {"action_id": "original", "type": "click"},
            "dispatched_action": resent_payload,
            "session_id": "sess1",
            "machine_id": "m1",
            "dispatched_at": five_seconds_ago,
            "first_dispatched_at": five_seconds_ago,
            "resent_count": 0,
        }
    }
    replay_queues = {"sess1": [queued_next]}

    wd = ReplayWatchdog(retry_pending, replay_queues, fake_lock)
    outcome = await wd._scan_once()

    assert outcome["resent"] == 1
    assert replay_queues["sess1"] == [resent_payload, queued_next]
    assert retry_pending["act1"]["resent_count"] == 1
    # After re-queueing, the entry's dispatch timestamp is cleared to 0.0.
    assert retry_pending["act1"]["dispatched_at"] == 0.0
@pytest.mark.asyncio
async def test_giveup_after_max_resends(env_short_timeout):
    """An orphan that already hit the resend budget (2) is dropped, not resent."""
    from agent_v0.server_v1.replay_watchdog import ReplayWatchdog

    now = time.time()
    retry_pending = {
        "act1": {
            "action": {"action_id": "act1", "type": "click"},
            "dispatched_action": {"action_id": "act1", "type": "click"},
            "session_id": "sess1",
            "machine_id": "m1",
            "dispatched_at": now - 5.0,
            "first_dispatched_at": now - 90.0,
            "resent_count": 2,
        }
    }
    replay_queues = {"sess1": []}

    wd = ReplayWatchdog(retry_pending, replay_queues, fake_lock)
    outcome = await wd._scan_once()

    assert (outcome["gaveup"], outcome["resent"]) == (1, 0)
    assert "act1" not in retry_pending
    assert replay_queues["sess1"] == []
@pytest.mark.asyncio
async def test_race_report_arrives_during_scan(env_short_timeout):
    """A result reported between orphan detection and resend cancels the resend.

    The injected lock removes the pending entry on its second acquisition. This
    emulates a result report winning the race after the watchdog has flagged
    the action as orphaned but before it re-queues it.
    """
    from agent_v0.server_v1.replay_watchdog import ReplayWatchdog

    retry_pending = {
        "act1": {
            "action": {"action_id": "act1", "type": "click"},
            "dispatched_action": {"action_id": "act1", "type": "click"},
            "session_id": "sess1",
            "machine_id": "m1",
            "dispatched_at": time.time() - 5.0,
            "first_dispatched_at": time.time() - 5.0,
            "resent_count": 0,
        }
    }
    replay_queues = {"sess1": []}
    acquisitions = 0

    @contextlib.asynccontextmanager
    async def lock_that_pops_before_resend():
        # A closure counter replaces the former attribute stored on the
        # decorated function object.
        nonlocal acquisitions
        acquisitions += 1
        if acquisitions == 2:
            retry_pending.pop("act1", None)
        yield

    watchdog = ReplayWatchdog(retry_pending, replay_queues, lock_that_pops_before_resend)
    result = await watchdog._scan_once()

    # The simulated report must actually have fired, or the test proves nothing.
    assert acquisitions >= 2
    assert result["orphans"] == 1
    assert result["resent"] == 0
    assert replay_queues["sess1"] == []
    # The watchdog must not resurrect an entry the late report already cleared.
    assert "act1" not in retry_pending
@pytest.mark.asyncio
async def test_disabled_via_env(monkeypatch):
    """With RPA_WATCHDOG_ENABLED=0, start() must not spawn a background task."""
    monkeypatch.setenv("RPA_WATCHDOG_ENABLED", "0")
    import agent_v0.server_v1.replay_watchdog as wd_mod
    importlib.reload(wd_mod)

    disabled = wd_mod.ReplayWatchdog({}, {}, fake_lock)
    await disabled.start()
    assert disabled._task is None
    # stop() must still be safe to call on a watchdog that never started.
    await disabled.stop()
@pytest.mark.asyncio
async def test_lifecycle_start_stop_clean(env_short_timeout):
    """start() creates a running task; stop() shuts it down and clears ``_task``."""
    from agent_v0.server_v1.replay_watchdog import ReplayWatchdog

    wd = ReplayWatchdog({}, {}, fake_lock)
    await wd.start()
    running = wd._task
    assert running is not None
    assert not running.done()
    # Let a couple of 0.1 s scan intervals elapse before shutting down.
    await asyncio.sleep(0.25)
    await wd.stop(timeout_s=2.0)
    assert wd._task is None
@pytest.mark.asyncio
async def test_orphan_with_repush_tail(monkeypatch, env_short_timeout):
    """With RPA_WATCHDOG_REPUSH_POSITION=tail, the resend goes behind queued work."""
    monkeypatch.setenv("RPA_WATCHDOG_REPUSH_POSITION", "tail")
    import agent_v0.server_v1.replay_watchdog as wd_mod
    importlib.reload(wd_mod)

    resent_payload = {"action_id": "act1", "type": "click"}
    queued_next = {"action_id": "act_next", "type": "click"}
    five_seconds_ago = time.time() - 5.0
    retry_pending = {
        "act1": {
            "action": {"action_id": "original", "type": "click"},
            "dispatched_action": resent_payload,
            "session_id": "sess1",
            "machine_id": "m1",
            "dispatched_at": five_seconds_ago,
            "first_dispatched_at": five_seconds_ago,
            "resent_count": 0,
        }
    }
    replay_queues = {"sess1": [queued_next]}

    wd = wd_mod.ReplayWatchdog(retry_pending, replay_queues, fake_lock)
    await wd._scan_once()

    assert replay_queues["sess1"] == [queued_next, resent_payload]
@pytest.mark.asyncio
async def test_metrics_snapshot(env_short_timeout):
    """A scan that resends one orphan is reflected in the exported counters."""
    from agent_v0.server_v1.replay_watchdog import ReplayWatchdog, get_metrics_snapshot

    stale = time.time() - 5.0
    pending_entry = {
        "action": {"action_id": "act1", "type": "click"},
        "dispatched_action": {"action_id": "act1", "type": "click"},
        "session_id": "sess1",
        "machine_id": "m1",
        "dispatched_at": stale,
        "first_dispatched_at": stale,
        "resent_count": 0,
    }
    wd = ReplayWatchdog({"act1": pending_entry}, {"sess1": []}, fake_lock)
    await wd._scan_once()

    counters = get_metrics_snapshot()
    for counter_name in ("scans_total", "orphans_detected_total", "orphans_resent_total"):
        assert counters[counter_name] >= 1, counter_name
def test_default_orphan_timeout_matches_spec(monkeypatch):
    """Without an env override, the orphan timeout falls back to 45 seconds."""
    monkeypatch.delenv("RPA_WATCHDOG_ORPHAN_TIMEOUT_S", raising=False)
    import agent_v0.server_v1.replay_watchdog as wd_mod
    reloaded = importlib.reload(wd_mod)
    assert reloaded.WATCHDOG_ORPHAN_TIMEOUT_S == 45.0
@pytest.mark.asyncio
async def test_late_report_clears_resent_duplicate_from_queue(monkeypatch):
    """A late success report must also drop the watchdog's re-queued duplicate.

    The click was already resent once (``resent_count=1``), and a copy of it sits
    at the head of the session queue. When the success report for that click
    arrives, the copy must be removed so only the following action remains
    queued. The module-level stores of ``api_stream`` are snapshotted and
    restored so this test leaves no state behind.
    """
    api_token = "test_replay_watchdog_token"
    monkeypatch.setenv("RPA_API_TOKEN", api_token)
    from agent_v0.server_v1 import api_stream
    monkeypatch.setattr(api_stream, "API_TOKEN", api_token)

    store_names = ("_replay_states", "_replay_queues", "_retry_pending")
    snapshots = {name: dict(getattr(api_stream, name)) for name in store_names}
    for name in store_names:
        getattr(api_stream, name).clear()
    try:
        start_click = {
            "action_id": "act_setup_sess_click_start",
            "type": "click",
            "visual_mode": True,
            "x_pct": 0.387891,
            "y_pct": 0.974375,
            "_setup_phase": True,
            "target_spec": {"by_role": "start_button"},
        }
        wait_start = {"action_id": "act_setup_sess_wait_start", "type": "wait"}
        replay_id = "replay_watchdog_dup"
        session_id = "sess_watchdog_dup"
        machine_id = "pc-watchdog"
        click_id = start_click["action_id"]
        now = time.time()

        api_stream._replay_states[replay_id] = {
            "replay_id": replay_id,
            "workflow_id": "session_replay:test",
            "session_id": session_id,
            "machine_id": machine_id,
            "status": "running",
            "total_actions": 2,
            "completed_actions": 0,
            "failed_actions": 0,
            "current_action_index": 0,
            "params": {},
            "results": [],
            "actions": [start_click, wait_start],
            "retried_actions": 0,
            "unverified_actions": 0,
            "error_log": [],
            "last_screenshot": None,
            "failed_action": None,
            "pause_message": None,
            "variables": {},
            "safety_checks": [],
            "checks_acknowledged": [],
            "pause_reason": "",
            "pause_payload": None,
        }
        # The re-queued copy of the click sits ahead of the next action.
        api_stream._replay_queues[session_id] = [dict(start_click), dict(wait_start)]
        api_stream._retry_pending[click_id] = {
            "action": dict(start_click),
            "dispatched_action": dict(start_click),
            "retry_count": 0,
            "replay_id": replay_id,
            "session_id": session_id,
            "machine_id": machine_id,
            "dispatched_at": now,
            "first_dispatched_at": now - 5.0,
            "resent_count": 1,
            "last_resent_at": now - 1.0,
        }

        late_report = api_stream.ReplayResultReport(
            session_id=session_id,
            action_id=click_id,
            success=True,
            warning="start_button_hotkey_fallback",
            resolution_method="semantic_start_button_hotkey",
            resolution_score=1.0,
        )
        outcome = await api_stream.report_action_result(late_report)

        assert outcome["status"] == "recorded"
        remaining_ids = [queued["action_id"] for queued in api_stream._replay_queues[session_id]]
        assert remaining_ids == ["act_setup_sess_wait_start"]
        assert click_id not in api_stream._retry_pending
        assert api_stream._replay_states[replay_id]["completed_actions"] == 1
        assert api_stream._replay_states[replay_id]["current_action_index"] == 1
    finally:
        for name in store_names:
            live_store = getattr(api_stream, name)
            live_store.clear()
            live_store.update(snapshots[name])