fix(stream): _async_replay_lock helper + 17 endpoints async non-bloquants

Suite directe des commits 35b27ae49 (lock async sur /replay/next) et
87dbe8c5f (get_replay_status non-bloquant) qui n'avaient traité que
2 endpoints sur les 19 utilisant _replay_lock dans api_stream.py.

Reproduit aujourd'hui en pré-démo : un replay urgences a réussi
extract_text + t2a_decision (50s, OK), puis a hang sur l'action
suivante. start_raw_replay (POST /replay) du nouveau replay a tenté
`with _replay_lock:` synchrone à la ligne 2085 → MainThread asyncio
gelé → tous les endpoints derrière. Stack via py-spy confirmée.

Le pattern systémique : 17 sites `with _replay_lock:` synchrones
dans des handlers `async def` (start_replay, start_raw_replay,
replay_from_session, enqueue_single_action, launch_replay_from_plan,
get_next_action [×3], report_action_result [×5], register_error_callback,
list_replays, resume_replay, cancel_replay). Chacun gèle l'event
loop FastAPI dès qu'un autre thread tient le lock.

Modifications :

1. Helper _async_replay_lock(timeout=4.5) (api_stream.py:516).
   Acquire via run_in_executor (event loop libre pendant l'attente),
   timeout 4.5s puis HTTPException 503 plutôt que gel infini.
   Sémantique acquire+release identique au `with` synchrone.

2. Remplacement automatisé des 17 sites async :
   `with _replay_lock:` → `async with _async_replay_lock():`
   2 sites sync intentionnellement préservés (cleanup loop ligne 689,
   chat_status_provider ligne 5048 — pas dans des handlers async).

3. Import contextlib ajouté en haut du fichier.

Tests : 27/27 baseline sprint QW verts, /health 200 (3ms),
/replays 200 (2ms — endpoint qui utilise le nouveau helper).
This commit is contained in:
Dom
2026-05-06 18:06:42 +02:00
parent d1ebf62217
commit 864530c851

View File

@@ -9,6 +9,7 @@ Inclut les endpoints de replay pour renvoyer des ordres d'exécution à l'Agent
"""
import atexit
import contextlib
import json
import logging
import os
@@ -505,6 +506,33 @@ _pending_lock = threading.Lock()
# Chaque session a une queue d'actions à exécuter et un état de replay
# =========================================================================
_replay_lock = threading.Lock()
# Context manager async pour acquérir _replay_lock sans bloquer l'event loop
# FastAPI. Pattern complémentaire au commit 35b27ae49 (lock async sur
# /replay/next) et 87dbe8c5f (get_replay_status non-bloquant) : tous les
# endpoints `async def` qui faisaient `with _replay_lock:` synchrone gelaient
# l'event loop dès qu'une opération longue tenait le lock dans un autre
# thread. Avec ce helper, l'acquire passe par run_in_executor (l'event loop
# reste libre pour servir les autres requêtes pendant l'attente). Si le lock
# est tenu plus de `timeout` secondes, on retourne 503 plutôt que de geler le
# serveur.
@contextlib.asynccontextmanager
async def _async_replay_lock(timeout: float = 4.5):
import asyncio
loop = asyncio.get_event_loop()
acquired = await loop.run_in_executor(None, _replay_lock.acquire, True, timeout)
if not acquired:
raise HTTPException(
status_code=503,
detail=f"Serveur occupé (lock _replay tenu > {timeout}s) — réessayer",
)
try:
yield
finally:
_replay_lock.release()
# session_id -> liste d'actions en attente (FIFO)
_replay_queues: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
# machine_id -> session_id (mapping pour le replay ciblé par machine)
@@ -1982,7 +2010,7 @@ async def start_replay(request: ReplayRequest):
resolved_machine_id = target_machine_id or (session_obj.machine_id if session_obj else "default")
# Injecter les actions dans la queue de la session
with _replay_lock:
async with _async_replay_lock():
_replay_queues[session_id] = list(actions) # Remplacer la queue existante
_replay_states[replay_id] = _create_replay_state(
replay_id=replay_id,
@@ -2082,7 +2110,7 @@ async def start_raw_replay(request: RawReplayRequest):
session_obj = processor.session_manager.get_session(session_id)
resolved_machine_id = target_machine_id or (session_obj.machine_id if session_obj else "default")
with _replay_lock:
async with _async_replay_lock():
# ── Nettoyage : annuler les replays bloqués pour cette machine ──
# Un replay en paused_need_help bloque tous les suivants.
# Quand on lance un nouveau replay, les anciens sont obsolètes.
@@ -2302,7 +2330,7 @@ async def replay_from_session(
# ── 5. Injecter dans la queue de replay ──
replay_id = f"replay_sess_{uuid.uuid4().hex[:8]}"
with _replay_lock:
async with _async_replay_lock():
_replay_queues[target_session_id] = list(actions)
_replay_states[replay_id] = _create_replay_state(
replay_id=replay_id,
@@ -2393,7 +2421,7 @@ async def enqueue_single_action(request: SingleActionRequest):
action_id = action["action_id"]
with _replay_lock:
async with _async_replay_lock():
_replay_queues[session_id].append(action)
logger.info(
@@ -2559,7 +2587,7 @@ async def launch_replay_from_plan(request: PlanReplayRequest):
or (session_obj.machine_id if session_obj else "default")
)
with _replay_lock:
async with _async_replay_lock():
_replay_queues[target_session_id] = list(validated)
_replay_states[replay_id] = _create_replay_state(
replay_id=replay_id,
@@ -3117,7 +3145,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
auth_actions = _auth_handler.get_auth_actions(auth_request)
if auth_actions:
# Injecter les actions d'auth en tête de queue (avant l'action bloquée)
with _replay_lock:
async with _async_replay_lock():
current_q = _replay_queues.get(session_id, [])
_replay_queues[session_id] = auth_actions + current_q
logger.info(
@@ -3126,7 +3154,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
f"type={auth_request.auth_type} (confiance={auth_request.confidence:.2f})"
)
# Retourner la première action d'auth immédiatement
with _replay_lock:
async with _async_replay_lock():
first_auth = _replay_queues[session_id].pop(0)
return {
"action": first_auth,
@@ -3174,7 +3202,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
}
# Pre-check OK (ou skip) : retirer l'action de la queue et l'envoyer
with _replay_lock:
async with _async_replay_lock():
current_queue = _replay_queues.get(session_id, [])
if current_queue and current_queue[0].get("action_id") == action.get("action_id"):
current_queue.pop(0)
@@ -3306,7 +3334,7 @@ async def report_action_result(report: ReplayResultReport):
)
# Trouver le replay correspondant à cette session
with _replay_lock:
async with _async_replay_lock():
replay_state = None
for state in _replay_states.values():
if state["session_id"] == session_id and state["status"] == "running":
@@ -3339,7 +3367,7 @@ async def report_action_result(report: ReplayResultReport):
# Mettre à jour le dernier screenshot reçu
screenshot_after = report.screenshot_after or report.screenshot
if screenshot_after:
with _replay_lock:
async with _async_replay_lock():
replay_state["last_screenshot"] = screenshot_after
# === Vérification post-action ===
@@ -3410,7 +3438,7 @@ async def report_action_result(report: ReplayResultReport):
# Stocker le screenshot actuel comme "before" pour la prochaine action
if screenshot_after:
with _replay_lock:
async with _async_replay_lock():
replay_state["_last_screenshot_before"] = screenshot_after
# [REPLAY] log structuré de la décision de vérification
@@ -3432,7 +3460,7 @@ async def report_action_result(report: ReplayResultReport):
)
# === Enregistrer le résultat ===
with _replay_lock:
async with _async_replay_lock():
result_entry = {
"action_id": action_id,
"success": report.success,
@@ -3592,7 +3620,7 @@ async def report_action_result(report: ReplayResultReport):
except Exception as _mem_exc:
logger.debug("Memory record skipped : %s", _mem_exc)
with _replay_lock:
async with _async_replay_lock():
# === Logique de retry / success / failure ===
if report.success and (verification is None or verification.verified):
# Action réussie (vérification OK ou pas de vérification)
@@ -4104,7 +4132,7 @@ async def register_error_callback(config: ErrorCallbackConfig):
replay_id = config.replay_id
callback_url = config.callback_url
with _replay_lock:
async with _async_replay_lock():
if replay_id not in _replay_states:
raise HTTPException(
status_code=404,
@@ -4173,7 +4201,7 @@ async def get_replay_status(replay_id: str):
@app.get("/api/v1/traces/stream/replays")
async def list_replays():
"""Lister tous les replays (actifs, terminés, en erreur)."""
with _replay_lock:
async with _async_replay_lock():
# Filtrer les champs internes (préfixés par _)
return {
"replays": [
@@ -4205,7 +4233,7 @@ async def resume_replay(
`required` doivent figurer dans `acknowledged_check_ids`. Sinon → 400
avec `{"error": "required_checks_missing", "missing": [...]}`.
"""
with _replay_lock:
async with _async_replay_lock():
state = _replay_states.get(replay_id)
if not state:
@@ -4301,7 +4329,7 @@ async def resume_replay(
@app.post("/api/v1/traces/stream/replay/{replay_id}/cancel")
async def cancel_replay(replay_id: str):
"""Annuler un replay (quel que soit son statut) et vider sa queue."""
with _replay_lock:
async with _async_replay_lock():
state = _replay_states.get(replay_id)
if not state:
raise HTTPException(status_code=404, detail=f"Replay '{replay_id}' non trouvé")