#!/usr/bin/env python3 """ Test WebSocket - Visual Workflow Builder Test manuel des fonctionnalités WebSocket pour les mises à jour en temps réel. Exigences: 6.2, 6.3, 6.4 """ import sys import time from socketio import Client import threading sys.path.insert(0, '.') # Configuration SERVER_URL = "http://localhost:5002" def test_websocket_connection(): """Test de connexion WebSocket""" print("🧪 Test 1: Connexion WebSocket") try: sio = Client() # Événements reçus events_received = [] @sio.on('connected') def on_connected(data): print(f" ✅ Connecté: {data}") events_received.append(('connected', data)) @sio.on('error') def on_error(data): print(f" ❌ Erreur: {data}") events_received.append(('error', data)) # Connexion sio.connect(SERVER_URL) time.sleep(0.5) # Vérifier la connexion assert sio.connected, "Client devrait être connecté" assert len(events_received) > 0, "Devrait avoir reçu un événement" # Déconnexion sio.disconnect() time.sleep(0.2) print(" ✅ Connexion/Déconnexion réussie") return True except Exception as e: print(f" ❌ Erreur: {e}") import traceback traceback.print_exc() return False def test_execution_subscription(): """Test de souscription à une exécution""" print("\n🧪 Test 2: Souscription à une exécution") try: sio = Client() # Événements reçus events_received = [] @sio.on('connected') def on_connected(data): print(f" Connecté") @sio.on('execution_status') def on_execution_status(data): print(f" 📊 Statut reçu: {data.get('status')}") events_received.append(('execution_status', data)) @sio.on('error') def on_error(data): print(f" ⚠️ Erreur: {data.get('message')}") events_received.append(('error', data)) @sio.on('unsubscribed') def on_unsubscribed(data): print(f" Désabonné: {data}") events_received.append(('unsubscribed', data)) # Connexion sio.connect(SERVER_URL) time.sleep(0.5) # Souscrire à une exécution (qui n'existe pas) sio.emit('subscribe_execution', {'execution_id': 'test_exec_123'}) time.sleep(0.5) # Devrait recevoir une erreur assert len(events_received) > 0, "Devrait avoir reçu un événement" # Désabonnement sio.emit('unsubscribe_execution', {'execution_id': 'test_exec_123'}) time.sleep(0.5) # Déconnexion sio.disconnect() print(" ✅ Souscription/Désabonnement réussi") return True except Exception as e: print(f" ❌ Erreur: {e}") import traceback traceback.print_exc() return False def test_execution_with_websocket(): """Test d'exécution avec mises à jour WebSocket""" print("\n🧪 Test 3: Exécution avec WebSocket") try: # Créer un workflow de test from services.serialization import create_empty_workflow, WorkflowDatabase from models.visual_workflow import VisualNode, VisualEdge, Position, Size, Port workflow = create_empty_workflow("Test WebSocket Workflow") # Ajouter des nodes workflow.nodes.extend([ VisualNode( id="node_1", type="wait", position=Position(100, 100), size=Size(200, 80), parameters={'duration': 100}, input_ports=[], output_ports=[Port('out', 'Output', 'output')] ), VisualNode( id="node_2", type="wait", position=Position(400, 100), size=Size(200, 80), parameters={'duration': 100}, input_ports=[Port('in', 'Input', 'input')], output_ports=[] ) ]) workflow.edges.append(VisualEdge( id="edge_1", source="node_1", target="node_2", source_port="out", target_port="in" )) # Sauvegarder db = WorkflowDatabase() db.save(workflow) # Démarrer l'exécution from services.execution_integration import get_executor executor = get_executor() # Événements WebSocket reçus ws_events = [] # Client WebSocket sio = Client() @sio.on('connected') def on_connected(data): print(f" WebSocket connecté") @sio.on('execution_started') def on_execution_started(data): print(f" 🚀 Exécution démarrée: {data.get('execution_id')}") ws_events.append(('started', data)) @sio.on('node_status') def on_node_status(data): print(f" 📍 Node {data.get('node_id')}: {data.get('status')}") ws_events.append(('node_status', data)) @sio.on('execution_progress') def on_execution_progress(data): progress = data.get('progress', {}).get('progress', 0) print(f" 📊 Progression: {progress:.1f}%") ws_events.append(('progress', data)) @sio.on('execution_complete') def on_execution_complete(data): print(f" ✅ Exécution terminée: {data.get('status')}") ws_events.append(('complete', data)) @sio.on('execution_error') def on_execution_error(data): print(f" ❌ Erreur: {data.get('error')}") ws_events.append(('error', data)) # Connexion WebSocket sio.connect(SERVER_URL) time.sleep(0.5) # Démarrer l'exécution execution_id = executor.execute_workflow(workflow_id=workflow.id) print(f" Exécution démarrée: {execution_id}") # Souscrire aux mises à jour sio.emit('subscribe_execution', {'execution_id': execution_id}) time.sleep(0.2) # Attendre la fin de l'exécution max_wait = 5 waited = 0 while waited < max_wait: result = executor.get_execution_status(execution_id) if result and result.status in ['completed', 'failed', 'cancelled']: break time.sleep(0.1) waited += 0.1 # Attendre un peu pour recevoir tous les événements time.sleep(0.5) # Déconnexion sio.disconnect() # Vérifier les événements reçus print(f"\n 📊 Événements WebSocket reçus: {len(ws_events)}") for event_type, data in ws_events: print(f" - {event_type}") # On devrait avoir reçu des événements assert len(ws_events) > 0, "Devrait avoir reçu des événements WebSocket" print("\n ✅ Exécution avec WebSocket réussie") return True except Exception as e: print(f" ❌ Erreur: {e}") import traceback traceback.print_exc() return False def main(): """Exécute tous les tests""" print("=" * 60) print("🚀 Tests WebSocket - Visual Workflow Builder") print("=" * 60) print() print("⚠️ IMPORTANT: Le serveur Flask doit être démarré sur le port 5002") print(" Commande: cd backend && python app.py") print() # Vérifier si le serveur est accessible import requests try: response = requests.get(f"{SERVER_URL}/health", timeout=2) print(f"✅ Serveur accessible: {response.json()}") print() except Exception as e: print(f"❌ Serveur non accessible: {e}") print(" Démarrez le serveur avec: cd backend && python app.py") return 1 tests = [ test_websocket_connection, test_execution_subscription, test_execution_with_websocket ] results = [] for test in tests: try: result = test() results.append(result) except Exception as e: print(f"\n❌ Test échoué avec exception: {e}") import traceback traceback.print_exc() results.append(False) print("\n" + "=" * 60) passed = sum(results) total = len(results) if passed == total: print(f"✅ TOUS LES TESTS RÉUSSIS! ({passed}/{total})") print("=" * 60) return 0 else: print(f"❌ {total - passed} test(s) échoué(s) sur {total}") print("=" * 60) return 1 if __name__ == '__main__': sys.exit(main())