- Frontend v4 accessible sur réseau local (192.168.1.40) - Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard) - Ollama GPU fonctionnel - Self-healing interactif - Dashboard confiance Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
486 lines
20 KiB
Python
486 lines
20 KiB
Python
"""
|
|
Tests de propriétés pour les optimisations de performance - Tâche 5
|
|
|
|
Valide les propriétés des optimisations implémentées :
|
|
- Propriété 6: Réutilisation de l'index spatial
|
|
- Cache des embeddings avec lazy loading
|
|
- Cache des modèles ML
|
|
- Cache des calculs redondants
|
|
|
|
Auteur : Dom, Alice Kiro - 20 décembre 2024
|
|
"""
|
|
|
|
import pytest
|
|
from hypothesis import given, strategies as st, assume, settings
|
|
import time
|
|
from unittest.mock import Mock, patch
|
|
from typing import List, Dict, Any
|
|
|
|
from core.execution.target_resolver import TargetResolver
|
|
from core.execution.computation_cache import ComputationCache, cached_bbox_center, cached_euclidean_distance
|
|
from core.embedding.fusion_engine import FusionEngine
|
|
from core.models.model_cache import ModelCache, ModelCacheConfig
|
|
from core.models.ui_element import UIElement
|
|
from core.models.screen_state import ScreenState
|
|
from core.models.workflow_graph import TargetSpec
|
|
|
|
|
|
# Stratégies Hypothesis pour les tests
|
|
@st.composite
|
|
def ui_element_strategy(draw):
|
|
"""Stratégie pour générer des UIElements"""
|
|
element_id = draw(st.text(min_size=1, max_size=20, alphabet=st.characters(whitelist_categories=('Lu', 'Ll', 'Nd'))))
|
|
x = draw(st.integers(min_value=0, max_value=1920))
|
|
y = draw(st.integers(min_value=0, max_value=1080))
|
|
w = draw(st.integers(min_value=10, max_value=300))
|
|
h = draw(st.integers(min_value=10, max_value=100))
|
|
|
|
return UIElement(
|
|
element_id=element_id,
|
|
type=draw(st.sampled_from(['button', 'input', 'label', 'checkbox'])),
|
|
role=draw(st.sampled_from(['button', 'textfield', 'label', 'checkbox'])),
|
|
bbox=(x, y, w, h),
|
|
center=(x + w//2, y + h//2),
|
|
label=draw(st.text(max_size=50)),
|
|
confidence=draw(st.floats(min_value=0.1, max_value=1.0))
|
|
)
|
|
|
|
|
|
@st.composite
|
|
def screen_state_strategy(draw):
|
|
"""Stratégie pour générer des ScreenStates"""
|
|
screen_state_id = draw(st.text(min_size=5, max_size=20))
|
|
|
|
# Mock window avec screen_resolution
|
|
mock_window = Mock()
|
|
mock_window.screen_resolution = (1920, 1080)
|
|
|
|
screen_state = Mock(spec=ScreenState)
|
|
screen_state.screen_state_id = screen_state_id
|
|
screen_state.window = mock_window
|
|
|
|
return screen_state
|
|
|
|
|
|
class TestPerformanceOptimizationProperties:
|
|
"""Tests de propriétés pour les optimisations de performance"""
|
|
|
|
@given(
|
|
ui_elements=st.lists(ui_element_strategy(), min_size=5, max_size=20),
|
|
screen_states=st.lists(screen_state_strategy(), min_size=2, max_size=5)
|
|
)
|
|
@settings(max_examples=50, deadline=5000)
|
|
def test_property_6_spatial_index_reuse(self, ui_elements: List[UIElement], screen_states: List[ScreenState]):
|
|
"""
|
|
Propriété 6: Réutilisation de l'index spatial
|
|
|
|
Pour toute résolution de TargetResolver avec la même signature d'écran,
|
|
l'index spatial doit être réutilisé (Exigence 5.1).
|
|
|
|
Invariants:
|
|
1. Même signature d'écran → même index spatial réutilisé
|
|
2. Cache hit doit être plus rapide que construction
|
|
3. Index doit être fonctionnellement équivalent
|
|
"""
|
|
resolver = TargetResolver(cache_size=100)
|
|
|
|
# Utiliser le même screen_state plusieurs fois pour forcer la réutilisation
|
|
screen_state = screen_states[0]
|
|
|
|
# Premier accès - construction de l'index
|
|
start_time = time.perf_counter()
|
|
index1 = resolver._get_spatial_index(screen_state, ui_elements)
|
|
first_access_time = time.perf_counter() - start_time
|
|
|
|
# Deuxième accès - doit réutiliser le cache
|
|
start_time = time.perf_counter()
|
|
index2 = resolver._get_spatial_index(screen_state, ui_elements)
|
|
second_access_time = time.perf_counter() - start_time
|
|
|
|
# Propriété 1: Même objet réutilisé
|
|
assert index1 is index2, "L'index spatial doit être réutilisé pour la même signature d'écran"
|
|
|
|
# Propriété 2: Cache hit plus rapide (au moins 2x plus rapide)
|
|
if first_access_time > 0.001: # Seulement si le premier accès est mesurable
|
|
assert second_access_time < first_access_time / 2, \
|
|
f"Cache hit ({second_access_time:.4f}s) doit être plus rapide que construction ({first_access_time:.4f}s)"
|
|
|
|
# Propriété 3: Fonctionnalité équivalente
|
|
# Tester quelques requêtes pour vérifier que l'index fonctionne
|
|
if ui_elements:
|
|
test_element = ui_elements[0]
|
|
x, y = test_element.bbox[0] + 5, test_element.bbox[1] + 5
|
|
|
|
results1 = index1.query_point(x, y)
|
|
results2 = index2.query_point(x, y)
|
|
|
|
assert results1 == results2, "Les résultats de requête doivent être identiques"
|
|
|
|
# Vérifier les stats du cache
|
|
stats = resolver.get_stats()
|
|
assert 'spatial_index_cache' in stats
|
|
assert stats['spatial_index_cache']['size'] >= 1
|
|
|
|
@given(
|
|
embedding_paths=st.dictionaries(
|
|
st.sampled_from(['image', 'text', 'ui']),
|
|
st.text(min_size=5, max_size=30),
|
|
min_size=1, max_size=3
|
|
)
|
|
)
|
|
@settings(max_examples=30, deadline=3000)
|
|
def test_embedding_lazy_loading_cache(self, embedding_paths: Dict[str, str]):
|
|
"""
|
|
Test du lazy loading des embeddings avec cache.
|
|
|
|
Propriétés:
|
|
1. Premier accès charge depuis le disque
|
|
2. Accès suivants utilisent le cache
|
|
3. Cache WeakValueDictionary permet garbage collection
|
|
"""
|
|
fusion_engine = FusionEngine()
|
|
|
|
# Mock des embeddings sur disque
|
|
import numpy as np
|
|
mock_embedding = np.random.rand(512).astype(np.float32)
|
|
|
|
with patch('numpy.load', return_value=mock_embedding), \
|
|
patch('pathlib.Path.exists', return_value=True):
|
|
|
|
# Premier accès pour chaque embedding
|
|
first_access_times = {}
|
|
for modality, path in embedding_paths.items():
|
|
start_time = time.perf_counter()
|
|
result1 = fusion_engine.load_embedding_lazy(path)
|
|
first_access_times[path] = time.perf_counter() - start_time
|
|
|
|
assert result1 is not None, f"Embedding doit être chargé pour {modality}"
|
|
assert result1.shape == (512,), "Embedding doit avoir la bonne forme"
|
|
|
|
# Deuxième accès - doit utiliser le cache
|
|
for modality, path in embedding_paths.items():
|
|
start_time = time.perf_counter()
|
|
result2 = fusion_engine.load_embedding_lazy(path)
|
|
second_access_time = time.perf_counter() - start_time
|
|
|
|
assert result2 is not None, "Embedding en cache doit être accessible"
|
|
|
|
# Cache hit doit être plus rapide
|
|
if first_access_times[path] > 0.0001: # Si mesurable
|
|
assert second_access_time < first_access_times[path], \
|
|
"Cache hit doit être plus rapide que le chargement initial"
|
|
|
|
# Vérifier les stats du cache
|
|
stats = fusion_engine.get_cache_stats()
|
|
assert stats['hits'] >= len(embedding_paths), "Doit avoir des cache hits"
|
|
assert stats['loads'] >= len(embedding_paths), "Doit avoir des chargements"
|
|
assert stats['cache_size'] >= len(embedding_paths), "Cache doit contenir les embeddings"
|
|
|
|
@given(
|
|
model_keys=st.lists(st.text(min_size=3, max_size=20), min_size=2, max_size=5, unique=True),
|
|
model_types=st.lists(st.sampled_from(['pytorch', 'sklearn', 'custom']), min_size=2, max_size=5)
|
|
)
|
|
@settings(max_examples=20, deadline=3000)
|
|
def test_model_cache_properties(self, model_keys: List[str], model_types: List[str]):
|
|
"""
|
|
Test des propriétés du cache de modèles ML.
|
|
|
|
Propriétés:
|
|
1. Modèle chargé une seule fois pour la même clé
|
|
2. Cache respecte les limites de taille
|
|
3. LRU éviction fonctionne correctement
|
|
"""
|
|
config = ModelCacheConfig(max_models=3, max_memory_mb=100.0)
|
|
model_cache = ModelCache(config)
|
|
|
|
# Mock des modèles
|
|
mock_models = {}
|
|
load_counts = {}
|
|
|
|
def create_loader(key: str, model_type: str):
|
|
def loader():
|
|
load_counts[key] = load_counts.get(key, 0) + 1
|
|
# Simuler un modèle avec une taille
|
|
mock_model = Mock()
|
|
mock_model.__sizeof__ = Mock(return_value=10 * 1024 * 1024) # 10MB
|
|
mock_models[key] = mock_model
|
|
return mock_model
|
|
return loader
|
|
|
|
# Charger les modèles
|
|
loaded_models = {}
|
|
for i, key in enumerate(model_keys[:config.max_models]):
|
|
model_type = model_types[i % len(model_types)]
|
|
loader = create_loader(key, model_type)
|
|
|
|
# Premier accès
|
|
model1 = model_cache.get_model(key, loader, model_type)
|
|
loaded_models[key] = model1
|
|
|
|
# Deuxième accès - doit réutiliser
|
|
model2 = model_cache.get_model(key, loader, model_type)
|
|
|
|
# Propriété 1: Même modèle réutilisé
|
|
assert model1 is model2, f"Modèle {key} doit être réutilisé"
|
|
assert load_counts[key] == 1, f"Modèle {key} ne doit être chargé qu'une fois"
|
|
|
|
# Propriété 2: Limites respectées
|
|
stats = model_cache.get_stats()
|
|
assert stats['cache_size'] <= config.max_models, "Cache ne doit pas dépasser la limite"
|
|
|
|
# Test d'éviction LRU si on a assez de clés
|
|
if len(model_keys) > config.max_models:
|
|
# Charger un modèle supplémentaire pour déclencher l'éviction
|
|
extra_key = model_keys[config.max_models]
|
|
extra_type = model_types[0]
|
|
extra_loader = create_loader(extra_key, extra_type)
|
|
|
|
model_cache.get_model(extra_key, extra_loader, extra_type)
|
|
|
|
# Vérifier que la limite est toujours respectée
|
|
final_stats = model_cache.get_stats()
|
|
assert final_stats['cache_size'] <= config.max_models, \
|
|
"Cache doit respecter la limite après éviction"
|
|
|
|
model_cache.shutdown()
|
|
|
|
@given(
|
|
element_pairs=st.lists(
|
|
st.tuples(ui_element_strategy(), ui_element_strategy()),
|
|
min_size=3, max_size=10
|
|
)
|
|
)
|
|
@settings(max_examples=30, deadline=3000)
|
|
def test_computation_cache_properties(self, element_pairs: List[tuple]):
|
|
"""
|
|
Test des propriétés du cache de calculs redondants.
|
|
|
|
Propriétés:
|
|
1. Calculs identiques réutilisés
|
|
2. Cache symétrique pour distances
|
|
3. Performance améliorée sur calculs répétés
|
|
"""
|
|
computation_cache = ComputationCache(max_size=100)
|
|
|
|
# Test de la propriété de cache des distances
|
|
calculation_counts = {}
|
|
|
|
def create_distance_calculator(elem1_id: str, elem2_id: str):
|
|
def calculator():
|
|
key = f"{elem1_id}-{elem2_id}"
|
|
calculation_counts[key] = calculation_counts.get(key, 0) + 1
|
|
# Simuler un calcul coûteux
|
|
time.sleep(0.001) # 1ms de calcul simulé
|
|
return 100.0 # Distance fixe pour le test
|
|
return calculator
|
|
|
|
# Tester les calculs de distance
|
|
for elem1, elem2 in element_pairs[:5]: # Limiter pour la performance
|
|
calculator = create_distance_calculator(elem1.element_id, elem2.element_id)
|
|
|
|
# Premier calcul
|
|
start_time = time.perf_counter()
|
|
distance1 = computation_cache.get_distance(
|
|
elem1.element_id, elem2.element_id, calculator
|
|
)
|
|
first_time = time.perf_counter() - start_time
|
|
|
|
# Deuxième calcul - doit utiliser le cache
|
|
start_time = time.perf_counter()
|
|
distance2 = computation_cache.get_distance(
|
|
elem1.element_id, elem2.element_id, calculator
|
|
)
|
|
second_time = time.perf_counter() - start_time
|
|
|
|
# Propriété 1: Résultat identique
|
|
assert distance1 == distance2, "Distance doit être identique depuis le cache"
|
|
|
|
# Propriété 2: Cache hit plus rapide
|
|
assert second_time < first_time, "Cache hit doit être plus rapide"
|
|
|
|
# Propriété 3: Calcul effectué une seule fois
|
|
calc_key = f"{elem1.element_id}-{elem2.element_id}"
|
|
assert calculation_counts.get(calc_key, 0) == 1, \
|
|
"Calcul ne doit être effectué qu'une fois"
|
|
|
|
# Test de la symétrie des distances
|
|
if len(element_pairs) >= 2:
|
|
elem1, elem2 = element_pairs[0]
|
|
calculator = create_distance_calculator(elem1.element_id, elem2.element_id)
|
|
|
|
# Distance A->B
|
|
dist_ab = computation_cache.get_distance(
|
|
elem1.element_id, elem2.element_id, calculator
|
|
)
|
|
|
|
# Distance B->A (doit utiliser le même cache)
|
|
dist_ba = computation_cache.get_distance(
|
|
elem2.element_id, elem1.element_id, calculator
|
|
)
|
|
|
|
assert dist_ab == dist_ba, "Distance doit être symétrique"
|
|
|
|
# Vérifier les stats
|
|
stats = computation_cache.get_stats()
|
|
assert stats['hits'] > 0, "Doit avoir des cache hits"
|
|
assert stats['hit_rate_percent'] > 0, "Taux de hit doit être positif"
|
|
|
|
@given(
|
|
bbox_tuples=st.lists(
|
|
st.tuples(
|
|
st.integers(min_value=0, max_value=1000), # x
|
|
st.integers(min_value=0, max_value=1000), # y
|
|
st.integers(min_value=10, max_value=200), # w
|
|
st.integers(min_value=10, max_value=200) # h
|
|
),
|
|
min_size=5, max_size=15
|
|
)
|
|
)
|
|
@settings(max_examples=30, deadline=2000)
|
|
def test_lru_cache_functions_properties(self, bbox_tuples: List[tuple]):
|
|
"""
|
|
Test des propriétés des fonctions avec cache LRU.
|
|
|
|
Propriétés:
|
|
1. Résultats cohérents pour mêmes inputs
|
|
2. Cache améliore les performances
|
|
3. Fonctions mathématiquement correctes
|
|
"""
|
|
from core.execution.computation_cache import (
|
|
cached_bbox_center, cached_bbox_area, cached_bbox_iou, cached_euclidean_distance
|
|
)
|
|
|
|
# Test cached_bbox_center
|
|
for bbox in bbox_tuples[:10]: # Limiter pour performance
|
|
x, y, w, h = bbox
|
|
|
|
# Calculs multiples du même centre
|
|
center1 = cached_bbox_center(bbox)
|
|
center2 = cached_bbox_center(bbox)
|
|
|
|
# Propriété 1: Résultats identiques
|
|
assert center1 == center2, "Centre doit être identique pour même bbox"
|
|
|
|
# Propriété 2: Calcul mathématiquement correct
|
|
expected_center = (float(x + w / 2), float(y + h / 2))
|
|
assert center1 == expected_center, "Centre doit être calculé correctement"
|
|
|
|
# Test cached_bbox_area
|
|
for bbox in bbox_tuples[:10]:
|
|
x, y, w, h = bbox
|
|
|
|
area1 = cached_bbox_area(bbox)
|
|
area2 = cached_bbox_area(bbox)
|
|
|
|
assert area1 == area2, "Aire doit être identique pour même bbox"
|
|
assert area1 == float(w * h), "Aire doit être calculée correctement"
|
|
|
|
# Test cached_bbox_iou avec paires
|
|
for i in range(min(5, len(bbox_tuples) - 1)):
|
|
bbox1 = bbox_tuples[i]
|
|
bbox2 = bbox_tuples[i + 1]
|
|
|
|
iou1 = cached_bbox_iou(bbox1, bbox2)
|
|
iou2 = cached_bbox_iou(bbox1, bbox2)
|
|
|
|
# Propriété 1: Résultats identiques
|
|
assert iou1 == iou2, "IoU doit être identique pour mêmes bboxes"
|
|
|
|
# Propriété 2: IoU symétrique
|
|
iou_reverse = cached_bbox_iou(bbox2, bbox1)
|
|
assert abs(iou1 - iou_reverse) < 1e-10, "IoU doit être symétrique"
|
|
|
|
# Propriété 3: IoU dans [0, 1]
|
|
assert 0.0 <= iou1 <= 1.0, "IoU doit être dans [0, 1]"
|
|
|
|
# Test cached_euclidean_distance
|
|
points = [(float(bbox[0]), float(bbox[1])) for bbox in bbox_tuples[:10]]
|
|
for i in range(min(5, len(points) - 1)):
|
|
point1 = points[i]
|
|
point2 = points[i + 1]
|
|
|
|
dist1 = cached_euclidean_distance(point1, point2)
|
|
dist2 = cached_euclidean_distance(point1, point2)
|
|
|
|
# Propriété 1: Résultats identiques
|
|
assert dist1 == dist2, "Distance doit être identique pour mêmes points"
|
|
|
|
# Propriété 2: Distance symétrique
|
|
dist_reverse = cached_euclidean_distance(point2, point1)
|
|
assert abs(dist1 - dist_reverse) < 1e-10, "Distance doit être symétrique"
|
|
|
|
# Propriété 3: Distance positive
|
|
assert dist1 >= 0.0, "Distance doit être positive"
|
|
|
|
def test_integrated_performance_improvement(self):
|
|
"""
|
|
Test d'intégration vérifiant l'amélioration globale des performances.
|
|
|
|
Vérifie que toutes les optimisations ensemble améliorent les performances
|
|
de résolution de cibles.
|
|
"""
|
|
# Créer des données de test
|
|
ui_elements = [
|
|
UIElement(
|
|
element_id=f"elem_{i}",
|
|
type="button",
|
|
role="button",
|
|
bbox=(i * 100, i * 50, 80, 30),
|
|
center=(i * 100 + 40, i * 50 + 15),
|
|
label=f"Button {i}",
|
|
confidence=0.9
|
|
)
|
|
for i in range(10)
|
|
]
|
|
|
|
screen_state = Mock(spec=ScreenState)
|
|
screen_state.screen_state_id = "test_screen"
|
|
screen_state.window = Mock()
|
|
screen_state.window.screen_resolution = (1920, 1080)
|
|
|
|
# Mock pour obtenir les éléments UI
|
|
with patch.object(TargetResolver, '_get_ui_elements', return_value=ui_elements):
|
|
resolver = TargetResolver(cache_size=50)
|
|
|
|
target_spec = TargetSpec(by_role="button", by_text="Button 5")
|
|
|
|
# Première résolution - construction des caches
|
|
start_time = time.perf_counter()
|
|
result1 = resolver.resolve_target(target_spec, screen_state)
|
|
first_resolution_time = time.perf_counter() - start_time
|
|
|
|
# Deuxième résolution - doit utiliser les caches
|
|
start_time = time.perf_counter()
|
|
result2 = resolver.resolve_target(target_spec, screen_state)
|
|
second_resolution_time = time.perf_counter() - start_time
|
|
|
|
# Vérifier que les résultats sont cohérents
|
|
if result1 and result2:
|
|
assert result1.element.element_id == result2.element.element_id, \
|
|
"Résultats doivent être cohérents"
|
|
|
|
# Vérifier l'amélioration des performances
|
|
if first_resolution_time > 0.001: # Si mesurable
|
|
improvement_ratio = first_resolution_time / max(second_resolution_time, 0.0001)
|
|
assert improvement_ratio > 1.5, \
|
|
f"Deuxième résolution doit être au moins 50% plus rapide (ratio: {improvement_ratio:.2f})"
|
|
|
|
# Vérifier les stats des caches
|
|
stats = resolver.get_stats()
|
|
|
|
# Cache de résolution
|
|
assert stats.get('cache_hits', 0) >= 1, "Doit avoir des cache hits de résolution"
|
|
|
|
# Cache d'index spatial
|
|
spatial_cache = stats.get('spatial_index_cache', {})
|
|
assert spatial_cache.get('size', 0) >= 1, "Doit avoir un index spatial en cache"
|
|
|
|
# Cache de calculs
|
|
comp_cache = stats.get('computation_cache', {})
|
|
if comp_cache:
|
|
assert comp_cache.get('cache_sizes', {}).get('total', 0) >= 0, \
|
|
"Cache de calculs doit être initialisé"
|
|
|
|
|
|
if __name__ == "__main__":
|
|
pytest.main([__file__, "-v"]) |