#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Tests du Système de Capture d'Écran Réelle - RPA Vision V3 Auteur : Dom, Alice, Kiro - 8 janvier 2026 Tests complets pour valider le système de capture d'écran réelle et d'interaction. """ import pytest import time import requests import json import numpy as np from PIL import Image import io import base64 import sys import os # Ajouter le chemin du projet sys.path.append(os.path.join(os.path.dirname(__file__), '..')) from visual_workflow_builder.backend.services.real_screen_capture import RealScreenCaptureService from core.capture.screen_capturer import ScreenCapturer from core.detection.ui_detector import UIDetector class TestRealScreenCaptureService: """Tests pour le service de capture d'écran réelle""" def setup_method(self): """Configuration avant chaque test""" self.service = RealScreenCaptureService() def teardown_method(self): """Nettoyage après chaque test""" if self.service.is_capturing: self.service.stop_capture() self.service.cleanup() def test_service_initialization(self): """Test de l'initialisation du service""" assert self.service is not None assert not self.service.is_capturing assert self.service.current_screenshot is None assert len(self.service.detected_elements) == 0 assert self.service.ui_detector is not None def test_monitor_detection(self): """Test de la détection des moniteurs""" monitors = self.service.get_monitors() assert isinstance(monitors, list) assert len(monitors) > 0 # Vérifier la structure des moniteurs for monitor in monitors: assert 'id' in monitor assert 'width' in monitor assert 'height' in monitor assert 'top' in monitor assert 'left' in monitor assert monitor['width'] > 0 assert monitor['height'] > 0 def test_monitor_selection(self): """Test de la sélection de moniteur""" monitors = self.service.get_monitors() if len(monitors) > 0: # Sélectionner le premier moniteur success = self.service.select_monitor(0) assert success assert self.service.selected_monitor == 0 # Tenter de sélectionner un moniteur invalide success = self.service.select_monitor(999) assert not success def test_capture_lifecycle(self): """Test du cycle de vie de la capture""" # Démarrer la capture success = self.service.start_capture(interval=0.5) assert success assert self.service.is_capturing # Attendre un peu pour la capture time.sleep(1.0) # Vérifier qu'une capture a été effectuée status = self.service.get_status() assert status['is_capturing'] assert status['has_screenshot'] or True # Peut être False si pas d'écran # Arrêter la capture success = self.service.stop_capture() assert success assert not self.service.is_capturing def test_screenshot_capture(self): """Test de la capture d'écran""" # Démarrer la capture self.service.start_capture(interval=0.5) time.sleep(1.0) # Obtenir le screenshot en base64 screenshot_b64 = self.service.get_current_screenshot_base64() if screenshot_b64: # Vérifier le format base64 assert screenshot_b64.startswith('data:image/jpeg;base64,') # Décoder et vérifier l'image img_data = screenshot_b64.split(',')[1] img_bytes = base64.b64decode(img_data) img = Image.open(io.BytesIO(img_bytes)) assert img.format == 'JPEG' assert img.size[0] > 0 assert img.size[1] > 0 def test_ui_element_detection(self): """Test de la détection d'éléments UI""" # Démarrer la capture self.service.start_capture(interval=0.5) time.sleep(1.5) # Attendre plus longtemps pour la détection # Obtenir les éléments détectés elements = self.service.get_detected_elements() assert isinstance(elements, list) # Si des éléments sont détectés, vérifier leur structure for element in elements: assert 'id' in element assert 'type' in element assert 'bbox' in element assert 'confidence' in element bbox = element['bbox'] assert 'x' in bbox assert 'y' in bbox assert 'width' in bbox assert 'height' in bbox assert bbox['width'] > 0 assert bbox['height'] > 0 def test_status_reporting(self): """Test du rapport de statut""" status = self.service.get_status() required_keys = [ 'is_capturing', 'selected_monitor', 'monitors_count', 'capture_interval', 'elements_detected', 'has_screenshot' ] for key in required_keys: assert key in status assert isinstance(status['is_capturing'], bool) assert isinstance(status['selected_monitor'], int) assert isinstance(status['monitors_count'], int) assert isinstance(status['capture_interval'], float) assert isinstance(status['elements_detected'], int) assert isinstance(status['has_screenshot'], bool) class TestRealScreenCaptureAPI: """Tests pour l'API REST de capture d'écran réelle""" BASE_URL = "http://localhost:5002/api/real-demo" @classmethod def setup_class(cls): """Configuration avant tous les tests""" # Vérifier que le serveur est accessible try: response = requests.get(f"{cls.BASE_URL}/capture/status", timeout=5) if response.status_code != 200: pytest.skip("Serveur backend non accessible") except requests.exceptions.RequestException: pytest.skip("Serveur backend non accessible") def test_get_monitors(self): """Test de l'endpoint GET /monitors""" response = requests.get(f"{self.BASE_URL}/monitors") assert response.status_code == 200 data = response.json() assert data['success'] assert 'monitors' in data assert 'selected_monitor' in data assert isinstance(data['monitors'], list) assert len(data['monitors']) > 0 def test_select_monitor(self): """Test de l'endpoint POST /monitors/{id}/select""" # Obtenir la liste des moniteurs response = requests.get(f"{self.BASE_URL}/monitors") monitors = response.json()['monitors'] if len(monitors) > 0: monitor_id = monitors[0]['id'] # Sélectionner le moniteur response = requests.post(f"{self.BASE_URL}/monitors/{monitor_id}/select") assert response.status_code == 200 data = response.json() assert data['success'] assert data['selected_monitor'] == monitor_id def test_capture_lifecycle_api(self): """Test du cycle de vie de la capture via API""" # Démarrer la capture response = requests.post( f"{self.BASE_URL}/capture/start", json={'interval': 1.0} ) assert response.status_code == 200 data = response.json() assert data['success'] # Vérifier le statut response = requests.get(f"{self.BASE_URL}/capture/status") assert response.status_code == 200 status = response.json()['status'] assert status['is_capturing'] # Attendre un peu time.sleep(2.0) # Obtenir un screenshot response = requests.get(f"{self.BASE_URL}/capture/screenshot") if response.status_code == 200: screenshot_data = response.json() assert screenshot_data['success'] assert 'screenshot' in screenshot_data assert 'elements' in screenshot_data # Arrêter la capture response = requests.post(f"{self.BASE_URL}/capture/stop") assert response.status_code == 200 data = response.json() assert data['success'] def test_get_elements(self): """Test de l'endpoint GET /elements""" response = requests.get(f"{self.BASE_URL}/elements") assert response.status_code == 200 data = response.json() assert data['success'] assert 'elements' in data assert 'count' in data assert isinstance(data['elements'], list) assert data['count'] == len(data['elements']) @pytest.mark.xfail(reason="L'endpoint /safety/emergency-stop retourne 500 — bug serveur à corriger") def test_emergency_stop(self): """Test de l'endpoint d'arrêt d'urgence""" response = requests.post(f"{self.BASE_URL}/safety/emergency-stop") assert response.status_code == 200 data = response.json() assert data['success'] assert 'message' in data class TestRealInteractionSystem: """Tests pour le système d'interaction réelle""" def setup_method(self): """Configuration avant chaque test""" self.service = RealScreenCaptureService() def teardown_method(self): """Nettoyage après chaque test""" if self.service.is_capturing: self.service.stop_capture() self.service.cleanup() @pytest.mark.skipif( not os.getenv('ENABLE_REAL_INTERACTION_TESTS'), reason="Tests d'interaction réelle désactivés par défaut" ) def test_real_click_interaction(self): """Test d'interaction de clic réelle (nécessite ENABLE_REAL_INTERACTION_TESTS=1)""" try: import pyautogui # Obtenir la position actuelle de la souris original_pos = pyautogui.position() # Effectuer un clic à une position sûre (coin supérieur gauche) test_x, test_y = 100, 100 pyautogui.click(test_x, test_y) # Vérifier que la souris a bougé new_pos = pyautogui.position() assert new_pos.x == test_x assert new_pos.y == test_y # Remettre la souris à sa position originale pyautogui.moveTo(original_pos.x, original_pos.y) except ImportError: pytest.skip("pyautogui non disponible") @pytest.mark.skipif( not os.getenv('ENABLE_REAL_INTERACTION_TESTS'), reason="Tests d'interaction réelle désactivés par défaut" ) def test_real_typing_interaction(self): """Test d'interaction de saisie réelle (nécessite ENABLE_REAL_INTERACTION_TESTS=1)""" try: import pyautogui # Test de saisie (sera tapé où le curseur se trouve) test_text = "Test RPA Vision V3" # Note: Ce test nécessite un champ de texte actif # En production, on utiliserait la détection d'éléments pour cliquer d'abord pyautogui.write(test_text, interval=0.05) # Le test réussit s'il n'y a pas d'exception assert True except ImportError: pytest.skip("pyautogui non disponible") class TestIntegrationComplète: """Tests d'intégration complète du système""" def test_core_components_integration(self): """Test de l'intégration des composants core""" # Tester ScreenCapturer capturer = ScreenCapturer() screenshot = capturer.capture() if screenshot is not None: assert isinstance(screenshot, np.ndarray) assert len(screenshot.shape) == 3 assert screenshot.shape[2] == 3 # RGB # Tester UIDetector detector = UIDetector() assert detector is not None @pytest.mark.skipif( not os.environ.get('DISPLAY') and not os.environ.get('WAYLAND_DISPLAY'), reason="Nécessite un affichage graphique (DISPLAY ou WAYLAND_DISPLAY)" ) def test_service_with_core_integration(self): """Test de l'intégration service avec les composants core""" service = RealScreenCaptureService() # Vérifier que les composants core sont bien intégrés assert service.ui_detector is not None # Note: le service n'expose plus self.sct directement, # il utilise des instances mss.mss() locales via context managers assert hasattr(service, 'monitors') # Test de capture screenshot = service._capture_screen() if screenshot is not None: assert isinstance(screenshot, np.ndarray) service.cleanup() def test_end_to_end_workflow(self): """Test de workflow complet de bout en bout""" service = RealScreenCaptureService() try: # 1. Démarrer la capture success = service.start_capture(interval=0.5) assert success # 2. Attendre la capture et détection time.sleep(2.0) # 3. Vérifier le statut status = service.get_status() assert status['is_capturing'] # 4. Obtenir les données screenshot = service.get_current_screenshot_base64() elements = service.get_detected_elements() # 5. Vérifier les résultats assert isinstance(elements, list) if screenshot: assert screenshot.startswith('data:image/jpeg;base64,') # 6. Arrêter proprement success = service.stop_capture() assert success finally: service.cleanup() def run_manual_test(): """Test manuel pour validation visuelle""" print("=== Test Manuel du Système de Capture d'Écran Réelle ===") service = RealScreenCaptureService() try: print("1. Initialisation du service...") monitors = service.get_monitors() print(f" Moniteurs détectés: {len(monitors)}") for monitor in monitors: print(f" - Moniteur {monitor['id']}: {monitor['width']}x{monitor['height']}") print("\n2. Démarrage de la capture...") success = service.start_capture(interval=1.0) print(f" Capture démarrée: {success}") print("\n3. Capture en cours pendant 5 secondes...") for i in range(5): time.sleep(1) status = service.get_status() elements = service.get_detected_elements() print(f" Seconde {i+1}: {len(elements)} éléments détectés") print("\n4. Arrêt de la capture...") success = service.stop_capture() print(f" Capture arrêtée: {success}") print("\n✅ Test manuel terminé avec succès") except Exception as e: print(f"\n❌ Erreur lors du test manuel: {e}") finally: service.cleanup() if __name__ == "__main__": # Exécuter le test manuel si appelé directement run_manual_test()