feat(competences): plan supervised replay tests

This commit is contained in:
Dom
2026-05-29 11:38:12 +02:00
parent 762e75a077
commit a49f59b4d6
5 changed files with 364 additions and 2 deletions

View File

@@ -5,10 +5,17 @@ from .catalog import (
load_competence_catalog_actions,
load_competences,
)
from .replay import (
build_competence_replay_actions,
build_competence_replay_payload,
find_competence,
)
# Names exported by a star-import of this package: the catalog helpers plus
# the replay builders pulled in from `.replay` just above.
__all__ = [
"CompetenceSummary",
"build_competence_replay_actions",
"build_competence_replay_payload",
"find_competence",
"load_competence_catalog_actions",
"load_competences",
]

View File

@@ -155,6 +155,20 @@ def competence_to_catalog_action(summary: CompetenceSummary) -> dict[str, Any]:
"default": True,
"description": "Exécuter en mode supervisé humain",
},
"start_replay": {
"type": "boolean",
"required": False,
"default": False,
"description": "Injecter immédiatement le replay dans le streaming server",
},
},
"test_action": {
"type": "test_competence",
"parameters": {
"competence_id": summary.id,
"supervised": True,
"start_replay": False,
},
},
"methods": list(summary.methods),
"success_marker": summary.success_marker,
@@ -167,6 +181,7 @@ def competence_to_catalog_action(summary: CompetenceSummary) -> dict[str, Any]:
"parameters": {
"competence_id": summary.id,
"supervised": True,
"start_replay": False,
},
}
],
@@ -198,4 +213,3 @@ def _method_summaries(methods: Any) -> list[dict[str, Any]]:
}
)
return summaries

165
core/competences/replay.py Normal file
View File

@@ -0,0 +1,165 @@
"""Convert persisted competence YAML files into supervised replay actions."""
from __future__ import annotations
from pathlib import Path
from typing import Any, Iterable
from .catalog import DEFAULT_COMPETENCE_ROOT, CompetenceSummary, load_competences
def find_competence(
    competence_id: str,
    *,
    root: Path | str = DEFAULT_COMPETENCE_ROOT,
    states: Iterable[str] | None = None,
) -> CompetenceSummary:
    """Return the persisted competence whose ``id`` equals ``competence_id``.

    Args:
        competence_id: Identifier to look for.
        root: Location forwarded unchanged to ``load_competences``.
        states: Optional state filter forwarded unchanged to
            ``load_competences``.

    Returns:
        The first loaded competence with a matching id.

    Raises:
        KeyError: If no loaded competence carries that id.
    """
    candidates = load_competences(root=root, states=states)
    # Lazy scan: stop at the first match, exactly like an early `return`.
    match = next((item for item in candidates if item.id == competence_id), None)
    if match is None:
        raise KeyError(f"Competence '{competence_id}' not found")
    return match
def build_competence_replay_actions(
    competence_id: str,
    *,
    root: Path | str = DEFAULT_COMPETENCE_ROOT,
    supervised: bool = True,
) -> list[dict[str, Any]]:
    """Translate one competence into Agent V1 raw replay actions.

    When ``supervised`` is true the converted method steps are framed by two
    ``pause_for_human`` actions (one before, one after). This makes the first
    runtime pass an explicit human-checked test instead of an autonomous
    assertion that the competence is already stable.

    Args:
        competence_id: Identifier of the competence to convert.
        root: Location forwarded to ``find_competence``.
        supervised: Whether to add the human pauses around the steps.

    Returns:
        The ordered list of replay actions. Methods for which
        ``_method_to_replay_action`` yields nothing are left out.

    Raises:
        KeyError: Propagated from ``find_competence`` for an unknown id.
    """
    competence = find_competence(competence_id, root=root)

    opening = [_pause_action(competence, phase="before")] if supervised else []
    converted = (
        _method_to_replay_action(competence, method, position)
        for position, method in enumerate(competence.methods, start=1)
    )
    steps = [step for step in converted if step]
    closing = [_pause_action(competence, phase="after")] if supervised else []

    return [*opening, *steps, *closing]
def build_competence_replay_payload(
    competence_id: str,
    *,
    root: Path | str = DEFAULT_COMPETENCE_ROOT,
    supervised: bool = True,
    machine_id: str | None = None,
    session_id: str | None = None,
) -> dict[str, Any]:
    """Assemble the request body for `/api/v1/traces/stream/replay/raw`.

    Args:
        competence_id: Identifier of the competence to replay.
        root: Location forwarded to the competence lookup.
        supervised: Selects ``"supervised"`` or ``"autonomous"`` as the
            execution mode and whether human pauses frame the actions.
        machine_id: Added to the payload only when truthy.
        session_id: Added to the payload only when truthy.

    Returns:
        A dict with ``actions``, ``task_description`` and ``params``, plus the
        optional ``machine_id`` / ``session_id`` entries.

    Raises:
        KeyError: Propagated from ``find_competence`` for an unknown id.

    Note:
        The competence is looked up here and again inside
        ``build_competence_replay_actions``.
    """
    competence = find_competence(competence_id, root=root)
    execution_mode = "supervised" if supervised else "autonomous"

    payload: dict[str, Any] = {
        "actions": build_competence_replay_actions(
            competence_id,
            root=root,
            supervised=supervised,
        ),
        "task_description": f"Test compétence Léa: {competence.intent_fr}",
        "params": {
            "execution_mode": execution_mode,
            "competence_id": competence.id,
            "learning_state": competence.learning_state,
        },
    }

    # Routing hints are optional: only truthy values make it into the payload.
    routing = (("machine_id", machine_id), ("session_id", session_id))
    payload.update({key: value for key, value in routing if value})
    return payload
def _method_to_replay_action(
    competence: CompetenceSummary,
    method: dict[str, Any],
    index: int,
) -> dict[str, Any] | None:
    """Convert one competence method into a raw replay action.

    Supported method kinds:

    * ``key_combo`` -> a ``key_combo`` action carrying the stringified keys.
    * ``wait_state`` -> a ``verify_screen`` action when
      ``expected_state.window_title_in`` is a non-empty list, otherwise a
      plain ``wait`` action whose duration is capped at 5000 ms.

    Args:
        competence: Source competence; its ``id`` and ``intent_fr`` are copied
            into the action.
        method: Method description with ``kind``, ``parameters`` and ``id``.
        index: 1-based position of the method, used to build ``action_id``.

    Returns:
        The replay action, or ``None`` when the method cannot be converted
        (not a mapping, unknown kind, or a ``key_combo`` without keys).
    """
    # A malformed entry is skipped like any other unconvertible method
    # instead of aborting the whole replay plan with an AttributeError.
    if not isinstance(method, dict):
        return None

    default_timeout_ms = 5000

    kind = method.get("kind")
    raw_params = method.get("parameters")
    params = raw_params if isinstance(raw_params, dict) else {}
    action_id = f"competence_{competence.id}_{index}_{kind or 'step'}"

    # Fields shared by every generated action, tracing it back to its source.
    provenance = {
        "intention": competence.intent_fr,
        "competence_id": competence.id,
        "source_method_id": method.get("id"),
    }

    if kind == "key_combo":
        keys = params.get("keys")
        if not isinstance(keys, list) or not keys:
            return None
        return {
            "action_id": action_id,
            "type": "key_combo",
            "keys": [str(key) for key in keys],
            **provenance,
        }

    if kind == "wait_state":
        raw_expected = params.get("expected_state")
        expected = raw_expected if isinstance(raw_expected, dict) else {}
        raw_titles = expected.get("window_title_in")
        titles = raw_titles if isinstance(raw_titles, list) else []

        # `bool` is a subclass of `int`, so `timeout_ms: true` would otherwise
        # be accepted as a duration; negative values are meaningless too.
        raw_timeout = params.get("timeout_ms")
        timeout_is_valid = (
            isinstance(raw_timeout, int)
            and not isinstance(raw_timeout, bool)
            and raw_timeout >= 0
        )
        timeout_ms = raw_timeout if timeout_is_valid else default_timeout_ms

        if titles:
            return {
                "action_id": action_id,
                "type": "verify_screen",
                "expected_node": f"competence:{competence.id}:wait_state",
                "expected_window_title_contains": [str(title) for title in titles],
                "timeout_ms": timeout_ms,
                **provenance,
                "expected_state": expected,
            }
        return {
            "action_id": action_id,
            "type": "wait",
            "duration_ms": min(timeout_ms, default_timeout_ms),
            **provenance,
        }

    return None
def _pause_action(competence: CompetenceSummary, *, phase: str) -> dict[str, Any]:
    """Build the ``pause_for_human`` action framing a supervised replay.

    Args:
        competence: Source competence. Its ``failure_message_template`` is
            read with ``.get`` (keys ``intention``, ``attendu``, ``demande``)
            and the ``id`` of each entry of ``t2_known_gaps`` is listed in the
            "before" message.
        phase: ``"before"`` produces the preparation prompt; any other value
            produces the validation prompt. The value is echoed verbatim in
            the action id and in the parameters.

    Returns:
        The pause action with the human-facing message and its context fields.
    """
    template = competence.failure_message_template
    gap_ids = [str(gap.get("id")) for gap in competence.t2_known_gaps if gap.get("id")]
    watch_list = ", ".join(gap_ids)

    if phase == "before":
        fragments = [
            f"Prépare le test supervisé de la compétence '{competence.id}'. ",
            f"Intention: {competence.intent_fr}. ",
            f"Attendu: {template.get('attendu', 'état attendu non renseigné')}.",
        ]
        # Known gaps are only worth mentioning when at least one has an id.
        if watch_list:
            fragments.append(f" Points à surveiller: {watch_list}.")
    else:
        fragments = [
            f"Valide le résultat de la compétence '{competence.id}'. ",
            f"Intention: {template.get('intention', competence.intent_fr)}. ",
            f"Attendu: {template.get('attendu', 'état attendu non renseigné')}. ",
            "Indique si Léa peut enregistrer ce test comme succès supervisé ou si une correction est nécessaire.",
        ]
    message = "".join(fragments)

    human_context = {
        "message": message,
        "intention": template.get("intention", competence.intent_fr),
        "attendu": template.get("attendu", ""),
        "demande": template.get("demande", ""),
        "phase": phase,
    }
    return {
        "action_id": f"competence_{competence.id}_pause_{phase}",
        "type": "pause_for_human",
        "competence_id": competence.id,
        "parameters": human_context,
    }

View File

@@ -1,4 +1,7 @@
from flask import Flask
from core.competences.catalog import load_competence_catalog_actions, load_competences
from core.competences.replay import build_competence_replay_actions, build_competence_replay_payload
def test_load_candidate_competences_from_yaml_catalog():
@@ -23,6 +26,8 @@ def test_competence_catalog_actions_include_runtime_gap_metadata():
assert alt_f4["source"] == "competence_yaml"
assert "fermer la fenêtre Bloc-notes" in alt_f4["name"]
assert alt_f4["parameters"]["supervised"]["default"] is True
assert alt_f4["parameters"]["start_replay"]["default"] is False
assert alt_f4["test_action"]["type"] == "test_competence"
assert alt_f4["t2_known_gaps"][0]["id"] == "alt_f4_confirmation_dialog_not_covered"
@@ -32,3 +37,56 @@ def test_competence_catalog_actions_are_deterministic():
assert [action["id"] for action in first] == [action["id"] for action in second]
def test_build_competence_supervised_replay_actions():
    """The Win+R competence replays as pause, key combo, screen check, pause."""
    actions = build_competence_replay_actions("key_win_r_wait_explorer_exe")

    expected_types = [
        "pause_for_human",
        "key_combo",
        "verify_screen",
        "pause_for_human",
    ]
    observed_types = [step["type"] for step in actions]
    assert observed_types == expected_types

    key_step, verify_step = actions[1], actions[2]
    assert key_step["keys"] == ["win", "r"]
    assert verify_step["expected_window_title_contains"] == ["Exécuter"]
    assert verify_step["expected_state"]["process_active"] == "explorer.exe"
def test_build_competence_raw_replay_payload_is_supervised():
    """The raw replay payload defaults to supervised mode and keeps routing ids."""
    competence_id = "key_alt_f4_wait_windowsterminal_exe"
    payload = build_competence_replay_payload(competence_id, machine_id="win")

    assert payload["machine_id"] == "win"

    params = payload["params"]
    assert params["execution_mode"] == "supervised"
    assert params["competence_id"] == "key_alt_f4_wait_windowsterminal_exe"

    # Index 0 is the opening human pause; the key combo comes right after it.
    first_step = payload["actions"][1]
    assert first_step["type"] == "key_combo"
    assert first_step["keys"] == ["alt", "f4"]
def test_vwb_catalog_execute_plans_competence_replay():
    """Posting a `lea_competence_*` action without start_replay yields a plan."""
    from visual_workflow_builder.backend.catalog_routes_v2_vlm import catalog_bp

    app = Flask(__name__)
    app.register_blueprint(catalog_bp)

    request_body = {
        "type": "lea_competence_key_ctrl_s_wait_notepad_exe",
        "step_id": "step_test",
        "parameters": {"supervised": True},
    }
    expected_types = [
        "pause_for_human",
        "key_combo",
        "verify_screen",
        "pause_for_human",
    ]

    with app.test_client() as client:
        response = client.post("/api/vwb/catalog/execute", json=request_body)

        assert response.status_code == 200
        data = response.get_json()
        assert data["success"] is True

        result = data["result"]
        assert result["status"] == "planned"

        output = result["output_data"]
        assert output["competence_id"] == "key_ctrl_s_wait_notepad_exe"
        assert [action["type"] for action in output["actions"]] == expected_types

View File

@@ -86,11 +86,13 @@ except ImportError:
try:
from core.competences.catalog import load_competence_catalog_actions
from core.competences.replay import build_competence_replay_payload
COMPETENCE_CATALOG_AVAILABLE = True
except ImportError as e:
print(f"⚠️ Catalogue compétences Léa non disponible: {e}")
COMPETENCE_CATALOG_AVAILABLE = False
load_competence_catalog_actions = None
build_competence_replay_payload = None
# ============================================================================
@@ -1088,6 +1090,119 @@ def _load_lea_competence_actions() -> List[Dict[str, Any]]:
return []
def _extract_competence_id(action_type: str, parameters: Dict[str, Any]) -> str:
    """Resolve the competence id targeted by a catalog action.

    An explicit, non-blank ``competence_id`` parameter wins. Otherwise the id
    is whatever follows the ``lea_competence_`` prefix of ``action_type``.

    Returns:
        The competence id, or an empty string when neither source gives one.
    """
    explicit_id = str(parameters.get("competence_id") or "").strip()
    if explicit_id:
        return explicit_id

    type_prefix = "lea_competence_"
    if not action_type.startswith(type_prefix):
        return ""
    return action_type[len(type_prefix):]
def _competence_flag(value: Any, default: bool) -> bool:
    """Interpret a client-supplied flag, accepting textual booleans.

    ``bool("false")`` is ``True`` in Python, so string values coming from a
    JSON body must be parsed rather than cast. A missing value (``None``) or
    an unrecognised string falls back to ``default``.
    """
    if value is None:
        return default
    if isinstance(value, str):
        text = value.strip().lower()
        if text in ("1", "true", "yes", "on"):
            return True
        if text in ("0", "false", "no", "off"):
            return False
        return default
    return bool(value)


def _competence_result(
    competence_id: str,
    step_id: str,
    status: str,
    output_data: Dict[str, Any],
) -> Dict[str, Any]:
    """Wrap ``output_data`` in the success envelope shared by both outcomes."""
    return {
        "success": True,
        "result": {
            "action_id": f"test_competence_{competence_id}_{step_id}",
            "step_id": step_id,
            "status": status,
            "output_data": output_data,
            "evidence_list": [],
            "error": None,
        },
    }


def _execute_lea_competence_action(
    data: Dict[str, Any],
    action_type: str,
    step_id: str,
    parameters: Dict[str, Any],
):
    """Plan or start a supervised replay from a persisted competence YAML.

    Args:
        data: Request body; ``machine_id``/``machineId`` and
            ``session_id``/``sessionId`` are read from it.
        action_type: ``"test_competence"`` or a ``lea_competence_<id>`` type.
        step_id: Identifier of the workflow step, echoed in the result.
        parameters: Action parameters (``competence_id``, ``supervised``,
            ``start_replay``, optional ``machine_id``/``session_id``).

    Returns:
        A Flask JSON response, optionally paired with a status code:

        * 503 when the competence catalog could not be imported;
        * 400 when no competence id can be resolved;
        * 404 when the competence is unknown;
        * a ``planned`` result when ``start_replay`` is false;
        * 502 when the streaming server cannot be reached;
        * the streaming server's own status code when it rejects the replay;
        * a ``started`` result otherwise.
    """
    if not COMPETENCE_CATALOG_AVAILABLE or build_competence_replay_payload is None:
        return jsonify({
            "success": False,
            "error": "Catalogue compétences Léa non disponible",
        }), 503

    # A body carrying `"parameters": null` (or any non-object) must not crash
    # the handler on `.get`; treat it as "no parameters".
    if not isinstance(parameters, dict):
        parameters = {}

    competence_id = _extract_competence_id(action_type, parameters)
    if not competence_id:
        return jsonify({
            "success": False,
            "error": "Paramètre competence_id requis",
        }), 400

    # Parsed, not cast: a textual "false" must never trigger a replay.
    supervised = _competence_flag(parameters.get("supervised"), True)
    start_replay = _competence_flag(parameters.get("start_replay"), False)

    try:
        payload = build_competence_replay_payload(
            competence_id,
            supervised=supervised,
            machine_id=data.get("machine_id") or data.get("machineId") or parameters.get("machine_id"),
            session_id=data.get("session_id") or data.get("sessionId") or parameters.get("session_id"),
        )
    except KeyError:
        return jsonify({
            "success": False,
            "error": f"Compétence '{competence_id}' introuvable",
        }), 404

    if not start_replay:
        return jsonify(_competence_result(
            competence_id,
            step_id,
            "planned",
            {
                "competence_id": competence_id,
                "supervised": supervised,
                "actions": payload["actions"],
                "raw_replay_payload": payload,
                "message": "Plan de replay supervisé prêt. Relancer avec start_replay=true pour injecter dans le streaming server.",
            },
        ))

    # Strip a trailing slash so a configured "http://host:5005/" does not
    # produce a "//api/..." path.
    base_url = os.environ.get("RPA_STREAMING_SERVER_URL", "http://localhost:5005").rstrip("/")
    stream_token = os.environ.get("RPA_API_TOKEN", "")
    headers = {"Authorization": f"Bearer {stream_token}"} if stream_token else {}

    try:
        response = requests.post(
            f"{base_url}/api/v1/traces/stream/replay/raw",
            json=payload,
            headers=headers,
            timeout=5,
        )
    except Exception as e:
        # Request boundary: whatever prevented the call is reported to the
        # client as a gateway failure instead of surfacing as a 500.
        return jsonify({
            "success": False,
            "error": f"Impossible de contacter le streaming server: {e}",
            "raw_replay_payload": payload,
        }), 502

    try:
        replay_data = response.json()
    except ValueError:
        # JSON decoding errors raised by `Response.json()` are ValueError
        # subclasses; keep the raw text so the caller can still diagnose.
        replay_data = {"raw": response.text}

    if not response.ok:
        return jsonify({
            "success": False,
            "error": "Le streaming server a refusé le replay compétence",
            "status_code": response.status_code,
            "detail": replay_data,
            "raw_replay_payload": payload,
        }), response.status_code

    return jsonify(_competence_result(
        competence_id,
        step_id,
        "started",
        {
            "competence_id": competence_id,
            "supervised": supervised,
            "replay": replay_data,
            "raw_replay_payload": payload,
        },
    ))
def get_screen_capturer():
"""
Obtient l'instance du ScreenCapturer (initialisation paresseuse).
@@ -1804,6 +1919,9 @@ def execute_action():
parameters = data.get('parameters', {})
if action_type == "test_competence" or action_type.startswith("lea_competence_"):
return _execute_lea_competence_action(data, action_type, step_id, parameters)
# LOG DEBUG - Voir ce qui arrive du frontend
print(f"\n{'='*60}")
print(f"🔥 REQUÊTE EXECUTE REÇUE:")