"""
Rate Limiting System

Rate limiting system based on the token bucket algorithm.
Sheet #23: API Security & Governance
"""

import os
import time
import threading
import logging
from typing import Dict, Optional, Tuple
from dataclasses import dataclass, field
from collections import defaultdict

from ..system.safety_switch import get_safety_switch

logger = logging.getLogger(__name__)


@dataclass
class TokenBucket:
    """
    Token bucket used for rate limiting.

    Algorithm:
    - The bucket holds at most ``capacity`` tokens
    - Tokens are added continuously at ``refill_rate`` tokens per second
    - Each request consumes one or more tokens

    The bucket starts full. ``tokens`` and ``last_refill`` are only mutated
    by ``consume`` while holding ``lock``.
    """
    capacity: int
    refill_rate: float  # tokens per second
    tokens: float = field(init=False)
    last_refill: float = field(init=False)
    lock: threading.Lock = field(default_factory=threading.Lock, init=False)

    def __post_init__(self):
        # Start with a full bucket so an initial burst is allowed.
        self.tokens = float(self.capacity)
        self.last_refill = time.time()

    def consume(self, tokens_needed: int = 1) -> bool:
        """
        Try to consume tokens from the bucket.

        Args:
            tokens_needed: Number of tokens required (must be >= 0)

        Returns:
            True if the tokens were available and have been consumed

        Raises:
            ValueError: If ``tokens_needed`` is negative
        """
        # A negative amount would pass the availability check and then
        # *credit* the bucket (possibly above capacity), so reject it.
        if tokens_needed < 0:
            raise ValueError("tokens_needed must be non-negative")

        with self.lock:
            now = time.time()

            # Refill according to elapsed time. time.time() is a wall clock
            # and may step backwards; clamp so that a backward step never
            # removes tokens from the bucket.
            elapsed = max(0.0, now - self.last_refill)
            self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)
            self.last_refill = now

            # Only debit the bucket when the full amount is available.
            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return True

            return False

    def get_status(self) -> Dict:
        """
        Return a snapshot of the bucket without modifying it.

        Returns:
            Dict with ``capacity``, ``current_tokens`` (including the refill
            accrued since the last ``consume``), ``refill_rate`` and
            ``time_to_refill`` (seconds until the bucket is full again, or 0
            when ``refill_rate`` is not positive)
        """
        with self.lock:
            now = time.time()
            # Same clamp as in consume(): never report a negative refill.
            elapsed = max(0.0, now - self.last_refill)
            current_tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)

            if self.refill_rate > 0:
                time_to_refill = max(0, (self.capacity - current_tokens) / self.refill_rate)
            else:
                time_to_refill = 0

            return {
                "capacity": self.capacity,
                "current_tokens": current_tokens,
                "refill_rate": self.refill_rate,
                "time_to_refill": time_to_refill,
            }


@dataclass
class RateLimitConfig:
    """Rate limit settings for an endpoint or a user."""
    requests_per_minute: int = 60
    burst_capacity: int = 10
    tokens_per_request: int = 1


class RateLimitExceeded(Exception):
    """Raised when a rate limit is exceeded.

    ``retry_after`` carries the suggested wait time in seconds.
    """

    def __init__(self, message: str, retry_after: float = 0):
        super().__init__(message)
        self.retry_after = retry_after


class RateLimiter:
    """
    Rate limit manager built on token buckets.

    Features:
    - Limits keyed by an identifier (IP, user, ...) and optionally an endpoint
    - Per-endpoint configuration loaded from environment variables
    - Informational HTTP headers
    - Periodic cleanup of inactive buckets
    """

    def __init__(self):
        self._buckets: Dict[str, TokenBucket] = {}
        self._configs: Dict[str, RateLimitConfig] = {}
        self._lock = threading.Lock()
        self._safety = get_safety_switch()

        self._load_config()

        # Periodic cleanup of inactive buckets
        self._last_cleanup = time.time()
        self._cleanup_interval = 300  # 5 minutes

    @classmethod
    def from_env(cls):
        """Create an instance configured from environment variables (FastAPI compatibility)."""
        return cls()

    def _load_config(self):
        """Load the default and per-endpoint configuration from environment variables."""
        # Default configuration
        default_rpm = int(os.getenv("DEFAULT_RATE_LIMIT_RPM", "60"))
        default_burst = int(os.getenv("DEFAULT_RATE_LIMIT_BURST", "10"))

        self._default_config = RateLimitConfig(
            requests_per_minute=default_rpm,
            burst_capacity=default_burst
        )

        # Endpoint-specific configurations
        # Format: RATE_LIMIT_<ENDPOINT>=rpm:burst  (":burst" is optional)
        prefix = "RATE_LIMIT_"
        reserved = ("RATE_LIMIT_RPM", "RATE_LIMIT_BURST")
        for key, value in os.environ.items():
            if not key.startswith(prefix) or key in reserved:
                continue

            endpoint = key[len(prefix):].lower()  # strip "RATE_LIMIT_"
            try:
                if ":" in value:
                    rpm_str, burst_str = value.split(":", 1)
                    rpm = int(rpm_str)
                    burst = int(burst_str)
                else:
                    rpm = int(value)
                    burst = default_burst
            except ValueError as e:
                # A malformed entry is reported and skipped; other entries still load.
                logger.error(f"Invalid rate limit config for {key}: {value} - {e}")
                continue

            self._configs[endpoint] = RateLimitConfig(
                requests_per_minute=rpm,
                burst_capacity=burst
            )
            logger.debug(f"Rate limit config for {endpoint}: {rpm} RPM, {burst} burst")

        logger.info(f"RateLimiter initialized with default {default_rpm} RPM, {default_burst} burst")

    def _get_config(self, endpoint: Optional[str] = None) -> RateLimitConfig:
        """Return the configuration for an endpoint, or the default one (lookup is case-insensitive)."""
        if endpoint and endpoint.lower() in self._configs:
            return self._configs[endpoint.lower()]
        return self._default_config

    def _get_bucket_key(self, identifier: str, endpoint: Optional[str] = None) -> str:
        """Build the bucket key: ``identifier:endpoint`` or just ``identifier``."""
        if endpoint:
            return f"{identifier}:{endpoint}"
        return identifier

    def _get_or_create_bucket(self, key: str, config: RateLimitConfig) -> TokenBucket:
        """Return the bucket stored under ``key``, creating it from ``config`` if missing."""
        with self._lock:
            if key not in self._buckets:
                # Convert RPM to tokens per second
                refill_rate = config.requests_per_minute / 60.0
                self._buckets[key] = TokenBucket(
                    capacity=config.burst_capacity,
                    refill_rate=refill_rate
                )
                logger.debug(f"Created token bucket for {key}: {config.burst_capacity} capacity, {refill_rate:.2f} refill/s")

            return self._buckets[key]

    @staticmethod
    def _seconds_until_available(status: Dict, tokens_needed: int) -> float:
        """Estimate the wait (seconds) until ``tokens_needed`` tokens are in the bucket.

        ``status`` is a ``TokenBucket.get_status()`` snapshot. The estimate is
        capped at the time needed to fill the bucket completely, which is also
        the value returned when the bucket does not refill at all.
        """
        time_to_full = status["time_to_refill"]
        refill_rate = status["refill_rate"]
        if refill_rate <= 0:
            return time_to_full

        deficit = max(0.0, tokens_needed - status["current_tokens"])
        return min(time_to_full, deficit / refill_rate)

    def check(self, identifier: str) -> Tuple[bool, float]:
        """
        Check the rate limit (FastAPI compatibility).

        Args:
            identifier: Unique identifier

        Returns:
            Tuple (allowed, retry_after_seconds); retry_after_seconds is 0
            when no ``Retry-After`` header was produced
        """
        allowed, headers = self.check_rate_limit(identifier)
        retry_after = float(headers.get("Retry-After", "0"))
        return allowed, retry_after

    def check_rate_limit(self, identifier: str, endpoint: Optional[str] = None,
                         tokens_needed: int = 1) -> Tuple[bool, Dict[str, str]]:
        """
        Check the rate limit for an identifier and consume tokens if allowed.

        Args:
            identifier: Unique identifier (IP, user_id, etc.)
            endpoint: Optional endpoint for endpoint-specific limits
            tokens_needed: Number of tokens required

        Returns:
            Tuple (allowed, headers) where headers holds the rate limiting
            information. When the "rate_limiting" feature is disabled the
            result is ``(True, {})``.

        Raises:
            ValueError: If ``tokens_needed`` is negative
        """
        if not self._safety.is_feature_enabled("rate_limiting"):
            return True, {}

        # Periodic cleanup
        self._cleanup_inactive_buckets()

        config = self._get_config(endpoint)
        bucket_key = self._get_bucket_key(identifier, endpoint)
        bucket = self._get_or_create_bucket(bucket_key, config)

        # Try to consume the tokens
        allowed = bucket.consume(tokens_needed)

        # Build the informational headers
        status = bucket.get_status()
        headers = {
            "X-RateLimit-Limit": str(config.requests_per_minute),
            "X-RateLimit-Remaining": str(int(status["current_tokens"])),
            "X-RateLimit-Reset": str(int(time.time() + status["time_to_refill"]))
        }

        if not allowed:
            # Advertise the wait until *this* request could succeed rather than
            # the (usually much longer) time needed to refill the whole bucket.
            # Truncate and add one second so the client never retries too early.
            wait = self._seconds_until_available(status, tokens_needed)
            headers["Retry-After"] = str(int(wait) + 1)
            logger.warning(f"Rate limit exceeded for {identifier} on {endpoint or 'default'}")

        return allowed, headers

    def enforce_rate_limit(self, identifier: str, endpoint: Optional[str] = None,
                           tokens_needed: int = 1) -> Dict[str, str]:
        """
        Apply the rate limit and raise if it is exceeded.

        Args:
            identifier: Unique identifier
            endpoint: Optional endpoint
            tokens_needed: Number of tokens required

        Returns:
            Rate limiting headers

        Raises:
            RateLimitExceeded: If the limit is exceeded
            ValueError: If ``tokens_needed`` is negative
        """
        allowed, headers = self.check_rate_limit(identifier, endpoint, tokens_needed)

        if not allowed:
            retry_after = float(headers.get("Retry-After", "60"))
            raise RateLimitExceeded(
                f"Rate limit exceeded for {identifier}. Try again in {retry_after} seconds.",
                retry_after=retry_after
            )

        return headers

    def _cleanup_inactive_buckets(self):
        """Drop buckets unused for over an hour; runs at most once per cleanup interval."""
        now = time.time()
        if now - self._last_cleanup < self._cleanup_interval:
            return

        with self._lock:
            cutoff_time = now - 3600  # 1 hour of inactivity

            # Collect first, delete afterwards: the dict must not change size
            # while it is being iterated.
            inactive_keys = [
                key for key, bucket in self._buckets.items()
                if bucket.last_refill < cutoff_time
            ]

            for key in inactive_keys:
                del self._buckets[key]

            if inactive_keys:
                logger.debug(f"Cleaned up {len(inactive_keys)} inactive rate limit buckets")

            self._last_cleanup = now

    def reset_rate_limit(self, identifier: str, endpoint: Optional[str] = None) -> bool:
        """
        Reset the rate limit for an identifier by discarding its bucket.

        Args:
            identifier: Identifier to reset
            endpoint: Optional endpoint

        Returns:
            True if a bucket existed and was removed, False otherwise
        """
        bucket_key = self._get_bucket_key(identifier, endpoint)

        with self._lock:
            if self._buckets.pop(bucket_key, None) is not None:
                logger.info(f"Reset rate limit for {identifier} on {endpoint or 'default'}")
                return True

        return False

    def get_rate_limit_status(self, identifier: str, endpoint: Optional[str] = None) -> Dict:
        """
        Return the rate limiting status for an identifier.

        Args:
            identifier: Identifier to inspect
            endpoint: Optional endpoint

        Returns:
            Dict with ``identifier``, ``endpoint``, ``config`` and
            ``current_status``. When no bucket exists yet, ``current_status``
            describes a full bucket built from the applicable configuration.
        """
        bucket_key = self._get_bucket_key(identifier, endpoint)
        config = self._get_config(endpoint)

        with self._lock:
            bucket = self._buckets.get(bucket_key)
            if bucket is not None:
                current_status = bucket.get_status()
            else:
                current_status = {
                    "capacity": config.burst_capacity,
                    "current_tokens": config.burst_capacity,
                    "refill_rate": config.requests_per_minute / 60.0,
                    "time_to_refill": 0
                }

            return {
                "identifier": identifier,
                "endpoint": endpoint,
                "config": {
                    "requests_per_minute": config.requests_per_minute,
                    "burst_capacity": config.burst_capacity
                },
                "current_status": current_status
            }

    def get_global_status(self) -> Dict:
        """Return the global status: feature flag, bucket count and all configurations."""
        with self._lock:
            return {
                "enabled": self._safety.is_feature_enabled("rate_limiting"),
                "active_buckets": len(self._buckets),
                "default_config": {
                    "requests_per_minute": self._default_config.requests_per_minute,
                    "burst_capacity": self._default_config.burst_capacity
                },
                "endpoint_configs": {
                    endpoint: {
                        "requests_per_minute": config.requests_per_minute,
                        "burst_capacity": config.burst_capacity
                    }
                    for endpoint, config in self._configs.items()
                }
            }


# Global instance
_rate_limiter = None


def get_rate_limiter() -> RateLimiter:
    """Return the global rate limiter instance, creating it on first use."""
    global _rate_limiter
    if _rate_limiter is None:
        _rate_limiter = RateLimiter()
    return _rate_limiter


def check_rate_limit(identifier: str, endpoint: Optional[str] = None) -> Tuple[bool, Dict[str, str]]:
    """
    Convenience function that checks a rate limit on the global instance.

    Args:
        identifier: Unique identifier
        endpoint: Optional endpoint

    Returns:
        Tuple (allowed, headers)
    """
    return get_rate_limiter().check_rate_limit(identifier, endpoint)