- Frontend v4 accessible sur réseau local (192.168.1.40) - Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard) - Ollama GPU fonctionnel - Self-healing interactif - Dashboard confiance Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
408 lines
15 KiB
Python
408 lines
15 KiB
Python
"""
|
|
Audit Logging System
|
|
|
|
Système de logging d'audit en format JSONL pour traçabilité sécurisée.
|
|
Fiche #23: API Security & Governance
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import logging
|
|
import hashlib
|
|
from datetime import datetime
|
|
from typing import Dict, Any, Optional
|
|
from dataclasses import dataclass, asdict
|
|
from pathlib import Path
|
|
from enum import Enum
|
|
|
|
from ..system.safety_switch import get_safety_switch
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class AuditEventType(Enum):
|
|
"""Types d'événements d'audit."""
|
|
AUTHENTICATION = "authentication"
|
|
AUTHORIZATION = "authorization"
|
|
API_ACCESS = "api_access"
|
|
SECURITY_VIOLATION = "security_violation"
|
|
RATE_LIMIT_EXCEEDED = "rate_limit_exceeded"
|
|
IP_BLOCKED = "ip_blocked"
|
|
TOKEN_VALIDATION = "token_validation"
|
|
SYSTEM_EVENT = "system_event"
|
|
ERROR = "error"
|
|
|
|
|
|
@dataclass
|
|
class AuditEvent:
|
|
"""Événement d'audit structuré."""
|
|
event_type: AuditEventType
|
|
timestamp: str
|
|
message: str
|
|
user_id: Optional[str] = None
|
|
ip_address: Optional[str] = None
|
|
endpoint: Optional[str] = None
|
|
method: Optional[str] = None
|
|
user_agent: Optional[str] = None
|
|
token_hash: Optional[str] = None
|
|
success: bool = True
|
|
error_code: Optional[str] = None
|
|
metadata: Dict[str, Any] = None
|
|
|
|
def __post_init__(self):
|
|
if self.metadata is None:
|
|
self.metadata = {}
|
|
|
|
|
|
class AuditLogger:
|
|
"""
|
|
Logger d'audit avec format JSONL et rotation automatique.
|
|
|
|
Fonctionnalités:
|
|
- Format JSONL pour parsing facile
|
|
- Rotation automatique des logs
|
|
- Hachage des données sensibles
|
|
- Métadonnées contextuelles
|
|
"""
|
|
|
|
def __init__(self):
|
|
self._safety = get_safety_switch()
|
|
self._load_config()
|
|
self._setup_logging()
|
|
|
|
@classmethod
|
|
def from_env(cls):
|
|
"""Crée une instance depuis les variables d'environnement (compatibilité FastAPI)."""
|
|
return cls()
|
|
|
|
def _load_config(self):
|
|
"""Charge la configuration depuis les variables d'environnement."""
|
|
self.log_dir = Path(os.getenv("AUDIT_LOG_DIR", "logs/audit"))
|
|
self.log_file = self.log_dir / "audit.jsonl"
|
|
self.max_file_size = int(os.getenv("AUDIT_LOG_MAX_SIZE", "10485760")) # 10MB
|
|
self.max_files = int(os.getenv("AUDIT_LOG_MAX_FILES", "10"))
|
|
self.hash_sensitive_data = os.getenv("AUDIT_HASH_SENSITIVE", "true").lower() == "true"
|
|
|
|
# Créer le répertoire de logs
|
|
self.log_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
logger.info(f"AuditLogger initialized: {self.log_file}")
|
|
|
|
def _setup_logging(self):
|
|
"""Configure le logger Python pour l'audit."""
|
|
self.audit_logger = logging.getLogger("audit")
|
|
self.audit_logger.setLevel(logging.INFO)
|
|
|
|
# Éviter la duplication si déjà configuré
|
|
if not self.audit_logger.handlers:
|
|
handler = logging.FileHandler(self.log_file, encoding='utf-8')
|
|
handler.setLevel(logging.INFO)
|
|
|
|
# Format simple car on écrit du JSON
|
|
formatter = logging.Formatter('%(message)s')
|
|
handler.setFormatter(formatter)
|
|
|
|
self.audit_logger.addHandler(handler)
|
|
self.audit_logger.propagate = False
|
|
|
|
def _hash_sensitive_value(self, value: str) -> str:
|
|
"""Hash une valeur sensible pour le logging."""
|
|
if not self.hash_sensitive_data:
|
|
return value
|
|
|
|
return hashlib.sha256(value.encode()).hexdigest()[:16]
|
|
|
|
def _rotate_logs_if_needed(self):
|
|
"""Effectue la rotation des logs si nécessaire."""
|
|
if not self.log_file.exists():
|
|
return
|
|
|
|
if self.log_file.stat().st_size > self.max_file_size:
|
|
# Rotation des fichiers existants
|
|
for i in range(self.max_files - 1, 0, -1):
|
|
old_file = self.log_dir / f"audit.jsonl.{i}"
|
|
new_file = self.log_dir / f"audit.jsonl.{i + 1}"
|
|
|
|
if old_file.exists():
|
|
if new_file.exists():
|
|
new_file.unlink()
|
|
old_file.rename(new_file)
|
|
|
|
# Renommer le fichier actuel
|
|
rotated_file = self.log_dir / "audit.jsonl.1"
|
|
if rotated_file.exists():
|
|
rotated_file.unlink()
|
|
self.log_file.rename(rotated_file)
|
|
|
|
# Reconfigurer le handler
|
|
for handler in self.audit_logger.handlers:
|
|
handler.close()
|
|
self.audit_logger.handlers.clear()
|
|
self._setup_logging()
|
|
|
|
logger.info("Audit log rotated")
|
|
|
|
def log_event(self, event: AuditEvent):
|
|
"""
|
|
Enregistre un événement d'audit.
|
|
|
|
Args:
|
|
event: Événement à enregistrer
|
|
"""
|
|
if not self._safety.is_feature_enabled("audit_logging"):
|
|
return
|
|
|
|
try:
|
|
# Rotation si nécessaire
|
|
self._rotate_logs_if_needed()
|
|
|
|
# Préparer les données pour JSON
|
|
event_dict = asdict(event)
|
|
event_dict["event_type"] = event.event_type.value
|
|
|
|
# Hasher les données sensibles
|
|
if event.token_hash and self.hash_sensitive_data:
|
|
event_dict["token_hash"] = self._hash_sensitive_value(event.token_hash)
|
|
|
|
if event.ip_address and self.hash_sensitive_data:
|
|
# Pour les IPs, on peut garder les 3 premiers octets
|
|
ip_parts = event.ip_address.split(".")
|
|
if len(ip_parts) == 4:
|
|
event_dict["ip_address"] = f"{ip_parts[0]}.{ip_parts[1]}.{ip_parts[2]}.xxx"
|
|
else:
|
|
event_dict["ip_address"] = self._hash_sensitive_value(event.ip_address)
|
|
|
|
# Écrire en JSONL
|
|
json_line = json.dumps(event_dict, ensure_ascii=False, separators=(',', ':'))
|
|
self.audit_logger.info(json_line)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to log audit event: {e}")
|
|
|
|
def write(self, event_data: Dict[str, Any]):
|
|
"""
|
|
Écrit un événement d'audit (compatibilité FastAPI).
|
|
|
|
Args:
|
|
event_data: Données de l'événement
|
|
"""
|
|
try:
|
|
# Convertir les données en AuditEvent
|
|
event_type_str = event_data.get("event", "system_event")
|
|
|
|
# Mapper les types d'événements
|
|
event_type_map = {
|
|
"ip_block": AuditEventType.IP_BLOCKED,
|
|
"auth_fail": AuditEventType.AUTHENTICATION,
|
|
"killswitch_block": AuditEventType.SECURITY_VIOLATION,
|
|
"demo_safe_block": AuditEventType.SECURITY_VIOLATION,
|
|
"rate_limit": AuditEventType.RATE_LIMIT_EXCEEDED,
|
|
"request_success": AuditEventType.API_ACCESS,
|
|
}
|
|
|
|
event_type = event_type_map.get(event_type_str, AuditEventType.SYSTEM_EVENT)
|
|
|
|
event = AuditEvent(
|
|
event_type=event_type,
|
|
timestamp=datetime.utcnow().isoformat() + "Z",
|
|
message=event_data.get("event", "Unknown event"),
|
|
user_id=event_data.get("user_id"),
|
|
ip_address=event_data.get("ip"),
|
|
endpoint=event_data.get("path"),
|
|
method=event_data.get("method"),
|
|
token_hash=event_data.get("token_hash"),
|
|
success=event_type_str == "request_success",
|
|
metadata=event_data
|
|
)
|
|
|
|
self.log_event(event)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to write audit event: {e}")
|
|
|
|
def log_authentication(self, user_id: Optional[str], ip_address: str,
|
|
success: bool, method: str = "token",
|
|
error_code: Optional[str] = None, **metadata):
|
|
"""Log un événement d'authentification."""
|
|
event = AuditEvent(
|
|
event_type=AuditEventType.AUTHENTICATION,
|
|
timestamp=datetime.utcnow().isoformat() + "Z",
|
|
message=f"Authentication {'successful' if success else 'failed'} for user {user_id or 'anonymous'}",
|
|
user_id=user_id,
|
|
ip_address=ip_address,
|
|
success=success,
|
|
error_code=error_code,
|
|
metadata={**metadata, "auth_method": method}
|
|
)
|
|
self.log_event(event)
|
|
|
|
def log_api_access(self, endpoint: str, method: str, ip_address: str,
|
|
user_id: Optional[str] = None, status_code: int = 200,
|
|
user_agent: Optional[str] = None, **metadata):
|
|
"""Log un accès API."""
|
|
event = AuditEvent(
|
|
event_type=AuditEventType.API_ACCESS,
|
|
timestamp=datetime.utcnow().isoformat() + "Z",
|
|
message=f"{method} {endpoint} - {status_code}",
|
|
user_id=user_id,
|
|
ip_address=ip_address,
|
|
endpoint=endpoint,
|
|
method=method,
|
|
user_agent=user_agent,
|
|
success=200 <= status_code < 400,
|
|
metadata={**metadata, "status_code": status_code}
|
|
)
|
|
self.log_event(event)
|
|
|
|
def log_security_violation(self, violation_type: str, ip_address: str,
|
|
details: str, user_id: Optional[str] = None, **metadata):
|
|
"""Log une violation de sécurité."""
|
|
event = AuditEvent(
|
|
event_type=AuditEventType.SECURITY_VIOLATION,
|
|
timestamp=datetime.utcnow().isoformat() + "Z",
|
|
message=f"Security violation: {violation_type} - {details}",
|
|
user_id=user_id,
|
|
ip_address=ip_address,
|
|
success=False,
|
|
metadata={**metadata, "violation_type": violation_type}
|
|
)
|
|
self.log_event(event)
|
|
|
|
def log_rate_limit_exceeded(self, identifier: str, endpoint: Optional[str],
|
|
ip_address: str, **metadata):
|
|
"""Log un dépassement de limite de débit."""
|
|
event = AuditEvent(
|
|
event_type=AuditEventType.RATE_LIMIT_EXCEEDED,
|
|
timestamp=datetime.utcnow().isoformat() + "Z",
|
|
message=f"Rate limit exceeded for {identifier} on {endpoint or 'default'}",
|
|
ip_address=ip_address,
|
|
endpoint=endpoint,
|
|
success=False,
|
|
metadata={**metadata, "identifier": identifier}
|
|
)
|
|
self.log_event(event)
|
|
|
|
def log_ip_blocked(self, ip_address: str, reason: str = "Not in allowlist", **metadata):
|
|
"""Log un blocage d'IP."""
|
|
event = AuditEvent(
|
|
event_type=AuditEventType.IP_BLOCKED,
|
|
timestamp=datetime.utcnow().isoformat() + "Z",
|
|
message=f"IP {ip_address} blocked: {reason}",
|
|
ip_address=ip_address,
|
|
success=False,
|
|
metadata={**metadata, "block_reason": reason}
|
|
)
|
|
self.log_event(event)
|
|
|
|
def log_token_validation(self, token_hash: str, ip_address: str,
|
|
success: bool, user_id: Optional[str] = None,
|
|
error_code: Optional[str] = None, **metadata):
|
|
"""Log une validation de token."""
|
|
event = AuditEvent(
|
|
event_type=AuditEventType.TOKEN_VALIDATION,
|
|
timestamp=datetime.utcnow().isoformat() + "Z",
|
|
message=f"Token validation {'successful' if success else 'failed'}",
|
|
user_id=user_id,
|
|
ip_address=ip_address,
|
|
token_hash=token_hash,
|
|
success=success,
|
|
error_code=error_code,
|
|
metadata=metadata
|
|
)
|
|
self.log_event(event)
|
|
|
|
def log_system_event(self, event_name: str, details: str, **metadata):
|
|
"""Log un événement système."""
|
|
event = AuditEvent(
|
|
event_type=AuditEventType.SYSTEM_EVENT,
|
|
timestamp=datetime.utcnow().isoformat() + "Z",
|
|
message=f"System event: {event_name} - {details}",
|
|
metadata={**metadata, "event_name": event_name}
|
|
)
|
|
self.log_event(event)
|
|
|
|
def log_error(self, error_message: str, error_code: Optional[str] = None,
|
|
ip_address: Optional[str] = None, user_id: Optional[str] = None, **metadata):
|
|
"""Log une erreur."""
|
|
event = AuditEvent(
|
|
event_type=AuditEventType.ERROR,
|
|
timestamp=datetime.utcnow().isoformat() + "Z",
|
|
message=f"Error: {error_message}",
|
|
user_id=user_id,
|
|
ip_address=ip_address,
|
|
success=False,
|
|
error_code=error_code,
|
|
metadata=metadata
|
|
)
|
|
self.log_event(event)
|
|
|
|
def get_audit_stats(self) -> Dict:
|
|
"""Retourne des statistiques sur les logs d'audit."""
|
|
stats = {
|
|
"log_file": str(self.log_file),
|
|
"log_file_exists": self.log_file.exists(),
|
|
"log_file_size": self.log_file.stat().st_size if self.log_file.exists() else 0,
|
|
"max_file_size": self.max_file_size,
|
|
"max_files": self.max_files,
|
|
"hash_sensitive_data": self.hash_sensitive_data
|
|
}
|
|
|
|
# Compter les lignes si le fichier existe
|
|
if self.log_file.exists():
|
|
try:
|
|
with open(self.log_file, 'r', encoding='utf-8') as f:
|
|
stats["total_events"] = sum(1 for _ in f)
|
|
except Exception as e:
|
|
stats["total_events"] = f"Error counting: {e}"
|
|
else:
|
|
stats["total_events"] = 0
|
|
|
|
return stats
|
|
|
|
|
|
# Instance globale
|
|
_audit_logger = None
|
|
|
|
|
|
def get_audit_logger() -> AuditLogger:
|
|
"""Retourne l'instance globale de l'audit logger."""
|
|
global _audit_logger
|
|
if _audit_logger is None:
|
|
_audit_logger = AuditLogger()
|
|
return _audit_logger
|
|
|
|
|
|
def log_security_event(event_type: str, message: str, **kwargs):
|
|
"""
|
|
Fonction utilitaire pour logger un événement de sécurité.
|
|
|
|
Args:
|
|
event_type: Type d'événement
|
|
message: Message descriptif
|
|
**kwargs: Métadonnées additionnelles
|
|
"""
|
|
audit_logger = get_audit_logger()
|
|
|
|
if event_type == "authentication":
|
|
audit_logger.log_authentication(**kwargs)
|
|
elif event_type == "security_violation":
|
|
audit_logger.log_security_violation(message, **kwargs)
|
|
elif event_type == "rate_limit":
|
|
audit_logger.log_rate_limit_exceeded(**kwargs)
|
|
elif event_type == "ip_blocked":
|
|
audit_logger.log_ip_blocked(**kwargs)
|
|
else:
|
|
audit_logger.log_system_event(event_type, message, **kwargs)
|
|
|
|
|
|
def log_api_access(endpoint: str, method: str, ip_address: str, **kwargs):
|
|
"""
|
|
Fonction utilitaire pour logger un accès API.
|
|
|
|
Args:
|
|
endpoint: Endpoint accédé
|
|
method: Méthode HTTP
|
|
ip_address: IP du client
|
|
**kwargs: Métadonnées additionnelles
|
|
"""
|
|
get_audit_logger().log_api_access(endpoint, method, ip_address, **kwargs) |