class PlanReplayRequest(BaseModel):
    """Request body for launching a replay from an ExecutionPlan (V4 pipeline).

    Two modes are supported:
      1. Reference by ID: provide `plan_id` → the server loads the plan
         from `data/plans/{plan_id}.json`.
      2. Inline plan: provide `plan` (JSON dict) → it is used directly.

    `variables` override the variables stored in the plan.
    """
    # Identifier of a plan persisted on disk; checked before `plan`.
    plan_id: Optional[str] = None
    # Inline plan payload, parsed with ExecutionPlan.from_dict().
    plan: Optional[Dict[str, Any]] = None
    # Per-request variable overrides (take precedence over plan.variables).
    variables: Optional[Dict[str, Any]] = None
    # Target Agent V1 session; when empty (or a "chat_" session) the endpoint
    # looks up an active agent session instead.
    session_id: str = ""
    # Target machine, used to pick the active session in multi-machine setups.
    machine_id: Optional[str] = None


class CompileWorkflowRequest(BaseModel):
    """Request body for compiling a session into a WorkflowIR + ExecutionPlan."""
    # Recorded session whose live_events.jsonl is compiled (required).
    session_id: str
    # Machine that owns the session; "default" means "search every machine".
    machine_id: str = "default"
    # Forwarded to IRBuilder.build().
    domain: str = "generic"
    name: str = ""
    # Forwarded to ExecutionCompiler.compile().
    target_machine: str = ""
    target_resolution: str = "1280x800"
    params: Optional[Dict[str, str]] = None
# =========================================================================

# Default directories for V4 pipeline persistence
WORKFLOWS_IR_DIR = ROOT_DIR / "data" / "workflows_ir"
EXECUTION_PLANS_DIR = ROOT_DIR / "data" / "plans"

# Characters that must never appear in a request-supplied path component:
# path separators, NUL, and glob metacharacters.
_UNSAFE_PATH_CHARS = frozenset("/\\\x00*?[]")


def _is_safe_path_component(name: str) -> bool:
    """Return True when *name* is usable as a single file/directory name.

    Rejects empty names, the special entries "." and "..", and any name that
    contains a path separator, a NUL byte or a glob metacharacter.  Used to
    keep request-supplied identifiers from escaping the data directories.
    """
    if not name or name in (".", ".."):
        return False
    return not any(ch in _UNSAFE_PATH_CHARS for ch in name)


def _load_execution_plan(plan_id: str):
    """Load an ExecutionPlan from disk (data/plans/{plan_id}.json).

    Lookup order:
      1. Exact file `{plan_id}.json`.
      2. Prefix match `{plan_id}*.json` (plan_id given without its `_vN`
         suffix).  When several files match, the most recently modified one
         is used so the result does not depend on directory iteration order.

    Args:
        plan_id: Plan identifier supplied by the caller (untrusted).

    Returns:
        The loaded ExecutionPlan, or None when the identifier is unsafe or
        no matching file exists.
    """
    from core.workflow.execution_plan import ExecutionPlan

    # plan_id comes straight from the request body: refuse anything that
    # could traverse directories or widen the glob below.
    if not _is_safe_path_component(plan_id):
        logger.warning("replay/plan : plan_id rejeté (caractères interdits) : %r", plan_id)
        return None

    # Direct path
    candidate = EXECUTION_PLANS_DIR / f"{plan_id}.json"
    if candidate.exists():
        return ExecutionPlan.load(str(candidate))

    # Fallback: prefix search (plan_id without _vN)
    if EXECUTION_PLANS_DIR.exists():
        matches = sorted(
            EXECUTION_PLANS_DIR.glob(f"{plan_id}*.json"),
            key=lambda p: (p.stat().st_mtime, p.name),
        )
        if matches:
            return ExecutionPlan.load(str(matches[-1]))

    return None


@app.post("/api/v1/traces/stream/replay/plan")
async def launch_replay_from_plan(request: PlanReplayRequest):
    """Launch a replay from an ExecutionPlan (V4 pipeline).

    Steps:
      1. Load the plan (from disk via plan_id, or from the inline body).
      2. Convert every ExecutionNode into a replay action through
         execution_plan_runner.execution_plan_to_actions(); request
         variables override plan variables.
      3. Validate each action; invalid actions are dropped with a warning.
      4. Resolve the target Agent V1 session.
      5. Replace that session's replay queue and register the replay state.

    Raises:
        HTTPException: 400 for a missing/unparsable/empty plan, too many
            actions or no valid action; 404 when the plan or an active
            session cannot be found; 500 when the conversion fails.
    """
    from core.workflow.execution_plan import ExecutionPlan

    # ── 1. Load / parse the plan ──
    plan = None
    if request.plan_id:
        plan = _load_execution_plan(request.plan_id)
        if plan is None:
            raise HTTPException(
                status_code=404,
                detail=f"ExecutionPlan '{request.plan_id}' introuvable dans "
                       f"{EXECUTION_PLANS_DIR}/",
            )
    elif request.plan:
        try:
            plan = ExecutionPlan.from_dict(request.plan)
        except Exception as e:
            raise HTTPException(
                status_code=400,
                detail=f"Impossible de parser le plan inline : {e}",
            ) from e
    else:
        raise HTTPException(
            status_code=400,
            detail="Fournir 'plan_id' (référence) ou 'plan' (inline).",
        )

    if not plan.nodes:
        raise HTTPException(
            status_code=400,
            detail=f"ExecutionPlan '{plan.plan_id}' : aucun nœud à exécuter.",
        )

    # ── 2. Convert the nodes into replay actions ──
    try:
        actions = execution_plan_to_actions(
            plan=plan,
            variables=request.variables,
            id_prefix="act_plan",
        )
    except Exception as e:
        logger.exception("Erreur conversion ExecutionPlan → actions")
        raise HTTPException(
            status_code=500,
            detail=f"Erreur de conversion du plan : {e}",
        ) from e

    if not actions:
        raise HTTPException(
            status_code=400,
            detail=f"ExecutionPlan '{plan.plan_id}' : aucune action exploitable "
                   f"après conversion ({plan.total_nodes} nœuds).",
        )

    # Safety limit
    if len(actions) > MAX_ACTIONS_PER_REPLAY:
        raise HTTPException(
            status_code=400,
            detail=f"Trop d'actions ({len(actions)} > {MAX_ACTIONS_PER_REPLAY}).",
        )

    # ── 3. Validate every action ──
    validated: List[Dict[str, Any]] = []
    for i, action in enumerate(actions):
        error = _validate_replay_action(action)
        if error:
            logger.warning(
                "replay/plan : action #%d invalide (%s), suppression", i, error,
            )
            continue
        validated.append(action)

    if not validated:
        raise HTTPException(
            status_code=400,
            detail=f"ExecutionPlan '{plan.plan_id}' : toutes les actions "
                   f"ont été rejetées par la validation.",
        )

    # ── 4. Find the target Agent V1 session ──
    target_session_id = request.session_id
    if not target_session_id or target_session_id.startswith("chat_"):
        active_session = _find_active_agent_session(machine_id=request.machine_id)
        if active_session:
            target_session_id = active_session
        else:
            machine_hint = (
                f" sur la machine '{request.machine_id}'" if request.machine_id else ""
            )
            raise HTTPException(
                status_code=404,
                detail=f"Aucune session Agent V1 active{machine_hint}. "
                       "Lancez l'Agent V1 sur le PC cible.",
            )

    # ── 5. Inject into the replay queue ──
    replay_id = f"replay_plan_{uuid.uuid4().hex[:8]}"

    session_obj = processor.session_manager.get_session(target_session_id)
    resolved_machine_id = (
        request.machine_id
        or (session_obj.machine_id if session_obj else "default")
    )

    # Record the variables that were actually applied to the actions:
    # plan defaults overridden by the request body.
    effective_vars: Dict[str, Any] = dict(plan.variables or {})
    if request.variables:
        effective_vars.update(request.variables)

    with _replay_lock:
        _replay_queues[target_session_id] = list(validated)
        _replay_states[replay_id] = _create_replay_state(
            replay_id=replay_id,
            workflow_id=f"execution_plan:{plan.plan_id}",
            session_id=target_session_id,
            total_actions=len(validated),
            params=effective_vars,
            machine_id=resolved_machine_id,
        )
        if resolved_machine_id and resolved_machine_id != "default":
            _machine_replay_target[resolved_machine_id] = target_session_id

    # Tell the VLM worker that a replay is active so it suspends itself
    _set_replay_lock(replay_id)

    rejected_count = len(actions) - len(validated)
    logger.info(
        "Replay plan V4 démarré : %s | plan=%s (v%d) | session=%s | "
        "machine=%s | %d actions (total_nodes=%d, rejected=%d)",
        replay_id, plan.plan_id, plan.version, target_session_id,
        resolved_machine_id, len(validated), plan.total_nodes,
        rejected_count,
    )

    return {
        "replay_id": replay_id,
        "status": "running",
        "plan_id": plan.plan_id,
        "workflow_id": plan.workflow_id,
        "plan_version": plan.version,
        "session_id": target_session_id,
        "machine_id": resolved_machine_id,
        "total_actions": len(validated),
        "total_nodes": plan.total_nodes,
        "rejected_actions": rejected_count,
        "stats": {
            "nodes_with_ocr": plan.nodes_with_ocr,
            "nodes_with_template": plan.nodes_with_template,
            "nodes_with_vlm": plan.nodes_with_vlm,
            "estimated_duration_s": plan.estimated_duration_s,
        },
    }


@app.post("/api/v1/traces/stream/workflow/compile")
async def compile_workflow_endpoint(request: CompileWorkflowRequest):
    """Compile a recorded session into a WorkflowIR + ExecutionPlan (V4 pipeline).

    Steps:
      1. Locate and read the session's raw events (live_events.jsonl).
      2. IRBuilder.build() → WorkflowIR.
      3. WorkflowIR.save() → persisted under data/workflows_ir/.
      4. ExecutionCompiler.compile() → ExecutionPlan.
      5. ExecutionPlan.save() → persisted under data/plans/.
      6. Return the identifiers needed to call /replay/plan afterwards.

    This endpoint does NOT start a replay; it only prepares the plan.

    Raises:
        HTTPException: 400 for a missing/unsafe session or machine id, an
            empty event file or an empty workflow; 404 when the event file
            cannot be found; 500 when reading, building, compiling or
            saving fails.
    """
    from core.workflow.execution_compiler import ExecutionCompiler
    from core.workflow.ir_builder import IRBuilder

    session_id = request.session_id
    machine_id = request.machine_id or "default"

    if not session_id:
        raise HTTPException(status_code=400, detail="session_id requis")

    # Both identifiers are joined into filesystem paths below: refuse values
    # that could escape LIVE_SESSIONS_DIR.
    if not _is_safe_path_component(session_id):
        raise HTTPException(status_code=400, detail="session_id invalide")
    if not _is_safe_path_component(machine_id):
        raise HTTPException(status_code=400, detail="machine_id invalide")

    # ── 1. Locate the session's live_events.jsonl ──
    events_file = None
    if machine_id and machine_id != "default":
        candidate = LIVE_SESSIONS_DIR / machine_id / session_id / "live_events.jsonl"
        if candidate.exists():
            events_file = candidate

    # Not found under the requested machine: scan every machine directory.
    if not events_file and LIVE_SESSIONS_DIR.exists():
        for machine_dir in LIVE_SESSIONS_DIR.iterdir():
            if not machine_dir.is_dir():
                continue
            candidate = machine_dir / session_id / "live_events.jsonl"
            if candidate.exists():
                events_file = candidate
                if machine_id == "default":
                    machine_id = machine_dir.name
                break

    # Last resort: session directory directly under LIVE_SESSIONS_DIR.
    if not events_file:
        candidate = LIVE_SESSIONS_DIR / session_id / "live_events.jsonl"
        if candidate.exists():
            events_file = candidate

    if not events_file:
        raise HTTPException(
            status_code=404,
            detail=f"Session '{session_id}' : live_events.jsonl introuvable.",
        )

    # ── 2. Load the events (malformed JSON lines are skipped) ──
    raw_events: List[Dict[str, Any]] = []
    try:
        for line in events_file.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                raw_events.append(json.loads(line))
            except json.JSONDecodeError:
                continue
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail=f"Erreur lecture events : {e}",
        ) from e

    if not raw_events:
        raise HTTPException(
            status_code=400,
            detail=f"Session '{session_id}' : aucun événement.",
        )

    # ── 3. IRBuilder → WorkflowIR ──
    try:
        builder = IRBuilder()
        ir = builder.build(
            events=raw_events,
            session_id=session_id,
            session_dir=str(events_file.parent),
            domain=request.domain,
            name=request.name,
        )
    except Exception as e:
        logger.exception("Erreur IRBuilder.build()")
        raise HTTPException(
            status_code=500,
            detail=f"Erreur de construction WorkflowIR : {e}",
        ) from e

    if not ir.steps:
        raise HTTPException(
            status_code=400,
            detail=f"Session '{session_id}' : aucune étape détectée "
                   f"(pipeline IRBuilder a produit un workflow vide).",
        )

    # ── 4. Persist the WorkflowIR ──
    try:
        WORKFLOWS_IR_DIR.mkdir(parents=True, exist_ok=True)
        ir_path = ir.save(str(WORKFLOWS_IR_DIR))
    except Exception as e:
        logger.exception("Erreur sauvegarde WorkflowIR")
        raise HTTPException(
            status_code=500,
            detail=f"Erreur sauvegarde WorkflowIR : {e}",
        ) from e

    # ── 5. ExecutionCompiler → ExecutionPlan ──
    try:
        compiler = ExecutionCompiler()
        plan = compiler.compile(
            ir=ir,
            target_machine=request.target_machine,
            target_resolution=request.target_resolution,
            params=request.params,
        )
    except Exception as e:
        logger.exception("Erreur ExecutionCompiler.compile()")
        raise HTTPException(
            status_code=500,
            detail=f"Erreur de compilation du plan : {e}",
        ) from e

    # ── 6. Persist the ExecutionPlan ──
    try:
        EXECUTION_PLANS_DIR.mkdir(parents=True, exist_ok=True)
        plan_path = plan.save(str(EXECUTION_PLANS_DIR))
    except Exception as e:
        logger.exception("Erreur sauvegarde ExecutionPlan")
        raise HTTPException(
            status_code=500,
            detail=f"Erreur sauvegarde ExecutionPlan : {e}",
        ) from e

    logger.info(
        "Compilation V4 : session=%s → workflow_ir=%s (v%d) → plan=%s "
        "(%d nœuds, OCR=%d, template=%d, VLM=%d)",
        session_id, ir.workflow_id, ir.version, plan.plan_id,
        plan.total_nodes, plan.nodes_with_ocr, plan.nodes_with_template,
        plan.nodes_with_vlm,
    )

    return {
        "session_id": session_id,
        "machine_id": machine_id,
        "workflow_id": ir.workflow_id,
        "workflow_version": ir.version,
        "workflow_ir_path": str(ir_path),
        "workflow_name": ir.name,
        "domain": ir.domain,
        "steps": len(ir.steps),
        "variables": len(ir.variables),
        "applications": ir.applications,
        "plan_id": plan.plan_id,
        "plan_path": str(plan_path),
        "total_nodes": plan.total_nodes,
        "stats": {
            "nodes_with_ocr": plan.nodes_with_ocr,
            "nodes_with_template": plan.nodes_with_template,
            "nodes_with_vlm": plan.nodes_with_vlm,
            "estimated_duration_s": plan.estimated_duration_s,
        },
    }
========================================================================= # Pre-check écran — Vérification pré-action par embedding CLIP # ========================================================================= diff --git a/agent_v0/server_v1/execution_plan_runner.py b/agent_v0/server_v1/execution_plan_runner.py new file mode 100644 index 000000000..dcb5df692 --- /dev/null +++ b/agent_v0/server_v1/execution_plan_runner.py @@ -0,0 +1,322 @@ +# agent_v0/server_v1/execution_plan_runner.py +""" +ExecutionPlanRunner — Adaptateur ExecutionPlan → actions replay. + +Pièce d'intégration du pipeline V4 : + RawTrace → IRBuilder → WorkflowIR → ExecutionCompiler → ExecutionPlan → Runtime + +Ce module convertit un `ExecutionPlan` (plan pré-compilé, déterministe) en +liste d'actions au format attendu par l'executor replay actuel (clé x_pct, +y_pct, target_spec, etc.), puis les injecte dans `_replay_queues`. + +L'ancien chemin `build_replay_from_raw_events()` dans stream_processor.py +reste inchangé — les deux chemins coexistent pendant la transition. + +Format d'action produit (compatible executor existant) : + { + "action_id": "act_...", + "type": "click", + "x_pct": 0.5, + "y_pct": 0.3, + "visual_mode": True, + "target_spec": { + "by_text": "...", + "window_title": "...", + "vlm_description": "...", + "anchor_image_base64": "...", + }, + "expected_window_title": "...", + } + +Auteur: Dom, Alice - Avril 2026 +""" + +from __future__ import annotations + +import logging +import re +import threading +import uuid +from typing import Any, Dict, List, Optional + +from core.workflow.execution_plan import ( + ExecutionNode, + ExecutionPlan, + ResolutionStrategy, +) + +logger = logging.getLogger(__name__) + + +# ========================================================================= +# Substitution de variables +# ========================================================================= +# Le WorkflowIR utilise la syntaxe `{var}` dans les champs texte. 
# Both syntaxes are supported: `{var}` (native IR) and `${var}` (legacy replay).
# The two per-syntax patterns are kept as module attributes; substitution
# itself uses the combined pattern so the text is scanned exactly once.
_VARIABLE_RE_CURLY = re.compile(r"\{(\w+)\}")
_VARIABLE_RE_DOLLAR = re.compile(r"\$\{(\w+)\}")
_VARIABLE_RE_ANY = re.compile(r"\$?\{(\w+)\}")


def substitute_variables(text: str, variables: Dict[str, Any]) -> str:
    """Replace `{var}` and `${var}` placeholders with their values.

    Unknown placeholders are left untouched.  The text is scanned in a
    single pass, so a substituted value that itself looks like a placeholder
    (e.g. the value "{other}") is inserted verbatim and never expanded again.

    Args:
        text: Text that may contain placeholders.
        variables: Mapping of variable name to value (converted with str()).

    Returns:
        The text with every known placeholder replaced; *text* unchanged
        when either argument is empty.
    """
    if not text or not variables:
        return text

    def replacer(match: "re.Match[str]") -> str:
        var_name = match.group(1)
        if var_name in variables:
            return str(variables[var_name])
        return match.group(0)

    return _VARIABLE_RE_ANY.sub(replacer, text)


# =========================================================================
# ExecutionNode → replay action conversion
# =========================================================================


def _strategy_to_target_spec(
    strategy: Optional[ResolutionStrategy],
    fallbacks: Optional[List[ResolutionStrategy]] = None,
    intent: str = "",
) -> Dict[str, Any]:
    """Build a `target_spec` dict from the resolution strategies.

    The primary strategy and the fallbacks are merged, in that order, and the
    first non-empty hint of each kind is kept:
      - OCR      → by_text (from target_text)
      - template → anchor_image_base64 (from anchor_b64), plus by_text when
                   the template also carries a target_text
      - VLM      → vlm_description

    When no strategy provides a VLM description, *intent* (if any) is used as
    the last-resort VLM prompt.

    Returns:
        A dict containing only the keys for which a hint was found; it may
        be empty.
    """
    spec: Dict[str, Any] = {}

    all_strategies: List[ResolutionStrategy] = []
    if strategy is not None:
        all_strategies.append(strategy)
    if fallbacks:
        all_strategies.extend(fallbacks)

    by_text_candidate = ""
    anchor_candidate = ""
    vlm_candidate = ""

    for strat in all_strategies:
        if not strat:
            continue
        if strat.method == "ocr" and strat.target_text and not by_text_candidate:
            by_text_candidate = strat.target_text
        elif strat.method == "template":
            if strat.anchor_b64 and not anchor_candidate:
                anchor_candidate = strat.anchor_b64
            if strat.target_text and not by_text_candidate:
                by_text_candidate = strat.target_text
        elif strat.method == "vlm" and strat.vlm_description and not vlm_candidate:
            vlm_candidate = strat.vlm_description

    if by_text_candidate:
        spec["by_text"] = by_text_candidate
    if anchor_candidate:
        spec["anchor_image_base64"] = anchor_candidate
    if vlm_candidate:
        spec["vlm_description"] = vlm_candidate
    elif intent:
        # The business intent becomes the last-resort VLM prompt
        spec["vlm_description"] = intent

    return spec


def execution_node_to_action(
    node: ExecutionNode,
    variables: Optional[Dict[str, Any]] = None,
    id_prefix: str = "act_plan",
) -> Optional[Dict[str, Any]]:
    """Convert an `ExecutionNode` into a replay action dict.

    Args:
        node: The node to convert.
        variables: Variables used to substitute placeholders in typed text.
        id_prefix: Prefix of the generated `action_id`.

    Returns:
        The action dict, or None when the node is not executable (unknown
        action type, or a key combo without any key).
    """
    variables = variables or {}

    action: Dict[str, Any] = {
        "action_id": f"{id_prefix}_{uuid.uuid4().hex[:8]}",
        "plan_node_id": node.node_id,
    }

    if node.intent:
        action["intention"] = node.intent
    if node.step_id:
        action["plan_step_id"] = node.step_id
    if node.is_optional:
        action["is_optional"] = True

    # Execution metadata forwarded to the runtime
    if node.timeout_ms:
        action["timeout_ms"] = node.timeout_ms
    if node.max_retries:
        action["max_retries"] = node.max_retries
    if node.recovery_action:
        action["recovery_action"] = node.recovery_action
    if node.success_condition:
        action["success_condition"] = node.success_condition.to_dict()

    action_type = node.action_type

    if action_type == "click":
        action["type"] = "click"

        # No recorded coordinates are carried by the plan: the target is
        # described through target_spec and located visually at replay time.
        # A neutral centre (0.5, 0.5) is still exposed to stay compatible
        # with the existing queue validations.
        action["x_pct"] = 0.5
        action["y_pct"] = 0.5
        action["visual_mode"] = True

        target_spec = _strategy_to_target_spec(
            strategy=node.strategy_primary,
            fallbacks=node.strategy_fallbacks or [],
            intent=node.intent,
        )

        # Expected window title (post-action verification / pre-check)
        if node.success_condition and node.success_condition.expected_title:
            action["expected_window_title"] = node.success_condition.expected_title
            target_spec.setdefault("window_title", node.success_condition.expected_title)

        if target_spec:
            action["target_spec"] = target_spec

    elif action_type == "type":
        action["type"] = "type"
        # Substitute variables before sending (e.g. {patient} → "DUPONT")
        action["text"] = substitute_variables(node.text or "", variables)
        if node.variable_name:
            action["variable_name"] = node.variable_name

    elif action_type in ("key_combo", "key_press"):
        action["type"] = "key_combo"
        keys = list(node.keys or [])
        if not keys:
            return None
        action["keys"] = keys

    elif action_type == "wait":
        action["type"] = "wait"
        action["duration_ms"] = int(node.duration_ms or 1000)

    elif action_type == "scroll":
        action["type"] = "scroll"
        # Strategies may describe a zone; it is not used here and a fixed
        # delta is emitted.
        action["delta"] = -3

    else:
        logger.debug("execution_node_to_action: type inconnu '%s' ignoré", action_type)
        return None

    return action


def execution_plan_to_actions(
    plan: ExecutionPlan,
    variables: Optional[Dict[str, Any]] = None,
    id_prefix: str = "act_plan",
) -> List[Dict[str, Any]]:
    """Convert a whole `ExecutionPlan` into an ordered list of replay actions.

    Variables passed as argument override those stored in the plan.  Nodes
    that cannot be converted are skipped; the order of the remaining nodes
    is preserved.
    """
    merged_vars: Dict[str, Any] = dict(plan.variables or {})
    if variables:
        merged_vars.update(variables)

    actions: List[Dict[str, Any]] = []
    for node in plan.nodes:
        action = execution_node_to_action(
            node=node,
            variables=merged_vars,
            id_prefix=id_prefix,
        )
        if action is not None:
            actions.append(action)

    # Report the number of nodes actually iterated rather than the stored
    # plan.total_nodes counter, which may be out of date.
    logger.info(
        "execution_plan_to_actions(%s) : %d nœuds → %d actions replay "
        "(vars=%d)",
        plan.plan_id, len(plan.nodes), len(actions), len(merged_vars),
    )
    return actions


# =========================================================================
# Injection into the replay queue
# =========================================================================


def inject_plan_into_queue(
    plan: ExecutionPlan,
    session_id: str,
    replay_queues: Dict[str, List[Dict[str, Any]]],
    variables: Optional[Dict[str, Any]] = None,
    lock: Optional[threading.Lock] = None,
    replace: bool = True,
    id_prefix: str = "act_plan",
) -> List[Dict[str, Any]]:
    """Inject an `ExecutionPlan` into the replay queue of a session.

    Args:
        plan: The plan to execute.
        session_id: The target session key in *replay_queues*.
        replay_queues: Mapping of session id to its list of pending actions.
        variables: Variables to substitute in the actions.
        lock: Optional lock held while the queue is written.
        replace: If True (default) the existing queue is replaced; otherwise
            the actions are appended (the queue is created when missing).
        id_prefix: Prefix of the generated action ids.

    Returns:
        The list of injected actions (after substitution).
    """
    actions = execution_plan_to_actions(
        plan=plan, variables=variables, id_prefix=id_prefix,
    )

    def _write() -> None:
        if replace:
            replay_queues[session_id] = list(actions)
        else:
            # setdefault: appending to a session without a queue must not
            # raise KeyError on a plain dict.
            replay_queues.setdefault(session_id, []).extend(actions)

    if lock is not None:
        with lock:
            _write()
    else:
        _write()

    logger.info(
        "inject_plan_into_queue(%s) : %d actions injectées dans la queue "
        "de la session '%s' (replace=%s)",
        plan.plan_id, len(actions), session_id, replace,
    )
    return actions
+""" + +import sys +import threading +from collections import defaultdict +from pathlib import Path + +import pytest + +_ROOT = str(Path(__file__).resolve().parents[2]) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +from core.workflow.execution_plan import ( + ExecutionNode, + ExecutionPlan, + ResolutionStrategy, + SuccessCondition, +) +from core.workflow.execution_compiler import ExecutionCompiler +from core.workflow.workflow_ir import WorkflowIR + +from agent_v0.server_v1.execution_plan_runner import ( + execution_node_to_action, + execution_plan_to_actions, + inject_plan_into_queue, + substitute_variables, +) + + +# ========================================================================= +# Substitution de variables +# ========================================================================= + + +class TestSubstituteVariables: + + def test_substitution_curly(self): + assert substitute_variables("{nom}", {"nom": "Dupont"}) == "Dupont" + + def test_substitution_dollar(self): + assert substitute_variables("${nom}", {"nom": "Dupont"}) == "Dupont" + + def test_substitution_dans_phrase(self): + assert ( + substitute_variables("Bonjour {nom}, votre code est ${code}", + {"nom": "Alice", "code": "A42"}) + == "Bonjour Alice, votre code est A42" + ) + + def test_variable_inconnue_inchangee(self): + # Une variable inconnue reste dans le texte (pas de KeyError) + assert substitute_variables("{inconnu}", {"autre": "val"}) == "{inconnu}" + + def test_texte_sans_variable(self): + assert substitute_variables("texte simple", {"x": "1"}) == "texte simple" + + def test_texte_vide(self): + assert substitute_variables("", {"x": "1"}) == "" + + def test_variables_vides(self): + assert substitute_variables("{x}", {}) == "{x}" + + +# ========================================================================= +# Conversion ExecutionNode → action replay +# ========================================================================= + + +class TestExecutionNodeToAction: + + def 
test_click_avec_strategie_ocr(self): + """Un clic avec stratégie OCR produit une action click visuelle avec by_text.""" + node = ExecutionNode( + node_id="n1", + action_type="click", + intent="Cliquer sur Enregistrer", + strategy_primary=ResolutionStrategy( + method="ocr", + target_text="Enregistrer", + threshold=0.8, + ), + ) + action = execution_node_to_action(node) + + assert action is not None + assert action["type"] == "click" + assert action["action_id"].startswith("act_plan_") + assert action["plan_node_id"] == "n1" + assert action["intention"] == "Cliquer sur Enregistrer" + assert action["visual_mode"] is True + assert "x_pct" in action and "y_pct" in action + assert action["target_spec"]["by_text"] == "Enregistrer" + + def test_click_avec_strategie_template(self): + """Un clic avec stratégie template expose l'anchor_image_base64.""" + node = ExecutionNode( + node_id="n2", + action_type="click", + strategy_primary=ResolutionStrategy( + method="template", + anchor_b64="AAABBBCCCDDD", + target_text="Ouvrir", + ), + ) + action = execution_node_to_action(node) + + assert action is not None + assert action["type"] == "click" + assert action["target_spec"]["anchor_image_base64"] == "AAABBBCCCDDD" + assert action["target_spec"]["by_text"] == "Ouvrir" + assert action["visual_mode"] is True + + def test_click_avec_strategie_vlm(self): + """Un clic avec stratégie VLM expose vlm_description.""" + node = ExecutionNode( + node_id="n3", + action_type="click", + strategy_primary=ResolutionStrategy( + method="vlm", + vlm_description="bouton rouge en haut à droite", + ), + ) + action = execution_node_to_action(node) + + assert action is not None + assert action["target_spec"]["vlm_description"] == "bouton rouge en haut à droite" + assert action["visual_mode"] is True + + def test_click_avec_fallbacks_ajoute_hints(self): + """Les fallbacks enrichissent le target_spec avec toutes les ancres disponibles.""" + node = ExecutionNode( + node_id="n4", + action_type="click", + 
intent="Ouvrir le menu", + strategy_primary=ResolutionStrategy(method="ocr", target_text="Menu"), + strategy_fallbacks=[ + ResolutionStrategy( + method="template", anchor_b64="XYZ", target_text="Menu", + ), + ResolutionStrategy( + method="vlm", vlm_description="menu déroulant", + ), + ], + ) + action = execution_node_to_action(node) + + spec = action["target_spec"] + assert spec["by_text"] == "Menu" + assert spec["anchor_image_base64"] == "XYZ" + assert spec["vlm_description"] == "menu déroulant" + + def test_click_avec_success_condition_expected_title(self): + """La success_condition avec expected_title passe dans expected_window_title.""" + node = ExecutionNode( + node_id="n5", + action_type="click", + strategy_primary=ResolutionStrategy(method="ocr", target_text="OK"), + success_condition=SuccessCondition( + method="title_match", + expected_title="Document sauvegardé", + ), + ) + action = execution_node_to_action(node) + + assert action["expected_window_title"] == "Document sauvegardé" + assert action["target_spec"]["window_title"] == "Document sauvegardé" + + def test_type_avec_variable_substitution(self): + """Un node type avec variable {patient} est substitué.""" + node = ExecutionNode( + node_id="n6", + action_type="type", + text="{patient}", + variable_name="patient", + ) + action = execution_node_to_action(node, variables={"patient": "DUPONT"}) + + assert action["type"] == "type" + assert action["text"] == "DUPONT" + assert action["variable_name"] == "patient" + + def test_type_sans_variable(self): + """Un texte sans variable est inchangé.""" + node = ExecutionNode( + node_id="n7", + action_type="type", + text="Bonjour", + ) + action = execution_node_to_action(node) + assert action["text"] == "Bonjour" + + def test_key_combo(self): + """Un key_combo expose les touches.""" + node = ExecutionNode( + node_id="n8", + action_type="key_combo", + keys=["ctrl", "s"], + ) + action = execution_node_to_action(node) + + assert action["type"] == "key_combo" + assert 
action["keys"] == ["ctrl", "s"] + + def test_key_combo_vide_retourne_none(self): + """Un key_combo sans touches est ignoré.""" + node = ExecutionNode( + node_id="n9", + action_type="key_combo", + keys=[], + ) + assert execution_node_to_action(node) is None + + def test_wait(self): + """Un wait expose duration_ms.""" + node = ExecutionNode( + node_id="n10", + action_type="wait", + duration_ms=2500, + ) + action = execution_node_to_action(node) + + assert action["type"] == "wait" + assert action["duration_ms"] == 2500 + + def test_wait_sans_duration_default(self): + """Un wait sans duration a un défaut de 1000ms.""" + node = ExecutionNode(node_id="n11", action_type="wait") + action = execution_node_to_action(node) + assert action["duration_ms"] == 1000 + + def test_scroll(self): + """Un scroll produit une action scroll.""" + node = ExecutionNode(node_id="n12", action_type="scroll") + action = execution_node_to_action(node) + + assert action["type"] == "scroll" + assert "delta" in action + + def test_type_inconnu_retourne_none(self): + """Un type d'action inconnu est ignoré (retourne None).""" + node = ExecutionNode(node_id="n13", action_type="unknown_thing") + assert execution_node_to_action(node) is None + + def test_metadonnees_execution_propagees(self): + """timeout_ms, max_retries, recovery_action passent dans l'action.""" + node = ExecutionNode( + node_id="n14", + action_type="click", + strategy_primary=ResolutionStrategy(method="ocr", target_text="X"), + timeout_ms=15000, + max_retries=3, + recovery_action="undo", + ) + action = execution_node_to_action(node) + + assert action["timeout_ms"] == 15000 + assert action["max_retries"] == 3 + assert action["recovery_action"] == "undo" + + def test_node_optionnel(self): + """is_optional est propagé.""" + node = ExecutionNode( + node_id="n15", + action_type="click", + strategy_primary=ResolutionStrategy(method="ocr", target_text="X"), + is_optional=True, + ) + action = execution_node_to_action(node) + assert 
action["is_optional"] is True + + def test_id_prefix_custom(self): + """Le préfixe d'id peut être personnalisé.""" + node = ExecutionNode( + node_id="n16", + action_type="click", + strategy_primary=ResolutionStrategy(method="ocr", target_text="X"), + ) + action = execution_node_to_action(node, id_prefix="act_custom") + assert action["action_id"].startswith("act_custom_") + + +# ========================================================================= +# Conversion ExecutionPlan → liste d'actions +# ========================================================================= + + +class TestExecutionPlanToActions: + + def _make_plan(self) -> ExecutionPlan: + plan = ExecutionPlan( + plan_id="plan_test", + workflow_id="wf_test", + version=1, + variables={"nom_fichier": "rapport.pdf"}, + ) + plan.nodes = [ + ExecutionNode( + node_id="n1", + action_type="click", + strategy_primary=ResolutionStrategy(method="ocr", target_text="Ouvrir"), + ), + ExecutionNode( + node_id="n2", + action_type="type", + text="{nom_fichier}", + variable_name="nom_fichier", + ), + ExecutionNode( + node_id="n3", + action_type="key_combo", + keys=["enter"], + ), + ExecutionNode( + node_id="n4", + action_type="wait", + duration_ms=1500, + ), + ] + plan.total_nodes = 4 + return plan + + def test_conversion_ordre_respecte(self): + plan = self._make_plan() + actions = execution_plan_to_actions(plan) + + assert len(actions) == 4 + assert actions[0]["type"] == "click" + assert actions[1]["type"] == "type" + assert actions[2]["type"] == "key_combo" + assert actions[3]["type"] == "wait" + + def test_variables_du_plan_appliquees(self): + plan = self._make_plan() + actions = execution_plan_to_actions(plan) + type_action = next(a for a in actions if a["type"] == "type") + assert type_action["text"] == "rapport.pdf" + + def test_variables_override(self): + """Les variables passées en argument écrasent celles du plan.""" + plan = self._make_plan() + actions = execution_plan_to_actions( + plan, 
variables={"nom_fichier": "facture.pdf"}, + ) + type_action = next(a for a in actions if a["type"] == "type") + assert type_action["text"] == "facture.pdf" + + def test_plan_vide(self): + plan = ExecutionPlan(plan_id="empty", workflow_id="wf_empty") + actions = execution_plan_to_actions(plan) + assert actions == [] + + def test_noeud_non_convertible_ignore(self): + """Un nœud inconnu ne bloque pas la conversion.""" + plan = ExecutionPlan(plan_id="p", workflow_id="wf") + plan.nodes = [ + ExecutionNode( + node_id="n1", + action_type="click", + strategy_primary=ResolutionStrategy(method="ocr", target_text="OK"), + ), + ExecutionNode(node_id="n2", action_type="unknown_type"), + ExecutionNode( + node_id="n3", + action_type="type", + text="hello", + ), + ] + actions = execution_plan_to_actions(plan) + assert len(actions) == 2 + assert actions[0]["type"] == "click" + assert actions[1]["type"] == "type" + + +# ========================================================================= +# Injection dans la queue de replay +# ========================================================================= + + +class TestInjectPlanIntoQueue: + + def _make_simple_plan(self) -> ExecutionPlan: + plan = ExecutionPlan(plan_id="p_inj", workflow_id="wf_inj") + plan.nodes = [ + ExecutionNode( + node_id="n1", + action_type="click", + strategy_primary=ResolutionStrategy(method="ocr", target_text="Go"), + ), + ExecutionNode(node_id="n2", action_type="wait", duration_ms=500), + ] + return plan + + def test_injection_replace(self): + """Par défaut, la queue est remplacée.""" + plan = self._make_simple_plan() + queues: dict = defaultdict(list) + queues["sess_abc"] = [{"type": "click", "action_id": "old"}] + + actions = inject_plan_into_queue( + plan=plan, + session_id="sess_abc", + replay_queues=queues, + ) + + assert len(actions) == 2 + assert len(queues["sess_abc"]) == 2 + # L'ancienne action a été remplacée + assert all(a["action_id"] != "old" for a in queues["sess_abc"]) + + def 
test_injection_append(self): + """Avec replace=False, on ajoute aux actions existantes.""" + plan = self._make_simple_plan() + queues: dict = defaultdict(list) + queues["sess_abc"] = [{"type": "click", "action_id": "existing"}] + + inject_plan_into_queue( + plan=plan, + session_id="sess_abc", + replay_queues=queues, + replace=False, + ) + + assert len(queues["sess_abc"]) == 3 + assert queues["sess_abc"][0]["action_id"] == "existing" + + def test_injection_avec_lock(self): + """Le lock est respecté pendant l'injection.""" + plan = self._make_simple_plan() + queues: dict = defaultdict(list) + lock = threading.Lock() + + actions = inject_plan_into_queue( + plan=plan, + session_id="sess_x", + replay_queues=queues, + lock=lock, + ) + + assert len(actions) == 2 + assert len(queues["sess_x"]) == 2 + # Le lock est bien libéré après l'injection + assert lock.acquire(blocking=False) is True + lock.release() + + def test_injection_avec_variables(self): + """Les variables sont substituées lors de l'injection.""" + plan = ExecutionPlan(plan_id="p_var", workflow_id="wf_var") + plan.nodes = [ + ExecutionNode( + node_id="n1", + action_type="type", + text="{patient}", + variable_name="patient", + ), + ] + + queues: dict = defaultdict(list) + actions = inject_plan_into_queue( + plan=plan, + session_id="sess_v", + replay_queues=queues, + variables={"patient": "MARTIN"}, + ) + + assert actions[0]["text"] == "MARTIN" + assert queues["sess_v"][0]["text"] == "MARTIN" + + +# ========================================================================= +# Intégration : pipeline complet IR → Plan → Actions +# ========================================================================= + + +class TestFullPipelineV4: + """Teste le pipeline complet : WorkflowIR → ExecutionPlan → actions replay.""" + + def test_pipeline_complet_ir_vers_actions(self): + # 1. 
Construire un WorkflowIR + ir = WorkflowIR.new("Test pipeline V4", domain="generic") + ir.add_step( + "Ouvrir le fichier", + actions=[ + {"type": "click", "target": "bouton Ouvrir", "anchor_hint": "Ouvrir"}, + {"type": "wait", "duration_ms": 1000}, + ], + postcondition="La fenêtre Ouvrir est visible", + ) + ir.add_step( + "Saisir le nom", + actions=[ + {"type": "type", "text": "{nom_fichier}", "variable": True}, + {"type": "key_combo", "keys": ["enter"]}, + ], + ) + ir.add_variable("nom_fichier", description="Fichier", default="doc.pdf") + + # 2. Compiler → ExecutionPlan + compiler = ExecutionCompiler() + plan = compiler.compile(ir) + assert plan.total_nodes == 4 + + # 3. Convertir → actions replay + actions = execution_plan_to_actions(plan) + assert len(actions) == 4 + + types = [a["type"] for a in actions] + assert types == ["click", "wait", "type", "key_combo"] + + # Le clic a une stratégie OCR → by_text + click = actions[0] + assert click["visual_mode"] is True + assert click["target_spec"].get("by_text") == "Ouvrir" + + # Le type a substitué la variable depuis le plan + type_action = actions[2] + assert type_action["text"] == "doc.pdf" + + # Le key_combo a les touches + assert actions[3]["keys"] == ["enter"] + + def test_pipeline_avec_params_override(self): + """Les params passés à l'injection prévalent sur le plan.""" + ir = WorkflowIR.new("Variables override") + ir.add_step("Saisie", actions=[ + {"type": "type", "text": "{code}", "variable": True}, + ]) + ir.add_variable("code", default="DEFAULT") + + compiler = ExecutionCompiler() + plan = compiler.compile(ir) + + actions = execution_plan_to_actions( + plan, variables={"code": "RUNTIME"}, + ) + assert actions[0]["text"] == "RUNTIME" + + def test_pipeline_plan_serialise_et_recharge(self): + """Le plan peut être sérialisé/rechargé puis converti en actions.""" + ir = WorkflowIR.new("Roundtrip") + ir.add_step("X", actions=[ + {"type": "click", "target": "btn", "anchor_hint": "Valider"}, + ]) + compiler = 
ExecutionCompiler() + plan = compiler.compile(ir) + + json_str = plan.to_json() + plan2 = ExecutionPlan.from_json(json_str) + + actions = execution_plan_to_actions(plan2) + assert len(actions) == 1 + assert actions[0]["type"] == "click"