Files
rpa_vision_v3/agent_v0/server_v1/resolve_engine.py
Dom b92cb9db03 feat: Phase 1 apprentissage — greffe TargetMemoryStore sur V4
Greffe minimale du mécanisme d'apprentissage persistant (Fiche #18,
target_memory_store.py) sur le pipeline streaming V4 sans toucher à V3.

Architecture (docs/PLAN_APPRENTISSAGE_LEA.md) :
- Lookup mémoire AVANT la cascade résolution coûteuse OCR/template/VLM
  dans _resolve_target_sync → hit = <10ms, miss = overhead zéro
- Record APRÈS validation post-condition (title_match strict)
  dans /replay/result → 2 succès → cristallisation par répétition
- Single source of truth : l'agent remplit report.actual_position avec
  les coords effectivement cliquées, le serveur les lit directement.
  Pas de cache intermédiaire (option C du plan).

Signature écran V4 : sha256(normalize(window_title))[:16]. Robuste aux
données variables, faux positifs rattrapés par le post-cond qui
décrémente la fiabilité via record_failure().

Fichiers :
- agent_v0/server_v1/replay_memory.py : nouveau wrapper 316 lignes
  exposant compute_screen_sig/memory_lookup/record_success/failure,
  lazy-init du store, normalisation texte stable, garde sanity coords
- agent_v0/server_v1/resolve_engine.py : lookup mémoire en tête de
  _resolve_target_sync (30 lignes)
- agent_v0/server_v1/replay_engine.py : _create_replay_state stocke
  une copie slim des actions (sans anchor base64) pour retrouver le
  target_spec par current_action_index
- agent_v0/server_v1/api_stream.py : 4 callers passent actions=...,
  record success/failure dans /replay/result lit actual_position
  du rapport (click-only), correction du commentaire Pydantic
- agent_v0/agent_v1/core/executor.py : remplit result["actual_position"]
  après self._click(), transmis dans le report de poll_and_execute

Tests : 56 E2E + Phase0 passent, zéro régression. Cycle Phase 1 validé
en simulation : miss → record → miss → record → HIT au 3ème passage.

Le deploy copy executor.py a une divergence pré-existante de 1302
lignes non committées — traité séparément lors du cleanup prochain.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 21:08:14 +02:00

2214 lines
85 KiB
Python

# agent_v0/server_v1/resolve_engine.py
"""
Résolution visuelle des cibles UI pour le replay.
Contient toutes les stratégies de résolution :
- Template matching OpenCV (~100ms)
- YOLO/OmniParser (~0.6-0.8s)
- VLM Quick Find (~3-8s)
- VLM Grounding Direct (~5-15s)
- SomEngine + VLM (~5-15s)
- Matching sémantique ScreenAnalyzer (~15-20s)
- Pré-analyse écran (Observer — popup detection)
Extrait de api_stream.py pour clarifier l'architecture.
"""
import base64
import io
import logging
import os
import re
import tempfile
import threading
import time
from typing import Any, Dict, List, Optional
from pydantic import BaseModel
logger = logging.getLogger("api_stream")
# =========================================================================
# Modèles Pydantic
# =========================================================================
class ResolveTargetRequest(BaseModel):
"""Requête de résolution visuelle d'une cible."""
session_id: str
screenshot_b64: str # Screenshot JPEG en base64
target_spec: Dict[str, Any] # {by_role, by_text, by_position, ...}
fallback_x_pct: float = 0.0 # Coordonnées de fallback
fallback_y_pct: float = 0.0
screen_width: int = 1920
screen_height: int = 1080
strict_mode: bool = False # True pour replay sessions (seuil template 0.90 + YOLO)
class PreAnalyzeRequest(BaseModel):
"""Requête de pré-analyse écran (Observer)."""
screenshot_b64: str
expected_state: str = "" # Description attendue de l'état écran
window_title: str = "" # Titre fenêtre attendu
screen_width: int = 1920
screen_height: int = 1080
# =========================================================================
# Template Matching
# =========================================================================
def _resolve_by_template_matching(
screenshot_path: str,
anchor_image_b64: str,
screen_width: int,
screen_height: int,
confidence_threshold: float = 0.7,
) -> Optional[Dict[str, Any]]:
"""Résoudre la position d'une ancre par template matching OpenCV.
Compare l'image de l'ancre (crop) avec le screenshot actuel pour trouver
la meilleure correspondance. Utilise cv2.matchTemplate avec TM_CCOEFF_NORMED.
Args:
screenshot_path: Chemin du screenshot de l'écran actuel
anchor_image_b64: Image de l'ancre encodée en base64 (PNG)
screen_width: Largeur de l'écran en pixels
screen_height: Hauteur de l'écran en pixels
confidence_threshold: Seuil minimum de confiance (0.0 à 1.0)
Returns:
Dict avec resolved=True et coordonnées, ou None si pas de match
"""
try:
import cv2
import numpy as np
except ImportError:
logger.warning("OpenCV non disponible pour template matching")
return None
try:
# Charger le screenshot
screenshot = cv2.imread(screenshot_path)
if screenshot is None:
logger.warning("Impossible de lire le screenshot : %s", screenshot_path)
return None
# Décoder l'image de l'ancre depuis base64
anchor_bytes = base64.b64decode(anchor_image_b64)
anchor_array = np.frombuffer(anchor_bytes, dtype=np.uint8)
anchor_img = cv2.imdecode(anchor_array, cv2.IMREAD_COLOR)
if anchor_img is None:
logger.warning("Impossible de décoder l'image de l'ancre")
return None
# Convertir en niveaux de gris pour le matching
screenshot_gray = cv2.cvtColor(screenshot, cv2.COLOR_BGR2GRAY)
anchor_gray = cv2.cvtColor(anchor_img, cv2.COLOR_BGR2GRAY)
# Vérifier que l'ancre n'est pas plus grande que le screenshot
sh, sw = screenshot_gray.shape[:2]
ah, aw = anchor_gray.shape[:2]
if ah > sh or aw > sw:
logger.warning(
"Ancre (%dx%d) plus grande que le screenshot (%dx%d)",
aw, ah, sw, sh,
)
return None
# Template matching multi-échelle : essayer l'échelle 1.0 d'abord,
# puis quelques variations si la résolution a changé.
# Plage étendue 0.5x-2.0x pour couvrir les écarts importants
# (ex: apprentissage 2560x1600 → replay 1280x720 = ratio ~0.5x)
best_val = -1.0
best_loc = None
best_scale = 1.0
best_anchor_size = (aw, ah)
for scale in [1.0, 0.9, 1.1, 0.8, 1.2, 0.75, 1.25, 0.6, 1.5, 0.5, 1.75, 2.0]:
if scale != 1.0:
new_w = int(aw * scale)
new_h = int(ah * scale)
if new_w < 10 or new_h < 10 or new_w > sw or new_h > sh:
continue
scaled_anchor = cv2.resize(anchor_gray, (new_w, new_h))
else:
scaled_anchor = anchor_gray
new_w, new_h = aw, ah
result = cv2.matchTemplate(screenshot_gray, scaled_anchor, cv2.TM_CCOEFF_NORMED)
_, max_val, _, max_loc = cv2.minMaxLoc(result)
if max_val > best_val:
best_val = max_val
best_loc = max_loc
best_scale = scale
best_anchor_size = (new_w, new_h)
# Si on a un très bon match, pas besoin de continuer
if best_val >= 0.95:
break
if best_val < confidence_threshold:
logger.info(
"Template matching : meilleur score=%.3f < seuil=%.3f (ancre %dx%d, écran %dx%d)",
best_val, confidence_threshold, aw, ah, sw, sh,
)
return None
# Calculer le centre du match
match_w, match_h = best_anchor_size
cx = best_loc[0] + match_w / 2.0
cy = best_loc[1] + match_h / 2.0
# Convertir en proportions normalisées
x_pct = round(cx / sw, 6) if sw > 0 else 0.0
y_pct = round(cy / sh, 6) if sh > 0 else 0.0
logger.info(
"Template matching OK : score=%.3f, échelle=%.2f, "
"centre=(%d, %d) → (%.4f, %.4f) sur %dx%d",
best_val, best_scale, int(cx), int(cy), x_pct, y_pct, sw, sh,
)
return {
"resolved": True,
"method": "template_matching",
"x_pct": x_pct,
"y_pct": y_pct,
"matched_element": {
"label": f"anchor_template",
"type": "visual_anchor",
"role": "anchor",
"center": [int(cx), int(cy)],
"confidence": best_val,
},
"score": best_val,
"scale": best_scale,
"match_box": {
"x": best_loc[0],
"y": best_loc[1],
"width": match_w,
"height": match_h,
},
}
except Exception as e:
logger.error("Erreur template matching : %s", e)
return None
def _validate_match_context(
result: Dict[str, Any],
original_x_pct: float,
original_y_pct: float,
target_spec: Dict[str, Any],
max_distance: float = 0.35,
) -> bool:
"""Vérifier que la position trouvée est dans la même zone que l'originale.
Évite les faux positifs du template matching : un bouton similaire visuellement
mais situé dans une zone très différente de l'écran.
Args:
result: Résultat du template matching (contient x_pct, y_pct).
original_x_pct: Position X originale (pourcentage, 0.0-1.0).
original_y_pct: Position Y originale (pourcentage, 0.0-1.0).
target_spec: Spécification de la cible (non utilisé pour l'instant,
mais disponible pour des règles contextuelles futures).
max_distance: Distance euclidienne maximum acceptée (en pourcentage de l'écran).
Défaut 0.35 = ~35% de la diagonale, assez permissif pour les UI dynamiques.
Returns:
True si la position est valide (même zone), False sinon.
"""
found_x = result.get("x_pct", 0.0)
found_y = result.get("y_pct", 0.0)
# Distance euclidienne en pourcentage de l'écran
dx = found_x - original_x_pct
dy = found_y - original_y_pct
distance = (dx ** 2 + dy ** 2) ** 0.5
if distance > max_distance:
logger.debug(
"Context validation : distance=%.3f > max=%.3f "
"(found=(%.3f, %.3f), original=(%.3f, %.3f))",
distance, max_distance, found_x, found_y, original_x_pct, original_y_pct,
)
return False
return True
# =========================================================================
# YOLO/OmniParser — Résolution par détection d'éléments UI
# =========================================================================
# Chargement paresseux d'OmniParser (singleton, GPU)
_omniparser_available: Optional[bool] = None # None = pas encore vérifié
_omniparser_instance = None
_omniparser_lock = threading.Lock()
def _get_omniparser():
"""Obtenir l'instance OmniParser (lazy loading, thread-safe).
Returns:
OmniParserAdapter ou None si non disponible.
"""
global _omniparser_available, _omniparser_instance
if _omniparser_available is False:
return None
if _omniparser_instance is not None:
return _omniparser_instance
with _omniparser_lock:
if _omniparser_available is False:
return None
if _omniparser_instance is not None:
return _omniparser_instance
try:
from core.detection.omniparser_adapter import OmniParserAdapter
adapter = OmniParserAdapter()
if adapter.available:
_omniparser_instance = adapter
_omniparser_available = True
logger.info("OmniParser disponible pour la résolution YOLO")
return adapter
else:
_omniparser_available = False
logger.info("OmniParser : modèles non trouvés, YOLO désactivé")
return None
except ImportError:
_omniparser_available = False
logger.info("OmniParser non installé, YOLO désactivé")
return None
except Exception as e:
_omniparser_available = False
logger.warning("OmniParser init échouée : %s", e)
return None
def _resolve_by_yolo(
screenshot_path: str,
anchor_image_b64: str,
screen_width: int,
screen_height: int,
target_spec: Dict[str, Any],
) -> Optional[Dict[str, Any]]:
"""Résolution via YOLO/OmniParser : détecte tous les éléments UI
puis matche le crop de référence contre les éléments détectés.
Stratégie :
1. OmniParser détecte tous les éléments UI du screenshot (~0.6-0.8s)
2. Pour chaque élément détecté, template matching local contre l'anchor
3. Si 1 seul bon match (score >= 0.50) → accepter
4. Si 2+ matchs ambigus → retourner None (le VLM tranchera)
Args:
screenshot_path: Chemin vers le screenshot JPEG
anchor_image_b64: Image de l'anchor encodée en base64
screen_width: Largeur de l'écran
screen_height: Hauteur de l'écran
target_spec: Spécification de la cible
Returns:
Dict avec resolved=True/False, x_pct, y_pct, score
ou None si OmniParser pas disponible ou aucun match
"""
try:
import cv2
import numpy as np
except ImportError:
return None
omniparser = _get_omniparser()
if omniparser is None:
return None
t0 = time.time()
try:
from PIL import Image as PILImage
# Charger le screenshot en PIL
screenshot_pil = PILImage.open(screenshot_path)
sw, sh = screenshot_pil.size
# Charger le screenshot en numpy/OpenCV pour le template matching
screenshot_np = np.array(screenshot_pil)
if len(screenshot_np.shape) == 3 and screenshot_np.shape[2] == 3:
# PIL est RGB, convertir en BGR pour OpenCV
screenshot_bgr = cv2.cvtColor(screenshot_np, cv2.COLOR_RGB2BGR)
else:
screenshot_bgr = screenshot_np
screenshot_gray = cv2.cvtColor(screenshot_bgr, cv2.COLOR_BGR2GRAY)
# Décoder l'anchor depuis base64
anchor_bytes = base64.b64decode(anchor_image_b64)
anchor_array = np.frombuffer(anchor_bytes, dtype=np.uint8)
anchor_img = cv2.imdecode(anchor_array, cv2.IMREAD_COLOR)
if anchor_img is None:
logger.warning("YOLO resolve : impossible de décoder l'anchor")
return None
anchor_gray = cv2.cvtColor(anchor_img, cv2.COLOR_BGR2GRAY)
anchor_h, anchor_w = anchor_gray.shape[:2]
# Détecter tous les éléments UI avec OmniParser
elements = omniparser.detect(screenshot_pil)
if not elements:
elapsed = time.time() - t0
logger.info("YOLO resolve : 0 éléments détectés (%.1fs)", elapsed)
return None
logger.info(
"YOLO resolve : %d éléments détectés, matching anchor %dx%d...",
len(elements), anchor_w, anchor_h,
)
# Matcher l'anchor contre chaque élément détecté
YOLO_MATCH_THRESHOLD = 0.50
matches = []
for elem in elements:
x1, y1, x2, y2 = elem.bbox
elem_w = x2 - x1
elem_h = y2 - y1
# Ignorer les éléments trop petits
if elem_w < 5 or elem_h < 5:
continue
# Extraire le crop de l'élément depuis le screenshot
elem_crop = screenshot_gray[y1:y2, x1:x2]
if elem_crop.size == 0:
continue
# Template matching local : resize anchor pour matcher la taille de l'élément
# ou inversement, selon les dimensions relatives
try:
# Approche : resize l'anchor à la taille du crop et comparer
if elem_w > 0 and elem_h > 0:
anchor_resized = cv2.resize(anchor_gray, (elem_w, elem_h))
result = cv2.matchTemplate(
elem_crop, anchor_resized, cv2.TM_CCOEFF_NORMED
)
_, max_val, _, _ = cv2.minMaxLoc(result)
else:
continue
# Aussi essayer le crop à la taille de l'anchor si c'est plus grand
if elem_w >= anchor_w and elem_h >= anchor_h:
result2 = cv2.matchTemplate(
elem_crop, anchor_gray, cv2.TM_CCOEFF_NORMED
)
_, max_val2, _, _ = cv2.minMaxLoc(result2)
max_val = max(max_val, max_val2)
if max_val >= YOLO_MATCH_THRESHOLD:
matches.append((elem, max_val))
except cv2.error:
continue
elapsed = time.time() - t0
if not matches:
logger.info(
"YOLO resolve : aucun match >= %.2f parmi %d éléments (%.1fs)",
YOLO_MATCH_THRESHOLD, len(elements), elapsed,
)
return None
# Trier par score décroissant
matches.sort(key=lambda m: m[1], reverse=True)
best_elem, best_score = matches[0]
# Si 2+ matchs avec des scores proches (< 0.10 d'écart), c'est ambigu
# → laisser le VLM trancher
if len(matches) >= 2:
second_score = matches[1][1]
if best_score - second_score < 0.10:
logger.info(
"YOLO resolve : %d matchs ambigus (best=%.3f, second=%.3f, "
"écart=%.3f < 0.10), VLM requis (%.1fs)",
len(matches), best_score, second_score,
best_score - second_score, elapsed,
)
return None
# 1 seul match clair → accepter
cx, cy = best_elem.center
x_pct = round(cx / sw, 6) if sw > 0 else 0.0
y_pct = round(cy / sh, 6) if sh > 0 else 0.0
logger.info(
"YOLO resolve OK : '%s' (%s) score=%.3f → (%.4f, %.4f) "
"parmi %d éléments, %d matchs (%.1fs)",
best_elem.label, best_elem.element_type, best_score,
x_pct, y_pct, len(elements), len(matches), elapsed,
)
return {
"resolved": True,
"method": "yolo_omniparser",
"x_pct": x_pct,
"y_pct": y_pct,
"matched_element": {
"label": best_elem.label,
"type": best_elem.element_type,
"role": "yolo_detected",
"center": [cx, cy],
"confidence": best_score,
},
"score": best_score,
"yolo_elements_count": len(elements),
"yolo_matches_count": len(matches),
}
except Exception as e:
elapsed = time.time() - t0
logger.warning("YOLO resolve : exception (%.1fs) — %s", elapsed, e)
return None
# =========================================================================
# VLM Quick Find — Fallback léger quand le template matching échoue
# =========================================================================
# Client Ollama singleton (initialisé au premier appel, pas au démarrage)
_vlm_client = None
_vlm_client_lock = threading.Lock()
# Timeout dédié pour le VLM Quick Find (plus court que le timeout par défaut)
_VLM_QUICK_FIND_TIMEOUT = 30 # secondes
def _get_vlm_client():
"""Obtenir ou créer le client Ollama singleton pour le VLM Quick Find.
Initialisation paresseuse : le client n'est créé qu'au premier appel,
pas au démarrage du serveur (évite de bloquer si Ollama est down).
Le modèle est résolu automatiquement via vlm_config (RPA_VLM_MODEL).
"""
global _vlm_client
if _vlm_client is not None:
return _vlm_client
with _vlm_client_lock:
if _vlm_client is not None:
return _vlm_client
try:
from core.detection.ollama_client import OllamaClient
from core.detection.vlm_config import get_vlm_model
_model = get_vlm_model()
_vlm_client = OllamaClient(
endpoint="http://localhost:11434",
model=_model,
timeout=_VLM_QUICK_FIND_TIMEOUT,
)
logger.info("VLM Quick Find : client Ollama initialisé (%s)", _model)
except Exception as e:
logger.warning(f"VLM Quick Find : impossible d'initialiser le client Ollama : {e}")
return None
return _vlm_client
def _build_target_description(target_spec: Dict[str, Any]) -> str:
"""Construire une description textuelle de l'élément à trouver.
Utilisé par le VLM Quick Find pour savoir quoi chercher sur le screenshot.
Args:
target_spec: Spécification de la cible (by_text, by_role, etc.)
Returns:
Description en langage naturel, ex: "un bouton contenant 'Valider'"
"""
by_text = target_spec.get("by_text", "").strip()
by_role = target_spec.get("by_role", "").strip()
if by_text and by_role:
return f"un {by_role} contenant '{by_text}'"
elif by_text:
return f"élément contenant le texte '{by_text}'"
elif by_role:
return f"un {by_role}"
else:
return "l'élément interactif principal"
def _vlm_quick_find(
screenshot_path: str,
target_description: str,
anchor_image_b64: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""Demander au VLM de localiser un élément sur le screenshot.
Stratégie VLM-first pour le replay : le VLM comprend le contexte
de l'écran et peut trouver un élément même si l'apparence a changé.
Modes de fonctionnement :
- Avec anchor_image_b64 + description : multi-image (screenshot + crop de référence).
Le VLM voit le screenshot ET le crop, ce qui est beaucoup plus précis.
- Avec description seule : single-image, le VLM cherche par la description textuelle.
- Avec anchor_image_b64 seule (pas de description) : multi-image avec prompt visuel pur.
Args:
screenshot_path: Chemin du screenshot actuel
target_description: Description riche de l'élément à trouver.
Ex: "Dans la fenêtre 'Exécuter', l'élément cliqué en bas au centre"
anchor_image_b64: Image de référence (crop) en base64 (optionnel).
Si fourni, envoyé comme seconde image au VLM pour comparaison visuelle.
Returns:
{"x_pct": float, "y_pct": float, "confidence": float, "method": "vlm_quick_find"}
ou None si l'élément n'est pas trouvé ou en cas d'erreur
"""
client = _get_vlm_client()
if client is None:
logger.debug("VLM Quick Find : client Ollama non disponible, skip")
return None
t0 = time.time()
# Construire le prompt adapté selon les informations disponibles
has_anchor = bool(anchor_image_b64)
has_description = bool(target_description and target_description.strip())
if has_anchor and has_description:
# Mode optimal : screenshot + crop de référence + description textuelle
prompt = (
"The first image is the current screen. "
"The second image shows the element I want to click.\n\n"
f"Context: {target_description}\n\n"
"Find this exact element on the screen and return its CENTER coordinates "
"as percentage of the screen dimensions.\n"
'Return ONLY a JSON object: {"x_pct": 0.XX, "y_pct": 0.XX, "confidence": 0.XX}\n'
'If the element is not visible, return: {"x_pct": null, "y_pct": null, "confidence": 0.0}'
)
elif has_anchor:
# Mode visuel pur : screenshot + crop, pas de description
prompt = (
"The first image is the current screen. "
"The second image shows the element I want to click.\n\n"
"Find this exact element on the screen and return its CENTER coordinates "
"as percentage of the screen dimensions.\n"
'Return ONLY a JSON object: {"x_pct": 0.XX, "y_pct": 0.XX, "confidence": 0.XX}\n'
'If the element is not visible, return: {"x_pct": null, "y_pct": null, "confidence": 0.0}'
)
else:
# Mode description seule
prompt = (
"Look at this screenshot carefully.\n\n"
f"{target_description}\n\n"
"Find this element and return its CENTER coordinates "
"as percentage of the image dimensions.\n"
'Return ONLY a JSON object: {"x_pct": 0.XX, "y_pct": 0.XX, "confidence": 0.XX}\n'
'If the element is not visible, return: {"x_pct": null, "y_pct": null, "confidence": 0.0}'
)
system_prompt = "You are a UI element locator. Output raw JSON only. No explanation."
try:
# Préparer les images supplémentaires (anchor crop)
extra_images = [anchor_image_b64] if has_anchor else None
result = client.generate(
prompt=prompt,
image_path=screenshot_path,
system_prompt=system_prompt,
temperature=0.1,
max_tokens=200,
force_json=False,
extra_images_b64=extra_images,
)
elapsed = time.time() - t0
if not result.get("success"):
logger.info(
"VLM Quick Find : échec appel VLM (%.1fs) — %s",
elapsed, result.get("error", "?"),
)
return None
response_text = result.get("response", "").strip()
if not response_text:
logger.info("VLM Quick Find : réponse vide du VLM (%.1fs)", elapsed)
return None
# Parser la réponse JSON (réutiliser le parser robuste d'OllamaClient)
parsed = client._extract_json_from_response(response_text)
if parsed is None:
logger.info(
"VLM Quick Find : réponse non-JSON (%.1fs) — %.80s",
elapsed, response_text,
)
return None
# Valider les coordonnées
x_pct = parsed.get("x_pct")
y_pct = parsed.get("y_pct")
confidence = float(parsed.get("confidence", 0.0))
if x_pct is None or y_pct is None or confidence < 0.3:
logger.info(
"VLM Quick Find : élément non trouvé ou confiance trop basse "
"(%.1fs, confidence=%.2f) pour '%s'",
elapsed, confidence,
target_description[:80] if target_description else "(anchor only)",
)
return None
x_pct = float(x_pct)
y_pct = float(y_pct)
# Vérifier que les coordonnées sont dans les bornes [0, 1]
if not (0.0 <= x_pct <= 1.0 and 0.0 <= y_pct <= 1.0):
logger.info(
"VLM Quick Find : coordonnées hors bornes (%.4f, %.4f), ignoré",
x_pct, y_pct,
)
return None
mode_str = "multi-image" if has_anchor else "description"
desc_short = (target_description[:60] + "...") if target_description and len(target_description) > 60 else (target_description or "(anchor)")
logger.info(
"VLM Quick Find OK [%s] : '%s' → (%.4f, %.4f) confidence=%.2f en %.1fs",
mode_str, desc_short, x_pct, y_pct, confidence, elapsed,
)
return {
"resolved": True,
"method": "vlm_quick_find",
"x_pct": round(x_pct, 6),
"y_pct": round(y_pct, 6),
"matched_element": {
"label": target_description or "anchor_visual",
"type": "vlm_located",
"role": "vlm_quick_find",
"confidence": confidence,
},
"score": confidence,
}
except Exception as e:
elapsed = time.time() - t0
logger.warning(
"VLM Quick Find : exception (%.1fs) — %s", elapsed, e,
)
return None
# ---------------------------------------------------------------------------
# Résolution par VLM Grounding Direct (configurable via RPA_VLM_MODEL)
# ---------------------------------------------------------------------------
def _resolve_by_grounding(
screenshot_path: str,
target_spec: Dict[str, Any],
screen_width: int,
screen_height: int,
) -> Optional[Dict[str, Any]]:
"""Résoudre une cible via grounding VLM direct.
Le modèle VLM (gemma4:e4b par défaut, configurable via RPA_VLM_MODEL)
reçoit le screenshot + une description textuelle et retourne
directement les coordonnées de l'élément. Pas de SomEngine,
pas de numérotation — le VLM fait du grounding UI natif.
Approche plus fiable que SomEngine+VLM pour les icônes et éléments
visuels sans texte (logo Windows, disquette, bouton fermer).
"""
t0 = time.time()
# Construire la description de la cible
by_text = target_spec.get("by_text", "").strip()
vlm_desc = target_spec.get("vlm_description", "").strip()
window_title = target_spec.get("window_title", "").strip()
if by_text:
description = by_text
elif vlm_desc:
description = vlm_desc
else:
return None
# Utiliser la capture fenêtre si disponible (plus ciblée, moins de bruit)
# Sinon fallback sur le full screen
window_capture = target_spec.get("window_capture", {})
window_rect = window_capture.get("rect") # [x1, y1, x2, y2] écran
try:
from PIL import Image as PILImage
from pathlib import Path
# Utiliser la fenêtre active : cropper depuis le screenshot full
# via window_rect (fonctionne au replay comme à l'enregistrement)
img = PILImage.open(screenshot_path)
if window_rect:
x1, y1, x2, y2 = window_rect
img = img.crop((x1, y1, x2, y2))
using_window = True
logger.debug("Grounding : crop fenêtre (%d,%d,%d,%d) → %dx%d", x1, y1, x2, y2, *img.size)
else:
using_window = False
orig_w, orig_h = img.size
small_w, small_h = orig_w, orig_h # pas de redimensionnement
buf = io.BytesIO()
img.save(buf, format="JPEG", quality=80)
shot_b64 = base64.b64encode(buf.getvalue()).decode()
except Exception as e:
logger.warning("Grounding : erreur chargement image — %s", e)
return None
# Prompt natif Qwen2.5-VL — format bbox_2d (le seul fiable)
# Ajouter la position relative pour désambiguïser (ex: deux "Rechercher" à l'écran)
original_pos = target_spec.get("original_position", {})
pos_hint = ""
y_rel = original_pos.get("y_relative", "")
x_rel = original_pos.get("x_relative", "")
if y_rel or x_rel:
pos_hint = f" located {y_rel} {x_rel} of the screen".strip()
prompt = f"Detect '{description}'{pos_hint} in this image with a bounding box."
# Le grounding nécessite un modèle entraîné pour les coordonnées (bbox_2d).
# Qwen2.5-VL est le seul qui retourne des positions précises.
# gemma4 comprend les images mais ne sait pas localiser en coordonnées.
_grounding_model = os.environ.get("RPA_GROUNDING_MODEL", "qwen2.5vl:7b")
# Appel VLM — vLLM (GPU, rapide) en priorité, Ollama en fallback
import requests as _requests
content = ""
# Port vLLM configurable via env
_vllm_port = os.environ.get("VLLM_PORT", "8100")
_vllm_model = os.environ.get("VLLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct-AWQ")
# Essai 1 : vLLM (API OpenAI-compatible, GPU)
try:
vllm_resp = _requests.post(
f"http://localhost:{_vllm_port}/v1/chat/completions",
json={
"model": _vllm_model,
"messages": [
{"role": "system", "content": "You locate UI elements on screenshots. Return coordinates."},
{"role": "user", "content": [
{"type": "text", "text": prompt},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{shot_b64}"}},
]},
],
"temperature": 0.1,
"max_tokens": 80,
},
timeout=30,
)
if vllm_resp.ok:
content = vllm_resp.json().get("choices", [{}])[0].get("message", {}).get("content", "")
if content:
logger.debug("Grounding via vLLM OK")
except Exception as e:
logger.debug("vLLM non disponible (%s), fallback Ollama", e)
# Essai 2 : Ollama (qwen2.5vl:7b pour le grounding — format bbox_2d natif)
if not content:
try:
resp = _requests.post("http://localhost:11434/api/chat", json={
"model": _grounding_model,
"messages": [
{"role": "user", "content": prompt, "images": [shot_b64]},
],
"stream": False,
"options": {"temperature": 0.1, "num_predict": 100},
}, timeout=60)
content = resp.json().get("message", {}).get("content", "")
except Exception as e:
logger.info("Grounding VLM timeout/erreur : %s", e)
return None
elapsed = time.time() - t0
# Parser la réponse — supporte bbox_2d en pixels, JSON %, arrays bruts
x_pct, y_pct = None, None
# Format 1 : bbox_2d en pixels [x, y] ou [x1, y1, x2, y2]
bbox_match = re.search(r'"bbox_2d"\s*:\s*\[([^\]]+)\]', content)
if bbox_match:
coords = [float(v.strip()) for v in bbox_match.group(1).split(",")]
if len(coords) == 2:
x_pct = coords[0] / small_w
y_pct = coords[1] / small_h
elif len(coords) >= 4:
x_pct = (coords[0] + coords[2]) / 2 / small_w
y_pct = (coords[1] + coords[3]) / 2 / small_h
# Format 2 : JSON {"x": 0.XX, "y": 0.YY}
if x_pct is None:
json_match = re.search(r'"x"\s*:\s*([\d.]+).*?"y"\s*:\s*([\d.]+)', content)
if json_match:
x_val, y_val = float(json_match.group(1)), float(json_match.group(2))
# Si > 1, c'est en pixels
if x_val > 1:
x_pct = x_val / small_w
y_pct = y_val / small_h
else:
x_pct = x_val
y_pct = y_val
# Format 3 : {"x_pct": 0.XX, "y_pct": 0.YY}
if x_pct is None:
pct_match = re.search(r'"x_pct"\s*:\s*([\d.]+).*?"y_pct"\s*:\s*([\d.]+)', content)
if pct_match:
x_pct = float(pct_match.group(1))
y_pct = float(pct_match.group(2))
# Format 4 : array brut [x1, y1, x2, y2] ou [x, y]
if x_pct is None:
arr_match = re.search(r'\[[\s]*([\d.]+)\s*,\s*([\d.]+)(?:\s*,\s*([\d.]+)\s*,\s*([\d.]+))?\s*\]', content)
if arr_match:
vals = [float(v) for v in arr_match.groups() if v is not None]
if len(vals) >= 4:
x_pct = (vals[0] + vals[2]) / 2 / small_w
y_pct = (vals[1] + vals[3]) / 2 / small_h
elif len(vals) == 2:
x_pct = vals[0] / small_w
y_pct = vals[1] / small_h
if x_pct is None or y_pct is None:
# Fallback multi-image : screenshot + crop → grounding sans description
anchor_b64 = target_spec.get("anchor_image_base64", "")
if anchor_b64:
try:
prompt_mi = (
"Image 1 is a screenshot. Image 2 shows a UI element.\n"
"Find where Image 2 appears on Image 1.\n"
'Return position: {"x": NNN, "y": NNN} in pixels of Image 1.'
)
resp2 = _requests.post("http://localhost:11434/api/chat", json={
"model": _grounding_model,
"messages": [
{"role": "user", "content": prompt_mi, "images": [shot_b64, anchor_b64]},
],
"stream": False,
"options": {"temperature": 0.1, "num_predict": 50},
}, timeout=60)
content2 = resp2.json().get("message", {}).get("content", "")
elapsed = time.time() - t0
# Parser tous les formats
arr2 = re.search(r'\[[\s]*([\d.]+)\s*,\s*([\d.]+)(?:\s*,\s*([\d.]+)\s*,\s*([\d.]+))?\s*\]', content2)
if arr2:
vals = [float(v) for v in arr2.groups() if v is not None]
if len(vals) >= 4:
x_pct = (vals[0] + vals[2]) / 2 / small_w
y_pct = (vals[1] + vals[3]) / 2 / small_h
elif len(vals) == 2:
x_pct = vals[0] / small_w
y_pct = vals[1] / small_h
if x_pct is None:
json2 = re.search(r'"x"\s*:\s*([\d.]+).*?"y"\s*:\s*([\d.]+)', content2)
if json2:
x_pct = float(json2.group(1)) / small_w
y_pct = float(json2.group(2)) / small_h
if x_pct is not None:
logger.info("Grounding multi-image OK (%.1fs)", elapsed)
except Exception as e:
logger.debug("Grounding multi-image erreur: %s", e)
if x_pct is None or y_pct is None:
logger.info(
"Grounding : réponse non parsable (%.1fs) — %s",
elapsed, content[:120],
)
return None
# Valider les bornes
if not (0.0 <= x_pct <= 1.0 and 0.0 <= y_pct <= 1.0):
logger.info("Grounding : coordonnées hors bornes (%.3f, %.3f)", x_pct, y_pct)
return None
# Convertir coordonnées fenêtre → coordonnées écran
if using_window and window_rect:
win_x1, win_y1, win_x2, win_y2 = window_rect
win_w = win_x2 - win_x1
win_h = win_y2 - win_y1
# x_pct/y_pct sont relatifs à la fenêtre, convertir en relatif à l'écran
abs_x = win_x1 + x_pct * win_w
abs_y = win_y1 + y_pct * win_h
x_pct = abs_x / screen_width
y_pct = abs_y / screen_height
logger.info(
"Grounding OK [%s/window] : '%s' → (%.4f, %.4f) en %.1fs",
_grounding_model, description[:50], x_pct, y_pct, elapsed,
)
else:
logger.info(
"Grounding OK [%s/full] : '%s' → (%.4f, %.4f) en %.1fs",
_grounding_model, description[:50], x_pct, y_pct, elapsed,
)
return {
"resolved": True,
"method": "grounding_vlm",
"x_pct": round(x_pct, 6),
"y_pct": round(y_pct, 6),
"matched_element": {
"label": description[:60],
"type": "grounding",
"role": "grounding_vlm",
"confidence": 0.85,
},
"score": 0.85,
}
# ---------------------------------------------------------------------------
# Résolution Set-of-Mark : SomEngine (détection) + VLM (identification)
# ---------------------------------------------------------------------------
def _get_som_engine_api():
"""Singleton SomEngine partagé."""
try:
from core.detection.som_engine import get_shared_engine
return get_shared_engine()
except ImportError:
return None
def _resolve_by_som(
screenshot_path: str,
target_spec: Dict[str, Any],
screen_width: int,
screen_height: int,
) -> Optional[Dict[str, Any]]:
"""Résoudre une cible UI via Set-of-Mark + VLM.
Pipeline :
1. SomEngine détecte tous les éléments et les numérote sur le screenshot
2. VLM reçoit l'image annotée + description de la cible
3. VLM identifie le numéro du mark → coordonnées précises
Avantages vs VLM direct :
- Le VLM n'a qu'à identifier (son point fort), pas localiser
- Les coordonnées viennent de SomEngine (pixel-perfect)
- Question simple "quel numéro ?" → réponse simple
Args:
screenshot_path: Chemin du screenshot actuel
target_spec: Spécification de la cible (vlm_description, som_element, etc.)
screen_width: Largeur écran en pixels
screen_height: Hauteur écran en pixels
Returns:
Dict avec resolved=True et coordonnées, ou None si indisponible.
"""
engine = _get_som_engine_api()
if engine is None:
return None
client = _get_vlm_client()
if client is None:
return None
t0 = time.time()
# ── 1. Lancer SomEngine sur le screenshot actuel ──
try:
from PIL import Image as PILImage
img = PILImage.open(screenshot_path).convert("RGB")
som_result = engine.analyze(img)
except Exception as e:
logger.warning("SoM resolve : erreur analyse — %s", e)
return None
if not som_result.elements:
logger.info("SoM resolve : 0 éléments détectés")
return None
# ── 2. Construire la description de la cible ──
som_element = target_spec.get("som_element", {})
vlm_description = target_spec.get("vlm_description", "")
anchor_label = som_element.get("label", "")
# Construire un prompt riche
target_parts = []
if anchor_label:
target_parts.append(f"texte '{anchor_label}'")
if vlm_description:
target_parts.append(vlm_description)
if not target_parts:
# Sans description, SoM resolve ne peut pas fonctionner
logger.debug("SoM resolve : pas de description pour identifier l'élément")
return None
target_desc = ", ".join(target_parts)
# ── 2.5. Raccourci : si le label est connu, chercher par texte directement ──
# Pas besoin du VLM si on connaît le texte exact de l'élément !
if anchor_label and len(anchor_label) >= 2:
label_lower = anchor_label.lower()
# Match exact d'abord, puis partiel
exact_matches = [
e for e in som_result.elements
if e.label and e.label.lower() == label_lower
]
if not exact_matches:
exact_matches = [
e for e in som_result.elements
if e.label and len(e.label) >= 3 and (
label_lower in e.label.lower()
or e.label.lower() in label_lower
)
]
if len(exact_matches) == 1:
# Match unique par texte → pas besoin du VLM
elem = exact_matches[0]
elapsed = time.time() - t0
cx_norm, cy_norm = elem.center_norm
logger.info(
"SoM resolve FAST : match texte unique '#%d %s' → (%.4f, %.4f) en %.1fs",
elem.id, elem.label, cx_norm, cy_norm, elapsed,
)
return {
"resolved": True,
"method": "som_text_match",
"x_pct": round(cx_norm, 6),
"y_pct": round(cy_norm, 6),
"matched_element": {
"label": elem.label,
"type": elem.source,
"role": "som_text_match",
"confidence": max(elem.confidence, 0.85),
"som_id": elem.id,
},
"score": max(elem.confidence, 0.85),
}
elif len(exact_matches) > 1:
# Plusieurs matchs texte → disambiguïser par proximité à la position originale
ref_center = som_element.get("center_norm", [])
if ref_center and len(ref_center) == 2:
ref_x, ref_y = ref_center
best = min(
exact_matches,
key=lambda e: (
(e.center_norm[0] - ref_x) ** 2
+ (e.center_norm[1] - ref_y) ** 2
),
)
elapsed = time.time() - t0
cx_norm, cy_norm = best.center_norm
dist = ((cx_norm - ref_x) ** 2 + (cy_norm - ref_y) ** 2) ** 0.5
if dist < 0.15: # Tolérance 15% de l'écran
logger.info(
"SoM resolve FAST : match texte proximité '#%d %s' (dist=%.3f) "
"→ (%.4f, %.4f) en %.1fs",
best.id, best.label, dist, cx_norm, cy_norm, elapsed,
)
return {
"resolved": True,
"method": "som_text_match",
"x_pct": round(cx_norm, 6),
"y_pct": round(cy_norm, 6),
"matched_element": {
"label": best.label,
"type": best.source,
"role": "som_text_match_proximity",
"confidence": max(best.confidence, 0.80),
"som_id": best.id,
},
"score": max(best.confidence, 0.80),
}
logger.info(
"SoM resolve : %d matchs texte pour '%s', VLM nécessaire",
len(exact_matches), anchor_label,
)
# ── 2.7. Fallback : template matching anchor vs éléments SomEngine ──
# Pour les icônes sans texte : comparer le crop de référence contre
# chaque région YOLO détectée par SomEngine.
anchor_b64 = target_spec.get("anchor_image_base64", "")
by_text = target_spec.get("by_text", "").strip()
if anchor_b64 and (not anchor_label or not by_text):
try:
import cv2
import numpy as np
# Décoder l'anchor
anc_bytes = base64.b64decode(anchor_b64)
anc_array = np.frombuffer(anc_bytes, dtype=np.uint8)
anc_img = cv2.imdecode(anc_array, cv2.IMREAD_GRAYSCALE)
# Charger le screenshot en OpenCV
screenshot_cv = cv2.imread(screenshot_path, cv2.IMREAD_GRAYSCALE)
if anc_img is not None and screenshot_cv is not None:
# Template matching de l'anchor sur le SCREENSHOT ENTIER
# (pas sur les régions individuelles — l'anchor est souvent plus grand)
anc_h, anc_w = anc_img.shape[:2]
if screenshot_cv.shape[0] >= anc_h and screenshot_cv.shape[1] >= anc_w:
res = cv2.matchTemplate(screenshot_cv, anc_img, cv2.TM_CCOEFF_NORMED)
_, max_score, _, max_loc = cv2.minMaxLoc(res)
if max_score >= 0.5:
# Centre du match
match_cx = max_loc[0] + anc_w // 2
match_cy = max_loc[1] + anc_h // 2
# Trouver l'élément SomEngine le plus proche du centre du match
best_elem = None
best_dist = float("inf")
for elem in som_result.elements:
cx, cy = elem.center
dist = ((match_cx - cx) ** 2 + (match_cy - cy) ** 2) ** 0.5
if dist < best_dist:
best_dist = dist
best_elem = elem
if best_elem and best_dist < 100: # Max 100px de distance
elapsed = time.time() - t0
cx_norm, cy_norm = best_elem.center_norm
logger.info(
"SoM resolve ANCHOR : match crop score=%.3f"
"elem '#%d %s' (dist=%.0fpx) → (%.4f, %.4f) en %.1fs",
max_score, best_elem.id, best_elem.label,
best_dist, cx_norm, cy_norm, elapsed,
)
return {
"resolved": True,
"method": "som_anchor_match",
"x_pct": round(cx_norm, 6),
"y_pct": round(cy_norm, 6),
"matched_element": {
"label": best_elem.label or f"icon #{best_elem.id}",
"type": best_elem.source,
"role": "som_anchor_match",
"confidence": max_score,
"som_id": best_elem.id,
},
"score": max_score,
}
except ImportError:
pass
except Exception as e:
logger.debug("SoM anchor match erreur : %s", e)
# ── 3. Sauvegarder l'image annotée SoM temporairement ──
if som_result.som_image is None:
logger.debug("SoM resolve : pas d'image annotée, skip VLM")
return None
try:
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp:
som_result.som_image.save(tmp, format="JPEG", quality=85)
som_img_path = tmp.name
except Exception as e:
logger.warning("SoM resolve : erreur sauvegarde image annotée — %s", e)
return None
# ── 4. VLM : identifier le numéro du mark ──
# Lister uniquement les éléments avec un label (plus concis pour le VLM)
labeled_elements = [e for e in som_result.elements if e.label][:30]
elements_list = "\n".join(
f" #{e.id}: '{e.label}'"
for e in labeled_elements
)
# Multi-image : SoM annotée + anchor crop (si disponible)
anchor_b64 = target_spec.get("anchor_image_base64", "")
extra_images = [anchor_b64] if anchor_b64 else None
if extra_images:
prompt = (
"Image 1 shows the screen with numbered marks on each UI element.\n"
"Image 2 shows the element I'm looking for.\n\n"
f"Target: {target_desc}\n\n"
f"Detected elements:\n{elements_list}\n\n"
"Which mark number matches the target element in Image 2?\n"
'Answer with JSON only: {"mark_id": N, "confidence": 0.9}'
)
else:
prompt = (
f"I'm looking for: {target_desc}\n\n"
f"Detected elements:\n{elements_list}\n\n"
"Which number is the correct element?\n"
'Answer with JSON only: {"mark_id": N, "confidence": 0.9}'
)
system_prompt = "You identify UI elements by number. Output JSON only, no explanation."
try:
result = client.generate(
prompt=prompt,
image_path=som_img_path,
system_prompt=system_prompt,
temperature=0.1,
max_tokens=50,
force_json=False,
extra_images_b64=extra_images,
)
except Exception as e:
logger.warning("SoM resolve : erreur VLM — %s", e)
return None
finally:
try:
os.unlink(som_img_path)
except OSError:
pass
elapsed = time.time() - t0
if not result.get("success"):
logger.info("SoM resolve : VLM échoué (%.1fs)", elapsed)
return None
# ── 5. Parser la réponse et retourner les coordonnées ──
response_text = result.get("response", "").strip()
# Tenter d'abord l'extraction JSON standard
parsed = client._extract_json_from_response(response_text)
# Fallback : extraire un nombre simple de la réponse
if parsed is None:
numbers = re.findall(r'\b(\d+)\b', response_text)
if numbers:
candidate = int(numbers[0])
if som_result.get_element_by_id(candidate) is not None:
parsed = {"mark_id": candidate, "confidence": 0.7}
logger.debug("SoM resolve : extraction numéro fallback → #%d", candidate)
if parsed is None:
logger.info("SoM resolve : réponse non-JSON (%.1fs) — %.80s", elapsed, response_text)
return None
mark_id = parsed.get("mark_id")
confidence = float(parsed.get("confidence", 0.0))
if mark_id is None or confidence < 0.3:
logger.info(
"SoM resolve : mark non trouvé ou confiance trop basse (mark=%s, conf=%.2f, %.1fs)",
mark_id, confidence, elapsed,
)
return None
mark_id = int(mark_id)
elem = som_result.get_element_by_id(mark_id)
if elem is None:
logger.warning("SoM resolve : mark #%d inexistant (%.1fs)", mark_id, elapsed)
return None
cx_norm, cy_norm = elem.center_norm
logger.info(
"SoM resolve OK : mark #%d '%s' → (%.4f, %.4f) conf=%.2f en %.1fs (%d éléments)",
mark_id, elem.label, cx_norm, cy_norm, confidence, elapsed, len(som_result.elements),
)
return {
"resolved": True,
"method": "som_vlm",
"x_pct": round(cx_norm, 6),
"y_pct": round(cy_norm, 6),
"matched_element": {
"label": elem.label or f"mark #{mark_id}",
"type": elem.source,
"role": "som_identified",
"confidence": confidence,
"som_id": mark_id,
},
"score": confidence,
}
# =========================================================================
# Orchestrateur — Résolution cible complète (synchrone)
# =========================================================================
# =========================================================================
# V4 : Résolution pilotée par le plan pré-compilé
# =========================================================================
def _resolve_with_precompiled_order(
screenshot_path: str,
target_spec: Dict[str, Any],
resolve_order: list,
screen_width: int,
screen_height: int,
fallback_x_pct: float,
fallback_y_pct: float,
) -> Optional[Dict[str, Any]]:
"""Résoudre la cible en suivant l'ordre pré-compilé par l'ExecutionCompiler.
C'est le chemin V4 : l'ExecutionPlan a déjà décidé quelle méthode utiliser
(OCR, template, VLM) selon le learning et les caractéristiques de l'élément.
Le runtime ne fait qu'exécuter l'ordre — pas de cascade improvisée.
resolve_order : liste de méthodes dans l'ordre à essayer
ex: ["ocr", "template", "vlm"]
ex: ["template", "ocr"] (template d'abord pour les icônes)
ex: ["vlm"] (dernier recours)
Returns:
Dict résultat si trouvé, None si toutes les méthodes échouent.
"""
import time as _time
t_start = _time.time()
by_text = target_spec.get("by_text", "").strip()
anchor_b64 = target_spec.get("anchor_image_base64", "")
vlm_description = target_spec.get("vlm_description", "")
for method in resolve_order:
method_start = _time.time()
if method == "ocr" and by_text:
# OCR : chercher le texte visible dans l'image
# C'est le chemin rapide — idéalement < 200ms
try:
result = _resolve_by_ocr_text(
screenshot_path=screenshot_path,
target_text=by_text,
screen_width=screen_width,
screen_height=screen_height,
)
if result and result.get("resolved"):
elapsed = (_time.time() - method_start) * 1000
logger.info(
"V4 OCR : OK en %.0fms pour '%s' → (%.3f, %.3f)",
elapsed, by_text[:30],
result.get("x_pct", 0), result.get("y_pct", 0),
)
result["resolve_method"] = "v4_ocr"
result["resolve_elapsed_ms"] = elapsed
return result
except Exception as e:
logger.debug("V4 OCR erreur : %s", e)
elif method == "template" and anchor_b64:
# Template matching : comparer des pixels
try:
result = _resolve_by_template_matching(
screenshot_path=screenshot_path,
anchor_image_b64=anchor_b64,
screen_width=screen_width,
screen_height=screen_height,
confidence_threshold=0.85,
)
if result and result.get("resolved"):
elapsed = (_time.time() - method_start) * 1000
logger.info(
"V4 TEMPLATE : OK en %.0fms score=%.3f → (%.3f, %.3f)",
elapsed, result.get("score", 0),
result.get("x_pct", 0), result.get("y_pct", 0),
)
result["resolve_method"] = "v4_template"
result["resolve_elapsed_ms"] = elapsed
return result
except Exception as e:
logger.debug("V4 template erreur : %s", e)
elif method == "vlm" and (vlm_description or by_text):
# VLM : exception handler (lent, dernier recours)
description = vlm_description or f"élément '{by_text}'"
try:
result = _vlm_quick_find(
screenshot_path=screenshot_path,
target_description=description,
screen_width=screen_width,
screen_height=screen_height,
anchor_image_b64=anchor_b64,
)
if result and result.get("resolved"):
elapsed = (_time.time() - method_start) * 1000
logger.info(
"V4 VLM : OK en %.0fms pour '%s' → (%.3f, %.3f)",
elapsed, description[:30],
result.get("x_pct", 0), result.get("y_pct", 0),
)
result["resolve_method"] = "v4_vlm"
result["resolve_elapsed_ms"] = elapsed
return result
except Exception as e:
logger.debug("V4 VLM erreur : %s", e)
total_elapsed = (_time.time() - t_start) * 1000
logger.info(
"V4 resolve : toutes les méthodes (%s) ont échoué en %.0fms",
resolve_order, total_elapsed,
)
return None
def _resolve_by_ocr_text(
screenshot_path: str,
target_text: str,
screen_width: int,
screen_height: int,
) -> Optional[Dict[str, Any]]:
"""Localiser du texte dans l'image via OCR (docTR ou fallback).
C'est le chemin rapide V4 : pas de VLM, pas de template matching,
juste de l'OCR direct. Idéal pour les éléments avec texte visible.
Returns:
Dict avec x_pct, y_pct, score si trouvé, None sinon.
"""
try:
from doctr.io import DocumentFile
from doctr.models import ocr_predictor
except ImportError:
logger.debug("docTR non disponible pour V4 OCR")
return None
try:
# Utiliser un cache global pour éviter de recharger le modèle à chaque appel
global _V4_OCR_PREDICTOR
try:
_V4_OCR_PREDICTOR
except NameError:
_V4_OCR_PREDICTOR = None
if _V4_OCR_PREDICTOR is None:
_V4_OCR_PREDICTOR = ocr_predictor(
det_arch='db_resnet50',
reco_arch='crnn_vgg16_bn',
pretrained=True,
)
doc = DocumentFile.from_images([screenshot_path])
result = _V4_OCR_PREDICTOR(doc)
# Chercher le texte (match exact, insensible à la casse)
target_lower = target_text.lower().strip()
best_match = None
best_score = 0.0
for page in result.pages:
for block in page.blocks:
for line_obj in block.lines:
line_text = " ".join(w.value for w in line_obj.words)
line_lower = line_text.lower()
# Match exact > contient > mot par mot
score = 0.0
if target_lower == line_lower:
score = 1.0
elif target_lower in line_lower:
score = 0.8
elif any(target_lower == w.value.lower() for w in line_obj.words):
score = 0.9
if score > best_score:
# Coordonnées de la ligne entière (bbox)
box = line_obj.geometry # ((x1,y1), (x2,y2)) normalisées 0-1
cx = (box[0][0] + box[1][0]) / 2
cy = (box[0][1] + box[1][1]) / 2
best_match = {
"resolved": True,
"method": "v4_ocr",
"x_pct": cx,
"y_pct": cy,
"score": score,
"matched_text": line_text,
}
best_score = score
if best_match and best_score >= 0.7:
return best_match
except Exception as e:
logger.debug("docTR OCR erreur : %s", e)
return None
def _resolve_target_sync(
screenshot_path: str,
target_spec: Dict[str, Any],
screen_width: int,
screen_height: int,
fallback_x_pct: float,
fallback_y_pct: float,
strict_mode: bool = False,
processor=None,
) -> Dict[str, Any]:
"""Résoudre la cible visuellement (exécuté dans un thread séparé).
Hiérarchie de résolution (strict_mode=True, replay sessions) — VLM-FIRST :
1. VLM Quick Find (~3-8s) — compréhension sémantique de l'écran, multi-image
(screenshot + crop de référence + description riche)
1.5. SoM + VLM (~5-15s) — SomEngine numérote les éléments, VLM identifie le bon
2. Template matching OpenCV (~100ms) — fallback pixel, seuil STRICT 0.90
3. resolved=False → STOP le replay
Le VLM comprend le contexte (titre de fenêtre, type d'élément, position)
et peut trouver un élément même si l'écran est différent de l'enregistrement.
Le template matching ne compare que des pixels et produit des faux positifs.
Hiérarchie classique (strict_mode=False, VWB et autres) — INCHANGÉE :
1. Template matching OpenCV (~100ms) — seuil 0.70
1.5. VLM Quick Find si template échoue et by_text/by_role dispo
2. by_text/by_role → VLM Quick Find puis ScreenAnalyzer
3. fallback coordonnées statiques
"""
anchor_image_b64 = target_spec.get("anchor_image_base64", "")
# ===================================================================
# PHASE 1 APPRENTISSAGE : Lookup mémoire persistante (Fiche #18)
# ===================================================================
# Avant TOUTE résolution coûteuse (OCR/template/VLM), on consulte la
# mémoire persistante (TargetMemoryStore). Si cette cible a été résolue
# avec succès ≥2 fois sur cet écran (fail_ratio < 30%), on retourne
# directement les coordonnées mémorisées.
#
# Hit mémoire : <10ms (vs 300ms-15s de résolution)
# Miss mémoire : aucun overhead, on continue la cascade normale
#
# Les coords stockées sont celles qui ont PASSÉ la post-condition
# (title_match strict) lors des replays précédents. C'est la
# cristallisation par répétition : Léa = stagiaire qui apprend.
try:
from .replay_memory import memory_lookup
_window_title = target_spec.get("window_title", "") or ""
if _window_title:
_mem_result = memory_lookup(
window_title=_window_title,
target_spec=target_spec,
)
if _mem_result:
# Hit mémoire : on skip toute la cascade.
# Les coordonnées sont sanity-checked dans memory_lookup().
return _mem_result
except Exception as _exc:
logger.debug("Memory lookup skipped : %s", _exc)
# ===================================================================
# V4 : Résolution pilotée par le plan pré-compilé
# ===================================================================
# Si le target_spec contient `resolve_order`, il vient d'un ExecutionPlan
# compilé. On honore cet ordre au lieu de faire la cascade par défaut.
# C'est le "zéro VLM au runtime" : on essaie d'abord la stratégie
# pré-compilée (OCR, template, ou VLM).
resolve_order = target_spec.get("resolve_order")
if resolve_order and isinstance(resolve_order, list):
logger.info(
"V4 resolve : ordre pré-compilé = %s",
resolve_order,
)
result = _resolve_with_precompiled_order(
screenshot_path=screenshot_path,
target_spec=target_spec,
resolve_order=resolve_order,
screen_width=screen_width,
screen_height=screen_height,
fallback_x_pct=fallback_x_pct,
fallback_y_pct=fallback_y_pct,
)
if result and result.get("resolved"):
return result
# Si les méthodes pré-compilées ont toutes échoué, on continue
# vers la cascade legacy (compatibilité et robustesse).
logger.info(
"V4 resolve : toutes les méthodes pré-compilées ont échoué, "
"fallback cascade legacy"
)
# ===================================================================
# MODE STRICT (replay sessions) — Stratégie VLM-FIRST
# ===================================================================
if strict_mode and anchor_image_b64:
vlm_description = target_spec.get("vlm_description", "")
by_text_strict = target_spec.get("by_text", "").strip()
# Fallback : construire la description depuis by_text/by_role
if not vlm_description:
by_role = target_spec.get("by_role", "").strip()
if by_text_strict or by_role:
vlm_description = _build_target_description(target_spec)
# ---------------------------------------------------------------
# Étape -1 : Vérification CLIP (si embedding de référence fourni)
# Vérifie qu'on est dans la bonne application avant de chercher
# l'élément. Filet de sécurité contre les clics au mauvais endroit.
# ---------------------------------------------------------------
clip_embedding = target_spec.get("clip_embedding")
if clip_embedding:
try:
from core.embedding.clip_embedder import CLIPEmbedder
from PIL import Image as _PILImage
import numpy as _np
_clip = CLIPEmbedder()
# Embedding de l'écran actuel (fenêtre si possible)
window_capture = target_spec.get("window_capture", {})
window_rect = window_capture.get("rect")
current_img = _PILImage.open(screenshot_path)
if window_rect:
current_img = current_img.crop(tuple(window_rect))
current_emb = _np.array(_clip.embed_image(current_img), dtype=_np.float32).flatten()
ref_emb = _np.array(clip_embedding, dtype=_np.float32).flatten()
clip_sim = float(_np.dot(current_emb, ref_emb) / (
_np.linalg.norm(current_emb) * _np.linalg.norm(ref_emb)
))
logger.info(f"CLIP vérification : similarité={clip_sim:.3f}")
if clip_sim < 0.75:
logger.warning(
f"CLIP MISMATCH : sim={clip_sim:.3f} < 0.75 — "
f"écran actuel trop différent de l'enregistrement"
)
return {
"resolved": False,
"method": "clip_mismatch",
"reason": f"clip_similarity_{clip_sim:.3f}",
"x_pct": fallback_x_pct,
"y_pct": fallback_y_pct,
}
except Exception as e:
logger.debug(f"CLIP vérification erreur (non-bloquant) : {e}")
# ---------------------------------------------------------------
# Étape 0 : Choisir la stratégie selon le type d'élément
# ---------------------------------------------------------------
by_text_source = target_spec.get("by_text_source", "")
has_window = bool(target_spec.get("window_capture", {}).get("rect"))
if by_text_strict and by_text_source in ("ocr", "vlm") and has_window:
# Texte visible DANS une fenêtre → grounding VLM sur fenêtre croppée
grounding_result = _resolve_by_grounding(
screenshot_path=screenshot_path,
target_spec=target_spec,
screen_width=screen_width,
screen_height=screen_height,
)
if grounding_result and grounding_result.get("resolved"):
logger.info(
"Strict resolve GROUNDING : OK (%.4f, %.4f) pour '%s'",
grounding_result.get("x_pct", 0),
grounding_result.get("y_pct", 0),
by_text_strict[:50],
)
return grounding_result
if not by_text_strict or by_text_source not in ("ocr", "vlm"):
# Template matching pour les éléments sans texte (icônes pures)
window_capture = target_spec.get("window_capture", {})
window_rect = window_capture.get("rect")
from pathlib import Path as _Path
_full = _Path(screenshot_path)
_win = _full.parent / _full.name.replace("_full.png", "_window.png")
tm_path = str(_win) if _win.is_file() and window_rect else screenshot_path
tm_screen_w = (window_rect[2] - window_rect[0]) if window_rect and _win.is_file() else screen_width
tm_screen_h = (window_rect[3] - window_rect[1]) if window_rect and _win.is_file() else screen_height
result = _resolve_by_template_matching(
screenshot_path=tm_path,
anchor_image_b64=anchor_image_b64,
screen_width=tm_screen_w,
screen_height=tm_screen_h,
confidence_threshold=0.90,
)
if result and result.get("score", 0) >= 0.90:
x_tm, y_tm = result["x_pct"], result["y_pct"]
# Convertir coordonnées fenêtre → écran si nécessaire
if window_rect and _win.is_file():
abs_x = window_rect[0] + x_tm * tm_screen_w
abs_y = window_rect[1] + y_tm * tm_screen_h
result["x_pct"] = round(abs_x / screen_width, 6)
result["y_pct"] = round(abs_y / screen_height, 6)
logger.info(
"Strict resolve TEMPLATE : icon match (score=%.3f)",
result.get("score", 0),
)
return result
# ---------------------------------------------------------------
# Étape 1 : VLM Quick Find (fallback, multi-image)
# ---------------------------------------------------------------
if vlm_description or anchor_image_b64:
vlm_result = _vlm_quick_find(
screenshot_path=screenshot_path,
target_description=vlm_description,
anchor_image_b64=anchor_image_b64,
)
if vlm_result and vlm_result.get("resolved"):
if vlm_result.get("score", 0) >= 0.3:
logger.info(
"Strict resolve VLM-first : VLM OK (score=%.2f) pour '%s'",
vlm_result.get("score", 0),
vlm_description[:60] if vlm_description else "(anchor)",
)
return vlm_result
else:
logger.info(
"Strict resolve VLM-first : VLM score=%.2f trop bas, passage template",
vlm_result.get("score", 0),
)
else:
logger.info(
"Strict resolve VLM-first : VLM échoué pour '%s', passage template matching",
vlm_description[:60] if vlm_description else "(anchor)",
)
# ---------------------------------------------------------------
# Étape 1.5 : SoM + VLM (Set-of-Mark + identification)
# SomEngine numérote les éléments, VLM identifie le bon numéro.
# Plus fiable que le VLM direct car le VLM n'a qu'à identifier,
# pas localiser — et les coordonnées sont pixel-perfect.
# ---------------------------------------------------------------
som_element = target_spec.get("som_element", {})
if som_element or vlm_description:
som_result = _resolve_by_som(
screenshot_path=screenshot_path,
target_spec=target_spec,
screen_width=screen_width,
screen_height=screen_height,
)
if som_result and som_result.get("resolved"):
logger.info(
"Strict resolve SoM+VLM : OK (score=%.2f, mark=#%s)",
som_result.get("score", 0),
som_result.get("matched_element", {}).get("som_id", "?"),
)
return som_result
else:
logger.info("Strict resolve SoM+VLM : échoué, passage template matching")
# ---------------------------------------------------------------
# Étape 2 : Template matching (fallback pixel) — seuil STRICT 0.90
# ---------------------------------------------------------------
result = _resolve_by_template_matching(
screenshot_path=screenshot_path,
anchor_image_b64=anchor_image_b64,
screen_width=screen_width,
screen_height=screen_height,
confidence_threshold=0.90,
)
if result:
score = result.get("score", 0)
# Score >= 0.95 : match quasi-parfait, pas besoin de valider le contexte
if score >= 0.95:
logger.info(
"Strict resolve VLM-first : template matching fallback OK "
"(score=%.3f >= 0.95, contexte skip — match quasi-parfait)",
score,
)
return result
elif _validate_match_context(result, fallback_x_pct, fallback_y_pct, target_spec):
logger.info(
"Strict resolve VLM-first : template matching fallback OK "
"(score=%.3f >= 0.90, context OK)",
score,
)
return result
else:
logger.warning(
"Strict resolve VLM-first : template score=%.3f MAIS contexte invalide, rejeté",
score,
)
# ---------------------------------------------------------------
# Étape 3 : RIEN ne fonctionne → resolved=False → STOP replay
# ---------------------------------------------------------------
return {
"resolved": False,
"method": "strict_vlm_template_failed",
"reason": "vlm_and_template_all_failed",
"x_pct": fallback_x_pct,
"y_pct": fallback_y_pct,
}
# ===================================================================
# MODE CLASSIQUE (VWB et autres) — Comportement existant
# ===================================================================
# ---------------------------------------------------------------
# Stratégie 1 : Template matching par image d'ancre (seuil 0.70)
# ---------------------------------------------------------------
if anchor_image_b64:
result = _resolve_by_template_matching(
screenshot_path=screenshot_path,
anchor_image_b64=anchor_image_b64,
screen_width=screen_width,
screen_height=screen_height,
confidence_threshold=0.7,
)
if result:
return result
logger.info(
"Template matching échoué pour ancre '%s', tentative VLM Quick Find",
target_spec.get("anchor_id", "?"),
)
# ---------------------------------------------------------------
# Stratégie 1.5 : VLM Quick Find (fallback léger après template matching)
# ---------------------------------------------------------------
by_text = target_spec.get("by_text", "").strip()
by_role = target_spec.get("by_role", "").strip()
if by_text or by_role:
vlm_desc = _build_target_description(target_spec)
vlm_result = _vlm_quick_find(
screenshot_path=screenshot_path,
target_description=vlm_desc,
anchor_image_b64=anchor_image_b64,
)
if vlm_result:
return vlm_result
logger.info(
"VLM Quick Find échoué pour ancre '%s', fallback coordonnées",
target_spec.get("anchor_id", "?"),
)
return {
"resolved": False,
"method": "fallback",
"reason": "template_matching_failed",
"x_pct": fallback_x_pct,
"y_pct": fallback_y_pct,
}
# ---------------------------------------------------------------
# Stratégie 2 : VLM Quick Find (léger, ~5-10s)
# ---------------------------------------------------------------
by_text = target_spec.get("by_text", "")
by_role = target_spec.get("by_role", "")
# Si aucun critère sémantique et pas d'ancre, fallback direct
if not by_text and not by_role and not anchor_image_b64:
return {
"resolved": False,
"method": "fallback",
"reason": "no_target_criteria",
"x_pct": fallback_x_pct,
"y_pct": fallback_y_pct,
}
# Tenter le VLM Quick Find AVANT ScreenAnalyzer (beaucoup plus rapide)
if by_text or by_role:
vlm_desc = _build_target_description(target_spec)
vlm_result = _vlm_quick_find(
screenshot_path=screenshot_path,
target_description=vlm_desc,
)
if vlm_result:
return vlm_result
logger.info(
"VLM Quick Find échoué pour '%s', fallback ScreenAnalyzer",
vlm_desc,
)
# ---------------------------------------------------------------
# Stratégie 3 : Matching sémantique via ScreenAnalyzer (~15-20s)
# ---------------------------------------------------------------
if processor is None:
return {
"resolved": False,
"method": "fallback",
"reason": "no_processor",
"x_pct": fallback_x_pct,
"y_pct": fallback_y_pct,
}
processor._ensure_initialized()
if processor._screen_analyzer is None:
return {
"resolved": False,
"method": "fallback",
"reason": "screen_analyzer_unavailable",
"x_pct": fallback_x_pct,
"y_pct": fallback_y_pct,
}
# Analyser le screenshot (Niveaux 1-3 : raw, OCR, UI elements)
try:
screen_state = processor._screen_analyzer.analyze(screenshot_path)
except Exception as e:
logger.warning(f"Analyse screenshot échouée: {e}")
return {
"resolved": False,
"method": "fallback",
"reason": f"analysis_failed: {e}",
"x_pct": fallback_x_pct,
"y_pct": fallback_y_pct,
}
ui_elements = screen_state.ui_elements or []
if not ui_elements:
logger.info("Aucun élément UI détecté, fallback coordonnées")
return {
"resolved": False,
"method": "fallback",
"reason": "no_ui_elements",
"x_pct": fallback_x_pct,
"y_pct": fallback_y_pct,
}
# Matching de la cible parmi les éléments détectés
candidates = []
for elem in ui_elements:
score = 0.0
# Score par texte (label)
if by_text and elem.label:
text_lower = by_text.lower()
label_lower = elem.label.lower()
if text_lower in label_lower or label_lower in text_lower:
score += 0.6
elif _fuzzy_match(text_lower, label_lower):
score += 0.3
# Score par rôle
if by_role:
role_lower = by_role.lower()
if elem.role and role_lower in elem.role.lower():
score += 0.3
if elem.type and role_lower in elem.type.lower():
score += 0.2
if score > 0:
candidates.append((elem, score))
if not candidates:
logger.info(
f"Aucun match visuel pour target(text='{by_text}', role='{by_role}') "
f"parmi {len(ui_elements)} éléments"
)
return {
"resolved": False,
"method": "fallback",
"reason": "no_match",
"x_pct": fallback_x_pct,
"y_pct": fallback_y_pct,
"ui_elements_count": len(ui_elements),
}
# Trier par score décroissant et prendre le meilleur
candidates.sort(key=lambda c: c[1], reverse=True)
best_elem, best_score = candidates[0]
# Convertir les coordonnées pixel en proportions
cx, cy = best_elem.center
x_pct = round(cx / screen_width, 6) if screen_width > 0 else 0.0
y_pct = round(cy / screen_height, 6) if screen_height > 0 else 0.0
logger.info(
f"Cible résolue visuellement: '{best_elem.label}' ({best_elem.type}/{best_elem.role}) "
f"score={best_score:.2f} → ({x_pct:.4f}, {y_pct:.4f})"
)
return {
"resolved": True,
"method": "visual",
"x_pct": x_pct,
"y_pct": y_pct,
"matched_element": {
"label": best_elem.label,
"type": best_elem.type,
"role": best_elem.role,
"center": list(best_elem.center),
"confidence": best_elem.label_confidence,
},
"score": best_score,
"candidates_count": len(candidates),
"ui_elements_count": len(ui_elements),
}
def _fuzzy_match(a: str, b: str, threshold: float = 0.6) -> bool:
"""Match approximatif par ratio de caractères communs."""
if not a or not b:
return False
common = sum(1 for c in a if c in b)
return (common / max(len(a), len(b))) >= threshold
def _fallback_response(request: ResolveTargetRequest, reason: str, detail: str) -> Dict:
"""Réponse de fallback quand la résolution visuelle échoue."""
return {
"resolved": False,
"method": "fallback",
"reason": reason,
"detail": detail,
"x_pct": request.fallback_x_pct,
"y_pct": request.fallback_y_pct,
}
# =========================================================================
# Observer — Pré-analyse écran avant résolution
# =========================================================================
def _pre_analyze_screen_sync(
screenshot_b64: str,
expected_state: str,
window_title: str,
screen_width: int,
screen_height: int,
) -> Dict[str, Any]:
"""Pré-analyse synchrone de l'écran via VLM.
Utilise gemma4 (Docker port 11435) pour détecter :
1. Popups/dialogues modaux (avec coordonnées du bouton à cliquer)
2. États incohérents avec l'attendu
Rapide (~2-5s) car gemma4 est léger et en mode texte+image.
"""
import requests as _requests
gemma4_port = os.environ.get("GEMMA4_PORT", "11435")
gemma4_url = f"http://localhost:{gemma4_port}/api/chat"
# Charger le contexte métier pour l'Observer
from .domain_context import get_domain_context
domain = get_domain_context(os.environ.get("RPA_DOMAIN", "generic"))
# Prompt concis pour détection popup
prompt = (
"Regarde cette capture d'écran.\n"
"Y a-t-il une popup, boîte de dialogue, message d'erreur, ou fenêtre modale visible ?\n\n"
"Réponds EXACTEMENT dans ce format :\n"
"ÉTAT: OK ou POPUP ou INATTENDU\n"
"BOUTON: texte du bouton à cliquer (si POPUP, sinon 'aucun')\n"
"DÉTAIL: description courte (1 ligne)"
)
# Messages avec contexte métier
messages = []
if domain.system_prompt:
messages.append({"role": "system", "content": domain.system_prompt})
messages.append({"role": "user", "content": prompt, "images": [screenshot_b64]})
try:
t_start = time.time()
resp = _requests.post(
gemma4_url,
json={
"model": "gemma4:e4b",
"messages": messages,
"stream": False,
"think": True,
"options": {"temperature": 0.1, "num_predict": 800},
},
timeout=30,
)
elapsed_ms = (time.time() - t_start) * 1000
if not resp.ok:
logger.warning(f"Observer VLM HTTP {resp.status_code}")
return {"screen_state": "ok", "detail": f"VLM HTTP {resp.status_code}"}
content = resp.json().get("message", {}).get("content", "").strip()
logger.info(f"Observer VLM ({elapsed_ms:.0f}ms) : {content[:100]}")
# Parser la réponse
state = "ok"
button = ""
detail = content
for line in content.split("\n"):
line_clean = line.strip()
upper = line_clean.upper()
if upper.startswith("ÉTAT:") or upper.startswith("ETAT:"):
val = upper.split(":", 1)[1].strip()
if "POPUP" in val:
state = "popup"
elif "INATTENDU" in val or "UNEXPECTED" in val:
state = "unexpected"
else:
state = "ok"
elif upper.startswith("BOUTON:"):
button = line_clean.split(":", 1)[1].strip().strip("'\"")
if button.lower() in ("aucun", "none", "n/a", ""):
button = ""
elif upper.startswith("DÉTAIL:") or upper.startswith("DETAIL:"):
detail = line_clean.split(":", 1)[1].strip()
if state == "ok":
return {"screen_state": "ok"}
result = {
"screen_state": state,
"detail": detail,
"elapsed_ms": round(elapsed_ms, 1),
}
# Si popup détectée avec un texte de bouton, essayer de le localiser
if state == "popup" and button:
result["popup_label"] = button
# Localiser le bouton par grounding VLM (qwen2.5vl)
coords = _locate_popup_button(screenshot_b64, button, screen_width, screen_height)
if coords:
result["popup_coords"] = coords
return result
except _requests.Timeout:
logger.debug("Observer VLM timeout (15s)")
return {"screen_state": "ok", "detail": "VLM timeout"}
except Exception as e:
logger.debug(f"Observer VLM erreur : {e}")
return {"screen_state": "ok", "detail": str(e)}
def _locate_popup_button(
screenshot_b64: str, button_text: str,
screen_width: int, screen_height: int,
) -> Optional[Dict[str, float]]:
"""Localiser un bouton de popup par grounding VLM (qwen2.5vl).
Utilise le format bbox_2d natif de qwen2.5vl pour trouver
la position exacte du bouton sur le screenshot.
"""
import requests as _requests
ollama_url = "http://localhost:11434/api/chat"
prompt = f"Detect the button with text '{button_text}' with a bounding box."
try:
resp = _requests.post(
ollama_url,
json={
"model": "qwen2.5vl:7b",
"messages": [{"role": "user", "content": prompt, "images": [screenshot_b64]}],
"stream": False,
"options": {"temperature": 0.1, "num_predict": 50},
},
timeout=15,
)
if not resp.ok:
return None
content = resp.json().get("message", {}).get("content", "")
# Parser bbox_2d — qwen2.5vl retourne des coordonnées en pixels
# relatifs à l'image envoyée, PAS sur une grille 1000x1000.
# Format JSON : [{"bbox_2d": [x1, y1, x2, y2], "label": "..."}]
bbox_match = re.search(
r'"bbox_2d"\s*:\s*\[\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\]',
content,
)
if bbox_match:
x1, y1, x2, y2 = [int(bbox_match.group(i)) for i in range(1, 5)]
# Normaliser par les dimensions de l'écran (pixels → 0-1)
cx = (x1 + x2) / 2 / screen_width
cy = (y1 + y2) / 2 / screen_height
if 0.0 <= cx <= 1.0 and 0.0 <= cy <= 1.0:
logger.info(f"Observer : bouton '{button_text}' localisé à ({cx:.3f}, {cy:.3f})")
return {"x_pct": cx, "y_pct": cy}
except Exception as e:
logger.debug(f"Observer grounding bouton erreur : {e}")
return None