#!/usr/bin/env python3
"""
Demo: Full Integration - Analytics + ExecutionLoop + Self-Healing

This demo shows the complete integration of:
- Analytics System (metrics collection, insights, reports)
- Execution Loop (workflow execution)
- Self-Healing (automatic recovery)

Each numbered section runs inside its own ``try`` block so that a failure in
one subsystem is reported and the remaining sections still run. When a
subsystem could not be initialized, the sections that depend on it report
that fact instead of failing with a NameError.
"""

import logging
import random
import time
from datetime import datetime, timedelta
from pathlib import Path

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Number of simulated runs per workflow and the simulated chance that a
# failed step is recovered by self-healing.
EXECUTIONS_PER_WORKFLOW = 3
RECOVERY_RATE = 0.7


def _require(component, name: str):
    """Return *component*, raising RuntimeError if it is None.

    The initialization sections leave ``analytics`` / ``healing`` set to None
    when they fail. Calling this at the top of a dependent section routes that
    condition into the section's existing ``except`` handler with a readable
    message.
    """
    if component is None:
        raise RuntimeError(f"{name} is not available")
    return component


def _simulate_execution(collector, step_metrics_cls, execution_id: str,
                        workflow_id: str, steps: int, success_rate: float):
    """Simulate one workflow run and record it through *collector*.

    Every step sleeps for a random 100-500 ms, succeeds with probability
    *success_rate*, and is recorded via ``collector.record_step``. A failed
    step is "recovered" with probability RECOVERY_RATE, in which case a
    recovery attempt is recorded and the step counts as succeeded.

    Returns a ``(steps_succeeded, steps_failed, elapsed_ms)`` tuple. Any
    exception raised by the collector propagates to the caller.
    """
    collector.record_execution_start(
        execution_id=execution_id,
        workflow_id=workflow_id,
        context={"mode": "automatic"}
    )
    started_at = datetime.now()

    steps_succeeded = 0
    steps_failed = 0

    for step_num in range(steps):
        step_start = datetime.now()

        # Simulate step execution
        success = random.random() < success_rate
        step_duration_ms = random.uniform(100, 500)
        confidence = random.uniform(0.7, 0.95) if success else random.uniform(0.3, 0.6)

        time.sleep(step_duration_ms / 1000.0)  # Simulate work
        step_end = datetime.now()

        # Record step (the recorded status stays "failed" even when the step
        # is recovered below; the recovery is tracked as a separate event)
        collector.record_step(step_metrics_cls(
            step_id=f"step_{step_num}",
            execution_id=execution_id,
            workflow_id=workflow_id,
            node_id=f"node_{step_num}",
            action_type="click",
            target_element="button",
            started_at=step_start,
            completed_at=step_end,
            duration_ms=step_duration_ms,
            status="success" if success else "failed",
            confidence_score=confidence
        ))

        if success:
            steps_succeeded += 1
        elif random.random() < RECOVERY_RATE:
            # Simulate self-healing attempt
            print(f" 🔧 Self-healing: Attempting recovery for step {step_num}")
            collector.record_recovery_attempt(
                workflow_id=workflow_id,
                node_id=f"step_{step_num}",
                failure_reason="element_not_found",
                recovery_success=True,
                strategy_used="semantic_variants",
                confidence=0.85
            )
            steps_succeeded += 1
            print(" ✅ Recovery successful!")
        else:
            steps_failed += 1

    # Complete execution
    elapsed_ms = (datetime.now() - started_at).total_seconds() * 1000
    all_ok = steps_failed == 0
    collector.record_execution_complete(
        execution_id=execution_id,
        status="completed" if all_ok else "failed",
        steps_total=steps,
        steps_completed=steps_succeeded,
        steps_failed=steps_failed,
        error_message=None if all_ok else "Some steps failed"
    )
    return steps_succeeded, steps_failed, elapsed_ms


def _summarize_executions(metrics):
    """Return ``(avg_duration_ms, success_pct)`` for a non-empty result set.

    Each entry is read with ``.get``: a missing ``duration_ms`` counts as 0
    and only a ``status`` equal to ``'completed'`` counts as a success.
    """
    total = len(metrics)
    avg_duration = sum(m.get('duration_ms', 0) for m in metrics) / total
    completed = sum(1 for m in metrics if m.get('status') == 'completed')
    return avg_duration, (completed / total) * 100


print("=" * 60)
print(" 🚀 RPA Vision V3 - Full Integration Demo")
print("=" * 60)
print()

# ============================================================================
# 1. Initialize Analytics System
# ============================================================================
print("1️⃣ Initializing Analytics System...")
print()

# Bound up front so later sections can detect a failed initialization.
analytics = None
try:
    from core.analytics.analytics_system import get_analytics_system
    analytics = get_analytics_system()
    print(" ✅ Analytics System initialized")
    print(" 📊 Collectors: Metrics, Resources")
    print(" 🔍 Engines: Performance, Anomaly, Insights")
    print(" 📈 Real-time tracking enabled")
    print()
except Exception as e:
    logger.debug("Analytics initialization failed", exc_info=True)
    print(f" ❌ Analytics initialization failed: {e}")
    print()

# ============================================================================
# 2. Initialize Self-Healing
# ============================================================================
print("2️⃣ Initializing Self-Healing System...")
print()

healing = None
try:
    from core.healing.execution_integration import get_self_healing_integration
    healing = get_self_healing_integration(enabled=True)
    print(" ✅ Self-Healing System initialized")
    print(" 🔧 Strategies: Semantic, Spatial, Timing, Format")
    print(" 📚 Learning Repository active")
    print(" 🔗 Analytics integration enabled")
    print()
except Exception as e:
    logger.debug("Self-Healing initialization failed", exc_info=True)
    print(f" ❌ Self-Healing initialization failed: {e}")
    print()

# ============================================================================
# 3. Simulate Workflow Executions with Analytics
# ============================================================================
print("3️⃣ Simulating Workflow Executions...")
print()

# Simulate multiple workflow executions
workflows = [
    {"id": "login_workflow", "steps": 5, "success_rate": 0.9},
    {"id": "data_entry_workflow", "steps": 8, "success_rate": 0.85},
    {"id": "report_generation", "steps": 12, "success_rate": 0.95},
]

execution_count = 0

try:
    # Resolve the prerequisites once; without them no execution can be
    # recorded, so report a single error instead of one per execution.
    collector = _require(analytics, "Analytics System").metrics_collector
    from core.analytics.collection.metrics_collector import StepMetrics
except Exception as e:
    logger.debug("Workflow simulation prerequisites missing", exc_info=True)
    print(f" ❌ Error: {e}")
    print()
else:
    for workflow in workflows:
        workflow_id = workflow["id"]
        steps = workflow["steps"]

        print(f" 📋 Workflow: {workflow_id}")

        for i in range(EXECUTIONS_PER_WORKFLOW):
            execution_id = f"exec_{workflow_id}_{i}_{int(time.time())}"

            try:
                steps_succeeded, steps_failed, elapsed_ms = _simulate_execution(
                    collector,
                    StepMetrics,
                    execution_id,
                    workflow_id,
                    steps,
                    workflow["success_rate"],
                )
            except Exception as e:
                logger.debug("Execution %s failed", execution_id, exc_info=True)
                print(f" ❌ Error: {e}")
                continue

            status = "✅ Success" if steps_failed == 0 else "⚠️ Partial"
            print(f" {status} - {steps_succeeded}/{steps} steps ({elapsed_ms:.0f}ms)")
            execution_count += 1

        print()

print(f" 📊 Total executions: {execution_count}")
print()

# ============================================================================
# 4. Query Analytics Data
# ============================================================================
print("4️⃣ Querying Analytics Data...")
print()

try:
    query_engine = _require(analytics, "Analytics System").query_engine

    # Get performance stats
    print(" 📈 Performance Analysis:")

    for workflow in workflows:
        workflow_id = workflow["id"]

        # Query metrics recorded during the last hour
        metrics = query_engine.query(
            metric_type="execution",
            filters={"workflow_id": workflow_id},
            time_range=(datetime.now() - timedelta(hours=1), datetime.now())
        )

        if metrics:
            avg_duration, success_pct = _summarize_executions(metrics)
            print(f" • {workflow_id}:")
            print(f" - Executions: {len(metrics)}")
            print(f" - Avg Duration: {avg_duration:.0f}ms")
            print(f" - Success Rate: {success_pct:.1f}%")

    print()
except Exception as e:
    logger.debug("Analytics query failed", exc_info=True)
    print(f" ⚠️ Query failed: {e}")
    print()

# ============================================================================
# 5. Generate Insights
# ============================================================================
print("5️⃣ Generating Insights...")
print()

try:
    insight_generator = _require(analytics, "Analytics System").insight_generator

    # Generate insights for each workflow
    for workflow in workflows[:2]:  # Just first 2 for demo
        workflow_id = workflow["id"]

        insights = insight_generator.generate_insights(
            workflow_id=workflow_id,
            time_window_hours=24
        )

        if insights:
            print(f" 💡 Insights for {workflow_id}:")
            for insight in insights[:2]:  # Show top 2
                print(f" • {insight.insight_type}: {insight.description}")
                print(f" Priority: {insight.priority_score:.2f}")
            print()
except Exception as e:
    logger.debug("Insight generation failed", exc_info=True)
    print(f" ⚠️ Insight generation failed: {e}")
    print()

# ============================================================================
# 6. Check for Anomalies
# ============================================================================
print("6️⃣ Detecting Anomalies...")
print()

try:
    anomalies = _require(analytics, "Analytics System").anomaly_detector.detect_anomalies(
        time_window_hours=1
    )

    if anomalies:
        print(f" 🚨 Found {len(anomalies)} anomalies:")
        for anomaly in anomalies[:3]:  # Show top 3
            print(f" • {anomaly.anomaly_type}: {anomaly.description}")
            print(f" Severity: {anomaly.severity:.2f}")
        print()
    else:
        print(" ✅ No anomalies detected")
        print()
except Exception as e:
    logger.debug("Anomaly detection failed", exc_info=True)
    print(f" ⚠️ Anomaly detection failed: {e}")
    print()

# ============================================================================
# 7. Generate Report
# ============================================================================
print("7️⃣ Generating Analytics Report...")
print()

try:
    report_generator = _require(analytics, "Analytics System").report_generator

    report_target = Path("reports/integration_demo_report.json")
    # Make sure the target directory exists before asking for the report;
    # whether the generator creates it on its own is not visible from here.
    report_target.parent.mkdir(parents=True, exist_ok=True)

    report_path = report_generator.generate_report(
        report_type="performance",
        workflow_ids=[w["id"] for w in workflows],
        time_range=(datetime.now() - timedelta(hours=1), datetime.now()),
        format="json",
        output_path=report_target
    )

    print(f" 📄 Report generated: {report_path}")
    print()
except Exception as e:
    logger.debug("Report generation failed", exc_info=True)
    print(f" ⚠️ Report generation failed: {e}")
    print()

# ============================================================================
# 8. Self-Healing Statistics
# ============================================================================
print("8️⃣ Self-Healing Statistics...")
print()

try:
    stats = _require(healing, "Self-Healing System").get_statistics()

    if stats.get('enabled'):
        print(" 🔧 Self-Healing Stats:")
        print(f" • Total Attempts: {stats.get('total_attempts', 0)}")
        print(f" • Successful: {stats.get('successful_recoveries', 0)}")
        print(f" • Success Rate: {stats.get('success_rate', 0):.1f}%")
        print()

        # Insights are only requested when healing reports itself enabled
        insights = healing.get_insights()
        if insights:
            print(" 💡 Self-Healing Insights:")
            for insight in insights[:2]:
                print(f" • {insight}")
            print()
except Exception as e:
    logger.debug("Self-healing statistics failed", exc_info=True)
    print(f" ⚠️ Self-healing stats failed: {e}")
    print()

# ============================================================================
# 9. Real-time Metrics
# ============================================================================
print("9️⃣ Real-time Analytics...")
print()

try:
    realtime = _require(analytics, "Analytics System").realtime_analytics

    # Get active executions
    active = realtime.get_active_executions()
    print(f" ⚡ Active Executions: {len(active)}")

    # Get recent metrics
    recent_metrics = realtime.get_recent_metrics(limit=5)
    if recent_metrics:
        print(f" 📊 Recent Metrics: {len(recent_metrics)} entries")

    print()
except Exception as e:
    logger.debug("Real-time metrics failed", exc_info=True)
    print(f" ⚠️ Real-time metrics failed: {e}")
    print()

# ============================================================================
# 10. Summary
# ============================================================================
print("=" * 60)
print(" ✅ Integration Demo Complete!")
print("=" * 60)
print()
print("🎯 What was demonstrated:")
print()
print(" ✅ Analytics System")
print(" • Automatic metrics collection")
print(" • Performance analysis")
print(" • Anomaly detection")
print(" • Insight generation")
print(" • Report generation")
print()
print(" ✅ Self-Healing Integration")
print(" • Automatic recovery attempts")
print(" • Analytics tracking of recoveries")
print(" • Learning from failures")
print()
print(" ✅ ExecutionLoop Integration")
print(" • Seamless analytics hooks")
print(" • Resource monitoring")
print(" • Real-time tracking")
print()
print(" ✅ End-to-End Flow")
print(" • Workflow execution → Analytics → Insights")
print(" • Failure → Self-Healing → Analytics")
print(" • Real-time monitoring → Reports")
print()
print("=" * 60)
print()
print("📚 Next Steps:")
print(" • Run: python demo_full_integration.py")
print(" • View reports in: reports/")
print(" • Check analytics DB: data/analytics/")
print(" • Monitor real-time: Use analytics API")
print()
print("=" * 60)