feat(security): eval()→AST parseur + pickle→JSON+HMAC signé
Vulnérabilité 1 — eval() dans DAG executor : - Nouveau module safe_condition_evaluator.py - Parseur AST avec whitelist (Constants, Names, Compare, BoolOp, BinOp) - Rejet explicite Call/Lambda/Import/__dunder__/walrus/comprehensions - Expression non sûre → logged ERROR + évaluée à False (pas de crash) - 31 tests (12 valides, 17 malveillantes rejetées, 2 intégration) Vulnérabilité 2 — 3× pickle.load() non sécurisés : - Nouveau module signed_serializer.py (JSON+HMAC-SHA256) - Format : RPA_SIGNED_V1\\n + JSON(hmac + payload base64) - Migration automatique transparente au premier chargement - Fallback pickle avec WARNING (désactivable RPA_ALLOW_PICKLE_FALLBACK=0) - Remplacement dans faiss_manager, visual_embedding_manager, visual_persistence_manager - 13 tests Clé signature : RPA_SIGNING_KEY (fallback TOKEN_SECRET_KEY puis hostname-derived). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
179
tests/unit/test_security_safe_condition.py
Normal file
179
tests/unit/test_security_safe_condition.py
Normal file
@@ -0,0 +1,179 @@
|
||||
"""Tests de sécurité : évaluateur de conditions AST restreint."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from core.execution.safe_condition_evaluator import (
|
||||
SafeConditionEvaluator,
|
||||
UnsafeExpressionError,
|
||||
safe_eval_condition,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cas valides — expressions que les workflows doivent pouvoir évaluer
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestValidExpressions:
|
||||
def test_literal_true(self):
|
||||
assert safe_eval_condition("True", {}) is True
|
||||
|
||||
def test_literal_false(self):
|
||||
assert safe_eval_condition("False", {}) is False
|
||||
|
||||
def test_numeric_comparison(self):
|
||||
assert safe_eval_condition("1 < 2", {}) is True
|
||||
assert safe_eval_condition("2 < 1", {}) is False
|
||||
|
||||
def test_chained_comparison(self):
|
||||
assert safe_eval_condition("1 < 2 < 3", {}) is True
|
||||
assert safe_eval_condition("1 < 3 < 2", {}) is False
|
||||
|
||||
def test_variable_access(self):
|
||||
assert safe_eval_condition("x > 5", {"x": 10}) is True
|
||||
|
||||
def test_subscript_dict(self):
|
||||
ctx = {"results": {"step_1": {"score": 0.9}}}
|
||||
assert safe_eval_condition(
|
||||
"results['step_1']['score'] >= 0.8", ctx
|
||||
) is True
|
||||
|
||||
def test_boolean_and(self):
|
||||
assert safe_eval_condition("True and False", {}) is False
|
||||
assert safe_eval_condition("True and True", {}) is True
|
||||
|
||||
def test_boolean_or(self):
|
||||
assert safe_eval_condition("False or True", {}) is True
|
||||
|
||||
def test_not_operator(self):
|
||||
assert safe_eval_condition("not False", {}) is True
|
||||
|
||||
def test_arithmetic(self):
|
||||
assert safe_eval_condition("(a + b) * 2 > 10", {"a": 3, "b": 4}) is True
|
||||
|
||||
def test_in_operator(self):
|
||||
assert safe_eval_condition("'ok' in status", {"status": ["ok", "done"]}) is True
|
||||
|
||||
def test_list_literal(self):
|
||||
assert safe_eval_condition("x in [1, 2, 3]", {"x": 2}) is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cas malveillants — tentatives d'injection / RCE
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestMaliciousExpressions:
|
||||
"""Toutes ces expressions DOIVENT lever UnsafeExpressionError."""
|
||||
|
||||
def test_rejects_import(self):
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition("__import__('os').system('echo pwn')", {})
|
||||
|
||||
def test_rejects_function_call(self):
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition("print('hello')", {"print": print})
|
||||
|
||||
def test_rejects_eval(self):
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition("eval('1+1')", {})
|
||||
|
||||
def test_rejects_exec(self):
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition("exec('x=1')", {})
|
||||
|
||||
def test_rejects_dunder_attribute(self):
|
||||
# Classique : remonter à __builtins__ via __class__.__mro__
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition("x.__class__", {"x": "abc"})
|
||||
|
||||
def test_rejects_dunder_subclasses(self):
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition(
|
||||
"x.__class__.__mro__[-1].__subclasses__()",
|
||||
{"x": []},
|
||||
)
|
||||
|
||||
def test_rejects_undefined_variable(self):
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition("secret > 0", {})
|
||||
|
||||
def test_rejects_lambda(self):
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition("(lambda: 42)()", {})
|
||||
|
||||
def test_rejects_list_comprehension(self):
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition("[x for x in range(3)]", {})
|
||||
|
||||
def test_rejects_generator(self):
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition("(x for x in [1])", {})
|
||||
|
||||
def test_rejects_walrus(self):
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition("(x := 1)", {})
|
||||
|
||||
def test_rejects_ifexp(self):
|
||||
# IfExp (conditional) non autorisé par défaut — si besoin ajouter plus tard.
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition("1 if True else 2", {})
|
||||
|
||||
def test_rejects_starred(self):
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition("[*x]", {"x": [1, 2]})
|
||||
|
||||
def test_rejects_attribute_call_chain(self):
|
||||
# Même si 'dict' est fourni dans le contexte, on n'autorise pas les
|
||||
# appels de méthode.
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition(
|
||||
"results.keys()", {"results": {"a": 1}}
|
||||
)
|
||||
|
||||
def test_rejects_huge_expression(self):
|
||||
big = "0+" * 1000 + "0"
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition(big, {})
|
||||
|
||||
def test_rejects_syntax_error(self):
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition("1 + ", {})
|
||||
|
||||
def test_rejects_non_string(self):
|
||||
with pytest.raises(UnsafeExpressionError):
|
||||
safe_eval_condition(12345, {}) # type: ignore[arg-type]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Intégration avec DAGExecutor : le step condition doit refuser l'injection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDAGExecutorIntegration:
|
||||
def test_condition_step_refuses_malicious_payload(self):
|
||||
"""Un workflow injectant __import__ dans 'condition' doit être refusé
|
||||
silencieusement (result = False) sans exécuter le code."""
|
||||
from core.execution.dag_executor import DAGExecutor, WorkflowStep, StepType
|
||||
|
||||
executor = DAGExecutor()
|
||||
step = WorkflowStep(
|
||||
step_id="malicious",
|
||||
step_type=StepType.CONDITION,
|
||||
action={"condition": "__import__('os').system('echo PWNED')"},
|
||||
)
|
||||
# Accès direct à la méthode privée pour isoler le comportement.
|
||||
result = executor._execute_condition_step(step, step.action)
|
||||
assert result is False
|
||||
|
||||
def test_condition_step_accepts_safe_expression(self):
|
||||
from core.execution.dag_executor import DAGExecutor, WorkflowStep, StepType
|
||||
|
||||
executor = DAGExecutor()
|
||||
executor._results["step_prev"] = {"ok": True}
|
||||
step = WorkflowStep(
|
||||
step_id="cond",
|
||||
step_type=StepType.CONDITION,
|
||||
action={"condition": "results['step_prev']['ok']"},
|
||||
)
|
||||
result = executor._execute_condition_step(step, step.action)
|
||||
assert result is True
|
||||
239
tests/unit/test_security_signed_serializer.py
Normal file
239
tests/unit/test_security_signed_serializer.py
Normal file
@@ -0,0 +1,239 @@
|
||||
"""Tests de sécurité : sérialiseur JSON signé HMAC.
|
||||
|
||||
Couvre :
|
||||
- round-trip JSON signé
|
||||
- rejet d'un fichier altéré
|
||||
- fallback pickle legacy + migration
|
||||
- intégration FAISSManager (lecture / rejet HMAC)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import pickle
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
from core.security.signed_serializer import (
|
||||
SignatureVerificationError,
|
||||
UnsupportedFormatError,
|
||||
dumps_signed,
|
||||
load_signed,
|
||||
loads_signed,
|
||||
save_signed,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _signing_key(monkeypatch):
|
||||
"""Force une clé de signature stable pour les tests."""
|
||||
monkeypatch.setenv("RPA_SIGNING_KEY", "test-signing-key-for-unit-tests-only")
|
||||
monkeypatch.setenv("RPA_ALLOW_PICKLE_FALLBACK", "1")
|
||||
yield
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Round-trip et types étendus
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRoundTrip:
|
||||
def test_primitive_types(self, tmp_path: Path):
|
||||
payload = {"a": 1, "b": "texte", "c": [1, 2, 3], "d": None}
|
||||
path = tmp_path / "data.json.signed"
|
||||
save_signed(path, payload)
|
||||
assert load_signed(path) == payload
|
||||
|
||||
def test_numpy_roundtrip(self, tmp_path: Path):
|
||||
arr = np.arange(12, dtype=np.float32).reshape(3, 4)
|
||||
path = tmp_path / "arr.json.signed"
|
||||
save_signed(path, {"embedding": arr})
|
||||
loaded = load_signed(path)
|
||||
assert isinstance(loaded["embedding"], np.ndarray)
|
||||
assert loaded["embedding"].shape == (3, 4)
|
||||
assert loaded["embedding"].dtype == np.float32
|
||||
np.testing.assert_array_equal(loaded["embedding"], arr)
|
||||
|
||||
def test_datetime_roundtrip(self, tmp_path: Path):
|
||||
now = datetime(2026, 4, 13, 10, 0, 0)
|
||||
path = tmp_path / "dt.json.signed"
|
||||
save_signed(path, {"created_at": now})
|
||||
loaded = load_signed(path)
|
||||
assert loaded["created_at"] == now
|
||||
|
||||
def test_bytes_payload(self):
|
||||
raw = dumps_signed({"blob": b"\x00\x01\x02"})
|
||||
out = loads_signed(raw)
|
||||
assert out["blob"] == b"\x00\x01\x02"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Rejet d'un fichier altéré
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestTampering:
|
||||
def test_rejects_tampered_payload(self, tmp_path: Path):
|
||||
path = tmp_path / "f.signed"
|
||||
save_signed(path, {"score": 0.5})
|
||||
|
||||
raw = path.read_bytes()
|
||||
# Altérer un caractère quelque part dans le payload base64.
|
||||
idx = raw.rfind(b'"payload_b64":"') + len(b'"payload_b64":"')
|
||||
tampered = raw[:idx] + (b"X" if raw[idx:idx + 1] != b"X" else b"Y") + raw[idx + 1:]
|
||||
path.write_bytes(tampered)
|
||||
|
||||
with pytest.raises((SignatureVerificationError, Exception)):
|
||||
load_signed(path)
|
||||
|
||||
def test_rejects_tampered_hmac(self, tmp_path: Path):
|
||||
path = tmp_path / "f.signed"
|
||||
save_signed(path, {"score": 0.5})
|
||||
|
||||
raw = path.read_bytes()
|
||||
tampered = raw.replace(b'"hmac":"', b'"hmac":"0')
|
||||
path.write_bytes(tampered)
|
||||
|
||||
with pytest.raises(SignatureVerificationError):
|
||||
load_signed(path)
|
||||
|
||||
def test_rejects_wrong_key(self, tmp_path: Path, monkeypatch):
|
||||
path = tmp_path / "f.signed"
|
||||
save_signed(path, {"score": 0.5})
|
||||
|
||||
# Changer la clé : la vérification doit échouer.
|
||||
monkeypatch.setenv("RPA_SIGNING_KEY", "other-key")
|
||||
with pytest.raises(SignatureVerificationError):
|
||||
load_signed(path)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Fallback pickle + migration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestPickleFallback:
|
||||
def test_pickle_fallback_loads_and_migrates(self, tmp_path: Path):
|
||||
# Écrire un vieux fichier pickle (format legacy).
|
||||
path = tmp_path / "legacy.pkl"
|
||||
payload = {"score": 0.42, "label": "legacy"}
|
||||
with open(path, "wb") as fp:
|
||||
pickle.dump(payload, fp)
|
||||
|
||||
# Chargement : doit réussir ET migrer le fichier en signé.
|
||||
loaded = load_signed(path, allow_pickle_fallback=True, migrate_on_fallback=True)
|
||||
assert loaded == payload
|
||||
|
||||
# Le fichier doit maintenant être au format signé.
|
||||
new_raw = path.read_bytes()
|
||||
assert new_raw.startswith(b"RPA_SIGNED_V1\n")
|
||||
|
||||
# Et relisable via le format signé.
|
||||
loaded2 = load_signed(path)
|
||||
assert loaded2 == payload
|
||||
|
||||
def test_pickle_fallback_disabled(self, tmp_path: Path, monkeypatch):
|
||||
monkeypatch.setenv("RPA_ALLOW_PICKLE_FALLBACK", "0")
|
||||
path = tmp_path / "legacy.pkl"
|
||||
with open(path, "wb") as fp:
|
||||
pickle.dump({"x": 1}, fp)
|
||||
|
||||
with pytest.raises(UnsupportedFormatError):
|
||||
load_signed(path)
|
||||
|
||||
def test_pickle_fallback_explicit_off(self, tmp_path: Path):
|
||||
path = tmp_path / "legacy.pkl"
|
||||
with open(path, "wb") as fp:
|
||||
pickle.dump({"x": 1}, fp)
|
||||
|
||||
with pytest.raises(UnsupportedFormatError):
|
||||
load_signed(path, allow_pickle_fallback=False)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Intégration FAISSManager
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
pytest.importorskip("faiss", reason="FAISS non installé.")
|
||||
|
||||
|
||||
class TestFAISSManagerSignedMetadata:
|
||||
def test_save_and_load_roundtrip(self, tmp_path: Path):
|
||||
from core.embedding.faiss_manager import FAISSManager
|
||||
|
||||
manager = FAISSManager(dimensions=8, index_type="Flat", metric="cosine")
|
||||
vec = np.random.rand(8).astype(np.float32)
|
||||
manager.add_embedding("emb_1", vec, metadata={"label": "target"})
|
||||
|
||||
index_path = tmp_path / "index.bin"
|
||||
meta_path = tmp_path / "meta.signed"
|
||||
manager.save(index_path, meta_path)
|
||||
|
||||
# Le fichier métadonnées doit être signé.
|
||||
raw = meta_path.read_bytes()
|
||||
assert raw.startswith(b"RPA_SIGNED_V1\n")
|
||||
|
||||
# Recharger.
|
||||
reloaded = FAISSManager.load(index_path, meta_path)
|
||||
assert reloaded.dimensions == 8
|
||||
assert reloaded.next_id == 1
|
||||
assert 0 in reloaded.metadata_store
|
||||
assert reloaded.metadata_store[0]["embedding_id"] == "emb_1"
|
||||
|
||||
def test_load_refuses_tampered_metadata(self, tmp_path: Path):
|
||||
from core.embedding.faiss_manager import FAISSManager
|
||||
|
||||
manager = FAISSManager(dimensions=4, index_type="Flat", metric="cosine")
|
||||
manager.add_embedding("e", np.ones(4, dtype=np.float32), metadata={})
|
||||
|
||||
index_path = tmp_path / "index.bin"
|
||||
meta_path = tmp_path / "meta.signed"
|
||||
manager.save(index_path, meta_path)
|
||||
|
||||
# Altérer la signature du fichier.
|
||||
raw = meta_path.read_bytes()
|
||||
meta_path.write_bytes(raw.replace(b'"hmac":"', b'"hmac":"0'))
|
||||
|
||||
with pytest.raises(SignatureVerificationError):
|
||||
FAISSManager.load(index_path, meta_path)
|
||||
|
||||
def test_load_migrates_legacy_pickle(self, tmp_path: Path):
|
||||
"""Un fichier métadonnées pickle legacy doit être migré."""
|
||||
from core.embedding.faiss_manager import FAISSManager
|
||||
import faiss
|
||||
|
||||
# Construire manuellement un fichier legacy (comme l'ancienne version).
|
||||
manager = FAISSManager(dimensions=4, index_type="Flat", metric="cosine")
|
||||
vec = np.ones(4, dtype=np.float32)
|
||||
manager.add_embedding("legacy_emb", vec, metadata={"tag": "old"})
|
||||
|
||||
index_path = tmp_path / "index.bin"
|
||||
meta_path = tmp_path / "meta.pkl"
|
||||
|
||||
# Écrire l'index FAISS normalement...
|
||||
index_to_save = manager.index
|
||||
faiss.write_index(index_to_save, str(index_path))
|
||||
|
||||
# ...mais les métadonnées en pickle brut (format pré-correctif).
|
||||
legacy = {
|
||||
"dimensions": 4,
|
||||
"index_type": "Flat",
|
||||
"metric": "cosine",
|
||||
"next_id": manager.next_id,
|
||||
"metadata_store": manager.metadata_store,
|
||||
"nlist": None,
|
||||
"nprobe": 8,
|
||||
"is_trained": True,
|
||||
"auto_optimize": True,
|
||||
}
|
||||
with open(meta_path, "wb") as fp:
|
||||
pickle.dump(legacy, fp)
|
||||
|
||||
# Chargement : doit réussir + migrer vers format signé.
|
||||
reloaded = FAISSManager.load(index_path, meta_path)
|
||||
assert reloaded.dimensions == 4
|
||||
assert reloaded.next_id == 1
|
||||
|
||||
# Le fichier a été ré-écrit en signé.
|
||||
assert meta_path.read_bytes().startswith(b"RPA_SIGNED_V1\n")
|
||||
Reference in New Issue
Block a user