From a49f59b4d66cad8f70cad0ec48f04943b26918d2 Mon Sep 17 00:00:00 2001 From: Dom Date: Fri, 29 May 2026 11:38:12 +0200 Subject: [PATCH] feat(competences): plan supervised replay tests --- core/competences/__init__.py | 9 +- core/competences/catalog.py | 16 +- core/competences/replay.py | 165 ++++++++++++++++++ tests/unit/test_competence_catalog_loader.py | 58 ++++++ .../backend/catalog_routes_v2_vlm.py | 118 +++++++++++++ 5 files changed, 364 insertions(+), 2 deletions(-) create mode 100644 core/competences/replay.py diff --git a/core/competences/__init__.py b/core/competences/__init__.py index d926390de..dd9c87d51 100644 --- a/core/competences/__init__.py +++ b/core/competences/__init__.py @@ -5,10 +5,17 @@ from .catalog import ( load_competence_catalog_actions, load_competences, ) +from .replay import ( + build_competence_replay_actions, + build_competence_replay_payload, + find_competence, +) __all__ = [ "CompetenceSummary", + "build_competence_replay_actions", + "build_competence_replay_payload", + "find_competence", "load_competence_catalog_actions", "load_competences", ] - diff --git a/core/competences/catalog.py b/core/competences/catalog.py index 9a7d68ba8..313227f1e 100644 --- a/core/competences/catalog.py +++ b/core/competences/catalog.py @@ -155,6 +155,20 @@ def competence_to_catalog_action(summary: CompetenceSummary) -> dict[str, Any]: "default": True, "description": "Exécuter en mode supervisé humain", }, + "start_replay": { + "type": "boolean", + "required": False, + "default": False, + "description": "Injecter immédiatement le replay dans le streaming server", + }, + }, + "test_action": { + "type": "test_competence", + "parameters": { + "competence_id": summary.id, + "supervised": True, + "start_replay": False, + }, }, "methods": list(summary.methods), "success_marker": summary.success_marker, @@ -167,6 +181,7 @@ def competence_to_catalog_action(summary: CompetenceSummary) -> dict[str, Any]: "parameters": { "competence_id": summary.id, "supervised": True, + "start_replay": False, }, } ], @@ -198,4 +213,3 @@ def _method_summaries(methods: Any) -> list[dict[str, Any]]: } ) return summaries - diff --git a/core/competences/replay.py b/core/competences/replay.py new file mode 100644 index 000000000..c4c4ebf97 --- /dev/null +++ b/core/competences/replay.py @@ -0,0 +1,165 @@ +"""Convert persisted competence YAML files into supervised replay actions.""" + +from __future__ import annotations + +from pathlib import Path +from typing import Any, Iterable + +from .catalog import DEFAULT_COMPETENCE_ROOT, CompetenceSummary, load_competences + + +def find_competence( + competence_id: str, + *, + root: Path | str = DEFAULT_COMPETENCE_ROOT, + states: Iterable[str] | None = None, +) -> CompetenceSummary: + """Find one competence by id across persisted YAML states.""" + + for competence in load_competences(root=root, states=states): + if competence.id == competence_id: + return competence + raise KeyError(f"Competence '{competence_id}' not found") + + +def build_competence_replay_actions( + competence_id: str, + *, + root: Path | str = DEFAULT_COMPETENCE_ROOT, + supervised: bool = True, +) -> list[dict[str, Any]]: + """Build Agent V1 raw replay actions for a competence. + + Candidate competences are intentionally wrapped with human pauses. This + makes the first runtime pass an explicit supervised test instead of an + autonomous assertion that the competence is already stable. + """ + + competence = find_competence(competence_id, root=root) + actions: list[dict[str, Any]] = [] + + if supervised: + actions.append(_pause_action(competence, phase="before")) + + for index, method in enumerate(competence.methods, start=1): + action = _method_to_replay_action(competence, method, index) + if action: + actions.append(action) + + if supervised: + actions.append(_pause_action(competence, phase="after")) + + return actions + + +def build_competence_replay_payload( + competence_id: str, + *, + root: Path | str = DEFAULT_COMPETENCE_ROOT, + supervised: bool = True, + machine_id: str | None = None, + session_id: str | None = None, +) -> dict[str, Any]: + """Build the payload expected by `/api/v1/traces/stream/replay/raw`.""" + + competence = find_competence(competence_id, root=root) + actions = build_competence_replay_actions(competence_id, root=root, supervised=supervised) + payload: dict[str, Any] = { + "actions": actions, + "task_description": f"Test compétence Léa: {competence.intent_fr}", + "params": { + "execution_mode": "supervised" if supervised else "autonomous", + "competence_id": competence.id, + "learning_state": competence.learning_state, + }, + } + if machine_id: + payload["machine_id"] = machine_id + if session_id: + payload["session_id"] = session_id + return payload + + +def _method_to_replay_action( + competence: CompetenceSummary, + method: dict[str, Any], + index: int, +) -> dict[str, Any] | None: + kind = method.get("kind") + params = method.get("parameters") if isinstance(method.get("parameters"), dict) else {} + action_id = f"competence_{competence.id}_{index}_{kind or 'step'}" + + if kind == "key_combo": + keys = params.get("keys") + if not isinstance(keys, list) or not keys: + return None + return { + "action_id": action_id, + "type": "key_combo", + "keys": [str(key) for key in keys], + "intention": competence.intent_fr, + "competence_id": competence.id, + "source_method_id": method.get("id"), + } + + if kind == "wait_state": + expected = params.get("expected_state") if isinstance(params.get("expected_state"), dict) else {} + titles = expected.get("window_title_in") if isinstance(expected.get("window_title_in"), list) else [] + timeout_ms = params.get("timeout_ms") if isinstance(params.get("timeout_ms"), int) else 5000 + if titles: + return { + "action_id": action_id, + "type": "verify_screen", + "expected_node": f"competence:{competence.id}:wait_state", + "expected_window_title_contains": [str(title) for title in titles], + "timeout_ms": timeout_ms, + "intention": competence.intent_fr, + "competence_id": competence.id, + "source_method_id": method.get("id"), + "expected_state": expected, + } + return { + "action_id": action_id, + "type": "wait", + "duration_ms": min(timeout_ms, 5000), + "intention": competence.intent_fr, + "competence_id": competence.id, + "source_method_id": method.get("id"), + } + + return None + + +def _pause_action(competence: CompetenceSummary, *, phase: str) -> dict[str, Any]: + failure = competence.failure_message_template + gaps = ", ".join(str(gap.get("id")) for gap in competence.t2_known_gaps if gap.get("id")) + + if phase == "before": + message = ( + f"Prépare le test supervisé de la compétence '{competence.id}'. " + f"Intention: {competence.intent_fr}. " + f"Attendu: {failure.get('attendu', 'état attendu non renseigné')}." + ) + if gaps: + message += f" Points à surveiller: {gaps}." + else: + message = ( + f"Valide le résultat de la compétence '{competence.id}'. " + f"Intention: {failure.get('intention', competence.intent_fr)}. " + f"Attendu: {failure.get('attendu', 'état attendu non renseigné')}. " + "Indique si Léa peut enregistrer ce test comme succès supervisé ou si une correction est nécessaire." + ) + + return { + "action_id": f"competence_{competence.id}_pause_{phase}", + "type": "pause_for_human", + "competence_id": competence.id, + "parameters": { + "message": message, + "intention": failure.get("intention", competence.intent_fr), + "attendu": failure.get("attendu", ""), + "demande": failure.get("demande", ""), + "phase": phase, + }, + } + diff --git a/tests/unit/test_competence_catalog_loader.py b/tests/unit/test_competence_catalog_loader.py index d76816e21..097a81a4b 100644 --- a/tests/unit/test_competence_catalog_loader.py +++ b/tests/unit/test_competence_catalog_loader.py @@ -1,4 +1,7 @@ +from flask import Flask + from core.competences.catalog import load_competence_catalog_actions, load_competences +from core.competences.replay import build_competence_replay_actions, build_competence_replay_payload def test_load_candidate_competences_from_yaml_catalog(): @@ -23,6 +26,8 @@ def test_competence_catalog_actions_include_runtime_gap_metadata(): assert alt_f4["source"] == "competence_yaml" assert "fermer la fenêtre Bloc-notes" in alt_f4["name"] assert alt_f4["parameters"]["supervised"]["default"] is True + assert alt_f4["parameters"]["start_replay"]["default"] is False + assert alt_f4["test_action"]["type"] == "test_competence" assert alt_f4["t2_known_gaps"][0]["id"] == "alt_f4_confirmation_dialog_not_covered" @@ -32,3 +37,56 @@ def test_competence_catalog_actions_are_deterministic(): assert [action["id"] for action in first] == [action["id"] for action in second] + +def test_build_competence_supervised_replay_actions(): + actions = build_competence_replay_actions("key_win_r_wait_explorer_exe") + + assert [action["type"] for action in actions] == [ + "pause_for_human", + "key_combo", + "verify_screen", + "pause_for_human", + ] + assert actions[1]["keys"] == ["win", "r"] + assert actions[2]["expected_window_title_contains"] == ["Exécuter"] + assert actions[2]["expected_state"]["process_active"] == "explorer.exe" + + +def test_build_competence_raw_replay_payload_is_supervised(): + payload = build_competence_replay_payload("key_alt_f4_wait_windowsterminal_exe", machine_id="win") + + assert payload["machine_id"] == "win" + assert payload["params"]["execution_mode"] == "supervised" + assert payload["params"]["competence_id"] == "key_alt_f4_wait_windowsterminal_exe" + assert payload["actions"][1]["type"] == "key_combo" + assert payload["actions"][1]["keys"] == ["alt", "f4"] + + +def test_vwb_catalog_execute_plans_competence_replay(): + from visual_workflow_builder.backend.catalog_routes_v2_vlm import catalog_bp + + app = Flask(__name__) + app.register_blueprint(catalog_bp) + + with app.test_client() as client: + response = client.post( + "/api/vwb/catalog/execute", + json={ + "type": "lea_competence_key_ctrl_s_wait_notepad_exe", + "step_id": "step_test", + "parameters": {"supervised": True}, + }, + ) + + assert response.status_code == 200 + data = response.get_json() + assert data["success"] is True + result = data["result"] + assert result["status"] == "planned" + assert result["output_data"]["competence_id"] == "key_ctrl_s_wait_notepad_exe" + assert [action["type"] for action in result["output_data"]["actions"]] == [ + "pause_for_human", + "key_combo", + "verify_screen", + "pause_for_human", + ] diff --git a/visual_workflow_builder/backend/catalog_routes_v2_vlm.py b/visual_workflow_builder/backend/catalog_routes_v2_vlm.py index 1b1552e94..e7ae67a7f 100644 --- a/visual_workflow_builder/backend/catalog_routes_v2_vlm.py +++ b/visual_workflow_builder/backend/catalog_routes_v2_vlm.py @@ -86,11 +86,13 @@ except ImportError: try: from core.competences.catalog import load_competence_catalog_actions + from core.competences.replay import build_competence_replay_payload COMPETENCE_CATALOG_AVAILABLE = True except ImportError as e: print(f"⚠️ Catalogue compétences Léa non disponible: {e}") COMPETENCE_CATALOG_AVAILABLE = False load_competence_catalog_actions = None + build_competence_replay_payload = None # ============================================================================ @@ -1088,6 +1090,119 @@ def _load_lea_competence_actions() -> List[Dict[str, Any]]: return [] +def _extract_competence_id(action_type: str, parameters: Dict[str, Any]) -> str: + competence_id = str(parameters.get("competence_id") or "").strip() + if competence_id: + return competence_id + prefix = "lea_competence_" + if action_type.startswith(prefix): + return action_type[len(prefix):] + return "" + + +def _execute_lea_competence_action( + data: Dict[str, Any], + action_type: str, + step_id: str, + parameters: Dict[str, Any], +): + """Plan or start a supervised replay from a persisted competence YAML.""" + + if not COMPETENCE_CATALOG_AVAILABLE or build_competence_replay_payload is None: + return jsonify({ + "success": False, + "error": "Catalogue compétences Léa non disponible", + }), 503 + + competence_id = _extract_competence_id(action_type, parameters) + if not competence_id: + return jsonify({ + "success": False, + "error": "Paramètre competence_id requis", + }), 400 + + supervised = bool(parameters.get("supervised", True)) + try: + payload = build_competence_replay_payload( + competence_id, + supervised=supervised, + machine_id=data.get("machine_id") or data.get("machineId") or parameters.get("machine_id"), + session_id=data.get("session_id") or data.get("sessionId") or parameters.get("session_id"), + ) + except KeyError: + return jsonify({ + "success": False, + "error": f"Compétence '{competence_id}' introuvable", + }), 404 + + start_replay = bool(parameters.get("start_replay", False)) + if not start_replay: + return jsonify({ + "success": True, + "result": { + "action_id": f"test_competence_{competence_id}_{step_id}", + "step_id": step_id, + "status": "planned", + "output_data": { + "competence_id": competence_id, + "supervised": supervised, + "actions": payload["actions"], + "raw_replay_payload": payload, + "message": "Plan de replay supervisé prêt. Relancer avec start_replay=true pour injecter dans le streaming server.", + }, + "evidence_list": [], + "error": None, + }, + }) + + try: + stream_token = os.environ.get("RPA_API_TOKEN", "") + headers = {"Authorization": f"Bearer {stream_token}"} if stream_token else {} + response = requests.post( + f"{os.environ.get('RPA_STREAMING_SERVER_URL', 'http://localhost:5005')}/api/v1/traces/stream/replay/raw", + json=payload, + headers=headers, + timeout=5, + ) + except Exception as e: + return jsonify({ + "success": False, + "error": f"Impossible de contacter le streaming server: {e}", + "raw_replay_payload": payload, + }), 502 + + try: + replay_data = response.json() + except Exception: + replay_data = {"raw": response.text} + + if not response.ok: + return jsonify({ + "success": False, + "error": "Le streaming server a refusé le replay compétence", + "status_code": response.status_code, + "detail": replay_data, + "raw_replay_payload": payload, + }), response.status_code + + return jsonify({ + "success": True, + "result": { + "action_id": f"test_competence_{competence_id}_{step_id}", + "step_id": step_id, + "status": "started", + "output_data": { + "competence_id": competence_id, + "supervised": supervised, + "replay": replay_data, + "raw_replay_payload": payload, + }, + "evidence_list": [], + "error": None, + }, + }) + + def get_screen_capturer(): """ Obtient l'instance du ScreenCapturer (initialisation paresseuse). @@ -1804,6 +1919,9 @@ def execute_action(): parameters = data.get('parameters', {}) + if action_type == "test_competence" or action_type.startswith("lea_competence_"): + return _execute_lea_competence_action(data, action_type, step_id, parameters) + # LOG DEBUG - Voir ce qui arrive du frontend print(f"\n{'='*60}") print(f"🔥 REQUÊTE EXECUTE REÇUE:")