Files
rpa_vision_v3/core/healing/execution_integration.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

344 lines
11 KiB
Python

"""Integration of self-healing with execution loop."""
import logging
from typing import Optional, Dict, Any
from pathlib import Path
from core.healing.healing_engine import SelfHealingEngine
from core.healing.recovery_logger import RecoveryLogger
from core.healing.models import RecoveryContext, RecoveryResult
from core.execution.action_executor import ExecutionResult, ExecutionStatus
# Analytics is an optional dependency: record whether it could be imported
# so the rest of the module can skip the integration when it is missing.
try:
    from core.analytics.analytics_system import get_analytics_system
except ImportError:
    ANALYTICS_AVAILABLE = False
else:
    ANALYTICS_AVAILABLE = True

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class SelfHealingIntegration:
    """
    Integration layer between self-healing engine and execution loop.

    This class provides methods to integrate self-healing capabilities
    into the existing execution loop without major refactoring.

    When constructed with ``enabled=False`` no engine or recovery logger is
    created, and every public method returns a neutral value (``None``,
    ``False``, ``[]`` or ``{"enabled": False}``) without doing any work.
    """

    def __init__(
        self,
        storage_path: Optional[Path] = None,
        log_path: Optional[Path] = None,
        enabled: bool = True
    ) -> None:
        """
        Initialize self-healing integration.

        Args:
            storage_path: Path for storing learned patterns
            log_path: Path for recovery logs
            enabled: Whether self-healing is enabled
        """
        self.enabled = enabled
        if enabled:
            self.healing_engine = SelfHealingEngine(storage_path=storage_path)
            self.recovery_logger = RecoveryLogger(log_path=log_path)
            logger.info("Self-healing integration initialized")
        else:
            # Keep the attributes defined so attribute access never fails;
            # every public method checks ``self.enabled`` before using them.
            self.healing_engine = None
            self.recovery_logger = None
            logger.info("Self-healing integration disabled")

        # Analytics integration is best-effort: a failure here must not
        # prevent the integration object from being created.
        self._analytics = None
        if ANALYTICS_AVAILABLE:
            try:
                self._analytics = get_analytics_system()
                logger.info("Analytics integrated with self-healing")
            except Exception as e:
                logger.warning(f"Analytics integration failed: {e}")

    def handle_execution_failure(
        self,
        action_info: Dict[str, Any],
        execution_result: ExecutionResult,
        workflow_id: str,
        node_id: str,
        screenshot_path: str,
        attempt_count: int = 1
    ) -> Optional[RecoveryResult]:
        """
        Handle an execution failure and attempt recovery.

        Builds a recovery context from the failure, asks the healing engine
        to attempt a recovery, logs the attempt and, when analytics is
        available, reports the outcome to it.

        Args:
            action_info: Information about the failed action
            execution_result: Result of the failed execution
            workflow_id: ID of the workflow
            node_id: ID of the current node
            screenshot_path: Path to screenshot at time of failure
            attempt_count: Number of attempts so far

        Returns:
            RecoveryResult if recovery attempted, None if disabled
        """
        if not self.enabled:
            return None

        # Create recovery context
        context = self._create_recovery_context(
            action_info=action_info,
            execution_result=execution_result,
            workflow_id=workflow_id,
            node_id=node_id,
            screenshot_path=screenshot_path,
            attempt_count=attempt_count
        )

        # Attempt recovery
        logger.info(f"Attempting recovery for failed action: {action_info.get('action')}")
        result = self.healing_engine.attempt_recovery(context)

        # Log the recovery attempt
        self.recovery_logger.log_recovery_attempt(context, result)

        # Notify analytics about recovery attempt. This is best-effort: an
        # analytics failure is logged and never hides the recovery result.
        if self._analytics:
            try:
                self._analytics.collectors.metrics.record_recovery_attempt(
                    workflow_id=workflow_id,
                    node_id=node_id,
                    failure_reason=context.failure_reason,
                    recovery_success=result.success,
                    # Strategy and confidence are only reported for successes.
                    strategy_used=result.strategy_used if result.success else None,
                    confidence=result.confidence if result.success else 0.0
                )
            except Exception as e:
                logger.warning(f"Analytics recovery notification failed: {e}")

        return result

    def update_workflow_from_recovery(
        self,
        workflow_id: str,
        node_id: str,
        edge_id: str,
        recovery_result: RecoveryResult
    ) -> bool:
        """
        Update workflow definition based on successful recovery.

        The workflow storage integration is not implemented yet: the learned
        pattern is only logged. ``node_id`` and ``edge_id`` are accepted for
        the future implementation and are currently unused.

        Args:
            workflow_id: ID of the workflow
            node_id: ID of the node
            edge_id: ID of the edge
            recovery_result: Successful recovery result

        Returns:
            True if a learned pattern was present and recorded; False when
            self-healing is disabled, the recovery did not succeed, there is
            no learned pattern, or recording it failed.
        """
        if not self.enabled or not recovery_result.success:
            return False

        try:
            # Extract learned pattern
            if recovery_result.learned_pattern:
                logger.info(
                    f"Updating workflow {workflow_id} with learned pattern: "
                    f"{recovery_result.learned_pattern}"
                )
                # TODO: Integrate with workflow storage to update definition
                # This would update the workflow's edge or node with new information
                # For now, just log the update
                return True
        except Exception as e:
            logger.error(f"Failed to update workflow: {e}")

        # Nothing was recorded: either no pattern was learned or logging failed.
        return False

    def get_recovery_suggestions(
        self,
        action_info: Dict[str, Any],
        workflow_id: str,
        node_id: str,
        screenshot_path: str
    ) -> list:
        """
        Get recovery suggestions for a potential failure.

        Args:
            action_info: Information about the action
            workflow_id: ID of the workflow
            node_id: ID of the node
            screenshot_path: Path to current screenshot

        Returns:
            List of recovery suggestions (empty when disabled)
        """
        if not self.enabled:
            return []

        # No failure has happened yet, so build a placeholder context with a
        # fixed failure reason and a zero attempt count.
        context = RecoveryContext(
            original_action=action_info.get('action', 'unknown'),
            target_element=action_info.get('target', 'unknown'),
            failure_reason='potential_failure',
            screenshot_path=screenshot_path,
            workflow_id=workflow_id,
            node_id=node_id,
            attempt_count=0
        )
        return self.healing_engine.get_recovery_suggestions(context)

    def get_statistics(self) -> Dict[str, Any]:
        """
        Get self-healing statistics.

        Returns:
            Dictionary with the recovery logger's statistics plus an
            ``"enabled"`` flag; only ``{"enabled": False}`` when disabled.
        """
        if not self.enabled:
            return {"enabled": False}

        stats = self.recovery_logger.get_recovery_statistics()
        # Build a new dict instead of writing into the one owned by the
        # recovery logger, so callers cannot alter the logger's data.
        return {**stats, "enabled": True}

    def get_insights(self) -> list:
        """
        Get insights from recovery patterns.

        Returns:
            List of insight strings (empty when disabled)
        """
        if not self.enabled:
            return []
        return self.recovery_logger.generate_insights()

    def check_alerts(self) -> list:
        """
        Check for alerts that need administrator attention.

        Returns:
            List of alert dictionaries (empty when disabled)
        """
        if not self.enabled:
            return []
        return self.recovery_logger.check_for_alerts()

    def prune_patterns(
        self,
        max_age_days: int = 90,
        min_confidence: float = 0.3
    ) -> None:
        """
        Prune outdated recovery patterns.

        Does nothing when self-healing is disabled.

        Args:
            max_age_days: Maximum age for patterns
            min_confidence: Minimum confidence threshold
        """
        if not self.enabled:
            return
        self.healing_engine.prune_learned_patterns(max_age_days, min_confidence)
        logger.info(f"Pruned patterns older than {max_age_days} days")

    def _create_recovery_context(
        self,
        action_info: Dict[str, Any],
        execution_result: ExecutionResult,
        workflow_id: str,
        node_id: str,
        screenshot_path: str,
        attempt_count: int
    ) -> RecoveryContext:
        """
        Create a recovery context from execution failure.

        Missing ``action``, ``target`` and ``element_type`` entries in
        ``action_info`` default to ``'unknown'``; ``value`` is copied into the
        metadata as ``input_value`` only when the key is present.
        """
        # Determine failure reason from execution result
        failure_reason = self._determine_failure_reason(execution_result)

        # Extract metadata
        metadata = {
            'action_type': action_info.get('action', 'unknown'),
            'execution_status': execution_result.status.value,
            'error_message': execution_result.message,
            'element_type': action_info.get('element_type', 'unknown')
        }

        # Add input value if available
        if 'value' in action_info:
            metadata['input_value'] = action_info['value']

        return RecoveryContext(
            original_action=action_info.get('action', 'unknown'),
            target_element=action_info.get('target', 'unknown'),
            failure_reason=failure_reason,
            screenshot_path=screenshot_path,
            workflow_id=workflow_id,
            node_id=node_id,
            attempt_count=attempt_count,
            metadata=metadata
        )

    def _determine_failure_reason(self, execution_result: ExecutionResult) -> str:
        """
        Determine failure reason from execution result.

        Returns one of ``'element_not_found'``, ``'timeout'``,
        ``'validation_failed'``, ``'execution_failed'`` or
        ``'unknown_failure'``. For a generic FAILED status the reason is
        inferred from keywords in the (case-insensitive) message.
        """
        status = execution_result.status
        if status == ExecutionStatus.TARGET_NOT_FOUND:
            return 'element_not_found'
        if status == ExecutionStatus.TIMEOUT:
            return 'timeout'
        if status != ExecutionStatus.FAILED:
            return 'unknown_failure'

        # A failed result may carry no message; treat that as an empty string
        # rather than crashing in the middle of a recovery attempt.
        message = str(execution_result.message or "").lower()
        if 'validation' in message or 'invalid' in message:
            return 'validation_failed'
        if 'timeout' in message:
            return 'timeout'
        if 'not found' in message:
            return 'element_not_found'
        return 'execution_failed'
# Module-wide shared instance, created lazily on first request.
_global_integration: Optional[SelfHealingIntegration] = None


def get_self_healing_integration(
    storage_path: Optional[Path] = None,
    log_path: Optional[Path] = None,
    enabled: bool = True
) -> SelfHealingIntegration:
    """
    Return the shared self-healing integration, building it on first use.

    The arguments are only used by the call that creates the instance;
    once an instance exists it is returned as-is and the arguments are
    ignored.

    Args:
        storage_path: Path for storing learned patterns
        log_path: Path for recovery logs
        enabled: Whether self-healing is enabled

    Returns:
        SelfHealingIntegration instance
    """
    global _global_integration

    # Fast path: the shared instance already exists.
    if _global_integration is not None:
        return _global_integration

    _global_integration = SelfHealingIntegration(
        storage_path=storage_path,
        log_path=log_path,
        enabled=enabled
    )
    return _global_integration