From 35fd6cf4c5582ff6f0ed2947f02a4f51fc84449b Mon Sep 17 00:00:00 2001 From: Dom Date: Thu, 7 May 2026 22:11:07 +0200 Subject: [PATCH] =?UTF-8?q?test(e2e):=20harness=20replay=20reproductible?= =?UTF-8?q?=20=E2=80=94=20mock=20client=20L=C3=A9a=20V1=20contre=20serveur?= =?UTF-8?q?=20r=C3=A9el?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Réduit le cycle debug d'un workflow de 1-2 min (replay manuel via Windows + Léa V1 + maquette) à ~2-5s (mock client Linux contre serveur de streaming localhost:5005). 30-60× plus rapide. Architecture : - tools/test_replay_e2e.py — harness CLI (~580 lignes), reproduit la chaîne réelle : VWB /api/v3/execute-windows → streaming /replay/raw → boucle /replay/next côté harness avec resolve_target sur un screenshot fixture → POST /replay/result. Pas de modification serveur. - tests/e2e/test_urgence_aiva_demo.py — wrapper pytest (smoke). - tests/e2e/urgence_aiva_demo_expected.yaml — référence générée par --export-expected, pour comparaison régression auto. - pytest.ini — ajout du marqueur e2e. Usage : python tools/test_replay_e2e.py --execution-mode autonomous --max-iter 120 --verbose python tools/test_replay_e2e.py --single-step 8 --shot FIXTURE.png python tools/test_replay_e2e.py --expected tests/e2e/urgence_aiva_demo_expected.yaml pytest tests/e2e -v -m e2e Sortie : tableau Markdown step × méthode × score × pos × status × diag. Limitations connues (extensions post-démo) : - Une seule fixture screenshot pour tout le replay → click_anchor réalistes échouent dès qu'on dépasse l'écran fixture. Carte step_id → fixture à venir. - extract_text/table/t2a_decision exécutés côté serveur, observables mais pas modifiables. - Pas de simulation screenshot_after → ReplayVerifier (Critic VLM) ne tourne pas. 
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.7 (1M context) --- pytest.ini | 1 + tests/e2e/__init__.py | 0 tests/e2e/test_urgence_aiva_demo.py | 118 ++++ tests/e2e/urgence_aiva_demo_expected.yaml | 81 +++ tools/test_replay_e2e.py | 812 ++++++++++++++++++++++ 5 files changed, 1012 insertions(+) create mode 100644 tests/e2e/__init__.py create mode 100644 tests/e2e/test_urgence_aiva_demo.py create mode 100644 tests/e2e/urgence_aiva_demo_expected.yaml create mode 100644 tools/test_replay_e2e.py diff --git a/pytest.ini b/pytest.ini index 28968cc62..8e073bd49 100644 --- a/pytest.ini +++ b/pytest.ini @@ -27,6 +27,7 @@ markers = fiche9: Tests Fiche #9 (postconditions retry backoff) fiche10: Tests Fiche #10 (precision metrics engine) visual: Tests visuels sur captures réelles (nécessite serveur GPU) + e2e: Tests E2E contre serveurs (streaming + VWB) actifs — lents, à lancer manuellement # Note: Chemins Python gérés par tests/conftest.py diff --git a/tests/e2e/__init__.py b/tests/e2e/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/e2e/test_urgence_aiva_demo.py b/tests/e2e/test_urgence_aiva_demo.py new file mode 100644 index 000000000..0f5394ae0 --- /dev/null +++ b/tests/e2e/test_urgence_aiva_demo.py @@ -0,0 +1,118 @@ +"""Tests E2E du workflow Urgence_aiva_demo via le harness mock client. + +Marqueurs : @pytest.mark.e2e @pytest.mark.slow +Pré-requis : streaming server (5005) + VWB (5002) actifs. + +Lancement : + pytest tests/e2e -v -m e2e + +Le test est un smoke check : il vérifie qu'on arrive à lancer un replay, +poller les actions et que le harness termine sans crash. Il n'exige PAS +que tous les steps réussissent (le screenshot fixture peut être obsolète). 
+""" +from __future__ import annotations + +from pathlib import Path + +import pytest +import requests + +from tools.test_replay_e2e import ( + ReplayMockClient, + _find_latest_heartbeat, + _load_token, + DEFAULT_BASE_URL, + DEFAULT_VWB_URL, +) + +WORKFLOW_ID = "wf_a38aeebea5e6_1778162737" # Urgence_aiva_demo + + +def _server_alive(url: str, timeout: float = 2.0) -> bool: + try: + resp = requests.get(f"{url}/health", timeout=timeout) + return resp.status_code == 200 + except Exception: + return False + + +def _vwb_alive(url: str, timeout: float = 2.0) -> bool: + try: + # VWB n'a pas /health, on tape /api/v3/session/state + resp = requests.get(f"{url}/api/v3/session/state", timeout=timeout) + return resp.status_code in (200, 404) + except Exception: + return False + + +@pytest.fixture(scope="module") +def streaming_url() -> str: + if not _server_alive(DEFAULT_BASE_URL): + pytest.skip(f"Streaming server inactif sur {DEFAULT_BASE_URL}") + return DEFAULT_BASE_URL + + +@pytest.fixture(scope="module") +def vwb_url() -> str: + if not _vwb_alive(DEFAULT_VWB_URL): + pytest.skip(f"VWB backend inactif sur {DEFAULT_VWB_URL}") + return DEFAULT_VWB_URL + + +@pytest.fixture(scope="module") +def heartbeat() -> str: + path = _find_latest_heartbeat() + if not path or not Path(path).exists(): + pytest.skip("Aucun heartbeat fixture disponible sur disque") + return path + + +@pytest.mark.e2e +@pytest.mark.slow +def test_urgence_aiva_demo_smoke(streaming_url, vwb_url, heartbeat): + """Smoke : lance et déroule le workflow Urgence_aiva_demo via le harness. 
+ + Vérifie que : + - le harness peut compiler et lancer le replay (pas d'exception réseau) + - au moins quelques steps sont reportés (la chaîne tourne) + - aucune exception non gérée n'est levée + """ + import time as _time + import uuid as _uuid + + ts = _time.strftime("%Y%m%dT%H%M%S") + client = ReplayMockClient( + base_url=streaming_url, + vwb_url=vwb_url, + token=_load_token(), + session_id=f"test_e2e_pytest_{ts}_{_uuid.uuid4().hex[:6]}", + machine_id=f"test_e2e_pytest_machine_{ts}", + screenshot_path=heartbeat, + verbose=False, + auto_resume=True, + execution_mode="autonomous", + timeout_poll=10.0, + single_step=None, + max_iter=80, + ) + + try: + client.cancel_stale_replays() + client.register_session() + info = client.start_replay(WORKFLOW_ID) + assert info.get("replay_id"), f"replay_id absent : {info}" + assert info.get("total_actions", 0) > 0 + client.run() + finally: + try: + client.cancel_replay() + except Exception: + pass + + # Le harness doit avoir produit au moins quelques rapports + assert len(client.reports) > 0, "Aucune action reportée — harness cassé ?" 
+ + # Le 1er step est un wait synthétique injecté par VWB → doit être OK + first = client.reports[0] + assert first.action_type == "wait", f"1er step inattendu : {first}" + assert first.status == "OK" diff --git a/tests/e2e/urgence_aiva_demo_expected.yaml b/tests/e2e/urgence_aiva_demo_expected.yaml new file mode 100644 index 000000000..cc85dba2a --- /dev/null +++ b/tests/e2e/urgence_aiva_demo_expected.yaml @@ -0,0 +1,81 @@ +workflow_session_id: test_e2e_sess_20260507T220822_c91f30 +screenshot: /home/dom/ai/rpa_vision_v3/data/training/live_sessions/bg_DESKTOP-58D5CAC_windows/shots/heartbeat_1773792436.png +steps: +- order: 1 + action_id: wait_before_start + action_type: wait + by_text: '' + method: simulated + score: 0.0 + x_pct: null + y_pct: null + status: OK + diag: wait simulé + elapsed_ms: 1.013040542602539 +- order: 2 + action_id: replay_free_74c2d90b + action_type: pause:user_request + by_text: '' + method: '' + score: 0.0 + x_pct: null + y_pct: null + status: PAUSED + diag: 'Léa : j''ai trouvé ces dossiers : []. 
Pour la démo je vais traiter MOREL + Catherin' + elapsed_ms: 0.0 +- order: 3 + action_id: step_288d0bceea90_1778162737752 + action_type: click + by_text: '25003284' + method: fallback + score: 0.0 + x_pct: 0.5 + y_pct: 0.5 + status: FAIL + diag: template_matching_failed + elapsed_ms: 1064.7194385528564 +- order: 4 + action_id: step_288d0bceea90_1778162737752_retry1 + action_type: click + by_text: '25003284' + method: fallback + score: 0.0 + x_pct: 0.5 + y_pct: 0.5 + status: FAIL + diag: template_matching_failed + elapsed_ms: 1075.0248432159424 +- order: 5 + action_id: wait_retry_381c1b + action_type: wait + by_text: '' + method: simulated + score: 0.0 + x_pct: null + y_pct: null + status: OK + diag: wait simulé + elapsed_ms: 12.79759407043457 +- order: 6 + action_id: step_288d0bceea90_1778162737752_retry2 + action_type: click + by_text: '25003284' + method: fallback + score: 0.0 + x_pct: 0.5 + y_pct: 0.5 + status: FAIL + diag: template_matching_failed + elapsed_ms: 1037.236213684082 +- order: 7 + action_id: step_288d0bceea90_1778162737752_retry3 + action_type: click + by_text: '25003284' + method: fallback + score: 0.0 + x_pct: 0.5 + y_pct: 0.5 + status: FAIL + diag: template_matching_failed + elapsed_ms: 1051.6366958618164 diff --git a/tools/test_replay_e2e.py b/tools/test_replay_e2e.py new file mode 100644 index 000000000..b20fd6c5f --- /dev/null +++ b/tools/test_replay_e2e.py @@ -0,0 +1,812 @@ +#!/usr/bin/env python3 +"""Harness E2E pour tester un replay sans Léa V1 / Windows. + +Mocque le client Léa V1 contre le serveur de streaming réel (port 5005). 
+Le harness compile le workflow via VWB (port 5002, /api/v3/execute-windows) +exactement comme le frontend, puis prend la place de l'agent Windows : +- boucle GET /replay/next (poll) +- résout les actions click_anchor via POST /replay/resolve_target avec un + screenshot fixture (heartbeat sur disque) +- POST /replay/result avec succès/échec +- gère pause_for_human (auto-resume ou stop selon mode) +- imprime un tableau Markdown des résolutions et compare à un YAML d'attendus + +Permet d'itérer en quelques secondes (vs 1-2 min de replay Windows réel) sur : +- modifications serveur (resolve_engine, replay_engine, validation OCR…) +- robustesse de la cascade visuelle sur un screenshot donné +- cas d'erreur (target_not_found, pause supervisée, retry). + +Usage standard (workflow Urgence_aiva_demo, screenshot le plus récent) : + + cd /home/dom/ai/rpa_vision_v3 && source .venv/bin/activate + python tools/test_replay_e2e.py \\ + --workflow-id wf_a38aeebea5e6_1778162737 \\ + --shot data/training/live_sessions/bg_DESKTOP-58D5CAC_windows/shots/heartbeat_1773792436.png + +Options : + --workflow-id ID workflow à rejouer (default Urgence_aiva_demo) + --shot PATH screenshot fixture (default: dernier heartbeat trouvé) + --expected YAML fichier attendus (compare step par step) + --export-expected PATH exporter le run en YAML/JSON d'attendus + --auto-resume auto-acquitter pause_for_human + --execution-mode autonomous|supervised (par défaut: autonomous) + --single-step N (debug) ne lancer que les N premières actions + --verbose logs détaillés HTTP + --timeout-poll SECONDS timeout par poll (default 8s) + --max-iter N garde-fou (default 200) + --vwb-url URL URL VWB (default http://localhost:5002) + +Sortie : +- tableau Markdown récapitulatif +- exit code 0 si tous les steps OK / 1 sinon + +Ne dépend PAS de Windows, ne modifie aucun fichier serveur. +Pré-requis : streaming server (5005) + VWB backend (5002) actifs. 
+""" +from __future__ import annotations + +import argparse +import base64 +import glob +import json +import os +import sys +import time +import uuid +from dataclasses import dataclass, asdict +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import requests + +# YAML est optionnel : si absent, on génère du JSON pour l'export d'attendus +try: + import yaml as _yaml +except ImportError: + _yaml = None + + +# ========================================================================== +# Configuration par défaut +# ========================================================================== +ROOT = Path(__file__).resolve().parent.parent +ENV_FILE = ROOT / ".env.local" +DEFAULT_BASE_URL = "http://localhost:5005" +DEFAULT_VWB_URL = "http://localhost:5002" +DEFAULT_HEARTBEAT_GLOB = str( + ROOT / "data" / "training" / "live_sessions" / "*" / "shots" / "heartbeat_*.png" +) +DEFAULT_HEARTBEAT_GLOB_BG = str( + ROOT / "data" / "training" / "live_sessions" / "bg_*" / "shots" / "heartbeat_*.png" +) + + +def _load_token() -> str: + """Lit RPA_API_TOKEN depuis l'env ou .env.local.""" + if "RPA_API_TOKEN" in os.environ and os.environ["RPA_API_TOKEN"]: + return os.environ["RPA_API_TOKEN"] + if ENV_FILE.exists(): + for line in ENV_FILE.read_text().splitlines(): + line = line.strip() + if line.startswith("RPA_API_TOKEN="): + return line.split("=", 1)[1].strip().strip('"').strip("'") + return "" + + +def _find_latest_heartbeat() -> Optional[str]: + """Cherche le dernier heartbeat sur disque utilisable comme fixture. + + Préfère les heartbeats `bg_*` (capturés en arrière-plan, pleine résolution) + aux heartbeats sess_* qui peuvent être tronqués (bug mss.monitors[1] + capturant la barre des tâches, cf. resolve_engine.py). + Filtre aussi sur la taille minimale (1200x800) pour ignorer les crops. 
+ """ + from PIL import Image + + def _is_full_size(path: str) -> bool: + try: + with Image.open(path) as im: + return im.width >= 1200 and im.height >= 800 + except Exception: + return False + + # 1. Chercher d'abord dans bg_* + bg_candidates = [ + f for f in glob.glob(DEFAULT_HEARTBEAT_GLOB_BG) + if "_blurred" not in f and os.path.isfile(f) + ] + bg_candidates = [f for f in bg_candidates if _is_full_size(f)] + if bg_candidates: + bg_candidates.sort(key=lambda f: os.path.getmtime(f), reverse=True) + return bg_candidates[0] + + # 2. Fallback sur sess_*, mais en filtrant les tronqués + other = [ + f for f in glob.glob(DEFAULT_HEARTBEAT_GLOB) + if "_blurred" not in f and os.path.isfile(f) + ] + other = [f for f in other if _is_full_size(f)] + if other: + other.sort(key=lambda f: os.path.getmtime(f), reverse=True) + return other[0] + return None + + +# ========================================================================== +# Modèles légers (pas d'import Pydantic pour rester rapide à charger) +# ========================================================================== +@dataclass +class StepReport: + order: int + action_id: str + action_type: str + by_text: str + method: str = "" + score: float = 0.0 + x_pct: Optional[float] = None + y_pct: Optional[float] = None + status: str = "?" 
# OK / FAIL / SKIP / PAUSED + diag: str = "" + elapsed_ms: float = 0.0 + + +# ========================================================================== +# Client mock +# ========================================================================== +class ReplayMockClient: + """Simule l'Agent V1 contre le serveur de streaming.""" + + def __init__( + self, + base_url: str, + vwb_url: str, + token: str, + session_id: str, + machine_id: str, + screenshot_path: str, + verbose: bool = False, + auto_resume: bool = True, + execution_mode: str = "autonomous", + timeout_poll: float = 8.0, + single_step: Optional[int] = None, + max_iter: int = 200, + ) -> None: + self.base_url = base_url.rstrip("/") + self.vwb_url = vwb_url.rstrip("/") + self.token = token + self.session_id = session_id + self.machine_id = machine_id + self.screenshot_path = screenshot_path + self.verbose = verbose + self.auto_resume = auto_resume + self.execution_mode = execution_mode + self.timeout_poll = timeout_poll + self.single_step = single_step + self.max_iter = max_iter + + self._session = requests.Session() + if token: + self._session.headers.update({"Authorization": f"Bearer {token}"}) + + # cache du screenshot encodé (gros) + self._screenshot_b64: Optional[str] = None + self._screen_w: int = 1920 + self._screen_h: int = 1080 + self._load_screenshot() + + self.replay_id: Optional[str] = None + self.reports: List[StepReport] = [] + self._action_counter = 0 + self._resumes_done = 0 + + # ---- helpers ------------------------------------------------------ + def _load_screenshot(self) -> None: + from PIL import Image # imported lazily + + with open(self.screenshot_path, "rb") as f: + data = f.read() + self._screenshot_b64 = base64.b64encode(data).decode("ascii") + with Image.open(self.screenshot_path) as img: + self._screen_w, self._screen_h = img.size + + def _log(self, msg: str) -> None: + if self.verbose: + ts = time.strftime("%H:%M:%S") + print(f"[{ts}] {msg}", flush=True) + + def _post(self, path: 
str, json_body: Dict[str, Any]) -> Dict[str, Any]: + url = f"{self.base_url}{path}" + if self.verbose: + self._log(f"POST {path} body={json.dumps(json_body)[:200]}") + resp = self._session.post(url, json=json_body, timeout=60) + if self.verbose: + self._log(f" → {resp.status_code} {resp.text[:300]}") + resp.raise_for_status() + return resp.json() if resp.text else {} + + def _get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + url = f"{self.base_url}{path}" + resp = self._session.get(url, params=params, timeout=self.timeout_poll) + resp.raise_for_status() + return resp.json() if resp.text else {} + + # ---- lifecycle ---------------------------------------------------- + def cancel_stale_replays(self) -> None: + """Annule les replays running/paused pour cette machine, pour éviter les collisions.""" + try: + data = self._get("/api/v1/traces/stream/replays") + except Exception as e: + self._log(f"cancel_stale: get replays échoué : {e}") + return + for r in data.get("replays", []): + if r.get("machine_id") == self.machine_id and r.get("status") in ( + "running", "paused_need_help", + ): + rid = r.get("replay_id") + self._log(f"cancel stale replay {rid} (status={r.get('status')})") + try: + self._post(f"/api/v1/traces/stream/replay/{rid}/cancel", {}) + except Exception as e: + self._log(f"cancel {rid} échoué : {e}") + + def register_session(self) -> None: + """Enregistre la session de test côté serveur.""" + # POST /register avec session_id en query (pas JSON body) + url = f"{self.base_url}/api/v1/traces/stream/register" + resp = self._session.post( + url, + params={"session_id": self.session_id, "machine_id": self.machine_id}, + timeout=10, + ) + resp.raise_for_status() + self._log(f"session registered : {self.session_id} (machine={self.machine_id})") + + def start_replay(self, workflow_id: str) -> Dict[str, Any]: + """Lance un replay via la chaîne réelle VWB → /replay/raw. 
+ + On reproduit ce que fait le frontend (ExecutionControls.tsx) : + 1. GET /api/v3/workflow/{id} pour récupérer les steps + 2. POST /api/v3/execute-windows avec actions[] + session_id/machine_id + (VWB charge les ancres, mappe les types, et POST sur /replay/raw) + """ + # 1. Récupérer le workflow et ses steps depuis VWB + wf_url = f"{self.vwb_url}/api/v3/workflow/{workflow_id}" + resp = self._session.get(wf_url, timeout=15) + resp.raise_for_status() + wf_data = resp.json() + steps = ( + wf_data.get("steps") + or wf_data.get("workflow", {}).get("steps") + or [] + ) + if not steps: + raise RuntimeError( + f"Workflow {workflow_id} : aucune étape récupérée depuis VWB " + f"({wf_url})" + ) + self._log(f"workflow {workflow_id} : {len(steps)} steps récupérées") + + # 2. Construire le payload comme le frontend + actions = [] + for i, step in enumerate(steps): + anchor = step.get("anchor") or {} + actions.append({ + "action_id": step.get("id") or f"action_{i}", + "type": step.get("action_type"), + "parameters": step.get("parameters") or {}, + "anchor_id": anchor.get("id") if anchor else step.get("anchor_id"), + "order": i, + }) + + # 3. 
POST /api/v3/execute-windows (VWB compile + forward au streaming) + execute_url = f"{self.vwb_url}/api/v3/execute-windows" + body = { + "workflow_id": workflow_id, + "session_id": self.session_id, + "machine_id": self.machine_id, + "actions": actions, + "params": {"execution_mode": self.execution_mode}, + } + if self.verbose: + self._log(f"POST {execute_url} actions={len(actions)}") + resp = self._session.post(execute_url, json=body, timeout=60) + if resp.status_code != 200: + raise RuntimeError( + f"VWB execute-windows {resp.status_code} : {resp.text[:300]}" + ) + data = resp.json() + self.replay_id = data.get("replay_id") + return data + + def get_replay_status(self) -> Dict[str, Any]: + if not self.replay_id: + return {} + try: + return self._get(f"/api/v1/traces/stream/replay/{self.replay_id}") + except Exception: + return {} + + def cancel_replay(self) -> None: + if not self.replay_id: + return + try: + self._post(f"/api/v1/traces/stream/replay/{self.replay_id}/cancel", {}) + except Exception as e: + self._log(f"cancel replay échoué : {e}") + + def resume_replay(self) -> None: + """Auto-resume une pause (mode autonomous bypass mais supervised peut bloquer).""" + if not self.replay_id: + return + # Récupérer les checks à acquitter + ack: List[str] = [] + try: + state = self.get_replay_status() + for c in state.get("safety_checks") or []: + if c.get("required"): + ack.append(c.get("id")) + except Exception: + pass + body: Dict[str, Any] = {"acknowledged_check_ids": ack} + try: + self._post(f"/api/v1/traces/stream/replay/{self.replay_id}/resume", body) + self._resumes_done += 1 + self._log(f"resume OK (checks ack={ack})") + except Exception as e: + self._log(f"resume échoué : {e}") + + # ---- dispatch d'actions ------------------------------------------ + def resolve_target(self, target_spec: Dict[str, Any], strict: bool) -> Dict[str, Any]: + """Appelle /replay/resolve_target côté serveur avec le screenshot fixture.""" + body = { + "session_id": self.session_id, 
+ "screenshot_b64": self._screenshot_b64 or "", + "target_spec": target_spec or {}, + "fallback_x_pct": 0.5, + "fallback_y_pct": 0.5, + "screen_width": self._screen_w, + "screen_height": self._screen_h, + "strict_mode": strict, + } + return self._post("/api/v1/traces/stream/replay/resolve_target", body) + + def dispatch(self, action: Dict[str, Any]) -> StepReport: + """Simule l'exécution d'une action côté client et POST le résultat.""" + self._action_counter += 1 + action_id = action.get("action_id", f"unk_{self._action_counter}") + action_type = action.get("type", "?") + target_spec = action.get("target_spec") or {} + by_text = (target_spec.get("by_text") or "")[:40] + report = StepReport( + order=self._action_counter, + action_id=action_id, + action_type=action_type, + by_text=by_text, + ) + t0 = time.time() + + # ── Action visuelle : resolve_target puis renvoyer success ── + if action_type in ("click", "click_anchor", "double_click"): + try: + res = self.resolve_target(target_spec, strict=bool(action.get("success_strict"))) + report.method = res.get("method", "?") + report.score = float(res.get("score") or 0.0) + report.x_pct = res.get("x_pct") + report.y_pct = res.get("y_pct") + resolved = bool(res.get("resolved")) + if not resolved: + report.status = "FAIL" + report.diag = res.get("reason", res.get("method", ""))[:80] + self._post_result( + action_id, + success=False, + error=f"resolve_failed:{report.method}", + actual_position=None, + resolution_method=report.method, + resolution_score=report.score, + resolution_elapsed_ms=res.get("elapsed_ms"), + target_spec=target_spec, + target_description=by_text, + ) + else: + report.status = "OK" + self._post_result( + action_id, + success=True, + actual_position={"x_pct": report.x_pct, "y_pct": report.y_pct}, + resolution_method=report.method, + resolution_score=report.score, + resolution_elapsed_ms=res.get("elapsed_ms"), + ) + except Exception as e: + report.status = "FAIL" + report.diag = f"client_error:{e}"[:80] + 
self._post_result(action_id, success=False, error=str(e)[:200]) + # ── Type texte / shortcut clavier / wait : on simule succès ── + elif action_type in ("type_text", "type", "keyboard_shortcut", "key_combo", "wait"): + report.status = "OK" + report.method = "simulated" + report.diag = f"{action_type} simulé" + self._post_result(action_id, success=True) + # ── Actions serveur (extract_text/table, t2a_decision) : + # ne devraient PAS arriver côté client (le serveur les exécute en + # interne dans /replay/next). On marque SKIP pour traçabilité. + elif action_type in ("extract_text", "extract_table", "t2a_decision"): + report.status = "SKIP" + report.method = "server_side" + report.diag = "(action serveur, exécutée en interne)" + else: + report.status = "OK" + report.method = "noop" + report.diag = f"action {action_type} non gérée → success simulé" + self._post_result(action_id, success=True) + + report.elapsed_ms = (time.time() - t0) * 1000 + self.reports.append(report) + return report + + def _post_result( + self, + action_id: str, + success: bool, + error: Optional[str] = None, + warning: Optional[str] = None, + actual_position: Optional[Dict[str, float]] = None, + resolution_method: Optional[str] = None, + resolution_score: Optional[float] = None, + resolution_elapsed_ms: Optional[float] = None, + target_spec: Optional[Dict[str, Any]] = None, + target_description: Optional[str] = None, + ) -> None: + body: Dict[str, Any] = { + "session_id": self.session_id, + "action_id": action_id, + "success": success, + } + if error: + body["error"] = error + if warning: + body["warning"] = warning + if actual_position: + body["actual_position"] = actual_position + if resolution_method: + body["resolution_method"] = resolution_method + if resolution_score is not None: + body["resolution_score"] = float(resolution_score) + if resolution_elapsed_ms is not None: + body["resolution_elapsed_ms"] = float(resolution_elapsed_ms) + # Pour ne pas que le verifier ouvre un Critic VLM 
(lent), on n'envoie + # PAS de screenshot_before/after (l'action sera marquée comme non + # vérifiée mais avancera quand même). + if target_spec: + body["target_spec"] = target_spec + if target_description: + body["target_description"] = target_description + try: + self._post("/api/v1/traces/stream/replay/result", body) + except Exception as e: + self._log(f"POST result échoué (action {action_id}) : {e}") + + # ---- main loop ---------------------------------------------------- + def run(self) -> None: + iter_count = 0 + last_paused_logged = "" + empty_polls = 0 + while iter_count < self.max_iter: + iter_count += 1 + try: + resp = self._get( + "/api/v1/traces/stream/replay/next", + params={ + "session_id": self.session_id, + "machine_id": self.machine_id, + }, + ) + except requests.exceptions.RequestException as e: + self._log(f"poll {iter_count} : erreur réseau {e}, retry dans 1s") + time.sleep(1) + continue + + # Pause supervisée (paused_need_help) ? + if resp.get("replay_paused"): + msg = (resp.get("pause_message") or "")[:120] + + # Distinguer pause volontaire (user_request, safety_checks) vs + # pause d'échec (target_not_found, wrong_window, system_dialog). + # Pour les pauses d'échec, l'auto-resume relance la même action + # qui échouera encore — on ne resume qu'une fois max pour ne + # pas boucler infiniment. 
+ state = self.get_replay_status() + failed = state.get("failed_action") or {} + pause_reason = failed.get("reason") or "" + is_failure_pause = pause_reason in ( + "target_not_found", "wrong_window", "system_dialog", + ) + + if msg != last_paused_logged: + self._log(f"PAUSE ({pause_reason or 'user'}) : {msg}") + last_paused_logged = msg + + # Marquer le report comme PAUSED (une seule fois) + if not self.reports or self.reports[-1].status != "PAUSED": + self._action_counter += 1 + self.reports.append( + StepReport( + order=self._action_counter, + action_id=resp.get("replay_id", "?"), + action_type=f"pause:{pause_reason or 'user'}", + by_text=(failed.get("target_description") or "")[:32], + status="PAUSED", + diag=msg[:80], + ) + ) + + if not self.auto_resume: + self._log("--auto-resume désactivé : on stoppe.") + break + + if is_failure_pause and self._resumes_done > 5: + self._log( + f"Trop de resumes ({self._resumes_done}) sur des " + f"pauses d'échec — stop pour éviter la boucle." + ) + break + + time.sleep(0.5) + self.resume_replay() + last_paused_logged = "" + continue + + action = resp.get("action") + if action is None: + # Pas d'action en attente : peut-être terminé, peut-être server_busy + if resp.get("server_busy"): + time.sleep(0.5) + continue + state = self.get_replay_status() + status = state.get("status", "?") + if status in ("completed", "cancelled", "error", "failed"): + self._log(f"replay terminé status={status}") + break + empty_polls += 1 + if empty_polls > 30: # 30 polls vides = ~30s : on lève le doute + self._log("Trop de polls vides, on stoppe.") + break + time.sleep(0.5) + continue + empty_polls = 0 + self.dispatch(action) + + if self.single_step is not None and self._action_counter >= self.single_step: + self._log(f"--single-step {self.single_step} atteint, stop.") + break + + if iter_count >= self.max_iter: + self._log(f"WARN : max_iter ({self.max_iter}) atteint.") + + # Réconciliation : récupérer les actions exécutées côté serveur + # 
(extract_text, extract_table, t2a_decision) qui ne sont jamais + # passées par /replay/next côté client. + try: + state = self.get_replay_status() + seen_ids = {r.action_id for r in self.reports} + for res in state.get("results") or []: + aid = res.get("action_id") + if aid in seen_ids: + continue + # Heuristique : ce sont des actions serveur non vues + ok = bool(res.get("success")) + self._action_counter += 1 + self.reports.append(StepReport( + order=self._action_counter, + action_id=aid or "?", + action_type="(server)", + by_text="", + method="server_side", + status="OK" if ok else "FAIL", + diag=(res.get("error") or "")[:60], + )) + except Exception as e: + self._log(f"reconciliation skipped : {e}") + + # ---- rapport ------------------------------------------------------ + def render_report(self) -> str: + out: List[str] = [] + out.append("") + out.append("| # | Type | by_text | Méthode | Score | Pos résolue | Status | Diag |") + out.append("|----|------------------|----------------------------------|----------------------|-------|----------------------|---------|------|") + for r in self.reports: + pos = ( + f"({r.x_pct:.4f}, {r.y_pct:.4f})" + if r.x_pct is not None and r.y_pct is not None + else "-" + ) + score = f"{r.score:.2f}" if r.method else "-" + out.append( + f"| {r.order:<2} | {r.action_type:<16} | {r.by_text[:32]:<32} | " + f"{r.method[:20]:<20} | {score:<5} | {pos:<20} | {r.status:<7} | {r.diag[:60]} |" + ) + out.append("") + return "\n".join(out) + + def export_expected(self, path: Path) -> None: + """Sérialise les résolutions actuelles comme attendus de référence.""" + data = { + "workflow_session_id": self.session_id, + "screenshot": str(self.screenshot_path), + "steps": [asdict(r) for r in self.reports], + } + if path.suffix in (".yaml", ".yml") and _yaml is not None: + path.write_text(_yaml.safe_dump(data, sort_keys=False, allow_unicode=True)) + else: + # fallback JSON + path.write_text(json.dumps(data, indent=2, ensure_ascii=False)) + 
self._log(f"Attendus exportés vers {path}") + + def compare_to_expected(self, expected_path: Path) -> Tuple[int, int]: + """Compare reports vs attendus. Retourne (matching, total).""" + if not expected_path.exists(): + print(f"[expected] fichier introuvable : {expected_path}") + return (0, len(self.reports)) + if expected_path.suffix in (".yaml", ".yml") and _yaml is not None: + expected = _yaml.safe_load(expected_path.read_text()) + else: + expected = json.loads(expected_path.read_text()) + steps = expected.get("steps") or [] + ok = 0 + for actual, exp in zip(self.reports, steps): + same_method = (actual.method == exp.get("method", "")) or ( + actual.method.startswith("hybrid_") and exp.get("method", "").startswith("hybrid_") + ) + same_status = actual.status == exp.get("status", "") + if same_method and same_status: + ok += 1 + return (ok, len(steps) if steps else len(self.reports)) + + +# ========================================================================== +# CLI +# ========================================================================== +def main(argv: Optional[List[str]] = None) -> int: + parser = argparse.ArgumentParser( + description="Harness E2E pour rejouer un workflow contre le serveur sans Léa V1." 
def main(argv: Optional[List[str]] = None) -> int:
    """Run one replay from the command line and return the process exit code.

    Parses the CLI options, resolves the API token and screenshot fixture,
    builds a ``ReplayMockClient``, checks the server ``/health`` endpoint,
    runs the replay, prints the report and summary, and optionally exports
    or compares the results.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        0 when no step has status ``FAIL`` (and, with ``--expected``, every
        expected step matched); 1 when a step failed or the comparison found
        mismatches; 2 when the screenshot fixture cannot be found; 3 when the
        health request raised.
    """
    cli = argparse.ArgumentParser(
        description="Harness E2E pour rejouer un workflow contre le serveur sans Léa V1."
    )
    opt = cli.add_argument
    opt("--workflow-id", default="wf_a38aeebea5e6_1778162737",
        help="ID du workflow (default: Urgence_aiva_demo)")
    opt("--shot", default=None,
        help="Path screenshot fixture (default: dernier heartbeat)")
    opt("--base-url", default=DEFAULT_BASE_URL,
        help="URL streaming server (default 5005)")
    opt("--vwb-url", default=DEFAULT_VWB_URL,
        help="URL VWB backend (default 5002)")
    opt("--token", default=None,
        help="RPA_API_TOKEN (default: lit .env.local)")
    opt("--session-id", default=None,
        help="(default: test_e2e_)")
    opt("--machine-id", default=None,
        help="(default: test_e2e_machine_)")
    opt("--auto-resume", action="store_true",
        help="auto-acquitter pause_for_human")
    opt("--no-auto-resume", action="store_true",
        help="stop dès qu'une pause est rencontrée")
    opt("--execution-mode", choices=("autonomous", "supervised"),
        default="autonomous")
    opt("--single-step", type=int, default=None)
    opt("--verbose", action="store_true")
    opt("--timeout-poll", type=float, default=8.0)
    opt("--max-iter", type=int, default=200)
    opt("--export-expected", type=Path, default=None,
        help="Exporter le run en YAML/JSON d'attendus")
    opt("--expected", type=Path, default=None,
        help="Comparer le run à ce YAML/JSON d'attendus")
    args = cli.parse_args(argv)

    # A missing token only produces a warning; the run continues without it.
    token = args.token or _load_token()
    if not token:
        print("WARN : pas de RPA_API_TOKEN trouvé.", file=sys.stderr)

    # The fixture must exist on disk before anything is sent to the server.
    shot = args.shot or _find_latest_heartbeat()
    if not (shot and os.path.isfile(shot)):
        print(f"ERREUR : screenshot introuvable ({shot})", file=sys.stderr)
        return 2

    stamp = time.strftime("%Y%m%dT%H%M%S")
    session_id = args.session_id or f"test_e2e_sess_{stamp}_{uuid.uuid4().hex[:6]}"
    machine_id = args.machine_id or f"test_e2e_machine_{stamp}"

    # Auto-resume is on by default; --no-auto-resume turns it off unless
    # --auto-resume is also given, in which case it stays on.
    auto_resume = args.auto_resume or not args.no_auto_resume

    banner = (
        ("base_url", args.base_url),
        ("workflow_id", args.workflow_id),
        ("shot", shot),
        ("session_id", session_id),
        ("machine_id", machine_id),
    )
    for label, value in banner:
        print(f"[e2e] {label}={value}")
    print(f"[e2e] mode={args.execution_mode} auto_resume={auto_resume}")

    settings = dict(
        base_url=args.base_url,
        vwb_url=args.vwb_url,
        token=token,
        session_id=session_id,
        machine_id=machine_id,
        screenshot_path=shot,
        verbose=args.verbose,
        auto_resume=auto_resume,
        execution_mode=args.execution_mode,
        timeout_poll=args.timeout_poll,
        single_step=args.single_step,
        max_iter=args.max_iter,
    )
    client = ReplayMockClient(**settings)

    # Health check: any exception (network or JSON decoding) aborts the run.
    health_url = f"{args.base_url}/health"
    try:
        health = requests.get(health_url, timeout=3).json()
        if health.get("status") != "healthy":
            print(f"WARN : serveur health={health}")
    except Exception as exc:
        print(f"ERREUR : serveur injoignable sur {args.base_url} ({exc})", file=sys.stderr)
        return 3

    client.cancel_stale_replays()
    client.register_session()

    started_at = time.time()
    final_state: Dict[str, Any] = {}
    try:
        info = client.start_replay(args.workflow_id)
        replay_id = info.get('replay_id')
        total_actions = info.get('total_actions')
        print(f"[e2e] replay_id={replay_id} total_actions={total_actions}")
        client.run()
        # Capture the status BEFORE cancelling, otherwise it always reads
        # "cancelled".
        try:
            final_state = client.get_replay_status()
        except Exception:
            final_state = {}
    finally:
        # Always cancel on the way out so no replay is left active.
        try:
            client.cancel_replay()
        except Exception:
            pass

    elapsed = time.time() - started_at
    print(client.render_report())

    def count_status(label: str) -> int:
        # Number of step reports whose status equals ``label``.
        return sum(1 for rep in client.reports if rep.status == label)

    n_total = len(client.reports)
    n_ok = count_status("OK")
    n_skip = count_status("SKIP")
    n_paused = count_status("PAUSED")
    n_fail = count_status("FAIL")
    print(
        f"[e2e] {n_total} steps en {elapsed:.1f}s : "
        f"OK={n_ok} SKIP={n_skip} PAUSED={n_paused} FAIL={n_fail} "
        f"(resumes auto={client._resumes_done})"
    )

    if final_state:
        status = final_state.get('status')
        completed = final_state.get('completed_actions')
        planned = final_state.get('total_actions')
        failed = final_state.get('failed_actions')
        retried = final_state.get('retried_actions')
        print(
            f"[e2e] final replay status={status} "
            f"completed={completed}/"
            f"{planned} "
            f"failed={failed} "
            f"retried={retried}"
        )
        # Only the last three entries of the error log are shown.
        recent_errors = (final_state.get("error_log") or [])[-3:]
        for err in recent_errors:
            print(f" ERR action_id={err.get('action_id')} "
                  f"error='{err.get('error')}' retry={err.get('retry_count')}")

    if args.export_expected:
        client.export_expected(args.export_expected)

    if args.expected:
        ok, total = client.compare_to_expected(args.expected)
        print(f"[e2e] comparaison attendus : {ok}/{total} steps matchent")
        if ok < total:
            return 1

    if n_fail:
        return 1
    return 0


if __name__ == "__main__":
    sys.exit(main())