fix(wp-b): verrou d'enrôlement du parc (RPA_FLEET_ENROLL_LOCKED)

Ferme le contournement "poste révoqué + nouveau machine_id + token global" :
quand RPA_FLEET_ENROLL_LOCKED=true, l'enrôlement d'un machine_id INCONNU est refusé
(FleetEnrollLockedError). Les machines déjà connues conservent leur comportement :
active -> AlreadyEnrolled, désinstallé non-revoke -> réactivable, admin_revoke -> Revoked.

- agent_registry.py : _fleet_enroll_locked() + FleetEnrollLockedError + gate avant INSERT
- tests/unit/test_fleet_enroll_lock_wpb.py : 6 tests (verts)

NB : le handler HTTP 403 (api_stream.py /api/v1/agents/enroll) reste dans le WIP de la
branche (api_stream déjà modifié par le préflight non committé) — sera embarqué au commit
de consolidation api_stream. La logique de sécurité (gate) est dans agent_registry, committée.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-06-08 15:43:04 +02:00
parent 549ea0631b
commit f18de016d7
2 changed files with 108 additions and 1 deletions

View File

@@ -29,6 +29,7 @@ Schema de la table `enrolled_agents` :
from __future__ import annotations
import logging
import os
import sqlite3
import threading
from datetime import datetime, timezone
@@ -47,6 +48,17 @@ def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def _fleet_enroll_locked() -> bool:
"""WP-B : parc verrouille -> aucun NOUVEAU machine_id ne peut s'enroler.
Pilote par l'env `RPA_FLEET_ENROLL_LOCKED` (true/1/yes), reversible (relu a
chaque appel). Ferme le contournement « poste revoque + nouveau machine_id +
token global » : les machines deja connues gardent leur comportement, seul
l'enrolement d'un machine_id inconnu est refuse quand le parc est verrouille.
"""
return os.getenv("RPA_FLEET_ENROLL_LOCKED", "").strip().lower() in ("1", "true", "yes")
class AgentRegistry:
"""Gestion CRUD des agents enrolles (SQLite)."""
@@ -209,7 +221,9 @@ class AgentRegistry:
).fetchone()
return {"created": False, "reactivated": True, "agent": dict(row)}
# Nouvelle inscription
# Nouvelle inscription — WP-B : refusee si le parc est verrouille
if _fleet_enroll_locked():
raise FleetEnrollLockedError(machine_id)
conn.execute(
"""
INSERT INTO enrolled_agents (
@@ -310,3 +324,15 @@ class AgentRevokedError(Exception):
f"machine_id={existing_row.get('machine_id')} revoque "
f"(reason={existing_row.get('uninstall_reason')})"
)
class FleetEnrollLockedError(Exception):
"""Levee si le parc est verrouille (RPA_FLEET_ENROLL_LOCKED) et qu'on tente
d'enroler un nouveau machine_id inconnu (WP-B)."""
def __init__(self, machine_id: str):
self.machine_id = machine_id
super().__init__(
f"enrolement refuse : parc verrouille (RPA_FLEET_ENROLL_LOCKED), "
f"machine_id={machine_id} inconnu"
)

View File

@@ -0,0 +1,81 @@
"""WP-B — verrou d'enrôlement du parc (RPA_FLEET_ENROLL_LOCKED).
Ferme le contournement « poste révoqué + nouveau machine_id + token global » :
quand le parc est verrouillé, aucun NOUVEAU machine_id ne peut s'enrôler, tandis
que les machines déjà connues conservent leur comportement.
"""
from __future__ import annotations
import pytest
from agent_v0.server_v1.agent_registry import (
AgentRegistry,
AgentAlreadyEnrolledError,
AgentRevokedError,
FleetEnrollLockedError,
)
@pytest.fixture
def registry(tmp_path):
return AgentRegistry(db_path=tmp_path / "fleet.db")
def _lock(monkeypatch, on: bool):
if on:
monkeypatch.setenv("RPA_FLEET_ENROLL_LOCKED", "true")
else:
monkeypatch.delenv("RPA_FLEET_ENROLL_LOCKED", raising=False)
def test_unlocked_new_machine_enrolls(registry, monkeypatch):
_lock(monkeypatch, False)
res = registry.enroll(machine_id="PC-NEW-1")
assert res["created"] is True
def test_locked_new_machine_refused(registry, monkeypatch):
_lock(monkeypatch, True)
with pytest.raises(FleetEnrollLockedError):
registry.enroll(machine_id="PC-INTRUS")
def test_locked_known_active_keeps_behavior(registry, monkeypatch):
# machine déjà active : conserve AgentAlreadyEnrolledError, pas FleetEnrollLocked
_lock(monkeypatch, False)
registry.enroll(machine_id="PC-A")
_lock(monkeypatch, True)
with pytest.raises(AgentAlreadyEnrolledError):
registry.enroll(machine_id="PC-A")
def test_locked_known_uninstalled_can_reactivate(registry, monkeypatch):
# machine connue puis désinstallée (raison normale) : réactivation autorisée même locked
_lock(monkeypatch, False)
registry.enroll(machine_id="PC-B")
registry.uninstall(machine_id="PC-B", reason="user_uninstall")
_lock(monkeypatch, True)
res = registry.enroll(machine_id="PC-B")
assert res["reactivated"] is True
def test_revoked_stays_revoked_and_new_id_blocked(registry, monkeypatch):
# le contournement fermé : poste révoqué NE se réactive pas, et un nouveau
# machine_id est refusé quand le parc est verrouillé.
_lock(monkeypatch, False)
registry.enroll(machine_id="PC-C")
registry.uninstall(machine_id="PC-C", reason="admin_revoke")
_lock(monkeypatch, True)
with pytest.raises(AgentRevokedError):
registry.enroll(machine_id="PC-C")
with pytest.raises(FleetEnrollLockedError):
registry.enroll(machine_id="PC-C-BIS") # nouveau machine_id → bloqué
def test_error_message_has_no_token(registry, monkeypatch):
_lock(monkeypatch, True)
try:
registry.enroll(machine_id="PC-X")
except FleetEnrollLockedError as exc:
msg = str(exc).lower()
assert "token" not in msg and "bearer" not in msg