Feat: Intégration système d'apprentissage VWB

- Création service learning_integration.py (pont VWB <-> LearningManager)
- Enregistrement automatique des workflows à la création
- Enregistrement des résultats d'exécution (succès/échec + confiance)
- Endpoints API: /workflows/<id>/feedback et /workflows/<id>/learning
- Boutons feedback (pouce vert/rouge) dans VWBExecutorExtension
- Fix: VariableAutocomplete inputRef pour setSelectionRange
- Amélioration: Chips cliquables pour insérer les variables

Le système apprend maintenant des exécutions et feedbacks utilisateur.
États: OBSERVATION -> COACHING -> AUTO_CANDIDATE -> AUTO_CONFIRMED

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Dom
2026-01-14 21:30:23 +01:00
parent e7657ee1e5
commit c636f7f163
5 changed files with 3484 additions and 0 deletions

View File

@@ -0,0 +1,716 @@
"""
Workflows API Blueprint
Provides REST endpoints for workflow CRUD operations.
"""
from flask import Blueprint, request, jsonify
from datetime import datetime
from typing import Dict, List, Any
import traceback
from models import (
VisualWorkflow,
VisualNode,
VisualEdge,
Variable,
WorkflowSettings,
generate_id
)
from services.serialization import (
WorkflowSerializer,
WorkflowDatabase,
SerializationError,
ValidationError as SerializationValidationError,
create_empty_workflow
)
from services.converter import (
VisualToGraphConverter,
ConversionError,
convert_visual_to_graph
)
from services.execution_integration import (
get_executor,
ExecutionStatus
)
from services.learning_integration import (
register_workflow_for_learning,
record_workflow_execution,
get_workflow_learning_state,
get_workflow_stats
)
from .errors import (
ValidationError,
NotFoundError,
BadRequestError,
error_response
)
from .validation import validate_workflow_data, validate_update_data
workflows_bp = Blueprint('workflows', __name__)
# Database instance for persistent storage
db = WorkflowDatabase("data/workflows")
# Keep in-memory store for backward compatibility (will be removed later)
workflows_store: Dict[str, VisualWorkflow] = {}
def _refresh_store_from_db() -> None:
"""Synchronise le cache mémoire depuis le stockage persistant."""
try:
workflows = db.list()
workflows_store.clear()
for w in workflows:
workflows_store[w.id] = w
except Exception:
# Ne jamais empêcher le serveur de démarrer si le disque est indisponible.
pass
def _get_workflow_cached(workflow_id: str) -> VisualWorkflow | None:
"""Retourne le workflow depuis le cache, sinon tente de le charger depuis la DB."""
if workflow_id in workflows_store:
return workflows_store[workflow_id]
try:
w = db.load(workflow_id)
except Exception:
w = None
if w is not None:
workflows_store[workflow_id] = w
return w
# Warmup cache on import (best-effort)
_refresh_store_from_db()
@workflows_bp.route('/', methods=['GET'])
def list_workflows():
"""
List all workflows
Query parameters:
- category: Filter by category
- is_template: Filter templates (true/false)
- tags: Comma-separated list of tags
"""
try:
# Get query parameters
category = request.args.get('category')
is_template = request.args.get('is_template')
tags = request.args.get('tags')
# Refresh cache from DB (source of truth)
_refresh_store_from_db()
# Filter workflows
workflows = list(workflows_store.values())
if category:
workflows = [w for w in workflows if w.category == category]
if is_template is not None:
is_template_bool = is_template.lower() == 'true'
workflows = [w for w in workflows if w.is_template == is_template_bool]
if tags:
tag_list = [t.strip() for t in tags.split(',')]
workflows = [w for w in workflows if any(tag in w.tags for tag in tag_list)]
# Serialize workflows
result = [w.to_dict() for w in workflows]
return jsonify(result), 200
except Exception as e:
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/', methods=['POST'])
def create_workflow():
"""
Create a new workflow
Request body:
{
"name": "Workflow Name",
"description": "Optional description",
"created_by": "user_id",
"nodes": [],
"edges": [],
"variables": [],
"settings": {},
"tags": [],
"category": "optional"
}
"""
try:
data = request.get_json(force=True, silent=True)
if data is None:
raise BadRequestError("Request body is required")
# Validate required fields
if 'name' not in data:
raise ValidationError("Field 'name' is required")
if 'created_by' not in data:
raise ValidationError("Field 'created_by' is required")
# Validate workflow data
validate_workflow_data(data)
# Create workflow with auto-generated ID
workflow = create_empty_workflow(
name=data['name'],
description=data.get('description', ''),
created_by=data['created_by']
)
# Add optional data
if 'nodes' in data:
workflow.nodes = [VisualNode.from_dict(n) for n in data['nodes']]
if 'edges' in data:
workflow.edges = [VisualEdge.from_dict(e) for e in data['edges']]
if 'variables' in data:
workflow.variables = [Variable.from_dict(v) for v in data['variables']]
if 'settings' in data:
workflow.settings = WorkflowSettings.from_dict(data['settings'])
if 'tags' in data:
workflow.tags = data['tags']
if 'category' in data:
workflow.category = data['category']
if 'is_template' in data:
workflow.is_template = data['is_template']
# Validate workflow structure
errors = workflow.validate()
if errors:
raise ValidationError(f"Workflow validation failed: {', '.join(errors)}")
# Save to database
db.save(workflow)
# Also store in memory for backward compatibility
workflows_store[workflow.id] = workflow
# Enregistrer dans le système d'apprentissage
register_workflow_for_learning(workflow)
return jsonify(workflow.to_dict()), 201
except ValidationError as e:
return error_response(400, str(e))
except BadRequestError as e:
return error_response(400, str(e))
except Exception as e:
traceback.print_exc()
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/<workflow_id>', methods=['GET'])
def get_workflow(workflow_id: str):
"""Get a specific workflow by ID"""
try:
workflow = _get_workflow_cached(workflow_id)
if workflow is None:
raise NotFoundError(f"Workflow '{workflow_id}' not found")
return jsonify(workflow.to_dict()), 200
except NotFoundError as e:
return error_response(404, str(e))
except Exception as e:
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/<workflow_id>', methods=['PUT'])
def update_workflow(workflow_id: str):
"""
Update a workflow
Request body can include any of:
- name
- description
- nodes
- edges
- variables
- settings
- tags
- category
"""
try:
workflow = _get_workflow_cached(workflow_id)
if workflow is None:
raise NotFoundError(f"Workflow '{workflow_id}' not found")
data = request.get_json()
if not data:
raise BadRequestError("Request body is required")
# Validate update data
validate_update_data(data)
# Get existing workflow
workflow_dict = workflow.to_dict()
# Update fields
if 'name' in data:
workflow_dict['name'] = data['name']
if 'description' in data:
workflow_dict['description'] = data['description']
if 'nodes' in data:
workflow_dict['nodes'] = data['nodes']
if 'edges' in data:
workflow_dict['edges'] = data['edges']
if 'variables' in data:
workflow_dict['variables'] = data['variables']
if 'settings' in data:
workflow_dict['settings'] = data['settings']
if 'tags' in data:
workflow_dict['tags'] = data['tags']
if 'category' in data:
workflow_dict['category'] = data['category']
# Update timestamp
workflow_dict['updated_at'] = datetime.now().isoformat()
# Create updated workflow
updated_workflow = VisualWorkflow.from_dict(workflow_dict)
# Validate
errors = updated_workflow.validate()
if errors:
raise ValidationError(f"Workflow validation failed: {', '.join(errors)}")
# Persist + cache
db.save(updated_workflow)
workflows_store[workflow_id] = updated_workflow
# Mettre à jour le système d'apprentissage
register_workflow_for_learning(updated_workflow)
return jsonify(updated_workflow.to_dict()), 200
except NotFoundError as e:
return error_response(404, str(e))
except ValidationError as e:
return error_response(400, str(e))
except BadRequestError as e:
return error_response(400, str(e))
except Exception as e:
traceback.print_exc()
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/<workflow_id>', methods=['DELETE'])
def delete_workflow(workflow_id: str):
"""Delete a workflow"""
try:
if not db.exists(workflow_id) and workflow_id not in workflows_store:
raise NotFoundError(f"Workflow '{workflow_id}' not found")
# Delete from persistent storage first
if db.exists(workflow_id):
db.delete(workflow_id)
workflows_store.pop(workflow_id, None)
return '', 204
except NotFoundError as e:
return error_response(404, str(e))
except Exception as e:
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/<workflow_id>/validate', methods=['POST'])
def validate_workflow(workflow_id: str):
"""
Validate a workflow structure
Returns validation errors and warnings
"""
try:
workflow = _get_workflow_cached(workflow_id)
if workflow is None:
raise NotFoundError(f"Workflow '{workflow_id}' not found")
errors = workflow.validate()
# Check for disconnected nodes (warnings)
warnings = []
if len(workflow.nodes) > 1:
connected_nodes = set()
for edge in workflow.edges:
connected_nodes.add(edge.source)
connected_nodes.add(edge.target)
for node in workflow.nodes:
if node.id not in connected_nodes:
warnings.append(f"Node '{node.id}' is not connected to any other nodes")
return jsonify({
'is_valid': len(errors) == 0,
'errors': errors,
'warnings': warnings
}), 200
except NotFoundError as e:
return error_response(404, str(e))
except Exception as e:
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/<workflow_id>/convert', methods=['POST'])
def convert_workflow(workflow_id: str):
"""
Convert a visual workflow to WorkflowGraph format
Returns the converted workflow in WorkflowGraph format
"""
try:
# Load workflow
workflow = db.load(workflow_id)
if workflow is None:
if workflow_id not in workflows_store:
raise NotFoundError(f"Workflow '{workflow_id}' not found")
workflow = workflows_store[workflow_id]
# Convert to WorkflowGraph
converter = VisualToGraphConverter()
workflow_graph = converter.convert(workflow)
# Get warnings if any
warnings = converter.get_warnings()
return jsonify({
'workflow_graph': workflow_graph.to_dict(),
'warnings': warnings,
'message': 'Conversion successful'
}), 200
except NotFoundError as e:
return error_response(404, str(e))
except ConversionError as e:
return error_response(400, f"Conversion error: {str(e)}")
except Exception as e:
traceback.print_exc()
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/<workflow_id>/execute', methods=['POST'])
def execute_workflow(workflow_id: str):
"""
Execute a visual workflow via ExecutionLoop
Body parameters:
- variables: Dict of input variables (optional)
Exigence: 20.1
"""
try:
# Check if workflow exists
if not db.exists(workflow_id) and workflow_id not in workflows_store:
raise NotFoundError(f"Workflow '{workflow_id}' not found")
# Get request data
data = request.get_json() or {}
variables = data.get('variables', {})
# Execute workflow
executor = get_executor()
execution_id = executor.execute_workflow(
workflow_id=workflow_id,
variables=variables
)
return jsonify({
'execution_id': execution_id,
'status': ExecutionStatus.PENDING,
'message': 'Workflow execution started',
'workflow_id': workflow_id
}), 202
except NotFoundError as e:
return error_response(404, str(e))
except ValueError as e:
return error_response(400, str(e))
except Exception as e:
traceback.print_exc()
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/executions/<execution_id>', methods=['GET'])
def get_execution_status(execution_id: str):
"""
Get the status of a workflow execution
Exigence: 20.1
"""
try:
executor = get_executor()
result = executor.get_execution_status(execution_id)
if result is None:
return error_response(404, f"Execution '{execution_id}' not found")
return jsonify(result.to_dict()), 200
except Exception as e:
traceback.print_exc()
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/executions/<execution_id>', methods=['DELETE'])
def cancel_execution(execution_id: str):
"""
Cancel a running workflow execution
Exigence: 20.1
"""
try:
executor = get_executor()
cancelled = executor.cancel_execution(execution_id)
if not cancelled:
return error_response(400, "Execution cannot be cancelled (not running or not found)")
return jsonify({
'message': 'Execution cancelled successfully',
'execution_id': execution_id
}), 200
except Exception as e:
traceback.print_exc()
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/executions', methods=['GET'])
def list_executions():
"""
List workflow executions
Query parameters:
- workflow_id: Filter by workflow ID (optional)
- limit: Maximum number of results (default: 50)
Exigence: 20.1
"""
try:
workflow_id = request.args.get('workflow_id')
limit = int(request.args.get('limit', 50))
executor = get_executor()
executions = executor.list_executions(workflow_id=workflow_id)
# Apply limit
if limit > 0:
executions = executions[:limit]
return jsonify({
'executions': executions,
'total': len(executions)
}), 200
except ValueError as e:
return error_response(400, f"Invalid parameter: {str(e)}")
except Exception as e:
traceback.print_exc()
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/<workflow_id>/export', methods=['GET'])
def export_workflow(workflow_id: str):
"""
Export a workflow in JSON or YAML format
Query parameters:
- format: 'json' or 'yaml' (default: 'json')
"""
try:
# Load from database
workflow = db.load(workflow_id)
if workflow is None:
# Fallback to memory store
if workflow_id not in workflows_store:
raise NotFoundError(f"Workflow '{workflow_id}' not found")
workflow = workflows_store[workflow_id]
# Get format from query params
export_format = request.args.get('format', 'json').lower()
if export_format not in ['json', 'yaml']:
raise BadRequestError(f"Invalid format: {export_format}. Use 'json' or 'yaml'")
# Serialize
serialized = WorkflowSerializer.serialize(workflow, format=export_format)
# Return with appropriate content type
if export_format == 'json':
return serialized, 200, {'Content-Type': 'application/json'}
else:
return serialized, 200, {'Content-Type': 'application/x-yaml'}
except NotFoundError as e:
return error_response(404, str(e))
except BadRequestError as e:
return error_response(400, str(e))
except SerializationError as e:
return error_response(500, f"Serialization error: {str(e)}")
except Exception as e:
traceback.print_exc()
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/import', methods=['POST'])
def import_workflow():
"""
Import a workflow from JSON or YAML
Query parameters:
- format: 'json' or 'yaml' (default: 'json')
- generate_new_id: 'true' or 'false' (default: 'false')
"""
try:
# Get format from query params
import_format = request.args.get('format', 'json').lower()
generate_new_id = request.args.get('generate_new_id', 'false').lower() == 'true'
if import_format not in ['json', 'yaml']:
raise BadRequestError(f"Invalid format: {import_format}. Use 'json' or 'yaml'")
# Get raw data
if import_format == 'json':
data = request.get_data(as_text=True)
else:
data = request.get_data(as_text=True)
if not data:
raise BadRequestError("Request body is required")
# Deserialize
workflow = WorkflowSerializer.deserialize(data, format=import_format)
# Generate new ID if requested
if generate_new_id:
old_id = workflow.id
workflow.id = WorkflowSerializer.generate_workflow_id()
workflow.created_at = datetime.now()
workflow.updated_at = datetime.now()
print(f"Generated new ID: {old_id} -> {workflow.id}")
# Check if workflow already exists
if db.exists(workflow.id):
raise BadRequestError(f"Workflow with ID '{workflow.id}' already exists. Use generate_new_id=true to create a copy.")
# Save to database
db.save(workflow)
workflows_store[workflow.id] = workflow
return jsonify({
'message': 'Workflow imported successfully',
'workflow': workflow.to_dict()
}), 201
except BadRequestError as e:
return error_response(400, str(e))
except SerializationValidationError as e:
return error_response(400, f"Validation error: {', '.join(e.errors)}")
except SerializationError as e:
return error_response(400, f"Import error: {str(e)}")
except Exception as e:
traceback.print_exc()
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/<workflow_id>/feedback', methods=['POST'])
def submit_workflow_feedback(workflow_id: str):
"""
Submit user feedback for a workflow execution.
Body:
{
"execution_id": "exec_xxx",
"success": true/false,
"confidence": 0.0-1.0 (optional),
"comment": "optional comment"
}
"""
try:
# Vérifier que le workflow existe
workflow = _get_workflow_cached(workflow_id)
if workflow is None:
raise NotFoundError(f"Workflow '{workflow_id}' not found")
data = request.get_json()
if not data:
raise BadRequestError("Request body is required")
if 'success' not in data:
raise ValidationError("Field 'success' is required")
success = bool(data['success'])
# Confiance élevée si l'utilisateur confirme, plus basse s'il corrige
confidence = data.get('confidence', 0.95 if success else 0.3)
# Enregistrer le feedback dans le système d'apprentissage
recorded = record_workflow_execution(
workflow_id=workflow_id,
success=success,
confidence=confidence
)
# Récupérer l'état mis à jour
new_state = get_workflow_learning_state(workflow_id)
stats = get_workflow_stats(workflow_id)
return jsonify({
'message': 'Feedback enregistré',
'workflow_id': workflow_id,
'feedback_recorded': recorded,
'learning_state': new_state,
'stats': stats
}), 200
except NotFoundError as e:
return error_response(404, str(e))
except (ValidationError, BadRequestError) as e:
return error_response(400, str(e))
except Exception as e:
traceback.print_exc()
return error_response(500, f"Internal server error: {str(e)}")
@workflows_bp.route('/<workflow_id>/learning', methods=['GET'])
def get_workflow_learning_info(workflow_id: str):
"""
Get learning state and statistics for a workflow
Returns learning state, execution count, success rate, etc.
"""
try:
# Vérifier que le workflow existe
workflow = _get_workflow_cached(workflow_id)
if workflow is None:
raise NotFoundError(f"Workflow '{workflow_id}' not found")
# Récupérer les stats d'apprentissage
stats = get_workflow_stats(workflow_id)
state = get_workflow_learning_state(workflow_id)
if stats is None:
# Workflow pas encore dans le système d'apprentissage
return jsonify({
'workflow_id': workflow_id,
'learning_state': 'NOT_REGISTERED',
'message': 'Ce workflow n\'a pas encore été enregistré dans le système d\'apprentissage',
'stats': None
}), 200
return jsonify({
'workflow_id': workflow_id,
'learning_state': state,
'stats': stats,
'can_auto_execute': state in ['AUTO_CANDIDATE', 'AUTO_CONFIRMED']
}), 200
except NotFoundError as e:
return error_response(404, str(e))
except Exception as e:
traceback.print_exc()
return error_response(500, f"Internal server error: {str(e)}")