#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = ["mcp[cli]", "httpx"]
# ///
"""
MCP Server — RPA Vision V3

Exposes Léa's endpoints as semantic tools for Claude Code.

Usage (claude mcp add):
    uv run /home/dom/ai/rpa_vision_v3/mcp_rpa_vision.py

Env vars:
    RPA_API_TOKEN   — Bearer token for the streaming server (5005)
    RPA_STREAM_URL  — default http://localhost:5005
    RPA_VWB_URL     — default http://localhost:5002
    OLLAMA_BASE_URL — default http://localhost:11434
"""

import json
import os
from urllib.parse import quote

import httpx
from mcp.server.fastmcp import FastMCP

# --- Config ---
# Trailing slashes are removed so that f"{BASE}/api/..." never yields a
# double slash when an env var is set to e.g. "http://host:5005/".
STREAM = os.getenv("RPA_STREAM_URL", "http://localhost:5005").rstrip("/")
VWB = os.getenv("RPA_VWB_URL", "http://localhost:5002").rstrip("/")
OLLAMA = os.getenv("OLLAMA_BASE_URL", "http://localhost:11434").rstrip("/")
# Surrounding whitespace (for instance a trailing newline picked up from a
# file) would make the Authorization header value illegal, so strip it.
TOKEN = os.getenv("RPA_API_TOKEN", "").strip()

mcp = FastMCP("rpa-vision")


def _hdrs() -> dict[str, str]:
    """Return the auth headers sent to the streaming server.

    The Authorization header is only included when RPA_API_TOKEN is set.
    With an empty token the value would be "Bearer " (trailing space), which
    the HTTP client rejects as an illegal header value before the request is
    sent, breaking every call instead of letting the server decide.
    """
    if not TOKEN:
        return {}
    return {"Authorization": f"Bearer {TOKEN}"}


def _seg(value: str) -> str:
    """Percent-encode *value* for use as a single URL path segment.

    `safe=""` also encodes "/", so an identifier cannot add path segments,
    a query string or a fragment to the URL it is interpolated into.
    """
    return quote(str(value), safe="")


def _get(url: str, **params) -> dict:
    """GET *url* with the auth headers and return the decoded JSON body.

    Keyword arguments are sent as query parameters. Raises
    `httpx.HTTPStatusError` on a 4xx/5xx response.
    """
    with httpx.Client(timeout=30) as c:
        r = c.get(url, headers=_hdrs(), params=params)
        r.raise_for_status()
        return r.json()


def _post(url: str, data: dict | None = None) -> dict:
    """POST *data* as JSON to *url* with the auth headers; return the JSON body.

    A missing or empty *data* is sent as an empty JSON object. Raises
    `httpx.HTTPStatusError` on a 4xx/5xx response.
    """
    with httpx.Client(timeout=60) as c:
        r = c.post(url, headers=_hdrs(), json=data or {})
        r.raise_for_status()
        return r.json()


def _vwb(url: str) -> dict:
    """GET *url* without auth headers and return the decoded JSON body.

    Used for the VWB endpoints. Raises `httpx.HTTPStatusError` on a 4xx/5xx
    response.
    """
    with httpx.Client(timeout=15) as c:
        r = c.get(url)
        r.raise_for_status()
        return r.json()


def _j(obj) -> str:
    """Serialize *obj* as indented JSON, keeping non-ASCII characters as-is."""
    return json.dumps(obj, ensure_ascii=False, indent=2)


# NOTE: FastMCP publishes each tool's docstring as the tool description seen
# by the client, so those docstrings are runtime text and are kept verbatim.
# Maintainer notes live in the `#` comments next to each tool.

# ── VWB workflows ──────────────────────────────────────────────────────────────

# Summarizes every VWB workflow as {id, name, steps}, where steps is a count.
@mcp.tool()
def list_workflows() -> str:
    """Liste tous les workflows VWB : id, nom, nombre d'étapes."""
    data = _vwb(f"{VWB}/api/workflows/")
    # Accept either a bare list or a {"workflows": [...]} envelope.
    wfs = data if isinstance(data, list) else (data.get("workflows") or [])
    return _j([
        {
            "id": w.get("id"),
            "name": w.get("name"),
            # `steps` may be absent or null; both count as zero steps.
            "steps": len(w.get("steps") or []),
        }
        for w in wfs
    ])


# Returns the raw JSON the VWB server holds for one workflow.
@mcp.tool()
def get_workflow(workflow_id: str) -> str:
    """Retourne le détail complet d'un workflow VWB (étapes, actions, paramètres)."""
    return _j(_vwb(f"{VWB}/api/workflows/{_seg(workflow_id)}"))


# ── Streaming sessions ─────────────────────────────────────────────────────────

# Returns the streaming server's session list as JSON.
@mcp.tool()
def list_sessions() -> str:
    """Liste les sessions streaming actives (session_id, machine, dernière activité)."""
    return _j(_get(f"{STREAM}/api/v1/traces/stream/sessions"))


# Returns the streaming server's JSON for a single session.
@mcp.tool()
def get_session(session_id: str) -> str:
    """Retourne les détails d'une session : état, heartbeat, actions récentes."""
    return _j(_get(f"{STREAM}/api/v1/traces/stream/session/{_seg(session_id)}"))


# Returns the streaming server's stats payload as JSON.
@mcp.tool()
def get_streaming_stats() -> str:
    """Statistiques du serveur streaming : sessions actives, replays en cours, charge."""
    return _j(_get(f"{STREAM}/api/v1/traces/stream/stats"))


# ── Replays ────────────────────────────────────────────────────────────────────

# Returns the streaming server's replay list as JSON.
@mcp.tool()
def list_replays() -> str:
    """Liste les replays actifs ou récents avec leur état (running/paused/done/error)."""
    return _j(_get(f"{STREAM}/api/v1/traces/stream/replays"))


# Returns the streaming server's JSON for a single replay.
@mcp.tool()
def get_replay_state(replay_id: str) -> str:
    """Retourne l'état complet d'un replay : statut, action courante, variables runtime, progression."""
    return _j(_get(f"{STREAM}/api/v1/traces/stream/replay/{_seg(replay_id)}"))


# POSTs {workflow_id, session_id} to the replay endpoint and returns the
# server's JSON answer.
@mcp.tool()
def start_replay(workflow_id: str, session_id: str) -> str:
    """Lance un replay d'un workflow sur une session active. Retourne le replay_id."""
    payload = {
        "workflow_id": workflow_id,
        "session_id": session_id,
    }
    return _j(_post(f"{STREAM}/api/v1/traces/stream/replay", payload))


# POSTs an empty JSON object to the replay's /resume endpoint.
@mcp.tool()
def resume_replay(replay_id: str) -> str:
    """Reprend un replay en état paused_need_help (équivalent bouton Continuer de Léa)."""
    return _j(_post(f"{STREAM}/api/v1/traces/stream/replay/{_seg(replay_id)}/resume"))


# Returns the streaming server's own workflow list as JSON.
@mcp.tool()
def list_stream_workflows() -> str:
    """Liste les workflows compilés côté streaming server (peut différer du VWB)."""
    return _j(_get(f"{STREAM}/api/v1/traces/stream/workflows"))


# ── Ollama ─────────────────────────────────────────────────────────────────────
# Ollama calls go out without the RPA auth headers.

# Returns a JSON array holding the "name" of each entry under "models".
@mcp.tool()
def ollama_list_models() -> str:
    """Liste les modèles Ollama disponibles sur cette machine."""
    with httpx.Client(timeout=10) as c:
        r = c.get(f"{OLLAMA}/api/tags")
        r.raise_for_status()
        models = r.json().get("models") or []
        return _j([m["name"] for m in models])


# Non-streaming /api/generate call; returns the "response" field, or "" when
# the field is absent.
@mcp.tool()
def ollama_query(prompt: str, model: str = "qwen2.5:7b") -> str:
    """Envoie un prompt à un modèle Ollama local. Utile pour tester t2a_decision ou debug LLM."""
    with httpx.Client(timeout=120) as c:
        r = c.post(f"{OLLAMA}/api/generate", json={
            "model": model,
            "prompt": prompt,
            "stream": False,
        })
        r.raise_for_status()
        return r.json().get("response", "")


# Non-streaming /api/chat call; returns message.content, or "" when absent.
@mcp.tool()
def ollama_chat(messages: list, model: str = "qwen2.5:7b") -> str:
    """Chat avec un modèle Ollama. messages = [{"role": "user", "content": "..."}]"""
    with httpx.Client(timeout=120) as c:
        r = c.post(f"{OLLAMA}/api/chat", json={
            "model": model,
            "messages": messages,
            "stream": False,
        })
        r.raise_for_status()
        message = r.json().get("message") or {}
        return message.get("content", "")


if __name__ == "__main__":
    mcp.run()