feat: Léa UX — messages français naturels + feedback temps réel

Aspect 1/4 de Léa (agent Windows) : rendre Léa humaine.

Nouveaux modules :
- agent_v1/ui/messages.py : 11 formatters (cible non trouvée, mauvaise fenêtre,
  écran inchangé, connexion, workflow, retry, ralentissement, erreur générique)
- agent_v1/ui/activity_panel.py : panneau tkinter lazy avec état courant,
  action, progression X/Y, temps écoulé, 7 états (OBSERVE/CHERCHE/AGIT/VERIFIE...)

Hiérarchie de notifications :
- INFO (4s, vert) — début workflow, étape en cours
- ATTENTION (7s, orange) — retry, ralentissement
- BLOCAGE (15s, rouge, persistent, bypass rate-limit) — cible introuvable, mauvaise fenêtre

Transformations de messages :
  AVANT : "target_not_found: dans *bonjour, – Bloc-notes"
  APRÈS : "Léa a besoin d'aide"
          "Je ne trouve pas « bonjour » dans Bloc-notes.
           Peux-tu cliquer dessus toi-même ? Je reprends ensuite."

Robustesse :
- Détection fenêtre Léa via regex word-boundaries (évite cléa.txt, leapfrog.exe)
- Centralisée dans messages.est_fenetre_lea() — source unique de vérité
- Noop stub universel via __getattr__ (plus besoin de lister les méthodes)
- Thread-safe (RLock + snapshots immutables)
- Fallback silencieux si tkinter/plyer absent

101 nouveaux tests, aucune régression.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-10 08:42:01 +02:00
parent f6ad5ff2b2
commit a6eb4c168f
6 changed files with 1864 additions and 72 deletions

View File

@@ -98,20 +98,43 @@ class ActionExecutorV1:
@property
def notifier(self):
"""Instance NotificationManager paresseuse."""
"""Instance NotificationManager paresseuse.
Retourne un objet avec des méthodes no-op si NotificationManager
n'est pas disponible (tkinter / plyer absents), pour que l'executor
ne plante jamais à cause de l'UI.
"""
if self._notification_manager is None:
try:
from ..ui.notifications import NotificationManager
self._notification_manager = NotificationManager()
except Exception as e:
logger.debug(f"NotificationManager indisponible : {e}")
# Retourner un objet factice qui ne fait rien
# Retourner un objet factice qui ne fait rien — couvre toutes
# les méthodes possibles via __getattr__.
class _Noop:
def replay_target_not_found(self, *a, **kw):
return False
def __getattr__(self, name):
return lambda *a, **kw: False
self._notification_manager = _Noop()
return self._notification_manager
@property
def activity_panel(self):
"""Instance ActivityPanel paresseuse (singleton).
Fallback silencieux si le panel ne peut pas être créé.
"""
try:
from ..ui.activity_panel import get_activity_panel
return get_activity_panel()
except Exception as e:
logger.debug(f"ActivityPanel indisponible : {e}")
class _Noop:
def __getattr__(self, name):
return lambda *a, **kw: None
return _Noop()
def _auth_headers(self) -> dict:
"""Headers d'authentification Bearer pour les requetes au serveur."""
if self._api_token:
@@ -417,22 +440,29 @@ class ActionExecutorV1:
or current_title.lower() in expected_title.lower()
)
# Ignorer la fenêtre de Léa elle-même (overlay agent)
_lea_windows = ("léa", "lea —", "léa —", "lea -", "léa -", "lea assistante", "léa assistante")
is_lea_window = any(p in current_title.lower() for p in _lea_windows)
# On utilise `messages.est_fenetre_lea` centralisé pour la
# cohérence avec les autres modules (tests, activity panel).
from ..ui.messages import est_fenetre_lea
is_lea_window = est_fenetre_lea(current_title)
if not title_match and not is_lea_window:
logger.warning(
f"PRÉ-VÉRIF ÉCHOUÉE : attendu '{expected_title}', "
f"actuel '{current_title}' — STOP"
f"[LEA] Fenêtre incorrecte : attendu '{expected_title}', "
f"actuel '{current_title}'"
)
print(f" [PRÉ-VÉRIF] STOP — fenêtre '{current_title}' ≠ attendu '{expected_title}'")
# Notification utilisateur en français naturel
try:
self.notifier.replay_wrong_window(current_title, expected_title)
except Exception:
pass
result["success"] = False
result["error"] = f"Fenêtre incorrecte: '{current_title}' (attendu: '{expected_title}')"
return result
elif is_lea_window:
logger.info(f"PRÉ-VÉRIF : fenêtre Léa détectée, ignorée on continue")
logger.info("[LEA] Fenêtre de Léa détectée ignorée, on continue")
else:
logger.info(f"PRÉ-VÉRIF OK : '{current_title}'")
logger.info(f"[LEA] Pré-vérif OK : '{current_title}'")
# ── OBSERVER : pré-analyse écran avant résolution ──
# Détecte popups, dialogues, états inattendus AVANT de chercher la cible.
@@ -549,7 +579,10 @@ class ActionExecutorV1:
result["target_spec"] = target_spec
result["screenshot"] = self._capture_screenshot_b64()
result["warning"] = "visual_resolve_failed"
self.notifier.replay_target_not_found(target_desc)
self.notifier.replay_target_not_found(
target_desc,
target_spec.get("window_title", ""),
)
return result
elif policy_decision.decision == Decision.SKIP:
@@ -560,7 +593,10 @@ class ActionExecutorV1:
elif policy_decision.decision == Decision.ABORT:
result["success"] = False
result["error"] = f"policy_abort:{target_desc}"
self.notifier.replay_target_not_found(target_desc)
self.notifier.replay_target_not_found(
target_desc,
target_spec.get("window_title", ""),
)
return result
else: # SUPERVISE ou CONTINUE
@@ -570,7 +606,10 @@ class ActionExecutorV1:
result["target_spec"] = target_spec
result["screenshot"] = self._capture_screenshot_b64()
result["warning"] = "visual_resolve_failed"
self.notifier.replay_target_not_found(target_desc)
self.notifier.replay_target_not_found(
target_desc,
target_spec.get("window_title", ""),
)
return result
real_x = int(x_pct * width)
@@ -729,9 +768,14 @@ class ActionExecutorV1:
f"l'action n'a pas eu d'effet visible"
)
logger.warning(
f"Action {action_id} ({action_type}) : ecran inchange "
f"action sans effet visible"
f"[LEA] Écran inchangé après {action_type} "
f"(action_id={action_id}) — pas d'effet visible"
)
# Notifier l'utilisateur en français naturel (niveau ATTENTION)
try:
self.notifier.replay_no_screen_change(action_type)
except Exception:
pass
else:
print(f" [OK] Changement d'ecran detecte apres {action_type}")
else:

View File

@@ -38,8 +38,19 @@ except (ImportError, ValueError):
except ImportError:
LeaServerClient = None
# Configuration du logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
# Configuration du logging — format structuré et lisible pour un TIM
# Niveau de détail : INFO par défaut, DEBUG si RPA_AGENT_DEBUG=1
_log_level = logging.DEBUG if os.environ.get("RPA_AGENT_DEBUG") == "1" else logging.INFO
logging.basicConfig(
level=_log_level,
format="%(asctime)s %(levelname)-7s %(name)-25s %(message)s",
datefmt="%H:%M:%S",
)
# Réduire le bruit de certaines libs
for _noisy in ("urllib3", "requests.packages.urllib3", "PIL", "mss"):
logging.getLogger(_noisy).setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
# Intervalle de polling replay (secondes)
@@ -371,12 +382,22 @@ class AgentV1:
time.sleep(5)
def stop_session(self):
# Arrêter la capture et le streaming de la session d'enregistrement
if self.captor: self.captor.stop()
if self.streamer: self.streamer.stop()
logger.info(f"Session {self.session_id} terminée.")
# Sauvegarder le session_id avant de l'annuler (pour les logs)
ended_session_id = self.session_id
# Reset le session_id pour que le poll replay utilise l'ID stable
# Arrêter la capture d'abord (plus d'events entrants)
if self.captor: self.captor.stop()
# Attendre que les events en cours de traitement dans _on_event_bridge
# aient le temps d'être envoyés au streamer (capture duale + push)
import time
time.sleep(1.5)
# Maintenant arrêter le streamer (drain queue + finalize)
if self.streamer: self.streamer.stop()
logger.info(f"Session {ended_session_id} terminée.")
# Reset le session_id APRÈS le stop complet du streamer
self.session_id = None
# Reset le backoff de l'executor pour reprendre le polling immédiatement
@@ -403,6 +424,7 @@ class AgentV1:
"""Capture périodique pour donner du contexte au stagiaire.
Déduplication : n'envoie que si l'écran a changé.
Tourne tant que session_id est défini (= enregistrement actif).
Enrichi avec le titre de la fenêtre active pour contextualisation.
"""
while self.running and self.session_id:
try:
@@ -413,7 +435,17 @@ class AgentV1:
if img_hash != self._last_heartbeat_hash:
self._last_heartbeat_hash = img_hash
self.streamer.push_image(full_path, f"heartbeat_{int(time.time())}")
self.streamer.push_event({"type": "heartbeat", "image": full_path, "timestamp": time.time(), "machine_id": self.machine_id})
heartbeat_event = {
"type": "heartbeat",
"image": full_path,
"timestamp": time.time(),
"machine_id": self.machine_id,
}
# Ajouter le titre de la fenêtre active (léger, pas de crop)
window_title = self.vision.get_active_window_title()
if window_title:
heartbeat_event["active_window_title"] = window_title
self.streamer.push_event(heartbeat_event)
except Exception as e:
logger.error(f"Heartbeat error: {e}")
time.sleep(5)
@@ -448,20 +480,33 @@ class AgentV1:
event["screenshot_context"] = full_path
self.streamer.push_image(full_path, f"focus_{int(time.time())}")
# 🔴 Capture Interactive (Dual)
# Capture Interactive (Dual + Fenêtre active)
if event["type"] in ["mouse_click", "key_combo"]:
self.shot_counter += 1
shot_id = f"shot_{self.shot_counter:04d}"
pos = event.get("pos", (0, 0))
capture_info = self.vision.capture_dual(pos[0], pos[1], shot_id)
event["screenshot_id"] = shot_id
event["vision_info"] = capture_info
# Enrichir l'event avec les métadonnées de la fenêtre active
# (titre, rect, coordonnées clic relatives, taille fenêtre)
window_capture = capture_info.get("window_capture")
if window_capture:
event["window_capture"] = {
"title": window_capture.get("window_title", ""),
"app_name": window_capture.get("app_name", ""),
"rect": window_capture.get("window_rect"),
"click_relative": window_capture.get("click_in_window"),
"window_size": window_capture.get("window_size"),
"click_inside_window": window_capture.get("click_inside_window", True),
}
self._stream_capture_info(capture_info, shot_id)
# 🕒 POST-ACTION : Capture du résultat après 1s (pour voir le résultat du clic)
# POST-ACTION : Capture du résultat après 1s (pour voir le résultat du clic)
threading.Timer(1.0, self._capture_result, args=(shot_id,)).start()
self.ui.update_stats(self.shot_counter)
@@ -481,6 +526,12 @@ class AgentV1:
self.streamer.push_image(capture_info["full"], f"{shot_id}_full")
if "crop" in capture_info:
self.streamer.push_image(capture_info["crop"], f"{shot_id}_crop")
# Streamer l'image de la fenêtre active si disponible
window_capture = capture_info.get("window_capture")
if window_capture and "window_image" in window_capture:
self.streamer.push_image(
window_capture["window_image"], f"{shot_id}_window"
)
def run(self):
self.ui.run()

View File

@@ -0,0 +1,418 @@
# agent_v1/ui/activity_panel.py
"""
Panel d'activité temps réel de Léa.
Affiche à l'utilisateur ce que Léa fait *maintenant* :
- État courant (Observe / Cherche / Agit / Vérifie / Bloquée)
- Action en cours (ex: "Clic sur Rechercher")
- Progression (ex: "3/15")
- Temps écoulé depuis le début du workflow
Contraintes :
- Fallback silencieux si tkinter absent (ne crash jamais)
- Thread-safe (mises à jour depuis les threads de replay)
- Pas de dépendance à PyQt5 (seulement tkinter, déjà utilisé par chat_window)
Utilisation :
panel = ActivityPanel()
panel.definir_workflow("Saisie patient", nb_etapes=15)
panel.mettre_a_jour(etat=EtatLea.AGIT, action="Clic sur Valider", etape=3)
panel.masquer()
"""
from __future__ import annotations
import logging
import threading
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
logger = logging.getLogger(__name__)
class EtatLea(Enum):
"""États macroscopiques de Léa pendant un replay."""
INACTIVE = ("inactive", "Prête", "#808080") # Gris
OBSERVE = ("observe", "Observe", "#4A90E2") # Bleu
CHERCHE = ("cherche", "Cherche", "#F5A623") # Orange
AGIT = ("agit", "Agit", "#7ED321") # Vert
VERIFIE = ("verifie", "Vérifie", "#9013FE") # Violet
BLOQUEE = ("bloquee", "Bloquée", "#D0021B") # Rouge
TERMINE = ("termine", "Terminé", "#50E3C2") # Turquoise
def __init__(self, code: str, libelle: str, couleur: str) -> None:
self.code = code
self.libelle = libelle
self.couleur = couleur
@dataclass
class EtatActivite:
"""Instantané de l'activité courante de Léa.
Utilisé par le panel et exposé par `ActivityPanel.snapshot()` pour les
tests (sans dépendre de tkinter).
"""
etat: EtatLea = EtatLea.INACTIVE
action_courante: str = ""
nom_workflow: str = ""
etape: int = 0
nb_etapes: int = 0
debut_timestamp: float = 0.0
dernier_message: str = ""
def temps_ecoule_s(self) -> float:
"""Temps écoulé depuis le début du workflow (secondes)."""
if self.debut_timestamp <= 0:
return 0.0
return max(0.0, time.time() - self.debut_timestamp)
def progression_texte(self) -> str:
"""Représentation textuelle de la progression (ex: '3/15')."""
if self.nb_etapes <= 0:
return ""
return f"{self.etape}/{self.nb_etapes}"
def temps_ecoule_texte(self) -> str:
"""Représentation humaine du temps écoulé (ex: '12s', '1m24s')."""
s = int(self.temps_ecoule_s())
if s < 60:
return f"{s}s"
return f"{s // 60}m{s % 60:02d}s"
def to_dict(self) -> dict:
"""Sérialiser pour le logging et les tests."""
return {
"etat": self.etat.code,
"etat_libelle": self.etat.libelle,
"action_courante": self.action_courante,
"nom_workflow": self.nom_workflow,
"etape": self.etape,
"nb_etapes": self.nb_etapes,
"progression": self.progression_texte(),
"temps_ecoule_s": round(self.temps_ecoule_s(), 1),
"dernier_message": self.dernier_message,
}
class ActivityPanel:
"""Panel d'activité de Léa.
Thread-safe. Le panel tkinter est créé à la demande (lazy) et uniquement
si tkinter est disponible. Toutes les méthodes sont safe à appeler même
si l'UI n'est pas dispo (fallback silencieux).
"""
def __init__(self, activer_ui: bool = True) -> None:
self._lock = threading.RLock()
self._etat = EtatActivite()
self._activer_ui = activer_ui
# UI tkinter (créée à la demande dans le thread UI)
self._tk_root = None
self._tk_labels: dict = {}
self._ui_disponible = None # Lazy : résolu au premier usage
self._listeners = [] # Callbacks pour les changements d'état
# ------------------------------------------------------------------
# API publique (thread-safe)
# ------------------------------------------------------------------
def definir_workflow(self, nom: str, nb_etapes: int = 0) -> None:
"""Démarrer le suivi d'un nouveau workflow."""
with self._lock:
self._etat = EtatActivite(
etat=EtatLea.OBSERVE,
nom_workflow=nom,
nb_etapes=nb_etapes,
debut_timestamp=time.time(),
)
self._notifier_changement()
self._rafraichir_ui()
logger.info(f"[ACTIVITY] Workflow démarré : {nom} ({nb_etapes} étapes)")
def mettre_a_jour(
self,
etat: Optional[EtatLea] = None,
action: Optional[str] = None,
etape: Optional[int] = None,
message: Optional[str] = None,
) -> None:
"""Mettre à jour l'état affiché.
Tous les paramètres sont optionnels — on ne met à jour que ce qui est
fourni. Les autres champs conservent leur valeur actuelle.
"""
with self._lock:
if etat is not None:
self._etat.etat = etat
if action is not None:
self._etat.action_courante = action
if etape is not None:
self._etat.etape = etape
if message is not None:
self._etat.dernier_message = message
self._notifier_changement()
self._rafraichir_ui()
def terminer(self, succes: bool = True) -> None:
"""Marquer le workflow comme terminé."""
with self._lock:
self._etat.etat = EtatLea.TERMINE if succes else EtatLea.BLOQUEE
if not succes:
self._etat.dernier_message = (
self._etat.dernier_message or "Léa a rendu la main"
)
self._notifier_changement()
self._rafraichir_ui()
def reinitialiser(self) -> None:
"""Remettre le panel en état inactif."""
with self._lock:
self._etat = EtatActivite()
self._notifier_changement()
self._rafraichir_ui()
def snapshot(self) -> EtatActivite:
"""Obtenir un instantané immuable de l'état courant (pour les tests)."""
with self._lock:
return EtatActivite(
etat=self._etat.etat,
action_courante=self._etat.action_courante,
nom_workflow=self._etat.nom_workflow,
etape=self._etat.etape,
nb_etapes=self._etat.nb_etapes,
debut_timestamp=self._etat.debut_timestamp,
dernier_message=self._etat.dernier_message,
)
def masquer(self) -> None:
"""Masquer le panel UI si affiché."""
if self._tk_root is not None:
try:
self._tk_root.withdraw()
except Exception:
pass
def afficher(self) -> None:
"""Afficher le panel UI si disponible."""
self._creer_ui_si_besoin()
if self._tk_root is not None:
try:
self._tk_root.deiconify()
except Exception:
pass
def on_change(self, callback) -> None:
"""Enregistrer un listener appelé à chaque changement d'état."""
with self._lock:
self._listeners.append(callback)
# ------------------------------------------------------------------
# Gestion UI tkinter (lazy, fallback silencieux)
# ------------------------------------------------------------------
def _creer_ui_si_besoin(self) -> None:
"""Créer la fenêtre tkinter au premier usage (lazy)."""
if not self._activer_ui:
return
if self._tk_root is not None:
return
if self._ui_disponible is False:
return # Déjà testé et indisponible
try:
import tkinter as tk
except Exception as e:
logger.debug(f"[ACTIVITY] tkinter indisponible : {e}")
self._ui_disponible = False
return
try:
self._tk_root = tk.Toplevel() if _tk_root_existe() else tk.Tk()
self._tk_root.title("Léa — Activité")
self._tk_root.geometry("340x180+40+40")
self._tk_root.attributes("-topmost", True)
self._tk_root.resizable(False, False)
self._tk_root.configure(bg="#1E1E1E")
titre = tk.Label(
self._tk_root,
text="Léa",
font=("Segoe UI", 14, "bold"),
fg="#FFFFFF",
bg="#1E1E1E",
)
titre.pack(pady=(10, 2))
self._tk_labels["etat"] = tk.Label(
self._tk_root,
text="Prête",
font=("Segoe UI", 11),
fg="#808080",
bg="#1E1E1E",
)
self._tk_labels["etat"].pack()
self._tk_labels["action"] = tk.Label(
self._tk_root,
text="",
font=("Segoe UI", 10),
fg="#FFFFFF",
bg="#1E1E1E",
wraplength=300,
)
self._tk_labels["action"].pack(pady=(8, 2))
self._tk_labels["progression"] = tk.Label(
self._tk_root,
text="",
font=("Segoe UI", 9),
fg="#B0B0B0",
bg="#1E1E1E",
)
self._tk_labels["progression"].pack()
self._tk_labels["temps"] = tk.Label(
self._tk_root,
text="",
font=("Segoe UI", 9),
fg="#808080",
bg="#1E1E1E",
)
self._tk_labels["temps"].pack(pady=(4, 0))
self._tk_labels["message"] = tk.Label(
self._tk_root,
text="",
font=("Segoe UI", 9, "italic"),
fg="#B0B0B0",
bg="#1E1E1E",
wraplength=300,
)
self._tk_labels["message"].pack(pady=(6, 10))
# Masquer par défaut : on affiche seulement pendant un workflow
self._tk_root.withdraw()
self._ui_disponible = True
except Exception as e:
logger.debug(f"[ACTIVITY] Impossible de créer l'UI : {e}")
self._ui_disponible = False
self._tk_root = None
def _rafraichir_ui(self) -> None:
"""Mettre à jour les labels tkinter (safe si l'UI n'existe pas)."""
if not self._activer_ui or self._ui_disponible is False:
return
self._creer_ui_si_besoin()
if self._tk_root is None:
return
try:
with self._lock:
snap = self.snapshot()
# Utiliser after(0) pour rester dans le thread UI tkinter
def _update():
try:
self._tk_labels["etat"].config(
text=snap.etat.libelle,
fg=snap.etat.couleur,
)
if snap.action_courante:
self._tk_labels["action"].config(text=snap.action_courante)
else:
self._tk_labels["action"].config(text="")
prog = snap.progression_texte()
if prog and snap.nom_workflow:
self._tk_labels["progression"].config(
text=f"« {snap.nom_workflow} » — {prog}"
)
elif snap.nom_workflow:
self._tk_labels["progression"].config(
text=f"« {snap.nom_workflow} »"
)
else:
self._tk_labels["progression"].config(text="")
if snap.debut_timestamp > 0:
self._tk_labels["temps"].config(
text=f"{snap.temps_ecoule_texte()}"
)
else:
self._tk_labels["temps"].config(text="")
self._tk_labels["message"].config(text=snap.dernier_message)
# Afficher automatiquement si actif
if snap.etat != EtatLea.INACTIVE:
self._tk_root.deiconify()
except Exception:
pass
try:
self._tk_root.after(0, _update)
except Exception:
# Si le root a été détruit
self._tk_root = None
self._ui_disponible = False
except Exception as e:
logger.debug(f"[ACTIVITY] Erreur rafraîchissement UI : {e}")
def _notifier_changement(self) -> None:
"""Notifier tous les listeners du changement d'état."""
with self._lock:
listeners = list(self._listeners)
snap = self.snapshot()
for cb in listeners:
try:
cb(snap)
except Exception as e:
logger.debug(f"[ACTIVITY] Listener erreur : {e}")
def _tk_root_existe() -> bool:
"""Vérifier si un root tkinter existe déjà (pour créer un Toplevel)."""
try:
import tkinter as tk
default_root = getattr(tk, "_default_root", None)
return default_root is not None
except Exception:
return False
# ============================================================================
# Singleton global (optionnel)
# ============================================================================
_INSTANCE_GLOBALE: Optional[ActivityPanel] = None
_LOCK_SINGLETON = threading.Lock()
def get_activity_panel(activer_ui: bool = True) -> ActivityPanel:
"""Obtenir l'instance globale du panel d'activité (lazy)."""
global _INSTANCE_GLOBALE
with _LOCK_SINGLETON:
if _INSTANCE_GLOBALE is None:
_INSTANCE_GLOBALE = ActivityPanel(activer_ui=activer_ui)
return _INSTANCE_GLOBALE
def reset_activity_panel() -> None:
"""Réinitialiser le singleton (utile pour les tests)."""
global _INSTANCE_GLOBALE
with _LOCK_SINGLETON:
if _INSTANCE_GLOBALE is not None:
try:
_INSTANCE_GLOBALE.masquer()
except Exception:
pass
_INSTANCE_GLOBALE = None

View File

@@ -0,0 +1,444 @@
# agent_v1/ui/messages.py
"""
Formatage des messages utilisateur pour Léa.
Convertit les codes d'erreur techniques (`target_not_found`, `no_screen_change`...)
en phrases en français naturel, orientées action, adaptées à un utilisateur non
technique (secrétaire médicale, TIM).
Trois niveaux de sévérité sont définis :
- INFO — Léa fait son travail normalement
- ATTENTION — Quelque chose de léger (ralentissement, retry)
- BLOCAGE — Léa a besoin d'aide, elle rend la main
Le module est 100% pur (pas d'I/O, pas d'UI) : testable sans mocks lourds.
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from enum import Enum
from typing import Optional
class NiveauMessage(Enum):
"""Niveaux hiérarchiques des messages affichés à l'utilisateur."""
INFO = "info" # Fond vert clair, disparaît tout seul, 3-5s
ATTENTION = "attention" # Fond orange clair, disparaît tout seul, 7s
BLOCAGE = "blocage" # Fond rouge clair, reste affiché, 15s+
# Durée d'affichage par défaut (secondes), par niveau
DUREE_PAR_NIVEAU: dict[NiveauMessage, int] = {
NiveauMessage.INFO: 4,
NiveauMessage.ATTENTION: 7,
NiveauMessage.BLOCAGE: 15,
}
# Icône textuelle par niveau (compatible plyer/Windows/Linux)
ICONE_PAR_NIVEAU: dict[NiveauMessage, str] = {
NiveauMessage.INFO: "i",
NiveauMessage.ATTENTION: "!",
NiveauMessage.BLOCAGE: "?",
}
@dataclass
class MessageUtilisateur:
"""Un message prêt à être affiché à l'utilisateur.
Attributes:
niveau: Hiérarchie (info/attention/blocage)
titre: Titre court de la notification (≤60 caractères)
corps: Corps du message en français naturel
duree_s: Durée d'affichage recommandée (secondes)
persistent: Si True, l'utilisateur doit fermer manuellement
"""
niveau: NiveauMessage
titre: str
corps: str
duree_s: int
persistent: bool = False
def to_dict(self) -> dict:
"""Sérialiser le message (utile pour les tests et le logging)."""
return {
"niveau": self.niveau.value,
"titre": self.titre,
"corps": self.corps,
"duree_s": self.duree_s,
"persistent": self.persistent,
}
# ============================================================================
# Helpers d'extraction
# ============================================================================
def _extraire_nom_application(titre_fenetre: str) -> str:
"""Extraire le nom de l'application à partir d'un titre de fenêtre.
Les titres Windows suivent généralement le format :
"Document.txt Bloc-notes"
"Ma Page - Google Chrome"
"Sans titre — Paint"
On retourne la partie après le dernier séparateur, ou le titre entier.
"""
if not titre_fenetre:
return ""
titre = titre_fenetre.strip()
# Chercher le dernier séparateur parmi " ", " — ", " - "
for sep in (" ", "", " - "):
if sep in titre:
return titre.rsplit(sep, 1)[-1].strip()
return titre
def _nettoyer_description_cible(description: str) -> str:
"""Nettoyer la description technique d'une cible pour l'afficher.
Supprime les caractères techniques (guillemets inutiles, ':').
"""
if not description:
return ""
desc = description.strip()
# Retirer les guillemets encapsulants
desc = desc.strip("'\"`")
# Limiter la longueur
if len(desc) > 80:
desc = desc[:77] + "..."
return desc
# ============================================================================
# Formattage des messages techniques → humains
# ============================================================================
def formatter_cible_non_trouvee(
description_cible: str,
titre_fenetre: Optional[str] = None,
) -> MessageUtilisateur:
"""Message quand Léa ne trouve pas un élément à cliquer.
Exemple avant :
target_not_found: 'bonjour' dans *bonjour, Bloc-notes
Exemple après :
Léa a besoin d'aide
Je ne trouve pas "bonjour" dans le Bloc-notes. Peux-tu cliquer
dessus toi-même ? Je reprends ensuite.
"""
cible = _nettoyer_description_cible(description_cible) or "l'élément"
app = _extraire_nom_application(titre_fenetre or "")
if app:
corps = (
f"Je ne trouve pas « {cible} » dans {app}. "
f"Peux-tu cliquer dessus toi-même ? Je reprends ensuite."
)
else:
corps = (
f"Je ne trouve pas « {cible} » à l'écran. "
f"Peux-tu le faire toi-même ? Je reprends ensuite."
)
return MessageUtilisateur(
niveau=NiveauMessage.BLOCAGE,
titre="Léa a besoin d'aide",
corps=corps,
duree_s=DUREE_PAR_NIVEAU[NiveauMessage.BLOCAGE],
persistent=True,
)
def formatter_fenetre_incorrecte(
titre_actuel: str,
titre_attendu: str,
) -> MessageUtilisateur:
"""Message quand la fenêtre active n'est pas celle attendue.
Exemple avant :
Fenêtre incorrecte: 'Program Manager' (attendu: 'Lea : Explorateur de fichiers')
Exemple après :
Léa attend une fenêtre
J'attends « Explorateur de fichiers » mais c'est « Program Manager »
qui est affiché. Peux-tu ouvrir la bonne fenêtre ?
"""
app_actuelle = _extraire_nom_application(titre_actuel) or "une autre fenêtre"
app_attendue = _extraire_nom_application(titre_attendu) or titre_attendu
corps = (
f"J'attends « {app_attendue} » mais c'est « {app_actuelle} » "
f"qui est affiché. Peux-tu ouvrir la bonne fenêtre ?"
)
return MessageUtilisateur(
niveau=NiveauMessage.BLOCAGE,
titre="Léa attend une fenêtre",
corps=corps,
duree_s=DUREE_PAR_NIVEAU[NiveauMessage.BLOCAGE],
persistent=True,
)
def formatter_ecran_inchange(action_type: str = "") -> MessageUtilisateur:
"""Message quand l'action n'a pas eu d'effet visible.
Exemple avant :
Ecran inchange apres l'action
Exemple après :
Léa vérifie
Mon clic n'a pas eu l'air de marcher. Je vais réessayer ou te
rendre la main si ça ne passe pas.
"""
actions_fr = {
"click": "Mon clic",
"type": "Ma saisie",
"key_combo": "Mon raccourci clavier",
"scroll": "Mon défilement",
}
quoi = actions_fr.get(action_type, "Mon action")
corps = (
f"{quoi} n'a pas eu l'air de marcher. Je vais réessayer, "
f"ou te rendre la main si ça ne passe pas."
)
return MessageUtilisateur(
niveau=NiveauMessage.ATTENTION,
titre="Léa vérifie",
corps=corps,
duree_s=DUREE_PAR_NIVEAU[NiveauMessage.ATTENTION],
)
def formatter_connexion_perdue(hote_serveur: str = "") -> MessageUtilisateur:
"""Message quand la connexion avec le serveur est perdue.
Rassurant : on dit qu'on va réessayer automatiquement.
"""
corps = (
"J'ai perdu le lien avec le serveur. Je retente automatiquement, "
"pas besoin d'intervenir."
)
return MessageUtilisateur(
niveau=NiveauMessage.ATTENTION,
titre="Léa est déconnectée",
corps=corps,
duree_s=DUREE_PAR_NIVEAU[NiveauMessage.ATTENTION],
)
def formatter_connexion_retablie() -> MessageUtilisateur:
"""Message quand la connexion serveur est rétablie."""
return MessageUtilisateur(
niveau=NiveauMessage.INFO,
titre="Léa",
corps="C'est bon, la connexion est revenue. Je continue.",
duree_s=DUREE_PAR_NIVEAU[NiveauMessage.INFO],
)
def formatter_debut_workflow(nom_workflow: str, nb_etapes: int = 0) -> MessageUtilisateur:
"""Message au démarrage d'un workflow de replay."""
if nb_etapes > 0:
corps = (
f"Je démarre « {nom_workflow} » ({nb_etapes} étapes). "
f"Je t'indique mon avancement."
)
else:
corps = f"Je démarre « {nom_workflow} ». Je t'indique mon avancement."
return MessageUtilisateur(
niveau=NiveauMessage.INFO,
titre="Léa démarre",
corps=corps,
duree_s=DUREE_PAR_NIVEAU[NiveauMessage.INFO],
)
def formatter_etape_workflow(
etape_actuelle: int,
nb_etapes: int,
description: str = "",
) -> MessageUtilisateur:
"""Message pour la progression d'une étape."""
if description:
desc = _nettoyer_description_cible(description)
corps = f"Étape {etape_actuelle}/{nb_etapes}{desc}"
else:
corps = f"Étape {etape_actuelle}/{nb_etapes}"
return MessageUtilisateur(
niveau=NiveauMessage.INFO,
titre="Léa avance",
corps=corps,
duree_s=3,
)
def formatter_retry(action_type: str = "", tentative: int = 2) -> MessageUtilisateur:
"""Message quand Léa retente une action."""
corps = (
f"Je retente (tentative {tentative}). Ça arrive parfois, "
f"l'écran était peut-être en cours de chargement."
)
return MessageUtilisateur(
niveau=NiveauMessage.ATTENTION,
titre="Léa retente",
corps=corps,
duree_s=DUREE_PAR_NIVEAU[NiveauMessage.ATTENTION],
)
def formatter_ralentissement() -> MessageUtilisateur:
"""Message quand Léa prend plus de temps que prévu."""
return MessageUtilisateur(
niveau=NiveauMessage.ATTENTION,
titre="Léa prend son temps",
corps="Je vais plus lentement que prévu. L'écran met du temps à répondre.",
duree_s=DUREE_PAR_NIVEAU[NiveauMessage.ATTENTION],
)
def formatter_fin_workflow(
succes: bool,
nom_workflow: str = "",
nb_etapes: int = 0,
duree_s: float = 0.0,
) -> MessageUtilisateur:
"""Message à la fin d'un workflow."""
if succes:
if nom_workflow and nb_etapes > 0:
corps = (
f"C'est fait ! « {nom_workflow} » est terminé "
f"({nb_etapes} étapes en {int(duree_s)}s)."
)
else:
corps = "C'est fait ! Tout s'est bien passé."
return MessageUtilisateur(
niveau=NiveauMessage.INFO,
titre="Léa a terminé",
corps=corps,
duree_s=6,
)
else:
corps = (
"Je n'ai pas pu terminer. Je te rends la main, "
"tu peux continuer à partir de là où je me suis arrêtée."
)
return MessageUtilisateur(
niveau=NiveauMessage.BLOCAGE,
titre="Léa s'arrête",
corps=corps,
duree_s=DUREE_PAR_NIVEAU[NiveauMessage.BLOCAGE],
persistent=True,
)
def formatter_erreur_generique(message_technique: str) -> MessageUtilisateur:
"""Formater un message d'erreur technique non catégorisé.
On essaie de détecter les motifs connus dans le message technique pour
le router vers le bon formatter spécialisé, sinon on emballe le message.
"""
if not message_technique:
return MessageUtilisateur(
niveau=NiveauMessage.ATTENTION,
titre="Léa",
corps="J'ai rencontré un petit souci. Je continue.",
duree_s=DUREE_PAR_NIVEAU[NiveauMessage.ATTENTION],
)
msg_lower = message_technique.lower()
# target_not_found[:...]
if "target_not_found" in msg_lower:
# Essayer d'extraire la description après le ':'
match = re.match(r"target_not_found[:\s]*(.*)", message_technique, re.IGNORECASE)
desc = match.group(1).strip() if match else ""
return formatter_cible_non_trouvee(desc)
# Fenêtre incorrecte: 'X' (attendu: 'Y')
if "fenêtre incorrecte" in msg_lower or "fenetre incorrecte" in msg_lower:
# Extraire actuel et attendu
m_actuel = re.search(r"[:,]\s*['\"]([^'\"]+)['\"]", message_technique)
m_attendu = re.search(r"attendu[:\s]*['\"]([^'\"]+)['\"]", message_technique)
actuel = m_actuel.group(1) if m_actuel else ""
attendu = m_attendu.group(1) if m_attendu else ""
return formatter_fenetre_incorrecte(actuel, attendu)
# Ecran inchangé
if "inchang" in msg_lower or "no_screen_change" in msg_lower:
return formatter_ecran_inchange()
# Policy abort / supervise
if "policy_abort" in msg_lower or "visual_resolve_failed" in msg_lower:
return formatter_cible_non_trouvee(message_technique)
# Fallback : message technique tronqué
msg_tronque = message_technique.strip()
if len(msg_tronque) > 120:
msg_tronque = msg_tronque[:117] + "..."
return MessageUtilisateur(
niveau=NiveauMessage.ATTENTION,
titre="Léa",
corps=f"J'ai rencontré un souci : {msg_tronque}",
duree_s=DUREE_PAR_NIVEAU[NiveauMessage.ATTENTION],
)
# ============================================================================
# Détection fenêtre Léa (utilisé par l'executor pour ignorer sa propre UI)
# ============================================================================
# Motifs qui identifient une fenêtre appartenant à Léa (l'agent lui-même).
# On utilise des regex avec \b pour éviter les faux positifs sur des noms
# contenant "lea" (ex: "cléa.txt", "leapfrog", "replay").
_MOTIFS_FENETRE_LEA_REGEX = (
r"\bléa\b",
r"\blea\b(?!p)", # "lea" mot entier, pas "leapfrog"
r"lea\s*[—–\-:]", # "Lea —", "Lea -", "Lea :"
r"léa\s*[—–\-:]",
r"\bassistante ia\b",
r"\bléa ia\b",
r"\blea ia\b",
)
def est_fenetre_lea(titre_fenetre: str) -> bool:
"""Détecter si un titre de fenêtre appartient à l'agent Léa lui-même.
Utilisé pour éviter que Léa ne se considère comme une fenêtre intrusive
dans ses propres pré-vérifications.
Utilise des regex avec des word boundaries pour éviter les faux positifs
sur des noms de fichiers contenant "lea" (ex: "cléa.txt", "replay.log").
"""
if not titre_fenetre:
return False
titre_lower = titre_fenetre.lower().strip()
return any(re.search(motif, titre_lower) for motif in _MOTIFS_FENETRE_LEA_REGEX)
# Conservé pour rétro-compatibilité avec le code qui listait MOTIFS_FENETRE_LEA
MOTIFS_FENETRE_LEA = (
"léa",
"lea —",
"léa —",
"lea -",
"léa -",
"lea assistante",
"léa assistante",
"lea : ",
"léa : ",
"assistante ia",
)

View File

@@ -5,6 +5,14 @@ Utilise plyer pour les notifications système, sans dépendance PyQt5.
Remplace les dialogues Qt par des toasts non-bloquants.
Thread-safe avec rate limiting (1 notification / 2 secondes max).
Les messages utilisateur sont formatés via `agent_v1.ui.messages` qui convertit
les codes techniques (target_not_found, etc.) en français naturel.
Hiérarchie des notifications (cf. messages.NiveauMessage) :
- INFO : auto-dismiss en ~4s, rate-limité classique
- ATTENTION : auto-dismiss en ~7s, rate-limité classique
- BLOCAGE : persistant (15s+), bypass du rate limit
"""
import logging
@@ -12,6 +20,22 @@ import threading
import time
from typing import Optional
from .messages import (
MessageUtilisateur,
NiveauMessage,
formatter_cible_non_trouvee,
formatter_connexion_perdue,
formatter_connexion_retablie,
formatter_debut_workflow,
formatter_ecran_inchange,
formatter_erreur_generique,
formatter_etape_workflow,
formatter_fenetre_incorrecte,
formatter_fin_workflow,
formatter_ralentissement,
formatter_retry,
)
logger = logging.getLogger(__name__)
# Import conditionnel de plyer — fallback silencieux si absent
@@ -59,7 +83,13 @@ class NotificationManager:
# Méthode générique
# ------------------------------------------------------------------ #
def notify(self, title: str, message: str, timeout: int = 5) -> bool:
def notify(
self,
title: str,
message: str,
timeout: int = 5,
bypass_rate_limit: bool = False,
) -> bool:
"""
Affiche une notification toast.
@@ -67,6 +97,8 @@ class NotificationManager:
title: Titre de la notification.
message: Corps du message.
timeout: Durée d'affichage en secondes.
bypass_rate_limit: Si True, ignore le rate limit (pour les blocages
importants qui ne doivent pas être écrasés).
Returns:
True si la notification a été envoyée, False sinon
@@ -76,17 +108,21 @@ class NotificationManager:
logger.debug("Notification ignorée (plyer absent) : %s", title)
return False
with self._lock:
now = time.monotonic()
elapsed = now - self._last_notification_time
if elapsed < RATE_LIMIT_SECONDS:
logger.debug(
"Notification ignorée (rate limit, %.1fs restantes) : %s",
RATE_LIMIT_SECONDS - elapsed,
title,
)
return False
self._last_notification_time = now
if not bypass_rate_limit:
with self._lock:
now = time.monotonic()
elapsed = now - self._last_notification_time
if elapsed < RATE_LIMIT_SECONDS:
logger.debug(
"Notification ignorée (rate limit, %.1fs restantes) : %s",
RATE_LIMIT_SECONDS - elapsed,
title,
)
return False
self._last_notification_time = now
else:
with self._lock:
self._last_notification_time = time.monotonic()
# Envoi dans un thread dédié pour ne jamais bloquer l'appelant
thread = threading.Thread(
@@ -97,6 +133,39 @@ class NotificationManager:
thread.start()
return True
def notify_message(self, msg: MessageUtilisateur) -> bool:
"""Envoyer un MessageUtilisateur structuré (niveau, titre, corps).
Les messages BLOCAGE bypass le rate limit pour garantir que
l'utilisateur voit qu'on a besoin de lui.
"""
bypass = msg.niveau == NiveauMessage.BLOCAGE
# Log aussi pour tracer dans les logs fichiers
self._log_message(msg)
return self.notify(
title=msg.titre,
message=msg.corps,
timeout=msg.duree_s,
bypass_rate_limit=bypass,
)
@staticmethod
def _log_message(msg: MessageUtilisateur) -> None:
"""Logger un message utilisateur avec le niveau approprié.
Les logs agents sont plus lisibles quand on route info → INFO,
attention → WARNING, blocage → ERROR, avec un préfixe [LEA].
"""
prefix = f"[LEA] {msg.titre}: {msg.corps}"
if msg.niveau == NiveauMessage.INFO:
logger.info(prefix)
elif msg.niveau == NiveauMessage.ATTENTION:
logger.warning(prefix)
elif msg.niveau == NiveauMessage.BLOCAGE:
logger.error(prefix)
else:
logger.info(prefix)
def _send(self, title: str, message: str, timeout: int) -> None:
"""Envoi effectif de la notification (exécuté dans un thread dédié)."""
try:
@@ -180,40 +249,79 @@ class NotificationManager:
timeout=3,
)
def replay_finished(self, success: bool, workflow_name: str) -> bool:
"""Notification de fin de replay (succès ou échec)."""
if success:
return self.notify(
title=APP_NAME,
message="C'est fait ! Tout s'est bien passé.",
timeout=5,
)
else:
return self.notify(
title=APP_NAME,
message="Hmm, j'ai eu un souci. Vous pouvez me remontrer ?",
timeout=7,
)
def replay_target_not_found(
self,
target_description: str,
window_title: Optional[str] = None,
) -> bool:
"""Notification quand un élément n'est pas trouvé pendant le replay.
def connection_changed(self, connected: bool, server_host: str) -> bool:
Le replay est mis en pause et attend une intervention humaine.
Utilise `messages.formatter_cible_non_trouvee` pour un message en
français naturel.
"""
msg = formatter_cible_non_trouvee(target_description, window_title)
return self.notify_message(msg)
def replay_wrong_window(self, current_title: str, expected_title: str) -> bool:
"""Notification quand la fenêtre active n'est pas celle attendue."""
msg = formatter_fenetre_incorrecte(current_title, expected_title)
return self.notify_message(msg)
def replay_no_screen_change(self, action_type: str = "") -> bool:
"""Notification quand une action n'a pas eu d'effet visible."""
msg = formatter_ecran_inchange(action_type)
return self.notify_message(msg)
def replay_retry(self, action_type: str = "", tentative: int = 2) -> bool:
"""Notification quand Léa retente une action."""
msg = formatter_retry(action_type, tentative)
return self.notify_message(msg)
def replay_slow(self) -> bool:
"""Notification quand Léa va plus lentement que prévu."""
msg = formatter_ralentissement()
return self.notify_message(msg)
def replay_finished(
self,
success: bool,
workflow_name: str,
step_count: int = 0,
duration_s: float = 0.0,
) -> bool:
"""Notification de fin de replay (succès ou échec)."""
msg = formatter_fin_workflow(success, workflow_name, step_count, duration_s)
return self.notify_message(msg)
def replay_workflow_started(self, workflow_name: str, step_count: int = 0) -> bool:
"""Notification de début de workflow (remplace `replay_started`)."""
msg = formatter_debut_workflow(workflow_name, step_count)
return self.notify_message(msg)
def replay_step_progress(
self,
current: int,
total: int,
description: str = "",
) -> bool:
"""Notification de progression d'une étape (niveau INFO)."""
msg = formatter_etape_workflow(current, total, description)
return self.notify_message(msg)
def connection_changed(self, connected: bool, server_host: str = "") -> bool:
"""Notification de changement d'état de la connexion serveur."""
if connected:
return self.notify(
title=APP_NAME,
message="Connectée au serveur.",
timeout=5,
)
msg = formatter_connexion_retablie()
else:
return self.notify(
title=APP_NAME,
message="J'ai perdu la connexion avec le serveur.",
timeout=7,
)
msg = formatter_connexion_perdue(server_host)
return self.notify_message(msg)
def error(self, message: str) -> bool:
"""Notification d'erreur."""
return self.notify(
title=APP_NAME,
message=f"Oups, un problème : {message}",
timeout=10,
)
"""Notification d'erreur générique.
Essaie d'abord de détecter un motif technique connu et de formater
correctement, sinon fallback sur un message générique aidant.
"""
msg = formatter_erreur_generique(message)
return self.notify_message(msg)

View File

@@ -0,0 +1,727 @@
"""Tests unitaires pour l'UX de Léa (notifications, messages, activity panel).
Couvre :
- Formatage des messages techniques → français naturel (module messages.py)
- Hiérarchie info/attention/blocage
- Détection de la fenêtre Léa
- NotificationManager avec plyer mocké
- ActivityPanel sans tkinter (fallback silencieux)
Ces tests ne nécessitent ni tkinter ni plyer : tout est mocké ou géré en
fallback silencieux. Ils doivent passer sur toutes les plateformes.
Auteur: Dom, avril 2026
"""
from __future__ import annotations
import sys
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# Assurer que la racine du projet est dans le path (comme conftest)
ROOT = Path(__file__).resolve().parents[2]
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
from agent_v0.agent_v1.ui import activity_panel, messages, notifications
from agent_v0.agent_v1.ui.activity_panel import ActivityPanel, EtatLea, reset_activity_panel
from agent_v0.agent_v1.ui.messages import (
MessageUtilisateur,
NiveauMessage,
_extraire_nom_application,
_nettoyer_description_cible,
est_fenetre_lea,
formatter_cible_non_trouvee,
formatter_connexion_perdue,
formatter_connexion_retablie,
formatter_debut_workflow,
formatter_ecran_inchange,
formatter_erreur_generique,
formatter_etape_workflow,
formatter_fenetre_incorrecte,
formatter_fin_workflow,
formatter_ralentissement,
formatter_retry,
)
from agent_v0.agent_v1.ui.notifications import NotificationManager
# ============================================================================
# Tests : helpers d'extraction
# ============================================================================
class TestExtraction:
"""Tests des helpers _extraire_nom_application et _nettoyer_description_cible."""
def test_extraire_app_avec_em_dash(self):
assert _extraire_nom_application("Document.txt Bloc-notes") == "Bloc-notes"
def test_extraire_app_avec_em_dash_long(self):
assert _extraire_nom_application("Ma Page — Google Chrome") == "Google Chrome"
def test_extraire_app_avec_dash_simple(self):
assert _extraire_nom_application("Session 1 - Firefox") == "Firefox"
def test_extraire_app_sans_separateur(self):
assert _extraire_nom_application("Bloc-notes") == "Bloc-notes"
def test_extraire_app_vide(self):
assert _extraire_nom_application("") == ""
assert _extraire_nom_application(None) == ""
def test_extraire_app_garde_dernier_separateur(self):
# Cas multi-séparateurs : on garde la dernière partie
assert _extraire_nom_application("A - B - C") == "C"
def test_nettoyer_description_retire_guillemets(self):
assert _nettoyer_description_cible("'bonjour'") == "bonjour"
assert _nettoyer_description_cible('"bonjour"') == "bonjour"
assert _nettoyer_description_cible("`code`") == "code"
def test_nettoyer_description_vide(self):
assert _nettoyer_description_cible("") == ""
assert _nettoyer_description_cible(None) == ""
def test_nettoyer_description_tronque(self):
longue = "x" * 200
resultat = _nettoyer_description_cible(longue)
assert len(resultat) <= 80
assert resultat.endswith("...")
# ============================================================================
# Tests : détection fenêtre Léa
# ============================================================================
class TestFenetreLea:
"""Tests de est_fenetre_lea — crucial pour la robustesse."""
@pytest.mark.parametrize("titre", [
"Léa",
"Léa — Assistante IA",
"Lea - Assistante",
"Léa — Activité",
"Lea : Explorateur de fichiers",
"LÉA — ASSISTANTE IA", # casse mixte
"Léa assistante",
"Assistante IA",
])
def test_detecte_fenetres_lea(self, titre):
assert est_fenetre_lea(titre), f"Devrait détecter : {titre!r}"
@pytest.mark.parametrize("titre", [
"Bloc-notes",
"Google Chrome",
"Program Manager",
"Microsoft Word - Document1",
"Sans titre - Paint",
"",
"cléa.txt", # contient "léa" mais c'est un fichier
"replay.log", # contient "lea"
"leapfrog.exe", # contient "lea"
"nucleaire.pdf", # contient "lea"
])
def test_ignore_fenetres_non_lea(self, titre):
"""Les faux positifs sur des noms contenant 'lea' doivent être évités
grâce aux word boundaries regex."""
assert not est_fenetre_lea(titre), f"Ne devrait pas détecter : {titre!r}"
def test_titre_none(self):
assert est_fenetre_lea(None) is False
def test_espaces_en_trop(self):
assert est_fenetre_lea(" Léa — Assistante IA ") is True
# ============================================================================
# Tests : formatage des messages techniques → humains
# ============================================================================
class TestFormatterCibleNonTrouvee:
"""Tests du formatage quand un élément n'est pas trouvé."""
def test_message_blocage(self):
msg = formatter_cible_non_trouvee("bonjour", "Document Bloc-notes")
assert msg.niveau == NiveauMessage.BLOCAGE
assert msg.persistent is True
assert "besoin d'aide" in msg.titre.lower()
def test_message_contient_nom_element(self):
msg = formatter_cible_non_trouvee("Rechercher", "Chrome")
assert "rechercher" in msg.corps.lower()
def test_message_contient_nom_application(self):
msg = formatter_cible_non_trouvee("bonjour", "Doc Bloc-notes")
assert "bloc-notes" in msg.corps.lower()
def test_message_action_orientee(self):
"""Le message doit proposer une action à l'utilisateur."""
msg = formatter_cible_non_trouvee("bouton", "App")
corps_lower = msg.corps.lower()
# Doit contenir un verbe d'action type "cliquer", "faire"
assert any(verb in corps_lower for verb in ["cliqu", "faire", "peux-tu"])
def test_sans_fenetre(self):
msg = formatter_cible_non_trouvee("Submit", None)
assert msg.niveau == NiveauMessage.BLOCAGE
assert "submit" in msg.corps.lower()
def test_description_vide(self):
msg = formatter_cible_non_trouvee("", "App")
# Doit quand même produire un message utilisable
assert msg.corps
assert msg.niveau == NiveauMessage.BLOCAGE
def test_message_techniques_nettoyes(self):
"""Pas de '__target_not_found__' ni code technique visible."""
msg = formatter_cible_non_trouvee("'bonjour'", "Bloc-notes")
assert "target_not_found" not in msg.corps
# Les guillemets techniques sont nettoyés, mais on en ajoute des français
assert "bonjour" in msg.corps
class TestFormatterFenetreIncorrecte:
"""Tests du formatage quand la mauvaise fenêtre est active."""
def test_message_blocage_persistent(self):
msg = formatter_fenetre_incorrecte(
"Program Manager",
"Lea : Explorateur de fichiers",
)
assert msg.niveau == NiveauMessage.BLOCAGE
assert msg.persistent is True
def test_mentionne_fenetre_attendue(self):
msg = formatter_fenetre_incorrecte("Program Manager", "Chrome")
assert "chrome" in msg.corps.lower()
def test_mentionne_fenetre_actuelle(self):
msg = formatter_fenetre_incorrecte("Program Manager", "Chrome")
assert "program manager" in msg.corps.lower()
def test_suggere_action(self):
msg = formatter_fenetre_incorrecte("A", "B")
# Propose d'ouvrir la bonne fenêtre
assert "ouvr" in msg.corps.lower() or "fenêtre" in msg.corps.lower()
class TestFormatterEcranInchange:
"""Tests du formatage quand l'écran ne change pas après une action."""
def test_niveau_attention(self):
"""L'écran inchangé est de niveau ATTENTION, pas BLOCAGE."""
msg = formatter_ecran_inchange("click")
assert msg.niveau == NiveauMessage.ATTENTION
def test_message_pour_click(self):
msg = formatter_ecran_inchange("click")
assert "clic" in msg.corps.lower()
def test_message_pour_type(self):
msg = formatter_ecran_inchange("type")
assert "saisie" in msg.corps.lower()
def test_message_pour_key_combo(self):
msg = formatter_ecran_inchange("key_combo")
assert "raccourci" in msg.corps.lower()
def test_sans_type_action(self):
msg = formatter_ecran_inchange("")
assert msg.corps # Doit quand même produire quelque chose
def test_pas_persistent(self):
msg = formatter_ecran_inchange("click")
assert msg.persistent is False
class TestFormatterConnexion:
"""Tests des messages de connexion serveur."""
def test_connexion_perdue_attention(self):
msg = formatter_connexion_perdue("localhost")
assert msg.niveau == NiveauMessage.ATTENTION
def test_connexion_perdue_rassurante(self):
"""Le message doit rassurer (reconnexion automatique)."""
msg = formatter_connexion_perdue()
assert "automatique" in msg.corps.lower() or "retent" in msg.corps.lower()
def test_connexion_retablie_info(self):
msg = formatter_connexion_retablie()
assert msg.niveau == NiveauMessage.INFO
def test_connexion_retablie_positive(self):
msg = formatter_connexion_retablie()
assert "bon" in msg.corps.lower() or "revenue" in msg.corps.lower()
class TestFormatterWorkflow:
"""Tests des messages de workflow (début, étape, fin)."""
def test_debut_avec_etapes(self):
msg = formatter_debut_workflow("Saisie patient", 15)
assert msg.niveau == NiveauMessage.INFO
assert "saisie patient" in msg.corps.lower()
assert "15" in msg.corps
def test_debut_sans_etapes(self):
msg = formatter_debut_workflow("Backup")
assert msg.niveau == NiveauMessage.INFO
assert "backup" in msg.corps.lower()
def test_etape_progression(self):
msg = formatter_etape_workflow(3, 15, "Clic sur Valider")
assert "3" in msg.corps
assert "15" in msg.corps
assert "valider" in msg.corps.lower()
def test_etape_sans_description(self):
msg = formatter_etape_workflow(5, 20)
assert "5" in msg.corps
assert "20" in msg.corps
def test_fin_succes(self):
msg = formatter_fin_workflow(True, "Ma tâche", 10, 45.0)
assert msg.niveau == NiveauMessage.INFO
assert "terminé" in msg.corps.lower() or "fait" in msg.corps.lower()
def test_fin_echec_blocage(self):
msg = formatter_fin_workflow(False, "Ma tâche")
assert msg.niveau == NiveauMessage.BLOCAGE
assert msg.persistent is True
class TestFormatterRetryRalentissement:
"""Tests des messages de retry et ralentissement."""
def test_retry_attention(self):
msg = formatter_retry("click", 2)
assert msg.niveau == NiveauMessage.ATTENTION
assert "2" in msg.corps # numéro de tentative
def test_ralentissement_attention(self):
msg = formatter_ralentissement()
assert msg.niveau == NiveauMessage.ATTENTION
assert "lent" in msg.corps.lower()
class TestFormatterErreurGenerique:
"""Tests du router formatter_erreur_generique → spécialisé."""
def test_detecte_target_not_found(self):
msg = formatter_erreur_generique("target_not_found: 'bouton'")
assert msg.niveau == NiveauMessage.BLOCAGE
assert "bouton" in msg.corps.lower()
def test_detecte_fenetre_incorrecte(self):
msg = formatter_erreur_generique(
"Fenêtre incorrecte: 'Program Manager' (attendu: 'Chrome')"
)
assert msg.niveau == NiveauMessage.BLOCAGE
assert "chrome" in msg.corps.lower() or "program manager" in msg.corps.lower()
def test_detecte_ecran_inchange(self):
msg = formatter_erreur_generique("Ecran inchange apres l'action")
assert msg.niveau == NiveauMessage.ATTENTION
def test_detecte_no_screen_change(self):
msg = formatter_erreur_generique("no_screen_change after click")
assert msg.niveau == NiveauMessage.ATTENTION
def test_detecte_policy_abort(self):
msg = formatter_erreur_generique("policy_abort:target_desc_x")
assert msg.niveau == NiveauMessage.BLOCAGE
def test_message_vide(self):
msg = formatter_erreur_generique("")
assert msg.corps
assert msg.niveau == NiveauMessage.ATTENTION
def test_message_inconnu_tronque(self):
long_msg = "erreur très longue " * 20
msg = formatter_erreur_generique(long_msg)
assert len(msg.corps) <= 200 # tronqué avec "..."
def test_pas_de_code_technique_dans_message_utilisateur(self):
"""Les messages présentés à l'utilisateur ne doivent pas contenir de
noms de variables, de fonctions, ou de types Python."""
msg = formatter_erreur_generique("target_not_found: 'bouton'")
# Le code technique ne doit pas apparaître tel quel dans le corps
assert "target_not_found" not in msg.corps
# ============================================================================
# Tests : hiérarchie NiveauMessage
# ============================================================================
class TestHierarchieNiveau:
"""Tests de la hiérarchie info/attention/blocage."""
def test_niveau_info_duree_courte(self):
msg = formatter_connexion_retablie()
assert msg.niveau == NiveauMessage.INFO
assert msg.duree_s <= 6
def test_niveau_attention_duree_moyenne(self):
msg = formatter_ecran_inchange("click")
assert msg.niveau == NiveauMessage.ATTENTION
assert 5 <= msg.duree_s <= 10
def test_niveau_blocage_duree_longue_persistent(self):
msg = formatter_cible_non_trouvee("x", "y")
assert msg.niveau == NiveauMessage.BLOCAGE
assert msg.duree_s >= 10
assert msg.persistent is True
def test_niveau_info_non_persistent(self):
msg = formatter_debut_workflow("test")
assert msg.persistent is False
def test_to_dict_serialisation(self):
msg = MessageUtilisateur(
niveau=NiveauMessage.INFO,
titre="Test",
corps="Corps",
duree_s=5,
)
d = msg.to_dict()
assert d["niveau"] == "info"
assert d["titre"] == "Test"
assert d["corps"] == "Corps"
assert d["duree_s"] == 5
assert d["persistent"] is False
# ============================================================================
# Tests : NotificationManager (avec plyer mocké)
# ============================================================================
class TestNotificationManager:
"""Tests du NotificationManager avec plyer mocké.
Ces tests ne dépendent pas de l'environnement : plyer est patché pour
qu'on puisse vérifier les appels sans afficher de vraies notifications.
"""
def test_instanciation(self):
mgr = NotificationManager()
assert mgr is not None
def test_notify_sans_plyer(self, monkeypatch):
"""Si plyer n'est pas dispo, notify() retourne False sans crasher."""
monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", False)
mgr = NotificationManager()
assert mgr.notify("titre", "message") is False
def test_notify_avec_plyer_mocke(self, monkeypatch):
mock_plyer = MagicMock()
monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True)
monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer)
mgr = NotificationManager()
result = mgr.notify("Titre", "Message", timeout=5)
assert result is True
# L'envoi est asynchrone, laissons le thread démarrer
time.sleep(0.1)
mock_plyer.notify.assert_called_once()
def test_rate_limit(self, monkeypatch):
"""Le rate limit bloque les notifications trop rapprochées."""
mock_plyer = MagicMock()
monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True)
monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer)
mgr = NotificationManager()
assert mgr.notify("T1", "M1") is True
# Immédiatement après → bloqué
assert mgr.notify("T2", "M2") is False
def test_bypass_rate_limit_pour_blocage(self, monkeypatch):
"""Les messages BLOCAGE bypass le rate limit."""
mock_plyer = MagicMock()
monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True)
monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer)
mgr = NotificationManager()
assert mgr.notify("T1", "M1") is True
# Sans bypass → bloqué
assert mgr.notify("T2", "M2") is False
# Avec bypass → passe
assert mgr.notify("T3", "M3", bypass_rate_limit=True) is True
def test_notify_message_niveau_blocage_bypass(self, monkeypatch):
mock_plyer = MagicMock()
monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True)
monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer)
mgr = NotificationManager()
# Occuper le rate limit
mgr.notify("T0", "M0")
# Message BLOCAGE doit passer même pendant le rate limit
msg_blocage = formatter_cible_non_trouvee("x", "y")
assert mgr.notify_message(msg_blocage) is True
def test_replay_target_not_found_avec_titre(self, monkeypatch):
"""L'API spécialisée produit un message contenant le nom d'app."""
mock_plyer = MagicMock()
monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True)
monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer)
mgr = NotificationManager()
mgr.replay_target_not_found("Rechercher", "Document Bloc-notes")
time.sleep(0.1)
# Vérifier qu'on a bien envoyé un message qui mentionne l'app
args, kwargs = mock_plyer.notify.call_args
message_envoye = kwargs.get("message", "")
assert "bloc-notes" in message_envoye.lower()
assert "rechercher" in message_envoye.lower()
def test_replay_wrong_window(self, monkeypatch):
mock_plyer = MagicMock()
monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True)
monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer)
mgr = NotificationManager()
mgr.replay_wrong_window("Program Manager", "Chrome")
time.sleep(0.1)
args, kwargs = mock_plyer.notify.call_args
titre = kwargs.get("title", "")
# Le titre doit indiquer l'attente d'une fenêtre
assert "fenêtre" in titre.lower() or "attend" in titre.lower()
def test_error_route_vers_formatter_specialise(self, monkeypatch):
"""error() détecte 'target_not_found' et produit un message de blocage."""
mock_plyer = MagicMock()
monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True)
monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer)
mgr = NotificationManager()
mgr.error("target_not_found: 'bonjour'")
time.sleep(0.1)
mock_plyer.notify.assert_called_once()
args, kwargs = mock_plyer.notify.call_args
# Le message envoyé doit être en français naturel, pas le code brut
message_envoye = kwargs.get("message", "")
assert "target_not_found" not in message_envoye
def test_backward_compat_connection_changed(self, monkeypatch):
"""L'API existante connection_changed reste fonctionnelle."""
mock_plyer = MagicMock()
monkeypatch.setattr(notifications, "_PLYER_AVAILABLE", True)
monkeypatch.setattr(notifications, "_plyer_notification", mock_plyer)
mgr = NotificationManager()
# Déconnexion
mgr.connection_changed(False, "localhost")
time.sleep(0.1)
assert mock_plyer.notify.called
# ============================================================================
# Tests : ActivityPanel (sans tkinter)
# ============================================================================
class TestActivityPanelFallback:
"""Tests du panel d'activité en mode fallback (sans tkinter)."""
def setup_method(self):
reset_activity_panel()
def teardown_method(self):
reset_activity_panel()
def test_creation_sans_ui(self):
"""Le panel peut être créé sans UI (activer_ui=False)."""
panel = ActivityPanel(activer_ui=False)
assert panel is not None
def test_snapshot_initial(self):
panel = ActivityPanel(activer_ui=False)
snap = panel.snapshot()
assert snap.etat == EtatLea.INACTIVE
assert snap.nom_workflow == ""
assert snap.etape == 0
def test_definir_workflow(self):
panel = ActivityPanel(activer_ui=False)
panel.definir_workflow("Test", nb_etapes=10)
snap = panel.snapshot()
assert snap.nom_workflow == "Test"
assert snap.nb_etapes == 10
assert snap.etat == EtatLea.OBSERVE
assert snap.debut_timestamp > 0
def test_mettre_a_jour_etape(self):
panel = ActivityPanel(activer_ui=False)
panel.definir_workflow("Test", 10)
panel.mettre_a_jour(etat=EtatLea.AGIT, action="Clic", etape=3)
snap = panel.snapshot()
assert snap.etat == EtatLea.AGIT
assert snap.action_courante == "Clic"
assert snap.etape == 3
def test_mettre_a_jour_partiel(self):
panel = ActivityPanel(activer_ui=False)
panel.definir_workflow("Test", 10)
panel.mettre_a_jour(etape=5)
snap = panel.snapshot()
assert snap.etape == 5
# L'état reste OBSERVE (non modifié)
assert snap.etat == EtatLea.OBSERVE
def test_progression_texte(self):
panel = ActivityPanel(activer_ui=False)
panel.definir_workflow("Test", 10)
panel.mettre_a_jour(etape=3)
snap = panel.snapshot()
assert snap.progression_texte() == "3/10"
def test_progression_texte_sans_nb_etapes(self):
panel = ActivityPanel(activer_ui=False)
panel.definir_workflow("Test", nb_etapes=0)
snap = panel.snapshot()
assert snap.progression_texte() == ""
def test_temps_ecoule(self):
panel = ActivityPanel(activer_ui=False)
panel.definir_workflow("Test", 10)
time.sleep(0.05)
snap = panel.snapshot()
assert snap.temps_ecoule_s() >= 0.05
def test_temps_ecoule_texte_secondes(self):
panel = ActivityPanel(activer_ui=False)
panel.definir_workflow("Test", 10)
snap = panel.snapshot()
# Format "Xs" pour < 60s
texte = snap.temps_ecoule_texte()
assert texte.endswith("s")
def test_terminer_succes(self):
panel = ActivityPanel(activer_ui=False)
panel.definir_workflow("Test", 10)
panel.terminer(succes=True)
snap = panel.snapshot()
assert snap.etat == EtatLea.TERMINE
def test_terminer_echec(self):
panel = ActivityPanel(activer_ui=False)
panel.definir_workflow("Test", 10)
panel.terminer(succes=False)
snap = panel.snapshot()
assert snap.etat == EtatLea.BLOQUEE
assert snap.dernier_message # Un message par défaut est mis
def test_reinitialiser(self):
panel = ActivityPanel(activer_ui=False)
panel.definir_workflow("Test", 10)
panel.reinitialiser()
snap = panel.snapshot()
assert snap.etat == EtatLea.INACTIVE
assert snap.nom_workflow == ""
def test_listener_appele_sur_changement(self):
panel = ActivityPanel(activer_ui=False)
calls = []
panel.on_change(lambda snap: calls.append(snap.etat))
panel.definir_workflow("Test", 5)
panel.mettre_a_jour(etat=EtatLea.AGIT)
assert EtatLea.OBSERVE in calls
assert EtatLea.AGIT in calls
def test_listener_erreur_nintervient_pas(self):
"""Un listener qui crash ne doit pas casser le panel."""
panel = ActivityPanel(activer_ui=False)
def listener_casse(snap):
raise RuntimeError("boom")
panel.on_change(listener_casse)
# Ne doit pas crasher
panel.definir_workflow("Test", 5)
snap = panel.snapshot()
assert snap.nom_workflow == "Test"
def test_to_dict_serialisation(self):
panel = ActivityPanel(activer_ui=False)
panel.definir_workflow("Ma tâche", 10)
panel.mettre_a_jour(
etat=EtatLea.AGIT,
action="Clic sur Valider",
etape=3,
)
d = panel.snapshot().to_dict()
assert d["nom_workflow"] == "Ma tâche"
assert d["etat"] == "agit"
assert d["etat_libelle"] == "Agit"
assert d["progression"] == "3/10"
assert d["action_courante"] == "Clic sur Valider"
def test_masquer_sans_ui_ne_crash_pas(self):
panel = ActivityPanel(activer_ui=False)
# Doit être no-op sans crasher
panel.masquer()
panel.afficher()
def test_etats_ont_couleurs_et_libelles(self):
"""Vérifier que tous les états ont bien une couleur et un libellé."""
for etat in EtatLea:
assert etat.libelle
assert etat.couleur.startswith("#")
assert etat.code
def test_singleton_global(self):
p1 = activity_panel.get_activity_panel(activer_ui=False)
p2 = activity_panel.get_activity_panel(activer_ui=False)
assert p1 is p2
def test_reset_singleton(self):
p1 = activity_panel.get_activity_panel(activer_ui=False)
activity_panel.reset_activity_panel()
p2 = activity_panel.get_activity_panel(activer_ui=False)
assert p1 is not p2
# ============================================================================
# Tests : intégration executor ↔ notifier
# ============================================================================
class TestExecutorNotifierFallback:
"""Vérifier que le Noop fallback de l'executor couvre toutes les méthodes."""
def test_executor_noop_supporte_toutes_methodes(self):
"""Le fallback _Noop doit répondre à n'importe quelle méthode."""
# Simuler le cas où NotificationManager lève une exception
with patch(
"agent_v0.agent_v1.ui.notifications.NotificationManager",
side_effect=RuntimeError("UI indisponible"),
):
from agent_v0.agent_v1.core.executor import ActionExecutorV1
# Ne pas vraiment instancier (dépendances mss/pynput) — on teste
# la logique du stub en recréant la classe noop inline.
# Test direct du pattern noop utilisé dans executor
class _Noop:
def __getattr__(self, name):
return lambda *a, **kw: False
noop = _Noop()
# Toutes ces méthodes doivent retourner False sans crasher
assert noop.replay_target_not_found("x") is False
assert noop.replay_wrong_window("x", "y") is False
assert noop.replay_no_screen_change("click") is False
assert noop.notify_message(None) is False
assert noop.nimporte_quelle_methode() is False