#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Test de Connexions API Complètes - Visual Workflow Builder Auteur : Dom, Alice, Kiro - 09 janvier 2026 Ce test valide que toutes les connexions API sont fonctionnelles : - API de capture d'écran standard (Option A ultra stable) - API de capture d'écran réelle avec détection d'éléments UI - API d'embedding visuel - API de gestion des workflows - Connexion frontend-backend complète """ import requests import json import time import subprocess import sys import base64 from pathlib import Path from PIL import Image import io # Configuration BACKEND_URL = "http://localhost:5003" API_BASE = f"{BACKEND_URL}/api" FRONTEND_URL = "http://localhost:3000" def test_backend_health_complete(): """Test 1: Vérifier la santé complète du backend""" print("🔍 Test 1: Santé complète du backend") try: response = requests.get(f"{API_BASE}/health", timeout=5) if response.status_code == 200: data = response.json() print(f"✅ Backend sain - Version: {data.get('version')}") print(f" Mode: {data.get('mode')}") print(f" Features: screen_capture={data.get('features', {}).get('screen_capture')}") print(f" Features: visual_embedding={data.get('features', {}).get('visual_embedding')}") # Vérifier les endpoints disponibles endpoints = data.get('endpoints', []) expected_endpoints = [ '/health', '/api/workflows', '/api/screen-capture', '/api/visual-embedding', '/api/real-screen-capture', '/api/real-screen-capture/start', '/api/real-screen-capture/stop', '/api/real-screen-capture/status' ] missing_endpoints = [ep for ep in expected_endpoints if ep not in endpoints] if missing_endpoints: print(f"⚠️ Endpoints manquants: {missing_endpoints}") else: print("✅ Tous les endpoints requis sont disponibles") return True else: print(f"❌ Backend unhealthy - Status: {response.status_code}") return False except Exception as e: print(f"❌ Backend inaccessible: {e}") return False def test_standard_screen_capture(): """Test 2: API de capture d'écran standard (Option A)""" print("\n🔍 Test 2: API de capture d'écran standard (Option A)") try: payload = { "format": "png", "quality": 90 } start_time = time.time() response = requests.post( f"{API_BASE}/screen-capture", json=payload, headers={'Content-Type': 'application/json'}, timeout=20 ) end_time = time.time() if response.status_code == 200: data = response.json() if data.get('success'): print(f"✅ Capture standard réussie en {end_time - start_time:.2f}s") print(f" Résolution: {data.get('width')}x{data.get('height')}") print(f" Méthode: {data.get('method')}") # Valider que c'est bien l'Option A if data.get('method') == 'ultra_stable_mss': print("✅ Option A confirmée (ultra_stable_mss)") return data else: print(f"❌ Capture standard échouée: {data.get('error')}") return None else: print(f"❌ Erreur HTTP {response.status_code}: {response.text}") return None except Exception as e: print(f"❌ Erreur capture standard: {e}") return None def test_real_screen_capture_status(): """Test 3: Statut du service de capture réelle""" print("\n🔍 Test 3: Statut du service de capture réelle") try: response = requests.get(f"{API_BASE}/real-screen-capture/status", timeout=10) if response.status_code == 200: data = response.json() if data.get('success'): status = data.get('status', {}) monitors = data.get('monitors', []) print(f"✅ Service de capture réelle disponible") print(f" Capture en cours: {status.get('is_capturing')}") print(f" Moniteur sélectionné: {status.get('selected_monitor')}") print(f" Nombre de moniteurs: {len(monitors)}") print(f" Éléments détectés: {status.get('elements_detected')}") return True else: print(f"❌ Service indisponible: {data.get('error')}") return False elif response.status_code == 503: print("⚠️ Service de capture réelle non disponible (dépendances manquantes)") return True # Considérer comme OK si le service n'est pas disponible else: print(f"❌ Erreur HTTP {response.status_code}: {response.text}") return False except Exception as e: print(f"❌ Erreur statut capture réelle: {e}") return False def test_real_screen_capture(): """Test 4: Capture d'écran réelle avec détection d'éléments""" print("\n🔍 Test 4: Capture d'écran réelle avec détection d'éléments") try: payload = { "monitor_id": 0, "detect_elements": True } start_time = time.time() response = requests.post( f"{API_BASE}/real-screen-capture", json=payload, headers={'Content-Type': 'application/json'}, timeout=25 ) end_time = time.time() if response.status_code == 200: data = response.json() if data.get('success'): elements = data.get('elements', []) monitors = data.get('monitors', []) status = data.get('status', {}) print(f"✅ Capture réelle réussie en {end_time - start_time:.2f}s") print(f" Éléments UI détectés: {len(elements)}") print(f" Moniteurs disponibles: {len(monitors)}") print(f" Méthode: {data.get('method')}") # Afficher quelques éléments détectés if elements: print(" Exemples d'éléments détectés:") for i, element in enumerate(elements[:3]): print(f" - {element.get('type', 'unknown')}: '{element.get('text', '')[:30]}...'") return data else: print(f"❌ Capture réelle échouée: {data.get('error')}") return None elif response.status_code == 503: print("⚠️ Service de capture réelle non disponible") return True # Considérer comme OK else: print(f"❌ Erreur HTTP {response.status_code}: {response.text}") return None except Exception as e: print(f"❌ Erreur capture réelle: {e}") return None def test_visual_embedding_api(): """Test 5: API d'embedding visuel""" print("\n🔍 Test 5: API d'embedding visuel") try: # Utiliser la capture standard pour l'embedding capture_data = test_standard_screen_capture() if not capture_data: print("❌ Impossible de capturer l'écran pour l'embedding") return False # Créer l'embedding embedding_payload = { "screenshot": capture_data['screenshot'], "boundingBox": { "x": 100, "y": 100, "width": 200, "height": 150 }, "stepId": "test_api_complete" } start_time = time.time() response = requests.post( f"{API_BASE}/visual-embedding", json=embedding_payload, headers={'Content-Type': 'application/json'}, timeout=25 ) end_time = time.time() if response.status_code == 200: data = response.json() if data.get('success'): print(f"✅ Embedding visuel créé en {end_time - start_time:.2f}s") print(f" ID: {data.get('embedding_id')}") print(f" Dimension: {data.get('dimension')}") print(f" Image de référence: {data.get('reference_image')}") # Vérifier l'embedding embedding = data.get('embedding', []) if len(embedding) > 0: print(f"✅ Embedding valide - {len(embedding)} dimensions") return True else: print("❌ Embedding vide") return False else: print(f"❌ Embedding échoué: {data.get('error')}") return False else: print(f"❌ Erreur HTTP {response.status_code}: {response.text}") return False except Exception as e: print(f"❌ Erreur embedding: {e}") return False def test_workflows_api(): """Test 6: API de gestion des workflows""" print("\n🔍 Test 6: API de gestion des workflows") try: # Test 1: Lister les workflows response = requests.get(f"{API_BASE}/workflows", timeout=10) if response.status_code == 200: workflows = response.json() print(f"✅ Liste des workflows obtenue - {len(workflows)} workflows") else: print(f"❌ Erreur liste workflows: {response.status_code}") return False # Test 2: Créer un workflow de test test_workflow = { "name": "Test Workflow API Complete", "description": "Workflow de test pour validation des connexions API", "created_by": "test_system", "category": "test", "tags": ["test", "api", "validation"] } response = requests.post( f"{API_BASE}/workflows", json=test_workflow, headers={'Content-Type': 'application/json'}, timeout=10 ) if response.status_code == 201: created_workflow = response.json() workflow_id = created_workflow.get('id') print(f"✅ Workflow créé - ID: {workflow_id}") # Test 3: Récupérer le workflow créé response = requests.get(f"{API_BASE}/workflows/{workflow_id}", timeout=10) if response.status_code == 200: retrieved_workflow = response.json() print(f"✅ Workflow récupéré - Nom: {retrieved_workflow.get('name')}") return True else: print(f"❌ Erreur récupération workflow: {response.status_code}") return False else: print(f"❌ Erreur création workflow: {response.status_code}") return False except Exception as e: print(f"❌ Erreur API workflows: {e}") return False def test_cors_complete(): """Test 7: Configuration CORS complète""" print("\n🔍 Test 7: Configuration CORS complète") endpoints_to_test = [ '/api/screen-capture', '/api/visual-embedding', '/api/real-screen-capture', '/api/workflows' ] cors_ok = True for endpoint in endpoints_to_test: try: headers = { 'Origin': FRONTEND_URL, 'Access-Control-Request-Method': 'POST', 'Access-Control-Request-Headers': 'Content-Type', } response = requests.options(f"{API_BASE}{endpoint}", headers=headers, timeout=5) if response.status_code == 200: cors_origin = response.headers.get('Access-Control-Allow-Origin') print(f"✅ CORS OK pour {endpoint} - Origin: {cors_origin}") else: print(f"❌ CORS échoué pour {endpoint} - Status: {response.status_code}") cors_ok = False except Exception as e: print(f"❌ Erreur CORS pour {endpoint}: {e}") cors_ok = False return cors_ok def test_frontend_backend_integration(): """Test 8: Intégration frontend-backend complète""" print("\n🔍 Test 8: Intégration frontend-backend complète") try: # Vérifier que le frontend est accessible response = requests.get(FRONTEND_URL, timeout=5) if response.status_code != 200: print(f"❌ Frontend inaccessible - Status: {response.status_code}") return False print("✅ Frontend accessible") # Simuler une séquence complète comme le ferait le frontend print(" Simulation séquence complète...") # 1. Vérifier la santé du backend health_response = requests.get(f"{API_BASE}/health", timeout=5) if health_response.status_code != 200: print("❌ Health check échoué") return False # 2. Capturer l'écran capture_response = requests.post( f"{API_BASE}/screen-capture", json={"format": "png", "quality": 90}, headers={ 'Content-Type': 'application/json', 'Origin': FRONTEND_URL, 'Referer': f'{FRONTEND_URL}/', }, timeout=15 ) if capture_response.status_code != 200: print(f"❌ Capture échouée dans l'intégration: {capture_response.status_code}") return False capture_data = capture_response.json() if not capture_data.get('success'): print(f"❌ Capture échouée: {capture_data.get('error')}") return False # 3. Créer un embedding embedding_response = requests.post( f"{API_BASE}/visual-embedding", json={ "screenshot": capture_data['screenshot'], "boundingBox": {"x": 50, "y": 50, "width": 100, "height": 100}, "stepId": "integration_test" }, headers={ 'Content-Type': 'application/json', 'Origin': FRONTEND_URL, 'Referer': f'{FRONTEND_URL}/', }, timeout=15 ) if embedding_response.status_code != 200: print(f"❌ Embedding échoué dans l'intégration: {embedding_response.status_code}") return False embedding_data = embedding_response.json() if not embedding_data.get('success'): print(f"❌ Embedding échoué: {embedding_data.get('error')}") return False # 4. Créer un workflow workflow_response = requests.post( f"{API_BASE}/workflows", json={ "name": "Workflow Intégration Test", "description": "Test d'intégration frontend-backend", "created_by": "integration_test" }, headers={ 'Content-Type': 'application/json', 'Origin': FRONTEND_URL, 'Referer': f'{FRONTEND_URL}/', }, timeout=10 ) if workflow_response.status_code != 201: print(f"❌ Création workflow échouée dans l'intégration: {workflow_response.status_code}") return False print("✅ Intégration frontend-backend complète réussie") print(" - Health check: OK") print(" - Capture d'écran: OK") print(" - Embedding visuel: OK") print(" - Gestion workflows: OK") return True except Exception as e: print(f"❌ Erreur intégration: {e}") return False def main(): """Fonction principale de validation""" print("=" * 80) print(" VALIDATION COMPLÈTE - CONNEXIONS API VISUAL WORKFLOW BUILDER") print("=" * 80) print("Auteur : Dom, Alice, Kiro - 09 janvier 2026") print(f"Backend: {BACKEND_URL}") print(f"Frontend: {FRONTEND_URL}") print("") tests = [ ("Santé complète du backend", test_backend_health_complete), ("Capture d'écran standard", lambda: test_standard_screen_capture() is not None), ("Statut capture réelle", test_real_screen_capture_status), ("Capture réelle avec détection", lambda: test_real_screen_capture() is not None), ("API d'embedding visuel", test_visual_embedding_api), ("API de gestion workflows", test_workflows_api), ("Configuration CORS", test_cors_complete), ("Intégration frontend-backend", test_frontend_backend_integration), ] results = [] for test_name, test_func in tests: try: print(f"\n{'='*60}") result = test_func() results.append((test_name, result)) if result: print(f"✅ {test_name}: RÉUSSI") else: print(f"❌ {test_name}: ÉCHOUÉ") except Exception as e: print(f"❌ {test_name}: ERREUR - {e}") results.append((test_name, False)) time.sleep(1) # Pause entre les tests # Résumé final print("\n" + "=" * 80) print(" RÉSUMÉ DE LA VALIDATION COMPLÈTE") print("=" * 80) passed = sum(1 for _, result in results if result) total = len(results) print(f"Tests réussis: {passed}/{total}") for test_name, result in results: status = "✅ RÉUSSI" if result else "❌ ÉCHOUÉ" print(f" {test_name}: {status}") if passed == total: print("\n🎉 VALIDATION COMPLÈTE RÉUSSIE !") print("✅ Toutes les connexions API sont fonctionnelles") print("✅ L'intégration frontend-backend est opérationnelle") print("✅ Le système est prêt pour une utilisation complète") print("\n🚀 TOUTES LES API SONT CONNECTÉES ET FONCTIONNELLES !") else: print(f"\n⚠️ VALIDATION PARTIELLE ({passed}/{total})") print("🔧 Certaines connexions API nécessitent une attention") # Recommandations spécifiques failed_tests = [name for name, result in results if not result] if failed_tests: print("\n🔧 ACTIONS RECOMMANDÉES:") for test_name in failed_tests: if "backend" in test_name.lower(): print("- Vérifier que le backend Flask est démarré sur le port 5003") elif "frontend" in test_name.lower(): print("- Vérifier que le frontend React est démarré sur le port 3000") elif "capture réelle" in test_name.lower(): print("- Vérifier les dépendances de capture réelle (optionnelles)") elif "cors" in test_name.lower(): print("- Vérifier la configuration CORS du backend") elif "embedding" in test_name.lower(): print("- Vérifier les dépendances d'embedding (CLIP, transformers)") elif "workflow" in test_name.lower(): print("- Vérifier l'accès au système de fichiers pour les workflows") print("\n" + "=" * 80) return passed == total if __name__ == "__main__": success = main() sys.exit(0 if success else 1)