feat: ExecutionCompiler — compile WorkflowIR en plan d'exécution borné

Pièce maîtresse de l'architecture V4 :
- ExecutionPlan : nœuds avec stratégies de résolution pré-compilées
- ExecutionCompiler : WorkflowIR → ExecutionPlan déterministe
- Résolution : OCR (primaire, 100ms) > template > VLM (exception handler)
- Chaque nœud : timeout, max_retries, recovery, condition de succès
- Variables substituables, versionné, sérialisable JSON
- 18 tests (compilation, stratégies, fallbacks, variables, roundtrip)

"Le LLM compile. Le runtime exécute."

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-09 22:21:40 +02:00
parent cc673755f7
commit bffcfb2db3
3 changed files with 795 additions and 0 deletions

View File

@@ -0,0 +1,280 @@
# core/workflow/execution_compiler.py
"""
ExecutionCompiler — Compile un WorkflowIR en ExecutionPlan.
Pièce maîtresse de l'architecture V4.
"Le LLM prépare et compile. Le runtime exécute."
Le compilateur :
1. Prend chaque étape du WorkflowIR
2. Compile une stratégie de résolution pour chaque action (OCR > template > VLM)
3. Définit les timeouts, retries, fallbacks et recovery
4. Produit un ExecutionPlan déterministe et borné
L'objectif : zéro VLM au runtime pour les cas normaux.
Le VLM est un exception handler, pas le chemin principal.
Le compilateur utilise :
- Les données de l'enregistrement (crops, textes OCR) pour pré-compiler
- L'historique d'apprentissage (ReplayLearner) pour choisir la meilleure stratégie
- Le contexte métier (DomainContext) pour adapter les paramètres
"""
import logging
import os
import time
import uuid
from typing import Any, Dict, List, Optional
from .workflow_ir import WorkflowIR, Step, Action
from .execution_plan import (
ExecutionPlan, ExecutionNode, ResolutionStrategy, SuccessCondition,
)
logger = logging.getLogger(__name__)
# Temps estimé par type d'action (ms)
_ACTION_TIME_ESTIMATES = {
"click": 200, # OCR lookup + clic
"type": 500, # Frappe char-by-char
"key_combo": 100,
"wait": 0, # Le duration_ms est dans l'action
"scroll": 200,
}
class ExecutionCompiler:
"""Compile un WorkflowIR en ExecutionPlan.
Usage :
compiler = ExecutionCompiler()
plan = compiler.compile(workflow_ir, target_machine="VM_Win11")
plan.save("data/plans/")
"""
def __init__(self, learning_dir: str = ""):
self._learning_dir = learning_dir or "data/learning/replay_results"
def compile(
self,
ir: WorkflowIR,
target_machine: str = "",
target_resolution: str = "1280x800",
params: Optional[Dict[str, str]] = None,
) -> ExecutionPlan:
"""Compiler un WorkflowIR en ExecutionPlan.
Args:
ir: Le WorkflowIR à compiler
target_machine: Machine cible (pour adapter les stratégies)
target_resolution: Résolution de la machine cible
params: Variables à substituer
"""
t_start = time.time()
plan = ExecutionPlan(
plan_id=f"plan_{uuid.uuid4().hex[:8]}",
workflow_id=ir.workflow_id,
version=ir.version,
created_at=time.time(),
domain=ir.domain,
target_machine=target_machine,
target_resolution=target_resolution,
variables=params or {v.name: v.default for v in ir.variables},
)
# Consulter l'historique d'apprentissage
learned_strategies = self._load_learned_strategies()
# Compiler chaque étape
for step in ir.steps:
nodes = self._compile_step(step, ir, learned_strategies)
plan.nodes.extend(nodes)
# Statistiques de compilation
plan.total_nodes = len(plan.nodes)
plan.nodes_with_ocr = sum(
1 for n in plan.nodes
if n.strategy_primary and n.strategy_primary.method == "ocr"
)
plan.nodes_with_template = sum(
1 for n in plan.nodes
if n.strategy_primary and n.strategy_primary.method == "template"
)
plan.nodes_with_vlm = sum(
1 for n in plan.nodes
if n.strategy_primary and n.strategy_primary.method == "vlm"
)
plan.estimated_duration_s = sum(
_ACTION_TIME_ESTIMATES.get(n.action_type, 200) + n.duration_ms
for n in plan.nodes
) / 1000.0
elapsed = time.time() - t_start
logger.info(
f"Compilation: {plan.total_nodes} nœuds en {elapsed:.1f}s — "
f"OCR={plan.nodes_with_ocr}, template={plan.nodes_with_template}, "
f"VLM={plan.nodes_with_vlm} (exception handler)"
)
return plan
def _compile_step(
self,
step: Step,
ir: WorkflowIR,
learned: Dict[str, str],
) -> List[ExecutionNode]:
"""Compiler une étape en nœuds d'exécution."""
nodes = []
for i, action in enumerate(step.actions):
node = self._compile_action(
action=action,
step=step,
action_index=i,
ir=ir,
learned=learned,
)
nodes.append(node)
return nodes
def _compile_action(
self,
action: Action,
step: Step,
action_index: int,
ir: WorkflowIR,
learned: Dict[str, str],
) -> ExecutionNode:
"""Compiler une action en nœud d'exécution avec stratégie de résolution."""
node = ExecutionNode(
node_id=f"n_{step.step_id}_{action_index}",
action_type=action.type,
intent=step.intent,
step_id=step.step_id,
is_optional=step.is_optional,
)
if action.type == "click":
# Compiler les stratégies de résolution pour ce clic
node.strategy_primary, node.strategy_fallbacks = self._compile_click_resolution(
action, step, learned,
)
node.timeout_ms = 10000
node.max_retries = 2
node.recovery_action = "escape"
# Condition de succès basée sur la postcondition
if step.postcondition:
node.success_condition = SuccessCondition(
method="screen_changed",
description=step.postcondition,
)
elif action.type == "type":
node.text = action.text
node.variable_name = action.text.strip("{}") if action.variable else ""
node.timeout_ms = 5000
node.max_retries = 0 # Pas de retry sur la frappe
node.recovery_action = "undo"
elif action.type == "key_combo":
node.keys = action.keys
node.timeout_ms = 3000
node.max_retries = 0
node.recovery_action = "undo"
elif action.type == "wait":
node.duration_ms = action.duration_ms or 1000
node.timeout_ms = action.duration_ms + 2000
node.max_retries = 0
node.recovery_action = "none"
elif action.type == "scroll":
node.timeout_ms = 3000
node.max_retries = 0
node.recovery_action = "none"
return node
def _compile_click_resolution(
self,
action: Action,
step: Step,
learned: Dict[str, str],
) -> tuple:
"""Compiler les stratégies de résolution pour un clic.
Ordre de priorité :
1. OCR exact (si texte connu) — 100ms, pixel-perfect
2. Template matching (si crop disponible) — 10ms, même interface
3. Position relative (si hint disponible) — instantané, fragile
4. VLM (dernier recours) — 2-5s, exception handler
Le learning peut réordonner si une stratégie a mieux marché avant.
"""
primary = None
fallbacks = []
target_text = action.anchor_hint or action.target
learned_method = learned.get(target_text, "")
# Stratégie OCR — le texte visible est la meilleure ancre
if target_text:
ocr_strategy = ResolutionStrategy(
method="ocr",
target_text=target_text,
threshold=0.8,
)
# Si le learning dit que l'OCR marche pour cette cible, c'est la primaire
if not learned_method or learned_method in ("ocr", "som_text_match", "hybrid_text_direct"):
primary = ocr_strategy
else:
fallbacks.append(ocr_strategy)
# Stratégie template — le crop visuel de l'enregistrement
if action.anchor_hint:
template_strategy = ResolutionStrategy(
method="template",
target_text=action.anchor_hint,
threshold=0.85,
)
if learned_method in ("anchor_template", "template_matching"):
primary = template_strategy
else:
fallbacks.append(template_strategy)
# Stratégie VLM — exception handler (dernier recours)
vlm_description = action.target or step.intent
vlm_strategy = ResolutionStrategy(
method="vlm",
vlm_description=vlm_description,
threshold=0.6,
)
fallbacks.append(vlm_strategy)
# Si aucune primaire trouvée, utiliser le VLM
if primary is None:
if fallbacks:
primary = fallbacks.pop(0)
else:
primary = vlm_strategy
return primary, fallbacks
def _load_learned_strategies(self) -> Dict[str, str]:
"""Charger les stratégies apprises (ReplayLearner)."""
try:
from agent_v0.server_v1.replay_learner import ReplayLearner
learner = ReplayLearner(learning_dir=self._learning_dir)
# Construire un mapping target → best_method depuis l'historique
strategies = {}
for outcome in learner._recent:
if outcome.success and outcome.resolution_method and outcome.target_description:
strategies[outcome.target_description] = outcome.resolution_method
return strategies
except Exception:
return {}

View File

@@ -0,0 +1,251 @@
# core/workflow/execution_plan.py
"""
ExecutionPlan — Plan d'exécution strict, borné et versionné.
C'est ce que le runtime exécute. Pas d'improvisation — tout est pré-compilé :
- chaque nœud a une stratégie de résolution primaire + fallbacks
- chaque nœud a un timeout, un retry policy, une condition de succès
- le VLM n'intervient qu'en exception handler (pas en chemin principal)
Le runtime ne fait que : exécuter → observer → vérifier → suite ou fallback.
Cycle : WorkflowIR → ExecutionCompiler → ExecutionPlan → Runtime
"""
import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
@dataclass
class ResolutionStrategy:
"""Stratégie de résolution visuelle pour un élément UI.
Pré-compilée — le runtime n'a pas besoin du VLM pour résoudre.
"""
method: str # "ocr", "template", "position", "vlm"
target_text: str = "" # Texte à chercher (pour OCR)
anchor_b64: str = "" # Crop de référence (pour template matching)
zone: Dict[str, float] = field(default_factory=dict) # Zone de recherche {x_min, y_min, x_max, y_max}
position_hint: str = "" # "en haut à droite", "dans la barre des tâches"
vlm_description: str = "" # Description VLM (dernier recours)
threshold: float = 0.8 # Seuil de confiance
def to_dict(self) -> Dict[str, Any]:
d = {"method": self.method}
if self.target_text:
d["target_text"] = self.target_text
if self.anchor_b64:
d["anchor_b64"] = self.anchor_b64[:50] + "..." # Tronqué pour la lisibilité
if self.zone:
d["zone"] = self.zone
if self.position_hint:
d["position_hint"] = self.position_hint
if self.vlm_description:
d["vlm_description"] = self.vlm_description
d["threshold"] = self.threshold
return d
@classmethod
def from_dict(cls, d: Dict) -> "ResolutionStrategy":
return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__})
@dataclass
class SuccessCondition:
"""Condition de succès d'un nœud — comment vérifier que l'action a marché."""
method: str = "screen_changed" # "screen_changed", "title_match", "text_visible", "none"
expected_title: str = "" # Titre fenêtre attendu après l'action
expected_text: str = "" # Texte qui doit apparaître
description: str = "" # Description pour le Critic VLM (exception handler)
def to_dict(self) -> Dict[str, Any]:
d = {"method": self.method}
if self.expected_title:
d["expected_title"] = self.expected_title
if self.expected_text:
d["expected_text"] = self.expected_text
if self.description:
d["description"] = self.description
return d
@classmethod
def from_dict(cls, d: Dict) -> "SuccessCondition":
return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__})
@dataclass
class ExecutionNode:
"""Nœud d'exécution — une action à exécuter avec sa stratégie complète."""
node_id: str
action_type: str # click, type, key_combo, wait, scroll
intent: str = "" # Intention métier (pour le logging/audit)
# Résolution visuelle pré-compilée
strategy_primary: Optional[ResolutionStrategy] = None
strategy_fallbacks: List[ResolutionStrategy] = field(default_factory=list)
# Données de l'action
text: str = "" # Texte à taper
keys: List[str] = field(default_factory=list)
duration_ms: int = 0
variable_name: str = "" # Si le texte est une variable
# Bornes d'exécution
timeout_ms: int = 10000 # Timeout pour cette action
max_retries: int = 1 # Nombre de retries autorisés
retry_delay_ms: int = 2000 # Délai entre retries
# Vérification
success_condition: Optional[SuccessCondition] = None
# Recovery
recovery_action: str = "escape" # "escape", "undo", "close", "none"
# Contexte
step_id: str = "" # Référence vers l'étape WorkflowIR
is_optional: bool = False
def to_dict(self) -> Dict[str, Any]:
d = {
"node_id": self.node_id,
"action_type": self.action_type,
}
if self.intent:
d["intent"] = self.intent
if self.strategy_primary:
d["strategy_primary"] = self.strategy_primary.to_dict()
if self.strategy_fallbacks:
d["strategy_fallbacks"] = [s.to_dict() for s in self.strategy_fallbacks]
if self.text:
d["text"] = self.text
if self.keys:
d["keys"] = self.keys
if self.duration_ms:
d["duration_ms"] = self.duration_ms
if self.variable_name:
d["variable_name"] = self.variable_name
d["timeout_ms"] = self.timeout_ms
d["max_retries"] = self.max_retries
if self.success_condition:
d["success_condition"] = self.success_condition.to_dict()
d["recovery_action"] = self.recovery_action
if self.is_optional:
d["is_optional"] = True
return d
@classmethod
def from_dict(cls, d: Dict) -> "ExecutionNode":
primary = ResolutionStrategy.from_dict(d["strategy_primary"]) if d.get("strategy_primary") else None
fallbacks = [ResolutionStrategy.from_dict(f) for f in d.get("strategy_fallbacks", [])]
success = SuccessCondition.from_dict(d["success_condition"]) if d.get("success_condition") else None
return cls(
node_id=d["node_id"],
action_type=d["action_type"],
intent=d.get("intent", ""),
strategy_primary=primary,
strategy_fallbacks=fallbacks,
text=d.get("text", ""),
keys=d.get("keys", []),
duration_ms=d.get("duration_ms", 0),
variable_name=d.get("variable_name", ""),
timeout_ms=d.get("timeout_ms", 10000),
max_retries=d.get("max_retries", 1),
retry_delay_ms=d.get("retry_delay_ms", 2000),
success_condition=success,
recovery_action=d.get("recovery_action", "escape"),
step_id=d.get("step_id", ""),
is_optional=d.get("is_optional", False),
)
@dataclass
class ExecutionPlan:
"""Plan d'exécution versionné — ce que le runtime exécute."""
plan_id: str
workflow_id: str # Référence vers le WorkflowIR source
version: int = 1
created_at: float = 0.0
# Nœuds d'exécution (séquence ordonnée)
nodes: List[ExecutionNode] = field(default_factory=list)
# Variables à substituer avant exécution
variables: Dict[str, str] = field(default_factory=dict)
# Configuration globale
domain: str = "generic"
target_machine: str = "" # Machine cible
target_resolution: str = "" # "1280x800", "1920x1080"
# Métriques de compilation
total_nodes: int = 0
nodes_with_ocr: int = 0 # Résolution OCR (rapide, précis)
nodes_with_template: int = 0 # Résolution template (rapide)
nodes_with_vlm: int = 0 # Résolution VLM (lent, dernier recours)
estimated_duration_s: float = 0.0
def to_dict(self) -> Dict[str, Any]:
return {
"plan_id": self.plan_id,
"workflow_id": self.workflow_id,
"version": self.version,
"created_at": self.created_at,
"domain": self.domain,
"target_machine": self.target_machine,
"target_resolution": self.target_resolution,
"variables": self.variables,
"nodes": [n.to_dict() for n in self.nodes],
"stats": {
"total_nodes": self.total_nodes,
"nodes_with_ocr": self.nodes_with_ocr,
"nodes_with_template": self.nodes_with_template,
"nodes_with_vlm": self.nodes_with_vlm,
"estimated_duration_s": round(self.estimated_duration_s, 1),
},
}
def to_json(self, indent: int = 2) -> str:
return json.dumps(self.to_dict(), ensure_ascii=False, indent=indent)
@classmethod
def from_dict(cls, d: Dict) -> "ExecutionPlan":
nodes = [ExecutionNode.from_dict(n) for n in d.get("nodes", [])]
stats = d.get("stats", {})
return cls(
plan_id=d["plan_id"],
workflow_id=d.get("workflow_id", ""),
version=d.get("version", 1),
created_at=d.get("created_at", 0),
domain=d.get("domain", "generic"),
target_machine=d.get("target_machine", ""),
target_resolution=d.get("target_resolution", ""),
variables=d.get("variables", {}),
nodes=nodes,
total_nodes=stats.get("total_nodes", len(nodes)),
nodes_with_ocr=stats.get("nodes_with_ocr", 0),
nodes_with_template=stats.get("nodes_with_template", 0),
nodes_with_vlm=stats.get("nodes_with_vlm", 0),
estimated_duration_s=stats.get("estimated_duration_s", 0),
)
@classmethod
def from_json(cls, json_str: str) -> "ExecutionPlan":
return cls.from_dict(json.loads(json_str))
def save(self, directory: str) -> Path:
dir_path = Path(directory)
dir_path.mkdir(parents=True, exist_ok=True)
file_path = dir_path / f"{self.plan_id}.json"
file_path.write_text(self.to_json(), encoding="utf-8")
return file_path
@classmethod
def load(cls, file_path: str) -> "ExecutionPlan":
return cls.from_json(Path(file_path).read_text(encoding="utf-8"))

View File

@@ -0,0 +1,264 @@
"""
Tests de l'ExecutionCompiler et de l'ExecutionPlan.
Vérifie que :
- Le compilateur produit un plan déterministe depuis un WorkflowIR
- Les stratégies de résolution sont correctement compilées (OCR > template > VLM)
- Les timeouts, retries et recovery sont définis
- Le plan est sérialisable et versionné
"""
import json
import shutil
import sys
import tempfile
from pathlib import Path
import pytest
_ROOT = str(Path(__file__).resolve().parents[2])
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
from core.workflow.workflow_ir import WorkflowIR
from core.workflow.execution_plan import ExecutionPlan, ExecutionNode, ResolutionStrategy, SuccessCondition
from core.workflow.execution_compiler import ExecutionCompiler
# =========================================================================
# ExecutionPlan — format et sérialisation
# =========================================================================
class TestExecutionPlan:
def test_serialisation_roundtrip(self):
plan = ExecutionPlan(
plan_id="plan_test",
workflow_id="wf_123",
version=1,
)
plan.nodes.append(ExecutionNode(
node_id="n1",
action_type="click",
intent="Cliquer sur Enregistrer",
strategy_primary=ResolutionStrategy(method="ocr", target_text="Enregistrer"),
strategy_fallbacks=[ResolutionStrategy(method="vlm", vlm_description="bouton Enregistrer")],
success_condition=SuccessCondition(method="title_match", expected_title="Fichier sauvegardé"),
))
json_str = plan.to_json()
plan2 = ExecutionPlan.from_json(json_str)
assert plan2.plan_id == "plan_test"
assert len(plan2.nodes) == 1
assert plan2.nodes[0].strategy_primary.method == "ocr"
assert len(plan2.nodes[0].strategy_fallbacks) == 1
assert plan2.nodes[0].success_condition.method == "title_match"
def test_save_load(self):
tmpdir = tempfile.mkdtemp()
try:
plan = ExecutionPlan(plan_id="plan_save", workflow_id="wf_1")
plan.nodes.append(ExecutionNode(node_id="n1", action_type="click"))
path = plan.save(tmpdir)
plan2 = ExecutionPlan.load(str(path))
assert plan2.plan_id == "plan_save"
assert len(plan2.nodes) == 1
finally:
shutil.rmtree(tmpdir)
def test_node_avec_variable(self):
node = ExecutionNode(
node_id="n1",
action_type="type",
text="{patient}",
variable_name="patient",
)
d = node.to_dict()
assert d["variable_name"] == "patient"
assert d["text"] == "{patient}"
def test_node_timeout_et_retry(self):
node = ExecutionNode(
node_id="n1",
action_type="click",
timeout_ms=5000,
max_retries=3,
recovery_action="undo",
)
assert node.timeout_ms == 5000
assert node.max_retries == 3
assert node.recovery_action == "undo"
# =========================================================================
# ExecutionCompiler — compilation WorkflowIR → ExecutionPlan
# =========================================================================
class TestExecutionCompiler:
def _make_ir(self):
"""Créer un WorkflowIR de test."""
ir = WorkflowIR.new("Test workflow", domain="generic")
ir.add_step(
"Ouvrir le fichier",
actions=[
{"type": "click", "target": "bouton Ouvrir", "anchor_hint": "Ouvrir"},
{"type": "wait", "duration_ms": 2000},
],
precondition="L'application est ouverte",
postcondition="La fenêtre Ouvrir est affichée",
)
ir.add_step(
"Saisir le nom",
actions=[
{"type": "type", "text": "{nom_fichier}", "variable": True},
{"type": "key_combo", "keys": ["enter"]},
],
)
ir.add_variable("nom_fichier", description="Nom du fichier", default="rapport.pdf")
return ir
def test_compilation_basique(self):
compiler = ExecutionCompiler()
ir = self._make_ir()
plan = compiler.compile(ir)
assert plan.workflow_id == ir.workflow_id
assert plan.total_nodes == 4 # click + wait + type + key_combo
assert plan.domain == "generic"
def test_click_a_strategie_resolution(self):
"""Un clic doit avoir une stratégie primaire et des fallbacks."""
compiler = ExecutionCompiler()
ir = self._make_ir()
plan = compiler.compile(ir)
click_nodes = [n for n in plan.nodes if n.action_type == "click"]
assert len(click_nodes) == 1
click = click_nodes[0]
assert click.strategy_primary is not None
assert click.strategy_primary.method in ("ocr", "template", "vlm")
assert len(click.strategy_fallbacks) >= 1
def test_ocr_est_prioritaire(self):
"""Quand du texte est disponible, OCR est la stratégie primaire."""
compiler = ExecutionCompiler()
ir = self._make_ir()
plan = compiler.compile(ir)
click = [n for n in plan.nodes if n.action_type == "click"][0]
assert click.strategy_primary.method == "ocr"
assert click.strategy_primary.target_text == "Ouvrir"
def test_vlm_est_fallback(self):
"""Le VLM est toujours en dernier fallback (exception handler)."""
compiler = ExecutionCompiler()
ir = self._make_ir()
plan = compiler.compile(ir)
click = [n for n in plan.nodes if n.action_type == "click"][0]
vlm_fallbacks = [f for f in click.strategy_fallbacks if f.method == "vlm"]
assert len(vlm_fallbacks) >= 1
def test_type_a_variable(self):
"""Une action type avec variable a le bon variable_name."""
compiler = ExecutionCompiler()
ir = self._make_ir()
plan = compiler.compile(ir)
type_nodes = [n for n in plan.nodes if n.action_type == "type"]
assert len(type_nodes) == 1
assert type_nodes[0].variable_name == "nom_fichier"
def test_wait_a_duration(self):
"""Un wait a la bonne durée."""
compiler = ExecutionCompiler()
ir = self._make_ir()
plan = compiler.compile(ir)
wait_nodes = [n for n in plan.nodes if n.action_type == "wait"]
assert len(wait_nodes) == 1
assert wait_nodes[0].duration_ms == 2000
def test_click_a_recovery(self):
"""Un clic a une politique de recovery."""
compiler = ExecutionCompiler()
ir = self._make_ir()
plan = compiler.compile(ir)
click = [n for n in plan.nodes if n.action_type == "click"][0]
assert click.recovery_action in ("escape", "undo", "close", "none")
assert click.max_retries >= 1
def test_postcondition_devient_success_condition(self):
"""La postcondition du WorkflowIR devient la condition de succès du nœud."""
compiler = ExecutionCompiler()
ir = self._make_ir()
plan = compiler.compile(ir)
click = [n for n in plan.nodes if n.action_type == "click"][0]
assert click.success_condition is not None
assert "Ouvrir" in click.success_condition.description
def test_statistiques_compilation(self):
"""Les statistiques de compilation sont correctes."""
compiler = ExecutionCompiler()
ir = self._make_ir()
plan = compiler.compile(ir)
assert plan.total_nodes == 4
assert plan.nodes_with_ocr >= 1
assert plan.estimated_duration_s > 0
def test_variables_dans_le_plan(self):
"""Les variables du WorkflowIR sont dans le plan avec leurs valeurs par défaut."""
compiler = ExecutionCompiler()
ir = self._make_ir()
plan = compiler.compile(ir)
assert "nom_fichier" in plan.variables
assert plan.variables["nom_fichier"] == "rapport.pdf"
def test_params_override_defaults(self):
"""Les params passés au compile écrasent les valeurs par défaut."""
compiler = ExecutionCompiler()
ir = self._make_ir()
plan = compiler.compile(ir, params={"nom_fichier": "facture_mars.pdf"})
assert plan.variables["nom_fichier"] == "facture_mars.pdf"
def test_plan_json_roundtrip(self):
"""Compiler → JSON → recharger → même plan."""
compiler = ExecutionCompiler()
ir = self._make_ir()
plan = compiler.compile(ir)
json_str = plan.to_json()
plan2 = ExecutionPlan.from_json(json_str)
assert plan2.total_nodes == plan.total_nodes
assert plan2.workflow_id == plan.workflow_id
assert len(plan2.nodes) == len(plan.nodes)
def test_compilation_workflow_vide(self):
"""Un workflow vide produit un plan vide."""
compiler = ExecutionCompiler()
ir = WorkflowIR.new("Vide")
plan = compiler.compile(ir)
assert plan.total_nodes == 0
assert plan.nodes == []
def test_plusieurs_domaines(self):
"""Le compilateur fonctionne pour différents domaines."""
compiler = ExecutionCompiler()
for domain in ["tim_codage", "comptabilite", "rh_paie", "generic"]:
ir = WorkflowIR.new("Test", domain=domain)
ir.add_step("Action", actions=[{"type": "click", "target": "bouton"}])
plan = compiler.compile(ir)
assert plan.domain == domain