Files
rpa_vision_v3/tests/test_phase0_integration.py
Dom cf495dd82f feat: chat unifié, GestureCatalog, Copilot, Léa UI, extraction données, vérification replay
Refonte majeure du système Agent Chat et ajout de nombreux modules :

- Chat unifié : suppression du dual Workflows/Agent Libre, tout passe par /api/chat
  avec résolution en 3 niveaux (workflow → geste → "montre-moi")
- GestureCatalog : 38 raccourcis clavier universels Windows avec matching sémantique,
  substitution automatique dans les replays, et endpoint /api/gestures
- Mode Copilot : exécution pas-à-pas des workflows avec validation humaine via WebSocket
  (approve/skip/abort) avant chaque action
- Léa UI (agent_v0/lea_ui/) : interface PyQt5 pour Windows avec overlay transparent
  pour feedback visuel pendant le replay
- Data Extraction (core/extraction/) : moteur d'extraction visuelle de données
  (OCR + VLM → SQLite), avec schémas YAML et export CSV/Excel
- ReplayVerifier (agent_v0/server_v1/) : vérification post-action par comparaison
  de screenshots, avec logique de retry (max 3)
- IntentParser durci : meilleur fallback regex, type GREETING, patterns améliorés
- Dashboard : nouvelles pages gestures, streaming, extractions
- Tests : 63 tests GestureCatalog, 47 tests extraction, corrections tests existants
- Dépréciation : /api/agent/plan et /api/agent/execute retournent HTTP 410,
  suppression du code hardcodé _plan_to_replay_actions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-15 10:02:09 +01:00

884 lines
31 KiB
Python

"""
Tests d'integration Phase 0 - RPA Vision V3
Couvre les modules fondamentaux de la Phase 0 :
- SessionRecorder (core/capture/session_recorder.py)
- ScreenAnalyzer (core/pipeline/screen_analyzer.py)
- EventListener (core/capture/event_listener.py)
- GraphBuilder -> WorkflowPipeline connection (_extract_node_vector)
Auteur : Dom, Claude - 11 mars 2026
"""
import json
import os
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest.mock import MagicMock, patch, PropertyMock
import numpy as np
import pytest
from PIL import Image
from core.models.raw_session import RawSession, Event, Screenshot, RawWindowContext
from core.models.screen_state import (
ScreenState,
RawLevel,
PerceptionLevel,
ContextLevel,
WindowContext,
EmbeddingRef,
)
from core.models.ui_element import UIElement, UIElementEmbeddings, VisualFeatures
from core.models.base_models import BBox
# =============================================================================
# Fixtures partagees
# =============================================================================
@pytest.fixture
def sample_raw_event():
"""Evenement brut au format EventListener."""
return {
"t": 1.234,
"type": "mouse_click",
"button": "left",
"pos": [500, 300],
"window": {"title": "Test Window", "app_name": "test_app"},
"screenshot_id": None,
}
@pytest.fixture
def sample_session(tmp_path):
"""RawSession minimale avec screenshots."""
session = RawSession(
session_id="test_session_001",
agent_version="rpa_vision_v3",
environment={"os": "linux", "screen": {"primary_resolution": [1920, 1080]}},
user={"id": "tester"},
context={"workflow": "test_workflow", "tags": []},
started_at=datetime.now(),
)
return session
@pytest.fixture
def test_image_path(tmp_path):
"""Creer une image PNG de test et retourner son chemin."""
img = Image.new("RGB", (200, 100), color=(70, 130, 180))
path = tmp_path / "test_screenshot.png"
img.save(str(path))
return str(path)
@pytest.fixture
def test_image_with_shapes(tmp_path):
"""Creer une image PNG plus elaboree avec formes geometriques."""
img = Image.new("RGB", (800, 600), color=(240, 240, 240))
# Ajouter un rectangle (simule un bouton)
from PIL import ImageDraw
draw = ImageDraw.Draw(img)
draw.rectangle([50, 50, 200, 90], fill=(0, 120, 215))
draw.rectangle([50, 120, 300, 160], fill=(255, 255, 255), outline=(180, 180, 180))
draw.rectangle([50, 200, 150, 240], fill=(76, 175, 80))
path = tmp_path / "test_ui_screenshot.png"
img.save(str(path))
return str(path)
# =============================================================================
# 1. SessionRecorder
# =============================================================================
class TestSessionRecorderDirectoryStructure:
"""Verifier que SessionRecorder cree la bonne arborescence de repertoires."""
def test_start_creates_session_directory(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
# Mocker EventListener pour eviter la dependance pynput
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
session_id = recorder.start(
workflow_name="test_wf", session_id="sess_test_001"
)
assert session_id == "sess_test_001"
# Verifier la structure : output_dir / session_id / session_id / screenshots
session_dir = tmp_path / "sessions" / "sess_test_001"
screenshots_dir = session_dir / "sess_test_001" / "screenshots"
assert session_dir.exists(), f"Session dir missing: {session_dir}"
assert screenshots_dir.exists(), f"Screenshots dir missing: {screenshots_dir}"
# Nettoyer
recorder._running = False
def test_start_generates_session_id_when_none(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
session_id = recorder.start(workflow_name="auto_id_wf")
assert session_id.startswith("session_")
assert len(session_id) > len("session_")
recorder._running = False
class TestSessionRecorderRawSession:
"""Verifier que SessionRecorder produit une RawSession valide."""
def test_produces_valid_raw_session(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
recorder.start(
workflow_name="valid_session", session_id="sess_valid"
)
session = recorder._session
assert session is not None
assert isinstance(session, RawSession)
assert session.session_id == "sess_valid"
assert session.agent_version == "rpa_vision_v3"
assert session.schema_version == "rawsession_v1"
assert session.started_at is not None
assert isinstance(session.started_at, datetime)
assert session.environment.get("os") is not None
assert session.user.get("id") is not None
assert session.context.get("workflow") == "valid_session"
recorder._running = False
def test_stop_sets_ended_at_and_saves_json(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
recorder.start(workflow_name="stop_test", session_id="sess_stop")
session = recorder.stop()
assert session.ended_at is not None
assert isinstance(session.ended_at, datetime)
assert session.ended_at >= session.started_at
# Verifier que le fichier JSON est cree
json_path = tmp_path / "sessions" / "sess_stop" / "sess_stop.json"
assert json_path.exists(), f"Session JSON missing: {json_path}"
# Verifier que le JSON est valide et deserialisable
with open(json_path, "r") as f:
data = json.load(f)
assert data["session_id"] == "sess_stop"
assert data["schema_version"] == "rawsession_v1"
class TestSessionRecorderLifecycle:
"""Verifier le cycle de vie start/stop."""
def test_is_running_property(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
assert recorder.is_running is False
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
recorder.start(session_id="sess_lifecycle")
assert recorder.is_running is True
recorder.stop()
assert recorder.is_running is False
def test_double_start_returns_existing_session_id(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
first_id = recorder.start(session_id="sess_double")
second_id = recorder.start(session_id="sess_other")
assert first_id == "sess_double"
assert second_id == "sess_double" # Doit retourner l'existant
recorder._running = False
def test_stop_without_start(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
# stop() sur un recorder non demarre ne doit pas planter
result = recorder.stop()
assert result is None # _session n'est pas initialise
class TestSessionRecorderEvents:
"""Verifier l'enregistrement des evenements via callback."""
def test_on_raw_event_records_event(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
recorder.start(session_id="sess_events")
# Simuler un evenement via le callback interne
raw_event = {
"t": 0.5,
"type": "key_press",
"keys": ["a"],
"window": {"title": "Editor", "app_name": "vim"},
}
recorder._on_raw_event(raw_event)
assert recorder.event_count == 1
assert recorder._session.events[0].type == "key_press"
assert recorder._session.events[0].t == 0.5
assert recorder._session.events[0].window.title == "Editor"
recorder._running = False
def test_mouse_click_triggers_screenshot(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(
output_dir=str(tmp_path / "sessions"),
screenshot_on_click=True,
)
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
recorder.start(session_id="sess_click_ss")
# Mocker _take_screenshot pour retourner un ID
with patch.object(recorder, "_take_screenshot", return_value="ss_0001") as mock_ss:
raw_click = {
"t": 1.0,
"type": "mouse_click",
"button": "left",
"pos": [100, 200],
"window": {"title": "App", "app_name": "app"},
}
recorder._on_raw_event(raw_click)
mock_ss.assert_called_once()
assert recorder._session.events[0].screenshot_id == "ss_0001"
recorder._running = False
def test_on_event_callback_called(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
callback_received = []
on_event_fn = lambda e: callback_received.append(e)
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
recorder.start(session_id="sess_cb", on_event=on_event_fn)
raw_event = {
"t": 0.1,
"type": "key_press",
"keys": ["Enter"],
"window": {"title": "T", "app_name": "a"},
}
recorder._on_raw_event(raw_event)
assert len(callback_received) == 1
assert callback_received[0]["type"] == "key_press"
recorder._running = False
class TestSessionRecorderScreenshots:
"""Verifier la sauvegarde des screenshots via _take_screenshot."""
def test_take_screenshot_saves_file(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
recorder.start(session_id="sess_screenshot")
# Creer un mock de ScreenCapturer
fake_frame = np.zeros((100, 200, 3), dtype=np.uint8)
mock_capturer = MagicMock()
mock_capturer.capture_frame.return_value = fake_frame
mock_capturer.save_frame.side_effect = lambda frame, path: Image.fromarray(frame).save(path)
recorder._screen_capturer = mock_capturer
screenshot_id = recorder._take_screenshot()
assert screenshot_id == "ss_0001"
assert recorder.screenshot_count == 1
# Verifier que le screenshot est enregistre dans la session
ss = recorder._session.screenshots[0]
assert ss.screenshot_id == "ss_0001"
assert "screenshots/" in ss.relative_path
assert ss.captured_at is not None
recorder._running = False
def test_take_screenshot_returns_none_without_capturer(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
recorder.start(session_id="sess_no_capturer")
# Pas de screen_capturer = _take_screenshot retourne None
# Il faut aussi mocker _ensure_screen_capturer pour empecher la reinit lazy
recorder._screen_capturer = None
with patch.object(recorder, "_ensure_screen_capturer"):
result = recorder._take_screenshot()
assert result is None
recorder._running = False
def test_take_screenshot_handles_capture_failure(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
recorder.start(session_id="sess_fail")
mock_capturer = MagicMock()
mock_capturer.capture_frame.return_value = None
recorder._screen_capturer = mock_capturer
result = recorder._take_screenshot()
assert result is None
recorder._running = False
# =============================================================================
# 2. ScreenAnalyzer
# =============================================================================
class TestScreenAnalyzerBuildScreenState:
"""Verifier la construction d'un ScreenState complet 4 niveaux."""
def test_analyze_builds_complete_screen_state(self, test_image_path):
from core.pipeline.screen_analyzer import ScreenAnalyzer
# Creer un ScreenAnalyzer sans OCR ni UIDetector
analyzer = ScreenAnalyzer(
ui_detector=None,
ocr_engine=None,
session_id="test_session",
)
state = analyzer.analyze(
screenshot_path=test_image_path,
window_info={"title": "Test App", "app_name": "test"},
)
# Verifier le type
assert isinstance(state, ScreenState)
# Niveau 1 : Raw
assert state.raw is not None
assert state.raw.screenshot_path == test_image_path
assert state.raw.capture_method == "mss"
assert state.raw.file_size_bytes > 0
# Niveau 2 : Perception
assert state.perception is not None
assert isinstance(state.perception.detected_text, list)
assert state.perception.embedding is not None
assert state.perception.embedding.dimensions == 512
# Niveau 3 : UI elements (vide sans detecteur)
assert isinstance(state.ui_elements, list)
# Niveau 4 : Contexte
assert state.context is not None
assert state.window is not None
assert state.window.app_name == "test"
assert state.window.window_title == "Test App"
# Metadata
assert "analyzer_version" in state.metadata
assert state.screen_state_id.startswith("test_session_state_")
def test_analyze_with_default_window_info(self, test_image_path):
from core.pipeline.screen_analyzer import ScreenAnalyzer
analyzer = ScreenAnalyzer(session_id="default_win")
state = analyzer.analyze(screenshot_path=test_image_path)
assert state.window.app_name == "unknown"
assert state.window.window_title == "Unknown"
assert state.window.screen_resolution == [1920, 1080]
def test_analyze_increments_state_counter(self, test_image_path):
from core.pipeline.screen_analyzer import ScreenAnalyzer
analyzer = ScreenAnalyzer(session_id="counter")
state1 = analyzer.analyze(test_image_path)
state2 = analyzer.analyze(test_image_path)
assert state1.screen_state_id == "counter_state_0001"
assert state2.screen_state_id == "counter_state_0002"
def test_analyze_image_from_pil(self, tmp_path):
from core.pipeline.screen_analyzer import ScreenAnalyzer
analyzer = ScreenAnalyzer(session_id="pil_test")
img = Image.new("RGB", (320, 240), color=(100, 200, 50))
save_dir = str(tmp_path / "screens")
state = analyzer.analyze_image(img, save_dir=save_dir)
assert isinstance(state, ScreenState)
assert Path(state.raw.screenshot_path).exists()
assert state.raw.file_size_bytes > 0
class TestScreenAnalyzerOCRFallback:
"""Verifier le fallback OCR quand aucun moteur n'est disponible."""
def test_no_ocr_engine_returns_empty_text(self, test_image_path):
from core.pipeline.screen_analyzer import ScreenAnalyzer
# Forcer l'echec de tous les moteurs OCR
analyzer = ScreenAnalyzer(session_id="no_ocr")
# Mocker les createurs OCR pour qu'ils echouent
with patch.object(
analyzer, "_create_doctr_ocr", side_effect=ImportError("doctr not installed")
):
with patch.object(
analyzer,
"_create_tesseract_ocr",
side_effect=ImportError("tesseract not installed"),
):
state = analyzer.analyze(test_image_path)
assert state.perception.detected_text == []
assert state.perception.confidence_avg == 0.0
def test_ocr_method_name_none_when_no_engine(self, test_image_path):
from core.pipeline.screen_analyzer import ScreenAnalyzer
analyzer = ScreenAnalyzer(session_id="method_name")
with patch.object(
analyzer, "_create_doctr_ocr", side_effect=ImportError("no doctr")
):
with patch.object(
analyzer,
"_create_tesseract_ocr",
side_effect=ImportError("no tesseract"),
):
state = analyzer.analyze(test_image_path)
assert state.perception.text_detection_method == "none"
def test_ocr_exception_returns_empty_text(self, test_image_path):
from core.pipeline.screen_analyzer import ScreenAnalyzer
analyzer = ScreenAnalyzer(session_id="ocr_fail")
# Simuler un moteur OCR qui plante a l'appel
def failing_ocr(path):
raise RuntimeError("OCR crashed")
analyzer._ocr = failing_ocr
analyzer._ocr_initialized = True
state = analyzer.analyze(test_image_path)
assert state.perception.detected_text == []
class TestScreenAnalyzerUIDetector:
"""Verifier la gestion d'erreurs du UIDetector."""
def test_ui_detector_failure_returns_empty_elements(self, test_image_path):
from core.pipeline.screen_analyzer import ScreenAnalyzer
mock_detector = MagicMock()
mock_detector.detect.side_effect = RuntimeError("Detector crash")
analyzer = ScreenAnalyzer(
ui_detector=mock_detector,
session_id="detector_fail",
)
state = analyzer.analyze(test_image_path)
assert state.ui_elements == []
assert state.metadata["ui_elements_count"] == 0
def test_ui_detector_returns_elements(self, test_image_with_shapes):
from core.pipeline.screen_analyzer import ScreenAnalyzer
# Creer de faux elements UI
mock_elements = [
UIElement(
element_id="btn_001",
type="button",
role="primary_action",
bbox=BBox(x=50, y=50, width=150, height=40),
center=(125, 70),
label="OK",
label_confidence=0.95,
embeddings=UIElementEmbeddings(),
visual_features=VisualFeatures(
dominant_color="blue",
has_icon=False,
shape="rectangle",
size_category="medium",
),
confidence=0.9,
)
]
mock_detector = MagicMock()
mock_detector.detect.return_value = mock_elements
analyzer = ScreenAnalyzer(
ui_detector=mock_detector,
session_id="with_elements",
)
state = analyzer.analyze(test_image_with_shapes)
assert len(state.ui_elements) == 1
assert state.ui_elements[0].element_id == "btn_001"
assert state.metadata["ui_elements_count"] == 1
def test_no_ui_detector_returns_empty_elements(self, test_image_path):
from core.pipeline.screen_analyzer import ScreenAnalyzer
# Mocker _ensure_ui_detector pour qu'il ne fasse rien (pas de detecteur)
analyzer = ScreenAnalyzer(ui_detector=None, session_id="no_detector")
analyzer._ui_detector_initialized = True # Empecher l'init lazy
analyzer._ui_detector = None
state = analyzer.analyze(test_image_path)
assert state.ui_elements == []
# =============================================================================
# 3. EventListener
# =============================================================================
class TestEventListenerDefinition:
"""Verifier que EventListener peut etre defini meme sans pynput."""
def test_class_is_importable(self):
"""Le module est importable meme si pynput est absent."""
# On importe le module — il gere l'absence de pynput gracieusement
from core.capture import event_listener
assert hasattr(event_listener, "EventListener")
assert hasattr(event_listener, "PYNPUT_AVAILABLE")
def test_pynput_available_flag_exists(self):
from core.capture.event_listener import PYNPUT_AVAILABLE
assert isinstance(PYNPUT_AVAILABLE, bool)
def test_init_raises_import_error_without_pynput(self):
"""Si pynput n'est pas disponible, __init__ doit lever ImportError."""
from core.capture import event_listener
original_flag = event_listener.PYNPUT_AVAILABLE
try:
# Simuler l'absence de pynput
event_listener.PYNPUT_AVAILABLE = False
with pytest.raises(ImportError, match="pynput"):
event_listener.EventListener()
finally:
# Restaurer la valeur originale
event_listener.PYNPUT_AVAILABLE = original_flag
def test_init_does_not_raise_with_pynput(self):
"""Si pynput est disponible, __init__ ne doit pas lever d'erreur."""
from core.capture import event_listener
if not event_listener.PYNPUT_AVAILABLE:
pytest.skip("pynput non disponible, impossible de tester l'init normal")
listener = event_listener.EventListener()
assert listener is not None
assert listener.is_running is False
# =============================================================================
# 4. GraphBuilder -> WorkflowPipeline connection (_extract_node_vector)
# =============================================================================
class TestExtractNodeVector:
"""
Verifier que _extract_node_vector dans WorkflowPipeline
lit correctement le prototype depuis node.metadata["_prototype_vector"].
"""
def _make_mock_node(self, metadata=None, template=None):
"""Creer un mock de WorkflowNode."""
node = MagicMock()
node.metadata = metadata or {}
node.template = template
return node
def test_reads_prototype_from_metadata(self):
"""v3 : prototype dans metadata._prototype_vector."""
# Nous importons et instancions uniquement _extract_node_vector
# en mockant le constructeur lourd de WorkflowPipeline.
from core.pipeline.workflow_pipeline import WorkflowPipeline
# Creer un prototype de test
prototype = [0.1, 0.2, 0.3, 0.4, 0.5]
node = self._make_mock_node(
metadata={"_prototype_vector": prototype}
)
# Appeler _extract_node_vector en tant que methode non-bound
# (elle n'utilise que self pour logger)
pipeline_instance = MagicMock(spec=WorkflowPipeline)
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
assert result is not None
assert isinstance(result, np.ndarray)
assert result.dtype == np.float32
np.testing.assert_array_almost_equal(result, np.array(prototype, dtype=np.float32))
def test_metadata_prototype_takes_priority(self):
"""v3 metadata._prototype_vector est prioritaire sur template."""
from core.pipeline.workflow_pipeline import WorkflowPipeline
meta_proto = [1.0, 2.0, 3.0]
template_proto = [9.0, 8.0, 7.0]
mock_template = MagicMock()
mock_template.embedding_prototype = template_proto
node = self._make_mock_node(
metadata={"_prototype_vector": meta_proto},
template=mock_template,
)
pipeline_instance = MagicMock(spec=WorkflowPipeline)
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
# Doit retourner le prototype metadata (prioritaire)
np.testing.assert_array_almost_equal(
result, np.array(meta_proto, dtype=np.float32)
)
def test_fallback_to_template_embedding_prototype(self):
"""v1 fallback : template.embedding_prototype en liste."""
from core.pipeline.workflow_pipeline import WorkflowPipeline
template_proto = [0.5, 0.6, 0.7]
mock_template = MagicMock()
mock_template.embedding_prototype = template_proto
# Pas d'embedding.vector_id
mock_template.embedding = None
node = self._make_mock_node(
metadata={}, # Pas de _prototype_vector
template=mock_template,
)
pipeline_instance = MagicMock(spec=WorkflowPipeline)
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
assert result is not None
np.testing.assert_array_almost_equal(
result, np.array(template_proto, dtype=np.float32)
)
def test_returns_none_when_no_vector(self):
"""Retourne None quand aucun vecteur n'est disponible."""
from core.pipeline.workflow_pipeline import WorkflowPipeline
mock_template = MagicMock()
mock_template.embedding_prototype = None
mock_template.embedding = None
node = self._make_mock_node(
metadata={},
template=mock_template,
)
pipeline_instance = MagicMock(spec=WorkflowPipeline)
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
assert result is None
def test_returns_none_when_no_metadata_and_no_template(self):
"""Retourne None quand le node n'a ni metadata ni template."""
from core.pipeline.workflow_pipeline import WorkflowPipeline
node = self._make_mock_node(metadata={}, template=None)
pipeline_instance = MagicMock(spec=WorkflowPipeline)
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
assert result is None
def test_handles_invalid_prototype_gracefully(self):
"""Ne plante pas si le prototype metadata est mal forme."""
from core.pipeline.workflow_pipeline import WorkflowPipeline
node = self._make_mock_node(
metadata={"_prototype_vector": "not_a_list"},
)
pipeline_instance = MagicMock(spec=WorkflowPipeline)
# Ne doit pas lever d'exception
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
# "not_a_list" n'est pas une liste, donc isinstance check echoue
# et le code passe au fallback (template)
# Puisque template est None, retourne None
assert result is None
def test_loads_vector_from_disk_v2(self, tmp_path):
"""v2 : prototype charge depuis disque via embedding.vector_id."""
from core.pipeline.workflow_pipeline import WorkflowPipeline
# Creer un fichier .npy sur disque
vec = np.array([0.1, 0.2, 0.3, 0.4], dtype=np.float32)
vec_path = tmp_path / "prototype.npy"
np.save(str(vec_path), vec)
mock_embedding = MagicMock()
mock_embedding.vector_id = str(vec_path)
mock_template = MagicMock()
mock_template.embedding_prototype = None
mock_template.embedding = mock_embedding
node = self._make_mock_node(
metadata={},
template=mock_template,
)
pipeline_instance = MagicMock(spec=WorkflowPipeline)
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
assert result is not None
np.testing.assert_array_almost_equal(result, vec)
# =============================================================================
# 5. Integration end-to-end legers (SessionRecorder -> ScreenAnalyzer)
# =============================================================================
class TestSessionRecorderScreenAnalyzerIntegration:
"""
Verifier que les evenements et screenshots enregistres
sont exploitables par ScreenAnalyzer.
"""
def test_recorded_screenshot_can_be_analyzed(self, tmp_path):
"""Un screenshot enregistre par SessionRecorder est analysable par ScreenAnalyzer."""
from core.capture.session_recorder import SessionRecorder
from core.pipeline.screen_analyzer import ScreenAnalyzer
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
recorder.start(session_id="sess_e2e")
# Simuler un screenshot sauvegarde
screenshots_dir = recorder._screenshots_dir
img = Image.new("RGB", (640, 480), color=(200, 100, 50))
img_path = screenshots_dir / "screen_0001.png"
img.save(str(img_path))
# Enregistrer dans la session
screenshot = Screenshot(
screenshot_id="ss_0001",
relative_path="screenshots/screen_0001.png",
captured_at=datetime.now().isoformat(),
)
recorder._session.add_screenshot(screenshot)
# Analyser avec ScreenAnalyzer
analyzer = ScreenAnalyzer(session_id="sess_e2e")
# Mocker les engines lourds
analyzer._ui_detector_initialized = True
analyzer._ui_detector = None
analyzer._ocr_initialized = True
analyzer._ocr = None
state = analyzer.analyze(str(img_path))
assert isinstance(state, ScreenState)
assert state.raw.file_size_bytes > 0
assert Path(state.raw.screenshot_path).exists()
recorder._running = False
class TestSessionRecorderEnvironment:
"""Verifier la collecte d'informations d'environnement."""
def test_get_environment_contains_os_info(self, tmp_path):
from core.capture.session_recorder import SessionRecorder
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
with patch.object(recorder, "_start_event_listener"):
with patch.object(recorder, "_ensure_screen_capturer"):
recorder.start(session_id="sess_env")
env = recorder._session.environment
assert "os" in env
assert "hostname" in env
assert env["os"] in ("linux", "windows", "darwin")
recorder._running = False