410 lines
14 KiB
Python
410 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""Read-only preflight for Lea micro-learning prerequisites.
|
|
|
|
The script performs fast checks only. It does not warm up models, pull models,
|
|
start services, stop replays, restart processes, or modify files.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import csv
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
try: # Script execution from tools/
|
|
from lea_healthcheck import REPO_ROOT, add_check, http_json, run_command, summarize
|
|
except ImportError: # Test/import execution from repository root
|
|
try:
|
|
from tools.lea_healthcheck import REPO_ROOT, add_check, http_json, run_command, summarize
|
|
except ImportError:
|
|
REPO_ROOT = Path(__file__).resolve().parents[1]
|
|
|
|
def run_command(args: list[str], timeout: float = 5.0) -> tuple[int, str, str]:
|
|
try:
|
|
proc = subprocess.run(
|
|
args,
|
|
cwd=REPO_ROOT,
|
|
text=True,
|
|
encoding="utf-8",
|
|
errors="replace",
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
timeout=timeout,
|
|
check=False,
|
|
)
|
|
return proc.returncode, proc.stdout.strip(), proc.stderr.strip()
|
|
except FileNotFoundError as exc:
|
|
return 127, "", str(exc)
|
|
except subprocess.TimeoutExpired as exc:
|
|
stdout = (exc.stdout or "").strip() if isinstance(exc.stdout, str) else ""
|
|
stderr = (exc.stderr or "").strip() if isinstance(exc.stderr, str) else ""
|
|
return 124, stdout, stderr or f"timeout after {timeout}s"
|
|
|
|
def http_json(url: str, timeout: float = 2.0) -> tuple[bool, Any, str]:
|
|
try:
|
|
req = urllib.request.Request(url, headers={"User-Agent": "lea-micro-preflight/1.0"})
|
|
with urllib.request.urlopen(req, timeout=timeout) as response:
|
|
body = response.read().decode("utf-8", errors="replace")
|
|
if response.status >= 400:
|
|
return False, None, f"http {response.status}: {body[:300]}"
|
|
return True, json.loads(body), ""
|
|
except json.JSONDecodeError as exc:
|
|
return False, None, f"invalid json: {exc}"
|
|
except (urllib.error.URLError, TimeoutError, OSError) as exc:
|
|
return False, None, str(exc)
|
|
|
|
def add_check(
|
|
checks: list[dict[str, Any]],
|
|
name: str,
|
|
status: str,
|
|
summary: str,
|
|
details: Any | None = None,
|
|
) -> None:
|
|
checks.append({"name": name, "status": status, "summary": summary, "details": details})
|
|
|
|
def summarize(checks: list[dict[str, Any]]) -> str:
|
|
if any(check["status"] == "fail" for check in checks):
|
|
return "fail"
|
|
if any(check["status"] == "warn" for check in checks):
|
|
return "warn"
|
|
return "ok"
|
|
|
|
|
|
DEFAULT_MIN_VRAM_FREE_MIB = 4000
|
|
DEFAULT_MIN_RAM_AVAILABLE_MIB = 8192
|
|
DEFAULT_MAX_SWAP_USED_MIB = 4096
|
|
DEFAULT_MAX_SWAP_USED_PCT = 70.0
|
|
|
|
REQUIRED_MODELS = ("qwen2.5vl:7b-rpa", "qwen2.5:7b")
|
|
DEFAULT_RESIDENT_WARN_MODEL = "qwen2.5vl:7b-rpa"
|
|
|
|
|
|
def _parse_mib_int(value: str) -> int:
|
|
match = re.search(r"-?\d+", value.replace("\u00a0", " "))
|
|
if not match:
|
|
raise ValueError(f"cannot parse integer from {value!r}")
|
|
return int(match.group(0))
|
|
|
|
|
|
def parse_free_m(output: str) -> dict[str, dict[str, int]]:
|
|
"""Parse `free -m` output into mem/swap dictionaries.
|
|
|
|
Handles localized column names by normalizing to English keys.
|
|
"""
|
|
header: list[str] = []
|
|
parsed: dict[str, dict[str, int]] = {}
|
|
fallback_columns = {
|
|
"mem": ["total", "used", "free", "shared", "buff/cache", "available"],
|
|
"swap": ["total", "used", "free"],
|
|
}
|
|
|
|
# Localization map: known foreign column names → English
|
|
_LOCALIZATION_MAP = {
|
|
"disponible": "available",
|
|
"utilisé": "used",
|
|
"libre": "free",
|
|
"partagé": "shared",
|
|
"tamp/cache": "buff/cache", # French truncation of "tampon/cache"
|
|
}
|
|
|
|
def _normalize(name: str) -> str:
|
|
return _LOCALIZATION_MAP.get(name.lower(), name.lower())
|
|
|
|
for raw_line in output.splitlines():
|
|
parts = raw_line.strip().split()
|
|
if not parts:
|
|
continue
|
|
|
|
if parts[0].lower() == "total":
|
|
header = [_normalize(p) for p in parts]
|
|
continue
|
|
|
|
label = parts[0].rstrip(":").lower()
|
|
# Handle localized row labels: "échange" = "swap" (French)
|
|
if label == "échange":
|
|
label = "swap"
|
|
if label not in ("mem", "swap"):
|
|
continue
|
|
|
|
values = parts[1:]
|
|
if label == "mem" and header:
|
|
columns = header[: len(values)]
|
|
else:
|
|
columns = fallback_columns[label][: len(values)]
|
|
parsed[label] = {key: int(value) for key, value in zip(columns, values)}
|
|
|
|
if "mem" not in parsed:
|
|
raise ValueError("missing Mem line in free output")
|
|
if "available" not in parsed["mem"] and "free" in parsed["mem"]:
|
|
parsed["mem"]["available"] = parsed["mem"]["free"]
|
|
if "available" not in parsed["mem"]:
|
|
raise ValueError("missing available memory in free output")
|
|
if "swap" not in parsed:
|
|
raise ValueError("missing Swap line in free output")
|
|
return parsed
|
|
|
|
|
|
def parse_nvidia_smi_memory(output: str) -> list[dict[str, int]]:
|
|
"""Parse `nvidia-smi --query-gpu=memory.free,memory.total` CSV output."""
|
|
gpus: list[dict[str, int]] = []
|
|
for row in csv.reader(output.splitlines()):
|
|
cells = [cell.strip() for cell in row if cell.strip()]
|
|
if not cells:
|
|
continue
|
|
if len(cells) < 2:
|
|
raise ValueError(f"expected two CSV columns, got {cells!r}")
|
|
gpus.append(
|
|
{
|
|
"free_mib": _parse_mib_int(cells[0]),
|
|
"total_mib": _parse_mib_int(cells[1]),
|
|
}
|
|
)
|
|
if not gpus:
|
|
raise ValueError("nvidia-smi returned no GPU memory rows")
|
|
return gpus
|
|
|
|
|
|
def extract_ollama_tags(data: Any) -> set[str]:
|
|
"""Extract model tags from Ollama `/api/tags` or `/api/ps` style JSON."""
|
|
if isinstance(data, dict):
|
|
models = data.get("models") or []
|
|
elif isinstance(data, list):
|
|
models = data
|
|
else:
|
|
return set()
|
|
|
|
tags: set[str] = set()
|
|
for entry in models:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
for key in ("name", "model"):
|
|
value = entry.get(key)
|
|
if isinstance(value, str) and value:
|
|
tags.add(value)
|
|
return tags
|
|
|
|
|
|
def check_gpu(checks: list[dict[str, Any]], min_vram_free_mib: int, timeout: float) -> None:
|
|
code, stdout, stderr = run_command(
|
|
[
|
|
"nvidia-smi",
|
|
"--query-gpu=memory.free,memory.total",
|
|
"--format=csv,noheader,nounits",
|
|
],
|
|
timeout=timeout,
|
|
)
|
|
if code != 0:
|
|
summary = "nvidia-smi not available" if code == 127 else stderr or stdout or f"exit {code}"
|
|
add_check(checks, "gpu:nvidia-smi", "fail", summary)
|
|
return
|
|
|
|
try:
|
|
gpus = parse_nvidia_smi_memory(stdout)
|
|
except ValueError as exc:
|
|
add_check(checks, "gpu:nvidia-smi", "fail", f"cannot parse nvidia-smi output: {exc}", stdout)
|
|
return
|
|
|
|
add_check(checks, "gpu:nvidia-smi", "ok", f"query ok, {len(gpus)} GPU(s)", {"gpus": gpus})
|
|
best_free = max(gpu["free_mib"] for gpu in gpus)
|
|
status = "ok" if best_free >= min_vram_free_mib else "fail"
|
|
add_check(
|
|
checks,
|
|
"gpu:vram-free",
|
|
status,
|
|
f"best free VRAM {best_free} MiB / required {min_vram_free_mib} MiB",
|
|
{"threshold_mib": min_vram_free_mib, "gpus": gpus},
|
|
)
|
|
|
|
|
|
def check_memory(
|
|
checks: list[dict[str, Any]],
|
|
min_ram_available_mib: int,
|
|
max_swap_used_mib: int,
|
|
max_swap_used_pct: float,
|
|
timeout: float,
|
|
) -> None:
|
|
code, stdout, stderr = run_command(["free", "-m"], timeout=timeout)
|
|
if code != 0:
|
|
add_check(checks, "memory:free", "fail", stderr or stdout or f"exit {code}")
|
|
return
|
|
|
|
try:
|
|
memory = parse_free_m(stdout)
|
|
except ValueError as exc:
|
|
add_check(checks, "memory:free", "fail", f"cannot parse free -m output: {exc}", stdout)
|
|
return
|
|
|
|
mem = memory["mem"]
|
|
available = mem["available"]
|
|
add_check(
|
|
checks,
|
|
"ram:available",
|
|
"ok" if available >= min_ram_available_mib else "fail",
|
|
f"available RAM {available} MiB / required {min_ram_available_mib} MiB",
|
|
{"threshold_mib": min_ram_available_mib, "mem": mem},
|
|
)
|
|
|
|
swap = memory["swap"]
|
|
swap_total = swap.get("total", 0)
|
|
swap_used = swap.get("used", 0)
|
|
if swap_total <= 0:
|
|
add_check(checks, "swap:usage", "fail", "swap total is 0 MiB", {"swap": swap})
|
|
return
|
|
|
|
swap_used_pct = (swap_used / swap_total) * 100.0
|
|
swap_ok = swap_used <= max_swap_used_mib and swap_used_pct <= max_swap_used_pct
|
|
add_check(
|
|
checks,
|
|
"swap:usage",
|
|
"ok" if swap_ok else "fail",
|
|
f"swap used {swap_used} MiB ({swap_used_pct:.1f}%) / limits {max_swap_used_mib} MiB and {max_swap_used_pct:.1f}%",
|
|
{
|
|
"max_used_mib": max_swap_used_mib,
|
|
"max_used_pct": max_swap_used_pct,
|
|
"used_pct": round(swap_used_pct, 2),
|
|
"swap": swap,
|
|
},
|
|
)
|
|
|
|
|
|
def check_ollama(
|
|
checks: list[dict[str, Any]],
|
|
base_url: str,
|
|
required_models: tuple[str, ...],
|
|
resident_warn_model: str,
|
|
timeout: float,
|
|
) -> None:
|
|
base = base_url.rstrip("/")
|
|
ok, tags_data, error = http_json(f"{base}/api/tags", timeout=timeout)
|
|
if not ok or not isinstance(tags_data, dict):
|
|
add_check(checks, "ollama:tags", "fail", error or "cannot read /api/tags")
|
|
return
|
|
|
|
tags = extract_ollama_tags(tags_data)
|
|
add_check(checks, "ollama:tags", "ok", f"/api/tags readable, {len(tags)} tag(s)")
|
|
missing = [model for model in required_models if model not in tags]
|
|
if missing:
|
|
add_check(
|
|
checks,
|
|
"ollama:required-models",
|
|
"fail",
|
|
f"missing required model(s): {', '.join(missing)}",
|
|
{"required": list(required_models), "present": sorted(tags)},
|
|
)
|
|
else:
|
|
add_check(
|
|
checks,
|
|
"ollama:required-models",
|
|
"ok",
|
|
f"{len(required_models)} required model(s) present",
|
|
{"required": list(required_models)},
|
|
)
|
|
|
|
ok, ps_data, error = http_json(f"{base}/api/ps", timeout=timeout)
|
|
if not ok or not isinstance(ps_data, dict):
|
|
add_check(checks, "ollama:ps", "fail", error or "cannot read /api/ps")
|
|
return
|
|
|
|
resident = extract_ollama_tags(ps_data)
|
|
add_check(checks, "ollama:ps", "ok", f"/api/ps readable, {len(resident)} resident model(s)")
|
|
if resident_warn_model in resident:
|
|
add_check(checks, "ollama:resident-vlm", "ok", f"{resident_warn_model} resident")
|
|
else:
|
|
add_check(
|
|
checks,
|
|
"ollama:resident-vlm",
|
|
"warn",
|
|
f"{resident_warn_model} is not resident; no warmup was attempted",
|
|
{"resident": sorted(resident)},
|
|
)
|
|
|
|
|
|
def print_text(report: dict[str, Any]) -> None:
|
|
print(f"Lea micro preflight: {report['overall'].upper()}")
|
|
print(f"Timestamp: {report['timestamp']}")
|
|
print("Warmup: disabled")
|
|
print()
|
|
for check in report["checks"]:
|
|
print(f"[{check['status'].upper():4}] {check['name']} - {check['summary']}")
|
|
|
|
|
|
def parse_args(argv: list[str]) -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
parser.add_argument("--json", action="store_true", help="print machine-readable JSON")
|
|
parser.add_argument("--strict", action="store_true", help="exit 1 when only warnings are present")
|
|
parser.add_argument("--ollama-url", default=os.environ.get("OLLAMA_BASE_URL", "http://127.0.0.1:11434"))
|
|
parser.add_argument("--timeout", type=float, default=4.0, help="per-check timeout in seconds")
|
|
parser.add_argument("--min-vram-free-mib", type=int, default=DEFAULT_MIN_VRAM_FREE_MIB)
|
|
parser.add_argument("--min-ram-available-mib", type=int, default=DEFAULT_MIN_RAM_AVAILABLE_MIB)
|
|
parser.add_argument("--max-swap-used-mib", type=int, default=DEFAULT_MAX_SWAP_USED_MIB)
|
|
parser.add_argument("--max-swap-used-pct", type=float, default=DEFAULT_MAX_SWAP_USED_PCT)
|
|
parser.add_argument(
|
|
"--required-model",
|
|
action="append",
|
|
dest="required_models",
|
|
help="required Ollama model tag; may be repeated",
|
|
)
|
|
parser.add_argument("--resident-warn-model", default=DEFAULT_RESIDENT_WARN_MODEL)
|
|
return parser.parse_args(argv)
|
|
|
|
|
|
def build_report(args: argparse.Namespace) -> dict[str, Any]:
|
|
required_models = tuple(args.required_models or REQUIRED_MODELS)
|
|
checks: list[dict[str, Any]] = []
|
|
|
|
check_gpu(checks, args.min_vram_free_mib, args.timeout)
|
|
check_memory(
|
|
checks,
|
|
args.min_ram_available_mib,
|
|
args.max_swap_used_mib,
|
|
args.max_swap_used_pct,
|
|
args.timeout,
|
|
)
|
|
check_ollama(checks, args.ollama_url, required_models, args.resident_warn_model, args.timeout)
|
|
|
|
return {
|
|
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S%z"),
|
|
"overall": summarize(checks),
|
|
"repo": str(REPO_ROOT),
|
|
"warmup": "disabled",
|
|
"thresholds": {
|
|
"min_vram_free_mib": args.min_vram_free_mib,
|
|
"min_ram_available_mib": args.min_ram_available_mib,
|
|
"max_swap_used_mib": args.max_swap_used_mib,
|
|
"max_swap_used_pct": args.max_swap_used_pct,
|
|
},
|
|
"ollama_url": args.ollama_url,
|
|
"required_models": list(required_models),
|
|
"checks": checks,
|
|
}
|
|
|
|
|
|
def main(argv: list[str]) -> int:
|
|
args = parse_args(argv)
|
|
report = build_report(args)
|
|
|
|
if args.json:
|
|
print(json.dumps(report, indent=2, sort_keys=True))
|
|
else:
|
|
print_text(report)
|
|
|
|
if report["overall"] == "fail":
|
|
return 2
|
|
if args.strict and report["overall"] == "warn":
|
|
return 1
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main(sys.argv[1:]))
|