feat(server): endpoint POST /api/v1/agents/logs (push-log-DGX, brique 2)

Reçoit un batch de logs client, range via AgentLogsStore par machine_id.
Garde-fous : auth Bearer (401), agent actif via _guard_agent_registry_access
(403 si révoqué/inconnu, + touch_last_seen), cap anti-flood 413 (G3 Qwen,
RPA_AGENT_LOGS_MAX_BATCH=1000). TDD 4/4 ; non-régression enroll 16/16.

refs DETTE-020 DETTE-021

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-06-26 16:25:14 +02:00
parent a29b7a2f21
commit bbe897e614
2 changed files with 173 additions and 0 deletions

View File

@@ -583,6 +583,17 @@ _AGENTS_DB_PATH = os.environ.get(
)
agent_registry = AgentRegistry(db_path=_AGENTS_DB_PATH)
# push-log-DGX : store des logs poussés par les clients, rangés par machine_id
# (observabilité des postes sans AnyDesk — DETTE-020/021).
from .agent_logs_store import AgentLogsStore # noqa: E402
_AGENT_LOGS_DIR = os.environ.get(
"RPA_AGENT_LOGS_DIR", str(ROOT_DIR / "data" / "agent_logs")
)
# Garde-fou anti-flood (G3) : nb max d'entrées acceptées par batch.
_AGENT_LOGS_MAX_BATCH = int(os.environ.get("RPA_AGENT_LOGS_MAX_BATCH", "1000"))
agent_logs_store = AgentLogsStore(base_dir=_AGENT_LOGS_DIR)
def _agent_registry_has_entries() -> bool:
try:
@@ -1562,6 +1573,16 @@ class AgentUninstallRequest(BaseModel):
reason: Optional[str] = None
class AgentLogsRequest(BaseModel):
"""Batch de logs poussé par un client Léa (push-log-DGX).
`logs` = liste d'entrées {ts, level, logger, message} (format libre côté
serveur ; le client garantit le PII-safe avant push).
"""
machine_id: str
logs: list[dict] = []
# Thread de nettoyage périodique des replays terminés et sessions expirées
_cleanup_thread: Optional[threading.Thread] = None
_cleanup_running = False
@@ -7200,6 +7221,35 @@ async def agents_fleet():
}
@app.post("/api/v1/agents/logs")
async def agents_logs(request: AgentLogsRequest):
"""Réception des logs poussés par un client Léa (push-log-DGX).
Range les logs par machine_id (AgentLogsStore) pour consultation au
dashboard — diagnostic des postes sans AnyDesk. Mêmes garde-fous fleet
que stream/poll : un poste révoqué/inconnu est refusé (403).
"""
machine_id = (request.machine_id or "").strip()
if not machine_id:
raise HTTPException(status_code=400, detail="machine_id est obligatoire")
if len(request.logs) > _AGENT_LOGS_MAX_BATCH:
raise HTTPException(
status_code=413,
detail={
"error": "batch_too_large",
"max_batch": _AGENT_LOGS_MAX_BATCH,
"received": len(request.logs),
},
)
# Bloque les postes révoqués/désinstallés + met à jour last_seen_at.
_guard_agent_registry_access(machine_id, endpoint="agents/logs")
received = agent_logs_store.append(machine_id, request.logs)
return {"status": "ok", "received": received, "machine_id": machine_id}
# =========================================================================
# R2 MVP P0 — DialogResolver (catalogue centralisé des modaux runtime)
# Flag OFF par défaut. Activer en posant RPA_DIALOG_RESOLVER_ENABLED=true.

View File

@@ -0,0 +1,123 @@
"""Tests d'intégration de l'endpoint POST /api/v1/agents/logs (push-log-DGX).
Le client Léa pousse ses logs (batch JSON) vers le DGX ; le serveur les range
par machine_id (AgentLogsStore) pour consultation au dashboard — diagnostic des
postes sans AnyDesk. Mêmes garde-fous fleet que stream/poll (agent actif).
Branche feat/push-log-dgx — DETTE-020/021.
"""
from __future__ import annotations
import sys
from pathlib import Path
import pytest
_ROOT = str(Path(__file__).resolve().parents[2])
if _ROOT not in sys.path:
sys.path.insert(0, _ROOT)
_TEST_API_TOKEN = "test_token_logs_endpoint_0123456789abcdef"
@pytest.fixture
def logs_client(monkeypatch, tmp_path):
"""Client FastAPI de test avec registre ET store de logs isolés sur disque."""
monkeypatch.setenv("RPA_API_TOKEN", _TEST_API_TOKEN)
monkeypatch.setenv("RPA_AGENTS_DB_PATH", str(tmp_path / "test_agents.db"))
from fastapi.testclient import TestClient
from agent_v0.server_v1 import api_stream
from agent_v0.server_v1.agent_registry import AgentRegistry
from agent_v0.server_v1.agent_logs_store import AgentLogsStore
monkeypatch.setattr(api_stream, "API_TOKEN", _TEST_API_TOKEN)
test_registry = AgentRegistry(db_path=str(tmp_path / "test_agents.db"))
monkeypatch.setattr(api_stream, "agent_registry", test_registry)
test_store = AgentLogsStore(base_dir=tmp_path / "agent_logs")
monkeypatch.setattr(api_stream, "agent_logs_store", test_store, raising=False)
client = TestClient(api_stream.app, raise_server_exceptions=False)
yield client, _TEST_API_TOKEN, test_store
def _auth_headers(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
def _enroll(client, token, machine_id):
return client.post(
"/api/v1/agents/enroll",
json={"machine_id": machine_id, "user_name": machine_id},
headers=_auth_headers(token),
)
def test_post_logs_persists_for_active_agent(logs_client):
client, token, store = logs_client
_enroll(client, token, "lea-emilie-001")
payload = {
"machine_id": "lea-emilie-001",
"logs": [
{"ts": "2026-06-26T16:00:00", "level": "WARNING",
"logger": "agent_v1.core.executor", "message": "popup detectee"},
],
}
resp = client.post(
"/api/v1/agents/logs", json=payload, headers=_auth_headers(token)
)
assert resp.status_code == 200, resp.text
assert resp.json()["received"] == 1
stored = store.read("lea-emilie-001")
assert len(stored) == 1
assert stored[0]["message"] == "popup detectee"
assert stored[0]["level"] == "WARNING"
def test_post_logs_without_token_returns_401(logs_client):
client, _, _ = logs_client
resp = client.post(
"/api/v1/agents/logs", json={"machine_id": "lea-001", "logs": []}
)
assert resp.status_code == 401
def test_post_logs_rejected_for_revoked_agent(logs_client):
"""Un poste révoqué ne peut plus pousser de logs (même garde-fou que stream/poll)."""
client, token, store = logs_client
_enroll(client, token, "lea-revoked")
client.post(
"/api/v1/agents/uninstall",
json={"machine_id": "lea-revoked", "reason": "admin_revoke"},
headers=_auth_headers(token),
)
resp = client.post(
"/api/v1/agents/logs",
json={"machine_id": "lea-revoked", "logs": [{"message": "x"}]},
headers=_auth_headers(token),
)
assert resp.status_code == 403, resp.text
assert resp.json()["detail"]["error"] == "agent_not_active"
assert store.read("lea-revoked") == [] # rien persisté
def test_post_logs_rejects_oversized_batch(logs_client):
"""Anti-flood (G3) : un batch trop volumineux est rejeté (413), rien persisté."""
client, token, store = logs_client
_enroll(client, token, "lea-flood")
big = [{"level": "INFO", "message": f"l{i}"} for i in range(1001)]
resp = client.post(
"/api/v1/agents/logs",
json={"machine_id": "lea-flood", "logs": big},
headers=_auth_headers(token),
)
assert resp.status_code == 413, resp.text
assert store.read("lea-flood") == []