Files
rpa_vision_v3/test_encryption_key_sync.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

389 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Test de synchronisation des clés de chiffrement entre agent v0 et serveur.
Ce test vérifie que:
1. L'agent v0 peut chiffrer avec la clé configurée
2. Le serveur peut déchiffrer avec la même clé
3. Le cycle complet fonctionne avec de vraies données
4. La compatibilité des formats de fichiers
5. L'intégrité des données après chiffrement/déchiffrement
Test de fonctionnalité réelle - utilise les vrais composants du système.
"""
import os
import sys
import tempfile
import json
import zipfile
import hashlib
from pathlib import Path
from datetime import datetime
# Ajouter les chemins nécessaires
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent / "agent_v0"))
def load_real_environment_config():
"""Charger la vraie configuration d'environnement du système."""
config = {}
# Charger .env.local (configuration serveur)
env_local_path = Path(".env.local")
if env_local_path.exists():
with open(env_local_path, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
config[f"server_{key.strip()}"] = value.strip()
# Charger agent_config.json (configuration agent)
agent_config_path = Path("agent_config.json")
if agent_config_path.exists():
with open(agent_config_path, 'r') as f:
agent_config = json.load(f)
for key, value in agent_config.items():
config[f"agent_{key}"] = value
return config
def create_realistic_test_session():
"""Créer une session de test avec des données réalistes."""
from agent_v0.raw_session import RawSession
# Créer une session avec des données qui ressemblent à une vraie session
session = RawSession.create(
user_id="test_encryption_sync",
user_label="Test User",
customer="Test Company",
training_label="Encryption Test Workflow",
notes="Test session for encryption validation",
platform="linux",
hostname="test_host",
screen_resolution=[1920, 1080]
)
# Ajouter des événements réalistes avec les vraies méthodes de l'API
session.add_mouse_click_event(
button="left",
pos=[450, 320],
window_title="Test Application",
app_name="test_app",
screenshot_id="shot_0001"
)
session.add_key_combo_event(
keys=["CTRL", "C"],
window_title="Test Application",
app_name="test_app",
screenshot_id="shot_0002"
)
session.add_hover_idle_event(
pos=[500, 400],
idle_ms=1200,
window_title="Test Application",
app_name="test_app",
screenshot_id="shot_0003"
)
session.add_scroll_event(
pos=[600, 300],
delta=[0, -3],
window_title="Test Application",
app_name="test_app",
screenshot_id="shot_0004"
)
# Ajouter des screenshots réalistes
session.add_screenshot("shot_0001", "shots/shot_0001.png")
session.add_screenshot("shot_0002", "shots/shot_0002.png")
session.add_screenshot("shot_0003", "shots/shot_0003.png")
session.add_screenshot("shot_0004", "shots/shot_0004.png")
# Fermer la session
session.close()
return session
def verify_zip_integrity(zip_path, expected_files=None):
"""Vérifier l'intégrité d'un fichier ZIP avec de vraies vérifications."""
try:
with zipfile.ZipFile(zip_path, 'r') as zf:
# Test d'intégrité du ZIP
bad_files = zf.testzip()
if bad_files:
return False, f"Fichiers corrompus: {bad_files}"
files = zf.namelist()
# Vérifier la présence des fichiers attendus
if expected_files:
for expected_file in expected_files:
if not any(f.endswith(expected_file) for f in files):
return False, f"Fichier manquant: {expected_file}"
# Vérifier qu'il y a au moins un JSON de session
json_files = [f for f in files if f.endswith('.json')]
if not json_files:
return False, "Aucun fichier JSON de session trouvé"
# Vérifier le contenu du JSON principal
main_json = json_files[0]
with zf.open(main_json) as json_file:
session_data = json.load(json_file)
# Vérifications de structure réaliste
required_fields = ['schema_version', 'session_id', 'events', 'started_at']
for field in required_fields:
if field not in session_data:
return False, f"Champ manquant dans session: {field}"
# Vérifier que les événements ont une structure cohérente
if 'events' in session_data and session_data['events']:
for event in session_data['events']:
if 'type' not in event or 't' not in event:
return False, "Structure d'événement invalide"
return True, f"ZIP valide avec {len(files)} fichiers"
except Exception as e:
return False, f"Erreur lors de la vérification: {e}"
def calculate_file_hash(file_path):
"""Calculer le hash SHA256 d'un fichier pour vérifier l'intégrité."""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
def test_encryption_key_sync():
"""Test complet de synchronisation des clés de chiffrement avec vraies données."""
print("=== Test de synchronisation des clés de chiffrement (Real Functionality) ===")
# 1. Charger les vraies configurations du système
print("\n1. Chargement des configurations réelles...")
config = load_real_environment_config()
server_encryption_password = config.get("server_ENCRYPTION_PASSWORD")
agent_encryption_password = config.get("agent_encryption_password")
print(f" Serveur ENCRYPTION_PASSWORD: {'✓ Défini' if server_encryption_password else '✗ Non défini'}")
print(f" Agent encryption_password: {'✓ Défini' if agent_encryption_password else '✗ Non défini'}")
if server_encryption_password:
print(f" Serveur (preview): {server_encryption_password[:8]}...")
if agent_encryption_password:
print(f" Agent (preview): {agent_encryption_password[:8]}...")
# 2. Vérifier que les clés correspondent
print("\n2. Vérification de la correspondance des clés...")
if not server_encryption_password:
print("❌ Clé serveur non définie dans .env.local")
return False
if not agent_encryption_password:
print("❌ Clé agent non définie dans agent_config.json")
return False
if server_encryption_password != agent_encryption_password:
print("❌ Les clés ne correspondent pas!")
print(f" Serveur: {server_encryption_password}")
print(f" Agent: {agent_encryption_password}")
return False
print("✅ Les clés correspondent parfaitement")
# 3. Test avec de vraies données et vrais composants
print("\n3. Test du cycle complet avec données réalistes...")
try:
# Importer les vrais modules (pas de mocks)
from agent_v0.storage_encrypted import create_session_zip_encrypted, decrypt_session_file
# Créer une session réaliste
test_session = create_realistic_test_session()
print(f" Session créée: {test_session.session_id}")
print(f" Événements: {len(test_session.events)}")
print(f" Screenshots: {len(test_session.screenshots)}")
with tempfile.TemporaryDirectory() as tmpdir:
tmpdir_path = Path(tmpdir)
# Sauvegarder la session avec de vrais fichiers
session_dir = tmpdir_path / test_session.session_id
test_session.save_json(str(tmpdir_path))
# Créer de vrais fichiers de screenshots (même si vides pour le test)
shots_dir = session_dir / "shots"
shots_dir.mkdir(parents=True, exist_ok=True)
for screenshot in test_session.screenshots:
shot_path = shots_dir / screenshot.relative_path.split("/")[-1]
# Créer un fichier PNG minimal mais valide
shot_path.write_bytes(b'\x89PNG\r\n\x1a\n' + b'\x00' * 100)
print(f" Fichiers créés dans: {session_dir}")
# Calculer hash avant chiffrement
original_files = list(session_dir.rglob("*"))
original_hashes = {}
for file_path in original_files:
if file_path.is_file():
original_hashes[file_path.name] = calculate_file_hash(file_path)
# Chiffrer avec le vrai système de chiffrement
print(" Chiffrement en cours...")
encrypted_path = create_session_zip_encrypted(
test_session,
agent_encryption_password,
str(tmpdir_path),
delete_unencrypted=False
)
print(f" Fichier chiffré: {encrypted_path}")
# Vérifications sur le fichier chiffré
if not os.path.exists(encrypted_path):
print("❌ Fichier chiffré non créé")
return False
encrypted_size = os.path.getsize(encrypted_path)
print(f" Taille chiffrée: {encrypted_size} bytes")
# Le fichier chiffré doit être plus grand que juste les métadonnées
if encrypted_size < 100: # Au minimum salt + iv + données + padding
print("❌ Fichier chiffré suspicieusement petit")
return False
# Déchiffrer avec le vrai système serveur
print(" Déchiffrement en cours...")
decrypted_path = decrypt_session_file(
encrypted_path,
server_encryption_password,
str(tmpdir_path / "test_decrypted.zip")
)
print(f" Fichier déchiffré: {decrypted_path}")
# Vérifications approfondies du fichier déchiffré
if not os.path.exists(decrypted_path):
print("❌ Fichier déchiffré non créé")
return False
# Vérifier l'intégrité du ZIP déchiffré
is_valid, message = verify_zip_integrity(
decrypted_path,
expected_files=['.json', '.png']
)
if not is_valid:
print(f"❌ ZIP déchiffré invalide: {message}")
return False
print(f"{message}")
# Extraire et vérifier le contenu
extract_dir = tmpdir_path / "extracted"
extract_dir.mkdir()
with zipfile.ZipFile(decrypted_path, 'r') as zf:
zf.extractall(extract_dir)
# Vérifier que les données sont identiques
extracted_files = list(extract_dir.rglob("*"))
print(f" Fichiers extraits: {len([f for f in extracted_files if f.is_file()])}")
# Vérifier l'intégrité des données par comparaison de hash
integrity_check_passed = True
for file_path in extracted_files:
if file_path.is_file() and file_path.name in original_hashes:
extracted_hash = calculate_file_hash(file_path)
if extracted_hash != original_hashes[file_path.name]:
print(f"❌ Intégrité compromise pour {file_path.name}")
integrity_check_passed = False
if integrity_check_passed:
print(" ✅ Intégrité des données vérifiée")
else:
print("❌ Échec de vérification d'intégrité")
return False
# Test de performance (optionnel)
file_size_kb = encrypted_size / 1024
if file_size_kb > 0:
print(f" Performance: {file_size_kb:.1f} KB chiffrés")
print("✅ Cycle complet chiffrement/déchiffrement réussi")
except ImportError as e:
print(f"❌ Modules de chiffrement non disponibles: {e}")
print(" Vérifiez que agent_v0/storage_encrypted.py est accessible")
return False
except Exception as e:
print(f"❌ Erreur lors du test de chiffrement: {e}")
import traceback
traceback.print_exc()
return False
# 4. Test de robustesse avec différents types de données
print("\n4. Test de robustesse...")
try:
# Test avec des caractères spéciaux dans les données
special_session = create_realistic_test_session()
special_session.context["notes"] = "Test avec caractères spéciaux: éàü 中文 🚀"
special_session.user["label"] = "Utilisateur Tëst"
with tempfile.TemporaryDirectory() as tmpdir:
special_session.save_json(tmpdir)
encrypted_special = create_session_zip_encrypted(
special_session,
agent_encryption_password,
tmpdir,
delete_unencrypted=True
)
decrypted_special = decrypt_session_file(
encrypted_special,
server_encryption_password,
os.path.join(tmpdir, "special_decrypted.zip")
)
# Vérifier que les caractères spéciaux sont préservés
with zipfile.ZipFile(decrypted_special, 'r') as zf:
json_files = [f for f in zf.namelist() if f.endswith('.json')]
if json_files:
with zf.open(json_files[0]) as json_file:
restored_data = json.load(json_file)
if restored_data.get("context", {}).get("notes") != special_session.context["notes"]:
print("❌ Caractères spéciaux non préservés")
return False
print(" ✅ Caractères spéciaux préservés")
except Exception as e:
print(f"⚠️ Test de robustesse échoué: {e}")
# Ne pas faire échouer le test principal pour ça
print("\n=== Test terminé avec succès ===")
print("✅ Synchronisation des clés validée")
print("✅ Chiffrement/déchiffrement fonctionnel")
print("✅ Intégrité des données vérifiée")
print("✅ Compatibilité agent ↔ serveur confirmée")
return True
if __name__ == "__main__":
success = test_encryption_key_sync()
sys.exit(0 if success else 1)