fix(replay): guard single in-flight dispatch
Add a private in-flight helper for replay dispatch, block machine retargeting while an action is still pending on the previous session, and warn on duplicate in-flight entries for the same replay triplet. Freeze the Notepad runtime dialog success path and add integration coverage for single in-flight dispatch, watchdog late-report documentation, and the known concurrent-poll race as an xfail.
This commit is contained in:
608
tests/integration/test_replay_single_inflight.py
Normal file
608
tests/integration/test_replay_single_inflight.py
Normal file
@@ -0,0 +1,608 @@
|
||||
"""Non-regression tests for replay single in-flight dispatch guards."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[2]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def isolated_replay_state(monkeypatch):
|
||||
monkeypatch.setenv("RPA_API_TOKEN", "test_replay_single_inflight_token")
|
||||
|
||||
from agent_v0.server_v1 import api_stream
|
||||
|
||||
monkeypatch.setattr(api_stream, "API_TOKEN", "test_replay_single_inflight_token")
|
||||
|
||||
if api_stream._replay_lock.locked():
|
||||
pytest.fail(
|
||||
"_replay_lock is already held at fixture setup — a previous test "
|
||||
"likely crashed without releasing it. Fail fast instead of waiting "
|
||||
"on the 4.5s acquire timeout in get_next_action()."
|
||||
)
|
||||
|
||||
saved_states = dict(api_stream._replay_states)
|
||||
saved_queues = dict(api_stream._replay_queues)
|
||||
saved_retry = dict(api_stream._retry_pending)
|
||||
saved_targets = dict(api_stream._machine_replay_target)
|
||||
saved_heartbeat = dict(api_stream._last_heartbeat)
|
||||
|
||||
api_stream._replay_states.clear()
|
||||
api_stream._replay_queues.clear()
|
||||
api_stream._retry_pending.clear()
|
||||
api_stream._machine_replay_target.clear()
|
||||
api_stream._last_heartbeat.clear()
|
||||
|
||||
yield api_stream
|
||||
|
||||
api_stream._replay_states.clear()
|
||||
api_stream._replay_states.update(saved_states)
|
||||
api_stream._replay_queues.clear()
|
||||
api_stream._replay_queues.update(saved_queues)
|
||||
api_stream._retry_pending.clear()
|
||||
api_stream._retry_pending.update(saved_retry)
|
||||
api_stream._machine_replay_target.clear()
|
||||
api_stream._machine_replay_target.update(saved_targets)
|
||||
api_stream._last_heartbeat.clear()
|
||||
api_stream._last_heartbeat.update(saved_heartbeat)
|
||||
|
||||
|
||||
def _running_replay_state(
|
||||
replay_id: str,
|
||||
session_id: str,
|
||||
machine_id: str,
|
||||
actions: list[dict],
|
||||
) -> dict:
|
||||
return {
|
||||
"replay_id": replay_id,
|
||||
"workflow_id": "session_replay:test",
|
||||
"session_id": session_id,
|
||||
"machine_id": machine_id,
|
||||
"status": "running",
|
||||
"total_actions": len(actions),
|
||||
"completed_actions": 0,
|
||||
"failed_actions": 0,
|
||||
"current_action_index": 0,
|
||||
"params": {},
|
||||
"results": [],
|
||||
"actions": list(actions),
|
||||
"retried_actions": 0,
|
||||
"unverified_actions": 0,
|
||||
"error_log": [],
|
||||
"last_screenshot": None,
|
||||
"failed_action": None,
|
||||
"pause_message": None,
|
||||
"variables": {},
|
||||
"safety_checks": [],
|
||||
"checks_acknowledged": [],
|
||||
"pause_reason": "",
|
||||
"pause_payload": None,
|
||||
}
|
||||
|
||||
|
||||
def _click_action(action_id: str) -> dict:
|
||||
return {
|
||||
"action_id": action_id,
|
||||
"type": "click",
|
||||
"visual_mode": True,
|
||||
"x_pct": 0.5,
|
||||
"y_pct": 0.5,
|
||||
"target_spec": {"by_text": "OK"},
|
||||
}
|
||||
|
||||
|
||||
def _pending_entry(
|
||||
action: dict,
|
||||
replay_id: str,
|
||||
session_id: str,
|
||||
machine_id: str,
|
||||
dispatched_at: float,
|
||||
) -> dict:
|
||||
return {
|
||||
"action": dict(action),
|
||||
"dispatched_action": dict(action),
|
||||
"retry_count": 0,
|
||||
"replay_id": replay_id,
|
||||
"session_id": session_id,
|
||||
"machine_id": machine_id,
|
||||
"dispatched_at": dispatched_at,
|
||||
"first_dispatched_at": dispatched_at or 0.0,
|
||||
"resent_count": 0,
|
||||
"last_resent_at": 0.0,
|
||||
}
|
||||
|
||||
|
||||
class _NoopReplayLearner:
|
||||
def record_from_replay_result(self, **kwargs):
|
||||
return None
|
||||
|
||||
|
||||
class _NoopAuditTrail:
|
||||
def record(self, entry):
|
||||
return None
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_real_dispatch_then_next_poll_blocks_with_action_in_flight(
|
||||
isolated_replay_state,
|
||||
):
|
||||
api_stream = isolated_replay_state
|
||||
replay_id = "replay_real_dispatch_blocks"
|
||||
session_id = "sess_real_dispatch_blocks"
|
||||
machine_id = "pc-real-dispatch"
|
||||
a1 = _click_action("act_real_dispatch_a1")
|
||||
a2 = _click_action("act_real_dispatch_a2")
|
||||
|
||||
api_stream._replay_states[replay_id] = _running_replay_state(
|
||||
replay_id, session_id, machine_id, [a1, a2]
|
||||
)
|
||||
api_stream._replay_queues[session_id] = [dict(a1), dict(a2)]
|
||||
|
||||
first = await api_stream.get_next_action(session_id, machine_id)
|
||||
blocked = await api_stream.get_next_action(session_id, machine_id)
|
||||
blocked_again = await api_stream.get_next_action(session_id, machine_id)
|
||||
|
||||
assert first["action"]["action_id"] == a1["action_id"]
|
||||
assert blocked["action"] is None
|
||||
assert blocked["action_in_flight"] is True
|
||||
assert blocked["in_flight_action_id"] == a1["action_id"]
|
||||
assert blocked["replay_id"] == replay_id
|
||||
assert blocked_again["action"] is None
|
||||
assert blocked_again["action_in_flight"] is True
|
||||
assert blocked_again["in_flight_action_id"] == a1["action_id"]
|
||||
assert [a["action_id"] for a in api_stream._replay_queues[session_id]] == [
|
||||
a2["action_id"]
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_success_report_clears_inflight_and_allows_next_dispatch(
|
||||
isolated_replay_state,
|
||||
monkeypatch,
|
||||
):
|
||||
api_stream = isolated_replay_state
|
||||
replay_id = "replay_success_allows_next"
|
||||
session_id = "sess_success_allows_next"
|
||||
machine_id = "pc-success-next"
|
||||
a1 = _click_action("act_success_next_a1")
|
||||
a2 = _click_action("act_success_next_a2")
|
||||
|
||||
monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner())
|
||||
monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail())
|
||||
|
||||
api_stream._replay_states[replay_id] = _running_replay_state(
|
||||
replay_id, session_id, machine_id, [a1, a2]
|
||||
)
|
||||
api_stream._replay_queues[session_id] = [dict(a1), dict(a2)]
|
||||
|
||||
first = await api_stream.get_next_action(session_id, machine_id)
|
||||
assert first["action"]["action_id"] == a1["action_id"]
|
||||
|
||||
report = api_stream.ReplayResultReport(
|
||||
session_id=session_id,
|
||||
action_id=a1["action_id"],
|
||||
success=True,
|
||||
)
|
||||
recorded = await api_stream.report_action_result(report)
|
||||
second = await api_stream.get_next_action(session_id, machine_id)
|
||||
|
||||
assert recorded["status"] == "recorded"
|
||||
assert a1["action_id"] not in api_stream._retry_pending
|
||||
assert api_stream._replay_states[replay_id]["completed_actions"] == 1
|
||||
assert api_stream._replay_states[replay_id]["current_action_index"] == 1
|
||||
assert second["action"]["action_id"] == a2["action_id"]
|
||||
assert api_stream._replay_queues[session_id] == []
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_next_action_blocks_when_inflight(isolated_replay_state):
|
||||
api_stream = isolated_replay_state
|
||||
replay_id = "replay_single_inflight"
|
||||
session_id = "sess_single_inflight"
|
||||
machine_id = "pc-single"
|
||||
action = _click_action("act_single_inflight")
|
||||
dispatched_at = time.time()
|
||||
|
||||
api_stream._replay_states[replay_id] = _running_replay_state(
|
||||
replay_id, session_id, machine_id, [action]
|
||||
)
|
||||
api_stream._replay_queues[session_id] = [dict(action)]
|
||||
api_stream._retry_pending[action["action_id"]] = _pending_entry(
|
||||
action, replay_id, session_id, machine_id, dispatched_at
|
||||
)
|
||||
|
||||
result = await api_stream.get_next_action(session_id, machine_id)
|
||||
|
||||
assert result["action"] is None
|
||||
assert result["action_in_flight"] is True
|
||||
assert result["in_flight_action_id"] == action["action_id"]
|
||||
assert result["replay_id"] == replay_id
|
||||
assert api_stream._replay_queues[session_id] == [action]
|
||||
assert (
|
||||
api_stream._retry_pending[action["action_id"]]["dispatched_at"]
|
||||
== dispatched_at
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_next_action_allows_resume_with_dispatched_at_zero(
|
||||
isolated_replay_state,
|
||||
):
|
||||
api_stream = isolated_replay_state
|
||||
replay_id = "replay_resume_zero"
|
||||
session_id = "sess_resume_zero"
|
||||
machine_id = "pc-resume"
|
||||
action = _click_action("act_resume_zero")
|
||||
|
||||
api_stream._replay_states[replay_id] = _running_replay_state(
|
||||
replay_id, session_id, machine_id, [action]
|
||||
)
|
||||
api_stream._replay_queues[session_id] = [dict(action)]
|
||||
api_stream._retry_pending[action["action_id"]] = _pending_entry(
|
||||
action, replay_id, session_id, machine_id, dispatched_at=0.0
|
||||
)
|
||||
|
||||
result = await api_stream.get_next_action(session_id, machine_id)
|
||||
|
||||
assert result["action"]["action_id"] == action["action_id"]
|
||||
assert api_stream._replay_queues[session_id] == []
|
||||
retry_info = api_stream._retry_pending[action["action_id"]]
|
||||
assert retry_info["dispatched_at"] > 0
|
||||
assert retry_info["first_dispatched_at"] > 0
|
||||
assert retry_info["session_id"] == session_id
|
||||
assert retry_info["machine_id"] == machine_id
|
||||
assert retry_info["replay_id"] == replay_id
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inflight_filter_isolates_replays(isolated_replay_state):
|
||||
api_stream = isolated_replay_state
|
||||
session_id = "sess_replay_isolation"
|
||||
machine_id = "pc-isolation"
|
||||
old_action = _click_action("act_old_inflight")
|
||||
new_action = _click_action("act_new_dispatch")
|
||||
|
||||
api_stream._replay_states["replay_new"] = _running_replay_state(
|
||||
"replay_new", session_id, machine_id, [new_action]
|
||||
)
|
||||
api_stream._replay_queues[session_id] = [dict(new_action)]
|
||||
api_stream._retry_pending[old_action["action_id"]] = _pending_entry(
|
||||
old_action,
|
||||
"replay_old",
|
||||
session_id,
|
||||
machine_id,
|
||||
dispatched_at=time.time(),
|
||||
)
|
||||
|
||||
result = await api_stream.get_next_action(session_id, machine_id)
|
||||
|
||||
assert result["action"]["action_id"] == new_action["action_id"]
|
||||
assert new_action["action_id"] in api_stream._retry_pending
|
||||
assert old_action["action_id"] in api_stream._retry_pending
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inflight_filter_isolates_machines(isolated_replay_state):
|
||||
api_stream = isolated_replay_state
|
||||
replay_id = "replay_machine_isolation"
|
||||
session_id = "sess_machine_isolation"
|
||||
pending_action = _click_action("act_other_machine_inflight")
|
||||
action = _click_action("act_this_machine_dispatch")
|
||||
|
||||
api_stream._replay_states[replay_id] = _running_replay_state(
|
||||
replay_id, session_id, "pc-2", [action]
|
||||
)
|
||||
api_stream._replay_queues[session_id] = [dict(action)]
|
||||
api_stream._retry_pending[pending_action["action_id"]] = _pending_entry(
|
||||
pending_action,
|
||||
replay_id,
|
||||
session_id,
|
||||
"pc-1",
|
||||
dispatched_at=time.time(),
|
||||
)
|
||||
|
||||
result = await api_stream.get_next_action(session_id, "pc-2")
|
||||
|
||||
assert result["action"]["action_id"] == action["action_id"]
|
||||
assert action["action_id"] in api_stream._retry_pending
|
||||
assert pending_action["action_id"] in api_stream._retry_pending
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"use_machine_target",
|
||||
[True, False],
|
||||
ids=["machine_replay_target", "lookup_other_session"],
|
||||
)
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_next_action_blocks_reciblage_with_old_pending_session(
|
||||
isolated_replay_state,
|
||||
monkeypatch,
|
||||
use_machine_target,
|
||||
):
|
||||
api_stream = isolated_replay_state
|
||||
replay_id = "replay_lookup_inflight"
|
||||
original_session_id = "sess_lookup_original"
|
||||
polling_session_id = "sess_lookup_polling"
|
||||
machine_id = "pc-lookup"
|
||||
a1 = _click_action("act_lookup_inflight_a1")
|
||||
a2 = _click_action("act_lookup_inflight_a2")
|
||||
dispatched_at = time.time()
|
||||
|
||||
monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner())
|
||||
monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail())
|
||||
|
||||
api_stream._replay_states[replay_id] = _running_replay_state(
|
||||
replay_id, original_session_id, machine_id, [a1, a2]
|
||||
)
|
||||
api_stream._replay_queues[original_session_id] = [dict(a2)]
|
||||
if use_machine_target:
|
||||
api_stream._machine_replay_target[machine_id] = original_session_id
|
||||
api_stream._retry_pending[a1["action_id"]] = _pending_entry(
|
||||
a1,
|
||||
replay_id,
|
||||
original_session_id,
|
||||
machine_id,
|
||||
dispatched_at=dispatched_at,
|
||||
)
|
||||
|
||||
result = await api_stream.get_next_action(polling_session_id, machine_id)
|
||||
|
||||
assert result["action"] is None
|
||||
assert result["action_in_flight"] is True
|
||||
assert result["in_flight_action_id"] == a1["action_id"]
|
||||
assert result["replay_id"] == replay_id
|
||||
assert api_stream._replay_queues[original_session_id] == [a2]
|
||||
assert polling_session_id not in api_stream._replay_queues
|
||||
assert api_stream._replay_states[replay_id]["session_id"] == original_session_id
|
||||
assert (
|
||||
api_stream._retry_pending[a1["action_id"]]["session_id"]
|
||||
== original_session_id
|
||||
)
|
||||
assert (
|
||||
api_stream._retry_pending[a1["action_id"]]["dispatched_at"]
|
||||
== dispatched_at
|
||||
)
|
||||
if use_machine_target:
|
||||
assert api_stream._machine_replay_target[machine_id] == original_session_id
|
||||
|
||||
report = api_stream.ReplayResultReport(
|
||||
session_id=original_session_id,
|
||||
action_id=a1["action_id"],
|
||||
success=True,
|
||||
)
|
||||
recorded = await api_stream.report_action_result(report)
|
||||
|
||||
assert recorded["status"] == "recorded"
|
||||
assert a1["action_id"] not in api_stream._retry_pending
|
||||
assert api_stream._replay_states[replay_id]["completed_actions"] == 1
|
||||
assert api_stream._replay_states[replay_id]["current_action_index"] == 1
|
||||
assert api_stream._replay_queues[original_session_id] == [a2]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# WP4 suite — tests 7, 3, 2 (cf. inbox_codex/2026-05-25_0940_..._WP4-suite-3-
|
||||
# tests-restants.md ; arbitrage GO inbox_claude/2026-05-25_1018_...).
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_concurrent_dispatch_and_result_no_double_increment(
|
||||
isolated_replay_state,
|
||||
monkeypatch,
|
||||
):
|
||||
"""WP4 test 7 — report + poll concurrents : a1 ne doit jamais être
|
||||
re-dispatché, et completed_actions ne doit jamais dépasser 1.
|
||||
|
||||
Deux ordres d'exécution valides (sérialisés par _async_replay_lock) :
|
||||
- cas A : poll exécuté AVANT le pop _retry_pending → action_in_flight=True
|
||||
- cas B : poll exécuté APRÈS le pop → reçoit a2
|
||||
|
||||
Sert de canari anti-régression : si quelqu'un déplace le pop hors lock
|
||||
ou change la sérialisation, ce test attrape la dérive.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
api_stream = isolated_replay_state
|
||||
monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner())
|
||||
monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail())
|
||||
|
||||
replay_id = "replay_concurrent_dispatch"
|
||||
session_id = "sess_concurrent_dispatch"
|
||||
machine_id = "pc-concurrent"
|
||||
a1 = _click_action("act_concurrent_a1")
|
||||
a2 = _click_action("act_concurrent_a2")
|
||||
dispatched_at = time.time()
|
||||
|
||||
api_stream._replay_states[replay_id] = _running_replay_state(
|
||||
replay_id, session_id, machine_id, [a1, a2]
|
||||
)
|
||||
# a1 déjà en vol : queue contient seulement a2
|
||||
api_stream._replay_queues[session_id] = [dict(a2)]
|
||||
api_stream._retry_pending[a1["action_id"]] = _pending_entry(
|
||||
a1, replay_id, session_id, machine_id, dispatched_at
|
||||
)
|
||||
|
||||
report = api_stream.ReplayResultReport(
|
||||
session_id=session_id,
|
||||
action_id=a1["action_id"],
|
||||
success=True,
|
||||
)
|
||||
|
||||
result_report, result_poll = await asyncio.gather(
|
||||
api_stream.report_action_result(report),
|
||||
api_stream.get_next_action(session_id, machine_id),
|
||||
)
|
||||
|
||||
# Report accepté, a1 retiré de _retry_pending
|
||||
assert result_report["status"] == "recorded"
|
||||
assert a1["action_id"] not in api_stream._retry_pending
|
||||
assert api_stream._replay_states[replay_id]["completed_actions"] == 1
|
||||
|
||||
# Poll : 2 branches valides, jamais re-dispatch de a1
|
||||
poll_action = result_poll.get("action")
|
||||
if poll_action is not None:
|
||||
# cas B : poll après pop → a reçu a2
|
||||
assert poll_action["action_id"] == a2["action_id"], (
|
||||
f"Re-dispatch interdit de a1 : {poll_action}"
|
||||
)
|
||||
else:
|
||||
# cas A : poll avant pop → action_in_flight
|
||||
assert result_poll.get("action_in_flight") is True
|
||||
assert result_poll.get("in_flight_action_id") == a1["action_id"]
|
||||
|
||||
# a1 ne doit JAMAIS apparaître deux fois dans completed
|
||||
assert api_stream._replay_states[replay_id]["completed_actions"] <= 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_late_report_after_watchdog_repush_documents_dedup_contract(
|
||||
isolated_replay_state,
|
||||
monkeypatch,
|
||||
):
|
||||
"""WP4 test 3 — late report après repush watchdog.
|
||||
|
||||
ATTENTION : couvre une LIMITATION explicite. L'agent Windows DOIT
|
||||
déduplique par action_id pour éviter une double-exécution après repush
|
||||
watchdog suivi d'un late report. Voir audit WP-C Q7 et arbitrage Codex :
|
||||
Option A (hardening serveur) reportée en workpack ultérieur.
|
||||
|
||||
Séquence couverte :
|
||||
T0 : dispatch initial de a1
|
||||
T0+45 : watchdog repush a1 (dispatched_at = 0.0, resent_count = 1)
|
||||
T0+46 : poll re-dispatche la copie (0.0 = sentinelle resume/retry)
|
||||
T0+47 : report tardif arrive — accepté sans flag duplicate
|
||||
"""
|
||||
api_stream = isolated_replay_state
|
||||
monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner())
|
||||
monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail())
|
||||
|
||||
replay_id = "replay_late_report_dup"
|
||||
session_id = "sess_late_report_dup"
|
||||
machine_id = "pc-late-report"
|
||||
a1 = _click_action("act_late_report_dup")
|
||||
a2 = _click_action("act_late_report_next")
|
||||
now = time.time()
|
||||
|
||||
# Setup : copie repushée par watchdog (dispatched_at = 0.0, resent_count = 1)
|
||||
api_stream._replay_states[replay_id] = _running_replay_state(
|
||||
replay_id, session_id, machine_id, [a1, a2]
|
||||
)
|
||||
api_stream._replay_queues[session_id] = [dict(a1), dict(a2)]
|
||||
api_stream._retry_pending[a1["action_id"]] = {
|
||||
"action": dict(a1),
|
||||
"dispatched_action": dict(a1),
|
||||
"retry_count": 0,
|
||||
"replay_id": replay_id,
|
||||
"session_id": session_id,
|
||||
"machine_id": machine_id,
|
||||
"dispatched_at": 0.0, # sentinelle repush
|
||||
"first_dispatched_at": now - 45.0,
|
||||
"resent_count": 1, # watchdog a repush 1x
|
||||
"last_resent_at": now - 0.5,
|
||||
}
|
||||
|
||||
# 1) Le poll re-dispatche la copie (sentinelle 0.0 autorise)
|
||||
next_resp = await api_stream.get_next_action(session_id, machine_id)
|
||||
assert next_resp["action"]["action_id"] == a1["action_id"]
|
||||
assert api_stream._retry_pending[a1["action_id"]]["dispatched_at"] > 0
|
||||
# resent_count préservé : signal exploitable côté agent pour dedup local
|
||||
assert api_stream._retry_pending[a1["action_id"]]["resent_count"] == 1
|
||||
|
||||
# 2) Le report tardif (original) arrive — accepté sans signal duplicate
|
||||
report = api_stream.ReplayResultReport(
|
||||
session_id=session_id,
|
||||
action_id=a1["action_id"],
|
||||
success=True,
|
||||
)
|
||||
result = await api_stream.report_action_result(report)
|
||||
|
||||
# Contrat actuel : serveur accepte sans détecter la duplication.
|
||||
# L'agent Windows doit déduplique via cache local action_id sinon
|
||||
# double-exécution silencieuse possible. À durcir en workpack futur
|
||||
# (Option A : status="duplicate_late_report" côté report_action_result).
|
||||
assert result["status"] == "recorded"
|
||||
assert a1["action_id"] not in api_stream._retry_pending
|
||||
assert api_stream._replay_states[replay_id]["completed_actions"] == 1
|
||||
# a2 reste en queue pour le poll suivant
|
||||
assert [a["action_id"] for a in api_stream._replay_queues[session_id]] == [
|
||||
a2["action_id"]
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.xfail(
|
||||
strict=False,
|
||||
reason=(
|
||||
"Race fenêtre release/re-acquire api_stream.py entre _replay_lock."
|
||||
"release() et le re-acquire pour écrire _retry_pending. Documentée "
|
||||
"comme 'race condition bénigne' dans le code. Voir WP-C Q1 + revue "
|
||||
"D1 finding #1 : pré-inscrire _retry_pending avant release du lock "
|
||||
"(solution B au backlog). strict=False car l'ordonnancement asyncio "
|
||||
"peut faire passer le test fortuitement."
|
||||
),
|
||||
)
|
||||
async def test_get_next_action_two_concurrent_polls(
|
||||
isolated_replay_state, monkeypatch
|
||||
):
|
||||
"""WP4 test 2 — deux polls concurrents sur même (session, machine).
|
||||
|
||||
Variante déterministe : monkeypatch sync `_pre_check_screen_state` qui
|
||||
sleep dans le thread executor (l'event loop reste libre, le 2e poll
|
||||
peut acquérir le lock pendant la fenêtre release/re-acquire).
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
api_stream = isolated_replay_state
|
||||
replay_id = "replay_two_polls"
|
||||
session_id = "sess_two_polls"
|
||||
machine_id = "pc-two-polls"
|
||||
|
||||
# from_node + heartbeat frais => le pre-check est invoqué -> fenêtre observable
|
||||
action = _click_action("act_two_polls")
|
||||
action["from_node"] = "node_two_polls"
|
||||
|
||||
api_stream._replay_states[replay_id] = _running_replay_state(
|
||||
replay_id, session_id, machine_id, [action]
|
||||
)
|
||||
api_stream._replay_queues[session_id] = [dict(action)]
|
||||
api_stream._last_heartbeat[session_id] = {
|
||||
"timestamp": time.time(),
|
||||
"path": "/tmp/fake_heartbeat.png",
|
||||
"detected_text": [],
|
||||
"ui_elements": [],
|
||||
"window_info": {},
|
||||
"ocr_text": "",
|
||||
}
|
||||
|
||||
def _slow_precheck_sync(*args, **kwargs):
|
||||
# Force la fenêtre release/re-acquire. Sync car appelé via
|
||||
# loop.run_in_executor : l'event loop reste libre, le 2e poll
|
||||
# peut acquérir le lock pendant ce sleep.
|
||||
time.sleep(0.05)
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(
|
||||
api_stream, "_pre_check_screen_state", _slow_precheck_sync
|
||||
)
|
||||
|
||||
results = await asyncio.gather(
|
||||
api_stream.get_next_action(session_id, machine_id),
|
||||
api_stream.get_next_action(session_id, machine_id),
|
||||
return_exceptions=True,
|
||||
)
|
||||
|
||||
dispatched = [r for r in results if isinstance(r, dict) and r.get("action")]
|
||||
assert len(dispatched) == 1, (
|
||||
f"Race détectée : {len(dispatched)} polls ont reçu l'action "
|
||||
f"(attendu : 1). Results={results}"
|
||||
)
|
||||
assert len(api_stream._replay_queues.get(session_id, [])) == 0
|
||||
assert len(api_stream._retry_pending) == 1
|
||||
Reference in New Issue
Block a user