# agent_v1/ui/notifications.py """ Gestionnaire de notifications toast natives (Windows/Linux/macOS). Utilise plyer pour les notifications système, sans dépendance PyQt5. Remplace les dialogues Qt par des toasts non-bloquants. Thread-safe avec rate limiting (1 notification / 2 secondes max). """ import logging import threading import time from typing import Optional logger = logging.getLogger(__name__) # Import conditionnel de plyer — fallback silencieux si absent try: from plyer import notification as _plyer_notification _PLYER_AVAILABLE = True except ImportError: _plyer_notification = None _PLYER_AVAILABLE = False logger.warning( "plyer non installé — les notifications toast sont désactivées. " "Installer avec : pip install plyer" ) # Nom de l'application affiché dans les toasts APP_NAME = "Léa" # Intervalle minimum entre deux notifications (secondes) RATE_LIMIT_SECONDS = 2 class NotificationManager: """ Gestionnaire centralisé de notifications toast. Thread-safe : peut être appelé depuis n'importe quel thread. Rate limiting : une seule notification toutes les 2 secondes, les notifications excédentaires sont ignorées (pas de file d'attente pour éviter un flood différé). """ def __init__(self, icon_path: Optional[str] = None): """ Initialise le gestionnaire. Args: icon_path: Chemin vers l'icône (.ico/.png) pour les toasts. None = icône par défaut du système. """ self._icon_path = icon_path self._lock = threading.Lock() self._last_notification_time: float = 0.0 # ------------------------------------------------------------------ # # Méthode générique # ------------------------------------------------------------------ # def notify(self, title: str, message: str, timeout: int = 5) -> bool: """ Affiche une notification toast. Args: title: Titre de la notification. message: Corps du message. timeout: Durée d'affichage en secondes. Returns: True si la notification a été envoyée, False sinon (plyer absent ou rate limit atteint). """ if not _PLYER_AVAILABLE: logger.debug("Notification ignorée (plyer absent) : %s", title) return False with self._lock: now = time.monotonic() elapsed = now - self._last_notification_time if elapsed < RATE_LIMIT_SECONDS: logger.debug( "Notification ignorée (rate limit, %.1fs restantes) : %s", RATE_LIMIT_SECONDS - elapsed, title, ) return False self._last_notification_time = now # Envoi dans un thread dédié pour ne jamais bloquer l'appelant thread = threading.Thread( target=self._send, args=(title, message, timeout), daemon=True, ) thread.start() return True def _send(self, title: str, message: str, timeout: int) -> None: """Envoi effectif de la notification (exécuté dans un thread dédié).""" try: # Windows limite les balloon tips à 256 caractères if len(title) > 63: title = title[:60] + "..." if len(message) > 200: message = message[:197] + "..." _plyer_notification.notify( title=title, message=message, app_name=APP_NAME, app_icon=self._icon_path, timeout=timeout, ) except Exception: logger.exception("Erreur lors de l'envoi de la notification toast") # ------------------------------------------------------------------ # # Méthodes métier # ------------------------------------------------------------------ # def greet(self) -> bool: """Notification de bienvenue au démarrage.""" return self.notify( title=APP_NAME, message="Bonjour ! Léa est prête.", timeout=5, ) def session_started(self, workflow_name: str) -> bool: """Notification de début de session.""" return self.notify( title=APP_NAME, message="C'est parti ! Je regarde et je mémorise.", timeout=5, ) def session_ended(self, action_count: int) -> bool: """Notification de fin de session avec le nombre d'actions.""" return self.notify( title=APP_NAME, message=f"C'est noté ! J'ai bien compris les {action_count} étapes.", timeout=5, ) def workflow_learned(self, name: str) -> bool: """Notification quand une tâche a été apprise.""" return self.notify( title=APP_NAME, message=f"J'ai appris '{name}' ! Je peux la refaire quand vous voulez.", timeout=7, ) def replay_started(self, workflow_name: str, step_count: int) -> bool: """Notification de début de replay.""" return self.notify( title=APP_NAME, message=f"Je m'en occupe ! '{workflow_name}' en cours...", timeout=5, ) def replay_step(self, current: int, total: int, description: str) -> bool: """Notification de progression d'une étape de replay.""" return self.notify( title=APP_NAME, message=f"Étape {current}/{total} : {description}", timeout=3, ) def replay_finished(self, success: bool, workflow_name: str) -> bool: """Notification de fin de replay (succès ou échec).""" if success: return self.notify( title=APP_NAME, message="C'est fait ! Tout s'est bien passé.", timeout=5, ) else: return self.notify( title=APP_NAME, message="Hmm, j'ai eu un souci. Vous pouvez me remontrer ?", timeout=7, ) def connection_changed(self, connected: bool, server_host: str) -> bool: """Notification de changement d'état de la connexion serveur.""" if connected: return self.notify( title=APP_NAME, message="Connectée au serveur.", timeout=5, ) else: return self.notify( title=APP_NAME, message="J'ai perdu la connexion avec le serveur.", timeout=7, ) def error(self, message: str) -> bool: """Notification d'erreur.""" return self.notify( title=APP_NAME, message=f"Oups, un problème : {message}", timeout=10, )