#!/usr/bin/env python3
"""
Test the encryption fix - agent should now use shared password from environment.

Run as a script: the exit status is 0 when the round trip succeeds and 1
otherwise. Importing this module has side effects: ``.env.local`` in the
current working directory (if present) is loaded into ``os.environ``, and the
script directory plus its ``agent_v0`` subdirectory are put at the front of
``sys.path``.
"""

import os
import sys
import tempfile
import zipfile
from pathlib import Path
from typing import Dict, MutableMapping, Optional

# Star-imports expose only the dependency-free helpers; the end-to-end check
# needs the project packages and is meant to be run from this file directly.
__all__ = ["load_env_file", "mask_secret"]


def load_env_file(
    path,
    environ: Optional[MutableMapping[str, str]] = None,
) -> Dict[str, str]:
    """Load ``KEY=VALUE`` lines from *path* into *environ*.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Keys and values are stripped of surrounding whitespace; the value
    is everything after the first ``=``. Existing entries in *environ* are
    overwritten.

    Args:
        path: Location of the env file. A missing file is not an error.
        environ: Mapping to update. Defaults to ``os.environ``.

    Returns:
        The key/value pairs that were written to *environ* (empty when the
        file does not exist).
    """
    target = os.environ if environ is None else environ
    loaded: Dict[str, str] = {}

    env_path = Path(path)
    if not env_path.is_file():
        return loaded

    # Explicit encoding so parsing does not depend on the platform locale.
    with open(env_path, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            line = raw_line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, value = line.split("=", 1)
            key = key.strip()
            if not key:
                # "=value" has no variable name; os.environ rejects empty keys.
                continue
            loaded[key] = value.strip()

    target.update(loaded)
    return loaded


def mask_secret(value: Optional[str]) -> str:
    """Return a printable placeholder for a secret without revealing it."""
    return "<set>" if value else "NOT_SET"


# Load environment (relative to the current working directory).
load_env_file(Path(".env.local"))

# Add paths so the project packages imported below can be resolved.
sys.path.insert(0, str(Path(__file__).parent / "agent_v0"))
sys.path.insert(0, str(Path(__file__).parent))


def test_encryption_fix():
    """Test that agent now uses shared password from environment.

    Creates a throwaway session, asks the agent-side helper to build the
    session ZIP with the password taken from the agent config, then asks the
    server-side helper to decrypt it with ``ENCRYPTION_PASSWORD`` and checks
    that the result opens as a ZIP archive.

    Returns:
        bool: True when the round trip succeeds, False when the environment
        password is missing or when either step raises.
    """
    # Project imports stay local so the module can be imported without them.
    from agent_v0.storage_encrypted import create_session_zip_with_encryption
    from server.storage_encrypted import decrypt_session_file
    from agent_v0.raw_session import RawSession
    from agent_v0.user_config import load_user_config

    print("=== Testing Encryption Fix ===")

    # Load agent config. Secrets are masked so they never reach logs.
    config = load_user_config()
    config_password = config.get("encryption_password")
    print(f"Agent config encryption_password: {mask_secret(config_password)}")

    # Get environment password
    env_password = os.getenv("ENCRYPTION_PASSWORD")
    print(f"Environment ENCRYPTION_PASSWORD: {mask_secret(env_password)}")
    if not env_password:
        # Without the shared password there is nothing meaningful to verify.
        print("ENCRYPTION_PASSWORD is not set - cannot test the shared password")
        return False

    # Create test session
    test_session = RawSession.create(
        user_id="test_encryption_fix",
        platform="linux",
        hostname="test_host",
        screen_resolution=[1920, 1080],
    )

    with tempfile.TemporaryDirectory() as tmpdir:
        # Save the session
        test_session.save_json(tmpdir)

        # Agent creates encrypted ZIP. The config password is expected to be
        # None here, in which case the helper should fall back to the
        # environment password (that fallback is what this test exercises).
        try:
            zip_path = create_session_zip_with_encryption(
                test_session,
                base_dir=tmpdir,
                enable_encryption=config.get("enable_encryption", True),
                password=config_password,
            )
        except Exception as e:  # test boundary: report any failure
            print(f"Agent encryption failed: {e}")
            return False
        print(f"Agent created encrypted file: {zip_path}")

        # Server decrypts with same environment password
        try:
            decrypted_path = decrypt_session_file(
                zip_path,
                env_password,
                os.path.join(tmpdir, "decrypted.zip"),
            )
            print(f"Server decryption successful: {decrypted_path}")

            # Verify it's a valid ZIP
            with zipfile.ZipFile(decrypted_path, "r") as zf:
                files = zf.namelist()
                print(f"ZIP contains {len(files)} files: {files[:3]}")
        except Exception as e:  # test boundary: report any failure
            print(f"Server decryption failed: {e}")
            return False

    return True


def main() -> int:
    """Run the check and return a process exit status (0 = success)."""
    print("Testing encryption fix...")
    result = test_encryption_fix()

    if result:
        print("\nENCRYPTION FIX SUCCESSFUL!")
        print("Agent and server now use the same shared password from ENCRYPTION_PASSWORD")
    else:
        print("\nFix failed - need further investigation")

    return 0 if result else 1


if __name__ == "__main__":
    sys.exit(main())