Files
rpa_vision_v3/tests/integration/test_stream_processor.py

1292 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Tests d'intégration pour StreamProcessor + LiveSessionManager + StreamWorker.
Vérifie le pipeline complet : session → événements → screenshots → workflow.
Sans GPU/modèles lourds (mocks pour ScreenAnalyzer et CLIP).
"""
import json
import os
import shutil
import sys
import tempfile
import threading
import types
from pathlib import Path
from unittest.mock import MagicMock, patch
import numpy as np
import pytest
# Make sure the project root is on sys.path (needed for the relative
# imports of agent_v0.server_v1).
_ROOT = str(Path(__file__).resolve().parents[2])
if sys.path.count(_ROOT) == 0:
    # Prepend so the project root is searched before other entries.
    sys.path.insert(0, _ROOT)
@pytest.fixture
def temp_dir():
    """Yield the path of a fresh scratch directory, removed on teardown.

    The directory is created with ``tempfile.mkdtemp`` (prefix
    ``test_stream_``) and the path is yielded as a ``str``.
    """
    scratch_path = tempfile.mkdtemp(prefix="test_stream_")
    yield scratch_path
    # Best-effort cleanup: removal errors are ignored.
    shutil.rmtree(scratch_path, ignore_errors=True)
@pytest.fixture
def processor(temp_dir):
    """Return a StreamProcessor whose ``data_dir`` is the per-test scratch dir."""
    # Imported lazily so that collecting this module does not import the server package.
    from agent_v0.server_v1 import stream_processor

    return stream_processor.StreamProcessor(data_dir=temp_dir)
@pytest.fixture
def worker(temp_dir, processor):
    """Return a StreamWorker watching ``temp_dir`` and wired to the ``processor`` fixture."""
    from agent_v0.server_v1 import worker_stream

    return worker_stream.StreamWorker(live_dir=temp_dir, processor=processor)
# =========================================================================
# LiveSessionManager
# =========================================================================
class TestLiveSessionManager:
    """Tests of LiveSessionManager: registration, events, screenshots,
    finalization, raw-session export and handling of ``bg_*`` sessions.

    Every manager is created with a ``persist_dir`` located under pytest's
    ``tmp_path`` so that no test reads or writes persistence state shared with
    another test or another run.
    NOTE(review): this assumes the constructor's default ``persist_dir`` points
    to a shared location — confirm against LiveSessionManager.__init__.
    """

    @staticmethod
    def _new_manager(tmp_path, **kwargs):
        """Return a LiveSessionManager persisting under ``tmp_path / "persist"``.

        Extra keyword arguments are forwarded unchanged to the constructor.
        """
        from agent_v0.server_v1.live_session_manager import LiveSessionManager

        return LiveSessionManager(persist_dir=str(tmp_path / "persist"), **kwargs)

    def test_register_and_get(self, tmp_path):
        """A registered session is returned, by identity, by ``get_session``."""
        mgr = self._new_manager(tmp_path)
        s = mgr.register_session("sess_001")
        assert s.session_id == "sess_001"
        assert mgr.get_session("sess_001") is s

    def test_get_or_create(self, tmp_path):
        """``get_or_create`` returns the same object on repeated calls."""
        mgr = self._new_manager(tmp_path)
        s1 = mgr.get_or_create("sess_002")
        s2 = mgr.get_or_create("sess_002")
        assert s1 is s2

    def test_add_event_updates_window_info(self, tmp_path):
        """Adding an event records it and refreshes ``last_window_info``."""
        mgr = self._new_manager(tmp_path)
        mgr.add_event("sess_003", {
            "type": "mouse_click",
            "window": {"title": "Firefox", "app_name": "firefox"},
        })
        session = mgr.get_session("sess_003")
        assert session.last_window_info["title"] == "Firefox"
        assert len(session.events) == 1

    def test_add_screenshot(self, tmp_path):
        """A screenshot path is stored under its shot id."""
        mgr = self._new_manager(tmp_path)
        mgr.add_screenshot("sess_004", "shot_001", "/tmp/shot_001.png")
        session = mgr.get_session("sess_004")
        assert session.shot_paths["shot_001"] == "/tmp/shot_001.png"

    def test_finalize(self, tmp_path):
        """``finalize`` returns the session flagged as finalized."""
        mgr = self._new_manager(tmp_path)
        mgr.register_session("sess_005")
        session = mgr.finalize("sess_005")
        assert session.finalized is True

    def test_active_session_count(self, tmp_path):
        """Finalizing a session removes it from the active count."""
        mgr = self._new_manager(tmp_path)
        mgr.register_session("a")
        mgr.register_session("b")
        assert mgr.active_session_count == 2
        mgr.finalize("a")
        assert mgr.active_session_count == 1

    def test_to_raw_session(self, tmp_path):
        """``to_raw_session`` exports events and full screenshots only."""
        mgr = self._new_manager(tmp_path)
        mgr.add_event("sess_006", {"type": "click", "timestamp": 1000})
        mgr.add_screenshot("sess_006", "shot_full_001", "/tmp/full.png")
        mgr.add_screenshot("sess_006", "shot_001_crop", "/tmp/crop.png")
        raw = mgr.to_raw_session("sess_006")
        assert raw is not None
        assert raw["session_id"] == "sess_006"
        assert len(raw["events"]) == 1
        # Crops are filtered out of the export.
        assert len(raw["screenshots"]) == 1
        assert raw["screenshots"][0]["screenshot_id"] == "shot_full_001"

    def test_discovers_bg_session_machine_id_from_root_folder(self, tmp_path):
        """A ``bg_<machine>`` folder found under ``live_sessions_dir`` yields a
        session whose ``machine_id`` is the folder name without the ``bg_`` prefix."""
        live_dir = tmp_path / "live_sessions"
        session_dir = live_dir / "bg_DESKTOP-58D5CAC_windows"
        session_dir.mkdir(parents=True)
        (session_dir / "live_events.jsonl").write_text("{}", encoding="utf-8")
        mgr = self._new_manager(tmp_path, live_sessions_dir=str(live_dir))
        session = mgr.get_session("bg_DESKTOP-58D5CAC_windows")
        assert session is not None
        assert session.machine_id == "DESKTOP-58D5CAC_windows"

    def test_loads_persisted_bg_session_with_machine_id_inferred(self, tmp_path):
        """A persisted ``bg_*`` session stored with ``machine_id == "default"``
        is loaded with the machine id taken from its session id."""
        from agent_v0.server_v1.live_session_manager import LiveSessionManager

        # The persisted file must exist before the manager is constructed.
        persist_dir = tmp_path / "persist"
        persist_dir.mkdir()
        (persist_dir / "bg_DESKTOP-58D5CAC_windows.json").write_text(
            '{"session_id":"bg_DESKTOP-58D5CAC_windows","machine_id":"default",'
            '"events":[],"shot_paths":{},"last_window_info":{"title":"Unknown","app_name":"unknown"},'
            '"created_at":"2026-05-20T14:00:00","last_activity":"2026-05-20T14:00:00",'
            '"finalized":false,"window_titles_seen":{},"app_names_seen":{}}',
            encoding="utf-8",
        )
        mgr = LiveSessionManager(persist_dir=str(persist_dir))
        session = mgr.get_session("bg_DESKTOP-58D5CAC_windows")
        assert session is not None
        assert session.machine_id == "DESKTOP-58D5CAC_windows"

    def test_find_active_agent_session_falls_back_to_bg_machine_session(self, tmp_path):
        """When the machine's regular session is finalized, the lookup returns
        the ``bg_<machine>`` session instead."""
        from agent_v0.server_v1.replay_engine import _find_active_agent_session

        mgr = self._new_manager(tmp_path)
        mgr.register_session(
            "sess_20260520T102916_066851",
            machine_id="DESKTOP-58D5CAC_windows",
        )
        mgr.finalize("sess_20260520T102916_066851")
        mgr.register_session("bg_DESKTOP-58D5CAC_windows")
        active = _find_active_agent_session(mgr, machine_id="DESKTOP-58D5CAC_windows")
        assert active == "bg_DESKTOP-58D5CAC_windows"
# =========================================================================
# StreamProcessor
# =========================================================================
class TestStreamProcessor:
    """Integration tests for StreamProcessor, the VLM worker health/watchdog
    hooks of ``run_worker`` and the replay builder
    (``build_replay_from_raw_events``).

    ScreenAnalyzer/CLIP/FAISS are replaced by fakes or left unset in every
    test. Setup repeated across tests lives in the private helpers below.
    """

    # ------------------------------------------------------------------
    # Shared setup helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _install_fake_module(monkeypatch, dotted_name, **attrs):
        """Register a synthetic module under ``dotted_name`` in ``sys.modules``.

        Each keyword argument becomes a module attribute. The module object is
        returned so a test can swap an attribute afterwards. The registration
        goes through ``monkeypatch.setitem``.
        """
        module = types.ModuleType(dotted_name)
        for attr_name, value in attrs.items():
            setattr(module, attr_name, value)
        monkeypatch.setitem(sys.modules, dotted_name, module)
        return module

    @staticmethod
    def _make_vlm_worker(tmp_path, monkeypatch):
        """Create a ``VLMWorker`` with ``run_worker.DATA_DIR`` redirected.

        Returns ``(worker, data_dir)`` where ``data_dir`` is the freshly created
        ``tmp_path / "data" / "training"`` directory.
        """
        from agent_v0.server_v1 import run_worker

        data_dir = tmp_path / "data" / "training"
        data_dir.mkdir(parents=True)
        monkeypatch.setattr(run_worker, "DATA_DIR", data_dir)
        return run_worker.VLMWorker(), data_dir

    @staticmethod
    def _prepare_replay_build(tmp_path, monkeypatch, *, crop, click_enrichment):
        """Stub the enrichment hooks of ``stream_processor`` for a replay build.

        Args:
            crop: value returned by the stubbed ``_load_crop_for_event``.
            click_enrichment: dict returned (as a fresh copy on every call) by
                the stubbed ``enrich_click_from_screenshot``.

        Returns:
            ``(sp, session_dir)``: the ``stream_processor`` module and a session
            directory that already contains a ``shots`` sub-directory.
        """
        from agent_v0.server_v1 import stream_processor as sp

        session_dir = tmp_path / "sess"
        (session_dir / "shots").mkdir(parents=True)
        monkeypatch.setattr(sp, "_load_crop_for_event", lambda *args, **kwargs: crop)
        # A copy per call: the builder must never share one dict between clicks.
        monkeypatch.setattr(
            sp,
            "enrich_click_from_screenshot",
            lambda *args, **kwargs: dict(click_enrichment),
        )
        for hook_name in (
            "_attach_expected_screenshots",
            "_enrich_actions_with_intentions",
            "_unload_gemma4",
        ):
            monkeypatch.setattr(sp, hook_name, lambda *args, **kwargs: None)
        return sp, session_dir

    # ------------------------------------------------------------------
    # Initialization
    # ------------------------------------------------------------------
    def test_default_initialization_stays_light(self, temp_dir):
        """By default, the HTTP API does not load the VLM/GPU components."""
        from agent_v0.server_v1.stream_processor import StreamProcessor

        test_processor = StreamProcessor(data_dir=temp_dir)
        test_processor._ensure_initialized()
        assert test_processor._initialized is True
        assert test_processor._screen_analyzer is None
        assert test_processor._clip_embedder is None
        assert test_processor._faiss_manager is None

    def test_enable_vlm_initialization_loads_components(self, temp_dir, monkeypatch):
        """The VLM worker can explicitly load ScreenAnalyzer/CLIP/FAISS."""
        from agent_v0.server_v1.stream_processor import StreamProcessor

        class FakeScreenAnalyzer:
            def __init__(self, session_id=""):
                self.session_id = session_id

        class FakeCLIPEmbedder:
            pass

        class FakeStateEmbeddingBuilder:
            def __init__(self, *args, **kwargs):
                self.args = args
                self.kwargs = kwargs

        class FakeFAISSManager:
            def __init__(self, *args, **kwargs):
                self.args = args
                self.kwargs = kwargs
                self.index = MagicMock(ntotal=0)

        self._install_fake_module(
            monkeypatch, "core.pipeline.screen_analyzer", ScreenAnalyzer=FakeScreenAnalyzer,
        )
        self._install_fake_module(
            monkeypatch, "core.embedding.clip_embedder", CLIPEmbedder=FakeCLIPEmbedder,
        )
        self._install_fake_module(
            monkeypatch,
            "core.embedding.state_embedding_builder",
            StateEmbeddingBuilder=FakeStateEmbeddingBuilder,
        )
        self._install_fake_module(
            monkeypatch, "core.embedding.faiss_manager", FAISSManager=FakeFAISSManager,
        )

        test_processor = StreamProcessor(data_dir=temp_dir, enable_vlm=True)
        test_processor._ensure_initialized()
        assert test_processor._initialized is True
        assert isinstance(test_processor._screen_analyzer, FakeScreenAnalyzer)
        assert isinstance(test_processor._clip_embedder, FakeCLIPEmbedder)
        assert isinstance(test_processor._state_embedding_builder, FakeStateEmbeddingBuilder)
        assert isinstance(test_processor._faiss_manager, FakeFAISSManager)

    def test_enable_vlm_screen_analyzer_failure_does_not_cache_broken_state(
        self, temp_dir, monkeypatch, caplog
    ):
        """N1 anti-poison: in VLM mode, when ScreenAnalyzer fails to initialize,
        ``_initialized`` must NOT be frozen to True (otherwise the worker stays
        broken forever, cf. the 5-day R6 outage). A critical log must be emitted
        and a retry must be possible on the next cycle.
        """
        import logging

        from agent_v0.server_v1.stream_processor import StreamProcessor

        class BrokenScreenAnalyzer:
            def __init__(self, session_id=""):
                raise RuntimeError("CUDA indisponible au démarrage du worker")

        class HealedScreenAnalyzer:
            def __init__(self, session_id=""):
                self.session_id = session_id

        class FakeCLIPEmbedder:
            pass

        class FakeStateEmbeddingBuilder:
            def __init__(self, *args, **kwargs):
                pass

        class FakeFAISSManager:
            def __init__(self, *args, **kwargs):
                self.index = MagicMock(ntotal=0)

        screen_module = self._install_fake_module(
            monkeypatch, "core.pipeline.screen_analyzer", ScreenAnalyzer=BrokenScreenAnalyzer,
        )
        self._install_fake_module(
            monkeypatch, "core.embedding.clip_embedder", CLIPEmbedder=FakeCLIPEmbedder,
        )
        self._install_fake_module(
            monkeypatch,
            "core.embedding.state_embedding_builder",
            StateEmbeddingBuilder=FakeStateEmbeddingBuilder,
        )
        self._install_fake_module(
            monkeypatch, "core.embedding.faiss_manager", FAISSManager=FakeFAISSManager,
        )

        test_processor = StreamProcessor(data_dir=temp_dir, enable_vlm=True)
        with caplog.at_level(logging.CRITICAL):
            test_processor._ensure_initialized()
        # No lifelong caching: the state stays retry-able.
        assert test_processor._initialized is False
        assert test_processor._screen_analyzer is None
        assert any(rec.levelno == logging.CRITICAL for rec in caplog.records), (
            "un log critical doit signaler le worker VLM dégradé"
        )
        # Retry on the next cycle: ScreenAnalyzer is repaired, so init succeeds.
        screen_module.ScreenAnalyzer = HealedScreenAnalyzer
        test_processor._ensure_initialized()
        assert test_processor._initialized is True
        assert isinstance(test_processor._screen_analyzer, HealedScreenAnalyzer)

    # ------------------------------------------------------------------
    # VLM worker: health file and systemd notification
    # ------------------------------------------------------------------
    def test_worker_writes_health_file_with_component_status(self, tmp_path, monkeypatch):
        """N2: the worker writes _worker_health.json with the component status
        derived from the processor, the pid, the stats and the global status."""
        test_worker, data_dir = self._make_vlm_worker(tmp_path, monkeypatch)

        class FakeProc:
            _enable_vlm = True
            _screen_analyzer = object()
            _clip_embedder = object()
            _faiss_manager = object()
            _state_embedding_builder = object()

        test_worker._processor = FakeProc()
        test_worker._stats["sessions_processed"] = 1
        test_worker._stats["total_screenshots_analyzed"] = 7
        test_worker._write_health("healthy")

        health_path = data_dir / "_worker_health.json"
        assert health_path.exists()
        data = json.loads(health_path.read_text(encoding="utf-8"))
        assert data["status"] == "healthy"
        assert data["pid"] == os.getpid()
        assert data["components"] == {
            "screen_analyzer": True,
            "clip_embedder": True,
            "faiss_manager": True,
            "state_embedding_builder": True,
        }
        assert data["stats"]["sessions_processed"] == 1
        assert data["stats"]["total_screenshots_analyzed"] == 7

    def test_worker_health_degraded_when_screen_analyzer_missing(self, tmp_path, monkeypatch):
        """N2: a VLM worker whose ScreenAnalyzer is missing reports status
        'degraded', even when the caller asks for 'healthy'."""
        test_worker, data_dir = self._make_vlm_worker(tmp_path, monkeypatch)

        class DegradedProc:
            _enable_vlm = True
            _screen_analyzer = None
            _clip_embedder = object()
            _faiss_manager = object()
            _state_embedding_builder = None

        test_worker._processor = DegradedProc()
        test_worker._write_health("healthy")
        data = json.loads((data_dir / "_worker_health.json").read_text(encoding="utf-8"))
        assert data["status"] == "degraded"
        assert data["components"]["screen_analyzer"] is False

    def test_worker_health_file_contains_no_patient_data(self, tmp_path, monkeypatch):
        """N2 confidentiality: the health file only contains allowed keys — no
        patient data (OCR, screenshot file names, session content)."""
        test_worker, data_dir = self._make_vlm_worker(tmp_path, monkeypatch)
        test_worker._current_session = "sess_20260529T154427_f95956"
        test_worker._write_health("busy")
        data = json.loads((data_dir / "_worker_health.json").read_text(encoding="utf-8"))
        allowed_top = {
            "pid", "started_at", "last_cycle", "current_session",
            "queue_length", "components", "stats", "status",
        }
        assert set(data.keys()) <= allowed_top, f"clés inattendues: {set(data.keys()) - allowed_top}"
        # current_session only carries the identifier, no session content.
        assert data["current_session"] == "sess_20260529T154427_f95956"

    def test_sd_notify_noop_without_socket(self, monkeypatch):
        """N3: outside systemd (NOTIFY_SOCKET unset), _sd_notify is a silent
        no-op returning False — never an exception."""
        from agent_v0.server_v1 import run_worker

        monkeypatch.delenv("NOTIFY_SOCKET", raising=False)
        test_worker = run_worker.VLMWorker()
        assert test_worker._sd_notify("WATCHDOG=1") is False

    def test_sd_notify_sends_watchdog_to_socket(self, tmp_path, monkeypatch):
        """N3: under systemd, _sd_notify writes the raw state to $NOTIFY_SOCKET."""
        import socket

        from agent_v0.server_v1 import run_worker

        if not hasattr(socket, "AF_UNIX"):
            pytest.skip("AF_UNIX datagram sockets are not available on this platform")

        sock_path = str(tmp_path / "notify.sock")
        # The context manager closes the listener even when bind() or an
        # assertion fails.
        with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as listener:
            listener.bind(sock_path)
            listener.settimeout(2)
            monkeypatch.setenv("NOTIFY_SOCKET", sock_path)
            test_worker = run_worker.VLMWorker()
            assert test_worker._sd_notify("WATCHDOG=1") is True
            received = listener.recv(64)
            assert received == b"WATCHDOG=1"

    # ------------------------------------------------------------------
    # Data directory wiring
    # ------------------------------------------------------------------
    def test_vlm_worker_uses_training_root_data_dir(self, tmp_path, monkeypatch):
        """The R6 worker must produce workflows/embeddings under data/training."""
        from agent_v0.server_v1 import run_worker

        data_dir = tmp_path / "data" / "training"
        live_sessions_dir = data_dir / "live_sessions"
        monkeypatch.setattr(run_worker, "DATA_DIR", data_dir)
        monkeypatch.setattr(run_worker, "LIVE_SESSIONS_DIR", live_sessions_dir)
        test_worker = run_worker.VLMWorker()
        test_processor = test_worker._get_processor()
        assert test_processor.data_dir == data_dir
        assert test_processor.session_manager._live_sessions_dir == live_sessions_dir
        assert test_processor._enable_vlm is True

    def test_stream_worker_standalone_uses_training_root_data_dir(self, tmp_path):
        """The standalone StreamWorker also keeps data/training as its root."""
        from agent_v0.server_v1.worker_stream import StreamWorker

        live_sessions_dir = tmp_path / "data" / "training" / "live_sessions"
        test_worker = StreamWorker(live_dir=str(live_sessions_dir))
        assert test_worker.processor.data_dir == live_sessions_dir.parent
        assert test_worker.processor.session_manager._live_sessions_dir == live_sessions_dir
        assert test_worker.processor._enable_vlm is True

    # ------------------------------------------------------------------
    # Event / screenshot processing
    # ------------------------------------------------------------------
    def test_process_event(self, processor):
        """An event is recorded and updates the session's window info."""
        result = processor.process_event("sess_010", {
            "type": "mouse_click",
            "timestamp": 1234,
            "window": {"title": "Chrome", "app_name": "chrome"},
        })
        assert result["status"] == "event_recorded"
        session = processor.session_manager.get_session("sess_010")
        assert session.last_window_info["title"] == "Chrome"

    def test_restore_user_events_keeps_key_combo(self, processor, tmp_path):
        """Restoring from live_events.jsonl replaces the in-memory events and
        keeps the key_combo fields (keys, raw_keys, screenshot_id)."""
        session_id = "sess_restore_combo"
        session_dir = tmp_path / session_id
        session_dir.mkdir()
        records = [
            {
                "session_id": session_id,
                "timestamp": 1779900720.0,
                "event": {
                    "type": "key_combo",
                    "keys": ["win", "s"],
                    "raw_keys": [
                        {"action": "release", "kind": "vk", "vk": 83, "char": "s"},
                        {"action": "release", "kind": "key", "name": "cmd"},
                    ],
                    "timestamp": 1779900719.5,
                    "window": {"title": "Rechercher", "app_name": "SearchHost.exe"},
                    "screenshot_id": "shot_0001",
                },
            },
            {
                "session_id": session_id,
                "timestamp": 1779900725.0,
                "event": {
                    "type": "text_input",
                    "text": "test",
                    "timestamp": 1779900725.0,
                    "window": {"title": "Rechercher", "app_name": "SearchHost.exe"},
                },
            },
        ]
        # One JSON document per line, each terminated by a newline.
        (session_dir / "live_events.jsonl").write_text(
            "".join(json.dumps(record) + "\n" for record in records),
            encoding="utf-8",
        )
        # Pre-existing in-memory event that the restore is expected to discard.
        processor.session_manager.add_event(session_id, {"type": "text_input", "text": "old"})
        processor._restore_user_events(session_id, session_dir)
        session = processor.session_manager.get_session(session_id)
        assert [event["type"] for event in session.events] == ["key_combo", "text_input"]
        assert session.events[0]["keys"] == ["win", "s"]
        assert session.events[0]["raw_keys"][0]["vk"] == 83
        assert session.events[0]["screenshot_id"] == "shot_0001"

    def test_process_crop(self, processor):
        """A crop is stored and acknowledged with ``crop_stored``."""
        result = processor.process_crop("sess_011", "shot_001_crop", "/tmp/crop.png")
        assert result["status"] == "crop_stored"

    def test_process_screenshot_no_analyzer(self, processor):
        """Without a ScreenAnalyzer, a minimal result is returned."""
        # Force the initialized state without any GPU model.
        processor._initialized = True
        processor._screen_analyzer = None
        processor._faiss_manager = None
        result = processor.process_screenshot("sess_012", "shot_001", "/tmp/full.png")
        assert result["shot_id"] == "shot_001"
        assert result["state_id"] is None  # No analysis was run
        assert result["ui_elements_count"] == 0

    @patch("agent_v0.server_v1.stream_processor.StreamProcessor._ensure_initialized")
    def test_process_screenshot_with_mock_analyzer(self, mock_init, processor):
        """With a mocked ScreenAnalyzer, check the complete flow."""
        from core.models.screen_state import (
            ScreenState, WindowContext, RawLevel,
            PerceptionLevel, ContextLevel, EmbeddingRef,
        )

        mock_state = ScreenState(
            screen_state_id="state_001",
            timestamp="2026-01-01T00:00:00",
            session_id="sess_013",
            window=WindowContext(app_name="test", window_title="Test", screen_resolution=[1920, 1080]),
            raw=RawLevel(screenshot_path="/tmp/test.png", capture_method="mss", file_size_bytes=0),
            perception=PerceptionLevel(
                embedding=EmbeddingRef(provider="test", vector_id="v1", dimensions=512),
                detected_text=["Bonjour", "Valider"],
                text_detection_method="mock",
                confidence_avg=0.9,
            ),
            context=ContextLevel(),
            ui_elements=[MagicMock(), MagicMock(), MagicMock()],
        )
        processor._screen_analyzer = MagicMock()
        processor._screen_analyzer.analyze.return_value = mock_state
        processor._faiss_manager = None
        processor._initialized = True
        result = processor.process_screenshot("sess_013", "shot_full", "/tmp/full.png")
        assert result["state_id"] == "state_001"
        assert result["ui_elements_count"] == 3
        assert result["text_detected"] == 2
        # The ScreenState is kept for the final build.
        assert len(processor._screen_states["sess_013"]) == 1

    def test_finalize_insufficient_data(self, processor):
        """Finalizing with too little data reports ``insufficient_data``."""
        processor._initialized = True
        processor.session_manager.register_session("sess_014")
        result = processor.finalize_session("sess_014")
        assert result["status"] == "insufficient_data"

    def test_stats(self, processor):
        """A fresh processor reports empty, uninitialized stats."""
        stats = processor.stats
        assert stats["active_sessions"] == 0
        assert stats["total_workflows"] == 0
        assert stats["initialized"] is False

    # ------------------------------------------------------------------
    # Replay builder
    # ------------------------------------------------------------------
    def test_build_replay_does_not_compile_save_dialog_open_as_switch_tab(
        self, tmp_path, monkeypatch,
    ):
        """A same-app `Enregistrer sous` dialog is not a tab.

        Live regression 2026-05-23: a menu click in Notepad was recompiled as a
        fake `switch_tab`, which injected a stray click before the real dialog
        opening.
        """
        sp, session_dir = self._prepare_replay_build(
            tmp_path,
            monkeypatch,
            crop=None,
            click_enrichment={"anchor_image_base64": "abc123", "by_role": "yolo"},
        )
        events = [
            {"event": {
                "type": "mouse_click",
                "timestamp": 1.0,
                "pos": [820, 630],
                "button": "left",
                "screenshot_id": "shot_001",
                "window": {"title": "*test Bloc-notes", "app_name": "Notepad.exe"},
                "window_capture": {
                    "rect": [320, 520, 2240, 1636],
                    "click_relative": [500, 110],
                    "window_size": [1920, 1116],
                },
            }},
            {"event": {
                "type": "mouse_click",
                "timestamp": 1.2,
                "pos": [860, 562],
                "button": "left",
                "screenshot_id": "shot_002",
                "window": {"title": "*test Bloc-notes", "app_name": "Notepad.exe"},
                "window_capture": {
                    "rect": [320, 520, 2240, 1636],
                    "click_relative": [540, 40],
                    "window_size": [1920, 1116],
                },
            }},
            {"event": {
                "type": "window_focus_change",
                "timestamp": 1.35,
                "from": {"title": "*test Bloc-notes", "app_name": "Notepad.exe"},
                "to": {"title": "Enregistrer sous", "app_name": "Notepad.exe"},
            }},
            {"event": {
                "type": "mouse_click",
                "timestamp": 1.6,
                "pos": [997, 743],
                "button": "left",
                "screenshot_id": "shot_003",
                "window": {"title": "Enregistrer sous", "app_name": "Notepad.exe"},
            }},
        ]
        actions = sp.build_replay_from_raw_events(
            events, session_id="sess_save_dialog", session_dir=str(session_dir),
        )
        clicks = [a for a in actions if a.get("type") == "click"]
        assert len(clicks) == 3
        assert all(
            (c.get("target_spec", {}).get("context_hints") or {}).get("interaction") != "switch_tab"
            for c in clicks
        )
        assert clicks[1].get("expected_window_title") == "Enregistrer sous"
        assert clicks[2].get("expected_window_before") == "Enregistrer sous"

    def test_build_replay_tab_switch_focus_belongs_to_latest_click_only(
        self, tmp_path, monkeypatch,
    ):
        """The tab focus change must be attached to the latest causal click."""
        sp, session_dir = self._prepare_replay_build(
            tmp_path,
            monkeypatch,
            crop=None,
            click_enrichment={"anchor_image_base64": "abc123", "by_role": "yolo"},
        )
        events = [
            {"event": {
                "type": "mouse_click",
                "timestamp": 1.0,
                "pos": [1410, 562],
                "button": "left",
                "screenshot_id": "shot_001",
                "window": {
                    "title": "http192.168.1.408765dossier.htmlid=.txt Bloc-notes",
                    "app_name": "Notepad.exe",
                },
                "window_capture": {
                    "rect": [323, 522, 2243, 1638],
                    "click_relative": [1087, 40],
                    "window_size": [1920, 1116],
                },
            }},
            {"event": {
                "type": "mouse_click",
                "timestamp": 1.1,
                "pos": [1514, 562],
                "button": "left",
                "screenshot_id": "shot_002",
                "window": {
                    "title": "http192.168.1.408765dossier.htmlid=.txt Bloc-notes",
                    "app_name": "Notepad.exe",
                },
                "window_capture": {
                    "rect": [323, 522, 2243, 1638],
                    "click_relative": [1191, 40],
                    "window_size": [1920, 1116],
                },
            }},
            {"event": {
                "type": "window_focus_change",
                "timestamp": 1.2,
                "from": {
                    "title": "http192.168.1.408765dossier.htmlid=.txt Bloc-notes",
                    "app_name": "Notepad.exe",
                },
                "to": {
                    "title": "Sans titre Bloc-notes",
                    "app_name": "Notepad.exe",
                },
            }},
        ]
        actions = sp.build_replay_from_raw_events(
            events,
            session_id="sess_intervening_click",
            session_dir=str(session_dir),
        )
        assert len(actions) == 2
        first_hints = actions[0].get("target_spec", {}).get("context_hints") or {}
        second_hints = actions[1].get("target_spec", {}).get("context_hints") or {}
        assert first_hints.get("interaction") != "switch_tab"
        assert actions[1]["target_spec"]["by_text"] == "Sans titre"
        assert actions[1]["target_spec"]["by_role"] == "tab"
        assert second_hints.get("interaction") == "switch_tab"

    def test_build_replay_infers_close_tab_before_save_dialog(
        self, tmp_path, monkeypatch,
    ):
        """The click on the active tab's x must be given the close_tab semantics."""
        sp, session_dir = self._prepare_replay_build(
            tmp_path,
            monkeypatch,
            crop=None,
            click_enrichment={"anchor_image_base64": "abc123", "by_role": "yolo"},
        )
        events = [
            {"event": {
                "type": "mouse_click",
                "timestamp": 1.0,
                "pos": [1814, 560],
                "button": "left",
                "screenshot_id": "shot_001",
                "window": {"title": "*test Bloc-notes", "app_name": "Notepad.exe"},
                "window_capture": {
                    "rect": [323, 522, 2243, 1638],
                    "click_relative": [1491, 38],
                    "window_size": [1920, 1116],
                },
            }},
            {"event": {
                "type": "mouse_click",
                "timestamp": 1.3,
                "pos": [1183, 1156],
                "button": "left",
                "screenshot_id": "shot_002",
                "window": {"title": "*test Bloc-notes", "app_name": "Notepad.exe"},
                "window_capture": {
                    "rect": [323, 522, 2243, 1638],
                    "click_relative": [860, 634],
                    "window_size": [1920, 1116],
                },
            }},
            {"event": {
                "type": "window_focus_change",
                "timestamp": 1.5,
                "from": {"title": "*test Bloc-notes", "app_name": "Notepad.exe"},
                "to": {"title": "Enregistrer sous", "app_name": "Notepad.exe"},
            }},
        ]
        actions = sp.build_replay_from_raw_events(
            events,
            session_id="sess_close_tab",
            session_dir=str(session_dir),
        )
        clicks = [a for a in actions if a.get("type") == "click"]
        assert len(clicks) == 2
        first_spec = clicks[0].get("target_spec", {})
        first_hints = first_spec.get("context_hints") or {}
        assert first_spec.get("by_role") == "tab_close_button"
        assert first_spec.get("by_text", "") == ""
        assert first_hints.get("interaction") == "close_tab"
        assert first_hints.get("active_tab_label") == "test"
        assert "fermer l'onglet actif 'test'" in first_spec.get("vlm_description", "")

    def test_build_replay_save_as_button_gets_semantic_target(
        self, tmp_path, monkeypatch,
    ):
        """The click on the Enregistrer button of Save As must not stay
        anchor-only/positional.

        Live regression 2026-05-25: with RPA_SKIP_BUILD_VISION, the Save As
        action was only described by position + crop, then resolved by template
        matching too far up/left. The builder must encode the stable primary
        button ``Enregistrer``.
        """
        sp, session_dir = self._prepare_replay_build(
            tmp_path,
            monkeypatch,
            crop="abc123",
            click_enrichment={
                "anchor_image_base64": "abc123",
                "by_text": "",
                "by_role": "",
                "vlm_description": "positionnel",
            },
        )
        events = [
            {"event": {
                "type": "mouse_click",
                "timestamp": 1.0,
                "pos": [1329, 1265],
                "button": "left",
                "screenshot_id": "shot_006",
                "screen_metadata": {"screen_resolution": [2560, 1600]},
                "window": {"title": "Enregistrer sous", "app_name": "Notepad.exe"},
                "window_capture": {
                    "rect": [332, 522, 1613, 1323],
                    "click_relative": [997, 743],
                    "window_size": [1281, 801],
                    "click_inside_window": True,
                },
            }},
            {"event": {
                "type": "window_focus_change",
                "timestamp": 1.2,
                "from": {"title": "Enregistrer sous", "app_name": "Notepad.exe"},
                "to": {"title": "*test Bloc-notes", "app_name": "Notepad.exe"},
            }},
        ]
        actions = sp.build_replay_from_raw_events(
            events,
            session_id="sess_save_as_button",
            session_dir=str(session_dir),
        )
        clicks = [a for a in actions if a.get("type") == "click"]
        assert len(clicks) == 1
        spec = clicks[0].get("target_spec", {})
        hints = spec.get("context_hints") or {}
        assert spec.get("by_text") == "Enregistrer"
        assert spec.get("by_text_source") == "heuristic"
        assert spec.get("by_role") == "button"
        assert spec.get("window_title") == "Enregistrer sous"
        assert hints.get("interaction") == "save_dialog_primary_button"
        assert hints.get("expected_after_window") == "*test Bloc-notes"
        assert clicks[0].get("expected_window_title") == "*test Bloc-notes"

    def test_build_replay_cuts_post_save_out_of_window_click(
        self, tmp_path, monkeypatch,
    ):
        """The out-of-window click after returning to Notepad is a stray click.

        This is the former final action 17/18: bottom-right coordinates,
        ``click_inside_window=false``. It is not part of the "type and save"
        core.
        """
        sp, session_dir = self._prepare_replay_build(
            tmp_path,
            monkeypatch,
            crop="abc123",
            click_enrichment={"anchor_image_base64": "abc123"},
        )
        events = [
            {"event": {
                "type": "mouse_click",
                "timestamp": 1.0,
                "pos": [1329, 1265],
                "button": "left",
                "screenshot_id": "shot_006",
                "screen_metadata": {"screen_resolution": [2560, 1600]},
                "window": {"title": "Enregistrer sous", "app_name": "Notepad.exe"},
                "window_capture": {
                    "rect": [332, 522, 1613, 1323],
                    "click_relative": [997, 743],
                    "window_size": [1281, 801],
                    "click_inside_window": True,
                },
            }},
            {"event": {
                "type": "window_focus_change",
                "timestamp": 1.2,
                "from": {"title": "Enregistrer sous", "app_name": "Notepad.exe"},
                "to": {"title": "*test Bloc-notes", "app_name": "Notepad.exe"},
            }},
            {"event": {
                "type": "mouse_click",
                "timestamp": 1.5,
                "pos": [2248, 1577],
                "button": "left",
                "screenshot_id": "shot_007",
                "screen_metadata": {"screen_resolution": [2560, 1600]},
                "window": {
                    "title": "http192.168.1.408765dossier.htmlid=.txt Bloc-notes",
                    "app_name": "Notepad.exe",
                },
                "window_capture": {
                    "rect": [323, 522, 2243, 1638],
                    "click_relative": [1925, 1055],
                    "window_size": [1920, 1116],
                    "click_inside_window": False,
                },
            }},
        ]
        actions = sp.build_replay_from_raw_events(
            events,
            session_id="sess_post_save_cut",
            session_dir=str(session_dir),
        )
        clicks = [a for a in actions if a.get("type") == "click"]
        assert len(clicks) == 1
        assert clicks[0].get("target_spec", {}).get("by_text") == "Enregistrer"
        assert clicks[0].get("expected_window_title") == "*test Bloc-notes"
# =========================================================================
# StreamWorker
# =========================================================================
class TestStreamWorker:
    """Tests of StreamWorker: direct processing entry points, stats and
    polling of ``live_events.jsonl`` files from the live directory."""

    def test_process_event_direct(self, worker):
        """A directly submitted event is acknowledged with ``event_recorded``."""
        result = worker.process_event_direct("sess_020", {"type": "click"})
        assert result["status"] == "event_recorded"

    def test_process_crop_direct(self, worker):
        """A directly submitted crop is acknowledged with ``crop_stored``."""
        result = worker.process_crop_direct("sess_021", "crop_001", "/tmp/crop.png")
        assert result["status"] == "crop_stored"

    def test_stats(self, worker):
        """The worker stats expose the ``active_sessions`` key."""
        stats = worker.stats
        assert "active_sessions" in stats

    def test_poll_reads_events_from_disk(self, worker, temp_dir):
        """The worker reads JSONL events from disk."""
        session_dir = Path(temp_dir) / "test_sess"
        session_dir.mkdir()
        event_file = session_dir / "live_events.jsonl"
        # Explicit encoding, like every other write_text call in this module,
        # so the fixture file does not depend on the platform locale.
        event_file.write_text(
            json.dumps({"type": "click", "timestamp": 100}) + "\n"
            + json.dumps({"type": "key_press", "keys": ["enter"], "timestamp": 200}) + "\n",
            encoding="utf-8",
        )
        # Simulate one polling round.
        worker._check_live_sessions()
        session = worker.processor.session_manager.get_session("test_sess")
        assert session is not None
        assert len(session.events) == 2
# =========================================================================
# GraphBuilder precomputed_states
# =========================================================================
class TestGraphBuilderPrecomputed:
    """Check GraphBuilder.build_from_session's ``precomputed_states`` support."""

    def test_accepts_precomputed_states(self):
        """``build_from_session`` declares a ``precomputed_states`` parameter."""
        import inspect
        from core.graph.graph_builder import GraphBuilder

        parameters = inspect.signature(GraphBuilder.build_from_session).parameters
        assert "precomputed_states" in parameters

    def test_raises_without_screenshots_or_states(self):
        """A ValueError is raised when neither screenshots nor states are given."""
        from core.graph.graph_builder import GraphBuilder
        from core.models.raw_session import RawSession

        empty_session = MagicMock(spec=RawSession)
        empty_session.session_id = "empty"
        empty_session.screenshots = []
        graph_builder = GraphBuilder(min_pattern_repetitions=2)

        with pytest.raises(ValueError, match="no screenshots"):
            graph_builder.build_from_session(empty_session)

    def test_skips_screen_state_creation_with_precomputed(self):
        """With precomputed states, ``_create_screen_states`` is never called."""
        from core.graph.graph_builder import GraphBuilder
        from core.models.raw_session import RawSession

        graph_builder = GraphBuilder(min_pattern_repetitions=2)
        graph_builder._create_screen_states = MagicMock()

        # Stub out the rest of the pipeline so no real model is needed.
        unit_vector = np.random.randn(512).astype(np.float32)
        unit_vector /= np.linalg.norm(unit_vector)
        graph_builder._compute_embeddings = MagicMock(
            return_value=[unit_vector, unit_vector]
        )
        for stage, canned in (
            ("_detect_patterns", {}),
            ("_build_nodes", []),
            ("_build_edges", []),
        ):
            setattr(graph_builder, stage, MagicMock(return_value=canned))

        raw_session = MagicMock(spec=RawSession)
        raw_session.session_id = "precomp"
        raw_session.screenshots = []
        supplied_states = [MagicMock(), MagicMock()]

        graph_builder.build_from_session(raw_session, precomputed_states=supplied_states)

        # State creation must be bypassed entirely ...
        graph_builder._create_screen_states.assert_not_called()
        # ... and the supplied states must be what gets embedded.
        graph_builder._compute_embeddings.assert_called_once_with(supplied_states)
# =========================================================================
# Thread-safety de StreamProcessor
# =========================================================================
class TestStreamProcessorThreadSafety:
    """Check that the processor's internal dicts survive concurrent writers.

    Every writer takes ``processor._data_lock`` before mutating the shared
    dict. Besides "no exception was raised", each test also verifies that
    every write actually landed, so silently lost updates are reported as
    failures instead of going unnoticed.
    """

    @staticmethod
    def _run_in_threads(target, names):
        """Call ``target(name)`` in one thread per name and wait for all of them.

        Args:
            target: Callable taking a single argument.
            names: Iterable of arguments; one thread is started per item.

        Returns:
            The list of exceptions raised by ``target`` in any thread (empty
            when every call succeeded).
        """
        errors = []

        def guarded(name):
            # An exception escaping a thread does not fail the test on its
            # own; collect it so the caller can assert on it.
            try:
                target(name)
            except Exception as exc:
                errors.append(exc)

        threads = [
            threading.Thread(target=guarded, args=(name,)) for name in names
        ]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()
        return errors

    def test_has_data_lock(self, processor):
        """StreamProcessor owns a dedicated ``_data_lock`` of Lock type."""
        assert hasattr(processor, "_data_lock")
        assert isinstance(processor._data_lock, type(threading.Lock()))

    def test_concurrent_screen_states_access(self, processor):
        """Concurrent appends to ``_screen_states`` neither fail nor get lost."""
        processor._initialized = True
        processor._screen_analyzer = None

        def add_states(session_id):
            for i in range(50):
                with processor._data_lock:
                    processor._screen_states.setdefault(session_id, []).append(
                        f"state_{i}"
                    )

        session_ids = [f"sess_{t}" for t in range(5)]
        errors = self._run_in_threads(add_states, session_ids)

        assert errors == []
        assert len(processor._screen_states) == 5
        # Each thread writes to its own key, so order within a key is fixed.
        expected = [f"state_{i}" for i in range(50)]
        for session_id in session_ids:
            assert processor._screen_states[session_id] == expected

    def test_concurrent_embeddings_access(self, processor):
        """Concurrent appends to ``_embeddings`` neither fail nor get lost."""

        def add_embeddings(session_id):
            for _ in range(50):
                with processor._data_lock:
                    processor._embeddings.setdefault(session_id, []).append(
                        np.random.randn(512).astype(np.float32)
                    )

        session_ids = [f"sess_{t}" for t in range(5)]
        errors = self._run_in_threads(add_embeddings, session_ids)

        assert errors == []
        assert len(processor._embeddings) == 5
        for session_id in session_ids:
            assert len(processor._embeddings[session_id]) == 50

    def test_concurrent_workflows_access(self, processor):
        """Concurrent inserts into ``_workflows`` neither fail nor get lost."""

        def add_workflow(wf_id):
            mock_wf = MagicMock()
            mock_wf.nodes = [1, 2]
            mock_wf.edges = [1]
            with processor._data_lock:
                processor._workflows[wf_id] = mock_wf

        workflow_ids = [f"wf_{t}" for t in range(10)]
        errors = self._run_in_threads(add_workflow, workflow_ids)

        assert errors == []
        assert len(processor._workflows) == 10
        assert set(processor._workflows) == set(workflow_ids)
# =========================================================================
# list_sessions / list_workflows
# =========================================================================
class TestStreamProcessorListMethods:
    """Tests for ``list_sessions()`` and ``list_workflows()``."""

    def test_list_sessions_empty(self, processor):
        """A fresh processor lists no sessions."""
        assert processor.list_sessions() == []

    def test_list_sessions_with_data(self, processor):
        """The listing reports per-session counts and the finalized flag."""
        sid = "sess_ls_1"
        manager = processor.session_manager
        manager.register_session(sid)
        manager.add_event(
            sid,
            {
                "type": "click",
                "window": {"title": "App", "app_name": "app"},
            },
        )
        manager.add_screenshot(sid, "shot_1", "/tmp/s.png")
        with processor._data_lock:
            processor._screen_states[sid] = ["state_a", "state_b"]
            processor._embeddings[sid] = [np.zeros(512)]

        listing = processor.list_sessions()

        assert len(listing) == 1
        entry = listing[0]
        assert entry["session_id"] == "sess_ls_1"
        assert entry["events_count"] == 1
        assert entry["screenshots_count"] == 1
        assert entry["states_count"] == 2
        assert entry["embeddings_count"] == 1
        assert entry["finalized"] is False

    def test_list_sessions_multiple(self, processor):
        """Each listed session carries its own finalized flag."""
        manager = processor.session_manager
        manager.register_session("a")
        manager.register_session("b")
        manager.finalize("b")

        listing = processor.list_sessions()

        assert len(listing) == 2
        finalized = {entry["session_id"]: entry["finalized"] for entry in listing}
        assert finalized["a"] is False
        assert finalized["b"] is True

    def test_list_workflows_empty(self, processor):
        """A fresh processor lists no workflows."""
        assert processor.list_workflows() == []

    def test_list_workflows_with_data(self, processor):
        """The listing reports id, node count, edge count and name."""
        stub_workflow = MagicMock()
        stub_workflow.nodes = [1, 2, 3]
        stub_workflow.edges = [1, 2]
        stub_workflow.name = "test_workflow"
        with processor._data_lock:
            processor._workflows["wf_001"] = stub_workflow

        listing = processor.list_workflows()

        assert len(listing) == 1
        entry = listing[0]
        assert entry["workflow_id"] == "wf_001"
        assert entry["nodes"] == 3
        assert entry["edges"] == 2
        assert entry["name"] == "test_workflow"
# =========================================================================
# API endpoints (sessions / workflows)
# =========================================================================
class TestAPIEndpoints:
"""Tests pour les endpoints GET sessions et workflows."""
# Token de test fixe utilisé pour tous les tests d'API.
# Doit être défini AVANT le premier import de agent_v0.server_v1.api_stream
# car le module fail-closed (sys.exit 1) si RPA_API_TOKEN est absent.
_TEST_API_TOKEN = "test_token_for_api_endpoints_0123456789abcdef"
@pytest.fixture(autouse=True)
def _ensure_api_token(self, monkeypatch):
    """Guarantee RPA_API_TOKEN is set before api_stream is imported.

    The agent_v0.server_v1.api_stream module applies a fail-closed check
    (P0-C, sys.exit 1) at import time when RPA_API_TOKEN is absent, so a
    test value is forced here before any lazy import done by the client
    fixture.
    """
    test_token = self._TEST_API_TOKEN
    monkeypatch.setenv("RPA_API_TOKEN", test_token)

    loaded_module = sys.modules.get("agent_v0.server_v1.api_stream")
    if loaded_module is None:
        return
    # api_stream was already imported (e.g. by an earlier test), possibly
    # with a different token: align its API_TOKEN so the Bearer requests
    # issued by the tests pass authentication.
    monkeypatch.setattr(loaded_module, "API_TOKEN", test_token)
@pytest.fixture
def client(self, temp_dir):
    """FastAPI test client wired to an isolated processor and worker.

    Replaces the module-level ``api_stream.processor`` and
    ``api_stream.worker`` with instances rooted at ``temp_dir`` and puts the
    originals back afterwards. The restore sits in a ``finally`` block so it
    also runs when setup fails after the first global has been swapped;
    pytest skips the code after ``yield`` when a fixture raises before
    yielding.

    Yields:
        A ``(client, processor, token)`` tuple: the ``TestClient``, the
        test ``StreamProcessor`` and the API token to send as Bearer auth.
    """
    from fastapi.testclient import TestClient
    from agent_v0.server_v1 import api_stream
    from agent_v0.server_v1.stream_processor import StreamProcessor
    from agent_v0.server_v1.worker_stream import StreamWorker

    # Remember the globals before touching anything.
    original_processor = api_stream.processor
    original_worker = api_stream.worker
    try:
        # Swap the global processor/worker for test instances.
        test_processor = StreamProcessor(data_dir=temp_dir)
        api_stream.processor = test_processor
        api_stream.worker = StreamWorker(
            live_dir=temp_dir, processor=test_processor
        )
        test_client = TestClient(api_stream.app, raise_server_exceptions=False)
        # Token used by the tests for authenticated requests.
        token = api_stream.API_TOKEN
        yield test_client, test_processor, token
    finally:
        # Restore the module globals whatever happened above.
        api_stream.processor = original_processor
        api_stream.worker = original_worker
def test_get_sessions_empty(self, client):
    """With no session registered, the sessions endpoint returns an empty list."""
    http, _processor, api_token = client
    auth_headers = {"Authorization": f"Bearer {api_token}"}

    response = http.get("/api/v1/traces/stream/sessions", headers=auth_headers)

    assert response.status_code == 200
    assert response.json()["sessions"] == []
def test_get_sessions_with_data(self, client):
    """A registered session with one event is returned by the sessions endpoint."""
    http, test_processor, api_token = client
    manager = test_processor.session_manager
    manager.register_session("api_sess_1")
    manager.add_event("api_sess_1", {"type": "click"})
    auth_headers = {"Authorization": f"Bearer {api_token}"}

    response = http.get("/api/v1/traces/stream/sessions", headers=auth_headers)

    assert response.status_code == 200
    listed = response.json()["sessions"]
    assert len(listed) == 1
    only_session = listed[0]
    assert only_session["session_id"] == "api_sess_1"
    assert only_session["events_count"] == 1
def test_get_workflows_empty(self, client):
    """With no workflow stored, the workflows endpoint returns an empty list."""
    http, _processor, api_token = client
    auth_headers = {"Authorization": f"Bearer {api_token}"}

    response = http.get("/api/v1/traces/stream/workflows", headers=auth_headers)

    assert response.status_code == 200
    assert response.json()["workflows"] == []
def test_get_workflows_with_data(self, client):
c, proc, token = client
mock_wf = MagicMock()
mock_wf.nodes = [1, 2]
mock_wf.edges = [1]
mock_wf.name = "api_test_wf"
with proc._data_lock:
proc._workflows["wf_api_001"] = mock_wf
resp = c.get("/api/v1/traces/stream/workflows", headers={"Authorization": f"Bearer {token}"})
assert resp.status_code == 200
workflows = resp.json()["workflows"]
assert len(workflows) == 1
assert workflows[0]["workflow_id"] == "wf_api_001"
assert workflows[0]["nodes"] == 2