diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index cbf3f090e..8890c9f74 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -9,6 +9,7 @@ Inclut les endpoints de replay pour renvoyer des ordres d'exécution à l'Agent """ import atexit +import contextlib import json import logging import os @@ -505,6 +506,33 @@ _pending_lock = threading.Lock() # Chaque session a une queue d'actions à exécuter et un état de replay # ========================================================================= _replay_lock = threading.Lock() + + +# Context manager async pour acquérir _replay_lock sans bloquer l'event loop +# FastAPI. Pattern complémentaire au commit 35b27ae49 (lock async sur +# /replay/next) et 87dbe8c5f (get_replay_status non-bloquant) : tous les +# endpoints `async def` qui faisaient `with _replay_lock:` synchrone gelaient +# l'event loop dès qu'une opération longue tenait le lock dans un autre +# thread. Avec ce helper, l'acquire passe par run_in_executor (l'event loop +# reste libre pour servir les autres requêtes pendant l'attente). Si le lock +# est tenu plus de `timeout` secondes, on retourne 503 plutôt que de geler le +# serveur. 
@contextlib.asynccontextmanager
async def _async_replay_lock(timeout: float = 4.5):
    """Hold ``_replay_lock`` for an ``async with`` body without blocking the event loop.

    The blocking ``_replay_lock.acquire`` call runs in the event loop's
    default executor, so the loop keeps serving other requests while this
    coroutine waits for the lock.

    Args:
        timeout: Maximum number of seconds to wait for the lock.

    Raises:
        HTTPException: 503 when the lock could not be obtained within
            ``timeout`` seconds.
        asyncio.CancelledError: Re-raised when the awaiting task is cancelled
            while waiting; the lock is released as soon as the pending
            acquire completes, so it is never left held.
    """
    import asyncio

    def _release_if_acquired(done_future):
        # Invoked when an abandoned acquire finally completes: if the worker
        # thread did obtain the lock, nobody else is going to release it.
        if done_future.cancelled() or done_future.exception() is not None:
            return
        if done_future.result():
            _replay_lock.release()

    loop = asyncio.get_running_loop()
    acquire_future = loop.run_in_executor(
        None, _replay_lock.acquire, True, timeout
    )
    try:
        # shield() keeps the executor future alive when this task is
        # cancelled, so its outcome can still be observed and cleaned up.
        acquired = await asyncio.shield(acquire_future)
    except asyncio.CancelledError:
        # The worker thread cannot be interrupted and may still get the lock
        # after we stopped waiting; without this callback the lock would stay
        # held forever and every later caller would time out with a 503.
        acquire_future.add_done_callback(_release_if_acquired)
        raise

    if not acquired:
        raise HTTPException(
            status_code=503,
            detail=f"Serveur occupé (lock _replay tenu > {timeout}s) — réessayer",
        )
    try:
        yield
    finally:
        # A plain threading.Lock may be released from a thread other than the
        # one that acquired it (here: acquired in the executor, released on
        # the event-loop thread).
        _replay_lock.release()
Injecter dans la queue de replay ── replay_id = f"replay_sess_{uuid.uuid4().hex[:8]}" - with _replay_lock: + async with _async_replay_lock(): _replay_queues[target_session_id] = list(actions) _replay_states[replay_id] = _create_replay_state( replay_id=replay_id, @@ -2393,7 +2421,7 @@ async def enqueue_single_action(request: SingleActionRequest): action_id = action["action_id"] - with _replay_lock: + async with _async_replay_lock(): _replay_queues[session_id].append(action) logger.info( @@ -2559,7 +2587,7 @@ async def launch_replay_from_plan(request: PlanReplayRequest): or (session_obj.machine_id if session_obj else "default") ) - with _replay_lock: + async with _async_replay_lock(): _replay_queues[target_session_id] = list(validated) _replay_states[replay_id] = _create_replay_state( replay_id=replay_id, @@ -3117,7 +3145,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"): auth_actions = _auth_handler.get_auth_actions(auth_request) if auth_actions: # Injecter les actions d'auth en tête de queue (avant l'action bloquée) - with _replay_lock: + async with _async_replay_lock(): current_q = _replay_queues.get(session_id, []) _replay_queues[session_id] = auth_actions + current_q logger.info( @@ -3126,7 +3154,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"): f"type={auth_request.auth_type} (confiance={auth_request.confidence:.2f})" ) # Retourner la première action d'auth immédiatement - with _replay_lock: + async with _async_replay_lock(): first_auth = _replay_queues[session_id].pop(0) return { "action": first_auth, @@ -3174,7 +3202,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"): } # Pre-check OK (ou skip) : retirer l'action de la queue et l'envoyer - with _replay_lock: + async with _async_replay_lock(): current_queue = _replay_queues.get(session_id, []) if current_queue and current_queue[0].get("action_id") == action.get("action_id"): current_queue.pop(0) @@ -3306,7 +3334,7 @@ async def 
report_action_result(report: ReplayResultReport): ) # Trouver le replay correspondant à cette session - with _replay_lock: + async with _async_replay_lock(): replay_state = None for state in _replay_states.values(): if state["session_id"] == session_id and state["status"] == "running": @@ -3339,7 +3367,7 @@ async def report_action_result(report: ReplayResultReport): # Mettre à jour le dernier screenshot reçu screenshot_after = report.screenshot_after or report.screenshot if screenshot_after: - with _replay_lock: + async with _async_replay_lock(): replay_state["last_screenshot"] = screenshot_after # === Vérification post-action === @@ -3410,7 +3438,7 @@ async def report_action_result(report: ReplayResultReport): # Stocker le screenshot actuel comme "before" pour la prochaine action if screenshot_after: - with _replay_lock: + async with _async_replay_lock(): replay_state["_last_screenshot_before"] = screenshot_after # [REPLAY] log structuré de la décision de vérification @@ -3432,7 +3460,7 @@ async def report_action_result(report: ReplayResultReport): ) # === Enregistrer le résultat === - with _replay_lock: + async with _async_replay_lock(): result_entry = { "action_id": action_id, "success": report.success, @@ -3592,7 +3620,7 @@ async def report_action_result(report: ReplayResultReport): except Exception as _mem_exc: logger.debug("Memory record skipped : %s", _mem_exc) - with _replay_lock: + async with _async_replay_lock(): # === Logique de retry / success / failure === if report.success and (verification is None or verification.verified): # Action réussie (vérification OK ou pas de vérification) @@ -4104,7 +4132,7 @@ async def register_error_callback(config: ErrorCallbackConfig): replay_id = config.replay_id callback_url = config.callback_url - with _replay_lock: + async with _async_replay_lock(): if replay_id not in _replay_states: raise HTTPException( status_code=404, @@ -4173,7 +4201,7 @@ async def get_replay_status(replay_id: str): 
@app.get("/api/v1/traces/stream/replays") async def list_replays(): """Lister tous les replays (actifs, terminés, en erreur).""" - with _replay_lock: + async with _async_replay_lock(): # Filtrer les champs internes (préfixés par _) return { "replays": [ @@ -4205,7 +4233,7 @@ async def resume_replay( `required` doivent figurer dans `acknowledged_check_ids`. Sinon → 400 avec `{"error": "required_checks_missing", "missing": [...]}`. """ - with _replay_lock: + async with _async_replay_lock(): state = _replay_states.get(replay_id) if not state: @@ -4301,7 +4329,7 @@ async def resume_replay( @app.post("/api/v1/traces/stream/replay/{replay_id}/cancel") async def cancel_replay(replay_id: str): """Annuler un replay (quel que soit son statut) et vider sa queue.""" - with _replay_lock: + async with _async_replay_lock(): state = _replay_states.get(replay_id) if not state: raise HTTPException(status_code=404, detail=f"Replay '{replay_id}' non trouvé")