Files
rpa_vision_v3/agent_v0/agent_v1/ui/notifications.py
Dom ae65be2555 chore: ajouter agent_v0/ au tracking git (était un repo embarqué)
Suppression du .git embarqué dans agent_v0/ — le code est maintenant
tracké normalement dans le repo principal.
Inclut : agent_v1 (client), server_v1 (streaming), lea_ui (chat client)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 11:12:23 +01:00

207 lines
6.8 KiB
Python

# agent_v1/ui/notifications.py
"""
Gestionnaire de notifications toast natives (Windows/Linux/macOS).
Utilise plyer pour les notifications système, sans dépendance PyQt5.
Remplace les dialogues Qt par des toasts non-bloquants.
Thread-safe avec rate limiting (1 notification / 2 secondes max).
"""
import logging
import threading
import time
from typing import Optional
logger = logging.getLogger(__name__)
# Import conditionnel de plyer — fallback silencieux si absent
try:
from plyer import notification as _plyer_notification
_PLYER_AVAILABLE = True
except ImportError:
_plyer_notification = None
_PLYER_AVAILABLE = False
logger.warning(
"plyer non installé — les notifications toast sont désactivées. "
"Installer avec : pip install plyer"
)
# Nom de l'application affiché dans les toasts
APP_NAME = "Léa"
# Intervalle minimum entre deux notifications (secondes)
RATE_LIMIT_SECONDS = 2
class NotificationManager:
"""
Gestionnaire centralisé de notifications toast.
Thread-safe : peut être appelé depuis n'importe quel thread.
Rate limiting : une seule notification toutes les 2 secondes,
les notifications excédentaires sont ignorées (pas de file d'attente
pour éviter un flood différé).
"""
def __init__(self, icon_path: Optional[str] = None):
"""
Initialise le gestionnaire.
Args:
icon_path: Chemin vers l'icône (.ico/.png) pour les toasts.
None = icône par défaut du système.
"""
self._icon_path = icon_path
self._lock = threading.Lock()
self._last_notification_time: float = 0.0
# ------------------------------------------------------------------ #
# Méthode générique
# ------------------------------------------------------------------ #
def notify(self, title: str, message: str, timeout: int = 5) -> bool:
"""
Affiche une notification toast.
Args:
title: Titre de la notification.
message: Corps du message.
timeout: Durée d'affichage en secondes.
Returns:
True si la notification a été envoyée, False sinon
(plyer absent ou rate limit atteint).
"""
if not _PLYER_AVAILABLE:
logger.debug("Notification ignorée (plyer absent) : %s", title)
return False
with self._lock:
now = time.monotonic()
elapsed = now - self._last_notification_time
if elapsed < RATE_LIMIT_SECONDS:
logger.debug(
"Notification ignorée (rate limit, %.1fs restantes) : %s",
RATE_LIMIT_SECONDS - elapsed,
title,
)
return False
self._last_notification_time = now
# Envoi dans un thread dédié pour ne jamais bloquer l'appelant
thread = threading.Thread(
target=self._send,
args=(title, message, timeout),
daemon=True,
)
thread.start()
return True
def _send(self, title: str, message: str, timeout: int) -> None:
"""Envoi effectif de la notification (exécuté dans un thread dédié)."""
try:
# Windows limite les balloon tips à 256 caractères
if len(title) > 63:
title = title[:60] + "..."
if len(message) > 200:
message = message[:197] + "..."
_plyer_notification.notify(
title=title,
message=message,
app_name=APP_NAME,
app_icon=self._icon_path,
timeout=timeout,
)
except Exception:
logger.exception("Erreur lors de l'envoi de la notification toast")
# ------------------------------------------------------------------ #
# Méthodes métier
# ------------------------------------------------------------------ #
def greet(self) -> bool:
"""Notification de bienvenue au démarrage."""
return self.notify(
title=APP_NAME,
message="Bonjour ! Léa est prête.",
timeout=5,
)
def session_started(self, workflow_name: str) -> bool:
"""Notification de début de session."""
return self.notify(
title=APP_NAME,
message="C'est parti ! Je regarde et je mémorise.",
timeout=5,
)
def session_ended(self, action_count: int) -> bool:
"""Notification de fin de session avec le nombre d'actions."""
return self.notify(
title=APP_NAME,
message=f"C'est noté ! J'ai bien compris les {action_count} étapes.",
timeout=5,
)
def workflow_learned(self, name: str) -> bool:
"""Notification quand une tâche a été apprise."""
return self.notify(
title=APP_NAME,
message=f"J'ai appris '{name}' ! Je peux la refaire quand vous voulez.",
timeout=7,
)
def replay_started(self, workflow_name: str, step_count: int) -> bool:
"""Notification de début de replay."""
return self.notify(
title=APP_NAME,
message=f"Je m'en occupe ! '{workflow_name}' en cours...",
timeout=5,
)
def replay_step(self, current: int, total: int, description: str) -> bool:
"""Notification de progression d'une étape de replay."""
return self.notify(
title=APP_NAME,
message=f"Étape {current}/{total} : {description}",
timeout=3,
)
def replay_finished(self, success: bool, workflow_name: str) -> bool:
"""Notification de fin de replay (succès ou échec)."""
if success:
return self.notify(
title=APP_NAME,
message="C'est fait ! Tout s'est bien passé.",
timeout=5,
)
else:
return self.notify(
title=APP_NAME,
message="Hmm, j'ai eu un souci. Vous pouvez me remontrer ?",
timeout=7,
)
def connection_changed(self, connected: bool, server_host: str) -> bool:
"""Notification de changement d'état de la connexion serveur."""
if connected:
return self.notify(
title=APP_NAME,
message="Connectée au serveur.",
timeout=5,
)
else:
return self.notify(
title=APP_NAME,
message="J'ai perdu la connexion avec le serveur.",
timeout=7,
)
def error(self, message: str) -> bool:
"""Notification d'erreur."""
return self.notify(
title=APP_NAME,
message=f"Oups, un problème : {message}",
timeout=10,
)