# agent_v0/server_v1/replay_engine.py """ Replay Engine — Gestion des replays de workflows. Contient : - Setup environnement (préparation apps avant replay) - Validation des actions de replay (sécurité) - Conversion workflow → actions normalisées - Fonctions utilitaires de replay (session detection, state management, retry) - Pre-check écran par embedding CLIP - Détection popup Extrait de api_stream.py pour clarifier l'architecture. """ import base64 import io import json import logging import os import re import threading import time import uuid from collections import defaultdict from pathlib import Path from typing import Any, Dict, List, Optional logger = logging.getLogger("api_stream") # ========================================================================= # Validation des actions de replay (sécurité HIGH) # ========================================================================= _ALLOWED_ACTION_TYPES = { "click", "type", "key_combo", "scroll", "wait", "file_open", "file_save", "file_close", "file_new", "file_dialog", "double_click", "right_click", "drag", "verify_screen", # Replay hybride : vérification visuelle entre groupes "pause_for_human", # Pause supervisée explicite (interceptée par /replay/next) "extract_text", # OCR serveur sur dernier heartbeat → variable workflow "extract_table", # OCR serveur + filtre regex → liste structurée (boucle) "extract_dossier", # OCR grille structurée → dossier patient persisté (brique 3) "navigate", # Navigation visuelle → coords login/recherche (brique navigation) "extract_text_scroll", # Marker côté graphe — expansé en sous-actions par _edge_to_normalized_actions "_concat_text_vars", # Action serveur interne (générée par expansion extract_text_scroll) "t2a_decision", # Analyse LLM facturation T2A → variable workflow "llm_generate", # Génération texte libre côté serveur → variable workflow "paste_and_execute", # Bypass NoMachine : ydotool Ctrl+V+Ctrl+Enter dans VM via SSH } # Types d'actions exécutées CÔTÉ SERVEUR (jamais transmises à l'Agent V1). # Le pipeline /replay/next les traite en boucle interne et passe à l'action # suivante jusqu'à trouver une action visuelle (à transmettre au client). _SERVER_SIDE_ACTION_TYPES = { "extract_text", "extract_table", "extract_dossier", "navigate", "t2a_decision", "llm_generate", "_concat_text_vars", "paste_and_execute", } # Pause par défaut entre Ctrl+End/Home et la capture suivante (ms). # Configurable par step via parameters.scroll_pause_ms ; default ici. SCROLL_PAUSE_MS = 500 _MAX_ACTION_TEXT_LENGTH = 10000 _MAX_KEYS_PER_COMBO = 10 # Touches autorisées dans les key_combo (modificateurs + touches spéciales + caractères simples) _KNOWN_KEY_NAMES = { "enter", "return", "tab", "escape", "esc", "backspace", "delete", "space", "up", "down", "left", "right", "home", "end", "page_up", "page_down", "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12", "ctrl", "ctrl_l", "ctrl_r", "alt", "alt_l", "alt_r", "shift", "shift_l", "shift_r", "cmd", "win", "super", "super_l", "super_r", "windows", "meta", "insert", "print_screen", "caps_lock", "num_lock", } def _auto_launch_replay_after_finalize() -> bool: """Décide si ``/finalize`` doit proposer un replay direct immédiat. Patch 2026-05-23 (brief 0902 deferred-workflow) : le chemin produit cible est le workflow compilé par le worker VLM en arrière-plan, pas le replay direct depuis ``live_events.jsonl``. Le replay direct reste utile pour le smoke/debug — on l'active explicitement via la variable d'env ``RPA_AUTO_LAUNCH_REPLAY_AFTER_FINALIZE``. Default-deny : toute valeur autre que ``true``/``1``/``yes`` (case-insensitive, après strip) retourne ``False``. """ raw = os.environ.get("RPA_AUTO_LAUNCH_REPLAY_AFTER_FINALIZE", "") return raw.strip().lower() in {"true", "1", "yes"} def _validate_replay_action(action: dict) -> Optional[str]: """Valide une action de replay. Retourne un message d'erreur ou None si valide.""" action_type = action.get("type", "") # Vérifier le type d'action if action_type not in _ALLOWED_ACTION_TYPES: return f"Type d'action non autorisé : '{action_type}'. Autorisés : {sorted(_ALLOWED_ACTION_TYPES)}" # Vérifier la longueur du texte text = action.get("text", "") if isinstance(text, str) and len(text) > _MAX_ACTION_TEXT_LENGTH: return f"Texte trop long ({len(text)} > {_MAX_ACTION_TEXT_LENGTH} caractères)" # Vérifier les touches keys = action.get("keys", []) if isinstance(keys, list): if len(keys) > _MAX_KEYS_PER_COMBO: return f"Trop de touches ({len(keys)} > {_MAX_KEYS_PER_COMBO})" for key in keys: key_lower = str(key).lower() # Accepter les caractères simples (a-z, 0-9, ponctuation) et les noms connus if len(str(key)) == 1 or key_lower in _KNOWN_KEY_NAMES: continue return f"Touche inconnue : '{key}'" # Vérifier les coordonnées normalisées for coord_name in ("x_pct", "y_pct"): val = action.get(coord_name) if val is not None: try: val_f = float(val) if not (0.0 <= val_f <= 1.0): return f"Coordonnée {coord_name}={val_f} hors limites [0.0, 1.0]" except (TypeError, ValueError): return f"Coordonnée {coord_name} invalide : {val}" return None # Valide # ========================================================================= # Setup environnement — Préparation automatique avant le replay # ========================================================================= # Mapping des noms d'exécutables Windows courants vers la commande de lancement. # Utilisé comme fallback pour le texte de recherche dans le menu Démarrer. # Le format est : "processname.exe" (minuscule) -> commande shell _APP_LAUNCH_COMMANDS: Dict[str, str] = { "notepad.exe": "notepad", "explorer.exe": "explorer", "calc.exe": "calc", "mspaint.exe": "mspaint", "cmd.exe": "cmd", "powershell.exe": "powershell", "wordpad.exe": "wordpad", "charmap.exe": "charmap", "snippingtool.exe": "snippingtool", "taskmgr.exe": "taskmgr", "regedit.exe": "regedit", "mstsc.exe": "mstsc", "winword.exe": "winword", "excel.exe": "excel", "powerpnt.exe": "powerpnt", "outlook.exe": "outlook", "msedge.exe": "msedge", "chrome.exe": "chrome", "firefox.exe": "firefox", "code.exe": "code", } # Mapping des exécutables vers le nom visuel à chercher dans le menu Démarrer. # Contient le texte de recherche (souvent le nom français) et une description # pour le VLM afin d'identifier l'icône dans les résultats de recherche. # Format : "processname.exe" -> {"search_text": ..., "display_name": ..., "vlm_description": ...} _APP_VISUAL_SEARCH: Dict[str, Dict[str, str]] = { "notepad.exe": { "search_text": "Bloc-notes", "display_name": "Bloc-notes", "vlm_description": "L'application Bloc-notes (Notepad) dans les résultats de recherche", }, "calc.exe": { "search_text": "Calculatrice", "display_name": "Calculatrice", "vlm_description": "L'application Calculatrice dans les résultats de recherche", }, "mspaint.exe": { "search_text": "Paint", "display_name": "Paint", "vlm_description": "L'application Paint dans les résultats de recherche", }, "cmd.exe": { "search_text": "Invite de commandes", "display_name": "Invite de commandes", "vlm_description": "L'Invite de commandes (Command Prompt) dans les résultats", }, "powershell.exe": { "search_text": "PowerShell", "display_name": "PowerShell", "vlm_description": "Windows PowerShell dans les résultats de recherche", }, "wordpad.exe": { "search_text": "WordPad", "display_name": "WordPad", "vlm_description": "L'application WordPad dans les résultats de recherche", }, "winword.exe": { "search_text": "Word", "display_name": "Microsoft Word", "vlm_description": "Microsoft Word dans les résultats de recherche", }, "excel.exe": { "search_text": "Excel", "display_name": "Microsoft Excel", "vlm_description": "Microsoft Excel dans les résultats de recherche", }, "powerpnt.exe": { "search_text": "PowerPoint", "display_name": "Microsoft PowerPoint", "vlm_description": "Microsoft PowerPoint dans les résultats de recherche", }, "outlook.exe": { "search_text": "Outlook", "display_name": "Microsoft Outlook", "vlm_description": "Microsoft Outlook dans les résultats de recherche", }, "msedge.exe": { "search_text": "Edge", "display_name": "Microsoft Edge", "vlm_description": "Microsoft Edge dans les résultats de recherche", }, "chrome.exe": { "search_text": "Chrome", "display_name": "Google Chrome", "vlm_description": "Google Chrome dans les résultats de recherche", }, "firefox.exe": { "search_text": "Firefox", "display_name": "Mozilla Firefox", "vlm_description": "Mozilla Firefox dans les résultats de recherche", }, "code.exe": { "search_text": "Visual Studio Code", "display_name": "Visual Studio Code", "vlm_description": "Visual Studio Code dans les résultats de recherche", }, "taskmgr.exe": { "search_text": "Gestionnaire des tâches", "display_name": "Gestionnaire des tâches", "vlm_description": "Le Gestionnaire des tâches dans les résultats de recherche", }, "snippingtool.exe": { "search_text": "Outil Capture", "display_name": "Outil Capture d'écran", "vlm_description": "L'Outil Capture d'écran dans les résultats de recherche", }, "mstsc.exe": { "search_text": "Connexion Bureau à distance", "display_name": "Bureau à distance", "vlm_description": "La Connexion Bureau à distance dans les résultats", }, } # Applications Windows à ignorer pour le setup (processus système, agents, etc.) _SETUP_IGNORE_APPS = { "searchhost.exe", # Barre de recherche Windows "explorer.exe", # Explorer est toujours lancé (shell Windows) "pythonw.exe", # Agent Python (notre propre agent) "python.exe", # Idem "shellexperiencehost.exe", "startmenuexperiencehost.exe", "applicationframehost.exe", "systemsettings.exe", "textinputhost.exe", "runtimebroker.exe", } # Certaines applications Windows légères sont plus robustes à lancer via # `Win+R` + commande shell qu'à travers un setup 100% visuel Démarrer / # Rechercher. On active cette stratégie de façon ciblée pour les cas validés # en live afin d'éviter que la validation métier dépende d'un chemin Windows # fragile sans rapport avec le workflow testé. _SETUP_RUN_DIALOG_APPS = { "notepad.exe", } # Tokens de titres "neutres" = état initial d'une application fraîchement # lancée par le setup auto (Win+Démarrer → recherche → clic résultat). # Quand la session source contient un focus_change vers un de ces titres # peu après le premier focus app, le trim coupe jusqu'à ce focus pour # éliminer les clics intra-app redondants (bascule d'onglet, fermeture de # fenêtre précédente, etc.) que le setup auto rend inutiles. _NEUTRAL_TITLE_TOKENS = frozenset({ "sans titre", # Bloc-notes FR "untitled", # Notepad EN "document1", # Word "classeur1", # Excel FR "book1", # Excel EN "présentation1", # PowerPoint FR "presentation1", # PowerPoint EN }) # Nombre d'events bruts inspectés après le premier focus vers primary_app # pour rechercher un focus vers un titre neutre. Volontairement court # pour ne pas couper un workflow qui re-visite un titre neutre bien plus # tard (filet de sécurité). _TRIM_NEUTRAL_LOOKAHEAD = 15 def _is_neutral_window_title(window_title: str) -> bool: """Détecter si un titre de fenêtre correspond à l'état initial vide d'une application (fenêtre fraîchement ouverte par le setup auto). Exemples : ``Sans titre – Bloc-notes`` → True, ``http://foo.txt – Bloc-notes`` → False, ``Document1 - Word`` → True. """ if not window_title: return False title = str(window_title).strip().lower() for sep in (" – ", " - "): if sep in title: title = title.split(sep, 1)[0].strip() break return title.lstrip("*").strip() in _NEUTRAL_TITLE_TOKENS def _relative_position_labels(x: int, y: int, screen_w: int, screen_h: int) -> Dict[str, str]: """Décrire une position en termes relatifs sur l'écran.""" y_relative = "" x_relative = "" if screen_h > 0: y_relative = ( "en bas" if y / screen_h > 0.8 else "en haut" if y / screen_h < 0.2 else "au milieu" ) if screen_w > 0: x_relative = ( "à gauche" if x / screen_w < 0.3 else "à droite" if x / screen_w > 0.7 else "au centre" ) return { "x_relative": x_relative, "y_relative": y_relative, } def _extract_launch_result_target( raw_events: list, primary_app: str, ) -> Optional[Dict[str, Any]]: """Retrouver le vrai clic de lancement depuis SearchHost.exe. Cherche un clic dans la fenêtre de recherche Windows qui est suivi rapidement d'un focus vers l'application principale. Ce clic sert de meilleure cible pour le setup auto qu'une cible synthétique `display_name/app_icon`, trop fragile sur les résultats de recherche. """ primary_app_lower = (primary_app or "").lower() if not primary_app_lower: return None for idx, raw_evt in enumerate(raw_events): event_data = raw_evt.get("event", raw_evt) if event_data.get("type") != "mouse_click": continue window = event_data.get("window", {}) if not isinstance(window, dict): continue if window.get("app_name", "").lower() != "searchhost.exe": continue pos = event_data.get("pos") or [] if not isinstance(pos, list) or len(pos) != 2: continue screen_meta = event_data.get("screen_metadata", {}) screen_res = ( screen_meta.get("screen_resolution") or event_data.get("screen_resolution") or [] ) if not isinstance(screen_res, list) or len(screen_res) != 2: continue try: click_x = int(pos[0]) click_y = int(pos[1]) screen_w = int(screen_res[0]) screen_h = int(screen_res[1]) except (TypeError, ValueError): continue if screen_w <= 0 or screen_h <= 0: continue click_ts = event_data.get("timestamp") launched = False for follow_evt in raw_events[idx + 1: idx + 8]: follow_data = follow_evt.get("event", follow_evt) if follow_data.get("type") != "window_focus_change": continue to_info = follow_data.get("to", {}) if not isinstance(to_info, dict): continue if to_info.get("app_name", "").lower() != primary_app_lower: continue follow_ts = follow_data.get("timestamp") if ( isinstance(click_ts, (int, float)) and isinstance(follow_ts, (int, float)) and follow_ts - click_ts > 5.0 ): break launched = True break if not launched: continue pos_labels = _relative_position_labels(click_x, click_y, screen_w, screen_h) position_desc = " ".join( part for part in [pos_labels["y_relative"], pos_labels["x_relative"]] if part ) window_title = window.get("title", "") or "Rechercher" target: Dict[str, Any] = { "x_pct": round(click_x / screen_w, 6), "y_pct": round(click_y / screen_h, 6), "window_title": window_title, "expected_window_before": window_title, "original_position": pos_labels, "source_app": window.get("app_name", ""), } if position_desc: target["position_desc"] = position_desc window_capture = event_data.get("window_capture", {}) if isinstance(window_capture, dict): click_relative = window_capture.get("click_relative") window_size = window_capture.get("window_size") if ( isinstance(click_relative, list) and len(click_relative) == 2 and isinstance(window_size, list) and len(window_size) == 2 ): target["window_capture"] = { "click_relative": click_relative, "window_size": window_size, } return target return None def _extract_start_menu_target(raw_events: list) -> Optional[Dict[str, Any]]: """Retrouver le vrai clic sur Démarrer depuis les événements bruts.""" return _extract_start_menu_target_from_session(raw_events, session_dir=None) def _load_click_anchor_from_session( session_dir: Optional[str], screenshot_id: str, click_x: int, click_y: int, ) -> str: """Charger un crop 80x80 depuis le screenshot source stocké côté serveur.""" if not session_dir or not screenshot_id: return "" full_path = Path(session_dir) / "shots" / f"{screenshot_id}_full.png" if not full_path.is_file(): return "" try: from PIL import Image img = Image.open(full_path) crop_size = 40 x1 = max(0, click_x - crop_size) y1 = max(0, click_y - crop_size) x2 = min(img.width, click_x + crop_size) y2 = min(img.height, click_y + crop_size) cropped = img.crop((x1, y1, x2, y2)) buf = io.BytesIO() cropped.save(buf, format="PNG") return base64.b64encode(buf.getvalue()).decode("utf-8") except Exception as e: logger.debug("setup start anchor: crop échoué pour %s: %s", full_path, e) return "" def _extract_start_menu_target_from_session( raw_events: list, session_dir: Optional[str] = None, ) -> Optional[Dict[str, Any]]: """Retrouver le vrai clic sur Démarrer et son ancre visuelle si disponible.""" for idx, raw_evt in enumerate(raw_events): event_data = raw_evt.get("event", raw_evt) if event_data.get("type") != "mouse_click": continue pos = event_data.get("pos") or [] if not isinstance(pos, list) or len(pos) != 2: continue screen_meta = event_data.get("screen_metadata", {}) screen_res = ( screen_meta.get("screen_resolution") or event_data.get("screen_resolution") or [] ) if not isinstance(screen_res, list) or len(screen_res) != 2: continue click_ts = event_data.get("timestamp") opened_search = False for follow_evt in raw_events[idx + 1: idx + 6]: follow_data = follow_evt.get("event", follow_evt) if follow_data.get("type") != "window_focus_change": continue to_info = follow_data.get("to", {}) if not isinstance(to_info, dict): continue if to_info.get("app_name", "").lower() != "searchhost.exe": continue follow_ts = follow_data.get("timestamp") if ( isinstance(click_ts, (int, float)) and isinstance(follow_ts, (int, float)) and follow_ts - click_ts > 3.0 ): break opened_search = True break if not opened_search: continue try: click_x = int(pos[0]) click_y = int(pos[1]) screen_w = int(screen_res[0]) screen_h = int(screen_res[1]) except (TypeError, ValueError): continue if screen_w <= 0 or screen_h <= 0: continue pos_labels = _relative_position_labels(click_x, click_y, screen_w, screen_h) position_desc = " ".join( part for part in [pos_labels["y_relative"], pos_labels["x_relative"]] if part ) target: Dict[str, Any] = { "x_pct": round(click_x / screen_w, 6), "y_pct": round(click_y / screen_h, 6), "original_position": pos_labels, "position_desc": position_desc, } anchor_b64 = _load_click_anchor_from_session( session_dir=session_dir, screenshot_id=str(event_data.get("screenshot_id", "")).strip(), click_x=click_x, click_y=click_y, ) if anchor_b64: target["anchor_image_base64"] = anchor_b64 return target return None def _extract_search_box_interaction(raw_events: list) -> Optional[Dict[str, Any]]: """Déterminer comment la recherche Windows a été activée. Cas utile observé en prod : - clic Démarrer -> focus SearchHost -> saisie directe => inutile de générer un clic artificiel sur le champ de recherche. """ for idx, raw_evt in enumerate(raw_events): event_data = raw_evt.get("event", raw_evt) if event_data.get("type") != "window_focus_change": continue to_info = event_data.get("to", {}) if not isinstance(to_info, dict): continue if to_info.get("app_name", "").lower() != "searchhost.exe": continue search_window_title = to_info.get("title", "") or "Rechercher" for follow_evt in raw_events[idx + 1: idx + 8]: follow_data = follow_evt.get("event", follow_evt) follow_type = follow_data.get("type") if follow_type == "text_input": window = follow_data.get("window", {}) if isinstance(window, dict) and window.get("app_name", "").lower() == "searchhost.exe": return { "mode": "direct_typing", "window_title": window.get("title", "") or search_window_title, } continue if follow_type == "mouse_click": window = follow_data.get("window", {}) if not (isinstance(window, dict) and window.get("app_name", "").lower() == "searchhost.exe"): continue pos = follow_data.get("pos") or [] screen_meta = follow_data.get("screen_metadata", {}) screen_res = ( screen_meta.get("screen_resolution") or follow_data.get("screen_resolution") or [] ) if not (isinstance(pos, list) and len(pos) == 2 and isinstance(screen_res, list) and len(screen_res) == 2): continue has_text_after = False for later_evt in raw_events[idx + 2: idx + 8]: later_data = later_evt.get("event", later_evt) if later_data.get("type") != "text_input": continue later_window = later_data.get("window", {}) if isinstance(later_window, dict) and later_window.get("app_name", "").lower() == "searchhost.exe": has_text_after = True break if not has_text_after: continue try: click_x = int(pos[0]) click_y = int(pos[1]) screen_w = int(screen_res[0]) screen_h = int(screen_res[1]) except (TypeError, ValueError): continue if screen_w <= 0 or screen_h <= 0: continue pos_labels = _relative_position_labels(click_x, click_y, screen_w, screen_h) position_desc = " ".join( part for part in [pos_labels["y_relative"], pos_labels["x_relative"]] if part ) return { "mode": "click_then_type", "x_pct": round(click_x / screen_w, 6), "y_pct": round(click_y / screen_h, 6), "window_title": window.get("title", "") or search_window_title, "expected_window_before": window.get("title", "") or search_window_title, "original_position": pos_labels, "position_desc": position_desc, } if follow_type == "window_focus_change": next_to = follow_data.get("to", {}) if isinstance(next_to, dict) and next_to.get("app_name", "").lower() != "searchhost.exe": break return None def _extract_required_apps_from_events( raw_events: list, session_dir: Optional[str] = None, ) -> Dict[str, Any]: """Extraire les applications requises depuis les événements bruts d'une session. Analyse les window_focus_change pour identifier : - L'application principale (la plus utilisée hors apps système) - La première fenêtre ciblée (pour le setup initial) Args: raw_events: Événements bruts depuis live_events.jsonl. Returns: Dict avec les clés : - primary_app: str (nom de l'exécutable principal, ex: "Notepad.exe") - primary_launch_cmd: str (commande Win+R, ex: "notepad") - first_window_title: str (titre de la première fenêtre applicative) - apps: dict[str, int] (app_name -> nombre d'occurrences) - start_menu_target: dict optionnel (vrai clic Démarrer) - search_box_interaction: dict optionnel (saisie directe ou vrai clic SearchHost) - launch_result_target: dict optionnel (vrai clic SearchHost -> app) """ app_counts: Dict[str, int] = defaultdict(int) app_titles: Dict[str, List[str]] = defaultdict(list) first_app = None first_window_title = None for raw_evt in raw_events: event_data = raw_evt.get("event", raw_evt) evt_type = event_data.get("type", "") if evt_type == "window_focus_change": to_info = event_data.get("to", {}) if not to_info: continue app_name = to_info.get("app_name", "") title = to_info.get("title", "") if app_name: app_counts[app_name] += 1 if title: app_titles[app_name].append(title) if first_app is None and app_name.lower() not in _SETUP_IGNORE_APPS: first_app = app_name first_window_title = title # Aussi extraire depuis les mouse_click qui ont un champ window elif evt_type == "mouse_click": window = event_data.get("window", {}) if isinstance(window, dict): app_name = window.get("app_name", "") if app_name: app_counts[app_name] += 1 if not app_counts: return {} # Déterminer l'application principale (la plus fréquente hors apps ignorées) filtered_apps = { k: v for k, v in app_counts.items() if k.lower() not in _SETUP_IGNORE_APPS } if not filtered_apps: return {} primary_app = max(filtered_apps, key=filtered_apps.get) # Résoudre la commande de lancement primary_launch_cmd = _resolve_launch_command(primary_app) start_menu_target = _extract_start_menu_target_from_session( raw_events, session_dir=session_dir, ) search_box_interaction = _extract_search_box_interaction(raw_events) launch_result_target = _extract_launch_result_target(raw_events, primary_app) result = { "primary_app": primary_app, "primary_launch_cmd": primary_launch_cmd, "first_window_title": first_window_title or "", "apps": dict(app_counts), "has_neutral_window_title": any( _is_neutral_window_title(title) for title in app_titles.get(primary_app, []) ), } if start_menu_target: result["start_menu_target"] = start_menu_target if search_box_interaction: result["search_box_interaction"] = search_box_interaction if launch_result_target: result["launch_result_target"] = launch_result_target return result def _trim_redundant_setup_events( raw_events: List[Dict[str, Any]], app_info: Dict[str, Any], ) -> List[Dict[str, Any]]: """Couper le préambule de lancement déjà couvert par le setup injecté. Quand `/replay-session` injecte un setup visuel (Démarrer -> SearchHost -> résultat d'application), les événements bruts de la session source contiennent encore cette même séquence. Sans coupe, le replay rejoue l'ouverture de l'application une deuxième fois et dérive hors contexte. Stratégie : - chercher la première `window_focus_change` vers l'application principale - préférer un titre qui matche `first_window_title` - conserver uniquement les événements APRÈS cette bascule de focus Args: raw_events: événements source complets de la session. app_info: résultat de `_extract_required_apps_from_events`. Returns: Liste coupée si un point de reprise fiable est trouvé, sinon la liste d'origine inchangée. """ if not raw_events or not app_info: return raw_events primary_app = str(app_info.get("primary_app", "")).strip().lower() first_title = str(app_info.get("first_window_title", "")).strip().lower() if not primary_app: return raw_events first_primary_idx = None matched_idx = None neutral_idx = None # Si le titre observé en premier est déjà neutre, le setup amène # déjà l'app dans le même état → comportement legacy suffit. first_title_is_neutral = _is_neutral_window_title(first_title) for idx, raw_evt in enumerate(raw_events): event_data = raw_evt.get("event", raw_evt) if event_data.get("type", "") != "window_focus_change": continue to_info = event_data.get("to", {}) if not isinstance(to_info, dict): continue app_name = str(to_info.get("app_name", "")).strip().lower() if app_name != primary_app: continue title = str(to_info.get("title", "")).strip() title_lower = title.lower() if first_primary_idx is None: first_primary_idx = idx # Priorité : focus vers un titre neutre proche du premier focus # → c'est l'état que le setup auto va réellement produire, donc # les events entre le premier focus et celui-ci sont redondants # (bascule d'onglet vers la fenêtre vide, etc.). if ( not first_title_is_neutral and neutral_idx is None and _is_neutral_window_title(title) and (idx - first_primary_idx) <= _TRIM_NEUTRAL_LOOKAHEAD ): neutral_idx = idx if matched_idx is None: if not first_title: matched_idx = idx elif title_lower and ( first_title in title_lower or title_lower in first_title ): matched_idx = idx # Early exit dès qu'on a trouvé ce qu'on cherche, ou qu'on est # sorti du lookahead pour le neutral. if neutral_idx is not None: break if matched_idx is not None and ( first_title_is_neutral or (idx - first_primary_idx) > _TRIM_NEUTRAL_LOOKAHEAD ): break cut_idx = ( neutral_idx if neutral_idx is not None else (matched_idx if matched_idx is not None else first_primary_idx) ) if cut_idx is None: logger.info( "setup trim : aucun focus initial trouvé pour '%s' — replay brut conservé", primary_app, ) return raw_events trimmed = raw_events[cut_idx + 1:] logger.info( "setup trim : %d événements retirés avant le replay brut " "(app=%s, titre=%s, neutral=%s, restant=%d)", cut_idx + 1, primary_app, app_info.get("first_window_title", ""), neutral_idx is not None, len(trimmed), ) return trimmed def _extract_required_apps_from_workflow(workflow) -> Dict[str, Any]: """Extraire les applications requises depuis un workflow structuré. Analyse les nodes du workflow pour identifier les titres de fenêtres requis, puis infère l'application principale. Args: workflow: Objet Workflow ou dict brut. Returns: Même format que _extract_required_apps_from_events. """ # Accéder aux données (objet ou dict) if hasattr(workflow, 'nodes'): nodes = workflow.nodes metadata = workflow.metadata if hasattr(workflow, 'metadata') else {} elif isinstance(workflow, dict): nodes = workflow.get('nodes', []) metadata = workflow.get('metadata', {}) else: return {} if not nodes: return {} # Collecter les titres de fenêtres depuis les nodes window_titles = [] for node in nodes: template = node.template if hasattr(node, 'template') else node.get('template', {}) if isinstance(template, dict): window = template.get('window', {}) elif hasattr(template, 'window'): window = template.window if hasattr(template.window, '__dict__') else {} else: window = {} if isinstance(window, dict): title = window.get('title_pattern', '') or window.get('title_contains', '') elif hasattr(window, 'title_pattern'): title = getattr(window, 'title_pattern', '') or '' else: title = '' if title: window_titles.append(title) # Inférer l'app principale depuis les titres de fenêtres primary_app, primary_launch_cmd, matched_title = _infer_app_from_window_titles(window_titles) # Utiliser le titre qui a matché l'app (pas le premier node qui peut être "Rechercher") first_title = matched_title or (window_titles[0] if window_titles else "") if not primary_app: return {} source_session_id = metadata.get("source_session_id", "") if isinstance(metadata, dict) else "" machine_id = metadata.get("machine_id", "") if isinstance(metadata, dict) else "" return { "primary_app": primary_app, "primary_launch_cmd": primary_launch_cmd, "first_window_title": first_title, "apps": {}, "has_neutral_window_title": any( _is_neutral_window_title(title) for title in window_titles ), "source_session_id": source_session_id, "machine_id": machine_id, } def _resolve_launch_command(app_name: str) -> str: """Résoudre la commande Win+R pour lancer une application. Si l'app n'est pas dans le mapping, utilise le nom de l'exécutable directement sans l'extension .exe (fonctionne pour la plupart des apps). """ app_lower = app_name.lower() if app_lower in _APP_LAUNCH_COMMANDS: return _APP_LAUNCH_COMMANDS[app_lower] # Fallback : utiliser le nom sans l'extension .exe if app_lower.endswith(".exe"): return app_name[:-4] return app_name def _infer_app_from_window_titles(titles: list) -> tuple: """Inférer le nom de l'application et la commande de lancement depuis des titres de fenêtres. Utilise des heuristiques basées sur les patterns de titres Windows courants. Returns: Tuple (app_name, launch_command, matched_title). ("", "", "") si non identifié. """ _TITLE_APP_PATTERNS = [ ("bloc-notes", "Notepad.exe", "notepad"), ("notepad", "Notepad.exe", "notepad"), ("word", "winword.exe", "winword"), ("excel", "excel.exe", "excel"), ("powerpoint", "powerpnt.exe", "powerpnt"), ("outlook", "outlook.exe", "outlook"), ("paint", "mspaint.exe", "mspaint"), ("calculatrice", "calc.exe", "calc"), ("calculator", "calc.exe", "calc"), ("explorateur de fichiers", "explorer.exe", "explorer"), ("file explorer", "explorer.exe", "explorer"), ("invite de commandes", "cmd.exe", "cmd"), ("command prompt", "cmd.exe", "cmd"), ("powershell", "powershell.exe", "powershell"), ("visual studio code", "code.exe", "code"), ("edge", "msedge.exe", "msedge"), ("chrome", "chrome.exe", "chrome"), ("firefox", "firefox.exe", "firefox"), ] for title in titles: title_lower = title.lower() for pattern, app_name, launch_cmd in _TITLE_APP_PATTERNS: if pattern in title_lower: # Ignorer les apps système (explorer, etc.) if app_name.lower() in _SETUP_IGNORE_APPS: continue return (app_name, launch_cmd, title) return ("", "", "") def _get_visual_search_info(app_name: str) -> Dict[str, str]: """Obtenir les informations de recherche visuelle pour une application. Consulte _APP_VISUAL_SEARCH, sinon construit un fallback à partir du nom de l'exécutable (ex: "MonApp.exe" -> search_text="MonApp"). Args: app_name: Nom de l'exécutable (ex: "Notepad.exe"). Returns: Dict avec search_text, display_name, vlm_description. """ app_lower = app_name.lower() if app_lower in _APP_VISUAL_SEARCH: return dict(_APP_VISUAL_SEARCH[app_lower]) # Fallback : utiliser le nom sans .exe base_name = app_name[:-4] if app_lower.endswith(".exe") else app_name return { "search_text": base_name, "display_name": base_name, "vlm_description": f"L'application {base_name} dans les résultats de recherche", } def _should_use_run_dialog_setup(primary_app: str, launch_cmd: str) -> bool: """Déterminer si le setup doit passer par `Win+R`. On cible seulement quelques apps connues où le chemin Démarrer → Rechercher s'est montré fragile en live, alors que la commande shell est stable et sémantiquement équivalente pour préparer l'environnement. """ app_lower = str(primary_app or "").strip().lower() launch_cmd = str(launch_cmd or "").strip() return bool(app_lower in _SETUP_RUN_DIALOG_APPS and launch_cmd) def _generate_run_dialog_setup_actions( app_info: Dict[str, Any], setup_id_prefix: str = "setup", ) -> List[Dict[str, Any]]: """Générer un setup sémantique `Win+R -> commande -> Enter`. Utilisé pour les applications dont l'ouverture via Démarrer/Rechercher ajoute une fragilité Windows sans valeur métier pour le replay. """ launch_cmd = str(app_info.get("primary_launch_cmd", "") or "").strip() primary_app = str(app_info.get("primary_app", "") or "").strip() first_title = str(app_info.get("first_window_title", "") or "").strip() visual_info = _get_visual_search_info(primary_app) display_name = str(visual_info.get("display_name", "") or "").strip() if not launch_cmd or not primary_app: return [] heavy_apps = {"winword.exe", "excel.exe", "powerpnt.exe", "outlook.exe", "code.exe"} wait_ms = 3000 if primary_app.lower() in heavy_apps else 2000 title_patterns: List[str] = [] for candidate in ( display_name, launch_cmd, primary_app[:-4] if primary_app.lower().endswith(".exe") else primary_app, ): candidate = str(candidate or "").strip() if not candidate: continue if candidate.lower() not in {p.lower() for p in title_patterns}: title_patterns.append(candidate) if " " in candidate: last_token = candidate.split()[-1].strip() if last_token and last_token.lower() not in {p.lower() for p in title_patterns}: title_patterns.append(last_token) actions: List[Dict[str, Any]] = [ { "action_id": f"act_{setup_id_prefix}_open_run", "type": "key_combo", "keys": ["win", "r"], "_setup_phase": True, "_setup_step": "open_run_dialog", "_setup_strategy": "run_dialog", }, { "action_id": f"act_{setup_id_prefix}_wait_run", "type": "wait", "duration_ms": 500, "_setup_phase": True, "_setup_step": "wait_run_dialog", "_setup_strategy": "run_dialog", }, { "action_id": f"act_{setup_id_prefix}_type_launch_cmd", "type": "type", "text": launch_cmd, "_setup_phase": True, "_setup_step": "type_launch_command", "_setup_strategy": "run_dialog", }, { "action_id": f"act_{setup_id_prefix}_wait_launch_cmd", "type": "wait", "duration_ms": 300, "_setup_phase": True, "_setup_step": "wait_launch_command", "_setup_strategy": "run_dialog", }, { "action_id": f"act_{setup_id_prefix}_submit_run", "type": "key_combo", "keys": ["enter"], "_setup_phase": True, "_setup_step": "submit_run_dialog", "_setup_strategy": "run_dialog", }, { "action_id": f"act_{setup_id_prefix}_wait_launch", "type": "wait", "duration_ms": wait_ms, "_setup_phase": True, "_setup_step": "wait_app_launch", "_setup_strategy": "run_dialog", }, ] needs_fresh_notepad_document = ( primary_app.lower() == "notepad.exe" and ( bool(app_info.get("has_neutral_window_title")) or _is_neutral_window_title(first_title) ) ) if needs_fresh_notepad_document: if title_patterns or first_title: actions.append({ "action_id": f"act_{setup_id_prefix}_verify_before_fresh_document", "type": "verify_screen", "expected_node": "setup_initial_before_fresh_document", "timeout_ms": 5000, "_setup_phase": True, "_setup_step": "verify_app_ready_before_fresh_document", "_setup_strategy": "run_dialog", "expected_window_title_contains": title_patterns or [first_title], "intention": ( "vérifier que Bloc-notes est la scène active avant " "d'ouvrir un document vierge" ), }) actions.extend([ { "action_id": f"act_{setup_id_prefix}_ensure_fresh_document", "type": "key_combo", "keys": ["ctrl", "n"], "_setup_phase": True, "_setup_step": "ensure_fresh_document", "_setup_strategy": "run_dialog", "expected_window_before": first_title, "intention": "ouvrir un document Bloc-notes vierge non nommé", }, { "action_id": f"act_{setup_id_prefix}_wait_fresh_document", "type": "wait", "duration_ms": 400, "_setup_phase": True, "_setup_step": "wait_fresh_document", "_setup_strategy": "run_dialog", }, ]) if title_patterns or first_title: actions.append({ "action_id": f"act_{setup_id_prefix}_verify", "type": "verify_screen", "expected_node": "setup_initial", "timeout_ms": 5000, "_setup_phase": True, "_setup_step": "verify_app_ready", "_setup_strategy": "run_dialog", "expected_window_title_contains": title_patterns or [first_title], }) logger.info( "Setup env sémantique généré : %d actions pour lancer '%s' via Win+R (%s)", len(actions), primary_app, launch_cmd, ) return actions def _generate_setup_actions( app_info: Dict[str, Any], setup_id_prefix: str = "setup", ) -> List[Dict[str, Any]]: """Générer les actions 100% visuelles pour ouvrir l'application avant le replay. Approche entièrement visuelle -- JAMAIS de raccourcis clavier (Win, Win+R, Ctrl+X, etc.) qui n'ont pas été enregistrés par l'utilisateur. Tout passe par des clics visuels résolus par le VLM (Qwen2.5-VL). La séquence est : 1. Clic visuel sur le bouton Démarrer (coin bas-gauche de l'écran) 2. Attendre que le menu Démarrer s'ouvre (1s) 3. Clic visuel sur la barre de recherche du menu Démarrer 4. Attendre que la barre de recherche soit active (500ms) 5. Taper le nom de l'application (texte français, ex: "Bloc-notes") 6. Attendre les résultats de recherche (1.2s) 7. Clic visuel sur le résultat de l'application trouvée 8. Attendre que l'application s'ouvre (2-3s selon le poids) 9. verify_screen : vérifier que la fenêtre attendue est apparue Args: app_info: Dict retourné par _extract_required_apps_from_events ou _extract_required_apps_from_workflow. setup_id_prefix: Préfixe pour les action_id générés. Returns: Liste d'actions normalisées, prêtes à injecter dans la queue. Liste vide si aucune préparation n'est nécessaire. """ if not app_info: return [] launch_cmd = app_info.get("primary_launch_cmd", "") primary_app = app_info.get("primary_app", "") first_title = app_info.get("first_window_title", "") if not launch_cmd: logger.debug( "setup_actions : pas de commande de lancement pour '%s', skip", primary_app, ) return [] # Ne pas lancer les apps système (toujours présentes) if primary_app.lower() in _SETUP_IGNORE_APPS: logger.debug("setup_actions : app '%s' ignorée (système)", primary_app) return [] if _should_use_run_dialog_setup(primary_app, launch_cmd): return _generate_run_dialog_setup_actions( app_info, setup_id_prefix=setup_id_prefix, ) # Obtenir les informations de recherche visuelle pour cette app visual_info = _get_visual_search_info(primary_app) search_text = visual_info["search_text"] display_name = visual_info["display_name"] vlm_description = visual_info["vlm_description"] start_menu_target = app_info.get("start_menu_target", {}) or {} search_box_interaction = app_info.get("search_box_interaction", {}) or {} launch_result_target = app_info.get("launch_result_target", {}) or {} actions = [] logger.info( "Génération setup env 100%% visuel : lancement de '%s' via clic " "Démarrer → recherche visuelle '%s' (fenêtre attendue : '%s')", primary_app, search_text, first_title, ) # 1. Clic visuel sur le bouton Démarrer (toujours visible, bas-gauche) # Le VLM résout la position exacte ; x_pct/y_pct sont des fallbacks. start_click_spec = { "by_text": "Démarrer", "by_role": "start_button", "vlm_description": ( "Le bouton Démarrer de Windows (icône Windows), " "en bas à gauche de la barre des tâches" ), "screen_scope": "full_screen", } start_click_x = 0.02 start_click_y = 0.98 if start_menu_target: start_click_x = float(start_menu_target.get("x_pct", start_click_x)) start_click_y = float(start_menu_target.get("y_pct", start_click_y)) start_click_spec["by_text"] = "" start_click_spec["allow_position_fallback"] = True anchor_b64 = str(start_menu_target.get("anchor_image_base64", "")).strip() if anchor_b64: start_click_spec["anchor_image_base64"] = anchor_b64 original_position = start_menu_target.get("original_position") if isinstance(original_position, dict) and original_position: start_click_spec["original_position"] = dict(original_position) position_desc = str(start_menu_target.get("position_desc", "")).strip() if position_desc: start_click_spec["vlm_description"] = ( "L'icône Windows du bouton Démarrer dans la barre des tâches, " f"visible {position_desc} de l'écran" ) else: start_click_spec["vlm_description"] = ( "L'icône Windows du bouton Démarrer dans la barre des tâches" ) actions.append({ "action_id": f"act_{setup_id_prefix}_click_start", "type": "click", "x_pct": start_click_x, "y_pct": start_click_y, "button": "left", "visual_mode": True, "target_spec": start_click_spec, "_setup_phase": True, "_setup_step": "click_start_menu", }) # 2. Attendre que le menu Démarrer s'ouvre actions.append({ "action_id": f"act_{setup_id_prefix}_wait_start", "type": "wait", "duration_ms": 1000, "_setup_phase": True, "_setup_step": "wait_start_menu", }) # 2b. Garde visuelle : le menu Démarrer / la barre de recherche # doit être réellement actif avant de continuer. Sans cette garde, # un click_start qui touche en fait le systray overflow popup # laisse le setup taper « bloc » dans la mauvaise fenêtre (cf. # run live 2026-05-22 replay_sess_76b7d067). L'exécuteur compare # le titre actif aux patterns ci-dessous (substring case-insensitive, # FR+EN+app-name) et bascule en apprentissage humain si aucun match. actions.append({ "action_id": f"act_{setup_id_prefix}_verify_start_open", "type": "verify_screen", "expected_node": "", "timeout_ms": 1500, "expected_window_title_contains": [ "Rechercher", "Recherche", "Search", "Cortana", "Démarrer", "Start", "SearchHost", "StartMenuExperienceHost", ], "_setup_phase": True, "_setup_step": "verify_start_menu_open", }) search_mode = str(search_box_interaction.get("mode", "")).strip() if search_mode != "direct_typing": search_click_spec = { "by_text": "Rechercher", "by_role": "search_box", "vlm_description": ( "La barre ou le champ de recherche dans le menu Démarrer " "de Windows, souvent intitulé 'Tapez ici pour rechercher' " "ou 'Rechercher'" ), } search_click_x = 0.20 search_click_y = 0.92 search_expected_window = "" if search_mode == "click_then_type": search_click_x = float(search_box_interaction.get("x_pct", search_click_x)) search_click_y = float(search_box_interaction.get("y_pct", search_click_y)) search_expected_window = str( search_box_interaction.get("expected_window_before") or search_box_interaction.get("window_title") or "" ) search_click_spec["window_title"] = str( search_box_interaction.get("window_title", "") ).strip() original_position = search_box_interaction.get("original_position") if isinstance(original_position, dict) and original_position: search_click_spec["original_position"] = dict(original_position) position_desc = str(search_box_interaction.get("position_desc", "")).strip() if position_desc: search_click_spec["vlm_description"] = ( f"Dans la fenêtre '{search_click_spec['window_title']}', " f"le champ de recherche se trouve {position_desc} de l'écran" ) search_click_action = { "action_id": f"act_{setup_id_prefix}_click_search", "type": "click", "x_pct": search_click_x, "y_pct": search_click_y, "button": "left", "visual_mode": True, "target_spec": search_click_spec, "_setup_phase": True, "_setup_step": "click_search_box", } if search_expected_window: search_click_action["expected_window_before"] = search_expected_window actions.append(search_click_action) # 4. Attendre que la barre de recherche soit active et prête actions.append({ "action_id": f"act_{setup_id_prefix}_wait_search_ready", "type": "wait", "duration_ms": 500, "_setup_phase": True, "_setup_step": "wait_search_ready", }) # 4b. Garde visuelle : la barre Rechercher doit effectivement # avoir le focus avant la frappe. On combine le titre récupéré # de la session source (`search_box_interaction.window_title`) # avec un fallback FR/EN générique. search_window_hint = str(search_box_interaction.get("window_title", "")).strip() verify_patterns = ["Rechercher", "Recherche", "Search"] if search_window_hint and search_window_hint not in verify_patterns: verify_patterns = [search_window_hint] + verify_patterns actions.append({ "action_id": f"act_{setup_id_prefix}_verify_search_active", "type": "verify_screen", "expected_node": "", "timeout_ms": 1500, "expected_window_title_contains": verify_patterns, "_setup_phase": True, "_setup_step": "verify_search_box_active", }) # 5. Taper le nom visuel de l'application (texte français) actions.append({ "action_id": f"act_{setup_id_prefix}_type_search", "type": "type", "text": search_text, "_setup_phase": True, "_setup_step": "type_app_name", }) # 6. Attendre que la recherche Windows trouve l'application actions.append({ "action_id": f"act_{setup_id_prefix}_wait_results", "type": "wait", "duration_ms": 1200, "_setup_phase": True, "_setup_step": "wait_search_results", }) # 6b. Dernière garde avant le clic résultat : la barre Rechercher # (et donc la liste de résultats) doit toujours être active. Sans # cette garde finale, un focus perdu pendant wait_search_results # fait cliquer click_app_result dans la mauvaise surface (constat # live 2026-05-22 — fenêtre observée « Fenêtre de dépassement de # capacité de la barre d'état système »). actions.append({ "action_id": f"act_{setup_id_prefix}_verify_results_visible", "type": "verify_screen", "expected_node": "", "timeout_ms": 1500, "expected_window_title_contains": [ "Rechercher", "Recherche", "Search", "Cortana", "SearchHost", "StartMenuExperienceHost", ], "_setup_phase": True, "_setup_step": "verify_search_results_visible", }) # 7. Clic visuel sur le résultat de l'application dans la liste click_result_spec = { "by_text": display_name, "by_role": "app_icon", "vlm_description": vlm_description, } click_result_x = 0.20 click_result_y = 0.50 click_result_expected_window = "" if launch_result_target: click_result_x = float(launch_result_target.get("x_pct", click_result_x)) click_result_y = float(launch_result_target.get("y_pct", click_result_y)) click_result_expected_window = str( launch_result_target.get("expected_window_before") or launch_result_target.get("window_title") or "" ) click_result_spec["by_role"] = "search_result" click_result_spec["allow_position_fallback"] = True click_result_spec["window_title"] = str(launch_result_target.get("window_title", "")).strip() original_position = launch_result_target.get("original_position") if isinstance(original_position, dict) and original_position: click_result_spec["original_position"] = dict(original_position) window_capture = launch_result_target.get("window_capture") if isinstance(window_capture, dict) and window_capture: click_result_spec["window_capture"] = dict(window_capture) position_desc = str(launch_result_target.get("position_desc", "")).strip() if position_desc: click_result_spec["vlm_description"] = ( f"Dans la fenêtre '{click_result_spec['window_title']}', " f"le résultat de recherche de l'application '{display_name}' " f"se trouve {position_desc} de l'écran" ) else: click_result_spec["vlm_description"] = ( f"Dans la fenêtre '{click_result_spec['window_title']}', " f"cliquez sur le résultat de recherche de l'application '{display_name}'" ) click_result_action = { "action_id": f"act_{setup_id_prefix}_click_result", "type": "click", "x_pct": click_result_x, "y_pct": click_result_y, "button": "left", "visual_mode": True, "target_spec": click_result_spec, "_setup_phase": True, "_setup_step": "click_app_result", } if click_result_expected_window: click_result_action["expected_window_before"] = click_result_expected_window actions.append({ **click_result_action, }) # 8. Attendre que l'application s'ouvre heavy_apps = {"winword.exe", "excel.exe", "powerpnt.exe", "outlook.exe", "code.exe"} wait_ms = 3000 if primary_app.lower() in heavy_apps else 2000 actions.append({ "action_id": f"act_{setup_id_prefix}_wait_launch", "type": "wait", "duration_ms": wait_ms, "_setup_phase": True, "_setup_step": "wait_app_launch", }) # 9. Vérification visuelle que la fenêtre attendue est apparue if first_title: actions.append({ "action_id": f"act_{setup_id_prefix}_verify", "type": "verify_screen", "expected_node": "setup_initial", "timeout_ms": 5000, "_setup_phase": True, "_setup_step": "verify_app_ready", "_expected_title": first_title, }) logger.info( "Setup env visuel généré : %d actions pour lancer '%s' " "(recherche visuelle : '%s')", len(actions), primary_app, search_text, ) return actions # ========================================================================= # Replay — Fonctions de conversion workflow → actions # ========================================================================= def _find_active_agent_session(session_manager, machine_id: Optional[str] = None) -> Optional[str]: """Trouver la dernière session Agent V1 pour le replay. Stratégie en 2 passes : 1. D'abord chercher une session `sess_*` non-finalisée (Agent V1 actif) 2. Sinon, pour une machine ciblée, réutiliser `bg_` si présent 3. Sinon, prendre la plus récente `sess_*` même finalisée Args: session_manager: Instance LiveSessionManager. machine_id: Si fourni, ne chercher que les sessions de cette machine. """ with session_manager._lock: bg_session_id = f"bg_{machine_id}" if machine_id else None def _matches_machine(session) -> bool: if machine_id is None: return True if session.machine_id == machine_id: return True # Robustesse au redémarrage : certaines sessions de fond peuvent # encore être restaurées avec machine_id='default' alors que leur # session_id encode déjà la vraie machine. return bool(bg_session_id and session.session_id == bg_session_id) all_agent_sessions = [ s for s in session_manager._sessions.values() if s.session_id.startswith("sess_") and _matches_machine(s) ] background_session = next( ( s for s in session_manager._sessions.values() if bg_session_id and s.session_id == bg_session_id ), None, ) # Trier par session_id (contient un timestamp) — plus récent d'abord all_agent_sessions.sort(key=lambda s: s.session_id, reverse=True) # Passe 1 : préférer une session non-finalisée for s in all_agent_sessions: if not s.finalized: return s.session_id # Passe 2 : fallback sur la session de fond de la machine si elle existe. if background_session and not background_session.finalized: return background_session.session_id # Passe 3 : fallback sur la plus récente (même finalisée) if all_agent_sessions: return all_agent_sessions[0].session_id if background_session: return background_session.session_id return None def _workflow_to_actions( workflow, params: Optional[Dict[str, Any]] = None, processor=None, gesture_catalog=None, ) -> List[Dict[str, Any]]: """ Convertir un workflow (nodes + edges ordonnés) en liste d'actions normalisées. Parcourt le graphe depuis les entry_nodes en suivant les edges. Chaque edge produit une action normalisée avec coordonnées en pourcentage. Mode intelligent (workflows appris par Léa) : Si le workflow a des nodes avec des prototype_vectors, utilise le StreamProcessor.extract_enriched_actions() qui enrichit les actions avec les données de la session originale, le ciblage visuel et le pre-check/post-check par embedding CLIP. Mode classique (workflows VWB/manuels) : Parcours BFS classique avec _edge_to_normalized_actions(). """ params = params or {} # Détection d'un workflow appris (a des nodes avec prototype_vectors) # et qui a des edges structurés if _is_learned_workflow(workflow) and processor is not None: # Priorité 1 : replay hybride (événements bruts + structure workflow) hybrid = processor.build_hybrid_replay(workflow) if hybrid: logger.info( "Replay hybride : %d actions depuis events bruts + structure workflow", len(hybrid), ) # Optimisation par gestes clavier si disponible if gesture_catalog and hybrid: hybrid = gesture_catalog.optimize_replay_actions(hybrid) return hybrid # Priorité 2 : enrichissement classique (fallback si hybride échoue) enriched = processor.extract_enriched_actions(workflow, params) if enriched: logger.info( "Replay intelligent : %d actions enrichies depuis le workflow appris", len(enriched), ) if gesture_catalog and enriched: enriched = gesture_catalog.optimize_replay_actions(enriched) return enriched # Si l'enrichissement échoue aussi, fallback sur le mode classique logger.warning( "Enrichissement échoué pour le workflow appris, fallback mode classique" ) # Mode classique (VWB/manuels ou fallback) actions = [] # Construire un index des edges sortants par node outgoing: Dict[str, list] = defaultdict(list) for edge in workflow.edges: outgoing[edge.from_node].append(edge) # Parcours linéaire depuis le premier entry_node visited = set() current_nodes = list(workflow.entry_nodes) if workflow.entry_nodes else [] # Fallback : si pas d'entry_nodes, prendre le premier node if not current_nodes and workflow.nodes: current_nodes = [workflow.nodes[0].node_id] while current_nodes: node_id = current_nodes.pop(0) if node_id in visited: continue visited.add(node_id) edges = outgoing.get(node_id, []) for edge in edges: edge_actions = _edge_to_normalized_actions(edge, params) actions.extend(edge_actions) # Suivre le graphe vers le prochain node if edge.to_node not in visited: current_nodes.append(edge.to_node) # Optimisation : substituer les actions visuelles par des gestes clavier si possible if gesture_catalog and actions: actions = gesture_catalog.optimize_replay_actions(actions) return actions def _is_learned_workflow(workflow) -> bool: """Détecter si un workflow est un workflow appris (vs VWB/manuel). Un workflow appris a : - Des nodes avec _prototype_vector dans metadata - Des edges avec from_node/to_node - Un learning_state indicatif (OBSERVATION, COACHING, AUTO_CANDIDATE, etc.) Un workflow VWB/manuel a généralement : - Des edges avec des target_spec complets (by_text, by_role remplis) - Pas de prototype_vectors """ # Accéder aux données (objet ou dict) if hasattr(workflow, 'nodes'): nodes = workflow.nodes edges = workflow.edges elif isinstance(workflow, dict): nodes = workflow.get('nodes', []) edges = workflow.get('edges', []) else: return False if not nodes or not edges: return False # Vérifier si au moins un node a un prototype_vector has_prototype = False for node in nodes: metadata = node.metadata if hasattr(node, 'metadata') else node.get('metadata', {}) if isinstance(metadata, dict) and '_prototype_vector' in metadata: has_prototype = True break return has_prototype _TARGET_SEMANTIC_KEYS = ( "by_text", "by_role", "anchor_id", "target_text", "ocr_description", "description", "vlm_description", "anchor_image_base64", "by_text_source", "window_title", "anchor_bbox", "original_size", ) def _first_non_empty_text(*values: Any) -> str: for value in values: text = str(value or "").strip() if text and text.casefold() not in {"none", "null"}: return text return "" def _target_attr(target: Any, key: str, default: Any = None) -> Any: if isinstance(target, dict): return target.get(key, default) return getattr(target, key, default) def _copy_semantic_target_fields( target_spec: Dict[str, Any], *sources: Optional[Dict[str, Any]], ) -> None: for source in sources: if not isinstance(source, dict): continue for key in _TARGET_SEMANTIC_KEYS: value = source.get(key) if value and not target_spec.get(key): target_spec[key] = value if not target_spec.get("by_text"): target_text = _first_non_empty_text(target_spec.get("target_text")) if target_text: target_spec["by_text"] = target_text target_spec.setdefault("by_text_source", "visual_anchor") if not target_spec.get("vlm_description"): description = _first_non_empty_text( target_spec.get("description"), target_spec.get("ocr_description"), ) if description: target_spec["vlm_description"] = description def _edge_to_normalized_actions(edge, params: Dict[str, Any]) -> List[Dict[str, Any]]: """ Convertir un WorkflowEdge en liste d'actions normalisées pour l'Agent V1. Un edge simple produit 1 action, un edge compound produit N actions (une par step). """ action = edge.action if action is None: logger.warning(f"Edge {edge.edge_id} sans action, skip") return [] action_type = action.type target = action.target action_params = action.parameters or {} # Extraire les coordonnées normalisées depuis TargetSpec.by_position x_pct = 0.0 y_pct = 0.0 by_position = _target_attr(target, "by_position") if target and by_position: px, py = by_position if px <= 1.0 and py <= 1.0: x_pct = px y_pct = py else: ref_w = action_params.get("ref_width", 1920) or 1920 ref_h = action_params.get("ref_height", 1080) or 1080 x_pct = round(px / ref_w, 6) y_pct = round(py / ref_h, 6) base = {"edge_id": edge.edge_id, "from_node": edge.from_node, "to_node": edge.to_node} # Compound : décomposer en actions individuelles if action_type == "compound": return _expand_compound_steps(action_params.get("steps", []), base, params) # Actions simples normalized = {**base, "action_id": f"act_{uuid.uuid4().hex[:8]}"} if action_type == "mouse_click": normalized["type"] = "click" normalized["x_pct"] = x_pct normalized["y_pct"] = y_pct normalized["button"] = action_params.get("button", "left") elif action_type == "text_input": normalized["type"] = "type" text = action_params.get("text", "") text = _substitute_variables(text, params, action_params.get("defaults", {})) normalized["text"] = text normalized["x_pct"] = x_pct normalized["y_pct"] = y_pct elif action_type == "key_press": normalized["type"] = "key_combo" keys = action_params.get("keys", []) if not keys and action_params.get("key"): keys = [action_params["key"]] normalized["keys"] = keys elif action_type == "pause_for_human": normalized["type"] = "pause_for_human" normalized["parameters"] = { "message": action_params.get("message", "Validation requise"), } return [normalized] # pas de target/coords pour cette action logique elif action_type == "extract_text": normalized["type"] = "extract_text" normalized["parameters"] = { "output_var": action_params.get("output_var", "extracted_text"), "paragraph": bool(action_params.get("paragraph", True)), } return [normalized] elif action_type == "paste_and_execute": normalized["type"] = "paste_and_execute" normalized["parameters"] = {} return [normalized] elif action_type == "extract_table": normalized["type"] = "extract_table" normalized["parameters"] = { "output_var": ( action_params.get("variable_name") or action_params.get("output_var") or "table_rows" ), "pattern": action_params.get("pattern"), "limit": action_params.get("limit"), "region": action_params.get("region"), "engine": action_params.get("engine", "easyocr"), } return [normalized] elif action_type == "extract_text_scroll": # Expansion en séquence : OCR(top) → Ctrl+End → wait → OCR(bottom) # → concat(top, bottom → final) → Ctrl+Home. # variable_name (préféré) ou output_var (compat extract_text). final_var = ( action_params.get("variable_name") or action_params.get("output_var") or "extracted_text" ) paragraph = bool(action_params.get("paragraph", True)) # Pause après scroll Ctrl+End — configurable au step. # Default 500ms (Wikipedia) ; cible 1500-2000ms pour DPI Citrix lent. try: scroll_pause = int(action_params.get("scroll_pause_ms", SCROLL_PAUSE_MS)) except (TypeError, ValueError): scroll_pause = SCROLL_PAUSE_MS # Variables internes nommées par préfixe : invisibles à l'utilisateur. # Préfixe `_` pour signaler "interne" et éviter collision. top_var = f"__{final_var}_top" bottom_var = f"__{final_var}_bottom" return _expand_extract_text_scroll( base, final_var, top_var, bottom_var, paragraph, scroll_pause_ms=scroll_pause, ) elif action_type == "t2a_decision": normalized["type"] = "t2a_decision" normalized["parameters"] = { "input_template": action_params.get("input_template", ""), "output_var": action_params.get("output_var", "t2a_result"), "model": action_params.get("model"), } return [normalized] elif action_type == "llm_generate": normalized["type"] = "llm_generate" normalized["parameters"] = { "prompt": action_params.get("prompt", ""), "context": action_params.get("context", ""), "output_var": ( action_params.get("output_var") or action_params.get("variable_name") or "generated_text" ), "model": action_params.get("model"), } if action_params.get("temperature") is not None: normalized["parameters"]["temperature"] = action_params.get("temperature") return [normalized] elif action_type == "navigate": normalized["type"] = "navigate" normalized["parameters"] = { "action": action_params.get("action", "login"), "login_coords_var": action_params.get("login_coords_var", "navigate_login_coords"), "password_coords_var": action_params.get("password_coords_var", "navigate_password_coords"), "submit_coords_var": action_params.get("submit_coords_var", "navigate_submit_coords"), } login_config_keys = ("login_field", "password_field", "submit_button", "success_elements", "context") for key in login_config_keys: if action_params.get(key) is not None: normalized["parameters"][key] = action_params[key] return [normalized] else: logger.warning(f"Type d'action inconnu : {action_type}") return [] # Ajouter le target_spec complet pour la résolution visuelle target_spec = {} by_role = _target_attr(target, "by_role", "") by_text = _target_attr(target, "by_text", "") context_hints = _target_attr(target, "context_hints", {}) or {} if target and by_role: target_spec["by_role"] = by_role normalized["target_role"] = by_role # Compat debug if target and by_text: target_spec["by_text"] = by_text normalized["target_text"] = by_text # Compat debug if target and context_hints: target_spec["context_hints"] = context_hints _copy_semantic_target_fields( target_spec, action_params, action_params.get("target_spec") if isinstance(action_params, dict) else None, context_hints, ) semantic_label = _first_non_empty_text( target_spec.get("by_text"), target_spec.get("target_text"), target_spec.get("description"), target_spec.get("ocr_description"), target_spec.get("vlm_description"), ) if semantic_label: normalized.setdefault("target_text", target_spec.get("target_text") or semantic_label) normalized.setdefault("target_description", semantic_label) if target_spec: normalized["target_spec"] = target_spec normalized["visual_mode"] = True # Signal à l'agent d'utiliser la résolution visuelle return [normalized] def _substitute_variables(text: str, params: Dict[str, Any], defaults: Dict[str, Any]) -> str: """Substituer les variables ${var} dans un texte. Priorité : params utilisateur > defaults du workflow > texte brut inchangé. Supporte ${var} dans un texte plus long (ex: "${expression}="). """ def replacer(match): var_name = match.group(1) return str(params.get(var_name, defaults.get(var_name, match.group(0)))) return re.sub(r'\$\{(\w+)\}', replacer, text) # Regex pour le templating runtime : {{var}} ou {{var.champ}} ou {{var.champ.sous}} _RUNTIME_VAR_PATTERN = re.compile(r'\{\{\s*(\w+)(?:\.([\w.]+))?\s*\}\}') def _resolve_runtime_vars_in_str(text: str, variables: Dict[str, Any]) -> str: """Remplace {{var}} et {{var.field}} par leur valeur depuis le dict variables. Variables/champs absents : laissés tels quels (ne casse pas le pipeline). Pour les valeurs non-str (dict, list), str() est appelé. """ def replacer(match): var_name = match.group(1) path = match.group(2) if var_name not in variables: return match.group(0) value = variables[var_name] if path: for field in path.split('.'): if isinstance(value, dict) and field in value: value = value[field] else: return match.group(0) return str(value) return _RUNTIME_VAR_PATTERN.sub(replacer, text) def _resolve_runtime_vars(value: Any, variables: Dict[str, Any]) -> Any: """Résout récursivement les {{var}} et {{var.field}} dans une valeur. Supporte str, dict, list. Les autres types sont retournés tels quels. Si variables est vide ou None, value est retournée inchangée. """ if not variables: return value if isinstance(value, str): return _resolve_runtime_vars_in_str(value, variables) if isinstance(value, dict): return {k: _resolve_runtime_vars(v, variables) for k, v in value.items()} if isinstance(value, list): return [_resolve_runtime_vars(item, variables) for item in value] return value def _coerce_action_coords(action: dict) -> dict: """Cast x_pct/y_pct en float après template resolution par _resolve_runtime_vars. Politique : si string non convertible ou template encore present → skip + pause_for_human. Idempotent sur les actions qui ont déjà des floats (mouse_click existant). Jamais fallback 0.0/0.0 — un clic sur coords (0,0) = top-left = potentiellement dangereux. Appelé APRÈS _resolve_runtime_vars dans la boucle dispatch (api_stream.py). """ for key in ("x_pct", "y_pct"): val = action.get(key) if val is None: continue if isinstance(val, float): continue # déjà float, idempotent if isinstance(val, str): # Template encore présent = non résolu par _resolve_runtime_vars if val.startswith("{{") and val.endswith("}}"): action["_skip_reason"] = f"coords_var non résolu: {key}={val}" action["type"] = "pause_for_human" action["safety_level"] = "high" return action try: action[key] = float(val) except (ValueError, TypeError): action["_skip_reason"] = f"coords invalide: {key}={val}" action["type"] = "pause_for_human" action["safety_level"] = "high" return action return action # ========================================================================= # Handlers pour les actions exécutées côté serveur (extract_text, t2a_decision) # ========================================================================= def _normalize_ollama_endpoint(raw_url: str) -> str: """Normalise une URL Ollama pour les clients qui attendent l'endpoint racine. `OLLAMA_URL` est parfois configuré vers `/api/generate` alors que `LLMActionHandler` attend la racine `http://host:port`. """ endpoint = (raw_url or "http://localhost:11434").strip().rstrip("/") for suffix in ("/api/generate", "/api/chat"): if endpoint.endswith(suffix): return endpoint[: -len(suffix)] return endpoint def _handle_extract_text_action( action: Dict[str, Any], replay_state: Dict[str, Any], session_id: str, last_heartbeat: Dict[str, Dict[str, Any]], ) -> bool: """Traite une action extract_text côté serveur. Stocke le texte OCRisé dans replay_state["variables"][output_var]. Retourne True si succès. Robuste aux échecs : si pas de heartbeat ou OCR raté, stocke "" et retourne False (le pipeline continue, pas de blocage). """ params = action.get("parameters") or {} # Compatibilité VWB : "variable_name" (VWB) et "output_var" (agent libre) output_var = (params.get("output_var") or params.get("variable_name") or "extracted_text").strip() paragraph = bool(params.get("paragraph", True)) # Source prioritaire : screenshot envoyé par l'agent après la dernière action. # Si c'est du base64, on le sauvegarde dans un fichier temp pour l'OCR. # Fallback : heartbeat de fond (vrai chemin serveur, via "bg_{machine_id}"). path = None raw_screenshot = replay_state.get("last_screenshot") or "" if raw_screenshot: if raw_screenshot.startswith("data:"): # base64 → fichier temp try: import base64, tempfile header, b64data = raw_screenshot.split(",", 1) suffix = ".jpg" if "jpeg" in header else ".png" tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False) tmp.write(base64.b64decode(b64data)) tmp.close() path = tmp.name except Exception as e: logger.warning("extract_text: décodage base64 screenshot échoué: %s", e) elif os.path.isfile(raw_screenshot): path = raw_screenshot if not path: machine_id = replay_state.get("machine_id", "") bg_session = f"bg_{machine_id}" if machine_id and machine_id != "default" else None heartbeat = ( last_heartbeat.get(session_id) or (last_heartbeat.get(bg_session) if bg_session else None) or {} ) path = heartbeat.get("path") text = "" if path: try: from core.llm import extract_text_from_image text = extract_text_from_image(path, paragraph=paragraph) except Exception as e: logger.warning("extract_text OCR échoué (%s) — variable '%s' = ''", e, output_var) else: logger.warning( "extract_text : pas de heartbeat pour session %s — variable '%s' = ''", session_id, output_var, ) replay_state.setdefault("variables", {})[output_var] = text logger.info( "extract_text → variable '%s' (%d chars) replay %s", output_var, len(text), replay_state.get("replay_id", "?"), ) return bool(text) def _handle_extract_table_action( action: Dict[str, Any], replay_state: Dict[str, Any], session_id: str, last_heartbeat: Dict[str, Dict[str, Any]], ) -> bool: """Traite une action extract_table côté serveur. OCR + filtre regex pour retourner une liste structurée (ex : IPP d'un tableau de patients) qui pourra être bouclée par le templating ${patients[i]}. Paramètres reconnus : output_var : nom de variable runtime (default "table_rows") pattern : regex à matcher sur chaque token OCR (ex : r"^25\\d{6}$") limit : nb max d'entrées à retourner engine : easyocr (défaut) ou tesseract/digits/ipp pour chiffres region : (x, y, w, h) en pixels pour cropper avant OCR (None = image entière) Robuste aux échecs : si pas de heartbeat ou OCR raté, stocke [] et retourne False — le pipeline continue. """ params = action.get("parameters") or {} output_var = (params.get("output_var") or params.get("variable_name") or "table_rows").strip() pattern = params.get("pattern") or None limit = params.get("limit") engine = params.get("engine") or "easyocr" region = params.get("region") or None if isinstance(limit, str): try: limit = int(limit) except ValueError: limit = None # Source : screenshot du heartbeat (idem extract_text) path = None raw_screenshot = replay_state.get("last_screenshot") or "" if raw_screenshot: if raw_screenshot.startswith("data:"): try: import base64, tempfile header, b64data = raw_screenshot.split(",", 1) suffix = ".jpg" if "jpeg" in header else ".png" tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False) tmp.write(base64.b64decode(b64data)) tmp.close() path = tmp.name except Exception as e: logger.warning("extract_table: décodage base64 screenshot échoué: %s", e) elif os.path.isfile(raw_screenshot): path = raw_screenshot if not path: machine_id = replay_state.get("machine_id", "") bg_session = f"bg_{machine_id}" if machine_id and machine_id != "default" else None heartbeat = ( last_heartbeat.get(session_id) or (last_heartbeat.get(bg_session) if bg_session else None) or {} ) path = heartbeat.get("path") rows: list = [] if path: try: from core.llm import extract_table_from_image rows = extract_table_from_image( path, region=tuple(region) if region else None, pattern=pattern, limit=limit, engine=engine, ) except Exception as e: logger.warning( "extract_table OCR échoué (%s) — variable '%s' = []", e, output_var, ) else: logger.warning( "extract_table : pas de heartbeat pour session %s — variable '%s' = []", session_id, output_var, ) replay_state.setdefault("variables", {})[output_var] = rows logger.info( "extract_table → variable '%s' (%d entrées, pattern=%r, limit=%s, engine=%s) replay %s", output_var, len(rows), pattern, limit, engine, replay_state.get("replay_id", "?"), ) return bool(rows) def _resolve_screenshot_path(replay_state: Dict[str, Any]) -> Optional[str]: """Résout le chemin du dernier screenshot (path disque ou base64 → temp). Calque la source utilisée par extract_text/extract_table : priorité au ``last_screenshot`` (path ou data-URI base64). Retourne None si absent. """ raw_screenshot = replay_state.get("last_screenshot") or "" if not raw_screenshot: return None if raw_screenshot.startswith("data:"): try: import base64 as _b64, tempfile header, b64data = raw_screenshot.split(",", 1) suffix = ".jpg" if "jpeg" in header else ".png" tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False) tmp.write(_b64.b64decode(b64data)) tmp.close() return tmp.name except Exception as e: logger.warning("extract_dossier: décodage base64 screenshot échoué: %s", e) return None if os.path.isfile(raw_screenshot): return raw_screenshot return None def _gate_dossier_quality( grid: List[List[Dict[str, Any]]], *, min_confidence: float, expected_cols: Optional[int], ) -> str: """Gate qualité simple → 'complete' ou 'needs_review'. 'complete' SSI : grille non vide ET confiance médiane ≥ seuil ET (si expected_cols fourni) au moins une ligne avec ce nombre de colonnes. Sinon 'needs_review'. Volontairement conservatrice (default-review). """ confs = [ cell.get("confidence") for row in grid for cell in row if isinstance(cell.get("confidence"), (int, float)) ] if not confs: return "needs_review" confs.sort() median = confs[len(confs) // 2] if median < min_confidence: return "needs_review" if expected_cols is not None: if not any(len(row) == expected_cols for row in grid): return "needs_review" return "complete" def _handle_extract_dossier_action( action: Dict[str, Any], replay_state: Dict[str, Any], session_id: str, ) -> bool: """Traite une action extract_dossier côté serveur (brique 3). Lit le dernier screenshot, extrait une grille structurée via ``extract_grid_from_image``, applique une gate qualité, puis PERSISTE un « dossier patient extrait » (Job/Table/Field) dans la DB VWB avec preuve (screenshot_ref + screen_bbox + confidences). Le job_id est stocké dans ``replay_state["variables"][output_var]``. Paramètres reconnus (action.parameters) : output_var : nom de variable runtime (default "extracted_dossier") patient_ref : référence patient EN CLAIR (volontaire) — non tokenisée region : (x, y, w, h) px pour cropper avant OCR (None = plein) min_confidence : seuil de confiance médiane pour 'complete' (default 0.6) expected_cols : nb de colonnes attendu (optionnel) pour la gate N'ÉCHOUE JAMAIS le replay : toute erreur → log + needs_review. Retourne True SSI le dossier est persisté avec statut 'complete'. """ params = action.get("parameters") or {} output_var = (params.get("output_var") or params.get("variable_name") or "extracted_dossier").strip() patient_ref = params.get("patient_ref") region = params.get("region") or None try: min_confidence = float(params.get("min_confidence", 0.6)) except (TypeError, ValueError): min_confidence = 0.6 expected_cols = params.get("expected_cols") if isinstance(expected_cols, str): try: expected_cols = int(expected_cols) except ValueError: expected_cols = None job_id = "" status = "needs_review" try: path = _resolve_screenshot_path(replay_state) grid: List[List[Dict[str, Any]]] = [] if path: from core.llm import extract_grid_from_image grid = extract_grid_from_image( path, region=tuple(region) if region else None ) else: logger.warning( "extract_dossier : pas de screenshot pour session %s — needs_review", session_id, ) status = _gate_dossier_quality( grid, min_confidence=min_confidence, expected_cols=expected_cols ) from . import vwb_db with vwb_db.vwb_app_context(): job_id = vwb_db.persist_extracted_dossier( grid, patient_ref=patient_ref, source_session_id=session_id, screenshot_ref=path, screen_bbox=({"x": region[0], "y": region[1], "width": region[2], "height": region[3]} if region and len(region) == 4 else None), status=status, ) except Exception as e: # Ne JAMAIS échouer le replay : on log, on marque needs_review. logger.warning( "extract_dossier : échec persistance (%s) — needs_review, replay %s", e, replay_state.get("replay_id", "?"), ) status = "needs_review" replay_state.setdefault("variables", {})[output_var] = job_id logger.info( "extract_dossier → variable '%s' job=%s statut=%s replay %s", output_var, job_id or "?", status, replay_state.get("replay_id", "?"), ) return status == "complete" def _handle_t2a_decision_action( action: Dict[str, Any], replay_state: Dict[str, Any], ) -> bool: """Traite une action t2a_decision côté serveur. Stocke le résultat JSON dans replay_state["variables"][output_var]. Retourne True si succès. Le DPI à analyser vient de action.parameters.input_template (déjà résolu par _resolve_runtime_vars donc les {{var}} sont remplis). """ params = action.get("parameters") or {} output_var = (params.get("output_var") or "t2a_result").strip() dpi_text = (params.get("input_template") or params.get("dpi") or "").strip() model = params.get("model") or None # None → DEFAULT_MODEL # Bypass LLM : si static_result est fourni dans les params, on l'utilise # tel quel comme résultat. Utile pour les démos déterministes (pas de # hallucination, pas de latence, pas de truncation de prompt). static_result = params.get("static_result") if isinstance(static_result, dict) and static_result.get("decision"): replay_state.setdefault("variables", {})[output_var] = static_result logger.info( "t2a_decision (STATIC) → variable '%s' decision=%s replay %s", output_var, static_result.get("decision"), replay_state.get("replay_id", "?"), ) return True if not dpi_text: logger.warning( "t2a_decision : input vide — variable '%s' = {decision: 'INDETERMINE'}", output_var, ) replay_state.setdefault("variables", {})[output_var] = { "decision": "INDETERMINE", "justification": "DPI vide ou non extrait", "confiance": "faible", "_error": "empty_input", } return False try: from core.llm import analyze_dpi, DEFAULT_MODEL, build_dpi_enriched # Enrichissement déterministe avant LLM : injection FAITS_CALCULÉS en # tête (durée, âge, CCMU/GEMSA/priorité…) pour neutraliser les # hallucinations de durée (cf. bug "23h" MOREL). La metadata est # capturée pour les garde-fous serveur (commit 2 — Python ↔ LLM). dpi_enriched, metadata = build_dpi_enriched(dpi_text) logger.info( "[build_dpi_enriched] duree_python=%sh decision_terrain=%r warnings=%d", metadata.get("duree_heures_decimales"), metadata.get("decision_terrain"), len(metadata.get("parsing_warnings", [])), ) result = analyze_dpi(dpi_enriched, model=model or DEFAULT_MODEL) except Exception as e: logger.warning("t2a_decision : analyze_dpi exception %s", e) result = { "decision": "INDETERMINE", "justification": f"Erreur analyse : {e}", "confiance": "faible", "_error": str(e), } # Si parse_error, injecter des valeurs de fallback pour que les templates restent lisibles if result.get("_parse_error"): raw_preview = result.get("_raw", "")[:200] logger.warning("t2a_decision parse_error — raw: %s", raw_preview) result.setdefault("decision", "INDETERMINE") result.setdefault("decision_court", "À vérifier") result.setdefault("preuve_critere1", "Analyse non disponible (erreur LLM)") result.setdefault("preuve_critere2", "Analyse non disponible (erreur LLM)") result.setdefault("preuve_critere3", "Analyse non disponible (erreur LLM)") result.setdefault("justification", f"Réponse LLM non parsable : {raw_preview}") result.setdefault("confiance", "faible") replay_state.setdefault("variables", {})[output_var] = result decision = result.get("decision", "?") elapsed = result.get("_elapsed_s", "?") logger.info( "t2a_decision → variable '%s' decision=%s (%ss) replay %s", output_var, decision, elapsed, replay_state.get("replay_id", "?"), ) return "_error" not in result and not result.get("_parse_error") def _handle_llm_generate_action( action: Dict[str, Any], replay_state: Dict[str, Any], ) -> bool: """Traite une action llm_generate côté serveur. Stocke le texte généré dans replay_state["variables"][output_var]. Les paramètres `prompt` et `context` sont déjà résolus via le templating runtime avant d'arriver ici. """ params = action.get("parameters") or {} output_var = ( params.get("output_var") or params.get("variable_name") or "generated_text" ).strip() # Bypass LLM : si static_text est fourni dans les params, on l'utilise # tel quel. Utile pour les démos déterministes. static_text = params.get("static_text") if isinstance(static_text, str) and static_text.strip(): replay_state.setdefault("variables", {})[output_var] = static_text logger.info( "llm_generate (STATIC) → variable '%s' (%d chars) replay %s", output_var, len(static_text), replay_state.get("replay_id", "?"), ) return True prompt = str(params.get("prompt") or "").strip() context = str(params.get("context") or "") model = params.get("model") or None temperature = None if params.get("temperature") is not None: try: temperature = float(params.get("temperature")) except (TypeError, ValueError): logger.warning( "llm_generate : temperature invalide %r — fallback valeur par défaut", params.get("temperature"), ) if not prompt: logger.warning( "llm_generate : prompt vide — variable '%s' = ''", output_var, ) replay_state.setdefault("variables", {})[output_var] = "" return False generated = "" try: from core.execution.llm_actions import LLMActionHandler handler = LLMActionHandler( ollama_endpoint=_normalize_ollama_endpoint( os.environ.get("OLLAMA_URL", "http://localhost:11434") ), timeout=180, ) generated = handler.generate_text( prompt=prompt, context=context, model=model, temperature=temperature, ).strip() except Exception as e: logger.warning("llm_generate : génération échouée (%s) — variable '%s' = ''", e, output_var) replay_state.setdefault("variables", {})[output_var] = generated logger.info( "llm_generate → variable '%s' (%d chars, model=%s) replay %s", output_var, len(generated), model or "default", replay_state.get("replay_id", "?"), ) return bool(generated) def _handle_concat_text_vars_action( action: Dict[str, Any], replay_state: Dict[str, Any], ) -> bool: """Traite une action serveur interne `_concat_text_vars`. Concatène deux variables runtime existantes (top_var + separator + bottom_var) et écrit le résultat dans output_var. Variables manquantes traitées comme "". Action générée par l'expansion de `extract_text_scroll` ; pas exposée à l'utilisateur final. Robuste aux échecs OCR amont (l'une ou l'autre var vide). """ params = action.get("parameters") or {} top_var = (params.get("top_var") or "").strip() bottom_var = (params.get("bottom_var") or "").strip() output_var = (params.get("output_var") or "extracted_text").strip() separator = params.get("separator", "\n\n") variables = replay_state.setdefault("variables", {}) top_text = str(variables.get(top_var, "") or "") bottom_text = str(variables.get(bottom_var, "") or "") # Si les deux sont vides, output reste "" (cohérent avec _handle_extract_text_action). # Si un seul est vide, on évite un séparateur inutile en début/fin. if top_text and bottom_text: merged = top_text + separator + bottom_text else: merged = top_text or bottom_text variables[output_var] = merged # Nettoyage des variables internes pour ne pas polluer l'état. if top_var.startswith("__"): variables.pop(top_var, None) if bottom_var.startswith("__"): variables.pop(bottom_var, None) logger.info( "extract_text_scroll concat → variable '%s' (%d chars) replay %s", output_var, len(merged), replay_state.get("replay_id", "?"), ) return bool(merged) def _handle_paste_and_execute_action( action: Dict[str, Any], replay_state: Dict[str, Any], ) -> bool: """Action serveur : invoque scripts/paste_and_execute_linuxdb.sh pour déclencher Ctrl+V + Ctrl+Enter dans DBeaver de la VM via ydotool. Bypasse Léa/NoMachine (Ctrl mangé par NoMachine passive grab). Cf. handoff 2026-05-16_handoff_ydotool_clipboard.md. """ import subprocess script_path = "/home/dom/ai/rpa_vision_v3/scripts/paste_and_execute_linuxdb.sh" try: result = subprocess.run( [script_path], timeout=30, capture_output=True, text=True, ) if result.returncode != 0: logger.warning( "paste_and_execute échoué (rc=%d) stderr=%s", result.returncode, (result.stderr or "")[:500], ) return False logger.info("paste_and_execute OK replay %s", replay_state.get("replay_id", "?")) return True except subprocess.TimeoutExpired: logger.warning("paste_and_execute timeout (30s)") return False except Exception as e: logger.warning("paste_and_execute exception : %s", e) return False def _expand_extract_text_scroll( base: Dict[str, Any], final_var: str, top_var: str, bottom_var: str, paragraph: bool, scroll_pause_ms: int = SCROLL_PAUSE_MS, ) -> List[Dict[str, Any]]: """Expanse un step extract_text_scroll en séquence d'actions atomiques. Séquence générée : 1. extract_text(top_var) — OCR zone visible (haut de page) 2. key_combo(ctrl+end) — scroll bas (côté client Léa V1) 3. wait(scroll_pause_ms) — laisse DOM/UI se redessiner 4. extract_text(bottom_var) — OCR zone visible (bas de page) 5. _concat_text_vars(top, bottom→final) — action serveur interne 6. key_combo(ctrl+home) — remet en haut Toutes les sous-actions héritent de `base` (edge_id, from_node, to_node) pour la traçabilité. Chaque action obtient un action_id unique. `scroll_pause_ms` : configurable au step (défaut SCROLL_PAUSE_MS=500ms). """ def _new_action() -> Dict[str, Any]: return {**base, "action_id": f"act_{uuid.uuid4().hex[:8]}"} a1 = _new_action() a1["type"] = "extract_text" a1["parameters"] = {"output_var": top_var, "paragraph": paragraph} a2 = _new_action() a2["type"] = "key_combo" a2["keys"] = ["ctrl", "end"] a3 = _new_action() a3["type"] = "wait" a3["duration_ms"] = scroll_pause_ms a4 = _new_action() a4["type"] = "extract_text" a4["parameters"] = {"output_var": bottom_var, "paragraph": paragraph} a5 = _new_action() a5["type"] = "_concat_text_vars" a5["parameters"] = { "top_var": top_var, "bottom_var": bottom_var, "output_var": final_var, "separator": "\n\n", } a6 = _new_action() a6["type"] = "key_combo" a6["keys"] = ["ctrl", "home"] return [a1, a2, a3, a4, a5, a6] def _expand_compound_steps( steps: List[Dict[str, Any]], base: Dict[str, Any], params: Dict[str, Any] ) -> List[Dict[str, Any]]: """Décomposer les steps d'un compound en actions individuelles.""" actions = [] for step in steps: step_type = step.get("type", "unknown") action = { **base, "action_id": f"act_{uuid.uuid4().hex[:8]}", } if step_type == "key_press": action["type"] = "key_combo" keys = step.get("keys", []) if not keys and step.get("key"): keys = [step["key"]] action["keys"] = keys elif step_type == "text_input": action["type"] = "type" text = step.get("text", "") text = _substitute_variables(text, params, {}) action["text"] = text elif step_type == "wait": action["type"] = "wait" action["duration_ms"] = step.get("duration_ms", 500) elif step_type == "mouse_click": action["type"] = "click" action["x_pct"] = step.get("x_pct", 0.0) action["y_pct"] = step.get("y_pct", 0.0) action["button"] = step.get("button", "left") target_spec: Dict[str, Any] = {} _copy_semantic_target_fields( target_spec, step, step.get("target_spec") if isinstance(step, dict) else None, step.get("visual_anchor") if isinstance(step, dict) else None, ) semantic_label = _first_non_empty_text( target_spec.get("by_text"), target_spec.get("target_text"), target_spec.get("description"), target_spec.get("ocr_description"), target_spec.get("vlm_description"), ) if semantic_label: action.setdefault( "target_text", target_spec.get("target_text") or semantic_label, ) action.setdefault("target_description", semantic_label) if target_spec: action["target_spec"] = target_spec action["visual_mode"] = True else: logger.debug(f"Step compound inconnu : {step_type}") continue actions.append(action) return actions # ========================================================================= # Pre-check écran — Vérification pré-action par embedding CLIP # ========================================================================= def _pre_check_screen_state( session_id: str, expected_node_id: str, current_screenshot_path: str, active_processor, replay_states: Dict[str, Dict[str, Any]], replay_lock: threading.Lock, precheck_threshold: float = 0.85, ) -> Dict[str, Any]: """Vérifier que l'écran actuel correspond à l'état attendu du node. Compare le screenshot actuel avec le prototype du node attendu via similarité d'embedding CLIP (rapide, ~200ms). Args: session_id: ID de la session de replay expected_node_id: ID du node source de l'action (from_node) current_screenshot_path: Chemin du screenshot heartbeat récent active_processor: Instance StreamProcessor avec le CLIPEmbedder chargé replay_states: Dict partagé des états de replay replay_lock: Lock pour l'accès concurrent aux replay_states precheck_threshold: Seuil de similarité cosine Returns: {"match": True/False, "similarity": float, "expected_node": str, "reason": str (si mismatch), "popup_detected": bool} """ result: Dict[str, Any] = { "match": True, "similarity": 1.0, "expected_node": expected_node_id, "popup_detected": False, } try: # 1. Trouver le workflow actif pour cette session replay_state = None workflow = None with replay_lock: for state in replay_states.values(): if state["session_id"] == session_id and state["status"] == "running": replay_state = state break if not replay_state: result["reason"] = "no_active_replay" return result workflow_id = replay_state.get("workflow_id", "") with active_processor._data_lock: workflow = active_processor._workflows.get(workflow_id) if workflow is None: result["reason"] = "workflow_not_found" return result # 2. Récupérer le prototype du node attendu # Supporter à la fois les objets Workflow et les dicts bruts node = None if hasattr(workflow, "get_node"): node = workflow.get_node(expected_node_id) elif isinstance(workflow, dict): # Format dict brut (workflows VWB/manuels) for n in workflow.get("nodes", []): if n.get("node_id") == expected_node_id: node = n break if node is None: result["reason"] = "node_not_found" return result # Extraire le prototype vector metadata = node.metadata if hasattr(node, "metadata") else node.get("metadata", {}) proto_list = metadata.get("_prototype_vector") if not proto_list or not isinstance(proto_list, (list, tuple)): result["reason"] = "no_prototype_vector" return result import numpy as np prototype_vector = np.array(proto_list, dtype=np.float32) # 3. Calculer l'embedding CLIP du screenshot actuel active_processor._ensure_initialized() if active_processor._clip_embedder is None: result["reason"] = "clip_embedder_unavailable" return result from PIL import Image pil_image = Image.open(current_screenshot_path) current_vector = active_processor._clip_embedder.embed_image(pil_image) if current_vector is None or len(current_vector) == 0: result["reason"] = "embedding_failed" return result # 4. Similarité cosine current_vector = current_vector.flatten().astype(np.float32) prototype_vector = prototype_vector.flatten().astype(np.float32) norm_current = np.linalg.norm(current_vector) norm_proto = np.linalg.norm(prototype_vector) if norm_current < 1e-8 or norm_proto < 1e-8: result["reason"] = "zero_norm_vector" result["match"] = False result["similarity"] = 0.0 return result similarity = float( np.dot(current_vector, prototype_vector) / (norm_current * norm_proto) ) result["similarity"] = round(similarity, 4) result["match"] = similarity >= precheck_threshold if not result["match"]: result["reason"] = "screen_mismatch" logger.warning( f"Pre-check MISMATCH pour session={session_id} " f"node={expected_node_id}: similarity={similarity:.4f} " f"< seuil={precheck_threshold}" ) # 5. Détection de popup par changement de titre de fenêtre result["popup_detected"] = _detect_popup_hint( session_id, workflow, expected_node_id, active_processor, ) except Exception as e: # Ne jamais bloquer le replay en cas d'erreur du pre-check logger.error(f"Pre-check échoué (non bloquant): {e}") result["match"] = True # Fallback permissif result["reason"] = f"precheck_error: {e}" return result def _detect_popup_hint( session_id: str, workflow: Any, expected_node_id: str, processor_instance=None, ) -> bool: """Détecter si une popup ou un dialogue modal est probable. Compare le titre de fenêtre actuel (via last_window_info de la session) avec le titre attendu du node dans le workflow. Un changement de titre suggère une popup/dialogue inattendu. Args: session_id: ID de la session workflow: Workflow object ou dict expected_node_id: ID du node attendu processor_instance: StreamProcessor pour accéder aux sessions Returns: True si un changement de titre suggère une popup """ try: if processor_instance is None: return False # Titre actuel depuis la session session = processor_instance.session_manager.get_session(session_id) if not session: return False current_title = session.last_window_info.get("title", "").strip().lower() if not current_title or current_title == "unknown": return False # Titre attendu depuis le node du workflow expected_title = "" if hasattr(workflow, "get_node"): node = workflow.get_node(expected_node_id) if node and hasattr(node, "template") and hasattr(node.template, "window"): window_spec = node.template.window if hasattr(window_spec, "title_contains") and window_spec.title_contains: expected_title = window_spec.title_contains.strip().lower() elif isinstance(workflow, dict): for n in workflow.get("nodes", []): if n.get("node_id") == expected_node_id: template = n.get("template", {}) window = template.get("window", {}) expected_title = (window.get("title_contains") or "").strip().lower() break if not expected_title: return False # Si le titre actuel ne contient plus le titre attendu, popup probable if expected_title not in current_title: logger.info( f"Popup détectée: titre actuel='{current_title}' " f"ne contient pas '{expected_title}'" ) return True except Exception as e: logger.debug(f"Détection popup échouée: {e}") return False # ========================================================================= # Replay — État et retry # ========================================================================= def _create_replay_state( replay_id: str, workflow_id: str, session_id: str, total_actions: int, params: Optional[Dict[str, Any]] = None, machine_id: Optional[str] = None, actions: Optional[List[Dict[str, Any]]] = None, ) -> Dict[str, Any]: """Créer un état de replay enrichi avec les champs de suivi d'erreur. Args: actions: Liste des actions du replay. Une copie slim (sans anchors base64) est stockée pour permettre à `/replay/result` de retrouver le `target_spec` de l'action courante — nécessaire pour l'apprentissage mémoire (Phase 1 plan Léa). """ # Copie slim des actions : on strip les anchor_image_base64 pour ne # pas gonfler la mémoire (anchors peuvent faire 50-200 KB chacun). # On conserve les champs utilisés par : # - la Phase 1 apprentissage (target_spec pour memory_record_success) # - le contrôle strict (success_strict, expected_window_*) # - les logs/audit (intention, action_id, type, coords) actions_slim: List[Dict[str, Any]] = [] if actions: for a in actions: a_copy = { "action_id": a.get("action_id"), "type": a.get("type"), "keys": a.get("keys"), "button": a.get("button"), "x_pct": a.get("x_pct"), "y_pct": a.get("y_pct"), # Contrôle strict des étapes (Dom, matin 10 avril 2026) "success_strict": a.get("success_strict", False), "expected_window_before": a.get("expected_window_before", ""), "expected_window_title": a.get("expected_window_title", ""), # Contexte métier utile pour logs et apprentissage "intention": a.get("intention", ""), "target_text": a.get("target_text", ""), "target_description": a.get("target_description", ""), "description": a.get("description", ""), } ts = a.get("target_spec") if isinstance(ts, dict): a_copy["target_spec"] = { k: v for k, v in ts.items() if k not in ("anchor_image_base64",) } actions_slim.append(a_copy) return { "replay_id": replay_id, "workflow_id": workflow_id, "session_id": session_id, "machine_id": machine_id or "default", # Machine cible du replay "status": "running", "total_actions": total_actions, "completed_actions": 0, "failed_actions": 0, "current_action_index": 0, "params": params or {}, "results": [], # Historique des résultats action par action "actions": actions_slim, # Copie slim pour lookup par index (Phase 1 mémoire) # Champs enrichis pour le suivi d'erreur (#7) "retried_actions": 0, "unverified_actions": 0, "error_log": [], # Liste des erreurs rencontrées "last_screenshot": None, # Path du dernier screenshot reçu "_last_screenshot_before": None, # Interne: screenshot avant la dernière action # Champs pour pause supervisée (target_not_found) "failed_action": None, # Contexte de l'action en echec (quand paused_need_help) "pause_message": None, # Message a afficher a l'utilisateur # Variables d'exécution produites en cours de workflow (extract_text, # t2a_decision, etc.). Résolues via templating {{var}} ou {{var.field}} # dans les paramètres des actions suivantes. "variables": {}, # QW2 — Anneaux d'historique pour LoopDetector (5 derniers max) "_screenshot_history": [], # images PIL des N derniers heartbeats (LoopDetector embed à chaque tick) "_action_history": [], # N dernières actions exécutées (signature) # QW4 — Safety checks (hybride déclaratif + LLM contextuel) et audit acquittements "safety_checks": [], # liste produite par SafetyChecksProvider "checks_acknowledged": [], # ids acquittés via /replay/resume (audit trail) "pause_reason": "", # "loop_detected" | "" pour V1 "pause_payload": None, # payload complet pour debug/audit } def _schedule_retry( session_id: str, replay_state: Dict[str, Any], action: Dict[str, Any], current_retry: int, reason: str, replay_queues: Dict[str, List[Dict[str, Any]]], retry_pending: Dict[str, Dict[str, Any]], max_retries: int = 3, ): """ Programmer un retry pour une action échouée. Stratégie : - Retry 1 : réinjecter l'action directement (re-résolution visuelle par l'agent) - Retry 2 : injecter un wait de 2s avant l'action (possible loading en cours) - Retry 3 : dernier essai direct L'action est réinsérée en tête de la queue pour être la prochaine exécutée. Le lock de replay doit être acquis par l'appelant. """ next_retry = current_retry + 1 replay_state["retried_actions"] += 1 # Créer une copie de l'action avec un nouveau action_id pour le tracking retry_action = dict(action) retry_action_id = f"{action.get('action_id', 'unknown')}_retry{next_retry}" retry_action["action_id"] = retry_action_id # Stocker l'info de retry pour le prochain report_action_result retry_pending[retry_action_id] = { "action": action, "dispatched_action": retry_action, "retry_count": next_retry, "replay_id": replay_state["replay_id"], "session_id": session_id, "machine_id": replay_state.get("machine_id", "default"), "dispatched_at": 0.0, "first_dispatched_at": 0.0, "resent_count": 0, "last_resent_at": 0.0, "reason": reason, } # Stratégie de retry selon le numéro actions_to_insert = [] if next_retry == 2: # Retry 2 : injecter un wait de 2s avant l'action wait_action = { "action_id": f"wait_retry_{uuid.uuid4().hex[:6]}", "type": "wait", "duration_ms": 2000, } actions_to_insert.append(wait_action) actions_to_insert.append(retry_action) # Insérer en tête de la queue (prochaine action à exécuter) queue = replay_queues.get(session_id, []) replay_queues[session_id] = actions_to_insert + queue logger.info( f"Retry {next_retry}/{max_retries} programmé pour {action.get('action_id')} " f"(raison: {reason}) | nouveau id: {retry_action_id}" ) def _notify_error_callback( replay_state: Dict[str, Any], action_id: str, error: Optional[str], error_callbacks: Dict[str, str], ): """ Notifier le callback d'erreur si configuré pour ce replay. Appel HTTP POST non-bloquant vers l'URL de callback. En cas d'échec de notification, on log mais on ne bloque pas. """ replay_id = replay_state["replay_id"] callback_url = error_callbacks.get(replay_id) if not callback_url: return def _send_callback(): try: import urllib.request payload = json.dumps({ "replay_id": replay_id, "workflow_id": replay_state.get("workflow_id"), "session_id": replay_state.get("session_id"), "action_id": action_id, "error": error or "Erreur inconnue", "retried_actions": replay_state.get("retried_actions", 0), "error_log": replay_state.get("error_log", []), "status": replay_state.get("status"), }).encode("utf-8") req = urllib.request.Request( callback_url, data=payload, headers={"Content-Type": "application/json"}, method="POST", ) with urllib.request.urlopen(req, timeout=5) as resp: logger.info( f"Error callback envoyé à {callback_url}: {resp.status}" ) except Exception as e: logger.warning( f"Échec envoi error callback à {callback_url}: {e}" ) # Envoyer en arrière-plan pour ne pas bloquer threading.Thread(target=_send_callback, daemon=True).start()