fix(agent-chat): suivi replay distant + timeout 15s
- Session ID vide pour auto-détection de la session Agent V1 active
- Timeout augmenté de 5s à 15s pour la requête replay
- Ajout _poll_replay_progress : suit la progression réelle du replay
(polling /replay/{id} toutes les 2s, max 120s) au lieu de marquer
faussement "terminé avec succès" immédiatement
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -994,10 +994,10 @@ def _try_streaming_server_replay(workflow_id: str, params: Dict[str, Any]) -> Op
|
|||||||
f"{STREAMING_SERVER_URL}/api/v1/traces/stream/replay",
|
f"{STREAMING_SERVER_URL}/api/v1/traces/stream/replay",
|
||||||
json={
|
json={
|
||||||
"workflow_id": workflow_id,
|
"workflow_id": workflow_id,
|
||||||
"session_id": f"chat_{datetime.now().strftime('%H%M%S')}",
|
"session_id": "", # Vide = auto-détection de la session Agent V1 active
|
||||||
"params": params or {},
|
"params": params or {},
|
||||||
},
|
},
|
||||||
timeout=5,
|
timeout=15,
|
||||||
)
|
)
|
||||||
if resp.status_code == 200:
|
if resp.status_code == 200:
|
||||||
data = resp.json()
|
data = resp.json()
|
||||||
@@ -1018,6 +1018,64 @@ def _try_streaming_server_replay(workflow_id: str, params: Dict[str, Any]) -> Op
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _poll_replay_progress(replay_id: str, workflow_name: str, total_actions: int):
|
||||||
|
"""Suivre la progression d'un replay distant via polling."""
|
||||||
|
import time
|
||||||
|
|
||||||
|
max_wait = 120 # 2 minutes max
|
||||||
|
poll_interval = 2.0
|
||||||
|
elapsed = 0
|
||||||
|
|
||||||
|
while elapsed < max_wait and execution_status.get("running"):
|
||||||
|
time.sleep(poll_interval)
|
||||||
|
elapsed += poll_interval
|
||||||
|
|
||||||
|
try:
|
||||||
|
resp = http_requests.get(
|
||||||
|
f"{STREAMING_SERVER_URL}/api/v1/traces/stream/replay/{replay_id}",
|
||||||
|
timeout=3,
|
||||||
|
)
|
||||||
|
if not resp.ok:
|
||||||
|
continue
|
||||||
|
|
||||||
|
data = resp.json()
|
||||||
|
status = data.get("status", "unknown")
|
||||||
|
completed = data.get("completed_actions", 0)
|
||||||
|
failed = data.get("failed_actions", 0)
|
||||||
|
progress = int(10 + (completed / max(total_actions, 1)) * 80)
|
||||||
|
|
||||||
|
socketio.emit('execution_progress', {
|
||||||
|
"progress": progress,
|
||||||
|
"step": f"Action {completed}/{total_actions} exécutée",
|
||||||
|
"current": completed,
|
||||||
|
"total": total_actions,
|
||||||
|
})
|
||||||
|
|
||||||
|
if status == "completed":
|
||||||
|
finish_execution(
|
||||||
|
workflow_name, failed == 0,
|
||||||
|
f"Replay terminé : {completed} actions exécutées"
|
||||||
|
+ (f", {failed} échecs" if failed else "")
|
||||||
|
)
|
||||||
|
return
|
||||||
|
elif status == "failed":
|
||||||
|
finish_execution(
|
||||||
|
workflow_name, False,
|
||||||
|
f"Replay échoué : {completed}/{total_actions} actions, {failed} échecs"
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Poll replay progress: {e}")
|
||||||
|
|
||||||
|
# Timeout
|
||||||
|
if execution_status.get("running"):
|
||||||
|
finish_execution(
|
||||||
|
workflow_name, False,
|
||||||
|
f"Timeout — replay toujours en cours après {max_wait}s"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def execute_workflow(match, params):
|
def execute_workflow(match, params):
|
||||||
"""
|
"""
|
||||||
Exécuter un workflow — tente d'abord le streaming server,
|
Exécuter un workflow — tente d'abord le streaming server,
|
||||||
@@ -1031,22 +1089,31 @@ def execute_workflow(match, params):
|
|||||||
replay_result = _try_streaming_server_replay(match.workflow_id, params)
|
replay_result = _try_streaming_server_replay(match.workflow_id, params)
|
||||||
if replay_result:
|
if replay_result:
|
||||||
# Le streaming server a accepté le replay
|
# Le streaming server a accepté le replay
|
||||||
|
total_actions = replay_result.get("total_actions", 1)
|
||||||
|
target_session = replay_result.get("session_id", "?")
|
||||||
execution_status["running"] = True
|
execution_status["running"] = True
|
||||||
execution_status["workflow"] = match.workflow_name
|
execution_status["workflow"] = match.workflow_name
|
||||||
execution_status["progress"] = 50
|
execution_status["progress"] = 10
|
||||||
execution_status["message"] = "Envoyé au streaming server (Agent V1)"
|
execution_status["message"] = f"Envoyé à l'Agent V1 ({target_session})"
|
||||||
|
|
||||||
socketio.emit('execution_progress', {
|
socketio.emit('execution_progress', {
|
||||||
"progress": 50,
|
"progress": 10,
|
||||||
"step": "Exécution via streaming server...",
|
"step": f"Replay envoyé à l'Agent V1 — {total_actions} actions en attente",
|
||||||
"current": 1,
|
"current": 0,
|
||||||
"total": 1,
|
"total": total_actions,
|
||||||
})
|
})
|
||||||
|
|
||||||
finish_execution(
|
# Suivre la progression du replay (polling toutes les 2s, max 120s)
|
||||||
match.workflow_name, True,
|
replay_id = replay_result.get("replay_id")
|
||||||
f"Workflow envoyé au streaming server ({replay_result.get('status', 'ok')})"
|
if replay_id:
|
||||||
)
|
socketio.start_background_task(
|
||||||
|
_poll_replay_progress, replay_id, match.workflow_name, total_actions
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
finish_execution(
|
||||||
|
match.workflow_name, True,
|
||||||
|
f"Replay envoyé ({total_actions} actions)"
|
||||||
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Fallback : exécution locale
|
# Fallback : exécution locale
|
||||||
|
|||||||
Reference in New Issue
Block a user