- Frontend v4 accessible sur réseau local (192.168.1.40) - Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard) - Ollama GPU fonctionnel - Self-healing interactif - Dashboard confiance Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
237 lines
7.5 KiB
Python
237 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Tests d'intégration pour le pipeline serveur
|
|
|
|
Teste le flux complet:
|
|
1. Upload d'une session chiffrée
|
|
2. Déchiffrement
|
|
3. Traitement par le pipeline
|
|
4. Génération des artefacts
|
|
"""
|
|
|
|
import pytest
|
|
import sys
|
|
import json
|
|
import tempfile
|
|
import zipfile
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
# Ajouter le répertoire parent au path
|
|
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
|
|
|
try:
|
|
from core.models import RawSession, Event, Screenshot, WindowContext
|
|
from server.storage_encrypted import decrypt_file
|
|
from server.processing_pipeline import ProcessingPipeline
|
|
except ImportError as e:
|
|
pytest.skip(f"Server components not available: {e}", allow_module_level=True)
|
|
|
|
# Import du module de chiffrement (copie simplifiée pour les tests)
|
|
import os
|
|
import hashlib
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
from cryptography.hazmat.backends import default_backend
|
|
from cryptography.hazmat.primitives import padding as crypto_padding
|
|
|
|
|
|
def encrypt_session_file(zip_path: str, password: str) -> str:
    """Encrypt a file with AES-CBC (simplified helper for the tests).

    A 32-byte key is derived from ``password`` with PBKDF2-HMAC-SHA256
    (100000 iterations) and a fresh random 16-byte salt. The plaintext is
    PKCS7-padded and encrypted with a fresh random 16-byte IV. The result
    is written to ``zip_path + '.enc'`` as ``salt + iv + ciphertext``.

    Args:
        zip_path: Path of the file to encrypt, as a string.
        password: Password from which the encryption key is derived.

    Returns:
        Path of the encrypted file that was written.
    """
    encrypted_path = zip_path + '.enc'

    # Key material: the salt is drawn first, then the key, then the IV.
    salt = os.urandom(16)
    key = hashlib.pbkdf2_hmac(
        'sha256', password.encode('utf-8'), salt, 100000, dklen=32
    )
    iv = os.urandom(16)

    with open(zip_path, 'rb') as source:
        plaintext = source.read()

    # CBC works on whole 128-bit blocks, hence the PKCS7 padding.
    padder = crypto_padding.PKCS7(128).padder()
    padded = padder.update(plaintext) + padder.finalize()

    encryptor = Cipher(
        algorithms.AES(key), modes.CBC(iv), backend=default_backend()
    ).encryptor()
    ciphertext = encryptor.update(padded) + encryptor.finalize()

    # On-disk layout: 16-byte salt, 16-byte IV, then the ciphertext.
    with open(encrypted_path, 'wb') as target:
        target.write(salt + iv + ciphertext)

    return encrypted_path
|
|
|
|
|
|
@pytest.fixture
def sample_session():
    """Build an in-memory ``RawSession`` filled with test data.

    The session gets three ``click`` events (``t`` = 0.0, 1.0, 2.0) and
    three screenshots; event ``i`` and screenshot ``i`` share the id
    ``screenshot_000i``.
    """
    session = RawSession(
        session_id="test_session_001",
        agent_version="v0.1.0",
        started_at=datetime.now(),
        user={"id": "test_user", "label": "Test User"},
        context={
            "training_label": "test_workflow",
            "customer": "Test Corp",
            "notes": "Session de test",
        },
        environment={
            "os": "Linux",
            "screen": {"primary_resolution": [1920, 1080]},
        },
    )

    # Events: one click per screenshot, all in the same dummy window.
    for index in range(3):
        click = Event(
            type="click",
            t=float(index),
            window=WindowContext(app_name="TestApp", title="Test Window"),
            screenshot_id=f"screenshot_{index:04d}",
        )
        session.events.append(click)

    # Screenshots: ids line up with the events' screenshot_id values.
    for index in range(3):
        shot_id = f"screenshot_{index:04d}"
        shot = Screenshot(
            screenshot_id=shot_id,
            relative_path=f"screenshots/{shot_id}.png",
            captured_at=datetime.now().isoformat(),
        )
        session.screenshots.append(shot)

    return session
|
|
|
|
|
|
def test_encryption_decryption_roundtrip(sample_session, tmp_path):
    """Test: a session archive survives encryption followed by decryption."""
    session_id = sample_session.session_id
    json_member = f"{session_id}/{session_id}.json"
    archive_path = tmp_path / "session.zip"

    # Build the archive: the session JSON plus one dummy file per screenshot.
    with zipfile.ZipFile(archive_path, 'w') as archive:
        payload = json.dumps(sample_session.to_json(), indent=2)
        archive.writestr(json_member, payload)
        for shot in sample_session.screenshots:
            archive.writestr(
                f"{session_id}/{shot.relative_path}",
                b"fake_image_data",
            )

    # Encrypt.
    password = "test_password_123"
    encrypted_path = encrypt_session_file(str(archive_path), password)
    encrypted_file = Path(encrypted_path)
    assert encrypted_file.exists()
    assert encrypted_file.suffix == '.enc'

    # Decrypt.
    decrypted_path = decrypt_file(encrypted_path, password)
    decrypted_file = Path(decrypted_path)
    assert decrypted_file.exists()
    assert decrypted_file.suffix == '.zip'

    # The decrypted archive must list the JSON and the three screenshots.
    with zipfile.ZipFile(decrypted_path, 'r') as archive:
        members = archive.namelist()
    assert json_member in members
    assert sum(1 for name in members if name.endswith('.png')) == 3
|
|
|
|
|
|
def test_decryption_wrong_password(sample_session, tmp_path):
    """Test: decrypting with the wrong password raises ``ValueError``."""
    # Create a tiny archive and encrypt it with the correct password.
    archive_path = tmp_path / "session.zip"
    with zipfile.ZipFile(archive_path, 'w') as archive:
        archive.writestr("test.txt", "test data")
    encrypted_path = encrypt_session_file(str(archive_path), "correct_password")

    # The error message must match one of the accepted failure reasons.
    expected_message = "(mot de passe incorrect|padding|corrompu)"
    with pytest.raises(ValueError, match=expected_message):
        decrypt_file(encrypted_path, "wrong_password")
|
|
|
|
|
|
def test_processing_pipeline_basic(sample_session, tmp_path):
    """Test: the pipeline processes a well-formed session stored on disk.

    The session is laid out as ``<tmp_path>/sessions/<id>/<id>/<id>.json``
    with dummy screenshot files next to the JSON, then handed to
    ``ProcessingPipeline.process_session``.
    """
    session_id = sample_session.session_id

    # On-disk session structure.
    session_dir = tmp_path / "sessions" / session_id
    session_dir.mkdir(parents=True)
    content_dir = session_dir / session_id

    # Session JSON.
    json_path = content_dir / f"{session_id}.json"
    json_path.parent.mkdir(parents=True)
    with open(json_path, 'w') as handle:
        handle.write(json.dumps(sample_session.to_json(), indent=2))

    # Dummy screenshot files.
    for shot in sample_session.screenshots:
        shot_path = content_dir / shot.relative_path
        shot_path.parent.mkdir(parents=True, exist_ok=True)
        shot_path.write_bytes(b"fake_image_data")

    # Run the pipeline on the session.
    pipeline = ProcessingPipeline(base_path=str(tmp_path))
    stats = pipeline.process_session(session_id)

    # Check the reported statistics.
    assert stats['status'] == 'success'
    assert stats['session_id'] == session_id
    assert stats['screen_states_created'] == 3  # 3 events with screenshots

    # Note: embeddings and UI detection may fail when the models are not
    # available; we only check that there is no fatal error.
|
|
|
|
|
|
def test_processing_pipeline_missing_session(tmp_path):
    """Test: processing an unknown session id reports an error."""
    stats = ProcessingPipeline(base_path=str(tmp_path)).process_session(
        "nonexistent_session"
    )

    # The pipeline is expected to report the failure in its stats.
    assert stats['status'] == 'error'
    assert len(stats['errors']) > 0
|
|
|
|
|
|
def test_processing_pipeline_corrupted_json(tmp_path):
    """Test: a session whose JSON file is invalid reports an error."""
    session_name = "corrupted_session"

    # Lay out a session directory containing a syntactically invalid JSON.
    session_dir = tmp_path / "sessions" / session_name
    session_dir.mkdir(parents=True)
    json_path = session_dir / session_name / "corrupted_session.json"
    json_path.parent.mkdir(parents=True)
    json_path.write_text("{ invalid json }")

    stats = ProcessingPipeline(base_path=str(tmp_path)).process_session(
        session_name
    )

    assert stats['status'] == 'error'
    assert len(stats['errors']) > 0
|
|
|
|
|
|
if __name__ == "__main__":
    # pytest.main() returns the run's exit code; hand it to sys.exit so that
    # running this file directly exits non-zero when tests fail instead of
    # always reporting success to the shell / CI.
    sys.exit(pytest.main([__file__, "-v"]))
|