refactor: nettoyage agent + fix SomEngine review (singleton partagé, cache, thread-safe)

Nettoyage Windows agent :
- Suppression lea_ui inutilisés (chat_widget, overlay, styles, etc. — -1991 lignes)
- Suppression window_info*.py dupliqués (racine + core/ — -494 lignes)
- build/ + dist/ supprimés (48 MB PyInstaller abandonné, gitignorés)

Fix SomEngine (review quality guardian) :
- Singleton GPU partagé via get_shared_engine() (1 instance au lieu de 2)
- Thread-safe avec threading.Lock (double-checked locking)
- Cache SomResult par screenshot_id (max 50, évite YOLO+OCR redondants)
- Fuite fichier temp docTR corrigée (finally block)
- Chemin YOLO configurable via SOM_YOLO_WEIGHTS env var
- Guard som_image None avant VLM
- Match texte partiel : len(label) >= 3

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-03-31 10:04:27 +02:00
parent 13390a71e7
commit a92d04621a
15 changed files with 84 additions and 2540 deletions

View File

@@ -1,55 +0,0 @@
# window_info.py
"""
Récupération des informations sur la fenêtre active (X11).
v0 :
- utilise xdotool pour obtenir :
- le titre de la fenêtre active
- le PID de la fenêtre active, puis le nom du process via ps
Si quelque chose ne fonctionne pas, on renvoie des valeurs "unknown".
"""
from __future__ import annotations
import subprocess
from typing import Dict, Optional
def _run_cmd(cmd: list[str]) -> Optional[str]:
"""Exécute une commande et renvoie la sortie texte (strippée), ou None en cas d'erreur."""
try:
out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
return out.decode("utf-8", errors="ignore").strip()
except Exception:
return None
def get_active_window_info() -> Dict[str, str]:
    """Return the title and process name of the active X11 window.

    The result always has the keys ``"title"`` and ``"app_name"``. A value
    that cannot be obtained is replaced by ``"unknown_window"`` or
    ``"unknown_app"``. Relies on the ``xdotool`` and ``ps`` commands.
    """
    window_title = _run_cmd(["xdotool", "getactivewindow", "getwindowname"])
    window_pid = _run_cmd(["xdotool", "getactivewindow", "getwindowpid"])

    # The binary name is resolved from the PID through ps; without a PID
    # there is nothing to look up.
    process_name: Optional[str] = (
        _run_cmd(["ps", "-p", window_pid.strip(), "-o", "comm="])
        if window_pid
        else None
    )

    return {
        "title": window_title or "unknown_window",
        "app_name": process_name or "unknown_app",
    }

View File

@@ -1,192 +0,0 @@
# window_info_crossplatform.py
"""
Récupération des informations sur la fenêtre active - CROSS-PLATFORM
Supporte:
- Linux (X11 via xdotool)
- Windows (via pywin32)
- macOS (via pyobjc)
Installation des dépendances:
pip install pywin32 # Windows
pip install pyobjc-framework-Cocoa # macOS
pip install psutil # Tous OS
"""
from __future__ import annotations
import platform
import subprocess
from typing import Dict, Optional
def _run_cmd(cmd: list[str]) -> Optional[str]:
"""Exécute une commande et renvoie la sortie texte (strippée), ou None en cas d'erreur."""
try:
out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
return out.decode("utf-8", errors="ignore").strip()
except Exception:
return None
def get_active_window_info() -> Dict[str, str]:
    """Return ``{"title": ..., "app_name": ...}`` for the active window.

    The operating system is detected with ``platform.system()`` and the
    matching platform helper is called. On any other system the generic
    "unknown" placeholders are returned.
    """
    backends = {
        "Linux": _get_window_info_linux,
        "Windows": _get_window_info_windows,
        "Darwin": _get_window_info_macos,  # macOS
    }
    backend = backends.get(platform.system())
    if backend is None:
        return {"title": "unknown_window", "app_name": "unknown_app"}
    return backend()
def _get_window_info_linux() -> Dict[str, str]:
    """Linux backend: query the active window through xdotool (X11).

    Requires: sudo apt-get install xdotool
    Missing values are replaced by ``"unknown_window"`` / ``"unknown_app"``.
    """
    active_title = _run_cmd(["xdotool", "getactivewindow", "getwindowname"])
    active_pid = _run_cmd(["xdotool", "getactivewindow", "getwindowpid"])

    binary_name: Optional[str] = None
    if active_pid:
        # ps maps the PID to the name of the binary.
        binary_name = _run_cmd(["ps", "-p", active_pid.strip(), "-o", "comm="])

    return {
        "title": active_title if active_title else "unknown_window",
        "app_name": binary_name if binary_name else "unknown_app",
    }
def _get_window_info_windows() -> Dict[str, str]:
"""
Windows: utilise pywin32 + psutil
Nécessite: pip install pywin32 psutil
"""
try:
import win32gui
import win32process
import psutil
# Fenêtre au premier plan
hwnd = win32gui.GetForegroundWindow()
# Titre de la fenêtre
title = win32gui.GetWindowText(hwnd)
if not title:
title = "unknown_window"
# PID du processus
_, pid = win32process.GetWindowThreadProcessId(hwnd)
# Nom du processus
try:
process = psutil.Process(pid)
app_name = process.name()
except (psutil.NoSuchProcess, psutil.AccessDenied):
app_name = "unknown_app"
return {
"title": title,
"app_name": app_name,
}
except ImportError:
# pywin32 ou psutil non installé
return {
"title": "unknown_window (pywin32 missing)",
"app_name": "unknown_app (pywin32 missing)",
}
except Exception as e:
return {
"title": f"error: {e}",
"app_name": "unknown_app",
}
def _get_window_info_macos() -> Dict[str, str]:
"""
macOS: utilise pyobjc (AppKit)
Nécessite: pip install pyobjc-framework-Cocoa
Note: Nécessite les permissions "Accessibility" dans System Preferences
"""
try:
from AppKit import NSWorkspace
from Quartz import (
CGWindowListCopyWindowInfo,
kCGWindowListOptionOnScreenOnly,
kCGNullWindowID
)
# Application active
active_app = NSWorkspace.sharedWorkspace().activeApplication()
app_name = active_app.get('NSApplicationName', 'unknown_app')
# Titre de la fenêtre (via Quartz)
# On cherche la fenêtre de l'app active qui est au premier plan
window_list = CGWindowListCopyWindowInfo(
kCGWindowListOptionOnScreenOnly,
kCGNullWindowID
)
title = "unknown_window"
for window in window_list:
owner_name = window.get('kCGWindowOwnerName', '')
if owner_name == app_name:
window_title = window.get('kCGWindowName', '')
if window_title:
title = window_title
break
return {
"title": title,
"app_name": app_name,
}
except ImportError:
# pyobjc non installé
return {
"title": "unknown_window (pyobjc missing)",
"app_name": "unknown_app (pyobjc missing)",
}
except Exception as e:
return {
"title": f"error: {e}",
"app_name": "unknown_app",
}
# Quick manual check: print the active window once per second, five times.
if __name__ == "__main__":
    import time

    print(f"OS détecté: {platform.system()}")
    print("\nTest de capture fenêtre active (5 secondes)...")
    print("Changez de fenêtre pour tester!\n")

    for attempt in range(1, 6):
        snapshot = get_active_window_info()
        print(f"[{attempt}] App: {snapshot['app_name']:20s} | Title: {snapshot['title']}")
        time.sleep(1)

View File

@@ -1,13 +1,6 @@
# agent_v0.lea_ui — Interface utilisateur "Lea" # agent_v0.lea_ui — Communication serveur pour l'agent Léa
# #
# Panneau PyQt5 integre qui remplace le system tray + navigateur web # Composant :
# par une interface unifiee pour piloter l'Agent RPA Vision V3.
#
# Composants :
# - LeaMainWindow : fenetre principale ancree a droite
# - ChatWidget : zone de conversation avec le serveur
# - OverlayWidget : feedback visuel pendant le replay
# - LeaServerClient : client API vers le serveur Linux # - LeaServerClient : client API vers le serveur Linux
# - styles : theme et couleurs
__version__ = "0.1.0" __version__ = "0.1.0"

View File

@@ -1,6 +0,0 @@
# agent_v0/lea_ui/__main__.py
"""Allow launching the package with: python -m agent_v0.lea_ui"""
from .launcher import main
main()

View File

@@ -1,250 +0,0 @@
# agent_v0/lea_ui/chat_widget.py
"""
Widget de chat pour l'interface Lea.
Affiche les messages avec des bulles :
- Utilisateur a droite (fond indigo)
- Lea a gauche (fond blanc)
Communique avec le serveur Linux via LeaServerClient.
"""
from __future__ import annotations
import logging
from typing import List, Optional
from PyQt5.QtCore import (
QPropertyAnimation,
QSize,
Qt,
QTimer,
pyqtSignal,
pyqtSlot,
)
from PyQt5.QtGui import QColor, QFont, QPainter, QPainterPath, QPen
from PyQt5.QtWidgets import (
QFrame,
QHBoxLayout,
QLabel,
QLineEdit,
QPushButton,
QScrollArea,
QSizePolicy,
QVBoxLayout,
QWidget,
)
from . import styles
logger = logging.getLogger("lea_ui.chat")
class ChatBubble(QFrame):
    """A single chat message rendered as a rounded bubble.

    User bubbles use the accent colours without a border; Lea bubbles use
    the neutral colours with a 1px border. The bubble does not position
    itself horizontally: the containing layout decides left/right placement.
    """

    def __init__(
        self,
        text: str,
        is_user: bool = False,
        parent: Optional[QWidget] = None,
    ) -> None:
        """Build the bubble.

        Args:
            text: message content, displayed by the label as rich text.
            is_user: True for a user bubble, False for a Lea bubble.
            parent: optional parent widget.
        """
        super().__init__(parent)
        self._is_user = is_user

        # Colours depend on the author. (The previous version also computed
        # an alignment flag here that was never used; it has been removed.)
        if is_user:
            bg_color = styles.COLOR_BUBBLE_USER
            text_color = styles.COLOR_TEXT_ON_ACCENT
        else:
            bg_color = styles.COLOR_BUBBLE_LEA
            text_color = styles.COLOR_TEXT

        self.setStyleSheet(f"""
QFrame {{
background-color: {bg_color};
border-radius: {styles.BUBBLE_RADIUS}px;
padding: {styles.PADDING}px;
border: {"none" if is_user else f"1px solid {styles.COLOR_BORDER}"};
}}
""")

        layout = QVBoxLayout(self)
        layout.setContentsMargins(
            styles.PADDING, styles.PADDING // 2,
            styles.PADDING, styles.PADDING // 2,
        )

        label = QLabel(text)
        label.setWordWrap(True)
        label.setFont(QFont(styles.FONT_FAMILY, styles.FONT_SIZE_NORMAL))
        label.setStyleSheet(f"color: {text_color}; background: transparent; border: none;")
        # NOTE(review): rich text is enabled for every bubble, so text typed
        # by the user is interpreted as markup too - confirm this is intended.
        label.setTextFormat(Qt.RichText)
        label.setOpenExternalLinks(True)
        layout.addWidget(label)

        self.setSizePolicy(QSizePolicy.Preferred, QSizePolicy.Minimum)
        self.setMaximumWidth(280)
class ChatWidget(QWidget):
    """Complete chat widget: scrollable message area plus an input row.

    Signals:
        message_sent(str): emitted when the user submits a non-empty message.
    """

    message_sent = pyqtSignal(str)

    def __init__(self, parent: Optional[QWidget] = None) -> None:
        super().__init__(parent)
        self._messages: List[dict] = []
        self._setup_ui()

    def _setup_ui(self) -> None:
        """Assemble the message area, the separator line and the input row."""
        root = QVBoxLayout(self)
        root.setContentsMargins(0, 0, 0, 0)
        root.setSpacing(0)

        # Scrollable message area
        self._scroll_area = QScrollArea()
        self._scroll_area.setWidgetResizable(True)
        self._scroll_area.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff)
        self._scroll_area.setStyleSheet(styles.CHAT_AREA_STYLE)

        self._messages_container = QWidget()
        self._messages_container.setObjectName("ChatContainer")

        self._messages_layout = QVBoxLayout(self._messages_container)
        self._messages_layout.setContentsMargins(
            styles.PADDING, styles.PADDING,
            styles.PADDING, styles.PADDING,
        )
        self._messages_layout.setSpacing(styles.SPACING)
        # The trailing stretch stays last; messages are inserted before it.
        self._messages_layout.addStretch()

        self._scroll_area.setWidget(self._messages_container)
        root.addWidget(self._scroll_area, stretch=1)

        # Thin horizontal separator
        divider = QFrame()
        divider.setFrameShape(QFrame.HLine)
        divider.setStyleSheet(f"background-color: {styles.COLOR_BORDER}; max-height: 1px;")
        root.addWidget(divider)

        # Input row: text field + send button
        entry_row = QHBoxLayout()
        entry_row.setContentsMargins(
            styles.PADDING, styles.SPACING,
            styles.PADDING, styles.SPACING,
        )
        entry_row.setSpacing(styles.SPACING)

        self._input = QLineEdit()
        self._input.setObjectName("ChatInput")
        self._input.setPlaceholderText("Ecrivez un message...")
        self._input.setStyleSheet(styles.INPUT_STYLE)
        self._input.returnPressed.connect(self._on_send)
        entry_row.addWidget(self._input, stretch=1)

        self._send_btn = QPushButton("Envoyer")
        self._send_btn.setObjectName("SendButton")
        self._send_btn.setStyleSheet(styles.SEND_BUTTON_STYLE)
        self._send_btn.setCursor(Qt.PointingHandCursor)
        self._send_btn.clicked.connect(self._on_send)
        entry_row.addWidget(self._send_btn)

        root.addLayout(entry_row)

    def _on_send(self) -> None:
        """Submit the typed message: show it as a user bubble and emit the signal."""
        typed = self._input.text().strip()
        if typed:
            self._input.clear()
            self.add_user_message(typed)
            self.message_sent.emit(typed)

    # ---------------------------------------------------------------------------
    # Public API
    # ---------------------------------------------------------------------------

    def add_user_message(self, text: str) -> None:
        """Append a user message (bubble on the right)."""
        self._add_bubble(text, is_user=True)

    def add_lea_message(self, text: str) -> None:
        """Append a message from Lea (bubble on the left)."""
        self._add_bubble(text, is_user=False)

    def add_system_message(self, text: str) -> None:
        """Append a discreet, centred system message (not stored in the history list)."""
        notice = QLabel(text)
        notice.setFont(QFont(styles.FONT_FAMILY, styles.FONT_SIZE_SMALL))
        notice.setStyleSheet(
            f"color: {styles.COLOR_TEXT_SECONDARY}; "
            f"background: transparent; padding: 4px;"
        )
        notice.setAlignment(Qt.AlignCenter)
        notice.setWordWrap(True)

        # Insert just before the trailing stretch.
        position = self._messages_layout.count() - 1
        self._messages_layout.insertWidget(position, notice)
        self._scroll_to_bottom()

    def set_input_enabled(self, enabled: bool) -> None:
        """Enable or disable the input field and send button, updating the placeholder."""
        self._input.setEnabled(enabled)
        self._send_btn.setEnabled(enabled)
        placeholder = "Ecrivez un message..." if enabled else "Lea reflechit..."
        self._input.setPlaceholderText(placeholder)

    def clear_messages(self) -> None:
        """Remove every message widget, keeping only the trailing stretch."""
        while self._messages_layout.count() > 1:
            removed = self._messages_layout.takeAt(0)
            child = removed.widget()
            if child:
                child.deleteLater()
        self._messages = []

    # ---------------------------------------------------------------------------
    # Internals
    # ---------------------------------------------------------------------------

    def _add_bubble(self, text: str, is_user: bool) -> None:
        """Create a bubble, align it according to its author and record the message."""
        bubble = ChatBubble(text, is_user=is_user)

        # Alignment row: the stretch pushes the bubble to the opposite side.
        aligned_row = QHBoxLayout()
        aligned_row.setContentsMargins(0, 0, 0, 0)
        if is_user:
            aligned_row.addStretch()
        aligned_row.addWidget(bubble)
        if not is_user:
            aligned_row.addStretch()

        holder = QWidget()
        holder.setLayout(aligned_row)
        holder.setStyleSheet("background: transparent;")

        # Insert just before the trailing stretch.
        position = self._messages_layout.count() - 1
        self._messages_layout.insertWidget(position, holder)

        self._messages.append({"text": text, "is_user": is_user})
        self._scroll_to_bottom()

    def _scroll_to_bottom(self) -> None:
        """Scroll to the newest message shortly after it was added."""
        def jump_to_end():
            bar = self._scroll_area.verticalScrollBar()
            bar.setValue(self._scroll_area.verticalScrollBar().maximum())

        # Deferred by 50 ms before reading the scroll bar maximum.
        QTimer.singleShot(50, jump_to_end)

View File

@@ -1,218 +0,0 @@
# agent_v0/lea_ui/launcher.py
"""
Point d'entree pour le panneau Lea.
Lancement autonome :
python -m agent_v0.lea_ui.launcher
Ou integre dans agent_v0/agent_v1/main.py avec flag --ui lea.
Ce module :
1. Cree l'application Qt
2. Instancie LeaServerClient
3. Instancie LeaMainWindow
4. Enregistre un raccourci global (Ctrl+Shift+L) via keyboard hook
5. Lance la boucle Qt
"""
from __future__ import annotations
import argparse
import logging
import os
import sys
from typing import Optional
logger = logging.getLogger("lea_ui.launcher")
def _setup_logging(verbose: bool = False) -> None:
"""Configurer le logging pour le panneau Lea."""
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%H:%M:%S",
)
def _setup_global_hotkey(window) -> Optional[object]:
    """Register the global Ctrl+Shift+L hotkey that toggles the panel.

    Uses the optional ``keyboard`` library. Returns ``True`` when the hotkey
    was registered and ``None`` when the library is missing or registration
    failed; no hook handle is returned.
    """
    def _toggle_from_hook():
        # Hand the toggle over to Qt through a zero-delay single-shot timer.
        from PyQt5.QtCore import QTimer
        QTimer.singleShot(0, window.toggle_visibility)

    try:
        import keyboard

        keyboard.add_hotkey("ctrl+shift+l", _toggle_from_hook)
        logger.info("Raccourci global Ctrl+Shift+L enregistre")
        return True
    except ImportError:
        logger.info(
            "Librairie 'keyboard' non disponible — "
            "raccourci global Ctrl+Shift+L non enregistre. "
            "Installez-la avec: pip install keyboard"
        )
        return None
    except Exception as exc:
        logger.warning("Impossible d'enregistrer le raccourci global : %s", exc)
        return None
def _load_environment() -> None:
"""Charger les variables d'environnement depuis .env.local."""
env_paths = [
os.path.join(os.path.dirname(__file__), "..", "..", ".env.local"),
os.path.join(os.path.dirname(__file__), "..", ".env.local"),
]
for env_path in env_paths:
env_path = os.path.abspath(env_path)
if os.path.exists(env_path):
try:
from dotenv import load_dotenv
load_dotenv(env_path)
logger.info("Variables d'environnement chargees depuis %s", env_path)
return
except ImportError:
# Fallback : chargement manuel
with open(env_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, value = line.split("=", 1)
value = value.strip("\"'")
os.environ[key.strip()] = value
logger.info("Variables chargees manuellement depuis %s", env_path)
return
def launch_lea(
    server_host: Optional[str] = None,
    chat_port: int = 5004,
    stream_port: int = 5005,
    verbose: bool = False,
    session_id: Optional[str] = None,
) -> None:
    """Start the Lea panel and run the Qt event loop until it exits.

    Args:
        server_host: address of the Linux server (None = auto-detection)
        chat_port: port of the chat server
        stream_port: port of the streaming server
        verbose: debug mode
        session_id: session identifier used for replay polling

    The function ends with ``sys.exit``: exit code 1 when PyQt5 is missing,
    otherwise the exit code of the Qt application.
    """
    _setup_logging(verbose)
    _load_environment()

    # PyQt5 is imported here so that its absence yields a clear error message.
    try:
        from PyQt5.QtWidgets import QApplication
        from PyQt5.QtCore import Qt
    except ImportError:
        logger.error(
            "PyQt5 n'est pas installe. Installez-le avec :\n"
            "  pip install PyQt5"
        )
        sys.exit(1)

    from .server_client import LeaServerClient
    from .main_window import LeaMainWindow

    # Reuse the running Qt application if there is one.
    app = QApplication.instance()
    if app is None:
        app = QApplication(sys.argv)
    app.setQuitOnLastWindowClosed(False)

    client = LeaServerClient(
        server_host=server_host,
        chat_port=chat_port,
        stream_port=stream_port,
    )

    window = LeaMainWindow(server_client=client)
    window.show()

    hotkey_registered = _setup_global_hotkey(window)

    # Replay polling only starts when a session id was supplied.
    if session_id:
        client.start_polling(session_id)

    logger.info(
        "Panneau Lea demarre — serveur=%s, chat_port=%d, stream_port=%d",
        client.server_host, chat_port, stream_port,
    )

    try:
        exit_code = app.exec_()
    finally:
        window.shutdown()
        if hotkey_registered:
            # Best-effort removal of the keyboard hooks at shutdown.
            try:
                import keyboard
                keyboard.unhook_all()
            except Exception:
                pass

    sys.exit(exit_code)
def main() -> None:
    """CLI entry point: parse the command line and start the Lea panel."""
    cli = argparse.ArgumentParser(
        description="Panneau Lea — Interface utilisateur RPA Vision V3",
    )
    cli.add_argument(
        "--server", "-s",
        dest="server_host",
        default=None,
        help="Adresse du serveur Linux (defaut: RPA_SERVER_HOST ou localhost)",
    )
    cli.add_argument(
        "--chat-port",
        type=int,
        default=5004,
        help="Port du serveur chat (defaut: 5004)",
    )
    cli.add_argument(
        "--stream-port",
        type=int,
        default=5005,
        help="Port du serveur streaming (defaut: 5005)",
    )
    cli.add_argument(
        "--session-id",
        default=None,
        help="Identifiant de session pour le polling replay",
    )
    cli.add_argument(
        "--verbose", "-v",
        action="store_true",
        help="Mode debug (logs verbeux)",
    )

    # The option destinations (server_host, chat_port, stream_port,
    # session_id, verbose) are exactly the keyword arguments of launch_lea.
    options = cli.parse_args()
    launch_lea(**vars(options))


if __name__ == "__main__":
    main()

View File

@@ -1,772 +0,0 @@
# agent_v0/lea_ui/main_window.py
"""
Fenetre principale du panneau Lea.
Panneau semi-transparent, ancre a droite de l'ecran, toujours visible.
Peut etre reduit en mini-barre flottante (avatar + indicateur status).
Sections :
- Header : avatar "L" + status connexion
- Zone de chat : messages entrants/sortants (natif PyQt5)
- Zone de status : progression du replay
- Boutons rapides : "Apprends-moi", "Que sais-tu faire ?"
"""
from __future__ import annotations
import logging
from typing import Dict, Any, Optional
from PyQt5.QtCore import (
QPoint,
QPropertyAnimation,
QRect,
QSize,
Qt,
QTimer,
pyqtSignal,
pyqtSlot,
)
from PyQt5.QtGui import (
QColor,
QFont,
QIcon,
QKeySequence,
QPainter,
QPainterPath,
QPen,
)
from PyQt5.QtWidgets import (
QAction,
QApplication,
QDesktopWidget,
QFrame,
QGraphicsDropShadowEffect,
QHBoxLayout,
QLabel,
QProgressBar,
QPushButton,
QShortcut,
QSizePolicy,
QVBoxLayout,
QWidget,
)
from . import styles
from .chat_widget import ChatWidget
from .overlay import OverlayWidget
from .server_client import LeaServerClient
logger = logging.getLogger("lea_ui.main_window")
class LeaAvatar(QWidget):
    """Round avatar showing the initial 'L' and a connection indicator dot."""

    def __init__(self, size: int = 40, parent: Optional[QWidget] = None) -> None:
        super().__init__(parent)
        self._size = size
        self._connected = False
        self.setFixedSize(size, size)

    def set_connected(self, connected: bool) -> None:
        """Store the connection state and schedule a repaint."""
        self._connected = connected
        self.update()

    def paintEvent(self, event) -> None:  # noqa: N802
        """Draw the accent disc, the 'L' initial and the status dot."""
        side = self._size

        painter = QPainter(self)
        painter.setRenderHint(QPainter.Antialiasing, True)

        # Background disc, inset by 2 px on each side
        painter.setBrush(QColor(styles.COLOR_ACCENT))
        painter.setPen(Qt.NoPen)
        painter.drawEllipse(2, 2, side - 4, side - 4)

        # Centred initial "L"
        painter.setPen(QColor(styles.COLOR_TEXT_ON_ACCENT))
        painter.setFont(QFont(styles.FONT_FAMILY, side // 3, QFont.Bold))
        painter.drawText(QRect(0, 0, side, side), Qt.AlignCenter, "L")

        # Connection indicator: small disc in the bottom-right corner,
        # success colour when connected, error colour otherwise.
        dot = 12
        dot_x = side - dot - 1
        dot_y = side - dot - 1
        if self._connected:
            dot_color = QColor(styles.COLOR_SUCCESS)
        else:
            dot_color = QColor(styles.COLOR_ERROR)
        painter.setBrush(dot_color)
        painter.setPen(QPen(QColor(styles.COLOR_BG), 2))
        painter.drawEllipse(dot_x, dot_y, dot, dot)

        painter.end()
class LeaMainWindow(QWidget):
"""Panneau principal de l'interface Lea.
Fenetre semi-transparente, ancree a droite de l'ecran.
Peut basculer en mode mini-barre.
"""
# Signal pour les actions de replay a afficher sur l'overlay
replay_action_received = pyqtSignal(dict)
def __init__(
    self,
    server_client: Optional[LeaServerClient] = None,
    parent: Optional[QWidget] = None,
) -> None:
    """Create the panel, wire it to the server client and schedule the welcome message.

    Args:
        server_client: client to use; a default LeaServerClient is created
            when none (or a falsy value) is given.
        parent: optional parent widget.
    """
    super().__init__(parent)

    # Server client (falls back to a default instance)
    self._client = server_client if server_client else LeaServerClient()

    # Visual feedback overlay
    self._overlay = OverlayWidget()

    # The panel starts expanded
    self._minimized = False

    # Construction steps, in dependency order
    self._setup_window()
    self._setup_ui()
    self._setup_shortcuts()
    self._connect_signals()
    self._start_connection_check()

    # Welcome message, shown half a second later
    QTimer.singleShot(500, self._show_welcome)
# ---------------------------------------------------------------------------
# Setup
# ---------------------------------------------------------------------------
def _setup_window(self) -> None:
    """Configure window flags, translucency, size, position and drop shadow."""
    window_flags = (
        Qt.WindowStaysOnTopHint
        | Qt.FramelessWindowHint
        | Qt.Tool
    )
    self.setWindowFlags(window_flags)
    self.setAttribute(Qt.WA_TranslucentBackground, True)
    self.setObjectName("LeaMainWindow")

    # Fixed width, minimum height, anchored to the right edge
    self.setFixedWidth(styles.PANEL_WIDTH)
    self.setMinimumHeight(styles.PANEL_MIN_HEIGHT)
    self._anchor_to_right()

    # Drop shadow
    drop_shadow = QGraphicsDropShadowEffect()
    drop_shadow.setBlurRadius(20)
    drop_shadow.setColor(QColor(0, 0, 0, 60))
    drop_shadow.setOffset(0, 4)
    self.setGraphicsEffect(drop_shadow)
def _anchor_to_right(self) -> None:
    """Place the panel along the right edge of the primary screen's available area.

    The panel sits 10 px from the right edge with 40 px margins above and
    below. Nothing happens when no desktop object is available.
    """
    desktop = QApplication.desktop()
    if not desktop:
        return

    available = desktop.availableGeometry(desktop.primaryScreen())
    left = available.right() - styles.PANEL_WIDTH - 10
    top = available.top() + 40
    panel_height = available.height() - 80
    self.setGeometry(left, top, styles.PANEL_WIDTH, panel_height)
def _setup_ui(self) -> None:
    """Build the panel content.

    A styled background widget holds, top to bottom, the header, the chat,
    the replay status bar and the quick buttons. The mini-bar is added
    after it and starts hidden.
    """
    # Outer layout, no margins
    self._main_layout = QVBoxLayout(self)
    self._main_layout.setContentsMargins(0, 0, 0, 0)
    self._main_layout.setSpacing(0)

    # Background widget carrying the rounded, bordered style
    self._bg_widget = QWidget()
    self._bg_widget.setObjectName("LeaPanelBg")
    self._bg_widget.setStyleSheet(f"""
QWidget#LeaPanelBg {{
background-color: {styles.COLOR_BG};
border-radius: {styles.BORDER_RADIUS}px;
border: 1px solid {styles.COLOR_BORDER};
}}
""")

    panel_layout = QVBoxLayout(self._bg_widget)
    panel_layout.setContentsMargins(0, 0, 0, 0)
    panel_layout.setSpacing(0)

    # Header
    self._header = self._create_header()
    panel_layout.addWidget(self._header)

    # Chat (takes the remaining vertical space)
    self._chat = ChatWidget()
    panel_layout.addWidget(self._chat, stretch=1)

    # Replay status area
    self._status_bar = self._create_status_bar()
    panel_layout.addWidget(self._status_bar)

    # Quick action buttons
    self._quick_buttons = self._create_quick_buttons()
    panel_layout.addWidget(self._quick_buttons)

    self._main_layout.addWidget(self._bg_widget)

    # Mini-bar, hidden by default
    self._mini_bar = self._create_mini_bar()
    self._mini_bar.hide()
    self._main_layout.addWidget(self._mini_bar)
def _create_header(self) -> QWidget:
    """Create the header: avatar, title with status line, and a minimize button."""
    header_widget = QWidget()
    header_widget.setObjectName("LeaHeader")
    header_widget.setStyleSheet(styles.HEADER_STYLE)
    header_widget.setFixedHeight(60)

    row = QHBoxLayout(header_widget)
    row.setContentsMargins(
        styles.PADDING, styles.SPACING,
        styles.PADDING, styles.SPACING,
    )

    # Avatar
    self._avatar = LeaAvatar(styles.AVATAR_SIZE)
    row.addWidget(self._avatar)

    # Title above the connection status
    text_column = QVBoxLayout()
    text_column.setSpacing(2)

    name_label = QLabel("Lea")
    name_label.setObjectName("LeaTitle")
    name_label.setStyleSheet(styles.HEADER_STYLE)
    text_column.addWidget(name_label)

    self._status_label = QLabel("Connexion...")
    self._status_label.setObjectName("LeaStatus")
    self._status_label.setStyleSheet(styles.HEADER_STYLE)
    text_column.addWidget(self._status_label)

    row.addLayout(text_column, stretch=1)

    # Minimize button
    collapse_btn = QPushButton("_")
    collapse_btn.setFixedSize(30, 30)
    collapse_btn.setCursor(Qt.PointingHandCursor)
    collapse_btn.setStyleSheet(f"""
QPushButton {{
background: transparent;
color: {styles.COLOR_TEXT_SECONDARY};
border: none;
border-radius: 15px;
font-size: 16px;
font-weight: bold;
}}
QPushButton:hover {{
background-color: {styles.COLOR_BORDER};
}}
""")
    collapse_btn.clicked.connect(self.toggle_minimize)
    row.addWidget(collapse_btn)

    return header_widget
def _create_status_bar(self) -> QWidget:
    """Create the replay status area (label + progress bar), initially hidden.

    The container is also stored as ``self._status_container``.
    """
    status_area = QWidget()
    status_area.setFixedHeight(50)

    column = QVBoxLayout(status_area)
    column.setContentsMargins(
        styles.PADDING, styles.SPACING,
        styles.PADDING, styles.SPACING,
    )
    column.setSpacing(4)

    # Replay description label
    self._replay_label = QLabel("")
    self._replay_label.setObjectName("StatusLabel")
    self._replay_label.setStyleSheet(styles.STATUS_LABEL_STYLE)
    self._replay_label.hide()
    column.addWidget(self._replay_label)

    # Progress bar without percentage text
    self._progress_bar = QProgressBar()
    self._progress_bar.setStyleSheet(styles.PROGRESS_STYLE)
    self._progress_bar.setTextVisible(False)
    self._progress_bar.hide()
    column.addWidget(self._progress_bar)

    status_area.hide()
    self._status_container = status_area
    return status_area
def _create_quick_buttons(self) -> QWidget:
    """Create the row of quick-action buttons ("Apprends-moi", "Que sais-tu faire ?")."""
    button_row = QWidget()
    row_layout = QHBoxLayout(button_row)
    row_layout.setContentsMargins(
        styles.PADDING, styles.SPACING,
        styles.PADDING, styles.PADDING,
    )
    row_layout.setSpacing(styles.SPACING)

    # Both buttons share the same look; only caption and handler differ.
    quick_actions = (
        ("Apprends-moi", self._on_learn_clicked),
        ("Que sais-tu faire ?", self._on_list_clicked),
    )
    for caption, handler in quick_actions:
        button = QPushButton(caption)
        button.setObjectName("QuickButton")
        button.setStyleSheet(styles.QUICK_BUTTON_STYLE)
        button.setCursor(Qt.PointingHandCursor)
        button.clicked.connect(handler)
        row_layout.addWidget(button)

    return button_row
def _create_mini_bar(self) -> QWidget:
    """Create the floating mini-bar (reduced mode): small avatar plus expand button."""
    mini_bar = QWidget()
    mini_bar.setObjectName("MiniBar")
    mini_bar.setStyleSheet(styles.MINI_BAR_STYLE)
    mini_bar.setFixedSize(80, 50)

    bar_layout = QHBoxLayout(mini_bar)
    bar_layout.setContentsMargins(8, 4, 8, 4)

    # Small avatar, kept on the instance
    self._mini_avatar = LeaAvatar(32)
    bar_layout.addWidget(self._mini_avatar)

    # Expand button, wired to the same toggle as the minimize button
    restore_btn = QPushButton(">")
    restore_btn.setFixedSize(24, 24)
    restore_btn.setCursor(Qt.PointingHandCursor)
    restore_btn.setStyleSheet(f"""
QPushButton {{
background: transparent;
color: {styles.COLOR_TEXT_SECONDARY};
border: none;
font-size: 14px;
font-weight: bold;
}}
QPushButton:hover {{
color: {styles.COLOR_ACCENT};
}}
""")
    restore_btn.clicked.connect(self.toggle_minimize)
    bar_layout.addWidget(restore_btn)

    return mini_bar
def _setup_shortcuts(self) -> None:
    """Register the Ctrl+Shift+L shortcut that shows/hides the panel.

    This is a widget-level shortcut: it works while the panel has focus.
    The system-wide hook is expected to be installed by the launcher.
    """
    toggle_shortcut = QShortcut(QKeySequence("Ctrl+Shift+L"), self)
    toggle_shortcut.activated.connect(self.toggle_visibility)
def _connect_signals(self) -> None:
    """Wire the chat, server client, overlay and replay signal to their handlers."""
    # Chat -> message handler
    self._chat.message_sent.connect(self._on_message_sent)

    # Server client callbacks
    client = self._client
    client.set_on_connection_change(self._on_connection_changed)
    client.set_on_replay_action(self._on_replay_action)

    # Overlay finished displaying an action
    self._overlay.action_display_finished.connect(self._on_overlay_finished)

    # Replay actions are delivered through a Qt signal
    self.replay_action_received.connect(self._handle_replay_action)
def _start_connection_check(self) -> None:
    """Start the periodic connection check (every 10 s, first run after 1 s)."""
    periodic = QTimer(self)
    periodic.timeout.connect(self._check_connection)
    periodic.start(10000)  # every 10 seconds
    self._conn_timer = periodic

    # First check one second after start-up
    QTimer.singleShot(1000, self._check_connection)
# ---------------------------------------------------------------------------
# Actions
# ---------------------------------------------------------------------------
def _show_welcome(self) -> None:
"""Afficher le message d'accueil."""
self._chat.add_lea_message(
"Bonjour ! Je suis <b>Lea</b>, votre assistante RPA.<br>"
"Je peux apprendre vos taches, les rejouer, "
"et vous montrer ce que je fais.<br><br>"
"Que souhaitez-vous faire ?"
)
@pyqtSlot(str)
def _on_message_sent(self, message: str) -> None:
    """Handle a message submitted by the user.

    Input is disabled, then the server request is scheduled 100 ms later
    through a single-shot timer.
    """
    self._chat.set_input_enabled(False)

    def dispatch():
        self._send_to_server(message)

    QTimer.singleShot(100, dispatch)
def _send_to_server(self, message: str) -> None:
"""Envoyer le message au serveur et afficher la reponse."""
response = self._client.send_chat_message(message)
if response is None:
self._chat.add_lea_message(
"Je n'arrive pas a joindre le serveur. "
"Verifiez que le serveur Linux est demarre."
)
elif "error" in response:
self._chat.add_lea_message(
f"Erreur : {response['error']}"
)
else:
# Extraire la reponse textuelle
reply_text = response.get("response", "")
if not reply_text:
# Construire une reponse a partir des donnees structurees
reply_text = self._format_response(response)
self._chat.add_lea_message(reply_text)
# Si un workflow a ete lance, mettre a jour la status bar
if response.get("success") and response.get("workflow"):
self._show_replay_status(
f"Execution : {response['workflow']}",
0, 1,
)
self._chat.set_input_enabled(True)
def _format_response(self, data: Dict[str, Any]) -> str:
"""Formater une reponse structuree du serveur en texte lisible."""
# Reponse de confirmation
if data.get("needs_confirmation"):
conf = data.get("confirmation", {})
return (
f"Voulez-vous que j'execute <b>{conf.get('workflow_name', '?')}</b> ?<br>"
f"Risque : {conf.get('risk_level', 'normal')}<br>"
"Repondez <b>oui</b> ou <b>non</b>."
)
# Liste de workflows
if "workflows" in data:
workflows = data["workflows"]
if not workflows:
return "Je ne connais aucun workflow pour le moment."
items = []
for wf in workflows[:10]:
name = wf.get("name", wf.get("id", "?"))
desc = wf.get("description", "")
items.append(f"- <b>{name}</b>{': ' + desc if desc else ''}")
result = "Voici ce que je sais faire :<br>" + "<br>".join(items)
if len(workflows) > 10:
result += f"<br><i>... et {len(workflows) - 10} autres</i>"
return result
# Workflow non trouve
if data.get("not_found"):
return (
f"Je ne trouve pas de workflow correspondant a "
f"'{data.get('query', '?')}'.<br>"
"Essayez 'Que sais-tu faire ?' pour voir la liste."
)
# Execution reussie
if data.get("success"):
return (
f"C'est parti ! J'execute <b>{data.get('workflow', '?')}</b>.<br>"
"Regardez l'ecran, je vais vous montrer ce que je fais."
)
# Confirmation/refus
if data.get("confirmed"):
return f"D'accord, je lance <b>{data.get('workflow', '?')}</b> !"
if data.get("denied"):
return "Pas de probleme, j'annule."
# Fallback
return str(data)
def _on_learn_clicked(self) -> None:
"""Action du bouton 'Apprends-moi'."""
self._chat.add_user_message("Apprends-moi une nouvelle tache")
self._chat.add_lea_message(
"D'accord ! Pour m'apprendre une tache :<br>"
"1. Cliquez sur <b>Demarrer</b> dans le tray Agent V1<br>"
"2. Effectuez votre tache normalement<br>"
"3. Cliquez sur <b>Terminer</b> quand c'est fini<br><br>"
"Je vais observer et apprendre automatiquement."
)
def _on_list_clicked(self) -> None:
    """Handle the 'Que sais-tu faire ?' quick button.

    Echoes the question in the chat, disables the input field, and
    schedules ``_fetch_workflows`` 100 ms later via a single-shot timer.
    """
    chat = self._chat
    chat.add_user_message("Que sais-tu faire ?")
    chat.set_input_enabled(False)
    QTimer.singleShot(100, self._fetch_workflows)
def _fetch_workflows(self) -> None:
"""Recuperer et afficher la liste des workflows."""
workflows = self._client.list_workflows()
if workflows:
items = []
for wf in workflows[:15]:
name = wf.get("name", wf.get("id", "?"))
desc = wf.get("description", "")
items.append(f"- <b>{name}</b>{': ' + desc if desc else ''}")
text = "Voici les workflows que je connais :<br>" + "<br>".join(items)
if len(workflows) > 15:
text += f"<br><i>... et {len(workflows) - 15} autres</i>"
else:
text = (
"Je ne connais aucun workflow pour le moment.<br>"
"Apprenez-moi une tache avec le bouton 'Apprends-moi' !"
)
self._chat.add_lea_message(text)
self._chat.set_input_enabled(True)
# ---------------------------------------------------------------------------
# Connexion
# ---------------------------------------------------------------------------
def _check_connection(self) -> None:
"""Verifier la connexion au serveur (dans un timer)."""
connected = self._client.check_connection()
self._update_connection_ui(connected)
def _on_connection_changed(self, connected: bool) -> None:
    """React to a connection-state change.

    The UI update is deferred through a zero-delay single-shot timer
    rather than applied directly.
    NOTE(review): the original comment says this is meant to hop to the
    main thread -- confirm that holds for the thread this callback
    actually runs on.
    """
    def _apply() -> None:
        self._update_connection_ui(connected)

    QTimer.singleShot(0, _apply)
def _update_connection_ui(self, connected: bool) -> None:
    """Reflect the connection state on the avatars and the status label.

    Args:
        connected: True when the server is reachable.
    """
    self._avatar.set_connected(connected)
    # The mini avatar may not have been created yet.
    if hasattr(self, '_mini_avatar'):
        self._mini_avatar.set_connected(connected)

    if connected:
        label_text = f"Connecte a {self._client.server_host}"
        label_color = styles.COLOR_SUCCESS
    else:
        reason = self._client.last_error or "Serveur injoignable"
        # Only the first 30 characters of the error are shown.
        label_text = f"Deconnecte ({reason[:30]})"
        label_color = styles.COLOR_ERROR

    self._status_label.setText(label_text)
    self._status_label.setStyleSheet(
        f"color: {label_color}; "
        f"font-family: '{styles.FONT_FAMILY}'; "
        f"font-size: {styles.FONT_SIZE_SMALL}px; "
        f"background: transparent; border: none;"
    )
# ---------------------------------------------------------------------------
# Replay & Overlay
# ---------------------------------------------------------------------------
def _on_replay_action(self, action: Dict[str, Any]) -> None:
"""Callback appelee depuis le thread de polling (pas thread-safe).
Emettre un signal pour traiter dans le thread Qt.
"""
self.replay_action_received.emit(action)
@pyqtSlot(dict)
def _handle_replay_action(self, action: Dict[str, Any]) -> None:
    """Show feedback for a replay action (slot for the replay signal).

    Updates the status bar, shows the overlay at the action's target and
    appends a system message to the chat.

    Args:
        action: Replay action; ``x_pct`` / ``y_pct`` are read as
            fractions of the screen size and default to 0.5.
    """
    description = self._describe_action(action)

    # Screen size of the primary screen, with a 1920x1080 fallback.
    desktop = QApplication.desktop()
    geometry = desktop.screenGeometry(desktop.primaryScreen()) if desktop else None
    if geometry:
        screen_w, screen_h = geometry.width(), geometry.height()
    else:
        screen_w, screen_h = 1920, 1080

    target_x = int(action.get("x_pct", 0.5) * screen_w)
    target_y = int(action.get("y_pct", 0.5) * screen_h)

    # Progress comes from the client's replay status, when available.
    step_current, step_total = 0, 0
    status = self._client.get_replay_status()
    if status:
        step_total = status.get("total_actions", 0)
        step_current = status.get("completed_actions", 0) + 1

    self._show_replay_status(description, step_current, step_total)

    self._overlay.show_action(
        target_x, target_y,
        description,
        step_current, step_total,
        duration_ms=1500,
    )

    self._chat.add_system_message(
        f"Etape {step_current}/{step_total} : {description}"
    )
def _describe_action(self, action: Dict[str, Any]) -> str:
"""Generer une description lisible d'une action de replay."""
action_type = action.get("type", "?")
target_text = action.get("target_text", "")
target_role = action.get("target_role", "")
if action_type == "click":
target = target_text or target_role or "cet element"
return f"Je clique sur [{target}]"
elif action_type == "type":
text = action.get("text", "")
preview = text[:30] + "..." if len(text) > 30 else text
return f"Je tape : {preview}"
elif action_type == "key_combo":
keys = action.get("keys", [])
return f"Je tape : {'+'.join(keys)}"
elif action_type == "scroll":
return "Je fais defiler la page"
elif action_type == "wait":
ms = action.get("duration_ms", 500)
return f"J'attends {ms}ms"
else:
return f"Action : {action_type}"
def _on_overlay_finished(self) -> None:
"""Callback quand l'overlay a fini d'afficher une action."""
pass # L'executor continue de son cote
def _show_replay_status(
self, text: str, current: int, total: int,
) -> None:
"""Afficher la barre de progression du replay."""
self._status_container.show()
self._replay_label.show()
self._replay_label.setText(text)
if total > 0:
self._progress_bar.show()
self._progress_bar.setMaximum(total)
self._progress_bar.setValue(current)
else:
self._progress_bar.hide()
def hide_replay_status(self) -> None:
    """Hide the replay status container."""
    container = self._status_container
    container.hide()
# ---------------------------------------------------------------------------
# Visibilite
# ---------------------------------------------------------------------------
def toggle_visibility(self) -> None:
    """Hide the panel if it is visible; otherwise show, raise and focus it.

    The original docstring associates this with the Ctrl+Shift+L shortcut.
    """
    if self.isVisible():
        self.hide()
        return
    self.show()
    self.raise_()
    self.activateWindow()
def toggle_minimize(self) -> None:
    """Switch between the full panel and the mini bar.

    Restoring re-anchors the window via ``_anchor_to_right``.  Minimising
    resizes the window to 80x50 and places it near the top-right corner
    of the primary screen's available geometry.
    """
    if self._minimized:
        # Restore the full panel.
        self._mini_bar.hide()
        self._bg_widget.show()
        self._minimized = False
        self._anchor_to_right()
        return

    # Collapse to the mini bar.
    self._bg_widget.hide()
    self._mini_bar.show()
    self._minimized = True

    desktop = QApplication.desktop()
    if not desktop:
        return
    available = desktop.availableGeometry(desktop.primaryScreen())
    left = available.right() - 90
    top = available.top() + 10
    self.setGeometry(left, top, 80, 50)
# ---------------------------------------------------------------------------
# Drag (deplacer la fenetre sans barre de titre)
# ---------------------------------------------------------------------------
def mousePressEvent(self, event) -> None:  # noqa: N802
    """Record the drag offset when the left mouse button is pressed.

    The offset is the press position (global) relative to the window's
    frame top-left corner; it is consumed by ``mouseMoveEvent``.
    """
    if event.button() != Qt.LeftButton:
        return
    self._drag_pos = event.globalPos() - self.frameGeometry().topLeft()
    event.accept()
def mouseMoveEvent(self, event) -> None:  # noqa: N802
    """Move the window while the left button alone is held.

    Nothing happens until a press has stored ``_drag_pos``.
    """
    dragging = event.buttons() == Qt.LeftButton and hasattr(self, '_drag_pos')
    if not dragging:
        return
    self.move(event.globalPos() - self._drag_pos)
    event.accept()
# ---------------------------------------------------------------------------
# Painting (fond arrondi semi-transparent)
# ---------------------------------------------------------------------------
def paintEvent(self, event) -> None:  # noqa: N802
    """Paint the rounded, slightly translucent panel background and border."""
    radius = styles.BORDER_RADIUS
    outline = QPainterPath()
    outline.addRoundedRect(
        0, 0, self.width(), self.height(),
        radius, radius,
    )

    # Background colour with alpha 245 out of 255.
    fill = QColor(styles.COLOR_BG)
    fill.setAlpha(245)
    border_pen = QPen(QColor(styles.COLOR_BORDER), 1)

    painter = QPainter(self)
    painter.setRenderHint(QPainter.Antialiasing, True)
    painter.fillPath(outline, fill)
    painter.setPen(border_pen)
    painter.drawPath(outline)
    painter.end()
# ---------------------------------------------------------------------------
# Lifecycle
# ---------------------------------------------------------------------------
def closeEvent(self, event) -> None:  # noqa: N802
    """Refuse the close request and hide the window instead."""
    # Ignoring the event keeps the window alive; it is only hidden.
    event.ignore()
    self.hide()
def shutdown(self) -> None:
    """Stop the window's background activity.

    In order: stops the connection timer, hides the overlay, shuts the
    client down, then logs that the window has stopped.
    """
    self._conn_timer.stop()
    self._overlay.hide_overlay()
    self._client.shutdown()
    logger.info("LeaMainWindow arretee")

View File

@@ -1,354 +0,0 @@
# agent_v0/lea_ui/overlay.py
"""
Overlay de feedback visuel pour le replay.
Fenetre transparente plein ecran, click-through, qui affiche :
- Cercle rouge pulsant autour de la cible du clic
- Texte descriptif de l'action en cours
- Fleche pointant vers la cible
- Barre de progression etape X/Y
Le overlay ne capture JAMAIS les clics (Qt.WA_TransparentForMouseEvents).
"""
from __future__ import annotations
import logging
import math
from typing import Optional, Tuple
from PyQt5.QtCore import (
QPoint,
QPropertyAnimation,
QRect,
QRectF,
QSize,
Qt,
QTimer,
pyqtProperty,
pyqtSignal,
)
from PyQt5.QtGui import (
QBrush,
QColor,
QFont,
QFontMetrics,
QPainter,
QPainterPath,
QPen,
QPolygonF,
)
from PyQt5.QtWidgets import QApplication, QDesktopWidget, QWidget
from . import styles
logger = logging.getLogger("lea_ui.overlay")
class OverlayWidget(QWidget):
    """Full-screen transparent overlay giving visual feedback during replay.

    Draws a pulsing marker on the click target, a caption describing the
    current action, a dashed connector between caption and target, and a
    step progress bar at the bottom of the screen.

    Window setup done in ``__init__``:
    - ``Qt.WindowStaysOnTopHint``, ``Qt.FramelessWindowHint``, ``Qt.Tool``
    - ``Qt.WA_TranslucentBackground``
    - ``Qt.WA_TransparentForMouseEvents`` so the overlay does not
      intercept mouse input
    """

    # Emitted when the auto-hide timer fires after an action was shown.
    action_display_finished = pyqtSignal()

    def __init__(self, parent: Optional[QWidget] = None) -> None:
        """Create the overlay, its timers, and size it to the primary screen."""
        super().__init__(parent)

        self.setWindowFlags(
            Qt.WindowStaysOnTopHint
            | Qt.FramelessWindowHint
            | Qt.Tool
        )
        self.setAttribute(Qt.WA_TranslucentBackground, True)
        self.setAttribute(Qt.WA_TransparentForMouseEvents, True)

        # Display state
        self._target_pos: Optional[Tuple[int, int]] = None
        self._action_text: str = ""
        self._progress_current: int = 0
        self._progress_total: int = 0
        self._action_done: bool = False
        self._visible = False

        # Pulsing-circle animation state
        self._pulse_radius: float = 30.0
        self._pulse_growing = True
        self._pulse_opacity: float = 0.8

        # Animation tick (30 ms interval)
        self._anim_timer = QTimer(self)
        self._anim_timer.timeout.connect(self._animate_pulse)
        self._anim_timer.setInterval(30)

        # Single-shot auto-hide timer
        self._fade_timer = QTimer(self)
        self._fade_timer.setSingleShot(True)
        self._fade_timer.timeout.connect(self._on_fade)

        self._update_geometry()

    def _update_geometry(self) -> None:
        """Resize the overlay to the primary screen's geometry."""
        desktop = QApplication.desktop()
        if desktop:
            screen_rect = desktop.screenGeometry(desktop.primaryScreen())
            self.setGeometry(screen_rect)

    # -----------------------------------------------------------------------
    # Public API
    # -----------------------------------------------------------------------

    def show_action(
        self,
        target_x: int,
        target_y: int,
        text: str,
        step_current: int = 0,
        step_total: int = 0,
        duration_ms: int = 1500,
    ) -> None:
        """Display the feedback for one replay action.

        Args:
            target_x: X position of the target, in screen pixels.
            target_y: Y position of the target, in screen pixels.
            text: Caption describing the action.
            step_current: Current step number.
            step_total: Total number of steps; 0 hides the progress bar.
            duration_ms: Delay before the overlay hides itself.
        """
        self._target_pos = (target_x, target_y)
        self._action_text = text
        self._progress_current = step_current
        self._progress_total = step_total
        self._action_done = False
        self._pulse_radius = 30.0
        self._pulse_opacity = 0.8
        self._visible = True

        self._update_geometry()
        self.show()
        self.raise_()
        self._anim_timer.start()

        # Schedule the automatic hide.
        self._fade_timer.start(duration_ms)
        self.update()

    def show_done(self, text: Optional[str] = None) -> None:
        """Switch the marker to the 'done' indicator, then hide after 800 ms.

        Args:
            text: Optional replacement caption.
        """
        self._action_done = True
        if text:
            self._action_text = text
        self.update()
        self._fade_timer.start(800)

    def hide_overlay(self) -> None:
        """Hide the overlay immediately and stop both timers.

        Unlike the timer-driven hide, this does not emit
        ``action_display_finished``.
        """
        self._anim_timer.stop()
        self._fade_timer.stop()
        self._visible = False
        self._target_pos = None
        self.hide()

    # -----------------------------------------------------------------------
    # Animation
    # -----------------------------------------------------------------------

    def _animate_pulse(self) -> None:
        """Advance the pulse animation by one tick and request a repaint."""
        if self._action_done:
            # The 'done' indicator is static.
            return

        pulse_speed = 0.8
        if self._pulse_growing:
            self._pulse_radius += pulse_speed
            if self._pulse_radius >= 45.0:
                self._pulse_growing = False
        else:
            self._pulse_radius -= pulse_speed
            if self._pulse_radius <= 25.0:
                self._pulse_growing = True

        # Opacity follows the radius: 0.5 at radius 25, 0.8 at radius 45.
        self._pulse_opacity = 0.5 + 0.3 * (
            (self._pulse_radius - 25.0) / 20.0
        )
        self.update()

    def _on_fade(self) -> None:
        """Hide the overlay when the fade timer fires and notify listeners."""
        self._anim_timer.stop()
        self._visible = False
        self._target_pos = None
        self.hide()
        self.action_display_finished.emit()

    # -----------------------------------------------------------------------
    # Rendering
    # -----------------------------------------------------------------------

    @staticmethod
    def _caption_above(ty: int) -> bool:
        """Return True when the caption is placed above the target.

        Single source of truth for the caption and its connector, so both
        always end up on the same side of the target.
        """
        return ty > 140

    def paintEvent(self, event) -> None:  # noqa: N802
        """Draw the marker, connector, caption and progress bar."""
        if not self._visible or not self._target_pos:
            return

        painter = QPainter(self)
        painter.setRenderHint(QPainter.Antialiasing, True)
        tx, ty = self._target_pos

        if self._action_done:
            self._draw_done_indicator(painter, tx, ty)
        else:
            self._draw_pulse_circle(painter, tx, ty)
            self._draw_arrow(painter, tx, ty)

        self._draw_action_text(painter, tx, ty)
        self._draw_progress_bar(painter)
        painter.end()

    def _draw_pulse_circle(self, painter: QPainter, cx: int, cy: int) -> None:
        """Draw the pulsing marker centred on (cx, cy)."""
        # Outer disc: animated radius, semi-transparent.
        color = QColor(styles.COLOR_OVERLAY_PULSE)
        color.setAlphaF(self._pulse_opacity * 0.4)
        painter.setBrush(QBrush(color))
        painter.setPen(Qt.NoPen)
        painter.drawEllipse(
            QPoint(cx, cy),
            int(self._pulse_radius),
            int(self._pulse_radius),
        )

        # Inner ring: fixed radius, more opaque.
        color_inner = QColor(styles.COLOR_OVERLAY_PULSE)
        color_inner.setAlphaF(0.7)
        painter.setPen(QPen(color_inner, 3))
        painter.setBrush(Qt.NoBrush)
        painter.drawEllipse(QPoint(cx, cy), 20, 20)

        # Centre dot.
        painter.setPen(Qt.NoPen)
        painter.setBrush(QBrush(QColor(styles.COLOR_OVERLAY_PULSE)))
        painter.drawEllipse(QPoint(cx, cy), 4, 4)

    def _draw_done_indicator(self, painter: QPainter, cx: int, cy: int) -> None:
        """Draw the success indicator (filled disc with a check mark)."""
        color = QColor(styles.COLOR_SUCCESS)
        color.setAlphaF(0.8)
        painter.setBrush(QBrush(color))
        painter.setPen(Qt.NoPen)
        painter.drawEllipse(QPoint(cx, cy), 25, 25)

        # Check mark.
        pen = QPen(QColor(styles.COLOR_TEXT_ON_ACCENT), 3)
        pen.setCapStyle(Qt.RoundCap)
        pen.setJoinStyle(Qt.RoundJoin)
        painter.setPen(pen)
        painter.setBrush(Qt.NoBrush)

        path = QPainterPath()
        path.moveTo(cx - 10, cy)
        path.lineTo(cx - 3, cy + 8)
        path.lineTo(cx + 12, cy - 8)
        painter.drawPath(path)

    def _draw_arrow(self, painter: QPainter, tx: int, ty: int) -> None:
        """Draw the dashed connector from the caption side to the target."""
        # Same above/below decision as the caption.  The thresholds used
        # to differ, so for 120 < ty <= 140 the connector pointed upward
        # while the caption was drawn below the target.
        text_y = ty - 80 if self._caption_above(ty) else ty + 80
        text_x = max(100, min(tx, self.width() - 200))

        color = QColor(styles.COLOR_OVERLAY_PULSE)
        color.setAlphaF(0.6)
        painter.setPen(QPen(color, 2, Qt.DashLine))
        painter.drawLine(text_x, text_y + (15 if text_y < ty else -15), tx, ty)

    def _draw_action_text(self, painter: QPainter, tx: int, ty: int) -> None:
        """Draw the caption in a rounded box near the target."""
        if not self._action_text:
            return

        # Above the target when there is room, otherwise below.
        text_y = ty - 90 if self._caption_above(ty) else ty + 70

        font = QFont(styles.FONT_FAMILY, styles.FONT_SIZE_LARGE, QFont.Bold)
        painter.setFont(font)
        metrics = QFontMetrics(font)

        # Box size: text bounds plus padding.
        text_rect = metrics.boundingRect(self._action_text)
        text_width = text_rect.width() + 30
        text_height = text_rect.height() + 16

        # Centre on the target horizontally, clamped to the widget width.
        box_x = max(10, min(tx - text_width // 2, self.width() - text_width - 10))
        box_y = text_y - text_height // 2

        # Dark semi-transparent rounded background.
        bg_color = QColor(31, 41, 55, 200)
        painter.setBrush(QBrush(bg_color))
        painter.setPen(Qt.NoPen)
        painter.drawRoundedRect(box_x, box_y, text_width, text_height, 8, 8)

        painter.setPen(QPen(QColor(styles.COLOR_OVERLAY_TEXT)))
        painter.drawText(
            QRect(box_x, box_y, text_width, text_height),
            Qt.AlignCenter,
            self._action_text,
        )

    def _draw_progress_bar(self, painter: QPainter) -> None:
        """Draw the step progress bar near the bottom of the widget."""
        if self._progress_total <= 0:
            return

        bar_width = 300
        bar_height = 6
        bar_x = (self.width() - bar_width) // 2
        bar_y = self.height() - 50

        # Track.
        bg_color = QColor(255, 255, 255, 80)
        painter.setBrush(QBrush(bg_color))
        painter.setPen(Qt.NoPen)
        painter.drawRoundedRect(bar_x, bar_y, bar_width, bar_height, 3, 3)

        # Fill.  The ratio is clamped to [0, 1] so a step counter that
        # runs past the total (or is negative) cannot draw outside the
        # track.
        ratio = self._progress_current / self._progress_total
        ratio = max(0.0, min(1.0, ratio))
        fill_width = int(bar_width * ratio)
        accent_color = QColor(styles.COLOR_ACCENT)
        accent_color.setAlphaF(0.9)
        painter.setBrush(QBrush(accent_color))
        painter.drawRoundedRect(bar_x, bar_y, fill_width, bar_height, 3, 3)

        # "Etape X/Y" label under the bar.
        label_font = QFont(styles.FONT_FAMILY, styles.FONT_SIZE_SMALL)
        painter.setFont(label_font)
        painter.setPen(QPen(QColor(255, 255, 255, 200)))
        painter.drawText(
            QRect(bar_x, bar_y + bar_height + 4, bar_width, 20),
            Qt.AlignCenter,
            f"Etape {self._progress_current}/{self._progress_total}",
        )

View File

@@ -1,191 +0,0 @@
# agent_v0/lea_ui/replay_integration.py
"""
Integration du feedback visuel (overlay) dans la boucle de replay de l'Agent V1.
Ce module fournit un wrapper autour de ActionExecutorV1.execute_replay_action
qui affiche l'overlay AVANT chaque action et la marque comme terminee APRES.
Sequence pour chaque action :
1. Afficher l'overlay avec la description de l'action (1.5s)
2. Attendre que l'overlay ait ete vu par l'utilisateur
3. Executer l'action
4. Mettre a jour l'overlay (coche verte)
5. Passer a l'action suivante
"""
from __future__ import annotations
import logging
import time
from typing import Any, Callable, Dict, Optional, Tuple
logger = logging.getLogger("lea_ui.replay_integration")
# Delai d'affichage de l'overlay avant execution (secondes)
PRE_ACTION_DELAY = 1.5
# Delai apres la coche verte (secondes)
POST_ACTION_DELAY = 0.5
class ReplayOverlayBridge:
    """Bridge between the replay loop and the visual overlay.

    The overlay is optional: while no callbacks are connected,
    ``wrap_execute`` simply runs the action.  When connected, each action
    is announced on the overlay, executed after a pause, then marked as
    done.

    NOTE(review): the original docstring says the overlay is driven via
    Qt signals from a daemon replay thread; this class only invokes the
    callbacks it was given, so any thread hand-off must be done by those
    callbacks -- confirm at the call site.
    """

    def __init__(self) -> None:
        """Create a disconnected bridge with zeroed step counters."""
        self._overlay = None
        self._show_callback: Optional[Callable] = None
        self._done_callback: Optional[Callable] = None
        self._hide_callback: Optional[Callable] = None
        self._enabled = False

        # Progress counters
        self._step_current = 0
        self._step_total = 0

    def connect_overlay(
        self,
        show_fn: Callable[[int, int, str, int, int, int], None],
        done_fn: Callable[[Optional[str]], None],
        hide_fn: Callable[[], None],
    ) -> None:
        """Register the overlay callbacks and enable overlay feedback.

        Args:
            show_fn: Called as (target_x, target_y, text, step, total,
                duration_ms) before an action runs.
            done_fn: Called with the completion text after an action.
            hide_fn: Called with no argument after the last step.
        """
        self._show_callback = show_fn
        self._done_callback = done_fn
        self._hide_callback = hide_fn
        self._enabled = True
        logger.info("Overlay connecte au bridge de replay")

    def disconnect_overlay(self) -> None:
        """Drop the overlay callbacks and disable overlay feedback."""
        self._show_callback = None
        self._done_callback = None
        self._hide_callback = None
        self._enabled = False

    def set_total_steps(self, total: int) -> None:
        """Set the number of steps of the replay and reset the step counter."""
        self._step_total = total
        self._step_current = 0

    def wrap_execute(
        self,
        action: Dict[str, Any],
        executor_fn: Callable[[Dict[str, Any]], Dict[str, Any]],
        screen_width: int = 1920,
        screen_height: int = 1080,
    ) -> Dict[str, Any]:
        """Execute one action, surrounding it with overlay feedback.

        Args:
            action: Action to run (``type``, ``x_pct``, ``y_pct``, ...).
            executor_fn: Function that performs the action and returns a
                result dict; its ``success`` key selects the done text.
            screen_width: Screen width in pixels, for target coordinates.
            screen_height: Screen height in pixels.

        Returns:
            Whatever ``executor_fn`` returned.  Exceptions raised by
            ``executor_fn`` propagate; overlay callback failures are
            logged and never interrupt the replay.
        """
        self._step_current += 1

        if not self._enabled or not self._show_callback:
            # No overlay connected: run the action directly.
            return executor_fn(action)

        # 1. Announce the action on the overlay.
        action_text = self._describe_action(action)
        target_x, target_y = self._get_target_coords(action, screen_width, screen_height)

        try:
            self._show_callback(
                target_x, target_y,
                action_text,
                self._step_current,
                self._step_total,
                int(PRE_ACTION_DELAY * 1000),
            )
        except Exception as e:
            logger.warning("Erreur affichage overlay : %s", e)

        # 2. Leave time for the user to see the announcement.
        time.sleep(PRE_ACTION_DELAY)

        # 3. Run the action.
        result = executor_fn(action)

        # 4. Report the outcome on the overlay.
        if result.get("success"):
            done_text = f"{action_text} OK"
        else:
            done_text = f"{action_text} ECHEC"

        try:
            if self._done_callback:
                self._done_callback(done_text)
        except Exception as e:
            logger.warning("Erreur overlay done : %s", e)

        time.sleep(POST_ACTION_DELAY)

        # 5. Hide the overlay after the last step.
        if self._step_current >= self._step_total and self._hide_callback:
            try:
                self._hide_callback()
            except Exception as e:
                # Best-effort like the other callbacks, but logged instead
                # of being silently discarded.
                logger.warning("Erreur overlay hide : %s", e)

        return result

    def _describe_action(self, action: Dict[str, Any]) -> str:
        """Build a short readable description of an action."""
        action_type = action.get("type", "?")
        target_text = action.get("target_text", "")
        target_role = action.get("target_role", "")

        if action_type == "click":
            target = target_text or target_role or "cet element"
            return f"Je clique sur [{target}]"
        if action_type == "type":
            text = action.get("text", "")
            # Typed text is cut to 25 characters plus an ellipsis.
            preview = text[:25] + "..." if len(text) > 25 else text
            return f"Je tape : {preview}"
        if action_type == "key_combo":
            keys = action.get("keys", [])
            return f"Combinaison : {'+'.join(keys)}"
        if action_type == "scroll":
            return "Defilement"
        if action_type == "wait":
            ms = action.get("duration_ms", 500)
            return f"Attente {ms}ms"
        return f"Action : {action_type}"

    def _get_target_coords(
        self, action: Dict[str, Any], sw: int, sh: int,
    ) -> Tuple[int, int]:
        """Convert the action's fractional position to pixel coordinates.

        ``x_pct`` / ``y_pct`` default to 0.5 when absent.
        """
        x_pct = action.get("x_pct", 0.5)
        y_pct = action.get("y_pct", 0.5)
        return int(x_pct * sw), int(y_pct * sh)
# Instance globale (singleton) pour l'integration
_bridge: Optional[ReplayOverlayBridge] = None
def get_replay_bridge() -> ReplayOverlayBridge:
    """Return the module-wide bridge, creating it on first use."""
    global _bridge
    if _bridge is not None:
        return _bridge
    _bridge = ReplayOverlayBridge()
    return _bridge

View File

@@ -1,200 +0,0 @@
# agent_v0/lea_ui/styles.py
"""
Theme et couleurs pour l'interface Lea.
Palette douce et moderne, pensee pour ne pas fatiguer les yeux
lors d'une utilisation prolongee sur un poste de travail Windows.
"""
# ---------------------------------------------------------------------------
# Color palette
# ---------------------------------------------------------------------------
# Main background
COLOR_BG = "#F5F7FA"
# Secondary background (sidebar, header)
COLOR_BG_SECONDARY = "#EEF1F6"
# Background of the user's chat bubbles
COLOR_BUBBLE_USER = "#6366F1"
# Background of Lea's chat bubbles
COLOR_BUBBLE_LEA = "#FFFFFF"
# Primary accent (indigo)
COLOR_ACCENT = "#6366F1"
# Accent on hover
COLOR_ACCENT_HOVER = "#4F46E5"
# Primary text
COLOR_TEXT = "#1F2937"
# Secondary text
COLOR_TEXT_SECONDARY = "#6B7280"
# Text drawn on the accent color (white)
COLOR_TEXT_ON_ACCENT = "#FFFFFF"
# Light border
COLOR_BORDER = "#E5E7EB"
# Success (green)
COLOR_SUCCESS = "#10B981"
# Error (red)
COLOR_ERROR = "#EF4444"
# Warning (orange)
COLOR_WARNING = "#F59E0B"
# Overlay pulsing marker (red)
COLOR_OVERLAY_PULSE = "#EF4444"
# Overlay text
COLOR_OVERLAY_TEXT = "#FFFFFF"
# Overlay info background
COLOR_OVERLAY_INFO_BG = "rgba(31, 41, 55, 200)"
# ---------------------------------------------------------------------------
# Typography
# ---------------------------------------------------------------------------
FONT_FAMILY = "Segoe UI"
FONT_SIZE_SMALL = 11
FONT_SIZE_NORMAL = 13
FONT_SIZE_LARGE = 15
FONT_SIZE_TITLE = 18
# ---------------------------------------------------------------------------
# Dimensions
# ---------------------------------------------------------------------------
# Width of the Lea panel
PANEL_WIDTH = 380
# Minimum height
PANEL_MIN_HEIGHT = 500
# Corner radius
BORDER_RADIUS = 12
# Chat bubble corner radius
BUBBLE_RADIUS = 16
# Inner padding
PADDING = 12
# Avatar size
AVATAR_SIZE = 40
# Spacing between elements
SPACING = 8
# ---------------------------------------------------------------------------
# Stylesheet global du panneau Lea
# ---------------------------------------------------------------------------
MAIN_WINDOW_STYLE = f"""
QWidget#LeaMainWindow {{
background-color: {COLOR_BG};
border-radius: {BORDER_RADIUS}px;
border: 1px solid {COLOR_BORDER};
}}
"""
HEADER_STYLE = f"""
QWidget#LeaHeader {{
background-color: {COLOR_BG_SECONDARY};
border-top-left-radius: {BORDER_RADIUS}px;
border-top-right-radius: {BORDER_RADIUS}px;
border-bottom: 1px solid {COLOR_BORDER};
}}
QLabel#LeaTitle {{
color: {COLOR_TEXT};
font-family: "{FONT_FAMILY}";
font-size: {FONT_SIZE_TITLE}px;
font-weight: bold;
}}
QLabel#LeaStatus {{
color: {COLOR_TEXT_SECONDARY};
font-family: "{FONT_FAMILY}";
font-size: {FONT_SIZE_SMALL}px;
}}
"""
CHAT_AREA_STYLE = f"""
QScrollArea {{
border: none;
background-color: {COLOR_BG};
}}
QWidget#ChatContainer {{
background-color: {COLOR_BG};
}}
"""
INPUT_STYLE = f"""
QLineEdit#ChatInput {{
background-color: {COLOR_BUBBLE_LEA};
border: 1px solid {COLOR_BORDER};
border-radius: 20px;
padding: 8px 16px;
font-family: "{FONT_FAMILY}";
font-size: {FONT_SIZE_NORMAL}px;
color: {COLOR_TEXT};
}}
QLineEdit#ChatInput:focus {{
border-color: {COLOR_ACCENT};
}}
"""
SEND_BUTTON_STYLE = f"""
QPushButton#SendButton {{
background-color: {COLOR_ACCENT};
color: {COLOR_TEXT_ON_ACCENT};
border: none;
border-radius: 20px;
padding: 8px 16px;
font-family: "{FONT_FAMILY}";
font-size: {FONT_SIZE_NORMAL}px;
font-weight: bold;
min-width: 50px;
}}
QPushButton#SendButton:hover {{
background-color: {COLOR_ACCENT_HOVER};
}}
QPushButton#SendButton:pressed {{
background-color: #3730A3;
}}
"""
QUICK_BUTTON_STYLE = f"""
QPushButton#QuickButton {{
background-color: {COLOR_BUBBLE_LEA};
color: {COLOR_ACCENT};
border: 1px solid {COLOR_ACCENT};
border-radius: 18px;
padding: 6px 14px;
font-family: "{FONT_FAMILY}";
font-size: {FONT_SIZE_SMALL}px;
}}
QPushButton#QuickButton:hover {{
background-color: {COLOR_ACCENT};
color: {COLOR_TEXT_ON_ACCENT};
}}
"""
PROGRESS_STYLE = f"""
QProgressBar {{
border: none;
border-radius: 4px;
background-color: {COLOR_BORDER};
text-align: center;
font-family: "{FONT_FAMILY}";
font-size: {FONT_SIZE_SMALL}px;
color: {COLOR_TEXT};
max-height: 8px;
}}
QProgressBar::chunk {{
background-color: {COLOR_ACCENT};
border-radius: 4px;
}}
"""
STATUS_LABEL_STYLE = f"""
QLabel#StatusLabel {{
color: {COLOR_TEXT_SECONDARY};
font-family: "{FONT_FAMILY}";
font-size: {FONT_SIZE_SMALL}px;
padding: 4px 8px;
}}
"""
MINI_BAR_STYLE = f"""
QWidget#MiniBar {{
background-color: {COLOR_BG_SECONDARY};
border-radius: 20px;
border: 1px solid {COLOR_BORDER};
}}
"""

View File

@@ -3326,21 +3326,13 @@ def _vlm_quick_find(
# Résolution Set-of-Mark : SomEngine (détection) + VLM (identification) # Résolution Set-of-Mark : SomEngine (détection) + VLM (identification)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_som_engine_api = None # Singleton
def _get_som_engine_api(): def _get_som_engine_api():
"""Singleton SomEngine pour la résolution visuelle (lazy-loaded, GPU).""" """Singleton SomEngine partagé."""
global _som_engine_api
if _som_engine_api is None:
try: try:
from core.detection.som_engine import SomEngine from core.detection.som_engine import get_shared_engine
_som_engine_api = SomEngine(device="cuda") return get_shared_engine()
logger.info("SomEngine API initialisé (lazy singleton)") except ImportError:
except Exception as e: return None
logger.warning("SomEngine API non disponible : %s", e)
_som_engine_api = False
return _som_engine_api if _som_engine_api is not False else None
def _resolve_by_som( def _resolve_by_som(
@@ -3423,7 +3415,7 @@ def _resolve_by_som(
if not exact_matches: if not exact_matches:
exact_matches = [ exact_matches = [
e for e in som_result.elements e for e in som_result.elements
if e.label and ( if e.label and len(e.label) >= 3 and (
label_lower in e.label.lower() label_lower in e.label.lower()
or e.label.lower() in label_lower or e.label.lower() in label_lower
) )
@@ -3493,6 +3485,10 @@ def _resolve_by_som(
) )
# ── 3. Sauvegarder l'image annotée SoM temporairement ── # ── 3. Sauvegarder l'image annotée SoM temporairement ──
if som_result.som_image is None:
logger.debug("SoM resolve : pas d'image annotée, skip VLM")
return None
import tempfile import tempfile
try: try:
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:

View File

@@ -431,21 +431,17 @@ def _needs_post_wait(action: dict) -> int:
# SomEngine — enrichissement Set-of-Mark des clics pendant le build_replay # SomEngine — enrichissement Set-of-Mark des clics pendant le build_replay
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_som_engine = None # Singleton, chargé à la demande _som_cache: Dict[str, Any] = {} # screenshot_id -> SomResult (cache build_replay)
_SOM_CACHE_MAX = 50
def _get_som_engine(): def _get_som_engine():
"""Singleton SomEngine (lazy-loaded, GPU).""" """Singleton SomEngine partagé."""
global _som_engine
if _som_engine is None:
try: try:
from core.detection.som_engine import SomEngine from core.detection.som_engine import get_shared_engine
_som_engine = SomEngine(device="cuda") return get_shared_engine()
logger.info("SomEngine initialisé (lazy singleton)") except ImportError:
except Exception as e: return None
logger.warning("SomEngine non disponible : %s", e)
_som_engine = False # Marqueur "indisponible"
return _som_engine if _som_engine is not False else None
def _som_identify_clicked_element( def _som_identify_clicked_element(
@@ -486,6 +482,11 @@ def _som_identify_clicked_element(
if not full_path.is_file(): if not full_path.is_file():
return None return None
# Vérifier le cache SomResult par (session_dir, screenshot_id)
cache_key = f"{session_dir}:{screenshot_id}"
if cache_key in _som_cache:
result = _som_cache[cache_key]
else:
try: try:
from PIL import Image from PIL import Image
img = Image.open(full_path).convert("RGB") img = Image.open(full_path).convert("RGB")
@@ -500,6 +501,20 @@ def _som_identify_clicked_element(
logger.warning("SoM: erreur d'analyse : %s", e) logger.warning("SoM: erreur d'analyse : %s", e)
return None return None
# Stocker dans le cache (éléments seulement, pas l'image annotée)
from core.detection.som_engine import SomResult
cached = SomResult(
elements=result.elements,
width=result.width,
height=result.height,
analysis_time_ms=result.analysis_time_ms,
)
if len(_som_cache) >= _SOM_CACHE_MAX:
# Supprimer la plus ancienne entrée (FIFO)
oldest_key = next(iter(_som_cache))
del _som_cache[oldest_key]
_som_cache[cache_key] = cached
if not result.elements: if not result.elements:
return None return None

View File

@@ -1,55 +0,0 @@
# window_info.py
"""
Récupération des informations sur la fenêtre active (X11).
v0 :
- utilise xdotool pour obtenir :
- le titre de la fenêtre active
- le PID de la fenêtre active, puis le nom du process via ps
Si quelque chose ne fonctionne pas, on renvoie des valeurs "unknown".
"""
from __future__ import annotations
import subprocess
from typing import Dict, Optional
def _run_cmd(cmd: list[str]) -> Optional[str]:
"""Exécute une commande et renvoie la sortie texte (strippée), ou None en cas d'erreur."""
try:
out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
return out.decode("utf-8", errors="ignore").strip()
except Exception:
return None
def get_active_window_info() -> Dict[str, str]:
    """Describe the currently active window using ``xdotool`` and ``ps``.

    Returns:
        A dict with two keys:

        * ``"title"``: the active window's name as reported by ``xdotool``,
          or ``"unknown_window"`` when it could not be obtained;
        * ``"app_name"``: the command name ``ps`` reports for the window's
          PID, or ``"unknown_app"`` when it could not be obtained.
    """
    window_title = _run_cmd(["xdotool", "getactivewindow", "getwindowname"])
    window_pid = _run_cmd(["xdotool", "getactivewindow", "getwindowpid"])

    # Resolve the binary name from the PID; skipped when no PID was found.
    process_name: Optional[str] = (
        _run_cmd(["ps", "-p", window_pid.strip(), "-o", "comm="])
        if window_pid
        else None
    )

    return {
        "title": window_title or "unknown_window",
        "app_name": process_name or "unknown_app",
    }

View File

@@ -1,192 +0,0 @@
# window_info_crossplatform.py
"""
Récupération des informations sur la fenêtre active - CROSS-PLATFORM
Supporte:
- Linux (X11 via xdotool)
- Windows (via pywin32)
- macOS (via pyobjc)
Installation des dépendances:
pip install pywin32 # Windows
pip install pyobjc-framework-Cocoa # macOS
pip install psutil # Tous OS
"""
from __future__ import annotations
import platform
import subprocess
from typing import Dict, Optional
def _run_cmd(cmd: list[str]) -> Optional[str]:
"""Exécute une commande et renvoie la sortie texte (strippée), ou None en cas d'erreur."""
try:
out = subprocess.check_output(cmd, stderr=subprocess.DEVNULL)
return out.decode("utf-8", errors="ignore").strip()
except Exception:
return None
def get_active_window_info() -> Dict[str, str]:
    """Describe the active window, choosing the backend from the running OS.

    The value of ``platform.system()`` selects the Linux, Windows or macOS
    helper; any other value yields placeholder values.

    Returns:
        A dict with the keys ``"title"`` and ``"app_name"``.
    """
    backends = {
        "Linux": _get_window_info_linux,
        "Windows": _get_window_info_windows,
        "Darwin": _get_window_info_macos,  # macOS
    }
    backend = backends.get(platform.system())
    if backend is None:
        return {"title": "unknown_window", "app_name": "unknown_app"}
    return backend()
def _get_window_info_linux() -> Dict[str, str]:
    """Describe the active window on Linux via ``xdotool`` (X11) and ``ps``.

    Requires ``xdotool`` on the system (``sudo apt-get install xdotool``).

    Returns:
        A dict with ``"title"`` (falling back to ``"unknown_window"``) and
        ``"app_name"`` (falling back to ``"unknown_app"``).
    """
    window_title = _run_cmd(["xdotool", "getactivewindow", "getwindowname"])
    window_pid = _run_cmd(["xdotool", "getactivewindow", "getwindowpid"])

    # Resolve the binary name from the PID; skipped when no PID was found.
    process_name: Optional[str] = (
        _run_cmd(["ps", "-p", window_pid.strip(), "-o", "comm="])
        if window_pid
        else None
    )

    return {
        "title": window_title or "unknown_window",
        "app_name": process_name or "unknown_app",
    }
def _get_window_info_windows() -> Dict[str, str]:
"""
Windows: utilise pywin32 + psutil
Nécessite: pip install pywin32 psutil
"""
try:
import win32gui
import win32process
import psutil
# Fenêtre au premier plan
hwnd = win32gui.GetForegroundWindow()
# Titre de la fenêtre
title = win32gui.GetWindowText(hwnd)
if not title:
title = "unknown_window"
# PID du processus
_, pid = win32process.GetWindowThreadProcessId(hwnd)
# Nom du processus
try:
process = psutil.Process(pid)
app_name = process.name()
except (psutil.NoSuchProcess, psutil.AccessDenied):
app_name = "unknown_app"
return {
"title": title,
"app_name": app_name,
}
except ImportError:
# pywin32 ou psutil non installé
return {
"title": "unknown_window (pywin32 missing)",
"app_name": "unknown_app (pywin32 missing)",
}
except Exception as e:
return {
"title": f"error: {e}",
"app_name": "unknown_app",
}
def _get_window_info_macos() -> Dict[str, str]:
"""
macOS: utilise pyobjc (AppKit)
Nécessite: pip install pyobjc-framework-Cocoa
Note: Nécessite les permissions "Accessibility" dans System Preferences
"""
try:
from AppKit import NSWorkspace
from Quartz import (
CGWindowListCopyWindowInfo,
kCGWindowListOptionOnScreenOnly,
kCGNullWindowID
)
# Application active
active_app = NSWorkspace.sharedWorkspace().activeApplication()
app_name = active_app.get('NSApplicationName', 'unknown_app')
# Titre de la fenêtre (via Quartz)
# On cherche la fenêtre de l'app active qui est au premier plan
window_list = CGWindowListCopyWindowInfo(
kCGWindowListOptionOnScreenOnly,
kCGNullWindowID
)
title = "unknown_window"
for window in window_list:
owner_name = window.get('kCGWindowOwnerName', '')
if owner_name == app_name:
window_title = window.get('kCGWindowName', '')
if window_title:
title = window_title
break
return {
"title": title,
"app_name": app_name,
}
except ImportError:
# pyobjc non installé
return {
"title": "unknown_window (pyobjc missing)",
"app_name": "unknown_app (pyobjc missing)",
}
except Exception as e:
return {
"title": f"error: {e}",
"app_name": "unknown_app",
}
# Quick manual check: polls the active window once per second, five times.
if __name__ == "__main__":
    import time

    print(f"OS détecté: {platform.system()}")
    print("\nTest de capture fenêtre active (5 secondes)...")
    print("Changez de fenêtre pour tester!\n")

    for attempt in range(1, 6):
        window = get_active_window_info()
        print(f"[{attempt}] App: {window['app_name']:20s} | Title: {window['title']}")
        time.sleep(1)

View File

@@ -25,6 +25,7 @@ from __future__ import annotations
import base64 import base64
import io import io
import logging import logging
import os
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import List, Optional, Tuple from typing import List, Optional, Tuple
@@ -33,8 +34,10 @@ from PIL import Image, ImageDraw, ImageFont
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Chemin vers les poids YOLO d'OmniParser # Chemin vers les poids YOLO d'OmniParser (configurable via env)
_YOLO_WEIGHTS = Path("/home/dom/ai/OmniParser/weights/icon_detect/model.pt") _YOLO_WEIGHTS = Path(
os.environ.get("SOM_YOLO_WEIGHTS", "/home/dom/ai/OmniParser/weights/icon_detect/model.pt")
)
@dataclass @dataclass
@@ -165,17 +168,17 @@ class SomEngine:
# ── 2. docTR : OCR pour lire le texte ── # ── 2. docTR : OCR pour lire le texte ──
if self._ocr is not None: if self._ocr is not None:
try: try:
import numpy as np
from doctr.io import DocumentFile from doctr.io import DocumentFile
# Convertir PIL → fichier temporaire pour docTR # Convertir PIL → fichier temporaire pour docTR
import tempfile import tempfile
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
screenshot.save(tmp, format="JPEG", quality=90) screenshot.save(tmp, format="JPEG", quality=90)
tmp_path = tmp.name tmp_path = tmp.name
try:
doc = DocumentFile.from_images([tmp_path]) doc = DocumentFile.from_images([tmp_path])
import os
os.unlink(tmp_path)
result_ocr = self._ocr(doc) result_ocr = self._ocr(doc)
finally:
os.unlink(tmp_path)
for page in result_ocr.pages: for page in result_ocr.pages:
for block in page.blocks: for block in page.blocks:
@@ -288,3 +291,25 @@ class SomEngine:
buf = io.BytesIO() buf = io.BytesIO()
image.save(buf, format="JPEG", quality=quality) image.save(buf, format="JPEG", quality=quality)
return base64.b64encode(buf.getvalue()).decode() return base64.b64encode(buf.getvalue()).decode()
# ---------------------------------------------------------------------------
# Shared singleton (lazy-loaded, creation guarded by a lock)
# ---------------------------------------------------------------------------

import threading  # imported here so this section carries its own dependency

_shared_engine: Optional[SomEngine] = None
# Set once construction has raised, so later calls do not retry the load
# (and re-log the warning) on every invocation.
_shared_engine_failed = False
_shared_lock = threading.Lock()


def get_shared_engine(device: str = "cuda") -> Optional[SomEngine]:
    """Return the SomEngine instance shared by all modules, creating it lazily.

    Args:
        device: Passed to ``SomEngine(device=...)`` when the instance is first
            created. Later calls return the existing instance regardless of
            the value given here.

    Returns:
        The shared engine, or ``None`` if constructing it raised. A failure
        is remembered: subsequent calls return ``None`` without trying again.
    """
    global _shared_engine, _shared_engine_failed
    if _shared_engine is None and not _shared_engine_failed:
        with _shared_lock:
            # Re-check under the lock: another caller may have finished (or
            # failed) the construction while this one was waiting.
            if _shared_engine is None and not _shared_engine_failed:
                try:
                    _shared_engine = SomEngine(device=device)
                    logger.info("SomEngine singleton partagé initialisé")
                except Exception as e:
                    _shared_engine_failed = True
                    logger.warning("SomEngine non disponible : %s", e)
    return _shared_engine