Files
rpa_vision_v3/tests/unit/test_security_signed_serializer.py
Dom 36737cfe9d feat(security): eval()→AST parseur + pickle→JSON+HMAC signé
Vulnérabilité 1 — eval() dans DAG executor :
- Nouveau module safe_condition_evaluator.py
- Parseur AST avec whitelist (Constants, Names, Compare, BoolOp, BinOp)
- Rejet explicite Call/Lambda/Import/__dunder__/walrus/comprehensions
- Expression non sûre → logged ERROR + évaluée à False (pas de crash)
- 31 tests (12 valides, 17 malveillantes rejetées, 2 intégration)

Vulnérabilité 2 — 3× pickle.load() non sécurisés :
- Nouveau module signed_serializer.py (JSON+HMAC-SHA256)
- Format : RPA_SIGNED_V1\n + JSON(hmac + payload base64)
- Migration automatique transparente au premier chargement
- Fallback pickle avec WARNING (désactivable RPA_ALLOW_PICKLE_FALLBACK=0)
- Remplacement dans faiss_manager, visual_embedding_manager,
  visual_persistence_manager
- 13 tests

Clé signature : RPA_SIGNING_KEY (fallback TOKEN_SECRET_KEY puis hostname-derived).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-14 16:49:17 +02:00

240 lines
8.4 KiB
Python

"""Tests de sécurité : sérialiseur JSON signé HMAC.
Couvre :
- round-trip JSON signé
- rejet d'un fichier altéré
- fallback pickle legacy + migration
- intégration FAISSManager (lecture / rejet HMAC)
"""
from __future__ import annotations

import importlib.util
import json
import os
import pickle
from datetime import datetime
from pathlib import Path

import numpy as np
import pytest

from core.security.signed_serializer import (
    SignatureVerificationError,
    UnsupportedFormatError,
    dumps_signed,
    load_signed,
    loads_signed,
    save_signed,
)
@pytest.fixture(autouse=True)
def _signing_key(monkeypatch):
    """Pin a deterministic signing key and permissive pickle fallback for every test."""
    for variable, value in (
        ("RPA_SIGNING_KEY", "test-signing-key-for-unit-tests-only"),
        ("RPA_ALLOW_PICKLE_FALLBACK", "1"),
    ):
        monkeypatch.setenv(variable, value)
    yield
# ---------------------------------------------------------------------------
# Round-trip et types étendus
# ---------------------------------------------------------------------------
class TestRoundTrip:
    """Round-trip of the signed JSON format across the supported value types."""

    def test_primitive_types(self, tmp_path: Path):
        """JSON-native values survive a save/load cycle unchanged."""
        data = {"a": 1, "b": "texte", "c": [1, 2, 3], "d": None}
        target = tmp_path / "data.json.signed"
        save_signed(target, data)
        assert load_signed(target) == data

    def test_numpy_roundtrip(self, tmp_path: Path):
        """A float32 ndarray keeps its shape, dtype and contents."""
        original = np.arange(12, dtype=np.float32).reshape(3, 4)
        target = tmp_path / "arr.json.signed"
        save_signed(target, {"embedding": original})
        restored = load_signed(target)["embedding"]
        assert isinstance(restored, np.ndarray)
        assert restored.shape == (3, 4)
        assert restored.dtype == np.float32
        np.testing.assert_array_equal(restored, original)

    def test_datetime_roundtrip(self, tmp_path: Path):
        """A naive datetime is restored to an equal value."""
        stamp = datetime(2026, 4, 13, 10, 0, 0)
        target = tmp_path / "dt.json.signed"
        save_signed(target, {"created_at": stamp})
        assert load_signed(target)["created_at"] == stamp

    def test_bytes_payload(self):
        """Raw bytes survive the in-memory dumps/loads pair."""
        signed = dumps_signed({"blob": b"\x00\x01\x02"})
        assert loads_signed(signed)["blob"] == b"\x00\x01\x02"
# ---------------------------------------------------------------------------
# Rejet d'un fichier altéré
# ---------------------------------------------------------------------------
class TestTampering:
    """Any byte-level modification of a signed file must be rejected."""

    def test_rejects_tampered_payload(self, tmp_path: Path):
        """Flipping one character of the base64 payload must break verification.

        FIX: the original asserted ``pytest.raises((SignatureVerificationError,
        Exception))`` — a tuple containing ``Exception`` accepts *any* error, so
        the test passed even if verification raised an unrelated crash.  The
        HMAC covers the payload, so a tampered payload must surface as
        ``SignatureVerificationError`` specifically (assumes the serializer
        verifies the HMAC before decoding the payload — consistent with the
        behavior pinned by test_rejects_tampered_hmac).
        """
        path = tmp_path / "f.signed"
        save_signed(path, {"score": 0.5})
        raw = path.read_bytes()
        # Locate the first character of the base64 payload and flip it.
        marker = b'"payload_b64":"'
        start = raw.rfind(marker)
        # FIX: guard against rfind() returning -1, which would silently have
        # corrupted an unrelated byte near the end of the file.
        assert start != -1, "signed file is missing the payload_b64 field"
        idx = start + len(marker)
        flipped = b"X" if raw[idx:idx + 1] != b"X" else b"Y"
        path.write_bytes(raw[:idx] + flipped + raw[idx + 1:])
        with pytest.raises(SignatureVerificationError):
            load_signed(path)

    def test_rejects_tampered_hmac(self, tmp_path: Path):
        """Corrupting the stored HMAC digest must fail verification."""
        path = tmp_path / "f.signed"
        save_signed(path, {"score": 0.5})
        raw = path.read_bytes()
        # Prepend a character to the digest so it can no longer match.
        tampered = raw.replace(b'"hmac":"', b'"hmac":"0')
        path.write_bytes(tampered)
        with pytest.raises(SignatureVerificationError):
            load_signed(path)

    def test_rejects_wrong_key(self, tmp_path: Path, monkeypatch):
        """A file signed under one key must not verify under another."""
        path = tmp_path / "f.signed"
        save_signed(path, {"score": 0.5})
        # Swap the signing key: verification must now fail.
        monkeypatch.setenv("RPA_SIGNING_KEY", "other-key")
        with pytest.raises(SignatureVerificationError):
            load_signed(path)
# ---------------------------------------------------------------------------
# Fallback pickle + migration
# ---------------------------------------------------------------------------
class TestPickleFallback:
    """Legacy pickle files: opt-in loading, automatic migration, hard refusal."""

    @staticmethod
    def _write_legacy(path: Path, payload) -> None:
        """Write *payload* to *path* as a raw pickle (the pre-fix format)."""
        with open(path, "wb") as handle:
            pickle.dump(payload, handle)

    def test_pickle_fallback_loads_and_migrates(self, tmp_path: Path):
        """A legacy pickle is readable and gets rewritten in the signed format."""
        legacy_path = tmp_path / "legacy.pkl"
        expected = {"score": 0.42, "label": "legacy"}
        self._write_legacy(legacy_path, expected)
        # Loading must succeed AND migrate the file to the signed format.
        result = load_signed(
            legacy_path, allow_pickle_fallback=True, migrate_on_fallback=True
        )
        assert result == expected
        # The on-disk file now carries the signed-format header...
        assert legacy_path.read_bytes().startswith(b"RPA_SIGNED_V1\n")
        # ...and is readable through the strict signed path.
        assert load_signed(legacy_path) == expected

    def test_pickle_fallback_disabled(self, tmp_path: Path, monkeypatch):
        """RPA_ALLOW_PICKLE_FALLBACK=0 turns legacy files into hard errors."""
        monkeypatch.setenv("RPA_ALLOW_PICKLE_FALLBACK", "0")
        legacy_path = tmp_path / "legacy.pkl"
        self._write_legacy(legacy_path, {"x": 1})
        with pytest.raises(UnsupportedFormatError):
            load_signed(legacy_path)

    def test_pickle_fallback_explicit_off(self, tmp_path: Path):
        """An explicit allow_pickle_fallback=False overrides the env default."""
        legacy_path = tmp_path / "legacy.pkl"
        self._write_legacy(legacy_path, {"x": 1})
        with pytest.raises(UnsupportedFormatError):
            load_signed(legacy_path, allow_pickle_fallback=False)
# ---------------------------------------------------------------------------
# Intégration FAISSManager
# ---------------------------------------------------------------------------
# FIX: the original called pytest.importorskip("faiss") at module level, which
# skips the ENTIRE module when FAISS is absent — including the serializer unit
# tests above that do not need FAISS at all.  Guard only this integration class.
@pytest.mark.skipif(
    importlib.util.find_spec("faiss") is None, reason="FAISS non installé."
)
class TestFAISSManagerSignedMetadata:
    """Integration: FAISSManager metadata goes through the signed serializer."""

    def test_save_and_load_roundtrip(self, tmp_path: Path):
        """Saved metadata is signed on disk and loads back intact."""
        from core.embedding.faiss_manager import FAISSManager

        manager = FAISSManager(dimensions=8, index_type="Flat", metric="cosine")
        vec = np.random.rand(8).astype(np.float32)
        manager.add_embedding("emb_1", vec, metadata={"label": "target"})
        index_path = tmp_path / "index.bin"
        meta_path = tmp_path / "meta.signed"
        manager.save(index_path, meta_path)
        # The metadata file must carry the signed-format header.
        raw = meta_path.read_bytes()
        assert raw.startswith(b"RPA_SIGNED_V1\n")
        # Reload and check the manager state survived the round trip.
        reloaded = FAISSManager.load(index_path, meta_path)
        assert reloaded.dimensions == 8
        assert reloaded.next_id == 1
        assert 0 in reloaded.metadata_store
        assert reloaded.metadata_store[0]["embedding_id"] == "emb_1"

    def test_load_refuses_tampered_metadata(self, tmp_path: Path):
        """A corrupted HMAC on the metadata file must make loading fail."""
        from core.embedding.faiss_manager import FAISSManager

        manager = FAISSManager(dimensions=4, index_type="Flat", metric="cosine")
        manager.add_embedding("e", np.ones(4, dtype=np.float32), metadata={})
        index_path = tmp_path / "index.bin"
        meta_path = tmp_path / "meta.signed"
        manager.save(index_path, meta_path)
        # Corrupt the stored signature.
        raw = meta_path.read_bytes()
        meta_path.write_bytes(raw.replace(b'"hmac":"', b'"hmac":"0'))
        with pytest.raises(SignatureVerificationError):
            FAISSManager.load(index_path, meta_path)

    def test_load_migrates_legacy_pickle(self, tmp_path: Path):
        """A legacy pickle metadata file is loaded, then rewritten signed."""
        import faiss

        from core.embedding.faiss_manager import FAISSManager

        # Recreate a pre-fix on-disk layout by hand.
        manager = FAISSManager(dimensions=4, index_type="Flat", metric="cosine")
        vec = np.ones(4, dtype=np.float32)
        manager.add_embedding("legacy_emb", vec, metadata={"tag": "old"})
        index_path = tmp_path / "index.bin"
        meta_path = tmp_path / "meta.pkl"
        # The FAISS index itself is written normally...
        faiss.write_index(manager.index, str(index_path))
        # ...but the metadata goes out as raw pickle (pre-fix format).
        legacy = {
            "dimensions": 4,
            "index_type": "Flat",
            "metric": "cosine",
            "next_id": manager.next_id,
            "metadata_store": manager.metadata_store,
            "nlist": None,
            "nprobe": 8,
            "is_trained": True,
            "auto_optimize": True,
        }
        with open(meta_path, "wb") as fp:
            pickle.dump(legacy, fp)
        # Loading must succeed and migrate the file to the signed format.
        reloaded = FAISSManager.load(index_path, meta_path)
        assert reloaded.dimensions == 4
        assert reloaded.next_id == 1
        assert meta_path.read_bytes().startswith(b"RPA_SIGNED_V1\n")