"""Non-regression tests for replay single in-flight dispatch guards.""" from __future__ import annotations import sys import time from pathlib import Path import pytest ROOT = Path(__file__).resolve().parents[2] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) @pytest.fixture def isolated_replay_state(monkeypatch, tmp_path): monkeypatch.setenv("RPA_API_TOKEN", "test_replay_single_inflight_token") from agent_v0.server_v1 import api_stream from agent_v0.server_v1.agent_registry import AgentRegistry monkeypatch.setattr(api_stream, "API_TOKEN", "test_replay_single_inflight_token") # Isoler le registre pour que _agent_registry_has_entries() retourne False # (mode dev, aucun agent enrolle) — sinon le garde fleet bloque les tests original_registry = api_stream.agent_registry empty_registry = AgentRegistry(db_path=str(tmp_path / "empty_agents.db")) monkeypatch.setattr(api_stream, "agent_registry", empty_registry) if api_stream._replay_lock.locked(): pytest.fail( "_replay_lock is already held at fixture setup — a previous test " "likely crashed without releasing it. Fail fast instead of waiting " "on the 4.5s acquire timeout in get_next_action()." ) saved_states = dict(api_stream._replay_states) saved_queues = dict(api_stream._replay_queues) saved_retry = dict(api_stream._retry_pending) saved_targets = dict(api_stream._machine_replay_target) saved_heartbeat = dict(api_stream._last_heartbeat) api_stream._replay_states.clear() api_stream._replay_queues.clear() api_stream._retry_pending.clear() api_stream._machine_replay_target.clear() api_stream._last_heartbeat.clear() yield api_stream api_stream._replay_states.clear() api_stream._replay_states.update(saved_states) api_stream._replay_queues.clear() api_stream._replay_queues.update(saved_queues) api_stream._retry_pending.clear() api_stream._retry_pending.update(saved_retry) api_stream._machine_replay_target.clear() api_stream._machine_replay_target.update(saved_targets) api_stream._last_heartbeat.clear() api_stream._last_heartbeat.update(saved_heartbeat) monkeypatch.setattr(api_stream, "agent_registry", original_registry) def _running_replay_state( replay_id: str, session_id: str, machine_id: str, actions: list[dict], ) -> dict: return { "replay_id": replay_id, "workflow_id": "session_replay:test", "session_id": session_id, "machine_id": machine_id, "status": "running", "total_actions": len(actions), "completed_actions": 0, "failed_actions": 0, "current_action_index": 0, "params": {}, "results": [], "actions": list(actions), "retried_actions": 0, "unverified_actions": 0, "error_log": [], "last_screenshot": None, "failed_action": None, "pause_message": None, "variables": {}, "safety_checks": [], "checks_acknowledged": [], "pause_reason": "", "pause_payload": None, } def _click_action(action_id: str) -> dict: return { "action_id": action_id, "type": "click", "visual_mode": True, "x_pct": 0.5, "y_pct": 0.5, "target_spec": {"by_text": "OK"}, } def _pending_entry( action: dict, replay_id: str, session_id: str, machine_id: str, dispatched_at: float, ) -> dict: return { "action": dict(action), "dispatched_action": dict(action), "retry_count": 0, "replay_id": replay_id, "session_id": session_id, "machine_id": machine_id, "dispatched_at": dispatched_at, "first_dispatched_at": dispatched_at or 0.0, "resent_count": 0, "last_resent_at": 0.0, } class _NoopReplayLearner: def record_from_replay_result(self, **kwargs): return None class _NoopAuditTrail: def record(self, entry): return None @pytest.mark.asyncio async def test_real_dispatch_then_next_poll_blocks_with_action_in_flight( isolated_replay_state, ): api_stream = isolated_replay_state replay_id = "replay_real_dispatch_blocks" session_id = "sess_real_dispatch_blocks" machine_id = "pc-real-dispatch" a1 = _click_action("act_real_dispatch_a1") a2 = _click_action("act_real_dispatch_a2") api_stream._replay_states[replay_id] = _running_replay_state( replay_id, session_id, machine_id, [a1, a2] ) api_stream._replay_queues[session_id] = [dict(a1), dict(a2)] first = await api_stream.get_next_action(session_id, machine_id) blocked = await api_stream.get_next_action(session_id, machine_id) blocked_again = await api_stream.get_next_action(session_id, machine_id) assert first["action"]["action_id"] == a1["action_id"] assert blocked["action"] is None assert blocked["action_in_flight"] is True assert blocked["in_flight_action_id"] == a1["action_id"] assert blocked["replay_id"] == replay_id assert blocked_again["action"] is None assert blocked_again["action_in_flight"] is True assert blocked_again["in_flight_action_id"] == a1["action_id"] assert [a["action_id"] for a in api_stream._replay_queues[session_id]] == [ a2["action_id"] ] @pytest.mark.asyncio async def test_success_report_clears_inflight_and_allows_next_dispatch( isolated_replay_state, monkeypatch, ): api_stream = isolated_replay_state replay_id = "replay_success_allows_next" session_id = "sess_success_allows_next" machine_id = "pc-success-next" a1 = _click_action("act_success_next_a1") a2 = _click_action("act_success_next_a2") monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner()) monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail()) api_stream._replay_states[replay_id] = _running_replay_state( replay_id, session_id, machine_id, [a1, a2] ) api_stream._replay_queues[session_id] = [dict(a1), dict(a2)] first = await api_stream.get_next_action(session_id, machine_id) assert first["action"]["action_id"] == a1["action_id"] report = api_stream.ReplayResultReport( session_id=session_id, action_id=a1["action_id"], success=True, ) recorded = await api_stream.report_action_result(report) second = await api_stream.get_next_action(session_id, machine_id) assert recorded["status"] == "recorded" assert a1["action_id"] not in api_stream._retry_pending assert api_stream._replay_states[replay_id]["completed_actions"] == 1 assert api_stream._replay_states[replay_id]["current_action_index"] == 1 assert second["action"]["action_id"] == a2["action_id"] assert api_stream._replay_queues[session_id] == [] @pytest.mark.asyncio async def test_get_next_action_blocks_when_inflight(isolated_replay_state): api_stream = isolated_replay_state replay_id = "replay_single_inflight" session_id = "sess_single_inflight" machine_id = "pc-single" action = _click_action("act_single_inflight") dispatched_at = time.time() api_stream._replay_states[replay_id] = _running_replay_state( replay_id, session_id, machine_id, [action] ) api_stream._replay_queues[session_id] = [dict(action)] api_stream._retry_pending[action["action_id"]] = _pending_entry( action, replay_id, session_id, machine_id, dispatched_at ) result = await api_stream.get_next_action(session_id, machine_id) assert result["action"] is None assert result["action_in_flight"] is True assert result["in_flight_action_id"] == action["action_id"] assert result["replay_id"] == replay_id assert api_stream._replay_queues[session_id] == [action] assert ( api_stream._retry_pending[action["action_id"]]["dispatched_at"] == dispatched_at ) @pytest.mark.asyncio async def test_get_next_action_allows_resume_with_dispatched_at_zero( isolated_replay_state, ): api_stream = isolated_replay_state replay_id = "replay_resume_zero" session_id = "sess_resume_zero" machine_id = "pc-resume" action = _click_action("act_resume_zero") api_stream._replay_states[replay_id] = _running_replay_state( replay_id, session_id, machine_id, [action] ) api_stream._replay_queues[session_id] = [dict(action)] api_stream._retry_pending[action["action_id"]] = _pending_entry( action, replay_id, session_id, machine_id, dispatched_at=0.0 ) result = await api_stream.get_next_action(session_id, machine_id) assert result["action"]["action_id"] == action["action_id"] assert api_stream._replay_queues[session_id] == [] retry_info = api_stream._retry_pending[action["action_id"]] assert retry_info["dispatched_at"] > 0 assert retry_info["first_dispatched_at"] > 0 assert retry_info["session_id"] == session_id assert retry_info["machine_id"] == machine_id assert retry_info["replay_id"] == replay_id @pytest.mark.asyncio async def test_inflight_filter_isolates_replays(isolated_replay_state): api_stream = isolated_replay_state session_id = "sess_replay_isolation" machine_id = "pc-isolation" old_action = _click_action("act_old_inflight") new_action = _click_action("act_new_dispatch") api_stream._replay_states["replay_new"] = _running_replay_state( "replay_new", session_id, machine_id, [new_action] ) api_stream._replay_queues[session_id] = [dict(new_action)] api_stream._retry_pending[old_action["action_id"]] = _pending_entry( old_action, "replay_old", session_id, machine_id, dispatched_at=time.time(), ) result = await api_stream.get_next_action(session_id, machine_id) assert result["action"]["action_id"] == new_action["action_id"] assert new_action["action_id"] in api_stream._retry_pending assert old_action["action_id"] in api_stream._retry_pending @pytest.mark.asyncio async def test_inflight_filter_isolates_machines(isolated_replay_state): api_stream = isolated_replay_state replay_id = "replay_machine_isolation" session_id = "sess_machine_isolation" pending_action = _click_action("act_other_machine_inflight") action = _click_action("act_this_machine_dispatch") api_stream._replay_states[replay_id] = _running_replay_state( replay_id, session_id, "pc-2", [action] ) api_stream._replay_queues[session_id] = [dict(action)] api_stream._retry_pending[pending_action["action_id"]] = _pending_entry( pending_action, replay_id, session_id, "pc-1", dispatched_at=time.time(), ) result = await api_stream.get_next_action(session_id, "pc-2") assert result["action"]["action_id"] == action["action_id"] assert action["action_id"] in api_stream._retry_pending assert pending_action["action_id"] in api_stream._retry_pending @pytest.mark.parametrize( "use_machine_target", [True, False], ids=["machine_replay_target", "lookup_other_session"], ) @pytest.mark.asyncio async def test_get_next_action_blocks_reciblage_with_old_pending_session( isolated_replay_state, monkeypatch, use_machine_target, ): api_stream = isolated_replay_state replay_id = "replay_lookup_inflight" original_session_id = "sess_lookup_original" polling_session_id = "sess_lookup_polling" machine_id = "pc-lookup" a1 = _click_action("act_lookup_inflight_a1") a2 = _click_action("act_lookup_inflight_a2") dispatched_at = time.time() monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner()) monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail()) api_stream._replay_states[replay_id] = _running_replay_state( replay_id, original_session_id, machine_id, [a1, a2] ) api_stream._replay_queues[original_session_id] = [dict(a2)] if use_machine_target: api_stream._machine_replay_target[machine_id] = original_session_id api_stream._retry_pending[a1["action_id"]] = _pending_entry( a1, replay_id, original_session_id, machine_id, dispatched_at=dispatched_at, ) result = await api_stream.get_next_action(polling_session_id, machine_id) assert result["action"] is None assert result["action_in_flight"] is True assert result["in_flight_action_id"] == a1["action_id"] assert result["replay_id"] == replay_id assert api_stream._replay_queues[original_session_id] == [a2] assert polling_session_id not in api_stream._replay_queues assert api_stream._replay_states[replay_id]["session_id"] == original_session_id assert ( api_stream._retry_pending[a1["action_id"]]["session_id"] == original_session_id ) assert ( api_stream._retry_pending[a1["action_id"]]["dispatched_at"] == dispatched_at ) if use_machine_target: assert api_stream._machine_replay_target[machine_id] == original_session_id report = api_stream.ReplayResultReport( session_id=original_session_id, action_id=a1["action_id"], success=True, ) recorded = await api_stream.report_action_result(report) assert recorded["status"] == "recorded" assert a1["action_id"] not in api_stream._retry_pending assert api_stream._replay_states[replay_id]["completed_actions"] == 1 assert api_stream._replay_states[replay_id]["current_action_index"] == 1 assert api_stream._replay_queues[original_session_id] == [a2] # ----------------------------------------------------------------------------- # WP4 suite — tests 7, 3, 2 (cf. inbox_codex/2026-05-25_0940_..._WP4-suite-3- # tests-restants.md ; arbitrage GO inbox_claude/2026-05-25_1018_...). # ----------------------------------------------------------------------------- @pytest.mark.asyncio async def test_concurrent_dispatch_and_result_no_double_increment( isolated_replay_state, monkeypatch, ): """WP4 test 7 — report + poll concurrents : a1 ne doit jamais être re-dispatché, et completed_actions ne doit jamais dépasser 1. Deux ordres d'exécution valides (sérialisés par _async_replay_lock) : - cas A : poll exécuté AVANT le pop _retry_pending → action_in_flight=True - cas B : poll exécuté APRÈS le pop → reçoit a2 Sert de canari anti-régression : si quelqu'un déplace le pop hors lock ou change la sérialisation, ce test attrape la dérive. """ import asyncio api_stream = isolated_replay_state monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner()) monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail()) replay_id = "replay_concurrent_dispatch" session_id = "sess_concurrent_dispatch" machine_id = "pc-concurrent" a1 = _click_action("act_concurrent_a1") a2 = _click_action("act_concurrent_a2") dispatched_at = time.time() api_stream._replay_states[replay_id] = _running_replay_state( replay_id, session_id, machine_id, [a1, a2] ) # a1 déjà en vol : queue contient seulement a2 api_stream._replay_queues[session_id] = [dict(a2)] api_stream._retry_pending[a1["action_id"]] = _pending_entry( a1, replay_id, session_id, machine_id, dispatched_at ) report = api_stream.ReplayResultReport( session_id=session_id, action_id=a1["action_id"], success=True, ) result_report, result_poll = await asyncio.gather( api_stream.report_action_result(report), api_stream.get_next_action(session_id, machine_id), ) # Report accepté, a1 retiré de _retry_pending assert result_report["status"] == "recorded" assert a1["action_id"] not in api_stream._retry_pending assert api_stream._replay_states[replay_id]["completed_actions"] == 1 # Poll : 2 branches valides, jamais re-dispatch de a1 poll_action = result_poll.get("action") if poll_action is not None: # cas B : poll après pop → a reçu a2 assert poll_action["action_id"] == a2["action_id"], ( f"Re-dispatch interdit de a1 : {poll_action}" ) else: # cas A : poll avant pop → action_in_flight assert result_poll.get("action_in_flight") is True assert result_poll.get("in_flight_action_id") == a1["action_id"] # a1 ne doit JAMAIS apparaître deux fois dans completed assert api_stream._replay_states[replay_id]["completed_actions"] <= 1 @pytest.mark.asyncio async def test_late_report_after_watchdog_repush_documents_dedup_contract( isolated_replay_state, monkeypatch, ): """WP4 test 3 — late report après repush watchdog. ATTENTION : couvre une LIMITATION explicite. L'agent Windows DOIT déduplique par action_id pour éviter une double-exécution après repush watchdog suivi d'un late report. Voir audit WP-C Q7 et arbitrage Codex : Option A (hardening serveur) reportée en workpack ultérieur. Séquence couverte : T0 : dispatch initial de a1 T0+45 : watchdog repush a1 (dispatched_at = 0.0, resent_count = 1) T0+46 : poll re-dispatche la copie (0.0 = sentinelle resume/retry) T0+47 : report tardif arrive — accepté sans flag duplicate """ api_stream = isolated_replay_state monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner()) monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail()) replay_id = "replay_late_report_dup" session_id = "sess_late_report_dup" machine_id = "pc-late-report" a1 = _click_action("act_late_report_dup") a2 = _click_action("act_late_report_next") now = time.time() # Setup : copie repushée par watchdog (dispatched_at = 0.0, resent_count = 1) api_stream._replay_states[replay_id] = _running_replay_state( replay_id, session_id, machine_id, [a1, a2] ) api_stream._replay_queues[session_id] = [dict(a1), dict(a2)] api_stream._retry_pending[a1["action_id"]] = { "action": dict(a1), "dispatched_action": dict(a1), "retry_count": 0, "replay_id": replay_id, "session_id": session_id, "machine_id": machine_id, "dispatched_at": 0.0, # sentinelle repush "first_dispatched_at": now - 45.0, "resent_count": 1, # watchdog a repush 1x "last_resent_at": now - 0.5, } # 1) Le poll re-dispatche la copie (sentinelle 0.0 autorise) next_resp = await api_stream.get_next_action(session_id, machine_id) assert next_resp["action"]["action_id"] == a1["action_id"] assert api_stream._retry_pending[a1["action_id"]]["dispatched_at"] > 0 # resent_count préservé : signal exploitable côté agent pour dedup local assert api_stream._retry_pending[a1["action_id"]]["resent_count"] == 1 # 2) Le report tardif (original) arrive — accepté sans signal duplicate report = api_stream.ReplayResultReport( session_id=session_id, action_id=a1["action_id"], success=True, ) result = await api_stream.report_action_result(report) # Contrat actuel : serveur accepte sans détecter la duplication. # L'agent Windows doit déduplique via cache local action_id sinon # double-exécution silencieuse possible. À durcir en workpack futur # (Option A : status="duplicate_late_report" côté report_action_result). assert result["status"] == "recorded" assert a1["action_id"] not in api_stream._retry_pending assert api_stream._replay_states[replay_id]["completed_actions"] == 1 # a2 reste en queue pour le poll suivant assert [a["action_id"] for a in api_stream._replay_queues[session_id]] == [ a2["action_id"] ] @pytest.mark.asyncio @pytest.mark.xfail( strict=False, reason=( "Race fenêtre release/re-acquire api_stream.py entre _replay_lock." "release() et le re-acquire pour écrire _retry_pending. Documentée " "comme 'race condition bénigne' dans le code. Voir WP-C Q1 + revue " "D1 finding #1 : pré-inscrire _retry_pending avant release du lock " "(solution B au backlog). strict=False car l'ordonnancement asyncio " "peut faire passer le test fortuitement." ), ) async def test_get_next_action_two_concurrent_polls( isolated_replay_state, monkeypatch ): """WP4 test 2 — deux polls concurrents sur même (session, machine). Variante déterministe : monkeypatch sync `_pre_check_screen_state` qui sleep dans le thread executor (l'event loop reste libre, le 2e poll peut acquérir le lock pendant la fenêtre release/re-acquire). """ import asyncio api_stream = isolated_replay_state replay_id = "replay_two_polls" session_id = "sess_two_polls" machine_id = "pc-two-polls" # from_node + heartbeat frais => le pre-check est invoqué -> fenêtre observable action = _click_action("act_two_polls") action["from_node"] = "node_two_polls" api_stream._replay_states[replay_id] = _running_replay_state( replay_id, session_id, machine_id, [action] ) api_stream._replay_queues[session_id] = [dict(action)] api_stream._last_heartbeat[session_id] = { "timestamp": time.time(), "path": "/tmp/fake_heartbeat.png", "detected_text": [], "ui_elements": [], "window_info": {}, "ocr_text": "", } def _slow_precheck_sync(*args, **kwargs): # Force la fenêtre release/re-acquire. Sync car appelé via # loop.run_in_executor : l'event loop reste libre, le 2e poll # peut acquérir le lock pendant ce sleep. time.sleep(0.05) return None monkeypatch.setattr( api_stream, "_pre_check_screen_state", _slow_precheck_sync ) results = await asyncio.gather( api_stream.get_next_action(session_id, machine_id), api_stream.get_next_action(session_id, machine_id), return_exceptions=True, ) dispatched = [r for r in results if isinstance(r, dict) and r.get("action")] assert len(dispatched) == 1, ( f"Race détectée : {len(dispatched)} polls ont reçu l'action " f"(attendu : 1). Results={results}" ) assert len(api_stream._replay_queues.get(session_id, [])) == 0 assert len(api_stream._retry_pending) == 1