#!/usr/bin/env python3 """Lightweight validator for Lea short competence YAML files. This module is deliberately offline-only: it reads YAML and trace files, but it does not start services, load models, replay actions, or promote competences. """ from __future__ import annotations import argparse import json import re import sys from dataclasses import asdict, dataclass from pathlib import Path from typing import Any import yaml REPO_ROOT = Path(__file__).resolve().parents[1] if str(REPO_ROOT) not in sys.path: sys.path.insert(0, str(REPO_ROOT)) try: from agent_v0.agent_v1.ui.message_contract import ( MessageContractError, format_supervised_pause_message, ) except Exception: # pragma: no cover - partial deployments can still run basics MessageContractError = ValueError format_supervised_pause_message = None LEARNING_STATES = {"observed", "candidate", "supervised", "stable"} LEARNING_STATE_ORDER = ("observed", "candidate", "supervised", "stable") METHODS_EXECUTION_MODES = {"alternatives", "sequence"} PRIMITIVES_DIR = Path("data") / "primitives" PRIMITIVE_PARAMETER_TYPES = {"str", "int", "bool", "list[str]", "dict", "dict_or_string"} PRIMITIVE_REQUIRED_TOP_LEVEL_KEYS = { "schema_version", "id", "kind", "marker_or_action", "intent", "version", "parameters_schema", "failure_message_template", "created_at", } PRIMITIVE_FORBIDDEN_FIELDS = { "learning_state", "chain_refs", "promotion", "generalisation", "failure_log", "success_marker", "preconditions", "methods", } REQUIRED_TOP_LEVEL_KEYS = { "schema_version", "id", "name", "version", "learning_state", "intent", "parameters", "preconditions", "methods", "success_marker", "failure_message_template", "chain_refs", "promotion", } BLOCKED_DURABLE_COORDINATE_KEYS = { "x", "y", "left", "top", "width", "height", "w", "h", "pos", "bbox", "bounds", "rect", "coordinates", "x_pct", "y_pct", "window_bounds", "screen_resolution", } KEY_ALIASES = { "cmd": "win", "command": "win", "meta": "win", "super": "win", "windows": "win", } @dataclass(frozen=True) class CompetenceValidationIssue: code: str detail: str @dataclass(frozen=True) class CompetenceValidationReport: path: str issues: tuple[CompetenceValidationIssue, ...] @property def valid(self) -> bool: return not self.issues def to_dict(self) -> dict[str, Any]: data = asdict(self) data["valid"] = self.valid return data def validate_competence_file( path: str | Path, *, repo_root: str | Path | None = None, ) -> CompetenceValidationReport: competence_path = Path(path) root = Path(repo_root) if repo_root is not None else REPO_ROOT issues: list[CompetenceValidationIssue] = [] try: data = yaml.safe_load(competence_path.read_text(encoding="utf-8")) except FileNotFoundError: return CompetenceValidationReport( str(competence_path), (CompetenceValidationIssue("file_missing", f"{competence_path} does not exist"),), ) except yaml.YAMLError as exc: return CompetenceValidationReport( str(competence_path), (CompetenceValidationIssue("yaml_invalid", str(exc)),), ) if not isinstance(data, dict): return CompetenceValidationReport( str(competence_path), (CompetenceValidationIssue("schema_type", "root YAML node must be a mapping"),), ) _validate_required_shape(data, competence_path, issues) _validate_promotion_state(data, issues) _validate_t2_known_gaps(data, issues) _validate_methods_execution(data, issues) _validate_no_durable_coordinates(data, issues) _validate_failure_message_template(data, issues) _validate_preconditions(data, root, issues) _validate_methods_and_trace(data, root, issues) _validate_success_marker(data, root, issues) _validate_chain_refs(data, root, issues) return CompetenceValidationReport(str(competence_path), _dedupe_issues(issues)) def validate_primitive_file( path: str | Path, *, repo_root: str | Path | None = None, ) -> CompetenceValidationReport: primitive_path = Path(path) issues: list[CompetenceValidationIssue] = [] try: data = yaml.safe_load(primitive_path.read_text(encoding="utf-8")) except FileNotFoundError: return CompetenceValidationReport( str(primitive_path), (CompetenceValidationIssue("file_missing", f"{primitive_path} does not exist"),), ) except yaml.YAMLError as exc: return CompetenceValidationReport( str(primitive_path), (CompetenceValidationIssue("yaml_invalid", str(exc)),), ) if not isinstance(data, dict): return CompetenceValidationReport( str(primitive_path), (CompetenceValidationIssue("schema_type", "root YAML node must be a mapping"),), ) _validate_primitive_required_shape(data, primitive_path, issues) _validate_primitive_parameters_schema(data, issues) _validate_no_durable_coordinates(data, issues) _validate_failure_message_template(data, issues) return CompetenceValidationReport(str(primitive_path), _dedupe_issues(issues)) def validate_file( path: str | Path, *, repo_root: str | Path | None = None, ) -> CompetenceValidationReport: root = Path(repo_root) if repo_root is not None else REPO_ROOT candidate_path = Path(path) if _is_primitive_path(candidate_path, root): return validate_primitive_file(candidate_path, repo_root=root) return validate_competence_file(candidate_path, repo_root=root) def _validate_required_shape( data: dict[str, Any], competence_path: Path, issues: list[CompetenceValidationIssue], ) -> None: missing = sorted(REQUIRED_TOP_LEVEL_KEYS - set(data.keys())) for key in missing: issues.append(CompetenceValidationIssue("missing_key", f"missing top-level key: {key}")) if data.get("schema_version") != 1: issues.append(CompetenceValidationIssue("schema_version", "schema_version must be 1")) competence_id = data.get("id") if not isinstance(competence_id, str) or not re.fullmatch(r"[a-z][a-z0-9_]*", competence_id): issues.append(CompetenceValidationIssue("id_invalid", "id must be a lowercase slug")) elif competence_id != competence_path.stem: issues.append( CompetenceValidationIssue( "id_filename_mismatch", f"id must match filename stem: id={competence_id!r} filename={competence_path.stem!r}", ) ) version = data.get("version") if not isinstance(version, int) or version < 1: issues.append(CompetenceValidationIssue("version_invalid", "version must be a positive integer")) state = data.get("learning_state") if state not in LEARNING_STATES: issues.append( CompetenceValidationIssue( "learning_state_invalid", f"learning_state must be one of {sorted(LEARNING_STATES)}", ) ) for key in ("intent", "parameters", "success_marker", "failure_message_template", "chain_refs", "promotion"): if key in data and not isinstance(data.get(key), dict): issues.append(CompetenceValidationIssue("mapping_expected", f"{key} must be a mapping")) for key in ("preconditions", "methods"): if key in data and not isinstance(data.get(key), list): issues.append(CompetenceValidationIssue("list_expected", f"{key} must be a list")) def _validate_primitive_required_shape( data: dict[str, Any], primitive_path: Path, issues: list[CompetenceValidationIssue], ) -> None: missing = sorted(PRIMITIVE_REQUIRED_TOP_LEVEL_KEYS - set(data.keys())) for key in missing: issues.append(CompetenceValidationIssue("primitive_missing_key", f"missing primitive key: {key}")) for key in sorted(PRIMITIVE_FORBIDDEN_FIELDS & set(data.keys())): issues.append(CompetenceValidationIssue("primitive_forbidden_field", f"primitive must not define {key}")) if data.get("schema_version") != 1: issues.append(CompetenceValidationIssue("primitive_file_invalid", "schema_version must be 1")) primitive_id = data.get("id") if not isinstance(primitive_id, str) or not re.fullmatch(r"[a-z][a-z0-9_]*", primitive_id): issues.append(CompetenceValidationIssue("primitive_file_invalid", "id must be a lowercase slug")) elif primitive_id != primitive_path.stem: issues.append( CompetenceValidationIssue( "primitive_id_filename_mismatch", f"id must match filename stem: id={primitive_id!r} filename={primitive_path.stem!r}", ) ) if data.get("kind") != "primitive": issues.append(CompetenceValidationIssue("primitive_file_invalid", "kind must be primitive")) if data.get("marker_or_action") not in {"action", "marker"}: issues.append(CompetenceValidationIssue("primitive_file_invalid", "marker_or_action must be action or marker")) version = data.get("version") if not isinstance(version, int) or version < 1: issues.append(CompetenceValidationIssue("primitive_file_invalid", "version must be a positive integer")) intent = data.get("intent") if not isinstance(intent, dict) or not isinstance(intent.get("fr"), str) or not intent.get("fr", "").strip(): issues.append(CompetenceValidationIssue("primitive_file_invalid", "intent.fr must be non-empty text")) if "executor_kind" in data and (not isinstance(data.get("executor_kind"), str) or not data.get("executor_kind", "").strip()): issues.append(CompetenceValidationIssue("primitive_file_invalid", "executor_kind must be non-empty text")) if "notes" in data and not _is_string_list(data.get("notes")): issues.append(CompetenceValidationIssue("primitive_file_invalid", "notes must be a non-empty text list")) if "last_updated_at" in data and not isinstance(data.get("last_updated_at"), str): issues.append(CompetenceValidationIssue("primitive_file_invalid", "last_updated_at must be text")) def _validate_primitive_parameters_schema( data: dict[str, Any], issues: list[CompetenceValidationIssue], ) -> None: schema = data.get("parameters_schema") if not isinstance(schema, dict) or not schema: issues.append(CompetenceValidationIssue("primitive_file_invalid", "parameters_schema must be a non-empty mapping")) return for param_name, spec in schema.items(): if not isinstance(param_name, str) or not re.fullmatch(r"[a-z][a-z0-9_]*", param_name): issues.append(CompetenceValidationIssue("primitive_param_schema_invalid", "parameter names must be lowercase slugs")) continue if not isinstance(spec, dict): issues.append(CompetenceValidationIssue("primitive_param_schema_invalid", f"{param_name} schema must be a mapping")) continue param_type = spec.get("type") if param_type not in PRIMITIVE_PARAMETER_TYPES: issues.append( CompetenceValidationIssue( "primitive_param_schema_invalid", f"{param_name}.type must be one of {sorted(PRIMITIVE_PARAMETER_TYPES)}", ) ) required = spec.get("required") if required is not None and not isinstance(required, bool): issues.append(CompetenceValidationIssue("primitive_param_schema_invalid", f"{param_name}.required must be bool")) if required is True and "default" in spec: issues.append( CompetenceValidationIssue( "primitive_param_schema_invalid", f"{param_name} cannot define default when required=true", ) ) required_unless = spec.get("required_unless") if required_unless is not None: if not _is_string_list(required_unless): issues.append( CompetenceValidationIssue( "primitive_param_schema_invalid", f"{param_name}.required_unless must be a non-empty text list", ) ) else: missing_refs = [name for name in required_unless if name not in schema] if missing_refs: issues.append( CompetenceValidationIssue( "primitive_param_schema_invalid", f"{param_name}.required_unless references unknown parameters: {missing_refs}", ) ) description = spec.get("description") if not isinstance(description, str) or not description.strip(): issues.append(CompetenceValidationIssue("primitive_param_schema_invalid", f"{param_name}.description is required")) constraints = spec.get("constraints") if constraints is not None and not isinstance(constraints, dict): issues.append(CompetenceValidationIssue("primitive_param_schema_invalid", f"{param_name}.constraints must be a mapping")) elif isinstance(constraints, dict): enum = constraints.get("enum") if enum is not None and (not isinstance(enum, list) or not enum): issues.append( CompetenceValidationIssue( "primitive_schema_invalid", f"parameters_schema.{param_name}.constraints.enum must be a non-empty list", ) ) for min_key in ("min", "min_value"): min_value = constraints.get(min_key) if min_value is not None and (not isinstance(min_value, int) or isinstance(min_value, bool)): issues.append( CompetenceValidationIssue( "primitive_schema_invalid", f"parameters_schema.{param_name}.constraints.{min_key} must be an integer", ) ) def _validate_promotion_state(data: dict[str, Any], issues: list[CompetenceValidationIssue]) -> None: state = data.get("learning_state") if state not in LEARNING_STATES: return promotion = data.get("promotion") if isinstance(data.get("promotion"), dict) else {} stable_requires = promotion.get("stable_requires") if isinstance(promotion.get("stable_requires"), dict) else {} min_successes = stable_requires.get("min_successes") if min_successes is not None and (not isinstance(min_successes, int) or min_successes < 3): issues.append( CompetenceValidationIssue( "promotion_stable_requires", "promotion.stable_requires.min_successes must be at least 3", ) ) if state == "observed": return chain_refs = data.get("chain_refs") if isinstance(data.get("chain_refs"), dict) else {} cleaned = chain_refs.get("cleaned_segment") if isinstance(chain_refs.get("cleaned_segment"), dict) else {} generalisation = data.get("generalisation") if isinstance(data.get("generalisation"), dict) else {} seen_contexts = generalisation.get("seen_contexts") if isinstance(generalisation.get("seen_contexts"), list) else [] history = promotion.get("history") if isinstance(promotion.get("history"), list) else [] if state == "candidate": if cleaned.get("status") != "documented_offline": issues.append( CompetenceValidationIssue( "learning_state_premature", "candidate requires chain_refs.cleaned_segment.status=documented_offline", ) ) return if state == "supervised": if not seen_contexts or not history: issues.append( CompetenceValidationIssue( "learning_state_premature", "supervised requires seen contexts and promotion.history", ) ) return if state == "stable": if len(seen_contexts) < 3 or len(_distinct_context_signatures(seen_contexts)) < 3: issues.append( CompetenceValidationIssue( "learning_state_premature", "stable requires at least 3 distinct seen contexts", ) ) def _validate_t2_known_gaps(data: dict[str, Any], issues: list[CompetenceValidationIssue]) -> None: promotion = data.get("promotion") if not isinstance(promotion, dict): return gaps = promotion.get("t2_known_gaps") if gaps is None: return if not isinstance(gaps, list): issues.append(CompetenceValidationIssue("t2_known_gap_invalid", "promotion.t2_known_gaps must be a list")) return required_keys = ("id", "description", "impact", "proposed_resolution") optional_text_keys = ("acted_by", "acted_at") for index, gap in enumerate(gaps): if not isinstance(gap, dict): issues.append( CompetenceValidationIssue("t2_known_gap_invalid", f"promotion.t2_known_gaps[{index}] must be a mapping") ) continue gap_id = gap.get("id") if isinstance(gap_id, str) and gap_id.strip() and not re.fullmatch(r"[a-z][a-z0-9_]*", gap_id): issues.append( CompetenceValidationIssue( "t2_known_gap_invalid", f"promotion.t2_known_gaps[{index}].id must be a lowercase slug", ) ) for key in required_keys: value = gap.get(key) if not isinstance(value, str) or not value.strip(): issues.append( CompetenceValidationIssue( "t2_known_gap_invalid", f"promotion.t2_known_gaps[{index}].{key} is required", ) ) for key in optional_text_keys: if key in gap and (not isinstance(gap.get(key), str) or not gap.get(key, "").strip()): issues.append( CompetenceValidationIssue( "t2_known_gap_invalid", f"promotion.t2_known_gaps[{index}].{key} must be non-empty text when present", ) ) def _validate_methods_execution(data: dict[str, Any], issues: list[CompetenceValidationIssue]) -> None: mode = data.get("methods_execution", "alternatives") if mode not in METHODS_EXECUTION_MODES: issues.append( CompetenceValidationIssue( "methods_sequence_invalid", f"methods_execution must be one of {sorted(METHODS_EXECUTION_MODES)}", ) ) return methods = data.get("methods") if not isinstance(methods, list): return if mode == "sequence" and len(methods) < 2: issues.append( CompetenceValidationIssue( "methods_sequence_invalid", "methods_execution=sequence requires at least two methods", ) ) return keep_indices = _cleaned_keep_indices(data) method_indices = _cleaned_method_indices(data) seen_ids: set[str] = set() last_trace_index = -1 for index, method in enumerate(methods): if not isinstance(method, dict): continue method_id = method.get("id") if mode == "sequence" and isinstance(method_id, str) and method_id.strip(): if method_id in seen_ids: issues.append( CompetenceValidationIssue( "methods_sequence_invalid", f"methods[{index}].id must be unique in sequence mode", ) ) seen_ids.add(method_id) if method.get("observed") is not True: continue trace_indices = method.get("trace_event_indices") if trace_indices is None and mode != "sequence": continue trace_issue_code = "methods_sequence_invalid" if mode == "sequence" else "method_trace_missing" if not _is_int_list(trace_indices): issues.append( CompetenceValidationIssue( trace_issue_code, f"methods[{index}].trace_event_indices must be a non-empty integer list", ) ) continue if keep_indices is not None: missing_keep_indices = [event_index for event_index in trace_indices if event_index not in keep_indices] if missing_keep_indices: issues.append( CompetenceValidationIssue( trace_issue_code, f"methods[{index}].trace_event_indices must be included in keep_event_indices: {missing_keep_indices}", ) ) if method_indices is not None: missing_method_indices = [event_index for event_index in trace_indices if event_index not in method_indices] if missing_method_indices: issues.append( CompetenceValidationIssue( trace_issue_code, f"methods[{index}].trace_event_indices must be included in method_event_indices: {missing_method_indices}", ) ) if mode == "sequence" and min(trace_indices) <= last_trace_index: issues.append( CompetenceValidationIssue( "methods_sequence_invalid", f"methods[{index}].trace_event_indices must follow previous observed step", ) ) if mode == "sequence": last_trace_index = max(last_trace_index, max(trace_indices)) def _validate_no_durable_coordinates(data: Any, issues: list[CompetenceValidationIssue], path: str = "") -> None: if isinstance(data, dict): for key, value in data.items(): key_text = str(key) key_path = f"{path}.{key_text}" if path else key_text key_lower = key_text.lower() pct_relative_offset = key_lower in {"x_pct", "y_pct"} and path.endswith("relative_offset") if key_lower in BLOCKED_DURABLE_COORDINATE_KEYS and not pct_relative_offset: issues.append( CompetenceValidationIssue( "durable_coordinate_key", f"durable competence data must not store coordinates: {key_path}", ) ) _validate_no_durable_coordinates(value, issues, key_path) elif isinstance(data, list): for index, value in enumerate(data): _validate_no_durable_coordinates(value, issues, f"{path}[{index}]") def _validate_failure_message_template( data: dict[str, Any], issues: list[CompetenceValidationIssue], ) -> None: template = data.get("failure_message_template") if not isinstance(template, dict): return required = ("intention", "attendu", "vu", "demande") for key in required: if not isinstance(template.get(key), str) or not template.get(key, "").strip(): issues.append( CompetenceValidationIssue( "failure_message_template", f"failure_message_template.{key} must be non-empty text", ) ) if any(key not in template for key in required): return if format_supervised_pause_message is None: return try: format_supervised_pause_message( intention=template["intention"], attendu=template["attendu"], vu=template["vu"].replace( "{observed_human_state}", "la fenetre attendue n'est pas visible", ), demande=template["demande"], ) except MessageContractError as exc: issues.append( CompetenceValidationIssue( "failure_message_contract", str(exc), ) ) def _validate_preconditions( data: dict[str, Any], repo_root: Path, issues: list[CompetenceValidationIssue], ) -> None: preconditions = data.get("preconditions") if not isinstance(preconditions, list): return competence_id = data.get("id") for index, precondition in enumerate(preconditions): if not isinstance(precondition, dict): continue if precondition.get("kind") != "competence_required": continue dependency = precondition.get("competence") state = precondition.get("state") if not isinstance(dependency, str) or not dependency.strip(): issues.append( CompetenceValidationIssue( "competence_dependency_invalid", f"preconditions[{index}].competence must be non-empty text", ) ) continue if dependency == competence_id: issues.append( CompetenceValidationIssue( "competence_dependency_invalid", f"preconditions[{index}] must not depend on itself", ) ) if state not in LEARNING_STATES: issues.append( CompetenceValidationIssue( "competence_dependency_invalid", f"preconditions[{index}].state must be one of {sorted(LEARNING_STATES)}", ) ) continue dependency_path = _find_competence_dependency_path( repo_root, dependency, minimum_state=str(state), ) if not dependency_path.is_file(): issues.append( CompetenceValidationIssue( "competence_dependency_missing", f"required competence not found: {dependency} with minimum state {state}", ) ) def _validate_methods_and_trace( data: dict[str, Any], repo_root: Path, issues: list[CompetenceValidationIssue], ) -> None: methods = data.get("methods") if not isinstance(methods, list): return if not methods: issues.append(CompetenceValidationIssue("methods_empty", "at least one method is required")) return source_events = _load_source_events(data, repo_root, issues) keep_indices = _cleaned_keep_indices(data) for index, method in enumerate(methods): if not isinstance(method, dict): issues.append(CompetenceValidationIssue("method_invalid", f"methods[{index}] must be a mapping")) continue for key in ("id", "kind"): if not isinstance(method.get(key), str) or not method.get(key, "").strip(): issues.append(CompetenceValidationIssue("method_invalid", f"methods[{index}].{key} is required")) kind = method.get("kind") _validate_method_primitive_ref(method, kind, index, repo_root, issues) if kind == "key_combo": keys = _method_key_combo_keys(method) if not _is_string_list(keys): issues.append(CompetenceValidationIssue("method_keys_invalid", f"methods[{index}].keys must be text list")) continue if method.get("observed") is True: if not method.get("trace_source"): issues.append(CompetenceValidationIssue("method_trace_source", f"methods[{index}] missing trace_source")) trace_indices = _method_trace_indices(method) or keep_indices if source_events is not None and not _trace_has_key_combo(source_events, trace_indices, keys): issues.append( CompetenceValidationIssue( "method_trace_missing", f"observed key_combo {keys!r} not found in cleaned source segment", ) ) elif kind == "text_input" and method.get("observed") is True: if not method.get("trace_source"): issues.append(CompetenceValidationIssue("method_trace_source", f"methods[{index}] missing trace_source")) method_indices = _method_trace_indices(method) or _cleaned_method_indices(data) if source_events is None: continue if not method_indices: issues.append( CompetenceValidationIssue( "method_trace_missing", f"observed text_input method {method.get('id') or index} requires method_event_indices", ) ) continue non_text_indices = [ event_index for event_index in method_indices if event_index >= len(source_events) or source_events[event_index].get("type") != "text_input" ] if non_text_indices: issues.append( CompetenceValidationIssue( "method_trace_missing", f"method_event_indices contain non text_input events: {non_text_indices}", ) ) continue reconstructed = method.get("reconstructed_text") if isinstance(reconstructed, str): observed_text = _concat_text_input_events(source_events, method_indices) if observed_text != reconstructed: issues.append( CompetenceValidationIssue( "method_reconstructed_text_mismatch", f"reconstructed_text={reconstructed!r} trace_text={observed_text!r}", ) ) elif kind == "scroll" and method.get("observed") is True: if not method.get("trace_source"): issues.append(CompetenceValidationIssue("method_trace_source", f"methods[{index}] missing trace_source")) method_indices = _method_trace_indices(method) or _cleaned_method_indices(data) if source_events is None: continue if not method_indices: issues.append( CompetenceValidationIssue( "method_trace_missing", f"observed scroll method {method.get('id') or index} requires trace_event_indices or method_event_indices", ) ) continue _validate_scroll_method_trace(method, index, source_events, method_indices, issues) elif kind == "click" and method.get("observed") is True: if not method.get("trace_source"): issues.append(CompetenceValidationIssue("method_trace_source", f"methods[{index}] missing trace_source")) method_indices = _method_trace_indices(method) or _cleaned_method_indices(data) if source_events is None: continue if not method_indices: issues.append( CompetenceValidationIssue( "method_trace_missing", f"observed click method {method.get('id') or index} requires trace_event_indices or method_event_indices", ) ) continue _validate_click_method_trace(index, source_events, method_indices, issues) elif kind == "wait_state" and method.get("observed") is True: if not method.get("trace_source"): issues.append(CompetenceValidationIssue("method_trace_source", f"methods[{index}] missing trace_source")) method_indices = _method_trace_indices(method) or _cleaned_method_indices(data) if source_events is None: continue if not method_indices: issues.append( CompetenceValidationIssue( "method_trace_missing", f"observed wait_state method {method.get('id') or index} requires trace_event_indices or method_event_indices", ) ) continue _validate_wait_state_method_trace(index, source_events, method_indices, issues) def _validate_method_primitive_ref( method: dict[str, Any], method_kind: Any, method_index: int, repo_root: Path, issues: list[CompetenceValidationIssue], ) -> None: primitive_ref = method.get("primitive_ref") if primitive_ref is None: return if not isinstance(primitive_ref, str) or not re.fullmatch(r"[a-z][a-z0-9_]*", primitive_ref): issues.append( CompetenceValidationIssue( "primitive_ref_invalid", f"methods[{method_index}].primitive_ref must be a lowercase slug", ) ) return primitive_path = repo_root / PRIMITIVES_DIR / f"{primitive_ref}.yaml" if not primitive_path.is_file(): issues.append( CompetenceValidationIssue( "primitive_ref_unknown", f"primitive_ref={primitive_ref!r}: file not found: {primitive_path.relative_to(repo_root)}", ) ) return primitive_report = validate_primitive_file(primitive_path, repo_root=repo_root) if not primitive_report.valid: for issue in primitive_report.issues: issues.append( CompetenceValidationIssue( "primitive_file_invalid", f"primitive_ref={primitive_ref!r}: {issue.code}: {issue.detail}", ) ) return primitive = _read_yaml_mapping(primitive_path, issues) if primitive is None: return expected_kind = primitive.get("executor_kind") if isinstance(expected_kind, str) and method_kind != expected_kind: issues.append( CompetenceValidationIssue( "primitive_kind_mismatch", f"primitive_ref={primitive_ref!r} requires kind={expected_kind!r}, got kind={method_kind!r}", ) ) parameters = method.get("parameters") if parameters is None: parameters = {} if not isinstance(parameters, dict): issues.append( CompetenceValidationIssue( "primitive_schema_invalid", f"primitive_ref={primitive_ref!r} requires methods[{method_index}].parameters to be a mapping", ) ) return schema = primitive.get("parameters_schema") if not isinstance(schema, dict): return for param_name, spec in schema.items(): if not isinstance(spec, dict): continue _validate_primitive_method_parameter(primitive_ref, param_name, spec, parameters, issues) if primitive_ref == "click_anchor": _validate_click_anchor_parameters(parameters, issues) if primitive_ref == "wait_for_state": _validate_wait_for_state_parameters(parameters, issues) def _validate_scroll_method_trace( method: dict[str, Any], method_index: int, events: list[dict[str, Any]], indices: list[int], issues: list[CompetenceValidationIssue], ) -> None: direction = _method_scroll_direction(method) for event_index in indices: if event_index >= len(events) or events[event_index].get("type") != "mouse_scroll": observed_type = events[event_index].get("type") if event_index < len(events) else None issues.append( CompetenceValidationIssue( "method_trace_missing", f"methods[{method_index}] expects type=mouse_scroll, got type={observed_type!r} at event #{event_index}", ) ) continue delta = events[event_index].get("delta") if not _is_scroll_delta(delta): issues.append( CompetenceValidationIssue( "method_scroll_delta_missing", f"methods[{method_index}] points event #{event_index} type=mouse_scroll without usable delta field", ) ) continue if isinstance(direction, str) and not _scroll_delta_matches_direction(delta, direction): issues.append( CompetenceValidationIssue( "method_scroll_direction_mismatch", f"methods[{method_index}] direction={direction!r} does not match delta={delta!r} at event #{event_index}", ) ) def _validate_click_method_trace( method_index: int, events: list[dict[str, Any]], indices: list[int], issues: list[CompetenceValidationIssue], ) -> None: for event_index in indices: if event_index >= len(events) or events[event_index].get("type") != "mouse_click": observed_type = events[event_index].get("type") if event_index < len(events) else None issues.append( CompetenceValidationIssue( "method_trace_missing", f"methods[{method_index}] expects type=mouse_click, got type={observed_type!r} at event #{event_index}", ) ) def _validate_wait_state_method_trace( method_index: int, events: list[dict[str, Any]], indices: list[int], issues: list[CompetenceValidationIssue], ) -> None: for event_index in indices: event = events[event_index] if event_index < len(events) else {} event_type = event.get("type") if event_type == "window_focus_change" and (_event_title(event) or _event_process(event)): continue if event_type == "heartbeat" and (_event_title(event) or _event_process(event)): continue issues.append( CompetenceValidationIssue( "method_trace_missing", f"methods[{method_index}] expects durable wait_state evidence, got type={event_type!r} at event #{event_index}", ) ) def _validate_click_anchor_parameters( parameters: dict[str, Any], issues: list[CompetenceValidationIssue], ) -> None: anchor_ref = parameters.get("anchor_ref") if not ( isinstance(anchor_ref, str) and anchor_ref.strip() or isinstance(anchor_ref, dict) and bool(anchor_ref) ): issues.append( CompetenceValidationIssue( "primitive_anchor_ref_invalid", "click_anchor requires anchor_ref as non-empty string or mapping", ) ) click_count = parameters.get("click_count", 1) if not isinstance(click_count, int) or isinstance(click_count, bool) or click_count < 1 or click_count > 2: issues.append( CompetenceValidationIssue( "primitive_click_count_out_of_range", "click_anchor click_count must be 1 or 2", ) ) if "relative_offset" in parameters: _validate_click_relative_offset(parameters.get("relative_offset"), issues) def _validate_click_relative_offset( offset: Any, issues: list[CompetenceValidationIssue], ) -> None: if not isinstance(offset, dict): issues.append( CompetenceValidationIssue( "primitive_relative_offset_invalid", "click_anchor relative_offset must be a mapping", ) ) return keys = set(offset.keys()) if keys == {"x_pct", "y_pct"}: if not all(_is_number_in_range(offset[key], 0.0, 1.0) for key in ("x_pct", "y_pct")): issues.append( CompetenceValidationIssue( "primitive_relative_offset_invalid", "click_anchor relative_offset x_pct/y_pct must be numbers between 0.0 and 1.0", ) ) return if keys == {"dx", "dy"}: if not all(_is_number_in_range(offset[key], -0.5, 0.5) for key in ("dx", "dy")): issues.append( CompetenceValidationIssue( "primitive_relative_offset_invalid", "click_anchor relative_offset dx/dy must be numbers between -0.5 and 0.5", ) ) return issues.append( CompetenceValidationIssue( "primitive_relative_offset_invalid", "click_anchor relative_offset must use exactly x_pct/y_pct or dx/dy", ) ) def _validate_wait_for_state_parameters( parameters: dict[str, Any], issues: list[CompetenceValidationIssue], ) -> None: expected_state = parameters.get("expected_state") if not isinstance(expected_state, dict) or not expected_state: issues.append( CompetenceValidationIssue( "primitive_expected_state_invalid", "wait_for_state expected_state must be a non-empty mapping", ) ) timeout_ms = parameters.get("timeout_ms", 5000) if not _is_int_in_range(timeout_ms, 100, 60000): issues.append( CompetenceValidationIssue( "primitive_wait_timeout_invalid", "wait_for_state timeout_ms must be an integer between 100 and 60000", ) ) poll_interval_ms = parameters.get("poll_interval_ms", 250) if not _is_int_in_range(poll_interval_ms, 50, 5000): issues.append( CompetenceValidationIssue( "primitive_poll_interval_invalid", "wait_for_state poll_interval_ms must be an integer between 50 and 5000", ) ) def _validate_primitive_method_parameter( primitive_ref: str, param_name: str, spec: dict[str, Any], parameters: dict[str, Any], issues: list[CompetenceValidationIssue], ) -> None: required = spec.get("required") is True required_unless = spec.get("required_unless") is_present = param_name in parameters if required and not is_present: issues.append( CompetenceValidationIssue( "primitive_schema_invalid", f"primitive_ref={primitive_ref!r} requires parameter {param_name!r}", ) ) return if _is_string_list(required_unless): alternatives_present = [name for name in required_unless if name in parameters] if is_present and alternatives_present: issues.append( CompetenceValidationIssue( "primitive_schema_invalid", f"primitive_ref={primitive_ref!r} parameters {param_name!r} and {alternatives_present!r} are mutually exclusive", ) ) elif not is_present and not alternatives_present: issues.append( CompetenceValidationIssue( "primitive_schema_invalid", f"primitive_ref={primitive_ref!r} requires parameter {param_name!r} unless one of {required_unless!r} is present", ) ) return if is_present and not _primitive_value_matches_type(parameters[param_name], spec.get("type")): issues.append( CompetenceValidationIssue( "primitive_schema_invalid", f"primitive_ref={primitive_ref!r} parameter {param_name!r} has invalid type {spec.get('type')!r}", ) ) constraints = spec.get("constraints") if is_present and isinstance(constraints, dict): _validate_primitive_method_parameter_constraints( primitive_ref, param_name, parameters[param_name], constraints, issues, ) def _validate_primitive_method_parameter_constraints( primitive_ref: str, param_name: str, value: Any, constraints: dict[str, Any], issues: list[CompetenceValidationIssue], ) -> None: min_length = constraints.get("min_length") if isinstance(min_length, int) and hasattr(value, "__len__") and len(value) < min_length: issues.append( CompetenceValidationIssue( "primitive_schema_invalid", f"primitive_ref={primitive_ref!r} parameter {param_name!r} must have length >= {min_length}", ) ) min_value = constraints.get("min", constraints.get("min_value")) if isinstance(min_value, int) and isinstance(value, int) and not isinstance(value, bool) and value < min_value: issues.append( CompetenceValidationIssue( "primitive_schema_invalid", f"primitive_ref={primitive_ref!r} parameter {param_name!r} must be >= {min_value}", ) ) enum = constraints.get("enum") if isinstance(enum, list) and value not in enum: issues.append( CompetenceValidationIssue( "primitive_schema_invalid", f"primitive_ref={primitive_ref!r} parameter {param_name!r} must be one of {enum!r}", ) ) regex = constraints.get("regex") if isinstance(regex, str) and isinstance(value, str) and not re.fullmatch(regex, value): issues.append( CompetenceValidationIssue( "primitive_schema_invalid", f"primitive_ref={primitive_ref!r} parameter {param_name!r} must match {regex!r}", ) ) def _validate_success_marker( data: dict[str, Any], repo_root: Path, issues: list[CompetenceValidationIssue], ) -> None: marker = data.get("success_marker") if not isinstance(marker, dict): return if marker.get("mode") not in {"any_of", "all_of"}: issues.append(CompetenceValidationIssue("success_marker_mode", "success_marker.mode must be any_of or all_of")) timeout_ms = marker.get("timeout_ms") if not isinstance(timeout_ms, int) or timeout_ms <= 0: issues.append(CompetenceValidationIssue("success_marker_timeout", "success_marker.timeout_ms must be positive")) markers = marker.get("markers") if not isinstance(markers, list) or not markers: issues.append(CompetenceValidationIssue("success_marker_markers", "success_marker.markers must be a non-empty list")) return for index, item in enumerate(markers): if not isinstance(item, dict) or not item.get("kind"): issues.append(CompetenceValidationIssue("success_marker_invalid", f"markers[{index}] must define kind")) supervised_requires = marker.get("supervised_requires") if supervised_requires is not None and not isinstance(supervised_requires, list): issues.append( CompetenceValidationIssue( "success_marker_invalid", "success_marker.supervised_requires must be a list when present", ) ) source_events = _load_source_events(data, repo_root, issues) if source_events is None: return keep_indices = _cleaned_keep_indices(data) method_indices = _cleaned_method_indices(data) match_indices = _trace_success_marker_match_indices( source_events, keep_indices, markers, ) if not match_indices: issues.append( CompetenceValidationIssue( "success_marker_missing", "no success marker matches the cleaned source segment", ) ) return if method_indices: min_success_index = _minimum_success_index_after_methods(data, method_indices) if not any(index >= min_success_index for index in match_indices): issues.append( CompetenceValidationIssue( "success_marker_pre_method", "success marker must match an event after the observed method", ) ) def _validate_chain_refs( data: dict[str, Any], repo_root: Path, issues: list[CompetenceValidationIssue], ) -> None: chain_refs = data.get("chain_refs") if not isinstance(chain_refs, dict): return if not isinstance(chain_refs.get("source_session"), str) or not chain_refs.get("source_session", "").strip(): issues.append(CompetenceValidationIssue("chain_ref_missing", "chain_refs.source_session is required")) cleaned = chain_refs.get("cleaned_segment") if not isinstance(cleaned, dict): issues.append(CompetenceValidationIssue("cleaned_segment_missing", "chain_refs.cleaned_segment is required")) return source_event_format = cleaned.get("source_event_format") if source_event_format is not None and source_event_format not in {"streaming_session_json", "raw_live_events_jsonl"}: issues.append( CompetenceValidationIssue( "cleaned_segment_source", "cleaned_segment.source_event_format must be streaming_session_json or raw_live_events_jsonl", ) ) required_path_key = "live_events_path" if source_event_format == "raw_live_events_jsonl" else "streaming_session_path" for key in ("streaming_session_path", "live_events_path"): path_value = chain_refs.get(key) if key == required_path_key and (not isinstance(path_value, str) or not path_value.strip()): issues.append(CompetenceValidationIssue("chain_ref_missing", f"chain_refs.{key} is required")) continue if isinstance(path_value, str) and path_value.strip(): resolved = _repo_path(repo_root, path_value) if not resolved.is_file(): issues.append(CompetenceValidationIssue("chain_ref_path_missing", f"{key} not found: {path_value}")) keep_indices = cleaned.get("keep_event_indices") if not isinstance(keep_indices, list) or not keep_indices or not all(isinstance(i, int) and i >= 0 for i in keep_indices): issues.append( CompetenceValidationIssue( "cleaned_segment_indices", "cleaned_segment.keep_event_indices must be a non-empty list of positive indices", ) ) stop_before = cleaned.get("stop_before") if not isinstance(stop_before, list) or not stop_before: issues.append(CompetenceValidationIssue("cleaned_segment_stop", "cleaned_segment.stop_before must document cut reasons")) method_indices = cleaned.get("method_event_indices") success_indices = cleaned.get("success_event_indices") if method_indices is not None and not _is_int_list(method_indices): issues.append( CompetenceValidationIssue( "cleaned_segment_indices", "cleaned_segment.method_event_indices must be a list of positive indices", ) ) if success_indices is not None and not _is_int_list(success_indices): issues.append( CompetenceValidationIssue( "cleaned_segment_indices", "cleaned_segment.success_event_indices must be a list of positive indices", ) ) if _is_int_list(method_indices) and _is_int_list(success_indices): min_success_index = _minimum_success_index_after_methods(data, method_indices) if not any(index >= min_success_index for index in success_indices): issues.append( CompetenceValidationIssue( "success_marker_pre_method", "cleaned_segment.success_event_indices must be after method_event_indices", ) ) if _is_int_list(method_indices) and isinstance(keep_indices, list): missing_method_indices = [index for index in method_indices if index not in keep_indices] if missing_method_indices: issues.append( CompetenceValidationIssue( "cleaned_segment_indices", f"method_event_indices must be included in keep_event_indices: {missing_method_indices}", ) ) source_events = _load_source_events(data, repo_root, issues) if source_events is not None and isinstance(keep_indices, list): for index in keep_indices: if isinstance(index, int) and index >= len(source_events): issues.append( CompetenceValidationIssue( "cleaned_segment_indices", f"cleaned segment index out of range: {index}", ) ) def _load_source_events( data: dict[str, Any], repo_root: Path, issues: list[CompetenceValidationIssue], ) -> list[dict[str, Any]] | None: chain_refs = data.get("chain_refs") if not isinstance(chain_refs, dict): return None source_format = _cleaned_source_event_format(data) path_key = "live_events_path" if source_format == "raw_live_events_jsonl" else "streaming_session_path" path_value = chain_refs.get(path_key) if not isinstance(path_value, str) or not path_value: return None path = _repo_path(repo_root, path_value) if not path.is_file(): return None if source_format == "raw_live_events_jsonl": return _load_jsonl_source_events(path, str(chain_refs.get("source_session") or ""), issues) try: payload = json.loads(path.read_text(encoding="utf-8")) except (OSError, json.JSONDecodeError) as exc: issues.append(CompetenceValidationIssue("source_session_invalid", f"cannot read source session: {exc}")) return None source_session = chain_refs.get("source_session") if source_session and payload.get("session_id") != source_session: issues.append( CompetenceValidationIssue( "source_session_mismatch", f"source session mismatch: YAML={source_session} trace={payload.get('session_id')}", ) ) raw_events = payload.get("events") if not isinstance(raw_events, list): issues.append(CompetenceValidationIssue("source_session_invalid", "source session events must be a list")) return None return _normalize_source_events(raw_events) def _load_jsonl_source_events( path: Path, source_session: str, issues: list[CompetenceValidationIssue], ) -> list[dict[str, Any]] | None: raw_events: list[dict[str, Any]] = [] session_ids: set[str] = set() try: for line_number, line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1): if not line.strip(): continue try: payload = json.loads(line) except json.JSONDecodeError as exc: issues.append( CompetenceValidationIssue( "source_session_invalid", f"cannot read source session jsonl line {line_number}: {exc}", ) ) return None if not isinstance(payload, dict): issues.append( CompetenceValidationIssue( "source_session_invalid", f"source session jsonl line {line_number} must be a mapping", ) ) return None if isinstance(payload.get("session_id"), str): session_ids.add(payload["session_id"]) raw_events.append(payload) except OSError as exc: issues.append(CompetenceValidationIssue("source_session_invalid", f"cannot read source session: {exc}")) return None if source_session and source_session not in session_ids: issues.append( CompetenceValidationIssue( "source_session_mismatch", f"YAML source session {source_session!r} not found in jsonl sessions {sorted(session_ids)!r}", ) ) return _normalize_source_events(raw_events) def _normalize_source_events(raw_events: list[Any]) -> list[dict[str, Any]]: normalized: list[dict[str, Any]] = [] for raw_event in raw_events: if not isinstance(raw_event, dict): continue nested_event = raw_event.get("event") if isinstance(nested_event, dict) and isinstance(nested_event.get("type"), str): event = dict(nested_event) for key in ("session_id", "timestamp", "machine_id"): if key not in event and key in raw_event: event[key] = raw_event[key] normalized.append(event) else: normalized.append(raw_event) return normalized def _cleaned_keep_indices(data: dict[str, Any]) -> list[int] | None: chain_refs = data.get("chain_refs") if not isinstance(chain_refs, dict): return None cleaned = chain_refs.get("cleaned_segment") if not isinstance(cleaned, dict): return None indices = cleaned.get("keep_event_indices") if not isinstance(indices, list) or not all(isinstance(i, int) for i in indices): return None return indices def _cleaned_method_indices(data: dict[str, Any]) -> list[int] | None: chain_refs = data.get("chain_refs") if not isinstance(chain_refs, dict): return None cleaned = chain_refs.get("cleaned_segment") if not isinstance(cleaned, dict): return None indices = cleaned.get("method_event_indices") if not _is_int_list(indices): return None return indices def _methods_execution_mode(data: dict[str, Any]) -> str: mode = data.get("methods_execution", "alternatives") return mode if mode in METHODS_EXECUTION_MODES else "alternatives" def _cleaned_source_event_format(data: dict[str, Any]) -> str: chain_refs = data.get("chain_refs") if not isinstance(chain_refs, dict): return "streaming_session_json" cleaned = chain_refs.get("cleaned_segment") if not isinstance(cleaned, dict): return "streaming_session_json" value = cleaned.get("source_event_format") return value if value == "raw_live_events_jsonl" else "streaming_session_json" def _minimum_success_index_after_methods(data: dict[str, Any], method_indices: list[int]) -> int: last_method_index = max(method_indices) if _last_observed_method_is_wait_state_at(data, last_method_index): return last_method_index return last_method_index + 1 def _last_observed_method_is_wait_state_at(data: dict[str, Any], event_index: int) -> bool: methods = data.get("methods") if not isinstance(methods, list): return False for method in methods: if not isinstance(method, dict) or method.get("observed") is not True: continue if method.get("kind") != "wait_state": continue trace_indices = _method_trace_indices(method) if trace_indices and max(trace_indices) == event_index: return True return False def _method_trace_indices(method: dict[str, Any]) -> list[int] | None: indices = method.get("trace_event_indices") if not _is_int_list(indices): return None return indices def _method_scroll_direction(method: dict[str, Any]) -> str | None: parameters = method.get("parameters") if not isinstance(parameters, dict): return None direction = parameters.get("direction") return direction if isinstance(direction, str) else None def _is_scroll_delta(value: Any) -> bool: return ( isinstance(value, list) and len(value) >= 2 and isinstance(value[0], int) and isinstance(value[1], int) and not isinstance(value[0], bool) and not isinstance(value[1], bool) ) def _scroll_delta_matches_direction(delta: list[Any], direction: str) -> bool: if direction == "down": return delta[1] < 0 if direction == "up": return delta[1] > 0 if direction == "left": return delta[0] < 0 if direction == "right": return delta[0] > 0 return True def _method_key_combo_keys(method: dict[str, Any]) -> Any: keys = method.get("keys") if keys is not None: return keys parameters = method.get("parameters") if isinstance(parameters, dict): return parameters.get("keys") return None def _trace_has_key_combo( events: list[dict[str, Any]], keep_indices: list[int] | None, expected_keys: list[str], ) -> bool: expected = _normalize_key_combo_sequence(expected_keys) for index, event in enumerate(events): if keep_indices is not None and index not in keep_indices: continue if event.get("type") != "key_combo": continue keys = event.get("keys") if _is_string_list(keys) and _normalize_key_combo_sequence(keys) == expected: return True return False def _trace_success_marker_match_indices( events: list[dict[str, Any]], keep_indices: list[int] | None, markers: list[Any], ) -> list[int]: marker_maps = [marker for marker in markers if isinstance(marker, dict)] matches: list[int] = [] for index, event in enumerate(events): if keep_indices is not None and index not in keep_indices: continue for marker in marker_maps: kind = marker.get("kind") if kind == "active_process_name_is": expected = str(marker.get("value") or "").casefold() if expected and _event_process(event).casefold() == expected: matches.append(index) break elif kind == "active_window_title_in": values = marker.get("values") if _is_string_list(values) and _event_title(event).casefold() in {v.casefold() for v in values}: matches.append(index) break elif kind == "ocr_contains": # OCR is not required for offline validation if another marker # proves the state in the captured segment. continue return matches def _event_title(event: dict[str, Any]) -> str: window = event.get("window") if isinstance(event.get("window"), dict) else {} to_window = event.get("to") if isinstance(event.get("to"), dict) else {} return str(window.get("title") or event.get("active_window_title") or to_window.get("title") or "") def _event_process(event: dict[str, Any]) -> str: window = event.get("window") if isinstance(event.get("window"), dict) else {} to_window = event.get("to") if isinstance(event.get("to"), dict) else {} return str(window.get("app_name") or to_window.get("app_name") or "") def _concat_text_input_events(events: list[dict[str, Any]], indices: list[int]) -> str: chunks: list[str] = [] for index in indices: if 0 <= index < len(events): chunks.append(str(events[index].get("text") or "")) return "".join(chunks) def _repo_path(repo_root: Path, value: str) -> Path: path = Path(value) if path.is_absolute(): return path return repo_root / path def _is_primitive_path(path: Path, repo_root: Path) -> bool: try: relative = path.resolve().relative_to(repo_root.resolve()) except (OSError, ValueError): relative = path return len(relative.parts) >= 3 and relative.parts[0] == "data" and relative.parts[1] == "primitives" def _read_yaml_mapping(path: Path, issues: list[CompetenceValidationIssue]) -> dict[str, Any] | None: try: data = yaml.safe_load(path.read_text(encoding="utf-8")) except (OSError, yaml.YAMLError) as exc: issues.append(CompetenceValidationIssue("yaml_invalid", str(exc))) return None if not isinstance(data, dict): issues.append(CompetenceValidationIssue("schema_type", "root YAML node must be a mapping")) return None return data def _find_competence_dependency_path( repo_root: Path, competence_id: str, *, minimum_state: str, ) -> Path: try: start = LEARNING_STATE_ORDER.index(minimum_state) except ValueError: return repo_root / "data" / "competences" / minimum_state / f"{competence_id}.yaml" for state in LEARNING_STATE_ORDER[start:]: path = repo_root / "data" / "competences" / state / f"{competence_id}.yaml" if path.is_file(): return path return repo_root / "data" / "competences" / minimum_state / f"{competence_id}.yaml" def _normalize_key(key: str) -> str: normalized = str(key or "").strip().casefold() return KEY_ALIASES.get(normalized, normalized) def _normalize_key_combo_sequence(keys: list[str]) -> list[str]: normalized = [_normalize_key(key) for key in keys] if set(normalized) in ({"shift", "ctrl", "@"}, {"shift", "ctrl", "\x13"}): return ["ctrl", "s"] return normalized def _is_string_list(value: Any) -> bool: return isinstance(value, list) and bool(value) and all(isinstance(item, str) and item for item in value) def _is_int_list(value: Any) -> bool: return isinstance(value, list) and bool(value) and all(isinstance(item, int) and item >= 0 for item in value) def _is_int_in_range(value: Any, minimum: int, maximum: int) -> bool: return isinstance(value, int) and not isinstance(value, bool) and minimum <= value <= maximum def _primitive_value_matches_type(value: Any, expected_type: Any) -> bool: if expected_type == "str": return isinstance(value, str) if expected_type == "int": return isinstance(value, int) and not isinstance(value, bool) if expected_type == "bool": return isinstance(value, bool) if expected_type == "list[str]": return _is_string_list(value) if expected_type == "dict": return isinstance(value, dict) if expected_type == "dict_or_string": return isinstance(value, dict) or isinstance(value, str) return True def _is_number_in_range(value: Any, minimum: float, maximum: float) -> bool: return isinstance(value, (int, float)) and not isinstance(value, bool) and minimum <= float(value) <= maximum def _distinct_context_signatures(contexts: list[Any]) -> set[tuple[Any, ...]]: dimensions = ("dpi", "screen", "app_in_focus", "method_used", "screen_signature") signatures: set[tuple[Any, ...]] = set() for context in contexts: if not isinstance(context, dict): continue signature = tuple(context.get(dimension) for dimension in dimensions) if any(value not in (None, "", [], {}) for value in signature): signatures.add(signature) return signatures def _dedupe_issues(issues: list[CompetenceValidationIssue]) -> tuple[CompetenceValidationIssue, ...]: seen: set[tuple[str, str]] = set() deduped: list[CompetenceValidationIssue] = [] for issue in issues: key = (issue.code, issue.detail) if key in seen: continue seen.add(key) deduped.append(issue) return tuple(deduped) def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description="Validate Lea short competence YAML files") parser.add_argument("paths", nargs="+", help="YAML competence file(s) to validate") parser.add_argument("--json", action="store_true", help="emit JSON report") args = parser.parse_args(argv) reports = [validate_file(path) for path in args.paths] if args.json: print(json.dumps([report.to_dict() for report in reports], ensure_ascii=False, indent=2)) else: for report in reports: status = "ok" if report.valid else "fail" print(f"{status}: {report.path}") for issue in report.issues: print(f" - {issue.code}: {issue.detail}") return 0 if all(report.valid for report in reports) else 1 if __name__ == "__main__": raise SystemExit(main())