From f18de016d75da0c82764b5fd9c0b5a886fb04757 Mon Sep 17 00:00:00 2001 From: Dom Date: Mon, 8 Jun 2026 15:43:04 +0200 Subject: [PATCH] =?UTF-8?q?fix(wp-b):=20verrou=20d'enr=C3=B4lement=20du=20?= =?UTF-8?q?parc=20(RPA=5FFLEET=5FENROLL=5FLOCKED)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ferme le contournement "poste révoqué + nouveau machine_id + token global" : quand RPA_FLEET_ENROLL_LOCKED=true, l'enrôlement d'un machine_id INCONNU est refusé (FleetEnrollLockedError). Les machines déjà connues conservent leur comportement : active -> AlreadyEnrolled, désinstallé non-revoke -> réactivable, admin_revoke -> Revoked. - agent_registry.py : _fleet_enroll_locked() + FleetEnrollLockedError + gate avant INSERT - tests/unit/test_fleet_enroll_lock_wpb.py : 6 tests (verts) NB : le handler HTTP 403 (api_stream.py /api/v1/agents/enroll) reste dans le WIP de la branche (api_stream déjà modifié par le préflight non committé) — sera embarqué au commit de consolidation api_stream. La logique de sécurité (gate) est dans agent_registry, committée. Co-Authored-By: Claude Opus 4.8 (1M context) --- agent_v0/server_v1/agent_registry.py | 28 +++++++- tests/unit/test_fleet_enroll_lock_wpb.py | 81 ++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 tests/unit/test_fleet_enroll_lock_wpb.py diff --git a/agent_v0/server_v1/agent_registry.py b/agent_v0/server_v1/agent_registry.py index b1ca759b4..629d2c326 100644 --- a/agent_v0/server_v1/agent_registry.py +++ b/agent_v0/server_v1/agent_registry.py @@ -29,6 +29,7 @@ Schema de la table `enrolled_agents` : from __future__ import annotations import logging +import os import sqlite3 import threading from datetime import datetime, timezone @@ -47,6 +48,17 @@ def _utc_now_iso() -> str: return datetime.now(timezone.utc).isoformat() +def _fleet_enroll_locked() -> bool: + """WP-B : parc verrouille -> aucun NOUVEAU machine_id ne peut s'enroler. + + Pilote par l'env `RPA_FLEET_ENROLL_LOCKED` (true/1/yes), reversible (relu a + chaque appel). Ferme le contournement « poste revoque + nouveau machine_id + + token global » : les machines deja connues gardent leur comportement, seul + l'enrolement d'un machine_id inconnu est refuse quand le parc est verrouille. + """ + return os.getenv("RPA_FLEET_ENROLL_LOCKED", "").strip().lower() in ("1", "true", "yes") + + class AgentRegistry: """Gestion CRUD des agents enrolles (SQLite).""" @@ -209,7 +221,9 @@ class AgentRegistry: ).fetchone() return {"created": False, "reactivated": True, "agent": dict(row)} - # Nouvelle inscription + # Nouvelle inscription — WP-B : refusee si le parc est verrouille + if _fleet_enroll_locked(): + raise FleetEnrollLockedError(machine_id) conn.execute( """ INSERT INTO enrolled_agents ( @@ -310,3 +324,15 @@ class AgentRevokedError(Exception): f"machine_id={existing_row.get('machine_id')} revoque " f"(reason={existing_row.get('uninstall_reason')})" ) + + +class FleetEnrollLockedError(Exception): + """Levee si le parc est verrouille (RPA_FLEET_ENROLL_LOCKED) et qu'on tente + d'enroler un nouveau machine_id inconnu (WP-B).""" + + def __init__(self, machine_id: str): + self.machine_id = machine_id + super().__init__( + f"enrolement refuse : parc verrouille (RPA_FLEET_ENROLL_LOCKED), " + f"machine_id={machine_id} inconnu" + ) diff --git a/tests/unit/test_fleet_enroll_lock_wpb.py b/tests/unit/test_fleet_enroll_lock_wpb.py new file mode 100644 index 000000000..408292c10 --- /dev/null +++ b/tests/unit/test_fleet_enroll_lock_wpb.py @@ -0,0 +1,81 @@ +"""WP-B — verrou d'enrôlement du parc (RPA_FLEET_ENROLL_LOCKED). + +Ferme le contournement « poste révoqué + nouveau machine_id + token global » : +quand le parc est verrouillé, aucun NOUVEAU machine_id ne peut s'enrôler, tandis +que les machines déjà connues conservent leur comportement. +""" +from __future__ import annotations + +import pytest + +from agent_v0.server_v1.agent_registry import ( + AgentRegistry, + AgentAlreadyEnrolledError, + AgentRevokedError, + FleetEnrollLockedError, +) + + +@pytest.fixture +def registry(tmp_path): + return AgentRegistry(db_path=tmp_path / "fleet.db") + + +def _lock(monkeypatch, on: bool): + if on: + monkeypatch.setenv("RPA_FLEET_ENROLL_LOCKED", "true") + else: + monkeypatch.delenv("RPA_FLEET_ENROLL_LOCKED", raising=False) + + +def test_unlocked_new_machine_enrolls(registry, monkeypatch): + _lock(monkeypatch, False) + res = registry.enroll(machine_id="PC-NEW-1") + assert res["created"] is True + + +def test_locked_new_machine_refused(registry, monkeypatch): + _lock(monkeypatch, True) + with pytest.raises(FleetEnrollLockedError): + registry.enroll(machine_id="PC-INTRUS") + + +def test_locked_known_active_keeps_behavior(registry, monkeypatch): + # machine déjà active : conserve AgentAlreadyEnrolledError, pas FleetEnrollLocked + _lock(monkeypatch, False) + registry.enroll(machine_id="PC-A") + _lock(monkeypatch, True) + with pytest.raises(AgentAlreadyEnrolledError): + registry.enroll(machine_id="PC-A") + + +def test_locked_known_uninstalled_can_reactivate(registry, monkeypatch): + # machine connue puis désinstallée (raison normale) : réactivation autorisée même locked + _lock(monkeypatch, False) + registry.enroll(machine_id="PC-B") + registry.uninstall(machine_id="PC-B", reason="user_uninstall") + _lock(monkeypatch, True) + res = registry.enroll(machine_id="PC-B") + assert res["reactivated"] is True + + +def test_revoked_stays_revoked_and_new_id_blocked(registry, monkeypatch): + # le contournement fermé : poste révoqué NE se réactive pas, et un nouveau + # machine_id est refusé quand le parc est verrouillé. + _lock(monkeypatch, False) + registry.enroll(machine_id="PC-C") + registry.uninstall(machine_id="PC-C", reason="admin_revoke") + _lock(monkeypatch, True) + with pytest.raises(AgentRevokedError): + registry.enroll(machine_id="PC-C") + with pytest.raises(FleetEnrollLockedError): + registry.enroll(machine_id="PC-C-BIS") # nouveau machine_id → bloqué + + +def test_error_message_has_no_token(registry, monkeypatch): + _lock(monkeypatch, True) + try: + registry.enroll(machine_id="PC-X") + except FleetEnrollLockedError as exc: + msg = str(exc).lower() + assert "token" not in msg and "bearer" not in msg