feat(cache): ScreenStateCache clé composite context-aware (Lot D)

Avant : clé = phash seul
-> deux contextes différents avec même screenshot partageaient
la même entrée cache -> collisions silencieuses.

Après : clé composite {phash}|{md5(ctx)[:16]} avec ctx =
  - window_title
  - app_name
  - enable_ocr
  - enable_ui_detection
  - workflow_id (isolation inter-workflows)

get_or_compute() kwargs-only. TTL 2s et éviction LRU inchangés.
invalidate_if_changed() continue de comparer uniquement les phash.

ExecutionLoop propage tout le contexte au cache.

8 nouveaux tests prouvant :
  - même image + window différent = miss
  - même image + app différent = miss
  - même image + flags différents = miss
  - même image + workflow_id différent = miss
  - même image + même contexte = hit

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-15 09:06:51 +02:00
parent 9ca277a63f
commit c8a3618e27
2 changed files with 858 additions and 0 deletions

View File

@@ -0,0 +1,409 @@
"""
ScreenStateCache — Cache perceptuel de ScreenState (context-aware).
Objectif : éviter de réanalyser un screenshot identique (5-15s VLM/OCR)
à chaque step de la boucle d'exécution.
Principe (Lot D — avril 2026) :
- Clé = composite de 6 éléments pour éviter les collisions silencieuses
entre contextes différents partageant un même screenshot :
1. phash (dhash 8x8 du screenshot) — calculé en ~2-5ms
2. window_title (titre fenêtre active)
3. app_name (nom process actif)
4. enable_ocr (flag runtime)
5. enable_ui_detection (flag runtime)
6. workflow_id (isolation inter-workflows)
- TTL par défaut : 2 secondes (configurable)
- Invalidation explicite possible (par clé composite ou globale)
- invalidate_if_changed reste piloté par le phash seul (détection de
changement visuel majeur, indépendant du contexte)
- Thread-safe (lock interne)
API principale :
>>> cache = ScreenStateCache(ttl_seconds=2.0)
>>> state, hit, ms = cache.get_or_compute(
... screenshot_path, compute_fn,
... window_title="App", app_name="app.exe",
... enable_ocr=True, enable_ui_detection=True,
... workflow_id="wf_123",
... )
La fonction `compute_fn` prend le chemin du screenshot et doit retourner
un `ScreenState`. Elle n'est appelée qu'en cache miss.
"""
from __future__ import annotations
import hashlib
import logging
import threading
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Optional, Tuple
from PIL import Image
from core.models.screen_state import ScreenState
logger = logging.getLogger(__name__)
# =============================================================================
# Hash perceptuel (dhash simple, sans dépendance imagehash)
# =============================================================================
def _hamming_distance_hex(a: str, b: str) -> int:
"""
Distance de Hamming entre deux chaînes hexadécimales de même longueur.
Retourne le nombre de bits qui diffèrent entre les deux hashes.
Si les longueurs diffèrent, on pad à droite par des zéros.
"""
if len(a) != len(b):
max_len = max(len(a), len(b))
a = a.ljust(max_len, "0")
b = b.ljust(max_len, "0")
try:
xor = int(a, 16) ^ int(b, 16)
return bin(xor).count("1")
except ValueError:
# Fallback : comparaison caractère à caractère
return sum(1 for ca, cb in zip(a, b) if ca != cb) * 4
def compute_perceptual_hash(screenshot_path: str, size: int = 8) -> str:
"""
Calculer un dhash (difference hash) pour un screenshot.
Algorithme :
1. Convertir en niveaux de gris
2. Redimensionner à (size+1) x size
3. Comparer chaque pixel avec son voisin de droite (dhash)
4. Retourner un hash hexadécimal de size*size bits
Robuste aux petites variations (curseur, blink, compression).
Coût typique : 2-5 ms sur un 1920x1080.
Args:
screenshot_path: Chemin vers le fichier image
size: Taille du hash (8 = 64 bits, défaut)
Returns:
Chaîne hexadécimale (size*size/4 caractères)
"""
try:
img = Image.open(screenshot_path)
img = img.convert("L").resize((size + 1, size), Image.LANCZOS)
pixels = list(img.getdata())
# dhash : comparer chaque pixel avec celui de droite
bits = []
for row in range(size):
for col in range(size):
left = pixels[row * (size + 1) + col]
right = pixels[row * (size + 1) + col + 1]
bits.append(1 if left > right else 0)
# Convertir en hex
value = 0
for bit in bits:
value = (value << 1) | bit
return format(value, f"0{size * size // 4}x")
except Exception as e:
logger.warning(f"Hash perceptuel échoué pour {screenshot_path}: {e}")
# Fallback : hash du contenu brut
try:
data = Path(screenshot_path).read_bytes()
return hashlib.md5(data).hexdigest()[:16]
except Exception:
return f"unhashable_{int(time.time() * 1000)}"
# =============================================================================
# Clé composite (Lot D)
# =============================================================================
def _make_cache_key(
phash: str,
window_title: str,
app_name: str,
enable_ocr: bool,
enable_ui_detection: bool,
workflow_id: str,
) -> str:
"""
Construire une clé composite stable pour le cache.
Combine les 6 dimensions du contexte d'exécution dans une chaîne
hexadécimale (md5 tronqué à 16 caractères), préfixée par le phash pour
conserver une lisibilité minimale en debug (log : `aabb…|ctx=1234…`).
NB : On hash plutôt que concaténer brut pour :
- Borner la taille de la clé même si window_title est long
- Éviter les collisions triviales (séparateur présent dans un titre)
- Rendre la clé opaque (pas de PII en clair dans les logs de cache)
Args:
phash: Hash perceptuel du screenshot (dhash 8x8)
window_title: Titre de la fenêtre active (str)
app_name: Nom du process actif (str)
enable_ocr: Flag runtime OCR (bool)
enable_ui_detection: Flag runtime détection UI (bool)
workflow_id: ID du workflow en cours (str, "" pour legacy)
Returns:
Clé composite `{phash}|{ctx_hash}` où ctx_hash = md5(16)
"""
# Sérialisation déterministe ; `|` comme séparateur interne puisque hashé.
ctx_repr = (
f"{window_title or ''}\x1f"
f"{app_name or ''}\x1f"
f"{int(bool(enable_ocr))}\x1f"
f"{int(bool(enable_ui_detection))}\x1f"
f"{workflow_id or ''}"
)
ctx_hash = hashlib.md5(ctx_repr.encode("utf-8")).hexdigest()[:16]
return f"{phash}|{ctx_hash}"
# =============================================================================
# Entry
# =============================================================================
@dataclass
class _CacheEntry:
state: ScreenState
created_at: float
phash: str # phash seul (utilisé par invalidate_if_changed)
# =============================================================================
# Cache
# =============================================================================
class ScreenStateCache:
"""
Cache de ScreenState avec TTL et clé composite context-aware.
Thread-safe. Utilise un lock interne pour les opérations get/set.
"""
def __init__(self, ttl_seconds: float = 2.0, max_entries: int = 16):
"""
Args:
ttl_seconds: Durée de vie d'une entrée (en secondes)
max_entries: Nombre max d'entrées avant éviction LRU simple
"""
self.ttl_seconds = ttl_seconds
self.max_entries = max_entries
# Clé = composite (_make_cache_key), valeur = _CacheEntry
self._store: dict[str, _CacheEntry] = {}
self._lock = threading.Lock()
# Métriques simples (utile pour le debug / logs)
self.hits = 0
self.misses = 0
self.invalidations = 0
# -------------------------------------------------------------------------
# API bas niveau (par clé composite)
# -------------------------------------------------------------------------
def _get(self, composite_key: str) -> Optional[ScreenState]:
"""Retourne l'entrée pour cette clé composite si encore valide."""
with self._lock:
entry = self._store.get(composite_key)
if entry is None:
return None
if time.time() - entry.created_at > self.ttl_seconds:
# Expiré
self._store.pop(composite_key, None)
return None
return entry.state
def _set(self, composite_key: str, phash: str, state: ScreenState) -> None:
"""Enregistre un état pour cette clé composite."""
with self._lock:
# Éviction simple : si plein, virer l'entrée la plus ancienne
if (
len(self._store) >= self.max_entries
and composite_key not in self._store
):
oldest_key = min(
self._store, key=lambda k: self._store[k].created_at
)
self._store.pop(oldest_key, None)
self._store[composite_key] = _CacheEntry(
state=state,
created_at=time.time(),
phash=phash,
)
def invalidate(self, composite_key: Optional[str] = None) -> None:
"""
Invalider une entrée ou tout le cache.
Args:
composite_key: Clé à invalider. Si None, vide tout le cache.
"""
with self._lock:
if composite_key is None:
self._store.clear()
else:
self._store.pop(composite_key, None)
self.invalidations += 1
def invalidate_if_changed(
self,
screenshot_path: str,
threshold: float = 0.3,
) -> bool:
"""
Invalider le cache si l'écran a suffisamment changé.
Compare le dhash du screenshot courant avec le phash (seul) de chaque
entrée du cache. La décision est volontairement indépendante du reste
de la clé composite : un changement visuel majeur rend toutes les
entrées obsolètes, quel que soit le contexte.
Args:
screenshot_path: Chemin du screenshot courant
threshold: Proportion de bits qui doivent différer (0.0-1.0).
0.3 = 30% (~19 bits sur 64) = changement significatif.
Returns:
True si le cache a été invalidé, False sinon.
"""
if not self._store:
return False
current_phash = compute_perceptual_hash(screenshot_path)
# Bits totaux : 64 pour un dhash 8x8 standard. On déduit via la
# longueur hexa du hash courant pour rester générique.
total_bits = len(current_phash) * 4
if total_bits == 0:
return False
threshold_bits = threshold * total_bits
with self._lock:
if not self._store:
return False
# Distance de Hamming minimale avec les phashes des entrées
# (on regarde entry.phash, pas la clé composite).
min_distance = None
for entry in self._store.values():
distance = _hamming_distance_hex(current_phash, entry.phash)
if min_distance is None or distance < min_distance:
min_distance = distance
if min_distance is not None and min_distance > threshold_bits:
size_before = len(self._store)
self._store.clear()
self.invalidations += 1
logger.debug(
f"[ScreenStateCache] invalidate_if_changed: "
f"distance={min_distance}/{total_bits} > "
f"threshold={threshold_bits:.1f}{size_before} entrées purgées"
)
return True
return False
# -------------------------------------------------------------------------
# API haut niveau (context-aware)
# -------------------------------------------------------------------------
def get_or_compute(
self,
screenshot_path: str,
compute_fn: Callable[[str], ScreenState],
*,
window_title: str = "",
app_name: str = "",
enable_ocr: bool = True,
enable_ui_detection: bool = True,
workflow_id: str = "",
force_refresh: bool = False,
) -> Tuple[ScreenState, bool, float]:
"""
Récupérer ou calculer le ScreenState pour un screenshot + contexte.
Clé de cache = composite(phash, window_title, app_name, enable_ocr,
enable_ui_detection, workflow_id). Deux contextes différents partageant
le même screenshot n'entrent PAS en collision.
Rétrocompatibilité : tous les kwargs de contexte ont une valeur par
défaut. Un caller legacy qui n'a pas encore été adapté partagera la
même entrée de cache qu'un autre caller legacy (comportement antérieur).
Args:
screenshot_path: Chemin du screenshot
compute_fn: Fonction qui construit un ScreenState si cache miss
window_title: Titre de la fenêtre active (contexte visuel)
app_name: Nom du process actif (contexte applicatif)
enable_ocr: Flag runtime — différencie états avec/sans OCR
enable_ui_detection: Flag runtime — différencie états avec/sans UI
workflow_id: ID du workflow — isolation inter-workflows
force_refresh: Ignorer le cache et recalculer
Returns:
Tuple (state, cache_hit, elapsed_ms)
"""
t0 = time.time()
phash = compute_perceptual_hash(screenshot_path)
composite_key = _make_cache_key(
phash=phash,
window_title=window_title,
app_name=app_name,
enable_ocr=enable_ocr,
enable_ui_detection=enable_ui_detection,
workflow_id=workflow_id,
)
if not force_refresh:
cached = self._get(composite_key)
if cached is not None:
self.hits += 1
elapsed_ms = (time.time() - t0) * 1000
logger.debug(
f"[ScreenStateCache] HIT key={composite_key[:24]}"
f"({elapsed_ms:.1f}ms)"
)
return cached, True, elapsed_ms
# Cache miss → calcul complet
self.misses += 1
state = compute_fn(screenshot_path)
self._set(composite_key, phash, state)
elapsed_ms = (time.time() - t0) * 1000
logger.debug(
f"[ScreenStateCache] MISS key={composite_key[:24]}"
f"({elapsed_ms:.1f}ms)"
)
return state, False, elapsed_ms
def stats(self) -> dict:
"""Retourne les métriques du cache."""
with self._lock:
total = self.hits + self.misses
return {
"hits": self.hits,
"misses": self.misses,
"invalidations": self.invalidations,
"hit_rate": self.hits / total if total > 0 else 0.0,
"size": len(self._store),
"max_entries": self.max_entries,
"ttl_seconds": self.ttl_seconds,
}
def __len__(self) -> int:
with self._lock:
return len(self._store)

View File

@@ -0,0 +1,449 @@
"""
Tests unitaires du ScreenStateCache.
Couvre :
- Hash perceptuel (déterministe, stable sur même image, différent sur autres)
- Cache hit / miss
- TTL (expiration)
- Invalidation explicite
- Éviction LRU
- Thread-safety basique
"""
from __future__ import annotations
import threading
import time
from datetime import datetime
from pathlib import Path
import pytest
from PIL import Image
from core.models.screen_state import (
ContextLevel,
EmbeddingRef,
PerceptionLevel,
RawLevel,
ScreenState,
WindowContext,
)
from core.pipeline.screen_state_cache import (
ScreenStateCache,
compute_perceptual_hash,
)
# -----------------------------------------------------------------------------
# Fixtures
# -----------------------------------------------------------------------------
def _make_screenshot(tmp_path: Path, color: tuple, name: str = "shot.png") -> str:
img = Image.new("RGB", (320, 240), color=color)
path = tmp_path / name
img.save(str(path))
return str(path)
def _make_state(session_id: str = "s1") -> ScreenState:
return ScreenState(
screen_state_id=f"state_{datetime.now().strftime('%H%M%S%f')}",
timestamp=datetime.now(),
session_id=session_id,
window=WindowContext(
app_name="app", window_title="Title", screen_resolution=[1920, 1080]
),
raw=RawLevel(screenshot_path="", capture_method="test", file_size_bytes=0),
perception=PerceptionLevel(
embedding=EmbeddingRef(provider="t", vector_id="v", dimensions=512),
detected_text=[],
text_detection_method="none",
confidence_avg=0.0,
),
context=ContextLevel(),
ui_elements=[],
)
# -----------------------------------------------------------------------------
# Hash perceptuel
# -----------------------------------------------------------------------------
class TestPerceptualHash:
def test_deterministic_for_same_image(self, tmp_path):
path = _make_screenshot(tmp_path, (255, 0, 0))
h1 = compute_perceptual_hash(path)
h2 = compute_perceptual_hash(path)
assert h1 == h2
assert len(h1) == 16 # 8*8 bits = 64 bits = 16 hex chars
def test_differs_across_images(self, tmp_path):
path_red = _make_screenshot(tmp_path, (255, 0, 0), "red.png")
path_blue = _make_screenshot(tmp_path, (0, 0, 255), "blue.png")
# Note : deux images unies ont le même dhash (toutes différences nulles)
# On doit utiliser des images avec un vrai gradient pour différer.
grad_red = Image.new("RGB", (320, 240))
for x in range(320):
for y in range(240):
grad_red.putpixel((x, y), (x % 256, 0, 0))
grad_path = tmp_path / "grad_red.png"
grad_red.save(str(grad_path))
h_red = compute_perceptual_hash(path_red)
h_grad = compute_perceptual_hash(str(grad_path))
assert h_red != h_grad
def test_robust_to_missing_file(self, tmp_path):
# Chemin inexistant → fallback mais pas de crash
h = compute_perceptual_hash(str(tmp_path / "does_not_exist.png"))
assert isinstance(h, str)
assert len(h) > 0
# -----------------------------------------------------------------------------
# Cache
# -----------------------------------------------------------------------------
class TestScreenStateCache:
def test_get_or_compute_cache_miss_then_hit(self, tmp_path):
cache = ScreenStateCache(ttl_seconds=10.0)
path = _make_screenshot(tmp_path, (100, 100, 100))
calls = []
def compute(p):
calls.append(p)
return _make_state()
s1, hit1, _ = cache.get_or_compute(path, compute)
s2, hit2, _ = cache.get_or_compute(path, compute)
assert hit1 is False
assert hit2 is True
assert len(calls) == 1
assert s1 is s2 # Même objet retourné
def test_ttl_expiration(self, tmp_path):
cache = ScreenStateCache(ttl_seconds=0.1)
path = _make_screenshot(tmp_path, (50, 50, 50))
def compute(_):
return _make_state()
cache.get_or_compute(path, compute)
time.sleep(0.15)
_, hit, _ = cache.get_or_compute(path, compute)
assert hit is False # Expiré
def test_force_refresh_bypasses_cache(self, tmp_path):
cache = ScreenStateCache(ttl_seconds=10.0)
path = _make_screenshot(tmp_path, (10, 10, 10))
cache.get_or_compute(path, lambda _: _make_state())
_, hit, _ = cache.get_or_compute(
path, lambda _: _make_state(), force_refresh=True
)
assert hit is False
def test_invalidate_all(self, tmp_path):
cache = ScreenStateCache(ttl_seconds=10.0)
path = _make_screenshot(tmp_path, (200, 200, 200))
cache.get_or_compute(path, lambda _: _make_state())
cache.invalidate()
_, hit, _ = cache.get_or_compute(path, lambda _: _make_state())
assert hit is False
def test_eviction_lru(self, tmp_path):
cache = ScreenStateCache(ttl_seconds=10.0, max_entries=2)
# Créer 3 images différentes (gradients différents pour hashes différents)
paths = []
for i, intensity in enumerate([30, 120, 220]):
img = Image.new("RGB", (320, 240))
for x in range(320):
for y in range(240):
img.putpixel((x, y), ((x + intensity) % 256, intensity, 0))
p = tmp_path / f"grad_{i}.png"
img.save(str(p))
paths.append(str(p))
def compute(_):
return _make_state()
cache.get_or_compute(paths[0], compute)
time.sleep(0.01)
cache.get_or_compute(paths[1], compute)
time.sleep(0.01)
cache.get_or_compute(paths[2], compute)
# Le 1er doit avoir été évincé
assert len(cache) == 2
def test_stats(self, tmp_path):
cache = ScreenStateCache(ttl_seconds=10.0)
path = _make_screenshot(tmp_path, (77, 77, 77))
cache.get_or_compute(path, lambda _: _make_state())
cache.get_or_compute(path, lambda _: _make_state())
stats = cache.stats()
assert stats["hits"] == 1
assert stats["misses"] == 1
assert stats["hit_rate"] == 0.5
def test_invalidate_if_changed_purges_on_big_change(self, tmp_path):
"""Un screenshot très différent doit invalider tout le cache."""
import random
cache = ScreenStateCache(ttl_seconds=10.0)
# Image 1 : gradient doux
img1 = Image.new("RGB", (320, 240))
for y in range(240):
for x in range(320):
img1.putpixel((x, y), (y, y, y))
p1 = tmp_path / "v.png"
img1.save(str(p1))
# Image 2 : bruit aléatoire (structure radicalement différente)
random.seed(42)
img2 = Image.new("RGB", (320, 240))
for y in range(240):
for x in range(320):
v = random.randint(0, 255)
img2.putpixel((x, y), (v, v, v))
p2 = tmp_path / "noise.png"
img2.save(str(p2))
cache.get_or_compute(str(p1), lambda _: _make_state())
assert len(cache) == 1
purged = cache.invalidate_if_changed(str(p2), threshold=0.3)
assert purged is True
assert len(cache) == 0
def test_invalidate_if_changed_keeps_cache_on_small_change(self, tmp_path):
"""Un screenshot très proche ne doit PAS invalider le cache."""
cache = ScreenStateCache(ttl_seconds=10.0)
# Même gradient avec un léger bruit
img1 = Image.new("RGB", (320, 240))
for y in range(240):
for x in range(320):
img1.putpixel((x, y), ((x + y) % 256, 0, 0))
p1 = tmp_path / "a.png"
img1.save(str(p1))
img2 = img1.copy()
# Bruit léger : changer seulement quelques pixels
for i in range(5):
img2.putpixel((i, 0), (255, 255, 255))
p2 = tmp_path / "b.png"
img2.save(str(p2))
cache.get_or_compute(str(p1), lambda _: _make_state())
purged = cache.invalidate_if_changed(str(p2), threshold=0.3)
assert purged is False
assert len(cache) == 1
def test_invalidate_if_changed_empty_cache_is_noop(self, tmp_path):
"""Sur cache vide, invalidate_if_changed ne doit rien faire."""
cache = ScreenStateCache(ttl_seconds=10.0)
p = _make_screenshot(tmp_path, (100, 100, 100))
purged = cache.invalidate_if_changed(p, threshold=0.3)
assert purged is False
def test_thread_safety(self, tmp_path):
"""Lecture/écriture concurrentes ne doivent pas crasher."""
cache = ScreenStateCache(ttl_seconds=10.0)
path = _make_screenshot(tmp_path, (64, 64, 64))
errors = []
def worker():
try:
for _ in range(20):
cache.get_or_compute(path, lambda _: _make_state())
except Exception as e:
errors.append(e)
threads = [threading.Thread(target=worker) for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
assert not errors
# -----------------------------------------------------------------------------
# Clé composite context-aware (Lot D)
# -----------------------------------------------------------------------------
class TestCacheContextAware:
"""Lot D — Le cache ne doit jamais hit entre deux contextes différents.
La clé composite combine 6 éléments : phash, window_title, app_name,
enable_ocr, enable_ui_detection, workflow_id. Toute variation sur une
de ces dimensions doit produire un cache miss, même si le screenshot
(donc le phash) est strictement identique.
"""
def test_same_image_different_window_miss(self, tmp_path):
cache = ScreenStateCache(ttl_seconds=10.0)
path = _make_screenshot(tmp_path, (60, 60, 60))
_, hit_a, _ = cache.get_or_compute(
path,
lambda _: _make_state(),
window_title="Chrome",
app_name="chrome.exe",
workflow_id="wf1",
)
_, hit_b, _ = cache.get_or_compute(
path,
lambda _: _make_state(),
window_title="Firefox", # Diffère
app_name="chrome.exe",
workflow_id="wf1",
)
assert hit_a is False
assert hit_b is False # Contexte fenêtre différent → miss
def test_same_image_different_app_miss(self, tmp_path):
cache = ScreenStateCache(ttl_seconds=10.0)
path = _make_screenshot(tmp_path, (90, 90, 90))
cache.get_or_compute(
path,
lambda _: _make_state(),
window_title="Doc.pdf",
app_name="acrobat.exe",
)
_, hit, _ = cache.get_or_compute(
path,
lambda _: _make_state(),
window_title="Doc.pdf",
app_name="sumatra.exe", # Diffère
)
assert hit is False # app_name différent → miss
def test_same_image_different_flags_miss(self, tmp_path):
cache = ScreenStateCache(ttl_seconds=10.0)
path = _make_screenshot(tmp_path, (120, 120, 120))
# Run 1 : OCR actif
cache.get_or_compute(
path,
lambda _: _make_state(),
enable_ocr=True,
enable_ui_detection=True,
)
# Run 2 : OCR désactivé → clé différente
_, hit_ocr_off, _ = cache.get_or_compute(
path,
lambda _: _make_state(),
enable_ocr=False,
enable_ui_detection=True,
)
# Run 3 : UI désactivé → encore une autre clé
_, hit_ui_off, _ = cache.get_or_compute(
path,
lambda _: _make_state(),
enable_ocr=True,
enable_ui_detection=False,
)
assert hit_ocr_off is False
assert hit_ui_off is False
def test_same_image_different_workflow_miss(self, tmp_path):
"""Isolation stricte inter-workflows : replay wf1 ≠ replay wf2."""
cache = ScreenStateCache(ttl_seconds=10.0)
path = _make_screenshot(tmp_path, (33, 77, 200))
cache.get_or_compute(
path, lambda _: _make_state(), workflow_id="wf_alpha"
)
_, hit, _ = cache.get_or_compute(
path, lambda _: _make_state(), workflow_id="wf_beta"
)
assert hit is False
def test_same_image_same_context_hit(self, tmp_path):
"""Tout identique → hit (comportement cache nominal)."""
cache = ScreenStateCache(ttl_seconds=10.0)
path = _make_screenshot(tmp_path, (42, 42, 42))
kwargs = dict(
window_title="Notepad",
app_name="notepad.exe",
enable_ocr=True,
enable_ui_detection=True,
workflow_id="wf_stable",
)
calls = []
def compute(p):
calls.append(p)
return _make_state()
_, hit1, _ = cache.get_or_compute(path, compute, **kwargs)
_, hit2, _ = cache.get_or_compute(path, compute, **kwargs)
assert hit1 is False
assert hit2 is True
assert len(calls) == 1
def test_default_context_is_stable(self, tmp_path):
"""Rétrocompat : deux callers sans kwargs de contexte partagent
la même entrée de cache (ancien comportement préservé)."""
cache = ScreenStateCache(ttl_seconds=10.0)
path = _make_screenshot(tmp_path, (11, 22, 33))
calls = []
def compute(p):
calls.append(p)
return _make_state()
# Deux appels sans kwargs → doivent partager la même clé
_, hit1, _ = cache.get_or_compute(path, compute)
_, hit2, _ = cache.get_or_compute(path, compute)
assert hit1 is False
assert hit2 is True
assert len(calls) == 1
def test_invalidate_if_changed_ignores_context(self, tmp_path):
"""invalidate_if_changed regarde le phash seul, pas la clé composite.
Un changement visuel majeur purge toutes les entrées, quel que soit
leur contexte (workflow, flags, fenêtre)."""
import random
cache = ScreenStateCache(ttl_seconds=10.0)
# Deux entrées dans des contextes différents MAIS pour la même image.
img1 = Image.new("RGB", (320, 240))
for y in range(240):
for x in range(320):
img1.putpixel((x, y), (y, y, y))
p1 = tmp_path / "orig.png"
img1.save(str(p1))
cache.get_or_compute(
str(p1), lambda _: _make_state(), workflow_id="wf1"
)
cache.get_or_compute(
str(p1), lambda _: _make_state(), workflow_id="wf2"
)
assert len(cache) == 2
# Nouveau screenshot radicalement différent → doit tout purger.
random.seed(42)
img2 = Image.new("RGB", (320, 240))
for y in range(240):
for x in range(320):
v = random.randint(0, 255)
img2.putpixel((x, y), (v, v, v))
p2 = tmp_path / "noise.png"
img2.save(str(p2))
purged = cache.invalidate_if_changed(str(p2), threshold=0.3)
assert purged is True
assert len(cache) == 0