Files
rpa_vision_v3/tests/integration/test_stream_processor.py
Dom 7df51d2c79 snapshot: WIP 5j replay reliability (B1 watchdog + dialog handlers + grounding drift)
Snapshot avant correction du blocage relance Léa (3 incidents 24h: SSH refusé,
polls morts ×2). Point de rollback stable.

Contenu:
- agent_v1/core/executor.py: 5 patchs dialog handling (saveas drift, close_tab
  hotkey fallback, confirm_save Unicode apostrophe, foreground dialog
  recontextualization, runtime_dialog in-loop) + helpers normalize_window_hint,
  requires_post_verify_window_transition
- agent_v1/core/grounding.py: garde drift template fix (fallback_x/y plumbed)
- server_v1/replay_watchdog.py (NEW): orphan watchdog B1, scan 10s timeout 30s
- server_v1/api_stream.py: dispatched_action plumbing, watchdog lifespan,
  metrics endpoint
- server_v1/replay_engine.py: _schedule_retry préserve original_action +
  dispatched_action
- stream_processor.py: gardes _infer_tab_switch_target (no false switch_tab
  on save_as dialog open) + _attach_expected_window_before
- tests/integration: test_replay_watchdog.py (8 cas), test_stream_processor.py
- tests/unit: test_executor_verify_window_guard.py (start_button, close_tab,
  runtime_dialog, post_verify, transition fallbacks)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 16:48:37 +02:00

833 lines
32 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Tests d'intégration pour StreamProcessor + LiveSessionManager + StreamWorker.
Vérifie le pipeline complet : session → événements → screenshots → workflow.
Sans GPU/modèles lourds (mocks pour ScreenAnalyzer et CLIP).
"""
import json
import os
import shutil
import sys
import tempfile
import threading
from pathlib import Path
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
# Make sure the project root (the third ancestor of this file's path) is on
# sys.path; the original author notes this is required for the
# agent_v0.server_v1 imports performed lazily inside the tests below.
_ROOT = str(Path(__file__).resolve().parents[2])
if _ROOT not in sys.path:
    # Prepend so the checkout under test wins over any other entry.
    sys.path.insert(0, _ROOT)
@pytest.fixture
def temp_dir():
    """Yield the path of a fresh scratch directory, then delete it.

    The directory is created with the ``test_stream_`` prefix and removed
    (errors ignored) once the test that requested it has finished.
    """
    scratch_path = tempfile.mkdtemp(prefix="test_stream_")
    yield scratch_path
    shutil.rmtree(scratch_path, ignore_errors=True)
@pytest.fixture
def processor(temp_dir):
    """Return a StreamProcessor whose ``data_dir`` is the per-test scratch dir."""
    # Imported lazily so collecting this module does not load the server package.
    from agent_v0.server_v1 import stream_processor

    return stream_processor.StreamProcessor(data_dir=temp_dir)
@pytest.fixture
def worker(temp_dir, processor):
    """Return a StreamWorker watching the scratch dir and bound to ``processor``."""
    # Imported lazily so collecting this module does not load the server package.
    from agent_v0.server_v1 import worker_stream

    return worker_stream.StreamWorker(live_dir=temp_dir, processor=processor)
# =========================================================================
# LiveSessionManager
# =========================================================================
class TestLiveSessionManager:
    """Checks of LiveSessionManager bookkeeping, session discovery and lookup."""

    @staticmethod
    def _new_manager(**options):
        """Lazily import LiveSessionManager and build one with ``options``."""
        from agent_v0.server_v1.live_session_manager import LiveSessionManager

        return LiveSessionManager(**options)

    def test_register_and_get(self):
        """get_session returns the very object produced by register_session."""
        manager = self._new_manager()
        created = manager.register_session("sess_001")
        assert created.session_id == "sess_001"
        assert manager.get_session("sess_001") is created

    def test_get_or_create(self):
        """Two get_or_create calls with the same id yield the same object."""
        manager = self._new_manager()
        first = manager.get_or_create("sess_002")
        second = manager.get_or_create("sess_002")
        assert first is second

    def test_add_event_updates_window_info(self):
        """Adding an event records it and refreshes last_window_info."""
        manager = self._new_manager()
        click_event = {
            "type": "mouse_click",
            "window": {"title": "Firefox", "app_name": "firefox"},
        }
        manager.add_event("sess_003", click_event)
        stored = manager.get_session("sess_003")
        assert stored.last_window_info["title"] == "Firefox"
        assert len(stored.events) == 1

    def test_add_screenshot(self):
        """A screenshot path is retrievable from shot_paths by its shot id."""
        manager = self._new_manager()
        manager.add_screenshot("sess_004", "shot_001", "/tmp/shot_001.png")
        stored = manager.get_session("sess_004")
        assert stored.shot_paths["shot_001"] == "/tmp/shot_001.png"

    def test_finalize(self):
        """finalize returns the session with its ``finalized`` flag set."""
        manager = self._new_manager()
        manager.register_session("sess_005")
        closed = manager.finalize("sess_005")
        assert closed.finalized is True

    def test_active_session_count(self, tmp_path):
        """Finalizing a session removes it from the active count."""
        manager = self._new_manager(persist_dir=str(tmp_path / "test_sessions"))
        manager.register_session("a")
        manager.register_session("b")
        assert manager.active_session_count == 2
        manager.finalize("a")
        assert manager.active_session_count == 1

    def test_to_raw_session(self):
        """to_raw_session exports events and only the non-crop screenshot."""
        manager = self._new_manager()
        manager.add_event("sess_006", {"type": "click", "timestamp": 1000})
        manager.add_screenshot("sess_006", "shot_full_001", "/tmp/full.png")
        manager.add_screenshot("sess_006", "shot_001_crop", "/tmp/crop.png")
        exported = manager.to_raw_session("sess_006")
        assert exported is not None
        assert exported["session_id"] == "sess_006"
        assert len(exported["events"]) == 1
        # Crop screenshots are filtered out of the export.
        exported_shots = exported["screenshots"]
        assert len(exported_shots) == 1
        assert exported_shots[0]["screenshot_id"] == "shot_full_001"

    def test_discovers_bg_session_machine_id_from_root_folder(self, tmp_path):
        """A ``bg_<machine>`` folder on disk yields a session with that machine id."""
        live_root = tmp_path / "live_sessions"
        bg_folder = live_root / "bg_DESKTOP-58D5CAC_windows"
        bg_folder.mkdir(parents=True)
        (bg_folder / "live_events.jsonl").write_text("{}", encoding="utf-8")
        manager = self._new_manager(
            persist_dir=str(tmp_path / "persist"),
            live_sessions_dir=str(live_root),
        )
        discovered = manager.get_session("bg_DESKTOP-58D5CAC_windows")
        assert discovered is not None
        assert discovered.machine_id == "DESKTOP-58D5CAC_windows"

    def test_loads_persisted_bg_session_with_machine_id_inferred(self, tmp_path):
        """A persisted bg session stored with machine_id "default" gets it inferred."""
        store = tmp_path / "persist"
        store.mkdir()
        persisted_file = store / "bg_DESKTOP-58D5CAC_windows.json"
        persisted_file.write_text(
            '{"session_id":"bg_DESKTOP-58D5CAC_windows","machine_id":"default",'
            '"events":[],"shot_paths":{},"last_window_info":{"title":"Unknown","app_name":"unknown"},'
            '"created_at":"2026-05-20T14:00:00","last_activity":"2026-05-20T14:00:00",'
            '"finalized":false,"window_titles_seen":{},"app_names_seen":{}}',
            encoding="utf-8",
        )
        manager = self._new_manager(persist_dir=str(store))
        loaded = manager.get_session("bg_DESKTOP-58D5CAC_windows")
        assert loaded is not None
        assert loaded.machine_id == "DESKTOP-58D5CAC_windows"

    def test_find_active_agent_session_falls_back_to_bg_machine_session(self, tmp_path):
        """With the agent session finalized, the bg session of the machine is chosen."""
        from agent_v0.server_v1.live_session_manager import LiveSessionManager
        from agent_v0.server_v1.replay_engine import _find_active_agent_session

        machine = "DESKTOP-58D5CAC_windows"
        manager = LiveSessionManager(persist_dir=str(tmp_path / "persist"))
        manager.register_session(
            "sess_20260520T102916_066851",
            machine_id=machine,
        )
        manager.finalize("sess_20260520T102916_066851")
        manager.register_session("bg_DESKTOP-58D5CAC_windows")
        chosen = _find_active_agent_session(manager, machine_id=machine)
        assert chosen == "bg_DESKTOP-58D5CAC_windows"
# =========================================================================
# StreamProcessor
# =========================================================================
class TestStreamProcessor:
    """StreamProcessor behaviour plus regression tests for replay compilation."""

    # Window descriptors shared by the replay tests; helpers copy them so each
    # event still owns its own dict, as in hand-written literals.
    _NOTEPAD_TEST = {"title": "*test Bloc-notes", "app_name": "Notepad.exe"}
    _SAVE_AS = {"title": "Enregistrer sous", "app_name": "Notepad.exe"}

    @staticmethod
    def _prepare_replay(tmp_path, monkeypatch):
        """Create the session folder and stub the heavy enrichment helpers.

        Returns the ``stream_processor`` module and the session directory.
        """
        from agent_v0.server_v1 import stream_processor as sp

        session_dir = tmp_path / "sess"
        (session_dir / "shots").mkdir(parents=True)

        def _noop(*args, **kwargs):
            return None

        def _fake_enrich(*args, **kwargs):
            return {"anchor_image_base64": "abc123", "by_role": "yolo"}

        monkeypatch.setattr(sp, "_load_crop_for_event", _noop)
        monkeypatch.setattr(sp, "enrich_click_from_screenshot", _fake_enrich)
        monkeypatch.setattr(sp, "_attach_expected_screenshots", _noop)
        monkeypatch.setattr(sp, "_enrich_actions_with_intentions", _noop)
        monkeypatch.setattr(sp, "_unload_gemma4", _noop)
        return sp, session_dir

    @staticmethod
    def _click(timestamp, pos, shot_id, window, capture=None):
        """Build a wrapped left ``mouse_click`` raw event.

        ``capture`` is an optional ``(rect, click_relative, window_size)``
        triple; when omitted the event carries no ``window_capture`` key.
        """
        payload = {
            "type": "mouse_click",
            "timestamp": timestamp,
            "pos": list(pos),
            "button": "left",
            "screenshot_id": shot_id,
            "window": dict(window),
        }
        if capture is not None:
            rect, click_relative, window_size = capture
            payload["window_capture"] = {
                "rect": list(rect),
                "click_relative": list(click_relative),
                "window_size": list(window_size),
            }
        return {"event": payload}

    @staticmethod
    def _focus_change(timestamp, source, target):
        """Build a wrapped ``window_focus_change`` raw event."""
        return {"event": {
            "type": "window_focus_change",
            "timestamp": timestamp,
            "from": dict(source),
            "to": dict(target),
        }}

    @staticmethod
    def _hints(action):
        """Return the action's ``context_hints`` dict, or ``{}`` when absent/empty."""
        return action.get("target_spec", {}).get("context_hints") or {}

    def test_process_event(self, processor):
        """An event is acknowledged and updates the session's window info."""
        outcome = processor.process_event("sess_010", {
            "type": "mouse_click",
            "timestamp": 1234,
            "window": {"title": "Chrome", "app_name": "chrome"},
        })
        assert outcome["status"] == "event_recorded"
        stored = processor.session_manager.get_session("sess_010")
        assert stored.last_window_info["title"] == "Chrome"

    def test_process_crop(self, processor):
        """Storing a crop reports the ``crop_stored`` status."""
        outcome = processor.process_crop("sess_011", "shot_001_crop", "/tmp/crop.png")
        assert outcome["status"] == "crop_stored"

    def test_process_screenshot_no_analyzer(self, processor):
        """Without a ScreenAnalyzer, a minimal result is returned."""
        # Force the "initialized" state so no GPU models get loaded.
        processor._initialized = True
        processor._screen_analyzer = None
        processor._faiss_manager = None
        outcome = processor.process_screenshot("sess_012", "shot_001", "/tmp/full.png")
        assert outcome["shot_id"] == "shot_001"
        assert outcome["state_id"] is None  # No analysis was performed.
        assert outcome["ui_elements_count"] == 0

    @patch("agent_v0.server_v1.stream_processor.StreamProcessor._ensure_initialized")
    def test_process_screenshot_with_mock_analyzer(self, mock_init, processor):
        """With a mocked ScreenAnalyzer, the analysed state is reported and kept."""
        from core.models.screen_state import (
            ScreenState, WindowContext, RawLevel,
            PerceptionLevel, ContextLevel, EmbeddingRef,
        )

        window = WindowContext(
            app_name="test", window_title="Test", screen_resolution=[1920, 1080],
        )
        raw_level = RawLevel(
            screenshot_path="/tmp/test.png", capture_method="mss", file_size_bytes=0,
        )
        perception = PerceptionLevel(
            embedding=EmbeddingRef(provider="test", vector_id="v1", dimensions=512),
            detected_text=["Bonjour", "Valider"],
            text_detection_method="mock",
            confidence_avg=0.9,
        )
        analysed_state = ScreenState(
            screen_state_id="state_001",
            timestamp="2026-01-01T00:00:00",
            session_id="sess_013",
            window=window,
            raw=raw_level,
            perception=perception,
            context=ContextLevel(),
            ui_elements=[MagicMock(), MagicMock(), MagicMock()],
        )
        analyzer = MagicMock()
        analyzer.analyze.return_value = analysed_state
        processor._screen_analyzer = analyzer
        processor._faiss_manager = None
        processor._initialized = True

        outcome = processor.process_screenshot("sess_013", "shot_full", "/tmp/full.png")
        assert outcome["state_id"] == "state_001"
        assert outcome["ui_elements_count"] == 3
        assert outcome["text_detected"] == 2
        # The ScreenState is kept for the final build.
        assert len(processor._screen_states["sess_013"]) == 1

    def test_finalize_insufficient_data(self, processor):
        """Finalizing a session that has no data reports ``insufficient_data``."""
        processor._initialized = True
        processor.session_manager.register_session("sess_014")
        outcome = processor.finalize_session("sess_014")
        assert outcome["status"] == "insufficient_data"

    def test_stats(self, processor):
        """A fresh processor reports zero sessions/workflows and is uninitialized."""
        snapshot = processor.stats
        assert snapshot["active_sessions"] == 0
        assert snapshot["total_workflows"] == 0
        assert snapshot["initialized"] is False

    def test_build_replay_does_not_compile_save_dialog_open_as_switch_tab(
        self, tmp_path, monkeypatch,
    ):
        """A same-app `Enregistrer sous` dialog is not a tab.

        Live regression 2026-05-23: a menu click in Notepad was recompiled
        into a bogus `switch_tab`, which injected a stray click before the
        dialog was actually opened.
        """
        sp, session_dir = self._prepare_replay(tmp_path, monkeypatch)
        frame = [320, 520, 2240, 1636]
        size = [1920, 1116]
        events = [
            self._click(1.0, [820, 630], "shot_001", self._NOTEPAD_TEST,
                        capture=(frame, [500, 110], size)),
            self._click(1.2, [860, 562], "shot_002", self._NOTEPAD_TEST,
                        capture=(frame, [540, 40], size)),
            self._focus_change(1.35, self._NOTEPAD_TEST, self._SAVE_AS),
            self._click(1.6, [997, 743], "shot_003", self._SAVE_AS),
        ]
        actions = sp.build_replay_from_raw_events(
            events, session_id="sess_save_dialog", session_dir=str(session_dir),
        )
        clicks = [action for action in actions if action.get("type") == "click"]
        assert len(clicks) == 3
        for click in clicks:
            assert self._hints(click).get("interaction") != "switch_tab"
        assert clicks[1].get("expected_window_title") == "Enregistrer sous"
        assert clicks[2].get("expected_window_before") == "Enregistrer sous"

    def test_build_replay_tab_switch_focus_belongs_to_latest_click_only(
        self, tmp_path, monkeypatch,
    ):
        """The tab focus change must be attached to the last causal click."""
        sp, session_dir = self._prepare_replay(tmp_path, monkeypatch)
        source_tab = {
            "title": "http192.168.1.408765dossier.htmlid=.txt Bloc-notes",
            "app_name": "Notepad.exe",
        }
        target_tab = {
            "title": "Sans titre Bloc-notes",
            "app_name": "Notepad.exe",
        }
        frame = [323, 522, 2243, 1638]
        size = [1920, 1116]
        events = [
            self._click(1.0, [1410, 562], "shot_001", source_tab,
                        capture=(frame, [1087, 40], size)),
            self._click(1.1, [1514, 562], "shot_002", source_tab,
                        capture=(frame, [1191, 40], size)),
            self._focus_change(1.2, source_tab, target_tab),
        ]
        actions = sp.build_replay_from_raw_events(
            events,
            session_id="sess_intervening_click",
            session_dir=str(session_dir),
        )
        assert len(actions) == 2
        earlier, latest = actions
        earlier_hints = self._hints(earlier)
        latest_hints = self._hints(latest)
        assert earlier_hints.get("interaction") != "switch_tab"
        assert latest["target_spec"]["by_text"] == "Sans titre"
        assert latest["target_spec"]["by_role"] == "tab"
        assert latest_hints.get("interaction") == "switch_tab"

    def test_build_replay_infers_close_tab_before_save_dialog(
        self, tmp_path, monkeypatch,
    ):
        """Clicking the active tab's x must be given the close_tab semantics."""
        sp, session_dir = self._prepare_replay(tmp_path, monkeypatch)
        frame = [323, 522, 2243, 1638]
        size = [1920, 1116]
        events = [
            self._click(1.0, [1814, 560], "shot_001", self._NOTEPAD_TEST,
                        capture=(frame, [1491, 38], size)),
            self._click(1.3, [1183, 1156], "shot_002", self._NOTEPAD_TEST,
                        capture=(frame, [860, 634], size)),
            self._focus_change(1.5, self._NOTEPAD_TEST, self._SAVE_AS),
        ]
        actions = sp.build_replay_from_raw_events(
            events,
            session_id="sess_close_tab",
            session_dir=str(session_dir),
        )
        clicks = [action for action in actions if action.get("type") == "click"]
        assert len(clicks) == 2
        close_spec = clicks[0].get("target_spec", {})
        close_hints = close_spec.get("context_hints") or {}
        assert close_spec.get("by_role") == "tab_close_button"
        assert close_spec.get("by_text", "") == ""
        assert close_hints.get("interaction") == "close_tab"
        assert close_hints.get("active_tab_label") == "test"
        assert "fermer l'onglet actif 'test'" in close_spec.get("vlm_description", "")
# =========================================================================
# StreamWorker
# =========================================================================
class TestStreamWorker:
    """Checks of the StreamWorker direct-call API and its on-disk polling."""

    def test_process_event_direct(self, worker):
        """A directly submitted event is acknowledged as recorded."""
        result = worker.process_event_direct("sess_020", {"type": "click"})
        assert result["status"] == "event_recorded"

    def test_process_crop_direct(self, worker):
        """A directly submitted crop is acknowledged as stored."""
        result = worker.process_crop_direct("sess_021", "crop_001", "/tmp/crop.png")
        assert result["status"] == "crop_stored"

    def test_stats(self, worker):
        """The worker's stats expose an ``active_sessions`` entry."""
        stats = worker.stats
        assert "active_sessions" in stats

    def test_poll_reads_events_from_disk(self, worker, temp_dir):
        """The worker reads JSONL events from disk during a polling pass."""
        session_dir = Path(temp_dir) / "test_sess"
        session_dir.mkdir()
        event_file = session_dir / "live_events.jsonl"
        recorded = [
            {"type": "click", "timestamp": 100},
            {"type": "key_press", "keys": ["enter"], "timestamp": 200},
        ]
        # Explicit encoding: do not depend on the platform's locale default,
        # matching the other write_text calls in this module.
        event_file.write_text(
            "".join(json.dumps(event) + "\n" for event in recorded),
            encoding="utf-8",
        )
        # Simulate a single polling pass.
        worker._check_live_sessions()
        session = worker.processor.session_manager.get_session("test_sess")
        assert session is not None
        assert len(session.events) == 2
# =========================================================================
# GraphBuilder precomputed_states
# =========================================================================
class TestGraphBuilderPrecomputed:
    """Checks of GraphBuilder.build_from_session's ``precomputed_states`` path."""

    def test_accepts_precomputed_states(self):
        """build_from_session declares a ``precomputed_states`` parameter."""
        import inspect
        from core.graph.graph_builder import GraphBuilder

        parameters = inspect.signature(GraphBuilder.build_from_session).parameters
        assert "precomputed_states" in parameters

    def test_raises_without_screenshots_or_states(self):
        """ValueError when neither screenshots nor precomputed states exist."""
        from core.graph.graph_builder import GraphBuilder
        from core.models.raw_session import RawSession

        graph_builder = GraphBuilder(min_pattern_repetitions=2)
        empty_session = MagicMock(spec=RawSession)
        empty_session.screenshots = []
        empty_session.session_id = "empty"
        with pytest.raises(ValueError, match="no screenshots"):
            graph_builder.build_from_session(empty_session)

    def test_skips_screen_state_creation_with_precomputed(self):
        """With precomputed states, _create_screen_states is never called."""
        from core.graph.graph_builder import GraphBuilder
        from core.models.raw_session import RawSession

        graph_builder = GraphBuilder(min_pattern_repetitions=2)
        graph_builder._create_screen_states = MagicMock()
        # Stub the remaining pipeline stages.
        unit_vector = np.random.randn(512).astype(np.float32)
        unit_vector /= np.linalg.norm(unit_vector)
        graph_builder._compute_embeddings = MagicMock(
            return_value=[unit_vector, unit_vector]
        )
        graph_builder._detect_patterns = MagicMock(return_value={})
        graph_builder._build_nodes = MagicMock(return_value=[])
        graph_builder._build_edges = MagicMock(return_value=[])

        raw_session = MagicMock(spec=RawSession)
        raw_session.session_id = "precomp"
        raw_session.screenshots = []
        states = [MagicMock(), MagicMock()]
        graph_builder.build_from_session(raw_session, precomputed_states=states)

        # _create_screen_states must NOT have been called ...
        graph_builder._create_screen_states.assert_not_called()
        # ... and _compute_embeddings must receive the precomputed states.
        graph_builder._compute_embeddings.assert_called_once_with(states)
# =========================================================================
# Thread-safety de StreamProcessor
# =========================================================================
class TestStreamProcessorThreadSafety:
    """Checks that concurrent access to the internal dicts is lock-protected."""

    @staticmethod
    def _run_threads(target, labels):
        """Start one thread per label running ``target(label)``, then join all."""
        pool = [threading.Thread(target=target, args=(label,)) for label in labels]
        for thread in pool:
            thread.start()
        for thread in pool:
            thread.join()

    def test_has_data_lock(self, processor):
        """StreamProcessor owns a dedicated ``_data_lock``."""
        assert hasattr(processor, "_data_lock")
        lock_type = type(threading.Lock())
        assert isinstance(processor._data_lock, lock_type)

    def test_concurrent_screen_states_access(self, processor):
        """Concurrent writes to _screen_states raise no error."""
        processor._initialized = True
        processor._screen_analyzer = None
        failures = []

        def fill_states(session_id):
            try:
                for index in range(50):
                    with processor._data_lock:
                        bucket = processor._screen_states.setdefault(session_id, [])
                        bucket.append(f"state_{index}")
            except Exception as exc:
                failures.append(exc)

        self._run_threads(fill_states, [f"sess_{t}" for t in range(5)])
        assert len(failures) == 0
        assert len(processor._screen_states) == 5

    def test_concurrent_embeddings_access(self, processor):
        """Concurrent writes to _embeddings raise no error."""
        failures = []

        def fill_embeddings(session_id):
            try:
                for _ in range(50):
                    with processor._data_lock:
                        bucket = processor._embeddings.setdefault(session_id, [])
                        bucket.append(np.random.randn(512).astype(np.float32))
            except Exception as exc:
                failures.append(exc)

        self._run_threads(fill_embeddings, [f"sess_{t}" for t in range(5)])
        assert len(failures) == 0
        assert len(processor._embeddings) == 5

    def test_concurrent_workflows_access(self, processor):
        """Concurrent writes to _workflows raise no error."""
        failures = []

        def store_workflow(workflow_id):
            try:
                workflow = MagicMock()
                workflow.nodes = [1, 2]
                workflow.edges = [1]
                with processor._data_lock:
                    processor._workflows[workflow_id] = workflow
            except Exception as exc:
                failures.append(exc)

        self._run_threads(store_workflow, [f"wf_{t}" for t in range(10)])
        assert len(failures) == 0
        assert len(processor._workflows) == 10
# =========================================================================
# list_sessions / list_workflows
# =========================================================================
class TestStreamProcessorListMethods:
    """Tests for list_sessions() and list_workflows()."""

    def test_list_sessions_empty(self, processor):
        """A fresh processor lists no sessions."""
        assert processor.list_sessions() == []

    def test_list_sessions_with_data(self, processor):
        """The listing reports per-session counters and the finalized flag."""
        session_id = "sess_ls_1"
        manager = processor.session_manager
        manager.register_session(session_id)
        manager.add_event(session_id, {
            "type": "click",
            "window": {"title": "App", "app_name": "app"},
        })
        manager.add_screenshot(session_id, "shot_1", "/tmp/s.png")
        with processor._data_lock:
            processor._screen_states[session_id] = ["state_a", "state_b"]
            processor._embeddings[session_id] = [np.zeros(512)]

        listing = processor.list_sessions()
        assert len(listing) == 1
        entry = listing[0]
        assert entry["session_id"] == "sess_ls_1"
        assert entry["events_count"] == 1
        assert entry["screenshots_count"] == 1
        assert entry["states_count"] == 2
        assert entry["embeddings_count"] == 1
        assert entry["finalized"] is False

    def test_list_sessions_multiple(self, processor):
        """Both open and finalized sessions are listed with the right flag."""
        manager = processor.session_manager
        manager.register_session("a")
        manager.register_session("b")
        manager.finalize("b")

        listing = processor.list_sessions()
        assert len(listing) == 2
        indexed = {entry["session_id"]: entry for entry in listing}
        assert indexed["a"]["finalized"] is False
        assert indexed["b"]["finalized"] is True

    def test_list_workflows_empty(self, processor):
        """A fresh processor lists no workflows."""
        assert processor.list_workflows() == []

    def test_list_workflows_with_data(self, processor):
        """The listing reports the id, node/edge counts and name of a workflow."""
        workflow = MagicMock()
        workflow.nodes = [1, 2, 3]
        workflow.edges = [1, 2]
        workflow.name = "test_workflow"
        with processor._data_lock:
            processor._workflows["wf_001"] = workflow

        listing = processor.list_workflows()
        assert len(listing) == 1
        entry = listing[0]
        assert entry["workflow_id"] == "wf_001"
        assert entry["nodes"] == 3
        assert entry["edges"] == 2
        assert entry["name"] == "test_workflow"
# =========================================================================
# API endpoints (sessions / workflows)
# =========================================================================
class TestAPIEndpoints:
    """Tests for the GET sessions and workflows endpoints."""

    # Fixed test token used by every API test.
    # It must be set BEFORE agent_v0.server_v1.api_stream is first imported,
    # because (per the original author's note) that module fails closed
    # (sys.exit 1) when RPA_API_TOKEN is missing.
    _TEST_API_TOKEN = "test_token_for_api_endpoints_0123456789abcdef"

    @pytest.fixture(autouse=True)
    def _ensure_api_token(self, monkeypatch):
        """Guarantee RPA_API_TOKEN is set before api_stream gets imported.

        The environment variable is forced to the test token ahead of the
        lazy import done in the ``client`` fixture. If api_stream is already
        present in sys.modules (for instance loaded by an earlier test with
        another token), its ``API_TOKEN`` attribute is aligned as well so the
        Bearer requests issued by the tests pass authentication.
        """
        monkeypatch.setenv("RPA_API_TOKEN", self._TEST_API_TOKEN)
        api_stream_mod = sys.modules.get("agent_v0.server_v1.api_stream")
        if api_stream_mod is not None:
            monkeypatch.setattr(api_stream_mod, "API_TOKEN", self._TEST_API_TOKEN)

    @staticmethod
    def _auth(token):
        """Return the Authorization header dict for a Bearer ``token``."""
        return {"Authorization": f"Bearer {token}"}

    @pytest.fixture
    def client(self, temp_dir):
        """Yield ``(test_client, test_processor, token)`` for the FastAPI app.

        The module-level ``processor`` and ``worker`` of api_stream are
        swapped for test instances rooted in ``temp_dir`` and always put
        back afterwards.
        """
        from fastapi.testclient import TestClient
        from agent_v0.server_v1 import api_stream
        from agent_v0.server_v1.stream_processor import StreamProcessor
        from agent_v0.server_v1.worker_stream import StreamWorker

        # Build the replacements first: if a constructor raises, the module
        # globals have not been touched yet.
        test_processor = StreamProcessor(data_dir=temp_dir)
        test_worker = StreamWorker(live_dir=temp_dir, processor=test_processor)

        original_processor = api_stream.processor
        original_worker = api_stream.worker
        api_stream.processor = test_processor
        api_stream.worker = test_worker
        try:
            test_client = TestClient(api_stream.app, raise_server_exceptions=False)
            # Expose the API token so tests can send authenticated requests.
            yield test_client, test_processor, api_stream.API_TOKEN
        finally:
            # Restore the globals even when setup fails before the yield;
            # pytest skips post-yield teardown code in that case.
            api_stream.processor = original_processor
            api_stream.worker = original_worker

    def test_get_sessions_empty(self, client):
        """With no session registered, the endpoint returns an empty list."""
        c, _, token = client
        resp = c.get("/api/v1/traces/stream/sessions", headers=self._auth(token))
        assert resp.status_code == 200
        data = resp.json()
        assert data["sessions"] == []

    def test_get_sessions_with_data(self, client):
        """A registered session with one event is reported by the endpoint."""
        c, proc, token = client
        proc.session_manager.register_session("api_sess_1")
        proc.session_manager.add_event("api_sess_1", {"type": "click"})
        resp = c.get("/api/v1/traces/stream/sessions", headers=self._auth(token))
        assert resp.status_code == 200
        sessions = resp.json()["sessions"]
        assert len(sessions) == 1
        assert sessions[0]["session_id"] == "api_sess_1"
        assert sessions[0]["events_count"] == 1

    def test_get_workflows_empty(self, client):
        """With no workflow stored, the endpoint returns an empty list."""
        c, _, token = client
        resp = c.get("/api/v1/traces/stream/workflows", headers=self._auth(token))
        assert resp.status_code == 200
        data = resp.json()
        assert data["workflows"] == []

    def test_get_workflows_with_data(self, client):
        """A stored workflow is reported with its id and node count."""
        c, proc, token = client
        mock_wf = MagicMock()
        mock_wf.nodes = [1, 2]
        mock_wf.edges = [1]
        mock_wf.name = "api_test_wf"
        with proc._data_lock:
            proc._workflows["wf_api_001"] = mock_wf
        resp = c.get("/api/v1/traces/stream/workflows", headers=self._auth(token))
        assert resp.status_code == 200
        workflows = resp.json()["workflows"]
        assert len(workflows) == 1
        assert workflows[0]["workflow_id"] == "wf_api_001"
        assert workflows[0]["nodes"] == 2