"""core/security/fastapi_security.py Fiche #23 - Middleware FastAPI (auth + allowlist + rate limit + audit + kill-switch + demo-safe) Fiche #24 - Observabilité (métriques HTTP + request id) Intégration: from core.security.fastapi_security import install_security_middlewares install_security_middlewares(app) Variables d'environnement: - RPA_AUTH_REQUIRED / RPA_ADMIN_TOKEN / RPA_READ_TOKEN - RPA_IP_ALLOWLIST / RPA_IP_TRUST_PROXY - RPA_RL_RPS / RPA_RL_BURST - RPA_AUDIT_DIR / RPA_AUDIT_ENABLED - DEMO_SAFE / RPA_KILL_SWITCH (ou RPA_KILL_SWITCH_FILE) - RPA_SERVICE_NAME (labels Prometheus) Notes: - /metrics, /healthz, / sont publics pour ne pas casser systemd/Prometheus. - Les métriques sont enregistrées aussi pour les requêtes bloquées (401/403/423/429) """ from __future__ import annotations import os import time import uuid from typing import Callable from fastapi import Request from fastapi.responses import JSONResponse from .api_tokens import ( TokenRole, auth_required, can_read, can_write, classify_request_simple as classify_request, ) from .ip_allowlist import IPAllowlist from .rate_limiter import RateLimiter from .audit_log import AuditLogger from core.system.safety_switch import demo_safe_enabled, kill_switch_enabled # Fiche #24 - métriques HTTP from core.monitoring.http_server_metrics import ( record_http_request, in_flight_inc, in_flight_dec, record_security_block, safe_template, ) DEFAULT_PUBLIC_PATHS = { "/healthz", "/metrics", "/", "/docs", "/redoc", "/openapi.json", "/api/traces/debug-auth", # Debug endpoint "/api/traces/debug-env", # Debug endpoint } def _client_ip_from_request(request: Request, trust_proxy: bool) -> str: """Extrait l'IP client en tenant compte des proxies.""" if trust_proxy: forwarded = request.headers.get("x-forwarded-for") if forwarded: return forwarded.split(",")[0].strip() real_ip = request.headers.get("x-real-ip") if real_ip: return real_ip.strip() return request.client.host if request.client else "unknown" def _is_public(request: Request) -> bool: """Vérifie si l'endpoint est public (pas d'auth/RL).""" path = request.url.path if path in DEFAULT_PUBLIC_PATHS: return True # Exact match uniquement pour éviter les bypasses return False def _route_template(request: Request) -> str: route = request.scope.get("route") template = getattr(route, "path", None) return safe_template(template, request.url.path) def _ensure_request_id(request: Request) -> str: # Si un reverse proxy injecte déjà un ID, on le conserve. rid = request.headers.get("x-request-id") if rid: return rid.strip() return uuid.uuid4().hex def install_security_middlewares(app) -> None: """Installe le middleware de sécurité sur une app FastAPI.""" allowlist = IPAllowlist.from_env() limiter = RateLimiter.from_env() audit = AuditLogger.from_env() service_name = os.getenv("RPA_SERVICE_NAME", "rpa-vision-v3-api") @app.middleware("http") async def _security_middleware(request: Request, call_next: Callable): path = request.url.path method = request.method.upper() path_tpl = _route_template(request) request_id = _ensure_request_id(request) # exposé aux handlers applicatifs si besoin request.state.request_id = request_id t0 = time.monotonic() in_flight_inc(service_name) def _finalize(resp: JSONResponse) -> JSONResponse: # attache toujours le request id try: resp.headers["X-Request-Id"] = request_id except Exception: pass duration = time.monotonic() - t0 status = getattr(resp, "status_code", 0) or 0 record_http_request(service_name, method, path_tpl, status, duration) in_flight_dec(service_name) return resp # --- IP allowlist (early) --- client_ip = _client_ip_from_request(request, allowlist.trust_proxy) if allowlist.enabled and not allowlist.is_allowed(client_ip): audit.write({ "event": "ip_block", "ip": client_ip, "method": method, "path": path, "request_id": request_id, }) record_security_block(service_name, "ip_block") return _finalize(JSONResponse(status_code=403, content={"error": "forbidden"})) # Public endpoints (health/metrics) : pas d'auth, pas de RL (pour ne pas casser systemd) if _is_public(request): try: response = await call_next(request) response.headers["X-Request-Id"] = request_id return _finalize(response) except Exception: # call_next a pu lever: on compte 500 record_http_request(service_name, method, path_tpl, 500, time.monotonic() - t0) in_flight_dec(service_name) raise # --- Auth --- ctx, source = classify_request( headers=dict(request.headers), cookies=dict(request.cookies), query_params=dict(request.query_params), ) if auth_required(): if ctx.role == TokenRole.ANON: audit.write({ "event": "auth_fail", "ip": client_ip, "method": method, "path": path, "source": source, "token_present": ctx.token_present, "request_id": request_id, }) record_security_block(service_name, "auth_fail") return _finalize(JSONResponse(status_code=401, content={"error": "unauthorized"})) # --- Kill-switch / DEMO_SAFE --- if kill_switch_enabled(): # Bloque tout sauf admin/security (permet de désactiver) if not path.startswith("/admin/security"): audit.write({ "event": "killswitch_block", "ip": client_ip, "method": method, "path": path, "role": ctx.role, "request_id": request_id, }) record_security_block(service_name, "killswitch") return _finalize(JSONResponse(status_code=423, content={"error": "killswitch_enabled"})) if demo_safe_enabled(): # En mode démo : pas d'écriture, et on bloque explicitement l'admin. if path.startswith("/admin/"): # Exception: autorise GET /admin/security/status if not (path == "/admin/security/status" and method == "GET"): audit.write({ "event": "demo_safe_block", "ip": client_ip, "method": method, "path": path, "role": ctx.role, "request_id": request_id, }) record_security_block(service_name, "demo_safe") return _finalize(JSONResponse(status_code=423, content={"error": "demo_safe"})) if not _is_read_only_method(method): audit.write({ "event": "demo_safe_block", "ip": client_ip, "method": method, "path": path, "role": ctx.role, "request_id": request_id, }) record_security_block(service_name, "demo_safe") return _finalize(JSONResponse(status_code=423, content={"error": "demo_safe"})) # --- RBAC simple --- if _is_read_only_method(method): if auth_required() and not can_read(ctx.role): record_security_block(service_name, "forbidden_read") return _finalize(JSONResponse(status_code=403, content={"error": "forbidden"})) else: if auth_required() and not can_write(ctx.role): record_security_block(service_name, "forbidden_write") return _finalize(JSONResponse(status_code=403, content={"error": "forbidden"})) # --- Rate limit --- # Key = token (si valide) sinon IP rl_key = ctx.token_hash if ctx.token_hash else client_ip allowed, retry_after = limiter.check(rl_key) if not allowed: audit.write({ "event": "rate_limit", "ip": client_ip, "method": method, "path": path, "role": ctx.role, "retry_after_s": retry_after, "request_id": request_id, }) record_security_block(service_name, "rate_limit") resp = JSONResponse(status_code=429, content={"error": "rate_limited"}) resp.headers["Retry-After"] = str(int(max(1, retry_after))) return _finalize(resp) # --- Exécution --- response = await call_next(request) # Audit des succès audit.write({ "event": "request_success", "ip": client_ip, "method": method, "path": path, "status": getattr(response, "status_code", None), "request_id": request_id, }) # Note: finalize ajoute header + enregistre métriques response.headers["X-Request-Id"] = request_id return _finalize(response) def _is_read_only_method(method: str) -> bool: """Vérifie si la méthode HTTP est en lecture seule.""" return method.upper() in {"GET", "HEAD", "OPTIONS"}