diff --git a/agent_v0/agent_v1/core/executor.py b/agent_v0/agent_v1/core/executor.py index b2424a773..c50256b58 100644 --- a/agent_v0/agent_v1/core/executor.py +++ b/agent_v0/agent_v1/core/executor.py @@ -98,20 +98,43 @@ class ActionExecutorV1: @property def notifier(self): - """Instance NotificationManager paresseuse.""" + """Instance NotificationManager paresseuse. + + Retourne un objet avec des méthodes no-op si NotificationManager + n'est pas disponible (tkinter / plyer absents), pour que l'executor + ne plante jamais à cause de l'UI. + """ if self._notification_manager is None: try: from ..ui.notifications import NotificationManager self._notification_manager = NotificationManager() except Exception as e: logger.debug(f"NotificationManager indisponible : {e}") - # Retourner un objet factice qui ne fait rien + # Retourner un objet factice qui ne fait rien — couvre toutes + # les méthodes possibles via __getattr__. class _Noop: - def replay_target_not_found(self, *a, **kw): - return False + def __getattr__(self, name): + return lambda *a, **kw: False self._notification_manager = _Noop() return self._notification_manager + @property + def activity_panel(self): + """Instance ActivityPanel paresseuse (singleton). + + Fallback silencieux si le panel ne peut pas être créé. + """ + try: + from ..ui.activity_panel import get_activity_panel + return get_activity_panel() + except Exception as e: + logger.debug(f"ActivityPanel indisponible : {e}") + + class _Noop: + def __getattr__(self, name): + return lambda *a, **kw: None + return _Noop() + def _auth_headers(self) -> dict: """Headers d'authentification Bearer pour les requetes au serveur.""" if self._api_token: @@ -417,22 +440,29 @@ class ActionExecutorV1: or current_title.lower() in expected_title.lower() ) # Ignorer la fenêtre de Léa elle-même (overlay agent) - _lea_windows = ("léa", "lea —", "léa —", "lea -", "léa -", "lea assistante", "léa assistante") - is_lea_window = any(p in current_title.lower() for p in _lea_windows) + # On utilise `messages.est_fenetre_lea` centralisé pour la + # cohérence avec les autres modules (tests, activity panel). + from ..ui.messages import est_fenetre_lea + is_lea_window = est_fenetre_lea(current_title) if not title_match and not is_lea_window: logger.warning( - f"PRÉ-VÉRIF ÉCHOUÉE : attendu '{expected_title}', " - f"actuel '{current_title}' — STOP" + f"[LEA] Fenêtre incorrecte : attendu '{expected_title}', " + f"actuel '{current_title}'" ) print(f" [PRÉ-VÉRIF] STOP — fenêtre '{current_title}' ≠ attendu '{expected_title}'") + # Notification utilisateur en français naturel + try: + self.notifier.replay_wrong_window(current_title, expected_title) + except Exception: + pass result["success"] = False result["error"] = f"Fenêtre incorrecte: '{current_title}' (attendu: '{expected_title}')" return result elif is_lea_window: - logger.info(f"PRÉ-VÉRIF : fenêtre Léa détectée, ignorée — on continue") + logger.info("[LEA] Fenêtre de Léa détectée — ignorée, on continue") else: - logger.info(f"PRÉ-VÉRIF OK : '{current_title}'") + logger.info(f"[LEA] Pré-vérif OK : '{current_title}'") # ── OBSERVER : pré-analyse écran avant résolution ── # Détecte popups, dialogues, états inattendus AVANT de chercher la cible. @@ -549,7 +579,10 @@ class ActionExecutorV1: result["target_spec"] = target_spec result["screenshot"] = self._capture_screenshot_b64() result["warning"] = "visual_resolve_failed" - self.notifier.replay_target_not_found(target_desc) + self.notifier.replay_target_not_found( + target_desc, + target_spec.get("window_title", ""), + ) return result elif policy_decision.decision == Decision.SKIP: @@ -560,7 +593,10 @@ class ActionExecutorV1: elif policy_decision.decision == Decision.ABORT: result["success"] = False result["error"] = f"policy_abort:{target_desc}" - self.notifier.replay_target_not_found(target_desc) + self.notifier.replay_target_not_found( + target_desc, + target_spec.get("window_title", ""), + ) return result else: # SUPERVISE ou CONTINUE @@ -570,7 +606,10 @@ class ActionExecutorV1: result["target_spec"] = target_spec result["screenshot"] = self._capture_screenshot_b64() result["warning"] = "visual_resolve_failed" - self.notifier.replay_target_not_found(target_desc) + self.notifier.replay_target_not_found( + target_desc, + target_spec.get("window_title", ""), + ) return result real_x = int(x_pct * width) @@ -729,9 +768,14 @@ class ActionExecutorV1: f"l'action n'a pas eu d'effet visible" ) logger.warning( - f"Action {action_id} ({action_type}) : ecran inchange " - f"— action sans effet visible" + f"[LEA] Écran inchangé après {action_type} " + f"(action_id={action_id}) — pas d'effet visible" ) + # Notifier l'utilisateur en français naturel (niveau ATTENTION) + try: + self.notifier.replay_no_screen_change(action_type) + except Exception: + pass else: print(f" [OK] Changement d'ecran detecte apres {action_type}") else: diff --git a/agent_v0/agent_v1/main.py b/agent_v0/agent_v1/main.py index 09e4aae50..89e70006c 100644 --- a/agent_v0/agent_v1/main.py +++ b/agent_v0/agent_v1/main.py @@ -38,8 +38,19 @@ except (ImportError, ValueError): except ImportError: LeaServerClient = None -# Configuration du logging -logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") +# Configuration du logging — format structuré et lisible pour un TIM +# Niveau de détail : INFO par défaut, DEBUG si RPA_AGENT_DEBUG=1 +_log_level = logging.DEBUG if os.environ.get("RPA_AGENT_DEBUG") == "1" else logging.INFO +logging.basicConfig( + level=_log_level, + format="%(asctime)s %(levelname)-7s %(name)-25s %(message)s", + datefmt="%H:%M:%S", +) + +# Réduire le bruit de certaines libs +for _noisy in ("urllib3", "requests.packages.urllib3", "PIL", "mss"): + logging.getLogger(_noisy).setLevel(logging.WARNING) + logger = logging.getLogger(__name__) # Intervalle de polling replay (secondes) @@ -371,12 +382,22 @@ class AgentV1: time.sleep(5) def stop_session(self): - # Arrêter la capture et le streaming de la session d'enregistrement - if self.captor: self.captor.stop() - if self.streamer: self.streamer.stop() - logger.info(f"Session {self.session_id} terminée.") + # Sauvegarder le session_id avant de l'annuler (pour les logs) + ended_session_id = self.session_id - # Reset le session_id pour que le poll replay utilise l'ID stable + # Arrêter la capture d'abord (plus d'events entrants) + if self.captor: self.captor.stop() + + # Attendre que les events en cours de traitement dans _on_event_bridge + # aient le temps d'être envoyés au streamer (capture duale + push) + import time + time.sleep(1.5) + + # Maintenant arrêter le streamer (drain queue + finalize) + if self.streamer: self.streamer.stop() + logger.info(f"Session {ended_session_id} terminée.") + + # Reset le session_id APRÈS le stop complet du streamer self.session_id = None # Reset le backoff de l'executor pour reprendre le polling immédiatement @@ -403,6 +424,7 @@ class AgentV1: """Capture périodique pour donner du contexte au stagiaire. Déduplication : n'envoie que si l'écran a changé. Tourne tant que session_id est défini (= enregistrement actif). + Enrichi avec le titre de la fenêtre active pour contextualisation. """ while self.running and self.session_id: try: @@ -413,7 +435,17 @@ class AgentV1: if img_hash != self._last_heartbeat_hash: self._last_heartbeat_hash = img_hash self.streamer.push_image(full_path, f"heartbeat_{int(time.time())}") - self.streamer.push_event({"type": "heartbeat", "image": full_path, "timestamp": time.time(), "machine_id": self.machine_id}) + heartbeat_event = { + "type": "heartbeat", + "image": full_path, + "timestamp": time.time(), + "machine_id": self.machine_id, + } + # Ajouter le titre de la fenêtre active (léger, pas de crop) + window_title = self.vision.get_active_window_title() + if window_title: + heartbeat_event["active_window_title"] = window_title + self.streamer.push_event(heartbeat_event) except Exception as e: logger.error(f"Heartbeat error: {e}") time.sleep(5) @@ -448,20 +480,33 @@ class AgentV1: event["screenshot_context"] = full_path self.streamer.push_image(full_path, f"focus_{int(time.time())}") - # 🔴 Capture Interactive (Dual) + # Capture Interactive (Dual + Fenêtre active) if event["type"] in ["mouse_click", "key_combo"]: self.shot_counter += 1 shot_id = f"shot_{self.shot_counter:04d}" - + pos = event.get("pos", (0, 0)) capture_info = self.vision.capture_dual(pos[0], pos[1], shot_id) - + event["screenshot_id"] = shot_id event["vision_info"] = capture_info - + + # Enrichir l'event avec les métadonnées de la fenêtre active + # (titre, rect, coordonnées clic relatives, taille fenêtre) + window_capture = capture_info.get("window_capture") + if window_capture: + event["window_capture"] = { + "title": window_capture.get("window_title", ""), + "app_name": window_capture.get("app_name", ""), + "rect": window_capture.get("window_rect"), + "click_relative": window_capture.get("click_in_window"), + "window_size": window_capture.get("window_size"), + "click_inside_window": window_capture.get("click_inside_window", True), + } + self._stream_capture_info(capture_info, shot_id) - - # 🕒 POST-ACTION : Capture du résultat après 1s (pour voir le résultat du clic) + + # POST-ACTION : Capture du résultat après 1s (pour voir le résultat du clic) threading.Timer(1.0, self._capture_result, args=(shot_id,)).start() self.ui.update_stats(self.shot_counter) @@ -481,6 +526,12 @@ class AgentV1: self.streamer.push_image(capture_info["full"], f"{shot_id}_full") if "crop" in capture_info: self.streamer.push_image(capture_info["crop"], f"{shot_id}_crop") + # Streamer l'image de la fenêtre active si disponible + window_capture = capture_info.get("window_capture") + if window_capture and "window_image" in window_capture: + self.streamer.push_image( + window_capture["window_image"], f"{shot_id}_window" + ) def run(self): self.ui.run() diff --git a/agent_v0/agent_v1/ui/activity_panel.py b/agent_v0/agent_v1/ui/activity_panel.py new file mode 100644 index 000000000..a80978627 --- /dev/null +++ b/agent_v0/agent_v1/ui/activity_panel.py @@ -0,0 +1,418 @@ +# agent_v1/ui/activity_panel.py +""" +Panel d'activité temps réel de Léa. + +Affiche à l'utilisateur ce que Léa fait *maintenant* : + - État courant (Observe / Cherche / Agit / Vérifie / Bloquée) + - Action en cours (ex: "Clic sur Rechercher") + - Progression (ex: "3/15") + - Temps écoulé depuis le début du workflow + +Contraintes : + - Fallback silencieux si tkinter absent (ne crash jamais) + - Thread-safe (mises à jour depuis les threads de replay) + - Pas de dépendance à PyQt5 (seulement tkinter, déjà utilisé par chat_window) + +Utilisation : + panel = ActivityPanel() + panel.definir_workflow("Saisie patient", nb_etapes=15) + panel.mettre_a_jour(etat=EtatLea.AGIT, action="Clic sur Valider", etape=3) + panel.masquer() +""" + +from __future__ import annotations + +import logging +import threading +import time +from dataclasses import dataclass, field +from enum import Enum +from typing import Optional + +logger = logging.getLogger(__name__) + + +class EtatLea(Enum): + """États macroscopiques de Léa pendant un replay.""" + + INACTIVE = ("inactive", "Prête", "#808080") # Gris + OBSERVE = ("observe", "Observe", "#4A90E2") # Bleu + CHERCHE = ("cherche", "Cherche", "#F5A623") # Orange + AGIT = ("agit", "Agit", "#7ED321") # Vert + VERIFIE = ("verifie", "Vérifie", "#9013FE") # Violet + BLOQUEE = ("bloquee", "Bloquée", "#D0021B") # Rouge + TERMINE = ("termine", "Terminé", "#50E3C2") # Turquoise + + def __init__(self, code: str, libelle: str, couleur: str) -> None: + self.code = code + self.libelle = libelle + self.couleur = couleur + + +@dataclass +class EtatActivite: + """Instantané de l'activité courante de Léa. + + Utilisé par le panel et exposé par `ActivityPanel.snapshot()` pour les + tests (sans dépendre de tkinter). + """ + + etat: EtatLea = EtatLea.INACTIVE + action_courante: str = "" + nom_workflow: str = "" + etape: int = 0 + nb_etapes: int = 0 + debut_timestamp: float = 0.0 + dernier_message: str = "" + + def temps_ecoule_s(self) -> float: + """Temps écoulé depuis le début du workflow (secondes).""" + if self.debut_timestamp <= 0: + return 0.0 + return max(0.0, time.time() - self.debut_timestamp) + + def progression_texte(self) -> str: + """Représentation textuelle de la progression (ex: '3/15').""" + if self.nb_etapes <= 0: + return "" + return f"{self.etape}/{self.nb_etapes}" + + def temps_ecoule_texte(self) -> str: + """Représentation humaine du temps écoulé (ex: '12s', '1m24s').""" + s = int(self.temps_ecoule_s()) + if s < 60: + return f"{s}s" + return f"{s // 60}m{s % 60:02d}s" + + def to_dict(self) -> dict: + """Sérialiser pour le logging et les tests.""" + return { + "etat": self.etat.code, + "etat_libelle": self.etat.libelle, + "action_courante": self.action_courante, + "nom_workflow": self.nom_workflow, + "etape": self.etape, + "nb_etapes": self.nb_etapes, + "progression": self.progression_texte(), + "temps_ecoule_s": round(self.temps_ecoule_s(), 1), + "dernier_message": self.dernier_message, + } + + +class ActivityPanel: + """Panel d'activité de Léa. + + Thread-safe. Le panel tkinter est créé à la demande (lazy) et uniquement + si tkinter est disponible. Toutes les méthodes sont safe à appeler même + si l'UI n'est pas dispo (fallback silencieux). + """ + + def __init__(self, activer_ui: bool = True) -> None: + self._lock = threading.RLock() + self._etat = EtatActivite() + self._activer_ui = activer_ui + # UI tkinter (créée à la demande dans le thread UI) + self._tk_root = None + self._tk_labels: dict = {} + self._ui_disponible = None # Lazy : résolu au premier usage + self._listeners = [] # Callbacks pour les changements d'état + + # ------------------------------------------------------------------ + # API publique (thread-safe) + # ------------------------------------------------------------------ + + def definir_workflow(self, nom: str, nb_etapes: int = 0) -> None: + """Démarrer le suivi d'un nouveau workflow.""" + with self._lock: + self._etat = EtatActivite( + etat=EtatLea.OBSERVE, + nom_workflow=nom, + nb_etapes=nb_etapes, + debut_timestamp=time.time(), + ) + self._notifier_changement() + self._rafraichir_ui() + logger.info(f"[ACTIVITY] Workflow démarré : {nom} ({nb_etapes} étapes)") + + def mettre_a_jour( + self, + etat: Optional[EtatLea] = None, + action: Optional[str] = None, + etape: Optional[int] = None, + message: Optional[str] = None, + ) -> None: + """Mettre à jour l'état affiché. + + Tous les paramètres sont optionnels — on ne met à jour que ce qui est + fourni. Les autres champs conservent leur valeur actuelle. + """ + with self._lock: + if etat is not None: + self._etat.etat = etat + if action is not None: + self._etat.action_courante = action + if etape is not None: + self._etat.etape = etape + if message is not None: + self._etat.dernier_message = message + + self._notifier_changement() + self._rafraichir_ui() + + def terminer(self, succes: bool = True) -> None: + """Marquer le workflow comme terminé.""" + with self._lock: + self._etat.etat = EtatLea.TERMINE if succes else EtatLea.BLOQUEE + if not succes: + self._etat.dernier_message = ( + self._etat.dernier_message or "Léa a rendu la main" + ) + self._notifier_changement() + self._rafraichir_ui() + + def reinitialiser(self) -> None: + """Remettre le panel en état inactif.""" + with self._lock: + self._etat = EtatActivite() + self._notifier_changement() + self._rafraichir_ui() + + def snapshot(self) -> EtatActivite: + """Obtenir un instantané immuable de l'état courant (pour les tests).""" + with self._lock: + return EtatActivite( + etat=self._etat.etat, + action_courante=self._etat.action_courante, + nom_workflow=self._etat.nom_workflow, + etape=self._etat.etape, + nb_etapes=self._etat.nb_etapes, + debut_timestamp=self._etat.debut_timestamp, + dernier_message=self._etat.dernier_message, + ) + + def masquer(self) -> None: + """Masquer le panel UI si affiché.""" + if self._tk_root is not None: + try: + self._tk_root.withdraw() + except Exception: + pass + + def afficher(self) -> None: + """Afficher le panel UI si disponible.""" + self._creer_ui_si_besoin() + if self._tk_root is not None: + try: + self._tk_root.deiconify() + except Exception: + pass + + def on_change(self, callback) -> None: + """Enregistrer un listener appelé à chaque changement d'état.""" + with self._lock: + self._listeners.append(callback) + + # ------------------------------------------------------------------ + # Gestion UI tkinter (lazy, fallback silencieux) + # ------------------------------------------------------------------ + + def _creer_ui_si_besoin(self) -> None: + """Créer la fenêtre tkinter au premier usage (lazy).""" + if not self._activer_ui: + return + if self._tk_root is not None: + return + if self._ui_disponible is False: + return # Déjà testé et indisponible + + try: + import tkinter as tk + except Exception as e: + logger.debug(f"[ACTIVITY] tkinter indisponible : {e}") + self._ui_disponible = False + return + + try: + self._tk_root = tk.Toplevel() if _tk_root_existe() else tk.Tk() + self._tk_root.title("Léa — Activité") + self._tk_root.geometry("340x180+40+40") + self._tk_root.attributes("-topmost", True) + self._tk_root.resizable(False, False) + self._tk_root.configure(bg="#1E1E1E") + + titre = tk.Label( + self._tk_root, + text="Léa", + font=("Segoe UI", 14, "bold"), + fg="#FFFFFF", + bg="#1E1E1E", + ) + titre.pack(pady=(10, 2)) + + self._tk_labels["etat"] = tk.Label( + self._tk_root, + text="Prête", + font=("Segoe UI", 11), + fg="#808080", + bg="#1E1E1E", + ) + self._tk_labels["etat"].pack() + + self._tk_labels["action"] = tk.Label( + self._tk_root, + text="", + font=("Segoe UI", 10), + fg="#FFFFFF", + bg="#1E1E1E", + wraplength=300, + ) + self._tk_labels["action"].pack(pady=(8, 2)) + + self._tk_labels["progression"] = tk.Label( + self._tk_root, + text="", + font=("Segoe UI", 9), + fg="#B0B0B0", + bg="#1E1E1E", + ) + self._tk_labels["progression"].pack() + + self._tk_labels["temps"] = tk.Label( + self._tk_root, + text="", + font=("Segoe UI", 9), + fg="#808080", + bg="#1E1E1E", + ) + self._tk_labels["temps"].pack(pady=(4, 0)) + + self._tk_labels["message"] = tk.Label( + self._tk_root, + text="", + font=("Segoe UI", 9, "italic"), + fg="#B0B0B0", + bg="#1E1E1E", + wraplength=300, + ) + self._tk_labels["message"].pack(pady=(6, 10)) + + # Masquer par défaut : on affiche seulement pendant un workflow + self._tk_root.withdraw() + self._ui_disponible = True + except Exception as e: + logger.debug(f"[ACTIVITY] Impossible de créer l'UI : {e}") + self._ui_disponible = False + self._tk_root = None + + def _rafraichir_ui(self) -> None: + """Mettre à jour les labels tkinter (safe si l'UI n'existe pas).""" + if not self._activer_ui or self._ui_disponible is False: + return + self._creer_ui_si_besoin() + if self._tk_root is None: + return + + try: + with self._lock: + snap = self.snapshot() + + # Utiliser after(0) pour rester dans le thread UI tkinter + def _update(): + try: + self._tk_labels["etat"].config( + text=snap.etat.libelle, + fg=snap.etat.couleur, + ) + if snap.action_courante: + self._tk_labels["action"].config(text=snap.action_courante) + else: + self._tk_labels["action"].config(text="") + + prog = snap.progression_texte() + if prog and snap.nom_workflow: + self._tk_labels["progression"].config( + text=f"« {snap.nom_workflow} » — {prog}" + ) + elif snap.nom_workflow: + self._tk_labels["progression"].config( + text=f"« {snap.nom_workflow} »" + ) + else: + self._tk_labels["progression"].config(text="") + + if snap.debut_timestamp > 0: + self._tk_labels["temps"].config( + text=f"⏱ {snap.temps_ecoule_texte()}" + ) + else: + self._tk_labels["temps"].config(text="") + + self._tk_labels["message"].config(text=snap.dernier_message) + + # Afficher automatiquement si actif + if snap.etat != EtatLea.INACTIVE: + self._tk_root.deiconify() + except Exception: + pass + + try: + self._tk_root.after(0, _update) + except Exception: + # Si le root a été détruit + self._tk_root = None + self._ui_disponible = False + except Exception as e: + logger.debug(f"[ACTIVITY] Erreur rafraîchissement UI : {e}") + + def _notifier_changement(self) -> None: + """Notifier tous les listeners du changement d'état.""" + with self._lock: + listeners = list(self._listeners) + snap = self.snapshot() + + for cb in listeners: + try: + cb(snap) + except Exception as e: + logger.debug(f"[ACTIVITY] Listener erreur : {e}") + + +def _tk_root_existe() -> bool: + """Vérifier si un root tkinter existe déjà (pour créer un Toplevel).""" + try: + import tkinter as tk + + default_root = getattr(tk, "_default_root", None) + return default_root is not None + except Exception: + return False + + +# ============================================================================ +# Singleton global (optionnel) +# ============================================================================ + + +_INSTANCE_GLOBALE: Optional[ActivityPanel] = None +_LOCK_SINGLETON = threading.Lock() + + +def get_activity_panel(activer_ui: bool = True) -> ActivityPanel: + """Obtenir l'instance globale du panel d'activité (lazy).""" + global _INSTANCE_GLOBALE + with _LOCK_SINGLETON: + if _INSTANCE_GLOBALE is None: + _INSTANCE_GLOBALE = ActivityPanel(activer_ui=activer_ui) + return _INSTANCE_GLOBALE + + +def reset_activity_panel() -> None: + """Réinitialiser le singleton (utile pour les tests).""" + global _INSTANCE_GLOBALE + with _LOCK_SINGLETON: + if _INSTANCE_GLOBALE is not None: + try: + _INSTANCE_GLOBALE.masquer() + except Exception: + pass + _INSTANCE_GLOBALE = None diff --git a/agent_v0/agent_v1/ui/messages.py b/agent_v0/agent_v1/ui/messages.py new file mode 100644 index 000000000..19bbdf071 --- /dev/null +++ b/agent_v0/agent_v1/ui/messages.py @@ -0,0 +1,444 @@ +# agent_v1/ui/messages.py +""" +Formatage des messages utilisateur pour Léa. + +Convertit les codes d'erreur techniques (`target_not_found`, `no_screen_change`...) +en phrases en français naturel, orientées action, adaptées à un utilisateur non +technique (secrétaire médicale, TIM). + +Trois niveaux de sévérité sont définis : + - INFO — Léa fait son travail normalement + - ATTENTION — Quelque chose de léger (ralentissement, retry) + - BLOCAGE — Léa a besoin d'aide, elle rend la main + +Le module est 100% pur (pas d'I/O, pas d'UI) : testable sans mocks lourds. +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass +from enum import Enum +from typing import Optional + + +class NiveauMessage(Enum): + """Niveaux hiérarchiques des messages affichés à l'utilisateur.""" + + INFO = "info" # Fond vert clair, disparaît tout seul, 3-5s + ATTENTION = "attention" # Fond orange clair, disparaît tout seul, 7s + BLOCAGE = "blocage" # Fond rouge clair, reste affiché, 15s+ + + +# Durée d'affichage par défaut (secondes), par niveau +DUREE_PAR_NIVEAU: dict[NiveauMessage, int] = { + NiveauMessage.INFO: 4, + NiveauMessage.ATTENTION: 7, + NiveauMessage.BLOCAGE: 15, +} + +# Icône textuelle par niveau (compatible plyer/Windows/Linux) +ICONE_PAR_NIVEAU: dict[NiveauMessage, str] = { + NiveauMessage.INFO: "i", + NiveauMessage.ATTENTION: "!", + NiveauMessage.BLOCAGE: "?", +} + + +@dataclass +class MessageUtilisateur: + """Un message prêt à être affiché à l'utilisateur. + + Attributes: + niveau: Hiérarchie (info/attention/blocage) + titre: Titre court de la notification (≤60 caractères) + corps: Corps du message en français naturel + duree_s: Durée d'affichage recommandée (secondes) + persistent: Si True, l'utilisateur doit fermer manuellement + """ + + niveau: NiveauMessage + titre: str + corps: str + duree_s: int + persistent: bool = False + + def to_dict(self) -> dict: + """Sérialiser le message (utile pour les tests et le logging).""" + return { + "niveau": self.niveau.value, + "titre": self.titre, + "corps": self.corps, + "duree_s": self.duree_s, + "persistent": self.persistent, + } + + +# ============================================================================ +# Helpers d'extraction +# ============================================================================ + + +def _extraire_nom_application(titre_fenetre: str) -> str: + """Extraire le nom de l'application à partir d'un titre de fenêtre. + + Les titres Windows suivent généralement le format : + "Document.txt – Bloc-notes" + "Ma Page - Google Chrome" + "Sans titre — Paint" + + On retourne la partie après le dernier séparateur, ou le titre entier. + """ + if not titre_fenetre: + return "" + titre = titre_fenetre.strip() + # Chercher le dernier séparateur parmi " – ", " — ", " - " + for sep in (" – ", " — ", " - "): + if sep in titre: + return titre.rsplit(sep, 1)[-1].strip() + return titre + + +def _nettoyer_description_cible(description: str) -> str: + """Nettoyer la description technique d'une cible pour l'afficher. + + Supprime les caractères techniques (guillemets inutiles, ':'). + """ + if not description: + return "" + desc = description.strip() + # Retirer les guillemets encapsulants + desc = desc.strip("'\"`") + # Limiter la longueur + if len(desc) > 80: + desc = desc[:77] + "..." + return desc + + +# ============================================================================ +# Formattage des messages techniques → humains +# ============================================================================ + + +def formatter_cible_non_trouvee( + description_cible: str, + titre_fenetre: Optional[str] = None, +) -> MessageUtilisateur: + """Message quand Léa ne trouve pas un élément à cliquer. + + Exemple avant : + target_not_found: 'bonjour' dans *bonjour, – Bloc-notes + Exemple après : + Léa a besoin d'aide + Je ne trouve pas "bonjour" dans le Bloc-notes. Peux-tu cliquer + dessus toi-même ? Je reprends ensuite. + """ + cible = _nettoyer_description_cible(description_cible) or "l'élément" + app = _extraire_nom_application(titre_fenetre or "") + + if app: + corps = ( + f"Je ne trouve pas « {cible} » dans {app}. " + f"Peux-tu cliquer dessus toi-même ? Je reprends ensuite." + ) + else: + corps = ( + f"Je ne trouve pas « {cible} » à l'écran. " + f"Peux-tu le faire toi-même ? Je reprends ensuite." + ) + + return MessageUtilisateur( + niveau=NiveauMessage.BLOCAGE, + titre="Léa a besoin d'aide", + corps=corps, + duree_s=DUREE_PAR_NIVEAU[NiveauMessage.BLOCAGE], + persistent=True, + ) + + +def formatter_fenetre_incorrecte( + titre_actuel: str, + titre_attendu: str, +) -> MessageUtilisateur: + """Message quand la fenêtre active n'est pas celle attendue. + + Exemple avant : + Fenêtre incorrecte: 'Program Manager' (attendu: 'Lea : Explorateur de fichiers') + Exemple après : + Léa attend une fenêtre + J'attends « Explorateur de fichiers » mais c'est « Program Manager » + qui est affiché. Peux-tu ouvrir la bonne fenêtre ? + """ + app_actuelle = _extraire_nom_application(titre_actuel) or "une autre fenêtre" + app_attendue = _extraire_nom_application(titre_attendu) or titre_attendu + + corps = ( + f"J'attends « {app_attendue} » mais c'est « {app_actuelle} » " + f"qui est affiché. Peux-tu ouvrir la bonne fenêtre ?" + ) + + return MessageUtilisateur( + niveau=NiveauMessage.BLOCAGE, + titre="Léa attend une fenêtre", + corps=corps, + duree_s=DUREE_PAR_NIVEAU[NiveauMessage.BLOCAGE], + persistent=True, + ) + + +def formatter_ecran_inchange(action_type: str = "") -> MessageUtilisateur: + """Message quand l'action n'a pas eu d'effet visible. + + Exemple avant : + Ecran inchange apres l'action + Exemple après : + Léa vérifie + Mon clic n'a pas eu l'air de marcher. Je vais réessayer ou te + rendre la main si ça ne passe pas. + """ + actions_fr = { + "click": "Mon clic", + "type": "Ma saisie", + "key_combo": "Mon raccourci clavier", + "scroll": "Mon défilement", + } + quoi = actions_fr.get(action_type, "Mon action") + + corps = ( + f"{quoi} n'a pas eu l'air de marcher. Je vais réessayer, " + f"ou te rendre la main si ça ne passe pas." + ) + + return MessageUtilisateur( + niveau=NiveauMessage.ATTENTION, + titre="Léa vérifie", + corps=corps, + duree_s=DUREE_PAR_NIVEAU[NiveauMessage.ATTENTION], + ) + + +def formatter_connexion_perdue(hote_serveur: str = "") -> MessageUtilisateur: + """Message quand la connexion avec le serveur est perdue. + + Rassurant : on dit qu'on va réessayer automatiquement. + """ + corps = ( + "J'ai perdu le lien avec le serveur. Je retente automatiquement, " + "pas besoin d'intervenir." + ) + + return MessageUtilisateur( + niveau=NiveauMessage.ATTENTION, + titre="Léa est déconnectée", + corps=corps, + duree_s=DUREE_PAR_NIVEAU[NiveauMessage.ATTENTION], + ) + + +def formatter_connexion_retablie() -> MessageUtilisateur: + """Message quand la connexion serveur est rétablie.""" + return MessageUtilisateur( + niveau=NiveauMessage.INFO, + titre="Léa", + corps="C'est bon, la connexion est revenue. Je continue.", + duree_s=DUREE_PAR_NIVEAU[NiveauMessage.INFO], + ) + + +def formatter_debut_workflow(nom_workflow: str, nb_etapes: int = 0) -> MessageUtilisateur: + """Message au démarrage d'un workflow de replay.""" + if nb_etapes > 0: + corps = ( + f"Je démarre « {nom_workflow} » ({nb_etapes} étapes). " + f"Je t'indique mon avancement." + ) + else: + corps = f"Je démarre « {nom_workflow} ». Je t'indique mon avancement." + + return MessageUtilisateur( + niveau=NiveauMessage.INFO, + titre="Léa démarre", + corps=corps, + duree_s=DUREE_PAR_NIVEAU[NiveauMessage.INFO], + ) + + +def formatter_etape_workflow( + etape_actuelle: int, + nb_etapes: int, + description: str = "", +) -> MessageUtilisateur: + """Message pour la progression d'une étape.""" + if description: + desc = _nettoyer_description_cible(description) + corps = f"Étape {etape_actuelle}/{nb_etapes} — {desc}" + else: + corps = f"Étape {etape_actuelle}/{nb_etapes}" + + return MessageUtilisateur( + niveau=NiveauMessage.INFO, + titre="Léa avance", + corps=corps, + duree_s=3, + ) + + +def formatter_retry(action_type: str = "", tentative: int = 2) -> MessageUtilisateur: + """Message quand Léa retente une action.""" + corps = ( + f"Je retente (tentative {tentative}). Ça arrive parfois, " + f"l'écran était peut-être en cours de chargement." + ) + return MessageUtilisateur( + niveau=NiveauMessage.ATTENTION, + titre="Léa retente", + corps=corps, + duree_s=DUREE_PAR_NIVEAU[NiveauMessage.ATTENTION], + ) + + +def formatter_ralentissement() -> MessageUtilisateur: + """Message quand Léa prend plus de temps que prévu.""" + return MessageUtilisateur( + niveau=NiveauMessage.ATTENTION, + titre="Léa prend son temps", + corps="Je vais plus lentement que prévu. L'écran met du temps à répondre.", + duree_s=DUREE_PAR_NIVEAU[NiveauMessage.ATTENTION], + ) + + +def formatter_fin_workflow( + succes: bool, + nom_workflow: str = "", + nb_etapes: int = 0, + duree_s: float = 0.0, +) -> MessageUtilisateur: + """Message à la fin d'un workflow.""" + if succes: + if nom_workflow and nb_etapes > 0: + corps = ( + f"C'est fait ! « {nom_workflow} » est terminé " + f"({nb_etapes} étapes en {int(duree_s)}s)." + ) + else: + corps = "C'est fait ! Tout s'est bien passé." + return MessageUtilisateur( + niveau=NiveauMessage.INFO, + titre="Léa a terminé", + corps=corps, + duree_s=6, + ) + else: + corps = ( + "Je n'ai pas pu terminer. Je te rends la main, " + "tu peux continuer à partir de là où je me suis arrêtée." + ) + return MessageUtilisateur( + niveau=NiveauMessage.BLOCAGE, + titre="Léa s'arrête", + corps=corps, + duree_s=DUREE_PAR_NIVEAU[NiveauMessage.BLOCAGE], + persistent=True, + ) + + +def formatter_erreur_generique(message_technique: str) -> MessageUtilisateur: + """Formater un message d'erreur technique non catégorisé. + + On essaie de détecter les motifs connus dans le message technique pour + le router vers le bon formatter spécialisé, sinon on emballe le message. + """ + if not message_technique: + return MessageUtilisateur( + niveau=NiveauMessage.ATTENTION, + titre="Léa", + corps="J'ai rencontré un petit souci. Je continue.", + duree_s=DUREE_PAR_NIVEAU[NiveauMessage.ATTENTION], + ) + + msg_lower = message_technique.lower() + + # target_not_found[:...] + if "target_not_found" in msg_lower: + # Essayer d'extraire la description après le ':' + match = re.match(r"target_not_found[:\s]*(.*)", message_technique, re.IGNORECASE) + desc = match.group(1).strip() if match else "" + return formatter_cible_non_trouvee(desc) + + # Fenêtre incorrecte: 'X' (attendu: 'Y') + if "fenêtre incorrecte" in msg_lower or "fenetre incorrecte" in msg_lower: + # Extraire actuel et attendu + m_actuel = re.search(r"[:,]\s*['\"]([^'\"]+)['\"]", message_technique) + m_attendu = re.search(r"attendu[:\s]*['\"]([^'\"]+)['\"]", message_technique) + actuel = m_actuel.group(1) if m_actuel else "" + attendu = m_attendu.group(1) if m_attendu else "" + return formatter_fenetre_incorrecte(actuel, attendu) + + # Ecran inchangé + if "inchang" in msg_lower or "no_screen_change" in msg_lower: + return formatter_ecran_inchange() + + # Policy abort / supervise + if "policy_abort" in msg_lower or "visual_resolve_failed" in msg_lower: + return formatter_cible_non_trouvee(message_technique) + + # Fallback : message technique tronqué + msg_tronque = message_technique.strip() + if len(msg_tronque) > 120: + msg_tronque = msg_tronque[:117] + "..." + + return MessageUtilisateur( + niveau=NiveauMessage.ATTENTION, + titre="Léa", + corps=f"J'ai rencontré un souci : {msg_tronque}", + duree_s=DUREE_PAR_NIVEAU[NiveauMessage.ATTENTION], + ) + + +# ============================================================================ +# Détection fenêtre Léa (utilisé par l'executor pour ignorer sa propre UI) +# ============================================================================ + + +# Motifs qui identifient une fenêtre appartenant à Léa (l'agent lui-même). +# On utilise des regex avec \b pour éviter les faux positifs sur des noms +# contenant "lea" (ex: "cléa.txt", "leapfrog", "replay"). +_MOTIFS_FENETRE_LEA_REGEX = ( + r"\bléa\b", + r"\blea\b(?!p)", # "lea" mot entier, pas "leapfrog" + r"lea\s*[—–\-:]", # "Lea —", "Lea -", "Lea :" + r"léa\s*[—–\-:]", + r"\bassistante ia\b", + r"\bléa ia\b", + r"\blea ia\b", +) + + +def est_fenetre_lea(titre_fenetre: str) -> bool: + """Détecter si un titre de fenêtre appartient à l'agent Léa lui-même. + + Utilisé pour éviter que Léa ne se considère comme une fenêtre intrusive + dans ses propres pré-vérifications. + + Utilise des regex avec des word boundaries pour éviter les faux positifs + sur des noms de fichiers contenant "lea" (ex: "cléa.txt", "replay.log"). + """ + if not titre_fenetre: + return False + titre_lower = titre_fenetre.lower().strip() + return any(re.search(motif, titre_lower) for motif in _MOTIFS_FENETRE_LEA_REGEX) + + +# Conservé pour rétro-compatibilité avec le code qui listait MOTIFS_FENETRE_LEA +MOTIFS_FENETRE_LEA = ( + "léa", + "lea —", + "léa —", + "lea -", + "léa -", + "lea assistante", + "léa assistante", + "lea : ", + "léa : ", + "assistante ia", +) diff --git a/agent_v0/agent_v1/ui/notifications.py b/agent_v0/agent_v1/ui/notifications.py index bd8ff12b9..111e12875 100644 --- a/agent_v0/agent_v1/ui/notifications.py +++ b/agent_v0/agent_v1/ui/notifications.py @@ -5,6 +5,14 @@ Utilise plyer pour les notifications système, sans dépendance PyQt5. Remplace les dialogues Qt par des toasts non-bloquants. Thread-safe avec rate limiting (1 notification / 2 secondes max). + +Les messages utilisateur sont formatés via `agent_v1.ui.messages` qui convertit +les codes techniques (target_not_found, etc.) en français naturel. + +Hiérarchie des notifications (cf. messages.NiveauMessage) : + - INFO : auto-dismiss en ~4s, rate-limité classique + - ATTENTION : auto-dismiss en ~7s, rate-limité classique + - BLOCAGE : persistant (15s+), bypass du rate limit """ import logging @@ -12,6 +20,22 @@ import threading import time from typing import Optional +from .messages import ( + MessageUtilisateur, + NiveauMessage, + formatter_cible_non_trouvee, + formatter_connexion_perdue, + formatter_connexion_retablie, + formatter_debut_workflow, + formatter_ecran_inchange, + formatter_erreur_generique, + formatter_etape_workflow, + formatter_fenetre_incorrecte, + formatter_fin_workflow, + formatter_ralentissement, + formatter_retry, +) + logger = logging.getLogger(__name__) # Import conditionnel de plyer — fallback silencieux si absent @@ -59,7 +83,13 @@ class NotificationManager: # Méthode générique # ------------------------------------------------------------------ # - def notify(self, title: str, message: str, timeout: int = 5) -> bool: + def notify( + self, + title: str, + message: str, + timeout: int = 5, + bypass_rate_limit: bool = False, + ) -> bool: """ Affiche une notification toast. @@ -67,6 +97,8 @@ class NotificationManager: title: Titre de la notification. message: Corps du message. timeout: Durée d'affichage en secondes. + bypass_rate_limit: Si True, ignore le rate limit (pour les blocages + importants qui ne doivent pas être écrasés). Returns: True si la notification a été envoyée, False sinon @@ -76,17 +108,21 @@ class NotificationManager: logger.debug("Notification ignorée (plyer absent) : %s", title) return False - with self._lock: - now = time.monotonic() - elapsed = now - self._last_notification_time - if elapsed < RATE_LIMIT_SECONDS: - logger.debug( - "Notification ignorée (rate limit, %.1fs restantes) : %s", - RATE_LIMIT_SECONDS - elapsed, - title, - ) - return False - self._last_notification_time = now + if not bypass_rate_limit: + with self._lock: + now = time.monotonic() + elapsed = now - self._last_notification_time + if elapsed < RATE_LIMIT_SECONDS: + logger.debug( + "Notification ignorée (rate limit, %.1fs restantes) : %s", + RATE_LIMIT_SECONDS - elapsed, + title, + ) + return False + self._last_notification_time = now + else: + with self._lock: + self._last_notification_time = time.monotonic() # Envoi dans un thread dédié pour ne jamais bloquer l'appelant thread = threading.Thread( @@ -97,6 +133,39 @@ class NotificationManager: thread.start() return True + def notify_message(self, msg: MessageUtilisateur) -> bool: + """Envoyer un MessageUtilisateur structuré (niveau, titre, corps). + + Les messages BLOCAGE bypass le rate limit pour garantir que + l'utilisateur voit qu'on a besoin de lui. + """ + bypass = msg.niveau == NiveauMessage.BLOCAGE + # Log aussi pour tracer dans les logs fichiers + self._log_message(msg) + return self.notify( + title=msg.titre, + message=msg.corps, + timeout=msg.duree_s, + bypass_rate_limit=bypass, + ) + + @staticmethod + def _log_message(msg: MessageUtilisateur) -> None: + """Logger un message utilisateur avec le niveau approprié. + + Les logs agents sont plus lisibles quand on route info → INFO, + attention → WARNING, blocage → ERROR, avec un préfixe [LEA]. + """ + prefix = f"[LEA] {msg.titre}: {msg.corps}" + if msg.niveau == NiveauMessage.INFO: + logger.info(prefix) + elif msg.niveau == NiveauMessage.ATTENTION: + logger.warning(prefix) + elif msg.niveau == NiveauMessage.BLOCAGE: + logger.error(prefix) + else: + logger.info(prefix) + def _send(self, title: str, message: str, timeout: int) -> None: """Envoi effectif de la notification (exécuté dans un thread dédié).""" try: @@ -180,40 +249,79 @@ class NotificationManager: timeout=3, ) - def replay_finished(self, success: bool, workflow_name: str) -> bool: - """Notification de fin de replay (succès ou échec).""" - if success: - return self.notify( - title=APP_NAME, - message="C'est fait ! Tout s'est bien passé.", - timeout=5, - ) - else: - return self.notify( - title=APP_NAME, - message="Hmm, j'ai eu un souci. Vous pouvez me remontrer ?", - timeout=7, - ) + def replay_target_not_found( + self, + target_description: str, + window_title: Optional[str] = None, + ) -> bool: + """Notification quand un élément n'est pas trouvé pendant le replay. - def connection_changed(self, connected: bool, server_host: str) -> bool: + Le replay est mis en pause et attend une intervention humaine. + Utilise `messages.formatter_cible_non_trouvee` pour un message en + français naturel. + """ + msg = formatter_cible_non_trouvee(target_description, window_title) + return self.notify_message(msg) + + def replay_wrong_window(self, current_title: str, expected_title: str) -> bool: + """Notification quand la fenêtre active n'est pas celle attendue.""" + msg = formatter_fenetre_incorrecte(current_title, expected_title) + return self.notify_message(msg) + + def replay_no_screen_change(self, action_type: str = "") -> bool: + """Notification quand une action n'a pas eu d'effet visible.""" + msg = formatter_ecran_inchange(action_type) + return self.notify_message(msg) + + def replay_retry(self, action_type: str = "", tentative: int = 2) -> bool: + """Notification quand Léa retente une action.""" + msg = formatter_retry(action_type, tentative) + return self.notify_message(msg) + + def replay_slow(self) -> bool: + """Notification quand Léa va plus lentement que prévu.""" + msg = formatter_ralentissement() + return self.notify_message(msg) + + def replay_finished( + self, + success: bool, + workflow_name: str, + step_count: int = 0, + duration_s: float = 0.0, + ) -> bool: + """Notification de fin de replay (succès ou échec).""" + msg = formatter_fin_workflow(success, workflow_name, step_count, duration_s) + return self.notify_message(msg) + + def replay_workflow_started(self, workflow_name: str, step_count: int = 0) -> bool: + """Notification de début de workflow (remplace `replay_started`).""" + msg = formatter_debut_workflow(workflow_name, step_count) + return self.notify_message(msg) + + def replay_step_progress( + self, + current: int, + total: int, + description: str = "", + ) -> bool: + """Notification de progression d'une étape (niveau INFO).""" + msg = formatter_etape_workflow(current, total, description) + return self.notify_message(msg) + + def connection_changed(self, connected: bool, server_host: str = "") -> bool: """Notification de changement d'état de la connexion serveur.""" if connected: - return self.notify( - title=APP_NAME, - message="Connectée au serveur.", - timeout=5, - ) + msg = formatter_connexion_retablie() else: - return self.notify( - title=APP_NAME, - message="J'ai perdu la connexion avec le serveur.", - timeout=7, - ) + msg = formatter_connexion_perdue(server_host) + return self.notify_message(msg) def error(self, message: str) -> bool: - """Notification d'erreur.""" - return self.notify( - title=APP_NAME, - message=f"Oups, un problème : {message}", - timeout=10, - ) + """Notification d'erreur générique. + + Essaie d'abord de détecter un motif technique connu et de formater + correctement, sinon fallback sur un message générique aidant. + """ + msg = formatter_erreur_generique(message) + return self.notify_message(msg) diff --git a/tests/unit/test_lea_notifications.py b/tests/unit/test_lea_notifications.py new file mode 100644 index 000000000..696100699 --- /dev/null +++ b/tests/unit/test_lea_notifications.py @@ -0,0 +1,727 @@ +"""Tests unitaires pour l'UX de Léa (notifications, messages, activity panel). + +Couvre : +- Formatage des messages techniques → français naturel (module messages.py) +- Hiérarchie info/attention/blocage +- Détection de la fenêtre Léa +- NotificationManager avec plyer mocké +- ActivityPanel sans tkinter (fallback silencieux) + +Ces tests ne nécessitent ni tkinter ni plyer : tout est mocké ou géré en +fallback silencieux. Ils doivent passer sur toutes les plateformes. + +Auteur: Dom, avril 2026 +""" + +from __future__ import annotations + +import sys +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +# Assurer que la racine du projet est dans le path (comme conftest) +ROOT = Path(__file__).resolve().parents[2] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from agent_v0.agent_v1.ui import activity_panel, messages, notifications +from agent_v0.agent_v1.ui.activity_panel import ActivityPanel, EtatLea, reset_activity_panel +from agent_v0.agent_v1.ui.messages import ( + MessageUtilisateur, + NiveauMessage, + _extraire_nom_application, + _nettoyer_description_cible, + est_fenetre_lea, + formatter_cible_non_trouvee, + formatter_connexion_perdue, + formatter_connexion_retablie, + formatter_debut_workflow, + formatter_ecran_inchange, + formatter_erreur_generique, + formatter_etape_workflow, + formatter_fenetre_incorrecte, + formatter_fin_workflow, + formatter_ralentissement, + formatter_retry, +) +from agent_v0.agent_v1.ui.notifications import NotificationManager + + +# ============================================================================ +# Tests : helpers d'extraction +# ============================================================================ + + +class TestExtraction: + """Tests des helpers _extraire_nom_application et _nettoyer_description_cible.""" + + def test_extraire_app_avec_em_dash(self): + assert _extraire_nom_application("Document.txt – Bloc-notes") == "Bloc-notes" + + def test_extraire_app_avec_em_dash_long(self): + assert _extraire_nom_application("Ma Page — Google Chrome") == "Google Chrome" + + def test_extraire_app_avec_dash_simple(self): + assert _extraire_nom_application("Session 1 - Firefox") == "Firefox" + + def test_extraire_app_sans_separateur(self): + assert _extraire_nom_application("Bloc-notes") == "Bloc-notes" + + def test_extraire_app_vide(self): + assert _extraire_nom_application("") == "" + assert _extraire_nom_application(None) == "" + + def test_extraire_app_garde_dernier_separateur(self): + # Cas multi-séparateurs : on garde la dernière partie + assert _extraire_nom_application("A - B - C") == "C" + + def test_nettoyer_description_retire_guillemets(self): + assert _nettoyer_description_cible("'bonjour'") == "bonjour" + assert _nettoyer_description_cible('"bonjour"') == "bonjour" + assert _nettoyer_description_cible("`code`") == "code" + + def test_nettoyer_description_vide(self): + assert _nettoyer_description_cible("") == "" + assert _nettoyer_description_cible(None) == "" + + def test_nettoyer_description_tronque(self): + longue = "x" * 200 + resultat = _nettoyer_description_cible(longue) + assert len(resultat) <= 80 + assert resultat.endswith("...") + + +# ============================================================================ +# Tests : détection fenêtre Léa +# ============================================================================ + + +class TestFenetreLea: + """Tests de est_fenetre_lea — crucial pour la robustesse.""" + + @pytest.mark.parametrize("titre", [ + "Léa", + "Léa — Assistante IA", + "Lea - Assistante", + "Léa — Activité", + "Lea : Explorateur de fichiers", + "LÉA — ASSISTANTE IA", # casse mixte + "Léa assistante", + "Assistante IA", + ]) + def test_detecte_fenetres_lea(self, titre): + assert est_fenetre_lea(titre), f"Devrait détecter : {titre!r}" + + @pytest.mark.parametrize("titre", [ + "Bloc-notes", + "Google Chrome", + "Program Manager", + "Microsoft Word - Document1", + "Sans titre - Paint", + "", + "cléa.txt", # contient "léa" mais c'est un fichier + "replay.log", # contient "lea" + "leapfrog.exe", # contient "lea" + "nucleaire.pdf", # contient "lea" + ]) + def test_ignore_fenetres_non_lea(self, titre): + """Les faux positifs sur des noms contenant 'lea' doivent être évités + grâce aux word boundaries regex.""" + assert not est_fenetre_lea(titre), f"Ne devrait pas détecter : {titre!r}" + + def test_titre_none(self): + assert est_fenetre_lea(None) is False + + def test_espaces_en_trop(self): + assert est_fenetre_lea(" Léa — Assistante IA ") is True + + +# ============================================================================ +# Tests : formatage des messages techniques → humains +# ============================================================================ + + +class TestFormatterCibleNonTrouvee: + """Tests du formatage quand un élément n'est pas trouvé.""" + + def test_message_blocage(self): + msg = formatter_cible_non_trouvee("bonjour", "Document – Bloc-notes") + assert msg.niveau == NiveauMessage.BLOCAGE + assert msg.persistent is True + assert "besoin d'aide" in msg.titre.lower() + + def test_message_contient_nom_element(self): + msg = formatter_cible_non_trouvee("Rechercher", "Chrome") + assert "rechercher" in msg.corps.lower() + + def test_message_contient_nom_application(self): + msg = formatter_cible_non_trouvee("bonjour", "Doc – Bloc-notes") + assert "bloc-notes" in msg.corps.lower() + + def test_message_action_orientee(self): + """Le message doit proposer une action à l'utilisateur.""" + msg = formatter_cible_non_trouvee("bouton", "App") + corps_lower = msg.corps.lower() + # Doit contenir un verbe d'action type "cliquer", "faire" + assert any(verb in corps_lower for verb in ["cliqu", "faire", "peux-tu"]) + + def test_sans_fenetre(self): + msg = formatter_cible_non_trouvee("Submit", None) + assert msg.niveau == NiveauMessage.BLOCAGE + assert "submit" in msg.corps.lower() + + def test_description_vide(self): + msg = formatter_cible_non_trouvee("", "App") + # Doit quand même produire un message utilisable + assert msg.corps + assert msg.niveau == NiveauMessage.BLOCAGE + + def test_message_techniques_nettoyes(self): + """Pas de '__target_not_found__' ni code technique visible.""" + msg = formatter_cible_non_trouvee("'bonjour'", "Bloc-notes") + assert "target_not_found" not in msg.corps + # Les guillemets techniques sont nettoyés, mais on en ajoute des français + assert "bonjour" in msg.corps + + +class TestFormatterFenetreIncorrecte: + """Tests du formatage quand la mauvaise fenêtre est active.""" + + def test_message_blocage_persistent(self): + msg = formatter_fenetre_incorrecte( + "Program Manager", + "Lea : Explorateur de fichiers", + ) + assert msg.niveau == NiveauMessage.BLOCAGE + assert msg.persistent is True + + def test_mentionne_fenetre_attendue(self): + msg = formatter_fenetre_incorrecte("Program Manager", "Chrome") + assert "chrome" in msg.corps.lower() + + def test_mentionne_fenetre_actuelle(self): + msg = formatter_fenetre_incorrecte("Program Manager", "Chrome") + assert "program manager" in msg.corps.lower() + + def test_suggere_action(self): + msg = formatter_fenetre_incorrecte("A", "B") + # Propose d'ouvrir la bonne fenêtre + assert "ouvr" in msg.corps.lower() or "fenêtre" in msg.corps.lower() + + +class TestFormatterEcranInchange: + """Tests du formatage quand l'écran ne change pas après une action.""" + + def test_niveau_attention(self): + """L'écran inchangé est de niveau ATTENTION, pas BLOCAGE.""" + msg = formatter_ecran_inchange("click") + assert msg.niveau == NiveauMessage.ATTENTION + + def test_message_pour_click(self): + msg = formatter_ecran_inchange("click") + assert "clic" in msg.corps.lower() + + def test_message_pour_type(self): + msg = formatter_ecran_inchange("type") + assert "saisie" in msg.corps.lower() + + def test_message_pour_key_combo(self): + msg = formatter_ecran_inchange("key_combo") + assert "raccourci" in msg.corps.lower() + + def test_sans_type_action(self): + msg = formatter_ecran_inchange("") + assert msg.corps # Doit quand même produire quelque chose + + def test_pas_persistent(self): + msg = formatter_ecran_inchange("click") + assert msg.persistent is False + + +class TestFormatterConnexion: + """Tests des messages de connexion serveur.""" + + def test_connexion_perdue_attention(self): + msg = formatter_connexion_perdue("localhost") + assert msg.niveau == NiveauMessage.ATTENTION + + def test_connexion_perdue_rassurante(self): + """Le message doit rassurer (reconnexion automatique).""" + msg = formatter_connexion_perdue() + assert "automatique" in msg.corps.lower() or "retent" in msg.corps.lower() + + def test_connexion_retablie_info(self): + msg = formatter_connexion_retablie() + assert msg.niveau == NiveauMessage.INFO + + def test_connexion_retablie_positive(self): + msg = formatter_connexion_retablie() + assert "bon" in msg.corps.lower() or "revenue" in msg.corps.lower() + + +class TestFormatterWorkflow: + """Tests des messages de workflow (début, étape, fin).""" + + def test_debut_avec_etapes(self): + msg = formatter_debut_workflow("Saisie patient", 15) + assert msg.niveau == NiveauMessage.INFO + assert "saisie patient" in msg.corps.lower() + assert "15" in msg.corps + + def test_debut_sans_etapes(self): + msg = formatter_debut_workflow("Backup") + assert msg.niveau == NiveauMessage.INFO + assert "backup" in msg.corps.lower() + + def test_etape_progression(self): + msg = formatter_etape_workflow(3, 15, "Clic sur Valider") + assert "3" in msg.corps + assert "15" in msg.corps + assert "valider" in msg.corps.lower() + + def test_etape_sans_description(self): + msg = formatter_etape_workflow(5, 20) + assert "5" in msg.corps + assert "20" in msg.corps + + def test_fin_succes(self): + msg = formatter_fin_workflow(True, "Ma tâche", 10, 45.0) + assert msg.niveau == NiveauMessage.INFO + assert "terminé" in msg.corps.lower() or "fait" in msg.corps.lower() + + def test_fin_echec_blocage(self): + msg = formatter_fin_workflow(False, "Ma tâche") + assert msg.niveau == NiveauMessage.BLOCAGE + assert msg.persistent is True + + +class TestFormatterRetryRalentissement: + """Tests des messages de retry et ralentissement.""" + + def test_retry_attention(self): + msg = formatter_retry("click", 2) + assert msg.niveau == NiveauMessage.ATTENTION + assert "2" in msg.corps # numéro de tentative + + def test_ralentissement_attention(self): + msg = formatter_ralentissement() + assert msg.niveau == NiveauMessage.ATTENTION + assert "lent" in msg.corps.lower() + + +class TestFormatterErreurGenerique: + """Tests du router formatter_erreur_generique → spécialisé.""" + + def test_detecte_target_not_found(self): + msg = formatter_erreur_generique("target_not_found: 'bouton'") + assert msg.niveau == NiveauMessage.BLOCAGE + assert "bouton" in msg.corps.lower() + + def test_detecte_fenetre_incorrecte(self): + msg = formatter_erreur_generique( + "Fenêtre incorrecte: 'Program Manager' (attendu: 'Chrome')" + ) + assert msg.niveau == NiveauMessage.BLOCAGE + assert "chrome" in msg.corps.lower() or "program manager" in msg.corps.lower() + + def test_detecte_ecran_inchange(self): + msg = formatter_erreur_generique("Ecran inchange apres l'action") + assert msg.niveau == NiveauMessage.ATTENTION + + def test_detecte_no_screen_change(self): + msg = formatter_erreur_generique("no_screen_change after click") + assert msg.niveau == NiveauMessage.ATTENTION + + def test_detecte_policy_abort(self): + msg = formatter_erreur_generique("policy_abort:target_desc_x") + assert msg.niveau == NiveauMessage.BLOCAGE + + def test_message_vide(self): + msg = formatter_erreur_generique("") + assert msg.corps + assert msg.niveau == NiveauMessage.ATTENTION + + def test_message_inconnu_tronque(self): + long_msg = "erreur très longue " * 20 + msg = formatter_erreur_generique(long_msg) + assert len(msg.corps) <= 200 # tronqué avec "..." + + def test_pas_de_code_technique_dans_message_utilisateur(self): + """Les messages présentés à l'utilisateur ne doivent pas contenir de + noms de variables, de fonctions, ou de types Python.""" + msg = formatter_erreur_generique("target_not_found: 'bouton'") + # Le code technique ne doit pas apparaître tel quel dans le corps + assert "target_not_found" not in msg.corps + + +# ============================================================================ +# Tests : hiérarchie NiveauMessage +# ============================================================================ + + +class TestHierarchieNiveau: + """Tests de la hiérarchie info/attention/blocage.""" + + def test_niveau_info_duree_courte(self): + msg = formatter_connexion_retablie() + assert msg.niveau == NiveauMessage.INFO + assert msg.duree_s <= 6 + + def test_niveau_attention_duree_moyenne(self): + msg = formatter_ecran_inchange("click") + assert msg.niveau == NiveauMessage.ATTENTION + assert 5 <= msg.duree_s <= 10 + + def test_niveau_blocage_duree_longue_persistent(self): + msg = formatter_cible_non_trouvee("x", "y") + assert msg.niveau == NiveauMessage.BLOCAGE + assert msg.duree_s >= 10 + assert msg.persistent is True + + def test_niveau_info_non_persistent(self): + msg = formatter_debut_workflow("test") + assert msg.persistent is False + + def test_to_dict_serialisation(self): + msg = MessageUtilisateur( + niveau=NiveauMessage.INFO, + titre="Test", + corps="Corps", + duree_s=5, + ) + d = msg.to_dict() + assert d["niveau"] == "info" + assert d["titre"] == "Test" + assert d["corps"] == "Corps" + assert d["duree_s"] == 5 + assert d["persistent"] is False + + +# ============================================================================ +# Tests : NotificationManager (avec plyer mocké) +# ============================================================================ + + +class TestNotificationManager: + """Tests du NotificationManager avec plyer mocké. + + Ces tests ne dépendent pas de l'environnement : plyer est patché pour + qu'on puisse vérifier les appels sans afficher de vraies notifications. + """ + + def test_instanciation(self): + mgr = NotificationManager() + assert mgr is not None + + def test_notify_sans_plyer(self, monkeypatch): + """Si plyer n'est pas dispo, notify() retourne False sans crasher.""" + monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", False) + mgr = NotificationManager() + assert mgr.notify("titre", "message") is False + + def test_notify_avec_plyer_mocke(self, monkeypatch): + mock_plyer = MagicMock() + monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True) + monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer) + + mgr = NotificationManager() + result = mgr.notify("Titre", "Message", timeout=5) + assert result is True + # L'envoi est asynchrone, laissons le thread démarrer + time.sleep(0.1) + mock_plyer.notify.assert_called_once() + + def test_rate_limit(self, monkeypatch): + """Le rate limit bloque les notifications trop rapprochées.""" + mock_plyer = MagicMock() + monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True) + monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer) + + mgr = NotificationManager() + assert mgr.notify("T1", "M1") is True + # Immédiatement après → bloqué + assert mgr.notify("T2", "M2") is False + + def test_bypass_rate_limit_pour_blocage(self, monkeypatch): + """Les messages BLOCAGE bypass le rate limit.""" + mock_plyer = MagicMock() + monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True) + monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer) + + mgr = NotificationManager() + assert mgr.notify("T1", "M1") is True + # Sans bypass → bloqué + assert mgr.notify("T2", "M2") is False + # Avec bypass → passe + assert mgr.notify("T3", "M3", bypass_rate_limit=True) is True + + def test_notify_message_niveau_blocage_bypass(self, monkeypatch): + mock_plyer = MagicMock() + monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True) + monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer) + + mgr = NotificationManager() + # Occuper le rate limit + mgr.notify("T0", "M0") + # Message BLOCAGE doit passer même pendant le rate limit + msg_blocage = formatter_cible_non_trouvee("x", "y") + assert mgr.notify_message(msg_blocage) is True + + def test_replay_target_not_found_avec_titre(self, monkeypatch): + """L'API spécialisée produit un message contenant le nom d'app.""" + mock_plyer = MagicMock() + monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True) + monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer) + + mgr = NotificationManager() + mgr.replay_target_not_found("Rechercher", "Document – Bloc-notes") + time.sleep(0.1) + # Vérifier qu'on a bien envoyé un message qui mentionne l'app + args, kwargs = mock_plyer.notify.call_args + message_envoye = kwargs.get("message", "") + assert "bloc-notes" in message_envoye.lower() + assert "rechercher" in message_envoye.lower() + + def test_replay_wrong_window(self, monkeypatch): + mock_plyer = MagicMock() + monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True) + monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer) + + mgr = NotificationManager() + mgr.replay_wrong_window("Program Manager", "Chrome") + time.sleep(0.1) + args, kwargs = mock_plyer.notify.call_args + titre = kwargs.get("title", "") + # Le titre doit indiquer l'attente d'une fenêtre + assert "fenêtre" in titre.lower() or "attend" in titre.lower() + + def test_error_route_vers_formatter_specialise(self, monkeypatch): + """error() détecte 'target_not_found' et produit un message de blocage.""" + mock_plyer = MagicMock() + monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True) + monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer) + + mgr = NotificationManager() + mgr.error("target_not_found: 'bonjour'") + time.sleep(0.1) + mock_plyer.notify.assert_called_once() + args, kwargs = mock_plyer.notify.call_args + # Le message envoyé doit être en français naturel, pas le code brut + message_envoye = kwargs.get("message", "") + assert "target_not_found" not in message_envoye + + def test_backward_compat_connection_changed(self, monkeypatch): + """L'API existante connection_changed reste fonctionnelle.""" + mock_plyer = MagicMock() + monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True) + monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer) + + mgr = NotificationManager() + # Déconnexion + mgr.connection_changed(False, "localhost") + time.sleep(0.1) + assert mock_plyer.notify.called + + +# ============================================================================ +# Tests : ActivityPanel (sans tkinter) +# ============================================================================ + + +class TestActivityPanelFallback: + """Tests du panel d'activité en mode fallback (sans tkinter).""" + + def setup_method(self): + reset_activity_panel() + + def teardown_method(self): + reset_activity_panel() + + def test_creation_sans_ui(self): + """Le panel peut être créé sans UI (activer_ui=False).""" + panel = ActivityPanel(activer_ui=False) + assert panel is not None + + def test_snapshot_initial(self): + panel = ActivityPanel(activer_ui=False) + snap = panel.snapshot() + assert snap.etat == EtatLea.INACTIVE + assert snap.nom_workflow == "" + assert snap.etape == 0 + + def test_definir_workflow(self): + panel = ActivityPanel(activer_ui=False) + panel.definir_workflow("Test", nb_etapes=10) + snap = panel.snapshot() + assert snap.nom_workflow == "Test" + assert snap.nb_etapes == 10 + assert snap.etat == EtatLea.OBSERVE + assert snap.debut_timestamp > 0 + + def test_mettre_a_jour_etape(self): + panel = ActivityPanel(activer_ui=False) + panel.definir_workflow("Test", 10) + panel.mettre_a_jour(etat=EtatLea.AGIT, action="Clic", etape=3) + snap = panel.snapshot() + assert snap.etat == EtatLea.AGIT + assert snap.action_courante == "Clic" + assert snap.etape == 3 + + def test_mettre_a_jour_partiel(self): + panel = ActivityPanel(activer_ui=False) + panel.definir_workflow("Test", 10) + panel.mettre_a_jour(etape=5) + snap = panel.snapshot() + assert snap.etape == 5 + # L'état reste OBSERVE (non modifié) + assert snap.etat == EtatLea.OBSERVE + + def test_progression_texte(self): + panel = ActivityPanel(activer_ui=False) + panel.definir_workflow("Test", 10) + panel.mettre_a_jour(etape=3) + snap = panel.snapshot() + assert snap.progression_texte() == "3/10" + + def test_progression_texte_sans_nb_etapes(self): + panel = ActivityPanel(activer_ui=False) + panel.definir_workflow("Test", nb_etapes=0) + snap = panel.snapshot() + assert snap.progression_texte() == "" + + def test_temps_ecoule(self): + panel = ActivityPanel(activer_ui=False) + panel.definir_workflow("Test", 10) + time.sleep(0.05) + snap = panel.snapshot() + assert snap.temps_ecoule_s() >= 0.05 + + def test_temps_ecoule_texte_secondes(self): + panel = ActivityPanel(activer_ui=False) + panel.definir_workflow("Test", 10) + snap = panel.snapshot() + # Format "Xs" pour < 60s + texte = snap.temps_ecoule_texte() + assert texte.endswith("s") + + def test_terminer_succes(self): + panel = ActivityPanel(activer_ui=False) + panel.definir_workflow("Test", 10) + panel.terminer(succes=True) + snap = panel.snapshot() + assert snap.etat == EtatLea.TERMINE + + def test_terminer_echec(self): + panel = ActivityPanel(activer_ui=False) + panel.definir_workflow("Test", 10) + panel.terminer(succes=False) + snap = panel.snapshot() + assert snap.etat == EtatLea.BLOQUEE + assert snap.dernier_message # Un message par défaut est mis + + def test_reinitialiser(self): + panel = ActivityPanel(activer_ui=False) + panel.definir_workflow("Test", 10) + panel.reinitialiser() + snap = panel.snapshot() + assert snap.etat == EtatLea.INACTIVE + assert snap.nom_workflow == "" + + def test_listener_appele_sur_changement(self): + panel = ActivityPanel(activer_ui=False) + calls = [] + panel.on_change(lambda snap: calls.append(snap.etat)) + + panel.definir_workflow("Test", 5) + panel.mettre_a_jour(etat=EtatLea.AGIT) + + assert EtatLea.OBSERVE in calls + assert EtatLea.AGIT in calls + + def test_listener_erreur_nintervient_pas(self): + """Un listener qui crash ne doit pas casser le panel.""" + panel = ActivityPanel(activer_ui=False) + + def listener_casse(snap): + raise RuntimeError("boom") + + panel.on_change(listener_casse) + # Ne doit pas crasher + panel.definir_workflow("Test", 5) + snap = panel.snapshot() + assert snap.nom_workflow == "Test" + + def test_to_dict_serialisation(self): + panel = ActivityPanel(activer_ui=False) + panel.definir_workflow("Ma tâche", 10) + panel.mettre_a_jour( + etat=EtatLea.AGIT, + action="Clic sur Valider", + etape=3, + ) + d = panel.snapshot().to_dict() + assert d["nom_workflow"] == "Ma tâche" + assert d["etat"] == "agit" + assert d["etat_libelle"] == "Agit" + assert d["progression"] == "3/10" + assert d["action_courante"] == "Clic sur Valider" + + def test_masquer_sans_ui_ne_crash_pas(self): + panel = ActivityPanel(activer_ui=False) + # Doit être no-op sans crasher + panel.masquer() + panel.afficher() + + def test_etats_ont_couleurs_et_libelles(self): + """Vérifier que tous les états ont bien une couleur et un libellé.""" + for etat in EtatLea: + assert etat.libelle + assert etat.couleur.startswith("#") + assert etat.code + + def test_singleton_global(self): + p1 = activity_panel.get_activity_panel(activer_ui=False) + p2 = activity_panel.get_activity_panel(activer_ui=False) + assert p1 is p2 + + def test_reset_singleton(self): + p1 = activity_panel.get_activity_panel(activer_ui=False) + activity_panel.reset_activity_panel() + p2 = activity_panel.get_activity_panel(activer_ui=False) + assert p1 is not p2 + + +# ============================================================================ +# Tests : intégration executor ↔ notifier +# ============================================================================ + + +class TestExecutorNotifierFallback: + """Vérifier que le Noop fallback de l'executor couvre toutes les méthodes.""" + + def test_executor_noop_supporte_toutes_methodes(self): + """Le fallback _Noop doit répondre à n'importe quelle méthode.""" + # Simuler le cas où NotificationManager lève une exception + with patch( + "agent_v0.agent_v1.ui.notifications.NotificationManager", + side_effect=RuntimeError("UI indisponible"), + ): + from agent_v0.agent_v1.core.executor import ActionExecutorV1 + # Ne pas vraiment instancier (dépendances mss/pynput) — on teste + # la logique du stub en recréant la classe noop inline. + + # Test direct du pattern noop utilisé dans executor + class _Noop: + def __getattr__(self, name): + return lambda *a, **kw: False + + noop = _Noop() + # Toutes ces méthodes doivent retourner False sans crasher + assert noop.replay_target_not_found("x") is False + assert noop.replay_wrong_window("x", "y") is False + assert noop.replay_no_screen_change("click") is False + assert noop.notify_message(None) is False + assert noop.nimporte_quelle_methode() is False