From b20d17882ee832587451233b5d43015d3335ff36 Mon Sep 17 00:00:00 2001 From: Dom Date: Wed, 10 Jun 2026 14:21:04 +0200 Subject: [PATCH] =?UTF-8?q?feat(wp-c):=20m=C3=A9thode=20verify=5Ftoken=20c?= =?UTF-8?q?=C3=B4t=C3=A9=20registre=20(patch=203,=20inerte)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ajoute AgentRegistry.verify_token(token) -> machine_id|None : compare le SHA-256 du token aux token_hash des agents 'active' via hmac.compare_digest (temps constant). Agent désinstallé/révoqué refusé ; rotation à l'enroll invalide l'ancien token. Inerte au runtime : méthode non branchée sur l'auth HTTP (le branchement derrière flag RPA_FLEET_PER_AGENT_TOKEN sera le Patch 4). api_stream.py intouché. TDD : 6 tests + non-régression WP-C/WP-B (53 verts). Voir PLAN-WPC-TDD-EXECUTABLE. Co-Authored-By: Claude Opus 4.8 (1M context) --- agent_v0/server_v1/agent_registry.py | 26 ++++++++++++ tests/unit/test_wpc_verify.py | 63 ++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) create mode 100644 tests/unit/test_wpc_verify.py diff --git a/agent_v0/server_v1/agent_registry.py b/agent_v0/server_v1/agent_registry.py index 37880ba7e..dbc8135ce 100644 --- a/agent_v0/server_v1/agent_registry.py +++ b/agent_v0/server_v1/agent_registry.py @@ -29,6 +29,7 @@ Schema de la table `enrolled_agents` : from __future__ import annotations import hashlib +import hmac import logging import os import secrets @@ -172,6 +173,31 @@ class AgentRegistry: ).fetchone() return int(row["n"]) if row else 0 + def verify_token(self, token: str | None) -> Optional[str]: + """WP-C : verifie un token poste, retourne le machine_id actif ou None. + + Compare le SHA-256 du token presente aux `token_hash` des agents + `status='active'` via `hmac.compare_digest` (comparaison a temps + constant, evite les fuites par timing). Un agent desinstalle/revoque + n'est pas 'active' donc refuse ; la rotation a l'enrolement invalide + l'ancien token. + + INERTE : non branchee sur l'auth runtime (le branchement derriere flag + sera le Patch 4). Aucun appelant runtime a ce stade. + """ + if not token: + return None + token_hash = hashlib.sha256(token.encode("utf-8")).hexdigest() + with _DB_LOCK, self._connect() as conn: + rows = conn.execute( + "SELECT machine_id, token_hash FROM enrolled_agents " + "WHERE status = 'active' AND token_hash IS NOT NULL" + ).fetchall() + for row in rows: + if hmac.compare_digest(str(row["token_hash"]), token_hash): + return str(row["machine_id"]) + return None + # ------------------------------------------------------------------ # Ecriture # ------------------------------------------------------------------ diff --git a/tests/unit/test_wpc_verify.py b/tests/unit/test_wpc_verify.py new file mode 100644 index 000000000..0bcb68508 --- /dev/null +++ b/tests/unit/test_wpc_verify.py @@ -0,0 +1,63 @@ +"""WP-C Patch 3 — vérification d'un token poste (TDD). + +`AgentRegistry.verify_token(token)` retourne le `machine_id` de l'agent **actif** +dont l'empreinte SHA-256 correspond, ou `None`. Comparaison à temps constant +(`hmac.compare_digest`). Un agent désinstallé/révoqué est refusé, et la rotation +(réenrôlement) invalide l'ancien token. + +Méthode INERTE : non branchée sur l'auth runtime (ce sera le Patch 4, derrière +flag). Voir PLAN-WPC-TDD-EXECUTABLE. +""" +from __future__ import annotations + +import pytest + +from agent_v0.server_v1.agent_registry import AgentRegistry + + +@pytest.fixture +def registry(tmp_path): + return AgentRegistry(db_path=tmp_path / "fleet.db") + + +def test_verify_token_accepts_valid(registry): + """Un token issu d'un enrôlement actif est reconnu (→ machine_id).""" + token = registry.enroll(machine_id="PC-1")["token"] + assert registry.verify_token(token) == "PC-1" + + +def test_verify_token_rejects_unknown(registry): + """Un token inconnu est refusé (→ None).""" + registry.enroll(machine_id="PC-1") + assert registry.verify_token("deadbeef" * 8) is None + + +def test_verify_token_rejects_empty(registry): + """Token vide ou None → None (pas d'exception).""" + registry.enroll(machine_id="PC-1") + assert registry.verify_token("") is None + assert registry.verify_token(None) is None + + +def test_verify_token_rejects_after_uninstall(registry): + """Après désinstallation, le token n'est plus accepté (agent non actif).""" + token = registry.enroll(machine_id="PC-1")["token"] + registry.uninstall(machine_id="PC-1", reason="user_uninstall") + assert registry.verify_token(token) is None + + +def test_verify_token_rotation_invalidates_old(registry): + """La réactivation génère un nouveau token ; l'ancien est invalidé.""" + t1 = registry.enroll(machine_id="PC-1")["token"] + registry.uninstall(machine_id="PC-1", reason="user_uninstall") + t2 = registry.enroll(machine_id="PC-1")["token"] + assert registry.verify_token(t2) == "PC-1" + assert registry.verify_token(t1) is None + + +def test_verify_token_distinguishes_agents(registry): + """Chaque token actif ne reconnaît que son propre poste.""" + t1 = registry.enroll(machine_id="PC-1")["token"] + t2 = registry.enroll(machine_id="PC-2")["token"] + assert registry.verify_token(t1) == "PC-1" + assert registry.verify_token(t2) == "PC-2"