feat(server): store de logs clients par machine_id (push-log-DGX, brique 1)
AgentLogsStore : append/read JSONL rangés par machine_id (fichier par jour), anti path-traversal sur machine_id (entrée réseau), purge_old rétention 30j (garde-fou G4 Qwen). TDD 3/3 vert. Pas encore wired (endpoint = brique 2). refs DETTE-020 DETTE-021 Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
77
agent_v0/server_v1/agent_logs_store.py
Normal file
77
agent_v0/server_v1/agent_logs_store.py
Normal file
@@ -0,0 +1,77 @@
|
||||
"""Store des logs poussés par les clients Léa (push-log-DGX).
|
||||
|
||||
Persiste les logs reçus du client, rangés par `machine_id`, pour consultation
|
||||
au dashboard (diagnostic des postes sans AnyDesk). Stockage fichier JSONL
|
||||
(un fichier par jour et par machine_id), rétention configurable.
|
||||
|
||||
DETTE-020/021 (observabilité). Branche feat/push-log-dgx.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import re
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
# machine_id = entrée réseau → neutraliser tout caractère hors liste blanche
|
||||
# (anti path-traversal : '/', '\\', '..' ne doivent pas s'échapper du base_dir).
|
||||
_SAFE_MACHINE_ID_RE = re.compile(r"[^A-Za-z0-9._-]")
|
||||
|
||||
|
||||
class AgentLogsStore:
|
||||
"""Persiste et relit les logs clients rangés par machine_id (JSONL)."""
|
||||
|
||||
def __init__(self, base_dir: str | Path = "data/agent_logs"):
|
||||
self.base_dir = Path(base_dir)
|
||||
self.base_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def _machine_dir(self, machine_id: str) -> Path:
|
||||
safe = _SAFE_MACHINE_ID_RE.sub("_", machine_id or "").strip("._") or "unknown"
|
||||
d = self.base_dir / safe
|
||||
d.mkdir(parents=True, exist_ok=True)
|
||||
return d
|
||||
|
||||
def append(self, machine_id: str, entries: list[dict]) -> int:
|
||||
"""Ajoute un batch de logs pour un poste. Retourne le nb de lignes écrites."""
|
||||
if not entries:
|
||||
return 0
|
||||
now = datetime.now(timezone.utc)
|
||||
day_file = self._machine_dir(machine_id) / f"{now.date().isoformat()}.jsonl"
|
||||
with day_file.open("a", encoding="utf-8") as f:
|
||||
for entry in entries:
|
||||
record = dict(entry)
|
||||
record.setdefault("received_at", now.isoformat())
|
||||
f.write(json.dumps(record, ensure_ascii=False) + "\n")
|
||||
return len(entries)
|
||||
|
||||
def read(self, machine_id: str) -> list[dict]:
|
||||
"""Relit toutes les entrées d'un poste, triées par fichier (date) puis ordre d'écriture."""
|
||||
d = self._machine_dir(machine_id)
|
||||
out: list[dict] = []
|
||||
for jsonl in sorted(d.glob("*.jsonl")):
|
||||
with jsonl.open(encoding="utf-8") as f:
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
if line:
|
||||
out.append(json.loads(line))
|
||||
return out
|
||||
|
||||
def purge_old(self, retention_days: int = 30, now: datetime | None = None) -> int:
|
||||
"""Supprime les fichiers-jour antérieurs à la rétention. Retourne le nb supprimé.
|
||||
|
||||
Rétention basée sur la date encodée dans le nom du fichier (`YYYY-MM-DD.jsonl`),
|
||||
pas sur le mtime (déterministe, non altérable). `now` injectable pour les tests.
|
||||
"""
|
||||
now = now or datetime.now(timezone.utc)
|
||||
cutoff = (now - timedelta(days=retention_days)).date()
|
||||
removed = 0
|
||||
for jsonl in self.base_dir.rglob("*.jsonl"):
|
||||
try:
|
||||
file_date = datetime.strptime(jsonl.stem, "%Y-%m-%d").date()
|
||||
except ValueError:
|
||||
continue # nom inattendu → on ne touche pas
|
||||
if file_date < cutoff:
|
||||
jsonl.unlink()
|
||||
removed += 1
|
||||
return removed
|
||||
78
tests/unit/test_agent_logs_store.py
Normal file
78
tests/unit/test_agent_logs_store.py
Normal file
@@ -0,0 +1,78 @@
|
||||
"""Tests unitaires du store de logs poussés par les clients Léa (push-log-DGX).
|
||||
|
||||
Le store persiste les logs reçus du client, rangés par `machine_id`, pour
|
||||
consultation au dashboard (diagnostic des postes sans AnyDesk). Stockage
|
||||
fichier (JSONL par machine_id), rétention configurable.
|
||||
|
||||
Branche : feat/push-log-dgx — DETTE-020/021 (observabilité).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Racine projet pour les imports locaux (meme pattern que tests/integration)
|
||||
_ROOT = str(Path(__file__).resolve().parents[2])
|
||||
if _ROOT not in sys.path:
|
||||
sys.path.insert(0, _ROOT)
|
||||
|
||||
|
||||
def test_append_then_read_roundtrip(tmp_path):
|
||||
"""append() persiste un batch ; read() le restitue dans l'ordre."""
|
||||
from agent_v0.server_v1.agent_logs_store import AgentLogsStore
|
||||
|
||||
store = AgentLogsStore(base_dir=tmp_path / "agent_logs")
|
||||
entries = [
|
||||
{"ts": "2026-06-26T16:00:00", "level": "INFO",
|
||||
"logger": "agent_v1.main", "message": "demarrage"},
|
||||
{"ts": "2026-06-26T16:00:01", "level": "WARNING",
|
||||
"logger": "agent_v1.core.executor", "message": "popup detectee"},
|
||||
]
|
||||
|
||||
store.append("lea-emilie-001", entries)
|
||||
got = store.read("lea-emilie-001")
|
||||
|
||||
assert len(got) == 2
|
||||
assert got[0]["message"] == "demarrage"
|
||||
assert got[0]["level"] == "INFO"
|
||||
assert got[1]["level"] == "WARNING"
|
||||
assert got[1]["logger"] == "agent_v1.core.executor"
|
||||
|
||||
|
||||
def test_machine_id_path_traversal_stays_within_base(tmp_path):
|
||||
"""Un machine_id malveillant (entrée réseau) ne doit jamais écrire hors du base_dir."""
|
||||
from agent_v0.server_v1.agent_logs_store import AgentLogsStore
|
||||
|
||||
base = (tmp_path / "agent_logs").resolve()
|
||||
store = AgentLogsStore(base_dir=base)
|
||||
|
||||
store.append("../../../evil", [{"message": "pwn"}])
|
||||
|
||||
written = list(base.rglob("*.jsonl"))
|
||||
assert written, "le batch doit être persisté SOUS base (pas d'évasion ni perte)"
|
||||
for p in written:
|
||||
assert base in p.resolve().parents, f"{p} échappe à {base}"
|
||||
# Aucune fuite hors de base
|
||||
assert not list(tmp_path.glob("evil*"))
|
||||
|
||||
|
||||
def test_purge_old_removes_files_older_than_retention(tmp_path):
|
||||
"""purge_old() supprime les fichiers-jour antérieurs à la rétention (G4 Qwen)."""
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from agent_v0.server_v1.agent_logs_store import AgentLogsStore
|
||||
|
||||
base = tmp_path / "agent_logs"
|
||||
store = AgentLogsStore(base_dir=base)
|
||||
mdir = base / "lea-001"
|
||||
mdir.mkdir(parents=True)
|
||||
(mdir / "2026-05-01.jsonl").write_text('{"message": "vieux"}\n', encoding="utf-8")
|
||||
(mdir / "2026-06-26.jsonl").write_text('{"message": "recent"}\n', encoding="utf-8")
|
||||
|
||||
now = datetime(2026, 6, 26, tzinfo=timezone.utc)
|
||||
removed = store.purge_old(retention_days=30, now=now)
|
||||
|
||||
remaining = {p.name for p in mdir.glob("*.jsonl")}
|
||||
assert remaining == {"2026-06-26.jsonl"}
|
||||
assert removed == 1
|
||||
Reference in New Issue
Block a user