3 Commits

Author SHA1 Message Date
Dom
2d71e2a249 feat(qw1): enrichissement Agent V1 (monitor_index + monitors_geometry) + hook serveur
Some checks failed
tests / Lint (ruff + black) (push) Successful in 16s
tests / Tests unitaires (sans GPU) (push) Failing after 15s
tests / Tests sécurité (critique) (push) Has been skipped
Côté client Agent V1 :
- helpers _get_monitors_geometry() / _get_active_monitor_index() via screeninfo
  (fallback gracieux [] / None si screeninfo absent)
- _enrich_with_monitor_info() ajouté aux payloads dict de capture_dual,
  capture_active_window, et heartbeat_event poussé par main.py
- screeninfo>=0.8 ajouté aux requirements (source + deploy Windows)
- Deploy capturer.py reçoit l'enrichissement de manière additive (pas de
  copie verbatim qui aurait introduit BLUR_SENSITIVE absent côté deploy)

Côté serveur :
- import resolve_target_monitor depuis monitor_router (créé en QW1.1)
- /replay/next : enrichissement action.monitor_resolution avant envoi
  au client (idx, offset_x/y, w, h, source de la décision)
- live_session_manager.add_event : propagation monitor_index +
  monitors_geometry depuis window_capture ET depuis le payload event
  brut (cas heartbeat enrichi sans window/window_title)

Cascade de résolution (cf monitor_router.py) :
1. action.monitor_index (hérité de la session source)
2. session.last_focused_monitor (focus actif vu en dernier heartbeat)
3. composite_fallback (offset 0,0) — backward compat strict

Backward 100% : si geometry vide, fallback composite identique au
comportement actuel mss.monitors[0].

Tests : baseline 89/89 préservée, monitor_router 4/4 OK (total 93/93).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 23:05:44 +02:00
Dom
fae95c5366 feat(qw1): capture par monitor + propagation offsets dans grounding cascade
_capture_screen() accepte un monitor_idx optionnel (None = composite legacy).
Index logique 0..N-1 mappé sur mss.monitors[idx+1] (mss[0] = composite).

Les 3 niveaux de grounding (OCR, UI-TARS, VLM) propagent l'offset retourné
par la capture pour traduire les coordonnées locales monitor en coordonnées
absolues écran (correct pour pyautogui.click).

find_element_on_screen() accepte monitor_idx et le forwarde aux 3 niveaux.

Backward 100% : monitor_idx=None partout → comportement strictement actuel.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 22:55:04 +02:00
Dom
6582a69d31 feat(qw1): MonitorRouter — résolution de l'écran cible pour le replay
Module isolé qui choisit l'écran cible avec stratégie en cascade :
1. action.monitor_index (session source) → cible explicite
2. session.last_focused_monitor → fallback focus actif
3. composite (offset 0,0) → backward compat (comportement actuel)

Backward 100% : actions sans monitor_index → fallback composite identique
au comportement mss.monitors[0] actuel.

Tests : 4 cas (cible OK, fallback focus, fallback composite, index invalide).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 22:50:22 +02:00
11 changed files with 455 additions and 22 deletions

View File

@@ -448,6 +448,12 @@ class AgentV1:
window_title = self.vision.get_active_window_title()
if window_title:
heartbeat_event["active_window_title"] = window_title
# QW1 — enrichissement multi-écrans (additif, fallback gracieux)
try:
from .vision.capturer import _enrich_with_monitor_info
_enrich_with_monitor_info(heartbeat_event)
except Exception:
pass
self.streamer.push_event(heartbeat_event)
except Exception as e:
logger.error(f"Heartbeat error: {e}")

View File

@@ -5,6 +5,7 @@ Pillow>=10.0.0 # Crops et processing image
requests>=2.31.0 # Streaming réseau
python-socketio[client]>=5.10,<6.0 # Bus feedback Léa 'lea:*' (compat Flask-SocketIO 5.3.x serveur)
psutil>=5.9.0 # Monitoring CPU/RAM
screeninfo>=0.8 # QW1 — détection des monitors physiques + offsets
pystray>=0.19.5 # Icône Tray UI
plyer>=2.1.0 # Notifications toast natives (remplace PyQt5)
pywebview>=5.0 # Fenêtre de chat Léa intégrée (Edge WebView2 sur Windows)

View File

@@ -15,7 +15,7 @@ import time
import logging
import hashlib
import platform
from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional
from PIL import Image, ImageFilter, ImageStat
import mss
from ..config import TARGETED_CROP_SIZE, SCREENSHOT_QUALITY, BLUR_SENSITIVE
@@ -26,6 +26,66 @@ logger = logging.getLogger(__name__)
# OS courant (détecté une seule fois)
_SYSTEM = platform.system()
# QW1 — détection multi-écrans (fallback gracieux si screeninfo absent)
try:
from screeninfo import get_monitors as _screeninfo_get_monitors
_SCREENINFO_AVAILABLE = True
except ImportError:
_SCREENINFO_AVAILABLE = False
def _get_monitors_geometry() -> List[Dict[str, Any]]:
"""Retourne la liste des monitors physiques avec leurs offsets.
Returns:
List[dict] : [{idx, x, y, w, h, primary}, ...]. Vide si screeninfo
indisponible (le serveur tombera sur fallback composite).
"""
if not _SCREENINFO_AVAILABLE:
return []
try:
monitors = _screeninfo_get_monitors()
return [
{
"idx": i,
"x": int(m.x),
"y": int(m.y),
"w": int(m.width),
"h": int(m.height),
"primary": bool(getattr(m, "is_primary", False)),
}
for i, m in enumerate(monitors)
]
except Exception:
return []
def _get_active_monitor_index() -> Optional[int]:
"""Retourne l'index logique du monitor où se trouve le curseur (focus actif).
Returns:
int ou None si indéterminable.
"""
if not _SCREENINFO_AVAILABLE:
return None
try:
import pyautogui # import paresseux : évite la dépendance dure
cx, cy = pyautogui.position()
for i, m in enumerate(_screeninfo_get_monitors()):
if m.x <= cx < m.x + m.width and m.y <= cy < m.y + m.height:
return i
except Exception:
return None
return None
def _enrich_with_monitor_info(payload: dict) -> dict:
"""Ajoute monitor_index et monitors_geometry au payload (in-place + return)."""
if isinstance(payload, dict):
payload["monitor_index"] = _get_active_monitor_index()
payload["monitors_geometry"] = _get_monitors_geometry()
return payload
class VisionCapturer:
def __init__(self, session_dir: str):
self.session_dir = session_dir
@@ -121,6 +181,9 @@ class VisionCapturer:
if window_info:
result["window_capture"] = window_info
# QW1 — enrichissement multi-écrans (additif, fallback gracieux)
_enrich_with_monitor_info(result)
return result
except Exception as e:
logger.error(f"Erreur Dual Capture: {e}")
@@ -223,6 +286,9 @@ class VisionCapturer:
"click_inside_window": click_inside,
}
# QW1 — enrichissement multi-écrans (additif)
_enrich_with_monitor_info(result)
logger.debug(
f"Fenêtre capturée : {title} ({win_w}x{win_h}) — "
f"clic relatif ({click_rel_x}, {click_rel_y})"

View File

@@ -8,12 +8,73 @@ import os
import time
import logging
import hashlib
from typing import Any, Dict, List, Optional
from PIL import Image, ImageFilter, ImageStat
import mss
from ..config import TARGETED_CROP_SIZE, SCREENSHOT_QUALITY
logger = logging.getLogger(__name__)
# QW1 — détection multi-écrans (fallback gracieux si screeninfo absent)
try:
from screeninfo import get_monitors as _screeninfo_get_monitors
_SCREENINFO_AVAILABLE = True
except ImportError:
_SCREENINFO_AVAILABLE = False
def _get_monitors_geometry() -> List[Dict[str, Any]]:
"""Retourne la liste des monitors physiques avec leurs offsets.
Returns:
List[dict] : [{idx, x, y, w, h, primary}, ...]. Vide si screeninfo
indisponible (le serveur tombera sur fallback composite).
"""
if not _SCREENINFO_AVAILABLE:
return []
try:
monitors = _screeninfo_get_monitors()
return [
{
"idx": i,
"x": int(m.x),
"y": int(m.y),
"w": int(m.width),
"h": int(m.height),
"primary": bool(getattr(m, "is_primary", False)),
}
for i, m in enumerate(monitors)
]
except Exception:
return []
def _get_active_monitor_index() -> Optional[int]:
"""Retourne l'index logique du monitor où se trouve le curseur (focus actif).
Returns:
int ou None si indéterminable.
"""
if not _SCREENINFO_AVAILABLE:
return None
try:
import pyautogui # import paresseux : évite la dépendance dure
cx, cy = pyautogui.position()
for i, m in enumerate(_screeninfo_get_monitors()):
if m.x <= cx < m.x + m.width and m.y <= cy < m.y + m.height:
return i
except Exception:
return None
return None
def _enrich_with_monitor_info(payload: dict) -> dict:
"""Ajoute monitor_index et monitors_geometry au payload (in-place + return)."""
if isinstance(payload, dict):
payload["monitor_index"] = _get_active_monitor_index()
payload["monitors_geometry"] = _get_monitors_geometry()
return payload
class VisionCapturer:
def __init__(self, session_dir: str):
self.session_dir = session_dir
@@ -72,7 +133,12 @@ class VisionCapturer:
# Mise à jour du hash pour le prochain heartbeat
self.last_img_hash = self._compute_quick_hash(img)
return {"full": full_path, "crop": crop_path}
result = {"full": full_path, "crop": crop_path}
# QW1 — enrichissement multi-écrans (additif, fallback gracieux)
_enrich_with_monitor_info(result)
return result
except Exception as e:
logger.error(f"Erreur Dual Capture: {e}")
return {}

View File

@@ -5,6 +5,7 @@ Pillow>=10.0.0 # Crops et processing image
requests>=2.31.0 # Streaming réseau
python-socketio[client]>=5.10,<6.0 # Bus feedback Léa 'lea:*' (compat Flask-SocketIO 5.3.x serveur)
psutil>=5.9.0 # Monitoring CPU/RAM
screeninfo>=0.8 # QW1 — détection des monitors physiques + offsets
pystray>=0.19.5 # Icône Tray UI
plyer>=2.1.0 # Notifications toast natives (remplace PyQt5)

View File

@@ -33,6 +33,7 @@ from .audit_trail import AuditTrail, AuditEntry
from .agent_registry import AgentRegistry, AgentAlreadyEnrolledError
from .stream_processor import StreamProcessor, build_replay_from_raw_events, enrich_click_from_screenshot
from .worker_stream import StreamWorker
from .monitor_router import resolve_target_monitor # QW1 — résolution écran cible
from .execution_plan_runner import (
execution_plan_to_actions,
inject_plan_into_queue,
@@ -222,6 +223,7 @@ from .replay_engine import (
_resolve_runtime_vars,
_SERVER_SIDE_ACTION_TYPES,
_handle_extract_text_action,
_handle_extract_table_action,
_handle_t2a_decision_action,
_expand_compound_steps,
_pre_check_screen_state as _pre_check_screen_state_impl,
@@ -511,6 +513,7 @@ class ReplayRequest(BaseModel):
session_id: str
machine_id: Optional[str] = None # Machine cible pour le replay (multi-machine)
params: Optional[Dict[str, Any]] = None
variables: Optional[Dict[str, Any]] = None # Variables runtime initiales (templating {{var}})
class RawReplayRequest(BaseModel):
@@ -765,6 +768,21 @@ async def startup():
_cleanup_thread = threading.Thread(target=_cleanup_loop, daemon=True, name="replay_cleanup")
_cleanup_thread.start()
# Préchargement EasyOCR en arrière-plan : sans ça, le 1er extract_text /
# extract_table déclenche un cold start de ~3-5s qui bloque l'event loop
# FastAPI (constaté 2026-05-05 : streaming server inaccessible 2 min).
# Le thread tourne pendant que le boot continue ; le 1er appel OCR sera rapide.
def _preload_easyocr():
try:
t0 = time.time()
from core.llm.ocr_extractor import _get_reader
_get_reader()
logger.info("[OCR] EasyOCR préchargé (fr+en, CPU) en %.1fs", time.time() - t0)
except Exception as e:
logger.warning("[OCR] Échec préchargement EasyOCR : %s", e)
threading.Thread(target=_preload_easyocr, daemon=True, name="preload_easyocr").start()
logger.info(
"API Streaming démarrée — StreamProcessor, Worker et Cleanup prêts. "
"VLM Worker dans un process séparé (run_worker.py)."
@@ -1962,6 +1980,11 @@ async def start_replay(request: ReplayRequest):
machine_id=resolved_machine_id,
actions=actions,
)
# Pré-injection des variables runtime (templating {{var}} sur by_text,
# text, target_spec.* etc.). Permet à l'orchestrateur d'appeler ce
# workflow avec p.ex. variables={"patient_id": "25003284"} pour boucler.
if request.variables:
_replay_states[replay_id]["variables"].update(request.variables)
# Enregistrer le mapping machine -> session pour le replay ciblé
if resolved_machine_id and resolved_machine_id != "default":
_machine_replay_target[resolved_machine_id] = session_id
@@ -2914,6 +2937,12 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
_handle_extract_text_action,
action, owning_replay, session_id, _last_heartbeat,
)
elif type_ == "extract_table":
await loop.run_in_executor(
None,
_handle_extract_table_action,
action, owning_replay, session_id, _last_heartbeat,
)
elif type_ == "t2a_decision":
await loop.run_in_executor(
None,
@@ -3117,6 +3146,29 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
f"{_precheck_sim}"
)
# QW1 — Résoudre l'écran cible et joindre l'info à l'action
# Cascade : action.monitor_index → session.last_focused_monitor → composite_fallback
try:
session_qw1 = processor.session_manager.get_session(session_id)
last_window_info_qw1 = (
session_qw1.last_window_info if session_qw1 is not None else {}
) or {}
session_state_qw1 = {
"monitors_geometry": last_window_info_qw1.get("monitors_geometry", []),
"last_focused_monitor": last_window_info_qw1.get("monitor_index"),
}
target = resolve_target_monitor(action, session_state_qw1)
action["monitor_resolution"] = {
"idx": target.idx,
"offset_x": target.offset_x,
"offset_y": target.offset_y,
"w": target.w,
"h": target.h,
"source": target.source,
}
except Exception as e:
logger.debug("QW1 monitor_resolution skip (%s)", e)
response: Dict[str, Any] = {
"action": action,
"session_id": session_id,

View File

@@ -256,6 +256,20 @@ class LiveSessionManager:
session.last_window_info["title"] = wc_title
if wc_app:
session.last_window_info["app_name"] = wc_app
# QW1 — propager monitor_index et monitors_geometry depuis window_capture
if "monitor_index" in window_capture:
session.last_window_info["monitor_index"] = window_capture["monitor_index"]
if "monitors_geometry" in window_capture:
session.last_window_info["monitors_geometry"] = window_capture["monitors_geometry"]
# QW1 — propager monitor_index/monitors_geometry du payload event
# (cas heartbeat enrichi sans window/window_title). Toujours
# rafraîchir le focus actif (change souvent) et la géométrie
# (l'utilisateur peut brancher/débrancher un écran).
if "monitor_index" in event_data:
session.last_window_info["monitor_index"] = event_data["monitor_index"]
if "monitors_geometry" in event_data and event_data["monitors_geometry"]:
session.last_window_info["monitors_geometry"] = event_data["monitors_geometry"]
# Accumuler les titres/apps pour le nommage automatique
title = session.last_window_info.get("title", "").strip()

View File

@@ -0,0 +1,88 @@
# agent_v0/server_v1/monitor_router.py
"""MonitorRouter — résolution de l'écran cible pour le replay (QW1).
Stratégie en cascade :
1. action.monitor_index (hérité de la session source) → cible cet écran
2. session.last_focused_monitor (focus actif vu en dernier heartbeat) → fallback
3. composite (offset 0, 0) → backward compat
Émet sur le bus lea:* l'event monitor_routed avec la source de la décision.
"""
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
@dataclass
class MonitorTarget:
"""Représente l'écran cible résolu pour une action de replay."""
idx: int
offset_x: int
offset_y: int
w: int
h: int
source: str # "action" | "focus" | "composite_fallback"
_COMPOSITE_FALLBACK = MonitorTarget(
idx=-1,
offset_x=0,
offset_y=0,
w=0,
h=0,
source="composite_fallback",
)
def _find_monitor(geometry: List[Dict[str, Any]], idx: int) -> Optional[Dict[str, Any]]:
"""Retourne le monitor d'index donné, ou None si absent."""
for m in geometry:
if m.get("idx") == idx:
return m
return None
def _to_target(monitor: Dict[str, Any], source: str) -> MonitorTarget:
return MonitorTarget(
idx=int(monitor["idx"]),
offset_x=int(monitor.get("x", 0)),
offset_y=int(monitor.get("y", 0)),
w=int(monitor.get("w", 0)),
h=int(monitor.get("h", 0)),
source=source,
)
def resolve_target_monitor(
action: Dict[str, Any],
session_state: Dict[str, Any],
) -> MonitorTarget:
"""Résout l'écran cible d'une action de replay.
Args:
action: Dict de l'action (peut contenir `monitor_index`).
session_state: État de la session (doit contenir `monitors_geometry`
et `last_focused_monitor`).
Returns:
MonitorTarget avec l'offset à appliquer aux coordonnées de grounding.
"""
geometry: List[Dict[str, Any]] = session_state.get("monitors_geometry") or []
# 1. Cible explicite via action
explicit_idx = action.get("monitor_index")
if explicit_idx is not None and geometry:
m = _find_monitor(geometry, int(explicit_idx))
if m is not None:
return _to_target(m, source="action")
# Index invalide → on tombe sur le fallback focus
# 2. Fallback focus actif
focused_idx = session_state.get("last_focused_monitor")
if focused_idx is not None and geometry:
m = _find_monitor(geometry, int(focused_idx))
if m is not None:
return _to_target(m, source="focus")
# 3. Fallback composite (backward compat — comportement actuel mss.monitors[0])
return _COMPOSITE_FALLBACK

View File

@@ -22,6 +22,18 @@ try:
except ImportError:
PYAUTOGUI_AVAILABLE = False
try:
import mss
MSS_AVAILABLE = True
except ImportError:
MSS_AVAILABLE = False
try:
from PIL import Image as PILImage
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
def safe_type_text(text: str):
"""Saisie de texte compatible VM/Citrix et claviers AZERTY/QWERTY.
@@ -157,11 +169,13 @@ def handle_detected_pattern(pattern: Dict[str, Any]) -> bool:
screenshot = sct.grab(monitor)
screen = Image.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX')
# EasyOCR (rapide, bonne qualité GUI) avec fallback docTR
# EasyOCR (rapide, bonne qualité GUI) avec fallback docTR.
# gpu=True : harmonisé avec dialog_handler.py et title_verifier.py.
# Coût VRAM ~0.5 GB, sous le budget RTX 5070 (cf. deploy/VRAM_BUDGET.md).
words = []
try:
import easyocr
_reader = easyocr.Reader(['fr', 'en'], gpu=False, verbose=False)
_reader = easyocr.Reader(['fr', 'en'], gpu=True, verbose=False)
results = _reader.readtext(np.array(screen))
for (bbox_pts, text, conf) in results:
if not text or len(text.strip()) < 1:
@@ -312,6 +326,7 @@ def find_element_on_screen(
target_description: str = "",
anchor_image_base64: Optional[str] = None,
anchor_bbox: Optional[Dict] = None,
monitor_idx: Optional[int] = None,
) -> Optional[Dict[str, Any]]:
"""
Cherche un élément sur l'écran en utilisant 3 méthodes en cascade.
@@ -325,6 +340,7 @@ def find_element_on_screen(
target_description: Description plus longue (ex: "le dossier Demo sur le bureau")
anchor_image_base64: Image de référence de l'ancre (pour CLIP matching, réservé futur)
anchor_bbox: Position originale de l'ancre (pour désambiguïser les matchs multiples)
monitor_idx: Index logique 0..N-1 du monitor à scruter. None = composite legacy.
Returns:
{'x': int, 'y': int, 'method': str, 'confidence': float} ou None
@@ -347,6 +363,13 @@ def find_element_on_screen(
logger.debug("find_element_on_screen: ni target_text ni target_description fournis")
return None
# Propager monitor_idx au niveau OCR via anchor_bbox (sans muter l'argument original)
if monitor_idx is not None and anchor_bbox is not None:
anchor_bbox = dict(anchor_bbox) # copie pour ne pas muter l'argument
anchor_bbox["monitor_idx"] = monitor_idx
elif monitor_idx is not None:
anchor_bbox = {"monitor_idx": monitor_idx}
search_label = target_description or target_text
logger.info(f"[Grounding] Recherche élément: '{search_label}' (cascade 3 niveaux)")
@@ -356,12 +379,12 @@ def find_element_on_screen(
return result
# ─── Niveau 2 — UI-TARS grounding (~3s) ───
result = _grounding_ui_tars(target_text, target_description)
result = _grounding_ui_tars(target_text, target_description, monitor_idx=monitor_idx)
if result:
return result
# ─── Niveau 3 — VLM reasoning (~10s) ───
result = _grounding_vlm(target_text, target_description)
result = _grounding_vlm(target_text, target_description, monitor_idx=monitor_idx)
if result:
return result
@@ -411,20 +434,43 @@ def _describe_anchor_image(anchor_image_base64: str) -> Optional[str]:
return None
def _capture_screen():
"""Capture l'écran principal et retourne (PIL.Image, width, height)."""
try:
import mss
from PIL import Image as PILImage
def _capture_screen(monitor_idx=None):
"""Capture l'écran et retourne (PIL.Image, width, height, offset_x, offset_y).
Args:
monitor_idx: Index logique 0..N-1 du monitor à capturer (cf. screeninfo).
Si None : capture composite (mss.monitors[0]) — comportement legacy.
Returns:
(image, w, h, offset_x, offset_y). offset = (0,0) en mode composite.
"""
try:
with mss.mss() as sct:
monitor = sct.monitors[0]
if monitor_idx is None:
# Comportement actuel : composite tous écrans
monitor = sct.monitors[0]
offset_x, offset_y = 0, 0
else:
# mss skip monitors[0] (composite). Index logique 0 → mss.monitors[1].
mss_idx = int(monitor_idx) + 1
if mss_idx >= len(sct.monitors):
logger.warning(
"mss.monitors[%d] hors limites (n=%d) — fallback composite",
mss_idx, len(sct.monitors),
)
monitor = sct.monitors[0]
offset_x, offset_y = 0, 0
else:
monitor = sct.monitors[mss_idx]
offset_x = int(monitor.get("left", 0))
offset_y = int(monitor.get("top", 0))
screenshot = sct.grab(monitor)
screen = PILImage.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX')
return screen, monitor['width'], monitor['height']
return screen, monitor['width'], monitor['height'], offset_x, offset_y
except Exception as e:
logger.debug(f"Capture écran échouée: {e}")
return None, 0, 0
return None, 0, 0, 0, 0
def _grounding_ocr(target_text: str, anchor_bbox: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
@@ -439,7 +485,8 @@ def _grounding_ocr(target_text: str, anchor_bbox: Optional[Dict] = None) -> Opti
return None
try:
screen, screen_w, screen_h = _capture_screen()
monitor_idx_param = anchor_bbox.get("monitor_idx") if anchor_bbox else None
screen, screen_w, screen_h, ox, oy = _capture_screen(monitor_idx=monitor_idx_param)
if screen is None:
return None
@@ -503,14 +550,14 @@ def _grounding_ocr(target_text: str, anchor_bbox: Optional[Dict] = None) -> Opti
sel = " ← CHOISI" if m is best else ""
logger.info(f" [OCR] Candidat: '{m['text']}' à ({m['x']}, {m['y']}) [{m['type']}]{sel}")
return {'x': best['x'], 'y': best['y'], 'method': 'ocr', 'confidence': best['conf']}
return {'x': best['x'] + ox, 'y': best['y'] + oy, 'method': 'ocr', 'confidence': best['conf']}
except Exception as e:
logger.debug(f"[Grounding/OCR] Erreur: {e}")
return None
def _grounding_ui_tars(target_text: str, target_description: str = "") -> Optional[Dict[str, Any]]:
def _grounding_ui_tars(target_text: str, target_description: str = "", monitor_idx=None) -> Optional[Dict[str, Any]]:
"""Niveau 2 — UI-TARS grounding visuel (~3s)."""
try:
import requests
@@ -519,7 +566,7 @@ def _grounding_ui_tars(target_text: str, target_description: str = "") -> Option
import re
import os
screen, screen_w, screen_h = _capture_screen()
screen, screen_w, screen_h, ox, oy = _capture_screen(monitor_idx=monitor_idx)
if screen is None:
return None
@@ -564,7 +611,7 @@ def _grounding_ui_tars(target_text: str, target_description: str = "") -> Option
# Valider que les coordonnées sont dans l'écran
if 0 <= x <= screen_w and 0 <= y <= screen_h:
logger.info(f"[Grounding/UI-TARS] Grounding → ({x}, {y})")
return {'x': x, 'y': y, 'method': 'ui_tars', 'confidence': 0.85}
return {'x': x + ox, 'y': y + oy, 'method': 'ui_tars', 'confidence': 0.85}
else:
logger.warning(f"[Grounding/UI-TARS] Coordonnées hors écran: ({x}, {y}) pour {screen_w}x{screen_h}")
return None
@@ -624,7 +671,7 @@ def _parse_ui_tars_coordinates(text: str, screen_w: int, screen_h: int) -> Optio
return None
def _grounding_vlm(target_text: str, target_description: str = "") -> Optional[Dict[str, Any]]:
def _grounding_vlm(target_text: str, target_description: str = "", monitor_idx=None) -> Optional[Dict[str, Any]]:
"""Niveau 3 — VLM reasoning + confirmation OCR (~10s)."""
try:
search_label = target_description or target_text
@@ -646,7 +693,7 @@ def _grounding_vlm(target_text: str, target_description: str = "") -> Optional[D
logger.info(f"[Grounding/VLM] VLM suggère de cliquer sur: '{vlm_target}'")
# Confirmation par OCR : chercher le target VLM sur l'écran
screen, screen_w, screen_h = _capture_screen()
screen, screen_w, screen_h, ox, oy = _capture_screen(monitor_idx=monitor_idx)
if screen is None:
return None
@@ -668,7 +715,7 @@ def _grounding_vlm(target_text: str, target_description: str = "") -> Optional[D
x = int((x1 + x2) / 2)
y = int((y1 + y2) / 2)
logger.info(f"[Grounding/VLM] Confirmé par OCR: '{word['text']}' à ({x}, {y})")
return {'x': x, 'y': y, 'method': 'vlm', 'confidence': 0.75}
return {'x': x + ox, 'y': y + oy, 'method': 'vlm', 'confidence': 0.75}
logger.debug(f"[Grounding/VLM] Target VLM '{vlm_target}' non trouvé par OCR")
return None

View File

@@ -0,0 +1,41 @@
# tests/integration/test_grounding_offset.py
"""Tests intégration pour la propagation d'offset multi-écrans (QW1)."""
import pytest
from unittest.mock import patch, MagicMock
from core.execution import input_handler
@pytest.fixture
def mock_screen():
"""Mock une capture mss : retourne un PIL Image factice + offsets."""
from PIL import Image
img = Image.new("RGB", (1920, 1080), color="white")
return img
def test_capture_screen_default_returns_composite_when_no_idx(mock_screen):
"""_capture_screen() sans monitor_idx → composite, offset (0, 0)."""
with patch("core.execution.input_handler.mss") as mock_mss:
ctx = mock_mss.mss.return_value.__enter__.return_value
ctx.monitors = [{"left": 0, "top": 0, "width": 3840, "height": 1080}]
ctx.grab.return_value = MagicMock(size=(3840, 1080), bgra=b"\x00" * (3840 * 1080 * 4))
with patch("core.execution.input_handler.PILImage.frombytes", return_value=mock_screen):
screen, w, h, ox, oy = input_handler._capture_screen()
assert (w, h, ox, oy) == (3840, 1080, 0, 0)
def test_capture_screen_targets_specific_monitor_with_offset(mock_screen):
"""_capture_screen(monitor_idx=1) → cible monitors[2] (mss skip [0]), offset = monitor.left."""
with patch("core.execution.input_handler.mss") as mock_mss:
ctx = mock_mss.mss.return_value.__enter__.return_value
# mss layout : [0]=composite, [1]=primary, [2]=secondary
ctx.monitors = [
{"left": 0, "top": 0, "width": 3840, "height": 1080},
{"left": 0, "top": 0, "width": 1920, "height": 1080},
{"left": 1920, "top": 0, "width": 1920, "height": 1080},
]
ctx.grab.return_value = MagicMock(size=(1920, 1080), bgra=b"\x00" * (1920 * 1080 * 4))
with patch("core.execution.input_handler.PILImage.frombytes", return_value=mock_screen):
screen, w, h, ox, oy = input_handler._capture_screen(monitor_idx=1)
assert (w, h, ox, oy) == (1920, 1080, 1920, 0)

View File

@@ -0,0 +1,51 @@
# tests/unit/test_monitor_router.py
"""Tests unitaires pour MonitorRouter (QW1)."""
import pytest
from agent_v0.server_v1.monitor_router import resolve_target_monitor, MonitorTarget
# Geometry de référence pour les 3 tests : 2 écrans côte à côte
TWO_MONITORS = [
{"idx": 0, "x": 0, "y": 0, "w": 1920, "h": 1080, "primary": True},
{"idx": 1, "x": 1920, "y": 0, "w": 1920, "h": 1080, "primary": False},
]
def test_resolve_uses_action_monitor_index_when_present():
"""Si action.monitor_index présent et valide → cible cet écran."""
action = {"monitor_index": 1}
session_state = {"monitors_geometry": TWO_MONITORS, "last_focused_monitor": 0}
result = resolve_target_monitor(action, session_state)
assert result.idx == 1
assert result.offset_x == 1920
assert result.offset_y == 0
assert result.source == "action"
def test_resolve_falls_back_to_focused_monitor_when_action_missing():
"""Si action.monitor_index absent → fallback focus actif."""
action = {} # pas de monitor_index
session_state = {"monitors_geometry": TWO_MONITORS, "last_focused_monitor": 1}
result = resolve_target_monitor(action, session_state)
assert result.idx == 1
assert result.source == "focus"
def test_resolve_falls_back_to_composite_when_geometry_empty():
"""Si geometry vide (vieux Agent V1) → fallback composite (idx=-1, offset=0)."""
action = {}
session_state = {"monitors_geometry": [], "last_focused_monitor": None}
result = resolve_target_monitor(action, session_state)
assert result.source == "composite_fallback"
assert result.offset_x == 0
assert result.offset_y == 0
def test_resolve_falls_back_when_action_index_out_of_range():
"""Si action.monitor_index hors limites (écran débranché) → fallback focus."""
action = {"monitor_index": 5} # n'existe pas
session_state = {"monitors_geometry": TWO_MONITORS, "last_focused_monitor": 0}
result = resolve_target_monitor(action, session_state)
assert result.idx == 0
assert result.source == "focus"