diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index aa620853b..1723b8f07 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -583,6 +583,17 @@ _AGENTS_DB_PATH = os.environ.get( ) agent_registry = AgentRegistry(db_path=_AGENTS_DB_PATH) +# push-log-DGX : store des logs poussés par les clients, rangés par machine_id +# (observabilité des postes sans AnyDesk — DETTE-020/021). +from .agent_logs_store import AgentLogsStore # noqa: E402 + +_AGENT_LOGS_DIR = os.environ.get( + "RPA_AGENT_LOGS_DIR", str(ROOT_DIR / "data" / "agent_logs") +) +# Garde-fou anti-flood (G3) : nb max d'entrées acceptées par batch. +_AGENT_LOGS_MAX_BATCH = int(os.environ.get("RPA_AGENT_LOGS_MAX_BATCH", "1000")) +agent_logs_store = AgentLogsStore(base_dir=_AGENT_LOGS_DIR) + def _agent_registry_has_entries() -> bool: try: @@ -1562,6 +1573,16 @@ class AgentUninstallRequest(BaseModel): reason: Optional[str] = None +class AgentLogsRequest(BaseModel): + """Batch de logs poussé par un client Léa (push-log-DGX). + + `logs` = liste d'entrées {ts, level, logger, message} (format libre côté + serveur ; le client garantit le PII-safe avant push). + """ + machine_id: str + logs: list[dict] = [] + + # Thread de nettoyage périodique des replays terminés et sessions expirées _cleanup_thread: Optional[threading.Thread] = None _cleanup_running = False @@ -7200,6 +7221,35 @@ async def agents_fleet(): } +@app.post("/api/v1/agents/logs") +async def agents_logs(request: AgentLogsRequest): + """Réception des logs poussés par un client Léa (push-log-DGX). + + Range les logs par machine_id (AgentLogsStore) pour consultation au + dashboard — diagnostic des postes sans AnyDesk. Mêmes garde-fous fleet + que stream/poll : un poste révoqué/inconnu est refusé (403). + """ + machine_id = (request.machine_id or "").strip() + if not machine_id: + raise HTTPException(status_code=400, detail="machine_id est obligatoire") + + if len(request.logs) > _AGENT_LOGS_MAX_BATCH: + raise HTTPException( + status_code=413, + detail={ + "error": "batch_too_large", + "max_batch": _AGENT_LOGS_MAX_BATCH, + "received": len(request.logs), + }, + ) + + # Bloque les postes révoqués/désinstallés + met à jour last_seen_at. + _guard_agent_registry_access(machine_id, endpoint="agents/logs") + + received = agent_logs_store.append(machine_id, request.logs) + return {"status": "ok", "received": received, "machine_id": machine_id} + + # ========================================================================= # R2 MVP P0 — DialogResolver (catalogue centralisé des modaux runtime) # Flag OFF par défaut. Activer en posant RPA_DIALOG_RESOLVER_ENABLED=true. diff --git a/tests/integration/test_agent_logs_api.py b/tests/integration/test_agent_logs_api.py new file mode 100644 index 000000000..b03fefe4f --- /dev/null +++ b/tests/integration/test_agent_logs_api.py @@ -0,0 +1,123 @@ +"""Tests d'intégration de l'endpoint POST /api/v1/agents/logs (push-log-DGX). + +Le client Léa pousse ses logs (batch JSON) vers le DGX ; le serveur les range +par machine_id (AgentLogsStore) pour consultation au dashboard — diagnostic des +postes sans AnyDesk. Mêmes garde-fous fleet que stream/poll (agent actif). + +Branche feat/push-log-dgx — DETTE-020/021. +""" + +from __future__ import annotations + +import sys +from pathlib import Path + +import pytest + +_ROOT = str(Path(__file__).resolve().parents[2]) +if _ROOT not in sys.path: + sys.path.insert(0, _ROOT) + + +_TEST_API_TOKEN = "test_token_logs_endpoint_0123456789abcdef" + + +@pytest.fixture +def logs_client(monkeypatch, tmp_path): + """Client FastAPI de test avec registre ET store de logs isolés sur disque.""" + monkeypatch.setenv("RPA_API_TOKEN", _TEST_API_TOKEN) + monkeypatch.setenv("RPA_AGENTS_DB_PATH", str(tmp_path / "test_agents.db")) + + from fastapi.testclient import TestClient + from agent_v0.server_v1 import api_stream + from agent_v0.server_v1.agent_registry import AgentRegistry + from agent_v0.server_v1.agent_logs_store import AgentLogsStore + + monkeypatch.setattr(api_stream, "API_TOKEN", _TEST_API_TOKEN) + test_registry = AgentRegistry(db_path=str(tmp_path / "test_agents.db")) + monkeypatch.setattr(api_stream, "agent_registry", test_registry) + test_store = AgentLogsStore(base_dir=tmp_path / "agent_logs") + monkeypatch.setattr(api_stream, "agent_logs_store", test_store, raising=False) + + client = TestClient(api_stream.app, raise_server_exceptions=False) + yield client, _TEST_API_TOKEN, test_store + + +def _auth_headers(token: str) -> dict: + return {"Authorization": f"Bearer {token}"} + + +def _enroll(client, token, machine_id): + return client.post( + "/api/v1/agents/enroll", + json={"machine_id": machine_id, "user_name": machine_id}, + headers=_auth_headers(token), + ) + + +def test_post_logs_persists_for_active_agent(logs_client): + client, token, store = logs_client + _enroll(client, token, "lea-emilie-001") + + payload = { + "machine_id": "lea-emilie-001", + "logs": [ + {"ts": "2026-06-26T16:00:00", "level": "WARNING", + "logger": "agent_v1.core.executor", "message": "popup detectee"}, + ], + } + resp = client.post( + "/api/v1/agents/logs", json=payload, headers=_auth_headers(token) + ) + + assert resp.status_code == 200, resp.text + assert resp.json()["received"] == 1 + stored = store.read("lea-emilie-001") + assert len(stored) == 1 + assert stored[0]["message"] == "popup detectee" + assert stored[0]["level"] == "WARNING" + + +def test_post_logs_without_token_returns_401(logs_client): + client, _, _ = logs_client + resp = client.post( + "/api/v1/agents/logs", json={"machine_id": "lea-001", "logs": []} + ) + assert resp.status_code == 401 + + +def test_post_logs_rejected_for_revoked_agent(logs_client): + """Un poste révoqué ne peut plus pousser de logs (même garde-fou que stream/poll).""" + client, token, store = logs_client + _enroll(client, token, "lea-revoked") + client.post( + "/api/v1/agents/uninstall", + json={"machine_id": "lea-revoked", "reason": "admin_revoke"}, + headers=_auth_headers(token), + ) + + resp = client.post( + "/api/v1/agents/logs", + json={"machine_id": "lea-revoked", "logs": [{"message": "x"}]}, + headers=_auth_headers(token), + ) + + assert resp.status_code == 403, resp.text + assert resp.json()["detail"]["error"] == "agent_not_active" + assert store.read("lea-revoked") == [] # rien persisté + + +def test_post_logs_rejects_oversized_batch(logs_client): + """Anti-flood (G3) : un batch trop volumineux est rejeté (413), rien persisté.""" + client, token, store = logs_client + _enroll(client, token, "lea-flood") + big = [{"level": "INFO", "message": f"l{i}"} for i in range(1001)] + + resp = client.post( + "/api/v1/agents/logs", + json={"machine_id": "lea-flood", "logs": big}, + headers=_auth_headers(token), + ) + + assert resp.status_code == 413, resp.text + assert store.read("lea-flood") == []