From 41eba898c053569b0479096321364d8833862767 Mon Sep 17 00:00:00 2001 From: Dom Date: Tue, 28 Apr 2026 08:43:26 +0200 Subject: [PATCH] =?UTF-8?q?feat(agent=5Fv1):=20FeedbackBusClient=20?= =?UTF-8?q?=E2=80=94=20client=20SocketIO=20pour=20bus=20'lea:*'?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consomme les events 'lea:*' émis par agent_chat (port 5004) et les dispatche vers un callback fourni par ChatWindow (J3.3 à venir). Caractéristiques : - Connexion en thread daemon (non-bloquant pour la mainloop tkinter) - Reconnect auto illimité (delay 2s → 30s exponentiel) - Auth Bearer Token via header HTTP au handshake - Fail-safe : connect échoué, callback qui raise, disconnect qui raise → tout silencieusement loggé, ChatWindow continue normalement 13 tests pytest verts (tests/integration/test_feedback_bus_client.py). Pas de connexion réseau réelle dans les tests (python-socketio mocké). Co-Authored-By: Claude Opus 4.7 (1M context) --- agent_v0/agent_v1/network/feedback_bus.py | 124 +++++++++++++++++ tests/integration/test_feedback_bus_client.py | 125 ++++++++++++++++++ 2 files changed, 249 insertions(+) create mode 100644 agent_v0/agent_v1/network/feedback_bus.py create mode 100644 tests/integration/test_feedback_bus_client.py diff --git a/agent_v0/agent_v1/network/feedback_bus.py b/agent_v0/agent_v1/network/feedback_bus.py new file mode 100644 index 000000000..05cfd3dea --- /dev/null +++ b/agent_v0/agent_v1/network/feedback_bus.py @@ -0,0 +1,124 @@ +# agent_v1/network/feedback_bus.py +"""Client SocketIO pour le bus feedback Léa. + +Consomme les events 'lea:*' émis par agent_chat (port 5004) et les dispatche +vers ChatWindow pour affichage en bulles temps réel. + +Events écoutés : + lea:action_started — début d'un workflow ou d'une action + lea:action_progress — progression dans le workflow + lea:done — fin d'un workflow ou d'un copilot + lea:need_confirm — étape copilot en attente de validation + lea:step_result — résultat d'une étape copilot + lea:paused — basculement en paused_need_help (asset démo) + lea:resumed — sortie de pause supervisée + +Fail-safe : toute erreur de connexion ou de dispatch est silencieusement +loggée. Le ChatWindow continue de fonctionner même si le bus est mort +(comportement strictement identique au pré-J3). + +Usage : + bus = FeedbackBusClient( + server_url="http://localhost:5004", + token=os.environ.get("RPA_API_TOKEN", ""), + on_event=lambda event, payload: print(event, payload), + ) + bus.start() # connexion en arrière-plan, non-bloquant + # ... ChatWindow tourne ... + bus.stop() +""" + +import logging +import threading +from typing import Callable, Optional + +import socketio + +logger = logging.getLogger(__name__) + +LEA_EVENTS = ( + 'lea:action_started', + 'lea:action_progress', + 'lea:done', + 'lea:need_confirm', + 'lea:step_result', + 'lea:paused', + 'lea:resumed', +) + +EventCallback = Callable[[str, dict], None] + + +class FeedbackBusClient: + """Client SocketIO non-bloquant pour le bus 'lea:*'.""" + + def __init__( + self, + server_url: str, + token: Optional[str] = None, + on_event: Optional[EventCallback] = None, + ): + self._url = server_url.rstrip('/') + self._token = token or None + self._on_event: EventCallback = on_event or (lambda e, p: None) + self._sio = socketio.Client( + reconnection=True, + reconnection_attempts=0, # 0 = illimité + reconnection_delay=2, + reconnection_delay_max=30, + logger=False, + engineio_logger=False, + ) + self._thread: Optional[threading.Thread] = None + self._register_handlers() + + def _register_handlers(self) -> None: + @self._sio.event + def connect(): + logger.info("FeedbackBus connecté à %s", self._url) + + @self._sio.event + def disconnect(): + logger.info("FeedbackBus déconnecté") + + for ev in LEA_EVENTS: + self._sio.on(ev, lambda data, e=ev: self._dispatch(e, data)) + + def _dispatch(self, event: str, payload: Optional[dict]) -> None: + try: + self._on_event(event, payload or {}) + except Exception: + logger.debug("FeedbackBus dispatch silenced", exc_info=True) + + def start(self) -> None: + """Démarrer la connexion en arrière-plan (idempotent, non-bloquant).""" + if self._thread is not None and self._thread.is_alive(): + return + self._thread = threading.Thread( + target=self._run, daemon=True, name="LeaFeedbackBus", + ) + self._thread.start() + + def _run(self) -> None: + headers = {} + if self._token: + headers['Authorization'] = f'Bearer {self._token}' + try: + self._sio.connect(self._url, headers=headers, wait=True) + self._sio.wait() + except Exception as e: + logger.warning( + "FeedbackBus connect échoué (%s) — ChatWindow continue normalement", e, + ) + + def stop(self) -> None: + """Arrêter proprement la connexion (idempotent, fail-safe).""" + try: + if self._sio.connected: + self._sio.disconnect() + except Exception: + logger.debug("FeedbackBus stop silenced", exc_info=True) + + @property + def connected(self) -> bool: + return bool(self._sio.connected) diff --git a/tests/integration/test_feedback_bus_client.py b/tests/integration/test_feedback_bus_client.py new file mode 100644 index 000000000..f69815ed7 --- /dev/null +++ b/tests/integration/test_feedback_bus_client.py @@ -0,0 +1,125 @@ +"""Tests FeedbackBusClient (J3.2). + +On mock python-socketio pour ne pas ouvrir de vraie connexion réseau. +Le test E2E réel (vraie connexion bus 5004) est différé à J4.3. +""" + +from unittest.mock import patch + +import pytest + +from agent_v0.agent_v1.network.feedback_bus import FeedbackBusClient, LEA_EVENTS + + +def test_init_creates_socketio_client(): + bus = FeedbackBusClient("http://localhost:5004") + assert bus._sio is not None + assert bus.connected is False + + +def test_init_strips_trailing_slash(): + bus = FeedbackBusClient("http://localhost:5004/") + assert bus._url == "http://localhost:5004" + + +def test_lea_events_registered(): + bus = FeedbackBusClient("http://localhost:5004") + handlers = bus._sio.handlers.get('/', {}) + for ev in LEA_EVENTS: + assert ev in handlers, f"Handler {ev!r} non enregistré sur le client" + + +def test_dispatch_calls_callback(): + received = [] + bus = FeedbackBusClient( + "http://localhost:5004", + on_event=lambda e, p: received.append((e, p)), + ) + bus._dispatch('lea:paused', {'workflow': 'demo', 'reason': 'incertain'}) + assert received == [('lea:paused', {'workflow': 'demo', 'reason': 'incertain'})] + + +def test_dispatch_handles_none_payload(): + received = [] + bus = FeedbackBusClient( + "http://localhost:5004", + on_event=lambda e, p: received.append((e, p)), + ) + bus._dispatch('lea:done', None) + assert received == [('lea:done', {})] + + +def test_dispatch_silenced_on_callback_error(): + """Une exception dans le callback consommateur ne doit jamais remonter.""" + def boom(event, payload): + raise RuntimeError("callback fail") + bus = FeedbackBusClient("http://localhost:5004", on_event=boom) + bus._dispatch('lea:paused', {}) # ne doit pas raise + + +def test_default_callback_is_silent(): + """Sans callback fourni, le dispatch ne casse pas.""" + bus = FeedbackBusClient("http://localhost:5004") + bus._dispatch('lea:paused', {'x': 1}) # ne doit pas raise + + +def test_token_in_authorization_header(): + bus = FeedbackBusClient("http://localhost:5004", token="abc123") + captured = {} + + def fake_connect(url, headers=None, **kwargs): + captured['headers'] = headers + raise RuntimeError("stop here") + + with patch.object(bus._sio, 'connect', side_effect=fake_connect): + bus._run() + + assert captured['headers']['Authorization'] == 'Bearer abc123' + + +def test_no_token_means_no_auth_header(): + bus = FeedbackBusClient("http://localhost:5004") + captured = {} + + def fake_connect(url, headers=None, **kwargs): + captured['headers'] = headers + raise RuntimeError("stop here") + + with patch.object(bus._sio, 'connect', side_effect=fake_connect): + bus._run() + + assert 'Authorization' not in captured['headers'] + + +def test_run_silenced_on_connect_error(): + """connect() qui raise ne doit pas faire crasher le thread.""" + bus = FeedbackBusClient("http://localhost:5004") + with patch.object(bus._sio, 'connect', side_effect=ConnectionError("boom")): + bus._run() # ne doit pas raise + + +def test_start_is_idempotent(): + """Un second start() pendant que le thread tourne ne doit pas en créer un autre.""" + import threading + bus = FeedbackBusClient("http://localhost:5004") + block = threading.Event() + with patch.object(bus, '_run', side_effect=lambda: block.wait(timeout=2)): + bus.start() + first_thread = bus._thread + bus.start() + second_thread = bus._thread + block.set() + assert first_thread is second_thread, "start() doit être idempotent quand un thread tourne" + + +def test_stop_when_not_connected_is_silent(): + bus = FeedbackBusClient("http://localhost:5004") + bus.stop() # ne doit pas raise même si jamais connecté + + +def test_stop_silenced_on_disconnect_error(): + bus = FeedbackBusClient("http://localhost:5004") + # Forcer connected=True sur l'instance et faire raise disconnect() + with patch.object(bus._sio, 'disconnect', side_effect=RuntimeError("boom")): + bus._sio.connected = True + bus.stop() # ne doit pas raise