#!/usr/bin/env python3 """Harness E2E pour tester un replay sans Léa V1 / Windows. Mocque le client Léa V1 contre le serveur de streaming réel (port 5005). Le harness compile le workflow via VWB (port 5002, /api/v3/execute-windows) exactement comme le frontend, puis prend la place de l'agent Windows : - boucle GET /replay/next (poll) - résout les actions click_anchor via POST /replay/resolve_target avec un screenshot fixture (heartbeat sur disque) - POST /replay/result avec succès/échec - gère pause_for_human (auto-resume ou stop selon mode) - imprime un tableau Markdown des résolutions et compare à un YAML d'attendus Permet d'itérer en quelques secondes (vs 1-2 min de replay Windows réel) sur : - modifications serveur (resolve_engine, replay_engine, validation OCR…) - robustesse de la cascade visuelle sur un screenshot donné - cas d'erreur (target_not_found, pause supervisée, retry). Usage standard (workflow Urgence_aiva_demo, screenshot le plus récent) : cd /home/dom/ai/rpa_vision_v3 && source .venv/bin/activate python tools/test_replay_e2e.py \\ --workflow-id wf_a38aeebea5e6_1778162737 \\ --shot data/training/live_sessions/bg_DESKTOP-58D5CAC_windows/shots/heartbeat_1773792436.png Options : --workflow-id ID workflow à rejouer (default Urgence_aiva_demo) --shot PATH screenshot fixture (default: dernier heartbeat trouvé) --expected YAML fichier attendus (compare step par step) --export-expected PATH exporter le run en YAML/JSON d'attendus --auto-resume auto-acquitter pause_for_human --execution-mode autonomous|supervised (par défaut: autonomous) --single-step N (debug) ne lancer que les N premières actions --verbose logs détaillés HTTP --timeout-poll SECONDS timeout par poll (default 8s) --max-iter N garde-fou (default 200) --vwb-url URL URL VWB (default http://localhost:5002) Sortie : - tableau Markdown récapitulatif - exit code 0 si tous les steps OK / 1 sinon Ne dépend PAS de Windows, ne modifie aucun fichier serveur. Pré-requis : streaming server (5005) + VWB backend (5002) actifs. """ from __future__ import annotations import argparse import base64 import glob import json import os import sys import time import uuid from dataclasses import dataclass, asdict from pathlib import Path from typing import Any, Dict, List, Optional, Tuple import requests # YAML est optionnel : si absent, on génère du JSON pour l'export d'attendus try: import yaml as _yaml except ImportError: _yaml = None # ========================================================================== # Configuration par défaut # ========================================================================== ROOT = Path(__file__).resolve().parent.parent ENV_FILE = ROOT / ".env.local" DEFAULT_BASE_URL = "http://localhost:5005" DEFAULT_VWB_URL = "http://localhost:5002" DEFAULT_HEARTBEAT_GLOB = str( ROOT / "data" / "training" / "live_sessions" / "*" / "shots" / "heartbeat_*.png" ) DEFAULT_HEARTBEAT_GLOB_BG = str( ROOT / "data" / "training" / "live_sessions" / "bg_*" / "shots" / "heartbeat_*.png" ) def _load_token() -> str: """Lit RPA_API_TOKEN depuis l'env ou .env.local.""" if "RPA_API_TOKEN" in os.environ and os.environ["RPA_API_TOKEN"]: return os.environ["RPA_API_TOKEN"] if ENV_FILE.exists(): for line in ENV_FILE.read_text().splitlines(): line = line.strip() if line.startswith("RPA_API_TOKEN="): return line.split("=", 1)[1].strip().strip('"').strip("'") return "" def _find_latest_heartbeat() -> Optional[str]: """Cherche le dernier heartbeat sur disque utilisable comme fixture. Préfère les heartbeats `bg_*` (capturés en arrière-plan, pleine résolution) aux heartbeats sess_* qui peuvent être tronqués (bug mss.monitors[1] capturant la barre des tâches, cf. resolve_engine.py). Filtre aussi sur la taille minimale (1200x800) pour ignorer les crops. """ from PIL import Image def _is_full_size(path: str) -> bool: try: with Image.open(path) as im: return im.width >= 1200 and im.height >= 800 except Exception: return False # 1. Chercher d'abord dans bg_* bg_candidates = [ f for f in glob.glob(DEFAULT_HEARTBEAT_GLOB_BG) if "_blurred" not in f and os.path.isfile(f) ] bg_candidates = [f for f in bg_candidates if _is_full_size(f)] if bg_candidates: bg_candidates.sort(key=lambda f: os.path.getmtime(f), reverse=True) return bg_candidates[0] # 2. Fallback sur sess_*, mais en filtrant les tronqués other = [ f for f in glob.glob(DEFAULT_HEARTBEAT_GLOB) if "_blurred" not in f and os.path.isfile(f) ] other = [f for f in other if _is_full_size(f)] if other: other.sort(key=lambda f: os.path.getmtime(f), reverse=True) return other[0] return None # ========================================================================== # Modèles légers (pas d'import Pydantic pour rester rapide à charger) # ========================================================================== @dataclass class StepReport: order: int action_id: str action_type: str by_text: str method: str = "" score: float = 0.0 x_pct: Optional[float] = None y_pct: Optional[float] = None status: str = "?" # OK / FAIL / SKIP / PAUSED diag: str = "" elapsed_ms: float = 0.0 # ========================================================================== # Client mock # ========================================================================== class ReplayMockClient: """Simule l'Agent V1 contre le serveur de streaming.""" def __init__( self, base_url: str, vwb_url: str, token: str, session_id: str, machine_id: str, screenshot_path: str, verbose: bool = False, auto_resume: bool = True, execution_mode: str = "autonomous", timeout_poll: float = 8.0, single_step: Optional[int] = None, max_iter: int = 200, ) -> None: self.base_url = base_url.rstrip("/") self.vwb_url = vwb_url.rstrip("/") self.token = token self.session_id = session_id self.machine_id = machine_id self.screenshot_path = screenshot_path self.verbose = verbose self.auto_resume = auto_resume self.execution_mode = execution_mode self.timeout_poll = timeout_poll self.single_step = single_step self.max_iter = max_iter self._session = requests.Session() if token: self._session.headers.update({"Authorization": f"Bearer {token}"}) # cache du screenshot encodé (gros) self._screenshot_b64: Optional[str] = None self._screen_w: int = 1920 self._screen_h: int = 1080 self._load_screenshot() self.replay_id: Optional[str] = None self.reports: List[StepReport] = [] self._action_counter = 0 self._resumes_done = 0 # ---- helpers ------------------------------------------------------ def _load_screenshot(self) -> None: from PIL import Image # imported lazily with open(self.screenshot_path, "rb") as f: data = f.read() self._screenshot_b64 = base64.b64encode(data).decode("ascii") with Image.open(self.screenshot_path) as img: self._screen_w, self._screen_h = img.size def _log(self, msg: str) -> None: if self.verbose: ts = time.strftime("%H:%M:%S") print(f"[{ts}] {msg}", flush=True) def _post(self, path: str, json_body: Dict[str, Any]) -> Dict[str, Any]: url = f"{self.base_url}{path}" if self.verbose: self._log(f"POST {path} body={json.dumps(json_body)[:200]}") resp = self._session.post(url, json=json_body, timeout=60) if self.verbose: self._log(f" → {resp.status_code} {resp.text[:300]}") resp.raise_for_status() return resp.json() if resp.text else {} def _get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: url = f"{self.base_url}{path}" resp = self._session.get(url, params=params, timeout=self.timeout_poll) resp.raise_for_status() return resp.json() if resp.text else {} # ---- lifecycle ---------------------------------------------------- def cancel_stale_replays(self) -> None: """Annule les replays running/paused pour cette machine, pour éviter les collisions.""" try: data = self._get("/api/v1/traces/stream/replays") except Exception as e: self._log(f"cancel_stale: get replays échoué : {e}") return for r in data.get("replays", []): if r.get("machine_id") == self.machine_id and r.get("status") in ( "running", "paused_need_help", ): rid = r.get("replay_id") self._log(f"cancel stale replay {rid} (status={r.get('status')})") try: self._post(f"/api/v1/traces/stream/replay/{rid}/cancel", {}) except Exception as e: self._log(f"cancel {rid} échoué : {e}") def register_session(self) -> None: """Enregistre la session de test côté serveur.""" # POST /register avec session_id en query (pas JSON body) url = f"{self.base_url}/api/v1/traces/stream/register" resp = self._session.post( url, params={"session_id": self.session_id, "machine_id": self.machine_id}, timeout=10, ) resp.raise_for_status() self._log(f"session registered : {self.session_id} (machine={self.machine_id})") def start_replay(self, workflow_id: str) -> Dict[str, Any]: """Lance un replay via la chaîne réelle VWB → /replay/raw. On reproduit ce que fait le frontend (ExecutionControls.tsx) : 1. GET /api/v3/workflow/{id} pour récupérer les steps 2. POST /api/v3/execute-windows avec actions[] + session_id/machine_id (VWB charge les ancres, mappe les types, et POST sur /replay/raw) """ # 1. Récupérer le workflow et ses steps depuis VWB wf_url = f"{self.vwb_url}/api/v3/workflow/{workflow_id}" resp = self._session.get(wf_url, timeout=15) resp.raise_for_status() wf_data = resp.json() steps = ( wf_data.get("steps") or wf_data.get("workflow", {}).get("steps") or [] ) if not steps: raise RuntimeError( f"Workflow {workflow_id} : aucune étape récupérée depuis VWB " f"({wf_url})" ) self._log(f"workflow {workflow_id} : {len(steps)} steps récupérées") # 2. Construire le payload comme le frontend actions = [] for i, step in enumerate(steps): anchor = step.get("anchor") or {} actions.append({ "action_id": step.get("id") or f"action_{i}", "type": step.get("action_type"), "parameters": step.get("parameters") or {}, "anchor_id": anchor.get("id") if anchor else step.get("anchor_id"), "order": i, }) # 3. POST /api/v3/execute-windows (VWB compile + forward au streaming) execute_url = f"{self.vwb_url}/api/v3/execute-windows" body = { "workflow_id": workflow_id, "session_id": self.session_id, "machine_id": self.machine_id, "actions": actions, "params": {"execution_mode": self.execution_mode}, } if self.verbose: self._log(f"POST {execute_url} actions={len(actions)}") resp = self._session.post(execute_url, json=body, timeout=60) if resp.status_code != 200: raise RuntimeError( f"VWB execute-windows {resp.status_code} : {resp.text[:300]}" ) data = resp.json() self.replay_id = data.get("replay_id") return data def get_replay_status(self) -> Dict[str, Any]: if not self.replay_id: return {} try: return self._get(f"/api/v1/traces/stream/replay/{self.replay_id}") except Exception: return {} def cancel_replay(self) -> None: if not self.replay_id: return try: self._post(f"/api/v1/traces/stream/replay/{self.replay_id}/cancel", {}) except Exception as e: self._log(f"cancel replay échoué : {e}") def resume_replay(self) -> None: """Auto-resume une pause (mode autonomous bypass mais supervised peut bloquer).""" if not self.replay_id: return # Récupérer les checks à acquitter ack: List[str] = [] try: state = self.get_replay_status() for c in state.get("safety_checks") or []: if c.get("required"): ack.append(c.get("id")) except Exception: pass body: Dict[str, Any] = {"acknowledged_check_ids": ack} try: self._post(f"/api/v1/traces/stream/replay/{self.replay_id}/resume", body) self._resumes_done += 1 self._log(f"resume OK (checks ack={ack})") except Exception as e: self._log(f"resume échoué : {e}") # ---- dispatch d'actions ------------------------------------------ def resolve_target(self, target_spec: Dict[str, Any], strict: bool) -> Dict[str, Any]: """Appelle /replay/resolve_target côté serveur avec le screenshot fixture.""" body = { "session_id": self.session_id, "screenshot_b64": self._screenshot_b64 or "", "target_spec": target_spec or {}, "fallback_x_pct": 0.5, "fallback_y_pct": 0.5, "screen_width": self._screen_w, "screen_height": self._screen_h, "strict_mode": strict, } return self._post("/api/v1/traces/stream/replay/resolve_target", body) def dispatch(self, action: Dict[str, Any]) -> StepReport: """Simule l'exécution d'une action côté client et POST le résultat.""" self._action_counter += 1 action_id = action.get("action_id", f"unk_{self._action_counter}") action_type = action.get("type", "?") target_spec = action.get("target_spec") or {} by_text = (target_spec.get("by_text") or "")[:40] report = StepReport( order=self._action_counter, action_id=action_id, action_type=action_type, by_text=by_text, ) t0 = time.time() # ── Action visuelle : resolve_target puis renvoyer success ── if action_type in ("click", "click_anchor", "double_click"): try: res = self.resolve_target(target_spec, strict=bool(action.get("success_strict"))) report.method = res.get("method", "?") report.score = float(res.get("score") or 0.0) report.x_pct = res.get("x_pct") report.y_pct = res.get("y_pct") resolved = bool(res.get("resolved")) if not resolved: report.status = "FAIL" report.diag = res.get("reason", res.get("method", ""))[:80] self._post_result( action_id, success=False, error=f"resolve_failed:{report.method}", actual_position=None, resolution_method=report.method, resolution_score=report.score, resolution_elapsed_ms=res.get("elapsed_ms"), target_spec=target_spec, target_description=by_text, ) else: report.status = "OK" self._post_result( action_id, success=True, actual_position={"x_pct": report.x_pct, "y_pct": report.y_pct}, resolution_method=report.method, resolution_score=report.score, resolution_elapsed_ms=res.get("elapsed_ms"), ) except Exception as e: report.status = "FAIL" report.diag = f"client_error:{e}"[:80] self._post_result(action_id, success=False, error=str(e)[:200]) # ── Type texte / shortcut clavier / wait : on simule succès ── elif action_type in ("type_text", "type", "keyboard_shortcut", "key_combo", "wait"): report.status = "OK" report.method = "simulated" report.diag = f"{action_type} simulé" self._post_result(action_id, success=True) # ── Actions serveur (extract_text/table, t2a_decision) : # ne devraient PAS arriver côté client (le serveur les exécute en # interne dans /replay/next). On marque SKIP pour traçabilité. elif action_type in ("extract_text", "extract_table", "t2a_decision"): report.status = "SKIP" report.method = "server_side" report.diag = "(action serveur, exécutée en interne)" else: report.status = "OK" report.method = "noop" report.diag = f"action {action_type} non gérée → success simulé" self._post_result(action_id, success=True) report.elapsed_ms = (time.time() - t0) * 1000 self.reports.append(report) return report def _post_result( self, action_id: str, success: bool, error: Optional[str] = None, warning: Optional[str] = None, actual_position: Optional[Dict[str, float]] = None, resolution_method: Optional[str] = None, resolution_score: Optional[float] = None, resolution_elapsed_ms: Optional[float] = None, target_spec: Optional[Dict[str, Any]] = None, target_description: Optional[str] = None, ) -> None: body: Dict[str, Any] = { "session_id": self.session_id, "action_id": action_id, "success": success, } if error: body["error"] = error if warning: body["warning"] = warning if actual_position: body["actual_position"] = actual_position if resolution_method: body["resolution_method"] = resolution_method if resolution_score is not None: body["resolution_score"] = float(resolution_score) if resolution_elapsed_ms is not None: body["resolution_elapsed_ms"] = float(resolution_elapsed_ms) # Pour ne pas que le verifier ouvre un Critic VLM (lent), on n'envoie # PAS de screenshot_before/after (l'action sera marquée comme non # vérifiée mais avancera quand même). if target_spec: body["target_spec"] = target_spec if target_description: body["target_description"] = target_description try: self._post("/api/v1/traces/stream/replay/result", body) except Exception as e: self._log(f"POST result échoué (action {action_id}) : {e}") # ---- main loop ---------------------------------------------------- def run(self) -> None: iter_count = 0 last_paused_logged = "" empty_polls = 0 while iter_count < self.max_iter: iter_count += 1 try: resp = self._get( "/api/v1/traces/stream/replay/next", params={ "session_id": self.session_id, "machine_id": self.machine_id, }, ) except requests.exceptions.RequestException as e: self._log(f"poll {iter_count} : erreur réseau {e}, retry dans 1s") time.sleep(1) continue # Pause supervisée (paused_need_help) ? if resp.get("replay_paused"): msg = (resp.get("pause_message") or "")[:120] # Distinguer pause volontaire (user_request, safety_checks) vs # pause d'échec (target_not_found, wrong_window, system_dialog). # Pour les pauses d'échec, l'auto-resume relance la même action # qui échouera encore — on ne resume qu'une fois max pour ne # pas boucler infiniment. state = self.get_replay_status() failed = state.get("failed_action") or {} pause_reason = failed.get("reason") or "" is_failure_pause = pause_reason in ( "target_not_found", "wrong_window", "system_dialog", ) if msg != last_paused_logged: self._log(f"PAUSE ({pause_reason or 'user'}) : {msg}") last_paused_logged = msg # Marquer le report comme PAUSED (une seule fois) if not self.reports or self.reports[-1].status != "PAUSED": self._action_counter += 1 self.reports.append( StepReport( order=self._action_counter, action_id=resp.get("replay_id", "?"), action_type=f"pause:{pause_reason or 'user'}", by_text=(failed.get("target_description") or "")[:32], status="PAUSED", diag=msg[:80], ) ) if not self.auto_resume: self._log("--auto-resume désactivé : on stoppe.") break if is_failure_pause and self._resumes_done > 5: self._log( f"Trop de resumes ({self._resumes_done}) sur des " f"pauses d'échec — stop pour éviter la boucle." ) break time.sleep(0.5) self.resume_replay() last_paused_logged = "" continue action = resp.get("action") if action is None: # Pas d'action en attente : peut-être terminé, peut-être server_busy if resp.get("server_busy"): time.sleep(0.5) continue state = self.get_replay_status() status = state.get("status", "?") if status in ("completed", "cancelled", "error", "failed"): self._log(f"replay terminé status={status}") break empty_polls += 1 if empty_polls > 30: # 30 polls vides = ~30s : on lève le doute self._log("Trop de polls vides, on stoppe.") break time.sleep(0.5) continue empty_polls = 0 self.dispatch(action) if self.single_step is not None and self._action_counter >= self.single_step: self._log(f"--single-step {self.single_step} atteint, stop.") break if iter_count >= self.max_iter: self._log(f"WARN : max_iter ({self.max_iter}) atteint.") # Réconciliation : récupérer les actions exécutées côté serveur # (extract_text, extract_table, t2a_decision) qui ne sont jamais # passées par /replay/next côté client. try: state = self.get_replay_status() seen_ids = {r.action_id for r in self.reports} for res in state.get("results") or []: aid = res.get("action_id") if aid in seen_ids: continue # Heuristique : ce sont des actions serveur non vues ok = bool(res.get("success")) self._action_counter += 1 self.reports.append(StepReport( order=self._action_counter, action_id=aid or "?", action_type="(server)", by_text="", method="server_side", status="OK" if ok else "FAIL", diag=(res.get("error") or "")[:60], )) except Exception as e: self._log(f"reconciliation skipped : {e}") # ---- rapport ------------------------------------------------------ def render_report(self) -> str: out: List[str] = [] out.append("") out.append("| # | Type | by_text | Méthode | Score | Pos résolue | Status | Diag |") out.append("|----|------------------|----------------------------------|----------------------|-------|----------------------|---------|------|") for r in self.reports: pos = ( f"({r.x_pct:.4f}, {r.y_pct:.4f})" if r.x_pct is not None and r.y_pct is not None else "-" ) score = f"{r.score:.2f}" if r.method else "-" out.append( f"| {r.order:<2} | {r.action_type:<16} | {r.by_text[:32]:<32} | " f"{r.method[:20]:<20} | {score:<5} | {pos:<20} | {r.status:<7} | {r.diag[:60]} |" ) out.append("") return "\n".join(out) def export_expected(self, path: Path) -> None: """Sérialise les résolutions actuelles comme attendus de référence.""" data = { "workflow_session_id": self.session_id, "screenshot": str(self.screenshot_path), "steps": [asdict(r) for r in self.reports], } if path.suffix in (".yaml", ".yml") and _yaml is not None: path.write_text(_yaml.safe_dump(data, sort_keys=False, allow_unicode=True)) else: # fallback JSON path.write_text(json.dumps(data, indent=2, ensure_ascii=False)) self._log(f"Attendus exportés vers {path}") def compare_to_expected(self, expected_path: Path) -> Tuple[int, int]: """Compare reports vs attendus. Retourne (matching, total).""" if not expected_path.exists(): print(f"[expected] fichier introuvable : {expected_path}") return (0, len(self.reports)) if expected_path.suffix in (".yaml", ".yml") and _yaml is not None: expected = _yaml.safe_load(expected_path.read_text()) else: expected = json.loads(expected_path.read_text()) steps = expected.get("steps") or [] ok = 0 for actual, exp in zip(self.reports, steps): same_method = (actual.method == exp.get("method", "")) or ( actual.method.startswith("hybrid_") and exp.get("method", "").startswith("hybrid_") ) same_status = actual.status == exp.get("status", "") if same_method and same_status: ok += 1 return (ok, len(steps) if steps else len(self.reports)) # ========================================================================== # CLI # ========================================================================== def main(argv: Optional[List[str]] = None) -> int: parser = argparse.ArgumentParser( description="Harness E2E pour rejouer un workflow contre le serveur sans Léa V1." ) parser.add_argument("--workflow-id", default="wf_a38aeebea5e6_1778162737", help="ID du workflow (default: Urgence_aiva_demo)") parser.add_argument("--shot", default=None, help="Path screenshot fixture (default: dernier heartbeat)") parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="URL streaming server (default 5005)") parser.add_argument("--vwb-url", default=DEFAULT_VWB_URL, help="URL VWB backend (default 5002)") parser.add_argument("--token", default=None, help="RPA_API_TOKEN (default: lit .env.local)") parser.add_argument("--session-id", default=None, help="(default: test_e2e_)") parser.add_argument("--machine-id", default=None, help="(default: test_e2e_machine_)") parser.add_argument("--auto-resume", action="store_true", help="auto-acquitter pause_for_human") parser.add_argument("--no-auto-resume", action="store_true", help="stop dès qu'une pause est rencontrée") parser.add_argument("--execution-mode", choices=("autonomous", "supervised"), default="autonomous") parser.add_argument("--single-step", type=int, default=None) parser.add_argument("--verbose", action="store_true") parser.add_argument("--timeout-poll", type=float, default=8.0) parser.add_argument("--max-iter", type=int, default=200) parser.add_argument("--export-expected", type=Path, default=None, help="Exporter le run en YAML/JSON d'attendus") parser.add_argument("--expected", type=Path, default=None, help="Comparer le run à ce YAML/JSON d'attendus") args = parser.parse_args(argv) token = args.token or _load_token() if not token: print("WARN : pas de RPA_API_TOKEN trouvé.", file=sys.stderr) shot = args.shot or _find_latest_heartbeat() if not shot or not os.path.isfile(shot): print(f"ERREUR : screenshot introuvable ({shot})", file=sys.stderr) return 2 ts = time.strftime("%Y%m%dT%H%M%S") session_id = args.session_id or f"test_e2e_sess_{ts}_{uuid.uuid4().hex[:6]}" machine_id = args.machine_id or f"test_e2e_machine_{ts}" auto_resume = True if args.no_auto_resume: auto_resume = False if args.auto_resume: auto_resume = True print(f"[e2e] base_url={args.base_url}") print(f"[e2e] workflow_id={args.workflow_id}") print(f"[e2e] shot={shot}") print(f"[e2e] session_id={session_id}") print(f"[e2e] machine_id={machine_id}") print(f"[e2e] mode={args.execution_mode} auto_resume={auto_resume}") client = ReplayMockClient( base_url=args.base_url, vwb_url=args.vwb_url, token=token, session_id=session_id, machine_id=machine_id, screenshot_path=shot, verbose=args.verbose, auto_resume=auto_resume, execution_mode=args.execution_mode, timeout_poll=args.timeout_poll, single_step=args.single_step, max_iter=args.max_iter, ) # Healthcheck try: h = requests.get(f"{args.base_url}/health", timeout=3).json() if h.get("status") != "healthy": print(f"WARN : serveur health={h}") except Exception as e: print(f"ERREUR : serveur injoignable sur {args.base_url} ({e})", file=sys.stderr) return 3 client.cancel_stale_replays() client.register_session() t_start = time.time() final_state: Dict[str, Any] = {} try: info = client.start_replay(args.workflow_id) print(f"[e2e] replay_id={info.get('replay_id')} total_actions={info.get('total_actions')}") client.run() # Snapshot l'état AVANT cancel (sinon on voit toujours "cancelled") try: final_state = client.get_replay_status() except Exception: final_state = {} finally: # toujours annuler en sortie pour ne pas laisser un replay actif try: client.cancel_replay() except Exception: pass elapsed = time.time() - t_start print(client.render_report()) n_total = len(client.reports) n_ok = sum(1 for r in client.reports if r.status == "OK") n_skip = sum(1 for r in client.reports if r.status == "SKIP") n_paused = sum(1 for r in client.reports if r.status == "PAUSED") n_fail = sum(1 for r in client.reports if r.status == "FAIL") print( f"[e2e] {n_total} steps en {elapsed:.1f}s : " f"OK={n_ok} SKIP={n_skip} PAUSED={n_paused} FAIL={n_fail} " f"(resumes auto={client._resumes_done})" ) if final_state: print( f"[e2e] final replay status={final_state.get('status')} " f"completed={final_state.get('completed_actions')}/" f"{final_state.get('total_actions')} " f"failed={final_state.get('failed_actions')} " f"retried={final_state.get('retried_actions')}" ) for err in (final_state.get("error_log") or [])[-3:]: print(f" ERR action_id={err.get('action_id')} " f"error='{err.get('error')}' retry={err.get('retry_count')}") if args.export_expected: client.export_expected(args.export_expected) if args.expected: ok, total = client.compare_to_expected(args.expected) print(f"[e2e] comparaison attendus : {ok}/{total} steps matchent") if ok < total: return 1 return 1 if n_fail else 0 if __name__ == "__main__": sys.exit(main())