fix(stream): get_replay_status non-bloquant + bornage actions serveur
Suite du commit35b27ae49(lock async sur /replay/next) qui n'avait traité que la moitié du problème. Le sprint QW4 (commitf5c33477f) a recâblé le polling frontend PauseDialog vers /replay/{replay_id} → get_replay_status, qui gardait un `with _replay_lock:` synchrone. Conséquence : dès qu'une action serveur (extract_text/extract_table/ t2a_decision) tient le lock, l'event loop FastAPI gèle entièrement (heartbeats Windows, polls replay/next, get_replay_status, tout). Reproduit aujourd'hui en pré-démo : un replay urgences a fait extract_text → la queue suivante a tenu le lock → polling VWB sur get_replay_status a bloqué le MainThread asyncio → 23 minutes de gel total (py-spy a confirmé MainThread sur api_stream.py:4117). Modifications : 1. get_replay_status : acquire timeboxé 0.5s via run_in_executor (même pattern que /replay/next ligne 2815). Si le lock est tenu, retour immédiat {status: "busy"} → le frontend retentera dans 1s. Aucun cas où ce poll bloque l'event loop. 2. Actions serveur lignes 2994/3000/3006 : enveloppées dans asyncio.wait_for(timeout=180). Borne dure pour qu'un hang d'EasyOCR / Ollama / I/O ne tienne plus jamais le lock indéfiniment. TimeoutError est rattrapée par l'except Exception existant → queue.pop(0) → on continue. Tests : 27/27 baseline sprint QW verts.
This commit is contained in:
@@ -2988,25 +2988,40 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
|
|||||||
# les autres polls (extract_text OCR ~5s, t2a_decision LLM ~8-13s).
|
# les autres polls (extract_text OCR ~5s, t2a_decision LLM ~8-13s).
|
||||||
# Le lock reste tenu (queue cohérente) mais l'event loop est libre,
|
# Le lock reste tenu (queue cohérente) mais l'event loop est libre,
|
||||||
# donc les polls concurrents peuvent recevoir {server_busy: True}.
|
# donc les polls concurrents peuvent recevoir {server_busy: True}.
|
||||||
|
#
|
||||||
|
# Borne dure 180s par action : un hang d'EasyOCR / Ollama / I/O
|
||||||
|
# ne doit JAMAIS pouvoir tenir _replay_lock indéfiniment, sinon
|
||||||
|
# tous les endpoints sous lock (get_replay_status, /replay/next…)
|
||||||
|
# gèlent le serveur. TimeoutError est rattrapée par l'except
|
||||||
|
# Exception ci-dessous → queue.pop(0) → on passe à la suite.
|
||||||
if type_ in _SERVER_SIDE_ACTION_TYPES and owning_replay is not None:
|
if type_ in _SERVER_SIDE_ACTION_TYPES and owning_replay is not None:
|
||||||
try:
|
try:
|
||||||
if type_ == "extract_text":
|
if type_ == "extract_text":
|
||||||
await loop.run_in_executor(
|
await asyncio.wait_for(
|
||||||
|
loop.run_in_executor(
|
||||||
None,
|
None,
|
||||||
_handle_extract_text_action,
|
_handle_extract_text_action,
|
||||||
action, owning_replay, session_id, _last_heartbeat,
|
action, owning_replay, session_id, _last_heartbeat,
|
||||||
|
),
|
||||||
|
timeout=180,
|
||||||
)
|
)
|
||||||
elif type_ == "extract_table":
|
elif type_ == "extract_table":
|
||||||
await loop.run_in_executor(
|
await asyncio.wait_for(
|
||||||
|
loop.run_in_executor(
|
||||||
None,
|
None,
|
||||||
_handle_extract_table_action,
|
_handle_extract_table_action,
|
||||||
action, owning_replay, session_id, _last_heartbeat,
|
action, owning_replay, session_id, _last_heartbeat,
|
||||||
|
),
|
||||||
|
timeout=180,
|
||||||
)
|
)
|
||||||
elif type_ == "t2a_decision":
|
elif type_ == "t2a_decision":
|
||||||
await loop.run_in_executor(
|
await asyncio.wait_for(
|
||||||
|
loop.run_in_executor(
|
||||||
None,
|
None,
|
||||||
_handle_t2a_decision_action,
|
_handle_t2a_decision_action,
|
||||||
action, owning_replay,
|
action, owning_replay,
|
||||||
|
),
|
||||||
|
timeout=180,
|
||||||
)
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Action serveur {type_} a levé : {e}")
|
logger.warning(f"Action serveur {type_} a levé : {e}")
|
||||||
@@ -4113,10 +4128,26 @@ async def get_replay_status(replay_id: str):
|
|||||||
Quand le replay est en pause supervisee (paused_need_help), la reponse
|
Quand le replay est en pause supervisee (paused_need_help), la reponse
|
||||||
inclut le contexte complet de l'echec : action echouee, screenshot,
|
inclut le contexte complet de l'echec : action echouee, screenshot,
|
||||||
target_spec, et message utilisateur.
|
target_spec, et message utilisateur.
|
||||||
"""
|
|
||||||
with _replay_lock:
|
|
||||||
state = _replay_states.get(replay_id)
|
|
||||||
|
|
||||||
|
Endpoint poll-friendly : l'acquisition du lock est timeboxée à 0.5 s.
|
||||||
|
Si une action serveur lente (extract_text/extract_table/t2a_decision)
|
||||||
|
tient le lock, le poll repart immédiatement avec status="busy" plutôt
|
||||||
|
que de bloquer l'event loop FastAPI (qui gèlerait l'ensemble des
|
||||||
|
endpoints jusqu'à libération). Suite logique du commit 35b27ae49 qui
|
||||||
|
avait déjà appliqué ce pattern à /replay/next ; QW4 a recâblé le
|
||||||
|
polling frontend ici → même classe de bug, même remède.
|
||||||
|
"""
|
||||||
|
import asyncio
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
acquired = await loop.run_in_executor(None, _replay_lock.acquire, True, 0.5)
|
||||||
|
if not acquired:
|
||||||
|
return {
|
||||||
|
"replay_id": replay_id,
|
||||||
|
"status": "busy",
|
||||||
|
"message": "Serveur occupé (action en cours), réessaie dans 1s",
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
state = _replay_states.get(replay_id)
|
||||||
if not state:
|
if not state:
|
||||||
raise HTTPException(
|
raise HTTPException(
|
||||||
status_code=404, detail=f"Replay '{replay_id}' non trouvé"
|
status_code=404, detail=f"Replay '{replay_id}' non trouvé"
|
||||||
@@ -4135,6 +4166,8 @@ async def get_replay_status(replay_id: str):
|
|||||||
# Le failed_action contient deja screenshot_b64 et target_spec
|
# Le failed_action contient deja screenshot_b64 et target_spec
|
||||||
|
|
||||||
return result
|
return result
|
||||||
|
finally:
|
||||||
|
_replay_lock.release()
|
||||||
|
|
||||||
|
|
||||||
@app.get("/api/v1/traces/stream/replays")
|
@app.get("/api/v1/traces/stream/replays")
|
||||||
|
|||||||
Reference in New Issue
Block a user