#!/usr/bin/env python3
"""Read-only preflight for Lea micro-learning prerequisites.

The script performs fast checks only. It does not warm up models, pull models,
start services, stop replays, restart processes, or modify files.
"""

from __future__ import annotations

import argparse
import csv
import json
import os
import re
import subprocess
import sys
import time
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any

try:  # Script execution from tools/
    from lea_healthcheck import REPO_ROOT, add_check, http_json, run_command, summarize
except ImportError:  # Test/import execution from repository root
    try:
        from tools.lea_healthcheck import REPO_ROOT, add_check, http_json, run_command, summarize
    except ImportError:
        # Neither import path resolved: define local stand-ins so the script
        # still works on its own.
        REPO_ROOT = Path(__file__).resolve().parents[1]

        def run_command(args: list[str], timeout: float = 5.0) -> tuple[int, str, str]:
            """Run *args* from REPO_ROOT and return ``(exit_code, stdout, stderr)``.

            Never raises for a missing executable or a timeout; those are
            reported as exit code 127 and 124 respectively. Captured output is
            stripped of surrounding whitespace.
            """
            try:
                proc = subprocess.run(
                    args,
                    cwd=REPO_ROOT,
                    text=True,
                    encoding="utf-8",
                    errors="replace",
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    timeout=timeout,
                    check=False,
                )
                return proc.returncode, proc.stdout.strip(), proc.stderr.strip()
            except FileNotFoundError as exc:
                return 127, "", str(exc)
            except subprocess.TimeoutExpired as exc:
                # Partial output is only kept when it is already a str.
                stdout = (exc.stdout or "").strip() if isinstance(exc.stdout, str) else ""
                stderr = (exc.stderr or "").strip() if isinstance(exc.stderr, str) else ""
                return 124, stdout, stderr or f"timeout after {timeout}s"

        def http_json(url: str, timeout: float = 2.0) -> tuple[bool, Any, str]:
            """GET *url* and return ``(ok, parsed_json, error_message)``.

            On success the error message is empty; on any handled failure
            (invalid JSON, URL/timeout/OS error, HTTP status >= 400) ``ok`` is
            False and the parsed value is None.
            """
            try:
                req = urllib.request.Request(url, headers={"User-Agent": "lea-micro-preflight/1.0"})
                with urllib.request.urlopen(req, timeout=timeout) as response:
                    body = response.read().decode("utf-8", errors="replace")
                    if response.status >= 400:
                        return False, None, f"http {response.status}: {body[:300]}"
                    return True, json.loads(body), ""
            except json.JSONDecodeError as exc:
                return False, None, f"invalid json: {exc}"
            except (urllib.error.URLError, TimeoutError, OSError) as exc:
                return False, None, str(exc)

        def add_check(
            checks: list[dict[str, Any]],
            name: str,
            status: str,
            summary: str,
            details: Any | None = None,
        ) -> None:
            """Append one check result dict to *checks* (mutated in place)."""
            checks.append({"name": name, "status": status, "summary": summary, "details": details})

        def summarize(checks: list[dict[str, Any]]) -> str:
            """Return the worst status among *checks*: "fail", then "warn", else "ok"."""
            if any(check["status"] == "fail" for check in checks):
                return "fail"
            if any(check["status"] == "warn" for check in checks):
                return "warn"
            return "ok"


DEFAULT_MIN_VRAM_FREE_MIB = 4000
DEFAULT_MIN_RAM_AVAILABLE_MIB = 8192
DEFAULT_MAX_SWAP_USED_MIB = 4096
DEFAULT_MAX_SWAP_USED_PCT = 70.0
REQUIRED_MODELS = ("qwen2.5vl:7b-rpa", "qwen2.5:7b")
DEFAULT_RESIDENT_WARN_MODEL = "qwen2.5vl:7b-rpa"


def _parse_mib_int(value: str) -> int:
    """Return the first (optionally negative) integer found in *value*.

    Non-breaking spaces are treated as ordinary spaces before searching.

    Raises:
        ValueError: if *value* contains no digits.
    """
    match = re.search(r"-?\d+", value.replace("\u00a0", " "))
    if not match:
        raise ValueError(f"cannot parse integer from {value!r}")
    return int(match.group(0))


def parse_free_m(output: str) -> dict[str, dict[str, int]]:
    """Parse `free -m` output into mem/swap dictionaries.

    Handles localized column names by normalizing to English keys.

    Returns:
        ``{"mem": {...}, "swap": {...}}`` mapping column names to integers.
        ``mem["available"]`` is always present; when the output has no such
        column it is copied from ``mem["free"]``.

    Raises:
        ValueError: if the Mem or Swap row is missing, if no available/free
            memory value can be determined, or if a value is not an integer.
    """
    header: list[str] = []
    parsed: dict[str, dict[str, int]] = {}
    # Column order assumed when no header row was seen (and always for swap).
    fallback_columns = {
        "mem": ["total", "used", "free", "shared", "buff/cache", "available"],
        "swap": ["total", "used", "free"],
    }

    # Localization map: known foreign column names -> English
    _LOCALIZATION_MAP = {
        "disponible": "available",
        "utilisé": "used",
        "libre": "free",
        "partagé": "shared",
        "tamp/cache": "buff/cache",  # French truncation of "tampon/cache"
    }

    def _normalize(name: str) -> str:
        return _LOCALIZATION_MAP.get(name.lower(), name.lower())

    for raw_line in output.splitlines():
        parts = raw_line.strip().split()
        if not parts:
            continue
        # The header row is recognised by its first column, "total".
        if parts[0].lower() == "total":
            header = [_normalize(p) for p in parts]
            continue
        label = parts[0].rstrip(":").lower()
        # Handle localized row labels: "échange" = "swap" (French)
        if label == "échange":
            label = "swap"
        if label not in ("mem", "swap"):
            continue
        values = parts[1:]
        if label == "mem" and header:
            columns = header[: len(values)]
        else:
            columns = fallback_columns[label][: len(values)]
        parsed[label] = {key: int(value) for key, value in zip(columns, values)}

    if "mem" not in parsed:
        raise ValueError("missing Mem line in free output")
    if "available" not in parsed["mem"] and "free" in parsed["mem"]:
        parsed["mem"]["available"] = parsed["mem"]["free"]
    if "available" not in parsed["mem"]:
        raise ValueError("missing available memory in free output")
    if "swap" not in parsed:
        raise ValueError("missing Swap line in free output")
    return parsed


def parse_nvidia_smi_memory(output: str) -> list[dict[str, int]]:
    """Parse `nvidia-smi --query-gpu=memory.free,memory.total` CSV output.

    Returns:
        One ``{"free_mib": int, "total_mib": int}`` dict per non-empty row.

    Raises:
        ValueError: if a row has fewer than two non-empty cells, a cell holds
            no integer, or the output contains no rows at all.
    """
    gpus: list[dict[str, int]] = []
    for row in csv.reader(output.splitlines()):
        cells = [cell.strip() for cell in row if cell.strip()]
        if not cells:
            continue
        if len(cells) < 2:
            raise ValueError(f"expected two CSV columns, got {cells!r}")
        gpus.append(
            {
                "free_mib": _parse_mib_int(cells[0]),
                "total_mib": _parse_mib_int(cells[1]),
            }
        )
    if not gpus:
        raise ValueError("nvidia-smi returned no GPU memory rows")
    return gpus


def extract_ollama_tags(data: Any) -> set[str]:
    """Extract model tags from Ollama `/api/tags` or `/api/ps` style JSON.

    Accepts either a dict with a ``"models"`` list or a bare list of entries.
    Every non-empty string found under an entry's ``"name"`` or ``"model"``
    key is collected. Malformed input (wrong top-level type, ``"models"`` that
    is not a list, non-dict entries) yields no tags instead of raising.
    """
    if isinstance(data, dict):
        models = data.get("models")
    elif isinstance(data, list):
        models = data
    else:
        return set()
    # A non-list "models" value (None, a number, ...) carries no entries;
    # guarding here avoids a TypeError when it is not iterable.
    if not isinstance(models, list):
        return set()

    tags: set[str] = set()
    for entry in models:
        if not isinstance(entry, dict):
            continue
        for key in ("name", "model"):
            value = entry.get(key)
            if isinstance(value, str) and value:
                tags.add(value)
    return tags


def ollama_tag_present(model: str, tags: set[str]) -> bool:
    """Return True when *model* is covered by one of the reported *tags*.

    An exact match always counts. A name given without a tag also matches
    ``<name>:latest``, because Ollama resolves a missing tag to ``latest``.
    Only the last path segment is inspected for a tag so that a registry
    host with a port (``host:5000/ns/model``) is not mistaken for one.
    """
    if model in tags:
        return True
    last_segment = model.rsplit("/", 1)[-1]
    if ":" in last_segment:
        return False
    return f"{model}:latest" in tags


def check_gpu(checks: list[dict[str, Any]], min_vram_free_mib: int, timeout: float) -> None:
    """Query nvidia-smi and record the GPU checks in *checks*.

    Adds ``gpu:nvidia-smi`` (query/parse outcome) and, when the query
    succeeds, ``gpu:vram-free``, which passes when the GPU with the most free
    memory has at least *min_vram_free_mib* MiB free.
    """
    code, stdout, stderr = run_command(
        [
            "nvidia-smi",
            "--query-gpu=memory.free,memory.total",
            "--format=csv,noheader,nounits",
        ],
        timeout=timeout,
    )
    if code != 0:
        summary = "nvidia-smi not available" if code == 127 else stderr or stdout or f"exit {code}"
        add_check(checks, "gpu:nvidia-smi", "fail", summary)
        return

    try:
        gpus = parse_nvidia_smi_memory(stdout)
    except ValueError as exc:
        add_check(checks, "gpu:nvidia-smi", "fail", f"cannot parse nvidia-smi output: {exc}", stdout)
        return

    add_check(checks, "gpu:nvidia-smi", "ok", f"query ok, {len(gpus)} GPU(s)", {"gpus": gpus})
    best_free = max(gpu["free_mib"] for gpu in gpus)
    status = "ok" if best_free >= min_vram_free_mib else "fail"
    add_check(
        checks,
        "gpu:vram-free",
        status,
        f"best free VRAM {best_free} MiB / required {min_vram_free_mib} MiB",
        {"threshold_mib": min_vram_free_mib, "gpus": gpus},
    )


def check_memory(
    checks: list[dict[str, Any]],
    min_ram_available_mib: int,
    max_swap_used_mib: int,
    max_swap_used_pct: float,
    timeout: float,
) -> None:
    """Run `free -m` and record the RAM and swap checks in *checks*.

    Adds ``ram:available`` (available RAM against *min_ram_available_mib*) and
    ``swap:usage``, which passes only when used swap is within both the
    absolute and the percentage limit. A swap total of 0 is reported as a
    failure. If the command or its parsing fails, a single failing
    ``memory:free`` check is recorded instead.
    """
    code, stdout, stderr = run_command(["free", "-m"], timeout=timeout)
    if code != 0:
        add_check(checks, "memory:free", "fail", stderr or stdout or f"exit {code}")
        return

    try:
        memory = parse_free_m(stdout)
    except ValueError as exc:
        add_check(checks, "memory:free", "fail", f"cannot parse free -m output: {exc}", stdout)
        return

    mem = memory["mem"]
    available = mem["available"]
    add_check(
        checks,
        "ram:available",
        "ok" if available >= min_ram_available_mib else "fail",
        f"available RAM {available} MiB / required {min_ram_available_mib} MiB",
        {"threshold_mib": min_ram_available_mib, "mem": mem},
    )

    swap = memory["swap"]
    swap_total = swap.get("total", 0)
    swap_used = swap.get("used", 0)
    if swap_total <= 0:
        # Also avoids dividing by zero in the percentage computation below.
        add_check(checks, "swap:usage", "fail", "swap total is 0 MiB", {"swap": swap})
        return

    swap_used_pct = (swap_used / swap_total) * 100.0
    swap_ok = swap_used <= max_swap_used_mib and swap_used_pct <= max_swap_used_pct
    add_check(
        checks,
        "swap:usage",
        "ok" if swap_ok else "fail",
        f"swap used {swap_used} MiB ({swap_used_pct:.1f}%) / limits {max_swap_used_mib} MiB and {max_swap_used_pct:.1f}%",
        {
            "max_used_mib": max_swap_used_mib,
            "max_used_pct": max_swap_used_pct,
            "used_pct": round(swap_used_pct, 2),
            "swap": swap,
        },
    )


def check_ollama(
    checks: list[dict[str, Any]],
    base_url: str,
    required_models: tuple[str, ...],
    resident_warn_model: str,
    timeout: float,
) -> None:
    """Query the Ollama HTTP API and record the Ollama checks in *checks*.

    Adds, in order: ``ollama:tags`` (``/api/tags`` readable),
    ``ollama:required-models`` (every entry of *required_models* installed),
    ``ollama:ps`` (``/api/ps`` readable) and ``ollama:resident-vlm``, which
    only warns when *resident_warn_model* is not loaded. Stops after the first
    endpoint that cannot be read. Model names without a tag are matched
    against ``<name>:latest`` as well.
    """
    base = base_url.rstrip("/")
    ok, tags_data, error = http_json(f"{base}/api/tags", timeout=timeout)
    if not ok or not isinstance(tags_data, dict):
        add_check(checks, "ollama:tags", "fail", error or "cannot read /api/tags")
        return

    tags = extract_ollama_tags(tags_data)
    add_check(checks, "ollama:tags", "ok", f"/api/tags readable, {len(tags)} tag(s)")
    missing = [model for model in required_models if not ollama_tag_present(model, tags)]
    if missing:
        add_check(
            checks,
            "ollama:required-models",
            "fail",
            f"missing required model(s): {', '.join(missing)}",
            {"required": list(required_models), "present": sorted(tags)},
        )
    else:
        add_check(
            checks,
            "ollama:required-models",
            "ok",
            f"{len(required_models)} required model(s) present",
            {"required": list(required_models)},
        )

    ok, ps_data, error = http_json(f"{base}/api/ps", timeout=timeout)
    if not ok or not isinstance(ps_data, dict):
        add_check(checks, "ollama:ps", "fail", error or "cannot read /api/ps")
        return

    resident = extract_ollama_tags(ps_data)
    add_check(checks, "ollama:ps", "ok", f"/api/ps readable, {len(resident)} resident model(s)")
    if ollama_tag_present(resident_warn_model, resident):
        add_check(checks, "ollama:resident-vlm", "ok", f"{resident_warn_model} resident")
    else:
        # Read-only preflight: report the cold model, never load it.
        add_check(
            checks,
            "ollama:resident-vlm",
            "warn",
            f"{resident_warn_model} is not resident; no warmup was attempted",
            {"resident": sorted(resident)},
        )


def print_text(report: dict[str, Any]) -> None:
    """Print *report* to stdout as a header followed by one line per check."""
    print(f"Lea micro preflight: {report['overall'].upper()}")
    print(f"Timestamp: {report['timestamp']}")
    print("Warmup: disabled")
    print()
    for check in report["checks"]:
        print(f"[{check['status'].upper():4}] {check['name']} - {check['summary']}")


def parse_args(argv: list[str]) -> argparse.Namespace:
    """Parse command-line arguments from *argv* (without the program name).

    ``--ollama-url`` defaults to the ``OLLAMA_BASE_URL`` environment variable
    when set. ``required_models`` is None unless ``--required-model`` is given
    at least once.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--json", action="store_true", help="print machine-readable JSON")
    parser.add_argument("--strict", action="store_true", help="exit 1 when only warnings are present")
    parser.add_argument("--ollama-url", default=os.environ.get("OLLAMA_BASE_URL", "http://127.0.0.1:11434"))
    parser.add_argument("--timeout", type=float, default=4.0, help="per-check timeout in seconds")
    parser.add_argument("--min-vram-free-mib", type=int, default=DEFAULT_MIN_VRAM_FREE_MIB)
    parser.add_argument("--min-ram-available-mib", type=int, default=DEFAULT_MIN_RAM_AVAILABLE_MIB)
    parser.add_argument("--max-swap-used-mib", type=int, default=DEFAULT_MAX_SWAP_USED_MIB)
    parser.add_argument("--max-swap-used-pct", type=float, default=DEFAULT_MAX_SWAP_USED_PCT)
    parser.add_argument(
        "--required-model",
        action="append",
        dest="required_models",
        help="required Ollama model tag; may be repeated",
    )
    parser.add_argument("--resident-warn-model", default=DEFAULT_RESIDENT_WARN_MODEL)
    return parser.parse_args(argv)


def build_report(args: argparse.Namespace) -> dict[str, Any]:
    """Run the GPU, memory and Ollama checks and return the report dict.

    The report holds the timestamp, overall status, repository path, the
    thresholds in effect, the Ollama URL, the required models and the list of
    individual check results.
    """
    # Fall back to the built-in model list when none were given on the CLI.
    required_models = tuple(args.required_models or REQUIRED_MODELS)
    checks: list[dict[str, Any]] = []
    check_gpu(checks, args.min_vram_free_mib, args.timeout)
    check_memory(
        checks,
        args.min_ram_available_mib,
        args.max_swap_used_mib,
        args.max_swap_used_pct,
        args.timeout,
    )
    check_ollama(checks, args.ollama_url, required_models, args.resident_warn_model, args.timeout)
    return {
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
        "overall": summarize(checks),
        "repo": str(REPO_ROOT),
        "warmup": "disabled",
        "thresholds": {
            "min_vram_free_mib": args.min_vram_free_mib,
            "min_ram_available_mib": args.min_ram_available_mib,
            "max_swap_used_mib": args.max_swap_used_mib,
            "max_swap_used_pct": args.max_swap_used_pct,
        },
        "ollama_url": args.ollama_url,
        "required_models": list(required_models),
        "checks": checks,
    }


def main(argv: list[str]) -> int:
    """Run the preflight and return the process exit code.

    Returns 2 when any check failed, 1 when ``--strict`` is set and the
    overall status is "warn", and 0 otherwise.
    """
    args = parse_args(argv)
    report = build_report(args)
    if args.json:
        print(json.dumps(report, indent=2, sort_keys=True))
    else:
        print_text(report)
    if report["overall"] == "fail":
        return 2
    if args.strict and report["overall"] == "warn":
        return 1
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))