Files
rpa_vision_v3/test_encryption_real_utils.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

368 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Real functionality testing utilities for encryption workflows.
Provides helper functions for testing actual encryption/decryption
without mocks or simulations.
"""
import os
import sys
import json
import zipfile
import tempfile
from pathlib import Path
from typing import Dict, Any, Optional, Tuple
# Make project modules importable when this file is run directly.
# Both entries are inserted at index 0, so after these two calls this file's
# own directory is searched first, followed by its "agent_v0" subdirectory.
sys.path.insert(0, str(Path(__file__).parent / "agent_v0"))
sys.path.insert(0, str(Path(__file__).parent))
def create_realistic_session_with_files(base_dir: str) -> Tuple[Any, str]:
    """
    Build a RawSession with sample events and back it with files on disk.

    The session records five interaction events (click, key combo, hover,
    scroll, click) against a "Login Form" window in firefox, registers five
    screenshots, and is saved through ``session.save_json(base_dir)``.
    One placeholder PNG per screenshot is then written into a ``shots``
    folder located in the same directory as the saved JSON file.

    Args:
        base_dir: Directory handed to ``session.save_json``.

    Returns:
        (session_object, session_directory_path)
    """
    from agent_v0.raw_session import RawSession

    # Session with realistic metadata
    session = RawSession.create(
        user_id="real_test_user",
        user_label="Real Test User",
        customer="Test Company",
        training_label="Real Functionality Test",
        notes="Testing real encryption workflow",
        platform="linux",
        hostname="test_workstation",
        screen_resolution=[1920, 1080],
    )

    # Every event targets the same window, so the context is shared.
    window = {"window_title": "Login Form", "app_name": "firefox"}

    session.add_mouse_click_event(
        button="left", pos=[450, 320], **window, screenshot_id="shot_0001"
    )
    session.add_key_combo_event(
        keys=["CTRL", "A"], **window, screenshot_id="shot_0002"
    )
    session.add_hover_idle_event(
        pos=[500, 350], idle_ms=800, **window, screenshot_id="shot_0003"
    )
    session.add_scroll_event(
        pos=[600, 400], delta=[0, -2], **window, screenshot_id="shot_0004"
    )
    session.add_mouse_click_event(
        button="left", pos=[520, 450], **window, screenshot_id="shot_0005"
    )

    # Register screenshot metadata (shot_0001 .. shot_0005)
    shot_ids = [f"shot_{number:04d}" for number in range(1, 6)]
    for shot_id in shot_ids:
        session.add_screenshot(shot_id, f"shots/{shot_id}.png")

    # Persist the session, then derive its directory from the JSON location
    session_json_path = session.save_json(base_dir)
    session_dir = os.path.dirname(session_json_path)

    # Write the actual screenshot files
    shots_dir = Path(session_dir) / "shots"
    shots_dir.mkdir(exist_ok=True)
    png_signature = b'\x89PNG\r\n\x1a\n'
    for number, shot_id in enumerate(shot_ids, start=1):
        # Filler is the fixed prefix followed by the shot number's digit
        # repeated (100 + number * 50) times, so every file differs in size.
        filler = b'mock_screenshot_data_' + str(number).encode() * (100 + number * 50)
        (shots_dir / f"{shot_id}.png").write_bytes(png_signature + filler)

    return session, session_dir
def validate_encrypted_file(encrypted_path: str) -> Dict[str, Any]:
    """
    Validate an encrypted ZIP file without decrypting it.

    Encryption is detected from the ZIP metadata: an entry is treated as
    encrypted when bit 0 of its general-purpose flag is set. Listing the
    entry names is not a usable signal, because the central directory of a
    password-protected ZIP can be read without the password.

    Args:
        encrypted_path: Path of the file to inspect.

    Returns:
        Dictionary with validation results:
            exists        - the path exists
            size_bytes    - file size reported by ``stat``
            is_zip        - the file could be opened as a ZIP archive
            is_encrypted  - at least one entry carries the encryption flag
            file_count    - number of entries (present only when the
                            archive could be opened)
            error         - message describing the failure, or None

    This function does not raise; every failure is reported via ``error``.
    """
    results: Dict[str, Any] = {
        "exists": False,
        "size_bytes": 0,
        "is_zip": False,
        "is_encrypted": False,
        "error": None
    }
    try:
        path = Path(encrypted_path)
        if not path.exists():
            results["error"] = "File does not exist"
            return results
        results["exists"] = True
        results["size_bytes"] = path.stat().st_size
        # Try to open as ZIP
        try:
            with zipfile.ZipFile(encrypted_path, 'r') as zf:
                results["is_zip"] = True
                entries = zf.infolist()
                results["file_count"] = len(entries)
                # Bit 0 of the general-purpose flag marks an encrypted entry.
                results["is_encrypted"] = any(
                    info.flag_bits & 0x1 for info in entries
                )
        except zipfile.BadZipFile:
            # NOTE(review): a container that encrypts the whole file (rather
            # than individual ZIP entries) ends up here - confirm which
            # format the encryption helper actually produces.
            results["error"] = "Not a valid ZIP file"
        except Exception as e:
            results["error"] = f"ZIP validation error: {e}"
    except Exception as e:
        results["error"] = f"Validation error: {e}"
    return results
def validate_decrypted_content(decrypted_path: str, expected_session_id: str) -> Dict[str, Any]:
    """
    Check that a decrypted archive has the expected session layout.

    The archive is opened as a ZIP. The first entry whose name ends in
    ``.json`` is parsed and compared against ``expected_session_id``, and
    its ``events`` / ``screenshots`` lists are counted. Entries ending in
    ``.png`` are counted as screenshot files.

    Args:
        decrypted_path: Path of the decrypted ZIP archive.
        expected_session_id: Value the JSON ``session_id`` field must equal.

    Returns:
        Dictionary with validation results. ``actual_screenshot_files`` is
        only present when at least one ``.png`` entry exists. Any exception
        is reported through ``error`` instead of being raised; fields filled
        in before the failure keep their values.
    """
    results = {
        "valid_zip": False,
        "has_json": False,
        "has_screenshots": False,
        "session_id_matches": False,
        "event_count": 0,
        "screenshot_count": 0,
        "file_count": 0,
        "error": None
    }
    try:
        with zipfile.ZipFile(decrypted_path, 'r') as archive:
            results["valid_zip"] = True
            names = archive.namelist()
            results["file_count"] = len(names)

            # Session metadata: only the first JSON entry is inspected
            first_json = next(
                (name for name in names if name.endswith('.json')), None
            )
            if first_json is not None:
                results["has_json"] = True
                session_data = json.loads(archive.read(first_json))
                if session_data.get('session_id') == expected_session_id:
                    results["session_id_matches"] = True
                events = session_data.get('events', [])
                screenshots = session_data.get('screenshots', [])
                results["event_count"] = len(events)
                results["screenshot_count"] = len(screenshots)

            # Screenshot payloads
            png_total = sum(1 for name in names if name.endswith('.png'))
            if png_total:
                results["has_screenshots"] = True
                results["actual_screenshot_files"] = png_total
    except Exception as exc:
        results["error"] = f"Content validation error: {exc}"
    return results
def compare_encryption_methods(session, password1: str, password2: str, base_dir: str) -> Dict[str, Any]:
    """
    Encrypt a session with two passwords and record which decryptions succeed.

    For each password the session is encrypted into a temporary directory,
    then decrypted once with the same password and once with the other
    password. Every step that raises is recorded in ``results["errors"]``,
    including a cross-password decryption that is rejected.

    Args:
        session: Session object passed to ``create_session_zip_encrypted``.
        password1: First password.
        password2: Second password.
        base_dir: Not referenced by this function; all files are written
            to a temporary directory.

    Returns:
        Comparison results
    """
    from agent_v0.storage_encrypted import create_session_zip_encrypted
    from server.storage_encrypted import decrypt_session_file

    results = {
        "password1_encrypt": False,
        "password2_encrypt": False,
        "cross_decrypt_1_2": False,  # Encrypt with 1, decrypt with 2
        "cross_decrypt_2_1": False,  # Encrypt with 2, decrypt with 1
        "same_decrypt_1": False,  # Encrypt with 1, decrypt with 1
        "same_decrypt_2": False,  # Encrypt with 2, decrypt with 2
        "errors": []
    }
    errors = results["errors"]

    # (own label, own password, other label, other password)
    rounds = (
        ("1", password1, "2", password2),
        ("2", password2, "1", password1),
    )

    try:
        with tempfile.TemporaryDirectory() as workdir:
            for own, own_password, other, other_password in rounds:
                try:
                    encrypted = create_session_zip_encrypted(
                        session, own_password, workdir, delete_unencrypted=False
                    )
                    results[f"password{own}_encrypt"] = True

                    # Decrypt with the password used for encryption
                    try:
                        decrypt_session_file(
                            encrypted, own_password,
                            os.path.join(workdir, f"decrypt{own}_same.zip")
                        )
                        results[f"same_decrypt_{own}"] = True
                    except Exception as exc:
                        errors.append(f"Same decrypt {own} failed: {exc}")

                    # Decrypt with the other password
                    try:
                        decrypt_session_file(
                            encrypted, other_password,
                            os.path.join(workdir, f"decrypt{own}_cross.zip")
                        )
                        results[f"cross_decrypt_{own}_{other}"] = True
                    except Exception as exc:
                        errors.append(f"Cross decrypt {own}->{other} failed: {exc}")
                except Exception as exc:
                    errors.append(f"Encrypt with password{own} failed: {exc}")
    except Exception as exc:
        errors.append(f"Overall test failed: {exc}")
    return results
def load_environment_safely() -> Tuple[Dict[str, Optional[str]], bool]:
    """
    Load environment variables from .env.local safely.

    Reads ``.env.local`` from the current working directory. Blank lines,
    lines starting with ``#``, lines without ``=`` and lines with an empty
    key are ignored. Each remaining ``KEY=VALUE`` pair is stripped of
    surrounding whitespace and written to ``os.environ``.

    The backup records the value each key had *before* this function
    touched it (``None`` when the key was unset). A key that appears more
    than once in the file is backed up only on its first occurrence, so the
    backup can restore the true original state.

    Returns:
        (original_env_backup, success) - ``success`` is True only when the
        file existed and was processed without an exception. On failure the
        backup still contains every key changed up to that point.
    """
    original_env: Dict[str, Optional[str]] = {}
    success = False
    try:
        env_local_path = Path(".env.local")
        if env_local_path.exists():
            with open(env_local_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line or line.startswith('#') or '=' not in line:
                        continue
                    key, value = line.split('=', 1)
                    key = key.strip()
                    if not key:
                        # "=value" has no variable name; assigning it to
                        # os.environ would raise and abort the whole load.
                        continue
                    # Backup original value (first occurrence only, so a
                    # repeated key does not overwrite it with our own value)
                    if key not in original_env:
                        original_env[key] = os.environ.get(key)
                    os.environ[key] = value.strip()
            success = True
        else:
            print("Warning: No .env.local file found")
    except Exception as e:
        print(f"Error loading environment: {e}")
    return original_env, success
def restore_environment_safely(original_env: Dict[str, Optional[str]]) -> None:
    """
    Put environment variables back to the state captured in a backup.

    Args:
        original_env: Mapping of variable name to its earlier value.
            A value of ``None`` means the variable is removed from
            ``os.environ`` (a missing variable is not an error).

    Any exception raised while restoring is printed as a warning rather
    than propagated.
    """
    try:
        for name in original_env:
            previous = original_env[name]
            if previous is not None:
                os.environ[name] = previous
                continue
            # The variable had no earlier value, so drop it
            os.environ.pop(name, None)
    except Exception as exc:
        print(f"Warning: Error restoring environment: {exc}")
def print_test_summary(test_name: str, results: Dict[str, Any]) -> None:
    """
    Print a formatted test summary.

    Args:
        test_name: Title shown in the summary header.
        results: Result mapping to display. Handling per entry:
            - key ``"errors"`` holding a list: the errors are listed one
              per line, or "No errors" is printed when the list is empty
            - bool values: printed with a pass/fail marker
            - int / str values: printed as ``Key: value``
            - anything else (e.g. ``None``, floats, dicts) is skipped
    """
    print(f"\n📋 {test_name} Summary:")
    print("-" * 50)
    for key, value in results.items():
        if key == "errors" and isinstance(value, list):
            if value:
                print(f"❌ Errors ({len(value)}):")
                for error in value:
                    print(f"{error}")
            else:
                print("✅ No errors")
        elif isinstance(value, bool):
            # bool is tested before int because bool is a subclass of int.
            # The marker must differ between True and False to be useful.
            status = "✅" if value else "❌"
            print(f"{status} {key.replace('_', ' ').title()}: {value}")
        elif isinstance(value, (int, str)):
            print(f" {key.replace('_', ' ').title()}: {value}")
if __name__ == "__main__":
    # Smoke run: build a sample session in a throwaway directory and report
    # what was written.
    print("Testing encryption utilities...")
    with tempfile.TemporaryDirectory() as tmpdir:
        session, session_dir = create_realistic_session_with_files(tmpdir)
        # Count every path below the session directory (files and folders)
        created_paths = sum(1 for _ in Path(session_dir).rglob('*'))
        print(f"✅ Created test session: {session.session_id}")
        print(f" Session directory: {session_dir}")
        print(f" Files created: {created_paths}")