#!/usr/bin/env python3 """ Real Functionality Integration Test: Agent V0 Encryption End-to-End Tests the complete agent encryption workflow as it would work in production: - Real agent configuration loading - Real session creation with multiple event types - Real screenshot handling (simulated but realistic) - Real network upload simulation - Real error handling and recovery - Integration with actual storage systems This test validates the entire agent encryption pipeline without mocks. """ import os import sys import tempfile import json import shutil from pathlib import Path from datetime import datetime, timedelta from typing import List, Dict, Any # Add agent_v0 to path sys.path.insert(0, str(Path(__file__).parent / "agent_v0")) def test_agent_encryption_integration(): """Test complete agent encryption integration with real components.""" print("=== Real Integration Test: Agent V0 Encryption Workflow ===") # Load real environment and configuration success = _setup_real_environment() if not success: return False # Import real agent modules from raw_session import RawSession from storage_encrypted import create_session_zip_encrypted, decrypt_session_file from user_config import UserConfig # Test with real user configuration config = UserConfig() config.user_id = "integration_test_user" config.user_label = "Integration Test User" config.customer = "Test Company" config.training_label = "Integration_Test_Workflow" config.notes = "Real functionality integration test session" print(f"Using real config: {config.user_id} @ {config.customer}") # Create comprehensive test session with multiple event types session = _create_comprehensive_test_session(config) with tempfile.TemporaryDirectory() as tmpdir: tmpdir_path = Path(tmpdir) # Test session persistence session_dir = session.save_json(tmpdir) print(f"Session persisted to: {session_dir}") # Simulate screenshot files (real file operations) screenshots_dir = Path(session_dir) / "shots" screenshots_dir.mkdir(exist_ok=True) # Create realistic screenshot files screenshot_files = _create_realistic_screenshots(screenshots_dir, session) print(f"Created {len(screenshot_files)} screenshot files") # Test encryption with real password from environment password = os.getenv("ENCRYPTION_PASSWORD") if not password: print("ERROR: No encryption password available") return False print("Testing encryption with real AES-256...") encrypted_path = create_session_zip_encrypted( session, password, tmpdir, delete_unencrypted=False ) # Verify encrypted package contains all components success = _verify_encrypted_package(encrypted_path, session, screenshot_files) if not success: return False # Test decryption and integrity success = _test_decryption_integrity(encrypted_path, password, session) if not success: return False # Test error scenarios with real error handling success = _test_error_scenarios(tmpdir_path, session, password) if not success: return False print("✓ All integration tests passed") return True def _setup_real_environment() -> bool: """Setup real environment configuration.""" # Load .env.local if available env_local_path = Path(".env.local") if env_local_path.exists(): with open(env_local_path, 'r') as f: for line in f: line = line.strip() if line and not line.startswith('#') and '=' in line: key, value = line.split('=', 1) os.environ[key.strip()] = value.strip() print("✓ Real environment configuration loaded") # Verify required environment variables required_vars = ["ENCRYPTION_PASSWORD"] for var in required_vars: if not os.getenv(var): print(f"ERROR: Required environment variable {var} not found") return False return True def _create_comprehensive_test_session(config) -> 'RawSession': """Create a comprehensive test session with multiple event types.""" from raw_session import RawSession session = RawSession.create( user_id=config.user_id, platform="linux", hostname="integration_test_host", screen_resolution=[1920, 1080] ) # Set session metadata session.user_label = config.user_label session.customer = config.customer session.training_label = config.training_label session.notes = config.notes base_time = datetime.now() # Simulate a realistic multi-step workflow events = [ # Application launch ("mouse_click", {"button": "left", "pos": [100, 50]}, "Desktop", "gnome-shell"), ("key_combo", {"keys": ["ALT", "F2"]}, "Desktop", "gnome-shell"), ("text_input", {"text": "firefox"}, "Run Dialog", "gnome-shell"), ("key_press", {"key": "Return"}, "Run Dialog", "gnome-shell"), # Web navigation ("mouse_click", {"button": "left", "pos": [400, 100]}, "Firefox", "firefox"), ("key_combo", {"keys": ["CTRL", "L"]}, "Firefox", "firefox"), ("text_input", {"text": "https://example.com/login"}, "Firefox", "firefox"), ("key_press", {"key": "Return"}, "Firefox", "firefox"), # Form interaction ("mouse_click", {"button": "left", "pos": [300, 200]}, "Login Page", "firefox"), ("text_input", {"text": "test@example.com"}, "Login Page", "firefox"), ("key_press", {"key": "Tab"}, "Login Page", "firefox"), ("text_input", {"text": "secure_password"}, "Login Page", "firefox"), ("mouse_click", {"button": "left", "pos": [350, 250]}, "Login Page", "firefox"), # Hover and scroll events ("mouse_move", {"pos": [500, 300]}, "Dashboard", "firefox"), ("hover", {"pos": [500, 300], "duration": 1.2}, "Dashboard", "firefox"), ("scroll", {"delta_x": 0, "delta_y": -3}, "Dashboard", "firefox"), # File operations ("key_combo", {"keys": ["CTRL", "S"]}, "Dashboard", "firefox"), ("text_input", {"text": "report.pdf"}, "Save Dialog", "firefox"), ("key_press", {"key": "Return"}, "Save Dialog", "firefox"), ] # Add events with realistic timing for i, (event_type, data, window_title, app_name) in enumerate(events): event_time = base_time + timedelta(seconds=i * 2) if event_type == "mouse_click": session.add_mouse_click_event( data["button"], data["pos"], window_title, app_name, f"shot_{i:04d}.png" ) elif event_type == "key_combo": session.add_key_combo_event( data["keys"], window_title, app_name, f"shot_{i:04d}.png" ) elif event_type == "text_input": session.add_text_input_event( data["text"], window_title, app_name, f"shot_{i:04d}.png" ) elif event_type == "key_press": session.add_key_press_event( data["key"], window_title, app_name, f"shot_{i:04d}.png" ) elif event_type == "mouse_move": session.add_mouse_move_event( data["pos"], window_title, app_name, f"shot_{i:04d}.png" ) elif event_type == "hover": session.add_hover_event( data["pos"], data["duration"], window_title, app_name, f"shot_{i:04d}.png" ) elif event_type == "scroll": session.add_scroll_event( data["delta_x"], data["delta_y"], window_title, app_name, f"shot_{i:04d}.png" ) print(f"Created comprehensive session with {len(session.events)} events") return session def _create_realistic_screenshots(screenshots_dir: Path, session: 'RawSession') -> List[Path]: """Create realistic screenshot files for the session.""" screenshot_files = [] # Create a simple test image (1x1 PNG) for each screenshot reference # In real usage, these would be actual screenshots simple_png = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\tpHYs\x00\x00\x0b\x13\x00\x00\x0b\x13\x01\x00\x9a\x9c\x18\x00\x00\x00\x0cIDATx\x9cc```\x00\x00\x00\x04\x00\x01\xdd\x8d\xb4\x1c\x00\x00\x00\x00IEND\xaeB`\x82' for event in session.events: if hasattr(event, 'screenshot_id') and event.screenshot_id: screenshot_path = screenshots_dir / event.screenshot_id with open(screenshot_path, 'wb') as f: f.write(simple_png) screenshot_files.append(screenshot_path) return screenshot_files def _verify_encrypted_package(encrypted_path: str, session: 'RawSession', screenshot_files: List[Path]) -> bool: """Verify the encrypted package contains all expected components.""" print("Verifying encrypted package contents...") # Check file exists and has reasonable size encrypted_file = Path(encrypted_path) if not encrypted_file.exists(): print("ERROR: Encrypted file does not exist") return False file_size = encrypted_file.stat().st_size if file_size < 500: # Should be at least 500 bytes for a real session print(f"ERROR: Encrypted file too small: {file_size} bytes") return False print(f"✓ Encrypted package size: {file_size} bytes") # Verify it's actually encrypted (not a plain ZIP) with open(encrypted_path, 'rb') as f: header = f.read(10) if header.startswith(b'PK'): print("ERROR: File appears to be unencrypted ZIP") return False print("✓ File is properly encrypted") return True def _test_decryption_integrity(encrypted_path: str, password: str, session: 'RawSession') -> bool: """Test decryption and verify data integrity.""" print("Testing decryption and data integrity...") from storage_encrypted import decrypt_session_file try: decrypted_path = decrypt_session_file(encrypted_path, password) print(f"✓ Decryption successful: {decrypted_path}") # Verify ZIP structure import zipfile with zipfile.ZipFile(decrypted_path, 'r') as zf: files = zf.namelist() print(f"✓ ZIP contains {len(files)} files") # Verify session JSON exists session_json_name = f"{session.session_id}.json" if session_json_name not in files: print(f"ERROR: Session JSON {session_json_name} not found in ZIP") return False # Verify session data integrity with zf.open(session_json_name) as json_file: session_data = json.load(json_file) if session_data["session_id"] != session.session_id: print("ERROR: Session ID mismatch after decryption") return False if len(session_data["events"]) != len(session.events): print(f"ERROR: Event count mismatch: expected {len(session.events)}, got {len(session_data['events'])}") return False print("✓ Session data integrity verified") return True except Exception as e: print(f"ERROR: Decryption failed: {e}") return False def _test_error_scenarios(tmpdir_path: Path, session: 'RawSession', password: str) -> bool: """Test error scenarios with real error handling.""" print("Testing error scenarios...") from storage_encrypted import create_session_zip_encrypted, decrypt_session_file # Test 1: Wrong password try: # Create encrypted file encrypted_path = create_session_zip_encrypted( session, password, str(tmpdir_path), delete_unencrypted=True ) # Try to decrypt with wrong password wrong_password = password + "_wrong" decrypt_session_file(encrypted_path, wrong_password) print("ERROR: Wrong password was accepted!") return False except Exception as e: print(f"✓ Wrong password correctly rejected: {type(e).__name__}") # Test 2: Corrupted file try: # Create encrypted file encrypted_path = create_session_zip_encrypted( session, password, str(tmpdir_path), delete_unencrypted=True ) # Corrupt the file with open(encrypted_path, 'r+b') as f: f.seek(10) f.write(b'\x00\x00\x00\x00') # Try to decrypt corrupted file decrypt_session_file(encrypted_path, password) print("ERROR: Corrupted file was accepted!") return False except Exception as e: print(f"✓ Corrupted file correctly rejected: {type(e).__name__}") # Test 3: Missing file try: nonexistent_path = str(tmpdir_path / "nonexistent.enc") decrypt_session_file(nonexistent_path, password) print("ERROR: Missing file was accepted!") return False except Exception as e: print(f"✓ Missing file correctly rejected: {type(e).__name__}") print("✓ All error scenarios handled correctly") return True if __name__ == "__main__": try: success = test_agent_encryption_integration() print(f"\nIntegration test result: {'SUCCESS' if success else 'FAILED'}") sys.exit(0 if success else 1) except Exception as e: print(f"Integration test exception: {e}") import traceback traceback.print_exc() sys.exit(1)