Refonte majeure du système Agent Chat et ajout de nombreux modules : - Chat unifié : suppression du dual Workflows/Agent Libre, tout passe par /api/chat avec résolution en 3 niveaux (workflow → geste → "montre-moi") - GestureCatalog : 38 raccourcis clavier universels Windows avec matching sémantique, substitution automatique dans les replays, et endpoint /api/gestures - Mode Copilot : exécution pas-à-pas des workflows avec validation humaine via WebSocket (approve/skip/abort) avant chaque action - Léa UI (agent_v0/lea_ui/) : interface PyQt5 pour Windows avec overlay transparent pour feedback visuel pendant le replay - Data Extraction (core/extraction/) : moteur d'extraction visuelle de données (OCR + VLM → SQLite), avec schémas YAML et export CSV/Excel - ReplayVerifier (agent_v0/server_v1/) : vérification post-action par comparaison de screenshots, avec logique de retry (max 3) - IntentParser durci : meilleur fallback regex, type GREETING, patterns améliorés - Dashboard : nouvelles pages gestures, streaming, extractions - Tests : 63 tests GestureCatalog, 47 tests extraction, corrections tests existants - Dépréciation : /api/agent/plan et /api/agent/execute retournent HTTP 410, suppression du code hardcodé _plan_to_replay_actions Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
884 lines
31 KiB
Python
884 lines
31 KiB
Python
"""
|
|
Tests d'integration Phase 0 - RPA Vision V3
|
|
|
|
Couvre les modules fondamentaux de la Phase 0 :
|
|
- SessionRecorder (core/capture/session_recorder.py)
|
|
- ScreenAnalyzer (core/pipeline/screen_analyzer.py)
|
|
- EventListener (core/capture/event_listener.py)
|
|
- GraphBuilder -> WorkflowPipeline connection (_extract_node_vector)
|
|
|
|
Auteur : Dom, Claude - 11 mars 2026
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
from unittest.mock import MagicMock, patch, PropertyMock
|
|
|
|
import numpy as np
|
|
import pytest
|
|
from PIL import Image
|
|
|
|
from core.models.raw_session import RawSession, Event, Screenshot, RawWindowContext
|
|
from core.models.screen_state import (
|
|
ScreenState,
|
|
RawLevel,
|
|
PerceptionLevel,
|
|
ContextLevel,
|
|
WindowContext,
|
|
EmbeddingRef,
|
|
)
|
|
from core.models.ui_element import UIElement, UIElementEmbeddings, VisualFeatures
|
|
from core.models.base_models import BBox
|
|
|
|
|
|
# =============================================================================
|
|
# Fixtures partagees
|
|
# =============================================================================
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_raw_event():
|
|
"""Evenement brut au format EventListener."""
|
|
return {
|
|
"t": 1.234,
|
|
"type": "mouse_click",
|
|
"button": "left",
|
|
"pos": [500, 300],
|
|
"window": {"title": "Test Window", "app_name": "test_app"},
|
|
"screenshot_id": None,
|
|
}
|
|
|
|
|
|
@pytest.fixture
|
|
def sample_session(tmp_path):
|
|
"""RawSession minimale avec screenshots."""
|
|
session = RawSession(
|
|
session_id="test_session_001",
|
|
agent_version="rpa_vision_v3",
|
|
environment={"os": "linux", "screen": {"primary_resolution": [1920, 1080]}},
|
|
user={"id": "tester"},
|
|
context={"workflow": "test_workflow", "tags": []},
|
|
started_at=datetime.now(),
|
|
)
|
|
return session
|
|
|
|
|
|
@pytest.fixture
|
|
def test_image_path(tmp_path):
|
|
"""Creer une image PNG de test et retourner son chemin."""
|
|
img = Image.new("RGB", (200, 100), color=(70, 130, 180))
|
|
path = tmp_path / "test_screenshot.png"
|
|
img.save(str(path))
|
|
return str(path)
|
|
|
|
|
|
@pytest.fixture
|
|
def test_image_with_shapes(tmp_path):
|
|
"""Creer une image PNG plus elaboree avec formes geometriques."""
|
|
img = Image.new("RGB", (800, 600), color=(240, 240, 240))
|
|
# Ajouter un rectangle (simule un bouton)
|
|
from PIL import ImageDraw
|
|
|
|
draw = ImageDraw.Draw(img)
|
|
draw.rectangle([50, 50, 200, 90], fill=(0, 120, 215))
|
|
draw.rectangle([50, 120, 300, 160], fill=(255, 255, 255), outline=(180, 180, 180))
|
|
draw.rectangle([50, 200, 150, 240], fill=(76, 175, 80))
|
|
path = tmp_path / "test_ui_screenshot.png"
|
|
img.save(str(path))
|
|
return str(path)
|
|
|
|
|
|
# =============================================================================
|
|
# 1. SessionRecorder
|
|
# =============================================================================
|
|
|
|
|
|
class TestSessionRecorderDirectoryStructure:
|
|
"""Verifier que SessionRecorder cree la bonne arborescence de repertoires."""
|
|
|
|
def test_start_creates_session_directory(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
|
|
# Mocker EventListener pour eviter la dependance pynput
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
session_id = recorder.start(
|
|
workflow_name="test_wf", session_id="sess_test_001"
|
|
)
|
|
|
|
assert session_id == "sess_test_001"
|
|
|
|
# Verifier la structure : output_dir / session_id / session_id / screenshots
|
|
session_dir = tmp_path / "sessions" / "sess_test_001"
|
|
screenshots_dir = session_dir / "sess_test_001" / "screenshots"
|
|
assert session_dir.exists(), f"Session dir missing: {session_dir}"
|
|
assert screenshots_dir.exists(), f"Screenshots dir missing: {screenshots_dir}"
|
|
|
|
# Nettoyer
|
|
recorder._running = False
|
|
|
|
def test_start_generates_session_id_when_none(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
session_id = recorder.start(workflow_name="auto_id_wf")
|
|
|
|
assert session_id.startswith("session_")
|
|
assert len(session_id) > len("session_")
|
|
|
|
recorder._running = False
|
|
|
|
|
|
class TestSessionRecorderRawSession:
|
|
"""Verifier que SessionRecorder produit une RawSession valide."""
|
|
|
|
def test_produces_valid_raw_session(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
recorder.start(
|
|
workflow_name="valid_session", session_id="sess_valid"
|
|
)
|
|
|
|
session = recorder._session
|
|
assert session is not None
|
|
assert isinstance(session, RawSession)
|
|
assert session.session_id == "sess_valid"
|
|
assert session.agent_version == "rpa_vision_v3"
|
|
assert session.schema_version == "rawsession_v1"
|
|
assert session.started_at is not None
|
|
assert isinstance(session.started_at, datetime)
|
|
assert session.environment.get("os") is not None
|
|
assert session.user.get("id") is not None
|
|
assert session.context.get("workflow") == "valid_session"
|
|
|
|
recorder._running = False
|
|
|
|
def test_stop_sets_ended_at_and_saves_json(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
recorder.start(workflow_name="stop_test", session_id="sess_stop")
|
|
|
|
session = recorder.stop()
|
|
|
|
assert session.ended_at is not None
|
|
assert isinstance(session.ended_at, datetime)
|
|
assert session.ended_at >= session.started_at
|
|
|
|
# Verifier que le fichier JSON est cree
|
|
json_path = tmp_path / "sessions" / "sess_stop" / "sess_stop.json"
|
|
assert json_path.exists(), f"Session JSON missing: {json_path}"
|
|
|
|
# Verifier que le JSON est valide et deserialisable
|
|
with open(json_path, "r") as f:
|
|
data = json.load(f)
|
|
assert data["session_id"] == "sess_stop"
|
|
assert data["schema_version"] == "rawsession_v1"
|
|
|
|
|
|
class TestSessionRecorderLifecycle:
|
|
"""Verifier le cycle de vie start/stop."""
|
|
|
|
def test_is_running_property(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
|
|
assert recorder.is_running is False
|
|
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
recorder.start(session_id="sess_lifecycle")
|
|
|
|
assert recorder.is_running is True
|
|
|
|
recorder.stop()
|
|
assert recorder.is_running is False
|
|
|
|
def test_double_start_returns_existing_session_id(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
first_id = recorder.start(session_id="sess_double")
|
|
second_id = recorder.start(session_id="sess_other")
|
|
|
|
assert first_id == "sess_double"
|
|
assert second_id == "sess_double" # Doit retourner l'existant
|
|
|
|
recorder._running = False
|
|
|
|
def test_stop_without_start(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
# stop() sur un recorder non demarre ne doit pas planter
|
|
result = recorder.stop()
|
|
assert result is None # _session n'est pas initialise
|
|
|
|
|
|
class TestSessionRecorderEvents:
|
|
"""Verifier l'enregistrement des evenements via callback."""
|
|
|
|
def test_on_raw_event_records_event(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
recorder.start(session_id="sess_events")
|
|
|
|
# Simuler un evenement via le callback interne
|
|
raw_event = {
|
|
"t": 0.5,
|
|
"type": "key_press",
|
|
"keys": ["a"],
|
|
"window": {"title": "Editor", "app_name": "vim"},
|
|
}
|
|
recorder._on_raw_event(raw_event)
|
|
|
|
assert recorder.event_count == 1
|
|
assert recorder._session.events[0].type == "key_press"
|
|
assert recorder._session.events[0].t == 0.5
|
|
assert recorder._session.events[0].window.title == "Editor"
|
|
|
|
recorder._running = False
|
|
|
|
def test_mouse_click_triggers_screenshot(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(
|
|
output_dir=str(tmp_path / "sessions"),
|
|
screenshot_on_click=True,
|
|
)
|
|
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
recorder.start(session_id="sess_click_ss")
|
|
|
|
# Mocker _take_screenshot pour retourner un ID
|
|
with patch.object(recorder, "_take_screenshot", return_value="ss_0001") as mock_ss:
|
|
raw_click = {
|
|
"t": 1.0,
|
|
"type": "mouse_click",
|
|
"button": "left",
|
|
"pos": [100, 200],
|
|
"window": {"title": "App", "app_name": "app"},
|
|
}
|
|
recorder._on_raw_event(raw_click)
|
|
|
|
mock_ss.assert_called_once()
|
|
assert recorder._session.events[0].screenshot_id == "ss_0001"
|
|
|
|
recorder._running = False
|
|
|
|
def test_on_event_callback_called(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
|
|
callback_received = []
|
|
on_event_fn = lambda e: callback_received.append(e)
|
|
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
recorder.start(session_id="sess_cb", on_event=on_event_fn)
|
|
|
|
raw_event = {
|
|
"t": 0.1,
|
|
"type": "key_press",
|
|
"keys": ["Enter"],
|
|
"window": {"title": "T", "app_name": "a"},
|
|
}
|
|
recorder._on_raw_event(raw_event)
|
|
|
|
assert len(callback_received) == 1
|
|
assert callback_received[0]["type"] == "key_press"
|
|
|
|
recorder._running = False
|
|
|
|
|
|
class TestSessionRecorderScreenshots:
|
|
"""Verifier la sauvegarde des screenshots via _take_screenshot."""
|
|
|
|
def test_take_screenshot_saves_file(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
recorder.start(session_id="sess_screenshot")
|
|
|
|
# Creer un mock de ScreenCapturer
|
|
fake_frame = np.zeros((100, 200, 3), dtype=np.uint8)
|
|
mock_capturer = MagicMock()
|
|
mock_capturer.capture_frame.return_value = fake_frame
|
|
mock_capturer.save_frame.side_effect = lambda frame, path: Image.fromarray(frame).save(path)
|
|
|
|
recorder._screen_capturer = mock_capturer
|
|
|
|
screenshot_id = recorder._take_screenshot()
|
|
|
|
assert screenshot_id == "ss_0001"
|
|
assert recorder.screenshot_count == 1
|
|
|
|
# Verifier que le screenshot est enregistre dans la session
|
|
ss = recorder._session.screenshots[0]
|
|
assert ss.screenshot_id == "ss_0001"
|
|
assert "screenshots/" in ss.relative_path
|
|
assert ss.captured_at is not None
|
|
|
|
recorder._running = False
|
|
|
|
def test_take_screenshot_returns_none_without_capturer(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
recorder.start(session_id="sess_no_capturer")
|
|
|
|
# Pas de screen_capturer = _take_screenshot retourne None
|
|
# Il faut aussi mocker _ensure_screen_capturer pour empecher la reinit lazy
|
|
recorder._screen_capturer = None
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
result = recorder._take_screenshot()
|
|
assert result is None
|
|
|
|
recorder._running = False
|
|
|
|
def test_take_screenshot_handles_capture_failure(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
recorder.start(session_id="sess_fail")
|
|
|
|
mock_capturer = MagicMock()
|
|
mock_capturer.capture_frame.return_value = None
|
|
recorder._screen_capturer = mock_capturer
|
|
|
|
result = recorder._take_screenshot()
|
|
assert result is None
|
|
|
|
recorder._running = False
|
|
|
|
|
|
# =============================================================================
|
|
# 2. ScreenAnalyzer
|
|
# =============================================================================
|
|
|
|
|
|
class TestScreenAnalyzerBuildScreenState:
|
|
"""Verifier la construction d'un ScreenState complet 4 niveaux."""
|
|
|
|
def test_analyze_builds_complete_screen_state(self, test_image_path):
|
|
from core.pipeline.screen_analyzer import ScreenAnalyzer
|
|
|
|
# Creer un ScreenAnalyzer sans OCR ni UIDetector
|
|
analyzer = ScreenAnalyzer(
|
|
ui_detector=None,
|
|
ocr_engine=None,
|
|
session_id="test_session",
|
|
)
|
|
|
|
state = analyzer.analyze(
|
|
screenshot_path=test_image_path,
|
|
window_info={"title": "Test App", "app_name": "test"},
|
|
)
|
|
|
|
# Verifier le type
|
|
assert isinstance(state, ScreenState)
|
|
|
|
# Niveau 1 : Raw
|
|
assert state.raw is not None
|
|
assert state.raw.screenshot_path == test_image_path
|
|
assert state.raw.capture_method == "mss"
|
|
assert state.raw.file_size_bytes > 0
|
|
|
|
# Niveau 2 : Perception
|
|
assert state.perception is not None
|
|
assert isinstance(state.perception.detected_text, list)
|
|
assert state.perception.embedding is not None
|
|
assert state.perception.embedding.dimensions == 512
|
|
|
|
# Niveau 3 : UI elements (vide sans detecteur)
|
|
assert isinstance(state.ui_elements, list)
|
|
|
|
# Niveau 4 : Contexte
|
|
assert state.context is not None
|
|
assert state.window is not None
|
|
assert state.window.app_name == "test"
|
|
assert state.window.window_title == "Test App"
|
|
|
|
# Metadata
|
|
assert "analyzer_version" in state.metadata
|
|
assert state.screen_state_id.startswith("test_session_state_")
|
|
|
|
def test_analyze_with_default_window_info(self, test_image_path):
|
|
from core.pipeline.screen_analyzer import ScreenAnalyzer
|
|
|
|
analyzer = ScreenAnalyzer(session_id="default_win")
|
|
state = analyzer.analyze(screenshot_path=test_image_path)
|
|
|
|
assert state.window.app_name == "unknown"
|
|
assert state.window.window_title == "Unknown"
|
|
assert state.window.screen_resolution == [1920, 1080]
|
|
|
|
def test_analyze_increments_state_counter(self, test_image_path):
|
|
from core.pipeline.screen_analyzer import ScreenAnalyzer
|
|
|
|
analyzer = ScreenAnalyzer(session_id="counter")
|
|
|
|
state1 = analyzer.analyze(test_image_path)
|
|
state2 = analyzer.analyze(test_image_path)
|
|
|
|
assert state1.screen_state_id == "counter_state_0001"
|
|
assert state2.screen_state_id == "counter_state_0002"
|
|
|
|
def test_analyze_image_from_pil(self, tmp_path):
|
|
from core.pipeline.screen_analyzer import ScreenAnalyzer
|
|
|
|
analyzer = ScreenAnalyzer(session_id="pil_test")
|
|
img = Image.new("RGB", (320, 240), color=(100, 200, 50))
|
|
|
|
save_dir = str(tmp_path / "screens")
|
|
state = analyzer.analyze_image(img, save_dir=save_dir)
|
|
|
|
assert isinstance(state, ScreenState)
|
|
assert Path(state.raw.screenshot_path).exists()
|
|
assert state.raw.file_size_bytes > 0
|
|
|
|
|
|
class TestScreenAnalyzerOCRFallback:
|
|
"""Verifier le fallback OCR quand aucun moteur n'est disponible."""
|
|
|
|
def test_no_ocr_engine_returns_empty_text(self, test_image_path):
|
|
from core.pipeline.screen_analyzer import ScreenAnalyzer
|
|
|
|
# Forcer l'echec de tous les moteurs OCR
|
|
analyzer = ScreenAnalyzer(session_id="no_ocr")
|
|
|
|
# Mocker les createurs OCR pour qu'ils echouent
|
|
with patch.object(
|
|
analyzer, "_create_doctr_ocr", side_effect=ImportError("doctr not installed")
|
|
):
|
|
with patch.object(
|
|
analyzer,
|
|
"_create_tesseract_ocr",
|
|
side_effect=ImportError("tesseract not installed"),
|
|
):
|
|
state = analyzer.analyze(test_image_path)
|
|
|
|
assert state.perception.detected_text == []
|
|
assert state.perception.confidence_avg == 0.0
|
|
|
|
def test_ocr_method_name_none_when_no_engine(self, test_image_path):
|
|
from core.pipeline.screen_analyzer import ScreenAnalyzer
|
|
|
|
analyzer = ScreenAnalyzer(session_id="method_name")
|
|
|
|
with patch.object(
|
|
analyzer, "_create_doctr_ocr", side_effect=ImportError("no doctr")
|
|
):
|
|
with patch.object(
|
|
analyzer,
|
|
"_create_tesseract_ocr",
|
|
side_effect=ImportError("no tesseract"),
|
|
):
|
|
state = analyzer.analyze(test_image_path)
|
|
|
|
assert state.perception.text_detection_method == "none"
|
|
|
|
def test_ocr_exception_returns_empty_text(self, test_image_path):
|
|
from core.pipeline.screen_analyzer import ScreenAnalyzer
|
|
|
|
analyzer = ScreenAnalyzer(session_id="ocr_fail")
|
|
|
|
# Simuler un moteur OCR qui plante a l'appel
|
|
def failing_ocr(path):
|
|
raise RuntimeError("OCR crashed")
|
|
|
|
analyzer._ocr = failing_ocr
|
|
analyzer._ocr_initialized = True
|
|
|
|
state = analyzer.analyze(test_image_path)
|
|
assert state.perception.detected_text == []
|
|
|
|
|
|
class TestScreenAnalyzerUIDetector:
|
|
"""Verifier la gestion d'erreurs du UIDetector."""
|
|
|
|
def test_ui_detector_failure_returns_empty_elements(self, test_image_path):
|
|
from core.pipeline.screen_analyzer import ScreenAnalyzer
|
|
|
|
mock_detector = MagicMock()
|
|
mock_detector.detect.side_effect = RuntimeError("Detector crash")
|
|
|
|
analyzer = ScreenAnalyzer(
|
|
ui_detector=mock_detector,
|
|
session_id="detector_fail",
|
|
)
|
|
|
|
state = analyzer.analyze(test_image_path)
|
|
|
|
assert state.ui_elements == []
|
|
assert state.metadata["ui_elements_count"] == 0
|
|
|
|
def test_ui_detector_returns_elements(self, test_image_with_shapes):
|
|
from core.pipeline.screen_analyzer import ScreenAnalyzer
|
|
|
|
# Creer de faux elements UI
|
|
mock_elements = [
|
|
UIElement(
|
|
element_id="btn_001",
|
|
type="button",
|
|
role="primary_action",
|
|
bbox=BBox(x=50, y=50, width=150, height=40),
|
|
center=(125, 70),
|
|
label="OK",
|
|
label_confidence=0.95,
|
|
embeddings=UIElementEmbeddings(),
|
|
visual_features=VisualFeatures(
|
|
dominant_color="blue",
|
|
has_icon=False,
|
|
shape="rectangle",
|
|
size_category="medium",
|
|
),
|
|
confidence=0.9,
|
|
)
|
|
]
|
|
|
|
mock_detector = MagicMock()
|
|
mock_detector.detect.return_value = mock_elements
|
|
|
|
analyzer = ScreenAnalyzer(
|
|
ui_detector=mock_detector,
|
|
session_id="with_elements",
|
|
)
|
|
|
|
state = analyzer.analyze(test_image_with_shapes)
|
|
|
|
assert len(state.ui_elements) == 1
|
|
assert state.ui_elements[0].element_id == "btn_001"
|
|
assert state.metadata["ui_elements_count"] == 1
|
|
|
|
def test_no_ui_detector_returns_empty_elements(self, test_image_path):
|
|
from core.pipeline.screen_analyzer import ScreenAnalyzer
|
|
|
|
# Mocker _ensure_ui_detector pour qu'il ne fasse rien (pas de detecteur)
|
|
analyzer = ScreenAnalyzer(ui_detector=None, session_id="no_detector")
|
|
analyzer._ui_detector_initialized = True # Empecher l'init lazy
|
|
analyzer._ui_detector = None
|
|
|
|
state = analyzer.analyze(test_image_path)
|
|
assert state.ui_elements == []
|
|
|
|
|
|
# =============================================================================
|
|
# 3. EventListener
|
|
# =============================================================================
|
|
|
|
|
|
class TestEventListenerDefinition:
|
|
"""Verifier que EventListener peut etre defini meme sans pynput."""
|
|
|
|
def test_class_is_importable(self):
|
|
"""Le module est importable meme si pynput est absent."""
|
|
# On importe le module — il gere l'absence de pynput gracieusement
|
|
from core.capture import event_listener
|
|
|
|
assert hasattr(event_listener, "EventListener")
|
|
assert hasattr(event_listener, "PYNPUT_AVAILABLE")
|
|
|
|
def test_pynput_available_flag_exists(self):
|
|
from core.capture.event_listener import PYNPUT_AVAILABLE
|
|
|
|
assert isinstance(PYNPUT_AVAILABLE, bool)
|
|
|
|
def test_init_raises_import_error_without_pynput(self):
|
|
"""Si pynput n'est pas disponible, __init__ doit lever ImportError."""
|
|
from core.capture import event_listener
|
|
|
|
original_flag = event_listener.PYNPUT_AVAILABLE
|
|
|
|
try:
|
|
# Simuler l'absence de pynput
|
|
event_listener.PYNPUT_AVAILABLE = False
|
|
|
|
with pytest.raises(ImportError, match="pynput"):
|
|
event_listener.EventListener()
|
|
finally:
|
|
# Restaurer la valeur originale
|
|
event_listener.PYNPUT_AVAILABLE = original_flag
|
|
|
|
def test_init_does_not_raise_with_pynput(self):
|
|
"""Si pynput est disponible, __init__ ne doit pas lever d'erreur."""
|
|
from core.capture import event_listener
|
|
|
|
if not event_listener.PYNPUT_AVAILABLE:
|
|
pytest.skip("pynput non disponible, impossible de tester l'init normal")
|
|
|
|
listener = event_listener.EventListener()
|
|
assert listener is not None
|
|
assert listener.is_running is False
|
|
|
|
|
|
# =============================================================================
|
|
# 4. GraphBuilder -> WorkflowPipeline connection (_extract_node_vector)
|
|
# =============================================================================
|
|
|
|
|
|
class TestExtractNodeVector:
|
|
"""
|
|
Verifier que _extract_node_vector dans WorkflowPipeline
|
|
lit correctement le prototype depuis node.metadata["_prototype_vector"].
|
|
"""
|
|
|
|
def _make_mock_node(self, metadata=None, template=None):
|
|
"""Creer un mock de WorkflowNode."""
|
|
node = MagicMock()
|
|
node.metadata = metadata or {}
|
|
node.template = template
|
|
return node
|
|
|
|
def test_reads_prototype_from_metadata(self):
|
|
"""v3 : prototype dans metadata._prototype_vector."""
|
|
# Nous importons et instancions uniquement _extract_node_vector
|
|
# en mockant le constructeur lourd de WorkflowPipeline.
|
|
from core.pipeline.workflow_pipeline import WorkflowPipeline
|
|
|
|
# Creer un prototype de test
|
|
prototype = [0.1, 0.2, 0.3, 0.4, 0.5]
|
|
|
|
node = self._make_mock_node(
|
|
metadata={"_prototype_vector": prototype}
|
|
)
|
|
|
|
# Appeler _extract_node_vector en tant que methode non-bound
|
|
# (elle n'utilise que self pour logger)
|
|
pipeline_instance = MagicMock(spec=WorkflowPipeline)
|
|
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
|
|
|
|
assert result is not None
|
|
assert isinstance(result, np.ndarray)
|
|
assert result.dtype == np.float32
|
|
np.testing.assert_array_almost_equal(result, np.array(prototype, dtype=np.float32))
|
|
|
|
def test_metadata_prototype_takes_priority(self):
|
|
"""v3 metadata._prototype_vector est prioritaire sur template."""
|
|
from core.pipeline.workflow_pipeline import WorkflowPipeline
|
|
|
|
meta_proto = [1.0, 2.0, 3.0]
|
|
template_proto = [9.0, 8.0, 7.0]
|
|
|
|
mock_template = MagicMock()
|
|
mock_template.embedding_prototype = template_proto
|
|
|
|
node = self._make_mock_node(
|
|
metadata={"_prototype_vector": meta_proto},
|
|
template=mock_template,
|
|
)
|
|
|
|
pipeline_instance = MagicMock(spec=WorkflowPipeline)
|
|
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
|
|
|
|
# Doit retourner le prototype metadata (prioritaire)
|
|
np.testing.assert_array_almost_equal(
|
|
result, np.array(meta_proto, dtype=np.float32)
|
|
)
|
|
|
|
def test_fallback_to_template_embedding_prototype(self):
|
|
"""v1 fallback : template.embedding_prototype en liste."""
|
|
from core.pipeline.workflow_pipeline import WorkflowPipeline
|
|
|
|
template_proto = [0.5, 0.6, 0.7]
|
|
|
|
mock_template = MagicMock()
|
|
mock_template.embedding_prototype = template_proto
|
|
# Pas d'embedding.vector_id
|
|
mock_template.embedding = None
|
|
|
|
node = self._make_mock_node(
|
|
metadata={}, # Pas de _prototype_vector
|
|
template=mock_template,
|
|
)
|
|
|
|
pipeline_instance = MagicMock(spec=WorkflowPipeline)
|
|
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
|
|
|
|
assert result is not None
|
|
np.testing.assert_array_almost_equal(
|
|
result, np.array(template_proto, dtype=np.float32)
|
|
)
|
|
|
|
def test_returns_none_when_no_vector(self):
|
|
"""Retourne None quand aucun vecteur n'est disponible."""
|
|
from core.pipeline.workflow_pipeline import WorkflowPipeline
|
|
|
|
mock_template = MagicMock()
|
|
mock_template.embedding_prototype = None
|
|
mock_template.embedding = None
|
|
|
|
node = self._make_mock_node(
|
|
metadata={},
|
|
template=mock_template,
|
|
)
|
|
|
|
pipeline_instance = MagicMock(spec=WorkflowPipeline)
|
|
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
|
|
|
|
assert result is None
|
|
|
|
def test_returns_none_when_no_metadata_and_no_template(self):
|
|
"""Retourne None quand le node n'a ni metadata ni template."""
|
|
from core.pipeline.workflow_pipeline import WorkflowPipeline
|
|
|
|
node = self._make_mock_node(metadata={}, template=None)
|
|
|
|
pipeline_instance = MagicMock(spec=WorkflowPipeline)
|
|
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
|
|
|
|
assert result is None
|
|
|
|
def test_handles_invalid_prototype_gracefully(self):
|
|
"""Ne plante pas si le prototype metadata est mal forme."""
|
|
from core.pipeline.workflow_pipeline import WorkflowPipeline
|
|
|
|
node = self._make_mock_node(
|
|
metadata={"_prototype_vector": "not_a_list"},
|
|
)
|
|
|
|
pipeline_instance = MagicMock(spec=WorkflowPipeline)
|
|
# Ne doit pas lever d'exception
|
|
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
|
|
|
|
# "not_a_list" n'est pas une liste, donc isinstance check echoue
|
|
# et le code passe au fallback (template)
|
|
# Puisque template est None, retourne None
|
|
assert result is None
|
|
|
|
def test_loads_vector_from_disk_v2(self, tmp_path):
|
|
"""v2 : prototype charge depuis disque via embedding.vector_id."""
|
|
from core.pipeline.workflow_pipeline import WorkflowPipeline
|
|
|
|
# Creer un fichier .npy sur disque
|
|
vec = np.array([0.1, 0.2, 0.3, 0.4], dtype=np.float32)
|
|
vec_path = tmp_path / "prototype.npy"
|
|
np.save(str(vec_path), vec)
|
|
|
|
mock_embedding = MagicMock()
|
|
mock_embedding.vector_id = str(vec_path)
|
|
|
|
mock_template = MagicMock()
|
|
mock_template.embedding_prototype = None
|
|
mock_template.embedding = mock_embedding
|
|
|
|
node = self._make_mock_node(
|
|
metadata={},
|
|
template=mock_template,
|
|
)
|
|
|
|
pipeline_instance = MagicMock(spec=WorkflowPipeline)
|
|
result = WorkflowPipeline._extract_node_vector(pipeline_instance, node)
|
|
|
|
assert result is not None
|
|
np.testing.assert_array_almost_equal(result, vec)
|
|
|
|
|
|
# =============================================================================
|
|
# 5. Integration end-to-end legers (SessionRecorder -> ScreenAnalyzer)
|
|
# =============================================================================
|
|
|
|
|
|
class TestSessionRecorderScreenAnalyzerIntegration:
|
|
"""
|
|
Verifier que les evenements et screenshots enregistres
|
|
sont exploitables par ScreenAnalyzer.
|
|
"""
|
|
|
|
def test_recorded_screenshot_can_be_analyzed(self, tmp_path):
|
|
"""Un screenshot enregistre par SessionRecorder est analysable par ScreenAnalyzer."""
|
|
from core.capture.session_recorder import SessionRecorder
|
|
from core.pipeline.screen_analyzer import ScreenAnalyzer
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
recorder.start(session_id="sess_e2e")
|
|
|
|
# Simuler un screenshot sauvegarde
|
|
screenshots_dir = recorder._screenshots_dir
|
|
img = Image.new("RGB", (640, 480), color=(200, 100, 50))
|
|
img_path = screenshots_dir / "screen_0001.png"
|
|
img.save(str(img_path))
|
|
|
|
# Enregistrer dans la session
|
|
screenshot = Screenshot(
|
|
screenshot_id="ss_0001",
|
|
relative_path="screenshots/screen_0001.png",
|
|
captured_at=datetime.now().isoformat(),
|
|
)
|
|
recorder._session.add_screenshot(screenshot)
|
|
|
|
# Analyser avec ScreenAnalyzer
|
|
analyzer = ScreenAnalyzer(session_id="sess_e2e")
|
|
# Mocker les engines lourds
|
|
analyzer._ui_detector_initialized = True
|
|
analyzer._ui_detector = None
|
|
analyzer._ocr_initialized = True
|
|
analyzer._ocr = None
|
|
|
|
state = analyzer.analyze(str(img_path))
|
|
|
|
assert isinstance(state, ScreenState)
|
|
assert state.raw.file_size_bytes > 0
|
|
assert Path(state.raw.screenshot_path).exists()
|
|
|
|
recorder._running = False
|
|
|
|
|
|
class TestSessionRecorderEnvironment:
|
|
"""Verifier la collecte d'informations d'environnement."""
|
|
|
|
def test_get_environment_contains_os_info(self, tmp_path):
|
|
from core.capture.session_recorder import SessionRecorder
|
|
|
|
recorder = SessionRecorder(output_dir=str(tmp_path / "sessions"))
|
|
|
|
with patch.object(recorder, "_start_event_listener"):
|
|
with patch.object(recorder, "_ensure_screen_capturer"):
|
|
recorder.start(session_id="sess_env")
|
|
|
|
env = recorder._session.environment
|
|
assert "os" in env
|
|
assert "hostname" in env
|
|
assert env["os"] in ("linux", "windows", "darwin")
|
|
|
|
recorder._running = False
|