merge: disparition Lea (watchdog)

This commit is contained in:
Dom
2026-07-01 12:37:21 +02:00
4 changed files with 545 additions and 49 deletions

View File

@@ -607,29 +607,20 @@ class AgentV1:
def run(self):
self.ui.run()
def _headless_keepalive(agent):
"""Maintient le main thread vivant quand l'UI tray ne peut pas tourner.
def _install_signal_handlers(agent, watchdog) -> None:
"""Installe SIGTERM/SIGINT/SIGBREAK pour un arrêt propre du main thread.
Sans cela, ``agent.run()`` retourne immédiatement (pystray échoue quand
Léa est lancée via SSH sans session interactive Windows), le main thread
se termine, et TOUS les daemon threads — y compris ``_replay_poll_loop``
— meurent avec lui. Observé 3 fois en 24h les 24/05 :
- SSH ``Permission denied`` (1231)
- polls morts après relance distante (1620)
- polls morts ``replay_sess_506d6fa2`` (1627)
Le keepalive ne se déclenche QUE si ``agent.run()`` est sorti tout en
laissant ``agent.running=True`` (cas anormal). En mode interactif
normal, ``pystray.Icon.run()`` ne sort jamais, donc ce code est
invisible.
Met ``agent.running=False`` (les daemon threads s'arrêtent) et réveille
le watchdog (qui sort de sa boucle de surveillance). Sans session
interactive (pystray.Icon.stop indisponible), c'est le SEUL moyen
d'arrêter Léa proprement : ``kill -TERM <pid>`` ou Ctrl+C.
"""
import signal as _sig
_stop = threading.Event()
def _handler(sig, frame):
logger.info(f"[MAIN] Signal {sig} recu — arret propre")
_stop.set()
agent.running = False
watchdog.stop()
for sig_name in ("SIGTERM", "SIGINT", "SIGBREAK"):
sig_obj = getattr(_sig, sig_name, None)
@@ -640,33 +631,49 @@ def _headless_keepalive(agent):
except (ValueError, OSError):
pass
logger.info(
"[MAIN] Keepalive headless actif — main thread bloque pour maintenir "
"les daemon threads (_replay_poll_loop, heartbeat, capture_server) vivants. "
"Pour stopper Lea : kill -TERM <pid> ou Ctrl+C."
)
try:
_stop.wait()
except KeyboardInterrupt:
pass
agent.running = False
logger.info("[MAIN] Keepalive termine — agent.running=False, daemon threads vont s'arreter")
def _agent_should_live(agent) -> bool:
"""Vrai tant que Léa doit vivre : agent actif ET pas de Quitter explicite.
Un « Quitter » utilisateur (``ui._quit_requested``) doit stopper le
watchdog pour de bon ; une simple déconnexion RDP ne met JAMAIS ce flag
→ le tray revient tout seul à la reconnexion.
"""
if not getattr(agent, "running", False):
return False
ui = getattr(agent, "ui", None)
if ui is not None and getattr(ui, "_quit_requested", False):
return False
return True
def main():
agent = AgentV1()
try:
agent.run()
except Exception:
logger.exception("[MAIN] agent.run() a leve une exception")
from .ui.session_watchdog import InteractiveSessionWatchdog
if getattr(agent, "running", False):
logger.warning(
"[MAIN] agent.run() est sorti mais agent.running=True — "
"probablement pystray sans session interactive (SSH). "
"Bascule en keepalive headless."
)
_headless_keepalive(agent)
agent = AgentV1()
# Résilience RDP/Citrix : au lieu de bloquer le main thread pour toujours
# quand pystray sort (session interactive perdue), on surveille la
# session et on ré-affiche le tray + le chat à chaque reconnexion.
# agent.run() (== agent.ui.run()) est ré-entrant : les threads de fond
# ne démarrent qu'une fois, seule l'icône est recréée. Les daemon threads
# de capture/heartbeat/replay tournent contre agent.running et restent
# uniques — le watchdog n'y touche pas.
watchdog = InteractiveSessionWatchdog(
run_ui=agent.run,
is_running=lambda: _agent_should_live(agent),
)
_install_signal_handlers(agent, watchdog)
try:
watchdog.run()
except KeyboardInterrupt:
logger.info("[MAIN] Interruption clavier — arret propre")
except Exception:
logger.exception("[MAIN] Le watchdog de session a leve une exception")
finally:
agent.running = False
logger.info("[MAIN] Sortie — agent.running=False, daemon threads vont s'arreter")
if __name__ == "__main__":

View File

@@ -0,0 +1,197 @@
# agent_v1/ui/session_watchdog.py
"""Watchdog de session interactive Windows — résilience RDP/Citrix.
Problème résolu (preuve poste clinique Émilie, 01/07) :
09:46:28 [MAIN] agent.run() est sorti mais agent.running=True — probablement
pystray sans session interactive (SSH)
09:46:28 [MAIN] Keepalive headless actif — main thread bloque...
Sur les postes cliniques (tous RDP/Citrix), la session interactive
disparaît quand l'utilisateur se déconnecte / la session bascule en
verrouillage. `pystray.Icon.run()` sort alors immédiatement (plus de
bureau interactif `WinSta0\\Default` pour recevoir les entrées et afficher
l'icône). L'ancien `_headless_keepalive` bloquait le main thread *pour
toujours* : l'icône tray + la fenêtre chat DISPARAISSAIENT et ne
revenaient JAMAIS, même après reconnexion RDP. Les soignants croyaient
que Léa avait planté (la capture continuait pourtant en fond).
Solution : un watchdog qui surveille la disponibilité du bureau
interactif via `OpenInputDesktop()` (signal Win32 canonique — échoue quand
la session est déconnectée/verrouillée, réussit à la reconnexion) et
(re)lance l'UI tray dès qu'une session redevient disponible. Les threads
de fond (heartbeat, replay poll, capture_server) NE SONT JAMAIS touchés :
ils tournent contre `agent.running` et restent uniques. On ne relance
JAMAIS un second `AgentV1` — seulement la couche UI (tray + chat).
État de l'art (recherche 01/07) :
- `OpenInputDesktop()` échoue (ERROR_ACCESS_DENIED / ERROR_INVALID_...)
quand le processus n'est pas rattaché au windowstation interactif
`WinSta0` — c'est exactement le cas quand la session RDP est
déconnectée. C'est la méthode fiable recommandée (comparer les
*noms* de bureau via GetUserObjectInformation n'apporte rien de plus
ici : on a juste besoin d'un booléen « input desktop dispo ? »).
- `WTSGetActiveConsoleSessionId` renvoie une pseudo-session même sans
login → PAS fiable pour ce besoin.
- `pystray.Icon.run()` ne sort jamais en session interactive normale ;
il sort immédiatement sinon → c'est notre signal de « session perdue ».
"""
from __future__ import annotations
import logging
import platform
import threading
from typing import Callable, Optional
logger = logging.getLogger(__name__)
# Intervalle de sondage du bureau interactif (secondes).
# 3s = compromis : réactif à la reconnexion sans marteler l'API Win32.
POLL_INTERVAL_S = 3.0
def is_interactive_desktop_available() -> bool:
"""Retourne True si un bureau interactif Windows est disponible.
Utilise `OpenInputDesktop()` : succès => le windowstation interactif
(`WinSta0\\Default`) est accessible et peut afficher un tray. Échec =>
session RDP/Citrix déconnectée ou verrouillée sans bureau d'entrée.
Hors Windows (Linux/dev/tests) : renvoie toujours True (pas de notion
de bureau interactif verrouillable ici — on laisse l'UI tourner).
Toute erreur d'appel Win32 est traitée comme « indisponible » (prudent)
SAUF l'indisponibilité de l'API elle-même (pywin32 absent) → True pour
ne pas priver un poste de son tray à cause d'une dépendance manquante.
"""
if platform.system() != "Windows":
return True
try:
import win32con # type: ignore
import win32service # type: ignore
except Exception:
# pywin32 indisponible : on ne peut pas sonder → on suppose dispo
# (comportement historique : tenter l'UI plutôt que la bloquer).
logger.debug("pywin32 indisponible — sondage bureau interactif ignoré")
return True
hdesk = None
try:
# DESKTOP_SWITCHDESKTOP (0x0100) = droit minimal, aligné sur l'usage
# documenté pour tester la présence du bureau d'entrée.
hdesk = win32service.OpenInputDesktop(0, False, win32con.DESKTOP_SWITCHDESKTOP)
return hdesk is not None
except Exception:
# OpenInputDesktop lève quand aucun bureau d'entrée n'est accessible
# (session déconnectée / verrouillée). C'est le cas « indisponible ».
return False
finally:
if hdesk is not None:
try:
# PyHANDLE se ferme via .Close() (pywin32) ; fallback silencieux.
hdesk.Close()
except Exception:
pass
class InteractiveSessionWatchdog:
"""Surveille la session interactive et (re)lance l'UI tray à la reconnexion.
Ne détient AUCUN état de capture. Sa seule responsabilité : garantir
qu'il existe au plus UN tray vivant à la fois, et le ressusciter quand
une session interactive redevient disponible. Les daemon threads de
l'agent (heartbeat/replay/capture) sont indépendants et intacts.
Paramètres :
run_ui : callable bloquant qui lance le tray (typiquement
``agent.ui.run`` / ``agent.run``). Retourne quand le
tray sort (normal en fin de session interactive).
is_running : callable -> bool ; True tant que l'agent doit vivre
(typiquement ``lambda: agent.running``).
is_available : callable -> bool de détection de session (injectable
pour les tests). Défaut = is_interactive_desktop_available.
poll_interval_s : période de sondage quand la session est absente.
"""
def __init__(
self,
run_ui: Callable[[], None],
is_running: Callable[[], bool],
is_available: Optional[Callable[[], bool]] = None,
poll_interval_s: float = POLL_INTERVAL_S,
) -> None:
self._run_ui = run_ui
self._is_running = is_running
self._is_available = is_available or is_interactive_desktop_available
self._poll_interval_s = poll_interval_s
self._wake = threading.Event()
# Sérialise le lancement de l'UI : jamais deux trays en parallèle.
self._ui_lock = threading.Lock()
def stop(self) -> None:
"""Réveille le watchdog pour qu'il réévalue ``is_running`` et sorte."""
self._wake.set()
def _run_ui_once(self) -> None:
"""Lance l'UI tray une fois (bloquant) sous verrou, avec garde d'erreur.
Le verrou empêche formellement qu'un second appel démarre un tray
alors qu'un premier tourne encore (invariant « un seul tray »).
"""
with self._ui_lock:
try:
self._run_ui()
except Exception:
# Un crash du tray ne doit jamais tuer le watchdog : on log et
# on laisse la boucle décider (retry ou sortie selon is_running).
logger.exception("[WATCHDOG] Le tray UI a levé une exception")
def run(self) -> None:
"""Boucle principale (bloque le main thread à la place du keepalive).
Cycle :
1. Attendre qu'un bureau interactif soit disponible.
2. (Re)lancer le tray — bloque jusqu'à sa sortie (déconnexion RDP).
3. Recommencer tant que ``is_running`` est vrai.
Ne consomme pas de CPU en boucle serrée : sonde toutes les
``poll_interval_s`` via un Event interruptible (réveil immédiat au stop).
"""
logger.info(
"[WATCHDOG] Surveillance session interactive active "
"(re-affichage auto du tray + chat à la reconnexion RDP/Citrix)."
)
first_cycle = True
while self._is_running():
if not self._is_available():
# Session absente : sonder périodiquement sans brûler le CPU.
if first_cycle:
logger.warning(
"[WATCHDOG] Aucune session interactive — Léa reste active "
"en fond (capture/heartbeat), tray masqué. En attente de "
"reconnexion RDP/Citrix pour ré-afficher l'interface."
)
# Event.wait renvoie True si stop() a été appelé → on sort.
if self._wake.wait(timeout=self._poll_interval_s):
break
first_cycle = False
continue
# Session disponible : (re)lancer le tray.
if not first_cycle:
logger.info(
"[WATCHDOG] Session interactive détectée — ré-affichage du "
"tray et de la fenêtre chat de Léa."
)
first_cycle = False
# Bloque jusqu'à la sortie du tray (fin de session interactive).
self._run_ui_once()
# Le tray est sorti. Si l'agent doit vivre, on reboucle (le
# prochain tour re-sondera la session et re-affichera le tray).
if not self._is_running():
break
logger.info("[WATCHDOG] Arrêt de la surveillance de session interactive.")

View File

@@ -137,6 +137,15 @@ class SmartTrayV1:
self._state_lock = threading.Lock()
self._stop_event = threading.Event()
# Résilience RDP/Citrix : run() peut être rappelé plusieurs fois par le
# watchdog de session (ré-affichage du tray à la reconnexion). Les
# threads de fond (connexion, cache workflows, hotkey) et l'accueil ne
# doivent démarrer QU'UNE fois — sinon on duplique les threads.
self._bg_started = False
# Signalé quand l'utilisateur a demandé Quitter : le watchdog ne doit
# alors PAS relancer le tray.
self._quit_requested = False
# Notifications
self._notifier = NotificationManager()
@@ -709,6 +718,11 @@ class SmartTrayV1:
"""Arrete proprement l'agent et quitte."""
logger.info("Arret demande par l'utilisateur")
# Marquer l'arret volontaire : le watchdog de session ne doit PAS
# relancer le tray après un Quitter explicite (à distinguer d'une
# simple déconnexion RDP où le tray doit revenir tout seul).
self._quit_requested = True
# Arreter la session si en cours
if self.is_recording:
self.on_stop()
@@ -885,17 +899,24 @@ class SmartTrayV1:
# ------------------------------------------------------------------
def run(self) -> None:
"""Demarre le tray, les threads de fond, et entre dans la boucle principale."""
# Notification d'accueil — divulgation IA (Article 50, Reglement IA)
self._notifier.greet()
"""Demarre (ou ré-affiche) le tray et entre dans la boucle pystray.
# Enregistrer le hotkey global Ctrl+Shift+L (toggle chat)
self._start_hotkey()
Ré-entrant : le watchdog de session (session_watchdog.py) rappelle
cette méthode à chaque reconnexion RDP/Citrix pour ré-afficher le
tray + la fenêtre chat. Les initialisations one-shot (accueil,
hotkey, threads de fond connexion/cache) sont protégées par
``_bg_started`` pour ne PAS dupliquer les threads. Seule l'icône
pystray est recréée à chaque appel (l'ancienne est morte avec la
session précédente).
"""
self._start_background_once()
# Tooltip avec identifiant machine pour le multi-machine
tray_title = f"Agent V1 - {self.machine_id}"
# Menu statique — reconstruit via _update_icon() quand l'état change
# Menu statique — reconstruit via _update_icon() quand l'état change.
# Nouvelle icône à chaque (ré)affichage : l'objet pystray précédent
# est invalide une fois sa boucle sortie (session interactive perdue).
self.icon = pystray.Icon(
"AgentV1",
self._current_icon(),
@@ -903,6 +924,33 @@ class SmartTrayV1:
menu=pystray.Menu(*self._get_menu_items()),
)
# Rafraîchir les workflows au (ré)affichage — utile après reconnexion.
if self._bg_started and self.server_client is not None:
threading.Thread(target=self._fetch_workflows, daemon=True).start()
# Boucle principale pystray (bloquante). Sort quand la session
# interactive disparaît (RDP déconnecté) OU sur _on_quit → le
# watchdog décide alors de relancer ou non.
logger.info("SmartTrayV1 demarre — entree dans la boucle pystray")
self.icon.run()
def _start_background_once(self) -> None:
"""Initialisations one-shot : accueil, hotkey, threads de fond.
Idempotent : les appels suivants (ré-affichage tray) sont des no-op.
Garantit qu'on n'accumule pas de threads connexion/cache à chaque
reconnexion RDP.
"""
if self._bg_started:
return
self._bg_started = True
# Notification d'accueil — divulgation IA (Article 50, Reglement IA)
self._notifier.greet()
# Enregistrer le hotkey global Ctrl+Shift+L (toggle chat)
self._start_hotkey()
# Demarrer le thread de verification connexion
if self.server_client is not None:
conn_thread = threading.Thread(
@@ -924,7 +972,3 @@ class SmartTrayV1:
threading.Thread(
target=self._fetch_workflows, daemon=True
).start()
# Boucle principale pystray (bloquante)
logger.info("SmartTrayV1 demarre — entree dans la boucle pystray")
self.icon.run()

View File

@@ -0,0 +1,248 @@
"""Tests du watchdog de session interactive (résilience RDP/Citrix).
Vérifie que :
- Le tray est ré-affiché à la reconnexion RDP (run_ui rappelé).
- Un seul tray tourne à la fois (invariant « un seul tray »).
- Les threads de fond de l'agent (heartbeat/replay) ne sont JAMAIS
relancés par le watchdog (il ne relance QUE l'UI).
- Un Quitter explicite arrête le watchdog (pas de résurrection du tray).
- Le détecteur de session Windows tombe en marche (True) hors Windows.
Aucune vraie UI : run_ui et is_available sont des callables mockés.
"""
import sys
import threading
import time
from unittest.mock import MagicMock
# Mocker les libs GUI/Win32 avant tout import du module sous test.
sys.modules.setdefault("pynput", MagicMock())
sys.modules.setdefault("pynput.mouse", MagicMock())
sys.modules.setdefault("pynput.keyboard", MagicMock())
sys.modules.setdefault("pystray", MagicMock())
from agent_v0.agent_v1.ui.session_watchdog import ( # noqa: E402
InteractiveSessionWatchdog,
is_interactive_desktop_available,
)
# ---------------------------------------------------------------------------
# Détection de session
# ---------------------------------------------------------------------------
def test_detection_hors_windows_renvoie_true(monkeypatch):
"""Hors Windows (dev/tests Linux) : bureau toujours 'disponible'."""
monkeypatch.setattr(
"agent_v0.agent_v1.ui.session_watchdog.platform.system",
lambda: "Linux",
)
assert is_interactive_desktop_available() is True
# ---------------------------------------------------------------------------
# Boucle du watchdog
# ---------------------------------------------------------------------------
def test_relance_ui_a_la_reconnexion():
"""Session absente puis présente => le tray est (ré)affiché une fois dispo.
Scénario : la 1re sonde dit 'indisponible' (RDP déconnecté), la 2e dit
'disponible' (reconnexion) => run_ui doit être appelé exactement une fois,
puis le watchdog s'arrête.
"""
availability = iter([False, True])
run_ui_calls = []
def _run_ui():
run_ui_calls.append(time.time())
# L'agent vit jusqu'à ce que le tray ait été affiché une fois.
state = {"alive": True}
def _is_running():
# Après le premier affichage du tray, l'agent s'arrête.
return state["alive"] and len(run_ui_calls) == 0
def _is_available():
return next(availability)
wd = InteractiveSessionWatchdog(
run_ui=_run_ui,
is_running=_is_running,
is_available=_is_available,
poll_interval_s=0.01, # sonde très rapide pour le test
)
wd.run()
# Le tray a été (ré)affiché exactement une fois après la reconnexion.
assert len(run_ui_calls) == 1
def test_reaffichage_apres_chaque_deconnexion():
"""Deux cycles connexion→déconnexion => tray relancé à chaque reconnexion."""
run_ui_calls = []
# is_available toujours True ; le tray 'sort' immédiatement (déconnexion).
def _run_ui():
run_ui_calls.append(1)
def _is_running():
# Vivre pour 2 affichages de tray, puis arrêter.
return len(run_ui_calls) < 2
wd = InteractiveSessionWatchdog(
run_ui=_run_ui,
is_running=_is_running,
is_available=lambda: True,
poll_interval_s=0.01,
)
wd.run()
assert len(run_ui_calls) == 2
def test_un_seul_tray_a_la_fois():
"""L'invariant 'un seul tray' : run_ui n'est jamais réentrant en parallèle."""
concurrent = {"count": 0, "max": 0}
lock = threading.Lock()
def _run_ui():
with lock:
concurrent["count"] += 1
concurrent["max"] = max(concurrent["max"], concurrent["count"])
time.sleep(0.02) # simule un tray qui tourne un peu
with lock:
concurrent["count"] -= 1
calls = {"n": 0}
def _is_running():
calls["n"] += 1
return calls["n"] <= 3 # 3 cycles de tray
wd = InteractiveSessionWatchdog(
run_ui=_run_ui,
is_running=_is_running,
is_available=lambda: True,
poll_interval_s=0.01,
)
wd.run()
# Jamais deux trays simultanés.
assert concurrent["max"] == 1
def test_stop_reveille_le_watchdog_en_attente():
"""stop() sort immédiatement la boucle quand la session est absente."""
run_ui_calls = []
wd = InteractiveSessionWatchdog(
run_ui=lambda: run_ui_calls.append(1),
is_running=lambda: True,
is_available=lambda: False, # jamais de session => reste en attente
poll_interval_s=60, # long : seul stop() peut débloquer
)
t = threading.Thread(target=wd.run)
t.start()
time.sleep(0.05) # laisser entrer dans l'attente
wd.stop()
t.join(timeout=2)
assert not t.is_alive() # le watchdog est bien sorti
assert run_ui_calls == [] # aucun tray (jamais de session dispo)
def test_crash_du_tray_ne_tue_pas_le_watchdog():
"""Une exception dans run_ui est absorbée ; le watchdog reste maître."""
calls = {"n": 0}
def _run_ui():
calls["n"] += 1
raise RuntimeError("tray HS")
def _is_running():
return calls["n"] < 2 # tolérer 2 crashs puis sortir
wd = InteractiveSessionWatchdog(
run_ui=_run_ui,
is_running=_is_running,
is_available=lambda: True,
poll_interval_s=0.01,
)
# Ne doit PAS lever : le crash est loggé, pas propagé.
wd.run()
assert calls["n"] == 2
# ---------------------------------------------------------------------------
# Intégration avec main._agent_should_live (Quitter vs déconnexion)
# ---------------------------------------------------------------------------
def test_tray_run_reentrant_ne_relance_pas_les_threads_de_fond(monkeypatch):
"""SmartTrayV1.run() appelé 2x (reconnexion RDP) : threads de fond 1x seulement.
On vérifie que `_start_background_once` est idempotent : les threads
connexion/cache et l'accueil ne démarrent qu'au premier affichage, mais
une nouvelle icône pystray est recréée à chaque appel (ré-affichage).
"""
import threading as _threading
from agent_v0.agent_v1.ui import smart_tray as smart_tray_mod
tray = smart_tray_mod.SmartTrayV1.__new__(smart_tray_mod.SmartTrayV1)
tray._bg_started = False
tray.machine_id = "poste_test"
tray.server_client = None # pas de threads réseau => simplifie
tray.icon = None
greet_calls = {"n": 0}
hotkey_calls = {"n": 0}
icons_created = {"n": 0}
tray._notifier = MagicMock()
tray._notifier.greet.side_effect = lambda: greet_calls.__setitem__("n", greet_calls["n"] + 1)
monkeypatch.setattr(tray, "_start_hotkey", lambda: hotkey_calls.__setitem__("n", hotkey_calls["n"] + 1))
monkeypatch.setattr(tray, "_current_icon", lambda: object())
monkeypatch.setattr(tray, "_get_menu_items", lambda: [])
# Icône pystray factice : run() ne bloque pas (simule une sortie immédiate).
class _FakeIcon:
def __init__(self, *a, **k):
icons_created["n"] += 1
def run(self):
return None
monkeypatch.setattr(smart_tray_mod.pystray, "Icon", _FakeIcon)
monkeypatch.setattr(smart_tray_mod.pystray, "Menu", lambda *a, **k: None)
# Deux affichages successifs (déconnexion → reconnexion).
tray.run()
tray.run()
# Accueil + hotkey : une seule fois (one-shot).
assert greet_calls["n"] == 1
assert hotkey_calls["n"] == 1
# Mais une nouvelle icône à chaque affichage (le tray revient bien).
assert icons_created["n"] == 2
def test_agent_should_live_distingue_quit_et_deconnexion():
"""Quitter explicite arrête le watchdog ; une déconnexion RDP non."""
from types import SimpleNamespace
from agent_v0.agent_v1.main import _agent_should_live
# Agent actif, tray sans quit demandé => doit vivre (déconnexion RDP OK).
agent = SimpleNamespace(running=True, ui=SimpleNamespace(_quit_requested=False))
assert _agent_should_live(agent) is True
# Quitter explicite => ne doit plus vivre (pas de résurrection).
agent.ui._quit_requested = True
assert _agent_should_live(agent) is False
# agent.running=False => ne vit plus (arrêt global).
agent2 = SimpleNamespace(running=False, ui=SimpleNamespace(_quit_requested=False))
assert _agent_should_live(agent2) is False