fix(stream): get_replay_status non-bloquant + bornage actions serveur
Suite du commit35b27ae49(lock async sur /replay/next) qui n'avait traité que la moitié du problème. Le sprint QW4 (commitf5c33477f) a recâblé le polling frontend PauseDialog vers /replay/{replay_id} → get_replay_status, qui gardait un `with _replay_lock:` synchrone. Conséquence : dès qu'une action serveur (extract_text/extract_table/ t2a_decision) tient le lock, l'event loop FastAPI gèle entièrement (heartbeats Windows, polls replay/next, get_replay_status, tout). Reproduit aujourd'hui en pré-démo : un replay urgences a fait extract_text → la queue suivante a tenu le lock → polling VWB sur get_replay_status a bloqué le MainThread asyncio → 23 minutes de gel total (py-spy a confirmé MainThread sur api_stream.py:4117). Modifications : 1. get_replay_status : acquire timeboxé 0.5s via run_in_executor (même pattern que /replay/next ligne 2815). Si le lock est tenu, retour immédiat {status: "busy"} → le frontend retentera dans 1s. Aucun cas où ce poll bloque l'event loop. 2. Actions serveur lignes 2994/3000/3006 : enveloppées dans asyncio.wait_for(timeout=180). Borne dure pour qu'un hang d'EasyOCR / Ollama / I/O ne tienne plus jamais le lock indéfiniment. TimeoutError est rattrapée par l'except Exception existant → queue.pop(0) → on continue. Tests : 27/27 baseline sprint QW verts.
This commit is contained in:
@@ -2988,25 +2988,40 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
|
||||
# les autres polls (extract_text OCR ~5s, t2a_decision LLM ~8-13s).
|
||||
# Le lock reste tenu (queue cohérente) mais l'event loop est libre,
|
||||
# donc les polls concurrents peuvent recevoir {server_busy: True}.
|
||||
#
|
||||
# Borne dure 180s par action : un hang d'EasyOCR / Ollama / I/O
|
||||
# ne doit JAMAIS pouvoir tenir _replay_lock indéfiniment, sinon
|
||||
# tous les endpoints sous lock (get_replay_status, /replay/next…)
|
||||
# gèlent le serveur. TimeoutError est rattrapée par l'except
|
||||
# Exception ci-dessous → queue.pop(0) → on passe à la suite.
|
||||
if type_ in _SERVER_SIDE_ACTION_TYPES and owning_replay is not None:
|
||||
try:
|
||||
if type_ == "extract_text":
|
||||
await loop.run_in_executor(
|
||||
await asyncio.wait_for(
|
||||
loop.run_in_executor(
|
||||
None,
|
||||
_handle_extract_text_action,
|
||||
action, owning_replay, session_id, _last_heartbeat,
|
||||
),
|
||||
timeout=180,
|
||||
)
|
||||
elif type_ == "extract_table":
|
||||
await loop.run_in_executor(
|
||||
await asyncio.wait_for(
|
||||
loop.run_in_executor(
|
||||
None,
|
||||
_handle_extract_table_action,
|
||||
action, owning_replay, session_id, _last_heartbeat,
|
||||
),
|
||||
timeout=180,
|
||||
)
|
||||
elif type_ == "t2a_decision":
|
||||
await loop.run_in_executor(
|
||||
await asyncio.wait_for(
|
||||
loop.run_in_executor(
|
||||
None,
|
||||
_handle_t2a_decision_action,
|
||||
action, owning_replay,
|
||||
),
|
||||
timeout=180,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(f"Action serveur {type_} a levé : {e}")
|
||||
@@ -4113,10 +4128,26 @@ async def get_replay_status(replay_id: str):
|
||||
Quand le replay est en pause supervisee (paused_need_help), la reponse
|
||||
inclut le contexte complet de l'echec : action echouee, screenshot,
|
||||
target_spec, et message utilisateur.
|
||||
"""
|
||||
with _replay_lock:
|
||||
state = _replay_states.get(replay_id)
|
||||
|
||||
Endpoint poll-friendly : l'acquisition du lock est timeboxée à 0.5 s.
|
||||
Si une action serveur lente (extract_text/extract_table/t2a_decision)
|
||||
tient le lock, le poll repart immédiatement avec status="busy" plutôt
|
||||
que de bloquer l'event loop FastAPI (qui gèlerait l'ensemble des
|
||||
endpoints jusqu'à libération). Suite logique du commit 35b27ae49 qui
|
||||
avait déjà appliqué ce pattern à /replay/next ; QW4 a recâblé le
|
||||
polling frontend ici → même classe de bug, même remède.
|
||||
"""
|
||||
import asyncio
|
||||
loop = asyncio.get_event_loop()
|
||||
acquired = await loop.run_in_executor(None, _replay_lock.acquire, True, 0.5)
|
||||
if not acquired:
|
||||
return {
|
||||
"replay_id": replay_id,
|
||||
"status": "busy",
|
||||
"message": "Serveur occupé (action en cours), réessaie dans 1s",
|
||||
}
|
||||
try:
|
||||
state = _replay_states.get(replay_id)
|
||||
if not state:
|
||||
raise HTTPException(
|
||||
status_code=404, detail=f"Replay '{replay_id}' non trouvé"
|
||||
@@ -4135,6 +4166,8 @@ async def get_replay_status(replay_id: str):
|
||||
# Le failed_action contient deja screenshot_b64 et target_spec
|
||||
|
||||
return result
|
||||
finally:
|
||||
_replay_lock.release()
|
||||
|
||||
|
||||
@app.get("/api/v1/traces/stream/replays")
|
||||
|
||||
Reference in New Issue
Block a user