Files
rpa_vision_v3/agent_v0/lea_ui/server_client.py
Dom ae65be2555 chore: ajouter agent_v0/ au tracking git (était un repo embarqué)
Suppression du .git embarqué dans agent_v0/ — le code est maintenant
tracké normalement dans le repo principal.
Inclut : agent_v1 (client), server_v1 (streaming), lea_ui (chat client)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 11:12:23 +01:00

356 lines
12 KiB
Python

# agent_v0/lea_ui/server_client.py
"""
Client API pour communiquer avec le serveur Linux RPA Vision V3.
Endpoints cibles :
- Agent Chat (port 5004) : /api/chat, /api/workflows
- Streaming Server (port 5005) : /api/v1/traces/stream/replay/next, etc.
Le polling tourne dans un thread separe pour ne pas bloquer la UI Qt.
"""
from __future__ import annotations
import json
import logging
import os
import threading
import time
from typing import Any, Callable, Dict, List, Optional
# Module-level logger used by LeaServerClient for its info/warning/error
# messages. The logger name is a fixed string rather than __name__.
logger = logging.getLogger("lea_ui.server_client")
def _get_server_host() -> str:
    """Resolve the address of the Linux server.

    Resolution order:
      1. The ``RPA_SERVER_HOST`` environment variable.
      2. The ``"server_host"`` key of ``agent_config.json``, looked up one
         directory, then two directories, above this module.
      3. The literal ``"localhost"``.

    A candidate is accepted only if it is a non-blank string; surrounding
    whitespace is stripped. Missing, unreadable or malformed config files
    are skipped so that a bad file never prevents the fallback.

    Returns:
        The resolved host name or address (never empty).
    """
    # 1. Environment variable
    env_host = os.environ.get("RPA_SERVER_HOST", "").strip()
    if env_host:
        return env_host

    # 2. Config file
    module_dir = os.path.dirname(__file__)
    candidates = (
        os.path.join(module_dir, "..", "agent_config.json"),
        os.path.join(module_dir, "..", "..", "agent_config.json"),
    )
    for config_path in candidates:
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                cfg = json.load(f)
        except (OSError, ValueError):
            # ValueError covers both json.JSONDecodeError and
            # UnicodeDecodeError (file is not valid UTF-8).
            continue
        if not isinstance(cfg, dict):
            # Valid JSON but not an object (e.g. a list): nothing to read.
            continue
        host = cfg.get("server_host")
        # The value may be null or a non-string in a hand-edited file.
        if isinstance(host, str) and host.strip():
            return host.strip()

    # 3. Fallback
    return "localhost"
class LeaServerClient:
    """HTTP client for the RPA Vision V3 server.

    Talks to two services on the same host:
      - the chat server (default port 5004): ``/api/chat``,
        ``/api/workflows``, ``/api/gestures``;
      - the streaming server (default port 5005): replay polling,
        replay status and action results.

    Replay polling runs in a separate daemon thread, so the replay callback
    is invoked from that thread. The connection-change callback is invoked
    from whichever thread performed the request that changed the state.
    ``requests`` is imported lazily inside each method.
    """

    def __init__(
        self,
        server_host: Optional[str] = None,
        chat_port: int = 5004,
        stream_port: int = 5005,
    ) -> None:
        """Build the client.

        Args:
            server_host: Host name or address. When falsy, the host is
                resolved with ``_get_server_host()``.
            chat_port: Port of the chat server.
            stream_port: Port of the streaming server.
        """
        self._host = server_host or _get_server_host()
        self._chat_port = chat_port
        self._stream_port = stream_port
        self._chat_base = f"http://{self._host}:{self._chat_port}"
        self._stream_base = f"http://{self._host}:{self._stream_port}"

        # Connection state
        self._connected = False
        self._last_error: Optional[str] = None

        # UI callbacks
        self._on_connection_change: Optional[Callable[[bool], None]] = None
        self._on_replay_action: Optional[Callable[[Dict[str, Any]], None]] = None
        self._on_chat_response: Optional[Callable[[Dict[str, Any]], None]] = None

        # Polling thread
        self._polling = False
        self._poll_thread: Optional[threading.Thread] = None
        self._poll_interval = 1.0  # seconds
        self._poll_session_id: Optional[str] = None
        # Each polling thread gets its own stop event (see start_polling) so
        # that a thread which outlived stop_polling() can never be revived.
        self._stop_event = threading.Event()

        # Chat session
        self._chat_session_id: Optional[str] = None

        logger.info(
            "LeaServerClient initialise : chat=%s, stream=%s",
            self._chat_base, self._stream_base,
        )

    # -----------------------------------------------------------------------
    # Properties
    # -----------------------------------------------------------------------
    @property
    def connected(self) -> bool:
        """Last known connection state."""
        return self._connected

    @property
    def server_host(self) -> str:
        """Host this client talks to."""
        return self._host

    @property
    def last_error(self) -> Optional[str]:
        """Text of the most recently recorded error, or None."""
        return self._last_error

    # -----------------------------------------------------------------------
    # Callbacks
    # -----------------------------------------------------------------------
    def set_on_connection_change(self, callback: Callable[[bool], None]) -> None:
        """Register the callback invoked when the connection state changes."""
        self._on_connection_change = callback

    def set_on_replay_action(self, callback: Callable[[Dict[str, Any]], None]) -> None:
        """Register the callback invoked when a replay action is received."""
        self._on_replay_action = callback

    def set_on_chat_response(self, callback: Callable[[Dict[str, Any]], None]) -> None:
        """Register the chat-response callback (stored; not invoked by this class)."""
        self._on_chat_response = callback

    # -----------------------------------------------------------------------
    # Connection
    # -----------------------------------------------------------------------
    def _set_connected(self, connected: bool) -> None:
        """Record the connection state and notify the callback on a change.

        Every method that learns something about connectivity goes through
        here, so the callback cannot miss a transition. A failing callback
        is logged and never propagates into the request logic.
        """
        changed = connected != self._connected
        self._connected = connected
        callback = self._on_connection_change
        if changed and callback is not None:
            try:
                callback(connected)
            except Exception:
                logger.exception("Callback connexion erreur")

    def check_connection(self) -> bool:
        """Probe the chat server with ``GET /api/workflows``.

        Returns:
            True when the server answered with a success status. On a
            non-success status or a network error, ``last_error`` is set.
        """
        try:
            import requests
            resp = requests.get(
                f"{self._chat_base}/api/workflows",
                timeout=5,
            )
        except Exception as e:
            self._last_error = str(e)
            self._set_connected(False)
            return False

        reachable = bool(resp.ok)
        self._last_error = None if reachable else f"HTTP {resp.status_code}"
        self._set_connected(reachable)
        return reachable

    # -----------------------------------------------------------------------
    # Chat API (port 5004)
    # -----------------------------------------------------------------------
    def send_chat_message(self, message: str) -> Optional[Dict[str, Any]]:
        """Send a message to the chat endpoint and return the decoded reply.

        The ``session_id`` returned by the server, if any, is remembered and
        sent with later messages to keep multi-turn context.

        Returns:
            The decoded JSON reply, or None on an HTTP or network error.
        """
        payload: Dict[str, Any] = {"message": message}
        if self._chat_session_id:
            payload["session_id"] = self._chat_session_id

        try:
            import requests
            resp = requests.post(
                f"{self._chat_base}/api/chat",
                json=payload,
                timeout=30,
            )
            data = resp.json() if resp.ok else None
        except Exception as e:
            self._last_error = str(e)
            logger.error("Chat API exception : %s", e)
            self._set_connected(False)
            return None

        if not resp.ok:
            self._last_error = f"HTTP {resp.status_code}"
            logger.warning("Chat API erreur : %s", self._last_error)
            return None

        # Keep the session id for multi-turn context.
        if isinstance(data, dict) and "session_id" in data:
            self._chat_session_id = data["session_id"]
        self._set_connected(True)
        return data

    def _fetch_list(self, endpoint: str, key: str, error_format: str) -> List[Dict[str, Any]]:
        """GET a chat-server endpoint that returns a list of items.

        The server may answer with a bare JSON list or with an object that
        holds the list under ``key``. Any other shape yields ``[]``.

        Args:
            endpoint: Path appended to the chat base URL.
            key: Object key holding the list when the reply is an object.
            error_format: ``%s``-style log format used for exceptions.
        """
        try:
            import requests
            resp = requests.get(f"{self._chat_base}{endpoint}", timeout=10)
            if not resp.ok:
                self._last_error = f"HTTP {resp.status_code}"
                return []
            data = resp.json()
        except Exception as e:
            self._last_error = str(e)
            logger.error(error_format, e)
            return []

        self._set_connected(True)
        if isinstance(data, dict):
            data = data.get(key)
        return data if isinstance(data, list) else []

    def list_workflows(self) -> List[Dict[str, Any]]:
        """Return the workflows known to the chat server ([] on error)."""
        return self._fetch_list("/api/workflows", "workflows", "List workflows erreur : %s")

    def list_gestures(self) -> List[Dict[str, Any]]:
        """Return the gestures known to the chat server ([] on error)."""
        return self._fetch_list("/api/gestures", "gestures", "List gestures erreur : %s")

    # -----------------------------------------------------------------------
    # Replay Polling (port 5005)
    # -----------------------------------------------------------------------
    def start_polling(self, session_id: str) -> None:
        """Start polling replay actions for ``session_id`` in a daemon thread.

        Does nothing if polling is already active.
        """
        if self._polling:
            return
        # Fresh event per thread: a previous thread that is still finishing
        # keeps its own (already set) event and cannot be revived.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self._poll_session_id = session_id
        self._polling = True
        self._poll_thread = threading.Thread(
            target=self._poll_loop,
            args=(stop_event, session_id),
            daemon=True,
            name="lea-replay-poll",
        )
        self._poll_thread.start()
        logger.info("Polling replay demarre pour session %s", session_id)

    def stop_polling(self) -> None:
        """Stop polling and wait up to 3 seconds for the thread to finish.

        Safe to call when polling was never started, and safe to call from
        the replay callback (i.e. from the polling thread itself), in which
        case the thread is signalled but not joined.
        """
        self._polling = False
        self._stop_event.set()
        thread = self._poll_thread
        self._poll_thread = None
        # Thread.join() raises RuntimeError when a thread joins itself.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=3)
        logger.info("Polling replay arrete")

    def _poll_loop(
        self,
        stop_event: Optional[threading.Event] = None,
        session_id: Optional[str] = None,
    ) -> None:
        """Polling loop executed in the background thread.

        Args:
            stop_event: Event that ends the loop; defaults to the client's
                current stop event.
            session_id: Session to poll; defaults to the one recorded by
                ``start_polling``.
        """
        try:
            import requests as req_lib
        except ImportError as e:
            # Without the HTTP library there is nothing to poll with.
            logger.error("Erreur poll replay : %s", e)
            self._polling = False
            return

        stop = stop_event if stop_event is not None else self._stop_event
        if session_id is None:
            session_id = self._poll_session_id
        url = f"{self._stream_base}/api/v1/traces/stream/replay/next"

        while not stop.is_set():
            delay = self._poll_interval
            try:
                resp = req_lib.get(
                    url,
                    params={"session_id": session_id},
                    timeout=5,
                )
                if resp.ok:
                    action = resp.json().get("action")
                    if action and self._on_replay_action:
                        self._on_replay_action(action)
                        # After a delivered action, poll again sooner.
                        delay = 0.2
            except (req_lib.exceptions.ConnectionError, req_lib.exceptions.Timeout):
                # Server unavailable or slow: stay quiet and retry later.
                pass
            except Exception as e:
                logger.error("Erreur poll replay : %s", e)
            # Interruptible sleep: returns immediately once stop is set.
            stop.wait(delay)

    # -----------------------------------------------------------------------
    # Replay Status
    # -----------------------------------------------------------------------
    def get_replay_status(self) -> Optional[Dict[str, Any]]:
        """Return the first replay whose status is ``"running"``.

        Returns:
            The replay entry, or None when there is none or on any error.
        """
        try:
            import requests
            resp = requests.get(
                f"{self._stream_base}/api/v1/traces/stream/replays",
                timeout=5,
            )
            if not resp.ok:
                return None
            replays = resp.json().get("replays") or []
            return next(
                (
                    replay
                    for replay in replays
                    if isinstance(replay, dict) and replay.get("status") == "running"
                ),
                None,
            )
        except Exception as e:
            # Best effort: the caller only needs "no running replay".
            logger.debug("Replay status erreur : %s", e)
            return None

    def report_action_result(
        self,
        session_id: str,
        action_id: str,
        success: bool,
        error: Optional[str] = None,
        screenshot: Optional[str] = None,
    ) -> None:
        """Report the outcome of an executed action to the streaming server.

        Failures are logged and never raised.

        Args:
            session_id: Replay session the action belongs to.
            action_id: Identifier of the executed action.
            success: Whether the action succeeded.
            error: Optional error text.
            screenshot: Optional screenshot payload, sent as-is.
        """
        try:
            import requests
            resp = requests.post(
                f"{self._stream_base}/api/v1/traces/stream/replay/result",
                json={
                    "session_id": session_id,
                    "action_id": action_id,
                    "success": success,
                    "error": error,
                    "screenshot": screenshot,
                },
                timeout=5,
            )
            if not resp.ok:
                logger.warning("Report action result erreur : HTTP %s", resp.status_code)
        except Exception as e:
            logger.error("Report action result erreur : %s", e)

    # -----------------------------------------------------------------------
    # Lifecycle
    # -----------------------------------------------------------------------
    def shutdown(self) -> None:
        """Stop background polling and log the shutdown."""
        self.stop_polling()
        logger.info("LeaServerClient arrete")