Files
rpa_vision_v3/core/grounding/template_matcher.py
Dom 9da589c8c2 feat(grounding): pipeline centralisé + serveur UI-TARS transformers + nettoyage code mort
Architecture grounding complète :
- core/grounding/server.py : serveur FastAPI (port 8200) avec UI-TARS-1.5-7B en 4-bit NF4
  Process séparé avec son propre contexte CUDA (résout le crash Flask/CUDA)
- core/grounding/pipeline.py : orchestrateur cascade template→OCR→UI-TARS→static
- core/grounding/template_matcher.py : TemplateMatcher centralisé (remplace 5 copies)
- core/grounding/ui_tars_grounder.py : client HTTP vers le serveur de grounding
- core/grounding/target.py : GroundingTarget + GroundingResult

ORA modifié :
- _act_click() : capture unique de l'écran envoyée au serveur de grounding
- Pre-check VLM skippé pour ui_tars (redondant, et Ollama n'a plus de VRAM)
- verify_level='none' par défaut (vérification titre OCR prévue en Phase 2)
- Détection réponses négatives UI-TARS ("I don't see it" → fallback OCR)

Nettoyage :
- 9 fichiers morts archivés dans _archive/ (~6300 lignes supprimées)
- 21 tests ajoutés pour TemplateMatcher

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-25 17:48:18 +02:00

351 lines
12 KiB
Python

"""
core/grounding/template_matcher.py — Template matching centralisé
Fournit une classe TemplateMatcher qui localise une ancre visuelle (image template)
dans un screenshot via cv2.matchTemplate. Supporte single-scale et multi-scale.
Remplace les implémentations dupliquées dans :
- core/execution/observe_reason_act.py (~1348-1375)
- visual_workflow_builder/backend/api_v3/execute.py (~930-963)
- visual_workflow_builder/backend/catalog_routes_v2_vlm.py (~339-381)
- visual_workflow_builder/backend/services/intelligent_executor.py (~131-210)
- core/detection/omniparser_adapter.py (~330)
Utilisation :
    from core.grounding import TemplateMatcher, MatchResult
    matcher = TemplateMatcher(threshold=0.75)
    result = matcher.match_screen(anchor_b64="...")
    if result:
        print(f"Trouvé à ({result.x}, {result.y}) score={result.score:.3f}")
"""
from __future__ import annotations
import base64
import io
import logging
import time
from dataclasses import dataclass
from typing import List, Optional, Tuple
logger = logging.getLogger(__name__)
# Imports optionnels — le module se charge même sans cv2/PIL/mss
try:
import cv2
_CV2 = True
except ImportError:
_CV2 = False
try:
import numpy as np
_NP = True
except ImportError:
_NP = False
try:
from PIL import Image
_PIL = True
except ImportError:
_PIL = False
try:
import mss as mss_lib
_MSS = True
except ImportError:
_MSS = False
# ---------------------------------------------------------------------------
# Résultat d'un match
# ---------------------------------------------------------------------------
@dataclass
class MatchResult:
    """Outcome of a successful template match.

    ``x`` / ``y`` are pixel coordinates inside the image that was searched;
    the matcher fills them with the centre of the matched area.
    """

    # Horizontal pixel coordinate of the match.
    x: int
    # Vertical pixel coordinate of the match.
    y: int
    # Matching score reported by the matcher for this location.
    score: float
    # Strategy that produced the match: 'template' | 'template_multiscale'.
    method: str
    # Time spent matching, in milliseconds.
    time_ms: float
    # Anchor scale at which the best match was found (1.0 = native size).
    scale: float = 1.0
# ---------------------------------------------------------------------------
# TemplateMatcher
# ---------------------------------------------------------------------------
class TemplateMatcher:
    """Locate a visual anchor (template image) inside a screenshot.

    Matching is done with ``cv2.matchTemplate`` using
    ``cv2.TM_CCOEFF_NORMED`` (the method is fixed, it is not a constructor
    parameter), either at the anchor's native size or across several scales.

    Parameters:
        threshold: minimum score for a match to be accepted (default 0.75).
        multiscale: try several anchor scales instead of one (default False).
        scales: scale factors tried in multi-scale mode; ``DEFAULT_SCALES``
            is used when omitted or empty.
        grayscale: convert both images to grayscale before matching
            (default False).
    """

    # Default scale factors for multi-scale mode, ordered by decreasing
    # likelihood (native size first).
    DEFAULT_SCALES: List[float] = [1.0, 0.95, 1.05, 0.9, 1.1, 0.85, 1.15, 0.8, 1.2]

    def __init__(
        self,
        threshold: float = 0.75,
        multiscale: bool = False,
        scales: Optional[List[float]] = None,
        grayscale: bool = False,
    ):
        self.threshold = threshold
        self.multiscale = multiscale
        # Copy the list: the instance must not alias the caller's list nor
        # the class-level default, otherwise mutating ``self.scales`` would
        # silently change every other matcher.
        self.scales = list(scales) if scales else list(self.DEFAULT_SCALES)
        self.grayscale = grayscale
        # None when cv2 is missing; every public method bails out before
        # using it in that case.
        self._cv2_method = cv2.TM_CCOEFF_NORMED if _CV2 else None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def match_screen(
        self,
        anchor_b64: Optional[str] = None,
        anchor_pil: Optional["Image.Image"] = None,
        screen_pil: Optional["Image.Image"] = None,
    ) -> Optional["MatchResult"]:
        """Search for the anchor in the given (or freshly captured) screenshot.

        The anchor may be supplied as a base64 string or as a PIL image
        (``anchor_pil`` wins when both are given). When ``screen_pil`` is
        None the screen is captured through mss.

        Returns:
            A MatchResult whose ``x``/``y`` are the centre of the matched
            area in screenshot pixel coordinates, or None when a dependency
            is missing, the anchor cannot be decoded, the capture fails, or
            no score reaches the threshold.
        """
        if not (_CV2 and _NP and _PIL):
            logger.debug("[TemplateMatcher] cv2/numpy/PIL non disponible")
            return None

        anchor_img = self._decode_anchor(anchor_b64, anchor_pil)
        if anchor_img is None:
            return None

        if screen_pil is None:
            screen_pil = self._capture_screen()
            if screen_pil is None:
                return None

        screen_cv = self._pil_to_bgr(screen_pil)
        anchor_cv = self._pil_to_bgr(anchor_img)

        if self.multiscale:
            return self._match_multiscale(screen_cv, anchor_cv)
        return self._match_single(screen_cv, anchor_cv)

    def match_in_region(
        self,
        region_cv: "np.ndarray",
        anchor_cv: "np.ndarray",
        threshold: Optional[float] = None,
    ) -> Optional["MatchResult"]:
        """Match inside an already-cropped region (arrays, not PIL images).

        Intended for pipelines that do their own capture/cropping. The
        arrays are handed to the matcher as-is, so they must already share
        the same channel layout. ``threshold`` overrides the instance
        threshold for this call only. Coordinates in the result are relative
        to ``region_cv``.
        """
        if not (_CV2 and _NP):
            return None
        thr = threshold if threshold is not None else self.threshold
        if self.multiscale:
            return self._match_multiscale(region_cv, anchor_cv, threshold_override=thr)
        return self._match_single(region_cv, anchor_cv, threshold_override=thr)

    def match_screen_diagnostic(
        self,
        anchor_b64: Optional[str] = None,
        anchor_pil: Optional["Image.Image"] = None,
        screen_pil: Optional["Image.Image"] = None,
    ) -> str:
        """Return a textual diagnostic (best score + position), match or not.

        Always runs a single-scale match regardless of ``self.multiscale``
        and ignores the threshold. The reported position is the top-left
        corner of the best location, not its centre.
        """
        if not (_CV2 and _NP and _PIL):
            return "cv2/numpy/PIL non dispo"

        anchor_img = self._decode_anchor(anchor_b64, anchor_pil)
        if anchor_img is None:
            return "ancre non décodable"

        if screen_pil is None:
            screen_pil = self._capture_screen()
            if screen_pil is None:
                return "capture écran échouée"

        screen_cv = self._pil_to_bgr(screen_pil)
        anchor_cv = self._pil_to_bgr(anchor_img)
        if not self._anchor_fits(screen_cv.shape, anchor_cv.shape):
            return f"ancre {anchor_cv.shape[:2]} >= écran {screen_cv.shape[:2]}"

        s_img, a_img = self._maybe_grayscale(screen_cv, anchor_cv)
        result_tm = cv2.matchTemplate(s_img, a_img, self._cv2_method)
        _, max_val, _, max_loc = cv2.minMaxLoc(result_tm)
        return f"{max_val:.3f} pos={max_loc}"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _anchor_fits(screen_shape, anchor_shape) -> bool:
        """Tell whether the anchor is strictly smaller than the screen.

        Both arguments are ``(height, width, ...)`` shape tuples; only the
        first two entries are read. An anchor equal to the screen in either
        dimension is rejected.
        """
        return anchor_shape[0] < screen_shape[0] and anchor_shape[1] < screen_shape[1]

    @staticmethod
    def _ensure_rgb(img: "Image.Image") -> "Image.Image":
        """Return ``img`` in RGB mode, converting only when necessary.

        Grayscale ('L') or palette ('P') images would otherwise turn into
        2-D arrays that ``cv2.cvtColor(..., COLOR_RGB2BGR)`` rejects.
        """
        return img if img.mode == 'RGB' else img.convert('RGB')

    @classmethod
    def _pil_to_bgr(cls, img: "Image.Image") -> "np.ndarray":
        """Convert a PIL image of any mode to a 3-channel BGR array."""
        return cv2.cvtColor(np.array(cls._ensure_rgb(img)), cv2.COLOR_RGB2BGR)

    def _to_match_space(self, img: "np.ndarray") -> "np.ndarray":
        """Return ``img`` in grayscale when enabled and it has 3 dimensions."""
        if self.grayscale and len(img.shape) == 3:
            return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        return img

    def _match_single(
        self,
        screen_cv: "np.ndarray",
        anchor_cv: "np.ndarray",
        threshold_override: Optional[float] = None,
    ) -> Optional["MatchResult"]:
        """Single-scale template matching.

        Returns a MatchResult centred on the best location when its score
        reaches the threshold, otherwise None (also when the anchor is not
        strictly smaller than the screen).
        """
        threshold = threshold_override if threshold_override is not None else self.threshold
        if not self._anchor_fits(screen_cv.shape, anchor_cv.shape):
            logger.debug("[TemplateMatcher] Ancre plus grande que le screen")
            return None

        s_img, a_img = self._maybe_grayscale(screen_cv, anchor_cv)
        # perf_counter is monotonic, unlike time.time(), so the measured
        # duration cannot be skewed by wall-clock adjustments.
        t0 = time.perf_counter()
        result_tm = cv2.matchTemplate(s_img, a_img, self._cv2_method)
        _, max_val, _, max_loc = cv2.minMaxLoc(result_tm)
        elapsed_ms = (time.perf_counter() - t0) * 1000
        logger.debug(
            "[TemplateMatcher] score=%.3f pos=%s (%.0fms)",
            max_val, max_loc, elapsed_ms,
        )
        if max_val < threshold:
            return None

        # max_loc is the top-left corner; report the centre of the anchor.
        return MatchResult(
            x=max_loc[0] + anchor_cv.shape[1] // 2,
            y=max_loc[1] + anchor_cv.shape[0] // 2,
            score=float(max_val),
            method='template',
            time_ms=elapsed_ms,
            scale=1.0,
        )

    def _match_multiscale(
        self,
        screen_cv: "np.ndarray",
        anchor_cv: "np.ndarray",
        threshold_override: Optional[float] = None,
    ) -> Optional["MatchResult"]:
        """Multi-scale template matching.

        Every scale in ``self.scales`` is tried (no early exit) and the
        highest-scoring one is kept. Scales producing an anchor smaller than
        8 px on a side, or not strictly smaller than the screen, are skipped.
        """
        threshold = threshold_override if threshold_override is not None else self.threshold
        best_score = -1.0
        best_loc = None
        best_scale = 1.0
        best_anchor_shape = anchor_cv.shape

        t0 = time.perf_counter()
        # The screen does not change between scales: convert it once rather
        # than once per iteration.
        screen_img = self._to_match_space(screen_cv)
        for scale in self.scales:
            if scale == 1.0:
                scaled = anchor_cv
            else:
                new_w = int(anchor_cv.shape[1] * scale)
                new_h = int(anchor_cv.shape[0] * scale)
                if new_w < 8 or new_h < 8:
                    continue
                if not self._anchor_fits(screen_cv.shape, (new_h, new_w)):
                    continue
                # INTER_AREA is the recommended filter for shrinking only;
                # for enlarging OpenCV recommends INTER_LINEAR/INTER_CUBIC.
                interpolation = cv2.INTER_AREA if scale < 1.0 else cv2.INTER_LINEAR
                scaled = cv2.resize(anchor_cv, (new_w, new_h), interpolation=interpolation)
            if not self._anchor_fits(screen_cv.shape, scaled.shape):
                continue

            result_tm = cv2.matchTemplate(
                screen_img, self._to_match_space(scaled), self._cv2_method
            )
            _, max_val, _, max_loc = cv2.minMaxLoc(result_tm)
            if max_val > best_score:
                best_score = max_val
                best_loc = max_loc
                best_scale = scale
                best_anchor_shape = scaled.shape
        elapsed_ms = (time.perf_counter() - t0) * 1000

        logger.debug(
            "[TemplateMatcher/multiscale] best_score=%.3f scale=%.2f (%.0fms)",
            best_score, best_scale, elapsed_ms,
        )
        if best_loc is None or best_score < threshold:
            return None

        # Centre of the anchor at the scale that won.
        return MatchResult(
            x=best_loc[0] + best_anchor_shape[1] // 2,
            y=best_loc[1] + best_anchor_shape[0] // 2,
            score=float(best_score),
            method='template_multiscale',
            time_ms=elapsed_ms,
            scale=best_scale,
        )

    def _maybe_grayscale(
        self,
        screen: "np.ndarray",
        anchor: "np.ndarray",
    ) -> Tuple["np.ndarray", "np.ndarray"]:
        """Convert both images to grayscale when ``self.grayscale`` is True.

        Arrays that are not 3-dimensional are returned unchanged.
        """
        if not self.grayscale:
            return screen, anchor
        return self._to_match_space(screen), self._to_match_space(anchor)

    @staticmethod
    def _decode_anchor(
        anchor_b64: Optional[str],
        anchor_pil: Optional["Image.Image"],
    ) -> Optional["Image.Image"]:
        """Return the anchor as a PIL image, decoding base64 when needed.

        ``anchor_pil`` is returned untouched when provided. Otherwise
        ``anchor_b64`` is decoded; a ``data:...;base64,`` style prefix (text
        before a comma) is dropped. Returns None when neither input is given
        or when decoding fails.
        """
        if anchor_pil is not None:
            return anchor_pil
        if anchor_b64 is None:
            logger.debug("[TemplateMatcher] Ni anchor_b64 ni anchor_pil fourni")
            return None
        try:
            raw = anchor_b64.split(',')[1] if ',' in anchor_b64 else anchor_b64
            img = Image.open(io.BytesIO(base64.b64decode(raw)))
            # Image.open is lazy: force decoding here so that truncated or
            # corrupt data is reported by this handler instead of blowing up
            # later in np.array().
            img.load()
            return img
        except Exception as e:
            # Deliberately broad: best-effort decoding of external data.
            logger.debug("[TemplateMatcher] Erreur décodage ancre: %s", e)
            return None

    @staticmethod
    def _capture_screen() -> Optional["Image.Image"]:
        """Capture the screen through mss and return it as an RGB PIL image.

        Uses ``sct.monitors[0]``, which mss documents as the union of all
        monitors. Returns None when mss is unavailable or the grab fails.
        """
        if not _MSS:
            logger.debug("[TemplateMatcher] mss non disponible")
            return None
        try:
            with mss_lib.mss() as sct:
                mon = sct.monitors[0]
                grab = sct.grab(mon)
                # mss delivers BGRA bytes; 'BGRX' reorders them to RGB and
                # ignores the fourth byte.
                return Image.frombytes('RGB', grab.size, grab.bgra, 'raw', 'BGRX')
        except Exception as e:
            # Deliberately broad: capture is best-effort.
            logger.debug("[TemplateMatcher] Erreur capture écran: %s", e)
            return None