diff --git a/core/workflow/execution_compiler.py b/core/workflow/execution_compiler.py new file mode 100644 index 000000000..ada04a5c6 --- /dev/null +++ b/core/workflow/execution_compiler.py @@ -0,0 +1,280 @@ +# core/workflow/execution_compiler.py +""" +ExecutionCompiler — Compile un WorkflowIR en ExecutionPlan. + +Pièce maîtresse de l'architecture V4. +"Le LLM prépare et compile. Le runtime exécute." + +Le compilateur : +1. Prend chaque étape du WorkflowIR +2. Compile une stratégie de résolution pour chaque action (OCR > template > VLM) +3. Définit les timeouts, retries, fallbacks et recovery +4. Produit un ExecutionPlan déterministe et borné + +L'objectif : zéro VLM au runtime pour les cas normaux. +Le VLM est un exception handler, pas le chemin principal. + +Le compilateur utilise : +- Les données de l'enregistrement (crops, textes OCR) pour pré-compiler +- L'historique d'apprentissage (ReplayLearner) pour choisir la meilleure stratégie +- Le contexte métier (DomainContext) pour adapter les paramètres +""" + +import logging +import os +import time +import uuid +from typing import Any, Dict, List, Optional + +from .workflow_ir import WorkflowIR, Step, Action +from .execution_plan import ( + ExecutionPlan, ExecutionNode, ResolutionStrategy, SuccessCondition, +) + +logger = logging.getLogger(__name__) + +# Temps estimé par type d'action (ms) +_ACTION_TIME_ESTIMATES = { + "click": 200, # OCR lookup + clic + "type": 500, # Frappe char-by-char + "key_combo": 100, + "wait": 0, # Le duration_ms est dans l'action + "scroll": 200, +} + + +class ExecutionCompiler: + """Compile un WorkflowIR en ExecutionPlan. + + Usage : + compiler = ExecutionCompiler() + plan = compiler.compile(workflow_ir, target_machine="VM_Win11") + plan.save("data/plans/") + """ + + def __init__(self, learning_dir: str = ""): + self._learning_dir = learning_dir or "data/learning/replay_results" + + def compile( + self, + ir: WorkflowIR, + target_machine: str = "", + target_resolution: str = "1280x800", + params: Optional[Dict[str, str]] = None, + ) -> ExecutionPlan: + """Compiler un WorkflowIR en ExecutionPlan. + + Args: + ir: Le WorkflowIR à compiler + target_machine: Machine cible (pour adapter les stratégies) + target_resolution: Résolution de la machine cible + params: Variables à substituer + """ + t_start = time.time() + + plan = ExecutionPlan( + plan_id=f"plan_{uuid.uuid4().hex[:8]}", + workflow_id=ir.workflow_id, + version=ir.version, + created_at=time.time(), + domain=ir.domain, + target_machine=target_machine, + target_resolution=target_resolution, + variables=params or {v.name: v.default for v in ir.variables}, + ) + + # Consulter l'historique d'apprentissage + learned_strategies = self._load_learned_strategies() + + # Compiler chaque étape + for step in ir.steps: + nodes = self._compile_step(step, ir, learned_strategies) + plan.nodes.extend(nodes) + + # Statistiques de compilation + plan.total_nodes = len(plan.nodes) + plan.nodes_with_ocr = sum( + 1 for n in plan.nodes + if n.strategy_primary and n.strategy_primary.method == "ocr" + ) + plan.nodes_with_template = sum( + 1 for n in plan.nodes + if n.strategy_primary and n.strategy_primary.method == "template" + ) + plan.nodes_with_vlm = sum( + 1 for n in plan.nodes + if n.strategy_primary and n.strategy_primary.method == "vlm" + ) + plan.estimated_duration_s = sum( + _ACTION_TIME_ESTIMATES.get(n.action_type, 200) + n.duration_ms + for n in plan.nodes + ) / 1000.0 + + elapsed = time.time() - t_start + logger.info( + f"Compilation: {plan.total_nodes} nœuds en {elapsed:.1f}s — " + f"OCR={plan.nodes_with_ocr}, template={plan.nodes_with_template}, " + f"VLM={plan.nodes_with_vlm} (exception handler)" + ) + + return plan + + def _compile_step( + self, + step: Step, + ir: WorkflowIR, + learned: Dict[str, str], + ) -> List[ExecutionNode]: + """Compiler une étape en nœuds d'exécution.""" + nodes = [] + + for i, action in enumerate(step.actions): + node = self._compile_action( + action=action, + step=step, + action_index=i, + ir=ir, + learned=learned, + ) + nodes.append(node) + + return nodes + + def _compile_action( + self, + action: Action, + step: Step, + action_index: int, + ir: WorkflowIR, + learned: Dict[str, str], + ) -> ExecutionNode: + """Compiler une action en nœud d'exécution avec stratégie de résolution.""" + + node = ExecutionNode( + node_id=f"n_{step.step_id}_{action_index}", + action_type=action.type, + intent=step.intent, + step_id=step.step_id, + is_optional=step.is_optional, + ) + + if action.type == "click": + # Compiler les stratégies de résolution pour ce clic + node.strategy_primary, node.strategy_fallbacks = self._compile_click_resolution( + action, step, learned, + ) + node.timeout_ms = 10000 + node.max_retries = 2 + node.recovery_action = "escape" + + # Condition de succès basée sur la postcondition + if step.postcondition: + node.success_condition = SuccessCondition( + method="screen_changed", + description=step.postcondition, + ) + + elif action.type == "type": + node.text = action.text + node.variable_name = action.text.strip("{}") if action.variable else "" + node.timeout_ms = 5000 + node.max_retries = 0 # Pas de retry sur la frappe + node.recovery_action = "undo" + + elif action.type == "key_combo": + node.keys = action.keys + node.timeout_ms = 3000 + node.max_retries = 0 + node.recovery_action = "undo" + + elif action.type == "wait": + node.duration_ms = action.duration_ms or 1000 + node.timeout_ms = action.duration_ms + 2000 + node.max_retries = 0 + node.recovery_action = "none" + + elif action.type == "scroll": + node.timeout_ms = 3000 + node.max_retries = 0 + node.recovery_action = "none" + + return node + + def _compile_click_resolution( + self, + action: Action, + step: Step, + learned: Dict[str, str], + ) -> tuple: + """Compiler les stratégies de résolution pour un clic. + + Ordre de priorité : + 1. OCR exact (si texte connu) — 100ms, pixel-perfect + 2. Template matching (si crop disponible) — 10ms, même interface + 3. Position relative (si hint disponible) — instantané, fragile + 4. VLM (dernier recours) — 2-5s, exception handler + + Le learning peut réordonner si une stratégie a mieux marché avant. + """ + primary = None + fallbacks = [] + + target_text = action.anchor_hint or action.target + learned_method = learned.get(target_text, "") + + # Stratégie OCR — le texte visible est la meilleure ancre + if target_text: + ocr_strategy = ResolutionStrategy( + method="ocr", + target_text=target_text, + threshold=0.8, + ) + # Si le learning dit que l'OCR marche pour cette cible, c'est la primaire + if not learned_method or learned_method in ("ocr", "som_text_match", "hybrid_text_direct"): + primary = ocr_strategy + else: + fallbacks.append(ocr_strategy) + + # Stratégie template — le crop visuel de l'enregistrement + if action.anchor_hint: + template_strategy = ResolutionStrategy( + method="template", + target_text=action.anchor_hint, + threshold=0.85, + ) + if learned_method in ("anchor_template", "template_matching"): + primary = template_strategy + else: + fallbacks.append(template_strategy) + + # Stratégie VLM — exception handler (dernier recours) + vlm_description = action.target or step.intent + vlm_strategy = ResolutionStrategy( + method="vlm", + vlm_description=vlm_description, + threshold=0.6, + ) + fallbacks.append(vlm_strategy) + + # Si aucune primaire trouvée, utiliser le VLM + if primary is None: + if fallbacks: + primary = fallbacks.pop(0) + else: + primary = vlm_strategy + + return primary, fallbacks + + def _load_learned_strategies(self) -> Dict[str, str]: + """Charger les stratégies apprises (ReplayLearner).""" + try: + from agent_v0.server_v1.replay_learner import ReplayLearner + learner = ReplayLearner(learning_dir=self._learning_dir) + # Construire un mapping target → best_method depuis l'historique + strategies = {} + for outcome in learner._recent: + if outcome.success and outcome.resolution_method and outcome.target_description: + strategies[outcome.target_description] = outcome.resolution_method + return strategies + except Exception: + return {} diff --git a/core/workflow/execution_plan.py b/core/workflow/execution_plan.py new file mode 100644 index 000000000..038a8ff46 --- /dev/null +++ b/core/workflow/execution_plan.py @@ -0,0 +1,251 @@ +# core/workflow/execution_plan.py +""" +ExecutionPlan — Plan d'exécution strict, borné et versionné. + +C'est ce que le runtime exécute. Pas d'improvisation — tout est pré-compilé : +- chaque nœud a une stratégie de résolution primaire + fallbacks +- chaque nœud a un timeout, un retry policy, une condition de succès +- le VLM n'intervient qu'en exception handler (pas en chemin principal) + +Le runtime ne fait que : exécuter → observer → vérifier → suite ou fallback. + +Cycle : WorkflowIR → ExecutionCompiler → ExecutionPlan → Runtime +""" + +import json +import logging +import time +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +@dataclass +class ResolutionStrategy: + """Stratégie de résolution visuelle pour un élément UI. + + Pré-compilée — le runtime n'a pas besoin du VLM pour résoudre. + """ + method: str # "ocr", "template", "position", "vlm" + target_text: str = "" # Texte à chercher (pour OCR) + anchor_b64: str = "" # Crop de référence (pour template matching) + zone: Dict[str, float] = field(default_factory=dict) # Zone de recherche {x_min, y_min, x_max, y_max} + position_hint: str = "" # "en haut à droite", "dans la barre des tâches" + vlm_description: str = "" # Description VLM (dernier recours) + threshold: float = 0.8 # Seuil de confiance + + def to_dict(self) -> Dict[str, Any]: + d = {"method": self.method} + if self.target_text: + d["target_text"] = self.target_text + if self.anchor_b64: + d["anchor_b64"] = self.anchor_b64[:50] + "..." # Tronqué pour la lisibilité + if self.zone: + d["zone"] = self.zone + if self.position_hint: + d["position_hint"] = self.position_hint + if self.vlm_description: + d["vlm_description"] = self.vlm_description + d["threshold"] = self.threshold + return d + + @classmethod + def from_dict(cls, d: Dict) -> "ResolutionStrategy": + return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__}) + + +@dataclass +class SuccessCondition: + """Condition de succès d'un nœud — comment vérifier que l'action a marché.""" + method: str = "screen_changed" # "screen_changed", "title_match", "text_visible", "none" + expected_title: str = "" # Titre fenêtre attendu après l'action + expected_text: str = "" # Texte qui doit apparaître + description: str = "" # Description pour le Critic VLM (exception handler) + + def to_dict(self) -> Dict[str, Any]: + d = {"method": self.method} + if self.expected_title: + d["expected_title"] = self.expected_title + if self.expected_text: + d["expected_text"] = self.expected_text + if self.description: + d["description"] = self.description + return d + + @classmethod + def from_dict(cls, d: Dict) -> "SuccessCondition": + return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__}) + + +@dataclass +class ExecutionNode: + """Nœud d'exécution — une action à exécuter avec sa stratégie complète.""" + node_id: str + action_type: str # click, type, key_combo, wait, scroll + intent: str = "" # Intention métier (pour le logging/audit) + + # Résolution visuelle pré-compilée + strategy_primary: Optional[ResolutionStrategy] = None + strategy_fallbacks: List[ResolutionStrategy] = field(default_factory=list) + + # Données de l'action + text: str = "" # Texte à taper + keys: List[str] = field(default_factory=list) + duration_ms: int = 0 + variable_name: str = "" # Si le texte est une variable + + # Bornes d'exécution + timeout_ms: int = 10000 # Timeout pour cette action + max_retries: int = 1 # Nombre de retries autorisés + retry_delay_ms: int = 2000 # Délai entre retries + + # Vérification + success_condition: Optional[SuccessCondition] = None + + # Recovery + recovery_action: str = "escape" # "escape", "undo", "close", "none" + + # Contexte + step_id: str = "" # Référence vers l'étape WorkflowIR + is_optional: bool = False + + def to_dict(self) -> Dict[str, Any]: + d = { + "node_id": self.node_id, + "action_type": self.action_type, + } + if self.intent: + d["intent"] = self.intent + if self.strategy_primary: + d["strategy_primary"] = self.strategy_primary.to_dict() + if self.strategy_fallbacks: + d["strategy_fallbacks"] = [s.to_dict() for s in self.strategy_fallbacks] + if self.text: + d["text"] = self.text + if self.keys: + d["keys"] = self.keys + if self.duration_ms: + d["duration_ms"] = self.duration_ms + if self.variable_name: + d["variable_name"] = self.variable_name + d["timeout_ms"] = self.timeout_ms + d["max_retries"] = self.max_retries + if self.success_condition: + d["success_condition"] = self.success_condition.to_dict() + d["recovery_action"] = self.recovery_action + if self.is_optional: + d["is_optional"] = True + return d + + @classmethod + def from_dict(cls, d: Dict) -> "ExecutionNode": + primary = ResolutionStrategy.from_dict(d["strategy_primary"]) if d.get("strategy_primary") else None + fallbacks = [ResolutionStrategy.from_dict(f) for f in d.get("strategy_fallbacks", [])] + success = SuccessCondition.from_dict(d["success_condition"]) if d.get("success_condition") else None + return cls( + node_id=d["node_id"], + action_type=d["action_type"], + intent=d.get("intent", ""), + strategy_primary=primary, + strategy_fallbacks=fallbacks, + text=d.get("text", ""), + keys=d.get("keys", []), + duration_ms=d.get("duration_ms", 0), + variable_name=d.get("variable_name", ""), + timeout_ms=d.get("timeout_ms", 10000), + max_retries=d.get("max_retries", 1), + retry_delay_ms=d.get("retry_delay_ms", 2000), + success_condition=success, + recovery_action=d.get("recovery_action", "escape"), + step_id=d.get("step_id", ""), + is_optional=d.get("is_optional", False), + ) + + +@dataclass +class ExecutionPlan: + """Plan d'exécution versionné — ce que le runtime exécute.""" + plan_id: str + workflow_id: str # Référence vers le WorkflowIR source + version: int = 1 + created_at: float = 0.0 + + # Nœuds d'exécution (séquence ordonnée) + nodes: List[ExecutionNode] = field(default_factory=list) + + # Variables à substituer avant exécution + variables: Dict[str, str] = field(default_factory=dict) + + # Configuration globale + domain: str = "generic" + target_machine: str = "" # Machine cible + target_resolution: str = "" # "1280x800", "1920x1080" + + # Métriques de compilation + total_nodes: int = 0 + nodes_with_ocr: int = 0 # Résolution OCR (rapide, précis) + nodes_with_template: int = 0 # Résolution template (rapide) + nodes_with_vlm: int = 0 # Résolution VLM (lent, dernier recours) + estimated_duration_s: float = 0.0 + + def to_dict(self) -> Dict[str, Any]: + return { + "plan_id": self.plan_id, + "workflow_id": self.workflow_id, + "version": self.version, + "created_at": self.created_at, + "domain": self.domain, + "target_machine": self.target_machine, + "target_resolution": self.target_resolution, + "variables": self.variables, + "nodes": [n.to_dict() for n in self.nodes], + "stats": { + "total_nodes": self.total_nodes, + "nodes_with_ocr": self.nodes_with_ocr, + "nodes_with_template": self.nodes_with_template, + "nodes_with_vlm": self.nodes_with_vlm, + "estimated_duration_s": round(self.estimated_duration_s, 1), + }, + } + + def to_json(self, indent: int = 2) -> str: + return json.dumps(self.to_dict(), ensure_ascii=False, indent=indent) + + @classmethod + def from_dict(cls, d: Dict) -> "ExecutionPlan": + nodes = [ExecutionNode.from_dict(n) for n in d.get("nodes", [])] + stats = d.get("stats", {}) + return cls( + plan_id=d["plan_id"], + workflow_id=d.get("workflow_id", ""), + version=d.get("version", 1), + created_at=d.get("created_at", 0), + domain=d.get("domain", "generic"), + target_machine=d.get("target_machine", ""), + target_resolution=d.get("target_resolution", ""), + variables=d.get("variables", {}), + nodes=nodes, + total_nodes=stats.get("total_nodes", len(nodes)), + nodes_with_ocr=stats.get("nodes_with_ocr", 0), + nodes_with_template=stats.get("nodes_with_template", 0), + nodes_with_vlm=stats.get("nodes_with_vlm", 0), + estimated_duration_s=stats.get("estimated_duration_s", 0), + ) + + @classmethod + def from_json(cls, json_str: str) -> "ExecutionPlan": + return cls.from_dict(json.loads(json_str)) + + def save(self, directory: str) -> Path: + dir_path = Path(directory) + dir_path.mkdir(parents=True, exist_ok=True) + file_path = dir_path / f"{self.plan_id}.json" + file_path.write_text(self.to_json(), encoding="utf-8") + return file_path + + @classmethod + def load(cls, file_path: str) -> "ExecutionPlan": + return cls.from_json(Path(file_path).read_text(encoding="utf-8")) diff --git a/tests/unit/test_execution_compiler.py b/tests/unit/test_execution_compiler.py new file mode 100644 index 000000000..80c8d81b8 --- /dev/null +++ b/tests/unit/test_execution_compiler.py @@ -0,0 +1,264 @@ +""" +Tests de l'ExecutionCompiler et de l'ExecutionPlan. + +Vérifie que : +- Le compilateur produit un plan déterministe depuis un WorkflowIR +- Les stratégies de résolution sont correctement compilées (OCR > template > VLM) +- Les timeouts, retries et recovery sont définis +- Le plan est sérialisable et versionné +""" + +import json +import shutil +import sys +import tempfile +from pathlib import Path + +import pytest + +_ROOT = str(Path(__file__).resolve().parents[2]) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +from core.workflow.workflow_ir import WorkflowIR +from core.workflow.execution_plan import ExecutionPlan, ExecutionNode, ResolutionStrategy, SuccessCondition +from core.workflow.execution_compiler import ExecutionCompiler + + +# ========================================================================= +# ExecutionPlan — format et sérialisation +# ========================================================================= + + +class TestExecutionPlan: + + def test_serialisation_roundtrip(self): + plan = ExecutionPlan( + plan_id="plan_test", + workflow_id="wf_123", + version=1, + ) + plan.nodes.append(ExecutionNode( + node_id="n1", + action_type="click", + intent="Cliquer sur Enregistrer", + strategy_primary=ResolutionStrategy(method="ocr", target_text="Enregistrer"), + strategy_fallbacks=[ResolutionStrategy(method="vlm", vlm_description="bouton Enregistrer")], + success_condition=SuccessCondition(method="title_match", expected_title="Fichier sauvegardé"), + )) + + json_str = plan.to_json() + plan2 = ExecutionPlan.from_json(json_str) + + assert plan2.plan_id == "plan_test" + assert len(plan2.nodes) == 1 + assert plan2.nodes[0].strategy_primary.method == "ocr" + assert len(plan2.nodes[0].strategy_fallbacks) == 1 + assert plan2.nodes[0].success_condition.method == "title_match" + + def test_save_load(self): + tmpdir = tempfile.mkdtemp() + try: + plan = ExecutionPlan(plan_id="plan_save", workflow_id="wf_1") + plan.nodes.append(ExecutionNode(node_id="n1", action_type="click")) + path = plan.save(tmpdir) + + plan2 = ExecutionPlan.load(str(path)) + assert plan2.plan_id == "plan_save" + assert len(plan2.nodes) == 1 + finally: + shutil.rmtree(tmpdir) + + def test_node_avec_variable(self): + node = ExecutionNode( + node_id="n1", + action_type="type", + text="{patient}", + variable_name="patient", + ) + d = node.to_dict() + assert d["variable_name"] == "patient" + assert d["text"] == "{patient}" + + def test_node_timeout_et_retry(self): + node = ExecutionNode( + node_id="n1", + action_type="click", + timeout_ms=5000, + max_retries=3, + recovery_action="undo", + ) + assert node.timeout_ms == 5000 + assert node.max_retries == 3 + assert node.recovery_action == "undo" + + +# ========================================================================= +# ExecutionCompiler — compilation WorkflowIR → ExecutionPlan +# ========================================================================= + + +class TestExecutionCompiler: + + def _make_ir(self): + """Créer un WorkflowIR de test.""" + ir = WorkflowIR.new("Test workflow", domain="generic") + ir.add_step( + "Ouvrir le fichier", + actions=[ + {"type": "click", "target": "bouton Ouvrir", "anchor_hint": "Ouvrir"}, + {"type": "wait", "duration_ms": 2000}, + ], + precondition="L'application est ouverte", + postcondition="La fenêtre Ouvrir est affichée", + ) + ir.add_step( + "Saisir le nom", + actions=[ + {"type": "type", "text": "{nom_fichier}", "variable": True}, + {"type": "key_combo", "keys": ["enter"]}, + ], + ) + ir.add_variable("nom_fichier", description="Nom du fichier", default="rapport.pdf") + return ir + + def test_compilation_basique(self): + compiler = ExecutionCompiler() + ir = self._make_ir() + plan = compiler.compile(ir) + + assert plan.workflow_id == ir.workflow_id + assert plan.total_nodes == 4 # click + wait + type + key_combo + assert plan.domain == "generic" + + def test_click_a_strategie_resolution(self): + """Un clic doit avoir une stratégie primaire et des fallbacks.""" + compiler = ExecutionCompiler() + ir = self._make_ir() + plan = compiler.compile(ir) + + click_nodes = [n for n in plan.nodes if n.action_type == "click"] + assert len(click_nodes) == 1 + + click = click_nodes[0] + assert click.strategy_primary is not None + assert click.strategy_primary.method in ("ocr", "template", "vlm") + assert len(click.strategy_fallbacks) >= 1 + + def test_ocr_est_prioritaire(self): + """Quand du texte est disponible, OCR est la stratégie primaire.""" + compiler = ExecutionCompiler() + ir = self._make_ir() + plan = compiler.compile(ir) + + click = [n for n in plan.nodes if n.action_type == "click"][0] + assert click.strategy_primary.method == "ocr" + assert click.strategy_primary.target_text == "Ouvrir" + + def test_vlm_est_fallback(self): + """Le VLM est toujours en dernier fallback (exception handler).""" + compiler = ExecutionCompiler() + ir = self._make_ir() + plan = compiler.compile(ir) + + click = [n for n in plan.nodes if n.action_type == "click"][0] + vlm_fallbacks = [f for f in click.strategy_fallbacks if f.method == "vlm"] + assert len(vlm_fallbacks) >= 1 + + def test_type_a_variable(self): + """Une action type avec variable a le bon variable_name.""" + compiler = ExecutionCompiler() + ir = self._make_ir() + plan = compiler.compile(ir) + + type_nodes = [n for n in plan.nodes if n.action_type == "type"] + assert len(type_nodes) == 1 + assert type_nodes[0].variable_name == "nom_fichier" + + def test_wait_a_duration(self): + """Un wait a la bonne durée.""" + compiler = ExecutionCompiler() + ir = self._make_ir() + plan = compiler.compile(ir) + + wait_nodes = [n for n in plan.nodes if n.action_type == "wait"] + assert len(wait_nodes) == 1 + assert wait_nodes[0].duration_ms == 2000 + + def test_click_a_recovery(self): + """Un clic a une politique de recovery.""" + compiler = ExecutionCompiler() + ir = self._make_ir() + plan = compiler.compile(ir) + + click = [n for n in plan.nodes if n.action_type == "click"][0] + assert click.recovery_action in ("escape", "undo", "close", "none") + assert click.max_retries >= 1 + + def test_postcondition_devient_success_condition(self): + """La postcondition du WorkflowIR devient la condition de succès du nœud.""" + compiler = ExecutionCompiler() + ir = self._make_ir() + plan = compiler.compile(ir) + + click = [n for n in plan.nodes if n.action_type == "click"][0] + assert click.success_condition is not None + assert "Ouvrir" in click.success_condition.description + + def test_statistiques_compilation(self): + """Les statistiques de compilation sont correctes.""" + compiler = ExecutionCompiler() + ir = self._make_ir() + plan = compiler.compile(ir) + + assert plan.total_nodes == 4 + assert plan.nodes_with_ocr >= 1 + assert plan.estimated_duration_s > 0 + + def test_variables_dans_le_plan(self): + """Les variables du WorkflowIR sont dans le plan avec leurs valeurs par défaut.""" + compiler = ExecutionCompiler() + ir = self._make_ir() + plan = compiler.compile(ir) + + assert "nom_fichier" in plan.variables + assert plan.variables["nom_fichier"] == "rapport.pdf" + + def test_params_override_defaults(self): + """Les params passés au compile écrasent les valeurs par défaut.""" + compiler = ExecutionCompiler() + ir = self._make_ir() + plan = compiler.compile(ir, params={"nom_fichier": "facture_mars.pdf"}) + + assert plan.variables["nom_fichier"] == "facture_mars.pdf" + + def test_plan_json_roundtrip(self): + """Compiler → JSON → recharger → même plan.""" + compiler = ExecutionCompiler() + ir = self._make_ir() + plan = compiler.compile(ir) + + json_str = plan.to_json() + plan2 = ExecutionPlan.from_json(json_str) + + assert plan2.total_nodes == plan.total_nodes + assert plan2.workflow_id == plan.workflow_id + assert len(plan2.nodes) == len(plan.nodes) + + def test_compilation_workflow_vide(self): + """Un workflow vide produit un plan vide.""" + compiler = ExecutionCompiler() + ir = WorkflowIR.new("Vide") + plan = compiler.compile(ir) + + assert plan.total_nodes == 0 + assert plan.nodes == [] + + def test_plusieurs_domaines(self): + """Le compilateur fonctionne pour différents domaines.""" + compiler = ExecutionCompiler() + for domain in ["tim_codage", "comptabilite", "rh_paie", "generic"]: + ir = WorkflowIR.new("Test", domain=domain) + ir.add_step("Action", actions=[{"type": "click", "target": "bouton"}]) + plan = compiler.compile(ir) + assert plan.domain == domain