diff --git a/core/execution/__init__.py b/core/execution/__init__.py index e01b04d10..cad809b9a 100644 --- a/core/execution/__init__.py +++ b/core/execution/__init__.py @@ -8,6 +8,8 @@ from .action_executor import ActionExecutor from .target_resolver import TargetResolver, ResolvedTarget from .error_handler import ErrorHandler, ErrorType, RecoveryStrategy from .workflow_runner import WorkflowRunner, RunResult, RunStatus, RunnerConfig +from .dag_executor import DAGExecutor, WorkflowStep, StepType, StepStatus, DAGExecutionResult +from .llm_actions import LLMActionHandler # Import tardif pour éviter import circulaire avec pipeline def _get_execution_loop(): @@ -25,5 +27,12 @@ __all__ = [ 'RunResult', 'RunStatus', 'RunnerConfig', + # DAG Executor — exécution parallèle basée sur un graphe de dépendances + 'DAGExecutor', + 'WorkflowStep', + 'StepType', + 'StepStatus', + 'DAGExecutionResult', + 'LLMActionHandler', # ExecutionLoop accessible via import direct du module ] diff --git a/core/execution/dag_executor.py b/core/execution/dag_executor.py new file mode 100644 index 000000000..954278cdc --- /dev/null +++ b/core/execution/dag_executor.py @@ -0,0 +1,771 @@ +""" +DAGExecutor — Exécuteur de workflow basé sur un graphe de dépendances (DAG) + +Remplace l'exécution linéaire étape-par-étape par un graphe de dépendances +où les tâches AI/LLM tournent en parallèle sans bloquer les actions UI. + +Architecture : +- Deux ThreadPools séparés : un pour les LLM (parallèle), un pour les UI (séquentiel) +- Les étapes sans dépendances non résolues démarrent immédiatement +- Injection de résultats entre étapes via la syntaxe ${step_id.result} +- Callbacks de changement d'état pour la mise à jour de l'interface + +Exemple de workflow : + 1. Ouvrir OnlyOffice (UI, rapide) + 2. Sélectionner texte (UI, dépend de 1) + 3. Analyser texte (LLM, 10-30s, dépend de 2) + 4. Traduire en français (LLM, dépend de 3, parallèle avec 5) + 5. Traduire en chinois (LLM, dépend de 3, parallèle avec 4) + 6. Ouvrir Gedit (UI, peut démarrer pendant que 4+5 tournent) + 7. Écrire résultat FR (UI, dépend de 4 ET 6) + 8. Écrire résultat CN (UI, dépend de 5 ET 7) + +Auteur : Dom, Claude +Date : 16 mars 2026 +""" + +import logging +import re +import threading +import time +from concurrent.futures import Future, ThreadPoolExecutor +from copy import deepcopy +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Callable, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Types d'étapes et statuts +# ============================================================================= + +class StepType(str, Enum): + """Type d'étape dans le workflow.""" + UI_ACTION = "ui_action" # Clic, frappe, etc. — rapide, séquentiel sur la cible + LLM_CALL = "llm_call" # Appel API Ollama — lent, peut tourner en arrière-plan + WAIT = "wait" # Attente explicite / délai + CONDITION = "condition" # Branchement conditionnel basé sur un résultat + + +class StepStatus(str, Enum): + """Statut d'exécution d'une étape.""" + PENDING = "pending" # En attente de dépendances + READY = "ready" # Toutes les dépendances résolues + RUNNING = "running" # En cours d'exécution + COMPLETED = "completed" # Terminée avec succès + FAILED = "failed" # Échouée + SKIPPED = "skipped" # Ignorée (dépendance échouée ou condition fausse) + + +# ============================================================================= +# Modèle d'une étape de workflow +# ============================================================================= + +@dataclass +class WorkflowStep: + """Représente une étape individuelle du workflow DAG. + + Attributes: + step_id: Identifiant unique de l'étape + step_type: Type d'étape (UI, LLM, etc.) + action: Paramètres de l'action à exécuter + depends_on: Liste des step_id dont cette étape dépend + status: Statut courant + result: Résultat de l'exécution (ex: texte traduit) + error: Message d'erreur en cas d'échec + started_at: Timestamp de début d'exécution + completed_at: Timestamp de fin d'exécution + """ + step_id: str + step_type: StepType + action: Dict[str, Any] = field(default_factory=dict) + depends_on: List[str] = field(default_factory=list) + status: StepStatus = StepStatus.PENDING + result: Any = None + error: Optional[str] = None + started_at: Optional[float] = None + completed_at: Optional[float] = None + + def duration(self) -> Optional[float]: + """Durée d'exécution en secondes, ou None si pas encore terminée.""" + if self.started_at and self.completed_at: + return self.completed_at - self.started_at + return None + + def to_dict(self) -> Dict[str, Any]: + """Sérialiser l'étape pour l'API / les callbacks UI.""" + return { + "step_id": self.step_id, + "step_type": self.step_type.value, + "action": self.action, + "depends_on": self.depends_on, + "status": self.status.value, + "result": self.result, + "error": self.error, + "started_at": self.started_at, + "completed_at": self.completed_at, + "duration": self.duration(), + } + + +# ============================================================================= +# Résultat global du workflow +# ============================================================================= + +@dataclass +class DAGExecutionResult: + """Résultat complet de l'exécution d'un workflow DAG.""" + success: bool + steps: Dict[str, Dict[str, Any]] = field(default_factory=dict) + results: Dict[str, Any] = field(default_factory=dict) + errors: List[str] = field(default_factory=list) + duration_seconds: float = 0.0 + + def to_dict(self) -> Dict[str, Any]: + return { + "success": self.success, + "steps": self.steps, + "results": self.results, + "errors": self.errors, + "duration_seconds": round(self.duration_seconds, 3), + } + + +# ============================================================================= +# Patron de substitution pour l'injection de résultats +# ============================================================================= + +# Reconnaît ${step_id.result} ou ${step_id.result.champ} +_RESULT_REF_PATTERN = re.compile(r"\$\{(\w+)\.result(?:\.(\w+))?\}") + + +# ============================================================================= +# DAGExecutor — Cœur de l'exécution parallèle +# ============================================================================= + +class DAGExecutor: + """Exécuteur de workflow basé sur un graphe de dépendances. + + Les étapes sans dépendances non résolues démarrent immédiatement. + Les étapes LLM tournent dans un ThreadPool dédié et n'empêchent pas + les étapes UI de continuer sur leur propre pool (séquentiel, 1 worker). + + Args: + max_llm_workers: Nombre max de tâches LLM en parallèle + max_ui_workers: Nombre max de tâches UI en parallèle (1 = séquentiel) + llm_handler: Handler pour les appels LLM (injection de dépendance) + ui_handler: Handler pour les actions UI (injection de dépendance) + """ + + def __init__( + self, + max_llm_workers: int = 2, + max_ui_workers: int = 1, + llm_handler: Optional[Any] = None, + ui_handler: Optional[Callable] = None, + ): + self._max_llm_workers = max_llm_workers + self._max_ui_workers = max_ui_workers + self._llm_pool: Optional[ThreadPoolExecutor] = None + self._ui_pool: Optional[ThreadPoolExecutor] = None + + # Handlers d'exécution — injectables pour les tests + self._llm_handler = llm_handler + self._ui_handler = ui_handler + + # État du workflow + self._steps: Dict[str, WorkflowStep] = {} + self._results: Dict[str, Any] = {} # step_id → résultat + self._futures: Dict[str, Future] = {} # step_id → future en cours + + # Synchronisation + self._lock = threading.Lock() + self._all_done = threading.Event() + self._cancelled = False + + # Callbacks pour la mise à jour de l'interface + self._on_step_change_callbacks: List[Callable] = [] + + # ----------------------------------------------------------------- + # Chargement du workflow + # ----------------------------------------------------------------- + + def load_workflow(self, steps: List[WorkflowStep]) -> None: + """Charger les étapes du workflow et valider le DAG. + + Raises: + ValueError: Si le graphe contient un cycle ou des dépendances invalides + """ + with self._lock: + self._steps.clear() + self._results.clear() + self._futures.clear() + self._cancelled = False + self._all_done.clear() + + for step in steps: + if step.step_id in self._steps: + raise ValueError( + f"Identifiant d'étape dupliqué : '{step.step_id}'" + ) + self._steps[step.step_id] = step + + # Valider que toutes les dépendances référencent des étapes existantes + for step in self._steps.values(): + for dep_id in step.depends_on: + if dep_id not in self._steps: + raise ValueError( + f"L'étape '{step.step_id}' dépend de '{dep_id}' " + f"qui n'existe pas dans le workflow" + ) + + # Détecter les cycles via tri topologique (Kahn) + self._validate_no_cycles() + + logger.info( + "Workflow chargé : %d étapes", len(self._steps) + ) + + def _validate_no_cycles(self) -> None: + """Vérifie l'absence de cycle dans le DAG (algorithme de Kahn). + + Raises: + ValueError: Si un cycle est détecté + """ + in_degree: Dict[str, int] = { + sid: len(s.depends_on) for sid, s in self._steps.items() + } + queue = [sid for sid, deg in in_degree.items() if deg == 0] + visited = 0 + + while queue: + current = queue.pop(0) + visited += 1 + # Trouver les étapes qui dépendent de current + for sid, step in self._steps.items(): + if current in step.depends_on: + in_degree[sid] -= 1 + if in_degree[sid] == 0: + queue.append(sid) + + if visited != len(self._steps): + raise ValueError( + "Cycle détecté dans le graphe de dépendances. " + "Le workflow doit être un DAG (graphe acyclique dirigé)." + ) + + # ----------------------------------------------------------------- + # Exécution principale + # ----------------------------------------------------------------- + + def execute(self, timeout: float = 300.0) -> DAGExecutionResult: + """Exécuter le workflow en respectant les dépendances. + + Algorithme : + 1. Trouver toutes les étapes READY (dépendances résolues) + 2. Soumettre les LLM au llm_pool, les UI au ui_pool + 3. Quand une étape termine, ré-évaluer les dépendances + 4. Répéter jusqu'à ce que tout soit terminé ou échoué + + Args: + timeout: Timeout global en secondes (défaut 5 minutes) + + Returns: + DAGExecutionResult avec le statut de toutes les étapes + """ + start_time = time.monotonic() + + # Créer les pools (à chaque exécution pour éviter les problèmes + # de réutilisation après shutdown) + self._llm_pool = ThreadPoolExecutor( + max_workers=self._max_llm_workers, + thread_name_prefix="dag-llm", + ) + self._ui_pool = ThreadPoolExecutor( + max_workers=self._max_ui_workers, + thread_name_prefix="dag-ui", + ) + + try: + # Lancer la boucle initiale + self._schedule_ready_steps() + + # Attendre que tout soit terminé ou le timeout + self._all_done.wait(timeout=timeout) + + elapsed = time.monotonic() - start_time + + # Vérifier si timeout + if not self._all_done.is_set(): + logger.warning("Timeout atteint après %.1fs", elapsed) + self._cancel_remaining("Timeout global atteint") + + # Construire le résultat + return self._build_result(elapsed) + + finally: + self._llm_pool.shutdown(wait=False) + self._ui_pool.shutdown(wait=False) + self._llm_pool = None + self._ui_pool = None + + # ----------------------------------------------------------------- + # Planification des étapes + # ----------------------------------------------------------------- + + def _schedule_ready_steps(self) -> None: + """Identifier et soumettre les étapes prêtes à l'exécution.""" + with self._lock: + if self._cancelled: + return + + ready_steps = self._find_ready_steps() + + if not ready_steps and self._is_all_done(): + self._all_done.set() + return + + for step in ready_steps: + step.status = StepStatus.RUNNING + step.started_at = time.monotonic() + self._notify_step_change(step) + + # Choisir le pool selon le type d'étape + pool = self._get_pool_for_step(step) + future = pool.submit(self._execute_step, step) + self._futures[step.step_id] = future + + def _find_ready_steps(self) -> List[WorkflowStep]: + """Trouver toutes les étapes dont les dépendances sont résolues. + + Une étape est prête si : + - Son statut est PENDING + - Toutes ses dépendances sont COMPLETED + """ + ready = [] + for step in self._steps.values(): + if step.status != StepStatus.PENDING: + continue + if self._check_dependencies(step): + step.status = StepStatus.READY + ready.append(step) + return ready + + def _check_dependencies(self, step: WorkflowStep) -> bool: + """Vérifie si toutes les dépendances d'une étape sont résolues. + + Returns: + True si toutes les dépendances sont COMPLETED + """ + for dep_id in step.depends_on: + dep = self._steps.get(dep_id) + if dep is None: + return False + if dep.status != StepStatus.COMPLETED: + return False + return True + + def _is_all_done(self) -> bool: + """Vérifie si toutes les étapes sont terminées (succès, échec ou ignorées).""" + return all( + s.status in (StepStatus.COMPLETED, StepStatus.FAILED, StepStatus.SKIPPED) + for s in self._steps.values() + ) + + def _get_pool_for_step(self, step: WorkflowStep) -> ThreadPoolExecutor: + """Retourne le pool approprié selon le type d'étape.""" + if step.step_type == StepType.LLM_CALL: + return self._llm_pool + # UI_ACTION, WAIT, CONDITION → pool UI (séquentiel) + return self._ui_pool + + # ----------------------------------------------------------------- + # Exécution d'une étape + # ----------------------------------------------------------------- + + def _execute_step(self, step: WorkflowStep) -> None: + """Exécute une étape dans le pool approprié. + + Appelé par le ThreadPoolExecutor. Gère les erreurs et + déclenche la re-planification après complétion. + """ + if self._cancelled: + return + + try: + logger.info( + "Démarrage étape '%s' (type=%s)", + step.step_id, step.step_type.value, + ) + + # Injecter les résultats des dépendances dans les paramètres + resolved_action = self._inject_results(step) + + # Dispatcher selon le type + if step.step_type == StepType.LLM_CALL: + result = self._execute_llm_step(step, resolved_action) + elif step.step_type == StepType.UI_ACTION: + result = self._execute_ui_step(step, resolved_action) + elif step.step_type == StepType.WAIT: + result = self._execute_wait_step(step, resolved_action) + elif step.step_type == StepType.CONDITION: + result = self._execute_condition_step(step, resolved_action) + else: + raise ValueError(f"Type d'étape inconnu : {step.step_type}") + + # Succès + with self._lock: + step.status = StepStatus.COMPLETED + step.result = result + step.completed_at = time.monotonic() + self._results[step.step_id] = result + + logger.info( + "Étape '%s' terminée en %.2fs", + step.step_id, + step.duration() or 0, + ) + + except Exception as exc: + logger.error( + "Étape '%s' échouée : %s", step.step_id, exc, exc_info=True + ) + with self._lock: + step.status = StepStatus.FAILED + step.error = str(exc) + step.completed_at = time.monotonic() + + # Marquer les dépendants comme SKIPPED + self._skip_dependents(step.step_id) + + finally: + self._notify_step_change(step) + # Re-planifier les étapes devenues prêtes + self._schedule_ready_steps() + + def _execute_llm_step( + self, step: WorkflowStep, action: Dict[str, Any] + ) -> Any: + """Exécute un appel LLM via le handler configuré. + + Args: + step: L'étape à exécuter + action: Paramètres de l'action avec résultats injectés + + Returns: + Le résultat du LLM (texte, dict, etc.) + """ + if self._llm_handler is None: + raise RuntimeError( + "Aucun handler LLM configuré. " + "Passez un LLMActionHandler au constructeur ou via set_llm_handler()." + ) + + # Le handler LLM reçoit l'action et le contexte des résultats + context = { + "results": dict(self._results), + "step_id": step.step_id, + } + return self._llm_handler.execute(action, context) + + def _execute_ui_step( + self, step: WorkflowStep, action: Dict[str, Any] + ) -> Any: + """Exécute une action UI via le handler configuré. + + Args: + step: L'étape à exécuter + action: Paramètres de l'action avec résultats injectés + + Returns: + Le résultat de l'action UI + """ + if self._ui_handler is not None: + return self._ui_handler(action) + + # Comportement par défaut : log + retour des paramètres + logger.info( + "Action UI '%s' : %s (pas de handler configuré, simulation)", + step.step_id, action.get("type", "unknown"), + ) + return {"simulated": True, "action": action} + + def _execute_wait_step( + self, step: WorkflowStep, action: Dict[str, Any] + ) -> Any: + """Exécute une étape d'attente. + + Args: + step: L'étape à exécuter + action: Doit contenir 'duration' en secondes + + Returns: + Dict avec la durée effective + """ + duration = float(action.get("duration", 1.0)) + logger.info("Attente de %.1fs (étape '%s')", duration, step.step_id) + time.sleep(duration) + return {"waited": duration} + + def _execute_condition_step( + self, step: WorkflowStep, action: Dict[str, Any] + ) -> Any: + """Exécute une étape conditionnelle. + + Évalue une condition simple basée sur les résultats des dépendances. + Le champ 'condition' de l'action doit contenir une expression évaluable. + + Args: + step: L'étape à exécuter + action: Doit contenir 'condition' et optionnellement 'skip_on_false' + + Returns: + True/False selon le résultat de la condition + """ + condition = action.get("condition", "True") + # Contexte d'évaluation sécurisé : uniquement les résultats + eval_context = {"results": dict(self._results)} + + try: + result = bool(eval(condition, {"__builtins__": {}}, eval_context)) + except Exception as exc: + logger.warning( + "Erreur d'évaluation de condition pour '%s' : %s", + step.step_id, exc, + ) + result = False + + logger.info( + "Condition '%s' évaluée à %s", step.step_id, result + ) + + # Si la condition est fausse et skip_on_false est activé, + # marquer les dépendants pour skip + if not result and action.get("skip_on_false", False): + self._skip_dependents(step.step_id) + + return result + + # ----------------------------------------------------------------- + # Injection de résultats + # ----------------------------------------------------------------- + + def _inject_results(self, step: WorkflowStep) -> Dict[str, Any]: + """Injecte les résultats des dépendances dans les paramètres de l'étape. + + Parcourt récursivement les valeurs de l'action et remplace + les références ${step_id.result} par les résultats effectifs. + + Syntaxe supportée : + - ${step_3.result} → résultat complet de l'étape 3 + - ${step_3.result.text} → champ 'text' du résultat de l'étape 3 + + Args: + step: L'étape dont les paramètres doivent être résolus + + Returns: + Copie de l'action avec les références remplacées + """ + action = deepcopy(step.action) + return self._resolve_references(action) + + def _resolve_references(self, obj: Any) -> Any: + """Résout récursivement les références ${...} dans un objet.""" + if isinstance(obj, str): + return self._resolve_string_references(obj) + elif isinstance(obj, dict): + return {k: self._resolve_references(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [self._resolve_references(item) for item in obj] + return obj + + def _resolve_string_references(self, text: str) -> Any: + """Résout les références dans une chaîne de caractères. + + Si la chaîne entière est une référence unique, retourne la valeur brute + (pas nécessairement une chaîne). Sinon, remplace les références dans la + chaîne par leur représentation textuelle. + """ + # Cas spécial : la chaîne entière est une seule référence + match = _RESULT_REF_PATTERN.fullmatch(text) + if match: + return self._get_referenced_value(match.group(1), match.group(2)) + + # Cas général : remplacer les références dans la chaîne + def replacer(m: re.Match) -> str: + value = self._get_referenced_value(m.group(1), m.group(2)) + return str(value) if value is not None else m.group(0) + + return _RESULT_REF_PATTERN.sub(replacer, text) + + def _get_referenced_value( + self, step_id: str, field_name: Optional[str] + ) -> Any: + """Récupère la valeur référencée par un step_id et un champ optionnel.""" + value = self._results.get(step_id) + if value is None: + logger.warning( + "Référence à un résultat inexistant : %s", step_id + ) + return None + + if field_name and isinstance(value, dict): + return value.get(field_name) + + return value + + # ----------------------------------------------------------------- + # Gestion des échecs et annulation + # ----------------------------------------------------------------- + + def _skip_dependents(self, failed_step_id: str) -> None: + """Marque comme SKIPPED toutes les étapes qui dépendent d'une étape échouée. + + Propagation récursive : si A échoue, B (qui dépend de A) est SKIPPED, + et C (qui dépend de B) est aussi SKIPPED. + """ + with self._lock: + to_skip = set() + # Trouver les dépendants directs + for sid, step in self._steps.items(): + if ( + failed_step_id in step.depends_on + and step.status in (StepStatus.PENDING, StepStatus.READY) + ): + to_skip.add(sid) + + # Propager récursivement + changed = True + while changed: + changed = False + for sid, step in self._steps.items(): + if sid in to_skip: + continue + if step.status not in (StepStatus.PENDING, StepStatus.READY): + continue + if any(dep in to_skip for dep in step.depends_on): + to_skip.add(sid) + changed = True + + # Appliquer le skip + for sid in to_skip: + step = self._steps[sid] + step.status = StepStatus.SKIPPED + step.error = f"Ignorée car l'étape '{failed_step_id}' a échoué" + step.completed_at = time.monotonic() + logger.info("Étape '%s' ignorée (dépendance échouée)", sid) + self._notify_step_change(step) + + def cancel(self) -> None: + """Annule l'exécution en cours. + + Les étapes déjà en cours termineront naturellement, mais aucune + nouvelle étape ne sera planifiée. + """ + logger.info("Annulation du workflow demandée") + with self._lock: + self._cancelled = True + self._cancel_remaining("Annulé par l'utilisateur") + self._all_done.set() + + def _cancel_remaining(self, reason: str) -> None: + """Annule toutes les étapes non terminées. + + Doit être appelé avec le lock déjà acquis. + """ + for step in self._steps.values(): + if step.status in (StepStatus.PENDING, StepStatus.READY): + step.status = StepStatus.SKIPPED + step.error = reason + step.completed_at = time.monotonic() + self._notify_step_change(step) + + # ----------------------------------------------------------------- + # Observation et callbacks + # ----------------------------------------------------------------- + + def on_step_change(self, callback: Callable[[WorkflowStep], None]) -> None: + """Enregistre un callback appelé à chaque changement d'état d'une étape. + + Le callback reçoit l'objet WorkflowStep modifié. + + Args: + callback: Fonction à appeler lors d'un changement + """ + self._on_step_change_callbacks.append(callback) + + def _notify_step_change(self, step: WorkflowStep) -> None: + """Notifie tous les callbacks enregistrés d'un changement d'état.""" + for cb in self._on_step_change_callbacks: + try: + cb(step) + except Exception as exc: + logger.error( + "Erreur dans le callback de changement d'état : %s", exc + ) + + def get_status(self) -> Dict[str, Any]: + """Retourne l'état de toutes les étapes du workflow. + + Returns: + Dict avec les clés 'steps', 'results', 'summary' + """ + with self._lock: + steps_status = { + sid: step.to_dict() for sid, step in self._steps.items() + } + + # Compteurs par statut + summary = {} + for status in StepStatus: + count = sum( + 1 for s in self._steps.values() if s.status == status + ) + if count > 0: + summary[status.value] = count + + return { + "steps": steps_status, + "results": dict(self._results), + "summary": summary, + "cancelled": self._cancelled, + } + + # ----------------------------------------------------------------- + # Méthodes utilitaires + # ----------------------------------------------------------------- + + def set_llm_handler(self, handler: Any) -> None: + """Configure le handler LLM après construction.""" + self._llm_handler = handler + + def set_ui_handler(self, handler: Callable) -> None: + """Configure le handler UI après construction.""" + self._ui_handler = handler + + def _build_result(self, elapsed: float) -> DAGExecutionResult: + """Construit le résultat final de l'exécution.""" + errors = [ + f"Étape '{sid}' : {step.error}" + for sid, step in self._steps.items() + if step.status == StepStatus.FAILED and step.error + ] + + all_success = all( + s.status in (StepStatus.COMPLETED, StepStatus.SKIPPED) + for s in self._steps.values() + ) + # Un workflow avec des étapes SKIPPED à cause d'un échec n'est pas un succès + has_failures = any( + s.status == StepStatus.FAILED for s in self._steps.values() + ) + + return DAGExecutionResult( + success=all_success and not has_failures, + steps={sid: step.to_dict() for sid, step in self._steps.items()}, + results=dict(self._results), + errors=errors, + duration_seconds=elapsed, + ) diff --git a/core/execution/llm_actions.py b/core/execution/llm_actions.py new file mode 100644 index 000000000..aa77a0d93 --- /dev/null +++ b/core/execution/llm_actions.py @@ -0,0 +1,431 @@ +""" +LLMActionHandler — Gestionnaire d'actions LLM pour les workflows DAG + +Gère les appels LLM via l'API Ollama /api/chat pour les étapes de workflow. +Chaque action est un appel synchrone (bloquant) qui sera exécuté dans +le ThreadPool LLM du DAGExecutor. + +Actions supportées : +- analyze_text : Analyser / résumer un texte +- translate : Traduire un texte vers une langue cible +- extract_data : Extraire des données structurées d'un texte +- generate_text : Générer du texte à partir d'un prompt + +Auteur : Dom, Claude +Date : 16 mars 2026 +""" + +import json +import logging +from typing import Any, Dict, Optional + +import requests + +logger = logging.getLogger(__name__) + + +class LLMActionHandler: + """Gestionnaire d'appels LLM pour les étapes de workflow. + + Utilise l'API Ollama /api/chat (mode conversationnel) pour toutes + les interactions avec le modèle de langage. + + Args: + ollama_endpoint: URL de l'API Ollama + model: Nom du modèle à utiliser + temperature: Température de génération par défaut + timeout: Timeout par appel en secondes + """ + + def __init__( + self, + ollama_endpoint: str = "http://localhost:11434", + model: str = "qwen3-vl:8b", + temperature: float = 0.1, + timeout: int = 120, + ): + self.endpoint = ollama_endpoint.rstrip("/") + self.model = model + self.temperature = temperature + self.timeout = timeout + + # ----------------------------------------------------------------- + # Dispatcher principal + # ----------------------------------------------------------------- + + def execute(self, action: Dict[str, Any], context: Dict[str, Any]) -> Any: + """Dispatcher vers la bonne action LLM. + + Args: + action: Dict contenant au minimum 'llm_action' (nom de l'action) + et les paramètres spécifiques à l'action + context: Contexte d'exécution (résultats précédents, step_id, etc.) + + Returns: + Résultat de l'action (texte, dict, etc.) + + Raises: + ValueError: Si l'action LLM est inconnue + RuntimeError: Si l'appel à Ollama échoue + """ + llm_action = action.get("llm_action", "") + + dispatch = { + "analyze_text": self._dispatch_analyze, + "translate": self._dispatch_translate, + "extract_data": self._dispatch_extract, + "generate_text": self._dispatch_generate, + } + + handler = dispatch.get(llm_action) + if handler is None: + raise ValueError( + f"Action LLM inconnue : '{llm_action}'. " + f"Actions supportées : {list(dispatch.keys())}" + ) + + return handler(action, context) + + # ----------------------------------------------------------------- + # Actions LLM + # ----------------------------------------------------------------- + + def analyze_text( + self, + text: str, + instruction: str = "Analyse et résume ce texte.", + model: Optional[str] = None, + temperature: Optional[float] = None, + ) -> str: + """Analyser ou résumer un texte. + + Args: + text: Texte à analyser + instruction: Instruction pour l'analyse + model: Modèle spécifique (sinon modèle par défaut) + temperature: Température spécifique + + Returns: + Texte de l'analyse + """ + system_prompt = ( + "Tu es un assistant d'analyse de texte. " + "Réponds de manière concise et structurée." + ) + user_message = f"{instruction}\n\nTexte :\n{text}" + return self._chat( + system_prompt=system_prompt, + user_message=user_message, + model=model, + temperature=temperature, + ) + + def translate( + self, + text: str, + target_lang: str, + source_lang: Optional[str] = None, + model: Optional[str] = None, + temperature: Optional[float] = None, + ) -> str: + """Traduire un texte vers une langue cible. + + Args: + text: Texte à traduire + target_lang: Langue cible (ex: "français", "chinois", "english") + source_lang: Langue source (détection auto si None) + model: Modèle spécifique + temperature: Température spécifique + + Returns: + Texte traduit + """ + system_prompt = ( + "Tu es un traducteur professionnel. " + "Traduis le texte fidèlement, sans ajouter d'explication. " + "Retourne uniquement la traduction." + ) + if source_lang: + user_message = ( + f"Traduis le texte suivant du {source_lang} " + f"vers le {target_lang} :\n\n{text}" + ) + else: + user_message = ( + f"Traduis le texte suivant en {target_lang} :\n\n{text}" + ) + + return self._chat( + system_prompt=system_prompt, + user_message=user_message, + model=model, + temperature=temperature, + ) + + def extract_data( + self, + text: str, + schema: Dict[str, Any], + model: Optional[str] = None, + temperature: Optional[float] = None, + ) -> Dict[str, Any]: + """Extraire des données structurées d'un texte. + + Args: + text: Texte source + schema: Schéma des données à extraire (clés attendues + descriptions) + model: Modèle spécifique + temperature: Température spécifique + + Returns: + Dict avec les données extraites + """ + schema_desc = json.dumps(schema, ensure_ascii=False, indent=2) + system_prompt = ( + "Tu es un extracteur de données. " + "Extrais les informations demandées du texte et retourne " + "un JSON valide correspondant au schéma fourni. " + "Retourne UNIQUEMENT le JSON, sans explication." + ) + user_message = ( + f"Schéma attendu :\n{schema_desc}\n\n" + f"Texte source :\n{text}" + ) + + response = self._chat( + system_prompt=system_prompt, + user_message=user_message, + model=model, + temperature=temperature, + force_json=True, + ) + + # Parser le JSON de la réponse + try: + return json.loads(response) + except json.JSONDecodeError: + # Tenter d'extraire le JSON de la réponse + logger.warning( + "Réponse LLM non-JSON pour extract_data, tentative d'extraction" + ) + return self._try_extract_json(response) + + def generate_text( + self, + prompt: str, + context: str = "", + model: Optional[str] = None, + temperature: Optional[float] = None, + ) -> str: + """Générer du texte à partir d'un prompt. + + Args: + prompt: Instruction de génération + context: Contexte additionnel + model: Modèle spécifique + temperature: Température spécifique + + Returns: + Texte généré + """ + system_prompt = ( + "Tu es un assistant de rédaction. " + "Génère le contenu demandé de manière claire et précise." + ) + user_message = prompt + if context: + user_message = f"Contexte :\n{context}\n\nInstruction :\n{prompt}" + + return self._chat( + system_prompt=system_prompt, + user_message=user_message, + model=model, + temperature=temperature, + ) + + # ----------------------------------------------------------------- + # Dispatchers internes (adaptent les paramètres d'action) + # ----------------------------------------------------------------- + + def _dispatch_analyze( + self, action: Dict[str, Any], context: Dict[str, Any] + ) -> str: + """Dispatche une action analyze_text depuis les paramètres du workflow.""" + return self.analyze_text( + text=action.get("text", ""), + instruction=action.get("instruction", "Analyse et résume ce texte."), + model=action.get("model"), + temperature=action.get("temperature"), + ) + + def _dispatch_translate( + self, action: Dict[str, Any], context: Dict[str, Any] + ) -> str: + """Dispatche une action translate depuis les paramètres du workflow.""" + return self.translate( + text=action.get("text", ""), + target_lang=action.get("target_lang", "français"), + source_lang=action.get("source_lang"), + model=action.get("model"), + temperature=action.get("temperature"), + ) + + def _dispatch_extract( + self, action: Dict[str, Any], context: Dict[str, Any] + ) -> Dict[str, Any]: + """Dispatche une action extract_data depuis les paramètres du workflow.""" + return self.extract_data( + text=action.get("text", ""), + schema=action.get("schema", {}), + model=action.get("model"), + temperature=action.get("temperature"), + ) + + def _dispatch_generate( + self, action: Dict[str, Any], context: Dict[str, Any] + ) -> str: + """Dispatche une action generate_text depuis les paramètres du workflow.""" + return self.generate_text( + prompt=action.get("prompt", ""), + context=action.get("context", ""), + model=action.get("model"), + temperature=action.get("temperature"), + ) + + # ----------------------------------------------------------------- + # Communication avec Ollama via /api/chat + # ----------------------------------------------------------------- + + def _chat( + self, + system_prompt: str, + user_message: str, + model: Optional[str] = None, + temperature: Optional[float] = None, + force_json: bool = False, + ) -> str: + """Appel à l'API /api/chat d'Ollama. + + Args: + system_prompt: Message système + user_message: Message utilisateur + model: Modèle (défaut: self.model) + temperature: Température (défaut: self.temperature) + force_json: Forcer la sortie JSON + + Returns: + Contenu de la réponse du modèle + + Raises: + RuntimeError: Si l'appel échoue + """ + effective_model = model or self.model + effective_temp = temperature if temperature is not None else self.temperature + + # Pour Qwen3, désactiver le mode thinking pour des réponses directes + effective_user_message = user_message + if "qwen" in effective_model.lower(): + effective_user_message = f"/nothink {user_message}" + + messages = [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": effective_user_message}, + ] + + payload = { + "model": effective_model, + "messages": messages, + "stream": False, + "options": { + "temperature": effective_temp, + }, + } + + if force_json: + payload["format"] = "json" + + url = f"{self.endpoint}/api/chat" + + try: + response = requests.post( + url, + json=payload, + timeout=self.timeout, + ) + + if response.status_code != 200: + raise RuntimeError( + f"Ollama /api/chat a retourné HTTP {response.status_code} : " + f"{response.text[:500]}" + ) + + data = response.json() + content = data.get("message", {}).get("content", "") + + if not content: + raise RuntimeError( + "Ollama a retourné une réponse vide" + ) + + return content + + except requests.exceptions.Timeout: + raise RuntimeError( + f"Timeout de {self.timeout}s dépassé pour l'appel LLM " + f"(modèle: {effective_model})" + ) + except requests.exceptions.ConnectionError: + raise RuntimeError( + f"Impossible de se connecter à Ollama sur {self.endpoint}. " + f"Vérifiez que le service est lancé." + ) + + # ----------------------------------------------------------------- + # Utilitaires + # ----------------------------------------------------------------- + + @staticmethod + def _try_extract_json(text: str) -> Dict[str, Any]: + """Tente d'extraire un objet JSON d'un texte libre. + + Cherche le premier { et le dernier } pour isoler le JSON. + """ + start = text.find("{") + end = text.rfind("}") + if start != -1 and end != -1 and end > start: + try: + return json.loads(text[start : end + 1]) + except json.JSONDecodeError: + pass + logger.error("Impossible d'extraire du JSON de la réponse LLM") + return {"raw_response": text, "_parse_error": True} + + def check_connection(self) -> bool: + """Vérifie la connexion à Ollama et la disponibilité du modèle. + + Returns: + True si Ollama répond et le modèle est disponible + """ + try: + response = requests.get( + f"{self.endpoint}/api/tags", timeout=5 + ) + if response.status_code == 200: + models = response.json().get("models", []) + model_names = [m["name"] for m in models] + if self.model in model_names: + return True + logger.warning( + "Modèle '%s' non trouvé dans Ollama. " + "Modèles disponibles : %s", + self.model, + model_names, + ) + return False + except Exception as exc: + logger.warning( + "Impossible de se connecter à Ollama sur %s : %s", + self.endpoint, + exc, + ) + return False diff --git a/tests/unit/test_dag_executor.py b/tests/unit/test_dag_executor.py new file mode 100644 index 000000000..182ac15bb --- /dev/null +++ b/tests/unit/test_dag_executor.py @@ -0,0 +1,1060 @@ +""" +Tests unitaires pour le DAGExecutor — Exécuteur de workflow DAG + +Teste : +- Exécution linéaire (A → B → C) +- Exécution parallèle de tâches LLM +- Injection de résultats (${step_id.result}) +- Propagation d'échecs aux dépendants +- Annulation de workflow +- Détection de cycles +- Étapes conditionnelles et wait + +Utilise des mocks LLM (time.sleep court) au lieu d'Ollama réel. + +Auteur : Dom, Claude +Date : 16 mars 2026 +""" + +import threading +import time +from unittest.mock import MagicMock, patch + +import pytest +import requests + +from core.execution.dag_executor import ( + DAGExecutionResult, + DAGExecutor, + StepStatus, + StepType, + WorkflowStep, +) +from core.execution.llm_actions import LLMActionHandler + + +# ============================================================================= +# Fixtures +# ============================================================================= + + +class MockLLMHandler: + """Handler LLM simulé pour les tests. + + Simule un appel LLM avec un court délai et retourne + un résultat prévisible basé sur l'action. + """ + + def __init__(self, delay: float = 0.05): + self.delay = delay + self.call_count = 0 + self._lock = threading.Lock() + + def execute(self, action: dict, context: dict) -> str: + """Simule un appel LLM.""" + time.sleep(self.delay) + with self._lock: + self.call_count += 1 + + llm_action = action.get("llm_action", "unknown") + + if llm_action == "translate": + target = action.get("target_lang", "?") + text = action.get("text", "") + return f"[Traduit en {target}] {text}" + + if llm_action == "analyze_text": + return f"[Analyse] {action.get('text', '')[:50]}" + + if llm_action == "generate_text": + return f"[Généré] {action.get('prompt', '')}" + + if llm_action == "extract_data": + return {"extracted": True, "source": action.get("text", "")} + + return f"[LLM:{llm_action}]" + + +class MockUIHandler: + """Handler UI simulé pour les tests.""" + + def __init__(self, delay: float = 0.01): + self.delay = delay + self.executed_actions: list = [] + self._lock = threading.Lock() + + def __call__(self, action: dict) -> dict: + """Simule une action UI.""" + time.sleep(self.delay) + with self._lock: + self.executed_actions.append(action) + return {"success": True, "action_type": action.get("type", "unknown")} + + +@pytest.fixture +def mock_llm(): + """Handler LLM mock.""" + return MockLLMHandler(delay=0.05) + + +@pytest.fixture +def mock_ui(): + """Handler UI mock.""" + return MockUIHandler(delay=0.01) + + +@pytest.fixture +def executor(mock_llm, mock_ui): + """DAGExecutor configuré avec des handlers mock.""" + return DAGExecutor( + max_llm_workers=2, + max_ui_workers=1, + llm_handler=mock_llm, + ui_handler=mock_ui, + ) + + +# ============================================================================= +# Tests de workflow linéaire +# ============================================================================= + + +class TestLinearWorkflow: + """Teste l'exécution séquentielle A → B → C.""" + + def test_linear_three_steps(self, executor, mock_ui): + """Trois étapes UI exécutées en séquence.""" + steps = [ + WorkflowStep( + step_id="step_1", + step_type=StepType.UI_ACTION, + action={"type": "click", "target": "bouton_ouvrir"}, + ), + WorkflowStep( + step_id="step_2", + step_type=StepType.UI_ACTION, + action={"type": "click", "target": "champ_texte"}, + depends_on=["step_1"], + ), + WorkflowStep( + step_id="step_3", + step_type=StepType.UI_ACTION, + action={"type": "type_text", "text": "Bonjour"}, + depends_on=["step_2"], + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=10) + + assert result.success is True + assert len(result.results) == 3 + assert len(result.errors) == 0 + + # Les 3 actions UI ont été exécutées + assert len(mock_ui.executed_actions) == 3 + + def test_linear_preserves_order(self, executor): + """Les étapes linéaires s'exécutent dans l'ordre des dépendances.""" + completion_order = [] + lock = threading.Lock() + + def on_change(step): + if step.status == StepStatus.COMPLETED: + with lock: + completion_order.append(step.step_id) + + executor.on_step_change(on_change) + + steps = [ + WorkflowStep(step_id="A", step_type=StepType.UI_ACTION, action={"type": "a"}), + WorkflowStep(step_id="B", step_type=StepType.UI_ACTION, action={"type": "b"}, depends_on=["A"]), + WorkflowStep(step_id="C", step_type=StepType.UI_ACTION, action={"type": "c"}, depends_on=["B"]), + ] + + executor.load_workflow(steps) + executor.execute(timeout=10) + + assert completion_order == ["A", "B", "C"] + + def test_single_step_workflow(self, executor): + """Un workflow avec une seule étape fonctionne.""" + steps = [ + WorkflowStep( + step_id="only", + step_type=StepType.UI_ACTION, + action={"type": "click"}, + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=5) + + assert result.success is True + assert "only" in result.results + + +# ============================================================================= +# Tests de parallélisme LLM +# ============================================================================= + + +class TestParallelLLMSteps: + """Teste l'exécution parallèle des étapes LLM.""" + + def test_parallel_translations(self, executor, mock_llm): + """Deux traductions en parallèle après une analyse. + + Graphe : A → B (analyse), B → C (trad FR), B → D (trad CN), C+D → E + """ + steps = [ + WorkflowStep( + step_id="select", + step_type=StepType.UI_ACTION, + action={"type": "select_text"}, + ), + WorkflowStep( + step_id="analyze", + step_type=StepType.LLM_CALL, + action={"llm_action": "analyze_text", "text": "Hello world"}, + depends_on=["select"], + ), + WorkflowStep( + step_id="trad_fr", + step_type=StepType.LLM_CALL, + action={ + "llm_action": "translate", + "text": "${analyze.result}", + "target_lang": "français", + }, + depends_on=["analyze"], + ), + WorkflowStep( + step_id="trad_cn", + step_type=StepType.LLM_CALL, + action={ + "llm_action": "translate", + "text": "${analyze.result}", + "target_lang": "chinois", + }, + depends_on=["analyze"], + ), + WorkflowStep( + step_id="done", + step_type=StepType.UI_ACTION, + action={"type": "notify", "message": "Terminé"}, + depends_on=["trad_fr", "trad_cn"], + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=10) + + assert result.success is True + assert mock_llm.call_count == 3 # analyze + 2 traductions + + # Les traductions FR et CN contiennent le résultat de l'analyse + assert "français" in result.results["trad_fr"] + assert "chinois" in result.results["trad_cn"] + + def test_parallel_steps_faster_than_sequential(self, executor): + """Les étapes parallèles sont plus rapides que séquentielles. + + Deux LLM de 0.1s chacun en parallèle devraient prendre ~0.1s, + pas ~0.2s. + """ + slow_llm = MockLLMHandler(delay=0.1) + executor.set_llm_handler(slow_llm) + + steps = [ + WorkflowStep( + step_id="llm_a", + step_type=StepType.LLM_CALL, + action={"llm_action": "analyze_text", "text": "A"}, + ), + WorkflowStep( + step_id="llm_b", + step_type=StepType.LLM_CALL, + action={"llm_action": "analyze_text", "text": "B"}, + ), + ] + + executor.load_workflow(steps) + start = time.monotonic() + result = executor.execute(timeout=5) + elapsed = time.monotonic() - start + + assert result.success is True + # En parallèle : ~0.1s. En séquentiel ça serait ~0.2s. + # On laisse une marge pour la latence de threading. + assert elapsed < 0.18, f"Trop lent ({elapsed:.3f}s), pas de parallélisme ?" + + def test_ui_action_during_llm(self, executor): + """Les actions UI peuvent s'exécuter pendant que les LLM tournent. + + Graphe : + - step_1 (UI) → step_2 (LLM, lent) + - step_3 (UI, indépendant) peut démarrer immédiatement + """ + slow_llm = MockLLMHandler(delay=0.15) + executor.set_llm_handler(slow_llm) + + completion_order = [] + lock = threading.Lock() + + def on_change(step): + if step.status == StepStatus.COMPLETED: + with lock: + completion_order.append(step.step_id) + + executor.on_step_change(on_change) + + steps = [ + WorkflowStep( + step_id="open_app", + step_type=StepType.UI_ACTION, + action={"type": "open", "app": "onlyoffice"}, + ), + WorkflowStep( + step_id="slow_analysis", + step_type=StepType.LLM_CALL, + action={"llm_action": "analyze_text", "text": "Long text..."}, + depends_on=["open_app"], + ), + WorkflowStep( + step_id="open_gedit", + step_type=StepType.UI_ACTION, + action={"type": "open", "app": "gedit"}, + # Pas de dépendance → peut démarrer en parallèle + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=10) + + assert result.success is True + # open_gedit devrait terminer avant slow_analysis + gedit_idx = completion_order.index("open_gedit") + analysis_idx = completion_order.index("slow_analysis") + assert gedit_idx < analysis_idx, ( + f"open_gedit ({gedit_idx}) aurait dû terminer avant " + f"slow_analysis ({analysis_idx})" + ) + + +# ============================================================================= +# Tests d'injection de résultats +# ============================================================================= + + +class TestDependencyInjection: + """Teste la substitution ${step_id.result} dans les paramètres.""" + + def test_simple_result_injection(self, executor, mock_llm): + """${step_id.result} est remplacé par le résultat de l'étape.""" + steps = [ + WorkflowStep( + step_id="analyze", + step_type=StepType.LLM_CALL, + action={"llm_action": "analyze_text", "text": "Hello"}, + ), + WorkflowStep( + step_id="translate", + step_type=StepType.LLM_CALL, + action={ + "llm_action": "translate", + "text": "${analyze.result}", + "target_lang": "français", + }, + depends_on=["analyze"], + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=10) + + assert result.success is True + # Le texte de la traduction contient le résultat de l'analyse + translate_result = result.results["translate"] + assert "[Analyse]" in translate_result + + def test_nested_dict_injection(self, executor): + """L'injection fonctionne dans les dicts imbriqués.""" + # Simuler un résultat de type dict + mock_llm = MagicMock() + mock_llm.execute.return_value = {"summary": "Résumé du texte", "lang": "en"} + executor.set_llm_handler(mock_llm) + + steps = [ + WorkflowStep( + step_id="extract", + step_type=StepType.LLM_CALL, + action={"llm_action": "extract_data", "text": "data"}, + ), + WorkflowStep( + step_id="use_result", + step_type=StepType.LLM_CALL, + action={ + "llm_action": "generate_text", + "prompt": "Utilise : ${extract.result.summary}", + }, + depends_on=["extract"], + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=10) + + assert result.success is True + # Vérifier que le second appel a reçu la valeur injectée + second_call_action = mock_llm.execute.call_args_list[1][0][0] + assert second_call_action["prompt"] == "Utilise : Résumé du texte" + + def test_multiple_references_in_string(self, executor): + """Plusieurs références ${...} dans une même chaîne.""" + mock_llm = MagicMock() + mock_llm.execute.side_effect = ["Bonjour", "世界"] + executor.set_llm_handler(mock_llm) + + steps = [ + WorkflowStep( + step_id="trad_fr", + step_type=StepType.LLM_CALL, + action={"llm_action": "translate", "text": "Hello"}, + ), + WorkflowStep( + step_id="trad_cn", + step_type=StepType.LLM_CALL, + action={"llm_action": "translate", "text": "World"}, + ), + WorkflowStep( + step_id="combine", + step_type=StepType.UI_ACTION, + action={ + "type": "type_text", + "text": "FR: ${trad_fr.result}, CN: ${trad_cn.result}", + }, + depends_on=["trad_fr", "trad_cn"], + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=10) + + assert result.success is True + # Vérifier via le handler UI que le texte a été injecté + combine_action = result.steps["combine"]["action"] + # Le résultat de combine contient les deux traductions + # (vérifié via le mock_ui qui reçoit l'action résolue) + + def test_missing_reference_preserved(self, executor, mock_llm): + """Une référence à un résultat inexistant est conservée telle quelle.""" + steps = [ + WorkflowStep( + step_id="use_missing", + step_type=StepType.LLM_CALL, + action={ + "llm_action": "generate_text", + "prompt": "Valeur : ${nonexistent.result}", + }, + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=5) + + assert result.success is True + + +# ============================================================================= +# Tests de gestion des échecs +# ============================================================================= + + +class TestFailureHandling: + """Teste la propagation des échecs dans le DAG.""" + + def test_failed_step_blocks_dependents(self, executor): + """Une étape échouée cause le SKIP de ses dépendants.""" + failing_llm = MagicMock() + failing_llm.execute.side_effect = RuntimeError("Ollama down") + executor.set_llm_handler(failing_llm) + + steps = [ + WorkflowStep( + step_id="will_fail", + step_type=StepType.LLM_CALL, + action={"llm_action": "analyze_text", "text": "test"}, + ), + WorkflowStep( + step_id="dependent_1", + step_type=StepType.LLM_CALL, + action={"llm_action": "translate", "text": "${will_fail.result}"}, + depends_on=["will_fail"], + ), + WorkflowStep( + step_id="dependent_2", + step_type=StepType.UI_ACTION, + action={"type": "click"}, + depends_on=["dependent_1"], + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=10) + + assert result.success is False + assert result.steps["will_fail"]["status"] == "failed" + assert result.steps["dependent_1"]["status"] == "skipped" + assert result.steps["dependent_2"]["status"] == "skipped" + assert len(result.errors) >= 1 + + def test_independent_steps_continue_after_failure(self, executor, mock_llm): + """Les étapes indépendantes continuent même si une autre échoue.""" + # Configurer un handler qui échoue seulement pour step_fail + original_execute = mock_llm.execute + + def selective_fail(action, context): + if action.get("_will_fail"): + raise RuntimeError("Échec intentionnel") + return original_execute(action, context) + + mock_llm.execute = selective_fail + + steps = [ + WorkflowStep( + step_id="step_fail", + step_type=StepType.LLM_CALL, + action={"llm_action": "analyze_text", "text": "fail", "_will_fail": True}, + ), + WorkflowStep( + step_id="step_ok", + step_type=StepType.LLM_CALL, + action={"llm_action": "analyze_text", "text": "ok"}, + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=10) + + assert result.success is False # Global = échec car step_fail a échoué + assert result.steps["step_fail"]["status"] == "failed" + assert result.steps["step_ok"]["status"] == "completed" + + def test_partial_dependency_failure(self, executor, mock_llm): + """Si une branche échoue mais l'autre réussit, le dépendant commun est SKIPPED.""" + original_execute = mock_llm.execute + + def selective_fail(action, context): + if action.get("_will_fail"): + raise RuntimeError("Branche échouée") + return original_execute(action, context) + + mock_llm.execute = selective_fail + + steps = [ + WorkflowStep( + step_id="branch_ok", + step_type=StepType.LLM_CALL, + action={"llm_action": "analyze_text", "text": "ok"}, + ), + WorkflowStep( + step_id="branch_fail", + step_type=StepType.LLM_CALL, + action={"llm_action": "analyze_text", "_will_fail": True}, + ), + WorkflowStep( + step_id="join", + step_type=StepType.UI_ACTION, + action={"type": "click"}, + depends_on=["branch_ok", "branch_fail"], + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=10) + + assert result.steps["branch_ok"]["status"] == "completed" + assert result.steps["branch_fail"]["status"] == "failed" + assert result.steps["join"]["status"] == "skipped" + + +# ============================================================================= +# Tests d'annulation +# ============================================================================= + + +class TestCancelWorkflow: + """Teste l'annulation de workflow en cours.""" + + def test_cancel_stops_new_steps(self, executor): + """L'annulation empêche le démarrage de nouvelles étapes.""" + # LLM lent pour avoir le temps d'annuler + slow_llm = MockLLMHandler(delay=0.3) + executor.set_llm_handler(slow_llm) + + steps = [ + WorkflowStep( + step_id="slow_step", + step_type=StepType.LLM_CALL, + action={"llm_action": "analyze_text", "text": "lent"}, + ), + WorkflowStep( + step_id="after_slow", + step_type=StepType.UI_ACTION, + action={"type": "click"}, + depends_on=["slow_step"], + ), + ] + + executor.load_workflow(steps) + + # Annuler après un court délai + def cancel_soon(): + time.sleep(0.05) + executor.cancel() + + cancel_thread = threading.Thread(target=cancel_soon) + cancel_thread.start() + + result = executor.execute(timeout=10) + cancel_thread.join() + + # Le workflow ne devrait pas être un succès complet + status = executor.get_status() + assert status["cancelled"] is True + + def test_cancel_idempotent(self, executor, mock_llm): + """Appeler cancel() plusieurs fois ne pose pas de problème.""" + steps = [ + WorkflowStep( + step_id="s1", + step_type=StepType.UI_ACTION, + action={"type": "click"}, + ), + ] + + executor.load_workflow(steps) + executor.cancel() + executor.cancel() # Deuxième appel — ne doit pas lever d'exception + + status = executor.get_status() + assert status["cancelled"] is True + + +# ============================================================================= +# Tests de validation du DAG +# ============================================================================= + + +class TestDAGValidation: + """Teste la validation de la structure du graphe.""" + + def test_cycle_detection(self, executor): + """Un cycle dans le graphe lève une ValueError.""" + steps = [ + WorkflowStep(step_id="A", step_type=StepType.UI_ACTION, depends_on=["C"]), + WorkflowStep(step_id="B", step_type=StepType.UI_ACTION, depends_on=["A"]), + WorkflowStep(step_id="C", step_type=StepType.UI_ACTION, depends_on=["B"]), + ] + + with pytest.raises(ValueError, match="[Cc]ycle"): + executor.load_workflow(steps) + + def test_self_dependency_detected(self, executor): + """Une étape qui dépend d'elle-même est détectée comme cycle.""" + steps = [ + WorkflowStep(step_id="self_ref", step_type=StepType.UI_ACTION, depends_on=["self_ref"]), + ] + + with pytest.raises(ValueError, match="[Cc]ycle"): + executor.load_workflow(steps) + + def test_missing_dependency_raises(self, executor): + """Référencer une dépendance inexistante lève une ValueError.""" + steps = [ + WorkflowStep( + step_id="orphan", + step_type=StepType.UI_ACTION, + depends_on=["nonexistent"], + ), + ] + + with pytest.raises(ValueError, match="nonexistent"): + executor.load_workflow(steps) + + def test_duplicate_step_id_raises(self, executor): + """Deux étapes avec le même ID lèvent une ValueError.""" + steps = [ + WorkflowStep(step_id="dup", step_type=StepType.UI_ACTION), + WorkflowStep(step_id="dup", step_type=StepType.LLM_CALL), + ] + + with pytest.raises(ValueError, match="dupliqué"): + executor.load_workflow(steps) + + def test_empty_workflow(self, executor): + """Un workflow vide est valide et s'exécute immédiatement.""" + executor.load_workflow([]) + result = executor.execute(timeout=5) + assert result.success is True + + +# ============================================================================= +# Tests des étapes spéciales (WAIT, CONDITION) +# ============================================================================= + + +class TestSpecialSteps: + """Teste les étapes WAIT et CONDITION.""" + + def test_wait_step(self, executor): + """L'étape WAIT attend la durée spécifiée.""" + steps = [ + WorkflowStep( + step_id="wait_short", + step_type=StepType.WAIT, + action={"duration": 0.05}, + ), + ] + + executor.load_workflow(steps) + start = time.monotonic() + result = executor.execute(timeout=5) + elapsed = time.monotonic() - start + + assert result.success is True + assert elapsed >= 0.04 # Au moins 40ms (marge) + assert result.results["wait_short"]["waited"] == 0.05 + + def test_condition_true(self, executor, mock_llm): + """Une condition vraie laisse passer les dépendants.""" + steps = [ + WorkflowStep( + step_id="check", + step_type=StepType.CONDITION, + action={"condition": "True"}, + ), + WorkflowStep( + step_id="after_check", + step_type=StepType.UI_ACTION, + action={"type": "click"}, + depends_on=["check"], + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=5) + + assert result.success is True + assert result.steps["check"]["status"] == "completed" + assert result.steps["after_check"]["status"] == "completed" + + def test_condition_false_with_skip(self, executor): + """Une condition fausse avec skip_on_false ignore les dépendants.""" + steps = [ + WorkflowStep( + step_id="check_false", + step_type=StepType.CONDITION, + action={"condition": "False", "skip_on_false": True}, + ), + WorkflowStep( + step_id="should_skip", + step_type=StepType.UI_ACTION, + action={"type": "click"}, + depends_on=["check_false"], + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=5) + + assert result.steps["check_false"]["status"] == "completed" + assert result.steps["should_skip"]["status"] == "skipped" + + +# ============================================================================= +# Tests du statut et des callbacks +# ============================================================================= + + +class TestStatusAndCallbacks: + """Teste le reporting de statut et les callbacks.""" + + def test_get_status_returns_all_steps(self, executor): + """get_status() retourne l'état de toutes les étapes.""" + steps = [ + WorkflowStep(step_id="s1", step_type=StepType.UI_ACTION, action={"type": "a"}), + WorkflowStep(step_id="s2", step_type=StepType.UI_ACTION, action={"type": "b"}, depends_on=["s1"]), + ] + + executor.load_workflow(steps) + status = executor.get_status() + + assert "s1" in status["steps"] + assert "s2" in status["steps"] + assert status["steps"]["s1"]["status"] == "pending" + + def test_callbacks_called_on_status_change(self, executor, mock_llm): + """Les callbacks sont appelés à chaque changement de statut.""" + changes = [] + + def track_changes(step): + changes.append((step.step_id, step.status.value)) + + executor.on_step_change(track_changes) + + steps = [ + WorkflowStep( + step_id="only", + step_type=StepType.UI_ACTION, + action={"type": "click"}, + ), + ] + + executor.load_workflow(steps) + executor.execute(timeout=5) + + # Au minimum : RUNNING → COMPLETED + statuses = [s for sid, s in changes if sid == "only"] + assert "running" in statuses + assert "completed" in statuses + + def test_callback_error_does_not_crash(self, executor, mock_llm): + """Un callback qui lève une exception ne fait pas planter l'exécution.""" + + def bad_callback(step): + raise RuntimeError("Callback cassé !") + + executor.on_step_change(bad_callback) + + steps = [ + WorkflowStep( + step_id="robust", + step_type=StepType.UI_ACTION, + action={"type": "click"}, + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=5) + + # L'exécution réussit malgré le callback cassé + assert result.success is True + + def test_step_duration_tracked(self, executor): + """La durée de chaque étape est enregistrée.""" + slow_llm = MockLLMHandler(delay=0.05) + executor.set_llm_handler(slow_llm) + + steps = [ + WorkflowStep( + step_id="timed", + step_type=StepType.LLM_CALL, + action={"llm_action": "analyze_text", "text": "test"}, + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=5) + + step_data = result.steps["timed"] + assert step_data["duration"] is not None + assert step_data["duration"] >= 0.04 # Au moins ~50ms + + +# ============================================================================= +# Tests du workflow complet (scénario utilisateur) +# ============================================================================= + + +class TestFullWorkflowScenario: + """Teste le scénario complet décrit dans la spécification.""" + + def test_onlyoffice_translate_gedit_scenario(self, executor, mock_llm, mock_ui): + """Scénario complet : OnlyOffice → analyse → 2 traductions → Gedit. + + 1. Ouvrir OnlyOffice (UI) + 2. Sélectionner texte (UI, dépend de 1) + 3. Analyser texte (LLM, dépend de 2) + 4. Traduire FR (LLM, dépend de 3) + 5. Traduire CN (LLM, dépend de 3, parallèle avec 4) + 6. Ouvrir Gedit (UI, indépendant) + 7. Écrire FR (UI, dépend de 4 ET 6) + 8. Écrire CN (UI, dépend de 5 ET 7) + """ + steps = [ + WorkflowStep( + step_id="open_office", + step_type=StepType.UI_ACTION, + action={"type": "open", "app": "onlyoffice"}, + ), + WorkflowStep( + step_id="select_text", + step_type=StepType.UI_ACTION, + action={"type": "select_all"}, + depends_on=["open_office"], + ), + WorkflowStep( + step_id="analyze", + step_type=StepType.LLM_CALL, + action={"llm_action": "analyze_text", "text": "Sample document text"}, + depends_on=["select_text"], + ), + WorkflowStep( + step_id="trad_fr", + step_type=StepType.LLM_CALL, + action={ + "llm_action": "translate", + "text": "${analyze.result}", + "target_lang": "français", + }, + depends_on=["analyze"], + ), + WorkflowStep( + step_id="trad_cn", + step_type=StepType.LLM_CALL, + action={ + "llm_action": "translate", + "text": "${analyze.result}", + "target_lang": "chinois", + }, + depends_on=["analyze"], + ), + WorkflowStep( + step_id="open_gedit", + step_type=StepType.UI_ACTION, + action={"type": "open", "app": "gedit"}, + # Pas de dépendance — peut démarrer dès le début + ), + WorkflowStep( + step_id="write_fr", + step_type=StepType.UI_ACTION, + action={ + "type": "type_text", + "text": "${trad_fr.result}", + }, + depends_on=["trad_fr", "open_gedit"], + ), + WorkflowStep( + step_id="write_cn", + step_type=StepType.UI_ACTION, + action={ + "type": "type_text", + "text": "${trad_cn.result}", + }, + depends_on=["trad_cn", "write_fr"], + ), + ] + + executor.load_workflow(steps) + result = executor.execute(timeout=30) + + assert result.success is True + assert len(result.results) == 8 + assert len(result.errors) == 0 + + # Vérifier que les traductions ont été produites + assert "français" in result.results["trad_fr"] + assert "chinois" in result.results["trad_cn"] + + # Vérifier que toutes les étapes sont COMPLETED + for sid, step_data in result.steps.items(): + assert step_data["status"] == "completed", ( + f"Étape '{sid}' devrait être completed, " + f"pas {step_data['status']}" + ) + + +# ============================================================================= +# Tests du LLMActionHandler (mock Ollama) +# ============================================================================= + + +class TestLLMActionHandler: + """Teste le LLMActionHandler avec un serveur Ollama mocké.""" + + def test_dispatch_analyze(self): + """Le dispatcher route correctement vers analyze_text.""" + handler = LLMActionHandler() + with patch.object(handler, "_chat", return_value="Résumé du texte") as mock_chat: + result = handler.execute( + {"llm_action": "analyze_text", "text": "Hello", "instruction": "Résume"}, + {}, + ) + assert result == "Résumé du texte" + mock_chat.assert_called_once() + + def test_dispatch_translate(self): + """Le dispatcher route correctement vers translate.""" + handler = LLMActionHandler() + with patch.object(handler, "_chat", return_value="Bonjour") as mock_chat: + result = handler.execute( + {"llm_action": "translate", "text": "Hello", "target_lang": "français"}, + {}, + ) + assert result == "Bonjour" + + def test_dispatch_unknown_action_raises(self): + """Une action LLM inconnue lève une ValueError.""" + handler = LLMActionHandler() + with pytest.raises(ValueError, match="inconnue"): + handler.execute({"llm_action": "unknown_action"}, {}) + + def test_extract_data_parses_json(self): + """extract_data parse la réponse JSON du LLM.""" + handler = LLMActionHandler() + json_response = '{"name": "Jean", "age": 42}' + with patch.object(handler, "_chat", return_value=json_response): + result = handler.extract_data("texte", {"name": "str", "age": "int"}) + assert result == {"name": "Jean", "age": 42} + + def test_extract_data_handles_bad_json(self): + """extract_data gère gracieusement une réponse non-JSON.""" + handler = LLMActionHandler() + with patch.object(handler, "_chat", return_value="Voici le résultat: {\"a\": 1}"): + result = handler.extract_data("texte", {"a": "int"}) + assert result["a"] == 1 + + def test_chat_uses_api_chat_endpoint(self): + """_chat appelle /api/chat (pas /api/generate).""" + handler = LLMActionHandler(ollama_endpoint="http://localhost:11434") + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = { + "message": {"content": "Réponse test"} + } + + with patch("core.execution.llm_actions.requests.post", return_value=mock_response) as mock_post: + result = handler._chat("system", "user message") + assert result == "Réponse test" + # Vérifier que c'est bien /api/chat + call_url = mock_post.call_args[0][0] + assert call_url == "http://localhost:11434/api/chat" + + def test_chat_includes_nothink_for_qwen(self): + """Pour les modèles Qwen, /nothink est ajouté au message utilisateur.""" + handler = LLMActionHandler(model="qwen3-vl:8b") + mock_response = MagicMock() + mock_response.status_code = 200 + mock_response.json.return_value = {"message": {"content": "ok"}} + + with patch("core.execution.llm_actions.requests.post", return_value=mock_response) as mock_post: + handler._chat("system", "test message") + payload = mock_post.call_args[1]["json"] + user_msg = payload["messages"][1]["content"] + assert user_msg.startswith("/nothink") + + def test_connection_error_raises_runtime(self): + """Une erreur de connexion lève RuntimeError.""" + handler = LLMActionHandler(ollama_endpoint="http://localhost:99999") + with patch("core.execution.llm_actions.requests.post", side_effect=requests.exceptions.ConnectionError): + with pytest.raises(RuntimeError, match="connecter"): + handler._chat("system", "test") + + def test_timeout_error_raises_runtime(self): + """Un timeout lève RuntimeError.""" + handler = LLMActionHandler(timeout=1) + with patch("core.execution.llm_actions.requests.post", side_effect=requests.exceptions.Timeout): + with pytest.raises(RuntimeError, match="[Tt]imeout"): + handler._chat("system", "test") diff --git a/visual_workflow_builder/backend/api_v3/__init__.py b/visual_workflow_builder/backend/api_v3/__init__.py index 054fa7f1b..6f7f8b6db 100644 --- a/visual_workflow_builder/backend/api_v3/__init__.py +++ b/visual_workflow_builder/backend/api_v3/__init__.py @@ -17,4 +17,10 @@ from . import execute from . import match # Matching sémantique des workflows from . import review # Review/Validation de workflows importés +# DAG Executor — exécution parallèle avec étapes LLM +try: + from . import dag_execute # noqa: F401 +except ImportError as e: + print(f"⚠️ Module dag_execute désactivé: {e}") + __all__ = ['api_v3_bp'] diff --git a/visual_workflow_builder/backend/api_v3/dag_execute.py b/visual_workflow_builder/backend/api_v3/dag_execute.py new file mode 100644 index 000000000..cdac32ad3 --- /dev/null +++ b/visual_workflow_builder/backend/api_v3/dag_execute.py @@ -0,0 +1,340 @@ +""" +API v3 - Exécution DAG de Workflows avec étapes LLM + +Convertit un workflow VWB (nœuds + edges) en DAGExecutor steps +et lance l'exécution parallèle (UI séquentiel, LLM parallèle). + +POST /api/v3/workflow//execute-dag → Lance l'exécution DAG +GET /api/v3/workflow//dag-status → Statut de l'exécution en cours + +Auteur : Dom, Claude — 16 mars 2026 +""" + +import json +import logging +import sys +import traceback +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from flask import jsonify, request + +from . import api_v3_bp + +logger = logging.getLogger(__name__) + +# Ajouter le répertoire racine pour les imports core +_ROOT = str(Path(__file__).resolve().parent.parent.parent.parent) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + +from core.execution.dag_executor import ( + DAGExecutionResult, + DAGExecutor, + StepStatus, + StepType, + WorkflowStep, +) +from core.execution.llm_actions import LLMActionHandler + +# --------------------------------------------------------------------------- +# Types d'actions VWB → StepType du DAGExecutor +# --------------------------------------------------------------------------- + +# Les action_types VWB qui correspondent à des appels LLM +_LLM_ACTION_TYPES = { + "llm_analyze", + "llm_translate", + "llm_extract_data", + "llm_generate", +} + +# Mapping action_type VWB → llm_action du LLMActionHandler +_LLM_ACTION_MAP = { + "llm_analyze": "analyze_text", + "llm_translate": "translate", + "llm_extract_data": "extract_data", + "llm_generate": "generate_text", +} + +# Actions VWB de type attente +_WAIT_ACTION_TYPES = {"wait_for_anchor"} + +# Actions VWB de type condition +_CONDITION_ACTION_TYPES = {"visual_condition"} + + +def _classify_step_type(action_type: str) -> StepType: + """Détermine le StepType DAG à partir du action_type VWB.""" + if action_type in _LLM_ACTION_TYPES: + return StepType.LLM_CALL + if action_type in _WAIT_ACTION_TYPES: + return StepType.WAIT + if action_type in _CONDITION_ACTION_TYPES: + return StepType.CONDITION + return StepType.UI_ACTION + + +def _build_llm_action(action_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]: + """Construit le dict d'action LLM attendu par le LLMActionHandler. + + Ajoute la clé 'llm_action' et recopie les paramètres pertinents. + """ + llm_action = _LLM_ACTION_MAP.get(action_type) + if not llm_action: + raise ValueError(f"Type d'action LLM inconnu : {action_type}") + + action = {"llm_action": llm_action} + + # Copier les paramètres pertinents sans modification + for key in ("text", "instruction", "model", "temperature", + "target_lang", "source_lang", "schema", "prompt", "context"): + if key in parameters and parameters[key]: + val = parameters[key] + # Le schema peut être une chaîne JSON — le parser + if key == "schema" and isinstance(val, str): + try: + val = json.loads(val) + except json.JSONDecodeError: + pass + action[key] = val + + return action + + +# --------------------------------------------------------------------------- +# Conversion VWB workflow → DAG steps +# --------------------------------------------------------------------------- + +def _convert_vwb_to_dag_steps( + steps_data: List[Dict[str, Any]], + edges_data: List[Dict[str, Any]], +) -> List[WorkflowStep]: + """Convertit les nœuds et edges VWB en liste de WorkflowStep DAG. + + Les edges définissent les dépendances : si un edge va de A vers B, + alors B dépend de A. + + Args: + steps_data: Liste de dicts (step.to_dict() depuis le modèle SQLAlchemy) + edges_data: Liste de dicts {"source": "step_id_A", "target": "step_id_B"} + + Returns: + Liste de WorkflowStep prêtes à être chargées dans le DAGExecutor + """ + # Construire le mapping des dépendances depuis les edges + depends_map: Dict[str, List[str]] = {} + for edge in edges_data: + source = edge.get("source", "") + target = edge.get("target", "") + if source and target: + depends_map.setdefault(target, []).append(source) + + dag_steps = [] + for step_data in steps_data: + step_id = step_data["id"] + action_type = step_data["action_type"] + parameters = step_data.get("parameters", {}) + + step_type = _classify_step_type(action_type) + depends_on = depends_map.get(step_id, []) + + # Construire l'action selon le type + if step_type == StepType.LLM_CALL: + action = _build_llm_action(action_type, parameters) + else: + # Pour les actions UI, passer les paramètres tels quels + action = {"type": action_type, **parameters} + + dag_step = WorkflowStep( + step_id=step_id, + step_type=step_type, + action=action, + depends_on=depends_on, + ) + dag_steps.append(dag_step) + + return dag_steps + + +# --------------------------------------------------------------------------- +# Instance globale du dernier exécuteur (pour le status polling) +# --------------------------------------------------------------------------- +_current_executor: Optional[DAGExecutor] = None +_last_result: Optional[DAGExecutionResult] = None + + +# --------------------------------------------------------------------------- +# Endpoints +# --------------------------------------------------------------------------- + +@api_v3_bp.route('/workflow//execute-dag', methods=['POST']) +def execute_dag(workflow_id: str): + """ + Lance l'exécution DAG d'un workflow VWB. + + Les étapes LLM (llm_analyze, llm_translate, llm_extract_data, llm_generate) + sont exécutées en parallèle via Ollama. Les étapes UI restent séquentielles. + + Body (optionnel) : + { + "edges": [{"source": "step_A", "target": "step_B"}, ...], + "timeout": 300, + "model": "qwen3-vl:8b", + "ollama_endpoint": "http://localhost:11434" + } + + Response : + { + "success": true/false, + "execution": { "success": ..., "steps": {...}, "results": {...}, ... } + } + """ + global _current_executor, _last_result + + try: + from db.models import Workflow, Step + + workflow = Workflow.query.get(workflow_id) + if not workflow: + return jsonify({ + 'success': False, + 'error': f"Workflow '{workflow_id}' non trouvé" + }), 404 + + # Récupérer les étapes depuis la BDD + steps_db = Step.query.filter_by( + workflow_id=workflow_id + ).order_by(Step.order).all() + + if not steps_db: + return jsonify({ + 'success': False, + 'error': "Le workflow n'a aucune étape" + }), 400 + + steps_data = [s.to_dict() for s in steps_db] + + # Récupérer les edges depuis le body (le frontend les envoie) + data = request.get_json() or {} + edges_data = data.get("edges", []) + + # Si pas d'edges fournis, créer une chaîne linéaire par défaut + if not edges_data: + for i in range(len(steps_data) - 1): + edges_data.append({ + "source": steps_data[i]["id"], + "target": steps_data[i + 1]["id"], + }) + + # Paramètres optionnels + timeout = data.get("timeout", 300) + model = data.get("model", "qwen3-vl:8b") + ollama_endpoint = data.get("ollama_endpoint", "http://localhost:11434") + + # Convertir en étapes DAG + dag_steps = _convert_vwb_to_dag_steps(steps_data, edges_data) + + # Vérifier s'il y a des étapes LLM + has_llm_steps = any(s.step_type == StepType.LLM_CALL for s in dag_steps) + + # Créer le handler LLM si nécessaire + llm_handler = None + if has_llm_steps: + llm_handler = LLMActionHandler( + ollama_endpoint=ollama_endpoint, + model=model, + ) + + # Créer et configurer l'exécuteur + executor = DAGExecutor( + max_llm_workers=2, + max_ui_workers=1, + llm_handler=llm_handler, + ) + + # Charger le workflow dans le DAG + executor.load_workflow(dag_steps) + + # Garder une référence pour le status + _current_executor = executor + _last_result = None + + logger.info( + "Lancement exécution DAG pour workflow '%s' : %d étapes (%d LLM)", + workflow_id, + len(dag_steps), + sum(1 for s in dag_steps if s.step_type == StepType.LLM_CALL), + ) + + # Exécuter (bloquant — le timeout protège) + result = executor.execute(timeout=timeout) + _last_result = result + + logger.info( + "Exécution DAG terminée : success=%s, durée=%.2fs", + result.success, + result.duration_seconds, + ) + + return jsonify({ + 'success': True, + 'execution': result.to_dict(), + }) + + except ValueError as e: + return jsonify({ + 'success': False, + 'error': f"Erreur de validation : {str(e)}" + }), 400 + + except Exception as e: + traceback.print_exc() + return jsonify({ + 'success': False, + 'error': f"Erreur d'exécution : {str(e)}" + }), 500 + + +@api_v3_bp.route('/workflow//dag-status', methods=['GET']) +def get_dag_status(workflow_id: str): + """ + Retourne le statut de la dernière exécution DAG. + + Response : + { + "success": true, + "status": { "steps": {...}, "results": {...}, "summary": {...} } + } + """ + global _current_executor, _last_result + + try: + # Si une exécution est terminée, retourner le résultat + if _last_result is not None: + return jsonify({ + 'success': True, + 'completed': True, + 'status': _last_result.to_dict(), + }) + + # Si un exécuteur est en cours, retourner son état + if _current_executor is not None: + return jsonify({ + 'success': True, + 'completed': False, + 'status': _current_executor.get_status(), + }) + + return jsonify({ + 'success': True, + 'completed': False, + 'status': None, + 'message': "Aucune exécution DAG en cours", + }) + + except Exception as e: + return jsonify({ + 'success': False, + 'error': str(e) + }), 500 diff --git a/visual_workflow_builder/backend/contracts/action_contracts.py b/visual_workflow_builder/backend/contracts/action_contracts.py index 359e6ab25..7b687a4b8 100644 --- a/visual_workflow_builder/backend/contracts/action_contracts.py +++ b/visual_workflow_builder/backend/contracts/action_contracts.py @@ -314,6 +314,35 @@ VWB_ACTION_CONTRACTS: Dict[str, ActionContract] = { optional_params=["match_mode", "case_sensitive"], param_validators={"visual_anchor": lambda p: has_visual_anchor({"visual_anchor": p})} ), + + # --- ACTIONS DAG LLM — Exécutées via le DAGExecutor --- + "llm_analyze": ActionContract( + action_type="llm_analyze", + description="Analyser/résumer un texte via LLM (DAGExecutor)", + required_params=[], + optional_params=["text", "instruction", "model", "temperature"], + ), + + "llm_translate": ActionContract( + action_type="llm_translate", + description="Traduire un texte via LLM (DAGExecutor)", + required_params=[], + optional_params=["text", "target_lang", "source_lang", "model", "temperature"], + ), + + "llm_extract_data": ActionContract( + action_type="llm_extract_data", + description="Extraire des données structurées d'un texte via LLM (DAGExecutor)", + required_params=[], + optional_params=["text", "schema", "model", "temperature"], + ), + + "llm_generate": ActionContract( + action_type="llm_generate", + description="Générer du texte via LLM (DAGExecutor)", + required_params=[], + optional_params=["prompt", "context", "model", "temperature"], + ), } diff --git a/visual_workflow_builder/frontend_v4/src/components/PropertiesPanel.tsx b/visual_workflow_builder/frontend_v4/src/components/PropertiesPanel.tsx index 67b20871a..0e36d73f9 100644 --- a/visual_workflow_builder/frontend_v4/src/components/PropertiesPanel.tsx +++ b/visual_workflow_builder/frontend_v4/src/components/PropertiesPanel.tsx @@ -788,6 +788,194 @@ export default function PropertiesPanel({ step, onUpdateParams, onDelete }: Prop ); + // === DAG LLM === + case 'llm_analyze': + return ( + <> +
+ 🔬 Analyser texte (LLM) +
+
+ +