"""
Phase 0 integration tests - RPA Vision V3

Covers the foundational Phase 0 modules:
- SessionRecorder (core/capture/session_recorder.py)
- ScreenAnalyzer (core/pipeline/screen_analyzer.py)
- EventListener (core/capture/event_listener.py)
- GraphBuilder -> WorkflowPipeline connection (_extract_node_vector)

Authors: Dom, Claude - 11 March 2026
"""

import json
import os
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest.mock import MagicMock, patch, PropertyMock

import numpy as np
import pytest
from PIL import Image

from core.models.raw_session import RawSession, Event, Screenshot, RawWindowContext
from core.models.screen_state import (
    ScreenState,
    RawLevel,
    PerceptionLevel,
    ContextLevel,
    WindowContext,
    EmbeddingRef,
)
from core.models.ui_element import UIElement, UIElementEmbeddings, VisualFeatures
from core.models.base_models import BBox


# =============================================================================
# Shared fixtures
# =============================================================================


@pytest.fixture
def sample_raw_event():
    """Return a raw left-click event dict in the EventListener format."""
    window = {"title": "Test Window", "app_name": "test_app"}
    return {
        "t": 1.234,
        "type": "mouse_click",
        "button": "left",
        "pos": [500, 300],
        "window": window,
        "screenshot_id": None,
    }


@pytest.fixture
def sample_session(tmp_path):
    """Return a minimal RawSession (no events or screenshots are attached here)."""
    session_fields = {
        "session_id": "test_session_001",
        "agent_version": "rpa_vision_v3",
        "environment": {"os": "linux", "screen": {"primary_resolution": [1920, 1080]}},
        "user": {"id": "tester"},
        "context": {"workflow": "test_workflow", "tags": []},
        "started_at": datetime.now(),
    }
    return RawSession(**session_fields)


@pytest.fixture
def test_image_path(tmp_path):
    """Write a small solid-colour PNG under tmp_path and return its path as a str."""
    target = tmp_path / "test_screenshot.png"
    Image.new("RGB", (200, 100), color=(70, 130, 180)).save(str(target))
    return str(target)
@pytest.fixture
def test_image_with_shapes(tmp_path):
    """Write a richer PNG containing geometric shapes and return its path as a str."""
    from PIL import ImageDraw

    canvas = Image.new("RGB", (800, 600), color=(240, 240, 240))
    pen = ImageDraw.Draw(canvas)

    # Rectangles drawn in order; the first one simulates a button.
    rectangles = (
        ([50, 50, 200, 90], {"fill": (0, 120, 215)}),
        ([50, 120, 300, 160], {"fill": (255, 255, 255), "outline": (180, 180, 180)}),
        ([50, 200, 150, 240], {"fill": (76, 175, 80)}),
    )
    for box, style in rectangles:
        pen.rectangle(box, **style)

    target = tmp_path / "test_ui_screenshot.png"
    canvas.save(str(target))
    return str(target)


# =============================================================================
# 1. SessionRecorder
# =============================================================================


class TestSessionRecorderDirectoryStructure:
    """Check that SessionRecorder creates the expected directory tree."""

    def test_start_creates_session_directory(self, tmp_path):
        """start() with an explicit id creates the session and screenshots dirs."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))

        # Mock the EventListener start-up to avoid the pynput dependency.
        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            session_id = recorder.start(
                workflow_name="test_wf", session_id="sess_test_001"
            )

        assert session_id == "sess_test_001"

        # Expected layout: output_dir / session_id / session_id / screenshots
        session_dir = tmp_path / "sessions" / "sess_test_001"
        screenshots_dir = session_dir / "sess_test_001" / "screenshots"
        assert session_dir.exists(), f"Session dir missing: {session_dir}"
        assert screenshots_dir.exists(), f"Screenshots dir missing: {screenshots_dir}"

        # Clean up
        recorder._running = False

    def test_start_generates_session_id_when_none(self, tmp_path):
        """start() without a session_id returns a generated 'session_...' id."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))

        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            session_id = recorder.start(workflow_name="auto_id_wf")

        assert session_id.startswith("session_")
        # The generated id must carry something after the prefix.
        assert len(session_id) > len("session_")

        recorder._running = False


class TestSessionRecorderRawSession:
    """Check that SessionRecorder produces a valid RawSession."""

    def test_produces_valid_raw_session(self, tmp_path):
        """After start(), the recorder holds a RawSession with the expected fields."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))

        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            recorder.start(workflow_name="valid_session", session_id="sess_valid")

        session = recorder._session
        assert session is not None
        assert isinstance(session, RawSession)

        # Identity / schema fields
        assert session.session_id == "sess_valid"
        assert session.agent_version == "rpa_vision_v3"
        assert session.schema_version == "rawsession_v1"

        # Timestamps
        assert session.started_at is not None
        assert isinstance(session.started_at, datetime)

        # Environment, user and workflow context
        assert session.environment.get("os") is not None
        assert session.user.get("id") is not None
        assert session.context.get("workflow") == "valid_session"

        recorder._running = False

    def test_stop_sets_ended_at_and_saves_json(self, tmp_path):
        """stop() stamps ended_at and writes a loadable session JSON file."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))

        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            recorder.start(workflow_name="stop_test", session_id="sess_stop")

        session = recorder.stop()

        assert session.ended_at is not None
        assert isinstance(session.ended_at, datetime)
        assert session.ended_at >= session.started_at

        # The JSON file must have been created
        json_path = tmp_path / "sessions" / "sess_stop" / "sess_stop.json"
        assert json_path.exists(), f"Session JSON missing: {json_path}"

        # The JSON must be valid and deserialisable
        with json_path.open("r") as handle:
            data = json.load(handle)
        assert data["session_id"] == "sess_stop"
        assert data["schema_version"] == "rawsession_v1"
class TestSessionRecorderLifecycle:
    """Check the start/stop life cycle."""

    def test_is_running_property(self, tmp_path):
        """is_running is False before start, True after start, False after stop."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
        assert recorder.is_running is False

        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            recorder.start(session_id="sess_lifecycle")
        assert recorder.is_running is True

        recorder.stop()
        assert recorder.is_running is False

    def test_double_start_returns_existing_session_id(self, tmp_path):
        """A second start() while running returns the id of the first session."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))

        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            first_id = recorder.start(session_id="sess_double")
            second_id = recorder.start(session_id="sess_other")

        assert first_id == "sess_double"
        assert second_id == "sess_double"  # Must return the existing one

        recorder._running = False

    def test_stop_without_start(self, tmp_path):
        """stop() on a recorder that was never started must not crash."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))

        # _session is not initialised, so stop() is expected to hand back None.
        assert recorder.stop() is None


class TestSessionRecorderEvents:
    """Check event recording through the internal callback."""

    def test_on_raw_event_records_event(self, tmp_path):
        """A raw key_press fed to _on_raw_event is stored on the session."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))

        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            recorder.start(session_id="sess_events")

        # Simulate an event arriving through the internal callback
        recorder._on_raw_event(
            {
                "t": 0.5,
                "type": "key_press",
                "keys": ["a"],
                "window": {"title": "Editor", "app_name": "vim"},
            }
        )

        assert recorder.event_count == 1
        recorded = recorder._session.events[0]
        assert recorded.type == "key_press"
        assert recorded.t == 0.5
        assert recorded.window.title == "Editor"

        recorder._running = False

    def test_mouse_click_triggers_screenshot(self, tmp_path):
        """With screenshot_on_click=True a mouse_click event gets a screenshot id."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(
            output_dir=str(tmp_path / "sessions"),
            screenshot_on_click=True,
        )

        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            recorder.start(session_id="sess_click_ss")

        raw_click = {
            "t": 1.0,
            "type": "mouse_click",
            "button": "left",
            "pos": [100, 200],
            "window": {"title": "App", "app_name": "app"},
        }

        # Mock _take_screenshot so that it returns a known id
        with patch.object(recorder, "_take_screenshot", return_value="ss_0001") as mock_ss:
            recorder._on_raw_event(raw_click)

        mock_ss.assert_called_once()
        assert recorder._session.events[0].screenshot_id == "ss_0001"

        recorder._running = False

    def test_on_event_callback_called(self, tmp_path):
        """The on_event callback given to start() receives each raw event."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))

        callback_received = []

        def on_event_fn(e):
            return callback_received.append(e)

        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            recorder.start(session_id="sess_cb", on_event=on_event_fn)

        recorder._on_raw_event(
            {
                "t": 0.1,
                "type": "key_press",
                "keys": ["Enter"],
                "window": {"title": "T", "app_name": "a"},
            }
        )

        assert len(callback_received) == 1
        assert callback_received[0]["type"] == "key_press"

        recorder._running = False


class TestSessionRecorderScreenshots:
    """Check screenshot saving through _take_screenshot."""

    def test_take_screenshot_saves_file(self, tmp_path):
        """_take_screenshot returns 'ss_0001' and registers it on the session."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))

        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            recorder.start(session_id="sess_screenshot")

        def _write_frame(frame, path):
            return Image.fromarray(frame).save(path)

        # Build a stand-in ScreenCapturer that yields a black frame
        mock_capturer = MagicMock()
        mock_capturer.capture_frame.return_value = np.zeros((100, 200, 3), dtype=np.uint8)
        mock_capturer.save_frame.side_effect = _write_frame
        recorder._screen_capturer = mock_capturer

        screenshot_id = recorder._take_screenshot()

        assert screenshot_id == "ss_0001"
        assert recorder.screenshot_count == 1

        # The screenshot must be registered in the session
        ss = recorder._session.screenshots[0]
        assert ss.screenshot_id == "ss_0001"
        assert "screenshots/" in ss.relative_path
        assert ss.captured_at is not None

        recorder._running = False

    def test_take_screenshot_returns_none_without_capturer(self, tmp_path):
        """Without a screen capturer, _take_screenshot returns None."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))

        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            recorder.start(session_id="sess_no_capturer")

        # No screen capturer => _take_screenshot returns None.
        # _ensure_screen_capturer must also be mocked to prevent lazy re-init.
        recorder._screen_capturer = None
        with patch.object(recorder, "_ensure_screen_capturer"):
            result = recorder._take_screenshot()

        assert result is None

        recorder._running = False

    def test_take_screenshot_handles_capture_failure(self, tmp_path):
        """A capturer whose capture_frame yields None makes _take_screenshot return None."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))

        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            recorder.start(session_id="sess_fail")

        failing_capturer = MagicMock()
        failing_capturer.capture_frame.return_value = None
        recorder._screen_capturer = failing_capturer

        assert recorder._take_screenshot() is None

        recorder._running = False
# =============================================================================
# 2. ScreenAnalyzer
# =============================================================================


class TestScreenAnalyzerBuildScreenState:
    """Check construction of a complete four-level ScreenState."""

    def test_analyze_builds_complete_screen_state(self, test_image_path):
        """analyze() fills the raw, perception, UI and context levels plus metadata."""
        from core.pipeline.screen_analyzer import ScreenAnalyzer

        # Build a ScreenAnalyzer with neither OCR nor UIDetector
        analyzer = ScreenAnalyzer(
            ui_detector=None,
            ocr_engine=None,
            session_id="test_session",
        )

        window_info = {"title": "Test App", "app_name": "test"}
        state = analyzer.analyze(
            screenshot_path=test_image_path,
            window_info=window_info,
        )

        # Type check
        assert isinstance(state, ScreenState)

        # Level 1: Raw
        assert state.raw is not None
        assert state.raw.screenshot_path == test_image_path
        assert state.raw.capture_method == "mss"
        assert state.raw.file_size_bytes > 0

        # Level 2: Perception
        assert state.perception is not None
        assert isinstance(state.perception.detected_text, list)
        assert state.perception.embedding is not None
        assert state.perception.embedding.dimensions == 512

        # Level 3: UI elements (empty without a detector)
        assert isinstance(state.ui_elements, list)

        # Level 4: Context
        assert state.context is not None
        assert state.window is not None
        assert state.window.app_name == "test"
        assert state.window.window_title == "Test App"

        # Metadata
        assert "analyzer_version" in state.metadata
        assert state.screen_state_id.startswith("test_session_state_")

    def test_analyze_with_default_window_info(self, test_image_path):
        """Without window_info, the window falls back to the expected defaults."""
        from core.pipeline.screen_analyzer import ScreenAnalyzer

        state = ScreenAnalyzer(session_id="default_win").analyze(
            screenshot_path=test_image_path
        )

        assert state.window.app_name == "unknown"
        assert state.window.window_title == "Unknown"
        assert state.window.screen_resolution == [1920, 1080]

    def test_analyze_increments_state_counter(self, test_image_path):
        """Successive analyze() calls number their state ids 0001, 0002."""
        from core.pipeline.screen_analyzer import ScreenAnalyzer

        analyzer = ScreenAnalyzer(session_id="counter")

        state1 = analyzer.analyze(test_image_path)
        state2 = analyzer.analyze(test_image_path)

        assert state1.screen_state_id == "counter_state_0001"
        assert state2.screen_state_id == "counter_state_0002"

    def test_analyze_image_from_pil(self, tmp_path):
        """analyze_image() accepts a PIL image and leaves a screenshot file on disk."""
        from core.pipeline.screen_analyzer import ScreenAnalyzer

        analyzer = ScreenAnalyzer(session_id="pil_test")
        picture = Image.new("RGB", (320, 240), color=(100, 200, 50))

        state = analyzer.analyze_image(picture, save_dir=str(tmp_path / "screens"))

        assert isinstance(state, ScreenState)
        assert Path(state.raw.screenshot_path).exists()
        assert state.raw.file_size_bytes > 0


class TestScreenAnalyzerOCRFallback:
    """Check the OCR fallback when no engine is available."""

    def test_no_ocr_engine_returns_empty_text(self, test_image_path):
        """When every OCR factory fails, detected text is empty with 0.0 confidence."""
        from core.pipeline.screen_analyzer import ScreenAnalyzer

        analyzer = ScreenAnalyzer(session_id="no_ocr")

        # Force every OCR engine factory to fail
        with patch.object(
            analyzer, "_create_doctr_ocr", side_effect=ImportError("doctr not installed")
        ), patch.object(
            analyzer,
            "_create_tesseract_ocr",
            side_effect=ImportError("tesseract not installed"),
        ):
            state = analyzer.analyze(test_image_path)

        assert state.perception.detected_text == []
        assert state.perception.confidence_avg == 0.0

    def test_ocr_method_name_none_when_no_engine(self, test_image_path):
        """When no OCR engine can be created, the detection method is 'none'."""
        from core.pipeline.screen_analyzer import ScreenAnalyzer

        analyzer = ScreenAnalyzer(session_id="method_name")

        with patch.object(
            analyzer, "_create_doctr_ocr", side_effect=ImportError("no doctr")
        ), patch.object(
            analyzer,
            "_create_tesseract_ocr",
            side_effect=ImportError("no tesseract"),
        ):
            state = analyzer.analyze(test_image_path)

        assert state.perception.text_detection_method == "none"

    def test_ocr_exception_returns_empty_text(self, test_image_path):
        """An OCR callable that raises leaves detected_text empty."""
        from core.pipeline.screen_analyzer import ScreenAnalyzer

        analyzer = ScreenAnalyzer(session_id="ocr_fail")

        # Simulate an OCR engine that crashes when called
        def _crashing_ocr(path):
            raise RuntimeError("OCR crashed")

        analyzer._ocr = _crashing_ocr
        analyzer._ocr_initialized = True

        state = analyzer.analyze(test_image_path)

        assert state.perception.detected_text == []


class TestScreenAnalyzerUIDetector:
    """Check UIDetector error handling."""

    def test_ui_detector_failure_returns_empty_elements(self, test_image_path):
        """A detector whose detect() raises yields no UI elements."""
        from core.pipeline.screen_analyzer import ScreenAnalyzer

        broken_detector = MagicMock()
        broken_detector.detect.side_effect = RuntimeError("Detector crash")

        analyzer = ScreenAnalyzer(
            ui_detector=broken_detector,
            session_id="detector_fail",
        )
        state = analyzer.analyze(test_image_path)

        assert state.ui_elements == []
        assert state.metadata["ui_elements_count"] == 0

    def test_ui_detector_returns_elements(self, test_image_with_shapes):
        """Elements returned by the detector are carried onto the ScreenState."""
        from core.pipeline.screen_analyzer import ScreenAnalyzer

        # Build a fake UI element
        fake_button = UIElement(
            element_id="btn_001",
            type="button",
            role="primary_action",
            bbox=BBox(x=50, y=50, width=150, height=40),
            center=(125, 70),
            label="OK",
            label_confidence=0.95,
            embeddings=UIElementEmbeddings(),
            visual_features=VisualFeatures(
                dominant_color="blue",
                has_icon=False,
                shape="rectangle",
                size_category="medium",
            ),
            confidence=0.9,
        )

        mock_detector = MagicMock()
        mock_detector.detect.return_value = [fake_button]

        analyzer = ScreenAnalyzer(
            ui_detector=mock_detector,
            session_id="with_elements",
        )
        state = analyzer.analyze(test_image_with_shapes)

        assert len(state.ui_elements) == 1
        assert state.ui_elements[0].element_id == "btn_001"
        assert state.metadata["ui_elements_count"] == 1

    def test_no_ui_detector_returns_empty_elements(self, test_image_path):
        """With no detector at all, ui_elements is an empty list."""
        from core.pipeline.screen_analyzer import ScreenAnalyzer

        analyzer = ScreenAnalyzer(ui_detector=None, session_id="no_detector")
        # Mark the detector as already initialised to prevent lazy init
        analyzer._ui_detector_initialized = True
        analyzer._ui_detector = None

        state = analyzer.analyze(test_image_path)

        assert state.ui_elements == []


# =============================================================================
# 3. EventListener
# =============================================================================


class TestEventListenerDefinition:
    """Check that EventListener can be defined even without pynput."""

    def test_class_is_importable(self):
        """The module is importable even when pynput is absent."""
        # Importing the module is the test: it handles a missing pynput gracefully
        from core.capture import event_listener

        assert hasattr(event_listener, "EventListener")
        assert hasattr(event_listener, "PYNPUT_AVAILABLE")

    def test_pynput_available_flag_exists(self):
        """PYNPUT_AVAILABLE is exposed as a bool."""
        from core.capture.event_listener import PYNPUT_AVAILABLE

        assert isinstance(PYNPUT_AVAILABLE, bool)

    def test_init_raises_import_error_without_pynput(self):
        """If pynput is unavailable, __init__ must raise ImportError."""
        from core.capture import event_listener

        # Simulate a missing pynput; patch.object restores the flag on exit
        with patch.object(event_listener, "PYNPUT_AVAILABLE", False), \
                pytest.raises(ImportError, match="pynput"):
            event_listener.EventListener()

    def test_init_does_not_raise_with_pynput(self):
        """If pynput is available, __init__ must not raise."""
        from core.capture import event_listener

        if not event_listener.PYNPUT_AVAILABLE:
            pytest.skip("pynput non disponible, impossible de tester l'init normal")

        listener = event_listener.EventListener()
        assert listener is not None
        assert listener.is_running is False
# =============================================================================
# 4. GraphBuilder -> WorkflowPipeline connection (_extract_node_vector)
# =============================================================================


class TestExtractNodeVector:
    """
    Check that WorkflowPipeline._extract_node_vector correctly reads the
    prototype from node.metadata["_prototype_vector"].
    """

    def _make_mock_node(self, metadata=None, template=None):
        """Build a mock WorkflowNode carrying the given metadata and template."""
        node = MagicMock()
        node.metadata = metadata if metadata else {}
        node.template = template
        return node

    @staticmethod
    def _extract(node):
        """Call _extract_node_vector unbound, without building a real pipeline."""
        from core.pipeline.workflow_pipeline import WorkflowPipeline

        # A spec'd mock stands in for self so the heavy WorkflowPipeline
        # constructor never runs (per the original note, the method only
        # uses self for logging).
        stand_in = MagicMock(spec=WorkflowPipeline)
        return WorkflowPipeline._extract_node_vector(stand_in, node)

    def test_reads_prototype_from_metadata(self):
        """v3: prototype stored in metadata._prototype_vector."""
        prototype = [0.1, 0.2, 0.3, 0.4, 0.5]
        node = self._make_mock_node(metadata={"_prototype_vector": prototype})

        result = self._extract(node)

        assert result is not None
        assert isinstance(result, np.ndarray)
        assert result.dtype == np.float32
        np.testing.assert_array_almost_equal(result, np.array(prototype, dtype=np.float32))

    def test_metadata_prototype_takes_priority(self):
        """v3 metadata._prototype_vector wins over the template prototype."""
        meta_proto = [1.0, 2.0, 3.0]

        mock_template = MagicMock()
        mock_template.embedding_prototype = [9.0, 8.0, 7.0]

        node = self._make_mock_node(
            metadata={"_prototype_vector": meta_proto},
            template=mock_template,
        )

        result = self._extract(node)

        # The metadata prototype must be returned (it has priority)
        np.testing.assert_array_almost_equal(
            result, np.array(meta_proto, dtype=np.float32)
        )

    def test_fallback_to_template_embedding_prototype(self):
        """v1 fallback: template.embedding_prototype given as a list."""
        template_proto = [0.5, 0.6, 0.7]

        mock_template = MagicMock()
        mock_template.embedding_prototype = template_proto
        # No embedding.vector_id
        mock_template.embedding = None

        node = self._make_mock_node(
            metadata={},  # No _prototype_vector
            template=mock_template,
        )

        result = self._extract(node)

        assert result is not None
        np.testing.assert_array_almost_equal(
            result, np.array(template_proto, dtype=np.float32)
        )

    def test_returns_none_when_no_vector(self):
        """Returns None when no vector is available."""
        mock_template = MagicMock()
        mock_template.embedding_prototype = None
        mock_template.embedding = None

        node = self._make_mock_node(metadata={}, template=mock_template)

        assert self._extract(node) is None

    def test_returns_none_when_no_metadata_and_no_template(self):
        """Returns None when the node has neither metadata nor template."""
        node = self._make_mock_node(metadata={}, template=None)

        assert self._extract(node) is None

    def test_handles_invalid_prototype_gracefully(self):
        """Does not crash when the metadata prototype is malformed."""
        node = self._make_mock_node(
            metadata={"_prototype_vector": "not_a_list"},
        )

        # Must not raise
        result = self._extract(node)

        # "not_a_list" is not a list, so the isinstance check fails and the
        # code moves on to the template fallback. Since template is None,
        # None is returned.
        assert result is None

    def test_loads_vector_from_disk_v2(self, tmp_path):
        """v2: prototype loaded from disk through embedding.vector_id."""
        # Write a .npy file to disk
        vec = np.array([0.1, 0.2, 0.3, 0.4], dtype=np.float32)
        vec_path = tmp_path / "prototype.npy"
        np.save(str(vec_path), vec)

        mock_embedding = MagicMock()
        mock_embedding.vector_id = str(vec_path)

        mock_template = MagicMock()
        mock_template.embedding_prototype = None
        mock_template.embedding = mock_embedding

        node = self._make_mock_node(metadata={}, template=mock_template)

        result = self._extract(node)

        assert result is not None
        np.testing.assert_array_almost_equal(result, vec)


# =============================================================================
# 5. Lightweight end-to-end integration (SessionRecorder -> ScreenAnalyzer)
# =============================================================================


class TestSessionRecorderScreenAnalyzerIntegration:
    """
    Check that recorded events and screenshots can be consumed by
    ScreenAnalyzer.
    """

    def test_recorded_screenshot_can_be_analyzed(self, tmp_path):
        """A screenshot recorded by SessionRecorder can be analysed by ScreenAnalyzer."""
        from core.capture.session_recorder import SessionRecorder
        from core.pipeline.screen_analyzer import ScreenAnalyzer

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))

        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            recorder.start(session_id="sess_e2e")

        # Simulate a saved screenshot
        img_path = recorder._screenshots_dir / "screen_0001.png"
        Image.new("RGB", (640, 480), color=(200, 100, 50)).save(str(img_path))

        # Register it on the session
        recorder._session.add_screenshot(
            Screenshot(
                screenshot_id="ss_0001",
                relative_path="screenshots/screen_0001.png",
                captured_at=datetime.now().isoformat(),
            )
        )

        # Analyse it with ScreenAnalyzer, with the heavy engines disabled
        analyzer = ScreenAnalyzer(session_id="sess_e2e")
        analyzer._ui_detector_initialized = True
        analyzer._ui_detector = None
        analyzer._ocr_initialized = True
        analyzer._ocr = None

        state = analyzer.analyze(str(img_path))

        assert isinstance(state, ScreenState)
        assert state.raw.file_size_bytes > 0
        assert Path(state.raw.screenshot_path).exists()

        recorder._running = False


class TestSessionRecorderEnvironment:
    """Check collection of environment information."""

    def test_get_environment_contains_os_info(self, tmp_path):
        """The session environment exposes 'os' and 'hostname', with a known os value."""
        from core.capture.session_recorder import SessionRecorder

        recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))

        with patch.object(recorder, "_start_event_listener"), \
                patch.object(recorder, "_ensure_screen_capturer"):
            recorder.start(session_id="sess_env")

        env = recorder._session.environment
        assert "os" in env
        assert "hostname" in env
        assert env["os"] in ("linux", "windows", "darwin")

        recorder._running = False