#!/usr/bin/env python3 """Competence candidate extractor for Lea sessions. By default this tool runs read-only: it loads one session, proposes observed candidates, validates temporary YAML files, and emits a report. Apply mode is guarded by an explicit allow-list and validates the full batch before writing. """ from __future__ import annotations import argparse import json import re import sys import tempfile from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path from typing import Any import yaml REPO_ROOT = Path(__file__).resolve().parents[1] if str(REPO_ROOT) not in sys.path: sys.path.insert(0, str(REPO_ROOT)) from tools.competence_validator import validate_competence_file SOURCE_FORMATS = {"streaming_session_json", "raw_live_events_jsonl"} ACTION_TYPES = {"key_combo", "mouse_click", "mouse_scroll", "text_input"} HUMAN_CONTINUATION_TYPES = {"key_combo", "mouse_click", "text_input"} MODIFIER_ONLY_KEYS = {"alt", "ctrl", "control", "shift", "win", "windows", "cmd", "command", "meta", "super"} APPLY_MIN_CONFIDENCE = 0.7 BLOCKING_APPLY_GAPS = { "marker_satisfied_by_human_continuation", "wait_state_inferred_from_action", "scroll_no_observable_marker", } DEFAULT_OUTPUT_DIR = Path("data") / "competences" / "observed" @dataclass(frozen=True) class LoadedSession: path: Path source_format: str session_id: str events: list[dict[str, Any]] @dataclass(frozen=True) class CandidateDraft: competence_id: str confidence: float segment: dict[str, list[int]] methods_execution: str primitive_refs: list[str] t2_gaps_detected: list[str] human_review_notes: list[str] yaml_path_would_be: str yaml_data: dict[str, Any] def build_report( *, session_path: str | Path, machine_id: str, source_format: str | None = None, output_dir: str | Path = DEFAULT_OUTPUT_DIR, max_candidates: int = 5, mode: str = "dry_run", allow_list: str | list[str] | tuple[str, ...] | None = None, repo_root: str | Path = REPO_ROOT, ) -> dict[str, Any]: root = Path(repo_root) if max_candidates < 1: raise ValueError("max_candidates must be >= 1") if max_candidates > 10: raise ValueError("max_candidates hard-cap is 10") if mode not in {"dry_run", "apply"}: raise ValueError("mode must be dry_run or apply") allow_ids = _parse_allow_list(allow_list) if mode == "apply" else [] loaded = load_session(session_path, source_format=source_format) candidates, rejected = propose_candidates( loaded, machine_id=machine_id, output_dir=Path(output_dir), max_candidates=max_candidates, repo_root=root, ) enriched_candidates = [ _candidate_report(candidate, root) for candidate in candidates ] selected: list[CandidateDraft] = [] applied: list[dict[str, str]] = [] if mode == "apply": selected = _select_allowed_candidates( candidates=candidates, candidate_reports=enriched_candidates, allow_ids=allow_ids, ) applied = _apply_candidates(selected, repo_root=root) report = { "run_id": f"extract_{datetime.now(timezone.utc).replace(microsecond=0).isoformat()}", "session": loaded.session_id, "session_path": _display_path(loaded.path, root), "source_format": loaded.source_format, "mode": mode, "candidates": enriched_candidates, "rejected": rejected, "summary": { "candidates_generated": len(enriched_candidates), "candidates_rejected": len(rejected), "would_write": len(selected), "written": len(applied), "apply_min_confidence": APPLY_MIN_CONFIDENCE, }, } if mode == "apply": report["allow_list"] = allow_ids report["applied"] = applied return report def load_session(path: str | Path, *, source_format: str | None = None) -> LoadedSession: session_path = Path(path) resolved_format = source_format or _detect_source_format(session_path) if resolved_format not in SOURCE_FORMATS: raise ValueError(f"unsupported source_format: {resolved_format}") if resolved_format == "raw_live_events_jsonl": return _load_raw_jsonl_session(session_path) return _load_streaming_json_session(session_path) def propose_candidates( loaded: LoadedSession, *, machine_id: str, output_dir: Path, max_candidates: int, repo_root: Path, ) -> tuple[list[CandidateDraft], list[dict[str, Any]]]: candidates: list[CandidateDraft] = [] rejected: list[dict[str, Any]] = [] used_action_indices: set[int] = set() for index, event in enumerate(loaded.events): if len(candidates) >= max_candidates: break event_type = _event_type(event) if event_type not in {"key_combo", "mouse_click", "mouse_scroll"}: continue if index in used_action_indices: continue action = _action_method(event, index) if action is None: rejected.append( { "reason": _reject_reason_for_action(event), "segment_indices": [index], "validator_codes": _reject_codes_for_action(event), } ) continue state_index = _find_durable_state_index(loaded.events, index + 1, index + 6) if state_index is None: rejected.append( { "reason": "no durable state event within 5 events after action", "segment_indices": [index], "validator_codes": [], } ) continue draft = _sequence_candidate( loaded=loaded, machine_id=machine_id, action_index=index, action_method=action, state_index=state_index, output_dir=output_dir, repo_root=repo_root, ) candidates.append(draft) used_action_indices.add(index) for start, indices in _text_input_groups(loaded.events): if len(candidates) >= max_candidates: break if any(index in used_action_indices for index in indices): continue state_index = _find_durable_state_index(loaded.events, indices[-1] + 1, indices[-1] + 6) if state_index is None: rejected.append( { "reason": "text_input burst has no durable post-input state event", "segment_indices": indices, "validator_codes": [], } ) continue candidates.append( _text_input_candidate( loaded=loaded, machine_id=machine_id, text_indices=indices, state_index=state_index, output_dir=output_dir, repo_root=repo_root, ) ) used_action_indices.update(indices) return candidates, rejected def _candidate_report(candidate: CandidateDraft, repo_root: Path) -> dict[str, Any]: validator_status, validator_codes = _validate_candidate_yaml(candidate.yaml_data, candidate.competence_id, repo_root) duplicate_of = _duplicate_competence_id(candidate, repo_root) apply_eligible = ( validator_status == "would_pass" and candidate.confidence >= APPLY_MIN_CONFIDENCE and duplicate_of is None and not (set(candidate.t2_gaps_detected) & BLOCKING_APPLY_GAPS) ) return { "competence_id": candidate.competence_id, "confidence": candidate.confidence, "apply_eligible": apply_eligible, "quality_flags": _quality_flags(candidate, validator_status, duplicate_of), "segment": candidate.segment, "methods_execution": candidate.methods_execution, "primitive_refs": candidate.primitive_refs, "t2_gaps_detected": candidate.t2_gaps_detected, "validator_status": validator_status, "validator_codes": validator_codes, "human_review_notes": candidate.human_review_notes, "yaml_path_would_be": candidate.yaml_path_would_be, "duplicate_existing": duplicate_of is not None, "duplicate_of": duplicate_of, } def _parse_allow_list(allow_list: str | list[str] | tuple[str, ...] | None) -> list[str]: if allow_list is None: raise ValueError("--allow-list is required when --apply is used") if isinstance(allow_list, str): ids = [item.strip() for item in allow_list.split(",")] else: ids = [str(item).strip() for item in allow_list] ids = [item for item in ids if item] if not ids: raise ValueError("--allow-list must contain at least one competence id") duplicates = sorted({item for item in ids if ids.count(item) > 1}) if duplicates: raise ValueError(f"--allow-list-duplicate-id: {','.join(duplicates)}") return ids def _select_allowed_candidates( *, candidates: list[CandidateDraft], candidate_reports: list[dict[str, Any]], allow_ids: list[str], ) -> list[CandidateDraft]: drafts_by_id: dict[str, list[CandidateDraft]] = {} reports_by_id: dict[str, list[dict[str, Any]]] = {} for draft, report in zip(candidates, candidate_reports, strict=True): drafts_by_id.setdefault(draft.competence_id, []).append(draft) reports_by_id.setdefault(str(report["competence_id"]), []).append(report) selected: list[CandidateDraft] = [] for competence_id in allow_ids: drafts = drafts_by_id.get(competence_id) reports = reports_by_id.get(competence_id) if not drafts or not reports: raise ValueError(f"--allow-list-id-not-found: {competence_id}") if len(drafts) > 1 or len(reports) > 1: raise ValueError(f"--allow-list-id-ambiguous: {competence_id}") report = reports[0] if not report["apply_eligible"]: raise ValueError(f"--allow-list-id-not-apply-eligible: {competence_id}") selected.append(drafts[0]) return selected def _apply_candidates(selected: list[CandidateDraft], *, repo_root: Path) -> list[dict[str, str]]: if not selected: return [] final_paths = [_candidate_output_path(candidate, repo_root) for candidate in selected] for final_path in final_paths: if final_path.exists(): raise ValueError(f"apply-output-file-exists: {_display_path(final_path, repo_root)}") with tempfile.TemporaryDirectory(prefix="lea_extract_apply_") as tmp_dir: staged: list[tuple[CandidateDraft, Path, Path]] = [] for candidate, final_path in zip(selected, final_paths, strict=True): staged_path = Path(tmp_dir) / final_path.name staged_path.write_text( yaml.safe_dump(candidate.yaml_data, sort_keys=False, allow_unicode=True), encoding="utf-8", ) staged.append((candidate, staged_path, final_path)) _validate_apply_yaml_files([staged_path for _, staged_path, _ in staged], repo_root=repo_root) written: list[Path] = [] try: for _, staged_path, final_path in staged: final_path.parent.mkdir(parents=True, exist_ok=True) if final_path.exists(): raise ValueError(f"apply-output-file-exists: {_display_path(final_path, repo_root)}") staged_path.replace(final_path) written.append(final_path) except Exception: for path in written: try: path.unlink() except FileNotFoundError: pass raise return [ { "competence_id": candidate.competence_id, "path": _display_path(final_path, repo_root), } for candidate, final_path in zip(selected, final_paths, strict=True) ] def _candidate_output_path(candidate: CandidateDraft, repo_root: Path) -> Path: path = Path(candidate.yaml_path_would_be) return path if path.is_absolute() else repo_root / path def _validate_apply_yaml_files(paths: list[Path], *, repo_root: Path) -> None: failures: list[str] = [] for path in paths: report = validate_competence_file(path, repo_root=repo_root) if report.valid: continue codes = ",".join(issue.code for issue in report.issues) failures.append(f"{path.name}:{codes}") if failures: raise ValueError(f"apply-validation-failed: {'; '.join(failures)}") def _sequence_candidate( *, loaded: LoadedSession, machine_id: str, action_index: int, action_method: dict[str, Any], state_index: int, output_dir: Path, repo_root: Path, ) -> CandidateDraft: action_event = loaded.events[action_index] state_event = loaded.events[state_index] title = _event_title(state_event) process = _event_process(state_event) action_ref = action_method["primitive_ref"] competence_id = _sequence_competence_id(action_event, state_event) gaps = _detected_sequence_gaps(action_event, state_event, loaded.events, action_index, state_index, action_ref) confidence = 0.9 if not gaps else 0.7 method_indices = [action_index, state_index] keep_indices = list(range(max(0, action_index - 2), state_index + 1)) yaml_data = _base_competence_yaml( competence_id=competence_id, name=_human_name(competence_id), intent=f"executer l'action observee puis attendre {title or process}", machine_id=machine_id, loaded=loaded, keep_indices=keep_indices, method_indices=method_indices, success_indices=[state_index], stop_before_index=state_index + 1, output_dir=output_dir, source_notes=[ f"Event #{action_index} detecte comme {action_ref}.", f"Event #{state_index} detecte comme wait_for_state durable.", ], ) yaml_data["methods_execution"] = "sequence" yaml_data["methods"] = [ { **action_method["method"], "id": f"step_1_{action_method['id_suffix']}", "observed": True, "trace_source": "live_events.jsonl" if loaded.source_format == "raw_live_events_jsonl" else "streaming_session.json", "trace_event_indices": [action_index], }, { "id": "step_2_wait_state", "kind": "wait_state", "primitive_ref": "wait_for_state", "parameters": { "expected_state": _expected_state(state_event), "timeout_ms": 5000, "poll_interval_ms": 250, "evidence_required": "window_or_process", }, "description": f"Attente de l'etat {title or process}", "observed": True, "trace_source": "live_events.jsonl" if loaded.source_format == "raw_live_events_jsonl" else "streaming_session.json", "trace_event_indices": [state_index], }, ] yaml_data["success_marker"] = _success_marker(state_event) yaml_data["failure_message_template"] = _failure_template( intention=f"atteindre la fenetre {title or process}", attendu=f"voir {title or process} au premier plan", demande=f"ouvrir {title or process} puis me rendre la main", ) yaml_data["promotion"]["t2_known_gaps"] = _gap_records(gaps) return CandidateDraft( competence_id=competence_id, confidence=confidence, segment={"keep": keep_indices, "method": method_indices, "success": [state_index]}, methods_execution="sequence", primitive_refs=[action_ref, "wait_for_state"], t2_gaps_detected=gaps, human_review_notes=_review_notes(action_ref, state_event), yaml_path_would_be=str(output_dir / f"{competence_id}.yaml"), yaml_data=yaml_data, ) def _text_input_candidate( *, loaded: LoadedSession, machine_id: str, text_indices: list[int], state_index: int, output_dir: Path, repo_root: Path, ) -> CandidateDraft: del repo_root text = "".join(str(loaded.events[index].get("text") or "") for index in text_indices) state_event = loaded.events[state_index] title = _event_title(state_event) process = _event_process(state_event) competence_id = _slug(f"saisir_texte_{process or title or loaded.session_id}")[:80].strip("_") keep_indices = list(range(text_indices[0], state_index + 1)) yaml_data = _base_competence_yaml( competence_id=competence_id, name=_human_name(competence_id), intent=f"saisir le texte observe dans {title or process}", machine_id=machine_id, loaded=loaded, keep_indices=keep_indices, method_indices=text_indices, success_indices=[state_index], stop_before_index=state_index + 1, output_dir=output_dir, source_notes=[f"Events {text_indices} detectes comme text_input_focused."], ) yaml_data["methods"] = [ { "id": "text_input_concat", "kind": "text_input", "primitive_ref": "text_input_focused", "parameters": {"text": text, "concat_rule": "concat_in_order"}, "description": f"Saisie texte observee dans {title or process}", "observed": True, "trace_source": "live_events.jsonl" if loaded.source_format == "raw_live_events_jsonl" else "streaming_session.json", "trace_event_indices": text_indices, "reconstructed_text": text, } ] yaml_data["success_marker"] = _success_marker(state_event) yaml_data["failure_message_template"] = _failure_template( intention=f"saisir du texte dans {title or process}", attendu=f"voir le texte saisi dans {title or process}", demande=f"saisir le texte attendu dans {title or process} puis me rendre la main", ) yaml_data["promotion"]["t2_known_gaps"] = _gap_records(["no_ocr_offline"]) return CandidateDraft( competence_id=competence_id, confidence=0.65, segment={"keep": keep_indices, "method": text_indices, "success": [state_index]}, methods_execution="alternatives", primitive_refs=["text_input_focused"], t2_gaps_detected=["no_ocr_offline"], human_review_notes=["Verifier que le texte reconstruit est bien le contenu attendu."], yaml_path_would_be=str(output_dir / f"{competence_id}.yaml"), yaml_data=yaml_data, ) def _base_competence_yaml( *, competence_id: str, name: str, intent: str, machine_id: str, loaded: LoadedSession, keep_indices: list[int], method_indices: list[int], success_indices: list[int], stop_before_index: int, output_dir: Path, source_notes: list[str], ) -> dict[str, Any]: del output_dir chain_refs: dict[str, Any] = { "source_session": loaded.session_id, "machine_id": machine_id, "cleaned_segment": { "status": "documented_offline", "source_event_format": loaded.source_format, "keep_event_indices": keep_indices, "method_event_indices": method_indices, "success_event_indices": success_indices, "excluded_event_indices": [], "stop_before_event_index": stop_before_index, "stop_before": ["end_of_extracted_candidate_segment"], "ignored_after_success": [], "notes": source_notes, }, "workflow_pipeline_id": None, "graph_node_id": None, "faiss_state_signatures": [], "target_memory_keys": [], "dashboard_knowledge_visible": False, } if loaded.source_format == "raw_live_events_jsonl": chain_refs["live_events_path"] = _display_path(loaded.path, REPO_ROOT) else: chain_refs["streaming_session_path"] = _display_path(loaded.path, REPO_ROOT) timestamp = datetime.now(timezone.utc).replace(microsecond=0).isoformat() return { "schema_version": 1, "id": competence_id, "name": name, "version": 1, "learning_state": "observed", "intent": {"fr": intent}, "parameters": {}, "preconditions": [ { "id": "source_session_available", "kind": "source_trace_present", "source_session": loaded.session_id, } ], "methods": [], "success_marker": {}, "failure_message_template": {}, "chain_refs": chain_refs, "promotion": { "candidate_requires": [ "cleaned_segment_validated", "method_trace_present", "success_marker_defined", "failure_message_template_valid", "primitive_ref_satisfied", ], "supervised_requires": ["replay_verified_once", "human_validation"], "stable_requires": {"min_successes": 3, "distinct_contexts": 3, "max_unexplained_failures": 0}, "t2_known_gaps": [], }, "generalisation": {"seen_contexts": [], "method_success_rate": {}, "variance_log": []}, "failure_log": [], "created_at": timestamp, "last_updated_at": timestamp, } def _action_method(event: dict[str, Any], index: int) -> dict[str, Any] | None: event_type = _event_type(event) if event_type == "key_combo": keys = _normalize_shortcut_keys(event.get("keys")) if not _is_usable_keys(keys): return None return { "primitive_ref": "key_combo", "id_suffix": "key_combo", "method": { "kind": "key_combo", "primitive_ref": "key_combo", "parameters": {"keys": keys}, "keys": keys, "description": f"Raccourci clavier observe a l'event #{index}", }, } if event_type == "mouse_click": anchor_ref = _click_anchor_ref(event) if anchor_ref is None: return None return { "primitive_ref": "click_anchor", "id_suffix": "click_anchor", "method": { "kind": "click", "primitive_ref": "click_anchor", "parameters": {"anchor_ref": anchor_ref, "button": str(event.get("button") or "left"), "click_count": 1}, "description": f"Clic observe a l'event #{index}", }, } if event_type == "mouse_scroll": delta = event.get("delta") if not _is_scroll_delta(delta): return None return { "primitive_ref": "scroll_view", "id_suffix": "scroll_view", "method": { "kind": "scroll", "primitive_ref": "scroll_view", "parameters": {"direction": _scroll_direction(delta), "amount": 3, "unit": "lines"}, "description": f"Scroll observe a l'event #{index}", }, } return None def _detected_sequence_gaps( action_event: dict[str, Any], state_event: dict[str, Any], events: list[dict[str, Any]], action_index: int, state_index: int, primitive_ref: str, ) -> list[str]: gaps: list[str] = [] if primitive_ref == "click_anchor": gaps.append("click_target_semantics_not_observed_offline") gaps.append("no_ocr_offline") if primitive_ref == "scroll_view": gaps.append("scroll_no_observable_marker") if _event_type(state_event) != "window_focus_change": gaps.append("wait_state_inferred_from_action") if any(_event_type(events[index]) in HUMAN_CONTINUATION_TYPES for index in range(action_index + 1, state_index)): gaps.append("marker_satisfied_by_human_continuation") return _dedupe_text(gaps) def _gap_records(gap_ids: list[str]) -> list[dict[str, str]]: descriptions = { "no_ocr_offline": ( "Aucune preuve OCR offline n'est produite par l'extracteur.", "La revue supervisee doit confirmer le libelle visible si le replay en depend.", "Verifier par OCR ou replay supervise avant promotion supervised.", ), "marker_satisfied_by_human_continuation": ( "Une action humaine existe entre la methode et l'etat de succes detecte.", "L'effet peut dependre de cette continuation humaine et pas seulement de la methode extraite.", "Ajouter wait_state sur un event durable plus proche ou scinder la competence.", ), "click_target_semantics_not_observed_offline": ( "Le clic dispose d'une ancre UIA dans la trace, mais la resolution runtime n'est pas rejouee offline.", "La revue supervisee doit confirmer que click_anchor retrouve la meme cible sans coordonnees source.", "Ajouter replay supervise ou resolution UIA/OCR runtime avant promotion supervised.", ), "scroll_no_observable_marker": ( "Le scroll observe ne prouve pas a lui seul le changement de contenu attendu.", "La competence doit etre revue avec un marqueur visible ou un etat durable post-scroll.", "Ajouter un marqueur UI/OCR ou un wait_state plus precis avant promotion.", ), "wait_state_inferred_from_action": ( "L'etat attendu est infere sans window_focus_change explicite.", "La preuve d'etat est moins robuste qu'un changement de focus durable.", "Preferer un window_focus_change ou confirmer par replay supervise.", ), } records: list[dict[str, str]] = [] acted_at = datetime.now(timezone.utc).replace(microsecond=0).isoformat() for gap_id in gap_ids: description, impact, resolution = descriptions.get( gap_id, ( f"Gap T2 detecte automatiquement: {gap_id}.", "Revue humaine requise avant promotion.", "Qualifier le gap et ajouter une preuve supervisee.", ), ) records.append( { "id": gap_id, "description": description, "impact": impact, "proposed_resolution": resolution, "acted_by": "extract_competences_from_session.py", "acted_at": acted_at, } ) return records def _validate_candidate_yaml(yaml_data: dict[str, Any], competence_id: str, repo_root: Path) -> tuple[str, list[str]]: with tempfile.TemporaryDirectory(prefix="lea_extract_") as tmp_dir: path = Path(tmp_dir) / f"{competence_id}.yaml" path.write_text(yaml.safe_dump(yaml_data, sort_keys=False, allow_unicode=True), encoding="utf-8") report = validate_competence_file(path, repo_root=repo_root) if report.valid: return "would_pass", [] return "would_fail", [issue.code for issue in report.issues] def _load_raw_jsonl_session(path: Path) -> LoadedSession: events: list[dict[str, Any]] = [] session_id = "" for line_number, line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1): if not line.strip(): continue payload = json.loads(line) if not isinstance(payload, dict): raise ValueError(f"jsonl line {line_number} must be a mapping") if not session_id and isinstance(payload.get("session_id"), str): session_id = payload["session_id"] events.append(_normalize_event(payload)) return LoadedSession(path=path, source_format="raw_live_events_jsonl", session_id=session_id or path.parent.name, events=events) def _load_streaming_json_session(path: Path) -> LoadedSession: payload = json.loads(path.read_text(encoding="utf-8")) if not isinstance(payload, dict): raise ValueError("streaming session must be a mapping") raw_events = payload.get("events") if not isinstance(raw_events, list): raise ValueError("streaming session events must be a list") events = [_normalize_event(event) for event in raw_events if isinstance(event, dict)] session_id = str(payload.get("session_id") or path.stem) return LoadedSession(path=path, source_format="streaming_session_json", session_id=session_id, events=events) def _normalize_event(raw: dict[str, Any]) -> dict[str, Any]: nested = raw.get("event") if isinstance(nested, dict) and isinstance(nested.get("type"), str): event = dict(nested) for key in ("session_id", "timestamp", "machine_id"): if key not in event and key in raw: event[key] = raw[key] return event return dict(raw) def _detect_source_format(path: Path) -> str: if path.suffix == ".jsonl": return "raw_live_events_jsonl" return "streaming_session_json" def _find_durable_state_index(events: list[dict[str, Any]], start: int, stop: int) -> int | None: for index in range(start, min(stop, len(events))): event = events[index] event_type = _event_type(event) if event_type == "window_focus_change" and (_event_title(event) or _event_process(event)): return index if event_type == "heartbeat" and (_event_title(event) or _event_process(event)): return index return None def _text_input_groups(events: list[dict[str, Any]]) -> list[tuple[int, list[int]]]: groups: list[tuple[int, list[int]]] = [] current: list[int] = [] for index, event in enumerate(events): if _event_type(event) == "text_input" and str(event.get("text") or ""): current.append(index) continue if current: groups.append((current[0], current)) current = [] if current: groups.append((current[0], current)) return groups def _success_marker(event: dict[str, Any]) -> dict[str, Any]: markers: list[dict[str, Any]] = [] title = _event_title(event) process = _event_process(event) if title: markers.append({"kind": "active_window_title_in", "values": [title]}) if process: markers.append({"kind": "active_process_name_is", "value": process}) return { "mode": "all_of", "timeout_ms": 5000, "markers": markers or [{"kind": "active_window_title_in", "values": ["unknown_window"]}], "supervised_requires": [ { "kind": "human_validation", "required_for": "replay_verified", } ], } def _expected_state(event: dict[str, Any]) -> dict[str, Any]: state: dict[str, Any] = {} title = _event_title(event) process = _event_process(event) if title: state["window_title_in"] = [title] if process: state["process_active"] = process return state or {"window_title_contains": "unknown_window"} def _failure_template(*, intention: str, attendu: str, demande: str) -> dict[str, str]: return { "intention": intention, "attendu": attendu, "vu": "{observed_human_state}", "demande": demande, } def _review_notes(primitive_ref: str, state_event: dict[str, Any]) -> list[str]: notes = ["Verifier que le segment ne melange pas deux intentions utilisateur."] if primitive_ref == "click_anchor": notes.append("Verifier que anchor_ref multi-critere suffit au runtime.") if _event_type(state_event) == "heartbeat": notes.append("Heartbeat accepte comme preuve partielle; preferer window_focus_change si disponible.") return notes def _click_anchor_ref(event: dict[str, Any]) -> dict[str, Any] | None: snapshot = event.get("uia_snapshot") if not isinstance(snapshot, dict): return None if _fragile_anchor_code(event): return None anchor: dict[str, Any] = {} if isinstance(snapshot.get("name"), str) and snapshot["name"].strip(): anchor["text"] = snapshot["name"] if isinstance(snapshot.get("control_type"), str) and snapshot["control_type"].strip(): anchor["role"] = snapshot["control_type"] if isinstance(snapshot.get("automation_id"), str) and snapshot["automation_id"].strip(): anchor["automation_id"] = snapshot["automation_id"] parent_hint = _parent_hint(snapshot) if parent_hint: anchor["parent_hint"] = parent_hint return anchor or None def _parent_hint(snapshot: dict[str, Any]) -> str: parent_path = snapshot.get("parent_path") if not isinstance(parent_path, list): return "" for item in reversed(parent_path): if isinstance(item, dict) and isinstance(item.get("name"), str) and item["name"].strip(): return item["name"] return "" def _sequence_competence_id(action_event: dict[str, Any], state_event: dict[str, Any]) -> str: action_type = _event_type(action_event) process = _event_process(state_event) title = _event_title(state_event) if action_type == "mouse_click": snapshot = action_event.get("uia_snapshot") if isinstance(action_event.get("uia_snapshot"), dict) else {} automation_id = str(snapshot.get("automation_id") or "") anchor = str(snapshot.get("name") or automation_id or "anchor") if automation_id and not _weak_automation_id(automation_id): anchor = automation_id return _slug(f"click_{anchor}_wait_{process or title}")[:80].strip("_") if action_type == "key_combo": keys = "_".join(str(key) for key in _normalize_shortcut_keys(action_event.get("keys"))) return _slug(f"key_{keys}_wait_{process or title}")[:80].strip("_") if action_type == "mouse_scroll": return _slug(f"scroll_wait_{process or title}")[:80].strip("_") return _slug(f"candidate_{process or title}")[:80].strip("_") def _reject_reason_for_action(event: dict[str, Any]) -> str: event_type = _event_type(event) if event_type == "mouse_click": if not isinstance(event.get("uia_snapshot"), dict): return "click without uia_snapshot anchor" code = _fragile_anchor_code(event) if code == "anchor_ref_systray_fragile": return "click on fragile system tray anchor" if code == "anchor_ref_dom_autogenerated": return "click on autogenerated DOM anchor" if code == "anchor_ref_unknown_window": return "click in unknown or overflow window" if code == "anchor_ref_browser_contextual": return "click on contextual browser chrome anchor" if code == "anchor_ref_contextual_button": return "click on contextual UI chrome button" if code == "anchor_ref_too_generic": return "click with too generic anchor" return "click with weak uia_snapshot anchor" if event_type == "mouse_scroll": return "mouse_scroll without usable delta" if event_type == "key_combo": return "key_combo empty or modifier-only" return f"unsupported action event: {event_type}" def _reject_codes_for_action(event: dict[str, Any]) -> list[str]: event_type = _event_type(event) if event_type == "mouse_click": if not isinstance(event.get("uia_snapshot"), dict): return ["anchor_ref_uia_missing"] code = _fragile_anchor_code(event) if code: return [code] return ["anchor_ref_weak"] if event_type == "mouse_scroll": return ["scroll_delta_missing"] if event_type == "key_combo": return ["key_combo_invalid"] return [] def _event_type(event: dict[str, Any]) -> str: return str(event.get("type") or "") def _event_title(event: dict[str, Any]) -> str: window = event.get("window") if isinstance(event.get("window"), dict) else {} to_window = event.get("to") if isinstance(event.get("to"), dict) else {} return str(window.get("title") or event.get("active_window_title") or to_window.get("title") or "") def _event_process(event: dict[str, Any]) -> str: window = event.get("window") if isinstance(event.get("window"), dict) else {} to_window = event.get("to") if isinstance(event.get("to"), dict) else {} return str(window.get("app_name") or to_window.get("app_name") or "") def _is_usable_keys(keys: Any) -> bool: if not isinstance(keys, list) or not keys: return False normalized = {str(key).strip().casefold() for key in keys if str(key).strip()} return bool(normalized) and not normalized.issubset(MODIFIER_ONLY_KEYS) def _normalize_shortcut_keys(keys: Any) -> list[str]: if not isinstance(keys, list): return [] normalized = [str(key).strip().casefold() for key in keys if str(key).strip()] if set(normalized) in ({"shift", "ctrl", "@"}, {"shift", "ctrl", "\x13"}): return ["ctrl", "s"] return normalized def _is_scroll_delta(value: Any) -> bool: return ( isinstance(value, list) and len(value) >= 2 and isinstance(value[0], int) and isinstance(value[1], int) and not isinstance(value[0], bool) and not isinstance(value[1], bool) ) def _scroll_direction(delta: list[int]) -> str: if abs(delta[0]) > abs(delta[1]): return "right" if delta[0] > 0 else "left" return "up" if delta[1] > 0 else "down" def _duplicate_competence_id(candidate: CandidateDraft, repo_root: Path) -> str | None: exact = [ repo_root / "data" / "competences" / state / f"{candidate.competence_id}.yaml" for state in ("observed", "candidate", "supervised", "stable") ] for path in exact: if path.is_file(): return candidate.competence_id candidate_chain = candidate.yaml_data.get("chain_refs") if isinstance(candidate.yaml_data.get("chain_refs"), dict) else {} candidate_cleaned = candidate_chain.get("cleaned_segment") if isinstance(candidate_chain.get("cleaned_segment"), dict) else {} source_session = candidate_chain.get("source_session") method_indices = candidate_cleaned.get("method_event_indices") success_indices = candidate_cleaned.get("success_event_indices") source_format = candidate_cleaned.get("source_event_format") for path in (repo_root / "data" / "competences").glob("*/*.yaml"): data = _read_yaml_mapping(path) if data is None: continue chain = data.get("chain_refs") if isinstance(data.get("chain_refs"), dict) else {} cleaned = chain.get("cleaned_segment") if isinstance(chain.get("cleaned_segment"), dict) else {} if ( chain.get("source_session") == source_session and cleaned.get("method_event_indices") == method_indices and cleaned.get("success_event_indices") == success_indices and cleaned.get("source_event_format", "streaming_session_json") == source_format ): return str(data.get("id") or path.stem) return None def _quality_flags(candidate: CandidateDraft, validator_status: str, duplicate_of: str | None) -> list[str]: flags: list[str] = [] if candidate.confidence < APPLY_MIN_CONFIDENCE: flags.append("below_apply_confidence_threshold") for gap in candidate.t2_gaps_detected: if gap in BLOCKING_APPLY_GAPS: flags.append(f"blocking_gap:{gap}") if duplicate_of is not None: flags.append("duplicate_existing_competence") if validator_status != "would_pass": flags.append("validator_would_fail") return flags def _read_yaml_mapping(path: Path) -> dict[str, Any] | None: try: data = yaml.safe_load(path.read_text(encoding="utf-8")) except (OSError, yaml.YAMLError): return None return data if isinstance(data, dict) else None def _fragile_anchor_code(event: dict[str, Any]) -> str | None: snapshot = event.get("uia_snapshot") if not isinstance(snapshot, dict): return "anchor_ref_uia_missing" name = str(snapshot.get("name") or "").strip() automation_id = str(snapshot.get("automation_id") or "").strip() control_type = str(snapshot.get("control_type") or "").strip() window_title = _event_title(event) combined = " ".join([name, automation_id, control_type, window_title]).casefold() if _is_systray_anchor(combined): return "anchor_ref_systray_fragile" if _is_autogenerated_dom_id(automation_id): return "anchor_ref_dom_autogenerated" if _is_unknown_or_overflow_window(window_title): return "anchor_ref_unknown_window" if _is_browser_contextual_anchor(event, name, automation_id, control_type): return "anchor_ref_browser_contextual" if _is_contextual_button_anchor(event, name, automation_id, control_type): return "anchor_ref_contextual_button" if _too_generic_anchor(name, automation_id, control_type): return "anchor_ref_too_generic" return None def _is_systray_anchor(value: str) -> bool: patterns = ( r"system\s*tray", r"systemtray", r"notification.*area", r"zone.*notification", r"taskbar.*overflow", r"tray[_\s-]*icon", r"systray", ) return any(re.search(pattern, value, re.IGNORECASE) for pattern in patterns) def _is_autogenerated_dom_id(value: str) -> bool: stripped = value.strip() if not stripped: return False return bool( re.fullmatch(r"[a-z_]+_[a-z0-9]{10,}_\d+", stripped, re.IGNORECASE) or re.fullmatch(r"so_[a-z0-9]{10,}.*", stripped, re.IGNORECASE) ) def _is_unknown_or_overflow_window(title: str) -> bool: normalized = title.strip().casefold() return ( normalized.startswith("unknown_window") or "fenetre de depassement" in normalized or "fenêtre de dépassement" in normalized or "overflow" in normalized ) def _is_browser_contextual_anchor(event: dict[str, Any], name: str, automation_id: str, control_type: str) -> bool: process = _event_process(event).casefold() title = _event_title(event).casefold() if not ( process in {"chrome.exe", "msedge.exe", "firefox.exe", "brave.exe"} or "google chrome" in title or "microsoft edge" in title or "firefox" in title ): return False snapshot = event.get("uia_snapshot") if isinstance(event.get("uia_snapshot"), dict) else {} class_name = str(snapshot.get("class_name") or "").casefold() parent_path = snapshot.get("parent_path") if isinstance(snapshot.get("parent_path"), list) else [] parent_controls = " ".join( str(item.get("control_type") or "") for item in parent_path if isinstance(item, dict) ).casefold() anchor_text = " ".join([name, automation_id, control_type, class_name, parent_controls]).casefold() return bool( "tabstrip" in class_name or "tabulation" in parent_controls or re.search(r"\b(?:nouvel onglet|new tab)\b", anchor_text, re.IGNORECASE) ) def _is_contextual_button_anchor(event: dict[str, Any], name: str, automation_id: str, control_type: str) -> bool: normalized_role = control_type.strip().casefold() if normalized_role not in {"button", "bouton"}: return False snapshot = event.get("uia_snapshot") if isinstance(event.get("uia_snapshot"), dict) else {} class_name = str(snapshot.get("class_name") or "").casefold() parent_path = snapshot.get("parent_path") if isinstance(snapshot.get("parent_path"), list) else [] parent_controls = " ".join( str(item.get("control_type") or "") for item in parent_path if isinstance(item, dict) ).casefold() anchor_text = " ".join([name, automation_id, class_name, parent_controls]).casefold() has_add_button_identity = bool( automation_id.strip().casefold() == "addbutton" or re.search(r"\b(?:add button|bouton ajouter)\b", anchor_text, re.IGNORECASE) or re.search(r"\b(?:ajouter|add)\s+(?:un\s+)?(?:nouvel\s+)?(?:onglet|tab)\b", anchor_text, re.IGNORECASE) ) if not has_add_button_identity: return False return bool( "onglet" in parent_controls or "tabulation" in parent_controls or re.search(r"\b(?:tab|tabitem|tab\s*control)\b", parent_controls, re.IGNORECASE) or "tabstrip" in class_name or re.search(r"\b(?:nouvel onglet|new tab)\b", anchor_text, re.IGNORECASE) ) def _too_generic_anchor(name: str, automation_id: str, control_type: str) -> bool: if not name and not automation_id and not control_type: return True generic_controls = { "groupe", "group", "volet", "pane", "window", "fenetre", "fenêtre", "region", "région", "area", "image", "graphic", "element graphique", "élément graphique", "static", } if control_type.strip().casefold() in generic_controls and (not automation_id or _weak_automation_id(automation_id)): return True generic_names = { "button", "bouton", "element", "élément", "icon", "icone", "icône", "group", "groupe", } normalized_name = name.strip().casefold() normalized_id = automation_id.strip().casefold() if normalized_name in generic_names and (not automation_id or _weak_automation_id(automation_id)): return True return bool( re.fullmatch(r"(?:icon|icone|icône|button|bouton|element|élément)_?\d+", normalized_name, re.IGNORECASE) or re.fullmatch(r"(?:icon|button|element)_?\d+", normalized_id, re.IGNORECASE) ) def _weak_uia_anchor(snapshot: dict[str, Any]) -> bool: name = str(snapshot.get("name") or "").strip() automation_id = str(snapshot.get("automation_id") or "").strip() control_type = str(snapshot.get("control_type") or "").strip().casefold() if not name and not automation_id: return True generic_controls = {"groupe", "group", "volet", "pane", "window", "fenetre", "fenêtre"} if control_type in generic_controls and (not automation_id or _weak_automation_id(automation_id)): return True return False def _weak_automation_id(value: str) -> bool: stripped = value.strip() return not stripped or stripped == "0" or stripped.isdigit() def _dedupe_text(values: list[str]) -> list[str]: seen: set[str] = set() result: list[str] = [] for value in values: if value in seen: continue seen.add(value) result.append(value) return result def _human_name(competence_id: str) -> str: return competence_id.replace("_", " ").capitalize() def _slug(value: str) -> str: slug = re.sub(r"[^a-zA-Z0-9]+", "_", value.casefold()).strip("_") if not slug or not slug[0].isalpha(): slug = f"candidate_{slug}" return slug def _display_path(path: Path, repo_root: Path) -> str: try: return str(path.resolve().relative_to(repo_root.resolve())) except (OSError, ValueError): return str(path) def render_markdown_report(report: dict[str, Any]) -> str: lines = [ f"# Extraction report {report['run_id']}", "", f"- session: `{report['session']}`", f"- source_format: `{report['source_format']}`", f"- mode: `{report['mode']}`", f"- candidates: {report['summary']['candidates_generated']}", f"- rejected: {report['summary']['candidates_rejected']}", "", ] for candidate in report["candidates"]: lines.extend( [ f"## {candidate['competence_id']}", "", f"- validator_status: `{candidate['validator_status']}`", f"- primitive_refs: {', '.join(candidate['primitive_refs'])}", f"- segment: `{candidate['segment']}`", f"- t2_gaps: {', '.join(candidate['t2_gaps_detected']) or 'none'}", "", ] ) return "\n".join(lines) def main(argv: list[str] | None = None) -> int: parser = argparse.ArgumentParser(description="Lea competence extraction from one session") parser.add_argument("--session", required=True, help="Path to streaming JSON or raw live_events.jsonl") parser.add_argument("--source-format", choices=sorted(SOURCE_FORMATS), default=None) parser.add_argument("--machine-id", required=True) parser.add_argument("--output-dir", default=str(DEFAULT_OUTPUT_DIR)) parser.add_argument("--max-candidates", type=int, default=5) mode = parser.add_mutually_exclusive_group() mode.add_argument("--dry-run", action="store_true", help="Emit report without writing competences") mode.add_argument("--apply", action="store_true", help="Write allowed observed competences") parser.add_argument("--allow-list", default=None, help="Comma-separated competence ids allowed for --apply") parser.add_argument("--report-format", choices=("json", "markdown"), default="json") parser.add_argument("--report-path", default=None) args = parser.parse_args(argv) try: report = build_report( session_path=args.session, source_format=args.source_format, machine_id=args.machine_id, output_dir=args.output_dir, max_candidates=args.max_candidates, mode="apply" if args.apply else "dry_run", allow_list=args.allow_list, ) except (OSError, ValueError, json.JSONDecodeError) as exc: print(f"extract_competences_from_session: {exc}", file=sys.stderr) return 2 if args.report_format == "markdown": output = render_markdown_report(report) else: output = json.dumps(report, ensure_ascii=False, indent=2) if args.report_path: Path(args.report_path).write_text(output + "\n", encoding="utf-8") else: print(output) return 0 if __name__ == "__main__": raise SystemExit(main())