From 864530c85141b07fe6287964b567cf141545e4b7 Mon Sep 17 00:00:00 2001 From: Dom Date: Wed, 6 May 2026 18:06:42 +0200 Subject: [PATCH] fix(stream): _async_replay_lock helper + 17 endpoints async non-bloquants MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Suite directe des commits 35b27ae49 (lock async sur /replay/next) et 87dbe8c5f (get_replay_status non-bloquant) qui n'avaient traité que 2 endpoints sur les 19 utilisant _replay_lock dans api_stream.py. Reproduit aujourd'hui en pré-démo : un replay urgences a réussi extract_text + t2a_decision (50s, OK), puis a hang sur l'action suivante. start_raw_replay (POST /replay) du nouveau replay a tenté `with _replay_lock:` synchrone à la ligne 2085 → MainThread asyncio gelé → tous les endpoints derrière. Stack via py-spy confirmée. Le pattern systémique : 17 sites `with _replay_lock:` synchrones dans des handlers `async def` (start_replay, start_raw_replay, replay_from_session, enqueue_single_action, launch_replay_from_plan, get_next_action [×3], report_action_result [×5], register_error_callback, list_replays, resume_replay, cancel_replay). Chacun gèle l'event loop FastAPI dès qu'un autre thread tient le lock. Modifications : 1. Helper _async_replay_lock(timeout=4.5) (api_stream.py:516). Acquire via run_in_executor (event loop libre pendant l'attente), timeout 4.5s puis HTTPException 503 plutôt que gel infini. Sémantique acquire+release identique au `with` synchrone. 2. Remplacement automatisé des 17 sites async : `with _replay_lock:` → `async with _async_replay_lock():` 2 sites sync intentionnellement préservés (cleanup loop ligne 689, chat_status_provider ligne 5048 — pas dans des handlers async). 3. Import contextlib ajouté en haut du fichier. Tests : 27/27 baseline sprint QW verts, /health 200 (3ms), /replays 200 (2ms — endpoint qui utilise le nouveau helper). --- agent_v0/server_v1/api_stream.py | 62 +++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 17 deletions(-) diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index cbf3f090e..8890c9f74 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -9,6 +9,7 @@ Inclut les endpoints de replay pour renvoyer des ordres d'exécution à l'Agent """ import atexit +import contextlib import json import logging import os @@ -505,6 +506,33 @@ _pending_lock = threading.Lock() # Chaque session a une queue d'actions à exécuter et un état de replay # ========================================================================= _replay_lock = threading.Lock() + + +# Context manager async pour acquérir _replay_lock sans bloquer l'event loop +# FastAPI. Pattern complémentaire au commit 35b27ae49 (lock async sur +# /replay/next) et 87dbe8c5f (get_replay_status non-bloquant) : tous les +# endpoints `async def` qui faisaient `with _replay_lock:` synchrone gelaient +# l'event loop dès qu'une opération longue tenait le lock dans un autre +# thread. Avec ce helper, l'acquire passe par run_in_executor (l'event loop +# reste libre pour servir les autres requêtes pendant l'attente). Si le lock +# est tenu plus de `timeout` secondes, on retourne 503 plutôt que de geler le +# serveur. +@contextlib.asynccontextmanager +async def _async_replay_lock(timeout: float = 4.5): + import asyncio + loop = asyncio.get_event_loop() + acquired = await loop.run_in_executor(None, _replay_lock.acquire, True, timeout) + if not acquired: + raise HTTPException( + status_code=503, + detail=f"Serveur occupé (lock _replay tenu > {timeout}s) — réessayer", + ) + try: + yield + finally: + _replay_lock.release() + + # session_id -> liste d'actions en attente (FIFO) _replay_queues: Dict[str, List[Dict[str, Any]]] = defaultdict(list) # machine_id -> session_id (mapping pour le replay ciblé par machine) @@ -1982,7 +2010,7 @@ async def start_replay(request: ReplayRequest): resolved_machine_id = target_machine_id or (session_obj.machine_id if session_obj else "default") # Injecter les actions dans la queue de la session - with _replay_lock: + async with _async_replay_lock(): _replay_queues[session_id] = list(actions) # Remplacer la queue existante _replay_states[replay_id] = _create_replay_state( replay_id=replay_id, @@ -2082,7 +2110,7 @@ async def start_raw_replay(request: RawReplayRequest): session_obj = processor.session_manager.get_session(session_id) resolved_machine_id = target_machine_id or (session_obj.machine_id if session_obj else "default") - with _replay_lock: + async with _async_replay_lock(): # ── Nettoyage : annuler les replays bloqués pour cette machine ── # Un replay en paused_need_help bloque tous les suivants. # Quand on lance un nouveau replay, les anciens sont obsolètes. @@ -2302,7 +2330,7 @@ async def replay_from_session( # ── 5. Injecter dans la queue de replay ── replay_id = f"replay_sess_{uuid.uuid4().hex[:8]}" - with _replay_lock: + async with _async_replay_lock(): _replay_queues[target_session_id] = list(actions) _replay_states[replay_id] = _create_replay_state( replay_id=replay_id, @@ -2393,7 +2421,7 @@ async def enqueue_single_action(request: SingleActionRequest): action_id = action["action_id"] - with _replay_lock: + async with _async_replay_lock(): _replay_queues[session_id].append(action) logger.info( @@ -2559,7 +2587,7 @@ async def launch_replay_from_plan(request: PlanReplayRequest): or (session_obj.machine_id if session_obj else "default") ) - with _replay_lock: + async with _async_replay_lock(): _replay_queues[target_session_id] = list(validated) _replay_states[replay_id] = _create_replay_state( replay_id=replay_id, @@ -3117,7 +3145,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"): auth_actions = _auth_handler.get_auth_actions(auth_request) if auth_actions: # Injecter les actions d'auth en tête de queue (avant l'action bloquée) - with _replay_lock: + async with _async_replay_lock(): current_q = _replay_queues.get(session_id, []) _replay_queues[session_id] = auth_actions + current_q logger.info( @@ -3126,7 +3154,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"): f"type={auth_request.auth_type} (confiance={auth_request.confidence:.2f})" ) # Retourner la première action d'auth immédiatement - with _replay_lock: + async with _async_replay_lock(): first_auth = _replay_queues[session_id].pop(0) return { "action": first_auth, @@ -3174,7 +3202,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"): } # Pre-check OK (ou skip) : retirer l'action de la queue et l'envoyer - with _replay_lock: + async with _async_replay_lock(): current_queue = _replay_queues.get(session_id, []) if current_queue and current_queue[0].get("action_id") == action.get("action_id"): current_queue.pop(0) @@ -3306,7 +3334,7 @@ async def report_action_result(report: ReplayResultReport): ) # Trouver le replay correspondant à cette session - with _replay_lock: + async with _async_replay_lock(): replay_state = None for state in _replay_states.values(): if state["session_id"] == session_id and state["status"] == "running": @@ -3339,7 +3367,7 @@ async def report_action_result(report: ReplayResultReport): # Mettre à jour le dernier screenshot reçu screenshot_after = report.screenshot_after or report.screenshot if screenshot_after: - with _replay_lock: + async with _async_replay_lock(): replay_state["last_screenshot"] = screenshot_after # === Vérification post-action === @@ -3410,7 +3438,7 @@ async def report_action_result(report: ReplayResultReport): # Stocker le screenshot actuel comme "before" pour la prochaine action if screenshot_after: - with _replay_lock: + async with _async_replay_lock(): replay_state["_last_screenshot_before"] = screenshot_after # [REPLAY] log structuré de la décision de vérification @@ -3432,7 +3460,7 @@ async def report_action_result(report: ReplayResultReport): ) # === Enregistrer le résultat === - with _replay_lock: + async with _async_replay_lock(): result_entry = { "action_id": action_id, "success": report.success, @@ -3592,7 +3620,7 @@ async def report_action_result(report: ReplayResultReport): except Exception as _mem_exc: logger.debug("Memory record skipped : %s", _mem_exc) - with _replay_lock: + async with _async_replay_lock(): # === Logique de retry / success / failure === if report.success and (verification is None or verification.verified): # Action réussie (vérification OK ou pas de vérification) @@ -4104,7 +4132,7 @@ async def register_error_callback(config: ErrorCallbackConfig): replay_id = config.replay_id callback_url = config.callback_url - with _replay_lock: + async with _async_replay_lock(): if replay_id not in _replay_states: raise HTTPException( status_code=404, @@ -4173,7 +4201,7 @@ async def get_replay_status(replay_id: str): @app.get("/api/v1/traces/stream/replays") async def list_replays(): """Lister tous les replays (actifs, terminés, en erreur).""" - with _replay_lock: + async with _async_replay_lock(): # Filtrer les champs internes (préfixés par _) return { "replays": [ @@ -4205,7 +4233,7 @@ async def resume_replay( `required` doivent figurer dans `acknowledged_check_ids`. Sinon → 400 avec `{"error": "required_checks_missing", "missing": [...]}`. """ - with _replay_lock: + async with _async_replay_lock(): state = _replay_states.get(replay_id) if not state: @@ -4301,7 +4329,7 @@ async def resume_replay( @app.post("/api/v1/traces/stream/replay/{replay_id}/cancel") async def cancel_replay(replay_id: str): """Annuler un replay (quel que soit son statut) et vider sa queue.""" - with _replay_lock: + async with _async_replay_lock(): state = _replay_states.get(replay_id) if not state: raise HTTPException(status_code=404, detail=f"Replay '{replay_id}' non trouvé")