Files
rpa_vision_v3/test_agent_upload.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

366 lines
13 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Real functionality test for agent upload system.
Tests the complete upload flow using actual RPA Vision V3 components:
- Creates a real RawSession with proper schema
- Uses actual agent storage and encryption
- Tests real server authentication and processing
- Validates complete data flow end-to-end
"""
import os
import sys
import tempfile
import zipfile
import json
from pathlib import Path
from datetime import datetime
from typing import Dict, Any
import requests
# Add project root to path for imports
sys.path.insert(0, str(Path(__file__).parent))
# Import real RPA Vision V3 components
from core.models.raw_session import RawSession, Event, Screenshot, RawWindowContext
from agent_v0.storage_encrypted import create_session_zip_encrypted
from agent_v0.uploader import upload_session_zip
# Load environment configuration
def load_env_config() -> Dict[str, str]:
    """Load KEY=VALUE pairs from ``.env.local`` in the current directory.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Whitespace around keys and values is stripped, and one pair of
    matching quotes around a value is removed. Every accepted pair is also
    exported to ``os.environ``.

    Returns:
        The parsed variables; an empty dict if ``.env.local`` does not exist.
    """
    env_vars: Dict[str, str] = {}
    env_file = Path(".env.local")
    if not env_file.exists():
        return env_vars
    with open(env_file, 'r', encoding='utf-8') as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line or line.startswith('#') or '=' not in line:
                continue
            # Split on the first '=' only so values may contain '='.
            key, value = line.split('=', 1)
            key = key.strip()
            value = value.strip()
            if not key:
                # An empty name cannot be stored in os.environ.
                continue
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            env_vars[key] = value
            os.environ[key] = value
    return env_vars
# Configuration: a value found in .env.local wins; otherwise fall back to
# whatever the process environment already provides.
env_config = load_env_config()
ADMIN_TOKEN = (
    env_config.get('RPA_TOKEN_ADMIN') or os.environ.get('RPA_TOKEN_ADMIN')
)
ENCRYPTION_PASSWORD = (
    env_config.get('ENCRYPTION_PASSWORD') or os.environ.get('ENCRYPTION_PASSWORD')
)
# Upload endpoint used by the authentication checks below.
API_URL = "http://localhost:8000/api/traces/upload"
def create_realistic_raw_session() -> RawSession:
    """Build a sample RawSession with two events and two screenshots.

    The session id is ``test_session_`` followed by the current local time.
    The events are one left mouse click and one CTRL+C key combo, each
    pointing at one of the two screenshots by id.

    Returns:
        The populated RawSession.
    """
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    session = RawSession(
        session_id=f"test_session_{stamp}",
        agent_version="0.1.0",
        environment={
            "platform": "linux",
            "hostname": "test-machine",
            "screen": {
                "primary_resolution": [1920, 1080],
                "display_scale": 1.0,
            },
        },
        user={"id": "test_user", "label": "Test User"},
        context={
            "customer": "Test Organization",
            "training_label": "Upload Test Workflow",
            "notes": "Automated test session for upload functionality",
        },
        started_at=datetime.now(),
        ended_at=datetime.now(),
    )

    # Both events share the same window context.
    window_context = RawWindowContext(title="Test Application", app_name="test_app")

    # (time offset, event type, screenshot id, payload) for each event.
    event_specs = (
        (1.0, "mouse_click", "shot_0001", {"button": "left", "pos": [450, 320]}),
        (2.5, "key_combo", "shot_0002", {"keys": ["CTRL", "C"]}),
    )
    for offset, kind, shot_id, payload in event_specs:
        session.add_event(
            Event(
                t=offset,
                type=kind,
                window=window_context,
                screenshot_id=shot_id,
                data=payload,
            )
        )

    # (screenshot id, path relative to the session directory).
    screenshot_specs = (
        ("shot_0001", "shots/shot_0001.png"),
        ("shot_0002", "shots/shot_0002.png"),
    )
    for shot_id, rel_path in screenshot_specs:
        session.add_screenshot(
            Screenshot(
                screenshot_id=shot_id,
                relative_path=rel_path,
                captured_at=datetime.now().isoformat(),
            )
        )

    return session
def create_session_files(session: RawSession, base_dir: Path) -> Path:
    """Write the session JSON and placeholder screenshots under *base_dir*.

    Creates ``<base_dir>/<session_id>/`` with a ``shots`` subdirectory,
    saves the session as ``<session_id>.json`` and writes one small PNG for
    every screenshot listed in the session.

    Args:
        session: Session whose id, screenshots and ``save_to_file`` are used.
        base_dir: Directory in which the session directory is created.

    Returns:
        Path of the created session directory.
    """
    # Minimal 1x1-pixel PNG used as the content of every screenshot file.
    placeholder_png = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\tpHYs\x00\x00\x0b\x13\x00\x00\x0b\x13\x01\x00\x9a\x9c\x18\x00\x00\x00\x0cIDATx\x9cc```\x00\x00\x00\x04\x00\x01\xdd\x8d\xb4\x1c\x00\x00\x00\x00IEND\xaeB`\x82'

    root = base_dir / session.session_id
    root.mkdir(parents=True, exist_ok=True)
    (root / "shots").mkdir(exist_ok=True)

    session.save_to_file(root / f"{session.session_id}.json")

    for shot in session.screenshots:
        target = root / shot.relative_path
        # relative_path may point outside "shots", so ensure its parent exists.
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_bytes(placeholder_png)

    return root
def test_real_upload_flow():
    """Run the end-to-end upload scenario against the local server.

    Steps: build a sample RawSession, write its files into a temporary
    directory, create the encrypted ZIP, upload it with
    ``upload_session_zip`` and finally look the session up through the
    sessions endpoint to compare event and screenshot counts.

    Returns:
        True when every step succeeds. False when the admin token or the
        encryption password is missing, the upload fails, the session cannot
        be verified on the server, or any step raises.
    """
    print("🧪 Testing Real Agent Upload Flow")
    print("=" * 50)
    if not ADMIN_TOKEN:
        print("❌ No admin token found in environment")
        return False
    if not ENCRYPTION_PASSWORD:
        print("❌ No encryption password found in environment")
        return False
    print(f"📡 API URL: {API_URL}")
    print(f"🔑 Using token: {ADMIN_TOKEN[:8]}...")
    print(f"🔒 Encryption: {'Enabled' if ENCRYPTION_PASSWORD else 'Disabled'}")
    print()
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_path = Path(temp_dir)
        try:
            # 1. Create realistic RawSession
            print("1⃣ Creating realistic RawSession...")
            session = create_realistic_raw_session()
            print(f" Session ID: {session.session_id}")
            print(f" Events: {len(session.events)}")
            print(f" Screenshots: {len(session.screenshots)}")
            print(f" User: {session.user['label']}")

            # 2. Create session files on disk
            print("\n2⃣ Creating session files...")
            session_dir = create_session_files(session, temp_path)
            print(f" Session directory: {session_dir}")
            # Verify files exist
            json_file = session_dir / f"{session.session_id}.json"
            assert json_file.exists(), "Session JSON not created"
            # Verify the JSON round-trips into a RawSession with the same id
            loaded_session = RawSession.load_from_file(json_file)
            assert loaded_session.session_id == session.session_id, (
                "Reloaded session id does not match the original"
            )
            print(" ✅ Session JSON validated")

            # 3. Create encrypted ZIP using real agent code
            print("\n3⃣ Creating encrypted ZIP...")
            encrypted_path = create_session_zip_encrypted(
                session=session,
                password=ENCRYPTION_PASSWORD,
                base_dir=str(temp_path)
            )
            print(f" Encrypted file: {encrypted_path}")
            print(f" File size: {os.path.getsize(encrypted_path)} bytes")

            # 4. Test upload using real uploader
            print("\n4⃣ Testing upload with real uploader...")
            # Set environment for uploader
            os.environ['RPA_TOKEN_ADMIN'] = ADMIN_TOKEN
            success = upload_session_zip(
                zip_path=encrypted_path,
                session_id=session.session_id,
                max_retries=1
            )
            if success:
                print(" ✅ Upload successful via real uploader!")
            else:
                print(" ❌ Upload failed via real uploader")
                return False

            # 5. Verify server response
            print("\n5⃣ Verifying server processed the session...")
            try:
                headers = {'Authorization': f'Bearer {ADMIN_TOKEN}'}
                # A timeout keeps the test from hanging forever if the server
                # accepts the connection but never answers.
                response = requests.get(
                    "http://localhost:8000/api/traces/sessions",
                    headers=headers,
                    timeout=10,
                )
                if response.status_code != 200:
                    print(f" ❌ Failed to query sessions: {response.status_code}")
                    return False
                sessions_data = response.json()
                uploaded_sessions = [
                    s for s in sessions_data['sessions']
                    if s['session_id'] == session.session_id
                ]
                if not uploaded_sessions:
                    print(" ❌ Session not found on server")
                    return False
                uploaded_session = uploaded_sessions[0]
                print(" ✅ Session found on server!")
                print(f" Events: {uploaded_session['events_count']}")
                print(f" Screenshots: {uploaded_session['screenshots_count']}")
                print(f" User: {uploaded_session['user']['label']}")
                # Verify data integrity; the messages make a mismatch visible
                # in the error line printed by the handler below.
                assert uploaded_session['events_count'] == len(session.events), (
                    f"events_count {uploaded_session['events_count']} != "
                    f"{len(session.events)}"
                )
                assert uploaded_session['screenshots_count'] == len(session.screenshots), (
                    f"screenshots_count {uploaded_session['screenshots_count']} != "
                    f"{len(session.screenshots)}"
                )
                print(" ✅ Data integrity verified!")
            except Exception as e:
                print(f" ❌ Error verifying server: {e}")
                return False

            print("\n🎉 All tests passed! Real functionality working correctly.")
            return True
        except Exception as e:
            print(f"\n❌ Test failed with error: {e}")
            import traceback
            traceback.print_exc()
            return False
def _post_dummy_zip(session_id, headers=None):
    """POST a one-file ZIP to ``API_URL`` and return the response."""
    with tempfile.NamedTemporaryFile(suffix='.zip') as tmp_file:
        # Write the archive through the already-open handle instead of
        # reopening the file by name: a second open of a still-open named
        # temporary file is not possible on every platform (e.g. Windows).
        with zipfile.ZipFile(tmp_file, 'w') as zf:
            zf.writestr('test.json', '{"test": "data"}')
        # Rewind so the upload reads the archive from its first byte.
        tmp_file.seek(0)
        files = {'file': ('test.zip', tmp_file, 'application/zip')}
        data = {'session_id': session_id}
        # The timeout prevents an unresponsive server from blocking forever.
        return requests.post(
            API_URL, files=files, data=data, headers=headers, timeout=10
        )


def test_authentication_scenarios():
    """Check that uploads with a bad or missing token are rejected.

    Sends a small ZIP to the upload endpoint once with an invalid bearer
    token and once without any ``Authorization`` header, expecting HTTP 401
    both times.

    Returns:
        True when both requests are answered with 401, otherwise False.
    """
    print("\n🔐 Testing Authentication Scenarios")
    print("=" * 40)

    # Test with invalid token
    print("Testing invalid token...")
    response = _post_dummy_zip(
        'test_invalid_auth',
        headers={'Authorization': 'Bearer invalid_token_123'},
    )
    if response.status_code == 401:
        print(" ✅ Invalid token correctly rejected")
    else:
        print(f" ❌ Expected 401, got {response.status_code}")
        return False

    # Test without token
    print("Testing missing token...")
    response = _post_dummy_zip('test_no_auth')
    if response.status_code == 401:
        print(" ✅ Missing token correctly rejected")
    else:
        print(f" ❌ Expected 401, got {response.status_code}")
        return False

    print(" ✅ Authentication tests passed!")
    return True
def test_server_availability():
    """Probe the status endpoint and report whether the server answers.

    Prints the reported status, version and encryption flag on success.

    Returns:
        True when the endpoint answers with HTTP 200 and the expected fields;
        False on any other status code, connection failure or error.
    """
    print("🌐 Testing Server Availability")
    print("=" * 30)
    try:
        response = requests.get("http://localhost:8000/api/traces/status", timeout=5)
        if response.status_code != 200:
            print(f" ❌ Server returned {response.status_code}")
            return False
        status_data = response.json()
        print(f" ✅ Server online: {status_data['status']}")
        print(f" Version: {status_data['version']}")
        encryption_label = 'Enabled' if status_data['encryption_enabled'] else 'Disabled'
        print(f" Encryption: {encryption_label}")
        return True
    except requests.exceptions.ConnectionError:
        print(" ❌ Server not accessible. Is it running on port 8000?")
        return False
    except Exception as e:
        print(f" ❌ Error checking server: {e}")
        return False
def main():
    """Run the availability, authentication and upload checks in order.

    Stops at the first failing check.

    Returns:
        True when all three checks pass, otherwise False.
    """
    print("🚀 RPA Vision V3 - Real Agent Upload Test")
    print("=" * 60)

    # Nothing else can work without a reachable server, so check it first.
    server_up = test_server_availability()
    if not server_up:
        print("\n💡 Start the server with: ./run.sh --server")
        return False

    # `and` short-circuits: the upload flow is skipped when the
    # authentication checks fail.
    if not (test_authentication_scenarios() and test_real_upload_flow()):
        return False

    print("\n🎉 All tests completed successfully!")
    print("✅ Real functionality verified end-to-end")
    return True
if __name__ == "__main__":
    # Exit status 0 when every check passed, 1 otherwise.
    sys.exit(0 if main() else 1)