Files
rpa_vision_v3/agent_v0/server_v1/execution_plan_runner.py
Dom cecdf417b7 fix: contrôle strict des étapes + routage par machine_id
Corrections critiques après test E2E qui montrait des clics au mauvais endroit :

1. Routage par machine_id (api_stream.py)
   Quand 2 machines partagent le même session_id (agent_demo_user),
   les actions d'un replay pour la VM ne doivent PLUS être distribuées
   au PC physique. Vérification que le replay_state appartient bien à
   la machine qui poll avant de consommer la queue.

2. IRBuilder extrait expected_window_before/after (ir_builder.py)
   Pour chaque action click/type/key_combo, stocke le titre de la fenêtre
   au moment du clic (before) et le titre du prochain événement (after).
   Ces champs alimentent le contrôle strict au runtime.

3. ExecutionCompiler crée SuccessCondition title_match (execution_compiler.py)
   Quand expected_window_after est défini, crée une condition de succès
   STRICTE avec method="title_match" et expected_title. Plus de simple
   "l'écran a changé" — on vérifie la fenêtre résultante.

4. Runner propage expected_window_before et success_strict
   Le flag success_strict indique à l'agent que le contrôle post-action
   DOIT être strict (STOP sur mismatch au lieu de warning).

5. UIA strict sur parent_path (executor.py)
   _resolve_via_uia_local REJETTE un match si l'élément trouvé n'est pas
   dans la bonne fenêtre parente (évite ex: "Rechercher" taskbar confondu
   avec "Rechercher" explorateur).

6. Pré/post vérif stricte et bloquante (executor.py)
   - expected_window_before lu en priorité depuis l'action (plan V4)
   - Post-vérif : si success_strict=True et timeout, result.success=False
     → le replay s'arrête au lieu de continuer avec des warnings.

Validé sur la VM :
- Le replay s'arrête proprement quand l'étape 2 aboutit dans "Propriétés de
  Internet" au lieu de "blocnote.txt - Bloc-notes"
- Plus de clics en aveugle / saisie au mauvais endroit

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-10 14:05:23 +02:00

374 lines
13 KiB
Python

# agent_v0/server_v1/execution_plan_runner.py
"""
ExecutionPlanRunner — Adaptateur ExecutionPlan → actions replay.
Pièce d'intégration du pipeline V4 :
RawTrace → IRBuilder → WorkflowIR → ExecutionCompiler → ExecutionPlan → Runtime
Ce module convertit un `ExecutionPlan` (plan pré-compilé, déterministe) en
liste d'actions au format attendu par l'executor replay actuel (clé x_pct,
y_pct, target_spec, etc.), puis les injecte dans `_replay_queues`.
L'ancien chemin `build_replay_from_raw_events()` dans stream_processor.py
reste inchangé — les deux chemins coexistent pendant la transition.
Format d'action produit (compatible executor existant) :
{
"action_id": "act_...",
"type": "click",
"x_pct": 0.5,
"y_pct": 0.3,
"visual_mode": True,
"target_spec": {
"by_text": "...",
"window_title": "...",
"vlm_description": "...",
"anchor_image_base64": "...",
},
"expected_window_title": "...",
}
Auteur: Dom, Alice - Avril 2026
"""
from __future__ import annotations
import logging
import re
import threading
import uuid
from typing import Any, Dict, List, Optional
from core.workflow.execution_plan import (
ExecutionNode,
ExecutionPlan,
ResolutionStrategy,
)
logger = logging.getLogger(__name__)
# =========================================================================
# Substitution de variables
# =========================================================================
# Le WorkflowIR utilise la syntaxe `{var}` dans les champs texte.
# Ici on supporte les deux : `{var}` (IR natif) et `${var}` (replay legacy).
_VARIABLE_RE_CURLY = re.compile(r"\{(\w+)\}")
_VARIABLE_RE_DOLLAR = re.compile(r"\$\{(\w+)\}")
def substitute_variables(text: str, variables: Dict[str, Any]) -> str:
"""Remplacer `{var}` et `${var}` par leurs valeurs.
Priorité : variables fournies > placeholder brut (inchangé si inconnu).
"""
if not text or not variables:
return text
def replacer(match: "re.Match[str]") -> str:
var_name = match.group(1)
if var_name in variables:
return str(variables[var_name])
return match.group(0)
text = _VARIABLE_RE_DOLLAR.sub(replacer, text)
text = _VARIABLE_RE_CURLY.sub(replacer, text)
return text
# =========================================================================
# Conversion ExecutionNode → action replay
# =========================================================================
def _strategy_to_target_spec(
strategy: Optional[ResolutionStrategy],
fallbacks: Optional[List[ResolutionStrategy]] = None,
intent: str = "",
) -> Dict[str, Any]:
"""Construire un `target_spec` depuis les stratégies de résolution.
Fusionne la primaire et les fallbacks pour donner un maximum d'indices
au resolve_engine :
- OCR → by_text
- template → anchor_image_base64 (depuis anchor_b64)
- VLM → vlm_description
Règle V4 : la stratégie primaire dicte la méthode préférée.
Le champ `resolve_order` liste les méthodes dans l'ordre à essayer.
Le resolve_engine honore cet ordre au lieu de sa cascade par défaut.
resolve_order est la clé du "zéro VLM au runtime" :
- ["ocr", "template", "vlm"] → V4 typique (OCR rapide)
- ["template", "ocr", "vlm"] → apprentissage : template marche mieux
- ["vlm"] → éléments sans texte (icônes)
"""
spec: Dict[str, Any] = {}
all_strategies: List[ResolutionStrategy] = []
if strategy is not None:
all_strategies.append(strategy)
if fallbacks:
all_strategies.extend(fallbacks)
by_text_candidate = ""
anchor_candidate = ""
vlm_candidate = ""
uia_data: Dict[str, Any] = {}
dom_data: Dict[str, Any] = {}
resolve_order: List[str] = []
seen_methods: set = set()
for strat in all_strategies:
if not strat:
continue
if strat.method == "ocr" and strat.target_text and not by_text_candidate:
by_text_candidate = strat.target_text
elif strat.method == "template":
if strat.anchor_b64 and not anchor_candidate:
anchor_candidate = strat.anchor_b64
if strat.target_text and not by_text_candidate:
by_text_candidate = strat.target_text
elif strat.method == "vlm" and strat.vlm_description and not vlm_candidate:
vlm_candidate = strat.vlm_description
elif strat.method == "uia" and strat.uia_name and not uia_data:
uia_data = {
"name": strat.uia_name,
"control_type": strat.uia_control_type,
"automation_id": strat.uia_automation_id,
"parent_path": strat.uia_parent_path,
}
elif strat.method == "dom" and strat.dom_selector and not dom_data:
dom_data = {
"selector": strat.dom_selector,
"xpath": strat.dom_xpath,
"url_pattern": strat.dom_url_pattern,
}
# Construire l'ordre des méthodes (dans l'ordre primaire → fallbacks)
if strat.method and strat.method not in seen_methods:
resolve_order.append(strat.method)
seen_methods.add(strat.method)
if by_text_candidate:
spec["by_text"] = by_text_candidate
if anchor_candidate:
spec["anchor_image_base64"] = anchor_candidate
if vlm_candidate:
spec["vlm_description"] = vlm_candidate
elif intent and "vlm_description" not in spec:
# L'intention métier devient le prompt VLM de dernier recours
spec["vlm_description"] = intent
# Données UIA — consommées par l'agent Windows via lea_uia.exe
if uia_data:
spec["uia_target"] = uia_data
# Données DOM — consommées par l'agent Windows via CDP (futur)
if dom_data:
spec["dom_target"] = dom_data
# Ordre de résolution pré-compilé — c'est LA pièce centrale du V4
if resolve_order:
spec["resolve_order"] = resolve_order
return spec
def execution_node_to_action(
node: ExecutionNode,
variables: Optional[Dict[str, Any]] = None,
id_prefix: str = "act_plan",
) -> Optional[Dict[str, Any]]:
"""Convertir un `ExecutionNode` en action replay.
Retourne `None` si le nœud n'est pas exécutable (type inconnu).
Args:
node: Le nœud à convertir.
variables: Dictionnaire de variables pour substituer les {var}.
id_prefix: Préfixe pour l'action_id générée.
"""
variables = variables or {}
action: Dict[str, Any] = {
"action_id": f"{id_prefix}_{uuid.uuid4().hex[:8]}",
"plan_node_id": node.node_id,
}
if node.intent:
action["intention"] = node.intent
if node.step_id:
action["plan_step_id"] = node.step_id
if node.is_optional:
action["is_optional"] = True
# Métadonnées d'exécution utiles au runtime
if node.timeout_ms:
action["timeout_ms"] = node.timeout_ms
if node.max_retries:
action["max_retries"] = node.max_retries
if node.recovery_action:
action["recovery_action"] = node.recovery_action
if node.success_condition:
action["success_condition"] = node.success_condition.to_dict()
action_type = node.action_type
if action_type == "click":
action["type"] = "click"
strategy = node.strategy_primary
fallbacks = node.strategy_fallbacks or []
# ── Déduction des coordonnées depuis la stratégie primaire ──
# - OCR : pas de coordonnées (le runtime trouve via OCR)
# - template : l'anchor sera utilisé au runtime
# - VLM : la description sera utilisée au runtime
# Dans tous les cas le resolve_engine retrouve les pixels au replay.
# On expose néanmoins un centre (0.5, 0.5) neutre pour rester
# compatible avec les validations de queue existantes.
action["x_pct"] = 0.5
action["y_pct"] = 0.5
action["visual_mode"] = True
target_spec = _strategy_to_target_spec(
strategy=strategy,
fallbacks=fallbacks,
intent=node.intent,
)
# Titre fenêtre attendu AVANT (pré-vérif stricte)
# Si absent, aucune pré-vérif → l'action s'exécute quel que soit l'écran
if node.expected_window_before:
action["expected_window_before"] = node.expected_window_before
target_spec["window_title"] = node.expected_window_before
# Titre fenêtre attendu APRÈS (post-vérif stricte)
# C'est la garantie de passage à l'action suivante
if node.success_condition and node.success_condition.expected_title:
action["expected_window_title"] = node.success_condition.expected_title
action["success_strict"] = (
node.success_condition.method == "title_match"
)
if "window_title" not in target_spec:
target_spec["window_title"] = node.success_condition.expected_title
if target_spec:
action["target_spec"] = target_spec
elif action_type == "type":
action["type"] = "type"
text = node.text or ""
# Substituer les variables avant d'envoyer (ex: {patient} → "DUPONT")
action["text"] = substitute_variables(text, variables)
if node.variable_name:
action["variable_name"] = node.variable_name
elif action_type in ("key_combo", "key_press"):
action["type"] = "key_combo"
keys = list(node.keys or [])
if not keys:
return None
action["keys"] = keys
elif action_type == "wait":
action["type"] = "wait"
duration = node.duration_ms or 1000
action["duration_ms"] = int(duration)
elif action_type == "scroll":
action["type"] = "scroll"
# Les stratégies peuvent contenir une zone — pas exploitée ici,
# le scroll est implicitement sur la fenêtre active.
action["delta"] = -3
else:
logger.debug("execution_node_to_action: type inconnu '%s' ignoré", action_type)
return None
return action
def execution_plan_to_actions(
plan: ExecutionPlan,
variables: Optional[Dict[str, Any]] = None,
id_prefix: str = "act_plan",
) -> List[Dict[str, Any]]:
"""Convertir un `ExecutionPlan` complet en liste d'actions replay.
Les variables passées en argument écrasent celles du plan.
"""
merged_vars: Dict[str, Any] = dict(plan.variables or {})
if variables:
merged_vars.update(variables)
actions: List[Dict[str, Any]] = []
for node in plan.nodes:
action = execution_node_to_action(
node=node,
variables=merged_vars,
id_prefix=id_prefix,
)
if action is not None:
actions.append(action)
logger.info(
"execution_plan_to_actions(%s) : %d nœuds → %d actions replay "
"(vars=%d)",
plan.plan_id, plan.total_nodes, len(actions), len(merged_vars),
)
return actions
# =========================================================================
# Injection dans la queue de replay
# =========================================================================
def inject_plan_into_queue(
plan: ExecutionPlan,
session_id: str,
replay_queues: Dict[str, List[Dict[str, Any]]],
variables: Optional[Dict[str, Any]] = None,
lock: Optional[threading.Lock] = None,
replace: bool = True,
id_prefix: str = "act_plan",
) -> List[Dict[str, Any]]:
"""Injecter un `ExecutionPlan` dans la queue de replay d'une session.
Args:
plan: Le plan à exécuter.
session_id: La session Agent V1 cible.
replay_queues: Le dict global `_replay_queues` partagé par le serveur.
variables: Variables à substituer dans les actions.
lock: Verrou optionnel à acquérir avant d'écrire (threadsafe).
replace: Si True (défaut), remplace la queue existante. Sinon, append.
id_prefix: Préfixe pour les action_id générés.
Returns:
La liste des actions injectées (après substitution).
"""
actions = execution_plan_to_actions(
plan=plan, variables=variables, id_prefix=id_prefix,
)
def _write() -> None:
if replace:
replay_queues[session_id] = list(actions)
else:
replay_queues[session_id].extend(actions)
if lock is not None:
with lock:
_write()
else:
_write()
logger.info(
"inject_plan_into_queue(%s) : %d actions injectées dans la queue "
"de la session '%s' (replace=%s)",
plan.plan_id, len(actions), session_id, replace,
)
return actions