Files
rpa_vision_v3/core/healing/healing_engine.py
Dom d8756883c5 feat(corrections): Add Correction Packs system for cross-workflow learning
Implement a complete system for capitalizing user corrections across multiple
workflows and sessions. This enables automatic application of learned fixes
when similar failures occur in different contexts.

New components:
- core/corrections/models.py: CorrectionKey, Correction, CorrectionPack models
- core/corrections/correction_repository.py: JSON storage with atomic writes
- core/corrections/aggregator.py: Aggregation by hash and quality filtering
- core/corrections/correction_pack_service.py: CRUD, export/import, versioning
- backend/api/correction_packs.py: REST API with 15 endpoints

Features:
- MD5-based key hashing for correction deduplication
- Export/import in JSON and YAML formats
- Version history with rollback support
- Cross-workflow pattern detection
- Integration with SelfHealingEngine for automatic application
- 29 unit tests (all passing)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-18 18:48:35 +01:00

444 lines
16 KiB
Python

"""Main self-healing engine for workflow recovery."""
import time
import logging
from typing import List, Optional, Dict, Any, Tuple
from pathlib import Path
from .models import RecoveryContext, RecoveryResult, RecoverySuggestion
from .learning_repository import LearningRepository
from .confidence_scorer import ConfidenceScorer
from .strategies import (
RecoveryStrategy,
SemanticVariantStrategy,
SpatialFallbackStrategy,
TimingAdaptationStrategy,
FormatTransformationStrategy
)
logger = logging.getLogger(__name__)
class SelfHealingEngine:
    """Main engine for self-healing workflows.

    When a workflow step fails, the engine first consults correction packs
    (if enabled), then walks its recovery strategies in priority order.
    Successful recoveries are scored for confidence and stored in the
    learning repository so later failures can reuse them.
    """

    # Substrings of an action name that mark the action as modifying data.
    _DATA_MODIFICATION_ACTIONS = (
        'input', 'type', 'submit', 'delete', 'update', 'save'
    )

    # Human-readable descriptions, keyed by strategy name.
    _STRATEGY_DESCRIPTIONS = {
        'SemanticVariantStrategy': 'Try semantic variants of the element (e.g., Submit → Send)',
        'SpatialFallbackStrategy': 'Search in expanded area around original position',
        'TimingAdaptationStrategy': 'Increase wait time for element to appear',
        'FormatTransformationStrategy': 'Transform input format to match validation'
    }

    # Simple execution-time estimates in seconds, keyed by strategy name.
    _STRATEGY_TIME_ESTIMATES = {
        'SemanticVariantStrategy': 2.0,
        'SpatialFallbackStrategy': 5.0,
        'TimingAdaptationStrategy': 10.0,
        'FormatTransformationStrategy': 1.0
    }

    # Estimate used for strategies missing from the table above.
    _DEFAULT_TIME_ESTIMATE = 3.0

    def __init__(
        self,
        learning_repo: Optional[LearningRepository] = None,
        storage_path: Optional[Path] = None,
        correction_packs_enabled: bool = True
    ):
        """
        Initialize the self-healing engine.

        Args:
            learning_repo: Learning repository instance. When None, a new
                LearningRepository is created at ``storage_path``.
            storage_path: Path for storing learned patterns. Only used when
                no repository is supplied; defaults to ``data/healing``.
            correction_packs_enabled: Whether to consult correction packs
        """
        # Compare against None rather than relying on truthiness, so a
        # supplied repository that happens to be falsy is still honoured.
        if learning_repo is not None:
            self.learning_repo = learning_repo
        else:
            storage_path = storage_path or Path('data/healing')
            self.learning_repo = LearningRepository(storage_path)

        # Initialize confidence scorer
        self.confidence_scorer = ConfidenceScorer()

        # Initialize recovery strategies
        self.recovery_strategies: List[RecoveryStrategy] = [
            SemanticVariantStrategy(),
            SpatialFallbackStrategy(),
            TimingAdaptationStrategy(),
            FormatTransformationStrategy()
        ]

        # Configuration
        self.max_recovery_time = 30.0  # seconds
        # Stored as configuration only; no method of this class reads it.
        self.parallel_execution = True

        # Correction packs integration (the service is created lazily)
        self.correction_packs_enabled = correction_packs_enabled
        self._correction_pack_service = None

    @property
    def correction_pack_service(self):
        """Lazy-load correction pack service.

        Returns:
            The cached service instance, or None when correction packs are
            disabled or the ``core.corrections`` package cannot be imported.
            An import failure also switches ``correction_packs_enabled``
            off, so the import is not retried.
        """
        if self._correction_pack_service is None and self.correction_packs_enabled:
            try:
                from core.corrections import CorrectionPackService
                self._correction_pack_service = CorrectionPackService()
                logger.info("CorrectionPackService initialized for healing engine")
            except ImportError as e:
                logger.warning("CorrectionPackService not available: %s", e)
                self.correction_packs_enabled = False
        return self._correction_pack_service

    def attempt_recovery(self, context: RecoveryContext) -> RecoveryResult:
        """
        Attempt to recover from a workflow failure.

        Args:
            context: Recovery context with failure information

        Returns:
            RecoveryResult with outcome. ``requires_user_input`` is set when
            a recovery succeeded but its confidence was too low, or when
            every strategy failed.
        """
        start_time = time.time()

        # Check if we've exceeded max attempts
        if context.attempt_count >= context.max_attempts:
            return RecoveryResult(
                success=False,
                strategy_used='none',
                error_message=f'Max recovery attempts ({context.max_attempts}) exceeded'
            )

        # Try correction packs first (if enabled)
        if self.correction_packs_enabled and self.correction_pack_service:
            pack_result = self._try_correction_packs(context)
            if pack_result and pack_result.success:
                return pack_result

        # Get learned patterns for this context and rank the strategies
        learned_patterns = self.learning_repo.get_matching_patterns(context)
        strategies = self._prioritize_strategies(context, learned_patterns)

        for strategy in strategies:
            # Stop once the overall time budget is spent
            if time.time() - start_time >= self.max_recovery_time:
                break

            # Skip if strategy can't handle this failure
            if not strategy.can_handle(context):
                continue

            result = strategy.attempt_recovery(context)
            if not result.success:
                continue

            # Calculate final confidence
            historical_success = self._get_historical_success_rate(
                strategy.name, context, learned_patterns
            )
            result.confidence_score = self.confidence_scorer.calculate_recovery_confidence(
                result.strategy_used,
                context,
                historical_success
            )

            # Only learn from the recovery when it is safe to proceed;
            # otherwise hand the decision back to the user.
            if self.confidence_scorer.is_safe_to_proceed(
                result.confidence_score,
                context.confidence_threshold,
                involves_data_modification=self._involves_data_modification(context)
            ):
                self.learn_from_success(context, result)
            else:
                result.requires_user_input = True
            return result

        # All strategies failed
        total_time = time.time() - start_time
        return RecoveryResult(
            success=False,
            strategy_used='all_failed',
            execution_time=total_time,
            error_message='All recovery strategies failed',
            requires_user_input=True
        )

    def learn_from_success(self, context: RecoveryContext, result: RecoveryResult):
        """
        Learn from successful recovery for future use.

        Args:
            context: Recovery context
            result: Successful recovery result
        """
        self.learning_repo.store_pattern(context, result)

    def get_recovery_suggestions(self, context: RecoveryContext) -> List[RecoverySuggestion]:
        """
        Get ranked recovery suggestions based on learned patterns.

        No strategy is executed; each applicable strategy is only scored.

        Args:
            context: Recovery context

        Returns:
            List of recovery suggestions sorted by confidence (highest first)
        """
        suggestions = []

        # Get learned patterns
        learned_patterns = self.learning_repo.get_matching_patterns(context)

        # Get suggestions from each strategy
        for strategy in self.recovery_strategies:
            if not strategy.can_handle(context):
                continue

            # Calculate confidence based on historical success
            historical_success = self._get_historical_success_rate(
                strategy.name, context, learned_patterns
            )
            confidence = self.confidence_scorer.calculate_recovery_confidence(
                strategy.name,
                context,
                historical_success
            )
            suggestions.append(
                RecoverySuggestion(
                    strategy=strategy.name,
                    confidence=confidence,
                    description=self._get_strategy_description(strategy),
                    estimated_time=self._estimate_strategy_time(strategy, context)
                )
            )

        # Sort by confidence
        suggestions.sort(key=lambda s: s.confidence, reverse=True)
        return suggestions

    def prune_learned_patterns(
        self,
        max_age_days: int = 90,
        min_confidence: float = 0.3
    ):
        """
        Prune outdated learned patterns.

        Args:
            max_age_days: Maximum age for patterns
            min_confidence: Minimum confidence threshold
        """
        self.learning_repo.prune_outdated_patterns(max_age_days, min_confidence)

    def _prioritize_strategies(
        self,
        context: RecoveryContext,
        learned_patterns: List
    ) -> List[RecoveryStrategy]:
        """Prioritize strategies based on context and learned patterns.

        Each strategy's own priority is multiplied by
        ``1 + historical success rate`` when that rate is positive.

        Args:
            context: Recovery context
            learned_patterns: Patterns previously matched for this context

        Returns:
            The engine's strategies, highest priority first
        """
        scored_strategies = []
        for strategy in self.recovery_strategies:
            # Base priority from strategy
            priority = strategy.get_priority(context)

            # Boost priority if we have successful patterns
            historical_success = self._get_historical_success_rate(
                strategy.name, context, learned_patterns
            )
            if historical_success > 0:
                priority *= (1.0 + historical_success)

            scored_strategies.append((priority, strategy))

        # Sort on the priority alone so strategy objects are never compared
        scored_strategies.sort(key=lambda x: x[0], reverse=True)
        return [strategy for _, strategy in scored_strategies]

    def _get_historical_success_rate(
        self,
        strategy_name: str,
        context: RecoveryContext,
        learned_patterns: List
    ) -> float:
        """Get historical success rate for a strategy.

        Args:
            strategy_name: Name to match against each pattern's
                ``recovery_strategy``
            context: Recovery context (not used by the calculation)
            learned_patterns: Patterns to average over

        Returns:
            Mean ``success_rate`` of the matching patterns, or 0.0 when
            none match
        """
        rates = [
            p.success_rate for p in learned_patterns
            if p.recovery_strategy == strategy_name
        ]
        if not rates:
            return 0.0
        return sum(rates) / len(rates)

    def _involves_data_modification(self, context: RecoveryContext) -> bool:
        """Check if the action involves data modification.

        The check is a case-insensitive substring match of the context's
        ``original_action`` against a fixed keyword list.
        """
        action_name = context.original_action.lower()
        return any(
            keyword in action_name
            for keyword in self._DATA_MODIFICATION_ACTIONS
        )

    def _get_strategy_description(self, strategy: RecoveryStrategy) -> str:
        """Get human-readable description of strategy.

        Falls back to the strategy's name when no description is registered.
        """
        return self._STRATEGY_DESCRIPTIONS.get(strategy.name, strategy.name)

    def _estimate_strategy_time(
        self,
        strategy: RecoveryStrategy,
        context: RecoveryContext
    ) -> float:
        """Estimate execution time for strategy.

        Args:
            strategy: Strategy to estimate
            context: Recovery context (not used by the estimate)

        Returns:
            Estimated seconds; 3.0 for strategies without a registered
            estimate
        """
        return self._STRATEGY_TIME_ESTIMATES.get(
            strategy.name, self._DEFAULT_TIME_ESTIMATE
        )

    def _try_correction_packs(self, context: RecoveryContext) -> Optional[RecoveryResult]:
        """
        Try to find and apply a correction from correction packs.

        Candidates are tried in the order the service returns them. A
        candidate that fails is recorded as failed (when it has an id) and
        the next one is tried.

        Args:
            context: Recovery context

        Returns:
            RecoveryResult if a correction was successfully applied, None otherwise
        """
        try:
            # Get element type from metadata
            element_type = context.metadata.get('element_type', 'unknown')

            # Search for applicable corrections
            corrections = self.correction_pack_service.find_applicable_corrections(
                action_type=context.original_action,
                element_type=element_type,
                failure_context=context.failure_reason,
                min_confidence=0.5  # Only use high-confidence corrections
            )
            if not corrections:
                return None

            # Try corrections in order of confidence
            for correction_info in corrections:
                pack_id = correction_info['pack_id']
                correction = correction_info['correction']
                # Read the id with .get() so the failure handler below can
                # never raise on a record that lacks one.
                correction_id = correction.get('id')

                try:
                    if not self._apply_correction(context, correction):
                        continue

                    # Record successful application
                    self.correction_pack_service.apply_correction(
                        pack_id=pack_id,
                        correction_id=correction['id'],
                        success=True
                    )

                    # Propagate to learning repository
                    result = RecoveryResult(
                        success=True,
                        strategy_used=f"correction_pack:{correction['correction_type']}",
                        confidence_score=correction['confidence_score'],
                        modified_action=correction.get('corrected_params'),
                        modified_target=correction.get('corrected_target')
                    )

                    # Learn from this for future reference
                    self.learning_repo.store_pattern(context, result)

                    logger.info(
                        "Applied correction from pack %s: %s (confidence: %.2f)",
                        pack_id, correction['id'], correction['confidence_score']
                    )
                    return result
                except Exception as e:
                    logger.warning(
                        "Failed to apply correction %s: %s", correction_id, e
                    )
                    # Best effort: a bookkeeping problem must not stop the
                    # remaining candidates from being tried.
                    self._record_correction_failure(pack_id, correction_id)

            return None
        except Exception as e:
            logger.error("Error in correction pack lookup: %s", e)
            return None

    def _record_correction_failure(self, pack_id: Any, correction_id: Any) -> None:
        """Record a failed correction application without ever raising.

        Args:
            pack_id: Identifier of the pack the correction came from
            correction_id: Identifier of the correction; nothing is
                recorded when it is None
        """
        if correction_id is None:
            return
        try:
            self.correction_pack_service.apply_correction(
                pack_id=pack_id,
                correction_id=correction_id,
                success=False
            )
        except Exception as e:
            logger.warning(
                "Could not record failed correction %s: %s", correction_id, e
            )

    def _apply_correction(
        self,
        context: RecoveryContext,
        correction: Dict[str, Any]
    ) -> bool:
        """
        Apply a correction to the current context.

        The correction is applied by writing hints into ``context.metadata``.

        Args:
            context: Recovery context
            correction: Correction data dict

        Returns:
            True if correction was applied successfully
        """
        correction_type = correction.get('correction_type', 'other')
        corrected_target = correction.get('corrected_target')
        # ``or {}`` also covers a key that is present with an explicit None,
        # which ``.get(key, {})`` would hand back unchanged.
        corrected_params = correction.get('corrected_params') or {}

        # Handle different correction types
        if correction_type == 'target_change':
            # Update target in context
            if corrected_target:
                context.metadata['corrected_target'] = corrected_target
                return True
        elif correction_type == 'parameter_change':
            # Update parameters in context
            if corrected_params:
                context.metadata['corrected_params'] = corrected_params
                return True
        elif correction_type == 'wait_added':
            # Add wait time
            context.metadata['additional_wait'] = corrected_params.get('wait_time', 2.0)
            return True
        elif correction_type == 'timing_adjust':
            # Adjust timing
            context.metadata['timing_multiplier'] = corrected_params.get(
                'timing_multiplier', 1.5
            )
            return True
        elif correction_type == 'coordinates_adjust':
            # Adjust coordinates
            context.metadata['coordinate_offset'] = corrected_params.get('offset', {})
            return True

        # For other types (and for target/parameter changes missing their
        # payload), just mark as applied if we have any correction data
        return bool(corrected_target or corrected_params)

    def get_correction_pack_statistics(self) -> Optional[Dict[str, Any]]:
        """
        Get statistics from correction packs.

        Returns:
            Statistics dict or None if not available
        """
        if not self.correction_packs_enabled or not self.correction_pack_service:
            return None
        try:
            return self.correction_pack_service.get_global_statistics()
        except Exception as e:
            logger.error("Error getting correction pack statistics: %s", e)
            return None