"""
Audit Logging System

Audit logging in JSONL format for secure traceability.
Sheet #23: API Security & Governance
"""

import hashlib
import json
import logging
import os
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Dict, Optional

from ..system.safety_switch import get_safety_switch

logger = logging.getLogger(__name__)


def _utc_timestamp() -> str:
    """Return the current UTC time as a naive ISO-8601 string suffixed with "Z".

    ``datetime.utcnow()`` is deprecated since Python 3.12. Dropping the tzinfo
    before formatting keeps the emitted string identical to the historical
    ``datetime.utcnow().isoformat() + "Z"`` form (no "+00:00" offset).
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"


class AuditEventType(Enum):
    """Audit event types."""

    AUTHENTICATION = "authentication"
    AUTHORIZATION = "authorization"
    API_ACCESS = "api_access"
    SECURITY_VIOLATION = "security_violation"
    RATE_LIMIT_EXCEEDED = "rate_limit_exceeded"
    IP_BLOCKED = "ip_blocked"
    TOKEN_VALIDATION = "token_validation"
    SYSTEM_EVENT = "system_event"
    ERROR = "error"


@dataclass
class AuditEvent:
    """Structured audit event, serialized as one JSON object per log line."""

    event_type: AuditEventType
    timestamp: str
    message: str
    user_id: Optional[str] = None
    ip_address: Optional[str] = None
    endpoint: Optional[str] = None
    method: Optional[str] = None
    user_agent: Optional[str] = None
    token_hash: Optional[str] = None
    success: bool = True
    error_code: Optional[str] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        # Callers may still pass ``metadata=None`` explicitly; normalize it so
        # the serialized event always carries a dict.
        if self.metadata is None:
            self.metadata = {}


class AuditLogger:
    """
    Audit logger with JSONL output and automatic rotation.

    Features:
    - JSONL format for easy parsing
    - Automatic log rotation
    - Hashing/masking of sensitive data
    - Contextual metadata
    """

    def __init__(self):
        self._safety = get_safety_switch()
        self._load_config()
        self._setup_logging()

    @classmethod
    def from_env(cls):
        """Create an instance from environment variables (FastAPI compatibility)."""
        return cls()

    def _load_config(self):
        """Load the configuration from environment variables.

        Reads AUDIT_LOG_DIR, AUDIT_LOG_MAX_SIZE, AUDIT_LOG_MAX_FILES and
        AUDIT_HASH_SENSITIVE, then creates the log directory if missing.

        Raises:
            ValueError: if a numeric variable does not parse as an integer.
        """
        self.log_dir = Path(os.getenv("AUDIT_LOG_DIR", "logs/audit"))
        self.log_file = self.log_dir / "audit.jsonl"
        self.max_file_size = int(os.getenv("AUDIT_LOG_MAX_SIZE", "10485760"))  # 10MB
        self.max_files = int(os.getenv("AUDIT_LOG_MAX_FILES", "10"))
        self.hash_sensitive_data = os.getenv("AUDIT_HASH_SENSITIVE", "true").lower() == "true"

        # Create the log directory
        self.log_dir.mkdir(parents=True, exist_ok=True)

        logger.info("AuditLogger initialized: %s", self.log_file)

    def _setup_logging(self):
        """Configure the Python logger used for audit output.

        The "audit" logger is looked up by name, so it is shared by every
        AuditLogger in the process; a file handler is attached only when the
        logger has none yet.
        """
        self.audit_logger = logging.getLogger("audit")
        self.audit_logger.setLevel(logging.INFO)

        # Avoid duplicate handlers if already configured
        if not self.audit_logger.handlers:
            handler = logging.FileHandler(self.log_file, encoding="utf-8")
            handler.setLevel(logging.INFO)

            # Bare message format because each record is already a JSON line
            handler.setFormatter(logging.Formatter("%(message)s"))

            self.audit_logger.addHandler(handler)
            self.audit_logger.propagate = False

    def _hash_sensitive_value(self, value: str) -> str:
        """Hash a sensitive value for logging.

        Returns the value unchanged when hashing is disabled, otherwise the
        first 16 hex characters of its SHA-256 digest.
        """
        if not self.hash_sensitive_data:
            return value
        return hashlib.sha256(value.encode()).hexdigest()[:16]

    def _mask_ip_address(self, ip_address: str) -> str:
        """Mask an IP address: keep the first three octets of a dotted quad.

        Anything that does not split into exactly four dot-separated parts
        (e.g. IPv6) goes through ``_hash_sensitive_value`` instead.
        """
        parts = ip_address.split(".")
        if len(parts) == 4:
            return f"{parts[0]}.{parts[1]}.{parts[2]}.xxx"
        return self._hash_sensitive_value(ip_address)

    def _rotate_logs_if_needed(self):
        """Rotate the log files when the current file exceeds ``max_file_size``.

        ``audit.jsonl.N`` is shifted to ``audit.jsonl.N+1`` (overwriting the
        oldest one) and the live file becomes ``audit.jsonl.1``.
        """
        try:
            current_size = self.log_file.stat().st_size
        except FileNotFoundError:
            return

        if current_size <= self.max_file_size:
            return

        # Release the file before renaming it: an open handle makes the rename
        # fail on Windows.
        for handler in list(self.audit_logger.handlers):
            handler.close()
            self.audit_logger.removeHandler(handler)

        try:
            # Shift existing rotated files, oldest first
            for i in range(self.max_files - 1, 0, -1):
                old_file = self.log_dir / f"audit.jsonl.{i}"
                if old_file.exists():
                    # Path.replace overwrites an existing destination
                    old_file.replace(self.log_dir / f"audit.jsonl.{i + 1}")

            # Rename the current file
            self.log_file.replace(self.log_dir / "audit.jsonl.1")
        finally:
            # Always re-attach a handler, even if the rotation failed, so that
            # later events are not silently discarded.
            self._setup_logging()

        logger.info("Audit log rotated")

    def log_event(self, event: AuditEvent):
        """
        Record an audit event.

        Does nothing when the "audit_logging" feature is disabled on the
        safety switch. Failures are reported on the module logger and never
        propagate to the caller.

        Args:
            event: Event to record
        """
        if not self._safety.is_feature_enabled("audit_logging"):
            return

        try:
            # Rotate if needed
            self._rotate_logs_if_needed()

            # Prepare the data for JSON
            event_dict = asdict(event)
            event_dict["event_type"] = event.event_type.value

            # Hash/mask sensitive data
            if self.hash_sensitive_data:
                if event.token_hash:
                    event_dict["token_hash"] = self._hash_sensitive_value(event.token_hash)
                if event.ip_address:
                    event_dict["ip_address"] = self._mask_ip_address(event.ip_address)

            # Write as JSONL. ``default=str`` is deliberate: an audit record
            # must not be dropped because one metadata value is not
            # JSON-native.
            json_line = json.dumps(
                event_dict,
                ensure_ascii=False,
                separators=(",", ":"),
                default=str,
            )
            self.audit_logger.info(json_line)

        except Exception as e:
            logger.error("Failed to log audit event: %s", e)

    def write(self, event_data: Dict[str, Any]):
        """
        Write an audit event from a plain dict (FastAPI compatibility).

        Reads the keys "event", "user_id", "ip", "path", "method" and
        "token_hash"; the whole dict is also stored as the event metadata.

        Args:
            event_data: Event data
        """
        try:
            # Convert the data into an AuditEvent
            event_type_str = event_data.get("event", "system_event")

            # Map the event names to event types
            event_type_map = {
                "ip_block": AuditEventType.IP_BLOCKED,
                "auth_fail": AuditEventType.AUTHENTICATION,
                "killswitch_block": AuditEventType.SECURITY_VIOLATION,
                "demo_safe_block": AuditEventType.SECURITY_VIOLATION,
                "rate_limit": AuditEventType.RATE_LIMIT_EXCEEDED,
                "request_success": AuditEventType.API_ACCESS,
            }
            event_type = event_type_map.get(event_type_str, AuditEventType.SYSTEM_EVENT)

            # Work on a copy so the caller's dict is never aliased, and mask
            # the sensitive keys: otherwise the raw IP/token would be written
            # in metadata right next to their masked top-level fields.
            metadata = dict(event_data)
            if self.hash_sensitive_data:
                raw_ip = metadata.get("ip")
                if isinstance(raw_ip, str) and raw_ip:
                    metadata["ip"] = self._mask_ip_address(raw_ip)
                raw_token = metadata.get("token_hash")
                if isinstance(raw_token, str) and raw_token:
                    metadata["token_hash"] = self._hash_sensitive_value(raw_token)

            event = AuditEvent(
                event_type=event_type,
                timestamp=_utc_timestamp(),
                message=event_data.get("event", "Unknown event"),
                user_id=event_data.get("user_id"),
                ip_address=event_data.get("ip"),
                endpoint=event_data.get("path"),
                method=event_data.get("method"),
                token_hash=event_data.get("token_hash"),
                success=event_type_str == "request_success",
                metadata=metadata,
            )

            self.log_event(event)

        except Exception as e:
            logger.error("Failed to write audit event: %s", e)

    def log_authentication(self, user_id: Optional[str], ip_address: str,
                           success: bool, method: str = "token",
                           error_code: Optional[str] = None, **metadata):
        """Log an authentication event; ``method`` is stored as metadata "auth_method"."""
        event = AuditEvent(
            event_type=AuditEventType.AUTHENTICATION,
            timestamp=_utc_timestamp(),
            message=f"Authentication {'successful' if success else 'failed'} for user {user_id or 'anonymous'}",
            user_id=user_id,
            ip_address=ip_address,
            success=success,
            error_code=error_code,
            metadata={**metadata, "auth_method": method},
        )
        self.log_event(event)

    def log_api_access(self, endpoint: str, method: str, ip_address: str,
                       user_id: Optional[str] = None, status_code: int = 200,
                       user_agent: Optional[str] = None, **metadata):
        """Log an API access; status codes in [200, 400) count as success."""
        event = AuditEvent(
            event_type=AuditEventType.API_ACCESS,
            timestamp=_utc_timestamp(),
            message=f"{method} {endpoint} - {status_code}",
            user_id=user_id,
            ip_address=ip_address,
            endpoint=endpoint,
            method=method,
            user_agent=user_agent,
            success=200 <= status_code < 400,
            metadata={**metadata, "status_code": status_code},
        )
        self.log_event(event)

    def log_security_violation(self, violation_type: str, ip_address: str,
                               details: str, user_id: Optional[str] = None,
                               **metadata):
        """Log a security violation (always recorded with ``success=False``)."""
        event = AuditEvent(
            event_type=AuditEventType.SECURITY_VIOLATION,
            timestamp=_utc_timestamp(),
            message=f"Security violation: {violation_type} - {details}",
            user_id=user_id,
            ip_address=ip_address,
            success=False,
            metadata={**metadata, "violation_type": violation_type},
        )
        self.log_event(event)

    def log_rate_limit_exceeded(self, identifier: str, endpoint: Optional[str],
                                ip_address: str, **metadata):
        """Log a rate-limit overrun for ``identifier`` on ``endpoint``."""
        event = AuditEvent(
            event_type=AuditEventType.RATE_LIMIT_EXCEEDED,
            timestamp=_utc_timestamp(),
            message=f"Rate limit exceeded for {identifier} on {endpoint or 'default'}",
            ip_address=ip_address,
            endpoint=endpoint,
            success=False,
            metadata={**metadata, "identifier": identifier},
        )
        self.log_event(event)

    def log_ip_blocked(self, ip_address: str, reason: str = "Not in allowlist",
                       **metadata):
        """Log a blocked IP; ``reason`` is stored as metadata "block_reason".

        Note: the message embeds the IP address as passed in; only the
        ``ip_address`` field is masked by ``log_event``.
        """
        event = AuditEvent(
            event_type=AuditEventType.IP_BLOCKED,
            timestamp=_utc_timestamp(),
            message=f"IP {ip_address} blocked: {reason}",
            ip_address=ip_address,
            success=False,
            metadata={**metadata, "block_reason": reason},
        )
        self.log_event(event)

    def log_token_validation(self, token_hash: str, ip_address: str,
                             success: bool, user_id: Optional[str] = None,
                             error_code: Optional[str] = None, **metadata):
        """Log a token validation attempt."""
        event = AuditEvent(
            event_type=AuditEventType.TOKEN_VALIDATION,
            timestamp=_utc_timestamp(),
            message=f"Token validation {'successful' if success else 'failed'}",
            user_id=user_id,
            ip_address=ip_address,
            token_hash=token_hash,
            success=success,
            error_code=error_code,
            metadata=metadata,
        )
        self.log_event(event)

    def log_system_event(self, event_name: str, details: str, **metadata):
        """Log a system event; ``event_name`` is stored as metadata "event_name"."""
        event = AuditEvent(
            event_type=AuditEventType.SYSTEM_EVENT,
            timestamp=_utc_timestamp(),
            message=f"System event: {event_name} - {details}",
            metadata={**metadata, "event_name": event_name},
        )
        self.log_event(event)

    def log_error(self, error_message: str, error_code: Optional[str] = None,
                  ip_address: Optional[str] = None,
                  user_id: Optional[str] = None, **metadata):
        """Log an error (always recorded with ``success=False``)."""
        event = AuditEvent(
            event_type=AuditEventType.ERROR,
            timestamp=_utc_timestamp(),
            message=f"Error: {error_message}",
            user_id=user_id,
            ip_address=ip_address,
            success=False,
            error_code=error_code,
            metadata=metadata,
        )
        self.log_event(event)

    def get_audit_stats(self) -> Dict[str, Any]:
        """Return statistics about the current audit log file.

        ``total_events`` is the number of lines in the live file, or an
        error string if the file could not be read.
        """
        # A single stat() call avoids the exists()/stat() race with rotation.
        try:
            file_size = self.log_file.stat().st_size
            file_exists = True
        except FileNotFoundError:
            file_size = 0
            file_exists = False

        stats: Dict[str, Any] = {
            "log_file": str(self.log_file),
            "log_file_exists": file_exists,
            "log_file_size": file_size,
            "max_file_size": self.max_file_size,
            "max_files": self.max_files,
            "hash_sensitive_data": self.hash_sensitive_data,
        }

        # Count the lines if the file exists
        if file_exists:
            try:
                with open(self.log_file, "r", encoding="utf-8") as f:
                    stats["total_events"] = sum(1 for _ in f)
            except Exception as e:
                stats["total_events"] = f"Error counting: {e}"
        else:
            stats["total_events"] = 0

        return stats


# Global instance
_audit_logger = None


def get_audit_logger() -> AuditLogger:
    """Return the global audit logger instance, creating it on first use."""
    global _audit_logger
    if _audit_logger is None:
        _audit_logger = AuditLogger()
    return _audit_logger


def log_security_event(event_type: str, message: str, **kwargs):
    """
    Utility function to log a security event.

    Dispatches on ``event_type`` to the matching AuditLogger method. For
    "authentication", "rate_limit" and "ip_blocked" only ``kwargs`` are
    forwarded (``message`` is not used); for "security_violation",
    ``message`` is passed as the first positional argument; any other type
    is logged as a system event named ``event_type``.

    Args:
        event_type: Event type
        message: Descriptive message
        **kwargs: Arguments/metadata forwarded to the selected method
    """
    audit_logger = get_audit_logger()

    if event_type == "authentication":
        audit_logger.log_authentication(**kwargs)
    elif event_type == "security_violation":
        audit_logger.log_security_violation(message, **kwargs)
    elif event_type == "rate_limit":
        audit_logger.log_rate_limit_exceeded(**kwargs)
    elif event_type == "ip_blocked":
        audit_logger.log_ip_blocked(**kwargs)
    else:
        audit_logger.log_system_event(event_type, message, **kwargs)


def log_api_access(endpoint: str, method: str, ip_address: str, **kwargs):
    """
    Utility function to log an API access.

    Args:
        endpoint: Endpoint accessed
        method: HTTP method
        ip_address: Client IP
        **kwargs: Additional metadata
    """
    get_audit_logger().log_api_access(endpoint, method, ip_address, **kwargs)