feat(wp-c): méthode verify_token côté registre (patch 3, inerte)

Ajoute AgentRegistry.verify_token(token) -> machine_id|None : compare le
SHA-256 du token aux token_hash des agents 'active' via hmac.compare_digest
(temps constant). Agent désinstallé/révoqué refusé ; rotation à l'enroll
invalide l'ancien token.

Inerte au runtime : méthode non branchée sur l'auth HTTP (le branchement
derrière flag RPA_FLEET_PER_AGENT_TOKEN sera le Patch 4). api_stream.py
intouché. TDD : 6 tests + non-régression WP-C/WP-B (53 verts). Voir
PLAN-WPC-TDD-EXECUTABLE.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-06-10 14:21:04 +02:00
parent 9fb2c7bfee
commit b20d17882e
2 changed files with 89 additions and 0 deletions

View File

@@ -29,6 +29,7 @@ Schema de la table `enrolled_agents` :
from __future__ import annotations
import hashlib
import hmac
import logging
import os
import secrets
@@ -172,6 +173,31 @@ class AgentRegistry:
).fetchone()
return int(row["n"]) if row else 0
def verify_token(self, token: str | None) -> Optional[str]:
"""WP-C : verifie un token poste, retourne le machine_id actif ou None.
Compare le SHA-256 du token presente aux `token_hash` des agents
`status='active'` via `hmac.compare_digest` (comparaison a temps
constant, evite les fuites par timing). Un agent desinstalle/revoque
n'est pas 'active' donc refuse ; la rotation a l'enrolement invalide
l'ancien token.
INERTE : non branchee sur l'auth runtime (le branchement derriere flag
sera le Patch 4). Aucun appelant runtime a ce stade.
"""
if not token:
return None
token_hash = hashlib.sha256(token.encode("utf-8")).hexdigest()
with _DB_LOCK, self._connect() as conn:
rows = conn.execute(
"SELECT machine_id, token_hash FROM enrolled_agents "
"WHERE status = 'active' AND token_hash IS NOT NULL"
).fetchall()
for row in rows:
if hmac.compare_digest(str(row["token_hash"]), token_hash):
return str(row["machine_id"])
return None
# ------------------------------------------------------------------
# Ecriture
# ------------------------------------------------------------------

View File

@@ -0,0 +1,63 @@
"""WP-C Patch 3 — vérification d'un token poste (TDD).
`AgentRegistry.verify_token(token)` retourne le `machine_id` de l'agent **actif**
dont l'empreinte SHA-256 correspond, ou `None`. Comparaison à temps constant
(`hmac.compare_digest`). Un agent désinstallé/révoqué est refusé, et la rotation
(réenrôlement) invalide l'ancien token.
Méthode INERTE : non branchée sur l'auth runtime (ce sera le Patch 4, derrière
flag). Voir PLAN-WPC-TDD-EXECUTABLE.
"""
from __future__ import annotations
import pytest
from agent_v0.server_v1.agent_registry import AgentRegistry
@pytest.fixture
def registry(tmp_path):
return AgentRegistry(db_path=tmp_path / "fleet.db")
def test_verify_token_accepts_valid(registry):
"""Un token issu d'un enrôlement actif est reconnu (→ machine_id)."""
token = registry.enroll(machine_id="PC-1")["token"]
assert registry.verify_token(token) == "PC-1"
def test_verify_token_rejects_unknown(registry):
"""Un token inconnu est refusé (→ None)."""
registry.enroll(machine_id="PC-1")
assert registry.verify_token("deadbeef" * 8) is None
def test_verify_token_rejects_empty(registry):
"""Token vide ou None → None (pas d'exception)."""
registry.enroll(machine_id="PC-1")
assert registry.verify_token("") is None
assert registry.verify_token(None) is None
def test_verify_token_rejects_after_uninstall(registry):
"""Après désinstallation, le token n'est plus accepté (agent non actif)."""
token = registry.enroll(machine_id="PC-1")["token"]
registry.uninstall(machine_id="PC-1", reason="user_uninstall")
assert registry.verify_token(token) is None
def test_verify_token_rotation_invalidates_old(registry):
"""La réactivation génère un nouveau token ; l'ancien est invalidé."""
t1 = registry.enroll(machine_id="PC-1")["token"]
registry.uninstall(machine_id="PC-1", reason="user_uninstall")
t2 = registry.enroll(machine_id="PC-1")["token"]
assert registry.verify_token(t2) == "PC-1"
assert registry.verify_token(t1) is None
def test_verify_token_distinguishes_agents(registry):
"""Chaque token actif ne reconnaît que son propre poste."""
t1 = registry.enroll(machine_id="PC-1")["token"]
t2 = registry.enroll(machine_id="PC-2")["token"]
assert registry.verify_token(t1) == "PC-1"
assert registry.verify_token(t2) == "PC-2"