feat: DAG executor async + intégration IA/LLM dans le VWB

- DAGExecutor : exécution workflow par graphe de dépendances,
  étapes LLM parallèles, UI séquentielles, injection ${step.result}
- LLMActionHandler : analyze_text, translate, extract_data, generate_text
  via Ollama /api/chat (qwen3-vl:8b, temperature 0.1)
- VWB palette : catégorie "IA / LLM" avec 4 actions draggables
- VWB propriétés : éditeurs pour chaque action LLM (modèle, prompt, langue)
- VWB endpoint : POST /api/v3/workflow/<id>/execute-dag
- 37 tests unitaires DAG executor (tous passent)
- Fix log spam cache workflows (info → debug)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-03-16 22:58:44 +01:00
parent ad15237fe0
commit 5e3865d328
11 changed files with 2911 additions and 2 deletions

View File

@@ -17,4 +17,10 @@ from . import execute
from . import match # Matching sémantique des workflows
from . import review # Review/Validation de workflows importés
# DAG Executor — exécution parallèle avec étapes LLM
try:
from . import dag_execute # noqa: F401
except ImportError as e:
print(f"⚠️ Module dag_execute désactivé: {e}")
__all__ = ['api_v3_bp']

View File

@@ -0,0 +1,340 @@
"""
API v3 - Exécution DAG de Workflows avec étapes LLM
Convertit un workflow VWB (nœuds + edges) en DAGExecutor steps
et lance l'exécution parallèle (UI séquentiel, LLM parallèle).
POST /api/v3/workflow/<id>/execute-dag → Lance l'exécution DAG
GET /api/v3/workflow/<id>/dag-status → Statut de l'exécution en cours
Auteur : Dom, Claude — 16 mars 2026
"""
import json
import logging
import sys
import traceback
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from flask import jsonify, request
from . import api_v3_bp
logger = logging.getLogger(__name__)
# Ajouter le répertoire racine pour les imports core
_ROOT = str(Path(__file__).resolve().parent.parent.parent.parent)
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
from core.execution.dag_executor import (
DAGExecutionResult,
DAGExecutor,
StepStatus,
StepType,
WorkflowStep,
)
from core.execution.llm_actions import LLMActionHandler
# ---------------------------------------------------------------------------
# Types d'actions VWB → StepType du DAGExecutor
# ---------------------------------------------------------------------------
# Les action_types VWB qui correspondent à des appels LLM
_LLM_ACTION_TYPES = {
"llm_analyze",
"llm_translate",
"llm_extract_data",
"llm_generate",
}
# Mapping action_type VWB → llm_action du LLMActionHandler
_LLM_ACTION_MAP = {
"llm_analyze": "analyze_text",
"llm_translate": "translate",
"llm_extract_data": "extract_data",
"llm_generate": "generate_text",
}
# Actions VWB de type attente
_WAIT_ACTION_TYPES = {"wait_for_anchor"}
# Actions VWB de type condition
_CONDITION_ACTION_TYPES = {"visual_condition"}
def _classify_step_type(action_type: str) -> StepType:
"""Détermine le StepType DAG à partir du action_type VWB."""
if action_type in _LLM_ACTION_TYPES:
return StepType.LLM_CALL
if action_type in _WAIT_ACTION_TYPES:
return StepType.WAIT
if action_type in _CONDITION_ACTION_TYPES:
return StepType.CONDITION
return StepType.UI_ACTION
def _build_llm_action(action_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
"""Construit le dict d'action LLM attendu par le LLMActionHandler.
Ajoute la clé 'llm_action' et recopie les paramètres pertinents.
"""
llm_action = _LLM_ACTION_MAP.get(action_type)
if not llm_action:
raise ValueError(f"Type d'action LLM inconnu : {action_type}")
action = {"llm_action": llm_action}
# Copier les paramètres pertinents sans modification
for key in ("text", "instruction", "model", "temperature",
"target_lang", "source_lang", "schema", "prompt", "context"):
if key in parameters and parameters[key]:
val = parameters[key]
# Le schema peut être une chaîne JSON — le parser
if key == "schema" and isinstance(val, str):
try:
val = json.loads(val)
except json.JSONDecodeError:
pass
action[key] = val
return action
# ---------------------------------------------------------------------------
# Conversion VWB workflow → DAG steps
# ---------------------------------------------------------------------------
def _convert_vwb_to_dag_steps(
steps_data: List[Dict[str, Any]],
edges_data: List[Dict[str, Any]],
) -> List[WorkflowStep]:
"""Convertit les nœuds et edges VWB en liste de WorkflowStep DAG.
Les edges définissent les dépendances : si un edge va de A vers B,
alors B dépend de A.
Args:
steps_data: Liste de dicts (step.to_dict() depuis le modèle SQLAlchemy)
edges_data: Liste de dicts {"source": "step_id_A", "target": "step_id_B"}
Returns:
Liste de WorkflowStep prêtes à être chargées dans le DAGExecutor
"""
# Construire le mapping des dépendances depuis les edges
depends_map: Dict[str, List[str]] = {}
for edge in edges_data:
source = edge.get("source", "")
target = edge.get("target", "")
if source and target:
depends_map.setdefault(target, []).append(source)
dag_steps = []
for step_data in steps_data:
step_id = step_data["id"]
action_type = step_data["action_type"]
parameters = step_data.get("parameters", {})
step_type = _classify_step_type(action_type)
depends_on = depends_map.get(step_id, [])
# Construire l'action selon le type
if step_type == StepType.LLM_CALL:
action = _build_llm_action(action_type, parameters)
else:
# Pour les actions UI, passer les paramètres tels quels
action = {"type": action_type, **parameters}
dag_step = WorkflowStep(
step_id=step_id,
step_type=step_type,
action=action,
depends_on=depends_on,
)
dag_steps.append(dag_step)
return dag_steps
# ---------------------------------------------------------------------------
# Instance globale du dernier exécuteur (pour le status polling)
# ---------------------------------------------------------------------------
_current_executor: Optional[DAGExecutor] = None
_last_result: Optional[DAGExecutionResult] = None
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@api_v3_bp.route('/workflow/<workflow_id>/execute-dag', methods=['POST'])
def execute_dag(workflow_id: str):
"""
Lance l'exécution DAG d'un workflow VWB.
Les étapes LLM (llm_analyze, llm_translate, llm_extract_data, llm_generate)
sont exécutées en parallèle via Ollama. Les étapes UI restent séquentielles.
Body (optionnel) :
{
"edges": [{"source": "step_A", "target": "step_B"}, ...],
"timeout": 300,
"model": "qwen3-vl:8b",
"ollama_endpoint": "http://localhost:11434"
}
Response :
{
"success": true/false,
"execution": { "success": ..., "steps": {...}, "results": {...}, ... }
}
"""
global _current_executor, _last_result
try:
from db.models import Workflow, Step
workflow = Workflow.query.get(workflow_id)
if not workflow:
return jsonify({
'success': False,
'error': f"Workflow '{workflow_id}' non trouvé"
}), 404
# Récupérer les étapes depuis la BDD
steps_db = Step.query.filter_by(
workflow_id=workflow_id
).order_by(Step.order).all()
if not steps_db:
return jsonify({
'success': False,
'error': "Le workflow n'a aucune étape"
}), 400
steps_data = [s.to_dict() for s in steps_db]
# Récupérer les edges depuis le body (le frontend les envoie)
data = request.get_json() or {}
edges_data = data.get("edges", [])
# Si pas d'edges fournis, créer une chaîne linéaire par défaut
if not edges_data:
for i in range(len(steps_data) - 1):
edges_data.append({
"source": steps_data[i]["id"],
"target": steps_data[i + 1]["id"],
})
# Paramètres optionnels
timeout = data.get("timeout", 300)
model = data.get("model", "qwen3-vl:8b")
ollama_endpoint = data.get("ollama_endpoint", "http://localhost:11434")
# Convertir en étapes DAG
dag_steps = _convert_vwb_to_dag_steps(steps_data, edges_data)
# Vérifier s'il y a des étapes LLM
has_llm_steps = any(s.step_type == StepType.LLM_CALL for s in dag_steps)
# Créer le handler LLM si nécessaire
llm_handler = None
if has_llm_steps:
llm_handler = LLMActionHandler(
ollama_endpoint=ollama_endpoint,
model=model,
)
# Créer et configurer l'exécuteur
executor = DAGExecutor(
max_llm_workers=2,
max_ui_workers=1,
llm_handler=llm_handler,
)
# Charger le workflow dans le DAG
executor.load_workflow(dag_steps)
# Garder une référence pour le status
_current_executor = executor
_last_result = None
logger.info(
"Lancement exécution DAG pour workflow '%s' : %d étapes (%d LLM)",
workflow_id,
len(dag_steps),
sum(1 for s in dag_steps if s.step_type == StepType.LLM_CALL),
)
# Exécuter (bloquant — le timeout protège)
result = executor.execute(timeout=timeout)
_last_result = result
logger.info(
"Exécution DAG terminée : success=%s, durée=%.2fs",
result.success,
result.duration_seconds,
)
return jsonify({
'success': True,
'execution': result.to_dict(),
})
except ValueError as e:
return jsonify({
'success': False,
'error': f"Erreur de validation : {str(e)}"
}), 400
except Exception as e:
traceback.print_exc()
return jsonify({
'success': False,
'error': f"Erreur d'exécution : {str(e)}"
}), 500
@api_v3_bp.route('/workflow/<workflow_id>/dag-status', methods=['GET'])
def get_dag_status(workflow_id: str):
"""
Retourne le statut de la dernière exécution DAG.
Response :
{
"success": true,
"status": { "steps": {...}, "results": {...}, "summary": {...} }
}
"""
global _current_executor, _last_result
try:
# Si une exécution est terminée, retourner le résultat
if _last_result is not None:
return jsonify({
'success': True,
'completed': True,
'status': _last_result.to_dict(),
})
# Si un exécuteur est en cours, retourner son état
if _current_executor is not None:
return jsonify({
'success': True,
'completed': False,
'status': _current_executor.get_status(),
})
return jsonify({
'success': True,
'completed': False,
'status': None,
'message': "Aucune exécution DAG en cours",
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500