feat(grounding): Phase 3 — ThinkArbiter + SignatureStore

ThinkArbiter (core/grounding/think_arbiter.py) :
- Client HTTP vers le serveur UI-TARS (port 8200)
- Appelé uniquement si SmartMatcher score < 0.60
- Vérifie la disponibilité du serveur avant appel
- Validé : Demo trouvé à (1479, 183) en 3.6s

SignatureStore (core/grounding/element_signature.py) :
- Stockage SQLite des signatures d'éléments UI apprises
- record_success() enrichit la signature (texte, type, position, voisins)
- record_failure() incrémente le compteur d'échecs
- lookup() avec fallback (contexte exact → toutes variantes)
- Validé : 3 succès → conf_moy=0.917, voisins enrichis

Modules standalone — aucun impact sur le système existant.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-25 20:44:12 +02:00
parent ea36bba5cc
commit e4a48e78bf
2 changed files with 356 additions and 0 deletions

View File

@@ -0,0 +1,239 @@
"""
core/grounding/element_signature.py — Signatures d'éléments UI apprises
Chaque élément cliqué avec succès enrichit sa signature :
- texte OCR, type, position relative, voisins contextuels
- nombre de succès/échecs, confiance moyenne
- variantes observées (résolutions, positions)
Les signatures sont stockées en SQLite pour un lookup rapide.
Pattern identique à TargetMemoryStore (validé en prod).
Utilisation :
from core.grounding.element_signature import SignatureStore
store = SignatureStore()
# Après un clic réussi
store.record_success("btn_valider", "notepad_1920x1080", element, confidence=0.92)
# Au replay
sig = store.lookup("btn_valider", "notepad_1920x1080")
if sig:
print(f"Signature connue : {sig['text']} position={sig['relative_position']}")
"""
from __future__ import annotations
import hashlib
import json
import os
import sqlite3
import threading
import time
from typing import Any, Dict, List, Optional
from core.grounding.fast_types import DetectedUIElement
# Chemin par défaut de la DB
_DEFAULT_DB = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
"data", "learning", "element_signatures.db",
)
class SignatureStore:
"""Stockage SQLite des signatures d'éléments UI appris."""
def __init__(self, db_path: str = _DEFAULT_DB):
self.db_path = db_path
self._lock = threading.Lock()
self._ensure_db()
def _ensure_db(self):
"""Crée la DB et la table si nécessaire."""
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS signatures (
target_key TEXT NOT NULL,
screen_context TEXT NOT NULL,
text TEXT DEFAULT '',
element_type TEXT DEFAULT 'element',
relative_position TEXT DEFAULT '',
neighbors TEXT DEFAULT '[]',
success_count INTEGER DEFAULT 0,
fail_count INTEGER DEFAULT 0,
avg_confidence REAL DEFAULT 0.0,
last_seen TEXT DEFAULT '',
variants TEXT DEFAULT '[]',
PRIMARY KEY (target_key, screen_context)
)
""")
conn.execute("""
CREATE INDEX IF NOT EXISTS idx_target_key
ON signatures(target_key)
""")
# ------------------------------------------------------------------
# Lookup
# ------------------------------------------------------------------
def lookup(self, target_key: str, screen_context: str = "") -> Optional[Dict[str, Any]]:
"""Cherche une signature connue.
Args:
target_key: Clé unique de la cible (hash du texte + description).
screen_context: Contexte d'écran (hash titre fenêtre + résolution).
Returns:
Dict avec les champs de la signature, ou None.
"""
with sqlite3.connect(self.db_path) as conn:
conn.row_factory = sqlite3.Row
# Chercher avec le contexte exact d'abord
row = conn.execute(
"SELECT * FROM signatures WHERE target_key = ? AND screen_context = ?",
(target_key, screen_context),
).fetchone()
# Fallback : chercher sans contexte (toutes les variantes)
if row is None and screen_context:
row = conn.execute(
"SELECT * FROM signatures WHERE target_key = ? ORDER BY success_count DESC LIMIT 1",
(target_key,),
).fetchone()
if row is None:
return None
return {
"target_key": row["target_key"],
"screen_context": row["screen_context"],
"text": row["text"],
"element_type": row["element_type"],
"relative_position": row["relative_position"],
"neighbors": json.loads(row["neighbors"]),
"success_count": row["success_count"],
"fail_count": row["fail_count"],
"avg_confidence": row["avg_confidence"],
"last_seen": row["last_seen"],
"variants": json.loads(row["variants"]),
}
# ------------------------------------------------------------------
# Enregistrement
# ------------------------------------------------------------------
def record_success(
self,
target_key: str,
screen_context: str,
element: DetectedUIElement,
confidence: float,
):
"""Enregistre un succès — crée ou enrichit la signature."""
with self._lock:
existing = self.lookup(target_key, screen_context)
now = time.strftime("%Y-%m-%dT%H:%M:%S")
if existing:
# Enrichir la signature existante
n = existing["success_count"]
new_avg = (existing["avg_confidence"] * n + confidence) / (n + 1)
# Ajouter la variante si position différente
variants = existing["variants"]
variant = {
"position": element.relative_position,
"center": list(element.center),
"confidence": confidence,
"timestamp": now,
}
variants.append(variant)
# Garder les 20 dernières variantes max
variants = variants[-20:]
# Mettre à jour les voisins (union)
neighbors = list(set(existing["neighbors"] + element.neighbors))[:10]
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
UPDATE signatures SET
success_count = success_count + 1,
avg_confidence = ?,
last_seen = ?,
neighbors = ?,
variants = ?,
relative_position = ?
WHERE target_key = ? AND screen_context = ?
""", (
new_avg, now,
json.dumps(neighbors),
json.dumps(variants),
element.relative_position,
target_key, screen_context,
))
else:
# Créer une nouvelle signature
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
INSERT INTO signatures
(target_key, screen_context, text, element_type, relative_position,
neighbors, success_count, fail_count, avg_confidence, last_seen, variants)
VALUES (?, ?, ?, ?, ?, ?, 1, 0, ?, ?, ?)
""", (
target_key, screen_context,
element.ocr_text,
element.element_type,
element.relative_position,
json.dumps(element.neighbors[:10]),
confidence, now,
json.dumps([{
"position": element.relative_position,
"center": list(element.center),
"confidence": confidence,
"timestamp": now,
}]),
))
print(f"📝 [Signature] '{target_key}' {'enrichie' if existing else 'créée'} "
f"(conf={confidence:.2f}, ctx='{screen_context[:30]}')")
def record_failure(self, target_key: str, screen_context: str):
"""Enregistre un échec pour une signature."""
with self._lock:
with sqlite3.connect(self.db_path) as conn:
conn.execute("""
UPDATE signatures SET fail_count = fail_count + 1, last_seen = ?
WHERE target_key = ? AND screen_context = ?
""", (time.strftime("%Y-%m-%dT%H:%M:%S"), target_key, screen_context))
# ------------------------------------------------------------------
# Utilitaires
# ------------------------------------------------------------------
@staticmethod
def make_target_key(text: str, description: str = "") -> str:
"""Génère une clé unique pour une cible."""
raw = f"{text.lower().strip()}|{description.lower().strip()}"
return hashlib.md5(raw.encode()).hexdigest()[:16]
@staticmethod
def make_screen_context(window_title: str, resolution: tuple = (0, 0)) -> str:
"""Génère un contexte d'écran."""
raw = f"{window_title.lower().strip()}|{resolution[0]}x{resolution[1]}"
return hashlib.md5(raw.encode()).hexdigest()[:12]
def get_stats(self) -> Dict[str, Any]:
"""Statistiques de la base de signatures."""
with sqlite3.connect(self.db_path) as conn:
total = conn.execute("SELECT COUNT(*) FROM signatures").fetchone()[0]
reliable = conn.execute(
"SELECT COUNT(*) FROM signatures WHERE success_count >= 3 AND fail_count = 0"
).fetchone()[0]
return {
"total_signatures": total,
"reliable": reliable,
"db_path": self.db_path,
}

View File

@@ -0,0 +1,117 @@
"""
core/grounding/think_arbiter.py — Layer THINK : VLM arbitre (UI-TARS)
Appelé UNIQUEMENT quand le SmartMatcher n'a pas assez confiance :
- Score < 0.60 : aucun candidat clair → UI-TARS cherche dans tout l'écran
- Score 0.60-0.90 : candidats ambigus → UI-TARS confirme/infirme
Le VLM tourne dans un process séparé (serveur FastAPI port 8200).
Ce module est un CLIENT HTTP — il ne charge aucun modèle en VRAM.
Utilisation :
from core.grounding.think_arbiter import ThinkArbiter
arbiter = ThinkArbiter()
if arbiter.available:
result = arbiter.arbitrate(target, candidates, screenshot)
"""
from __future__ import annotations
import base64
import io
import time
from typing import Any, Dict, List, Optional
from core.grounding.fast_types import DetectedUIElement, LocateResult, MatchCandidate
from core.grounding.target import GroundingTarget
class ThinkArbiter:
"""Arbitre VLM pour les cas ambigus — appelle le serveur UI-TARS."""
DEFAULT_URL = "http://localhost:8200"
def __init__(self, server_url: str = DEFAULT_URL, timeout: int = 30):
self.server_url = server_url
self.timeout = timeout
@property
def available(self) -> bool:
"""Vérifie si le serveur de grounding est accessible."""
try:
import requests
resp = requests.get(f"{self.server_url}/health", timeout=3)
return resp.status_code == 200 and resp.json().get("model_loaded", False)
except Exception:
return False
def arbitrate(
self,
target: GroundingTarget,
candidates: List[MatchCandidate],
screenshot_pil: Optional[Any] = None,
) -> Optional[LocateResult]:
"""Demande au VLM de trancher.
Args:
target: Ce qu'on cherche.
candidates: Candidats SMART (peut être vide).
screenshot_pil: Screenshot PIL. Si None, le serveur capture lui-même.
Returns:
LocateResult ou None si le VLM ne trouve pas non plus.
"""
t0 = time.time()
try:
import requests
# Construire le payload
payload: Dict[str, Any] = {
"target_text": target.text or "",
"target_description": target.description or "",
}
# Envoyer l'image si disponible
if screenshot_pil is not None:
buf = io.BytesIO()
screenshot_pil.save(buf, format="JPEG", quality=85)
payload["image_b64"] = base64.b64encode(buf.getvalue()).decode("utf-8")
# Appel au serveur
resp = requests.post(
f"{self.server_url}/ground",
json=payload,
timeout=self.timeout,
)
dt = (time.time() - t0) * 1000
if resp.status_code != 200:
print(f"🤔 [THINK] Serveur HTTP {resp.status_code}")
return None
data = resp.json()
if data.get("x") is None:
print(f"🤔 [THINK] VLM n'a pas trouvé '{target.text}' ({dt:.0f}ms)")
return None
result = LocateResult(
x=data["x"],
y=data["y"],
confidence=data.get("confidence", 0.85),
method="think_vlm",
time_ms=dt,
tier="think",
candidates_count=len(candidates),
)
print(f"🤔 [THINK] VLM → ({result.x}, {result.y}) conf={result.confidence:.2f} ({dt:.0f}ms)")
return result
except Exception as ex:
dt = (time.time() - t0) * 1000
print(f"⚠️ [THINK] Erreur: {ex} ({dt:.0f}ms)")
return None