Grounding server (server.py):
- InfiGUI-G1-3B instead of UI-TARS-1.5-7B
- VRAM: 2.25 GB instead of 8.4 GB (6.6 GB free)
- Official InfiGUI prompt (system <think> + user point_2d JSON)
- max_new_tokens=512, JSON point_2d parsing
- 4/4 elements found: Demo 5px, Chrome 98px, Corbeille 15px, Search 66px
- UI-TARS fallback via env GROUNDING_MODEL=ByteDance-Seed/UI-TARS-1.5-7B

EasyOCR: back on GPU (enough VRAM now) → 192 ms instead of 2.5 s

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

"""
|
|
core/grounding/server.py — Serveur FastAPI de grounding visuel (port 8200)
|
|
|
|
Charge UI-TARS-1.5-7B en 4-bit NF4 dans son propre process Python avec son
|
|
propre contexte CUDA. Le backend Flask VWB (port 5002) et la boucle ORA
|
|
appellent ce serveur en HTTP au lieu de charger le modele in-process.
|
|
|
|
Lancement :
|
|
.venv/bin/python3 -m core.grounding.server
|
|
|
|
Endpoints :
|
|
GET /health — verifie que le modele est charge
|
|
POST /ground — localise un element UI sur un screenshot
|
|
"""

import base64
import gc
import io
import math
import os
import re
import time
from typing import Optional

import torch
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import uvicorn


# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

PORT = int(os.environ.get("GROUNDING_PORT", 8200))
MODEL_ID = os.environ.get("GROUNDING_MODEL", "InfiX-ai/InfiGUI-G1-3B")
MIN_PIXELS = 100 * 28 * 28
MAX_PIXELS = 5600 * 28 * 28  # InfiGUI recommends 5600*28*28 (= 4,390,400 px)


# ---------------------------------------------------------------------------
# Smart resize — identical to /tmp/test_uitars.py
# ---------------------------------------------------------------------------

def _smart_resize(height: int, width: int, factor: int = 28,
                  min_pixels: int = MIN_PIXELS, max_pixels: int = MAX_PIXELS):
    """UI-TARS smart resize (same defaults as the validated test script)."""
    # Round both sides to the nearest multiple of `factor`
    h_bar = max(factor, round(height / factor) * factor)
    w_bar = max(factor, round(width / factor) * factor)
    if h_bar * w_bar > max_pixels:
        # Too many pixels: scale down, flooring to a multiple of `factor`
        beta = math.sqrt((height * width) / max_pixels)
        h_bar = math.floor(height / beta / factor) * factor
        w_bar = math.floor(width / beta / factor) * factor
    elif h_bar * w_bar < min_pixels:
        # Too few pixels: scale up, ceiling to a multiple of `factor`
        beta = math.sqrt(min_pixels / (height * width))
        h_bar = math.ceil(height * beta / factor) * factor
        w_bar = math.ceil(width * beta / factor) * factor
    return h_bar, w_bar
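
# Worked example (illustrative numbers, not from the source): for a 1920x1080
# screen, 1080/28 rounds to 39 and 1920/28 rounds to 69, so
# _smart_resize(1080, 1920) returns (1092, 1932). The product 1092*1932 =
# 2,109,744 px lies between MIN_PIXELS (78,400) and MAX_PIXELS (4,390,400),
# so neither rescaling branch fires.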

# ---------------------------------------------------------------------------
# Prompts — InfiGUI-G1-3B (official format from the HuggingFace model card)
# ---------------------------------------------------------------------------

_SYSTEM_PROMPT = """You FIRST think about the reasoning process as an internal monologue and then provide the final answer.
The reasoning process MUST BE enclosed within <think> </think> tags."""
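
# With this system prompt the model is expected to reply in two parts, e.g.
#   <think>...reasoning...</think>
#   [{"point_2d": [x, y]}]
# which is the shape the /ground handler parses below (the reasoning text
# varies per query).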

# ---------------------------------------------------------------------------
# Model singleton
# ---------------------------------------------------------------------------

_model = None
_processor = None
_model_loaded = False


def _evict_ollama_models():
    """Frees Ollama models from VRAM before loading the grounding model."""
    try:
        import requests
        try:
            ps_resp = requests.get('http://localhost:11434/api/ps', timeout=3)
            if ps_resp.status_code == 200:
                loaded = ps_resp.json().get('models', [])
                model_names = [m.get('name', '') for m in loaded if m.get('name')]
            else:
                model_names = []
        except Exception:
            model_names = []

        if not model_names:
            print("[grounding-server] No Ollama model in VRAM")
            return

        for model_name in model_names:
            try:
                # keep_alive=0 asks Ollama to unload the model immediately
                requests.post(
                    'http://localhost:11434/api/generate',
                    json={'model': model_name, 'keep_alive': '0'},
                    timeout=5,
                )
                print(f"[grounding-server] Ollama: evicting '{model_name}'")
            except Exception:
                pass

        time.sleep(1.0)
        print("[grounding-server] Ollama models freed")
    except ImportError:
        print("[grounding-server] requests unavailable, skipping Ollama eviction")
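
# For reference, /api/ps answers JSON shaped like
#   {"models": [{"name": "qwen2.5:7b", ...}, ...]}
# (shape inferred from the parsing above; the model name is illustrative), and
# posting to /api/generate with keep_alive '0' and no prompt unloads that model.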


def _load_model():
    """Loads the grounding model in 4-bit NF4."""
    global _model, _processor, _model_loaded

    if _model_loaded:
        return

    print("=" * 60)
    print(f"[grounding-server] Loading {MODEL_ID}")
    print("=" * 60)

    if not torch.cuda.is_available():
        raise RuntimeError("CUDA not available: the grounding server requires a GPU")

    # Free Ollama VRAM first
    _evict_ollama_models()

    # Collect dead Python objects first, then release cached CUDA blocks
    gc.collect()
    torch.cuda.empty_cache()

    from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor, BitsAndBytesConfig

    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_use_double_quant=True,
    )

    t0 = time.time()
    _model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
        MODEL_ID,
        quantization_config=bnb_config,
        device_map="auto",
    )
    _model.eval()

    _processor = AutoProcessor.from_pretrained(
        MODEL_ID,
        min_pixels=MIN_PIXELS,
        max_pixels=MAX_PIXELS,
        padding_side="left",
    )

    _model_loaded = True
    load_time = time.time() - t0
    alloc = torch.cuda.memory_allocated() / 1024**3
    peak = torch.cuda.max_memory_allocated() / 1024**3
    print(f"[grounding-server] Model loaded in {load_time:.1f}s | "
          f"VRAM: {alloc:.2f} GB (peak: {peak:.2f} GB)")
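
# Reference figures per the commit message: InfiGUI-G1-3B in 4-bit NF4 uses
# about 2.25 GB of VRAM (vs ~8.4 GB for UI-TARS-1.5-7B), leaving ~6.6 GB free,
# enough to keep EasyOCR on GPU (192 ms instead of 2.5 s).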


def _capture_screen():
    """Captures the full screen via mss. Returns a PIL Image or None."""
    try:
        import mss as mss_lib
        from PIL import Image
        with mss_lib.mss() as sct:
            mon = sct.monitors[0]  # monitor 0 = the full virtual screen
            grab = sct.grab(mon)
            return Image.frombytes('RGB', grab.size, grab.bgra, 'raw', 'BGRX')
    except Exception as e:
        print(f"[grounding-server] Screen capture error: {e}")
        return None
def _parse_coordinates(raw: str, orig_w: int, orig_h: int,
                       resized_w: int, resized_h: int):
    """Parses model coordinates — identical to /tmp/test_uitars.py.

    Handles the UI-TARS output formats; currently unused by the /ground
    handler (which parses InfiGUI JSON inline) but kept for the UI-TARS
    fallback model.

    Returns (px, py, method_detail, confidence) or None.
    """
    cx, cy = None, None

    # Format 1: <point>x y</point>
    pm = re.search(r'<point>\s*(\d+)\s+(\d+)\s*</point>', raw)
    if pm:
        cx, cy = int(pm.group(1)), int(pm.group(2))

    # Format 2: start_box='(x, y)'
    if cx is None:
        bm = re.search(r"start_box=\s*['\"]?\((\d+)\s*,\s*(\d+)\)", raw)
        if bm:
            cx, cy = int(bm.group(1)), int(bm.group(2))

    # Format 3: bare "x, y" fallback
    if cx is None:
        fm = re.search(r'(\d+)\s*,\s*(\d+)', raw)
        if fm:
            cx, cy = int(fm.group(1)), int(fm.group(2))

    if cx is None or cy is None:
        return None

    # Conversion: two candidate interpretations of the model coordinates.
    # Method A: coordinates in the resized-image space
    px_r = int(cx / resized_w * orig_w)
    py_r = int(cy / resized_h * orig_h)

    # Method B: coordinates normalized to 0-1000
    px_1k = int(cx / 1000 * orig_w)
    py_1k = int(cy / 1000 * orig_h)

    # Heuristic from the validated script: if the coords fit within the
    # resized image, both readings are possible; UI-TARS natively uses the
    # resized space, so prefer it when it lands on-screen.
    if cx <= resized_w and cy <= resized_h:
        in_screen_r = (0 <= px_r <= orig_w and 0 <= py_r <= orig_h)
        if in_screen_r:
            px, py = px_r, py_r
            method_detail = "resized"
        else:
            px, py = px_1k, py_1k
            method_detail = "0-1000"
    else:
        px, py = px_1k, py_1k
        method_detail = "0-1000"

    confidence = 0.85 if ("start_box" in raw or "<point>" in raw) else 0.70

    print(f"[grounding-server] model=({cx},{cy}) -> pixel=({px},{py}) "
          f"[{method_detail}] resized={resized_w}x{resized_h} orig={orig_w}x{orig_h}")

    return px, py, method_detail, confidence
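
# Example (illustrative numbers): with raw="start_box='(483, 273)'", an
# original screen of 1920x1080 and a resized image of 1932x1092, method A
# gives pixel (480, 270), which is on-screen, so the "resized" interpretation
# wins with confidence 0.85.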

# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------

app = FastAPI(title="RPA Vision Grounding Server", version="1.0.0")


class GroundRequest(BaseModel):
    target_text: str = ""
    target_description: str = ""
    image_b64: str = ""


class GroundResponse(BaseModel):
    x: Optional[int] = None
    y: Optional[int] = None
    method: str = "ui_tars"  # legacy default; /ground always sets "infigui"
    confidence: float = 0.85
    time_ms: float = 0.0
    raw_output: str = ""
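
# Note: an empty image_b64 makes /ground capture the live screen via
# _capture_screen(); otherwise the field accepts a bare base64 string or a
# "data:image/...;base64,..." data URL (the handler splits on the comma).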


@app.get("/health")
def health():
    return {
        "status": "ok" if _model_loaded else "loading",
        "model": MODEL_ID,
        "model_loaded": _model_loaded,
        "cuda_available": torch.cuda.is_available(),
        "vram_allocated_gb": round(torch.cuda.memory_allocated() / 1024**3, 2) if torch.cuda.is_available() else 0,
    }

@app.post("/ground", response_model=GroundResponse)
def ground(req: GroundRequest):
    if not _model_loaded:
        raise HTTPException(status_code=503, detail="Model not loaded yet")

    from PIL import Image
    from qwen_vl_utils import process_vision_info

    # Build the target description
    parts = []
    if req.target_text:
        parts.append(req.target_text)
    if req.target_description:
        parts.append(req.target_description)
    if not parts:
        raise HTTPException(status_code=400, detail="target_text or target_description required")

    target_label = ' — '.join(parts)

    # Get the image (provided as b64, or captured from the screen)
    if req.image_b64:
        try:
            raw_b64 = req.image_b64.split(',')[1] if ',' in req.image_b64 else req.image_b64
            img_data = base64.b64decode(raw_b64)
            screen_pil = Image.open(io.BytesIO(img_data)).convert('RGB')
        except Exception as e:
            raise HTTPException(status_code=400, detail=f"Image decoding error: {e}")
    else:
        screen_pil = _capture_screen()
        if screen_pil is None:
            raise HTTPException(status_code=500, detail="Screen capture failed")

    W, H = screen_pil.size
    rH, rW = _smart_resize(H, W, min_pixels=MIN_PIXELS, max_pixels=MAX_PIXELS)

    try:
        import json as _json

        # Official InfiGUI-G1-3B prompt (HuggingFace model card)
        user_text = (
            f'The screen\'s resolution is {rW}x{rH}.\n'
            f'Locate the UI element(s) for "{target_label}", '
            f'output the coordinates using JSON format: '
            f'[{{"point_2d": [x, y]}}, ...]'
        )

        messages = [
            {"role": "system", "content": _SYSTEM_PROMPT},
            {"role": "user", "content": [
                {"type": "image", "image": screen_pil},
                {"type": "text", "text": user_text},
            ]},
        ]

        text = _processor.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        image_inputs, video_inputs = process_vision_info(messages)
        inputs = _processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt",
        ).to(_model.device)

        # Inference
        t0 = time.time()
        with torch.no_grad():
            gen = _model.generate(**inputs, max_new_tokens=512)
        infer_ms = (time.time() - t0) * 1000

        # Decode only the newly generated tokens
        trimmed = [o[len(i):] for i, o in zip(inputs.input_ids, gen)]
        raw = _processor.batch_decode(
            trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
        )[0].strip()

        print(f"[grounding-server] '{target_label}' -> raw='{raw[:150]}' ({infer_ms:.0f}ms)")

        # Parse the InfiGUI JSON: split on </think>, extract point_2d
        px, py = None, None
        json_part = raw.split("</think>")[-1] if "</think>" in raw else raw
        json_part = json_part.replace("```json", "").replace("```", "").strip()

        try:
            data = _json.loads(json_part)
            if isinstance(data, list) and data and isinstance(data[0], dict):
                pt = data[0].get("point_2d", [])
                if isinstance(pt, (list, tuple)) and len(pt) >= 2:
                    # Coordinates are in resized-image pixels: map them back
                    # to original-screen pixels
                    px = int(pt[0] * W / rW)
                    py = int(pt[1] * H / rH)
        except _json.JSONDecodeError:
            # Regex fallback
            m = re.search(r'"point_2d"\s*:\s*\[(\d+),\s*(\d+)\]', raw)
            if m:
                px = int(int(m.group(1)) * W / rW)
                py = int(int(m.group(2)) * H / rH)

        if px is None:
            # Detect negative answers
            _raw_lower = raw.lower()
            for _neg in ["don't see", "cannot find", "not visible", "not found",
                         "unable to find", "unable to locate", "does not appear"]:
                if _neg in _raw_lower:
                    print(f"[grounding-server] NEGATIVE: '{_neg}'")
                    return GroundResponse(x=None, y=None, method="infigui",
                                          confidence=0.0, time_ms=round(infer_ms, 1),
                                          raw_output=raw[:300])

            print(f"[grounding-server] Could not parse coordinates: {json_part[:100]}")
            return GroundResponse(x=None, y=None, method="infigui",
                                  confidence=0.0, time_ms=round(infer_ms, 1),
                                  raw_output=raw[:300])

        confidence = 0.90
        print(f"[grounding-server] Result: ({px}, {py}) conf={confidence:.2f} ({infer_ms:.0f}ms)")

        return GroundResponse(
            x=px, y=py, method="infigui",
            confidence=confidence, time_ms=round(infer_ms, 1),
            raw_output=raw[:300],
        )

    except Exception as e:
        print(f"[grounding-server] ERROR: {e}")
        raise HTTPException(status_code=500, detail=str(e))


# ---------------------------------------------------------------------------
# Entrypoint
# ---------------------------------------------------------------------------

@app.on_event("startup")
async def startup_event():
    """Loads the model when the server starts."""
    print(f"[grounding-server] Starting on port {PORT}...")
    _load_model()
    print(f"[grounding-server] Ready to serve requests on http://localhost:{PORT}")


if __name__ == "__main__":
    uvicorn.run(
        "core.grounding.server:app",
        host="0.0.0.0",
        port=PORT,
        log_level="info",
        workers=1,  # single worker (single GPU)
    )
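
# UI-TARS fallback (per the commit message), selected via the env var read at
# the top of this module:
#   GROUNDING_MODEL=ByteDance-Seed/UI-TARS-1.5-7B .venv/bin/python3 -m core.grounding.server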