From 4ba426c205798db53b6f7ad5fbd46c27332b2ea1 Mon Sep 17 00:00:00 2001 From: Dom Date: Mon, 25 May 2026 11:00:59 +0200 Subject: [PATCH] fix(replay): guard single in-flight dispatch Add a private in-flight helper for replay dispatch, block machine retargeting while an action is still pending on the previous session, and warn on duplicate in-flight entries for the same replay triplet. Freeze the Notepad runtime dialog success path and add integration coverage for single in-flight dispatch, watchdog late-report documentation, and the known concurrent-poll race as an xfail. --- agent_v0/server_v1/api_stream.py | 134 +++- .../test_replay_single_inflight.py | 608 ++++++++++++++++++ .../unit/test_executor_verify_window_guard.py | 73 +++ 3 files changed, 805 insertions(+), 10 deletions(-) create mode 100644 tests/integration/test_replay_single_inflight.py diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index 989af4467..43d4f021a 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -625,6 +625,40 @@ def _remove_queued_action_duplicates(session_id: str, action_id: str) -> int: return removed +def _find_in_flight_action( + session_id: str, + machine_id: str, + replay_id: str, +) -> Optional[str]: + """Return the in-flight action for this replay triplet, if any. + + Must be called while `_replay_lock` is held. `dispatched_at == 0.0` is + intentionally not in-flight: it marks a resume/retry/watchdog repush that + still needs to be dispatched. + """ + if not replay_id: + return None + matches: list[str] = [] + for pending_action_id, pending in list(_retry_pending.items()): + if ( + pending.get("session_id") == session_id + and pending.get("machine_id") == machine_id + and pending.get("replay_id") == replay_id + and float(pending.get("dispatched_at") or 0) > 0 + ): + matches.append(pending_action_id) + if not matches: + return None + if len(matches) > 1: + logger.warning( + "[REPLAY] _find_in_flight_action: %d in-flight actions for triplet " + "session=%s machine=%s replay=%s — state may be corrupted, " + "returning first (insertion order). action_ids=%s", + len(matches), session_id, machine_id, replay_id, matches, + ) + return matches[0] + + class StreamEvent(BaseModel): session_id: str timestamp: float @@ -3071,21 +3105,61 @@ async def get_next_action(session_id: str, machine_id: str = "default"): f"actions_en_attente={len(queue)}" ) + if owning_replay is not None: + replay_id = owning_replay.get("replay_id", "") + in_flight_action_id = _find_in_flight_action( + session_id, machine_id, replay_id + ) + if in_flight_action_id is not None: + logger.debug( + "[REPLAY] action déjà en vol replay=%s session=%s " + "machine=%s action_id=%s — pas de nouveau dispatch", + replay_id, session_id, machine_id, in_flight_action_id, + ) + return { + "action": None, + "session_id": session_id, + "machine_id": machine_id, + "action_in_flight": True, + "in_flight_action_id": in_flight_action_id, + "replay_id": replay_id, + } + if not queue and machine_id != "default": # Lookup 1 : machine_replay_target (mapping explicite POST /replay) target_sid = _machine_replay_target.get(machine_id) if target_sid and target_sid != session_id: target_queue = _replay_queues.get(target_sid, []) - if target_queue: - # Vérifier que le replay_state ciblé concerne BIEN cette machine - target_state = None - for state in _replay_states.values(): - if (state.get("session_id") == target_sid - and state.get("machine_id") == machine_id - and state["status"] == "running"): - target_state = state - break - if target_state: + # Vérifier que le replay_state ciblé concerne BIEN cette machine + target_state = None + for state in _replay_states.values(): + if (state.get("session_id") == target_sid + and state.get("machine_id") == machine_id + and state["status"] == "running"): + target_state = state + break + if target_state: + replay_id = target_state.get("replay_id", "") + in_flight_action_id = _find_in_flight_action( + target_sid, machine_id, replay_id + ) + if in_flight_action_id is not None: + logger.debug( + "[REPLAY] action déjà en vol replay=%s session=%s " + "ancienne_session=%s machine=%s action_id=%s — " + "reciblage différé", + replay_id, session_id, target_sid, machine_id, + in_flight_action_id, + ) + return { + "action": None, + "session_id": session_id, + "machine_id": machine_id, + "action_in_flight": True, + "in_flight_action_id": in_flight_action_id, + "replay_id": replay_id, + } + if target_queue: queue = target_queue owning_replay = target_state _replay_queues[session_id] = target_queue @@ -3101,6 +3175,26 @@ async def get_next_action(session_id: str, machine_id: str = "default"): and state["status"] == "running" and state["session_id"] != session_id): other_sid = state["session_id"] + replay_id = state.get("replay_id", "") + in_flight_action_id = _find_in_flight_action( + other_sid, machine_id, replay_id + ) + if in_flight_action_id is not None: + logger.debug( + "[REPLAY] action déjà en vol replay=%s session=%s " + "ancienne_session=%s machine=%s action_id=%s — " + "reciblage différé", + replay_id, session_id, other_sid, machine_id, + in_flight_action_id, + ) + return { + "action": None, + "session_id": session_id, + "machine_id": machine_id, + "action_in_flight": True, + "in_flight_action_id": in_flight_action_id, + "replay_id": replay_id, + } other_queue = _replay_queues.get(other_sid, []) if other_queue: queue = other_queue @@ -3112,6 +3206,26 @@ async def get_next_action(session_id: str, machine_id: str = "default"): logger.info(f"Replay machine-state: {machine_id} -> {other_sid} -> {session_id}") break + if owning_replay is not None: + replay_id = owning_replay.get("replay_id", "") + in_flight_action_id = _find_in_flight_action( + session_id, machine_id, replay_id + ) + if in_flight_action_id is not None: + logger.debug( + "[REPLAY] action déjà en vol replay=%s session=%s " + "machine=%s action_id=%s — pas de nouveau dispatch", + replay_id, session_id, machine_id, in_flight_action_id, + ) + return { + "action": None, + "session_id": session_id, + "machine_id": machine_id, + "action_in_flight": True, + "in_flight_action_id": in_flight_action_id, + "replay_id": replay_id, + } + if not queue: return {"action": None, "session_id": session_id, "machine_id": machine_id} diff --git a/tests/integration/test_replay_single_inflight.py b/tests/integration/test_replay_single_inflight.py new file mode 100644 index 000000000..e70d5f9e8 --- /dev/null +++ b/tests/integration/test_replay_single_inflight.py @@ -0,0 +1,608 @@ +"""Non-regression tests for replay single in-flight dispatch guards.""" + +from __future__ import annotations + +import sys +import time +from pathlib import Path + +import pytest + + +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + + +@pytest.fixture +def isolated_replay_state(monkeypatch): + monkeypatch.setenv("RPA_API_TOKEN", "test_replay_single_inflight_token") + + from agent_v0.server_v1 import api_stream + + monkeypatch.setattr(api_stream, "API_TOKEN", "test_replay_single_inflight_token") + + if api_stream._replay_lock.locked(): + pytest.fail( + "_replay_lock is already held at fixture setup — a previous test " + "likely crashed without releasing it. Fail fast instead of waiting " + "on the 4.5s acquire timeout in get_next_action()." + ) + + saved_states = dict(api_stream._replay_states) + saved_queues = dict(api_stream._replay_queues) + saved_retry = dict(api_stream._retry_pending) + saved_targets = dict(api_stream._machine_replay_target) + saved_heartbeat = dict(api_stream._last_heartbeat) + + api_stream._replay_states.clear() + api_stream._replay_queues.clear() + api_stream._retry_pending.clear() + api_stream._machine_replay_target.clear() + api_stream._last_heartbeat.clear() + + yield api_stream + + api_stream._replay_states.clear() + api_stream._replay_states.update(saved_states) + api_stream._replay_queues.clear() + api_stream._replay_queues.update(saved_queues) + api_stream._retry_pending.clear() + api_stream._retry_pending.update(saved_retry) + api_stream._machine_replay_target.clear() + api_stream._machine_replay_target.update(saved_targets) + api_stream._last_heartbeat.clear() + api_stream._last_heartbeat.update(saved_heartbeat) + + +def _running_replay_state( + replay_id: str, + session_id: str, + machine_id: str, + actions: list[dict], +) -> dict: + return { + "replay_id": replay_id, + "workflow_id": "session_replay:test", + "session_id": session_id, + "machine_id": machine_id, + "status": "running", + "total_actions": len(actions), + "completed_actions": 0, + "failed_actions": 0, + "current_action_index": 0, + "params": {}, + "results": [], + "actions": list(actions), + "retried_actions": 0, + "unverified_actions": 0, + "error_log": [], + "last_screenshot": None, + "failed_action": None, + "pause_message": None, + "variables": {}, + "safety_checks": [], + "checks_acknowledged": [], + "pause_reason": "", + "pause_payload": None, + } + + +def _click_action(action_id: str) -> dict: + return { + "action_id": action_id, + "type": "click", + "visual_mode": True, + "x_pct": 0.5, + "y_pct": 0.5, + "target_spec": {"by_text": "OK"}, + } + + +def _pending_entry( + action: dict, + replay_id: str, + session_id: str, + machine_id: str, + dispatched_at: float, +) -> dict: + return { + "action": dict(action), + "dispatched_action": dict(action), + "retry_count": 0, + "replay_id": replay_id, + "session_id": session_id, + "machine_id": machine_id, + "dispatched_at": dispatched_at, + "first_dispatched_at": dispatched_at or 0.0, + "resent_count": 0, + "last_resent_at": 0.0, + } + + +class _NoopReplayLearner: + def record_from_replay_result(self, **kwargs): + return None + + +class _NoopAuditTrail: + def record(self, entry): + return None + + +@pytest.mark.asyncio +async def test_real_dispatch_then_next_poll_blocks_with_action_in_flight( + isolated_replay_state, +): + api_stream = isolated_replay_state + replay_id = "replay_real_dispatch_blocks" + session_id = "sess_real_dispatch_blocks" + machine_id = "pc-real-dispatch" + a1 = _click_action("act_real_dispatch_a1") + a2 = _click_action("act_real_dispatch_a2") + + api_stream._replay_states[replay_id] = _running_replay_state( + replay_id, session_id, machine_id, [a1, a2] + ) + api_stream._replay_queues[session_id] = [dict(a1), dict(a2)] + + first = await api_stream.get_next_action(session_id, machine_id) + blocked = await api_stream.get_next_action(session_id, machine_id) + blocked_again = await api_stream.get_next_action(session_id, machine_id) + + assert first["action"]["action_id"] == a1["action_id"] + assert blocked["action"] is None + assert blocked["action_in_flight"] is True + assert blocked["in_flight_action_id"] == a1["action_id"] + assert blocked["replay_id"] == replay_id + assert blocked_again["action"] is None + assert blocked_again["action_in_flight"] is True + assert blocked_again["in_flight_action_id"] == a1["action_id"] + assert [a["action_id"] for a in api_stream._replay_queues[session_id]] == [ + a2["action_id"] + ] + + +@pytest.mark.asyncio +async def test_success_report_clears_inflight_and_allows_next_dispatch( + isolated_replay_state, + monkeypatch, +): + api_stream = isolated_replay_state + replay_id = "replay_success_allows_next" + session_id = "sess_success_allows_next" + machine_id = "pc-success-next" + a1 = _click_action("act_success_next_a1") + a2 = _click_action("act_success_next_a2") + + monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner()) + monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail()) + + api_stream._replay_states[replay_id] = _running_replay_state( + replay_id, session_id, machine_id, [a1, a2] + ) + api_stream._replay_queues[session_id] = [dict(a1), dict(a2)] + + first = await api_stream.get_next_action(session_id, machine_id) + assert first["action"]["action_id"] == a1["action_id"] + + report = api_stream.ReplayResultReport( + session_id=session_id, + action_id=a1["action_id"], + success=True, + ) + recorded = await api_stream.report_action_result(report) + second = await api_stream.get_next_action(session_id, machine_id) + + assert recorded["status"] == "recorded" + assert a1["action_id"] not in api_stream._retry_pending + assert api_stream._replay_states[replay_id]["completed_actions"] == 1 + assert api_stream._replay_states[replay_id]["current_action_index"] == 1 + assert second["action"]["action_id"] == a2["action_id"] + assert api_stream._replay_queues[session_id] == [] + + +@pytest.mark.asyncio +async def test_get_next_action_blocks_when_inflight(isolated_replay_state): + api_stream = isolated_replay_state + replay_id = "replay_single_inflight" + session_id = "sess_single_inflight" + machine_id = "pc-single" + action = _click_action("act_single_inflight") + dispatched_at = time.time() + + api_stream._replay_states[replay_id] = _running_replay_state( + replay_id, session_id, machine_id, [action] + ) + api_stream._replay_queues[session_id] = [dict(action)] + api_stream._retry_pending[action["action_id"]] = _pending_entry( + action, replay_id, session_id, machine_id, dispatched_at + ) + + result = await api_stream.get_next_action(session_id, machine_id) + + assert result["action"] is None + assert result["action_in_flight"] is True + assert result["in_flight_action_id"] == action["action_id"] + assert result["replay_id"] == replay_id + assert api_stream._replay_queues[session_id] == [action] + assert ( + api_stream._retry_pending[action["action_id"]]["dispatched_at"] + == dispatched_at + ) + + +@pytest.mark.asyncio +async def test_get_next_action_allows_resume_with_dispatched_at_zero( + isolated_replay_state, +): + api_stream = isolated_replay_state + replay_id = "replay_resume_zero" + session_id = "sess_resume_zero" + machine_id = "pc-resume" + action = _click_action("act_resume_zero") + + api_stream._replay_states[replay_id] = _running_replay_state( + replay_id, session_id, machine_id, [action] + ) + api_stream._replay_queues[session_id] = [dict(action)] + api_stream._retry_pending[action["action_id"]] = _pending_entry( + action, replay_id, session_id, machine_id, dispatched_at=0.0 + ) + + result = await api_stream.get_next_action(session_id, machine_id) + + assert result["action"]["action_id"] == action["action_id"] + assert api_stream._replay_queues[session_id] == [] + retry_info = api_stream._retry_pending[action["action_id"]] + assert retry_info["dispatched_at"] > 0 + assert retry_info["first_dispatched_at"] > 0 + assert retry_info["session_id"] == session_id + assert retry_info["machine_id"] == machine_id + assert retry_info["replay_id"] == replay_id + + +@pytest.mark.asyncio +async def test_inflight_filter_isolates_replays(isolated_replay_state): + api_stream = isolated_replay_state + session_id = "sess_replay_isolation" + machine_id = "pc-isolation" + old_action = _click_action("act_old_inflight") + new_action = _click_action("act_new_dispatch") + + api_stream._replay_states["replay_new"] = _running_replay_state( + "replay_new", session_id, machine_id, [new_action] + ) + api_stream._replay_queues[session_id] = [dict(new_action)] + api_stream._retry_pending[old_action["action_id"]] = _pending_entry( + old_action, + "replay_old", + session_id, + machine_id, + dispatched_at=time.time(), + ) + + result = await api_stream.get_next_action(session_id, machine_id) + + assert result["action"]["action_id"] == new_action["action_id"] + assert new_action["action_id"] in api_stream._retry_pending + assert old_action["action_id"] in api_stream._retry_pending + + +@pytest.mark.asyncio +async def test_inflight_filter_isolates_machines(isolated_replay_state): + api_stream = isolated_replay_state + replay_id = "replay_machine_isolation" + session_id = "sess_machine_isolation" + pending_action = _click_action("act_other_machine_inflight") + action = _click_action("act_this_machine_dispatch") + + api_stream._replay_states[replay_id] = _running_replay_state( + replay_id, session_id, "pc-2", [action] + ) + api_stream._replay_queues[session_id] = [dict(action)] + api_stream._retry_pending[pending_action["action_id"]] = _pending_entry( + pending_action, + replay_id, + session_id, + "pc-1", + dispatched_at=time.time(), + ) + + result = await api_stream.get_next_action(session_id, "pc-2") + + assert result["action"]["action_id"] == action["action_id"] + assert action["action_id"] in api_stream._retry_pending + assert pending_action["action_id"] in api_stream._retry_pending + + +@pytest.mark.parametrize( + "use_machine_target", + [True, False], + ids=["machine_replay_target", "lookup_other_session"], +) +@pytest.mark.asyncio +async def test_get_next_action_blocks_reciblage_with_old_pending_session( + isolated_replay_state, + monkeypatch, + use_machine_target, +): + api_stream = isolated_replay_state + replay_id = "replay_lookup_inflight" + original_session_id = "sess_lookup_original" + polling_session_id = "sess_lookup_polling" + machine_id = "pc-lookup" + a1 = _click_action("act_lookup_inflight_a1") + a2 = _click_action("act_lookup_inflight_a2") + dispatched_at = time.time() + + monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner()) + monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail()) + + api_stream._replay_states[replay_id] = _running_replay_state( + replay_id, original_session_id, machine_id, [a1, a2] + ) + api_stream._replay_queues[original_session_id] = [dict(a2)] + if use_machine_target: + api_stream._machine_replay_target[machine_id] = original_session_id + api_stream._retry_pending[a1["action_id"]] = _pending_entry( + a1, + replay_id, + original_session_id, + machine_id, + dispatched_at=dispatched_at, + ) + + result = await api_stream.get_next_action(polling_session_id, machine_id) + + assert result["action"] is None + assert result["action_in_flight"] is True + assert result["in_flight_action_id"] == a1["action_id"] + assert result["replay_id"] == replay_id + assert api_stream._replay_queues[original_session_id] == [a2] + assert polling_session_id not in api_stream._replay_queues + assert api_stream._replay_states[replay_id]["session_id"] == original_session_id + assert ( + api_stream._retry_pending[a1["action_id"]]["session_id"] + == original_session_id + ) + assert ( + api_stream._retry_pending[a1["action_id"]]["dispatched_at"] + == dispatched_at + ) + if use_machine_target: + assert api_stream._machine_replay_target[machine_id] == original_session_id + + report = api_stream.ReplayResultReport( + session_id=original_session_id, + action_id=a1["action_id"], + success=True, + ) + recorded = await api_stream.report_action_result(report) + + assert recorded["status"] == "recorded" + assert a1["action_id"] not in api_stream._retry_pending + assert api_stream._replay_states[replay_id]["completed_actions"] == 1 + assert api_stream._replay_states[replay_id]["current_action_index"] == 1 + assert api_stream._replay_queues[original_session_id] == [a2] + + +# ----------------------------------------------------------------------------- +# WP4 suite — tests 7, 3, 2 (cf. inbox_codex/2026-05-25_0940_..._WP4-suite-3- +# tests-restants.md ; arbitrage GO inbox_claude/2026-05-25_1018_...). +# ----------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_concurrent_dispatch_and_result_no_double_increment( + isolated_replay_state, + monkeypatch, +): + """WP4 test 7 — report + poll concurrents : a1 ne doit jamais être + re-dispatché, et completed_actions ne doit jamais dépasser 1. + + Deux ordres d'exécution valides (sérialisés par _async_replay_lock) : + - cas A : poll exécuté AVANT le pop _retry_pending → action_in_flight=True + - cas B : poll exécuté APRÈS le pop → reçoit a2 + + Sert de canari anti-régression : si quelqu'un déplace le pop hors lock + ou change la sérialisation, ce test attrape la dérive. + """ + import asyncio + + api_stream = isolated_replay_state + monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner()) + monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail()) + + replay_id = "replay_concurrent_dispatch" + session_id = "sess_concurrent_dispatch" + machine_id = "pc-concurrent" + a1 = _click_action("act_concurrent_a1") + a2 = _click_action("act_concurrent_a2") + dispatched_at = time.time() + + api_stream._replay_states[replay_id] = _running_replay_state( + replay_id, session_id, machine_id, [a1, a2] + ) + # a1 déjà en vol : queue contient seulement a2 + api_stream._replay_queues[session_id] = [dict(a2)] + api_stream._retry_pending[a1["action_id"]] = _pending_entry( + a1, replay_id, session_id, machine_id, dispatched_at + ) + + report = api_stream.ReplayResultReport( + session_id=session_id, + action_id=a1["action_id"], + success=True, + ) + + result_report, result_poll = await asyncio.gather( + api_stream.report_action_result(report), + api_stream.get_next_action(session_id, machine_id), + ) + + # Report accepté, a1 retiré de _retry_pending + assert result_report["status"] == "recorded" + assert a1["action_id"] not in api_stream._retry_pending + assert api_stream._replay_states[replay_id]["completed_actions"] == 1 + + # Poll : 2 branches valides, jamais re-dispatch de a1 + poll_action = result_poll.get("action") + if poll_action is not None: + # cas B : poll après pop → a reçu a2 + assert poll_action["action_id"] == a2["action_id"], ( + f"Re-dispatch interdit de a1 : {poll_action}" + ) + else: + # cas A : poll avant pop → action_in_flight + assert result_poll.get("action_in_flight") is True + assert result_poll.get("in_flight_action_id") == a1["action_id"] + + # a1 ne doit JAMAIS apparaître deux fois dans completed + assert api_stream._replay_states[replay_id]["completed_actions"] <= 1 + + +@pytest.mark.asyncio +async def test_late_report_after_watchdog_repush_documents_dedup_contract( + isolated_replay_state, + monkeypatch, +): + """WP4 test 3 — late report après repush watchdog. + + ATTENTION : couvre une LIMITATION explicite. L'agent Windows DOIT + déduplique par action_id pour éviter une double-exécution après repush + watchdog suivi d'un late report. Voir audit WP-C Q7 et arbitrage Codex : + Option A (hardening serveur) reportée en workpack ultérieur. + + Séquence couverte : + T0 : dispatch initial de a1 + T0+45 : watchdog repush a1 (dispatched_at = 0.0, resent_count = 1) + T0+46 : poll re-dispatche la copie (0.0 = sentinelle resume/retry) + T0+47 : report tardif arrive — accepté sans flag duplicate + """ + api_stream = isolated_replay_state + monkeypatch.setattr(api_stream, "_replay_learner", _NoopReplayLearner()) + monkeypatch.setattr(api_stream, "_audit_trail", _NoopAuditTrail()) + + replay_id = "replay_late_report_dup" + session_id = "sess_late_report_dup" + machine_id = "pc-late-report" + a1 = _click_action("act_late_report_dup") + a2 = _click_action("act_late_report_next") + now = time.time() + + # Setup : copie repushée par watchdog (dispatched_at = 0.0, resent_count = 1) + api_stream._replay_states[replay_id] = _running_replay_state( + replay_id, session_id, machine_id, [a1, a2] + ) + api_stream._replay_queues[session_id] = [dict(a1), dict(a2)] + api_stream._retry_pending[a1["action_id"]] = { + "action": dict(a1), + "dispatched_action": dict(a1), + "retry_count": 0, + "replay_id": replay_id, + "session_id": session_id, + "machine_id": machine_id, + "dispatched_at": 0.0, # sentinelle repush + "first_dispatched_at": now - 45.0, + "resent_count": 1, # watchdog a repush 1x + "last_resent_at": now - 0.5, + } + + # 1) Le poll re-dispatche la copie (sentinelle 0.0 autorise) + next_resp = await api_stream.get_next_action(session_id, machine_id) + assert next_resp["action"]["action_id"] == a1["action_id"] + assert api_stream._retry_pending[a1["action_id"]]["dispatched_at"] > 0 + # resent_count préservé : signal exploitable côté agent pour dedup local + assert api_stream._retry_pending[a1["action_id"]]["resent_count"] == 1 + + # 2) Le report tardif (original) arrive — accepté sans signal duplicate + report = api_stream.ReplayResultReport( + session_id=session_id, + action_id=a1["action_id"], + success=True, + ) + result = await api_stream.report_action_result(report) + + # Contrat actuel : serveur accepte sans détecter la duplication. + # L'agent Windows doit déduplique via cache local action_id sinon + # double-exécution silencieuse possible. À durcir en workpack futur + # (Option A : status="duplicate_late_report" côté report_action_result). + assert result["status"] == "recorded" + assert a1["action_id"] not in api_stream._retry_pending + assert api_stream._replay_states[replay_id]["completed_actions"] == 1 + # a2 reste en queue pour le poll suivant + assert [a["action_id"] for a in api_stream._replay_queues[session_id]] == [ + a2["action_id"] + ] + + +@pytest.mark.asyncio +@pytest.mark.xfail( + strict=False, + reason=( + "Race fenêtre release/re-acquire api_stream.py entre _replay_lock." + "release() et le re-acquire pour écrire _retry_pending. Documentée " + "comme 'race condition bénigne' dans le code. Voir WP-C Q1 + revue " + "D1 finding #1 : pré-inscrire _retry_pending avant release du lock " + "(solution B au backlog). strict=False car l'ordonnancement asyncio " + "peut faire passer le test fortuitement." + ), +) +async def test_get_next_action_two_concurrent_polls( + isolated_replay_state, monkeypatch +): + """WP4 test 2 — deux polls concurrents sur même (session, machine). + + Variante déterministe : monkeypatch sync `_pre_check_screen_state` qui + sleep dans le thread executor (l'event loop reste libre, le 2e poll + peut acquérir le lock pendant la fenêtre release/re-acquire). + """ + import asyncio + + api_stream = isolated_replay_state + replay_id = "replay_two_polls" + session_id = "sess_two_polls" + machine_id = "pc-two-polls" + + # from_node + heartbeat frais => le pre-check est invoqué -> fenêtre observable + action = _click_action("act_two_polls") + action["from_node"] = "node_two_polls" + + api_stream._replay_states[replay_id] = _running_replay_state( + replay_id, session_id, machine_id, [action] + ) + api_stream._replay_queues[session_id] = [dict(action)] + api_stream._last_heartbeat[session_id] = { + "timestamp": time.time(), + "path": "/tmp/fake_heartbeat.png", + "detected_text": [], + "ui_elements": [], + "window_info": {}, + "ocr_text": "", + } + + def _slow_precheck_sync(*args, **kwargs): + # Force la fenêtre release/re-acquire. Sync car appelé via + # loop.run_in_executor : l'event loop reste libre, le 2e poll + # peut acquérir le lock pendant ce sleep. + time.sleep(0.05) + return None + + monkeypatch.setattr( + api_stream, "_pre_check_screen_state", _slow_precheck_sync + ) + + results = await asyncio.gather( + api_stream.get_next_action(session_id, machine_id), + api_stream.get_next_action(session_id, machine_id), + return_exceptions=True, + ) + + dispatched = [r for r in results if isinstance(r, dict) and r.get("action")] + assert len(dispatched) == 1, ( + f"Race détectée : {len(dispatched)} polls ont reçu l'action " + f"(attendu : 1). Results={results}" + ) + assert len(api_stream._replay_queues.get(session_id, [])) == 0 + assert len(api_stream._retry_pending) == 1 diff --git a/tests/unit/test_executor_verify_window_guard.py b/tests/unit/test_executor_verify_window_guard.py index 628399752..68a0f90a3 100644 --- a/tests/unit/test_executor_verify_window_guard.py +++ b/tests/unit/test_executor_verify_window_guard.py @@ -391,6 +391,8 @@ class TestSetupActionsSkipPixelChange: class TestRuntimeDialogHandling: def test_handle_confirm_save_dialog_clicks_oui_via_server(self): exe = _make_executor_skeleton() + exe._active_window_rect_for_dialog = MagicMock(return_value=None) + exe._try_click_runtime_dialog_button_uia = MagicMock(return_value=None) exe._capture_screenshot_b64 = MagicMock(return_value="abc") exe._server_resolve_target = MagicMock( return_value={ @@ -403,6 +405,7 @@ class TestRuntimeDialogHandling: ) exe._find_text_on_screen = MagicMock(return_value=None) exe._click = MagicMock() + exe._wait_until_title_changes = MagicMock(return_value="Enregistrer sous") spec = ActionExecutorV1._match_known_runtime_dialog( "Confirmer l'enregistrement" @@ -418,6 +421,22 @@ class TestRuntimeDialogHandling: exe._server_resolve_target.assert_called_once() exe._click.assert_called_once_with((480, 810), "left") + def test_confirm_save_geometry_fallback_targets_yes_button_in_active_window(self): + exe = _make_executor_skeleton() + spec = ActionExecutorV1._match_known_runtime_dialog( + "Confirmer l'enregistrement" + ) + rect = { + "left": 1000, + "top": 600, + "width": 520, + "height": 200, + } + + pos = exe._runtime_dialog_button_geometry_fallback(spec, "Oui", rect) + + assert pos == (1338, 764) + def test_runtime_dialog_before_pause_returns_skip_result(self): exe = _make_executor_skeleton() exe._check_and_pause_on_system_dialog = MagicMock(return_value=False) @@ -491,6 +510,60 @@ class TestRuntimeDialogHandling: assert res["actual_position"] == {"x_pct": 0.5, "y_pct": 0.5} exe._handle_known_runtime_dialog.assert_called_once() + def test_live_notepad_confirm_save_dialog_is_frozen_offline(self): + exe = _make_executor_skeleton() + exe._click = MagicMock() + exe._quick_screenshot_hash = MagicMock(return_value="hash_before") + exe._wait_for_screen_change = MagicMock(return_value=True) + exe._capture_human_correction = MagicMock(return_value=[]) + exe._handle_known_runtime_dialog = MagicMock( + return_value={ + "handled": True, + "button_text": "Oui", + "x_pct": 0.63, + "y_pct": 0.76, + "resolution_score": 0.9, + "post_title": "http192.168.1.408765dossier.htmlid=.txt - Bloc-notes", + } + ) + + action = { + "action_id": "act_raw_a8dbaaac", + "type": "click", + "x_pct": 0.5, + "y_pct": 0.5, + "expected_window_title": ( + "http192.168.1.408765dossier.htmlid=.txt - Bloc-notes" + ), + } + + titles = iter( + [ + {"title": "Confirmer l'enregistrement"}, + {"title": "http192.168.1.408765dossier.htmlid=.txt - Bloc-notes"}, + ] + ) + + with patch("agent_v0.agent_v1.core.executor.time.sleep", lambda *_a, **_k: None): + with patch( + "agent_v0.agent_v1.window_info_crossplatform.get_active_window_info", + side_effect=lambda: next(titles), + ): + res = exe.execute_replay_action(action) + + assert res["success"] is True + assert res["warning"] == "runtime_dialog_handled_post_verify" + assert res["action_id"] == "act_raw_a8dbaaac" + assert res.get("needs_human") is not True + assert "correction" not in res + assert res["runtime_dialog"] == { + "dialog_id": "confirm_save_overwrite", + "dialog_title": "Confirmer l'enregistrement", + "button_text": "Oui", + } + exe._handle_known_runtime_dialog.assert_called_once() + exe._capture_human_correction.assert_not_called() + def test_post_verify_can_retry_same_runtime_dialog_before_recovery(self): exe = _make_executor_skeleton() exe._click = MagicMock()