"""Tests securite /api/v1/lea/competences/candidate/persist. Specs ยง6 : - Token Bearer obligatoire - Couplage machine_id (via guard fleet) - Rate limit 10/min/machine_id - Path traversal interdit (slug strict) - PII detection (regle d'or HDS) """ from __future__ import annotations import sys from pathlib import Path import pytest _ROOT = str(Path(__file__).resolve().parents[2]) if _ROOT not in sys.path: sys.path.insert(0, _ROOT) pytestmark = pytest.mark.security _TEST_API_TOKEN = "test_persist_security_token_xyz" @pytest.fixture def persist_client(monkeypatch, tmp_path): monkeypatch.setenv("RPA_API_TOKEN", _TEST_API_TOKEN) monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False) monkeypatch.setenv("RPA_AGENTS_DB_PATH", str(tmp_path / "agents.db")) from fastapi.testclient import TestClient from agent_v0.server_v1 import api_stream from agent_v0.server_v1.agent_registry import AgentRegistry from core.competences import persist as P monkeypatch.setattr(api_stream, "API_TOKEN", _TEST_API_TOKEN) test_registry = AgentRegistry(db_path=str(tmp_path / "agents.db")) monkeypatch.setattr(api_stream, "agent_registry", test_registry) candidate_dir = tmp_path / "competences" / "candidate" candidate_dir.mkdir(parents=True, exist_ok=True) monkeypatch.setattr(P, "COMPETENCES_ROOT", tmp_path / "competences") monkeypatch.setattr(P, "CANDIDATE_DIR", candidate_dir) monkeypatch.setattr(P, "AUDIT_PATH", tmp_path / "competences" / "persist_audit.jsonl") monkeypatch.setattr( P, "INCOMPLETE_PATH", tmp_path / "competences" / "incomplete_learnings.jsonl" ) P.persist_rate_limiter.reset() client = TestClient(api_stream.app, raise_server_exceptions=False) return client, tmp_path def _good_payload(name="Securite Test", persist_id="uuid-sec-1"): return { "name": name, "machine_id": "machine_sec_x", "workflow_ir": { "steps": [{"kind": "click", "parameters": {"target": "OK"}}], "preconditions": [], }, "learning_metadata": {"persist_id": persist_id}, } # --------------------------------------------------------------------------- # Token Bearer # --------------------------------------------------------------------------- class TestPersistAuthToken: def test_no_token_returns_401(self, persist_client): client, _ = persist_client resp = client.post( "/api/v1/lea/competences/candidate/persist", json=_good_payload(), ) assert resp.status_code == 401 def test_wrong_token_returns_401(self, persist_client): client, _ = persist_client resp = client.post( "/api/v1/lea/competences/candidate/persist", json=_good_payload(), headers={"Authorization": "Bearer wrong_token_xyz"}, ) assert resp.status_code == 401 def test_valid_token_returns_201(self, persist_client): client, _ = persist_client resp = client.post( "/api/v1/lea/competences/candidate/persist", json=_good_payload(), headers={"Authorization": f"Bearer {_TEST_API_TOKEN}"}, ) assert resp.status_code == 201 # --------------------------------------------------------------------------- # Rate limit # --------------------------------------------------------------------------- class TestPersistRateLimit: def test_rate_limit_11th_call_returns_429(self, persist_client, monkeypatch): client, _ = persist_client from core.competences import persist as P # Forcer max_per_minute=3 pour rendre le test rapide et deterministe P.persist_rate_limiter.max_per_minute = 3 P.persist_rate_limiter.reset() headers = {"Authorization": f"Bearer {_TEST_API_TOKEN}"} # 3 appels OK for i in range(3): payload = _good_payload(name=f"Rate {i}", persist_id=f"uuid-rate-{i}") r = client.post( "/api/v1/lea/competences/candidate/persist", json=payload, headers=headers, ) assert r.status_code in (201, 409), f"call #{i}: {r.text}" # 4eme appel -> 429 r4 = client.post( "/api/v1/lea/competences/candidate/persist", json=_good_payload(name="Rate Trop", persist_id="uuid-rate-overflow"), headers=headers, ) assert r4.status_code == 429 assert "Retry-After" in {k.title() for k in r4.headers.keys()} # Cleanup pour ne pas polluer d'autres tests P.persist_rate_limiter.max_per_minute = 10 # --------------------------------------------------------------------------- # Path traversal & slug strict # --------------------------------------------------------------------------- class TestPersistPathTraversal: def test_path_traversal_in_name_blocked(self, persist_client): client, tmp_path = persist_client resp = client.post( "/api/v1/lea/competences/candidate/persist", json={ **_good_payload(), "name": "../../etc/passwd", }, headers={"Authorization": f"Bearer {_TEST_API_TOKEN}"}, ) # Le slug strict supprime les `/`, `.`, etc. -> resultat = 'etcpasswd' # ou bien rejete si la longueur tombe sous le minimum. # Dans tous les cas, AUCUN fichier ne doit etre ecrit hors CANDIDATE_DIR. if resp.status_code == 201: yaml_path = resp.json()["yaml_path"] assert yaml_path.startswith("data/competences/candidate/") # Verifier aucun fichier hors candidate etc_target = Path("/etc/passwd.yaml") assert not etc_target.exists() or etc_target.is_file() # existant ok else: assert resp.status_code == 400 def test_slug_with_null_byte_blocked(self, persist_client): client, _ = persist_client resp = client.post( "/api/v1/lea/competences/candidate/persist", json={ **_good_payload(), "name": "abc\x00xyz", }, headers={"Authorization": f"Bearer {_TEST_API_TOKEN}"}, ) # null byte est non-ASCII -> retire par slugify -> "abcxyz" valide # ou rejet si l'encodage casse. Tolerer les deux mais pas de 500. assert resp.status_code in (201, 400) # --------------------------------------------------------------------------- # PII detection # --------------------------------------------------------------------------- class TestPersistPiiDetection: def test_email_in_workflow_rejected(self, persist_client): client, _ = persist_client payload = _good_payload(persist_id="uuid-pii-email") payload["workflow_ir"]["steps"].append( {"kind": "type", "parameters": {"value": "patient: john.doe@hopital.fr"}} ) resp = client.post( "/api/v1/lea/competences/candidate/persist", json=payload, headers={"Authorization": f"Bearer {_TEST_API_TOKEN}"}, ) assert resp.status_code == 400 assert resp.json()["detail"]["error"] == "pii_detected" def test_phone_in_annotations_rejected(self, persist_client): client, _ = persist_client payload = _good_payload(persist_id="uuid-pii-phone") payload["annotations_semantiques"] = {"intent_fr": "appeler 01 23 45 67 89"} resp = client.post( "/api/v1/lea/competences/candidate/persist", json=payload, headers={"Authorization": f"Bearer {_TEST_API_TOKEN}"}, ) assert resp.status_code == 400 assert resp.json()["detail"]["error"] == "pii_detected"