""" Circuit Breaker pour la gestion des échecs et auto-healing - Fiche #22 Implémente un mécanisme de circuit breaker avec fenêtres glissantes, seuils de déclenchement et gestion des échecs pour le système RPA Vision V3. Auteur: Kiro AI Assistant - 7 janvier 2026 """ import time from datetime import datetime, timedelta from collections import defaultdict, deque from typing import Dict, List, Optional, Any from core.system.models import SimpleFailureEvent class SlidingWindow: """Fenêtre glissante pour compter les échecs dans une période donnée""" def __init__(self, window_duration_s: int): self.window_duration_s = window_duration_s self.failures = deque() def add_failure(self, failure_event): """Ajouter un échec à la fenêtre""" self.failures.append(failure_event) self._cleanup_old_failures() def get_failure_count(self) -> int: """Obtenir le nombre d'échecs dans la fenêtre actuelle""" self._cleanup_old_failures() return len(self.failures) def get_failure_types(self) -> Dict[str, int]: """Obtenir les types d'échecs et leurs compteurs""" self._cleanup_old_failures() failure_types = defaultdict(int) for failure in self.failures: failure_types[failure.failure_type] += 1 return dict(failure_types) def _cleanup_old_failures(self): """Nettoyer les échecs expirés de la fenêtre""" cutoff_time = datetime.now() - timedelta(seconds=self.window_duration_s) while self.failures and self.failures[0].timestamp < cutoff_time: self.failures.popleft() class CircuitBreaker: """ Circuit Breaker pour la gestion des échecs et déclenchement des modes de récupération. Gère trois niveaux de déclenchement: - DEGRADED: Échecs consécutifs sur une étape spécifique - QUARANTINED: Trop d'échecs dans une fenêtre de temps pour un workflow - GLOBAL_PAUSE: Trop d'échecs globaux dans le système """ def __init__(self, policy: Dict[str, Any]): """ Initialiser le CircuitBreaker avec une politique de gestion des échecs. Args: policy: Configuration avec les seuils et fenêtres - step_fail_streak_to_degraded: Nombre d'échecs consécutifs pour DEGRADED - workflow_fail_window_s: Durée de la fenêtre pour workflow (secondes) - workflow_fail_max_in_window: Max échecs par workflow dans la fenêtre - global_fail_max_in_window: Max échecs globaux dans la fenêtre - success_reset_threshold: Nombre de succès pour reset des échecs """ self.policy = policy # Échecs consécutifs par étape (workflow_id:step_id -> List[SimpleFailureEvent]) self.step_consecutive_failures: Dict[str, List[SimpleFailureEvent]] = defaultdict(list) # Compteurs de succès par étape pour reset self.step_success_counts: Dict[str, int] = defaultdict(int) # Fenêtres glissantes par workflow self.workflow_windows: Dict[str, SlidingWindow] = {} # Fenêtre glissante globale self.global_window = SlidingWindow(policy.get('workflow_fail_window_s', 600)) def record_failure(self, workflow_id: str, step_id: str, failure_type: str): """ Enregistrer un échec pour une étape spécifique. Args: workflow_id: Identifiant du workflow step_id: Identifiant de l'étape failure_type: Type d'échec (TARGET_NOT_FOUND, TIMEOUT, etc.) """ failure_event = SimpleFailureEvent( timestamp=datetime.now(), workflow_id=workflow_id, step_id=step_id, failure_type=failure_type ) # Enregistrer l'échec consécutif pour l'étape step_key = f"{workflow_id}:{step_id}" self.step_consecutive_failures[step_key].append(failure_event) # Reset le compteur de succès pour cette étape self.step_success_counts[step_key] = 0 # Ajouter à la fenêtre du workflow if workflow_id not in self.workflow_windows: self.workflow_windows[workflow_id] = SlidingWindow( self.policy.get('workflow_fail_window_s', 600) ) self.workflow_windows[workflow_id].add_failure(failure_event) # Ajouter à la fenêtre globale self.global_window.add_failure(failure_event) def record_success(self, workflow_id: str, step_id: str): """ Enregistrer un succès pour une étape spécifique. Args: workflow_id: Identifiant du workflow step_id: Identifiant de l'étape """ step_key = f"{workflow_id}:{step_id}" self.step_success_counts[step_key] += 1 # Si on atteint le seuil de succès, reset les échecs consécutifs success_threshold = self.policy.get('success_reset_threshold', 2) if self.step_success_counts[step_key] >= success_threshold: self.step_consecutive_failures[step_key] = [] def should_trigger_degraded(self, workflow_id: str, step_id: str) -> bool: """ Vérifier si le mode DEGRADED doit être déclenché pour une étape. Args: workflow_id: Identifiant du workflow step_id: Identifiant de l'étape Returns: True si le mode DEGRADED doit être déclenché """ step_key = f"{workflow_id}:{step_id}" consecutive_failures = len(self.step_consecutive_failures[step_key]) threshold = self.policy.get('step_fail_streak_to_degraded', 3) return consecutive_failures >= threshold def should_trigger_quarantine(self, workflow_id: str) -> bool: """ Vérifier si le mode QUARANTINED doit être déclenché pour un workflow. Args: workflow_id: Identifiant du workflow Returns: True si le mode QUARANTINED doit être déclenché """ if workflow_id not in self.workflow_windows: return False failure_count = self.workflow_windows[workflow_id].get_failure_count() threshold = self.policy.get('workflow_fail_max_in_window', 10) return failure_count >= threshold def should_trigger_global_pause(self) -> bool: """ Vérifier si le PAUSE global doit être déclenché. Returns: True si le PAUSE global doit être déclenché """ global_failure_count = self.global_window.get_failure_count() threshold = self.policy.get('global_fail_max_in_window', 30) return global_failure_count >= threshold def get_failure_counts(self, workflow_id: str) -> Dict[str, Any]: """ Obtenir les compteurs d'échecs pour un workflow. Args: workflow_id: Identifiant du workflow Returns: Dictionnaire avec les compteurs d'échecs """ # Compteurs d'échecs consécutifs par étape step_consecutive = {} for step_key, failures in self.step_consecutive_failures.items(): if step_key.startswith(f"{workflow_id}:"): step_id = step_key.split(":", 1)[1] step_consecutive[step_id] = len(failures) # Compteur d'échecs dans la fenêtre du workflow workflow_window_count = 0 if workflow_id in self.workflow_windows: workflow_window_count = self.workflow_windows[workflow_id].get_failure_count() return { 'step_consecutive': step_consecutive, 'workflow_window': workflow_window_count, 'global_window': self.global_window.get_failure_count(), 'window_duration_s': self.policy.get('workflow_fail_window_s', 600) } def get_step_failure_history(self, workflow_id: str, step_id: str, limit: Optional[int] = None) -> List[SimpleFailureEvent]: """ Obtenir l'historique des échecs pour une étape spécifique. Args: workflow_id: Identifiant du workflow step_id: Identifiant de l'étape limit: Nombre maximum d'échecs à retourner (les plus récents) Returns: Liste des échecs pour l'étape """ step_key = f"{workflow_id}:{step_id}" failures = self.step_consecutive_failures[step_key] if limit is not None: failures = failures[-limit:] return failures def get_workflow_failure_types(self, workflow_id: str) -> Dict[str, int]: """ Obtenir les types d'échecs et leurs compteurs pour un workflow. Args: workflow_id: Identifiant du workflow Returns: Dictionnaire des types d'échecs et leurs compteurs """ if workflow_id not in self.workflow_windows: return {} return self.workflow_windows[workflow_id].get_failure_types() def cleanup_old_data(self): """ Nettoyer les anciennes données expirées. """ # Nettoyer les fenêtres glissantes (se fait automatiquement) for workflow_window in self.workflow_windows.values(): workflow_window._cleanup_old_failures() self.global_window._cleanup_old_failures() # Nettoyer les échecs consécutifs très anciens (plus de 1 heure) cutoff_time = datetime.now() - timedelta(hours=1) keys_to_clean = [] for step_key, failures in self.step_consecutive_failures.items(): # Garder seulement les échecs récents recent_failures = [f for f in failures if f.timestamp > cutoff_time] if recent_failures: self.step_consecutive_failures[step_key] = recent_failures else: keys_to_clean.append(step_key) # Supprimer les clés vides for key in keys_to_clean: del self.step_consecutive_failures[key] if key in self.step_success_counts: del self.step_success_counts[key] def reset_step_failures(self, workflow_id: str, step_id: str): """ Réinitialiser manuellement les échecs pour une étape spécifique. Args: workflow_id: Identifiant du workflow step_id: Identifiant de l'étape """ step_key = f"{workflow_id}:{step_id}" if step_key in self.step_consecutive_failures: del self.step_consecutive_failures[step_key] self.step_success_counts[step_key] = 0 def reset_workflow_failures(self, workflow_id: str): """ Réinitialiser manuellement tous les échecs pour un workflow. Args: workflow_id: Identifiant du workflow """ # Reset la fenêtre du workflow if workflow_id in self.workflow_windows: self.workflow_windows[workflow_id].failures.clear() # Reset tous les échecs consécutifs pour ce workflow keys_to_remove = [] for step_key in self.step_consecutive_failures.keys(): if step_key.startswith(f"{workflow_id}:"): keys_to_remove.append(step_key) for key in keys_to_remove: del self.step_consecutive_failures[key] if key in self.step_success_counts: del self.step_success_counts[key] def get_status_summary(self) -> Dict[str, Any]: """ Obtenir un résumé complet du statut du CircuitBreaker. Returns: Dictionnaire avec le statut complet """ # Statistiques globales global_failure_types = self.global_window.get_failure_types() # Compter les workflows avec échecs workflows_with_failures = len([wf for wf in self.workflow_windows.keys() if self.workflow_windows[wf].get_failure_count() > 0]) # Compter les étapes avec échecs consécutifs steps_with_failures = len([key for key, failures in self.step_consecutive_failures.items() if len(failures) > 0]) return { 'timestamp': datetime.now().isoformat(), 'policy': self.policy, 'global_stats': { 'global_failures_in_window': self.global_window.get_failure_count(), 'workflows_with_failures': workflows_with_failures, 'steps_with_consecutive_failures': steps_with_failures, 'global_failure_types': global_failure_types }, 'thresholds': { 'step_consecutive_to_degraded': self.policy.get('step_fail_streak_to_degraded', 3), 'workflow_window_to_quarantine': self.policy.get('workflow_fail_max_in_window', 10), 'global_window_to_pause': self.policy.get('global_fail_max_in_window', 30), 'window_duration_s': self.policy.get('workflow_fail_window_s', 600) } }