diff --git a/agent_v0/agent_v1/config.py b/agent_v0/agent_v1/config.py index fbd5ebe8e..fdd64b78b 100644 --- a/agent_v0/agent_v1/config.py +++ b/agent_v0/agent_v1/config.py @@ -82,6 +82,17 @@ BLUR_SENSITIVE = os.environ.get("RPA_BLUR_SENSITIVE", "true").lower() in ("true" # Configurable via variable d'environnement pour permettre l'ajustement LOG_RETENTION_DAYS = int(os.environ.get("RPA_LOG_RETENTION_DAYS", "180")) +# Remontée automatique des logs vers le serveur (push-log-DGX). +# Diagnostic des postes clinique SANS AnyDesk : les logs (déjà écrits sur disque) +# sont poussés au serveur, rangés par machine_id, consultables au dashboard. +# Défaut PRUDENT = désactivé : on l'active poste par poste via config.txt / +# variable d'environnement, sans rebuild de l'installateur. +LOG_SHIP_ENABLED = os.environ.get("RPA_LOG_SHIP_ENABLED", "false").lower() in ( + "true", "1", "yes", +) +# Intervalle de flush du buffer de logs (secondes). +LOG_SHIP_INTERVAL_S = float(os.environ.get("RPA_LOG_SHIP_INTERVAL_S", "30")) + # Monitoring PERF_MONITOR_INTERVAL_S = 30 LOGS_DIR = BASE_DIR / "logs" diff --git a/agent_v0/agent_v1/main.py b/agent_v0/agent_v1/main.py index 085e2b822..6a9a5c415 100644 --- a/agent_v0/agent_v1/main.py +++ b/agent_v0/agent_v1/main.py @@ -17,7 +17,7 @@ import threading from .config import ( SESSIONS_ROOT, AGENT_VERSION, SERVER_URL, MACHINE_ID, LOG_RETENTION_DAYS, LOG_FILE, SCREEN_RESOLUTION, DPI_SCALE, OS_THEME, API_TOKEN, MAX_SESSION_DURATION_S, - STREAMING_ENDPOINT, + STREAMING_ENDPOINT, LOG_SHIP_ENABLED, LOG_SHIP_INTERVAL_S, ) from .core.captor import EventCaptorV1 from .core.executor import ActionExecutorV1 @@ -62,6 +62,26 @@ except Exception: for _noisy in ("urllib3", "requests.packages.urllib3", "PIL", "mss"): logging.getLogger(_noisy).setLevel(logging.WARNING) +# push-log-DGX : remontée automatique des logs vers le serveur (diagnostic des +# postes SANS AnyDesk). GARDÉ derrière RPA_LOG_SHIP_ENABLED (défaut désactivé) — +# activable poste par poste via config.txt, sans rebuild. Le handler est attaché +# au logger racine APRÈS setup_logging (les logs partent aussi dans le fichier). +_log_shipper = None +if LOG_SHIP_ENABLED: + try: + from .network.log_shipper import LogShipper + _log_shipper = LogShipper( + machine_id=MACHINE_ID, + max_batch=int(os.environ.get("RPA_AGENT_LOGS_MAX_BATCH", "1000")), + flush_interval_s=LOG_SHIP_INTERVAL_S, + ) + logging.getLogger().addHandler(_log_shipper.handler) + _log_shipper.start() + except Exception as _e: + # Ne JAMAIS empêcher Léa de démarrer pour un problème de remontée de logs. + logging.getLogger(__name__).warning("Log shipper non démarré : %s", _e) + _log_shipper = None + logger = logging.getLogger(__name__) # Intervalle de polling replay (secondes) diff --git a/agent_v0/agent_v1/network/log_shipper.py b/agent_v0/agent_v1/network/log_shipper.py new file mode 100644 index 000000000..d1f5e1fd9 --- /dev/null +++ b/agent_v0/agent_v1/network/log_shipper.py @@ -0,0 +1,317 @@ +# agent_v1/network/log_shipper.py +"""Remontée AUTOMATIQUE des logs du client Léa vers le serveur (push-log-DGX). + +But : diagnostiquer les postes Windows clinique SANS AnyDesk. Les logs déjà +écrits sur disque par `logging_setup.py` (rotation quotidienne, rétention 180 j, +Règlement IA Art. 12) sont en plus poussés au serveur, rangés par `machine_id`, +consultables au dashboard. + +Serveur (déjà prêt — NE PAS toucher) : + POST /api/v1/agents/logs + body = {machine_id: str, logs: [{ts, level, logger, message}]} + borne RPA_AGENT_LOGS_MAX_BATCH (défaut 1000) — 413 si dépassée. + +Conception : + - `LogShipperHandler(logging.Handler)` : sur `emit(record)`, formate au + schéma EXACT `{ts, level, logger, message}`, applique un assainissement + PII au message (défense en profondeur — la discipline `log_safe` à la + source logue déjà des hashes/longueurs, pas du contenu brut), puis + empile dans un buffer borné. + - `LogShipper` : flush par BATCH (≤ max_batch) via un `sender` callable + INJECTABLE `(machine_id, logs) -> bool`. Défaut = POST réel Bearer + (pattern `streamer.py`). + - Résilience (ZÉRO perte) : si `sender` renvoie False ou lève, les logs + RESTENT dans le buffer et sont rejoués au flush suivant. Le fichier de + log local reste de toute façon la source durable (survit au crash) ; le + buffer RAM est un best-effort de remontée, volontairement NON persisté en + SQLite (le `PersistentBuffer` est session/event-scoped — y mêler des logs + polluerait la DB d'events). Borne mémoire = `max_buffer` (drop des plus + VIEUX au-delà — un log récent vaut mieux qu'un vieux pour le diagnostic). + +Pattern d'import PII : on tente `anonymize_text` (server_v1.pii_sanitizer, +source de vérité des tokens typés) via le même import paresseux tolérant que +`ui/messages.py`. Sur un vrai poste (sans server_v1), on retombe sur l'identité : +acceptable car la PII de message est déjà neutralisée à la source par la +discipline `log_safe`. Le sanitizer reste INJECTABLE pour les tests/évolutions. + +Branche feat/push-log-dgx. +""" + +from __future__ import annotations + +import logging +import threading +import time +from collections import deque +from typing import Callable, Deque, Dict, List, Optional + +logger = logging.getLogger(__name__) + +# Schéma d'une entrée de log poussée au serveur. +# ts : epoch (float) — l'heure de l'évènement +# level : nom du niveau ("INFO", "WARNING"...) +# logger : nom du logger (record.name) +# message : message formaté (args interpolés) ET assaini PII + +# Défaut aligné sur la borne serveur RPA_AGENT_LOGS_MAX_BATCH (api_stream.py). +DEFAULT_MAX_BATCH = 1000 + +# Borne mémoire du buffer : au-delà, on droppe les plus VIEUX (diagnostic = +# on préfère les logs récents). Quelques milliers d'entrées = quelques Mo RAM. +DEFAULT_MAX_BUFFER = 5000 + + +# --------------------------------------------------------------------------- +# Assainissement PII du message (défense en profondeur) +# --------------------------------------------------------------------------- + +def _default_message_sanitizer(text: str) -> str: + """Sanitizer par défaut côté client = identité. + + Le **rempart PII des logs est le SERVEUR** : `sanitize_log_entries` + ré-assainit chaque message à la réception (`/api/v1/agents/logs`), via le + même `anonymize_text` que les events. Tenter un import de `server_v1` côté + poste à CHAQUE ligne de log est inutile (absent du bundle client) et coûteux + (exception attrapée par emit). La discipline `log_safe` neutralise déjà la + PII à la source. Reste INJECTABLE pour tests/évolutions. + """ + return text + + +# --------------------------------------------------------------------------- +# Handler — empile les LogRecords dans un buffer partagé +# --------------------------------------------------------------------------- + +class LogShipperHandler(logging.Handler): + """Handler logging qui sérialise chaque record et l'empile pour envoi. + + Ne fait AUCUN réseau : il alimente seulement le buffer du `LogShipper`. + L'envoi est piloté par `LogShipper.flush()` (thread dédié périodique). + """ + + def __init__( + self, + buffer: Deque[Dict], + lock: threading.Lock, + message_sanitizer: Callable[[str], str], + max_buffer: int = DEFAULT_MAX_BUFFER, + level=logging.NOTSET, + ): + super().__init__(level=level) + self._buffer = buffer + self._lock = lock + self._sanitize = message_sanitizer + self._max_buffer = max_buffer + + def _format_record(self, record: logging.LogRecord) -> Dict: + """Construit l'entrée au schéma EXACT {ts, level, logger, message}. + + `record.getMessage()` interpole les args (%s...). Le message est ensuite + passé au sanitizer PII. Tolérant : un message non formatable ne doit pas + faire perdre l'entrée. + """ + try: + message = record.getMessage() + except Exception: + message = str(record.msg) + try: + message = self._sanitize(message) + except Exception: + # Le sanitizer ne doit jamais casser le logging. + pass + return { + "ts": record.created, + "level": record.levelname, + "logger": record.name, + "message": message, + } + + def emit(self, record: logging.LogRecord) -> None: + """Sérialise et empile le record (best-effort, ne lève jamais).""" + try: + entry = self._format_record(record) + with self._lock: + # deque(maxlen) droppe automatiquement le plus VIEUX au-delà + # de la borne — pas de croissance mémoire non bornée. + self._buffer.append(entry) + except Exception: + # handleError respecte logging.raiseExceptions (silencieux en prod). + self.handleError(record) + + +# --------------------------------------------------------------------------- +# Shipper — flush périodique par batch via un sender injectable +# --------------------------------------------------------------------------- + +class LogShipper: + """Orchestre la remontée des logs : buffer + flush par batch. + + Args: + machine_id : identifiant du poste (config.MACHINE_ID en prod). + sender : callable INJECTABLE `(machine_id, logs) -> bool`. True = + accusé de réception serveur. Défaut = POST réel Bearer. + max_batch : taille max d'un batch (≤ borne serveur). Défaut 1000. + max_buffer : borne mémoire du buffer (drop des plus vieux au-delà). + message_sanitizer : assainissement PII du message. Défaut = pii_sanitizer + si disponible, sinon identité. + """ + + def __init__( + self, + machine_id: str, + sender: Optional[Callable[[str, List[Dict]], bool]] = None, + max_batch: int = DEFAULT_MAX_BATCH, + max_buffer: int = DEFAULT_MAX_BUFFER, + message_sanitizer: Optional[Callable[[str], str]] = None, + flush_interval_s: float = 30.0, + ): + self.machine_id = machine_id + self.max_batch = max(1, int(max_batch)) + self.flush_interval_s = flush_interval_s + self._sender = sender if sender is not None else self._default_sender + self._sanitize = message_sanitizer or _default_message_sanitizer + self._lock = threading.Lock() + self._buffer: Deque[Dict] = deque(maxlen=max_buffer) + self.handler = LogShipperHandler( + buffer=self._buffer, + lock=self._lock, + message_sanitizer=self._sanitize, + max_buffer=max_buffer, + ) + self._running = False + self._thread: Optional[threading.Thread] = None + + # ------------------------------------------------------------------ + # Introspection (diagnostic / tests) + # ------------------------------------------------------------------ + + def peek_buffer(self) -> List[Dict]: + """Copie des entrées en attente (lecture seule, pour diagnostic/tests).""" + with self._lock: + return list(self._buffer) + + def pending(self) -> int: + with self._lock: + return len(self._buffer) + + # ------------------------------------------------------------------ + # Flush — envoie le buffer par batches ≤ max_batch + # ------------------------------------------------------------------ + + def flush(self) -> int: + """Envoie le buffer par batches successifs. Retourne le nb de logs ACK. + + Résilience ZÉRO perte : on retire un batch du buffer, on tente l'envoi. + - Succès → les entrées sont définitivement consommées. + - Échec (False ou exception) → on REMET les entrées en tête du buffer + et on ARRÊTE la passe (serveur probablement down) ; rejeu au flush + suivant. Les entrées non encore extraites restent en place. + """ + sent = 0 + while True: + with self._lock: + if not self._buffer: + break + batch: List[Dict] = [] + for _ in range(min(self.max_batch, len(self._buffer))): + batch.append(self._buffer.popleft()) + + try: + ok = self._sender(self.machine_id, batch) + except Exception as e: + ok = False + logger.debug("Log shipper sender a levé : %s", e) + + if ok: + sent += len(batch) + continue + + # Échec : on remet le batch en tête (ordre préservé) et on arrête. + with self._lock: + self._buffer.extendleft(reversed(batch)) + break + + return sent + + # ------------------------------------------------------------------ + # Sender réel — POST Bearer (pattern streamer.py) + # ------------------------------------------------------------------ + + @staticmethod + def _auth_headers() -> dict: + """Headers Bearer (pattern streamer.py).""" + try: + from ..config import API_TOKEN + except Exception: + API_TOKEN = "" + if API_TOKEN: + return {"Authorization": f"Bearer {API_TOKEN}"} + return {} + + def _default_sender(self, machine_id: str, logs: List[Dict]) -> bool: + """POST réel vers /api/v1/agents/logs. True si HTTP 2xx. + + Best-effort : tout échec réseau/serveur → False (logs conservés, + rejoués). Aucune exception ne remonte au-delà du sender. + """ + try: + import requests + + from ..config import SERVER_URL + + url = f"{SERVER_URL}/agents/logs" + resp = requests.post( + url, + json={"machine_id": machine_id, "logs": logs}, + headers=self._auth_headers(), + timeout=5, + allow_redirects=False, + ) + return bool(resp.ok) + except Exception as e: + logger.debug("Log shipper POST échoué : %s", e) + return False + + # ------------------------------------------------------------------ + # Boucle de flush périodique (thread daemon) + # ------------------------------------------------------------------ + + def start(self) -> None: + """Démarre le thread de flush périodique (idempotent).""" + if self._running: + return + self._running = True + self._thread = threading.Thread( + target=self._flush_loop, daemon=True, name="lea-log-shipper" + ) + self._thread.start() + logger.info( + "Log shipper démarré (machine_id=%s, intervalle=%.0fs, batch≤%d)", + self.machine_id, self.flush_interval_s, self.max_batch, + ) + + def stop(self, final_flush: bool = True) -> None: + """Arrête la boucle et tente un dernier flush (best-effort).""" + self._running = False + if self._thread: + self._thread.join(timeout=2.0) + if final_flush: + try: + self.flush() + except Exception: + pass + + def _flush_loop(self) -> None: + while self._running: + # Découpe l'attente pour réagir vite à stop(). + waited = 0.0 + step = 0.5 + while self._running and waited < self.flush_interval_s: + time.sleep(step) + waited += step + if not self._running: + break + try: + self.flush() + except Exception as e: + logger.debug("Log shipper flush loop : %s", e) diff --git a/tests/unit/test_agent_v1_log_shipper.py b/tests/unit/test_agent_v1_log_shipper.py new file mode 100644 index 000000000..b537dd311 --- /dev/null +++ b/tests/unit/test_agent_v1_log_shipper.py @@ -0,0 +1,220 @@ +"""TDD — push-log-DGX : log shipper client Léa (remontée auto des logs). + +Le serveur expose déjà `POST /api/v1/agents/logs` (body +`{machine_id, logs:[{ts, level, logger, message}]}`, borne +`RPA_AGENT_LOGS_MAX_BATCH`). Côté client, on veut : + + - `LogShipperHandler(logging.Handler)` : sur `emit`, formate un LogRecord + au schéma exact `{ts, level, logger, message}`, applique un assainissement + PII au message, et empile dans un buffer. + - `LogShipper` : flush périodique du buffer par BATCH (≤ max_batch) via un + `sender` callable INJECTABLE `(machine_id, logs) -> bool`. Résilience : + si `sender` renvoie False ou lève, les logs RESTENT (rejoués au flush + suivant — ZÉRO perte ; conformité AI Act Art. 12). + +Le module est chargé par chemin (importlib) pour ne dépendre d'aucun import +lourd du package client (cf. DETTE-011/013, comme test_agent_v1_logging.py). +""" +import importlib.util +import logging +from pathlib import Path + +import pytest + +_MOD_PATH = ( + Path(__file__).resolve().parents[2] + / "agent_v0" / "agent_v1" / "network" / "log_shipper.py" +) + + +def _load_module(): + spec = importlib.util.spec_from_file_location("lea_log_shipper", _MOD_PATH) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +@pytest.fixture +def mod(): + return _load_module() + + +def _make_record(name="lea.test", level=logging.INFO, msg="hello %s", args=("world",)): + """Construit un vrai LogRecord (pas un mock) pour tester le formatage.""" + return logging.LogRecord( + name=name, level=level, pathname=__file__, lineno=1, + msg=msg, args=args, exc_info=None, + ) + + +# --------------------------------------------------------------------------- +# 1. emit formate un LogRecord au schéma exact {ts, level, logger, message} +# --------------------------------------------------------------------------- + +def test_emit_formate_au_schema_exact(mod): + shipper = mod.LogShipper(machine_id="poste-1", sender=lambda m, l: True) + handler = shipper.handler + + handler.emit(_make_record(name="lea.captor", level=logging.WARNING, + msg="bonjour %s", args=("monde",))) + + buffered = shipper.peek_buffer() + assert len(buffered) == 1 + entry = buffered[0] + # Schéma EXACT : pas de clé en plus, pas de clé en moins. + assert set(entry.keys()) == {"ts", "level", "logger", "message"} + assert entry["level"] == "WARNING" + assert entry["logger"] == "lea.captor" + assert entry["message"] == "bonjour monde" # args interpolés + assert isinstance(entry["ts"], (int, float)) + + +# --------------------------------------------------------------------------- +# 2. log_safe / assainissement PII appliqué au message avant envoi +# --------------------------------------------------------------------------- + +def test_pii_assaini_avant_envoi(mod): + # Sanitizer injecté déterministe : PII -> token (mime anonymize_text). + def fake_sanitizer(text): + return text.replace("ROSSIGNOL", "[NOM_1]") + + shipper = mod.LogShipper( + machine_id="poste-1", sender=lambda m, l: True, + message_sanitizer=fake_sanitizer, + ) + shipper.handler.emit(_make_record(msg="clic sur patient ROSSIGNOL", args=None)) + + entry = shipper.peek_buffer()[0] + assert "ROSSIGNOL" not in entry["message"] + assert "[NOM_1]" in entry["message"] + + +# --------------------------------------------------------------------------- +# 3. flush envoie un batch <= max et appelle sender(machine_id, logs) +# --------------------------------------------------------------------------- + +def test_flush_envoie_batch_borne_et_appelle_sender(mod): + calls = [] + + def sender(machine_id, logs): + calls.append((machine_id, logs)) + return True + + shipper = mod.LogShipper(machine_id="poste-42", sender=sender, max_batch=10) + for i in range(5): + shipper.handler.emit(_make_record(msg=f"event {i}", args=None)) + + sent = shipper.flush() + + assert sent == 5 + assert len(calls) == 1 + machine_id, logs = calls[0] + assert machine_id == "poste-42" + assert len(logs) == 5 + assert logs[0]["message"] == "event 0" + # Buffer vidé après succès + assert shipper.peek_buffer() == [] + + +# --------------------------------------------------------------------------- +# 4. sender échoue (False / exception) -> logs CONSERVÉS, rejoués au flush suivant +# --------------------------------------------------------------------------- + +def test_sender_echec_false_conserve_les_logs(mod): + state = {"fail": True, "received": None} + + def flaky_sender(machine_id, logs): + if state["fail"]: + return False # échec récupérable + state["received"] = list(logs) + return True + + shipper = mod.LogShipper(machine_id="p", sender=flaky_sender) + for i in range(3): + shipper.handler.emit(_make_record(msg=f"m{i}", args=None)) + + sent = shipper.flush() # échec + assert sent == 0 + assert len(shipper.peek_buffer()) == 3 # ZÉRO perte + + state["fail"] = False + sent = shipper.flush() # rejeu + assert sent == 3 + assert [e["message"] for e in state["received"]] == ["m0", "m1", "m2"] + assert shipper.peek_buffer() == [] + + +def test_sender_exception_conserve_les_logs(mod): + def exploding_sender(machine_id, logs): + raise ConnectionError("serveur down") + + shipper = mod.LogShipper(machine_id="p", sender=exploding_sender) + shipper.handler.emit(_make_record(msg="important", args=None)) + + sent = shipper.flush() # ne doit PAS propager + assert sent == 0 + assert len(shipper.peek_buffer()) == 1 # log conservé + + +# --------------------------------------------------------------------------- +# 5. buffer vide -> sender NON appelé +# --------------------------------------------------------------------------- + +def test_buffer_vide_sender_non_appele(mod): + calls = [] + shipper = mod.LogShipper( + machine_id="p", sender=lambda m, l: calls.append((m, l)) or True + ) + + sent = shipper.flush() + + assert sent == 0 + assert calls == [] + + +# --------------------------------------------------------------------------- +# 6. > max_batch entrées -> découpage en plusieurs batches +# --------------------------------------------------------------------------- + +def test_decoupage_en_plusieurs_batches(mod): + batches = [] + + def sender(machine_id, logs): + batches.append(len(logs)) + return True + + shipper = mod.LogShipper(machine_id="p", sender=sender, max_batch=3) + for i in range(7): + shipper.handler.emit(_make_record(msg=f"x{i}", args=None)) + + sent = shipper.flush() + + assert sent == 7 + # 7 entrées, max_batch=3 -> 3 + 3 + 1 + assert batches == [3, 3, 1] + # Chaque batch <= max_batch + assert all(n <= 3 for n in batches) + assert shipper.peek_buffer() == [] + + +def test_decoupage_echec_partiel_conserve_le_reste(mod): + """Si un batch intermédiaire échoue, on arrête et on garde le reste (0 perte).""" + batches = [] + + def sender(machine_id, logs): + batches.append([e["message"] for e in logs]) + # Le 2e batch échoue + return len(batches) != 2 + + shipper = mod.LogShipper(machine_id="p", sender=sender, max_batch=2) + for i in range(6): + shipper.handler.emit(_make_record(msg=f"x{i}", args=None)) + + sent = shipper.flush() + + # 1er batch (x0,x1) part ; 2e (x2,x3) échoue -> on arrête. + assert sent == 2 + assert batches[0] == ["x0", "x1"] + # x2..x5 restent dans le buffer dans l'ordre. + restant = [e["message"] for e in shipper.peek_buffer()] + assert restant == ["x2", "x3", "x4", "x5"]