#!/usr/bin/env python3 """ Démonstration de l'automatisation RPA Vision V3 Ce script montre comment créer des chaînes et triggers automatiques. """ import sys from pathlib import Path sys.path.insert(0, '.') from core.monitoring.chain_manager import ChainManager from core.monitoring.trigger_manager import TriggerManager # Initialiser les managers print("🚀 Initialisation...") cm = ChainManager(Path('data/chains')) tm = TriggerManager(Path('data/triggers')) print("\n" + "="*60) print("DÉMONSTRATION : Automatisation RPA Vision V3") print("="*60) # 1. Créer une chaîne de test print("\n📋 1. Création d'une chaîne de workflows...") try: chain = cm.create_chain( name="Demo Backup Chain", workflows=["wf_export", "wf_compress", "wf_upload"] ) print(f" ✅ Chaîne créée: {chain.chain_id}") print(f" 📝 Nom: {chain.name}") print(f" 🔄 Workflows: {' → '.join(chain.workflows)}") except Exception as e: print(f" ⚠️ Erreur: {e}") # 2. Créer un trigger schedule print("\n⏰ 2. Création d'un trigger schedule...") try: trigger_schedule = tm.create_trigger( trigger_type="schedule", workflow_id=chain.chain_id, config={ "interval_seconds": 300 # Toutes les 5 minutes } ) print(f" ✅ Trigger créé: {trigger_schedule.trigger_id}") print(f" ⏱️ Intervalle: 300 secondes (5 minutes)") print(f" 🎯 Cible: {trigger_schedule.workflow_id}") print(f" 🟢 Activé: {trigger_schedule.enabled}") except Exception as e: print(f" ⚠️ Erreur: {e}") # 3. Créer un trigger file print("\n📁 3. Création d'un trigger file...") try: trigger_file = tm.create_trigger( trigger_type="file", workflow_id="wf_process_invoice", config={ "watch_directory": "/tmp/rpa_test", "file_pattern": "*.txt" } ) print(f" ✅ Trigger créé: {trigger_file.trigger_id}") print(f" 📂 Répertoire: /tmp/rpa_test") print(f" 🔍 Pattern: *.txt") print(f" 🎯 Workflow: {trigger_file.workflow_id}") except Exception as e: print(f" ⚠️ Erreur: {e}") # 4. Lister toutes les chaînes print("\n📊 4. Chaînes configurées:") chains = cm.list_chains() for c in chains: print(f" • {c.name} ({c.chain_id})") print(f" Workflows: {len(c.workflows)}") print(f" Taux de succès: {c.success_rate:.1f}%") # 5. Lister tous les triggers print("\n⚡ 5. Triggers configurés:") triggers = tm.list_triggers() for t in triggers: status = "🟢 Activé" if t.enabled else "🔴 Désactivé" print(f" • {t.trigger_type.upper()} - {status}") print(f" ID: {t.trigger_id}") print(f" Cible: {t.workflow_id}") print(f" Déclenchements: {t.fire_count}") print("\n" + "="*60) print("✨ AUTOMATISATION CONFIGURÉE") print("="*60) print("\n📌 Prochaines étapes:") print(" 1. Le scheduler tourne en arrière-plan") print(" 2. Les triggers schedule se déclenchent automatiquement") print(" 3. Les triggers file surveillent les répertoires") print(" 4. Les chaînes s'exécutent séquentiellement") print("\n🌐 Interface admin: http://localhost:5001") print(" → Onglet 'Chaînes' pour voir les chaînes") print(" → Onglet 'Déclencheurs' pour gérer les triggers") print(" → Onglet 'Métriques' pour voir le statut du scheduler") print("\n🎉 Tout est prêt pour l'automatisation !")