#!/usr/bin/env python3 """ RPA Vision V3 - Command Line Interface Interface unifiée pour contrôler le système RPA Vision. Usage: python cli.py [options] Commands: status - Afficher l'état du système record - Démarrer l'enregistrement stop - Arrêter l'enregistrement play - Exécuter un workflow list - Lister les workflows gpu - Gérer les ressources GPU Examples: python cli.py status python cli.py record --app "Firefox" python cli.py play my_workflow.json python cli.py gpu load-vlm """ import argparse import asyncio import json import sys from pathlib import Path from typing import Optional # Add project root to path sys.path.insert(0, str(Path(__file__).parent)) def print_banner(): """Afficher la bannière.""" print(""" ╔════════════════════════════════════════════════════════════╗ ║ RPA Vision V3 - CLI ║ ║ 100% Vision-Based RPA System ║ ╚════════════════════════════════════════════════════════════╝ """) # ============================================================================= # Status Commands # ============================================================================= def cmd_status(args): """Afficher l'état du système.""" print("📊 État du système RPA Vision V3\n") # Check GPU print("🖥️ GPU:") try: from core.gpu import get_gpu_resource_manager manager = get_gpu_resource_manager() status = manager.get_status() print(f" Mode: {status.execution_mode.value}") print(f" VLM: {status.vlm_state.value}") print(f" CLIP: {status.clip_device}") if status.vram: print(f" VRAM: {status.vram.used_mb}/{status.vram.total_mb} MB") except Exception as e: print(f" ⚠️ Non disponible: {e}") # Check Ollama print("\n🤖 Ollama:") try: import requests response = requests.get("http://localhost:11434/api/tags", timeout=2) if response.status_code == 200: models = response.json().get('models', []) print(f" ✅ Disponible ({len(models)} modèles)") for m in models[:3]: print(f" - {m['name']}") else: print(" ❌ Non disponible") except: print(" ❌ Non disponible") # Check API Server print("\n🌐 API Server:") try: import requests response = requests.get("http://localhost:8000/api/traces/status", timeout=2) if response.status_code == 200: print(" ✅ En ligne (port 8000)") else: print(" ❌ Hors ligne") except: print(" ❌ Hors ligne") # Check Dashboard print("\n📈 Dashboard:") try: import requests response = requests.get("http://localhost:5001/", timeout=2) if response.status_code == 200: print(" ✅ En ligne (port 5001)") else: print(" ❌ Hors ligne") except: print(" ❌ Hors ligne") # List workflows print("\n📁 Workflows:") workflow_dir = Path("data/workflows") if workflow_dir.exists(): workflows = list(workflow_dir.glob("*.json")) print(f" {len(workflows)} workflow(s) disponible(s)") for w in workflows[:5]: print(f" - {w.name}") else: print(" Aucun workflow") # ============================================================================= # GPU Commands # ============================================================================= def cmd_gpu(args): """Gérer les ressources GPU.""" action = args.action async def run(): from core.gpu import get_gpu_resource_manager, ExecutionMode manager = get_gpu_resource_manager() if action == "status": status = manager.get_status() print("🖥️ GPU Resource Manager Status") print(f" Mode: {status.execution_mode.value}") print(f" VLM State: {status.vlm_state.value}") print(f" VLM Model: {status.vlm_model}") print(f" CLIP Device: {status.clip_device}") print(f" Degraded Mode: {status.degraded_mode}") if status.vram: percent = (status.vram.used_mb / status.vram.total_mb * 100) if status.vram.total_mb > 0 else 0 print(f" VRAM: {status.vram.used_mb}/{status.vram.total_mb} MB ({percent:.1f}%)") elif action == "load-vlm": print("🔄 Chargement du VLM...") success = await manager.ensure_vlm_loaded() if success: print("✅ VLM chargé") else: print("❌ Échec du chargement") elif action == "unload-vlm": print("🔄 Déchargement du VLM...") success = await manager.ensure_vlm_unloaded() if success: print("✅ VLM déchargé") else: print("❌ Échec du déchargement") elif action == "recording": print("🔄 Passage en mode RECORDING...") await manager.set_execution_mode(ExecutionMode.RECORDING) print("✅ Mode RECORDING activé (VLM chargé)") elif action == "autopilot": print("🔄 Passage en mode AUTOPILOT...") await manager.set_execution_mode(ExecutionMode.AUTOPILOT) print("✅ Mode AUTOPILOT activé (VLM déchargé)") elif action == "idle": print("🔄 Passage en mode IDLE...") await manager.set_execution_mode(ExecutionMode.IDLE) print("✅ Mode IDLE activé") else: print(f"❌ Action inconnue: {action}") print("Actions disponibles: status, load-vlm, unload-vlm, recording, autopilot, idle") asyncio.run(run()) # ============================================================================= # Workflow Commands # ============================================================================= def cmd_list(args): """Lister les workflows disponibles.""" workflow_dir = Path("data/workflows") if not workflow_dir.exists(): print("📁 Aucun workflow trouvé") return workflows = list(workflow_dir.glob("*.json")) if not workflows: print("📁 Aucun workflow trouvé") return print(f"📁 {len(workflows)} workflow(s) disponible(s):\n") for w in workflows: try: with open(w) as f: data = json.load(f) name = data.get("name", w.stem) steps = len(data.get("steps", [])) print(f" 📋 {name}") print(f" Fichier: {w.name}") print(f" Étapes: {steps}") print() except: print(f" ⚠️ {w.name} (erreur de lecture)") def cmd_play(args): """Exécuter un workflow.""" workflow_path = Path(args.workflow) if not workflow_path.exists(): # Try in data/workflows workflow_path = Path("data/workflows") / args.workflow if not workflow_path.exists(): print(f"❌ Workflow non trouvé: {args.workflow}") return print(f"▶️ Exécution du workflow: {workflow_path.name}") async def run(): try: from core.execution.execution_loop import ExecutionLoop loop = ExecutionLoop() await loop.load_workflow(str(workflow_path)) print("🔄 Démarrage...") await loop.start() print("✅ Workflow terminé") except Exception as e: print(f"❌ Erreur: {e}") asyncio.run(run()) def cmd_record(args): """Démarrer l'enregistrement.""" print("🔴 Démarrage de l'enregistrement...") print(f" Application cible: {args.app or 'Toutes'}") # TODO: Implement recording via agent_v0 print("\n💡 Pour enregistrer, utilisez:") print(" ./run.sh --agent") print(" ou") print(" python agent_v0/main.py") def cmd_stop(args): """Arrêter l'enregistrement.""" print("⏹️ Arrêt de l'enregistrement...") # TODO: Send stop signal to agent # ============================================================================= # Task Commands (Natural Language) # ============================================================================= def cmd_task(args): """Exécuter une tâche en langage naturel.""" task_description = args.description print(f"🎯 Tâche demandée: {task_description}") print() # Parse les paramètres explicites explicit_params = {} if args.param: for p in args.param: if "=" in p: key, value = p.split("=", 1) explicit_params[key] = value if explicit_params: print(f"📋 Paramètres explicites: {explicit_params}") print() # Utiliser le SemanticMatcher pour trouver le workflow try: from core.workflow import SemanticMatcher, VariableManager matcher = SemanticMatcher("data/workflows") matches = matcher.find_workflows(task_description, limit=5, min_confidence=0.2) if matches: print(f"🔍 {len(matches)} workflow(s) correspondant(s) trouvé(s):\n") for i, match in enumerate(matches): confidence_bar = "█" * int(match.confidence * 10) + "░" * (10 - int(match.confidence * 10)) print(f" {i+1}. {match.workflow_name}") print(f" Confiance: [{confidence_bar}] {match.confidence:.0%}") print(f" Raison: {match.match_reason}") if match.extracted_params: print(f" Paramètres extraits: {match.extracted_params}") print() if not args.dry_run: # Utiliser le meilleur match best_match = matches[0] print(f"▶️ Exécution de: {best_match.workflow_name}") # Combiner les paramètres extraits et explicites all_params = {**best_match.extracted_params, **explicit_params} if all_params: print(f"📋 Paramètres finaux: {all_params}") async def run(): try: # Charger le workflow with open(best_match.workflow_path, 'r') as f: workflow_data = json.load(f) # Créer le VariableManager et injecter les paramètres var_manager = VariableManager() var_manager.set_variables(all_params) # Substituer les variables dans le workflow workflow_data = var_manager.substitute_dict(workflow_data) # Vérifier les variables requises errors = var_manager.validate() if errors: print(f"⚠️ Variables manquantes:") for err in errors: print(f" - {err}") return print("🔄 Démarrage...") # TODO: Exécuter le workflow avec ExecutionLoop # Pour l'instant, afficher ce qui serait exécuté print(f" Workflow: {workflow_data.get('name', 'Unknown')}") print(f" Étapes: {len(workflow_data.get('edges', []))}") print("✅ Tâche terminée (simulation)") except Exception as e: print(f"❌ Erreur: {e}") asyncio.run(run()) else: print("❌ Aucun workflow correspondant trouvé.") print() print("💡 Pour créer ce workflow:") print(" 1. Lancez l'agent: ./run.sh --agent") print(" 2. Effectuez la tâche manuellement") print(" 3. L'agent enregistrera vos actions") print(" 4. Le workflow sera créé automatiquement") print() print("📝 Ou créez un workflow manuellement:") print(f' python cli.py workflow create "{task_description}"') except ImportError as e: print(f"⚠️ Module non disponible: {e}") print(" Utilisation du matching simple...") # Fallback au matching simple workflow_dir = Path("data/workflows") if workflow_dir.exists(): for w in workflow_dir.glob("*.json"): try: with open(w) as f: data = json.load(f) name = data.get("name", "").lower() task_lower = task_description.lower() if any(word in name for word in task_lower.split()): print(f" Trouvé: {data.get('name', w.stem)}") except: pass def cmd_ask(args): """Demander au VLM d'analyser une situation.""" question = args.question screenshot = args.screenshot print(f"🤔 Question: {question}") async def run(): try: from core.detection.ollama_client import OllamaClient from PIL import Image client = OllamaClient() if screenshot: print(f"📸 Analyse de: {screenshot}") result = client.generate(question, image_path=screenshot) else: # Capturer l'écran actuel print("📸 Capture de l'écran...") from core.capture.screen_capturer import ScreenCapturer capturer = ScreenCapturer() img = capturer.capture_screen() result = client.generate(question, image=img) if result["success"]: print(f"\n💬 Réponse:\n{result['response']}") else: print(f"❌ Erreur: {result['error']}") except Exception as e: print(f"❌ Erreur: {e}") asyncio.run(run()) # ============================================================================= # Composition Commands # ============================================================================= def cmd_chain(args): """Gérer les chaînes de workflows.""" from core.workflow import WorkflowChainer, ChainConfig, GlobalVariableManager if args.action == "create": if not args.source or not args.target: print("❌ --source et --target sont requis pour créer une chaîne") return chainer = WorkflowChainer() config = ChainConfig( source_workflow_id=args.source, target_workflow_id=args.target, variable_mapping={}, on_failure="abort" ) chainer.add_chain(config) print(f"✅ Chaîne créée: {args.source} -> {args.target}") elif args.action == "list": print("📋 Chaînes de workflows:") print(" (Fonctionnalité à implémenter avec persistance)") elif args.action == "run": if not args.chain_id: print("❌ --chain-id est requis pour exécuter une chaîne") return print(f"🚀 Exécution de la chaîne {args.chain_id}...") print(" (Fonctionnalité à implémenter)") elif args.action == "delete": if not args.chain_id: print("❌ --chain-id est requis pour supprimer une chaîne") return print(f"🗑️ Suppression de la chaîne {args.chain_id}") def cmd_subworkflow(args): """Gérer les sous-workflows.""" from core.workflow import SubWorkflowRegistry, SubWorkflowDefinition if args.action == "register": if not args.workflow or not args.name: print("❌ --workflow et --name sont requis") return registry = SubWorkflowRegistry() defn = SubWorkflowDefinition( workflow_id=args.workflow, name=args.name, input_parameters=[], output_values=[] ) registry.register(defn) print(f"✅ Sous-workflow '{args.name}' enregistré") elif args.action == "list": print("📋 Sous-workflows enregistrés:") print(" (Fonctionnalité à implémenter avec persistance)") elif args.action == "extract": print("🔧 Extraction de séquences communes...") print(" (Fonctionnalité à implémenter)") elif args.action == "delete": print("🗑️ Suppression du sous-workflow") def cmd_trigger(args): """Gérer les déclencheurs.""" from core.workflow import TriggerManager, ScheduleTrigger, FileTrigger, VisualTrigger manager = TriggerManager() if args.action == "add": if not args.type or not args.workflow: print("❌ --type et --workflow sont requis") return trigger_id = args.trigger_id or f"trigger_{args.type}_{args.workflow}" if args.type == "schedule": trigger = ScheduleTrigger( trigger_id=trigger_id, workflow_id=args.workflow, cron_expression=args.cron, interval_seconds=args.interval ) elif args.type == "file": if not args.watch_dir: print("❌ --watch-dir est requis pour un trigger file") return trigger = FileTrigger( trigger_id=trigger_id, workflow_id=args.workflow, watch_directory=args.watch_dir, file_pattern=args.pattern or "*" ) elif args.type == "visual": trigger = VisualTrigger( trigger_id=trigger_id, workflow_id=args.workflow, target_element=args.pattern or "target", check_interval_seconds=args.interval or 5 ) manager.register_trigger(trigger) print(f"✅ Trigger '{trigger_id}' ajouté") elif args.action == "list": print("📋 Triggers configurés:") for tid, trigger in manager._triggers.items(): print(f" - {tid}: {trigger.workflow_id}") elif args.action == "remove": if not args.trigger_id: print("❌ --trigger-id est requis") return manager.unregister_trigger(args.trigger_id) print(f"🗑️ Trigger '{args.trigger_id}' supprimé") elif args.action == "fire": if not args.trigger_id: print("❌ --trigger-id est requis") return try: ctx = manager.fire_trigger(args.trigger_id) print(f"🔥 Trigger '{args.trigger_id}' déclenché à {ctx.fired_at}") except ValueError as e: print(f"❌ {e}") # ============================================================================= # Main # ============================================================================= def main(): parser = argparse.ArgumentParser( description="RPA Vision V3 - Command Line Interface", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python cli.py status # Voir l'état du système python cli.py gpu status # Voir l'état GPU python cli.py gpu load-vlm # Charger le VLM python cli.py gpu recording # Passer en mode recording python cli.py list # Lister les workflows python cli.py play workflow.json # Exécuter un workflow """ ) subparsers = parser.add_subparsers(dest="command", help="Commande à exécuter") # Status parser_status = subparsers.add_parser("status", help="Afficher l'état du système") parser_status.set_defaults(func=cmd_status) # GPU parser_gpu = subparsers.add_parser("gpu", help="Gérer les ressources GPU") parser_gpu.add_argument("action", choices=["status", "load-vlm", "unload-vlm", "recording", "autopilot", "idle"], help="Action à effectuer") parser_gpu.set_defaults(func=cmd_gpu) # List parser_list = subparsers.add_parser("list", help="Lister les workflows") parser_list.set_defaults(func=cmd_list) # Play parser_play = subparsers.add_parser("play", help="Exécuter un workflow") parser_play.add_argument("workflow", help="Chemin vers le workflow JSON") parser_play.set_defaults(func=cmd_play) # Record parser_record = subparsers.add_parser("record", help="Démarrer l'enregistrement") parser_record.add_argument("--app", help="Application cible") parser_record.set_defaults(func=cmd_record) # Stop parser_stop = subparsers.add_parser("stop", help="Arrêter l'enregistrement") parser_stop.set_defaults(func=cmd_stop) # Task (natural language) parser_task = subparsers.add_parser("task", help="Exécuter une tâche en langage naturel") parser_task.add_argument("description", help="Description de la tâche (ex: 'facturer client A')") parser_task.add_argument("-p", "--param", action="append", help="Paramètre (ex: client=A)") parser_task.add_argument("--dry-run", action="store_true", help="Ne pas exécuter, juste chercher") parser_task.set_defaults(func=cmd_task) # Ask (VLM question) parser_ask = subparsers.add_parser("ask", help="Poser une question au VLM") parser_ask.add_argument("question", help="Question à poser") parser_ask.add_argument("-s", "--screenshot", help="Chemin vers un screenshot (optionnel)") parser_ask.set_defaults(func=cmd_ask) # Chain (workflow composition) parser_chain = subparsers.add_parser("chain", help="Chaîner des workflows") parser_chain.add_argument("action", choices=["create", "list", "run", "delete"], help="Action: create, list, run, delete") parser_chain.add_argument("--source", help="Workflow source (pour create)") parser_chain.add_argument("--target", help="Workflow cible (pour create)") parser_chain.add_argument("--chain-id", help="ID de la chaîne (pour run/delete)") parser_chain.set_defaults(func=cmd_chain) # Subworkflow parser_subwf = subparsers.add_parser("subworkflow", help="Gérer les sous-workflows") parser_subwf.add_argument("action", choices=["register", "list", "extract", "delete"], help="Action: register, list, extract, delete") parser_subwf.add_argument("--workflow", help="Workflow à enregistrer/extraire") parser_subwf.add_argument("--name", help="Nom du sous-workflow") parser_subwf.set_defaults(func=cmd_subworkflow) # Trigger parser_trigger = subparsers.add_parser("trigger", help="Gérer les déclencheurs") parser_trigger.add_argument("action", choices=["add", "list", "remove", "fire"], help="Action: add, list, remove, fire") parser_trigger.add_argument("--type", choices=["schedule", "file", "visual"], help="Type de trigger") parser_trigger.add_argument("--workflow", help="Workflow cible") parser_trigger.add_argument("--trigger-id", help="ID du trigger") parser_trigger.add_argument("--cron", help="Expression cron (pour schedule)") parser_trigger.add_argument("--interval", type=int, help="Intervalle en secondes") parser_trigger.add_argument("--watch-dir", help="Répertoire à surveiller (pour file)") parser_trigger.add_argument("--pattern", help="Pattern de fichier (pour file)") parser_trigger.set_defaults(func=cmd_trigger) args = parser.parse_args() if args.command is None: print_banner() parser.print_help() return args.func(args) if __name__ == "__main__": main()