Files
rpa_vision_v3/tests/integration/test_replay_single_inflight.py
Dom 4ba426c205 fix(replay): guard single in-flight dispatch
Add a private in-flight helper for replay dispatch, block machine retargeting while an action is still pending on the previous session, and warn on duplicate in-flight entries for the same replay triplet.

Freeze the Notepad runtime dialog success path and add integration coverage for single in-flight dispatch, watchdog late-report documentation, and the known concurrent-poll race as an xfail.
2026-05-25 11:00:59 +02:00

609 lines
22 KiB
Python

"""Non-regression tests for replay single in-flight dispatch guards."""
from __future__ import annotations
import sys
import time
from pathlib import Path
import pytest
# Directory three levels up from this file (parents[2] of the resolved path).
ROOT = Path(__file__).resolve().parents[2]
# Prepend it to sys.path once, so imports done relative to that directory
# resolve regardless of where pytest is launched from.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
@pytest.fixture
def isolated_replay_state(monkeypatch):
    """Yield the ``api_stream`` module with its replay registries emptied.

    The API token is pinned to a test value, both in the environment and on
    the module.  Five module-level dicts (``_replay_states``,
    ``_replay_queues``, ``_retry_pending``, ``_machine_replay_target`` and
    ``_last_heartbeat``) are snapshotted, cleared for the duration of the
    test, then refilled from the snapshot afterwards.  The dicts are always
    mutated in place, never rebound.
    """
    token = "test_replay_single_inflight_token"
    monkeypatch.setenv("RPA_API_TOKEN", token)

    # Imported after the env var is set.
    # NOTE(review): presumably the module reads the token at import time —
    # confirm against api_stream.
    from agent_v0.server_v1 import api_stream

    monkeypatch.setattr(api_stream, "API_TOKEN", token)

    if api_stream._replay_lock.locked():
        pytest.fail(
            "_replay_lock is already held at fixture setup — a previous test "
            "likely crashed without releasing it. Fail fast instead of waiting "
            "on the 4.5s acquire timeout in get_next_action()."
        )

    registry_names = (
        "_replay_states",
        "_replay_queues",
        "_retry_pending",
        "_machine_replay_target",
        "_last_heartbeat",
    )

    # Snapshot everything first, then wipe, so the test starts from a blank slate.
    snapshots = {name: dict(getattr(api_stream, name)) for name in registry_names}
    for name in registry_names:
        getattr(api_stream, name).clear()

    yield api_stream

    # Teardown: drop whatever the test left behind and put the snapshot back.
    for name in registry_names:
        registry = getattr(api_stream, name)
        registry.clear()
        registry.update(snapshots[name])
def _running_replay_state(
    replay_id: str,
    session_id: str,
    machine_id: str,
    actions: list[dict],
) -> dict:
    """Build a fresh replay-state dict in ``"running"`` status.

    Args:
        replay_id: Identifier stored under ``"replay_id"``.
        session_id: Identifier stored under ``"session_id"``.
        machine_id: Identifier stored under ``"machine_id"``.
        actions: Actions of the replay; ``total_actions`` is their count and
            ``"actions"`` holds a shallow copy of the list.

    Returns:
        A new dict with all counters at zero, empty result/error containers
        and no pause or failure information.  Every mutable value is created
        per call, so two states never share containers.
    """
    return {
        "replay_id": replay_id,
        "workflow_id": "session_replay:test",
        "session_id": session_id,
        "machine_id": machine_id,
        "status": "running",
        "total_actions": len(actions),
        "completed_actions": 0,
        "failed_actions": 0,
        "current_action_index": 0,
        "params": {},
        "results": [],
        # Shallow copy: the caller's list can be reused without aliasing.
        "actions": list(actions),
        "retried_actions": 0,
        "unverified_actions": 0,
        "error_log": [],
        "last_screenshot": None,
        "failed_action": None,
        "pause_message": None,
        "variables": {},
        "safety_checks": [],
        "checks_acknowledged": [],
        "pause_reason": "",
        "pause_payload": None,
    }
def _click_action(action_id: str) -> dict:
    """Return a new visual-mode click action dict carrying ``action_id``.

    The click targets the centre of the screen (``x_pct``/``y_pct`` = 0.5)
    with a ``by_text`` target spec of ``"OK"``.  A new dict (and a new nested
    ``target_spec``) is created on every call.
    """
    return {
        "action_id": action_id,
        "type": "click",
        "visual_mode": True,
        "x_pct": 0.5,
        "y_pct": 0.5,
        "target_spec": {"by_text": "OK"},
    }
def _pending_entry(
    action: dict,
    replay_id: str,
    session_id: str,
    machine_id: str,
    dispatched_at: float,
) -> dict:
    """Build a pending (in-flight) entry dict for ``action``.

    Args:
        action: The action; shallow-copied independently into both
            ``"action"`` and ``"dispatched_action"``.
        replay_id: Stored under ``"replay_id"``.
        session_id: Stored under ``"session_id"``.
        machine_id: Stored under ``"machine_id"``.
        dispatched_at: Dispatch timestamp stored under ``"dispatched_at"``.

    Returns:
        A new dict with zeroed retry/resend counters.
        ``first_dispatched_at`` mirrors ``dispatched_at`` and falls back to
        ``0.0`` when that value is falsy.
    """
    return {
        "action": dict(action),
        "dispatched_action": dict(action),
        "retry_count": 0,
        "replay_id": replay_id,
        "session_id": session_id,
        "machine_id": machine_id,
        "dispatched_at": dispatched_at,
        # ``or 0.0`` normalises any falsy timestamp to a float zero.
        "first_dispatched_at": dispatched_at or 0.0,
        "resent_count": 0,
        "last_resent_at": 0.0,
    }
class _NoopReplayLearner:
    """Stand-in replay learner whose recording hook does nothing."""

    def record_from_replay_result(self, **kwargs):
        """Accept any keyword arguments, ignore them and return ``None``."""
        return None
class _NoopAuditTrail:
    """Stand-in audit trail whose ``record`` discards every entry."""

    def record(self, entry):
        """Ignore ``entry`` and return ``None``."""
        return None
@pytest.mark.asyncio
async def test_real_dispatch_then_next_poll_blocks_with_action_in_flight(
    isolated_replay_state,
):
    """After a real dispatch, the following polls report it as in flight.

    The first poll must hand out the first queued action.  The next two polls
    must return no action, set ``action_in_flight`` and name the dispatched
    action, while the second action stays in the queue.
    """
    stream = isolated_replay_state
    replay_id = "replay_real_dispatch_blocks"
    session_id = "sess_real_dispatch_blocks"
    machine_id = "pc-real-dispatch"
    first_action = _click_action("act_real_dispatch_a1")
    second_action = _click_action("act_real_dispatch_a2")

    stream._replay_states[replay_id] = _running_replay_state(
        replay_id, session_id, machine_id, [first_action, second_action]
    )
    stream._replay_queues[session_id] = [dict(first_action), dict(second_action)]

    # Three sequential polls: one dispatch followed by two blocked answers.
    dispatched, blocked_once, blocked_twice = [
        await stream.get_next_action(session_id, machine_id) for _ in range(3)
    ]

    assert dispatched["action"]["action_id"] == first_action["action_id"]

    assert blocked_once["action"] is None
    assert blocked_once["action_in_flight"] is True
    assert blocked_once["in_flight_action_id"] == first_action["action_id"]
    assert blocked_once["replay_id"] == replay_id

    assert blocked_twice["action"] is None
    assert blocked_twice["action_in_flight"] is True
    assert blocked_twice["in_flight_action_id"] == first_action["action_id"]

    # Only the second action is still waiting in the queue.
    queued_ids = [item["action_id"] for item in stream._replay_queues[session_id]]
    assert queued_ids == [second_action["action_id"]]
@pytest.mark.asyncio
async def test_success_report_clears_inflight_and_allows_next_dispatch(
    isolated_replay_state,
    monkeypatch,
):
    """A success report clears the in-flight entry so the next poll dispatches.

    Sequence: dispatch the first action, report it as successful, poll again.
    The report must be recorded, the pending entry removed, the counters
    advanced by one, and the second poll must receive the second action.
    """
    stream = isolated_replay_state
    replay_id = "replay_success_allows_next"
    session_id = "sess_success_allows_next"
    machine_id = "pc-success-next"
    first_action = _click_action("act_success_next_a1")
    second_action = _click_action("act_success_next_a2")

    # Replace the learner and audit trail with no-op stand-ins.
    for attr, stub in (
        ("_replay_learner", _NoopReplayLearner()),
        ("_audit_trail", _NoopAuditTrail()),
    ):
        monkeypatch.setattr(stream, attr, stub)

    stream._replay_states[replay_id] = _running_replay_state(
        replay_id, session_id, machine_id, [first_action, second_action]
    )
    stream._replay_queues[session_id] = [dict(first_action), dict(second_action)]

    first_poll = await stream.get_next_action(session_id, machine_id)
    assert first_poll["action"]["action_id"] == first_action["action_id"]

    success_report = stream.ReplayResultReport(
        session_id=session_id,
        action_id=first_action["action_id"],
        success=True,
    )
    report_outcome = await stream.report_action_result(success_report)
    second_poll = await stream.get_next_action(session_id, machine_id)

    state = stream._replay_states[replay_id]
    assert report_outcome["status"] == "recorded"
    assert first_action["action_id"] not in stream._retry_pending
    assert state["completed_actions"] == 1
    assert state["current_action_index"] == 1
    assert second_poll["action"]["action_id"] == second_action["action_id"]
    assert stream._replay_queues[session_id] == []
@pytest.mark.asyncio
async def test_get_next_action_blocks_when_inflight(isolated_replay_state):
    """A pre-registered in-flight entry blocks dispatch and stays untouched.

    With a pending entry already present for the same replay, session and
    machine, the poll must return no action and must neither pop the queue nor
    alter the entry's ``dispatched_at``.
    """
    stream = isolated_replay_state
    replay_id = "replay_single_inflight"
    session_id = "sess_single_inflight"
    machine_id = "pc-single"
    queued_action = _click_action("act_single_inflight")
    action_id = queued_action["action_id"]
    sent_at = time.time()

    stream._replay_states[replay_id] = _running_replay_state(
        replay_id, session_id, machine_id, [queued_action]
    )
    stream._replay_queues[session_id] = [dict(queued_action)]
    stream._retry_pending[action_id] = _pending_entry(
        queued_action, replay_id, session_id, machine_id, sent_at
    )

    response = await stream.get_next_action(session_id, machine_id)

    assert response["action"] is None
    assert response["action_in_flight"] is True
    assert response["in_flight_action_id"] == action_id
    assert response["replay_id"] == replay_id
    # Queue content and the pending timestamp are exactly as seeded.
    assert stream._replay_queues[session_id] == [queued_action]
    assert stream._retry_pending[action_id]["dispatched_at"] == sent_at
@pytest.mark.asyncio
async def test_get_next_action_allows_resume_with_dispatched_at_zero(
    isolated_replay_state,
):
    """A pending entry with ``dispatched_at == 0.0`` does not block dispatch.

    The poll must hand out the queued action, empty the queue and leave a
    pending entry with positive timestamps and the same replay, session and
    machine identifiers.
    """
    stream = isolated_replay_state
    replay_id = "replay_resume_zero"
    session_id = "sess_resume_zero"
    machine_id = "pc-resume"
    queued_action = _click_action("act_resume_zero")
    action_id = queued_action["action_id"]

    stream._replay_states[replay_id] = _running_replay_state(
        replay_id, session_id, machine_id, [queued_action]
    )
    stream._replay_queues[session_id] = [dict(queued_action)]
    stream._retry_pending[action_id] = _pending_entry(
        queued_action, replay_id, session_id, machine_id, dispatched_at=0.0
    )

    response = await stream.get_next_action(session_id, machine_id)

    assert response["action"]["action_id"] == action_id
    assert stream._replay_queues[session_id] == []

    entry = stream._retry_pending[action_id]
    assert entry["dispatched_at"] > 0
    assert entry["first_dispatched_at"] > 0
    expected_owner = {
        "session_id": session_id,
        "machine_id": machine_id,
        "replay_id": replay_id,
    }
    for field, expected in expected_owner.items():
        assert entry[field] == expected
@pytest.mark.asyncio
async def test_inflight_filter_isolates_replays(isolated_replay_state):
    """An in-flight entry owned by another replay does not block dispatch.

    The pending entry belongs to ``replay_old`` while the running state and
    the queue belong to ``replay_new`` on the same session and machine.  The
    poll must dispatch the new action and keep both pending entries.
    """
    stream = isolated_replay_state
    session_id = "sess_replay_isolation"
    machine_id = "pc-isolation"
    stale_action = _click_action("act_old_inflight")
    fresh_action = _click_action("act_new_dispatch")

    stream._replay_states["replay_new"] = _running_replay_state(
        "replay_new", session_id, machine_id, [fresh_action]
    )
    stream._replay_queues[session_id] = [dict(fresh_action)]
    stream._retry_pending[stale_action["action_id"]] = _pending_entry(
        stale_action,
        "replay_old",
        session_id,
        machine_id,
        dispatched_at=time.time(),
    )

    response = await stream.get_next_action(session_id, machine_id)

    assert response["action"]["action_id"] == fresh_action["action_id"]
    for expected_id in (fresh_action["action_id"], stale_action["action_id"]):
        assert expected_id in stream._retry_pending
@pytest.mark.asyncio
async def test_inflight_filter_isolates_machines(isolated_replay_state):
    """An in-flight entry owned by another machine does not block dispatch.

    The pending entry is registered for ``pc-1`` while the replay state and
    the poll are for ``pc-2`` (same replay and session).  The poll must
    dispatch the queued action and keep both pending entries.
    """
    stream = isolated_replay_state
    replay_id = "replay_machine_isolation"
    session_id = "sess_machine_isolation"
    polling_machine = "pc-2"
    other_machine = "pc-1"
    foreign_action = _click_action("act_other_machine_inflight")
    local_action = _click_action("act_this_machine_dispatch")

    stream._replay_states[replay_id] = _running_replay_state(
        replay_id, session_id, polling_machine, [local_action]
    )
    stream._replay_queues[session_id] = [dict(local_action)]
    stream._retry_pending[foreign_action["action_id"]] = _pending_entry(
        foreign_action,
        replay_id,
        session_id,
        other_machine,
        dispatched_at=time.time(),
    )

    response = await stream.get_next_action(session_id, polling_machine)

    assert response["action"]["action_id"] == local_action["action_id"]
    for expected_id in (local_action["action_id"], foreign_action["action_id"]):
        assert expected_id in stream._retry_pending
@pytest.mark.parametrize(
    "use_machine_target",
    [True, False],
    ids=["machine_replay_target", "lookup_other_session"],
)
@pytest.mark.asyncio
async def test_get_next_action_blocks_reciblage_with_old_pending_session(
    isolated_replay_state,
    monkeypatch,
    use_machine_target,
):
    """A poll from another session must not retarget a replay with a pending action.

    The replay runs on the original session with its first action in flight.
    A poll arriving under a different session id for the same machine must be
    answered with ``action_in_flight`` and must leave the queue, the replay
    state, the pending entry and (when seeded) the machine-to-session target
    unchanged.  Reporting the pending action on the original session is then
    recorded normally.

    The test runs twice: with and without a pre-seeded
    ``_machine_replay_target`` entry.
    """
    stream = isolated_replay_state
    replay_id = "replay_lookup_inflight"
    original_session_id = "sess_lookup_original"
    polling_session_id = "sess_lookup_polling"
    machine_id = "pc-lookup"
    pending_action = _click_action("act_lookup_inflight_a1")
    queued_action = _click_action("act_lookup_inflight_a2")
    pending_id = pending_action["action_id"]
    sent_at = time.time()

    # Replace the learner and audit trail with no-op stand-ins.
    for attr, stub in (
        ("_replay_learner", _NoopReplayLearner()),
        ("_audit_trail", _NoopAuditTrail()),
    ):
        monkeypatch.setattr(stream, attr, stub)

    stream._replay_states[replay_id] = _running_replay_state(
        replay_id, original_session_id, machine_id, [pending_action, queued_action]
    )
    stream._replay_queues[original_session_id] = [dict(queued_action)]
    if use_machine_target:
        stream._machine_replay_target[machine_id] = original_session_id
    stream._retry_pending[pending_id] = _pending_entry(
        pending_action,
        replay_id,
        original_session_id,
        machine_id,
        dispatched_at=sent_at,
    )

    response = await stream.get_next_action(polling_session_id, machine_id)

    # The poll is blocked and reports the action still in flight.
    assert response["action"] is None
    assert response["action_in_flight"] is True
    assert response["in_flight_action_id"] == pending_id
    assert response["replay_id"] == replay_id

    # Nothing moved to the polling session.
    assert stream._replay_queues[original_session_id] == [queued_action]
    assert polling_session_id not in stream._replay_queues
    assert stream._replay_states[replay_id]["session_id"] == original_session_id
    pending = stream._retry_pending[pending_id]
    assert pending["session_id"] == original_session_id
    assert pending["dispatched_at"] == sent_at
    if use_machine_target:
        assert stream._machine_replay_target[machine_id] == original_session_id

    # The result reported on the original session is still accepted.
    outcome = await stream.report_action_result(
        stream.ReplayResultReport(
            session_id=original_session_id,
            action_id=pending_id,
            success=True,
        )
    )

    state = stream._replay_states[replay_id]
    assert outcome["status"] == "recorded"
    assert pending_id not in stream._retry_pending
    assert state["completed_actions"] == 1
    assert state["current_action_index"] == 1
    assert stream._replay_queues[original_session_id] == [queued_action]
# -----------------------------------------------------------------------------
# WP4 suite — tests 7, 3 and 2 (see inbox_codex/2026-05-25_0940_..._WP4-suite-3-
# tests-restants.md; GO decision in inbox_claude/2026-05-25_1018_...).
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_concurrent_dispatch_and_result_no_double_increment(
    isolated_replay_state,
    monkeypatch,
):
    """WP4 test 7 — concurrent report and poll.

    The first action must never be dispatched again and ``completed_actions``
    must never exceed 1.

    Two execution orders are valid (per the original author's note, the two
    calls are serialized by ``_async_replay_lock``):
      - case A: the poll runs BEFORE the pending entry is popped and gets
        ``action_in_flight=True``;
      - case B: the poll runs AFTER the pop and receives the second action.

    Acts as an anti-regression canary: if the pop is moved outside the lock
    or the serialization changes, this test is meant to catch the drift.
    """
    import asyncio

    stream = isolated_replay_state
    for attr, stub in (
        ("_replay_learner", _NoopReplayLearner()),
        ("_audit_trail", _NoopAuditTrail()),
    ):
        monkeypatch.setattr(stream, attr, stub)

    replay_id = "replay_concurrent_dispatch"
    session_id = "sess_concurrent_dispatch"
    machine_id = "pc-concurrent"
    in_flight = _click_action("act_concurrent_a1")
    queued = _click_action("act_concurrent_a2")
    in_flight_id = in_flight["action_id"]
    sent_at = time.time()

    stream._replay_states[replay_id] = _running_replay_state(
        replay_id, session_id, machine_id, [in_flight, queued]
    )
    # The first action is already in flight: only the second one is queued.
    stream._replay_queues[session_id] = [dict(queued)]
    stream._retry_pending[in_flight_id] = _pending_entry(
        in_flight, replay_id, session_id, machine_id, sent_at
    )

    success_report = stream.ReplayResultReport(
        session_id=session_id,
        action_id=in_flight_id,
        success=True,
    )
    report_outcome, poll_outcome = await asyncio.gather(
        stream.report_action_result(success_report),
        stream.get_next_action(session_id, machine_id),
    )

    # The report is accepted and the pending entry is gone.
    state = stream._replay_states[replay_id]
    assert report_outcome["status"] == "recorded"
    assert in_flight_id not in stream._retry_pending
    assert state["completed_actions"] == 1

    # Poll: either branch is valid, but the first action is never re-sent.
    polled_action = poll_outcome.get("action")
    if polled_action is None:
        # Case A: poll ran before the pop and sees the action in flight.
        assert poll_outcome.get("action_in_flight") is True
        assert poll_outcome.get("in_flight_action_id") == in_flight_id
    else:
        # Case B: poll ran after the pop and received the second action.
        assert polled_action["action_id"] == queued["action_id"], (
            f"Re-dispatch interdit de a1 : {polled_action}"
        )

    # The first action must NEVER be counted as completed twice.
    assert stream._replay_states[replay_id]["completed_actions"] <= 1
@pytest.mark.asyncio
async def test_late_report_after_watchdog_repush_documents_dedup_contract(
    isolated_replay_state,
    monkeypatch,
):
    """WP4 test 3 — late report after a watchdog re-push.

    WARNING: this pins down an explicit LIMITATION.  According to the
    original author's notes, the Windows agent MUST deduplicate by
    ``action_id`` to avoid a double execution when a watchdog re-push is
    followed by a late report (see audit WP-C Q7 and the Codex decision:
    Option A, server-side hardening, is deferred to a later work package).

    Sequence covered:
      T0    : initial dispatch of the first action
      T0+45 : watchdog re-pushes it (dispatched_at = 0.0, resent_count = 1)
      T0+46 : the poll re-dispatches the copy (0.0 = resume/retry sentinel)
      T0+47 : the late report arrives and is accepted without a duplicate flag
    """
    stream = isolated_replay_state
    for attr, stub in (
        ("_replay_learner", _NoopReplayLearner()),
        ("_audit_trail", _NoopAuditTrail()),
    ):
        monkeypatch.setattr(stream, attr, stub)

    replay_id = "replay_late_report_dup"
    session_id = "sess_late_report_dup"
    machine_id = "pc-late-report"
    repushed = _click_action("act_late_report_dup")
    follow_up = _click_action("act_late_report_next")
    repushed_id = repushed["action_id"]
    now = time.time()

    # Setup: copy re-pushed by the watchdog (dispatched_at = 0.0, resent once).
    stream._replay_states[replay_id] = _running_replay_state(
        replay_id, session_id, machine_id, [repushed, follow_up]
    )
    stream._replay_queues[session_id] = [dict(repushed), dict(follow_up)]
    seeded_entry = _pending_entry(
        repushed, replay_id, session_id, machine_id, 0.0  # 0.0 = re-push sentinel
    )
    seeded_entry.update(
        first_dispatched_at=now - 45.0,
        resent_count=1,  # the watchdog re-pushed once
        last_resent_at=now - 0.5,
    )
    stream._retry_pending[repushed_id] = seeded_entry

    # 1) The poll re-dispatches the copy (the 0.0 sentinel allows it).
    poll_outcome = await stream.get_next_action(session_id, machine_id)
    assert poll_outcome["action"]["action_id"] == repushed_id
    refreshed_entry = stream._retry_pending[repushed_id]
    assert refreshed_entry["dispatched_at"] > 0
    # resent_count is preserved: a signal the agent could use for local dedup.
    assert stream._retry_pending[repushed_id]["resent_count"] == 1

    # 2) The late (original) report arrives — accepted with no duplicate signal.
    late_report = stream.ReplayResultReport(
        session_id=session_id,
        action_id=repushed_id,
        success=True,
    )
    report_outcome = await stream.report_action_result(late_report)

    # Current contract: the server accepts the report without detecting the
    # duplication.  The Windows agent has to deduplicate through a local
    # action_id cache, otherwise a silent double execution is possible.  To be
    # hardened in a future work package (Option A: report_action_result
    # answering status="duplicate_late_report").
    assert report_outcome["status"] == "recorded"
    assert repushed_id not in stream._retry_pending
    assert stream._replay_states[replay_id]["completed_actions"] == 1

    # The follow-up action stays queued for the next poll.
    queued_ids = [item["action_id"] for item in stream._replay_queues[session_id]]
    assert queued_ids == [follow_up["action_id"]]
@pytest.mark.asyncio
@pytest.mark.xfail(
    strict=False,
    # Only the race assertion below is an expected failure.  Without
    # ``raises`` any exception (broken fixture, renamed attribute, import
    # error) would be silently reported as xfail.
    raises=AssertionError,
    reason=(
        "Race fenêtre release/re-acquire api_stream.py entre _replay_lock."
        "release() et le re-acquire pour écrire _retry_pending. Documentée "
        "comme 'race condition bénigne' dans le code. Voir WP-C Q1 + revue "
        "D1 finding #1 : pré-inscrire _retry_pending avant release du lock "
        "(solution B au backlog). strict=False car l'ordonnancement asyncio "
        "peut faire passer le test fortuitement."
    ),
)
async def test_get_next_action_two_concurrent_polls(
    isolated_replay_state, monkeypatch
):
    """WP4 test 2 — two concurrent polls on the same (session, machine).

    Exactly one of the two polls may receive the queued action; afterwards
    the queue must be empty and exactly one pending entry must exist.

    Deterministic variant: ``_pre_check_screen_state`` is replaced by a
    synchronous function that sleeps.  Per the original author's notes the
    pre-check runs in a thread executor, so the event loop stays free and the
    second poll can take the lock during the release/re-acquire window.

    Marked ``xfail`` (non-strict) because the race is a known, documented
    limitation; only ``AssertionError`` counts as the expected failure.
    """
    import asyncio

    api_stream = isolated_replay_state
    replay_id = "replay_two_polls"
    session_id = "sess_two_polls"
    machine_id = "pc-two-polls"

    # from_node + a fresh heartbeat: per the original notes this makes the
    # pre-check run, which opens the observable window.
    action = _click_action("act_two_polls")
    action["from_node"] = "node_two_polls"
    api_stream._replay_states[replay_id] = _running_replay_state(
        replay_id, session_id, machine_id, [action]
    )
    api_stream._replay_queues[session_id] = [dict(action)]
    api_stream._last_heartbeat[session_id] = {
        "timestamp": time.time(),
        "path": "/tmp/fake_heartbeat.png",
        "detected_text": [],
        "ui_elements": [],
        "window_info": {},
        "ocr_text": "",
    }

    def _slow_precheck_sync(*args, **kwargs):
        # Forces the release/re-acquire window.  Synchronous on purpose:
        # NOTE(review): assumed to be called through loop.run_in_executor, so
        # the sleep does not block the event loop — confirm in api_stream.
        time.sleep(0.05)
        return None

    monkeypatch.setattr(
        api_stream, "_pre_check_screen_state", _slow_precheck_sync
    )

    # Exceptions are collected rather than raised so that they show up in the
    # assertion message below.
    results = await asyncio.gather(
        api_stream.get_next_action(session_id, machine_id),
        api_stream.get_next_action(session_id, machine_id),
        return_exceptions=True,
    )

    dispatched = [r for r in results if isinstance(r, dict) and r.get("action")]
    assert len(dispatched) == 1, (
        f"Race détectée : {len(dispatched)} polls ont reçu l'action "
        f"(attendu : 1). Results={results}"
    )
    assert len(api_stream._replay_queues.get(session_id, [])) == 0
    assert len(api_stream._retry_pending) == 1