Files
rpa_vision_v3/core/security/flask_security.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

257 lines
8.7 KiB
Python

"""core/security/flask_security.py
Fiche #23 - Sécurité pour applis Flask/Flask-SocketIO
Fiche #24 - Observabilité (métriques HTTP + request id)
But:
- Protéger /api/* via tokens (admin vs read-only)
- IP allowlist
- Rate limit
- Audit log
- Kill-switch + DEMO_SAFE
- Enregistrer des métriques Prometheus (y compris sur blocage)
- Ajouter un header X-Request-Id
UX:
- Permet de poser le token en cookie via '?token=...' sur la page d'accueil.
- Laisse l'HTML et les assets accessibles (mais l'API nécessite token).
Env:
- mêmes variables que FastAPI (voir core/security/fastapi_security.py)
- RPA_SERVICE_NAME (labels Prometheus)
"""
from __future__ import annotations
import os
import time
import uuid
from typing import Optional
from flask import request, jsonify, redirect, make_response, g
from .api_tokens import (
TokenRole,
auth_required,
can_read,
can_write,
classify_request_simple as classify_request,
)
from .ip_allowlist import IPAllowlist
from .rate_limiter import RateLimiter
from .audit_log import AuditLogger
from core.system.safety_switch import demo_safe_enabled, kill_switch_enabled
# Fiche #24 metrics
from core.monitoring.http_server_metrics import (
record_http_request,
in_flight_inc,
in_flight_dec,
record_security_block,
)
# Path prefixes treated as public by _is_public (static files and assets):
# requests under them skip the security middleware entirely.
DEFAULT_PUBLIC_PREFIXES = ("/static/", "/assets/", "/favicon.ico")

# Exact paths treated as public by _is_public (landing page, health probes,
# metrics endpoint): they also skip the security middleware.
DEFAULT_PUBLIC_PATHS = {"/", "/health", "/healthz", "/metrics"}
def _client_ip(trust_proxy: bool) -> str:
    """Return the client IP address for the current Flask request.

    When ``trust_proxy`` is true, the first entry of ``X-Forwarded-For`` is
    preferred, then ``X-Real-IP``. A header that is present but blank (for
    example ``" "`` or ``", 10.0.0.1"``) is ignored instead of yielding an
    empty string, so callers never receive ``""`` as an address.

    Args:
        trust_proxy: Whether proxy-supplied headers may be trusted.

    Returns:
        The resolved address, or ``"unknown"`` when nothing is available.
    """
    if trust_proxy:
        # NOTE(review): the left-most X-Forwarded-For entry is client-controlled
        # unless the fronting proxy overwrites the header; confirm deployment.
        forwarded = request.headers.get("X-Forwarded-For") or ""
        first_hop = forwarded.split(",")[0].strip()
        if first_hop:
            return first_hop

        real_ip = (request.headers.get("X-Real-IP") or "").strip()
        if real_ip:
            return real_ip

    # Direct peer address as seen by the WSGI server.
    return request.remote_addr or "unknown"
def _is_read_only_method(method: str) -> bool:
"""Vérifie si la méthode HTTP est en lecture seule."""
return method.upper() in {"GET", "HEAD", "OPTIONS"}
def _is_public(path: str) -> bool:
    """Tell whether *path* is exempt from authentication and rate limiting.

    A path is public when it is listed in ``DEFAULT_PUBLIC_PATHS`` or begins
    with one of the ``DEFAULT_PUBLIC_PREFIXES``.
    """
    if path in DEFAULT_PUBLIC_PATHS:
        return True
    for prefix in DEFAULT_PUBLIC_PREFIXES:
        if path.startswith(prefix):
            return True
    return False
def install_flask_security(app, protect_api_prefix: str = "/api/", service_name: Optional[str] = None) -> None:
    """Install security and observability hooks on a Flask app.

    Three hooks are registered:

    * a ``before_request`` hook that assigns a request id (reusing an incoming
      ``X-Request-Id`` header when present), records the start time and
      increments the in-flight gauge;
    * an ``after_request`` hook that echoes ``X-Request-Id``, records HTTP
      metrics (best-effort) and decrements the in-flight gauge;
    * a ``before_request`` hook that, for non-public paths starting with
      ``protect_api_prefix``, applies in order: IP allowlist, token
      authentication, kill-switch, DEMO_SAFE, role check and rate limiting.

    Paths that do not start with ``protect_api_prefix`` (HTML pages, assets)
    are not checked by the security hook.

    Args:
        app: Flask application to instrument.
        protect_api_prefix: Path prefix of the routes to protect.
        service_name: Prometheus label; defaults to the ``RPA_SERVICE_NAME``
            environment variable, then ``'rpa-vision-v3-dashboard'``.
    """
    allowlist = IPAllowlist.from_env()
    limiter = RateLimiter.from_env()
    audit = AuditLogger.from_env()
    svc = service_name or os.getenv("RPA_SERVICE_NAME") or "rpa-vision-v3-dashboard"

    @app.before_request
    def _observability_before_request():
        """Tag the request with an id and a start time, and count it in flight."""
        # Propagate the caller's request id when one was supplied.
        rid = request.headers.get("X-Request-Id") or str(uuid.uuid4())
        g.rpa_request_id = rid
        g.rpa_start_time = time.monotonic()
        in_flight_inc(svc)

    @app.after_request
    def _observability_after_request(response):
        """Attach X-Request-Id and record metrics, including for blocked requests."""
        rid = getattr(g, "rpa_request_id", None)
        if rid:
            response.headers["X-Request-Id"] = rid

        # Metrics are best-effort: a metrics failure must never break the response.
        try:
            start = getattr(g, "rpa_start_time", None)
            # Compare against None: a start time of 0.0 is a valid clock reading.
            duration = (time.monotonic() - start) if start is not None else 0.0

            # Prefer the route template when available, otherwise the raw path.
            path_template = None
            try:
                if request.url_rule is not None:
                    path_template = request.url_rule.rule
            except Exception:
                path_template = None

            record_http_request(
                service=svc,
                method=request.method,
                path_template=path_template or (request.path or "/"),
                status_code=getattr(response, "status_code", 200),
                duration_seconds=duration,
            )
        except Exception:
            # Deliberately swallowed (best-effort metrics).
            pass
        finally:
            try:
                in_flight_dec(svc)
            except Exception:
                pass
        return response

    @app.before_request
    def _security_before_request():
        """Main security middleware; returns a response to block, None to allow."""
        path = request.path
        method = request.method

        # Public paths (assets, health, metrics): no security checks.
        if _is_public(path):
            return None

        # Only the API prefix is protected (``/api/`` by default).
        if not path.startswith(protect_api_prefix):
            return None

        # IP allowlist (checked first, before any token parsing).
        client_ip = _client_ip(allowlist.trust_proxy)
        if allowlist.enabled and not allowlist.is_allowed(client_ip):
            record_security_block(svc, "ip_block")
            audit.write({"event": "ip_block", "ip": client_ip, "method": method, "path": path})
            return jsonify({"error": "forbidden"}), 403

        # Authentication: resolve the caller's role from headers/cookies/query.
        ctx, source = classify_request(
            headers=dict(request.headers),
            cookies=dict(request.cookies),
            query_params=dict(request.args),
        )
        needs_auth = auth_required()
        if needs_auth and ctx.role == TokenRole.ANON:
            record_security_block(svc, "auth_fail")
            audit.write({
                "event": "auth_fail",
                "ip": client_ip,
                "method": method,
                "path": path,
                "source": source,
                "token_present": ctx.token_present,
            })
            return jsonify({"error": "unauthorized"}), 401

        # Kill-switch: block everything except /admin/security (so the switch
        # can be turned off).
        # NOTE(review): paths under /admin/ only reach this point when they
        # also start with protect_api_prefix; with the default "/api/" the
        # /admin exceptions below never match.
        if kill_switch_enabled():
            if not path.startswith("/admin/security"):
                record_security_block(svc, "killswitch")
                audit.write({"event": "killswitch_block", "ip": client_ip, "method": method, "path": path, "role": ctx.role})
                return jsonify({"error": "killswitch_enabled"}), 423

        # DEMO_SAFE: admin routes are blocked (except a read-only request to
        # /admin/security/status) and every non read-only method is blocked.
        if demo_safe_enabled():
            if path.startswith("/admin/"):
                if not (path == "/admin/security/status" and _is_read_only_method(method)):
                    record_security_block(svc, "demo_safe")
                    audit.write({"event": "demo_safe_block", "ip": client_ip, "method": method, "path": path, "role": ctx.role})
                    return jsonify({"error": "demo_safe"}), 423
            if not _is_read_only_method(method):
                record_security_block(svc, "demo_safe")
                audit.write({"event": "demo_safe_block", "ip": client_ip, "method": method, "path": path, "role": ctx.role})
                return jsonify({"error": "demo_safe"}), 423

        # RBAC: read-only methods need read permission, others need write.
        if needs_auth:
            if _is_read_only_method(method):
                permitted = can_read(ctx.role)
            else:
                permitted = can_write(ctx.role)
            if not permitted:
                record_security_block(svc, "forbidden")
                # Audit the denial like every other block path above.
                audit.write({"event": "forbidden", "ip": client_ip, "method": method, "path": path, "role": ctx.role})
                return jsonify({"error": "forbidden"}), 403

        # Rate limit, keyed by token hash when available, else by client IP.
        rl_key = ctx.token_hash if ctx.token_hash else client_ip
        allowed, retry_after = limiter.check(rl_key)
        if not allowed:
            record_security_block(svc, "rate_limit")
            audit.write({"event": "rate_limit", "ip": client_ip, "method": method, "path": path, "role": ctx.role, "retry_after_s": retry_after})
            resp = jsonify({"error": "rate_limited"})
            resp.status_code = 429
            # Always advertise at least one second.
            resp.headers["Retry-After"] = str(int(max(1, retry_after)))
            return resp

        # Success: let the request through.
        return None
def handle_token_in_url(app) -> None:
    """Register a ``/`` route that moves a ``?token=...`` value into a cookie.

    When the query string carries ``token``, the value is stored in the
    ``rpa_token`` cookie and the client is redirected to ``/`` so the token no
    longer appears in the URL. Without a token a minimal landing page is served.

    The cookie is HttpOnly and SameSite=Lax. It is flagged ``Secure`` when the
    current request arrived over HTTPS, so the token is not later sent in clear
    text; plain-HTTP deployments keep working as before.

    Args:
        app: Flask application on which the route is registered.
    """

    @app.route("/")
    def _index_with_token():
        token = request.args.get("token")
        if token:
            # Store the token in a cookie and redirect without it in the URL.
            resp = make_response(redirect("/"))
            resp.set_cookie(
                "rpa_token",
                token,
                httponly=True,
                secure=request.is_secure,
                samesite="Lax",
            )
            return resp

        # Regular landing page.
        return (
            "\n"
            "<h1>RPA Vision V3 Dashboard</h1>\n"
            '<p><a href="/admin/">Admin Panel</a></p>\n'
            '<p><a href="/metrics">Metrics</a></p>\n'
        )