#!/usr/bin/env python3
"""
Real functionality test for encryption key mismatch between agent and server.

This test validates the actual encryption/decryption flow using real:
- Environment variable loading (.env.local)
- File system operations (create, encrypt, decrypt)
- Agent and server encryption implementations
- Actual password generation and validation

No mocks - tests the complete encryption workflow as it runs in production.

The ``test_*`` functions report their outcome through a boolean return value
(consumed by ``main()``) rather than through assertions, so the file is meant
to be executed as a script.
"""

import json
import os
import sys
import tempfile
import shutil
import traceback
import zipfile
from pathlib import Path

# Add paths for imports
sys.path.insert(0, str(Path(__file__).parent / "agent_v0"))
sys.path.insert(0, str(Path(__file__).parent))

# Only the reusable helpers are re-exported by ``from <module> import *``.
# The test_* functions are looked up by name, and star-importing them into
# another test module would make them run a second time there.
__all__ = [
    "load_real_environment",
    "restore_environment",
    "create_real_session_data",
]

# Fallback used by every test when ENCRYPTION_PASSWORD is not set.
_DEFAULT_PASSWORD = "rpa_vision_v3_default_key"


def _mask_secret(value):
    """Return a log-safe placeholder for *value* that reveals only its length."""
    return f"<redacted, {len(value)} chars>"


def load_real_environment():
    """Load environment variables from ``.env.local`` in the current directory.

    Each ``KEY=VALUE`` line (blank lines, ``#`` comments and lines without
    ``=`` are ignored) is written into ``os.environ``. Values are never
    echoed to stdout; only their length is reported.

    Returns:
        dict: For every key that was set, the value it had *before* this call
        (``None`` when it was not defined). Pass it to
        ``restore_environment`` to undo the changes.
    """
    env_local_path = Path(".env.local")
    original_env = {}

    if not env_local_path.exists():
        print("No .env.local file found - using system environment")
        return original_env

    print(f"Loading environment from: {env_local_path}")
    with open(env_local_path, "r", encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue

            key, value = (part.strip() for part in line.split("=", 1))
            if not key:
                # "=value" has no variable name; os.environ rejects empty keys.
                continue

            # setdefault keeps the value seen the FIRST time a key appears, so
            # a key repeated in the file cannot replace the true pre-test
            # value with one this function wrote itself.
            original_env.setdefault(key, os.environ.get(key))
            os.environ[key] = value
            print(f" Set {key}={_mask_secret(value)}")

    return original_env


def restore_environment(original_env):
    """Undo ``load_real_environment``.

    Args:
        original_env: Mapping of variable name to its previous value; a
            ``None`` value means the variable did not exist and is removed.
    """
    for key, previous in original_env.items():
        if previous is None:
            os.environ.pop(key, None)
        else:
            os.environ[key] = previous


def create_real_session_data():
    """Build a ``RawSession`` populated with four sample input events.

    Returns:
        The session created by ``RawSession.create`` after one mouse click,
        one key combo, one hover-idle and one scroll event have been added.
    """
    from agent_v0.raw_session import RawSession

    # Create session with realistic data
    session = RawSession.create(
        user_id="test_encryption_user",
        user_label="Test Encryption User",
        customer="Test Company",
        training_label="Encryption Test Workflow",
        notes="Testing encryption key mismatch",
        platform="linux",
        hostname="test_encryption_host",
        screen_resolution=[1920, 1080],
    )

    # Add some realistic events using the correct API
    session.add_mouse_click_event(
        button="left",
        pos=[450, 320],
        window_title="Test App",
        app_name="test.exe",
        screenshot_id="shot_0001",
    )
    session.add_key_combo_event(
        keys=["CTRL", "C"],
        window_title="Test App",
        app_name="test.exe",
        screenshot_id="shot_0002",
    )
    session.add_hover_idle_event(
        pos=[500, 350],
        idle_ms=1200,
        window_title="Test App",
        app_name="test.exe",
        screenshot_id="shot_0003",
    )
    session.add_scroll_event(
        pos=[600, 400],
        delta=[0, -3],
        window_title="Test App",
        app_name="test.exe",
        screenshot_id="shot_0004",
    )

    return session


def test_encryption_key_mismatch():
    """Demonstrate the key mismatch between the agent and the server.

    The session is encrypted with the agent's session-based password and then
    decrypted with the server's ``ENCRYPTION_PASSWORD``.

    Returns:
        bool: ``True`` when the server password decrypts the file (no
        mismatch), ``False`` otherwise.

    NOTE(review): ``False`` is also returned when encryption itself fails or
    when even the agent password cannot decrypt the file, and ``main()``
    reports every ``False`` as "DETECTED". Confirm whether those setup
    failures should be surfaced differently.
    """
    # Load real environment
    original_env = load_real_environment()

    try:
        # Import after setting environment
        from agent_v0.storage_encrypted import create_session_zip_encrypted
        from server.storage_encrypted import decrypt_session_file

        print("=== Testing Real Encryption Key Mismatch ===")

        # Get actual passwords used by agent and server
        server_password = os.getenv("ENCRYPTION_PASSWORD", _DEFAULT_PASSWORD)
        print(f"Server password from env: {_mask_secret(server_password)}")

        # Create real session with actual data
        test_session = create_real_session_data()
        session_id = test_session.session_id

        # Agent generates session-based password (current behavior)
        agent_password = f"rpa_vision_v3_{session_id}"
        print(f"Agent password (session-based): {_mask_secret(agent_password)}")
        print(f"Passwords match: {agent_password == server_password}")

        with tempfile.TemporaryDirectory() as tmpdir:
            # Save the real session to filesystem
            session_dir = test_session.save_json(tmpdir)
            print(f"Session saved to: {session_dir}")

            # Agent encrypts with session-based password (real implementation)
            try:
                encrypted_path = create_session_zip_encrypted(
                    test_session,
                    agent_password,  # Agent uses session-based password
                    tmpdir,
                    delete_unencrypted=False,
                )
                print(f"✅ Agent encryption successful: {encrypted_path}")

                # Verify encrypted file exists and has content
                if not Path(encrypted_path).exists():
                    print(f"❌ Encrypted file not created: {encrypted_path}")
                    return False

                file_size = Path(encrypted_path).stat().st_size
                print(f" Encrypted file size: {file_size} bytes")
            except Exception as e:
                print(f"❌ Agent encryption failed: {e}")
                return False

            # Server tries to decrypt with environment password
            # (real implementation)
            try:
                decrypted_path = decrypt_session_file(
                    encrypted_path,
                    server_password,  # Server uses environment password
                    os.path.join(tmpdir, "server_decrypted.zip"),
                )
                print(f"✅ Server decryption successful: {decrypted_path}")

                # If this succeeds, there's no mismatch (unexpected)
                print("⚠️ No mismatch detected - passwords are compatible")
                return True
            except Exception as e:
                print(f"❌ Server decryption failed: {e}")
                print(" This confirms the key mismatch issue!")

                # Verify the mismatch by trying with correct password
                try:
                    decrypted_path = decrypt_session_file(
                        encrypted_path,
                        agent_password,  # Use agent's password
                        os.path.join(tmpdir, "correct_decrypted.zip"),
                    )
                    print(f"✅ Decryption with agent password works: {decrypted_path}")

                    # Verify decrypted content
                    if Path(decrypted_path).exists():
                        decrypted_size = Path(decrypted_path).stat().st_size
                        print(f" Decrypted file size: {decrypted_size} bytes")

                    return False  # Mismatch confirmed
                except Exception as e2:
                    print(f"❌ Even agent password doesn't work: {e2}")
                    return False
    finally:
        # Always restore environment
        restore_environment(original_env)


def test_fix_with_shared_password():
    """Check that a password shared through the environment round-trips.

    The session is encrypted and decrypted with the same
    ``ENCRYPTION_PASSWORD`` value, then the decrypted archive is opened as a
    ZIP file on a best-effort basis.

    Returns:
        bool: ``True`` when both encryption and decryption succeed and the
        decrypted file exists, ``False`` otherwise.
    """
    # Load real environment
    original_env = load_real_environment()

    try:
        # Import after setting environment
        from agent_v0.storage_encrypted import create_session_zip_encrypted
        from server.storage_encrypted import decrypt_session_file

        print("\n=== Testing Fix with Shared Password (Real Implementation) ===")

        # Use the actual shared password from environment
        shared_password = os.getenv("ENCRYPTION_PASSWORD", _DEFAULT_PASSWORD)
        print(f"Shared password from env: {_mask_secret(shared_password)}")

        # Create real session with actual data
        test_session = create_real_session_data()

        with tempfile.TemporaryDirectory() as tmpdir:
            # Save the real session to filesystem
            session_dir = test_session.save_json(tmpdir)
            print(f"Session saved to: {session_dir}")

            # Agent encrypts with shared password (fixed behavior)
            try:
                encrypted_path = create_session_zip_encrypted(
                    test_session,
                    shared_password,  # Shared password instead of session-based
                    tmpdir,
                    delete_unencrypted=False,
                )
                print(f"✅ Agent encryption with shared password: {encrypted_path}")

                # Verify encrypted file
                if not Path(encrypted_path).exists():
                    print(f"❌ Encrypted file not created: {encrypted_path}")
                    return False

                file_size = Path(encrypted_path).stat().st_size
                print(f" Encrypted file size: {file_size} bytes")
            except Exception as e:
                print(f"❌ Agent encryption failed: {e}")
                return False

            # Server decrypts with same shared password (real implementation)
            try:
                decrypted_path = decrypt_session_file(
                    encrypted_path,
                    shared_password,  # Same shared password
                    os.path.join(tmpdir, "shared_decrypted.zip"),
                )
                print(f"✅ Server decryption with shared password: {decrypted_path}")

                # Verify decrypted file
                if not Path(decrypted_path).exists():
                    print(f"❌ Decrypted file not created: {decrypted_path}")
                    return False

                decrypted_size = Path(decrypted_path).stat().st_size
                print(f" Decrypted file size: {decrypted_size} bytes")

                # Additional validation: a failure to read the archive only
                # produces a warning and does not fail the test.
                try:
                    with zipfile.ZipFile(decrypted_path, "r") as zf:
                        file_list = zf.namelist()
                        print(f" Decrypted ZIP contains: {len(file_list)} files")
                        if file_list:
                            print(f" First file: {file_list[0]}")
                except Exception as e:
                    print(f" Warning: Could not validate ZIP content: {e}")

                return True
            except Exception as e:
                print(f"❌ Server decryption failed: {e}")
                return False
    finally:
        # Always restore environment
        restore_environment(original_env)


def test_end_to_end_encryption_workflow():
    """Run the full save / encrypt / decrypt / inspect cycle with real files.

    Five placeholder screenshot files are written next to the saved session
    before it is encrypted with ``ENCRYPTION_PASSWORD``; the decrypted archive
    is then listed and its first JSON member is parsed.

    Returns:
        bool: ``True`` when the decrypted archive contains at least one file,
        ``False`` when it is empty or any step raises.
    """
    # Load real environment
    original_env = load_real_environment()

    try:
        from agent_v0.storage_encrypted import create_session_zip_encrypted
        from server.storage_encrypted import decrypt_session_file

        print("\n=== Testing End-to-End Encryption Workflow ===")

        # Create a comprehensive session with multiple events and screenshots
        test_session = create_real_session_data()

        # Add more realistic events using correct API
        test_session.add_mouse_click_event(
            button="left",
            pos=[520, 450],
            window_title="Login Form",
            app_name="browser.exe",
            screenshot_id="shot_0005",
        )

        # Add screenshot metadata (shot_0001 .. shot_0005)
        for index in range(1, 6):
            shot_id = f"shot_{index:04d}"
            test_session.add_screenshot(shot_id, f"shots/{shot_id}.png")

        shared_password = os.getenv("ENCRYPTION_PASSWORD", _DEFAULT_PASSWORD)

        with tempfile.TemporaryDirectory() as tmpdir:
            # Save session with real file structure
            session_json_path = test_session.save_json(tmpdir)
            session_dir = os.path.dirname(session_json_path)

            # Create mock screenshot files to simulate real workflow
            shots_dir = Path(session_dir) / "shots"
            shots_dir.mkdir(exist_ok=True)

            for index in range(1, 6):
                shot_file = shots_dir / f"shot_{index:04d}.png"
                # PNG signature followed by filler bytes; not a decodable image.
                shot_file.write_bytes(b"\x89PNG\r\n\x1a\n" + b"mock_image_data" * 10)

            print(f"Created session with {len(list(shots_dir.glob('*.png')))} screenshot files")

            # Test encryption with real file structure
            encrypted_path = create_session_zip_encrypted(
                test_session,
                shared_password,
                tmpdir,
                delete_unencrypted=False,
            )
            print(f"✅ Encrypted session: {encrypted_path}")

            # Verify encrypted file exists and has content
            # (but don't try to read as ZIP - it's encrypted!)
            encrypted_size = Path(encrypted_path).stat().st_size
            print(f" Encrypted file size: {encrypted_size} bytes")

            # Test decryption
            decrypted_path = decrypt_session_file(
                encrypted_path,
                shared_password,
                os.path.join(tmpdir, "end_to_end_decrypted.zip"),
            )
            print(f"✅ Decrypted session: {decrypted_path}")

            # Inspect the decrypted archive
            with zipfile.ZipFile(decrypted_path, "r") as zf:
                decrypted_files = zf.namelist()
                print(f" Decrypted ZIP contains {len(decrypted_files)} files")

                # Verify JSON content
                json_files = [f for f in decrypted_files if f.endswith(".json")]
                if json_files:
                    json_content = zf.read(json_files[0])
                    session_data = json.loads(json_content)
                    print(f" Session ID: {session_data.get('session_id', 'unknown')}")
                    print(f" Events: {len(session_data.get('events', []))}")
                    print(f" Screenshots: {len(session_data.get('screenshots', []))}")

                # Verify screenshot files
                png_files = [f for f in decrypted_files if f.endswith(".png")]
                print(f" Screenshot files: {len(png_files)}")

                return len(decrypted_files) > 0  # Success if we have decrypted files
    except Exception as e:
        print(f"❌ End-to-end test failed: {e}")
        traceback.print_exc()
        return False
    finally:
        restore_environment(original_env)


def main():
    """Run the three tests, print a summary and return the process exit code.

    Returns:
        int: ``0`` when the mismatch test returned a falsy value and both
        other tests returned a truthy value, ``1`` otherwise.
    """
    print("Testing real encryption functionality between agent and server...")
    print("=" * 60)

    # Test 1: Demonstrate the mismatch with real implementations
    print("\n🔍 Test 1: Encryption Key Mismatch Detection")
    mismatch_result = test_encryption_key_mismatch()

    # Test 2: Test the fix with real implementations
    print("\n🔧 Test 2: Shared Password Fix")
    fix_result = test_fix_with_shared_password()

    # Test 3: Complete end-to-end workflow
    print("\n🚀 Test 3: End-to-End Encryption Workflow")
    e2e_result = test_end_to_end_encryption_workflow()

    # Single source of truth for both the summary text and the exit code.
    all_passed = bool(not mismatch_result and fix_result and e2e_result)

    print(f"\n{'=' * 60}")
    print("📊 RESULTS SUMMARY")
    print(f"{'=' * 60}")
    print(f"Mismatch detection: {'✅ DETECTED' if not mismatch_result else '❌ NOT DETECTED'}")
    print(f"Shared password fix: {'✅ WORKS' if fix_result else '❌ FAILED'}")
    print(f"End-to-end workflow: {'✅ WORKS' if e2e_result else '❌ FAILED'}")

    if all_passed:
        print("\n🎉 ALL TESTS PASSED!")
        print("✅ Key mismatch confirmed and fix validated!")
        print("✅ Complete encryption workflow tested successfully!")
        print("\n📋 Next steps:")
        print(" 1. Update agent to use ENCRYPTION_PASSWORD from environment")
        print(" 2. Ensure both agent and server load the same .env.local file")
        print(" 3. Test with production environment variables")
    else:
        print("\n⚠️ SOME TESTS FAILED - Investigation needed:")
        if mismatch_result:
            print(" - Mismatch not detected (unexpected)")
        if not fix_result:
            print(" - Shared password fix failed")
        if not e2e_result:
            print(" - End-to-end workflow failed")

    # Return appropriate exit code
    return 0 if all_passed else 1


if __name__ == "__main__":
    sys.exit(main())