refactor(pipeline): ScreenAnalyzer thread-safe et isolé (Lot C)

Retrait de l'état global toxique :
  - analyze() : kwargs-only enable_ocr, enable_ui_detection, session_id
  - Ne mute JAMAIS self pour les flags (variables locales + branches)
  - _resolve_ocr_instance() / _resolve_ui_detector_instance() : lecture seule
  - _init_lock par instance pour lazy init concurrent safe
  - session_id par appel, plus via mutation singleton

Avant : ExecutionLoop mutait analyzer._ocr, _ui_detector,
_ocr_initialized, _ui_detector_initialized pour désactiver OCR/UI.
Deux loops partageant le singleton se polluaient mutuellement.

Après : deux loops partageant l'analyzer sont complètement isolés.
Preuve par TestAnalyzerIsolationBetweenLoops (3 tests).

Singleton get_screen_analyzer() préservé — garde uniquement les
ressources lourdes, plus de contexte d'exécution.

9 nouveaux tests (3 isolation + 6 kwargs-only/lazy-init).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-15 09:06:41 +02:00
parent 8c7b6e5696
commit 9ca277a63f
4 changed files with 1221 additions and 42 deletions

View File

@@ -2,7 +2,140 @@
Pipeline module - Orchestration du flux RPA Vision V3
"""
from __future__ import annotations
import threading
from typing import Optional
from .workflow_pipeline import WorkflowPipeline, create_pipeline
from .screen_analyzer import ScreenAnalyzer
from .screen_state_cache import ScreenStateCache, compute_perceptual_hash
from .edge_scorer import EdgeScorer, EdgeScore
__all__ = ["WorkflowPipeline", "create_pipeline", "ScreenAnalyzer"]
__all__ = [
"WorkflowPipeline",
"create_pipeline",
"ScreenAnalyzer",
"ScreenStateCache",
"compute_perceptual_hash",
"EdgeScorer",
"EdgeScore",
"get_screen_analyzer",
"reset_screen_analyzer",
"get_screen_state_cache",
"reset_screen_state_cache",
]
# =============================================================================
# Singleton ScreenAnalyzer
# =============================================================================
#
# Une seule instance est partagée entre ExecutionLoop, GraphBuilder et
# stream_processor pour éviter le double chargement GPU (UIDetector + CLIP
# = 6-10 Go VRAM, plafond 12 Go sur RTX 5070).
#
# Thread-safe : protégé par un lock.
#
# IMPORTANT (Lot C — avril 2026) :
# Ce singleton ne porte plus AUCUN contexte d'exécution. Il détient
# uniquement les ressources lourdes (modèles OCR, UIDetector, CLIP).
# • Les flags runtime (`enable_ocr`, `enable_ui_detection`) et l'identité
# de session (`session_id`) se passent en kwargs-only à `analyze()`,
# jamais en mutant l'instance. Voir `ScreenAnalyzer.analyze()`.
# • L'argument `session_id` de `get_screen_analyzer()` ne sert QUE de
# valeur par défaut historique, ignorée après la première création.
# À terme, prévoir sa suppression.
# =============================================================================
_SCREEN_ANALYZER_SINGLETON: Optional[ScreenAnalyzer] = None
_SCREEN_ANALYZER_LOCK = threading.Lock()
def get_screen_analyzer(
ui_detector=None,
ocr_engine: Optional[str] = None,
session_id: str = "",
force_new: bool = False,
) -> ScreenAnalyzer:
"""
Récupérer l'instance partagée de ScreenAnalyzer.
Création à la première demande (lazy). Les appels ultérieurs retournent
la même instance, quels que soient les arguments (sauf `force_new=True`).
Args:
ui_detector: UIDetector optionnel (utilisé seulement à la 1ère création)
ocr_engine: Moteur OCR ("doctr", "tesseract", None=auto)
session_id: ID de session pour la 1ère création
force_new: Forcer la création d'une nouvelle instance (tests)
Returns:
Instance partagée de ScreenAnalyzer
"""
global _SCREEN_ANALYZER_SINGLETON
if force_new:
with _SCREEN_ANALYZER_LOCK:
_SCREEN_ANALYZER_SINGLETON = ScreenAnalyzer(
ui_detector=ui_detector,
ocr_engine=ocr_engine,
session_id=session_id,
)
return _SCREEN_ANALYZER_SINGLETON
if _SCREEN_ANALYZER_SINGLETON is not None:
return _SCREEN_ANALYZER_SINGLETON
with _SCREEN_ANALYZER_LOCK:
# Double-check locking
if _SCREEN_ANALYZER_SINGLETON is None:
_SCREEN_ANALYZER_SINGLETON = ScreenAnalyzer(
ui_detector=ui_detector,
ocr_engine=ocr_engine,
session_id=session_id,
)
return _SCREEN_ANALYZER_SINGLETON
def reset_screen_analyzer() -> None:
"""Réinitialiser le singleton (tests uniquement)."""
global _SCREEN_ANALYZER_SINGLETON
with _SCREEN_ANALYZER_LOCK:
_SCREEN_ANALYZER_SINGLETON = None
# =============================================================================
# Singleton ScreenStateCache (partagé)
# =============================================================================
_SCREEN_STATE_CACHE_SINGLETON: Optional[ScreenStateCache] = None
_SCREEN_STATE_CACHE_LOCK = threading.Lock()
def get_screen_state_cache(
ttl_seconds: float = 2.0,
max_entries: int = 16,
) -> ScreenStateCache:
"""
Retourne le cache de ScreenState partagé (créé à la 1ère demande).
"""
global _SCREEN_STATE_CACHE_SINGLETON
if _SCREEN_STATE_CACHE_SINGLETON is not None:
return _SCREEN_STATE_CACHE_SINGLETON
with _SCREEN_STATE_CACHE_LOCK:
if _SCREEN_STATE_CACHE_SINGLETON is None:
_SCREEN_STATE_CACHE_SINGLETON = ScreenStateCache(
ttl_seconds=ttl_seconds,
max_entries=max_entries,
)
return _SCREEN_STATE_CACHE_SINGLETON
def reset_screen_state_cache() -> None:
"""Réinitialiser le cache partagé (tests uniquement)."""
global _SCREEN_STATE_CACHE_SINGLETON
with _SCREEN_STATE_CACHE_LOCK:
_SCREEN_STATE_CACHE_SINGLETON = None

View File

@@ -9,13 +9,33 @@ Orchestre les 4 niveaux du ScreenState :
Ce module comble le chaînon manquant entre la capture brute (Couche 0)
et la construction d'embeddings (Couche 3).
=============================================================================
Thread-safety & partage multi-loops (Lot C — avril 2026)
=============================================================================
Cet analyseur peut être partagé entre plusieurs `ExecutionLoop` (singleton
`get_screen_analyzer()`). Pour éviter la contamination croisée :
• `analyze()` NE MUTE JAMAIS `self._ocr`, `self._ui_detector`,
`self._ocr_initialized`, `self._ui_detector_initialized` pour gérer les
flags runtime (enable_ocr / enable_ui_detection). Ces flags sont par
appel, résolus en variables locales.
• `session_id` circule en paramètre d'appel et renseigne la metadata du
ScreenState ; l'attribut `self.session_id` n'est qu'un défaut historique
(rétrocompat) et n'est plus la source de vérité.
• L'init lazy des composants lourds (OCR, UIDetector) est protégée par un
`_init_lock` par instance pour empêcher une double initialisation
concurrente.
"""
import contextlib
import logging
import os
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List
from typing import Optional, Dict, Any, List, Tuple
from PIL import Image
@@ -32,6 +52,44 @@ from core.models.ui_element import UIElement
logger = logging.getLogger(__name__)
# Lock d'inférence local au module : sert de fallback si le GPUResourceManager
# n'est pas disponible (import error, tests). Partagé entre toutes les instances
# ScreenAnalyzer du process, cohérent avec le singleton get_screen_analyzer().
_ANALYZE_FALLBACK_LOCK = threading.Lock()
def _acquire_gpu_context(timeout: Optional[float] = None):
"""
Retourne un context manager pour sérialiser les appels GPU.
Préfère `GPUResourceManager.acquire_inference()` si disponible (coordination
globale), sinon bascule sur un lock threading local au module.
"""
try:
from core.gpu import get_gpu_resource_manager
manager = get_gpu_resource_manager()
return manager.acquire_inference(timeout=timeout)
except Exception as e: # pragma: no cover - fallback defensif
logger.debug(f"GPUResourceManager indisponible, fallback lock local: {e}")
@contextlib.contextmanager
def _fallback():
if timeout is None:
_ANALYZE_FALLBACK_LOCK.acquire()
yield True
_ANALYZE_FALLBACK_LOCK.release()
else:
got = _ANALYZE_FALLBACK_LOCK.acquire(timeout=timeout)
try:
yield got
finally:
if got:
_ANALYZE_FALLBACK_LOCK.release()
return _fallback()
class ScreenAnalyzer:
"""
Construit un ScreenState complet (4 niveaux) depuis un screenshot.
@@ -44,6 +102,14 @@ class ScreenAnalyzer:
>>> state = analyzer.analyze("/path/to/screenshot.png")
>>> print(state.perception.detected_text)
>>> print(len(state.ui_elements))
Runtime overrides (kwargs-only) sur analyze() :
>>> state = analyzer.analyze(
... path,
... enable_ocr=False, # bypass OCR pour cet appel
... enable_ui_detection=False, # bypass UIDetector
... session_id="session_42", # session par appel
... )
"""
def __init__(
@@ -56,18 +122,27 @@ class ScreenAnalyzer:
Args:
ui_detector: Instance de UIDetector (créé si None)
ocr_engine: Moteur OCR à utiliser ("doctr", "tesseract", None=auto)
session_id: ID de la session en cours
session_id: ID de session par défaut (rétrocompat ; préférer passer
`session_id` en kwarg de `analyze()` pour chaque appel).
"""
self._ui_detector = ui_detector
self._ocr_engine_name = ocr_engine
self._ocr = None
# Session par défaut (rétrocompat). La source de vérité est désormais
# le paramètre `session_id` de `analyze()`.
self.session_id = session_id
# Compteur d'états — protégé par _state_lock pour être safe en parallèle.
self._state_counter = 0
self._state_lock = threading.Lock()
# Initialisation lazy pour éviter les imports lourds au démarrage
# Initialisation lazy pour éviter les imports lourds au démarrage.
self._ui_detector_initialized = ui_detector is not None
self._ocr_initialized = False
# Lock dédié à l'init lazy : empêche deux threads d'initialiser
# simultanément OCR ou UIDetector (double chargement GPU).
self._init_lock = threading.Lock()
# =========================================================================
# API publique
# =========================================================================
@@ -77,28 +152,85 @@ class ScreenAnalyzer:
screenshot_path: str,
window_info: Optional[Dict[str, Any]] = None,
context: Optional[Dict[str, Any]] = None,
*,
enable_ocr: bool = True,
enable_ui_detection: bool = True,
session_id: str = "",
) -> ScreenState:
"""
Analyser un screenshot et construire un ScreenState complet.
Les flags `enable_ocr`, `enable_ui_detection` et `session_id` sont
**par appel, kwargs-only**, pour ne pas polluer l'état partagé du
singleton quand plusieurs `ExecutionLoop` se partagent l'analyseur.
Args:
screenshot_path: Chemin vers le fichier image
window_info: Infos fenêtre active {"title": ..., "app_name": ...}
context: Contexte métier optionnel
enable_ocr: Active l'OCR pour cet appel (True par défaut).
False → `detected_text=[]`, aucune init d'OCR déclenchée.
enable_ui_detection: Active la détection UI pour cet appel
(True par défaut). False → `ui_elements=[]`.
session_id: ID de session pour cet appel. Si vide, on retombe sur
`self.session_id` (rétrocompat). Cette valeur est propagée
dans `ScreenState.session_id` et `metadata["session_id"]`.
Returns:
ScreenState avec les 4 niveaux remplis
ScreenState avec les 4 niveaux remplis.
"""
screenshot_path = str(screenshot_path)
self._state_counter += 1
state_id = f"{self.session_id}_state_{self._state_counter:04d}" if self.session_id else f"state_{self._state_counter:04d}"
# Résolution de la session : priorité au kwarg, fallback sur l'état
# interne (legacy). Variable locale uniquement — pas de mutation.
effective_session_id = session_id or self.session_id
# Niveau 1 : Raw
# Compteur incrémenté sous lock pour identifiants uniques même en
# parallèle. C'est la seule mutation tolérée : elle n'impacte pas le
# comportement OCR/UI.
with self._state_lock:
self._state_counter += 1
state_counter = self._state_counter
state_id = (
f"{effective_session_id}_state_{state_counter:04d}"
if effective_session_id
else f"state_{state_counter:04d}"
)
# Niveau 1 : Raw (léger, hors lock GPU)
raw = self._build_raw_level(screenshot_path)
# Niveau 2 : Perception (OCR)
detected_text = self._extract_text(screenshot_path)
# Résolution locale des instances OCR / UIDetector selon les flags.
# Aucune mutation de self ici : on décide simplement ce qu'on utilise.
ocr_instance = self._resolve_ocr_instance(enable_ocr=enable_ocr)
ui_detector_instance = self._resolve_ui_detector_instance(
enable_ui_detection=enable_ui_detection
)
# Niveaux 2 et 3 : OCR + détection UI sont les étapes lourdes en GPU.
# On sérialise via GPUResourceManager.acquire_inference() pour éviter
# que ExecutionLoop et stream_processor saturent simultanément la VRAM
# sur RTX 5070 (12 Go). Timeout généreux : un appel peut prendre 15-20s.
with _acquire_gpu_context(timeout=60.0) as acquired:
if not acquired:
logger.warning(
"Timeout en attendant le lock GPU pour ScreenAnalyzer.analyze() "
"→ exécution sans sérialisation (risque saturation VRAM)"
)
# Niveau 2 : Perception (OCR) — mesure du temps OCR
ocr_t0 = time.time()
detected_text = self._extract_text_with(ocr_instance, screenshot_path)
ocr_ms = (time.time() - ocr_t0) * 1000
# Niveau 3 : UI Elements — mesure du temps détection
ui_t0 = time.time()
ui_elements = self._detect_ui_elements_with(
ui_detector_instance, screenshot_path, window_info
)
ui_ms = (time.time() - ui_t0) * 1000
perception = PerceptionLevel(
embedding=EmbeddingRef(
provider="openclip_ViT-B-32",
@@ -106,13 +238,10 @@ class ScreenAnalyzer:
dimensions=512,
),
detected_text=detected_text,
text_detection_method=self._get_ocr_method_name(),
text_detection_method=self._get_ocr_method_name(ocr_instance),
confidence_avg=0.85 if detected_text else 0.0,
)
# Niveau 3 : UI Elements
ui_elements = self._detect_ui_elements(screenshot_path, window_info)
# Niveau 4 : Contexte
window_ctx = self._build_window_context(window_info)
context_level = self._build_context_level(context)
@@ -120,22 +249,28 @@ class ScreenAnalyzer:
state = ScreenState(
screen_state_id=state_id,
timestamp=datetime.now(),
session_id=self.session_id,
session_id=effective_session_id,
window=window_ctx,
raw=raw,
perception=perception,
context=context_level,
metadata={
"analyzer_version": "1.0",
"analyzer_version": "1.1",
"session_id": effective_session_id,
"ui_elements_count": len(ui_elements),
"text_regions_count": len(detected_text),
"ocr_ms": ocr_ms,
"ui_ms": ui_ms,
"ocr_enabled": enable_ocr,
"ui_detection_enabled": enable_ui_detection,
},
ui_elements=ui_elements,
)
logger.info(
f"ScreenState {state_id} construit: "
f"{len(ui_elements)} éléments UI, {len(detected_text)} textes détectés"
f"{len(ui_elements)} éléments UI, {len(detected_text)} textes détectés "
f"(ocr={enable_ocr}, ui={enable_ui_detection})"
)
return state
@@ -145,11 +280,16 @@ class ScreenAnalyzer:
save_dir: str = "data/screens",
window_info: Optional[Dict[str, Any]] = None,
context: Optional[Dict[str, Any]] = None,
*,
enable_ocr: bool = True,
enable_ui_detection: bool = True,
session_id: str = "",
) -> ScreenState:
"""
Analyser une PIL Image (utile quand on a déjà l'image en mémoire).
Sauvegarde l'image sur disque puis appelle analyze().
Sauvegarde l'image sur disque puis appelle analyze(). Les flags
runtime sont propagés à `analyze()` en kwargs-only.
"""
save_path = Path(save_dir)
save_path.mkdir(parents=True, exist_ok=True)
@@ -159,7 +299,49 @@ class ScreenAnalyzer:
filepath = save_path / filename
image.save(str(filepath))
return self.analyze(str(filepath), window_info=window_info, context=context)
return self.analyze(
str(filepath),
window_info=window_info,
context=context,
enable_ocr=enable_ocr,
enable_ui_detection=enable_ui_detection,
session_id=session_id,
)
# =========================================================================
# Résolution des instances OCR / UI selon les flags d'appel
# =========================================================================
def _resolve_ocr_instance(self, *, enable_ocr: bool):
"""
Retourner l'instance OCR à utiliser pour cet appel.
- `enable_ocr=False` → None (pas d'init, pas d'appel OCR)
- sinon → init lazy sous lock si nécessaire, puis retour de `self._ocr`
Ne mute `self._ocr` / `self._ocr_initialized` QUE pendant l'init lazy
réelle, jamais pour bypasser l'OCR d'un appel.
"""
if not enable_ocr:
return None
if not self._ocr_initialized:
with self._init_lock:
# Double-check : un autre thread a pu initialiser entretemps.
if not self._ocr_initialized:
self._ensure_ocr_locked()
return self._ocr
def _resolve_ui_detector_instance(self, *, enable_ui_detection: bool):
"""
Retourner l'instance UIDetector pour cet appel (idem _resolve_ocr_instance).
"""
if not enable_ui_detection:
return None
if not self._ui_detector_initialized:
with self._init_lock:
if not self._ui_detector_initialized:
self._ensure_ui_detector_locked()
return self._ui_detector
# =========================================================================
# Niveau 1 : Raw
@@ -182,23 +364,24 @@ class ScreenAnalyzer:
# Niveau 2 : Perception — OCR
# =========================================================================
def _extract_text(self, screenshot_path: str) -> List[str]:
"""Extraire le texte d'un screenshot via OCR."""
self._ensure_ocr()
if self._ocr is None:
def _extract_text_with(self, ocr_callable, screenshot_path: str) -> List[str]:
"""Extraire le texte via un callable OCR donné (peut être None)."""
if ocr_callable is None:
return []
try:
return self._ocr(screenshot_path)
return ocr_callable(screenshot_path)
except Exception as e:
logger.warning(f"OCR échoué: {e}")
return []
def _ensure_ocr(self) -> None:
"""Initialiser le moteur OCR (lazy)."""
if self._ocr_initialized:
return
def _ensure_ocr_locked(self) -> None:
"""
Initialiser le moteur OCR (appelé sous `self._init_lock`).
Ne doit PAS être appelé hors de `_resolve_ocr_instance()`.
"""
# Mutation intentionnelle : on installe l'instance OCR réelle.
# Protégée par le lock d'init (pas le lock GPU).
self._ocr_initialized = True
engine = self._ocr_engine_name
@@ -257,8 +440,9 @@ class ScreenAnalyzer:
return ocr_func
def _get_ocr_method_name(self) -> str:
if self._ocr is None:
def _get_ocr_method_name(self, ocr_instance=None) -> str:
"""Nom du moteur OCR effectivement utilisé pour cet appel."""
if ocr_instance is None:
return "none"
if self._ocr_engine_name:
return self._ocr_engine_name
@@ -268,19 +452,18 @@ class ScreenAnalyzer:
# Niveau 3 : UI Elements
# =========================================================================
def _detect_ui_elements(
def _detect_ui_elements_with(
self,
ui_detector,
screenshot_path: str,
window_info: Optional[Dict[str, Any]] = None,
) -> List[UIElement]:
"""Détecter les éléments UI dans le screenshot."""
self._ensure_ui_detector()
if self._ui_detector is None:
"""Détecter les éléments UI via un détecteur donné (peut être None)."""
if ui_detector is None:
return []
try:
elements = self._ui_detector.detect(
elements = ui_detector.detect(
screenshot_path, window_context=window_info
)
return elements
@@ -288,10 +471,10 @@ class ScreenAnalyzer:
logger.warning(f"Détection UI échouée: {e}")
return []
def _ensure_ui_detector(self) -> None:
"""Initialiser le UIDetector (lazy)."""
if self._ui_detector_initialized:
return
def _ensure_ui_detector_locked(self) -> None:
"""
Initialiser le UIDetector (appelé sous `self._init_lock`).
"""
self._ui_detector_initialized = True
try:

View File

@@ -0,0 +1,678 @@
"""
Tests unitaires de l'intégration vision-aware dans ExecutionLoop (C1).
Couvre :
- Construction d'un ScreenState enrichi via ScreenAnalyzer
- Cache hit évite un second appel à analyzer.analyze
- Timeout → mode dégradé persistant
- enable_ui_detection=False + enable_ocr=False → fallback stub
- StepResult contient bien les champs temps (ocr_ms, ui_ms, analyze_ms, cache_hit, degraded)
- Singleton get_screen_analyzer partage bien l'instance
"""
from __future__ import annotations
import time
from datetime import datetime
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from PIL import Image
from core.execution.execution_loop import ExecutionContext, ExecutionLoop, ExecutionMode, StepResult
from core.models.screen_state import (
ContextLevel,
EmbeddingRef,
PerceptionLevel,
RawLevel,
ScreenState,
WindowContext,
)
from core.pipeline import (
get_screen_analyzer,
get_screen_state_cache,
reset_screen_analyzer,
reset_screen_state_cache,
)
# -----------------------------------------------------------------------------
# Fixtures
# -----------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def reset_singletons():
"""Réinitialiser les singletons entre chaque test."""
reset_screen_analyzer()
reset_screen_state_cache()
yield
reset_screen_analyzer()
reset_screen_state_cache()
@pytest.fixture
def screenshot(tmp_path):
path = tmp_path / "shot.png"
Image.new("RGB", (320, 240), color=(128, 128, 128)).save(str(path))
return str(path)
def _make_state(session_id: str = "s1") -> ScreenState:
return ScreenState(
screen_state_id="sid",
timestamp=datetime.now(),
session_id=session_id,
window=WindowContext(
app_name="app", window_title="Title", screen_resolution=[1920, 1080]
),
raw=RawLevel(screenshot_path="", capture_method="test", file_size_bytes=0),
perception=PerceptionLevel(
embedding=EmbeddingRef(provider="t", vector_id="v", dimensions=512),
detected_text=["hello"],
text_detection_method="test",
confidence_avg=0.9,
),
context=ContextLevel(),
metadata={"ocr_ms": 123.0, "ui_ms": 45.0},
ui_elements=[],
)
def _make_loop(screen_analyzer=None, **kwargs) -> ExecutionLoop:
pipeline = MagicMock()
# Mocker load_workflow pour éviter dépendance FS
pipeline.load_workflow.return_value = None
loop = ExecutionLoop(
pipeline=pipeline,
action_executor=MagicMock(),
screen_capturer=MagicMock(),
screen_analyzer=screen_analyzer,
**kwargs,
)
loop.context = ExecutionContext(
workflow_id="wf1",
execution_id="exec1",
mode=ExecutionMode.AUTOMATIC,
started_at=datetime.now(),
)
return loop
# -----------------------------------------------------------------------------
# Tests
# -----------------------------------------------------------------------------
class TestVisionAwareBuild:
def test_build_screen_state_uses_analyzer(self, screenshot):
analyzer = MagicMock()
analyzer.analyze.return_value = _make_state()
loop = _make_loop(screen_analyzer=analyzer)
state, timings = loop._build_screen_state(screenshot)
assert analyzer.analyze.called
assert state.session_id == "s1"
assert timings["cache_hit"] is False
assert timings["ocr_ms"] == 123.0
assert timings["ui_ms"] == 45.0
assert timings["degraded"] is False
def test_build_screen_state_cache_hit_on_second_call(self, screenshot):
analyzer = MagicMock()
analyzer.analyze.return_value = _make_state()
loop = _make_loop(screen_analyzer=analyzer)
loop._build_screen_state(screenshot)
loop._build_screen_state(screenshot)
# Un seul appel à analyze grâce au cache
assert analyzer.analyze.call_count == 1
def test_disabled_ui_and_ocr_returns_stub(self, screenshot):
analyzer = MagicMock()
analyzer.analyze.return_value = _make_state()
loop = _make_loop(
screen_analyzer=analyzer,
enable_ui_detection=False,
enable_ocr=False,
)
state, timings = loop._build_screen_state(screenshot)
# analyze ne doit PAS avoir été appelé
analyzer.analyze.assert_not_called()
assert timings["degraded"] is True
assert state.perception.detected_text == []
assert state.ui_elements == []
def test_timeout_activates_degraded_mode(self, screenshot):
"""Si l'analyse dépasse analyze_timeout_ms, le loop bascule en dégradé."""
analyzer = MagicMock()
def slow_analyze(*_args, **_kw):
time.sleep(0.15)
return _make_state()
analyzer.analyze.side_effect = slow_analyze
loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=50)
# Premier appel → mesure timeout et active dégradé
_, timings1 = loop._build_screen_state(screenshot)
assert timings1["degraded"] is True
assert loop._degraded_mode is True
# Deuxième appel (autre screenshot pour éviter cache) → stub direct
img2 = Path(screenshot).parent / "other.png"
Image.new("RGB", (320, 240), color=(1, 2, 3)).save(str(img2))
_, timings2 = loop._build_screen_state(str(img2))
assert timings2["degraded"] is True
# analyzer.analyze n'a pas été appelé une 2ème fois
assert analyzer.analyze.call_count == 1
def test_analyzer_unavailable_returns_stub(self, screenshot):
"""Si get_screen_analyzer() renvoie None, fallback stub."""
loop = _make_loop(screen_analyzer=None)
# Forcer _get_screen_analyzer à retourner None
with patch.object(loop, "_get_screen_analyzer", return_value=None):
state, timings = loop._build_screen_state(screenshot)
assert timings["degraded"] is True
assert state.ui_elements == []
def test_stub_when_all_flags_off(self, screenshot):
loop = _make_loop(enable_ui_detection=False, enable_ocr=False)
state, timings = loop._build_screen_state(screenshot)
assert state.window.window_title == "Unknown"
assert timings["degraded"] is True
class TestWindowInfoProvider:
def test_window_info_provider_is_used(self, screenshot):
analyzer = MagicMock()
analyzer.analyze.return_value = _make_state()
provider = lambda: {"title": "Chrome", "app_name": "chrome"}
loop = _make_loop(screen_analyzer=analyzer, window_info_provider=provider)
loop._build_screen_state(screenshot)
# Vérifier que window_info a bien été passé à analyze
call_kwargs = analyzer.analyze.call_args.kwargs
assert call_kwargs.get("window_info") == {"title": "Chrome", "app_name": "chrome"}
def test_falls_back_to_screen_capturer(self, screenshot):
analyzer = MagicMock()
analyzer.analyze.return_value = _make_state()
loop = _make_loop(screen_analyzer=analyzer)
loop.screen_capturer.get_active_window.return_value = {
"title": "Firefox",
"app": "firefox",
"x": 0,
"y": 0,
"width": 800,
"height": 600,
}
loop._build_screen_state(screenshot)
call_kwargs = analyzer.analyze.call_args.kwargs
wi = call_kwargs.get("window_info")
assert wi is not None
assert wi["title"] == "Firefox"
assert wi["app_name"] == "firefox"
class TestDegradedModeRecovery:
"""Tâche 2 — Auto-rétablissement du mode dégradé après steps rapides."""
def test_fast_steps_counter_resets_on_degradation(self, screenshot):
"""Dépassement du timeout → active dégradé + reset compteur."""
analyzer = MagicMock()
def slow_analyze(*_args, **_kw):
time.sleep(0.15)
return _make_state()
analyzer.analyze.side_effect = slow_analyze
loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=50)
loop._successive_fast_steps = 2 # état fictif avant le timeout
_, timings = loop._build_screen_state(screenshot)
assert loop._degraded_mode is True
assert loop._successive_fast_steps == 0
assert timings["degraded"] is True
def test_recovery_after_three_fast_probes(self, tmp_path):
"""Après 3 probes rapides consécutifs, retour en mode complet."""
import random
analyzer = MagicMock()
analyzer.analyze.return_value = _make_state()
# Timeout 1000ms → fast_threshold = 500ms ; MagicMock = instant (<<500ms).
loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=1000)
# Simuler un état dégradé préexistant
loop._degraded_mode = True
loop._successive_fast_steps = 0
loop._degraded_step_counter = 0
# Probe immédiat à chaque appel
loop._probe_interval = 1
# 3 probes rapides sur 3 screenshots avec dhash différents.
# Une image unie a toujours un dhash 0...0 → on génère du bruit.
for i in range(3):
random.seed(i + 1)
img = Image.new("RGB", (320, 240))
for y in range(240):
for x in range(320):
v = random.randint(0, 255)
img.putpixel((x, y), (v, v, v))
path = tmp_path / f"shot_{i}.png"
img.save(str(path))
_, timings = loop._build_screen_state(str(path))
assert loop._degraded_mode is False, "Devrait être sorti du mode dégradé"
assert loop._successive_fast_steps == 0 # Reset après récupération
def test_slow_probe_keeps_degraded(self, tmp_path):
"""Un probe lent en mode dégradé garde _degraded_mode=True."""
analyzer = MagicMock()
def slow_analyze(*_args, **_kw):
time.sleep(0.15)
return _make_state()
analyzer.analyze.side_effect = slow_analyze
loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=50)
loop._degraded_mode = True
loop._successive_fast_steps = 2
loop._degraded_step_counter = 0
loop._probe_interval = 1
path = tmp_path / "slow.png"
Image.new("RGB", (320, 240), color=(80, 80, 80)).save(str(path))
_, timings = loop._build_screen_state(str(path))
assert loop._degraded_mode is True
assert loop._successive_fast_steps == 0 # Reset au slow
assert timings["degraded"] is True
def test_probe_interval_respected_in_degraded(self, screenshot):
"""En dégradé, on ne fait probe que tous les _probe_interval steps."""
analyzer = MagicMock()
analyzer.analyze.return_value = _make_state()
loop = _make_loop(screen_analyzer=analyzer, analyze_timeout_ms=1000)
loop._degraded_mode = True
loop._probe_interval = 5
# 4 appels successifs → aucun probe (stub direct)
for _ in range(4):
_, timings = loop._build_screen_state(screenshot)
assert timings["degraded"] is True
assert analyzer.analyze.call_count == 0
class TestStepResultFields:
def test_step_result_has_new_timing_fields(self):
r = StepResult(
success=True,
node_id="n1",
edge_id=None,
action_result=None,
match_confidence=0.9,
duration_ms=10.0,
message="test",
)
assert r.ocr_ms == 0.0
assert r.ui_ms == 0.0
assert r.analyze_ms == 0.0
assert r.total_ms == 0.0
assert r.cache_hit is False
assert r.degraded is False
class TestExecuteStepBlockedContract:
"""Lot A — contrat dict get_next_action dans ExecutionLoop._execute_step."""
def _setup_loop_with_match(self, next_action_return, screenshot):
"""Crée une ExecutionLoop avec un pipeline mocké qui renvoie
``next_action_return`` à get_next_action, et un
``match_current_state_from_state`` qui matche toujours (Lot E — le
chemin d'exécution utilise la nouvelle API context-aware)."""
analyzer = MagicMock()
analyzer.analyze.return_value = _make_state()
loop = _make_loop(screen_analyzer=analyzer)
# Nouveau chemin Lot E : match_current_state_from_state retourne un match valide
loop.pipeline.match_current_state_from_state.return_value = {
"node_id": "n1",
"workflow_id": "wf1",
"confidence": 0.95,
}
loop.pipeline.get_next_action.return_value = next_action_return
# Mock _capture_screen pour éviter le vrai capture
loop._capture_screen = lambda: screenshot
return loop
def test_blocked_triggers_paused_state(self, screenshot):
"""status="blocked" → PAUSED + success=False + on_error appelé."""
loop = self._setup_loop_with_match(
next_action_return={"status": "blocked", "reason": "no_valid_edge"},
screenshot=screenshot,
)
errors_seen = []
loop.on_error(lambda src, exc: errors_seen.append((src, exc)))
result = loop._execute_step()
assert result is not None
assert result.success is False
assert result.edge_id is None
assert "Blocked" in result.message
assert loop.state.value == "paused"
# Callback on_error a bien été notifié
assert len(errors_seen) == 1
assert errors_seen[0][0] == "blocked"
def test_terminal_succeeds_without_edge(self, screenshot):
"""status="terminal" → success=True + message "terminated"."""
loop = self._setup_loop_with_match(
next_action_return={"status": "terminal"},
screenshot=screenshot,
)
result = loop._execute_step()
assert result is not None
assert result.success is True
assert result.edge_id is None
assert "terminated" in result.message.lower()
# PAS passé en PAUSED (workflow terminé légitimement)
assert loop.state.value != "paused"
def test_legacy_none_treated_as_blocked(self, screenshot):
"""Rétrocompat défensive : si un pipeline legacy renvoie None,
on considère ça comme un blocage (safe default)."""
loop = self._setup_loop_with_match(
next_action_return=None,
screenshot=screenshot,
)
result = loop._execute_step()
assert result is not None
assert result.success is False
assert loop.state.value == "paused"
def test_selected_continues_execution(self, screenshot):
"""status="selected" → chemin nominal, tente d'exécuter l'edge."""
loop = self._setup_loop_with_match(
next_action_return={
"status": "selected",
"edge_id": "e1",
"action": {"type": "click", "target": {}},
"target_node": "n2",
"confidence": 0.9,
"score": 0.9,
},
screenshot=screenshot,
)
# Mode OBSERVATION pour ne rien exécuter réellement
loop.context.mode = ExecutionMode.OBSERVATION
result = loop._execute_step()
assert result is not None
# Pas de PAUSED déclenché
assert loop.state.value != "paused"
# edge_id bien propagé
assert result.edge_id == "e1"
class TestSingleton:
def test_get_screen_analyzer_returns_same_instance(self):
a1 = get_screen_analyzer()
a2 = get_screen_analyzer()
assert a1 is a2
def test_force_new_creates_new_instance(self):
a1 = get_screen_analyzer()
a2 = get_screen_analyzer(force_new=True)
assert a1 is not a2
def test_get_screen_state_cache_returns_same_instance(self):
c1 = get_screen_state_cache()
c2 = get_screen_state_cache()
assert c1 is c2
class TestAnalyzerIsolationBetweenLoops:
"""
Lot C — Deux ExecutionLoop partageant le même ScreenAnalyzer ne doivent
PAS se contaminer mutuellement.
Règle : `analyze()` ne mute jamais `_ocr`, `_ui_detector`,
`_ocr_initialized`, `_ui_detector_initialized` pour gérer les flags runtime.
Les flags (`enable_ocr`, `enable_ui_detection`) et `session_id` circulent
en kwargs d'appel, pas via l'état du singleton.
"""
def _make_distinct_image(self, path, seed: int):
"""Image avec dhash unique (random noise) pour éviter les cache hits."""
import random
random.seed(seed)
img = Image.new("RGB", (128, 128))
for y in range(128):
for x in range(128):
v = random.randint(0, 255)
img.putpixel((x, y), (v, v, v))
img.save(str(path))
return str(path)
def test_two_loops_share_analyzer_no_contamination(self, tmp_path):
"""Deux loops, le premier avec enable_ocr=False, le second avec
enable_ocr=True → l'état interne du singleton doit être intact
après l'appel du premier loop (pas de self._ocr=None)."""
from core.pipeline.screen_analyzer import ScreenAnalyzer
analyzer = ScreenAnalyzer()
# Installer un OCR + UIDetector factices ET marqués "initialisés" pour
# empêcher l'init lazy réelle pendant le test.
sentinel_ocr = lambda path: ["texte_sentinelle"]
sentinel_detector = MagicMock()
sentinel_detector.detect.return_value = []
analyzer._ocr = sentinel_ocr
analyzer._ocr_initialized = True
analyzer._ui_detector = sentinel_detector
analyzer._ui_detector_initialized = True
# Deux screenshots avec dhash distincts (random noise)
img_a = self._make_distinct_image(tmp_path / "shot_a.png", seed=1)
img_b = self._make_distinct_image(tmp_path / "shot_b.png", seed=2)
# Premier loop : OCR désactivé
loop_a = _make_loop(screen_analyzer=analyzer, enable_ocr=False)
state_a, _ = loop_a._build_screen_state(img_a)
# Vérifier l'isolation : l'analyseur est INCHANGÉ.
assert analyzer._ocr is sentinel_ocr, (
"analyze(enable_ocr=False) NE DOIT PAS muter self._ocr"
)
assert analyzer._ocr_initialized is True
assert analyzer._ui_detector is sentinel_detector
assert analyzer._ui_detector_initialized is True
# Pour le loop A, OCR bypass → detected_text vide
assert state_a.perception.detected_text == []
# Deuxième loop : OCR activé
loop_b = _make_loop(screen_analyzer=analyzer, enable_ocr=True)
state_b, _ = loop_b._build_screen_state(img_b)
# L'analyseur est toujours intact
assert analyzer._ocr is sentinel_ocr
# Et le loop B a bien bénéficié de l'OCR
assert state_b.perception.detected_text == ["texte_sentinelle"]
def test_session_id_is_per_call_not_singleton(self, tmp_path):
"""Deux appels avec session_id différent → chaque ScreenState porte
le bon session_id, et le singleton ne garde pas de session résiduelle."""
from core.pipeline.screen_analyzer import ScreenAnalyzer
# On patche _ensure_*_locked pour éviter l'init réelle.
analyzer = ScreenAnalyzer()
analyzer._ocr = None
analyzer._ocr_initialized = True
analyzer._ui_detector = None
analyzer._ui_detector_initialized = True
img1 = tmp_path / "s1.png"
img2 = tmp_path / "s2.png"
Image.new("RGB", (100, 100), color=(1, 2, 3)).save(str(img1))
Image.new("RGB", (100, 100), color=(4, 5, 6)).save(str(img2))
s1 = analyzer.analyze(str(img1), session_id="session_alpha")
s2 = analyzer.analyze(str(img2), session_id="session_beta")
assert s1.session_id == "session_alpha"
assert s2.session_id == "session_beta"
assert s1.metadata.get("session_id") == "session_alpha"
assert s2.metadata.get("session_id") == "session_beta"
# Le state_id doit refléter chaque session, pas la "dernière vue" du singleton
assert s1.screen_state_id.startswith("session_alpha_")
assert s2.screen_state_id.startswith("session_beta_")
def test_analyze_flags_override_without_mutation(self, tmp_path):
"""enable_ui_detection=False → ui_elements=[] dans le résultat,
mais analyzer._ui_detector reste initialisé (pas de mutation)."""
from core.pipeline.screen_analyzer import ScreenAnalyzer
analyzer = ScreenAnalyzer()
sentinel_detector = MagicMock()
sentinel_detector.detect.return_value = [MagicMock()] # 1 élément factice
analyzer._ui_detector = sentinel_detector
analyzer._ui_detector_initialized = True
analyzer._ocr = lambda p: []
analyzer._ocr_initialized = True
img = tmp_path / "shot.png"
Image.new("RGB", (100, 100), color=(10, 20, 30)).save(str(img))
state = analyzer.analyze(str(img), enable_ui_detection=False)
# ui_elements vide puisque détection désactivée pour cet appel
assert state.ui_elements == []
# Mais le détecteur du singleton est intact
assert analyzer._ui_detector is sentinel_detector
assert analyzer._ui_detector_initialized is True
# Le détecteur n'a PAS été appelé
sentinel_detector.detect.assert_not_called()
class TestCacheContextAwareFromLoop:
"""Lot D — Deux ExecutionLoop qui partagent le même ScreenStateCache
mais s'exécutent dans des workflows différents NE DOIVENT PAS partager
leurs entrées de cache : la clé composite inclut `workflow_id`.
"""
def test_two_loops_different_workflow_different_cache(self, tmp_path):
"""Même screenshot + même analyseur + workflow_id différent → 2 miss.
Le compute_fn sous-jacent (analyzer.analyze) doit être appelé pour
chaque loop : pas de contamination inter-workflows.
"""
from core.pipeline import get_screen_state_cache
analyzer = MagicMock()
analyzer.analyze.return_value = _make_state()
# Un même cache partagé (singleton) entre les deux loops.
shared_cache = get_screen_state_cache()
# Image commune (dhash identique)
img = tmp_path / "common.png"
Image.new("RGB", (320, 240), color=(77, 77, 77)).save(str(img))
# Loop A → workflow "wf_A"
loop_a = _make_loop(
screen_analyzer=analyzer,
screen_state_cache=shared_cache,
)
loop_a.context.workflow_id = "wf_A"
loop_a._build_screen_state(str(img))
assert analyzer.analyze.call_count == 1
# Loop B → workflow "wf_B" (même cache, même image, contexte différent)
loop_b = _make_loop(
screen_analyzer=analyzer,
screen_state_cache=shared_cache,
)
loop_b.context.workflow_id = "wf_B"
loop_b._build_screen_state(str(img))
# Pas de collision : analyzer.analyze a bien été appelé une 2ème fois.
assert analyzer.analyze.call_count == 2
# Une 3ème exécution du loop A (même workflow_id, même screenshot)
# doit par contre frapper le cache.
loop_a._build_screen_state(str(img))
assert analyzer.analyze.call_count == 2 # Pas de nouvel appel
class TestExecutionLoopUsesMatchFromState:
"""
Lot E — ExecutionLoop._execute_step doit appeler
``pipeline.match_current_state_from_state`` avec le ScreenState enrichi,
et NON plus l'API legacy ``match_current_state(screenshot_path, ...)``.
"""
def _make_loop_with_analyzer(self, screenshot):
analyzer = MagicMock()
analyzer.analyze.return_value = _make_state()
loop = _make_loop(screen_analyzer=analyzer)
loop._capture_screen = lambda: screenshot
return loop
def test_execution_loop_calls_match_from_state(self, screenshot):
"""_execute_step doit appeler match_current_state_from_state, pas
l'ancienne API."""
loop = self._make_loop_with_analyzer(screenshot)
loop.pipeline.match_current_state_from_state.return_value = {
"node_id": "n1",
"workflow_id": "wf1",
"confidence": 0.9,
}
loop.pipeline.get_next_action.return_value = {"status": "terminal"}
loop._execute_step()
# La nouvelle API a été appelée
assert loop.pipeline.match_current_state_from_state.called
# L'ancienne API n'a PAS été appelée
loop.pipeline.match_current_state.assert_not_called()
def test_execution_loop_passes_enriched_screen_state(self, screenshot):
"""Le ScreenState passé à match_current_state_from_state doit être le
résultat enrichi du ScreenAnalyzer (avec detected_text + title réel),
pas un stub."""
loop = self._make_loop_with_analyzer(screenshot)
loop.pipeline.match_current_state_from_state.return_value = None
loop._execute_step()
call_args = loop.pipeline.match_current_state_from_state.call_args
passed_state = call_args.args[0]
# Le state vient de _make_state() → detected_text=["hello"], title="Title"
assert passed_state.perception.detected_text == ["hello"]
assert passed_state.window.window_title == "Title"
# Et le workflow_id est bien propagé
assert call_args.kwargs.get("workflow_id") == "wf1"

View File

@@ -0,0 +1,185 @@
"""
Tests unitaires de `ScreenAnalyzer` (Lot C — thread-safety).
Couvre :
- Les flags runtime sont kwargs-only (enable_ocr, enable_ui_detection, session_id)
- L'init lazy (OCR + UIDetector) est protégée par un lock → pas de double init
- `analyze()` ne mute jamais `_ocr*` / `_ui_detector*` pour gérer les flags
"""
from __future__ import annotations
import threading
import time
from pathlib import Path
from unittest.mock import MagicMock
import pytest
from PIL import Image
from core.pipeline.screen_analyzer import ScreenAnalyzer
@pytest.fixture
def screenshot(tmp_path):
path = tmp_path / "shot.png"
Image.new("RGB", (64, 64), color=(100, 100, 100)).save(str(path))
return str(path)
# -----------------------------------------------------------------------------
# API — kwargs-only
# -----------------------------------------------------------------------------
class TestAnalyzeKwargsOnly:
"""Les flags runtime doivent être passés en kwargs-only, jamais positionnels."""
def test_analyze_kwargs_only_accept(self, screenshot):
"""L'appel nominal avec kwargs fonctionne."""
analyzer = ScreenAnalyzer()
# Empêcher l'init réelle
analyzer._ocr = None
analyzer._ocr_initialized = True
analyzer._ui_detector = None
analyzer._ui_detector_initialized = True
state = analyzer.analyze(
screenshot,
enable_ocr=False,
enable_ui_detection=False,
session_id="s_kwargs",
)
assert state.session_id == "s_kwargs"
assert state.perception.detected_text == []
assert state.ui_elements == []
def test_analyze_rejects_positional_flags(self, screenshot):
"""Passer enable_ocr en position 4 (après window_info, context) → TypeError."""
analyzer = ScreenAnalyzer()
analyzer._ocr = None
analyzer._ocr_initialized = True
analyzer._ui_detector = None
analyzer._ui_detector_initialized = True
# Signature : analyze(self, screenshot_path, window_info=None, context=None,
# *, enable_ocr=..., enable_ui_detection=..., session_id=...)
# Un 4e argument positionnel doit être rejeté.
with pytest.raises(TypeError):
analyzer.analyze(screenshot, None, None, False) # noqa: E501 (flag positionnel interdit)
def test_analyze_session_id_propagates_to_state(self, screenshot):
"""session_id passé en kwarg remplit ScreenState.session_id et metadata."""
analyzer = ScreenAnalyzer(session_id="default_session")
analyzer._ocr = None
analyzer._ocr_initialized = True
analyzer._ui_detector = None
analyzer._ui_detector_initialized = True
# kwarg explicite → prioritaire
state_call = analyzer.analyze(screenshot, session_id="explicit_session")
assert state_call.session_id == "explicit_session"
assert state_call.metadata["session_id"] == "explicit_session"
# kwarg vide → fallback sur la valeur d'instance (rétrocompat)
state_default = analyzer.analyze(screenshot)
assert state_default.session_id == "default_session"
# -----------------------------------------------------------------------------
# Lazy init sous lock
# -----------------------------------------------------------------------------
class TestLazyInitUnderLock:
"""L'init lazy (OCR / UIDetector) ne doit jamais se faire en double."""
def test_analyze_lazy_init_under_lock(self, screenshot):
"""Init concurrente → une seule création de l'OCR."""
analyzer = ScreenAnalyzer()
# Simuler un init OCR coûteux : compte les appels, renvoie un OCR factice.
init_count = {"n": 0}
def fake_ensure_ocr_locked():
# Ne marcher qu'une fois : mimer _ensure_ocr_locked qui s'auto-verrouille.
init_count["n"] += 1
time.sleep(0.05) # laisser la concurrence s'exprimer
analyzer._ocr = lambda p: ["ok"]
analyzer._ocr_initialized = True
analyzer._ensure_ocr_locked = fake_ensure_ocr_locked # type: ignore[assignment]
# UIDetector déjà "prêt" (pas None → détection évitée via mock)
analyzer._ui_detector = None
analyzer._ui_detector_initialized = True
# N threads lancent analyze() simultanément
results = []
errors = []
def worker():
try:
s = analyzer.analyze(screenshot, enable_ocr=True, enable_ui_detection=False)
results.append(s)
except Exception as e: # pragma: no cover
errors.append(e)
threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=10)
assert not errors, f"Erreurs dans les threads: {errors}"
assert len(results) == 8
# UNE seule init OCR malgré 8 appels concurrents
assert init_count["n"] == 1, (
f"Init OCR exécutée {init_count['n']} fois — doit être 1 sous lock"
)
def test_analyze_no_mutation_for_flag_bypass(self, screenshot):
"""enable_ocr=False NE DOIT PAS muter self._ocr ni _ocr_initialized."""
analyzer = ScreenAnalyzer()
# État "frais" : rien d'initialisé
assert analyzer._ocr is None
assert analyzer._ocr_initialized is False
assert analyzer._ui_detector is None
assert analyzer._ui_detector_initialized is False
analyzer.analyze(screenshot, enable_ocr=False, enable_ui_detection=False)
# L'état interne doit être strictement inchangé : aucune init n'a été
# déclenchée puisque les deux flags étaient à False.
assert analyzer._ocr is None
assert analyzer._ocr_initialized is False
assert analyzer._ui_detector is None
assert analyzer._ui_detector_initialized is False
def test_analyze_lazy_init_only_when_requested(self, screenshot):
"""enable_ocr=True sur instance fraîche → init déclenchée.
enable_ocr=False sur instance fraîche → pas d'init."""
analyzer = ScreenAnalyzer()
calls = {"ocr": 0, "ui": 0}
def fake_ocr_init():
calls["ocr"] += 1
analyzer._ocr = lambda p: []
analyzer._ocr_initialized = True
def fake_ui_init():
calls["ui"] += 1
analyzer._ui_detector = None
analyzer._ui_detector_initialized = True
analyzer._ensure_ocr_locked = fake_ocr_init # type: ignore[assignment]
analyzer._ensure_ui_detector_locked = fake_ui_init # type: ignore[assignment]
# Appel 1 : seul OCR demandé
analyzer.analyze(screenshot, enable_ocr=True, enable_ui_detection=False)
assert calls["ocr"] == 1
assert calls["ui"] == 0
# Appel 2 : maintenant UI demandée
analyzer.analyze(screenshot, enable_ocr=True, enable_ui_detection=True)
assert calls["ocr"] == 1 # déjà initialisé, pas de réinit
assert calls["ui"] == 1