Files
rpa_vision_v3/test_fiche23_integration_complete.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

419 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Test d'intégration complet pour la Fiche #23 - API Security & Governance
Vérifie que tous les composants de sécurité sont correctement intégrés
dans le serveur API upload avec des tests de fonctionnalité réelle.
"""
import sys
import os
import json
import tempfile
import shutil
from pathlib import Path
from fastapi.testclient import TestClient
sys.path.insert(0, '.')
def test_security_components_integration():
    """Check that the security components load and that middlewares are installed.

    Every security accessor is called once inside the guarded section, then
    the number of user middlewares registered on the application is inspected.

    Returns:
        bool: True when all accessors return without raising and the app has
        at least one user middleware; False otherwise, including when any
        import or accessor call raises.
    """
    print("🔍 Testing security components integration...")

    try:
        # Deferred imports: an unavailable module is reported as a failed
        # check instead of aborting the script at import time.
        from server.api_upload import app
        from core.security.api_tokens import get_token_manager, TokenRole
        from core.security.ip_allowlist import get_ip_allowlist
        from core.security.rate_limiter import get_rate_limiter
        from core.security.audit_log import get_audit_logger
        from core.system.safety_switch import get_safety_switch

        # Calling each accessor is the check itself; the returned objects
        # are not needed afterwards.
        accessors = (
            get_token_manager,
            get_ip_allowlist,
            get_rate_limiter,
            get_audit_logger,
            get_safety_switch,
        )
        for accessor in accessors:
            accessor()
        print("✅ All security components initialized successfully")

        # The application must carry at least one user middleware.
        installed = len(app.user_middleware)
        if installed == 0:
            print("❌ No security middlewares installed")
            return False

        print(f"✅ Security middlewares installed ({installed} middlewares)")
        return True
    except Exception as exc:
        print(f"❌ Security components integration failed: {exc}")
        return False
def test_api_endpoints_with_security():
    """Exercise a few API routes through the test client and check access control.

    Three checks run in order: ``/healthz`` must answer 200, the admin status
    route must answer 401 or 403 without credentials, and the response to
    ``/`` must carry the expected security headers.

    Returns:
        bool: True when all three checks pass; False on the first failing
        check or on any exception.
    """
    print("\n🔍 Testing API endpoints with security...")

    try:
        from server.api_upload import app

        http = TestClient(app)

        # Check 1: the public health route is reachable.
        health = http.get("/healthz")
        if health.status_code != 200:
            print(f"❌ Healthz endpoint failed: {health.status_code}")
            return False
        print("✅ Public endpoint (healthz) accessible")

        # Check 2: the admin route rejects a request without a token.
        anonymous = http.get("/admin/security/status")
        if anonymous.status_code not in (401, 403):
            print(f"❌ Admin endpoint should require auth, got: {anonymous.status_code}")
            return False
        print("✅ Admin endpoint properly protected")

        # Check 3: security headers are present on the root response.
        root = http.get("/")
        wanted = (
            "X-Content-Type-Options",
            "X-Frame-Options",
            "X-XSS-Protection",
            "Content-Security-Policy",
        )
        missing_headers = [name for name in wanted if name not in root.headers]
        if missing_headers:
            print(f"❌ Missing security headers: {missing_headers}")
            return False
        print("✅ Security headers properly set")

        return True
    except Exception as exc:
        print(f"❌ API endpoints security test failed: {exc}")
        return False
def test_safety_switch_functionality():
    """Exercise the kill switch against a throw-away state file.

    The ``RPA_KILL_SWITCH_FILE`` environment variable is pointed at a file in
    a fresh temporary directory, the kill switch is enabled and disabled
    through ``set_kill_switch``, and the JSON written to the file is checked
    for the ``enabled`` and ``reason`` keys.

    Whatever value ``RPA_KILL_SWITCH_FILE`` had before the call (including
    "not set") is put back on exit, and the temporary directory is removed.

    Returns:
        bool: True when every step succeeds; False on the first failing step
        or on any exception.
    """
    print("\n🔍 Testing safety switch functionality...")

    env_key = "RPA_KILL_SWITCH_FILE"
    # Remember a pre-existing override so it is restored, not deleted.
    previous_override = os.environ.get(env_key)
    temp_dir = None

    try:
        from core.system.safety_switch import (
            demo_safe_enabled, kill_switch_enabled, set_kill_switch,
            kill_switch_file_path, kill_switch_file_enabled
        )

        # Redirect the kill switch file into a temporary directory.
        temp_dir = Path(tempfile.mkdtemp())
        kill_file = temp_dir / "test_kill_switch.json"
        os.environ[env_key] = str(kill_file)

        # Step 1: report the initial state (informational only).
        initial_state = kill_switch_enabled()
        print(f"✅ Initial kill switch state: {initial_state}")

        # Step 2: activation.
        set_kill_switch(True, "test_activation")
        if not kill_switch_enabled():
            print("❌ Kill switch activation failed")
            return False
        print("✅ Kill switch activation works")

        # Step 3: the state file must exist and record the activation.
        if not kill_file.exists():
            print("❌ Kill switch file not created")
            return False

        with open(kill_file, 'r', encoding="utf-8") as f:
            data = json.load(f)

        if not data.get("enabled") or data.get("reason") != "test_activation":
            print(f"❌ Kill switch file content incorrect: {data}")
            return False
        print("✅ Kill switch file content correct")

        # Step 4: deactivation.
        set_kill_switch(False, "test_deactivation")
        if kill_switch_enabled():
            print("❌ Kill switch deactivation failed")
            return False
        print("✅ Kill switch deactivation works")

        # Step 5: report demo-safe mode (informational only).
        demo_mode = demo_safe_enabled()
        print(f"✅ Demo safe mode: {demo_mode}")

        return True
    except Exception as e:
        print(f"❌ Safety switch functionality test failed: {e}")
        return False
    finally:
        # Restore the environment first so a cleanup error below cannot
        # leave the process pointing at the temporary file.
        if previous_override is None:
            os.environ.pop(env_key, None)
        else:
            os.environ[env_key] = previous_override

        if temp_dir is not None and temp_dir.exists():
            shutil.rmtree(temp_dir)
def test_token_management_real():
    """Generate and validate API tokens through the token manager.

    An admin token and a read-only token are generated and validated, then a
    made-up token string is submitted and is expected to make
    ``validate_token`` raise.

    Returns:
        bool: True when every step behaves as expected; False on the first
        failing step or on any unexpected exception.
    """
    print("\n🔍 Testing token management...")

    try:
        from core.security.api_tokens import get_token_manager, TokenRole

        manager = get_token_manager()

        # Step 1: an admin token can be generated.
        admin_token = manager.generate_token("test_admin", TokenRole.ADMIN)
        if not admin_token:
            print("❌ Admin token generation failed")
            return False
        print("✅ Admin token generated successfully")

        # Step 2: validating it yields the same user and role.
        admin_info = manager.validate_token(admin_token)
        if not (admin_info.user_id == "test_admin" and admin_info.role == TokenRole.ADMIN):
            print(f"❌ Token validation failed: {admin_info}")
            return False
        print("✅ Token validation works correctly")

        # Step 3: a read-only token keeps its role through validation.
        ro_token = manager.generate_token("test_readonly", TokenRole.READ_ONLY)
        ro_info = manager.validate_token(ro_token)
        if ro_info.role != TokenRole.READ_ONLY:
            print(f"❌ Read-only token validation failed: {ro_info}")
            return False
        print("✅ Read-only token works correctly")

        # Step 4: an unknown token must be rejected with an exception.
        try:
            manager.validate_token("invalid_token_12345")
        except Exception:
            print("✅ Invalid token properly rejected")
        else:
            print("❌ Invalid token should have failed validation")
            return False

        return True
    except Exception as exc:
        print(f"❌ Token management test failed: {exc}")
        return False
def test_rate_limiting_real():
    """Send one request through the rate limiter and inspect its output.

    A single call to ``enforce_rate_limit`` for a fixed IP and path must not
    raise ``RateLimitExceeded``, the value it returns must contain the two
    rate-limit header names, and ``get_stats`` must report a non-zero
    ``total_requests``.

    Returns:
        bool: True when all checks pass; False on the first failing check or
        on any unexpected exception.
    """
    print("\n🔍 Testing rate limiting...")

    try:
        from core.security.rate_limiter import get_rate_limiter, RateLimitExceeded

        limiter = get_rate_limiter()

        # Fixed client address and route used for the probe request.
        client_ip, route = "192.168.1.100", "/api/test"

        # Check 1: the first request must go through.
        try:
            rl_headers = limiter.enforce_rate_limit(client_ip, route)
        except RateLimitExceeded:
            print("❌ First request should not be rate limited")
            return False
        print("✅ First request passed rate limiting")

        # Check 2: the expected header names are present.
        missing_headers = []
        for name in ("X-RateLimit-Limit", "X-RateLimit-Remaining"):
            if name not in rl_headers:
                missing_headers.append(name)
        if missing_headers:
            print(f"❌ Missing rate limit headers: {missing_headers}")
            return False
        print("✅ Rate limit headers present")

        # Check 3: statistics expose a non-zero request counter.
        stats = limiter.get_stats()
        if not ("total_requests" in stats and stats["total_requests"] != 0):
            print(f"❌ Rate limiter stats not working: {stats}")
            return False
        print("✅ Rate limiter statistics working")

        return True
    except Exception as exc:
        print(f"❌ Rate limiting test failed: {exc}")
        return False
def test_audit_logging_real():
    """Write three kinds of audit entries and read recent logs back.

    An API access, a security violation and a token validation are logged
    through the audit logger, then ``get_recent_logs(limit=10)`` must return
    at least one entry.

    Returns:
        bool: True when all logging calls return and recent logs are
        non-empty; False otherwise, including on any exception.
    """
    print("\n🔍 Testing audit logging...")

    try:
        from core.security.audit_log import get_audit_logger

        logger_ = get_audit_logger()
        probe_ip = "192.168.1.100"

        # Entry 1: API access.
        access_event = {
            "endpoint": "/api/test",
            "method": "GET",
            "ip_address": probe_ip,
            "status_code": 200,
            "user_agent": "test-client",
            "processing_time": 0.1,
        }
        logger_.log_api_access(**access_event)
        print("✅ API access logging works")

        # Entry 2: security violation (positional arguments, as the API is called upstream).
        logger_.log_security_violation("test_violation", probe_ip, "Test security violation")
        print("✅ Security violation logging works")

        # Entry 3: token validation.
        token_event = {
            "token_hash": "abc123...",
            "ip_address": probe_ip,
            "success": True,
            "user_id": "test_user",
        }
        logger_.log_token_validation(**token_event)
        print("✅ Token validation logging works")

        # Read-back: at least one recent entry must be returned.
        recent = logger_.get_recent_logs(limit=10)
        entry_count = len(recent)
        if entry_count == 0:
            print("❌ No recent logs found")
            return False
        print(f"✅ Recent logs retrieved ({entry_count} entries)")

        return True
    except Exception as exc:
        print(f"❌ Audit logging test failed: {exc}")
        return False
def test_admin_api_real_functionality():
    """Call the admin security endpoints with a freshly generated admin token.

    The status endpoint must answer 200 and expose the ``demo_safe``,
    ``killswitch`` and ``killswitch_file`` fields. The kill switch is then
    enabled through the admin endpoint and the status is read again to
    confirm it is reported as enabled.

    The kill switch value observed before the toggle is posted back in a
    ``finally`` clause, so it is restored on failure paths as well. The
    post-toggle check compares against the requested state rather than the
    previous one, so a kill switch that was already enabled does not make
    the check fail.

    Returns:
        bool: True when every step succeeds; False on the first failing step
        or on any exception (including one raised while restoring).
    """
    print("\n🔍 Testing admin API real functionality...")

    try:
        from server.api_upload import app
        from core.security.api_tokens import get_token_manager, TokenRole

        client = TestClient(app)
        token_manager = get_token_manager()

        # Generate an admin token for the authenticated requests below.
        admin_token = token_manager.generate_token("test_admin", TokenRole.ADMIN)
        headers = {"Authorization": f"Bearer {admin_token}"}

        # Step 1: security status.
        response = client.get("/admin/security/status", headers=headers)
        if response.status_code != 200:
            print(f"❌ Security status endpoint failed: {response.status_code}")
            return False

        data = response.json()
        required_fields = ["demo_safe", "killswitch", "killswitch_file"]
        missing_fields = [f for f in required_fields if f not in data]
        if missing_fields:
            print(f"❌ Missing fields in security status: {missing_fields}")
            return False
        print("✅ Security status endpoint works correctly")

        # Step 2: kill switch toggle.
        original_state = data["killswitch"]
        try:
            # Enable the kill switch.
            toggle_response = client.post(
                "/admin/security/killswitch",
                headers=headers,
                json={"enabled": True, "reason": "test"}
            )
            if toggle_response.status_code != 200:
                print(f"❌ Kill switch toggle failed: {toggle_response.status_code}")
                return False

            # Confirm the status endpoint now reports it as enabled.
            status_response = client.get("/admin/security/status", headers=headers)
            if status_response.status_code != 200:
                print(f"❌ Security status endpoint failed: {status_response.status_code}")
                return False
            if not status_response.json()["killswitch"]:
                print("❌ Kill switch is not enabled after activation request")
                return False
            print("✅ Kill switch toggle works correctly")

            return True
        finally:
            # Always put the kill switch back to the value seen before the
            # toggle, even when one of the checks above failed.
            client.post(
                "/admin/security/killswitch",
                headers=headers,
                json={"enabled": original_state, "reason": "restore"}
            )
    except Exception as e:
        print(f"❌ Admin API functionality test failed: {e}")
        return False
def main():
    """Run every integration check in order and summarise the outcome.

    Each check is called once. A truthy result counts as a pass, a falsy
    result as a failure, and an exception escaping a check is reported with
    its traceback and counted as not passed.

    Returns:
        int: 0 when every check passed, 1 otherwise (suitable as a process
        exit status).
    """
    print("🚀 Starting Fiche #23 integration tests with real functionality...\n")

    suite = (
        ("Security Components Integration", test_security_components_integration),
        ("API Endpoints Security", test_api_endpoints_with_security),
        ("Safety Switch Functionality", test_safety_switch_functionality),
        ("Token Management", test_token_management_real),
        ("Rate Limiting", test_rate_limiting_real),
        ("Audit Logging", test_audit_logging_real),
        ("Admin API Functionality", test_admin_api_real_functionality),
    )

    successes = 0
    for label, check in suite:
        try:
            outcome = check()
        except Exception as exc:
            # A check that raises is reported in full, then the run continues.
            print(f"{label} test ERROR: {exc}")
            import traceback
            traceback.print_exc()
            continue

        if outcome:
            successes += 1
            print(f"{label} test PASSED")
        else:
            print(f"{label} test FAILED")

    expected = len(suite)
    print(f"\n📊 Test Results: {successes}/{expected} tests passed")

    if successes != expected:
        print("💥 Some tests failed. Check the output above.")
        return 1

    print("🎉 All tests passed! Fiche #23 integration with real functionality is complete.")
    return 0


# Script entry point: the process exit status mirrors the value returned by main().
if __name__ == "__main__":
    sys.exit(main())