""" Tests unitaires de l'intégration vision-aware dans ExecutionLoop (C1). Couvre : - Construction d'un ScreenState enrichi via ScreenAnalyzer - Cache hit évite un second appel à analyzer.analyze - Timeout → mode dégradé persistant - enable_ui_detection=False + enable_ocr=False → fallback stub - StepResult contient bien les champs temps (ocr_ms, ui_ms, analyze_ms, cache_hit, degraded) - Singleton get_screen_analyzer partage bien l'instance """ from __future__ import annotations import time from datetime import datetime from pathlib import Path from unittest.mock import MagicMock, patch import pytest from PIL import Image from core.execution.execution_loop import ExecutionContext, ExecutionLoop, ExecutionMode, StepResult from core.models.screen_state import ( ContextLevel, EmbeddingRef, PerceptionLevel, RawLevel, ScreenState, WindowContext, ) from core.pipeline import ( get_screen_analyzer, get_screen_state_cache, reset_screen_analyzer, reset_screen_state_cache, ) # ----------------------------------------------------------------------------- # Fixtures # ----------------------------------------------------------------------------- @pytest.fixture(autouse=True) def reset_singletons(): """Réinitialiser les singletons entre chaque test.""" reset_screen_analyzer() reset_screen_state_cache() yield reset_screen_analyzer() reset_screen_state_cache() @pytest.fixture def screenshot(tmp_path): path = tmp_path / "shot.png" Image.new("RGB", (320, 240), color=(128, 128, 128)).save(str(path)) return str(path) def _make_state(session_id: str = "s1") -> ScreenState: return ScreenState( screen_state_id="sid", timestamp=datetime.now(), session_id=session_id, window=WindowContext( app_name="app", window_title="Title", screen_resolution=[1920, 1080] ), raw=RawLevel(screenshot_path="", capture_method="test", file_size_bytes=0), perception=PerceptionLevel( embedding=EmbeddingRef(provider="t", vector_id="v", dimensions=512), detected_text=["hello"], text_detection_method="test", confidence_avg=0.9, ), context=ContextLevel(), metadata={"ocr_ms": 123.0, "ui_ms": 45.0}, ui_elements=[], ) def _make_loop(screen_analyzer=None, **kwargs) -> ExecutionLoop: pipeline = MagicMock() # Mocker load_workflow pour éviter dépendance FS pipeline.load_workflow.return_value = None loop = ExecutionLoop( pipeline=pipeline, action_executor=MagicMock(), screen_capturer=MagicMock(), screen_analyzer=screen_analyzer, **kwargs, ) loop.context = ExecutionContext( workflow_id="wf1", execution_id="exec1", mode=ExecutionMode.AUTOMATIC, started_at=datetime.now(), ) return loop # ----------------------------------------------------------------------------- # Tests # ----------------------------------------------------------------------------- class TestVisionAwareBuild: def test_build_screen_state_uses_analyzer(self, screenshot): analyzer = MagicMock() analyzer.analyze.return_value = _make_state() loop = _make_loop(screen_analyzer=analyzer) state, timings = loop._build_screen_state(screenshot) assert analyzer.analyze.called assert state.session_id == "s1" assert timings["cache_hit"] is False assert timings["ocr_ms"] == 123.0 assert timings["ui_ms"] == 45.0 assert timings["degraded"] is False def test_build_screen_state_cache_hit_on_second_call(self, screenshot): analyzer = MagicMock() analyzer.analyze.return_value = _make_state() loop = _make_loop(screen_analyzer=analyzer) loop._build_screen_state(screenshot) loop._build_screen_state(screenshot) # Un seul appel à analyze grâce au cache assert analyzer.analyze.call_count == 1 def test_disabled_ui_and_ocr_returns_stub(self, screenshot): analyzer = MagicMock() analyzer.analyze.return_value = _make_state() loop = _make_loop( screen_analyzer=analyzer, enable_ui_detection=False, enable_ocr=False, ) state, timings = loop._build_screen_state(screenshot) # analyze ne doit PAS avoir été appelé analyzer.analyze.assert_not_called() assert timings["degraded"] is True assert state.perception.detected_text == [] assert state.ui_elements == [] def test_timeout_activates_degraded_mode(self, screenshot): """Si l'analyse dépasse analyze_timeout_ms, le loop bascule en dégradé.""" analyzer = MagicMock() def slow_analyze(*_args, **_kw): time.sleep(0.15) return _make_state() analyzer.analyze.side_effect = slow_analyze loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=50) # Premier appel → mesure timeout et active dégradé _, timings1 = loop._build_screen_state(screenshot) assert timings1["degraded"] is True assert loop._degraded_mode is True # Deuxième appel (autre screenshot pour éviter cache) → stub direct img2 = Path(screenshot).parent / "other.png" Image.new("RGB", (320, 240), color=(1, 2, 3)).save(str(img2)) _, timings2 = loop._build_screen_state(str(img2)) assert timings2["degraded"] is True # analyzer.analyze n'a pas été appelé une 2ème fois assert analyzer.analyze.call_count == 1 def test_analyzer_unavailable_returns_stub(self, screenshot): """Si get_screen_analyzer() renvoie None, fallback stub.""" loop = _make_loop(screen_analyzer=None) # Forcer _get_screen_analyzer à retourner None with patch.object(loop, "_get_screen_analyzer", return_value=None): state, timings = loop._build_screen_state(screenshot) assert timings["degraded"] is True assert state.ui_elements == [] def test_stub_when_all_flags_off(self, screenshot): loop = _make_loop(enable_ui_detection=False, enable_ocr=False) state, timings = loop._build_screen_state(screenshot) assert state.window.window_title == "Unknown" assert timings["degraded"] is True class TestWindowInfoProvider: def test_window_info_provider_is_used(self, screenshot): analyzer = MagicMock() analyzer.analyze.return_value = _make_state() provider = lambda: {"title": "Chrome", "app_name": "chrome"} loop = _make_loop(screen_analyzer=analyzer, window_info_provider=provider) loop._build_screen_state(screenshot) # Vérifier que window_info a bien été passé à analyze call_kwargs = analyzer.analyze.call_args.kwargs assert call_kwargs.get("window_info") == {"title": "Chrome", "app_name": "chrome"} def test_falls_back_to_screen_capturer(self, screenshot): analyzer = MagicMock() analyzer.analyze.return_value = _make_state() loop = _make_loop(screen_analyzer=analyzer) loop.screen_capturer.get_active_window.return_value = { "title": "Firefox", "app": "firefox", "x": 0, "y": 0, "width": 800, "height": 600, } loop._build_screen_state(screenshot) call_kwargs = analyzer.analyze.call_args.kwargs wi = call_kwargs.get("window_info") assert wi is not None assert wi["title"] == "Firefox" assert wi["app_name"] == "firefox" class TestDegradedModeRecovery: """Tâche 2 — Auto-rétablissement du mode dégradé après steps rapides.""" def test_fast_steps_counter_resets_on_degradation(self, screenshot): """Dépassement du timeout → active dégradé + reset compteur.""" analyzer = MagicMock() def slow_analyze(*_args, **_kw): time.sleep(0.15) return _make_state() analyzer.analyze.side_effect = slow_analyze loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=50) loop._successive_fast_steps = 2 # état fictif avant le timeout _, timings = loop._build_screen_state(screenshot) assert loop._degraded_mode is True assert loop._successive_fast_steps == 0 assert timings["degraded"] is True def test_recovery_after_three_fast_probes(self, tmp_path): """Après 3 probes rapides consécutifs, retour en mode complet.""" import random analyzer = MagicMock() analyzer.analyze.return_value = _make_state() # Timeout 1000ms → fast_threshold = 500ms ; MagicMock = instant (<<500ms). loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=1000) # Simuler un état dégradé préexistant loop._degraded_mode = True loop._successive_fast_steps = 0 loop._degraded_step_counter = 0 # Probe immédiat à chaque appel loop._probe_interval = 1 # 3 probes rapides sur 3 screenshots avec dhash différents. # Une image unie a toujours un dhash 0...0 → on génère du bruit. for i in range(3): random.seed(i + 1) img = Image.new("RGB", (320, 240)) for y in range(240): for x in range(320): v = random.randint(0, 255) img.putpixel((x, y), (v, v, v)) path = tmp_path / f"shot_{i}.png" img.save(str(path)) _, timings = loop._build_screen_state(str(path)) assert loop._degraded_mode is False, "Devrait être sorti du mode dégradé" assert loop._successive_fast_steps == 0 # Reset après récupération def test_slow_probe_keeps_degraded(self, tmp_path): """Un probe lent en mode dégradé garde _degraded_mode=True.""" analyzer = MagicMock() def slow_analyze(*_args, **_kw): time.sleep(0.15) return _make_state() analyzer.analyze.side_effect = slow_analyze loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=50) loop._degraded_mode = True loop._successive_fast_steps = 2 loop._degraded_step_counter = 0 loop._probe_interval = 1 path = tmp_path / "slow.png" Image.new("RGB", (320, 240), color=(80, 80, 80)).save(str(path)) _, timings = loop._build_screen_state(str(path)) assert loop._degraded_mode is True assert loop._successive_fast_steps == 0 # Reset au slow assert timings["degraded"] is True def test_probe_interval_respected_in_degraded(self, screenshot): """En dégradé, on ne fait probe que tous les _probe_interval steps.""" analyzer = MagicMock() analyzer.analyze.return_value = _make_state() loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=1000) loop._degraded_mode = True loop._probe_interval = 5 # 4 appels successifs → aucun probe (stub direct) for _ in range(4): _, timings = loop._build_screen_state(screenshot) assert timings["degraded"] is True assert analyzer.analyze.call_count == 0 class TestStepResultFields: def test_step_result_has_new_timing_fields(self): r = StepResult( success=True, node_id="n1", edge_id=None, action_result=None, match_confidence=0.9, duration_ms=10.0, message="test", ) assert r.ocr_ms == 0.0 assert r.ui_ms == 0.0 assert r.analyze_ms == 0.0 assert r.total_ms == 0.0 assert r.cache_hit is False assert r.degraded is False class TestExecuteStepBlockedContract: """Lot A — contrat dict get_next_action dans ExecutionLoop._execute_step.""" def _setup_loop_with_match(self, next_action_return, screenshot): """Crée une ExecutionLoop avec un pipeline mocké qui renvoie ``next_action_return`` à get_next_action, et un ``match_current_state_from_state`` qui matche toujours (Lot E — le chemin d'exécution utilise la nouvelle API context-aware).""" analyzer = MagicMock() analyzer.analyze.return_value = _make_state() loop = _make_loop(screen_analyzer=analyzer) # Nouveau chemin Lot E : match_current_state_from_state retourne un match valide loop.pipeline.match_current_state_from_state.return_value = { "node_id": "n1", "workflow_id": "wf1", "confidence": 0.95, } loop.pipeline.get_next_action.return_value = next_action_return # Mock _capture_screen pour éviter le vrai capture loop._capture_screen = lambda: screenshot return loop def test_blocked_triggers_paused_state(self, screenshot): """status="blocked" → PAUSED + success=False + on_error appelé.""" loop = self._setup_loop_with_match( next_action_return={"status": "blocked", "reason": "no_valid_edge"}, screenshot=screenshot, ) errors_seen = [] loop.on_error(lambda src, exc: errors_seen.append((src, exc))) result = loop._execute_step() assert result is not None assert result.success is False assert result.edge_id is None assert "Blocked" in result.message assert loop.state.value == "paused" # Callback on_error a bien été notifié assert len(errors_seen) == 1 assert errors_seen[0][0] == "blocked" def test_terminal_succeeds_without_edge(self, screenshot): """status="terminal" → success=True + message "terminated".""" loop = self._setup_loop_with_match( next_action_return={"status": "terminal"}, screenshot=screenshot, ) result = loop._execute_step() assert result is not None assert result.success is True assert result.edge_id is None assert "terminated" in result.message.lower() # PAS passé en PAUSED (workflow terminé légitimement) assert loop.state.value != "paused" def test_legacy_none_treated_as_blocked(self, screenshot): """Rétrocompat défensive : si un pipeline legacy renvoie None, on considère ça comme un blocage (safe default).""" loop = self._setup_loop_with_match( next_action_return=None, screenshot=screenshot, ) result = loop._execute_step() assert result is not None assert result.success is False assert loop.state.value == "paused" def test_selected_continues_execution(self, screenshot): """status="selected" → chemin nominal, tente d'exécuter l'edge.""" loop = self._setup_loop_with_match( next_action_return={ "status": "selected", "edge_id": "e1", "action": {"type": "click", "target": {}}, "target_node": "n2", "confidence": 0.9, "score": 0.9, }, screenshot=screenshot, ) # Mode OBSERVATION pour ne rien exécuter réellement loop.context.mode = ExecutionMode.OBSERVATION result = loop._execute_step() assert result is not None # Pas de PAUSED déclenché assert loop.state.value != "paused" # edge_id bien propagé assert result.edge_id == "e1" class TestSingleton: def test_get_screen_analyzer_returns_same_instance(self): a1 = get_screen_analyzer() a2 = get_screen_analyzer() assert a1 is a2 def test_force_new_creates_new_instance(self): a1 = get_screen_analyzer() a2 = get_screen_analyzer(force_new=True) assert a1 is not a2 def test_get_screen_state_cache_returns_same_instance(self): c1 = get_screen_state_cache() c2 = get_screen_state_cache() assert c1 is c2 class TestAnalyzerIsolationBetweenLoops: """ Lot C — Deux ExecutionLoop partageant le même ScreenAnalyzer ne doivent PAS se contaminer mutuellement. Règle : `analyze()` ne mute jamais `_ocr`, `_ui_detector`, `_ocr_initialized`, `_ui_detector_initialized` pour gérer les flags runtime. Les flags (`enable_ocr`, `enable_ui_detection`) et `session_id` circulent en kwargs d'appel, pas via l'état du singleton. """ def _make_distinct_image(self, path, seed: int): """Image avec dhash unique (random noise) pour éviter les cache hits.""" import random random.seed(seed) img = Image.new("RGB", (128, 128)) for y in range(128): for x in range(128): v = random.randint(0, 255) img.putpixel((x, y), (v, v, v)) img.save(str(path)) return str(path) def test_two_loops_share_analyzer_no_contamination(self, tmp_path): """Deux loops, le premier avec enable_ocr=False, le second avec enable_ocr=True → l'état interne du singleton doit être intact après l'appel du premier loop (pas de self._ocr=None).""" from core.pipeline.screen_analyzer import ScreenAnalyzer analyzer = ScreenAnalyzer() # Installer un OCR + UIDetector factices ET marqués "initialisés" pour # empêcher l'init lazy réelle pendant le test. sentinel_ocr = lambda path: ["texte_sentinelle"] sentinel_detector = MagicMock() sentinel_detector.detect.return_value = [] analyzer._ocr = sentinel_ocr analyzer._ocr_initialized = True analyzer._ui_detector = sentinel_detector analyzer._ui_detector_initialized = True # Deux screenshots avec dhash distincts (random noise) img_a = self._make_distinct_image(tmp_path / "shot_a.png", seed=1) img_b = self._make_distinct_image(tmp_path / "shot_b.png", seed=2) # Premier loop : OCR désactivé loop_a = _make_loop(screen_analyzer=analyzer, enable_ocr=False) state_a, _ = loop_a._build_screen_state(img_a) # Vérifier l'isolation : l'analyseur est INCHANGÉ. assert analyzer._ocr is sentinel_ocr, ( "analyze(enable_ocr=False) NE DOIT PAS muter self._ocr" ) assert analyzer._ocr_initialized is True assert analyzer._ui_detector is sentinel_detector assert analyzer._ui_detector_initialized is True # Pour le loop A, OCR bypass → detected_text vide assert state_a.perception.detected_text == [] # Deuxième loop : OCR activé loop_b = _make_loop(screen_analyzer=analyzer, enable_ocr=True) state_b, _ = loop_b._build_screen_state(img_b) # L'analyseur est toujours intact assert analyzer._ocr is sentinel_ocr # Et le loop B a bien bénéficié de l'OCR assert state_b.perception.detected_text == ["texte_sentinelle"] def test_session_id_is_per_call_not_singleton(self, tmp_path): """Deux appels avec session_id différent → chaque ScreenState porte le bon session_id, et le singleton ne garde pas de session résiduelle.""" from core.pipeline.screen_analyzer import ScreenAnalyzer # On patche _ensure_*_locked pour éviter l'init réelle. analyzer = ScreenAnalyzer() analyzer._ocr = None analyzer._ocr_initialized = True analyzer._ui_detector = None analyzer._ui_detector_initialized = True img1 = tmp_path / "s1.png" img2 = tmp_path / "s2.png" Image.new("RGB", (100, 100), color=(1, 2, 3)).save(str(img1)) Image.new("RGB", (100, 100), color=(4, 5, 6)).save(str(img2)) s1 = analyzer.analyze(str(img1), session_id="session_alpha") s2 = analyzer.analyze(str(img2), session_id="session_beta") assert s1.session_id == "session_alpha" assert s2.session_id == "session_beta" assert s1.metadata.get("session_id") == "session_alpha" assert s2.metadata.get("session_id") == "session_beta" # Le state_id doit refléter chaque session, pas la "dernière vue" du singleton assert s1.screen_state_id.startswith("session_alpha_") assert s2.screen_state_id.startswith("session_beta_") def test_analyze_flags_override_without_mutation(self, tmp_path): """enable_ui_detection=False → ui_elements=[] dans le résultat, mais analyzer._ui_detector reste initialisé (pas de mutation).""" from core.pipeline.screen_analyzer import ScreenAnalyzer analyzer = ScreenAnalyzer() sentinel_detector = MagicMock() sentinel_detector.detect.return_value = [MagicMock()] # 1 élément factice analyzer._ui_detector = sentinel_detector analyzer._ui_detector_initialized = True analyzer._ocr = lambda p: [] analyzer._ocr_initialized = True img = tmp_path / "shot.png" Image.new("RGB", (100, 100), color=(10, 20, 30)).save(str(img)) state = analyzer.analyze(str(img), enable_ui_detection=False) # ui_elements vide puisque détection désactivée pour cet appel assert state.ui_elements == [] # Mais le détecteur du singleton est intact assert analyzer._ui_detector is sentinel_detector assert analyzer._ui_detector_initialized is True # Le détecteur n'a PAS été appelé sentinel_detector.detect.assert_not_called() class TestCacheContextAwareFromLoop: """Lot D — Deux ExecutionLoop qui partagent le même ScreenStateCache mais s'exécutent dans des workflows différents NE DOIVENT PAS partager leurs entrées de cache : la clé composite inclut `workflow_id`. """ def test_two_loops_different_workflow_different_cache(self, tmp_path): """Même screenshot + même analyseur + workflow_id différent → 2 miss. Le compute_fn sous-jacent (analyzer.analyze) doit être appelé pour chaque loop : pas de contamination inter-workflows. """ from core.pipeline import get_screen_state_cache analyzer = MagicMock() analyzer.analyze.return_value = _make_state() # Un même cache partagé (singleton) entre les deux loops. shared_cache = get_screen_state_cache() # Image commune (dhash identique) img = tmp_path / "common.png" Image.new("RGB", (320, 240), color=(77, 77, 77)).save(str(img)) # Loop A → workflow "wf_A" loop_a = _make_loop( screen_analyzer=analyzer, screen_state_cache=shared_cache, ) loop_a.context.workflow_id = "wf_A" loop_a._build_screen_state(str(img)) assert analyzer.analyze.call_count == 1 # Loop B → workflow "wf_B" (même cache, même image, contexte différent) loop_b = _make_loop( screen_analyzer=analyzer, screen_state_cache=shared_cache, ) loop_b.context.workflow_id = "wf_B" loop_b._build_screen_state(str(img)) # Pas de collision : analyzer.analyze a bien été appelé une 2ème fois. assert analyzer.analyze.call_count == 2 # Une 3ème exécution du loop A (même workflow_id, même screenshot) # doit par contre frapper le cache. loop_a._build_screen_state(str(img)) assert analyzer.analyze.call_count == 2 # Pas de nouvel appel class TestExecutionLoopUsesMatchFromState: """ Lot E — ExecutionLoop._execute_step doit appeler ``pipeline.match_current_state_from_state`` avec le ScreenState enrichi, et NON plus l'API legacy ``match_current_state(screenshot_path, ...)``. """ def _make_loop_with_analyzer(self, screenshot): analyzer = MagicMock() analyzer.analyze.return_value = _make_state() loop = _make_loop(screen_analyzer=analyzer) loop._capture_screen = lambda: screenshot return loop def test_execution_loop_calls_match_from_state(self, screenshot): """_execute_step doit appeler match_current_state_from_state, pas l'ancienne API.""" loop = self._make_loop_with_analyzer(screenshot) loop.pipeline.match_current_state_from_state.return_value = { "node_id": "n1", "workflow_id": "wf1", "confidence": 0.9, } loop.pipeline.get_next_action.return_value = {"status": "terminal"} loop._execute_step() # La nouvelle API a été appelée assert loop.pipeline.match_current_state_from_state.called # L'ancienne API n'a PAS été appelée loop.pipeline.match_current_state.assert_not_called() def test_execution_loop_passes_enriched_screen_state(self, screenshot): """Le ScreenState passé à match_current_state_from_state doit être le résultat enrichi du ScreenAnalyzer (avec detected_text + title réel), pas un stub.""" loop = self._make_loop_with_analyzer(screenshot) loop.pipeline.match_current_state_from_state.return_value = None loop._execute_step() call_args = loop.pipeline.match_current_state_from_state.call_args passed_state = call_args.args[0] # Le state vient de _make_state() → detected_text=["hello"], title="Title" assert passed_state.perception.detected_text == ["hello"] assert passed_state.window.window_title == "Title" # Et le workflow_id est bien propagé assert call_args.kwargs.get("workflow_id") == "wf1"