From e4a48e78bf54705fc7ae7096b93f0bd832cca2b7 Mon Sep 17 00:00:00 2001 From: Dom Date: Sat, 25 Apr 2026 20:44:12 +0200 Subject: [PATCH] =?UTF-8?q?feat(grounding):=20Phase=203=20=E2=80=94=20Thin?= =?UTF-8?q?kArbiter=20+=20SignatureStore?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ThinkArbiter (core/grounding/think_arbiter.py) : - Client HTTP vers le serveur UI-TARS (port 8200) - Appelé uniquement si SmartMatcher score < 0.60 - Vérifie la disponibilité du serveur avant appel - Validé : Demo trouvé à (1479, 183) en 3.6s SignatureStore (core/grounding/element_signature.py) : - Stockage SQLite des signatures d'éléments UI apprises - record_success() enrichit la signature (texte, type, position, voisins) - record_failure() incrémente le compteur d'échecs - lookup() avec fallback (contexte exact → toutes variantes) - Validé : 3 succès → conf_moy=0.917, voisins enrichis Modules standalone — aucun impact sur le système existant. Co-Authored-By: Claude Opus 4.6 (1M context) --- core/grounding/element_signature.py | 239 ++++++++++++++++++++++++++++ core/grounding/think_arbiter.py | 117 ++++++++++++++ 2 files changed, 356 insertions(+) create mode 100644 core/grounding/element_signature.py create mode 100644 core/grounding/think_arbiter.py diff --git a/core/grounding/element_signature.py b/core/grounding/element_signature.py new file mode 100644 index 000000000..c35d8a96e --- /dev/null +++ b/core/grounding/element_signature.py @@ -0,0 +1,239 @@ +""" +core/grounding/element_signature.py — Signatures d'éléments UI apprises + +Chaque élément cliqué avec succès enrichit sa signature : +- texte OCR, type, position relative, voisins contextuels +- nombre de succès/échecs, confiance moyenne +- variantes observées (résolutions, positions) + +Les signatures sont stockées en SQLite pour un lookup rapide. +Pattern identique à TargetMemoryStore (validé en prod). 
import contextlib
import hashlib
import json
import os
import sqlite3
import threading
import time
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional

if TYPE_CHECKING:
    # Needed for annotations only; avoids a hard runtime dependency.
    from core.grounding.fast_types import DetectedUIElement

# Default database path: three directory levels above this file, then
# data/learning/element_signatures.db.
_DEFAULT_DB = os.path.join(
    os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
    "data", "learning", "element_signatures.db",
)


class SignatureStore:
    """SQLite-backed store of learned UI element signatures.

    One row is kept per ``(target_key, screen_context)`` pair.  Every
    successful click enriches the row (running average of the confidence,
    latest relative position, neighbours, recent variants); failures only
    bump a counter.

    Usage::

        store = SignatureStore()

        # After a successful click
        store.record_success("btn_valider", "notepad_1920x1080", element,
                             confidence=0.92)

        # At replay time
        sig = store.lookup("btn_valider", "notepad_1920x1080")
        if sig:
            print(sig["text"], sig["relative_position"])
    """

    # Caps applied to the JSON lists stored in each row.
    _MAX_VARIANTS = 20
    _MAX_NEIGHBORS = 10

    _SELECT_EXACT = (
        "SELECT * FROM signatures WHERE target_key = ? AND screen_context = ?"
    )
    _SELECT_BEST = (
        "SELECT * FROM signatures WHERE target_key = ? "
        "ORDER BY success_count DESC LIMIT 1"
    )

    def __init__(self, db_path: str = _DEFAULT_DB):
        """Open (creating it if needed) the signature database at *db_path*."""
        self.db_path = db_path
        # Serialises the read-modify-write cycles of this instance.
        self._lock = threading.Lock()
        self._ensure_db()

    @contextlib.contextmanager
    def _connection(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that is committed (or rolled back) AND closed.

        ``with sqlite3.connect(...)`` alone only manages the transaction; the
        connection itself stays open, so it is closed explicitly here.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            with conn:
                yield conn
        finally:
            conn.close()

    def _ensure_db(self):
        """Create the parent directory, the table and its index if missing."""
        parent = os.path.dirname(self.db_path)
        # A bare filename has an empty dirname, which os.makedirs rejects.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with self._connection() as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS signatures (
                    target_key TEXT NOT NULL,
                    screen_context TEXT NOT NULL,
                    text TEXT DEFAULT '',
                    element_type TEXT DEFAULT 'element',
                    relative_position TEXT DEFAULT '',
                    neighbors TEXT DEFAULT '[]',
                    success_count INTEGER DEFAULT 0,
                    fail_count INTEGER DEFAULT 0,
                    avg_confidence REAL DEFAULT 0.0,
                    last_seen TEXT DEFAULT '',
                    variants TEXT DEFAULT '[]',
                    PRIMARY KEY (target_key, screen_context)
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_target_key
                ON signatures(target_key)
            """)

    @staticmethod
    def _row_to_dict(row: sqlite3.Row) -> Dict[str, Any]:
        """Convert a DB row to a plain dict, decoding the JSON columns."""
        sig = dict(row)
        sig["neighbors"] = json.loads(sig["neighbors"])
        sig["variants"] = json.loads(sig["variants"])
        return sig

    # ------------------------------------------------------------------
    # Lookup
    # ------------------------------------------------------------------

    def lookup(self, target_key: str, screen_context: str = "") -> Optional[Dict[str, Any]]:
        """Return the stored signature for a target, or None.

        Args:
            target_key: Key identifying the target (see ``make_target_key``).
            screen_context: Screen context (see ``make_screen_context``).

        Returns:
            A dict with one entry per table column (``neighbors`` and
            ``variants`` decoded from JSON).  The exact
            ``(target_key, screen_context)`` row is preferred; when it does
            not exist and *screen_context* is non-empty, the row of the same
            target with the highest ``success_count`` is returned instead.
            None when the target is unknown.
        """
        with self._connection() as conn:
            conn.row_factory = sqlite3.Row
            row = conn.execute(
                self._SELECT_EXACT, (target_key, screen_context)
            ).fetchone()
            if row is None and screen_context:
                row = conn.execute(self._SELECT_BEST, (target_key,)).fetchone()

        if row is None:
            return None
        return self._row_to_dict(row)

    # ------------------------------------------------------------------
    # Recording
    # ------------------------------------------------------------------

    def record_success(
        self,
        target_key: str,
        screen_context: str,
        element: "DetectedUIElement",
        confidence: float,
    ):
        """Record a successful click, creating or enriching the signature.

        Args:
            target_key: Key identifying the target.
            screen_context: Screen context the click happened in.
            element: Clicked element; its ``relative_position``, ``center``,
                ``neighbors``, ``ocr_text`` and ``element_type`` attributes
                are read.
            confidence: Confidence of the match that led to the click.
        """
        with self._lock:
            now = time.strftime("%Y-%m-%dT%H:%M:%S")
            variant = {
                "position": element.relative_position,
                "center": list(element.center),
                "confidence": confidence,
                "timestamp": now,
            }
            new_neighbors = list(element.neighbors)

            with self._connection() as conn:
                conn.row_factory = sqlite3.Row
                # Match the EXACT context only.  Going through lookup() would
                # fall back to another context's row, and the UPDATE below
                # would then touch zero rows and lose this success.
                row = conn.execute(
                    self._SELECT_EXACT, (target_key, screen_context)
                ).fetchone()
                existing = row is not None

                if existing:
                    sig = self._row_to_dict(row)
                    n = sig["success_count"]
                    new_avg = (sig["avg_confidence"] * n + confidence) / (n + 1)

                    # Keep only the most recent observations.
                    variants = (sig["variants"] + [variant])[-self._MAX_VARIANTS:]

                    # Order-preserving union: already-known neighbours first,
                    # so the truncation is deterministic.
                    neighbors = list(
                        dict.fromkeys(sig["neighbors"] + new_neighbors)
                    )[:self._MAX_NEIGHBORS]

                    conn.execute("""
                        UPDATE signatures SET
                            success_count = success_count + 1,
                            avg_confidence = ?,
                            last_seen = ?,
                            neighbors = ?,
                            variants = ?,
                            relative_position = ?
                        WHERE target_key = ? AND screen_context = ?
                    """, (
                        new_avg, now,
                        json.dumps(neighbors),
                        json.dumps(variants),
                        element.relative_position,
                        target_key, screen_context,
                    ))
                else:
                    conn.execute("""
                        INSERT INTO signatures
                        (target_key, screen_context, text, element_type, relative_position,
                         neighbors, success_count, fail_count, avg_confidence, last_seen, variants)
                        VALUES (?, ?, ?, ?, ?, ?, 1, 0, ?, ?, ?)
                    """, (
                        target_key, screen_context,
                        element.ocr_text,
                        element.element_type,
                        element.relative_position,
                        json.dumps(new_neighbors[:self._MAX_NEIGHBORS]),
                        confidence, now,
                        json.dumps([variant]),
                    ))

            print(f"📝 [Signature] '{target_key}' {'enrichie' if existing else 'créée'} "
                  f"(conf={confidence:.2f}, ctx='{screen_context[:30]}')")

    def record_failure(self, target_key: str, screen_context: str):
        """Increment the failure counter of an existing signature.

        Only the exact ``(target_key, screen_context)`` row is updated; the
        call is a no-op when that row does not exist.
        """
        with self._lock:
            with self._connection() as conn:
                conn.execute("""
                    UPDATE signatures SET fail_count = fail_count + 1, last_seen = ?
                    WHERE target_key = ? AND screen_context = ?
                """, (time.strftime("%Y-%m-%dT%H:%M:%S"), target_key, screen_context))

    # ------------------------------------------------------------------
    # Utilities
    # ------------------------------------------------------------------

    @staticmethod
    def make_target_key(text: str, description: str = "") -> str:
        """Build a 16-hex-char key from the normalised text and description.

        MD5 is used as a stable fingerprint here, not for security.  None is
        treated like an empty string.
        """
        raw = f"{(text or '').lower().strip()}|{(description or '').lower().strip()}"
        return hashlib.md5(raw.encode()).hexdigest()[:16]

    @staticmethod
    def make_screen_context(window_title: str, resolution: tuple = (0, 0)) -> str:
        """Build a 12-hex-char context id from the window title and resolution."""
        raw = f"{(window_title or '').lower().strip()}|{resolution[0]}x{resolution[1]}"
        return hashlib.md5(raw.encode()).hexdigest()[:12]

    def get_stats(self) -> Dict[str, Any]:
        """Return summary counters for the signature database.

        Returns:
            ``total_signatures`` (row count), ``reliable`` (rows with at least
            3 successes and no failure) and ``db_path``.
        """
        with self._connection() as conn:
            total = conn.execute("SELECT COUNT(*) FROM signatures").fetchone()[0]
            reliable = conn.execute(
                "SELECT COUNT(*) FROM signatures WHERE success_count >= 3 AND fail_count = 0"
            ).fetchone()[0]
        return {
            "total_signatures": total,
            "reliable": reliable,
            "db_path": self.db_path,
        }
import base64
import io
import time
from typing import Any, Dict, List, Optional

from core.grounding.fast_types import DetectedUIElement, LocateResult, MatchCandidate
from core.grounding.target import GroundingTarget


class ThinkArbiter:
    """VLM arbiter for ambiguous grounding cases (HTTP client of UI-TARS).

    This class only talks to a grounding server over HTTP (``/health`` and
    ``/ground`` endpoints); it does not load any model itself.

    Usage::

        arbiter = ThinkArbiter()
        if arbiter.available:
            result = arbiter.arbitrate(target, candidates, screenshot)
    """

    DEFAULT_URL = "http://localhost:8200"

    # Pixel modes handed to the JPEG encoder unchanged; anything else
    # (RGBA, P, ...) is converted to RGB first.
    _JPEG_MODES = ("RGB", "L")

    def __init__(self, server_url: str = DEFAULT_URL, timeout: int = 30):
        """Configure the client.

        Args:
            server_url: Base URL of the grounding server.  A trailing slash
                is stripped so endpoint URLs never contain ``//``.
            timeout: Timeout in seconds for the ``/ground`` request.
        """
        self.server_url = server_url.rstrip("/")
        self.timeout = timeout

    @property
    def available(self) -> bool:
        """Return True if ``/health`` answers 200 and reports a loaded model.

        Any failure (``requests`` missing, connection error, malformed reply)
        is treated as "not available".  A request is made on every access.
        """
        try:
            import requests
            resp = requests.get(f"{self.server_url}/health", timeout=3)
            return resp.status_code == 200 and bool(resp.json().get("model_loaded", False))
        except Exception:
            # Best-effort probe: unreachable or broken server means unavailable.
            return False

    @staticmethod
    def _encode_screenshot(screenshot_pil: Any) -> str:
        """Return the screenshot as a base64-encoded JPEG string.

        Assumes a Pillow-style image exposing ``mode``, ``convert`` and
        ``save``.  JPEG has no alpha channel or palette support, so such
        images are converted to RGB instead of making ``save`` raise.
        """
        image = screenshot_pil
        if getattr(image, "mode", "RGB") not in ThinkArbiter._JPEG_MODES:
            image = image.convert("RGB")
        buf = io.BytesIO()
        image.save(buf, format="JPEG", quality=85)
        return base64.b64encode(buf.getvalue()).decode("utf-8")

    def arbitrate(
        self,
        target: GroundingTarget,
        candidates: List[MatchCandidate],
        screenshot_pil: Optional[Any] = None,
    ) -> Optional[LocateResult]:
        """Ask the VLM server to locate the target.

        Args:
            target: What is being searched; ``text`` and ``description`` are
                sent to the server.
            candidates: Candidates from the previous layer (may be empty);
                only their count is reported in the result.
            screenshot_pil: Screenshot image.  When None no image is sent
                (presumably the server captures the screen itself — confirm
                against the server implementation).

        Returns:
            A LocateResult with ``method="think_vlm"`` and ``tier="think"``,
            or None when the server is unreachable, answers with a non-200
            status, returns no complete coordinates, or any error occurs.
        """
        # Monotonic clock: elapsed times are immune to wall-clock adjustments.
        t0 = time.perf_counter()

        try:
            import requests

            payload: Dict[str, Any] = {
                "target_text": target.text or "",
                "target_description": target.description or "",
            }
            if screenshot_pil is not None:
                payload["image_b64"] = self._encode_screenshot(screenshot_pil)

            resp = requests.post(
                f"{self.server_url}/ground",
                json=payload,
                timeout=self.timeout,
            )

            dt = (time.perf_counter() - t0) * 1000

            if resp.status_code != 200:
                print(f"🤔 [THINK] Serveur HTTP {resp.status_code}")
                return None

            data = resp.json()
            x, y = data.get("x"), data.get("y")

            # Both coordinates are required; a reply with only one of them is
            # reported as "not found" rather than as a KeyError.
            if x is None or y is None:
                print(f"🤔 [THINK] VLM n'a pas trouvé '{target.text}' ({dt:.0f}ms)")
                return None

            result = LocateResult(
                x=x,
                y=y,
                confidence=data.get("confidence", 0.85),
                method="think_vlm",
                time_ms=dt,
                tier="think",
                candidates_count=len(candidates),
            )

            print(f"🤔 [THINK] VLM → ({result.x}, {result.y}) conf={result.confidence:.2f} ({dt:.0f}ms)")
            return result

        except Exception as ex:
            # Deliberate best-effort boundary: the arbiter is an optional
            # fallback, so any failure is logged and reported as "no result".
            dt = (time.perf_counter() - t0) * 1000
            print(f"⚠️ [THINK] Erreur: {ex} ({dt:.0f}ms)")
            return None