213 lines
7.6 KiB
Python
213 lines
7.6 KiB
Python
"""Tests securite /api/v1/lea/competences/candidate/persist.
|
|
|
|
Specs §6 :
|
|
- Token Bearer obligatoire
|
|
- Couplage machine_id (via guard fleet)
|
|
- Rate limit 10/min/machine_id
|
|
- Path traversal interdit (slug strict)
|
|
- PII detection (regle d'or HDS)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
_ROOT = str(Path(__file__).resolve().parents[2])
|
|
if _ROOT not in sys.path:
|
|
sys.path.insert(0, _ROOT)
|
|
|
|
|
|
pytestmark = pytest.mark.security
|
|
|
|
_TEST_API_TOKEN = "test_persist_security_token_xyz"
|
|
|
|
|
|
@pytest.fixture
|
|
def persist_client(monkeypatch, tmp_path):
|
|
monkeypatch.setenv("RPA_API_TOKEN", _TEST_API_TOKEN)
|
|
monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False)
|
|
monkeypatch.setenv("RPA_AGENTS_DB_PATH", str(tmp_path / "agents.db"))
|
|
|
|
from fastapi.testclient import TestClient
|
|
from agent_v0.server_v1 import api_stream
|
|
from agent_v0.server_v1.agent_registry import AgentRegistry
|
|
from core.competences import persist as P
|
|
|
|
monkeypatch.setattr(api_stream, "API_TOKEN", _TEST_API_TOKEN)
|
|
test_registry = AgentRegistry(db_path=str(tmp_path / "agents.db"))
|
|
monkeypatch.setattr(api_stream, "agent_registry", test_registry)
|
|
|
|
candidate_dir = tmp_path / "competences" / "candidate"
|
|
candidate_dir.mkdir(parents=True, exist_ok=True)
|
|
monkeypatch.setattr(P, "COMPETENCES_ROOT", tmp_path / "competences")
|
|
monkeypatch.setattr(P, "CANDIDATE_DIR", candidate_dir)
|
|
monkeypatch.setattr(P, "AUDIT_PATH", tmp_path / "competences" / "persist_audit.jsonl")
|
|
monkeypatch.setattr(
|
|
P, "INCOMPLETE_PATH", tmp_path / "competences" / "incomplete_learnings.jsonl"
|
|
)
|
|
P.persist_rate_limiter.reset()
|
|
|
|
client = TestClient(api_stream.app, raise_server_exceptions=False)
|
|
return client, tmp_path
|
|
|
|
|
|
def _good_payload(name="Securite Test", persist_id="uuid-sec-1"):
|
|
return {
|
|
"name": name,
|
|
"machine_id": "machine_sec_x",
|
|
"workflow_ir": {
|
|
"steps": [{"kind": "click", "parameters": {"target": "OK"}}],
|
|
"preconditions": [],
|
|
},
|
|
"learning_metadata": {"persist_id": persist_id},
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Token Bearer
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPersistAuthToken:
|
|
def test_no_token_returns_401(self, persist_client):
|
|
client, _ = persist_client
|
|
resp = client.post(
|
|
"/api/v1/lea/competences/candidate/persist",
|
|
json=_good_payload(),
|
|
)
|
|
assert resp.status_code == 401
|
|
|
|
def test_wrong_token_returns_401(self, persist_client):
|
|
client, _ = persist_client
|
|
resp = client.post(
|
|
"/api/v1/lea/competences/candidate/persist",
|
|
json=_good_payload(),
|
|
headers={"Authorization": "Bearer wrong_token_xyz"},
|
|
)
|
|
assert resp.status_code == 401
|
|
|
|
def test_valid_token_returns_201(self, persist_client):
|
|
client, _ = persist_client
|
|
resp = client.post(
|
|
"/api/v1/lea/competences/candidate/persist",
|
|
json=_good_payload(),
|
|
headers={"Authorization": f"Bearer {_TEST_API_TOKEN}"},
|
|
)
|
|
assert resp.status_code == 201
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Rate limit
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPersistRateLimit:
|
|
def test_rate_limit_11th_call_returns_429(self, persist_client, monkeypatch):
|
|
client, _ = persist_client
|
|
from core.competences import persist as P
|
|
|
|
# Forcer max_per_minute=3 pour rendre le test rapide et deterministe
|
|
P.persist_rate_limiter.max_per_minute = 3
|
|
P.persist_rate_limiter.reset()
|
|
|
|
headers = {"Authorization": f"Bearer {_TEST_API_TOKEN}"}
|
|
# 3 appels OK
|
|
for i in range(3):
|
|
payload = _good_payload(name=f"Rate {i}", persist_id=f"uuid-rate-{i}")
|
|
r = client.post(
|
|
"/api/v1/lea/competences/candidate/persist",
|
|
json=payload,
|
|
headers=headers,
|
|
)
|
|
assert r.status_code in (201, 409), f"call #{i}: {r.text}"
|
|
# 4eme appel -> 429
|
|
r4 = client.post(
|
|
"/api/v1/lea/competences/candidate/persist",
|
|
json=_good_payload(name="Rate Trop", persist_id="uuid-rate-overflow"),
|
|
headers=headers,
|
|
)
|
|
assert r4.status_code == 429
|
|
assert "Retry-After" in {k.title() for k in r4.headers.keys()}
|
|
|
|
# Cleanup pour ne pas polluer d'autres tests
|
|
P.persist_rate_limiter.max_per_minute = 10
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Path traversal & slug strict
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPersistPathTraversal:
|
|
def test_path_traversal_in_name_blocked(self, persist_client):
|
|
client, tmp_path = persist_client
|
|
resp = client.post(
|
|
"/api/v1/lea/competences/candidate/persist",
|
|
json={
|
|
**_good_payload(),
|
|
"name": "../../etc/passwd",
|
|
},
|
|
headers={"Authorization": f"Bearer {_TEST_API_TOKEN}"},
|
|
)
|
|
# Le slug strict supprime les `/`, `.`, etc. -> resultat = 'etcpasswd'
|
|
# ou bien rejete si la longueur tombe sous le minimum.
|
|
# Dans tous les cas, AUCUN fichier ne doit etre ecrit hors CANDIDATE_DIR.
|
|
if resp.status_code == 201:
|
|
yaml_path = resp.json()["yaml_path"]
|
|
assert yaml_path.startswith("data/competences/candidate/")
|
|
# Verifier aucun fichier hors candidate
|
|
etc_target = Path("/etc/passwd.yaml")
|
|
assert not etc_target.exists() or etc_target.is_file() # existant ok
|
|
else:
|
|
assert resp.status_code == 400
|
|
|
|
def test_slug_with_null_byte_blocked(self, persist_client):
|
|
client, _ = persist_client
|
|
resp = client.post(
|
|
"/api/v1/lea/competences/candidate/persist",
|
|
json={
|
|
**_good_payload(),
|
|
"name": "abc\x00xyz",
|
|
},
|
|
headers={"Authorization": f"Bearer {_TEST_API_TOKEN}"},
|
|
)
|
|
# null byte est non-ASCII -> retire par slugify -> "abcxyz" valide
|
|
# ou rejet si l'encodage casse. Tolerer les deux mais pas de 500.
|
|
assert resp.status_code in (201, 400)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PII detection
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPersistPiiDetection:
|
|
def test_email_in_workflow_rejected(self, persist_client):
|
|
client, _ = persist_client
|
|
payload = _good_payload(persist_id="uuid-pii-email")
|
|
payload["workflow_ir"]["steps"].append(
|
|
{"kind": "type", "parameters": {"value": "patient: john.doe@hopital.fr"}}
|
|
)
|
|
resp = client.post(
|
|
"/api/v1/lea/competences/candidate/persist",
|
|
json=payload,
|
|
headers={"Authorization": f"Bearer {_TEST_API_TOKEN}"},
|
|
)
|
|
assert resp.status_code == 400
|
|
assert resp.json()["detail"]["error"] == "pii_detected"
|
|
|
|
def test_phone_in_annotations_rejected(self, persist_client):
|
|
client, _ = persist_client
|
|
payload = _good_payload(persist_id="uuid-pii-phone")
|
|
payload["annotations_semantiques"] = {"intent_fr": "appeler 01 23 45 67 89"}
|
|
resp = client.post(
|
|
"/api/v1/lea/competences/candidate/persist",
|
|
json=payload,
|
|
headers={"Authorization": f"Bearer {_TEST_API_TOKEN}"},
|
|
)
|
|
assert resp.status_code == 400
|
|
assert resp.json()["detail"]["error"] == "pii_detected"
|