From 87dbe8c5ffeb606fb67d4bf7a32df525b0b0ab58 Mon Sep 17 00:00:00 2001 From: Dom Date: Wed, 6 May 2026 17:19:05 +0200 Subject: [PATCH] fix(stream): get_replay_status non-bloquant + bornage actions serveur MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Suite du commit 35b27ae49 (lock async sur /replay/next) qui n'avait traité que la moitié du problème. Le sprint QW4 (commit f5c33477f) a recâblé le polling frontend PauseDialog vers /replay/{replay_id} → get_replay_status, qui gardait un `with _replay_lock:` synchrone. Conséquence : dès qu'une action serveur (extract_text/extract_table/ t2a_decision) tient le lock, l'event loop FastAPI gèle entièrement (heartbeats Windows, polls replay/next, get_replay_status, tout). Reproduit aujourd'hui en pré-démo : un replay urgences a fait extract_text → la queue suivante a tenu le lock → polling VWB sur get_replay_status a bloqué le MainThread asyncio → 23 minutes de gel total (py-spy a confirmé MainThread sur api_stream.py:4117). Modifications : 1. get_replay_status : acquire timeboxé 0.5s via run_in_executor (même pattern que /replay/next ligne 2815). Si le lock est tenu, retour immédiat {status: "busy"} → le frontend retentera dans 1s. Aucun cas où ce poll bloque l'event loop. 2. Actions serveur lignes 2994/3000/3006 : enveloppées dans asyncio.wait_for(timeout=180). Borne dure pour qu'un hang d'EasyOCR / Ollama / I/O ne tienne plus jamais le lock indéfiniment. TimeoutError est rattrapée par l'except Exception existant → queue.pop(0) → on continue. Tests : 27/27 baseline sprint QW verts. --- agent_v0/server_v1/api_stream.py | 91 ++++++++++++++++++++++---------- 1 file changed, 62 insertions(+), 29 deletions(-) diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index 0f5d3a19e..cbf3f090e 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -2988,25 +2988,40 @@ async def get_next_action(session_id: str, machine_id: str = "default"): # les autres polls (extract_text OCR ~5s, t2a_decision LLM ~8-13s). # Le lock reste tenu (queue cohérente) mais l'event loop est libre, # donc les polls concurrents peuvent recevoir {server_busy: True}. + # + # Borne dure 180s par action : un hang d'EasyOCR / Ollama / I/O + # ne doit JAMAIS pouvoir tenir _replay_lock indéfiniment, sinon + # tous les endpoints sous lock (get_replay_status, /replay/next…) + # gèlent le serveur. TimeoutError est rattrapée par l'except + # Exception ci-dessous → queue.pop(0) → on passe à la suite. if type_ in _SERVER_SIDE_ACTION_TYPES and owning_replay is not None: try: if type_ == "extract_text": - await loop.run_in_executor( - None, - _handle_extract_text_action, - action, owning_replay, session_id, _last_heartbeat, + await asyncio.wait_for( + loop.run_in_executor( + None, + _handle_extract_text_action, + action, owning_replay, session_id, _last_heartbeat, + ), + timeout=180, ) elif type_ == "extract_table": - await loop.run_in_executor( - None, - _handle_extract_table_action, - action, owning_replay, session_id, _last_heartbeat, + await asyncio.wait_for( + loop.run_in_executor( + None, + _handle_extract_table_action, + action, owning_replay, session_id, _last_heartbeat, + ), + timeout=180, ) elif type_ == "t2a_decision": - await loop.run_in_executor( - None, - _handle_t2a_decision_action, - action, owning_replay, + await asyncio.wait_for( + loop.run_in_executor( + None, + _handle_t2a_decision_action, + action, owning_replay, + ), + timeout=180, ) except Exception as e: logger.warning(f"Action serveur {type_} a levé : {e}") @@ -4113,28 +4128,46 @@ async def get_replay_status(replay_id: str): Quand le replay est en pause supervisee (paused_need_help), la reponse inclut le contexte complet de l'echec : action echouee, screenshot, target_spec, et message utilisateur. + + Endpoint poll-friendly : l'acquisition du lock est timeboxée à 0.5 s. + Si une action serveur lente (extract_text/extract_table/t2a_decision) + tient le lock, le poll repart immédiatement avec status="busy" plutôt + que de bloquer l'event loop FastAPI (qui gèlerait l'ensemble des + endpoints jusqu'à libération). Suite logique du commit 35b27ae49 qui + avait déjà appliqué ce pattern à /replay/next ; QW4 a recâblé le + polling frontend ici → même classe de bug, même remède. """ - with _replay_lock: + import asyncio + loop = asyncio.get_event_loop() + acquired = await loop.run_in_executor(None, _replay_lock.acquire, True, 0.5) + if not acquired: + return { + "replay_id": replay_id, + "status": "busy", + "message": "Serveur occupé (action en cours), réessaie dans 1s", + } + try: state = _replay_states.get(replay_id) + if not state: + raise HTTPException( + status_code=404, detail=f"Replay '{replay_id}' non trouvé" + ) - if not state: - raise HTTPException( - status_code=404, detail=f"Replay '{replay_id}' non trouvé" - ) + # Filtrer les champs internes (prefixes par _) + result = {k: v for k, v in state.items() if not k.startswith("_")} - # Filtrer les champs internes (prefixes par _) - result = {k: v for k, v in state.items() if not k.startswith("_")} + # Enrichir avec le contexte de pause si applicable + if state["status"] == "paused_need_help": + session_id = state["session_id"] + remaining = len(_replay_queues.get(session_id, [])) + result["actions_completed"] = state["completed_actions"] + result["actions_remaining"] = remaining + result["message"] = state.get("pause_message", "Replay en pause") + # Le failed_action contient deja screenshot_b64 et target_spec - # Enrichir avec le contexte de pause si applicable - if state["status"] == "paused_need_help": - session_id = state["session_id"] - remaining = len(_replay_queues.get(session_id, [])) - result["actions_completed"] = state["completed_actions"] - result["actions_remaining"] = remaining - result["message"] = state.get("pause_message", "Replay en pause") - # Le failed_action contient deja screenshot_b64 et target_spec - - return result + return result + finally: + _replay_lock.release() @app.get("/api/v1/traces/stream/replays")