# rpa_vision_v3/tests/security/test_persist_auth.py (Python, 213 lines, 7.6 KiB)

"""Tests securite /api/v1/lea/competences/candidate/persist.
Specs §6:
- Bearer token is mandatory
- machine_id coupling (via the fleet guard)
- Rate limit: 10 requests per minute per machine_id
- Path traversal is forbidden (strict slug)
- PII detection (HDS golden rule)
"""
from __future__ import annotations
import sys
from pathlib import Path
import pytest
# Put the third ancestor directory of this file on sys.path (once) so the
# function-scope project imports used by the tests can be resolved --
# presumably this is the repository root; confirm against the project layout.
_ROOT = str(Path(__file__).resolve().parents[2])
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)

# Apply the `security` marker to every test collected from this module.
pytestmark = pytest.mark.security

# Bearer token the fixture installs as the API token for each test.
_TEST_API_TOKEN = "test_persist_security_token_xyz"
@pytest.fixture
def persist_client(monkeypatch, tmp_path):
    """Provide a ``TestClient`` wired to an isolated, per-test environment.

    The fixture sets the API token, makes sure ``RPA_AUTH_DISABLED`` is unset,
    points the agent registry at a database under ``tmp_path``, redirects every
    path constant of ``core.competences.persist`` into ``tmp_path`` and resets
    the persist rate limiter.

    Returns:
        tuple: ``(client, tmp_path)``.
    """
    agents_db = str(tmp_path / "agents.db")
    competences_root = tmp_path / "competences"
    candidate_dir = competences_root / "candidate"

    # Environment first: it is set before the application modules are imported.
    monkeypatch.setenv("RPA_API_TOKEN", _TEST_API_TOKEN)
    monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False)
    monkeypatch.setenv("RPA_AGENTS_DB_PATH", agents_db)

    from fastapi.testclient import TestClient

    from agent_v0.server_v1 import api_stream
    from agent_v0.server_v1.agent_registry import AgentRegistry
    from core.competences import persist as P

    # Swap the module-level token and registry for test-local ones.
    monkeypatch.setattr(api_stream, "API_TOKEN", _TEST_API_TOKEN)
    monkeypatch.setattr(api_stream, "agent_registry", AgentRegistry(db_path=agents_db))

    # Redirect everything the persist module writes into the temp directory.
    candidate_dir.mkdir(parents=True, exist_ok=True)
    persist_overrides = (
        ("COMPETENCES_ROOT", competences_root),
        ("CANDIDATE_DIR", candidate_dir),
        ("AUDIT_PATH", competences_root / "persist_audit.jsonl"),
        ("INCOMPLETE_PATH", competences_root / "incomplete_learnings.jsonl"),
    )
    for attr_name, replacement in persist_overrides:
        monkeypatch.setattr(P, attr_name, replacement)

    # Start each test with a clean rate-limit state.
    P.persist_rate_limiter.reset()

    # Server-side exceptions surface as HTTP responses instead of being re-raised.
    return TestClient(api_stream.app, raise_server_exceptions=False), tmp_path
def _good_payload(name="Securite Test", persist_id="uuid-sec-1"):
return {
"name": name,
"machine_id": "machine_sec_x",
"workflow_ir": {
"steps": [{"kind": "click", "parameters": {"target": "OK"}}],
"preconditions": [],
},
"learning_metadata": {"persist_id": persist_id},
}
# ---------------------------------------------------------------------------
# Token Bearer
# ---------------------------------------------------------------------------
class TestPersistAuthToken:
    """Bearer-token checks on the persist endpoint."""

    # Route exercised by every test in this class.
    _ENDPOINT = "/api/v1/lea/competences/candidate/persist"

    def test_no_token_returns_401(self, persist_client):
        """A request without an ``Authorization`` header gets a 401."""
        http, _unused = persist_client
        response = http.post(self._ENDPOINT, json=_good_payload())
        assert response.status_code == 401

    def test_wrong_token_returns_401(self, persist_client):
        """A request carrying a token other than the configured one gets a 401."""
        http, _unused = persist_client
        bad_auth = {"Authorization": "Bearer wrong_token_xyz"}
        response = http.post(self._ENDPOINT, json=_good_payload(), headers=bad_auth)
        assert response.status_code == 401

    def test_valid_token_returns_201(self, persist_client):
        """A well-formed payload sent with the configured token gets a 201."""
        http, _unused = persist_client
        good_auth = {"Authorization": f"Bearer {_TEST_API_TOKEN}"}
        response = http.post(self._ENDPOINT, json=_good_payload(), headers=good_auth)
        assert response.status_code == 201
# ---------------------------------------------------------------------------
# Rate limit
# ---------------------------------------------------------------------------
class TestPersistRateLimit:
    """Rate-limit checks on the persist endpoint."""

    def test_rate_limit_11th_call_returns_429(self, persist_client, monkeypatch):
        """The first call over the per-minute limit gets 429 plus ``Retry-After``.

        The limit is lowered to 3 so the test stays fast and deterministic;
        the call right after the allowed ones must be refused.
        """
        client, _ = persist_client
        from core.competences import persist as P

        limit = 3
        # Patch through monkeypatch so the real limit is restored on teardown
        # even when an assertion below fails; a manual "reset at the end"
        # would be skipped on failure and leak the lowered limit into other tests.
        monkeypatch.setattr(P.persist_rate_limiter, "max_per_minute", limit)
        P.persist_rate_limiter.reset()

        url = "/api/v1/lea/competences/candidate/persist"
        headers = {"Authorization": f"Bearer {_TEST_API_TOKEN}"}

        # Calls within the limit are accepted (201) or reported as a conflict (409).
        for i in range(limit):
            payload = _good_payload(name=f"Rate {i}", persist_id=f"uuid-rate-{i}")
            r = client.post(url, json=payload, headers=headers)
            assert r.status_code in (201, 409), f"call #{i}: {r.text}"

        # The next call exceeds the limit and must be rejected.
        overflow = client.post(
            url,
            json=_good_payload(name="Rate Trop", persist_id="uuid-rate-overflow"),
            headers=headers,
        )
        assert overflow.status_code == 429
        # Compare title-cased names so the check does not depend on header casing.
        assert "Retry-After" in {k.title() for k in overflow.headers.keys()}
# ---------------------------------------------------------------------------
# Path traversal & slug strict
# ---------------------------------------------------------------------------
class TestPersistPathTraversal:
    """Path-traversal and strict-slug checks on the persist endpoint."""

    def test_path_traversal_in_name_blocked(self, persist_client):
        """A ``../../etc/passwd`` name must never cause a write outside the candidate dir.

        The request may be accepted (201, name reduced to a safe slug) or
        refused (400); in both cases nothing may appear at either location the
        traversal could reach.
        """
        client, tmp_path = persist_client

        # Snapshot the absolute target first: asserting on its state after the
        # call alone cannot tell a pre-existing file from one the attack wrote.
        absolute_target = Path("/etc/passwd.yaml")
        existed_before = absolute_target.exists()
        # The fixture places the candidate directory at
        # tmp_path/competences/candidate, so "../../etc" relative to it
        # resolves to tmp_path/etc.
        escaped_dir = tmp_path / "etc"

        resp = client.post(
            "/api/v1/lea/competences/candidate/persist",
            json={
                **_good_payload(),
                "name": "../../etc/passwd",
            },
            headers={"Authorization": f"Bearer {_TEST_API_TOKEN}"},
        )

        assert resp.status_code in (201, 400)
        if resp.status_code == 201:
            yaml_path = resp.json()["yaml_path"]
            assert yaml_path.startswith("data/competences/candidate/")
            # A prefix check alone would accept ".../candidate/../../etc/x.yaml".
            assert ".." not in Path(yaml_path).parts

        # Whatever the status, no file may have been written outside the candidate dir.
        assert not escaped_dir.exists()
        assert absolute_target.exists() == existed_before

    def test_slug_with_null_byte_blocked(self, persist_client):
        """A NUL byte in the name is tolerated or rejected, but never a 500."""
        client, _ = persist_client
        resp = client.post(
            "/api/v1/lea/competences/candidate/persist",
            json={
                **_good_payload(),
                "name": "abc\x00xyz",
            },
            headers={"Authorization": f"Bearer {_TEST_API_TOKEN}"},
        )
        # The NUL control character is expected to be stripped by the slug step
        # (leaving "abcxyz") or to make the request invalid; both outcomes are
        # acceptable, a server error is not.
        assert resp.status_code in (201, 400)
# ---------------------------------------------------------------------------
# PII detection
# ---------------------------------------------------------------------------
class TestPersistPiiDetection:
    """PII-detection checks: payloads containing personal data are refused."""

    # Route exercised by every test in this class.
    _ENDPOINT = "/api/v1/lea/competences/candidate/persist"

    def _assert_rejected_as_pii(self, http, body):
        """POST ``body`` with the valid token and expect 400 / ``pii_detected``."""
        response = http.post(
            self._ENDPOINT,
            json=body,
            headers={"Authorization": f"Bearer {_TEST_API_TOKEN}"},
        )
        assert response.status_code == 400
        assert response.json()["detail"]["error"] == "pii_detected"

    def test_email_in_workflow_rejected(self, persist_client):
        """An e-mail address inside a workflow step value is refused."""
        http, _unused = persist_client
        body = _good_payload(persist_id="uuid-pii-email")
        leaking_step = {
            "kind": "type",
            "parameters": {"value": "patient: john.doe@hopital.fr"},
        }
        body["workflow_ir"]["steps"].append(leaking_step)
        self._assert_rejected_as_pii(http, body)

    def test_phone_in_annotations_rejected(self, persist_client):
        """A phone number inside the semantic annotations is refused."""
        http, _unused = persist_client
        body = _good_payload(persist_id="uuid-pii-phone")
        body["annotations_semantiques"] = {"intent_fr": "appeler 01 23 45 67 89"}
        self._assert_rejected_as_pii(http, body)