feat(wp-c): génération token par poste à l'enroll (patch 2, inerte runtime)
Génère un token unique (secrets.token_hex(32)) à chaque (ré)enrôlement, persiste uniquement son empreinte SHA-256 dans token_hash, renseigne token_issued_at, retourne le clair une seule fois dans le résultat de enroll. Le clair n'est jamais journalisé ni persisté. Inerte au runtime : api_stream.py intouché, l'endpoint /agents/enroll ne propage ni le clair ni le hash (api_token global inchangé). Auth runtime non modifiée. Aucun branchement _verify_token. TDD : 8 tests + non-régression WP-B/WP-C (47 verts). Voir PLAN-WPC-TDD-EXECUTABLE / DETTE-015. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
106
tests/unit/test_wpc_enroll_token.py
Normal file
106
tests/unit/test_wpc_enroll_token.py
Normal file
@@ -0,0 +1,106 @@
|
||||
"""WP-C Patch 2 — génération d'un token par poste à l'enrôlement (TDD).
|
||||
|
||||
À chaque enrôlement (création OU réactivation), le registre génère un token
|
||||
unique (`secrets.token_hex(32)`), persiste UNIQUEMENT son empreinte SHA-256 dans
|
||||
`token_hash`, renseigne `token_issued_at`, et retourne le token clair une seule
|
||||
fois dans le résultat de `enroll`. Le clair n'est jamais persisté ni journalisé.
|
||||
|
||||
Auth runtime inchangée : aucun branchement sur `api_stream._verify_token` (ce
|
||||
sera l'objet de patchs WP-C ultérieurs). Voir PLAN-WPC-TDD-EXECUTABLE / DETTE-015.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import sqlite3
|
||||
|
||||
import pytest
|
||||
|
||||
from agent_v0.server_v1.agent_registry import AgentRegistry
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def registry(tmp_path):
|
||||
return AgentRegistry(db_path=tmp_path / "fleet.db")
|
||||
|
||||
|
||||
def _sha256(s: str) -> str:
|
||||
return hashlib.sha256(s.encode("utf-8")).hexdigest()
|
||||
|
||||
|
||||
def _persisted_row(db_path, machine_id):
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT * FROM enrolled_agents WHERE machine_id = ?", (machine_id,)
|
||||
).fetchone()
|
||||
return dict(row) if row else None
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
def test_enroll_returns_clear_token(registry):
|
||||
"""L'enrôlement retourne un token clair (hex, 64 caractères)."""
|
||||
res = registry.enroll(machine_id="PC-1")
|
||||
token = res["token"]
|
||||
assert isinstance(token, str)
|
||||
assert len(token) == 64
|
||||
int(token, 16) # doit être hexadécimal valide
|
||||
|
||||
|
||||
def test_token_unique_per_enroll(registry):
|
||||
"""Deux enrôlements distincts produisent deux tokens différents (critère 1)."""
|
||||
t1 = registry.enroll(machine_id="PC-1")["token"]
|
||||
t2 = registry.enroll(machine_id="PC-2")["token"]
|
||||
assert t1 != t2
|
||||
|
||||
|
||||
def test_clear_token_not_persisted(registry, tmp_path):
|
||||
"""Le token clair n'est jamais persisté en base (critère 3)."""
|
||||
token = registry.enroll(machine_id="PC-1")["token"]
|
||||
row = _persisted_row(tmp_path / "fleet.db", "PC-1")
|
||||
for value in row.values():
|
||||
assert value != token
|
||||
|
||||
|
||||
def test_token_hash_persisted_as_sha256(registry, tmp_path):
|
||||
"""token_hash persisté = SHA-256 du clair (critère 4)."""
|
||||
token = registry.enroll(machine_id="PC-1")["token"]
|
||||
row = _persisted_row(tmp_path / "fleet.db", "PC-1")
|
||||
assert row["token_hash"] == _sha256(token)
|
||||
|
||||
|
||||
def test_token_issued_at_set(registry, tmp_path):
|
||||
"""token_issued_at est renseigné à l'enrôlement (critère 5)."""
|
||||
registry.enroll(machine_id="PC-1")
|
||||
row = _persisted_row(tmp_path / "fleet.db", "PC-1")
|
||||
assert row["token_issued_at"] # non NULL / non vide
|
||||
|
||||
|
||||
def test_get_never_exposes_clear_token(registry):
|
||||
"""get() ne renvoie jamais le token clair (retourné une seule fois — critère 2)."""
|
||||
token = registry.enroll(machine_id="PC-1")["token"]
|
||||
agent = registry.get("PC-1")
|
||||
assert "token" not in agent # pas de clé clair persistée
|
||||
for value in agent.values():
|
||||
assert value != token
|
||||
|
||||
|
||||
def test_reactivation_rotates_token(registry, tmp_path):
|
||||
"""La réactivation génère un nouveau token + nouveau hash (critère 1)."""
|
||||
t1 = registry.enroll(machine_id="PC-1")["token"]
|
||||
h1 = _persisted_row(tmp_path / "fleet.db", "PC-1")["token_hash"]
|
||||
registry.uninstall(machine_id="PC-1", reason="user_uninstall")
|
||||
t2 = registry.enroll(machine_id="PC-1")["token"]
|
||||
h2 = _persisted_row(tmp_path / "fleet.db", "PC-1")["token_hash"]
|
||||
assert t2 != t1
|
||||
assert h2 != h1
|
||||
assert h2 == _sha256(t2)
|
||||
|
||||
|
||||
def test_clear_token_never_logged(registry, caplog):
|
||||
"""Le token clair n'apparaît jamais dans les logs (critère 6)."""
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
token = registry.enroll(machine_id="PC-1")["token"]
|
||||
assert token not in caplog.text
|
||||
Reference in New Issue
Block a user