Files
rpa_vision_v3/tests/property/test_performance_optimization_properties.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

486 lines
20 KiB
Python

"""
Tests de propriétés pour les optimisations de performance - Tâche 5
Valide les propriétés des optimisations implémentées :
- Propriété 6: Réutilisation de l'index spatial
- Cache des embeddings avec lazy loading
- Cache des modèles ML
- Cache des calculs redondants
Auteur : Dom, Alice Kiro - 20 décembre 2024
"""
import pytest
from hypothesis import given, strategies as st, assume, settings
import time
from unittest.mock import Mock, patch
from typing import List, Dict, Any
from core.execution.target_resolver import TargetResolver
from core.execution.computation_cache import ComputationCache, cached_bbox_center, cached_euclidean_distance
from core.embedding.fusion_engine import FusionEngine
from core.models.model_cache import ModelCache, ModelCacheConfig
from core.models.ui_element import UIElement
from core.models.screen_state import ScreenState
from core.models.workflow_graph import TargetSpec
# Stratégies Hypothesis pour les tests
@st.composite
def ui_element_strategy(draw):
"""Stratégie pour générer des UIElements"""
element_id = draw(st.text(min_size=1, max_size=20, alphabet=st.characters(whitelist_categories=('Lu', 'Ll', 'Nd'))))
x = draw(st.integers(min_value=0, max_value=1920))
y = draw(st.integers(min_value=0, max_value=1080))
w = draw(st.integers(min_value=10, max_value=300))
h = draw(st.integers(min_value=10, max_value=100))
return UIElement(
element_id=element_id,
type=draw(st.sampled_from(['button', 'input', 'label', 'checkbox'])),
role=draw(st.sampled_from(['button', 'textfield', 'label', 'checkbox'])),
bbox=(x, y, w, h),
center=(x + w//2, y + h//2),
label=draw(st.text(max_size=50)),
confidence=draw(st.floats(min_value=0.1, max_value=1.0))
)
@st.composite
def screen_state_strategy(draw):
"""Stratégie pour générer des ScreenStates"""
screen_state_id = draw(st.text(min_size=5, max_size=20))
# Mock window avec screen_resolution
mock_window = Mock()
mock_window.screen_resolution = (1920, 1080)
screen_state = Mock(spec=ScreenState)
screen_state.screen_state_id = screen_state_id
screen_state.window = mock_window
return screen_state
class TestPerformanceOptimizationProperties:
"""Tests de propriétés pour les optimisations de performance"""
@given(
ui_elements=st.lists(ui_element_strategy(), min_size=5, max_size=20),
screen_states=st.lists(screen_state_strategy(), min_size=2, max_size=5)
)
@settings(max_examples=50, deadline=5000)
def test_property_6_spatial_index_reuse(self, ui_elements: List[UIElement], screen_states: List[ScreenState]):
"""
Propriété 6: Réutilisation de l'index spatial
Pour toute résolution de TargetResolver avec la même signature d'écran,
l'index spatial doit être réutilisé (Exigence 5.1).
Invariants:
1. Même signature d'écran → même index spatial réutilisé
2. Cache hit doit être plus rapide que construction
3. Index doit être fonctionnellement équivalent
"""
resolver = TargetResolver(cache_size=100)
# Utiliser le même screen_state plusieurs fois pour forcer la réutilisation
screen_state = screen_states[0]
# Premier accès - construction de l'index
start_time = time.perf_counter()
index1 = resolver._get_spatial_index(screen_state, ui_elements)
first_access_time = time.perf_counter() - start_time
# Deuxième accès - doit réutiliser le cache
start_time = time.perf_counter()
index2 = resolver._get_spatial_index(screen_state, ui_elements)
second_access_time = time.perf_counter() - start_time
# Propriété 1: Même objet réutilisé
assert index1 is index2, "L'index spatial doit être réutilisé pour la même signature d'écran"
# Propriété 2: Cache hit plus rapide (au moins 2x plus rapide)
if first_access_time > 0.001: # Seulement si le premier accès est mesurable
assert second_access_time < first_access_time / 2, \
f"Cache hit ({second_access_time:.4f}s) doit être plus rapide que construction ({first_access_time:.4f}s)"
# Propriété 3: Fonctionnalité équivalente
# Tester quelques requêtes pour vérifier que l'index fonctionne
if ui_elements:
test_element = ui_elements[0]
x, y = test_element.bbox[0] + 5, test_element.bbox[1] + 5
results1 = index1.query_point(x, y)
results2 = index2.query_point(x, y)
assert results1 == results2, "Les résultats de requête doivent être identiques"
# Vérifier les stats du cache
stats = resolver.get_stats()
assert 'spatial_index_cache' in stats
assert stats['spatial_index_cache']['size'] >= 1
@given(
embedding_paths=st.dictionaries(
st.sampled_from(['image', 'text', 'ui']),
st.text(min_size=5, max_size=30),
min_size=1, max_size=3
)
)
@settings(max_examples=30, deadline=3000)
def test_embedding_lazy_loading_cache(self, embedding_paths: Dict[str, str]):
"""
Test du lazy loading des embeddings avec cache.
Propriétés:
1. Premier accès charge depuis le disque
2. Accès suivants utilisent le cache
3. Cache WeakValueDictionary permet garbage collection
"""
fusion_engine = FusionEngine()
# Mock des embeddings sur disque
import numpy as np
mock_embedding = np.random.rand(512).astype(np.float32)
with patch('numpy.load', return_value=mock_embedding), \
patch('pathlib.Path.exists', return_value=True):
# Premier accès pour chaque embedding
first_access_times = {}
for modality, path in embedding_paths.items():
start_time = time.perf_counter()
result1 = fusion_engine.load_embedding_lazy(path)
first_access_times[path] = time.perf_counter() - start_time
assert result1 is not None, f"Embedding doit être chargé pour {modality}"
assert result1.shape == (512,), "Embedding doit avoir la bonne forme"
# Deuxième accès - doit utiliser le cache
for modality, path in embedding_paths.items():
start_time = time.perf_counter()
result2 = fusion_engine.load_embedding_lazy(path)
second_access_time = time.perf_counter() - start_time
assert result2 is not None, "Embedding en cache doit être accessible"
# Cache hit doit être plus rapide
if first_access_times[path] > 0.0001: # Si mesurable
assert second_access_time < first_access_times[path], \
"Cache hit doit être plus rapide que le chargement initial"
# Vérifier les stats du cache
stats = fusion_engine.get_cache_stats()
assert stats['hits'] >= len(embedding_paths), "Doit avoir des cache hits"
assert stats['loads'] >= len(embedding_paths), "Doit avoir des chargements"
assert stats['cache_size'] >= len(embedding_paths), "Cache doit contenir les embeddings"
@given(
model_keys=st.lists(st.text(min_size=3, max_size=20), min_size=2, max_size=5, unique=True),
model_types=st.lists(st.sampled_from(['pytorch', 'sklearn', 'custom']), min_size=2, max_size=5)
)
@settings(max_examples=20, deadline=3000)
def test_model_cache_properties(self, model_keys: List[str], model_types: List[str]):
"""
Test des propriétés du cache de modèles ML.
Propriétés:
1. Modèle chargé une seule fois pour la même clé
2. Cache respecte les limites de taille
3. LRU éviction fonctionne correctement
"""
config = ModelCacheConfig(max_models=3, max_memory_mb=100.0)
model_cache = ModelCache(config)
# Mock des modèles
mock_models = {}
load_counts = {}
def create_loader(key: str, model_type: str):
def loader():
load_counts[key] = load_counts.get(key, 0) + 1
# Simuler un modèle avec une taille
mock_model = Mock()
mock_model.__sizeof__ = Mock(return_value=10 * 1024 * 1024) # 10MB
mock_models[key] = mock_model
return mock_model
return loader
# Charger les modèles
loaded_models = {}
for i, key in enumerate(model_keys[:config.max_models]):
model_type = model_types[i % len(model_types)]
loader = create_loader(key, model_type)
# Premier accès
model1 = model_cache.get_model(key, loader, model_type)
loaded_models[key] = model1
# Deuxième accès - doit réutiliser
model2 = model_cache.get_model(key, loader, model_type)
# Propriété 1: Même modèle réutilisé
assert model1 is model2, f"Modèle {key} doit être réutilisé"
assert load_counts[key] == 1, f"Modèle {key} ne doit être chargé qu'une fois"
# Propriété 2: Limites respectées
stats = model_cache.get_stats()
assert stats['cache_size'] <= config.max_models, "Cache ne doit pas dépasser la limite"
# Test d'éviction LRU si on a assez de clés
if len(model_keys) > config.max_models:
# Charger un modèle supplémentaire pour déclencher l'éviction
extra_key = model_keys[config.max_models]
extra_type = model_types[0]
extra_loader = create_loader(extra_key, extra_type)
model_cache.get_model(extra_key, extra_loader, extra_type)
# Vérifier que la limite est toujours respectée
final_stats = model_cache.get_stats()
assert final_stats['cache_size'] <= config.max_models, \
"Cache doit respecter la limite après éviction"
model_cache.shutdown()
@given(
element_pairs=st.lists(
st.tuples(ui_element_strategy(), ui_element_strategy()),
min_size=3, max_size=10
)
)
@settings(max_examples=30, deadline=3000)
def test_computation_cache_properties(self, element_pairs: List[tuple]):
"""
Test des propriétés du cache de calculs redondants.
Propriétés:
1. Calculs identiques réutilisés
2. Cache symétrique pour distances
3. Performance améliorée sur calculs répétés
"""
computation_cache = ComputationCache(max_size=100)
# Test de la propriété de cache des distances
calculation_counts = {}
def create_distance_calculator(elem1_id: str, elem2_id: str):
def calculator():
key = f"{elem1_id}-{elem2_id}"
calculation_counts[key] = calculation_counts.get(key, 0) + 1
# Simuler un calcul coûteux
time.sleep(0.001) # 1ms de calcul simulé
return 100.0 # Distance fixe pour le test
return calculator
# Tester les calculs de distance
for elem1, elem2 in element_pairs[:5]: # Limiter pour la performance
calculator = create_distance_calculator(elem1.element_id, elem2.element_id)
# Premier calcul
start_time = time.perf_counter()
distance1 = computation_cache.get_distance(
elem1.element_id, elem2.element_id, calculator
)
first_time = time.perf_counter() - start_time
# Deuxième calcul - doit utiliser le cache
start_time = time.perf_counter()
distance2 = computation_cache.get_distance(
elem1.element_id, elem2.element_id, calculator
)
second_time = time.perf_counter() - start_time
# Propriété 1: Résultat identique
assert distance1 == distance2, "Distance doit être identique depuis le cache"
# Propriété 2: Cache hit plus rapide
assert second_time < first_time, "Cache hit doit être plus rapide"
# Propriété 3: Calcul effectué une seule fois
calc_key = f"{elem1.element_id}-{elem2.element_id}"
assert calculation_counts.get(calc_key, 0) == 1, \
"Calcul ne doit être effectué qu'une fois"
# Test de la symétrie des distances
if len(element_pairs) >= 2:
elem1, elem2 = element_pairs[0]
calculator = create_distance_calculator(elem1.element_id, elem2.element_id)
# Distance A->B
dist_ab = computation_cache.get_distance(
elem1.element_id, elem2.element_id, calculator
)
# Distance B->A (doit utiliser le même cache)
dist_ba = computation_cache.get_distance(
elem2.element_id, elem1.element_id, calculator
)
assert dist_ab == dist_ba, "Distance doit être symétrique"
# Vérifier les stats
stats = computation_cache.get_stats()
assert stats['hits'] > 0, "Doit avoir des cache hits"
assert stats['hit_rate_percent'] > 0, "Taux de hit doit être positif"
@given(
bbox_tuples=st.lists(
st.tuples(
st.integers(min_value=0, max_value=1000), # x
st.integers(min_value=0, max_value=1000), # y
st.integers(min_value=10, max_value=200), # w
st.integers(min_value=10, max_value=200) # h
),
min_size=5, max_size=15
)
)
@settings(max_examples=30, deadline=2000)
def test_lru_cache_functions_properties(self, bbox_tuples: List[tuple]):
"""
Test des propriétés des fonctions avec cache LRU.
Propriétés:
1. Résultats cohérents pour mêmes inputs
2. Cache améliore les performances
3. Fonctions mathématiquement correctes
"""
from core.execution.computation_cache import (
cached_bbox_center, cached_bbox_area, cached_bbox_iou, cached_euclidean_distance
)
# Test cached_bbox_center
for bbox in bbox_tuples[:10]: # Limiter pour performance
x, y, w, h = bbox
# Calculs multiples du même centre
center1 = cached_bbox_center(bbox)
center2 = cached_bbox_center(bbox)
# Propriété 1: Résultats identiques
assert center1 == center2, "Centre doit être identique pour même bbox"
# Propriété 2: Calcul mathématiquement correct
expected_center = (float(x + w / 2), float(y + h / 2))
assert center1 == expected_center, "Centre doit être calculé correctement"
# Test cached_bbox_area
for bbox in bbox_tuples[:10]:
x, y, w, h = bbox
area1 = cached_bbox_area(bbox)
area2 = cached_bbox_area(bbox)
assert area1 == area2, "Aire doit être identique pour même bbox"
assert area1 == float(w * h), "Aire doit être calculée correctement"
# Test cached_bbox_iou avec paires
for i in range(min(5, len(bbox_tuples) - 1)):
bbox1 = bbox_tuples[i]
bbox2 = bbox_tuples[i + 1]
iou1 = cached_bbox_iou(bbox1, bbox2)
iou2 = cached_bbox_iou(bbox1, bbox2)
# Propriété 1: Résultats identiques
assert iou1 == iou2, "IoU doit être identique pour mêmes bboxes"
# Propriété 2: IoU symétrique
iou_reverse = cached_bbox_iou(bbox2, bbox1)
assert abs(iou1 - iou_reverse) < 1e-10, "IoU doit être symétrique"
# Propriété 3: IoU dans [0, 1]
assert 0.0 <= iou1 <= 1.0, "IoU doit être dans [0, 1]"
# Test cached_euclidean_distance
points = [(float(bbox[0]), float(bbox[1])) for bbox in bbox_tuples[:10]]
for i in range(min(5, len(points) - 1)):
point1 = points[i]
point2 = points[i + 1]
dist1 = cached_euclidean_distance(point1, point2)
dist2 = cached_euclidean_distance(point1, point2)
# Propriété 1: Résultats identiques
assert dist1 == dist2, "Distance doit être identique pour mêmes points"
# Propriété 2: Distance symétrique
dist_reverse = cached_euclidean_distance(point2, point1)
assert abs(dist1 - dist_reverse) < 1e-10, "Distance doit être symétrique"
# Propriété 3: Distance positive
assert dist1 >= 0.0, "Distance doit être positive"
def test_integrated_performance_improvement(self):
"""
Test d'intégration vérifiant l'amélioration globale des performances.
Vérifie que toutes les optimisations ensemble améliorent les performances
de résolution de cibles.
"""
# Créer des données de test
ui_elements = [
UIElement(
element_id=f"elem_{i}",
type="button",
role="button",
bbox=(i * 100, i * 50, 80, 30),
center=(i * 100 + 40, i * 50 + 15),
label=f"Button {i}",
confidence=0.9
)
for i in range(10)
]
screen_state = Mock(spec=ScreenState)
screen_state.screen_state_id = "test_screen"
screen_state.window = Mock()
screen_state.window.screen_resolution = (1920, 1080)
# Mock pour obtenir les éléments UI
with patch.object(TargetResolver, '_get_ui_elements', return_value=ui_elements):
resolver = TargetResolver(cache_size=50)
target_spec = TargetSpec(by_role="button", by_text="Button 5")
# Première résolution - construction des caches
start_time = time.perf_counter()
result1 = resolver.resolve_target(target_spec, screen_state)
first_resolution_time = time.perf_counter() - start_time
# Deuxième résolution - doit utiliser les caches
start_time = time.perf_counter()
result2 = resolver.resolve_target(target_spec, screen_state)
second_resolution_time = time.perf_counter() - start_time
# Vérifier que les résultats sont cohérents
if result1 and result2:
assert result1.element.element_id == result2.element.element_id, \
"Résultats doivent être cohérents"
# Vérifier l'amélioration des performances
if first_resolution_time > 0.001: # Si mesurable
improvement_ratio = first_resolution_time / max(second_resolution_time, 0.0001)
assert improvement_ratio > 1.5, \
f"Deuxième résolution doit être au moins 50% plus rapide (ratio: {improvement_ratio:.2f})"
# Vérifier les stats des caches
stats = resolver.get_stats()
# Cache de résolution
assert stats.get('cache_hits', 0) >= 1, "Doit avoir des cache hits de résolution"
# Cache d'index spatial
spatial_cache = stats.get('spatial_index_cache', {})
assert spatial_cache.get('size', 0) >= 1, "Doit avoir un index spatial en cache"
# Cache de calculs
comp_cache = stats.get('computation_cache', {})
if comp_cache:
assert comp_cache.get('cache_sizes', {}).get('total', 0) >= 0, \
"Cache de calculs doit être initialisé"
if __name__ == "__main__":
pytest.main([__file__, "-v"])