617 lines
22 KiB
Python
617 lines
22 KiB
Python
"""Non-regression tests for replay single in-flight dispatch guards."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
|
|
ROOT = Path(__file__).resolve().parents[2]
|
|
if str(ROOT) not in sys.path:
|
|
sys.path.insert(0, str(ROOT))
|
|
|
|
|
|
@pytest.fixture
|
|
def isolated_replay_state(monkeypatch, tmp_path):
|
|
monkeypatch.setenv("RPA_API_TOKEN", "test_replay_single_inflight_token")
|
|
|
|
from agent_v0.server_v1 import api_stream
|
|
from agent_v0.server_v1.agent_registry import AgentRegistry
|
|
|
|
monkeypatch.setattr(api_stream, "API_TOKEN", "test_replay_single_inflight_token")
|
|
|
|
# Isoler le registre pour que _agent_registry_has_entries() retourne False
|
|
# (mode dev, aucun agent enrolle) — sinon le garde fleet bloque les tests
|
|
original_registry = api_stream.agent_registry
|
|
empty_registry = AgentRegistry(db_path=str(tmp_path / "empty_agents.db"))
|
|
monkeypatch.setattr(api_stream, "agent_registry", empty_registry)
|
|
|
|
if api_stream._replay_lock.locked():
|
|
pytest.fail(
|
|
"_replay_lock is already held at fixture setup — a previous test "
|
|
"likely crashed without releasing it. Fail fast instead of waiting "
|
|
"on the 4.5s acquire timeout in get_next_action()."
|
|
)
|
|
|
|
saved_states = dict(api_stream._replay_states)
|
|
saved_queues = dict(api_stream._replay_queues)
|
|
saved_retry = dict(api_stream._retry_pending)
|
|
saved_targets = dict(api_stream._machine_replay_target)
|
|
saved_heartbeat = dict(api_stream._last_heartbeat)
|
|
|
|
api_stream._replay_states.clear()
|
|
api_stream._replay_queues.clear()
|
|
api_stream._retry_pending.clear()
|
|
api_stream._machine_replay_target.clear()
|
|
api_stream._last_heartbeat.clear()
|
|
|
|
yield api_stream
|
|
|
|
api_stream._replay_states.clear()
|
|
api_stream._replay_states.update(saved_states)
|
|
api_stream._replay_queues.clear()
|
|
api_stream._replay_queues.update(saved_queues)
|
|
api_stream._retry_pending.clear()
|
|
api_stream._retry_pending.update(saved_retry)
|
|
api_stream._machine_replay_target.clear()
|
|
api_stream._machine_replay_target.update(saved_targets)
|
|
api_stream._last_heartbeat.clear()
|
|
api_stream._last_heartbeat.update(saved_heartbeat)
|
|
monkeypatch.setattr(api_stream, "agent_registry", original_registry)
|
|
|
|
|
|
def _running_replay_state(
|
|
replay_id: str,
|
|
session_id: str,
|
|
machine_id: str,
|
|
actions: list[dict],
|
|
) -> dict:
|
|
return {
|
|
"replay_id": replay_id,
|
|
"workflow_id": "session_replay:test",
|
|
"session_id": session_id,
|
|
"machine_id": machine_id,
|
|
"status": "running",
|
|
"total_actions": len(actions),
|
|
"completed_actions": 0,
|
|
"failed_actions": 0,
|
|
"current_action_index": 0,
|
|
"params": {},
|
|
"results": [],
|
|
"actions": list(actions),
|
|
"retried_actions": 0,
|
|
"unverified_actions": 0,
|
|
"error_log": [],
|
|
"last_screenshot": None,
|
|
"failed_action": None,
|
|
"pause_message": None,
|
|
"variables": {},
|
|
"safety_checks": [],
|
|
"checks_acknowledged": [],
|
|
"pause_reason": "",
|
|
"pause_payload": None,
|
|
}
|
|
|
|
|
|
def _click_action(action_id: str) -> dict:
|
|
return {
|
|
"action_id": action_id,
|
|
"type": "click",
|
|
"visual_mode": True,
|
|
"x_pct": 0.5,
|
|
"y_pct": 0.5,
|
|
"target_spec": {"by_text": "OK"},
|
|
}
|
|
|
|
|
|
def _pending_entry(
|
|
action: dict,
|
|
replay_id: str,
|
|
session_id: str,
|
|
machine_id: str,
|
|
dispatched_at: float,
|
|
) -> dict:
|
|
return {
|
|
"action": dict(action),
|
|
"dispatched_action": dict(action),
|
|
"retry_count": 0,
|
|
"replay_id": replay_id,
|
|
"session_id": session_id,
|
|
"machine_id": machine_id,
|
|
"dispatched_at": dispatched_at,
|
|
"first_dispatched_at": dispatched_at or 0.0,
|
|
"resent_count": 0,
|
|
"last_resent_at": 0.0,
|
|
}
|
|
|
|
|
|
class _NoopReplayLearner:
|
|
def record_from_replay_result(self, **kwargs):
|
|
return None
|
|
|
|
|
|
class _NoopAuditTrail:
|
|
def record(self, entry):
|
|
return None
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_real_dispatch_then_next_poll_blocks_with_action_in_flight(
|
|
isolated_replay_state,
|
|
):
|
|
api_stream = isolated_replay_state
|
|
replay_id = "replay_real_dispatch_blocks"
|
|
session_id = "sess_real_dispatch_blocks"
|
|
machine_id = "pc-real-dispatch"
|
|
a1 = _click_action("act_real_dispatch_a1")
|
|
a2 = _click_action("act_real_dispatch_a2")
|
|
|
|
api_stream._replay_states[replay_id] = _running_replay_state(
|
|
replay_id, session_id, machine_id, [a1, a2]
|
|
)
|
|
api_stream._replay_queues[session_id] = [dict(a1), dict(a2)]
|
|
|
|
first = await api_stream.get_next_action(session_id, machine_id)
|
|
blocked = await api_stream.get_next_action(session_id, machine_id)
|
|
blocked_again = await api_stream.get_next_action(session_id, machine_id)
|
|
|
|
assert first["action"]["action_id"] == a1["action_id"]
|
|
assert blocked["action"] is None
|
|
assert blocked["action_in_flight"] is True
|
|
assert blocked["in_flight_action_id"] == a1["action_id"]
|
|
assert blocked["replay_id"] == replay_id
|
|
assert blocked_again["action"] is None
|
|
assert blocked_again["action_in_flight"] is True
|
|
assert blocked_again["in_flight_action_id"] == a1["action_id"]
|
|
assert [a["action_id"] for a in api_stream._replay_queues[session_id]] == [
|
|
a2["action_id"]
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_success_report_clears_inflight_and_allows_next_dispatch(
|
|
isolated_replay_state,
|
|
monkeypatch,
|
|
):
|
|
api_stream = isolated_replay_state
|
|
replay_id = "replay_success_allows_next"
|
|
session_id = "sess_success_allows_next"
|
|
machine_id = "pc-success-next"
|
|
a1 = _click_action("act_success_next_a1")
|
|
a2 = _click_action("act_success_next_a2")
|
|
|
|
monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner())
|
|
monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail())
|
|
|
|
api_stream._replay_states[replay_id] = _running_replay_state(
|
|
replay_id, session_id, machine_id, [a1, a2]
|
|
)
|
|
api_stream._replay_queues[session_id] = [dict(a1), dict(a2)]
|
|
|
|
first = await api_stream.get_next_action(session_id, machine_id)
|
|
assert first["action"]["action_id"] == a1["action_id"]
|
|
|
|
report = api_stream.ReplayResultReport(
|
|
session_id=session_id,
|
|
action_id=a1["action_id"],
|
|
success=True,
|
|
)
|
|
recorded = await api_stream.report_action_result(report)
|
|
second = await api_stream.get_next_action(session_id, machine_id)
|
|
|
|
assert recorded["status"] == "recorded"
|
|
assert a1["action_id"] not in api_stream._retry_pending
|
|
assert api_stream._replay_states[replay_id]["completed_actions"] == 1
|
|
assert api_stream._replay_states[replay_id]["current_action_index"] == 1
|
|
assert second["action"]["action_id"] == a2["action_id"]
|
|
assert api_stream._replay_queues[session_id] == []
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_next_action_blocks_when_inflight(isolated_replay_state):
|
|
api_stream = isolated_replay_state
|
|
replay_id = "replay_single_inflight"
|
|
session_id = "sess_single_inflight"
|
|
machine_id = "pc-single"
|
|
action = _click_action("act_single_inflight")
|
|
dispatched_at = time.time()
|
|
|
|
api_stream._replay_states[replay_id] = _running_replay_state(
|
|
replay_id, session_id, machine_id, [action]
|
|
)
|
|
api_stream._replay_queues[session_id] = [dict(action)]
|
|
api_stream._retry_pending[action["action_id"]] = _pending_entry(
|
|
action, replay_id, session_id, machine_id, dispatched_at
|
|
)
|
|
|
|
result = await api_stream.get_next_action(session_id, machine_id)
|
|
|
|
assert result["action"] is None
|
|
assert result["action_in_flight"] is True
|
|
assert result["in_flight_action_id"] == action["action_id"]
|
|
assert result["replay_id"] == replay_id
|
|
assert api_stream._replay_queues[session_id] == [action]
|
|
assert (
|
|
api_stream._retry_pending[action["action_id"]]["dispatched_at"]
|
|
== dispatched_at
|
|
)
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_next_action_allows_resume_with_dispatched_at_zero(
|
|
isolated_replay_state,
|
|
):
|
|
api_stream = isolated_replay_state
|
|
replay_id = "replay_resume_zero"
|
|
session_id = "sess_resume_zero"
|
|
machine_id = "pc-resume"
|
|
action = _click_action("act_resume_zero")
|
|
|
|
api_stream._replay_states[replay_id] = _running_replay_state(
|
|
replay_id, session_id, machine_id, [action]
|
|
)
|
|
api_stream._replay_queues[session_id] = [dict(action)]
|
|
api_stream._retry_pending[action["action_id"]] = _pending_entry(
|
|
action, replay_id, session_id, machine_id, dispatched_at=0.0
|
|
)
|
|
|
|
result = await api_stream.get_next_action(session_id, machine_id)
|
|
|
|
assert result["action"]["action_id"] == action["action_id"]
|
|
assert api_stream._replay_queues[session_id] == []
|
|
retry_info = api_stream._retry_pending[action["action_id"]]
|
|
assert retry_info["dispatched_at"] > 0
|
|
assert retry_info["first_dispatched_at"] > 0
|
|
assert retry_info["session_id"] == session_id
|
|
assert retry_info["machine_id"] == machine_id
|
|
assert retry_info["replay_id"] == replay_id
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_inflight_filter_isolates_replays(isolated_replay_state):
|
|
api_stream = isolated_replay_state
|
|
session_id = "sess_replay_isolation"
|
|
machine_id = "pc-isolation"
|
|
old_action = _click_action("act_old_inflight")
|
|
new_action = _click_action("act_new_dispatch")
|
|
|
|
api_stream._replay_states["replay_new"] = _running_replay_state(
|
|
"replay_new", session_id, machine_id, [new_action]
|
|
)
|
|
api_stream._replay_queues[session_id] = [dict(new_action)]
|
|
api_stream._retry_pending[old_action["action_id"]] = _pending_entry(
|
|
old_action,
|
|
"replay_old",
|
|
session_id,
|
|
machine_id,
|
|
dispatched_at=time.time(),
|
|
)
|
|
|
|
result = await api_stream.get_next_action(session_id, machine_id)
|
|
|
|
assert result["action"]["action_id"] == new_action["action_id"]
|
|
assert new_action["action_id"] in api_stream._retry_pending
|
|
assert old_action["action_id"] in api_stream._retry_pending
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_inflight_filter_isolates_machines(isolated_replay_state):
|
|
api_stream = isolated_replay_state
|
|
replay_id = "replay_machine_isolation"
|
|
session_id = "sess_machine_isolation"
|
|
pending_action = _click_action("act_other_machine_inflight")
|
|
action = _click_action("act_this_machine_dispatch")
|
|
|
|
api_stream._replay_states[replay_id] = _running_replay_state(
|
|
replay_id, session_id, "pc-2", [action]
|
|
)
|
|
api_stream._replay_queues[session_id] = [dict(action)]
|
|
api_stream._retry_pending[pending_action["action_id"]] = _pending_entry(
|
|
pending_action,
|
|
replay_id,
|
|
session_id,
|
|
"pc-1",
|
|
dispatched_at=time.time(),
|
|
)
|
|
|
|
result = await api_stream.get_next_action(session_id, "pc-2")
|
|
|
|
assert result["action"]["action_id"] == action["action_id"]
|
|
assert action["action_id"] in api_stream._retry_pending
|
|
assert pending_action["action_id"] in api_stream._retry_pending
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"use_machine_target",
|
|
[True, False],
|
|
ids=["machine_replay_target", "lookup_other_session"],
|
|
)
|
|
@pytest.mark.asyncio
|
|
async def test_get_next_action_blocks_reciblage_with_old_pending_session(
|
|
isolated_replay_state,
|
|
monkeypatch,
|
|
use_machine_target,
|
|
):
|
|
api_stream = isolated_replay_state
|
|
replay_id = "replay_lookup_inflight"
|
|
original_session_id = "sess_lookup_original"
|
|
polling_session_id = "sess_lookup_polling"
|
|
machine_id = "pc-lookup"
|
|
a1 = _click_action("act_lookup_inflight_a1")
|
|
a2 = _click_action("act_lookup_inflight_a2")
|
|
dispatched_at = time.time()
|
|
|
|
monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner())
|
|
monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail())
|
|
|
|
api_stream._replay_states[replay_id] = _running_replay_state(
|
|
replay_id, original_session_id, machine_id, [a1, a2]
|
|
)
|
|
api_stream._replay_queues[original_session_id] = [dict(a2)]
|
|
if use_machine_target:
|
|
api_stream._machine_replay_target[machine_id] = original_session_id
|
|
api_stream._retry_pending[a1["action_id"]] = _pending_entry(
|
|
a1,
|
|
replay_id,
|
|
original_session_id,
|
|
machine_id,
|
|
dispatched_at=dispatched_at,
|
|
)
|
|
|
|
result = await api_stream.get_next_action(polling_session_id, machine_id)
|
|
|
|
assert result["action"] is None
|
|
assert result["action_in_flight"] is True
|
|
assert result["in_flight_action_id"] == a1["action_id"]
|
|
assert result["replay_id"] == replay_id
|
|
assert api_stream._replay_queues[original_session_id] == [a2]
|
|
assert polling_session_id not in api_stream._replay_queues
|
|
assert api_stream._replay_states[replay_id]["session_id"] == original_session_id
|
|
assert (
|
|
api_stream._retry_pending[a1["action_id"]]["session_id"]
|
|
== original_session_id
|
|
)
|
|
assert (
|
|
api_stream._retry_pending[a1["action_id"]]["dispatched_at"]
|
|
== dispatched_at
|
|
)
|
|
if use_machine_target:
|
|
assert api_stream._machine_replay_target[machine_id] == original_session_id
|
|
|
|
report = api_stream.ReplayResultReport(
|
|
session_id=original_session_id,
|
|
action_id=a1["action_id"],
|
|
success=True,
|
|
)
|
|
recorded = await api_stream.report_action_result(report)
|
|
|
|
assert recorded["status"] == "recorded"
|
|
assert a1["action_id"] not in api_stream._retry_pending
|
|
assert api_stream._replay_states[replay_id]["completed_actions"] == 1
|
|
assert api_stream._replay_states[replay_id]["current_action_index"] == 1
|
|
assert api_stream._replay_queues[original_session_id] == [a2]
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# WP4 suite — tests 7, 3, 2 (cf. inbox_codex/2026-05-25_0940_..._WP4-suite-3-
|
|
# tests-restants.md ; arbitrage GO inbox_claude/2026-05-25_1018_...).
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_concurrent_dispatch_and_result_no_double_increment(
|
|
isolated_replay_state,
|
|
monkeypatch,
|
|
):
|
|
"""WP4 test 7 — report + poll concurrents : a1 ne doit jamais être
|
|
re-dispatché, et completed_actions ne doit jamais dépasser 1.
|
|
|
|
Deux ordres d'exécution valides (sérialisés par _async_replay_lock) :
|
|
- cas A : poll exécuté AVANT le pop _retry_pending → action_in_flight=True
|
|
- cas B : poll exécuté APRÈS le pop → reçoit a2
|
|
|
|
Sert de canari anti-régression : si quelqu'un déplace le pop hors lock
|
|
ou change la sérialisation, ce test attrape la dérive.
|
|
"""
|
|
import asyncio
|
|
|
|
api_stream = isolated_replay_state
|
|
monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner())
|
|
monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail())
|
|
|
|
replay_id = "replay_concurrent_dispatch"
|
|
session_id = "sess_concurrent_dispatch"
|
|
machine_id = "pc-concurrent"
|
|
a1 = _click_action("act_concurrent_a1")
|
|
a2 = _click_action("act_concurrent_a2")
|
|
dispatched_at = time.time()
|
|
|
|
api_stream._replay_states[replay_id] = _running_replay_state(
|
|
replay_id, session_id, machine_id, [a1, a2]
|
|
)
|
|
# a1 déjà en vol : queue contient seulement a2
|
|
api_stream._replay_queues[session_id] = [dict(a2)]
|
|
api_stream._retry_pending[a1["action_id"]] = _pending_entry(
|
|
a1, replay_id, session_id, machine_id, dispatched_at
|
|
)
|
|
|
|
report = api_stream.ReplayResultReport(
|
|
session_id=session_id,
|
|
action_id=a1["action_id"],
|
|
success=True,
|
|
)
|
|
|
|
result_report, result_poll = await asyncio.gather(
|
|
api_stream.report_action_result(report),
|
|
api_stream.get_next_action(session_id, machine_id),
|
|
)
|
|
|
|
# Report accepté, a1 retiré de _retry_pending
|
|
assert result_report["status"] == "recorded"
|
|
assert a1["action_id"] not in api_stream._retry_pending
|
|
assert api_stream._replay_states[replay_id]["completed_actions"] == 1
|
|
|
|
# Poll : 2 branches valides, jamais re-dispatch de a1
|
|
poll_action = result_poll.get("action")
|
|
if poll_action is not None:
|
|
# cas B : poll après pop → a reçu a2
|
|
assert poll_action["action_id"] == a2["action_id"], (
|
|
f"Re-dispatch interdit de a1 : {poll_action}"
|
|
)
|
|
else:
|
|
# cas A : poll avant pop → action_in_flight
|
|
assert result_poll.get("action_in_flight") is True
|
|
assert result_poll.get("in_flight_action_id") == a1["action_id"]
|
|
|
|
# a1 ne doit JAMAIS apparaître deux fois dans completed
|
|
assert api_stream._replay_states[replay_id]["completed_actions"] <= 1
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_late_report_after_watchdog_repush_documents_dedup_contract(
|
|
isolated_replay_state,
|
|
monkeypatch,
|
|
):
|
|
"""WP4 test 3 — late report après repush watchdog.
|
|
|
|
ATTENTION : couvre une LIMITATION explicite. L'agent Windows DOIT
|
|
déduplique par action_id pour éviter une double-exécution après repush
|
|
watchdog suivi d'un late report. Voir audit WP-C Q7 et arbitrage Codex :
|
|
Option A (hardening serveur) reportée en workpack ultérieur.
|
|
|
|
Séquence couverte :
|
|
T0 : dispatch initial de a1
|
|
T0+45 : watchdog repush a1 (dispatched_at = 0.0, resent_count = 1)
|
|
T0+46 : poll re-dispatche la copie (0.0 = sentinelle resume/retry)
|
|
T0+47 : report tardif arrive — accepté sans flag duplicate
|
|
"""
|
|
api_stream = isolated_replay_state
|
|
monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner())
|
|
monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail())
|
|
|
|
replay_id = "replay_late_report_dup"
|
|
session_id = "sess_late_report_dup"
|
|
machine_id = "pc-late-report"
|
|
a1 = _click_action("act_late_report_dup")
|
|
a2 = _click_action("act_late_report_next")
|
|
now = time.time()
|
|
|
|
# Setup : copie repushée par watchdog (dispatched_at = 0.0, resent_count = 1)
|
|
api_stream._replay_states[replay_id] = _running_replay_state(
|
|
replay_id, session_id, machine_id, [a1, a2]
|
|
)
|
|
api_stream._replay_queues[session_id] = [dict(a1), dict(a2)]
|
|
api_stream._retry_pending[a1["action_id"]] = {
|
|
"action": dict(a1),
|
|
"dispatched_action": dict(a1),
|
|
"retry_count": 0,
|
|
"replay_id": replay_id,
|
|
"session_id": session_id,
|
|
"machine_id": machine_id,
|
|
"dispatched_at": 0.0, # sentinelle repush
|
|
"first_dispatched_at": now - 45.0,
|
|
"resent_count": 1, # watchdog a repush 1x
|
|
"last_resent_at": now - 0.5,
|
|
}
|
|
|
|
# 1) Le poll re-dispatche la copie (sentinelle 0.0 autorise)
|
|
next_resp = await api_stream.get_next_action(session_id, machine_id)
|
|
assert next_resp["action"]["action_id"] == a1["action_id"]
|
|
assert api_stream._retry_pending[a1["action_id"]]["dispatched_at"] > 0
|
|
# resent_count préservé : signal exploitable côté agent pour dedup local
|
|
assert api_stream._retry_pending[a1["action_id"]]["resent_count"] == 1
|
|
|
|
# 2) Le report tardif (original) arrive — accepté sans signal duplicate
|
|
report = api_stream.ReplayResultReport(
|
|
session_id=session_id,
|
|
action_id=a1["action_id"],
|
|
success=True,
|
|
)
|
|
result = await api_stream.report_action_result(report)
|
|
|
|
# Contrat actuel : serveur accepte sans détecter la duplication.
|
|
# L'agent Windows doit déduplique via cache local action_id sinon
|
|
# double-exécution silencieuse possible. À durcir en workpack futur
|
|
# (Option A : status="duplicate_late_report" côté report_action_result).
|
|
assert result["status"] == "recorded"
|
|
assert a1["action_id"] not in api_stream._retry_pending
|
|
assert api_stream._replay_states[replay_id]["completed_actions"] == 1
|
|
# a2 reste en queue pour le poll suivant
|
|
assert [a["action_id"] for a in api_stream._replay_queues[session_id]] == [
|
|
a2["action_id"]
|
|
]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.xfail(
|
|
strict=False,
|
|
reason=(
|
|
"Race fenêtre release/re-acquire api_stream.py entre _replay_lock."
|
|
"release() et le re-acquire pour écrire _retry_pending. Documentée "
|
|
"comme 'race condition bénigne' dans le code. Voir WP-C Q1 + revue "
|
|
"D1 finding #1 : pré-inscrire _retry_pending avant release du lock "
|
|
"(solution B au backlog). strict=False car l'ordonnancement asyncio "
|
|
"peut faire passer le test fortuitement."
|
|
),
|
|
)
|
|
async def test_get_next_action_two_concurrent_polls(
|
|
isolated_replay_state, monkeypatch
|
|
):
|
|
"""WP4 test 2 — deux polls concurrents sur même (session, machine).
|
|
|
|
Variante déterministe : monkeypatch sync `_pre_check_screen_state` qui
|
|
sleep dans le thread executor (l'event loop reste libre, le 2e poll
|
|
peut acquérir le lock pendant la fenêtre release/re-acquire).
|
|
"""
|
|
import asyncio
|
|
|
|
api_stream = isolated_replay_state
|
|
replay_id = "replay_two_polls"
|
|
session_id = "sess_two_polls"
|
|
machine_id = "pc-two-polls"
|
|
|
|
# from_node + heartbeat frais => le pre-check est invoqué -> fenêtre observable
|
|
action = _click_action("act_two_polls")
|
|
action["from_node"] = "node_two_polls"
|
|
|
|
api_stream._replay_states[replay_id] = _running_replay_state(
|
|
replay_id, session_id, machine_id, [action]
|
|
)
|
|
api_stream._replay_queues[session_id] = [dict(action)]
|
|
api_stream._last_heartbeat[session_id] = {
|
|
"timestamp": time.time(),
|
|
"path": "/tmp/fake_heartbeat.png",
|
|
"detected_text": [],
|
|
"ui_elements": [],
|
|
"window_info": {},
|
|
"ocr_text": "",
|
|
}
|
|
|
|
def _slow_precheck_sync(*args, **kwargs):
|
|
# Force la fenêtre release/re-acquire. Sync car appelé via
|
|
# loop.run_in_executor : l'event loop reste libre, le 2e poll
|
|
# peut acquérir le lock pendant ce sleep.
|
|
time.sleep(0.05)
|
|
return None
|
|
|
|
monkeypatch.setattr(
|
|
api_stream, "_pre_check_screen_state", _slow_precheck_sync
|
|
)
|
|
|
|
results = await asyncio.gather(
|
|
api_stream.get_next_action(session_id, machine_id),
|
|
api_stream.get_next_action(session_id, machine_id),
|
|
return_exceptions=True,
|
|
)
|
|
|
|
dispatched = [r for r in results if isinstance(r, dict) and r.get("action")]
|
|
assert len(dispatched) == 1, (
|
|
f"Race détectée : {len(dispatched)} polls ont reçu l'action "
|
|
f"(attendu : 1). Results={results}"
|
|
)
|
|
assert len(api_stream._replay_queues.get(session_id, [])) == 0
|
|
assert len(api_stream._retry_pending) == 1
|