feat: WorkflowRunner, matching sémantique et replay distant (P0-4, P0-6, P0-7)

P0-4: WorkflowRunner — orchestrateur de replay intelligent
- Boucle capture → match FAISS → résolution sémantique → exécution
- Mode dry_run, substitution de variables, anti-boucle (max 200 steps)
- Découplé de pyautogui via executor_callback

P0-6: Unification des répertoires workflows
- SemanticMatcher scanne data/workflows/ + data/training/workflows/
- Auto-reload sur changement de répertoire (60s)

P0-7: Matching sémantique via Ollama
- Pré-filtrage Jaccard + re-ranking LLM (qwen2.5:7b)
- Score final : 40% Jaccard + 60% LLM, fallback si Ollama indisponible

Agent Chat: exécution distante via streaming server
- POST http://localhost:5005/api/v1/traces/stream/replay
- Fallback sur exécution locale si serveur indisponible

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Dom
2026-03-14 11:23:33 +01:00
parent de779af5a1
commit 148321dffd
4 changed files with 1615 additions and 144 deletions

View File

@@ -7,6 +7,7 @@ Provides classes for executing workflow actions automatically.
from .action_executor import ActionExecutor
from .target_resolver import TargetResolver, ResolvedTarget
from .error_handler import ErrorHandler, ErrorType, RecoveryStrategy
from .workflow_runner import WorkflowRunner, RunResult, RunStatus, RunnerConfig
# Import tardif pour éviter import circulaire avec pipeline
def _get_execution_loop():
@@ -14,11 +15,15 @@ def _get_execution_loop():
return ExecutionLoop, ExecutionMode, ExecutionState, create_execution_loop
__all__ = [
'ActionExecutor',
'TargetResolver',
'ActionExecutor',
'TargetResolver',
'ResolvedTarget',
'ErrorHandler',
'ErrorType',
'RecoveryStrategy',
'WorkflowRunner',
'RunResult',
'RunStatus',
'RunnerConfig',
# ExecutionLoop accessible via import direct du module
]

View File

@@ -0,0 +1,920 @@
"""
WorkflowRunner — Orchestrateur de replay de workflows appris
Exécute un Workflow du début à la fin en utilisant la compréhension sémantique
de l'UI plutôt que de simples coordonnées X/Y.
Boucle principale :
1. Capturer l'écran
2. Analyser via ScreenAnalyzer → ScreenState
3. Matcher l'état courant contre les noeuds du workflow (CLIP/FAISS)
4. Choisir l'edge sortant et résoudre la cible sémantiquement
5. Exécuter l'action via le callback (local ou distant)
6. Attendre la stabilisation de l'écran
7. Vérifier qu'on a atteint le noeud suivant
8. Boucler jusqu'au noeud END
Le Runner est découplé de l'exécution physique : il utilise un executor_callback
qui peut être pyautogui (local) ou une commande API (distant).
Auteur : Dom, Alice Kiro
Date : 14 mars 2026
"""
import hashlib
import logging
import re
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
import numpy as np
from PIL import Image
from ..models.workflow_graph import Action, Workflow, WorkflowEdge, WorkflowNode
logger = logging.getLogger(__name__)
# =============================================================================
# Résultat d'exécution
# =============================================================================
class RunStatus(str, Enum):
"""Statut final d'une exécution de workflow."""
SUCCESS = "success"
FAILED = "failed"
TIMEOUT = "timeout"
ABORTED = "aborted"
DRY_RUN = "dry_run"
@dataclass
class StepResult:
"""Résultat d'une étape individuelle du workflow."""
node_id: str
edge_id: Optional[str] = None
action_type: Optional[str] = None
action_details: Optional[Dict[str, Any]] = None
match_confidence: float = 0.0
duration_seconds: float = 0.0
success: bool = True
error: Optional[str] = None
retries: int = 0
@dataclass
class RunResult:
"""Résultat complet d'une exécution de workflow."""
success: bool
status: RunStatus = RunStatus.FAILED
nodes_visited: List[str] = field(default_factory=list)
actions_executed: List[Dict[str, Any]] = field(default_factory=list)
errors: List[str] = field(default_factory=list)
duration_seconds: float = 0.0
steps: List[StepResult] = field(default_factory=list)
def to_dict(self) -> Dict[str, Any]:
"""Sérialiser le résultat."""
return {
"success": self.success,
"status": self.status.value,
"nodes_visited": self.nodes_visited,
"actions_executed": self.actions_executed,
"errors": self.errors,
"duration_seconds": round(self.duration_seconds, 3),
"steps_count": len(self.steps),
"steps": [
{
"node_id": s.node_id,
"edge_id": s.edge_id,
"action_type": s.action_type,
"match_confidence": round(s.match_confidence, 4),
"duration_seconds": round(s.duration_seconds, 3),
"success": s.success,
"error": s.error,
"retries": s.retries,
}
for s in self.steps
],
}
# =============================================================================
# Configuration du Runner
# =============================================================================
@dataclass
class RunnerConfig:
"""Configuration de l'orchestrateur de replay."""
# Seuils de matching
min_node_similarity: float = 0.75
# Stabilisation écran
stabilization_frames: int = 3
stabilization_interval: float = 0.3 # secondes entre chaque frame de vérification
stabilization_timeout: float = 10.0 # secondes max pour stabiliser
# Timeouts
node_match_timeout: float = 30.0 # secondes max pour matcher un noeud
action_timeout: float = 15.0 # secondes max pour exécuter une action
global_timeout: float = 300.0 # 5 minutes max pour le workflow entier
# Retries
max_retries_per_action: int = 3
retry_delay: float = 1.0 # secondes entre retries
# Capture
capture_dir: str = "data/runner_captures"
# Sécurité
max_steps: int = 200 # limite anti-boucle infinie
# =============================================================================
# WorkflowRunner
# =============================================================================
class WorkflowRunner:
"""
Orchestre l'exécution complète d'un workflow appris.
Utilise la compréhension sémantique de l'UI (CLIP embeddings + FAISS)
plutôt que de simples coordonnées pour naviguer entre les états.
Args:
workflow: Le workflow à exécuter
screen_analyzer: ScreenAnalyzer pour analyser les captures d'écran
clip_embedder: CLIPEmbedder pour générer les embeddings d'image
faiss_manager: FAISSManager avec les prototypes du workflow indexés
executor_callback: Fonction qui exécute une action physiquement.
Signature : callback(action_dict: dict) -> bool
L'action_dict contient : type, target, parameters, resolved_position
Retourne True si succès, False sinon.
capture_callback: Fonction optionnelle de capture d'écran.
Signature : callback() -> Optional[PIL.Image]
Si None, utilise ScreenCapturer par défaut.
config: Configuration du runner (optionnel)
"""
def __init__(
self,
workflow: Workflow,
screen_analyzer,
clip_embedder,
faiss_manager,
executor_callback: Callable[[Dict[str, Any]], bool],
capture_callback: Optional[Callable[[], Optional[Image.Image]]] = None,
config: Optional[RunnerConfig] = None,
):
self.workflow = workflow
self.screen_analyzer = screen_analyzer
self.clip_embedder = clip_embedder
self.faiss_manager = faiss_manager
self.executor_callback = executor_callback
self.capture_callback = capture_callback
self.config = config or RunnerConfig()
# État interne
self._current_node_id: Optional[str] = None
self._aborted = False
self._last_screen_hash: Optional[str] = None
# Répertoire de captures temporaires
Path(self.config.capture_dir).mkdir(parents=True, exist_ok=True)
# Index des embeddings de noeuds pour matching rapide
# Sera construit lors du premier run si le faiss_manager est vide
self._node_embeddings_indexed = False
# =========================================================================
# API publique
# =========================================================================
def run(self, params: Optional[Dict[str, Any]] = None, dry_run: bool = False) -> RunResult:
"""
Boucle principale de replay.
Args:
params: Paramètres de substitution (ex: {{nom}} → "Dupont")
dry_run: Si True, ne pas exécuter les actions, juste simuler
Returns:
RunResult avec le détail de l'exécution
"""
params = params or {}
start_time = time.time()
result = RunResult(success=False, status=RunStatus.FAILED)
# Validation du workflow
validation_error = self._validate_workflow()
if validation_error:
result.errors.append(validation_error)
result.duration_seconds = time.time() - start_time
return result
# Noeud de départ
start_node_id = self.workflow.entry_nodes[0]
self._current_node_id = start_node_id
result.nodes_visited.append(start_node_id)
logger.info(
f"Démarrage du workflow '{self.workflow.name}' "
f"(ID: {self.workflow.workflow_id}) depuis le noeud {start_node_id} "
f"{'[DRY RUN]' if dry_run else ''}"
)
step_count = 0
try:
while not self._aborted:
elapsed = time.time() - start_time
# Vérification timeout global
if elapsed > self.config.global_timeout:
msg = (
f"Timeout global atteint ({self.config.global_timeout}s). "
f"Dernier noeud : {self._current_node_id}"
)
logger.error(msg)
result.errors.append(msg)
result.status = RunStatus.TIMEOUT
break
# Protection anti-boucle infinie
step_count += 1
if step_count > self.config.max_steps:
msg = f"Limite de {self.config.max_steps} étapes atteinte — abandon"
logger.error(msg)
result.errors.append(msg)
result.status = RunStatus.FAILED
break
# Récupérer le noeud courant
current_node = self.workflow.get_node(self._current_node_id)
if current_node is None:
msg = f"Noeud introuvable : {self._current_node_id}"
logger.error(msg)
result.errors.append(msg)
break
# Vérifier si on a atteint un noeud END
if current_node.is_end or self._current_node_id in self.workflow.end_nodes:
logger.info(f"Noeud END atteint : {self._current_node_id}")
result.success = True
result.status = RunStatus.DRY_RUN if dry_run else RunStatus.SUCCESS
break
# Récupérer les edges sortants
outgoing_edges = self.workflow.get_outgoing_edges(self._current_node_id)
if not outgoing_edges:
# Pas d'edge sortant = fin implicite
logger.info(
f"Aucun edge sortant pour {self._current_node_id} — fin du workflow"
)
result.success = True
result.status = RunStatus.DRY_RUN if dry_run else RunStatus.SUCCESS
break
# Choisir l'edge à exécuter
edge = self._select_edge(outgoing_edges, current_node)
if edge is None:
msg = (
f"Aucun edge exécutable depuis {self._current_node_id} "
f"({len(outgoing_edges)} edges disponibles)"
)
logger.error(msg)
result.errors.append(msg)
break
# Préparer l'action
action = edge.action
action_dict = self._build_action_dict(action, params)
step_start = time.time()
step = StepResult(
node_id=self._current_node_id,
edge_id=edge.edge_id,
action_type=action.type,
action_details=action_dict,
)
if dry_run:
# Mode simulation : ne pas exécuter, juste enregistrer
logger.info(
f"[DRY RUN] Étape {step_count}: {self._current_node_id} "
f"{edge.to_node} via {action.type}"
)
step.success = True
step.duration_seconds = time.time() - step_start
result.steps.append(step)
result.actions_executed.append(action_dict)
# Avancer directement au noeud suivant
self._current_node_id = edge.to_node
result.nodes_visited.append(edge.to_node)
continue
# --- Exécution réelle ---
# Étape 1 : Capturer et vérifier l'état courant
screen_state, screen_image = self._capture_and_analyze()
if screen_state is None:
msg = "Échec de capture/analyse de l'écran"
logger.error(msg)
result.errors.append(msg)
step.success = False
step.error = msg
result.steps.append(step)
break
# Étape 2 : Matcher l'état courant pour confirmer le noeud
matched_node_id, confidence = self._match_current_state(screen_image)
step.match_confidence = confidence
if matched_node_id and matched_node_id != self._current_node_id:
logger.warning(
f"État écran correspond au noeud {matched_node_id} "
f"(attendu: {self._current_node_id}, confiance: {confidence:.3f})"
)
# On continue quand même si la confiance est suffisante
# Le workflow pourrait avoir légèrement dévié
# Étape 3 : Résoudre la cible de l'action
resolved_action = self._resolve_action_target(action, screen_state, action_dict)
# Étape 4 : Exécuter avec retries
action_success = False
for attempt in range(1, self.config.max_retries_per_action + 1):
step.retries = attempt - 1
try:
action_success = self.executor_callback(resolved_action)
if action_success:
logger.info(
f"Action exécutée ({action.type}) sur {self._current_node_id} "
f"{edge.to_node} (tentative {attempt})"
)
break
else:
logger.warning(
f"Action échouée (tentative {attempt}/{self.config.max_retries_per_action})"
)
except Exception as e:
logger.warning(
f"Exception lors de l'exécution (tentative {attempt}): {e}"
)
action_success = False
if attempt < self.config.max_retries_per_action:
time.sleep(self.config.retry_delay)
if not action_success:
msg = (
f"Action échouée après {self.config.max_retries_per_action} tentatives "
f"sur {self._current_node_id}"
)
logger.error(msg)
result.errors.append(msg)
step.success = False
step.error = msg
step.duration_seconds = time.time() - step_start
result.steps.append(step)
break
# Étape 5 : Attendre la stabilisation de l'écran
stabilized = self._wait_for_stabilization(
timeout=self.config.stabilization_timeout
)
if not stabilized:
logger.warning(
"Écran non stabilisé dans le délai imparti — on continue quand même"
)
# Étape 6 : Vérifier qu'on a atteint le noeud suivant
next_node_id = edge.to_node
verified = self._verify_transition(next_node_id)
if not verified:
logger.warning(
f"Transition vers {next_node_id} non confirmée visuellement — "
f"on fait confiance au workflow"
)
# Enregistrer le résultat de l'étape
step.success = True
step.duration_seconds = time.time() - step_start
result.steps.append(step)
result.actions_executed.append(resolved_action)
# Avancer au noeud suivant
self._current_node_id = next_node_id
result.nodes_visited.append(next_node_id)
except Exception as e:
msg = f"Erreur inattendue dans le runner : {e}"
logger.exception(msg)
result.errors.append(msg)
result.duration_seconds = time.time() - start_time
if self._aborted:
result.status = RunStatus.ABORTED
result.errors.append("Exécution interrompue par l'utilisateur")
logger.info(
f"Workflow terminé : {result.status.value} "
f"({len(result.nodes_visited)} noeuds, "
f"{len(result.actions_executed)} actions, "
f"{result.duration_seconds:.1f}s)"
)
return result
def abort(self) -> None:
"""Interrompre l'exécution en cours (thread-safe)."""
logger.info("Demande d'interruption du workflow")
self._aborted = True
# =========================================================================
# Capture et analyse d'écran
# =========================================================================
def _capture_screen(self) -> Optional[Image.Image]:
"""
Capturer l'écran actuel.
Utilise le capture_callback si fourni, sinon crée un ScreenCapturer.
"""
if self.capture_callback:
try:
return self.capture_callback()
except Exception as e:
logger.error(f"Erreur capture callback : {e}")
return None
# Fallback : ScreenCapturer
try:
from ..capture.screen_capturer import ScreenCapturer
capturer = ScreenCapturer()
return capturer.capture_screen()
except Exception as e:
logger.error(f"Erreur ScreenCapturer : {e}")
return None
def _capture_and_analyze(self):
"""
Capturer l'écran et l'analyser.
Returns:
(ScreenState, PIL.Image) ou (None, None)
"""
image = self._capture_screen()
if image is None:
return None, None
try:
# Sauvegarder temporairement pour l'analyse
capture_path = Path(self.config.capture_dir) / f"capture_{int(time.time() * 1000)}.png"
image.save(str(capture_path))
# Analyser via ScreenAnalyzer
screen_state = self.screen_analyzer.analyze(str(capture_path))
return screen_state, image
except Exception as e:
logger.error(f"Erreur analyse écran : {e}")
return None, None
# =========================================================================
# Matching d'état (CLIP + FAISS)
# =========================================================================
def _match_current_state(self, screen_image: Image.Image) -> tuple:
"""
Matcher le screenshot actuel contre les noeuds du workflow.
Utilise l'embedding CLIP de l'image et cherche dans l'index FAISS
les prototypes de noeuds les plus similaires.
Args:
screen_image: Image PIL du screenshot actuel
Returns:
(node_id: str ou None, confidence: float)
"""
try:
# Générer l'embedding CLIP de l'écran actuel
embedding = self.clip_embedder.embed_image(screen_image)
# Chercher dans FAISS les prototypes les plus proches
results = self.faiss_manager.search_similar(
query_vector=embedding,
k=3,
min_similarity=self.config.min_node_similarity,
)
if not results:
logger.debug("Aucun noeud matché au-dessus du seuil de similarité")
return None, 0.0
# Le meilleur résultat
best = results[0]
node_id = best.metadata.get("node_id") if best.metadata else best.embedding_id
confidence = best.similarity
logger.debug(
f"Meilleur match : noeud={node_id}, "
f"similarité={confidence:.4f}"
)
return node_id, confidence
except Exception as e:
logger.error(f"Erreur matching état : {e}")
return None, 0.0
# =========================================================================
# Sélection d'edge
# =========================================================================
def _select_edge(
self,
edges: List[WorkflowEdge],
current_node: WorkflowNode,
) -> Optional[WorkflowEdge]:
"""
Choisir l'edge à exécuter parmi les edges sortants.
Stratégie :
1. S'il n'y a qu'un seul edge → le prendre
2. Sinon, prendre celui avec le meilleur taux de succès
3. Vérifier les pre-conditions de chaque edge
Args:
edges: Liste des edges sortants du noeud courant
current_node: Le noeud courant
Returns:
L'edge sélectionné ou None
"""
if len(edges) == 1:
return edges[0]
# Filtrer les edges dont les pre-conditions sont satisfaites
eligible = []
for edge in edges:
can_exec, reason = edge.can_execute(current_node)
if can_exec:
eligible.append(edge)
else:
logger.debug(f"Edge {edge.edge_id} non éligible : {reason}")
if not eligible:
logger.warning("Aucun edge éligible trouvé")
return None
if len(eligible) == 1:
return eligible[0]
# Trier par taux de succès décroissant
eligible.sort(key=lambda e: e.stats.success_rate, reverse=True)
logger.info(
f"Sélection parmi {len(eligible)} edges — "
f"choisi : {eligible[0].edge_id} "
f"(succès: {eligible[0].stats.success_rate:.0%})"
)
return eligible[0]
# =========================================================================
# Résolution de cible
# =========================================================================
def _resolve_action_target(
self,
action: Action,
screen_state,
action_dict: Dict[str, Any],
) -> Dict[str, Any]:
"""
Résoudre la cible de l'action en utilisant le ScreenState.
Essaye la résolution sémantique (par texte, rôle, similarité visuelle).
En fallback, utilise les coordonnées brutes si disponibles.
Args:
action: L'action du workflow
screen_state: Le ScreenState analysé
action_dict: Le dict d'action déjà construit
Returns:
action_dict enrichi avec resolved_position
"""
target = action.target
resolved = dict(action_dict)
# Stratégie 1 : Résolution par texte
if target.by_text and hasattr(screen_state, "ui_elements"):
for elem in screen_state.ui_elements:
elem_text = getattr(elem, "text", "") or ""
if target.by_text.lower() in elem_text.lower():
bbox = getattr(elem, "bbox", None)
if bbox:
cx, cy = self._bbox_center(bbox)
resolved["resolved_position"] = {"x": cx, "y": cy}
resolved["resolution_method"] = "by_text"
logger.debug(
f"Cible résolue par texte '{target.by_text}' → ({cx}, {cy})"
)
return resolved
# Stratégie 2 : Résolution par rôle
if target.by_role and hasattr(screen_state, "ui_elements"):
candidates = [
elem
for elem in screen_state.ui_elements
if getattr(elem, "role", None) == target.by_role
or getattr(elem, "type", None) == target.by_role
]
if candidates:
# Appliquer la politique de sélection
elem = self._apply_selection_policy(candidates, target)
if elem:
bbox = getattr(elem, "bbox", None)
if bbox:
cx, cy = self._bbox_center(bbox)
resolved["resolved_position"] = {"x": cx, "y": cy}
resolved["resolution_method"] = "by_role"
logger.debug(
f"Cible résolue par rôle '{target.by_role}' → ({cx}, {cy})"
)
return resolved
# Stratégie 3 : Coordonnées brutes (fallback)
if target.by_position:
resolved["resolved_position"] = {
"x": target.by_position[0],
"y": target.by_position[1],
}
resolved["resolution_method"] = "by_position"
logger.debug(
f"Cible résolue par position brute → {target.by_position}"
)
return resolved
# Aucune résolution possible — on laisse le callback gérer
resolved["resolution_method"] = "unresolved"
logger.warning(
f"Cible non résolue pour action {action.type}"
f"le callback devra se débrouiller"
)
return resolved
def _apply_selection_policy(self, candidates: list, target) -> Optional[Any]:
"""
Appliquer la politique de sélection sur les candidats.
Args:
candidates: Liste d'éléments UI candidats
target: TargetSpec avec la politique
Returns:
L'élément sélectionné ou None
"""
policy = getattr(target, "selection_policy", "first")
if policy == "last":
return candidates[-1] if candidates else None
elif policy == "by_position":
# Trier par position (haut-gauche d'abord)
candidates.sort(
key=lambda e: (
getattr(getattr(e, "bbox", None), "y", 0),
getattr(getattr(e, "bbox", None), "x", 0),
)
)
return candidates[0] if candidates else None
else:
# "first" par défaut
return candidates[0] if candidates else None
# =========================================================================
# Stabilisation de l'écran
# =========================================================================
def _wait_for_stabilization(self, timeout: float = 10.0) -> bool:
"""
Attendre que l'écran se stabilise.
L'écran est considéré stable quand le hash de l'image reste identique
sur N captures consécutives.
Args:
timeout: Délai maximum en secondes
Returns:
True si l'écran s'est stabilisé, False si timeout
"""
start_time = time.time()
consecutive_same = 0
last_hash = None
while (time.time() - start_time) < timeout:
image = self._capture_screen()
if image is None:
time.sleep(self.config.stabilization_interval)
continue
current_hash = self._compute_image_hash(image)
if current_hash == last_hash:
consecutive_same += 1
if consecutive_same >= self.config.stabilization_frames:
elapsed = time.time() - start_time
logger.debug(
f"Écran stabilisé après {elapsed:.1f}s "
f"({consecutive_same} frames identiques)"
)
self._last_screen_hash = current_hash
return True
else:
consecutive_same = 1
last_hash = current_hash
time.sleep(self.config.stabilization_interval)
logger.warning(f"Stabilisation échouée après {timeout}s")
return False
# =========================================================================
# Vérification de transition
# =========================================================================
def _verify_transition(self, expected_node_id: str) -> bool:
"""
Vérifier qu'on a bien atteint le noeud attendu après une action.
Capture l'écran, génère un embedding CLIP, et vérifie la similarité
avec le prototype du noeud attendu.
Args:
expected_node_id: ID du noeud qu'on devrait avoir atteint
Returns:
True si le noeud est confirmé
"""
image = self._capture_screen()
if image is None:
return False
matched_node_id, confidence = self._match_current_state(image)
if matched_node_id == expected_node_id and confidence >= self.config.min_node_similarity:
logger.debug(
f"Transition confirmée vers {expected_node_id} "
f"(confiance: {confidence:.3f})"
)
return True
if confidence >= self.config.min_node_similarity:
logger.warning(
f"Transition : noeud matché = {matched_node_id} "
f"(attendu: {expected_node_id}, confiance: {confidence:.3f})"
)
else:
logger.warning(
f"Transition non confirmée vers {expected_node_id} "
f"(meilleur match: {matched_node_id}, confiance: {confidence:.3f})"
)
return False
# =========================================================================
# Construction du dict d'action
# =========================================================================
def _build_action_dict(
self,
action: Action,
params: Dict[str, Any],
) -> Dict[str, Any]:
"""
Construire le dictionnaire d'action à passer au callback.
Effectue la substitution des paramètres {{variable}} dans les
valeurs de l'action.
Args:
action: L'action du workflow
params: Paramètres de substitution
Returns:
Dict prêt pour le callback
"""
action_dict = {
"type": action.type,
"target": action.target.to_dict(),
"parameters": self._substitute_params(action.parameters, params),
}
return action_dict
def _substitute_params(
self,
data: Any,
params: Dict[str, Any],
) -> Any:
"""
Remplacer les variables {{param}} dans les données de l'action.
Supporte la substitution récursive dans les dicts et listes.
Args:
data: Données à traiter (str, dict, list, ou autre)
params: Dictionnaire de paramètres
Returns:
Données avec les variables substituées
"""
if isinstance(data, str):
# Remplacer {{variable}} par la valeur correspondante
def replacer(match):
var_name = match.group(1).strip()
return str(params.get(var_name, match.group(0)))
return re.sub(r"\{\{(.+?)\}\}", replacer, data)
elif isinstance(data, dict):
return {k: self._substitute_params(v, params) for k, v in data.items()}
elif isinstance(data, list):
return [self._substitute_params(item, params) for item in data]
return data
# =========================================================================
# Utilitaires
# =========================================================================
def _validate_workflow(self) -> Optional[str]:
"""
Valider le workflow avant exécution.
Returns:
Message d'erreur si invalide, None si OK
"""
if not self.workflow.entry_nodes:
return "Le workflow n'a pas de noeud d'entrée (entry_nodes vide)"
start_id = self.workflow.entry_nodes[0]
start_node = self.workflow.get_node(start_id)
if start_node is None:
return f"Le noeud d'entrée '{start_id}' n'existe pas dans le workflow"
if not self.workflow.nodes:
return "Le workflow n'a aucun noeud"
if not self.workflow.edges and not start_node.is_end:
return "Le workflow n'a aucun edge et le noeud d'entrée n'est pas un noeud END"
return None
@staticmethod
def _compute_image_hash(image: Image.Image) -> str:
"""
Calculer un hash rapide d'une image PIL pour détecter les changements.
Sous-échantillonne l'image pour un hash rapide (même méthode que
ScreenCapturer._compute_hash).
Args:
image: Image PIL
Returns:
Hash MD5 de l'image sous-échantillonnée
"""
img_array = np.array(image)
# Sous-échantillonner comme ScreenCapturer
small = img_array[::20, ::20, :].tobytes()
return hashlib.md5(small).hexdigest()
@staticmethod
def _bbox_center(bbox) -> tuple:
"""
Calculer le centre d'un bounding box.
Supporte les formats (x, y, w, h) tuple ou objet avec attributs.
Args:
bbox: Bounding box
Returns:
(cx, cy) centre du bbox
"""
if hasattr(bbox, "to_tuple"):
x, y, w, h = bbox.to_tuple()
elif hasattr(bbox, "x") and hasattr(bbox, "width"):
x, y, w, h = bbox.x, bbox.y, bbox.width, bbox.height
elif isinstance(bbox, (list, tuple)) and len(bbox) >= 4:
x, y, w, h = bbox[0], bbox[1], bbox[2], bbox[3]
else:
logger.warning(f"Format bbox inconnu : {type(bbox)}")
return (0, 0)
return (float(x + w / 2), float(y + h / 2))