#!/usr/bin/env python3 """ Exemple de workflow complet : Capture → Apprentissage → Rejeu Ce script démontre le cycle complet du système RPA Vision V2 : 1. Capture d'événements utilisateur 2. Détection de patterns répétitifs 3. Apprentissage de tâches 4. Rejeu intelligent avec reconnaissance visuelle """ import asyncio import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent / "geniusia2")) from core.event_capture import EventCapture from core.learning_manager import LearningManager from core.embeddings_manager import EmbeddingsManager from core.task_replay import TaskReplayEngine from core.utils.vision_utils import VisionUtils from core.utils.input_utils import InputUtils from core.logger import Logger from core.config import get_config class RPAWorkflowDemo: """Démonstration du workflow complet RPA Vision V2.""" def __init__(self): """Initialise tous les composants.""" self.config = get_config() self.logger = Logger() # Composants de base self.embeddings_manager = EmbeddingsManager(self.logger, self.config) self.vision_utils = VisionUtils(self.logger, self.config) self.input_utils = InputUtils(self.logger, self.config) # Gestionnaire d'apprentissage self.learning_manager = LearningManager( self.embeddings_manager, self.logger, self.config, profiles_path="geniusia2/data/user_profiles" ) # Capture d'événements self.event_capture = EventCapture( self.learning_manager, self.embeddings_manager, self.vision_utils, self.logger, self.config ) # Moteur de rejeu self.replay_engine = TaskReplayEngine( self.learning_manager, self.embeddings_manager, self.vision_utils, self.input_utils, self.logger, self.config ) async def demo_capture_phase(self, duration: int = 30): """ Phase 1 : Capture d'événements. Args: duration: Durée de capture en secondes """ print("\n" + "="*60) print("📹 PHASE 1 : CAPTURE D'ÉVÉNEMENTS") print("="*60) print(f"\nCapture pendant {duration} secondes...") print("Effectuez des actions répétitives pour que le système apprenne.\n") # Démarrer la capture self.event_capture.start() # Attendre for i in range(duration, 0, -1): print(f"⏱️ {i} secondes restantes...", end="\r") await asyncio.sleep(1) # Arrêter la capture self.event_capture.stop() print("\n✅ Capture terminée!") # Afficher les statistiques stats = self.event_capture.get_stats() print(f"\n📊 Statistiques de capture:") print(f" - Événements capturés: {stats['total_events']}") print(f" - Patterns détectés: {stats['patterns_detected']}") print(f" - Tâches créées: {stats['tasks_created']}") async def demo_learning_phase(self): """Phase 2 : Apprentissage et analyse.""" print("\n" + "="*60) print("🧠 PHASE 2 : APPRENTISSAGE") print("="*60) # Obtenir les tâches apprises tasks = self.learning_manager.get_all_tasks() if not tasks: print("\n⚠️ Aucune tâche apprise.") print(" Effectuez plus d'actions répétitives pendant la capture.") return None print(f"\n✅ {len(tasks)} tâche(s) apprise(s):\n") for i, task in enumerate(tasks, 1): print(f"{i}. {task['task_name']}") print(f" Mode: {task['mode']}") print(f" Observations: {task['observation_count']}") print(f" Concordance: {task['concordance_rate']:.2%}") print(f" Confiance: {task['confidence_score']:.2%}") print() return tasks async def demo_replay_phase(self, task_id: str): """ Phase 3 : Rejeu de tâche. Args: task_id: ID de la tâche à rejouer """ print("\n" + "="*60) print("🎮 PHASE 3 : REJEU DE TÂCHE") print("="*60) print(f"\nTâche: {task_id}\n") # Callback pour monitoring def on_step(step_result): status_icons = { "success": "✅", "failed": "❌", "not_found": "🔍", "pending": "⏳" } icon = status_icons.get(step_result["status"], "❓") print(f"{icon} Étape {step_result['step']}: {step_result['description']}") if step_result["status"] == "success" and "location" in step_result: loc = step_result["location"] print(f" Position: ({loc['x']}, {loc['y']}) - Confiance: {loc.get('confidence', 0):.2%}") print("⏳ Démarrage du rejeu dans 3 secondes...") print(" (Préparez l'interface si nécessaire)\n") await asyncio.sleep(3) # Rejouer avec monitoring results = await self.replay_engine.replay_task_with_monitoring( task_id, on_step_completed=on_step ) # Afficher les résultats print(f"\n📊 Résultats du rejeu:") print(f" Succès: {'✅' if results['success'] else '❌'}") success_count = sum(1 for s in results['steps'] if s['status'] == 'success') total_count = len(results['steps']) print(f" Actions réussies: {success_count}/{total_count}") return results async def run_complete_workflow(self, capture_duration: int = 30): """ Exécute le workflow complet. Args: capture_duration: Durée de la phase de capture """ print("\n" + "="*60) print("🚀 WORKFLOW COMPLET RPA VISION V2") print("="*60) print("\nCe workflow démontre :") print("1. 📹 Capture d'événements utilisateur") print("2. 🧠 Apprentissage automatique de tâches") print("3. 🎮 Rejeu intelligent avec reconnaissance visuelle") try: # Phase 1 : Capture await self.demo_capture_phase(capture_duration) # Phase 2 : Apprentissage tasks = await self.demo_learning_phase() if not tasks: return # Phase 3 : Rejeu print("\n" + "="*60) print("Voulez-vous rejouer une tâche ?") for i, task in enumerate(tasks, 1): print(f"{i}. {task['task_name']}") choice = input("\nNuméro de la tâche (ou 'n' pour passer): ").strip() if choice.lower() != 'n' and choice.isdigit(): task_index = int(choice) - 1 if 0 <= task_index < len(tasks): task_id = tasks[task_index]['task_id'] await self.demo_replay_phase(task_id) print("\n" + "="*60) print("✅ WORKFLOW TERMINÉ") print("="*60) except KeyboardInterrupt: print("\n\n⚠️ Workflow interrompu par l'utilisateur") except Exception as e: print(f"\n❌ Erreur: {e}") import traceback traceback.print_exc() async def main(): """Fonction principale.""" print("\n🎯 Démonstration RPA Vision V2") print("="*60) # Créer le workflow workflow = RPAWorkflowDemo() # Menu print("\nOptions:") print("1. Workflow complet (capture + apprentissage + rejeu)") print("2. Seulement lister les tâches existantes") print("3. Rejouer une tâche existante") print("4. Quitter") choice = input("\nVotre choix: ").strip() if choice == "1": duration = input("Durée de capture (secondes, défaut=30): ").strip() duration = int(duration) if duration.isdigit() else 30 await workflow.run_complete_workflow(duration) elif choice == "2": tasks = await workflow.demo_learning_phase() elif choice == "3": tasks = workflow.replay_engine.list_available_tasks() if not tasks: print("\n❌ Aucune tâche disponible") return print("\nTâches disponibles:") for i, task in enumerate(tasks, 1): print(f"{i}. {task['task_name']} ({task['task_id']})") choice = input("\nNuméro de la tâche: ").strip() if choice.isdigit(): task_index = int(choice) - 1 if 0 <= task_index < len(tasks): task_id = tasks[task_index]['task_id'] await workflow.demo_replay_phase(task_id) elif choice == "4": print("Au revoir!") else: print("❌ Choix invalide") if __name__ == "__main__": asyncio.run(main())