Files
rpa_vision_v3/agent_v0/agent_v1/vision/capturer.py
Dom 7df51d2c79 snapshot: WIP 5j replay reliability (B1 watchdog + dialog handlers + grounding drift)
Snapshot avant correction du blocage relance Léa (3 incidents 24h: SSH refusé,
polls morts ×2). Point de rollback stable.

Contenu:
- agent_v1/core/executor.py: 5 patchs dialog handling (saveas drift, close_tab
  hotkey fallback, confirm_save Unicode apostrophe, foreground dialog
  recontextualization, runtime_dialog in-loop) + helpers normalize_window_hint,
  requires_post_verify_window_transition
- agent_v1/core/grounding.py: garde drift template fix (fallback_x/y plumbed)
- server_v1/replay_watchdog.py (NEW): orphan watchdog B1, scan 10s timeout 30s
- server_v1/api_stream.py: dispatched_action plumbing, watchdog lifespan,
  metrics endpoint
- server_v1/replay_engine.py: _schedule_retry préserve original_action +
  dispatched_action
- stream_processor.py: gardes _infer_tab_switch_target (no false switch_tab
  on save_as dialog open) + _attach_expected_window_before
- tests/integration: test_replay_watchdog.py (8 cas), test_stream_processor.py
- tests/unit: test_executor_verify_window_guard.py (start_button, close_tab,
  runtime_dialog, post_verify, transition fallbacks)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 16:48:37 +02:00

688 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# agent_v1/vision/capturer.py
"""
Advanced vision manager for Agent V1.

Optimized for streaming over fiber, with change detection.

Available captures:
- Full screen (full): global context, 1920x1080+
- Targeted crop (crop): 80x80 around the click (VLM training)
- Active window (window): isolated image of the window + metadata
  (title, rect, click coordinates relative to the window) — cross-platform
"""
import os
import time
import logging
import hashlib
import platform
from typing import Any, Dict, List, Optional, Tuple
from PIL import Image, ImageFilter, ImageStat
import mss
from ..config import TARGETED_CROP_SIZE, SCREENSHOT_QUALITY, BLUR_SENSITIVE
from .blur_sensitive import blur_sensitive_regions
logger = logging.getLogger(__name__)
# Current OS name (detected once, at import time)
_SYSTEM = platform.system()
# QW1 — multi-monitor detection (graceful fallback when screeninfo is absent).
# _SCREENINFO_AVAILABLE records whether the optional import succeeded; the
# monitor helpers below check it before calling _screeninfo_get_monitors.
try:
    from screeninfo import get_monitors as _screeninfo_get_monitors
    _SCREENINFO_AVAILABLE = True
except ImportError:
    _SCREENINFO_AVAILABLE = False
def _get_monitors_geometry() -> List[Dict[str, Any]]:
    """Describe every physical monitor together with its desktop offset.

    Returns:
        A list of ``{idx, x, y, w, h, primary}`` dicts, one per monitor
        reported by screeninfo. The list is empty when screeninfo is not
        installed or when enumerating the monitors fails for any reason.
    """
    geometry: List[Dict[str, Any]] = []
    if _SCREENINFO_AVAILABLE:
        try:
            for position, screen in enumerate(_screeninfo_get_monitors()):
                geometry.append(
                    {
                        "idx": position,
                        "x": int(screen.x),
                        "y": int(screen.y),
                        "w": int(screen.width),
                        "h": int(screen.height),
                        "primary": bool(getattr(screen, "is_primary", False)),
                    }
                )
        except Exception:
            # Best effort: never hand back a partially built list.
            return []
    return geometry
def _get_active_monitor_index() -> Optional[int]:
    """Find the logical index of the monitor currently under the mouse cursor.

    Returns:
        The index of the first monitor whose half-open rectangle
        ``[x, x + width) x [y, y + height)`` contains the cursor, or ``None``
        when screeninfo is unavailable, no monitor matches, or any error
        occurs (including pyautogui not being importable).
    """
    if not _SCREENINFO_AVAILABLE:
        return None
    try:
        import pyautogui  # lazy import: keeps pyautogui an optional dependency

        pointer_x, pointer_y = pyautogui.position()
        return next(
            (
                position
                for position, screen in enumerate(_screeninfo_get_monitors())
                if screen.x <= pointer_x < screen.x + screen.width
                and screen.y <= pointer_y < screen.y + screen.height
            ),
            None,
        )
    except Exception:
        return None
def _enrich_with_monitor_info(payload: dict) -> dict:
    """Attach ``monitor_index`` and ``monitors_geometry`` to *payload*.

    The dict is mutated in place and also returned. Anything that is not a
    dict is returned untouched.
    """
    if not isinstance(payload, dict):
        return payload
    payload["monitor_index"] = _get_active_monitor_index()
    payload["monitors_geometry"] = _get_monitors_geometry()
    return payload
# Monitor dimension guard (GHT demo, 19 May 2026): mss.monitors[1] can
# intermittently report truncated dimensions (observed case: 2560x60). Using
# such dimensions to normalize coordinates poisons the memory
# (TargetMemoryStore).
# Minimum width/height (pixels) for a monitor to be considered plausible;
# checked by _is_monitor_sane.
MIN_MONITOR_WIDTH = 200
MIN_MONITOR_HEIGHT = 200
# Default number of mss attempts and delay (seconds) between them, used as
# the default arguments of _acquire_safe_grab.
MONITOR_MAX_ATTEMPTS = 2
MONITOR_RETRY_DELAY_S = 0.05
# Upper bounds on luminance mean, standard deviation and maximum below which
# _is_effectively_black treats a frame as black (all three must hold).
BLACK_FRAME_MEAN_MAX = 1.0
BLACK_FRAME_STDDEV_MAX = 1.0
BLACK_FRAME_MAX_LUMA = 3
def _is_monitor_sane(monitor) -> bool:
"""True si les dims du monitor sont au-dessus du seuil de plausibilité."""
if not isinstance(monitor, dict):
return False
w = monitor.get("width", 0) or 0
h = monitor.get("height", 0) or 0
return w >= MIN_MONITOR_WIDTH and h >= MIN_MONITOR_HEIGHT
def _dim_str(monitor) -> str:
"""Représentation courte WxH pour les logs (gère monitor=None)."""
if not isinstance(monitor, dict):
return "?x?"
return f"{monitor.get('width', '?')}x{monitor.get('height', '?')}"
def _acquire_safe_grab(max_attempts: int = MONITOR_MAX_ATTEMPTS,
                       retry_delay_s: float = MONITOR_RETRY_DELAY_S,
                       allow_secondary_fallback: bool = True):
    """Open mss and grab a monitor whose dimensions look plausible.

    Cascade:
      1. Every attempt opens a fresh ``mss.mss()`` (which may refresh its
         internal cache) and inspects ``monitors[1..n]``.
      2. ``monitors[1]`` is preferred. If it is aberrant AND
         ``allow_secondary_fallback`` is True, the first sane
         ``monitors[2..n]`` is used instead, with an explicit WARNING.
      3. With ``allow_secondary_fallback=False`` only ``monitors[1]`` is
         accepted. This is meant for callers that hold (x, y) coordinates in
         the composite screen space: grabbing a secondary monitor would give
         a healthy image that is shifted relative to those coordinates.
      4. When nothing plausible is found, sleep ``retry_delay_s`` and retry.
      5. After ``max_attempts`` failures, log an ERROR and return
         ``(None, None)`` so the caller can fail explicitly.

    Args:
        max_attempts: number of mss attempts before giving up.
        retry_delay_s: pause between attempts, in seconds.
        allow_secondary_fallback: when False, refuse ``monitors[2..n]``
            (fail-closed for coordinate-bearing callers).

    Returns:
        ``(monitor_dict, PIL.Image)`` on success, ``(None, None)`` otherwise.
    """
    rejected_monitor = None
    refused_secondary = False  # a sane secondary monitor was seen but refused

    for attempt_no in range(1, max_attempts + 1):
        with mss.mss() as grabber:
            screens = list(grabber.monitors or [])
            selected_idx = None

            # Index 0 is skipped: only monitors[1..n] are candidates.
            for idx, screen in enumerate(screens[1:], start=1):
                if _is_monitor_sane(screen):
                    if allow_secondary_fallback or idx == 1:
                        selected_idx = idx
                        break
                    # Sane, but secondary monitors are off-limits here.
                    refused_secondary = True
                    logger.warning(
                        "Monitor[%d] sain (%s) mais fallback secondaire refusé "
                        "(allow_secondary_fallback=False) — capture cohérente "
                        "des coords impossible",
                        idx, _dim_str(screen),
                    )
                else:
                    rejected_monitor = screen
                    logger.warning(
                        "Monitor[%d] dims aberrantes (%s, seuil %dx%d) "
                        "— attempt %d/%d",
                        idx, _dim_str(screen),
                        MIN_MONITOR_WIDTH, MIN_MONITOR_HEIGHT,
                        attempt_no, max_attempts,
                    )

            if selected_idx is not None:
                selected = screens[selected_idx]
                # Anything other than "monitor 1 on the first try" is a fallback.
                if selected_idx != 1 or attempt_no > 1:
                    logger.warning(
                        "Capture fallback : monitor[%d] dim=%s, attempt=%d",
                        selected_idx, _dim_str(selected), attempt_no,
                    )
                raw_shot = grabber.grab(selected)
                picture = Image.frombytes(
                    "RGB", raw_shot.size, raw_shot.bgra, "raw", "BGRX",
                )
                return selected, picture

        if attempt_no < max_attempts:
            time.sleep(retry_delay_s)

    if allow_secondary_fallback or not refused_secondary:
        logger.error(
            "Aucun monitor avec dims plausibles trouvé après %d tentatives "
            "(dernier vu : %s, seuil %dx%d) — capture abandonnée",
            max_attempts, _dim_str(rejected_monitor),
            MIN_MONITOR_WIDTH, MIN_MONITOR_HEIGHT,
        )
    else:
        logger.error(
            "Capture abandonnée : monitor[1] aberrant après %d tentatives "
            "(dernier vu %s) et fallback secondaire désactivé "
            "pour préserver la cohérence des coordonnées",
            max_attempts, _dim_str(rejected_monitor),
        )
    return None, None
def _compute_luma_stats(img: Image.Image) -> Dict[str, float | int]:
    """Summarize the luminance of *img* to help diagnose black frames.

    Returns:
        A dict with ``mean`` and ``stddev`` (floats rounded to 2 decimals)
        and ``min`` / ``max`` (ints) of the image converted to mode "L".
    """
    luma_img = img.convert("L")
    band_stats = ImageStat.Stat(luma_img)
    darkest, brightest = luma_img.getextrema()

    mean_value = float(band_stats.mean[0]) if band_stats.mean else 0.0
    spread = float(band_stats.stddev[0]) if band_stats.stddev else 0.0

    summary = {
        "mean": round(mean_value, 2),
        "stddev": round(spread, 2),
        "min": int(darkest),
        "max": int(brightest),
    }
    return summary
def _is_effectively_black(img: Image.Image) -> bool:
    """Fail-closed heuristic used to reject a practically black screenshot.

    The frame counts as black only when its luminance maximum, mean and
    standard deviation are ALL at or below their respective thresholds.
    """
    luma = _compute_luma_stats(img)
    if not (luma["max"] <= BLACK_FRAME_MAX_LUMA):
        return False
    if not (luma["mean"] <= BLACK_FRAME_MEAN_MAX):
        return False
    return luma["stddev"] <= BLACK_FRAME_STDDEV_MAX
def _capture_via_imagegrab() -> Tuple[Optional[Dict[str, int]], Optional[Image.Image], Dict[str, Any]]:
    """Windows-only fallback capture through Pillow's ImageGrab.

    Useful when ``mss`` returns a black frame while the user's graphical
    session is still visible.

    Returns:
        ``(monitor, image, meta)``. On failure both ``monitor`` and ``image``
        are ``None`` and ``meta`` carries an ``error`` entry; on success
        ``meta`` carries the luminance stats of the grabbed image.
    """
    backend = "imagegrab"

    def _failure(reason: str):
        return None, None, {"backend": backend, "error": reason}

    if _SYSTEM != "Windows":
        return _failure("unsupported_platform")

    try:
        from PIL import ImageGrab
    except ImportError as exc:
        return _failure(str(exc))

    try:
        # NOTE(review): all_screens=True grabs every screen at once; the
        # monitor dict below always reports left=0/top=0 — confirm this is
        # acceptable for callers that rely on screen coordinates.
        shot = ImageGrab.grab(all_screens=True)
    except Exception as exc:
        logger.warning("ImageGrab indisponible pour le fallback capture : %s", exc)
        return _failure(str(exc))

    bounds = {"left": 0, "top": 0, "width": shot.width, "height": shot.height}
    details = {
        "backend": backend,
        "luma": _compute_luma_stats(shot),
    }
    return bounds, shot, details
def capture_screen_image(
    allow_secondary_fallback: bool = True,
) -> Tuple[Optional[Dict[str, int]], Optional[Image.Image], Dict[str, Any]]:
    """Grab the full screen, diagnosing black frames and falling back on Windows.

    The mss capture is tried first. If it is unavailable or practically
    black, the ImageGrab fallback is tried; a black fallback frame is
    rejected as well.

    Returns:
        ``(monitor, image, meta)``. ``image`` (and ``monitor``) are ``None``
        when no full-screen backend produced a usable picture; ``meta`` then
        records which backends failed and why.
    """
    monitor, img = _acquire_safe_grab(
        allow_secondary_fallback=allow_secondary_fallback
    )
    meta: Dict[str, Any] = {"backend": "mss"}

    if img is None:
        meta["mss_unavailable"] = True
    else:
        meta["luma"] = _compute_luma_stats(img)
        if not _is_effectively_black(img):
            return monitor, img, meta
        logger.warning(
            "Capture mss quasi noire (%s) — tentative de fallback",
            meta["luma"],
        )
        meta["mss_black_frame"] = True

    grab_monitor, grab_img, grab_meta = _capture_via_imagegrab()
    if grab_img is not None:
        if _is_effectively_black(grab_img):
            logger.warning(
                "Capture ImageGrab quasi noire (%s)",
                grab_meta.get("luma"),
            )
            meta["imagegrab_black_frame"] = True
        else:
            logger.warning(
                "Capture fallback via ImageGrab (%sx%s)",
                grab_img.width,
                grab_img.height,
            )
            return grab_monitor, grab_img, grab_meta

    # Both backends failed: report whatever error the fallback gave (if any).
    meta["imagegrab_error"] = grab_meta.get("error")
    return None, None, meta
def _capture_window_image_windows(
    hwnd: int,
    width: int,
    height: int,
) -> Tuple[Optional[Image.Image], Dict[str, Any]]:
    """Capture a Windows window through the Win32 PrintWindow API.

    Useful fallback when the full-screen capture is black but the active
    window can still be printed by the Win32 API.

    Args:
        hwnd: handle of the window to print.
        width: width of the bitmap to allocate.
        height: height of the bitmap to allocate.

    Returns:
        ``(image, meta)``. On success ``meta`` holds the backend name
        (``printwindow:<flag>``), the PrintWindow return value and the
        luminance stats. On failure ``image`` is ``None`` and ``meta`` holds
        an ``error`` entry (the last exception text, or ``no_usable_frame``).
    """
    if _SYSTEM != "Windows":
        return None, {"backend": "printwindow", "error": "unsupported_platform"}
    try:
        # Lazy imports: pywin32 is only needed on this code path.
        import ctypes
        import win32gui
        import win32ui
    except ImportError as exc:
        return None, {"backend": "printwindow", "error": str(exc)}
    last_error = None
    # PrintWindow flag values are tried in this order until one gives a usable
    # frame. NOTE(review): presumably 2 is PW_RENDERFULLCONTENT and 3 adds
    # PW_CLIENTONLY — confirm against the Win32 documentation.
    for flag in (3, 2, 0):
        # Handles are reset on every iteration so that `finally` only
        # releases what this iteration actually acquired.
        wnd_dc = None
        src_dc = None
        mem_dc = None
        bmp = None
        try:
            wnd_dc = win32gui.GetWindowDC(hwnd)
            if not wnd_dc:
                raise RuntimeError("GetWindowDC a retourné 0")
            src_dc = win32ui.CreateDCFromHandle(wnd_dc)
            mem_dc = src_dc.CreateCompatibleDC()
            bmp = win32ui.CreateBitmap()
            bmp.CreateCompatibleBitmap(src_dc, width, height)
            mem_dc.SelectObject(bmp)
            result = ctypes.windll.user32.PrintWindow(
                hwnd, mem_dc.GetSafeHdc(), flag
            )
            bits = bmp.GetBitmapBits(True)
            # The bitmap bytes are decoded with the "BGRX" raw mode.
            img = Image.frombuffer(
                "RGB", (width, height), bits, "raw", "BGRX", 0, 1
            )
            luma = _compute_luma_stats(img)
            # Accept the frame when PrintWindow reported success OR when the
            # picture is not black; otherwise fall through to the next flag.
            if result or not _is_effectively_black(img):
                return img, {
                    "backend": f"printwindow:{flag}",
                    "printwindow_result": int(result),
                    "luma": luma,
                }
        except Exception as exc:
            # Remember the failure and try the next flag.
            last_error = str(exc)
        finally:
            # Best-effort release of each GDI resource; every step is isolated
            # so one failing release does not prevent the following ones.
            try:
                if bmp is not None:
                    win32gui.DeleteObject(bmp.GetHandle())
            except Exception:
                pass
            try:
                if mem_dc is not None:
                    mem_dc.DeleteDC()
            except Exception:
                pass
            try:
                if src_dc is not None:
                    src_dc.DeleteDC()
            except Exception:
                pass
            try:
                if wnd_dc is not None:
                    win32gui.ReleaseDC(hwnd, wnd_dc)
            except Exception:
                pass
    # No flag produced a usable frame.
    return None, {
        "backend": "printwindow",
        "error": last_error or "no_usable_frame",
    }
def capture_foreground_window_image() -> Tuple[Optional[Image.Image], Dict[str, Any]]:
    """Capture the focused window through the native API when available.

    Returns:
        ``(image, meta)``. ``image`` is ``None`` when the active window
        cannot be resolved, has no handle or a non-positive size, or when the
        native capture itself fails; ``meta`` then carries an ``error`` entry.
        On success ``meta`` is extended with the window title, app name,
        rect, size and handle.
    """
    backend = "printwindow"
    try:
        from ..window_info_crossplatform import get_active_window_rect
        window = get_active_window_rect()
    except Exception as exc:
        return None, {"backend": backend, "error": str(exc)}

    if not window:
        return None, {"backend": backend, "error": "active_window_unavailable"}

    width, height = window.get("size", [0, 0])
    handle = window.get("hwnd")
    capturable = handle and width > 0 and height > 0
    if not capturable:
        return None, {
            "backend": backend,
            "error": "active_window_handle_unavailable",
            "title": window.get("title", "unknown_window"),
        }

    picture, details = _capture_window_image_windows(handle, width, height)
    if picture is None:
        return None, details

    details["title"] = window.get("title", "unknown_window")
    details["app_name"] = window.get("app_name", "unknown_app")
    details["rect"] = window.get("rect")
    details["window_size"] = window.get("size")
    details["hwnd"] = handle
    return picture, details
class VisionCapturer:
    """Takes screenshots for one recording session and stores them on disk.

    Images are written as PNG files under ``<session_dir>/shots``. The
    instance remembers a quick hash of the last stored full-screen image
    (``last_img_hash``) so unchanged heartbeat captures can be skipped.
    """

    def __init__(self, session_dir: str):
        """Create the ``shots`` directory under *session_dir* if needed."""
        self.session_dir = session_dir
        self.shots_dir = os.path.join(session_dir, "shots")
        os.makedirs(self.shots_dir, exist_ok=True)
        # No long-lived self.sct here: mss is not thread-safe on Windows.
        self.last_img_hash = None

    def capture_full_context(self, name_suffix: str, force: bool = False) -> str:
        """Capture the whole screen and save it as a context screenshot.

        When ``force`` is False the capture is skipped if the screen has not
        changed since the last stored image. If no full-screen backend
        works, the focused window is captured instead (degraded mode).

        Args:
            name_suffix: suffix appended to the generated file name.
            force: when True, save even if the screen did not change.

        Returns:
            Path of the saved PNG, or ``""`` when nothing was saved
            (unchanged screen, no usable capture, or any error).
        """
        try:
            _monitor, img, meta = capture_screen_image()
            if img is None:
                img, win_meta = capture_foreground_window_image()
                if img is None:
                    logger.error(
                        "Capture plein contexte indisponible (meta=%s, window=%s)",
                        meta,
                        win_meta,
                    )
                    return ""
                logger.warning(
                    "Capture plein contexte dégradée via fenêtre active (%s)",
                    win_meta.get("backend"),
                )
            # Change detection (for heartbeats)
            if not force:
                current_hash = self._compute_quick_hash(img)
                if current_hash == self.last_img_hash:
                    return ""  # No change: save bandwidth
                self.last_img_hash = current_hash
            # Blur sensitive data (AI Act compliance).
            # NOTE(review): the return value is ignored, so this presumably
            # blurs the image in place — confirm in blur_sensitive.
            if BLUR_SENSITIVE:
                blur_sensitive_regions(img)
            path = os.path.join(self.shots_dir, f"context_{int(time.time())}_{name_suffix}.png")
            img.save(path, "PNG", quality=SCREENSHOT_QUALITY)
            return path
        except Exception as e:
            logger.error(f"Erreur Context Capture: {e}")
            return ""

    def get_active_window_title(self) -> str:
        """Return the active window title (used to enrich heartbeats).

        Graceful fallback: returns an empty string when unavailable.
        """
        try:
            from ..window_info_crossplatform import get_active_window_info
            info = get_active_window_info()
            return info.get("title", "")
        except Exception:
            return ""

    def capture_dual(self, x: int, y: int, screenshot_id: str, anonymize: bool = False) -> dict:
        """Capture full screen + targeted crop + active window for a click.

        The active-window capture is an ADDITION: if it fails, the full
        screenshot and the crop are still returned. If the full-screen
        capture itself fails, only the active window is attempted.

        Args:
            x, y: click coordinates in screen pixels.
            screenshot_id: identifier used as the file-name prefix.
            anonymize: when True, the crop is Gaussian-blurred.

        Returns:
            Dict with ``full`` and ``crop`` paths, optionally
            ``window_capture``, plus monitor info. In degraded mode only
            ``window_capture`` and monitor info are present. Empty dict on
            failure.
        """
        try:
            # (x, y) are in the composite screen space; cropping from a
            # secondary monitor (offset != 0) would give a healthy but
            # shifted image -> fail-closed on the secondary fallback.
            _monitor, img, meta = capture_screen_image(
                allow_secondary_fallback=False
            )
            if img is None:
                window_info = self.capture_active_window(
                    x, y, screenshot_id, full_img=None
                )
                if window_info:
                    result = {"window_capture": window_info}
                    _enrich_with_monitor_info(result)
                    logger.warning(
                        "capture_dual dégradée: fenêtre active seule (%s)",
                        meta,
                    )
                    return result
                return {}
            full_path = os.path.join(self.shots_dir, f"{screenshot_id}_full.png")
            # Crop around the click (core of the qwen3-vl training data)
            crop_path = os.path.join(self.shots_dir, f"{screenshot_id}_crop.png")
            w, h = TARGETED_CROP_SIZE
            left = max(0, x - w // 2)
            top = max(0, y - h // 2)
            crop_img = img.crop((left, top, left + w, top + h))
            if anonymize:
                crop_img = crop_img.filter(ImageFilter.GaussianBlur(radius=4))
            # Blur sensitive data (AI Act compliance)
            if BLUR_SENSITIVE:
                blur_sensitive_regions(img)
                blur_sensitive_regions(crop_img)
            img.save(full_path, "PNG", quality=SCREENSHOT_QUALITY)
            crop_img.save(crop_path, "PNG", quality=SCREENSHOT_QUALITY)
            # Refresh the hash for the next heartbeat
            self.last_img_hash = self._compute_quick_hash(img)
            result = {"full": full_path, "crop": crop_path}
            # --- Active window capture ---
            # Non-blocking addition: enriches the result with the image of
            # the window alone + metadata (title, rect, relative click)
            window_info = self.capture_active_window(x, y, screenshot_id, full_img=img)
            if window_info:
                result["window_capture"] = window_info
            # QW1 — multi-monitor enrichment (additive, graceful fallback)
            _enrich_with_monitor_info(result)
            return result
        except Exception as e:
            logger.error(f"Erreur Dual Capture: {e}")
            return {}

    def capture_active_window(
        self,
        x: int,
        y: int,
        screenshot_id: str,
        full_img: Optional[Image.Image] = None,
    ) -> Optional[Dict[str, Any]]:
        """Capture the image of the active window alone, plus metadata.

        Strategy:
          1. Get the window rectangle from the OS API.
          2. Crop it out of the full-screen screenshot; fall back to the
             native window capture when that screenshot is missing, black,
             or does not intersect the window (and a handle is known).
          3. Compute the click coordinates relative to the window.

        Args:
            x, y: click coordinates in screen pixels.
            screenshot_id: identifier used as the file-name prefix.
            full_img: already captured full-screen image (optional; avoids
                a second capture when called from ``capture_dual``).

        Returns:
            Dict with ``window_image``, ``window_title``, ``app_name``,
            ``window_rect``, ``window_size``, ``click_in_window``,
            ``click_inside_window`` and monitor info, or ``None`` when the
            window cannot be found, is smaller than 50x50, or cannot be
            captured. ``click_inside_window`` uses half-open bounds: a
            relative coordinate equal to the window width/height is outside.
        """
        try:
            from ..window_info_crossplatform import get_active_window_rect
            rect_info = get_active_window_rect()
            if not rect_info:
                logger.debug("Fenêtre active introuvable — skip capture fenêtre")
                return None
            win_rect = rect_info["rect"]  # [left, top, right, bottom]
            win_left, win_top, win_right, win_bottom = win_rect
            win_w, win_h = rect_info["size"]  # [width, height]
            title = rect_info.get("title", "unknown_window")
            app_name = rect_info.get("app_name", "unknown_app")
            # Ignore tiny windows (taskbars, system popups)
            if win_w < 50 or win_h < 50:
                logger.debug(f"Fenêtre trop petite ({win_w}x{win_h}) — skip")
                return None
            # Click coordinates relative to the window's top-left corner
            click_rel_x = x - win_left
            click_rel_y = y - win_top
            # A click outside the window is flagged, but the capture goes on.
            # Half-open test: a window of width W covers relative columns
            # 0..W-1, so rel == W is already one pixel past the edge.
            click_inside = (0 <= click_rel_x < win_w and 0 <= click_rel_y < win_h)
            window_img = None
            # --- Crop the window out of the full-screen image ---
            if full_img is None:
                # No screenshot supplied: take one (standalone use).
                # win_rect is in global coordinates; cropping from a
                # secondary monitor would give a shifted image ->
                # fail-closed on the secondary fallback.
                try:
                    _monitor, full_img, _meta = capture_screen_image(
                        allow_secondary_fallback=False
                    )
                except Exception as e:
                    logger.error(f"Erreur capture plein écran pour fenêtre : {e}")
                    full_img = None
            if full_img is not None and not _is_effectively_black(full_img):
                img_w, img_h = full_img.size
                # Clamp the window rectangle to the image bounds
                crop_left = max(0, win_left)
                crop_top = max(0, win_top)
                crop_right = min(img_w, win_right)
                crop_bottom = min(img_h, win_bottom)
                if crop_right > crop_left and crop_bottom > crop_top:
                    window_img = full_img.crop(
                        (crop_left, crop_top, crop_right, crop_bottom)
                    )
                else:
                    logger.debug("Fenêtre hors écran — fallback natif si possible")
            elif full_img is not None:
                logger.warning(
                    "capture_active_window: screenshot plein écran noir, fallback natif"
                )
            # Native fallback, only possible when a window handle is known
            if window_img is None and rect_info.get("hwnd"):
                window_img, native_meta = _capture_window_image_windows(
                    rect_info["hwnd"], win_w, win_h
                )
                if window_img is not None:
                    logger.warning(
                        "capture_active_window via fallback natif (%s)",
                        native_meta.get("backend"),
                    )
            if window_img is None:
                logger.debug("Fenêtre hors écran ou capture native indisponible")
                return None
            # Blur sensitive data (AI Act compliance)
            if BLUR_SENSITIVE:
                blur_sensitive_regions(window_img)
            # Save to disk
            window_path = os.path.join(
                self.shots_dir, f"{screenshot_id}_window.png"
            )
            window_img.save(window_path, "PNG", quality=SCREENSHOT_QUALITY)
            result = {
                "window_image": window_path,
                "window_title": title,
                "app_name": app_name,
                "window_rect": win_rect,
                "window_size": [win_w, win_h],
                "click_in_window": [click_rel_x, click_rel_y],
                "click_inside_window": click_inside,
            }
            # QW1 — multi-monitor enrichment (additive)
            _enrich_with_monitor_info(result)
            logger.debug(
                f"Fenêtre capturée : {title} ({win_w}x{win_h}) — "
                f"clic relatif ({click_rel_x}, {click_rel_y})"
            )
            return result
        except ImportError as e:
            logger.debug(f"Module fenêtre indisponible : {e}")
            return None
        except Exception as e:
            logger.error(f"Erreur capture fenêtre active : {e}")
            return None

    def _compute_quick_hash(self, img: Image.Image) -> str:
        """Return a fast change-detection hash of *img*.

        The image is shrunk to a 64x64 grayscale thumbnail and the MD5 hex
        digest of its bytes is returned. The hash is only used to compare
        successive captures, not for security.
        """
        # Shrinking to 64x64 compares coarse color masses (very fast)
        small_img = img.resize((64, 64), Image.NEAREST).convert("L")
        return hashlib.md5(small_img.tobytes()).hexdigest()