#!/usr/bin/env python3 """ Test de synchronisation des clés de chiffrement entre agent v0 et serveur. Ce test vérifie que: 1. L'agent v0 peut chiffrer avec la clé configurée 2. Le serveur peut déchiffrer avec la même clé 3. Le cycle complet fonctionne avec de vraies données 4. La compatibilité des formats de fichiers 5. L'intégrité des données après chiffrement/déchiffrement Test de fonctionnalité réelle - utilise les vrais composants du système. """ import os import sys import tempfile import json import zipfile import hashlib from pathlib import Path from datetime import datetime # Ajouter les chemins nécessaires sys.path.insert(0, str(Path(__file__).parent)) sys.path.insert(0, str(Path(__file__).parent / "agent_v0")) def load_real_environment_config(): """Charger la vraie configuration d'environnement du système.""" config = {} # Charger .env.local (configuration serveur) env_local_path = Path(".env.local") if env_local_path.exists(): with open(env_local_path, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) config[f"server_{key.strip()}"] = value.strip() # Charger agent_config.json (configuration agent) agent_config_path = Path("agent_config.json") if agent_config_path.exists(): with open(agent_config_path, 'r') as f: agent_config = json.load(f) for key, value in agent_config.items(): config[f"agent_{key}"] = value return config def create_realistic_test_session(): """Créer une session de test avec des données réalistes.""" from agent_v0.raw_session import RawSession # Créer une session avec des données qui ressemblent à une vraie session session = RawSession.create( user_id="test_encryption_sync", user_label="Test User", customer="Test Company", training_label="Encryption Test Workflow", notes="Test session for encryption validation", platform="linux", hostname="test_host", screen_resolution=[1920, 1080] ) # Ajouter des événements réalistes avec les vraies méthodes de l'API session.add_mouse_click_event( button="left", pos=[450, 320], window_title="Test Application", app_name="test_app", screenshot_id="shot_0001" ) session.add_key_combo_event( keys=["CTRL", "C"], window_title="Test Application", app_name="test_app", screenshot_id="shot_0002" ) session.add_hover_idle_event( pos=[500, 400], idle_ms=1200, window_title="Test Application", app_name="test_app", screenshot_id="shot_0003" ) session.add_scroll_event( pos=[600, 300], delta=[0, -3], window_title="Test Application", app_name="test_app", screenshot_id="shot_0004" ) # Ajouter des screenshots réalistes session.add_screenshot("shot_0001", "shots/shot_0001.png") session.add_screenshot("shot_0002", "shots/shot_0002.png") session.add_screenshot("shot_0003", "shots/shot_0003.png") session.add_screenshot("shot_0004", "shots/shot_0004.png") # Fermer la session session.close() return session def verify_zip_integrity(zip_path, expected_files=None): """Vérifier l'intégrité d'un fichier ZIP avec de vraies vérifications.""" try: with zipfile.ZipFile(zip_path, 'r') as zf: # Test d'intégrité du ZIP bad_files = zf.testzip() if bad_files: return False, f"Fichiers corrompus: {bad_files}" files = zf.namelist() # Vérifier la présence des fichiers attendus if expected_files: for expected_file in expected_files: if not any(f.endswith(expected_file) for f in files): return False, f"Fichier manquant: {expected_file}" # Vérifier qu'il y a au moins un JSON de session json_files = [f for f in files if f.endswith('.json')] if not json_files: return False, "Aucun fichier JSON de session trouvé" # Vérifier le contenu du JSON principal main_json = json_files[0] with zf.open(main_json) as json_file: session_data = json.load(json_file) # Vérifications de structure réaliste required_fields = ['schema_version', 'session_id', 'events', 'started_at'] for field in required_fields: if field not in session_data: return False, f"Champ manquant dans session: {field}" # Vérifier que les événements ont une structure cohérente if 'events' in session_data and session_data['events']: for event in session_data['events']: if 'type' not in event or 't' not in event: return False, "Structure d'événement invalide" return True, f"ZIP valide avec {len(files)} fichiers" except Exception as e: return False, f"Erreur lors de la vérification: {e}" def calculate_file_hash(file_path): """Calculer le hash SHA256 d'un fichier pour vérifier l'intégrité.""" sha256_hash = hashlib.sha256() with open(file_path, "rb") as f: for chunk in iter(lambda: f.read(4096), b""): sha256_hash.update(chunk) return sha256_hash.hexdigest() def test_encryption_key_sync(): """Test complet de synchronisation des clés de chiffrement avec vraies données.""" print("=== Test de synchronisation des clés de chiffrement (Real Functionality) ===") # 1. Charger les vraies configurations du système print("\n1. Chargement des configurations réelles...") config = load_real_environment_config() server_encryption_password = config.get("server_ENCRYPTION_PASSWORD") agent_encryption_password = config.get("agent_encryption_password") print(f" Serveur ENCRYPTION_PASSWORD: {'✓ Défini' if server_encryption_password else '✗ Non défini'}") print(f" Agent encryption_password: {'✓ Défini' if agent_encryption_password else '✗ Non défini'}") if server_encryption_password: print(f" Serveur (preview): {server_encryption_password[:8]}...") if agent_encryption_password: print(f" Agent (preview): {agent_encryption_password[:8]}...") # 2. Vérifier que les clés correspondent print("\n2. Vérification de la correspondance des clés...") if not server_encryption_password: print("❌ Clé serveur non définie dans .env.local") return False if not agent_encryption_password: print("❌ Clé agent non définie dans agent_config.json") return False if server_encryption_password != agent_encryption_password: print("❌ Les clés ne correspondent pas!") print(f" Serveur: {server_encryption_password}") print(f" Agent: {agent_encryption_password}") return False print("✅ Les clés correspondent parfaitement") # 3. Test avec de vraies données et vrais composants print("\n3. Test du cycle complet avec données réalistes...") try: # Importer les vrais modules (pas de mocks) from agent_v0.storage_encrypted import create_session_zip_encrypted, decrypt_session_file # Créer une session réaliste test_session = create_realistic_test_session() print(f" Session créée: {test_session.session_id}") print(f" Événements: {len(test_session.events)}") print(f" Screenshots: {len(test_session.screenshots)}") with tempfile.TemporaryDirectory() as tmpdir: tmpdir_path = Path(tmpdir) # Sauvegarder la session avec de vrais fichiers session_dir = tmpdir_path / test_session.session_id test_session.save_json(str(tmpdir_path)) # Créer de vrais fichiers de screenshots (même si vides pour le test) shots_dir = session_dir / "shots" shots_dir.mkdir(parents=True, exist_ok=True) for screenshot in test_session.screenshots: shot_path = shots_dir / screenshot.relative_path.split("/")[-1] # Créer un fichier PNG minimal mais valide shot_path.write_bytes(b'\x89PNG\r\n\x1a\n' + b'\x00' * 100) print(f" Fichiers créés dans: {session_dir}") # Calculer hash avant chiffrement original_files = list(session_dir.rglob("*")) original_hashes = {} for file_path in original_files: if file_path.is_file(): original_hashes[file_path.name] = calculate_file_hash(file_path) # Chiffrer avec le vrai système de chiffrement print(" Chiffrement en cours...") encrypted_path = create_session_zip_encrypted( test_session, agent_encryption_password, str(tmpdir_path), delete_unencrypted=False ) print(f" Fichier chiffré: {encrypted_path}") # Vérifications sur le fichier chiffré if not os.path.exists(encrypted_path): print("❌ Fichier chiffré non créé") return False encrypted_size = os.path.getsize(encrypted_path) print(f" Taille chiffrée: {encrypted_size} bytes") # Le fichier chiffré doit être plus grand que juste les métadonnées if encrypted_size < 100: # Au minimum salt + iv + données + padding print("❌ Fichier chiffré suspicieusement petit") return False # Déchiffrer avec le vrai système serveur print(" Déchiffrement en cours...") decrypted_path = decrypt_session_file( encrypted_path, server_encryption_password, str(tmpdir_path / "test_decrypted.zip") ) print(f" Fichier déchiffré: {decrypted_path}") # Vérifications approfondies du fichier déchiffré if not os.path.exists(decrypted_path): print("❌ Fichier déchiffré non créé") return False # Vérifier l'intégrité du ZIP déchiffré is_valid, message = verify_zip_integrity( decrypted_path, expected_files=['.json', '.png'] ) if not is_valid: print(f"❌ ZIP déchiffré invalide: {message}") return False print(f" ✅ {message}") # Extraire et vérifier le contenu extract_dir = tmpdir_path / "extracted" extract_dir.mkdir() with zipfile.ZipFile(decrypted_path, 'r') as zf: zf.extractall(extract_dir) # Vérifier que les données sont identiques extracted_files = list(extract_dir.rglob("*")) print(f" Fichiers extraits: {len([f for f in extracted_files if f.is_file()])}") # Vérifier l'intégrité des données par comparaison de hash integrity_check_passed = True for file_path in extracted_files: if file_path.is_file() and file_path.name in original_hashes: extracted_hash = calculate_file_hash(file_path) if extracted_hash != original_hashes[file_path.name]: print(f"❌ Intégrité compromise pour {file_path.name}") integrity_check_passed = False if integrity_check_passed: print(" ✅ Intégrité des données vérifiée") else: print("❌ Échec de vérification d'intégrité") return False # Test de performance (optionnel) file_size_kb = encrypted_size / 1024 if file_size_kb > 0: print(f" Performance: {file_size_kb:.1f} KB chiffrés") print("✅ Cycle complet chiffrement/déchiffrement réussi") except ImportError as e: print(f"❌ Modules de chiffrement non disponibles: {e}") print(" Vérifiez que agent_v0/storage_encrypted.py est accessible") return False except Exception as e: print(f"❌ Erreur lors du test de chiffrement: {e}") import traceback traceback.print_exc() return False # 4. Test de robustesse avec différents types de données print("\n4. Test de robustesse...") try: # Test avec des caractères spéciaux dans les données special_session = create_realistic_test_session() special_session.context["notes"] = "Test avec caractères spéciaux: éàü 中文 🚀" special_session.user["label"] = "Utilisateur Tëst" with tempfile.TemporaryDirectory() as tmpdir: special_session.save_json(tmpdir) encrypted_special = create_session_zip_encrypted( special_session, agent_encryption_password, tmpdir, delete_unencrypted=True ) decrypted_special = decrypt_session_file( encrypted_special, server_encryption_password, os.path.join(tmpdir, "special_decrypted.zip") ) # Vérifier que les caractères spéciaux sont préservés with zipfile.ZipFile(decrypted_special, 'r') as zf: json_files = [f for f in zf.namelist() if f.endswith('.json')] if json_files: with zf.open(json_files[0]) as json_file: restored_data = json.load(json_file) if restored_data.get("context", {}).get("notes") != special_session.context["notes"]: print("❌ Caractères spéciaux non préservés") return False print(" ✅ Caractères spéciaux préservés") except Exception as e: print(f"⚠️ Test de robustesse échoué: {e}") # Ne pas faire échouer le test principal pour ça print("\n=== Test terminé avec succès ===") print("✅ Synchronisation des clés validée") print("✅ Chiffrement/déchiffrement fonctionnel") print("✅ Intégrité des données vérifiée") print("✅ Compatibilité agent ↔ serveur confirmée") return True if __name__ == "__main__": success = test_encryption_key_sync() sys.exit(0 if success else 1)