"""Tests Job 1 — préflight replay non destructif. Vérifie les fonctions pures du préflight (`_detect_dialogs_static`, `_build_preflight_report`) et la garantie de non-destruction (aucune mutation des états replay). Le préflight prouve `commande → workflow → actions non vides → dialogues statiquement détectables` SANS injecter d'action ni poser de lock. """ import os import sys from pathlib import Path ROOT = Path(__file__).resolve().parents[2] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) os.environ.setdefault("RPA_AUTH_DISABLED", "true") from agent_v0.server_v1 import api_stream # --- Workflow dict minimal avec un node dialogue "Enregistrer sous" --- def _wf_with_save_dialog(): return { "name": "Bloc-notes save test", "nodes": [ {"node_id": "n1", "template": {"window": {"title_contains": "Bloc-notes"}}}, {"node_id": "n2", "template": {"window": {"title_contains": "Enregistrer sous"}}}, ], } def _wf_without_dialog(): return { "name": "Explorateur simple", "nodes": [ {"node_id": "n1", "template": {"window": {"title_contains": "Explorateur"}}}, ], } _ACTIONS = [ {"type": "key_combo", "keys": "ctrl+s"}, {"type": "text_input", "text": "test_lea.txt"}, {"type": "click", "x_pct": 0.45, "y_pct": 0.61}, ] def test_detect_dialogs_static_finds_save_as(): dialogs = api_stream._detect_dialogs_static(_wf_with_save_dialog()) assert any("enregistrer sous" in d.lower() for d in dialogs) def test_detect_dialogs_static_empty_when_none(): assert api_stream._detect_dialogs_static(_wf_without_dialog()) == [] def test_build_preflight_report_shape(): report = api_stream._build_preflight_report( _wf_with_save_dialog(), "wf_123", _ACTIONS ) assert report["workflow_known"] is True assert report["workflow_id"] == "wf_123" assert report["n_actions"] == 3 assert report["action_types"]["key_combo"] == 1 assert report["action_types"]["click"] == 1 assert report["non_destructive"] is True assert any("enregistrer sous" in d.lower() for d in report["dialogs_detected"]) # sample_actions limité (≤3) et présent assert 0 < len(report["sample_actions"]) <= 3 def test_build_preflight_report_does_not_mutate_replay_state(): before_q = dict(api_stream._replay_queues) before_s = dict(api_stream._replay_states) api_stream._build_preflight_report(_wf_with_save_dialog(), "wf_123", _ACTIONS) assert dict(api_stream._replay_queues) == before_q assert dict(api_stream._replay_states) == before_s