1778 lines
67 KiB
Python
1778 lines
67 KiB
Python
#!/usr/bin/env python3
|
|
"""Lightweight validator for Lea short competence YAML files.
|
|
|
|
This module is deliberately offline-only: it reads YAML and trace files, but it
|
|
does not start services, load models, replay actions, or promote competences.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import re
|
|
import sys
|
|
from dataclasses import asdict, dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
# Repository root: the parent of the directory that contains this file.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Make the repository root importable (presumably so the absolute
# ``agent_v0`` import below resolves when this file is run as a script).
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))

# The message contract is optional. When it cannot be imported, the names are
# replaced by stand-ins and _validate_failure_message_template skips the
# contract check (it tests ``format_supervised_pause_message is None``).
try:
    from agent_v0.agent_v1.ui.message_contract import (
        MessageContractError,
        format_supervised_pause_message,
    )
except Exception:  # pragma: no cover - partial deployments can still run basics
    MessageContractError = ValueError
    format_supervised_pause_message = None
|
|
|
|
# Values accepted for a competence's ``learning_state`` (and for the ``state``
# of a ``competence_required`` precondition).
LEARNING_STATES = {"observed", "candidate", "supervised", "stable"}
# The same states as an ordered tuple. NOTE(review): consumers are outside this
# section; presumably ordered from least to most mature - confirm.
LEARNING_STATE_ORDER = ("observed", "candidate", "supervised", "stable")
# Values accepted for the optional top-level ``methods_execution`` key.
METHODS_EXECUTION_MODES = {"alternatives", "sequence"}
# Directory, relative to the repository root, where ``primitive_ref`` YAML
# files are looked up.
PRIMITIVES_DIR = Path("data") / "primitives"
# Values accepted for ``type`` in a primitive's ``parameters_schema`` entries.
PRIMITIVE_PARAMETER_TYPES = {"str", "int", "bool", "list[str]", "dict", "dict_or_string"}
# Top-level keys every primitive file must define.
PRIMITIVE_REQUIRED_TOP_LEVEL_KEYS = {
    "schema_version",
    "id",
    "kind",
    "marker_or_action",
    "intent",
    "version",
    "parameters_schema",
    "failure_message_template",
    "created_at",
}
# Top-level keys a primitive file must not define.
PRIMITIVE_FORBIDDEN_FIELDS = {
    "learning_state",
    "chain_refs",
    "promotion",
    "generalisation",
    "failure_log",
    "success_marker",
    "preconditions",
    "methods",
}
# Top-level keys every competence file must define.
REQUIRED_TOP_LEVEL_KEYS = {
    "schema_version",
    "id",
    "name",
    "version",
    "learning_state",
    "intent",
    "parameters",
    "preconditions",
    "methods",
    "success_marker",
    "failure_message_template",
    "chain_refs",
    "promotion",
}
# Mapping keys (compared lower-cased) that _validate_no_durable_coordinates
# rejects anywhere in the YAML data. ``x_pct`` / ``y_pct`` are tolerated only
# when the enclosing path ends with ``relative_offset``.
BLOCKED_DURABLE_COORDINATE_KEYS = {
    "x",
    "y",
    "left",
    "top",
    "width",
    "height",
    "w",
    "h",
    "pos",
    "bbox",
    "bounds",
    "rect",
    "coordinates",
    "x_pct",
    "y_pct",
    "window_bounds",
    "screen_resolution",
}
# Alternate names mapped to the canonical ``win`` key name.
# NOTE(review): consumers are outside this section; presumably used to
# normalise key-combo names before comparing them with trace events - confirm.
KEY_ALIASES = {
    "cmd": "win",
    "command": "win",
    "meta": "win",
    "super": "win",
    "windows": "win",
}
|
|
|
|
|
|
@dataclass(frozen=True)
class CompetenceValidationIssue:
    """One problem found while validating a competence or primitive file.

    Instances are immutable and compare/hash by value.
    """

    # Short machine-readable category, e.g. "missing_key" or "yaml_invalid".
    code: str
    # Human-readable explanation of the problem.
    detail: str
|
|
|
|
|
|
@dataclass(frozen=True)
class CompetenceValidationReport:
    """Immutable result of validating a single file."""

    # Path of the validated file, as text.
    path: str
    # Every issue found; an empty tuple means the file is valid.
    issues: tuple[CompetenceValidationIssue, ...]

    @property
    def valid(self) -> bool:
        """Return True when no issue was recorded."""
        return len(self.issues) == 0

    def to_dict(self) -> dict[str, Any]:
        """Return the dataclass fields as a dict, plus a ``valid`` entry."""
        return {**asdict(self), "valid": self.valid}
|
|
|
|
|
|
def validate_competence_file(
    path: str | Path,
    *,
    repo_root: str | Path | None = None,
) -> CompetenceValidationReport:
    """Validate one competence YAML file and return a report.

    Args:
        path: Location of the competence YAML file.
        repo_root: Root used to resolve files referenced by the competence;
            defaults to ``REPO_ROOT``.

    Returns:
        A report whose ``issues`` tuple is empty when the file is valid.
        Load failures (missing file, unreadable file, invalid YAML, non-mapping
        root) are reported as a single issue instead of raising.
    """
    competence_path = Path(path)
    root = Path(repo_root) if repo_root is not None else REPO_ROOT
    issues: list[CompetenceValidationIssue] = []

    try:
        data = yaml.safe_load(competence_path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return CompetenceValidationReport(
            str(competence_path),
            (CompetenceValidationIssue("file_missing", f"{competence_path} does not exist"),),
        )
    except (OSError, UnicodeDecodeError) as exc:
        # A directory, a permission problem or non-UTF-8 bytes must yield a
        # report like every other bad input, not crash the validator.
        return CompetenceValidationReport(
            str(competence_path),
            (CompetenceValidationIssue("file_unreadable", f"{competence_path} could not be read: {exc}"),),
        )
    except yaml.YAMLError as exc:
        return CompetenceValidationReport(
            str(competence_path),
            (CompetenceValidationIssue("yaml_invalid", str(exc)),),
        )

    if not isinstance(data, dict):
        return CompetenceValidationReport(
            str(competence_path),
            (CompetenceValidationIssue("schema_type", "root YAML node must be a mapping"),),
        )

    # Each validator appends to ``issues``; duplicates are removed at the end.
    _validate_required_shape(data, competence_path, issues)
    _validate_promotion_state(data, issues)
    _validate_t2_known_gaps(data, issues)
    _validate_methods_execution(data, issues)
    _validate_no_durable_coordinates(data, issues)
    _validate_failure_message_template(data, issues)
    _validate_preconditions(data, root, issues)
    _validate_methods_and_trace(data, root, issues)
    _validate_success_marker(data, root, issues)
    _validate_chain_refs(data, root, issues)

    return CompetenceValidationReport(str(competence_path), _dedupe_issues(issues))
|
|
|
|
|
|
def validate_primitive_file(
    path: str | Path,
    *,
    repo_root: str | Path | None = None,
) -> CompetenceValidationReport:
    """Validate one primitive YAML file and return a report.

    Args:
        path: Location of the primitive YAML file.
        repo_root: Accepted for signature parity with
            ``validate_competence_file``; not used by the primitive checks.

    Returns:
        A report whose ``issues`` tuple is empty when the file is valid.
        Load failures (missing file, unreadable file, invalid YAML, non-mapping
        root) are reported as a single issue instead of raising.
    """
    primitive_path = Path(path)
    issues: list[CompetenceValidationIssue] = []

    try:
        data = yaml.safe_load(primitive_path.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return CompetenceValidationReport(
            str(primitive_path),
            (CompetenceValidationIssue("file_missing", f"{primitive_path} does not exist"),),
        )
    except (OSError, UnicodeDecodeError) as exc:
        # A directory, a permission problem or non-UTF-8 bytes must yield a
        # report like every other bad input, not crash the validator.
        return CompetenceValidationReport(
            str(primitive_path),
            (CompetenceValidationIssue("file_unreadable", f"{primitive_path} could not be read: {exc}"),),
        )
    except yaml.YAMLError as exc:
        return CompetenceValidationReport(
            str(primitive_path),
            (CompetenceValidationIssue("yaml_invalid", str(exc)),),
        )

    if not isinstance(data, dict):
        return CompetenceValidationReport(
            str(primitive_path),
            (CompetenceValidationIssue("schema_type", "root YAML node must be a mapping"),),
        )

    # Each validator appends to ``issues``; duplicates are removed at the end.
    _validate_primitive_required_shape(data, primitive_path, issues)
    _validate_primitive_parameters_schema(data, issues)
    _validate_no_durable_coordinates(data, issues)
    _validate_failure_message_template(data, issues)

    return CompetenceValidationReport(str(primitive_path), _dedupe_issues(issues))
|
|
|
|
|
|
def validate_file(
    path: str | Path,
    *,
    repo_root: str | Path | None = None,
) -> CompetenceValidationReport:
    """Validate ``path`` as a primitive or as a competence.

    The file is treated as a primitive when ``_is_primitive_path`` says so for
    the resolved root; otherwise it is validated as a competence.
    """
    root = REPO_ROOT if repo_root is None else Path(repo_root)
    candidate_path = Path(path)
    validator = (
        validate_primitive_file
        if _is_primitive_path(candidate_path, root)
        else validate_competence_file
    )
    return validator(candidate_path, repo_root=root)
|
|
|
|
|
|
def _validate_required_shape(
    data: dict[str, Any],
    competence_path: Path,
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check the top-level keys and basic value types of a competence.

    Appends one issue per problem to ``issues``: missing required keys, wrong
    ``schema_version``, invalid or filename-mismatched ``id``, invalid
    ``version`` or ``learning_state``, and sections that are not the expected
    mapping/list type.
    """
    missing = sorted(REQUIRED_TOP_LEVEL_KEYS - set(data.keys()))
    for key in missing:
        issues.append(CompetenceValidationIssue("missing_key", f"missing top-level key: {key}"))

    schema_version = data.get("schema_version")
    # ``True == 1`` in Python, so a YAML boolean must be rejected explicitly.
    if isinstance(schema_version, bool) or schema_version != 1:
        issues.append(CompetenceValidationIssue("schema_version", "schema_version must be 1"))

    competence_id = data.get("id")
    if not isinstance(competence_id, str) or not re.fullmatch(r"[a-z][a-z0-9_]*", competence_id):
        issues.append(CompetenceValidationIssue("id_invalid", "id must be a lowercase slug"))
    elif competence_id != competence_path.stem:
        issues.append(
            CompetenceValidationIssue(
                "id_filename_mismatch",
                f"id must match filename stem: id={competence_id!r} filename={competence_path.stem!r}",
            )
        )

    version = data.get("version")
    # bool is a subclass of int; ``version: true`` is not a valid version.
    if isinstance(version, bool) or not isinstance(version, int) or version < 1:
        issues.append(CompetenceValidationIssue("version_invalid", "version must be a positive integer"))

    state = data.get("learning_state")
    # The isinstance guard keeps unhashable YAML values (lists, mappings) from
    # raising TypeError in the set membership test.
    if not isinstance(state, str) or state not in LEARNING_STATES:
        issues.append(
            CompetenceValidationIssue(
                "learning_state_invalid",
                f"learning_state must be one of {sorted(LEARNING_STATES)}",
            )
        )

    for key in ("intent", "parameters", "success_marker", "failure_message_template", "chain_refs", "promotion"):
        if key in data and not isinstance(data.get(key), dict):
            issues.append(CompetenceValidationIssue("mapping_expected", f"{key} must be a mapping"))

    for key in ("preconditions", "methods"):
        if key in data and not isinstance(data.get(key), list):
            issues.append(CompetenceValidationIssue("list_expected", f"{key} must be a list"))
|
|
|
|
|
|
def _validate_primitive_required_shape(
    data: dict[str, Any],
    primitive_path: Path,
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check the top-level keys and basic value types of a primitive.

    Appends one issue per problem to ``issues``: missing required keys,
    forbidden keys, and invalid ``schema_version``, ``id``, ``kind``,
    ``marker_or_action``, ``version``, ``intent.fr`` and the optional
    ``executor_kind``, ``notes`` and ``last_updated_at`` fields.
    """
    missing = sorted(PRIMITIVE_REQUIRED_TOP_LEVEL_KEYS - set(data.keys()))
    for key in missing:
        issues.append(CompetenceValidationIssue("primitive_missing_key", f"missing primitive key: {key}"))

    for key in sorted(PRIMITIVE_FORBIDDEN_FIELDS & set(data.keys())):
        issues.append(CompetenceValidationIssue("primitive_forbidden_field", f"primitive must not define {key}"))

    schema_version = data.get("schema_version")
    # ``True == 1`` in Python, so a YAML boolean must be rejected explicitly.
    if isinstance(schema_version, bool) or schema_version != 1:
        issues.append(CompetenceValidationIssue("primitive_file_invalid", "schema_version must be 1"))

    primitive_id = data.get("id")
    if not isinstance(primitive_id, str) or not re.fullmatch(r"[a-z][a-z0-9_]*", primitive_id):
        issues.append(CompetenceValidationIssue("primitive_file_invalid", "id must be a lowercase slug"))
    elif primitive_id != primitive_path.stem:
        issues.append(
            CompetenceValidationIssue(
                "primitive_id_filename_mismatch",
                f"id must match filename stem: id={primitive_id!r} filename={primitive_path.stem!r}",
            )
        )

    if data.get("kind") != "primitive":
        issues.append(CompetenceValidationIssue("primitive_file_invalid", "kind must be primitive"))

    # Tuple membership compares with ``==`` and never hashes the value, so an
    # unhashable YAML value (list, mapping) is reported instead of raising.
    if data.get("marker_or_action") not in ("action", "marker"):
        issues.append(CompetenceValidationIssue("primitive_file_invalid", "marker_or_action must be action or marker"))

    version = data.get("version")
    # bool is a subclass of int; ``version: true`` is not a valid version.
    if isinstance(version, bool) or not isinstance(version, int) or version < 1:
        issues.append(CompetenceValidationIssue("primitive_file_invalid", "version must be a positive integer"))

    intent = data.get("intent")
    if not isinstance(intent, dict) or not isinstance(intent.get("fr"), str) or not intent.get("fr", "").strip():
        issues.append(CompetenceValidationIssue("primitive_file_invalid", "intent.fr must be non-empty text"))

    if "executor_kind" in data and (not isinstance(data.get("executor_kind"), str) or not data.get("executor_kind", "").strip()):
        issues.append(CompetenceValidationIssue("primitive_file_invalid", "executor_kind must be non-empty text"))

    if "notes" in data and not _is_string_list(data.get("notes")):
        issues.append(CompetenceValidationIssue("primitive_file_invalid", "notes must be a non-empty text list"))

    if "last_updated_at" in data and not isinstance(data.get("last_updated_at"), str):
        issues.append(CompetenceValidationIssue("primitive_file_invalid", "last_updated_at must be text"))
|
|
|
|
|
|
def _validate_primitive_parameters_schema(
    data: dict[str, Any],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check every entry of a primitive's ``parameters_schema``.

    ``parameters_schema`` must be a non-empty mapping of lowercase-slug names
    to mapping specs. For each spec this checks ``type``, ``required``,
    ``required_unless``, ``description`` and ``constraints`` (``enum``,
    ``min`` and ``min_value``), appending one issue per problem.
    """
    schema = data.get("parameters_schema")
    if not isinstance(schema, dict) or not schema:
        issues.append(CompetenceValidationIssue("primitive_file_invalid", "parameters_schema must be a non-empty mapping"))
        return

    for param_name, spec in schema.items():
        if not isinstance(param_name, str) or not re.fullmatch(r"[a-z][a-z0-9_]*", param_name):
            issues.append(CompetenceValidationIssue("primitive_param_schema_invalid", "parameter names must be lowercase slugs"))
            continue
        if not isinstance(spec, dict):
            issues.append(CompetenceValidationIssue("primitive_param_schema_invalid", f"{param_name} schema must be a mapping"))
            continue

        param_type = spec.get("type")
        # The isinstance guard keeps unhashable values such as
        # ``type: [str, int]`` from raising TypeError in the set lookup.
        if not isinstance(param_type, str) or param_type not in PRIMITIVE_PARAMETER_TYPES:
            issues.append(
                CompetenceValidationIssue(
                    "primitive_param_schema_invalid",
                    f"{param_name}.type must be one of {sorted(PRIMITIVE_PARAMETER_TYPES)}",
                )
            )

        required = spec.get("required")
        if required is not None and not isinstance(required, bool):
            issues.append(CompetenceValidationIssue("primitive_param_schema_invalid", f"{param_name}.required must be bool"))
        if required is True and "default" in spec:
            issues.append(
                CompetenceValidationIssue(
                    "primitive_param_schema_invalid",
                    f"{param_name} cannot define default when required=true",
                )
            )

        required_unless = spec.get("required_unless")
        if required_unless is not None:
            if not _is_string_list(required_unless):
                issues.append(
                    CompetenceValidationIssue(
                        "primitive_param_schema_invalid",
                        f"{param_name}.required_unless must be a non-empty text list",
                    )
                )
            else:
                # Every referenced name must itself be a declared parameter.
                missing_refs = [name for name in required_unless if name not in schema]
                if missing_refs:
                    issues.append(
                        CompetenceValidationIssue(
                            "primitive_param_schema_invalid",
                            f"{param_name}.required_unless references unknown parameters: {missing_refs}",
                        )
                    )

        description = spec.get("description")
        if not isinstance(description, str) or not description.strip():
            issues.append(CompetenceValidationIssue("primitive_param_schema_invalid", f"{param_name}.description is required"))

        constraints = spec.get("constraints")
        if constraints is not None and not isinstance(constraints, dict):
            issues.append(CompetenceValidationIssue("primitive_param_schema_invalid", f"{param_name}.constraints must be a mapping"))
        elif isinstance(constraints, dict):
            enum_values = constraints.get("enum")
            if enum_values is not None and (not isinstance(enum_values, list) or not enum_values):
                issues.append(
                    CompetenceValidationIssue(
                        "primitive_schema_invalid",
                        f"parameters_schema.{param_name}.constraints.enum must be a non-empty list",
                    )
                )

            for min_key in ("min", "min_value"):
                min_value = constraints.get(min_key)
                # bool is a subclass of int, hence the explicit exclusion.
                if min_value is not None and (not isinstance(min_value, int) or isinstance(min_value, bool)):
                    issues.append(
                        CompetenceValidationIssue(
                            "primitive_schema_invalid",
                            f"parameters_schema.{param_name}.constraints.{min_key} must be an integer",
                        )
                    )
|
|
|
|
|
|
def _validate_promotion_state(data: dict[str, Any], issues: list[CompetenceValidationIssue]) -> None:
    """Check that the declared ``learning_state`` is backed by enough evidence.

    Nothing is checked when ``learning_state`` is not a known state.
    Otherwise:

    * ``promotion.stable_requires.min_successes``, when present, must be an
      integer of at least 3 (checked for every state);
    * ``candidate`` needs ``chain_refs.cleaned_segment.status`` equal to
      ``documented_offline``;
    * ``supervised`` needs non-empty ``generalisation.seen_contexts`` and
      ``promotion.history``;
    * ``stable`` needs at least 3 distinct seen contexts.
    """
    state = data.get("learning_state")
    # The isinstance guard keeps unhashable YAML values (lists, mappings) from
    # raising TypeError in the set membership test.
    if not isinstance(state, str) or state not in LEARNING_STATES:
        return

    # Sections of the wrong type are treated as empty here.
    promotion = data.get("promotion") if isinstance(data.get("promotion"), dict) else {}
    stable_requires = promotion.get("stable_requires") if isinstance(promotion.get("stable_requires"), dict) else {}
    min_successes = stable_requires.get("min_successes")
    if min_successes is not None and (not isinstance(min_successes, int) or min_successes < 3):
        issues.append(
            CompetenceValidationIssue(
                "promotion_stable_requires",
                "promotion.stable_requires.min_successes must be at least 3",
            )
        )

    if state == "observed":
        return

    chain_refs = data.get("chain_refs") if isinstance(data.get("chain_refs"), dict) else {}
    cleaned = chain_refs.get("cleaned_segment") if isinstance(chain_refs.get("cleaned_segment"), dict) else {}
    generalisation = data.get("generalisation") if isinstance(data.get("generalisation"), dict) else {}
    seen_contexts = generalisation.get("seen_contexts") if isinstance(generalisation.get("seen_contexts"), list) else []
    history = promotion.get("history") if isinstance(promotion.get("history"), list) else []

    if state == "candidate":
        if cleaned.get("status") != "documented_offline":
            issues.append(
                CompetenceValidationIssue(
                    "learning_state_premature",
                    "candidate requires chain_refs.cleaned_segment.status=documented_offline",
                )
            )
        return

    if state == "supervised":
        if not seen_contexts or not history:
            issues.append(
                CompetenceValidationIssue(
                    "learning_state_premature",
                    "supervised requires seen contexts and promotion.history",
                )
            )
        return

    if state == "stable":
        if len(seen_contexts) < 3 or len(_distinct_context_signatures(seen_contexts)) < 3:
            issues.append(
                CompetenceValidationIssue(
                    "learning_state_premature",
                    "stable requires at least 3 distinct seen contexts",
                )
            )
|
|
|
|
|
|
def _validate_t2_known_gaps(data: dict[str, Any], issues: list[CompetenceValidationIssue]) -> None:
    """Check the optional ``promotion.t2_known_gaps`` list.

    Nothing is checked when ``promotion`` is not a mapping or the list is
    absent. Each entry must be a mapping with non-empty text ``id``,
    ``description``, ``impact`` and ``proposed_resolution``; a text ``id`` must
    be a lowercase slug; ``acted_by`` / ``acted_at`` must be non-empty text
    when present.
    """
    promotion = data.get("promotion")
    gaps = promotion.get("t2_known_gaps") if isinstance(promotion, dict) else None
    if gaps is None:
        return
    if not isinstance(gaps, list):
        issues.append(CompetenceValidationIssue("t2_known_gap_invalid", "promotion.t2_known_gaps must be a list"))
        return

    def report(detail: str) -> None:
        # Every problem in this section shares the same issue code.
        issues.append(CompetenceValidationIssue("t2_known_gap_invalid", detail))

    def is_filled_text(candidate: Any) -> bool:
        return isinstance(candidate, str) and bool(candidate.strip())

    for index, entry in enumerate(gaps):
        if not isinstance(entry, dict):
            report(f"promotion.t2_known_gaps[{index}] must be a mapping")
            continue

        # Slug format is only checked for a non-blank text id; a missing or
        # blank id is reported by the required-field loop below.
        raw_id = entry.get("id")
        if is_filled_text(raw_id) and re.fullmatch(r"[a-z][a-z0-9_]*", raw_id) is None:
            report(f"promotion.t2_known_gaps[{index}].id must be a lowercase slug")

        for key in ("id", "description", "impact", "proposed_resolution"):
            if not is_filled_text(entry.get(key)):
                report(f"promotion.t2_known_gaps[{index}].{key} is required")

        for key in ("acted_by", "acted_at"):
            if key not in entry:
                continue
            if not is_filled_text(entry.get(key)):
                report(f"promotion.t2_known_gaps[{index}].{key} must be non-empty text when present")
|
|
|
|
|
|
def _validate_methods_execution(data: dict[str, Any], issues: list[CompetenceValidationIssue]) -> None:
    """Check ``methods_execution`` and the trace indices of observed methods.

    ``methods_execution`` defaults to ``alternatives`` and must be one of
    ``METHODS_EXECUTION_MODES``. In ``sequence`` mode at least two methods are
    required, method ids must be unique, and each observed method's
    ``trace_event_indices`` must come after those of the previous observed
    method. In either mode, trace indices of observed methods must be a
    non-empty integer list and be contained in the cleaned keep/method index
    sets when those are available.
    """
    mode = data.get("methods_execution", "alternatives")
    # The isinstance guard keeps unhashable YAML values (lists, mappings) from
    # raising TypeError in the set membership test.
    if not isinstance(mode, str) or mode not in METHODS_EXECUTION_MODES:
        issues.append(
            CompetenceValidationIssue(
                "methods_sequence_invalid",
                f"methods_execution must be one of {sorted(METHODS_EXECUTION_MODES)}",
            )
        )
        return

    methods = data.get("methods")
    if not isinstance(methods, list):
        return

    if mode == "sequence" and len(methods) < 2:
        issues.append(
            CompetenceValidationIssue(
                "methods_sequence_invalid",
                "methods_execution=sequence requires at least two methods",
            )
        )
        return

    keep_indices = _cleaned_keep_indices(data)
    method_indices = _cleaned_method_indices(data)
    seen_ids: set[str] = set()
    # Highest trace index used by any earlier observed step (sequence mode).
    last_trace_index = -1
    for index, method in enumerate(methods):
        if not isinstance(method, dict):
            continue

        method_id = method.get("id")
        if mode == "sequence" and isinstance(method_id, str) and method_id.strip():
            if method_id in seen_ids:
                issues.append(
                    CompetenceValidationIssue(
                        "methods_sequence_invalid",
                        f"methods[{index}].id must be unique in sequence mode",
                    )
                )
            seen_ids.add(method_id)

        # Only methods marked as observed carry trace evidence to check.
        if method.get("observed") is not True:
            continue

        trace_indices = method.get("trace_event_indices")
        # Outside sequence mode the per-method indices are optional.
        if trace_indices is None and mode != "sequence":
            continue
        trace_issue_code = "methods_sequence_invalid" if mode == "sequence" else "method_trace_missing"
        if not _is_int_list(trace_indices):
            issues.append(
                CompetenceValidationIssue(
                    trace_issue_code,
                    f"methods[{index}].trace_event_indices must be a non-empty integer list",
                )
            )
            continue

        if keep_indices is not None:
            missing_keep_indices = [event_index for event_index in trace_indices if event_index not in keep_indices]
            if missing_keep_indices:
                issues.append(
                    CompetenceValidationIssue(
                        trace_issue_code,
                        f"methods[{index}].trace_event_indices must be included in keep_event_indices: {missing_keep_indices}",
                    )
                )

        if method_indices is not None:
            missing_method_indices = [event_index for event_index in trace_indices if event_index not in method_indices]
            if missing_method_indices:
                issues.append(
                    CompetenceValidationIssue(
                        trace_issue_code,
                        f"methods[{index}].trace_event_indices must be included in method_event_indices: {missing_method_indices}",
                    )
                )

        if mode == "sequence" and min(trace_indices) <= last_trace_index:
            issues.append(
                CompetenceValidationIssue(
                    "methods_sequence_invalid",
                    f"methods[{index}].trace_event_indices must follow previous observed step",
                )
            )
        if mode == "sequence":
            last_trace_index = max(last_trace_index, max(trace_indices))
|
|
|
|
|
|
def _validate_no_durable_coordinates(data: Any, issues: list[CompetenceValidationIssue], path: str = "") -> None:
    """Recursively reject coordinate-like mapping keys.

    Walks mappings and lists. Any mapping key whose lower-cased text is in
    ``BLOCKED_DURABLE_COORDINATE_KEYS`` is reported with its dotted path,
    except ``x_pct`` / ``y_pct`` directly under a path ending in
    ``relative_offset``. ``path`` is the dotted/indexed location of ``data``.
    """
    if isinstance(data, dict):
        # The exemption depends only on the parent path, so compute it once.
        under_relative_offset = path.endswith("relative_offset")
        for raw_key, child in data.items():
            key_text = str(raw_key)
            key_path = key_text if not path else f"{path}.{key_text}"
            lowered = key_text.lower()
            exempt = under_relative_offset and lowered in {"x_pct", "y_pct"}
            if not exempt and lowered in BLOCKED_DURABLE_COORDINATE_KEYS:
                issues.append(
                    CompetenceValidationIssue(
                        "durable_coordinate_key",
                        f"durable competence data must not store coordinates: {key_path}",
                    )
                )
            _validate_no_durable_coordinates(child, issues, key_path)
        return

    if isinstance(data, list):
        for index, item in enumerate(data):
            _validate_no_durable_coordinates(item, issues, f"{path}[{index}]")
|
|
|
|
|
|
def _validate_failure_message_template(
    data: dict[str, Any],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check ``failure_message_template`` and, if possible, the message contract.

    Nothing is checked when the template is not a mapping. Each of
    ``intention``, ``attendu``, ``vu`` and ``demande`` must be non-empty text.
    When all four are text and the message-contract formatter was imported,
    the template is rendered once with a sample observed state and any
    ``MessageContractError`` is reported as an issue.
    """
    template = data.get("failure_message_template")
    if not isinstance(template, dict):
        return

    required = ("intention", "attendu", "vu", "demande")
    for key in required:
        value = template.get(key)
        if not isinstance(value, str) or not value.strip():
            issues.append(
                CompetenceValidationIssue(
                    "failure_message_template",
                    f"failure_message_template.{key} must be non-empty text",
                )
            )
    # Skip the contract check when any field is missing or not text. Checking
    # key presence alone is not enough: a present non-text value (for example
    # an empty ``vu:`` that loads as None) would make ``.replace`` below raise
    # AttributeError.
    if any(not isinstance(template.get(key), str) for key in required):
        return

    # The formatter is None when the optional contract module is unavailable.
    if format_supervised_pause_message is None:
        return

    try:
        format_supervised_pause_message(
            intention=template["intention"],
            attendu=template["attendu"],
            # Substitute the runtime placeholder with sample text.
            vu=template["vu"].replace(
                "{observed_human_state}",
                "la fenetre attendue n'est pas visible",
            ),
            demande=template["demande"],
        )
    except MessageContractError as exc:
        issues.append(
            CompetenceValidationIssue(
                "failure_message_contract",
                str(exc),
            )
        )
|
|
|
|
|
|
def _validate_preconditions(
    data: dict[str, Any],
    repo_root: Path,
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check ``competence_required`` preconditions.

    Other precondition kinds and non-mapping entries are ignored. For each
    ``competence_required`` entry, ``competence`` must be non-empty text and
    must not name the competence itself, ``state`` must be a known learning
    state, and the dependency file located by
    ``_find_competence_dependency_path`` must exist.
    """
    preconditions = data.get("preconditions")
    if not isinstance(preconditions, list):
        return

    competence_id = data.get("id")
    for index, precondition in enumerate(preconditions):
        if not isinstance(precondition, dict):
            continue
        if precondition.get("kind") != "competence_required":
            continue

        dependency = precondition.get("competence")
        state = precondition.get("state")
        if not isinstance(dependency, str) or not dependency.strip():
            issues.append(
                CompetenceValidationIssue(
                    "competence_dependency_invalid",
                    f"preconditions[{index}].competence must be non-empty text",
                )
            )
            continue
        if dependency == competence_id:
            issues.append(
                CompetenceValidationIssue(
                    "competence_dependency_invalid",
                    f"preconditions[{index}] must not depend on itself",
                )
            )
        # The isinstance guard keeps unhashable YAML values (lists, mappings)
        # from raising TypeError in the set membership test.
        if not isinstance(state, str) or state not in LEARNING_STATES:
            issues.append(
                CompetenceValidationIssue(
                    "competence_dependency_invalid",
                    f"preconditions[{index}].state must be one of {sorted(LEARNING_STATES)}",
                )
            )
            continue

        dependency_path = _find_competence_dependency_path(
            repo_root,
            dependency,
            minimum_state=str(state),
        )
        if not dependency_path.is_file():
            issues.append(
                CompetenceValidationIssue(
                    "competence_dependency_missing",
                    f"required competence not found: {dependency} with minimum state {state}",
                )
            )
|
|
|
|
|
|
def _validate_methods_and_trace(
    data: dict[str, Any],
    repo_root: Path,
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check each method and, for observed methods, its trace evidence.

    Nothing is checked when ``methods`` is not a list; an empty list is
    reported. Every method must be a mapping with non-empty text ``id`` and
    ``kind``, and its optional ``primitive_ref`` is validated. Observed
    ``key_combo``, ``text_input``, ``scroll``, ``click`` and ``wait_state``
    methods must name a ``trace_source`` and are compared with the loaded
    source events; trace comparisons are skipped when the events could not be
    loaded (``source_events`` is None).
    """
    methods = data.get("methods")
    if not isinstance(methods, list):
        return

    if not methods:
        issues.append(CompetenceValidationIssue("methods_empty", "at least one method is required"))
        return

    source_events = _load_source_events(data, repo_root, issues)
    keep_indices = _cleaned_keep_indices(data)

    for index, method in enumerate(methods):
        if not isinstance(method, dict):
            issues.append(CompetenceValidationIssue("method_invalid", f"methods[{index}] must be a mapping"))
            continue

        for key in ("id", "kind"):
            if not isinstance(method.get(key), str) or not method.get(key, "").strip():
                issues.append(CompetenceValidationIssue("method_invalid", f"methods[{index}].{key} is required"))

        kind = method.get("kind")
        _validate_method_primitive_ref(method, kind, index, repo_root, issues)
        if kind == "key_combo":
            # The key list is checked even for methods that were not observed.
            keys = _method_key_combo_keys(method)
            if not _is_string_list(keys):
                issues.append(CompetenceValidationIssue("method_keys_invalid", f"methods[{index}].keys must be text list"))
                continue
            if method.get("observed") is True:
                if not method.get("trace_source"):
                    issues.append(CompetenceValidationIssue("method_trace_source", f"methods[{index}] missing trace_source"))
                # Per-method indices win; otherwise use the cleaned keep set.
                trace_indices = _method_trace_indices(method) or keep_indices
                if source_events is not None and not _trace_has_key_combo(source_events, trace_indices, keys):
                    issues.append(
                        CompetenceValidationIssue(
                            "method_trace_missing",
                            f"observed key_combo {keys!r} not found in cleaned source segment",
                        )
                    )
        elif kind == "text_input" and method.get("observed") is True:
            if not method.get("trace_source"):
                issues.append(CompetenceValidationIssue("method_trace_source", f"methods[{index}] missing trace_source"))
            method_indices = _method_trace_indices(method) or _cleaned_method_indices(data)
            if source_events is None:
                continue
            if not method_indices:
                issues.append(
                    CompetenceValidationIssue(
                        "method_trace_missing",
                        f"observed text_input method {method.get('id') or index} requires method_event_indices",
                    )
                )
                continue
            # Negative indices are out of range too: without the explicit
            # check Python would silently wrap them to the end of the trace.
            non_text_indices = [
                event_index
                for event_index in method_indices
                if event_index < 0
                or event_index >= len(source_events)
                or source_events[event_index].get("type") != "text_input"
            ]
            if non_text_indices:
                issues.append(
                    CompetenceValidationIssue(
                        "method_trace_missing",
                        f"method_event_indices contain non text_input events: {non_text_indices}",
                    )
                )
                continue

            reconstructed = method.get("reconstructed_text")
            if isinstance(reconstructed, str):
                observed_text = _concat_text_input_events(source_events, method_indices)
                if observed_text != reconstructed:
                    issues.append(
                        CompetenceValidationIssue(
                            "method_reconstructed_text_mismatch",
                            f"reconstructed_text={reconstructed!r} trace_text={observed_text!r}",
                        )
                    )
        elif kind == "scroll" and method.get("observed") is True:
            if not method.get("trace_source"):
                issues.append(CompetenceValidationIssue("method_trace_source", f"methods[{index}] missing trace_source"))
            method_indices = _method_trace_indices(method) or _cleaned_method_indices(data)
            if source_events is None:
                continue
            if not method_indices:
                issues.append(
                    CompetenceValidationIssue(
                        "method_trace_missing",
                        f"observed scroll method {method.get('id') or index} requires trace_event_indices or method_event_indices",
                    )
                )
                continue
            _validate_scroll_method_trace(method, index, source_events, method_indices, issues)
        elif kind == "click" and method.get("observed") is True:
            if not method.get("trace_source"):
                issues.append(CompetenceValidationIssue("method_trace_source", f"methods[{index}] missing trace_source"))
            method_indices = _method_trace_indices(method) or _cleaned_method_indices(data)
            if source_events is None:
                continue
            if not method_indices:
                issues.append(
                    CompetenceValidationIssue(
                        "method_trace_missing",
                        f"observed click method {method.get('id') or index} requires trace_event_indices or method_event_indices",
                    )
                )
                continue
            _validate_click_method_trace(index, source_events, method_indices, issues)
        elif kind == "wait_state" and method.get("observed") is True:
            if not method.get("trace_source"):
                issues.append(CompetenceValidationIssue("method_trace_source", f"methods[{index}] missing trace_source"))
            method_indices = _method_trace_indices(method) or _cleaned_method_indices(data)
            if source_events is None:
                continue
            if not method_indices:
                issues.append(
                    CompetenceValidationIssue(
                        "method_trace_missing",
                        f"observed wait_state method {method.get('id') or index} requires trace_event_indices or method_event_indices",
                    )
                )
                continue
            _validate_wait_state_method_trace(index, source_events, method_indices, issues)
|
|
|
|
|
|
def _validate_method_primitive_ref(
    method: dict[str, Any],
    method_kind: Any,
    method_index: int,
    repo_root: Path,
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check a method's optional ``primitive_ref`` against the primitive file.

    Nothing is checked when the method has no ``primitive_ref``. Otherwise the
    reference must be a lowercase slug naming an existing, valid primitive
    YAML file under ``PRIMITIVES_DIR``; the method ``kind`` must equal the
    primitive's ``executor_kind`` when that is text; and the method
    ``parameters`` are checked against the primitive's ``parameters_schema``.
    Validation stops at the first structural failure.
    """
    primitive_ref = method.get("primitive_ref")
    if primitive_ref is None:
        return

    is_slug = isinstance(primitive_ref, str) and re.fullmatch(r"[a-z][a-z0-9_]*", primitive_ref) is not None
    if not is_slug:
        issues.append(
            CompetenceValidationIssue(
                "primitive_ref_invalid",
                f"methods[{method_index}].primitive_ref must be a lowercase slug",
            )
        )
        return

    primitive_path = repo_root / PRIMITIVES_DIR / f"{primitive_ref}.yaml"
    if not primitive_path.is_file():
        issues.append(
            CompetenceValidationIssue(
                "primitive_ref_unknown",
                f"primitive_ref={primitive_ref!r}: file not found: {primitive_path.relative_to(repo_root)}",
            )
        )
        return

    # Surface every problem of the referenced primitive under one code.
    primitive_report = validate_primitive_file(primitive_path, repo_root=repo_root)
    if not primitive_report.valid:
        issues.extend(
            CompetenceValidationIssue(
                "primitive_file_invalid",
                f"primitive_ref={primitive_ref!r}: {issue.code}: {issue.detail}",
            )
            for issue in primitive_report.issues
        )
        return

    primitive = _read_yaml_mapping(primitive_path, issues)
    if primitive is None:
        return

    expected_kind = primitive.get("executor_kind")
    kind_is_constrained = isinstance(expected_kind, str)
    if kind_is_constrained and method_kind != expected_kind:
        issues.append(
            CompetenceValidationIssue(
                "primitive_kind_mismatch",
                f"primitive_ref={primitive_ref!r} requires kind={expected_kind!r}, got kind={method_kind!r}",
            )
        )

    # A missing ``parameters`` key is treated as an empty mapping.
    supplied = method.get("parameters")
    parameters = {} if supplied is None else supplied
    if not isinstance(parameters, dict):
        issues.append(
            CompetenceValidationIssue(
                "primitive_schema_invalid",
                f"primitive_ref={primitive_ref!r} requires methods[{method_index}].parameters to be a mapping",
            )
        )
        return

    schema = primitive.get("parameters_schema")
    if not isinstance(schema, dict):
        return

    for param_name, spec in schema.items():
        if isinstance(spec, dict):
            _validate_primitive_method_parameter(primitive_ref, param_name, spec, parameters, issues)

    # Two primitives get additional dedicated parameter checks.
    if primitive_ref == "click_anchor":
        _validate_click_anchor_parameters(parameters, issues)
    elif primitive_ref == "wait_for_state":
        _validate_wait_for_state_parameters(parameters, issues)
|
|
|
|
|
|
def _validate_scroll_method_trace(
    method: dict[str, Any],
    method_index: int,
    events: list[dict[str, Any]],
    indices: list[int],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check that every referenced trace event is a usable ``mouse_scroll``.

    For each index in ``indices`` one of three issues may be appended:
    ``method_trace_missing`` when the event is absent or has another type,
    ``method_scroll_delta_missing`` when its ``delta`` is not usable, and
    ``method_scroll_direction_mismatch`` when the delta contradicts the
    direction declared in the method parameters.
    """
    expected_direction = _method_scroll_direction(method)
    event_count = len(events)

    for event_index in indices:
        in_range = event_index < event_count
        observed_type = events[event_index].get("type") if in_range else None
        if not in_range or observed_type != "mouse_scroll":
            issues.append(
                CompetenceValidationIssue(
                    "method_trace_missing",
                    f"methods[{method_index}] expects type=mouse_scroll, got type={observed_type!r} at event #{event_index}",
                )
            )
            continue

        delta = events[event_index].get("delta")
        if not _is_scroll_delta(delta):
            issues.append(
                CompetenceValidationIssue(
                    "method_scroll_delta_missing",
                    f"methods[{method_index}] points event #{event_index} type=mouse_scroll without usable delta field",
                )
            )
            continue

        # Direction is only compared when the method declares one.
        if not isinstance(expected_direction, str):
            continue
        if not _scroll_delta_matches_direction(delta, expected_direction):
            issues.append(
                CompetenceValidationIssue(
                    "method_scroll_direction_mismatch",
                    f"methods[{method_index}] direction={expected_direction!r} does not match delta={delta!r} at event #{event_index}",
                )
            )
|
|
|
|
|
|
def _validate_click_method_trace(
|
|
method_index: int,
|
|
events: list[dict[str, Any]],
|
|
indices: list[int],
|
|
issues: list[CompetenceValidationIssue],
|
|
) -> None:
|
|
for event_index in indices:
|
|
if event_index >= len(events) or events[event_index].get("type") != "mouse_click":
|
|
observed_type = events[event_index].get("type") if event_index < len(events) else None
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"method_trace_missing",
|
|
f"methods[{method_index}] expects type=mouse_click, got type={observed_type!r} at event #{event_index}",
|
|
)
|
|
)
|
|
|
|
|
|
def _validate_wait_state_method_trace(
    method_index: int,
    events: list[dict[str, Any]],
    indices: list[int],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check that each referenced event is acceptable wait-state evidence.

    An event is accepted when its type is ``window_focus_change`` or
    ``heartbeat`` and it carries a window title or a process name. Any other
    event, including an index past the end of ``events``, is reported as
    ``method_trace_missing``.
    """
    accepted_types = ("window_focus_change", "heartbeat")
    for event_index in indices:
        event = events[event_index] if event_index < len(events) else {}
        event_type = event.get("type")
        if event_type in accepted_types and (_event_title(event) or _event_process(event)):
            continue

        issues.append(
            CompetenceValidationIssue(
                "method_trace_missing",
                f"methods[{method_index}] expects durable wait_state evidence, got type={event_type!r} at event #{event_index}",
            )
        )
|
|
|
|
|
|
def _validate_click_anchor_parameters(
|
|
parameters: dict[str, Any],
|
|
issues: list[CompetenceValidationIssue],
|
|
) -> None:
|
|
anchor_ref = parameters.get("anchor_ref")
|
|
if not (
|
|
isinstance(anchor_ref, str)
|
|
and anchor_ref.strip()
|
|
or isinstance(anchor_ref, dict)
|
|
and bool(anchor_ref)
|
|
):
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"primitive_anchor_ref_invalid",
|
|
"click_anchor requires anchor_ref as non-empty string or mapping",
|
|
)
|
|
)
|
|
|
|
click_count = parameters.get("click_count", 1)
|
|
if not isinstance(click_count, int) or isinstance(click_count, bool) or click_count < 1 or click_count > 2:
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"primitive_click_count_out_of_range",
|
|
"click_anchor click_count must be 1 or 2",
|
|
)
|
|
)
|
|
|
|
if "relative_offset" in parameters:
|
|
_validate_click_relative_offset(parameters.get("relative_offset"), issues)
|
|
|
|
|
|
def _validate_click_relative_offset(
    offset: Any,
    issues: list[CompetenceValidationIssue],
) -> None:
    """Validate a ``click_anchor`` ``relative_offset`` value.

    The offset must be a mapping whose keys are exactly ``x_pct``/``y_pct``
    (numbers in 0.0..1.0) or exactly ``dx``/``dy`` (numbers in -0.5..0.5).
    Every violation is reported as ``primitive_relative_offset_invalid``.
    """
    code = "primitive_relative_offset_invalid"
    if not isinstance(offset, dict):
        issues.append(CompetenceValidationIssue(code, "click_anchor relative_offset must be a mapping"))
        return

    accepted_forms = (
        (
            ("x_pct", "y_pct"),
            0.0,
            1.0,
            "click_anchor relative_offset x_pct/y_pct must be numbers between 0.0 and 1.0",
        ),
        (
            ("dx", "dy"),
            -0.5,
            0.5,
            "click_anchor relative_offset dx/dy must be numbers between -0.5 and 0.5",
        ),
    )
    present_keys = set(offset.keys())
    for names, lowest, highest, detail in accepted_forms:
        if present_keys != set(names):
            continue
        if not all(_is_number_in_range(offset[name], lowest, highest) for name in names):
            issues.append(CompetenceValidationIssue(code, detail))
        # The key set matched one accepted form, so no other form applies.
        return

    issues.append(
        CompetenceValidationIssue(
            code,
            "click_anchor relative_offset must use exactly x_pct/y_pct or dx/dy",
        )
    )
|
|
|
|
|
|
def _validate_wait_for_state_parameters(
    parameters: dict[str, Any],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Validate the ``wait_for_state`` specific parameters.

    ``expected_state`` must be a non-empty mapping. ``timeout_ms`` (default
    5000) must be an integer in 100..60000 and ``poll_interval_ms`` (default
    250) an integer in 50..5000.
    """
    expected_state = parameters.get("expected_state")
    if not (isinstance(expected_state, dict) and expected_state):
        issues.append(
            CompetenceValidationIssue(
                "primitive_expected_state_invalid",
                "wait_for_state expected_state must be a non-empty mapping",
            )
        )

    # (parameter, default, minimum, maximum, issue code, issue detail)
    bounded_integers = (
        (
            "timeout_ms",
            5000,
            100,
            60000,
            "primitive_wait_timeout_invalid",
            "wait_for_state timeout_ms must be an integer between 100 and 60000",
        ),
        (
            "poll_interval_ms",
            250,
            50,
            5000,
            "primitive_poll_interval_invalid",
            "wait_for_state poll_interval_ms must be an integer between 50 and 5000",
        ),
    )
    for name, default, lowest, highest, code, detail in bounded_integers:
        if not _is_int_in_range(parameters.get(name, default), lowest, highest):
            issues.append(CompetenceValidationIssue(code, detail))
|
|
|
|
|
|
def _validate_primitive_method_parameter(
    primitive_ref: str,
    param_name: str,
    spec: dict[str, Any],
    parameters: dict[str, Any],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check one schema entry of a primitive against a method's parameters.

    Reported as ``primitive_schema_invalid``: a missing parameter whose spec
    has ``required: true``; a parameter supplied together with one of its
    ``required_unless`` alternatives; a parameter missing while none of those
    alternatives is supplied; and a supplied value that does not match the
    declared ``type``. A ``constraints`` mapping, when present, is applied to
    the supplied value.
    """
    supplied = param_name in parameters

    if spec.get("required") is True and not supplied:
        issues.append(
            CompetenceValidationIssue(
                "primitive_schema_invalid",
                f"primitive_ref={primitive_ref!r} requires parameter {param_name!r}",
            )
        )
        return

    required_unless = spec.get("required_unless")
    if _is_string_list(required_unless):
        alternatives_present = [name for name in required_unless if name in parameters]
        if supplied and alternatives_present:
            issues.append(
                CompetenceValidationIssue(
                    "primitive_schema_invalid",
                    f"primitive_ref={primitive_ref!r} parameters {param_name!r} and {alternatives_present!r} are mutually exclusive",
                )
            )
        elif not supplied and not alternatives_present:
            issues.append(
                CompetenceValidationIssue(
                    "primitive_schema_invalid",
                    f"primitive_ref={primitive_ref!r} requires parameter {param_name!r} unless one of {required_unless!r} is present",
                )
            )
            return

    if not supplied:
        # Type and constraint checks only apply to a value that was given.
        return

    value = parameters[param_name]
    declared_type = spec.get("type")
    if not _primitive_value_matches_type(value, declared_type):
        issues.append(
            CompetenceValidationIssue(
                "primitive_schema_invalid",
                f"primitive_ref={primitive_ref!r} parameter {param_name!r} has invalid type {declared_type!r}",
            )
        )

    constraints = spec.get("constraints")
    if isinstance(constraints, dict):
        _validate_primitive_method_parameter_constraints(
            primitive_ref,
            param_name,
            value,
            constraints,
            issues,
        )
|
|
|
|
|
|
def _validate_primitive_method_parameter_constraints(
|
|
primitive_ref: str,
|
|
param_name: str,
|
|
value: Any,
|
|
constraints: dict[str, Any],
|
|
issues: list[CompetenceValidationIssue],
|
|
) -> None:
|
|
min_length = constraints.get("min_length")
|
|
if isinstance(min_length, int) and hasattr(value, "__len__") and len(value) < min_length:
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"primitive_schema_invalid",
|
|
f"primitive_ref={primitive_ref!r} parameter {param_name!r} must have length >= {min_length}",
|
|
)
|
|
)
|
|
|
|
min_value = constraints.get("min", constraints.get("min_value"))
|
|
if isinstance(min_value, int) and isinstance(value, int) and not isinstance(value, bool) and value < min_value:
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"primitive_schema_invalid",
|
|
f"primitive_ref={primitive_ref!r} parameter {param_name!r} must be >= {min_value}",
|
|
)
|
|
)
|
|
|
|
enum = constraints.get("enum")
|
|
if isinstance(enum, list) and value not in enum:
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"primitive_schema_invalid",
|
|
f"primitive_ref={primitive_ref!r} parameter {param_name!r} must be one of {enum!r}",
|
|
)
|
|
)
|
|
|
|
regex = constraints.get("regex")
|
|
if isinstance(regex, str) and isinstance(value, str) and not re.fullmatch(regex, value):
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"primitive_schema_invalid",
|
|
f"primitive_ref={primitive_ref!r} parameter {param_name!r} must match {regex!r}",
|
|
)
|
|
)
|
|
|
|
|
|
def _validate_success_marker(
|
|
data: dict[str, Any],
|
|
repo_root: Path,
|
|
issues: list[CompetenceValidationIssue],
|
|
) -> None:
|
|
marker = data.get("success_marker")
|
|
if not isinstance(marker, dict):
|
|
return
|
|
|
|
if marker.get("mode") not in {"any_of", "all_of"}:
|
|
issues.append(CompetenceValidationIssue("success_marker_mode", "success_marker.mode must be any_of or all_of"))
|
|
|
|
timeout_ms = marker.get("timeout_ms")
|
|
if not isinstance(timeout_ms, int) or timeout_ms <= 0:
|
|
issues.append(CompetenceValidationIssue("success_marker_timeout", "success_marker.timeout_ms must be positive"))
|
|
|
|
markers = marker.get("markers")
|
|
if not isinstance(markers, list) or not markers:
|
|
issues.append(CompetenceValidationIssue("success_marker_markers", "success_marker.markers must be a non-empty list"))
|
|
return
|
|
|
|
for index, item in enumerate(markers):
|
|
if not isinstance(item, dict) or not item.get("kind"):
|
|
issues.append(CompetenceValidationIssue("success_marker_invalid", f"markers[{index}] must define kind"))
|
|
|
|
supervised_requires = marker.get("supervised_requires")
|
|
if supervised_requires is not None and not isinstance(supervised_requires, list):
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"success_marker_invalid",
|
|
"success_marker.supervised_requires must be a list when present",
|
|
)
|
|
)
|
|
|
|
source_events = _load_source_events(data, repo_root, issues)
|
|
if source_events is None:
|
|
return
|
|
|
|
keep_indices = _cleaned_keep_indices(data)
|
|
method_indices = _cleaned_method_indices(data)
|
|
match_indices = _trace_success_marker_match_indices(
|
|
source_events,
|
|
keep_indices,
|
|
markers,
|
|
)
|
|
if not match_indices:
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"success_marker_missing",
|
|
"no success marker matches the cleaned source segment",
|
|
)
|
|
)
|
|
return
|
|
|
|
if method_indices:
|
|
min_success_index = _minimum_success_index_after_methods(data, method_indices)
|
|
if not any(index >= min_success_index for index in match_indices):
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"success_marker_pre_method",
|
|
"success marker must match an event after the observed method",
|
|
)
|
|
)
|
|
|
|
|
|
def _validate_chain_refs(
|
|
data: dict[str, Any],
|
|
repo_root: Path,
|
|
issues: list[CompetenceValidationIssue],
|
|
) -> None:
|
|
chain_refs = data.get("chain_refs")
|
|
if not isinstance(chain_refs, dict):
|
|
return
|
|
|
|
if not isinstance(chain_refs.get("source_session"), str) or not chain_refs.get("source_session", "").strip():
|
|
issues.append(CompetenceValidationIssue("chain_ref_missing", "chain_refs.source_session is required"))
|
|
|
|
cleaned = chain_refs.get("cleaned_segment")
|
|
if not isinstance(cleaned, dict):
|
|
issues.append(CompetenceValidationIssue("cleaned_segment_missing", "chain_refs.cleaned_segment is required"))
|
|
return
|
|
|
|
source_event_format = cleaned.get("source_event_format")
|
|
if source_event_format is not None and source_event_format not in {"streaming_session_json", "raw_live_events_jsonl"}:
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"cleaned_segment_source",
|
|
"cleaned_segment.source_event_format must be streaming_session_json or raw_live_events_jsonl",
|
|
)
|
|
)
|
|
required_path_key = "live_events_path" if source_event_format == "raw_live_events_jsonl" else "streaming_session_path"
|
|
for key in ("streaming_session_path", "live_events_path"):
|
|
path_value = chain_refs.get(key)
|
|
if key == required_path_key and (not isinstance(path_value, str) or not path_value.strip()):
|
|
issues.append(CompetenceValidationIssue("chain_ref_missing", f"chain_refs.{key} is required"))
|
|
continue
|
|
if isinstance(path_value, str) and path_value.strip():
|
|
resolved = _repo_path(repo_root, path_value)
|
|
if not resolved.is_file():
|
|
issues.append(CompetenceValidationIssue("chain_ref_path_missing", f"{key} not found: {path_value}"))
|
|
|
|
keep_indices = cleaned.get("keep_event_indices")
|
|
if not isinstance(keep_indices, list) or not keep_indices or not all(isinstance(i, int) and i >= 0 for i in keep_indices):
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"cleaned_segment_indices",
|
|
"cleaned_segment.keep_event_indices must be a non-empty list of positive indices",
|
|
)
|
|
)
|
|
|
|
stop_before = cleaned.get("stop_before")
|
|
if not isinstance(stop_before, list) or not stop_before:
|
|
issues.append(CompetenceValidationIssue("cleaned_segment_stop", "cleaned_segment.stop_before must document cut reasons"))
|
|
|
|
method_indices = cleaned.get("method_event_indices")
|
|
success_indices = cleaned.get("success_event_indices")
|
|
if method_indices is not None and not _is_int_list(method_indices):
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"cleaned_segment_indices",
|
|
"cleaned_segment.method_event_indices must be a list of positive indices",
|
|
)
|
|
)
|
|
if success_indices is not None and not _is_int_list(success_indices):
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"cleaned_segment_indices",
|
|
"cleaned_segment.success_event_indices must be a list of positive indices",
|
|
)
|
|
)
|
|
if _is_int_list(method_indices) and _is_int_list(success_indices):
|
|
min_success_index = _minimum_success_index_after_methods(data, method_indices)
|
|
if not any(index >= min_success_index for index in success_indices):
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"success_marker_pre_method",
|
|
"cleaned_segment.success_event_indices must be after method_event_indices",
|
|
)
|
|
)
|
|
if _is_int_list(method_indices) and isinstance(keep_indices, list):
|
|
missing_method_indices = [index for index in method_indices if index not in keep_indices]
|
|
if missing_method_indices:
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"cleaned_segment_indices",
|
|
f"method_event_indices must be included in keep_event_indices: {missing_method_indices}",
|
|
)
|
|
)
|
|
|
|
source_events = _load_source_events(data, repo_root, issues)
|
|
if source_events is not None and isinstance(keep_indices, list):
|
|
for index in keep_indices:
|
|
if isinstance(index, int) and index >= len(source_events):
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"cleaned_segment_indices",
|
|
f"cleaned segment index out of range: {index}",
|
|
)
|
|
)
|
|
|
|
|
|
def _load_source_events(
|
|
data: dict[str, Any],
|
|
repo_root: Path,
|
|
issues: list[CompetenceValidationIssue],
|
|
) -> list[dict[str, Any]] | None:
|
|
chain_refs = data.get("chain_refs")
|
|
if not isinstance(chain_refs, dict):
|
|
return None
|
|
|
|
source_format = _cleaned_source_event_format(data)
|
|
path_key = "live_events_path" if source_format == "raw_live_events_jsonl" else "streaming_session_path"
|
|
path_value = chain_refs.get(path_key)
|
|
if not isinstance(path_value, str) or not path_value:
|
|
return None
|
|
|
|
path = _repo_path(repo_root, path_value)
|
|
if not path.is_file():
|
|
return None
|
|
|
|
if source_format == "raw_live_events_jsonl":
|
|
return _load_jsonl_source_events(path, str(chain_refs.get("source_session") or ""), issues)
|
|
|
|
try:
|
|
payload = json.loads(path.read_text(encoding="utf-8"))
|
|
except (OSError, json.JSONDecodeError) as exc:
|
|
issues.append(CompetenceValidationIssue("source_session_invalid", f"cannot read source session: {exc}"))
|
|
return None
|
|
|
|
source_session = chain_refs.get("source_session")
|
|
if source_session and payload.get("session_id") != source_session:
|
|
issues.append(
|
|
CompetenceValidationIssue(
|
|
"source_session_mismatch",
|
|
f"source session mismatch: YAML={source_session} trace={payload.get('session_id')}",
|
|
)
|
|
)
|
|
|
|
raw_events = payload.get("events")
|
|
if not isinstance(raw_events, list):
|
|
issues.append(CompetenceValidationIssue("source_session_invalid", "source session events must be a list"))
|
|
return None
|
|
|
|
return _normalize_source_events(raw_events)
|
|
|
|
|
|
def _load_jsonl_source_events(
    path: Path,
    source_session: str,
    issues: list[CompetenceValidationIssue],
) -> list[dict[str, Any]] | None:
    """Load a JSON-lines trace file and return its normalised events.

    Blank lines are skipped. Returns ``None`` after appending
    ``source_session_invalid`` when the file cannot be read or decoded as
    UTF-8, when a line is not valid JSON, or when a line is not a JSON
    object. When ``source_session`` is non-empty and no line carries that
    ``session_id``, ``source_session_mismatch`` is appended but the events
    are still returned.
    """
    try:
        text = path.read_text(encoding="utf-8")
    except (OSError, UnicodeDecodeError) as exc:
        # UnicodeDecodeError is not an OSError; a non-UTF-8 trace must be
        # reported like any other unreadable file instead of crashing.
        issues.append(CompetenceValidationIssue("source_session_invalid", f"cannot read source session: {exc}"))
        return None

    raw_events: list[dict[str, Any]] = []
    session_ids: set[str] = set()
    for line_number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue
        try:
            payload = json.loads(line)
        except json.JSONDecodeError as exc:
            issues.append(
                CompetenceValidationIssue(
                    "source_session_invalid",
                    f"cannot read source session jsonl line {line_number}: {exc}",
                )
            )
            return None
        if not isinstance(payload, dict):
            issues.append(
                CompetenceValidationIssue(
                    "source_session_invalid",
                    f"source session jsonl line {line_number} must be a mapping",
                )
            )
            return None
        if isinstance(payload.get("session_id"), str):
            session_ids.add(payload["session_id"])
        raw_events.append(payload)

    if source_session and source_session not in session_ids:
        issues.append(
            CompetenceValidationIssue(
                "source_session_mismatch",
                f"YAML source session {source_session!r} not found in jsonl sessions {sorted(session_ids)!r}",
            )
        )
    return _normalize_source_events(raw_events)
|
|
|
|
|
|
def _normalize_source_events(raw_events: list[Any]) -> list[dict[str, Any]]:
|
|
normalized: list[dict[str, Any]] = []
|
|
for raw_event in raw_events:
|
|
if not isinstance(raw_event, dict):
|
|
continue
|
|
nested_event = raw_event.get("event")
|
|
if isinstance(nested_event, dict) and isinstance(nested_event.get("type"), str):
|
|
event = dict(nested_event)
|
|
for key in ("session_id", "timestamp", "machine_id"):
|
|
if key not in event and key in raw_event:
|
|
event[key] = raw_event[key]
|
|
normalized.append(event)
|
|
else:
|
|
normalized.append(raw_event)
|
|
return normalized
|
|
|
|
|
|
def _cleaned_keep_indices(data: dict[str, Any]) -> list[int] | None:
|
|
chain_refs = data.get("chain_refs")
|
|
if not isinstance(chain_refs, dict):
|
|
return None
|
|
cleaned = chain_refs.get("cleaned_segment")
|
|
if not isinstance(cleaned, dict):
|
|
return None
|
|
indices = cleaned.get("keep_event_indices")
|
|
if not isinstance(indices, list) or not all(isinstance(i, int) for i in indices):
|
|
return None
|
|
return indices
|
|
|
|
|
|
def _cleaned_method_indices(data: dict[str, Any]) -> list[int] | None:
|
|
chain_refs = data.get("chain_refs")
|
|
if not isinstance(chain_refs, dict):
|
|
return None
|
|
cleaned = chain_refs.get("cleaned_segment")
|
|
if not isinstance(cleaned, dict):
|
|
return None
|
|
indices = cleaned.get("method_event_indices")
|
|
if not _is_int_list(indices):
|
|
return None
|
|
return indices
|
|
|
|
|
|
def _methods_execution_mode(data: dict[str, Any]) -> str:
    """Return ``data["methods_execution"]``, falling back to ``"alternatives"``.

    The fallback is used both when the key is absent and when its value is
    not one of ``METHODS_EXECUTION_MODES``.
    """
    requested = data.get("methods_execution", "alternatives")
    if requested in METHODS_EXECUTION_MODES:
        return requested
    return "alternatives"
|
|
|
|
|
|
def _cleaned_source_event_format(data: dict[str, Any]) -> str:
|
|
chain_refs = data.get("chain_refs")
|
|
if not isinstance(chain_refs, dict):
|
|
return "streaming_session_json"
|
|
cleaned = chain_refs.get("cleaned_segment")
|
|
if not isinstance(cleaned, dict):
|
|
return "streaming_session_json"
|
|
value = cleaned.get("source_event_format")
|
|
return value if value == "raw_live_events_jsonl" else "streaming_session_json"
|
|
|
|
|
|
def _minimum_success_index_after_methods(data: dict[str, Any], method_indices: list[int]) -> int:
    """Return the lowest event index at which a success marker may match.

    This is the index right after the last method event, except when that
    last event belongs to an observed ``wait_state`` method: then the same
    event is allowed to carry the success evidence.
    """
    final_method_index = max(method_indices)
    if _last_observed_method_is_wait_state_at(data, final_method_index):
        offset = 0
    else:
        offset = 1
    return final_method_index + offset
|
|
|
|
|
|
def _last_observed_method_is_wait_state_at(data: dict[str, Any], event_index: int) -> bool:
|
|
methods = data.get("methods")
|
|
if not isinstance(methods, list):
|
|
return False
|
|
for method in methods:
|
|
if not isinstance(method, dict) or method.get("observed") is not True:
|
|
continue
|
|
if method.get("kind") != "wait_state":
|
|
continue
|
|
trace_indices = _method_trace_indices(method)
|
|
if trace_indices and max(trace_indices) == event_index:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _method_trace_indices(method: dict[str, Any]) -> list[int] | None:
    """Return ``method["trace_event_indices"]`` when ``_is_int_list`` accepts it, else ``None``."""
    candidate = method.get("trace_event_indices")
    return candidate if _is_int_list(candidate) else None
|
|
|
|
|
|
def _method_scroll_direction(method: dict[str, Any]) -> str | None:
|
|
parameters = method.get("parameters")
|
|
if not isinstance(parameters, dict):
|
|
return None
|
|
direction = parameters.get("direction")
|
|
return direction if isinstance(direction, str) else None
|
|
|
|
|
|
def _is_scroll_delta(value: Any) -> bool:
|
|
return (
|
|
isinstance(value, list)
|
|
and len(value) >= 2
|
|
and isinstance(value[0], int)
|
|
and isinstance(value[1], int)
|
|
and not isinstance(value[0], bool)
|
|
and not isinstance(value[1], bool)
|
|
)
|
|
|
|
|
|
def _scroll_delta_matches_direction(delta: list[Any], direction: str) -> bool:
|
|
if direction == "down":
|
|
return delta[1] < 0
|
|
if direction == "up":
|
|
return delta[1] > 0
|
|
if direction == "left":
|
|
return delta[0] < 0
|
|
if direction == "right":
|
|
return delta[0] > 0
|
|
return True
|
|
|
|
|
|
def _method_key_combo_keys(method: dict[str, Any]) -> Any:
|
|
keys = method.get("keys")
|
|
if keys is not None:
|
|
return keys
|
|
parameters = method.get("parameters")
|
|
if isinstance(parameters, dict):
|
|
return parameters.get("keys")
|
|
return None
|
|
|
|
|
|
def _trace_has_key_combo(
    events: list[dict[str, Any]],
    keep_indices: list[int] | None,
    expected_keys: list[str],
) -> bool:
    """Tell whether the trace contains a ``key_combo`` event with the expected keys.

    Both the expected keys and each event's ``keys`` are compared after
    ``_normalize_key_combo_sequence``. When ``keep_indices`` is not ``None``
    only events at those positions are considered.
    """
    expected = _normalize_key_combo_sequence(expected_keys)
    # Built once so the per-event membership test is O(1) instead of a list scan.
    kept = None if keep_indices is None else set(keep_indices)

    for index, event in enumerate(events):
        if kept is not None and index not in kept:
            continue
        if event.get("type") != "key_combo":
            continue
        keys = event.get("keys")
        if _is_string_list(keys) and _normalize_key_combo_sequence(keys) == expected:
            return True
    return False
|
|
|
|
|
|
def _trace_success_marker_match_indices(
|
|
events: list[dict[str, Any]],
|
|
keep_indices: list[int] | None,
|
|
markers: list[Any],
|
|
) -> list[int]:
|
|
marker_maps = [marker for marker in markers if isinstance(marker, dict)]
|
|
matches: list[int] = []
|
|
for index, event in enumerate(events):
|
|
if keep_indices is not None and index not in keep_indices:
|
|
continue
|
|
for marker in marker_maps:
|
|
kind = marker.get("kind")
|
|
if kind == "active_process_name_is":
|
|
expected = str(marker.get("value") or "").casefold()
|
|
if expected and _event_process(event).casefold() == expected:
|
|
matches.append(index)
|
|
break
|
|
elif kind == "active_window_title_in":
|
|
values = marker.get("values")
|
|
if _is_string_list(values) and _event_title(event).casefold() in {v.casefold() for v in values}:
|
|
matches.append(index)
|
|
break
|
|
elif kind == "ocr_contains":
|
|
# OCR is not required for offline validation if another marker
|
|
# proves the state in the captured segment.
|
|
continue
|
|
return matches
|
|
|
|
|
|
def _event_title(event: dict[str, Any]) -> str:
|
|
window = event.get("window") if isinstance(event.get("window"), dict) else {}
|
|
to_window = event.get("to") if isinstance(event.get("to"), dict) else {}
|
|
return str(window.get("title") or event.get("active_window_title") or to_window.get("title") or "")
|
|
|
|
|
|
def _event_process(event: dict[str, Any]) -> str:
|
|
window = event.get("window") if isinstance(event.get("window"), dict) else {}
|
|
to_window = event.get("to") if isinstance(event.get("to"), dict) else {}
|
|
return str(window.get("app_name") or to_window.get("app_name") or "")
|
|
|
|
|
|
def _concat_text_input_events(events: list[dict[str, Any]], indices: list[int]) -> str:
|
|
chunks: list[str] = []
|
|
for index in indices:
|
|
if 0 <= index < len(events):
|
|
chunks.append(str(events[index].get("text") or ""))
|
|
return "".join(chunks)
|
|
|
|
|
|
def _repo_path(repo_root: Path, value: str) -> Path:
|
|
path = Path(value)
|
|
if path.is_absolute():
|
|
return path
|
|
return repo_root / path
|
|
|
|
|
|
def _is_primitive_path(path: Path, repo_root: Path) -> bool:
|
|
try:
|
|
relative = path.resolve().relative_to(repo_root.resolve())
|
|
except (OSError, ValueError):
|
|
relative = path
|
|
return len(relative.parts) >= 3 and relative.parts[0] == "data" and relative.parts[1] == "primitives"
|
|
|
|
|
|
def _read_yaml_mapping(path: Path, issues: list[CompetenceValidationIssue]) -> dict[str, Any] | None:
    """Read ``path`` as UTF-8 YAML and return its root mapping.

    Returns ``None`` after appending ``yaml_invalid`` when the file cannot be
    read, is not valid UTF-8, or is not valid YAML, and after appending
    ``schema_type`` when the root node is not a mapping.
    """
    try:
        data = yaml.safe_load(path.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as exc:
        # UnicodeDecodeError is neither an OSError nor a YAMLError; without it
        # a file in another encoding would crash the validator.
        issues.append(CompetenceValidationIssue("yaml_invalid", str(exc)))
        return None
    if not isinstance(data, dict):
        issues.append(CompetenceValidationIssue("schema_type", "root YAML node must be a mapping"))
        return None
    return data
|
|
|
|
|
|
def _find_competence_dependency_path(
    repo_root: Path,
    competence_id: str,
    *,
    minimum_state: str,
) -> Path:
    """Locate the YAML file of a competence at ``minimum_state`` or a later state.

    Directories ``data/competences/<state>`` are probed in
    ``LEARNING_STATE_ORDER`` starting at ``minimum_state``; the first existing
    ``<competence_id>.yaml`` wins. When none exists, or ``minimum_state`` is
    not a known state, the path under ``minimum_state`` is returned even
    though it may not exist.
    """
    competences_dir = repo_root / "data" / "competences"
    file_name = f"{competence_id}.yaml"
    fallback = competences_dir / minimum_state / file_name

    try:
        first_state = LEARNING_STATE_ORDER.index(minimum_state)
    except ValueError:
        return fallback

    for state in LEARNING_STATE_ORDER[first_state:]:
        candidate = competences_dir / state / file_name
        if candidate.is_file():
            return candidate
    return fallback
|
|
|
|
|
|
def _normalize_key(key: str) -> str:
    """Return the canonical name of a key.

    The key is stringified (falsy values become ``""``), stripped and
    casefolded, then mapped through ``KEY_ALIASES`` when an alias exists.
    """
    cleaned = str(key or "").strip().casefold()
    return KEY_ALIASES.get(cleaned, cleaned)
|
|
|
|
|
|
def _normalize_key_combo_sequence(keys: list[str]) -> list[str]:
    """Normalise every key of a combo and canonicalise two special combos.

    A combo made of exactly ``shift``, ``ctrl`` and either ``@`` or the
    control character ``\\x13`` (in any order) is rewritten to
    ``["ctrl", "s"]``.
    """
    canonical = [_normalize_key(key) for key in keys]
    distinct = set(canonical)
    # NOTE(review): presumably these are the forms in which Ctrl+S shows up in
    # recorded traces - confirm against the capture agent.
    if distinct == {"shift", "ctrl", "@"} or distinct == {"shift", "ctrl", "\x13"}:
        return ["ctrl", "s"]
    return canonical
|
|
|
|
|
|
def _is_string_list(value: Any) -> bool:
|
|
return isinstance(value, list) and bool(value) and all(isinstance(item, str) and item for item in value)
|
|
|
|
|
|
def _is_int_list(value: Any) -> bool:
|
|
return isinstance(value, list) and bool(value) and all(isinstance(item, int) and item >= 0 for item in value)
|
|
|
|
|
|
def _is_int_in_range(value: Any, minimum: int, maximum: int) -> bool:
|
|
return isinstance(value, int) and not isinstance(value, bool) and minimum <= value <= maximum
|
|
|
|
|
|
def _primitive_value_matches_type(value: Any, expected_type: Any) -> bool:
|
|
if expected_type == "str":
|
|
return isinstance(value, str)
|
|
if expected_type == "int":
|
|
return isinstance(value, int) and not isinstance(value, bool)
|
|
if expected_type == "bool":
|
|
return isinstance(value, bool)
|
|
if expected_type == "list[str]":
|
|
return _is_string_list(value)
|
|
if expected_type == "dict":
|
|
return isinstance(value, dict)
|
|
if expected_type == "dict_or_string":
|
|
return isinstance(value, dict) or isinstance(value, str)
|
|
return True
|
|
|
|
|
|
def _is_number_in_range(value: Any, minimum: float, maximum: float) -> bool:
|
|
return isinstance(value, (int, float)) and not isinstance(value, bool) and minimum <= float(value) <= maximum
|
|
|
|
|
|
def _distinct_context_signatures(contexts: list[Any]) -> set[tuple[Any, ...]]:
|
|
dimensions = ("dpi", "screen", "app_in_focus", "method_used", "screen_signature")
|
|
signatures: set[tuple[Any, ...]] = set()
|
|
for context in contexts:
|
|
if not isinstance(context, dict):
|
|
continue
|
|
signature = tuple(context.get(dimension) for dimension in dimensions)
|
|
if any(value not in (None, "", [], {}) for value in signature):
|
|
signatures.add(signature)
|
|
return signatures
|
|
|
|
|
|
def _dedupe_issues(issues: list[CompetenceValidationIssue]) -> tuple[CompetenceValidationIssue, ...]:
|
|
seen: set[tuple[str, str]] = set()
|
|
deduped: list[CompetenceValidationIssue] = []
|
|
for issue in issues:
|
|
key = (issue.code, issue.detail)
|
|
if key in seen:
|
|
continue
|
|
seen.add(key)
|
|
deduped.append(issue)
|
|
return tuple(deduped)
|
|
|
|
|
|
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: validate each given YAML file and report.

    With ``--json`` a JSON array of report dictionaries is printed; otherwise
    one ``ok``/``fail`` line per file followed by its issues. Returns 0 when
    every report is valid and 1 otherwise.
    """
    cli = argparse.ArgumentParser(description="Validate Lea short competence YAML files")
    cli.add_argument("paths", nargs="+", help="YAML competence file(s) to validate")
    cli.add_argument("--json", action="store_true", help="emit JSON report")
    options = cli.parse_args(argv)

    reports = [validate_file(path) for path in options.paths]

    if options.json:
        serialised = [report.to_dict() for report in reports]
        print(json.dumps(serialised, ensure_ascii=False, indent=2))
    else:
        for report in reports:
            status = "ok" if report.valid else "fail"
            print(f"{status}: {report.path}")
            for issue in report.issues:
                print(f" - {issue.code}: {issue.detail}")

    any_invalid = any(not report.valid for report in reports)
    return 1 if any_invalid else 0
|
|
|
|
|
|
# Script entry point: the return value of main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())
|