rpa_vision_v3/tools/test_replay_e2e.py
Dom 35fd6cf4c5
Some checks failed
tests / Lint (ruff + black) (push) Successful in 14s
tests / Tests unitaires (sans GPU) (push) Failing after 13s
tests / Tests sécurité (critique) (push) Has been skipped
test(e2e): reproducible replay harness: mock Léa V1 client against the real server
Cuts the debug cycle of a workflow from 1-2 min (manual replay via
Windows + Léa V1 + mock-up app) to ~2-5 s (mock Linux client against the
streaming server on localhost:5005), roughly 12 to 60× faster.

Architecture:
- tools/test_replay_e2e.py: CLI harness (~580 lines) that reproduces the
  real chain: VWB /api/v3/execute-windows → streaming /replay/raw
  → /replay/next loop on the harness side, with resolve_target run on a
  fixture screenshot → POST /replay/result. No server changes.
- tests/e2e/test_urgence_aiva_demo.py: pytest wrapper (smoke test).
- tests/e2e/urgence_aiva_demo_expected.yaml: reference generated with
  --export-expected, for automatic regression comparison.
- pytest.ini: adds the e2e marker.
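On the harness side, the chain above reduces to a poll / dispatch / report loop. The sketch below shows that loop without any network access. `FakeReplayServer` is a hypothetical in-memory stand-in for the real `/replay/next` and `/replay/result` endpoints, and its methods and payload fields are illustrative assumptions, not the streaming server's actual API.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List, Optional


class FakeReplayServer:
    """Hypothetical in-memory stand-in for the streaming server's replay queue."""

    def __init__(self, actions: List[Dict[str, Any]]) -> None:
        self._queue: Deque[Dict[str, Any]] = deque(actions)
        self.results: List[Dict[str, Any]] = []

    def next_action(self) -> Optional[Dict[str, Any]]:
        # Stands in for GET /replay/next: None means "nothing pending"
        return self._queue.popleft() if self._queue else None

    def post_result(self, action_id: str, success: bool) -> None:
        # Stands in for POST /replay/result
        self.results.append({"action_id": action_id, "success": success})


def run_loop(
    server: FakeReplayServer,
    resolve: Callable[[Dict[str, Any]], bool],
    max_iter: int = 200,
) -> List[str]:
    """Poll, dispatch and report until nothing is pending or max_iter is hit."""
    statuses: List[str] = []
    for _ in range(max_iter):
        action = server.next_action()
        if action is None:
            break  # the real harness also checks the replay status before stopping
        if action["type"] in ("click", "click_anchor", "double_click"):
            # Visual actions go through target resolution on the fixture screenshot
            ok = resolve(action.get("target_spec") or {})
        else:
            ok = True  # typing, shortcuts and waits are simulated as successes
        server.post_result(action["action_id"], ok)
        statuses.append("OK" if ok else "FAIL")
    return statuses


if __name__ == "__main__":
    srv = FakeReplayServer([
        {"action_id": "a1", "type": "click_anchor", "target_spec": {"by_text": "Submit"}},
        {"action_id": "a2", "type": "type_text"},
        {"action_id": "a3", "type": "click_anchor", "target_spec": {"by_text": "Missing"}},
    ])
    # Hypothetical resolver: only "Submit" is visible on the fixture screenshot
    print(run_loop(srv, lambda spec: spec.get("by_text") == "Submit"))  # ['OK', 'OK', 'FAIL']
```

The `max_iter` cap mirrors the harness's own safety limit. Without it, a server that keeps handing out actions would keep the loop running indefinitely.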

Usage:
    python tools/test_replay_e2e.py --execution-mode autonomous --max-iter 120 --verbose
    python tools/test_replay_e2e.py --single-step 8 --shot <heartbeat>.png
    python tools/test_replay_e2e.py --expected tests/e2e/urgence_aiva_demo_expected.yaml
    pytest tests/e2e -v -m e2e
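The `--expected` comparison is deliberately loose. In the harness's `compare_to_expected`, a step matches when its status is identical and its resolution method is either identical or, for the hybrid cascade, both start with `hybrid_`. Here is a standalone sketch of that rule. It uses plain dicts instead of the harness's `StepReport` dataclass so that it runs on its own, and the method names in the test data are hypothetical.

```python
from typing import Any, Dict, List, Tuple


def step_matches(actual: Dict[str, Any], expected: Dict[str, Any]) -> bool:
    """Same status, and same method (any two hybrid_* methods count as equal)."""
    a_method = actual.get("method", "")
    e_method = expected.get("method", "")
    same_method = a_method == e_method or (
        a_method.startswith("hybrid_") and e_method.startswith("hybrid_")
    )
    return same_method and actual.get("status", "") == expected.get("status", "")


def compare(
    actual: List[Dict[str, Any]], expected: List[Dict[str, Any]]
) -> Tuple[int, int]:
    """Return (matching, total); steps are paired by position, as zip() does."""
    ok = sum(step_matches(a, e) for a, e in zip(actual, expected))
    return ok, (len(expected) if expected else len(actual))


if __name__ == "__main__":
    run = [{"method": "hybrid_ocr", "status": "OK"}, {"method": "simulated", "status": "OK"}]
    ref = [{"method": "hybrid_clip", "status": "OK"}, {"method": "simulated", "status": "FAIL"}]
    print(compare(run, ref))  # (1, 2)
```

Steps are paired by position. An extra PAUSED row in the run therefore shifts every comparison after it.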

Output: Markdown table step × method × score × pos × status × diag.
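Each row of that table is one fixed-width Markdown line. The reduced sketch below keeps only the columns listed above and uses a hypothetical `Row` dataclass as a subset of the harness's `StepReport`. The column widths follow those in `render_report`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Row:
    """Hypothetical subset of the harness's StepReport."""
    order: int
    method: str
    score: float
    x_pct: Optional[float]
    y_pct: Optional[float]
    status: str
    diag: str = ""


def render_row(r: Row) -> str:
    # Position is shown only when both coordinates were resolved
    if r.x_pct is not None and r.y_pct is not None:
        pos = f"({r.x_pct:.4f}, {r.y_pct:.4f})"
    else:
        pos = "-"
    # An empty method (e.g. a pause row) gets "-" instead of a score
    score = f"{r.score:.2f}" if r.method else "-"
    return (
        f"| {r.order:<2} | {r.method[:20]:<20} | {score:<5} | "
        f"{pos:<20} | {r.status:<7} | {r.diag[:60]} |"
    )


if __name__ == "__main__":
    print(render_row(Row(3, "hybrid_ocr", 0.91, 0.5, 0.25, "OK")))
    print(render_row(Row(4, "", 0.0, None, None, "PAUSED", "user pause")))
```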

Known limitations (to extend after the demo):
- A single fixture screenshot for the whole replay, so realistic
  click_anchor steps fail as soon as the flow moves past the fixture
  screen. A step_id → fixture map is planned.
- extract_text/table/t2a_decision run server-side: observable but not
  modifiable.
- No screenshot_after simulation, so ReplayVerifier (Critic VLM) does not run.
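The planned step_id → fixture map does not exist yet. The sketch below shows one possible shape for it, and every detail is an assumption: a dict from step id to screenshot path, with a fallback to the single default fixture the harness uses today. `FixtureMap` and `for_step` are hypothetical names.

```python
from pathlib import Path
from typing import Dict, Optional


class FixtureMap:
    """Hypothetical step_id -> screenshot lookup with a default fallback."""

    def __init__(self, default: str, per_step: Optional[Dict[str, str]] = None) -> None:
        self.default = Path(default)
        # Normalise every configured path once, at construction time
        self.per_step = {k: Path(v) for k, v in (per_step or {}).items()}

    def for_step(self, step_id: str) -> Path:
        # Unknown steps keep today's behaviour: the single shared fixture
        return self.per_step.get(step_id, self.default)


if __name__ == "__main__":
    fm = FixtureMap("shots/home.png", {"step_8": "shots/patient_form.png"})
    print(fm.for_step("step_8"), fm.for_step("step_1"))
```

The harness currently caches the base64-encoded screenshot once in `_load_screenshot`. A per-step fixture would therefore also need that cache keyed by path.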

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 22:11:07 +02:00


#!/usr/bin/env python3
"""E2E harness for testing a replay without Léa V1 / Windows.

Mocks the Léa V1 client against the real streaming server (port 5005).
The harness compiles the workflow through VWB (port 5002, /api/v3/execute-windows)
exactly like the frontend does, then takes the place of the Windows agent:
- polls GET /replay/next
- resolves click_anchor actions via POST /replay/resolve_target with a
  fixture screenshot (a heartbeat on disk)
- POSTs /replay/result with success/failure
- handles pause_for_human (auto-resume or stop, depending on the flags)
- prints a Markdown table of the resolutions and compares it to a YAML
  file of expected results

Lets you iterate in a few seconds (vs 1-2 min for a real Windows replay) on:
- server changes (resolve_engine, replay_engine, OCR validation, ...)
- robustness of the visual cascade on a given screenshot
- error cases (target_not_found, supervised pause, retry).

Standard usage (Urgence_aiva_demo workflow, most recent screenshot):
    cd /home/dom/ai/rpa_vision_v3 && source .venv/bin/activate
    python tools/test_replay_e2e.py \\
        --workflow-id wf_a38aeebea5e6_1778162737 \\
        --shot data/training/live_sessions/bg_DESKTOP-58D5CAC_windows/shots/heartbeat_1773792436.png

Options:
    --workflow-id ID         workflow to replay (default: Urgence_aiva_demo)
    --shot PATH              fixture screenshot (default: latest heartbeat found)
    --expected YAML          expected-results file (step-by-step comparison)
    --export-expected PATH   export the run as a YAML/JSON expected-results file
    --auto-resume            auto-acknowledge pause_for_human (on by default)
    --no-auto-resume         stop at the first pause
    --execution-mode         autonomous|supervised (default: autonomous)
    --single-step N          (debug) run only the first N actions
    --verbose                detailed HTTP logs
    --timeout-poll SECONDS   timeout per poll (default 8 s)
    --max-iter N             safety cap on poll iterations (default 200)
    --vwb-url URL            VWB URL (default http://localhost:5002)

Output:
- Markdown summary table
- exit code 0 if no step is FAIL (and, with --expected, every step matches),
  1 otherwise; 2 if the screenshot is missing; 3 if the server is unreachable

Does NOT depend on Windows and does not modify any server file.
Prerequisites: streaming server (5005) + VWB backend (5002) running.
"""
from __future__ import annotations

import argparse
import base64
import glob
import json
import os
import sys
import time
import uuid
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import requests

# YAML is optional: if it is missing, expected results are exported as JSON
try:
    import yaml as _yaml
except ImportError:
    _yaml = None

# ==========================================================================
# Default configuration
# ==========================================================================
ROOT = Path(__file__).resolve().parent.parent
ENV_FILE = ROOT / ".env.local"
DEFAULT_BASE_URL = "http://localhost:5005"
DEFAULT_VWB_URL = "http://localhost:5002"
DEFAULT_HEARTBEAT_GLOB = str(
    ROOT / "data" / "training" / "live_sessions" / "*" / "shots" / "heartbeat_*.png"
)
DEFAULT_HEARTBEAT_GLOB_BG = str(
    ROOT / "data" / "training" / "live_sessions" / "bg_*" / "shots" / "heartbeat_*.png"
)


def _load_token() -> str:
    """Read RPA_API_TOKEN from the environment or from .env.local."""
    if "RPA_API_TOKEN" in os.environ and os.environ["RPA_API_TOKEN"]:
        return os.environ["RPA_API_TOKEN"]
    if ENV_FILE.exists():
        for line in ENV_FILE.read_text().splitlines():
            line = line.strip()
            if line.startswith("RPA_API_TOKEN="):
                return line.split("=", 1)[1].strip().strip('"').strip("'")
    return ""


def _find_latest_heartbeat() -> Optional[str]:
    """Find the most recent heartbeat on disk that can serve as a fixture.

    Prefers `bg_*` heartbeats (captured in the background, full resolution)
    over sess_* heartbeats, which can be truncated (mss.monitors[1] bug
    capturing the taskbar, see resolve_engine.py).
    Also filters on a minimum size (1200x800) to skip crops.
    """
    from PIL import Image

    def _is_full_size(path: str) -> bool:
        try:
            with Image.open(path) as im:
                return im.width >= 1200 and im.height >= 800
        except Exception:
            return False

    # 1. Look in bg_* first
    bg_candidates = [
        f for f in glob.glob(DEFAULT_HEARTBEAT_GLOB_BG)
        if "_blurred" not in f and os.path.isfile(f)
    ]
    bg_candidates = [f for f in bg_candidates if _is_full_size(f)]
    if bg_candidates:
        bg_candidates.sort(key=lambda f: os.path.getmtime(f), reverse=True)
        return bg_candidates[0]

    # 2. Fall back to every session directory (sess_* included),
    #    still filtering out truncated captures
    other = [
        f for f in glob.glob(DEFAULT_HEARTBEAT_GLOB)
        if "_blurred" not in f and os.path.isfile(f)
    ]
    other = [f for f in other if _is_full_size(f)]
    if other:
        other.sort(key=lambda f: os.path.getmtime(f), reverse=True)
        return other[0]
    return None


# ==========================================================================
# Lightweight models (no Pydantic import, to keep startup fast)
# ==========================================================================
@dataclass
class StepReport:
    order: int
    action_id: str
    action_type: str
    by_text: str
    method: str = ""
    score: float = 0.0
    x_pct: Optional[float] = None
    y_pct: Optional[float] = None
    status: str = "?"  # OK / FAIL / SKIP / PAUSED
    diag: str = ""
    elapsed_ms: float = 0.0


# ==========================================================================
# Mock client
# ==========================================================================
class ReplayMockClient:
    """Simulates Agent V1 against the streaming server."""

    def __init__(
        self,
        base_url: str,
        vwb_url: str,
        token: str,
        session_id: str,
        machine_id: str,
        screenshot_path: str,
        verbose: bool = False,
        auto_resume: bool = True,
        execution_mode: str = "autonomous",
        timeout_poll: float = 8.0,
        single_step: Optional[int] = None,
        max_iter: int = 200,
    ) -> None:
        self.base_url = base_url.rstrip("/")
        self.vwb_url = vwb_url.rstrip("/")
        self.token = token
        self.session_id = session_id
        self.machine_id = machine_id
        self.screenshot_path = screenshot_path
        self.verbose = verbose
        self.auto_resume = auto_resume
        self.execution_mode = execution_mode
        self.timeout_poll = timeout_poll
        self.single_step = single_step
        self.max_iter = max_iter
        self._session = requests.Session()
        if token:
            self._session.headers.update({"Authorization": f"Bearer {token}"})
        # Cache of the base64-encoded screenshot (large)
        self._screenshot_b64: Optional[str] = None
        self._screen_w: int = 1920
        self._screen_h: int = 1080
        self._load_screenshot()
        self.replay_id: Optional[str] = None
        self.reports: List[StepReport] = []
        self._action_counter = 0
        self._resumes_done = 0

    # ---- helpers ------------------------------------------------------
    def _load_screenshot(self) -> None:
        from PIL import Image  # imported lazily

        with open(self.screenshot_path, "rb") as f:
            data = f.read()
        self._screenshot_b64 = base64.b64encode(data).decode("ascii")
        with Image.open(self.screenshot_path) as img:
            self._screen_w, self._screen_h = img.size

    def _log(self, msg: str) -> None:
        if self.verbose:
            ts = time.strftime("%H:%M:%S")
            print(f"[{ts}] {msg}", flush=True)

    def _post(self, path: str, json_body: Dict[str, Any]) -> Dict[str, Any]:
        url = f"{self.base_url}{path}"
        if self.verbose:
            self._log(f"POST {path} body={json.dumps(json_body)[:200]}")
        resp = self._session.post(url, json=json_body, timeout=60)
        if self.verbose:
            self._log(f"{resp.status_code} {resp.text[:300]}")
        resp.raise_for_status()
        return resp.json() if resp.text else {}

    def _get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        url = f"{self.base_url}{path}"
        resp = self._session.get(url, params=params, timeout=self.timeout_poll)
        resp.raise_for_status()
        return resp.json() if resp.text else {}

    # ---- lifecycle ----------------------------------------------------
    def cancel_stale_replays(self) -> None:
        """Cancel running/paused replays for this machine to avoid collisions."""
        try:
            data = self._get("/api/v1/traces/stream/replays")
        except Exception as e:
            self._log(f"cancel_stale: fetching replays failed: {e}")
            return
        for r in data.get("replays", []):
            if r.get("machine_id") == self.machine_id and r.get("status") in (
                "running", "paused_need_help",
            ):
                rid = r.get("replay_id")
                self._log(f"cancel stale replay {rid} (status={r.get('status')})")
                try:
                    self._post(f"/api/v1/traces/stream/replay/{rid}/cancel", {})
                except Exception as e:
                    self._log(f"cancel {rid} failed: {e}")

    def register_session(self) -> None:
        """Register the test session on the server."""
        # POST /register with session_id as a query parameter (not a JSON body)
        url = f"{self.base_url}/api/v1/traces/stream/register"
        resp = self._session.post(
            url,
            params={"session_id": self.session_id, "machine_id": self.machine_id},
            timeout=10,
        )
        resp.raise_for_status()
        self._log(f"session registered: {self.session_id} (machine={self.machine_id})")

    def start_replay(self, workflow_id: str) -> Dict[str, Any]:
        """Start a replay through the real VWB → /replay/raw chain.

        Reproduces what the frontend does (ExecutionControls.tsx):
        1. GET /api/v3/workflow/{id} to fetch the steps
        2. POST /api/v3/execute-windows with actions[] + session_id/machine_id
           (VWB loads the anchors, maps the types, and POSTs to /replay/raw)
        """
        # 1. Fetch the workflow and its steps from VWB
        wf_url = f"{self.vwb_url}/api/v3/workflow/{workflow_id}"
        resp = self._session.get(wf_url, timeout=15)
        resp.raise_for_status()
        wf_data = resp.json()
        steps = (
            wf_data.get("steps")
            or wf_data.get("workflow", {}).get("steps")
            or []
        )
        if not steps:
            raise RuntimeError(
                f"Workflow {workflow_id}: no steps retrieved from VWB "
                f"({wf_url})"
            )
        self._log(f"workflow {workflow_id}: {len(steps)} steps retrieved")

        # 2. Build the payload the same way the frontend does
        actions = []
        for i, step in enumerate(steps):
            anchor = step.get("anchor") or {}
            actions.append({
                "action_id": step.get("id") or f"action_{i}",
                "type": step.get("action_type"),
                "parameters": step.get("parameters") or {},
                "anchor_id": anchor.get("id") if anchor else step.get("anchor_id"),
                "order": i,
            })

        # 3. POST /api/v3/execute-windows (VWB compiles and forwards to streaming)
        execute_url = f"{self.vwb_url}/api/v3/execute-windows"
        body = {
            "workflow_id": workflow_id,
            "session_id": self.session_id,
            "machine_id": self.machine_id,
            "actions": actions,
            "params": {"execution_mode": self.execution_mode},
        }
        if self.verbose:
            self._log(f"POST {execute_url} actions={len(actions)}")
        resp = self._session.post(execute_url, json=body, timeout=60)
        if resp.status_code != 200:
            raise RuntimeError(
                f"VWB execute-windows {resp.status_code}: {resp.text[:300]}"
            )
        data = resp.json()
        self.replay_id = data.get("replay_id")
        return data

    def get_replay_status(self) -> Dict[str, Any]:
        if not self.replay_id:
            return {}
        try:
            return self._get(f"/api/v1/traces/stream/replay/{self.replay_id}")
        except Exception:
            return {}

    def cancel_replay(self) -> None:
        if not self.replay_id:
            return
        try:
            self._post(f"/api/v1/traces/stream/replay/{self.replay_id}/cancel", {})
        except Exception as e:
            self._log(f"cancel replay failed: {e}")

    def resume_replay(self) -> None:
        """Auto-resume a pause (autonomous mode bypasses pauses; supervised mode can block)."""
        if not self.replay_id:
            return
        # Collect the required safety checks to acknowledge
        ack: List[str] = []
        try:
            state = self.get_replay_status()
            for c in state.get("safety_checks") or []:
                if c.get("required"):
                    ack.append(c.get("id"))
        except Exception:
            pass
        body: Dict[str, Any] = {"acknowledged_check_ids": ack}
        try:
            self._post(f"/api/v1/traces/stream/replay/{self.replay_id}/resume", body)
            self._resumes_done += 1
            self._log(f"resume OK (acknowledged checks={ack})")
        except Exception as e:
            self._log(f"resume failed: {e}")

    # ---- action dispatch ----------------------------------------------
    def resolve_target(self, target_spec: Dict[str, Any], strict: bool) -> Dict[str, Any]:
        """Call /replay/resolve_target on the server with the fixture screenshot."""
        body = {
            "session_id": self.session_id,
            "screenshot_b64": self._screenshot_b64 or "",
            "target_spec": target_spec or {},
            "fallback_x_pct": 0.5,
            "fallback_y_pct": 0.5,
            "screen_width": self._screen_w,
            "screen_height": self._screen_h,
            "strict_mode": strict,
        }
        return self._post("/api/v1/traces/stream/replay/resolve_target", body)

    def dispatch(self, action: Dict[str, Any]) -> StepReport:
        """Simulate executing an action on the client side and POST the result."""
        self._action_counter += 1
        action_id = action.get("action_id", f"unk_{self._action_counter}")
        action_type = action.get("type", "?")
        target_spec = action.get("target_spec") or {}
        by_text = (target_spec.get("by_text") or "")[:40]
        report = StepReport(
            order=self._action_counter,
            action_id=action_id,
            action_type=action_type,
            by_text=by_text,
        )
        t0 = time.time()

        # ── Visual action: resolve_target, then report success ──
        if action_type in ("click", "click_anchor", "double_click"):
            try:
                res = self.resolve_target(target_spec, strict=bool(action.get("success_strict")))
                report.method = res.get("method", "?")
                report.score = float(res.get("score") or 0.0)
                report.x_pct = res.get("x_pct")
                report.y_pct = res.get("y_pct")
                resolved = bool(res.get("resolved"))
                if not resolved:
                    report.status = "FAIL"
                    # "reason" may be missing or null: fall back to the method name
                    report.diag = (res.get("reason") or res.get("method") or "")[:80]
                    self._post_result(
                        action_id,
                        success=False,
                        error=f"resolve_failed:{report.method}",
                        actual_position=None,
                        resolution_method=report.method,
                        resolution_score=report.score,
                        resolution_elapsed_ms=res.get("elapsed_ms"),
                        target_spec=target_spec,
                        target_description=by_text,
                    )
                else:
                    report.status = "OK"
                    self._post_result(
                        action_id,
                        success=True,
                        actual_position={"x_pct": report.x_pct, "y_pct": report.y_pct},
                        resolution_method=report.method,
                        resolution_score=report.score,
                        resolution_elapsed_ms=res.get("elapsed_ms"),
                    )
            except Exception as e:
                report.status = "FAIL"
                report.diag = f"client_error:{e}"[:80]
                self._post_result(action_id, success=False, error=str(e)[:200])

        # ── Text input / keyboard shortcut / wait: simulated as successes ──
        elif action_type in ("type_text", "type", "keyboard_shortcut", "key_combo", "wait"):
            report.status = "OK"
            report.method = "simulated"
            report.diag = f"{action_type} simulated"
            self._post_result(action_id, success=True)

        # ── Server-side actions (extract_text/table, t2a_decision):
        # these should NOT reach the client (the server runs them
        # internally in /replay/next). Marked SKIP for traceability.
        elif action_type in ("extract_text", "extract_table", "t2a_decision"):
            report.status = "SKIP"
            report.method = "server_side"
            report.diag = "(server action, executed internally)"
        else:
            report.status = "OK"
            report.method = "noop"
            report.diag = f"action {action_type} not handled → simulated success"
            self._post_result(action_id, success=True)

        report.elapsed_ms = (time.time() - t0) * 1000
        self.reports.append(report)
        return report

    def _post_result(
        self,
        action_id: str,
        success: bool,
        error: Optional[str] = None,
        warning: Optional[str] = None,
        actual_position: Optional[Dict[str, float]] = None,
        resolution_method: Optional[str] = None,
        resolution_score: Optional[float] = None,
        resolution_elapsed_ms: Optional[float] = None,
        target_spec: Optional[Dict[str, Any]] = None,
        target_description: Optional[str] = None,
    ) -> None:
        body: Dict[str, Any] = {
            "session_id": self.session_id,
            "action_id": action_id,
            "success": success,
        }
        if error:
            body["error"] = error
        if warning:
            body["warning"] = warning
        if actual_position:
            body["actual_position"] = actual_position
        if resolution_method:
            body["resolution_method"] = resolution_method
        if resolution_score is not None:
            body["resolution_score"] = float(resolution_score)
        if resolution_elapsed_ms is not None:
            body["resolution_elapsed_ms"] = float(resolution_elapsed_ms)
        # No screenshot_before/after is sent, so the verifier does not start a
        # (slow) Critic VLM pass: the action is marked as unverified but the
        # replay still moves on.
        if target_spec:
            body["target_spec"] = target_spec
        if target_description:
            body["target_description"] = target_description
        try:
            self._post("/api/v1/traces/stream/replay/result", body)
        except Exception as e:
            self._log(f"POST result failed (action {action_id}): {e}")

    # ---- main loop ----------------------------------------------------
    def run(self) -> None:
        iter_count = 0
        last_paused_logged = ""
        empty_polls = 0
        while iter_count < self.max_iter:
            iter_count += 1
            try:
                resp = self._get(
                    "/api/v1/traces/stream/replay/next",
                    params={
                        "session_id": self.session_id,
                        "machine_id": self.machine_id,
                    },
                )
            except requests.exceptions.RequestException as e:
                self._log(f"poll {iter_count}: network error {e}, retrying in 1 s")
                time.sleep(1)
                continue

            # Supervised pause (paused_need_help)?
            if resp.get("replay_paused"):
                msg = (resp.get("pause_message") or "")[:120]
                # Distinguish a deliberate pause (user_request, safety_checks)
                # from a failure pause (target_not_found, wrong_window,
                # system_dialog). On a failure pause, auto-resume reruns the
                # same action, which is likely to fail again, so we stop once
                # more than 5 resumes have been done to avoid looping forever.
                state = self.get_replay_status()
                failed = state.get("failed_action") or {}
                pause_reason = failed.get("reason") or ""
                is_failure_pause = pause_reason in (
                    "target_not_found", "wrong_window", "system_dialog",
                )
                if msg != last_paused_logged:
                    self._log(f"PAUSE ({pause_reason or 'user'}): {msg}")
                    last_paused_logged = msg
                # Record the pause as a PAUSED report (only once in a row)
                if not self.reports or self.reports[-1].status != "PAUSED":
                    self._action_counter += 1
                    self.reports.append(
                        StepReport(
                            order=self._action_counter,
                            action_id=resp.get("replay_id", "?"),
                            action_type=f"pause:{pause_reason or 'user'}",
                            by_text=(failed.get("target_description") or "")[:32],
                            status="PAUSED",
                            diag=msg[:80],
                        )
                    )
                if not self.auto_resume:
                    self._log("--auto-resume disabled: stopping.")
                    break
                if is_failure_pause and self._resumes_done > 5:
                    self._log(
                        f"Too many resumes ({self._resumes_done}) on "
                        f"failure pauses: stopping to avoid a loop."
                    )
                    break
                time.sleep(0.5)
                self.resume_replay()
                last_paused_logged = ""
                continue

            action = resp.get("action")
            if action is None:
                # No pending action: the replay may be finished, or the server busy
                if resp.get("server_busy"):
                    time.sleep(0.5)
                    continue
                state = self.get_replay_status()
                status = state.get("status", "?")
                if status in ("completed", "cancelled", "error", "failed"):
                    self._log(f"replay finished status={status}")
                    break
                empty_polls += 1
                if empty_polls > 30:  # 30 consecutive empty polls: give up
                    self._log("Too many empty polls, stopping.")
                    break
                time.sleep(0.5)
                continue

            empty_polls = 0
            self.dispatch(action)
            if self.single_step is not None and self._action_counter >= self.single_step:
                self._log(f"--single-step {self.single_step} reached, stopping.")
                break

        if iter_count >= self.max_iter:
            self._log(f"WARN: max_iter ({self.max_iter}) reached.")

        # Reconciliation: collect the actions executed server-side
        # (extract_text, extract_table, t2a_decision) that never went
        # through /replay/next on the client side.
        try:
            state = self.get_replay_status()
            seen_ids = {r.action_id for r in self.reports}
            for res in state.get("results") or []:
                aid = res.get("action_id")
                if aid in seen_ids:
                    continue
                # Heuristic: these are server-side actions the client never saw
                ok = bool(res.get("success"))
                self._action_counter += 1
                self.reports.append(StepReport(
                    order=self._action_counter,
                    action_id=aid or "?",
                    action_type="(server)",
                    by_text="",
                    method="server_side",
                    status="OK" if ok else "FAIL",
                    diag=(res.get("error") or "")[:60],
                ))
        except Exception as e:
            self._log(f"reconciliation skipped: {e}")

    # ---- report -------------------------------------------------------
    def render_report(self) -> str:
        out: List[str] = []
        out.append("")
        out.append("| # | Type | by_text | Method | Score | Resolved pos | Status | Diag |")
        out.append("|----|------------------|----------------------------------|----------------------|-------|----------------------|---------|------|")
        for r in self.reports:
            pos = (
                f"({r.x_pct:.4f}, {r.y_pct:.4f})"
                if r.x_pct is not None and r.y_pct is not None
                else "-"
            )
            score = f"{r.score:.2f}" if r.method else "-"
            out.append(
                f"| {r.order:<2} | {r.action_type:<16} | {r.by_text[:32]:<32} | "
                f"{r.method[:20]:<20} | {score:<5} | {pos:<20} | {r.status:<7} | {r.diag[:60]} |"
            )
        out.append("")
        return "\n".join(out)

    def export_expected(self, path: Path) -> None:
        """Serialize the current resolutions as reference expected results."""
        data = {
            "workflow_session_id": self.session_id,
            "screenshot": str(self.screenshot_path),
            "steps": [asdict(r) for r in self.reports],
        }
        if path.suffix in (".yaml", ".yml") and _yaml is not None:
            path.write_text(_yaml.safe_dump(data, sort_keys=False, allow_unicode=True))
        else:
            # JSON fallback (also used for a .yaml path when PyYAML is missing)
            path.write_text(json.dumps(data, indent=2, ensure_ascii=False))
        self._log(f"Expected results exported to {path}")

    def compare_to_expected(self, expected_path: Path) -> Tuple[int, int]:
        """Compare the reports against expected results. Returns (matching, total)."""
        if not expected_path.exists():
            print(f"[expected] file not found: {expected_path}")
            return (0, len(self.reports))
        if expected_path.suffix in (".yaml", ".yml") and _yaml is not None:
            expected = _yaml.safe_load(expected_path.read_text())
        else:
            expected = json.loads(expected_path.read_text())
        steps = expected.get("steps") or []
        ok = 0
        for actual, exp in zip(self.reports, steps):
            same_method = (actual.method == exp.get("method", "")) or (
                actual.method.startswith("hybrid_") and exp.get("method", "").startswith("hybrid_")
            )
            same_status = actual.status == exp.get("status", "")
            if same_method and same_status:
                ok += 1
        return (ok, len(steps) if steps else len(self.reports))


# ==========================================================================
# CLI
# ==========================================================================
def main(argv: Optional[List[str]] = None) -> int:
    parser = argparse.ArgumentParser(
        description="E2E harness that replays a workflow against the server without Léa V1."
    )
    parser.add_argument("--workflow-id", default="wf_a38aeebea5e6_1778162737",
                        help="workflow ID (default: Urgence_aiva_demo)")
    parser.add_argument("--shot", default=None,
                        help="fixture screenshot path (default: latest heartbeat)")
    parser.add_argument("--base-url", default=DEFAULT_BASE_URL,
                        help="streaming server URL (default port 5005)")
    parser.add_argument("--vwb-url", default=DEFAULT_VWB_URL,
                        help="VWB backend URL (default port 5002)")
    parser.add_argument("--token", default=None,
                        help="RPA_API_TOKEN (default: read from .env.local)")
    parser.add_argument("--session-id", default=None,
                        help="(default: test_e2e_sess_<ts>_<hex>)")
    parser.add_argument("--machine-id", default=None,
                        help="(default: test_e2e_machine_<ts>)")
    parser.add_argument("--auto-resume", action="store_true",
                        help="auto-acknowledge pause_for_human (already the default)")
    parser.add_argument("--no-auto-resume", action="store_true",
                        help="stop as soon as a pause is encountered")
    parser.add_argument("--execution-mode", choices=("autonomous", "supervised"),
                        default="autonomous")
    parser.add_argument("--single-step", type=int, default=None)
    parser.add_argument("--verbose", action="store_true")
    parser.add_argument("--timeout-poll", type=float, default=8.0)
    parser.add_argument("--max-iter", type=int, default=200)
    parser.add_argument("--export-expected", type=Path, default=None,
                        help="export the run as a YAML/JSON expected-results file")
    parser.add_argument("--expected", type=Path, default=None,
                        help="compare the run to this YAML/JSON expected-results file")
    args = parser.parse_args(argv)

    token = args.token or _load_token()
    if not token:
        print("WARN: no RPA_API_TOKEN found.", file=sys.stderr)

    shot = args.shot or _find_latest_heartbeat()
    if not shot or not os.path.isfile(shot):
        print(f"ERROR: screenshot not found ({shot})", file=sys.stderr)
        return 2

    ts = time.strftime("%Y%m%dT%H%M%S")
    session_id = args.session_id or f"test_e2e_sess_{ts}_{uuid.uuid4().hex[:6]}"
    machine_id = args.machine_id or f"test_e2e_machine_{ts}"
    # Auto-resume is on by default; --auto-resume wins if both flags are given
    auto_resume = True
    if args.no_auto_resume:
        auto_resume = False
    if args.auto_resume:
        auto_resume = True

    print(f"[e2e] base_url={args.base_url}")
    print(f"[e2e] workflow_id={args.workflow_id}")
    print(f"[e2e] shot={shot}")
    print(f"[e2e] session_id={session_id}")
    print(f"[e2e] machine_id={machine_id}")
    print(f"[e2e] mode={args.execution_mode} auto_resume={auto_resume}")

    client = ReplayMockClient(
        base_url=args.base_url,
        vwb_url=args.vwb_url,
        token=token,
        session_id=session_id,
        machine_id=machine_id,
        screenshot_path=shot,
        verbose=args.verbose,
        auto_resume=auto_resume,
        execution_mode=args.execution_mode,
        timeout_poll=args.timeout_poll,
        single_step=args.single_step,
        max_iter=args.max_iter,
    )

    # Healthcheck
    try:
        h = requests.get(f"{args.base_url}/health", timeout=3).json()
        if h.get("status") != "healthy":
            print(f"WARN: server health={h}")
    except Exception as e:
        print(f"ERROR: server unreachable at {args.base_url} ({e})", file=sys.stderr)
        return 3

    client.cancel_stale_replays()
    client.register_session()

    t_start = time.time()
    final_state: Dict[str, Any] = {}
    try:
        info = client.start_replay(args.workflow_id)
        print(f"[e2e] replay_id={info.get('replay_id')} total_actions={info.get('total_actions')}")
        client.run()
        # Snapshot the state BEFORE cancelling (otherwise it always reads "cancelled")
        try:
            final_state = client.get_replay_status()
        except Exception:
            final_state = {}
    finally:
        # Always cancel on exit so that no replay is left active
        try:
            client.cancel_replay()
        except Exception:
            pass
    elapsed = time.time() - t_start

    print(client.render_report())
    n_total = len(client.reports)
    n_ok = sum(1 for r in client.reports if r.status == "OK")
    n_skip = sum(1 for r in client.reports if r.status == "SKIP")
    n_paused = sum(1 for r in client.reports if r.status == "PAUSED")
    n_fail = sum(1 for r in client.reports if r.status == "FAIL")
    print(
        f"[e2e] {n_total} steps in {elapsed:.1f}s: "
        f"OK={n_ok} SKIP={n_skip} PAUSED={n_paused} FAIL={n_fail} "
        f"(auto resumes={client._resumes_done})"
    )
    if final_state:
        print(
            f"[e2e] final replay status={final_state.get('status')} "
            f"completed={final_state.get('completed_actions')}/"
            f"{final_state.get('total_actions')} "
            f"failed={final_state.get('failed_actions')} "
            f"retried={final_state.get('retried_actions')}"
        )
        for err in (final_state.get("error_log") or [])[-3:]:
            print(f"  ERR action_id={err.get('action_id')} "
                  f"error='{err.get('error')}' retry={err.get('retry_count')}")

    if args.export_expected:
        client.export_expected(args.export_expected)
    if args.expected:
        ok, total = client.compare_to_expected(args.expected)
        print(f"[e2e] expected-results comparison: {ok}/{total} steps match")
        if ok < total:
            return 1
    return 1 if n_fail else 0


if __name__ == "__main__":
    sys.exit(main())