Snapshot avant correction du blocage relance Léa (3 incidents 24h: SSH refusé, polls morts ×2). Point de rollback stable. Contenu: - agent_v1/core/executor.py: 5 patchs dialog handling (saveas drift, close_tab hotkey fallback, confirm_save Unicode apostrophe, foreground dialog recontextualization, runtime_dialog in-loop) + helpers normalize_window_hint, requires_post_verify_window_transition - agent_v1/core/grounding.py: garde drift template fix (fallback_x/y plumbed) - server_v1/replay_watchdog.py (NEW): orphan watchdog B1, scan 10s timeout 30s - server_v1/api_stream.py: dispatched_action plumbing, watchdog lifespan, metrics endpoint - server_v1/replay_engine.py: _schedule_retry préserve original_action + dispatched_action - stream_processor.py: gardes _infer_tab_switch_target (no false switch_tab on save_as dialog open) + _attach_expected_window_before - tests/integration: test_replay_watchdog.py (8 cas), test_stream_processor.py - tests/unit: test_executor_verify_window_guard.py (start_button, close_tab, runtime_dialog, post_verify, transition fallbacks) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
419 lines
14 KiB
Python
419 lines
14 KiB
Python
# agent_v0/lea_ui/server_client.py
|
|
"""
|
|
Client API pour communiquer avec le serveur Linux RPA Vision V3.
|
|
|
|
Endpoints cibles :
|
|
- Agent Chat (port 5004) : /api/chat, /api/workflows
|
|
- Streaming Server (port 5005) : /api/v1/traces/stream/replay/next, etc.
|
|
|
|
Le polling tourne dans un thread separe pour ne pas bloquer la UI Qt.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
from typing import Any, Callable, Dict, List, Optional
|
|
|
|
logger = logging.getLogger("lea_ui.server_client")
|
|
|
|
|
|
def _get_server_url() -> str:
|
|
"""Recuperer l'URL du serveur RPA (avec /api/v1).
|
|
|
|
Ordre de resolution :
|
|
1. Import depuis agent_v1.config (source de verite unique)
|
|
2. Variable d'environnement RPA_SERVER_URL
|
|
3. Fallback http://localhost:5005/api/v1
|
|
"""
|
|
# 1. Import depuis config.py (source de verite)
|
|
try:
|
|
from agent_v1.config import SERVER_URL
|
|
return SERVER_URL
|
|
except ImportError:
|
|
pass
|
|
|
|
# 2. Variable d'environnement directe
|
|
url = os.environ.get("RPA_SERVER_URL", "").strip().rstrip("/")
|
|
if url:
|
|
return url
|
|
|
|
# 3. Fallback
|
|
return "http://localhost:5005/api/v1"
|
|
|
|
|
|
def _get_server_base(server_url: str) -> str:
|
|
"""Extraire la base URL (sans /api/v1) pour les routes racine (/health)."""
|
|
return server_url.rsplit("/api/v1", 1)[0]
|
|
|
|
|
|
class LeaServerClient:
|
|
"""Client API thread-safe vers le serveur RPA Vision V3.
|
|
|
|
Gere la communication HTTP avec le serveur chat (port 5004)
|
|
et le serveur de streaming (port 5005).
|
|
Le polling replay tourne dans un thread daemon separe.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
server_host: Optional[str] = None,
|
|
chat_port: int = 5004,
|
|
stream_port: int = 5005,
|
|
) -> None:
|
|
# URL unifiée : SERVER_URL contient TOUJOURS /api/v1 (convention INC-1).
|
|
# _stream_url = URL avec /api/v1 (pour les routes API)
|
|
# _stream_base = URL sans /api/v1 (pour /health uniquement)
|
|
self._stream_url = _get_server_url()
|
|
self._stream_base = _get_server_base(self._stream_url)
|
|
|
|
# Extraire le host depuis l'URL pour le chat et pour l'affichage
|
|
try:
|
|
from urllib.parse import urlparse
|
|
parsed = urlparse(self._stream_base)
|
|
self._host = parsed.hostname or "localhost"
|
|
except Exception:
|
|
self._host = server_host or "localhost"
|
|
|
|
self._chat_port = chat_port
|
|
self._stream_port = stream_port
|
|
self._chat_base = f"http://{self._host}:{self._chat_port}"
|
|
|
|
# Etat de connexion
|
|
self._connected = False
|
|
self._last_error: Optional[str] = None
|
|
|
|
# Callbacks UI (appelees depuis le thread de polling)
|
|
self._on_connection_change: Optional[Callable[[bool], None]] = None
|
|
self._on_replay_action: Optional[Callable[[Dict[str, Any]], None]] = None
|
|
self._on_chat_response: Optional[Callable[[Dict[str, Any]], None]] = None
|
|
|
|
# Thread de polling
|
|
self._polling = False
|
|
self._poll_thread: Optional[threading.Thread] = None
|
|
self._poll_interval = 1.0 # secondes
|
|
|
|
# Session de chat
|
|
self._chat_session_id: Optional[str] = None
|
|
|
|
# Token API pour le serveur streaming (auth Bearer)
|
|
self._api_token = os.environ.get("RPA_API_TOKEN", "")
|
|
|
|
logger.info(
|
|
"LeaServerClient initialise : chat=%s, stream_url=%s, stream_base=%s",
|
|
self._chat_base, self._stream_url, self._stream_base,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Auth
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _auth_headers(self) -> Dict[str, str]:
|
|
"""Headers d'authentification pour le serveur streaming."""
|
|
if self._api_token:
|
|
return {"Authorization": f"Bearer {self._api_token}"}
|
|
return {}
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Proprietes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@property
|
|
def connected(self) -> bool:
|
|
return self._connected
|
|
|
|
@property
|
|
def server_host(self) -> str:
|
|
return self._host
|
|
|
|
@property
|
|
def last_error(self) -> Optional[str]:
|
|
return self._last_error
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Callbacks
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def set_on_connection_change(self, callback: Callable[[bool], None]) -> None:
|
|
"""Callback appelee quand l'etat de connexion change."""
|
|
self._on_connection_change = callback
|
|
|
|
def set_on_replay_action(self, callback: Callable[[Dict[str, Any]], None]) -> None:
|
|
"""Callback appelee quand une action de replay est recue."""
|
|
self._on_replay_action = callback
|
|
|
|
def set_on_chat_response(self, callback: Callable[[Dict[str, Any]], None]) -> None:
|
|
"""Callback appelee quand une reponse chat est recue."""
|
|
self._on_chat_response = callback
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Connexion
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def check_connection(self) -> bool:
|
|
"""Tester la connexion au serveur streaming (port 5005).
|
|
|
|
Le health check utilise _stream_base (sans /api/v1) car la route
|
|
/health est a la racine du serveur FastAPI, pas sous /api/v1.
|
|
"""
|
|
try:
|
|
import requests
|
|
resp = requests.get(
|
|
f"{self._stream_base}/health",
|
|
headers=self._auth_headers(),
|
|
timeout=5,
|
|
)
|
|
was_connected = self._connected
|
|
self._connected = resp.ok
|
|
self._last_error = None
|
|
|
|
if self._connected != was_connected and self._on_connection_change:
|
|
self._on_connection_change(self._connected)
|
|
|
|
return self._connected
|
|
except Exception as e:
|
|
was_connected = self._connected
|
|
self._connected = False
|
|
self._last_error = str(e)
|
|
|
|
if was_connected and self._on_connection_change:
|
|
self._on_connection_change(False)
|
|
|
|
return False
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Chat API (port 5004)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def send_chat_message(self, message: str) -> Optional[Dict[str, Any]]:
|
|
"""Envoyer un message au chat et retourner la reponse.
|
|
|
|
Retourne None en cas d'erreur reseau.
|
|
"""
|
|
try:
|
|
import requests
|
|
payload = {
|
|
"message": message,
|
|
}
|
|
if self._chat_session_id:
|
|
payload["session_id"] = self._chat_session_id
|
|
|
|
resp = requests.post(
|
|
f"{self._chat_base}/api/chat",
|
|
json=payload,
|
|
timeout=30,
|
|
)
|
|
|
|
if resp.ok:
|
|
data = resp.json()
|
|
# Sauvegarder le session_id pour le contexte multi-tour
|
|
if "session_id" in data:
|
|
self._chat_session_id = data["session_id"]
|
|
self._connected = True
|
|
return data
|
|
else:
|
|
self._last_error = f"HTTP {resp.status_code}"
|
|
logger.warning("Chat API erreur : %s", self._last_error)
|
|
return None
|
|
|
|
except Exception as e:
|
|
self._last_error = str(e)
|
|
self._connected = False
|
|
logger.error("Chat API exception : %s", e)
|
|
return None
|
|
|
|
def list_workflows(self) -> List[Dict[str, Any]]:
|
|
"""Recuperer la liste des workflows depuis le serveur streaming."""
|
|
try:
|
|
import requests
|
|
headers = self._auth_headers()
|
|
resp = requests.get(
|
|
f"{self._stream_url}/traces/stream/workflows",
|
|
headers=headers,
|
|
timeout=10,
|
|
)
|
|
if resp.ok:
|
|
data = resp.json()
|
|
self._connected = True
|
|
# L'API renvoie directement une liste ou un dict avec clé "workflows"
|
|
if isinstance(data, list):
|
|
return data
|
|
return data.get("workflows", [])
|
|
return []
|
|
except Exception as e:
|
|
self._last_error = str(e)
|
|
logger.error("List workflows erreur : %s", e)
|
|
return []
|
|
|
|
def list_gestures(self) -> List[Dict[str, Any]]:
|
|
"""Recuperer la liste des gestes (non disponible sur streaming server)."""
|
|
# Les gestes etaient sur le chat server (5004) qui n'est plus utilise.
|
|
# Retourner une liste vide silencieusement.
|
|
return []
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Replay Polling (port 5005)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def start_polling(self, session_id: str) -> None:
|
|
"""Demarrer le polling des actions de replay dans un thread daemon."""
|
|
if self._polling:
|
|
return
|
|
|
|
self._polling = True
|
|
self._poll_session_id = session_id
|
|
self._poll_thread = threading.Thread(
|
|
target=self._poll_loop,
|
|
daemon=True,
|
|
name="lea-replay-poll",
|
|
)
|
|
self._poll_thread.start()
|
|
logger.info("Polling replay demarre pour session %s", session_id)
|
|
|
|
def stop_polling(self) -> None:
|
|
"""Arreter le polling."""
|
|
self._polling = False
|
|
if self._poll_thread:
|
|
self._poll_thread.join(timeout=3)
|
|
self._poll_thread = None
|
|
logger.info("Polling replay arrete")
|
|
|
|
def _poll_loop(self) -> None:
|
|
"""Boucle de polling dans un thread separe."""
|
|
import requests as req_lib
|
|
|
|
while self._polling:
|
|
try:
|
|
resp = req_lib.get(
|
|
f"{self._stream_url}/traces/stream/replay/next",
|
|
params={"session_id": self._poll_session_id},
|
|
headers=self._auth_headers(),
|
|
timeout=5,
|
|
)
|
|
|
|
if resp.ok:
|
|
data = resp.json()
|
|
action = data.get("action")
|
|
if action and self._on_replay_action:
|
|
self._on_replay_action(action)
|
|
# Apres une action, poll plus rapidement
|
|
time.sleep(0.2)
|
|
continue
|
|
|
|
except req_lib.exceptions.ConnectionError:
|
|
# Serveur non disponible — silencieux
|
|
pass
|
|
except req_lib.exceptions.Timeout:
|
|
pass
|
|
except Exception as e:
|
|
logger.error("Erreur poll replay : %s", e)
|
|
|
|
time.sleep(self._poll_interval)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Replay Status
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def get_replay_status(self) -> Optional[Dict[str, Any]]:
|
|
"""Recuperer l'etat des replays en cours."""
|
|
try:
|
|
import requests
|
|
resp = requests.get(
|
|
f"{self._stream_url}/traces/stream/replays",
|
|
headers=self._auth_headers(),
|
|
timeout=5,
|
|
)
|
|
if resp.ok:
|
|
data = resp.json()
|
|
replays = data.get("replays", [])
|
|
# Retourner le premier replay actif
|
|
for r in replays:
|
|
if r.get("status") == "running":
|
|
return r
|
|
return None
|
|
return None
|
|
except Exception:
|
|
return None
|
|
|
|
def resume_replay(self, replay_id: str) -> bool:
|
|
"""Reprendre un replay en pause supervisée via HTTP direct.
|
|
|
|
Fallback du chemin SocketIO (`lea:replay_resume` → agent_chat)
|
|
utilisé quand le bus feedback est déconnecté au moment où
|
|
l'utilisateur clique « Continuer » dans la bulle paused.
|
|
|
|
Retourne True si le serveur streaming a accepté la reprise.
|
|
"""
|
|
if not replay_id:
|
|
return False
|
|
try:
|
|
import requests
|
|
resp = requests.post(
|
|
f"{self._stream_url}/traces/stream/replay/{replay_id}/resume",
|
|
headers=self._auth_headers(),
|
|
timeout=10,
|
|
)
|
|
return bool(resp.ok)
|
|
except Exception:
|
|
logger.debug("resume_replay HTTP silenced", exc_info=True)
|
|
return False
|
|
|
|
def abort_replay(self, replay_id: str) -> bool:
|
|
"""Annuler un replay en pause supervisée via HTTP direct.
|
|
|
|
Symétrique de ``resume_replay`` : fallback du chemin SocketIO
|
|
(`lea:replay_abort`) quand le bus feedback est déconnecté.
|
|
POSTe sur ``/replay/{id}/cancel`` côté serveur streaming.
|
|
"""
|
|
if not replay_id:
|
|
return False
|
|
try:
|
|
import requests
|
|
resp = requests.post(
|
|
f"{self._stream_url}/traces/stream/replay/{replay_id}/cancel",
|
|
headers=self._auth_headers(),
|
|
timeout=10,
|
|
)
|
|
return bool(resp.ok)
|
|
except Exception:
|
|
logger.debug("abort_replay HTTP silenced", exc_info=True)
|
|
return False
|
|
|
|
def report_action_result(
|
|
self,
|
|
session_id: str,
|
|
action_id: str,
|
|
success: bool,
|
|
error: Optional[str] = None,
|
|
screenshot: Optional[str] = None,
|
|
) -> None:
|
|
"""Rapporter le resultat d'execution d'une action au serveur."""
|
|
try:
|
|
import requests
|
|
requests.post(
|
|
f"{self._stream_url}/traces/stream/replay/result",
|
|
json={
|
|
"session_id": session_id,
|
|
"action_id": action_id,
|
|
"success": success,
|
|
"error": error,
|
|
"screenshot": screenshot,
|
|
},
|
|
headers=self._auth_headers(),
|
|
timeout=5,
|
|
)
|
|
except Exception as e:
|
|
logger.error("Report action result erreur : %s", e)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Lifecycle
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def shutdown(self) -> None:
|
|
"""Arreter proprement le client."""
|
|
self.stop_polling()
|
|
logger.info("LeaServerClient arrete")
|