""" ExecutionRobustness - Robustesse d'exécution avec retry et récupération Ce module ajoute: - Retry avec backoff exponentiel - Attente d'élément avec re-détection - Récupération d'état après échec - Gestion d'écran inconnu - Diagnostics détaillés d'échec """ import logging import time from typing import Optional, Dict, Any, Callable, List, Tuple from dataclasses import dataclass, field from datetime import datetime from enum import Enum from pathlib import Path logger = logging.getLogger(__name__) # ============================================================================= # Dataclasses # ============================================================================= @dataclass class RetryConfig: """Configuration des retries""" max_retries: int = 3 base_delay_ms: float = 1000.0 # Délai de base en ms max_delay_ms: float = 30000.0 # Délai max en ms exponential_base: float = 2.0 # Base pour backoff exponentiel jitter_factor: float = 0.1 # Facteur de jitter (0-1) @dataclass class WaitConfig: """Configuration de l'attente d'élément""" timeout_ms: float = 10000.0 # Timeout total poll_interval_ms: float = 500.0 # Intervalle de re-détection min_confidence: float = 0.7 # Confiance minimum @dataclass class RecoveryConfig: """Configuration de la récupération""" enable_state_recovery: bool = True max_recovery_attempts: int = 3 recovery_timeout_ms: float = 30000.0 @dataclass class RetryResult: """Résultat d'une opération avec retry""" success: bool attempts: int total_delay_ms: float last_error: Optional[Exception] = None result: Any = None delays_used: List[float] = field(default_factory=list) @dataclass class WaitResult: """Résultat d'une attente d'élément""" found: bool element: Any = None confidence: float = 0.0 wait_time_ms: float = 0.0 detection_attempts: int = 0 @dataclass class RecoveryResult: """Résultat d'une tentative de récupération""" recovered: bool new_node_id: Optional[str] = None recovery_path: List[str] = field(default_factory=list) message: str = "" @dataclass class FailureDiagnostics: """Diagnostics détaillés d'un échec""" failure_type: str timestamp: datetime screenshot_path: Optional[str] = None match_scores: Dict[str, float] = field(default_factory=dict) attempted_strategies: List[str] = field(default_factory=list) context: Dict[str, Any] = field(default_factory=dict) recommendations: List[str] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: return { "failure_type": self.failure_type, "timestamp": self.timestamp.isoformat(), "screenshot_path": self.screenshot_path, "match_scores": self.match_scores, "attempted_strategies": self.attempted_strategies, "context": self.context, "recommendations": self.recommendations } class FailureType(Enum): """Types d'échec""" ACTION_FAILED = "action_failed" ELEMENT_NOT_FOUND = "element_not_found" STATE_MISMATCH = "state_mismatch" UNKNOWN_SCREEN = "unknown_screen" TIMEOUT = "timeout" NETWORK_ERROR = "network_error" # ============================================================================= # Gestionnaire de Retry # ============================================================================= class RetryManager: """ Gestionnaire de retry avec backoff exponentiel. Formule: delay = base_delay * (exponential_base ^ (attempt - 1)) Example: >>> manager = RetryManager() >>> result = manager.execute_with_retry(my_function, args) """ def __init__(self, config: Optional[RetryConfig] = None): """ Initialiser le gestionnaire. Args: config: Configuration des retries """ self.config = config or RetryConfig() logger.info(f"RetryManager initialisé (max_retries={self.config.max_retries})") def execute_with_retry( self, func: Callable, *args, on_retry: Optional[Callable[[int, Exception], None]] = None, **kwargs ) -> RetryResult: """ Exécuter une fonction avec retry et backoff exponentiel. Args: func: Fonction à exécuter *args: Arguments positionnels on_retry: Callback appelé à chaque retry **kwargs: Arguments nommés Returns: RetryResult avec résultat ou erreur """ attempts = 0 total_delay = 0.0 delays_used = [] last_error = None while attempts <= self.config.max_retries: attempts += 1 try: result = func(*args, **kwargs) return RetryResult( success=True, attempts=attempts, total_delay_ms=total_delay, result=result, delays_used=delays_used ) except Exception as e: last_error = e logger.warning(f"Tentative {attempts} échouée: {e}") if attempts > self.config.max_retries: break # Calculer délai avec backoff exponentiel delay = self.compute_delay(attempts) delays_used.append(delay) total_delay += delay # Callback de retry if on_retry: try: on_retry(attempts, e) except Exception as cb_error: logger.warning(f"Erreur callback retry: {cb_error}") # Attendre time.sleep(delay / 1000.0) return RetryResult( success=False, attempts=attempts, total_delay_ms=total_delay, last_error=last_error, delays_used=delays_used ) def compute_delay(self, attempt: int) -> float: """ Calculer le délai pour une tentative donnée. Formule: base_delay * (exponential_base ^ (attempt - 1)) + jitter Args: attempt: Numéro de tentative (1-based) Returns: Délai en millisecondes """ # Backoff exponentiel delay = self.config.base_delay_ms * ( self.config.exponential_base ** (attempt - 1) ) # Appliquer jitter import random jitter = delay * self.config.jitter_factor * random.random() delay += jitter # Plafonner au max delay = min(delay, self.config.max_delay_ms) return delay def get_expected_delays(self) -> List[float]: """ Obtenir les délais attendus pour chaque tentative. Returns: Liste des délais en ms """ delays = [] for attempt in range(1, self.config.max_retries + 1): delay = self.config.base_delay_ms * ( self.config.exponential_base ** (attempt - 1) ) delay = min(delay, self.config.max_delay_ms) delays.append(delay) return delays # ============================================================================= # Gestionnaire d'Attente d'Élément # ============================================================================= class ElementWaiter: """ Gestionnaire d'attente d'élément avec re-détection périodique. Example: >>> waiter = ElementWaiter() >>> result = waiter.wait_for_element(detector, target_spec) """ def __init__(self, config: Optional[WaitConfig] = None): """ Initialiser le gestionnaire. Args: config: Configuration de l'attente """ self.config = config or WaitConfig() logger.info(f"ElementWaiter initialisé (timeout={self.config.timeout_ms}ms)") def wait_for_element( self, detect_func: Callable[[], Optional[Any]], confidence_func: Optional[Callable[[Any], float]] = None, on_poll: Optional[Callable[[int], None]] = None ) -> WaitResult: """ Attendre qu'un élément soit détecté. Args: detect_func: Fonction de détection (retourne élément ou None) confidence_func: Fonction pour obtenir la confiance on_poll: Callback à chaque tentative de détection Returns: WaitResult avec élément trouvé ou timeout """ start_time = time.time() attempts = 0 while True: attempts += 1 elapsed_ms = (time.time() - start_time) * 1000 # Vérifier timeout if elapsed_ms >= self.config.timeout_ms: return WaitResult( found=False, wait_time_ms=elapsed_ms, detection_attempts=attempts ) # Callback de poll if on_poll: try: on_poll(attempts) except Exception as e: logger.warning(f"Erreur callback poll: {e}") # Tenter détection try: element = detect_func() if element is not None: # Vérifier confiance si fonction fournie confidence = 1.0 if confidence_func: confidence = confidence_func(element) if confidence >= self.config.min_confidence: return WaitResult( found=True, element=element, confidence=confidence, wait_time_ms=elapsed_ms, detection_attempts=attempts ) except Exception as e: logger.debug(f"Erreur détection (tentative {attempts}): {e}") # Attendre avant prochaine tentative time.sleep(self.config.poll_interval_ms / 1000.0) # ============================================================================= # Gestionnaire de Récupération d'État # ============================================================================= class StateRecoveryManager: """ Gestionnaire de récupération d'état après échec. Tente de re-matcher l'état actuel vers le graphe de workflow et trouve un chemin de récupération si possible. Example: >>> recovery = StateRecoveryManager(pipeline) >>> result = recovery.attempt_recovery(workflow_id, screenshot) """ def __init__( self, pipeline: Any, config: Optional[RecoveryConfig] = None ): """ Initialiser le gestionnaire. Args: pipeline: WorkflowPipeline pour matching config: Configuration de récupération """ self.pipeline = pipeline self.config = config or RecoveryConfig() logger.info("StateRecoveryManager initialisé") def attempt_recovery( self, workflow_id: str, screenshot_path: str, expected_node_id: Optional[str] = None ) -> RecoveryResult: """ Tenter de récupérer après un échec. Args: workflow_id: ID du workflow screenshot_path: Chemin du screenshot actuel expected_node_id: Node attendu (optionnel) Returns: RecoveryResult avec nouveau node ou échec """ if not self.config.enable_state_recovery: return RecoveryResult( recovered=False, message="State recovery disabled" ) logger.info(f"Tentative de récupération pour workflow {workflow_id}") for attempt in range(self.config.max_recovery_attempts): try: # Re-matcher l'état actuel match = self.pipeline.match_current_state( screenshot_path, workflow_id=workflow_id ) if match and match.get("confidence", 0) > 0.5: new_node_id = match["node_id"] # Vérifier si c'est un node valide workflow = self.pipeline.load_workflow(workflow_id) if workflow and any(n.node_id == new_node_id for n in workflow.nodes): # Trouver chemin de récupération si node différent recovery_path = [] if expected_node_id and new_node_id != expected_node_id: recovery_path = self._find_recovery_path( workflow, new_node_id, expected_node_id ) return RecoveryResult( recovered=True, new_node_id=new_node_id, recovery_path=recovery_path, message=f"Récupéré vers node {new_node_id}" ) # Attendre avant prochaine tentative time.sleep(1.0) except Exception as e: logger.warning(f"Erreur récupération (tentative {attempt + 1}): {e}") return RecoveryResult( recovered=False, message="Échec de récupération après toutes les tentatives" ) def _find_recovery_path( self, workflow: Any, from_node: str, to_node: str ) -> List[str]: """Trouver un chemin entre deux nodes (BFS).""" from collections import deque edges = getattr(workflow, 'edges', []) # Construire graphe d'adjacence adjacency = {} for edge in edges: if edge.from_node not in adjacency: adjacency[edge.from_node] = [] adjacency[edge.from_node].append(edge.to_node) # BFS queue = deque([(from_node, [from_node])]) visited = {from_node} while queue: current, path = queue.popleft() if current == to_node: return path for neighbor in adjacency.get(current, []): if neighbor not in visited: visited.add(neighbor) queue.append((neighbor, path + [neighbor])) return [] # Pas de chemin trouvé # ============================================================================= # Gestionnaire de Diagnostics # ============================================================================= class DiagnosticsManager: """ Gestionnaire de diagnostics détaillés d'échec. Collecte et enregistre les informations de diagnostic pour faciliter le débogage. Example: >>> diagnostics = DiagnosticsManager() >>> report = diagnostics.create_failure_report(...) """ def __init__(self, logs_dir: str = "data/diagnostics"): """ Initialiser le gestionnaire. Args: logs_dir: Répertoire pour les logs de diagnostic """ self.logs_dir = Path(logs_dir) self.logs_dir.mkdir(parents=True, exist_ok=True) self._failure_history: List[FailureDiagnostics] = [] logger.info(f"DiagnosticsManager initialisé: {logs_dir}") def create_failure_report( self, failure_type: FailureType, screenshot_path: Optional[str] = None, match_scores: Optional[Dict[str, float]] = None, attempted_strategies: Optional[List[str]] = None, context: Optional[Dict[str, Any]] = None ) -> FailureDiagnostics: """ Créer un rapport de diagnostic d'échec. Args: failure_type: Type d'échec screenshot_path: Chemin du screenshot match_scores: Scores de matching par node attempted_strategies: Stratégies tentées context: Contexte additionnel Returns: FailureDiagnostics avec recommandations """ # Générer recommandations basées sur le type d'échec recommendations = self._generate_recommendations( failure_type, match_scores, context ) diagnostics = FailureDiagnostics( failure_type=failure_type.value, timestamp=datetime.now(), screenshot_path=screenshot_path, match_scores=match_scores or {}, attempted_strategies=attempted_strategies or [], context=context or {}, recommendations=recommendations ) # Enregistrer dans l'historique self._failure_history.append(diagnostics) # Sauvegarder sur disque self._save_diagnostics(diagnostics) logger.info(f"Diagnostic créé: {failure_type.value}") return diagnostics def _generate_recommendations( self, failure_type: FailureType, match_scores: Optional[Dict[str, float]], context: Optional[Dict[str, Any]] ) -> List[str]: """Générer des recommandations basées sur l'échec.""" recommendations = [] if failure_type == FailureType.ELEMENT_NOT_FOUND: recommendations.append("Vérifier que l'élément cible est visible à l'écran") recommendations.append("Augmenter le timeout d'attente") recommendations.append("Vérifier les sélecteurs de l'élément") elif failure_type == FailureType.STATE_MISMATCH: recommendations.append("L'écran actuel ne correspond pas à l'état attendu") if match_scores: best_match = max(match_scores.items(), key=lambda x: x[1]) recommendations.append(f"Meilleur match: {best_match[0]} ({best_match[1]:.2%})") recommendations.append("Considérer l'ajout d'une variante pour ce nouvel état") elif failure_type == FailureType.UNKNOWN_SCREEN: recommendations.append("Écran non reconnu dans le workflow") recommendations.append("Vérifier si une popup ou modal bloque l'écran") recommendations.append("Considérer l'entraînement avec ce nouvel écran") elif failure_type == FailureType.ACTION_FAILED: recommendations.append("L'action n'a pas pu être exécutée") recommendations.append("Vérifier que l'élément cible est cliquable") recommendations.append("Vérifier les permissions de l'application") elif failure_type == FailureType.TIMEOUT: recommendations.append("Opération expirée") recommendations.append("Augmenter les timeouts de configuration") recommendations.append("Vérifier la réactivité de l'application") return recommendations def _save_diagnostics(self, diagnostics: FailureDiagnostics) -> None: """Sauvegarder les diagnostics sur disque.""" import json filename = f"failure_{diagnostics.timestamp.strftime('%Y%m%d_%H%M%S')}.json" filepath = self.logs_dir / filename with open(filepath, 'w') as f: json.dump(diagnostics.to_dict(), f, indent=2) def get_failure_history(self) -> List[FailureDiagnostics]: """Obtenir l'historique des échecs.""" return self._failure_history def get_failure_stats(self) -> Dict[str, int]: """Obtenir les statistiques d'échec par type.""" stats = {} for diag in self._failure_history: stats[diag.failure_type] = stats.get(diag.failure_type, 0) + 1 return stats # ============================================================================= # Classe principale de robustesse # ============================================================================= class ExecutionRobustness: """ Classe principale regroupant toutes les fonctionnalités de robustesse. Example: >>> robustness = ExecutionRobustness(pipeline) >>> result = robustness.execute_with_robustness(action_func) """ def __init__( self, pipeline: Any, retry_config: Optional[RetryConfig] = None, wait_config: Optional[WaitConfig] = None, recovery_config: Optional[RecoveryConfig] = None ): """ Initialiser la robustesse d'exécution. Args: pipeline: WorkflowPipeline retry_config: Configuration des retries wait_config: Configuration de l'attente recovery_config: Configuration de la récupération """ self.pipeline = pipeline self.retry_manager = RetryManager(retry_config) self.element_waiter = ElementWaiter(wait_config) self.state_recovery = StateRecoveryManager(pipeline, recovery_config) self.diagnostics = DiagnosticsManager() logger.info("ExecutionRobustness initialisé") def execute_with_robustness( self, action_func: Callable, workflow_id: str, screenshot_path: str, *args, **kwargs ) -> Tuple[bool, Any, Optional[FailureDiagnostics]]: """ Exécuter une action avec toutes les protections de robustesse. Args: action_func: Fonction d'action à exécuter workflow_id: ID du workflow screenshot_path: Chemin du screenshot actuel *args, **kwargs: Arguments pour action_func Returns: Tuple (success, result, diagnostics) """ # Tentative avec retry retry_result = self.retry_manager.execute_with_retry( action_func, *args, **kwargs ) if retry_result.success: return True, retry_result.result, None # Échec - créer diagnostics diagnostics = self.diagnostics.create_failure_report( failure_type=FailureType.ACTION_FAILED, screenshot_path=screenshot_path, context={ "attempts": retry_result.attempts, "total_delay_ms": retry_result.total_delay_ms, "error": str(retry_result.last_error) } ) # Tenter récupération recovery_result = self.state_recovery.attempt_recovery( workflow_id, screenshot_path ) if recovery_result.recovered: logger.info(f"Récupération réussie: {recovery_result.new_node_id}") return False, recovery_result, diagnostics return False, None, diagnostics # ============================================================================= # Fonctions utilitaires # ============================================================================= def create_robustness( pipeline: Any, max_retries: int = 3, base_delay_ms: float = 1000.0 ) -> ExecutionRobustness: """ Créer une instance de robustesse avec configuration personnalisée. Args: pipeline: WorkflowPipeline max_retries: Nombre max de retries base_delay_ms: Délai de base Returns: ExecutionRobustness configuré """ retry_config = RetryConfig( max_retries=max_retries, base_delay_ms=base_delay_ms ) return ExecutionRobustness(pipeline, retry_config=retry_config)