docs: full execution map + ORA target_text fix + file-based InfiGUI worker

docs/CARTOGRAPHY.md:
- Complete map of the 2 execution paths (Legacy vs ORA)
- 12 grounding systems identified, 3 of them dead
- Trace of the target_text field from capture to click
- Existing functions that are not wired in (verify, recovery, ShadowLearningHook)
- VRAM budget, critical files, modification rules

ORA target_text fix (observe_reason_act.py:217):
- Detects nonsensical target_text values ("click_anchor")
- Calls _describe_anchor_image() (VLM) to describe the crop
- Same logic as the legacy execute.py:893

InfiGUI worker via /tmp files (handshake sketched below):
- Communication through files (no subprocess pipes, no HTTP)
- Independent process started before the backend
- Fixes the CUDA crash inside Flask/FastAPI/uvicorn
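A minimal sketch of that handshake from the backend side (file names are the ones introduced by this commit; the helper name is illustrative — the real client is `UITarsGrounder.ground()` further down):

```python
import json, os, time

REQUEST_FILE = "/tmp/infigui_request.json"
RESPONSE_FILE = "/tmp/infigui_response.json"

def ground_via_worker(target: str, description: str = "", timeout_s: float = 30.0):
    """Illustrative round trip: write a request file, poll for the response file."""
    if os.path.exists(RESPONSE_FILE):
        os.unlink(RESPONSE_FILE)                 # drop any stale response
    with open(REQUEST_FILE, "w") as f:
        json.dump({"target": target, "description": description}, f)
    deadline = time.time() + timeout_s
    while time.time() < deadline:                # poll until the worker answers
        if os.path.exists(RESPONSE_FILE):
            with open(RESPONSE_FILE) as f:
                return json.load(f)
        time.sleep(0.1)
    return None
```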

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-26 12:37:43 +02:00
parent f73a2a59a9
commit 3d6868f029
6 changed files with 878 additions and 581 deletions

core/execution/observe_reason_act.py

@@ -213,8 +213,31 @@ class ORALoop:
 # --- Mapper action_type vers action Decision ---
+# Types d'action qui ne sont PAS des descriptions valides
+_action_type_names = {'click_anchor', 'double_click_anchor', 'right_click_anchor',
+                      'hover_anchor', 'focus_anchor', 'scroll_to_anchor',
+                      'click', 'type_text', 'keyboard_shortcut', 'wait_for_anchor'}
 if action_type in ('click_anchor', 'click', 'double_click_anchor', 'right_click_anchor'):
-    target_text = anchor.get('target_text', '') or label
+    target_text = anchor.get('target_text', '') or anchor.get('description', '')
+    # Si target_text est vide ou est un nom d'action → décrire le crop
+    if not target_text or target_text in _action_type_names:
+        screenshot_b64 = anchor.get('screenshot', '')
+        if screenshot_b64:
+            try:
+                from core.execution.input_handler import _describe_anchor_image
+                desc = _describe_anchor_image(screenshot_b64)
+                if desc and len(desc) > 2:
+                    target_text = desc
+                    print(f"🏷️ [ORA/reason] Ancre décrite par VLM: '{target_text}'")
+            except Exception:
+                pass
+        # Dernier fallback : label si pas un nom d'action
+        if not target_text or target_text in _action_type_names:
+            target_text = label if label not in _action_type_names else ''
     action = 'click'
     value = 'double' if action_type == 'double_click_anchor' else (
         'right' if action_type == 'right_click_anchor' else 'left')
@@ -1234,27 +1257,25 @@ Règles:
 # --- 1. Observer l'état pré-action ---
 pre = self.observe()
-# --- 1b. Réflexe Check : popup/dialogue inattendu ? ---
-# Déclenché UNIQUEMENT si le pHash a changé de manière inattendue
-# (= un popup est probablement apparu). Sinon → 0ms, pas d'OCR.
+# --- 1b. Réflexe : dialogue inattendu ? ---
+# Déclenché si le pHash a changé de manière inattendue.
+# Flux : titre fenêtre (50ms) → dialogue connu ? → InfiGUI clique (3s)
 if i > 0 and hasattr(self, '_last_post_phash') and self._last_post_phash:
     _phash_distance = self._phash_distance(pre.phash, self._last_post_phash)
-    if _phash_distance > 10:  # Changement significatif inattendu
-        print(f"🧠 [ORA/réflexe] pHash changé (distance={_phash_distance}) → vérification popup")
+    if _phash_distance > 10:
+        print(f"🧠 [ORA/réflexe] pHash changé (distance={_phash_distance}) → vérification dialogue")
         try:
-            from core.execution.input_handler import check_screen_for_patterns, handle_detected_pattern
-            _reflex_pattern = check_screen_for_patterns()
-            if _reflex_pattern:
-                _reflex_name = _reflex_pattern.get('pattern', '?')
-                _reflex_target = _reflex_pattern.get('target', '?')
-                print(f"🧠 [ORA/réflexe] Pattern détecté: '{_reflex_name}' → clic '{_reflex_target}'")
-                _handled = handle_detected_pattern(_reflex_pattern)
-                if _handled:
-                    print(f"   [ORA/réflexe] Dialogue '{_reflex_name}' géré automatiquement")
-                    time.sleep(0.5)
-                    pre = self.observe()
-                else:
-                    print(f"⚠️ [ORA/réflexe] Pattern '{_reflex_name}' détecté mais non géré")
+            from core.grounding.dialog_handler import DialogHandler
+            _dh = DialogHandler()
+            _dh_result = _dh.handle_if_dialog(pre.screenshot)
+            if _dh_result.get('handled'):
+                print(f"✅ [ORA/réflexe] Dialogue '{_dh_result['title'][:30]}' géré → {_dh_result['action']}")
+                time.sleep(0.5)
+                pre = self.observe()
+            elif _dh_result.get('dialog_type'):
+                print(f"⚠️ [ORA/réflexe] Dialogue '{_dh_result.get('dialog_type')}' détecté mais non géré: {_dh_result.get('reason')}")
+            else:
+                print(f"🧠 [ORA/réflexe] Pas de dialogue détecté: {_dh_result.get('reason', '?')}")
         except Exception as _reflex_err:
             print(f"⚠️ [ORA/réflexe] Erreur: {_reflex_err}")

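The reflex above fires on a perceptual-hash jump. As a hedged illustration of what a `_phash_distance`-style check computes (the ORA loop may implement it differently), the imagehash library expresses it as a Hamming distance between 64-bit hashes:

```python
import imagehash
from PIL import Image

def phash_distance(img_a: Image.Image, img_b: Image.Image) -> int:
    # Hamming distance between perceptual hashes: ~0 means "same screen",
    # a jump above ~10 suggests a popup or a window change.
    return imagehash.phash(img_a) - imagehash.phash(img_b)
```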
core/grounding/dialog_handler.py

@@ -0,0 +1,253 @@
"""
core/grounding/dialog_handler.py — Gestion intelligente des dialogues
Quand un dialogue inattendu apparaît (pHash change après une action) :
1. Lire le titre de la fenêtre (EasyOCR crop 45px, ~130ms)
2. Si titre connu (Enregistrer sous, Confirmer, etc.) → action connue
3. Demander à InfiGUI de cliquer sur le bon bouton (~3s)
4. Vérifier que le dialogue a disparu (pHash)
Pas de patterns prédéfinis pour les boutons. InfiGUI comprend
visuellement le dialogue et clique au bon endroit.
Utilisation :
from core.grounding.dialog_handler import DialogHandler
handler = DialogHandler()
result = handler.handle_if_dialog(screenshot_pil)
if result['handled']:
print(f"Dialogue '{result['title']}' géré → {result['action']}")
"""
from __future__ import annotations
import time
from typing import Any, Dict, Optional
# Titres connus → quelle action demander à InfiGUI
KNOWN_DIALOGS = {
"enregistrer sous": {"target": "Enregistrer", "description": "Clique sur le bouton Enregistrer dans le dialogue Enregistrer sous"},
"save as": {"target": "Save", "description": "Click the Save button in the Save As dialog"},
"confirmer": {"target": "Oui", "description": "Clique sur le bouton Oui dans le dialogue de confirmation"},
"remplacer": {"target": "Oui", "description": "Clique sur le bouton Oui pour confirmer le remplacement du fichier"},
"replace": {"target": "Yes", "description": "Click Yes to confirm file replacement"},
"voulez-vous enregistrer": {"target": "Enregistrer", "description": "Clique sur Enregistrer pour sauvegarder les modifications"},
"do you want to save": {"target": "Save", "description": "Click Save to save changes"},
"overwrite": {"target": "Yes", "description": "Click Yes to overwrite"},
"écraser": {"target": "Oui", "description": "Clique sur Oui pour écraser le fichier"},
"already exists": {"target": "Yes", "description": "Click Yes, the file already exists"},
"existe déjà": {"target": "Oui", "description": "Clique sur Oui, le fichier existe déjà"},
"erreur": {"target": "OK", "description": "Clique sur OK pour fermer le message d'erreur"},
"error": {"target": "OK", "description": "Click OK to close the error message"},
"avertissement": {"target": "OK", "description": "Clique sur OK pour fermer l'avertissement"},
"warning": {"target": "OK", "description": "Click OK to close the warning"},
}
class DialogHandler:
"""Gestion intelligente des dialogues via titre + InfiGUI."""
GROUNDING_URL = "http://localhost:8200"
def __init__(self):
self._easyocr_reader = None
def handle_if_dialog(
self,
screenshot_pil,
previous_title: str = "",
) -> Dict[str, Any]:
"""Vérifie si l'écran montre un dialogue et le gère.
Args:
screenshot_pil: Screenshot PIL actuel.
previous_title: Titre de la fenêtre avant l'action (pour comparaison).
Returns:
Dict avec 'handled' (bool), 'title', 'action', 'position'.
"""
t0 = time.time()
# 1. Lire le titre de la fenêtre
title = self._read_title(screenshot_pil)
if not title or len(title) < 3:
return {'handled': False, 'title': '', 'reason': 'Titre illisible'}
print(f"🔍 [Dialog] Titre lu: '{title}'")
# 2. Chercher si c'est un dialogue connu
matched_dialog = None
for key, action_info in KNOWN_DIALOGS.items():
if key in title.lower():
matched_dialog = (key, action_info)
break
if not matched_dialog:
# Pas un dialogue connu — le workflow continue normalement
return {'handled': False, 'title': title, 'reason': 'Pas un dialogue connu'}
dialog_key, action_info = matched_dialog
target = action_info['target']
description = action_info['description']
print(f"🧠 [Dialog] Dialogue détecté: '{dialog_key}' → clic '{target}'")
# 3. Demander à InfiGUI de cliquer sur le bouton
click_result = self._click_via_infigui(
target, description, screenshot_pil
)
dt = (time.time() - t0) * 1000
if click_result:
print(f"✅ [Dialog] Clic '{target}' à ({click_result['x']}, {click_result['y']}) ({dt:.0f}ms)")
return {
'handled': True,
'title': title,
'dialog_type': dialog_key,
'action': f"click '{target}'",
'position': (click_result['x'], click_result['y']),
'time_ms': dt,
}
else:
# InfiGUI n'a pas trouvé le bouton — essayer le clic direct via OCR
print(f"⚠️ [Dialog] InfiGUI n'a pas trouvé '{target}', essai OCR direct")
ocr_result = self._click_via_ocr(target, screenshot_pil)
dt = (time.time() - t0) * 1000
if ocr_result:
print(f"✅ [Dialog] OCR clic '{target}' à ({ocr_result[0]}, {ocr_result[1]}) ({dt:.0f}ms)")
return {
'handled': True,
'title': title,
'dialog_type': dialog_key,
'action': f"click '{target}' (OCR)",
'position': ocr_result,
'time_ms': dt,
}
print(f"❌ [Dialog] Impossible de cliquer '{target}' ({dt:.0f}ms)")
return {
'handled': False,
'title': title,
'dialog_type': dialog_key,
'reason': f"Bouton '{target}' introuvable",
'time_ms': dt,
}
# ------------------------------------------------------------------
# Lecture titre
# ------------------------------------------------------------------
def _read_title(self, screenshot_pil) -> str:
"""Lit TOUT le texte visible via EasyOCR full-screen (~500ms).
En VM QEMU, la barre de titre Windows est à l'intérieur du framebuffer,
pas en haut absolu de l'écran. On fait l'OCR full-screen et on cherche
les mots-clés des dialogues connus dans le texte complet.
"""
try:
import numpy as np
reader = self._get_easyocr()
if reader is None:
return ""
results = reader.readtext(np.array(screenshot_pil))
full_text = ' '.join(r[1] for r in results if r[1].strip())
return full_text
except Exception as e:
print(f"⚠️ [Dialog] Erreur lecture écran: {e}")
return ""
# ------------------------------------------------------------------
# Clic via InfiGUI (serveur grounding)
# ------------------------------------------------------------------
def _click_via_infigui(
self, target: str, description: str, screenshot_pil
) -> Optional[Dict]:
"""Demande à InfiGUI de localiser et cliquer sur le bouton."""
try:
import requests
import base64
import io
buf = io.BytesIO()
screenshot_pil.save(buf, format='JPEG', quality=85)
b64 = base64.b64encode(buf.getvalue()).decode()
resp = requests.post(f"{self.GROUNDING_URL}/ground", json={
'target_text': target,
'target_description': description,
'image_b64': b64,
}, timeout=15)
if resp.status_code == 200:
data = resp.json()
if data.get('x') is not None:
# Cliquer
import pyautogui
pyautogui.click(data['x'], data['y'])
return data
return None
except Exception as e:
print(f"⚠️ [Dialog/InfiGUI] Erreur: {e}")
return None
# ------------------------------------------------------------------
# Clic via OCR (fallback rapide)
# ------------------------------------------------------------------
def _click_via_ocr(self, target: str, screenshot_pil) -> Optional[tuple]:
"""Cherche le bouton par OCR et clique dessus."""
try:
import numpy as np
reader = self._get_easyocr()
if reader is None:
return None
results = reader.readtext(np.array(screenshot_pil))
target_lower = target.lower()
matches = []
for (bbox_pts, text, conf) in results:
if target_lower in text.lower() or text.lower() in target_lower:
x = int(sum(p[0] for p in bbox_pts) / 4)
y = int(sum(p[1] for p in bbox_pts) / 4)
matches.append((x, y, text))
if matches:
# Prendre le match le plus bas (boutons = bas du dialogue)
best = max(matches, key=lambda m: m[1])
import pyautogui
pyautogui.click(best[0], best[1])
return (best[0], best[1])
return None
except Exception as e:
print(f"⚠️ [Dialog/OCR] Erreur: {e}")
return None
# ------------------------------------------------------------------
# EasyOCR singleton
# ------------------------------------------------------------------
def _get_easyocr(self):
if self._easyocr_reader is not None:
return self._easyocr_reader
try:
import easyocr
self._easyocr_reader = easyocr.Reader(
['fr', 'en'], gpu=True, verbose=False
)
return self._easyocr_reader
except ImportError:
return None

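`handle_if_dialog()` matches the OCR'd screen text against `KNOWN_DIALOGS` with a plain substring test; a condensed illustration (the title string is made up):

```python
from core.grounding.dialog_handler import KNOWN_DIALOGS

# The first key found as a substring of the OCR'd text wins.
title = "Confirmer l'enregistrement - LibreOffice Writer"   # made-up example title
match = next(((k, v) for k, v in KNOWN_DIALOGS.items() if k in title.lower()), None)
if match:
    key, action = match
    print(f"dialog '{key}' -> click '{action['target']}'")  # dialog 'confirmer' -> click 'Oui'
```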
core/grounding/infigui_worker.py

@@ -0,0 +1,187 @@
#!/usr/bin/env python3
"""
Worker InfiGUI — process indépendant, communication par fichiers.
Charge le modèle, surveille /tmp/infigui_request.json, infère, écrit /tmp/infigui_response.json.
Lancement :
cd ~/ai/rpa_vision_v3
.venv/bin/python3 -m core.grounding.infigui_worker
"""
import json
import math
import os
import re
import sys
import time
import gc
import warnings
warnings.filterwarnings("ignore")
import torch
REQUEST_FILE = "/tmp/infigui_request.json"
RESPONSE_FILE = "/tmp/infigui_response.json"
READY_FILE = "/tmp/infigui_ready"
def load_model():
"""Charge InfiGUI-G1-3B en 4-bit NF4."""
torch.cuda.empty_cache()
gc.collect()
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor, BitsAndBytesConfig
model_id = "InfiX-ai/InfiGUI-G1-3B"
print(f"[infigui-worker] Chargement {model_id}...")
bnb = BitsAndBytesConfig(
load_in_4bit=True, bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True,
)
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
model_id, quantization_config=bnb, device_map={"": "cuda:0"},
)
model.eval()
processor = AutoProcessor.from_pretrained(
model_id, padding_side="left",
min_pixels=100 * 28 * 28, max_pixels=5600 * 28 * 28,
)
vram = torch.cuda.memory_allocated() / 1e9
print(f"[infigui-worker] Prêt — VRAM: {vram:.2f}GB")
# Signal "prêt"
with open(READY_FILE, "w") as f:
f.write(f"ready {vram:.2f}GB")
return model, processor
def infer(model, processor, req):
"""Fait une inférence."""
from PIL import Image
from qwen_vl_utils import process_vision_info
target = req.get("target", "")
description = req.get("description", "")
label = f"{target}{description}" if description else target
if not label.strip():
return {"x": None, "y": None, "error": "target requis"}
# Image
image_path = req.get("image_path", "")
if image_path and os.path.exists(image_path):
img = Image.open(image_path).convert("RGB")
else:
import mss
with mss.mss() as sct:
grab = sct.grab(sct.monitors[0])
img = Image.frombytes("RGB", grab.size, grab.bgra, "raw", "BGRX")
W, H = img.size
factor = 28
rH = max(factor, round(H / factor) * factor)
rW = max(factor, round(W / factor) * factor)
system = (
"You FIRST think about the reasoning process as an internal monologue "
"and then provide the final answer.\n"
"The reasoning process MUST BE enclosed within <think> </think> tags."
)
user_text = (
f'The screen\'s resolution is {rW}x{rH}.\n'
f'Locate the UI element(s) for "{label}", '
f'output the coordinates using JSON format: '
f'[{{"point_2d": [x, y]}}, ...]'
)
messages = [
{"role": "system", "content": system},
{"role": "user", "content": [
{"type": "image", "image": img},
{"type": "text", "text": user_text},
]},
]
text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = process_vision_info(messages)
inputs = processor(
text=[text], images=image_inputs, videos=video_inputs,
padding=True, return_tensors="pt",
).to(model.device)
t0 = time.time()
with torch.no_grad():
gen = model.generate(**inputs, max_new_tokens=512)
infer_ms = (time.time() - t0) * 1000
trimmed = [o[len(i):] for i, o in zip(inputs.input_ids, gen)]
raw = processor.batch_decode(
trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False,
)[0].strip()
print(f"[infigui-worker] '{label[:40]}' ({infer_ms:.0f}ms)")
# Parser JSON point_2d
json_part = raw.split("</think>")[-1] if "</think>" in raw else raw
json_part = json_part.replace("```json", "").replace("```", "").strip()
px, py = None, None
try:
parsed = json.loads(json_part)
if isinstance(parsed, list) and len(parsed) > 0:
pt = parsed[0].get("point_2d", [])
if len(pt) >= 2:
px = int(pt[0] * W / rW)
py = int(pt[1] * H / rH)
except json.JSONDecodeError:
m = re.search(r'"point_2d"\s*:\s*\[(\d+),\s*(\d+)\]', raw)
if m:
px = int(int(m.group(1)) * W / rW)
py = int(int(m.group(2)) * H / rH)
return {
"x": px, "y": py,
"method": "infigui",
"confidence": 0.90 if px else 0.0,
"time_ms": round(infer_ms, 1),
}
def main():
model, processor = load_model()
# Nettoyer les fichiers résiduels
for f in [REQUEST_FILE, RESPONSE_FILE]:
if os.path.exists(f):
os.unlink(f)
print(f"[infigui-worker] En attente de requêtes ({REQUEST_FILE})")
# Boucle : surveiller le fichier de requête
while True:
if os.path.exists(REQUEST_FILE):
try:
with open(REQUEST_FILE, "r") as f:
req = json.load(f)
os.unlink(REQUEST_FILE)
result = infer(model, processor, req)
with open(RESPONSE_FILE, "w") as f:
json.dump(result, f)
except Exception as e:
print(f"[infigui-worker] ERREUR: {e}")
with open(RESPONSE_FILE, "w") as f:
json.dump({"x": None, "y": None, "error": str(e)}, f)
time.sleep(0.05) # 50ms polling
if __name__ == "__main__":
main()

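For reference, the `point_2d` coordinates come back in the resized space (rW×rH) and are mapped back to screen pixels in `infer()`; a worked example with illustrative values, using the worker's rounding to a multiple of 28:

```python
# Worked example of the point_2d rescaling done in infer() (values illustrative).
W, H = 1920, 1080                              # real screen size
factor = 28
rW = max(factor, round(W / factor) * factor)   # 1932
rH = max(factor, round(H / factor) * factor)   # 1092
pt = [966, 546]                                # point_2d returned in resized space
px, py = int(pt[0] * W / rW), int(pt[1] * H / rH)
assert (px, py) == (960, 540)                  # back in original screen pixels
```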
core/grounding/server.py

@@ -1,425 +1,113 @@
""" """Serveur grounding minimaliste — Flask single-thread, même contexte CUDA."""
core/grounding/server.py — Serveur FastAPI de grounding visuel (port 8200) import base64, io, json, math, os, re, time, gc
Charge UI-TARS-1.5-7B en 4-bit NF4 dans son propre process Python avec son
propre contexte CUDA. Le backend Flask VWB (port 5002) et la boucle ORA
appellent ce serveur en HTTP au lieu de charger le modele in-process.
Lancement :
.venv/bin/python3 -m core.grounding.server
Endpoints :
GET /health — verifie que le modele est charge
POST /ground — localise un element UI sur un screenshot
"""
import base64
import gc
import io
import math
import os
import re
import time
from typing import Optional
import torch import torch
from fastapi import FastAPI, HTTPException from flask import Flask, request, jsonify
from pydantic import BaseModel from PIL import Image
import uvicorn
# --------------------------------------------------------------------------- app = Flask(__name__)
# Configuration
# ---------------------------------------------------------------------------
PORT = int(os.environ.get("GROUNDING_PORT", 8200))
MODEL_ID = os.environ.get("GROUNDING_MODEL", "InfiX-ai/InfiGUI-G1-3B") MODEL_ID = os.environ.get("GROUNDING_MODEL", "InfiX-ai/InfiGUI-G1-3B")
MIN_PIXELS = 100 * 28 * 28 MIN_PIXELS = 100 * 28 * 28
MAX_PIXELS = 5600 * 28 * 28 # InfiGUI recommande 5600*28*28 MAX_PIXELS = 5600 * 28 * 28
# ---------------------------------------------------------------------------
# Smart resize — identique a /tmp/test_uitars.py
# ---------------------------------------------------------------------------
def _smart_resize(height: int, width: int, factor: int = 28,
min_pixels: int = MIN_PIXELS, max_pixels: int = MAX_PIXELS):
"""UI-TARS smart resize (memes defaults que le test valide)."""
h_bar = max(factor, round(height / factor) * factor)
w_bar = max(factor, round(width / factor) * factor)
if h_bar * w_bar > max_pixels:
beta = math.sqrt((height * width) / max_pixels)
h_bar = math.floor(height / beta / factor) * factor
w_bar = math.floor(width / beta / factor) * factor
elif h_bar * w_bar < min_pixels:
beta = math.sqrt(min_pixels / (height * width))
h_bar = math.ceil(height * beta / factor) * factor
w_bar = math.ceil(width * beta / factor) * factor
return h_bar, w_bar
# ---------------------------------------------------------------------------
# Prompts — InfiGUI-G1-3B (format officiel de la doc HuggingFace)
# ---------------------------------------------------------------------------
_SYSTEM_PROMPT = """You FIRST think about the reasoning process as an internal monologue and then provide the final answer.
The reasoning process MUST BE enclosed within <think> </think> tags."""
# ---------------------------------------------------------------------------
# Modele singleton
# ---------------------------------------------------------------------------
_model = None _model = None
_processor = None _processor = None
_model_loaded = False
def _smart_resize(h, w, factor=28):
h_bar = max(factor, round(h/factor)*factor)
w_bar = max(factor, round(w/factor)*factor)
if h_bar*w_bar > MAX_PIXELS:
beta = math.sqrt((h*w)/MAX_PIXELS)
h_bar = math.floor(h/beta/factor)*factor
w_bar = math.floor(w/beta/factor)*factor
elif h_bar*w_bar < MIN_PIXELS:
beta = math.sqrt(MIN_PIXELS/(h*w))
h_bar = math.ceil(h*beta/factor)*factor
w_bar = math.ceil(w*beta/factor)*factor
return h_bar, w_bar
def _evict_ollama_models(): def load_model():
"""Libere les modeles Ollama de la VRAM avant de charger UI-TARS.""" global _model, _processor
try: if _model is not None:
import requests
try:
ps_resp = requests.get('http://localhost:11434/api/ps', timeout=3)
if ps_resp.status_code == 200:
loaded = ps_resp.json().get('models', [])
model_names = [m.get('name', '') for m in loaded if m.get('name')]
else:
model_names = []
except Exception:
model_names = []
if not model_names:
print("[grounding-server] Aucun modele Ollama en VRAM")
return
for model_name in model_names:
try:
requests.post(
'http://localhost:11434/api/generate',
json={'model': model_name, 'keep_alive': '0'},
timeout=5,
)
print(f"[grounding-server] Ollama: eviction de '{model_name}'")
except Exception:
pass
time.sleep(1.0)
print("[grounding-server] Modeles Ollama liberes")
except ImportError:
print("[grounding-server] requests non dispo, skip eviction Ollama")
def _load_model():
"""Charge le modele de grounding en 4-bit NF4."""
global _model, _processor, _model_loaded
if _model_loaded:
return return
print("=" * 60)
print(f"[grounding-server] Chargement de {MODEL_ID}")
print("=" * 60)
if not torch.cuda.is_available():
raise RuntimeError("CUDA non disponible — le serveur de grounding necessite un GPU")
# Liberer la VRAM Ollama
_evict_ollama_models()
torch.cuda.empty_cache()
gc.collect()
from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor, BitsAndBytesConfig from transformers import Qwen2_5_VLForConditionalGeneration, AutoProcessor, BitsAndBytesConfig
torch.cuda.empty_cache(); gc.collect()
print(f"[grounding] Chargement {MODEL_ID}...")
bnb = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4",
bnb_4bit_compute_dtype=torch.bfloat16, bnb_4bit_use_double_quant=True)
_model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
MODEL_ID, quantization_config=bnb, device_map="auto")
_model.eval()
_processor = AutoProcessor.from_pretrained(MODEL_ID, min_pixels=MIN_PIXELS, max_pixels=MAX_PIXELS, padding_side="left")
print(f"[grounding] Prêt — VRAM: {torch.cuda.memory_allocated()/1e9:.2f}GB")
bnb_config = BitsAndBytesConfig( @app.route('/health')
load_in_4bit=True, def health():
bnb_4bit_quant_type="nf4", return jsonify({"status": "ok", "model": MODEL_ID, "model_loaded": _model is not None,
bnb_4bit_compute_dtype=torch.bfloat16, "cuda_available": torch.cuda.is_available(),
bnb_4bit_use_double_quant=True, "vram_allocated_gb": round(torch.cuda.memory_allocated()/1e9, 2)})
)
@app.route('/ground', methods=['POST'])
def ground():
if _model is None:
return jsonify({"error": "Modèle pas chargé"}), 503
from qwen_vl_utils import process_vision_info
data = request.json
target = data.get('target_text', '')
desc = data.get('target_description', '')
label = f"{target}{desc}" if desc else target
if not label.strip():
return jsonify({"error": "target_text requis"}), 400
# Image
if data.get('image_b64'):
raw = data['image_b64'].split(',')[1] if ',' in data['image_b64'] else data['image_b64']
img = Image.open(io.BytesIO(base64.b64decode(raw))).convert('RGB')
else:
import mss
with mss.mss() as sct:
grab = sct.grab(sct.monitors[0])
img = Image.frombytes('RGB', grab.size, grab.bgra, 'raw', 'BGRX')
W, H = img.size
rH, rW = _smart_resize(H, W)
user_text = f'The screen\'s resolution is {rW}x{rH}.\nLocate the UI element(s) for "{label}", output the coordinates using JSON format: [{{"point_2d": [x, y]}}, ...]'
system = "You FIRST think about the reasoning process as an internal monologue and then provide the final answer.\nThe reasoning process MUST BE enclosed within <think> </think> tags."
messages = [{"role": "system", "content": system},
{"role": "user", "content": [{"type": "image", "image": img}, {"type": "text", "text": user_text}]}]
text = _processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
image_inputs, video_inputs = process_vision_info(messages)
inputs = _processor(text=[text], images=image_inputs, videos=video_inputs, padding=True, return_tensors="pt").to(_model.device)
t0 = time.time() t0 = time.time()
_model = Qwen2_5_VLForConditionalGeneration.from_pretrained( with torch.no_grad():
MODEL_ID, gen = _model.generate(**inputs, max_new_tokens=512)
quantization_config=bnb_config, infer_ms = (time.time()-t0)*1000
device_map="auto",
)
_model.eval()
_processor = AutoProcessor.from_pretrained( trimmed = [o[len(i):] for i,o in zip(inputs.input_ids, gen)]
MODEL_ID, raw = _processor.batch_decode(trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False)[0].strip()
min_pixels=MIN_PIXELS, print(f"[grounding] '{label[:40]}'{raw[:100]} ({infer_ms:.0f}ms)")
max_pixels=MAX_PIXELS,
padding_side="left",
)
_model_loaded = True # Parser JSON point_2d
load_time = time.time() - t0 json_part = raw.split("</think>")[-1] if "</think>" in raw else raw
alloc = torch.cuda.memory_allocated() / 1024**3 json_part = json_part.replace("```json","").replace("```","").strip()
peak = torch.cuda.max_memory_allocated() / 1024**3 px, py = None, None
print(f"[grounding-server] Modele charge en {load_time:.1f}s | "
f"VRAM: {alloc:.2f} GB (peak: {peak:.2f} GB)")
def _capture_screen():
"""Capture l'ecran complet via mss. Retourne PIL Image ou None."""
try: try:
import mss as mss_lib parsed = json.loads(json_part)
from PIL import Image if isinstance(parsed, list) and len(parsed) > 0:
with mss_lib.mss() as sct: pt = parsed[0].get("point_2d", [])
mon = sct.monitors[0] if len(pt) >= 2:
grab = sct.grab(mon) px, py = int(pt[0]*W/rW), int(pt[1]*H/rH)
return Image.frombytes('RGB', grab.size, grab.bgra, 'raw', 'BGRX') except json.JSONDecodeError:
except Exception as e: m = re.search(r'"point_2d"\s*:\s*\[(\d+),\s*(\d+)\]', raw)
print(f"[grounding-server] Erreur capture ecran: {e}") if m:
return None px, py = int(int(m.group(1))*W/rW), int(int(m.group(2))*H/rH)
return jsonify({"x": px, "y": py, "method": "infigui", "confidence": 0.90 if px else 0.0,
"time_ms": round(infer_ms, 1), "raw_output": raw[:300]})
def _parse_coordinates(raw: str, orig_w: int, orig_h: int, if __name__ == '__main__':
resized_w: int, resized_h: int): load_model()
"""Parse les coordonnees du modele — identique a /tmp/test_uitars.py. app.run(host='0.0.0.0', port=8200, threaded=False)
Retourne (px, py, method_detail, confidence) ou None.
"""
cx, cy = None, None
# Format 1: <point>x y</point>
pm = re.search(r'<point>\s*(\d+)\s+(\d+)\s*</point>', raw)
if pm:
cx, cy = int(pm.group(1)), int(pm.group(2))
# Format 2: start_box='(x, y)'
if cx is None:
bm = re.search(r"start_box=\s*['\"]?\((\d+)\s*,\s*(\d+)\)", raw)
if bm:
cx, cy = int(bm.group(1)), int(bm.group(2))
# Format 3: fallback x, y
if cx is None:
fm = re.search(r'(\d+)\s*,\s*(\d+)', raw)
if fm:
cx, cy = int(fm.group(1)), int(fm.group(2))
if cx is None or cy is None:
return None
# Conversion : tester les 2 interpretations, garder la meilleure
# Methode A : coordonnees dans l'espace de l'image resizee
px_r = int(cx / resized_w * orig_w)
py_r = int(cy / resized_h * orig_h)
delta_r = ((px_r - orig_w / 2) ** 2 + (py_r - orig_h / 2) ** 2) ** 0.5
# Methode B : coordonnees 0-1000
px_1k = int(cx / 1000 * orig_w)
py_1k = int(cy / 1000 * orig_h)
delta_1k = ((px_1k - orig_w / 2) ** 2 + (py_1k - orig_h / 2) ** 2) ** 0.5
# Heuristique du script valide : si coords dans les limites du resize,
# les deux sont possibles. UI-TARS utilise l'espace resize en natif.
if cx <= resized_w and cy <= resized_h:
in_screen_r = (0 <= px_r <= orig_w and 0 <= py_r <= orig_h)
in_screen_1k = (0 <= px_1k <= orig_w and 0 <= py_1k <= orig_h)
if in_screen_r and in_screen_1k:
px, py = px_r, py_r
method_detail = "resized"
elif in_screen_r:
px, py = px_r, py_r
method_detail = "resized"
else:
px, py = px_1k, py_1k
method_detail = "0-1000"
else:
px, py = px_1k, py_1k
method_detail = "0-1000"
confidence = 0.85 if ("start_box" in raw or "<point>" in raw) else 0.70
print(f"[grounding-server] model=({cx},{cy}) -> pixel=({px},{py}) "
f"[{method_detail}] resized={resized_w}x{resized_h} orig={orig_w}x{orig_h}")
return px, py, method_detail, confidence
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
app = FastAPI(title="RPA Vision Grounding Server", version="1.0.0")
class GroundRequest(BaseModel):
target_text: str = ""
target_description: str = ""
image_b64: str = ""
class GroundResponse(BaseModel):
x: Optional[int] = None
y: Optional[int] = None
method: str = "ui_tars"
confidence: float = 0.85
time_ms: float = 0.0
raw_output: str = ""
@app.get("/health")
def health():
return {
"status": "ok" if _model_loaded else "loading",
"model": MODEL_ID,
"model_loaded": _model_loaded,
"cuda_available": torch.cuda.is_available(),
"vram_allocated_gb": round(torch.cuda.memory_allocated() / 1024**3, 2) if torch.cuda.is_available() else 0,
}
@app.post("/ground", response_model=GroundResponse)
def ground(req: GroundRequest):
if not _model_loaded:
raise HTTPException(status_code=503, detail="Modele pas encore charge")
from PIL import Image
from qwen_vl_utils import process_vision_info
# Construire la description de la cible
parts = []
if req.target_text:
parts.append(req.target_text)
if req.target_description:
parts.append(req.target_description)
if not parts:
raise HTTPException(status_code=400, detail="target_text ou target_description requis")
target_label = ''.join(parts)
# Obtenir l'image (fournie en b64 ou capture ecran)
if req.image_b64:
try:
raw_b64 = req.image_b64.split(',')[1] if ',' in req.image_b64 else req.image_b64
img_data = base64.b64decode(raw_b64)
screen_pil = Image.open(io.BytesIO(img_data)).convert('RGB')
except Exception as e:
raise HTTPException(status_code=400, detail=f"Erreur decodage image: {e}")
else:
screen_pil = _capture_screen()
if screen_pil is None:
raise HTTPException(status_code=500, detail="Capture ecran echouee")
W, H = screen_pil.size
rH, rW = _smart_resize(H, W, min_pixels=MIN_PIXELS, max_pixels=MAX_PIXELS)
try:
import json as _json
# Prompt officiel InfiGUI-G1-3B (doc HuggingFace)
user_text = (
f'The screen\'s resolution is {rW}x{rH}.\n'
f'Locate the UI element(s) for "{target_label}", '
f'output the coordinates using JSON format: '
f'[{{"point_2d": [x, y]}}, ...]'
)
messages = [
{"role": "system", "content": _SYSTEM_PROMPT},
{"role": "user", "content": [
{"type": "image", "image": screen_pil},
{"type": "text", "text": user_text},
]},
]
text = _processor.apply_chat_template(
messages, tokenize=False, add_generation_prompt=True
)
image_inputs, video_inputs = process_vision_info(messages)
inputs = _processor(
text=[text],
images=image_inputs,
videos=video_inputs,
padding=True,
return_tensors="pt",
).to(_model.device)
# Inference
t0 = time.time()
with torch.no_grad():
gen = _model.generate(**inputs, max_new_tokens=512)
infer_ms = (time.time() - t0) * 1000
# Decoder
trimmed = [o[len(i):] for i, o in zip(inputs.input_ids, gen)]
raw = _processor.batch_decode(
trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
)[0].strip()
print(f"[grounding-server] '{target_label}' -> raw='{raw[:150]}' ({infer_ms:.0f}ms)")
# Parser le JSON InfiGUI : split sur </think>, extraire point_2d
px, py = None, None
json_part = raw.split("</think>")[-1] if "</think>" in raw else raw
json_part = json_part.replace("```json", "").replace("```", "").strip()
try:
data = _json.loads(json_part)
if isinstance(data, list) and len(data) > 0:
pt = data[0].get("point_2d", [])
if len(pt) >= 2:
# Coordonnées en pixels resizés → convertir en pixels originaux
px = int(pt[0] * W / rW)
py = int(pt[1] * H / rH)
except _json.JSONDecodeError:
# Fallback regex
m = re.search(r'"point_2d"\s*:\s*\[(\d+),\s*(\d+)\]', raw)
if m:
px = int(int(m.group(1)) * W / rW)
py = int(int(m.group(2)) * H / rH)
if px is None:
# Détection réponses négatives
_raw_lower = raw.lower()
for _neg in ["don't see", "cannot find", "not visible", "not found",
"unable to find", "unable to locate", "does not appear"]:
if _neg in _raw_lower:
print(f"[grounding-server] NÉGATIF: '{_neg}'")
return GroundResponse(x=None, y=None, method="infigui",
confidence=0.0, time_ms=round(infer_ms, 1),
raw_output=raw[:300])
print(f"[grounding-server] Coordonnées non parsées: {json_part[:100]}")
return GroundResponse(x=None, y=None, method="infigui",
confidence=0.0, time_ms=round(infer_ms, 1),
raw_output=raw[:300])
confidence = 0.90
print(f"[grounding-server] Résultat: ({px}, {py}) conf={confidence:.2f} ({infer_ms:.0f}ms)")
return GroundResponse(
x=px, y=py, method="infigui",
confidence=confidence, time_ms=round(infer_ms, 1),
raw_output=raw[:300],
)
except Exception as e:
print(f"[grounding-server] ERREUR: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ---------------------------------------------------------------------------
# Entrypoint
# ---------------------------------------------------------------------------
@app.on_event("startup")
async def startup_event():
"""Charge le modele au demarrage du serveur."""
print(f"[grounding-server] Demarrage sur port {PORT}...")
_load_model()
print(f"[grounding-server] Pret a recevoir des requetes sur http://localhost:{PORT}")
if __name__ == "__main__":
uvicorn.run(
"core.grounding.server:app",
host="0.0.0.0",
port=PORT,
log_level="info",
workers=1, # 1 seul worker (1 seul GPU)
)

core/grounding/ui_tars_grounder.py

[Removed: the previous HTTP client (~200 lines) — SERVER_URL pointing at http://localhost:8200, _check_server() with its 30-second health-check cache, the load()/unload() compatibility shims, base64 PNG upload of the screenshot, and the handling of HTTP 422/503, timeouts and connection errors.]
[Added: the file is rewritten as the file-based client below:]

"""
core/grounding/ui_tars_grounder.py — Grounding via worker InfiGUI indépendant
Communication par fichiers :
- Écrit la requête dans /tmp/infigui_request.json
- Le worker lit, infère, écrit la réponse dans /tmp/infigui_response.json
- Le grounder lit la réponse
Le worker est un process indépendant lancé par start_grounding_worker.sh,
PAS un subprocess de Flask.
"""
from __future__ import annotations
import json
import os
import time
import threading
from typing import Optional
from core.grounding.target import GroundingResult

_instance: Optional[UITarsGrounder] = None
_instance_lock = threading.Lock()

REQUEST_FILE = "/tmp/infigui_request.json"
RESPONSE_FILE = "/tmp/infigui_response.json"
READY_FILE = "/tmp/infigui_ready"

class UITarsGrounder:
    """Grounding via worker InfiGUI indépendant — communication par fichiers."""

    def __init__(self):
        self._lock = threading.Lock()

    @classmethod
    def get_instance(cls) -> UITarsGrounder:
        global _instance
        if _instance is None:
            with _instance_lock:
                _instance = cls()
        return _instance

    @property
    def available(self) -> bool:
        return os.path.exists(READY_FILE)

    def ground(
        self,
        target_text: str = "",
        target_description: str = "",
        screen_pil=None,
    ) -> Optional[GroundingResult]:
        """Localise un élément UI via le worker InfiGUI."""
        if not self.available:
            print("[InfiGUI] Worker non démarré (pas de /tmp/infigui_ready)")
            return None
        t0 = time.time()
        try:
            with self._lock:
                # Sauver l'image si fournie
                image_path = ""
                if screen_pil is not None:
                    image_path = "/tmp/infigui_screen.png"
                    screen_pil.save(image_path)

                # Écrire la requête
                req = {
                    "target": target_text,
                    "description": target_description,
                    "image_path": image_path,
                    "timestamp": time.time(),
                }

                # Supprimer l'ancienne réponse
                if os.path.exists(RESPONSE_FILE):
                    os.unlink(RESPONSE_FILE)

                # Écrire la requête
                with open(REQUEST_FILE, "w") as f:
                    json.dump(req, f)

                # Attendre la réponse (max 30s)
                for _ in range(300):
                    if os.path.exists(RESPONSE_FILE):
                        time.sleep(0.05)  # Laisser le fichier se fermer
                        try:
                            with open(RESPONSE_FILE, "r") as f:
                                data = json.load(f)
                            os.unlink(RESPONSE_FILE)
                            break
                        except (json.JSONDecodeError, IOError):
                            continue
                    time.sleep(0.1)
                else:
                    print(f"⚠️ [InfiGUI] Timeout 30s — worker ne répond pas")
                    return None

            dt = (time.time() - t0) * 1000
            if data.get("x") is not None:
                print(f"🎯 [InfiGUI] ({data['x']}, {data['y']}) conf={data.get('confidence', 0):.2f} ({dt:.0f}ms)")
                return GroundingResult(
                    x=data["x"], y=data["y"],
                    method="infigui",
                    confidence=data.get("confidence", 0.90),
                    time_ms=dt,
                )
            else:
                print(f"⚠️ [InfiGUI] Pas trouvé ({dt:.0f}ms)")
                return None
        except Exception as e:
            print(f"⚠️ [InfiGUI] Erreur: {e}")
            return None

docs/CARTOGRAPHY.md

@@ -0,0 +1,233 @@
# Cartographie d'exécution — RPA Vision V3 (Léa)
> **Date** : 26 avril 2026
> **Objectif** : carte complète de ce qui est branché, ce qui ne l'est pas, et comment les données transitent.
> **Règle** : LIRE CE DOCUMENT AVANT TOUTE MODIFICATION DE CODE.
---
## 1. Point d'entrée : deux chemins disjoints
```
POST /api/v3/execute/start (execute.py:1528)
├── execution_mode = "verified" → run_workflow_verified() ← CHEMIN ORA
└── execution_mode = "basic"|"intelligent"|"debug" → execute_workflow_thread() ← CHEMIN LEGACY
```
**Il existe DEUX exécuteurs distincts** qui dupliquent le chargement des ancres, la boucle d'étapes, le grounding, la gestion d'erreurs. Ils ne partagent que `input_handler.py`.
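A condensed, hypothetical sketch of that dispatch (the handler and argument names are illustrative; the two target functions are the real ones named above):

```python
# Hypothetical condensation of the routing in execute.py:1528 — names illustrative.
def start_execution(workflow_id: str, execution_mode: str):
    if execution_mode == "verified":
        return run_workflow_verified(workflow_id)      # ORA path
    if execution_mode in ("basic", "intelligent", "debug"):
        return execute_workflow_thread(workflow_id)    # legacy path
    raise ValueError(f"unknown execution_mode: {execution_mode}")
```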
---
## 2. Chemin LEGACY (modes basic/intelligent/debug)
```
[API] POST /execute/start (mode=intelligent)
→ [execute.py:145] execute_workflow_thread()
→ [execute.py:160] Charge steps depuis DB
→ BOUCLE sur chaque step:
├─ RÉFLEXE PRÉ-ÉTAPE (modes intelligent/debug)
│ → [input_handler.py:79] check_screen_for_patterns()
│ → UIPatternLibrary.find_pattern(ocr_text) ← BRANCHÉ
│ → [input_handler.py:129] handle_detected_pattern()
│ → EasyOCR full screen + clic bouton ← BRANCHÉ
├─ CHARGEMENT ANCRE [execute.py:222-256]
│ params['visual_anchor'] = {
│ screenshot: base64 du crop,
│ bounding_box: {x, y, width, height},
│ target_text: anchor.target_text, ← PEUT ÊTRE VIDE ("")
│ description: anchor.ocr_description ← PEUT ÊTRE VIDE ("")
│ }
├─ execute_action(action_type, params) [execute.py:278]
│ │
│ ├─ ACTION = click_anchor [execute.py:862-1096]
│ │ │
│ │ ├─ MODE basic: coordonnées statiques (bbox centre)
│ │ │
│ │ └─ MODE intelligent/debug:
│ │ ├─ target_text = anchor.target_text || step.label
│ │ │ Si target_text == "click_anchor" et screenshot_base64:
│ │ │ → _describe_anchor_image() (VLM qwen2.5vl:3b) ← BRANCHÉ
│ │ │
│ │ ├─ MÉTHODE 1: Template matching (cv2) ← BRANCHÉ
│ │ ├─ MÉTHODE 2: CLIP matching (RF-DETR + CLIP) ← BRANCHÉ
│ │ ├─ MÉTHODE 3: OCR → UI-TARS → VLM ← BRANCHÉ
│ │ └─ ÉCHEC: self-healing interactif ← BRANCHÉ
│ │
│ ├─ ACTION = type_text → safe_type_text() ← BRANCHÉ
│ ├─ ACTION = wait → sleep + pattern check ← BRANCHÉ
│ ├─ ACTION = keyboard_shortcut → pyautogui.hotkey() ← BRANCHÉ
│ ├─ ACTION = ai_analyze_text → Ollama ← BRANCHÉ
│ ├─ ACTION = extract_text → docTR OCR ← BRANCHÉ
│ └─ ACTION = hover/scroll/focus → coords statiques ← PAS DE GROUNDING
```
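The anchor payload built at execute.py:222-256 can legitimately carry empty strings, which is what the VLM fallback later compensates for; an illustrative instance (values made up):

```python
# Illustrative visual_anchor payload — target_text and description may both be "".
visual_anchor = {
    "screenshot": "<base64 crop>",   # placeholder, not a real image
    "bounding_box": {"x": 412, "y": 318, "width": 96, "height": 28},
    "target_text": "",               # OCR produced nothing at capture time
    "description": "",               # VLM description also missing
}
```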
---
## 3. Chemin ORA (mode "verified")
```
[API] POST /execute/start (mode=verified)
→ [execute.py:1349] run_workflow_verified()
→ [execute.py:1380-1428] Charge steps + ancres (MÊME logique que legacy)
→ [execute.py:1433] ORALoop(verify_level='none', max_retries=2)
│ ^^^^^^^^^^^^^^^^^^^
│ VÉRIFICATION DÉSACTIVÉE EN DUR
→ [ORA:1478] ora.run_workflow(steps=ora_steps)
BOUCLE sur chaque step:
├─ [ORA:1258] OBSERVE: capture écran + pHash + titre fenêtre
├─ [ORA:1263] RÉFLEXE DIALOGUE (si pHash changé > 10)
│ → DialogHandler.handle_if_dialog(screenshot) ← BRANCHÉ
│ → EasyOCR full screen → mots-clés dialogues connus
│ → InfiGUI worker (/tmp/infigui_*)
│ → Fallback OCR clic
├─ [ORA:196] REASON: reason_workflow_step()
│ target_text = anchor.target_text || anchor.description
│ Si vide ou nom d'action → _describe_anchor_image() ← CORRIGÉ 26/04
│ Si encore vide → label (si pas un nom d'action)
├─ [ORA:1306] ACT → _act_click()
│ │
│ ├─ RPA_USE_FAST_PIPELINE=1 (défaut)
│ │ → FastSmartThinkPipeline
│ │ → FastDetector (RF-DETR 120ms + EasyOCR 192ms) ← BRANCHÉ
│ │ → SmartMatcher (texte+type+position+voisins <1ms) ← BRANCHÉ
│ │ → SignatureStore.lookup() (apprentissage) ← BRANCHÉ
│ │ → Score ≥ 0.90 → action directe ← BRANCHÉ
│ │ → Score 0.60-0.90 → ThinkArbiter
│ │ → UITarsGrounder → InfiGUI worker (/tmp) ← BRANCHÉ
│ │ → Score < 0.60 → ThinkArbiter seul ← BRANCHÉ
│ │ → ÉCHEC → _try_fallback()
│ │ → GroundingPipeline ← NON BRANCHÉ (jamais connecté)
│ │
│ ├─ FALLBACK template matching (cv2, >0.75) ← BRANCHÉ
│ ├─ FALLBACK OCR (_grounding_ocr) ← BRANCHÉ
│ └─ DERNIER RECOURS: coords statiques ← BRANCHÉ
├─ [ORA:1337] VÉRIFICATION TITRE (post-action)
│ → TitleVerifier → EasyOCR crop 45px ← BRANCHÉ
│ *** NE LIT RIEN EN VM (titre Windows dans le framebuffer) ← PROBLÈME
├─ [ORA:1358] VERIFY: verify(pre, post, decision)
│ *** DÉSACTIVÉ (verify_level='none') *** ← NON BRANCHÉ
└─ [ORA:1362] RECOVERY (5 stratégies)
*** JAMAIS ATTEINT *** ← NON BRANCHÉ
- _recover_element_not_found (wait+scroll+UI-TARS)
- _recover_overlay_blocking (pattern+Win+D)
- _recover_wrong_screen (Alt+Tab)
- _recover_no_effect (double-clic+décalage)
- _classify_error (4 types)
```
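The FAST→SMART→THINK thresholds from the diagram, as a hedged sketch (the real routing lives in fast_pipeline.py; the two callables stand in for the ThinkArbiter confirmation and the ThinkArbiter/InfiGUI grounding calls):

```python
from typing import Callable, Optional, Tuple

Point = Tuple[int, int]

def route(score: float, candidate: Point,
          confirm: Callable[[Point], Optional[Point]],
          ground: Callable[[], Optional[Point]]) -> Optional[Point]:
    if score >= 0.90:
        return candidate            # FAST: act on the SmartMatcher match directly
    if score >= 0.60:
        return confirm(candidate)   # grey zone: ask the arbiter to confirm
    return ground()                 # low score: ground from scratch (InfiGUI)
```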
---
## 4. Trace du champ `target_text`
```
CAPTURE (VWB CapturePanel → capture.py:201-263)
→ OCR sur crop élargi (docTR)
→ VLM qwen2.5vl:3b décrit le crop
→ Si les deux échouent → target_text = ""
→ Aucune erreur remontée au frontend
STOCKAGE (DB)
→ VisualAnchor.target_text (nullable) = "" si non renseigné
CHARGEMENT (execute.py:1400-1428)
→ SI anchor.target_text existe et non vide → injecté dans visual_anchor
→ SINON → la clé 'target_text' N'EXISTE PAS dans le dict
LEGACY (execute.py:893-907)
→ target_text = anchor.get('target_text', '')
→ SI vide ET c'est un nom d'action → _describe_anchor_image() ← COMPENSE
→ SINON → fallback sur step_label
ORA (observe_reason_act.py:217) — CORRIGÉ LE 26 AVRIL
→ target_text = anchor.target_text || anchor.description
→ SI vide ou nom d'action → _describe_anchor_image() ← AJOUTÉ
→ SINON → label (si pas un nom d'action)
```
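Consolidated, the post-fix ORA-side chain reads roughly as follows (condensed from the observe_reason_act.py:217 hunk; `_describe_anchor_image` is the real helper from input_handler.py):

```python
from core.execution.input_handler import _describe_anchor_image

def resolve_target_text(anchor: dict, label: str, action_type_names: set) -> str:
    text = anchor.get("target_text", "") or anchor.get("description", "")
    if not text or text in action_type_names:        # "" or "click_anchor"-style junk
        crop_b64 = anchor.get("screenshot", "")
        if crop_b64:
            try:
                desc = _describe_anchor_image(crop_b64)   # VLM describes the crop
                if desc and len(desc) > 2:
                    return desc
            except Exception:
                pass
        return label if label not in action_type_names else ""
    return text
```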
---
## 5. Fonctions existantes NON BRANCHÉES
| Fonction | Fichier | Raison |
|----------|---------|--------|
| `verify()` + `_classify_error()` + 5 `_recover_*()` | observe_reason_act.py | verify_level='none' en dur |
| `GroundingPipeline` (ancien) | pipeline.py | set_fallback_pipeline() jamais appelé |
| `TemplateMatcher` (classe centralisée) | template_matcher.py | Utilisé seulement par GroundingPipeline mort |
| `ShadowLearningHook` | shadow_learning_hook.py | Jamais importé dans aucun flux |
| `CognitiveContext` | working_memory.py | Mode instruction seulement |
| `VLM pre-check` | observe_reason_act.py | `if False:` en dur |
| hover/focus grounding | execute.py | Coords statiques uniquement |
| `grounding/server.py` (FastAPI :8200) | server.py | Crash CUDA, remplacé par worker fichiers |
---
## 6. Les 12 systèmes de grounding
| # | Système | Fichier | Branché ? |
|---|---------|---------|-----------|
| 1 | Template matching inline (legacy) | execute.py:914 | ✅ Legacy |
| 2 | Template matching inline (ORA) | ORA:1475 | ✅ ORA fallback |
| 3 | CLIP matching (IntelligentExecutor) | intelligent_executor.py | ✅ Legacy |
| 4 | OCR docTR (_grounding_ocr) | input_handler.py:430 | ✅ Legacy + ORA |
| 5 | UI-TARS Ollama (_grounding_ui_tars) | input_handler.py:513 | ✅ Legacy |
| 6 | VLM reasoning (_grounding_vlm) | input_handler.py:627 | ✅ Legacy seulement |
| 7 | FastDetector (RF-DETR + EasyOCR) | fast_detector.py | ✅ ORA |
| 8 | SmartMatcher | smart_matcher.py | ✅ ORA |
| 9 | ThinkArbiter → InfiGUI worker | think_arbiter.py + ui_tars_grounder.py | ✅ ORA |
| 10 | DialogHandler → InfiGUI | dialog_handler.py | ✅ ORA réflexe |
| 11 | GroundingPipeline (ancien) | pipeline.py | ❌ Jamais connecté |
| 12 | TemplateMatcher classe | template_matcher.py | ❌ Via GroundingPipeline mort |
---
## 7. Gestion des dialogues (2 systèmes parallèles)
| # | Système | Base de patterns | OCR | Clic | Utilisé par |
|---|---------|-----------------|-----|------|-------------|
| 1 | UIPatternLibrary + handle_detected_pattern | 28 patterns builtin | docTR/EasyOCR | OCR find bouton | Legacy |
| 2 | DialogHandler + KNOWN_DIALOGS | 15 titres connus | EasyOCR full screen | InfiGUI | ORA |
---
## 8. Budget VRAM (configuration actuelle)
| Composant | VRAM | Process |
|-----------|------|---------|
| InfiGUI-G1-3B (NF4) | 2.41 GB | Worker indépendant (/tmp) |
| RF-DETR Medium | 0.8 GB | Process Flask |
| EasyOCR | ~1 GB (GPU) | Process Flask |
| Ollama qwen2.5vl:3b (si appelé) | ~3.2 GB | Process Ollama |
| Chrome + système | ~1.3 GB | — |
| **Total max** | **~8.7 GB / 12 GB** | |
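Quick arithmetic check of the total row (values copied from the table above):

```python
# The "Total max" row is the straight sum of the components above.
vram_gb = {
    "InfiGUI-G1-3B (NF4)": 2.41,
    "RF-DETR Medium": 0.8,
    "EasyOCR (GPU)": 1.0,
    "Ollama qwen2.5vl:3b": 3.2,
    "Chrome + système": 1.3,
}
total = sum(vram_gb.values())      # 8.71 ≈ 8.7 GB
assert total < 12.0                # leaves ~3.3 GB of headroom on the 12 GB card
```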
---
## 9. Fichiers critiques par ordre d'importance
1. `core/execution/observe_reason_act.py` — boucle ORA, _act_click, reason, verify
2. `visual_workflow_builder/backend/api_v3/execute.py` — API, chargement ancres, legacy executor
3. `core/grounding/fast_pipeline.py` — pipeline FAST→SMART→THINK
4. `core/grounding/ui_tars_grounder.py` — client InfiGUI worker
5. `core/grounding/infigui_worker.py` — worker InfiGUI (process indépendant)
6. `core/execution/input_handler.py` — OCR, UI-TARS Ollama, safe_type_text, patterns
7. `core/grounding/dialog_handler.py` — gestion dialogues ORA
8. `core/grounding/fast_detector.py` — RF-DETR + EasyOCR
9. `core/grounding/smart_matcher.py` — matching contextuel
10. `core/knowledge/ui_patterns.py` — patterns réflexes
---
> **Dernière mise à jour** : 26 avril 2026
> **Prochaine action** : rebrancher verify + recovery, converger les 2 exécuteurs, nettoyer le code mort.