# agent_v0/server_v1/resolve_engine.py """ Résolution visuelle des cibles UI pour le replay. Contient toutes les stratégies de résolution : - Template matching OpenCV (~100ms) - YOLO/OmniParser (~0.6-0.8s) - VLM Quick Find (~3-8s) - VLM Grounding Direct (~5-15s) - SomEngine + VLM (~5-15s) - Matching sémantique ScreenAnalyzer (~15-20s) - Pré-analyse écran (Observer — popup detection) Extrait de api_stream.py pour clarifier l'architecture. """ import base64 import io import logging import os import re import tempfile import threading import time from typing import Any, Dict, List, Optional from pydantic import BaseModel logger = logging.getLogger("api_stream") # ========================================================================= # Modèles Pydantic # ========================================================================= class ResolveTargetRequest(BaseModel): """Requête de résolution visuelle d'une cible.""" session_id: str screenshot_b64: str # Screenshot JPEG en base64 target_spec: Dict[str, Any] # {by_role, by_text, by_position, ...} fallback_x_pct: float = 0.0 # Coordonnées de fallback fallback_y_pct: float = 0.0 screen_width: int = 1920 screen_height: int = 1080 strict_mode: bool = False # True pour replay sessions (seuil template 0.90 + YOLO) class PreAnalyzeRequest(BaseModel): """Requête de pré-analyse écran (Observer).""" screenshot_b64: str expected_state: str = "" # Description attendue de l'état écran window_title: str = "" # Titre fenêtre attendu screen_width: int = 1920 screen_height: int = 1080 # ========================================================================= # Template Matching # ========================================================================= def _resolve_by_template_matching( screenshot_path: str, anchor_image_b64: str, screen_width: int, screen_height: int, confidence_threshold: float = 0.7, ) -> Optional[Dict[str, Any]]: """Résoudre la position d'une ancre par template matching OpenCV. Compare l'image de l'ancre (crop) avec le screenshot actuel pour trouver la meilleure correspondance. Utilise cv2.matchTemplate avec TM_CCOEFF_NORMED. Args: screenshot_path: Chemin du screenshot de l'écran actuel anchor_image_b64: Image de l'ancre encodée en base64 (PNG) screen_width: Largeur de l'écran en pixels screen_height: Hauteur de l'écran en pixels confidence_threshold: Seuil minimum de confiance (0.0 à 1.0) Returns: Dict avec resolved=True et coordonnées, ou None si pas de match """ try: import cv2 import numpy as np except ImportError: logger.warning("OpenCV non disponible pour template matching") return None try: # Charger le screenshot screenshot = cv2.imread(screenshot_path) if screenshot is None: logger.warning("Impossible de lire le screenshot : %s", screenshot_path) return None # Décoder l'image de l'ancre depuis base64 anchor_bytes = base64.b64decode(anchor_image_b64) anchor_array = np.frombuffer(anchor_bytes, dtype=np.uint8) anchor_img = cv2.imdecode(anchor_array, cv2.IMREAD_COLOR) if anchor_img is None: logger.warning("Impossible de décoder l'image de l'ancre") return None # Convertir en niveaux de gris pour le matching screenshot_gray = cv2.cvtColor(screenshot, cv2.COLOR_BGR2GRAY) anchor_gray = cv2.cvtColor(anchor_img, cv2.COLOR_BGR2GRAY) # Vérifier que l'ancre n'est pas plus grande que le screenshot sh, sw = screenshot_gray.shape[:2] ah, aw = anchor_gray.shape[:2] if ah > sh or aw > sw: logger.warning( "Ancre (%dx%d) plus grande que le screenshot (%dx%d)", aw, ah, sw, sh, ) return None # Template matching multi-échelle : essayer l'échelle 1.0 d'abord, # puis quelques variations si la résolution a changé. # Plage étendue 0.5x-2.0x pour couvrir les écarts importants # (ex: apprentissage 2560x1600 → replay 1280x720 = ratio ~0.5x) best_val = -1.0 best_loc = None best_scale = 1.0 best_anchor_size = (aw, ah) for scale in [1.0, 0.9, 1.1, 0.8, 1.2, 0.75, 1.25, 0.6, 1.5, 0.5, 1.75, 2.0]: if scale != 1.0: new_w = int(aw * scale) new_h = int(ah * scale) if new_w < 10 or new_h < 10 or new_w > sw or new_h > sh: continue scaled_anchor = cv2.resize(anchor_gray, (new_w, new_h)) else: scaled_anchor = anchor_gray new_w, new_h = aw, ah result = cv2.matchTemplate(screenshot_gray, scaled_anchor, cv2.TM_CCOEFF_NORMED) _, max_val, _, max_loc = cv2.minMaxLoc(result) if max_val > best_val: best_val = max_val best_loc = max_loc best_scale = scale best_anchor_size = (new_w, new_h) # Si on a un très bon match, pas besoin de continuer if best_val >= 0.95: break if best_val < confidence_threshold: logger.info( "Template matching : meilleur score=%.3f < seuil=%.3f (ancre %dx%d, écran %dx%d)", best_val, confidence_threshold, aw, ah, sw, sh, ) return None # Calculer le centre du match match_w, match_h = best_anchor_size cx = best_loc[0] + match_w / 2.0 cy = best_loc[1] + match_h / 2.0 # Convertir en proportions normalisées x_pct = round(cx / sw, 6) if sw > 0 else 0.0 y_pct = round(cy / sh, 6) if sh > 0 else 0.0 logger.info( "Template matching OK : score=%.3f, échelle=%.2f, " "centre=(%d, %d) → (%.4f, %.4f) sur %dx%d", best_val, best_scale, int(cx), int(cy), x_pct, y_pct, sw, sh, ) return { "resolved": True, "method": "template_matching", "x_pct": x_pct, "y_pct": y_pct, "matched_element": { "label": f"anchor_template", "type": "visual_anchor", "role": "anchor", "center": [int(cx), int(cy)], "confidence": best_val, }, "score": best_val, "scale": best_scale, "match_box": { "x": best_loc[0], "y": best_loc[1], "width": match_w, "height": match_h, }, } except Exception as e: logger.error("Erreur template matching : %s", e) return None def _validate_match_context( result: Dict[str, Any], original_x_pct: float, original_y_pct: float, target_spec: Dict[str, Any], max_distance: float = 0.35, ) -> bool: """Vérifier que la position trouvée est dans la même zone que l'originale. Évite les faux positifs du template matching : un bouton similaire visuellement mais situé dans une zone très différente de l'écran. Args: result: Résultat du template matching (contient x_pct, y_pct). original_x_pct: Position X originale (pourcentage, 0.0-1.0). original_y_pct: Position Y originale (pourcentage, 0.0-1.0). target_spec: Spécification de la cible (non utilisé pour l'instant, mais disponible pour des règles contextuelles futures). max_distance: Distance euclidienne maximum acceptée (en pourcentage de l'écran). Défaut 0.35 = ~35% de la diagonale, assez permissif pour les UI dynamiques. Returns: True si la position est valide (même zone), False sinon. """ found_x = result.get("x_pct", 0.0) found_y = result.get("y_pct", 0.0) # Distance euclidienne en pourcentage de l'écran dx = found_x - original_x_pct dy = found_y - original_y_pct distance = (dx ** 2 + dy ** 2) ** 0.5 if distance > max_distance: logger.debug( "Context validation : distance=%.3f > max=%.3f " "(found=(%.3f, %.3f), original=(%.3f, %.3f))", distance, max_distance, found_x, found_y, original_x_pct, original_y_pct, ) return False return True # ========================================================================= # YOLO/OmniParser — Résolution par détection d'éléments UI # ========================================================================= # Chargement paresseux d'OmniParser (singleton, GPU) _omniparser_available: Optional[bool] = None # None = pas encore vérifié _omniparser_instance = None _omniparser_lock = threading.Lock() def _get_omniparser(): """Obtenir l'instance OmniParser (lazy loading, thread-safe). Returns: OmniParserAdapter ou None si non disponible. """ global _omniparser_available, _omniparser_instance if _omniparser_available is False: return None if _omniparser_instance is not None: return _omniparser_instance with _omniparser_lock: if _omniparser_available is False: return None if _omniparser_instance is not None: return _omniparser_instance try: from core.detection.omniparser_adapter import OmniParserAdapter adapter = OmniParserAdapter() if adapter.available: _omniparser_instance = adapter _omniparser_available = True logger.info("OmniParser disponible pour la résolution YOLO") return adapter else: _omniparser_available = False logger.info("OmniParser : modèles non trouvés, YOLO désactivé") return None except ImportError: _omniparser_available = False logger.info("OmniParser non installé, YOLO désactivé") return None except Exception as e: _omniparser_available = False logger.warning("OmniParser init échouée : %s", e) return None def _resolve_by_yolo( screenshot_path: str, anchor_image_b64: str, screen_width: int, screen_height: int, target_spec: Dict[str, Any], ) -> Optional[Dict[str, Any]]: """Résolution via YOLO/OmniParser : détecte tous les éléments UI puis matche le crop de référence contre les éléments détectés. Stratégie : 1. OmniParser détecte tous les éléments UI du screenshot (~0.6-0.8s) 2. Pour chaque élément détecté, template matching local contre l'anchor 3. Si 1 seul bon match (score >= 0.50) → accepter 4. Si 2+ matchs ambigus → retourner None (le VLM tranchera) Args: screenshot_path: Chemin vers le screenshot JPEG anchor_image_b64: Image de l'anchor encodée en base64 screen_width: Largeur de l'écran screen_height: Hauteur de l'écran target_spec: Spécification de la cible Returns: Dict avec resolved=True/False, x_pct, y_pct, score ou None si OmniParser pas disponible ou aucun match """ try: import cv2 import numpy as np except ImportError: return None omniparser = _get_omniparser() if omniparser is None: return None t0 = time.time() try: from PIL import Image as PILImage # Charger le screenshot en PIL screenshot_pil = PILImage.open(screenshot_path) sw, sh = screenshot_pil.size # Charger le screenshot en numpy/OpenCV pour le template matching screenshot_np = np.array(screenshot_pil) if len(screenshot_np.shape) == 3 and screenshot_np.shape[2] == 3: # PIL est RGB, convertir en BGR pour OpenCV screenshot_bgr = cv2.cvtColor(screenshot_np, cv2.COLOR_RGB2BGR) else: screenshot_bgr = screenshot_np screenshot_gray = cv2.cvtColor(screenshot_bgr, cv2.COLOR_BGR2GRAY) # Décoder l'anchor depuis base64 anchor_bytes = base64.b64decode(anchor_image_b64) anchor_array = np.frombuffer(anchor_bytes, dtype=np.uint8) anchor_img = cv2.imdecode(anchor_array, cv2.IMREAD_COLOR) if anchor_img is None: logger.warning("YOLO resolve : impossible de décoder l'anchor") return None anchor_gray = cv2.cvtColor(anchor_img, cv2.COLOR_BGR2GRAY) anchor_h, anchor_w = anchor_gray.shape[:2] # Détecter tous les éléments UI avec OmniParser elements = omniparser.detect(screenshot_pil) if not elements: elapsed = time.time() - t0 logger.info("YOLO resolve : 0 éléments détectés (%.1fs)", elapsed) return None logger.info( "YOLO resolve : %d éléments détectés, matching anchor %dx%d...", len(elements), anchor_w, anchor_h, ) # Matcher l'anchor contre chaque élément détecté YOLO_MATCH_THRESHOLD = 0.50 matches = [] for elem in elements: x1, y1, x2, y2 = elem.bbox elem_w = x2 - x1 elem_h = y2 - y1 # Ignorer les éléments trop petits if elem_w < 5 or elem_h < 5: continue # Extraire le crop de l'élément depuis le screenshot elem_crop = screenshot_gray[y1:y2, x1:x2] if elem_crop.size == 0: continue # Template matching local : resize anchor pour matcher la taille de l'élément # ou inversement, selon les dimensions relatives try: # Approche : resize l'anchor à la taille du crop et comparer if elem_w > 0 and elem_h > 0: anchor_resized = cv2.resize(anchor_gray, (elem_w, elem_h)) result = cv2.matchTemplate( elem_crop, anchor_resized, cv2.TM_CCOEFF_NORMED ) _, max_val, _, _ = cv2.minMaxLoc(result) else: continue # Aussi essayer le crop à la taille de l'anchor si c'est plus grand if elem_w >= anchor_w and elem_h >= anchor_h: result2 = cv2.matchTemplate( elem_crop, anchor_gray, cv2.TM_CCOEFF_NORMED ) _, max_val2, _, _ = cv2.minMaxLoc(result2) max_val = max(max_val, max_val2) if max_val >= YOLO_MATCH_THRESHOLD: matches.append((elem, max_val)) except cv2.error: continue elapsed = time.time() - t0 if not matches: logger.info( "YOLO resolve : aucun match >= %.2f parmi %d éléments (%.1fs)", YOLO_MATCH_THRESHOLD, len(elements), elapsed, ) return None # Trier par score décroissant matches.sort(key=lambda m: m[1], reverse=True) best_elem, best_score = matches[0] # Si 2+ matchs avec des scores proches (< 0.10 d'écart), c'est ambigu # → laisser le VLM trancher if len(matches) >= 2: second_score = matches[1][1] if best_score - second_score < 0.10: logger.info( "YOLO resolve : %d matchs ambigus (best=%.3f, second=%.3f, " "écart=%.3f < 0.10), VLM requis (%.1fs)", len(matches), best_score, second_score, best_score - second_score, elapsed, ) return None # 1 seul match clair → accepter cx, cy = best_elem.center x_pct = round(cx / sw, 6) if sw > 0 else 0.0 y_pct = round(cy / sh, 6) if sh > 0 else 0.0 logger.info( "YOLO resolve OK : '%s' (%s) score=%.3f → (%.4f, %.4f) " "parmi %d éléments, %d matchs (%.1fs)", best_elem.label, best_elem.element_type, best_score, x_pct, y_pct, len(elements), len(matches), elapsed, ) return { "resolved": True, "method": "yolo_omniparser", "x_pct": x_pct, "y_pct": y_pct, "matched_element": { "label": best_elem.label, "type": best_elem.element_type, "role": "yolo_detected", "center": [cx, cy], "confidence": best_score, }, "score": best_score, "yolo_elements_count": len(elements), "yolo_matches_count": len(matches), } except Exception as e: elapsed = time.time() - t0 logger.warning("YOLO resolve : exception (%.1fs) — %s", elapsed, e) return None # ========================================================================= # VLM Quick Find — Fallback léger quand le template matching échoue # ========================================================================= # Client Ollama singleton (initialisé au premier appel, pas au démarrage) _vlm_client = None _vlm_client_lock = threading.Lock() # Timeout dédié pour le VLM Quick Find (plus court que le timeout par défaut) _VLM_QUICK_FIND_TIMEOUT = 30 # secondes def _get_vlm_client(): """Obtenir ou créer le client Ollama singleton pour le VLM Quick Find. Initialisation paresseuse : le client n'est créé qu'au premier appel, pas au démarrage du serveur (évite de bloquer si Ollama est down). Le modèle est résolu automatiquement via vlm_config (RPA_VLM_MODEL). """ global _vlm_client if _vlm_client is not None: return _vlm_client with _vlm_client_lock: if _vlm_client is not None: return _vlm_client try: from core.detection.ollama_client import OllamaClient from core.detection.vlm_config import get_vlm_model _model = get_vlm_model() _vlm_client = OllamaClient( endpoint="http://localhost:11434", model=_model, timeout=_VLM_QUICK_FIND_TIMEOUT, ) logger.info("VLM Quick Find : client Ollama initialisé (%s)", _model) except Exception as e: logger.warning(f"VLM Quick Find : impossible d'initialiser le client Ollama : {e}") return None return _vlm_client def _build_target_description(target_spec: Dict[str, Any]) -> str: """Construire une description textuelle de l'élément à trouver. Utilisé par le VLM Quick Find pour savoir quoi chercher sur le screenshot. Args: target_spec: Spécification de la cible (by_text, by_role, etc.) Returns: Description en langage naturel, ex: "un bouton contenant 'Valider'" """ by_text = target_spec.get("by_text", "").strip() by_role = target_spec.get("by_role", "").strip() if by_text and by_role: return f"un {by_role} contenant '{by_text}'" elif by_text: return f"élément contenant le texte '{by_text}'" elif by_role: return f"un {by_role}" else: return "l'élément interactif principal" def _vlm_quick_find( screenshot_path: str, target_description: str, anchor_image_b64: Optional[str] = None, ) -> Optional[Dict[str, Any]]: """Demander au VLM de localiser un élément sur le screenshot. Stratégie VLM-first pour le replay : le VLM comprend le contexte de l'écran et peut trouver un élément même si l'apparence a changé. Modes de fonctionnement : - Avec anchor_image_b64 + description : multi-image (screenshot + crop de référence). Le VLM voit le screenshot ET le crop, ce qui est beaucoup plus précis. - Avec description seule : single-image, le VLM cherche par la description textuelle. - Avec anchor_image_b64 seule (pas de description) : multi-image avec prompt visuel pur. Args: screenshot_path: Chemin du screenshot actuel target_description: Description riche de l'élément à trouver. Ex: "Dans la fenêtre 'Exécuter', l'élément cliqué en bas au centre" anchor_image_b64: Image de référence (crop) en base64 (optionnel). Si fourni, envoyé comme seconde image au VLM pour comparaison visuelle. Returns: {"x_pct": float, "y_pct": float, "confidence": float, "method": "vlm_quick_find"} ou None si l'élément n'est pas trouvé ou en cas d'erreur """ client = _get_vlm_client() if client is None: logger.debug("VLM Quick Find : client Ollama non disponible, skip") return None t0 = time.time() # Construire le prompt adapté selon les informations disponibles has_anchor = bool(anchor_image_b64) has_description = bool(target_description and target_description.strip()) if has_anchor and has_description: # Mode optimal : screenshot + crop de référence + description textuelle prompt = ( "The first image is the current screen. " "The second image shows the element I want to click.\n\n" f"Context: {target_description}\n\n" "Find this exact element on the screen and return its CENTER coordinates " "as percentage of the screen dimensions.\n" 'Return ONLY a JSON object: {"x_pct": 0.XX, "y_pct": 0.XX, "confidence": 0.XX}\n' 'If the element is not visible, return: {"x_pct": null, "y_pct": null, "confidence": 0.0}' ) elif has_anchor: # Mode visuel pur : screenshot + crop, pas de description prompt = ( "The first image is the current screen. " "The second image shows the element I want to click.\n\n" "Find this exact element on the screen and return its CENTER coordinates " "as percentage of the screen dimensions.\n" 'Return ONLY a JSON object: {"x_pct": 0.XX, "y_pct": 0.XX, "confidence": 0.XX}\n' 'If the element is not visible, return: {"x_pct": null, "y_pct": null, "confidence": 0.0}' ) else: # Mode description seule prompt = ( "Look at this screenshot carefully.\n\n" f"{target_description}\n\n" "Find this element and return its CENTER coordinates " "as percentage of the image dimensions.\n" 'Return ONLY a JSON object: {"x_pct": 0.XX, "y_pct": 0.XX, "confidence": 0.XX}\n' 'If the element is not visible, return: {"x_pct": null, "y_pct": null, "confidence": 0.0}' ) system_prompt = "You are a UI element locator. Output raw JSON only. No explanation." try: # Préparer les images supplémentaires (anchor crop) extra_images = [anchor_image_b64] if has_anchor else None result = client.generate( prompt=prompt, image_path=screenshot_path, system_prompt=system_prompt, temperature=0.1, max_tokens=200, force_json=False, extra_images_b64=extra_images, ) elapsed = time.time() - t0 if not result.get("success"): logger.info( "VLM Quick Find : échec appel VLM (%.1fs) — %s", elapsed, result.get("error", "?"), ) return None response_text = result.get("response", "").strip() if not response_text: logger.info("VLM Quick Find : réponse vide du VLM (%.1fs)", elapsed) return None # Parser la réponse JSON (réutiliser le parser robuste d'OllamaClient) parsed = client._extract_json_from_response(response_text) if parsed is None: logger.info( "VLM Quick Find : réponse non-JSON (%.1fs) — %.80s", elapsed, response_text, ) return None # Valider les coordonnées x_pct = parsed.get("x_pct") y_pct = parsed.get("y_pct") confidence = float(parsed.get("confidence", 0.0)) if x_pct is None or y_pct is None or confidence < 0.3: logger.info( "VLM Quick Find : élément non trouvé ou confiance trop basse " "(%.1fs, confidence=%.2f) pour '%s'", elapsed, confidence, target_description[:80] if target_description else "(anchor only)", ) return None x_pct = float(x_pct) y_pct = float(y_pct) # Vérifier que les coordonnées sont dans les bornes [0, 1] if not (0.0 <= x_pct <= 1.0 and 0.0 <= y_pct <= 1.0): logger.info( "VLM Quick Find : coordonnées hors bornes (%.4f, %.4f), ignoré", x_pct, y_pct, ) return None mode_str = "multi-image" if has_anchor else "description" desc_short = (target_description[:60] + "...") if target_description and len(target_description) > 60 else (target_description or "(anchor)") logger.info( "VLM Quick Find OK [%s] : '%s' → (%.4f, %.4f) confidence=%.2f en %.1fs", mode_str, desc_short, x_pct, y_pct, confidence, elapsed, ) return { "resolved": True, "method": "vlm_quick_find", "x_pct": round(x_pct, 6), "y_pct": round(y_pct, 6), "matched_element": { "label": target_description or "anchor_visual", "type": "vlm_located", "role": "vlm_quick_find", "confidence": confidence, }, "score": confidence, } except Exception as e: elapsed = time.time() - t0 logger.warning( "VLM Quick Find : exception (%.1fs) — %s", elapsed, e, ) return None # --------------------------------------------------------------------------- # Résolution par VLM Grounding Direct (configurable via RPA_VLM_MODEL) # --------------------------------------------------------------------------- def _resolve_by_grounding( screenshot_path: str, target_spec: Dict[str, Any], screen_width: int, screen_height: int, ) -> Optional[Dict[str, Any]]: """Résoudre une cible via grounding VLM direct. Le modèle VLM (gemma4:e4b par défaut, configurable via RPA_VLM_MODEL) reçoit le screenshot + une description textuelle et retourne directement les coordonnées de l'élément. Pas de SomEngine, pas de numérotation — le VLM fait du grounding UI natif. Approche plus fiable que SomEngine+VLM pour les icônes et éléments visuels sans texte (logo Windows, disquette, bouton fermer). """ t0 = time.time() # Construire la description de la cible by_text = target_spec.get("by_text", "").strip() vlm_desc = target_spec.get("vlm_description", "").strip() window_title = target_spec.get("window_title", "").strip() if by_text: description = by_text elif vlm_desc: description = vlm_desc else: return None # Utiliser la capture fenêtre si disponible (plus ciblée, moins de bruit) # Sinon fallback sur le full screen window_capture = target_spec.get("window_capture", {}) window_rect = window_capture.get("rect") # [x1, y1, x2, y2] écran try: from PIL import Image as PILImage from pathlib import Path # Utiliser la fenêtre active : cropper depuis le screenshot full # via window_rect (fonctionne au replay comme à l'enregistrement) img = PILImage.open(screenshot_path) if window_rect: x1, y1, x2, y2 = window_rect img = img.crop((x1, y1, x2, y2)) using_window = True logger.debug("Grounding : crop fenêtre (%d,%d,%d,%d) → %dx%d", x1, y1, x2, y2, *img.size) else: using_window = False orig_w, orig_h = img.size small_w, small_h = orig_w, orig_h # pas de redimensionnement buf = io.BytesIO() img.save(buf, format="JPEG", quality=80) shot_b64 = base64.b64encode(buf.getvalue()).decode() except Exception as e: logger.warning("Grounding : erreur chargement image — %s", e) return None # Prompt natif Qwen2.5-VL — format bbox_2d (le seul fiable) # Ajouter la position relative pour désambiguïser (ex: deux "Rechercher" à l'écran) original_pos = target_spec.get("original_position", {}) pos_hint = "" y_rel = original_pos.get("y_relative", "") x_rel = original_pos.get("x_relative", "") if y_rel or x_rel: pos_hint = f" located {y_rel} {x_rel} of the screen".strip() prompt = f"Detect '{description}'{pos_hint} in this image with a bounding box." # Le grounding nécessite un modèle entraîné pour les coordonnées (bbox_2d). # Qwen2.5-VL est le seul qui retourne des positions précises. # gemma4 comprend les images mais ne sait pas localiser en coordonnées. _grounding_model = os.environ.get("RPA_GROUNDING_MODEL", "qwen2.5vl:7b") # Appel VLM — vLLM (GPU, rapide) en priorité, Ollama en fallback import requests as _requests content = "" # Port vLLM configurable via env _vllm_port = os.environ.get("VLLM_PORT", "8100") _vllm_model = os.environ.get("VLLM_MODEL", "Qwen/Qwen2.5-VL-7B-Instruct-AWQ") # Essai 1 : vLLM (API OpenAI-compatible, GPU) try: vllm_resp = _requests.post( f"http://localhost:{_vllm_port}/v1/chat/completions", json={ "model": _vllm_model, "messages": [ {"role": "system", "content": "You locate UI elements on screenshots. Return coordinates."}, {"role": "user", "content": [ {"type": "text", "text": prompt}, {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{shot_b64}"}}, ]}, ], "temperature": 0.1, "max_tokens": 80, }, timeout=30, ) if vllm_resp.ok: content = vllm_resp.json().get("choices", [{}])[0].get("message", {}).get("content", "") if content: logger.debug("Grounding via vLLM OK") except Exception as e: logger.debug("vLLM non disponible (%s), fallback Ollama", e) # Essai 2 : Ollama (qwen2.5vl:7b pour le grounding — format bbox_2d natif) if not content: try: resp = _requests.post("http://localhost:11434/api/chat", json={ "model": _grounding_model, "messages": [ {"role": "user", "content": prompt, "images": [shot_b64]}, ], "stream": False, "options": {"temperature": 0.1, "num_predict": 100}, }, timeout=60) content = resp.json().get("message", {}).get("content", "") except Exception as e: logger.info("Grounding VLM timeout/erreur : %s", e) return None elapsed = time.time() - t0 # Parser la réponse — supporte bbox_2d en pixels, JSON %, arrays bruts x_pct, y_pct = None, None # Format 1 : bbox_2d en pixels [x, y] ou [x1, y1, x2, y2] bbox_match = re.search(r'"bbox_2d"\s*:\s*\[([^\]]+)\]', content) if bbox_match: coords = [float(v.strip()) for v in bbox_match.group(1).split(",")] if len(coords) == 2: x_pct = coords[0] / small_w y_pct = coords[1] / small_h elif len(coords) >= 4: x_pct = (coords[0] + coords[2]) / 2 / small_w y_pct = (coords[1] + coords[3]) / 2 / small_h # Format 2 : JSON {"x": 0.XX, "y": 0.YY} if x_pct is None: json_match = re.search(r'"x"\s*:\s*([\d.]+).*?"y"\s*:\s*([\d.]+)', content) if json_match: x_val, y_val = float(json_match.group(1)), float(json_match.group(2)) # Si > 1, c'est en pixels if x_val > 1: x_pct = x_val / small_w y_pct = y_val / small_h else: x_pct = x_val y_pct = y_val # Format 3 : {"x_pct": 0.XX, "y_pct": 0.YY} if x_pct is None: pct_match = re.search(r'"x_pct"\s*:\s*([\d.]+).*?"y_pct"\s*:\s*([\d.]+)', content) if pct_match: x_pct = float(pct_match.group(1)) y_pct = float(pct_match.group(2)) # Format 4 : array brut [x1, y1, x2, y2] ou [x, y] if x_pct is None: arr_match = re.search(r'\[[\s]*([\d.]+)\s*,\s*([\d.]+)(?:\s*,\s*([\d.]+)\s*,\s*([\d.]+))?\s*\]', content) if arr_match: vals = [float(v) for v in arr_match.groups() if v is not None] if len(vals) >= 4: x_pct = (vals[0] + vals[2]) / 2 / small_w y_pct = (vals[1] + vals[3]) / 2 / small_h elif len(vals) == 2: x_pct = vals[0] / small_w y_pct = vals[1] / small_h if x_pct is None or y_pct is None: # Fallback multi-image : screenshot + crop → grounding sans description anchor_b64 = target_spec.get("anchor_image_base64", "") if anchor_b64: try: prompt_mi = ( "Image 1 is a screenshot. Image 2 shows a UI element.\n" "Find where Image 2 appears on Image 1.\n" 'Return position: {"x": NNN, "y": NNN} in pixels of Image 1.' ) resp2 = _requests.post("http://localhost:11434/api/chat", json={ "model": _grounding_model, "messages": [ {"role": "user", "content": prompt_mi, "images": [shot_b64, anchor_b64]}, ], "stream": False, "options": {"temperature": 0.1, "num_predict": 50}, }, timeout=60) content2 = resp2.json().get("message", {}).get("content", "") elapsed = time.time() - t0 # Parser tous les formats arr2 = re.search(r'\[[\s]*([\d.]+)\s*,\s*([\d.]+)(?:\s*,\s*([\d.]+)\s*,\s*([\d.]+))?\s*\]', content2) if arr2: vals = [float(v) for v in arr2.groups() if v is not None] if len(vals) >= 4: x_pct = (vals[0] + vals[2]) / 2 / small_w y_pct = (vals[1] + vals[3]) / 2 / small_h elif len(vals) == 2: x_pct = vals[0] / small_w y_pct = vals[1] / small_h if x_pct is None: json2 = re.search(r'"x"\s*:\s*([\d.]+).*?"y"\s*:\s*([\d.]+)', content2) if json2: x_pct = float(json2.group(1)) / small_w y_pct = float(json2.group(2)) / small_h if x_pct is not None: logger.info("Grounding multi-image OK (%.1fs)", elapsed) except Exception as e: logger.debug("Grounding multi-image erreur: %s", e) if x_pct is None or y_pct is None: logger.info( "Grounding : réponse non parsable (%.1fs) — %s", elapsed, content[:120], ) return None # Valider les bornes if not (0.0 <= x_pct <= 1.0 and 0.0 <= y_pct <= 1.0): logger.info("Grounding : coordonnées hors bornes (%.3f, %.3f)", x_pct, y_pct) return None # Convertir coordonnées fenêtre → coordonnées écran if using_window and window_rect: win_x1, win_y1, win_x2, win_y2 = window_rect win_w = win_x2 - win_x1 win_h = win_y2 - win_y1 # x_pct/y_pct sont relatifs à la fenêtre, convertir en relatif à l'écran abs_x = win_x1 + x_pct * win_w abs_y = win_y1 + y_pct * win_h x_pct = abs_x / screen_width y_pct = abs_y / screen_height logger.info( "Grounding OK [%s/window] : '%s' → (%.4f, %.4f) en %.1fs", _grounding_model, description[:50], x_pct, y_pct, elapsed, ) else: logger.info( "Grounding OK [%s/full] : '%s' → (%.4f, %.4f) en %.1fs", _grounding_model, description[:50], x_pct, y_pct, elapsed, ) return { "resolved": True, "method": "grounding_vlm", "x_pct": round(x_pct, 6), "y_pct": round(y_pct, 6), "matched_element": { "label": description[:60], "type": "grounding", "role": "grounding_vlm", "confidence": 0.85, }, "score": 0.85, } # --------------------------------------------------------------------------- # Résolution Set-of-Mark : SomEngine (détection) + VLM (identification) # --------------------------------------------------------------------------- def _get_som_engine_api(): """Singleton SomEngine partagé.""" try: from core.detection.som_engine import get_shared_engine return get_shared_engine() except ImportError: return None def _resolve_by_som( screenshot_path: str, target_spec: Dict[str, Any], screen_width: int, screen_height: int, ) -> Optional[Dict[str, Any]]: """Résoudre une cible UI via Set-of-Mark + VLM. Pipeline : 1. SomEngine détecte tous les éléments et les numérote sur le screenshot 2. VLM reçoit l'image annotée + description de la cible 3. VLM identifie le numéro du mark → coordonnées précises Avantages vs VLM direct : - Le VLM n'a qu'à identifier (son point fort), pas localiser - Les coordonnées viennent de SomEngine (pixel-perfect) - Question simple "quel numéro ?" → réponse simple Args: screenshot_path: Chemin du screenshot actuel target_spec: Spécification de la cible (vlm_description, som_element, etc.) screen_width: Largeur écran en pixels screen_height: Hauteur écran en pixels Returns: Dict avec resolved=True et coordonnées, ou None si indisponible. """ engine = _get_som_engine_api() if engine is None: return None client = _get_vlm_client() if client is None: return None t0 = time.time() # ── 1. Lancer SomEngine sur le screenshot actuel ── try: from PIL import Image as PILImage img = PILImage.open(screenshot_path).convert("RGB") som_result = engine.analyze(img) except Exception as e: logger.warning("SoM resolve : erreur analyse — %s", e) return None if not som_result.elements: logger.info("SoM resolve : 0 éléments détectés") return None # ── 2. Construire la description de la cible ── som_element = target_spec.get("som_element", {}) vlm_description = target_spec.get("vlm_description", "") anchor_label = som_element.get("label", "") # Construire un prompt riche target_parts = [] if anchor_label: target_parts.append(f"texte '{anchor_label}'") if vlm_description: target_parts.append(vlm_description) if not target_parts: # Sans description, SoM resolve ne peut pas fonctionner logger.debug("SoM resolve : pas de description pour identifier l'élément") return None target_desc = ", ".join(target_parts) # ── 2.5. Raccourci : si le label est connu, chercher par texte directement ── # Pas besoin du VLM si on connaît le texte exact de l'élément ! if anchor_label and len(anchor_label) >= 2: label_lower = anchor_label.lower() # Match exact d'abord, puis partiel exact_matches = [ e for e in som_result.elements if e.label and e.label.lower() == label_lower ] if not exact_matches: exact_matches = [ e for e in som_result.elements if e.label and len(e.label) >= 3 and ( label_lower in e.label.lower() or e.label.lower() in label_lower ) ] if len(exact_matches) == 1: # Match unique par texte → pas besoin du VLM elem = exact_matches[0] elapsed = time.time() - t0 cx_norm, cy_norm = elem.center_norm logger.info( "SoM resolve FAST : match texte unique '#%d %s' → (%.4f, %.4f) en %.1fs", elem.id, elem.label, cx_norm, cy_norm, elapsed, ) return { "resolved": True, "method": "som_text_match", "x_pct": round(cx_norm, 6), "y_pct": round(cy_norm, 6), "matched_element": { "label": elem.label, "type": elem.source, "role": "som_text_match", "confidence": max(elem.confidence, 0.85), "som_id": elem.id, }, "score": max(elem.confidence, 0.85), } elif len(exact_matches) > 1: # Plusieurs matchs texte → disambiguïser par proximité à la position originale ref_center = som_element.get("center_norm", []) if ref_center and len(ref_center) == 2: ref_x, ref_y = ref_center best = min( exact_matches, key=lambda e: ( (e.center_norm[0] - ref_x) ** 2 + (e.center_norm[1] - ref_y) ** 2 ), ) elapsed = time.time() - t0 cx_norm, cy_norm = best.center_norm dist = ((cx_norm - ref_x) ** 2 + (cy_norm - ref_y) ** 2) ** 0.5 if dist < 0.15: # Tolérance 15% de l'écran logger.info( "SoM resolve FAST : match texte proximité '#%d %s' (dist=%.3f) " "→ (%.4f, %.4f) en %.1fs", best.id, best.label, dist, cx_norm, cy_norm, elapsed, ) return { "resolved": True, "method": "som_text_match", "x_pct": round(cx_norm, 6), "y_pct": round(cy_norm, 6), "matched_element": { "label": best.label, "type": best.source, "role": "som_text_match_proximity", "confidence": max(best.confidence, 0.80), "som_id": best.id, }, "score": max(best.confidence, 0.80), } logger.info( "SoM resolve : %d matchs texte pour '%s', VLM nécessaire", len(exact_matches), anchor_label, ) # ── 2.7. Fallback : template matching anchor vs éléments SomEngine ── # Pour les icônes sans texte : comparer le crop de référence contre # chaque région YOLO détectée par SomEngine. anchor_b64 = target_spec.get("anchor_image_base64", "") by_text = target_spec.get("by_text", "").strip() if anchor_b64 and (not anchor_label or not by_text): try: import cv2 import numpy as np # Décoder l'anchor anc_bytes = base64.b64decode(anchor_b64) anc_array = np.frombuffer(anc_bytes, dtype=np.uint8) anc_img = cv2.imdecode(anc_array, cv2.IMREAD_GRAYSCALE) # Charger le screenshot en OpenCV screenshot_cv = cv2.imread(screenshot_path, cv2.IMREAD_GRAYSCALE) if anc_img is not None and screenshot_cv is not None: # Template matching de l'anchor sur le SCREENSHOT ENTIER # (pas sur les régions individuelles — l'anchor est souvent plus grand) anc_h, anc_w = anc_img.shape[:2] if screenshot_cv.shape[0] >= anc_h and screenshot_cv.shape[1] >= anc_w: res = cv2.matchTemplate(screenshot_cv, anc_img, cv2.TM_CCOEFF_NORMED) _, max_score, _, max_loc = cv2.minMaxLoc(res) if max_score >= 0.5: # Centre du match match_cx = max_loc[0] + anc_w // 2 match_cy = max_loc[1] + anc_h // 2 # Trouver l'élément SomEngine le plus proche du centre du match best_elem = None best_dist = float("inf") for elem in som_result.elements: cx, cy = elem.center dist = ((match_cx - cx) ** 2 + (match_cy - cy) ** 2) ** 0.5 if dist < best_dist: best_dist = dist best_elem = elem if best_elem and best_dist < 100: # Max 100px de distance elapsed = time.time() - t0 cx_norm, cy_norm = best_elem.center_norm logger.info( "SoM resolve ANCHOR : match crop score=%.3f → " "elem '#%d %s' (dist=%.0fpx) → (%.4f, %.4f) en %.1fs", max_score, best_elem.id, best_elem.label, best_dist, cx_norm, cy_norm, elapsed, ) return { "resolved": True, "method": "som_anchor_match", "x_pct": round(cx_norm, 6), "y_pct": round(cy_norm, 6), "matched_element": { "label": best_elem.label or f"icon #{best_elem.id}", "type": best_elem.source, "role": "som_anchor_match", "confidence": max_score, "som_id": best_elem.id, }, "score": max_score, } except ImportError: pass except Exception as e: logger.debug("SoM anchor match erreur : %s", e) # ── 3. Sauvegarder l'image annotée SoM temporairement ── if som_result.som_image is None: logger.debug("SoM resolve : pas d'image annotée, skip VLM") return None try: with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as tmp: som_result.som_image.save(tmp, format="JPEG", quality=85) som_img_path = tmp.name except Exception as e: logger.warning("SoM resolve : erreur sauvegarde image annotée — %s", e) return None # ── 4. VLM : identifier le numéro du mark ── # Lister uniquement les éléments avec un label (plus concis pour le VLM) labeled_elements = [e for e in som_result.elements if e.label][:30] elements_list = "\n".join( f" #{e.id}: '{e.label}'" for e in labeled_elements ) # Multi-image : SoM annotée + anchor crop (si disponible) anchor_b64 = target_spec.get("anchor_image_base64", "") extra_images = [anchor_b64] if anchor_b64 else None if extra_images: prompt = ( "Image 1 shows the screen with numbered marks on each UI element.\n" "Image 2 shows the element I'm looking for.\n\n" f"Target: {target_desc}\n\n" f"Detected elements:\n{elements_list}\n\n" "Which mark number matches the target element in Image 2?\n" 'Answer with JSON only: {"mark_id": N, "confidence": 0.9}' ) else: prompt = ( f"I'm looking for: {target_desc}\n\n" f"Detected elements:\n{elements_list}\n\n" "Which number is the correct element?\n" 'Answer with JSON only: {"mark_id": N, "confidence": 0.9}' ) system_prompt = "You identify UI elements by number. Output JSON only, no explanation." try: result = client.generate( prompt=prompt, image_path=som_img_path, system_prompt=system_prompt, temperature=0.1, max_tokens=50, force_json=False, extra_images_b64=extra_images, ) except Exception as e: logger.warning("SoM resolve : erreur VLM — %s", e) return None finally: try: os.unlink(som_img_path) except OSError: pass elapsed = time.time() - t0 if not result.get("success"): logger.info("SoM resolve : VLM échoué (%.1fs)", elapsed) return None # ── 5. Parser la réponse et retourner les coordonnées ── response_text = result.get("response", "").strip() # Tenter d'abord l'extraction JSON standard parsed = client._extract_json_from_response(response_text) # Fallback : extraire un nombre simple de la réponse if parsed is None: numbers = re.findall(r'\b(\d+)\b', response_text) if numbers: candidate = int(numbers[0]) if som_result.get_element_by_id(candidate) is not None: parsed = {"mark_id": candidate, "confidence": 0.7} logger.debug("SoM resolve : extraction numéro fallback → #%d", candidate) if parsed is None: logger.info("SoM resolve : réponse non-JSON (%.1fs) — %.80s", elapsed, response_text) return None mark_id = parsed.get("mark_id") confidence = float(parsed.get("confidence", 0.0)) if mark_id is None or confidence < 0.3: logger.info( "SoM resolve : mark non trouvé ou confiance trop basse (mark=%s, conf=%.2f, %.1fs)", mark_id, confidence, elapsed, ) return None mark_id = int(mark_id) elem = som_result.get_element_by_id(mark_id) if elem is None: logger.warning("SoM resolve : mark #%d inexistant (%.1fs)", mark_id, elapsed) return None cx_norm, cy_norm = elem.center_norm logger.info( "SoM resolve OK : mark #%d '%s' → (%.4f, %.4f) conf=%.2f en %.1fs (%d éléments)", mark_id, elem.label, cx_norm, cy_norm, confidence, elapsed, len(som_result.elements), ) return { "resolved": True, "method": "som_vlm", "x_pct": round(cx_norm, 6), "y_pct": round(cy_norm, 6), "matched_element": { "label": elem.label or f"mark #{mark_id}", "type": elem.source, "role": "som_identified", "confidence": confidence, "som_id": mark_id, }, "score": confidence, } # ========================================================================= # Orchestrateur — Résolution cible complète (synchrone) # ========================================================================= # ========================================================================= # V4 : Résolution pilotée par le plan pré-compilé # ========================================================================= def _resolve_with_precompiled_order( screenshot_path: str, target_spec: Dict[str, Any], resolve_order: list, screen_width: int, screen_height: int, fallback_x_pct: float, fallback_y_pct: float, ) -> Optional[Dict[str, Any]]: """Résoudre la cible en suivant l'ordre pré-compilé par l'ExecutionCompiler. C'est le chemin V4 : l'ExecutionPlan a déjà décidé quelle méthode utiliser (OCR, template, VLM) selon le learning et les caractéristiques de l'élément. Le runtime ne fait qu'exécuter l'ordre — pas de cascade improvisée. resolve_order : liste de méthodes dans l'ordre à essayer ex: ["ocr", "template", "vlm"] ex: ["template", "ocr"] (template d'abord pour les icônes) ex: ["vlm"] (dernier recours) Returns: Dict résultat si trouvé, None si toutes les méthodes échouent. """ import time as _time t_start = _time.time() by_text = target_spec.get("by_text", "").strip() anchor_b64 = target_spec.get("anchor_image_base64", "") vlm_description = target_spec.get("vlm_description", "") for method in resolve_order: method_start = _time.time() if method == "ocr" and by_text: # OCR : chercher le texte visible dans l'image # C'est le chemin rapide — idéalement < 200ms try: result = _resolve_by_ocr_text( screenshot_path=screenshot_path, target_text=by_text, screen_width=screen_width, screen_height=screen_height, ) if result and result.get("resolved"): elapsed = (_time.time() - method_start) * 1000 logger.info( "V4 OCR : OK en %.0fms pour '%s' → (%.3f, %.3f)", elapsed, by_text[:30], result.get("x_pct", 0), result.get("y_pct", 0), ) result["resolve_method"] = "v4_ocr" result["resolve_elapsed_ms"] = elapsed return result except Exception as e: logger.debug("V4 OCR erreur : %s", e) elif method == "template" and anchor_b64: # Template matching : comparer des pixels try: result = _resolve_by_template_matching( screenshot_path=screenshot_path, anchor_image_b64=anchor_b64, screen_width=screen_width, screen_height=screen_height, confidence_threshold=0.85, ) if result and result.get("resolved"): elapsed = (_time.time() - method_start) * 1000 logger.info( "V4 TEMPLATE : OK en %.0fms score=%.3f → (%.3f, %.3f)", elapsed, result.get("score", 0), result.get("x_pct", 0), result.get("y_pct", 0), ) result["resolve_method"] = "v4_template" result["resolve_elapsed_ms"] = elapsed return result except Exception as e: logger.debug("V4 template erreur : %s", e) elif method == "vlm" and (vlm_description or by_text): # VLM : exception handler (lent, dernier recours) description = vlm_description or f"élément '{by_text}'" try: result = _vlm_quick_find( screenshot_path=screenshot_path, target_description=description, screen_width=screen_width, screen_height=screen_height, anchor_image_b64=anchor_b64, ) if result and result.get("resolved"): elapsed = (_time.time() - method_start) * 1000 logger.info( "V4 VLM : OK en %.0fms pour '%s' → (%.3f, %.3f)", elapsed, description[:30], result.get("x_pct", 0), result.get("y_pct", 0), ) result["resolve_method"] = "v4_vlm" result["resolve_elapsed_ms"] = elapsed return result except Exception as e: logger.debug("V4 VLM erreur : %s", e) total_elapsed = (_time.time() - t_start) * 1000 logger.info( "V4 resolve : toutes les méthodes (%s) ont échoué en %.0fms", resolve_order, total_elapsed, ) return None def _resolve_by_ocr_text( screenshot_path: str, target_text: str, screen_width: int, screen_height: int, ) -> Optional[Dict[str, Any]]: """Localiser du texte dans l'image via OCR (docTR ou fallback). C'est le chemin rapide V4 : pas de VLM, pas de template matching, juste de l'OCR direct. Idéal pour les éléments avec texte visible. Returns: Dict avec x_pct, y_pct, score si trouvé, None sinon. """ try: from doctr.io import DocumentFile from doctr.models import ocr_predictor except ImportError: logger.debug("docTR non disponible pour V4 OCR") return None try: # Utiliser un cache global pour éviter de recharger le modèle à chaque appel global _V4_OCR_PREDICTOR try: _V4_OCR_PREDICTOR except NameError: _V4_OCR_PREDICTOR = None if _V4_OCR_PREDICTOR is None: _V4_OCR_PREDICTOR = ocr_predictor( det_arch='db_resnet50', reco_arch='crnn_vgg16_bn', pretrained=True, ) doc = DocumentFile.from_images([screenshot_path]) result = _V4_OCR_PREDICTOR(doc) # Chercher le texte (match exact, insensible à la casse) target_lower = target_text.lower().strip() best_match = None best_score = 0.0 for page in result.pages: for block in page.blocks: for line_obj in block.lines: line_text = " ".join(w.value for w in line_obj.words) line_lower = line_text.lower() # Match exact > contient > mot par mot score = 0.0 if target_lower == line_lower: score = 1.0 elif target_lower in line_lower: score = 0.8 elif any(target_lower == w.value.lower() for w in line_obj.words): score = 0.9 if score > best_score: # Coordonnées de la ligne entière (bbox) box = line_obj.geometry # ((x1,y1), (x2,y2)) normalisées 0-1 cx = (box[0][0] + box[1][0]) / 2 cy = (box[0][1] + box[1][1]) / 2 best_match = { "resolved": True, "method": "v4_ocr", "x_pct": cx, "y_pct": cy, "score": score, "matched_text": line_text, } best_score = score if best_match and best_score >= 0.7: return best_match except Exception as e: logger.debug("docTR OCR erreur : %s", e) return None def _resolve_target_sync( screenshot_path: str, target_spec: Dict[str, Any], screen_width: int, screen_height: int, fallback_x_pct: float, fallback_y_pct: float, strict_mode: bool = False, processor=None, ) -> Dict[str, Any]: """Résoudre la cible visuellement (exécuté dans un thread séparé). Hiérarchie de résolution (strict_mode=True, replay sessions) — VLM-FIRST : 1. VLM Quick Find (~3-8s) — compréhension sémantique de l'écran, multi-image (screenshot + crop de référence + description riche) 1.5. SoM + VLM (~5-15s) — SomEngine numérote les éléments, VLM identifie le bon 2. Template matching OpenCV (~100ms) — fallback pixel, seuil STRICT 0.90 3. resolved=False → STOP le replay Le VLM comprend le contexte (titre de fenêtre, type d'élément, position) et peut trouver un élément même si l'écran est différent de l'enregistrement. Le template matching ne compare que des pixels et produit des faux positifs. Hiérarchie classique (strict_mode=False, VWB et autres) — INCHANGÉE : 1. Template matching OpenCV (~100ms) — seuil 0.70 1.5. VLM Quick Find si template échoue et by_text/by_role dispo 2. by_text/by_role → VLM Quick Find puis ScreenAnalyzer 3. fallback coordonnées statiques """ anchor_image_b64 = target_spec.get("anchor_image_base64", "") # [REPLAY] log structuré d'entrée résolution _window_title_log = target_spec.get("window_title", "") or "" _resolve_order_log = target_spec.get("resolve_order") or [] _uia_target_log = target_spec.get("uia_target") or {} _by_text_log = target_spec.get("by_text", "") _vlm_desc_log = target_spec.get("vlm_description", "") logger.info( f"[REPLAY] RESOLVE_ENTRY window='{_window_title_log}' " f"resolve_order={_resolve_order_log} " f"has_uia={bool(_uia_target_log)} uia_name='{_uia_target_log.get('name','')[:40]}' " f"has_anchor={bool(anchor_image_b64)} " f"by_text='{_by_text_log[:40]}' vlm_desc='{_vlm_desc_log[:40]}' " f"strict_mode={strict_mode} screen={screen_width}x{screen_height}" ) # =================================================================== # PHASE 1 APPRENTISSAGE : Lookup mémoire persistante (Fiche #18) # =================================================================== # Avant TOUTE résolution coûteuse (OCR/template/VLM), on consulte la # mémoire persistante (TargetMemoryStore). Si cette cible a été résolue # avec succès ≥2 fois sur cet écran (fail_ratio < 30%), on retourne # directement les coordonnées mémorisées. # # Hit mémoire : <10ms (vs 300ms-15s de résolution) # Miss mémoire : aucun overhead, on continue la cascade normale # # Les coords stockées sont celles qui ont PASSÉ la post-condition # (title_match strict) lors des replays précédents. C'est la # cristallisation par répétition : Léa = stagiaire qui apprend. try: from .replay_memory import memory_lookup _window_title = target_spec.get("window_title", "") or "" if _window_title: _mem_result = memory_lookup( window_title=_window_title, target_spec=target_spec, ) if _mem_result: # Hit mémoire : on skip toute la cascade. # Les coordonnées sont sanity-checked dans memory_lookup(). return _mem_result except Exception as _exc: logger.debug("Memory lookup skipped : %s", _exc) # =================================================================== # V4 : Résolution pilotée par le plan pré-compilé # =================================================================== # Si le target_spec contient `resolve_order`, il vient d'un ExecutionPlan # compilé. On honore cet ordre au lieu de faire la cascade par défaut. # C'est le "zéro VLM au runtime" : on essaie d'abord la stratégie # pré-compilée (OCR, template, ou VLM). resolve_order = target_spec.get("resolve_order") if resolve_order and isinstance(resolve_order, list): logger.info( "V4 resolve : ordre pré-compilé = %s", resolve_order, ) result = _resolve_with_precompiled_order( screenshot_path=screenshot_path, target_spec=target_spec, resolve_order=resolve_order, screen_width=screen_width, screen_height=screen_height, fallback_x_pct=fallback_x_pct, fallback_y_pct=fallback_y_pct, ) if result and result.get("resolved"): return result # Si les méthodes pré-compilées ont toutes échoué, on continue # vers la cascade legacy (compatibilité et robustesse). logger.info( "V4 resolve : toutes les méthodes pré-compilées ont échoué, " "fallback cascade legacy" ) # =================================================================== # MODE STRICT (replay sessions) — Stratégie VLM-FIRST # =================================================================== if strict_mode and anchor_image_b64: vlm_description = target_spec.get("vlm_description", "") by_text_strict = target_spec.get("by_text", "").strip() # Fallback : construire la description depuis by_text/by_role if not vlm_description: by_role = target_spec.get("by_role", "").strip() if by_text_strict or by_role: vlm_description = _build_target_description(target_spec) # --------------------------------------------------------------- # Étape -1 : Vérification CLIP (si embedding de référence fourni) # Vérifie qu'on est dans la bonne application avant de chercher # l'élément. Filet de sécurité contre les clics au mauvais endroit. # --------------------------------------------------------------- clip_embedding = target_spec.get("clip_embedding") if clip_embedding: try: from core.embedding.clip_embedder import CLIPEmbedder from PIL import Image as _PILImage import numpy as _np _clip = CLIPEmbedder() # Embedding de l'écran actuel (fenêtre si possible) window_capture = target_spec.get("window_capture", {}) window_rect = window_capture.get("rect") current_img = _PILImage.open(screenshot_path) if window_rect: current_img = current_img.crop(tuple(window_rect)) current_emb = _np.array(_clip.embed_image(current_img), dtype=_np.float32).flatten() ref_emb = _np.array(clip_embedding, dtype=_np.float32).flatten() clip_sim = float(_np.dot(current_emb, ref_emb) / ( _np.linalg.norm(current_emb) * _np.linalg.norm(ref_emb) )) logger.info(f"CLIP vérification : similarité={clip_sim:.3f}") if clip_sim < 0.75: logger.warning( f"CLIP MISMATCH : sim={clip_sim:.3f} < 0.75 — " f"écran actuel trop différent de l'enregistrement" ) return { "resolved": False, "method": "clip_mismatch", "reason": f"clip_similarity_{clip_sim:.3f}", "x_pct": fallback_x_pct, "y_pct": fallback_y_pct, } except Exception as e: logger.debug(f"CLIP vérification erreur (non-bloquant) : {e}") # --------------------------------------------------------------- # Étape 0 : Choisir la stratégie selon le type d'élément # --------------------------------------------------------------- by_text_source = target_spec.get("by_text_source", "") has_window = bool(target_spec.get("window_capture", {}).get("rect")) if by_text_strict and by_text_source in ("ocr", "vlm") and has_window: # Texte visible DANS une fenêtre → grounding VLM sur fenêtre croppée grounding_result = _resolve_by_grounding( screenshot_path=screenshot_path, target_spec=target_spec, screen_width=screen_width, screen_height=screen_height, ) if grounding_result and grounding_result.get("resolved"): logger.info( "Strict resolve GROUNDING : OK (%.4f, %.4f) pour '%s'", grounding_result.get("x_pct", 0), grounding_result.get("y_pct", 0), by_text_strict[:50], ) return grounding_result if not by_text_strict or by_text_source not in ("ocr", "vlm"): # Template matching pour les éléments sans texte (icônes pures) window_capture = target_spec.get("window_capture", {}) window_rect = window_capture.get("rect") from pathlib import Path as _Path _full = _Path(screenshot_path) _win = _full.parent / _full.name.replace("_full.png", "_window.png") tm_path = str(_win) if _win.is_file() and window_rect else screenshot_path tm_screen_w = (window_rect[2] - window_rect[0]) if window_rect and _win.is_file() else screen_width tm_screen_h = (window_rect[3] - window_rect[1]) if window_rect and _win.is_file() else screen_height result = _resolve_by_template_matching( screenshot_path=tm_path, anchor_image_b64=anchor_image_b64, screen_width=tm_screen_w, screen_height=tm_screen_h, confidence_threshold=0.90, ) if result and result.get("score", 0) >= 0.90: x_tm, y_tm = result["x_pct"], result["y_pct"] # Convertir coordonnées fenêtre → écran si nécessaire if window_rect and _win.is_file(): abs_x = window_rect[0] + x_tm * tm_screen_w abs_y = window_rect[1] + y_tm * tm_screen_h result["x_pct"] = round(abs_x / screen_width, 6) result["y_pct"] = round(abs_y / screen_height, 6) logger.info( "Strict resolve TEMPLATE : icon match (score=%.3f)", result.get("score", 0), ) return result # --------------------------------------------------------------- # Étape 1 : VLM Quick Find (fallback, multi-image) # --------------------------------------------------------------- if vlm_description or anchor_image_b64: vlm_result = _vlm_quick_find( screenshot_path=screenshot_path, target_description=vlm_description, anchor_image_b64=anchor_image_b64, ) if vlm_result and vlm_result.get("resolved"): if vlm_result.get("score", 0) >= 0.3: logger.info( "Strict resolve VLM-first : VLM OK (score=%.2f) pour '%s'", vlm_result.get("score", 0), vlm_description[:60] if vlm_description else "(anchor)", ) return vlm_result else: logger.info( "Strict resolve VLM-first : VLM score=%.2f trop bas, passage template", vlm_result.get("score", 0), ) else: logger.info( "Strict resolve VLM-first : VLM échoué pour '%s', passage template matching", vlm_description[:60] if vlm_description else "(anchor)", ) # --------------------------------------------------------------- # Étape 1.5 : SoM + VLM (Set-of-Mark + identification) # SomEngine numérote les éléments, VLM identifie le bon numéro. # Plus fiable que le VLM direct car le VLM n'a qu'à identifier, # pas localiser — et les coordonnées sont pixel-perfect. # --------------------------------------------------------------- som_element = target_spec.get("som_element", {}) if som_element or vlm_description: som_result = _resolve_by_som( screenshot_path=screenshot_path, target_spec=target_spec, screen_width=screen_width, screen_height=screen_height, ) if som_result and som_result.get("resolved"): logger.info( "Strict resolve SoM+VLM : OK (score=%.2f, mark=#%s)", som_result.get("score", 0), som_result.get("matched_element", {}).get("som_id", "?"), ) return som_result else: logger.info("Strict resolve SoM+VLM : échoué, passage template matching") # --------------------------------------------------------------- # Étape 2 : Template matching (fallback pixel) — seuil STRICT 0.90 # --------------------------------------------------------------- result = _resolve_by_template_matching( screenshot_path=screenshot_path, anchor_image_b64=anchor_image_b64, screen_width=screen_width, screen_height=screen_height, confidence_threshold=0.90, ) if result: score = result.get("score", 0) # Score >= 0.95 : match quasi-parfait, pas besoin de valider le contexte if score >= 0.95: logger.info( "Strict resolve VLM-first : template matching fallback OK " "(score=%.3f >= 0.95, contexte skip — match quasi-parfait)", score, ) return result elif _validate_match_context(result, fallback_x_pct, fallback_y_pct, target_spec): logger.info( "Strict resolve VLM-first : template matching fallback OK " "(score=%.3f >= 0.90, context OK)", score, ) return result else: logger.warning( "Strict resolve VLM-first : template score=%.3f MAIS contexte invalide, rejeté", score, ) # --------------------------------------------------------------- # Étape 3 : RIEN ne fonctionne → resolved=False → STOP replay # --------------------------------------------------------------- return { "resolved": False, "method": "strict_vlm_template_failed", "reason": "vlm_and_template_all_failed", "x_pct": fallback_x_pct, "y_pct": fallback_y_pct, } # =================================================================== # MODE CLASSIQUE (VWB et autres) — Comportement existant # =================================================================== # --------------------------------------------------------------- # Stratégie 1 : Template matching par image d'ancre (seuil 0.70) # --------------------------------------------------------------- if anchor_image_b64: result = _resolve_by_template_matching( screenshot_path=screenshot_path, anchor_image_b64=anchor_image_b64, screen_width=screen_width, screen_height=screen_height, confidence_threshold=0.7, ) if result: return result logger.info( "Template matching échoué pour ancre '%s', tentative VLM Quick Find", target_spec.get("anchor_id", "?"), ) # --------------------------------------------------------------- # Stratégie 1.5 : VLM Quick Find (fallback léger après template matching) # --------------------------------------------------------------- by_text = target_spec.get("by_text", "").strip() by_role = target_spec.get("by_role", "").strip() if by_text or by_role: vlm_desc = _build_target_description(target_spec) vlm_result = _vlm_quick_find( screenshot_path=screenshot_path, target_description=vlm_desc, anchor_image_b64=anchor_image_b64, ) if vlm_result: return vlm_result logger.info( "VLM Quick Find échoué pour ancre '%s', fallback coordonnées", target_spec.get("anchor_id", "?"), ) return { "resolved": False, "method": "fallback", "reason": "template_matching_failed", "x_pct": fallback_x_pct, "y_pct": fallback_y_pct, } # --------------------------------------------------------------- # Stratégie 2 : VLM Quick Find (léger, ~5-10s) # --------------------------------------------------------------- by_text = target_spec.get("by_text", "") by_role = target_spec.get("by_role", "") # Si aucun critère sémantique et pas d'ancre, fallback direct if not by_text and not by_role and not anchor_image_b64: return { "resolved": False, "method": "fallback", "reason": "no_target_criteria", "x_pct": fallback_x_pct, "y_pct": fallback_y_pct, } # Tenter le VLM Quick Find AVANT ScreenAnalyzer (beaucoup plus rapide) if by_text or by_role: vlm_desc = _build_target_description(target_spec) vlm_result = _vlm_quick_find( screenshot_path=screenshot_path, target_description=vlm_desc, ) if vlm_result: return vlm_result logger.info( "VLM Quick Find échoué pour '%s', fallback ScreenAnalyzer", vlm_desc, ) # --------------------------------------------------------------- # Stratégie 3 : Matching sémantique via ScreenAnalyzer (~15-20s) # --------------------------------------------------------------- if processor is None: return { "resolved": False, "method": "fallback", "reason": "no_processor", "x_pct": fallback_x_pct, "y_pct": fallback_y_pct, } processor._ensure_initialized() if processor._screen_analyzer is None: return { "resolved": False, "method": "fallback", "reason": "screen_analyzer_unavailable", "x_pct": fallback_x_pct, "y_pct": fallback_y_pct, } # Analyser le screenshot (Niveaux 1-3 : raw, OCR, UI elements) try: screen_state = processor._screen_analyzer.analyze(screenshot_path) except Exception as e: logger.warning(f"Analyse screenshot échouée: {e}") return { "resolved": False, "method": "fallback", "reason": f"analysis_failed: {e}", "x_pct": fallback_x_pct, "y_pct": fallback_y_pct, } ui_elements = screen_state.ui_elements or [] if not ui_elements: logger.info("Aucun élément UI détecté, fallback coordonnées") return { "resolved": False, "method": "fallback", "reason": "no_ui_elements", "x_pct": fallback_x_pct, "y_pct": fallback_y_pct, } # Matching de la cible parmi les éléments détectés candidates = [] for elem in ui_elements: score = 0.0 # Score par texte (label) if by_text and elem.label: text_lower = by_text.lower() label_lower = elem.label.lower() if text_lower in label_lower or label_lower in text_lower: score += 0.6 elif _fuzzy_match(text_lower, label_lower): score += 0.3 # Score par rôle if by_role: role_lower = by_role.lower() if elem.role and role_lower in elem.role.lower(): score += 0.3 if elem.type and role_lower in elem.type.lower(): score += 0.2 if score > 0: candidates.append((elem, score)) if not candidates: logger.info( f"Aucun match visuel pour target(text='{by_text}', role='{by_role}') " f"parmi {len(ui_elements)} éléments" ) return { "resolved": False, "method": "fallback", "reason": "no_match", "x_pct": fallback_x_pct, "y_pct": fallback_y_pct, "ui_elements_count": len(ui_elements), } # Trier par score décroissant et prendre le meilleur candidates.sort(key=lambda c: c[1], reverse=True) best_elem, best_score = candidates[0] # Convertir les coordonnées pixel en proportions cx, cy = best_elem.center x_pct = round(cx / screen_width, 6) if screen_width > 0 else 0.0 y_pct = round(cy / screen_height, 6) if screen_height > 0 else 0.0 logger.info( f"Cible résolue visuellement: '{best_elem.label}' ({best_elem.type}/{best_elem.role}) " f"score={best_score:.2f} → ({x_pct:.4f}, {y_pct:.4f})" ) return { "resolved": True, "method": "visual", "x_pct": x_pct, "y_pct": y_pct, "matched_element": { "label": best_elem.label, "type": best_elem.type, "role": best_elem.role, "center": list(best_elem.center), "confidence": best_elem.label_confidence, }, "score": best_score, "candidates_count": len(candidates), "ui_elements_count": len(ui_elements), } def _fuzzy_match(a: str, b: str, threshold: float = 0.6) -> bool: """Match approximatif par ratio de caractères communs.""" if not a or not b: return False common = sum(1 for c in a if c in b) return (common / max(len(a), len(b))) >= threshold def _fallback_response(request: ResolveTargetRequest, reason: str, detail: str) -> Dict: """Réponse de fallback quand la résolution visuelle échoue.""" return { "resolved": False, "method": "fallback", "reason": reason, "detail": detail, "x_pct": request.fallback_x_pct, "y_pct": request.fallback_y_pct, } # ========================================================================= # Validation qualité de résolution (garde score + garde proximité) # ========================================================================= # # Couche de sécurité appliquée en sortie de la cascade de résolution pour # rejeter les résultats peu fiables. Deux checks : # # 1. Seuil de score minimum par méthode — chaque stratégie a sa propre # fiabilité empirique. Un SoM+VLM à 0.59 n'a pas le même sens qu'un # hybrid_text_direct à 0.59. # # 2. Garde de proximité — si les coordonnées enregistrées lors de la # démonstration sont disponibles (via fallback_x/y_pct), on rejette # tout résultat dont les coordonnées résolues sont aberrantes par # rapport aux coordonnées attendues. Un écart > 20% de l'écran # signale un faux positif (ex: SoM qui a trouvé le même texte # "Enregistrer" à un endroit totalement différent). # # Insertion : appelée une fois, juste avant de retourner le résultat au # client dans le handler /resolve_target. N'altère pas la cascade existante. # Seuils minimum de score par méthode. Les méthodes non listées héritent # du seuil par défaut (0.5). Une méthode peut être matchée par préfixe # (ex: "memory_v4_ocr" match "memory_"). _RESOLUTION_MIN_SCORES: Dict[str, float] = { # Mémoire Phase 1 : si une entrée a été cristallisée (≥2 succès), on a # une confiance quasi absolue — pas de seuil strict. "memory_": 0.0, # hybrid_text_direct est un matching OCR direct : fiable quand il trouve. "hybrid_text_direct": 0.80, # SoM (Set-of-Mark) : peut retourner un faux positif si l'élément # cherché existe à plusieurs endroits de l'écran. "som_anchor_match": 0.75, "som_text_match": 0.75, "som_vlm": 0.70, # Template matching : très strict sur la ressemblance pixel. "template_matching": 0.85, "v4_template": 0.85, # OCR seul (V4 ExecutionPlan) : fiable avec score moyen. "v4_ocr": 0.70, # VLM : souvent moins précis sur les coordonnées, seuil plus souple. "vlm_quick_find": 0.60, "vlm": 0.60, "v4_vlm": 0.60, "grounding": 0.60, "v4_grounding": 0.60, # UIA local : déterministe, confiance élevée quand succès. "v4_uia_local": 0.90, "uia": 0.90, } # Écart maximum toléré entre coords résolues et coords enregistrées # (en fraction d'écran, dans chaque axe). Au-delà, on considère que la # résolution a trouvé un faux positif ailleurs sur l'écran. _RESOLUTION_MAX_DRIFT: float = 0.20 def _validate_resolution_quality( result: Optional[Dict[str, Any]], fallback_x_pct: float, fallback_y_pct: float, ) -> Optional[Dict[str, Any]]: """Valide un résultat de résolution et le rejette s'il est peu fiable. Deux checks appliqués en sortie de cascade : - Score minimum par méthode (voir _RESOLUTION_MIN_SCORES) - Drift maximum par rapport aux coordonnées enregistrées (_RESOLUTION_MAX_DRIFT, activé uniquement si fallback_x/y_pct sont significatifs, c'est-à-dire différents du placeholder 0.5/0.5 et non nuls). Si un check échoue, retourne un nouveau dict `resolved=False` avec une raison explicite. Sinon retourne le result inchangé. Cette fonction est le **seul point d'insertion** des gardes qualité : elle n'est PAS appelée par les méthodes internes de la cascade, mais uniquement depuis le handler HTTP `/resolve_target` après que la cascade a produit son meilleur candidat. """ if not result or not isinstance(result, dict): return result if not result.get("resolved"): return result method = str(result.get("method", "") or "") try: score = float(result.get("score", 0) or 0) except (TypeError, ValueError): score = 0.0 try: resolved_x = float(result.get("x_pct", 0) or 0) resolved_y = float(result.get("y_pct", 0) or 0) except (TypeError, ValueError): return result # coords non-numériques : on ne peut rien valider # --- Check 1 : seuil de score par méthode --- # Trouver le seuil qui s'applique (match exact ou préfixe) min_score: Optional[float] = None if method in _RESOLUTION_MIN_SCORES: min_score = _RESOLUTION_MIN_SCORES[method] else: for prefix, threshold in _RESOLUTION_MIN_SCORES.items(): if prefix.endswith("_") and method.startswith(prefix): min_score = threshold break if min_score is not None and score < min_score: logger.warning( "[REPLAY] Resolution REJETÉE (score trop bas) : method=%s score=%.3f < %.2f", method, score, min_score, ) return { "resolved": False, "method": f"rejected_low_score_{method}", "reason": f"score_{score:.3f}_below_threshold_{min_score:.2f}", "original_method": method, "original_score": score, "x_pct": fallback_x_pct, "y_pct": fallback_y_pct, } # --- Check 2 : garde de proximité --- # On n'applique la garde que si les coordonnées enregistrées ont un # sens (pas des placeholders 0.5/0.5 des plans V4 ni des 0.0/0.0). _has_recorded_coords = ( fallback_x_pct > 0.001 and fallback_y_pct > 0.001 and not (abs(fallback_x_pct - 0.5) < 0.001 and abs(fallback_y_pct - 0.5) < 0.001) ) if _has_recorded_coords: dx = abs(resolved_x - fallback_x_pct) dy = abs(resolved_y - fallback_y_pct) if dx > _RESOLUTION_MAX_DRIFT or dy > _RESOLUTION_MAX_DRIFT: # Exception : si le template matching trouve l'image avec une # similarité quasi parfaite, on fait confiance à la position # visuelle peu importe le drift. Une image retrouvée à >= 0.95 # de score est SUR l'écran à l'endroit indiqué — le drift par # rapport à l'enregistrement ne reflète qu'un changement de # layout (scroll, redimensionnement, F11, devtools), pas une # erreur de résolution. _HIGH_CONFIDENCE = 0.95 if score >= _HIGH_CONFIDENCE and method.startswith("template_matching"): logger.info( "[REPLAY] Drift (%.3f, %.3f) > %.2f IGNORÉ : score=%.3f >= %.2f " "sur %s — résultat visuel fiable, on l'utilise", dx, dy, _RESOLUTION_MAX_DRIFT, score, _HIGH_CONFIDENCE, method, ) return result logger.warning( "[REPLAY] Drift trop grand (%.3f, %.3f) > %.2f — fallback coords enregistrées (%.3f, %.3f)", dx, dy, _RESOLUTION_MAX_DRIFT, fallback_x_pct, fallback_y_pct, ) # Fallback : coordonnées enregistrées lors de la capture (écran identique = safe) return { "resolved": True, "method": "fallback_recorded_coords", "reason": f"drift_dx{dx:.3f}_dy{dy:.3f}_using_recorded", "original_method": method, "original_score": score, "x_pct": fallback_x_pct, "y_pct": fallback_y_pct, } # Validation OK — on retourne le result inchangé return result # ========================================================================= # Observer — Pré-analyse écran avant résolution # ========================================================================= def _pre_analyze_screen_sync( screenshot_b64: str, expected_state: str, window_title: str, screen_width: int, screen_height: int, ) -> Dict[str, Any]: """Pré-analyse synchrone de l'écran via VLM. Utilise gemma4 (Docker port 11435) pour détecter : 1. Popups/dialogues modaux (avec coordonnées du bouton à cliquer) 2. États incohérents avec l'attendu Rapide (~2-5s) car gemma4 est léger et en mode texte+image. """ import requests as _requests gemma4_port = os.environ.get("GEMMA4_PORT", "11435") gemma4_url = f"http://localhost:{gemma4_port}/api/chat" # Charger le contexte métier pour l'Observer from .domain_context import get_domain_context domain = get_domain_context(os.environ.get("RPA_DOMAIN", "generic")) # Prompt concis pour détection popup prompt = ( "Regarde cette capture d'écran.\n" "Y a-t-il une popup, boîte de dialogue, message d'erreur, ou fenêtre modale visible ?\n\n" "Réponds EXACTEMENT dans ce format :\n" "ÉTAT: OK ou POPUP ou INATTENDU\n" "BOUTON: texte du bouton à cliquer (si POPUP, sinon 'aucun')\n" "DÉTAIL: description courte (1 ligne)" ) # Messages avec contexte métier messages = [] if domain.system_prompt: messages.append({"role": "system", "content": domain.system_prompt}) messages.append({"role": "user", "content": prompt, "images": [screenshot_b64]}) try: t_start = time.time() resp = _requests.post( gemma4_url, json={ "model": "gemma4:e4b", "messages": messages, "stream": False, "think": True, "options": {"temperature": 0.1, "num_predict": 800}, }, timeout=30, ) elapsed_ms = (time.time() - t_start) * 1000 if not resp.ok: logger.warning(f"Observer VLM HTTP {resp.status_code}") return {"screen_state": "ok", "detail": f"VLM HTTP {resp.status_code}"} content = resp.json().get("message", {}).get("content", "").strip() logger.info(f"Observer VLM ({elapsed_ms:.0f}ms) : {content[:100]}") # Parser la réponse state = "ok" button = "" detail = content for line in content.split("\n"): line_clean = line.strip() upper = line_clean.upper() if upper.startswith("ÉTAT:") or upper.startswith("ETAT:"): val = upper.split(":", 1)[1].strip() if "POPUP" in val: state = "popup" elif "INATTENDU" in val or "UNEXPECTED" in val: state = "unexpected" else: state = "ok" elif upper.startswith("BOUTON:"): button = line_clean.split(":", 1)[1].strip().strip("'\"") if button.lower() in ("aucun", "none", "n/a", ""): button = "" elif upper.startswith("DÉTAIL:") or upper.startswith("DETAIL:"): detail = line_clean.split(":", 1)[1].strip() if state == "ok": return {"screen_state": "ok"} result = { "screen_state": state, "detail": detail, "elapsed_ms": round(elapsed_ms, 1), } # Si popup détectée avec un texte de bouton, essayer de le localiser if state == "popup" and button: result["popup_label"] = button # Localiser le bouton par grounding VLM (qwen2.5vl) coords = _locate_popup_button(screenshot_b64, button, screen_width, screen_height) if coords: result["popup_coords"] = coords return result except _requests.Timeout: logger.debug("Observer VLM timeout (15s)") return {"screen_state": "ok", "detail": "VLM timeout"} except Exception as e: logger.debug(f"Observer VLM erreur : {e}") return {"screen_state": "ok", "detail": str(e)} def _locate_popup_button( screenshot_b64: str, button_text: str, screen_width: int, screen_height: int, ) -> Optional[Dict[str, float]]: """Localiser un bouton de popup par grounding VLM (qwen2.5vl). Utilise le format bbox_2d natif de qwen2.5vl pour trouver la position exacte du bouton sur le screenshot. """ import requests as _requests ollama_url = "http://localhost:11434/api/chat" prompt = f"Detect the button with text '{button_text}' with a bounding box." try: resp = _requests.post( ollama_url, json={ "model": "qwen2.5vl:7b", "messages": [{"role": "user", "content": prompt, "images": [screenshot_b64]}], "stream": False, "options": {"temperature": 0.1, "num_predict": 50}, }, timeout=15, ) if not resp.ok: return None content = resp.json().get("message", {}).get("content", "") # Parser bbox_2d — qwen2.5vl retourne des coordonnées en pixels # relatifs à l'image envoyée, PAS sur une grille 1000x1000. # Format JSON : [{"bbox_2d": [x1, y1, x2, y2], "label": "..."}] bbox_match = re.search( r'"bbox_2d"\s*:\s*\[\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\]', content, ) if bbox_match: x1, y1, x2, y2 = [int(bbox_match.group(i)) for i in range(1, 5)] # Normaliser par les dimensions de l'écran (pixels → 0-1) cx = (x1 + x2) / 2 / screen_width cy = (y1 + y2) / 2 / screen_height if 0.0 <= cx <= 1.0 and 0.0 <= cy <= 1.0: logger.info(f"Observer : bouton '{button_text}' localisé à ({cx:.3f}, {cy:.3f})") return {"x_pct": cx, "y_pct": cy} except Exception as e: logger.debug(f"Observer grounding bouton erreur : {e}") return None