Avant : clé = phash seul
-> deux contextes différents avec même screenshot partageaient
la même entrée cache -> collisions silencieuses.
Après : clé composite {phash}|{md5(ctx)[:16]} avec ctx =
- window_title
- app_name
- enable_ocr
- enable_ui_detection
- workflow_id (isolation inter-workflows)
get_or_compute() kwargs-only. TTL 2s et éviction LRU inchangés.
invalidate_if_changed() continue de comparer uniquement les phash.
ExecutionLoop propage tout le contexte au cache.
8 nouveaux tests prouvant :
- même image + window différent = miss
- même image + app différent = miss
- même image + flags différents = miss
- même image + workflow_id différent = miss
- même image + même contexte = hit
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
450 lines
15 KiB
Python
450 lines
15 KiB
Python
"""
|
|
Tests unitaires du ScreenStateCache.
|
|
|
|
Couvre :
|
|
- Hash perceptuel (déterministe, stable sur même image, différent sur autres)
|
|
- Cache hit / miss
|
|
- TTL (expiration)
|
|
- Invalidation explicite
|
|
- Éviction LRU
|
|
- Thread-safety basique
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from PIL import Image
|
|
|
|
from core.models.screen_state import (
|
|
ContextLevel,
|
|
EmbeddingRef,
|
|
PerceptionLevel,
|
|
RawLevel,
|
|
ScreenState,
|
|
WindowContext,
|
|
)
|
|
from core.pipeline.screen_state_cache import (
|
|
ScreenStateCache,
|
|
compute_perceptual_hash,
|
|
)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Fixtures
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
def _make_screenshot(tmp_path: Path, color: tuple, name: str = "shot.png") -> str:
|
|
img = Image.new("RGB", (320, 240), color=color)
|
|
path = tmp_path / name
|
|
img.save(str(path))
|
|
return str(path)
|
|
|
|
|
|
def _make_state(session_id: str = "s1") -> ScreenState:
|
|
return ScreenState(
|
|
screen_state_id=f"state_{datetime.now().strftime('%H%M%S%f')}",
|
|
timestamp=datetime.now(),
|
|
session_id=session_id,
|
|
window=WindowContext(
|
|
app_name="app", window_title="Title", screen_resolution=[1920, 1080]
|
|
),
|
|
raw=RawLevel(screenshot_path="", capture_method="test", file_size_bytes=0),
|
|
perception=PerceptionLevel(
|
|
embedding=EmbeddingRef(provider="t", vector_id="v", dimensions=512),
|
|
detected_text=[],
|
|
text_detection_method="none",
|
|
confidence_avg=0.0,
|
|
),
|
|
context=ContextLevel(),
|
|
ui_elements=[],
|
|
)
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Hash perceptuel
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
class TestPerceptualHash:
|
|
|
|
def test_deterministic_for_same_image(self, tmp_path):
|
|
path = _make_screenshot(tmp_path, (255, 0, 0))
|
|
h1 = compute_perceptual_hash(path)
|
|
h2 = compute_perceptual_hash(path)
|
|
assert h1 == h2
|
|
assert len(h1) == 16 # 8*8 bits = 64 bits = 16 hex chars
|
|
|
|
def test_differs_across_images(self, tmp_path):
|
|
path_red = _make_screenshot(tmp_path, (255, 0, 0), "red.png")
|
|
path_blue = _make_screenshot(tmp_path, (0, 0, 255), "blue.png")
|
|
# Note : deux images unies ont le même dhash (toutes différences nulles)
|
|
# On doit utiliser des images avec un vrai gradient pour différer.
|
|
grad_red = Image.new("RGB", (320, 240))
|
|
for x in range(320):
|
|
for y in range(240):
|
|
grad_red.putpixel((x, y), (x % 256, 0, 0))
|
|
grad_path = tmp_path / "grad_red.png"
|
|
grad_red.save(str(grad_path))
|
|
|
|
h_red = compute_perceptual_hash(path_red)
|
|
h_grad = compute_perceptual_hash(str(grad_path))
|
|
assert h_red != h_grad
|
|
|
|
def test_robust_to_missing_file(self, tmp_path):
|
|
# Chemin inexistant → fallback mais pas de crash
|
|
h = compute_perceptual_hash(str(tmp_path / "does_not_exist.png"))
|
|
assert isinstance(h, str)
|
|
assert len(h) > 0
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Cache
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
class TestScreenStateCache:
|
|
|
|
def test_get_or_compute_cache_miss_then_hit(self, tmp_path):
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
path = _make_screenshot(tmp_path, (100, 100, 100))
|
|
|
|
calls = []
|
|
|
|
def compute(p):
|
|
calls.append(p)
|
|
return _make_state()
|
|
|
|
s1, hit1, _ = cache.get_or_compute(path, compute)
|
|
s2, hit2, _ = cache.get_or_compute(path, compute)
|
|
|
|
assert hit1 is False
|
|
assert hit2 is True
|
|
assert len(calls) == 1
|
|
assert s1 is s2 # Même objet retourné
|
|
|
|
def test_ttl_expiration(self, tmp_path):
|
|
cache = ScreenStateCache(ttl_seconds=0.1)
|
|
path = _make_screenshot(tmp_path, (50, 50, 50))
|
|
|
|
def compute(_):
|
|
return _make_state()
|
|
|
|
cache.get_or_compute(path, compute)
|
|
time.sleep(0.15)
|
|
_, hit, _ = cache.get_or_compute(path, compute)
|
|
assert hit is False # Expiré
|
|
|
|
def test_force_refresh_bypasses_cache(self, tmp_path):
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
path = _make_screenshot(tmp_path, (10, 10, 10))
|
|
cache.get_or_compute(path, lambda _: _make_state())
|
|
_, hit, _ = cache.get_or_compute(
|
|
path, lambda _: _make_state(), force_refresh=True
|
|
)
|
|
assert hit is False
|
|
|
|
def test_invalidate_all(self, tmp_path):
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
path = _make_screenshot(tmp_path, (200, 200, 200))
|
|
cache.get_or_compute(path, lambda _: _make_state())
|
|
cache.invalidate()
|
|
_, hit, _ = cache.get_or_compute(path, lambda _: _make_state())
|
|
assert hit is False
|
|
|
|
def test_eviction_lru(self, tmp_path):
|
|
cache = ScreenStateCache(ttl_seconds=10.0, max_entries=2)
|
|
# Créer 3 images différentes (gradients différents pour hashes différents)
|
|
paths = []
|
|
for i, intensity in enumerate([30, 120, 220]):
|
|
img = Image.new("RGB", (320, 240))
|
|
for x in range(320):
|
|
for y in range(240):
|
|
img.putpixel((x, y), ((x + intensity) % 256, intensity, 0))
|
|
p = tmp_path / f"grad_{i}.png"
|
|
img.save(str(p))
|
|
paths.append(str(p))
|
|
|
|
def compute(_):
|
|
return _make_state()
|
|
|
|
cache.get_or_compute(paths[0], compute)
|
|
time.sleep(0.01)
|
|
cache.get_or_compute(paths[1], compute)
|
|
time.sleep(0.01)
|
|
cache.get_or_compute(paths[2], compute)
|
|
# Le 1er doit avoir été évincé
|
|
assert len(cache) == 2
|
|
|
|
def test_stats(self, tmp_path):
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
path = _make_screenshot(tmp_path, (77, 77, 77))
|
|
cache.get_or_compute(path, lambda _: _make_state())
|
|
cache.get_or_compute(path, lambda _: _make_state())
|
|
stats = cache.stats()
|
|
assert stats["hits"] == 1
|
|
assert stats["misses"] == 1
|
|
assert stats["hit_rate"] == 0.5
|
|
|
|
def test_invalidate_if_changed_purges_on_big_change(self, tmp_path):
|
|
"""Un screenshot très différent doit invalider tout le cache."""
|
|
import random
|
|
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
# Image 1 : gradient doux
|
|
img1 = Image.new("RGB", (320, 240))
|
|
for y in range(240):
|
|
for x in range(320):
|
|
img1.putpixel((x, y), (y, y, y))
|
|
p1 = tmp_path / "v.png"
|
|
img1.save(str(p1))
|
|
|
|
# Image 2 : bruit aléatoire (structure radicalement différente)
|
|
random.seed(42)
|
|
img2 = Image.new("RGB", (320, 240))
|
|
for y in range(240):
|
|
for x in range(320):
|
|
v = random.randint(0, 255)
|
|
img2.putpixel((x, y), (v, v, v))
|
|
p2 = tmp_path / "noise.png"
|
|
img2.save(str(p2))
|
|
|
|
cache.get_or_compute(str(p1), lambda _: _make_state())
|
|
assert len(cache) == 1
|
|
|
|
purged = cache.invalidate_if_changed(str(p2), threshold=0.3)
|
|
assert purged is True
|
|
assert len(cache) == 0
|
|
|
|
def test_invalidate_if_changed_keeps_cache_on_small_change(self, tmp_path):
|
|
"""Un screenshot très proche ne doit PAS invalider le cache."""
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
# Même gradient avec un léger bruit
|
|
img1 = Image.new("RGB", (320, 240))
|
|
for y in range(240):
|
|
for x in range(320):
|
|
img1.putpixel((x, y), ((x + y) % 256, 0, 0))
|
|
p1 = tmp_path / "a.png"
|
|
img1.save(str(p1))
|
|
|
|
img2 = img1.copy()
|
|
# Bruit léger : changer seulement quelques pixels
|
|
for i in range(5):
|
|
img2.putpixel((i, 0), (255, 255, 255))
|
|
p2 = tmp_path / "b.png"
|
|
img2.save(str(p2))
|
|
|
|
cache.get_or_compute(str(p1), lambda _: _make_state())
|
|
purged = cache.invalidate_if_changed(str(p2), threshold=0.3)
|
|
assert purged is False
|
|
assert len(cache) == 1
|
|
|
|
def test_invalidate_if_changed_empty_cache_is_noop(self, tmp_path):
|
|
"""Sur cache vide, invalidate_if_changed ne doit rien faire."""
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
p = _make_screenshot(tmp_path, (100, 100, 100))
|
|
purged = cache.invalidate_if_changed(p, threshold=0.3)
|
|
assert purged is False
|
|
|
|
def test_thread_safety(self, tmp_path):
|
|
"""Lecture/écriture concurrentes ne doivent pas crasher."""
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
path = _make_screenshot(tmp_path, (64, 64, 64))
|
|
errors = []
|
|
|
|
def worker():
|
|
try:
|
|
for _ in range(20):
|
|
cache.get_or_compute(path, lambda _: _make_state())
|
|
except Exception as e:
|
|
errors.append(e)
|
|
|
|
threads = [threading.Thread(target=worker) for _ in range(5)]
|
|
for t in threads:
|
|
t.start()
|
|
for t in threads:
|
|
t.join()
|
|
|
|
assert not errors
|
|
|
|
|
|
# -----------------------------------------------------------------------------
|
|
# Clé composite context-aware (Lot D)
|
|
# -----------------------------------------------------------------------------
|
|
|
|
|
|
class TestCacheContextAware:
|
|
"""Lot D — Le cache ne doit jamais hit entre deux contextes différents.
|
|
|
|
La clé composite combine 6 éléments : phash, window_title, app_name,
|
|
enable_ocr, enable_ui_detection, workflow_id. Toute variation sur une
|
|
de ces dimensions doit produire un cache miss, même si le screenshot
|
|
(donc le phash) est strictement identique.
|
|
"""
|
|
|
|
def test_same_image_different_window_miss(self, tmp_path):
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
path = _make_screenshot(tmp_path, (60, 60, 60))
|
|
|
|
_, hit_a, _ = cache.get_or_compute(
|
|
path,
|
|
lambda _: _make_state(),
|
|
window_title="Chrome",
|
|
app_name="chrome.exe",
|
|
workflow_id="wf1",
|
|
)
|
|
_, hit_b, _ = cache.get_or_compute(
|
|
path,
|
|
lambda _: _make_state(),
|
|
window_title="Firefox", # Diffère
|
|
app_name="chrome.exe",
|
|
workflow_id="wf1",
|
|
)
|
|
assert hit_a is False
|
|
assert hit_b is False # Contexte fenêtre différent → miss
|
|
|
|
def test_same_image_different_app_miss(self, tmp_path):
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
path = _make_screenshot(tmp_path, (90, 90, 90))
|
|
|
|
cache.get_or_compute(
|
|
path,
|
|
lambda _: _make_state(),
|
|
window_title="Doc.pdf",
|
|
app_name="acrobat.exe",
|
|
)
|
|
_, hit, _ = cache.get_or_compute(
|
|
path,
|
|
lambda _: _make_state(),
|
|
window_title="Doc.pdf",
|
|
app_name="sumatra.exe", # Diffère
|
|
)
|
|
assert hit is False # app_name différent → miss
|
|
|
|
def test_same_image_different_flags_miss(self, tmp_path):
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
path = _make_screenshot(tmp_path, (120, 120, 120))
|
|
|
|
# Run 1 : OCR actif
|
|
cache.get_or_compute(
|
|
path,
|
|
lambda _: _make_state(),
|
|
enable_ocr=True,
|
|
enable_ui_detection=True,
|
|
)
|
|
# Run 2 : OCR désactivé → clé différente
|
|
_, hit_ocr_off, _ = cache.get_or_compute(
|
|
path,
|
|
lambda _: _make_state(),
|
|
enable_ocr=False,
|
|
enable_ui_detection=True,
|
|
)
|
|
# Run 3 : UI désactivé → encore une autre clé
|
|
_, hit_ui_off, _ = cache.get_or_compute(
|
|
path,
|
|
lambda _: _make_state(),
|
|
enable_ocr=True,
|
|
enable_ui_detection=False,
|
|
)
|
|
assert hit_ocr_off is False
|
|
assert hit_ui_off is False
|
|
|
|
def test_same_image_different_workflow_miss(self, tmp_path):
|
|
"""Isolation stricte inter-workflows : replay wf1 ≠ replay wf2."""
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
path = _make_screenshot(tmp_path, (33, 77, 200))
|
|
|
|
cache.get_or_compute(
|
|
path, lambda _: _make_state(), workflow_id="wf_alpha"
|
|
)
|
|
_, hit, _ = cache.get_or_compute(
|
|
path, lambda _: _make_state(), workflow_id="wf_beta"
|
|
)
|
|
assert hit is False
|
|
|
|
def test_same_image_same_context_hit(self, tmp_path):
|
|
"""Tout identique → hit (comportement cache nominal)."""
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
path = _make_screenshot(tmp_path, (42, 42, 42))
|
|
|
|
kwargs = dict(
|
|
window_title="Notepad",
|
|
app_name="notepad.exe",
|
|
enable_ocr=True,
|
|
enable_ui_detection=True,
|
|
workflow_id="wf_stable",
|
|
)
|
|
calls = []
|
|
|
|
def compute(p):
|
|
calls.append(p)
|
|
return _make_state()
|
|
|
|
_, hit1, _ = cache.get_or_compute(path, compute, **kwargs)
|
|
_, hit2, _ = cache.get_or_compute(path, compute, **kwargs)
|
|
assert hit1 is False
|
|
assert hit2 is True
|
|
assert len(calls) == 1
|
|
|
|
def test_default_context_is_stable(self, tmp_path):
|
|
"""Rétrocompat : deux callers sans kwargs de contexte partagent
|
|
la même entrée de cache (ancien comportement préservé)."""
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
path = _make_screenshot(tmp_path, (11, 22, 33))
|
|
|
|
calls = []
|
|
|
|
def compute(p):
|
|
calls.append(p)
|
|
return _make_state()
|
|
|
|
# Deux appels sans kwargs → doivent partager la même clé
|
|
_, hit1, _ = cache.get_or_compute(path, compute)
|
|
_, hit2, _ = cache.get_or_compute(path, compute)
|
|
assert hit1 is False
|
|
assert hit2 is True
|
|
assert len(calls) == 1
|
|
|
|
def test_invalidate_if_changed_ignores_context(self, tmp_path):
|
|
"""invalidate_if_changed regarde le phash seul, pas la clé composite.
|
|
Un changement visuel majeur purge toutes les entrées, quel que soit
|
|
leur contexte (workflow, flags, fenêtre)."""
|
|
import random
|
|
|
|
cache = ScreenStateCache(ttl_seconds=10.0)
|
|
|
|
# Deux entrées dans des contextes différents MAIS pour la même image.
|
|
img1 = Image.new("RGB", (320, 240))
|
|
for y in range(240):
|
|
for x in range(320):
|
|
img1.putpixel((x, y), (y, y, y))
|
|
p1 = tmp_path / "orig.png"
|
|
img1.save(str(p1))
|
|
|
|
cache.get_or_compute(
|
|
str(p1), lambda _: _make_state(), workflow_id="wf1"
|
|
)
|
|
cache.get_or_compute(
|
|
str(p1), lambda _: _make_state(), workflow_id="wf2"
|
|
)
|
|
assert len(cache) == 2
|
|
|
|
# Nouveau screenshot radicalement différent → doit tout purger.
|
|
random.seed(42)
|
|
img2 = Image.new("RGB", (320, 240))
|
|
for y in range(240):
|
|
for x in range(320):
|
|
v = random.randint(0, 255)
|
|
img2.putpixel((x, y), (v, v, v))
|
|
p2 = tmp_path / "noise.png"
|
|
img2.save(str(p2))
|
|
|
|
purged = cache.invalidate_if_changed(str(p2), threshold=0.3)
|
|
assert purged is True
|
|
assert len(cache) == 0
|