Files
rpa_vision_v3/visual_workflow_builder/test_analytics_integration.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

326 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Test script for Analytics Integration - Visual Workflow Builder
Tests the analytics integration functionality including:
- Analytics API endpoints
- Metrics collection during execution
- Dashboard data retrieval
Exigence: 18.3
"""
import sys
import os
import time
import json
import requests
from pathlib import Path
# Put the directory two levels above this file at the front of sys.path so
# that imports are resolved against it first.
sys.path.insert(0, str(Path(__file__).parent.parent))
def _summary_lines(data):
    """Return the detail lines printed for a dashboard-summary payload."""
    summary = data.get('summary', {})
    return [
        f" 📊 Exécutions totales: {summary.get('total_executions', 0)}",
        f" 📈 Taux de succès: {summary.get('success_rate_percent', 0)}%",
    ]


def _workflow_lines(data):
    """Return the detail lines printed for a workflows-dashboard payload."""
    workflows = data.get('dashboard', {}).get('workflows', [])
    return [f" 📋 Nombre de workflows: {len(workflows)}"]


def _insight_lines(data):
    """Return the detail lines printed for an insights payload."""
    insights = data.get('insights', [])
    return [f" 💡 Nombre d'insights: {len(insights)}"]


def _probe_analytics_endpoint(url, ok_label, detail_lines,
                              unavailable_note="", timeout=10):
    """GET one analytics endpoint and print a human-readable report.

    Args:
        url: Full URL of the endpoint to query.
        ok_label: Label printed next to the payload's ``success`` flag on
            an HTTP 200 response.
        detail_lines: Callable taking the decoded JSON payload and returning
            the extra lines to print when ``success`` is truthy.
        unavailable_note: Suffix appended to the HTTP 503 message.
        timeout: Timeout in seconds passed to ``requests.get``.
    """
    try:
        # Without a timeout, requests waits indefinitely on a stalled server.
        response = requests.get(url, timeout=timeout)
        print(f" Status: {response.status_code}")
        if response.status_code == 200:
            data = response.json()
            print(f" ✅ {ok_label}: {data.get('success', False)}")
            if data.get('success'):
                for line in detail_lines(data):
                    print(line)
        elif response.status_code == 503:
            print(f" ⚠️ Analytics service non disponible{unavailable_note}")
        else:
            print(f" ❌ Erreur: {response.text}")
    except Exception as e:
        # Deliberately broad: this is a best-effort diagnostic, so any failure
        # (connection, timeout, invalid JSON) is reported and the remaining
        # probes still run.
        print(f" ❌ Erreur de connexion: {e}")


def test_analytics_endpoints(base_url="http://localhost:5002", timeout=10):
    """Query the read-only analytics endpoints and print what they return.

    Three endpoints are probed in order: the dashboard summary, the
    workflows dashboard and the insights list. Nothing is asserted; each
    outcome (200, 503, other status, exception) is only printed.

    Args:
        base_url: Scheme, host and port of the backend to query.
        timeout: Per-request timeout in seconds.
    """
    print("🧪 Test des endpoints Analytics...")

    # (section title, path, success label, detail formatter, 503 suffix)
    probes = (
        ("\n1. Test du résumé du dashboard...",
         "/api/analytics/dashboard/summary?hours=24",
         "Résumé récupéré", _summary_lines, " (normal si pas configuré)"),
        ("\n2. Test du dashboard des workflows...",
         "/api/analytics/dashboard/workflows?hours=24",
         "Dashboard récupéré", _workflow_lines, ""),
        ("\n3. Test des insights...",
         "/api/analytics/insights?hours=168",
         "Insights récupérés", _insight_lines, ""),
    )
    for title, path, ok_label, detail_lines, note in probes:
        print(title)
        _probe_analytics_endpoint(
            f"{base_url}{path}", ok_label, detail_lines, note, timeout
        )
def _build_test_workflow():
    """Return the start -> click -> end workflow payload posted by the test."""
    return {
        "name": "Test Analytics Workflow",
        "description": "Workflow de test pour les analytics",
        "nodes": [
            {
                "id": "start",
                "type": "start",
                "position": {"x": 100, "y": 100},
                "label": "Début",
                "parameters": {},
                "input_ports": [],
                "output_ports": [{"id": "out", "name": "Output", "type": "output"}]
            },
            {
                "id": "action1",
                "type": "click",
                "position": {"x": 300, "y": 100},
                "label": "Clic Test",
                "parameters": {
                    "target": "Button Test",
                    "timeout": 5000
                },
                "input_ports": [{"id": "in", "name": "Input", "type": "input"}],
                "output_ports": [{"id": "out", "name": "Output", "type": "output"}]
            },
            {
                "id": "end",
                "type": "end",
                "position": {"x": 500, "y": 100},
                "label": "Fin",
                "parameters": {},
                "input_ports": [{"id": "in", "name": "Input", "type": "input"}],
                "output_ports": []
            }
        ],
        "edges": [
            {
                "id": "edge1",
                "source": "start",
                "target": "action1",
                "source_port": "out",
                "target_port": "in"
            },
            {
                "id": "edge2",
                "source": "action1",
                "target": "end",
                "source_port": "out",
                "target_port": "in"
            }
        ],
        "variables": {},
        "metadata": {
            "created_at": "2024-12-14T00:00:00Z",
            "version": "1.0.0"
        }
    }


def _wait_for_execution(base_url, execution_id, max_wait, poll_interval, timeout):
    """Poll an execution until it reaches a terminal status or time runs out.

    Returns:
        The last status read from a 200 response, or None if no status was
        ever read. The caller decides whether that status is terminal.
    """
    terminal_statuses = ('completed', 'failed', 'cancelled')
    status = None
    # A monotonic deadline also accounts for time spent inside the requests,
    # not only for the sleeps between them.
    deadline = time.monotonic() + max_wait
    while time.monotonic() < deadline:
        response = requests.get(
            f"{base_url}/api/executions/{execution_id}", timeout=timeout
        )
        if response.status_code == 200:
            status = response.json().get('execution', {}).get('status')
            print(f" 📊 Statut: {status}")
            if status in terminal_statuses:
                break
        time.sleep(poll_interval)
    return status


def test_workflow_execution_with_analytics(base_url="http://localhost:5002",
                                           timeout=10, max_wait=30,
                                           poll_interval=2):
    """Save and run a small workflow, then print the analytics recorded for it.

    Steps: POST the workflow, POST an execution for it, poll the execution
    until it finishes (or ``max_wait`` elapses), then fetch the workflow's
    performance metrics and execution history. Nothing is asserted; every
    outcome is printed, and the function returns early when the workflow
    cannot be saved or started.

    Args:
        base_url: Scheme, host and port of the backend to query.
        timeout: Per-request timeout in seconds.
        max_wait: Maximum number of seconds to wait for the execution to end.
        poll_interval: Seconds to sleep between two status polls.
    """
    print("\n🚀 Test d'exécution avec Analytics...")
    test_workflow = _build_test_workflow()
    json_headers = {"Content-Type": "application/json"}

    try:
        # 1. Save the workflow
        print("\n1. Sauvegarde du workflow de test...")
        response = requests.post(
            f"{base_url}/api/workflows",
            json=test_workflow,
            headers=json_headers,
            timeout=timeout,
        )
        if response.status_code != 201:
            print(f" ❌ Erreur sauvegarde: {response.text}")
            return
        workflow_id = response.json().get('workflow_id')
        if workflow_id is None:
            # Without an id every following URL would target ".../None/...".
            print(f" ❌ Erreur sauvegarde: réponse sans workflow_id ({response.text})")
            return
        print(f" ✅ Workflow sauvegardé: {workflow_id}")

        # 2. Start an execution
        print("\n2. Exécution du workflow...")
        response = requests.post(
            f"{base_url}/api/executions",
            json={
                "workflow_id": workflow_id,
                "variables": {}
            },
            headers=json_headers,
            timeout=timeout,
        )
        if response.status_code != 200:
            print(f" ❌ Erreur exécution: {response.text}")
            return
        execution_id = response.json().get('execution_id')
        if execution_id is None:
            print(f" ❌ Erreur exécution: réponse sans execution_id ({response.text})")
            return
        print(f" ✅ Exécution démarrée: {execution_id}")

        # 3. Wait for the execution to finish
        print("\n3. Attente de la fin d'exécution...")
        final_status = _wait_for_execution(
            base_url, execution_id, max_wait, poll_interval, timeout
        )
        if final_status not in ('completed', 'failed', 'cancelled'):
            # Make the timeout visible; the metrics below may be incomplete.
            print(f" ⚠️ Exécution non terminée après {max_wait}s "
                  f"(dernier statut: {final_status})")

        # 4. Check the workflow's performance metrics
        print("\n4. Vérification des métriques...")
        response = requests.get(
            f"{base_url}/api/analytics/workflow/{workflow_id}/performance?hours=1",
            timeout=timeout,
        )
        if response.status_code == 200:
            data = response.json()
            print(f" ✅ Métriques récupérées: {data.get('success', False)}")
            if data.get('success'):
                perf = data.get('performance')
                success_rate = data.get('success_rate')
                if perf:
                    print(f" 📈 Durée moyenne: {perf.get('avg_duration_ms', 0)}ms")
                    print(f" 📊 Exécutions: {perf.get('total_executions', 0)}")
                if success_rate:
                    print(f" ✅ Taux de succès: {success_rate.get('success_rate', 0)}%")
        elif response.status_code == 503:
            print(" ⚠️ Analytics service non disponible")
        else:
            print(f" ❌ Erreur métriques: {response.text}")

        # 5. Check the execution history
        print("\n5. Vérification de l'historique...")
        response = requests.get(
            f"{base_url}/api/analytics/workflow/{workflow_id}/executions",
            timeout=timeout,
        )
        if response.status_code == 200:
            data = response.json()
            print(f" ✅ Historique récupéré: {data.get('success', False)}")
            if data.get('success'):
                executions = data.get('executions', [])
                print(f" 📋 Nombre d'exécutions: {len(executions)}")
                if executions:
                    latest = executions[0]
                    print(f" 🕐 Dernière exécution: {latest.get('status')} - {latest.get('duration_ms', 0)}ms")
        elif response.status_code == 503:
            # Same handling as the other analytics endpoints in this script.
            print(" ⚠️ Analytics service non disponible")
        else:
            print(f" ❌ Erreur historique: {response.text}")
    except Exception as e:
        # Deliberately broad: best-effort diagnostic, report and carry on.
        print(f" ❌ Erreur générale: {e}")
def test_frontend_integration(base_dir=None):
    """Report whether the frontend analytics files exist on disk.

    Looks under ``<base_dir>/frontend/src`` for the AnalyticsDashboard and
    MetricsDisplay components, the ``useAnalytics`` hook and ``App.tsx``,
    and prints one ✅/❌ line per check. Nothing is asserted.

    Args:
        base_dir: Directory containing the ``frontend`` folder. Defaults to
            the directory of this file.
    """
    print("\n🎨 Test de l'intégration frontend...")

    root = Path(__file__).parent if base_dir is None else Path(base_dir)
    src_dir = root / "frontend" / "src"

    def mark(ok):
        # The original expressions had two identical empty branches, so the
        # report never showed the outcome of a check.
        return '✅' if ok else '❌'

    # Analytics components
    components_dir = src_dir / "components"
    analytics_dashboard = components_dir / "AnalyticsDashboard" / "index.tsx"
    metrics_display = components_dir / "MetricsDisplay" / "index.tsx"
    print(f" 📁 AnalyticsDashboard: {mark(analytics_dashboard.exists())}")
    print(f" 📁 MetricsDisplay: {mark(metrics_display.exists())}")

    # Analytics hook
    analytics_hook = src_dir / "hooks" / "useAnalytics.ts"
    print(f" 🪝 useAnalytics hook: {mark(analytics_hook.exists())}")

    # Integration in App.tsx
    app_file = src_dir / "App.tsx"
    if app_file.exists():
        # Explicit encoding: the locale default may not be UTF-8. Undecodable
        # bytes are replaced because only substring checks follow.
        content = app_file.read_text(encoding="utf-8", errors="replace")
        has_analytics_import = "AnalyticsDashboard" in content
        # NOTE(review): "Analytics" is a substring of "AnalyticsDashboard", so
        # this check is true whenever the import check is true.
        has_analytics_button = "Analytics" in content
        print(f" 📱 Import AnalyticsDashboard: {mark(has_analytics_import)}")
        print(f" 🔘 Bouton Analytics: {mark(has_analytics_button)}")
    else:
        print(" ❌ App.tsx non trouvé")
def main():
    """Run every analytics integration check, then print a recap.

    The backend health endpoint is queried first; if it cannot be reached
    or does not answer with HTTP 200, a message is printed and no check is
    run.
    """
    separator = "=" * 60
    print(separator)
    print("🧪 TEST ANALYTICS INTEGRATION - Visual Workflow Builder")
    print(separator)

    # Make sure the backend server is up before running anything
    try:
        health = requests.get("http://localhost:5002/health", timeout=5)
    except Exception as exc:
        print(f"❌ Impossible de se connecter au serveur: {exc}")
        print("💡 Assurez-vous que le serveur backend est démarré sur le port 5002")
        return

    if health.status_code != 200:
        print("❌ Serveur backend non accessible")
        return
    print("✅ Serveur backend accessible")

    # Run the checks in order
    for check in (
        test_analytics_endpoints,
        test_workflow_execution_with_analytics,
        test_frontend_integration,
    ):
        check()

    print("\n" + separator)
    print("✅ TESTS ANALYTICS INTEGRATION TERMINÉS")
    print(separator)

    recap = (
        "\n📋 Résumé de l'intégration Analytics:",
        " • Endpoints API Analytics créés",
        " • Hooks de collecte de métriques ajoutés",
        " • Composants frontend Analytics développés",
        " • Dashboard de métriques intégré",
        " • Affichage temps réel des métriques",
        "\n🚀 Pour tester complètement:",
        " 1. Démarrez le serveur: cd backend && python app.py",
        " 2. Démarrez le frontend: cd frontend && npm start",
        " 3. Créez et exécutez des workflows",
        " 4. Consultez le dashboard Analytics",
    )
    for line in recap:
        print(line)
# Run the checks only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()