From a92d04621a66c4d970bb9f10e4d392c897df437f Mon Sep 17 00:00:00 2001 From: Dom Date: Tue, 31 Mar 2026 10:04:27 +0200 Subject: [PATCH] =?UTF-8?q?refactor:=20nettoyage=20agent=20+=20fix=20SomEn?= =?UTF-8?q?gine=20review=20(singleton=20partag=C3=A9,=20cache,=20thread-sa?= =?UTF-8?q?fe)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Nettoyage Windows agent : - Suppression lea_ui inutilisés (chat_widget, overlay, styles, etc. — -1991 lignes) - Suppression window_info*.py dupliqués (racine + core/ — -494 lignes) - build/ + dist/ supprimés (48 MB PyInstaller abandonné, gitignorés) Fix SomEngine (review quality guardian) : - Singleton GPU partagé via get_shared_engine() (1 instance au lieu de 2) - Thread-safe avec threading.Lock (double-checked locking) - Cache SomResult par screenshot_id (max 50, évite YOLO+OCR redondants) - Fuite fichier temp docTR corrigée (finally block) - Chemin YOLO configurable via SOM_YOLO_WEIGHTS env var - Guard som_image None avant VLM - Match texte partiel : len(label) >= 3 Co-Authored-By: Claude Opus 4.6 (1M context) --- agent_v0/agent_v1/core/window_info.py | 55 -- .../core/window_info_crossplatform.py | 192 ----- agent_v0/lea_ui/__init__.py | 11 +- agent_v0/lea_ui/__main__.py | 6 - agent_v0/lea_ui/chat_widget.py | 250 ------ agent_v0/lea_ui/launcher.py | 218 ----- agent_v0/lea_ui/main_window.py | 772 ------------------ agent_v0/lea_ui/overlay.py | 354 -------- agent_v0/lea_ui/replay_integration.py | 191 ----- agent_v0/lea_ui/styles.py | 200 ----- agent_v0/server_v1/api_stream.py | 26 +- agent_v0/server_v1/stream_processor.py | 63 +- agent_v0/window_info.py | 55 -- agent_v0/window_info_crossplatform.py | 192 ----- core/detection/som_engine.py | 39 +- 15 files changed, 84 insertions(+), 2540 deletions(-) delete mode 100644 agent_v0/agent_v1/core/window_info.py delete mode 100644 agent_v0/agent_v1/core/window_info_crossplatform.py delete mode 100644 agent_v0/lea_ui/__main__.py delete mode 100644 agent_v0/lea_ui/chat_widget.py delete mode 100644 agent_v0/lea_ui/launcher.py delete mode 100644 agent_v0/lea_ui/main_window.py delete mode 100644 agent_v0/lea_ui/overlay.py delete mode 100644 agent_v0/lea_ui/replay_integration.py delete mode 100644 agent_v0/lea_ui/styles.py delete mode 100644 agent_v0/window_info.py delete mode 100644 agent_v0/window_info_crossplatform.py diff --git a/agent_v0/agent_v1/core/window_info.py b/agent_v0/agent_v1/core/window_info.py deleted file mode 100644 index 7e6be8744..000000000 --- a/agent_v0/agent_v1/core/window_info.py +++ /dev/null @@ -1,55 +0,0 @@ -# window_info.py -""" -Récupération des informations sur la fenêtre active (X11). - -v0 : -- utilise xdotool pour obtenir : - - le titre de la fenêtre active - - le PID de la fenêtre active, puis le nom du process via ps - -Si quelque chose ne fonctionne pas, on renvoie des valeurs "unknown". -""" - -from __future__ import annotations - -import subprocess -from typing import Dict, Optional - - -def _run_cmd(cmd: list[str]) -> Optional[str]: - """Exécute une commande et renvoie la sortie texte (strippée), ou None en cas d'erreur.""" - try: - out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL) - return out.decode("utf-8", errors="ignore").strip() - except Exception: - return None - - -def get_active_window_info() -> Dict[str, str]: - """ - Renvoie un dict : - { - "title": "...", - "app_name": "..." - } - - Nécessite xdotool installé sur le système. - """ - title = _run_cmd(["xdotool", "getactivewindow", "getwindowname"]) - pid_str = _run_cmd(["xdotool", "getactivewindow", "getwindowpid"]) - - app_name: Optional[str] = None - if pid_str: - pid_str = pid_str.strip() - # On récupère le nom du binaire via ps - app_name = _run_cmd(["ps", "-p", pid_str, "-o", "comm="]) - - if not title: - title = "unknown_window" - if not app_name: - app_name = "unknown_app" - - return { - "title": title, - "app_name": app_name, - } diff --git a/agent_v0/agent_v1/core/window_info_crossplatform.py b/agent_v0/agent_v1/core/window_info_crossplatform.py deleted file mode 100644 index ba059a3fc..000000000 --- a/agent_v0/agent_v1/core/window_info_crossplatform.py +++ /dev/null @@ -1,192 +0,0 @@ -# window_info_crossplatform.py -""" -Récupération des informations sur la fenêtre active - CROSS-PLATFORM - -Supporte: -- Linux (X11 via xdotool) -- Windows (via pywin32) -- macOS (via pyobjc) - -Installation des dépendances: - pip install pywin32 # Windows - pip install pyobjc-framework-Cocoa # macOS - pip install psutil # Tous OS -""" - -from __future__ import annotations - -import platform -import subprocess -from typing import Dict, Optional - - -def _run_cmd(cmd: list[str]) -> Optional[str]: - """Exécute une commande et renvoie la sortie texte (strippée), ou None en cas d'erreur.""" - try: - out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL) - return out.decode("utf-8", errors="ignore").strip() - except Exception: - return None - - -def get_active_window_info() -> Dict[str, str]: - """ - Renvoie un dict : - { - "title": "...", - "app_name": "..." - } - - Détecte automatiquement l'OS et utilise la méthode appropriée. - """ - system = platform.system() - - if system == "Linux": - return _get_window_info_linux() - elif system == "Windows": - return _get_window_info_windows() - elif system == "Darwin": # macOS - return _get_window_info_macos() - else: - return {"title": "unknown_window", "app_name": "unknown_app"} - - -def _get_window_info_linux() -> Dict[str, str]: - """ - Linux: utilise xdotool (X11) - - Nécessite: sudo apt-get install xdotool - """ - title = _run_cmd(["xdotool", "getactivewindow", "getwindowname"]) - pid_str = _run_cmd(["xdotool", "getactivewindow", "getwindowpid"]) - - app_name: Optional[str] = None - if pid_str: - pid_str = pid_str.strip() - # On récupère le nom du binaire via ps - app_name = _run_cmd(["ps", "-p", pid_str, "-o", "comm="]) - - if not title: - title = "unknown_window" - if not app_name: - app_name = "unknown_app" - - return { - "title": title, - "app_name": app_name, - } - - -def _get_window_info_windows() -> Dict[str, str]: - """ - Windows: utilise pywin32 + psutil - - Nécessite: pip install pywin32 psutil - """ - try: - import win32gui - import win32process - import psutil - - # Fenêtre au premier plan - hwnd = win32gui.GetForegroundWindow() - - # Titre de la fenêtre - title = win32gui.GetWindowText(hwnd) - if not title: - title = "unknown_window" - - # PID du processus - _, pid = win32process.GetWindowThreadProcessId(hwnd) - - # Nom du processus - try: - process = psutil.Process(pid) - app_name = process.name() - except (psutil.NoSuchProcess, psutil.AccessDenied): - app_name = "unknown_app" - - return { - "title": title, - "app_name": app_name, - } - - except ImportError: - # pywin32 ou psutil non installé - return { - "title": "unknown_window (pywin32 missing)", - "app_name": "unknown_app (pywin32 missing)", - } - except Exception as e: - return { - "title": f"error: {e}", - "app_name": "unknown_app", - } - - -def _get_window_info_macos() -> Dict[str, str]: - """ - macOS: utilise pyobjc (AppKit) - - Nécessite: pip install pyobjc-framework-Cocoa - - Note: Nécessite les permissions "Accessibility" dans System Preferences - """ - try: - from AppKit import NSWorkspace - from Quartz import ( - CGWindowListCopyWindowInfo, - kCGWindowListOptionOnScreenOnly, - kCGNullWindowID - ) - - # Application active - active_app = NSWorkspace.sharedWorkspace().activeApplication() - app_name = active_app.get('NSApplicationName', 'unknown_app') - - # Titre de la fenêtre (via Quartz) - # On cherche la fenêtre de l'app active qui est au premier plan - window_list = CGWindowListCopyWindowInfo( - kCGWindowListOptionOnScreenOnly, - kCGNullWindowID - ) - - title = "unknown_window" - for window in window_list: - owner_name = window.get('kCGWindowOwnerName', '') - if owner_name == app_name: - window_title = window.get('kCGWindowName', '') - if window_title: - title = window_title - break - - return { - "title": title, - "app_name": app_name, - } - - except ImportError: - # pyobjc non installé - return { - "title": "unknown_window (pyobjc missing)", - "app_name": "unknown_app (pyobjc missing)", - } - except Exception as e: - return { - "title": f"error: {e}", - "app_name": "unknown_app", - } - - -# Test rapide -if __name__ == "__main__": - import time - - print(f"OS détecté: {platform.system()}") - print("\nTest de capture fenêtre active (5 secondes)...") - print("Changez de fenêtre pour tester!\n") - - for i in range(5): - info = get_active_window_info() - print(f"[{i+1}] App: {info['app_name']:20s} | Title: {info['title']}") - time.sleep(1) diff --git a/agent_v0/lea_ui/__init__.py b/agent_v0/lea_ui/__init__.py index b8fb82ffd..f4463cbba 100644 --- a/agent_v0/lea_ui/__init__.py +++ b/agent_v0/lea_ui/__init__.py @@ -1,13 +1,6 @@ -# agent_v0.lea_ui — Interface utilisateur "Lea" +# agent_v0.lea_ui — Communication serveur pour l'agent Léa # -# Panneau PyQt5 integre qui remplace le system tray + navigateur web -# par une interface unifiee pour piloter l'Agent RPA Vision V3. -# -# Composants : -# - LeaMainWindow : fenetre principale ancree a droite -# - ChatWidget : zone de conversation avec le serveur -# - OverlayWidget : feedback visuel pendant le replay +# Composant : # - LeaServerClient : client API vers le serveur Linux -# - styles : theme et couleurs __version__ = "0.1.0" diff --git a/agent_v0/lea_ui/__main__.py b/agent_v0/lea_ui/__main__.py deleted file mode 100644 index 875a978ab..000000000 --- a/agent_v0/lea_ui/__main__.py +++ /dev/null @@ -1,6 +0,0 @@ -# agent_v0/lea_ui/__main__.py -"""Permet le lancement via: python -m agent_v0.lea_ui""" - -from .launcher import main - -main() diff --git a/agent_v0/lea_ui/chat_widget.py b/agent_v0/lea_ui/chat_widget.py deleted file mode 100644 index f66b68669..000000000 --- a/agent_v0/lea_ui/chat_widget.py +++ /dev/null @@ -1,250 +0,0 @@ -# agent_v0/lea_ui/chat_widget.py -""" -Widget de chat pour l'interface Lea. - -Affiche les messages avec des bulles : - - Utilisateur a droite (fond indigo) - - Lea a gauche (fond blanc) - -Communique avec le serveur Linux via LeaServerClient. -""" - -from __future__ import annotations - -import logging -from typing import List, Optional - -from PyQt5.QtCore import ( - QPropertyAnimation, - QSize, - Qt, - QTimer, - pyqtSignal, - pyqtSlot, -) -from PyQt5.QtGui import QColor, QFont, QPainter, QPainterPath, QPen -from PyQt5.QtWidgets import ( - QFrame, - QHBoxLayout, - QLabel, - QLineEdit, - QPushButton, - QScrollArea, - QSizePolicy, - QVBoxLayout, - QWidget, -) - -from . import styles - -logger = logging.getLogger("lea_ui.chat") - - -class ChatBubble(QFrame): - """Bulle de message individuelle.""" - - def __init__( - self, - text: str, - is_user: bool = False, - parent: Optional[QWidget] = None, - ) -> None: - super().__init__(parent) - self._is_user = is_user - - # Style de la bulle - if is_user: - bg_color = styles.COLOR_BUBBLE_USER - text_color = styles.COLOR_TEXT_ON_ACCENT - align = Qt.AlignRight - else: - bg_color = styles.COLOR_BUBBLE_LEA - text_color = styles.COLOR_TEXT - align = Qt.AlignLeft - - self.setStyleSheet(f""" - QFrame {{ - background-color: {bg_color}; - border-radius: {styles.BUBBLE_RADIUS}px; - padding: {styles.PADDING}px; - border: {"none" if is_user else f"1px solid {styles.COLOR_BORDER}"}; - }} - """) - - layout = QVBoxLayout(self) - layout.setContentsMargins( - styles.PADDING, styles.PADDING // 2, - styles.PADDING, styles.PADDING // 2, - ) - - label = QLabel(text) - label.setWordWrap(True) - label.setFont(QFont(styles.FONT_FAMILY, styles.FONT_SIZE_NORMAL)) - label.setStyleSheet(f"color: {text_color}; background: transparent; border: none;") - label.setTextFormat(Qt.RichText) - label.setOpenExternalLinks(True) - layout.addWidget(label) - - self.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Minimum) - self.setMaximumWidth(280) - - -class ChatWidget(QWidget): - """Widget de chat complet avec zone de messages et champ de saisie. - - Signals : - message_sent(str) : emis quand l'utilisateur envoie un message - """ - - message_sent = pyqtSignal(str) - - def __init__(self, parent: Optional[QWidget] = None) -> None: - super().__init__(parent) - self._messages: List[dict] = [] - self._setup_ui() - - def _setup_ui(self) -> None: - layout = QVBoxLayout(self) - layout.setContentsMargins(0, 0, 0, 0) - layout.setSpacing(0) - - # Zone de messages (scrollable) - self._scroll_area = QScrollArea() - self._scroll_area.setWidgetResizable(True) - self._scroll_area.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff) - self._scroll_area.setStyleSheet(styles.CHAT_AREA_STYLE) - - self._messages_container = QWidget() - self._messages_container.setObjectName("ChatContainer") - self._messages_layout = QVBoxLayout(self._messages_container) - self._messages_layout.setContentsMargins( - styles.PADDING, styles.PADDING, - styles.PADDING, styles.PADDING, - ) - self._messages_layout.setSpacing(styles.SPACING) - self._messages_layout.addStretch() - - self._scroll_area.setWidget(self._messages_container) - layout.addWidget(self._scroll_area, stretch=1) - - # Separateur - sep = QFrame() - sep.setFrameShape(QFrame.HLine) - sep.setStyleSheet(f"background-color: {styles.COLOR_BORDER}; max-height: 1px;") - layout.addWidget(sep) - - # Zone de saisie - input_layout = QHBoxLayout() - input_layout.setContentsMargins( - styles.PADDING, styles.SPACING, - styles.PADDING, styles.SPACING, - ) - input_layout.setSpacing(styles.SPACING) - - self._input = QLineEdit() - self._input.setObjectName("ChatInput") - self._input.setPlaceholderText("Ecrivez un message...") - self._input.setStyleSheet(styles.INPUT_STYLE) - self._input.returnPressed.connect(self._on_send) - input_layout.addWidget(self._input, stretch=1) - - self._send_btn = QPushButton("Envoyer") - self._send_btn.setObjectName("SendButton") - self._send_btn.setStyleSheet(styles.SEND_BUTTON_STYLE) - self._send_btn.setCursor(Qt.PointingHandCursor) - self._send_btn.clicked.connect(self._on_send) - input_layout.addWidget(self._send_btn) - - layout.addLayout(input_layout) - - def _on_send(self) -> None: - """Envoyer le message saisi.""" - text = self._input.text().strip() - if not text: - return - - self._input.clear() - self.add_user_message(text) - self.message_sent.emit(text) - - # --------------------------------------------------------------------------- - # API publique - # --------------------------------------------------------------------------- - - def add_user_message(self, text: str) -> None: - """Ajouter un message utilisateur (bulle a droite).""" - self._add_bubble(text, is_user=True) - - def add_lea_message(self, text: str) -> None: - """Ajouter un message de Lea (bulle a gauche).""" - self._add_bubble(text, is_user=False) - - def add_system_message(self, text: str) -> None: - """Ajouter un message systeme (centre, discret).""" - label = QLabel(text) - label.setFont(QFont(styles.FONT_FAMILY, styles.FONT_SIZE_SMALL)) - label.setStyleSheet( - f"color: {styles.COLOR_TEXT_SECONDARY}; " - f"background: transparent; padding: 4px;" - ) - label.setAlignment(Qt.AlignCenter) - label.setWordWrap(True) - - # Inserer avant le stretch final - count = self._messages_layout.count() - self._messages_layout.insertWidget(count - 1, label) - self._scroll_to_bottom() - - def set_input_enabled(self, enabled: bool) -> None: - """Activer/desactiver la saisie (pendant le chargement).""" - self._input.setEnabled(enabled) - self._send_btn.setEnabled(enabled) - if not enabled: - self._input.setPlaceholderText("Lea reflechit...") - else: - self._input.setPlaceholderText("Ecrivez un message...") - - def clear_messages(self) -> None: - """Effacer tous les messages.""" - while self._messages_layout.count() > 1: - item = self._messages_layout.takeAt(0) - widget = item.widget() - if widget: - widget.deleteLater() - self._messages = [] - - # --------------------------------------------------------------------------- - # Internals - # --------------------------------------------------------------------------- - - def _add_bubble(self, text: str, is_user: bool) -> None: - """Ajouter une bulle au conteneur de messages.""" - bubble = ChatBubble(text, is_user=is_user) - - # Conteneur d'alignement - row = QHBoxLayout() - row.setContentsMargins(0, 0, 0, 0) - if is_user: - row.addStretch() - row.addWidget(bubble) - else: - row.addWidget(bubble) - row.addStretch() - - # Inserer avant le stretch final - count = self._messages_layout.count() - wrapper = QWidget() - wrapper.setLayout(row) - wrapper.setStyleSheet("background: transparent;") - self._messages_layout.insertWidget(count - 1, wrapper) - - self._messages.append({"text": text, "is_user": is_user}) - self._scroll_to_bottom() - - def _scroll_to_bottom(self) -> None: - """Scroller vers le bas apres l'ajout d'un message.""" - QTimer.singleShot(50, lambda: ( - self._scroll_area.verticalScrollBar().setValue( - self._scroll_area.verticalScrollBar().maximum() - ) - )) diff --git a/agent_v0/lea_ui/launcher.py b/agent_v0/lea_ui/launcher.py deleted file mode 100644 index 786b0183f..000000000 --- a/agent_v0/lea_ui/launcher.py +++ /dev/null @@ -1,218 +0,0 @@ -# agent_v0/lea_ui/launcher.py -""" -Point d'entree pour le panneau Lea. - -Lancement autonome : - python -m agent_v0.lea_ui.launcher - -Ou integre dans agent_v0/agent_v1/main.py avec flag --ui lea. - -Ce module : - 1. Cree l'application Qt - 2. Instancie LeaServerClient - 3. Instancie LeaMainWindow - 4. Enregistre un raccourci global (Ctrl+Shift+L) via keyboard hook - 5. Lance la boucle Qt -""" - -from __future__ import annotations - -import argparse -import logging -import os -import sys -from typing import Optional - -logger = logging.getLogger("lea_ui.launcher") - - -def _setup_logging(verbose: bool = False) -> None: - """Configurer le logging pour le panneau Lea.""" - level = logging.DEBUG if verbose else logging.INFO - logging.basicConfig( - level=level, - format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", - datefmt="%H:%M:%S", - ) - - -def _setup_global_hotkey(window) -> Optional[object]: - """Enregistrer le raccourci global Ctrl+Shift+L pour afficher/cacher le panneau. - - Utilise la librairie keyboard si disponible (Windows/Linux). - Retourne le hook pour pouvoir le desinscrire a l'arret. - """ - try: - import keyboard - - def on_hotkey(): - # Appeler toggle_visibility dans le thread Qt - from PyQt5.QtCore import QTimer - QTimer.singleShot(0, window.toggle_visibility) - - keyboard.add_hotkey("ctrl+shift+l", on_hotkey) - logger.info("Raccourci global Ctrl+Shift+L enregistre") - return True - except ImportError: - logger.info( - "Librairie 'keyboard' non disponible — " - "raccourci global Ctrl+Shift+L non enregistre. " - "Installez-la avec: pip install keyboard" - ) - return None - except Exception as e: - logger.warning("Impossible d'enregistrer le raccourci global : %s", e) - return None - - -def _load_environment() -> None: - """Charger les variables d'environnement depuis .env.local.""" - env_paths = [ - os.path.join(os.path.dirname(__file__), "..", "..", ".env.local"), - os.path.join(os.path.dirname(__file__), "..", ".env.local"), - ] - for env_path in env_paths: - env_path = os.path.abspath(env_path) - if os.path.exists(env_path): - try: - from dotenv import load_dotenv - load_dotenv(env_path) - logger.info("Variables d'environnement chargees depuis %s", env_path) - return - except ImportError: - # Fallback : chargement manuel - with open(env_path, "r", encoding="utf-8") as f: - for line in f: - line = line.strip() - if line and not line.startswith("#") and "=" in line: - key, value = line.split("=", 1) - value = value.strip("\"'") - os.environ[key.strip()] = value - logger.info("Variables chargees manuellement depuis %s", env_path) - return - - -def launch_lea( - server_host: Optional[str] = None, - chat_port: int = 5004, - stream_port: int = 5005, - verbose: bool = False, - session_id: Optional[str] = None, -) -> None: - """Lancer le panneau Lea. - - Args: - server_host: adresse du serveur Linux (None = auto-detection) - chat_port: port du serveur chat - stream_port: port du serveur streaming - verbose: mode debug - session_id: identifiant de session pour le polling replay - """ - _setup_logging(verbose) - _load_environment() - - # Import PyQt5 ici pour un message d'erreur clair si absent - try: - from PyQt5.QtWidgets import QApplication - from PyQt5.QtCore import Qt - except ImportError: - logger.error( - "PyQt5 n'est pas installe. Installez-le avec :\n" - " pip install PyQt5" - ) - sys.exit(1) - - from .server_client import LeaServerClient - from .main_window import LeaMainWindow - - # Creer ou recuperer l'application Qt - app = QApplication.instance() - if app is None: - app = QApplication(sys.argv) - app.setQuitOnLastWindowClosed(False) - - # Client serveur - client = LeaServerClient( - server_host=server_host, - chat_port=chat_port, - stream_port=stream_port, - ) - - # Fenetre principale - window = LeaMainWindow(server_client=client) - window.show() - - # Raccourci global - hotkey = _setup_global_hotkey(window) - - # Polling replay (si session_id fourni) - if session_id: - client.start_polling(session_id) - - logger.info( - "Panneau Lea demarre — serveur=%s, chat_port=%d, stream_port=%d", - client.server_host, chat_port, stream_port, - ) - - # Boucle Qt - try: - exit_code = app.exec_() - finally: - window.shutdown() - if hotkey: - try: - import keyboard - keyboard.unhook_all() - except Exception: - pass - - sys.exit(exit_code) - - -def main() -> None: - """Point d'entree CLI.""" - parser = argparse.ArgumentParser( - description="Panneau Lea — Interface utilisateur RPA Vision V3", - ) - parser.add_argument( - "--server", "-s", - dest="server_host", - default=None, - help="Adresse du serveur Linux (defaut: RPA_SERVER_HOST ou localhost)", - ) - parser.add_argument( - "--chat-port", - type=int, - default=5004, - help="Port du serveur chat (defaut: 5004)", - ) - parser.add_argument( - "--stream-port", - type=int, - default=5005, - help="Port du serveur streaming (defaut: 5005)", - ) - parser.add_argument( - "--session-id", - default=None, - help="Identifiant de session pour le polling replay", - ) - parser.add_argument( - "--verbose", "-v", - action="store_true", - help="Mode debug (logs verbeux)", - ) - - args = parser.parse_args() - - launch_lea( - server_host=args.server_host, - chat_port=args.chat_port, - stream_port=args.stream_port, - verbose=args.verbose, - session_id=args.session_id, - ) - - -if __name__ == "__main__": - main() diff --git a/agent_v0/lea_ui/main_window.py b/agent_v0/lea_ui/main_window.py deleted file mode 100644 index ed5055275..000000000 --- a/agent_v0/lea_ui/main_window.py +++ /dev/null @@ -1,772 +0,0 @@ -# agent_v0/lea_ui/main_window.py -""" -Fenetre principale du panneau Lea. - -Panneau semi-transparent, ancre a droite de l'ecran, toujours visible. -Peut etre reduit en mini-barre flottante (avatar + indicateur status). - -Sections : - - Header : avatar "L" + status connexion - - Zone de chat : messages entrants/sortants (natif PyQt5) - - Zone de status : progression du replay - - Boutons rapides : "Apprends-moi", "Que sais-tu faire ?" -""" - -from __future__ import annotations - -import logging -from typing import Dict, Any, Optional - -from PyQt5.QtCore import ( - QPoint, - QPropertyAnimation, - QRect, - QSize, - Qt, - QTimer, - pyqtSignal, - pyqtSlot, -) -from PyQt5.QtGui import ( - QColor, - QFont, - QIcon, - QKeySequence, - QPainter, - QPainterPath, - QPen, -) -from PyQt5.QtWidgets import ( - QAction, - QApplication, - QDesktopWidget, - QFrame, - QGraphicsDropShadowEffect, - QHBoxLayout, - QLabel, - QProgressBar, - QPushButton, - QShortcut, - QSizePolicy, - QVBoxLayout, - QWidget, -) - -from . import styles -from .chat_widget import ChatWidget -from .overlay import OverlayWidget -from .server_client import LeaServerClient - -logger = logging.getLogger("lea_ui.main_window") - - -class LeaAvatar(QWidget): - """Avatar rond avec l'initiale 'L'.""" - - def __init__(self, size: int = 40, parent: Optional[QWidget] = None) -> None: - super().__init__(parent) - self._size = size - self._connected = False - self.setFixedSize(size, size) - - def set_connected(self, connected: bool) -> None: - self._connected = connected - self.update() - - def paintEvent(self, event) -> None: # noqa: N802 - painter = QPainter(self) - painter.setRenderHint(QPainter.Antialiasing, True) - - # Cercle de fond - painter.setBrush(QColor(styles.COLOR_ACCENT)) - painter.setPen(Qt.NoPen) - painter.drawEllipse(2, 2, self._size - 4, self._size - 4) - - # Initiale "L" - painter.setPen(QColor(styles.COLOR_TEXT_ON_ACCENT)) - font = QFont(styles.FONT_FAMILY, self._size // 3, QFont.Bold) - painter.setFont(font) - painter.drawText( - QRect(0, 0, self._size, self._size), - Qt.AlignCenter, - "L", - ) - - # Indicateur de connexion (petit cercle en bas a droite) - indicator_size = 12 - ix = self._size - indicator_size - 1 - iy = self._size - indicator_size - 1 - indicator_color = ( - QColor(styles.COLOR_SUCCESS) if self._connected - else QColor(styles.COLOR_ERROR) - ) - painter.setBrush(indicator_color) - painter.setPen(QPen(QColor(styles.COLOR_BG), 2)) - painter.drawEllipse(ix, iy, indicator_size, indicator_size) - - painter.end() - - -class LeaMainWindow(QWidget): - """Panneau principal de l'interface Lea. - - Fenetre semi-transparente, ancree a droite de l'ecran. - Peut basculer en mode mini-barre. - """ - - # Signal pour les actions de replay a afficher sur l'overlay - replay_action_received = pyqtSignal(dict) - - def __init__( - self, - server_client: Optional[LeaServerClient] = None, - parent: Optional[QWidget] = None, - ) -> None: - super().__init__(parent) - - # Client serveur - self._client = server_client or LeaServerClient() - - # Overlay de feedback - self._overlay = OverlayWidget() - - # Mode courant - self._minimized = False - - # Setup - self._setup_window() - self._setup_ui() - self._setup_shortcuts() - self._connect_signals() - self._start_connection_check() - - # Message d'accueil - QTimer.singleShot(500, self._show_welcome) - - # --------------------------------------------------------------------------- - # Setup - # --------------------------------------------------------------------------- - - def _setup_window(self) -> None: - """Configurer les proprietes de la fenetre.""" - self.setWindowFlags( - Qt.WindowStaysOnTopHint - | Qt.FramelessWindowHint - | Qt.Tool - ) - self.setAttribute(Qt.WA_TranslucentBackground, True) - self.setObjectName("LeaMainWindow") - - # Dimensions et position (ancre a droite) - self.setFixedWidth(styles.PANEL_WIDTH) - self.setMinimumHeight(styles.PANEL_MIN_HEIGHT) - self._anchor_to_right() - - # Ombre portee - shadow = QGraphicsDropShadowEffect() - shadow.setBlurRadius(20) - shadow.setColor(QColor(0, 0, 0, 60)) - shadow.setOffset(0, 4) - self.setGraphicsEffect(shadow) - - def _anchor_to_right(self) -> None: - """Positionner le panneau ancre a droite de l'ecran.""" - desktop = QApplication.desktop() - if desktop: - screen_rect = desktop.availableGeometry(desktop.primaryScreen()) - x = screen_rect.right() - styles.PANEL_WIDTH - 10 - y = screen_rect.top() + 40 - height = screen_rect.height() - 80 - self.setGeometry(x, y, styles.PANEL_WIDTH, height) - - def _setup_ui(self) -> None: - """Construire l'interface du panneau.""" - # Conteneur principal avec fond et coins arrondis - self._main_layout = QVBoxLayout(self) - self._main_layout.setContentsMargins(0, 0, 0, 0) - self._main_layout.setSpacing(0) - - # Widget de fond (pour appliquer le style) - self._bg_widget = QWidget() - self._bg_widget.setObjectName("LeaPanelBg") - self._bg_widget.setStyleSheet(f""" - QWidget#LeaPanelBg {{ - background-color: {styles.COLOR_BG}; - border-radius: {styles.BORDER_RADIUS}px; - border: 1px solid {styles.COLOR_BORDER}; - }} - """) - - bg_layout = QVBoxLayout(self._bg_widget) - bg_layout.setContentsMargins(0, 0, 0, 0) - bg_layout.setSpacing(0) - - # --- Header --- - self._header = self._create_header() - bg_layout.addWidget(self._header) - - # --- Chat --- - self._chat = ChatWidget() - bg_layout.addWidget(self._chat, stretch=1) - - # --- Zone de status replay --- - self._status_bar = self._create_status_bar() - bg_layout.addWidget(self._status_bar) - - # --- Boutons rapides --- - self._quick_buttons = self._create_quick_buttons() - bg_layout.addWidget(self._quick_buttons) - - self._main_layout.addWidget(self._bg_widget) - - # --- Mini-barre (cachee par defaut) --- - self._mini_bar = self._create_mini_bar() - self._mini_bar.hide() - self._main_layout.addWidget(self._mini_bar) - - def _create_header(self) -> QWidget: - """Creer le header avec avatar et status.""" - header = QWidget() - header.setObjectName("LeaHeader") - header.setStyleSheet(styles.HEADER_STYLE) - header.setFixedHeight(60) - - layout = QHBoxLayout(header) - layout.setContentsMargins( - styles.PADDING, styles.SPACING, - styles.PADDING, styles.SPACING, - ) - - # Avatar - self._avatar = LeaAvatar(styles.AVATAR_SIZE) - layout.addWidget(self._avatar) - - # Titre + status - text_layout = QVBoxLayout() - text_layout.setSpacing(2) - - title = QLabel("Lea") - title.setObjectName("LeaTitle") - title.setStyleSheet(styles.HEADER_STYLE) - text_layout.addWidget(title) - - self._status_label = QLabel("Connexion...") - self._status_label.setObjectName("LeaStatus") - self._status_label.setStyleSheet(styles.HEADER_STYLE) - text_layout.addWidget(self._status_label) - - layout.addLayout(text_layout, stretch=1) - - # Bouton reduire - minimize_btn = QPushButton("_") - minimize_btn.setFixedSize(30, 30) - minimize_btn.setCursor(Qt.PointingHandCursor) - minimize_btn.setStyleSheet(f""" - QPushButton {{ - background: transparent; - color: {styles.COLOR_TEXT_SECONDARY}; - border: none; - border-radius: 15px; - font-size: 16px; - font-weight: bold; - }} - QPushButton:hover {{ - background-color: {styles.COLOR_BORDER}; - }} - """) - minimize_btn.clicked.connect(self.toggle_minimize) - layout.addWidget(minimize_btn) - - return header - - def _create_status_bar(self) -> QWidget: - """Creer la barre de status du replay.""" - container = QWidget() - container.setFixedHeight(50) - layout = QVBoxLayout(container) - layout.setContentsMargins( - styles.PADDING, styles.SPACING, - styles.PADDING, styles.SPACING, - ) - layout.setSpacing(4) - - self._replay_label = QLabel("") - self._replay_label.setObjectName("StatusLabel") - self._replay_label.setStyleSheet(styles.STATUS_LABEL_STYLE) - self._replay_label.hide() - layout.addWidget(self._replay_label) - - self._progress_bar = QProgressBar() - self._progress_bar.setStyleSheet(styles.PROGRESS_STYLE) - self._progress_bar.setTextVisible(False) - self._progress_bar.hide() - layout.addWidget(self._progress_bar) - - container.hide() - self._status_container = container - return container - - def _create_quick_buttons(self) -> QWidget: - """Creer les boutons d'action rapide.""" - container = QWidget() - layout = QHBoxLayout(container) - layout.setContentsMargins( - styles.PADDING, styles.SPACING, - styles.PADDING, styles.PADDING, - ) - layout.setSpacing(styles.SPACING) - - btn_learn = QPushButton("Apprends-moi") - btn_learn.setObjectName("QuickButton") - btn_learn.setStyleSheet(styles.QUICK_BUTTON_STYLE) - btn_learn.setCursor(Qt.PointingHandCursor) - btn_learn.clicked.connect(self._on_learn_clicked) - layout.addWidget(btn_learn) - - btn_list = QPushButton("Que sais-tu faire ?") - btn_list.setObjectName("QuickButton") - btn_list.setStyleSheet(styles.QUICK_BUTTON_STYLE) - btn_list.setCursor(Qt.PointingHandCursor) - btn_list.clicked.connect(self._on_list_clicked) - layout.addWidget(btn_list) - - return container - - def _create_mini_bar(self) -> QWidget: - """Creer la mini-barre flottante (mode reduit).""" - bar = QWidget() - bar.setObjectName("MiniBar") - bar.setStyleSheet(styles.MINI_BAR_STYLE) - bar.setFixedSize(80, 50) - - layout = QHBoxLayout(bar) - layout.setContentsMargins(8, 4, 8, 4) - - mini_avatar = LeaAvatar(32) - self._mini_avatar = mini_avatar - layout.addWidget(mini_avatar) - - expand_btn = QPushButton(">") - expand_btn.setFixedSize(24, 24) - expand_btn.setCursor(Qt.PointingHandCursor) - expand_btn.setStyleSheet(f""" - QPushButton {{ - background: transparent; - color: {styles.COLOR_TEXT_SECONDARY}; - border: none; - font-size: 14px; - font-weight: bold; - }} - QPushButton:hover {{ - color: {styles.COLOR_ACCENT}; - }} - """) - expand_btn.clicked.connect(self.toggle_minimize) - layout.addWidget(expand_btn) - - return bar - - def _setup_shortcuts(self) -> None: - """Configurer les raccourcis globaux.""" - # Ctrl+Shift+L pour afficher/cacher - # Note : Sur Windows, les raccourcis globaux necessitent - # un mecanisme supplementaire (keyboard hook). Ici on utilise - # le raccourci local qui fonctionne quand le panneau a le focus. - # Un hook global sera ajoute dans le launcher. - shortcut = QShortcut(QKeySequence("Ctrl+Shift+L"), self) - shortcut.activated.connect(self.toggle_visibility) - - def _connect_signals(self) -> None: - """Connecter les signaux internes.""" - # Chat - self._chat.message_sent.connect(self._on_message_sent) - - # Client serveur - self._client.set_on_connection_change(self._on_connection_changed) - self._client.set_on_replay_action(self._on_replay_action) - - # Overlay - self._overlay.action_display_finished.connect(self._on_overlay_finished) - - # Replay via signal (thread-safe) - self.replay_action_received.connect(self._handle_replay_action) - - def _start_connection_check(self) -> None: - """Demarrer le timer de verification de connexion.""" - self._conn_timer = QTimer(self) - self._conn_timer.timeout.connect(self._check_connection) - self._conn_timer.start(10000) # Toutes les 10 secondes - # Premiere verification immediatement - QTimer.singleShot(1000, self._check_connection) - - # --------------------------------------------------------------------------- - # Actions - # --------------------------------------------------------------------------- - - def _show_welcome(self) -> None: - """Afficher le message d'accueil.""" - self._chat.add_lea_message( - "Bonjour ! Je suis Lea, votre assistante RPA.
" - "Je peux apprendre vos taches, les rejouer, " - "et vous montrer ce que je fais.

" - "Que souhaitez-vous faire ?" - ) - - @pyqtSlot(str) - def _on_message_sent(self, message: str) -> None: - """Traiter un message envoye par l'utilisateur.""" - self._chat.set_input_enabled(False) - - # Envoyer au serveur dans un timer pour ne pas bloquer - QTimer.singleShot(100, lambda: self._send_to_server(message)) - - def _send_to_server(self, message: str) -> None: - """Envoyer le message au serveur et afficher la reponse.""" - response = self._client.send_chat_message(message) - - if response is None: - self._chat.add_lea_message( - "Je n'arrive pas a joindre le serveur. " - "Verifiez que le serveur Linux est demarre." - ) - elif "error" in response: - self._chat.add_lea_message( - f"Erreur : {response['error']}" - ) - else: - # Extraire la reponse textuelle - reply_text = response.get("response", "") - if not reply_text: - # Construire une reponse a partir des donnees structurees - reply_text = self._format_response(response) - - self._chat.add_lea_message(reply_text) - - # Si un workflow a ete lance, mettre a jour la status bar - if response.get("success") and response.get("workflow"): - self._show_replay_status( - f"Execution : {response['workflow']}", - 0, 1, - ) - - self._chat.set_input_enabled(True) - - def _format_response(self, data: Dict[str, Any]) -> str: - """Formater une reponse structuree du serveur en texte lisible.""" - # Reponse de confirmation - if data.get("needs_confirmation"): - conf = data.get("confirmation", {}) - return ( - f"Voulez-vous que j'execute {conf.get('workflow_name', '?')} ?
" - f"Risque : {conf.get('risk_level', 'normal')}
" - "Repondez oui ou non." - ) - - # Liste de workflows - if "workflows" in data: - workflows = data["workflows"] - if not workflows: - return "Je ne connais aucun workflow pour le moment." - items = [] - for wf in workflows[:10]: - name = wf.get("name", wf.get("id", "?")) - desc = wf.get("description", "") - items.append(f"- {name}{': ' + desc if desc else ''}") - result = "Voici ce que je sais faire :
" + "
".join(items) - if len(workflows) > 10: - result += f"
... et {len(workflows) - 10} autres" - return result - - # Workflow non trouve - if data.get("not_found"): - return ( - f"Je ne trouve pas de workflow correspondant a " - f"'{data.get('query', '?')}'.
" - "Essayez 'Que sais-tu faire ?' pour voir la liste." - ) - - # Execution reussie - if data.get("success"): - return ( - f"C'est parti ! J'execute {data.get('workflow', '?')}.
" - "Regardez l'ecran, je vais vous montrer ce que je fais." - ) - - # Confirmation/refus - if data.get("confirmed"): - return f"D'accord, je lance {data.get('workflow', '?')} !" - if data.get("denied"): - return "Pas de probleme, j'annule." - - # Fallback - return str(data) - - def _on_learn_clicked(self) -> None: - """Action du bouton 'Apprends-moi'.""" - self._chat.add_user_message("Apprends-moi une nouvelle tache") - self._chat.add_lea_message( - "D'accord ! Pour m'apprendre une tache :
" - "1. Cliquez sur Demarrer dans le tray Agent V1
" - "2. Effectuez votre tache normalement
" - "3. Cliquez sur Terminer quand c'est fini

" - "Je vais observer et apprendre automatiquement." - ) - - def _on_list_clicked(self) -> None: - """Action du bouton 'Que sais-tu faire ?'.""" - self._chat.add_user_message("Que sais-tu faire ?") - self._chat.set_input_enabled(False) - QTimer.singleShot(100, self._fetch_workflows) - - def _fetch_workflows(self) -> None: - """Recuperer et afficher la liste des workflows.""" - workflows = self._client.list_workflows() - if workflows: - items = [] - for wf in workflows[:15]: - name = wf.get("name", wf.get("id", "?")) - desc = wf.get("description", "") - items.append(f"- {name}{': ' + desc if desc else ''}") - text = "Voici les workflows que je connais :
" + "
".join(items) - if len(workflows) > 15: - text += f"
... et {len(workflows) - 15} autres" - else: - text = ( - "Je ne connais aucun workflow pour le moment.
" - "Apprenez-moi une tache avec le bouton 'Apprends-moi' !" - ) - self._chat.add_lea_message(text) - self._chat.set_input_enabled(True) - - # --------------------------------------------------------------------------- - # Connexion - # --------------------------------------------------------------------------- - - def _check_connection(self) -> None: - """Verifier la connexion au serveur (dans un timer).""" - connected = self._client.check_connection() - self._update_connection_ui(connected) - - def _on_connection_changed(self, connected: bool) -> None: - """Callback quand l'etat de connexion change.""" - # Appeler dans le thread principal via QTimer - QTimer.singleShot(0, lambda: self._update_connection_ui(connected)) - - def _update_connection_ui(self, connected: bool) -> None: - """Mettre a jour l'UI selon l'etat de connexion.""" - self._avatar.set_connected(connected) - if hasattr(self, '_mini_avatar'): - self._mini_avatar.set_connected(connected) - - if connected: - self._status_label.setText( - f"Connecte a {self._client.server_host}" - ) - self._status_label.setStyleSheet( - f"color: {styles.COLOR_SUCCESS}; " - f"font-family: '{styles.FONT_FAMILY}'; " - f"font-size: {styles.FONT_SIZE_SMALL}px; " - f"background: transparent; border: none;" - ) - else: - error = self._client.last_error or "Serveur injoignable" - self._status_label.setText(f"Deconnecte ({error[:30]})") - self._status_label.setStyleSheet( - f"color: {styles.COLOR_ERROR}; " - f"font-family: '{styles.FONT_FAMILY}'; " - f"font-size: {styles.FONT_SIZE_SMALL}px; " - f"background: transparent; border: none;" - ) - - # --------------------------------------------------------------------------- - # Replay & Overlay - # --------------------------------------------------------------------------- - - def _on_replay_action(self, action: Dict[str, Any]) -> None: - """Callback appelee depuis le thread de polling (pas thread-safe). - - Emettre un signal pour traiter dans le thread Qt. - """ - self.replay_action_received.emit(action) - - @pyqtSlot(dict) - def _handle_replay_action(self, action: Dict[str, Any]) -> None: - """Traiter une action de replay dans le thread Qt. - - Afficher l'overlay AVANT l'execution pour que l'utilisateur - voie ce qui va se passer. - """ - action_type = action.get("type", "?") - action_text = self._describe_action(action) - - # Calculer les coordonnees ecran - desktop = QApplication.desktop() - screen = desktop.screenGeometry(desktop.primaryScreen()) if desktop else None - if screen: - sw, sh = screen.width(), screen.height() - else: - sw, sh = 1920, 1080 - - target_x = int(action.get("x_pct", 0.5) * sw) - target_y = int(action.get("y_pct", 0.5) * sh) - - # Recuperer la progression depuis le replay status - replay = self._client.get_replay_status() - step_current = 0 - step_total = 0 - if replay: - step_total = replay.get("total_actions", 0) - step_current = replay.get("completed_actions", 0) + 1 - - # Mettre a jour la status bar - self._show_replay_status(action_text, step_current, step_total) - - # Afficher l'overlay - self._overlay.show_action( - target_x, target_y, - action_text, - step_current, step_total, - duration_ms=1500, - ) - - # Ajouter dans le chat - self._chat.add_system_message( - f"Etape {step_current}/{step_total} : {action_text}" - ) - - def _describe_action(self, action: Dict[str, Any]) -> str: - """Generer une description lisible d'une action de replay.""" - action_type = action.get("type", "?") - target_text = action.get("target_text", "") - target_role = action.get("target_role", "") - - if action_type == "click": - target = target_text or target_role or "cet element" - return f"Je clique sur [{target}]" - elif action_type == "type": - text = action.get("text", "") - preview = text[:30] + "..." if len(text) > 30 else text - return f"Je tape : {preview}" - elif action_type == "key_combo": - keys = action.get("keys", []) - return f"Je tape : {'+'.join(keys)}" - elif action_type == "scroll": - return "Je fais defiler la page" - elif action_type == "wait": - ms = action.get("duration_ms", 500) - return f"J'attends {ms}ms" - else: - return f"Action : {action_type}" - - def _on_overlay_finished(self) -> None: - """Callback quand l'overlay a fini d'afficher une action.""" - pass # L'executor continue de son cote - - def _show_replay_status( - self, text: str, current: int, total: int, - ) -> None: - """Afficher la barre de progression du replay.""" - self._status_container.show() - self._replay_label.show() - self._replay_label.setText(text) - - if total > 0: - self._progress_bar.show() - self._progress_bar.setMaximum(total) - self._progress_bar.setValue(current) - else: - self._progress_bar.hide() - - def hide_replay_status(self) -> None: - """Masquer la barre de progression du replay.""" - self._status_container.hide() - - # --------------------------------------------------------------------------- - # Visibilite - # --------------------------------------------------------------------------- - - def toggle_visibility(self) -> None: - """Afficher/cacher le panneau (raccourci Ctrl+Shift+L).""" - if self.isVisible(): - self.hide() - else: - self.show() - self.raise_() - self.activateWindow() - - def toggle_minimize(self) -> None: - """Basculer entre panneau complet et mini-barre.""" - if self._minimized: - # Restaurer - self._mini_bar.hide() - self._bg_widget.show() - self._minimized = False - self._anchor_to_right() - else: - # Reduire - self._bg_widget.hide() - self._mini_bar.show() - self._minimized = True - # Positionner la mini-barre en haut a droite - desktop = QApplication.desktop() - if desktop: - screen = desktop.availableGeometry(desktop.primaryScreen()) - x = screen.right() - 90 - y = screen.top() + 10 - self.setGeometry(x, y, 80, 50) - - # --------------------------------------------------------------------------- - # Drag (deplacer la fenetre sans barre de titre) - # --------------------------------------------------------------------------- - - def mousePressEvent(self, event) -> None: # noqa: N802 - if event.button() == Qt.LeftButton: - self._drag_pos = event.globalPos() - self.frameGeometry().topLeft() - event.accept() - - def mouseMoveEvent(self, event) -> None: # noqa: N802 - if event.buttons() == Qt.LeftButton and hasattr(self, '_drag_pos'): - self.move(event.globalPos() - self._drag_pos) - event.accept() - - # --------------------------------------------------------------------------- - # Painting (fond arrondi semi-transparent) - # --------------------------------------------------------------------------- - - def paintEvent(self, event) -> None: # noqa: N802 - """Peindre le fond semi-transparent avec coins arrondis.""" - painter = QPainter(self) - painter.setRenderHint(QPainter.Antialiasing, True) - - path = QPainterPath() - path.addRoundedRect( - 0, 0, self.width(), self.height(), - styles.BORDER_RADIUS, styles.BORDER_RADIUS, - ) - - # Fond semi-transparent - bg = QColor(styles.COLOR_BG) - bg.setAlpha(245) # Legerement transparent - painter.fillPath(path, bg) - - # Bordure - painter.setPen(QPen(QColor(styles.COLOR_BORDER), 1)) - painter.drawPath(path) - - painter.end() - - # --------------------------------------------------------------------------- - # Lifecycle - # --------------------------------------------------------------------------- - - def closeEvent(self, event) -> None: # noqa: N802 - """Ne pas fermer, juste cacher.""" - event.ignore() - self.hide() - - def shutdown(self) -> None: - """Arret propre.""" - self._conn_timer.stop() - self._overlay.hide_overlay() - self._client.shutdown() - logger.info("LeaMainWindow arretee") diff --git a/agent_v0/lea_ui/overlay.py b/agent_v0/lea_ui/overlay.py deleted file mode 100644 index c2e850786..000000000 --- a/agent_v0/lea_ui/overlay.py +++ /dev/null @@ -1,354 +0,0 @@ -# agent_v0/lea_ui/overlay.py -""" -Overlay de feedback visuel pour le replay. - -Fenetre transparente plein ecran, click-through, qui affiche : - - Cercle rouge pulsant autour de la cible du clic - - Texte descriptif de l'action en cours - - Fleche pointant vers la cible - - Barre de progression etape X/Y - -Le overlay ne capture JAMAIS les clics (Qt.WA_TransparentForMouseEvents). -""" - -from __future__ import annotations - -import logging -import math -from typing import Optional, Tuple - -from PyQt5.QtCore import ( - QPoint, - QPropertyAnimation, - QRect, - QRectF, - QSize, - Qt, - QTimer, - pyqtProperty, - pyqtSignal, -) -from PyQt5.QtGui import ( - QBrush, - QColor, - QFont, - QFontMetrics, - QPainter, - QPainterPath, - QPen, - QPolygonF, -) -from PyQt5.QtWidgets import QApplication, QDesktopWidget, QWidget - -from . import styles - -logger = logging.getLogger("lea_ui.overlay") - - -class OverlayWidget(QWidget): - """Overlay plein ecran transparent pour le feedback visuel du replay. - - Flags critiques : - - WindowStaysOnTopHint : toujours au-dessus - - FramelessWindowHint : pas de decoration - - Tool : n'apparait pas dans la barre des taches - - WA_TranslucentBackground : fond transparent - - WA_TransparentForMouseEvents : CLICK-THROUGH COMPLET - """ - - # Signal emis quand l'animation d'une action est terminee - action_display_finished = pyqtSignal() - - def __init__(self, parent: Optional[QWidget] = None) -> None: - super().__init__(parent) - - # Flags de fenetre pour click-through complet - self.setWindowFlags( - Qt.WindowStaysOnTopHint - | Qt.FramelessWindowHint - | Qt.Tool - ) - self.setAttribute(Qt.WA_TranslucentBackground, True) - self.setAttribute(Qt.WA_TransparentForMouseEvents, True) - - # Etat de l'affichage - self._target_pos: Optional[Tuple[int, int]] = None - self._action_text: str = "" - self._progress_current: int = 0 - self._progress_total: int = 0 - self._action_done: bool = False - self._visible = False - - # Animation du cercle pulsant - self._pulse_radius: float = 30.0 - self._pulse_growing = True - self._pulse_opacity: float = 0.8 - - # Timer d'animation - self._anim_timer = QTimer(self) - self._anim_timer.timeout.connect(self._animate_pulse) - self._anim_timer.setInterval(30) # ~33 FPS - - # Timer d'effacement automatique - self._fade_timer = QTimer(self) - self._fade_timer.setSingleShot(True) - self._fade_timer.timeout.connect(self._on_fade) - - # Couvrir tout l'ecran - self._update_geometry() - - def _update_geometry(self) -> None: - """Positionner l'overlay sur tout l'ecran principal.""" - desktop = QApplication.desktop() - if desktop: - screen_rect = desktop.screenGeometry(desktop.primaryScreen()) - self.setGeometry(screen_rect) - - # --------------------------------------------------------------------------- - # API publique - # --------------------------------------------------------------------------- - - def show_action( - self, - target_x: int, - target_y: int, - text: str, - step_current: int = 0, - step_total: int = 0, - duration_ms: int = 1500, - ) -> None: - """Afficher le feedback pour une action de replay. - - Args: - target_x: position X du clic cible (pixels ecran) - target_y: position Y du clic cible (pixels ecran) - text: description de l'action (ex: "Je clique sur [Valider]") - step_current: etape courante (1-indexed) - step_total: nombre total d'etapes - duration_ms: duree d'affichage en ms (defaut 1500ms) - """ - self._target_pos = (target_x, target_y) - self._action_text = text - self._progress_current = step_current - self._progress_total = step_total - self._action_done = False - self._pulse_radius = 30.0 - self._pulse_opacity = 0.8 - self._visible = True - - self._update_geometry() - self.show() - self.raise_() - self._anim_timer.start() - - # Programmer l'effacement - self._fade_timer.start(duration_ms) - self.update() - - def show_done(self, text: Optional[str] = None) -> None: - """Marquer l'action courante comme terminee (coche verte).""" - self._action_done = True - if text: - self._action_text = text - self.update() - - # Effacer apres 800ms - self._fade_timer.start(800) - - def hide_overlay(self) -> None: - """Masquer immediatement l'overlay.""" - self._anim_timer.stop() - self._fade_timer.stop() - self._visible = False - self._target_pos = None - self.hide() - - # --------------------------------------------------------------------------- - # Animations - # --------------------------------------------------------------------------- - - def _animate_pulse(self) -> None: - """Animer le cercle pulsant.""" - if self._action_done: - # Pas d'animation en mode "done" - return - - pulse_speed = 0.8 - if self._pulse_growing: - self._pulse_radius += pulse_speed - if self._pulse_radius >= 45.0: - self._pulse_growing = False - else: - self._pulse_radius -= pulse_speed - if self._pulse_radius <= 25.0: - self._pulse_growing = True - - # Opacite qui suit le pulse - self._pulse_opacity = 0.5 + 0.3 * ( - (self._pulse_radius - 25.0) / 20.0 - ) - - self.update() - - def _on_fade(self) -> None: - """Callback apres le timer d'effacement.""" - self._anim_timer.stop() - self._visible = False - self._target_pos = None - self.hide() - self.action_display_finished.emit() - - # --------------------------------------------------------------------------- - # Rendu - # --------------------------------------------------------------------------- - - def paintEvent(self, event) -> None: # noqa: N802 - """Dessiner l'overlay.""" - if not self._visible or not self._target_pos: - return - - painter = QPainter(self) - painter.setRenderHint(QPainter.Antialiasing, True) - - tx, ty = self._target_pos - - if self._action_done: - self._draw_done_indicator(painter, tx, ty) - else: - self._draw_pulse_circle(painter, tx, ty) - self._draw_arrow(painter, tx, ty) - - self._draw_action_text(painter, tx, ty) - self._draw_progress_bar(painter) - - painter.end() - - def _draw_pulse_circle(self, painter: QPainter, cx: int, cy: int) -> None: - """Dessiner le cercle rouge pulsant autour de la cible.""" - # Cercle exterieur (pulsant, semi-transparent) - color = QColor(styles.COLOR_OVERLAY_PULSE) - color.setAlphaF(self._pulse_opacity * 0.4) - painter.setBrush(QBrush(color)) - painter.setPen(Qt.NoPen) - painter.drawEllipse( - QPoint(cx, cy), - int(self._pulse_radius), - int(self._pulse_radius), - ) - - # Cercle interieur (fixe, plus opaque) - color_inner = QColor(styles.COLOR_OVERLAY_PULSE) - color_inner.setAlphaF(0.7) - pen = QPen(color_inner, 3) - painter.setPen(pen) - painter.setBrush(Qt.NoBrush) - painter.drawEllipse(QPoint(cx, cy), 20, 20) - - # Point central - painter.setPen(Qt.NoPen) - painter.setBrush(QBrush(QColor(styles.COLOR_OVERLAY_PULSE))) - painter.drawEllipse(QPoint(cx, cy), 4, 4) - - def _draw_done_indicator(self, painter: QPainter, cx: int, cy: int) -> None: - """Dessiner l'indicateur de succes (cercle vert + coche).""" - # Cercle vert - color = QColor(styles.COLOR_SUCCESS) - color.setAlphaF(0.8) - painter.setBrush(QBrush(color)) - painter.setPen(Qt.NoPen) - painter.drawEllipse(QPoint(cx, cy), 25, 25) - - # Coche blanche - pen = QPen(QColor(styles.COLOR_TEXT_ON_ACCENT), 3) - pen.setCapStyle(Qt.RoundCap) - pen.setJoinStyle(Qt.RoundJoin) - painter.setPen(pen) - painter.setBrush(Qt.NoBrush) - - path = QPainterPath() - path.moveTo(cx - 10, cy) - path.lineTo(cx - 3, cy + 8) - path.lineTo(cx + 12, cy - 8) - painter.drawPath(path) - - def _draw_arrow(self, painter: QPainter, tx: int, ty: int) -> None: - """Dessiner une fleche pointant vers la cible depuis le texte.""" - # Position du texte (au-dessus ou en dessous selon l'espace) - text_y = ty - 80 if ty > 120 else ty + 80 - text_x = max(100, min(tx, self.width() - 200)) - - # Ligne de la fleche - color = QColor(styles.COLOR_OVERLAY_PULSE) - color.setAlphaF(0.6) - pen = QPen(color, 2, Qt.DashLine) - painter.setPen(pen) - painter.drawLine(text_x, text_y + (15 if text_y < ty else -15), tx, ty) - - def _draw_action_text(self, painter: QPainter, tx: int, ty: int) -> None: - """Dessiner le texte descriptif de l'action.""" - if not self._action_text: - return - - # Positionner le texte au-dessus ou en dessous de la cible - text_y = ty - 90 if ty > 140 else ty + 70 - - font = QFont(styles.FONT_FAMILY, styles.FONT_SIZE_LARGE, QFont.Bold) - painter.setFont(font) - metrics = QFontMetrics(font) - - # Mesurer le texte - text_rect = metrics.boundingRect(self._action_text) - text_width = text_rect.width() + 30 - text_height = text_rect.height() + 16 - - # Centrer horizontalement sur la cible (avec limites d'ecran) - box_x = max(10, min(tx - text_width // 2, self.width() - text_width - 10)) - box_y = text_y - text_height // 2 - - # Fond semi-transparent arrondi - bg_color = QColor(31, 41, 55, 200) # Gris fonce semi-transparent - painter.setBrush(QBrush(bg_color)) - painter.setPen(Qt.NoPen) - painter.drawRoundedRect(box_x, box_y, text_width, text_height, 8, 8) - - # Texte blanc - painter.setPen(QPen(QColor(styles.COLOR_OVERLAY_TEXT))) - painter.drawText( - QRect(box_x, box_y, text_width, text_height), - Qt.AlignCenter, - self._action_text, - ) - - def _draw_progress_bar(self, painter: QPainter) -> None: - """Dessiner la barre de progression en bas de l'ecran.""" - if self._progress_total <= 0: - return - - bar_width = 300 - bar_height = 6 - bar_x = (self.width() - bar_width) // 2 - bar_y = self.height() - 50 - - # Fond - bg_color = QColor(255, 255, 255, 80) - painter.setBrush(QBrush(bg_color)) - painter.setPen(Qt.NoPen) - painter.drawRoundedRect(bar_x, bar_y, bar_width, bar_height, 3, 3) - - # Progression - progress_pct = self._progress_current / self._progress_total - fill_width = int(bar_width * progress_pct) - accent_color = QColor(styles.COLOR_ACCENT) - accent_color.setAlphaF(0.9) - painter.setBrush(QBrush(accent_color)) - painter.drawRoundedRect(bar_x, bar_y, fill_width, bar_height, 3, 3) - - # Label "Etape X/Y" - label_font = QFont(styles.FONT_FAMILY, styles.FONT_SIZE_SMALL) - painter.setFont(label_font) - painter.setPen(QPen(QColor(255, 255, 255, 200))) - painter.drawText( - QRect(bar_x, bar_y + bar_height + 4, bar_width, 20), - Qt.AlignCenter, - f"Etape {self._progress_current}/{self._progress_total}", - ) diff --git a/agent_v0/lea_ui/replay_integration.py b/agent_v0/lea_ui/replay_integration.py deleted file mode 100644 index da29ce418..000000000 --- a/agent_v0/lea_ui/replay_integration.py +++ /dev/null @@ -1,191 +0,0 @@ -# agent_v0/lea_ui/replay_integration.py -""" -Integration du feedback visuel (overlay) dans la boucle de replay de l'Agent V1. - -Ce module fournit un wrapper autour de ActionExecutorV1.execute_replay_action -qui affiche l'overlay AVANT chaque action et la marque comme terminee APRES. - -Sequence pour chaque action : - 1. Afficher l'overlay avec la description de l'action (1.5s) - 2. Attendre que l'overlay ait ete vu par l'utilisateur - 3. Executer l'action - 4. Mettre a jour l'overlay (coche verte) - 5. Passer a l'action suivante -""" - -from __future__ import annotations - -import logging -import time -from typing import Any, Callable, Dict, Optional, Tuple - -logger = logging.getLogger("lea_ui.replay_integration") - -# Delai d'affichage de l'overlay avant execution (secondes) -PRE_ACTION_DELAY = 1.5 -# Delai apres la coche verte (secondes) -POST_ACTION_DELAY = 0.5 - - -class ReplayOverlayBridge: - """Pont entre la boucle de replay et l'overlay. - - Fonctionne de maniere thread-safe : la boucle de replay tourne dans - un thread daemon, et l'overlay est controle via des signaux Qt. - - L'overlay est optionnel — si non connecte, l'execution continue normalement. - """ - - def __init__(self) -> None: - self._overlay = None - self._show_callback: Optional[Callable] = None - self._done_callback: Optional[Callable] = None - self._hide_callback: Optional[Callable] = None - self._enabled = False - - # Compteur de progression - self._step_current = 0 - self._step_total = 0 - - def connect_overlay( - self, - show_fn: Callable[[int, int, str, int, int, int], None], - done_fn: Callable[[Optional[str]], None], - hide_fn: Callable[[], None], - ) -> None: - """Connecter les callbacks de l'overlay. - - Args: - show_fn: overlay.show_action(target_x, target_y, text, step, total, duration_ms) - done_fn: overlay.show_done(text) - hide_fn: overlay.hide_overlay() - """ - self._show_callback = show_fn - self._done_callback = done_fn - self._hide_callback = hide_fn - self._enabled = True - logger.info("Overlay connecte au bridge de replay") - - def disconnect_overlay(self) -> None: - """Deconnecter l'overlay.""" - self._show_callback = None - self._done_callback = None - self._hide_callback = None - self._enabled = False - - def set_total_steps(self, total: int) -> None: - """Definir le nombre total d'etapes du replay.""" - self._step_total = total - self._step_current = 0 - - def wrap_execute( - self, - action: Dict[str, Any], - executor_fn: Callable[[Dict[str, Any]], Dict[str, Any]], - screen_width: int = 1920, - screen_height: int = 1080, - ) -> Dict[str, Any]: - """Wrapper autour de l'execution d'une action avec feedback overlay. - - Args: - action: action normalisee (type, x_pct, y_pct, text, keys, ...) - executor_fn: fonction d'execution (ex: ActionExecutorV1.execute_replay_action) - screen_width: largeur de l'ecran en pixels - screen_height: hauteur de l'ecran en pixels - - Returns: - Resultat de l'execution (dict avec success, error, screenshot, ...) - """ - self._step_current += 1 - - if not self._enabled or not self._show_callback: - # Pas d'overlay — execution directe - return executor_fn(action) - - # --- 1. Afficher l'overlay --- - action_text = self._describe_action(action) - target_x, target_y = self._get_target_coords(action, screen_width, screen_height) - - try: - self._show_callback( - target_x, target_y, - action_text, - self._step_current, - self._step_total, - int(PRE_ACTION_DELAY * 1000), - ) - except Exception as e: - logger.warning("Erreur affichage overlay : %s", e) - - # --- 2. Attendre que l'utilisateur ait vu --- - time.sleep(PRE_ACTION_DELAY) - - # --- 3. Executer l'action --- - result = executor_fn(action) - - # --- 4. Marquer comme terminee --- - if result.get("success"): - done_text = f"{action_text} OK" - else: - done_text = f"{action_text} ECHEC" - - try: - if self._done_callback: - self._done_callback(done_text) - except Exception as e: - logger.warning("Erreur overlay done : %s", e) - - time.sleep(POST_ACTION_DELAY) - - # --- 5. Cacher si c'etait la derniere etape --- - if self._step_current >= self._step_total and self._hide_callback: - try: - self._hide_callback() - except Exception: - pass - - return result - - def _describe_action(self, action: Dict[str, Any]) -> str: - """Generer une description lisible d'une action.""" - action_type = action.get("type", "?") - target_text = action.get("target_text", "") - target_role = action.get("target_role", "") - - if action_type == "click": - target = target_text or target_role or "cet element" - return f"Je clique sur [{target}]" - elif action_type == "type": - text = action.get("text", "") - preview = text[:25] + "..." if len(text) > 25 else text - return f"Je tape : {preview}" - elif action_type == "key_combo": - keys = action.get("keys", []) - return f"Combinaison : {'+'.join(keys)}" - elif action_type == "scroll": - return "Defilement" - elif action_type == "wait": - ms = action.get("duration_ms", 500) - return f"Attente {ms}ms" - else: - return f"Action : {action_type}" - - def _get_target_coords( - self, action: Dict[str, Any], sw: int, sh: int, - ) -> Tuple[int, int]: - """Calculer les coordonnees cible en pixels.""" - x_pct = action.get("x_pct", 0.5) - y_pct = action.get("y_pct", 0.5) - return int(x_pct * sw), int(y_pct * sh) - - -# Instance globale (singleton) pour l'integration -_bridge: Optional[ReplayOverlayBridge] = None - - -def get_replay_bridge() -> ReplayOverlayBridge: - """Obtenir l'instance globale du bridge overlay/replay.""" - global _bridge - if _bridge is None: - _bridge = ReplayOverlayBridge() - return _bridge diff --git a/agent_v0/lea_ui/styles.py b/agent_v0/lea_ui/styles.py deleted file mode 100644 index 524c56856..000000000 --- a/agent_v0/lea_ui/styles.py +++ /dev/null @@ -1,200 +0,0 @@ -# agent_v0/lea_ui/styles.py -""" -Theme et couleurs pour l'interface Lea. - -Palette douce et moderne, pensee pour ne pas fatiguer les yeux -lors d'une utilisation prolongee sur un poste de travail Windows. -""" - -# --------------------------------------------------------------------------- -# Palette de couleurs -# --------------------------------------------------------------------------- - -# Fond principal -COLOR_BG = "#F5F7FA" -# Fond secondaire (sidebar, header) -COLOR_BG_SECONDARY = "#EEF1F6" -# Fond des bulles utilisateur -COLOR_BUBBLE_USER = "#6366F1" -# Fond des bulles Lea -COLOR_BUBBLE_LEA = "#FFFFFF" -# Accent principal (indigo) -COLOR_ACCENT = "#6366F1" -# Accent hover -COLOR_ACCENT_HOVER = "#4F46E5" -# Texte principal -COLOR_TEXT = "#1F2937" -# Texte secondaire -COLOR_TEXT_SECONDARY = "#6B7280" -# Texte sur accent (blanc) -COLOR_TEXT_ON_ACCENT = "#FFFFFF" -# Bordure legere -COLOR_BORDER = "#E5E7EB" -# Succes (vert) -COLOR_SUCCESS = "#10B981" -# Erreur (rouge) -COLOR_ERROR = "#EF4444" -# Avertissement (orange) -COLOR_WARNING = "#F59E0B" -# Overlay rouge pulsant -COLOR_OVERLAY_PULSE = "#EF4444" -# Overlay texte -COLOR_OVERLAY_TEXT = "#FFFFFF" -# Overlay fond info -COLOR_OVERLAY_INFO_BG = "rgba(31, 41, 55, 200)" - -# --------------------------------------------------------------------------- -# Typographie -# --------------------------------------------------------------------------- - -FONT_FAMILY = "Segoe UI" -FONT_SIZE_SMALL = 11 -FONT_SIZE_NORMAL = 13 -FONT_SIZE_LARGE = 15 -FONT_SIZE_TITLE = 18 - -# --------------------------------------------------------------------------- -# Dimensions -# --------------------------------------------------------------------------- - -# Largeur du panneau Lea -PANEL_WIDTH = 380 -# Hauteur minimale -PANEL_MIN_HEIGHT = 500 -# Rayon des coins arrondis -BORDER_RADIUS = 12 -# Rayon des bulles de chat -BUBBLE_RADIUS = 16 -# Padding interne -PADDING = 12 -# Taille de l'avatar -AVATAR_SIZE = 40 -# Marge entre les elements -SPACING = 8 - -# --------------------------------------------------------------------------- -# Stylesheet global du panneau Lea -# --------------------------------------------------------------------------- - -MAIN_WINDOW_STYLE = f""" - QWidget#LeaMainWindow {{ - background-color: {COLOR_BG}; - border-radius: {BORDER_RADIUS}px; - border: 1px solid {COLOR_BORDER}; - }} -""" - -HEADER_STYLE = f""" - QWidget#LeaHeader {{ - background-color: {COLOR_BG_SECONDARY}; - border-top-left-radius: {BORDER_RADIUS}px; - border-top-right-radius: {BORDER_RADIUS}px; - border-bottom: 1px solid {COLOR_BORDER}; - }} - QLabel#LeaTitle {{ - color: {COLOR_TEXT}; - font-family: "{FONT_FAMILY}"; - font-size: {FONT_SIZE_TITLE}px; - font-weight: bold; - }} - QLabel#LeaStatus {{ - color: {COLOR_TEXT_SECONDARY}; - font-family: "{FONT_FAMILY}"; - font-size: {FONT_SIZE_SMALL}px; - }} -""" - -CHAT_AREA_STYLE = f""" - QScrollArea {{ - border: none; - background-color: {COLOR_BG}; - }} - QWidget#ChatContainer {{ - background-color: {COLOR_BG}; - }} -""" - -INPUT_STYLE = f""" - QLineEdit#ChatInput {{ - background-color: {COLOR_BUBBLE_LEA}; - border: 1px solid {COLOR_BORDER}; - border-radius: 20px; - padding: 8px 16px; - font-family: "{FONT_FAMILY}"; - font-size: {FONT_SIZE_NORMAL}px; - color: {COLOR_TEXT}; - }} - QLineEdit#ChatInput:focus {{ - border-color: {COLOR_ACCENT}; - }} -""" - -SEND_BUTTON_STYLE = f""" - QPushButton#SendButton {{ - background-color: {COLOR_ACCENT}; - color: {COLOR_TEXT_ON_ACCENT}; - border: none; - border-radius: 20px; - padding: 8px 16px; - font-family: "{FONT_FAMILY}"; - font-size: {FONT_SIZE_NORMAL}px; - font-weight: bold; - min-width: 50px; - }} - QPushButton#SendButton:hover {{ - background-color: {COLOR_ACCENT_HOVER}; - }} - QPushButton#SendButton:pressed {{ - background-color: #3730A3; - }} -""" - -QUICK_BUTTON_STYLE = f""" - QPushButton#QuickButton {{ - background-color: {COLOR_BUBBLE_LEA}; - color: {COLOR_ACCENT}; - border: 1px solid {COLOR_ACCENT}; - border-radius: 18px; - padding: 6px 14px; - font-family: "{FONT_FAMILY}"; - font-size: {FONT_SIZE_SMALL}px; - }} - QPushButton#QuickButton:hover {{ - background-color: {COLOR_ACCENT}; - color: {COLOR_TEXT_ON_ACCENT}; - }} -""" - -PROGRESS_STYLE = f""" - QProgressBar {{ - border: none; - border-radius: 4px; - background-color: {COLOR_BORDER}; - text-align: center; - font-family: "{FONT_FAMILY}"; - font-size: {FONT_SIZE_SMALL}px; - color: {COLOR_TEXT}; - max-height: 8px; - }} - QProgressBar::chunk {{ - background-color: {COLOR_ACCENT}; - border-radius: 4px; - }} -""" - -STATUS_LABEL_STYLE = f""" - QLabel#StatusLabel {{ - color: {COLOR_TEXT_SECONDARY}; - font-family: "{FONT_FAMILY}"; - font-size: {FONT_SIZE_SMALL}px; - padding: 4px 8px; - }} -""" - -MINI_BAR_STYLE = f""" - QWidget#MiniBar {{ - background-color: {COLOR_BG_SECONDARY}; - border-radius: 20px; - border: 1px solid {COLOR_BORDER}; - }} -""" diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index 18c6208cb..c6dd773f2 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -3326,21 +3326,13 @@ def _vlm_quick_find( # Résolution Set-of-Mark : SomEngine (détection) + VLM (identification) # --------------------------------------------------------------------------- -_som_engine_api = None # Singleton - - def _get_som_engine_api(): - """Singleton SomEngine pour la résolution visuelle (lazy-loaded, GPU).""" - global _som_engine_api - if _som_engine_api is None: - try: - from core.detection.som_engine import SomEngine - _som_engine_api = SomEngine(device="cuda") - logger.info("SomEngine API initialisé (lazy singleton)") - except Exception as e: - logger.warning("SomEngine API non disponible : %s", e) - _som_engine_api = False - return _som_engine_api if _som_engine_api is not False else None + """Singleton SomEngine partagé.""" + try: + from core.detection.som_engine import get_shared_engine + return get_shared_engine() + except ImportError: + return None def _resolve_by_som( @@ -3423,7 +3415,7 @@ def _resolve_by_som( if not exact_matches: exact_matches = [ e for e in som_result.elements - if e.label and ( + if e.label and len(e.label) >= 3 and ( label_lower in e.label.lower() or e.label.lower() in label_lower ) @@ -3493,6 +3485,10 @@ def _resolve_by_som( ) # ── 3. Sauvegarder l'image annotée SoM temporairement ── + if som_result.som_image is None: + logger.debug("SoM resolve : pas d'image annotée, skip VLM") + return None + import tempfile try: with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: diff --git a/agent_v0/server_v1/stream_processor.py b/agent_v0/server_v1/stream_processor.py index 6f1afd105..234abaa2e 100644 --- a/agent_v0/server_v1/stream_processor.py +++ b/agent_v0/server_v1/stream_processor.py @@ -431,21 +431,17 @@ def _needs_post_wait(action: dict) -> int: # SomEngine — enrichissement Set-of-Mark des clics pendant le build_replay # --------------------------------------------------------------------------- -_som_engine = None # Singleton, chargé à la demande +_som_cache: Dict[str, Any] = {} # screenshot_id -> SomResult (cache build_replay) +_SOM_CACHE_MAX = 50 def _get_som_engine(): - """Singleton SomEngine (lazy-loaded, GPU).""" - global _som_engine - if _som_engine is None: - try: - from core.detection.som_engine import SomEngine - _som_engine = SomEngine(device="cuda") - logger.info("SomEngine initialisé (lazy singleton)") - except Exception as e: - logger.warning("SomEngine non disponible : %s", e) - _som_engine = False # Marqueur "indisponible" - return _som_engine if _som_engine is not False else None + """Singleton SomEngine partagé.""" + try: + from core.detection.som_engine import get_shared_engine + return get_shared_engine() + except ImportError: + return None def _som_identify_clicked_element( @@ -486,19 +482,38 @@ def _som_identify_clicked_element( if not full_path.is_file(): return None - try: - from PIL import Image - img = Image.open(full_path).convert("RGB") - except Exception as e: - logger.debug("SoM: impossible de charger %s : %s", full_path, e) - return None + # Vérifier le cache SomResult par (session_dir, screenshot_id) + cache_key = f"{session_dir}:{screenshot_id}" + if cache_key in _som_cache: + result = _som_cache[cache_key] + else: + try: + from PIL import Image + img = Image.open(full_path).convert("RGB") + except Exception as e: + logger.debug("SoM: impossible de charger %s : %s", full_path, e) + return None - # Lancer SomEngine - try: - result = engine.analyze(img) - except Exception as e: - logger.warning("SoM: erreur d'analyse : %s", e) - return None + # Lancer SomEngine + try: + result = engine.analyze(img) + except Exception as e: + logger.warning("SoM: erreur d'analyse : %s", e) + return None + + # Stocker dans le cache (éléments seulement, pas l'image annotée) + from core.detection.som_engine import SomResult + cached = SomResult( + elements=result.elements, + width=result.width, + height=result.height, + analysis_time_ms=result.analysis_time_ms, + ) + if len(_som_cache) >= _SOM_CACHE_MAX: + # Supprimer la plus ancienne entrée (FIFO) + oldest_key = next(iter(_som_cache)) + del _som_cache[oldest_key] + _som_cache[cache_key] = cached if not result.elements: return None diff --git a/agent_v0/window_info.py b/agent_v0/window_info.py deleted file mode 100644 index 7e6be8744..000000000 --- a/agent_v0/window_info.py +++ /dev/null @@ -1,55 +0,0 @@ -# window_info.py -""" -Récupération des informations sur la fenêtre active (X11). - -v0 : -- utilise xdotool pour obtenir : - - le titre de la fenêtre active - - le PID de la fenêtre active, puis le nom du process via ps - -Si quelque chose ne fonctionne pas, on renvoie des valeurs "unknown". -""" - -from __future__ import annotations - -import subprocess -from typing import Dict, Optional - - -def _run_cmd(cmd: list[str]) -> Optional[str]: - """Exécute une commande et renvoie la sortie texte (strippée), ou None en cas d'erreur.""" - try: - out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL) - return out.decode("utf-8", errors="ignore").strip() - except Exception: - return None - - -def get_active_window_info() -> Dict[str, str]: - """ - Renvoie un dict : - { - "title": "...", - "app_name": "..." - } - - Nécessite xdotool installé sur le système. - """ - title = _run_cmd(["xdotool", "getactivewindow", "getwindowname"]) - pid_str = _run_cmd(["xdotool", "getactivewindow", "getwindowpid"]) - - app_name: Optional[str] = None - if pid_str: - pid_str = pid_str.strip() - # On récupère le nom du binaire via ps - app_name = _run_cmd(["ps", "-p", pid_str, "-o", "comm="]) - - if not title: - title = "unknown_window" - if not app_name: - app_name = "unknown_app" - - return { - "title": title, - "app_name": app_name, - } diff --git a/agent_v0/window_info_crossplatform.py b/agent_v0/window_info_crossplatform.py deleted file mode 100644 index ba059a3fc..000000000 --- a/agent_v0/window_info_crossplatform.py +++ /dev/null @@ -1,192 +0,0 @@ -# window_info_crossplatform.py -""" -Récupération des informations sur la fenêtre active - CROSS-PLATFORM - -Supporte: -- Linux (X11 via xdotool) -- Windows (via pywin32) -- macOS (via pyobjc) - -Installation des dépendances: - pip install pywin32 # Windows - pip install pyobjc-framework-Cocoa # macOS - pip install psutil # Tous OS -""" - -from __future__ import annotations - -import platform -import subprocess -from typing import Dict, Optional - - -def _run_cmd(cmd: list[str]) -> Optional[str]: - """Exécute une commande et renvoie la sortie texte (strippée), ou None en cas d'erreur.""" - try: - out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL) - return out.decode("utf-8", errors="ignore").strip() - except Exception: - return None - - -def get_active_window_info() -> Dict[str, str]: - """ - Renvoie un dict : - { - "title": "...", - "app_name": "..." - } - - Détecte automatiquement l'OS et utilise la méthode appropriée. - """ - system = platform.system() - - if system == "Linux": - return _get_window_info_linux() - elif system == "Windows": - return _get_window_info_windows() - elif system == "Darwin": # macOS - return _get_window_info_macos() - else: - return {"title": "unknown_window", "app_name": "unknown_app"} - - -def _get_window_info_linux() -> Dict[str, str]: - """ - Linux: utilise xdotool (X11) - - Nécessite: sudo apt-get install xdotool - """ - title = _run_cmd(["xdotool", "getactivewindow", "getwindowname"]) - pid_str = _run_cmd(["xdotool", "getactivewindow", "getwindowpid"]) - - app_name: Optional[str] = None - if pid_str: - pid_str = pid_str.strip() - # On récupère le nom du binaire via ps - app_name = _run_cmd(["ps", "-p", pid_str, "-o", "comm="]) - - if not title: - title = "unknown_window" - if not app_name: - app_name = "unknown_app" - - return { - "title": title, - "app_name": app_name, - } - - -def _get_window_info_windows() -> Dict[str, str]: - """ - Windows: utilise pywin32 + psutil - - Nécessite: pip install pywin32 psutil - """ - try: - import win32gui - import win32process - import psutil - - # Fenêtre au premier plan - hwnd = win32gui.GetForegroundWindow() - - # Titre de la fenêtre - title = win32gui.GetWindowText(hwnd) - if not title: - title = "unknown_window" - - # PID du processus - _, pid = win32process.GetWindowThreadProcessId(hwnd) - - # Nom du processus - try: - process = psutil.Process(pid) - app_name = process.name() - except (psutil.NoSuchProcess, psutil.AccessDenied): - app_name = "unknown_app" - - return { - "title": title, - "app_name": app_name, - } - - except ImportError: - # pywin32 ou psutil non installé - return { - "title": "unknown_window (pywin32 missing)", - "app_name": "unknown_app (pywin32 missing)", - } - except Exception as e: - return { - "title": f"error: {e}", - "app_name": "unknown_app", - } - - -def _get_window_info_macos() -> Dict[str, str]: - """ - macOS: utilise pyobjc (AppKit) - - Nécessite: pip install pyobjc-framework-Cocoa - - Note: Nécessite les permissions "Accessibility" dans System Preferences - """ - try: - from AppKit import NSWorkspace - from Quartz import ( - CGWindowListCopyWindowInfo, - kCGWindowListOptionOnScreenOnly, - kCGNullWindowID - ) - - # Application active - active_app = NSWorkspace.sharedWorkspace().activeApplication() - app_name = active_app.get('NSApplicationName', 'unknown_app') - - # Titre de la fenêtre (via Quartz) - # On cherche la fenêtre de l'app active qui est au premier plan - window_list = CGWindowListCopyWindowInfo( - kCGWindowListOptionOnScreenOnly, - kCGNullWindowID - ) - - title = "unknown_window" - for window in window_list: - owner_name = window.get('kCGWindowOwnerName', '') - if owner_name == app_name: - window_title = window.get('kCGWindowName', '') - if window_title: - title = window_title - break - - return { - "title": title, - "app_name": app_name, - } - - except ImportError: - # pyobjc non installé - return { - "title": "unknown_window (pyobjc missing)", - "app_name": "unknown_app (pyobjc missing)", - } - except Exception as e: - return { - "title": f"error: {e}", - "app_name": "unknown_app", - } - - -# Test rapide -if __name__ == "__main__": - import time - - print(f"OS détecté: {platform.system()}") - print("\nTest de capture fenêtre active (5 secondes)...") - print("Changez de fenêtre pour tester!\n") - - for i in range(5): - info = get_active_window_info() - print(f"[{i+1}] App: {info['app_name']:20s} | Title: {info['title']}") - time.sleep(1) diff --git a/core/detection/som_engine.py b/core/detection/som_engine.py index 041aa7235..5aab8eaf7 100644 --- a/core/detection/som_engine.py +++ b/core/detection/som_engine.py @@ -25,6 +25,7 @@ from __future__ import annotations import base64 import io import logging +import os from dataclasses import dataclass, field from pathlib import Path from typing import List, Optional, Tuple @@ -33,8 +34,10 @@ from PIL import Image, ImageDraw, ImageFont logger = logging.getLogger(__name__) -# Chemin vers les poids YOLO d'OmniParser -_YOLO_WEIGHTS = Path("/home/dom/ai/OmniParser/weights/icon_detect/model.pt") +# Chemin vers les poids YOLO d'OmniParser (configurable via env) +_YOLO_WEIGHTS = Path( + os.environ.get("SOM_YOLO_WEIGHTS", "/home/dom/ai/OmniParser/weights/icon_detect/model.pt") +) @dataclass @@ -165,17 +168,17 @@ class SomEngine: # ── 2. docTR : OCR pour lire le texte ── if self._ocr is not None: try: - import numpy as np from doctr.io import DocumentFile # Convertir PIL → fichier temporaire pour docTR import tempfile with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: screenshot.save(tmp, format="JPEG", quality=90) tmp_path = tmp.name - doc = DocumentFile.from_images([tmp_path]) - import os - os.unlink(tmp_path) - result_ocr = self._ocr(doc) + try: + doc = DocumentFile.from_images([tmp_path]) + result_ocr = self._ocr(doc) + finally: + os.unlink(tmp_path) for page in result_ocr.pages: for block in page.blocks: @@ -288,3 +291,25 @@ class SomEngine: buf = io.BytesIO() image.save(buf, format="JPEG", quality=quality) return base64.b64encode(buf.getvalue()).decode() + + +# --------------------------------------------------------------------------- +# Singleton partagé (lazy-loaded, thread-safe) +# --------------------------------------------------------------------------- +_shared_engine: Optional[SomEngine] = None +_shared_lock = __import__("threading").Lock() + + +def get_shared_engine(device: str = "cuda") -> Optional[SomEngine]: + """Singleton SomEngine partagé entre tous les modules.""" + global _shared_engine + if _shared_engine is None: + with _shared_lock: + if _shared_engine is None: + try: + _shared_engine = SomEngine(device=device) + logger.info("SomEngine singleton partagé initialisé") + except Exception as e: + logger.warning("SomEngine non disponible : %s", e) + return None + return _shared_engine