""" Module partagé de saisie texte et gestion des dialogues. Utilisé par les deux executors : - VWB executor (visual_workflow_builder/backend/api_v3/execute.py) - Core executor (core/execution/action_executor.py) Garantit le même comportement AZERTY/VM/Citrix partout. """ import logging import subprocess import shutil import time from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) try: import pyautogui PYAUTOGUI_AVAILABLE = True except ImportError: PYAUTOGUI_AVAILABLE = False try: import mss MSS_AVAILABLE = True except ImportError: MSS_AVAILABLE = False try: from PIL import Image as PILImage PIL_AVAILABLE = True except ImportError: PIL_AVAILABLE = False def safe_type_text(text: str): """Saisie de texte compatible VM/Citrix et claviers AZERTY/QWERTY. Priorité : 1. xdotool type avec refresh layout → traverse les VM spice/QEMU 2. Presse-papier (xclip) + Ctrl+V → fallback 3. pyautogui.write() → dernier recours """ if not text: return # Méthode 1 : xdotool type avec refresh du layout clavier if shutil.which('xdotool') and shutil.which('setxkbmap'): try: subprocess.run(['setxkbmap', 'fr'], timeout=2) subprocess.run( ['xdotool', 'type', '--delay', '0', '--clearmodifiers', '--', text], timeout=max(30, len(text) * 0.05), check=True ) logger.debug(f"Saisie via xdotool type ({len(text)} car.)") return except Exception as e: logger.debug(f"xdotool type échoué: {e}") # Méthode 2 : Presse-papier xclip = shutil.which('xclip') if xclip and PYAUTOGUI_AVAILABLE: try: p = subprocess.Popen( ['xclip', '-selection', 'clipboard'], stdin=subprocess.PIPE, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ) p.stdin.write(text.encode('utf-8')) p.stdin.close() time.sleep(0.2) pyautogui.hotkey('ctrl', 'v') time.sleep(0.3) logger.debug(f"Saisie via presse-papier ({len(text)} car.)") return except Exception as e: logger.debug(f"xclip échoué: {e}") # Méthode 3 : pyautogui if PYAUTOGUI_AVAILABLE: logger.warning("Saisie via pyautogui.write() (AZERTY non garanti)") pyautogui.write(text, interval=0.02) else: logger.warning(f"Aucune méthode de saisie disponible pour: {text[:50]}") def check_screen_for_patterns() -> Optional[Dict[str, Any]]: """Vérifie si l'écran contient un pattern UI connu (dialogue, popup). Capture l'écran, extrait le texte via OCR, et cherche un pattern dans la UIPatternLibrary. Returns: Dict avec le pattern trouvé, ou None. """ try: from core.knowledge.ui_patterns import UIPatternLibrary import mss from PIL import Image lib = UIPatternLibrary() with mss.mss() as sct: monitor = sct.monitors[0] screenshot = sct.grab(monitor) screen = Image.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX') try: # Essayer docTR d'abord (peut être importé depuis différents chemins) try: from services.ocr_service import ocr_extract_text except ImportError: from core.extraction.field_extractor import FieldExtractor extractor = FieldExtractor() ocr_extract_text = lambda img: extractor.extract_text_from_image(img) ocr_text = ocr_extract_text(screen) except ImportError: logger.debug("OCR non disponible pour pattern check") return None if not ocr_text or len(ocr_text) < 5: return None pattern = lib.find_pattern(ocr_text) if pattern and pattern['category'] in ('dialog', 'popup'): print(f"🧠 [PatternCheck] Détecté: '{pattern['pattern']}' → {pattern['action']} '{pattern['target']}'") return pattern return None except Exception as e: print(f"⚠️ [PatternCheck] Erreur: {e}") return None def handle_detected_pattern(pattern: Dict[str, Any]) -> bool: """Gère automatiquement un pattern UI détecté. Cherche le bouton cible via OCR (position réelle sur l'écran). 100% vision — zéro coordonnée hardcodée. Returns: True si le pattern a été géré avec succès. """ if not PYAUTOGUI_AVAILABLE: logger.warning("pyautogui non disponible — impossible de gérer le pattern") return False action = pattern.get('action') target = pattern.get('target', '') alternatives = pattern.get('alternatives', []) if action == 'click': candidates_labels = [target] + alternatives print(f"🔧 [Réflexe/handle] Recherche bouton parmi: {candidates_labels}") try: import mss import numpy as np from PIL import Image with mss.mss() as sct: monitor = sct.monitors[0] screenshot = sct.grab(monitor) screen = Image.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX') # EasyOCR (rapide, bonne qualité GUI) avec fallback docTR. # gpu=True : harmonisé avec dialog_handler.py et title_verifier.py. # Coût VRAM ~0.5 GB, sous le budget RTX 5070 (cf. deploy/VRAM_BUDGET.md). words = [] try: import easyocr _reader = easyocr.Reader(['fr', 'en'], gpu=True, verbose=False) results = _reader.readtext(np.array(screen)) for (bbox_pts, text, conf) in results: if not text or len(text.strip()) < 1: continue x1 = int(min(p[0] for p in bbox_pts)) y1 = int(min(p[1] for p in bbox_pts)) x2 = int(max(p[0] for p in bbox_pts)) y2 = int(max(p[1] for p in bbox_pts)) words.append({'text': text.strip(), 'bbox': [x1, y1, x2, y2]}) except ImportError: try: from services.ocr_service import ocr_extract_words words = ocr_extract_words(screen) or [] except ImportError: pass print(f"🔧 [Réflexe/handle] {len(words)} mots OCR détectés") # Collecter tous les matchs, prendre le plus bas (bouton = bas du dialogue) all_matches = [] for candidate in candidates_labels: candidate_lower = candidate.lower() for word in words: word_text = word['text'].lower() if len(word_text) < 2 or len(candidate_lower) < 2: continue # Match exact ou inclusion if word_text == candidate_lower or candidate_lower in word_text or word_text in candidate_lower: x1, y1, x2, y2 = word['bbox'] all_matches.append({ 'text': word['text'], 'x': int((x1 + x2) / 2), 'y': int((y1 + y2) / 2), 'candidate': candidate, }) if all_matches: best = max(all_matches, key=lambda m: m['y']) print(f"✅ [Réflexe/handle] Clic sur '{best['text']}' à ({best['x']}, {best['y']})") pyautogui.click(best['x'], best['y']) time.sleep(1.0) return True print(f"⚠️ [Réflexe/handle] Bouton '{target}' introuvable parmi {[w['text'] for w in words[:15]]}") return False except Exception as e: print(f"⚠️ [Réflexe/handle] Erreur: {e}") return False elif action == 'hotkey': keys = target.split('+') logger.info(f"Raccourci automatique: {target}") pyautogui.hotkey(*keys) time.sleep(0.5) return True return False def vlm_reason_about_screen(objective: str = "", context: str = "") -> Optional[Dict[str, Any]]: """Demande au VLM de raisonner sur l'écran actuel et proposer une action. Utilisé quand les réflexes (patterns) ne suffisent pas. Le VLM voit l'écran et décide quoi faire. Args: objective: Ce que Léa essaie de faire (ex: "cliquer sur Enregistrer") context: Contexte additionnel (ex: "un dialogue est apparu") Returns: Dict avec 'action', 'target', 'reasoning' ou None si le VLM ne peut pas aider. """ try: import mss import requests import json import base64 import io import os from PIL import Image with mss.mss() as sct: monitor = sct.monitors[0] screenshot = sct.grab(monitor) screen = Image.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX') buffer = io.BytesIO() screen.save(buffer, format='JPEG', quality=70) image_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8') prompt = f"""Analyse cet écran et dis-moi quoi faire. Objectif : {objective or "Interagir avec l'interface visible"} Contexte : {context or "Aucun contexte supplémentaire"} Réponds en JSON strict : {{ "action": "click" ou "type" ou "wait" ou "nothing", "target": "texte exact du bouton ou champ à cliquer", "reasoning": "explication courte de ton choix" }} Si tu vois un dialogue ou une popup, indique quel bouton cliquer. Si l'écran est normal sans action nécessaire, réponds action="nothing". Réponds UNIQUEMENT le JSON, pas d'explication.""" ollama_url = os.environ.get("OLLAMA_URL", "http://localhost:11434") model = os.environ.get("RPA_REASONING_MODEL", "qwen2.5vl:7b") response = requests.post( f"{ollama_url}/api/generate", json={ "model": model, "prompt": prompt, "images": [image_b64], "stream": False, "options": {"temperature": 0.1, "num_predict": 200} }, timeout=30 ) if response.status_code != 200: logger.warning(f"VLM reasoning failed: HTTP {response.status_code}") return None result = response.json() text = result.get('response', '').strip() import re match = re.search(r'\{[\s\S]*\}', text) if match: parsed = json.loads(match.group()) logger.info(f"VLM reasoning: {parsed.get('action')} '{parsed.get('target')}' — {parsed.get('reasoning', '')[:80]}") return parsed logger.debug(f"VLM response not parseable: {text[:100]}") return None except Exception as e: logger.debug(f"VLM reasoning failed: {e}") return None def find_element_on_screen( target_text: str, target_description: str = "", anchor_image_base64: Optional[str] = None, anchor_bbox: Optional[Dict] = None, monitor_idx: Optional[int] = None, ) -> Optional[Dict[str, Any]]: """ Cherche un élément sur l'écran en utilisant 3 méthodes en cascade. Niveau 1 — OCR (rapide, ~1s) : docTR pour trouver le texte exact Niveau 2 — UI-TARS grounding (~3s) : modèle GUI spécialisé Niveau 3 — VLM reasoning (~10s) : raisonnement + OCR de confirmation Args: target_text: Texte de l'élément à trouver (ex: "Demo", "Enregistrer") target_description: Description plus longue (ex: "le dossier Demo sur le bureau") anchor_image_base64: Image de référence de l'ancre (pour CLIP matching, réservé futur) anchor_bbox: Position originale de l'ancre (pour désambiguïser les matchs multiples) monitor_idx: Index logique 0..N-1 du monitor à scruter. None = composite legacy. Returns: {'x': int, 'y': int, 'method': str, 'confidence': float} ou None """ # Si le target_text est vide ou c'est juste le type d'action, # utiliser le VLM pour décrire l'image de l'ancre action_types = {'click_anchor', 'double_click_anchor', 'right_click_anchor', 'hover_anchor', 'focus_anchor', 'scroll_to_anchor'} has_useful_text = target_text and target_text not in action_types if not has_useful_text and anchor_image_base64: desc = _describe_anchor_image(anchor_image_base64) if desc: logger.info(f"[Grounding] Ancre décrite par VLM: '{desc}'") target_description = desc if not has_useful_text: target_text = desc if not target_text and not target_description: logger.debug("find_element_on_screen: ni target_text ni target_description fournis") return None # Propager monitor_idx au niveau OCR via anchor_bbox (sans muter l'argument original) if monitor_idx is not None and anchor_bbox is not None: anchor_bbox = dict(anchor_bbox) # copie pour ne pas muter l'argument anchor_bbox["monitor_idx"] = monitor_idx elif monitor_idx is not None: anchor_bbox = {"monitor_idx": monitor_idx} search_label = target_description or target_text logger.info(f"[Grounding] Recherche élément: '{search_label}' (cascade 3 niveaux)") # ─── Niveau 1 — OCR (rapide, ~1s) ─── result = _grounding_ocr(target_text, anchor_bbox=anchor_bbox) if result: return result # ─── Niveau 2 — UI-TARS grounding (~3s) ─── result = _grounding_ui_tars(target_text, target_description, monitor_idx=monitor_idx) if result: return result # ─── Niveau 3 — VLM reasoning (~10s) ─── result = _grounding_vlm(target_text, target_description, monitor_idx=monitor_idx) if result: return result logger.warning(f"[Grounding] ÉCHEC total pour '{search_label}' — aucune méthode n'a trouvé l'élément") return None def _describe_anchor_image(anchor_image_base64: str) -> Optional[str]: """Demande au VLM de décrire l'image de l'ancre en quelques mots. Utilisé quand le label est vide — le VLM regarde le crop de l'ancre et décrit ce qu'il voit ("folder icon named Demo", "Save button", etc.) pour que UI-TARS puisse chercher cet élément sur l'écran complet. """ try: import requests import os if ',' in anchor_image_base64: anchor_image_base64 = anchor_image_base64.split(',', 1)[1] ollama_url = os.environ.get("OLLAMA_URL", "http://localhost:11434") model = "qwen2.5vl:3b" logger.info(f"[Grounding] Description ancre via {model}...") response = requests.post( f"{ollama_url}/api/generate", json={ "model": model, "prompt": "Describe this UI element in 5 words maximum. Just the element name, nothing else. Example: 'folder icon named Demo' or 'Save button' or 'Chrome browser icon'", "images": [anchor_image_base64], "stream": False, "options": {"temperature": 0.1, "num_predict": 20} }, timeout=30 ) if response.status_code == 200: desc = response.json().get('response', '').strip().strip('"').strip("'") if desc and len(desc) > 2: return desc return None except Exception as e: logger.warning(f"[Grounding] Description ancre échouée: {e}") return None def _capture_screen(monitor_idx=None): """Capture l'écran et retourne (PIL.Image, width, height, offset_x, offset_y). Args: monitor_idx: Index logique 0..N-1 du monitor à capturer (cf. screeninfo). Si None : capture composite (mss.monitors[0]) — comportement legacy. Returns: (image, w, h, offset_x, offset_y). offset = (0,0) en mode composite. """ try: with mss.mss() as sct: if monitor_idx is None: # Comportement actuel : composite tous écrans monitor = sct.monitors[0] offset_x, offset_y = 0, 0 else: # mss skip monitors[0] (composite). Index logique 0 → mss.monitors[1]. mss_idx = int(monitor_idx) + 1 if mss_idx >= len(sct.monitors): logger.warning( "mss.monitors[%d] hors limites (n=%d) — fallback composite", mss_idx, len(sct.monitors), ) monitor = sct.monitors[0] offset_x, offset_y = 0, 0 else: monitor = sct.monitors[mss_idx] offset_x = int(monitor.get("left", 0)) offset_y = int(monitor.get("top", 0)) screenshot = sct.grab(monitor) screen = PILImage.frombytes('RGB', screenshot.size, screenshot.bgra, 'raw', 'BGRX') return screen, monitor['width'], monitor['height'], offset_x, offset_y except Exception as e: logger.debug(f"Capture écran échouée: {e}") return None, 0, 0, 0, 0 def _grounding_ocr(target_text: str, anchor_bbox: Optional[Dict] = None) -> Optional[Dict[str, Any]]: """Niveau 1 — Cherche le texte par OCR (docTR). ~1s. Collecte TOUS les matchs et choisit le plus pertinent : - Si anchor_bbox fourni → le plus proche de la position originale - Sinon → le plus proche du centre de l'écran (zone contenu) """ logger.debug(f"[Grounding/OCR] target='{target_text}' bbox={anchor_bbox}") if not target_text: return None try: monitor_idx_param = anchor_bbox.get("monitor_idx") if anchor_bbox else None screen, screen_w, screen_h, ox, oy = _capture_screen(monitor_idx=monitor_idx_param) if screen is None: return None try: from services.ocr_service import ocr_extract_words except ImportError: from core.extraction.field_extractor import FieldExtractor extractor = FieldExtractor() def ocr_extract_words(img): return extractor.extract_words_from_image(img) words = ocr_extract_words(screen) if not words: logger.debug("[Grounding/OCR] Aucun mot détecté") return None target_lower = target_text.lower() all_matches = [] # Collecter tous les matchs for word in words: word_lower = word['text'].lower() x1, y1, x2, y2 = word['bbox'] cx, cy = int((x1 + x2) / 2), int((y1 + y2) / 2) if word_lower == target_lower: all_matches.append({'text': word['text'], 'x': cx, 'y': cy, 'type': 'exact', 'conf': 0.95}) elif len(word_lower) >= 3 and len(target_lower) >= 3: if target_lower in word_lower or word_lower in target_lower: # Pénaliser les matchs partiels trop courts par rapport au target ratio = len(word_lower) / max(len(target_lower), 1) conf = 0.80 if ratio > 0.5 else 0.50 all_matches.append({'text': word['text'], 'x': cx, 'y': cy, 'type': 'partial', 'conf': conf}) # Matching lettre initiale manquante if not all_matches and len(target_lower) > 3: partial = target_lower[1:] for word in words: if partial in word['text'].lower(): x1, y1, x2, y2 = word['bbox'] all_matches.append({'text': word['text'], 'x': int((x1+x2)/2), 'y': int((y1+y2)/2), 'type': 'partial_cut', 'conf': 0.70}) if not all_matches: logger.debug(f"[Grounding/OCR] '{target_text}' non trouvé parmi {len(words)} mots") return None # Choisir le meilleur match if len(all_matches) == 1: best = all_matches[0] elif anchor_bbox: # Prendre le plus proche de la position originale de l'ancre orig_x = anchor_bbox.get('x', 0) + anchor_bbox.get('width', 0) / 2 orig_y = anchor_bbox.get('y', 0) + anchor_bbox.get('height', 0) / 2 best = min(all_matches, key=lambda m: ((m['x'] - orig_x)**2 + (m['y'] - orig_y)**2)) else: # Prendre le plus central (zone contenu, pas les barres de titre) center_x, center_y = screen_w / 2, screen_h / 2 best = min(all_matches, key=lambda m: ((m['x'] - center_x)**2 + (m['y'] - center_y)**2)) for m in all_matches: sel = " ← CHOISI" if m is best else "" logger.info(f" [OCR] Candidat: '{m['text']}' à ({m['x']}, {m['y']}) [{m['type']}]{sel}") return {'x': best['x'] + ox, 'y': best['y'] + oy, 'method': 'ocr', 'confidence': best['conf']} except Exception as e: logger.debug(f"[Grounding/OCR] Erreur: {e}") return None def _grounding_ui_tars(target_text: str, target_description: str = "", monitor_idx=None) -> Optional[Dict[str, Any]]: """Niveau 2 — UI-TARS grounding visuel (~3s).""" try: import requests import base64 import io import re import os screen, screen_w, screen_h, ox, oy = _capture_screen(monitor_idx=monitor_idx) if screen is None: return None # Encoder le screenshot en base64 buffer = io.BytesIO() screen.save(buffer, format='JPEG', quality=70) image_b64 = base64.b64encode(buffer.getvalue()).decode('utf-8') # Construire le prompt pour UI-TARS click_target = target_description or target_text prompt = f"click on {click_target}" ollama_url = os.environ.get("OLLAMA_URL", "http://localhost:11434") model = "0000/ui-tars-1.5-7b-q8_0:7b" logger.info(f"[Grounding/UI-TARS] Envoi à {model}: '{prompt}'") response = requests.post( f"{ollama_url}/api/generate", json={ "model": model, "prompt": prompt, "images": [image_b64], "stream": False, "options": {"temperature": 0.1, "num_predict": 50} }, timeout=30 ) if response.status_code != 200: logger.warning(f"[Grounding/UI-TARS] HTTP {response.status_code}") return None result = response.json() text = result.get('response', '').strip() logger.debug(f"[Grounding/UI-TARS] Réponse brute: {text[:200]}") # Parser les coordonnées de UI-TARS coords = _parse_ui_tars_coordinates(text, screen_w, screen_h) if coords: x, y = coords # Valider que les coordonnées sont dans l'écran if 0 <= x <= screen_w and 0 <= y <= screen_h: logger.info(f"[Grounding/UI-TARS] Grounding → ({x}, {y})") return {'x': x + ox, 'y': y + oy, 'method': 'ui_tars', 'confidence': 0.85} else: logger.warning(f"[Grounding/UI-TARS] Coordonnées hors écran: ({x}, {y}) pour {screen_w}x{screen_h}") return None logger.debug(f"[Grounding/UI-TARS] Pas de coordonnées parsées dans: {text[:100]}") return None except Exception as e: logger.debug(f"[Grounding/UI-TARS] Erreur: {e}") return None def _parse_ui_tars_coordinates(text: str, screen_w: int, screen_h: int) -> Optional[tuple]: """Parse les coordonnées retournées par UI-TARS. UI-TARS peut retourner : - Coordonnées normalisées (0-1000) : "click at (500, 300)" - Coordonnées en pixels : "click at (960, 540)" - Format (x, y) ou [x, y] ou x,y - Format "Action: click\nCoordinate: (500, 300)" ou "[500, 300]" Returns: (x_pixel, y_pixel) ou None """ import re # Chercher des patterns de coordonnées patterns = [ r'Coordinate:\s*\[?\(?\s*(\d+(?:\.\d+)?)\s*,\s*(\d+(?:\.\d+)?)\s*\)?\]?', r'click\s+(?:at\s+)?\[?\(?\s*(\d+(?:\.\d+)?)\s*,\s*(\d+(?:\.\d+)?)\s*\)?\]?', r'\(\s*(\d+(?:\.\d+)?)\s*,\s*(\d+(?:\.\d+)?)\s*\)', r'\[\s*(\d+(?:\.\d+)?)\s*,\s*(\d+(?:\.\d+)?)\s*\]', ] for pattern in patterns: match = re.search(pattern, text, re.IGNORECASE) if match: raw_x = float(match.group(1)) raw_y = float(match.group(2)) # UI-TARS utilise souvent des coordonnées normalisées 0-1000 if raw_x <= 1000 and raw_y <= 1000 and (raw_x > 1 or raw_y > 1): # Probablement normalisées sur 1000 x = int(raw_x * screen_w / 1000) y = int(raw_y * screen_h / 1000) elif raw_x <= 1.0 and raw_y <= 1.0: # Normalisées 0-1 x = int(raw_x * screen_w) y = int(raw_y * screen_h) else: # Pixels directs x = int(raw_x) y = int(raw_y) return (x, y) return None def _grounding_vlm(target_text: str, target_description: str = "", monitor_idx=None) -> Optional[Dict[str, Any]]: """Niveau 3 — VLM reasoning + confirmation OCR (~10s).""" try: search_label = target_description or target_text vlm_result = vlm_reason_about_screen( objective=f"Cliquer sur {search_label}", context=f"Je cherche l'élément '{target_text}' sur l'écran pour cliquer dessus" ) if not vlm_result: logger.debug("[Grounding/VLM] VLM n'a pas retourné de résultat") return None if vlm_result.get('action') != 'click' or not vlm_result.get('target'): logger.debug(f"[Grounding/VLM] VLM action={vlm_result.get('action')}, pas un clic") return None vlm_target = vlm_result['target'] logger.info(f"[Grounding/VLM] VLM suggère de cliquer sur: '{vlm_target}'") # Confirmation par OCR : chercher le target VLM sur l'écran screen, screen_w, screen_h, ox, oy = _capture_screen(monitor_idx=monitor_idx) if screen is None: return None try: try: from services.ocr_service import ocr_extract_words except ImportError: from core.extraction.field_extractor import FieldExtractor extractor = FieldExtractor() def ocr_extract_words(img): return extractor.extract_words_from_image(img) words = ocr_extract_words(screen) vlm_target_lower = vlm_target.lower() for word in words: if vlm_target_lower in word['text'].lower() or word['text'].lower() in vlm_target_lower: x1, y1, x2, y2 = word['bbox'] x = int((x1 + x2) / 2) y = int((y1 + y2) / 2) logger.info(f"[Grounding/VLM] Confirmé par OCR: '{word['text']}' à ({x}, {y})") return {'x': x + ox, 'y': y + oy, 'method': 'vlm', 'confidence': 0.75} logger.debug(f"[Grounding/VLM] Target VLM '{vlm_target}' non trouvé par OCR") return None except Exception as e: logger.debug(f"[Grounding/VLM] OCR de confirmation échoué: {e}") return None except Exception as e: logger.debug(f"[Grounding/VLM] Erreur: {e}") return None def post_execution_cleanup(execution_mode: str = 'debug'): """Vérifie l'écran après exécution et gère les dialogues restants. Appelé après la dernière étape d'un workflow pour laisser l'écran propre. """ if execution_mode not in ('intelligent', 'debug'): return logger.info("Vérification écran final...") time.sleep(1.0) for _ in range(3): detected = check_screen_for_patterns() if detected: logger.info(f"Dialogue résiduel détecté: {detected.get('pattern')}") handle_detected_pattern(detected) time.sleep(1.0) else: vlm_result = vlm_reason_about_screen( objective="Vérifier que l'écran est propre après l'exécution", context="Le workflow vient de se terminer" ) if vlm_result and vlm_result.get('action') in ('click', 'type'): logger.info(f"VLM post-workflow: {vlm_result.get('action')} '{vlm_result.get('target')}'") break