Files
rpa_vision_v3/core/execution/recovery_strategies.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

833 lines
31 KiB
Python

"""
Recovery Strategies - Stratégies de récupération pour ErrorHandler
Ce module implémente les stratégies de récupération spécialisées pour différents types d'erreurs:
- SpatialFallbackStrategy pour TargetNotFoundError
- SemanticVariantStrategy pour UIElementChangedError
- RetryWithBackoffStrategy pour NetworkError
- DataNormalizationStrategy pour ValidationError
Chaque stratégie implémente l'interface BaseRecoveryStrategy et fournit une logique
de récupération spécialisée pour son type d'erreur.
"""
import logging
import time
import re
from abc import ABC, abstractmethod
from typing import Dict, Any, Optional, List, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
logger = logging.getLogger(__name__)
class RecoveryStrategyType(Enum):
"""Types de stratégies de récupération"""
SPATIAL_FALLBACK = "spatial_fallback"
SEMANTIC_VARIANT = "semantic_variant"
RETRY_WITH_BACKOFF = "retry_with_backoff"
DATA_NORMALIZATION = "data_normalization"
@dataclass
class RecoveryContext:
"""Contexte pour une tentative de récupération"""
error_type: str
error_message: str
original_data: Dict[str, Any]
attempt_number: int
max_attempts: int
timestamp: datetime
additional_context: Dict[str, Any]
@dataclass
class RecoveryResult:
"""Résultat d'une tentative de récupération"""
success: bool
should_retry: bool
strategy_used: RecoveryStrategyType
recovery_data: Dict[str, Any]
message: str
escalation_reason: Optional[str] = None
duration_ms: float = 0.0
@classmethod
def success_with_retry(cls, strategy: RecoveryStrategyType, data: Dict[str, Any],
message: str) -> 'RecoveryResult':
"""Créer un résultat de succès avec retry recommandé"""
return cls(
success=True,
should_retry=True,
strategy_used=strategy,
recovery_data=data,
message=message
)
@classmethod
def success_no_retry(cls, strategy: RecoveryStrategyType, data: Dict[str, Any],
message: str) -> 'RecoveryResult':
"""Créer un résultat de succès sans retry"""
return cls(
success=True,
should_retry=False,
strategy_used=strategy,
recovery_data=data,
message=message
)
@classmethod
def failure_with_escalation(cls, strategy: RecoveryStrategyType, reason: str,
message: str) -> 'RecoveryResult':
"""Créer un résultat d'échec avec escalade"""
return cls(
success=False,
should_retry=False,
strategy_used=strategy,
recovery_data={},
message=message,
escalation_reason=reason
)
class BaseRecoveryStrategy(ABC):
"""Interface de base pour les stratégies de récupération"""
def __init__(self, max_attempts: int = 3):
self.max_attempts = max_attempts
self.strategy_type = None # À définir dans les sous-classes
@abstractmethod
def can_handle(self, error_type: str, context: Dict[str, Any]) -> bool:
"""Détermine si cette stratégie peut gérer ce type d'erreur"""
pass
@abstractmethod
def recover(self, context: RecoveryContext) -> RecoveryResult:
"""Exécute la stratégie de récupération"""
pass
def _log_recovery_attempt(self, context: RecoveryContext, result: RecoveryResult):
"""Log une tentative de récupération"""
logger.info(
f"Recovery attempt {context.attempt_number}/{context.max_attempts} "
f"using {self.strategy_type.value}: {result.message}"
)
class SpatialFallbackStrategy(BaseRecoveryStrategy):
"""
Stratégie de récupération spatiale pour TargetNotFoundError
Utilise des critères spatiaux alternatifs quand un élément cible n'est pas trouvé:
- Recherche par position relative (à droite, en dessous, etc.)
- Recherche par zone élargie
- Recherche par similarité visuelle dans une zone plus large
"""
def __init__(self, max_attempts: int = 3, expand_factor: float = 1.5):
super().__init__(max_attempts)
self.strategy_type = RecoveryStrategyType.SPATIAL_FALLBACK
self.expand_factor = expand_factor
def can_handle(self, error_type: str, context: Dict[str, Any]) -> bool:
"""Peut gérer les erreurs de type TargetNotFoundError"""
return error_type in ["TargetNotFoundError", "target_not_found"]
def recover(self, context: RecoveryContext) -> RecoveryResult:
"""
Récupération par fallback spatial
Stratégies appliquées dans l'ordre:
1. Recherche dans une zone élargie
2. Recherche par position relative
3. Recherche par similarité visuelle élargie
"""
start_time = time.time()
try:
# Extraire les données du contexte
target_info = context.original_data.get('target', {})
screen_state = context.additional_context.get('screen_state')
if not target_info or not screen_state:
return RecoveryResult.failure_with_escalation(
self.strategy_type,
"Missing target info or screen state",
"Cannot perform spatial fallback without target and screen data"
)
# Stratégie 1: Zone élargie
if context.attempt_number == 1:
recovery_data = self._expand_search_area(target_info, screen_state)
message = f"Expanded search area by factor {self.expand_factor}"
# Stratégie 2: Position relative
elif context.attempt_number == 2:
recovery_data = self._search_relative_position(target_info, screen_state)
message = "Searching by relative position (nearby elements)"
# Stratégie 3: Similarité visuelle élargie
elif context.attempt_number == 3:
recovery_data = self._visual_similarity_fallback(target_info, screen_state)
message = "Using visual similarity fallback with relaxed criteria"
else:
return RecoveryResult.failure_with_escalation(
self.strategy_type,
f"Max spatial fallback attempts reached ({self.max_attempts})",
"All spatial fallback strategies exhausted"
)
duration_ms = (time.time() - start_time) * 1000
result = RecoveryResult.success_with_retry(
self.strategy_type,
recovery_data,
message
)
result.duration_ms = duration_ms
self._log_recovery_attempt(context, result)
return result
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
logger.error(f"Spatial fallback strategy failed: {e}")
result = RecoveryResult.failure_with_escalation(
self.strategy_type,
f"Strategy execution error: {e}",
f"Spatial fallback failed with exception: {e}"
)
result.duration_ms = duration_ms
return result
def _expand_search_area(self, target_info: Dict[str, Any], screen_state) -> Dict[str, Any]:
"""Élargir la zone de recherche"""
original_bbox = target_info.get('bbox', {})
if not original_bbox:
return {'strategy': 'expand_area', 'success': False, 'reason': 'No bbox available'}
# Élargir la bbox par le facteur d'expansion
expanded_bbox = {
'x': max(0, original_bbox.get('x', 0) - int(original_bbox.get('width', 0) * (self.expand_factor - 1) / 2)),
'y': max(0, original_bbox.get('y', 0) - int(original_bbox.get('height', 0) * (self.expand_factor - 1) / 2)),
'width': int(original_bbox.get('width', 0) * self.expand_factor),
'height': int(original_bbox.get('height', 0) * self.expand_factor)
}
return {
'strategy': 'expand_area',
'original_bbox': original_bbox,
'expanded_bbox': expanded_bbox,
'expand_factor': self.expand_factor
}
def _search_relative_position(self, target_info: Dict[str, Any], screen_state) -> Dict[str, Any]:
"""Rechercher par position relative"""
# Rechercher des éléments proches qui pourraient servir de référence
target_text = target_info.get('text_pattern', '')
target_role = target_info.get('role', '')
relative_positions = ['right', 'below', 'above', 'left']
return {
'strategy': 'relative_position',
'target_text': target_text,
'target_role': target_role,
'search_positions': relative_positions,
'search_radius': 100 # pixels
}
def _visual_similarity_fallback(self, target_info: Dict[str, Any], screen_state) -> Dict[str, Any]:
"""Fallback par similarité visuelle avec critères relaxés"""
return {
'strategy': 'visual_similarity',
'relaxed_threshold': 0.6, # Seuil plus bas
'use_partial_matching': True,
'ignore_color_variations': True,
'target_info': target_info
}
class SemanticVariantStrategy(BaseRecoveryStrategy):
"""
Stratégie de récupération sémantique pour UIElementChangedError
Essaie des variantes sémantiques du texte quand un élément UI a changé:
- Variantes linguistiques (synonymes, traductions)
- Variantes de format (casse, espaces, ponctuation)
- Variantes contextuelles (texte partiel, mots-clés)
"""
def __init__(self, max_attempts: int = 3):
super().__init__(max_attempts)
self.strategy_type = RecoveryStrategyType.SEMANTIC_VARIANT
# Dictionnaire de synonymes courants
self.synonyms = {
'submit': ['send', 'confirm', 'ok', 'apply', 'save'],
'cancel': ['close', 'abort', 'dismiss', 'back'],
'delete': ['remove', 'erase', 'clear'],
'edit': ['modify', 'change', 'update'],
'search': ['find', 'lookup', 'query'],
'login': ['sign in', 'connect', 'authenticate'],
'logout': ['sign out', 'disconnect', 'exit']
}
def can_handle(self, error_type: str, context: Dict[str, Any]) -> bool:
"""Peut gérer les erreurs de changement d'UI"""
return error_type in ["UIElementChangedError", "ui_element_changed", "ui_changed"]
def recover(self, context: RecoveryContext) -> RecoveryResult:
"""
Récupération par variantes sémantiques
Stratégies appliquées dans l'ordre:
1. Variantes de format (casse, espaces)
2. Synonymes et variantes linguistiques
3. Correspondance partielle et mots-clés
"""
start_time = time.time()
try:
# Extraire le texte original
original_text = context.original_data.get('text_pattern', '')
if not original_text:
return RecoveryResult.failure_with_escalation(
self.strategy_type,
"No text pattern available",
"Cannot generate semantic variants without original text"
)
# Générer variantes selon le numéro de tentative
if context.attempt_number == 1:
variants = self._generate_format_variants(original_text)
message = f"Generated {len(variants)} format variants"
elif context.attempt_number == 2:
variants = self._generate_semantic_variants(original_text)
message = f"Generated {len(variants)} semantic variants"
elif context.attempt_number == 3:
variants = self._generate_partial_variants(original_text)
message = f"Generated {len(variants)} partial matching variants"
else:
return RecoveryResult.failure_with_escalation(
self.strategy_type,
f"Max semantic variant attempts reached ({self.max_attempts})",
"All semantic variant strategies exhausted"
)
duration_ms = (time.time() - start_time) * 1000
recovery_data = {
'original_text': original_text,
'variants': variants,
'strategy_type': f'attempt_{context.attempt_number}'
}
result = RecoveryResult.success_with_retry(
self.strategy_type,
recovery_data,
message
)
result.duration_ms = duration_ms
self._log_recovery_attempt(context, result)
return result
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
logger.error(f"Semantic variant strategy failed: {e}")
result = RecoveryResult.failure_with_escalation(
self.strategy_type,
f"Strategy execution error: {e}",
f"Semantic variant generation failed: {e}"
)
result.duration_ms = duration_ms
return result
def _generate_format_variants(self, text: str) -> List[str]:
"""Générer des variantes de format"""
variants = []
# Variantes de casse
variants.extend([
text.lower(),
text.upper(),
text.title(),
text.capitalize()
])
# Variantes d'espaces et ponctuation
variants.extend([
text.strip(),
text.replace(' ', ''),
text.replace('-', ' '),
text.replace('_', ' '),
re.sub(r'[^\w\s]', '', text), # Supprimer ponctuation
re.sub(r'\s+', ' ', text) # Normaliser espaces
])
# Supprimer doublons et texte vide
return list(filter(None, set(variants)))
def _generate_semantic_variants(self, text: str) -> List[str]:
"""Générer des variantes sémantiques"""
variants = []
text_lower = text.lower()
# Chercher des synonymes
for word, synonyms in self.synonyms.items():
if word in text_lower:
for synonym in synonyms:
variants.append(text_lower.replace(word, synonym))
# Variantes communes
common_replacements = {
'btn': 'button',
'button': 'btn',
'&': 'and',
'and': '&',
'ok': 'okay',
'okay': 'ok'
}
for old, new in common_replacements.items():
if old in text_lower:
variants.append(text_lower.replace(old, new))
return variants
def _generate_partial_variants(self, text: str) -> List[str]:
"""Générer des variantes de correspondance partielle"""
variants = []
words = text.split()
if len(words) > 1:
# Mots individuels
variants.extend(words)
# Combinaisons de mots
for i in range(len(words)):
for j in range(i + 1, len(words) + 1):
variants.append(' '.join(words[i:j]))
# Préfixes et suffixes
if len(text) > 3:
variants.extend([
text[:len(text)//2], # Première moitié
text[len(text)//2:], # Deuxième moitié
text[:3], # 3 premiers caractères
text[-3:] # 3 derniers caractères
])
return list(filter(lambda x: len(x) > 1, set(variants)))
class RetryWithBackoffStrategy(BaseRecoveryStrategy):
"""
Stratégie de retry avec backoff exponentiel pour NetworkError
Implémente un retry intelligent avec délais croissants pour les erreurs réseau:
- Backoff exponentiel avec jitter
- Détection de types d'erreurs réseau
- Adaptation du délai selon le type d'erreur
"""
def __init__(self, max_attempts: int = 5, base_delay: float = 1.0, max_delay: float = 60.0):
super().__init__(max_attempts)
self.strategy_type = RecoveryStrategyType.RETRY_WITH_BACKOFF
self.base_delay = base_delay
self.max_delay = max_delay
def can_handle(self, error_type: str, context: Dict[str, Any]) -> bool:
"""Peut gérer les erreurs réseau"""
network_errors = [
"NetworkError", "ConnectionError", "TimeoutError", "HTTPError",
"network_error", "connection_error", "timeout_error", "http_error"
]
return error_type in network_errors
def recover(self, context: RecoveryContext) -> RecoveryResult:
"""
Récupération par retry avec backoff
Calcule le délai approprié et recommande un retry après attente
"""
start_time = time.time()
try:
if context.attempt_number > self.max_attempts:
return RecoveryResult.failure_with_escalation(
self.strategy_type,
f"Max retry attempts reached ({self.max_attempts})",
"Network retry limit exceeded"
)
# Calculer délai avec backoff exponentiel
delay = min(
self.base_delay * (2 ** (context.attempt_number - 1)),
self.max_delay
)
# Ajouter jitter (±25%)
import random
jitter = delay * 0.25 * (random.random() - 0.5)
final_delay = max(0.1, delay + jitter)
# Analyser le type d'erreur pour adapter la stratégie
error_analysis = self._analyze_network_error(context.error_message)
duration_ms = (time.time() - start_time) * 1000
recovery_data = {
'delay_seconds': final_delay,
'attempt_number': context.attempt_number,
'base_delay': self.base_delay,
'calculated_delay': delay,
'jitter': jitter,
'error_analysis': error_analysis
}
message = f"Retry #{context.attempt_number} after {final_delay:.1f}s delay ({error_analysis['category']})"
result = RecoveryResult.success_with_retry(
self.strategy_type,
recovery_data,
message
)
result.duration_ms = duration_ms
self._log_recovery_attempt(context, result)
# Attendre le délai calculé
logger.info(f"Waiting {final_delay:.1f}s before retry...")
time.sleep(final_delay)
return result
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
logger.error(f"Retry with backoff strategy failed: {e}")
result = RecoveryResult.failure_with_escalation(
self.strategy_type,
f"Strategy execution error: {e}",
f"Backoff calculation failed: {e}"
)
result.duration_ms = duration_ms
return result
def _analyze_network_error(self, error_message: str) -> Dict[str, Any]:
"""Analyser le type d'erreur réseau pour adapter la stratégie"""
error_lower = error_message.lower()
# Catégoriser l'erreur
if any(term in error_lower for term in ['timeout', 'timed out']):
category = 'timeout'
severity = 'medium'
recommendation = 'Increase timeout and retry'
elif any(term in error_lower for term in ['connection refused', 'connection failed']):
category = 'connection_refused'
severity = 'high'
recommendation = 'Service may be down, longer backoff recommended'
elif any(term in error_lower for term in ['dns', 'name resolution']):
category = 'dns_error'
severity = 'high'
recommendation = 'DNS issue, check network connectivity'
elif any(term in error_lower for term in ['ssl', 'certificate', 'tls']):
category = 'ssl_error'
severity = 'high'
recommendation = 'SSL/TLS issue, may need manual intervention'
elif any(term in error_lower for term in ['500', '502', '503', '504']):
category = 'server_error'
severity = 'medium'
recommendation = 'Server error, retry with backoff'
elif any(term in error_lower for term in ['401', '403']):
category = 'auth_error'
severity = 'high'
recommendation = 'Authentication issue, check credentials'
else:
category = 'unknown_network'
severity = 'medium'
recommendation = 'Generic network error, standard retry'
return {
'category': category,
'severity': severity,
'recommendation': recommendation,
'original_message': error_message
}
class DataNormalizationStrategy(BaseRecoveryStrategy):
"""
Stratégie de normalisation des données pour ValidationError
Normalise et convertit les données pour résoudre les erreurs de validation:
- Conversion de types
- Normalisation de formats
- Nettoyage de données
- Validation et correction automatique
"""
def __init__(self, max_attempts: int = 3):
super().__init__(max_attempts)
self.strategy_type = RecoveryStrategyType.DATA_NORMALIZATION
def can_handle(self, error_type: str, context: Dict[str, Any]) -> bool:
"""Peut gérer les erreurs de validation"""
validation_errors = [
"ValidationError", "ValueError", "TypeError", "FormatError",
"validation_error", "value_error", "type_error", "format_error"
]
return error_type in validation_errors
def recover(self, context: RecoveryContext) -> RecoveryResult:
"""
Récupération par normalisation des données
Stratégies appliquées dans l'ordre:
1. Conversion de types automatique
2. Normalisation de formats
3. Nettoyage et correction de données
"""
start_time = time.time()
try:
# Extraire les données à normaliser
invalid_data = context.original_data.get('invalid_data')
expected_type = context.original_data.get('expected_type')
field_name = context.original_data.get('field_name', 'unknown')
if invalid_data is None:
return RecoveryResult.failure_with_escalation(
self.strategy_type,
"No invalid data provided",
"Cannot normalize data without input"
)
# Appliquer stratégie selon tentative
if context.attempt_number == 1:
normalized_data = self._type_conversion(invalid_data, expected_type)
message = f"Applied type conversion for field '{field_name}'"
elif context.attempt_number == 2:
normalized_data = self._format_normalization(invalid_data, expected_type)
message = f"Applied format normalization for field '{field_name}'"
elif context.attempt_number == 3:
normalized_data = self._data_cleaning(invalid_data, expected_type)
message = f"Applied data cleaning for field '{field_name}'"
else:
return RecoveryResult.failure_with_escalation(
self.strategy_type,
f"Max normalization attempts reached ({self.max_attempts})",
"All data normalization strategies exhausted"
)
duration_ms = (time.time() - start_time) * 1000
recovery_data = {
'original_data': invalid_data,
'normalized_data': normalized_data,
'field_name': field_name,
'expected_type': expected_type,
'normalization_type': f'attempt_{context.attempt_number}'
}
result = RecoveryResult.success_with_retry(
self.strategy_type,
recovery_data,
message
)
result.duration_ms = duration_ms
self._log_recovery_attempt(context, result)
return result
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
logger.error(f"Data normalization strategy failed: {e}")
result = RecoveryResult.failure_with_escalation(
self.strategy_type,
f"Strategy execution error: {e}",
f"Data normalization failed: {e}"
)
result.duration_ms = duration_ms
return result
def _type_conversion(self, data: Any, expected_type: Optional[str]) -> Any:
"""Conversion de type automatique"""
if expected_type is None:
return data
try:
if expected_type == 'int':
if isinstance(data, str):
# Nettoyer les caractères non-numériques
cleaned = re.sub(r'[^\d.-]', '', data)
return int(float(cleaned)) if cleaned else 0
return int(data)
elif expected_type == 'float':
if isinstance(data, str):
cleaned = re.sub(r'[^\d.-]', '', data)
return float(cleaned) if cleaned else 0.0
return float(data)
elif expected_type == 'str':
return str(data)
elif expected_type == 'bool':
if isinstance(data, str):
return data.lower() in ['true', '1', 'yes', 'on', 'enabled']
return bool(data)
elif expected_type == 'datetime':
from datetime import datetime
if isinstance(data, str):
# Essayer plusieurs formats de date
formats = [
'%Y-%m-%d %H:%M:%S',
'%Y-%m-%d',
'%d/%m/%Y',
'%m/%d/%Y',
'%Y-%m-%dT%H:%M:%S'
]
for fmt in formats:
try:
return datetime.strptime(data, fmt)
except ValueError:
continue
return data
except (ValueError, TypeError) as e:
logger.warning(f"Type conversion failed: {e}")
return data
def _format_normalization(self, data: Any, expected_type: Optional[str]) -> Any:
"""Normalisation de format"""
if not isinstance(data, str):
return data
# Normalisation générale des chaînes
normalized = data.strip()
# Normalisation spécifique selon le type attendu
if expected_type == 'email':
normalized = normalized.lower()
elif expected_type == 'phone':
# Supprimer tous les caractères non-numériques sauf +
normalized = re.sub(r'[^\d+]', '', normalized)
elif expected_type == 'url':
if not normalized.startswith(('http://', 'https://')):
normalized = 'https://' + normalized
elif expected_type in ['bbox', 'coordinates']:
# Normaliser les coordonnées au format (x,y,w,h)
numbers = re.findall(r'-?\d+\.?\d*', normalized)
if len(numbers) >= 4:
normalized = f"({numbers[0]},{numbers[1]},{numbers[2]},{numbers[3]})"
return normalized
def _data_cleaning(self, data: Any, expected_type: Optional[str]) -> Any:
"""Nettoyage et correction de données"""
if isinstance(data, str):
# Nettoyage général
cleaned = data.strip()
# Supprimer caractères de contrôle
cleaned = re.sub(r'[\x00-\x1f\x7f-\x9f]', '', cleaned)
# Normaliser espaces multiples
cleaned = re.sub(r'\s+', ' ', cleaned)
# Corrections spécifiques
if expected_type == 'text':
# Corriger encodage commun
replacements = {
'’': "'",
'“': '"',
'â€': '"',
'…': '...'
}
for old, new in replacements.items():
cleaned = cleaned.replace(old, new)
return cleaned
return data
# Factory pour créer les stratégies
class RecoveryStrategyFactory:
"""Factory pour créer les stratégies de récupération appropriées"""
@staticmethod
def create_strategies() -> List[BaseRecoveryStrategy]:
"""Créer toutes les stratégies de récupération disponibles"""
return [
SpatialFallbackStrategy(),
SemanticVariantStrategy(),
RetryWithBackoffStrategy(),
DataNormalizationStrategy()
]
@staticmethod
def get_strategy_for_error(error_type: str, context: Dict[str, Any]) -> Optional[BaseRecoveryStrategy]:
"""Obtenir la stratégie appropriée pour un type d'erreur"""
strategies = RecoveryStrategyFactory.create_strategies()
for strategy in strategies:
if strategy.can_handle(error_type, context):
return strategy
return None
if __name__ == '__main__':
# Test des stratégies
logging.basicConfig(level=logging.INFO)
# Test SpatialFallbackStrategy
spatial_strategy = SpatialFallbackStrategy()
context = RecoveryContext(
error_type="TargetNotFoundError",
error_message="Target not found",
original_data={'target': {'bbox': {'x': 100, 'y': 100, 'width': 50, 'height': 20}}},
attempt_number=1,
max_attempts=3,
timestamp=datetime.now(),
additional_context={'screen_state': 'mock_state'}
)
result = spatial_strategy.recover(context)
print(f"Spatial strategy result: {result}")
# Test SemanticVariantStrategy
semantic_strategy = SemanticVariantStrategy()
context.error_type = "UIElementChangedError"
context.original_data = {'text_pattern': 'Submit Button'}
result = semantic_strategy.recover(context)
print(f"Semantic strategy result: {result}")