"""
|
|
Module partagé de saisie texte et gestion des dialogues.
|
|
|
|
Utilisé par les deux executors :
|
|
- VWB executor (visual_workflow_builder/backend/api_v3/execute.py)
|
|
- Core executor (core/execution/action_executor.py)
|
|
|
|
Garantit le même comportement AZERTY/VM/Citrix partout.
|
|
"""
|
|
|
|
import logging
|
|
import subprocess
|
|
import shutil
|
|
import time
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
import pyautogui
|
|
PYAUTOGUI_AVAILABLE = True
|
|
except ImportError:
|
|
PYAUTOGUI_AVAILABLE = False
|
|
|
|
|
|
def safe_type_text(text: str):
    """Type text in a way that is compatible with VM/Citrix sessions and AZERTY/QWERTY keyboards.

    Priority:
    1. xdotool type with a layout refresh → passes through spice/QEMU VMs
    2. Clipboard (xclip) + Ctrl+V → fallback
    3. pyautogui.write() → last resort
    """
    if not text:
        return

    # Method 1: xdotool type with a keyboard-layout refresh
    if shutil.which('xdotool') and shutil.which('setxkbmap'):
        try:
            subprocess.run(['setxkbmap', 'fr'], timeout=2)
            subprocess.run(
                ['xdotool', 'type', '--delay', '0', '--clearmodifiers', '--', text],
                timeout=max(30, len(text) * 0.05),
                check=True
            )
            logger.debug(f"Saisie via xdotool type ({len(text)} car.)")
            return
        except Exception as e:
            logger.debug(f"xdotool type échoué: {e}")

    # Method 2: clipboard + Ctrl+V
    xclip = shutil.which('xclip')
    if xclip and PYAUTOGUI_AVAILABLE:
        try:
            p = subprocess.Popen(
                ['xclip', '-selection', 'clipboard'],
                stdin=subprocess.PIPE,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL
            )
            p.stdin.write(text.encode('utf-8'))
            p.stdin.close()
            time.sleep(0.2)
            pyautogui.hotkey('ctrl', 'v')
            time.sleep(0.3)
            logger.debug(f"Saisie via presse-papier ({len(text)} car.)")
            return
        except Exception as e:
            logger.debug(f"xclip échoué: {e}")

    # Method 3: pyautogui
    if PYAUTOGUI_AVAILABLE:
        logger.warning("Saisie via pyautogui.write() (AZERTY non garanti)")
        pyautogui.write(text, interval=0.02)
    else:
        logger.warning(f"Aucune méthode de saisie disponible pour: {text[:50]}")


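# Usage sketch (illustrative only): callers pass the final string and the cascade
# above decides how it gets typed. Assumes an X11 session where either
# xdotool/setxkbmap or xclip + pyautogui is available.
#
#     safe_type_text("Dossier n°42 / été 2024")   # accented characters survive AZERTY remapping
#     safe_type_text("")                           # no-op, returns immediately
#

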
def check_screen_for_patterns() -> Optional[Dict[str, Any]]:
    """Check whether the screen shows a known UI pattern (dialog, popup).

    Captures the screen, extracts its text via OCR, and looks the text up
    in the UIPatternLibrary.

    Returns:
        Dict describing the matched pattern, or None.
    """
    try:
        from core.knowledge.ui_patterns import UIPatternLibrary
        import mss
        from PIL import Image

        lib = UIPatternLibrary()

        with mss.mss() as sct:
            monitor = sct.monitors[0]
            screenshot = sct.grab(monitor)
            screen = Image.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX')

        try:
            # Try docTR first (it can be imported from different paths)
            try:
                from services.ocr_service import ocr_extract_text
            except ImportError:
                from core.extraction.field_extractor import FieldExtractor
                extractor = FieldExtractor()

                def ocr_extract_text(img):
                    return extractor.extract_text_from_image(img)

            ocr_text = ocr_extract_text(screen)
        except ImportError:
            logger.debug("OCR non disponible pour pattern check")
            return None

        if not ocr_text or len(ocr_text) < 5:
            return None

        pattern = lib.find_pattern(ocr_text)
        if pattern and pattern['category'] in ('dialog', 'popup'):
            logger.info(f"Pattern UI détecté: {pattern['pattern']} → {pattern['action']} '{pattern['target']}'")
            return pattern

        return None

    except Exception as e:
        logger.debug(f"Pattern check échoué: {e}")
        return None


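# Shape of the dict consumed downstream (illustrative values; the real entries come
# from UIPatternLibrary, and only these keys are read by this module):
#
#     {
#         'pattern': 'confirm_save',        # library entry that matched the OCR text
#         'category': 'dialog',             # only 'dialog' / 'popup' are acted upon here
#         'action': 'click',                # or 'hotkey'
#         'target': 'Enregistrer',          # button label (or key combo for 'hotkey')
#         'alternatives': ['Save', 'OK'],   # fallback labels tried by handle_detected_pattern()
#     }
#

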
def handle_detected_pattern(pattern: Dict[str, Any]) -> bool:
    """Automatically handle a detected UI pattern.

    Locates the target button via OCR (its real position on screen).
    100% vision based, no hardcoded coordinates.

    Returns:
        True if the pattern was handled successfully.
    """
    if not PYAUTOGUI_AVAILABLE:
        logger.warning("pyautogui non disponible — impossible de gérer le pattern")
        return False

    action = pattern.get('action')
    target = pattern.get('target', '')
    alternatives = pattern.get('alternatives', [])

    if action == 'click':
        candidates_labels = [target] + alternatives

        try:
            import mss
            from PIL import Image

            # Import the OCR helper (try both paths)
            try:
                from services.ocr_service import ocr_extract_words
            except ImportError:
                from core.extraction.field_extractor import FieldExtractor
                extractor = FieldExtractor()

                def ocr_extract_words(img):
                    return extractor.extract_words_from_image(img)

            with mss.mss() as sct:
                monitor = sct.monitors[0]
                screenshot = sct.grab(monitor)
                screen = Image.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX')

            words = ocr_extract_words(screen)

            # Collect every match, then take the lowest one (buttons sit at the bottom of a dialog)
            all_matches = []

            for candidate in candidates_labels:
                candidate_lower = candidate.lower()
                for word in words:
                    word_text = word['text'].lower()
                    if len(word_text) < 2 or len(candidate_lower) < 2:
                        continue
                    if word_text == candidate_lower:
                        x1, y1, x2, y2 = word['bbox']
                        all_matches.append({
                            'text': word['text'],
                            'x': int((x1 + x2) / 2),
                            'y': int((y1 + y2) / 2),
                            'match_type': 'exact',
                        })

            # Partial search (underlined accelerator letter missing from the OCR text)
            if not all_matches:
                for candidate in candidates_labels:
                    if len(candidate) > 3:
                        partial = candidate[1:].lower()
                        for word in words:
                            if partial in word['text'].lower():
                                x1, y1, x2, y2 = word['bbox']
                                all_matches.append({
                                    'text': word['text'],
                                    'x': int((x1 + x2) / 2),
                                    'y': int((y1 + y2) / 2),
                                    'match_type': 'partial',
                                })

            if all_matches:
                best = max(all_matches, key=lambda m: m['y'])
                logger.info(f"Clic sur '{best['text']}' à ({best['x']}, {best['y']})")
                pyautogui.click(best['x'], best['y'])
                time.sleep(1.0)
                return True

            logger.info(f"Bouton '{target}' introuvable par OCR — appel VLM...")
            vlm_result = vlm_reason_about_screen(
                objective=f"Cliquer sur le bouton '{target}'",
                context=f"Un dialogue '{pattern.get('pattern')}' est détecté"
            )
            if vlm_result and vlm_result.get('action') == 'click' and vlm_result.get('target'):
                vlm_target = vlm_result['target']
                for word in words:
                    if vlm_target.lower() in word['text'].lower():
                        x1, y1, x2, y2 = word['bbox']
                        x = int((x1 + x2) / 2)
                        y = int((y1 + y2) / 2)
                        logger.info(f"VLM → clic sur '{word['text']}' à ({x}, {y})")
                        pyautogui.click(x, y)
                        time.sleep(1.0)
                        return True

            return False

        except Exception as e:
            logger.warning(f"OCR bouton échoué: {e}")
            return False

    elif action == 'hotkey':
        keys = target.split('+')
        logger.info(f"Raccourci automatique: {target}")
        pyautogui.hotkey(*keys)
        time.sleep(0.5)
        return True

    return False


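# Minimal usage sketch (hypothetical pattern dicts, for illustration only):
#
#     handle_detected_pattern({'action': 'hotkey', 'target': 'ctrl+s'})
#     # -> pyautogui.hotkey('ctrl', 's'), returns True
#
#     handle_detected_pattern({'action': 'click', 'target': 'Enregistrer',
#                              'alternatives': ['Save'], 'pattern': 'confirm_save'})
#     # -> OCR locates the lowest matching word on screen and clicks it;
#     #    falls back to vlm_reason_about_screen() when OCR finds nothing.
#

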
def vlm_reason_about_screen(objective: str = "", context: str = "") -> Optional[Dict[str, Any]]:
    """Ask the VLM to reason about the current screen and propose an action.

    Used when the reflexes (patterns) are not enough.
    The VLM looks at the screen and decides what to do.

    Args:
        objective: What Léa is trying to do (e.g. "cliquer sur Enregistrer")
        context: Additional context (e.g. "un dialogue est apparu")

    Returns:
        Dict with 'action', 'target', 'reasoning', or None if the VLM cannot help.
    """
    try:
        import mss
        import requests
        import json
        import base64
        import io
        import os
        from PIL import Image

        with mss.mss() as sct:
            monitor = sct.monitors[0]
            screenshot = sct.grab(monitor)
            screen = Image.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX')

        buffer = io.BytesIO()
        screen.save(buffer, format='JPEG', quality=70)
        image_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8')

        prompt = f"""Analyse cet écran et dis-moi quoi faire.

Objectif : {objective or "Interagir avec l'interface visible"}
Contexte : {context or "Aucun contexte supplémentaire"}

Réponds en JSON strict :
{{
"action": "click" ou "type" ou "wait" ou "nothing",
"target": "texte exact du bouton ou champ à cliquer",
"reasoning": "explication courte de ton choix"
}}

Si tu vois un dialogue ou une popup, indique quel bouton cliquer.
Si l'écran est normal sans action nécessaire, réponds action="nothing".
Réponds UNIQUEMENT le JSON, pas d'explication."""

        ollama_url = os.environ.get("OLLAMA_URL", "http://localhost:11434")
        model = os.environ.get("RPA_REASONING_MODEL", "qwen2.5vl:7b")

        response = requests.post(
            f"{ollama_url}/api/generate",
            json={
                "model": model,
                "prompt": prompt,
                "images": [image_b64],
                "stream": False,
                "options": {"temperature": 0.1, "num_predict": 200}
            },
            timeout=30
        )

        if response.status_code != 200:
            logger.warning(f"VLM reasoning failed: HTTP {response.status_code}")
            return None

        result = response.json()
        text = result.get('response', '').strip()

        import re
        match = re.search(r'\{[\s\S]*\}', text)
        if match:
            parsed = json.loads(match.group())
            logger.info(f"VLM reasoning: {parsed.get('action')} '{parsed.get('target')}' — {parsed.get('reasoning', '')[:80]}")
            return parsed

        logger.debug(f"VLM response not parseable: {text[:100]}")
        return None

    except Exception as e:
        logger.debug(f"VLM reasoning failed: {e}")
        return None


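# Configuration and expected reply (sketch). The endpoint and model are read from
# the environment; a successful call returns the small dict parsed from the model's
# JSON answer, None otherwise:
#
#     os.environ["OLLAMA_URL"]          -> Ollama endpoint (default "http://localhost:11434")
#     os.environ["RPA_REASONING_MODEL"] -> model name (default "qwen2.5vl:7b")
#
#     vlm_reason_about_screen(objective="Fermer la popup")
#     # -> {'action': 'click', 'target': 'OK', 'reasoning': '...'}   (illustrative)
#     # -> None on HTTP errors, timeouts, or unparseable output
#

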
def find_element_on_screen(
    target_text: str,
    target_description: str = "",
    anchor_image_base64: Optional[str] = None,
    anchor_bbox: Optional[Dict] = None,
) -> Optional[Dict[str, Any]]:
    """
    Find an element on screen using three methods in cascade.

    Level 1 - OCR (fast, ~1s): docTR locates the exact text
    Level 2 - UI-TARS grounding (~3s): specialised GUI model
    Level 3 - VLM reasoning (~10s): reasoning plus OCR confirmation

    Args:
        target_text: Text of the element to find (e.g. "Demo", "Enregistrer")
        target_description: Longer description (e.g. "the Demo folder on the desktop")
        anchor_image_base64: Reference image of the anchor (for CLIP matching, reserved for later)
        anchor_bbox: Original position of the anchor (to disambiguate multiple matches)

    Returns:
        {'x': int, 'y': int, 'method': str, 'confidence': float} or None
    """
    # If target_text is empty or is just the action type,
    # ask the VLM to describe the anchor image instead
    action_types = {'click_anchor', 'double_click_anchor', 'right_click_anchor',
                    'hover_anchor', 'focus_anchor', 'scroll_to_anchor'}
    has_useful_text = target_text and target_text not in action_types

    if not has_useful_text and anchor_image_base64:
        desc = _describe_anchor_image(anchor_image_base64)
        if desc:
            logger.info(f"[Grounding] Ancre décrite par VLM: '{desc}'")
            target_description = desc
            if not has_useful_text:
                target_text = desc

    if not target_text and not target_description:
        logger.debug("find_element_on_screen: ni target_text ni target_description fournis")
        return None

    search_label = target_description or target_text
    logger.info(f"[Grounding] Recherche élément: '{search_label}' (cascade 3 niveaux)")

    # ─── Level 1 - OCR (fast, ~1s) ───
    result = _grounding_ocr(target_text, anchor_bbox=anchor_bbox)
    if result:
        return result

    # ─── Level 2 - UI-TARS grounding (~3s) ───
    result = _grounding_ui_tars(target_text, target_description)
    if result:
        return result

    # ─── Level 3 - VLM reasoning (~10s) ───
    result = _grounding_vlm(target_text, target_description)
    if result:
        return result

    logger.warning(f"[Grounding] ÉCHEC total pour '{search_label}' — aucune méthode n'a trouvé l'élément")
    return None


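# Typical call site (sketch): ground a label, then act on the returned coordinates.
# 'method' tells which level of the cascade answered ('ocr', 'ui_tars' or 'vlm').
#
#     hit = find_element_on_screen("Enregistrer", target_description="bouton Enregistrer du dialogue")
#     if hit and PYAUTOGUI_AVAILABLE:
#         pyautogui.click(hit['x'], hit['y'])    # hit['confidence'] lies in [0.5, 0.95]
#

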
def _describe_anchor_image(anchor_image_base64: str) -> Optional[str]:
    """Ask the VLM to describe the anchor image in a few words.

    Used when the label is empty: the VLM looks at the anchor crop and
    describes what it sees ("folder icon named Demo", "Save button", etc.)
    so that UI-TARS can then look for that element on the full screen.
    """
    try:
        import requests
        import os

        if ',' in anchor_image_base64:
            anchor_image_base64 = anchor_image_base64.split(',', 1)[1]

        ollama_url = os.environ.get("OLLAMA_URL", "http://localhost:11434")
        model = "qwen2.5vl:3b"

        logger.info(f"[Grounding] Description ancre via {model}...")
        response = requests.post(
            f"{ollama_url}/api/generate",
            json={
                "model": model,
                "prompt": "Describe this UI element in 5 words maximum. Just the element name, nothing else. Example: 'folder icon named Demo' or 'Save button' or 'Chrome browser icon'",
                "images": [anchor_image_base64],
                "stream": False,
                "options": {"temperature": 0.1, "num_predict": 20}
            },
            timeout=30
        )

        if response.status_code == 200:
            desc = response.json().get('response', '').strip().strip('"').strip("'")
            if desc and len(desc) > 2:
                return desc

        return None

    except Exception as e:
        logger.warning(f"[Grounding] Description ancre échouée: {e}")
        return None


def _capture_screen():
    """Capture the screen and return (PIL.Image, width, height).

    Uses mss monitor 0, i.e. the virtual screen covering all monitors.
    """
    try:
        import mss
        from PIL import Image as PILImage

        with mss.mss() as sct:
            monitor = sct.monitors[0]
            screenshot = sct.grab(monitor)
            screen = PILImage.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX')
            return screen, monitor['width'], monitor['height']
    except Exception as e:
        logger.debug(f"Capture écran échouée: {e}")
        return None, 0, 0


def _grounding_ocr(target_text: str, anchor_bbox: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
    """Level 1: look for the text via OCR (docTR). ~1s.

    Collects ALL matches and picks the most relevant one:
    - if anchor_bbox is given, the one closest to the original position
    - otherwise, the one closest to the centre of the screen (content area)
    """
    if not target_text:
        return None

    try:
        screen, screen_w, screen_h = _capture_screen()
        if screen is None:
            return None

        try:
            from services.ocr_service import ocr_extract_words
        except ImportError:
            from core.extraction.field_extractor import FieldExtractor
            extractor = FieldExtractor()

            def ocr_extract_words(img):
                return extractor.extract_words_from_image(img)

        words = ocr_extract_words(screen)
        if not words:
            logger.debug("[Grounding/OCR] Aucun mot détecté")
            return None

        target_lower = target_text.lower()
        all_matches = []

        # Collect every match
        for word in words:
            word_lower = word['text'].lower()
            x1, y1, x2, y2 = word['bbox']
            cx, cy = int((x1 + x2) / 2), int((y1 + y2) / 2)

            if word_lower == target_lower:
                all_matches.append({'text': word['text'], 'x': cx, 'y': cy, 'type': 'exact', 'conf': 0.95})
            elif len(word_lower) >= 3 and len(target_lower) >= 3:
                if target_lower in word_lower or word_lower in target_lower:
                    # Penalise partial matches that are much shorter than the target
                    ratio = len(word_lower) / max(len(target_lower), 1)
                    conf = 0.80 if ratio > 0.5 else 0.50
                    all_matches.append({'text': word['text'], 'x': cx, 'y': cy, 'type': 'partial', 'conf': conf})

        # Match with the leading (underlined accelerator) letter missing
        if not all_matches and len(target_lower) > 3:
            partial = target_lower[1:]
            for word in words:
                if partial in word['text'].lower():
                    x1, y1, x2, y2 = word['bbox']
                    all_matches.append({'text': word['text'], 'x': int((x1+x2)/2), 'y': int((y1+y2)/2), 'type': 'partial_cut', 'conf': 0.70})

        if not all_matches:
            logger.debug(f"[Grounding/OCR] '{target_text}' non trouvé parmi {len(words)} mots")
            return None

        # Pick the best match
        if len(all_matches) == 1:
            best = all_matches[0]
        elif anchor_bbox:
            # Take the match closest to the anchor's original position
            orig_x = anchor_bbox.get('x', 0) + anchor_bbox.get('width', 0) / 2
            orig_y = anchor_bbox.get('y', 0) + anchor_bbox.get('height', 0) / 2
            best = min(all_matches, key=lambda m: ((m['x'] - orig_x)**2 + (m['y'] - orig_y)**2))
        else:
            # Take the most central match (content area, not title bars)
            center_x, center_y = screen_w / 2, screen_h / 2
            best = min(all_matches, key=lambda m: ((m['x'] - center_x)**2 + (m['y'] - center_y)**2))

        for m in all_matches:
            sel = " ← CHOISI" if m is best else ""
            logger.info(f" [OCR] Candidat: '{m['text']}' à ({m['x']}, {m['y']}) [{m['type']}]{sel}")

        return {'x': best['x'], 'y': best['y'], 'method': 'ocr', 'confidence': best['conf']}

    except Exception as e:
        logger.debug(f"[Grounding/OCR] Erreur: {e}")
        return None


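# Worked example of the match selection above (illustrative numbers): if "Demo" is
# found twice, once in a title bar at (400, 12) and once on the desktop at (300, 520):
#   - with anchor_bbox={'x': 280, 'y': 500, 'width': 60, 'height': 40}, the original
#     centre is (310, 520), so the desktop match wins;
#   - without anchor_bbox, the match closest to the screen centre wins.
#

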
def _grounding_ui_tars(target_text: str, target_description: str = "") -> Optional[Dict[str, Any]]:
    """Level 2: UI-TARS visual grounding (~3s)."""
    try:
        import requests
        import base64
        import io
        import os

        screen, screen_w, screen_h = _capture_screen()
        if screen is None:
            return None

        # Encode the screenshot as base64
        buffer = io.BytesIO()
        screen.save(buffer, format='JPEG', quality=70)
        image_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8')

        # Build the UI-TARS prompt
        click_target = target_description or target_text
        prompt = f"click on {click_target}"

        ollama_url = os.environ.get("OLLAMA_URL", "http://localhost:11434")
        model = "0000/ui-tars-1.5-7b-q8_0:7b"

        logger.info(f"[Grounding/UI-TARS] Envoi à {model}: '{prompt}'")

        response = requests.post(
            f"{ollama_url}/api/generate",
            json={
                "model": model,
                "prompt": prompt,
                "images": [image_b64],
                "stream": False,
                "options": {"temperature": 0.1, "num_predict": 50}
            },
            timeout=30
        )

        if response.status_code != 200:
            logger.warning(f"[Grounding/UI-TARS] HTTP {response.status_code}")
            return None

        result = response.json()
        text = result.get('response', '').strip()
        logger.debug(f"[Grounding/UI-TARS] Réponse brute: {text[:200]}")

        # Parse the coordinates returned by UI-TARS
        coords = _parse_ui_tars_coordinates(text, screen_w, screen_h)
        if coords:
            x, y = coords
            # Check that the coordinates fall inside the screen
            if 0 <= x <= screen_w and 0 <= y <= screen_h:
                logger.info(f"[Grounding/UI-TARS] Grounding → ({x}, {y})")
                return {'x': x, 'y': y, 'method': 'ui_tars', 'confidence': 0.85}
            else:
                logger.warning(f"[Grounding/UI-TARS] Coordonnées hors écran: ({x}, {y}) pour {screen_w}x{screen_h}")
                return None

        logger.debug(f"[Grounding/UI-TARS] Pas de coordonnées parsées dans: {text[:100]}")
        return None

    except Exception as e:
        logger.debug(f"[Grounding/UI-TARS] Erreur: {e}")
        return None


def _parse_ui_tars_coordinates(text: str, screen_w: int, screen_h: int) -> Optional[tuple]:
    """Parse the coordinates returned by UI-TARS.

    UI-TARS may return:
    - normalised coordinates (0-1000): "click at (500, 300)"
    - pixel coordinates: "click at (960, 540)"
    - the forms (x, y), [x, y] or x,y
    - the form "Action: click\nCoordinate: (500, 300)" or "[500, 300]"

    Returns:
        (x_pixel, y_pixel) or None
    """
    import re

    # Look for coordinate patterns
    patterns = [
        r'Coordinate:\s*\[?\(?\s*(\d+(?:\.\d+)?)\s*,\s*(\d+(?:\.\d+)?)\s*\)?\]?',
        r'click\s+(?:at\s+)?\[?\(?\s*(\d+(?:\.\d+)?)\s*,\s*(\d+(?:\.\d+)?)\s*\)?\]?',
        r'\(\s*(\d+(?:\.\d+)?)\s*,\s*(\d+(?:\.\d+)?)\s*\)',
        r'\[\s*(\d+(?:\.\d+)?)\s*,\s*(\d+(?:\.\d+)?)\s*\]',
    ]

    for pattern in patterns:
        match = re.search(pattern, text, re.IGNORECASE)
        if match:
            raw_x = float(match.group(1))
            raw_y = float(match.group(2))

            # UI-TARS usually emits coordinates normalised to 0-1000
            if raw_x <= 1000 and raw_y <= 1000 and (raw_x > 1 or raw_y > 1):
                # Most likely normalised to 1000
                x = int(raw_x * screen_w / 1000)
                y = int(raw_y * screen_h / 1000)
            elif raw_x <= 1.0 and raw_y <= 1.0:
                # Normalised to 0-1
                x = int(raw_x * screen_w)
                y = int(raw_y * screen_h)
            else:
                # Raw pixels
                x = int(raw_x)
                y = int(raw_y)

            return (x, y)

    return None


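# Worked examples on a 1920x1080 screen (model outputs hypothetical, arithmetic as coded above):
#
#     _parse_ui_tars_coordinates("Action: click\nCoordinate: (500, 300)", 1920, 1080)
#     # -> (960, 324)     treated as normalised 0-1000: 500*1920/1000, 300*1080/1000
#
#     _parse_ui_tars_coordinates("click at (1500, 800)", 1920, 1080)
#     # -> (1500, 800)    x exceeds 1000, so taken as raw pixels
#
#     _parse_ui_tars_coordinates("no coordinates here", 1920, 1080)
#     # -> None
#

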
def _grounding_vlm(target_text: str, target_description: str = "") -> Optional[Dict[str, Any]]:
    """Level 3: VLM reasoning plus OCR confirmation (~10s)."""
    try:
        search_label = target_description or target_text

        vlm_result = vlm_reason_about_screen(
            objective=f"Cliquer sur {search_label}",
            context=f"Je cherche l'élément '{target_text}' sur l'écran pour cliquer dessus"
        )

        if not vlm_result:
            logger.debug("[Grounding/VLM] VLM n'a pas retourné de résultat")
            return None

        if vlm_result.get('action') != 'click' or not vlm_result.get('target'):
            logger.debug(f"[Grounding/VLM] VLM action={vlm_result.get('action')}, pas un clic")
            return None

        vlm_target = vlm_result['target']
        logger.info(f"[Grounding/VLM] VLM suggère de cliquer sur: '{vlm_target}'")

        # OCR confirmation: look for the VLM's target on the screen
        screen, screen_w, screen_h = _capture_screen()
        if screen is None:
            return None

        try:
            try:
                from services.ocr_service import ocr_extract_words
            except ImportError:
                from core.extraction.field_extractor import FieldExtractor
                extractor = FieldExtractor()

                def ocr_extract_words(img):
                    return extractor.extract_words_from_image(img)

            words = ocr_extract_words(screen)

            vlm_target_lower = vlm_target.lower()
            for word in words:
                if vlm_target_lower in word['text'].lower() or word['text'].lower() in vlm_target_lower:
                    x1, y1, x2, y2 = word['bbox']
                    x = int((x1 + x2) / 2)
                    y = int((y1 + y2) / 2)
                    logger.info(f"[Grounding/VLM] Confirmé par OCR: '{word['text']}' à ({x}, {y})")
                    return {'x': x, 'y': y, 'method': 'vlm', 'confidence': 0.75}

            logger.debug(f"[Grounding/VLM] Target VLM '{vlm_target}' non trouvé par OCR")
            return None

        except Exception as e:
            logger.debug(f"[Grounding/VLM] OCR de confirmation échoué: {e}")
            return None

    except Exception as e:
        logger.debug(f"[Grounding/VLM] Erreur: {e}")
        return None


def post_execution_cleanup(execution_mode: str = 'debug'):
    """Check the screen after execution and handle any remaining dialogs.

    Called after the last step of a workflow so that the screen is left clean.
    """
    if execution_mode not in ('intelligent', 'debug'):
        return

    logger.info("Vérification écran final...")
    time.sleep(1.0)
    for _ in range(3):
        detected = check_screen_for_patterns()
        if detected:
            logger.info(f"Dialogue résiduel détecté: {detected.get('pattern')}")
            handle_detected_pattern(detected)
            time.sleep(1.0)
        else:
            vlm_result = vlm_reason_about_screen(
                objective="Vérifier que l'écran est propre après l'exécution",
                context="Le workflow vient de se terminer"
            )
            if vlm_result and vlm_result.get('action') in ('click', 'type'):
                logger.info(f"VLM post-workflow: {vlm_result.get('action')} '{vlm_result.get('target')}'")
            break