Files
rpa_vision_v3/core/security/rate_limiter.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

400 lines
14 KiB
Python

"""
Rate Limiting System
Système de limitation de débit avec algorithme token bucket.
Fiche #23: API Security & Governance
"""
import os
import time
import threading
import logging
from typing import Dict, Optional, Tuple
from dataclasses import dataclass, field
from collections import defaultdict
from ..system.safety_switch import get_safety_switch
logger = logging.getLogger(__name__)
@dataclass
class TokenBucket:
    """
    Token-bucket implementation for rate limiting.

    Algorithm:
    - A maximum token capacity
    - A refill rate (tokens per second)
    - Each request consumes tokens
    """
    capacity: int
    refill_rate: float  # tokens per second
    tokens: float = field(init=False)
    last_refill: float = field(init=False)
    lock: threading.Lock = field(default_factory=threading.Lock, init=False)

    def __post_init__(self):
        # Start full so an initial burst of up to `capacity` requests is allowed.
        self.tokens = float(self.capacity)
        self.last_refill = time.time()

    def _elapsed_since_refill(self, now: float) -> float:
        """Return elapsed seconds since the last refill, clamped to >= 0.

        time.time() is wall-clock and can move backwards (e.g. NTP
        adjustment); without the clamp a negative delta would *remove*
        tokens instead of adding them.
        """
        return max(0.0, now - self.last_refill)

    def consume(self, tokens_needed: int = 1) -> bool:
        """
        Try to consume tokens.

        Args:
            tokens_needed: Number of tokens required.

        Returns:
            True if the tokens could be consumed.
        """
        with self.lock:
            now = time.time()
            # Refill tokens based on elapsed time.
            tokens_to_add = self._elapsed_since_refill(now) * self.refill_rate
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            self.last_refill = now
            # Check whether enough tokens are available.
            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return True
            return False

    def get_status(self) -> Dict:
        """Return the current bucket status (read-only snapshot; no state mutation)."""
        with self.lock:
            now = time.time()
            # Same backwards-clock clamp as consume(), but without committing
            # the refill — get_status must not mutate the bucket.
            time_passed = self._elapsed_since_refill(now)
            current_tokens = min(self.capacity, self.tokens + (time_passed * self.refill_rate))
            return {
                "capacity": self.capacity,
                "current_tokens": current_tokens,
                "refill_rate": self.refill_rate,
                # Seconds until the bucket is completely full again.
                "time_to_refill": max(0, (self.capacity - current_tokens) / self.refill_rate) if self.refill_rate > 0 else 0
            }
@dataclass
class RateLimitConfig:
    """Rate-limit configuration for one endpoint or user."""
    # Sustained rate; converted to a refill rate of requests_per_minute / 60 tokens/s.
    requests_per_minute: int = 60
    # Maximum burst size (token-bucket capacity).
    burst_capacity: int = 10
    # Token cost of a single request (not consulted by the call sites visible here).
    tokens_per_request: int = 1
class RateLimitExceeded(Exception):
    """Raised when a client exceeds its rate limit.

    Attributes are set so callers can build a 429 response:
    the exception message describes the violation and
    ``retry_after`` carries the suggested wait in seconds.
    """

    def __init__(self, message: str, retry_after: float = 0):
        # Stash the hint first; the base-class init only records the message.
        self.retry_after = retry_after
        super().__init__(message)
class RateLimiter:
    """
    Rate-limiting manager built on token buckets.

    Features:
    - Per-IP, per-user and per-endpoint limiting
    - Flexible per-rule configuration
    - Informative HTTP headers
    - Automatic cleanup of inactive buckets
    """
    def __init__(self):
        # Maps bucket key ("identifier" or "identifier:endpoint") -> TokenBucket.
        self._buckets: Dict[str, TokenBucket] = {}
        # Per-endpoint configuration overrides loaded from the environment.
        self._configs: Dict[str, RateLimitConfig] = {}
        # Guards _buckets creation/cleanup; each bucket carries its own lock.
        self._lock = threading.Lock()
        self._safety = get_safety_switch()
        self._load_config()
        # Periodic cleanup of inactive buckets
        self._last_cleanup = time.time()
        self._cleanup_interval = 300  # 5 minutes

    @classmethod
    def from_env(cls):
        """Create an instance from environment variables (FastAPI compatibility)."""
        return cls()

    def _load_config(self):
        """Load configuration from environment variables."""
        # Default configuration
        default_rpm = int(os.getenv("DEFAULT_RATE_LIMIT_RPM", "60"))
        default_burst = int(os.getenv("DEFAULT_RATE_LIMIT_BURST", "10"))
        self._default_config = RateLimitConfig(
            requests_per_minute=default_rpm,
            burst_capacity=default_burst
        )
        # Endpoint-specific configurations.
        # Format: RATE_LIMIT_<ENDPOINT>=rpm:burst
        for key, value in os.environ.items():
            if key.startswith("RATE_LIMIT_") and key != "RATE_LIMIT_RPM" and key != "RATE_LIMIT_BURST":
                endpoint = key[11:].lower()  # strip the "RATE_LIMIT_" prefix
                try:
                    if ":" in value:
                        rpm_str, burst_str = value.split(":", 1)
                        rpm = int(rpm_str)
                        burst = int(burst_str)
                    else:
                        # No burst given: keep the default burst capacity.
                        rpm = int(value)
                        burst = default_burst
                    self._configs[endpoint] = RateLimitConfig(
                        requests_per_minute=rpm,
                        burst_capacity=burst
                    )
                    logger.debug(f"Rate limit config for {endpoint}: {rpm} RPM, {burst} burst")
                except ValueError as e:
                    # Malformed value: log and skip, keep serving with defaults.
                    logger.error(f"Invalid rate limit config for {key}: {value} - {e}")
        logger.info(f"RateLimiter initialized with default {default_rpm} RPM, {default_burst} burst")

    def _get_config(self, endpoint: Optional[str] = None) -> RateLimitConfig:
        """Return the configuration for an endpoint (falls back to the default)."""
        if endpoint and endpoint.lower() in self._configs:
            return self._configs[endpoint.lower()]
        return self._default_config

    def _get_bucket_key(self, identifier: str, endpoint: Optional[str] = None) -> str:
        """Build the unique bucket key: "identifier:endpoint" or just "identifier"."""
        if endpoint:
            return f"{identifier}:{endpoint}"
        return identifier

    def _get_or_create_bucket(self, key: str, config: RateLimitConfig) -> TokenBucket:
        """Fetch or lazily create the token bucket for *key* (under self._lock)."""
        with self._lock:
            if key not in self._buckets:
                # Convert RPM into tokens per second.
                refill_rate = config.requests_per_minute / 60.0
                self._buckets[key] = TokenBucket(
                    capacity=config.burst_capacity,
                    refill_rate=refill_rate
                )
                logger.debug(f"Created token bucket for {key}: {config.burst_capacity} capacity, {refill_rate:.2f} refill/s")
            return self._buckets[key]

    def check(self, identifier: str) -> Tuple[bool, float]:
        """
        Check the rate limit (FastAPI compatibility shim).

        Args:
            identifier: Unique identifier

        Returns:
            Tuple (allowed, retry_after_seconds)
        """
        allowed, headers = self.check_rate_limit(identifier)
        # "Retry-After" is only present when the request was rejected.
        retry_after = float(headers.get("Retry-After", "0"))
        return allowed, retry_after

    def check_rate_limit(self, identifier: str, endpoint: Optional[str] = None,
                         tokens_needed: int = 1) -> Tuple[bool, Dict[str, str]]:
        """
        Check the rate limit for an identifier.

        Args:
            identifier: Unique identifier (IP, user_id, etc.)
            endpoint: Optional endpoint for endpoint-specific limits
            tokens_needed: Number of tokens required

        Returns:
            Tuple (allowed, headers) where headers carries the rate-limiting info
        """
        # Feature-flagged: when disabled via the safety switch, allow everything.
        if not self._safety.is_feature_enabled("rate_limiting"):
            return True, {}
        # Opportunistic periodic cleanup (no-op until the interval has elapsed).
        self._cleanup_inactive_buckets()
        config = self._get_config(endpoint)
        bucket_key = self._get_bucket_key(identifier, endpoint)
        bucket = self._get_or_create_bucket(bucket_key, config)
        # Try to consume the tokens.
        allowed = bucket.consume(tokens_needed)
        # Build the informational headers (remaining count is post-consumption).
        status = bucket.get_status()
        headers = {
            "X-RateLimit-Limit": str(config.requests_per_minute),
            "X-RateLimit-Remaining": str(int(status["current_tokens"])),
            "X-RateLimit-Reset": str(int(time.time() + status["time_to_refill"]))
        }
        if not allowed:
            # +1 second so clients retry after the bucket is actually refilled.
            headers["Retry-After"] = str(int(status["time_to_refill"]) + 1)
            logger.warning(f"Rate limit exceeded for {identifier} on {endpoint or 'default'}")
        return allowed, headers

    def enforce_rate_limit(self, identifier: str, endpoint: Optional[str] = None,
                           tokens_needed: int = 1) -> Dict[str, str]:
        """
        Enforce the rate limit and raise if it is exceeded.

        Args:
            identifier: Unique identifier
            endpoint: Optional endpoint
            tokens_needed: Number of tokens required

        Returns:
            Rate-limiting headers

        Raises:
            RateLimitExceeded: If the limit is exceeded
        """
        allowed, headers = self.check_rate_limit(identifier, endpoint, tokens_needed)
        if not allowed:
            retry_after = float(headers.get("Retry-After", "60"))
            raise RateLimitExceeded(
                f"Rate limit exceeded for {identifier}. Try again in {retry_after} seconds.",
                retry_after=retry_after
            )
        return headers

    def _cleanup_inactive_buckets(self):
        """Drop inactive buckets to save memory (rate-limited by _cleanup_interval)."""
        now = time.time()
        # Unlocked fast path: worst case two threads clean up concurrently.
        if now - self._last_cleanup < self._cleanup_interval:
            return
        with self._lock:
            inactive_keys = []
            cutoff_time = now - 3600  # one hour of inactivity
            for key, bucket in self._buckets.items():
                if bucket.last_refill < cutoff_time:
                    inactive_keys.append(key)
            for key in inactive_keys:
                del self._buckets[key]
            if inactive_keys:
                logger.debug(f"Cleaned up {len(inactive_keys)} inactive rate limit buckets")
        self._last_cleanup = now

    def reset_rate_limit(self, identifier: str, endpoint: Optional[str] = None) -> bool:
        """
        Reset the rate limit for an identifier.

        Args:
            identifier: Identifier to reset
            endpoint: Optional endpoint

        Returns:
            True if the reset succeeded
        """
        bucket_key = self._get_bucket_key(identifier, endpoint)
        with self._lock:
            if bucket_key in self._buckets:
                # Deleting the bucket resets it: the next request recreates it full.
                del self._buckets[bucket_key]
                logger.info(f"Reset rate limit for {identifier} on {endpoint or 'default'}")
                return True
            return False

    def get_rate_limit_status(self, identifier: str, endpoint: Optional[str] = None) -> Dict:
        """
        Return the rate-limiting status for an identifier.

        Args:
            identifier: Identifier to check
            endpoint: Optional endpoint

        Returns:
            Rate-limiting status
        """
        bucket_key = self._get_bucket_key(identifier, endpoint)
        config = self._get_config(endpoint)
        with self._lock:
            if bucket_key in self._buckets:
                bucket_status = self._buckets[bucket_key].get_status()
                return {
                    "identifier": identifier,
                    "endpoint": endpoint,
                    "config": {
                        "requests_per_minute": config.requests_per_minute,
                        "burst_capacity": config.burst_capacity
                    },
                    "current_status": bucket_status
                }
            else:
                # No bucket yet: report a virtual, full bucket for this config.
                return {
                    "identifier": identifier,
                    "endpoint": endpoint,
                    "config": {
                        "requests_per_minute": config.requests_per_minute,
                        "burst_capacity": config.burst_capacity
                    },
                    "current_status": {
                        "capacity": config.burst_capacity,
                        "current_tokens": config.burst_capacity,
                        "refill_rate": config.requests_per_minute / 60.0,
                        "time_to_refill": 0
                    }
                }

    def get_global_status(self) -> Dict:
        """Return the global status of the rate limiter."""
        with self._lock:
            return {
                "enabled": self._safety.is_feature_enabled("rate_limiting"),
                "active_buckets": len(self._buckets),
                "default_config": {
                    "requests_per_minute": self._default_config.requests_per_minute,
                    "burst_capacity": self._default_config.burst_capacity
                },
                "endpoint_configs": {
                    endpoint: {
                        "requests_per_minute": config.requests_per_minute,
                        "burst_capacity": config.burst_capacity
                    }
                    for endpoint, config in self._configs.items()
                }
            }
# Global singleton instance (created lazily by get_rate_limiter)
_rate_limiter = None
_rate_limiter_lock = threading.Lock()


def get_rate_limiter() -> RateLimiter:
    """Return the global rate limiter instance.

    Thread-safe lazy initialization: double-checked locking ensures two
    concurrent first callers cannot each construct their own RateLimiter
    (the unlocked fast path keeps the common case allocation-free).

    Returns:
        The process-wide RateLimiter singleton.
    """
    global _rate_limiter
    if _rate_limiter is None:
        with _rate_limiter_lock:
            if _rate_limiter is None:
                _rate_limiter = RateLimiter()
    return _rate_limiter
def check_rate_limit(identifier: str, endpoint: Optional[str] = None,
                     tokens_needed: int = 1) -> Tuple[bool, Dict[str, str]]:
    """Convenience wrapper to check a rate limit on the global limiter.

    Args:
        identifier: Unique identifier (IP, user_id, etc.)
        endpoint: Optional endpoint for endpoint-specific limits
        tokens_needed: Number of tokens required (default 1; previously the
            wrapper could not forward this to the underlying limiter)

    Returns:
        Tuple (allowed, headers)
    """
    return get_rate_limiter().check_rate_limit(identifier, endpoint, tokens_needed)