Files
rpa_vision_v3/tests/integration/test_server_pipeline.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

237 lines
7.5 KiB
Python

#!/usr/bin/env python3
"""
Tests d'intégration pour le pipeline serveur
Teste le flux complet:
1. Upload d'une session chiffrée
2. Déchiffrement
3. Traitement par le pipeline
4. Génération des artefacts
"""
import pytest
import sys
import json
import tempfile
import zipfile
from pathlib import Path
from datetime import datetime
# Ajouter le répertoire parent au path
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
try:
from core.models import RawSession, Event, Screenshot, WindowContext
from server.storage_encrypted import decrypt_file
from server.processing_pipeline import ProcessingPipeline
except ImportError as e:
pytest.skip(f"Server components not available: {e}", allow_module_level=True)
# Import du module de chiffrement (copie simplifiée pour les tests)
import os
import hashlib
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import padding as crypto_padding
def encrypt_session_file(zip_path: str, password: str) -> str:
"""Chiffre un fichier ZIP (version simplifiée pour tests)."""
encrypted_path = zip_path + '.enc'
# Générer salt
salt = os.urandom(16)
# Dériver clé
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000, dklen=32)
# Générer IV
iv = os.urandom(16)
# Lire le fichier
with open(zip_path, 'rb') as f:
plaintext = f.read()
# Padding PKCS7
padder = crypto_padding.PKCS7(128).padder()
padded_data = padder.update(plaintext) + padder.finalize()
# Chiffrer
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
ciphertext = encryptor.update(padded_data) + encryptor.finalize()
# Écrire le fichier chiffré
with open(encrypted_path, 'wb') as f:
f.write(salt)
f.write(iv)
f.write(ciphertext)
return encrypted_path
@pytest.fixture
def sample_session():
"""Crée une session de test."""
user = {"id": "test_user", "label": "Test User"}
context = {
"training_label": "test_workflow",
"customer": "Test Corp",
"notes": "Session de test"
}
environment = {
"os": "Linux",
"screen": {"primary_resolution": [1920, 1080]}
}
session = RawSession(
session_id="test_session_001",
agent_version="v0.1.0",
started_at=datetime.now(),
user=user,
context=context,
environment=environment
)
# Ajouter des événements
for i in range(3):
window = WindowContext(
app_name="TestApp",
title="Test Window"
)
event = Event(
type="click",
t=float(i),
window=window,
screenshot_id=f"screenshot_{i:04d}"
)
session.events.append(event)
# Ajouter des screenshots
for i in range(3):
screenshot = Screenshot(
screenshot_id=f"screenshot_{i:04d}",
relative_path=f"screenshots/screenshot_{i:04d}.png",
captured_at=datetime.now().isoformat()
)
session.screenshots.append(screenshot)
return session
def test_encryption_decryption_roundtrip(sample_session, tmp_path):
"""Test: Chiffrement puis déchiffrement d'une session."""
# Créer un fichier ZIP temporaire
zip_path = tmp_path / "session.zip"
with zipfile.ZipFile(zip_path, 'w') as zf:
# Ajouter le JSON
json_data = json.dumps(sample_session.to_json(), indent=2)
zf.writestr(f"{sample_session.session_id}/{sample_session.session_id}.json", json_data)
# Ajouter des screenshots factices
for screenshot in sample_session.screenshots:
zf.writestr(
f"{sample_session.session_id}/{screenshot.relative_path}",
b"fake_image_data"
)
# Chiffrer
password = "test_password_123"
encrypted_path = encrypt_session_file(str(zip_path), password)
assert Path(encrypted_path).exists()
assert Path(encrypted_path).suffix == '.enc'
# Déchiffrer
decrypted_path = decrypt_file(encrypted_path, password)
assert Path(decrypted_path).exists()
assert Path(decrypted_path).suffix == '.zip'
# Vérifier le contenu
with zipfile.ZipFile(decrypted_path, 'r') as zf:
files = zf.namelist()
assert f"{sample_session.session_id}/{sample_session.session_id}.json" in files
assert len([f for f in files if f.endswith('.png')]) == 3
def test_decryption_wrong_password(sample_session, tmp_path):
"""Test: Déchiffrement avec mauvais mot de passe échoue."""
# Créer et chiffrer
zip_path = tmp_path / "session.zip"
with zipfile.ZipFile(zip_path, 'w') as zf:
zf.writestr("test.txt", "test data")
encrypted_path = encrypt_session_file(str(zip_path), "correct_password")
# Tenter de déchiffrer avec mauvais mot de passe
with pytest.raises(ValueError, match="(mot de passe incorrect|padding|corrompu)"):
decrypt_file(encrypted_path, "wrong_password")
def test_processing_pipeline_basic(sample_session, tmp_path):
"""Test: Pipeline de traitement basique."""
# Créer une structure de session sur disque
session_dir = tmp_path / "sessions" / sample_session.session_id
session_dir.mkdir(parents=True)
# Sauvegarder le JSON
json_path = session_dir / sample_session.session_id / f"{sample_session.session_id}.json"
json_path.parent.mkdir(parents=True)
with open(json_path, 'w') as f:
f.write(json.dumps(sample_session.to_json(), indent=2))
# Créer des screenshots factices
for screenshot in sample_session.screenshots:
screenshot_path = session_dir / sample_session.session_id / screenshot.relative_path
screenshot_path.parent.mkdir(parents=True, exist_ok=True)
screenshot_path.write_bytes(b"fake_image_data")
# Initialiser le pipeline
pipeline = ProcessingPipeline(base_path=str(tmp_path))
# Traiter la session
stats = pipeline.process_session(sample_session.session_id)
# Vérifier les statistiques
assert stats['status'] == 'success'
assert stats['session_id'] == sample_session.session_id
assert stats['screen_states_created'] == 3 # 3 événements avec screenshots
# Note: embeddings et UI detection peuvent échouer si modèles non disponibles
# On vérifie juste qu'il n'y a pas d'erreur fatale
def test_processing_pipeline_missing_session(tmp_path):
"""Test: Pipeline avec session inexistante."""
pipeline = ProcessingPipeline(base_path=str(tmp_path))
stats = pipeline.process_session("nonexistent_session")
assert stats['status'] == 'error'
assert len(stats['errors']) > 0
def test_processing_pipeline_corrupted_json(tmp_path):
"""Test: Pipeline avec JSON corrompu."""
# Créer une session avec JSON invalide
session_dir = tmp_path / "sessions" / "corrupted_session"
session_dir.mkdir(parents=True)
json_path = session_dir / "corrupted_session" / "corrupted_session.json"
json_path.parent.mkdir(parents=True)
json_path.write_text("{ invalid json }")
pipeline = ProcessingPipeline(base_path=str(tmp_path))
stats = pipeline.process_session("corrupted_session")
assert stats['status'] == 'error'
assert len(stats['errors']) > 0
if __name__ == "__main__":
pytest.main([__file__, "-v"])