""" API Token Authentication System Système d'authentification par tokens avec support des rôles. Fiche #23: API Security & Governance """ import os import hmac import hashlib import secrets import logging from enum import Enum from typing import Dict, Optional, Tuple from dataclasses import dataclass from datetime import datetime, timedelta from ..system.safety_switch import get_safety_switch logger = logging.getLogger(__name__) class TokenRole(Enum): """Rôles disponibles pour les tokens API.""" ADMIN = "admin" READ_ONLY = "read_only" ANON = "anonymous" # Ajout pour les requêtes non authentifiées @dataclass class TokenInfo: """Informations sur un token validé.""" role: TokenRole user_id: Optional[str] = None expires_at: Optional[datetime] = None metadata: Optional[Dict] = None def __post_init__(self): if self.metadata is None: self.metadata = {} class TokenValidationError(Exception): """Erreur de validation de token.""" pass class TokenManager: """ Gestionnaire des tokens API avec support RBAC. Supporte: - Tokens admin et read-only - Rétrocompatibilité avec X-Admin-Token - Validation sécurisée avec HMAC - Expiration des tokens """ def __init__(self): self._load_config() self._safety = get_safety_switch() def _load_config(self): """Charge la configuration des tokens.""" # Debug: log environment variables admin_token = os.getenv('RPA_TOKEN_ADMIN') readonly_token = os.getenv('RPA_TOKEN_READONLY') logger.info(f"Loading token config. RPA_TOKEN_ADMIN present: {bool(admin_token)}") logger.info(f"Loading token config. RPA_TOKEN_READONLY present: {bool(readonly_token)}") if admin_token: logger.info("RPA_TOKEN_ADMIN configuré") if readonly_token: logger.info("RPA_TOKEN_READONLY configuré") # Clé secrète pour signer les tokens — OBLIGATOIRE en production self.secret_key = os.getenv("TOKEN_SECRET_KEY", "") if not self.secret_key: logger.warning( "TOKEN_SECRET_KEY non défini — utilisation d'une clé aléatoire. " "Définir TOKEN_SECRET_KEY dans .env.local pour la production." ) import secrets self.secret_key = secrets.token_hex(32) # Tokens statiques pour rétrocompatibilité self.admin_tokens = set() if os.getenv("ADMIN_TOKENS"): self.admin_tokens = set(os.getenv("ADMIN_TOKENS").split(",")) # Support rétrocompatibilité X-Admin-Token de fiche #22 if os.getenv("X_ADMIN_TOKEN"): self.admin_tokens.add(os.getenv("X_ADMIN_TOKEN")) # Support tokens RPA Vision V3 (Fiche #23) if admin_token: self.admin_tokens.add(admin_token) logger.info(f"Added RPA_TOKEN_ADMIN to admin_tokens") # Tokens de production : lus EXCLUSIVEMENT depuis les variables d'environnement. # Ne JAMAIS hardcoder de tokens dans le code source. prod_admin_token = os.getenv("RPA_PROD_ADMIN_TOKEN", "") prod_readonly_token = os.getenv("RPA_PROD_READONLY_TOKEN", "") if prod_admin_token: self.admin_tokens.add(prod_admin_token) logger.info("Added RPA_PROD_ADMIN_TOKEN to admin_tokens") self.read_only_tokens = set() if os.getenv("READ_ONLY_TOKENS"): self.read_only_tokens = set(os.getenv("READ_ONLY_TOKENS").split(",")) # Support tokens RPA Vision V3 (Fiche #23) if readonly_token: self.read_only_tokens.add(readonly_token) logger.info("Added RPA_TOKEN_READONLY to read_only_tokens") if prod_readonly_token: self.read_only_tokens.add(prod_readonly_token) logger.info("Added RPA_PROD_READONLY_TOKEN to read_only_tokens") # Configuration expiration self.default_expiry_hours = int(os.getenv("TOKEN_EXPIRY_HOURS", "24")) logger.info(f"TokenManager initialized with {len(self.admin_tokens)} admin tokens, " f"{len(self.read_only_tokens)} read-only tokens") def generate_token(self, role: TokenRole, user_id: Optional[str] = None, expires_in_hours: Optional[int] = None) -> str: """ Génère un nouveau token API signé. Args: role: Rôle du token user_id: ID utilisateur optionnel expires_in_hours: Expiration en heures (défaut: 24h) Returns: Token signé """ if not self._safety.is_feature_enabled("api_tokens"): raise TokenValidationError("Token generation is disabled by safety configuration") expires_in = expires_in_hours or self.default_expiry_hours expires_at = datetime.utcnow() + timedelta(hours=expires_in) # Payload du token payload = { "role": role.value, "user_id": user_id, "expires_at": int(expires_at.timestamp()), "nonce": secrets.token_hex(8) } # Créer la signature HMAC payload_str = "|".join([ payload["role"], payload["user_id"] or "", str(payload["expires_at"]), payload["nonce"] ]) signature = hmac.new( self.secret_key.encode(), payload_str.encode(), hashlib.sha256 ).hexdigest() # Format: role|user_id|expires_at|nonce|signature token = f"{payload['role']}|{payload['user_id'] or ''}|{payload['expires_at']}|{payload['nonce']}|{signature}" logger.info(f"Generated {role.value} token for user {user_id or 'anonymous'}") return token def validate_token(self, token: str) -> TokenInfo: """ Valide un token API. Args: token: Token à valider Returns: Informations du token validé Raises: TokenValidationError: Si le token est invalide """ if not token: raise TokenValidationError("Token is required") # Vérifier si les tokens sont désactivés if not self._safety.is_feature_enabled("api_tokens"): raise TokenValidationError("Token authentication is disabled") # Vérifier les tokens statiques d'abord (rétrocompatibilité) if token in self.admin_tokens: return TokenInfo(role=TokenRole.ADMIN, metadata={"type": "static"}) if token in self.read_only_tokens: return TokenInfo(role=TokenRole.READ_ONLY, metadata={"type": "static"}) # Valider les tokens signés return self._validate_signed_token(token) def _validate_signed_token(self, token: str) -> TokenInfo: """Valide un token signé.""" try: parts = token.split("|") if len(parts) != 5: raise TokenValidationError("Invalid token format") role_str, user_id, expires_at_str, nonce, signature = parts # Vérifier le rôle try: role = TokenRole(role_str) except ValueError: raise TokenValidationError("Invalid token role") # Vérifier l'expiration expires_at = datetime.fromtimestamp(int(expires_at_str)) if datetime.utcnow() > expires_at: raise TokenValidationError("Token has expired") # Vérifier la signature payload_str = "|".join([role_str, user_id, expires_at_str, nonce]) expected_signature = hmac.new( self.secret_key.encode(), payload_str.encode(), hashlib.sha256 ).hexdigest() if not hmac.compare_digest(signature, expected_signature): raise TokenValidationError("Invalid token signature") return TokenInfo( role=role, user_id=user_id if user_id else None, expires_at=expires_at, metadata={"type": "signed"} ) except (ValueError, IndexError) as e: raise TokenValidationError(f"Token parsing error: {e}") def revoke_static_token(self, token: str) -> bool: """ Révoque un token statique. Args: token: Token à révoquer Returns: True si révoqué avec succès """ revoked = False if token in self.admin_tokens: self.admin_tokens.remove(token) revoked = True if token in self.read_only_tokens: self.read_only_tokens.remove(token) revoked = True if revoked: logger.warning(f"Static token revoked: {token[:8]}...") return revoked def get_token_info_safe(self, token: str) -> Dict: """ Retourne les informations d'un token de manière sécurisée. Args: token: Token à analyser Returns: Informations non-sensibles du token """ try: info = self.validate_token(token) return { "valid": True, "role": info.role.value, "user_id": info.user_id, "expires_at": info.expires_at.isoformat() if info.expires_at else None, "type": info.metadata.get("type", "unknown") } except TokenValidationError as e: return { "valid": False, "error": str(e) } # Instance globale _token_manager = None def get_token_manager() -> TokenManager: """Retourne l'instance globale du gestionnaire de tokens.""" global _token_manager # Force recreation to pick up new environment variables _token_manager = None if _token_manager is None: _token_manager = TokenManager() return _token_manager def validate_token(token: str) -> TokenInfo: """ Fonction utilitaire pour valider un token. Args: token: Token à valider Returns: Informations du token """ return get_token_manager().validate_token(token) def generate_api_token(role: TokenRole, user_id: Optional[str] = None) -> str: """ Fonction utilitaire pour générer un token. Args: role: Rôle du token user_id: ID utilisateur optionnel Returns: Token généré """ return get_token_manager().generate_token(role, user_id) def extract_token_from_request(headers: Dict[str, str]) -> Optional[str]: """ Extrait le token d'une requête HTTP. Supporte: - Authorization: Bearer - X-API-Token: - X-Admin-Token: (rétrocompatibilité) Args: headers: Headers HTTP Returns: Token extrait ou None """ # Authorization Bearer auth_header = headers.get("Authorization", "") if auth_header.startswith("Bearer "): return auth_header[7:] # X-API-Token if "X-API-Token" in headers: return headers["X-API-Token"] # X-Admin-Token (rétrocompatibilité fiche #22) if "X-Admin-Token" in headers: return headers["X-Admin-Token"] return None @dataclass class RequestContext: """Context d'une requête avec informations d'authentification.""" role: Optional[TokenRole] = None user_id: Optional[str] = None token_valid: bool = False error: Optional[str] = None def classify_request( method: str, path: str, auth_header: Optional[str] = None, x_admin_token: Optional[str] = None, cookie_token: Optional[str] = None, ) -> Tuple[RequestContext, Optional[str]]: """ Classifie une requête et extrait les informations d'authentification. Args: method: Méthode HTTP path: Chemin de la requête auth_header: Header Authorization x_admin_token: Header X-Admin-Token cookie_token: Token depuis cookie Returns: Tuple (context, token_used) """ token_manager = get_token_manager() # Extraire le token depuis les différentes sources token = None token_source = None if auth_header and auth_header.startswith("Bearer "): token = auth_header[7:] token_source = "bearer" elif x_admin_token: token = x_admin_token token_source = "x-admin-token" elif cookie_token: token = cookie_token token_source = "cookie" if not token: return RequestContext(), None try: token_info = token_manager.validate_token(token) return RequestContext( role=token_info.role, user_id=token_info.user_id, token_valid=True ), token except TokenValidationError as e: return RequestContext( token_valid=False, error=str(e) ), token def auth_required() -> bool: """ Vérifie si l'authentification est requise globalement. En mode développement, l'auth peut être désactivée. Returns: True si l'auth est requise """ import os # En production, l'auth est toujours requise env = os.getenv("ENVIRONMENT", "development").lower() if env == "production": return True # En dev, on peut désactiver l'auth avec RPA_AUTH_DISABLED=1 auth_disabled = os.getenv("RPA_AUTH_DISABLED", "").lower() in {"1", "true", "yes"} return not auth_disabled def can_read(role: TokenRole) -> bool: """Vérifie si le rôle peut lire.""" return role in {TokenRole.ADMIN, TokenRole.READ_ONLY} def can_write(role: TokenRole) -> bool: """Vérifie si le rôle peut écrire.""" return role == TokenRole.ADMIN @dataclass class RequestContext: """Contexte d'une requête authentifiée.""" role: TokenRole = TokenRole.ANON token_present: bool = False token_valid: bool = False token_hash: Optional[str] = None user_id: Optional[str] = None error: Optional[str] = None def classify_request_simple( headers: Dict[str, str], cookies: Dict[str, str], query_params: Dict[str, str], ) -> Tuple[RequestContext, str]: """ Version simplifiée de classify_request pour les middlewares. Returns: (RequestContext, source) """ # Extraire le token de différentes sources token = None source = "none" # 1. Authorization header auth_header = headers.get("authorization", "") if auth_header.startswith("Bearer "): token = auth_header[7:] source = "bearer" # 2. X-Admin-Token header if not token: token = headers.get("x-admin-token") if token: source = "header" # 3. Cookie if not token: token = cookies.get("rpa_token") if token: source = "cookie" # 4. Query parameter if not token: token = query_params.get("token") if token: source = "query" if not token: return RequestContext(token_present=False), source # Valider le token try: token_info = validate_token(token) token_hash = hashlib.sha256(token.encode()).hexdigest()[:16] return RequestContext( role=token_info.role, token_present=True, token_valid=True, token_hash=token_hash, user_id=token_info.user_id, ), source except TokenValidationError as e: return RequestContext( token_present=True, token_valid=False, error=str(e), ), source