Files
rpa_vision_v3/tools/lea_healthcheck.py

402 lines
15 KiB
Python

#!/usr/bin/env python3
"""Read-only healthcheck for the Lea demo stack.
This script does not start, stop, restart, delete, or restore anything.
It is intended as a daily proof artifact before the 2026-06-01 demo.
"""
from __future__ import annotations
import argparse
import base64
import json
import os
import shlex
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
# Directory two levels above this file; used as the working directory for
# every subprocess started by run_command().
REPO_ROOT = Path(__file__).resolve().parents[1]

# Root of the on-disk Ollama model store; check_model_store() looks for the
# "manifests" and "blobs" sub-directories below it.
OLLAMA_MODELS_DIR = Path("/var/lib/ollama/.ollama/models")

# Model tags that check_ollama() requires in the /api/tags listing; any
# missing tag is reported as a failure.
CRITICAL_MODELS = (
    "qwen2.5vl:7b-rpa",
    "t2a-gemma3-27b:latest",
    "t2a-gemma3-27b-q4:latest",
    "thiagomoraes/medgemma-27b-it:Q4_K_S",
)

# Blob file name that must exist under "<store>/blobs" for each listed model.
# NOTE(review): qwen2.5vl:7b-rpa has no pinned blob here - confirm whether
# that omission is intentional.
CRITICAL_BLOBS = {
    "t2a-gemma3-27b:latest": "sha256-2f2509e30b0d07db517b82e62404194ef355846f08ac287775ff363693086818",
    "t2a-gemma3-27b-q4:latest": "sha256-0139f42273d53348fa0d24daae016b7231e1310258bbbaa7e38a1af703217c1a",
    "thiagomoraes/medgemma-27b-it:Q4_K_S": "sha256-7cb6ff10942c8ccf370e274daafaf56da3fff318f40a355df331d8783c6c11f3",
}
def _captured_text(stream: Any) -> str:
    """Return captured process output as stripped text.

    ``subprocess.TimeoutExpired`` exposes captured output as ``bytes`` even
    when the process was started with ``text=True``, and as ``None`` when
    nothing was captured, so all three shapes are accepted here.
    """
    if stream is None:
        return ""
    if isinstance(stream, bytes):
        stream = stream.decode("utf-8", errors="replace")
    return stream.strip()


def run_command(args: list[str], timeout: float = 5.0) -> tuple[int, str, str]:
    """Run *args* from the repository root and capture its output.

    The command is passed to ``subprocess.run`` as an argument list (no
    shell). Launch failures and timeouts are converted into shell-style exit
    codes instead of exceptions so that one broken probe cannot abort the
    whole healthcheck.

    Args:
        args: Executable and arguments.
        timeout: Seconds to wait before giving up on the command.

    Returns:
        ``(returncode, stdout, stderr)`` with both streams stripped.
        ``127`` is returned when the executable (or the working directory)
        does not exist, ``126`` when the OS refuses to launch it for any
        other reason, and ``124`` when the timeout expires.
    """
    try:
        proc = subprocess.run(
            args,
            cwd=REPO_ROOT,
            text=True,
            encoding="utf-8",
            errors="replace",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
            check=False,
        )
    except FileNotFoundError as exc:
        return 127, "", str(exc)
    except OSError as exc:
        # e.g. PermissionError for a non-executable file.
        return 126, "", str(exc)
    except subprocess.TimeoutExpired as exc:
        # Keep whatever partial output was captured before the timeout.
        stdout = _captured_text(exc.stdout)
        stderr = _captured_text(exc.stderr)
        return 124, stdout, stderr or f"timeout after {timeout}s"
    return proc.returncode, proc.stdout.strip(), proc.stderr.strip()
def http_json(url: str, timeout: float = 2.0) -> tuple[bool, Any, str]:
    """Fetch *url* with a GET request and decode the response body as JSON.

    This function does not raise for network, HTTP-protocol, HTTP-status or
    JSON errors; every such failure is reported through the return value so a
    misbehaving service cannot abort the healthcheck.

    Args:
        url: Absolute URL to request.
        timeout: Socket timeout in seconds.

    Returns:
        ``(ok, data, error)``. On success ``ok`` is ``True``, ``data`` is the
        decoded JSON value and ``error`` is ``""``. On failure ``ok`` is
        ``False``, ``data`` is ``None`` and ``error`` describes the problem.
    """
    # Local import: the module's top-level imports do not include http.client.
    import http.client

    request = urllib.request.Request(url, headers={"User-Agent": "lea-healthcheck/1.0"})
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            status = response.status
            body = response.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        # urlopen raises for 4xx/5xx responses, so the status and the start of
        # the error body have to be recovered from the exception itself.
        try:
            body = exc.read().decode("utf-8", errors="replace")
        except (OSError, ValueError, http.client.HTTPException):
            body = ""
        finally:
            exc.close()
        return False, None, f"http {exc.code}: {body[:300]}"
    except (urllib.error.URLError, http.client.HTTPException, TimeoutError, OSError) as exc:
        # HTTPException (e.g. BadStatusLine from a listener that does not
        # speak HTTP) is not an OSError and must be caught explicitly.
        return False, None, str(exc)

    # Defensive: only reachable if an opener that does not raise on error
    # statuses has been installed globally.
    if status >= 400:
        return False, None, f"http {status}: {body[:300]}"

    try:
        return True, json.loads(body), ""
    except json.JSONDecodeError as exc:
        return False, None, f"invalid json: {exc}"
def tcp_open(host: str, port: int, timeout: float = 1.0) -> tuple[bool, str]:
    """Try to open (and immediately close) a TCP connection to *host*:*port*.

    Args:
        host: Host name or address to connect to.
        port: TCP port number.
        timeout: Connection timeout in seconds.

    Returns:
        ``(True, "")`` when the connection was established, otherwise
        ``(False, message)`` where *message* is the text of the ``OSError``.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            reachable, reason = True, ""
    except OSError as failure:
        reachable, reason = False, str(failure)
    return reachable, reason
def add_check(
checks: list[dict[str, Any]],
name: str,
status: str,
summary: str,
details: Any | None = None,
) -> None:
checks.append(
{
"name": name,
"status": status,
"summary": summary,
"details": details,
}
)
def check_systemd(checks: list[dict[str, Any]]) -> None:
    """Record whether the Lea user-level systemd units are active.

    Each unit is queried with ``systemctl --user is-active``. An inactive
    required unit is recorded as ``"fail"``; an inactive optional unit is
    recorded as ``"warn"``.

    Args:
        checks: Accumulator list that receives one record per unit.
    """
    # unit name -> whether the unit is required
    units = {
        "rpa-streaming.service": True,
        "rpa-agent-chat.service": False,
    }
    for unit, required in units.items():
        code, stdout, stderr = run_command(["systemctl", "--user", "is-active", unit])
        label = f"systemd:{unit}"

        if code == 0 and stdout == "active":
            add_check(checks, label, "ok", "active")
            continue

        # Prefer whatever systemctl printed over a generic message.
        reported = stdout or stderr
        if required:
            add_check(checks, label, "fail", reported or f"is-active returned {code}")
            continue

        add_check(
            checks,
            label,
            "warn",
            reported or "inactive optional service",
            {"note": "5004 narration bus is optional, but should be fixed before demo if enabled on Windows."},
        )
def check_ports(checks: list[dict[str, Any]], host: str) -> None:
    """Record whether the service ports on *host* accept TCP connections.

    A closed required port is recorded as ``"fail"``; a closed optional port
    is recorded as ``"warn"``.

    Args:
        checks: Accumulator list that receives one record per port.
        host: Host whose ports are probed.
    """
    # (port, short service name, required)
    targets = (
        (5005, "streaming-http", True),
        (11434, "ollama-api", True),
        (5004, "feedbackbus-socketio", False),
    )
    for port, name, required in targets:
        label = f"tcp:{name}:{port}"
        reachable, reason = tcp_open(host, port)
        if reachable:
            add_check(checks, label, "ok", f"{host}:{port} accepts TCP")
        else:
            severity = "fail" if required else "warn"
            add_check(checks, label, severity, f"{host}:{port} closed: {reason}")
def check_http_services(checks: list[dict[str, Any]]) -> None:
    """Record the HTTP status endpoints of the local streaming and feedback services.

    The streaming service on port 5005 must answer ``{"status": "healthy"}``;
    anything else is a ``"fail"``. The feedback bus on port 5004 is expected
    to answer ``{"status": "online"}``; anything else is only a ``"warn"``.

    Args:
        checks: Accumulator list that receives one record per endpoint.
    """
    streaming_label = "http:rpa-streaming:/health"
    ok, data, error = http_json("http://127.0.0.1:5005/health")
    healthy = ok and isinstance(data, dict) and data.get("status") == "healthy"
    if healthy:
        add_check(checks, streaming_label, "ok", "healthy", data)
    else:
        add_check(checks, streaming_label, "fail", error or "unexpected health response", data)

    bus_label = "http:feedbackbus:/api/status"
    ok, data, error = http_json("http://127.0.0.1:5004/api/status")
    if not ok:
        add_check(checks, bus_label, "warn", error or "not responding")
    elif isinstance(data, dict) and data.get("status") == "online":
        add_check(checks, bus_label, "ok", "online", data)
    else:
        add_check(checks, bus_label, "warn", "unexpected status response", data)
def check_ollama(checks: list[dict[str, Any]]) -> None:
    """Record the state of the local Ollama API on port 11434.

    Two endpoints are queried:

    * ``/api/tags`` - every tag in ``CRITICAL_MODELS`` must be listed,
      otherwise a ``"fail"`` is recorded. If the endpoint itself cannot be
      read, a ``"fail"`` is recorded and the function returns early.
    * ``/api/ps`` - records whether ``qwen2.5vl:7b-rpa`` is among the
      models the endpoint lists; its absence, or an unreadable endpoint,
      is only a ``"warn"``.

    Args:
        checks: Accumulator list that receives the records.
    """
    tags_ok, tags_data, tags_error = http_json("http://127.0.0.1:11434/api/tags", timeout=4.0)
    if not (tags_ok and isinstance(tags_data, dict)):
        add_check(checks, "ollama:tags", "fail", tags_error or "cannot read /api/tags")
        return

    # An entry is identified by its "name", falling back to "model".
    available = set()
    for entry in tags_data.get("models") or []:
        if isinstance(entry, dict):
            available.add(entry.get("name") or entry.get("model"))

    absent = [tag for tag in CRITICAL_MODELS if tag not in available]
    if absent:
        add_check(checks, "ollama:critical-tags", "fail", f"missing critical tags: {', '.join(absent)}")
    else:
        add_check(checks, "ollama:critical-tags", "ok", f"{len(CRITICAL_MODELS)} critical tags present")

    ps_ok, ps_data, ps_error = http_json("http://127.0.0.1:11434/api/ps", timeout=4.0)
    if not (ps_ok and isinstance(ps_data, dict)):
        add_check(checks, "ollama:ps", "warn", ps_error or "cannot read /api/ps")
        return

    loaded = ps_data.get("models") or []
    resident = None
    for entry in loaded:
        if isinstance(entry, dict) and entry.get("name") == "qwen2.5vl:7b-rpa":
            resident = entry
            break

    if not resident:
        add_check(checks, "ollama:resident-vlm", "warn", "qwen2.5vl:7b-rpa is not currently resident", loaded)
        return

    add_check(
        checks,
        "ollama:resident-vlm",
        "ok",
        "qwen2.5vl:7b-rpa resident",
        {
            "context_length": resident.get("context_length"),
            "size": resident.get("size"),
            "size_vram": resident.get("size_vram"),
            "expires_at": resident.get("expires_at"),
        },
    )
def check_model_store(checks: list[dict[str, Any]]) -> None:
    """Record the state of the on-disk Ollama model store.

    Records ``ollama:store-counts`` (number of manifest and blob files) and
    ``ollama:critical-blobs`` (presence of every file in ``CRITICAL_BLOBS``).
    If the store directory is missing, or cannot be inspected because of a
    filesystem error such as a permission problem, a single failing
    ``ollama:store`` record is added instead of raising.

    Args:
        checks: Accumulator list that receives the records.
    """
    manifests_dir = OLLAMA_MODELS_DIR / "manifests"
    blobs_dir = OLLAMA_MODELS_DIR / "blobs"

    try:
        if not OLLAMA_MODELS_DIR.exists():
            add_check(checks, "ollama:store", "fail", f"missing {OLLAMA_MODELS_DIR}")
            return

        manifest_count = (
            sum(1 for path in manifests_dir.rglob("*") if path.is_file())
            if manifests_dir.exists()
            else 0
        )
        blob_count = (
            sum(1 for path in blobs_dir.iterdir() if path.is_file())
            if blobs_dir.exists()
            else 0
        )
        missing_blobs = [
            {"model": model, "blob": blob_name}
            for model, blob_name in CRITICAL_BLOBS.items()
            if not (blobs_dir / blob_name).exists()
        ]
    except OSError as exc:
        # pathlib's exists()/iterdir()/is_file() can raise (e.g.
        # PermissionError) when the store is not readable by the current
        # user. Treat a store that cannot be verified like a missing one.
        add_check(checks, "ollama:store", "fail", f"cannot read {OLLAMA_MODELS_DIR}: {exc}")
        return

    # NOTE(review): 38 manifests / 100 blobs look like baseline counts for the
    # demo host - confirm where these thresholds come from.
    add_check(
        checks,
        "ollama:store-counts",
        "ok" if manifest_count >= 38 and blob_count >= 100 else "warn",
        f"{manifest_count} manifests, {blob_count} blobs",
        {"path": str(OLLAMA_MODELS_DIR)},
    )

    if missing_blobs:
        add_check(checks, "ollama:critical-blobs", "fail", "missing critical blobs", missing_blobs)
    else:
        add_check(checks, "ollama:critical-blobs", "ok", f"{len(CRITICAL_BLOBS)} critical blobs present")
def _coerce_pid(value: Any) -> int | None:
    """Return *value* as an integer PID, or ``None`` if it is missing or malformed."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _agent_process_roots(processes: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Return the entries of *processes* whose parent is not itself in *processes*."""
    known_pids = {
        pid
        for pid in (_coerce_pid(proc.get("ProcessId")) for proc in processes)
        if pid is not None
    }
    roots = [
        proc
        for proc in processes
        if _coerce_pid(proc.get("ParentProcessId")) not in known_pids
    ]
    # If every process claims a parent inside the set, count them all rather
    # than reporting zero instances.
    return roots or processes


def check_windows(checks: list[dict[str, Any]], host: str, user: str, ssh_command: str) -> None:
    """Inspect the Lea agent on a Windows host over SSH and record the results.

    A PowerShell probe is sent as a base64 ``-EncodedCommand``. It reports
    the state of the ``LeaInteractive`` scheduled task, the running
    ``run_agent_v1.py`` Python processes, the agent lock file and a few
    environment variables as one JSON object.

    Records ``windows:ssh`` and, when the probe succeeds,
    ``windows:LeaInteractive``, ``windows:agent-process`` and
    ``windows:LEA_FEEDBACK_BUS``. SSH failures and unusable probe output are
    recorded as ``"warn"`` on ``windows:ssh``; nothing is raised for them.

    Args:
        checks: Accumulator list that receives the records.
        host: Windows host name or address.
        user: Remote user name.
        ssh_command: SSH command prefix, split with ``shlex`` (for example
            ``"ssh"`` or ``"sshpass -e ssh"``).
    """
    powershell = r"""
$ErrorActionPreference = "SilentlyContinue"
$task = schtasks /query /tn LeaInteractive /fo LIST /v | Out-String
$taskState = $null
try {
$taskState = (Get-ScheduledTask -TaskName 'LeaInteractive').State.ToString()
} catch {
$taskState = $null
}
$procs = Get-CimInstance Win32_Process -Filter "name = 'pythonw.exe' or name = 'python.exe'" |
Where-Object { $_.CommandLine -like '*run_agent_v1.py*' } |
Select-Object ProcessId,ParentProcessId,CommandLine
$lock = $null
if (Test-Path 'C:\rpa_vision\lea_agent.lock') {
$lock = (Get-Content 'C:\rpa_vision\lea_agent.lock' -Raw).Trim()
}
[pscustomobject]@{
lea_feedback_bus_user = [Environment]::GetEnvironmentVariable('LEA_FEEDBACK_BUS', 'User')
lea_feedback_bus_machine = [Environment]::GetEnvironmentVariable('LEA_FEEDBACK_BUS', 'Machine')
rpa_capture_bind_user = [Environment]::GetEnvironmentVariable('RPA_CAPTURE_BIND', 'User')
task_state = $taskState
task_running = ($taskState -eq 'Running')
task_raw = $task
agent_processes = @($procs)
lock_pid = $lock
} | ConvertTo-Json -Compress -Depth 5
""".strip()

    command_parts = shlex.split(ssh_command)
    if not command_parts:
        add_check(checks, "windows:ssh", "skip", "empty ssh command")
        return

    target = f"{user}@{host}"
    # NOTE(review): StrictHostKeyChecking=no disables host-key verification;
    # confirm this is acceptable for the demo network.
    args = command_parts + ["-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=5"]
    if "sshpass" not in command_parts[0]:
        # BatchMode disables password prompts, which sshpass relies on.
        args += ["-o", "BatchMode=yes"]
    # PowerShell's -EncodedCommand expects base64 of the UTF-16LE script.
    encoded = base64.b64encode(powershell.encode("utf-16le")).decode("ascii")
    args += [target, "powershell", "-NoProfile", "-EncodedCommand", encoded]

    code, stdout, stderr = run_command(args, timeout=12.0)
    if code != 0:
        add_check(checks, "windows:ssh", "warn", stderr or stdout or f"ssh returned {code}")
        return
    try:
        data = json.loads(stdout)
    except json.JSONDecodeError as exc:
        add_check(checks, "windows:ssh", "warn", f"invalid powershell json: {exc}", stdout[:1000])
        return
    if not isinstance(data, dict):
        # Valid JSON that is not an object (e.g. null or a list) cannot be
        # interpreted; report it instead of failing on .get() below.
        add_check(
            checks,
            "windows:ssh",
            "warn",
            f"unexpected powershell json type: {type(data).__name__}",
            stdout[:1000],
        )
        return

    raw_processes = data.get("agent_processes") or []
    if isinstance(raw_processes, dict):
        # A lone process may arrive as a single object instead of a list.
        raw_processes = [raw_processes]
    if not isinstance(raw_processes, list):
        raw_processes = []
    processes = [proc for proc in raw_processes if isinstance(proc, dict)]

    process_roots = _agent_process_roots(processes)
    instance_count = len(process_roots)

    task_running = bool(data.get("task_running"))
    task_state = data.get("task_state")
    if task_running:
        task_status = "ok"
        task_summary = f"task state={task_state!r}"
    elif instance_count:
        task_status = "warn"
        task_summary = f"task state={task_state!r}, but {instance_count} agent instance tree(s) are alive"
    else:
        task_status = "fail"
        task_summary = f"task state={task_state!r}"

    add_check(checks, "windows:ssh", "ok", f"reachable as {target}")
    add_check(checks, "windows:LeaInteractive", task_status, task_summary)
    add_check(
        checks,
        "windows:agent-process",
        "ok" if instance_count == 1 else ("warn" if instance_count > 1 else "fail"),
        f"{instance_count} Lea instance tree(s), {len(processes)} run_agent_v1.py process(es)",
        {
            "roots": process_roots,
            "processes": processes,
            "note": "pythonw.exe from a venv can spawn a child pythonw.exe; count root process trees, not raw processes.",
        },
    )

    # Informational only: the user-level value wins over the machine-level one.
    feedback_bus = data.get("lea_feedback_bus_user") or data.get("lea_feedback_bus_machine")
    add_check(
        checks,
        "windows:LEA_FEEDBACK_BUS",
        "ok",
        f"LEA_FEEDBACK_BUS={feedback_bus!r}",
        {
            "note": "If set to '1', Windows will try port 5004; local TCP/HTTP checks report whether that service is available.",
            "rpa_capture_bind_user": data.get("rpa_capture_bind_user"),
            "lock_pid": data.get("lock_pid"),
        },
    )
def summarize(checks: list[dict[str, Any]]) -> str:
    """Reduce all check records to one overall status.

    Args:
        checks: Records as produced by ``add_check``.

    Returns:
        ``"fail"`` if any record failed, otherwise ``"warn"`` if any record
        warned, otherwise ``"ok"`` (which includes an empty list and records
        with other statuses such as ``"skip"``).
    """
    # Severity order matters: a single failure outranks any number of warnings.
    for severity in ("fail", "warn"):
        for entry in checks:
            if entry["status"] == severity:
                return severity
    return "ok"
def print_text(report: dict[str, Any]) -> None:
    """Print *report* to stdout in a human-readable form.

    The output is a two-line header (overall status and timestamp), a blank
    line, then one line per check with its status padded to four characters.

    Args:
        report: Mapping with the keys ``"overall"``, ``"timestamp"`` and
            ``"checks"``, as built by ``main``.
    """
    overall = report["overall"].upper()
    print(f"Lea healthcheck: {overall}")
    print(f"Timestamp: {report['timestamp']}")
    print()
    for entry in report["checks"]:
        badge = entry["status"].upper()
        print(f"[{badge:4}] {entry['name']} - {entry['summary']}")
def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse the command-line arguments of the healthcheck.

    The defaults for ``--windows-host``, ``--windows-user`` and
    ``--ssh-command`` come from the environment variables
    ``LEA_WINDOWS_HOST``, ``LEA_WINDOWS_USER`` and ``LEA_SSH_COMMAND``.

    Args:
        argv: Argument list without the program name.

    Returns:
        The parsed namespace.
    """
    env = os.environ
    parser = argparse.ArgumentParser(description=__doc__)

    # Boolean switches.
    for flag, help_text in (
        ("--json", "print machine-readable JSON"),
        ("--strict", "exit non-zero on warnings"),
    ):
        parser.add_argument(flag, action="store_true", help=help_text)

    parser.add_argument("--host", default="127.0.0.1", help="local service host for TCP checks")

    # Options for the optional Windows probe.
    parser.add_argument("--windows-host", default=env.get("LEA_WINDOWS_HOST", ""))
    parser.add_argument("--windows-user", default=env.get("LEA_WINDOWS_USER", "dom"))
    parser.add_argument(
        "--ssh-command",
        default=env.get("LEA_SSH_COMMAND", "ssh"),
        help="ssh command prefix; for password auth use LEA_SSH_COMMAND='sshpass -e ssh' and SSHPASS externally",
    )

    return parser.parse_args(argv)
def main(argv: list[str]) -> int:
    """Run every healthcheck, print the report and return the exit code.

    The Windows probe only runs when a Windows host was supplied; otherwise
    a ``"skip"`` record is added in its place.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        ``2`` if any check failed, ``1`` if ``--strict`` was given and the
        overall status is ``"warn"``, otherwise ``0``.
    """
    options = parse_args(argv)
    results: list[dict[str, Any]] = []

    # Local checks.
    check_systemd(results)
    check_ports(results, options.host)
    check_http_services(results)
    check_ollama(results)
    check_model_store(results)

    # Remote check, only on request.
    if not options.windows_host:
        add_check(results, "windows", "skip", "not requested; pass --windows-host or LEA_WINDOWS_HOST")
    else:
        check_windows(results, options.windows_host, options.windows_user, options.ssh_command)

    timestamp = time.strftime("%Y-%m-%dT%H:%M:%S%z")
    overall = summarize(results)
    report = {
        "timestamp": timestamp,
        "overall": overall,
        "repo": str(REPO_ROOT),
        "checks": results,
    }

    if not options.json:
        print_text(report)
    else:
        print(json.dumps(report, indent=2, sort_keys=True))

    if overall == "fail":
        return 2
    return 1 if (options.strict and overall == "warn") else 0
# Script entry point: the return value of main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main(sys.argv[1:]))