fix(security): Rendre validation permissive en développement

Changements:
- Ajouter ensure_dev_config() qui génère des clés temporaires en dev
- Ajouter paramètre strict=True/False à validate_production_security()
- En développement: génère auto ENCRYPTION_PASSWORD et SECRET_KEY
- En production: comportement inchangé (bloque si config invalide)

server/api_upload.py:
- Utilise strict=is_production_environment()
- En dev: warning seulement, continue le démarrage
- En prod: sys.exit(1) si config invalide

Résout les problèmes de démarrage en développement sans config manuelle.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Dom
2026-01-19 17:55:06 +01:00
parent 35f0cb3461
commit 5a77b1a41e
3 changed files with 969 additions and 0 deletions

136
core/security/__init__.py Normal file
View File

@@ -0,0 +1,136 @@
"""
Core Security Module
Modules pour la sécurité du système :
- Configuration sécurisée
- Validation des inputs
- Chiffrement
- Authentification
- API Security & Governance (Fiche #23)
"""
from .security_config import (
SecurityConfig,
validate_production_security,
get_security_config,
is_production_environment,
ensure_dev_config,
check_security_requirements,
generate_secure_key,
ProductionSecurityError,
SecurityConfigError
)
# API Security & Governance imports (Fiche #23)
from .api_tokens import (
TokenManager,
TokenRole,
TokenValidationError,
validate_token,
generate_api_token
)
from .ip_allowlist import (
IPAllowlist,
IPValidationError,
is_ip_allowed,
get_client_ip
)
from .rate_limiter import (
RateLimiter,
TokenBucket,
RateLimitExceeded,
check_rate_limit
)
from .audit_log import (
AuditLogger,
AuditEvent,
log_security_event,
log_api_access
)
# FastAPI Security (optional import)
try:
from .fastapi_security import (
FastAPISecurityMiddleware,
require_admin_token,
require_any_token,
get_current_user_role
)
_FASTAPI_AVAILABLE = True
except ImportError:
_FASTAPI_AVAILABLE = False
FastAPISecurityMiddleware = None
require_admin_token = None
require_any_token = None
get_current_user_role = None
# Flask Security (optional import)
try:
from .flask_security import (
FlaskSecurityMiddleware,
flask_require_admin,
flask_require_any_token,
init_flask_security
)
_FLASK_AVAILABLE = True
except ImportError:
_FLASK_AVAILABLE = False
FlaskSecurityMiddleware = None
flask_require_admin = None
flask_require_any_token = None
init_flask_security = None
# Input validation imports will be added after testing
__all__ = [
# Security Config
'SecurityConfig',
'validate_production_security',
'get_security_config',
'is_production_environment',
'ensure_dev_config',
'check_security_requirements',
'generate_secure_key',
'ProductionSecurityError',
'SecurityConfigError',
# API Tokens
'TokenManager',
'TokenRole',
'TokenValidationError',
'validate_token',
'generate_api_token',
# IP Allowlist
'IPAllowlist',
'IPValidationError',
'is_ip_allowed',
'get_client_ip',
# Rate Limiting
'RateLimiter',
'TokenBucket',
'RateLimitExceeded',
'check_rate_limit',
# Audit Logging
'AuditLogger',
'AuditEvent',
'log_security_event',
'log_api_access',
# FastAPI Security
'FastAPISecurityMiddleware',
'require_admin_token',
'require_any_token',
'get_current_user_role',
# Flask Security (if available)
'FlaskSecurityMiddleware',
'flask_require_admin',
'flask_require_any_token',
'init_flask_security'
]

View File

@@ -0,0 +1,347 @@
"""
Security Configuration Management
Gère la configuration sécurisée du système, particulièrement en production.
Exigence 7.1: Forcer la configuration sécurisée en production
Exigence 7.5: Refus de démarrage avec configuration insécurisée
"""
import os
import logging
import secrets
import hashlib
from dataclasses import dataclass
from typing import Dict, List, Optional, Set
from pathlib import Path
logger = logging.getLogger(__name__)
@dataclass
class SecurityConfig:
"""Configuration de sécurité du système."""
# Environnement
environment: str = "development"
# Chiffrement
encryption_password: Optional[str] = None
encryption_key_file: Optional[str] = None
# Clés secrètes
secret_key: Optional[str] = None
jwt_secret: Optional[str] = None
# Base de données
db_encryption_enabled: bool = False
db_password: Optional[str] = None
# API
api_key_required: bool = False
api_keys: List[str] = None
# Logging
log_sensitive_data: bool = True # False en production
# Validation
strict_input_validation: bool = False # True en production
def __post_init__(self):
"""Post-initialisation pour valider la configuration."""
if self.api_keys is None:
self.api_keys = []
class SecurityConfigError(Exception):
"""Erreur de configuration de sécurité."""
pass
class ProductionSecurityError(SecurityConfigError):
"""Erreur de sécurité critique en production."""
pass
def is_production_environment() -> bool:
"""
Détermine si nous sommes en environnement de production.
Returns:
True si en production
"""
env = os.getenv("ENVIRONMENT", "development").lower()
return env in ["production", "prod"]
def get_security_config() -> SecurityConfig:
"""
Charge la configuration de sécurité depuis les variables d'environnement.
Returns:
Configuration de sécurité
"""
config = SecurityConfig(
environment=os.getenv("ENVIRONMENT", "development").lower(),
encryption_password=os.getenv("ENCRYPTION_PASSWORD"),
encryption_key_file=os.getenv("ENCRYPTION_KEY_FILE"),
secret_key=os.getenv("SECRET_KEY"),
jwt_secret=os.getenv("JWT_SECRET"),
db_encryption_enabled=os.getenv("DB_ENCRYPTION_ENABLED", "false").lower() == "true",
db_password=os.getenv("DB_PASSWORD"),
api_key_required=os.getenv("API_KEY_REQUIRED", "false").lower() == "true",
api_keys=os.getenv("API_KEYS", "").split(",") if os.getenv("API_KEYS") else [],
log_sensitive_data=os.getenv("LOG_SENSITIVE_DATA", "true").lower() == "true",
strict_input_validation=os.getenv("STRICT_INPUT_VALIDATION", "false").lower() == "true"
)
return config
def _is_default_key(key: str, known_defaults: Set[str]) -> bool:
"""
Vérifie si une clé est une clé par défaut connue.
Args:
key: Clé à vérifier
known_defaults: Ensemble des clés par défaut connues
Returns:
True si c'est une clé par défaut
"""
if not key:
return True
return key in known_defaults
def _is_weak_key(key: str, min_length: int = 32) -> bool:
"""
Vérifie si une clé est faible.
Args:
key: Clé à vérifier
min_length: Longueur minimale requise
Returns:
True si la clé est faible
"""
if not key or len(key) < min_length:
return True
# Si la clé semble être générée automatiquement (base64-like), être moins strict
if len(key) >= min_length and all(c.isalnum() or c in "+-_=" for c in key):
# Clé générée automatiquement, probablement sécurisée
return False
# Vérifier la complexité pour les clés manuelles
has_upper = any(c.isupper() for c in key)
has_lower = any(c.islower() for c in key)
has_digit = any(c.isdigit() for c in key)
has_special = any(c in "!@#$%^&*()_+-=[]{}|;:,.<>?" for c in key)
complexity_score = sum([has_upper, has_lower, has_digit, has_special])
return complexity_score < 2 # Au moins 2 types de caractères (moins strict)
def ensure_dev_config(config: SecurityConfig) -> SecurityConfig:
"""
S'assure que la config de développement a des valeurs par défaut sécurisées.
Génère automatiquement des clés temporaires si non définies.
Args:
config: Configuration à compléter
Returns:
Configuration avec valeurs par défaut
"""
if is_production_environment():
return config
# Générer des clés temporaires pour le développement si non définies
if not config.encryption_password:
config.encryption_password = f"dev_encryption_{secrets.token_hex(16)}"
logger.info("Generated temporary ENCRYPTION_PASSWORD for development")
os.environ["ENCRYPTION_PASSWORD"] = config.encryption_password
if not config.secret_key:
config.secret_key = f"dev_secret_{secrets.token_hex(16)}"
logger.info("Generated temporary SECRET_KEY for development")
os.environ["SECRET_KEY"] = config.secret_key
return config
def validate_production_security(config: SecurityConfig, strict: bool = True) -> None:
"""
Valide la configuration de sécurité pour la production.
Args:
config: Configuration à valider
strict: Si False, ne lève pas d'exception (warnings seulement)
Raises:
ProductionSecurityError: Si la configuration n'est pas sécurisée (mode strict + production)
"""
if not is_production_environment():
logger.info("Development environment - applying permissive security validation")
# En dev, on génère des clés temporaires si nécessaire
ensure_dev_config(config)
return
logger.info("Validating production security configuration...")
errors = []
warnings = []
# Clés par défaut connues
default_encryption_keys = {
"rpa_vision_v3_default_key",
"default_password",
"changeme",
"password",
"secret"
}
default_secret_keys = {
"dev-key-change-in-production",
"development-secret",
"default-secret",
"changeme"
}
# 1. Validation du mot de passe de chiffrement
if not config.encryption_password:
errors.append("ENCRYPTION_PASSWORD must be set in production")
elif _is_default_key(config.encryption_password, default_encryption_keys):
errors.append("ENCRYPTION_PASSWORD cannot use default value in production")
elif _is_weak_key(config.encryption_password, 32):
errors.append("ENCRYPTION_PASSWORD is too weak (minimum 32 characters with complexity required)")
# 2. Validation de la clé secrète
if not config.secret_key:
errors.append("SECRET_KEY must be set in production")
elif _is_default_key(config.secret_key, default_secret_keys):
errors.append("SECRET_KEY cannot use default value in production")
elif _is_weak_key(config.secret_key, 32):
errors.append("SECRET_KEY is too weak (minimum 32 characters with complexity required)")
# 3. Validation JWT si utilisé
if config.jwt_secret:
if _is_default_key(config.jwt_secret, default_secret_keys):
errors.append("JWT_SECRET cannot use default value in production")
elif _is_weak_key(config.jwt_secret, 32):
errors.append("JWT_SECRET is too weak")
# 4. Validation base de données
if config.db_password and _is_weak_key(config.db_password, 16):
warnings.append("DB_PASSWORD appears to be weak")
# 5. Configuration de sécurité
if config.log_sensitive_data:
warnings.append("LOG_SENSITIVE_DATA should be false in production")
if not config.strict_input_validation:
warnings.append("STRICT_INPUT_VALIDATION should be true in production")
# 6. Validation des clés API
if config.api_key_required and not config.api_keys:
errors.append("API_KEY_REQUIRED is true but no API_KEYS provided")
for api_key in config.api_keys:
if _is_weak_key(api_key, 24):
warnings.append(f"API key appears to be weak: {api_key[:8]}...")
# 7. Validation des fichiers de clés
if config.encryption_key_file:
key_path = Path(config.encryption_key_file)
if not key_path.exists():
errors.append(f"Encryption key file not found: {config.encryption_key_file}")
elif key_path.stat().st_mode & 0o077:
warnings.append(f"Encryption key file has overly permissive permissions: {config.encryption_key_file}")
# Afficher les warnings
for warning in warnings:
logger.warning(f"Security warning: {warning}")
# Lever une erreur si des erreurs critiques (mode strict uniquement)
if errors:
error_msg = "Production security validation failed:\n" + "\n".join(f" - {error}" for error in errors)
logger.error(error_msg)
if strict:
raise ProductionSecurityError(error_msg)
else:
logger.warning("Strict mode disabled - continuing despite security errors")
return
logger.info("Production security validation passed")
def generate_secure_key(length: int = 32) -> str:
"""
Génère une clé sécurisée.
Args:
length: Longueur de la clé
Returns:
Clé sécurisée
"""
return secrets.token_urlsafe(length)
def hash_sensitive_value(value: str) -> str:
"""
Hash une valeur sensible pour le logging sécurisé.
Args:
value: Valeur à hasher
Returns:
Hash de la valeur
"""
return hashlib.sha256(value.encode()).hexdigest()[:16]
def validate_and_get_config() -> SecurityConfig:
"""
Valide et retourne la configuration de sécurité.
Returns:
Configuration validée
Raises:
ProductionSecurityError: Si la validation échoue en production
"""
config = get_security_config()
validate_production_security(config)
return config
def check_security_requirements() -> Dict[str, bool]:
"""
Vérifie les exigences de sécurité.
Returns:
Dict avec le statut de chaque exigence
"""
config = get_security_config()
is_prod = is_production_environment()
requirements = {
"production_environment": is_prod,
"encryption_password_set": bool(config.encryption_password),
"encryption_password_secure": not _is_default_key(
config.encryption_password or "",
{"rpa_vision_v3_default_key", "default_password", "changeme"}
),
"secret_key_set": bool(config.secret_key),
"secret_key_secure": not _is_default_key(
config.secret_key or "",
{"dev-key-change-in-production", "development-secret"}
),
"logging_secure": not config.log_sensitive_data if is_prod else True,
"input_validation_strict": config.strict_input_validation if is_prod else True
}
return requirements

486
server/api_upload.py Normal file
View File

@@ -0,0 +1,486 @@
#!/usr/bin/env python3
"""
API Serveur pour recevoir les uploads de l'agent V0
Endpoints:
- POST /api/traces/upload - Upload fichier .enc chiffré
- GET /api/traces/status - Status du serveur
- GET /api/traces/sessions - Liste des sessions reçues
Usage:
python api_upload.py
# Ou avec uvicorn directement:
uvicorn api_upload:app --host 0.0.0.0 --port 8000
"""
import os
import sys
import logging
import zipfile
from pathlib import Path
from datetime import datetime
from typing import Optional
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import JSONResponse, Response
import uvicorn
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST
# Load environment variables from .env.local (like the agent does)
def load_env_file(env_path):
"""Charge un fichier .env dans les variables d'environnement"""
if not env_path.exists():
return False
with open(env_path, 'r') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
os.environ[key.strip()] = value.strip()
return True
# Charger .env.local depuis le répertoire parent (racine du projet)
env_local_path = Path(__file__).parent.parent / ".env.local"
if load_env_file(env_local_path):
print(f"[server] Variables d'environnement chargées depuis {env_local_path}")
# Ajouter le répertoire parent au path pour importer les modules RPA Vision V3
sys.path.insert(0, str(Path(__file__).parent.parent))
from core.persistence import StorageManager
from core.models import RawSession
# Fiche #23 - Sécurité/gouvernance API (middleware)
from core.security.fastapi_security import install_security_middlewares
# Fiche #22 - AutoHeal admin API (optionnel)
try:
from core.system.api_admin_autoheal import router as autoheal_admin_router
AUTOHEAL_API_AVAILABLE = True
except Exception as _e:
AUTOHEAL_API_AVAILABLE = False
autoheal_admin_router = None
# Fiche #23 - Security admin API (kill-switch status)
try:
from core.system.api_admin_security import router as security_admin_router
SECURITY_ADMIN_API_AVAILABLE = True
except Exception as _e:
SECURITY_ADMIN_API_AVAILABLE = False
security_admin_router = None
# Configuration
UPLOAD_DIR = Path("data/training/uploads")
SESSIONS_DIR = Path("data/training/sessions")
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
# Worker mode (prod):
# - thread : worker de processing dans le même process que l'API (défaut / simple)
# - external : worker séparé (systemd rpa-vision-v3-worker.service)
# - disabled : aucun traitement (API upload only)
PROCESSING_WORKER_MODE = os.getenv("RPA_PROCESSING_WORKER", "thread").strip().lower()
# Gestion sécurisée du mot de passe
ENCRYPTION_PASSWORD = os.getenv("ENCRYPTION_PASSWORD")
if not ENCRYPTION_PASSWORD:
if ENVIRONMENT == "production":
raise ValueError(
"ENCRYPTION_PASSWORD must be set in production! "
"Set it with: export ENCRYPTION_PASSWORD='your_secure_password'"
)
ENCRYPTION_PASSWORD = "rpa_vision_v3_default_key"
# Warning sera affiché au démarrage
# Créer les répertoires
UPLOAD_DIR.mkdir(parents=True, exist_ok=True)
SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
# Logging
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] [%(levelname)s] %(name)s: %(message)s'
)
logger = logging.getLogger("api_upload")
# FastAPI app
app = FastAPI(
title="RPA Vision V3 - Agent Upload API",
description="API pour recevoir les sessions enregistrées par l'agent V0",
version="1.0.0"
)
# Installer la sécurité (auth + allowlist + rate-limit + audit + kill-switch)
install_security_middlewares(app)
# Monter l'API admin AutoHeal (si dispo)
if AUTOHEAL_API_AVAILABLE and autoheal_admin_router is not None:
app.include_router(autoheal_admin_router, prefix="/admin/autoheal", tags=["autoheal"])
# Monter l'API admin sécurité (si dispo)
if SECURITY_ADMIN_API_AVAILABLE and security_admin_router is not None:
app.include_router(security_admin_router, prefix="/admin/security", tags=["security"])
# StorageManager
storage = StorageManager(base_path="data/training")
# Importer le module de déchiffrement
try:
from server.storage_encrypted import decrypt_session_file as decrypt_file
DECRYPTION_AVAILABLE = True
logger.info("Module de déchiffrement server.storage_encrypted chargé")
except ImportError:
logger.warning("Module storage_encrypted non trouvé, déchiffrement désactivé!")
DECRYPTION_AVAILABLE = False
def decrypt_file(encrypted_path: str, password: str) -> str:
"""Fallback si module non disponible."""
logger.error("Déchiffrement non disponible!")
return encrypted_path.replace('.enc', '.zip')
@app.post("/api/traces/upload")
async def upload_session(
file: UploadFile = File(...),
session_id: str = Form(...)
):
"""
Upload d'une session enregistrée par l'agent V0.
Args:
file: Fichier .enc (chiffré) ou .zip
session_id: ID de la session
Returns:
JSON avec status et infos
"""
try:
logger.info(f"Réception upload session: {session_id}")
logger.info(f"Fichier: {file.filename}, taille: {file.size} bytes")
# Sauvegarder le fichier uploadé
file_ext = '.enc' if file.filename.endswith('.enc') else '.zip'
upload_path = UPLOAD_DIR / f"{session_id}{file_ext}"
with open(upload_path, "wb") as f:
content = await file.read()
f.write(content)
logger.info(f"Fichier sauvegardé: {upload_path}")
# Déchiffrer si nécessaire
if file_ext == '.enc':
if not DECRYPTION_AVAILABLE:
raise HTTPException(
status_code=500,
detail="Déchiffrement non disponible sur le serveur"
)
try:
logger.info(f"Tentative de déchiffrement avec password: {ENCRYPTION_PASSWORD[:20]}...")
zip_path = decrypt_file(str(upload_path), ENCRYPTION_PASSWORD)
logger.info(f"Fichier déchiffré: {zip_path}")
except Exception as e:
logger.error(f"Erreur déchiffrement: {e}")
raise HTTPException(
status_code=400,
detail=f"Erreur déchiffrement: {str(e)}"
)
else:
zip_path = str(upload_path)
# Extraire le ZIP
extract_dir = SESSIONS_DIR / session_id
extract_dir.mkdir(parents=True, exist_ok=True)
try:
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(extract_dir)
logger.info(f"ZIP extrait dans: {extract_dir}")
except Exception as e:
logger.error(f"Erreur extraction ZIP: {e}")
raise HTTPException(
status_code=400,
detail=f"Erreur extraction ZIP: {str(e)}"
)
# Charger et valider la RawSession
json_path = extract_dir / session_id / f"{session_id}.json"
if not json_path.exists():
logger.error(f"Fichier JSON introuvable: {json_path}")
raise HTTPException(
status_code=400,
detail=f"Fichier JSON introuvable dans le ZIP"
)
try:
session = RawSession.load_from_file(json_path)
logger.info(f"RawSession chargée: {session.session_id}")
logger.info(f" - Événements: {len(session.events)}")
logger.info(f" - Screenshots: {len(session.screenshots)}")
logger.info(f" - Utilisateur: {session.user}")
except Exception as e:
logger.error(f"Erreur chargement RawSession: {e}")
raise HTTPException(
status_code=400,
detail=f"Erreur chargement RawSession: {str(e)}"
)
# Sauvegarder avec StorageManager
try:
storage.save_raw_session(session, session_id)
logger.info(f"Session sauvegardée dans StorageManager")
except Exception as e:
logger.warning(f"Erreur sauvegarde StorageManager: {e}")
# Pas bloquant, on continue
# Ajouter à la queue de processing (traitement asynchrone robuste)
try:
from processing_queue import add_to_queue
add_to_queue(session_id, "data/training")
logger.info(f"Session {session_id} ajoutée à la queue de processing")
except Exception as e:
logger.warning(f"Impossible d'ajouter à la queue: {e}")
# Fallback: traitement direct en thread
try:
from processing_pipeline import process_session_async
import threading
def process_in_background():
try:
logger.info(f"Démarrage pipeline processing pour {session_id}")
stats = process_session_async(session_id, "data/training")
logger.info(f"Pipeline terminé: {stats}")
except Exception as e:
logger.error(f"Erreur pipeline: {e}")
thread = threading.Thread(target=process_in_background, daemon=True)
thread.start()
logger.info("Pipeline lancé en arrière-plan (fallback)")
except Exception as e2:
logger.warning(f"Impossible de lancer le pipeline: {e2}")
return JSONResponse({
"status": "success",
"session_id": session_id,
"events_count": len(session.events),
"screenshots_count": len(session.screenshots),
"user": session.user,
"received_at": datetime.now().isoformat()
})
except HTTPException:
raise
except Exception as e:
logger.exception(f"Erreur inattendue: {e}")
raise HTTPException(
status_code=500,
detail=f"Erreur serveur: {str(e)}"
)
@app.get("/api/traces/debug-env")
async def debug_env():
"""Debug endpoint to check environment variables."""
import os
return {
"RPA_TOKEN_ADMIN": os.getenv("RPA_TOKEN_ADMIN", "NOT_SET"),
"RPA_TOKEN_READONLY": os.getenv("RPA_TOKEN_READONLY", "NOT_SET"),
"ADMIN_TOKENS": os.getenv("ADMIN_TOKENS", "NOT_SET"),
"READ_ONLY_TOKENS": os.getenv("READ_ONLY_TOKENS", "NOT_SET"),
}
@app.get("/api/traces/debug-auth")
async def debug_auth():
"""Debug endpoint to check token loading."""
import os
from core.security.api_tokens import get_token_manager
token_manager = get_token_manager()
return {
"environment_vars": {
"RPA_TOKEN_ADMIN": bool(os.getenv("RPA_TOKEN_ADMIN")),
"RPA_TOKEN_READONLY": bool(os.getenv("RPA_TOKEN_READONLY")),
"ADMIN_TOKENS": bool(os.getenv("ADMIN_TOKENS")),
"READ_ONLY_TOKENS": bool(os.getenv("READ_ONLY_TOKENS")),
},
"token_counts": {
"admin_tokens": len(token_manager.admin_tokens),
"read_only_tokens": len(token_manager.read_only_tokens),
},
"admin_tokens_preview": [t[:8] + "..." for t in list(token_manager.admin_tokens)[:3]],
"read_only_tokens_preview": [t[:8] + "..." for t in list(token_manager.read_only_tokens)[:3]],
}
@app.get("/api/traces/status")
async def get_status():
"""Status du serveur."""
return {
"status": "online",
"version": "1.0.0",
"upload_dir": str(UPLOAD_DIR),
"sessions_dir": str(SESSIONS_DIR),
"encryption_enabled": ENCRYPTION_PASSWORD != "rpa_vision_v3_default_key"
}
@app.get("/api/traces/sessions")
async def list_sessions():
"""Liste des sessions reçues."""
sessions = []
for session_dir in SESSIONS_DIR.iterdir():
if session_dir.is_dir():
json_files = list(session_dir.glob("*/*.json"))
if json_files:
json_path = json_files[0]
try:
session = RawSession.load_from_file(json_path)
sessions.append({
"session_id": session.session_id,
"started_at": session.started_at.isoformat(),
"ended_at": session.ended_at.isoformat() if session.ended_at else None,
"events_count": len(session.events),
"screenshots_count": len(session.screenshots),
"user": session.user
})
except Exception as e:
logger.warning(f"Erreur lecture session {session_dir.name}: {e}")
return {
"sessions": sessions,
"total": len(sessions)
}
@app.get("/")
async def root():
"""Page d'accueil."""
return {
"message": "RPA Vision V3 - Agent Upload API",
"endpoints": {
"upload": "POST /api/traces/upload",
"status": "GET /api/traces/status",
"sessions": "GET /api/traces/sessions"
}
}
@app.get("/healthz")
async def healthz():
"""Healthcheck ultra simple (k8s/systemd timers)."""
# Note: on évite les dépendances lourdes ici. C'est un ping.
return {
"status": "ok",
"service": "rpa-vision-v3-api",
"environment": ENVIRONMENT,
"worker_mode": PROCESSING_WORKER_MODE,
}
@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint (public)."""
try:
return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)
except Exception as e:
# On ne veut jamais faire tomber l'API si Prometheus se plante.
return Response(content=f"# error: {e}\n", media_type="text/plain", status_code=500)
@app.on_event("startup")
async def startup_event():
"""Démarrage du serveur - lance le worker de processing."""
if PROCESSING_WORKER_MODE != "thread":
logger.info(f"Processing worker disabled for API (mode={PROCESSING_WORKER_MODE})")
return
try:
from processing_queue import start_processing_worker
from processing_pipeline import process_session_async
start_processing_worker(process_session_async)
logger.info("Worker de processing démarré (thread mode)")
except Exception as e:
logger.warning(f"Impossible de démarrer le worker: {e}")
@app.on_event("shutdown")
async def shutdown_event():
"""Arrêt du serveur - arrête le worker proprement."""
if PROCESSING_WORKER_MODE != "thread":
return
try:
from processing_queue import stop_processing_worker
stop_processing_worker()
logger.info("Worker de processing arrêté")
except Exception as e:
logger.warning(f"Erreur arrêt worker: {e}")
@app.get("/api/traces/queue")
async def get_queue_status():
"""Status de la queue de processing."""
try:
from processing_queue import get_queue
queue = get_queue()
all_items = queue.get_all()
return {
"total": len(all_items),
"pending": sum(1 for i in all_items if i["status"] == "pending"),
"processing": sum(1 for i in all_items if i["status"] == "processing"),
"completed": sum(1 for i in all_items if i["status"] == "completed"),
"failed": sum(1 for i in all_items if i["status"] == "failed"),
"items": all_items[-20:] # 20 dernières
}
except Exception as e:
return {"error": str(e)}
if __name__ == "__main__":
# Valider la sécurité (permissif en développement)
from core.security import validate_production_security, get_security_config, is_production_environment
try:
config = get_security_config()
# En production: strict=True (bloque si erreur)
# En développement: strict=False (warning seulement, clés auto-générées)
validate_production_security(config, strict=is_production_environment())
logger.info("✅ Security validation passed")
except Exception as e:
logger.error(f"Security validation failed: {e}")
if is_production_environment():
logger.error("Cannot start in production with insecure configuration")
sys.exit(1)
else:
logger.warning("⚠️ Continuing in development mode despite security warnings")
# Initialiser le système de cleanup
from core.system import initialize_system_cleanup, shutdown_system
initialize_system_cleanup()
logger.info("Démarrage du serveur API...")
logger.info(f"Upload dir: {UPLOAD_DIR.absolute()}")
logger.info(f"Sessions dir: {SESSIONS_DIR.absolute()}")
logger.info(f"Encryption password: {'***' if ENCRYPTION_PASSWORD != 'rpa_vision_v3_default_key' else 'DEFAULT (changer!)'}")
try:
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
log_level="info"
)
except KeyboardInterrupt:
logger.info("Received keyboard interrupt, shutting down...")
shutdown_system()
except Exception as e:
logger.error(f"Server error: {e}")
shutdown_system()
raise