"""Tests de sécurité : sérialiseur JSON signé HMAC. Couvre : - round-trip JSON signé - rejet d'un fichier altéré - fallback pickle legacy + migration - intégration FAISSManager (lecture / rejet HMAC) """ from __future__ import annotations import json import os import pickle from datetime import datetime from pathlib import Path import numpy as np import pytest from core.security.signed_serializer import ( SignatureVerificationError, UnsupportedFormatError, dumps_signed, load_signed, loads_signed, save_signed, ) @pytest.fixture(autouse=True) def _signing_key(monkeypatch): """Force une clé de signature stable pour les tests.""" monkeypatch.setenv("RPA_SIGNING_KEY", "test-signing-key-for-unit-tests-only") monkeypatch.setenv("RPA_ALLOW_PICKLE_FALLBACK", "1") yield # --------------------------------------------------------------------------- # Round-trip et types étendus # --------------------------------------------------------------------------- class TestRoundTrip: def test_primitive_types(self, tmp_path: Path): payload = {"a": 1, "b": "texte", "c": [1, 2, 3], "d": None} path = tmp_path / "data.json.signed" save_signed(path, payload) assert load_signed(path) == payload def test_numpy_roundtrip(self, tmp_path: Path): arr = np.arange(12, dtype=np.float32).reshape(3, 4) path = tmp_path / "arr.json.signed" save_signed(path, {"embedding": arr}) loaded = load_signed(path) assert isinstance(loaded["embedding"], np.ndarray) assert loaded["embedding"].shape == (3, 4) assert loaded["embedding"].dtype == np.float32 np.testing.assert_array_equal(loaded["embedding"], arr) def test_datetime_roundtrip(self, tmp_path: Path): now = datetime(2026, 4, 13, 10, 0, 0) path = tmp_path / "dt.json.signed" save_signed(path, {"created_at": now}) loaded = load_signed(path) assert loaded["created_at"] == now def test_bytes_payload(self): raw = dumps_signed({"blob": b"\x00\x01\x02"}) out = loads_signed(raw) assert out["blob"] == b"\x00\x01\x02" # --------------------------------------------------------------------------- # Rejet d'un fichier altéré # --------------------------------------------------------------------------- class TestTampering: def test_rejects_tampered_payload(self, tmp_path: Path): path = tmp_path / "f.signed" save_signed(path, {"score": 0.5}) raw = path.read_bytes() # Altérer un caractère quelque part dans le payload base64. idx = raw.rfind(b'"payload_b64":"') + len(b'"payload_b64":"') tampered = raw[:idx] + (b"X" if raw[idx:idx + 1] != b"X" else b"Y") + raw[idx + 1:] path.write_bytes(tampered) with pytest.raises((SignatureVerificationError, Exception)): load_signed(path) def test_rejects_tampered_hmac(self, tmp_path: Path): path = tmp_path / "f.signed" save_signed(path, {"score": 0.5}) raw = path.read_bytes() tampered = raw.replace(b'"hmac":"', b'"hmac":"0') path.write_bytes(tampered) with pytest.raises(SignatureVerificationError): load_signed(path) def test_rejects_wrong_key(self, tmp_path: Path, monkeypatch): path = tmp_path / "f.signed" save_signed(path, {"score": 0.5}) # Changer la clé : la vérification doit échouer. monkeypatch.setenv("RPA_SIGNING_KEY", "other-key") with pytest.raises(SignatureVerificationError): load_signed(path) # --------------------------------------------------------------------------- # Fallback pickle + migration # --------------------------------------------------------------------------- class TestPickleFallback: def test_pickle_fallback_loads_and_migrates(self, tmp_path: Path): # Écrire un vieux fichier pickle (format legacy). path = tmp_path / "legacy.pkl" payload = {"score": 0.42, "label": "legacy"} with open(path, "wb") as fp: pickle.dump(payload, fp) # Chargement : doit réussir ET migrer le fichier en signé. loaded = load_signed(path, allow_pickle_fallback=True, migrate_on_fallback=True) assert loaded == payload # Le fichier doit maintenant être au format signé. new_raw = path.read_bytes() assert new_raw.startswith(b"RPA_SIGNED_V1\n") # Et relisable via le format signé. loaded2 = load_signed(path) assert loaded2 == payload def test_pickle_fallback_disabled(self, tmp_path: Path, monkeypatch): monkeypatch.setenv("RPA_ALLOW_PICKLE_FALLBACK", "0") path = tmp_path / "legacy.pkl" with open(path, "wb") as fp: pickle.dump({"x": 1}, fp) with pytest.raises(UnsupportedFormatError): load_signed(path) def test_pickle_fallback_explicit_off(self, tmp_path: Path): path = tmp_path / "legacy.pkl" with open(path, "wb") as fp: pickle.dump({"x": 1}, fp) with pytest.raises(UnsupportedFormatError): load_signed(path, allow_pickle_fallback=False) # --------------------------------------------------------------------------- # Intégration FAISSManager # --------------------------------------------------------------------------- pytest.importorskip("faiss", reason="FAISS non installé.") class TestFAISSManagerSignedMetadata: def test_save_and_load_roundtrip(self, tmp_path: Path): from core.embedding.faiss_manager import FAISSManager manager = FAISSManager(dimensions=8, index_type="Flat", metric="cosine") vec = np.random.rand(8).astype(np.float32) manager.add_embedding("emb_1", vec, metadata={"label": "target"}) index_path = tmp_path / "index.bin" meta_path = tmp_path / "meta.signed" manager.save(index_path, meta_path) # Le fichier métadonnées doit être signé. raw = meta_path.read_bytes() assert raw.startswith(b"RPA_SIGNED_V1\n") # Recharger. reloaded = FAISSManager.load(index_path, meta_path) assert reloaded.dimensions == 8 assert reloaded.next_id == 1 assert 0 in reloaded.metadata_store assert reloaded.metadata_store[0]["embedding_id"] == "emb_1" def test_load_refuses_tampered_metadata(self, tmp_path: Path): from core.embedding.faiss_manager import FAISSManager manager = FAISSManager(dimensions=4, index_type="Flat", metric="cosine") manager.add_embedding("e", np.ones(4, dtype=np.float32), metadata={}) index_path = tmp_path / "index.bin" meta_path = tmp_path / "meta.signed" manager.save(index_path, meta_path) # Altérer la signature du fichier. raw = meta_path.read_bytes() meta_path.write_bytes(raw.replace(b'"hmac":"', b'"hmac":"0')) with pytest.raises(SignatureVerificationError): FAISSManager.load(index_path, meta_path) def test_load_migrates_legacy_pickle(self, tmp_path: Path): """Un fichier métadonnées pickle legacy doit être migré.""" from core.embedding.faiss_manager import FAISSManager import faiss # Construire manuellement un fichier legacy (comme l'ancienne version). manager = FAISSManager(dimensions=4, index_type="Flat", metric="cosine") vec = np.ones(4, dtype=np.float32) manager.add_embedding("legacy_emb", vec, metadata={"tag": "old"}) index_path = tmp_path / "index.bin" meta_path = tmp_path / "meta.pkl" # Écrire l'index FAISS normalement... index_to_save = manager.index faiss.write_index(index_to_save, str(index_path)) # ...mais les métadonnées en pickle brut (format pré-correctif). legacy = { "dimensions": 4, "index_type": "Flat", "metric": "cosine", "next_id": manager.next_id, "metadata_store": manager.metadata_store, "nlist": None, "nprobe": 8, "is_trained": True, "auto_optimize": True, } with open(meta_path, "wb") as fp: pickle.dump(legacy, fp) # Chargement : doit réussir + migrer vers format signé. reloaded = FAISSManager.load(index_path, meta_path) assert reloaded.dimensions == 4 assert reloaded.next_id == 1 # Le fichier a été ré-écrit en signé. assert meta_path.read_bytes().startswith(b"RPA_SIGNED_V1\n")