diff --git a/agent_v0/agent_v1/core/window_info.py b/agent_v0/agent_v1/core/window_info.py
deleted file mode 100644
index 7e6be8744..000000000
--- a/agent_v0/agent_v1/core/window_info.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# window_info.py
-"""
-Récupération des informations sur la fenêtre active (X11).
-
-v0 :
-- utilise xdotool pour obtenir :
- - le titre de la fenêtre active
- - le PID de la fenêtre active, puis le nom du process via ps
-
-Si quelque chose ne fonctionne pas, on renvoie des valeurs "unknown".
-"""
-
-from __future__ import annotations
-
-import subprocess
-from typing import Dict, Optional
-
-
-def _run_cmd(cmd: list[str]) -> Optional[str]:
- """Exécute une commande et renvoie la sortie texte (strippée), ou None en cas d'erreur."""
- try:
- out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
- return out.decode("utf-8", errors="ignore").strip()
- except Exception:
- return None
-
-
-def get_active_window_info() -> Dict[str, str]:
- """
- Renvoie un dict :
- {
- "title": "...",
- "app_name": "..."
- }
-
- Nécessite xdotool installé sur le système.
- """
- title = _run_cmd(["xdotool", "getactivewindow", "getwindowname"])
- pid_str = _run_cmd(["xdotool", "getactivewindow", "getwindowpid"])
-
- app_name: Optional[str] = None
- if pid_str:
- pid_str = pid_str.strip()
- # On récupère le nom du binaire via ps
- app_name = _run_cmd(["ps", "-p", pid_str, "-o", "comm="])
-
- if not title:
- title = "unknown_window"
- if not app_name:
- app_name = "unknown_app"
-
- return {
- "title": title,
- "app_name": app_name,
- }
diff --git a/agent_v0/agent_v1/core/window_info_crossplatform.py b/agent_v0/agent_v1/core/window_info_crossplatform.py
deleted file mode 100644
index ba059a3fc..000000000
--- a/agent_v0/agent_v1/core/window_info_crossplatform.py
+++ /dev/null
@@ -1,192 +0,0 @@
-# window_info_crossplatform.py
-"""
-Récupération des informations sur la fenêtre active - CROSS-PLATFORM
-
-Supporte:
-- Linux (X11 via xdotool)
-- Windows (via pywin32)
-- macOS (via pyobjc)
-
-Installation des dépendances:
- pip install pywin32 # Windows
- pip install pyobjc-framework-Cocoa # macOS
- pip install psutil # Tous OS
-"""
-
-from __future__ import annotations
-
-import platform
-import subprocess
-from typing import Dict, Optional
-
-
-def _run_cmd(cmd: list[str]) -> Optional[str]:
- """Exécute une commande et renvoie la sortie texte (strippée), ou None en cas d'erreur."""
- try:
- out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
- return out.decode("utf-8", errors="ignore").strip()
- except Exception:
- return None
-
-
-def get_active_window_info() -> Dict[str, str]:
- """
- Renvoie un dict :
- {
- "title": "...",
- "app_name": "..."
- }
-
- Détecte automatiquement l'OS et utilise la méthode appropriée.
- """
- system = platform.system()
-
- if system == "Linux":
- return _get_window_info_linux()
- elif system == "Windows":
- return _get_window_info_windows()
- elif system == "Darwin": # macOS
- return _get_window_info_macos()
- else:
- return {"title": "unknown_window", "app_name": "unknown_app"}
-
-
-def _get_window_info_linux() -> Dict[str, str]:
- """
- Linux: utilise xdotool (X11)
-
- Nécessite: sudo apt-get install xdotool
- """
- title = _run_cmd(["xdotool", "getactivewindow", "getwindowname"])
- pid_str = _run_cmd(["xdotool", "getactivewindow", "getwindowpid"])
-
- app_name: Optional[str] = None
- if pid_str:
- pid_str = pid_str.strip()
- # On récupère le nom du binaire via ps
- app_name = _run_cmd(["ps", "-p", pid_str, "-o", "comm="])
-
- if not title:
- title = "unknown_window"
- if not app_name:
- app_name = "unknown_app"
-
- return {
- "title": title,
- "app_name": app_name,
- }
-
-
-def _get_window_info_windows() -> Dict[str, str]:
- """
- Windows: utilise pywin32 + psutil
-
- Nécessite: pip install pywin32 psutil
- """
- try:
- import win32gui
- import win32process
- import psutil
-
- # Fenêtre au premier plan
- hwnd = win32gui.GetForegroundWindow()
-
- # Titre de la fenêtre
- title = win32gui.GetWindowText(hwnd)
- if not title:
- title = "unknown_window"
-
- # PID du processus
- _, pid = win32process.GetWindowThreadProcessId(hwnd)
-
- # Nom du processus
- try:
- process = psutil.Process(pid)
- app_name = process.name()
- except (psutil.NoSuchProcess, psutil.AccessDenied):
- app_name = "unknown_app"
-
- return {
- "title": title,
- "app_name": app_name,
- }
-
- except ImportError:
- # pywin32 ou psutil non installé
- return {
- "title": "unknown_window (pywin32 missing)",
- "app_name": "unknown_app (pywin32 missing)",
- }
- except Exception as e:
- return {
- "title": f"error: {e}",
- "app_name": "unknown_app",
- }
-
-
-def _get_window_info_macos() -> Dict[str, str]:
- """
- macOS: utilise pyobjc (AppKit)
-
- Nécessite: pip install pyobjc-framework-Cocoa
-
- Note: Nécessite les permissions "Accessibility" dans System Preferences
- """
- try:
- from AppKit import NSWorkspace
- from Quartz import (
- CGWindowListCopyWindowInfo,
- kCGWindowListOptionOnScreenOnly,
- kCGNullWindowID
- )
-
- # Application active
- active_app = NSWorkspace.sharedWorkspace().activeApplication()
- app_name = active_app.get('NSApplicationName', 'unknown_app')
-
- # Titre de la fenêtre (via Quartz)
- # On cherche la fenêtre de l'app active qui est au premier plan
- window_list = CGWindowListCopyWindowInfo(
- kCGWindowListOptionOnScreenOnly,
- kCGNullWindowID
- )
-
- title = "unknown_window"
- for window in window_list:
- owner_name = window.get('kCGWindowOwnerName', '')
- if owner_name == app_name:
- window_title = window.get('kCGWindowName', '')
- if window_title:
- title = window_title
- break
-
- return {
- "title": title,
- "app_name": app_name,
- }
-
- except ImportError:
- # pyobjc non installé
- return {
- "title": "unknown_window (pyobjc missing)",
- "app_name": "unknown_app (pyobjc missing)",
- }
- except Exception as e:
- return {
- "title": f"error: {e}",
- "app_name": "unknown_app",
- }
-
-
-# Test rapide
-if __name__ == "__main__":
- import time
-
- print(f"OS détecté: {platform.system()}")
- print("\nTest de capture fenêtre active (5 secondes)...")
- print("Changez de fenêtre pour tester!\n")
-
- for i in range(5):
- info = get_active_window_info()
- print(f"[{i+1}] App: {info['app_name']:20s} | Title: {info['title']}")
- time.sleep(1)
diff --git a/agent_v0/lea_ui/__init__.py b/agent_v0/lea_ui/__init__.py
index b8fb82ffd..f4463cbba 100644
--- a/agent_v0/lea_ui/__init__.py
+++ b/agent_v0/lea_ui/__init__.py
@@ -1,13 +1,6 @@
-# agent_v0.lea_ui — Interface utilisateur "Lea"
+# agent_v0.lea_ui — Communication serveur pour l'agent Léa
#
-# Panneau PyQt5 integre qui remplace le system tray + navigateur web
-# par une interface unifiee pour piloter l'Agent RPA Vision V3.
-#
-# Composants :
-# - LeaMainWindow : fenetre principale ancree a droite
-# - ChatWidget : zone de conversation avec le serveur
-# - OverlayWidget : feedback visuel pendant le replay
+# Composant :
# - LeaServerClient : client API vers le serveur Linux
-# - styles : theme et couleurs
__version__ = "0.1.0"
diff --git a/agent_v0/lea_ui/__main__.py b/agent_v0/lea_ui/__main__.py
deleted file mode 100644
index 875a978ab..000000000
--- a/agent_v0/lea_ui/__main__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-# agent_v0/lea_ui/__main__.py
-"""Permet le lancement via: python -m agent_v0.lea_ui"""
-
-from .launcher import main
-
-main()
diff --git a/agent_v0/lea_ui/chat_widget.py b/agent_v0/lea_ui/chat_widget.py
deleted file mode 100644
index f66b68669..000000000
--- a/agent_v0/lea_ui/chat_widget.py
+++ /dev/null
@@ -1,250 +0,0 @@
-# agent_v0/lea_ui/chat_widget.py
-"""
-Widget de chat pour l'interface Lea.
-
-Affiche les messages avec des bulles :
- - Utilisateur a droite (fond indigo)
- - Lea a gauche (fond blanc)
-
-Communique avec le serveur Linux via LeaServerClient.
-"""
-
-from __future__ import annotations
-
-import logging
-from typing import List, Optional
-
-from PyQt5.QtCore import (
- QPropertyAnimation,
- QSize,
- Qt,
- QTimer,
- pyqtSignal,
- pyqtSlot,
-)
-from PyQt5.QtGui import QColor, QFont, QPainter, QPainterPath, QPen
-from PyQt5.QtWidgets import (
- QFrame,
- QHBoxLayout,
- QLabel,
- QLineEdit,
- QPushButton,
- QScrollArea,
- QSizePolicy,
- QVBoxLayout,
- QWidget,
-)
-
-from . import styles
-
-logger = logging.getLogger("lea_ui.chat")
-
-
-class ChatBubble(QFrame):
- """Bulle de message individuelle."""
-
- def __init__(
- self,
- text: str,
- is_user: bool = False,
- parent: Optional[QWidget] = None,
- ) -> None:
- super().__init__(parent)
- self._is_user = is_user
-
- # Style de la bulle
- if is_user:
- bg_color = styles.COLOR_BUBBLE_USER
- text_color = styles.COLOR_TEXT_ON_ACCENT
- align = Qt.AlignRight
- else:
- bg_color = styles.COLOR_BUBBLE_LEA
- text_color = styles.COLOR_TEXT
- align = Qt.AlignLeft
-
- self.setStyleSheet(f"""
- QFrame {{
- background-color: {bg_color};
- border-radius: {styles.BUBBLE_RADIUS}px;
- padding: {styles.PADDING}px;
- border: {"none" if is_user else f"1px solid {styles.COLOR_BORDER}"};
- }}
- """)
-
- layout = QVBoxLayout(self)
- layout.setContentsMargins(
- styles.PADDING, styles.PADDING // 2,
- styles.PADDING, styles.PADDING // 2,
- )
-
- label = QLabel(text)
- label.setWordWrap(True)
- label.setFont(QFont(styles.FONT_FAMILY, styles.FONT_SIZE_NORMAL))
- label.setStyleSheet(f"color: {text_color}; background: transparent; border: none;")
- label.setTextFormat(Qt.RichText)
- label.setOpenExternalLinks(True)
- layout.addWidget(label)
-
- self.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Minimum)
- self.setMaximumWidth(280)
-
-
-class ChatWidget(QWidget):
- """Widget de chat complet avec zone de messages et champ de saisie.
-
- Signals :
- message_sent(str) : emis quand l'utilisateur envoie un message
- """
-
- message_sent = pyqtSignal(str)
-
- def __init__(self, parent: Optional[QWidget] = None) -> None:
- super().__init__(parent)
- self._messages: List[dict] = []
- self._setup_ui()
-
- def _setup_ui(self) -> None:
- layout = QVBoxLayout(self)
- layout.setContentsMargins(0, 0, 0, 0)
- layout.setSpacing(0)
-
- # Zone de messages (scrollable)
- self._scroll_area = QScrollArea()
- self._scroll_area.setWidgetResizable(True)
- self._scroll_area.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
- self._scroll_area.setStyleSheet(styles.CHAT_AREA_STYLE)
-
- self._messages_container = QWidget()
- self._messages_container.setObjectName("ChatContainer")
- self._messages_layout = QVBoxLayout(self._messages_container)
- self._messages_layout.setContentsMargins(
- styles.PADDING, styles.PADDING,
- styles.PADDING, styles.PADDING,
- )
- self._messages_layout.setSpacing(styles.SPACING)
- self._messages_layout.addStretch()
-
- self._scroll_area.setWidget(self._messages_container)
- layout.addWidget(self._scroll_area, stretch=1)
-
- # Separateur
- sep = QFrame()
- sep.setFrameShape(QFrame.HLine)
- sep.setStyleSheet(f"background-color: {styles.COLOR_BORDER}; max-height: 1px;")
- layout.addWidget(sep)
-
- # Zone de saisie
- input_layout = QHBoxLayout()
- input_layout.setContentsMargins(
- styles.PADDING, styles.SPACING,
- styles.PADDING, styles.SPACING,
- )
- input_layout.setSpacing(styles.SPACING)
-
- self._input = QLineEdit()
- self._input.setObjectName("ChatInput")
- self._input.setPlaceholderText("Ecrivez un message...")
- self._input.setStyleSheet(styles.INPUT_STYLE)
- self._input.returnPressed.connect(self._on_send)
- input_layout.addWidget(self._input, stretch=1)
-
- self._send_btn = QPushButton("Envoyer")
- self._send_btn.setObjectName("SendButton")
- self._send_btn.setStyleSheet(styles.SEND_BUTTON_STYLE)
- self._send_btn.setCursor(Qt.PointingHandCursor)
- self._send_btn.clicked.connect(self._on_send)
- input_layout.addWidget(self._send_btn)
-
- layout.addLayout(input_layout)
-
- def _on_send(self) -> None:
- """Envoyer le message saisi."""
- text = self._input.text().strip()
- if not text:
- return
-
- self._input.clear()
- self.add_user_message(text)
- self.message_sent.emit(text)
-
- # ---------------------------------------------------------------------------
- # API publique
- # ---------------------------------------------------------------------------
-
- def add_user_message(self, text: str) -> None:
- """Ajouter un message utilisateur (bulle a droite)."""
- self._add_bubble(text, is_user=True)
-
- def add_lea_message(self, text: str) -> None:
- """Ajouter un message de Lea (bulle a gauche)."""
- self._add_bubble(text, is_user=False)
-
- def add_system_message(self, text: str) -> None:
- """Ajouter un message systeme (centre, discret)."""
- label = QLabel(text)
- label.setFont(QFont(styles.FONT_FAMILY, styles.FONT_SIZE_SMALL))
- label.setStyleSheet(
- f"color: {styles.COLOR_TEXT_SECONDARY}; "
- f"background: transparent; padding: 4px;"
- )
- label.setAlignment(Qt.AlignCenter)
- label.setWordWrap(True)
-
- # Inserer avant le stretch final
- count = self._messages_layout.count()
- self._messages_layout.insertWidget(count - 1, label)
- self._scroll_to_bottom()
-
- def set_input_enabled(self, enabled: bool) -> None:
- """Activer/desactiver la saisie (pendant le chargement)."""
- self._input.setEnabled(enabled)
- self._send_btn.setEnabled(enabled)
- if not enabled:
- self._input.setPlaceholderText("Lea reflechit...")
- else:
- self._input.setPlaceholderText("Ecrivez un message...")
-
- def clear_messages(self) -> None:
- """Effacer tous les messages."""
- while self._messages_layout.count() > 1:
- item = self._messages_layout.takeAt(0)
- widget = item.widget()
- if widget:
- widget.deleteLater()
- self._messages = []
-
- # ---------------------------------------------------------------------------
- # Internals
- # ---------------------------------------------------------------------------
-
- def _add_bubble(self, text: str, is_user: bool) -> None:
- """Ajouter une bulle au conteneur de messages."""
- bubble = ChatBubble(text, is_user=is_user)
-
- # Conteneur d'alignement
- row = QHBoxLayout()
- row.setContentsMargins(0, 0, 0, 0)
- if is_user:
- row.addStretch()
- row.addWidget(bubble)
- else:
- row.addWidget(bubble)
- row.addStretch()
-
- # Inserer avant le stretch final
- count = self._messages_layout.count()
- wrapper = QWidget()
- wrapper.setLayout(row)
- wrapper.setStyleSheet("background: transparent;")
- self._messages_layout.insertWidget(count - 1, wrapper)
-
- self._messages.append({"text": text, "is_user": is_user})
- self._scroll_to_bottom()
-
- def _scroll_to_bottom(self) -> None:
- """Scroller vers le bas apres l'ajout d'un message."""
- QTimer.singleShot(50, lambda: (
- self._scroll_area.verticalScrollBar().setValue(
- self._scroll_area.verticalScrollBar().maximum()
- )
- ))
diff --git a/agent_v0/lea_ui/launcher.py b/agent_v0/lea_ui/launcher.py
deleted file mode 100644
index 786b0183f..000000000
--- a/agent_v0/lea_ui/launcher.py
+++ /dev/null
@@ -1,218 +0,0 @@
-# agent_v0/lea_ui/launcher.py
-"""
-Point d'entree pour le panneau Lea.
-
-Lancement autonome :
- python -m agent_v0.lea_ui.launcher
-
-Ou integre dans agent_v0/agent_v1/main.py avec flag --ui lea.
-
-Ce module :
- 1. Cree l'application Qt
- 2. Instancie LeaServerClient
- 3. Instancie LeaMainWindow
- 4. Enregistre un raccourci global (Ctrl+Shift+L) via keyboard hook
- 5. Lance la boucle Qt
-"""
-
-from __future__ import annotations
-
-import argparse
-import logging
-import os
-import sys
-from typing import Optional
-
-logger = logging.getLogger("lea_ui.launcher")
-
-
-def _setup_logging(verbose: bool = False) -> None:
- """Configurer le logging pour le panneau Lea."""
- level = logging.DEBUG if verbose else logging.INFO
- logging.basicConfig(
- level=level,
- format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
- datefmt="%H:%M:%S",
- )
-
-
-def _setup_global_hotkey(window) -> Optional[object]:
- """Enregistrer le raccourci global Ctrl+Shift+L pour afficher/cacher le panneau.
-
- Utilise la librairie keyboard si disponible (Windows/Linux).
- Retourne le hook pour pouvoir le desinscrire a l'arret.
- """
- try:
- import keyboard
-
- def on_hotkey():
- # Appeler toggle_visibility dans le thread Qt
- from PyQt5.QtCore import QTimer
- QTimer.singleShot(0, window.toggle_visibility)
-
- keyboard.add_hotkey("ctrl+shift+l", on_hotkey)
- logger.info("Raccourci global Ctrl+Shift+L enregistre")
- return True
- except ImportError:
- logger.info(
- "Librairie 'keyboard' non disponible — "
- "raccourci global Ctrl+Shift+L non enregistre. "
- "Installez-la avec: pip install keyboard"
- )
- return None
- except Exception as e:
- logger.warning("Impossible d'enregistrer le raccourci global : %s", e)
- return None
-
-
-def _load_environment() -> None:
- """Charger les variables d'environnement depuis .env.local."""
- env_paths = [
- os.path.join(os.path.dirname(__file__), "..", "..", ".env.local"),
- os.path.join(os.path.dirname(__file__), "..", ".env.local"),
- ]
- for env_path in env_paths:
- env_path = os.path.abspath(env_path)
- if os.path.exists(env_path):
- try:
- from dotenv import load_dotenv
- load_dotenv(env_path)
- logger.info("Variables d'environnement chargees depuis %s", env_path)
- return
- except ImportError:
- # Fallback : chargement manuel
- with open(env_path, "r", encoding="utf-8") as f:
- for line in f:
- line = line.strip()
- if line and not line.startswith("#") and "=" in line:
- key, value = line.split("=", 1)
- value = value.strip("\"'")
- os.environ[key.strip()] = value
- logger.info("Variables chargees manuellement depuis %s", env_path)
- return
-
-
-def launch_lea(
- server_host: Optional[str] = None,
- chat_port: int = 5004,
- stream_port: int = 5005,
- verbose: bool = False,
- session_id: Optional[str] = None,
-) -> None:
- """Lancer le panneau Lea.
-
- Args:
- server_host: adresse du serveur Linux (None = auto-detection)
- chat_port: port du serveur chat
- stream_port: port du serveur streaming
- verbose: mode debug
- session_id: identifiant de session pour le polling replay
- """
- _setup_logging(verbose)
- _load_environment()
-
- # Import PyQt5 ici pour un message d'erreur clair si absent
- try:
- from PyQt5.QtWidgets import QApplication
- from PyQt5.QtCore import Qt
- except ImportError:
- logger.error(
- "PyQt5 n'est pas installe. Installez-le avec :\n"
- " pip install PyQt5"
- )
- sys.exit(1)
-
- from .server_client import LeaServerClient
- from .main_window import LeaMainWindow
-
- # Creer ou recuperer l'application Qt
- app = QApplication.instance()
- if app is None:
- app = QApplication(sys.argv)
- app.setQuitOnLastWindowClosed(False)
-
- # Client serveur
- client = LeaServerClient(
- server_host=server_host,
- chat_port=chat_port,
- stream_port=stream_port,
- )
-
- # Fenetre principale
- window = LeaMainWindow(server_client=client)
- window.show()
-
- # Raccourci global
- hotkey = _setup_global_hotkey(window)
-
- # Polling replay (si session_id fourni)
- if session_id:
- client.start_polling(session_id)
-
- logger.info(
- "Panneau Lea demarre — serveur=%s, chat_port=%d, stream_port=%d",
- client.server_host, chat_port, stream_port,
- )
-
- # Boucle Qt
- try:
- exit_code = app.exec_()
- finally:
- window.shutdown()
- if hotkey:
- try:
- import keyboard
- keyboard.unhook_all()
- except Exception:
- pass
-
- sys.exit(exit_code)
-
-
-def main() -> None:
- """Point d'entree CLI."""
- parser = argparse.ArgumentParser(
- description="Panneau Lea — Interface utilisateur RPA Vision V3",
- )
- parser.add_argument(
- "--server", "-s",
- dest="server_host",
- default=None,
- help="Adresse du serveur Linux (defaut: RPA_SERVER_HOST ou localhost)",
- )
- parser.add_argument(
- "--chat-port",
- type=int,
- default=5004,
- help="Port du serveur chat (defaut: 5004)",
- )
- parser.add_argument(
- "--stream-port",
- type=int,
- default=5005,
- help="Port du serveur streaming (defaut: 5005)",
- )
- parser.add_argument(
- "--session-id",
- default=None,
- help="Identifiant de session pour le polling replay",
- )
- parser.add_argument(
- "--verbose", "-v",
- action="store_true",
- help="Mode debug (logs verbeux)",
- )
-
- args = parser.parse_args()
-
- launch_lea(
- server_host=args.server_host,
- chat_port=args.chat_port,
- stream_port=args.stream_port,
- verbose=args.verbose,
- session_id=args.session_id,
- )
-
-
-if __name__ == "__main__":
- main()
diff --git a/agent_v0/lea_ui/main_window.py b/agent_v0/lea_ui/main_window.py
deleted file mode 100644
index ed5055275..000000000
--- a/agent_v0/lea_ui/main_window.py
+++ /dev/null
@@ -1,772 +0,0 @@
-# agent_v0/lea_ui/main_window.py
-"""
-Fenetre principale du panneau Lea.
-
-Panneau semi-transparent, ancre a droite de l'ecran, toujours visible.
-Peut etre reduit en mini-barre flottante (avatar + indicateur status).
-
-Sections :
- - Header : avatar "L" + status connexion
- - Zone de chat : messages entrants/sortants (natif PyQt5)
- - Zone de status : progression du replay
- - Boutons rapides : "Apprends-moi", "Que sais-tu faire ?"
-"""
-
-from __future__ import annotations
-
-import logging
-from typing import Dict, Any, Optional
-
-from PyQt5.QtCore import (
- QPoint,
- QPropertyAnimation,
- QRect,
- QSize,
- Qt,
- QTimer,
- pyqtSignal,
- pyqtSlot,
-)
-from PyQt5.QtGui import (
- QColor,
- QFont,
- QIcon,
- QKeySequence,
- QPainter,
- QPainterPath,
- QPen,
-)
-from PyQt5.QtWidgets import (
- QAction,
- QApplication,
- QDesktopWidget,
- QFrame,
- QGraphicsDropShadowEffect,
- QHBoxLayout,
- QLabel,
- QProgressBar,
- QPushButton,
- QShortcut,
- QSizePolicy,
- QVBoxLayout,
- QWidget,
-)
-
-from . import styles
-from .chat_widget import ChatWidget
-from .overlay import OverlayWidget
-from .server_client import LeaServerClient
-
-logger = logging.getLogger("lea_ui.main_window")
-
-
-class LeaAvatar(QWidget):
- """Avatar rond avec l'initiale 'L'."""
-
- def __init__(self, size: int = 40, parent: Optional[QWidget] = None) -> None:
- super().__init__(parent)
- self._size = size
- self._connected = False
- self.setFixedSize(size, size)
-
- def set_connected(self, connected: bool) -> None:
- self._connected = connected
- self.update()
-
- def paintEvent(self, event) -> None: # noqa: N802
- painter = QPainter(self)
- painter.setRenderHint(QPainter.Antialiasing, True)
-
- # Cercle de fond
- painter.setBrush(QColor(styles.COLOR_ACCENT))
- painter.setPen(Qt.NoPen)
- painter.drawEllipse(2, 2, self._size - 4, self._size - 4)
-
- # Initiale "L"
- painter.setPen(QColor(styles.COLOR_TEXT_ON_ACCENT))
- font = QFont(styles.FONT_FAMILY, self._size // 3, QFont.Bold)
- painter.setFont(font)
- painter.drawText(
- QRect(0, 0, self._size, self._size),
- Qt.AlignCenter,
- "L",
- )
-
- # Indicateur de connexion (petit cercle en bas a droite)
- indicator_size = 12
- ix = self._size - indicator_size - 1
- iy = self._size - indicator_size - 1
- indicator_color = (
- QColor(styles.COLOR_SUCCESS) if self._connected
- else QColor(styles.COLOR_ERROR)
- )
- painter.setBrush(indicator_color)
- painter.setPen(QPen(QColor(styles.COLOR_BG), 2))
- painter.drawEllipse(ix, iy, indicator_size, indicator_size)
-
- painter.end()
-
-
-class LeaMainWindow(QWidget):
- """Panneau principal de l'interface Lea.
-
- Fenetre semi-transparente, ancree a droite de l'ecran.
- Peut basculer en mode mini-barre.
- """
-
- # Signal pour les actions de replay a afficher sur l'overlay
- replay_action_received = pyqtSignal(dict)
-
- def __init__(
- self,
- server_client: Optional[LeaServerClient] = None,
- parent: Optional[QWidget] = None,
- ) -> None:
- super().__init__(parent)
-
- # Client serveur
- self._client = server_client or LeaServerClient()
-
- # Overlay de feedback
- self._overlay = OverlayWidget()
-
- # Mode courant
- self._minimized = False
-
- # Setup
- self._setup_window()
- self._setup_ui()
- self._setup_shortcuts()
- self._connect_signals()
- self._start_connection_check()
-
- # Message d'accueil
- QTimer.singleShot(500, self._show_welcome)
-
- # ---------------------------------------------------------------------------
- # Setup
- # ---------------------------------------------------------------------------
-
- def _setup_window(self) -> None:
- """Configurer les proprietes de la fenetre."""
- self.setWindowFlags(
- Qt.WindowStaysOnTopHint
- | Qt.FramelessWindowHint
- | Qt.Tool
- )
- self.setAttribute(Qt.WA_TranslucentBackground, True)
- self.setObjectName("LeaMainWindow")
-
- # Dimensions et position (ancre a droite)
- self.setFixedWidth(styles.PANEL_WIDTH)
- self.setMinimumHeight(styles.PANEL_MIN_HEIGHT)
- self._anchor_to_right()
-
- # Ombre portee
- shadow = QGraphicsDropShadowEffect()
- shadow.setBlurRadius(20)
- shadow.setColor(QColor(0, 0, 0, 60))
- shadow.setOffset(0, 4)
- self.setGraphicsEffect(shadow)
-
- def _anchor_to_right(self) -> None:
- """Positionner le panneau ancre a droite de l'ecran."""
- desktop = QApplication.desktop()
- if desktop:
- screen_rect = desktop.availableGeometry(desktop.primaryScreen())
- x = screen_rect.right() - styles.PANEL_WIDTH - 10
- y = screen_rect.top() + 40
- height = screen_rect.height() - 80
- self.setGeometry(x, y, styles.PANEL_WIDTH, height)
-
- def _setup_ui(self) -> None:
- """Construire l'interface du panneau."""
- # Conteneur principal avec fond et coins arrondis
- self._main_layout = QVBoxLayout(self)
- self._main_layout.setContentsMargins(0, 0, 0, 0)
- self._main_layout.setSpacing(0)
-
- # Widget de fond (pour appliquer le style)
- self._bg_widget = QWidget()
- self._bg_widget.setObjectName("LeaPanelBg")
- self._bg_widget.setStyleSheet(f"""
- QWidget#LeaPanelBg {{
- background-color: {styles.COLOR_BG};
- border-radius: {styles.BORDER_RADIUS}px;
- border: 1px solid {styles.COLOR_BORDER};
- }}
- """)
-
- bg_layout = QVBoxLayout(self._bg_widget)
- bg_layout.setContentsMargins(0, 0, 0, 0)
- bg_layout.setSpacing(0)
-
- # --- Header ---
- self._header = self._create_header()
- bg_layout.addWidget(self._header)
-
- # --- Chat ---
- self._chat = ChatWidget()
- bg_layout.addWidget(self._chat, stretch=1)
-
- # --- Zone de status replay ---
- self._status_bar = self._create_status_bar()
- bg_layout.addWidget(self._status_bar)
-
- # --- Boutons rapides ---
- self._quick_buttons = self._create_quick_buttons()
- bg_layout.addWidget(self._quick_buttons)
-
- self._main_layout.addWidget(self._bg_widget)
-
- # --- Mini-barre (cachee par defaut) ---
- self._mini_bar = self._create_mini_bar()
- self._mini_bar.hide()
- self._main_layout.addWidget(self._mini_bar)
-
- def _create_header(self) -> QWidget:
- """Creer le header avec avatar et status."""
- header = QWidget()
- header.setObjectName("LeaHeader")
- header.setStyleSheet(styles.HEADER_STYLE)
- header.setFixedHeight(60)
-
- layout = QHBoxLayout(header)
- layout.setContentsMargins(
- styles.PADDING, styles.SPACING,
- styles.PADDING, styles.SPACING,
- )
-
- # Avatar
- self._avatar = LeaAvatar(styles.AVATAR_SIZE)
- layout.addWidget(self._avatar)
-
- # Titre + status
- text_layout = QVBoxLayout()
- text_layout.setSpacing(2)
-
- title = QLabel("Lea")
- title.setObjectName("LeaTitle")
- title.setStyleSheet(styles.HEADER_STYLE)
- text_layout.addWidget(title)
-
- self._status_label = QLabel("Connexion...")
- self._status_label.setObjectName("LeaStatus")
- self._status_label.setStyleSheet(styles.HEADER_STYLE)
- text_layout.addWidget(self._status_label)
-
- layout.addLayout(text_layout, stretch=1)
-
- # Bouton reduire
- minimize_btn = QPushButton("_")
- minimize_btn.setFixedSize(30, 30)
- minimize_btn.setCursor(Qt.PointingHandCursor)
- minimize_btn.setStyleSheet(f"""
- QPushButton {{
- background: transparent;
- color: {styles.COLOR_TEXT_SECONDARY};
- border: none;
- border-radius: 15px;
- font-size: 16px;
- font-weight: bold;
- }}
- QPushButton:hover {{
- background-color: {styles.COLOR_BORDER};
- }}
- """)
- minimize_btn.clicked.connect(self.toggle_minimize)
- layout.addWidget(minimize_btn)
-
- return header
-
- def _create_status_bar(self) -> QWidget:
- """Creer la barre de status du replay."""
- container = QWidget()
- container.setFixedHeight(50)
- layout = QVBoxLayout(container)
- layout.setContentsMargins(
- styles.PADDING, styles.SPACING,
- styles.PADDING, styles.SPACING,
- )
- layout.setSpacing(4)
-
- self._replay_label = QLabel("")
- self._replay_label.setObjectName("StatusLabel")
- self._replay_label.setStyleSheet(styles.STATUS_LABEL_STYLE)
- self._replay_label.hide()
- layout.addWidget(self._replay_label)
-
- self._progress_bar = QProgressBar()
- self._progress_bar.setStyleSheet(styles.PROGRESS_STYLE)
- self._progress_bar.setTextVisible(False)
- self._progress_bar.hide()
- layout.addWidget(self._progress_bar)
-
- container.hide()
- self._status_container = container
- return container
-
- def _create_quick_buttons(self) -> QWidget:
- """Creer les boutons d'action rapide."""
- container = QWidget()
- layout = QHBoxLayout(container)
- layout.setContentsMargins(
- styles.PADDING, styles.SPACING,
- styles.PADDING, styles.PADDING,
- )
- layout.setSpacing(styles.SPACING)
-
- btn_learn = QPushButton("Apprends-moi")
- btn_learn.setObjectName("QuickButton")
- btn_learn.setStyleSheet(styles.QUICK_BUTTON_STYLE)
- btn_learn.setCursor(Qt.PointingHandCursor)
- btn_learn.clicked.connect(self._on_learn_clicked)
- layout.addWidget(btn_learn)
-
- btn_list = QPushButton("Que sais-tu faire ?")
- btn_list.setObjectName("QuickButton")
- btn_list.setStyleSheet(styles.QUICK_BUTTON_STYLE)
- btn_list.setCursor(Qt.PointingHandCursor)
- btn_list.clicked.connect(self._on_list_clicked)
- layout.addWidget(btn_list)
-
- return container
-
- def _create_mini_bar(self) -> QWidget:
- """Creer la mini-barre flottante (mode reduit)."""
- bar = QWidget()
- bar.setObjectName("MiniBar")
- bar.setStyleSheet(styles.MINI_BAR_STYLE)
- bar.setFixedSize(80, 50)
-
- layout = QHBoxLayout(bar)
- layout.setContentsMargins(8, 4, 8, 4)
-
- mini_avatar = LeaAvatar(32)
- self._mini_avatar = mini_avatar
- layout.addWidget(mini_avatar)
-
- expand_btn = QPushButton(">")
- expand_btn.setFixedSize(24, 24)
- expand_btn.setCursor(Qt.PointingHandCursor)
- expand_btn.setStyleSheet(f"""
- QPushButton {{
- background: transparent;
- color: {styles.COLOR_TEXT_SECONDARY};
- border: none;
- font-size: 14px;
- font-weight: bold;
- }}
- QPushButton:hover {{
- color: {styles.COLOR_ACCENT};
- }}
- """)
- expand_btn.clicked.connect(self.toggle_minimize)
- layout.addWidget(expand_btn)
-
- return bar
-
- def _setup_shortcuts(self) -> None:
- """Configurer les raccourcis globaux."""
- # Ctrl+Shift+L pour afficher/cacher
- # Note : Sur Windows, les raccourcis globaux necessitent
- # un mecanisme supplementaire (keyboard hook). Ici on utilise
- # le raccourci local qui fonctionne quand le panneau a le focus.
- # Un hook global sera ajoute dans le launcher.
- shortcut = QShortcut(QKeySequence("Ctrl+Shift+L"), self)
- shortcut.activated.connect(self.toggle_visibility)
-
- def _connect_signals(self) -> None:
- """Connecter les signaux internes."""
- # Chat
- self._chat.message_sent.connect(self._on_message_sent)
-
- # Client serveur
- self._client.set_on_connection_change(self._on_connection_changed)
- self._client.set_on_replay_action(self._on_replay_action)
-
- # Overlay
- self._overlay.action_display_finished.connect(self._on_overlay_finished)
-
- # Replay via signal (thread-safe)
- self.replay_action_received.connect(self._handle_replay_action)
-
- def _start_connection_check(self) -> None:
- """Demarrer le timer de verification de connexion."""
- self._conn_timer = QTimer(self)
- self._conn_timer.timeout.connect(self._check_connection)
- self._conn_timer.start(10000) # Toutes les 10 secondes
- # Premiere verification immediatement
- QTimer.singleShot(1000, self._check_connection)
-
- # ---------------------------------------------------------------------------
- # Actions
- # ---------------------------------------------------------------------------
-
- def _show_welcome(self) -> None:
- """Afficher le message d'accueil."""
- self._chat.add_lea_message(
- "Bonjour ! Je suis Lea, votre assistante RPA.
"
- "Je peux apprendre vos taches, les rejouer, "
- "et vous montrer ce que je fais.
"
- "Que souhaitez-vous faire ?"
- )
-
- @pyqtSlot(str)
- def _on_message_sent(self, message: str) -> None:
- """Traiter un message envoye par l'utilisateur."""
- self._chat.set_input_enabled(False)
-
- # Envoyer au serveur dans un timer pour ne pas bloquer
- QTimer.singleShot(100, lambda: self._send_to_server(message))
-
- def _send_to_server(self, message: str) -> None:
- """Envoyer le message au serveur et afficher la reponse."""
- response = self._client.send_chat_message(message)
-
- if response is None:
- self._chat.add_lea_message(
- "Je n'arrive pas a joindre le serveur. "
- "Verifiez que le serveur Linux est demarre."
- )
- elif "error" in response:
- self._chat.add_lea_message(
- f"Erreur : {response['error']}"
- )
- else:
- # Extraire la reponse textuelle
- reply_text = response.get("response", "")
- if not reply_text:
- # Construire une reponse a partir des donnees structurees
- reply_text = self._format_response(response)
-
- self._chat.add_lea_message(reply_text)
-
- # Si un workflow a ete lance, mettre a jour la status bar
- if response.get("success") and response.get("workflow"):
- self._show_replay_status(
- f"Execution : {response['workflow']}",
- 0, 1,
- )
-
- self._chat.set_input_enabled(True)
-
- def _format_response(self, data: Dict[str, Any]) -> str:
- """Formater une reponse structuree du serveur en texte lisible."""
- # Reponse de confirmation
- if data.get("needs_confirmation"):
- conf = data.get("confirmation", {})
- return (
- f"Voulez-vous que j'execute {conf.get('workflow_name', '?')} ?
"
- f"Risque : {conf.get('risk_level', 'normal')}
"
- "Repondez oui ou non."
- )
-
- # Liste de workflows
- if "workflows" in data:
- workflows = data["workflows"]
- if not workflows:
- return "Je ne connais aucun workflow pour le moment."
- items = []
- for wf in workflows[:10]:
- name = wf.get("name", wf.get("id", "?"))
- desc = wf.get("description", "")
- items.append(f"- {name}{': ' + desc if desc else ''}")
- result = "Voici ce que je sais faire :
" + "
".join(items)
- if len(workflows) > 10:
- result += f"
... et {len(workflows) - 10} autres"
- return result
-
- # Workflow non trouve
- if data.get("not_found"):
- return (
- f"Je ne trouve pas de workflow correspondant a "
- f"'{data.get('query', '?')}'.
"
- "Essayez 'Que sais-tu faire ?' pour voir la liste."
- )
-
- # Execution reussie
- if data.get("success"):
- return (
- f"C'est parti ! J'execute {data.get('workflow', '?')}.
"
- "Regardez l'ecran, je vais vous montrer ce que je fais."
- )
-
- # Confirmation/refus
- if data.get("confirmed"):
- return f"D'accord, je lance {data.get('workflow', '?')} !"
- if data.get("denied"):
- return "Pas de probleme, j'annule."
-
- # Fallback
- return str(data)
-
- def _on_learn_clicked(self) -> None:
- """Action du bouton 'Apprends-moi'."""
- self._chat.add_user_message("Apprends-moi une nouvelle tache")
- self._chat.add_lea_message(
- "D'accord ! Pour m'apprendre une tache :
"
- "1. Cliquez sur Demarrer dans le tray Agent V1
"
- "2. Effectuez votre tache normalement
"
- "3. Cliquez sur Terminer quand c'est fini
"
- "Je vais observer et apprendre automatiquement."
- )
-
- def _on_list_clicked(self) -> None:
- """Action du bouton 'Que sais-tu faire ?'."""
- self._chat.add_user_message("Que sais-tu faire ?")
- self._chat.set_input_enabled(False)
- QTimer.singleShot(100, self._fetch_workflows)
-
- def _fetch_workflows(self) -> None:
- """Recuperer et afficher la liste des workflows."""
- workflows = self._client.list_workflows()
- if workflows:
- items = []
- for wf in workflows[:15]:
- name = wf.get("name", wf.get("id", "?"))
- desc = wf.get("description", "")
- items.append(f"- {name}{': ' + desc if desc else ''}")
- text = "Voici les workflows que je connais :
" + "
".join(items)
- if len(workflows) > 15:
- text += f"
... et {len(workflows) - 15} autres"
- else:
- text = (
- "Je ne connais aucun workflow pour le moment.
"
- "Apprenez-moi une tache avec le bouton 'Apprends-moi' !"
- )
- self._chat.add_lea_message(text)
- self._chat.set_input_enabled(True)
-
- # ---------------------------------------------------------------------------
- # Connexion
- # ---------------------------------------------------------------------------
-
- def _check_connection(self) -> None:
- """Verifier la connexion au serveur (dans un timer)."""
- connected = self._client.check_connection()
- self._update_connection_ui(connected)
-
- def _on_connection_changed(self, connected: bool) -> None:
- """Callback quand l'etat de connexion change."""
- # Appeler dans le thread principal via QTimer
- QTimer.singleShot(0, lambda: self._update_connection_ui(connected))
-
- def _update_connection_ui(self, connected: bool) -> None:
- """Mettre a jour l'UI selon l'etat de connexion."""
- self._avatar.set_connected(connected)
- if hasattr(self, '_mini_avatar'):
- self._mini_avatar.set_connected(connected)
-
- if connected:
- self._status_label.setText(
- f"Connecte a {self._client.server_host}"
- )
- self._status_label.setStyleSheet(
- f"color: {styles.COLOR_SUCCESS}; "
- f"font-family: '{styles.FONT_FAMILY}'; "
- f"font-size: {styles.FONT_SIZE_SMALL}px; "
- f"background: transparent; border: none;"
- )
- else:
- error = self._client.last_error or "Serveur injoignable"
- self._status_label.setText(f"Deconnecte ({error[:30]})")
- self._status_label.setStyleSheet(
- f"color: {styles.COLOR_ERROR}; "
- f"font-family: '{styles.FONT_FAMILY}'; "
- f"font-size: {styles.FONT_SIZE_SMALL}px; "
- f"background: transparent; border: none;"
- )
-
- # ---------------------------------------------------------------------------
- # Replay & Overlay
- # ---------------------------------------------------------------------------
-
- def _on_replay_action(self, action: Dict[str, Any]) -> None:
- """Callback appelee depuis le thread de polling (pas thread-safe).
-
- Emettre un signal pour traiter dans le thread Qt.
- """
- self.replay_action_received.emit(action)
-
- @pyqtSlot(dict)
- def _handle_replay_action(self, action: Dict[str, Any]) -> None:
- """Traiter une action de replay dans le thread Qt.
-
- Afficher l'overlay AVANT l'execution pour que l'utilisateur
- voie ce qui va se passer.
- """
- action_type = action.get("type", "?")
- action_text = self._describe_action(action)
-
- # Calculer les coordonnees ecran
- desktop = QApplication.desktop()
- screen = desktop.screenGeometry(desktop.primaryScreen()) if desktop else None
- if screen:
- sw, sh = screen.width(), screen.height()
- else:
- sw, sh = 1920, 1080
-
- target_x = int(action.get("x_pct", 0.5) * sw)
- target_y = int(action.get("y_pct", 0.5) * sh)
-
- # Recuperer la progression depuis le replay status
- replay = self._client.get_replay_status()
- step_current = 0
- step_total = 0
- if replay:
- step_total = replay.get("total_actions", 0)
- step_current = replay.get("completed_actions", 0) + 1
-
- # Mettre a jour la status bar
- self._show_replay_status(action_text, step_current, step_total)
-
- # Afficher l'overlay
- self._overlay.show_action(
- target_x, target_y,
- action_text,
- step_current, step_total,
- duration_ms=1500,
- )
-
- # Ajouter dans le chat
- self._chat.add_system_message(
- f"Etape {step_current}/{step_total} : {action_text}"
- )
-
- def _describe_action(self, action: Dict[str, Any]) -> str:
- """Generer une description lisible d'une action de replay."""
- action_type = action.get("type", "?")
- target_text = action.get("target_text", "")
- target_role = action.get("target_role", "")
-
- if action_type == "click":
- target = target_text or target_role or "cet element"
- return f"Je clique sur [{target}]"
- elif action_type == "type":
- text = action.get("text", "")
- preview = text[:30] + "..." if len(text) > 30 else text
- return f"Je tape : {preview}"
- elif action_type == "key_combo":
- keys = action.get("keys", [])
- return f"Je tape : {'+'.join(keys)}"
- elif action_type == "scroll":
- return "Je fais defiler la page"
- elif action_type == "wait":
- ms = action.get("duration_ms", 500)
- return f"J'attends {ms}ms"
- else:
- return f"Action : {action_type}"
-
- def _on_overlay_finished(self) -> None:
- """Callback quand l'overlay a fini d'afficher une action."""
- pass # L'executor continue de son cote
-
- def _show_replay_status(
- self, text: str, current: int, total: int,
- ) -> None:
- """Afficher la barre de progression du replay."""
- self._status_container.show()
- self._replay_label.show()
- self._replay_label.setText(text)
-
- if total > 0:
- self._progress_bar.show()
- self._progress_bar.setMaximum(total)
- self._progress_bar.setValue(current)
- else:
- self._progress_bar.hide()
-
- def hide_replay_status(self) -> None:
- """Masquer la barre de progression du replay."""
- self._status_container.hide()
-
- # ---------------------------------------------------------------------------
- # Visibilite
- # ---------------------------------------------------------------------------
-
- def toggle_visibility(self) -> None:
- """Afficher/cacher le panneau (raccourci Ctrl+Shift+L)."""
- if self.isVisible():
- self.hide()
- else:
- self.show()
- self.raise_()
- self.activateWindow()
-
- def toggle_minimize(self) -> None:
- """Basculer entre panneau complet et mini-barre."""
- if self._minimized:
- # Restaurer
- self._mini_bar.hide()
- self._bg_widget.show()
- self._minimized = False
- self._anchor_to_right()
- else:
- # Reduire
- self._bg_widget.hide()
- self._mini_bar.show()
- self._minimized = True
- # Positionner la mini-barre en haut a droite
- desktop = QApplication.desktop()
- if desktop:
- screen = desktop.availableGeometry(desktop.primaryScreen())
- x = screen.right() - 90
- y = screen.top() + 10
- self.setGeometry(x, y, 80, 50)
-
- # ---------------------------------------------------------------------------
- # Drag (deplacer la fenetre sans barre de titre)
- # ---------------------------------------------------------------------------
-
- def mousePressEvent(self, event) -> None: # noqa: N802
- if event.button() == Qt.LeftButton:
- self._drag_pos = event.globalPos() - self.frameGeometry().topLeft()
- event.accept()
-
- def mouseMoveEvent(self, event) -> None: # noqa: N802
- if event.buttons() == Qt.LeftButton and hasattr(self, '_drag_pos'):
- self.move(event.globalPos() - self._drag_pos)
- event.accept()
-
- # ---------------------------------------------------------------------------
- # Painting (fond arrondi semi-transparent)
- # ---------------------------------------------------------------------------
-
- def paintEvent(self, event) -> None: # noqa: N802
- """Peindre le fond semi-transparent avec coins arrondis."""
- painter = QPainter(self)
- painter.setRenderHint(QPainter.Antialiasing, True)
-
- path = QPainterPath()
- path.addRoundedRect(
- 0, 0, self.width(), self.height(),
- styles.BORDER_RADIUS, styles.BORDER_RADIUS,
- )
-
- # Fond semi-transparent
- bg = QColor(styles.COLOR_BG)
- bg.setAlpha(245) # Legerement transparent
- painter.fillPath(path, bg)
-
- # Bordure
- painter.setPen(QPen(QColor(styles.COLOR_BORDER), 1))
- painter.drawPath(path)
-
- painter.end()
-
- # ---------------------------------------------------------------------------
- # Lifecycle
- # ---------------------------------------------------------------------------
-
- def closeEvent(self, event) -> None: # noqa: N802
- """Ne pas fermer, juste cacher."""
- event.ignore()
- self.hide()
-
- def shutdown(self) -> None:
- """Arret propre."""
- self._conn_timer.stop()
- self._overlay.hide_overlay()
- self._client.shutdown()
- logger.info("LeaMainWindow arretee")
diff --git a/agent_v0/lea_ui/overlay.py b/agent_v0/lea_ui/overlay.py
deleted file mode 100644
index c2e850786..000000000
--- a/agent_v0/lea_ui/overlay.py
+++ /dev/null
@@ -1,354 +0,0 @@
-# agent_v0/lea_ui/overlay.py
-"""
-Overlay de feedback visuel pour le replay.
-
-Fenetre transparente plein ecran, click-through, qui affiche :
- - Cercle rouge pulsant autour de la cible du clic
- - Texte descriptif de l'action en cours
- - Fleche pointant vers la cible
- - Barre de progression etape X/Y
-
-Le overlay ne capture JAMAIS les clics (Qt.WA_TransparentForMouseEvents).
-"""
-
-from __future__ import annotations
-
-import logging
-import math
-from typing import Optional, Tuple
-
-from PyQt5.QtCore import (
- QPoint,
- QPropertyAnimation,
- QRect,
- QRectF,
- QSize,
- Qt,
- QTimer,
- pyqtProperty,
- pyqtSignal,
-)
-from PyQt5.QtGui import (
- QBrush,
- QColor,
- QFont,
- QFontMetrics,
- QPainter,
- QPainterPath,
- QPen,
- QPolygonF,
-)
-from PyQt5.QtWidgets import QApplication, QDesktopWidget, QWidget
-
-from . import styles
-
-logger = logging.getLogger("lea_ui.overlay")
-
-
-class OverlayWidget(QWidget):
- """Overlay plein ecran transparent pour le feedback visuel du replay.
-
- Flags critiques :
- - WindowStaysOnTopHint : toujours au-dessus
- - FramelessWindowHint : pas de decoration
- - Tool : n'apparait pas dans la barre des taches
- - WA_TranslucentBackground : fond transparent
- - WA_TransparentForMouseEvents : CLICK-THROUGH COMPLET
- """
-
- # Signal emis quand l'animation d'une action est terminee
- action_display_finished = pyqtSignal()
-
- def __init__(self, parent: Optional[QWidget] = None) -> None:
- super().__init__(parent)
-
- # Flags de fenetre pour click-through complet
- self.setWindowFlags(
- Qt.WindowStaysOnTopHint
- | Qt.FramelessWindowHint
- | Qt.Tool
- )
- self.setAttribute(Qt.WA_TranslucentBackground, True)
- self.setAttribute(Qt.WA_TransparentForMouseEvents, True)
-
- # Etat de l'affichage
- self._target_pos: Optional[Tuple[int, int]] = None
- self._action_text: str = ""
- self._progress_current: int = 0
- self._progress_total: int = 0
- self._action_done: bool = False
- self._visible = False
-
- # Animation du cercle pulsant
- self._pulse_radius: float = 30.0
- self._pulse_growing = True
- self._pulse_opacity: float = 0.8
-
- # Timer d'animation
- self._anim_timer = QTimer(self)
- self._anim_timer.timeout.connect(self._animate_pulse)
- self._anim_timer.setInterval(30) # ~33 FPS
-
- # Timer d'effacement automatique
- self._fade_timer = QTimer(self)
- self._fade_timer.setSingleShot(True)
- self._fade_timer.timeout.connect(self._on_fade)
-
- # Couvrir tout l'ecran
- self._update_geometry()
-
- def _update_geometry(self) -> None:
- """Positionner l'overlay sur tout l'ecran principal."""
- desktop = QApplication.desktop()
- if desktop:
- screen_rect = desktop.screenGeometry(desktop.primaryScreen())
- self.setGeometry(screen_rect)
-
- # ---------------------------------------------------------------------------
- # API publique
- # ---------------------------------------------------------------------------
-
- def show_action(
- self,
- target_x: int,
- target_y: int,
- text: str,
- step_current: int = 0,
- step_total: int = 0,
- duration_ms: int = 1500,
- ) -> None:
- """Afficher le feedback pour une action de replay.
-
- Args:
- target_x: position X du clic cible (pixels ecran)
- target_y: position Y du clic cible (pixels ecran)
- text: description de l'action (ex: "Je clique sur [Valider]")
- step_current: etape courante (1-indexed)
- step_total: nombre total d'etapes
- duration_ms: duree d'affichage en ms (defaut 1500ms)
- """
- self._target_pos = (target_x, target_y)
- self._action_text = text
- self._progress_current = step_current
- self._progress_total = step_total
- self._action_done = False
- self._pulse_radius = 30.0
- self._pulse_opacity = 0.8
- self._visible = True
-
- self._update_geometry()
- self.show()
- self.raise_()
- self._anim_timer.start()
-
- # Programmer l'effacement
- self._fade_timer.start(duration_ms)
- self.update()
-
- def show_done(self, text: Optional[str] = None) -> None:
- """Marquer l'action courante comme terminee (coche verte)."""
- self._action_done = True
- if text:
- self._action_text = text
- self.update()
-
- # Effacer apres 800ms
- self._fade_timer.start(800)
-
- def hide_overlay(self) -> None:
- """Masquer immediatement l'overlay."""
- self._anim_timer.stop()
- self._fade_timer.stop()
- self._visible = False
- self._target_pos = None
- self.hide()
-
- # ---------------------------------------------------------------------------
- # Animations
- # ---------------------------------------------------------------------------
-
- def _animate_pulse(self) -> None:
- """Animer le cercle pulsant."""
- if self._action_done:
- # Pas d'animation en mode "done"
- return
-
- pulse_speed = 0.8
- if self._pulse_growing:
- self._pulse_radius += pulse_speed
- if self._pulse_radius >= 45.0:
- self._pulse_growing = False
- else:
- self._pulse_radius -= pulse_speed
- if self._pulse_radius <= 25.0:
- self._pulse_growing = True
-
- # Opacite qui suit le pulse
- self._pulse_opacity = 0.5 + 0.3 * (
- (self._pulse_radius - 25.0) / 20.0
- )
-
- self.update()
-
- def _on_fade(self) -> None:
- """Callback apres le timer d'effacement."""
- self._anim_timer.stop()
- self._visible = False
- self._target_pos = None
- self.hide()
- self.action_display_finished.emit()
-
- # ---------------------------------------------------------------------------
- # Rendu
- # ---------------------------------------------------------------------------
-
- def paintEvent(self, event) -> None: # noqa: N802
- """Dessiner l'overlay."""
- if not self._visible or not self._target_pos:
- return
-
- painter = QPainter(self)
- painter.setRenderHint(QPainter.Antialiasing, True)
-
- tx, ty = self._target_pos
-
- if self._action_done:
- self._draw_done_indicator(painter, tx, ty)
- else:
- self._draw_pulse_circle(painter, tx, ty)
- self._draw_arrow(painter, tx, ty)
-
- self._draw_action_text(painter, tx, ty)
- self._draw_progress_bar(painter)
-
- painter.end()
-
- def _draw_pulse_circle(self, painter: QPainter, cx: int, cy: int) -> None:
- """Dessiner le cercle rouge pulsant autour de la cible."""
- # Cercle exterieur (pulsant, semi-transparent)
- color = QColor(styles.COLOR_OVERLAY_PULSE)
- color.setAlphaF(self._pulse_opacity * 0.4)
- painter.setBrush(QBrush(color))
- painter.setPen(Qt.NoPen)
- painter.drawEllipse(
- QPoint(cx, cy),
- int(self._pulse_radius),
- int(self._pulse_radius),
- )
-
- # Cercle interieur (fixe, plus opaque)
- color_inner = QColor(styles.COLOR_OVERLAY_PULSE)
- color_inner.setAlphaF(0.7)
- pen = QPen(color_inner, 3)
- painter.setPen(pen)
- painter.setBrush(Qt.NoBrush)
- painter.drawEllipse(QPoint(cx, cy), 20, 20)
-
- # Point central
- painter.setPen(Qt.NoPen)
- painter.setBrush(QBrush(QColor(styles.COLOR_OVERLAY_PULSE)))
- painter.drawEllipse(QPoint(cx, cy), 4, 4)
-
- def _draw_done_indicator(self, painter: QPainter, cx: int, cy: int) -> None:
- """Dessiner l'indicateur de succes (cercle vert + coche)."""
- # Cercle vert
- color = QColor(styles.COLOR_SUCCESS)
- color.setAlphaF(0.8)
- painter.setBrush(QBrush(color))
- painter.setPen(Qt.NoPen)
- painter.drawEllipse(QPoint(cx, cy), 25, 25)
-
- # Coche blanche
- pen = QPen(QColor(styles.COLOR_TEXT_ON_ACCENT), 3)
- pen.setCapStyle(Qt.RoundCap)
- pen.setJoinStyle(Qt.RoundJoin)
- painter.setPen(pen)
- painter.setBrush(Qt.NoBrush)
-
- path = QPainterPath()
- path.moveTo(cx - 10, cy)
- path.lineTo(cx - 3, cy + 8)
- path.lineTo(cx + 12, cy - 8)
- painter.drawPath(path)
-
- def _draw_arrow(self, painter: QPainter, tx: int, ty: int) -> None:
- """Dessiner une fleche pointant vers la cible depuis le texte."""
- # Position du texte (au-dessus ou en dessous selon l'espace)
- text_y = ty - 80 if ty > 120 else ty + 80
- text_x = max(100, min(tx, self.width() - 200))
-
- # Ligne de la fleche
- color = QColor(styles.COLOR_OVERLAY_PULSE)
- color.setAlphaF(0.6)
- pen = QPen(color, 2, Qt.DashLine)
- painter.setPen(pen)
- painter.drawLine(text_x, text_y + (15 if text_y < ty else -15), tx, ty)
-
- def _draw_action_text(self, painter: QPainter, tx: int, ty: int) -> None:
- """Dessiner le texte descriptif de l'action."""
- if not self._action_text:
- return
-
- # Positionner le texte au-dessus ou en dessous de la cible
- text_y = ty - 90 if ty > 140 else ty + 70
-
- font = QFont(styles.FONT_FAMILY, styles.FONT_SIZE_LARGE, QFont.Bold)
- painter.setFont(font)
- metrics = QFontMetrics(font)
-
- # Mesurer le texte
- text_rect = metrics.boundingRect(self._action_text)
- text_width = text_rect.width() + 30
- text_height = text_rect.height() + 16
-
- # Centrer horizontalement sur la cible (avec limites d'ecran)
- box_x = max(10, min(tx - text_width // 2, self.width() - text_width - 10))
- box_y = text_y - text_height // 2
-
- # Fond semi-transparent arrondi
- bg_color = QColor(31, 41, 55, 200) # Gris fonce semi-transparent
- painter.setBrush(QBrush(bg_color))
- painter.setPen(Qt.NoPen)
- painter.drawRoundedRect(box_x, box_y, text_width, text_height, 8, 8)
-
- # Texte blanc
- painter.setPen(QPen(QColor(styles.COLOR_OVERLAY_TEXT)))
- painter.drawText(
- QRect(box_x, box_y, text_width, text_height),
- Qt.AlignCenter,
- self._action_text,
- )
-
- def _draw_progress_bar(self, painter: QPainter) -> None:
- """Dessiner la barre de progression en bas de l'ecran."""
- if self._progress_total <= 0:
- return
-
- bar_width = 300
- bar_height = 6
- bar_x = (self.width() - bar_width) // 2
- bar_y = self.height() - 50
-
- # Fond
- bg_color = QColor(255, 255, 255, 80)
- painter.setBrush(QBrush(bg_color))
- painter.setPen(Qt.NoPen)
- painter.drawRoundedRect(bar_x, bar_y, bar_width, bar_height, 3, 3)
-
- # Progression
- progress_pct = self._progress_current / self._progress_total
- fill_width = int(bar_width * progress_pct)
- accent_color = QColor(styles.COLOR_ACCENT)
- accent_color.setAlphaF(0.9)
- painter.setBrush(QBrush(accent_color))
- painter.drawRoundedRect(bar_x, bar_y, fill_width, bar_height, 3, 3)
-
- # Label "Etape X/Y"
- label_font = QFont(styles.FONT_FAMILY, styles.FONT_SIZE_SMALL)
- painter.setFont(label_font)
- painter.setPen(QPen(QColor(255, 255, 255, 200)))
- painter.drawText(
- QRect(bar_x, bar_y + bar_height + 4, bar_width, 20),
- Qt.AlignCenter,
- f"Etape {self._progress_current}/{self._progress_total}",
- )
diff --git a/agent_v0/lea_ui/replay_integration.py b/agent_v0/lea_ui/replay_integration.py
deleted file mode 100644
index da29ce418..000000000
--- a/agent_v0/lea_ui/replay_integration.py
+++ /dev/null
@@ -1,191 +0,0 @@
-# agent_v0/lea_ui/replay_integration.py
-"""
-Integration du feedback visuel (overlay) dans la boucle de replay de l'Agent V1.
-
-Ce module fournit un wrapper autour de ActionExecutorV1.execute_replay_action
-qui affiche l'overlay AVANT chaque action et la marque comme terminee APRES.
-
-Sequence pour chaque action :
- 1. Afficher l'overlay avec la description de l'action (1.5s)
- 2. Attendre que l'overlay ait ete vu par l'utilisateur
- 3. Executer l'action
- 4. Mettre a jour l'overlay (coche verte)
- 5. Passer a l'action suivante
-"""
-
-from __future__ import annotations
-
-import logging
-import time
-from typing import Any, Callable, Dict, Optional, Tuple
-
-logger = logging.getLogger("lea_ui.replay_integration")
-
-# Delai d'affichage de l'overlay avant execution (secondes)
-PRE_ACTION_DELAY = 1.5
-# Delai apres la coche verte (secondes)
-POST_ACTION_DELAY = 0.5
-
-
-class ReplayOverlayBridge:
- """Pont entre la boucle de replay et l'overlay.
-
- Fonctionne de maniere thread-safe : la boucle de replay tourne dans
- un thread daemon, et l'overlay est controle via des signaux Qt.
-
- L'overlay est optionnel — si non connecte, l'execution continue normalement.
- """
-
- def __init__(self) -> None:
- self._overlay = None
- self._show_callback: Optional[Callable] = None
- self._done_callback: Optional[Callable] = None
- self._hide_callback: Optional[Callable] = None
- self._enabled = False
-
- # Compteur de progression
- self._step_current = 0
- self._step_total = 0
-
- def connect_overlay(
- self,
- show_fn: Callable[[int, int, str, int, int, int], None],
- done_fn: Callable[[Optional[str]], None],
- hide_fn: Callable[[], None],
- ) -> None:
- """Connecter les callbacks de l'overlay.
-
- Args:
- show_fn: overlay.show_action(target_x, target_y, text, step, total, duration_ms)
- done_fn: overlay.show_done(text)
- hide_fn: overlay.hide_overlay()
- """
- self._show_callback = show_fn
- self._done_callback = done_fn
- self._hide_callback = hide_fn
- self._enabled = True
- logger.info("Overlay connecte au bridge de replay")
-
- def disconnect_overlay(self) -> None:
- """Deconnecter l'overlay."""
- self._show_callback = None
- self._done_callback = None
- self._hide_callback = None
- self._enabled = False
-
- def set_total_steps(self, total: int) -> None:
- """Definir le nombre total d'etapes du replay."""
- self._step_total = total
- self._step_current = 0
-
- def wrap_execute(
- self,
- action: Dict[str, Any],
- executor_fn: Callable[[Dict[str, Any]], Dict[str, Any]],
- screen_width: int = 1920,
- screen_height: int = 1080,
- ) -> Dict[str, Any]:
- """Wrapper autour de l'execution d'une action avec feedback overlay.
-
- Args:
- action: action normalisee (type, x_pct, y_pct, text, keys, ...)
- executor_fn: fonction d'execution (ex: ActionExecutorV1.execute_replay_action)
- screen_width: largeur de l'ecran en pixels
- screen_height: hauteur de l'ecran en pixels
-
- Returns:
- Resultat de l'execution (dict avec success, error, screenshot, ...)
- """
- self._step_current += 1
-
- if not self._enabled or not self._show_callback:
- # Pas d'overlay — execution directe
- return executor_fn(action)
-
- # --- 1. Afficher l'overlay ---
- action_text = self._describe_action(action)
- target_x, target_y = self._get_target_coords(action, screen_width, screen_height)
-
- try:
- self._show_callback(
- target_x, target_y,
- action_text,
- self._step_current,
- self._step_total,
- int(PRE_ACTION_DELAY * 1000),
- )
- except Exception as e:
- logger.warning("Erreur affichage overlay : %s", e)
-
- # --- 2. Attendre que l'utilisateur ait vu ---
- time.sleep(PRE_ACTION_DELAY)
-
- # --- 3. Executer l'action ---
- result = executor_fn(action)
-
- # --- 4. Marquer comme terminee ---
- if result.get("success"):
- done_text = f"{action_text} OK"
- else:
- done_text = f"{action_text} ECHEC"
-
- try:
- if self._done_callback:
- self._done_callback(done_text)
- except Exception as e:
- logger.warning("Erreur overlay done : %s", e)
-
- time.sleep(POST_ACTION_DELAY)
-
- # --- 5. Cacher si c'etait la derniere etape ---
- if self._step_current >= self._step_total and self._hide_callback:
- try:
- self._hide_callback()
- except Exception:
- pass
-
- return result
-
- def _describe_action(self, action: Dict[str, Any]) -> str:
- """Generer une description lisible d'une action."""
- action_type = action.get("type", "?")
- target_text = action.get("target_text", "")
- target_role = action.get("target_role", "")
-
- if action_type == "click":
- target = target_text or target_role or "cet element"
- return f"Je clique sur [{target}]"
- elif action_type == "type":
- text = action.get("text", "")
- preview = text[:25] + "..." if len(text) > 25 else text
- return f"Je tape : {preview}"
- elif action_type == "key_combo":
- keys = action.get("keys", [])
- return f"Combinaison : {'+'.join(keys)}"
- elif action_type == "scroll":
- return "Defilement"
- elif action_type == "wait":
- ms = action.get("duration_ms", 500)
- return f"Attente {ms}ms"
- else:
- return f"Action : {action_type}"
-
- def _get_target_coords(
- self, action: Dict[str, Any], sw: int, sh: int,
- ) -> Tuple[int, int]:
- """Calculer les coordonnees cible en pixels."""
- x_pct = action.get("x_pct", 0.5)
- y_pct = action.get("y_pct", 0.5)
- return int(x_pct * sw), int(y_pct * sh)
-
-
-# Instance globale (singleton) pour l'integration
-_bridge: Optional[ReplayOverlayBridge] = None
-
-
-def get_replay_bridge() -> ReplayOverlayBridge:
- """Obtenir l'instance globale du bridge overlay/replay."""
- global _bridge
- if _bridge is None:
- _bridge = ReplayOverlayBridge()
- return _bridge
diff --git a/agent_v0/lea_ui/styles.py b/agent_v0/lea_ui/styles.py
deleted file mode 100644
index 524c56856..000000000
--- a/agent_v0/lea_ui/styles.py
+++ /dev/null
@@ -1,200 +0,0 @@
-# agent_v0/lea_ui/styles.py
-"""
-Theme et couleurs pour l'interface Lea.
-
-Palette douce et moderne, pensee pour ne pas fatiguer les yeux
-lors d'une utilisation prolongee sur un poste de travail Windows.
-"""
-
-# ---------------------------------------------------------------------------
-# Palette de couleurs
-# ---------------------------------------------------------------------------
-
-# Fond principal
-COLOR_BG = "#F5F7FA"
-# Fond secondaire (sidebar, header)
-COLOR_BG_SECONDARY = "#EEF1F6"
-# Fond des bulles utilisateur
-COLOR_BUBBLE_USER = "#6366F1"
-# Fond des bulles Lea
-COLOR_BUBBLE_LEA = "#FFFFFF"
-# Accent principal (indigo)
-COLOR_ACCENT = "#6366F1"
-# Accent hover
-COLOR_ACCENT_HOVER = "#4F46E5"
-# Texte principal
-COLOR_TEXT = "#1F2937"
-# Texte secondaire
-COLOR_TEXT_SECONDARY = "#6B7280"
-# Texte sur accent (blanc)
-COLOR_TEXT_ON_ACCENT = "#FFFFFF"
-# Bordure legere
-COLOR_BORDER = "#E5E7EB"
-# Succes (vert)
-COLOR_SUCCESS = "#10B981"
-# Erreur (rouge)
-COLOR_ERROR = "#EF4444"
-# Avertissement (orange)
-COLOR_WARNING = "#F59E0B"
-# Overlay rouge pulsant
-COLOR_OVERLAY_PULSE = "#EF4444"
-# Overlay texte
-COLOR_OVERLAY_TEXT = "#FFFFFF"
-# Overlay fond info
-COLOR_OVERLAY_INFO_BG = "rgba(31, 41, 55, 200)"
-
-# ---------------------------------------------------------------------------
-# Typographie
-# ---------------------------------------------------------------------------
-
-FONT_FAMILY = "Segoe UI"
-FONT_SIZE_SMALL = 11
-FONT_SIZE_NORMAL = 13
-FONT_SIZE_LARGE = 15
-FONT_SIZE_TITLE = 18
-
-# ---------------------------------------------------------------------------
-# Dimensions
-# ---------------------------------------------------------------------------
-
-# Largeur du panneau Lea
-PANEL_WIDTH = 380
-# Hauteur minimale
-PANEL_MIN_HEIGHT = 500
-# Rayon des coins arrondis
-BORDER_RADIUS = 12
-# Rayon des bulles de chat
-BUBBLE_RADIUS = 16
-# Padding interne
-PADDING = 12
-# Taille de l'avatar
-AVATAR_SIZE = 40
-# Marge entre les elements
-SPACING = 8
-
-# ---------------------------------------------------------------------------
-# Stylesheet global du panneau Lea
-# ---------------------------------------------------------------------------
-
-MAIN_WINDOW_STYLE = f"""
- QWidget#LeaMainWindow {{
- background-color: {COLOR_BG};
- border-radius: {BORDER_RADIUS}px;
- border: 1px solid {COLOR_BORDER};
- }}
-"""
-
-HEADER_STYLE = f"""
- QWidget#LeaHeader {{
- background-color: {COLOR_BG_SECONDARY};
- border-top-left-radius: {BORDER_RADIUS}px;
- border-top-right-radius: {BORDER_RADIUS}px;
- border-bottom: 1px solid {COLOR_BORDER};
- }}
- QLabel#LeaTitle {{
- color: {COLOR_TEXT};
- font-family: "{FONT_FAMILY}";
- font-size: {FONT_SIZE_TITLE}px;
- font-weight: bold;
- }}
- QLabel#LeaStatus {{
- color: {COLOR_TEXT_SECONDARY};
- font-family: "{FONT_FAMILY}";
- font-size: {FONT_SIZE_SMALL}px;
- }}
-"""
-
-CHAT_AREA_STYLE = f"""
- QScrollArea {{
- border: none;
- background-color: {COLOR_BG};
- }}
- QWidget#ChatContainer {{
- background-color: {COLOR_BG};
- }}
-"""
-
-INPUT_STYLE = f"""
- QLineEdit#ChatInput {{
- background-color: {COLOR_BUBBLE_LEA};
- border: 1px solid {COLOR_BORDER};
- border-radius: 20px;
- padding: 8px 16px;
- font-family: "{FONT_FAMILY}";
- font-size: {FONT_SIZE_NORMAL}px;
- color: {COLOR_TEXT};
- }}
- QLineEdit#ChatInput:focus {{
- border-color: {COLOR_ACCENT};
- }}
-"""
-
-SEND_BUTTON_STYLE = f"""
- QPushButton#SendButton {{
- background-color: {COLOR_ACCENT};
- color: {COLOR_TEXT_ON_ACCENT};
- border: none;
- border-radius: 20px;
- padding: 8px 16px;
- font-family: "{FONT_FAMILY}";
- font-size: {FONT_SIZE_NORMAL}px;
- font-weight: bold;
- min-width: 50px;
- }}
- QPushButton#SendButton:hover {{
- background-color: {COLOR_ACCENT_HOVER};
- }}
- QPushButton#SendButton:pressed {{
- background-color: #3730A3;
- }}
-"""
-
-QUICK_BUTTON_STYLE = f"""
- QPushButton#QuickButton {{
- background-color: {COLOR_BUBBLE_LEA};
- color: {COLOR_ACCENT};
- border: 1px solid {COLOR_ACCENT};
- border-radius: 18px;
- padding: 6px 14px;
- font-family: "{FONT_FAMILY}";
- font-size: {FONT_SIZE_SMALL}px;
- }}
- QPushButton#QuickButton:hover {{
- background-color: {COLOR_ACCENT};
- color: {COLOR_TEXT_ON_ACCENT};
- }}
-"""
-
-PROGRESS_STYLE = f"""
- QProgressBar {{
- border: none;
- border-radius: 4px;
- background-color: {COLOR_BORDER};
- text-align: center;
- font-family: "{FONT_FAMILY}";
- font-size: {FONT_SIZE_SMALL}px;
- color: {COLOR_TEXT};
- max-height: 8px;
- }}
- QProgressBar::chunk {{
- background-color: {COLOR_ACCENT};
- border-radius: 4px;
- }}
-"""
-
-STATUS_LABEL_STYLE = f"""
- QLabel#StatusLabel {{
- color: {COLOR_TEXT_SECONDARY};
- font-family: "{FONT_FAMILY}";
- font-size: {FONT_SIZE_SMALL}px;
- padding: 4px 8px;
- }}
-"""
-
-MINI_BAR_STYLE = f"""
- QWidget#MiniBar {{
- background-color: {COLOR_BG_SECONDARY};
- border-radius: 20px;
- border: 1px solid {COLOR_BORDER};
- }}
-"""
diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py
index 18c6208cb..c6dd773f2 100644
--- a/agent_v0/server_v1/api_stream.py
+++ b/agent_v0/server_v1/api_stream.py
@@ -3326,21 +3326,13 @@ def _vlm_quick_find(
# Résolution Set-of-Mark : SomEngine (détection) + VLM (identification)
# ---------------------------------------------------------------------------
-_som_engine_api = None # Singleton
-
-
def _get_som_engine_api():
- """Singleton SomEngine pour la résolution visuelle (lazy-loaded, GPU)."""
- global _som_engine_api
- if _som_engine_api is None:
- try:
- from core.detection.som_engine import SomEngine
- _som_engine_api = SomEngine(device="cuda")
- logger.info("SomEngine API initialisé (lazy singleton)")
- except Exception as e:
- logger.warning("SomEngine API non disponible : %s", e)
- _som_engine_api = False
- return _som_engine_api if _som_engine_api is not False else None
+ """Singleton SomEngine partagé."""
+ try:
+ from core.detection.som_engine import get_shared_engine
+ return get_shared_engine()
+ except ImportError:
+ return None
def _resolve_by_som(
@@ -3423,7 +3415,7 @@ def _resolve_by_som(
if not exact_matches:
exact_matches = [
e for e in som_result.elements
- if e.label and (
+ if e.label and len(e.label) >= 3 and (
label_lower in e.label.lower()
or e.label.lower() in label_lower
)
@@ -3493,6 +3485,10 @@ def _resolve_by_som(
)
# ── 3. Sauvegarder l'image annotée SoM temporairement ──
+ if som_result.som_image is None:
+ logger.debug("SoM resolve : pas d'image annotée, skip VLM")
+ return None
+
import tempfile
try:
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
diff --git a/agent_v0/server_v1/stream_processor.py b/agent_v0/server_v1/stream_processor.py
index 6f1afd105..234abaa2e 100644
--- a/agent_v0/server_v1/stream_processor.py
+++ b/agent_v0/server_v1/stream_processor.py
@@ -431,21 +431,17 @@ def _needs_post_wait(action: dict) -> int:
# SomEngine — enrichissement Set-of-Mark des clics pendant le build_replay
# ---------------------------------------------------------------------------
-_som_engine = None # Singleton, chargé à la demande
+_som_cache: Dict[str, Any] = {} # screenshot_id -> SomResult (cache build_replay)
+_SOM_CACHE_MAX = 50
def _get_som_engine():
- """Singleton SomEngine (lazy-loaded, GPU)."""
- global _som_engine
- if _som_engine is None:
- try:
- from core.detection.som_engine import SomEngine
- _som_engine = SomEngine(device="cuda")
- logger.info("SomEngine initialisé (lazy singleton)")
- except Exception as e:
- logger.warning("SomEngine non disponible : %s", e)
- _som_engine = False # Marqueur "indisponible"
- return _som_engine if _som_engine is not False else None
+ """Singleton SomEngine partagé."""
+ try:
+ from core.detection.som_engine import get_shared_engine
+ return get_shared_engine()
+ except ImportError:
+ return None
def _som_identify_clicked_element(
@@ -486,19 +482,38 @@ def _som_identify_clicked_element(
if not full_path.is_file():
return None
- try:
- from PIL import Image
- img = Image.open(full_path).convert("RGB")
- except Exception as e:
- logger.debug("SoM: impossible de charger %s : %s", full_path, e)
- return None
+ # Vérifier le cache SomResult par (session_dir, screenshot_id)
+ cache_key = f"{session_dir}:{screenshot_id}"
+ if cache_key in _som_cache:
+ result = _som_cache[cache_key]
+ else:
+ try:
+ from PIL import Image
+ img = Image.open(full_path).convert("RGB")
+ except Exception as e:
+ logger.debug("SoM: impossible de charger %s : %s", full_path, e)
+ return None
- # Lancer SomEngine
- try:
- result = engine.analyze(img)
- except Exception as e:
- logger.warning("SoM: erreur d'analyse : %s", e)
- return None
+ # Lancer SomEngine
+ try:
+ result = engine.analyze(img)
+ except Exception as e:
+ logger.warning("SoM: erreur d'analyse : %s", e)
+ return None
+
+ # Stocker dans le cache (éléments seulement, pas l'image annotée)
+ from core.detection.som_engine import SomResult
+ cached = SomResult(
+ elements=result.elements,
+ width=result.width,
+ height=result.height,
+ analysis_time_ms=result.analysis_time_ms,
+ )
+ if len(_som_cache) >= _SOM_CACHE_MAX:
+ # Supprimer la plus ancienne entrée (FIFO)
+ oldest_key = next(iter(_som_cache))
+ del _som_cache[oldest_key]
+ _som_cache[cache_key] = cached
if not result.elements:
return None
diff --git a/agent_v0/window_info.py b/agent_v0/window_info.py
deleted file mode 100644
index 7e6be8744..000000000
--- a/agent_v0/window_info.py
+++ /dev/null
@@ -1,55 +0,0 @@
-# window_info.py
-"""
-Récupération des informations sur la fenêtre active (X11).
-
-v0 :
-- utilise xdotool pour obtenir :
- - le titre de la fenêtre active
- - le PID de la fenêtre active, puis le nom du process via ps
-
-Si quelque chose ne fonctionne pas, on renvoie des valeurs "unknown".
-"""
-
-from __future__ import annotations
-
-import subprocess
-from typing import Dict, Optional
-
-
-def _run_cmd(cmd: list[str]) -> Optional[str]:
- """Exécute une commande et renvoie la sortie texte (strippée), ou None en cas d'erreur."""
- try:
- out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
- return out.decode("utf-8", errors="ignore").strip()
- except Exception:
- return None
-
-
-def get_active_window_info() -> Dict[str, str]:
- """
- Renvoie un dict :
- {
- "title": "...",
- "app_name": "..."
- }
-
- Nécessite xdotool installé sur le système.
- """
- title = _run_cmd(["xdotool", "getactivewindow", "getwindowname"])
- pid_str = _run_cmd(["xdotool", "getactivewindow", "getwindowpid"])
-
- app_name: Optional[str] = None
- if pid_str:
- pid_str = pid_str.strip()
- # On récupère le nom du binaire via ps
- app_name = _run_cmd(["ps", "-p", pid_str, "-o", "comm="])
-
- if not title:
- title = "unknown_window"
- if not app_name:
- app_name = "unknown_app"
-
- return {
- "title": title,
- "app_name": app_name,
- }
diff --git a/agent_v0/window_info_crossplatform.py b/agent_v0/window_info_crossplatform.py
deleted file mode 100644
index ba059a3fc..000000000
--- a/agent_v0/window_info_crossplatform.py
+++ /dev/null
@@ -1,192 +0,0 @@
-# window_info_crossplatform.py
-"""
-Récupération des informations sur la fenêtre active - CROSS-PLATFORM
-
-Supporte:
-- Linux (X11 via xdotool)
-- Windows (via pywin32)
-- macOS (via pyobjc)
-
-Installation des dépendances:
- pip install pywin32 # Windows
- pip install pyobjc-framework-Cocoa # macOS
- pip install psutil # Tous OS
-"""
-
-from __future__ import annotations
-
-import platform
-import subprocess
-from typing import Dict, Optional
-
-
-def _run_cmd(cmd: list[str]) -> Optional[str]:
- """Exécute une commande et renvoie la sortie texte (strippée), ou None en cas d'erreur."""
- try:
- out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
- return out.decode("utf-8", errors="ignore").strip()
- except Exception:
- return None
-
-
-def get_active_window_info() -> Dict[str, str]:
- """
- Renvoie un dict :
- {
- "title": "...",
- "app_name": "..."
- }
-
- Détecte automatiquement l'OS et utilise la méthode appropriée.
- """
- system = platform.system()
-
- if system == "Linux":
- return _get_window_info_linux()
- elif system == "Windows":
- return _get_window_info_windows()
- elif system == "Darwin": # macOS
- return _get_window_info_macos()
- else:
- return {"title": "unknown_window", "app_name": "unknown_app"}
-
-
-def _get_window_info_linux() -> Dict[str, str]:
- """
- Linux: utilise xdotool (X11)
-
- Nécessite: sudo apt-get install xdotool
- """
- title = _run_cmd(["xdotool", "getactivewindow", "getwindowname"])
- pid_str = _run_cmd(["xdotool", "getactivewindow", "getwindowpid"])
-
- app_name: Optional[str] = None
- if pid_str:
- pid_str = pid_str.strip()
- # On récupère le nom du binaire via ps
- app_name = _run_cmd(["ps", "-p", pid_str, "-o", "comm="])
-
- if not title:
- title = "unknown_window"
- if not app_name:
- app_name = "unknown_app"
-
- return {
- "title": title,
- "app_name": app_name,
- }
-
-
-def _get_window_info_windows() -> Dict[str, str]:
- """
- Windows: utilise pywin32 + psutil
-
- Nécessite: pip install pywin32 psutil
- """
- try:
- import win32gui
- import win32process
- import psutil
-
- # Fenêtre au premier plan
- hwnd = win32gui.GetForegroundWindow()
-
- # Titre de la fenêtre
- title = win32gui.GetWindowText(hwnd)
- if not title:
- title = "unknown_window"
-
- # PID du processus
- _, pid = win32process.GetWindowThreadProcessId(hwnd)
-
- # Nom du processus
- try:
- process = psutil.Process(pid)
- app_name = process.name()
- except (psutil.NoSuchProcess, psutil.AccessDenied):
- app_name = "unknown_app"
-
- return {
- "title": title,
- "app_name": app_name,
- }
-
- except ImportError:
- # pywin32 ou psutil non installé
- return {
- "title": "unknown_window (pywin32 missing)",
- "app_name": "unknown_app (pywin32 missing)",
- }
- except Exception as e:
- return {
- "title": f"error: {e}",
- "app_name": "unknown_app",
- }
-
-
-def _get_window_info_macos() -> Dict[str, str]:
- """
- macOS: utilise pyobjc (AppKit)
-
- Nécessite: pip install pyobjc-framework-Cocoa
-
- Note: Nécessite les permissions "Accessibility" dans System Preferences
- """
- try:
- from AppKit import NSWorkspace
- from Quartz import (
- CGWindowListCopyWindowInfo,
- kCGWindowListOptionOnScreenOnly,
- kCGNullWindowID
- )
-
- # Application active
- active_app = NSWorkspace.sharedWorkspace().activeApplication()
- app_name = active_app.get('NSApplicationName', 'unknown_app')
-
- # Titre de la fenêtre (via Quartz)
- # On cherche la fenêtre de l'app active qui est au premier plan
- window_list = CGWindowListCopyWindowInfo(
- kCGWindowListOptionOnScreenOnly,
- kCGNullWindowID
- )
-
- title = "unknown_window"
- for window in window_list:
- owner_name = window.get('kCGWindowOwnerName', '')
- if owner_name == app_name:
- window_title = window.get('kCGWindowName', '')
- if window_title:
- title = window_title
- break
-
- return {
- "title": title,
- "app_name": app_name,
- }
-
- except ImportError:
- # pyobjc non installé
- return {
- "title": "unknown_window (pyobjc missing)",
- "app_name": "unknown_app (pyobjc missing)",
- }
- except Exception as e:
- return {
- "title": f"error: {e}",
- "app_name": "unknown_app",
- }
-
-
-# Test rapide
-if __name__ == "__main__":
- import time
-
- print(f"OS détecté: {platform.system()}")
- print("\nTest de capture fenêtre active (5 secondes)...")
- print("Changez de fenêtre pour tester!\n")
-
- for i in range(5):
- info = get_active_window_info()
- print(f"[{i+1}] App: {info['app_name']:20s} | Title: {info['title']}")
- time.sleep(1)
diff --git a/core/detection/som_engine.py b/core/detection/som_engine.py
index 041aa7235..5aab8eaf7 100644
--- a/core/detection/som_engine.py
+++ b/core/detection/som_engine.py
@@ -25,6 +25,7 @@ from __future__ import annotations
import base64
import io
import logging
+import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional, Tuple
@@ -33,8 +34,10 @@ from PIL import Image, ImageDraw, ImageFont
logger = logging.getLogger(__name__)
-# Chemin vers les poids YOLO d'OmniParser
-_YOLO_WEIGHTS = Path("/home/dom/ai/OmniParser/weights/icon_detect/model.pt")
+# Chemin vers les poids YOLO d'OmniParser (configurable via env)
+_YOLO_WEIGHTS = Path(
+ os.environ.get("SOM_YOLO_WEIGHTS", "/home/dom/ai/OmniParser/weights/icon_detect/model.pt")
+)
@dataclass
@@ -165,17 +168,17 @@ class SomEngine:
# ── 2. docTR : OCR pour lire le texte ──
if self._ocr is not None:
try:
- import numpy as np
from doctr.io import DocumentFile
# Convertir PIL → fichier temporaire pour docTR
import tempfile
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
screenshot.save(tmp, format="JPEG", quality=90)
tmp_path = tmp.name
- doc = DocumentFile.from_images([tmp_path])
- import os
- os.unlink(tmp_path)
- result_ocr = self._ocr(doc)
+ try:
+ doc = DocumentFile.from_images([tmp_path])
+ result_ocr = self._ocr(doc)
+ finally:
+ os.unlink(tmp_path)
for page in result_ocr.pages:
for block in page.blocks:
@@ -288,3 +291,25 @@ class SomEngine:
buf = io.BytesIO()
image.save(buf, format="JPEG", quality=quality)
return base64.b64encode(buf.getvalue()).decode()
+
+
+# ---------------------------------------------------------------------------
+# Singleton partagé (lazy-loaded, thread-safe)
+# ---------------------------------------------------------------------------
+_shared_engine: Optional[SomEngine] = None
+_shared_lock = __import__("threading").Lock()
+
+
+def get_shared_engine(device: str = "cuda") -> Optional[SomEngine]:
+ """Singleton SomEngine partagé entre tous les modules."""
+ global _shared_engine
+ if _shared_engine is None:
+ with _shared_lock:
+ if _shared_engine is None:
+ try:
+ _shared_engine = SomEngine(device=device)
+ logger.info("SomEngine singleton partagé initialisé")
+ except Exception as e:
+ logger.warning("SomEngine non disponible : %s", e)
+ return None
+ return _shared_engine