- Frontend v4 accessible sur réseau local (192.168.1.40) - Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard) - Ollama GPU fonctionnel - Self-healing interactif - Dashboard confiance Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
519 lines
16 KiB
Python
519 lines
16 KiB
Python
"""
|
|
API Token Authentication System
|
|
|
|
Système d'authentification par tokens avec support des rôles.
|
|
Fiche #23: API Security & Governance
|
|
"""
|
|
|
|
import os
|
|
import hmac
|
|
import hashlib
|
|
import secrets
|
|
import logging
|
|
from enum import Enum
|
|
from typing import Dict, Optional, Tuple
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta
|
|
|
|
from ..system.safety_switch import get_safety_switch
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TokenRole(Enum):
|
|
"""Rôles disponibles pour les tokens API."""
|
|
ADMIN = "admin"
|
|
READ_ONLY = "read_only"
|
|
ANON = "anonymous" # Ajout pour les requêtes non authentifiées
|
|
|
|
|
|
@dataclass
|
|
class TokenInfo:
|
|
"""Informations sur un token validé."""
|
|
role: TokenRole
|
|
user_id: Optional[str] = None
|
|
expires_at: Optional[datetime] = None
|
|
metadata: Optional[Dict] = None
|
|
|
|
def __post_init__(self):
|
|
if self.metadata is None:
|
|
self.metadata = {}
|
|
|
|
|
|
class TokenValidationError(Exception):
|
|
"""Erreur de validation de token."""
|
|
pass
|
|
|
|
|
|
class TokenManager:
|
|
"""
|
|
Gestionnaire des tokens API avec support RBAC.
|
|
|
|
Supporte:
|
|
- Tokens admin et read-only
|
|
- Rétrocompatibilité avec X-Admin-Token
|
|
- Validation sécurisée avec HMAC
|
|
- Expiration des tokens
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._load_config()
|
|
self._safety = get_safety_switch()
|
|
|
|
def _load_config(self):
|
|
"""Charge la configuration des tokens."""
|
|
# Debug: log environment variables
|
|
admin_token = os.getenv('RPA_TOKEN_ADMIN')
|
|
readonly_token = os.getenv('RPA_TOKEN_READONLY')
|
|
logger.info(f"Loading token config. RPA_TOKEN_ADMIN present: {bool(admin_token)}")
|
|
logger.info(f"Loading token config. RPA_TOKEN_READONLY present: {bool(readonly_token)}")
|
|
if admin_token:
|
|
logger.info(f"RPA_TOKEN_ADMIN value: {admin_token[:8]}...")
|
|
if readonly_token:
|
|
logger.info(f"RPA_TOKEN_READONLY value: {readonly_token[:8]}...")
|
|
|
|
# Clé secrète pour signer les tokens
|
|
self.secret_key = os.getenv("TOKEN_SECRET_KEY", "dev-token-secret-change-in-production")
|
|
|
|
# Tokens statiques pour rétrocompatibilité
|
|
self.admin_tokens = set()
|
|
if os.getenv("ADMIN_TOKENS"):
|
|
self.admin_tokens = set(os.getenv("ADMIN_TOKENS").split(","))
|
|
|
|
# Support rétrocompatibilité X-Admin-Token de fiche #22
|
|
if os.getenv("X_ADMIN_TOKEN"):
|
|
self.admin_tokens.add(os.getenv("X_ADMIN_TOKEN"))
|
|
|
|
# Support tokens RPA Vision V3 (Fiche #23)
|
|
if admin_token:
|
|
self.admin_tokens.add(admin_token)
|
|
logger.info(f"Added RPA_TOKEN_ADMIN to admin_tokens")
|
|
|
|
# Temporary fix: Add production tokens directly
|
|
prod_admin_token = "73cf0db73f9a5064e79afebba96c85338be65cc2060b9c1d42c3ea5dd7d4e490"
|
|
prod_readonly_token = "7eea1de415cc69c02381ce09ff63aeebf3e1d9b476d54aa6730ba9de849e3dc6"
|
|
self.admin_tokens.add(prod_admin_token)
|
|
logger.info(f"Added hardcoded production admin token")
|
|
|
|
self.read_only_tokens = set()
|
|
if os.getenv("READ_ONLY_TOKENS"):
|
|
self.read_only_tokens = set(os.getenv("READ_ONLY_TOKENS").split(","))
|
|
|
|
# Support tokens RPA Vision V3 (Fiche #23)
|
|
if readonly_token:
|
|
self.read_only_tokens.add(readonly_token)
|
|
logger.info(f"Added RPA_TOKEN_READONLY to read_only_tokens")
|
|
|
|
# Temporary fix: Add production tokens directly
|
|
self.read_only_tokens.add(prod_readonly_token)
|
|
logger.info(f"Added hardcoded production readonly token")
|
|
|
|
# Configuration expiration
|
|
self.default_expiry_hours = int(os.getenv("TOKEN_EXPIRY_HOURS", "24"))
|
|
|
|
logger.info(f"TokenManager initialized with {len(self.admin_tokens)} admin tokens, "
|
|
f"{len(self.read_only_tokens)} read-only tokens")
|
|
|
|
def generate_token(self, role: TokenRole, user_id: Optional[str] = None,
|
|
expires_in_hours: Optional[int] = None) -> str:
|
|
"""
|
|
Génère un nouveau token API signé.
|
|
|
|
Args:
|
|
role: Rôle du token
|
|
user_id: ID utilisateur optionnel
|
|
expires_in_hours: Expiration en heures (défaut: 24h)
|
|
|
|
Returns:
|
|
Token signé
|
|
"""
|
|
if not self._safety.is_feature_enabled("api_tokens"):
|
|
raise TokenValidationError("Token generation is disabled by safety configuration")
|
|
|
|
expires_in = expires_in_hours or self.default_expiry_hours
|
|
expires_at = datetime.utcnow() + timedelta(hours=expires_in)
|
|
|
|
# Payload du token
|
|
payload = {
|
|
"role": role.value,
|
|
"user_id": user_id,
|
|
"expires_at": int(expires_at.timestamp()),
|
|
"nonce": secrets.token_hex(8)
|
|
}
|
|
|
|
# Créer la signature HMAC
|
|
payload_str = "|".join([
|
|
payload["role"],
|
|
payload["user_id"] or "",
|
|
str(payload["expires_at"]),
|
|
payload["nonce"]
|
|
])
|
|
|
|
signature = hmac.new(
|
|
self.secret_key.encode(),
|
|
payload_str.encode(),
|
|
hashlib.sha256
|
|
).hexdigest()
|
|
|
|
# Format: role|user_id|expires_at|nonce|signature
|
|
token = f"{payload['role']}|{payload['user_id'] or ''}|{payload['expires_at']}|{payload['nonce']}|{signature}"
|
|
|
|
logger.info(f"Generated {role.value} token for user {user_id or 'anonymous'}")
|
|
return token
|
|
|
|
def validate_token(self, token: str) -> TokenInfo:
|
|
"""
|
|
Valide un token API.
|
|
|
|
Args:
|
|
token: Token à valider
|
|
|
|
Returns:
|
|
Informations du token validé
|
|
|
|
Raises:
|
|
TokenValidationError: Si le token est invalide
|
|
"""
|
|
if not token:
|
|
raise TokenValidationError("Token is required")
|
|
|
|
# Vérifier si les tokens sont désactivés
|
|
if not self._safety.is_feature_enabled("api_tokens"):
|
|
raise TokenValidationError("Token authentication is disabled")
|
|
|
|
# Vérifier les tokens statiques d'abord (rétrocompatibilité)
|
|
if token in self.admin_tokens:
|
|
return TokenInfo(role=TokenRole.ADMIN, metadata={"type": "static"})
|
|
|
|
if token in self.read_only_tokens:
|
|
return TokenInfo(role=TokenRole.READ_ONLY, metadata={"type": "static"})
|
|
|
|
# Valider les tokens signés
|
|
return self._validate_signed_token(token)
|
|
|
|
def _validate_signed_token(self, token: str) -> TokenInfo:
|
|
"""Valide un token signé."""
|
|
try:
|
|
parts = token.split("|")
|
|
if len(parts) != 5:
|
|
raise TokenValidationError("Invalid token format")
|
|
|
|
role_str, user_id, expires_at_str, nonce, signature = parts
|
|
|
|
# Vérifier le rôle
|
|
try:
|
|
role = TokenRole(role_str)
|
|
except ValueError:
|
|
raise TokenValidationError("Invalid token role")
|
|
|
|
# Vérifier l'expiration
|
|
expires_at = datetime.fromtimestamp(int(expires_at_str))
|
|
if datetime.utcnow() > expires_at:
|
|
raise TokenValidationError("Token has expired")
|
|
|
|
# Vérifier la signature
|
|
payload_str = "|".join([role_str, user_id, expires_at_str, nonce])
|
|
expected_signature = hmac.new(
|
|
self.secret_key.encode(),
|
|
payload_str.encode(),
|
|
hashlib.sha256
|
|
).hexdigest()
|
|
|
|
if not hmac.compare_digest(signature, expected_signature):
|
|
raise TokenValidationError("Invalid token signature")
|
|
|
|
return TokenInfo(
|
|
role=role,
|
|
user_id=user_id if user_id else None,
|
|
expires_at=expires_at,
|
|
metadata={"type": "signed"}
|
|
)
|
|
|
|
except (ValueError, IndexError) as e:
|
|
raise TokenValidationError(f"Token parsing error: {e}")
|
|
|
|
def revoke_static_token(self, token: str) -> bool:
|
|
"""
|
|
Révoque un token statique.
|
|
|
|
Args:
|
|
token: Token à révoquer
|
|
|
|
Returns:
|
|
True si révoqué avec succès
|
|
"""
|
|
revoked = False
|
|
if token in self.admin_tokens:
|
|
self.admin_tokens.remove(token)
|
|
revoked = True
|
|
|
|
if token in self.read_only_tokens:
|
|
self.read_only_tokens.remove(token)
|
|
revoked = True
|
|
|
|
if revoked:
|
|
logger.warning(f"Static token revoked: {token[:8]}...")
|
|
|
|
return revoked
|
|
|
|
def get_token_info_safe(self, token: str) -> Dict:
|
|
"""
|
|
Retourne les informations d'un token de manière sécurisée.
|
|
|
|
Args:
|
|
token: Token à analyser
|
|
|
|
Returns:
|
|
Informations non-sensibles du token
|
|
"""
|
|
try:
|
|
info = self.validate_token(token)
|
|
return {
|
|
"valid": True,
|
|
"role": info.role.value,
|
|
"user_id": info.user_id,
|
|
"expires_at": info.expires_at.isoformat() if info.expires_at else None,
|
|
"type": info.metadata.get("type", "unknown")
|
|
}
|
|
except TokenValidationError as e:
|
|
return {
|
|
"valid": False,
|
|
"error": str(e)
|
|
}
|
|
|
|
|
|
# Instance globale
|
|
_token_manager = None
|
|
|
|
|
|
def get_token_manager() -> TokenManager:
|
|
"""Retourne l'instance globale du gestionnaire de tokens."""
|
|
global _token_manager
|
|
# Force recreation to pick up new environment variables
|
|
_token_manager = None
|
|
if _token_manager is None:
|
|
_token_manager = TokenManager()
|
|
return _token_manager
|
|
|
|
|
|
def validate_token(token: str) -> TokenInfo:
|
|
"""
|
|
Fonction utilitaire pour valider un token.
|
|
|
|
Args:
|
|
token: Token à valider
|
|
|
|
Returns:
|
|
Informations du token
|
|
"""
|
|
return get_token_manager().validate_token(token)
|
|
|
|
|
|
def generate_api_token(role: TokenRole, user_id: Optional[str] = None) -> str:
|
|
"""
|
|
Fonction utilitaire pour générer un token.
|
|
|
|
Args:
|
|
role: Rôle du token
|
|
user_id: ID utilisateur optionnel
|
|
|
|
Returns:
|
|
Token généré
|
|
"""
|
|
return get_token_manager().generate_token(role, user_id)
|
|
|
|
|
|
def extract_token_from_request(headers: Dict[str, str]) -> Optional[str]:
|
|
"""
|
|
Extrait le token d'une requête HTTP.
|
|
|
|
Supporte:
|
|
- Authorization: Bearer <token>
|
|
- X-API-Token: <token>
|
|
- X-Admin-Token: <token> (rétrocompatibilité)
|
|
|
|
Args:
|
|
headers: Headers HTTP
|
|
|
|
Returns:
|
|
Token extrait ou None
|
|
"""
|
|
# Authorization Bearer
|
|
auth_header = headers.get("Authorization", "")
|
|
if auth_header.startswith("Bearer "):
|
|
return auth_header[7:]
|
|
|
|
# X-API-Token
|
|
if "X-API-Token" in headers:
|
|
return headers["X-API-Token"]
|
|
|
|
# X-Admin-Token (rétrocompatibilité fiche #22)
|
|
if "X-Admin-Token" in headers:
|
|
return headers["X-Admin-Token"]
|
|
|
|
return None
|
|
|
|
|
|
@dataclass
|
|
class RequestContext:
|
|
"""Context d'une requête avec informations d'authentification."""
|
|
role: Optional[TokenRole] = None
|
|
user_id: Optional[str] = None
|
|
token_valid: bool = False
|
|
error: Optional[str] = None
|
|
|
|
|
|
def classify_request(
|
|
method: str,
|
|
path: str,
|
|
auth_header: Optional[str] = None,
|
|
x_admin_token: Optional[str] = None,
|
|
cookie_token: Optional[str] = None,
|
|
) -> Tuple[RequestContext, Optional[str]]:
|
|
"""
|
|
Classifie une requête et extrait les informations d'authentification.
|
|
|
|
Args:
|
|
method: Méthode HTTP
|
|
path: Chemin de la requête
|
|
auth_header: Header Authorization
|
|
x_admin_token: Header X-Admin-Token
|
|
cookie_token: Token depuis cookie
|
|
|
|
Returns:
|
|
Tuple (context, token_used)
|
|
"""
|
|
token_manager = get_token_manager()
|
|
|
|
# Extraire le token depuis les différentes sources
|
|
token = None
|
|
token_source = None
|
|
|
|
if auth_header and auth_header.startswith("Bearer "):
|
|
token = auth_header[7:]
|
|
token_source = "bearer"
|
|
elif x_admin_token:
|
|
token = x_admin_token
|
|
token_source = "x-admin-token"
|
|
elif cookie_token:
|
|
token = cookie_token
|
|
token_source = "cookie"
|
|
|
|
if not token:
|
|
return RequestContext(), None
|
|
|
|
try:
|
|
token_info = token_manager.validate_token(token)
|
|
return RequestContext(
|
|
role=token_info.role,
|
|
user_id=token_info.user_id,
|
|
token_valid=True
|
|
), token
|
|
except TokenValidationError as e:
|
|
return RequestContext(
|
|
token_valid=False,
|
|
error=str(e)
|
|
), token
|
|
|
|
|
|
def auth_required() -> bool:
|
|
"""
|
|
Vérifie si l'authentification est requise globalement.
|
|
|
|
En mode développement, l'auth peut être désactivée.
|
|
|
|
Returns:
|
|
True si l'auth est requise
|
|
"""
|
|
import os
|
|
|
|
# En production, l'auth est toujours requise
|
|
env = os.getenv("ENVIRONMENT", "development").lower()
|
|
if env == "production":
|
|
return True
|
|
|
|
# En dev, on peut désactiver l'auth avec RPA_AUTH_DISABLED=1
|
|
auth_disabled = os.getenv("RPA_AUTH_DISABLED", "").lower() in {"1", "true", "yes"}
|
|
return not auth_disabled
|
|
|
|
|
|
def can_read(role: TokenRole) -> bool:
|
|
"""Vérifie si le rôle peut lire."""
|
|
return role in {TokenRole.ADMIN, TokenRole.READ_ONLY}
|
|
|
|
|
|
def can_write(role: TokenRole) -> bool:
|
|
"""Vérifie si le rôle peut écrire."""
|
|
return role == TokenRole.ADMIN
|
|
|
|
|
|
@dataclass
|
|
class RequestContext:
|
|
"""Contexte d'une requête authentifiée."""
|
|
role: TokenRole = TokenRole.ANON
|
|
token_present: bool = False
|
|
token_valid: bool = False
|
|
token_hash: Optional[str] = None
|
|
user_id: Optional[str] = None
|
|
error: Optional[str] = None
|
|
|
|
|
|
def classify_request_simple(
|
|
headers: Dict[str, str],
|
|
cookies: Dict[str, str],
|
|
query_params: Dict[str, str],
|
|
) -> Tuple[RequestContext, str]:
|
|
"""
|
|
Version simplifiée de classify_request pour les middlewares.
|
|
|
|
Returns:
|
|
(RequestContext, source)
|
|
"""
|
|
# Extraire le token de différentes sources
|
|
token = None
|
|
source = "none"
|
|
|
|
# 1. Authorization header
|
|
auth_header = headers.get("authorization", "")
|
|
if auth_header.startswith("Bearer "):
|
|
token = auth_header[7:]
|
|
source = "bearer"
|
|
|
|
# 2. X-Admin-Token header
|
|
if not token:
|
|
token = headers.get("x-admin-token")
|
|
if token:
|
|
source = "header"
|
|
|
|
# 3. Cookie
|
|
if not token:
|
|
token = cookies.get("rpa_token")
|
|
if token:
|
|
source = "cookie"
|
|
|
|
# 4. Query parameter
|
|
if not token:
|
|
token = query_params.get("token")
|
|
if token:
|
|
source = "query"
|
|
|
|
if not token:
|
|
return RequestContext(token_present=False), source
|
|
|
|
# Valider le token
|
|
try:
|
|
token_info = validate_token(token)
|
|
token_hash = hashlib.sha256(token.encode()).hexdigest()[:16]
|
|
return RequestContext(
|
|
role=token_info.role,
|
|
token_present=True,
|
|
token_valid=True,
|
|
token_hash=token_hash,
|
|
user_id=token_info.user_id,
|
|
), source
|
|
except TokenValidationError as e:
|
|
return RequestContext(
|
|
token_present=True,
|
|
token_valid=False,
|
|
error=str(e),
|
|
), source |