diff --git a/core/execution/input_handler.py b/core/execution/input_handler.py index c53ba7098..273290020 100644 --- a/core/execution/input_handler.py +++ b/core/execution/input_handler.py @@ -22,6 +22,18 @@ try: except ImportError: PYAUTOGUI_AVAILABLE = False +try: + import mss + MSS_AVAILABLE = True +except ImportError: + MSS_AVAILABLE = False + +try: + from PIL import Image as PILImage + PIL_AVAILABLE = True +except ImportError: + PIL_AVAILABLE = False + def safe_type_text(text: str): """Saisie de texte compatible VM/Citrix et claviers AZERTY/QWERTY. @@ -157,11 +169,13 @@ def handle_detected_pattern(pattern: Dict[str, Any]) -> bool: screenshot = sct.grab(monitor) screen = Image.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX') - # EasyOCR (rapide, bonne qualité GUI) avec fallback docTR + # EasyOCR (rapide, bonne qualité GUI) avec fallback docTR. + # gpu=True : harmonisé avec dialog_handler.py et title_verifier.py. + # Coût VRAM ~0.5 GB, sous le budget RTX 5070 (cf. deploy/VRAM_BUDGET.md). words = [] try: import easyocr - _reader = easyocr.Reader(['fr', 'en'], gpu=False, verbose=False) + _reader = easyocr.Reader(['fr', 'en'], gpu=True, verbose=False) results = _reader.readtext(np.array(screen)) for (bbox_pts, text, conf) in results: if not text or len(text.strip()) < 1: @@ -312,6 +326,7 @@ def find_element_on_screen( target_description: str = "", anchor_image_base64: Optional[str] = None, anchor_bbox: Optional[Dict] = None, + monitor_idx: Optional[int] = None, ) -> Optional[Dict[str, Any]]: """ Cherche un élément sur l'écran en utilisant 3 méthodes en cascade. @@ -325,6 +340,7 @@ def find_element_on_screen( target_description: Description plus longue (ex: "le dossier Demo sur le bureau") anchor_image_base64: Image de référence de l'ancre (pour CLIP matching, réservé futur) anchor_bbox: Position originale de l'ancre (pour désambiguïser les matchs multiples) + monitor_idx: Index logique 0..N-1 du monitor à scruter. None = composite legacy. 
def _capture_screen(monitor_idx: Optional[int] = None):
    """Capture the screen and return ``(image, width, height, offset_x, offset_y)``.

    Args:
        monitor_idx: Logical 0..N-1 index of the monitor to capture. ``None``
            captures the composite of all monitors (``mss`` ``monitors[0]``),
            which is the legacy behaviour. An index that does not map to an
            existing monitor (too large or negative) falls back to the
            composite capture and logs a warning.

    Returns:
        A 5-tuple ``(image, width, height, offset_x, offset_y)``:
        ``image`` is the PIL image built from the grabbed frame,
        ``width``/``height`` are read from the selected monitor entry, and
        ``offset_x``/``offset_y`` are that monitor's ``left``/``top`` so callers
        can translate image coordinates back to screen coordinates. The
        offsets are ``(0, 0)`` in composite mode. On any failure the function
        returns ``(None, 0, 0, 0, 0)``.
    """
    try:
        with mss.mss() as sct:
            monitors = sct.monitors

            # Composite capture is both the legacy default and the fallback
            # used when the requested index is unusable.
            monitor = monitors[0]
            offset_x, offset_y = 0, 0

            if monitor_idx is not None:
                # mss reserves monitors[0] for the composite, so logical
                # index 0 maps to monitors[1].
                mss_idx = int(monitor_idx) + 1
                if 1 <= mss_idx < len(monitors):
                    monitor = monitors[mss_idx]
                    offset_x = int(monitor.get("left", 0))
                    offset_y = int(monitor.get("top", 0))
                else:
                    # The lower bound matters: without it a negative index
                    # would silently wrap around through Python's negative
                    # list indexing and capture the wrong monitor.
                    logger.warning(
                        "mss.monitors[%d] hors limites (n=%d) — fallback composite",
                        mss_idx, len(monitors),
                    )

            screenshot = sct.grab(monitor)
            screen = PILImage.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX')
            return screen, monitor['width'], monitor['height'], offset_x, offset_y
    except Exception as e:
        # Deliberately best-effort: callers treat a ``None`` image as
        # "capture unavailable" and move on.
        logger.debug(f"Capture écran échouée: {e}")
        return None, 0, 0, 0, 0
target_description: str = "") -> Optional[Dict[str, Any]]: +def _grounding_ui_tars(target_text: str, target_description: str = "", monitor_idx=None) -> Optional[Dict[str, Any]]: """Niveau 2 — UI-TARS grounding visuel (~3s).""" try: import requests @@ -519,7 +566,7 @@ def _grounding_ui_tars(target_text: str, target_description: str = "") -> Option import re import os - screen, screen_w, screen_h = _capture_screen() + screen, screen_w, screen_h, ox, oy = _capture_screen(monitor_idx=monitor_idx) if screen is None: return None @@ -564,7 +611,7 @@ def _grounding_ui_tars(target_text: str, target_description: str = "") -> Option # Valider que les coordonnées sont dans l'écran if 0 <= x <= screen_w and 0 <= y <= screen_h: logger.info(f"[Grounding/UI-TARS] Grounding → ({x}, {y})") - return {'x': x, 'y': y, 'method': 'ui_tars', 'confidence': 0.85} + return {'x': x + ox, 'y': y + oy, 'method': 'ui_tars', 'confidence': 0.85} else: logger.warning(f"[Grounding/UI-TARS] Coordonnées hors écran: ({x}, {y}) pour {screen_w}x{screen_h}") return None @@ -624,7 +671,7 @@ def _parse_ui_tars_coordinates(text: str, screen_w: int, screen_h: int) -> Optio return None -def _grounding_vlm(target_text: str, target_description: str = "") -> Optional[Dict[str, Any]]: +def _grounding_vlm(target_text: str, target_description: str = "", monitor_idx=None) -> Optional[Dict[str, Any]]: """Niveau 3 — VLM reasoning + confirmation OCR (~10s).""" try: search_label = target_description or target_text @@ -646,7 +693,7 @@ def _grounding_vlm(target_text: str, target_description: str = "") -> Optional[D logger.info(f"[Grounding/VLM] VLM suggère de cliquer sur: '{vlm_target}'") # Confirmation par OCR : chercher le target VLM sur l'écran - screen, screen_w, screen_h = _capture_screen() + screen, screen_w, screen_h, ox, oy = _capture_screen(monitor_idx=monitor_idx) if screen is None: return None @@ -668,7 +715,7 @@ def _grounding_vlm(target_text: str, target_description: str = "") -> Optional[D x = int((x1 + x2) / 
# tests/integration/test_grounding_offset.py
"""Integration tests for multi-monitor offset propagation (QW1)."""
import pytest
from unittest.mock import patch, MagicMock

from core.execution import input_handler


@pytest.fixture
def mock_screen():
    """Return a placeholder PIL image standing in for the captured frame."""
    from PIL import Image
    # The pixels are never inspected, so a tiny image is enough.
    return Image.new("RGB", (8, 8), color="white")


def _capture_with_monitors(monitors, fake_image, **kwargs):
    """Run ``_capture_screen`` against a mocked mss session.

    ``PILImage.frombytes`` is patched to return ``fake_image``, so the grabbed
    frame's ``size``/``bgra`` are never read and can stay minimal.

    Returns:
        ``(result, sct)`` where ``result`` is the tuple returned by
        ``_capture_screen`` and ``sct`` is the mocked mss context object.
    """
    with patch("core.execution.input_handler.mss") as mock_mss:
        sct = mock_mss.mss.return_value.__enter__.return_value
        sct.monitors = monitors
        sct.grab.return_value = MagicMock(size=(1, 1), bgra=b"")
        with patch("core.execution.input_handler.PILImage.frombytes", return_value=fake_image):
            result = input_handler._capture_screen(**kwargs)
    return result, sct


def test_capture_screen_default_returns_composite_when_no_idx(mock_screen):
    """_capture_screen() without monitor_idx captures the composite, offset (0, 0)."""
    monitors = [{"left": 0, "top": 0, "width": 3840, "height": 1080}]

    (screen, w, h, ox, oy), sct = _capture_with_monitors(monitors, mock_screen)

    assert screen is mock_screen
    assert (w, h, ox, oy) == (3840, 1080, 0, 0)
    sct.grab.assert_called_once_with(monitors[0])


def test_capture_screen_targets_specific_monitor_with_offset(mock_screen):
    """_capture_screen(monitor_idx=1) targets monitors[2]; offset is that monitor's left/top."""
    # mss layout: [0]=composite, [1]=primary, [2]=secondary
    monitors = [
        {"left": 0, "top": 0, "width": 3840, "height": 1080},
        {"left": 0, "top": 0, "width": 1920, "height": 1080},
        {"left": 1920, "top": 0, "width": 1920, "height": 1080},
    ]

    (screen, w, h, ox, oy), sct = _capture_with_monitors(
        monitors, mock_screen, monitor_idx=1
    )

    assert screen is mock_screen
    assert (w, h, ox, oy) == (1920, 1080, 1920, 0)
    sct.grab.assert_called_once_with(monitors[2])


def test_capture_screen_out_of_range_idx_falls_back_to_composite(mock_screen):
    """An index past the last monitor falls back to the composite, offset (0, 0)."""
    monitors = [
        {"left": 0, "top": 0, "width": 3840, "height": 1080},
        {"left": 0, "top": 0, "width": 1920, "height": 1080},
    ]

    (screen, w, h, ox, oy), sct = _capture_with_monitors(
        monitors, mock_screen, monitor_idx=5
    )

    assert screen is mock_screen
    assert (w, h, ox, oy) == (3840, 1080, 0, 0)
    sct.grab.assert_called_once_with(monitors[0])