#!/usr/bin/env python3 """ Tests d'intégration pour le pipeline serveur Teste le flux complet: 1. Upload d'une session chiffrée 2. Déchiffrement 3. Traitement par le pipeline 4. Génération des artefacts """ import pytest import sys import json import tempfile import zipfile from pathlib import Path from datetime import datetime # Ajouter le répertoire parent au path sys.path.insert(0, str(Path(__file__).parent.parent.parent)) try: from core.models import RawSession, Event, Screenshot, WindowContext from server.storage_encrypted import decrypt_file from server.processing_pipeline import ProcessingPipeline except ImportError as e: pytest.skip(f"Server components not available: {e}", allow_module_level=True) # Import du module de chiffrement (copie simplifiée pour les tests) import os import hashlib from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import padding as crypto_padding def encrypt_session_file(zip_path: str, password: str) -> str: """Chiffre un fichier ZIP (version simplifiée pour tests).""" encrypted_path = zip_path + '.enc' # Générer salt salt = os.urandom(16) # Dériver clé key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000, dklen=32) # Générer IV iv = os.urandom(16) # Lire le fichier with open(zip_path, 'rb') as f: plaintext = f.read() # Padding PKCS7 padder = crypto_padding.PKCS7(128).padder() padded_data = padder.update(plaintext) + padder.finalize() # Chiffrer cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend()) encryptor = cipher.encryptor() ciphertext = encryptor.update(padded_data) + encryptor.finalize() # Écrire le fichier chiffré with open(encrypted_path, 'wb') as f: f.write(salt) f.write(iv) f.write(ciphertext) return encrypted_path @pytest.fixture def sample_session(): """Crée une session de test.""" user = {"id": "test_user", "label": "Test User"} context = { "training_label": "test_workflow", "customer": "Test Corp", "notes": "Session de test" } environment = { "os": "Linux", "screen": {"primary_resolution": [1920, 1080]} } session = RawSession( session_id="test_session_001", agent_version="v0.1.0", started_at=datetime.now(), user=user, context=context, environment=environment ) # Ajouter des événements for i in range(3): window = WindowContext( app_name="TestApp", title="Test Window" ) event = Event( type="click", t=float(i), window=window, screenshot_id=f"screenshot_{i:04d}" ) session.events.append(event) # Ajouter des screenshots for i in range(3): screenshot = Screenshot( screenshot_id=f"screenshot_{i:04d}", relative_path=f"screenshots/screenshot_{i:04d}.png", captured_at=datetime.now().isoformat() ) session.screenshots.append(screenshot) return session def test_encryption_decryption_roundtrip(sample_session, tmp_path): """Test: Chiffrement puis déchiffrement d'une session.""" # Créer un fichier ZIP temporaire zip_path = tmp_path / "session.zip" with zipfile.ZipFile(zip_path, 'w') as zf: # Ajouter le JSON json_data = json.dumps(sample_session.to_json(), indent=2) zf.writestr(f"{sample_session.session_id}/{sample_session.session_id}.json", json_data) # Ajouter des screenshots factices for screenshot in sample_session.screenshots: zf.writestr( f"{sample_session.session_id}/{screenshot.relative_path}", b"fake_image_data" ) # Chiffrer password = "test_password_123" encrypted_path = encrypt_session_file(str(zip_path), password) assert Path(encrypted_path).exists() assert Path(encrypted_path).suffix == '.enc' # Déchiffrer decrypted_path = decrypt_file(encrypted_path, password) assert Path(decrypted_path).exists() assert Path(decrypted_path).suffix == '.zip' # Vérifier le contenu with zipfile.ZipFile(decrypted_path, 'r') as zf: files = zf.namelist() assert f"{sample_session.session_id}/{sample_session.session_id}.json" in files assert len([f for f in files if f.endswith('.png')]) == 3 def test_decryption_wrong_password(sample_session, tmp_path): """Test: Déchiffrement avec mauvais mot de passe échoue.""" # Créer et chiffrer zip_path = tmp_path / "session.zip" with zipfile.ZipFile(zip_path, 'w') as zf: zf.writestr("test.txt", "test data") encrypted_path = encrypt_session_file(str(zip_path), "correct_password") # Tenter de déchiffrer avec mauvais mot de passe with pytest.raises(ValueError, match="(mot de passe incorrect|padding|corrompu)"): decrypt_file(encrypted_path, "wrong_password") def test_processing_pipeline_basic(sample_session, tmp_path): """Test: Pipeline de traitement basique.""" # Créer une structure de session sur disque session_dir = tmp_path / "sessions" / sample_session.session_id session_dir.mkdir(parents=True) # Sauvegarder le JSON json_path = session_dir / sample_session.session_id / f"{sample_session.session_id}.json" json_path.parent.mkdir(parents=True) with open(json_path, 'w') as f: f.write(json.dumps(sample_session.to_json(), indent=2)) # Créer des screenshots factices for screenshot in sample_session.screenshots: screenshot_path = session_dir / sample_session.session_id / screenshot.relative_path screenshot_path.parent.mkdir(parents=True, exist_ok=True) screenshot_path.write_bytes(b"fake_image_data") # Initialiser le pipeline pipeline = ProcessingPipeline(base_path=str(tmp_path)) # Traiter la session stats = pipeline.process_session(sample_session.session_id) # Vérifier les statistiques assert stats['status'] == 'success' assert stats['session_id'] == sample_session.session_id assert stats['screen_states_created'] == 3 # 3 événements avec screenshots # Note: embeddings et UI detection peuvent échouer si modèles non disponibles # On vérifie juste qu'il n'y a pas d'erreur fatale def test_processing_pipeline_missing_session(tmp_path): """Test: Pipeline avec session inexistante.""" pipeline = ProcessingPipeline(base_path=str(tmp_path)) stats = pipeline.process_session("nonexistent_session") assert stats['status'] == 'error' assert len(stats['errors']) > 0 def test_processing_pipeline_corrupted_json(tmp_path): """Test: Pipeline avec JSON corrompu.""" # Créer une session avec JSON invalide session_dir = tmp_path / "sessions" / "corrupted_session" session_dir.mkdir(parents=True) json_path = session_dir / "corrupted_session" / "corrupted_session.json" json_path.parent.mkdir(parents=True) json_path.write_text("{ invalid json }") pipeline = ProcessingPipeline(base_path=str(tmp_path)) stats = pipeline.process_session("corrupted_session") assert stats['status'] == 'error' assert len(stats['errors']) > 0 if __name__ == "__main__": pytest.main([__file__, "-v"])