Files
rpa_vision_v3/tests/unit/test_api_stream_revocation_gaps.py

351 lines
12 KiB
Python

"""
Tests des gaps de revocation fleet sur agent_v0/server_v1/api_stream.py.
Couvre :
1. test_result_guard_without_pending — le garde est appliqué sur /replay/result
meme sans _retry_pending (garde inconditionnel).
2. test_finalize_revoked_agent — enroll + revoke + finalize → 403
3. test_finalize_unknown_machine_registered — registre avec agents →
machine_id inconnu → 403
4. test_guard_default_machine_id_registered — registre avec agents →
machine_id="default" → 403
"""
from __future__ import annotations
import sys
import time
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
ROOT = str(Path(__file__).resolve().parents[2])
if str(ROOT) not in sys.path:
sys.path.insert(0, str(ROOT))
@pytest.fixture
def isolated_fleet_state(monkeypatch, tmp_path):
"""Fixture qui isole le registre AgentRegistry et les structures replay."""
monkeypatch.setenv("RPA_API_TOKEN", "test_revocation_gaps_token")
from agent_v0.server_v1 import api_stream
from agent_v0.server_v1.agent_registry import AgentRegistry
# Aligner le token attendu par le middleware
monkeypatch.setattr(api_stream, "API_TOKEN", "test_revocation_gaps_token")
# Substituer le registre global par une instance dediee au test
original_registry = api_stream.agent_registry
test_registry = AgentRegistry(db_path=str(tmp_path / "test_agents.db"))
monkeypatch.setattr(api_stream, "agent_registry", test_registry)
# Sauver et nettoyer les structures replay
saved_states = dict(api_stream._replay_states)
api_stream._replay_states.clear()
_auth_headers = {"Authorization": "Bearer test_revocation_gaps_token"}
yield api_stream, test_registry, _auth_headers
# Restauration
api_stream._replay_states.clear()
api_stream._replay_states.update(saved_states)
monkeypatch.setattr(api_stream, "agent_registry", original_registry)
# ---------------------------------------------------------------------------
# Test 1 : /replay/result — le garde est appliqué sans _retry_pending
# ---------------------------------------------------------------------------
def test_result_guard_without_pending(isolated_fleet_state, monkeypatch):
"""Le garde sur /replay/result s'applique meme sans entrée dans _retry_pending.
Scénario : un agent enrolle et actif envoie un rapport de résultat pour
une action qui n'est pas dans _retry_pending (cas nominal, pas un retry).
Le garde doit être appelé et laisser passer car l'agent est actif.
Si l'agent est révoqué, le même rapport doit être bloqué (403).
"""
api_stream, registry, auth_headers = isolated_fleet_state
# Enroller un agent actif
registry.enroll(
machine_id="test-machine-result-guard",
user_name="Test User",
hostname="TEST-HOST",
)
# Forger une session avec machine_id pour que le garde puisse le résoudre
from agent_v0.server_v1.live_session_manager import LiveSessionState
session = LiveSessionState(
session_id="sess-result-guard",
machine_id="test-machine-result-guard",
)
monkeypatch.setattr(
api_stream.processor.session_manager,
"get_session",
lambda sid: session if sid == "sess-result-guard" else None,
)
from fastapi.testclient import TestClient
client = TestClient(api_stream.app, raise_server_exceptions=False)
# Rapport sans _retry_pending — le garde doit quand meme s'appliquer
# et laisser passer car l'agent est actif
resp = client.post(
"/api/v1/traces/stream/replay/result",
json={
"session_id": "sess-result-guard",
"action_id": "act-no-retry",
"success": True,
},
headers=auth_headers,
)
# Ne doit PAS etre 403 (agent actif)
assert resp.status_code != 403, (
f"/replay/result a été bloqué (403) pour un agent actif sans retry. "
f"Body: {resp.text}"
)
# Maintenant révoquer l'agent
registry.uninstall(
machine_id="test-machine-result-guard",
reason="admin_revoke",
)
# Le meme rapport doit maintenant être bloqué
resp = client.post(
"/api/v1/traces/stream/replay/result",
json={
"session_id": "sess-result-guard",
"action_id": "act-no-retry-2",
"success": True,
},
headers=auth_headers,
)
assert resp.status_code == 403, (
f"/replay/result DOIT être bloqué (403) pour un agent révoqué. "
f"Body: {resp.text}"
)
# ---------------------------------------------------------------------------
# Test 2 : /finalize — enroll + revoke + finalize → 403
# ---------------------------------------------------------------------------
def test_finalize_revoked_agent(isolated_fleet_state, monkeypatch):
"""Un agent enrolle puis révoqué doit être bloqué sur /finalize."""
api_stream, registry, auth_headers = isolated_fleet_state
# Enroller un agent
registry.enroll(
machine_id="test-machine-revoked-finalize",
user_name="Test User",
hostname="TEST-HOST",
)
# Le révoquer
registry.uninstall(
machine_id="test-machine-revoked-finalize",
reason="admin_revoke",
)
# Forger une session pour que le finalize trouve la session
from agent_v0.server_v1.live_session_manager import LiveSessionState
session = LiveSessionState(
session_id="sess-revoked-finalize",
machine_id="test-machine-revoked-finalize",
)
monkeypatch.setattr(
api_stream.processor.session_manager,
"get_session",
lambda sid: session if sid == "sess-revoked-finalize" else None,
)
# finalize() appelle aussi finalize() sur le session_manager — mock pour éviter I/O
monkeypatch.setattr(
api_stream.processor.session_manager,
"finalize",
lambda sid: None,
)
monkeypatch.setattr(api_stream.processor, "_find_session_dir", lambda sid: None)
from fastapi.testclient import TestClient
client = TestClient(api_stream.app, raise_server_exceptions=False)
resp = client.post(
"/api/v1/traces/stream/finalize",
params={"session_id": "sess-revoked-finalize"},
headers=auth_headers,
)
assert resp.status_code == 403, (
f"/finalize DOIT être bloqué (403) pour un agent révoqué. "
f"Body: {resp.text}"
)
# ---------------------------------------------------------------------------
# Test 3 : /finalize — machine_id inconnu avec registre non vide → 403
# ---------------------------------------------------------------------------
def test_finalize_unknown_machine_registered(isolated_fleet_state, monkeypatch):
"""Quand le registre contient au moins un agent, un machine_id inconnu → 403."""
api_stream, registry, auth_headers = isolated_fleet_state
# Enroller un agent (le registre n'est donc pas vide)
registry.enroll(
machine_id="known-machine-xyz",
user_name="Known User",
hostname="KNOWN-HOST",
)
# Forger une session avec un machine_id inconnu du registre
from agent_v0.server_v1.live_session_manager import LiveSessionState
session = LiveSessionState(
session_id="sess-unknown-finalize",
machine_id="unknown-machine-abc", # Pas dans le registre
)
monkeypatch.setattr(
api_stream.processor.session_manager,
"get_session",
lambda sid: session if sid == "sess-unknown-finalize" else None,
)
monkeypatch.setattr(
api_stream.processor.session_manager,
"finalize",
lambda sid: None,
)
monkeypatch.setattr(api_stream.processor, "_find_session_dir", lambda sid: None)
from fastapi.testclient import TestClient
client = TestClient(api_stream.app, raise_server_exceptions=False)
resp = client.post(
"/api/v1/traces/stream/finalize",
params={"session_id": "sess-unknown-finalize"},
headers=auth_headers,
)
assert resp.status_code == 403, (
f"/finalize DOIT être bloqué (403) pour un machine_id inconnu "
f"quand le registre contient des agents. Body: {resp.text}"
)
body = resp.json()
assert body.get("detail", {}).get("error") == "agent_unknown", (
f"Erreur attendue 'agent_unknown', obtenu: {body}"
)
# ---------------------------------------------------------------------------
# Test 4 : _guard_agent_registry_access — machine_id="default" avec registre → 403
# ---------------------------------------------------------------------------
def test_guard_default_machine_id_registered(isolated_fleet_state):
"""Quand le registre contient des agents, machine_id='default' → 403."""
api_stream, registry, _auth_headers = isolated_fleet_state
# Enroller un agent (le registre n'est donc pas vide)
registry.enroll(
machine_id="some-enrolled-agent",
user_name="Enrolled User",
hostname="ENROLLED-HOST",
)
from fastapi import HTTPException
# Appel direct du garde avec machine_id="default"
with pytest.raises(HTTPException) as exc_info:
api_stream._guard_agent_registry_access(
"default",
endpoint="/api/v1/traces/stream/finalize",
)
assert exc_info.value.status_code == 403
detail = exc_info.value.detail
assert detail.get("error") == "agent_enrollment_required", (
f"Erreur attendue 'agent_enrollment_required', obtenu: {detail}"
)
# Idem avec machine_id="" (vide)
with pytest.raises(HTTPException) as exc_info:
api_stream._guard_agent_registry_access(
"",
endpoint="/api/v1/traces/stream/event",
)
assert exc_info.value.status_code == 403
assert exc_info.value.detail.get("error") == "agent_enrollment_required"
# Idem avec machine_id=None
with pytest.raises(HTTPException) as exc_info:
api_stream._guard_agent_registry_access(
None,
endpoint="/api/v1/traces/stream/event",
)
assert exc_info.value.status_code == 403
assert exc_info.value.detail.get("error") == "agent_enrollment_required"
# ---------------------------------------------------------------------------
# Test 5 : /replay-session — enroll + revoke → 403
# ---------------------------------------------------------------------------
def test_replay_session_revoked_agent(isolated_fleet_state, monkeypatch):
"""Un agent révoqué ne doit pas pouvoir lancer un replay-session."""
api_stream, registry, auth_headers = isolated_fleet_state
registry.enroll(
machine_id="test-machine-replay-session",
user_name="Test User",
hostname="TEST-HOST",
)
registry.uninstall(
machine_id="test-machine-replay-session",
reason="admin_revoke",
)
from fastapi.testclient import TestClient
client = TestClient(api_stream.app, raise_server_exceptions=False)
resp = client.post(
"/api/v1/traces/stream/replay-session",
params={
"session_id": "sess-some-replay",
"machine_id": "test-machine-replay-session",
},
headers=auth_headers,
)
assert resp.status_code == 403, (
f"/replay-session DOIT être bloqué (403) pour un agent révoqué. "
f"Body: {resp.text}"
)
def test_replay_session_unknown_machine_registered(isolated_fleet_state):
"""Quand le registre contient des agents, machine_id inconnu sur replay-session → 403."""
api_stream, registry, auth_headers = isolated_fleet_state
registry.enroll(
machine_id="known-machine-replay",
user_name="Known User",
hostname="KNOWN-HOST",
)
from fastapi.testclient import TestClient
client = TestClient(api_stream.app, raise_server_exceptions=False)
resp = client.post(
"/api/v1/traces/stream/replay-session",
params={
"session_id": "sess-some-replay",
"machine_id": "unknown-machine-replay",
},
headers=auth_headers,
)
assert resp.status_code == 403
assert resp.json().get("detail", {}).get("error") == "agent_unknown"