#!/usr/bin/env python3
"""Read-only healthcheck for the Lea demo stack.

This script does not start, stop, restart, delete, or restore anything.
It is intended as a daily proof artifact before the 2026-06-01 demo.
"""

from __future__ import annotations

import argparse
import base64
import http.client
import json
import os
import shlex
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any


# Directory one level above the directory that contains this script.
REPO_ROOT = Path(__file__).resolve().parents[1]
OLLAMA_MODELS_DIR = Path("/var/lib/ollama/.ollama/models")

# Tags that must be listed by Ollama's /api/tags.
CRITICAL_MODELS = (
    "qwen2.5vl:7b-rpa",
    "t2a-gemma3-27b:latest",
    "t2a-gemma3-27b-q4:latest",
    "thiagomoraes/medgemma-27b-it:Q4_K_S",
)

# Blob file that must exist under OLLAMA_MODELS_DIR/blobs for each tag.
CRITICAL_BLOBS = {
    "t2a-gemma3-27b:latest": "sha256-2f2509e30b0d07db517b82e62404194ef355846f08ac287775ff363693086818",
    "t2a-gemma3-27b-q4:latest": "sha256-0139f42273d53348fa0d24daae016b7231e1310258bbbaa7e38a1af703217c1a",
    "thiagomoraes/medgemma-27b-it:Q4_K_S": "sha256-7cb6ff10942c8ccf370e274daafaf56da3fff318f40a355df331d8783c6c11f3",
}

# Model that /api/ps is expected to report as currently loaded.
RESIDENT_VLM = "qwen2.5vl:7b-rpa"

# Store inventory floor; fewer files than this downgrades the check to "warn".
# NOTE(review): presumably a snapshot of the known-good store -- confirm.
MIN_MANIFEST_COUNT = 38
MIN_BLOB_COUNT = 100

# Read-only PowerShell probe executed on the Windows host; it emits one
# compact JSON object on stdout.
_WINDOWS_PROBE_SCRIPT = r"""
$ErrorActionPreference = "SilentlyContinue"
$task = schtasks /query /tn LeaInteractive /fo LIST /v | Out-String
$taskState = $null
try {
    $taskState = (Get-ScheduledTask -TaskName 'LeaInteractive').State.ToString()
} catch {
    $taskState = $null
}
$procs = Get-CimInstance Win32_Process -Filter "name = 'pythonw.exe' or name = 'python.exe'" |
    Where-Object { $_.CommandLine -like '*run_agent_v1.py*' } |
    Select-Object ProcessId,ParentProcessId,CommandLine
$lock = $null
if (Test-Path 'C:\rpa_vision\lea_agent.lock') {
    $lock = (Get-Content 'C:\rpa_vision\lea_agent.lock' -Raw).Trim()
}
[pscustomobject]@{
    lea_feedback_bus_user = [Environment]::GetEnvironmentVariable('LEA_FEEDBACK_BUS', 'User')
    lea_feedback_bus_machine = [Environment]::GetEnvironmentVariable('LEA_FEEDBACK_BUS', 'Machine')
    rpa_capture_bind_user = [Environment]::GetEnvironmentVariable('RPA_CAPTURE_BIND', 'User')
    task_state = $taskState
    task_running = ($taskState -eq 'Running')
    task_raw = $task
    agent_processes = @($procs)
    lock_pid = $lock
} | ConvertTo-Json -Compress -Depth 5
""".strip()


def _timeout_text(stream: str | bytes | None) -> str:
    """Return output captured before a timeout as stripped text."""
    # TimeoutExpired carries bytes even when the process ran with text=True.
    if isinstance(stream, bytes):
        stream = stream.decode("utf-8", errors="replace")
    return (stream or "").strip()


def _as_int(value: Any) -> int | None:
    """Convert a JSON scalar to int, or return None when it is not a number."""
    if value is None or isinstance(value, bool):
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def run_command(args: list[str], timeout: float = 5.0) -> tuple[int, str, str]:
    """Run *args* from REPO_ROOT and return ``(returncode, stdout, stderr)``.

    Never raises for the expected failure modes; they are mapped to
    shell-style exit codes instead:

    * 127 -- executable (or working directory) not found
    * 126 -- any other OS-level failure to start the process
    * 124 -- the process exceeded *timeout* seconds
    """
    try:
        proc = subprocess.run(
            args,
            cwd=REPO_ROOT,
            text=True,
            encoding="utf-8",
            errors="replace",
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
            check=False,
        )
    except FileNotFoundError as exc:
        return 127, "", str(exc)
    except subprocess.TimeoutExpired as exc:
        stderr = _timeout_text(exc.stderr)
        return 124, _timeout_text(exc.stdout), stderr or f"timeout after {timeout}s"
    except OSError as exc:
        # e.g. PermissionError when the file exists but is not executable.
        return 126, "", str(exc)
    return proc.returncode, proc.stdout.strip(), proc.stderr.strip()


def http_json(url: str, timeout: float = 2.0) -> tuple[bool, Any, str]:
    """GET *url* and decode the body as JSON.

    Returns ``(ok, data, error)``: on success ``ok`` is True, ``data`` is the
    decoded JSON and ``error`` is empty; on any failure ``ok`` is False,
    ``data`` is None and ``error`` describes the problem.
    """
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "lea-healthcheck/1.0"})
        with urllib.request.urlopen(req, timeout=timeout) as response:
            body = response.read().decode("utf-8", errors="replace")
            if response.status >= 400:
                return False, None, f"http {response.status}: {body[:300]}"
            return True, json.loads(body), ""
    except json.JSONDecodeError as exc:
        return False, None, f"invalid json: {exc}"
    except (
        urllib.error.URLError,
        http.client.HTTPException,  # malformed/truncated HTTP responses
        TimeoutError,
        OSError,
        ValueError,  # malformed URL, e.g. "unknown url type"
    ) as exc:
        return False, None, str(exc)


def tcp_open(host: str, port: int, timeout: float = 1.0) -> tuple[bool, str]:
    """Return ``(True, "")`` if a TCP connection to host:port succeeds.

    On failure returns ``(False, <error text>)``. The connection is closed
    immediately; nothing is sent.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True, ""
    except OSError as exc:
        return False, str(exc)


def add_check(
    checks: list[dict[str, Any]],
    name: str,
    status: str,
    summary: str,
    details: Any | None = None,
) -> None:
    """Append one check result to *checks*.

    *status* is one of the strings used throughout this file: "ok", "warn",
    "fail" or "skip".
    """
    checks.append(
        {
            "name": name,
            "status": status,
            "summary": summary,
            "details": details,
        }
    )


def check_systemd(checks: list[dict[str, Any]]) -> None:
    """Record whether the user-level systemd units are active.

    A required unit that is not active is a "fail"; an optional one is a
    "warn".
    """
    for unit, required in (
        ("rpa-streaming.service", True),
        ("rpa-agent-chat.service", False),
    ):
        code, stdout, stderr = run_command(["systemctl", "--user", "is-active", unit])
        if code == 0 and stdout == "active":
            add_check(checks, f"systemd:{unit}", "ok", "active")
        elif required:
            add_check(
                checks,
                f"systemd:{unit}",
                "fail",
                stdout or stderr or f"is-active returned {code}",
            )
        else:
            add_check(
                checks,
                f"systemd:{unit}",
                "warn",
                stdout or stderr or "inactive optional service",
                {"note": "5004 narration bus is optional, but should be fixed before demo if enabled on Windows."},
            )


def check_ports(checks: list[dict[str, Any]], host: str) -> None:
    """Record whether each service port on *host* accepts TCP connections.

    A closed required port is a "fail"; a closed optional port is a "warn".
    """
    for port, name, required in (
        (5005, "streaming-http", True),
        (11434, "ollama-api", True),
        (5004, "feedbackbus-socketio", False),
    ):
        ok, error = tcp_open(host, port)
        if ok:
            add_check(checks, f"tcp:{name}:{port}", "ok", f"{host}:{port} accepts TCP")
        elif required:
            add_check(checks, f"tcp:{name}:{port}", "fail", f"{host}:{port} closed: {error}")
        else:
            add_check(checks, f"tcp:{name}:{port}", "warn", f"{host}:{port} closed: {error}")


def check_http_services(checks: list[dict[str, Any]]) -> None:
    """Probe the local HTTP status endpoints on ports 5005 and 5004.

    The streaming service (5005) must answer ``{"status": "healthy"}`` or the
    check fails. The feedback bus (5004) is optional, so any problem there is
    only a "warn".
    """
    ok, data, error = http_json("http://127.0.0.1:5005/health")
    if ok and isinstance(data, dict) and data.get("status") == "healthy":
        add_check(checks, "http:rpa-streaming:/health", "ok", "healthy", data)
    else:
        add_check(checks, "http:rpa-streaming:/health", "fail", error or "unexpected health response", data)

    ok, data, error = http_json("http://127.0.0.1:5004/api/status")
    if ok and isinstance(data, dict) and data.get("status") == "online":
        add_check(checks, "http:feedbackbus:/api/status", "ok", "online", data)
    elif ok:
        add_check(checks, "http:feedbackbus:/api/status", "warn", "unexpected status response", data)
    else:
        add_check(checks, "http:feedbackbus:/api/status", "warn", error or "not responding")


def _model_name(entry: dict[str, Any]) -> Any:
    """Return the tag of an Ollama model entry (``name``, else ``model``)."""
    return entry.get("name") or entry.get("model")


def check_ollama(checks: list[dict[str, Any]]) -> None:
    """Check the local Ollama API for critical tags and the resident VLM.

    Fails when /api/tags is unreadable or a tag from CRITICAL_MODELS is
    missing. Warns when /api/ps is unreadable or RESIDENT_VLM is not loaded.
    """
    ok, tags_data, error = http_json("http://127.0.0.1:11434/api/tags", timeout=4.0)
    if not ok or not isinstance(tags_data, dict):
        add_check(checks, "ollama:tags", "fail", error or "cannot read /api/tags")
        return

    models = tags_data.get("models") or []
    names = {_model_name(entry) for entry in models if isinstance(entry, dict)}
    missing = [name for name in CRITICAL_MODELS if name not in names]
    if missing:
        add_check(checks, "ollama:critical-tags", "fail", f"missing critical tags: {', '.join(missing)}")
    else:
        add_check(checks, "ollama:critical-tags", "ok", f"{len(CRITICAL_MODELS)} critical tags present")

    ok, ps_data, error = http_json("http://127.0.0.1:11434/api/ps", timeout=4.0)
    if not ok or not isinstance(ps_data, dict):
        add_check(checks, "ollama:ps", "warn", error or "cannot read /api/ps")
        return

    loaded = ps_data.get("models") or []
    vlm = next(
        (entry for entry in loaded if isinstance(entry, dict) and _model_name(entry) == RESIDENT_VLM),
        None,
    )
    if vlm:
        add_check(
            checks,
            "ollama:resident-vlm",
            "ok",
            f"{RESIDENT_VLM} resident",
            {
                "context_length": vlm.get("context_length"),
                "size": vlm.get("size"),
                "size_vram": vlm.get("size_vram"),
                "expires_at": vlm.get("expires_at"),
            },
        )
    else:
        add_check(checks, "ollama:resident-vlm", "warn", f"{RESIDENT_VLM} is not currently resident", loaded)


def check_model_store(checks: list[dict[str, Any]]) -> None:
    """Inspect the on-disk Ollama store without modifying it.

    Records a file-count check for manifests and blobs, and a check that
    every blob in CRITICAL_BLOBS exists. A missing or unreadable store is
    reported as a single "ollama:store" failure.
    """
    manifests_dir = OLLAMA_MODELS_DIR / "manifests"
    blobs_dir = OLLAMA_MODELS_DIR / "blobs"
    try:
        if not OLLAMA_MODELS_DIR.exists():
            add_check(checks, "ollama:store", "fail", f"missing {OLLAMA_MODELS_DIR}")
            return
        manifest_count = (
            sum(1 for path in manifests_dir.rglob("*") if path.is_file()) if manifests_dir.exists() else 0
        )
        blob_count = sum(1 for path in blobs_dir.iterdir() if path.is_file()) if blobs_dir.exists() else 0
        missing_blobs = [
            {"model": model, "blob": blob_name}
            for model, blob_name in CRITICAL_BLOBS.items()
            if not (blobs_dir / blob_name).exists()
        ]
    except OSError as exc:
        # The store may belong to another user; report instead of crashing.
        add_check(checks, "ollama:store", "fail", f"cannot read {OLLAMA_MODELS_DIR}: {exc}")
        return

    counts_ok = manifest_count >= MIN_MANIFEST_COUNT and blob_count >= MIN_BLOB_COUNT
    add_check(
        checks,
        "ollama:store-counts",
        "ok" if counts_ok else "warn",
        f"{manifest_count} manifests, {blob_count} blobs",
        {"path": str(OLLAMA_MODELS_DIR)},
    )
    if missing_blobs:
        add_check(checks, "ollama:critical-blobs", "fail", "missing critical blobs", missing_blobs)
    else:
        add_check(checks, "ollama:critical-blobs", "ok", f"{len(CRITICAL_BLOBS)} critical blobs present")


def check_windows(checks: list[dict[str, Any]], host: str, user: str, ssh_command: str) -> None:
    """Probe the Windows agent host over SSH with a read-only PowerShell script.

    *ssh_command* is a shell-style command prefix (e.g. ``"ssh"`` or
    ``"sshpass -e ssh"``). Connection, command-line, or output-format
    problems are recorded as a "windows:ssh" warning; otherwise checks are
    added for SSH reachability, the LeaInteractive scheduled task, the agent
    process trees, and the LEA_FEEDBACK_BUS environment variable.
    """
    try:
        command_parts = shlex.split(ssh_command)
    except ValueError as exc:
        # Unbalanced quotes in --ssh-command / LEA_SSH_COMMAND.
        add_check(checks, "windows:ssh", "warn", f"invalid ssh command: {exc}")
        return
    if not command_parts:
        add_check(checks, "windows:ssh", "skip", "empty ssh command")
        return

    target = f"{user}@{host}"
    # NOTE(security): StrictHostKeyChecking=no skips host key verification.
    args = command_parts + ["-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=5"]
    if "sshpass" not in command_parts[0]:
        # BatchMode forbids password prompts, which sshpass relies on.
        args += ["-o", "BatchMode=yes"]
    # -EncodedCommand takes base64 of the UTF-16LE script, which also avoids
    # any quoting of the script on the way through ssh.
    encoded = base64.b64encode(_WINDOWS_PROBE_SCRIPT.encode("utf-16le")).decode("ascii")
    args += [target, "powershell", "-NoProfile", "-EncodedCommand", encoded]

    code, stdout, stderr = run_command(args, timeout=12.0)
    if code != 0:
        add_check(checks, "windows:ssh", "warn", stderr or stdout or f"ssh returned {code}")
        return
    try:
        data = json.loads(stdout)
    except json.JSONDecodeError as exc:
        add_check(checks, "windows:ssh", "warn", f"invalid powershell json: {exc}", stdout[:1000])
        return
    if not isinstance(data, dict):
        add_check(checks, "windows:ssh", "warn", "unexpected powershell json: expected an object", stdout[:1000])
        return

    # A single process may arrive as a bare object instead of a list.
    raw_processes = data.get("agent_processes") or []
    if isinstance(raw_processes, dict):
        raw_processes = [raw_processes]
    elif not isinstance(raw_processes, list):
        raw_processes = []
    processes = [proc for proc in raw_processes if isinstance(proc, dict)]

    process_ids = {
        pid for pid in (_as_int(proc.get("ProcessId")) for proc in processes) if pid is not None
    }
    # A root is a process whose parent is not itself one of the agent processes.
    process_roots = [
        proc for proc in processes if _as_int(proc.get("ParentProcessId")) not in process_ids
    ]
    if processes and not process_roots:
        process_roots = processes
    instance_count = len(process_roots)

    task_running = bool(data.get("task_running"))
    task_state = data.get("task_state")
    if task_running:
        task_status = "ok"
        task_summary = f"task state={task_state!r}"
    elif instance_count:
        task_status = "warn"
        task_summary = f"task state={task_state!r}, but {instance_count} agent instance tree(s) are alive"
    else:
        task_status = "fail"
        task_summary = f"task state={task_state!r}"

    if instance_count == 1:
        process_status = "ok"
    elif instance_count > 1:
        process_status = "warn"
    else:
        process_status = "fail"

    add_check(checks, "windows:ssh", "ok", f"reachable as {target}")
    add_check(checks, "windows:LeaInteractive", task_status, task_summary)
    add_check(
        checks,
        "windows:agent-process",
        process_status,
        f"{instance_count} Lea instance tree(s), {len(processes)} run_agent_v1.py process(es)",
        {
            "roots": process_roots,
            "processes": processes,
            "note": "pythonw.exe from a venv can spawn a child pythonw.exe; count root process trees, not raw processes.",
        },
    )
    feedback_bus = data.get("lea_feedback_bus_user") or data.get("lea_feedback_bus_machine")
    add_check(
        checks,
        "windows:LEA_FEEDBACK_BUS",
        "ok",
        f"LEA_FEEDBACK_BUS={feedback_bus!r}",
        {
            "note": "If set to '1', Windows will try port 5004; local TCP/HTTP checks report whether that service is available.",
            "rpa_capture_bind_user": data.get("rpa_capture_bind_user"),
            "lock_pid": data.get("lock_pid"),
        },
    )


def summarize(checks: list[dict[str, Any]]) -> str:
    """Return the worst status among *checks*: "fail", else "warn", else "ok".

    "skip" results (and an empty list) count as "ok".
    """
    if any(check["status"] == "fail" for check in checks):
        return "fail"
    if any(check["status"] == "warn" for check in checks):
        return "warn"
    return "ok"


def print_text(report: dict[str, Any]) -> None:
    """Print *report* as a human-readable list, one line per check."""
    print(f"Lea healthcheck: {report['overall'].upper()}")
    print(f"Timestamp: {report['timestamp']}")
    print()
    for check in report["checks"]:
        print(f"[{check['status'].upper():4}] {check['name']} - {check['summary']}")


def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse command-line options; several defaults come from LEA_* env vars."""
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--json", action="store_true", help="print machine-readable JSON")
    parser.add_argument("--strict", action="store_true", help="exit non-zero on warnings")
    parser.add_argument("--host", default="127.0.0.1", help="local service host for TCP checks")
    parser.add_argument(
        "--windows-host",
        default=os.environ.get("LEA_WINDOWS_HOST", ""),
        help="Windows agent host to probe over ssh (default: LEA_WINDOWS_HOST; empty skips the probe)",
    )
    parser.add_argument(
        "--windows-user",
        default=os.environ.get("LEA_WINDOWS_USER", "dom"),
        help="ssh user on the Windows host (default: LEA_WINDOWS_USER or 'dom')",
    )
    parser.add_argument(
        "--ssh-command",
        default=os.environ.get("LEA_SSH_COMMAND", "ssh"),
        help="ssh command prefix; for password auth use LEA_SSH_COMMAND='sshpass -e ssh' and SSHPASS externally",
    )
    return parser.parse_args(argv)


def main(argv: list[str]) -> int:
    """Run every check, print the report, and return the process exit code.

    Exit codes: 2 when any check failed, 1 when ``--strict`` is set and any
    check warned, otherwise 0.
    """
    args = parse_args(argv)
    checks: list[dict[str, Any]] = []

    check_systemd(checks)
    check_ports(checks, args.host)
    check_http_services(checks)
    check_ollama(checks)
    check_model_store(checks)
    if args.windows_host:
        check_windows(checks, args.windows_host, args.windows_user, args.ssh_command)
    else:
        add_check(checks, "windows", "skip", "not requested; pass --windows-host or LEA_WINDOWS_HOST")

    report = {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
        "overall": summarize(checks),
        "repo": str(REPO_ROOT),
        "checks": checks,
    }
    if args.json:
        print(json.dumps(report, indent=2, sort_keys=True))
    else:
        print_text(report)

    if report["overall"] == "fail":
        return 2
    if args.strict and report["overall"] == "warn":
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))