feat(agent_v1): FeedbackBusClient — client SocketIO pour bus 'lea:*'
Consomme les events 'lea:*' émis par agent_chat (port 5004) et les dispatche vers un callback fourni par ChatWindow (J3.3 à venir). Caractéristiques : - Connexion en thread daemon (non-bloquant pour la mainloop tkinter) - Reconnect auto illimité (delay 2s → 30s exponentiel) - Auth Bearer Token via header HTTP au handshake - Fail-safe : connect échoué, callback qui raise, disconnect qui raise → tout silencieusement loggé, ChatWindow continue normalement 13 tests pytest verts (tests/integration/test_feedback_bus_client.py). Pas de connexion réseau réelle dans les tests (python-socketio mocké). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
124
agent_v0/agent_v1/network/feedback_bus.py
Normal file
124
agent_v0/agent_v1/network/feedback_bus.py
Normal file
@@ -0,0 +1,124 @@
|
|||||||
|
# agent_v1/network/feedback_bus.py
|
||||||
|
"""Client SocketIO pour le bus feedback Léa.
|
||||||
|
|
||||||
|
Consomme les events 'lea:*' émis par agent_chat (port 5004) et les dispatche
|
||||||
|
vers ChatWindow pour affichage en bulles temps réel.
|
||||||
|
|
||||||
|
Events écoutés :
|
||||||
|
lea:action_started — début d'un workflow ou d'une action
|
||||||
|
lea:action_progress — progression dans le workflow
|
||||||
|
lea:done — fin d'un workflow ou d'un copilot
|
||||||
|
lea:need_confirm — étape copilot en attente de validation
|
||||||
|
lea:step_result — résultat d'une étape copilot
|
||||||
|
lea:paused — basculement en paused_need_help (asset démo)
|
||||||
|
lea:resumed — sortie de pause supervisée
|
||||||
|
|
||||||
|
Fail-safe : toute erreur de connexion ou de dispatch est silencieusement
|
||||||
|
loggée. Le ChatWindow continue de fonctionner même si le bus est mort
|
||||||
|
(comportement strictement identique au pré-J3).
|
||||||
|
|
||||||
|
Usage :
|
||||||
|
bus = FeedbackBusClient(
|
||||||
|
server_url="http://localhost:5004",
|
||||||
|
token=os.environ.get("RPA_API_TOKEN", ""),
|
||||||
|
on_event=lambda event, payload: print(event, payload),
|
||||||
|
)
|
||||||
|
bus.start() # connexion en arrière-plan, non-bloquant
|
||||||
|
# ... ChatWindow tourne ...
|
||||||
|
bus.stop()
|
||||||
|
"""
|
||||||
|
|
||||||
|
import logging
|
||||||
|
import threading
|
||||||
|
from typing import Callable, Optional
|
||||||
|
|
||||||
|
import socketio
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
LEA_EVENTS = (
|
||||||
|
'lea:action_started',
|
||||||
|
'lea:action_progress',
|
||||||
|
'lea:done',
|
||||||
|
'lea:need_confirm',
|
||||||
|
'lea:step_result',
|
||||||
|
'lea:paused',
|
||||||
|
'lea:resumed',
|
||||||
|
)
|
||||||
|
|
||||||
|
EventCallback = Callable[[str, dict], None]
|
||||||
|
|
||||||
|
|
||||||
|
class FeedbackBusClient:
|
||||||
|
"""Client SocketIO non-bloquant pour le bus 'lea:*'."""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
server_url: str,
|
||||||
|
token: Optional[str] = None,
|
||||||
|
on_event: Optional[EventCallback] = None,
|
||||||
|
):
|
||||||
|
self._url = server_url.rstrip('/')
|
||||||
|
self._token = token or None
|
||||||
|
self._on_event: EventCallback = on_event or (lambda e, p: None)
|
||||||
|
self._sio = socketio.Client(
|
||||||
|
reconnection=True,
|
||||||
|
reconnection_attempts=0, # 0 = illimité
|
||||||
|
reconnection_delay=2,
|
||||||
|
reconnection_delay_max=30,
|
||||||
|
logger=False,
|
||||||
|
engineio_logger=False,
|
||||||
|
)
|
||||||
|
self._thread: Optional[threading.Thread] = None
|
||||||
|
self._register_handlers()
|
||||||
|
|
||||||
|
def _register_handlers(self) -> None:
|
||||||
|
@self._sio.event
|
||||||
|
def connect():
|
||||||
|
logger.info("FeedbackBus connecté à %s", self._url)
|
||||||
|
|
||||||
|
@self._sio.event
|
||||||
|
def disconnect():
|
||||||
|
logger.info("FeedbackBus déconnecté")
|
||||||
|
|
||||||
|
for ev in LEA_EVENTS:
|
||||||
|
self._sio.on(ev, lambda data, e=ev: self._dispatch(e, data))
|
||||||
|
|
||||||
|
def _dispatch(self, event: str, payload: Optional[dict]) -> None:
|
||||||
|
try:
|
||||||
|
self._on_event(event, payload or {})
|
||||||
|
except Exception:
|
||||||
|
logger.debug("FeedbackBus dispatch silenced", exc_info=True)
|
||||||
|
|
||||||
|
def start(self) -> None:
|
||||||
|
"""Démarrer la connexion en arrière-plan (idempotent, non-bloquant)."""
|
||||||
|
if self._thread is not None and self._thread.is_alive():
|
||||||
|
return
|
||||||
|
self._thread = threading.Thread(
|
||||||
|
target=self._run, daemon=True, name="LeaFeedbackBus",
|
||||||
|
)
|
||||||
|
self._thread.start()
|
||||||
|
|
||||||
|
def _run(self) -> None:
|
||||||
|
headers = {}
|
||||||
|
if self._token:
|
||||||
|
headers['Authorization'] = f'Bearer {self._token}'
|
||||||
|
try:
|
||||||
|
self._sio.connect(self._url, headers=headers, wait=True)
|
||||||
|
self._sio.wait()
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
"FeedbackBus connect échoué (%s) — ChatWindow continue normalement", e,
|
||||||
|
)
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
"""Arrêter proprement la connexion (idempotent, fail-safe)."""
|
||||||
|
try:
|
||||||
|
if self._sio.connected:
|
||||||
|
self._sio.disconnect()
|
||||||
|
except Exception:
|
||||||
|
logger.debug("FeedbackBus stop silenced", exc_info=True)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def connected(self) -> bool:
|
||||||
|
return bool(self._sio.connected)
|
||||||
125
tests/integration/test_feedback_bus_client.py
Normal file
125
tests/integration/test_feedback_bus_client.py
Normal file
@@ -0,0 +1,125 @@
|
|||||||
|
"""Tests FeedbackBusClient (J3.2).
|
||||||
|
|
||||||
|
On mock python-socketio pour ne pas ouvrir de vraie connexion réseau.
|
||||||
|
Le test E2E réel (vraie connexion bus 5004) est différé à J4.3.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from agent_v0.agent_v1.network.feedback_bus import FeedbackBusClient, LEA_EVENTS
|
||||||
|
|
||||||
|
|
||||||
|
def test_init_creates_socketio_client():
|
||||||
|
bus = FeedbackBusClient("http://localhost:5004")
|
||||||
|
assert bus._sio is not None
|
||||||
|
assert bus.connected is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_init_strips_trailing_slash():
|
||||||
|
bus = FeedbackBusClient("http://localhost:5004/")
|
||||||
|
assert bus._url == "http://localhost:5004"
|
||||||
|
|
||||||
|
|
||||||
|
def test_lea_events_registered():
|
||||||
|
bus = FeedbackBusClient("http://localhost:5004")
|
||||||
|
handlers = bus._sio.handlers.get('/', {})
|
||||||
|
for ev in LEA_EVENTS:
|
||||||
|
assert ev in handlers, f"Handler {ev!r} non enregistré sur le client"
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_calls_callback():
|
||||||
|
received = []
|
||||||
|
bus = FeedbackBusClient(
|
||||||
|
"http://localhost:5004",
|
||||||
|
on_event=lambda e, p: received.append((e, p)),
|
||||||
|
)
|
||||||
|
bus._dispatch('lea:paused', {'workflow': 'demo', 'reason': 'incertain'})
|
||||||
|
assert received == [('lea:paused', {'workflow': 'demo', 'reason': 'incertain'})]
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_handles_none_payload():
|
||||||
|
received = []
|
||||||
|
bus = FeedbackBusClient(
|
||||||
|
"http://localhost:5004",
|
||||||
|
on_event=lambda e, p: received.append((e, p)),
|
||||||
|
)
|
||||||
|
bus._dispatch('lea:done', None)
|
||||||
|
assert received == [('lea:done', {})]
|
||||||
|
|
||||||
|
|
||||||
|
def test_dispatch_silenced_on_callback_error():
|
||||||
|
"""Une exception dans le callback consommateur ne doit jamais remonter."""
|
||||||
|
def boom(event, payload):
|
||||||
|
raise RuntimeError("callback fail")
|
||||||
|
bus = FeedbackBusClient("http://localhost:5004", on_event=boom)
|
||||||
|
bus._dispatch('lea:paused', {}) # ne doit pas raise
|
||||||
|
|
||||||
|
|
||||||
|
def test_default_callback_is_silent():
|
||||||
|
"""Sans callback fourni, le dispatch ne casse pas."""
|
||||||
|
bus = FeedbackBusClient("http://localhost:5004")
|
||||||
|
bus._dispatch('lea:paused', {'x': 1}) # ne doit pas raise
|
||||||
|
|
||||||
|
|
||||||
|
def test_token_in_authorization_header():
|
||||||
|
bus = FeedbackBusClient("http://localhost:5004", token="abc123")
|
||||||
|
captured = {}
|
||||||
|
|
||||||
|
def fake_connect(url, headers=None, **kwargs):
|
||||||
|
captured['headers'] = headers
|
||||||
|
raise RuntimeError("stop here")
|
||||||
|
|
||||||
|
with patch.object(bus._sio, 'connect', side_effect=fake_connect):
|
||||||
|
bus._run()
|
||||||
|
|
||||||
|
assert captured['headers']['Authorization'] == 'Bearer abc123'
|
||||||
|
|
||||||
|
|
||||||
|
def test_no_token_means_no_auth_header():
|
||||||
|
bus = FeedbackBusClient("http://localhost:5004")
|
||||||
|
captured = {}
|
||||||
|
|
||||||
|
def fake_connect(url, headers=None, **kwargs):
|
||||||
|
captured['headers'] = headers
|
||||||
|
raise RuntimeError("stop here")
|
||||||
|
|
||||||
|
with patch.object(bus._sio, 'connect', side_effect=fake_connect):
|
||||||
|
bus._run()
|
||||||
|
|
||||||
|
assert 'Authorization' not in captured['headers']
|
||||||
|
|
||||||
|
|
||||||
|
def test_run_silenced_on_connect_error():
|
||||||
|
"""connect() qui raise ne doit pas faire crasher le thread."""
|
||||||
|
bus = FeedbackBusClient("http://localhost:5004")
|
||||||
|
with patch.object(bus._sio, 'connect', side_effect=ConnectionError("boom")):
|
||||||
|
bus._run() # ne doit pas raise
|
||||||
|
|
||||||
|
|
||||||
|
def test_start_is_idempotent():
|
||||||
|
"""Un second start() pendant que le thread tourne ne doit pas en créer un autre."""
|
||||||
|
import threading
|
||||||
|
bus = FeedbackBusClient("http://localhost:5004")
|
||||||
|
block = threading.Event()
|
||||||
|
with patch.object(bus, '_run', side_effect=lambda: block.wait(timeout=2)):
|
||||||
|
bus.start()
|
||||||
|
first_thread = bus._thread
|
||||||
|
bus.start()
|
||||||
|
second_thread = bus._thread
|
||||||
|
block.set()
|
||||||
|
assert first_thread is second_thread, "start() doit être idempotent quand un thread tourne"
|
||||||
|
|
||||||
|
|
||||||
|
def test_stop_when_not_connected_is_silent():
|
||||||
|
bus = FeedbackBusClient("http://localhost:5004")
|
||||||
|
bus.stop() # ne doit pas raise même si jamais connecté
|
||||||
|
|
||||||
|
|
||||||
|
def test_stop_silenced_on_disconnect_error():
|
||||||
|
bus = FeedbackBusClient("http://localhost:5004")
|
||||||
|
# Forcer connected=True sur l'instance et faire raise disconnect()
|
||||||
|
with patch.object(bus._sio, 'disconnect', side_effect=RuntimeError("boom")):
|
||||||
|
bus._sio.connected = True
|
||||||
|
bus.stop() # ne doit pas raise
|
||||||
Reference in New Issue
Block a user