- Frontend v4 accessible sur réseau local (192.168.1.40) - Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard) - Ollama GPU fonctionnel - Self-healing interactif - Dashboard confiance Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
719 lines
24 KiB
Python
719 lines
24 KiB
Python
"""
|
|
ExecutionRobustness - Robustesse d'exécution avec retry et récupération
|
|
|
|
Ce module ajoute:
|
|
- Retry avec backoff exponentiel
|
|
- Attente d'élément avec re-détection
|
|
- Récupération d'état après échec
|
|
- Gestion d'écran inconnu
|
|
- Diagnostics détaillés d'échec
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
from typing import Optional, Dict, Any, Callable, List, Tuple
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# =============================================================================
|
|
# Dataclasses
|
|
# =============================================================================
|
|
|
|
@dataclass
|
|
class RetryConfig:
|
|
"""Configuration des retries"""
|
|
max_retries: int = 3
|
|
base_delay_ms: float = 1000.0 # Délai de base en ms
|
|
max_delay_ms: float = 30000.0 # Délai max en ms
|
|
exponential_base: float = 2.0 # Base pour backoff exponentiel
|
|
jitter_factor: float = 0.1 # Facteur de jitter (0-1)
|
|
|
|
|
|
@dataclass
|
|
class WaitConfig:
|
|
"""Configuration de l'attente d'élément"""
|
|
timeout_ms: float = 10000.0 # Timeout total
|
|
poll_interval_ms: float = 500.0 # Intervalle de re-détection
|
|
min_confidence: float = 0.7 # Confiance minimum
|
|
|
|
|
|
@dataclass
|
|
class RecoveryConfig:
|
|
"""Configuration de la récupération"""
|
|
enable_state_recovery: bool = True
|
|
max_recovery_attempts: int = 3
|
|
recovery_timeout_ms: float = 30000.0
|
|
|
|
|
|
@dataclass
|
|
class RetryResult:
|
|
"""Résultat d'une opération avec retry"""
|
|
success: bool
|
|
attempts: int
|
|
total_delay_ms: float
|
|
last_error: Optional[Exception] = None
|
|
result: Any = None
|
|
delays_used: List[float] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
|
|
class WaitResult:
|
|
"""Résultat d'une attente d'élément"""
|
|
found: bool
|
|
element: Any = None
|
|
confidence: float = 0.0
|
|
wait_time_ms: float = 0.0
|
|
detection_attempts: int = 0
|
|
|
|
|
|
@dataclass
|
|
class RecoveryResult:
|
|
"""Résultat d'une tentative de récupération"""
|
|
recovered: bool
|
|
new_node_id: Optional[str] = None
|
|
recovery_path: List[str] = field(default_factory=list)
|
|
message: str = ""
|
|
|
|
|
|
@dataclass
|
|
class FailureDiagnostics:
|
|
"""Diagnostics détaillés d'un échec"""
|
|
failure_type: str
|
|
timestamp: datetime
|
|
screenshot_path: Optional[str] = None
|
|
match_scores: Dict[str, float] = field(default_factory=dict)
|
|
attempted_strategies: List[str] = field(default_factory=list)
|
|
context: Dict[str, Any] = field(default_factory=dict)
|
|
recommendations: List[str] = field(default_factory=list)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {
|
|
"failure_type": self.failure_type,
|
|
"timestamp": self.timestamp.isoformat(),
|
|
"screenshot_path": self.screenshot_path,
|
|
"match_scores": self.match_scores,
|
|
"attempted_strategies": self.attempted_strategies,
|
|
"context": self.context,
|
|
"recommendations": self.recommendations
|
|
}
|
|
|
|
|
|
class FailureType(Enum):
|
|
"""Types d'échec"""
|
|
ACTION_FAILED = "action_failed"
|
|
ELEMENT_NOT_FOUND = "element_not_found"
|
|
STATE_MISMATCH = "state_mismatch"
|
|
UNKNOWN_SCREEN = "unknown_screen"
|
|
TIMEOUT = "timeout"
|
|
NETWORK_ERROR = "network_error"
|
|
|
|
|
|
# =============================================================================
|
|
# Gestionnaire de Retry
|
|
# =============================================================================
|
|
|
|
class RetryManager:
|
|
"""
|
|
Gestionnaire de retry avec backoff exponentiel.
|
|
|
|
Formule: delay = base_delay * (exponential_base ^ (attempt - 1))
|
|
|
|
Example:
|
|
>>> manager = RetryManager()
|
|
>>> result = manager.execute_with_retry(my_function, args)
|
|
"""
|
|
|
|
def __init__(self, config: Optional[RetryConfig] = None):
|
|
"""
|
|
Initialiser le gestionnaire.
|
|
|
|
Args:
|
|
config: Configuration des retries
|
|
"""
|
|
self.config = config or RetryConfig()
|
|
logger.info(f"RetryManager initialisé (max_retries={self.config.max_retries})")
|
|
|
|
def execute_with_retry(
|
|
self,
|
|
func: Callable,
|
|
*args,
|
|
on_retry: Optional[Callable[[int, Exception], None]] = None,
|
|
**kwargs
|
|
) -> RetryResult:
|
|
"""
|
|
Exécuter une fonction avec retry et backoff exponentiel.
|
|
|
|
Args:
|
|
func: Fonction à exécuter
|
|
*args: Arguments positionnels
|
|
on_retry: Callback appelé à chaque retry
|
|
**kwargs: Arguments nommés
|
|
|
|
Returns:
|
|
RetryResult avec résultat ou erreur
|
|
"""
|
|
attempts = 0
|
|
total_delay = 0.0
|
|
delays_used = []
|
|
last_error = None
|
|
|
|
while attempts <= self.config.max_retries:
|
|
attempts += 1
|
|
|
|
try:
|
|
result = func(*args, **kwargs)
|
|
return RetryResult(
|
|
success=True,
|
|
attempts=attempts,
|
|
total_delay_ms=total_delay,
|
|
result=result,
|
|
delays_used=delays_used
|
|
)
|
|
|
|
except Exception as e:
|
|
last_error = e
|
|
logger.warning(f"Tentative {attempts} échouée: {e}")
|
|
|
|
if attempts > self.config.max_retries:
|
|
break
|
|
|
|
# Calculer délai avec backoff exponentiel
|
|
delay = self.compute_delay(attempts)
|
|
delays_used.append(delay)
|
|
total_delay += delay
|
|
|
|
# Callback de retry
|
|
if on_retry:
|
|
try:
|
|
on_retry(attempts, e)
|
|
except Exception as cb_error:
|
|
logger.warning(f"Erreur callback retry: {cb_error}")
|
|
|
|
# Attendre
|
|
time.sleep(delay / 1000.0)
|
|
|
|
return RetryResult(
|
|
success=False,
|
|
attempts=attempts,
|
|
total_delay_ms=total_delay,
|
|
last_error=last_error,
|
|
delays_used=delays_used
|
|
)
|
|
|
|
def compute_delay(self, attempt: int) -> float:
|
|
"""
|
|
Calculer le délai pour une tentative donnée.
|
|
|
|
Formule: base_delay * (exponential_base ^ (attempt - 1)) + jitter
|
|
|
|
Args:
|
|
attempt: Numéro de tentative (1-based)
|
|
|
|
Returns:
|
|
Délai en millisecondes
|
|
"""
|
|
# Backoff exponentiel
|
|
delay = self.config.base_delay_ms * (
|
|
self.config.exponential_base ** (attempt - 1)
|
|
)
|
|
|
|
# Appliquer jitter
|
|
import random
|
|
jitter = delay * self.config.jitter_factor * random.random()
|
|
delay += jitter
|
|
|
|
# Plafonner au max
|
|
delay = min(delay, self.config.max_delay_ms)
|
|
|
|
return delay
|
|
|
|
def get_expected_delays(self) -> List[float]:
|
|
"""
|
|
Obtenir les délais attendus pour chaque tentative.
|
|
|
|
Returns:
|
|
Liste des délais en ms
|
|
"""
|
|
delays = []
|
|
for attempt in range(1, self.config.max_retries + 1):
|
|
delay = self.config.base_delay_ms * (
|
|
self.config.exponential_base ** (attempt - 1)
|
|
)
|
|
delay = min(delay, self.config.max_delay_ms)
|
|
delays.append(delay)
|
|
return delays
|
|
|
|
|
|
# =============================================================================
|
|
# Gestionnaire d'Attente d'Élément
|
|
# =============================================================================
|
|
|
|
class ElementWaiter:
|
|
"""
|
|
Gestionnaire d'attente d'élément avec re-détection périodique.
|
|
|
|
Example:
|
|
>>> waiter = ElementWaiter()
|
|
>>> result = waiter.wait_for_element(detector, target_spec)
|
|
"""
|
|
|
|
def __init__(self, config: Optional[WaitConfig] = None):
|
|
"""
|
|
Initialiser le gestionnaire.
|
|
|
|
Args:
|
|
config: Configuration de l'attente
|
|
"""
|
|
self.config = config or WaitConfig()
|
|
logger.info(f"ElementWaiter initialisé (timeout={self.config.timeout_ms}ms)")
|
|
|
|
def wait_for_element(
|
|
self,
|
|
detect_func: Callable[[], Optional[Any]],
|
|
confidence_func: Optional[Callable[[Any], float]] = None,
|
|
on_poll: Optional[Callable[[int], None]] = None
|
|
) -> WaitResult:
|
|
"""
|
|
Attendre qu'un élément soit détecté.
|
|
|
|
Args:
|
|
detect_func: Fonction de détection (retourne élément ou None)
|
|
confidence_func: Fonction pour obtenir la confiance
|
|
on_poll: Callback à chaque tentative de détection
|
|
|
|
Returns:
|
|
WaitResult avec élément trouvé ou timeout
|
|
"""
|
|
start_time = time.time()
|
|
attempts = 0
|
|
|
|
while True:
|
|
attempts += 1
|
|
elapsed_ms = (time.time() - start_time) * 1000
|
|
|
|
# Vérifier timeout
|
|
if elapsed_ms >= self.config.timeout_ms:
|
|
return WaitResult(
|
|
found=False,
|
|
wait_time_ms=elapsed_ms,
|
|
detection_attempts=attempts
|
|
)
|
|
|
|
# Callback de poll
|
|
if on_poll:
|
|
try:
|
|
on_poll(attempts)
|
|
except Exception as e:
|
|
logger.warning(f"Erreur callback poll: {e}")
|
|
|
|
# Tenter détection
|
|
try:
|
|
element = detect_func()
|
|
|
|
if element is not None:
|
|
# Vérifier confiance si fonction fournie
|
|
confidence = 1.0
|
|
if confidence_func:
|
|
confidence = confidence_func(element)
|
|
|
|
if confidence >= self.config.min_confidence:
|
|
return WaitResult(
|
|
found=True,
|
|
element=element,
|
|
confidence=confidence,
|
|
wait_time_ms=elapsed_ms,
|
|
detection_attempts=attempts
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.debug(f"Erreur détection (tentative {attempts}): {e}")
|
|
|
|
# Attendre avant prochaine tentative
|
|
time.sleep(self.config.poll_interval_ms / 1000.0)
|
|
|
|
|
|
# =============================================================================
|
|
# Gestionnaire de Récupération d'État
|
|
# =============================================================================
|
|
|
|
class StateRecoveryManager:
|
|
"""
|
|
Gestionnaire de récupération d'état après échec.
|
|
|
|
Tente de re-matcher l'état actuel vers le graphe de workflow
|
|
et trouve un chemin de récupération si possible.
|
|
|
|
Example:
|
|
>>> recovery = StateRecoveryManager(pipeline)
|
|
>>> result = recovery.attempt_recovery(workflow_id, screenshot)
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
pipeline: Any,
|
|
config: Optional[RecoveryConfig] = None
|
|
):
|
|
"""
|
|
Initialiser le gestionnaire.
|
|
|
|
Args:
|
|
pipeline: WorkflowPipeline pour matching
|
|
config: Configuration de récupération
|
|
"""
|
|
self.pipeline = pipeline
|
|
self.config = config or RecoveryConfig()
|
|
logger.info("StateRecoveryManager initialisé")
|
|
|
|
def attempt_recovery(
|
|
self,
|
|
workflow_id: str,
|
|
screenshot_path: str,
|
|
expected_node_id: Optional[str] = None
|
|
) -> RecoveryResult:
|
|
"""
|
|
Tenter de récupérer après un échec.
|
|
|
|
Args:
|
|
workflow_id: ID du workflow
|
|
screenshot_path: Chemin du screenshot actuel
|
|
expected_node_id: Node attendu (optionnel)
|
|
|
|
Returns:
|
|
RecoveryResult avec nouveau node ou échec
|
|
"""
|
|
if not self.config.enable_state_recovery:
|
|
return RecoveryResult(
|
|
recovered=False,
|
|
message="State recovery disabled"
|
|
)
|
|
|
|
logger.info(f"Tentative de récupération pour workflow {workflow_id}")
|
|
|
|
for attempt in range(self.config.max_recovery_attempts):
|
|
try:
|
|
# Re-matcher l'état actuel
|
|
match = self.pipeline.match_current_state(
|
|
screenshot_path,
|
|
workflow_id=workflow_id
|
|
)
|
|
|
|
if match and match.get("confidence", 0) > 0.5:
|
|
new_node_id = match["node_id"]
|
|
|
|
# Vérifier si c'est un node valide
|
|
workflow = self.pipeline.load_workflow(workflow_id)
|
|
if workflow and any(n.node_id == new_node_id for n in workflow.nodes):
|
|
|
|
# Trouver chemin de récupération si node différent
|
|
recovery_path = []
|
|
if expected_node_id and new_node_id != expected_node_id:
|
|
recovery_path = self._find_recovery_path(
|
|
workflow, new_node_id, expected_node_id
|
|
)
|
|
|
|
return RecoveryResult(
|
|
recovered=True,
|
|
new_node_id=new_node_id,
|
|
recovery_path=recovery_path,
|
|
message=f"Récupéré vers node {new_node_id}"
|
|
)
|
|
|
|
# Attendre avant prochaine tentative
|
|
time.sleep(1.0)
|
|
|
|
except Exception as e:
|
|
logger.warning(f"Erreur récupération (tentative {attempt + 1}): {e}")
|
|
|
|
return RecoveryResult(
|
|
recovered=False,
|
|
message="Échec de récupération après toutes les tentatives"
|
|
)
|
|
|
|
def _find_recovery_path(
|
|
self,
|
|
workflow: Any,
|
|
from_node: str,
|
|
to_node: str
|
|
) -> List[str]:
|
|
"""Trouver un chemin entre deux nodes (BFS)."""
|
|
from collections import deque
|
|
|
|
edges = getattr(workflow, 'edges', [])
|
|
|
|
# Construire graphe d'adjacence
|
|
adjacency = {}
|
|
for edge in edges:
|
|
if edge.from_node not in adjacency:
|
|
adjacency[edge.from_node] = []
|
|
adjacency[edge.from_node].append(edge.to_node)
|
|
|
|
# BFS
|
|
queue = deque([(from_node, [from_node])])
|
|
visited = {from_node}
|
|
|
|
while queue:
|
|
current, path = queue.popleft()
|
|
|
|
if current == to_node:
|
|
return path
|
|
|
|
for neighbor in adjacency.get(current, []):
|
|
if neighbor not in visited:
|
|
visited.add(neighbor)
|
|
queue.append((neighbor, path + [neighbor]))
|
|
|
|
return [] # Pas de chemin trouvé
|
|
|
|
|
|
# =============================================================================
|
|
# Gestionnaire de Diagnostics
|
|
# =============================================================================
|
|
|
|
class DiagnosticsManager:
|
|
"""
|
|
Gestionnaire de diagnostics détaillés d'échec.
|
|
|
|
Collecte et enregistre les informations de diagnostic
|
|
pour faciliter le débogage.
|
|
|
|
Example:
|
|
>>> diagnostics = DiagnosticsManager()
|
|
>>> report = diagnostics.create_failure_report(...)
|
|
"""
|
|
|
|
def __init__(self, logs_dir: str = "data/diagnostics"):
|
|
"""
|
|
Initialiser le gestionnaire.
|
|
|
|
Args:
|
|
logs_dir: Répertoire pour les logs de diagnostic
|
|
"""
|
|
self.logs_dir = Path(logs_dir)
|
|
self.logs_dir.mkdir(parents=True, exist_ok=True)
|
|
self._failure_history: List[FailureDiagnostics] = []
|
|
logger.info(f"DiagnosticsManager initialisé: {logs_dir}")
|
|
|
|
def create_failure_report(
|
|
self,
|
|
failure_type: FailureType,
|
|
screenshot_path: Optional[str] = None,
|
|
match_scores: Optional[Dict[str, float]] = None,
|
|
attempted_strategies: Optional[List[str]] = None,
|
|
context: Optional[Dict[str, Any]] = None
|
|
) -> FailureDiagnostics:
|
|
"""
|
|
Créer un rapport de diagnostic d'échec.
|
|
|
|
Args:
|
|
failure_type: Type d'échec
|
|
screenshot_path: Chemin du screenshot
|
|
match_scores: Scores de matching par node
|
|
attempted_strategies: Stratégies tentées
|
|
context: Contexte additionnel
|
|
|
|
Returns:
|
|
FailureDiagnostics avec recommandations
|
|
"""
|
|
# Générer recommandations basées sur le type d'échec
|
|
recommendations = self._generate_recommendations(
|
|
failure_type, match_scores, context
|
|
)
|
|
|
|
diagnostics = FailureDiagnostics(
|
|
failure_type=failure_type.value,
|
|
timestamp=datetime.now(),
|
|
screenshot_path=screenshot_path,
|
|
match_scores=match_scores or {},
|
|
attempted_strategies=attempted_strategies or [],
|
|
context=context or {},
|
|
recommendations=recommendations
|
|
)
|
|
|
|
# Enregistrer dans l'historique
|
|
self._failure_history.append(diagnostics)
|
|
|
|
# Sauvegarder sur disque
|
|
self._save_diagnostics(diagnostics)
|
|
|
|
logger.info(f"Diagnostic créé: {failure_type.value}")
|
|
return diagnostics
|
|
|
|
def _generate_recommendations(
|
|
self,
|
|
failure_type: FailureType,
|
|
match_scores: Optional[Dict[str, float]],
|
|
context: Optional[Dict[str, Any]]
|
|
) -> List[str]:
|
|
"""Générer des recommandations basées sur l'échec."""
|
|
recommendations = []
|
|
|
|
if failure_type == FailureType.ELEMENT_NOT_FOUND:
|
|
recommendations.append("Vérifier que l'élément cible est visible à l'écran")
|
|
recommendations.append("Augmenter le timeout d'attente")
|
|
recommendations.append("Vérifier les sélecteurs de l'élément")
|
|
|
|
elif failure_type == FailureType.STATE_MISMATCH:
|
|
recommendations.append("L'écran actuel ne correspond pas à l'état attendu")
|
|
if match_scores:
|
|
best_match = max(match_scores.items(), key=lambda x: x[1])
|
|
recommendations.append(f"Meilleur match: {best_match[0]} ({best_match[1]:.2%})")
|
|
recommendations.append("Considérer l'ajout d'une variante pour ce nouvel état")
|
|
|
|
elif failure_type == FailureType.UNKNOWN_SCREEN:
|
|
recommendations.append("Écran non reconnu dans le workflow")
|
|
recommendations.append("Vérifier si une popup ou modal bloque l'écran")
|
|
recommendations.append("Considérer l'entraînement avec ce nouvel écran")
|
|
|
|
elif failure_type == FailureType.ACTION_FAILED:
|
|
recommendations.append("L'action n'a pas pu être exécutée")
|
|
recommendations.append("Vérifier que l'élément cible est cliquable")
|
|
recommendations.append("Vérifier les permissions de l'application")
|
|
|
|
elif failure_type == FailureType.TIMEOUT:
|
|
recommendations.append("Opération expirée")
|
|
recommendations.append("Augmenter les timeouts de configuration")
|
|
recommendations.append("Vérifier la réactivité de l'application")
|
|
|
|
return recommendations
|
|
|
|
def _save_diagnostics(self, diagnostics: FailureDiagnostics) -> None:
|
|
"""Sauvegarder les diagnostics sur disque."""
|
|
import json
|
|
|
|
filename = f"failure_{diagnostics.timestamp.strftime('%Y%m%d_%H%M%S')}.json"
|
|
filepath = self.logs_dir / filename
|
|
|
|
with open(filepath, 'w') as f:
|
|
json.dump(diagnostics.to_dict(), f, indent=2)
|
|
|
|
def get_failure_history(self) -> List[FailureDiagnostics]:
|
|
"""Obtenir l'historique des échecs."""
|
|
return self._failure_history
|
|
|
|
def get_failure_stats(self) -> Dict[str, int]:
|
|
"""Obtenir les statistiques d'échec par type."""
|
|
stats = {}
|
|
for diag in self._failure_history:
|
|
stats[diag.failure_type] = stats.get(diag.failure_type, 0) + 1
|
|
return stats
|
|
|
|
|
|
# =============================================================================
|
|
# Classe principale de robustesse
|
|
# =============================================================================
|
|
|
|
class ExecutionRobustness:
|
|
"""
|
|
Classe principale regroupant toutes les fonctionnalités de robustesse.
|
|
|
|
Example:
|
|
>>> robustness = ExecutionRobustness(pipeline)
|
|
>>> result = robustness.execute_with_robustness(action_func)
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
pipeline: Any,
|
|
retry_config: Optional[RetryConfig] = None,
|
|
wait_config: Optional[WaitConfig] = None,
|
|
recovery_config: Optional[RecoveryConfig] = None
|
|
):
|
|
"""
|
|
Initialiser la robustesse d'exécution.
|
|
|
|
Args:
|
|
pipeline: WorkflowPipeline
|
|
retry_config: Configuration des retries
|
|
wait_config: Configuration de l'attente
|
|
recovery_config: Configuration de la récupération
|
|
"""
|
|
self.pipeline = pipeline
|
|
self.retry_manager = RetryManager(retry_config)
|
|
self.element_waiter = ElementWaiter(wait_config)
|
|
self.state_recovery = StateRecoveryManager(pipeline, recovery_config)
|
|
self.diagnostics = DiagnosticsManager()
|
|
|
|
logger.info("ExecutionRobustness initialisé")
|
|
|
|
def execute_with_robustness(
|
|
self,
|
|
action_func: Callable,
|
|
workflow_id: str,
|
|
screenshot_path: str,
|
|
*args,
|
|
**kwargs
|
|
) -> Tuple[bool, Any, Optional[FailureDiagnostics]]:
|
|
"""
|
|
Exécuter une action avec toutes les protections de robustesse.
|
|
|
|
Args:
|
|
action_func: Fonction d'action à exécuter
|
|
workflow_id: ID du workflow
|
|
screenshot_path: Chemin du screenshot actuel
|
|
*args, **kwargs: Arguments pour action_func
|
|
|
|
Returns:
|
|
Tuple (success, result, diagnostics)
|
|
"""
|
|
# Tentative avec retry
|
|
retry_result = self.retry_manager.execute_with_retry(
|
|
action_func, *args, **kwargs
|
|
)
|
|
|
|
if retry_result.success:
|
|
return True, retry_result.result, None
|
|
|
|
# Échec - créer diagnostics
|
|
diagnostics = self.diagnostics.create_failure_report(
|
|
failure_type=FailureType.ACTION_FAILED,
|
|
screenshot_path=screenshot_path,
|
|
context={
|
|
"attempts": retry_result.attempts,
|
|
"total_delay_ms": retry_result.total_delay_ms,
|
|
"error": str(retry_result.last_error)
|
|
}
|
|
)
|
|
|
|
# Tenter récupération
|
|
recovery_result = self.state_recovery.attempt_recovery(
|
|
workflow_id, screenshot_path
|
|
)
|
|
|
|
if recovery_result.recovered:
|
|
logger.info(f"Récupération réussie: {recovery_result.new_node_id}")
|
|
return False, recovery_result, diagnostics
|
|
|
|
return False, None, diagnostics
|
|
|
|
|
|
# =============================================================================
|
|
# Fonctions utilitaires
|
|
# =============================================================================
|
|
|
|
def create_robustness(
|
|
pipeline: Any,
|
|
max_retries: int = 3,
|
|
base_delay_ms: float = 1000.0
|
|
) -> ExecutionRobustness:
|
|
"""
|
|
Créer une instance de robustesse avec configuration personnalisée.
|
|
|
|
Args:
|
|
pipeline: WorkflowPipeline
|
|
max_retries: Nombre max de retries
|
|
base_delay_ms: Délai de base
|
|
|
|
Returns:
|
|
ExecutionRobustness configuré
|
|
"""
|
|
retry_config = RetryConfig(
|
|
max_retries=max_retries,
|
|
base_delay_ms=base_delay_ms
|
|
)
|
|
return ExecutionRobustness(pipeline, retry_config=retry_config)
|