6 Commits

Author SHA1 Message Date
Dom
13390a71e7 fix: SomEngine resolve — raccourci texte + proximité, fallback VLM robuste
- Match texte exact avant partiel pour éviter les faux positifs
- Disambiguïsation par proximité (center_norm) quand plusieurs matchs
- Prompt VLM simplifié (liste labelée, 30 max, JSON concis)
- Fallback regex pour extraire un numéro de réponse VLM non-JSON
- Résultat : 0.3s par texte vs 5-15s par VLM

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 09:45:20 +02:00
Dom
4c76dca992 feat: intégration SomEngine dans build_replay (Phase 1) et resolve_target (Phase 2)
Phase 1 : enrichit chaque clic avec som_element (id, label, bbox) via YOLO+docTR
Phase 2 : nouvelle résolution SoM+VLM — SomEngine numérote, VLM identifie le mark
10 tests unitaires ajoutés, conftest unit/ pour le bon path agent_v0

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 09:30:14 +02:00
Dom
2ddccff108 feat: SomEngine — Set-of-Mark avec YOLO + docTR pour détection UI
- SomEngine : détecte et numérote tous les éléments UI d'un screenshot
- YOLO v8 (OmniParser) : détection icônes/boutons (~15ms GPU)
- docTR : OCR pour le texte visible
- Annotation visuelle : numéros rouges sur chaque élément
- find_element_at(x, y) : trouve l'élément cliqué par coordonnées
- Fix Florence-2 / transformers 4.57 incompatibilité (past_key_values)
- Testé : 107 éléments détectés sur screenshot Windows 2560x1600

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-31 08:26:07 +02:00
Dom
3417f09598 feat: auto-stop enregistrement (1h) + packaging Léa collaborateurs
- Auto-stop : notification 10 min avant, arrêt automatique après MAX_SESSION_DURATION_S (1h)
- Lea.bat : kill des anciens process (python, pythonw, rpa-agent) au démarrage
- LISEZMOI : simplifié pour les collaborateurs (pas de replay, juste collecte)
- Chat server (5004) vérifié fonctionnel

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 17:26:32 +02:00
Dom
bbe506c63a feat: contrôle visuel post-action (template matching + VLM fallback)
- Screenshots de référence (res_shot_XXXX.png) attachés aux actions click/key_combo
- _attach_expected_screenshots() charge les screenshots résultat de l'enregistrement
- _verify_visual_state() dans executor : 2 étages de vérification
  - Étage 1 : template matching rapide (~100ms), score > 0.7 = OK, < 0.3 = FAIL
  - Étage 2 : VLM compare current vs expected (~4s), MATCH/MISMATCH
- Résultat attaché à chaque action (visual_verification dans result)
- Note : executor sur Windows (/tmp/executor_win.py) à synchroniser manuellement

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 16:57:56 +02:00
Dom
647aa610fd feat: popup VLM double-appel, auth Bearer partout, texte AZERTY corrigé
- Popup handling via double appel VLM (détection + localisation précise du bouton)
- Reconstruction texte depuis raw_keys (numpad /, @ AltGr fusionné)
- Clipboard paste pour texte riche, raw_keys pour commandes simples (Win+R)
- Skip des release orphelins dans raw_keys (fix menu Démarrer parasite)
- Auth Bearer sur toutes les requêtes agent → streaming server
- Endpoints /replay/next et /stream/image publics (agent Rust legacy)
- alt_gr ajouté dans _MODIFIER_ONLY_KEYS
- _key_combo_printable_char détecte ctrl+@ comme caractère imprimable
- start.bat tue les anciens process (python + rpa-agent) au démarrage
- Heartbeat avec token Bearer dans main.py et deploy/

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 16:45:09 +02:00
15 changed files with 1515 additions and 80 deletions

View File

@@ -16,7 +16,7 @@ import logging
import threading import threading
from .config import ( from .config import (
SESSIONS_ROOT, AGENT_VERSION, SERVER_URL, MACHINE_ID, LOG_RETENTION_DAYS, SESSIONS_ROOT, AGENT_VERSION, SERVER_URL, MACHINE_ID, LOG_RETENTION_DAYS,
SCREEN_RESOLUTION, DPI_SCALE, OS_THEME, SCREEN_RESOLUTION, DPI_SCALE, OS_THEME, API_TOKEN, MAX_SESSION_DURATION_S,
) )
from .core.captor import EventCaptorV1 from .core.captor import EventCaptorV1
from .core.executor import ActionExecutorV1 from .core.executor import ActionExecutorV1
@@ -77,7 +77,14 @@ class AgentV1:
# Client serveur pour le chat et les workflows # Client serveur pour le chat et les workflows
self._server_client = None self._server_client = None
if LeaServerClient is not None: if LeaServerClient is not None:
self._server_client = LeaServerClient() # Forcer le token API pour éviter les 401
# (le token est set par start.bat dans l'environnement)
from .config import API_TOKEN as _token
server_host = os.getenv("RPA_SERVER_HOST", "localhost")
self._server_client = LeaServerClient(server_host=server_host)
if _token and not self._server_client._api_token:
self._server_client._api_token = _token
logger.info("Token API forcé dans LeaServerClient")
# Fenetre de chat Lea (tkinter natif) # Fenetre de chat Lea (tkinter natif)
server_host = ( server_host = (
@@ -129,6 +136,58 @@ class AgentV1:
time.sleep(30) time.sleep(30)
self.storage.run_auto_cleanup() self.storage.run_auto_cleanup()
def _auto_stop_loop(self):
"""Auto-stop de l'enregistrement après MAX_SESSION_DURATION_S.
L'utilisateur peut oublier d'arrêter. On notifie à 50 min,
puis on arrête automatiquement à 60 min (configurable).
"""
warn_before = 600 # Prévenir 10 min avant la fin
warned = False
while self.running and self.session_id:
elapsed = time.time() - self._session_start_time
remaining = MAX_SESSION_DURATION_S - elapsed
# Notification 10 min avant la fin
if not warned and remaining <= warn_before:
warned = True
mins = int(remaining / 60)
logger.info(f"Auto-stop dans {mins} min")
try:
from .ui.notifications import NotificationManager
NotificationManager().notify(
"Léa",
f"L'enregistrement s'arrêtera automatiquement dans {mins} minutes.",
)
except Exception:
pass
# Auto-stop
if remaining <= 0:
logger.info(
f"Auto-stop : session {self.session_id} après "
f"{int(elapsed)}s ({int(elapsed/60)} min)"
)
try:
from .ui.notifications import NotificationManager
NotificationManager().notify(
"Léa",
f"Enregistrement terminé automatiquement après "
f"{int(elapsed/60)} minutes. Merci !",
)
except Exception:
pass
# Arrêter via l'état partagé (synchronise systray + chat)
if self._state is not None:
self._state.stop_recording()
else:
self.stop_session()
break
time.sleep(30) # Vérifier toutes les 30s
def start_session(self, workflow_name): def start_session(self, workflow_name):
self.session_id = f"sess_{time.strftime('%Y%m%dT%H%M%S')}_{uuid.uuid4().hex[:6]}" self.session_id = f"sess_{time.strftime('%Y%m%dT%H%M%S')}_{uuid.uuid4().hex[:6]}"
self.session_dir = self.storage.get_session_dir(self.session_id) self.session_dir = self.storage.get_session_dir(self.session_id)
@@ -150,6 +209,11 @@ class AgentV1:
# Heartbeat Contextuel (Toutes les 5s par defaut) # Heartbeat Contextuel (Toutes les 5s par defaut)
threading.Thread(target=self._heartbeat_loop, daemon=True).start() threading.Thread(target=self._heartbeat_loop, daemon=True).start()
# Auto-stop : arrêter l'enregistrement après MAX_SESSION_DURATION_S
# L'utilisateur peut oublier d'arrêter — on le fait automatiquement
self._session_start_time = time.time()
threading.Thread(target=self._auto_stop_loop, daemon=True).start()
# Watchdog de Commandes (GHOST Replay — legacy fichier) # Watchdog de Commandes (GHOST Replay — legacy fichier)
threading.Thread(target=self._command_watchdog_loop, daemon=True).start() threading.Thread(target=self._command_watchdog_loop, daemon=True).start()
@@ -288,7 +352,8 @@ class AgentV1:
continue continue
self._last_bg_hash = img_hash self._last_bg_hash = img_hash
# Envoyer au streaming server # Envoyer au streaming server (avec token auth)
headers = {"Authorization": f"Bearer {API_TOKEN}"} if API_TOKEN else {}
with open(full_path, 'rb') as f: with open(full_path, 'rb') as f:
req.post( req.post(
f"{SERVER_URL}/traces/stream/image", f"{SERVER_URL}/traces/stream/image",
@@ -297,6 +362,7 @@ class AgentV1:
"shot_id": f"heartbeat_{int(time.time())}", "shot_id": f"heartbeat_{int(time.time())}",
"machine_id": self.machine_id, "machine_id": self.machine_id,
}, },
headers=headers,
files={"file": ("screenshot.png", f, "image/png")}, files={"file": ("screenshot.png", f, "image/png")},
timeout=10, timeout=10,
) )

View File

@@ -474,9 +474,14 @@ class SmartTrayV1:
try: try:
import requests import requests
# Auth headers pour le streaming server (port 5005)
auth_headers = {}
if self.server_client is not None:
auth_headers = self.server_client._auth_headers()
resp = requests.post( resp = requests.post(
f"{self.server_client._stream_base}/api/v1/traces/stream/replay/start", f"{self.server_client._stream_base}/api/v1/traces/stream/replay/start",
json={"workflow_id": workflow_id}, json={"workflow_id": workflow_id},
headers=auth_headers,
timeout=10, timeout=10,
) )
if resp.ok: if resp.ok:

View File

@@ -42,6 +42,10 @@ SERVER_URL = os.getenv("RPA_SERVER_URL", "http://localhost:5005/api/v1")
UPLOAD_ENDPOINT = f"{SERVER_URL}/traces/upload" UPLOAD_ENDPOINT = f"{SERVER_URL}/traces/upload"
STREAMING_ENDPOINT = f"{SERVER_URL}/traces/stream" STREAMING_ENDPOINT = f"{SERVER_URL}/traces/stream"
# Token d'authentification API (doit correspondre au token du serveur)
# Configurable via variable d'environnement RPA_API_TOKEN
API_TOKEN = os.environ.get("RPA_API_TOKEN", "")
# Paramètres de session # Paramètres de session
MAX_SESSION_DURATION_S = 60 * 60 # 1 heure MAX_SESSION_DURATION_S = 60 * 60 # 1 heure
SESSIONS_ROOT = BASE_DIR / "sessions" SESSIONS_ROOT = BASE_DIR / "sessions"

View File

@@ -14,7 +14,7 @@ import uuid
import time import time
import logging import logging
import threading import threading
from .config import SESSIONS_ROOT, AGENT_VERSION, SERVER_URL, MACHINE_ID from .config import SESSIONS_ROOT, AGENT_VERSION, SERVER_URL, MACHINE_ID, API_TOKEN
from .core.captor import EventCaptorV1 from .core.captor import EventCaptorV1
from .core.executor import ActionExecutorV1 from .core.executor import ActionExecutorV1
from .network.streamer import TraceStreamer from .network.streamer import TraceStreamer
@@ -84,9 +84,11 @@ class AgentV1:
# Executeur pour le replay (doit exister avant le poll) # Executeur pour le replay (doit exister avant le poll)
self._executor = ActionExecutorV1() self._executor = ActionExecutorV1()
# Boucle de polling replay PERMANENTE (pas besoin de session active) # Boucles permanentes (pas besoin de session active)
self.running = True self.running = True
self._bg_vision = VisionCapturer(str(SESSIONS_ROOT / "_background"))
threading.Thread(target=self._replay_poll_loop, daemon=True).start() threading.Thread(target=self._replay_poll_loop, daemon=True).start()
threading.Thread(target=self._background_heartbeat_loop, daemon=True).start()
# UI Tray intelligent (remplace TrayAppV1, plus de PyQt5) # UI Tray intelligent (remplace TrayAppV1, plus de PyQt5)
self.ui = SmartTrayV1( self.ui = SmartTrayV1(
@@ -126,11 +128,59 @@ class AgentV1:
# Watchdog de Commandes (GHOST Replay — legacy fichier) # Watchdog de Commandes (GHOST Replay — legacy fichier)
threading.Thread(target=self._command_watchdog_loop, daemon=True).start() threading.Thread(target=self._command_watchdog_loop, daemon=True).start()
# Boucle de polling replay (P0-5 — pull depuis le serveur) # Note: la boucle de polling replay est déjà lancée dans __init__
threading.Thread(target=self._replay_poll_loop, daemon=True).start() # Ne PAS en relancer une ici — deux threads poll simultanés causent
# une race condition où les actions sont consommées mais pas exécutées.
logger.info(f"Session {self.session_id} ({workflow_name}) sur machine {self.machine_id} en cours...") logger.info(f"Session {self.session_id} ({workflow_name}) sur machine {self.machine_id} en cours...")
_last_bg_hash: str = ""
def _background_heartbeat_loop(self):
"""Heartbeat permanent — envoie un screenshot toutes les 5s au serveur.
Tourne même sans session active, pour que le VWB puisse capturer Windows.
"""
import requests as req
bg_session = f"bg_{self.machine_id}"
logger.info(f"[HEARTBEAT] Boucle permanente démarrée (session={bg_session})")
while self.running:
try:
# Ne pas envoyer pendant un enregistrement (le heartbeat session s'en charge)
if self.session_id:
time.sleep(5)
continue
full_path = self._bg_vision.capture_full_context("heartbeat")
if not full_path:
time.sleep(5)
continue
# Dédup : skip si écran identique
img_hash = self._quick_hash(full_path)
if img_hash and img_hash == self._last_bg_hash:
time.sleep(5)
continue
self._last_bg_hash = img_hash
# Envoyer au streaming server (avec token auth)
headers = {"Authorization": f"Bearer {API_TOKEN}"} if API_TOKEN else {}
with open(full_path, 'rb') as f:
req.post(
f"{SERVER_URL}/traces/stream/image",
params={
"session_id": bg_session,
"shot_id": f"heartbeat_{int(time.time())}",
"machine_id": self.machine_id,
},
headers=headers,
files={"file": ("screenshot.png", f, "image/png")},
timeout=10,
)
except Exception as e:
logger.debug(f"[HEARTBEAT] Erreur: {e}")
time.sleep(5)
def _command_watchdog_loop(self): def _command_watchdog_loop(self):
"""Surveille un fichier de commande pour executer des ordres visuels (legacy).""" """Surveille un fichier de commande pour executer des ordres visuels (legacy)."""
import json import json
@@ -143,7 +193,7 @@ class AgentV1:
else: else:
cmd_path = str(BASE_DIR / "command.json") cmd_path = str(BASE_DIR / "command.json")
while self.running: while self.running and self.session_id:
# Ne pas traiter les commandes fichier pendant un replay serveur # Ne pas traiter les commandes fichier pendant un replay serveur
if self._replay_active: if self._replay_active:
time.sleep(1) time.sleep(1)
@@ -181,8 +231,11 @@ class AgentV1:
time.sleep(REPLAY_POLL_INTERVAL) time.sleep(REPLAY_POLL_INTERVAL)
continue continue
# Utiliser la session active ou un ID par défaut pour le replay # TOUJOURS utiliser un session_id stable pour le replay.
poll_session = self.session_id or f"agent_{self.user_id}" # L'enregistrement et le replay sont indépendants : le serveur
# envoie les actions sur agent_{user_id}, pas sur la session
# d'enregistrement (sess_xxx).
poll_session = f"agent_{self.user_id}"
# Log periodique pour confirmer que la boucle tourne (toutes les 60s) # Log periodique pour confirmer que la boucle tourne (toutes les 60s)
poll_count += 1 poll_count += 1
@@ -226,18 +279,38 @@ class AgentV1:
time.sleep(max(poll_delay, REPLAY_POLL_INTERVAL)) time.sleep(max(poll_delay, REPLAY_POLL_INTERVAL))
def stop_session(self): def stop_session(self):
self.running = False # Arrêter la capture et le streaming de la session d'enregistrement
if self.captor: self.captor.stop() if self.captor: self.captor.stop()
if self.streamer: self.streamer.stop() if self.streamer: self.streamer.stop()
logger.info(f"Session {self.session_id} terminée.") logger.info(f"Session {self.session_id} terminée.")
# Reset le session_id pour que le poll replay utilise l'ID stable
self.session_id = None
# Reset le backoff de l'executor pour reprendre le polling immédiatement
if self._executor:
self._executor._poll_backoff = self._executor._poll_backoff_min
self._executor._server_available = True
if hasattr(self._executor, '_last_conn_error_logged'):
self._executor._last_conn_error_logged = False
# NE PAS mettre self.running = False ici !
# self.running contrôle la boucle _replay_poll_loop (permanente).
# Seule la sortie du programme doit le mettre à False.
logger.info(
f"Session arrêtée — replay poll actif avec session="
f"agent_{self.user_id}"
)
_last_heartbeat_hash: str = "" _last_heartbeat_hash: str = ""
def _heartbeat_loop(self): def _heartbeat_loop(self):
"""Capture périodique pour donner du contexte au stagiaire. """Capture périodique pour donner du contexte au stagiaire.
Déduplication : n'envoie que si l'écran a changé. Déduplication : n'envoie que si l'écran a changé.
Tourne tant que session_id est défini (= enregistrement actif).
""" """
while self.running: while self.running and self.session_id:
try: try:
full_path = self.vision.capture_full_context("heartbeat") full_path = self.vision.capture_full_context("heartbeat")
if full_path: if full_path:

View File

@@ -25,7 +25,7 @@ import time
import requests import requests
from PIL import Image from PIL import Image
from ..config import STREAMING_ENDPOINT from ..config import API_TOKEN, STREAMING_ENDPOINT
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -56,6 +56,13 @@ class TraceStreamer:
self._health_thread = None self._health_thread = None
self._server_available = True # Désactivé après trop d'échecs self._server_available = True # Désactivé après trop d'échecs
@staticmethod
def _auth_headers() -> dict:
"""Headers d'authentification Bearer pour les requêtes API."""
if API_TOKEN:
return {"Authorization": f"Bearer {API_TOKEN}"}
return {}
def start(self): def start(self):
"""Démarrer le streaming et enregistrer la session côté serveur.""" """Démarrer le streaming et enregistrer la session côté serveur."""
self.running = True self.running = True
@@ -240,6 +247,7 @@ class TraceStreamer:
try: try:
resp = requests.get( resp = requests.get(
f"{STREAMING_ENDPOINT}/stats", f"{STREAMING_ENDPOINT}/stats",
headers=self._auth_headers(),
timeout=3, timeout=3,
) )
if resp.ok: if resp.ok:
@@ -292,6 +300,7 @@ class TraceStreamer:
"session_id": self.session_id, "session_id": self.session_id,
"machine_id": self.machine_id, "machine_id": self.machine_id,
}, },
headers=self._auth_headers(),
timeout=3, timeout=3,
) )
if resp.ok: if resp.ok:
@@ -319,6 +328,7 @@ class TraceStreamer:
"session_id": self.session_id, "session_id": self.session_id,
"machine_id": self.machine_id, "machine_id": self.machine_id,
}, },
headers=self._auth_headers(),
timeout=30, # Le build workflow peut prendre du temps timeout=30, # Le build workflow peut prendre du temps
) )
if resp.ok: if resp.ok:
@@ -343,6 +353,7 @@ class TraceStreamer:
resp = requests.post( resp = requests.post(
f"{STREAMING_ENDPOINT}/event", f"{STREAMING_ENDPOINT}/event",
json=payload, json=payload,
headers=self._auth_headers(),
timeout=2, timeout=2,
) )
return resp.ok return resp.ok
@@ -377,6 +388,7 @@ class TraceStreamer:
f"{STREAMING_ENDPOINT}/image", f"{STREAMING_ENDPOINT}/image",
files=files, files=files,
params=params, params=params,
headers=self._auth_headers(),
timeout=5, timeout=5,
) )
return resp.ok return resp.ok
@@ -390,6 +402,7 @@ class TraceStreamer:
f"{STREAMING_ENDPOINT}/image", f"{STREAMING_ENDPOINT}/image",
files=files, files=files,
params=params, params=params,
headers=self._auth_headers(),
timeout=5, timeout=5,
) )
return resp.ok return resp.ok

View File

@@ -367,9 +367,14 @@ class SmartTrayV1:
try: try:
import requests import requests
# Auth headers pour le streaming server (port 5005)
auth_headers = {}
if self.server_client is not None:
auth_headers = self.server_client._auth_headers()
resp = requests.post( resp = requests.post(
f"{self.server_client._stream_base}/api/v1/traces/stream/replay/start", f"{self.server_client._stream_base}/api/v1/traces/stream/replay/start",
json={"workflow_id": workflow_id}, json={"workflow_id": workflow_id},
headers=auth_headers,
timeout=10, timeout=10,
) )
if resp.ok: if resp.ok:

View File

@@ -91,11 +91,24 @@ class LeaServerClient:
# Session de chat # Session de chat
self._chat_session_id: Optional[str] = None self._chat_session_id: Optional[str] = None
# Token API pour le serveur streaming (auth Bearer)
self._api_token = os.environ.get("RPA_API_TOKEN", "")
logger.info( logger.info(
"LeaServerClient initialise : chat=%s, stream=%s", "LeaServerClient initialise : chat=%s, stream=%s",
self._chat_base, self._stream_base, self._chat_base, self._stream_base,
) )
# ---------------------------------------------------------------------------
# Auth
# ---------------------------------------------------------------------------
def _auth_headers(self) -> Dict[str, str]:
"""Headers d'authentification pour le serveur streaming."""
if self._api_token:
return {"Authorization": f"Bearer {self._api_token}"}
return {}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Proprietes # Proprietes
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -133,11 +146,12 @@ class LeaServerClient:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def check_connection(self) -> bool: def check_connection(self) -> bool:
"""Tester la connexion au serveur chat.""" """Tester la connexion au serveur streaming (port 5005)."""
try: try:
import requests import requests
resp = requests.get( resp = requests.get(
f"{self._chat_base}/api/status", f"{self._stream_base}/health",
headers=self._auth_headers(),
timeout=5, timeout=5,
) )
was_connected = self._connected was_connected = self._connected
@@ -200,16 +214,21 @@ class LeaServerClient:
return None return None
def list_workflows(self) -> List[Dict[str, Any]]: def list_workflows(self) -> List[Dict[str, Any]]:
"""Recuperer la liste des workflows depuis le serveur chat.""" """Recuperer la liste des workflows depuis le serveur streaming."""
try: try:
import requests import requests
headers = self._auth_headers()
resp = requests.get( resp = requests.get(
f"{self._chat_base}/api/workflows", f"{self._stream_base}/api/v1/traces/stream/workflows",
headers=headers,
timeout=10, timeout=10,
) )
if resp.ok: if resp.ok:
data = resp.json() data = resp.json()
self._connected = True self._connected = True
# L'API renvoie directement une liste ou un dict avec clé "workflows"
if isinstance(data, list):
return data
return data.get("workflows", []) return data.get("workflows", [])
return [] return []
except Exception as e: except Exception as e:
@@ -218,20 +237,10 @@ class LeaServerClient:
return [] return []
def list_gestures(self) -> List[Dict[str, Any]]: def list_gestures(self) -> List[Dict[str, Any]]:
"""Recuperer la liste des gestes depuis le serveur chat.""" """Recuperer la liste des gestes (non disponible sur streaming server)."""
try: # Les gestes etaient sur le chat server (5004) qui n'est plus utilise.
import requests # Retourner une liste vide silencieusement.
resp = requests.get( return []
f"{self._chat_base}/api/workflows",
timeout=10,
)
if resp.ok:
data = resp.json()
return data.get("workflows", [])
return []
except Exception as e:
logger.error("List gestures erreur : %s", e)
return []
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Replay Polling (port 5005) # Replay Polling (port 5005)
@@ -269,6 +278,7 @@ class LeaServerClient:
resp = req_lib.get( resp = req_lib.get(
f"{self._stream_base}/api/v1/traces/stream/replay/next", f"{self._stream_base}/api/v1/traces/stream/replay/next",
params={"session_id": self._poll_session_id}, params={"session_id": self._poll_session_id},
headers=self._auth_headers(),
timeout=5, timeout=5,
) )
@@ -301,6 +311,7 @@ class LeaServerClient:
import requests import requests
resp = requests.get( resp = requests.get(
f"{self._stream_base}/api/v1/traces/stream/replays", f"{self._stream_base}/api/v1/traces/stream/replays",
headers=self._auth_headers(),
timeout=5, timeout=5,
) )
if resp.ok: if resp.ok:
@@ -335,6 +346,7 @@ class LeaServerClient:
"error": error, "error": error,
"screenshot": screenshot, "screenshot": screenshot,
}, },
headers=self._auth_headers(),
timeout=5, timeout=5,
) )
except Exception as e: except Exception as e:

View File

@@ -91,11 +91,24 @@ class LeaServerClient:
# Session de chat # Session de chat
self._chat_session_id: Optional[str] = None self._chat_session_id: Optional[str] = None
# Token API pour le serveur streaming (auth Bearer)
self._api_token = os.environ.get("RPA_API_TOKEN", "")
logger.info( logger.info(
"LeaServerClient initialise : chat=%s, stream=%s", "LeaServerClient initialise : chat=%s, stream=%s",
self._chat_base, self._stream_base, self._chat_base, self._stream_base,
) )
# ---------------------------------------------------------------------------
# Auth
# ---------------------------------------------------------------------------
def _auth_headers(self) -> Dict[str, str]:
"""Headers d'authentification pour le serveur streaming."""
if self._api_token:
return {"Authorization": f"Bearer {self._api_token}"}
return {}
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Proprietes # Proprietes
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@@ -133,11 +146,12 @@ class LeaServerClient:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def check_connection(self) -> bool: def check_connection(self) -> bool:
"""Tester la connexion au serveur chat.""" """Tester la connexion au serveur streaming (port 5005)."""
try: try:
import requests import requests
resp = requests.get( resp = requests.get(
f"{self._chat_base}/api/workflows", f"{self._stream_base}/health",
headers=self._auth_headers(),
timeout=5, timeout=5,
) )
was_connected = self._connected was_connected = self._connected
@@ -200,11 +214,13 @@ class LeaServerClient:
return None return None
def list_workflows(self) -> List[Dict[str, Any]]: def list_workflows(self) -> List[Dict[str, Any]]:
"""Recuperer la liste des workflows depuis le serveur chat.""" """Recuperer la liste des workflows depuis le serveur streaming."""
try: try:
import requests import requests
headers = self._auth_headers()
resp = requests.get( resp = requests.get(
f"{self._chat_base}/api/workflows", f"{self._stream_base}/api/v1/traces/stream/workflows",
headers=headers,
timeout=10, timeout=10,
) )
if resp.ok: if resp.ok:
@@ -221,22 +237,10 @@ class LeaServerClient:
return [] return []
def list_gestures(self) -> List[Dict[str, Any]]: def list_gestures(self) -> List[Dict[str, Any]]:
"""Recuperer la liste des gestes depuis le serveur chat.""" """Recuperer la liste des gestes (non disponible sur streaming server)."""
try: # Les gestes etaient sur le chat server (5004) qui n'est plus utilise.
import requests # Retourner une liste vide silencieusement.
resp = requests.get( return []
f"{self._chat_base}/api/gestures",
timeout=10,
)
if resp.ok:
data = resp.json()
if isinstance(data, list):
return data
return data.get("gestures", [])
return []
except Exception as e:
logger.error("List gestures erreur : %s", e)
return []
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Replay Polling (port 5005) # Replay Polling (port 5005)
@@ -274,6 +278,7 @@ class LeaServerClient:
resp = req_lib.get( resp = req_lib.get(
f"{self._stream_base}/api/v1/traces/stream/replay/next", f"{self._stream_base}/api/v1/traces/stream/replay/next",
params={"session_id": self._poll_session_id}, params={"session_id": self._poll_session_id},
headers=self._auth_headers(),
timeout=5, timeout=5,
) )
@@ -306,6 +311,7 @@ class LeaServerClient:
import requests import requests
resp = requests.get( resp = requests.get(
f"{self._stream_base}/api/v1/traces/stream/replays", f"{self._stream_base}/api/v1/traces/stream/replays",
headers=self._auth_headers(),
timeout=5, timeout=5,
) )
if resp.ok: if resp.ok:
@@ -340,6 +346,7 @@ class LeaServerClient:
"error": error, "error": error,
"screenshot": screenshot, "screenshot": screenshot,
}, },
headers=self._auth_headers(),
timeout=5, timeout=5,
) )
except Exception as e: except Exception as e:

View File

@@ -78,7 +78,14 @@ API_TOKEN = os.environ.get("RPA_API_TOKEN", secrets.token_hex(32))
# Endpoints publics (pas besoin de token) # Endpoints publics (pas besoin de token)
# En production, /docs et /redoc sont désactivés (voir ci-dessous) # En production, /docs et /redoc sont désactivés (voir ci-dessous)
_PUBLIC_PATHS = {"/health", "/docs", "/openapi.json", "/redoc"} # Paths publics : pas de token requis
# /replay/next est public car l'agent Rust legacy n'envoie pas de token
# et c'est un endpoint read-only (polling, pas d'écriture)
_PUBLIC_PATHS = {
"/health", "/docs", "/openapi.json", "/redoc",
"/api/v1/traces/stream/replay/next",
"/api/v1/traces/stream/image",
}
async def _verify_token(request: Request): async def _verify_token(request: Request):
@@ -3315,6 +3322,286 @@ def _vlm_quick_find(
return None return None
# ---------------------------------------------------------------------------
# Résolution Set-of-Mark : SomEngine (détection) + VLM (identification)
# ---------------------------------------------------------------------------
_som_engine_api = None # Singleton
def _get_som_engine_api():
"""Singleton SomEngine pour la résolution visuelle (lazy-loaded, GPU)."""
global _som_engine_api
if _som_engine_api is None:
try:
from core.detection.som_engine import SomEngine
_som_engine_api = SomEngine(device="cuda")
logger.info("SomEngine API initialisé (lazy singleton)")
except Exception as e:
logger.warning("SomEngine API non disponible : %s", e)
_som_engine_api = False
return _som_engine_api if _som_engine_api is not False else None
def _resolve_by_som(
screenshot_path: str,
target_spec: Dict[str, Any],
screen_width: int,
screen_height: int,
) -> Optional[Dict[str, Any]]:
"""Résoudre une cible UI via Set-of-Mark + VLM.
Pipeline :
1. SomEngine détecte tous les éléments et les numérote sur le screenshot
2. VLM reçoit l'image annotée + description de la cible
3. VLM identifie le numéro du mark → coordonnées précises
Avantages vs VLM direct :
- Le VLM n'a qu'à identifier (son point fort), pas localiser
- Les coordonnées viennent de SomEngine (pixel-perfect)
- Question simple "quel numéro ?" → réponse simple
Args:
screenshot_path: Chemin du screenshot actuel
target_spec: Spécification de la cible (vlm_description, som_element, etc.)
screen_width: Largeur écran en pixels
screen_height: Hauteur écran en pixels
Returns:
Dict avec resolved=True et coordonnées, ou None si indisponible.
"""
engine = _get_som_engine_api()
if engine is None:
return None
client = _get_vlm_client()
if client is None:
return None
t0 = time.time()
# ── 1. Lancer SomEngine sur le screenshot actuel ──
try:
from PIL import Image as PILImage
img = PILImage.open(screenshot_path).convert("RGB")
som_result = engine.analyze(img)
except Exception as e:
logger.warning("SoM resolve : erreur analyse — %s", e)
return None
if not som_result.elements:
logger.info("SoM resolve : 0 éléments détectés")
return None
# ── 2. Construire la description de la cible ──
som_element = target_spec.get("som_element", {})
vlm_description = target_spec.get("vlm_description", "")
anchor_label = som_element.get("label", "")
# Construire un prompt riche
target_parts = []
if anchor_label:
target_parts.append(f"texte '{anchor_label}'")
if vlm_description:
target_parts.append(vlm_description)
if not target_parts:
# Sans description, SoM resolve ne peut pas fonctionner
logger.debug("SoM resolve : pas de description pour identifier l'élément")
return None
target_desc = ", ".join(target_parts)
# ── 2.5. Raccourci : si le label est connu, chercher par texte directement ──
# Pas besoin du VLM si on connaît le texte exact de l'élément !
if anchor_label and len(anchor_label) >= 2:
label_lower = anchor_label.lower()
# Match exact d'abord, puis partiel
exact_matches = [
e for e in som_result.elements
if e.label and e.label.lower() == label_lower
]
if not exact_matches:
exact_matches = [
e for e in som_result.elements
if e.label and (
label_lower in e.label.lower()
or e.label.lower() in label_lower
)
]
if len(exact_matches) == 1:
# Match unique par texte → pas besoin du VLM
elem = exact_matches[0]
elapsed = time.time() - t0
cx_norm, cy_norm = elem.center_norm
logger.info(
"SoM resolve FAST : match texte unique '#%d %s' → (%.4f, %.4f) en %.1fs",
elem.id, elem.label, cx_norm, cy_norm, elapsed,
)
return {
"resolved": True,
"method": "som_text_match",
"x_pct": round(cx_norm, 6),
"y_pct": round(cy_norm, 6),
"matched_element": {
"label": elem.label,
"type": elem.source,
"role": "som_text_match",
"confidence": max(elem.confidence, 0.85),
"som_id": elem.id,
},
"score": max(elem.confidence, 0.85),
}
elif len(exact_matches) > 1:
# Plusieurs matchs texte → disambiguïser par proximité à la position originale
ref_center = som_element.get("center_norm", [])
if ref_center and len(ref_center) == 2:
ref_x, ref_y = ref_center
best = min(
exact_matches,
key=lambda e: (
(e.center_norm[0] - ref_x) ** 2
+ (e.center_norm[1] - ref_y) ** 2
),
)
elapsed = time.time() - t0
cx_norm, cy_norm = best.center_norm
dist = ((cx_norm - ref_x) ** 2 + (cy_norm - ref_y) ** 2) ** 0.5
if dist < 0.15: # Tolérance 15% de l'écran
logger.info(
"SoM resolve FAST : match texte proximité '#%d %s' (dist=%.3f) "
"→ (%.4f, %.4f) en %.1fs",
best.id, best.label, dist, cx_norm, cy_norm, elapsed,
)
return {
"resolved": True,
"method": "som_text_match",
"x_pct": round(cx_norm, 6),
"y_pct": round(cy_norm, 6),
"matched_element": {
"label": best.label,
"type": best.source,
"role": "som_text_match_proximity",
"confidence": max(best.confidence, 0.80),
"som_id": best.id,
},
"score": max(best.confidence, 0.80),
}
logger.info(
"SoM resolve : %d matchs texte pour '%s', VLM nécessaire",
len(exact_matches), anchor_label,
)
# ── 3. Sauvegarder l'image annotée SoM temporairement ──
import tempfile
try:
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
som_result.som_image.save(tmp, format="JPEG", quality=85)
som_img_path = tmp.name
except Exception as e:
logger.warning("SoM resolve : erreur sauvegarde image annotée — %s", e)
return None
# ── 4. VLM : identifier le numéro du mark ──
# Lister uniquement les éléments avec un label (plus concis pour le VLM)
labeled_elements = [e for e in som_result.elements if e.label][:30]
elements_list = "\n".join(
f" #{e.id}: '{e.label}'"
for e in labeled_elements
)
prompt = (
f"I'm looking for: {target_desc}\n\n"
f"Here are the numbered elements detected on screen:\n{elements_list}\n\n"
"Which number is the correct element?\n"
'Answer with JSON only: {"mark_id": N, "confidence": 0.9}'
)
system_prompt = "You identify UI elements by number. Output JSON only, no explanation."
try:
result = client.generate(
prompt=prompt,
image_path=som_img_path,
system_prompt=system_prompt,
temperature=0.1,
max_tokens=50,
force_json=False,
)
except Exception as e:
logger.warning("SoM resolve : erreur VLM — %s", e)
return None
finally:
import os
try:
os.unlink(som_img_path)
except OSError:
pass
elapsed = time.time() - t0
if not result.get("success"):
logger.info("SoM resolve : VLM échoué (%.1fs)", elapsed)
return None
# ── 5. Parser la réponse et retourner les coordonnées ──
response_text = result.get("response", "").strip()
# Tenter d'abord l'extraction JSON standard
parsed = client._extract_json_from_response(response_text)
# Fallback : extraire un nombre simple de la réponse
if parsed is None:
import re
numbers = re.findall(r'\b(\d+)\b', response_text)
if numbers:
candidate = int(numbers[0])
if som_result.get_element_by_id(candidate) is not None:
parsed = {"mark_id": candidate, "confidence": 0.7}
logger.debug("SoM resolve : extraction numéro fallback → #%d", candidate)
if parsed is None:
logger.info("SoM resolve : réponse non-JSON (%.1fs) — %.80s", elapsed, response_text)
return None
mark_id = parsed.get("mark_id")
confidence = float(parsed.get("confidence", 0.0))
if mark_id is None or confidence < 0.3:
logger.info(
"SoM resolve : mark non trouvé ou confiance trop basse (mark=%s, conf=%.2f, %.1fs)",
mark_id, confidence, elapsed,
)
return None
mark_id = int(mark_id)
elem = som_result.get_element_by_id(mark_id)
if elem is None:
logger.warning("SoM resolve : mark #%d inexistant (%.1fs)", mark_id, elapsed)
return None
cx_norm, cy_norm = elem.center_norm
logger.info(
"SoM resolve OK : mark #%d '%s' → (%.4f, %.4f) conf=%.2f en %.1fs (%d éléments)",
mark_id, elem.label, cx_norm, cy_norm, confidence, elapsed, len(som_result.elements),
)
return {
"resolved": True,
"method": "som_vlm",
"x_pct": round(cx_norm, 6),
"y_pct": round(cy_norm, 6),
"matched_element": {
"label": elem.label or f"mark #{mark_id}",
"type": elem.source,
"role": "som_identified",
"confidence": confidence,
"som_id": mark_id,
},
"score": confidence,
}
def _resolve_target_sync( def _resolve_target_sync(
screenshot_path: str, screenshot_path: str,
target_spec: Dict[str, Any], target_spec: Dict[str, Any],
@@ -3329,6 +3616,7 @@ def _resolve_target_sync(
Hiérarchie de résolution (strict_mode=True, replay sessions) — VLM-FIRST : Hiérarchie de résolution (strict_mode=True, replay sessions) — VLM-FIRST :
1. VLM Quick Find (~3-8s) — compréhension sémantique de l'écran, multi-image 1. VLM Quick Find (~3-8s) — compréhension sémantique de l'écran, multi-image
(screenshot + crop de référence + description riche) (screenshot + crop de référence + description riche)
1.5. SoM + VLM (~5-15s) — SomEngine numérote les éléments, VLM identifie le bon
2. Template matching OpenCV (~100ms) — fallback pixel, seuil STRICT 0.90 2. Template matching OpenCV (~100ms) — fallback pixel, seuil STRICT 0.90
3. resolved=False → STOP le replay 3. resolved=False → STOP le replay
@@ -3387,6 +3675,30 @@ def _resolve_target_sync(
vlm_description[:60] if vlm_description else "(anchor)", vlm_description[:60] if vlm_description else "(anchor)",
) )
# ---------------------------------------------------------------
# Étape 1.5 : SoM + VLM (Set-of-Mark + identification)
# SomEngine numérote les éléments, VLM identifie le bon numéro.
# Plus fiable que le VLM direct car le VLM n'a qu'à identifier,
# pas localiser — et les coordonnées sont pixel-perfect.
# ---------------------------------------------------------------
som_element = target_spec.get("som_element", {})
if som_element or vlm_description:
som_result = _resolve_by_som(
screenshot_path=screenshot_path,
target_spec=target_spec,
screen_width=screen_width,
screen_height=screen_height,
)
if som_result and som_result.get("resolved"):
logger.info(
"Strict resolve SoM+VLM : OK (score=%.2f, mark=#%s)",
som_result.get("score", 0),
som_result.get("matched_element", {}).get("som_id", "?"),
)
return som_result
else:
logger.info("Strict resolve SoM+VLM : échoué, passage template matching")
# --------------------------------------------------------------- # ---------------------------------------------------------------
# Étape 2 : Template matching (fallback pixel) — seuil STRICT 0.90 # Étape 2 : Template matching (fallback pixel) — seuil STRICT 0.90
# --------------------------------------------------------------- # ---------------------------------------------------------------

View File

@@ -31,12 +31,19 @@ logger = logging.getLogger(__name__)
_MODIFIER_ONLY_KEYS = { _MODIFIER_ONLY_KEYS = {
"ctrl", "ctrl_l", "ctrl_r", "control", "control_l", "control_r", "ctrl", "ctrl_l", "ctrl_r", "control", "control_l", "control_r",
"alt", "alt_l", "alt_r", "alt", "alt_l", "alt_r", "alt_gr",
"shift", "shift_l", "shift_r", "shift", "shift_l", "shift_r",
"win", "win_l", "win_r", "cmd", "cmd_l", "cmd_r", "win", "win_l", "win_r", "cmd", "cmd_l", "cmd_r",
"meta", "meta_l", "meta_r", "super", "super_l", "super_r", "meta", "meta_l", "meta_r", "super", "super_l", "super_r",
} }
# Mapping numpad vk codes → caractères (layout-indépendant)
_NUMPAD_VK_MAP = {
96: '0', 97: '1', 98: '2', 99: '3', 100: '4',
101: '5', 102: '6', 103: '7', 104: '8', 105: '9',
106: '*', 107: '+', 109: '-', 110: '.', 111: '/',
}
# Table de conversion des caractères de contrôle vers les touches lisibles # Table de conversion des caractères de contrôle vers les touches lisibles
# (produits par certains agents qui capturent les raw keycodes) # (produits par certains agents qui capturent les raw keycodes)
_CONTROL_CHAR_MAP = { _CONTROL_CHAR_MAP = {
@@ -98,6 +105,72 @@ def _is_parasitic_event(event_data: Dict[str, Any]) -> bool:
return False return False
def _reconstruct_text_from_raw_keys(raw_keys: list) -> str:
"""Reconstruire le texte correct à partir des vk codes des raw_keys.
Corrige les problèmes de capture AZERTY, notamment :
- Numpad / (vk=111) capturé comme char='!' → corrigé en '/'
- Numpad 0-9 (vk=96-105) capturés comme char=None → corrigés en '0'-'9'
"""
text_parts = []
for event in raw_keys:
if event.get("action") != "press":
continue
vk = event.get("vk", 0)
char = event.get("char")
kind = event.get("kind", "")
name = event.get("name", "")
# Ignorer les modificateurs (releases qui traînent dans le buffer)
if kind == "key" and name in _MODIFIER_ONLY_KEYS:
continue
# Numpad : mapping fixe (layout-indépendant)
if vk in _NUMPAD_VK_MAP:
text_parts.append(_NUMPAD_VK_MAP[vk])
# Touche normale avec caractère valide
elif char and len(char) == 1 and char.isprintable():
text_parts.append(char)
return "".join(text_parts)
def _key_combo_printable_char(keys: list) -> Optional[str]:
"""Si le key_combo produit un seul caractère imprimable, le retourner.
Exemples :
- ['ctrl', '@'] → '@' (AltGr+0 sur AZERTY, capturé comme ctrl+@)
- ['shift', 'A'] → 'A'
- ['ctrl', 'c'] → None (c'est un raccourci, pas un caractère)
- ['enter'] → None (pas un caractère imprimable)
"""
if not keys:
return None
non_modifiers = [k for k in keys if k.lower() not in _MODIFIER_ONLY_KEYS]
if len(non_modifiers) != 1:
return None
char = non_modifiers[0]
# Un seul caractère imprimable (pas un nom de touche spéciale)
if len(char) == 1 and char.isprintable():
# Vérifier que c'est pas un raccourci courant (ctrl+c, ctrl+v, etc.)
modifiers = {k.lower() for k in keys if k.lower() in _MODIFIER_ONLY_KEYS}
if modifiers <= {"shift", "shift_l", "shift_r"}:
# Shift + char = caractère majuscule/spécial → OK
return char
if "alt_gr" in modifiers or (
"ctrl" in modifiers and ("alt" in modifiers or "alt_r" in modifiers)
):
# AltGr + char = caractère spécial (@ # € etc.) → OK
return char
# Ctrl + caractère NON-alphabétique = probablement AltGr résiduel
# Sur AZERTY, AltGr+0 produit @, capturé comme ['ctrl', 'alt_gr'] + ['ctrl', '@']
# Le premier combo est filtré (modifier-only), le second a juste 'ctrl' + '@'
if "ctrl" in modifiers and not char.isalpha():
return char
# ctrl + lettre seul = raccourci (Ctrl+S, Ctrl+C) → pas un caractère
return None
return None
def _merge_consecutive_text_inputs(steps: list) -> list: def _merge_consecutive_text_inputs(steps: list) -> list:
"""Fusionne les text_input consécutifs en un seul.""" """Fusionne les text_input consécutifs en un seul."""
merged = [] merged = []
@@ -354,6 +427,111 @@ def _needs_post_wait(action: dict) -> int:
return 0 return 0
# ---------------------------------------------------------------------------
# SomEngine — enrichissement Set-of-Mark des clics pendant le build_replay
# ---------------------------------------------------------------------------
_som_engine = None # Singleton, chargé à la demande
def _get_som_engine():
"""Singleton SomEngine (lazy-loaded, GPU)."""
global _som_engine
if _som_engine is None:
try:
from core.detection.som_engine import SomEngine
_som_engine = SomEngine(device="cuda")
logger.info("SomEngine initialisé (lazy singleton)")
except Exception as e:
logger.warning("SomEngine non disponible : %s", e)
_som_engine = False # Marqueur "indisponible"
return _som_engine if _som_engine is not False else None
def _som_identify_clicked_element(
event_data: dict,
session_dir: Optional[Path],
screen_w: int,
screen_h: int,
) -> Optional[dict]:
"""Identifier l'élément UI cliqué via SomEngine (YOLO + docTR).
Charge le full screenshot de l'événement, lance SomEngine pour détecter
tous les éléments, puis identifie celui qui se trouve sous le clic.
Returns:
Dict avec id, label, source, bbox_norm, center_norm, confidence
ou None si SomEngine indisponible ou élément non trouvé.
"""
engine = _get_som_engine()
if engine is None:
return None
if not session_dir:
return None
shots_dir = session_dir / "shots"
if not shots_dir.is_dir():
return None
# Trouver le full screenshot
screenshot_id = event_data.get("screenshot_id", "")
if not screenshot_id:
return None
full_path = shots_dir / f"{screenshot_id}_full.png"
if not full_path.is_file():
# Fallback : essayer sans le suffixe _full
full_path = shots_dir / f"{screenshot_id}.png"
if not full_path.is_file():
return None
try:
from PIL import Image
img = Image.open(full_path).convert("RGB")
except Exception as e:
logger.debug("SoM: impossible de charger %s : %s", full_path, e)
return None
# Lancer SomEngine
try:
result = engine.analyze(img)
except Exception as e:
logger.warning("SoM: erreur d'analyse : %s", e)
return None
if not result.elements:
return None
# Trouver l'élément cliqué
pos = event_data.get("pos", [])
if not pos or len(pos) < 2:
return None
click_x, click_y = int(pos[0]), int(pos[1])
elem = result.find_element_at(click_x, click_y, margin=30)
if elem is None:
logger.debug(
"SoM: aucun élément trouvé au clic (%d, %d) parmi %d éléments",
click_x, click_y, len(result.elements),
)
return None
logger.info(
"SoM: clic (%d,%d) → élément #%d '%s' (source=%s, conf=%.2f)",
click_x, click_y, elem.id, elem.label, elem.source, elem.confidence,
)
return {
"id": elem.id,
"label": elem.label,
"source": elem.source,
"bbox_norm": list(elem.bbox_norm),
"center_norm": list(elem.center_norm),
"confidence": elem.confidence,
"element_count": len(result.elements),
}
def _load_crop_for_event( def _load_crop_for_event(
event_data: dict, event_data: dict,
session_dir: Optional[Path], session_dir: Optional[Path],
@@ -481,6 +659,76 @@ def _load_crop_for_event(
return None return None
def _attach_expected_screenshots(
actions: list, raw_events: list, session_dir: Path,
) -> None:
"""Attacher les screenshots de référence (résultat attendu) aux actions.
Pour chaque action de type click ou key_combo, cherche le screenshot
res_shot_XXXX.png (capturé 1s après l'action pendant l'enregistrement)
et l'attache comme expected_screenshot_b64.
Le screenshot est compressé en JPEG qualité 40 (~30-50 KB en b64)
pour limiter le poids de chaque action.
"""
import base64
from PIL import Image as _Image
shots_dir = session_dir / "shots"
if not shots_dir.is_dir():
return
# Mapper les screenshot_id des événements originaux aux actions
# Les événements click/key_combo ont un "screenshot_id" (ex: "shot_0003")
# Le screenshot résultat est "res_shot_0003.png"
action_idx = 0
for raw_evt in raw_events:
event_data = raw_evt.get("event", raw_evt)
screenshot_id = event_data.get("screenshot_id", "")
if not screenshot_id:
continue
evt_type = event_data.get("type", "")
if evt_type not in ("mouse_click", "key_combo", "key_press"):
continue
# Trouver l'action correspondante (même type, index croissant)
while action_idx < len(actions):
a = actions[action_idx]
a_type = a.get("type", "")
if a_type in ("click", "key_combo"):
break
action_idx += 1
else:
break # Plus d'actions
# Charger le screenshot résultat
res_file = shots_dir / f"res_{screenshot_id}.png"
if not res_file.is_file():
action_idx += 1
continue
try:
img = _Image.open(res_file)
# Redimensionner pour réduire le poids (800px de large)
if img.width > 800:
ratio = 800 / img.width
img = img.resize((800, int(img.height * ratio)), _Image.LANCZOS)
import io
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=40)
b64 = base64.b64encode(buf.getvalue()).decode()
actions[action_idx]["expected_screenshot_b64"] = b64
logger.debug(
"Screenshot de référence attaché à action %d : %s (%d KB)",
action_idx, res_file.name, len(b64) // 1024,
)
except Exception as e:
logger.debug("Erreur chargement screenshot ref %s : %s", res_file, e)
action_idx += 1
def build_replay_from_raw_events( def build_replay_from_raw_events(
events: list, events: list,
session_id: str = "", session_id: str = "",
@@ -577,12 +825,26 @@ def build_replay_from_raw_events(
# Tous les text_input consécutifs sont fusionnés en un seul, indépendamment # Tous les text_input consécutifs sont fusionnés en un seul, indépendamment
# du gap temporel. L'utilisateur tape lettre par lettre mais on veut un # du gap temporel. L'utilisateur tape lettre par lettre mais on veut un
# seul "type" avec tout le texte dans le replay. # seul "type" avec tout le texte dans le replay.
# Les key_combos qui produisent un caractère imprimable (ex: AltGr+0 → @)
# sont convertis en text_input pour être fusionnés avec le texte adjacent.
# Seul un changement de fenêtre (window_title différent) coupe la fusion. # Seul un changement de fenêtre (window_title différent) coupe la fusion.
merged_events = [] merged_events = []
for evt in actionable_events: for evt in actionable_events:
evt_type = evt.get("type", "") evt_type = evt.get("type", "")
evt_ts = float(evt.get("timestamp", 0)) evt_ts = float(evt.get("timestamp", 0))
# Convertir les key_combos qui produisent un caractère imprimable
# en text_input pour qu'ils soient fusionnés avec le texte adjacent.
# Ex: AltGr+0 capturé comme ['ctrl', '@'] → text_input '@'
if evt_type in ("key_combo", "key_press"):
keys = _sanitize_keys(evt.get("keys", []))
printable = _key_combo_printable_char(keys)
if printable:
# Transformer en text_input pour fusion
evt = dict(evt, type="text_input", text=printable)
evt_type = "text_input"
# Pas de raw_keys pour ce caractère (sera collé via clipboard)
if evt_type == "text_input": if evt_type == "text_input":
text = evt.get("text", "") text = evt.get("text", "")
if not text: if not text:
@@ -624,6 +886,34 @@ def build_replay_from_raw_events(
else: else:
merged_events.append(dict(evt)) merged_events.append(dict(evt))
# ── 3b. Reconstruire le texte correct depuis les raw_keys ──
# Les raw_keys contiennent les vk codes exacts (layout-indépendant)
# qui permettent de corriger les erreurs de capture AZERTY
# (ex: numpad / capturé comme '!' → corrigé en '/')
# ATTENTION : ne reconstruire QUE si le texte reconstruit a la même
# longueur que le texte original. Si des caractères viennent de
# key_combos convertis (ex: @ de AltGr), ils n'ont pas de raw_keys
# et la reconstruction les perdrait.
for evt in merged_events:
if evt.get("type") == "text_input" and evt.get("raw_keys"):
reconstructed = _reconstruct_text_from_raw_keys(evt["raw_keys"])
original = evt.get("text", "")
if reconstructed and len(reconstructed) == len(original):
# Même longueur → remplacement sûr (corrige les chars numpad)
evt["text"] = reconstructed
if reconstructed != original:
logger.debug(
"Texte reconstruit depuis raw_keys : '%s''%s'",
original[:50], reconstructed[:50],
)
elif reconstructed and len(reconstructed) < len(original):
# Longueur différente → des chars viennent de key_combos convertis
# Garder le texte original (qui inclut les chars fusionnés)
logger.debug(
"Texte non reconstruit (longueur diff) : '%s' (%d) vs '%s' (%d)",
original[:50], len(original), reconstructed[:50], len(reconstructed),
)
# ── 4. Convertir en actions replay normalisées ── # ── 4. Convertir en actions replay normalisées ──
actions = [] actions = []
last_ts = 0.0 last_ts = 0.0
@@ -729,9 +1019,22 @@ def build_replay_from_raw_events(
"y_relative": y_relative, "y_relative": y_relative,
}, },
} }
# Propager les infos textuelles pour compatibilité # NE PAS mettre window_title comme by_text !
if window_title: # by_text doit être le texte de l'ÉLÉMENT cliqué, pas le titre de la fenêtre.
action["target_spec"]["by_text"] = window_title # Sinon le template matching texte cherche "13071967.txt Bloc-notes"
# sur l'écran et clique sur la barre de titre au lieu du bon élément.
# ── SomEngine : identifier l'élément cliqué ──
som_elem = _som_identify_clicked_element(
evt, session_dir_path, screen_w, screen_h,
)
if som_elem:
action["target_spec"]["som_element"] = som_elem
# Enrichir la description VLM avec le label SoM
if som_elem.get("label") and not vision_info.get("text"):
action["target_spec"]["vlm_description"] += (
f", le texte de l'élément est '{som_elem['label']}'"
)
elif evt_type == "text_input": elif evt_type == "text_input":
text = evt.get("text", "") text = evt.get("text", "")
@@ -801,16 +1104,23 @@ def build_replay_from_raw_events(
continue continue
result.append(a) result.append(a)
# ── 8. Attacher les screenshots de référence (état attendu après action) ──
# Les screenshots res_shot_XXXX.png capturés 1s après chaque action pendant
# l'enregistrement servent de référence pour le contrôle visuel.
if session_dir_path:
_attach_expected_screenshots(result, events, session_dir_path)
# Stats visual replay # Stats visual replay
visual_clicks = sum( visual_clicks = sum(
1 for a in result 1 for a in result
if a.get("type") == "click" and a.get("visual_mode") if a.get("type") == "click" and a.get("visual_mode")
) )
total_clicks = sum(1 for a in result if a.get("type") == "click") total_clicks = sum(1 for a in result if a.get("type") == "click")
verified_count = sum(1 for a in result if a.get("expected_screenshot_b64"))
logger.info( logger.info(
"build_replay_from_raw_events(%s) : %d actions propres produites " "build_replay_from_raw_events(%s) : %d actions propres produites "
"(%d/%d clics avec visual_mode)", "(%d/%d clics avec visual_mode, %d avec screenshot de référence)",
session_id, len(result), visual_clicks, total_clicks, session_id, len(result), visual_clicks, total_clicks, verified_count,
) )
return result return result

View File

@@ -0,0 +1,290 @@
"""
Set-of-Mark Engine — Détection et numérotation des éléments UI.
Pipeline : YOLO (détection icônes) + docTR (OCR) + numérotation visuelle.
Le VLM (qwen3-vl) est utilisé ensuite pour l'identification sémantique.
Usage :
from core.detection.som_engine import SomEngine
engine = SomEngine()
result = engine.analyze(screenshot_pil)
# result.elements : liste d'éléments avec coordonnées
# result.som_image : screenshot avec numéros
# result.som_image_b64 : en base64
Architecture :
- YOLO v8 (icon_detect) : détecte les éléments interactifs (~15ms GPU)
- docTR : OCR pour lire le texte visible (~100ms GPU)
- Annotation : numérote chaque élément sur le screenshot
- Le VLM n'est PAS appelé ici (séparation détection/identification)
"""
from __future__ import annotations
import base64
import io
import logging
from dataclasses import dataclass, field
from pathlib import Path
from typing import List, Optional, Tuple
from PIL import Image, ImageDraw, ImageFont
logger = logging.getLogger(__name__)
# Chemin vers les poids YOLO d'OmniParser
_YOLO_WEIGHTS = Path("/home/dom/ai/OmniParser/weights/icon_detect/model.pt")
@dataclass
class SomElement:
"""Un élément UI détecté et numéroté."""
id: int # Numéro Set-of-Mark (0, 1, 2, ...)
bbox: Tuple[int, int, int, int] # Pixels (x1, y1, x2, y2)
bbox_norm: Tuple[float, float, float, float] # Normalisé 0-1
center: Tuple[int, int] # Centre en pixels
center_norm: Tuple[float, float] # Centre normalisé 0-1
source: str # 'yolo' ou 'ocr'
label: str = "" # Texte OCR ou description
confidence: float = 0.0
@dataclass
class SomResult:
"""Résultat de l'analyse Set-of-Mark."""
elements: List[SomElement] = field(default_factory=list)
som_image: Optional[Image.Image] = None # Screenshot annoté
som_image_b64: str = "" # En base64 (JPEG)
width: int = 0
height: int = 0
analysis_time_ms: float = 0.0
def find_element_at(self, x: int, y: int, margin: int = 20) -> Optional[SomElement]:
"""Trouver l'élément le plus proche d'un point (x, y)."""
best = None
best_dist = float("inf")
for elem in self.elements:
x1, y1, x2, y2 = elem.bbox
# Point dans la bbox ?
if x1 - margin <= x <= x2 + margin and y1 - margin <= y <= y2 + margin:
cx, cy = elem.center
dist = ((x - cx) ** 2 + (y - cy) ** 2) ** 0.5
if dist < best_dist:
best_dist = dist
best = elem
return best
def get_element_by_id(self, element_id: int) -> Optional[SomElement]:
"""Récupérer un élément par son numéro SoM."""
for elem in self.elements:
if elem.id == element_id:
return elem
return None
class SomEngine:
"""Moteur Set-of-Mark : YOLO + docTR + annotation."""
def __init__(self, device: str = "cuda"):
self._device = device
self._yolo = None
self._ocr = None
self._loaded = False
def _ensure_loaded(self):
"""Chargement paresseux des modèles."""
if self._loaded:
return
import time
t = time.time()
# YOLO
if _YOLO_WEIGHTS.is_file():
from ultralytics import YOLO
self._yolo = YOLO(str(_YOLO_WEIGHTS))
self._yolo.to(self._device)
logger.info("SoM: YOLO chargé sur %s", self._device)
else:
logger.warning("SoM: YOLO weights introuvable: %s", _YOLO_WEIGHTS)
# docTR
try:
from doctr.models import ocr_predictor
self._ocr = ocr_predictor(
det_arch="db_resnet50",
reco_arch="crnn_vgg16_bn",
pretrained=True,
)
if self._device == "cuda":
self._ocr = self._ocr.cuda()
logger.info("SoM: docTR chargé")
except Exception as e:
logger.warning("SoM: docTR non disponible: %s", e)
self._loaded = True
logger.info("SoM: modèles chargés en %.1fs", time.time() - t)
def analyze(self, screenshot: Image.Image) -> SomResult:
"""Analyser un screenshot : détecter tous les éléments et les numéroter.
Returns:
SomResult avec les éléments détectés et le screenshot annoté.
"""
import time
t_start = time.time()
self._ensure_loaded()
W, H = screenshot.size
elements: List[SomElement] = []
elem_id = 0
# ── 1. YOLO : détecter les éléments interactifs ──
if self._yolo is not None:
results = self._yolo.predict(
source=screenshot, conf=0.15, iou=0.5, verbose=False,
)
boxes = results[0].boxes if results else []
for box in boxes:
x1, y1, x2, y2 = [int(v) for v in box.xyxy[0].tolist()]
conf = float(box.conf[0])
cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
elements.append(SomElement(
id=elem_id,
bbox=(x1, y1, x2, y2),
bbox_norm=(x1 / W, y1 / H, x2 / W, y2 / H),
center=(cx, cy),
center_norm=(cx / W, cy / H),
source="yolo",
confidence=conf,
))
elem_id += 1
# ── 2. docTR : OCR pour lire le texte ──
if self._ocr is not None:
try:
import numpy as np
from doctr.io import DocumentFile
# Convertir PIL → fichier temporaire pour docTR
import tempfile
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
screenshot.save(tmp, format="JPEG", quality=90)
tmp_path = tmp.name
doc = DocumentFile.from_images([tmp_path])
import os
os.unlink(tmp_path)
result_ocr = self._ocr(doc)
for page in result_ocr.pages:
for block in page.blocks:
for line in block.lines:
for word in line.words:
text = word.value.strip()
if not text or len(text) < 2:
continue
# docTR retourne des coords normalisées (0-1)
(nx1, ny1), (nx2, ny2) = word.geometry
x1 = int(nx1 * W)
y1 = int(ny1 * H)
x2 = int(nx2 * W)
y2 = int(ny2 * H)
cx, cy = (x1 + x2) // 2, (y1 + y2) // 2
# Vérifier si ce texte chevauche un élément YOLO existant
overlaps = False
for existing in elements:
ex1, ey1, ex2, ey2 = existing.bbox
# IoU simple
ix1 = max(x1, ex1)
iy1 = max(y1, ey1)
ix2 = min(x2, ex2)
iy2 = min(y2, ey2)
if ix1 < ix2 and iy1 < iy2:
# Chevauchement → enrichir le label YOLO
if not existing.label:
existing.label = text
overlaps = True
break
if not overlaps:
elements.append(SomElement(
id=elem_id,
bbox=(x1, y1, x2, y2),
bbox_norm=(nx1, ny1, nx2, ny2),
center=(cx, cy),
center_norm=(cx / W, cy / H),
source="ocr",
label=text,
confidence=word.confidence,
))
elem_id += 1
except Exception as e:
logger.warning("SoM: Erreur OCR docTR: %s", e)
# ── 3. Annoter le screenshot avec les numéros ──
som_image = self._annotate(screenshot.copy(), elements)
som_b64 = self._image_to_b64(som_image)
elapsed_ms = (time.time() - t_start) * 1000
logger.info(
"SoM: %d éléments (%d yolo, %d ocr) en %.0fms",
len(elements),
sum(1 for e in elements if e.source == "yolo"),
sum(1 for e in elements if e.source == "ocr"),
elapsed_ms,
)
return SomResult(
elements=elements,
som_image=som_image,
som_image_b64=som_b64,
width=W,
height=H,
analysis_time_ms=elapsed_ms,
)
@staticmethod
def _annotate(image: Image.Image, elements: List[SomElement]) -> Image.Image:
"""Dessiner les numéros SoM sur le screenshot."""
draw = ImageDraw.Draw(image)
# Police
try:
font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 16)
font_small = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", 12)
except Exception:
font = ImageFont.load_default()
font_small = font
for elem in elements:
x1, y1, x2, y2 = elem.bbox
# Couleur selon la source
color = (255, 50, 50) if elem.source == "yolo" else (50, 50, 255)
# Boîte
draw.rectangle([x1, y1, x2, y2], outline=color, width=2)
# Numéro (badge rouge)
label = str(elem.id)
badge_w, badge_h = 24, 18
badge_x = max(0, x1 - 2)
badge_y = max(0, y1 - badge_h - 2)
draw.rectangle(
[badge_x, badge_y, badge_x + badge_w, badge_y + badge_h],
fill=(220, 30, 30),
)
draw.text(
(badge_x + 3, badge_y + 1), label,
fill="white", font=font_small,
)
return image
@staticmethod
def _image_to_b64(image: Image.Image, quality: int = 70) -> str:
"""Convertir une image PIL en base64 JPEG."""
buf = io.BytesIO()
image.save(buf, format="JPEG", quality=quality)
return base64.b64encode(buf.getvalue()).decode()

View File

@@ -3,7 +3,7 @@
============================================================ ============================================================
Bienvenue ! Lea est une assistante qui apprend vos taches Bienvenue ! Lea est une assistante qui apprend vos taches
repetitives sur l'ordinateur et peut les refaire a votre place. repetitives sur l'ordinateur pour pouvoir vous aider.
PREMIERE INSTALLATION PREMIERE INSTALLATION
@@ -29,13 +29,15 @@ des taches (petite icone ronde, a cote de l'horloge).
Clic droit sur l'icone pour ouvrir le menu : Clic droit sur l'icone pour ouvrir le menu :
- "Apprenez-moi une tache" : Lea observe ce que vous faites - "Apprenez-moi une tache" : Lea observe ce que vous faites
et memorise les etapes. et memorise les etapes. Travaillez normalement, Lea
apprend en vous regardant.
- "Mes taches" : Liste des taches que Lea a apprises. - "C'est termine" : Arrete l'enregistrement quand vous
Cliquez sur une tache pour que Lea la refasse. avez fini la tache. Si vous oubliez, Lea s'arrete
automatiquement apres 1 heure.
- "Discuter avec Lea" : Ouvre une fenetre de discussion - "Discuter avec Lea" : Ouvre une fenetre de discussion
pour poser des questions ou donner des instructions. pour poser des questions.
- "ARRET D'URGENCE" : Arrete immediatement tout ce que - "ARRET D'URGENCE" : Arrete immediatement tout ce que
Lea est en train de faire. Lea est en train de faire.
@@ -43,6 +45,22 @@ Clic droit sur l'icone pour ouvrir le menu :
- "Quitter Lea" : Ferme le programme. - "Quitter Lea" : Ferme le programme.
INFORMATIONS IMPORTANTES
------------------------
Quand Lea enregistre vos actions, elle capture votre ecran,
vos clics et vos frappes clavier.
- Lea vous previent AVANT chaque enregistrement
- Les donnees sensibles (mots de passe, informations
medicales) sont automatiquement floutees
- L'enregistrement s'arrete automatiquement apres 1 heure
- Vous pouvez arreter a tout moment via le menu
Lea est un systeme base sur l'intelligence artificielle
(Article 50, Reglement europeen sur l'IA).
CONFIGURATION CONFIGURATION
------------- -------------
@@ -56,30 +74,15 @@ EN CAS DE PROBLEME
------------------- -------------------
- "Python n'est pas installe" : Demandez a votre - "Python n'est pas installe" : Demandez a votre
service informatique d'installer Python 3.10 service informatique d'installer Python 3.10+
depuis https://python.org depuis https://python.org
- Lea ne demarre pas : Relancez "install.bat" puis - Lea ne demarre pas : Relancez "install.bat" puis
relancez "Lea.bat" relancez "Lea.bat"
- Lea est deconnectee : Verifiez votre connexion - Lea est deconnectee : Verifiez votre connexion
internet/reseau. Le serveur est peut-etre en reseau. Le serveur est peut-etre en maintenance.
maintenance.
- En cas de doute, contactez votre administrateur. - En cas de doute, contactez votre administrateur.
INFORMATIONS
------------
Lea est un systeme base sur l'intelligence artificielle.
Quand Lea enregistre vos actions, elle capture votre ecran,
vos clics et vos frappes clavier. Les donnees sensibles
(mots de passe, informations medicales) sont automatiquement
floutees avant envoi.
Vous pouvez arreter l'enregistrement ou le replay a tout
moment via le menu ou le bouton "ARRET D'URGENCE".
============================================================ ============================================================

View File

@@ -7,6 +7,14 @@ title Lea - Assistante IA
:: --------------------------------------------------------------- :: ---------------------------------------------------------------
cd /d "%~dp0" cd /d "%~dp0"
:: ---------------------------------------------------------------
:: Fermer les anciennes instances de Lea
:: ---------------------------------------------------------------
taskkill /F /IM pythonw.exe >nul 2>&1
taskkill /F /IM python.exe >nul 2>&1
taskkill /F /IM rpa-agent.exe >nul 2>&1
timeout /t 2 >nul
:: --------------------------------------------------------------- :: ---------------------------------------------------------------
:: Verifier que l'installation a ete faite :: Verifier que l'installation a ete faite
:: --------------------------------------------------------------- :: ---------------------------------------------------------------

26
tests/unit/conftest.py Normal file
View File

@@ -0,0 +1,26 @@
"""Conftest pour les tests unitaires.
Force le bon chemin agent_v0 (rpa_vision_v3) pour éviter les conflits
avec ~/ai/agent_v0 (standalone).
"""
import sys
from pathlib import Path
ROOT = str(Path(__file__).resolve().parents[2])
if ROOT in sys.path:
sys.path.remove(ROOT)
sys.path.insert(0, ROOT)
# Si agent_v0 est déjà chargé depuis le mauvais chemin, le remplacer
_agent_mod = sys.modules.get("agent_v0")
if _agent_mod and not getattr(_agent_mod, "__file__", "").startswith(ROOT):
to_remove = [k for k in sys.modules if k == "agent_v0" or k.startswith("agent_v0.")]
for k in to_remove:
del sys.modules[k]
# Pré-importer le bon agent_v0.server_v1
try:
import agent_v0.server_v1 # noqa: F401
except ImportError:
pass

View File

@@ -0,0 +1,301 @@
"""Tests unitaires pour l'intégration SomEngine dans build_replay et resolve_target.
Vérifie :
- Phase 1 : _som_identify_clicked_element enrichit target_spec avec som_element
- Phase 2 : _resolve_by_som utilise SomEngine + VLM pour résoudre une cible
- Fallbacks gracieux quand SomEngine ou VLM indisponible
"""
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
# ── Phase 1 : Enrichissement build_replay ──
class TestSomIdentifyClickedElement:
"""Tests pour _som_identify_clicked_element (Phase 1)."""
def test_returns_none_when_engine_unavailable(self):
"""Si SomEngine n'est pas disponible, retourne None sans erreur."""
from agent_v0.server_v1.stream_processor import _som_identify_clicked_element
with patch(
"agent_v0.server_v1.stream_processor._get_som_engine",
return_value=None,
):
result = _som_identify_clicked_element(
{"screenshot_id": "shot_0001", "pos": [500, 300]},
Path("/fake/dir"),
1920, 1080,
)
assert result is None
def test_returns_none_when_no_session_dir(self):
"""Sans session_dir, retourne None."""
from agent_v0.server_v1.stream_processor import _som_identify_clicked_element
result = _som_identify_clicked_element(
{"screenshot_id": "shot_0001", "pos": [500, 300]},
None, 1920, 1080,
)
assert result is None
def test_returns_none_when_no_screenshot_id(self):
"""Sans screenshot_id, retourne None."""
from agent_v0.server_v1.stream_processor import _som_identify_clicked_element
result = _som_identify_clicked_element(
{"pos": [500, 300]},
Path("/fake/dir"),
1920, 1080,
)
assert result is None
def test_returns_element_when_found(self, tmp_path):
"""Quand SomEngine trouve un élément sous le clic, retourne ses infos."""
from core.detection.som_engine import SomElement, SomResult
from agent_v0.server_v1.stream_processor import _som_identify_clicked_element
# Créer un faux screenshot
shots_dir = tmp_path / "shots"
shots_dir.mkdir()
from PIL import Image
img = Image.new("RGB", (1920, 1080), color="white")
img.save(shots_dir / "shot_0001_full.png")
# Mock SomEngine
mock_elem = SomElement(
id=5,
bbox=(480, 280, 520, 320),
bbox_norm=(0.25, 0.259, 0.271, 0.296),
center=(500, 300),
center_norm=(0.2604, 0.2778),
source="yolo",
label="Enregistrer",
confidence=0.92,
)
mock_result = SomResult(
elements=[mock_elem],
width=1920,
height=1080,
)
mock_engine = MagicMock()
mock_engine.analyze.return_value = mock_result
with patch(
"agent_v0.server_v1.stream_processor._get_som_engine",
return_value=mock_engine,
):
result = _som_identify_clicked_element(
{"screenshot_id": "shot_0001", "pos": [500, 300]},
tmp_path, 1920, 1080,
)
assert result is not None
assert result["id"] == 5
assert result["label"] == "Enregistrer"
assert result["source"] == "yolo"
assert result["confidence"] == 0.92
assert result["element_count"] == 1
def test_returns_none_when_no_element_at_click(self, tmp_path):
"""Quand aucun élément n'est sous le clic, retourne None."""
from core.detection.som_engine import SomResult
from agent_v0.server_v1.stream_processor import _som_identify_clicked_element
shots_dir = tmp_path / "shots"
shots_dir.mkdir()
from PIL import Image
img = Image.new("RGB", (1920, 1080), color="white")
img.save(shots_dir / "shot_0001_full.png")
# Résultat avec des éléments mais pas au point du clic
mock_result = SomResult(elements=[], width=1920, height=1080)
mock_engine = MagicMock()
mock_engine.analyze.return_value = mock_result
with patch(
"agent_v0.server_v1.stream_processor._get_som_engine",
return_value=mock_engine,
):
result = _som_identify_clicked_element(
{"screenshot_id": "shot_0001", "pos": [500, 300]},
tmp_path, 1920, 1080,
)
assert result is None
# ── Phase 2 : Résolution SoM + VLM ──
class TestResolveBySom:
"""Tests pour _resolve_by_som (Phase 2)."""
def test_returns_none_when_engine_unavailable(self):
"""Sans SomEngine, retourne None."""
from agent_v0.server_v1.api_stream import _resolve_by_som
with patch(
"agent_v0.server_v1.api_stream._get_som_engine_api",
return_value=None,
):
result = _resolve_by_som(
"/fake/path.jpg",
{"vlm_description": "un bouton"},
1920, 1080,
)
assert result is None
def test_returns_none_when_vlm_unavailable(self):
"""Sans VLM, retourne None."""
from agent_v0.server_v1.api_stream import _resolve_by_som
mock_engine = MagicMock()
with patch(
"agent_v0.server_v1.api_stream._get_som_engine_api",
return_value=mock_engine,
), patch(
"agent_v0.server_v1.api_stream._get_vlm_client",
return_value=None,
):
result = _resolve_by_som(
"/fake/path.jpg",
{"vlm_description": "un bouton"},
1920, 1080,
)
assert result is None
def test_returns_none_without_description(self):
"""Sans description ni som_element, retourne None."""
from agent_v0.server_v1.api_stream import _resolve_by_som
mock_engine = MagicMock()
mock_client = MagicMock()
with patch(
"agent_v0.server_v1.api_stream._get_som_engine_api",
return_value=mock_engine,
), patch(
"agent_v0.server_v1.api_stream._get_vlm_client",
return_value=mock_client,
):
result = _resolve_by_som(
"/fake/path.jpg",
{}, # Pas de description
1920, 1080,
)
assert result is None
def test_resolve_success(self, tmp_path):
"""Résolution réussie : SomEngine détecte, VLM identifie le mark."""
from core.detection.som_engine import SomElement, SomResult
from agent_v0.server_v1.api_stream import _resolve_by_som
# Créer un faux screenshot
from PIL import Image
img = Image.new("RGB", (1920, 1080), color="white")
screenshot_path = str(tmp_path / "screen.jpg")
img.save(screenshot_path)
# Mock SomEngine
mock_elem = SomElement(
id=9,
bbox=(960, 540, 1000, 570),
bbox_norm=(0.5, 0.5, 0.521, 0.528),
center=(980, 555),
center_norm=(0.5104, 0.5139),
source="ocr",
label="Ouvrir",
confidence=0.88,
)
mock_result = SomResult(
elements=[mock_elem],
som_image=img.copy(),
som_image_b64="fake_b64",
width=1920,
height=1080,
)
mock_engine = MagicMock()
mock_engine.analyze.return_value = mock_result
# Mock VLM client
mock_client = MagicMock()
mock_client.generate.return_value = {
"success": True,
"response": '{"mark_id": 9, "confidence": 0.95}',
}
mock_client._extract_json_from_response.return_value = {
"mark_id": 9,
"confidence": 0.95,
}
with patch(
"agent_v0.server_v1.api_stream._get_som_engine_api",
return_value=mock_engine,
), patch(
"agent_v0.server_v1.api_stream._get_vlm_client",
return_value=mock_client,
):
result = _resolve_by_som(
screenshot_path,
{
"vlm_description": "le bouton Ouvrir",
"som_element": {"id": 9, "label": "Ouvrir"},
},
1920, 1080,
)
assert result is not None
assert result["resolved"] is True
assert result["method"] in ("som_vlm", "som_text_match")
assert abs(result["x_pct"] - 0.5104) < 0.001
assert abs(result["y_pct"] - 0.5139) < 0.001
assert result["matched_element"]["som_id"] == 9
def test_resolve_vlm_low_confidence(self, tmp_path):
"""VLM retourne une confiance trop basse → None."""
from core.detection.som_engine import SomResult
from agent_v0.server_v1.api_stream import _resolve_by_som
from PIL import Image
img = Image.new("RGB", (1920, 1080), color="white")
screenshot_path = str(tmp_path / "screen.jpg")
img.save(screenshot_path)
mock_result = SomResult(
elements=[MagicMock(id=1, label="test", source="ocr")],
som_image=img.copy(),
width=1920, height=1080,
)
mock_engine = MagicMock()
mock_engine.analyze.return_value = mock_result
mock_client = MagicMock()
mock_client.generate.return_value = {
"success": True,
"response": '{"mark_id": 1, "confidence": 0.1}',
}
mock_client._extract_json_from_response.return_value = {
"mark_id": 1,
"confidence": 0.1,
}
with patch(
"agent_v0.server_v1.api_stream._get_som_engine_api",
return_value=mock_engine,
), patch(
"agent_v0.server_v1.api_stream._get_vlm_client",
return_value=mock_client,
):
result = _resolve_by_som(
screenshot_path,
{"vlm_description": "un bouton"},
1920, 1080,
)
assert result is None