test(e2e): harness replay reproductible — mock client Léa V1 contre serveur réel
Some checks failed
tests / Lint (ruff + black) (push) Successful in 14s
tests / Tests unitaires (sans GPU) (push) Failing after 13s
tests / Tests sécurité (critique) (push) Has been skipped

Réduit le cycle debug d'un workflow de 1-2 min (replay manuel via
Windows + Léa V1 + maquette) à ~2-5s (mock client Linux contre
serveur de streaming localhost:5005). 30-60× plus rapide.

Architecture :
- tools/test_replay_e2e.py — harness CLI (~810 lignes), reproduit la
  chaîne réelle : VWB /api/v3/execute-windows → streaming /replay/raw
  → boucle /replay/next côté harness avec resolve_target sur un
  screenshot fixture → POST /replay/result. Pas de modification serveur.
- tests/e2e/test_urgence_aiva_demo.py — wrapper pytest (smoke).
- tests/e2e/urgence_aiva_demo_expected.yaml — référence générée par
  --export-expected, pour comparaison régression auto.
- pytest.ini — ajout du marqueur e2e.

Usage :
    python tools/test_replay_e2e.py --execution-mode autonomous --max-iter 120 --verbose
    python tools/test_replay_e2e.py --single-step 8 --shot <heartbeat>.png
    python tools/test_replay_e2e.py --expected tests/e2e/urgence_aiva_demo_expected.yaml
    pytest tests/e2e -v -m e2e

Sortie : tableau Markdown step × méthode × score × pos × status × diag.

Limitations connues (extensions post-démo) :
- Une seule fixture screenshot pour tout le replay → click_anchor réalistes
  échouent dès qu'on dépasse l'écran fixture. Carte step_id → fixture à venir.
- extract_text/table/t2a_decision exécutés côté serveur, observables mais
  pas modifiables.
- Pas de simulation screenshot_after → ReplayVerifier (Critic VLM) ne tourne pas.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-05-07 22:11:07 +02:00
parent 7847a0e829
commit 35fd6cf4c5
5 changed files with 1012 additions and 0 deletions

View File

@@ -27,6 +27,7 @@ markers =
fiche9: Tests Fiche #9 (postconditions retry backoff) fiche9: Tests Fiche #9 (postconditions retry backoff)
fiche10: Tests Fiche #10 (precision metrics engine) fiche10: Tests Fiche #10 (precision metrics engine)
visual: Tests visuels sur captures réelles (nécessite serveur GPU) visual: Tests visuels sur captures réelles (nécessite serveur GPU)
e2e: Tests E2E contre serveurs (streaming + VWB) actifs — lents, à lancer manuellement
# Note: Chemins Python gérés par tests/conftest.py # Note: Chemins Python gérés par tests/conftest.py

0
tests/e2e/__init__.py Normal file
View File

View File

@@ -0,0 +1,118 @@
"""Tests E2E du workflow Urgence_aiva_demo via le harness mock client.
Marqueurs : @pytest.mark.e2e @pytest.mark.slow
Pré-requis : streaming server (5005) + VWB (5002) actifs.
Lancement :
pytest tests/e2e -v -m e2e
Le test est un smoke check : il vérifie qu'on arrive à lancer un replay,
poller les actions et que le harness termine sans crash. Il n'exige PAS
que tous les steps réussissent (le screenshot fixture peut être obsolète).
"""
from __future__ import annotations
from pathlib import Path
import pytest
import requests
from tools.test_replay_e2e import (
ReplayMockClient,
_find_latest_heartbeat,
_load_token,
DEFAULT_BASE_URL,
DEFAULT_VWB_URL,
)
WORKFLOW_ID = "wf_a38aeebea5e6_1778162737" # Urgence_aiva_demo
def _server_alive(url: str, timeout: float = 2.0) -> bool:
    """Return True when ``GET {url}/health`` answers with HTTP 200."""
    try:
        status = requests.get(f"{url}/health", timeout=timeout).status_code
    except Exception:
        return False
    return status == 200
def _vwb_alive(url: str, timeout: float = 2.0) -> bool:
    """Return True when the VWB backend responds.

    VWB has no /health endpoint, so /api/v3/session/state is probed instead;
    a 404 still proves the server is up.
    """
    probe = f"{url}/api/v3/session/state"
    try:
        code = requests.get(probe, timeout=timeout).status_code
    except Exception:
        return False
    return code in (200, 404)
@pytest.fixture(scope="module")
def streaming_url() -> str:
if not _server_alive(DEFAULT_BASE_URL):
pytest.skip(f"Streaming server inactif sur {DEFAULT_BASE_URL}")
return DEFAULT_BASE_URL
@pytest.fixture(scope="module")
def vwb_url() -> str:
if not _vwb_alive(DEFAULT_VWB_URL):
pytest.skip(f"VWB backend inactif sur {DEFAULT_VWB_URL}")
return DEFAULT_VWB_URL
@pytest.fixture(scope="module")
def heartbeat() -> str:
path = _find_latest_heartbeat()
if not path or not Path(path).exists():
pytest.skip("Aucun heartbeat fixture disponible sur disque")
return path
@pytest.mark.e2e
@pytest.mark.slow
def test_urgence_aiva_demo_smoke(streaming_url, vwb_url, heartbeat):
    """Smoke test: launch and drive the Urgence_aiva_demo workflow via the harness.

    Checks that:
      - the harness can compile and start the replay (no network exception)
      - at least a few steps get reported (the chain actually runs)
      - no unhandled exception is raised
    It does NOT require every step to succeed (the fixture screenshot may be
    stale relative to the recorded workflow).
    """
    import time as _time
    import uuid as _uuid
    # Unique session/machine ids per run so stale replays cannot collide.
    ts = _time.strftime("%Y%m%dT%H%M%S")
    client = ReplayMockClient(
        base_url=streaming_url,
        vwb_url=vwb_url,
        token=_load_token(),
        session_id=f"test_e2e_pytest_{ts}_{_uuid.uuid4().hex[:6]}",
        machine_id=f"test_e2e_pytest_machine_{ts}",
        screenshot_path=heartbeat,
        verbose=False,
        auto_resume=True,
        execution_mode="autonomous",
        timeout_poll=10.0,
        single_step=None,
        max_iter=80,
    )
    try:
        client.cancel_stale_replays()
        client.register_session()
        info = client.start_replay(WORKFLOW_ID)
        assert info.get("replay_id"), f"replay_id absent : {info}"
        assert info.get("total_actions", 0) > 0
        client.run()
    finally:
        # Always cancel so no replay is left active server-side.
        try:
            client.cancel_replay()
        except Exception:
            pass
    # The harness must have produced at least a few reports.
    assert len(client.reports) > 0, "Aucune action reportée — harness cassé ?"
    # The first step is a synthetic wait injected by VWB → must be OK.
    first = client.reports[0]
    assert first.action_type == "wait", f"1er step inattendu : {first}"
    assert first.status == "OK"

View File

@@ -0,0 +1,81 @@
workflow_session_id: test_e2e_sess_20260507T220822_c91f30
screenshot: /home/dom/ai/rpa_vision_v3/data/training/live_sessions/bg_DESKTOP-58D5CAC_windows/shots/heartbeat_1773792436.png
steps:
- order: 1
action_id: wait_before_start
action_type: wait
by_text: ''
method: simulated
score: 0.0
x_pct: null
y_pct: null
status: OK
diag: wait simulé
elapsed_ms: 1.013040542602539
- order: 2
action_id: replay_free_74c2d90b
action_type: pause:user_request
by_text: ''
method: ''
score: 0.0
x_pct: null
y_pct: null
status: PAUSED
diag: 'Léa : j''ai trouvé ces dossiers : []. Pour la démo je vais traiter MOREL
Catherin'
elapsed_ms: 0.0
- order: 3
action_id: step_288d0bceea90_1778162737752
action_type: click
by_text: '25003284'
method: fallback
score: 0.0
x_pct: 0.5
y_pct: 0.5
status: FAIL
diag: template_matching_failed
elapsed_ms: 1064.7194385528564
- order: 4
action_id: step_288d0bceea90_1778162737752_retry1
action_type: click
by_text: '25003284'
method: fallback
score: 0.0
x_pct: 0.5
y_pct: 0.5
status: FAIL
diag: template_matching_failed
elapsed_ms: 1075.0248432159424
- order: 5
action_id: wait_retry_381c1b
action_type: wait
by_text: ''
method: simulated
score: 0.0
x_pct: null
y_pct: null
status: OK
diag: wait simulé
elapsed_ms: 12.79759407043457
- order: 6
action_id: step_288d0bceea90_1778162737752_retry2
action_type: click
by_text: '25003284'
method: fallback
score: 0.0
x_pct: 0.5
y_pct: 0.5
status: FAIL
diag: template_matching_failed
elapsed_ms: 1037.236213684082
- order: 7
action_id: step_288d0bceea90_1778162737752_retry3
action_type: click
by_text: '25003284'
method: fallback
score: 0.0
x_pct: 0.5
y_pct: 0.5
status: FAIL
diag: template_matching_failed
elapsed_ms: 1051.6366958618164

812
tools/test_replay_e2e.py Normal file
View File

@@ -0,0 +1,812 @@
#!/usr/bin/env python3
"""Harness E2E pour tester un replay sans Léa V1 / Windows.
Mocque le client Léa V1 contre le serveur de streaming réel (port 5005).
Le harness compile le workflow via VWB (port 5002, /api/v3/execute-windows)
exactement comme le frontend, puis prend la place de l'agent Windows :
- boucle GET /replay/next (poll)
- résout les actions click_anchor via POST /replay/resolve_target avec un
screenshot fixture (heartbeat sur disque)
- POST /replay/result avec succès/échec
- gère pause_for_human (auto-resume ou stop selon mode)
- imprime un tableau Markdown des résolutions et compare à un YAML d'attendus
Permet d'itérer en quelques secondes (vs 1-2 min de replay Windows réel) sur :
- modifications serveur (resolve_engine, replay_engine, validation OCR…)
- robustesse de la cascade visuelle sur un screenshot donné
- cas d'erreur (target_not_found, pause supervisée, retry).
Usage standard (workflow Urgence_aiva_demo, screenshot le plus récent) :
cd /home/dom/ai/rpa_vision_v3 && source .venv/bin/activate
python tools/test_replay_e2e.py \\
--workflow-id wf_a38aeebea5e6_1778162737 \\
--shot data/training/live_sessions/bg_DESKTOP-58D5CAC_windows/shots/heartbeat_1773792436.png
Options :
--workflow-id ID workflow à rejouer (default Urgence_aiva_demo)
--shot PATH screenshot fixture (default: dernier heartbeat trouvé)
--expected YAML fichier attendus (compare step par step)
--export-expected PATH exporter le run en YAML/JSON d'attendus
--auto-resume auto-acquitter pause_for_human
--execution-mode autonomous|supervised (par défaut: autonomous)
--single-step N (debug) ne lancer que les N premières actions
--verbose logs détaillés HTTP
--timeout-poll SECONDS timeout par poll (default 8s)
--max-iter N garde-fou (default 200)
--vwb-url URL URL VWB (default http://localhost:5002)
Sortie :
- tableau Markdown récapitulatif
- exit code 0 si tous les steps OK / 1 sinon
Ne dépend PAS de Windows, ne modifie aucun fichier serveur.
Pré-requis : streaming server (5005) + VWB backend (5002) actifs.
"""
from __future__ import annotations
import argparse
import base64
import glob
import json
import os
import sys
import time
import uuid
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import requests
# YAML est optionnel : si absent, on génère du JSON pour l'export d'attendus
try:
import yaml as _yaml
except ImportError:
_yaml = None
# ==========================================================================
# Configuration par défaut
# ==========================================================================
# Project root (tools/ sits one level below the repo root).
ROOT = Path(__file__).resolve().parent.parent
# Local env file that may hold RPA_API_TOKEN.
ENV_FILE = ROOT / ".env.local"
DEFAULT_BASE_URL = "http://localhost:5005"  # streaming server
DEFAULT_VWB_URL = "http://localhost:5002"  # VWB backend
# Glob patterns for heartbeat screenshots usable as fixtures.
DEFAULT_HEARTBEAT_GLOB = str(
    ROOT / "data" / "training" / "live_sessions" / "*" / "shots" / "heartbeat_*.png"
)
# bg_* sessions are background captures (full resolution) — preferred.
DEFAULT_HEARTBEAT_GLOB_BG = str(
    ROOT / "data" / "training" / "live_sessions" / "bg_*" / "shots" / "heartbeat_*.png"
)
def _load_token() -> str:
"""Lit RPA_API_TOKEN depuis l'env ou .env.local."""
if "RPA_API_TOKEN" in os.environ and os.environ["RPA_API_TOKEN"]:
return os.environ["RPA_API_TOKEN"]
if ENV_FILE.exists():
for line in ENV_FILE.read_text().splitlines():
line = line.strip()
if line.startswith("RPA_API_TOKEN="):
return line.split("=", 1)[1].strip().strip('"').strip("'")
return ""
def _find_latest_heartbeat() -> Optional[str]:
    """Locate the most recent on-disk heartbeat usable as a fixture.

    bg_* heartbeats (background captures, full resolution) are preferred over
    sess_* ones, which may be truncated (mss.monitors[1] bug capturing the
    taskbar, see resolve_engine.py).  Images smaller than 1200x800 are
    ignored to filter out crops.
    """
    from PIL import Image

    def _is_full_size(path: str) -> bool:
        try:
            with Image.open(path) as im:
                return im.width >= 1200 and im.height >= 800
        except Exception:
            return False

    # Probe bg_* first, then fall back to the generic pattern.
    for pattern in (DEFAULT_HEARTBEAT_GLOB_BG, DEFAULT_HEARTBEAT_GLOB):
        candidates = [
            f
            for f in glob.glob(pattern)
            if "_blurred" not in f and os.path.isfile(f) and _is_full_size(f)
        ]
        if candidates:
            # Newest by modification time.
            return max(candidates, key=os.path.getmtime)
    return None
# ==========================================================================
# Modèles légers (pas d'import Pydantic pour rester rapide à charger)
# ==========================================================================
@dataclass
class StepReport:
    """One row of the final Markdown report: outcome of a single replay step."""

    order: int  # 1-based position in the run
    action_id: str  # server-side action identifier
    action_type: str  # click / wait / type_text / pause:* / (server) ...
    by_text: str  # target text (truncated), if any
    method: str = ""  # resolution method reported by the server
    score: float = 0.0  # resolution confidence score
    x_pct: Optional[float] = None  # resolved X position (fraction of width)
    y_pct: Optional[float] = None  # resolved Y position (fraction of height)
    status: str = "?"  # OK / FAIL / SKIP / PAUSED
    diag: str = ""  # short diagnostic message
    elapsed_ms: float = 0.0  # client-side wall time for the step
# ==========================================================================
# Client mock
# ==========================================================================
class ReplayMockClient:
"""Simule l'Agent V1 contre le serveur de streaming."""
    def __init__(
        self,
        base_url: str,
        vwb_url: str,
        token: str,
        session_id: str,
        machine_id: str,
        screenshot_path: str,
        verbose: bool = False,
        auto_resume: bool = True,
        execution_mode: str = "autonomous",
        timeout_poll: float = 8.0,
        single_step: Optional[int] = None,
        max_iter: int = 200,
    ) -> None:
        """Build the mock client.

        Args:
            base_url: streaming-server base URL (e.g. http://localhost:5005).
            vwb_url: VWB backend base URL (e.g. http://localhost:5002).
            token: bearer token for the streaming API ('' → no auth header).
            session_id: session identifier registered with the server.
            machine_id: machine identifier used when polling actions.
            screenshot_path: fixture screenshot sent to resolve_target.
            verbose: log every HTTP exchange.
            auto_resume: automatically acknowledge pause_for_human pauses.
            execution_mode: 'autonomous' or 'supervised'.
            timeout_poll: per-poll GET timeout, in seconds.
            single_step: stop after this many dispatched actions (debug).
            max_iter: hard cap on poll-loop iterations.
        """
        self.base_url = base_url.rstrip("/")
        self.vwb_url = vwb_url.rstrip("/")
        self.token = token
        self.session_id = session_id
        self.machine_id = machine_id
        self.screenshot_path = screenshot_path
        self.verbose = verbose
        self.auto_resume = auto_resume
        self.execution_mode = execution_mode
        self.timeout_poll = timeout_poll
        self.single_step = single_step
        self.max_iter = max_iter
        self._session = requests.Session()
        if token:
            self._session.headers.update({"Authorization": f"Bearer {token}"})
        # Cached base64-encoded screenshot (large — encoded once).
        self._screenshot_b64: Optional[str] = None
        self._screen_w: int = 1920
        self._screen_h: int = 1080
        self._load_screenshot()
        self.replay_id: Optional[str] = None
        self.reports: List[StepReport] = []
        self._action_counter = 0
        self._resumes_done = 0
# ---- helpers ------------------------------------------------------
def _load_screenshot(self) -> None:
from PIL import Image # imported lazily
with open(self.screenshot_path, "rb") as f:
data = f.read()
self._screenshot_b64 = base64.b64encode(data).decode("ascii")
with Image.open(self.screenshot_path) as img:
self._screen_w, self._screen_h = img.size
def _log(self, msg: str) -> None:
if self.verbose:
ts = time.strftime("%H:%M:%S")
print(f"[{ts}] {msg}", flush=True)
    def _post(self, path: str, json_body: Dict[str, Any]) -> Dict[str, Any]:
        """POST *json_body* to a streaming-server endpoint.

        Raises on HTTP error.  Returns the decoded JSON body, or {} when the
        response body is empty.
        """
        url = f"{self.base_url}{path}"
        if self.verbose:
            # Truncate the logged body — payloads can embed a base64 screenshot.
            self._log(f"POST {path} body={json.dumps(json_body)[:200]}")
        resp = self._session.post(url, json=json_body, timeout=60)
        if self.verbose:
            self._log(f"{resp.status_code} {resp.text[:300]}")
        resp.raise_for_status()
        return resp.json() if resp.text else {}
def _get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
url = f"{self.base_url}{path}"
resp = self._session.get(url, params=params, timeout=self.timeout_poll)
resp.raise_for_status()
return resp.json() if resp.text else {}
# ---- lifecycle ----------------------------------------------------
    def cancel_stale_replays(self) -> None:
        """Cancel running/paused replays for this machine, to avoid collisions.

        Best effort: both the listing and each cancel are allowed to fail
        (logged, never raised).
        """
        try:
            data = self._get("/api/v1/traces/stream/replays")
        except Exception as e:
            self._log(f"cancel_stale: get replays échoué : {e}")
            return
        for r in data.get("replays", []):
            # Only replays claimed by this machine, in an active state.
            if r.get("machine_id") == self.machine_id and r.get("status") in (
                "running", "paused_need_help",
            ):
                rid = r.get("replay_id")
                self._log(f"cancel stale replay {rid} (status={r.get('status')})")
                try:
                    self._post(f"/api/v1/traces/stream/replay/{rid}/cancel", {})
                except Exception as e:
                    self._log(f"cancel {rid} échoué : {e}")
    def register_session(self) -> None:
        """Register the test session server-side.  Raises on HTTP error.

        The /register endpoint takes session_id/machine_id as query
        parameters, not as a JSON body.
        """
        url = f"{self.base_url}/api/v1/traces/stream/register"
        resp = self._session.post(
            url,
            params={"session_id": self.session_id, "machine_id": self.machine_id},
            timeout=10,
        )
        resp.raise_for_status()
        self._log(f"session registered : {self.session_id} (machine={self.machine_id})")
    def start_replay(self, workflow_id: str) -> Dict[str, Any]:
        """Start a replay through the real chain VWB → /replay/raw.

        Reproduces what the frontend (ExecutionControls.tsx) does:
          1. GET /api/v3/workflow/{id} to fetch the steps
          2. POST /api/v3/execute-windows with actions[] + session_id/machine_id
             (VWB loads the anchors, maps the types, and POSTs to /replay/raw)

        Returns the VWB response (contains replay_id, total_actions, ...) and
        stores replay_id on self.

        Raises:
            RuntimeError: when the workflow has no steps or VWB rejects the call.
        """
        # 1. Fetch the workflow and its steps from VWB.
        wf_url = f"{self.vwb_url}/api/v3/workflow/{workflow_id}"
        resp = self._session.get(wf_url, timeout=15)
        resp.raise_for_status()
        wf_data = resp.json()
        # Steps may be top-level or nested under "workflow".
        steps = (
            wf_data.get("steps")
            or wf_data.get("workflow", {}).get("steps")
            or []
        )
        if not steps:
            raise RuntimeError(
                f"Workflow {workflow_id} : aucune étape récupérée depuis VWB "
                f"({wf_url})"
            )
        self._log(f"workflow {workflow_id} : {len(steps)} steps récupérées")
        # 2. Build the payload exactly like the frontend.
        actions = []
        for i, step in enumerate(steps):
            anchor = step.get("anchor") or {}
            actions.append({
                "action_id": step.get("id") or f"action_{i}",
                "type": step.get("action_type"),
                "parameters": step.get("parameters") or {},
                "anchor_id": anchor.get("id") if anchor else step.get("anchor_id"),
                "order": i,
            })
        # 3. POST /api/v3/execute-windows (VWB compiles + forwards to streaming).
        execute_url = f"{self.vwb_url}/api/v3/execute-windows"
        body = {
            "workflow_id": workflow_id,
            "session_id": self.session_id,
            "machine_id": self.machine_id,
            "actions": actions,
            "params": {"execution_mode": self.execution_mode},
        }
        if self.verbose:
            self._log(f"POST {execute_url} actions={len(actions)}")
        resp = self._session.post(execute_url, json=body, timeout=60)
        if resp.status_code != 200:
            raise RuntimeError(
                f"VWB execute-windows {resp.status_code} : {resp.text[:300]}"
            )
        data = resp.json()
        self.replay_id = data.get("replay_id")
        return data
def get_replay_status(self) -> Dict[str, Any]:
if not self.replay_id:
return {}
try:
return self._get(f"/api/v1/traces/stream/replay/{self.replay_id}")
except Exception:
return {}
def cancel_replay(self) -> None:
if not self.replay_id:
return
try:
self._post(f"/api/v1/traces/stream/replay/{self.replay_id}/cancel", {})
except Exception as e:
self._log(f"cancel replay échoué : {e}")
    def resume_replay(self) -> None:
        """Acknowledge required safety checks and resume a paused replay.

        Autonomous mode usually bypasses pauses, but supervised mode can
        block.  Best effort: failures are logged, never raised.
        """
        if not self.replay_id:
            return
        # Collect the safety checks that must be acknowledged.
        ack: List[str] = []
        try:
            state = self.get_replay_status()
            for c in state.get("safety_checks") or []:
                if c.get("required"):
                    ack.append(c.get("id"))
        except Exception:
            pass
        body: Dict[str, Any] = {"acknowledged_check_ids": ack}
        try:
            self._post(f"/api/v1/traces/stream/replay/{self.replay_id}/resume", body)
            self._resumes_done += 1
            self._log(f"resume OK (checks ack={ack})")
        except Exception as e:
            self._log(f"resume échoué : {e}")
# ---- dispatch d'actions ------------------------------------------
    def resolve_target(self, target_spec: Dict[str, Any], strict: bool) -> Dict[str, Any]:
        """Call server-side /replay/resolve_target with the fixture screenshot.

        Returns the raw server response (method, score, x_pct/y_pct,
        resolved, ...).  Fallback position is the screen centre (0.5, 0.5).
        """
        body = {
            "session_id": self.session_id,
            "screenshot_b64": self._screenshot_b64 or "",
            "target_spec": target_spec or {},
            "fallback_x_pct": 0.5,
            "fallback_y_pct": 0.5,
            "screen_width": self._screen_w,
            "screen_height": self._screen_h,
            "strict_mode": strict,
        }
        return self._post("/api/v1/traces/stream/replay/resolve_target", body)
    def dispatch(self, action: Dict[str, Any]) -> StepReport:
        """Simulate client-side execution of one action and POST its result.

        Visual actions go through server-side resolution against the fixture
        screenshot; keyboard/wait actions are reported as simulated successes.
        Appends a StepReport to self.reports and returns it.
        """
        self._action_counter += 1
        action_id = action.get("action_id", f"unk_{self._action_counter}")
        action_type = action.get("type", "?")
        target_spec = action.get("target_spec") or {}
        by_text = (target_spec.get("by_text") or "")[:40]
        report = StepReport(
            order=self._action_counter,
            action_id=action_id,
            action_type=action_type,
            by_text=by_text,
        )
        t0 = time.time()
        # ── Visual action: resolve_target, then report success/failure ──
        if action_type in ("click", "click_anchor", "double_click"):
            try:
                res = self.resolve_target(target_spec, strict=bool(action.get("success_strict")))
                report.method = res.get("method", "?")
                report.score = float(res.get("score") or 0.0)
                report.x_pct = res.get("x_pct")
                report.y_pct = res.get("y_pct")
                resolved = bool(res.get("resolved"))
                if not resolved:
                    report.status = "FAIL"
                    report.diag = res.get("reason", res.get("method", ""))[:80]
                    self._post_result(
                        action_id,
                        success=False,
                        error=f"resolve_failed:{report.method}",
                        actual_position=None,
                        resolution_method=report.method,
                        resolution_score=report.score,
                        resolution_elapsed_ms=res.get("elapsed_ms"),
                        target_spec=target_spec,
                        target_description=by_text,
                    )
                else:
                    report.status = "OK"
                    self._post_result(
                        action_id,
                        success=True,
                        actual_position={"x_pct": report.x_pct, "y_pct": report.y_pct},
                        resolution_method=report.method,
                        resolution_score=report.score,
                        resolution_elapsed_ms=res.get("elapsed_ms"),
                    )
            except Exception as e:
                report.status = "FAIL"
                report.diag = f"client_error:{e}"[:80]
                self._post_result(action_id, success=False, error=str(e)[:200])
        # ── Typing / keyboard shortcut / wait: simulate success ──
        elif action_type in ("type_text", "type", "keyboard_shortcut", "key_combo", "wait"):
            report.status = "OK"
            report.method = "simulated"
            report.diag = f"{action_type} simulé"
            self._post_result(action_id, success=True)
        # ── Server-side actions (extract_text/table, t2a_decision):
        #    they should NOT reach the client (the server runs them internally
        #    in /replay/next).  Marked SKIP for traceability.
        elif action_type in ("extract_text", "extract_table", "t2a_decision"):
            report.status = "SKIP"
            report.method = "server_side"
            report.diag = "(action serveur, exécutée en interne)"
        else:
            # Unknown action type: report a simulated success so the replay advances.
            report.status = "OK"
            report.method = "noop"
            report.diag = f"action {action_type} non gérée → success simulé"
            self._post_result(action_id, success=True)
        report.elapsed_ms = (time.time() - t0) * 1000
        self.reports.append(report)
        return report
    def _post_result(
        self,
        action_id: str,
        success: bool,
        error: Optional[str] = None,
        warning: Optional[str] = None,
        actual_position: Optional[Dict[str, float]] = None,
        resolution_method: Optional[str] = None,
        resolution_score: Optional[float] = None,
        resolution_elapsed_ms: Optional[float] = None,
        target_spec: Optional[Dict[str, Any]] = None,
        target_description: Optional[str] = None,
    ) -> None:
        """POST one action result to /replay/result (best effort, never raises).

        Only non-empty optional fields are included; score/elapsed are coerced
        to float.
        """
        body: Dict[str, Any] = {
            "session_id": self.session_id,
            "action_id": action_id,
            "success": success,
        }
        if error:
            body["error"] = error
        if warning:
            body["warning"] = warning
        if actual_position:
            body["actual_position"] = actual_position
        if resolution_method:
            body["resolution_method"] = resolution_method
        if resolution_score is not None:
            body["resolution_score"] = float(resolution_score)
        if resolution_elapsed_ms is not None:
            body["resolution_elapsed_ms"] = float(resolution_elapsed_ms)
        # To keep the verifier from spinning up a Critic VLM (slow), we do NOT
        # send screenshot_before/after — the action is marked unverified but
        # still advances.
        if target_spec:
            body["target_spec"] = target_spec
        if target_description:
            body["target_description"] = target_description
        try:
            self._post("/api/v1/traces/stream/replay/result", body)
        except Exception as e:
            self._log(f"POST result échoué (action {action_id}) : {e}")
# ---- main loop ----------------------------------------------------
    def run(self) -> None:
        """Main poll loop: GET /replay/next until the replay ends.

        Handles three poll outcomes:
          - replay_paused → log, record a PAUSED report once, then auto-resume
            (bounded for failure pauses) or stop;
          - no action → either the replay is done, the server is busy, or we
            accumulate empty polls until a give-up threshold;
          - an action → dispatch it (resolve + report result).
        Ends with a reconciliation pass that pulls server-side results the
        client never saw.
        """
        iter_count = 0
        last_paused_logged = ""
        empty_polls = 0
        while iter_count < self.max_iter:
            iter_count += 1
            try:
                resp = self._get(
                    "/api/v1/traces/stream/replay/next",
                    params={
                        "session_id": self.session_id,
                        "machine_id": self.machine_id,
                    },
                )
            except requests.exceptions.RequestException as e:
                self._log(f"poll {iter_count} : erreur réseau {e}, retry dans 1s")
                time.sleep(1)
                continue
            # Supervised pause (paused_need_help)?
            if resp.get("replay_paused"):
                msg = (resp.get("pause_message") or "")[:120]
                # Distinguish a voluntary pause (user_request, safety_checks)
                # from a failure pause (target_not_found, wrong_window,
                # system_dialog).  For failure pauses, auto-resume re-runs the
                # same action which will fail again — so we only resume a
                # bounded number of times to avoid looping forever.
                state = self.get_replay_status()
                failed = state.get("failed_action") or {}
                pause_reason = failed.get("reason") or ""
                is_failure_pause = pause_reason in (
                    "target_not_found", "wrong_window", "system_dialog",
                )
                if msg != last_paused_logged:
                    self._log(f"PAUSE ({pause_reason or 'user'}) : {msg}")
                    last_paused_logged = msg
                # Record the pause as a PAUSED report (only once per pause).
                if not self.reports or self.reports[-1].status != "PAUSED":
                    self._action_counter += 1
                    self.reports.append(
                        StepReport(
                            order=self._action_counter,
                            action_id=resp.get("replay_id", "?"),
                            action_type=f"pause:{pause_reason or 'user'}",
                            by_text=(failed.get("target_description") or "")[:32],
                            status="PAUSED",
                            diag=msg[:80],
                        )
                    )
                if not self.auto_resume:
                    self._log("--auto-resume désactivé : on stoppe.")
                    break
                if is_failure_pause and self._resumes_done > 5:
                    self._log(
                        f"Trop de resumes ({self._resumes_done}) sur des "
                        f"pauses d'échec — stop pour éviter la boucle."
                    )
                    break
                time.sleep(0.5)
                self.resume_replay()
                last_paused_logged = ""
                continue
            action = resp.get("action")
            if action is None:
                # No pending action: maybe finished, maybe server_busy.
                if resp.get("server_busy"):
                    time.sleep(0.5)
                    continue
                state = self.get_replay_status()
                status = state.get("status", "?")
                if status in ("completed", "cancelled", "error", "failed"):
                    self._log(f"replay terminé status={status}")
                    break
                empty_polls += 1
                if empty_polls > 30:  # 30 empty polls ≈ 15 s (0.5 s sleep each): give up
                    self._log("Trop de polls vides, on stoppe.")
                    break
                time.sleep(0.5)
                continue
            empty_polls = 0
            self.dispatch(action)
            if self.single_step is not None and self._action_counter >= self.single_step:
                self._log(f"--single-step {self.single_step} atteint, stop.")
                break
        if iter_count >= self.max_iter:
            self._log(f"WARN : max_iter ({self.max_iter}) atteint.")
        # Reconciliation: fetch the actions executed server-side
        # (extract_text, extract_table, t2a_decision) that never went through
        # /replay/next on the client.
        try:
            state = self.get_replay_status()
            seen_ids = {r.action_id for r in self.reports}
            for res in state.get("results") or []:
                aid = res.get("action_id")
                if aid in seen_ids:
                    continue
                # Heuristic: unseen results are server-side actions.
                ok = bool(res.get("success"))
                self._action_counter += 1
                self.reports.append(StepReport(
                    order=self._action_counter,
                    action_id=aid or "?",
                    action_type="(server)",
                    by_text="",
                    method="server_side",
                    status="OK" if ok else "FAIL",
                    diag=(res.get("error") or "")[:60],
                ))
        except Exception as e:
            self._log(f"reconciliation skipped : {e}")
# ---- rapport ------------------------------------------------------
def render_report(self) -> str:
out: List[str] = []
out.append("")
out.append("| # | Type | by_text | Méthode | Score | Pos résolue | Status | Diag |")
out.append("|----|------------------|----------------------------------|----------------------|-------|----------------------|---------|------|")
for r in self.reports:
pos = (
f"({r.x_pct:.4f}, {r.y_pct:.4f})"
if r.x_pct is not None and r.y_pct is not None
else "-"
)
score = f"{r.score:.2f}" if r.method else "-"
out.append(
f"| {r.order:<2} | {r.action_type:<16} | {r.by_text[:32]:<32} | "
f"{r.method[:20]:<20} | {score:<5} | {pos:<20} | {r.status:<7} | {r.diag[:60]} |"
)
out.append("")
return "\n".join(out)
    def export_expected(self, path: Path) -> None:
        """Serialise the current run's resolutions as a reference expectations file.

        Writes YAML when the suffix is .yaml/.yml and PyYAML is installed,
        otherwise falls back to JSON.
        """
        data = {
            "workflow_session_id": self.session_id,
            "screenshot": str(self.screenshot_path),
            "steps": [asdict(r) for r in self.reports],
        }
        if path.suffix in (".yaml", ".yml") and _yaml is not None:
            path.write_text(_yaml.safe_dump(data, sort_keys=False, allow_unicode=True))
        else:
            # JSON fallback when PyYAML is unavailable or suffix is not YAML.
            path.write_text(json.dumps(data, indent=2, ensure_ascii=False))
        self._log(f"Attendus exportés vers {path}")
def compare_to_expected(self, expected_path: Path) -> Tuple[int, int]:
"""Compare reports vs attendus. Retourne (matching, total)."""
if not expected_path.exists():
print(f"[expected] fichier introuvable : {expected_path}")
return (0, len(self.reports))
if expected_path.suffix in (".yaml", ".yml") and _yaml is not None:
expected = _yaml.safe_load(expected_path.read_text())
else:
expected = json.loads(expected_path.read_text())
steps = expected.get("steps") or []
ok = 0
for actual, exp in zip(self.reports, steps):
same_method = (actual.method == exp.get("method", "")) or (
actual.method.startswith("hybrid_") and exp.get("method", "").startswith("hybrid_")
)
same_status = actual.status == exp.get("status", "")
if same_method and same_status:
ok += 1
return (ok, len(steps) if steps else len(self.reports))
# ==========================================================================
# CLI
# ==========================================================================
def main(argv: Optional[List[str]] = None) -> int:
    """CLI entry point for the E2E harness.

    Exit codes: 0 all steps OK, 1 failures or expected-file mismatch,
    2 screenshot fixture missing, 3 streaming server unreachable.
    """
    parser = argparse.ArgumentParser(
        description="Harness E2E pour rejouer un workflow contre le serveur sans Léa V1."
    )
    parser.add_argument("--workflow-id", default="wf_a38aeebea5e6_1778162737",
                        help="ID du workflow (default: Urgence_aiva_demo)")
    parser.add_argument("--shot", default=None,
                        help="Path screenshot fixture (default: dernier heartbeat)")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL,
                        help="URL streaming server (default 5005)")
    parser.add_argument("--vwb-url", default=DEFAULT_VWB_URL,
                        help="URL VWB backend (default 5002)")
    parser.add_argument("--token", default=None,
                        help="RPA_API_TOKEN (default: lit .env.local)")
    parser.add_argument("--session-id", default=None,
                        help="(default: test_e2e_<ts>)")
    parser.add_argument("--machine-id", default=None,
                        help="(default: test_e2e_machine_<ts>)")
    parser.add_argument("--auto-resume", action="store_true",
                        help="auto-acquitter pause_for_human")
    parser.add_argument("--no-auto-resume", action="store_true",
                        help="stop dès qu'une pause est rencontrée")
    parser.add_argument("--execution-mode", choices=("autonomous", "supervised"),
                        default="autonomous")
    parser.add_argument("--single-step", type=int, default=None)
    parser.add_argument("--verbose", action="store_true")
    parser.add_argument("--timeout-poll", type=float, default=8.0)
    parser.add_argument("--max-iter", type=int, default=200)
    parser.add_argument("--export-expected", type=Path, default=None,
                        help="Exporter le run en YAML/JSON d'attendus")
    parser.add_argument("--expected", type=Path, default=None,
                        help="Comparer le run à ce YAML/JSON d'attendus")
    args = parser.parse_args(argv)
    token = args.token or _load_token()
    if not token:
        print("WARN : pas de RPA_API_TOKEN trouvé.", file=sys.stderr)
    shot = args.shot or _find_latest_heartbeat()
    if not shot or not os.path.isfile(shot):
        print(f"ERREUR : screenshot introuvable ({shot})", file=sys.stderr)
        return 2
    ts = time.strftime("%Y%m%dT%H%M%S")
    session_id = args.session_id or f"test_e2e_sess_{ts}_{uuid.uuid4().hex[:6]}"
    machine_id = args.machine_id or f"test_e2e_machine_{ts}"
    # auto_resume defaults to True; --no-auto-resume disables it, but an
    # explicit --auto-resume takes precedence over --no-auto-resume.
    auto_resume = True
    if args.no_auto_resume:
        auto_resume = False
    if args.auto_resume:
        auto_resume = True
    print(f"[e2e] base_url={args.base_url}")
    print(f"[e2e] workflow_id={args.workflow_id}")
    print(f"[e2e] shot={shot}")
    print(f"[e2e] session_id={session_id}")
    print(f"[e2e] machine_id={machine_id}")
    print(f"[e2e] mode={args.execution_mode} auto_resume={auto_resume}")
    client = ReplayMockClient(
        base_url=args.base_url,
        vwb_url=args.vwb_url,
        token=token,
        session_id=session_id,
        machine_id=machine_id,
        screenshot_path=shot,
        verbose=args.verbose,
        auto_resume=auto_resume,
        execution_mode=args.execution_mode,
        timeout_poll=args.timeout_poll,
        single_step=args.single_step,
        max_iter=args.max_iter,
    )
    # Healthcheck before doing anything with the server.
    try:
        h = requests.get(f"{args.base_url}/health", timeout=3).json()
        if h.get("status") != "healthy":
            print(f"WARN : serveur health={h}")
    except Exception as e:
        print(f"ERREUR : serveur injoignable sur {args.base_url} ({e})", file=sys.stderr)
        return 3
    client.cancel_stale_replays()
    client.register_session()
    t_start = time.time()
    final_state: Dict[str, Any] = {}
    try:
        info = client.start_replay(args.workflow_id)
        print(f"[e2e] replay_id={info.get('replay_id')} total_actions={info.get('total_actions')}")
        client.run()
        # Snapshot the state BEFORE cancelling (otherwise we always see "cancelled").
        try:
            final_state = client.get_replay_status()
        except Exception:
            final_state = {}
    finally:
        # Always cancel on exit so no replay is left active server-side.
        try:
            client.cancel_replay()
        except Exception:
            pass
    elapsed = time.time() - t_start
    print(client.render_report())
    n_total = len(client.reports)
    n_ok = sum(1 for r in client.reports if r.status == "OK")
    n_skip = sum(1 for r in client.reports if r.status == "SKIP")
    n_paused = sum(1 for r in client.reports if r.status == "PAUSED")
    n_fail = sum(1 for r in client.reports if r.status == "FAIL")
    print(
        f"[e2e] {n_total} steps en {elapsed:.1f}s : "
        f"OK={n_ok} SKIP={n_skip} PAUSED={n_paused} FAIL={n_fail} "
        f"(resumes auto={client._resumes_done})"
    )
    if final_state:
        print(
            f"[e2e] final replay status={final_state.get('status')} "
            f"completed={final_state.get('completed_actions')}/"
            f"{final_state.get('total_actions')} "
            f"failed={final_state.get('failed_actions')} "
            f"retried={final_state.get('retried_actions')}"
        )
        # Show the tail of the server-side error log for quick diagnosis.
        for err in (final_state.get("error_log") or [])[-3:]:
            print(f"    ERR action_id={err.get('action_id')} "
                  f"error='{err.get('error')}' retry={err.get('retry_count')}")
    if args.export_expected:
        client.export_expected(args.export_expected)
    if args.expected:
        ok, total = client.compare_to_expected(args.expected)
        print(f"[e2e] comparaison attendus : {ok}/{total} steps matchent")
        if ok < total:
            return 1
    return 1 if n_fail else 0


if __name__ == "__main__":
    sys.exit(main())