- Frontend v4 accessible sur réseau local (192.168.1.40) - Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard) - Ollama GPU fonctionnel - Self-healing interactif - Dashboard confiance Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
437 lines · 16 KiB · Python
"""
Unit tests for the Hybrid Auto-Heal data models - Ticket #22

Covers ExecutionState, ExecutionStateInfo, FailureEvent, FailureWindow,
VersionInfo, PolicyConfig and their serialization/deserialization helpers.

Author: Dom, Alice Kiro - 23 December 2024
"""

# Standard library
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict

# Third-party
import pytest

# Project under test
from core.system.auto_heal_manager import (
    ExecutionState,
    ExecutionStateInfo,
    FailureEvent,
    FailureWindow,
    VersionInfo,
    PolicyConfig,
)


class TestExecutionState:
    """Tests for the ExecutionState enum."""

    def test_execution_state_values(self):
        """Each enum member maps to its lowercase string value."""
        expected = {
            ExecutionState.RUNNING: "running",
            ExecutionState.DEGRADED: "degraded",
            ExecutionState.QUARANTINED: "quarantined",
            ExecutionState.ROLLBACK: "rollback",
            ExecutionState.PAUSED: "paused",
        }
        for member, value in expected.items():
            assert member.value == value

    def test_is_valid(self):
        """is_valid accepts known state strings and rejects anything else."""
        for good in ("running", "degraded"):
            assert ExecutionState.is_valid(good) is True
        for bad in ("invalid_state", ""):
            assert ExecutionState.is_valid(bad) is False

    def test_valid_transitions(self):
        """RUNNING and DEGRADED expose the documented transition sets."""
        # RUNNING may move to DEGRADED, QUARANTINED or PAUSED — never
        # straight to ROLLBACK.
        from_running = ExecutionState.get_valid_transitions(ExecutionState.RUNNING)
        for target in (ExecutionState.DEGRADED,
                       ExecutionState.QUARANTINED,
                       ExecutionState.PAUSED):
            assert target in from_running
        assert ExecutionState.ROLLBACK not in from_running

        # DEGRADED may move to RUNNING, QUARANTINED, ROLLBACK or PAUSED.
        from_degraded = ExecutionState.get_valid_transitions(ExecutionState.DEGRADED)
        for target in (ExecutionState.RUNNING,
                       ExecutionState.QUARANTINED,
                       ExecutionState.ROLLBACK,
                       ExecutionState.PAUSED):
            assert target in from_degraded

    def test_can_transition_to(self):
        """can_transition_to agrees with the transition table."""
        cases = [
            (ExecutionState.RUNNING, ExecutionState.DEGRADED, True),
            (ExecutionState.RUNNING, ExecutionState.ROLLBACK, False),
            (ExecutionState.DEGRADED, ExecutionState.ROLLBACK, True),
            (ExecutionState.QUARANTINED, ExecutionState.RUNNING, True),
        ]
        for source, target, allowed in cases:
            assert source.can_transition_to(target) is allowed


class TestExecutionStateInfo:
    """Tests for the ExecutionStateInfo per-workflow state record."""

    def test_creation(self):
        """Constructor arguments land unchanged on the attributes."""
        ts = datetime.now()
        info = ExecutionStateInfo(
            workflow_id="test_workflow",
            current_state=ExecutionState.RUNNING,
            state_since=ts,
            failure_count=0,
            last_failure=None,
            confidence_threshold=0.72,
            learning_enabled=True,
            quarantine_until=None,
        )

        assert info.workflow_id == "test_workflow"
        assert info.current_state == ExecutionState.RUNNING
        assert info.state_since == ts
        assert info.failure_count == 0
        assert info.last_failure is None
        assert info.confidence_threshold == 0.72
        assert info.learning_enabled is True
        assert info.quarantine_until is None

    def test_serialization(self):
        """to_dict/from_dict round-trips every field (dates within 1 s)."""
        ts = datetime.now()
        until = ts + timedelta(hours=1)

        source = ExecutionStateInfo(
            workflow_id="test_workflow",
            current_state=ExecutionState.QUARANTINED,
            state_since=ts,
            failure_count=5,
            last_failure=ts - timedelta(minutes=5),
            confidence_threshold=0.82,
            learning_enabled=False,
            quarantine_until=until,
        )

        # Serialize: the enum must collapse to its string value.
        payload = source.to_dict()
        assert payload['workflow_id'] == "test_workflow"
        assert payload['current_state'] == "quarantined"
        assert payload['failure_count'] == 5
        assert payload['learning_enabled'] is False

        # Deserialize and compare field by field.
        clone = ExecutionStateInfo.from_dict(payload)
        assert clone.workflow_id == source.workflow_id
        assert clone.current_state == source.current_state
        assert clone.failure_count == source.failure_count
        assert clone.learning_enabled == source.learning_enabled
        # Dates get a 1-second tolerance: serialization may drop microseconds.
        for restored_dt, original_dt in (
            (clone.state_since, source.state_since),
            (clone.last_failure, source.last_failure),
            (clone.quarantine_until, source.quarantine_until),
        ):
            assert abs((restored_dt - original_dt).total_seconds()) < 1


class TestFailureEvent:
    """Tests for the FailureEvent record."""

    def test_creation(self):
        """Constructor arguments land unchanged on the attributes."""
        ts = datetime.now()
        event = FailureEvent(
            timestamp=ts,
            workflow_id="test_workflow",
            step_id="step_1",
            failure_type="TARGET_NOT_FOUND",
        )

        assert event.timestamp == ts
        assert event.workflow_id == "test_workflow"
        assert event.step_id == "step_1"
        assert event.failure_type == "TARGET_NOT_FOUND"

    def test_serialization(self):
        """to_dict/from_dict round-trips an event (timestamp within 1 s)."""
        ts = datetime.now()
        source = FailureEvent(
            timestamp=ts,
            workflow_id="test_workflow",
            step_id="step_1",
            failure_type="POSTCONDITION_FAILED",
        )

        # Serialize to a plain dict.
        payload = source.to_dict()
        assert payload['workflow_id'] == "test_workflow"
        assert payload['step_id'] == "step_1"
        assert payload['failure_type'] == "POSTCONDITION_FAILED"

        # Deserialize and compare against the source event.
        clone = FailureEvent.from_dict(payload)
        assert clone.workflow_id == source.workflow_id
        assert clone.step_id == source.step_id
        assert clone.failure_type == source.failure_type
        # 1-second tolerance: serialization may drop microseconds.
        assert abs((clone.timestamp - source.timestamp).total_seconds()) < 1


class TestFailureWindow:
    """Tests for the FailureWindow sliding-window container."""

    def test_creation(self):
        """An empty window keeps its start time and duration."""
        ts = datetime.now()
        window = FailureWindow(ts, 600, [])

        assert window.window_start == ts
        assert window.window_duration_s == 600
        assert len(window.failures) == 0

    def test_add_failure(self):
        """add_failure stores the event and the count tracks it."""
        ts = datetime.now()
        window = FailureWindow(window_start=ts, window_duration_s=600, failures=[])

        window.add_failure(
            FailureEvent(ts, "workflow_1", "step_1", "TARGET_NOT_FOUND")
        )

        assert len(window.failures) == 1
        assert window.get_failure_count() == 1

    def test_cleanup_expired(self):
        """cleanup_expired drops events older than the window duration."""
        ts = datetime.now()
        window = FailureWindow(ts, 300, [])  # 5-minute window

        # One event inside the window, added through the public API.
        fresh = FailureEvent(ts, "workflow_1", "step_1", "TARGET_NOT_FOUND")
        window.add_failure(fresh)

        # One event well outside the window, injected directly so that
        # add_failure cannot filter it on the way in.
        stale = FailureEvent(ts - timedelta(minutes=10), "workflow_1",
                             "step_2", "TIMEOUT")
        window.failures.append(stale)

        # Both present before cleanup.
        assert len(window.failures) == 2

        # After cleanup only the fresh event remains.
        window.cleanup_expired()
        assert len(window.failures) == 1
        assert window.failures[0] == fresh

    def test_serialization(self):
        """to_dict/from_dict round-trips the window and its events."""
        ts = datetime.now()
        event = FailureEvent(ts, "workflow_1", "step_1", "TARGET_NOT_FOUND")
        source = FailureWindow(ts, 600, [event])

        payload = source.to_dict()
        assert payload['window_duration_s'] == 600
        assert len(payload['failures']) == 1

        clone = FailureWindow.from_dict(payload)
        assert clone.window_duration_s == source.window_duration_s
        assert len(clone.failures) == 1
        assert clone.failures[0].workflow_id == event.workflow_id


class TestVersionInfo:
    """Tests for the VersionInfo snapshot record."""

    def test_creation(self):
        """Constructor arguments land unchanged on the attributes."""
        ts = datetime.now()
        snapshot = VersionInfo(
            version_id="v001",
            created_at=ts,
            workflow_id="test_workflow",
            success_rate_before=0.85,
            success_rate_after=None,  # not yet measured for a fresh version
            components_versioned=["prototypes", "faiss"],
        )

        assert snapshot.version_id == "v001"
        assert snapshot.created_at == ts
        assert snapshot.workflow_id == "test_workflow"
        assert snapshot.success_rate_before == 0.85
        assert snapshot.success_rate_after is None
        assert snapshot.components_versioned == ["prototypes", "faiss"]

    def test_serialization(self):
        """to_dict/from_dict round-trips every field (created_at within 1 s)."""
        ts = datetime.now()
        source = VersionInfo(
            version_id="v002",
            created_at=ts,
            workflow_id="test_workflow",
            success_rate_before=0.90,
            success_rate_after=0.75,
            components_versioned=["prototypes", "faiss", "memory"],
        )

        # Serialize to a plain dict.
        payload = source.to_dict()
        assert payload['version_id'] == "v002"
        assert payload['success_rate_before'] == 0.90
        assert payload['success_rate_after'] == 0.75
        assert payload['components_versioned'] == ["prototypes", "faiss", "memory"]

        # Deserialize and compare attribute by attribute.
        clone = VersionInfo.from_dict(payload)
        for attr in ('version_id', 'workflow_id', 'success_rate_before',
                     'success_rate_after', 'components_versioned'):
            assert getattr(clone, attr) == getattr(source, attr)
        # 1-second tolerance: serialization may drop microseconds.
        assert abs((clone.created_at - source.created_at).total_seconds()) < 1


class TestPolicyConfig:
    """Tests for the PolicyConfig configuration object."""

    def test_default_creation(self):
        """A bare PolicyConfig() carries the documented defaults."""
        policy = PolicyConfig()

        assert policy.mode == "hybrid"
        assert policy.step_fail_streak_to_degraded == 3
        assert policy.workflow_fail_window_s == 600
        assert policy.min_confidence_normal == 0.72
        assert policy.min_confidence_degraded == 0.82
        assert policy.disable_learning_in_degraded is True

    def test_from_dict(self):
        """from_dict applies the given keys and keeps defaults elsewhere."""
        overrides = {
            "mode": "conservative",
            "step_fail_streak_to_degraded": 2,
            "min_confidence_normal": 0.80,
            "quarantine_duration_s": 3600,
        }

        policy = PolicyConfig.from_dict(overrides)
        assert policy.mode == "conservative"
        assert policy.step_fail_streak_to_degraded == 2
        assert policy.min_confidence_normal == 0.80
        assert policy.quarantine_duration_s == 3600
        # A key absent from the dict keeps its default value.
        assert policy.workflow_fail_window_s == 600

    def test_validation_success(self):
        """The default configuration validates without errors."""
        policy = PolicyConfig()
        assert len(policy.validate()) == 0

    def test_validation_errors(self):
        """validate() reports the invalid fields of a broken config."""
        broken = PolicyConfig(
            mode="invalid_mode",
            step_fail_streak_to_degraded=0,
            min_confidence_normal=1.5,
            min_confidence_degraded=0.5,  # lower than the normal threshold
            workflow_fail_window_s=30,  # too short
            quarantine_duration_s=100,  # too short
        )

        errors = broken.validate()
        assert len(errors) > 0

        # Spot-check a few specific error messages.
        joined = " ".join(errors)
        for fragment in (
            "Invalid mode",
            "step_fail_streak_to_degraded must be >= 1",
            "min_confidence_normal must be between 0.0 and 1.0",
            "min_confidence_degraded must be >= min_confidence_normal",
        ):
            assert fragment in joined

    def test_serialization(self):
        """to_dict exports the overridden values and the full key set."""
        policy = PolicyConfig(
            mode="aggressive",
            step_fail_streak_to_degraded=5,
            min_confidence_normal=0.65,
        )

        payload = policy.to_dict()
        assert payload['mode'] == "aggressive"
        assert payload['step_fail_streak_to_degraded'] == 5
        assert payload['min_confidence_normal'] == 0.65

        # Every policy knob must appear in the export — no more, no less.
        expected_keys = {
            'mode', 'step_fail_streak_to_degraded', 'workflow_fail_window_s',
            'workflow_fail_max_in_window', 'global_fail_max_in_window',
            'min_confidence_normal', 'min_confidence_degraded', 'min_margin_top1_top2_degraded',
            'disable_learning_in_degraded', 'rollback_on_regression', 'regression_window_steps',
            'regression_fail_ratio', 'quarantine_duration_s', 'max_versions_to_keep',
        }
        assert set(payload.keys()) == expected_keys


class TestDataModelIntegration:
    """Integration tests spanning several data models."""

    def test_complete_workflow_state_cycle(self):
        """A RUNNING -> DEGRADED mutation survives a serialization round-trip."""
        ts = datetime.now()

        # Start from a healthy RUNNING state.
        info = ExecutionStateInfo(
            workflow_id="integration_test",
            current_state=ExecutionState.RUNNING,
            state_since=ts,
            failure_count=0,
            last_failure=None,
            confidence_threshold=0.72,
            learning_enabled=True,
            quarantine_until=None,
        )

        # Simulate the transition to DEGRADED: tighter confidence
        # threshold, learning disabled, failures recorded.
        info.current_state = ExecutionState.DEGRADED
        info.confidence_threshold = 0.82
        info.learning_enabled = False
        info.failure_count = 3
        info.last_failure = ts

        # The mutated state must survive to_dict/from_dict unchanged.
        clone = ExecutionStateInfo.from_dict(info.to_dict())

        assert clone.current_state == ExecutionState.DEGRADED
        assert clone.confidence_threshold == 0.82
        assert clone.learning_enabled is False
        assert clone.failure_count == 3

    def test_failure_window_with_multiple_events(self):
        """A window holding several events round-trips them in order."""
        ts = datetime.now()
        window = FailureWindow(ts, 600, [])

        # Five events, each one minute older than the previous.
        for idx in range(5):
            window.add_failure(FailureEvent(
                timestamp=ts - timedelta(minutes=idx),
                workflow_id="test_workflow",
                step_id=f"step_{idx}",
                failure_type="TARGET_NOT_FOUND",
            ))

        assert window.get_failure_count() == 5

        # Serialize and deserialize the whole window.
        clone = FailureWindow.from_dict(window.to_dict())

        assert clone.get_failure_count() == 5
        assert len(clone.failures) == 5

        # Insertion order must be preserved through the round-trip.
        for idx, event in enumerate(clone.failures):
            assert event.step_id == f"step_{idx}"
            assert event.failure_type == "TARGET_NOT_FOUND"


# Allow running this test module directly (`python test_file.py`) in
# addition to the usual `pytest` invocation.
if __name__ == "__main__":
    # Original line carried a stray trailing "|" (extraction garbling),
    # which was a syntax error — removed.
    pytest.main([__file__])