Files
rpa_vision_v3/cli.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

661 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
"""
RPA Vision V3 - Command Line Interface

Interface unifiée pour contrôler le système RPA Vision.

Usage:
    python cli.py <command> [options]

Commands:
    status           - Afficher l'état du système
    record           - Démarrer l'enregistrement
    stop             - Arrêter l'enregistrement
    play <workflow>  - Exécuter un workflow
    list             - Lister les workflows
    gpu              - Gérer les ressources GPU

Examples:
    python cli.py status
    python cli.py record --app "Firefox"
    python cli.py play my_workflow.json
    python cli.py gpu load-vlm
"""
import argparse
import asyncio
import json
import sys
from pathlib import Path
from typing import Optional
# Put this file's own directory at the front of the import search path so the
# `core.*` imports used by the commands below resolve when the script is run
# directly.
sys.path.insert(0, str(Path(__file__).parent))
def print_banner():
    """Print the CLI banner as a box whose edges line up.

    The two title rows are centred against the width of the top border
    instead of relying on hand-typed padding, so the right-hand edge of the
    box stays aligned with the corners.
    """
    top = "╔════════════════════════════════════════════════════════════╗"
    # Derive the bottom border from the top one so both always share a width.
    bottom = "╚" + top[1:-1] + "╝"
    inner_width = len(top) - 2

    rows = [top]
    for title in ("RPA Vision V3 - CLI", "100% Vision-Based RPA System"):
        rows.append(f"║{title.center(inner_width)}║")
    rows.append(bottom)

    # One blank line before and after the box, as the original literal had.
    print("\n" + "\n".join(rows) + "\n")
# =============================================================================
# Status Commands
# =============================================================================
def _fetch_if_ok(url, timeout=2):
    """Return the ``requests`` response for *url* if it answered HTTP 200, else None.

    Any failure to obtain a response (``requests`` not installed, connection
    refused, timeout, ...) is reported as ``None``.
    """
    try:
        import requests

        response = requests.get(url, timeout=timeout)
    except Exception:
        # Health probe: every ordinary failure just means "not reachable".
        # Unlike a bare ``except:``, Ctrl-C and SystemExit still propagate.
        return None
    return response if response.status_code == 200 else None


def _print_online_status(title, url, port):
    """Print *title*, then whether the service at *url* answered HTTP 200."""
    print(f"\n{title}")
    if _fetch_if_ok(url) is not None:
        print(f" ✅ En ligne (port {port})")
    else:
        print(" ❌ Hors ligne")


def cmd_status(args):
    """Print an overview of the system: GPU, Ollama, API server, dashboard, workflows.

    Args:
        args: Parsed command-line namespace (unused by this command).

    Every section is best-effort: a component that cannot be queried is
    reported as unavailable/offline instead of aborting the whole report.
    """
    print("📊 État du système RPA Vision V3\n")

    # GPU resource manager (project module; may be missing or fail to initialise).
    print("🖥️ GPU:")
    try:
        from core.gpu import get_gpu_resource_manager

        status = get_gpu_resource_manager().get_status()
        print(f" Mode: {status.execution_mode.value}")
        print(f" VLM: {status.vlm_state.value}")
        print(f" CLIP: {status.clip_device}")
        if status.vram:
            print(f" VRAM: {status.vram.used_mb}/{status.vram.total_mb} MB")
    except Exception as e:
        print(f" ⚠️ Non disponible: {e}")

    # Ollama: list up to three of the models returned by /api/tags.
    print("\n🤖 Ollama:")
    response = _fetch_if_ok("http://localhost:11434/api/tags")
    models = None
    if response is not None:
        try:
            models = response.json().get("models", [])
        except (ValueError, AttributeError):
            # Body is not JSON, or not a JSON object: treat as unavailable.
            models = None
    if isinstance(models, list):
        print(f" ✅ Disponible ({len(models)} modèles)")
        for model in models[:3]:
            # Tolerate entries without a "name" key rather than aborting the listing.
            label = model.get("name", "?") if isinstance(model, dict) else str(model)
            print(f" - {label}")
    else:
        print(" ❌ Non disponible")

    # NOTE(review): URLs and ports are hard-coded; confirm they match the
    # ports the services are actually deployed on.
    _print_online_status("🌐 API Server:", "http://localhost:8000/api/traces/status", 8000)
    _print_online_status("📈 Dashboard:", "http://localhost:5001/", 5001)

    # Workflows stored on disk (path is relative to the current directory).
    print("\n📁 Workflows:")
    workflow_dir = Path("data/workflows")
    if workflow_dir.exists():
        # Sorted so the five names shown are stable from one run to the next.
        workflows = sorted(workflow_dir.glob("*.json"))
        print(f" {len(workflows)} workflow(s) disponible(s)")
        for w in workflows[:5]:
            print(f" - {w.name}")
    else:
        print(" Aucun workflow")
# =============================================================================
# GPU Commands
# =============================================================================
def cmd_gpu(args):
    """Run one GPU resource-manager action selected by ``args.action``.

    Supported actions: ``status``, ``load-vlm``, ``unload-vlm``,
    ``recording``, ``autopilot`` and ``idle``.  Anything else prints the
    list of available actions.
    """
    requested = args.action

    async def _dispatch():
        from core.gpu import get_gpu_resource_manager, ExecutionMode

        gpu = get_gpu_resource_manager()

        # action -> (manager coroutine name, start message, success message, failure message)
        vlm_operations = {
            "load-vlm": (
                "ensure_vlm_loaded",
                "🔄 Chargement du VLM...",
                "✅ VLM chargé",
                "❌ Échec du chargement",
            ),
            "unload-vlm": (
                "ensure_vlm_unloaded",
                "🔄 Déchargement du VLM...",
                "✅ VLM déchargé",
                "❌ Échec du déchargement",
            ),
        }
        # action -> (ExecutionMode member name, start message, done message)
        mode_switches = {
            "recording": (
                "RECORDING",
                "🔄 Passage en mode RECORDING...",
                "✅ Mode RECORDING activé (VLM chargé)",
            ),
            "autopilot": (
                "AUTOPILOT",
                "🔄 Passage en mode AUTOPILOT...",
                "✅ Mode AUTOPILOT activé (VLM déchargé)",
            ),
            "idle": (
                "IDLE",
                "🔄 Passage en mode IDLE...",
                "✅ Mode IDLE activé",
            ),
        }

        if requested == "status":
            snapshot = gpu.get_status()
            print("🖥️ GPU Resource Manager Status")
            print(f" Mode: {snapshot.execution_mode.value}")
            print(f" VLM State: {snapshot.vlm_state.value}")
            print(f" VLM Model: {snapshot.vlm_model}")
            print(f" CLIP Device: {snapshot.clip_device}")
            print(f" Degraded Mode: {snapshot.degraded_mode}")
            if snapshot.vram:
                # Guard against a zero total to avoid dividing by zero.
                if snapshot.vram.total_mb > 0:
                    percent = snapshot.vram.used_mb / snapshot.vram.total_mb * 100
                else:
                    percent = 0
                print(f" VRAM: {snapshot.vram.used_mb}/{snapshot.vram.total_mb} MB ({percent:.1f}%)")
            return

        if requested in vlm_operations:
            method_name, starting, succeeded, failed = vlm_operations[requested]
            print(starting)
            ok = await getattr(gpu, method_name)()
            print(succeeded if ok else failed)
            return

        if requested in mode_switches:
            member_name, starting, done = mode_switches[requested]
            print(starting)
            await gpu.set_execution_mode(getattr(ExecutionMode, member_name))
            print(done)
            return

        print(f"❌ Action inconnue: {requested}")
        print("Actions disponibles: status, load-vlm, unload-vlm, recording, autopilot, idle")

    asyncio.run(_dispatch())
# =============================================================================
# Workflow Commands
# =============================================================================
def cmd_list(args):
    """List the workflow JSON files found in ``data/workflows``.

    Args:
        args: Parsed command-line namespace (unused by this command).

    For each file the workflow name (falling back to the file stem), the
    file name and the number of entries under ``"steps"`` are printed.  A
    file that cannot be read, is not valid JSON, or is not shaped like a JSON
    object is reported with a warning and the listing continues.
    """
    workflow_dir = Path("data/workflows")
    # Sorted so the listing order does not depend on the filesystem.
    workflows = sorted(workflow_dir.glob("*.json")) if workflow_dir.exists() else []
    if not workflows:
        print("📁 Aucun workflow trouvé")
        return

    print(f"📁 {len(workflows)} workflow(s) disponible(s):\n")
    for w in workflows:
        try:
            # Explicit encoding: JSON is UTF-8 regardless of the platform locale.
            with open(w, encoding="utf-8") as f:
                data = json.load(f)
            name = data.get("name", w.stem)
            steps = len(data.get("steps", []))
        except (OSError, ValueError, AttributeError, TypeError):
            # OSError: unreadable file; ValueError: invalid JSON or bad
            # encoding; AttributeError/TypeError: unexpected JSON shape.
            print(f" ⚠️ {w.name} (erreur de lecture)")
            continue
        print(f" 📋 {name}")
        print(f" Fichier: {w.name}")
        print(f" Étapes: {steps}")
        print()
def cmd_play(args):
    """Execute the workflow named by ``args.workflow``.

    The value is first tried as a path as given, then as a file name inside
    ``data/workflows``.  If neither exists an error is printed and nothing
    is run.  Errors raised while loading or running the workflow are printed
    rather than propagated.
    """
    candidate = Path(args.workflow)
    if not candidate.exists():
        # Fall back to the default workflow directory.
        candidate = Path("data/workflows") / args.workflow
    if not candidate.exists():
        print(f"❌ Workflow non trouvé: {args.workflow}")
        return

    print(f"▶️ Exécution du workflow: {candidate.name}")

    async def _execute(workflow_file):
        try:
            from core.execution.execution_loop import ExecutionLoop

            engine = ExecutionLoop()
            await engine.load_workflow(workflow_file)
            print("🔄 Démarrage...")
            await engine.start()
            print("✅ Workflow terminé")
        except Exception as exc:
            print(f"❌ Erreur: {exc}")

    asyncio.run(_execute(str(candidate)))
def cmd_record(args):
    """Announce a recording session and print how to start the recording agent.

    Recording itself is not implemented here; the command only reports the
    target application (``args.app``, or "Toutes" when unset) and points the
    user at the agent entry points.
    """
    target_app = args.app or 'Toutes'
    # TODO: implement recording via agent_v0
    message_lines = [
        "🔴 Démarrage de l'enregistrement...",
        f" Application cible: {target_app}",
        "\n💡 Pour enregistrer, utilisez:",
        " ./run.sh --agent",
        " ou",
        " python agent_v0/main.py",
    ]
    print("\n".join(message_lines))
def cmd_stop(args):
    """Announce that recording is stopping.

    Only prints a message for now; no signal is sent to the agent.
    """
    # TODO: send a stop signal to the agent
    stop_message = "⏹️ Arrêt de l'enregistrement..."
    print(stop_message)
# =============================================================================
# Task Commands (Natural Language)
# =============================================================================
def _parse_task_params(raw_params):
    """Turn ``key=value`` strings into a dict, warning about malformed entries."""
    parsed = {}
    for item in raw_params or []:
        key, separator, value = item.partition("=")
        if not separator:
            # Previously dropped silently; tell the user the value was ignored.
            print(f"⚠️ Paramètre ignoré (format attendu: clé=valeur): {item}")
            continue
        parsed[key] = value
    return parsed


def _confidence_bar(confidence, width=10):
    """Render *confidence* (expected in 0..1) as a filled/empty bar of *width* cells."""
    # Clamp so an out-of-range score can neither overflow nor underflow the bar.
    filled = max(0, min(width, int(confidence * width)))
    return "█" * filled + "░" * (width - filled)


async def _simulate_matched_workflow(workflow_path, params, variable_manager_cls):
    """Load a workflow, substitute *params* into it and print what would run."""
    try:
        with open(workflow_path, "r", encoding="utf-8") as f:
            workflow_data = json.load(f)

        var_manager = variable_manager_cls()
        var_manager.set_variables(params)
        workflow_data = var_manager.substitute_dict(workflow_data)

        errors = var_manager.validate()
        if errors:
            print("⚠️ Variables manquantes:")
            for err in errors:
                print(f" - {err}")
            return

        print("🔄 Démarrage...")
        # TODO: run the workflow with ExecutionLoop; for now only show what
        # would be executed.
        print(f" Workflow: {workflow_data.get('name', 'Unknown')}")
        # NOTE(review): the step count is taken from "edges" here while the
        # `list` command counts "steps" — confirm which key the workflow
        # schema actually uses.
        print(f" Étapes: {len(workflow_data.get('edges', []))}")
        print("✅ Tâche terminée (simulation)")
    except Exception as e:
        print(f"❌ Erreur: {e}")


def _fallback_name_match(task_description):
    """Print workflows whose name contains any word of *task_description*."""
    workflow_dir = Path("data/workflows")
    if not workflow_dir.exists():
        return
    words = task_description.lower().split()
    for w in sorted(workflow_dir.glob("*.json")):
        try:
            with open(w, encoding="utf-8") as f:
                data = json.load(f)
            name = data.get("name", "").lower()
        except (OSError, ValueError, AttributeError, TypeError):
            # Best-effort scan: skip files that are unreadable, not valid
            # JSON, or not shaped like a workflow object.
            continue
        if any(word in name for word in words):
            print(f" Trouvé: {data.get('name', w.stem)}")


def cmd_task(args):
    """Find the workflow matching a natural-language task and run it.

    Args:
        args: Parsed namespace providing ``description`` (the task text),
            ``param`` (optional list of ``key=value`` strings) and
            ``dry_run`` (when true, only list the matches).

    Candidate workflows are looked up with ``SemanticMatcher``.  Unless
    ``dry_run`` is set, the first match is loaded, its variables are
    substituted (explicit ``--param`` values override extracted ones) and the
    result is printed as a simulation.  If the workflow module cannot be
    imported, a simple word-in-name match over ``data/workflows`` is used
    instead.
    """
    task_description = args.description
    print(f"🎯 Tâche demandée: {task_description}")
    print()

    explicit_params = _parse_task_params(args.param)
    if explicit_params:
        print(f"📋 Paramètres explicites: {explicit_params}")
        print()

    try:
        from core.workflow import SemanticMatcher, VariableManager

        matcher = SemanticMatcher("data/workflows")
        matches = matcher.find_workflows(task_description, limit=5, min_confidence=0.2)

        if matches:
            print(f"🔍 {len(matches)} workflow(s) correspondant(s) trouvé(s):\n")
            for i, match in enumerate(matches):
                confidence_bar = _confidence_bar(match.confidence)
                print(f" {i+1}. {match.workflow_name}")
                print(f" Confiance: [{confidence_bar}] {match.confidence:.0%}")
                print(f" Raison: {match.match_reason}")
                if match.extracted_params:
                    print(f" Paramètres extraits: {match.extracted_params}")
                print()

            if not args.dry_run:
                best_match = matches[0]
                print(f"▶️ Exécution de: {best_match.workflow_name}")
                # Explicit parameters win over extracted ones; `or {}` guards
                # against a match that carries no extracted parameters.
                all_params = {**(best_match.extracted_params or {}), **explicit_params}
                if all_params:
                    print(f"📋 Paramètres finaux: {all_params}")
                asyncio.run(
                    _simulate_matched_workflow(
                        best_match.workflow_path, all_params, VariableManager
                    )
                )
        else:
            print("❌ Aucun workflow correspondant trouvé.")
            print()
            print("💡 Pour créer ce workflow:")
            print(" 1. Lancez l'agent: ./run.sh --agent")
            print(" 2. Effectuez la tâche manuellement")
            print(" 3. L'agent enregistrera vos actions")
            print(" 4. Le workflow sera créé automatiquement")
            print()
            print("📝 Ou créez un workflow manuellement:")
            print(f' python cli.py workflow create "{task_description}"')
    except ImportError as e:
        print(f"⚠️ Module non disponible: {e}")
        print(" Utilisation du matching simple...")
        _fallback_name_match(task_description)
def cmd_ask(args):
    """Send ``args.question`` to the VLM together with an image.

    The image is the file at ``args.screenshot`` when given; otherwise the
    current screen is captured.  The answer, or the error reported by the
    client, is printed.  Any exception is printed rather than propagated.
    """
    prompt = args.question
    image_file = args.screenshot
    print(f"🤔 Question: {prompt}")

    async def _query():
        try:
            from core.detection.ollama_client import OllamaClient
            from PIL import Image  # noqa: F401 - kept: the original imports it at this point

            vlm = OllamaClient()
            if not image_file:
                # No screenshot supplied: capture the current screen.
                print("📸 Capture de l'écran...")
                from core.capture.screen_capturer import ScreenCapturer

                frame = ScreenCapturer().capture_screen()
                answer = vlm.generate(prompt, image=frame)
            else:
                print(f"📸 Analyse de: {image_file}")
                answer = vlm.generate(prompt, image_path=image_file)

            if not answer["success"]:
                print(f"❌ Erreur: {answer['error']}")
            else:
                print(f"\n💬 Réponse:\n{answer['response']}")
        except Exception as exc:
            print(f"❌ Erreur: {exc}")

    asyncio.run(_query())
# =============================================================================
# Composition Commands
# =============================================================================
def cmd_chain(args):
    """Manage workflow chains: ``create``, ``list``, ``run`` or ``delete``.

    ``create`` needs ``--source`` and ``--target`` and adds a chain to a
    freshly created ``WorkflowChainer``.  ``run`` and ``delete`` need
    ``--chain-id``.  ``list``, ``run`` and ``delete`` currently only print a
    message.
    """
    from core.workflow import WorkflowChainer, ChainConfig, GlobalVariableManager

    action = args.action

    if action == "create":
        if not (args.source and args.target):
            print("❌ --source et --target sont requis pour créer une chaîne")
            return
        link = ChainConfig(
            source_workflow_id=args.source,
            target_workflow_id=args.target,
            variable_mapping={},
            on_failure="abort"
        )
        WorkflowChainer().add_chain(link)
        print(f"✅ Chaîne créée: {args.source} -> {args.target}")
        return

    if action == "list":
        print("📋 Chaînes de workflows:")
        print(" (Fonctionnalité à implémenter avec persistance)")
        return

    if action == "run":
        if args.chain_id:
            print(f"🚀 Exécution de la chaîne {args.chain_id}...")
            print(" (Fonctionnalité à implémenter)")
        else:
            print("❌ --chain-id est requis pour exécuter une chaîne")
        return

    if action == "delete":
        if args.chain_id:
            print(f"🗑️ Suppression de la chaîne {args.chain_id}")
        else:
            print("❌ --chain-id est requis pour supprimer une chaîne")
def cmd_subworkflow(args):
    """Manage sub-workflows: ``register``, ``list``, ``extract`` or ``delete``.

    ``register`` needs ``--workflow`` and ``--name`` and registers a
    definition with empty input/output lists in a freshly created
    ``SubWorkflowRegistry``.  The other actions currently only print a
    message.
    """
    from core.workflow import SubWorkflowRegistry, SubWorkflowDefinition

    action = args.action

    if action == "register":
        if not (args.workflow and args.name):
            print("❌ --workflow et --name sont requis")
            return
        definition = SubWorkflowDefinition(
            workflow_id=args.workflow,
            name=args.name,
            input_parameters=[],
            output_values=[]
        )
        SubWorkflowRegistry().register(definition)
        print(f"✅ Sous-workflow '{args.name}' enregistré")
        return

    if action == "list":
        print("📋 Sous-workflows enregistrés:")
        print(" (Fonctionnalité à implémenter avec persistance)")
        return

    if action == "extract":
        print("🔧 Extraction de séquences communes...")
        print(" (Fonctionnalité à implémenter)")
        return

    if action == "delete":
        print("🗑️ Suppression du sous-workflow")
def cmd_trigger(args):
    """Manage triggers: ``add``, ``list``, ``remove`` or ``fire``.

    ``add`` needs ``--type`` (schedule, file or visual) and ``--workflow``;
    a file trigger additionally needs ``--watch-dir``.  When no
    ``--trigger-id`` is given, one is derived from the type and workflow.
    ``remove`` and ``fire`` need ``--trigger-id``.  All actions operate on a
    ``TriggerManager`` created by this call.
    """
    from core.workflow import TriggerManager, ScheduleTrigger, FileTrigger, VisualTrigger

    registry = TriggerManager()
    action = args.action

    if action == "add":
        if not (args.type and args.workflow):
            print("❌ --type et --workflow sont requis")
            return

        kind = args.type
        trigger_id = args.trigger_id or f"trigger_{kind}_{args.workflow}"

        if kind == "schedule":
            trigger = ScheduleTrigger(
                trigger_id=trigger_id,
                workflow_id=args.workflow,
                cron_expression=args.cron,
                interval_seconds=args.interval
            )
        elif kind == "file":
            if not args.watch_dir:
                print("❌ --watch-dir est requis pour un trigger file")
                return
            trigger = FileTrigger(
                trigger_id=trigger_id,
                workflow_id=args.workflow,
                watch_directory=args.watch_dir,
                file_pattern=args.pattern or "*"
            )
        elif kind == "visual":
            trigger = VisualTrigger(
                trigger_id=trigger_id,
                workflow_id=args.workflow,
                target_element=args.pattern or "target",
                check_interval_seconds=args.interval or 5
            )

        registry.register_trigger(trigger)
        print(f"✅ Trigger '{trigger_id}' ajouté")
        return

    if action == "list":
        print("📋 Triggers configurés:")
        for known_id, known_trigger in registry._triggers.items():
            print(f" - {known_id}: {known_trigger.workflow_id}")
        return

    if action in ("remove", "fire"):
        if not args.trigger_id:
            print("❌ --trigger-id est requis")
            return
        if action == "remove":
            registry.unregister_trigger(args.trigger_id)
            print(f"🗑️ Trigger '{args.trigger_id}' supprimé")
            return
        try:
            context = registry.fire_trigger(args.trigger_id)
            print(f"🔥 Trigger '{args.trigger_id}' déclenché à {context.fired_at}")
        except ValueError as exc:
            print(f"{exc}")
# =============================================================================
# Main
# =============================================================================
def main():
    """Build the argument parser, parse the command line and run the chosen command.

    Each sub-command stores its handler in the ``func`` default; when no
    sub-command is given the banner and the help text are printed instead.
    """
    parser = argparse.ArgumentParser(
        description="RPA Vision V3 - Command Line Interface",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "\n"
            "Examples:\n"
            "python cli.py status # Voir l'état du système\n"
            "python cli.py gpu status # Voir l'état GPU\n"
            "python cli.py gpu load-vlm # Charger le VLM\n"
            "python cli.py gpu recording # Passer en mode recording\n"
            "python cli.py list # Lister les workflows\n"
            "python cli.py play workflow.json # Exécuter un workflow\n"
        ),
    )
    commands = parser.add_subparsers(dest="command", help="Commande à exécuter")

    def register(name, help_text, handler):
        """Create the sub-parser *name* and bind *handler* as its ``func``."""
        sub = commands.add_parser(name, help=help_text)
        sub.set_defaults(func=handler)
        return sub

    # Commands without arguments of their own.
    register("status", "Afficher l'état du système", cmd_status)
    register("list", "Lister les workflows", cmd_list)
    register("stop", "Arrêter l'enregistrement", cmd_stop)

    # GPU
    gpu = register("gpu", "Gérer les ressources GPU", cmd_gpu)
    gpu.add_argument(
        "action",
        choices=["status", "load-vlm", "unload-vlm", "recording", "autopilot", "idle"],
        help="Action à effectuer",
    )

    # Play
    play = register("play", "Exécuter un workflow", cmd_play)
    play.add_argument("workflow", help="Chemin vers le workflow JSON")

    # Record
    record = register("record", "Démarrer l'enregistrement", cmd_record)
    record.add_argument("--app", help="Application cible")

    # Task (natural language)
    task = register("task", "Exécuter une tâche en langage naturel", cmd_task)
    task.add_argument("description", help="Description de la tâche (ex: 'facturer client A')")
    task.add_argument("-p", "--param", action="append", help="Paramètre (ex: client=A)")
    task.add_argument("--dry-run", action="store_true", help="Ne pas exécuter, juste chercher")

    # Ask (VLM question)
    ask = register("ask", "Poser une question au VLM", cmd_ask)
    ask.add_argument("question", help="Question à poser")
    ask.add_argument("-s", "--screenshot", help="Chemin vers un screenshot (optionnel)")

    # Chain (workflow composition)
    chain = register("chain", "Chaîner des workflows", cmd_chain)
    chain.add_argument(
        "action",
        choices=["create", "list", "run", "delete"],
        help="Action: create, list, run, delete",
    )
    chain.add_argument("--source", help="Workflow source (pour create)")
    chain.add_argument("--target", help="Workflow cible (pour create)")
    chain.add_argument("--chain-id", help="ID de la chaîne (pour run/delete)")

    # Subworkflow
    subworkflow = register("subworkflow", "Gérer les sous-workflows", cmd_subworkflow)
    subworkflow.add_argument(
        "action",
        choices=["register", "list", "extract", "delete"],
        help="Action: register, list, extract, delete",
    )
    subworkflow.add_argument("--workflow", help="Workflow à enregistrer/extraire")
    subworkflow.add_argument("--name", help="Nom du sous-workflow")

    # Trigger
    trigger = register("trigger", "Gérer les déclencheurs", cmd_trigger)
    trigger.add_argument(
        "action",
        choices=["add", "list", "remove", "fire"],
        help="Action: add, list, remove, fire",
    )
    trigger.add_argument("--type", choices=["schedule", "file", "visual"], help="Type de trigger")
    trigger.add_argument("--workflow", help="Workflow cible")
    trigger.add_argument("--trigger-id", help="ID du trigger")
    trigger.add_argument("--cron", help="Expression cron (pour schedule)")
    trigger.add_argument("--interval", type=int, help="Intervalle en secondes")
    trigger.add_argument("--watch-dir", help="Répertoire à surveiller (pour file)")
    trigger.add_argument("--pattern", help="Pattern de fichier (pour file)")

    args = parser.parse_args()
    if args.command is not None:
        args.func(args)
        return

    # No sub-command given: show the banner followed by the usage help.
    print_banner()
    parser.print_help()


if __name__ == "__main__":
    main()