""" core/grounding/template_matcher.py — Template matching centralisé Fournit une classe TemplateMatcher qui localise une ancre visuelle (image template) dans un screenshot via cv2.matchTemplate. Supporte single-scale et multi-scale. Remplace les implémentations dupliquées dans : - core/execution/observe_reason_act.py (~1348-1375) - visual_workflow_builder/backend/api_v3/execute.py (~930-963) - visual_workflow_builder/backend/catalog_routes_v2_vlm.py (~339-381) - visual_workflow_builder/backend/services/intelligent_executor.py (~131-210) - core/detection/omniparser_adapter.py (~330) Utilisation : from core.grounding import TemplateMatcher, MatchResult matcher = TemplateMatcher(threshold=0.75) result = matcher.match_screen(anchor_b64="...") if result: print(f"Trouvé à ({result.x}, {result.y}) score={result.score:.3f}") """ from __future__ import annotations import base64 import io import logging import time from dataclasses import dataclass from typing import List, Optional, Tuple logger = logging.getLogger(__name__) # Imports optionnels — le module se charge même sans cv2/PIL/mss try: import cv2 _CV2 = True except ImportError: _CV2 = False try: import numpy as np _NP = True except ImportError: _NP = False try: from PIL import Image _PIL = True except ImportError: _PIL = False try: import mss as mss_lib _MSS = True except ImportError: _MSS = False # --------------------------------------------------------------------------- # Résultat d'un match # --------------------------------------------------------------------------- @dataclass class MatchResult: """Résultat d'un template matching.""" x: int y: int score: float method: str # 'template' | 'template_multiscale' time_ms: float scale: float = 1.0 # Échelle à laquelle le meilleur match a été trouvé # --------------------------------------------------------------------------- # TemplateMatcher # --------------------------------------------------------------------------- class TemplateMatcher: """Localise une ancre visuelle dans un screenshot via template matching. Paramètres : threshold : score minimum pour accepter un match (défaut 0.75) multiscale : active le matching multi-échelle (défaut False) scales : liste d'échelles à tester en mode multi-scale method : méthode cv2 (défaut cv2.TM_CCOEFF_NORMED) grayscale : convertir en niveaux de gris avant matching (défaut False) """ # Échelles par défaut pour le mode multi-scale, ordonnées par # probabilité décroissante (1.0 en premier = rapide si ça matche) DEFAULT_SCALES: List[float] = [1.0, 0.95, 1.05, 0.9, 1.1, 0.85, 1.15, 0.8, 1.2] def __init__( self, threshold: float = 0.75, multiscale: bool = False, scales: Optional[List[float]] = None, grayscale: bool = False, ): self.threshold = threshold self.multiscale = multiscale self.scales = scales or self.DEFAULT_SCALES self.grayscale = grayscale # cv2.TM_CCOEFF_NORMED est la méthode utilisée partout dans le projet self._cv2_method = cv2.TM_CCOEFF_NORMED if _CV2 else None # ------------------------------------------------------------------ # API publique # ------------------------------------------------------------------ def match_screen( self, anchor_b64: Optional[str] = None, anchor_pil: Optional["Image.Image"] = None, screen_pil: Optional["Image.Image"] = None, ) -> Optional[MatchResult]: """Cherche l'ancre dans le screenshot courant (ou fourni). L'ancre peut être passée en base64 ou en PIL Image. Le screenshot est capturé via mss si non fourni. Retourne un MatchResult ou None si aucun match >= seuil. """ if not (_CV2 and _NP and _PIL): logger.debug("[TemplateMatcher] cv2/numpy/PIL non disponible") return None # --- Préparer l'ancre --- anchor_img = self._decode_anchor(anchor_b64, anchor_pil) if anchor_img is None: return None # --- Préparer le screenshot --- if screen_pil is None: screen_pil = self._capture_screen() if screen_pil is None: return None # --- Convertir en arrays cv2 --- screen_cv = cv2.cvtColor(np.array(screen_pil), cv2.COLOR_RGB2BGR) anchor_cv = cv2.cvtColor(np.array(anchor_img), cv2.COLOR_RGB2BGR) # --- Matching --- if self.multiscale: return self._match_multiscale(screen_cv, anchor_cv) else: return self._match_single(screen_cv, anchor_cv) def match_in_region( self, region_cv: "np.ndarray", anchor_cv: "np.ndarray", threshold: Optional[float] = None, ) -> Optional[MatchResult]: """Match dans une région déjà découpée (arrays BGR). Utilisé par les pipelines qui font leur propre capture/découpe. """ if not (_CV2 and _NP): return None thr = threshold if threshold is not None else self.threshold if self.multiscale: return self._match_multiscale(region_cv, anchor_cv, threshold_override=thr) else: return self._match_single(region_cv, anchor_cv, threshold_override=thr) def match_screen_diagnostic( self, anchor_b64: Optional[str] = None, anchor_pil: Optional["Image.Image"] = None, screen_pil: Optional["Image.Image"] = None, ) -> str: """Retourne un diagnostic textuel (score + position) même sans match.""" if not (_CV2 and _NP and _PIL): return "cv2/numpy/PIL non dispo" anchor_img = self._decode_anchor(anchor_b64, anchor_pil) if anchor_img is None: return "ancre non décodable" if screen_pil is None: screen_pil = self._capture_screen() if screen_pil is None: return "capture écran échouée" screen_cv = cv2.cvtColor(np.array(screen_pil), cv2.COLOR_RGB2BGR) anchor_cv = cv2.cvtColor(np.array(anchor_img), cv2.COLOR_RGB2BGR) if anchor_cv.shape[0] >= screen_cv.shape[0] or anchor_cv.shape[1] >= screen_cv.shape[1]: return f"ancre {anchor_cv.shape[:2]} >= écran {screen_cv.shape[:2]}" s_img, a_img = self._maybe_grayscale(screen_cv, anchor_cv) result_tm = cv2.matchTemplate(s_img, a_img, self._cv2_method) _, max_val, _, max_loc = cv2.minMaxLoc(result_tm) return f"{max_val:.3f} pos={max_loc}" # ------------------------------------------------------------------ # Méthodes internes # ------------------------------------------------------------------ def _match_single( self, screen_cv: "np.ndarray", anchor_cv: "np.ndarray", threshold_override: Optional[float] = None, ) -> Optional[MatchResult]: """Template matching single-scale.""" threshold = threshold_override if threshold_override is not None else self.threshold if anchor_cv.shape[0] >= screen_cv.shape[0] or anchor_cv.shape[1] >= screen_cv.shape[1]: logger.debug("[TemplateMatcher] Ancre plus grande que le screen") return None s_img, a_img = self._maybe_grayscale(screen_cv, anchor_cv) t0 = time.time() result_tm = cv2.matchTemplate(s_img, a_img, self._cv2_method) _, max_val, _, max_loc = cv2.minMaxLoc(result_tm) elapsed_ms = (time.time() - t0) * 1000 logger.debug( "[TemplateMatcher] score=%.3f pos=%s (%.0fms)", max_val, max_loc, elapsed_ms, ) if max_val >= threshold: cx = max_loc[0] + anchor_cv.shape[1] // 2 cy = max_loc[1] + anchor_cv.shape[0] // 2 return MatchResult( x=cx, y=cy, score=float(max_val), method='template', time_ms=elapsed_ms, scale=1.0, ) return None def _match_multiscale( self, screen_cv: "np.ndarray", anchor_cv: "np.ndarray", threshold_override: Optional[float] = None, ) -> Optional[MatchResult]: """Template matching multi-scale.""" threshold = threshold_override if threshold_override is not None else self.threshold best_score = -1.0 best_loc = None best_scale = 1.0 best_anchor_shape = anchor_cv.shape t0 = time.time() for scale in self.scales: if scale == 1.0: scaled = anchor_cv else: new_w = int(anchor_cv.shape[1] * scale) new_h = int(anchor_cv.shape[0] * scale) if new_w < 8 or new_h < 8: continue if new_h >= screen_cv.shape[0] or new_w >= screen_cv.shape[1]: continue scaled = cv2.resize(anchor_cv, (new_w, new_h), interpolation=cv2.INTER_AREA) if scaled.shape[0] >= screen_cv.shape[0] or scaled.shape[1] >= screen_cv.shape[1]: continue s_img, a_img = self._maybe_grayscale(screen_cv, scaled) result_tm = cv2.matchTemplate(s_img, a_img, self._cv2_method) _, max_val, _, max_loc = cv2.minMaxLoc(result_tm) if max_val > best_score: best_score = max_val best_loc = max_loc best_scale = scale best_anchor_shape = scaled.shape elapsed_ms = (time.time() - t0) * 1000 logger.debug( "[TemplateMatcher/multiscale] best_score=%.3f scale=%.2f (%.0fms)", best_score, best_scale, elapsed_ms, ) if best_score >= threshold and best_loc is not None: cx = best_loc[0] + best_anchor_shape[1] // 2 cy = best_loc[1] + best_anchor_shape[0] // 2 return MatchResult( x=cx, y=cy, score=float(best_score), method='template_multiscale', time_ms=elapsed_ms, scale=best_scale, ) return None def _maybe_grayscale( self, screen: "np.ndarray", anchor: "np.ndarray", ) -> Tuple["np.ndarray", "np.ndarray"]: """Convertit en niveaux de gris si self.grayscale est True.""" if not self.grayscale: return screen, anchor s = cv2.cvtColor(screen, cv2.COLOR_BGR2GRAY) if len(screen.shape) == 3 else screen a = cv2.cvtColor(anchor, cv2.COLOR_BGR2GRAY) if len(anchor.shape) == 3 else anchor return s, a @staticmethod def _decode_anchor( anchor_b64: Optional[str], anchor_pil: Optional["Image.Image"], ) -> Optional["Image.Image"]: """Décode l'ancre depuis base64 ou retourne le PIL directement.""" if anchor_pil is not None: return anchor_pil if anchor_b64 is None: logger.debug("[TemplateMatcher] Ni anchor_b64 ni anchor_pil fourni") return None try: raw = anchor_b64.split(',')[1] if ',' in anchor_b64 else anchor_b64 data = base64.b64decode(raw) return Image.open(io.BytesIO(data)) except Exception as e: logger.debug("[TemplateMatcher] Erreur décodage ancre: %s", e) return None @staticmethod def _capture_screen() -> Optional["Image.Image"]: """Capture l'écran complet via mss (moniteur 0 = tous les écrans).""" if not _MSS: logger.debug("[TemplateMatcher] mss non disponible") return None try: with mss_lib.mss() as sct: mon = sct.monitors[0] grab = sct.grab(mon) return Image.frombytes('RGB', grab.size, grab.bgra, 'raw', 'BGRX') except Exception as e: logger.debug("[TemplateMatcher] Erreur capture écran: %s", e) return None