Files
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

426 lines
13 KiB
Python

"""
Analytics API endpoints for Visual Workflow Builder.
Provides analytics data and metrics for workflows executed through the visual builder.
Exigence: 18.3
"""
import sys
from pathlib import Path
from flask import Blueprint, request, jsonify
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
# Ajouter le chemin racine pour importer les modules core
sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent))
try:
from core.analytics.analytics_system import get_analytics_system
from core.analytics.integration.execution_integration import get_analytics_integration
ANALYTICS_AVAILABLE = True
except ImportError:
ANALYTICS_AVAILABLE = False
from services.execution_integration import get_executor
from services.serialization import WorkflowDatabase
# Blueprint pour les endpoints Analytics
analytics_bp = Blueprint('analytics', __name__)
@analytics_bp.route('/workflow/<workflow_id>/metrics', methods=['GET'])
def get_workflow_metrics(workflow_id: str):
    """
    Return the analytics metrics recorded for one workflow.

    Requirement: 18.3

    Query Parameters:
        - hours: Time window in hours, positive integer (default: 24)
        - metric_type: Metric type (execution, step, performance),
          default 'execution'. It is echoed back in the response but is
          not forwarded to the executor.

    Returns:
        A Flask JSON response:
        - 200 with the analytics data under 'data'
        - 400 when 'hours' is not a positive integer
        - 404 when the executor returns None for this workflow
        - 503 when the analytics modules could not be imported
        - 500 on any other error (message taken from the exception)
    """
    try:
        if not ANALYTICS_AVAILABLE:
            return jsonify({
                'success': False,
                'error': 'Analytics system not available'
            }), 503

        # A malformed or non-positive window is a client mistake: answer 400
        # instead of letting int() raise into the generic 500 handler.
        try:
            hours = int(request.args.get('hours', 24))
        except (TypeError, ValueError):
            hours = 0
        if hours <= 0:
            return jsonify({
                'success': False,
                'error': "Query parameter 'hours' must be a positive integer"
            }), 400

        metric_type = request.args.get('metric_type', 'execution')

        # Fetch the metrics through the executor
        executor = get_executor()
        analytics_data = executor.get_workflow_analytics(workflow_id, hours)

        if analytics_data is None:
            return jsonify({
                'success': False,
                'error': 'No analytics data available'
            }), 404

        return jsonify({
            'success': True,
            'workflow_id': workflow_id,
            'time_window_hours': hours,
            'metric_type': metric_type,
            'data': analytics_data
        })
    except Exception as e:
        # Top-level boundary of the endpoint: report any failure as JSON.
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@analytics_bp.route('/workflow/<workflow_id>/performance', methods=['GET'])
def get_workflow_performance(workflow_id: str):
    """
    Return performance statistics and success rate for one workflow.

    Requirement: 18.3

    Query Parameters:
        - hours: Time window in hours, positive integer (default: 24)

    Returns:
        A Flask JSON response:
        - 200 with 'performance' and 'success_rate' (each is the result of
          ``to_dict()`` or None when the analyzer returned a falsy value)
        - 400 when 'hours' is not a positive integer
        - 503 when the analytics modules could not be imported
        - 500 on any other error (message taken from the exception)
    """
    try:
        if not ANALYTICS_AVAILABLE:
            return jsonify({
                'success': False,
                'error': 'Analytics system not available'
            }), 503

        # A malformed or non-positive window is a client mistake: answer 400
        # instead of letting int() raise into the generic 500 handler.
        try:
            hours = int(request.args.get('hours', 24))
        except (TypeError, ValueError):
            hours = 0
        if hours <= 0:
            return jsonify({
                'success': False,
                'error': "Query parameter 'hours' must be a positive integer"
            }), 400

        analytics_system = get_analytics_system()

        # Compute the time window [now - hours, now]
        end_time = datetime.now()
        start_time = end_time - timedelta(hours=hours)

        # Analyze performance over the window
        performance_stats = analytics_system.performance_analyzer.analyze_performance(
            workflow_id=workflow_id,
            start_time=start_time,
            end_time=end_time
        )

        # Compute the success rate over the same number of hours
        success_stats = analytics_system.success_rate_calculator.calculate_success_rate(
            workflow_id=workflow_id,
            time_window_hours=hours
        )

        return jsonify({
            'success': True,
            'workflow_id': workflow_id,
            'time_window_hours': hours,
            'performance': performance_stats.to_dict() if performance_stats else None,
            'success_rate': success_stats.to_dict() if success_stats else None
        })
    except Exception as e:
        # Top-level boundary of the endpoint: report any failure as JSON.
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@analytics_bp.route('/workflow/<workflow_id>/executions', methods=['GET'])
def get_workflow_executions(workflow_id: str):
    """
    Return the execution history of one workflow.

    Requirement: 18.3

    Every execution record that carries a truthy 'analytics_data' entry is
    enriched in place with a 'calculated_metrics' entry before the list is
    serialized.

    Returns:
        A Flask JSON response: 200 with 'executions' and 'total_count',
        or 500 with the exception message on any error.
    """
    try:
        history = get_executor().list_executions(workflow_id=workflow_id)

        for record in history:
            # Only records that have analytics data get derived metrics.
            if not record.get('analytics_data'):
                continue
            record['calculated_metrics'] = _calculate_execution_metrics(record)

        payload = {
            'success': True,
            'workflow_id': workflow_id,
            'executions': history,
            'total_count': len(history)
        }
        return jsonify(payload)
    except Exception as exc:
        failure = {
            'success': False,
            'error': str(exc)
        }
        return jsonify(failure), 500
@analytics_bp.route('/dashboard/workflows', methods=['GET'])
def get_workflows_dashboard():
    """
    Return the dashboard data for every stored workflow.

    Requirement: 18.3

    Query Parameters:
        - hours: Time window in hours, positive integer (default: 24)

    Returns:
        A Flask JSON response:
        - 200 with a 'dashboard' object holding a 'summary' and one entry
          per workflow; a workflow whose metrics could not be collected is
          still listed, with an 'error' message instead of metrics
        - 400 when 'hours' is not a positive integer
        - 503 when the analytics modules could not be imported or the
          analytics system could not be obtained
        - 500 on database errors or any other failure
    """
    try:
        if not ANALYTICS_AVAILABLE:
            return jsonify({
                'success': False,
                'error': 'Analytics system not available'
            }), 503

        # A malformed or non-positive window is a client mistake: answer 400
        # instead of letting int() raise into the generic 500 handler.
        try:
            hours = int(request.args.get('hours', 24))
        except (TypeError, ValueError):
            hours = 0
        if hours <= 0:
            return jsonify({
                'success': False,
                'error': "Query parameter 'hours' must be a positive integer"
            }), 400

        # Load all workflows
        try:
            db = WorkflowDatabase()
            all_workflows = db.list_workflows()
        except Exception as e:
            return jsonify({
                'success': False,
                'error': f'Database error: {str(e)}'
            }), 500

        dashboard_data = {
            'summary': {
                'total_workflows': len(all_workflows),
                'time_window_hours': hours,
                'generated_at': datetime.now().isoformat()
            },
            'workflows': []
        }

        try:
            analytics_system = get_analytics_system()
        except Exception as e:
            return jsonify({
                'success': False,
                'error': f'Analytics system error: {str(e)}'
            }), 503

        # The window is the same for every workflow: compute it once so all
        # entries of a single response cover exactly the same interval.
        end_time = datetime.now()
        start_time = end_time - timedelta(hours=hours)

        # Collect the metrics of each workflow
        for workflow_info in all_workflows:
            workflow_id = workflow_info['workflow_id']
            try:
                # Performance metrics
                performance_stats = analytics_system.performance_analyzer.analyze_performance(
                    workflow_id=workflow_id,
                    start_time=start_time,
                    end_time=end_time
                )
                success_stats = analytics_system.success_rate_calculator.calculate_success_rate(
                    workflow_id=workflow_id,
                    time_window_hours=hours
                )

                # Recent executions. The executor is obtained inside the
                # per-workflow try block so that a failure here is reported
                # on the workflow entry rather than failing the whole request.
                executor = get_executor()
                # First 5 entries; assumes list_executions returns the most
                # recent executions first - TODO confirm against the executor.
                recent_executions = executor.list_executions(workflow_id=workflow_id)[:5]

                dashboard_data['workflows'].append({
                    'workflow_id': workflow_id,
                    'name': workflow_info.get('name', 'Unnamed Workflow'),
                    'performance': performance_stats.to_dict() if performance_stats else None,
                    'success_rate': success_stats.to_dict() if success_stats else None,
                    'recent_executions': recent_executions,
                    'last_execution': recent_executions[0] if recent_executions else None
                })
            except Exception as e:
                # Keep going even if one workflow fails
                dashboard_data['workflows'].append({
                    'workflow_id': workflow_id,
                    'name': workflow_info.get('name', 'Unnamed Workflow'),
                    'error': str(e)
                })

        return jsonify({
            'success': True,
            'dashboard': dashboard_data
        })
    except Exception as e:
        # Top-level boundary of the endpoint: report any failure as JSON.
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def _parse_execution_start_time(raw_value):
    """Parse an ISO-8601 start time into a naive local datetime, or return None if it is missing or unparsable."""
    if not raw_value:
        return None
    try:
        parsed = datetime.fromisoformat(raw_value)
    except (TypeError, ValueError):
        return None
    if parsed.tzinfo is not None:
        # Aware and naive datetimes cannot be compared; convert to local
        # time and drop the tzinfo so the value is comparable with
        # datetime.now().
        parsed = parsed.astimezone().replace(tzinfo=None)
    return parsed


@analytics_bp.route('/dashboard/summary', methods=['GET'])
def get_dashboard_summary():
    """
    Return a global summary of execution metrics over a time window.

    Requirement: 18.3

    Query Parameters:
        - hours: Time window in hours, positive integer (default: 24)

    The summary is computed from the executor's execution list: only
    executions whose 'start_time' parses and falls at or after
    ``now - hours`` are counted. Executions with a missing or unparsable
    'start_time' are ignored instead of failing the whole request.
    The average duration only considers 'completed' executions that have a
    truthy 'duration_ms'.

    Returns:
        A Flask JSON response:
        - 200 with a 'summary' object
        - 400 when 'hours' is not a positive integer
        - 503 when the analytics modules could not be imported
        - 500 on any other error (message taken from the exception)
    """
    try:
        if not ANALYTICS_AVAILABLE:
            return jsonify({
                'success': False,
                'error': 'Analytics system not available'
            }), 503

        # A malformed or non-positive window is a client mistake: answer 400
        # instead of letting int() raise into the generic 500 handler.
        try:
            hours = int(request.args.get('hours', 24))
        except (TypeError, ValueError):
            hours = 0
        if hours <= 0:
            return jsonify({
                'success': False,
                'error': "Query parameter 'hours' must be a positive integer"
            }), 400

        # The returned object is not used below; the call is kept so that a
        # failure to obtain the analytics system still fails this endpoint.
        # NOTE(review): presumably this also initializes the system - confirm.
        get_analytics_system()
        executor = get_executor()

        # Lower bound of the window
        window_start = datetime.now() - timedelta(hours=hours)

        # Keep only the executions started inside the window
        recent_executions = []
        for record in executor.list_executions():
            started_at = _parse_execution_start_time(record.get('start_time'))
            if started_at is not None and started_at >= window_start:
                recent_executions.append(record)

        successful_executions = [
            record for record in recent_executions
            if record.get('status') == 'completed'
        ]
        failed_executions = [
            record for record in recent_executions
            if record.get('status') == 'failed'
        ]

        # Summary metrics
        total_executions = len(recent_executions)
        success_rate = (len(successful_executions) / total_executions * 100) if total_executions > 0 else 0

        # Average duration of successful executions that report one
        durations = [
            record['duration_ms'] for record in successful_executions
            if record.get('duration_ms')
        ]
        avg_duration = sum(durations) / len(durations) if durations else 0

        summary = {
            'time_window_hours': hours,
            'total_executions': total_executions,
            'successful_executions': len(successful_executions),
            'failed_executions': len(failed_executions),
            'success_rate_percent': round(success_rate, 2),
            'average_duration_ms': round(avg_duration, 2),
            'generated_at': datetime.now().isoformat()
        }

        return jsonify({
            'success': True,
            'summary': summary
        })
    except Exception as e:
        # Top-level boundary of the endpoint: report any failure as JSON.
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
@analytics_bp.route('/insights', methods=['GET'])
def get_analytics_insights():
    """
    Return the automatically generated analytics insights.

    Requirement: 18.3

    Query Parameters:
        - hours: Time window in hours, positive integer
          (default: 168, i.e. one week)

    Returns:
        A Flask JSON response:
        - 200 with 'insights' (list of ``to_dict()`` results)
        - 400 when 'hours' is not a positive integer
        - 503 when the analytics modules could not be imported or the
          analytics system could not be obtained
        - 500 when insight generation or anything else fails
    """
    try:
        if not ANALYTICS_AVAILABLE:
            return jsonify({
                'success': False,
                'error': 'Analytics system not available'
            }), 503

        # A malformed or non-positive window is a client mistake: answer 400
        # instead of letting int() raise into the generic 500 handler.
        try:
            hours = int(request.args.get('hours', 168))  # one week by default
        except (TypeError, ValueError):
            hours = 0
        if hours <= 0:
            return jsonify({
                'success': False,
                'error': "Query parameter 'hours' must be a positive integer"
            }), 400

        try:
            analytics_system = get_analytics_system()
        except Exception as e:
            return jsonify({
                'success': False,
                'error': f'Analytics system error: {str(e)}'
            }), 503

        end_time = datetime.now()
        start_time = end_time - timedelta(hours=hours)

        # Generate the insights
        try:
            insights = analytics_system.insight_generator.generate_insights(
                start_time=start_time,
                end_time=end_time
            )
        except Exception as e:
            return jsonify({
                'success': False,
                'error': f'Insights generation error: {str(e)}'
            }), 500

        return jsonify({
            'success': True,
            'time_window_hours': hours,
            # Treat a None result as "no insights" rather than failing.
            'insights': [insight.to_dict() for insight in (insights or [])],
            'generated_at': datetime.now().isoformat()
        })
    except Exception as e:
        # Top-level boundary of the endpoint: report any failure as JSON.
        return jsonify({
            'success': False,
            'error': str(e)
        }), 500
def _calculate_execution_metrics(execution: Dict[str, Any]) -> Dict[str, Any]:
"""
Calcule des métriques supplémentaires pour une exécution.
Args:
execution: Données d'exécution
Returns:
Métriques calculées
"""
metrics = {}
try:
# Efficacité (steps completed / steps total)
steps_completed = execution.get('steps_completed', 0)
steps_total = execution.get('steps_total', 0)
if steps_total > 0:
metrics['efficiency_percent'] = round((steps_completed / steps_total) * 100, 2)
# Vitesse (steps par seconde)
duration_ms = execution.get('duration_ms', 0)
if duration_ms > 0 and steps_completed > 0:
duration_sec = duration_ms / 1000
metrics['steps_per_second'] = round(steps_completed / duration_sec, 2)
# Statut de santé
if execution.get('status') == 'completed':
metrics['health_status'] = 'healthy'
elif execution.get('status') == 'failed':
metrics['health_status'] = 'unhealthy'
else:
metrics['health_status'] = 'unknown'
except Exception as e:
metrics['calculation_error'] = str(e)
return metrics
# Registration hook for the analytics blueprint
def register_analytics_blueprint(app):
    """
    Attach ``analytics_bp`` to the given Flask application.

    Args:
        app: Object exposing ``register_blueprint``; the blueprint is
            registered with no extra options.
    """
    app.register_blueprint(analytics_bp)