feat(security): API streaming fail-closed + /image privé + target_memory prefix fix
P0-B — /api/v1/traces/stream/image retiré de _PUBLIC_PATHS : - Bearer token obligatoire pour upload d'image - Évite uploads anonymes de contenu arbitraire P0-C — Fail-closed si RPA_API_TOKEN absent : - sys.exit(1) au démarrage avec message fatal - Mode dev : RPA_AUTH_DISABLED=true pour désactiver explicitement - Log INFO des 8 premiers chars du token (diagnostic) Fix target_memory prefix empilé : - Strip "memory_" répétés avant stockage dans replay_memory.py - Évite "memory_memory_memory_template_matching" en base live_session_manager : améliorations mineures de la gestion sessions. 10 tests auth API stream. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
171
tests/unit/test_api_stream_auth_p0bc.py
Normal file
171
tests/unit/test_api_stream_auth_p0bc.py
Normal file
@@ -0,0 +1,171 @@
|
||||
"""
|
||||
Tests des Fix P0-B et P0-C sur agent_v0/server_v1/api_stream.py.
|
||||
|
||||
P0-B : /api/v1/traces/stream/image n'est PLUS dans _PUBLIC_PATHS.
|
||||
L'upload d'image exige désormais un Bearer token.
|
||||
|
||||
P0-C : Si RPA_API_TOKEN est absent ET RPA_AUTH_DISABLED ≠ true,
|
||||
le module DOIT refuser de se charger (sys.exit 1).
|
||||
En mode dev (RPA_AUTH_DISABLED=true), pas de crash mais log warning.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
|
||||
|
||||
|
||||
def _reload_api_stream():
|
||||
"""Recharge le module api_stream pour appliquer les nouvelles env vars."""
|
||||
mod_name = "agent_v0.server_v1.api_stream"
|
||||
if mod_name in sys.modules:
|
||||
del sys.modules[mod_name]
|
||||
return importlib.import_module(mod_name)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fix P0-B : /image n'est plus public
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestImageEndpointNotPublic:
|
||||
"""Fix P0-B : /api/v1/traces/stream/image exige un Bearer token."""
|
||||
|
||||
def test_image_path_removed_from_public_paths(self, monkeypatch):
|
||||
"""Vérifier que la constante _PUBLIC_PATHS ne contient plus /image."""
|
||||
monkeypatch.setenv("RPA_API_TOKEN", "deadbeef" * 4)
|
||||
monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False)
|
||||
mod = _reload_api_stream()
|
||||
assert "/api/v1/traces/stream/image" not in mod._PUBLIC_PATHS, (
|
||||
"L'endpoint d'upload d'image NE doit PAS être public — il accepte "
|
||||
"des bytes arbitraires et déclenche du travail VLM côté serveur."
|
||||
)
|
||||
|
||||
def test_health_still_public(self, monkeypatch):
|
||||
"""/health reste public (monitoring)."""
|
||||
monkeypatch.setenv("RPA_API_TOKEN", "deadbeef" * 4)
|
||||
monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False)
|
||||
mod = _reload_api_stream()
|
||||
assert "/health" in mod._PUBLIC_PATHS
|
||||
|
||||
def test_replay_next_still_public(self, monkeypatch):
|
||||
"""/replay/next reste public (legacy agent Rust polling)."""
|
||||
monkeypatch.setenv("RPA_API_TOKEN", "deadbeef" * 4)
|
||||
monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False)
|
||||
mod = _reload_api_stream()
|
||||
assert "/api/v1/traces/stream/replay/next" in mod._PUBLIC_PATHS
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fix P0-C : fail-closed si pas de token
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFailClosedTokenP0C:
|
||||
"""Fix P0-C : RPA_API_TOKEN absent → sys.exit (pas de génération silencieuse)."""
|
||||
|
||||
def test_no_token_no_disable_exits(self, monkeypatch):
|
||||
"""Sans RPA_API_TOKEN ET sans RPA_AUTH_DISABLED → SystemExit(1)."""
|
||||
monkeypatch.delenv("RPA_API_TOKEN", raising=False)
|
||||
monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False)
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
_reload_api_stream()
|
||||
assert exc_info.value.code == 1
|
||||
|
||||
def test_empty_token_no_disable_exits(self, monkeypatch):
|
||||
"""Token explicitement vide → SystemExit (pas généré aléatoirement)."""
|
||||
monkeypatch.setenv("RPA_API_TOKEN", " ") # whitespace, strippé
|
||||
monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False)
|
||||
with pytest.raises(SystemExit) as exc_info:
|
||||
_reload_api_stream()
|
||||
assert exc_info.value.code == 1
|
||||
|
||||
def test_no_token_with_disable_succeeds(self, monkeypatch):
|
||||
"""Sans token MAIS RPA_AUTH_DISABLED=true → chargement OK (mode dev)."""
|
||||
monkeypatch.delenv("RPA_API_TOKEN", raising=False)
|
||||
monkeypatch.setenv("RPA_AUTH_DISABLED", "true")
|
||||
# Doit pas crash
|
||||
mod = _reload_api_stream()
|
||||
assert mod._AUTH_DISABLED is True
|
||||
# API_TOKEN existe toujours (généré pour cohérence interne, jamais utilisé)
|
||||
assert mod.API_TOKEN, "Un token interne est toujours défini en mode dev"
|
||||
|
||||
def test_token_present_logs_prefix(self, monkeypatch, caplog):
|
||||
"""Avec un token valide, le module log les 8 premiers caractères."""
|
||||
import logging
|
||||
monkeypatch.setenv("RPA_API_TOKEN", "abcdef0123456789" * 2)
|
||||
monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False)
|
||||
with caplog.at_level(logging.INFO, logger="api_stream"):
|
||||
mod = _reload_api_stream()
|
||||
# Le log INFO contient le préfixe (8 premiers chars)
|
||||
assert mod.API_TOKEN == "abcdef0123456789" * 2
|
||||
# Au moins une trace contient "abcdef01" (préfixe)
|
||||
log_text = " ".join(r.getMessage() for r in caplog.records)
|
||||
assert "abcdef01" in log_text or "Token API chargé" in log_text
|
||||
|
||||
def test_verify_token_bypass_when_disabled(self, monkeypatch):
|
||||
"""Mode dev : _verify_token doit laisser passer sans header."""
|
||||
import asyncio
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
monkeypatch.delenv("RPA_API_TOKEN", raising=False)
|
||||
monkeypatch.setenv("RPA_AUTH_DISABLED", "true")
|
||||
mod = _reload_api_stream()
|
||||
|
||||
# Forger une requête sans header sur un endpoint normalement protégé
|
||||
req = MagicMock()
|
||||
req.url.path = "/api/v1/traces/stream/event"
|
||||
req.headers = {}
|
||||
# Ne doit pas raise
|
||||
asyncio.get_event_loop().run_until_complete(mod._verify_token(req))
|
||||
|
||||
def test_verify_token_rejects_missing_header(self, monkeypatch):
|
||||
"""Auth activée : pas de header → HTTPException 401."""
|
||||
import asyncio
|
||||
from unittest.mock import MagicMock
|
||||
from fastapi import HTTPException
|
||||
|
||||
monkeypatch.setenv("RPA_API_TOKEN", "validtoken" * 4)
|
||||
monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False)
|
||||
mod = _reload_api_stream()
|
||||
|
||||
req = MagicMock()
|
||||
req.url.path = "/api/v1/traces/stream/image" # Désormais protégé (P0-B)
|
||||
req.headers = {}
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
asyncio.get_event_loop().run_until_complete(mod._verify_token(req))
|
||||
assert exc_info.value.status_code == 401
|
||||
|
||||
def test_verify_token_rejects_image_without_bearer(self, monkeypatch):
|
||||
"""P0-B + P0-C : POST /image sans token → 401 (l'endpoint n'est plus public)."""
|
||||
import asyncio
|
||||
from unittest.mock import MagicMock
|
||||
from fastapi import HTTPException
|
||||
|
||||
monkeypatch.setenv("RPA_API_TOKEN", "validtoken" * 4)
|
||||
monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False)
|
||||
mod = _reload_api_stream()
|
||||
|
||||
req = MagicMock()
|
||||
req.url.path = "/api/v1/traces/stream/image"
|
||||
req.headers = {"Authorization": "Bearer wrong-token"}
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
asyncio.get_event_loop().run_until_complete(mod._verify_token(req))
|
||||
assert exc_info.value.status_code == 401
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _cleanup(monkeypatch):
|
||||
"""Nettoie l'environnement entre les tests pour éviter la pollution."""
|
||||
yield
|
||||
# Recharger avec un token bidon pour ne pas casser les autres suites
|
||||
monkeypatch.setenv("RPA_API_TOKEN", "cleanup-token" * 3)
|
||||
monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False)
|
||||
try:
|
||||
_reload_api_stream()
|
||||
except SystemExit:
|
||||
pass
|
||||
Reference in New Issue
Block a user