# agent_v1/ui/notifications.py """ Gestionnaire de notifications toast natives (Windows/Linux/macOS). Utilise plyer pour les notifications système, sans dépendance PyQt5. Remplace les dialogues Qt par des toasts non-bloquants. Thread-safe avec rate limiting (1 notification / 2 secondes max). Les messages utilisateur sont formatés via `agent_v1.ui.messages` qui convertit les codes techniques (target_not_found, etc.) en français naturel. Hiérarchie des notifications (cf. messages.NiveauMessage) : - INFO : auto-dismiss en ~4s, rate-limité classique - ATTENTION : auto-dismiss en ~7s, rate-limité classique - BLOCAGE : persistant (15s+), bypass du rate limit """ import logging import threading import time from typing import Optional from .messages import ( MessageUtilisateur, NiveauMessage, formatter_cible_non_trouvee, formatter_connexion_perdue, formatter_connexion_retablie, formatter_debut_workflow, formatter_ecran_inchange, formatter_erreur_generique, formatter_etape_workflow, formatter_fenetre_incorrecte, formatter_fin_workflow, formatter_mode_apprentissage, formatter_ralentissement, formatter_retry, ) logger = logging.getLogger(__name__) # Import conditionnel de plyer — fallback silencieux si absent try: from plyer import notification as _plyer_notification _PLYER_AVAILABLE = True except ImportError: _plyer_notification = None _PLYER_AVAILABLE = False logger.warning( "plyer non installé — les notifications toast sont désactivées. " "Installer avec : pip install plyer" ) # Nom de l'application affiché dans les toasts APP_NAME = "Léa" # Intervalle minimum entre deux notifications (secondes) RATE_LIMIT_SECONDS = 2 class NotificationManager: """ Gestionnaire centralisé de notifications toast. Thread-safe : peut être appelé depuis n'importe quel thread. Rate limiting : une seule notification toutes les 2 secondes, les notifications excédentaires sont ignorées (pas de file d'attente pour éviter un flood différé). """ def __init__(self, icon_path: Optional[str] = None): """ Initialise le gestionnaire. Args: icon_path: Chemin vers l'icône (.ico/.png) pour les toasts. None = icône par défaut du système. """ self._icon_path = icon_path self._lock = threading.Lock() self._last_notification_time: float = 0.0 # ------------------------------------------------------------------ # # Méthode générique # ------------------------------------------------------------------ # def notify( self, title: str, message: str, timeout: int = 5, bypass_rate_limit: bool = False, ) -> bool: """ Affiche une notification toast. Args: title: Titre de la notification. message: Corps du message. timeout: Durée d'affichage en secondes. bypass_rate_limit: Si True, ignore le rate limit (pour les blocages importants qui ne doivent pas être écrasés). Returns: True si la notification a été envoyée, False sinon (plyer absent ou rate limit atteint). """ if not _PLYER_AVAILABLE: logger.debug("Notification ignorée (plyer absent) : %s", title) return False if not bypass_rate_limit: with self._lock: now = time.monotonic() elapsed = now - self._last_notification_time if elapsed < RATE_LIMIT_SECONDS: logger.debug( "Notification ignorée (rate limit, %.1fs restantes) : %s", RATE_LIMIT_SECONDS - elapsed, title, ) return False self._last_notification_time = now else: with self._lock: self._last_notification_time = time.monotonic() # Envoi dans un thread dédié pour ne jamais bloquer l'appelant thread = threading.Thread( target=self._send, args=(title, message, timeout), daemon=True, ) thread.start() return True def notify_message(self, msg: MessageUtilisateur) -> bool: """Envoyer un MessageUtilisateur structuré (niveau, titre, corps). Les messages BLOCAGE bypass le rate limit pour garantir que l'utilisateur voit qu'on a besoin de lui. """ bypass = msg.niveau == NiveauMessage.BLOCAGE # Log aussi pour tracer dans les logs fichiers self._log_message(msg) return self.notify( title=msg.titre, message=msg.corps, timeout=msg.duree_s, bypass_rate_limit=bypass, ) @staticmethod def _log_message(msg: MessageUtilisateur) -> None: """Logger un message utilisateur avec le niveau approprié. Les logs agents sont plus lisibles quand on route info → INFO, attention → WARNING, blocage → ERROR, avec un préfixe [LEA]. """ prefix = f"[LEA] {msg.titre}: {msg.corps}" if msg.niveau == NiveauMessage.INFO: logger.info(prefix) elif msg.niveau == NiveauMessage.ATTENTION: logger.warning(prefix) elif msg.niveau == NiveauMessage.BLOCAGE: logger.error(prefix) else: logger.info(prefix) def _send(self, title: str, message: str, timeout: int) -> None: """Envoi effectif de la notification (exécuté dans un thread dédié).""" try: # Windows limite les balloon tips à 256 caractères if len(title) > 63: title = title[:60] + "..." if len(message) > 200: message = message[:197] + "..." _plyer_notification.notify( title=title, message=message, app_name=APP_NAME, app_icon=self._icon_path, timeout=timeout, ) except Exception: logger.exception("Erreur lors de l'envoi de la notification toast") # ------------------------------------------------------------------ # # Méthodes métier # ------------------------------------------------------------------ # def greet(self) -> bool: """Notification de bienvenue au démarrage. Inclut la divulgation IA obligatoire (Article 50, Règlement IA). """ return self.notify( title=APP_NAME, message=( "Bonjour ! Léa est prête. " "Je suis une assistante basée sur l'intelligence artificielle." ), timeout=7, ) def session_started(self, workflow_name: str) -> bool: """Notification de début de session.""" return self.notify( title=APP_NAME, message="C'est parti ! Je regarde et je mémorise.", timeout=5, ) def session_ended(self, action_count: int) -> bool: """Notification de fin de session avec le nombre d'actions.""" return self.notify( title=APP_NAME, message=f"C'est noté ! J'ai bien compris les {action_count} étapes.", timeout=5, ) def workflow_learned(self, name: str) -> bool: """Notification quand une tâche a été apprise.""" return self.notify( title=APP_NAME, message=f"J'ai appris '{name}' ! Je peux la refaire quand vous voulez.", timeout=7, ) def replay_started(self, workflow_name: str, step_count: int) -> bool: """Notification de début de replay. Transparence obligatoire en mode autonome (Article 50, Règlement IA) : l'utilisateur doit savoir qu'un système d'IA agit sur son écran. """ return self.notify( title=APP_NAME, message=( f"Le système d'intelligence artificielle exécute la tâche " f"'{workflow_name}' sur votre écran." ), timeout=7, ) def replay_step(self, current: int, total: int, description: str) -> bool: """Notification de progression d'une étape de replay.""" return self.notify( title=APP_NAME, message=f"Étape {current}/{total} : {description}", timeout=3, ) def replay_target_not_found( self, target_description: str, window_title: Optional[str] = None, ) -> bool: """Notification quand un élément n'est pas trouvé pendant le replay. Le replay est mis en pause et attend une intervention humaine. Utilise `messages.formatter_cible_non_trouvee` pour un message en français naturel. """ msg = formatter_cible_non_trouvee(target_description, window_title) return self.notify_message(msg) def replay_wrong_window(self, current_title: str, expected_title: str) -> bool: """Notification quand la fenêtre active n'est pas celle attendue.""" msg = formatter_fenetre_incorrecte(current_title, expected_title) return self.notify_message(msg) def replay_no_screen_change(self, action_type: str = "") -> bool: """Notification quand une action n'a pas eu d'effet visible.""" msg = formatter_ecran_inchange(action_type) return self.notify_message(msg) def replay_learning_mode( self, raison: str = "", target_description: str = "", window_title: Optional[str] = None, ) -> bool: """Notification quand Léa passe en mode apprentissage. Léa est bloquée et demande à l'utilisateur de montrer comment faire. Message humble et actionnable pour un utilisateur non technique. """ msg = formatter_mode_apprentissage(raison, target_description, window_title) return self.notify_message(msg) def replay_retry(self, action_type: str = "", tentative: int = 2) -> bool: """Notification quand Léa retente une action.""" msg = formatter_retry(action_type, tentative) return self.notify_message(msg) def replay_slow(self) -> bool: """Notification quand Léa va plus lentement que prévu.""" msg = formatter_ralentissement() return self.notify_message(msg) def replay_finished( self, success: bool, workflow_name: str, step_count: int = 0, duration_s: float = 0.0, ) -> bool: """Notification de fin de replay (succès ou échec).""" msg = formatter_fin_workflow(success, workflow_name, step_count, duration_s) return self.notify_message(msg) def replay_workflow_started(self, workflow_name: str, step_count: int = 0) -> bool: """Notification de début de workflow (remplace `replay_started`).""" msg = formatter_debut_workflow(workflow_name, step_count) return self.notify_message(msg) def replay_step_progress( self, current: int, total: int, description: str = "", ) -> bool: """Notification de progression d'une étape (niveau INFO).""" msg = formatter_etape_workflow(current, total, description) return self.notify_message(msg) def connection_changed(self, connected: bool, server_host: str = "") -> bool: """Notification de changement d'état de la connexion serveur.""" if connected: msg = formatter_connexion_retablie() else: msg = formatter_connexion_perdue(server_host) return self.notify_message(msg) def error(self, message: str) -> bool: """Notification d'erreur générique. Essaie d'abord de détecter un motif technique connu et de formater correctement, sinon fallback sur un message générique aidant. """ msg = formatter_erreur_generique(message) return self.notify_message(msg)