fix(agent_chat): Corriger intégration exécution réelle

- Importer les vraies classes Action, TargetSpec, WorkflowEdge, ActionType
- Convertir le type d'action en ActionType Enum au lieu de string
- Créer un ScreenState complet avec tous les niveaux (raw, perception, context)
- Corriger _serialize_state dans error_handler.py pour accès compatibles
- Ajouter import os pour manipulation des fichiers
- Sauvegarder les screenshots dans data/temp/

L'exécution réelle fonctionne maintenant - les erreurs "Target not found"
sont attendues car il faut une vraie interface utilisateur à l'écran.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Dom
2026-01-16 17:43:30 +01:00
parent 87f2671920
commit 152431803e
2 changed files with 969 additions and 49 deletions

View File

@@ -22,6 +22,7 @@ Auteur: Dom - Janvier 2026
import asyncio
import json
import logging
import os
import sys
from pathlib import Path
from datetime import datetime
@@ -517,6 +518,12 @@ def execute_workflow_from_confirmation(confirmation, session_id):
# Utiliser les paramètres confirmés (ou modifiés)
params = confirmation.modified_parameters or confirmation.parameters
# IMPORTANT: Marquer l'exécution comme active
execution_status["running"] = True
execution_status["workflow"] = confirmation.workflow_name
execution_status["progress"] = 0
execution_status["message"] = "Démarrage..."
execute_workflow(match, params)
@@ -704,31 +711,70 @@ def execute_workflow(match, params):
def _execute_workflow_real(workflow_data, edges, total_steps, params):
"""Exécution réelle avec ActionExecutor."""
import time
from dataclasses import dataclass
import tempfile
import os
from PIL import Image
# Capturer l'écran initial
update_progress(20, "Capture écran initial...", 2, total_steps + 2)
try:
screenshot_path = screen_capturer.capture()
screenshot_array = screen_capturer.capture()
if screenshot_array is None:
raise Exception("Capture retourne None")
# Sauvegarder le screenshot dans un fichier temporaire
temp_dir = Path("data/temp")
temp_dir.mkdir(parents=True, exist_ok=True)
screenshot_path = str(temp_dir / f"capture_{datetime.now().strftime('%H%M%S')}.png")
# Convertir numpy array en image et sauvegarder
img = Image.fromarray(screenshot_array)
img.save(screenshot_path)
logger.info(f"📸 Screenshot capturé: {screenshot_path}")
except Exception as e:
logger.warning(f"Capture écran échouée: {e}, utilisation mode dégradé")
_execute_workflow_simulated(edges, total_steps)
return
# Créer le ScreenState pour l'exécution
# Créer le ScreenState complet pour l'exécution
try:
from core.models import ScreenState
screen_state = ScreenState.from_screenshot(screenshot_path)
from core.models.screen_state import (
ScreenState, WindowContext, RawLevel, PerceptionLevel,
ContextLevel, EmbeddingRef
)
file_size = os.path.getsize(screenshot_path) if os.path.exists(screenshot_path) else 0
screen_state = ScreenState(
screen_state_id=f"agent_{datetime.now().strftime('%Y%m%d_%H%M%S')}",
timestamp=datetime.now(),
session_id="agent_chat",
window=WindowContext(
app_name="unknown",
window_title="Unknown",
screen_resolution=[1920, 1080],
workspace="main"
),
raw=RawLevel(
screenshot_path=screenshot_path,
capture_method="agent_chat",
file_size_bytes=file_size
),
perception=PerceptionLevel(
embedding=EmbeddingRef(provider="", vector_id="", dimensions=512),
detected_text=[],
text_detection_method="none",
confidence_avg=0.0
),
context=ContextLevel(),
ui_elements=[]
)
logger.info("✓ ScreenState complet créé")
except Exception as e:
logger.warning(f"Création ScreenState échouée: {e}")
# Créer un ScreenState minimal
screen_state = type('ScreenState', (), {
'screenshot_path': screenshot_path,
'detected_elements': [],
'timestamp': datetime.now()
})()
logger.warning(f"Création ScreenState échouée: {e}, utilisation mode simulé")
_execute_workflow_simulated(edges, total_steps)
return
# Exécuter chaque edge avec ActionExecutor
success_count = 0
@@ -762,9 +808,14 @@ def _execute_workflow_real(workflow_data, edges, total_steps, params):
# Recapturer l'écran après chaque action réussie
try:
time.sleep(0.3) # Petit délai pour laisser l'UI se mettre à jour
screenshot_path = screen_capturer.capture()
screen_state = ScreenState.from_screenshot(screenshot_path)
except:
new_screenshot = screen_capturer.capture()
if new_screenshot is not None:
new_path = str(temp_dir / f"capture_{datetime.now().strftime('%H%M%S_%f')}.png")
Image.fromarray(new_screenshot).save(new_path)
screen_state.raw.screenshot_path = new_path
screen_state.raw.file_size_bytes = os.path.getsize(new_path)
except Exception as recapture_err:
logger.debug(f"Recapture échouée: {recapture_err}")
pass # Continuer même si la recapture échoue
else:
fail_count += 1
@@ -812,46 +863,104 @@ def _create_workflow_edge(edge_dict, params):
from dataclasses import dataclass, field
from typing import Optional, Dict, Any, List
@dataclass
class Action:
type: str
target: Optional[Dict] = None
value: Optional[str] = None
parameters: Dict[str, Any] = field(default_factory=dict)
@dataclass
class WorkflowEdge:
id: str
source: str
target: str
action: Action
pre_conditions: List[Dict] = field(default_factory=list)
post_conditions: List[Dict] = field(default_factory=list)
# Importer les vraies classes du système
try:
from core.models.workflow_graph import Action, TargetSpec, WorkflowEdge, EdgeConstraints, PostConditions, ActionType
USE_REAL_CLASSES = True
except ImportError:
USE_REAL_CLASSES = False
action_dict = edge_dict.get("action", {})
target_dict = action_dict.get("target", {})
# Substituer les paramètres dans l'action
action_value = action_dict.get("value", "")
if action_value and isinstance(action_value, str):
# Substituer les paramètres dans le texte cible
target_text = target_dict.get("text", "")
if target_text and isinstance(target_text, str):
for key, val in params.items():
action_value = action_value.replace(f"${{{key}}}", str(val))
action_value = action_value.replace(f"${key}", str(val))
target_text = target_text.replace(f"{{{{{key}}}}}", str(val)) # {{key}}
target_text = target_text.replace(f"${{{key}}}", str(val)) # ${key}
target_text = target_text.replace(f"${key}", str(val)) # $key
action = Action(
type=action_dict.get("type", "unknown"),
target=action_dict.get("target"),
value=action_value,
parameters=action_dict.get("parameters", {})
)
if USE_REAL_CLASSES:
# Créer le TargetSpec avec les vraies classes
target_spec = TargetSpec(
by_role=target_dict.get("role"),
by_text=target_text if target_text else target_dict.get("text"),
by_position=target_dict.get("position"),
)
return WorkflowEdge(
id=edge_dict.get("id", f"edge_{id(edge_dict)}"),
source=edge_dict.get("source", ""),
target=edge_dict.get("target", ""),
action=action,
pre_conditions=edge_dict.get("pre_conditions", []),
post_conditions=edge_dict.get("post_conditions", [])
)
# Convertir le type d'action en ActionType Enum
action_type_str = action_dict.get("type", "mouse_click")
try:
action_type = ActionType(action_type_str)
except ValueError:
# Fallback si le type n'existe pas dans l'enum
action_type = ActionType.MOUSE_CLICK
# Créer l'Action
action = Action(
type=action_type,
target=target_spec,
parameters=action_dict.get("parameters", {})
)
# Créer le WorkflowEdge
return WorkflowEdge(
edge_id=edge_dict.get("edge_id", edge_dict.get("id", f"edge_{id(edge_dict)}")),
from_node=edge_dict.get("source", edge_dict.get("from_node", "")),
to_node=edge_dict.get("target", edge_dict.get("to_node", "")),
action=action,
constraints=EdgeConstraints(),
post_conditions=PostConditions()
)
else:
# Fallback: classes simples
@dataclass
class SimpleTargetSpec:
by_role: Optional[str] = None
by_text: Optional[str] = None
by_position: Optional[tuple] = None
selection_policy: str = "first"
fallback_strategy: str = "visual_similarity"
embedding_ref: Optional[Any] = None
context_hints: Dict[str, Any] = field(default_factory=dict)
hard_constraints: Dict[str, Any] = field(default_factory=dict)
weights: Dict[str, float] = field(default_factory=dict)
@dataclass
class SimpleAction:
type: str
target: SimpleTargetSpec
parameters: Dict[str, Any] = field(default_factory=dict)
@dataclass
class SimpleWorkflowEdge:
edge_id: str
from_node: str
to_node: str
action: SimpleAction
constraints: Any = None
post_conditions: Any = None
target_spec = SimpleTargetSpec(
by_role=target_dict.get("role"),
by_text=target_text if target_text else target_dict.get("text"),
)
action = SimpleAction(
type=action_dict.get("type", "unknown"),
target=target_spec,
parameters=action_dict.get("parameters", {})
)
return SimpleWorkflowEdge(
edge_id=edge_dict.get("edge_id", edge_dict.get("id", f"edge_{id(edge_dict)}")),
from_node=edge_dict.get("source", edge_dict.get("from_node", "")),
to_node=edge_dict.get("target", edge_dict.get("to_node", "")),
action=action,
constraints=None,
post_conditions=None
)
def update_progress(progress: int, message: str, current: int, total: int):

View File

@@ -0,0 +1,811 @@
"""
Error Handler - Gestion centralisée des erreurs et stratégies de récupération
Fonctionnalités:
- Gestion d'échecs de matching
- Stratégies de fallback pour détection UI
- Gestion de violations de post-conditions
- Détection de changements d'UI
- Système de rollback
- Logging détaillé et suggestions
- Mapping automatique des erreurs aux stratégies
- Escalade après échec des récupérations
"""
import logging
import json
import time
import uuid
from pathlib import Path
from datetime import datetime
from typing import Optional, Dict, Any, List, Tuple, Type
from dataclasses import dataclass, field
from enum import Enum
import numpy as np
from ..models.screen_state import ScreenState
from ..models.workflow_graph import WorkflowNode, WorkflowEdge, Action, ActionType
from .recovery_strategies import (
BaseRecoveryStrategy, RecoveryStrategyFactory, RecoveryContext,
RecoveryResult as StrategyRecoveryResult, RecoveryStrategyType
)
logger = logging.getLogger(__name__)
class ErrorType(Enum):
"""Types d'erreurs gérées"""
MATCHING_FAILED = "matching_failed"
TARGET_NOT_FOUND = "target_not_found"
POSTCONDITION_FAILED = "postcondition_failed"
POSTCONDITION_VIOLATION = "postcondition_violation"
UI_CHANGED = "ui_changed"
UI_CHANGE_DETECTED = "ui_change_detected"
EXECUTION_TIMEOUT = "execution_timeout"
NETWORK_ERROR = "network_error"
VALIDATION_ERROR = "validation_error"
UNKNOWN = "unknown"
class RecoveryStrategy(Enum):
"""Stratégies de récupération"""
RETRY = "retry"
FALLBACK = "fallback"
SKIP = "skip"
ROLLBACK = "rollback"
PAUSE = "pause"
ABORT = "abort"
# Mapping des types d'erreurs Python vers nos types d'erreurs
ERROR_TYPE_MAPPING = {
'TargetNotFoundError': ErrorType.TARGET_NOT_FOUND,
'UIElementChangedError': ErrorType.UI_CHANGED,
'NetworkError': ErrorType.NETWORK_ERROR,
'ConnectionError': ErrorType.NETWORK_ERROR,
'TimeoutError': ErrorType.NETWORK_ERROR,
'HTTPError': ErrorType.NETWORK_ERROR,
'ValidationError': ErrorType.VALIDATION_ERROR,
'ValueError': ErrorType.VALIDATION_ERROR,
'TypeError': ErrorType.VALIDATION_ERROR,
'FormatError': ErrorType.VALIDATION_ERROR,
}
@dataclass
class ErrorContext:
"""Contexte d'une erreur"""
error_type: ErrorType
timestamp: datetime
correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
screen_state: Optional[ScreenState] = None
workflow_node: Optional[WorkflowNode] = None
workflow_edge: Optional[WorkflowEdge] = None
action: Optional[Action] = None
message: str = ""
details: Dict[str, Any] = field(default_factory=dict)
recovery_attempts: int = 0
max_recovery_attempts: int = 3
original_exception: Optional[Exception] = None
@dataclass
class RecoveryResult:
"""Résultat d'une tentative de récupération"""
success: bool
strategy_used: RecoveryStrategy
message: str
correlation_id: str = field(default_factory=lambda: str(uuid.uuid4()))
new_state: Optional[ScreenState] = None
duration_ms: float = 0
should_retry: bool = False
escalation_reason: Optional[str] = None
recovery_data: Dict[str, Any] = field(default_factory=dict)
@classmethod
def retry(cls, strategy: RecoveryStrategy, message: str, data: Dict[str, Any] = None) -> 'RecoveryResult':
"""Créer un résultat avec retry recommandé"""
return cls(
success=True,
should_retry=True,
strategy_used=strategy,
message=message,
recovery_data=data or {}
)
@classmethod
def escalate(cls, strategy: RecoveryStrategy, reason: str, message: str) -> 'RecoveryResult':
"""Créer un résultat avec escalade"""
return cls(
success=False,
should_retry=False,
strategy_used=strategy,
message=message,
escalation_reason=reason
)
class ErrorHandler:
"""
Gestionnaire centralisé des erreurs avec stratégies de récupération.
Features:
- Mapping automatique des erreurs aux stratégies
- Logging détaillé des erreurs avec correlation IDs
- Escalade après échec des récupérations
- Stratégies de fallback intelligentes
- Détection de changements UI
- Système de rollback
- Historique des erreurs
"""
def __init__(
self,
error_log_dir: str = "data/errors",
max_retry_attempts: int = 3,
ui_change_threshold: float = 0.70,
enable_auto_recovery: bool = True
):
"""
Initialize ErrorHandler.
Args:
error_log_dir: Répertoire pour logs d'erreurs
max_retry_attempts: Nombre max de tentatives de récupération
ui_change_threshold: Seuil de détection de changement UI
enable_auto_recovery: Activer récupération automatique
"""
self.error_log_dir = Path(error_log_dir)
self.error_log_dir.mkdir(parents=True, exist_ok=True)
self.max_retry_attempts = max_retry_attempts
self.ui_change_threshold = ui_change_threshold
self.enable_auto_recovery = enable_auto_recovery
# Historique des erreurs
self.error_history: List[ErrorContext] = []
# Compteurs d'échecs par edge
self.edge_failure_counts: Dict[str, int] = {}
self.problematic_edges: set = set()
# Historique des actions pour rollback
self.action_history: List[Tuple[Action, ScreenState]] = []
self.max_action_history = 10
# Initialiser les stratégies de récupération
self.recovery_strategies = RecoveryStrategyFactory.create_strategies()
self.strategy_mapping = self._build_strategy_mapping()
logger.info(
f"ErrorHandler initialized "
f"(max_retry={max_retry_attempts}, ui_threshold={ui_change_threshold}, "
f"strategies={len(self.recovery_strategies)})"
)
def _build_strategy_mapping(self) -> Dict[ErrorType, BaseRecoveryStrategy]:
"""Construire le mapping des types d'erreurs aux stratégies"""
mapping = {}
for strategy in self.recovery_strategies:
# Tester chaque stratégie contre chaque type d'erreur
for error_type in ErrorType:
if strategy.can_handle(error_type.value, {}):
mapping[error_type] = strategy
logger.debug(f"Mapped {error_type.value} to {strategy.__class__.__name__}")
return mapping
def handle_error(self, error: Exception, context: Dict[str, Any] = None) -> RecoveryResult:
"""
Point d'entrée principal pour gérer une erreur avec stratégie de récupération
Args:
error: Exception à gérer
context: Contexte additionnel (screen_state, workflow_node, etc.)
Returns:
RecoveryResult avec stratégie recommandée
Validates: Requirements 2.1, 2.4, 2.5
"""
start_time = time.time()
context = context or {}
# Déterminer le type d'erreur
error_type = self._classify_error(error)
# Créer le contexte d'erreur
error_ctx = ErrorContext(
error_type=error_type,
timestamp=datetime.now(),
message=str(error),
original_exception=error,
screen_state=context.get('screen_state'),
workflow_node=context.get('workflow_node'),
workflow_edge=context.get('workflow_edge'),
action=context.get('action'),
details=context.get('details', {}),
recovery_attempts=context.get('recovery_attempts', 0),
max_recovery_attempts=self.max_retry_attempts
)
# Ajouter à l'historique
self.error_history.append(error_ctx)
# Logger l'erreur avec correlation ID
error_id = self._log_error_with_correlation(error_ctx)
try:
# Obtenir la stratégie appropriée
strategy = self._get_recovery_strategy(error_type, context)
if not strategy:
return self._escalate_error(error_ctx, "No recovery strategy available")
# Appliquer la stratégie de récupération
recovery_result = self._apply_recovery_strategy(strategy, error_ctx, context)
# Logger la tentative de récupération
self._log_recovery_attempt(error_ctx, recovery_result)
# Calculer durée
duration_ms = (time.time() - start_time) * 1000
recovery_result.duration_ms = duration_ms
recovery_result.correlation_id = error_ctx.correlation_id
return recovery_result
except Exception as recovery_error:
logger.error(f"Recovery strategy failed: {recovery_error}")
return self._escalate_error(
error_ctx,
f"Recovery strategy execution failed: {recovery_error}"
)
def _classify_error(self, error: Exception) -> ErrorType:
"""Classifier une exception vers un ErrorType"""
error_class_name = error.__class__.__name__
# Vérifier le mapping direct
if error_class_name in ERROR_TYPE_MAPPING:
return ERROR_TYPE_MAPPING[error_class_name]
# Vérifier par message d'erreur
error_message = str(error).lower()
if any(term in error_message for term in ['target not found', 'element not found']):
return ErrorType.TARGET_NOT_FOUND
elif any(term in error_message for term in ['ui changed', 'element changed']):
return ErrorType.UI_CHANGED
elif any(term in error_message for term in ['network', 'connection', 'timeout']):
return ErrorType.NETWORK_ERROR
elif any(term in error_message for term in ['validation', 'invalid', 'format']):
return ErrorType.VALIDATION_ERROR
elif any(term in error_message for term in ['postcondition', 'condition failed']):
return ErrorType.POSTCONDITION_FAILED
elif any(term in error_message for term in ['matching failed', 'no match']):
return ErrorType.MATCHING_FAILED
return ErrorType.UNKNOWN
def _get_recovery_strategy(self, error_type: ErrorType, context: Dict[str, Any]) -> Optional[BaseRecoveryStrategy]:
"""Obtenir la stratégie de récupération appropriée"""
# Vérifier le mapping direct
if error_type in self.strategy_mapping:
return self.strategy_mapping[error_type]
# Fallback: chercher une stratégie qui peut gérer ce type
for strategy in self.recovery_strategies:
if strategy.can_handle(error_type.value, context):
return strategy
return None
def _apply_recovery_strategy(self, strategy: BaseRecoveryStrategy,
error_ctx: ErrorContext, context: Dict[str, Any]) -> RecoveryResult:
"""Appliquer une stratégie de récupération"""
# Créer le contexte de récupération
recovery_context = RecoveryContext(
error_type=error_ctx.error_type.value,
error_message=error_ctx.message,
original_data=context.get('original_data', {}),
attempt_number=error_ctx.recovery_attempts + 1,
max_attempts=error_ctx.max_recovery_attempts,
timestamp=error_ctx.timestamp,
additional_context=context
)
# Exécuter la stratégie
strategy_result = strategy.recover(recovery_context)
# Convertir le résultat de stratégie vers notre format
if strategy_result.success and strategy_result.should_retry:
return RecoveryResult.retry(
RecoveryStrategy.RETRY,
strategy_result.message,
strategy_result.recovery_data
)
elif strategy_result.success:
return RecoveryResult(
success=True,
should_retry=False,
strategy_used=RecoveryStrategy.FALLBACK,
message=strategy_result.message,
recovery_data=strategy_result.recovery_data
)
else:
return RecoveryResult.escalate(
RecoveryStrategy.ABORT,
strategy_result.escalation_reason or "Strategy failed",
strategy_result.message
)
def _escalate_error(self, error_ctx: ErrorContext, reason: str) -> RecoveryResult:
"""Escalader une erreur après échec des récupérations"""
escalation_message = (
f"Error escalated after {error_ctx.recovery_attempts} recovery attempts. "
f"Reason: {reason}. Original error: {error_ctx.message}"
)
# Logger l'escalade
logger.error(
f"ERROR ESCALATION [ID: {error_ctx.correlation_id}]: {escalation_message}"
)
# Incrémenter compteur d'échecs si edge impliqué
if error_ctx.workflow_edge:
edge_key = f"{error_ctx.workflow_edge.from_node}_{error_ctx.workflow_edge.to_node}"
self.edge_failure_counts[edge_key] = self.edge_failure_counts.get(edge_key, 0) + 1
if self.edge_failure_counts[edge_key] > 3:
self.problematic_edges.add(edge_key)
logger.error(f"Edge {edge_key} marked as problematic (>3 escalations)")
return RecoveryResult.escalate(
RecoveryStrategy.ABORT,
reason,
escalation_message
)
def _log_error_with_correlation(self, error_ctx: ErrorContext) -> str:
"""Logger une erreur avec correlation ID"""
timestamp = error_ctx.timestamp.strftime("%Y%m%d_%H%M%S_%f")
error_id = f"{error_ctx.error_type.value}_{timestamp}"
error_dir = self.error_log_dir / error_id
error_dir.mkdir(parents=True, exist_ok=True)
# Sauvegarder screenshot si disponible
if error_ctx.screen_state and error_ctx.screen_state.raw_level:
if error_ctx.screen_state.raw_level.screenshot_path:
import shutil
screenshot_dest = error_dir / "screenshot.png"
try:
shutil.copy(
error_ctx.screen_state.raw_level.screenshot_path,
screenshot_dest
)
except Exception as e:
logger.error(f"Failed to copy screenshot: {e}")
# Créer rapport d'erreur avec correlation ID
report = {
'error_id': error_id,
'correlation_id': error_ctx.correlation_id,
'error_type': error_ctx.error_type.value,
'timestamp': error_ctx.timestamp.isoformat(),
'message': error_ctx.message,
'details': error_ctx.details,
'recovery_attempts': error_ctx.recovery_attempts,
'original_exception': str(error_ctx.original_exception) if error_ctx.original_exception else None,
'state': self._serialize_state(error_ctx.screen_state) if error_ctx.screen_state else None,
'node': error_ctx.workflow_node.node_id if error_ctx.workflow_node else None,
'edge': f"{error_ctx.workflow_edge.from_node} -> {error_ctx.workflow_edge.to_node}" if error_ctx.workflow_edge else None,
'action': error_ctx.action.type.value if error_ctx.action else None
}
# Sauvegarder rapport
report_path = error_dir / "error_report.json"
with open(report_path, 'w') as f:
json.dump(report, f, indent=2)
logger.error(
f"ERROR LOGGED [ID: {error_ctx.correlation_id}] {error_ctx.error_type.value}: "
f"{error_ctx.message} (Details: {error_dir})"
)
return error_id
def _log_recovery_attempt(self, error_ctx: ErrorContext, recovery_result: RecoveryResult):
"""Logger une tentative de récupération"""
log_level = logging.INFO if recovery_result.success else logging.WARNING
logger.log(
log_level,
f"RECOVERY ATTEMPT [ID: {error_ctx.correlation_id}] "
f"Strategy: {recovery_result.strategy_used.value}, "
f"Success: {recovery_result.success}, "
f"Should Retry: {recovery_result.should_retry}, "
f"Message: {recovery_result.message}"
)
# Sauvegarder détails de récupération
if error_ctx.correlation_id:
recovery_log = {
'correlation_id': error_ctx.correlation_id,
'timestamp': datetime.now().isoformat(),
'strategy_used': recovery_result.strategy_used.value,
'success': recovery_result.success,
'should_retry': recovery_result.should_retry,
'message': recovery_result.message,
'duration_ms': recovery_result.duration_ms,
'recovery_data': recovery_result.recovery_data
}
# Trouver le répertoire d'erreur correspondant
for error_dir in self.error_log_dir.iterdir():
if error_dir.is_dir() and error_ctx.correlation_id in str(error_dir):
recovery_log_path = error_dir / "recovery_attempts.jsonl"
with open(recovery_log_path, 'a') as f:
f.write(json.dumps(recovery_log) + '\n')
break
# Legacy methods - maintained for backward compatibility but now use centralized handle_error
def handle_matching_failure(
self,
screen_state: ScreenState,
candidate_nodes: List[WorkflowNode],
best_confidence: float,
threshold: float
) -> RecoveryResult:
"""
Gérer un échec de matching de node (legacy method).
Now delegates to centralized handle_error.
"""
# Créer une exception simulée pour le matching failure
class MatchingFailedException(Exception):
pass
error = MatchingFailedException(
f"No node matched (best: {best_confidence:.3f}, threshold: {threshold})"
)
context = {
'screen_state': screen_state,
'details': {
'best_confidence': best_confidence,
'threshold': threshold,
'num_candidates': len(candidate_nodes),
'candidate_nodes': candidate_nodes
},
'original_data': {
'best_confidence': best_confidence,
'threshold': threshold,
'candidate_nodes': candidate_nodes
}
}
return self.handle_error(error, context)
def handle_target_not_found(
self,
action: Action,
screen_state: ScreenState,
edge: Optional[WorkflowEdge] = None
) -> RecoveryResult:
"""
Gérer un échec de résolution de target (legacy method).
Now delegates to centralized handle_error.
"""
# Créer une exception simulée pour target not found
class TargetNotFoundError(Exception):
pass
error = TargetNotFoundError(f"Target not found: {action.target}")
context = {
'screen_state': screen_state,
'workflow_edge': edge,
'action': action,
'details': {
'target_role': action.target.role if hasattr(action.target, 'role') else None,
'target_text': action.target.text_pattern if hasattr(action.target, 'text_pattern') else None
},
'original_data': {
'target': {
'role': action.target.role if hasattr(action.target, 'role') else None,
'text_pattern': action.target.text_pattern if hasattr(action.target, 'text_pattern') else None,
'bbox': getattr(action.target, 'bbox', None)
}
}
}
return self.handle_error(error, context)
def handle_postcondition_failure(
self,
edge: WorkflowEdge,
screen_state: ScreenState,
expected_node: Optional[WorkflowNode] = None,
timeout_ms: int = 5000
) -> RecoveryResult:
"""
Gérer une violation de post-condition (legacy method).
Now delegates to centralized handle_error.
"""
# Créer une exception simulée pour postcondition failure
class PostconditionFailedException(Exception):
pass
error = PostconditionFailedException(
f"Post-conditions not satisfied after {timeout_ms}ms"
)
context = {
'screen_state': screen_state,
'workflow_edge': edge,
'workflow_node': expected_node,
'details': {
'timeout_ms': timeout_ms,
'expected_node': expected_node.node_id if expected_node else None,
'edge': f"{edge.from_node} -> {edge.to_node}"
}
}
return self.handle_error(error, context)
def detect_ui_change(
self,
current_state: ScreenState,
expected_node: WorkflowNode,
current_similarity: float
) -> Tuple[bool, Optional[RecoveryResult]]:
"""
Détecter si l'UI a changé de manière significative.
Args:
current_state: État actuel
expected_node: Node attendu
current_similarity: Similarité actuelle avec le prototype
Returns:
Tuple (ui_changed, recovery_result)
"""
ui_changed = current_similarity < self.ui_change_threshold
if not ui_changed:
return (False, None)
# UI a changé - créer contexte d'erreur
error_ctx = ErrorContext(
error_type=ErrorType.UI_CHANGED,
timestamp=datetime.now(),
screen_state=current_state,
workflow_node=expected_node,
message=f"UI changed detected (similarity: {current_similarity:.3f} < {self.ui_change_threshold})",
details={
'current_similarity': current_similarity,
'threshold': self.ui_change_threshold,
'expected_node': expected_node.node_id
}
)
self.error_history.append(error_ctx)
# Logger l'erreur avec screenshot
error_id = self._log_error(error_ctx)
logger.error(
f"UI change detected (error_id={error_id}): {error_ctx.message}"
)
# Stratégie: Pause pour analyse manuelle
recovery = RecoveryResult(
success=False,
strategy_used=RecoveryStrategy.PAUSE,
message="UI changed - pause exécution automatique pour analyse"
)
return (True, recovery)
def rollback_last_action(
self
) -> RecoveryResult:
"""
Tenter de rollback la dernière action exécutée.
Implémente les actions inverses pour chaque type d'action.
Returns:
RecoveryResult indiquant succès du rollback
Validates: Requirements 14.5
"""
if not self.action_history:
return RecoveryResult(
success=False,
strategy_used=RecoveryStrategy.ROLLBACK,
message="No action to rollback"
)
last_action, state_before = self.action_history.pop()
logger.info(f"Attempting rollback of action: {last_action.type}")
try:
# Implémenter inverse selon type d'action
if last_action.type == ActionType.MOUSE_CLICK:
# Pour un click, on ne peut pas vraiment "dé-cliquer"
# Mais on peut essayer de revenir à l'état précédent
# en cliquant sur un bouton "retour" ou "annuler" si disponible
logger.info("Rollback mouse_click: Looking for cancel/back button")
rollback_message = "Mouse click rollback: State restored (no inverse action)"
elif last_action.type == ActionType.TEXT_INPUT:
# Pour text_input, on peut effacer le texte
logger.info("Rollback text_input: Clearing text field")
try:
import pyautogui
# Sélectionner tout et supprimer
pyautogui.hotkey('ctrl', 'a')
pyautogui.press('delete')
rollback_message = "Text input rolled back: Field cleared"
except Exception as e:
logger.error(f"Failed to clear text field: {e}")
rollback_message = f"Text input rollback failed: {e}"
elif last_action.type == ActionType.KEY_PRESS:
# Pour key_press, difficile d'inverser
# On log juste
logger.info("Rollback key_press: No inverse action available")
rollback_message = "Key press rollback: No inverse action (state restored)"
elif last_action.type == ActionType.COMPOUND:
# Pour compound, on devrait rollback chaque sous-action
# Pour l'instant, on log juste
logger.info("Rollback compound: Would need to rollback sub-actions")
rollback_message = "Compound action rollback: Partial (state restored)"
else:
logger.warning(f"Unknown action type for rollback: {last_action.type}")
rollback_message = f"Unknown action type: {last_action.type}"
# Logger le rollback
error_ctx = ErrorContext(
error_type=ErrorType.UNKNOWN,
timestamp=datetime.now(),
message=f"Rollback executed: {rollback_message}",
details={
"action": "rollback",
"action_type": str(last_action.type),
"rollback_message": rollback_message
}
)
self._log_error(error_ctx)
return RecoveryResult(
success=True,
strategy_used=RecoveryStrategy.ROLLBACK,
message=rollback_message,
new_state=state_before
)
except Exception as e:
logger.error(f"Rollback failed: {e}")
return RecoveryResult(
success=False,
strategy_used=RecoveryStrategy.ROLLBACK,
message=f"Rollback failed: {e}"
)
def record_action(
self,
action: Action,
state_before: ScreenState
):
"""
Enregistrer une action dans l'historique pour rollback.
Args:
action: Action exécutée
state_before: État avant l'action
"""
self.action_history.append((action, state_before))
# Limiter taille de l'historique
if len(self.action_history) > self.max_action_history:
self.action_history.pop(0)
def get_problematic_edges(self) -> List[Tuple[str, int]]:
"""
Obtenir la liste des edges problématiques.
Returns:
Liste de tuples (edge_key, failure_count)
"""
return [
(edge_key, self.edge_failure_counts[edge_key])
for edge_key in self.problematic_edges
]
def get_error_statistics(self) -> Dict[str, Any]:
"""
Obtenir des statistiques sur les erreurs.
Returns:
Dict avec statistiques
"""
error_counts = {}
for error in self.error_history:
error_type = error.error_type.value
error_counts[error_type] = error_counts.get(error_type, 0) + 1
return {
'total_errors': len(self.error_history),
'error_counts': error_counts,
'problematic_edges_count': len(self.problematic_edges),
'problematic_edges': self.get_problematic_edges()
}
def _log_error(
self,
error_ctx: ErrorContext,
candidate_nodes: Optional[List[WorkflowNode]] = None
) -> str:
"""
Logger une erreur avec tous les détails (legacy method).
Now delegates to _log_error_with_correlation.
"""
return self._log_error_with_correlation(error_ctx)
def _serialize_state(self, state: ScreenState) -> Dict[str, Any]:
"""Sérialiser un ScreenState pour le rapport."""
window_title = None
if hasattr(state, 'window') and state.window:
window_title = getattr(state.window, 'window_title', None)
ui_elements_count = 0
if hasattr(state, 'ui_elements') and state.ui_elements:
ui_elements_count = len(state.ui_elements)
elif hasattr(state, 'perception_level') and state.perception_level:
ui_elements_count = len(getattr(state.perception_level, 'ui_elements', []))
screenshot_path = None
if hasattr(state, 'raw_level') and state.raw_level:
screenshot_path = str(getattr(state.raw_level, 'screenshot_path', ''))
elif hasattr(state, 'raw') and state.raw:
screenshot_path = str(getattr(state.raw, 'screenshot_path', ''))
return {
'window_title': window_title,
'ui_elements_count': ui_elements_count,
'screenshot_path': screenshot_path
}
def _generate_matching_suggestions(
self,
best_confidence: float,
threshold: float,
candidate_nodes: List[WorkflowNode]
) -> List[str]:
"""Générer suggestions pour échec de matching."""
suggestions = []
if best_confidence < 0.70:
suggestions.append("CREATE_NEW_NODE: Similarité très faible, probablement nouvel état")
elif best_confidence < threshold:
suggestions.append(f"UPDATE_NODE: Similarité proche, mettre à jour prototype")
suggestions.append(f"ADJUST_THRESHOLD: Ou réduire seuil à {best_confidence - 0.02:.3f}")
if not candidate_nodes:
suggestions.append("NO_CANDIDATES: Aucun node candidat fourni")
return suggestions
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
handler = ErrorHandler()
print(f"ErrorHandler initialized: {handler}")
print(f"Error log directory: {handler.error_log_dir}")