diff --git a/core/evaluation/workflow_simulation_report.py b/_archive/dead_code_20260424/core/evaluation/workflow_simulation_report.py
similarity index 100%
rename from core/evaluation/workflow_simulation_report.py
rename to _archive/dead_code_20260424/core/evaluation/workflow_simulation_report.py
diff --git a/core/pipeline/workflow_pipeline_enhanced.py b/_archive/dead_code_20260424/core/pipeline/workflow_pipeline_enhanced.py
similarity index 100%
rename from core/pipeline/workflow_pipeline_enhanced.py
rename to _archive/dead_code_20260424/core/pipeline/workflow_pipeline_enhanced.py
diff --git a/core/visual/contextual_capture_service.py b/_archive/dead_code_20260424/core/visual/contextual_capture_service.py
similarity index 100%
rename from core/visual/contextual_capture_service.py
rename to _archive/dead_code_20260424/core/visual/contextual_capture_service.py
diff --git a/core/visual/realtime_validation_service.py b/_archive/dead_code_20260424/core/visual/realtime_validation_service.py
similarity index 100%
rename from core/visual/realtime_validation_service.py
rename to _archive/dead_code_20260424/core/visual/realtime_validation_service.py
diff --git a/core/visual/rpa_integration_manager.py b/_archive/dead_code_20260424/core/visual/rpa_integration_manager.py
similarity index 100%
rename from core/visual/rpa_integration_manager.py
rename to _archive/dead_code_20260424/core/visual/rpa_integration_manager.py
diff --git a/core/visual/visual_performance_optimizer.py b/_archive/dead_code_20260424/core/visual/visual_performance_optimizer.py
similarity index 100%
rename from core/visual/visual_performance_optimizer.py
rename to _archive/dead_code_20260424/core/visual/visual_performance_optimizer.py
diff --git a/core/visual/visual_persistence_manager.py b/_archive/dead_code_20260424/core/visual/visual_persistence_manager.py
similarity index 100%
rename from core/visual/visual_persistence_manager.py
rename to _archive/dead_code_20260424/core/visual/visual_persistence_manager.py
diff --git a/core/visual/workflow_migration_tool.py b/_archive/dead_code_20260424/core/visual/workflow_migration_tool.py
similarity index 100%
rename from core/visual/workflow_migration_tool.py
rename to _archive/dead_code_20260424/core/visual/workflow_migration_tool.py
diff --git a/visual_workflow_builder/backend/app_catalogue_simple.py b/_archive/dead_code_20260424/visual_workflow_builder/backend/app_catalogue_simple.py
similarity index 100%
rename from visual_workflow_builder/backend/app_catalogue_simple.py
rename to _archive/dead_code_20260424/visual_workflow_builder/backend/app_catalogue_simple.py
diff --git a/core/execution/observe_reason_act.py b/core/execution/observe_reason_act.py
index be8467aa5..e93acc606 100644
--- a/core/execution/observe_reason_act.py
+++ b/core/execution/observe_reason_act.py
@@ -1363,20 +1363,51 @@ Règles:
x, y = None, None
method_used = ''
- # --- Méthode 1 : UI-TARS grounding (~3s, 94% précision) ---
- # Le plus fiable : on dit "click on X" et UI-TARS trouve les coordonnées
+ # --- Capture unique de l'écran, transmise au serveur de grounding ---
+ _screen_b64 = None
+ if MSS_AVAILABLE and PIL_AVAILABLE:
+ try:
+ import io as _io
+ with mss_lib.mss() as _sct:
+ _mon = _sct.monitors[0]
+ _grab = _sct.grab(_mon)
+ _screen_pil = Image.frombytes('RGB', _grab.size, _grab.bgra, 'raw', 'BGRX')
+ _buf = _io.BytesIO()
+ _screen_pil.save(_buf, format='JPEG', quality=85)
+ _screen_b64 = base64.b64encode(_buf.getvalue()).decode('utf-8')
+ print(f"📸 [ORA/capture] Écran capturé: {_screen_pil.size}")
+ except Exception as _e:
+ print(f"⚠️ [ORA/capture] Erreur: {_e}")
+
+ # --- Méthode 1 : UI-TARS via serveur grounding (port 8200, ~3s) ---
+ # Le serveur tourne dans un process séparé avec son propre CUDA context.
+ # Si le serveur n'est pas lancé → on passe au template matching.
if target_text or target_desc:
try:
- from core.execution.input_handler import _grounding_ui_tars
+ import requests as _http
click_label = target_desc or target_text
print(f"🎯 [ORA/UI-TARS] Recherche: '{click_label}'")
- result = _grounding_ui_tars(target_text, target_desc)
- if result:
- x, y = result['x'], result['y']
- method_used = 'ui_tars'
- print(f"✅ [ORA/UI-TARS] Trouvé à ({x}, {y})")
+ _payload = {
+ 'target_text': target_text,
+ 'target_description': target_desc,
+ }
+ if _screen_b64:
+ _payload['image_b64'] = _screen_b64
+ _resp = _http.post('http://localhost:8200/ground', json=_payload, timeout=30)
+ if _resp.status_code == 200:
+ _data = _resp.json()
+ if _data.get('x') is not None:
+ x, y = _data['x'], _data['y']
+ method_used = 'ui_tars'
+ print(f"✅ [ORA/UI-TARS] Trouvé à ({x}, {y}) conf={_data.get('confidence', 0):.2f} ({_data.get('time_ms', 0):.0f}ms)")
+ else:
+ print(f"⚠️ [ORA/UI-TARS] Serveur n'a pas trouvé '{click_label}'")
+ else:
+ print(f"⚠️ [ORA/UI-TARS] Serveur HTTP {_resp.status_code}")
+ except _http.ConnectionError:
+ print(f"⚠️ [ORA/UI-TARS] Serveur grounding non démarré (port 8200)")
except Exception as e:
- logger.debug(f"⚠️ [ORA/UI-TARS] Erreur: {e}")
+ print(f"⚠️ [ORA/UI-TARS] Erreur: {e}")
# --- Méthode 2 : Template matching (~80ms) ---
if x is None and screenshot_b64 and CV2_AVAILABLE and PIL_AVAILABLE and MSS_AVAILABLE:
@@ -1405,19 +1436,22 @@ Règles:
y = max_loc[1] + anchor_cv.shape[0] // 2
method_used = 'template'
except Exception as e:
- logger.debug(f"⚠️ [ORA/template] Erreur: {e}")
+ print(f"⚠️ [ORA/template] Erreur: {e}")
# --- Méthode 3 : OCR texte (~1s) ---
if x is None and target_text:
try:
from core.execution.input_handler import _grounding_ocr
+ print(f"🔍 [ORA/OCR] Recherche: '{target_text}'")
result = _grounding_ocr(target_text, anchor_bbox=bbox if bbox else None)
if result:
x, y = result['x'], result['y']
method_used = 'ocr'
print(f"🔍 [ORA/OCR] Trouvé à ({x}, {y})")
+ else:
+ print(f"🔍 [ORA/OCR] '{target_text}' non trouvé")
except Exception as e:
- logger.debug(f"⚠️ [ORA/OCR] Erreur: {e}")
+ print(f"⚠️ [ORA/OCR] Erreur: {e}")
# --- Exécuter le clic ---
if x is None:
@@ -1426,13 +1460,13 @@ Règles:
x = int(bbox.get('x', 0) + bbox.get('width', 0) / 2)
y = int(bbox.get('y', 0) + bbox.get('height', 0) / 2)
method_used = 'static_fallback'
- logger.warning(f"⚠️ [ORA/click] Fallback coordonnées statiques: ({x}, {y})")
+ print(f"⚠️ [ORA/click] Fallback coordonnées statiques: ({x}, {y})")
else:
logger.error(f"❌ [ORA/click] Impossible de localiser '{target_text}' — aucune méthode n'a fonctionné")
return False
- # --- Vérification pré-action : est-ce le bon élément ? ---
- if target_text and method_used not in ('template',) and MSS_AVAILABLE and PIL_AVAILABLE:
+ # --- Vérification pré-action (skip si UI-TARS a déjà validé visuellement) ---
+ if target_text and method_used not in ('template', 'ui_tars') and MSS_AVAILABLE and PIL_AVAILABLE:
try:
pre_check = self._verify_pre_click(x, y, target_text, target_desc)
if not pre_check:
diff --git a/core/grounding/__init__.py b/core/grounding/__init__.py
new file mode 100644
index 000000000..357edeeda
--- /dev/null
+++ b/core/grounding/__init__.py
@@ -0,0 +1,20 @@
+# core/grounding — Module de localisation d'éléments UI
+#
+# Centralise les méthodes de grounding visuel : template matching,
+# OCR, VLM, etc. Chaque méthode produit un GroundingResult uniforme.
+#
+# Le serveur de grounding (server.py) tourne dans un process séparé
+# sur le port 8200. Le client HTTP (UITarsGrounder) l'appelle via HTTP.
+# Le pipeline (GroundingPipeline) orchestre template → OCR → UI-TARS → static.
+
+from core.grounding.template_matcher import TemplateMatcher, MatchResult
+from core.grounding.target import GroundingTarget, GroundingResult
+from core.grounding.ui_tars_grounder import UITarsGrounder
+from core.grounding.pipeline import GroundingPipeline
+
+__all__ = [
+ 'TemplateMatcher', 'MatchResult',
+ 'GroundingTarget', 'GroundingResult',
+ 'UITarsGrounder',
+ 'GroundingPipeline',
+]
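+
+# Usage sketch (assumes the grounding server is already running via
+# `.venv/bin/python3 -m core.grounding.server`):
+#
+#   from core.grounding import GroundingPipeline, GroundingTarget
+#
+#   pipeline = GroundingPipeline(template_threshold=0.75, enable_uitars=True)
+#   result = pipeline.locate(GroundingTarget(text="Valider"))
+#   if result:
+#       print(result.x, result.y, result.method)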
diff --git a/core/grounding/pipeline.py b/core/grounding/pipeline.py
new file mode 100644
index 000000000..7825a5002
--- /dev/null
+++ b/core/grounding/pipeline.py
@@ -0,0 +1,190 @@
+"""
+core/grounding/pipeline.py — Pipeline de grounding en cascade
+
+Orchestre les methodes de localisation dans l'ordre :
+1. Template matching (TemplateMatcher, local, ~80ms)
+2. OCR (docTR via input_handler, local, ~1s)
+3. UI-TARS (HTTP vers serveur grounding, ~3s)
+4. Static fallback (coordonnees d'origine du workflow)
+
+Chaque methode est essayee dans l'ordre. Des qu'une reussit, on retourne
+le resultat. Cela permet un equilibre entre vitesse (template) et robustesse
+(UI-TARS pour les elements qui ont change de position/apparence).
+
+Utilisation :
+ from core.grounding.pipeline import GroundingPipeline
+ from core.grounding.target import GroundingTarget
+
+ pipeline = GroundingPipeline()
+ result = pipeline.locate(GroundingTarget(
+ text="Valider",
+ description="bouton vert en bas",
+ template_b64=screenshot_b64,
+ original_bbox={"x": 100, "y": 200, "width": 80, "height": 30},
+ ))
+ if result:
+ print(f"Trouve a ({result.x}, {result.y}) via {result.method}")
+"""
+
+from __future__ import annotations
+
+import time
+from typing import Optional
+
+from core.grounding.target import GroundingTarget, GroundingResult
+
+
+class GroundingPipeline:
+ """Pipeline de localisation en cascade : template -> OCR -> UI-TARS -> static."""
+
+ def __init__(self, template_threshold: float = 0.75, enable_uitars: bool = True):
+ self.template_threshold = template_threshold
+ self.enable_uitars = enable_uitars
+
+ def locate(self, target: GroundingTarget) -> Optional[GroundingResult]:
+ """Localise un element UI en essayant les methodes en cascade.
+
+ Args:
+ target: description de l'element a localiser
+
+ Returns:
+ GroundingResult ou None si aucune methode ne trouve l'element
+ """
+ t0 = time.time()
+
+ # --- Methode 1 : Template matching (~80ms) ---
+ result = self._try_template(target)
+ if result:
+ print(f"[GroundingPipeline] Localise via {result.method} en "
+ f"{(time.time() - t0) * 1000:.0f}ms")
+ return result
+
+ # --- Methode 2 : OCR texte (~1s) ---
+ result = self._try_ocr(target)
+ if result:
+ print(f"[GroundingPipeline] Localise via {result.method} en "
+ f"{(time.time() - t0) * 1000:.0f}ms")
+ return result
+
+ # --- Methode 3 : UI-TARS via serveur HTTP (~3s) ---
+ if self.enable_uitars:
+ result = self._try_uitars(target)
+ if result:
+ print(f"[GroundingPipeline] Localise via {result.method} en "
+ f"{(time.time() - t0) * 1000:.0f}ms")
+ return result
+
+ # --- Methode 4 : Fallback statique ---
+ result = self._try_static(target)
+ if result:
+ print(f"[GroundingPipeline] Localise via {result.method} en "
+ f"{(time.time() - t0) * 1000:.0f}ms")
+ return result
+
+ print(f"[GroundingPipeline] ECHEC: '{target.text}' introuvable "
+ f"(toutes methodes epuisees, {(time.time() - t0) * 1000:.0f}ms)")
+ return None
+
+ # ------------------------------------------------------------------
+ # Methodes individuelles
+ # ------------------------------------------------------------------
+
+ def _try_template(self, target: GroundingTarget) -> Optional[GroundingResult]:
+ """Template matching — rapide, exact, mais sensible aux changements visuels."""
+ if not target.template_b64:
+ return None
+
+ try:
+ from core.grounding.template_matcher import TemplateMatcher
+ matcher = TemplateMatcher(threshold=self.template_threshold)
+ match = matcher.match_screen(anchor_b64=target.template_b64)
+ if match:
+ print(f"[GroundingPipeline/template] score={match.score:.3f} "
+ f"pos=({match.x},{match.y}) ({match.time_ms:.0f}ms)")
+ return GroundingResult(
+ x=match.x,
+ y=match.y,
+ method='template',
+ confidence=match.score,
+ time_ms=match.time_ms,
+ )
+ else:
+ diag = matcher.match_screen_diagnostic(anchor_b64=target.template_b64)
+ print(f"[GroundingPipeline/template] pas de match — best={diag}")
+ except Exception as e:
+ print(f"[GroundingPipeline/template] ERREUR: {e}")
+
+ return None
+
+ def _try_ocr(self, target: GroundingTarget) -> Optional[GroundingResult]:
+ """OCR : cherche le texte cible sur l'ecran via docTR."""
+ if not target.text:
+ return None
+
+ try:
+ from core.execution.input_handler import _grounding_ocr
+ bbox = target.original_bbox if target.original_bbox else None
+ result = _grounding_ocr(target.text, anchor_bbox=bbox)
+ if result:
+ print(f"[GroundingPipeline/OCR] '{target.text}' -> ({result['x']}, {result['y']})")
+ return GroundingResult(
+ x=result['x'],
+ y=result['y'],
+ method='ocr',
+ confidence=result.get('confidence', 0.80),
+ time_ms=result.get('time_ms', 0),
+ )
+ else:
+ print(f"[GroundingPipeline/OCR] '{target.text}' non trouve")
+ except Exception as e:
+ print(f"[GroundingPipeline/OCR] ERREUR: {e}")
+
+ return None
+
+ def _try_uitars(self, target: GroundingTarget) -> Optional[GroundingResult]:
+ """UI-TARS via serveur HTTP — robust, gere les changements de layout."""
+ if not target.text and not target.description:
+ return None
+
+ try:
+ from core.grounding.ui_tars_grounder import UITarsGrounder
+ grounder = UITarsGrounder.get_instance()
+ result = grounder.ground(
+ target_text=target.text,
+ target_description=target.description,
+ )
+ if result:
+ print(f"[GroundingPipeline/UI-TARS] ({result.x}, {result.y}) "
+ f"conf={result.confidence:.2f} ({result.time_ms:.0f}ms)")
+ return result
+ else:
+ print(f"[GroundingPipeline/UI-TARS] pas de resultat")
+ except Exception as e:
+ print(f"[GroundingPipeline/UI-TARS] ERREUR: {e}")
+
+ return None
+
+ def _try_static(self, target: GroundingTarget) -> Optional[GroundingResult]:
+ """Fallback : coordonnees d'origine du workflow (centre du bounding box)."""
+ bbox = target.original_bbox
+ if not bbox:
+ return None
+
+ w = bbox.get('width', 0)
+ h = bbox.get('height', 0)
+ if not w or not h:
+ return None
+
+ x = int(bbox.get('x', 0) + w / 2)
+ y = int(bbox.get('y', 0) + h / 2)
+
+ print(f"[GroundingPipeline/static] fallback ({x}, {y}) "
+ f"depuis bbox {bbox}")
+
+ return GroundingResult(
+ x=x,
+ y=y,
+ method='static_fallback',
+ confidence=0.30,
+ time_ms=0.0,
+ )
diff --git a/core/grounding/server.py b/core/grounding/server.py
new file mode 100644
index 000000000..7d757dbd2
--- /dev/null
+++ b/core/grounding/server.py
@@ -0,0 +1,433 @@
+"""
+core/grounding/server.py — Serveur FastAPI de grounding visuel (port 8200)
+
+Charge UI-TARS-1.5-7B en 4-bit NF4 dans son propre process Python avec son
+propre contexte CUDA. Le backend Flask VWB (port 5002) et la boucle ORA
+appellent ce serveur en HTTP au lieu de charger le modele in-process.
+
+Lancement :
+ .venv/bin/python3 -m core.grounding.server
+
+Endpoints :
+ GET /health — verifie que le modele est charge
+ POST /ground — localise un element UI sur un screenshot
+"""
+
+import base64
+import gc
+import io
+import math
+import os
+import re
+import time
+from typing import Optional
+
+import torch
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel
+import uvicorn
+
+# ---------------------------------------------------------------------------
+# Configuration
+# ---------------------------------------------------------------------------
+
+PORT = int(os.environ.get("GROUNDING_PORT", 8200))
+MODEL_ID = "ByteDance-Seed/UI-TARS-1.5-7B"
+MIN_PIXELS = 100 * 28 * 28
+MAX_PIXELS = 16384 * 28 * 28
+
+# ---------------------------------------------------------------------------
+# Smart resize — identique a /tmp/test_uitars.py
+# ---------------------------------------------------------------------------
+
+def _smart_resize(height: int, width: int, factor: int = 28,
+ min_pixels: int = MIN_PIXELS, max_pixels: int = MAX_PIXELS):
+ """UI-TARS smart resize (memes defaults que le test valide)."""
+ h_bar = max(factor, round(height / factor) * factor)
+ w_bar = max(factor, round(width / factor) * factor)
+ if h_bar * w_bar > max_pixels:
+ beta = math.sqrt((height * width) / max_pixels)
+ h_bar = math.floor(height / beta / factor) * factor
+ w_bar = math.floor(width / beta / factor) * factor
+ elif h_bar * w_bar < min_pixels:
+ beta = math.sqrt(min_pixels / (height * width))
+ h_bar = math.ceil(height * beta / factor) * factor
+ w_bar = math.ceil(width * beta / factor) * factor
+ return h_bar, w_bar
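+
+# Worked example (values follow from the formula above): a 1920x1080 screen
+# is within [MIN_PIXELS, MAX_PIXELS], so it is only snapped to multiples
+# of 28: round(1080/28)*28 = 1092 and round(1920/28)*28 = 1932, i.e.
+#   _smart_resize(1080, 1920) -> (1092, 1932)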
+
+
+# ---------------------------------------------------------------------------
+# Prompt officiel UI-TARS — identique a /tmp/test_uitars.py
+# ---------------------------------------------------------------------------
+
+_GROUNDING_PROMPT = """You are a GUI agent. You are given a task and your action history, with screenshots. You need to perform the next action to complete the task.
+
+## Output Format
+
+Thought: ...
+Action: ...
+
+
+## Action Space
+click(start_box='(x1, y1)')
+
+
+## User Instruction
+{instruction}"""
+
+
+# ---------------------------------------------------------------------------
+# Modele singleton
+# ---------------------------------------------------------------------------
+
+_model = None
+_processor = None
+_model_loaded = False
+
+
+def _evict_ollama_models():
+ """Libere les modeles Ollama de la VRAM avant de charger UI-TARS."""
+ try:
+ import requests
+ try:
+ ps_resp = requests.get('http://localhost:11434/api/ps', timeout=3)
+ if ps_resp.status_code == 200:
+ loaded = ps_resp.json().get('models', [])
+ model_names = [m.get('name', '') for m in loaded if m.get('name')]
+ else:
+ model_names = []
+ except Exception:
+ model_names = []
+
+ if not model_names:
+ print("[grounding-server] Aucun modele Ollama en VRAM")
+ return
+
+ for model_name in model_names:
+ try:
+ requests.post(
+ 'http://localhost:11434/api/generate',
+ json={'model': model_name, 'keep_alive': '0'},
+ timeout=5,
+ )
+ print(f"[grounding-server] Ollama: eviction de '{model_name}'")
+ except Exception:
+ pass
+
+ time.sleep(1.0)
+ print("[grounding-server] Modeles Ollama liberes")
+ except ImportError:
+ print("[grounding-server] requests non dispo, skip eviction Ollama")
+
+
+def _load_model():
+ """Charge UI-TARS-1.5-7B en 4-bit NF4 — code identique a /tmp/test_uitars.py."""
+ global _model, _processor, _model_loaded
+
+ if _model_loaded:
+ return
+
+ print("=" * 60)
+ print(f"[grounding-server] Chargement de {MODEL_ID}")
+ print("=" * 60)
+
+ if not torch.cuda.is_available():
+ raise RuntimeError("CUDA non disponible — le serveur de grounding necessite un GPU")
+
+ # Liberer la VRAM Ollama
+ _evict_ollama_models()
+
+ torch.cuda.empty_cache()
+ gc.collect()
+
+ from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor, BitsAndBytesConfig
+
+ bnb_config = BitsAndBytesConfig(
+ load_in_4bit=True,
+ bnb_4bit_quant_type="nf4",
+ bnb_4bit_compute_dtype=torch.bfloat16,
+ bnb_4bit_use_double_quant=True,
+ )
+
+ t0 = time.time()
+ _model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
+ MODEL_ID,
+ quantization_config=bnb_config,
+ device_map="auto",
+ )
+ _model.eval()
+
+ _processor = AutoProcessor.from_pretrained(
+ MODEL_ID,
+ min_pixels=MIN_PIXELS,
+ max_pixels=MAX_PIXELS,
+ )
+
+ _model_loaded = True
+ load_time = time.time() - t0
+ alloc = torch.cuda.memory_allocated() / 1024**3
+ peak = torch.cuda.max_memory_allocated() / 1024**3
+ print(f"[grounding-server] Modele charge en {load_time:.1f}s | "
+ f"VRAM: {alloc:.2f} GB (peak: {peak:.2f} GB)")
+
+
+def _capture_screen():
+ """Capture l'ecran complet via mss. Retourne PIL Image ou None."""
+ try:
+ import mss as mss_lib
+ from PIL import Image
+ with mss_lib.mss() as sct:
+ mon = sct.monitors[0]
+ grab = sct.grab(mon)
+ return Image.frombytes('RGB', grab.size, grab.bgra, 'raw', 'BGRX')
+ except Exception as e:
+ print(f"[grounding-server] Erreur capture ecran: {e}")
+ return None
+
+
+def _parse_coordinates(raw: str, orig_w: int, orig_h: int,
+ resized_w: int, resized_h: int):
+ """Parse les coordonnees du modele — identique a /tmp/test_uitars.py.
+
+ Retourne (px, py, method_detail, confidence) ou None.
+ """
+ cx, cy = None, None
+
+ # Format 1: <point>x y</point> (sortie native UI-TARS-1.5)
+ pm = re.search(r'<point>\s*(\d+)\s+(\d+)\s*</point>', raw)
+ if pm:
+ cx, cy = int(pm.group(1)), int(pm.group(2))
+
+ # Format 2: start_box='(x, y)'
+ if cx is None:
+ bm = re.search(r"start_box=\s*['\"]?\((\d+)\s*,\s*(\d+)\)", raw)
+ if bm:
+ cx, cy = int(bm.group(1)), int(bm.group(2))
+
+ # Format 3: fallback x, y
+ if cx is None:
+ fm = re.search(r'(\d+)\s*,\s*(\d+)', raw)
+ if fm:
+ cx, cy = int(fm.group(1)), int(fm.group(2))
+
+ if cx is None or cy is None:
+ return None
+
+ # Conversion : deux interpretations possibles des coordonnees du modele
+ # Methode A : coordonnees dans l'espace de l'image resizee
+ px_r = int(cx / resized_w * orig_w)
+ py_r = int(cy / resized_h * orig_h)
+
+ # Methode B : coordonnees normalisees 0-1000
+ px_1k = int(cx / 1000 * orig_w)
+ py_1k = int(cy / 1000 * orig_h)
+
+ # Heuristique du script valide : si coords dans les limites du resize,
+ # les deux sont possibles. UI-TARS utilise l'espace resize en natif.
+ if cx <= resized_w and cy <= resized_h:
+ in_screen_r = (0 <= px_r <= orig_w and 0 <= py_r <= orig_h)
+ in_screen_1k = (0 <= px_1k <= orig_w and 0 <= py_1k <= orig_h)
+
+ if in_screen_r and in_screen_1k:
+ px, py = px_r, py_r
+ method_detail = "resized"
+ elif in_screen_r:
+ px, py = px_r, py_r
+ method_detail = "resized"
+ else:
+ px, py = px_1k, py_1k
+ method_detail = "0-1000"
+ else:
+ px, py = px_1k, py_1k
+ method_detail = "0-1000"
+
+ confidence = 0.85 if ("start_box" in raw or "" in raw) else 0.70
+
+ print(f"[grounding-server] model=({cx},{cy}) -> pixel=({px},{py}) "
+ f"[{method_detail}] resized={resized_w}x{resized_h} orig={orig_w}x{orig_h}")
+
+ return px, py, method_detail, confidence
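+
+# Illustrative raw outputs this parser should accept (formats assumed from
+# the regexes above; coordinates are made-up):
+#   "Action: click(point='<point>966 566</point>')"  -> format 1
+#   "Action: click(start_box='(483, 283)')"          -> format 2
+#   "The element is at (483, 283)."                  -> format 3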
+
+
+# ---------------------------------------------------------------------------
+# FastAPI app
+# ---------------------------------------------------------------------------
+
+app = FastAPI(title="RPA Vision Grounding Server", version="1.0.0")
+
+
+class GroundRequest(BaseModel):
+ target_text: str = ""
+ target_description: str = ""
+ image_b64: str = ""
+
+
+class GroundResponse(BaseModel):
+ x: Optional[int] = None
+ y: Optional[int] = None
+ method: str = "ui_tars"
+ confidence: float = 0.85
+ time_ms: float = 0.0
+ raw_output: str = ""
+
+
+@app.get("/health")
+def health():
+ return {
+ "status": "ok" if _model_loaded else "loading",
+ "model": MODEL_ID,
+ "model_loaded": _model_loaded,
+ "cuda_available": torch.cuda.is_available(),
+ "vram_allocated_gb": round(torch.cuda.memory_allocated() / 1024**3, 2) if torch.cuda.is_available() else 0,
+ }
+
+
+@app.post("/ground", response_model=GroundResponse)
+def ground(req: GroundRequest):
+ if not _model_loaded:
+ raise HTTPException(status_code=503, detail="Modele pas encore charge")
+
+ from PIL import Image
+ from qwen_vl_utils import process_vision_info
+
+ # Construire l'instruction
+ parts = []
+ if req.target_text:
+ parts.append(req.target_text)
+ if req.target_description:
+ parts.append(req.target_description)
+ if not parts:
+ raise HTTPException(status_code=400, detail="target_text ou target_description requis")
+
+ instruction = f"Click on the {' — '.join(parts)}"
+
+ # Obtenir l'image (fournie en b64 ou capture ecran)
+ if req.image_b64:
+ try:
+ raw_b64 = req.image_b64.split(',')[1] if ',' in req.image_b64 else req.image_b64
+ img_data = base64.b64decode(raw_b64)
+ screen_pil = Image.open(io.BytesIO(img_data)).convert('RGB')
+ except Exception as e:
+ raise HTTPException(status_code=400, detail=f"Erreur decodage image: {e}")
+ else:
+ screen_pil = _capture_screen()
+ if screen_pil is None:
+ raise HTTPException(status_code=500, detail="Capture ecran echouee")
+
+ W, H = screen_pil.size
+ rH, rW = _smart_resize(H, W, min_pixels=MIN_PIXELS, max_pixels=MAX_PIXELS)
+
+ # Sauver temporairement l'image pour qwen_vl_utils
+ import tempfile
+ tmp_path = os.path.join(tempfile.gettempdir(), f"grounding_screen_{os.getpid()}.png")
+ screen_pil.save(tmp_path)
+
+ try:
+ system_prompt = _GROUNDING_PROMPT.format(instruction=instruction)
+
+ messages = [
+ {
+ "role": "user",
+ "content": [
+ {
+ "type": "image",
+ "image": f"file://{tmp_path}",
+ "min_pixels": MIN_PIXELS,
+ "max_pixels": MAX_PIXELS,
+ },
+ {
+ "type": "text",
+ "text": system_prompt,
+ },
+ ],
+ }
+ ]
+
+ text = _processor.apply_chat_template(
+ messages, tokenize=False, add_generation_prompt=True
+ )
+ image_inputs, video_inputs = process_vision_info(messages)
+ inputs = _processor(
+ text=[text],
+ images=image_inputs,
+ videos=video_inputs,
+ padding=True,
+ return_tensors="pt",
+ ).to(_model.device)
+
+ # Inference
+ t0 = time.time()
+ with torch.no_grad():
+ gen = _model.generate(**inputs, max_new_tokens=256)
+ infer_ms = (time.time() - t0) * 1000
+
+ # Decoder
+ trimmed = [o[len(i):] for i, o in zip(inputs.input_ids, gen)]
+ raw = _processor.batch_decode(
+ trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
+ )[0].strip()
+
+ print(f"[grounding-server] '{instruction}' -> raw='{raw[:150]}' ({infer_ms:.0f}ms)")
+
+ # Détecter les réponses négatives (le modèle dit qu'il ne voit pas l'élément)
+ _raw_lower = raw.lower()
+ _negative_markers = ["don't see", "do not see", "cannot find", "can't find",
+ "not visible", "not found", "doesn't appear", "does not appear",
+ "i don't", "unable to find", "unable to locate"]
+ for _neg in _negative_markers:
+ if _neg in _raw_lower:
+ print(f"[grounding-server] NÉGATIF détecté: '{_neg}' → élément non trouvé")
+ return GroundResponse(x=None, y=None, method="ui_tars", confidence=0.0,
+ time_ms=round(infer_ms, 1), raw_output=raw[:300])
+
+ # Parser les coordonnees
+ parsed = _parse_coordinates(raw, W, H, rW, rH)
+ if parsed is None:
+ raise HTTPException(
+ status_code=422,
+ detail=f"Coordonnees non parsees dans la reponse: {raw[:200]}"
+ )
+
+ px, py, method_detail, confidence = parsed
+
+ print(f"[grounding-server] Resultat: ({px}, {py}) conf={confidence:.2f} "
+ f"[{method_detail}] ({infer_ms:.0f}ms)")
+
+ return GroundResponse(
+ x=px,
+ y=py,
+ method="ui_tars",
+ confidence=confidence,
+ time_ms=round(infer_ms, 1),
+ raw_output=raw[:300],
+ )
+
+ finally:
+ try:
+ os.unlink(tmp_path)
+ except OSError:
+ pass
+
+
+# ---------------------------------------------------------------------------
+# Entrypoint
+# ---------------------------------------------------------------------------
+
+@app.on_event("startup")
+async def startup_event():
+ """Charge le modele au demarrage du serveur."""
+ print(f"[grounding-server] Demarrage sur port {PORT}...")
+ _load_model()
+ print(f"[grounding-server] Pret a recevoir des requetes sur http://localhost:{PORT}")
+
+
+if __name__ == "__main__":
+ uvicorn.run(
+ "core.grounding.server:app",
+ host="0.0.0.0",
+ port=PORT,
+ log_level="info",
+ workers=1, # 1 seul worker (1 seul GPU)
+ )
diff --git a/core/grounding/target.py b/core/grounding/target.py
new file mode 100644
index 000000000..d4cc487f9
--- /dev/null
+++ b/core/grounding/target.py
@@ -0,0 +1,48 @@
+"""
+core/grounding/target.py — Types partagés pour le grounding visuel
+
+Dataclasses décrivant une cible à localiser (GroundingTarget) et
+le résultat d'une localisation (GroundingResult).
+
+Ces types sont la brique commune pour tous les modules de grounding :
+template matching, OCR, VLM, CLIP, etc.
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from typing import Dict, Optional
+
+
+@dataclass
+class GroundingTarget:
+ """Description d'un élément UI à localiser sur l'écran.
+
+ Attributs :
+ text : texte visible de l'élément (bouton, label, etc.)
+ description : description sémantique libre (ex: "le bouton Valider en bas à droite")
+ template_b64 : capture visuelle de l'élément, encodée en base64 PNG/JPEG
+ original_bbox : position d'origine lors de la capture {x, y, width, height}
+ """
+ text: str = ""
+ description: str = ""
+ template_b64: str = ""
+ original_bbox: Optional[Dict[str, int]] = field(default=None)
+
+
+@dataclass
+class GroundingResult:
+ """Résultat d'une localisation d'élément UI.
+
+ Attributs :
+ x : coordonnée X du centre de l'élément trouvé (pixels écran)
+ y : coordonnée Y du centre de l'élément trouvé (pixels écran)
+ method : méthode ayant produit le résultat ('template', 'ocr', 'vlm', 'clip', etc.)
+ confidence : score de confiance [0.0 – 1.0]
+ time_ms : temps de recherche en millisecondes
+ """
+ x: int
+ y: int
+ method: str
+ confidence: float
+ time_ms: float
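+
+# Example (illustrative values only):
+#
+#   target = GroundingTarget(
+#       text="Valider",
+#       description="bouton vert en bas a droite",
+#       original_bbox={"x": 100, "y": 200, "width": 80, "height": 30},
+#   )
+#   hit = GroundingResult(x=140, y=215, method="template",
+#                         confidence=0.92, time_ms=80.0)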
diff --git a/core/grounding/template_matcher.py b/core/grounding/template_matcher.py
new file mode 100644
index 000000000..174ed9693
--- /dev/null
+++ b/core/grounding/template_matcher.py
@@ -0,0 +1,350 @@
+"""
+core/grounding/template_matcher.py — Template matching centralisé
+
+Fournit une classe TemplateMatcher qui localise une ancre visuelle (image template)
+dans un screenshot via cv2.matchTemplate. Supporte single-scale et multi-scale.
+
+Remplace les implémentations dupliquées dans :
+ - core/execution/observe_reason_act.py (~1348-1375)
+ - visual_workflow_builder/backend/api_v3/execute.py (~930-963)
+ - visual_workflow_builder/backend/catalog_routes_v2_vlm.py (~339-381)
+ - visual_workflow_builder/backend/services/intelligent_executor.py (~131-210)
+ - core/detection/omniparser_adapter.py (~330)
+
+Utilisation :
+ from core.grounding import TemplateMatcher, MatchResult
+
+ matcher = TemplateMatcher(threshold=0.75)
+ result = matcher.match_screen(anchor_b64="...")
+ if result:
+ print(f"Trouvé à ({result.x}, {result.y}) score={result.score:.3f}")
+"""
+
+from __future__ import annotations
+
+import base64
+import io
+import logging
+import time
+from dataclasses import dataclass
+from typing import List, Optional, Tuple
+
+logger = logging.getLogger(__name__)
+
+# Imports optionnels — le module se charge même sans cv2/PIL/mss
+try:
+ import cv2
+ _CV2 = True
+except ImportError:
+ _CV2 = False
+
+try:
+ import numpy as np
+ _NP = True
+except ImportError:
+ _NP = False
+
+try:
+ from PIL import Image
+ _PIL = True
+except ImportError:
+ _PIL = False
+
+try:
+ import mss as mss_lib
+ _MSS = True
+except ImportError:
+ _MSS = False
+
+
+# ---------------------------------------------------------------------------
+# Résultat d'un match
+# ---------------------------------------------------------------------------
+
+@dataclass
+class MatchResult:
+ """Résultat d'un template matching."""
+ x: int
+ y: int
+ score: float
+ method: str # 'template' | 'template_multiscale'
+ time_ms: float
+ scale: float = 1.0 # Échelle à laquelle le meilleur match a été trouvé
+
+
+# ---------------------------------------------------------------------------
+# TemplateMatcher
+# ---------------------------------------------------------------------------
+
+class TemplateMatcher:
+ """Localise une ancre visuelle dans un screenshot via template matching.
+
+ Paramètres :
+ threshold : score minimum pour accepter un match (défaut 0.75)
+ multiscale : active le matching multi-échelle (défaut False)
+ scales : liste d'échelles à tester en mode multi-scale
+ method : méthode cv2 (défaut cv2.TM_CCOEFF_NORMED)
+ grayscale : convertir en niveaux de gris avant matching (défaut False)
+ """
+
+ # Échelles par défaut pour le mode multi-scale, ordonnées par
+ # probabilité décroissante (1.0 en premier = rapide si ça matche)
+ DEFAULT_SCALES: List[float] = [1.0, 0.95, 1.05, 0.9, 1.1, 0.85, 1.15, 0.8, 1.2]
+
+ def __init__(
+ self,
+ threshold: float = 0.75,
+ multiscale: bool = False,
+ scales: Optional[List[float]] = None,
+ grayscale: bool = False,
+ ):
+ self.threshold = threshold
+ self.multiscale = multiscale
+ self.scales = scales or self.DEFAULT_SCALES
+ self.grayscale = grayscale
+ # cv2.TM_CCOEFF_NORMED est la méthode utilisée partout dans le projet
+ self._cv2_method = cv2.TM_CCOEFF_NORMED if _CV2 else None
+
+ # ------------------------------------------------------------------
+ # API publique
+ # ------------------------------------------------------------------
+
+ def match_screen(
+ self,
+ anchor_b64: Optional[str] = None,
+ anchor_pil: Optional["Image.Image"] = None,
+ screen_pil: Optional["Image.Image"] = None,
+ ) -> Optional[MatchResult]:
+ """Cherche l'ancre dans le screenshot courant (ou fourni).
+
+ L'ancre peut être passée en base64 ou en PIL Image.
+ Le screenshot est capturé via mss si non fourni.
+
+ Retourne un MatchResult ou None si aucun match >= seuil.
+ """
+ if not (_CV2 and _NP and _PIL):
+ logger.debug("[TemplateMatcher] cv2/numpy/PIL non disponible")
+ return None
+
+ # --- Préparer l'ancre ---
+ anchor_img = self._decode_anchor(anchor_b64, anchor_pil)
+ if anchor_img is None:
+ return None
+
+ # --- Préparer le screenshot ---
+ if screen_pil is None:
+ screen_pil = self._capture_screen()
+ if screen_pil is None:
+ return None
+
+ # --- Convertir en arrays cv2 ---
+ screen_cv = cv2.cvtColor(np.array(screen_pil), cv2.COLOR_RGB2BGR)
+ anchor_cv = cv2.cvtColor(np.array(anchor_img), cv2.COLOR_RGB2BGR)
+
+ # --- Matching ---
+ if self.multiscale:
+ return self._match_multiscale(screen_cv, anchor_cv)
+ else:
+ return self._match_single(screen_cv, anchor_cv)
+
+ def match_in_region(
+ self,
+ region_cv: "np.ndarray",
+ anchor_cv: "np.ndarray",
+ threshold: Optional[float] = None,
+ ) -> Optional[MatchResult]:
+ """Match dans une région déjà découpée (arrays BGR).
+
+ Utilisé par les pipelines qui font leur propre capture/découpe.
+ """
+ if not (_CV2 and _NP):
+ return None
+
+ thr = threshold if threshold is not None else self.threshold
+
+ if self.multiscale:
+ return self._match_multiscale(region_cv, anchor_cv, threshold_override=thr)
+ else:
+ return self._match_single(region_cv, anchor_cv, threshold_override=thr)
+
+ def match_screen_diagnostic(
+ self,
+ anchor_b64: Optional[str] = None,
+ anchor_pil: Optional["Image.Image"] = None,
+ screen_pil: Optional["Image.Image"] = None,
+ ) -> str:
+ """Retourne un diagnostic textuel (score + position) même sans match."""
+ if not (_CV2 and _NP and _PIL):
+ return "cv2/numpy/PIL non dispo"
+
+ anchor_img = self._decode_anchor(anchor_b64, anchor_pil)
+ if anchor_img is None:
+ return "ancre non décodable"
+
+ if screen_pil is None:
+ screen_pil = self._capture_screen()
+ if screen_pil is None:
+ return "capture écran échouée"
+
+ screen_cv = cv2.cvtColor(np.array(screen_pil), cv2.COLOR_RGB2BGR)
+ anchor_cv = cv2.cvtColor(np.array(anchor_img), cv2.COLOR_RGB2BGR)
+
+ if anchor_cv.shape[0] >= screen_cv.shape[0] or anchor_cv.shape[1] >= screen_cv.shape[1]:
+ return f"ancre {anchor_cv.shape[:2]} >= écran {screen_cv.shape[:2]}"
+
+ s_img, a_img = self._maybe_grayscale(screen_cv, anchor_cv)
+ result_tm = cv2.matchTemplate(s_img, a_img, self._cv2_method)
+ _, max_val, _, max_loc = cv2.minMaxLoc(result_tm)
+ return f"{max_val:.3f} pos={max_loc}"
+
+ # ------------------------------------------------------------------
+ # Méthodes internes
+ # ------------------------------------------------------------------
+
+ def _match_single(
+ self,
+ screen_cv: "np.ndarray",
+ anchor_cv: "np.ndarray",
+ threshold_override: Optional[float] = None,
+ ) -> Optional[MatchResult]:
+ """Template matching single-scale."""
+ threshold = threshold_override if threshold_override is not None else self.threshold
+
+ if anchor_cv.shape[0] >= screen_cv.shape[0] or anchor_cv.shape[1] >= screen_cv.shape[1]:
+ logger.debug("[TemplateMatcher] Ancre plus grande que le screen")
+ return None
+
+ s_img, a_img = self._maybe_grayscale(screen_cv, anchor_cv)
+
+ t0 = time.time()
+ result_tm = cv2.matchTemplate(s_img, a_img, self._cv2_method)
+ _, max_val, _, max_loc = cv2.minMaxLoc(result_tm)
+ elapsed_ms = (time.time() - t0) * 1000
+
+ logger.debug(
+ "[TemplateMatcher] score=%.3f pos=%s (%.0fms)",
+ max_val, max_loc, elapsed_ms,
+ )
+
+ if max_val >= threshold:
+ cx = max_loc[0] + anchor_cv.shape[1] // 2
+ cy = max_loc[1] + anchor_cv.shape[0] // 2
+ return MatchResult(
+ x=cx,
+ y=cy,
+ score=float(max_val),
+ method='template',
+ time_ms=elapsed_ms,
+ scale=1.0,
+ )
+ return None
+
+ def _match_multiscale(
+ self,
+ screen_cv: "np.ndarray",
+ anchor_cv: "np.ndarray",
+ threshold_override: Optional[float] = None,
+ ) -> Optional[MatchResult]:
+ """Template matching multi-scale."""
+ threshold = threshold_override if threshold_override is not None else self.threshold
+
+ best_score = -1.0
+ best_loc = None
+ best_scale = 1.0
+ best_anchor_shape = anchor_cv.shape
+
+ t0 = time.time()
+
+ for scale in self.scales:
+ if scale == 1.0:
+ scaled = anchor_cv
+ else:
+ new_w = int(anchor_cv.shape[1] * scale)
+ new_h = int(anchor_cv.shape[0] * scale)
+ if new_w < 8 or new_h < 8:
+ continue
+ if new_h >= screen_cv.shape[0] or new_w >= screen_cv.shape[1]:
+ continue
+ scaled = cv2.resize(anchor_cv, (new_w, new_h), interpolation=cv2.INTER_AREA)
+
+ if scaled.shape[0] >= screen_cv.shape[0] or scaled.shape[1] >= screen_cv.shape[1]:
+ continue
+
+ s_img, a_img = self._maybe_grayscale(screen_cv, scaled)
+ result_tm = cv2.matchTemplate(s_img, a_img, self._cv2_method)
+ _, max_val, _, max_loc = cv2.minMaxLoc(result_tm)
+
+ if max_val > best_score:
+ best_score = max_val
+ best_loc = max_loc
+ best_scale = scale
+ best_anchor_shape = scaled.shape
+
+ elapsed_ms = (time.time() - t0) * 1000
+
+ logger.debug(
+ "[TemplateMatcher/multiscale] best_score=%.3f scale=%.2f (%.0fms)",
+ best_score, best_scale, elapsed_ms,
+ )
+
+ if best_score >= threshold and best_loc is not None:
+ cx = best_loc[0] + best_anchor_shape[1] // 2
+ cy = best_loc[1] + best_anchor_shape[0] // 2
+ return MatchResult(
+ x=cx,
+ y=cy,
+ score=float(best_score),
+ method='template_multiscale',
+ time_ms=elapsed_ms,
+ scale=best_scale,
+ )
+ return None
+
+ def _maybe_grayscale(
+ self,
+ screen: "np.ndarray",
+ anchor: "np.ndarray",
+ ) -> Tuple["np.ndarray", "np.ndarray"]:
+ """Convertit en niveaux de gris si self.grayscale est True."""
+ if not self.grayscale:
+ return screen, anchor
+ s = cv2.cvtColor(screen, cv2.COLOR_BGR2GRAY) if len(screen.shape) == 3 else screen
+ a = cv2.cvtColor(anchor, cv2.COLOR_BGR2GRAY) if len(anchor.shape) == 3 else anchor
+ return s, a
+
+ @staticmethod
+ def _decode_anchor(
+ anchor_b64: Optional[str],
+ anchor_pil: Optional["Image.Image"],
+ ) -> Optional["Image.Image"]:
+ """Décode l'ancre depuis base64 ou retourne le PIL directement."""
+ if anchor_pil is not None:
+ return anchor_pil
+
+ if anchor_b64 is None:
+ logger.debug("[TemplateMatcher] Ni anchor_b64 ni anchor_pil fourni")
+ return None
+
+ try:
+ raw = anchor_b64.split(',')[1] if ',' in anchor_b64 else anchor_b64
+ data = base64.b64decode(raw)
+ # Forcer RGB : un PNG base64 peut contenir un canal alpha
+ # que cv2.COLOR_RGB2BGR ne sait pas convertir.
+ return Image.open(io.BytesIO(data)).convert('RGB')
+ except Exception as e:
+ logger.debug("[TemplateMatcher] Erreur décodage ancre: %s", e)
+ return None
+
+ @staticmethod
+ def _capture_screen() -> Optional["Image.Image"]:
+ """Capture l'écran complet via mss (moniteur 0 = tous les écrans)."""
+ if not _MSS:
+ logger.debug("[TemplateMatcher] mss non disponible")
+ return None
+
+ try:
+ with mss_lib.mss() as sct:
+ mon = sct.monitors[0]
+ grab = sct.grab(mon)
+ return Image.frombytes('RGB', grab.size, grab.bgra, 'raw', 'BGRX')
+ except Exception as e:
+ logger.debug("[TemplateMatcher] Erreur capture écran: %s", e)
+ return None
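+
+# Multi-scale usage sketch (threshold and scales are illustrative):
+#
+#   matcher = TemplateMatcher(threshold=0.70, multiscale=True,
+#                             scales=[1.0, 0.9, 1.1])
+#   result = matcher.match_screen(anchor_b64=anchor_b64)
+#   if result:
+#       print(f"({result.x}, {result.y}) scale={result.scale} "
+#             f"score={result.score:.3f}")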
diff --git a/core/grounding/ui_tars_grounder.py b/core/grounding/ui_tars_grounder.py
new file mode 100644
index 000000000..dbd028b30
--- /dev/null
+++ b/core/grounding/ui_tars_grounder.py
@@ -0,0 +1,204 @@
+"""
+core/grounding/ui_tars_grounder.py — Client HTTP pour le serveur de grounding
+
+Remplace le chargement in-process du modele UI-TARS (qui crashe dans Flask
+a cause de conflits CUDA) par un CLIENT HTTP qui appelle le serveur de
+grounding separe sur le port 8200.
+
+Le serveur est lance separement via :
+ .venv/bin/python3 -m core.grounding.server
+
+Utilisation (inchangee) :
+ from core.grounding.ui_tars_grounder import UITarsGrounder
+
+ grounder = UITarsGrounder.get_instance()
+ result = grounder.ground("Bouton Valider", "le bouton vert en bas a droite")
+ if result:
+ print(f"Trouve a ({result.x}, {result.y})")
+"""
+
+from __future__ import annotations
+
+import base64
+import io
+import os
+import threading
+import time
+from typing import Optional
+
+from core.grounding.target import GroundingResult
+
+# ---------------------------------------------------------------------------
+# Singleton
+# ---------------------------------------------------------------------------
+
+_instance: Optional[UITarsGrounder] = None
+_instance_lock = threading.Lock()
+
+
+class UITarsGrounder:
+ """Client HTTP pour le serveur de grounding UI-TARS (port 8200).
+
+ Singleton : utiliser get_instance() pour obtenir l'instance unique.
+ Le serveur doit etre lance separement (.venv/bin/python3 -m core.grounding.server).
+ """
+
+ SERVER_URL = os.environ.get("GROUNDING_SERVER_URL", "http://localhost:8200")
+
+ def __init__(self):
+ self._server_available: Optional[bool] = None
+ self._last_check = 0.0
+
+ @classmethod
+ def get_instance(cls) -> UITarsGrounder:
+ """Retourne l'instance singleton du grounder."""
+ global _instance
+ if _instance is None:
+ with _instance_lock:
+ if _instance is None:
+ _instance = cls()
+ return _instance
+
+ # ------------------------------------------------------------------
+ # Verification du serveur
+ # ------------------------------------------------------------------
+
+ def _check_server(self, force: bool = False) -> bool:
+ """Verifie si le serveur de grounding est disponible.
+
+ Cache le resultat pendant 30 secondes pour eviter le spam.
+ """
+ now = time.time()
+ if not force and self._server_available is not None and (now - self._last_check) < 30:
+ return self._server_available
+
+ try:
+ import requests
+ resp = requests.get(f"{self.SERVER_URL}/health", timeout=3)
+ if resp.status_code == 200:
+ data = resp.json()
+ self._server_available = data.get("model_loaded", False)
+ if not self._server_available:
+ print(f"[UI-TARS/client] Serveur en cours de chargement...")
+ else:
+ self._server_available = False
+ except Exception:
+ self._server_available = False
+
+ self._last_check = now
+
+ if not self._server_available:
+ print(f"[UI-TARS/client] Serveur non disponible sur {self.SERVER_URL} "
+ f"— lancer: .venv/bin/python3 -m core.grounding.server")
+
+ return self._server_available
+
+ @property
+ def is_loaded(self) -> bool:
+ """Compatibilite : verifie si le serveur est pret."""
+ return self._check_server()
+
+ def load(self) -> None:
+ """Compatibilite : ne fait rien (le serveur charge le modele au demarrage)."""
+ if not self._check_server(force=True):
+ print(f"[UI-TARS/client] ATTENTION: serveur non disponible sur {self.SERVER_URL}")
+ print(f"[UI-TARS/client] Lancer le serveur: .venv/bin/python3 -m core.grounding.server")
+
+ def unload(self) -> None:
+ """Compatibilite : ne fait rien (le modele vit dans le process serveur)."""
+ pass
+
+ # ------------------------------------------------------------------
+ # Grounding via HTTP
+ # ------------------------------------------------------------------
+
+ def ground(
+ self,
+ target_text: str = "",
+ target_description: str = "",
+ screen_pil: Optional["PIL.Image.Image"] = None,
+ ) -> Optional[GroundingResult]:
+ """Localise un element UI en appelant le serveur de grounding.
+
+ Args:
+ target_text: texte visible de l'element (ex: "Valider", "Rechercher")
+ target_description: description semantique (ex: "le bouton vert en bas")
+ screen_pil: screenshot PIL, le serveur capture si None
+
+ Returns:
+ GroundingResult avec coordonnees en pixels ecran, ou None si echec
+ """
+ if not target_text and not target_description:
+ print("[UI-TARS/client] Pas de target_text ni target_description")
+ return None
+
+ # Verifier que le serveur est disponible
+ if not self._check_server():
+ return None
+
+ import requests
+
+ # Encoder l'image en base64 si fournie
+ image_b64 = ""
+ if screen_pil is not None:
+ try:
+ buffer = io.BytesIO()
+ screen_pil.save(buffer, format='PNG')
+ image_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8')
+ except Exception as e:
+ print(f"[UI-TARS/client] Erreur encodage image: {e}")
+ # Continuer sans image — le serveur capturera l'ecran
+
+ payload = {
+ "target_text": target_text,
+ "target_description": target_description,
+ "image_b64": image_b64,
+ }
+
+ try:
+ t0 = time.time()
+ resp = requests.post(
+ f"{self.SERVER_URL}/ground",
+ json=payload,
+ timeout=30, # UI-TARS peut prendre 3-5s + overhead reseau
+ )
+ total_ms = (time.time() - t0) * 1000
+
+ if resp.status_code == 200:
+ data = resp.json()
+ # Le serveur peut repondre 200 avec x=None quand le modele
+ # declare ne pas voir l'element (voir la detection de reponses
+ # negatives dans server.py) : ne pas fabriquer de resultat.
+ if data.get("x") is None or data.get("y") is None:
+ print(f"[UI-TARS/client] Element non trouve: "
+ f"'{target_text or target_description}'")
+ return None
+ result = GroundingResult(
+ x=data["x"],
+ y=data["y"],
+ method=data.get("method", "ui_tars"),
+ confidence=data.get("confidence", 0.85),
+ time_ms=data.get("time_ms", total_ms),
+ )
+ print(f"[UI-TARS/client] '{target_text or target_description}' -> "
+ f"({result.x}, {result.y}) conf={result.confidence:.2f} "
+ f"({result.time_ms:.0f}ms)")
+ return result
+
+ elif resp.status_code == 422:
+ # Coordonnees non parsees
+ detail = resp.json().get("detail", "")
+ print(f"[UI-TARS/client] Pas de coordonnees parsees: {detail[:150]}")
+ return None
+
+ elif resp.status_code == 503:
+ print(f"[UI-TARS/client] Serveur pas encore pret (modele en chargement)")
+ return None
+
+ else:
+ print(f"[UI-TARS/client] Erreur HTTP {resp.status_code}: {resp.text[:200]}")
+ return None
+
+ except requests.exceptions.ConnectionError:
+ self._server_available = False
+ print(f"[UI-TARS/client] Serveur non joignable sur {self.SERVER_URL}")
+ return None
+ except requests.exceptions.Timeout:
+ print(f"[UI-TARS/client] Timeout (>30s) pour '{target_text}'")
+ return None
+ except Exception as e:
+ print(f"[UI-TARS/client] Erreur inattendue: {e}")
+ return None
diff --git a/tests/unit/test_template_matcher.py b/tests/unit/test_template_matcher.py
new file mode 100644
index 000000000..a9eae199a
--- /dev/null
+++ b/tests/unit/test_template_matcher.py
@@ -0,0 +1,311 @@
+"""Tests pour core/grounding/template_matcher.py"""
+
+import base64
+import io
+from unittest.mock import patch
+
+import numpy as np
+from PIL import Image
+
+from core.grounding.template_matcher import MatchResult, TemplateMatcher
+
+
+# ---------------------------------------------------------------------------
+# Helpers
+# ---------------------------------------------------------------------------
+
+def _make_image(w: int, h: int, color: tuple = (128, 128, 128)) -> Image.Image:
+ """Crée une image PIL unie."""
+ img = Image.new('RGB', (w, h), color)
+ return img
+
+
+def _pil_to_b64(img: Image.Image) -> str:
+ """Encode une image PIL en base64 PNG."""
+ buf = io.BytesIO()
+ img.save(buf, format='PNG')
+ return base64.b64encode(buf.getvalue()).decode()
+
+
+def _make_screen_with_target(
+ screen_w: int = 800,
+ screen_h: int = 600,
+ target_x: int = 300,
+ target_y: int = 200,
+ target_w: int = 60,
+ target_h: int = 40,
+):
+ """Crée un screen bruité avec un motif unique et l'ancre correspondante.
+
+ Le screen a un fond aléatoire (bruit) pour que le template matching
+ ne puisse matcher qu'à l'endroit exact du motif injecté.
+ """
+ rng = np.random.RandomState(42)
+ # Fond bruité — chaque pixel est différent, pas de faux match possible
+ screen = rng.randint(0, 256, (screen_h, screen_w, 3), dtype=np.uint8)
+
+ # Injecter un motif déterministe unique (damier rouge/bleu)
+ target = np.zeros((target_h, target_w, 3), dtype=np.uint8)
+ for r in range(target_h):
+ for c in range(target_w):
+ if (r + c) % 2 == 0:
+ target[r, c] = [255, 0, 0] # rouge
+ else:
+ target[r, c] = [0, 0, 255] # bleu
+ screen[target_y:target_y + target_h, target_x:target_x + target_w] = target
+ screen_pil = Image.fromarray(screen)
+
+ # L'ancre est exactement le même motif
+ anchor_pil = Image.fromarray(target)
+
+ expected_cx = target_x + target_w // 2
+ expected_cy = target_y + target_h // 2
+
+ return screen_pil, anchor_pil, expected_cx, expected_cy
+
+
+# ---------------------------------------------------------------------------
+# Tests MatchResult
+# ---------------------------------------------------------------------------
+
+class TestMatchResult:
+ def test_fields(self):
+ r = MatchResult(x=100, y=200, score=0.85, method='template', time_ms=5.0)
+ assert r.x == 100
+ assert r.y == 200
+ assert r.score == 0.85
+ assert r.method == 'template'
+ assert r.time_ms == 5.0
+ assert r.scale == 1.0 # default
+
+ def test_with_scale(self):
+ r = MatchResult(x=10, y=20, score=0.9, method='template_multiscale', time_ms=12.0, scale=0.95)
+ assert r.scale == 0.95
+
+
+# ---------------------------------------------------------------------------
+# Tests TemplateMatcher — init
+# ---------------------------------------------------------------------------
+
+class TestTemplateMatcherInit:
+ def test_defaults(self):
+ m = TemplateMatcher()
+ assert m.threshold == 0.75
+ assert m.multiscale is False
+ assert m.grayscale is False
+
+ def test_custom_params(self):
+ m = TemplateMatcher(threshold=0.5, multiscale=True, grayscale=True, scales=[1.0, 0.8])
+ assert m.threshold == 0.5
+ assert m.multiscale is True
+ assert m.grayscale is True
+ assert m.scales == [1.0, 0.8]
+
+
+# ---------------------------------------------------------------------------
+# Tests TemplateMatcher — _decode_anchor
+# ---------------------------------------------------------------------------
+
+class TestDecodeAnchor:
+ def test_pil_passthrough(self):
+ img = _make_image(50, 50)
+ result = TemplateMatcher._decode_anchor(None, img)
+ assert result is img
+
+ def test_b64_decode(self):
+ img = _make_image(50, 50, (255, 0, 0))
+ b64 = _pil_to_b64(img)
+ result = TemplateMatcher._decode_anchor(b64, None)
+ assert result is not None
+ assert result.size == (50, 50)
+
+ def test_b64_with_data_prefix(self):
+ img = _make_image(30, 30)
+ b64 = "data:image/png;base64," + _pil_to_b64(img)
+ result = TemplateMatcher._decode_anchor(b64, None)
+ assert result is not None
+
+ def test_none_inputs(self):
+ result = TemplateMatcher._decode_anchor(None, None)
+ assert result is None
+
+ def test_invalid_b64(self):
+ result = TemplateMatcher._decode_anchor("not-valid-base64!!!", None)
+ assert result is None
+
+
+# ---------------------------------------------------------------------------
+# Tests TemplateMatcher — match_screen avec screen_pil fourni
+# ---------------------------------------------------------------------------
+
+class TestMatchScreenWithPIL:
+ def test_exact_match(self):
+ screen, anchor, cx, cy = _make_screen_with_target()
+ m = TemplateMatcher(threshold=0.75)
+ result = m.match_screen(anchor_pil=anchor, screen_pil=screen)
+ assert result is not None
+ assert abs(result.x - cx) <= 1
+ assert abs(result.y - cy) <= 1
+ assert result.score > 0.9
+ assert result.method == 'template'
+ assert result.time_ms >= 0
+
+ def test_no_match(self):
+ # Screen bruité, ancre = damier unique absent du screen
+ rng = np.random.RandomState(123)
+ screen_np = rng.randint(0, 256, (600, 800, 3), dtype=np.uint8)
+ screen = Image.fromarray(screen_np)
+
+ # Ancre = damier régulier non présent dans le bruit
+ anchor_np = np.zeros((40, 60, 3), dtype=np.uint8)
+ for r in range(40):
+ for c in range(60):
+ anchor_np[r, c] = [255, 255, 0] if (r + c) % 2 == 0 else [0, 255, 255]
+ anchor = Image.fromarray(anchor_np)
+
+ m = TemplateMatcher(threshold=0.75)
+ result = m.match_screen(anchor_pil=anchor, screen_pil=screen)
+ assert result is None
+
+ def test_b64_anchor(self):
+ screen, anchor, cx, cy = _make_screen_with_target()
+ b64 = _pil_to_b64(anchor)
+ m = TemplateMatcher(threshold=0.75)
+ result = m.match_screen(anchor_b64=b64, screen_pil=screen)
+ assert result is not None
+ assert abs(result.x - cx) <= 1
+
+ def test_anchor_bigger_than_screen(self):
+ screen = _make_image(100, 100)
+ anchor = _make_image(200, 200)
+ m = TemplateMatcher()
+ result = m.match_screen(anchor_pil=anchor, screen_pil=screen)
+ assert result is None
+
+ def test_threshold_configurable(self):
+ screen, anchor, cx, cy = _make_screen_with_target()
+ # Avec un seuil de 0.999, le match exact devrait quand même passer (score=1.0)
+ m = TemplateMatcher(threshold=0.999)
+ result = m.match_screen(anchor_pil=anchor, screen_pil=screen)
+ # Le score d'un match pixel-perfect peut être 1.0 ou très proche
+ # On accepte les deux cas
+ if result:
+ assert result.score >= 0.999
+
+
+# ---------------------------------------------------------------------------
+# Tests TemplateMatcher — multi-scale
+# ---------------------------------------------------------------------------
+
+class TestMultiscale:
+ def test_multiscale_exact(self):
+ screen, anchor, cx, cy = _make_screen_with_target()
+ m = TemplateMatcher(threshold=0.75, multiscale=True)
+ result = m.match_screen(anchor_pil=anchor, screen_pil=screen)
+ assert result is not None
+ assert abs(result.x - cx) <= 2
+ assert abs(result.y - cy) <= 2
+ assert result.score > 0.9
+
+ def test_multiscale_scaled_anchor(self):
+ """L'ancre a été capturée à une échelle légèrement différente.
+
+ On utilise un motif plus gros (bloc de couleur unie) pour que le resize
+ ne détruise pas le pattern comme avec un damier fin.
+ """
+ # Screen bruité + gros bloc rouge
+ rng = np.random.RandomState(42)
+ screen_np = rng.randint(50, 200, (600, 800, 3), dtype=np.uint8)
+ target = np.full((80, 120, 3), dtype=np.uint8, fill_value=0)
+ target[:, :] = [220, 30, 30] # rouge vif unique
+ # Ajouter un bord vert pour le rendre encore plus unique
+ target[:5, :] = [30, 220, 30]
+ target[-5:, :] = [30, 220, 30]
+ screen_np[200:280, 300:420] = target
+ screen = Image.fromarray(screen_np)
+
+ # L'ancre d'origine
+ anchor_original = Image.fromarray(target)
+ # L'ancre à 105% (scale modeste pour que ça reste réaliste)
+ w, h = anchor_original.size
+ scaled_anchor = anchor_original.resize((int(w * 1.05), int(h * 1.05)), Image.BILINEAR)
+
+ m_multi = TemplateMatcher(threshold=0.60, multiscale=True)
+ result_multi = m_multi.match_screen(anchor_pil=scaled_anchor, screen_pil=screen)
+ assert result_multi is not None
+ assert result_multi.method == 'template_multiscale'
+
+ def test_multiscale_anchor_too_small(self):
+ """Ancre très petite — certaines échelles sont sautées."""
+ screen = _make_image(800, 600)
+ anchor = _make_image(5, 5, (255, 0, 0))
+ m = TemplateMatcher(threshold=0.99, multiscale=True, scales=[0.5, 0.3])
+ result = m.match_screen(anchor_pil=anchor, screen_pil=screen)
+ # Pas de crash même avec des échelles qui produisent < 8px
+ # Le résultat peut être None ou un match selon le contenu
+
+
+# ---------------------------------------------------------------------------
+# Tests TemplateMatcher — match_in_region
+# ---------------------------------------------------------------------------
+
+class TestMatchInRegion:
+ def test_region_match(self):
+ # Créer une region BGR bruitée avec un motif damier injecté
+ rng = np.random.RandomState(77)
+ region = rng.randint(0, 256, (200, 300, 3), dtype=np.uint8)
+ # Motif damier en BGR
+ anchor = np.zeros((40, 60, 3), dtype=np.uint8)
+ for r in range(40):
+ for c in range(60):
+ if (r + c) % 2 == 0:
+ anchor[r, c] = [255, 0, 0]
+ else:
+ anchor[r, c] = [0, 0, 255]
+ region[50:90, 100:160] = anchor
+
+ m = TemplateMatcher(threshold=0.75)
+ result = m.match_in_region(region, anchor)
+ assert result is not None
+ assert abs(result.x - 130) <= 1 # 100 + 60//2
+ assert abs(result.y - 70) <= 1 # 50 + 40//2
+
+ def test_region_no_match(self):
+ # Region bruitée, ancre damier absente
+ rng = np.random.RandomState(88)
+ region = rng.randint(0, 256, (200, 300, 3), dtype=np.uint8)
+ anchor = np.zeros((40, 60, 3), dtype=np.uint8)
+ for r in range(40):
+ for c in range(60):
+ anchor[r, c] = [255, 255, 0] if (r + c) % 2 == 0 else [0, 255, 255]
+
+ m = TemplateMatcher(threshold=0.75)
+ result = m.match_in_region(region, anchor)
+ assert result is None
+
+
+# ---------------------------------------------------------------------------
+# Tests grayscale mode
+# ---------------------------------------------------------------------------
+
+class TestGrayscale:
+ def test_grayscale_match(self):
+ screen, anchor, cx, cy = _make_screen_with_target()
+ m = TemplateMatcher(threshold=0.75, grayscale=True)
+ result = m.match_screen(anchor_pil=anchor, screen_pil=screen)
+ assert result is not None
+ assert abs(result.x - cx) <= 1
+
+
+# ---------------------------------------------------------------------------
+# Tests _capture_screen (mocké)
+# ---------------------------------------------------------------------------
+
+class TestCaptureScreen:
+ @patch('core.grounding.template_matcher._MSS', False)
+ def test_no_mss(self):
+ result = TemplateMatcher._capture_screen()
+ assert result is None
diff --git a/tools/benchmark_grounding.py b/tools/benchmark_grounding.py
new file mode 100644
index 000000000..f041034a5
--- /dev/null
+++ b/tools/benchmark_grounding.py
@@ -0,0 +1,218 @@
+#!/usr/bin/env python3
+"""
+Benchmark complet des méthodes de grounding visuel.
+À lancer avec la VM Windows visible à l'écran, bureau avec dossier Demo.
+
+Usage:
+ cd ~/ai/rpa_vision_v3
+ .venv/bin/python3 tools/benchmark_grounding.py
+"""
+import base64, glob, io, os, re, time
+import cv2, mss, numpy as np, requests
+from PIL import Image
+
+OLLAMA_URL = os.environ.get("OLLAMA_URL", "http://localhost:11434")
+ANCHOR_DIR = 'visual_workflow_builder/backend/data/anchors'
+
+
+def capture_screen():
+ with mss.mss() as sct:
+ grab = sct.grab(sct.monitors[0])
+ screen = Image.frombytes('RGB', grab.size, grab.rgb)
+ return screen
+
+
+def screen_to_b64(screen):
+ buf = io.BytesIO()
+ screen.save(buf, format='JPEG', quality=70)
+ return base64.b64encode(buf.getvalue()).decode()
+
+
+def parse_coords(text, screen_w, screen_h):
+    for pat in [
+        # UI-TARS style: click(start_box='<|box_start|>(x,y)')
+        r"start_box='?(?:<\|box_start\|>)?\((\d+),\s*(\d+)\)",
+        # bare tuple: (x, y)
+        r'\((\d+(?:\.\d+)?)\s*,\s*(\d+(?:\.\d+)?)\)',
+        # bracketed pair: [x, y]
+        r'\[(\d+(?:\.\d+)?)\s*,\s*(\d+(?:\.\d+)?)\]',
+    ]:
+ m = re.search(pat, text)
+ if m:
+ rx, ry = float(m.group(1)), float(m.group(2))
+ if rx <= 1.0 and ry <= 1.0:
+ return int(rx * screen_w), int(ry * screen_h)
+ elif rx <= 1000 and ry <= 1000:
+ return int(rx * screen_w / 1000), int(ry * screen_h / 1000)
+ return int(rx), int(ry)
+ return None
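+
+# Worked example (illustrative numbers, not from a real run) of the three
+# coordinate conventions handled above; values <= 1000 are assumed normalized:
+#   parse_coords("(0.5, 0.5)", 1920, 1080)   -> (960, 540)   normalized 0-1
+#   parse_coords("(500, 500)", 1920, 1080)   -> (960, 540)   normalized 0-1000
+#   parse_coords("(1500, 960)", 1920, 1080)  -> (1500, 960)  raw pixels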
+
+
+def test_vlm(model, prompt, b64, screen_w, screen_h):
+ t0 = time.time()
+ try:
+ resp = requests.post(f'{OLLAMA_URL}/api/generate', json={
+ 'model': model, 'prompt': prompt, 'images': [b64],
+ 'stream': False, 'options': {'temperature': 0.0, 'num_predict': 50}
+ }, timeout=60)
+ elapsed = time.time() - t0
+ if resp.status_code != 200:
+ return elapsed, None, f"HTTP {resp.status_code}"
+ text = resp.json().get('response', '').strip()
+ coords = parse_coords(text, screen_w, screen_h)
+ return elapsed, coords, text[:120]
+ except Exception as e:
+ return time.time() - t0, None, str(e)[:80]
+
+
+def test_template(screen_gray, anchor_path):
+ anchor = cv2.imread(anchor_path, cv2.IMREAD_GRAYSCALE)
+ if anchor is None:
+ return None
+ ah, aw = anchor.shape[:2]
+ if ah >= screen_gray.shape[0] or aw >= screen_gray.shape[1]:
+ return None
+ t0 = time.time()
+ result = cv2.matchTemplate(screen_gray, anchor, cv2.TM_CCOEFF_NORMED)
+ _, max_val, _, max_loc = cv2.minMaxLoc(result)
+ elapsed = (time.time() - t0) * 1000
+ return {
+ 'method': 'template', 'time_ms': elapsed,
+ 'score': max_val, 'pos': (max_loc[0] + aw//2, max_loc[1] + ah//2)
+ }
+
+
+def test_template_multiscale(screen_gray, anchor_path, scales=(0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3)):
+ anchor = cv2.imread(anchor_path, cv2.IMREAD_GRAYSCALE)
+ if anchor is None:
+ return None
+ ah, aw = anchor.shape[:2]
+ t0 = time.time()
+ best_val, best_loc, best_scale = 0, None, 1.0
+ for s in scales:
+ resized = cv2.resize(anchor, None, fx=s, fy=s)
+ rh, rw = resized.shape[:2]
+ if rh >= screen_gray.shape[0] or rw >= screen_gray.shape[1]:
+ continue
+ res = cv2.matchTemplate(screen_gray, resized, cv2.TM_CCOEFF_NORMED)
+ _, mv, _, ml = cv2.minMaxLoc(res)
+ if mv > best_val:
+ best_val, best_loc, best_scale = mv, ml, s
+ elapsed = (time.time() - t0) * 1000
+ if best_loc is None:
+ return None
+ rh, rw = int(ah * best_scale), int(aw * best_scale)
+ return {
+ 'method': 'template_multiscale', 'time_ms': elapsed,
+ 'score': best_val, 'pos': (best_loc[0] + rw//2, best_loc[1] + rh//2),
+ 'scale': best_scale
+ }
+
+
+def test_orb(screen_gray, anchor_path, max_distance=50):
+ anchor = cv2.imread(anchor_path, cv2.IMREAD_GRAYSCALE)
+ if anchor is None:
+ return None
+ t0 = time.time()
+ orb = cv2.ORB_create(nfeatures=1000)
+ kp1, des1 = orb.detectAndCompute(anchor, None)
+ kp2, des2 = orb.detectAndCompute(screen_gray, None)
+ if des1 is None or des2 is None or len(des1) < 2 or len(des2) < 2:
+ return {'method': 'ORB', 'time_ms': (time.time()-t0)*1000, 'matches': 0, 'pos': None}
+ bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
+ matches = bf.match(des1, des2)
+ good = sorted([m for m in matches if m.distance < max_distance], key=lambda m: m.distance)
+ elapsed = (time.time() - t0) * 1000
+ pos = None
+ if len(good) >= 4:
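+        # Median of the matched keypoint positions: cheap and tolerant to a
+        # few outliers (tiny UI anchors rarely yield enough keypoints for a
+        # reliable homography fit).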
+ pts = np.float32([kp2[m.trainIdx].pt for m in good])
+ pos = (int(np.median(pts[:, 0])), int(np.median(pts[:, 1])))
+ return {'method': 'ORB', 'time_ms': elapsed, 'matches': len(good), 'pos': pos}
+
+
+def test_akaze(screen_gray, anchor_path, max_distance=80):
+ anchor = cv2.imread(anchor_path, cv2.IMREAD_GRAYSCALE)
+ if anchor is None:
+ return None
+ t0 = time.time()
+ akaze = cv2.AKAZE_create()
+ kp1, des1 = akaze.detectAndCompute(anchor, None)
+ kp2, des2 = akaze.detectAndCompute(screen_gray, None)
+ if des1 is None or des2 is None or len(des1) < 2 or len(des2) < 2:
+ return {'method': 'AKAZE', 'time_ms': (time.time()-t0)*1000, 'matches': 0, 'pos': None}
+ bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
+ matches = bf.match(des1, des2)
+ good = sorted([m for m in matches if m.distance < max_distance], key=lambda m: m.distance)
+ elapsed = (time.time() - t0) * 1000
+ pos = None
+ if len(good) >= 4:
+ pts = np.float32([kp2[m.trainIdx].pt for m in good])
+ pos = (int(np.median(pts[:, 0])), int(np.median(pts[:, 1])))
+ return {'method': 'AKAZE', 'time_ms': elapsed, 'matches': len(good), 'pos': pos}
+
+
+def main():
+ print("="*70)
+ print("BENCHMARK GROUNDING — Léa RPA Vision")
+ print("="*70)
+
+ screen = capture_screen()
+ screen_w, screen_h = screen.size
+ b64 = screen_to_b64(screen)
+ screen_cv = cv2.cvtColor(np.array(screen), cv2.COLOR_RGB2BGR)
+ screen_gray = cv2.cvtColor(screen_cv, cv2.COLOR_BGR2GRAY)
+ print(f"Écran: {screen_w}x{screen_h}\n")
+
+ # ── VLM grounding ──
+ print("─── VLM GROUNDING (cible: 'Demo folder') ───")
+ vlm_tests = [
+ ("qwen3-vl:8b", 'Click on "Demo folder". Return the action in format: click(start_box="(x,y)") with coordinates normalized 0-1000.'),
+ ("qwen2.5vl:7b", 'Click on "Demo folder". Return the action in format: click(start_box="(x,y)") with coordinates normalized 0-1000.'),
+ ("moondream:latest", 'Where is the "Demo" folder icon? Give coordinates as (x, y) in pixels.'),
+ ("gemma4:latest", 'Click on "Demo folder". Return the action in format: click(start_box="(x,y)") with coordinates normalized 0-1000.'),
+ ]
+ for model, prompt in vlm_tests:
+ elapsed, coords, text = test_vlm(model, prompt, b64, screen_w, screen_h)
+ coord_str = f"({coords[0]:4d}, {coords[1]:4d})" if coords else " — "
+ print(f" {model:35s} {elapsed:5.1f}s {coord_str} {text[:60]}")
+
+ # ── OpenCV ──
+ print(f"\n─── OPENCV (ancres de {ANCHOR_DIR}) ───")
+    # Only the *_thumb.png anchors are benchmarked here.
+    thumbs = sorted(glob.glob(f'{ANCHOR_DIR}/*_thumb.png'))[:5]
+
+ for thumb_path in thumbs:
+ name = os.path.basename(thumb_path).replace('_thumb.png', '')[:30]
+        anchor_gray = cv2.imread(thumb_path, cv2.IMREAD_GRAYSCALE)
+        ah, aw = anchor_gray.shape[:2] if anchor_gray is not None else (0, 0)
+        print(f"\n  Anchor: {name} ({aw}x{ah})")
+
+ r = test_template(screen_gray, thumb_path)
+ if r:
+ print(f" Template: {r['time_ms']:6.1f}ms score={r['score']:.3f} pos={r['pos']}")
+
+ r = test_template_multiscale(screen_gray, thumb_path)
+ if r:
+ print(f" Template multi-s: {r['time_ms']:6.1f}ms score={r['score']:.3f} pos={r['pos']} scale={r['scale']}")
+
+ r = test_orb(screen_gray, thumb_path)
+ if r:
+ print(f" ORB: {r['time_ms']:6.1f}ms matches={r['matches']:3d} pos={r['pos']}")
+
+ r = test_akaze(screen_gray, thumb_path)
+ if r:
+ print(f" AKAZE: {r['time_ms']:6.1f}ms matches={r['matches']:3d} pos={r['pos']}")
+
+    # ── Summary ──
+    print(f"\n{'='*70}")
+    print("SUMMARY")
+    print("="*70)
+ print("""
+Pipeline recommandé (du plus rapide au plus lent) :
+ 1. Template matching classique ~20-50ms (score > 0.75 = direct)
+ 2. Template multi-scale ~80-150ms (robuste aux changements de taille)
+ 3. OCR (docTR) ~500-1000ms (texte uniquement)
+ 4. Static fallback ~0ms (coordonnées d'origine)
+
+Note : les feature matchers (ORB/AKAZE) ne sont pas adaptés aux petites
+ancres UI (< 200x200px) — trop peu de keypoints distinctifs.
+""")
+
+
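+# Minimal sketch of the cascade recommended in the summary above, reusing this
+# file's helpers. Illustrative only: the OCR stage is not exercised by this
+# script, and the production pipeline lives in core/grounding, not here.
+def grounding_cascade(screen_gray, anchor_path, fallback_xy, threshold=0.75):
+    """Return a click position, trying the fast matchers first."""
+    r = test_template(screen_gray, anchor_path)              # ~20-50ms
+    if r and r['score'] >= threshold:
+        return r['pos']
+    r = test_template_multiscale(screen_gray, anchor_path)   # ~80-150ms
+    if r and r['score'] >= threshold:
+        return r['pos']
+    # An OCR stage (docTR, ~500-1000ms) would slot in here for text targets.
+    return fallback_xy                                       # static fallback
+
+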
+if __name__ == '__main__':
+ main()
diff --git a/tools/start_grounding_server.sh b/tools/start_grounding_server.sh
new file mode 100755
index 000000000..8db94d742
--- /dev/null
+++ b/tools/start_grounding_server.sh
@@ -0,0 +1,39 @@
+#!/bin/bash
+# Launch the UI-TARS grounding server (port 8200)
+#
+# The server loads UI-TARS-1.5-7B in 4-bit NF4 inside its own Python process,
+# with a clean CUDA context. The VWB Flask backend and the ORA loop call this
+# server over HTTP.
+#
+# Usage:
+#   ./tools/start_grounding_server.sh        # foreground
+#   ./tools/start_grounding_server.sh --bg   # background (logs in /tmp)
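+#
+# To stop a --bg server (the PID file is written by this script):
+#   kill "$(cat /tmp/grounding_server.pid)"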
+
+set -e
+
+cd /home/dom/ai/rpa_vision_v3
+
+VENV=".venv/bin/python3"
+LOG="/tmp/grounding_server.log"
+
+if [ ! -f "$VENV" ]; then
+ echo "ERREUR: venv non trouve a $VENV"
+ exit 1
+fi
+
+echo "=== Serveur de Grounding UI-TARS ==="
+echo "Port: 8200"
+echo "Modele: ByteDance-Seed/UI-TARS-1.5-7B (4-bit NF4)"
+echo ""
+
+if [ "$1" = "--bg" ]; then
+ echo "Lancement en arriere-plan (logs dans $LOG)"
+ nohup $VENV -m core.grounding.server > "$LOG" 2>&1 &
+ PID=$!
+ echo "PID: $PID"
+ echo "$PID" > /tmp/grounding_server.pid
+ echo "Verifier: curl http://localhost:8200/health"
+ echo "Logs: tail -f $LOG"
+else
+    "$VENV" -m core.grounding.server
+fi
diff --git a/visual_workflow_builder/backend/api_v3/execute.py b/visual_workflow_builder/backend/api_v3/execute.py
index 93d922a07..57167a877 100644
--- a/visual_workflow_builder/backend/api_v3/execute.py
+++ b/visual_workflow_builder/backend/api_v3/execute.py
@@ -1431,7 +1431,7 @@ def run_workflow_verified(execution_id: str, workflow_id: str, app):
from core.execution.observe_reason_act import ORALoop
ora = ORALoop(
- max_retries=2, max_steps=50, verify_level='auto',
+ max_retries=2, max_steps=50, verify_level='none',
should_continue=lambda: not _execution_state.get('should_stop', False)
)
ora._variables = _execution_state.get('variables', {})