Files
rpa_vision_v3/tools/competence_validator.py

1778 lines
67 KiB
Python

#!/usr/bin/env python3
"""Lightweight validator for Lea short competence YAML files.
This module is deliberately offline-only: it reads YAML and trace files, but it
does not start services, load models, replay actions, or promote competences.
"""
from __future__ import annotations
import argparse
import json
import re
import sys
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Any
import yaml
# Directory one level above the folder that contains this file.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Put that directory on sys.path (once) — presumably so the optional
# ``agent_v0`` import below resolves when this file is run as a script.
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))
# The message contract is optional. If the import fails for any reason,
# MessageContractError falls back to ValueError and the formatter is set to
# None, which the failure-message check treats as "contract unavailable".
try:
    from agent_v0.agent_v1.ui.message_contract import (
        MessageContractError,
        format_supervised_pause_message,
    )
except Exception:  # pragma: no cover - partial deployments can still run basics
    MessageContractError = ValueError
    format_supervised_pause_message = None
# Accepted ``learning_state`` values; the tuple keeps an explicit ordering and
# the set is used for membership checks.
LEARNING_STATE_ORDER = ("observed", "candidate", "supervised", "stable")
LEARNING_STATES = set(LEARNING_STATE_ORDER)

# Accepted values for the optional top-level ``methods_execution`` key.
METHODS_EXECUTION_MODES = {"alternatives", "sequence"}

# Primitive definitions live under <repo_root>/data/primitives/<id>.yaml.
PRIMITIVES_DIR = Path("data", "primitives")

# Accepted ``type`` values inside a primitive's ``parameters_schema``.
PRIMITIVE_PARAMETER_TYPES = {
    "str", "int", "bool",
    "list[str]", "dict", "dict_or_string",
}

# Keys every primitive file must define at the top level.
PRIMITIVE_REQUIRED_TOP_LEVEL_KEYS = {
    "schema_version", "id", "kind", "marker_or_action", "intent",
    "version", "parameters_schema", "failure_message_template", "created_at",
}

# Competence-only keys that a primitive file must not define.
PRIMITIVE_FORBIDDEN_FIELDS = {
    "learning_state", "chain_refs", "promotion", "generalisation",
    "failure_log", "success_marker", "preconditions", "methods",
}

# Keys every competence file must define at the top level.
REQUIRED_TOP_LEVEL_KEYS = {
    "schema_version", "id", "name", "version", "learning_state",
    "intent", "parameters", "preconditions", "methods",
    "success_marker", "failure_message_template", "chain_refs", "promotion",
}

# Mapping keys (compared lowercased) that are reported as stored coordinates.
BLOCKED_DURABLE_COORDINATE_KEYS = {
    "x", "y", "left", "top",
    "width", "height", "w", "h",
    "pos", "bbox", "bounds", "rect", "coordinates",
    "x_pct", "y_pct",
    "window_bounds", "screen_resolution",
}

# Alternative spellings that all map to the canonical "win" key name.
KEY_ALIASES = dict.fromkeys(("cmd", "command", "meta", "super", "windows"), "win")
@dataclass(frozen=True)
class CompetenceValidationIssue:
    """A single validation finding, immutable once created."""

    # Short machine-readable identifier, e.g. "missing_key".
    code: str
    # Human-readable explanation of what is wrong.
    detail: str
@dataclass(frozen=True)
class CompetenceValidationReport:
    """Outcome of validating one file: the path checked and the issues found."""

    # Path of the validated file, as a string.
    path: str
    # Every issue recorded for the file; empty when the file is valid.
    issues: tuple[CompetenceValidationIssue, ...]

    @property
    def valid(self) -> bool:
        """Return True when no issue was recorded."""
        return not self.issues

    def to_dict(self) -> dict[str, Any]:
        """Return the dataclass fields as a dict with an extra ``valid`` entry."""
        return {**asdict(self), "valid": self.valid}
def validate_competence_file(
    path: str | Path,
    *,
    repo_root: str | Path | None = None,
) -> CompetenceValidationReport:
    """Validate one competence YAML file and report every issue found.

    Args:
        path: Location of the competence YAML file.
        repo_root: Root directory handed to the checks that resolve other
            files; defaults to ``REPO_ROOT``.

    Returns:
        A report for ``path``. A loading problem (missing file, unreadable
        file, invalid YAML, non-mapping root) yields a report with exactly one
        issue and skips every other check. Otherwise the issues collected by
        all checks are returned after passing through ``_dedupe_issues``.
    """
    competence_path = Path(path)
    root = REPO_ROOT if repo_root is None else Path(repo_root)

    def fatal(code: str, detail: str) -> CompetenceValidationReport:
        # Loading failures short-circuit: there is no data left to check.
        return CompetenceValidationReport(
            str(competence_path),
            (CompetenceValidationIssue(code, detail),),
        )

    try:
        text = competence_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return fatal("file_missing", f"{competence_path} does not exist")
    except (OSError, UnicodeDecodeError) as exc:
        # Directories, permission problems and non-UTF-8 content used to
        # escape as raw exceptions; report them like any other load failure.
        return fatal("file_unreadable", f"{competence_path} could not be read: {exc}")

    try:
        data = yaml.safe_load(text)
    except yaml.YAMLError as exc:
        return fatal("yaml_invalid", str(exc))
    if not isinstance(data, dict):
        return fatal("schema_type", "root YAML node must be a mapping")

    issues: list[CompetenceValidationIssue] = []
    _validate_required_shape(data, competence_path, issues)
    _validate_promotion_state(data, issues)
    _validate_t2_known_gaps(data, issues)
    _validate_methods_execution(data, issues)
    _validate_no_durable_coordinates(data, issues)
    _validate_failure_message_template(data, issues)
    _validate_preconditions(data, root, issues)
    _validate_methods_and_trace(data, root, issues)
    _validate_success_marker(data, root, issues)
    _validate_chain_refs(data, root, issues)
    return CompetenceValidationReport(str(competence_path), _dedupe_issues(issues))
def validate_primitive_file(
    path: str | Path,
    *,
    repo_root: str | Path | None = None,
) -> CompetenceValidationReport:
    """Validate one primitive YAML file and report every issue found.

    Args:
        path: Location of the primitive YAML file.
        repo_root: Accepted for signature parity with
            ``validate_competence_file``; none of the primitive checks use it.

    Returns:
        A report for ``path``. A loading problem (missing file, unreadable
        file, invalid YAML, non-mapping root) yields a report with exactly one
        issue and skips every other check. Otherwise the issues collected by
        all checks are returned after passing through ``_dedupe_issues``.
    """
    primitive_path = Path(path)

    def fatal(code: str, detail: str) -> CompetenceValidationReport:
        # Loading failures short-circuit: there is no data left to check.
        return CompetenceValidationReport(
            str(primitive_path),
            (CompetenceValidationIssue(code, detail),),
        )

    try:
        text = primitive_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        return fatal("file_missing", f"{primitive_path} does not exist")
    except (OSError, UnicodeDecodeError) as exc:
        # Directories, permission problems and non-UTF-8 content used to
        # escape as raw exceptions; report them like any other load failure.
        return fatal("file_unreadable", f"{primitive_path} could not be read: {exc}")

    try:
        data = yaml.safe_load(text)
    except yaml.YAMLError as exc:
        return fatal("yaml_invalid", str(exc))
    if not isinstance(data, dict):
        return fatal("schema_type", "root YAML node must be a mapping")

    issues: list[CompetenceValidationIssue] = []
    _validate_primitive_required_shape(data, primitive_path, issues)
    _validate_primitive_parameters_schema(data, issues)
    _validate_no_durable_coordinates(data, issues)
    _validate_failure_message_template(data, issues)
    return CompetenceValidationReport(str(primitive_path), _dedupe_issues(issues))
def validate_file(
    path: str | Path,
    *,
    repo_root: str | Path | None = None,
) -> CompetenceValidationReport:
    """Validate ``path`` with the primitive or the competence validator.

    The file is treated as a primitive when ``_is_primitive_path`` says so for
    the resolved root; every other file is validated as a competence.
    ``repo_root`` defaults to ``REPO_ROOT``.
    """
    root = REPO_ROOT if repo_root is None else Path(repo_root)
    candidate_path = Path(path)
    validator = (
        validate_primitive_file
        if _is_primitive_path(candidate_path, root)
        else validate_competence_file
    )
    return validator(candidate_path, repo_root=root)
def _validate_required_shape(
    data: dict[str, Any],
    competence_path: Path,
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check the top-level keys and basic value types of a competence mapping.

    Appends one issue per problem to ``issues``: missing required keys, a
    ``schema_version`` other than 1, an ``id`` that is not a lowercase slug or
    does not match the filename stem, a non-positive ``version``, an unknown
    ``learning_state``, and top-level sections of the wrong container type.
    """
    for key in sorted(REQUIRED_TOP_LEVEL_KEYS.difference(data)):
        issues.append(CompetenceValidationIssue("missing_key", f"missing top-level key: {key}"))

    schema_version = data.get("schema_version")
    # bool is a subclass of int and True == 1, so reject it explicitly.
    if isinstance(schema_version, bool) or schema_version != 1:
        issues.append(CompetenceValidationIssue("schema_version", "schema_version must be 1"))

    competence_id = data.get("id")
    if not isinstance(competence_id, str) or not re.fullmatch(r"[a-z][a-z0-9_]*", competence_id):
        issues.append(CompetenceValidationIssue("id_invalid", "id must be a lowercase slug"))
    elif competence_id != competence_path.stem:
        issues.append(
            CompetenceValidationIssue(
                "id_filename_mismatch",
                f"id must match filename stem: id={competence_id!r} filename={competence_path.stem!r}",
            )
        )

    version = data.get("version")
    # Same bool guard as above: ``version: true`` is not a positive integer.
    if not isinstance(version, int) or isinstance(version, bool) or version < 1:
        issues.append(CompetenceValidationIssue("version_invalid", "version must be a positive integer"))

    state = data.get("learning_state")
    # The isinstance check keeps unhashable YAML values (lists, mappings) from
    # raising TypeError in the set membership test.
    if not isinstance(state, str) or state not in LEARNING_STATES:
        issues.append(
            CompetenceValidationIssue(
                "learning_state_invalid",
                f"learning_state must be one of {sorted(LEARNING_STATES)}",
            )
        )

    # Container types are only checked for keys that are present; absent keys
    # were already reported as missing above.
    for key in ("intent", "parameters", "success_marker", "failure_message_template", "chain_refs", "promotion"):
        if key in data and not isinstance(data.get(key), dict):
            issues.append(CompetenceValidationIssue("mapping_expected", f"{key} must be a mapping"))
    for key in ("preconditions", "methods"):
        if key in data and not isinstance(data.get(key), list):
            issues.append(CompetenceValidationIssue("list_expected", f"{key} must be a list"))
def _validate_primitive_required_shape(
    data: dict[str, Any],
    primitive_path: Path,
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check the top-level keys and basic value types of a primitive mapping.

    Appends one issue per problem to ``issues``: missing required keys,
    competence-only keys that a primitive must not define, and invalid values
    for ``schema_version``, ``id``, ``kind``, ``marker_or_action``,
    ``version``, ``intent.fr`` and the optional ``executor_kind``, ``notes``
    and ``last_updated_at`` fields.
    """

    def invalid(detail: str) -> None:
        issues.append(CompetenceValidationIssue("primitive_file_invalid", detail))

    for key in sorted(PRIMITIVE_REQUIRED_TOP_LEVEL_KEYS.difference(data)):
        issues.append(CompetenceValidationIssue("primitive_missing_key", f"missing primitive key: {key}"))
    for key in sorted(PRIMITIVE_FORBIDDEN_FIELDS.intersection(data)):
        issues.append(CompetenceValidationIssue("primitive_forbidden_field", f"primitive must not define {key}"))

    schema_version = data.get("schema_version")
    # bool is a subclass of int and True == 1, so reject it explicitly.
    if isinstance(schema_version, bool) or schema_version != 1:
        invalid("schema_version must be 1")

    primitive_id = data.get("id")
    if not isinstance(primitive_id, str) or not re.fullmatch(r"[a-z][a-z0-9_]*", primitive_id):
        invalid("id must be a lowercase slug")
    elif primitive_id != primitive_path.stem:
        issues.append(
            CompetenceValidationIssue(
                "primitive_id_filename_mismatch",
                f"id must match filename stem: id={primitive_id!r} filename={primitive_path.stem!r}",
            )
        )

    if data.get("kind") != "primitive":
        invalid("kind must be primitive")
    # Tuple membership compares with ==, so an unhashable YAML value (list,
    # mapping) is reported instead of raising TypeError as a set lookup would.
    if data.get("marker_or_action") not in ("action", "marker"):
        invalid("marker_or_action must be action or marker")

    version = data.get("version")
    # Same bool guard as above: ``version: true`` is not a positive integer.
    if not isinstance(version, int) or isinstance(version, bool) or version < 1:
        invalid("version must be a positive integer")

    intent = data.get("intent")
    if not isinstance(intent, dict) or not isinstance(intent.get("fr"), str) or not intent.get("fr", "").strip():
        invalid("intent.fr must be non-empty text")

    # Optional fields are only checked when present.
    if "executor_kind" in data:
        executor_kind = data.get("executor_kind")
        if not isinstance(executor_kind, str) or not executor_kind.strip():
            invalid("executor_kind must be non-empty text")
    if "notes" in data and not _is_string_list(data.get("notes")):
        invalid("notes must be a non-empty text list")
    if "last_updated_at" in data and not isinstance(data.get("last_updated_at"), str):
        invalid("last_updated_at must be text")
def _validate_primitive_parameters_schema(
    data: dict[str, Any],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check every entry of a primitive's ``parameters_schema``.

    ``parameters_schema`` must be a non-empty mapping of lowercase-slug
    parameter names to spec mappings. For each spec this checks ``type``,
    ``required``, the ``required``/``default`` combination,
    ``required_unless`` (including that it names parameters defined in the
    same schema), ``description`` and the optional ``constraints`` mapping.
    One issue is appended to ``issues`` per problem.
    """
    schema = data.get("parameters_schema")
    if not isinstance(schema, dict) or not schema:
        issues.append(CompetenceValidationIssue("primitive_file_invalid", "parameters_schema must be a non-empty mapping"))
        return

    def param_invalid(detail: str) -> None:
        issues.append(CompetenceValidationIssue("primitive_param_schema_invalid", detail))

    for param_name, spec in schema.items():
        if not isinstance(param_name, str) or not re.fullmatch(r"[a-z][a-z0-9_]*", param_name):
            param_invalid("parameter names must be lowercase slugs")
            continue
        if not isinstance(spec, dict):
            param_invalid(f"{param_name} schema must be a mapping")
            continue

        param_type = spec.get("type")
        # The isinstance check keeps unhashable YAML values such as
        # ``type: [str, int]`` from raising TypeError in the set lookup.
        if not isinstance(param_type, str) or param_type not in PRIMITIVE_PARAMETER_TYPES:
            param_invalid(f"{param_name}.type must be one of {sorted(PRIMITIVE_PARAMETER_TYPES)}")

        required = spec.get("required")
        if required is not None and not isinstance(required, bool):
            param_invalid(f"{param_name}.required must be bool")
        if required is True and "default" in spec:
            param_invalid(f"{param_name} cannot define default when required=true")

        required_unless = spec.get("required_unless")
        if required_unless is not None:
            if not _is_string_list(required_unless):
                param_invalid(f"{param_name}.required_unless must be a non-empty text list")
            else:
                missing_refs = [name for name in required_unless if name not in schema]
                if missing_refs:
                    param_invalid(f"{param_name}.required_unless references unknown parameters: {missing_refs}")

        description = spec.get("description")
        if not isinstance(description, str) or not description.strip():
            param_invalid(f"{param_name}.description is required")

        constraints = spec.get("constraints")
        if constraints is None:
            continue
        if not isinstance(constraints, dict):
            param_invalid(f"{param_name}.constraints must be a mapping")
            continue

        # Constraint problems use the "primitive_schema_invalid" code, as in
        # the original implementation.
        enum = constraints.get("enum")
        if enum is not None and (not isinstance(enum, list) or not enum):
            issues.append(
                CompetenceValidationIssue(
                    "primitive_schema_invalid",
                    f"parameters_schema.{param_name}.constraints.enum must be a non-empty list",
                )
            )
        for min_key in ("min", "min_value"):
            min_value = constraints.get(min_key)
            # bool is a subclass of int, so it is rejected explicitly.
            if min_value is not None and (not isinstance(min_value, int) or isinstance(min_value, bool)):
                issues.append(
                    CompetenceValidationIssue(
                        "primitive_schema_invalid",
                        f"parameters_schema.{param_name}.constraints.{min_key} must be an integer",
                    )
                )
def _validate_promotion_state(data: dict[str, Any], issues: list[CompetenceValidationIssue]) -> None:
    """Check that ``learning_state`` is backed by the evidence it requires.

    Nothing is checked when ``learning_state`` is not a known state. For a
    known state:

    * ``promotion.stable_requires.min_successes``, when set, must be an
      integer of at least 3 (checked for every state);
    * ``candidate`` requires
      ``chain_refs.cleaned_segment.status == "documented_offline"``;
    * ``supervised`` requires non-empty ``generalisation.seen_contexts`` and
      non-empty ``promotion.history``;
    * ``stable`` requires at least 3 seen contexts with at least 3 distinct
      signatures according to ``_distinct_context_signatures``.

    Sections of the wrong type are treated as empty.
    """

    def as_dict(value: Any) -> dict[str, Any]:
        return value if isinstance(value, dict) else {}

    def as_list(value: Any) -> list[Any]:
        return value if isinstance(value, list) else []

    def premature(detail: str) -> None:
        issues.append(CompetenceValidationIssue("learning_state_premature", detail))

    state = data.get("learning_state")
    # The isinstance check keeps unhashable YAML values (lists, mappings) from
    # raising TypeError in the set membership test.
    if not isinstance(state, str) or state not in LEARNING_STATES:
        return

    promotion = as_dict(data.get("promotion"))
    min_successes = as_dict(promotion.get("stable_requires")).get("min_successes")
    if min_successes is not None and (not isinstance(min_successes, int) or min_successes < 3):
        issues.append(
            CompetenceValidationIssue(
                "promotion_stable_requires",
                "promotion.stable_requires.min_successes must be at least 3",
            )
        )

    if state == "observed":
        return

    if state == "candidate":
        cleaned = as_dict(as_dict(data.get("chain_refs")).get("cleaned_segment"))
        if cleaned.get("status") != "documented_offline":
            premature("candidate requires chain_refs.cleaned_segment.status=documented_offline")
        return

    seen_contexts = as_list(as_dict(data.get("generalisation")).get("seen_contexts"))
    if state == "supervised":
        history = as_list(promotion.get("history"))
        if not seen_contexts or not history:
            premature("supervised requires seen contexts and promotion.history")
        return

    # Only "stable" remains. The signature helper is consulted only when the
    # raw count already reaches the threshold.
    if len(seen_contexts) < 3 or len(_distinct_context_signatures(seen_contexts)) < 3:
        premature("stable requires at least 3 distinct seen contexts")
def _validate_t2_known_gaps(data: dict[str, Any], issues: list[CompetenceValidationIssue]) -> None:
    """Check the optional ``promotion.t2_known_gaps`` list.

    Nothing is checked when ``promotion`` is not a mapping or the key is
    absent. Otherwise the value must be a list of mappings; each mapping needs
    non-empty text for ``id``, ``description``, ``impact`` and
    ``proposed_resolution``, a lowercase-slug ``id``, and non-empty text for
    ``acted_by`` / ``acted_at`` when those keys are present.
    """
    promotion = data.get("promotion")
    gaps = promotion.get("t2_known_gaps") if isinstance(promotion, dict) else None
    if gaps is None:
        return

    def report(detail: str) -> None:
        issues.append(CompetenceValidationIssue("t2_known_gap_invalid", detail))

    if not isinstance(gaps, list):
        report("promotion.t2_known_gaps must be a list")
        return

    mandatory_fields = ("id", "description", "impact", "proposed_resolution")
    optional_fields = ("acted_by", "acted_at")
    for position, entry in enumerate(gaps):
        if not isinstance(entry, dict):
            report(f"promotion.t2_known_gaps[{position}] must be a mapping")
            continue

        # A blank or non-text id is reported by the mandatory-field loop
        # below; only a non-blank text id is checked against the slug pattern.
        identifier = entry.get("id")
        if (
            isinstance(identifier, str)
            and identifier.strip()
            and re.fullmatch(r"[a-z][a-z0-9_]*", identifier) is None
        ):
            report(f"promotion.t2_known_gaps[{position}].id must be a lowercase slug")

        for field_name in mandatory_fields:
            field_value = entry.get(field_name)
            if not (isinstance(field_value, str) and field_value.strip()):
                report(f"promotion.t2_known_gaps[{position}].{field_name} is required")

        for field_name in optional_fields:
            if field_name not in entry:
                continue
            field_value = entry.get(field_name)
            if not (isinstance(field_value, str) and field_value.strip()):
                report(f"promotion.t2_known_gaps[{position}].{field_name} must be non-empty text when present")
def _validate_methods_execution(data: dict[str, Any], issues: list[CompetenceValidationIssue]) -> None:
    """Check ``methods_execution`` and the trace indices of observed methods.

    ``methods_execution`` defaults to ``"alternatives"`` and must be one of
    ``METHODS_EXECUTION_MODES``. In ``"sequence"`` mode there must be at least
    two methods, method ids must be unique, and every observed method must
    carry ``trace_event_indices`` that come after those of the previous
    observed method. In both modes, ``trace_event_indices`` of an observed
    method must be a non-empty integer list contained in the cleaned keep and
    method index sets, when those are available.
    """
    mode = data.get("methods_execution", "alternatives")
    # The isinstance check keeps unhashable YAML values (lists, mappings) from
    # raising TypeError in the set membership test.
    if not isinstance(mode, str) or mode not in METHODS_EXECUTION_MODES:
        issues.append(
            CompetenceValidationIssue(
                "methods_sequence_invalid",
                f"methods_execution must be one of {sorted(METHODS_EXECUTION_MODES)}",
            )
        )
        return

    methods = data.get("methods")
    if not isinstance(methods, list):
        return

    sequential = mode == "sequence"
    if sequential and len(methods) < 2:
        issues.append(
            CompetenceValidationIssue(
                "methods_sequence_invalid",
                "methods_execution=sequence requires at least two methods",
            )
        )
        return

    keep_indices = _cleaned_keep_indices(data)
    method_indices = _cleaned_method_indices(data)
    # Index problems are reported under a mode-specific code.
    trace_issue_code = "methods_sequence_invalid" if sequential else "method_trace_missing"
    seen_ids: set[str] = set()
    last_trace_index = -1

    for index, method in enumerate(methods):
        if not isinstance(method, dict):
            continue

        method_id = method.get("id")
        if sequential and isinstance(method_id, str) and method_id.strip():
            if method_id in seen_ids:
                issues.append(
                    CompetenceValidationIssue(
                        "methods_sequence_invalid",
                        f"methods[{index}].id must be unique in sequence mode",
                    )
                )
            seen_ids.add(method_id)

        if method.get("observed") is not True:
            continue

        trace_indices = method.get("trace_event_indices")
        # Outside sequence mode the indices are optional.
        if trace_indices is None and not sequential:
            continue
        if not _is_int_list(trace_indices):
            issues.append(
                CompetenceValidationIssue(
                    trace_issue_code,
                    f"methods[{index}].trace_event_indices must be a non-empty integer list",
                )
            )
            continue

        if keep_indices is not None:
            missing_keep_indices = [event_index for event_index in trace_indices if event_index not in keep_indices]
            if missing_keep_indices:
                issues.append(
                    CompetenceValidationIssue(
                        trace_issue_code,
                        f"methods[{index}].trace_event_indices must be included in keep_event_indices: {missing_keep_indices}",
                    )
                )
        if method_indices is not None:
            missing_method_indices = [event_index for event_index in trace_indices if event_index not in method_indices]
            if missing_method_indices:
                issues.append(
                    CompetenceValidationIssue(
                        trace_issue_code,
                        f"methods[{index}].trace_event_indices must be included in method_event_indices: {missing_method_indices}",
                    )
                )

        if sequential:
            # Each observed step must start after the highest index used so far.
            if min(trace_indices) <= last_trace_index:
                issues.append(
                    CompetenceValidationIssue(
                        "methods_sequence_invalid",
                        f"methods[{index}].trace_event_indices must follow previous observed step",
                    )
                )
            last_trace_index = max(last_trace_index, max(trace_indices))
def _validate_no_durable_coordinates(data: Any, issues: list[CompetenceValidationIssue], path: str = "") -> None:
    """Walk nested mappings and lists and report coordinate-like mapping keys.

    A key is reported when its lowercased text is in
    ``BLOCKED_DURABLE_COORDINATE_KEYS``, except ``x_pct`` / ``y_pct`` directly
    under a path ending in ``relative_offset``. ``path`` is the dotted /
    indexed location of ``data`` and is used in the issue detail. Values that
    are neither a mapping nor a list are ignored.
    """
    if isinstance(data, list):
        for position, element in enumerate(data):
            _validate_no_durable_coordinates(element, issues, f"{path}[{position}]")
        return
    if not isinstance(data, dict):
        return

    under_relative_offset = path.endswith("relative_offset")
    for raw_key, child in data.items():
        key_text = str(raw_key)
        child_path = key_text if not path else f"{path}.{key_text}"
        lowered = key_text.lower()
        exempt = under_relative_offset and lowered in {"x_pct", "y_pct"}
        if not exempt and lowered in BLOCKED_DURABLE_COORDINATE_KEYS:
            issues.append(
                CompetenceValidationIssue(
                    "durable_coordinate_key",
                    f"durable competence data must not store coordinates: {child_path}",
                )
            )
        # Recurse even under a reported key so nested offenders are listed too.
        _validate_no_durable_coordinates(child, issues, child_path)
def _validate_failure_message_template(
    data: dict[str, Any],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check ``failure_message_template`` and, when possible, the message contract.

    Nothing is checked when the template is not a mapping. Each of
    ``intention``, ``attendu``, ``vu`` and ``demande`` must be non-empty text.
    When all four are text and ``format_supervised_pause_message`` is
    available, the template is rendered with a sample value substituted for
    ``{observed_human_state}``, and a ``MessageContractError`` is reported as
    a ``failure_message_contract`` issue.
    """
    template = data.get("failure_message_template")
    if not isinstance(template, dict):
        return

    required = ("intention", "attendu", "vu", "demande")
    for key in required:
        value = template.get(key)
        if not isinstance(value, str) or not value.strip():
            issues.append(
                CompetenceValidationIssue(
                    "failure_message_template",
                    f"failure_message_template.{key} must be non-empty text",
                )
            )

    # Skip the contract check unless every field is text. Checking only key
    # presence let a present but non-text value (e.g. ``vu: 42`` or
    # ``vu: null``) reach ``.replace`` below and raise AttributeError.
    # Missing keys read as None here, so they are covered as well.
    if any(not isinstance(template.get(key), str) for key in required):
        return
    if format_supervised_pause_message is None:
        return

    try:
        format_supervised_pause_message(
            intention=template["intention"],
            attendu=template["attendu"],
            vu=template["vu"].replace(
                "{observed_human_state}",
                "la fenetre attendue n'est pas visible",
            ),
            demande=template["demande"],
        )
    except MessageContractError as exc:
        issues.append(
            CompetenceValidationIssue(
                "failure_message_contract",
                str(exc),
            )
        )
def _validate_preconditions(
    data: dict[str, Any],
    repo_root: Path,
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check ``competence_required`` entries in ``preconditions``.

    Entries that are not mappings, or whose ``kind`` is not
    ``"competence_required"``, are ignored. For the others, ``competence``
    must be non-empty text and must not equal this competence's own ``id``,
    ``state`` must be a known learning state, and the path returned by
    ``_find_competence_dependency_path`` must be an existing file.
    """
    preconditions = data.get("preconditions")
    if not isinstance(preconditions, list):
        return

    def dependency_invalid(detail: str) -> None:
        issues.append(CompetenceValidationIssue("competence_dependency_invalid", detail))

    competence_id = data.get("id")
    for index, precondition in enumerate(preconditions):
        if not isinstance(precondition, dict):
            continue
        if precondition.get("kind") != "competence_required":
            continue

        dependency = precondition.get("competence")
        state = precondition.get("state")
        if not isinstance(dependency, str) or not dependency.strip():
            dependency_invalid(f"preconditions[{index}].competence must be non-empty text")
            continue
        # A self-dependency is reported, but the remaining checks still run.
        if dependency == competence_id:
            dependency_invalid(f"preconditions[{index}] must not depend on itself")
        # The isinstance check keeps unhashable YAML values (lists, mappings)
        # from raising TypeError in the set membership test.
        if not isinstance(state, str) or state not in LEARNING_STATES:
            dependency_invalid(f"preconditions[{index}].state must be one of {sorted(LEARNING_STATES)}")
            continue

        dependency_path = _find_competence_dependency_path(
            repo_root,
            dependency,
            minimum_state=str(state),
        )
        if not dependency_path.is_file():
            issues.append(
                CompetenceValidationIssue(
                    "competence_dependency_missing",
                    f"required competence not found: {dependency} with minimum state {state}",
                )
            )
def _validate_methods_and_trace(
    data: dict[str, Any],
    repo_root: Path,
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check every method entry and, for observed methods, its trace evidence.

    ``methods`` must be a non-empty list of mappings, each with non-empty text
    ``id`` and ``kind``. Any ``primitive_ref`` is checked through
    ``_validate_method_primitive_ref``. Observed methods of kind
    ``key_combo``, ``text_input``, ``scroll``, ``click`` and ``wait_state``
    must name a ``trace_source`` and, when source events could be loaded, must
    point at events that match the method kind.
    """
    methods = data.get("methods")
    if not isinstance(methods, list):
        return
    if not methods:
        issues.append(CompetenceValidationIssue("methods_empty", "at least one method is required"))
        return

    source_events = _load_source_events(data, repo_root, issues)
    keep_indices = _cleaned_keep_indices(data)

    for index, method in enumerate(methods):
        if not isinstance(method, dict):
            issues.append(CompetenceValidationIssue("method_invalid", f"methods[{index}] must be a mapping"))
            continue
        for key in ("id", "kind"):
            if not isinstance(method.get(key), str) or not method.get(key, "").strip():
                issues.append(CompetenceValidationIssue("method_invalid", f"methods[{index}].{key} is required"))

        kind = method.get("kind")
        _validate_method_primitive_ref(method, kind, index, repo_root, issues)
        observed = method.get("observed") is True

        if kind == "key_combo":
            # The key list is checked even when the method is not observed.
            keys = _method_key_combo_keys(method)
            if not _is_string_list(keys):
                issues.append(CompetenceValidationIssue("method_keys_invalid", f"methods[{index}].keys must be text list"))
                continue
            if observed:
                if not method.get("trace_source"):
                    issues.append(CompetenceValidationIssue("method_trace_source", f"methods[{index}] missing trace_source"))
                # key_combo falls back to the keep indices, not the method indices.
                trace_indices = _method_trace_indices(method) or keep_indices
                if source_events is not None and not _trace_has_key_combo(source_events, trace_indices, keys):
                    issues.append(
                        CompetenceValidationIssue(
                            "method_trace_missing",
                            f"observed key_combo {keys!r} not found in cleaned source segment",
                        )
                    )
            continue

        # Remaining trace checks apply only to observed methods of these kinds.
        # Tuple membership compares with ==, so a non-text kind is harmless.
        if not observed or kind not in ("text_input", "scroll", "click", "wait_state"):
            continue

        # Shared prelude for the four kinds.
        if not method.get("trace_source"):
            issues.append(CompetenceValidationIssue("method_trace_source", f"methods[{index}] missing trace_source"))
        method_indices = _method_trace_indices(method) or _cleaned_method_indices(data)
        if source_events is None:
            continue
        if not method_indices:
            # text_input keeps its original, shorter wording.
            requirement = (
                "method_event_indices"
                if kind == "text_input"
                else "trace_event_indices or method_event_indices"
            )
            issues.append(
                CompetenceValidationIssue(
                    "method_trace_missing",
                    f"observed {kind} method {method.get('id') or index} requires {requirement}",
                )
            )
            continue

        if kind == "scroll":
            _validate_scroll_method_trace(method, index, source_events, method_indices, issues)
        elif kind == "click":
            _validate_click_method_trace(index, source_events, method_indices, issues)
        elif kind == "wait_state":
            _validate_wait_state_method_trace(index, source_events, method_indices, issues)
        else:
            _check_text_input_trace(method, source_events, method_indices, issues)


def _check_text_input_trace(
    method: dict[str, Any],
    source_events: list[dict[str, Any]],
    method_indices: list[int],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check that ``method_indices`` point at text_input events matching the method."""
    event_count = len(source_events)
    # Negative indices count as out of range: Python would otherwise resolve
    # them from the end of the list and check an unrelated event.
    non_text_indices = [
        event_index
        for event_index in method_indices
        if not (0 <= event_index < event_count)
        or source_events[event_index].get("type") != "text_input"
    ]
    if non_text_indices:
        issues.append(
            CompetenceValidationIssue(
                "method_trace_missing",
                f"method_event_indices contain non text_input events: {non_text_indices}",
            )
        )
        return

    reconstructed = method.get("reconstructed_text")
    if not isinstance(reconstructed, str):
        return
    observed_text = _concat_text_input_events(source_events, method_indices)
    if observed_text != reconstructed:
        issues.append(
            CompetenceValidationIssue(
                "method_reconstructed_text_mismatch",
                f"reconstructed_text={reconstructed!r} trace_text={observed_text!r}",
            )
        )
def _validate_method_primitive_ref(
    method: dict[str, Any],
    method_kind: Any,
    method_index: int,
    repo_root: Path,
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check a method's optional ``primitive_ref`` against the primitive file.

    Nothing is checked when the method has no ``primitive_ref``. Otherwise the
    reference must be a lowercase slug naming an existing, valid file under
    ``PRIMITIVES_DIR``. The method ``kind`` must equal the primitive's
    ``executor_kind`` when that is text, and the method ``parameters`` are
    checked against the primitive's ``parameters_schema``, with extra checks
    for the ``click_anchor`` and ``wait_for_state`` primitives.
    """
    primitive_ref = method.get("primitive_ref")
    if primitive_ref is None:
        return

    def report(code: str, detail: str) -> None:
        issues.append(CompetenceValidationIssue(code, detail))

    is_slug = isinstance(primitive_ref, str) and re.fullmatch(r"[a-z][a-z0-9_]*", primitive_ref)
    if not is_slug:
        report("primitive_ref_invalid", f"methods[{method_index}].primitive_ref must be a lowercase slug")
        return

    primitive_path = repo_root / PRIMITIVES_DIR / f"{primitive_ref}.yaml"
    if not primitive_path.is_file():
        report(
            "primitive_ref_unknown",
            f"primitive_ref={primitive_ref!r}: file not found: {primitive_path.relative_to(repo_root)}",
        )
        return

    # Every problem in the primitive file is surfaced under one code, with the
    # original code kept inside the detail text.
    primitive_report = validate_primitive_file(primitive_path, repo_root=repo_root)
    if not primitive_report.valid:
        issues.extend(
            CompetenceValidationIssue(
                "primitive_file_invalid",
                f"primitive_ref={primitive_ref!r}: {found.code}: {found.detail}",
            )
            for found in primitive_report.issues
        )
        return

    primitive = _read_yaml_mapping(primitive_path, issues)
    if primitive is None:
        return

    expected_kind = primitive.get("executor_kind")
    if isinstance(expected_kind, str) and method_kind != expected_kind:
        report(
            "primitive_kind_mismatch",
            f"primitive_ref={primitive_ref!r} requires kind={expected_kind!r}, got kind={method_kind!r}",
        )

    # A missing ``parameters`` key is treated as an empty mapping.
    parameters = method.get("parameters")
    if parameters is None:
        parameters = {}
    if not isinstance(parameters, dict):
        report(
            "primitive_schema_invalid",
            f"primitive_ref={primitive_ref!r} requires methods[{method_index}].parameters to be a mapping",
        )
        return

    schema = primitive.get("parameters_schema")
    if not isinstance(schema, dict):
        return
    for param_name, spec in schema.items():
        if isinstance(spec, dict):
            _validate_primitive_method_parameter(primitive_ref, param_name, spec, parameters, issues)

    if primitive_ref == "click_anchor":
        _validate_click_anchor_parameters(parameters, issues)
    elif primitive_ref == "wait_for_state":
        _validate_wait_for_state_parameters(parameters, issues)
def _validate_scroll_method_trace(
    method: dict[str, Any],
    method_index: int,
    events: list[dict[str, Any]],
    indices: list[int],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check that each index points at a usable ``mouse_scroll`` event.

    For every index in ``indices`` the event must exist, have type
    ``mouse_scroll``, carry a ``delta`` accepted by ``_is_scroll_delta`` and,
    when ``_method_scroll_direction`` returns text, match that direction. The
    first failing condition for an index is reported and the rest are skipped
    for that index.
    """
    direction = _method_scroll_direction(method)
    event_count = len(events)
    for event_index in indices:
        # Negative indices count as out of range: Python would otherwise
        # resolve them from the end of the list and check an unrelated event.
        in_range = 0 <= event_index < event_count
        observed_type = events[event_index].get("type") if in_range else None
        if not in_range or observed_type != "mouse_scroll":
            issues.append(
                CompetenceValidationIssue(
                    "method_trace_missing",
                    f"methods[{method_index}] expects type=mouse_scroll, got type={observed_type!r} at event #{event_index}",
                )
            )
            continue

        delta = events[event_index].get("delta")
        if not _is_scroll_delta(delta):
            issues.append(
                CompetenceValidationIssue(
                    "method_scroll_delta_missing",
                    f"methods[{method_index}] points event #{event_index} type=mouse_scroll without usable delta field",
                )
            )
            continue

        if isinstance(direction, str) and not _scroll_delta_matches_direction(delta, direction):
            issues.append(
                CompetenceValidationIssue(
                    "method_scroll_direction_mismatch",
                    f"methods[{method_index}] direction={direction!r} does not match delta={delta!r} at event #{event_index}",
                )
            )
def _validate_click_method_trace(
    method_index: int,
    events: list[dict[str, Any]],
    indices: list[int],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Report every index that does not point at an existing ``mouse_click`` event."""
    event_count = len(events)
    for event_index in indices:
        # Negative indices count as out of range: Python would otherwise
        # resolve them from the end of the list and check an unrelated event.
        in_range = 0 <= event_index < event_count
        observed_type = events[event_index].get("type") if in_range else None
        if in_range and observed_type == "mouse_click":
            continue
        issues.append(
            CompetenceValidationIssue(
                "method_trace_missing",
                f"methods[{method_index}] expects type=mouse_click, got type={observed_type!r} at event #{event_index}",
            )
        )
def _validate_wait_state_method_trace(
    method_index: int,
    events: list[dict[str, Any]],
    indices: list[int],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Report every index that lacks wait_state evidence.

    An event counts as evidence when its type is ``window_focus_change`` or
    ``heartbeat`` and ``_event_title`` or ``_event_process`` returns a truthy
    value for it. Out-of-range indices are reported with ``type=None``.
    """
    event_count = len(events)
    for event_index in indices:
        # Negative indices count as out of range: Python would otherwise
        # resolve them from the end of the list and check an unrelated event.
        event = events[event_index] if 0 <= event_index < event_count else {}
        event_type = event.get("type")
        # Tuple membership compares with ==, so any event type value is safe.
        if event_type in ("window_focus_change", "heartbeat") and (
            _event_title(event) or _event_process(event)
        ):
            continue
        issues.append(
            CompetenceValidationIssue(
                "method_trace_missing",
                f"methods[{method_index}] expects durable wait_state evidence, got type={event_type!r} at event #{event_index}",
            )
        )
def _validate_click_anchor_parameters(
    parameters: dict[str, Any],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check the parameters of a method that references ``click_anchor``.

    ``anchor_ref`` must be non-blank text or a non-empty mapping.
    ``click_count`` defaults to 1 and must be the integer 1 or 2 (booleans
    are rejected). ``relative_offset`` is checked only when present.
    """
    anchor_ref = parameters.get("anchor_ref")
    if isinstance(anchor_ref, str):
        anchor_ok = bool(anchor_ref.strip())
    elif isinstance(anchor_ref, dict):
        anchor_ok = bool(anchor_ref)
    else:
        anchor_ok = False
    if not anchor_ok:
        issues.append(
            CompetenceValidationIssue(
                "primitive_anchor_ref_invalid",
                "click_anchor requires anchor_ref as non-empty string or mapping",
            )
        )

    click_count = parameters.get("click_count", 1)
    count_ok = (
        isinstance(click_count, int)
        and not isinstance(click_count, bool)
        and 1 <= click_count <= 2
    )
    if not count_ok:
        issues.append(
            CompetenceValidationIssue(
                "primitive_click_count_out_of_range",
                "click_anchor click_count must be 1 or 2",
            )
        )

    if "relative_offset" in parameters:
        _validate_click_relative_offset(parameters.get("relative_offset"), issues)
def _validate_click_relative_offset(
    offset: Any,
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check the ``relative_offset`` parameter of ``click_anchor``.

    The offset must be a mapping with exactly the keys ``x_pct``/``y_pct``
    (each within 0.0..1.0) or exactly ``dx``/``dy`` (each within -0.5..0.5),
    as judged by ``_is_number_in_range``. At most one issue is appended.
    """

    def report(detail: str) -> None:
        issues.append(CompetenceValidationIssue("primitive_relative_offset_invalid", detail))

    if not isinstance(offset, dict):
        report("click_anchor relative_offset must be a mapping")
        return

    # (key pair, lower bound, upper bound, detail when a value is out of range)
    accepted_shapes = (
        (
            ("x_pct", "y_pct"),
            0.0,
            1.0,
            "click_anchor relative_offset x_pct/y_pct must be numbers between 0.0 and 1.0",
        ),
        (
            ("dx", "dy"),
            -0.5,
            0.5,
            "click_anchor relative_offset dx/dy must be numbers between -0.5 and 0.5",
        ),
    )
    present = set(offset.keys())
    for names, low, high, range_detail in accepted_shapes:
        if present != set(names):
            continue
        if any(not _is_number_in_range(offset[name], low, high) for name in names):
            report(range_detail)
        return

    report("click_anchor relative_offset must use exactly x_pct/y_pct or dx/dy")
def _validate_wait_for_state_parameters(
    parameters: dict[str, Any],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Check the parameters given to a ``wait_for_state`` primitive.

    ``expected_state`` must be a non-empty mapping; ``timeout_ms``
    (default 5000) must be an integer in 100..60000 and ``poll_interval_ms``
    (default 250) an integer in 50..5000. Each violation appends its own
    issue to ``issues``.
    """
    expected_state = parameters.get("expected_state")
    if not (isinstance(expected_state, dict) and expected_state):
        issues.append(
            CompetenceValidationIssue(
                "primitive_expected_state_invalid",
                "wait_for_state expected_state must be a non-empty mapping",
            )
        )

    # (parameter, default, minimum, maximum, issue code, issue detail)
    bounded_parameters = (
        (
            "timeout_ms",
            5000,
            100,
            60000,
            "primitive_wait_timeout_invalid",
            "wait_for_state timeout_ms must be an integer between 100 and 60000",
        ),
        (
            "poll_interval_ms",
            250,
            50,
            5000,
            "primitive_poll_interval_invalid",
            "wait_for_state poll_interval_ms must be an integer between 50 and 5000",
        ),
    )
    for name, default, lower, upper, code, detail in bounded_parameters:
        if not _is_int_in_range(parameters.get(name, default), lower, upper):
            issues.append(CompetenceValidationIssue(code, detail))
def _validate_primitive_method_parameter(
    primitive_ref: str,
    param_name: str,
    spec: dict[str, Any],
    parameters: dict[str, Any],
    issues: list[CompetenceValidationIssue],
) -> None:
    """Validate one method parameter against its primitive schema entry.

    Checks, in order: plain ``required`` presence, the ``required_unless``
    rule (the parameter and its listed alternatives are mutually exclusive,
    and one of them must be supplied), the declared ``type`` and finally the
    optional ``constraints`` mapping. Every violation is appended to
    ``issues`` with code ``primitive_schema_invalid``.
    """

    def report(detail: str) -> None:
        issues.append(CompetenceValidationIssue("primitive_schema_invalid", detail))

    supplied = param_name in parameters
    if spec.get("required") is True and not supplied:
        report(f"primitive_ref={primitive_ref!r} requires parameter {param_name!r}")
        return

    required_unless = spec.get("required_unless")
    if _is_string_list(required_unless):
        alternatives_present = [name for name in required_unless if name in parameters]
        if supplied and alternatives_present:
            report(
                f"primitive_ref={primitive_ref!r} parameters {param_name!r} and {alternatives_present!r} are mutually exclusive"
            )
        elif not supplied and not alternatives_present:
            report(
                f"primitive_ref={primitive_ref!r} requires parameter {param_name!r} unless one of {required_unless!r} is present"
            )
            return

    # Type and constraint checks only make sense for a supplied value.
    if not supplied:
        return
    value = parameters[param_name]
    declared_type = spec.get("type")
    if not _primitive_value_matches_type(value, declared_type):
        report(
            f"primitive_ref={primitive_ref!r} parameter {param_name!r} has invalid type {declared_type!r}"
        )
    constraints = spec.get("constraints")
    if isinstance(constraints, dict):
        _validate_primitive_method_parameter_constraints(
            primitive_ref,
            param_name,
            value,
            constraints,
            issues,
        )
def _validate_primitive_method_parameter_constraints(
primitive_ref: str,
param_name: str,
value: Any,
constraints: dict[str, Any],
issues: list[CompetenceValidationIssue],
) -> None:
min_length = constraints.get("min_length")
if isinstance(min_length, int) and hasattr(value, "__len__") and len(value) < min_length:
issues.append(
CompetenceValidationIssue(
"primitive_schema_invalid",
f"primitive_ref={primitive_ref!r} parameter {param_name!r} must have length >= {min_length}",
)
)
min_value = constraints.get("min", constraints.get("min_value"))
if isinstance(min_value, int) and isinstance(value, int) and not isinstance(value, bool) and value < min_value:
issues.append(
CompetenceValidationIssue(
"primitive_schema_invalid",
f"primitive_ref={primitive_ref!r} parameter {param_name!r} must be >= {min_value}",
)
)
enum = constraints.get("enum")
if isinstance(enum, list) and value not in enum:
issues.append(
CompetenceValidationIssue(
"primitive_schema_invalid",
f"primitive_ref={primitive_ref!r} parameter {param_name!r} must be one of {enum!r}",
)
)
regex = constraints.get("regex")
if isinstance(regex, str) and isinstance(value, str) and not re.fullmatch(regex, value):
issues.append(
CompetenceValidationIssue(
"primitive_schema_invalid",
f"primitive_ref={primitive_ref!r} parameter {param_name!r} must match {regex!r}",
)
)
def _validate_success_marker(
data: dict[str, Any],
repo_root: Path,
issues: list[CompetenceValidationIssue],
) -> None:
marker = data.get("success_marker")
if not isinstance(marker, dict):
return
if marker.get("mode") not in {"any_of", "all_of"}:
issues.append(CompetenceValidationIssue("success_marker_mode", "success_marker.mode must be any_of or all_of"))
timeout_ms = marker.get("timeout_ms")
if not isinstance(timeout_ms, int) or timeout_ms <= 0:
issues.append(CompetenceValidationIssue("success_marker_timeout", "success_marker.timeout_ms must be positive"))
markers = marker.get("markers")
if not isinstance(markers, list) or not markers:
issues.append(CompetenceValidationIssue("success_marker_markers", "success_marker.markers must be a non-empty list"))
return
for index, item in enumerate(markers):
if not isinstance(item, dict) or not item.get("kind"):
issues.append(CompetenceValidationIssue("success_marker_invalid", f"markers[{index}] must define kind"))
supervised_requires = marker.get("supervised_requires")
if supervised_requires is not None and not isinstance(supervised_requires, list):
issues.append(
CompetenceValidationIssue(
"success_marker_invalid",
"success_marker.supervised_requires must be a list when present",
)
)
source_events = _load_source_events(data, repo_root, issues)
if source_events is None:
return
keep_indices = _cleaned_keep_indices(data)
method_indices = _cleaned_method_indices(data)
match_indices = _trace_success_marker_match_indices(
source_events,
keep_indices,
markers,
)
if not match_indices:
issues.append(
CompetenceValidationIssue(
"success_marker_missing",
"no success marker matches the cleaned source segment",
)
)
return
if method_indices:
min_success_index = _minimum_success_index_after_methods(data, method_indices)
if not any(index >= min_success_index for index in match_indices):
issues.append(
CompetenceValidationIssue(
"success_marker_pre_method",
"success marker must match an event after the observed method",
)
)
def _validate_chain_refs(
data: dict[str, Any],
repo_root: Path,
issues: list[CompetenceValidationIssue],
) -> None:
chain_refs = data.get("chain_refs")
if not isinstance(chain_refs, dict):
return
if not isinstance(chain_refs.get("source_session"), str) or not chain_refs.get("source_session", "").strip():
issues.append(CompetenceValidationIssue("chain_ref_missing", "chain_refs.source_session is required"))
cleaned = chain_refs.get("cleaned_segment")
if not isinstance(cleaned, dict):
issues.append(CompetenceValidationIssue("cleaned_segment_missing", "chain_refs.cleaned_segment is required"))
return
source_event_format = cleaned.get("source_event_format")
if source_event_format is not None and source_event_format not in {"streaming_session_json", "raw_live_events_jsonl"}:
issues.append(
CompetenceValidationIssue(
"cleaned_segment_source",
"cleaned_segment.source_event_format must be streaming_session_json or raw_live_events_jsonl",
)
)
required_path_key = "live_events_path" if source_event_format == "raw_live_events_jsonl" else "streaming_session_path"
for key in ("streaming_session_path", "live_events_path"):
path_value = chain_refs.get(key)
if key == required_path_key and (not isinstance(path_value, str) or not path_value.strip()):
issues.append(CompetenceValidationIssue("chain_ref_missing", f"chain_refs.{key} is required"))
continue
if isinstance(path_value, str) and path_value.strip():
resolved = _repo_path(repo_root, path_value)
if not resolved.is_file():
issues.append(CompetenceValidationIssue("chain_ref_path_missing", f"{key} not found: {path_value}"))
keep_indices = cleaned.get("keep_event_indices")
if not isinstance(keep_indices, list) or not keep_indices or not all(isinstance(i, int) and i >= 0 for i in keep_indices):
issues.append(
CompetenceValidationIssue(
"cleaned_segment_indices",
"cleaned_segment.keep_event_indices must be a non-empty list of positive indices",
)
)
stop_before = cleaned.get("stop_before")
if not isinstance(stop_before, list) or not stop_before:
issues.append(CompetenceValidationIssue("cleaned_segment_stop", "cleaned_segment.stop_before must document cut reasons"))
method_indices = cleaned.get("method_event_indices")
success_indices = cleaned.get("success_event_indices")
if method_indices is not None and not _is_int_list(method_indices):
issues.append(
CompetenceValidationIssue(
"cleaned_segment_indices",
"cleaned_segment.method_event_indices must be a list of positive indices",
)
)
if success_indices is not None and not _is_int_list(success_indices):
issues.append(
CompetenceValidationIssue(
"cleaned_segment_indices",
"cleaned_segment.success_event_indices must be a list of positive indices",
)
)
if _is_int_list(method_indices) and _is_int_list(success_indices):
min_success_index = _minimum_success_index_after_methods(data, method_indices)
if not any(index >= min_success_index for index in success_indices):
issues.append(
CompetenceValidationIssue(
"success_marker_pre_method",
"cleaned_segment.success_event_indices must be after method_event_indices",
)
)
if _is_int_list(method_indices) and isinstance(keep_indices, list):
missing_method_indices = [index for index in method_indices if index not in keep_indices]
if missing_method_indices:
issues.append(
CompetenceValidationIssue(
"cleaned_segment_indices",
f"method_event_indices must be included in keep_event_indices: {missing_method_indices}",
)
)
source_events = _load_source_events(data, repo_root, issues)
if source_events is not None and isinstance(keep_indices, list):
for index in keep_indices:
if isinstance(index, int) and index >= len(source_events):
issues.append(
CompetenceValidationIssue(
"cleaned_segment_indices",
f"cleaned segment index out of range: {index}",
)
)
def _load_source_events(
data: dict[str, Any],
repo_root: Path,
issues: list[CompetenceValidationIssue],
) -> list[dict[str, Any]] | None:
chain_refs = data.get("chain_refs")
if not isinstance(chain_refs, dict):
return None
source_format = _cleaned_source_event_format(data)
path_key = "live_events_path" if source_format == "raw_live_events_jsonl" else "streaming_session_path"
path_value = chain_refs.get(path_key)
if not isinstance(path_value, str) or not path_value:
return None
path = _repo_path(repo_root, path_value)
if not path.is_file():
return None
if source_format == "raw_live_events_jsonl":
return _load_jsonl_source_events(path, str(chain_refs.get("source_session") or ""), issues)
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError) as exc:
issues.append(CompetenceValidationIssue("source_session_invalid", f"cannot read source session: {exc}"))
return None
source_session = chain_refs.get("source_session")
if source_session and payload.get("session_id") != source_session:
issues.append(
CompetenceValidationIssue(
"source_session_mismatch",
f"source session mismatch: YAML={source_session} trace={payload.get('session_id')}",
)
)
raw_events = payload.get("events")
if not isinstance(raw_events, list):
issues.append(CompetenceValidationIssue("source_session_invalid", "source session events must be a list"))
return None
return _normalize_source_events(raw_events)
def _load_jsonl_source_events(
    path: Path,
    source_session: str,
    issues: list[CompetenceValidationIssue],
) -> list[dict[str, Any]] | None:
    """Load a JSON-lines trace and return its normalized events.

    Blank lines are skipped. Every other line must decode to a JSON mapping;
    the first line that does not aborts the load with a
    ``source_session_invalid`` issue and ``None`` is returned. When
    ``source_session`` is non-empty and none of the lines carries it as a
    string ``session_id``, a ``source_session_mismatch`` issue is added but
    the events are still returned.
    """
    try:
        lines = path.read_text(encoding="utf-8").splitlines()
    except (OSError, UnicodeDecodeError) as exc:
        # A file that is not valid UTF-8 raises UnicodeDecodeError, which is
        # not an OSError; report it like any other unreadable trace.
        issues.append(CompetenceValidationIssue("source_session_invalid", f"cannot read source session: {exc}"))
        return None

    raw_events: list[dict[str, Any]] = []
    session_ids: set[str] = set()
    for line_number, line in enumerate(lines, start=1):
        if not line.strip():
            continue
        try:
            payload = json.loads(line)
        except json.JSONDecodeError as exc:
            issues.append(
                CompetenceValidationIssue(
                    "source_session_invalid",
                    f"cannot read source session jsonl line {line_number}: {exc}",
                )
            )
            return None
        if not isinstance(payload, dict):
            issues.append(
                CompetenceValidationIssue(
                    "source_session_invalid",
                    f"source session jsonl line {line_number} must be a mapping",
                )
            )
            return None
        if isinstance(payload.get("session_id"), str):
            session_ids.add(payload["session_id"])
        raw_events.append(payload)

    if source_session and source_session not in session_ids:
        issues.append(
            CompetenceValidationIssue(
                "source_session_mismatch",
                f"YAML source session {source_session!r} not found in jsonl sessions {sorted(session_ids)!r}",
            )
        )
    return _normalize_source_events(raw_events)
def _normalize_source_events(raw_events: list[Any]) -> list[dict[str, Any]]:
normalized: list[dict[str, Any]] = []
for raw_event in raw_events:
if not isinstance(raw_event, dict):
continue
nested_event = raw_event.get("event")
if isinstance(nested_event, dict) and isinstance(nested_event.get("type"), str):
event = dict(nested_event)
for key in ("session_id", "timestamp", "machine_id"):
if key not in event and key in raw_event:
event[key] = raw_event[key]
normalized.append(event)
else:
normalized.append(raw_event)
return normalized
def _cleaned_keep_indices(data: dict[str, Any]) -> list[int] | None:
chain_refs = data.get("chain_refs")
if not isinstance(chain_refs, dict):
return None
cleaned = chain_refs.get("cleaned_segment")
if not isinstance(cleaned, dict):
return None
indices = cleaned.get("keep_event_indices")
if not isinstance(indices, list) or not all(isinstance(i, int) for i in indices):
return None
return indices
def _cleaned_method_indices(data: dict[str, Any]) -> list[int] | None:
chain_refs = data.get("chain_refs")
if not isinstance(chain_refs, dict):
return None
cleaned = chain_refs.get("cleaned_segment")
if not isinstance(cleaned, dict):
return None
indices = cleaned.get("method_event_indices")
if not _is_int_list(indices):
return None
return indices
def _methods_execution_mode(data: dict[str, Any]) -> str:
mode = data.get("methods_execution", "alternatives")
return mode if mode in METHODS_EXECUTION_MODES else "alternatives"
def _cleaned_source_event_format(data: dict[str, Any]) -> str:
chain_refs = data.get("chain_refs")
if not isinstance(chain_refs, dict):
return "streaming_session_json"
cleaned = chain_refs.get("cleaned_segment")
if not isinstance(cleaned, dict):
return "streaming_session_json"
value = cleaned.get("source_event_format")
return value if value == "raw_live_events_jsonl" else "streaming_session_json"
def _minimum_success_index_after_methods(data: dict[str, Any], method_indices: list[int]) -> int:
    """Return the earliest event index at which success may be observed.

    This is the index right after the last method event, unless an observed
    ``wait_state`` method ends on that last event, in which case the last
    method index itself is allowed. ``method_indices`` must not be empty.
    """
    final_method_index = max(method_indices)
    ends_on_wait_state = _last_observed_method_is_wait_state_at(data, final_method_index)
    return final_method_index if ends_on_wait_state else final_method_index + 1
def _last_observed_method_is_wait_state_at(data: dict[str, Any], event_index: int) -> bool:
methods = data.get("methods")
if not isinstance(methods, list):
return False
for method in methods:
if not isinstance(method, dict) or method.get("observed") is not True:
continue
if method.get("kind") != "wait_state":
continue
trace_indices = _method_trace_indices(method)
if trace_indices and max(trace_indices) == event_index:
return True
return False
def _method_trace_indices(method: dict[str, Any]) -> list[int] | None:
    """Return the method's ``trace_event_indices`` when ``_is_int_list`` accepts them, else ``None``."""
    candidate = method.get("trace_event_indices")
    return candidate if _is_int_list(candidate) else None
def _method_scroll_direction(method: dict[str, Any]) -> str | None:
parameters = method.get("parameters")
if not isinstance(parameters, dict):
return None
direction = parameters.get("direction")
return direction if isinstance(direction, str) else None
def _is_scroll_delta(value: Any) -> bool:
return (
isinstance(value, list)
and len(value) >= 2
and isinstance(value[0], int)
and isinstance(value[1], int)
and not isinstance(value[0], bool)
and not isinstance(value[1], bool)
)
def _scroll_delta_matches_direction(delta: list[Any], direction: str) -> bool:
if direction == "down":
return delta[1] < 0
if direction == "up":
return delta[1] > 0
if direction == "left":
return delta[0] < 0
if direction == "right":
return delta[0] > 0
return True
def _method_key_combo_keys(method: dict[str, Any]) -> Any:
keys = method.get("keys")
if keys is not None:
return keys
parameters = method.get("parameters")
if isinstance(parameters, dict):
return parameters.get("keys")
return None
def _trace_has_key_combo(
    events: list[dict[str, Any]],
    keep_indices: list[int] | None,
    expected_keys: list[str],
) -> bool:
    """Tell whether the trace contains the expected key combination.

    Only ``key_combo`` events are considered, restricted to ``keep_indices``
    when that is not ``None``. Both sides are compared after
    ``_normalize_key_combo_sequence``.
    """
    wanted = _normalize_key_combo_sequence(expected_keys)
    for position, event in enumerate(events):
        is_kept = keep_indices is None or position in keep_indices
        if not is_kept or event.get("type") != "key_combo":
            continue
        pressed = event.get("keys")
        if _is_string_list(pressed) and _normalize_key_combo_sequence(pressed) == wanted:
            return True
    return False
def _trace_success_marker_match_indices(
events: list[dict[str, Any]],
keep_indices: list[int] | None,
markers: list[Any],
) -> list[int]:
marker_maps = [marker for marker in markers if isinstance(marker, dict)]
matches: list[int] = []
for index, event in enumerate(events):
if keep_indices is not None and index not in keep_indices:
continue
for marker in marker_maps:
kind = marker.get("kind")
if kind == "active_process_name_is":
expected = str(marker.get("value") or "").casefold()
if expected and _event_process(event).casefold() == expected:
matches.append(index)
break
elif kind == "active_window_title_in":
values = marker.get("values")
if _is_string_list(values) and _event_title(event).casefold() in {v.casefold() for v in values}:
matches.append(index)
break
elif kind == "ocr_contains":
# OCR is not required for offline validation if another marker
# proves the state in the captured segment.
continue
return matches
def _event_title(event: dict[str, Any]) -> str:
window = event.get("window") if isinstance(event.get("window"), dict) else {}
to_window = event.get("to") if isinstance(event.get("to"), dict) else {}
return str(window.get("title") or event.get("active_window_title") or to_window.get("title") or "")
def _event_process(event: dict[str, Any]) -> str:
window = event.get("window") if isinstance(event.get("window"), dict) else {}
to_window = event.get("to") if isinstance(event.get("to"), dict) else {}
return str(window.get("app_name") or to_window.get("app_name") or "")
def _concat_text_input_events(events: list[dict[str, Any]], indices: list[int]) -> str:
chunks: list[str] = []
for index in indices:
if 0 <= index < len(events):
chunks.append(str(events[index].get("text") or ""))
return "".join(chunks)
def _repo_path(repo_root: Path, value: str) -> Path:
path = Path(value)
if path.is_absolute():
return path
return repo_root / path
def _is_primitive_path(path: Path, repo_root: Path) -> bool:
try:
relative = path.resolve().relative_to(repo_root.resolve())
except (OSError, ValueError):
relative = path
return len(relative.parts) >= 3 and relative.parts[0] == "data" and relative.parts[1] == "primitives"
def _read_yaml_mapping(path: Path, issues: list[CompetenceValidationIssue]) -> dict[str, Any] | None:
    """Load a YAML file whose root node must be a mapping.

    Returns the mapping, or ``None`` after appending an issue: ``yaml_invalid``
    when the file cannot be read, decoded as UTF-8 or parsed, and
    ``schema_type`` when the root node is not a mapping. The document is
    parsed with ``yaml.safe_load``.
    """
    try:
        data = yaml.safe_load(path.read_text(encoding="utf-8"))
    except (OSError, UnicodeDecodeError, yaml.YAMLError) as exc:
        # UnicodeDecodeError is a ValueError, not an OSError: without it a
        # file that is not UTF-8 would abort the whole validation run.
        issues.append(CompetenceValidationIssue("yaml_invalid", str(exc)))
        return None
    if not isinstance(data, dict):
        issues.append(CompetenceValidationIssue("schema_type", "root YAML node must be a mapping"))
        return None
    return data
def _find_competence_dependency_path(
    repo_root: Path,
    competence_id: str,
    *,
    minimum_state: str,
) -> Path:
    """Locate the YAML file of a competence dependency.

    Looks for ``data/competences/<state>/<competence_id>.yaml`` in every
    learning state from ``minimum_state`` onwards, following
    ``LEARNING_STATE_ORDER``, and returns the first existing file. When none
    exists, or ``minimum_state`` is not a known state, the path under
    ``minimum_state`` is returned even though it may not exist.
    """
    competences_dir = repo_root / "data" / "competences"
    file_name = f"{competence_id}.yaml"
    fallback = competences_dir / minimum_state / file_name
    if minimum_state not in LEARNING_STATE_ORDER:
        return fallback
    first_state = LEARNING_STATE_ORDER.index(minimum_state)
    for state in LEARNING_STATE_ORDER[first_state:]:
        candidate = competences_dir / state / file_name
        if candidate.is_file():
            return candidate
    return fallback
def _normalize_key(key: str) -> str:
    """Return the canonical name of a key.

    The name is stripped and case-folded (a falsy key becomes ``""``), then
    mapped through ``KEY_ALIASES`` when an alias is registered for it.
    """
    text = str(key) if key else ""
    canonical = text.strip().casefold()
    return KEY_ALIASES.get(canonical, canonical)
def _normalize_key_combo_sequence(keys: list[str]) -> list[str]:
    """Normalize every key of a combination with ``_normalize_key``.

    Combinations whose key set is exactly ``{shift, ctrl, @}`` or
    ``{shift, ctrl, \\x13}`` are rewritten to ``["ctrl", "s"]``.
    """
    # NOTE(review): presumably these are capture artifacts of Ctrl+S; confirm.
    ctrl_s_equivalents = (
        {"shift", "ctrl", "@"},
        {"shift", "ctrl", "\x13"},
    )
    normalized = [_normalize_key(key) for key in keys]
    pressed = set(normalized)
    if any(pressed == equivalent for equivalent in ctrl_s_equivalents):
        return ["ctrl", "s"]
    return normalized
def _is_string_list(value: Any) -> bool:
return isinstance(value, list) and bool(value) and all(isinstance(item, str) and item for item in value)
def _is_int_list(value: Any) -> bool:
return isinstance(value, list) and bool(value) and all(isinstance(item, int) and item >= 0 for item in value)
def _is_int_in_range(value: Any, minimum: int, maximum: int) -> bool:
return isinstance(value, int) and not isinstance(value, bool) and minimum <= value <= maximum
def _primitive_value_matches_type(value: Any, expected_type: Any) -> bool:
if expected_type == "str":
return isinstance(value, str)
if expected_type == "int":
return isinstance(value, int) and not isinstance(value, bool)
if expected_type == "bool":
return isinstance(value, bool)
if expected_type == "list[str]":
return _is_string_list(value)
if expected_type == "dict":
return isinstance(value, dict)
if expected_type == "dict_or_string":
return isinstance(value, dict) or isinstance(value, str)
return True
def _is_number_in_range(value: Any, minimum: float, maximum: float) -> bool:
return isinstance(value, (int, float)) and not isinstance(value, bool) and minimum <= float(value) <= maximum
def _distinct_context_signatures(contexts: list[Any]) -> set[tuple[Any, ...]]:
dimensions = ("dpi", "screen", "app_in_focus", "method_used", "screen_signature")
signatures: set[tuple[Any, ...]] = set()
for context in contexts:
if not isinstance(context, dict):
continue
signature = tuple(context.get(dimension) for dimension in dimensions)
if any(value not in (None, "", [], {}) for value in signature):
signatures.add(signature)
return signatures
def _dedupe_issues(issues: list[CompetenceValidationIssue]) -> tuple[CompetenceValidationIssue, ...]:
seen: set[tuple[str, str]] = set()
deduped: list[CompetenceValidationIssue] = []
for issue in issues:
key = (issue.code, issue.detail)
if key in seen:
continue
seen.add(key)
deduped.append(issue)
return tuple(deduped)
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: validate the given competence YAML files.

    Prints one ``ok``/``fail`` line per file followed by its issues, or a
    JSON array of reports with ``--json``. Returns 0 when every report is
    valid and 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="Validate Lea short competence YAML files")
    parser.add_argument("paths", nargs="+", help="YAML competence file(s) to validate")
    parser.add_argument("--json", action="store_true", help="emit JSON report")
    options = parser.parse_args(argv)

    reports = [validate_file(path) for path in options.paths]
    if options.json:
        serialized = [report.to_dict() for report in reports]
        print(json.dumps(serialized, ensure_ascii=False, indent=2))
    else:
        for report in reports:
            status = "ok" if report.valid else "fail"
            print(f"{status}: {report.path}")
            for issue in report.issues:
                print(f" - {issue.code}: {issue.detail}")

    every_report_valid = all(report.valid for report in reports)
    return 0 if every_report_valid else 1
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())