# agent_v1/vision/capturer.py """ Gestionnaire de vision avancé pour Agent V1. Optimisé pour le streaming fibre avec détection de changement. Captures disponibles : - Plein écran (full) : contexte global 1920x1080+ - Crop ciblé (crop) : 80x80 autour du clic (apprentissage VLM) - Fenêtre active (window) : image isolée de la fenêtre + métadonnées (titre, rect, coordonnées clic relatives) — cross-platform """ import os import time import logging import hashlib import platform from typing import Any, Dict, List, Optional from PIL import Image, ImageFilter, ImageStat import mss from ..config import TARGETED_CROP_SIZE, SCREENSHOT_QUALITY, BLUR_SENSITIVE from .blur_sensitive import blur_sensitive_regions logger = logging.getLogger(__name__) # OS courant (détecté une seule fois) _SYSTEM = platform.system() # QW1 — détection multi-écrans (fallback gracieux si screeninfo absent) try: from screeninfo import get_monitors as _screeninfo_get_monitors _SCREENINFO_AVAILABLE = True except ImportError: _SCREENINFO_AVAILABLE = False def _get_monitors_geometry() -> List[Dict[str, Any]]: """Retourne la liste des monitors physiques avec leurs offsets. Returns: List[dict] : [{idx, x, y, w, h, primary}, ...]. Vide si screeninfo indisponible (le serveur tombera sur fallback composite). """ if not _SCREENINFO_AVAILABLE: return [] try: monitors = _screeninfo_get_monitors() return [ { "idx": i, "x": int(m.x), "y": int(m.y), "w": int(m.width), "h": int(m.height), "primary": bool(getattr(m, "is_primary", False)), } for i, m in enumerate(monitors) ] except Exception: return [] def _get_active_monitor_index() -> Optional[int]: """Retourne l'index logique du monitor où se trouve le curseur (focus actif). Returns: int ou None si indéterminable. """ if not _SCREENINFO_AVAILABLE: return None try: import pyautogui # import paresseux : évite la dépendance dure cx, cy = pyautogui.position() for i, m in enumerate(_screeninfo_get_monitors()): if m.x <= cx < m.x + m.width and m.y <= cy < m.y + m.height: return i except Exception: return None return None def _enrich_with_monitor_info(payload: dict) -> dict: """Ajoute monitor_index et monitors_geometry au payload (in-place + return).""" if isinstance(payload, dict): payload["monitor_index"] = _get_active_monitor_index() payload["monitors_geometry"] = _get_monitors_geometry() return payload class VisionCapturer: def __init__(self, session_dir: str): self.session_dir = session_dir self.shots_dir = os.path.join(session_dir, "shots") os.makedirs(self.shots_dir, exist_ok=True) # On ne crée plus self.sct ici car mss n'est pas thread-safe sous Windows self.last_img_hash = None def capture_full_context(self, name_suffix: str, force=False) -> str: """ Capture l'écran complet. Si force=False, vérifie d'abord si l'écran a changé. Enrichit les métadonnées avec le titre de la fenêtre active (utile pour le contextualisation des heartbeats côté serveur). """ try: with mss.mss() as sct: monitor = sct.monitors[1] sct_img = sct.grab(monitor) img = Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX") # Détection de changement (pour Heartbeat) if not force: current_hash = self._compute_quick_hash(img) if current_hash == self.last_img_hash: return "" # Pas de changement, on économise la fibre self.last_img_hash = current_hash # Floutage des données sensibles (conformité AI Act) if BLUR_SENSITIVE: blur_sensitive_regions(img) path = os.path.join(self.shots_dir, f"context_{int(time.time())}_{name_suffix}.png") img.save(path, "PNG", quality=SCREENSHOT_QUALITY) return path except Exception as e: logger.error(f"Erreur Context Capture: {e}") return "" def get_active_window_title(self) -> str: """Retourne le titre de la fenêtre active (pour enrichir les heartbeats). Fallback gracieux : retourne une chaîne vide si indisponible. """ try: from ..window_info_crossplatform import get_active_window_info info = get_active_window_info() return info.get("title", "") except Exception: return "" def capture_dual(self, x: int, y: int, screenshot_id: str, anonymize=False) -> dict: """Capture triple (Full + Crop + Fenêtre active) systématique. La fenêtre active est un AJOUT — en cas d'échec, le full + crop sont toujours retournés (fallback gracieux). """ try: with mss.mss() as sct: full_path = os.path.join(self.shots_dir, f"{screenshot_id}_full.png") monitor = sct.monitors[1] sct_img = sct.grab(monitor) img = Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX") # Capture du Crop (Cœur de l'apprentissage qwen3-vl) crop_path = os.path.join(self.shots_dir, f"{screenshot_id}_crop.png") w, h = TARGETED_CROP_SIZE left = max(0, x - w // 2) top = max(0, y - h // 2) crop_img = img.crop((left, top, left + w, top + h)) if anonymize: crop_img = crop_img.filter(ImageFilter.GaussianBlur(radius=4)) # Floutage des données sensibles (conformité AI Act) if BLUR_SENSITIVE: blur_sensitive_regions(img) blur_sensitive_regions(crop_img) img.save(full_path, "PNG", quality=SCREENSHOT_QUALITY) crop_img.save(crop_path, "PNG", quality=SCREENSHOT_QUALITY) # Mise à jour du hash pour le prochain heartbeat self.last_img_hash = self._compute_quick_hash(img) result = {"full": full_path, "crop": crop_path} # --- Capture de la fenêtre active --- # Ajout non-bloquant : enrichit le résultat avec l'image # de la fenêtre seule + métadonnées (titre, rect, clic relatif) window_info = self.capture_active_window(x, y, screenshot_id, full_img=img) if window_info: result["window_capture"] = window_info # QW1 — enrichissement multi-écrans (additif, fallback gracieux) _enrich_with_monitor_info(result) return result except Exception as e: logger.error(f"Erreur Dual Capture: {e}") return {} def capture_active_window( self, x: int, y: int, screenshot_id: str, full_img: Optional[Image.Image] = None, ) -> Optional[Dict[str, Any]]: """Capture l'image de la fenêtre active seule + métadonnées. Stratégie : 1. Obtenir le rectangle de la fenêtre via l'API OS (pywin32 / xdotool / Quartz) 2. Cropper depuis le screenshot plein écran (plus fiable que PrintWindow) 3. Calculer les coordonnées du clic relatives à la fenêtre Args: x, y: coordonnées du clic en pixels écran screenshot_id: identifiant pour le nom de fichier full_img: screenshot plein écran déjà capturé (optionnel, évite une double capture si appelé depuis capture_dual) Returns: Dict avec window_image, window_title, window_rect, click_in_window, window_size — ou None si la fenêtre est introuvable. """ try: from ..window_info_crossplatform import get_active_window_rect rect_info = get_active_window_rect() if not rect_info: logger.debug("Fenêtre active introuvable — skip capture fenêtre") return None win_rect = rect_info["rect"] # [left, top, right, bottom] win_left, win_top, win_right, win_bottom = win_rect win_w, win_h = rect_info["size"] # [width, height] title = rect_info.get("title", "unknown_window") app_name = rect_info.get("app_name", "unknown_app") # Ignorer les fenêtres trop petites (barres de tâches, popups système) if win_w < 50 or win_h < 50: logger.debug(f"Fenêtre trop petite ({win_w}x{win_h}) — skip") return None # Coordonnées du clic relatives à la fenêtre click_rel_x = x - win_left click_rel_y = y - win_top # Si le clic est en dehors de la fenêtre, on le signale mais on continue click_inside = (0 <= click_rel_x <= win_w and 0 <= click_rel_y <= win_h) # --- Crop de la fenêtre depuis le plein écran --- if full_img is None: # Pas de screenshot fourni — en capturer un (cas standalone) try: with mss.mss() as sct: monitor = sct.monitors[1] sct_img = sct.grab(monitor) full_img = Image.frombytes( "RGB", sct_img.size, sct_img.bgra, "raw", "BGRX" ) except Exception as e: logger.error(f"Erreur capture plein écran pour fenêtre : {e}") return None # Borner le crop aux limites de l'image plein écran img_w, img_h = full_img.size crop_left = max(0, win_left) crop_top = max(0, win_top) crop_right = min(img_w, win_right) crop_bottom = min(img_h, win_bottom) if crop_right <= crop_left or crop_bottom <= crop_top: logger.debug("Fenêtre hors écran — skip capture fenêtre") return None window_img = full_img.crop((crop_left, crop_top, crop_right, crop_bottom)) # Floutage conformité AI Act if BLUR_SENSITIVE: blur_sensitive_regions(window_img) # Sauvegarde window_path = os.path.join( self.shots_dir, f"{screenshot_id}_window.png" ) window_img.save(window_path, "PNG", quality=SCREENSHOT_QUALITY) result = { "window_image": window_path, "window_title": title, "app_name": app_name, "window_rect": win_rect, "window_size": [win_w, win_h], "click_in_window": [click_rel_x, click_rel_y], "click_inside_window": click_inside, } # QW1 — enrichissement multi-écrans (additif) _enrich_with_monitor_info(result) logger.debug( f"Fenêtre capturée : {title} ({win_w}x{win_h}) — " f"clic relatif ({click_rel_x}, {click_rel_y})" ) return result except ImportError as e: logger.debug(f"Module fenêtre indisponible : {e}") return None except Exception as e: logger.error(f"Erreur capture fenêtre active : {e}") return None def _compute_quick_hash(self, img: Image) -> str: """Calcule un hash rapide basé sur une vignette réduite pour détecter les changements.""" # On réduit l'image à 64x64 pour comparer les masses de couleurs (très rapide) small_img = img.resize((64, 64), Image.NEAREST).convert("L") return hashlib.md5(small_img.tobytes()).hexdigest()