fix: contrôle strict des étapes + routage par machine_id

Corrections critiques après test E2E qui montrait des clics au mauvais endroit :

1. Routage par machine_id (api_stream.py)
   Quand 2 machines partagent le même session_id (agent_demo_user),
   les actions d'un replay pour la VM ne doivent PLUS être distribuées
   au PC physique. Vérification que le replay_state appartient bien à
   la machine qui poll avant de consommer la queue.

2. IRBuilder extrait expected_window_before/after (ir_builder.py)
   Pour chaque action click/type/key_combo, stocke le titre de la fenêtre
   au moment du clic (before) et le titre du prochain événement (after).
   Ces champs alimentent le contrôle strict au runtime.

3. ExecutionCompiler crée SuccessCondition title_match (execution_compiler.py)
   Quand expected_window_after est défini, crée une condition de succès
   STRICTE avec method="title_match" et expected_title. Plus de simple
   "l'écran a changé" — on vérifie la fenêtre résultante.

4. Runner propage expected_window_before et success_strict
   Le flag success_strict indique à l'agent que le contrôle post-action
   DOIT être strict (STOP sur mismatch au lieu de warning).

5. UIA strict sur parent_path (executor.py)
   _resolve_via_uia_local REJETTE un match si l'élément trouvé n'est pas
   dans la bonne fenêtre parente (évite ex: "Rechercher" taskbar confondu
   avec "Rechercher" explorateur).

6. Pré/post vérif stricte et bloquante (executor.py)
   - expected_window_before lu en priorité depuis l'action (plan V4)
   - Post-vérif : si success_strict=True et timeout, result.success=False
     → le replay s'arrête au lieu de continuer avec des warnings.

Validé sur la VM :
- Le replay s'arrête proprement quand l'étape 2 aboutit dans "Propriétés de
  Internet" au lieu de "blocnote.txt - Bloc-notes"
- Plus de clics en aveugle / saisie au mauvais endroit

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-10 14:05:23 +02:00
parent 56e3cc052a
commit cecdf417b7
7 changed files with 213 additions and 18 deletions

View File

@@ -277,8 +277,12 @@ class ActionExecutorV1:
On appelle le helper Rust qui interroge UIAutomationCore.dll et
retourne les coordonnées pixel-perfect de l'élément.
Retourne (x_pct, y_pct) si trouvé, None sinon.
Le fallback vers le grounding serveur est géré par l'appelant.
STRICT : si l'élément trouvé n'appartient pas à la bonne fenêtre
parente (comparaison du parent_path), on REFUSE — sinon on clique
au mauvais endroit (ex: 'Rechercher' de la taskbar au lieu de
l'explorateur).
Retourne (x_pct, y_pct) si trouvé ET validé, None sinon.
"""
try:
from .uia_helper import get_shared_helper
@@ -289,6 +293,7 @@ class ActionExecutorV1:
name = uia_target.get("name", "")
control_type = uia_target.get("control_type", "") or None
automation_id = uia_target.get("automation_id", "") or None
expected_parent_path = uia_target.get("parent_path", []) or []
if not name:
return None
@@ -300,8 +305,41 @@ class ActionExecutorV1:
timeout_ms=1500,
)
if element is None or not element.is_clickable():
logger.debug(f"UIA: '{name}' non trouvé ou non cliquable")
return None
# ── VÉRIFICATION STRICTE du parent_path ──
# Si l'élément a été enregistré dans une fenêtre spécifique,
# il doit être trouvé dans la MÊME fenêtre au replay.
# Sinon on clique sur un homonyme dans une autre app.
if expected_parent_path:
expected_root = None
for p in expected_parent_path:
if p.get("control_type", "").lower() in ("fenêtre", "window"):
expected_root = p.get("name", "").strip()
break
if expected_root:
found_root = None
for p in element.parent_path:
if p.get("control_type", "").lower() in ("fenêtre", "window"):
found_root = p.get("name", "").strip()
break
if found_root and expected_root != found_root:
# Match souple : une sous-partie commune (ex: "Bloc-notes")
if (expected_root.lower() not in found_root.lower()
and found_root.lower() not in expected_root.lower()):
logger.warning(
f"UIA REJET : '{name}' trouvé dans '{found_root}' "
f"mais attendu dans '{expected_root}'"
)
print(
f" [UIA] REJET — '{name}' trouvé dans mauvaise fenêtre "
f"({found_root}{expected_root})"
)
return None
cx, cy = element.center()
if screen_width <= 0 or screen_height <= 0:
return None
@@ -479,7 +517,13 @@ class ActionExecutorV1:
# ── Pré-vérification : titre fenêtre ──
# Vérifier que l'écran est dans l'état attendu AVANT de cliquer.
if visual_mode and target_spec:
expected_title = target_spec.get("window_title", "")
# Le champ explicite `expected_window_before` a priorité
# (il vient du plan V4 et indique la fenêtre STRICTEMENT
# attendue avant l'action). Sinon fallback sur target_spec.
expected_title = (
action.get("expected_window_before", "")
or target_spec.get("window_title", "")
)
if expected_title and expected_title != "unknown_window":
from ..window_info_crossplatform import get_active_window_info
current_info = get_active_window_info()
@@ -728,7 +772,28 @@ class ActionExecutorV1:
else:
print(f" [POST-VÉRIF] TIMEOUT {max_wait}s — '{post_title}''{expected_after}'")
logger.warning(f"POST-VÉRIF TIMEOUT : '{post_title}''{expected_after}'")
result["warning"] = f"post_verif_timeout:{post_title}"
# Contrôle strict : si success_strict, on STOP.
# Sinon on continue avec un warning (legacy).
is_strict = bool(action.get("success_strict"))
if is_strict:
result["success"] = False
result["error"] = (
f"Post-vérif échouée : fenêtre '{post_title}' "
f"au lieu de '{expected_after}'"
)
print(
f" [POST-VÉRIF] STOP STRICT — l'étape ne s'est "
f"pas déroulée comme prévu, arrêt du replay"
)
try:
self.notifier.replay_wrong_window(
post_title, expected_after,
)
except Exception:
pass
return result
else:
result["warning"] = f"post_verif_timeout:{post_title}"
else:
print(f" [CLICK] Terminé.")

View File

@@ -2594,10 +2594,35 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
"replay_id": state["replay_id"],
}
queue = _replay_queues.get(session_id, [])
# CRITIQUE : vérifier que la queue appartient BIEN à cette machine.
# Quand 2 machines partagent le même session_id (ex: agent_demo_user),
# il faut s'assurer qu'elles ne volent PAS les actions l'une de l'autre.
# Un replay est lié à UNE machine_id spécifique via replay_states.
# On cherche d'abord si cette machine a un replay actif qui lui est propre.
queue = []
owning_replay = None
for state in _replay_states.values():
if (state.get("machine_id") == machine_id
and state.get("status") == "running"
and state.get("session_id") == session_id):
owning_replay = state
break
if owning_replay:
# Cette machine a un replay actif → consommer sa queue
queue = _replay_queues.get(session_id, [])
else:
# Pas de replay pour cette machine sur cette session → NE RIEN DISTRIBUER
# Même si _replay_queues[session_id] contient des actions, elles
# appartiennent à une autre machine.
queue = []
# Log seulement quand il y a des actions à distribuer
if queue:
logger.info(f"[REPLAY-QUEUE] session={session_id}, actions_en_attente={len(queue)}")
logger.info(
f"[REPLAY-QUEUE] session={session_id}, machine={machine_id}, "
f"actions_en_attente={len(queue)}"
)
if not queue and machine_id != "default":
# Lookup 1 : machine_replay_target (mapping explicite POST /replay)
@@ -2605,14 +2630,21 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
if target_sid and target_sid != session_id:
target_queue = _replay_queues.get(target_sid, [])
if target_queue:
queue = target_queue
_replay_queues[session_id] = target_queue
del _replay_queues[target_sid]
# Vérifier que le replay_state ciblé concerne BIEN cette machine
target_state = None
for state in _replay_states.values():
if state["session_id"] == target_sid and state["status"] == "running":
state["session_id"] = session_id
_machine_replay_target[machine_id] = session_id
logger.info(f"Replay machine-target: {machine_id} -> {target_sid} -> {session_id}")
if (state.get("session_id") == target_sid
and state.get("machine_id") == machine_id
and state["status"] == "running"):
target_state = state
break
if target_state:
queue = target_queue
_replay_queues[session_id] = target_queue
del _replay_queues[target_sid]
target_state["session_id"] = session_id
_machine_replay_target[machine_id] = session_id
logger.info(f"Replay machine-target: {machine_id} -> {target_sid} -> {session_id}")
# Lookup 2 : chercher dans les replay_states actifs pour cette machine
if not queue:

View File

@@ -238,10 +238,21 @@ def execution_node_to_action(
intent=node.intent,
)
# Titre fenêtre attendu (pour la vérification post-action / pre-check)
# Titre fenêtre attendu AVANT (pré-vérif stricte)
# Si absent, aucune pré-vérif → l'action s'exécute quel que soit l'écran
if node.expected_window_before:
action["expected_window_before"] = node.expected_window_before
target_spec["window_title"] = node.expected_window_before
# Titre fenêtre attendu APRÈS (post-vérif stricte)
# C'est la garantie de passage à l'action suivante
if node.success_condition and node.success_condition.expected_title:
action["expected_window_title"] = node.success_condition.expected_title
target_spec.setdefault("window_title", node.success_condition.expected_title)
action["success_strict"] = (
node.success_condition.method == "title_match"
)
if "window_title" not in target_spec:
target_spec["window_title"] = node.success_condition.expected_title
if target_spec:
action["target_spec"] = target_spec

View File

@@ -180,13 +180,30 @@ class ExecutionCompiler:
node.max_retries = default_click_retries
node.recovery_action = "escape"
# Condition de succès basée sur la postcondition
if step.postcondition:
# Condition de succès STRICTE basée sur le titre de fenêtre attendu.
# Si expected_window_after est défini, on fait du title_match (strict).
# Sinon on retombe sur screen_changed (faible).
expected_after = getattr(action, "expected_window_after", "")
if expected_after and expected_after != "unknown_window":
node.success_condition = SuccessCondition(
method="title_match",
expected_title=expected_after,
description=step.postcondition or f"Fenêtre attendue: {expected_after}",
)
elif step.postcondition:
node.success_condition = SuccessCondition(
method="screen_changed",
description=step.postcondition,
)
# Pré-condition stricte : la fenêtre AVANT le clic doit matcher
# Stockée en tant que champ dédié sur le nœud pour l'exécuteur
expected_before = getattr(action, "expected_window_before", "")
if expected_before and expected_before != "unknown_window":
# On l'injecte dans la condition de succès (cas "avant")
# Le nœud portera les deux via des champs séparés
node.expected_window_before = expected_before
elif action.type == "type":
node.text = action.text
node.variable_name = action.text.strip("{}") if action.variable else ""

View File

@@ -133,6 +133,9 @@ class ExecutionNode:
# Vérification
success_condition: Optional[SuccessCondition] = None
# Contrôle strict de fenêtre (pré-condition)
expected_window_before: str = "" # La fenêtre active doit matcher AVANT l'action
# Recovery
recovery_action: str = "escape" # "escape", "undo", "close", "none"
@@ -163,6 +166,8 @@ class ExecutionNode:
d["max_retries"] = self.max_retries
if self.success_condition:
d["success_condition"] = self.success_condition.to_dict()
if self.expected_window_before:
d["expected_window_before"] = self.expected_window_before
d["recovery_action"] = self.recovery_action
if self.is_optional:
d["is_optional"] = True
@@ -187,6 +192,7 @@ class ExecutionNode:
max_retries=d.get("max_retries", 1),
retry_delay_ms=d.get("retry_delay_ms", 2000),
success_condition=success,
expected_window_before=d.get("expected_window_before", ""),
recovery_action=d.get("recovery_action", "escape"),
step_id=d.get("step_id", ""),
is_optional=d.get("is_optional", False),

View File

@@ -103,7 +103,12 @@ class IRBuilder:
)
ir.steps.append(step)
# 5. Détecter les variables
# 5. Contrôle strict : remplir expected_window_before/after pour chaque action
# C'est la clé de la robustesse : chaque action sait dans quelle fenêtre
# elle doit s'exécuter ET dans quelle fenêtre elle doit aboutir.
self._attach_window_expectations(ir, actionable)
# 6. Détecter les variables
ir.variables = self._detect_variables(ir.steps, actionable)
elapsed = time.time() - t_start
@@ -125,6 +130,55 @@ class IRBuilder:
result.append(evt)
return result
def _attach_window_expectations(self, ir: WorkflowIR, events: List[Dict]) -> None:
"""Remplir expected_window_before/after pour chaque action du workflow.
C'est LA clé du contrôle strict : chaque action connaît la fenêtre
dans laquelle elle doit s'exécuter ET celle qui doit apparaître
après. Toute divergence au replay → STOP immédiat.
On reconstruit la séquence d'événements "actionables" (clicks, type,
key_combo) et on aligne chaque Action du workflow sur son événement
source pour récupérer :
- expected_window_before : titre de la fenêtre AU MOMENT du clic
- expected_window_after : titre de la fenêtre du PROCHAIN click
Les fenêtres génériques (unknown_window, vide) sont ignorées.
"""
# Extraire la séquence des événements actionables avec leurs titres
event_sequence: List[Dict[str, Any]] = []
for evt in events:
t = evt.get("type", "")
if t not in ("mouse_click", "text_input", "key_combo", "key_press", "scroll"):
continue
title = evt.get("window", {}).get("title", "") or ""
event_sequence.append({"type": t, "title": title})
# Aligner avec les actions du workflow
# Les actions dans le workflow sont dans le même ordre que les événements
flat_actions: List[tuple] = [] # (step_idx, action_idx, action)
for si, step in enumerate(ir.steps):
for ai, action in enumerate(step.actions):
if action.type in ("click", "type", "key_combo"):
flat_actions.append((si, ai, action))
# Limite : on prend le min entre les 2 listes
n = min(len(flat_actions), len(event_sequence))
for i in range(n):
si, ai, action = flat_actions[i]
title_now = event_sequence[i]["title"]
if title_now and title_now != "unknown_window":
action.expected_window_before = title_now
# Chercher le prochain événement avec un titre valide
for j in range(i + 1, len(event_sequence)):
next_title = event_sequence[j]["title"]
if next_title and next_title != "unknown_window":
# Ne pas mettre "after" si c'est le même titre (pas de transition)
action.expected_window_after = next_title
break
def _detect_applications(self, events: List[Dict]) -> List[str]:
"""Détecter les applications utilisées."""
apps = set()

View File

@@ -75,6 +75,12 @@ class Action:
duration_ms: int = 0 # Durée (pour wait)
variable: bool = False # True si le texte contient une variable {var}
anchor_hint: str = "" # Indice visuel pour aider la résolution
# Contrôle strict des étapes — l'action ne peut s'exécuter que si la fenêtre
# active correspond à `expected_window_before`, et ne peut passer à la
# suivante que si la fenêtre résultante correspond à `expected_window_after`.
# Ces champs sont extraits par l'IRBuilder depuis les événements bruts.
expected_window_before: str = ""
expected_window_after: str = ""
def to_dict(self) -> Dict[str, Any]:
d = {"type": self.type}
@@ -90,6 +96,10 @@ class Action:
d["variable"] = True
if self.anchor_hint:
d["anchor_hint"] = self.anchor_hint
if self.expected_window_before:
d["expected_window_before"] = self.expected_window_before
if self.expected_window_after:
d["expected_window_after"] = self.expected_window_after
return d
@classmethod