""" core/grounding/server.py — Serveur FastAPI de grounding visuel (port 8200) Charge UI-TARS-1.5-7B en 4-bit NF4 dans son propre process Python avec son propre contexte CUDA. Le backend Flask VWB (port 5002) et la boucle ORA appellent ce serveur en HTTP au lieu de charger le modele in-process. Lancement : .venv/bin/python3 -m core.grounding.server Endpoints : GET /health — verifie que le modele est charge POST /ground — localise un element UI sur un screenshot """ import base64 import gc import io import math import os import re import time from typing import Optional import torch from fastapi import FastAPI, HTTPException from pydantic import BaseModel import uvicorn # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- PORT = int(os.environ.get("GROUNDING_PORT", 8200)) MODEL_ID = "ByteDance-Seed/UI-TARS-1.5-7B" MIN_PIXELS = 100 * 28 * 28 MAX_PIXELS = 16384 * 28 * 28 # --------------------------------------------------------------------------- # Smart resize — identique a /tmp/test_uitars.py # --------------------------------------------------------------------------- def _smart_resize(height: int, width: int, factor: int = 28, min_pixels: int = MIN_PIXELS, max_pixels: int = MAX_PIXELS): """UI-TARS smart resize (memes defaults que le test valide).""" h_bar = max(factor, round(height / factor) * factor) w_bar = max(factor, round(width / factor) * factor) if h_bar * w_bar > max_pixels: beta = math.sqrt((height * width) / max_pixels) h_bar = math.floor(height / beta / factor) * factor w_bar = math.floor(width / beta / factor) * factor elif h_bar * w_bar < min_pixels: beta = math.sqrt(min_pixels / (height * width)) h_bar = math.ceil(height * beta / factor) * factor w_bar = math.ceil(width * beta / factor) * factor return h_bar, w_bar # --------------------------------------------------------------------------- # Prompt officiel UI-TARS — identique a /tmp/test_uitars.py # --------------------------------------------------------------------------- _GROUNDING_PROMPT = """You are a GUI agent. You are given a task and your action history, with screenshots. You need to perform the next action to complete the task. ## Output Format Thought: ... Action: ... ## Action Space click(start_box='(x1, y1)') ## User Instruction {instruction}""" # --------------------------------------------------------------------------- # Modele singleton # --------------------------------------------------------------------------- _model = None _processor = None _model_loaded = False def _evict_ollama_models(): """Libere les modeles Ollama de la VRAM avant de charger UI-TARS.""" try: import requests try: ps_resp = requests.get('http://localhost:11434/api/ps', timeout=3) if ps_resp.status_code == 200: loaded = ps_resp.json().get('models', []) model_names = [m.get('name', '') for m in loaded if m.get('name')] else: model_names = [] except Exception: model_names = [] if not model_names: print("[grounding-server] Aucun modele Ollama en VRAM") return for model_name in model_names: try: requests.post( 'http://localhost:11434/api/generate', json={'model': model_name, 'keep_alive': '0'}, timeout=5, ) print(f"[grounding-server] Ollama: eviction de '{model_name}'") except Exception: pass time.sleep(1.0) print("[grounding-server] Modeles Ollama liberes") except ImportError: print("[grounding-server] requests non dispo, skip eviction Ollama") def _load_model(): """Charge UI-TARS-1.5-7B en 4-bit NF4 — code identique a /tmp/test_uitars.py.""" global _model, _processor, _model_loaded if _model_loaded: return print("=" * 60) print(f"[grounding-server] Chargement de {MODEL_ID}") print("=" * 60) if not torch.cuda.is_available(): raise RuntimeError("CUDA non disponible — le serveur de grounding necessite un GPU") # Liberer la VRAM Ollama _evict_ollama_models() torch.cuda.empty_cache() gc.collect() from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor, BitsAndBytesConfig bnb_config = BitsAndBytesConfig( load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True, ) t0 = time.time() _model = Qwen2_5_VLForConditionalGeneration.from_pretrained( MODEL_ID, quantization_config=bnb_config, device_map="auto", ) _model.eval() _processor = AutoProcessor.from_pretrained( MODEL_ID, min_pixels=MIN_PIXELS, max_pixels=MAX_PIXELS, ) _model_loaded = True load_time = time.time() - t0 alloc = torch.cuda.memory_allocated() / 1024**3 peak = torch.cuda.max_memory_allocated() / 1024**3 print(f"[grounding-server] Modele charge en {load_time:.1f}s | " f"VRAM: {alloc:.2f} GB (peak: {peak:.2f} GB)") def _capture_screen(): """Capture l'ecran complet via mss. Retourne PIL Image ou None.""" try: import mss as mss_lib from PIL import Image with mss_lib.mss() as sct: mon = sct.monitors[0] grab = sct.grab(mon) return Image.frombytes('RGB', grab.size, grab.bgra, 'raw', 'BGRX') except Exception as e: print(f"[grounding-server] Erreur capture ecran: {e}") return None def _parse_coordinates(raw: str, orig_w: int, orig_h: int, resized_w: int, resized_h: int): """Parse les coordonnees du modele — identique a /tmp/test_uitars.py. Retourne (px, py, method_detail, confidence) ou None. """ cx, cy = None, None # Format 1: x y pm = re.search(r'\s*(\d+)\s+(\d+)\s*', raw) if pm: cx, cy = int(pm.group(1)), int(pm.group(2)) # Format 2: start_box='(x, y)' if cx is None: bm = re.search(r"start_box=\s*['\"]?\((\d+)\s*,\s*(\d+)\)", raw) if bm: cx, cy = int(bm.group(1)), int(bm.group(2)) # Format 3: fallback x, y if cx is None: fm = re.search(r'(\d+)\s*,\s*(\d+)', raw) if fm: cx, cy = int(fm.group(1)), int(fm.group(2)) if cx is None or cy is None: return None # Conversion : tester les 2 interpretations, garder la meilleure # Methode A : coordonnees dans l'espace de l'image resizee px_r = int(cx / resized_w * orig_w) py_r = int(cy / resized_h * orig_h) delta_r = ((px_r - orig_w / 2) ** 2 + (py_r - orig_h / 2) ** 2) ** 0.5 # Methode B : coordonnees 0-1000 px_1k = int(cx / 1000 * orig_w) py_1k = int(cy / 1000 * orig_h) delta_1k = ((px_1k - orig_w / 2) ** 2 + (py_1k - orig_h / 2) ** 2) ** 0.5 # Heuristique du script valide : si coords dans les limites du resize, # les deux sont possibles. UI-TARS utilise l'espace resize en natif. if cx <= resized_w and cy <= resized_h: in_screen_r = (0 <= px_r <= orig_w and 0 <= py_r <= orig_h) in_screen_1k = (0 <= px_1k <= orig_w and 0 <= py_1k <= orig_h) if in_screen_r and in_screen_1k: px, py = px_r, py_r method_detail = "resized" elif in_screen_r: px, py = px_r, py_r method_detail = "resized" else: px, py = px_1k, py_1k method_detail = "0-1000" else: px, py = px_1k, py_1k method_detail = "0-1000" confidence = 0.85 if ("start_box" in raw or "" in raw) else 0.70 print(f"[grounding-server] model=({cx},{cy}) -> pixel=({px},{py}) " f"[{method_detail}] resized={resized_w}x{resized_h} orig={orig_w}x{orig_h}") return px, py, method_detail, confidence # --------------------------------------------------------------------------- # FastAPI app # --------------------------------------------------------------------------- app = FastAPI(title="RPA Vision Grounding Server", version="1.0.0") class GroundRequest(BaseModel): target_text: str = "" target_description: str = "" image_b64: str = "" class GroundResponse(BaseModel): x: Optional[int] = None y: Optional[int] = None method: str = "ui_tars" confidence: float = 0.85 time_ms: float = 0.0 raw_output: str = "" @app.get("/health") def health(): return { "status": "ok" if _model_loaded else "loading", "model": MODEL_ID, "model_loaded": _model_loaded, "cuda_available": torch.cuda.is_available(), "vram_allocated_gb": round(torch.cuda.memory_allocated() / 1024**3, 2) if torch.cuda.is_available() else 0, } @app.post("/ground", response_model=GroundResponse) def ground(req: GroundRequest): if not _model_loaded: raise HTTPException(status_code=503, detail="Modele pas encore charge") from PIL import Image from qwen_vl_utils import process_vision_info # Construire l'instruction parts = [] if req.target_text: parts.append(req.target_text) if req.target_description: parts.append(req.target_description) if not parts: raise HTTPException(status_code=400, detail="target_text ou target_description requis") instruction = f"Click on the {' — '.join(parts)}" # Obtenir l'image (fournie en b64 ou capture ecran) if req.image_b64: try: raw_b64 = req.image_b64.split(',')[1] if ',' in req.image_b64 else req.image_b64 img_data = base64.b64decode(raw_b64) screen_pil = Image.open(io.BytesIO(img_data)).convert('RGB') except Exception as e: raise HTTPException(status_code=400, detail=f"Erreur decodage image: {e}") else: screen_pil = _capture_screen() if screen_pil is None: raise HTTPException(status_code=500, detail="Capture ecran echouee") W, H = screen_pil.size rH, rW = _smart_resize(H, W, min_pixels=MIN_PIXELS, max_pixels=MAX_PIXELS) # Sauver temporairement l'image pour qwen_vl_utils import tempfile tmp_path = os.path.join(tempfile.gettempdir(), f"grounding_screen_{os.getpid()}.png") screen_pil.save(tmp_path) try: system_prompt = _GROUNDING_PROMPT.format(instruction=instruction) messages = [ { "role": "user", "content": [ { "type": "image", "image": f"file://{tmp_path}", "min_pixels": MIN_PIXELS, "max_pixels": MAX_PIXELS, }, { "type": "text", "text": system_prompt, }, ], } ] text = _processor.apply_chat_template( messages, tokenize=False, add_generation_prompt=True ) image_inputs, video_inputs = process_vision_info(messages) inputs = _processor( text=[text], images=image_inputs, videos=video_inputs, padding=True, return_tensors="pt", ).to(_model.device) # Inference t0 = time.time() with torch.no_grad(): gen = _model.generate(**inputs, max_new_tokens=64) infer_ms = (time.time() - t0) * 1000 # Decoder trimmed = [o[len(i):] for i, o in zip(inputs.input_ids, gen)] raw = _processor.batch_decode( trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False )[0].strip() print(f"[grounding-server] '{instruction}' -> raw='{raw[:150]}' ({infer_ms:.0f}ms)") # Détecter les réponses négatives (le modèle dit qu'il ne voit pas l'élément) _raw_lower = raw.lower() _negative_markers = ["don't see", "do not see", "cannot find", "can't find", "not visible", "not found", "doesn't appear", "does not appear", "i don't", "unable to find", "unable to locate"] for _neg in _negative_markers: if _neg in _raw_lower: print(f"[grounding-server] NÉGATIF détecté: '{_neg}' → élément non trouvé") return GroundResponse(x=None, y=None, method="ui_tars", confidence=0.0, time_ms=round(infer_ms, 1), raw_output=raw[:300]) # Parser les coordonnees parsed = _parse_coordinates(raw, W, H, rW, rH) if parsed is None: raise HTTPException( status_code=422, detail=f"Coordonnees non parsees dans la reponse: {raw[:200]}" ) px, py, method_detail, confidence = parsed print(f"[grounding-server] Resultat: ({px}, {py}) conf={confidence:.2f} " f"[{method_detail}] ({infer_ms:.0f}ms)") return GroundResponse( x=px, y=py, method="ui_tars", confidence=confidence, time_ms=round(infer_ms, 1), raw_output=raw[:300], ) finally: try: os.unlink(tmp_path) except OSError: pass # --------------------------------------------------------------------------- # Entrypoint # --------------------------------------------------------------------------- @app.on_event("startup") async def startup_event(): """Charge le modele au demarrage du serveur.""" print(f"[grounding-server] Demarrage sur port {PORT}...") _load_model() print(f"[grounding-server] Pret a recevoir des requetes sur http://localhost:{PORT}") if __name__ == "__main__": uvicorn.run( "core.grounding.server:app", host="0.0.0.0", port=PORT, log_level="info", workers=1, # 1 seul worker (1 seul GPU) )