#!/usr/bin/env python3 """ Test d'intégration complet pour la Fiche #23 - API Security & Governance Vérifie que tous les composants de sécurité sont correctement intégrés dans le serveur API upload avec des tests de fonctionnalité réelle. """ import sys import os import json import tempfile import shutil from pathlib import Path from fastapi.testclient import TestClient sys.path.insert(0, '.') def test_security_components_integration(): """Test l'intégration réelle des composants de sécurité.""" print("🔍 Testing security components integration...") try: # Import des composants from server.api_upload import app from core.security.api_tokens import get_token_manager, TokenRole from core.security.ip_allowlist import get_ip_allowlist from core.security.rate_limiter import get_rate_limiter from core.security.audit_log import get_audit_logger from core.system.safety_switch import get_safety_switch # Vérifier que les composants sont initialisés token_manager = get_token_manager() ip_allowlist = get_ip_allowlist() rate_limiter = get_rate_limiter() audit_logger = get_audit_logger() safety_switch = get_safety_switch() print("✅ All security components initialized successfully") # Vérifier que les middlewares sont installés middleware_count = len(app.user_middleware) if middleware_count == 0: print("❌ No security middlewares installed") return False print(f"✅ Security middlewares installed ({middleware_count} middlewares)") return True except Exception as e: print(f"❌ Security components integration failed: {e}") return False def test_api_endpoints_with_security(): """Test les endpoints API avec les contrôles de sécurité réels.""" print("\n🔍 Testing API endpoints with security...") try: from server.api_upload import app client = TestClient(app) # Test 1: Endpoint public (healthz) - devrait fonctionner response = client.get("/healthz") if response.status_code != 200: print(f"❌ Healthz endpoint failed: {response.status_code}") return False print("✅ Public endpoint (healthz) accessible") # Test 2: Endpoint admin sans token - devrait être refusé response = client.get("/admin/security/status") if response.status_code not in [401, 403]: print(f"❌ Admin endpoint should require auth, got: {response.status_code}") return False print("✅ Admin endpoint properly protected") # Test 3: Headers de sécurité response = client.get("/") security_headers = [ "X-Content-Type-Options", "X-Frame-Options", "X-XSS-Protection", "Content-Security-Policy" ] missing_headers = [] for header in security_headers: if header not in response.headers: missing_headers.append(header) if missing_headers: print(f"❌ Missing security headers: {missing_headers}") return False print("✅ Security headers properly set") return True except Exception as e: print(f"❌ API endpoints security test failed: {e}") return False def test_safety_switch_functionality(): """Test la fonctionnalité réelle du safety switch.""" print("\n🔍 Testing safety switch functionality...") temp_dir = None try: from core.system.safety_switch import ( demo_safe_enabled, kill_switch_enabled, set_kill_switch, kill_switch_file_path, kill_switch_file_enabled ) # Créer un répertoire temporaire pour le test temp_dir = Path(tempfile.mkdtemp()) # Sauvegarder l'état original original_file_path = kill_switch_file_path() original_state = kill_switch_enabled() # Temporairement changer le chemin du fichier os.environ["RPA_KILL_SWITCH_FILE"] = str(temp_dir / "test_kill_switch.json") # Test 1: État initial initial_state = kill_switch_enabled() print(f"✅ Initial kill switch state: {initial_state}") # Test 2: Activation du kill switch set_kill_switch(True, "test_activation") if not kill_switch_enabled(): print("❌ Kill switch activation failed") return False print("✅ Kill switch activation works") # Test 3: Vérifier le contenu du fichier kill_file = Path(os.environ["RPA_KILL_SWITCH_FILE"]) if not kill_file.exists(): print("❌ Kill switch file not created") return False with open(kill_file, 'r') as f: data = json.load(f) if not data.get("enabled") or data.get("reason") != "test_activation": print(f"❌ Kill switch file content incorrect: {data}") return False print("✅ Kill switch file content correct") # Test 4: Désactivation set_kill_switch(False, "test_deactivation") if kill_switch_enabled(): print("❌ Kill switch deactivation failed") return False print("✅ Kill switch deactivation works") # Test 5: Mode demo safe demo_mode = demo_safe_enabled() print(f"✅ Demo safe mode: {demo_mode}") return True except Exception as e: print(f"❌ Safety switch functionality test failed: {e}") return False finally: # Nettoyer if temp_dir and temp_dir.exists(): shutil.rmtree(temp_dir) # Restaurer l'environnement if "RPA_KILL_SWITCH_FILE" in os.environ: del os.environ["RPA_KILL_SWITCH_FILE"] def test_token_management_real(): """Test la gestion réelle des tokens.""" print("\n🔍 Testing token management...") try: from core.security.api_tokens import get_token_manager, TokenRole token_manager = get_token_manager() # Test 1: Génération de token admin_token = token_manager.generate_token("test_admin", TokenRole.ADMIN) if not admin_token: print("❌ Admin token generation failed") return False print("✅ Admin token generated successfully") # Test 2: Validation de token token_info = token_manager.validate_token(admin_token) if token_info.user_id != "test_admin" or token_info.role != TokenRole.ADMIN: print(f"❌ Token validation failed: {token_info}") return False print("✅ Token validation works correctly") # Test 3: Token read-only readonly_token = token_manager.generate_token("test_readonly", TokenRole.READ_ONLY) readonly_info = token_manager.validate_token(readonly_token) if readonly_info.role != TokenRole.READ_ONLY: print(f"❌ Read-only token validation failed: {readonly_info}") return False print("✅ Read-only token works correctly") # Test 4: Token invalide try: token_manager.validate_token("invalid_token_12345") print("❌ Invalid token should have failed validation") return False except Exception: print("✅ Invalid token properly rejected") return True except Exception as e: print(f"❌ Token management test failed: {e}") return False def test_rate_limiting_real(): """Test le rate limiting réel.""" print("\n🔍 Testing rate limiting...") try: from core.security.rate_limiter import get_rate_limiter, RateLimitExceeded rate_limiter = get_rate_limiter() # Test avec une IP fictive test_ip = "192.168.1.100" test_path = "/api/test" # Test 1: Première requête - devrait passer try: headers = rate_limiter.enforce_rate_limit(test_ip, test_path) print("✅ First request passed rate limiting") except RateLimitExceeded: print("❌ First request should not be rate limited") return False # Test 2: Vérifier les headers expected_headers = ["X-RateLimit-Limit", "X-RateLimit-Remaining"] missing_headers = [h for h in expected_headers if h not in headers] if missing_headers: print(f"❌ Missing rate limit headers: {missing_headers}") return False print("✅ Rate limit headers present") # Test 3: Statistiques stats = rate_limiter.get_stats() if "total_requests" not in stats or stats["total_requests"] == 0: print(f"❌ Rate limiter stats not working: {stats}") return False print("✅ Rate limiter statistics working") return True except Exception as e: print(f"❌ Rate limiting test failed: {e}") return False def test_audit_logging_real(): """Test l'audit logging réel.""" print("\n🔍 Testing audit logging...") try: from core.security.audit_log import get_audit_logger audit_logger = get_audit_logger() # Test 1: Log d'accès API audit_logger.log_api_access( endpoint="/api/test", method="GET", ip_address="192.168.1.100", status_code=200, user_agent="test-client", processing_time=0.1 ) print("✅ API access logging works") # Test 2: Log de violation de sécurité audit_logger.log_security_violation( "test_violation", "192.168.1.100", "Test security violation" ) print("✅ Security violation logging works") # Test 3: Log de validation de token audit_logger.log_token_validation( token_hash="abc123...", ip_address="192.168.1.100", success=True, user_id="test_user" ) print("✅ Token validation logging works") # Test 4: Récupération des logs récents recent_logs = audit_logger.get_recent_logs(limit=10) if len(recent_logs) == 0: print("❌ No recent logs found") return False print(f"✅ Recent logs retrieved ({len(recent_logs)} entries)") return True except Exception as e: print(f"❌ Audit logging test failed: {e}") return False def test_admin_api_real_functionality(): """Test la fonctionnalité réelle des APIs admin.""" print("\n🔍 Testing admin API real functionality...") try: from server.api_upload import app from core.security.api_tokens import get_token_manager, TokenRole client = TestClient(app) token_manager = get_token_manager() # Générer un token admin pour les tests admin_token = token_manager.generate_token("test_admin", TokenRole.ADMIN) headers = {"Authorization": f"Bearer {admin_token}"} # Test 1: Status de sécurité response = client.get("/admin/security/status", headers=headers) if response.status_code != 200: print(f"❌ Security status endpoint failed: {response.status_code}") return False data = response.json() required_fields = ["demo_safe", "killswitch", "killswitch_file"] missing_fields = [f for f in required_fields if f not in data] if missing_fields: print(f"❌ Missing fields in security status: {missing_fields}") return False print("✅ Security status endpoint works correctly") # Test 2: Kill switch toggle original_state = data["killswitch"] # Activer le kill switch toggle_response = client.post( "/admin/security/killswitch", headers=headers, json={"enabled": True, "reason": "test"} ) if toggle_response.status_code != 200: print(f"❌ Kill switch toggle failed: {toggle_response.status_code}") return False # Vérifier que l'état a changé status_response = client.get("/admin/security/status", headers=headers) new_data = status_response.json() if new_data["killswitch"] == original_state: print("❌ Kill switch state did not change") return False print("✅ Kill switch toggle works correctly") # Restaurer l'état original client.post( "/admin/security/killswitch", headers=headers, json={"enabled": original_state, "reason": "restore"} ) return True except Exception as e: print(f"❌ Admin API functionality test failed: {e}") return False def main(): """Exécuter tous les tests.""" print("🚀 Starting Fiche #23 integration tests with real functionality...\n") tests = [ ("Security Components Integration", test_security_components_integration), ("API Endpoints Security", test_api_endpoints_with_security), ("Safety Switch Functionality", test_safety_switch_functionality), ("Token Management", test_token_management_real), ("Rate Limiting", test_rate_limiting_real), ("Audit Logging", test_audit_logging_real), ("Admin API Functionality", test_admin_api_real_functionality), ] passed = 0 total = len(tests) for test_name, test_func in tests: try: if test_func(): passed += 1 print(f"✅ {test_name} test PASSED") else: print(f"❌ {test_name} test FAILED") except Exception as e: print(f"❌ {test_name} test ERROR: {e}") import traceback traceback.print_exc() print(f"\n📊 Test Results: {passed}/{total} tests passed") if passed == total: print("🎉 All tests passed! Fiche #23 integration with real functionality is complete.") return 0 else: print("💥 Some tests failed. Check the output above.") return 1 if __name__ == "__main__": sys.exit(main())