Files
rpa_vision_v3/agent_v0/server_v1/replay_engine.py
Dom 7df51d2c79 snapshot: WIP 5j replay reliability (B1 watchdog + dialog handlers + grounding drift)
Snapshot avant correction du blocage relance Léa (3 incidents 24h: SSH refusé,
polls morts ×2). Point de rollback stable.

Contenu:
- agent_v1/core/executor.py: 5 patchs dialog handling (saveas drift, close_tab
  hotkey fallback, confirm_save Unicode apostrophe, foreground dialog
  recontextualization, runtime_dialog in-loop) + helpers normalize_window_hint,
  requires_post_verify_window_transition
- agent_v1/core/grounding.py: garde drift template fix (fallback_x/y plumbed)
- server_v1/replay_watchdog.py (NEW): orphan watchdog B1, scan 10s timeout 30s
- server_v1/api_stream.py: dispatched_action plumbing, watchdog lifespan,
  metrics endpoint
- server_v1/replay_engine.py: _schedule_retry préserve original_action +
  dispatched_action
- stream_processor.py: gardes _infer_tab_switch_target (no false switch_tab
  on save_as dialog open) + _attach_expected_window_before
- tests/integration: test_replay_watchdog.py (8 cas), test_stream_processor.py
- tests/unit: test_executor_verify_window_guard.py (start_button, close_tab,
  runtime_dialog, post_verify, transition fallbacks)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 16:48:37 +02:00

2832 lines
106 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# agent_v0/server_v1/replay_engine.py
"""
Replay Engine — Gestion des replays de workflows.
Contient :
- Setup environnement (préparation apps avant replay)
- Validation des actions de replay (sécurité)
- Conversion workflow → actions normalisées
- Fonctions utilitaires de replay (session detection, state management, retry)
- Pre-check écran par embedding CLIP
- Détection popup
Extrait de api_stream.py pour clarifier l'architecture.
"""
import base64
import io
import json
import logging
import os
import re
import threading
import time
import uuid
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger("api_stream")
# =========================================================================
# Validation des actions de replay (sécurité HIGH)
# =========================================================================
_ALLOWED_ACTION_TYPES = {
"click", "type", "key_combo", "scroll", "wait",
"file_open", "file_save", "file_close", "file_new", "file_dialog",
"double_click", "right_click", "drag",
"verify_screen", # Replay hybride : vérification visuelle entre groupes
"pause_for_human", # Pause supervisée explicite (interceptée par /replay/next)
"extract_text", # OCR serveur sur dernier heartbeat → variable workflow
"extract_table", # OCR serveur + filtre regex → liste structurée (boucle)
"extract_text_scroll", # Marker côté graphe — expansé en sous-actions par _edge_to_normalized_actions
"_concat_text_vars", # Action serveur interne (générée par expansion extract_text_scroll)
"t2a_decision", # Analyse LLM facturation T2A → variable workflow
"llm_generate", # Génération texte libre côté serveur → variable workflow
"paste_and_execute", # Bypass NoMachine : ydotool Ctrl+V+Ctrl+Enter dans VM via SSH
}
# Types d'actions exécutées CÔTÉ SERVEUR (jamais transmises à l'Agent V1).
# Le pipeline /replay/next les traite en boucle interne et passe à l'action
# suivante jusqu'à trouver une action visuelle (à transmettre au client).
_SERVER_SIDE_ACTION_TYPES = {
"extract_text",
"extract_table",
"t2a_decision",
"llm_generate",
"_concat_text_vars",
"paste_and_execute",
}
# Pause par défaut entre Ctrl+End/Home et la capture suivante (ms).
# Configurable par step via parameters.scroll_pause_ms ; default ici.
SCROLL_PAUSE_MS = 500
_MAX_ACTION_TEXT_LENGTH = 10000
_MAX_KEYS_PER_COMBO = 10
# Touches autorisées dans les key_combo (modificateurs + touches spéciales + caractères simples)
_KNOWN_KEY_NAMES = {
"enter", "return", "tab", "escape", "esc", "backspace", "delete", "space",
"up", "down", "left", "right", "home", "end", "page_up", "page_down",
"f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12",
"ctrl", "ctrl_l", "ctrl_r", "alt", "alt_l", "alt_r",
"shift", "shift_l", "shift_r",
"cmd", "win", "super", "super_l", "super_r", "windows", "meta",
"insert", "print_screen", "caps_lock", "num_lock",
}
def _auto_launch_replay_after_finalize() -> bool:
"""Décide si ``/finalize`` doit proposer un replay direct immédiat.
Patch 2026-05-23 (brief 0902 deferred-workflow) : le chemin produit
cible est le workflow compilé par le worker VLM en arrière-plan,
pas le replay direct depuis ``live_events.jsonl``. Le replay direct
reste utile pour le smoke/debug — on l'active explicitement via
la variable d'env ``RPA_AUTO_LAUNCH_REPLAY_AFTER_FINALIZE``.
Default-deny : toute valeur autre que ``true``/``1``/``yes``
(case-insensitive, après strip) retourne ``False``.
"""
raw = os.environ.get("RPA_AUTO_LAUNCH_REPLAY_AFTER_FINALIZE", "")
return raw.strip().lower() in {"true", "1", "yes"}
def _validate_replay_action(action: dict) -> Optional[str]:
"""Valide une action de replay. Retourne un message d'erreur ou None si valide."""
action_type = action.get("type", "")
# Vérifier le type d'action
if action_type not in _ALLOWED_ACTION_TYPES:
return f"Type d'action non autorisé : '{action_type}'. Autorisés : {sorted(_ALLOWED_ACTION_TYPES)}"
# Vérifier la longueur du texte
text = action.get("text", "")
if isinstance(text, str) and len(text) > _MAX_ACTION_TEXT_LENGTH:
return f"Texte trop long ({len(text)} > {_MAX_ACTION_TEXT_LENGTH} caractères)"
# Vérifier les touches
keys = action.get("keys", [])
if isinstance(keys, list):
if len(keys) > _MAX_KEYS_PER_COMBO:
return f"Trop de touches ({len(keys)} > {_MAX_KEYS_PER_COMBO})"
for key in keys:
key_lower = str(key).lower()
# Accepter les caractères simples (a-z, 0-9, ponctuation) et les noms connus
if len(str(key)) == 1 or key_lower in _KNOWN_KEY_NAMES:
continue
return f"Touche inconnue : '{key}'"
# Vérifier les coordonnées normalisées
for coord_name in ("x_pct", "y_pct"):
val = action.get(coord_name)
if val is not None:
try:
val_f = float(val)
if not (0.0 <= val_f <= 1.0):
return f"Coordonnée {coord_name}={val_f} hors limites [0.0, 1.0]"
except (TypeError, ValueError):
return f"Coordonnée {coord_name} invalide : {val}"
return None # Valide
# =========================================================================
# Setup environnement — Préparation automatique avant le replay
# =========================================================================
# Mapping des noms d'exécutables Windows courants vers la commande de lancement.
# Utilisé comme fallback pour le texte de recherche dans le menu Démarrer.
# Le format est : "processname.exe" (minuscule) -> commande shell
_APP_LAUNCH_COMMANDS: Dict[str, str] = {
"notepad.exe": "notepad",
"explorer.exe": "explorer",
"calc.exe": "calc",
"mspaint.exe": "mspaint",
"cmd.exe": "cmd",
"powershell.exe": "powershell",
"wordpad.exe": "wordpad",
"charmap.exe": "charmap",
"snippingtool.exe": "snippingtool",
"taskmgr.exe": "taskmgr",
"regedit.exe": "regedit",
"mstsc.exe": "mstsc",
"winword.exe": "winword",
"excel.exe": "excel",
"powerpnt.exe": "powerpnt",
"outlook.exe": "outlook",
"msedge.exe": "msedge",
"chrome.exe": "chrome",
"firefox.exe": "firefox",
"code.exe": "code",
}
# Mapping des exécutables vers le nom visuel à chercher dans le menu Démarrer.
# Contient le texte de recherche (souvent le nom français) et une description
# pour le VLM afin d'identifier l'icône dans les résultats de recherche.
# Format : "processname.exe" -> {"search_text": ..., "display_name": ..., "vlm_description": ...}
_APP_VISUAL_SEARCH: Dict[str, Dict[str, str]] = {
"notepad.exe": {
"search_text": "Bloc-notes",
"display_name": "Bloc-notes",
"vlm_description": "L'application Bloc-notes (Notepad) dans les résultats de recherche",
},
"calc.exe": {
"search_text": "Calculatrice",
"display_name": "Calculatrice",
"vlm_description": "L'application Calculatrice dans les résultats de recherche",
},
"mspaint.exe": {
"search_text": "Paint",
"display_name": "Paint",
"vlm_description": "L'application Paint dans les résultats de recherche",
},
"cmd.exe": {
"search_text": "Invite de commandes",
"display_name": "Invite de commandes",
"vlm_description": "L'Invite de commandes (Command Prompt) dans les résultats",
},
"powershell.exe": {
"search_text": "PowerShell",
"display_name": "PowerShell",
"vlm_description": "Windows PowerShell dans les résultats de recherche",
},
"wordpad.exe": {
"search_text": "WordPad",
"display_name": "WordPad",
"vlm_description": "L'application WordPad dans les résultats de recherche",
},
"winword.exe": {
"search_text": "Word",
"display_name": "Microsoft Word",
"vlm_description": "Microsoft Word dans les résultats de recherche",
},
"excel.exe": {
"search_text": "Excel",
"display_name": "Microsoft Excel",
"vlm_description": "Microsoft Excel dans les résultats de recherche",
},
"powerpnt.exe": {
"search_text": "PowerPoint",
"display_name": "Microsoft PowerPoint",
"vlm_description": "Microsoft PowerPoint dans les résultats de recherche",
},
"outlook.exe": {
"search_text": "Outlook",
"display_name": "Microsoft Outlook",
"vlm_description": "Microsoft Outlook dans les résultats de recherche",
},
"msedge.exe": {
"search_text": "Edge",
"display_name": "Microsoft Edge",
"vlm_description": "Microsoft Edge dans les résultats de recherche",
},
"chrome.exe": {
"search_text": "Chrome",
"display_name": "Google Chrome",
"vlm_description": "Google Chrome dans les résultats de recherche",
},
"firefox.exe": {
"search_text": "Firefox",
"display_name": "Mozilla Firefox",
"vlm_description": "Mozilla Firefox dans les résultats de recherche",
},
"code.exe": {
"search_text": "Visual Studio Code",
"display_name": "Visual Studio Code",
"vlm_description": "Visual Studio Code dans les résultats de recherche",
},
"taskmgr.exe": {
"search_text": "Gestionnaire des tâches",
"display_name": "Gestionnaire des tâches",
"vlm_description": "Le Gestionnaire des tâches dans les résultats de recherche",
},
"snippingtool.exe": {
"search_text": "Outil Capture",
"display_name": "Outil Capture d'écran",
"vlm_description": "L'Outil Capture d'écran dans les résultats de recherche",
},
"mstsc.exe": {
"search_text": "Connexion Bureau à distance",
"display_name": "Bureau à distance",
"vlm_description": "La Connexion Bureau à distance dans les résultats",
},
}
# Applications Windows à ignorer pour le setup (processus système, agents, etc.)
_SETUP_IGNORE_APPS = {
"searchhost.exe", # Barre de recherche Windows
"explorer.exe", # Explorer est toujours lancé (shell Windows)
"pythonw.exe", # Agent Python (notre propre agent)
"python.exe", # Idem
"shellexperiencehost.exe",
"startmenuexperiencehost.exe",
"applicationframehost.exe",
"systemsettings.exe",
"textinputhost.exe",
"runtimebroker.exe",
}
# Certaines applications Windows légères sont plus robustes à lancer via
# `Win+R` + commande shell qu'à travers un setup 100% visuel Démarrer /
# Rechercher. On active cette stratégie de façon ciblée pour les cas validés
# en live afin d'éviter que la validation métier dépende d'un chemin Windows
# fragile sans rapport avec le workflow testé.
_SETUP_RUN_DIALOG_APPS = {
"notepad.exe",
}
# Tokens de titres "neutres" = état initial d'une application fraîchement
# lancée par le setup auto (Win+Démarrer → recherche → clic résultat).
# Quand la session source contient un focus_change vers un de ces titres
# peu après le premier focus app, le trim coupe jusqu'à ce focus pour
# éliminer les clics intra-app redondants (bascule d'onglet, fermeture de
# fenêtre précédente, etc.) que le setup auto rend inutiles.
_NEUTRAL_TITLE_TOKENS = frozenset({
"sans titre", # Bloc-notes FR
"untitled", # Notepad EN
"document1", # Word
"classeur1", # Excel FR
"book1", # Excel EN
"présentation1", # PowerPoint FR
"presentation1", # PowerPoint EN
})
# Nombre d'events bruts inspectés après le premier focus vers primary_app
# pour rechercher un focus vers un titre neutre. Volontairement court
# pour ne pas couper un workflow qui re-visite un titre neutre bien plus
# tard (filet de sécurité).
_TRIM_NEUTRAL_LOOKAHEAD = 15
def _is_neutral_window_title(window_title: str) -> bool:
"""Détecter si un titre de fenêtre correspond à l'état initial vide
d'une application (fenêtre fraîchement ouverte par le setup auto).
Exemples : ``Sans titre Bloc-notes`` → True,
``http://foo.txt Bloc-notes`` → False,
``Document1 - Word`` → True.
"""
if not window_title:
return False
title = str(window_title).strip().lower()
for sep in (" ", " - "):
if sep in title:
title = title.split(sep, 1)[0].strip()
break
return title.lstrip("*").strip() in _NEUTRAL_TITLE_TOKENS
def _relative_position_labels(x: int, y: int, screen_w: int, screen_h: int) -> Dict[str, str]:
"""Décrire une position en termes relatifs sur l'écran."""
y_relative = ""
x_relative = ""
if screen_h > 0:
y_relative = (
"en bas" if y / screen_h > 0.8
else "en haut" if y / screen_h < 0.2
else "au milieu"
)
if screen_w > 0:
x_relative = (
"à gauche" if x / screen_w < 0.3
else "à droite" if x / screen_w > 0.7
else "au centre"
)
return {
"x_relative": x_relative,
"y_relative": y_relative,
}
def _extract_launch_result_target(
raw_events: list,
primary_app: str,
) -> Optional[Dict[str, Any]]:
"""Retrouver le vrai clic de lancement depuis SearchHost.exe.
Cherche un clic dans la fenêtre de recherche Windows qui est suivi
rapidement d'un focus vers l'application principale. Ce clic sert de
meilleure cible pour le setup auto qu'une cible synthétique
`display_name/app_icon`, trop fragile sur les résultats de recherche.
"""
primary_app_lower = (primary_app or "").lower()
if not primary_app_lower:
return None
for idx, raw_evt in enumerate(raw_events):
event_data = raw_evt.get("event", raw_evt)
if event_data.get("type") != "mouse_click":
continue
window = event_data.get("window", {})
if not isinstance(window, dict):
continue
if window.get("app_name", "").lower() != "searchhost.exe":
continue
pos = event_data.get("pos") or []
if not isinstance(pos, list) or len(pos) != 2:
continue
screen_meta = event_data.get("screen_metadata", {})
screen_res = (
screen_meta.get("screen_resolution")
or event_data.get("screen_resolution")
or []
)
if not isinstance(screen_res, list) or len(screen_res) != 2:
continue
try:
click_x = int(pos[0])
click_y = int(pos[1])
screen_w = int(screen_res[0])
screen_h = int(screen_res[1])
except (TypeError, ValueError):
continue
if screen_w <= 0 or screen_h <= 0:
continue
click_ts = event_data.get("timestamp")
launched = False
for follow_evt in raw_events[idx + 1: idx + 8]:
follow_data = follow_evt.get("event", follow_evt)
if follow_data.get("type") != "window_focus_change":
continue
to_info = follow_data.get("to", {})
if not isinstance(to_info, dict):
continue
if to_info.get("app_name", "").lower() != primary_app_lower:
continue
follow_ts = follow_data.get("timestamp")
if (
isinstance(click_ts, (int, float))
and isinstance(follow_ts, (int, float))
and follow_ts - click_ts > 5.0
):
break
launched = True
break
if not launched:
continue
pos_labels = _relative_position_labels(click_x, click_y, screen_w, screen_h)
position_desc = " ".join(
part for part in [pos_labels["y_relative"], pos_labels["x_relative"]] if part
)
window_title = window.get("title", "") or "Rechercher"
target: Dict[str, Any] = {
"x_pct": round(click_x / screen_w, 6),
"y_pct": round(click_y / screen_h, 6),
"window_title": window_title,
"expected_window_before": window_title,
"original_position": pos_labels,
"source_app": window.get("app_name", ""),
}
if position_desc:
target["position_desc"] = position_desc
window_capture = event_data.get("window_capture", {})
if isinstance(window_capture, dict):
click_relative = window_capture.get("click_relative")
window_size = window_capture.get("window_size")
if (
isinstance(click_relative, list)
and len(click_relative) == 2
and isinstance(window_size, list)
and len(window_size) == 2
):
target["window_capture"] = {
"click_relative": click_relative,
"window_size": window_size,
}
return target
return None
def _extract_start_menu_target(raw_events: list) -> Optional[Dict[str, Any]]:
"""Retrouver le vrai clic sur Démarrer depuis les événements bruts."""
return _extract_start_menu_target_from_session(raw_events, session_dir=None)
def _load_click_anchor_from_session(
session_dir: Optional[str],
screenshot_id: str,
click_x: int,
click_y: int,
) -> str:
"""Charger un crop 80x80 depuis le screenshot source stocké côté serveur."""
if not session_dir or not screenshot_id:
return ""
full_path = Path(session_dir) / "shots" / f"{screenshot_id}_full.png"
if not full_path.is_file():
return ""
try:
from PIL import Image
img = Image.open(full_path)
crop_size = 40
x1 = max(0, click_x - crop_size)
y1 = max(0, click_y - crop_size)
x2 = min(img.width, click_x + crop_size)
y2 = min(img.height, click_y + crop_size)
cropped = img.crop((x1, y1, x2, y2))
buf = io.BytesIO()
cropped.save(buf, format="PNG")
return base64.b64encode(buf.getvalue()).decode("utf-8")
except Exception as e:
logger.debug("setup start anchor: crop échoué pour %s: %s", full_path, e)
return ""
def _extract_start_menu_target_from_session(
raw_events: list,
session_dir: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
"""Retrouver le vrai clic sur Démarrer et son ancre visuelle si disponible."""
for idx, raw_evt in enumerate(raw_events):
event_data = raw_evt.get("event", raw_evt)
if event_data.get("type") != "mouse_click":
continue
pos = event_data.get("pos") or []
if not isinstance(pos, list) or len(pos) != 2:
continue
screen_meta = event_data.get("screen_metadata", {})
screen_res = (
screen_meta.get("screen_resolution")
or event_data.get("screen_resolution")
or []
)
if not isinstance(screen_res, list) or len(screen_res) != 2:
continue
click_ts = event_data.get("timestamp")
opened_search = False
for follow_evt in raw_events[idx + 1: idx + 6]:
follow_data = follow_evt.get("event", follow_evt)
if follow_data.get("type") != "window_focus_change":
continue
to_info = follow_data.get("to", {})
if not isinstance(to_info, dict):
continue
if to_info.get("app_name", "").lower() != "searchhost.exe":
continue
follow_ts = follow_data.get("timestamp")
if (
isinstance(click_ts, (int, float))
and isinstance(follow_ts, (int, float))
and follow_ts - click_ts > 3.0
):
break
opened_search = True
break
if not opened_search:
continue
try:
click_x = int(pos[0])
click_y = int(pos[1])
screen_w = int(screen_res[0])
screen_h = int(screen_res[1])
except (TypeError, ValueError):
continue
if screen_w <= 0 or screen_h <= 0:
continue
pos_labels = _relative_position_labels(click_x, click_y, screen_w, screen_h)
position_desc = " ".join(
part for part in [pos_labels["y_relative"], pos_labels["x_relative"]] if part
)
target: Dict[str, Any] = {
"x_pct": round(click_x / screen_w, 6),
"y_pct": round(click_y / screen_h, 6),
"original_position": pos_labels,
"position_desc": position_desc,
}
anchor_b64 = _load_click_anchor_from_session(
session_dir=session_dir,
screenshot_id=str(event_data.get("screenshot_id", "")).strip(),
click_x=click_x,
click_y=click_y,
)
if anchor_b64:
target["anchor_image_base64"] = anchor_b64
return target
return None
def _extract_search_box_interaction(raw_events: list) -> Optional[Dict[str, Any]]:
"""Déterminer comment la recherche Windows a été activée.
Cas utile observé en prod :
- clic Démarrer -> focus SearchHost -> saisie directe
=> inutile de générer un clic artificiel sur le champ de recherche.
"""
for idx, raw_evt in enumerate(raw_events):
event_data = raw_evt.get("event", raw_evt)
if event_data.get("type") != "window_focus_change":
continue
to_info = event_data.get("to", {})
if not isinstance(to_info, dict):
continue
if to_info.get("app_name", "").lower() != "searchhost.exe":
continue
search_window_title = to_info.get("title", "") or "Rechercher"
for follow_evt in raw_events[idx + 1: idx + 8]:
follow_data = follow_evt.get("event", follow_evt)
follow_type = follow_data.get("type")
if follow_type == "text_input":
window = follow_data.get("window", {})
if isinstance(window, dict) and window.get("app_name", "").lower() == "searchhost.exe":
return {
"mode": "direct_typing",
"window_title": window.get("title", "") or search_window_title,
}
continue
if follow_type == "mouse_click":
window = follow_data.get("window", {})
if not (isinstance(window, dict) and window.get("app_name", "").lower() == "searchhost.exe"):
continue
pos = follow_data.get("pos") or []
screen_meta = follow_data.get("screen_metadata", {})
screen_res = (
screen_meta.get("screen_resolution")
or follow_data.get("screen_resolution")
or []
)
if not (isinstance(pos, list) and len(pos) == 2 and isinstance(screen_res, list) and len(screen_res) == 2):
continue
has_text_after = False
for later_evt in raw_events[idx + 2: idx + 8]:
later_data = later_evt.get("event", later_evt)
if later_data.get("type") != "text_input":
continue
later_window = later_data.get("window", {})
if isinstance(later_window, dict) and later_window.get("app_name", "").lower() == "searchhost.exe":
has_text_after = True
break
if not has_text_after:
continue
try:
click_x = int(pos[0])
click_y = int(pos[1])
screen_w = int(screen_res[0])
screen_h = int(screen_res[1])
except (TypeError, ValueError):
continue
if screen_w <= 0 or screen_h <= 0:
continue
pos_labels = _relative_position_labels(click_x, click_y, screen_w, screen_h)
position_desc = " ".join(
part for part in [pos_labels["y_relative"], pos_labels["x_relative"]] if part
)
return {
"mode": "click_then_type",
"x_pct": round(click_x / screen_w, 6),
"y_pct": round(click_y / screen_h, 6),
"window_title": window.get("title", "") or search_window_title,
"expected_window_before": window.get("title", "") or search_window_title,
"original_position": pos_labels,
"position_desc": position_desc,
}
if follow_type == "window_focus_change":
next_to = follow_data.get("to", {})
if isinstance(next_to, dict) and next_to.get("app_name", "").lower() != "searchhost.exe":
break
return None
def _extract_required_apps_from_events(
raw_events: list,
session_dir: Optional[str] = None,
) -> Dict[str, Any]:
"""Extraire les applications requises depuis les événements bruts d'une session.
Analyse les window_focus_change pour identifier :
- L'application principale (la plus utilisée hors apps système)
- La première fenêtre ciblée (pour le setup initial)
Args:
raw_events: Événements bruts depuis live_events.jsonl.
Returns:
Dict avec les clés :
- primary_app: str (nom de l'exécutable principal, ex: "Notepad.exe")
- primary_launch_cmd: str (commande Win+R, ex: "notepad")
- first_window_title: str (titre de la première fenêtre applicative)
- apps: dict[str, int] (app_name -> nombre d'occurrences)
- start_menu_target: dict optionnel (vrai clic Démarrer)
- search_box_interaction: dict optionnel (saisie directe ou vrai clic SearchHost)
- launch_result_target: dict optionnel (vrai clic SearchHost -> app)
"""
app_counts: Dict[str, int] = defaultdict(int)
first_app = None
first_window_title = None
for raw_evt in raw_events:
event_data = raw_evt.get("event", raw_evt)
evt_type = event_data.get("type", "")
if evt_type == "window_focus_change":
to_info = event_data.get("to", {})
if not to_info:
continue
app_name = to_info.get("app_name", "")
title = to_info.get("title", "")
if app_name:
app_counts[app_name] += 1
if first_app is None and app_name.lower() not in _SETUP_IGNORE_APPS:
first_app = app_name
first_window_title = title
# Aussi extraire depuis les mouse_click qui ont un champ window
elif evt_type == "mouse_click":
window = event_data.get("window", {})
if isinstance(window, dict):
app_name = window.get("app_name", "")
if app_name:
app_counts[app_name] += 1
if not app_counts:
return {}
# Déterminer l'application principale (la plus fréquente hors apps ignorées)
filtered_apps = {
k: v for k, v in app_counts.items()
if k.lower() not in _SETUP_IGNORE_APPS
}
if not filtered_apps:
return {}
primary_app = max(filtered_apps, key=filtered_apps.get)
# Résoudre la commande de lancement
primary_launch_cmd = _resolve_launch_command(primary_app)
start_menu_target = _extract_start_menu_target_from_session(
raw_events,
session_dir=session_dir,
)
search_box_interaction = _extract_search_box_interaction(raw_events)
launch_result_target = _extract_launch_result_target(raw_events, primary_app)
result = {
"primary_app": primary_app,
"primary_launch_cmd": primary_launch_cmd,
"first_window_title": first_window_title or "",
"apps": dict(app_counts),
}
if start_menu_target:
result["start_menu_target"] = start_menu_target
if search_box_interaction:
result["search_box_interaction"] = search_box_interaction
if launch_result_target:
result["launch_result_target"] = launch_result_target
return result
def _trim_redundant_setup_events(
raw_events: List[Dict[str, Any]],
app_info: Dict[str, Any],
) -> List[Dict[str, Any]]:
"""Couper le préambule de lancement déjà couvert par le setup injecté.
Quand `/replay-session` injecte un setup visuel (Démarrer -> SearchHost ->
résultat d'application), les événements bruts de la session source
contiennent encore cette même séquence. Sans coupe, le replay rejoue
l'ouverture de l'application une deuxième fois et dérive hors contexte.
Stratégie :
- chercher la première `window_focus_change` vers l'application principale
- préférer un titre qui matche `first_window_title`
- conserver uniquement les événements APRÈS cette bascule de focus
Args:
raw_events: événements source complets de la session.
app_info: résultat de `_extract_required_apps_from_events`.
Returns:
Liste coupée si un point de reprise fiable est trouvé, sinon la liste
d'origine inchangée.
"""
if not raw_events or not app_info:
return raw_events
primary_app = str(app_info.get("primary_app", "")).strip().lower()
first_title = str(app_info.get("first_window_title", "")).strip().lower()
if not primary_app:
return raw_events
first_primary_idx = None
matched_idx = None
neutral_idx = None
# Si le titre observé en premier est déjà neutre, le setup amène
# déjà l'app dans le même état → comportement legacy suffit.
first_title_is_neutral = _is_neutral_window_title(first_title)
for idx, raw_evt in enumerate(raw_events):
event_data = raw_evt.get("event", raw_evt)
if event_data.get("type", "") != "window_focus_change":
continue
to_info = event_data.get("to", {})
if not isinstance(to_info, dict):
continue
app_name = str(to_info.get("app_name", "")).strip().lower()
if app_name != primary_app:
continue
title = str(to_info.get("title", "")).strip()
title_lower = title.lower()
if first_primary_idx is None:
first_primary_idx = idx
# Priorité : focus vers un titre neutre proche du premier focus
# → c'est l'état que le setup auto va réellement produire, donc
# les events entre le premier focus et celui-ci sont redondants
# (bascule d'onglet vers la fenêtre vide, etc.).
if (
not first_title_is_neutral
and neutral_idx is None
and _is_neutral_window_title(title)
and (idx - first_primary_idx) <= _TRIM_NEUTRAL_LOOKAHEAD
):
neutral_idx = idx
if matched_idx is None:
if not first_title:
matched_idx = idx
elif title_lower and (
first_title in title_lower or title_lower in first_title
):
matched_idx = idx
# Early exit dès qu'on a trouvé ce qu'on cherche, ou qu'on est
# sorti du lookahead pour le neutral.
if neutral_idx is not None:
break
if matched_idx is not None and (
first_title_is_neutral
or (idx - first_primary_idx) > _TRIM_NEUTRAL_LOOKAHEAD
):
break
cut_idx = (
neutral_idx
if neutral_idx is not None
else (matched_idx if matched_idx is not None else first_primary_idx)
)
if cut_idx is None:
logger.info(
"setup trim : aucun focus initial trouvé pour '%s' — replay brut conservé",
primary_app,
)
return raw_events
trimmed = raw_events[cut_idx + 1:]
logger.info(
"setup trim : %d événements retirés avant le replay brut "
"(app=%s, titre=%s, neutral=%s, restant=%d)",
cut_idx + 1,
primary_app,
app_info.get("first_window_title", ""),
neutral_idx is not None,
len(trimmed),
)
return trimmed
def _extract_required_apps_from_workflow(workflow) -> Dict[str, Any]:
"""Extraire les applications requises depuis un workflow structuré.
Analyse les nodes du workflow pour identifier les titres de fenêtres
requis, puis infère l'application principale.
Args:
workflow: Objet Workflow ou dict brut.
Returns:
Même format que _extract_required_apps_from_events.
"""
# Accéder aux données (objet ou dict)
if hasattr(workflow, 'nodes'):
nodes = workflow.nodes
metadata = workflow.metadata if hasattr(workflow, 'metadata') else {}
elif isinstance(workflow, dict):
nodes = workflow.get('nodes', [])
metadata = workflow.get('metadata', {})
else:
return {}
if not nodes:
return {}
# Collecter les titres de fenêtres depuis les nodes
window_titles = []
for node in nodes:
template = node.template if hasattr(node, 'template') else node.get('template', {})
if isinstance(template, dict):
window = template.get('window', {})
elif hasattr(template, 'window'):
window = template.window if hasattr(template.window, '__dict__') else {}
else:
window = {}
if isinstance(window, dict):
title = window.get('title_pattern', '') or window.get('title_contains', '')
elif hasattr(window, 'title_pattern'):
title = getattr(window, 'title_pattern', '') or ''
else:
title = ''
if title:
window_titles.append(title)
# Inférer l'app principale depuis les titres de fenêtres
primary_app, primary_launch_cmd, matched_title = _infer_app_from_window_titles(window_titles)
# Utiliser le titre qui a matché l'app (pas le premier node qui peut être "Rechercher")
first_title = matched_title or (window_titles[0] if window_titles else "")
if not primary_app:
return {}
source_session_id = metadata.get("source_session_id", "") if isinstance(metadata, dict) else ""
machine_id = metadata.get("machine_id", "") if isinstance(metadata, dict) else ""
return {
"primary_app": primary_app,
"primary_launch_cmd": primary_launch_cmd,
"first_window_title": first_title,
"apps": {},
"source_session_id": source_session_id,
"machine_id": machine_id,
}
def _resolve_launch_command(app_name: str) -> str:
"""Résoudre la commande Win+R pour lancer une application.
Si l'app n'est pas dans le mapping, utilise le nom de l'exécutable
directement sans l'extension .exe (fonctionne pour la plupart des apps).
"""
app_lower = app_name.lower()
if app_lower in _APP_LAUNCH_COMMANDS:
return _APP_LAUNCH_COMMANDS[app_lower]
# Fallback : utiliser le nom sans l'extension .exe
if app_lower.endswith(".exe"):
return app_name[:-4]
return app_name
def _infer_app_from_window_titles(titles: list) -> tuple:
"""Inférer le nom de l'application et la commande de lancement depuis des titres de fenêtres.
Utilise des heuristiques basées sur les patterns de titres Windows courants.
Returns:
Tuple (app_name, launch_command, matched_title).
("", "", "") si non identifié.
"""
_TITLE_APP_PATTERNS = [
("bloc-notes", "Notepad.exe", "notepad"),
("notepad", "Notepad.exe", "notepad"),
("word", "winword.exe", "winword"),
("excel", "excel.exe", "excel"),
("powerpoint", "powerpnt.exe", "powerpnt"),
("outlook", "outlook.exe", "outlook"),
("paint", "mspaint.exe", "mspaint"),
("calculatrice", "calc.exe", "calc"),
("calculator", "calc.exe", "calc"),
("explorateur de fichiers", "explorer.exe", "explorer"),
("file explorer", "explorer.exe", "explorer"),
("invite de commandes", "cmd.exe", "cmd"),
("command prompt", "cmd.exe", "cmd"),
("powershell", "powershell.exe", "powershell"),
("visual studio code", "code.exe", "code"),
("edge", "msedge.exe", "msedge"),
("chrome", "chrome.exe", "chrome"),
("firefox", "firefox.exe", "firefox"),
]
for title in titles:
title_lower = title.lower()
for pattern, app_name, launch_cmd in _TITLE_APP_PATTERNS:
if pattern in title_lower:
# Ignorer les apps système (explorer, etc.)
if app_name.lower() in _SETUP_IGNORE_APPS:
continue
return (app_name, launch_cmd, title)
return ("", "", "")
def _get_visual_search_info(app_name: str) -> Dict[str, str]:
"""Obtenir les informations de recherche visuelle pour une application.
Consulte _APP_VISUAL_SEARCH, sinon construit un fallback à partir du nom
de l'exécutable (ex: "MonApp.exe" -> search_text="MonApp").
Args:
app_name: Nom de l'exécutable (ex: "Notepad.exe").
Returns:
Dict avec search_text, display_name, vlm_description.
"""
app_lower = app_name.lower()
if app_lower in _APP_VISUAL_SEARCH:
return dict(_APP_VISUAL_SEARCH[app_lower])
# Fallback : utiliser le nom sans .exe
base_name = app_name[:-4] if app_lower.endswith(".exe") else app_name
return {
"search_text": base_name,
"display_name": base_name,
"vlm_description": f"L'application {base_name} dans les résultats de recherche",
}
def _should_use_run_dialog_setup(primary_app: str, launch_cmd: str) -> bool:
"""Déterminer si le setup doit passer par `Win+R`.
On cible seulement quelques apps connues où le chemin Démarrer →
Rechercher s'est montré fragile en live, alors que la commande shell est
stable et sémantiquement équivalente pour préparer l'environnement.
"""
app_lower = str(primary_app or "").strip().lower()
launch_cmd = str(launch_cmd or "").strip()
return bool(app_lower in _SETUP_RUN_DIALOG_APPS and launch_cmd)
def _generate_run_dialog_setup_actions(
app_info: Dict[str, Any],
setup_id_prefix: str = "setup",
) -> List[Dict[str, Any]]:
"""Générer un setup sémantique `Win+R -> commande -> Enter`.
Utilisé pour les applications dont l'ouverture via Démarrer/Rechercher
ajoute une fragilité Windows sans valeur métier pour le replay.
"""
launch_cmd = str(app_info.get("primary_launch_cmd", "") or "").strip()
primary_app = str(app_info.get("primary_app", "") or "").strip()
first_title = str(app_info.get("first_window_title", "") or "").strip()
visual_info = _get_visual_search_info(primary_app)
display_name = str(visual_info.get("display_name", "") or "").strip()
if not launch_cmd or not primary_app:
return []
heavy_apps = {"winword.exe", "excel.exe", "powerpnt.exe", "outlook.exe", "code.exe"}
wait_ms = 3000 if primary_app.lower() in heavy_apps else 2000
title_patterns: List[str] = []
for candidate in (
display_name,
launch_cmd,
primary_app[:-4] if primary_app.lower().endswith(".exe") else primary_app,
):
candidate = str(candidate or "").strip()
if not candidate:
continue
if candidate.lower() not in {p.lower() for p in title_patterns}:
title_patterns.append(candidate)
if " " in candidate:
last_token = candidate.split()[-1].strip()
if last_token and last_token.lower() not in {p.lower() for p in title_patterns}:
title_patterns.append(last_token)
actions: List[Dict[str, Any]] = [
{
"action_id": f"act_{setup_id_prefix}_open_run",
"type": "key_combo",
"keys": ["win", "r"],
"_setup_phase": True,
"_setup_step": "open_run_dialog",
"_setup_strategy": "run_dialog",
},
{
"action_id": f"act_{setup_id_prefix}_wait_run",
"type": "wait",
"duration_ms": 500,
"_setup_phase": True,
"_setup_step": "wait_run_dialog",
"_setup_strategy": "run_dialog",
},
{
"action_id": f"act_{setup_id_prefix}_type_launch_cmd",
"type": "type",
"text": launch_cmd,
"_setup_phase": True,
"_setup_step": "type_launch_command",
"_setup_strategy": "run_dialog",
},
{
"action_id": f"act_{setup_id_prefix}_wait_launch_cmd",
"type": "wait",
"duration_ms": 300,
"_setup_phase": True,
"_setup_step": "wait_launch_command",
"_setup_strategy": "run_dialog",
},
{
"action_id": f"act_{setup_id_prefix}_submit_run",
"type": "key_combo",
"keys": ["enter"],
"_setup_phase": True,
"_setup_step": "submit_run_dialog",
"_setup_strategy": "run_dialog",
},
{
"action_id": f"act_{setup_id_prefix}_wait_launch",
"type": "wait",
"duration_ms": wait_ms,
"_setup_phase": True,
"_setup_step": "wait_app_launch",
"_setup_strategy": "run_dialog",
},
]
if title_patterns or first_title:
actions.append({
"action_id": f"act_{setup_id_prefix}_verify",
"type": "verify_screen",
"expected_node": "setup_initial",
"timeout_ms": 5000,
"_setup_phase": True,
"_setup_step": "verify_app_ready",
"_setup_strategy": "run_dialog",
"expected_window_title_contains": title_patterns or [first_title],
})
logger.info(
"Setup env sémantique généré : %d actions pour lancer '%s' via Win+R (%s)",
len(actions), primary_app, launch_cmd,
)
return actions
def _generate_setup_actions(
app_info: Dict[str, Any],
setup_id_prefix: str = "setup",
) -> List[Dict[str, Any]]:
"""Générer les actions 100% visuelles pour ouvrir l'application avant le replay.
Approche entièrement visuelle -- JAMAIS de raccourcis clavier (Win, Win+R,
Ctrl+X, etc.) qui n'ont pas été enregistrés par l'utilisateur. Tout passe
par des clics visuels résolus par le VLM (Qwen2.5-VL).
La séquence est :
1. Clic visuel sur le bouton Démarrer (coin bas-gauche de l'écran)
2. Attendre que le menu Démarrer s'ouvre (1s)
3. Clic visuel sur la barre de recherche du menu Démarrer
4. Attendre que la barre de recherche soit active (500ms)
5. Taper le nom de l'application (texte français, ex: "Bloc-notes")
6. Attendre les résultats de recherche (1.2s)
7. Clic visuel sur le résultat de l'application trouvée
8. Attendre que l'application s'ouvre (2-3s selon le poids)
9. verify_screen : vérifier que la fenêtre attendue est apparue
Args:
app_info: Dict retourné par _extract_required_apps_from_events ou
_extract_required_apps_from_workflow.
setup_id_prefix: Préfixe pour les action_id générés.
Returns:
Liste d'actions normalisées, prêtes à injecter dans la queue.
Liste vide si aucune préparation n'est nécessaire.
"""
if not app_info:
return []
launch_cmd = app_info.get("primary_launch_cmd", "")
primary_app = app_info.get("primary_app", "")
first_title = app_info.get("first_window_title", "")
if not launch_cmd:
logger.debug(
"setup_actions : pas de commande de lancement pour '%s', skip",
primary_app,
)
return []
# Ne pas lancer les apps système (toujours présentes)
if primary_app.lower() in _SETUP_IGNORE_APPS:
logger.debug("setup_actions : app '%s' ignorée (système)", primary_app)
return []
if _should_use_run_dialog_setup(primary_app, launch_cmd):
return _generate_run_dialog_setup_actions(
app_info,
setup_id_prefix=setup_id_prefix,
)
# Obtenir les informations de recherche visuelle pour cette app
visual_info = _get_visual_search_info(primary_app)
search_text = visual_info["search_text"]
display_name = visual_info["display_name"]
vlm_description = visual_info["vlm_description"]
start_menu_target = app_info.get("start_menu_target", {}) or {}
search_box_interaction = app_info.get("search_box_interaction", {}) or {}
launch_result_target = app_info.get("launch_result_target", {}) or {}
actions = []
logger.info(
"Génération setup env 100%% visuel : lancement de '%s' via clic "
"Démarrer → recherche visuelle '%s' (fenêtre attendue : '%s')",
primary_app, search_text, first_title,
)
# 1. Clic visuel sur le bouton Démarrer (toujours visible, bas-gauche)
# Le VLM résout la position exacte ; x_pct/y_pct sont des fallbacks.
start_click_spec = {
"by_text": "Démarrer",
"by_role": "start_button",
"vlm_description": (
"Le bouton Démarrer de Windows (icône Windows), "
"en bas à gauche de la barre des tâches"
),
"screen_scope": "full_screen",
}
start_click_x = 0.02
start_click_y = 0.98
if start_menu_target:
start_click_x = float(start_menu_target.get("x_pct", start_click_x))
start_click_y = float(start_menu_target.get("y_pct", start_click_y))
start_click_spec["by_text"] = ""
start_click_spec["allow_position_fallback"] = True
anchor_b64 = str(start_menu_target.get("anchor_image_base64", "")).strip()
if anchor_b64:
start_click_spec["anchor_image_base64"] = anchor_b64
original_position = start_menu_target.get("original_position")
if isinstance(original_position, dict) and original_position:
start_click_spec["original_position"] = dict(original_position)
position_desc = str(start_menu_target.get("position_desc", "")).strip()
if position_desc:
start_click_spec["vlm_description"] = (
"L'icône Windows du bouton Démarrer dans la barre des tâches, "
f"visible {position_desc} de l'écran"
)
else:
start_click_spec["vlm_description"] = (
"L'icône Windows du bouton Démarrer dans la barre des tâches"
)
actions.append({
"action_id": f"act_{setup_id_prefix}_click_start",
"type": "click",
"x_pct": start_click_x,
"y_pct": start_click_y,
"button": "left",
"visual_mode": True,
"target_spec": start_click_spec,
"_setup_phase": True,
"_setup_step": "click_start_menu",
})
# 2. Attendre que le menu Démarrer s'ouvre
actions.append({
"action_id": f"act_{setup_id_prefix}_wait_start",
"type": "wait",
"duration_ms": 1000,
"_setup_phase": True,
"_setup_step": "wait_start_menu",
})
# 2b. Garde visuelle : le menu Démarrer / la barre de recherche
# doit être réellement actif avant de continuer. Sans cette garde,
# un click_start qui touche en fait le systray overflow popup
# laisse le setup taper « bloc » dans la mauvaise fenêtre (cf.
# run live 2026-05-22 replay_sess_76b7d067). L'exécuteur compare
# le titre actif aux patterns ci-dessous (substring case-insensitive,
# FR+EN+app-name) et bascule en apprentissage humain si aucun match.
actions.append({
"action_id": f"act_{setup_id_prefix}_verify_start_open",
"type": "verify_screen",
"expected_node": "",
"timeout_ms": 1500,
"expected_window_title_contains": [
"Rechercher",
"Recherche",
"Search",
"Cortana",
"Démarrer",
"Start",
"SearchHost",
"StartMenuExperienceHost",
],
"_setup_phase": True,
"_setup_step": "verify_start_menu_open",
})
search_mode = str(search_box_interaction.get("mode", "")).strip()
if search_mode != "direct_typing":
search_click_spec = {
"by_text": "Rechercher",
"by_role": "search_box",
"vlm_description": (
"La barre ou le champ de recherche dans le menu Démarrer "
"de Windows, souvent intitulé 'Tapez ici pour rechercher' "
"ou 'Rechercher'"
),
}
search_click_x = 0.20
search_click_y = 0.92
search_expected_window = ""
if search_mode == "click_then_type":
search_click_x = float(search_box_interaction.get("x_pct", search_click_x))
search_click_y = float(search_box_interaction.get("y_pct", search_click_y))
search_expected_window = str(
search_box_interaction.get("expected_window_before")
or search_box_interaction.get("window_title")
or ""
)
search_click_spec["window_title"] = str(
search_box_interaction.get("window_title", "")
).strip()
original_position = search_box_interaction.get("original_position")
if isinstance(original_position, dict) and original_position:
search_click_spec["original_position"] = dict(original_position)
position_desc = str(search_box_interaction.get("position_desc", "")).strip()
if position_desc:
search_click_spec["vlm_description"] = (
f"Dans la fenêtre '{search_click_spec['window_title']}', "
f"le champ de recherche se trouve {position_desc} de l'écran"
)
search_click_action = {
"action_id": f"act_{setup_id_prefix}_click_search",
"type": "click",
"x_pct": search_click_x,
"y_pct": search_click_y,
"button": "left",
"visual_mode": True,
"target_spec": search_click_spec,
"_setup_phase": True,
"_setup_step": "click_search_box",
}
if search_expected_window:
search_click_action["expected_window_before"] = search_expected_window
actions.append(search_click_action)
# 4. Attendre que la barre de recherche soit active et prête
actions.append({
"action_id": f"act_{setup_id_prefix}_wait_search_ready",
"type": "wait",
"duration_ms": 500,
"_setup_phase": True,
"_setup_step": "wait_search_ready",
})
# 4b. Garde visuelle : la barre Rechercher doit effectivement
# avoir le focus avant la frappe. On combine le titre récupéré
# de la session source (`search_box_interaction.window_title`)
# avec un fallback FR/EN générique.
search_window_hint = str(search_box_interaction.get("window_title", "")).strip()
verify_patterns = ["Rechercher", "Recherche", "Search"]
if search_window_hint and search_window_hint not in verify_patterns:
verify_patterns = [search_window_hint] + verify_patterns
actions.append({
"action_id": f"act_{setup_id_prefix}_verify_search_active",
"type": "verify_screen",
"expected_node": "",
"timeout_ms": 1500,
"expected_window_title_contains": verify_patterns,
"_setup_phase": True,
"_setup_step": "verify_search_box_active",
})
# 5. Taper le nom visuel de l'application (texte français)
actions.append({
"action_id": f"act_{setup_id_prefix}_type_search",
"type": "type",
"text": search_text,
"_setup_phase": True,
"_setup_step": "type_app_name",
})
# 6. Attendre que la recherche Windows trouve l'application
actions.append({
"action_id": f"act_{setup_id_prefix}_wait_results",
"type": "wait",
"duration_ms": 1200,
"_setup_phase": True,
"_setup_step": "wait_search_results",
})
# 6b. Dernière garde avant le clic résultat : la barre Rechercher
# (et donc la liste de résultats) doit toujours être active. Sans
# cette garde finale, un focus perdu pendant wait_search_results
# fait cliquer click_app_result dans la mauvaise surface (constat
# live 2026-05-22 — fenêtre observée « Fenêtre de dépassement de
# capacité de la barre d'état système »).
actions.append({
"action_id": f"act_{setup_id_prefix}_verify_results_visible",
"type": "verify_screen",
"expected_node": "",
"timeout_ms": 1500,
"expected_window_title_contains": [
"Rechercher",
"Recherche",
"Search",
"Cortana",
"SearchHost",
"StartMenuExperienceHost",
],
"_setup_phase": True,
"_setup_step": "verify_search_results_visible",
})
# 7. Clic visuel sur le résultat de l'application dans la liste
click_result_spec = {
"by_text": display_name,
"by_role": "app_icon",
"vlm_description": vlm_description,
}
click_result_x = 0.20
click_result_y = 0.50
click_result_expected_window = ""
if launch_result_target:
click_result_x = float(launch_result_target.get("x_pct", click_result_x))
click_result_y = float(launch_result_target.get("y_pct", click_result_y))
click_result_expected_window = str(
launch_result_target.get("expected_window_before")
or launch_result_target.get("window_title")
or ""
)
click_result_spec["by_role"] = "search_result"
click_result_spec["allow_position_fallback"] = True
click_result_spec["window_title"] = str(launch_result_target.get("window_title", "")).strip()
original_position = launch_result_target.get("original_position")
if isinstance(original_position, dict) and original_position:
click_result_spec["original_position"] = dict(original_position)
window_capture = launch_result_target.get("window_capture")
if isinstance(window_capture, dict) and window_capture:
click_result_spec["window_capture"] = dict(window_capture)
position_desc = str(launch_result_target.get("position_desc", "")).strip()
if position_desc:
click_result_spec["vlm_description"] = (
f"Dans la fenêtre '{click_result_spec['window_title']}', "
f"le résultat de recherche de l'application '{display_name}' "
f"se trouve {position_desc} de l'écran"
)
else:
click_result_spec["vlm_description"] = (
f"Dans la fenêtre '{click_result_spec['window_title']}', "
f"cliquez sur le résultat de recherche de l'application '{display_name}'"
)
click_result_action = {
"action_id": f"act_{setup_id_prefix}_click_result",
"type": "click",
"x_pct": click_result_x,
"y_pct": click_result_y,
"button": "left",
"visual_mode": True,
"target_spec": click_result_spec,
"_setup_phase": True,
"_setup_step": "click_app_result",
}
if click_result_expected_window:
click_result_action["expected_window_before"] = click_result_expected_window
actions.append({
**click_result_action,
})
# 8. Attendre que l'application s'ouvre
heavy_apps = {"winword.exe", "excel.exe", "powerpnt.exe", "outlook.exe", "code.exe"}
wait_ms = 3000 if primary_app.lower() in heavy_apps else 2000
actions.append({
"action_id": f"act_{setup_id_prefix}_wait_launch",
"type": "wait",
"duration_ms": wait_ms,
"_setup_phase": True,
"_setup_step": "wait_app_launch",
})
# 9. Vérification visuelle que la fenêtre attendue est apparue
if first_title:
actions.append({
"action_id": f"act_{setup_id_prefix}_verify",
"type": "verify_screen",
"expected_node": "setup_initial",
"timeout_ms": 5000,
"_setup_phase": True,
"_setup_step": "verify_app_ready",
"_expected_title": first_title,
})
logger.info(
"Setup env visuel généré : %d actions pour lancer '%s' "
"(recherche visuelle : '%s')",
len(actions), primary_app, search_text,
)
return actions
# =========================================================================
# Replay — Fonctions de conversion workflow → actions
# =========================================================================
def _find_active_agent_session(session_manager, machine_id: Optional[str] = None) -> Optional[str]:
"""Trouver la dernière session Agent V1 pour le replay.
Stratégie en 2 passes :
1. D'abord chercher une session `sess_*` non-finalisée (Agent V1 actif)
2. Sinon, pour une machine ciblée, réutiliser `bg_<machine_id>` si présent
3. Sinon, prendre la plus récente `sess_*` même finalisée
Args:
session_manager: Instance LiveSessionManager.
machine_id: Si fourni, ne chercher que les sessions de cette machine.
"""
with session_manager._lock:
bg_session_id = f"bg_{machine_id}" if machine_id else None
def _matches_machine(session) -> bool:
if machine_id is None:
return True
if session.machine_id == machine_id:
return True
# Robustesse au redémarrage : certaines sessions de fond peuvent
# encore être restaurées avec machine_id='default' alors que leur
# session_id encode déjà la vraie machine.
return bool(bg_session_id and session.session_id == bg_session_id)
all_agent_sessions = [
s for s in session_manager._sessions.values()
if s.session_id.startswith("sess_")
and _matches_machine(s)
]
background_session = next(
(
s for s in session_manager._sessions.values()
if bg_session_id and s.session_id == bg_session_id
),
None,
)
# Trier par session_id (contient un timestamp) — plus récent d'abord
all_agent_sessions.sort(key=lambda s: s.session_id, reverse=True)
# Passe 1 : préférer une session non-finalisée
for s in all_agent_sessions:
if not s.finalized:
return s.session_id
# Passe 2 : fallback sur la session de fond de la machine si elle existe.
if background_session and not background_session.finalized:
return background_session.session_id
# Passe 3 : fallback sur la plus récente (même finalisée)
if all_agent_sessions:
return all_agent_sessions[0].session_id
if background_session:
return background_session.session_id
return None
def _workflow_to_actions(
workflow,
params: Optional[Dict[str, Any]] = None,
processor=None,
gesture_catalog=None,
) -> List[Dict[str, Any]]:
"""
Convertir un workflow (nodes + edges ordonnés) en liste d'actions normalisées.
Parcourt le graphe depuis les entry_nodes en suivant les edges.
Chaque edge produit une action normalisée avec coordonnées en pourcentage.
Mode intelligent (workflows appris par Léa) :
Si le workflow a des nodes avec des prototype_vectors, utilise le
StreamProcessor.extract_enriched_actions() qui enrichit les actions
avec les données de la session originale, le ciblage visuel et le
pre-check/post-check par embedding CLIP.
Mode classique (workflows VWB/manuels) :
Parcours BFS classique avec _edge_to_normalized_actions().
"""
params = params or {}
# Détection d'un workflow appris (a des nodes avec prototype_vectors)
# et qui a des edges structurés
if _is_learned_workflow(workflow) and processor is not None:
# Priorité 1 : replay hybride (événements bruts + structure workflow)
hybrid = processor.build_hybrid_replay(workflow)
if hybrid:
logger.info(
"Replay hybride : %d actions depuis events bruts + structure workflow",
len(hybrid),
)
# Optimisation par gestes clavier si disponible
if gesture_catalog and hybrid:
hybrid = gesture_catalog.optimize_replay_actions(hybrid)
return hybrid
# Priorité 2 : enrichissement classique (fallback si hybride échoue)
enriched = processor.extract_enriched_actions(workflow, params)
if enriched:
logger.info(
"Replay intelligent : %d actions enrichies depuis le workflow appris",
len(enriched),
)
if gesture_catalog and enriched:
enriched = gesture_catalog.optimize_replay_actions(enriched)
return enriched
# Si l'enrichissement échoue aussi, fallback sur le mode classique
logger.warning(
"Enrichissement échoué pour le workflow appris, fallback mode classique"
)
# Mode classique (VWB/manuels ou fallback)
actions = []
# Construire un index des edges sortants par node
outgoing: Dict[str, list] = defaultdict(list)
for edge in workflow.edges:
outgoing[edge.from_node].append(edge)
# Parcours linéaire depuis le premier entry_node
visited = set()
current_nodes = list(workflow.entry_nodes) if workflow.entry_nodes else []
# Fallback : si pas d'entry_nodes, prendre le premier node
if not current_nodes and workflow.nodes:
current_nodes = [workflow.nodes[0].node_id]
while current_nodes:
node_id = current_nodes.pop(0)
if node_id in visited:
continue
visited.add(node_id)
edges = outgoing.get(node_id, [])
for edge in edges:
edge_actions = _edge_to_normalized_actions(edge, params)
actions.extend(edge_actions)
# Suivre le graphe vers le prochain node
if edge.to_node not in visited:
current_nodes.append(edge.to_node)
# Optimisation : substituer les actions visuelles par des gestes clavier si possible
if gesture_catalog and actions:
actions = gesture_catalog.optimize_replay_actions(actions)
return actions
def _is_learned_workflow(workflow) -> bool:
"""Détecter si un workflow est un workflow appris (vs VWB/manuel).
Un workflow appris a :
- Des nodes avec _prototype_vector dans metadata
- Des edges avec from_node/to_node
- Un learning_state indicatif (OBSERVATION, COACHING, AUTO_CANDIDATE, etc.)
Un workflow VWB/manuel a généralement :
- Des edges avec des target_spec complets (by_text, by_role remplis)
- Pas de prototype_vectors
"""
# Accéder aux données (objet ou dict)
if hasattr(workflow, 'nodes'):
nodes = workflow.nodes
edges = workflow.edges
elif isinstance(workflow, dict):
nodes = workflow.get('nodes', [])
edges = workflow.get('edges', [])
else:
return False
if not nodes or not edges:
return False
# Vérifier si au moins un node a un prototype_vector
has_prototype = False
for node in nodes:
metadata = node.metadata if hasattr(node, 'metadata') else node.get('metadata', {})
if isinstance(metadata, dict) and '_prototype_vector' in metadata:
has_prototype = True
break
return has_prototype
def _edge_to_normalized_actions(edge, params: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Convertir un WorkflowEdge en liste d'actions normalisées pour l'Agent V1.
Un edge simple produit 1 action, un edge compound produit N actions (une par step).
"""
action = edge.action
if action is None:
logger.warning(f"Edge {edge.edge_id} sans action, skip")
return []
action_type = action.type
target = action.target
action_params = action.parameters or {}
# Extraire les coordonnées normalisées depuis TargetSpec.by_position
x_pct = 0.0
y_pct = 0.0
if target and target.by_position:
px, py = target.by_position
if px <= 1.0 and py <= 1.0:
x_pct = px
y_pct = py
else:
ref_w = action_params.get("ref_width", 1920) or 1920
ref_h = action_params.get("ref_height", 1080) or 1080
x_pct = round(px / ref_w, 6)
y_pct = round(py / ref_h, 6)
base = {"edge_id": edge.edge_id, "from_node": edge.from_node, "to_node": edge.to_node}
# Compound : décomposer en actions individuelles
if action_type == "compound":
return _expand_compound_steps(action_params.get("steps", []), base, params)
# Actions simples
normalized = {**base, "action_id": f"act_{uuid.uuid4().hex[:8]}"}
if action_type == "mouse_click":
normalized["type"] = "click"
normalized["x_pct"] = x_pct
normalized["y_pct"] = y_pct
normalized["button"] = action_params.get("button", "left")
elif action_type == "text_input":
normalized["type"] = "type"
text = action_params.get("text", "")
text = _substitute_variables(text, params, action_params.get("defaults", {}))
normalized["text"] = text
normalized["x_pct"] = x_pct
normalized["y_pct"] = y_pct
elif action_type == "key_press":
normalized["type"] = "key_combo"
keys = action_params.get("keys", [])
if not keys and action_params.get("key"):
keys = [action_params["key"]]
normalized["keys"] = keys
elif action_type == "pause_for_human":
normalized["type"] = "pause_for_human"
normalized["parameters"] = {
"message": action_params.get("message", "Validation requise"),
}
return [normalized] # pas de target/coords pour cette action logique
elif action_type == "extract_text":
normalized["type"] = "extract_text"
normalized["parameters"] = {
"output_var": action_params.get("output_var", "extracted_text"),
"paragraph": bool(action_params.get("paragraph", True)),
}
return [normalized]
elif action_type == "paste_and_execute":
normalized["type"] = "paste_and_execute"
normalized["parameters"] = {}
return [normalized]
elif action_type == "extract_table":
normalized["type"] = "extract_table"
normalized["parameters"] = {
"output_var": action_params.get("output_var", "table_rows"),
"pattern": action_params.get("pattern"),
"limit": action_params.get("limit"),
"region": action_params.get("region"),
}
return [normalized]
elif action_type == "extract_text_scroll":
# Expansion en séquence : OCR(top) → Ctrl+End → wait → OCR(bottom)
# → concat(top, bottom → final) → Ctrl+Home.
# variable_name (préféré) ou output_var (compat extract_text).
final_var = (
action_params.get("variable_name")
or action_params.get("output_var")
or "extracted_text"
)
paragraph = bool(action_params.get("paragraph", True))
# Pause après scroll Ctrl+End — configurable au step.
# Default 500ms (Wikipedia) ; cible 1500-2000ms pour DPI Citrix lent.
try:
scroll_pause = int(action_params.get("scroll_pause_ms", SCROLL_PAUSE_MS))
except (TypeError, ValueError):
scroll_pause = SCROLL_PAUSE_MS
# Variables internes nommées par préfixe : invisibles à l'utilisateur.
# Préfixe `_` pour signaler "interne" et éviter collision.
top_var = f"__{final_var}_top"
bottom_var = f"__{final_var}_bottom"
return _expand_extract_text_scroll(
base, final_var, top_var, bottom_var, paragraph,
scroll_pause_ms=scroll_pause,
)
elif action_type == "t2a_decision":
normalized["type"] = "t2a_decision"
normalized["parameters"] = {
"input_template": action_params.get("input_template", ""),
"output_var": action_params.get("output_var", "t2a_result"),
"model": action_params.get("model"),
}
return [normalized]
elif action_type == "llm_generate":
normalized["type"] = "llm_generate"
normalized["parameters"] = {
"prompt": action_params.get("prompt", ""),
"context": action_params.get("context", ""),
"output_var": (
action_params.get("output_var")
or action_params.get("variable_name")
or "generated_text"
),
"model": action_params.get("model"),
}
if action_params.get("temperature") is not None:
normalized["parameters"]["temperature"] = action_params.get("temperature")
return [normalized]
else:
logger.warning(f"Type d'action inconnu : {action_type}")
return []
# Ajouter le target_spec complet pour la résolution visuelle
target_spec = {}
if target and target.by_role:
target_spec["by_role"] = target.by_role
normalized["target_role"] = target.by_role # Compat debug
if target and target.by_text:
target_spec["by_text"] = target.by_text
normalized["target_text"] = target.by_text # Compat debug
if target and hasattr(target, 'context_hints') and target.context_hints:
target_spec["context_hints"] = target.context_hints
if target_spec:
normalized["target_spec"] = target_spec
normalized["visual_mode"] = True # Signal à l'agent d'utiliser la résolution visuelle
return [normalized]
def _substitute_variables(text: str, params: Dict[str, Any], defaults: Dict[str, Any]) -> str:
"""Substituer les variables ${var} dans un texte.
Priorité : params utilisateur > defaults du workflow > texte brut inchangé.
Supporte ${var} dans un texte plus long (ex: "${expression}=").
"""
def replacer(match):
var_name = match.group(1)
return str(params.get(var_name, defaults.get(var_name, match.group(0))))
return re.sub(r'\$\{(\w+)\}', replacer, text)
# Regex pour le templating runtime : {{var}} ou {{var.champ}} ou {{var.champ.sous}}
_RUNTIME_VAR_PATTERN = re.compile(r'\{\{\s*(\w+)(?:\.([\w.]+))?\s*\}\}')
def _resolve_runtime_vars_in_str(text: str, variables: Dict[str, Any]) -> str:
"""Remplace {{var}} et {{var.field}} par leur valeur depuis le dict variables.
Variables/champs absents : laissés tels quels (ne casse pas le pipeline).
Pour les valeurs non-str (dict, list), str() est appelé.
"""
def replacer(match):
var_name = match.group(1)
path = match.group(2)
if var_name not in variables:
return match.group(0)
value = variables[var_name]
if path:
for field in path.split('.'):
if isinstance(value, dict) and field in value:
value = value[field]
else:
return match.group(0)
return str(value)
return _RUNTIME_VAR_PATTERN.sub(replacer, text)
def _resolve_runtime_vars(value: Any, variables: Dict[str, Any]) -> Any:
"""Résout récursivement les {{var}} et {{var.field}} dans une valeur.
Supporte str, dict, list. Les autres types sont retournés tels quels.
Si variables est vide ou None, value est retournée inchangée.
"""
if not variables:
return value
if isinstance(value, str):
return _resolve_runtime_vars_in_str(value, variables)
if isinstance(value, dict):
return {k: _resolve_runtime_vars(v, variables) for k, v in value.items()}
if isinstance(value, list):
return [_resolve_runtime_vars(item, variables) for item in value]
return value
# =========================================================================
# Handlers pour les actions exécutées côté serveur (extract_text, t2a_decision)
# =========================================================================
def _normalize_ollama_endpoint(raw_url: str) -> str:
"""Normalise une URL Ollama pour les clients qui attendent l'endpoint racine.
`OLLAMA_URL` est parfois configuré vers `/api/generate` alors que
`LLMActionHandler` attend la racine `http://host:port`.
"""
endpoint = (raw_url or "http://localhost:11434").strip().rstrip("/")
for suffix in ("/api/generate", "/api/chat"):
if endpoint.endswith(suffix):
return endpoint[: -len(suffix)]
return endpoint
def _handle_extract_text_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],
session_id: str,
last_heartbeat: Dict[str, Dict[str, Any]],
) -> bool:
"""Traite une action extract_text côté serveur. Stocke le texte OCRisé dans
replay_state["variables"][output_var]. Retourne True si succès.
Robuste aux échecs : si pas de heartbeat ou OCR raté, stocke "" et retourne
False (le pipeline continue, pas de blocage).
"""
params = action.get("parameters") or {}
# Compatibilité VWB : "variable_name" (VWB) et "output_var" (agent libre)
output_var = (params.get("output_var") or params.get("variable_name") or "extracted_text").strip()
paragraph = bool(params.get("paragraph", True))
# Source prioritaire : screenshot envoyé par l'agent après la dernière action.
# Si c'est du base64, on le sauvegarde dans un fichier temp pour l'OCR.
# Fallback : heartbeat de fond (vrai chemin serveur, via "bg_{machine_id}").
path = None
raw_screenshot = replay_state.get("last_screenshot") or ""
if raw_screenshot:
if raw_screenshot.startswith("data:"):
# base64 → fichier temp
try:
import base64, tempfile
header, b64data = raw_screenshot.split(",", 1)
suffix = ".jpg" if "jpeg" in header else ".png"
tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
tmp.write(base64.b64decode(b64data))
tmp.close()
path = tmp.name
except Exception as e:
logger.warning("extract_text: décodage base64 screenshot échoué: %s", e)
elif os.path.isfile(raw_screenshot):
path = raw_screenshot
if not path:
machine_id = replay_state.get("machine_id", "")
bg_session = f"bg_{machine_id}" if machine_id and machine_id != "default" else None
heartbeat = (
last_heartbeat.get(session_id)
or (last_heartbeat.get(bg_session) if bg_session else None)
or {}
)
path = heartbeat.get("path")
text = ""
if path:
try:
from core.llm import extract_text_from_image
text = extract_text_from_image(path, paragraph=paragraph)
except Exception as e:
logger.warning("extract_text OCR échoué (%s) — variable '%s' = ''", e, output_var)
else:
logger.warning(
"extract_text : pas de heartbeat pour session %s — variable '%s' = ''",
session_id, output_var,
)
replay_state.setdefault("variables", {})[output_var] = text
logger.info(
"extract_text → variable '%s' (%d chars) replay %s",
output_var, len(text), replay_state.get("replay_id", "?"),
)
return bool(text)
def _handle_extract_table_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],
session_id: str,
last_heartbeat: Dict[str, Dict[str, Any]],
) -> bool:
"""Traite une action extract_table côté serveur. OCR + filtre regex pour
retourner une liste structurée (ex : IPP d'un tableau de patients) qui
pourra être bouclée par le templating ${patients[i]}.
Paramètres reconnus :
output_var : nom de variable runtime (default "table_rows")
pattern : regex à matcher sur chaque token OCR (ex : r"^25\\d{6}$")
limit : nb max d'entrées à retourner
region : (x, y, w, h) en pixels pour cropper avant OCR
(None = image entière)
Robuste aux échecs : si pas de heartbeat ou OCR raté, stocke [] et
retourne False — le pipeline continue.
"""
params = action.get("parameters") or {}
output_var = (params.get("output_var") or params.get("variable_name") or "table_rows").strip()
pattern = params.get("pattern") or None
limit = params.get("limit")
region = params.get("region") or None
if isinstance(limit, str):
try:
limit = int(limit)
except ValueError:
limit = None
# Source : screenshot du heartbeat (idem extract_text)
path = None
raw_screenshot = replay_state.get("last_screenshot") or ""
if raw_screenshot:
if raw_screenshot.startswith("data:"):
try:
import base64, tempfile
header, b64data = raw_screenshot.split(",", 1)
suffix = ".jpg" if "jpeg" in header else ".png"
tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
tmp.write(base64.b64decode(b64data))
tmp.close()
path = tmp.name
except Exception as e:
logger.warning("extract_table: décodage base64 screenshot échoué: %s", e)
elif os.path.isfile(raw_screenshot):
path = raw_screenshot
if not path:
machine_id = replay_state.get("machine_id", "")
bg_session = f"bg_{machine_id}" if machine_id and machine_id != "default" else None
heartbeat = (
last_heartbeat.get(session_id)
or (last_heartbeat.get(bg_session) if bg_session else None)
or {}
)
path = heartbeat.get("path")
rows: list = []
if path:
try:
from core.llm import extract_table_from_image
rows = extract_table_from_image(
path,
region=tuple(region) if region else None,
pattern=pattern,
limit=limit,
)
except Exception as e:
logger.warning(
"extract_table OCR échoué (%s) — variable '%s' = []", e, output_var,
)
else:
logger.warning(
"extract_table : pas de heartbeat pour session %s — variable '%s' = []",
session_id, output_var,
)
replay_state.setdefault("variables", {})[output_var] = rows
logger.info(
"extract_table → variable '%s' (%d entrées, pattern=%r, limit=%s) replay %s",
output_var, len(rows), pattern, limit, replay_state.get("replay_id", "?"),
)
return bool(rows)
def _handle_t2a_decision_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],
) -> bool:
"""Traite une action t2a_decision côté serveur. Stocke le résultat JSON
dans replay_state["variables"][output_var]. Retourne True si succès.
Le DPI à analyser vient de action.parameters.input_template (déjà résolu
par _resolve_runtime_vars donc les {{var}} sont remplis).
"""
params = action.get("parameters") or {}
output_var = (params.get("output_var") or "t2a_result").strip()
dpi_text = (params.get("input_template") or params.get("dpi") or "").strip()
model = params.get("model") or None # None → DEFAULT_MODEL
# Bypass LLM : si static_result est fourni dans les params, on l'utilise
# tel quel comme résultat. Utile pour les démos déterministes (pas de
# hallucination, pas de latence, pas de truncation de prompt).
static_result = params.get("static_result")
if isinstance(static_result, dict) and static_result.get("decision"):
replay_state.setdefault("variables", {})[output_var] = static_result
logger.info(
"t2a_decision (STATIC) → variable '%s' decision=%s replay %s",
output_var, static_result.get("decision"), replay_state.get("replay_id", "?"),
)
return True
if not dpi_text:
logger.warning(
"t2a_decision : input vide — variable '%s' = {decision: 'INDETERMINE'}", output_var,
)
replay_state.setdefault("variables", {})[output_var] = {
"decision": "INDETERMINE",
"justification": "DPI vide ou non extrait",
"confiance": "faible",
"_error": "empty_input",
}
return False
try:
from core.llm import analyze_dpi, DEFAULT_MODEL, build_dpi_enriched
# Enrichissement déterministe avant LLM : injection FAITS_CALCULÉS en
# tête (durée, âge, CCMU/GEMSA/priorité…) pour neutraliser les
# hallucinations de durée (cf. bug "23h" MOREL). La metadata est
# capturée pour les garde-fous serveur (commit 2 — Python ↔ LLM).
dpi_enriched, metadata = build_dpi_enriched(dpi_text)
logger.info(
"[build_dpi_enriched] duree_python=%sh decision_terrain=%r warnings=%d",
metadata.get("duree_heures_decimales"),
metadata.get("decision_terrain"),
len(metadata.get("parsing_warnings", [])),
)
result = analyze_dpi(dpi_enriched, model=model or DEFAULT_MODEL)
except Exception as e:
logger.warning("t2a_decision : analyze_dpi exception %s", e)
result = {
"decision": "INDETERMINE",
"justification": f"Erreur analyse : {e}",
"confiance": "faible",
"_error": str(e),
}
# Si parse_error, injecter des valeurs de fallback pour que les templates restent lisibles
if result.get("_parse_error"):
raw_preview = result.get("_raw", "")[:200]
logger.warning("t2a_decision parse_error — raw: %s", raw_preview)
result.setdefault("decision", "INDETERMINE")
result.setdefault("decision_court", "À vérifier")
result.setdefault("preuve_critere1", "Analyse non disponible (erreur LLM)")
result.setdefault("preuve_critere2", "Analyse non disponible (erreur LLM)")
result.setdefault("preuve_critere3", "Analyse non disponible (erreur LLM)")
result.setdefault("justification", f"Réponse LLM non parsable : {raw_preview}")
result.setdefault("confiance", "faible")
replay_state.setdefault("variables", {})[output_var] = result
decision = result.get("decision", "?")
elapsed = result.get("_elapsed_s", "?")
logger.info(
"t2a_decision → variable '%s' decision=%s (%ss) replay %s",
output_var, decision, elapsed, replay_state.get("replay_id", "?"),
)
return "_error" not in result and not result.get("_parse_error")
def _handle_llm_generate_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],
) -> bool:
"""Traite une action llm_generate côté serveur.
Stocke le texte généré dans replay_state["variables"][output_var].
Les paramètres `prompt` et `context` sont déjà résolus via le templating
runtime avant d'arriver ici.
"""
params = action.get("parameters") or {}
output_var = (
params.get("output_var")
or params.get("variable_name")
or "generated_text"
).strip()
# Bypass LLM : si static_text est fourni dans les params, on l'utilise
# tel quel. Utile pour les démos déterministes.
static_text = params.get("static_text")
if isinstance(static_text, str) and static_text.strip():
replay_state.setdefault("variables", {})[output_var] = static_text
logger.info(
"llm_generate (STATIC) → variable '%s' (%d chars) replay %s",
output_var, len(static_text), replay_state.get("replay_id", "?"),
)
return True
prompt = str(params.get("prompt") or "").strip()
context = str(params.get("context") or "")
model = params.get("model") or None
temperature = None
if params.get("temperature") is not None:
try:
temperature = float(params.get("temperature"))
except (TypeError, ValueError):
logger.warning(
"llm_generate : temperature invalide %r — fallback valeur par défaut",
params.get("temperature"),
)
if not prompt:
logger.warning(
"llm_generate : prompt vide — variable '%s' = ''",
output_var,
)
replay_state.setdefault("variables", {})[output_var] = ""
return False
generated = ""
try:
from core.execution.llm_actions import LLMActionHandler
handler = LLMActionHandler(
ollama_endpoint=_normalize_ollama_endpoint(
os.environ.get("OLLAMA_URL", "http://localhost:11434")
),
timeout=180,
)
generated = handler.generate_text(
prompt=prompt,
context=context,
model=model,
temperature=temperature,
).strip()
except Exception as e:
logger.warning("llm_generate : génération échouée (%s) — variable '%s' = ''", e, output_var)
replay_state.setdefault("variables", {})[output_var] = generated
logger.info(
"llm_generate → variable '%s' (%d chars, model=%s) replay %s",
output_var,
len(generated),
model or "default",
replay_state.get("replay_id", "?"),
)
return bool(generated)
def _handle_concat_text_vars_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],
) -> bool:
"""Traite une action serveur interne `_concat_text_vars`.
Concatène deux variables runtime existantes (top_var + separator + bottom_var)
et écrit le résultat dans output_var. Variables manquantes traitées comme "".
Action générée par l'expansion de `extract_text_scroll` ; pas exposée à
l'utilisateur final. Robuste aux échecs OCR amont (l'une ou l'autre var vide).
"""
params = action.get("parameters") or {}
top_var = (params.get("top_var") or "").strip()
bottom_var = (params.get("bottom_var") or "").strip()
output_var = (params.get("output_var") or "extracted_text").strip()
separator = params.get("separator", "\n\n")
variables = replay_state.setdefault("variables", {})
top_text = str(variables.get(top_var, "") or "")
bottom_text = str(variables.get(bottom_var, "") or "")
# Si les deux sont vides, output reste "" (cohérent avec _handle_extract_text_action).
# Si un seul est vide, on évite un séparateur inutile en début/fin.
if top_text and bottom_text:
merged = top_text + separator + bottom_text
else:
merged = top_text or bottom_text
variables[output_var] = merged
# Nettoyage des variables internes pour ne pas polluer l'état.
if top_var.startswith("__"):
variables.pop(top_var, None)
if bottom_var.startswith("__"):
variables.pop(bottom_var, None)
logger.info(
"extract_text_scroll concat → variable '%s' (%d chars) replay %s",
output_var, len(merged), replay_state.get("replay_id", "?"),
)
return bool(merged)
def _handle_paste_and_execute_action(
action: Dict[str, Any],
replay_state: Dict[str, Any],
) -> bool:
"""Action serveur : invoque scripts/paste_and_execute_linuxdb.sh pour
déclencher Ctrl+V + Ctrl+Enter dans DBeaver de la VM via ydotool.
Bypasse Léa/NoMachine (Ctrl mangé par NoMachine passive grab).
Cf. handoff 2026-05-16_handoff_ydotool_clipboard.md.
"""
import subprocess
script_path = "/home/dom/ai/rpa_vision_v3/scripts/paste_and_execute_linuxdb.sh"
try:
result = subprocess.run(
[script_path],
timeout=30,
capture_output=True,
text=True,
)
if result.returncode != 0:
logger.warning(
"paste_and_execute échoué (rc=%d) stderr=%s",
result.returncode, (result.stderr or "")[:500],
)
return False
logger.info("paste_and_execute OK replay %s", replay_state.get("replay_id", "?"))
return True
except subprocess.TimeoutExpired:
logger.warning("paste_and_execute timeout (30s)")
return False
except Exception as e:
logger.warning("paste_and_execute exception : %s", e)
return False
def _expand_extract_text_scroll(
base: Dict[str, Any],
final_var: str,
top_var: str,
bottom_var: str,
paragraph: bool,
scroll_pause_ms: int = SCROLL_PAUSE_MS,
) -> List[Dict[str, Any]]:
"""Expanse un step extract_text_scroll en séquence d'actions atomiques.
Séquence générée :
1. extract_text(top_var) — OCR zone visible (haut de page)
2. key_combo(ctrl+end) — scroll bas (côté client Léa V1)
3. wait(scroll_pause_ms) — laisse DOM/UI se redessiner
4. extract_text(bottom_var) — OCR zone visible (bas de page)
5. _concat_text_vars(top, bottom→final) — action serveur interne
6. key_combo(ctrl+home) — remet en haut
Toutes les sous-actions héritent de `base` (edge_id, from_node, to_node)
pour la traçabilité. Chaque action obtient un action_id unique.
`scroll_pause_ms` : configurable au step (défaut SCROLL_PAUSE_MS=500ms).
"""
def _new_action() -> Dict[str, Any]:
return {**base, "action_id": f"act_{uuid.uuid4().hex[:8]}"}
a1 = _new_action()
a1["type"] = "extract_text"
a1["parameters"] = {"output_var": top_var, "paragraph": paragraph}
a2 = _new_action()
a2["type"] = "key_combo"
a2["keys"] = ["ctrl", "end"]
a3 = _new_action()
a3["type"] = "wait"
a3["duration_ms"] = scroll_pause_ms
a4 = _new_action()
a4["type"] = "extract_text"
a4["parameters"] = {"output_var": bottom_var, "paragraph": paragraph}
a5 = _new_action()
a5["type"] = "_concat_text_vars"
a5["parameters"] = {
"top_var": top_var,
"bottom_var": bottom_var,
"output_var": final_var,
"separator": "\n\n",
}
a6 = _new_action()
a6["type"] = "key_combo"
a6["keys"] = ["ctrl", "home"]
return [a1, a2, a3, a4, a5, a6]
def _expand_compound_steps(
steps: List[Dict[str, Any]], base: Dict[str, Any], params: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Décomposer les steps d'un compound en actions individuelles."""
actions = []
for step in steps:
step_type = step.get("type", "unknown")
action = {
**base,
"action_id": f"act_{uuid.uuid4().hex[:8]}",
}
if step_type == "key_press":
action["type"] = "key_combo"
keys = step.get("keys", [])
if not keys and step.get("key"):
keys = [step["key"]]
action["keys"] = keys
elif step_type == "text_input":
action["type"] = "type"
text = step.get("text", "")
text = _substitute_variables(text, params, {})
action["text"] = text
elif step_type == "wait":
action["type"] = "wait"
action["duration_ms"] = step.get("duration_ms", 500)
elif step_type == "mouse_click":
action["type"] = "click"
action["x_pct"] = step.get("x_pct", 0.0)
action["y_pct"] = step.get("y_pct", 0.0)
action["button"] = step.get("button", "left")
else:
logger.debug(f"Step compound inconnu : {step_type}")
continue
actions.append(action)
return actions
# =========================================================================
# Pre-check écran — Vérification pré-action par embedding CLIP
# =========================================================================
def _pre_check_screen_state(
session_id: str,
expected_node_id: str,
current_screenshot_path: str,
active_processor,
replay_states: Dict[str, Dict[str, Any]],
replay_lock: threading.Lock,
precheck_threshold: float = 0.85,
) -> Dict[str, Any]:
"""Vérifier que l'écran actuel correspond à l'état attendu du node.
Compare le screenshot actuel avec le prototype du node attendu
via similarité d'embedding CLIP (rapide, ~200ms).
Args:
session_id: ID de la session de replay
expected_node_id: ID du node source de l'action (from_node)
current_screenshot_path: Chemin du screenshot heartbeat récent
active_processor: Instance StreamProcessor avec le CLIPEmbedder chargé
replay_states: Dict partagé des états de replay
replay_lock: Lock pour l'accès concurrent aux replay_states
precheck_threshold: Seuil de similarité cosine
Returns:
{"match": True/False, "similarity": float, "expected_node": str,
"reason": str (si mismatch), "popup_detected": bool}
"""
result: Dict[str, Any] = {
"match": True,
"similarity": 1.0,
"expected_node": expected_node_id,
"popup_detected": False,
}
try:
# 1. Trouver le workflow actif pour cette session
replay_state = None
workflow = None
with replay_lock:
for state in replay_states.values():
if state["session_id"] == session_id and state["status"] == "running":
replay_state = state
break
if not replay_state:
result["reason"] = "no_active_replay"
return result
workflow_id = replay_state.get("workflow_id", "")
with active_processor._data_lock:
workflow = active_processor._workflows.get(workflow_id)
if workflow is None:
result["reason"] = "workflow_not_found"
return result
# 2. Récupérer le prototype du node attendu
# Supporter à la fois les objets Workflow et les dicts bruts
node = None
if hasattr(workflow, "get_node"):
node = workflow.get_node(expected_node_id)
elif isinstance(workflow, dict):
# Format dict brut (workflows VWB/manuels)
for n in workflow.get("nodes", []):
if n.get("node_id") == expected_node_id:
node = n
break
if node is None:
result["reason"] = "node_not_found"
return result
# Extraire le prototype vector
metadata = node.metadata if hasattr(node, "metadata") else node.get("metadata", {})
proto_list = metadata.get("_prototype_vector")
if not proto_list or not isinstance(proto_list, (list, tuple)):
result["reason"] = "no_prototype_vector"
return result
import numpy as np
prototype_vector = np.array(proto_list, dtype=np.float32)
# 3. Calculer l'embedding CLIP du screenshot actuel
active_processor._ensure_initialized()
if active_processor._clip_embedder is None:
result["reason"] = "clip_embedder_unavailable"
return result
from PIL import Image
pil_image = Image.open(current_screenshot_path)
current_vector = active_processor._clip_embedder.embed_image(pil_image)
if current_vector is None or len(current_vector) == 0:
result["reason"] = "embedding_failed"
return result
# 4. Similarité cosine
current_vector = current_vector.flatten().astype(np.float32)
prototype_vector = prototype_vector.flatten().astype(np.float32)
norm_current = np.linalg.norm(current_vector)
norm_proto = np.linalg.norm(prototype_vector)
if norm_current < 1e-8 or norm_proto < 1e-8:
result["reason"] = "zero_norm_vector"
result["match"] = False
result["similarity"] = 0.0
return result
similarity = float(
np.dot(current_vector, prototype_vector) / (norm_current * norm_proto)
)
result["similarity"] = round(similarity, 4)
result["match"] = similarity >= precheck_threshold
if not result["match"]:
result["reason"] = "screen_mismatch"
logger.warning(
f"Pre-check MISMATCH pour session={session_id} "
f"node={expected_node_id}: similarity={similarity:.4f} "
f"< seuil={precheck_threshold}"
)
# 5. Détection de popup par changement de titre de fenêtre
result["popup_detected"] = _detect_popup_hint(
session_id, workflow, expected_node_id, active_processor,
)
except Exception as e:
# Ne jamais bloquer le replay en cas d'erreur du pre-check
logger.error(f"Pre-check échoué (non bloquant): {e}")
result["match"] = True # Fallback permissif
result["reason"] = f"precheck_error: {e}"
return result
def _detect_popup_hint(
session_id: str,
workflow: Any,
expected_node_id: str,
processor_instance=None,
) -> bool:
"""Détecter si une popup ou un dialogue modal est probable.
Compare le titre de fenêtre actuel (via last_window_info de la session)
avec le titre attendu du node dans le workflow. Un changement de titre
suggère une popup/dialogue inattendu.
Args:
session_id: ID de la session
workflow: Workflow object ou dict
expected_node_id: ID du node attendu
processor_instance: StreamProcessor pour accéder aux sessions
Returns:
True si un changement de titre suggère une popup
"""
try:
if processor_instance is None:
return False
# Titre actuel depuis la session
session = processor_instance.session_manager.get_session(session_id)
if not session:
return False
current_title = session.last_window_info.get("title", "").strip().lower()
if not current_title or current_title == "unknown":
return False
# Titre attendu depuis le node du workflow
expected_title = ""
if hasattr(workflow, "get_node"):
node = workflow.get_node(expected_node_id)
if node and hasattr(node, "template") and hasattr(node.template, "window"):
window_spec = node.template.window
if hasattr(window_spec, "title_contains") and window_spec.title_contains:
expected_title = window_spec.title_contains.strip().lower()
elif isinstance(workflow, dict):
for n in workflow.get("nodes", []):
if n.get("node_id") == expected_node_id:
template = n.get("template", {})
window = template.get("window", {})
expected_title = (window.get("title_contains") or "").strip().lower()
break
if not expected_title:
return False
# Si le titre actuel ne contient plus le titre attendu, popup probable
if expected_title not in current_title:
logger.info(
f"Popup détectée: titre actuel='{current_title}' "
f"ne contient pas '{expected_title}'"
)
return True
except Exception as e:
logger.debug(f"Détection popup échouée: {e}")
return False
# =========================================================================
# Replay — État et retry
# =========================================================================
def _create_replay_state(
replay_id: str,
workflow_id: str,
session_id: str,
total_actions: int,
params: Optional[Dict[str, Any]] = None,
machine_id: Optional[str] = None,
actions: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
"""Créer un état de replay enrichi avec les champs de suivi d'erreur.
Args:
actions: Liste des actions du replay. Une copie slim (sans anchors
base64) est stockée pour permettre à `/replay/result` de
retrouver le `target_spec` de l'action courante — nécessaire
pour l'apprentissage mémoire (Phase 1 plan Léa).
"""
# Copie slim des actions : on strip les anchor_image_base64 pour ne
# pas gonfler la mémoire (anchors peuvent faire 50-200 KB chacun).
# On conserve les champs utilisés par :
# - la Phase 1 apprentissage (target_spec pour memory_record_success)
# - le contrôle strict (success_strict, expected_window_*)
# - les logs/audit (intention, action_id, type, coords)
actions_slim: List[Dict[str, Any]] = []
if actions:
for a in actions:
a_copy = {
"action_id": a.get("action_id"),
"type": a.get("type"),
"x_pct": a.get("x_pct"),
"y_pct": a.get("y_pct"),
# Contrôle strict des étapes (Dom, matin 10 avril 2026)
"success_strict": a.get("success_strict", False),
"expected_window_before": a.get("expected_window_before", ""),
"expected_window_title": a.get("expected_window_title", ""),
# Contexte métier utile pour logs et apprentissage
"intention": a.get("intention", ""),
}
ts = a.get("target_spec")
if isinstance(ts, dict):
a_copy["target_spec"] = {
k: v for k, v in ts.items()
if k not in ("anchor_image_base64",)
}
actions_slim.append(a_copy)
return {
"replay_id": replay_id,
"workflow_id": workflow_id,
"session_id": session_id,
"machine_id": machine_id or "default", # Machine cible du replay
"status": "running",
"total_actions": total_actions,
"completed_actions": 0,
"failed_actions": 0,
"current_action_index": 0,
"params": params or {},
"results": [], # Historique des résultats action par action
"actions": actions_slim, # Copie slim pour lookup par index (Phase 1 mémoire)
# Champs enrichis pour le suivi d'erreur (#7)
"retried_actions": 0,
"unverified_actions": 0,
"error_log": [], # Liste des erreurs rencontrées
"last_screenshot": None, # Path du dernier screenshot reçu
"_last_screenshot_before": None, # Interne: screenshot avant la dernière action
# Champs pour pause supervisée (target_not_found)
"failed_action": None, # Contexte de l'action en echec (quand paused_need_help)
"pause_message": None, # Message a afficher a l'utilisateur
# Variables d'exécution produites en cours de workflow (extract_text,
# t2a_decision, etc.). Résolues via templating {{var}} ou {{var.field}}
# dans les paramètres des actions suivantes.
"variables": {},
# QW2 — Anneaux d'historique pour LoopDetector (5 derniers max)
"_screenshot_history": [], # images PIL des N derniers heartbeats (LoopDetector embed à chaque tick)
"_action_history": [], # N dernières actions exécutées (signature)
# QW4 — Safety checks (hybride déclaratif + LLM contextuel) et audit acquittements
"safety_checks": [], # liste produite par SafetyChecksProvider
"checks_acknowledged": [], # ids acquittés via /replay/resume (audit trail)
"pause_reason": "", # "loop_detected" | "" pour V1
"pause_payload": None, # payload complet pour debug/audit
}
def _schedule_retry(
session_id: str,
replay_state: Dict[str, Any],
action: Dict[str, Any],
current_retry: int,
reason: str,
replay_queues: Dict[str, List[Dict[str, Any]]],
retry_pending: Dict[str, Dict[str, Any]],
max_retries: int = 3,
):
"""
Programmer un retry pour une action échouée.
Stratégie :
- Retry 1 : réinjecter l'action directement (re-résolution visuelle par l'agent)
- Retry 2 : injecter un wait de 2s avant l'action (possible loading en cours)
- Retry 3 : dernier essai direct
L'action est réinsérée en tête de la queue pour être la prochaine exécutée.
Le lock de replay doit être acquis par l'appelant.
"""
next_retry = current_retry + 1
replay_state["retried_actions"] += 1
# Créer une copie de l'action avec un nouveau action_id pour le tracking
retry_action = dict(action)
retry_action_id = f"{action.get('action_id', 'unknown')}_retry{next_retry}"
retry_action["action_id"] = retry_action_id
# Stocker l'info de retry pour le prochain report_action_result
retry_pending[retry_action_id] = {
"action": action,
"dispatched_action": retry_action,
"retry_count": next_retry,
"replay_id": replay_state["replay_id"],
"session_id": session_id,
"machine_id": replay_state.get("machine_id", "default"),
"dispatched_at": 0.0,
"first_dispatched_at": 0.0,
"resent_count": 0,
"last_resent_at": 0.0,
"reason": reason,
}
# Stratégie de retry selon le numéro
actions_to_insert = []
if next_retry == 2:
# Retry 2 : injecter un wait de 2s avant l'action
wait_action = {
"action_id": f"wait_retry_{uuid.uuid4().hex[:6]}",
"type": "wait",
"duration_ms": 2000,
}
actions_to_insert.append(wait_action)
actions_to_insert.append(retry_action)
# Insérer en tête de la queue (prochaine action à exécuter)
queue = replay_queues.get(session_id, [])
replay_queues[session_id] = actions_to_insert + queue
logger.info(
f"Retry {next_retry}/{max_retries} programmé pour {action.get('action_id')} "
f"(raison: {reason}) | nouveau id: {retry_action_id}"
)
def _notify_error_callback(
replay_state: Dict[str, Any],
action_id: str,
error: Optional[str],
error_callbacks: Dict[str, str],
):
"""
Notifier le callback d'erreur si configuré pour ce replay.
Appel HTTP POST non-bloquant vers l'URL de callback.
En cas d'échec de notification, on log mais on ne bloque pas.
"""
replay_id = replay_state["replay_id"]
callback_url = error_callbacks.get(replay_id)
if not callback_url:
return
def _send_callback():
try:
import urllib.request
payload = json.dumps({
"replay_id": replay_id,
"workflow_id": replay_state.get("workflow_id"),
"session_id": replay_state.get("session_id"),
"action_id": action_id,
"error": error or "Erreur inconnue",
"retried_actions": replay_state.get("retried_actions", 0),
"error_log": replay_state.get("error_log", []),
"status": replay_state.get("status"),
}).encode("utf-8")
req = urllib.request.Request(
callback_url,
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(req, timeout=5) as resp:
logger.info(
f"Error callback envoyé à {callback_url}: {resp.status}"
)
except Exception as e:
logger.warning(
f"Échec envoi error callback à {callback_url}: {e}"
)
# Envoyer en arrière-plan pour ne pas bloquer
threading.Thread(target=_send_callback, daemon=True).start()