From 9ca277a63f3b5967f18a7a662da381e01ddf2115 Mon Sep 17 00:00:00 2001 From: Dom Date: Wed, 15 Apr 2026 09:06:41 +0200 Subject: [PATCH] =?UTF-8?q?refactor(pipeline):=20ScreenAnalyzer=20thread-s?= =?UTF-8?q?afe=20et=20isol=C3=A9=20(Lot=20C)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Retrait de l'état global toxique : - analyze() : kwargs-only enable_ocr, enable_ui_detection, session_id - Ne mute JAMAIS self pour les flags (variables locales + branches) - _resolve_ocr_instance() / _resolve_ui_detector_instance() : lecture seule - _init_lock par instance pour lazy init concurrent safe - session_id par appel, plus via mutation singleton Avant : ExecutionLoop mutait analyzer._ocr, _ui_detector, _ocr_initialized, _ui_detector_initialized pour désactiver OCR/UI. Deux loops partageant le singleton se polluaient mutuellement. Après : deux loops partageant l'analyzer sont complètement isolés. Preuve par TestAnalyzerIsolationBetweenLoops (3 tests). Singleton get_screen_analyzer() préservé — garde uniquement les ressources lourdes, plus de contexte d'exécution. 9 nouveaux tests (3 isolation + 6 kwargs-only/lazy-init). Co-Authored-By: Claude Opus 4.6 (1M context) --- core/pipeline/__init__.py | 135 +++- core/pipeline/screen_analyzer.py | 265 +++++-- .../unit/test_execution_loop_vision_aware.py | 678 ++++++++++++++++++ tests/unit/test_screen_analyzer.py | 185 +++++ 4 files changed, 1221 insertions(+), 42 deletions(-) create mode 100644 tests/unit/test_execution_loop_vision_aware.py create mode 100644 tests/unit/test_screen_analyzer.py diff --git a/core/pipeline/__init__.py b/core/pipeline/__init__.py index 1f80977eb..a096b6a65 100644 --- a/core/pipeline/__init__.py +++ b/core/pipeline/__init__.py @@ -2,7 +2,140 @@ Pipeline module - Orchestration du flux RPA Vision V3 """ +from __future__ import annotations + +import threading +from typing import Optional + from .workflow_pipeline import WorkflowPipeline, create_pipeline from .screen_analyzer import ScreenAnalyzer +from .screen_state_cache import ScreenStateCache, compute_perceptual_hash +from .edge_scorer import EdgeScorer, EdgeScore -__all__ = ["WorkflowPipeline", "create_pipeline", "ScreenAnalyzer"] +__all__ = [ + "WorkflowPipeline", + "create_pipeline", + "ScreenAnalyzer", + "ScreenStateCache", + "compute_perceptual_hash", + "EdgeScorer", + "EdgeScore", + "get_screen_analyzer", + "reset_screen_analyzer", + "get_screen_state_cache", + "reset_screen_state_cache", +] + + +# ============================================================================= +# Singleton ScreenAnalyzer +# ============================================================================= +# +# Une seule instance est partagée entre ExecutionLoop, GraphBuilder et +# stream_processor pour éviter le double chargement GPU (UIDetector + CLIP +# = 6-10 Go VRAM, plafond 12 Go sur RTX 5070). +# +# Thread-safe : protégé par un lock. +# +# IMPORTANT (Lot C — avril 2026) : +# Ce singleton ne porte plus AUCUN contexte d'exécution. Il détient +# uniquement les ressources lourdes (modèles OCR, UIDetector, CLIP). +# • Les flags runtime (`enable_ocr`, `enable_ui_detection`) et l'identité +# de session (`session_id`) se passent en kwargs-only à `analyze()`, +# jamais en mutant l'instance. Voir `ScreenAnalyzer.analyze()`. +# • L'argument `session_id` de `get_screen_analyzer()` ne sert QUE de +# valeur par défaut historique, ignorée après la première création. +# À terme, prévoir sa suppression. +# ============================================================================= + + +_SCREEN_ANALYZER_SINGLETON: Optional[ScreenAnalyzer] = None +_SCREEN_ANALYZER_LOCK = threading.Lock() + + +def get_screen_analyzer( + ui_detector=None, + ocr_engine: Optional[str] = None, + session_id: str = "", + force_new: bool = False, +) -> ScreenAnalyzer: + """ + Récupérer l'instance partagée de ScreenAnalyzer. + + Création à la première demande (lazy). Les appels ultérieurs retournent + la même instance, quels que soient les arguments (sauf `force_new=True`). + + Args: + ui_detector: UIDetector optionnel (utilisé seulement à la 1ère création) + ocr_engine: Moteur OCR ("doctr", "tesseract", None=auto) + session_id: ID de session pour la 1ère création + force_new: Forcer la création d'une nouvelle instance (tests) + + Returns: + Instance partagée de ScreenAnalyzer + """ + global _SCREEN_ANALYZER_SINGLETON + + if force_new: + with _SCREEN_ANALYZER_LOCK: + _SCREEN_ANALYZER_SINGLETON = ScreenAnalyzer( + ui_detector=ui_detector, + ocr_engine=ocr_engine, + session_id=session_id, + ) + return _SCREEN_ANALYZER_SINGLETON + + if _SCREEN_ANALYZER_SINGLETON is not None: + return _SCREEN_ANALYZER_SINGLETON + + with _SCREEN_ANALYZER_LOCK: + # Double-check locking + if _SCREEN_ANALYZER_SINGLETON is None: + _SCREEN_ANALYZER_SINGLETON = ScreenAnalyzer( + ui_detector=ui_detector, + ocr_engine=ocr_engine, + session_id=session_id, + ) + return _SCREEN_ANALYZER_SINGLETON + + +def reset_screen_analyzer() -> None: + """Réinitialiser le singleton (tests uniquement).""" + global _SCREEN_ANALYZER_SINGLETON + with _SCREEN_ANALYZER_LOCK: + _SCREEN_ANALYZER_SINGLETON = None + + +# ============================================================================= +# Singleton ScreenStateCache (partagé) +# ============================================================================= + + +_SCREEN_STATE_CACHE_SINGLETON: Optional[ScreenStateCache] = None +_SCREEN_STATE_CACHE_LOCK = threading.Lock() + + +def get_screen_state_cache( + ttl_seconds: float = 2.0, + max_entries: int = 16, +) -> ScreenStateCache: + """ + Retourne le cache de ScreenState partagé (créé à la 1ère demande). + """ + global _SCREEN_STATE_CACHE_SINGLETON + if _SCREEN_STATE_CACHE_SINGLETON is not None: + return _SCREEN_STATE_CACHE_SINGLETON + with _SCREEN_STATE_CACHE_LOCK: + if _SCREEN_STATE_CACHE_SINGLETON is None: + _SCREEN_STATE_CACHE_SINGLETON = ScreenStateCache( + ttl_seconds=ttl_seconds, + max_entries=max_entries, + ) + return _SCREEN_STATE_CACHE_SINGLETON + + +def reset_screen_state_cache() -> None: + """Réinitialiser le cache partagé (tests uniquement).""" + global _SCREEN_STATE_CACHE_SINGLETON + with _SCREEN_STATE_CACHE_LOCK: + _SCREEN_STATE_CACHE_SINGLETON = None diff --git a/core/pipeline/screen_analyzer.py b/core/pipeline/screen_analyzer.py index 89f617654..085bcafad 100644 --- a/core/pipeline/screen_analyzer.py +++ b/core/pipeline/screen_analyzer.py @@ -9,13 +9,33 @@ Orchestre les 4 niveaux du ScreenState : Ce module comble le chaînon manquant entre la capture brute (Couche 0) et la construction d'embeddings (Couche 3). + +============================================================================= +Thread-safety & partage multi-loops (Lot C — avril 2026) +============================================================================= +Cet analyseur peut être partagé entre plusieurs `ExecutionLoop` (singleton +`get_screen_analyzer()`). Pour éviter la contamination croisée : + + • `analyze()` NE MUTE JAMAIS `self._ocr`, `self._ui_detector`, + `self._ocr_initialized`, `self._ui_detector_initialized` pour gérer les + flags runtime (enable_ocr / enable_ui_detection). Ces flags sont par + appel, résolus en variables locales. + • `session_id` circule en paramètre d'appel et renseigne la metadata du + ScreenState ; l'attribut `self.session_id` n'est qu'un défaut historique + (rétrocompat) et n'est plus la source de vérité. + • L'init lazy des composants lourds (OCR, UIDetector) est protégée par un + `_init_lock` par instance pour empêcher une double initialisation + concurrente. """ +import contextlib import logging import os +import threading +import time from datetime import datetime from pathlib import Path -from typing import Optional, Dict, Any, List +from typing import Optional, Dict, Any, List, Tuple from PIL import Image @@ -32,6 +52,44 @@ from core.models.ui_element import UIElement logger = logging.getLogger(__name__) +# Lock d'inférence local au module : sert de fallback si le GPUResourceManager +# n'est pas disponible (import error, tests). Partagé entre toutes les instances +# ScreenAnalyzer du process, cohérent avec le singleton get_screen_analyzer(). +_ANALYZE_FALLBACK_LOCK = threading.Lock() + + +def _acquire_gpu_context(timeout: Optional[float] = None): + """ + Retourne un context manager pour sérialiser les appels GPU. + + Préfère `GPUResourceManager.acquire_inference()` si disponible (coordination + globale), sinon bascule sur un lock threading local au module. + """ + try: + from core.gpu import get_gpu_resource_manager + + manager = get_gpu_resource_manager() + return manager.acquire_inference(timeout=timeout) + except Exception as e: # pragma: no cover - fallback defensif + logger.debug(f"GPUResourceManager indisponible, fallback lock local: {e}") + + @contextlib.contextmanager + def _fallback(): + if timeout is None: + _ANALYZE_FALLBACK_LOCK.acquire() + yield True + _ANALYZE_FALLBACK_LOCK.release() + else: + got = _ANALYZE_FALLBACK_LOCK.acquire(timeout=timeout) + try: + yield got + finally: + if got: + _ANALYZE_FALLBACK_LOCK.release() + + return _fallback() + + class ScreenAnalyzer: """ Construit un ScreenState complet (4 niveaux) depuis un screenshot. @@ -44,6 +102,14 @@ class ScreenAnalyzer: >>> state = analyzer.analyze("/path/to/screenshot.png") >>> print(state.perception.detected_text) >>> print(len(state.ui_elements)) + + Runtime overrides (kwargs-only) sur analyze() : + >>> state = analyzer.analyze( + ... path, + ... enable_ocr=False, # bypass OCR pour cet appel + ... enable_ui_detection=False, # bypass UIDetector + ... session_id="session_42", # session par appel + ... ) """ def __init__( @@ -56,18 +122,27 @@ class ScreenAnalyzer: Args: ui_detector: Instance de UIDetector (créé si None) ocr_engine: Moteur OCR à utiliser ("doctr", "tesseract", None=auto) - session_id: ID de la session en cours + session_id: ID de session par défaut (rétrocompat ; préférer passer + `session_id` en kwarg de `analyze()` pour chaque appel). """ self._ui_detector = ui_detector self._ocr_engine_name = ocr_engine self._ocr = None + # Session par défaut (rétrocompat). La source de vérité est désormais + # le paramètre `session_id` de `analyze()`. self.session_id = session_id + # Compteur d'états — protégé par _state_lock pour être safe en parallèle. self._state_counter = 0 + self._state_lock = threading.Lock() - # Initialisation lazy pour éviter les imports lourds au démarrage + # Initialisation lazy pour éviter les imports lourds au démarrage. self._ui_detector_initialized = ui_detector is not None self._ocr_initialized = False + # Lock dédié à l'init lazy : empêche deux threads d'initialiser + # simultanément OCR ou UIDetector (double chargement GPU). + self._init_lock = threading.Lock() + # ========================================================================= # API publique # ========================================================================= @@ -77,28 +152,85 @@ class ScreenAnalyzer: screenshot_path: str, window_info: Optional[Dict[str, Any]] = None, context: Optional[Dict[str, Any]] = None, + *, + enable_ocr: bool = True, + enable_ui_detection: bool = True, + session_id: str = "", ) -> ScreenState: """ Analyser un screenshot et construire un ScreenState complet. + Les flags `enable_ocr`, `enable_ui_detection` et `session_id` sont + **par appel, kwargs-only**, pour ne pas polluer l'état partagé du + singleton quand plusieurs `ExecutionLoop` se partagent l'analyseur. + Args: screenshot_path: Chemin vers le fichier image window_info: Infos fenêtre active {"title": ..., "app_name": ...} context: Contexte métier optionnel + enable_ocr: Active l'OCR pour cet appel (True par défaut). + False → `detected_text=[]`, aucune init d'OCR déclenchée. + enable_ui_detection: Active la détection UI pour cet appel + (True par défaut). False → `ui_elements=[]`. + session_id: ID de session pour cet appel. Si vide, on retombe sur + `self.session_id` (rétrocompat). Cette valeur est propagée + dans `ScreenState.session_id` et `metadata["session_id"]`. Returns: - ScreenState avec les 4 niveaux remplis + ScreenState avec les 4 niveaux remplis. """ screenshot_path = str(screenshot_path) - self._state_counter += 1 - state_id = f"{self.session_id}_state_{self._state_counter:04d}" if self.session_id else f"state_{self._state_counter:04d}" + # Résolution de la session : priorité au kwarg, fallback sur l'état + # interne (legacy). Variable locale uniquement — pas de mutation. + effective_session_id = session_id or self.session_id - # Niveau 1 : Raw + # Compteur incrémenté sous lock pour identifiants uniques même en + # parallèle. C'est la seule mutation tolérée : elle n'impacte pas le + # comportement OCR/UI. + with self._state_lock: + self._state_counter += 1 + state_counter = self._state_counter + + state_id = ( + f"{effective_session_id}_state_{state_counter:04d}" + if effective_session_id + else f"state_{state_counter:04d}" + ) + + # Niveau 1 : Raw (léger, hors lock GPU) raw = self._build_raw_level(screenshot_path) - # Niveau 2 : Perception (OCR) - detected_text = self._extract_text(screenshot_path) + # Résolution locale des instances OCR / UIDetector selon les flags. + # Aucune mutation de self ici : on décide simplement ce qu'on utilise. + ocr_instance = self._resolve_ocr_instance(enable_ocr=enable_ocr) + ui_detector_instance = self._resolve_ui_detector_instance( + enable_ui_detection=enable_ui_detection + ) + + # Niveaux 2 et 3 : OCR + détection UI sont les étapes lourdes en GPU. + # On sérialise via GPUResourceManager.acquire_inference() pour éviter + # que ExecutionLoop et stream_processor saturent simultanément la VRAM + # sur RTX 5070 (12 Go). Timeout généreux : un appel peut prendre 15-20s. + with _acquire_gpu_context(timeout=60.0) as acquired: + if not acquired: + logger.warning( + "Timeout en attendant le lock GPU pour ScreenAnalyzer.analyze() " + "→ exécution sans sérialisation (risque saturation VRAM)" + ) + + # Niveau 2 : Perception (OCR) — mesure du temps OCR + ocr_t0 = time.time() + detected_text = self._extract_text_with(ocr_instance, screenshot_path) + ocr_ms = (time.time() - ocr_t0) * 1000 + + # Niveau 3 : UI Elements — mesure du temps détection + ui_t0 = time.time() + ui_elements = self._detect_ui_elements_with( + ui_detector_instance, screenshot_path, window_info + ) + ui_ms = (time.time() - ui_t0) * 1000 + perception = PerceptionLevel( embedding=EmbeddingRef( provider="openclip_ViT-B-32", @@ -106,13 +238,10 @@ class ScreenAnalyzer: dimensions=512, ), detected_text=detected_text, - text_detection_method=self._get_ocr_method_name(), + text_detection_method=self._get_ocr_method_name(ocr_instance), confidence_avg=0.85 if detected_text else 0.0, ) - # Niveau 3 : UI Elements - ui_elements = self._detect_ui_elements(screenshot_path, window_info) - # Niveau 4 : Contexte window_ctx = self._build_window_context(window_info) context_level = self._build_context_level(context) @@ -120,22 +249,28 @@ class ScreenAnalyzer: state = ScreenState( screen_state_id=state_id, timestamp=datetime.now(), - session_id=self.session_id, + session_id=effective_session_id, window=window_ctx, raw=raw, perception=perception, context=context_level, metadata={ - "analyzer_version": "1.0", + "analyzer_version": "1.1", + "session_id": effective_session_id, "ui_elements_count": len(ui_elements), "text_regions_count": len(detected_text), + "ocr_ms": ocr_ms, + "ui_ms": ui_ms, + "ocr_enabled": enable_ocr, + "ui_detection_enabled": enable_ui_detection, }, ui_elements=ui_elements, ) logger.info( f"ScreenState {state_id} construit: " - f"{len(ui_elements)} éléments UI, {len(detected_text)} textes détectés" + f"{len(ui_elements)} éléments UI, {len(detected_text)} textes détectés " + f"(ocr={enable_ocr}, ui={enable_ui_detection})" ) return state @@ -145,11 +280,16 @@ class ScreenAnalyzer: save_dir: str = "data/screens", window_info: Optional[Dict[str, Any]] = None, context: Optional[Dict[str, Any]] = None, + *, + enable_ocr: bool = True, + enable_ui_detection: bool = True, + session_id: str = "", ) -> ScreenState: """ Analyser une PIL Image (utile quand on a déjà l'image en mémoire). - Sauvegarde l'image sur disque puis appelle analyze(). + Sauvegarde l'image sur disque puis appelle analyze(). Les flags + runtime sont propagés à `analyze()` en kwargs-only. """ save_path = Path(save_dir) save_path.mkdir(parents=True, exist_ok=True) @@ -159,7 +299,49 @@ class ScreenAnalyzer: filepath = save_path / filename image.save(str(filepath)) - return self.analyze(str(filepath), window_info=window_info, context=context) + return self.analyze( + str(filepath), + window_info=window_info, + context=context, + enable_ocr=enable_ocr, + enable_ui_detection=enable_ui_detection, + session_id=session_id, + ) + + # ========================================================================= + # Résolution des instances OCR / UI selon les flags d'appel + # ========================================================================= + + def _resolve_ocr_instance(self, *, enable_ocr: bool): + """ + Retourner l'instance OCR à utiliser pour cet appel. + + - `enable_ocr=False` → None (pas d'init, pas d'appel OCR) + - sinon → init lazy sous lock si nécessaire, puis retour de `self._ocr` + + Ne mute `self._ocr` / `self._ocr_initialized` QUE pendant l'init lazy + réelle, jamais pour bypasser l'OCR d'un appel. + """ + if not enable_ocr: + return None + if not self._ocr_initialized: + with self._init_lock: + # Double-check : un autre thread a pu initialiser entretemps. + if not self._ocr_initialized: + self._ensure_ocr_locked() + return self._ocr + + def _resolve_ui_detector_instance(self, *, enable_ui_detection: bool): + """ + Retourner l'instance UIDetector pour cet appel (idem _resolve_ocr_instance). + """ + if not enable_ui_detection: + return None + if not self._ui_detector_initialized: + with self._init_lock: + if not self._ui_detector_initialized: + self._ensure_ui_detector_locked() + return self._ui_detector # ========================================================================= # Niveau 1 : Raw @@ -182,23 +364,24 @@ class ScreenAnalyzer: # Niveau 2 : Perception — OCR # ========================================================================= - def _extract_text(self, screenshot_path: str) -> List[str]: - """Extraire le texte d'un screenshot via OCR.""" - self._ensure_ocr() - - if self._ocr is None: + def _extract_text_with(self, ocr_callable, screenshot_path: str) -> List[str]: + """Extraire le texte via un callable OCR donné (peut être None).""" + if ocr_callable is None: return [] - try: - return self._ocr(screenshot_path) + return ocr_callable(screenshot_path) except Exception as e: logger.warning(f"OCR échoué: {e}") return [] - def _ensure_ocr(self) -> None: - """Initialiser le moteur OCR (lazy).""" - if self._ocr_initialized: - return + def _ensure_ocr_locked(self) -> None: + """ + Initialiser le moteur OCR (appelé sous `self._init_lock`). + + Ne doit PAS être appelé hors de `_resolve_ocr_instance()`. + """ + # Mutation intentionnelle : on installe l'instance OCR réelle. + # Protégée par le lock d'init (pas le lock GPU). self._ocr_initialized = True engine = self._ocr_engine_name @@ -257,8 +440,9 @@ class ScreenAnalyzer: return ocr_func - def _get_ocr_method_name(self) -> str: - if self._ocr is None: + def _get_ocr_method_name(self, ocr_instance=None) -> str: + """Nom du moteur OCR effectivement utilisé pour cet appel.""" + if ocr_instance is None: return "none" if self._ocr_engine_name: return self._ocr_engine_name @@ -268,19 +452,18 @@ class ScreenAnalyzer: # Niveau 3 : UI Elements # ========================================================================= - def _detect_ui_elements( + def _detect_ui_elements_with( self, + ui_detector, screenshot_path: str, window_info: Optional[Dict[str, Any]] = None, ) -> List[UIElement]: - """Détecter les éléments UI dans le screenshot.""" - self._ensure_ui_detector() - - if self._ui_detector is None: + """Détecter les éléments UI via un détecteur donné (peut être None).""" + if ui_detector is None: return [] try: - elements = self._ui_detector.detect( + elements = ui_detector.detect( screenshot_path, window_context=window_info ) return elements @@ -288,10 +471,10 @@ class ScreenAnalyzer: logger.warning(f"Détection UI échouée: {e}") return [] - def _ensure_ui_detector(self) -> None: - """Initialiser le UIDetector (lazy).""" - if self._ui_detector_initialized: - return + def _ensure_ui_detector_locked(self) -> None: + """ + Initialiser le UIDetector (appelé sous `self._init_lock`). + """ self._ui_detector_initialized = True try: diff --git a/tests/unit/test_execution_loop_vision_aware.py b/tests/unit/test_execution_loop_vision_aware.py new file mode 100644 index 000000000..df57f1492 --- /dev/null +++ b/tests/unit/test_execution_loop_vision_aware.py @@ -0,0 +1,678 @@ +""" +Tests unitaires de l'intégration vision-aware dans ExecutionLoop (C1). + +Couvre : + - Construction d'un ScreenState enrichi via ScreenAnalyzer + - Cache hit évite un second appel à analyzer.analyze + - Timeout → mode dégradé persistant + - enable_ui_detection=False + enable_ocr=False → fallback stub + - StepResult contient bien les champs temps (ocr_ms, ui_ms, analyze_ms, cache_hit, degraded) + - Singleton get_screen_analyzer partage bien l'instance +""" + +from __future__ import annotations + +import time +from datetime import datetime +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest +from PIL import Image + +from core.execution.execution_loop import ExecutionContext, ExecutionLoop, ExecutionMode, StepResult +from core.models.screen_state import ( + ContextLevel, + EmbeddingRef, + PerceptionLevel, + RawLevel, + ScreenState, + WindowContext, +) +from core.pipeline import ( + get_screen_analyzer, + get_screen_state_cache, + reset_screen_analyzer, + reset_screen_state_cache, +) + + +# ----------------------------------------------------------------------------- +# Fixtures +# ----------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +def reset_singletons(): + """Réinitialiser les singletons entre chaque test.""" + reset_screen_analyzer() + reset_screen_state_cache() + yield + reset_screen_analyzer() + reset_screen_state_cache() + + +@pytest.fixture +def screenshot(tmp_path): + path = tmp_path / "shot.png" + Image.new("RGB", (320, 240), color=(128, 128, 128)).save(str(path)) + return str(path) + + +def _make_state(session_id: str = "s1") -> ScreenState: + return ScreenState( + screen_state_id="sid", + timestamp=datetime.now(), + session_id=session_id, + window=WindowContext( + app_name="app", window_title="Title", screen_resolution=[1920, 1080] + ), + raw=RawLevel(screenshot_path="", capture_method="test", file_size_bytes=0), + perception=PerceptionLevel( + embedding=EmbeddingRef(provider="t", vector_id="v", dimensions=512), + detected_text=["hello"], + text_detection_method="test", + confidence_avg=0.9, + ), + context=ContextLevel(), + metadata={"ocr_ms": 123.0, "ui_ms": 45.0}, + ui_elements=[], + ) + + +def _make_loop(screen_analyzer=None, **kwargs) -> ExecutionLoop: + pipeline = MagicMock() + # Mocker load_workflow pour éviter dépendance FS + pipeline.load_workflow.return_value = None + loop = ExecutionLoop( + pipeline=pipeline, + action_executor=MagicMock(), + screen_capturer=MagicMock(), + screen_analyzer=screen_analyzer, + **kwargs, + ) + loop.context = ExecutionContext( + workflow_id="wf1", + execution_id="exec1", + mode=ExecutionMode.AUTOMATIC, + started_at=datetime.now(), + ) + return loop + + +# ----------------------------------------------------------------------------- +# Tests +# ----------------------------------------------------------------------------- + + +class TestVisionAwareBuild: + + def test_build_screen_state_uses_analyzer(self, screenshot): + analyzer = MagicMock() + analyzer.analyze.return_value = _make_state() + + loop = _make_loop(screen_analyzer=analyzer) + state, timings = loop._build_screen_state(screenshot) + + assert analyzer.analyze.called + assert state.session_id == "s1" + assert timings["cache_hit"] is False + assert timings["ocr_ms"] == 123.0 + assert timings["ui_ms"] == 45.0 + assert timings["degraded"] is False + + def test_build_screen_state_cache_hit_on_second_call(self, screenshot): + analyzer = MagicMock() + analyzer.analyze.return_value = _make_state() + + loop = _make_loop(screen_analyzer=analyzer) + loop._build_screen_state(screenshot) + loop._build_screen_state(screenshot) + + # Un seul appel à analyze grâce au cache + assert analyzer.analyze.call_count == 1 + + def test_disabled_ui_and_ocr_returns_stub(self, screenshot): + analyzer = MagicMock() + analyzer.analyze.return_value = _make_state() + loop = _make_loop( + screen_analyzer=analyzer, + enable_ui_detection=False, + enable_ocr=False, + ) + state, timings = loop._build_screen_state(screenshot) + + # analyze ne doit PAS avoir été appelé + analyzer.analyze.assert_not_called() + assert timings["degraded"] is True + assert state.perception.detected_text == [] + assert state.ui_elements == [] + + def test_timeout_activates_degraded_mode(self, screenshot): + """Si l'analyse dépasse analyze_timeout_ms, le loop bascule en dégradé.""" + analyzer = MagicMock() + + def slow_analyze(*_args, **_kw): + time.sleep(0.15) + return _make_state() + + analyzer.analyze.side_effect = slow_analyze + + loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=50) + # Premier appel → mesure timeout et active dégradé + _, timings1 = loop._build_screen_state(screenshot) + assert timings1["degraded"] is True + assert loop._degraded_mode is True + + # Deuxième appel (autre screenshot pour éviter cache) → stub direct + img2 = Path(screenshot).parent / "other.png" + Image.new("RGB", (320, 240), color=(1, 2, 3)).save(str(img2)) + _, timings2 = loop._build_screen_state(str(img2)) + assert timings2["degraded"] is True + # analyzer.analyze n'a pas été appelé une 2ème fois + assert analyzer.analyze.call_count == 1 + + def test_analyzer_unavailable_returns_stub(self, screenshot): + """Si get_screen_analyzer() renvoie None, fallback stub.""" + loop = _make_loop(screen_analyzer=None) + # Forcer _get_screen_analyzer à retourner None + with patch.object(loop, "_get_screen_analyzer", return_value=None): + state, timings = loop._build_screen_state(screenshot) + assert timings["degraded"] is True + assert state.ui_elements == [] + + def test_stub_when_all_flags_off(self, screenshot): + loop = _make_loop(enable_ui_detection=False, enable_ocr=False) + state, timings = loop._build_screen_state(screenshot) + assert state.window.window_title == "Unknown" + assert timings["degraded"] is True + + +class TestWindowInfoProvider: + + def test_window_info_provider_is_used(self, screenshot): + analyzer = MagicMock() + analyzer.analyze.return_value = _make_state() + + provider = lambda: {"title": "Chrome", "app_name": "chrome"} + loop = _make_loop(screen_analyzer=analyzer, window_info_provider=provider) + loop._build_screen_state(screenshot) + + # Vérifier que window_info a bien été passé à analyze + call_kwargs = analyzer.analyze.call_args.kwargs + assert call_kwargs.get("window_info") == {"title": "Chrome", "app_name": "chrome"} + + def test_falls_back_to_screen_capturer(self, screenshot): + analyzer = MagicMock() + analyzer.analyze.return_value = _make_state() + + loop = _make_loop(screen_analyzer=analyzer) + loop.screen_capturer.get_active_window.return_value = { + "title": "Firefox", + "app": "firefox", + "x": 0, + "y": 0, + "width": 800, + "height": 600, + } + loop._build_screen_state(screenshot) + + call_kwargs = analyzer.analyze.call_args.kwargs + wi = call_kwargs.get("window_info") + assert wi is not None + assert wi["title"] == "Firefox" + assert wi["app_name"] == "firefox" + + +class TestDegradedModeRecovery: + """Tâche 2 — Auto-rétablissement du mode dégradé après steps rapides.""" + + def test_fast_steps_counter_resets_on_degradation(self, screenshot): + """Dépassement du timeout → active dégradé + reset compteur.""" + analyzer = MagicMock() + + def slow_analyze(*_args, **_kw): + time.sleep(0.15) + return _make_state() + + analyzer.analyze.side_effect = slow_analyze + + loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=50) + loop._successive_fast_steps = 2 # état fictif avant le timeout + + _, timings = loop._build_screen_state(screenshot) + + assert loop._degraded_mode is True + assert loop._successive_fast_steps == 0 + assert timings["degraded"] is True + + def test_recovery_after_three_fast_probes(self, tmp_path): + """Après 3 probes rapides consécutifs, retour en mode complet.""" + import random + + analyzer = MagicMock() + analyzer.analyze.return_value = _make_state() + + # Timeout 1000ms → fast_threshold = 500ms ; MagicMock = instant (<<500ms). + loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=1000) + # Simuler un état dégradé préexistant + loop._degraded_mode = True + loop._successive_fast_steps = 0 + loop._degraded_step_counter = 0 + # Probe immédiat à chaque appel + loop._probe_interval = 1 + + # 3 probes rapides sur 3 screenshots avec dhash différents. + # Une image unie a toujours un dhash 0...0 → on génère du bruit. + for i in range(3): + random.seed(i + 1) + img = Image.new("RGB", (320, 240)) + for y in range(240): + for x in range(320): + v = random.randint(0, 255) + img.putpixel((x, y), (v, v, v)) + path = tmp_path / f"shot_{i}.png" + img.save(str(path)) + _, timings = loop._build_screen_state(str(path)) + + assert loop._degraded_mode is False, "Devrait être sorti du mode dégradé" + assert loop._successive_fast_steps == 0 # Reset après récupération + + def test_slow_probe_keeps_degraded(self, tmp_path): + """Un probe lent en mode dégradé garde _degraded_mode=True.""" + analyzer = MagicMock() + + def slow_analyze(*_args, **_kw): + time.sleep(0.15) + return _make_state() + + analyzer.analyze.side_effect = slow_analyze + + loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=50) + loop._degraded_mode = True + loop._successive_fast_steps = 2 + loop._degraded_step_counter = 0 + loop._probe_interval = 1 + + path = tmp_path / "slow.png" + Image.new("RGB", (320, 240), color=(80, 80, 80)).save(str(path)) + _, timings = loop._build_screen_state(str(path)) + + assert loop._degraded_mode is True + assert loop._successive_fast_steps == 0 # Reset au slow + assert timings["degraded"] is True + + def test_probe_interval_respected_in_degraded(self, screenshot): + """En dégradé, on ne fait probe que tous les _probe_interval steps.""" + analyzer = MagicMock() + analyzer.analyze.return_value = _make_state() + + loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=1000) + loop._degraded_mode = True + loop._probe_interval = 5 + + # 4 appels successifs → aucun probe (stub direct) + for _ in range(4): + _, timings = loop._build_screen_state(screenshot) + assert timings["degraded"] is True + assert analyzer.analyze.call_count == 0 + + +class TestStepResultFields: + + def test_step_result_has_new_timing_fields(self): + r = StepResult( + success=True, + node_id="n1", + edge_id=None, + action_result=None, + match_confidence=0.9, + duration_ms=10.0, + message="test", + ) + assert r.ocr_ms == 0.0 + assert r.ui_ms == 0.0 + assert r.analyze_ms == 0.0 + assert r.total_ms == 0.0 + assert r.cache_hit is False + assert r.degraded is False + + +class TestExecuteStepBlockedContract: + """Lot A — contrat dict get_next_action dans ExecutionLoop._execute_step.""" + + def _setup_loop_with_match(self, next_action_return, screenshot): + """Crée une ExecutionLoop avec un pipeline mocké qui renvoie + ``next_action_return`` à get_next_action, et un + ``match_current_state_from_state`` qui matche toujours (Lot E — le + chemin d'exécution utilise la nouvelle API context-aware).""" + analyzer = MagicMock() + analyzer.analyze.return_value = _make_state() + + loop = _make_loop(screen_analyzer=analyzer) + # Nouveau chemin Lot E : match_current_state_from_state retourne un match valide + loop.pipeline.match_current_state_from_state.return_value = { + "node_id": "n1", + "workflow_id": "wf1", + "confidence": 0.95, + } + loop.pipeline.get_next_action.return_value = next_action_return + + # Mock _capture_screen pour éviter le vrai capture + loop._capture_screen = lambda: screenshot + + return loop + + def test_blocked_triggers_paused_state(self, screenshot): + """status="blocked" → PAUSED + success=False + on_error appelé.""" + loop = self._setup_loop_with_match( + next_action_return={"status": "blocked", "reason": "no_valid_edge"}, + screenshot=screenshot, + ) + + errors_seen = [] + loop.on_error(lambda src, exc: errors_seen.append((src, exc))) + + result = loop._execute_step() + + assert result is not None + assert result.success is False + assert result.edge_id is None + assert "Blocked" in result.message + assert loop.state.value == "paused" + # Callback on_error a bien été notifié + assert len(errors_seen) == 1 + assert errors_seen[0][0] == "blocked" + + def test_terminal_succeeds_without_edge(self, screenshot): + """status="terminal" → success=True + message "terminated".""" + loop = self._setup_loop_with_match( + next_action_return={"status": "terminal"}, + screenshot=screenshot, + ) + + result = loop._execute_step() + assert result is not None + assert result.success is True + assert result.edge_id is None + assert "terminated" in result.message.lower() + # PAS passé en PAUSED (workflow terminé légitimement) + assert loop.state.value != "paused" + + def test_legacy_none_treated_as_blocked(self, screenshot): + """Rétrocompat défensive : si un pipeline legacy renvoie None, + on considère ça comme un blocage (safe default).""" + loop = self._setup_loop_with_match( + next_action_return=None, + screenshot=screenshot, + ) + + result = loop._execute_step() + assert result is not None + assert result.success is False + assert loop.state.value == "paused" + + def test_selected_continues_execution(self, screenshot): + """status="selected" → chemin nominal, tente d'exécuter l'edge.""" + loop = self._setup_loop_with_match( + next_action_return={ + "status": "selected", + "edge_id": "e1", + "action": {"type": "click", "target": {}}, + "target_node": "n2", + "confidence": 0.9, + "score": 0.9, + }, + screenshot=screenshot, + ) + # Mode OBSERVATION pour ne rien exécuter réellement + loop.context.mode = ExecutionMode.OBSERVATION + + result = loop._execute_step() + assert result is not None + # Pas de PAUSED déclenché + assert loop.state.value != "paused" + # edge_id bien propagé + assert result.edge_id == "e1" + + +class TestSingleton: + + def test_get_screen_analyzer_returns_same_instance(self): + a1 = get_screen_analyzer() + a2 = get_screen_analyzer() + assert a1 is a2 + + def test_force_new_creates_new_instance(self): + a1 = get_screen_analyzer() + a2 = get_screen_analyzer(force_new=True) + assert a1 is not a2 + + def test_get_screen_state_cache_returns_same_instance(self): + c1 = get_screen_state_cache() + c2 = get_screen_state_cache() + assert c1 is c2 + + +class TestAnalyzerIsolationBetweenLoops: + """ + Lot C — Deux ExecutionLoop partageant le même ScreenAnalyzer ne doivent + PAS se contaminer mutuellement. + + Règle : `analyze()` ne mute jamais `_ocr`, `_ui_detector`, + `_ocr_initialized`, `_ui_detector_initialized` pour gérer les flags runtime. + Les flags (`enable_ocr`, `enable_ui_detection`) et `session_id` circulent + en kwargs d'appel, pas via l'état du singleton. + """ + + def _make_distinct_image(self, path, seed: int): + """Image avec dhash unique (random noise) pour éviter les cache hits.""" + import random + random.seed(seed) + img = Image.new("RGB", (128, 128)) + for y in range(128): + for x in range(128): + v = random.randint(0, 255) + img.putpixel((x, y), (v, v, v)) + img.save(str(path)) + return str(path) + + def test_two_loops_share_analyzer_no_contamination(self, tmp_path): + """Deux loops, le premier avec enable_ocr=False, le second avec + enable_ocr=True → l'état interne du singleton doit être intact + après l'appel du premier loop (pas de self._ocr=None).""" + from core.pipeline.screen_analyzer import ScreenAnalyzer + + analyzer = ScreenAnalyzer() + + # Installer un OCR + UIDetector factices ET marqués "initialisés" pour + # empêcher l'init lazy réelle pendant le test. + sentinel_ocr = lambda path: ["texte_sentinelle"] + sentinel_detector = MagicMock() + sentinel_detector.detect.return_value = [] + + analyzer._ocr = sentinel_ocr + analyzer._ocr_initialized = True + analyzer._ui_detector = sentinel_detector + analyzer._ui_detector_initialized = True + + # Deux screenshots avec dhash distincts (random noise) + img_a = self._make_distinct_image(tmp_path / "shot_a.png", seed=1) + img_b = self._make_distinct_image(tmp_path / "shot_b.png", seed=2) + + # Premier loop : OCR désactivé + loop_a = _make_loop(screen_analyzer=analyzer, enable_ocr=False) + state_a, _ = loop_a._build_screen_state(img_a) + + # Vérifier l'isolation : l'analyseur est INCHANGÉ. + assert analyzer._ocr is sentinel_ocr, ( + "analyze(enable_ocr=False) NE DOIT PAS muter self._ocr" + ) + assert analyzer._ocr_initialized is True + assert analyzer._ui_detector is sentinel_detector + assert analyzer._ui_detector_initialized is True + # Pour le loop A, OCR bypass → detected_text vide + assert state_a.perception.detected_text == [] + + # Deuxième loop : OCR activé + loop_b = _make_loop(screen_analyzer=analyzer, enable_ocr=True) + state_b, _ = loop_b._build_screen_state(img_b) + + # L'analyseur est toujours intact + assert analyzer._ocr is sentinel_ocr + # Et le loop B a bien bénéficié de l'OCR + assert state_b.perception.detected_text == ["texte_sentinelle"] + + def test_session_id_is_per_call_not_singleton(self, tmp_path): + """Deux appels avec session_id différent → chaque ScreenState porte + le bon session_id, et le singleton ne garde pas de session résiduelle.""" + from core.pipeline.screen_analyzer import ScreenAnalyzer + + # On patche _ensure_*_locked pour éviter l'init réelle. + analyzer = ScreenAnalyzer() + analyzer._ocr = None + analyzer._ocr_initialized = True + analyzer._ui_detector = None + analyzer._ui_detector_initialized = True + + img1 = tmp_path / "s1.png" + img2 = tmp_path / "s2.png" + Image.new("RGB", (100, 100), color=(1, 2, 3)).save(str(img1)) + Image.new("RGB", (100, 100), color=(4, 5, 6)).save(str(img2)) + + s1 = analyzer.analyze(str(img1), session_id="session_alpha") + s2 = analyzer.analyze(str(img2), session_id="session_beta") + + assert s1.session_id == "session_alpha" + assert s2.session_id == "session_beta" + assert s1.metadata.get("session_id") == "session_alpha" + assert s2.metadata.get("session_id") == "session_beta" + # Le state_id doit refléter chaque session, pas la "dernière vue" du singleton + assert s1.screen_state_id.startswith("session_alpha_") + assert s2.screen_state_id.startswith("session_beta_") + + def test_analyze_flags_override_without_mutation(self, tmp_path): + """enable_ui_detection=False → ui_elements=[] dans le résultat, + mais analyzer._ui_detector reste initialisé (pas de mutation).""" + from core.pipeline.screen_analyzer import ScreenAnalyzer + + analyzer = ScreenAnalyzer() + sentinel_detector = MagicMock() + sentinel_detector.detect.return_value = [MagicMock()] # 1 élément factice + analyzer._ui_detector = sentinel_detector + analyzer._ui_detector_initialized = True + analyzer._ocr = lambda p: [] + analyzer._ocr_initialized = True + + img = tmp_path / "shot.png" + Image.new("RGB", (100, 100), color=(10, 20, 30)).save(str(img)) + + state = analyzer.analyze(str(img), enable_ui_detection=False) + + # ui_elements vide puisque détection désactivée pour cet appel + assert state.ui_elements == [] + # Mais le détecteur du singleton est intact + assert analyzer._ui_detector is sentinel_detector + assert analyzer._ui_detector_initialized is True + # Le détecteur n'a PAS été appelé + sentinel_detector.detect.assert_not_called() + + +class TestCacheContextAwareFromLoop: + """Lot D — Deux ExecutionLoop qui partagent le même ScreenStateCache + mais s'exécutent dans des workflows différents NE DOIVENT PAS partager + leurs entrées de cache : la clé composite inclut `workflow_id`. + """ + + def test_two_loops_different_workflow_different_cache(self, tmp_path): + """Même screenshot + même analyseur + workflow_id différent → 2 miss. + + Le compute_fn sous-jacent (analyzer.analyze) doit être appelé pour + chaque loop : pas de contamination inter-workflows. + """ + from core.pipeline import get_screen_state_cache + + analyzer = MagicMock() + analyzer.analyze.return_value = _make_state() + + # Un même cache partagé (singleton) entre les deux loops. + shared_cache = get_screen_state_cache() + + # Image commune (dhash identique) + img = tmp_path / "common.png" + Image.new("RGB", (320, 240), color=(77, 77, 77)).save(str(img)) + + # Loop A → workflow "wf_A" + loop_a = _make_loop( + screen_analyzer=analyzer, + screen_state_cache=shared_cache, + ) + loop_a.context.workflow_id = "wf_A" + loop_a._build_screen_state(str(img)) + assert analyzer.analyze.call_count == 1 + + # Loop B → workflow "wf_B" (même cache, même image, contexte différent) + loop_b = _make_loop( + screen_analyzer=analyzer, + screen_state_cache=shared_cache, + ) + loop_b.context.workflow_id = "wf_B" + loop_b._build_screen_state(str(img)) + + # Pas de collision : analyzer.analyze a bien été appelé une 2ème fois. + assert analyzer.analyze.call_count == 2 + + # Une 3ème exécution du loop A (même workflow_id, même screenshot) + # doit par contre frapper le cache. + loop_a._build_screen_state(str(img)) + assert analyzer.analyze.call_count == 2 # Pas de nouvel appel + + +class TestExecutionLoopUsesMatchFromState: + """ + Lot E — ExecutionLoop._execute_step doit appeler + ``pipeline.match_current_state_from_state`` avec le ScreenState enrichi, + et NON plus l'API legacy ``match_current_state(screenshot_path, ...)``. + """ + + def _make_loop_with_analyzer(self, screenshot): + analyzer = MagicMock() + analyzer.analyze.return_value = _make_state() + loop = _make_loop(screen_analyzer=analyzer) + loop._capture_screen = lambda: screenshot + return loop + + def test_execution_loop_calls_match_from_state(self, screenshot): + """_execute_step doit appeler match_current_state_from_state, pas + l'ancienne API.""" + loop = self._make_loop_with_analyzer(screenshot) + loop.pipeline.match_current_state_from_state.return_value = { + "node_id": "n1", + "workflow_id": "wf1", + "confidence": 0.9, + } + loop.pipeline.get_next_action.return_value = {"status": "terminal"} + + loop._execute_step() + + # La nouvelle API a été appelée + assert loop.pipeline.match_current_state_from_state.called + # L'ancienne API n'a PAS été appelée + loop.pipeline.match_current_state.assert_not_called() + + def test_execution_loop_passes_enriched_screen_state(self, screenshot): + """Le ScreenState passé à match_current_state_from_state doit être le + résultat enrichi du ScreenAnalyzer (avec detected_text + title réel), + pas un stub.""" + loop = self._make_loop_with_analyzer(screenshot) + loop.pipeline.match_current_state_from_state.return_value = None + + loop._execute_step() + + call_args = loop.pipeline.match_current_state_from_state.call_args + passed_state = call_args.args[0] + # Le state vient de _make_state() → detected_text=["hello"], title="Title" + assert passed_state.perception.detected_text == ["hello"] + assert passed_state.window.window_title == "Title" + # Et le workflow_id est bien propagé + assert call_args.kwargs.get("workflow_id") == "wf1" diff --git a/tests/unit/test_screen_analyzer.py b/tests/unit/test_screen_analyzer.py new file mode 100644 index 000000000..218131dff --- /dev/null +++ b/tests/unit/test_screen_analyzer.py @@ -0,0 +1,185 @@ +""" +Tests unitaires de `ScreenAnalyzer` (Lot C — thread-safety). + +Couvre : + - Les flags runtime sont kwargs-only (enable_ocr, enable_ui_detection, session_id) + - L'init lazy (OCR + UIDetector) est protégée par un lock → pas de double init + - `analyze()` ne mute jamais `_ocr*` / `_ui_detector*` pour gérer les flags +""" + +from __future__ import annotations + +import threading +import time +from pathlib import Path +from unittest.mock import MagicMock + +import pytest +from PIL import Image + +from core.pipeline.screen_analyzer import ScreenAnalyzer + + +@pytest.fixture +def screenshot(tmp_path): + path = tmp_path / "shot.png" + Image.new("RGB", (64, 64), color=(100, 100, 100)).save(str(path)) + return str(path) + + +# ----------------------------------------------------------------------------- +# API — kwargs-only +# ----------------------------------------------------------------------------- + + +class TestAnalyzeKwargsOnly: + """Les flags runtime doivent être passés en kwargs-only, jamais positionnels.""" + + def test_analyze_kwargs_only_accept(self, screenshot): + """L'appel nominal avec kwargs fonctionne.""" + analyzer = ScreenAnalyzer() + # Empêcher l'init réelle + analyzer._ocr = None + analyzer._ocr_initialized = True + analyzer._ui_detector = None + analyzer._ui_detector_initialized = True + + state = analyzer.analyze( + screenshot, + enable_ocr=False, + enable_ui_detection=False, + session_id="s_kwargs", + ) + assert state.session_id == "s_kwargs" + assert state.perception.detected_text == [] + assert state.ui_elements == [] + + def test_analyze_rejects_positional_flags(self, screenshot): + """Passer enable_ocr en position 4 (après window_info, context) → TypeError.""" + analyzer = ScreenAnalyzer() + analyzer._ocr = None + analyzer._ocr_initialized = True + analyzer._ui_detector = None + analyzer._ui_detector_initialized = True + + # Signature : analyze(self, screenshot_path, window_info=None, context=None, + # *, enable_ocr=..., enable_ui_detection=..., session_id=...) + # Un 4e argument positionnel doit être rejeté. + with pytest.raises(TypeError): + analyzer.analyze(screenshot, None, None, False) # noqa: E501 (flag positionnel interdit) + + def test_analyze_session_id_propagates_to_state(self, screenshot): + """session_id passé en kwarg remplit ScreenState.session_id et metadata.""" + analyzer = ScreenAnalyzer(session_id="default_session") + analyzer._ocr = None + analyzer._ocr_initialized = True + analyzer._ui_detector = None + analyzer._ui_detector_initialized = True + + # kwarg explicite → prioritaire + state_call = analyzer.analyze(screenshot, session_id="explicit_session") + assert state_call.session_id == "explicit_session" + assert state_call.metadata["session_id"] == "explicit_session" + + # kwarg vide → fallback sur la valeur d'instance (rétrocompat) + state_default = analyzer.analyze(screenshot) + assert state_default.session_id == "default_session" + + +# ----------------------------------------------------------------------------- +# Lazy init sous lock +# ----------------------------------------------------------------------------- + + +class TestLazyInitUnderLock: + """L'init lazy (OCR / UIDetector) ne doit jamais se faire en double.""" + + def test_analyze_lazy_init_under_lock(self, screenshot): + """Init concurrente → une seule création de l'OCR.""" + analyzer = ScreenAnalyzer() + + # Simuler un init OCR coûteux : compte les appels, renvoie un OCR factice. + init_count = {"n": 0} + + def fake_ensure_ocr_locked(): + # Ne marcher qu'une fois : mimer _ensure_ocr_locked qui s'auto-verrouille. + init_count["n"] += 1 + time.sleep(0.05) # laisser la concurrence s'exprimer + analyzer._ocr = lambda p: ["ok"] + analyzer._ocr_initialized = True + + analyzer._ensure_ocr_locked = fake_ensure_ocr_locked # type: ignore[assignment] + # UIDetector déjà "prêt" (pas None → détection évitée via mock) + analyzer._ui_detector = None + analyzer._ui_detector_initialized = True + + # N threads lancent analyze() simultanément + results = [] + errors = [] + + def worker(): + try: + s = analyzer.analyze(screenshot, enable_ocr=True, enable_ui_detection=False) + results.append(s) + except Exception as e: # pragma: no cover + errors.append(e) + + threads = [threading.Thread(target=worker) for _ in range(8)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) + + assert not errors, f"Erreurs dans les threads: {errors}" + assert len(results) == 8 + # UNE seule init OCR malgré 8 appels concurrents + assert init_count["n"] == 1, ( + f"Init OCR exécutée {init_count['n']} fois — doit être 1 sous lock" + ) + + def test_analyze_no_mutation_for_flag_bypass(self, screenshot): + """enable_ocr=False NE DOIT PAS muter self._ocr ni _ocr_initialized.""" + analyzer = ScreenAnalyzer() + # État "frais" : rien d'initialisé + assert analyzer._ocr is None + assert analyzer._ocr_initialized is False + assert analyzer._ui_detector is None + assert analyzer._ui_detector_initialized is False + + analyzer.analyze(screenshot, enable_ocr=False, enable_ui_detection=False) + + # L'état interne doit être strictement inchangé : aucune init n'a été + # déclenchée puisque les deux flags étaient à False. + assert analyzer._ocr is None + assert analyzer._ocr_initialized is False + assert analyzer._ui_detector is None + assert analyzer._ui_detector_initialized is False + + def test_analyze_lazy_init_only_when_requested(self, screenshot): + """enable_ocr=True sur instance fraîche → init déclenchée. + enable_ocr=False sur instance fraîche → pas d'init.""" + analyzer = ScreenAnalyzer() + calls = {"ocr": 0, "ui": 0} + + def fake_ocr_init(): + calls["ocr"] += 1 + analyzer._ocr = lambda p: [] + analyzer._ocr_initialized = True + + def fake_ui_init(): + calls["ui"] += 1 + analyzer._ui_detector = None + analyzer._ui_detector_initialized = True + + analyzer._ensure_ocr_locked = fake_ocr_init # type: ignore[assignment] + analyzer._ensure_ui_detector_locked = fake_ui_init # type: ignore[assignment] + + # Appel 1 : seul OCR demandé + analyzer.analyze(screenshot, enable_ocr=True, enable_ui_detection=False) + assert calls["ocr"] == 1 + assert calls["ui"] == 0 + + # Appel 2 : maintenant UI demandée + analyzer.analyze(screenshot, enable_ocr=True, enable_ui_detection=True) + assert calls["ocr"] == 1 # déjà initialisé, pas de réinit + assert calls["ui"] == 1