diff --git a/agent_v0/server_v1/agent_registry.py b/agent_v0/server_v1/agent_registry.py index b1ca759b4..629d2c326 100644 --- a/agent_v0/server_v1/agent_registry.py +++ b/agent_v0/server_v1/agent_registry.py @@ -29,6 +29,7 @@ Schema de la table `enrolled_agents` : from __future__ import annotations import logging +import os import sqlite3 import threading from datetime import datetime, timezone @@ -47,6 +48,17 @@ def _utc_now_iso() -> str: return datetime.now(timezone.utc).isoformat() +def _fleet_enroll_locked() -> bool: + """WP-B : parc verrouille -> aucun NOUVEAU machine_id ne peut s'enroler. + + Pilote par l'env `RPA_FLEET_ENROLL_LOCKED` (true/1/yes), reversible (relu a + chaque appel). Ferme le contournement « poste revoque + nouveau machine_id + + token global » : les machines deja connues gardent leur comportement, seul + l'enrolement d'un machine_id inconnu est refuse quand le parc est verrouille. + """ + return os.getenv("RPA_FLEET_ENROLL_LOCKED", "").strip().lower() in ("1", "true", "yes") + + class AgentRegistry: """Gestion CRUD des agents enrolles (SQLite).""" @@ -209,7 +221,9 @@ class AgentRegistry: ).fetchone() return {"created": False, "reactivated": True, "agent": dict(row)} - # Nouvelle inscription + # Nouvelle inscription — WP-B : refusee si le parc est verrouille + if _fleet_enroll_locked(): + raise FleetEnrollLockedError(machine_id) conn.execute( """ INSERT INTO enrolled_agents ( @@ -310,3 +324,15 @@ class AgentRevokedError(Exception): f"machine_id={existing_row.get('machine_id')} revoque " f"(reason={existing_row.get('uninstall_reason')})" ) + + +class FleetEnrollLockedError(Exception): + """Levee si le parc est verrouille (RPA_FLEET_ENROLL_LOCKED) et qu'on tente + d'enroler un nouveau machine_id inconnu (WP-B).""" + + def __init__(self, machine_id: str): + self.machine_id = machine_id + super().__init__( + f"enrolement refuse : parc verrouille (RPA_FLEET_ENROLL_LOCKED), " + f"machine_id={machine_id} inconnu" + ) diff --git a/tests/unit/test_fleet_enroll_lock_wpb.py b/tests/unit/test_fleet_enroll_lock_wpb.py new file mode 100644 index 000000000..408292c10 --- /dev/null +++ b/tests/unit/test_fleet_enroll_lock_wpb.py @@ -0,0 +1,81 @@ +"""WP-B — verrou d'enrôlement du parc (RPA_FLEET_ENROLL_LOCKED). + +Ferme le contournement « poste révoqué + nouveau machine_id + token global » : +quand le parc est verrouillé, aucun NOUVEAU machine_id ne peut s'enrôler, tandis +que les machines déjà connues conservent leur comportement. +""" +from __future__ import annotations + +import pytest + +from agent_v0.server_v1.agent_registry import ( + AgentRegistry, + AgentAlreadyEnrolledError, + AgentRevokedError, + FleetEnrollLockedError, +) + + +@pytest.fixture +def registry(tmp_path): + return AgentRegistry(db_path=tmp_path / "fleet.db") + + +def _lock(monkeypatch, on: bool): + if on: + monkeypatch.setenv("RPA_FLEET_ENROLL_LOCKED", "true") + else: + monkeypatch.delenv("RPA_FLEET_ENROLL_LOCKED", raising=False) + + +def test_unlocked_new_machine_enrolls(registry, monkeypatch): + _lock(monkeypatch, False) + res = registry.enroll(machine_id="PC-NEW-1") + assert res["created"] is True + + +def test_locked_new_machine_refused(registry, monkeypatch): + _lock(monkeypatch, True) + with pytest.raises(FleetEnrollLockedError): + registry.enroll(machine_id="PC-INTRUS") + + +def test_locked_known_active_keeps_behavior(registry, monkeypatch): + # machine déjà active : conserve AgentAlreadyEnrolledError, pas FleetEnrollLocked + _lock(monkeypatch, False) + registry.enroll(machine_id="PC-A") + _lock(monkeypatch, True) + with pytest.raises(AgentAlreadyEnrolledError): + registry.enroll(machine_id="PC-A") + + +def test_locked_known_uninstalled_can_reactivate(registry, monkeypatch): + # machine connue puis désinstallée (raison normale) : réactivation autorisée même locked + _lock(monkeypatch, False) + registry.enroll(machine_id="PC-B") + registry.uninstall(machine_id="PC-B", reason="user_uninstall") + _lock(monkeypatch, True) + res = registry.enroll(machine_id="PC-B") + assert res["reactivated"] is True + + +def test_revoked_stays_revoked_and_new_id_blocked(registry, monkeypatch): + # le contournement fermé : poste révoqué NE se réactive pas, et un nouveau + # machine_id est refusé quand le parc est verrouillé. + _lock(monkeypatch, False) + registry.enroll(machine_id="PC-C") + registry.uninstall(machine_id="PC-C", reason="admin_revoke") + _lock(monkeypatch, True) + with pytest.raises(AgentRevokedError): + registry.enroll(machine_id="PC-C") + with pytest.raises(FleetEnrollLockedError): + registry.enroll(machine_id="PC-C-BIS") # nouveau machine_id → bloqué + + +def test_error_message_has_no_token(registry, monkeypatch): + _lock(monkeypatch, True) + try: + registry.enroll(machine_id="PC-X") + except FleetEnrollLockedError as exc: + msg = str(exc).lower() + assert "token" not in msg and "bearer" not in msg