From 6461f0a21b2e31c1c44db258017ed42a0487ac36 Mon Sep 17 00:00:00 2001 From: Dom Date: Mon, 29 Jun 2026 10:39:27 +0200 Subject: [PATCH] =?UTF-8?q?feat(server):=20c=C3=A2ble=20sanitize=5Fevent?= =?UTF-8?q?=20au=20chokepoint=20stream=5Fevent=20(PII)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Assainissement PII appliqué une seule fois à l'entrée de stream_event(), avec un mapping de tokens par session (cohérence intra-session). Les chemins de persistance et de traitement (jsonl, worker.process_event_direct, shadow_observe_event, enrichissement SOM) consomment tous la copie assainie au lieu de l'event brut — plus aucune PII patient en clair côté serveur. Test de non-régression du câblage: stream_event ne doit jamais écrire de PII brute (IPP/contenu saisi) dans live_events.jsonl ni la propager au worker/shadow. Co-Authored-By: Claude Opus 4.8 (1M context) --- agent_v0/server_v1/api_stream.py | 22 +++++-- tests/unit/test_stream_event_pii_wiring.py | 68 ++++++++++++++++++++++ 2 files changed, 86 insertions(+), 4 deletions(-) create mode 100644 tests/unit/test_stream_event_pii_wiring.py diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index e7b40675d..9ef079aed 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -27,6 +27,7 @@ from fastapi import BackgroundTasks, Depends, FastAPI, File, HTTPException, Requ from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel +from .pii_sanitizer import sanitize_event from .replay_failure_logger import log_replay_failure from .replay_verifier import ReplayVerifier, VerificationResult from .replay_learner import ReplayLearner @@ -1922,6 +1923,11 @@ async def stream_event(data: StreamEvent): # Auto-enregistrer la session si inconnue (robustesse au redémarrage serveur) _ensure_session_registered(session_id, machine_id=machine_id) + # ── Assainissement PII : sanitize une fois, les 3 chemins reçoivent la copie ── + sanitized_event = sanitize_event( + data.event, mapping=_session_pii_mapping[session_id] + ) + # Persister sur disque (journal JSONL, dans un sous-dossier par machine si multi-machine) if machine_id and machine_id != "default": session_path = LIVE_SESSIONS_DIR / machine_id / session_id @@ -1930,21 +1936,26 @@ async def stream_event(data: StreamEvent): session_path.mkdir(parents=True, exist_ok=True) event_file = session_path / "live_events.jsonl" with open(event_file, "a", encoding="utf-8") as f: - f.write(json.dumps(data.dict()) + "\n") + f.write(json.dumps({ + "session_id": data.session_id, + "timestamp": data.timestamp, + "event": sanitized_event, + "machine_id": machine_id, + }) + "\n") # Traitement direct via StreamProcessor - result = worker.process_event_direct(session_id, data.event) + result = worker.process_event_direct(session_id, sanitized_event) # ── Observation Shadow (si mode Shadow activé pour cette session) ── # L'appel est protégé et non bloquant : si l'observer n'est pas # actif, ou s'il lève, la capture continue normalement. - shadow_observe_event(session_id, data.event) + shadow_observe_event(session_id, sanitized_event) # ── Enrichissement SomEngine temps réel pour les mouse_click ── # Après l'enregistrement de l'event, tenter l'enrichissement si le # screenshot est déjà arrivé. Sinon, l'event est mis en attente et # sera enrichi quand le screenshot arrivera (voir stream_image). - event = data.event + event = sanitized_event if event.get("type") == "mouse_click" and event.get("screenshot_id"): session = processor.session_manager.get_session(session_id) if session: @@ -1962,6 +1973,9 @@ async def stream_event(data: StreamEvent): # ========================================================================= # Ensemble des screenshots déjà analysés (évite les doublons de retry) +# Mapping PII par session — tokens cohérents intra-session (même patient → même [NOM_1]) +_session_pii_mapping: Dict[str, Dict] = defaultdict(dict) + _analyzed_shots: Dict[str, set] = defaultdict(set) # Hash du dernier screenshot analysé par session (déduplication par similarité) diff --git a/tests/unit/test_stream_event_pii_wiring.py b/tests/unit/test_stream_event_pii_wiring.py new file mode 100644 index 000000000..cc65b210a --- /dev/null +++ b/tests/unit/test_stream_event_pii_wiring.py @@ -0,0 +1,68 @@ +"""Non-régression sécurité : câblage PII au chokepoint ``stream_event``. + +Invariant : un event contenant de la PII patient (titre de fenêtre + contenu +saisi) passé à ``stream_event`` ne doit JAMAIS écrire la PII brute dans le +journal ``live_events.jsonl``, ni la propager au worker ou au shadow observer. +L'assainissement a lieu une seule fois, en amont des chemins de +persistance/traitement (``api_stream.py``, hook ``sanitize_event``). +""" +import asyncio +import json +import os + +# Le module serveur refuse de se charger sans token (sécurité prod) ; +# en test unitaire on désactive l'auth pour pouvoir importer le module. +os.environ.setdefault("RPA_AUTH_DISABLED", "true") + +import agent_v0.server_v1.api_stream as api + + +def _event_avec_pii(): + # PII captée par la couche 1 : IPP (structurel) + contenu saisi. + # Contexte = logiciel métier réel du POC (pas la maquette Easily abandonnée). + # (Les noms libres sans marqueur relèvent de la couche 2 NER — hors scope ici.) + return { + "type": "text_input", + "text": "anticoagulant 75mg matin", + "active_window_title": "Gxd5diag - Recherche dossier (IPP: 123456)", + } + + +def test_stream_event_assainit_et_propage_sur_les_chemins(tmp_path, monkeypatch): + """Le chokepoint applique sanitize_event UNE fois et tous les chemins + (jsonl, worker, shadow) reçoivent la copie assainie — pas la valeur brute.""" + captured = {} + monkeypatch.setattr(api, "_ensure_session_registered", lambda *a, **k: None) + monkeypatch.setattr( + api.worker, + "process_event_direct", + lambda sid, ev: (captured.__setitem__("worker", ev), {})[1], + ) + monkeypatch.setattr( + api, "shadow_observe_event", lambda sid, ev: captured.__setitem__("shadow", ev) + ) + monkeypatch.setattr(api, "LIVE_SESSIONS_DIR", tmp_path) + api._session_pii_mapping.pop("sess_pii", None) + + se = api.StreamEvent( + session_id="sess_pii", + machine_id="lea-test", + timestamp=1000.0, + event=_event_avec_pii(), + ) + + asyncio.run(api.stream_event(se)) + + # 1. le journal sur disque ne contient ni l'IPP brut ni le contenu saisi + jsonl = (tmp_path / "lea-test" / "sess_pii" / "live_events.jsonl").read_text( + encoding="utf-8" + ) + assert "123456" not in jsonl + assert "anticoagulant 75mg" not in jsonl + # 2. contenu saisi masqué + IPP tokenisé (preuve que le titre est traité) + assert "[SAISIE]" in jsonl + assert "[IPP_1]" in jsonl + # 3. worker et shadow reçoivent l'event assaini, pas la valeur brute + assert captured["worker"]["text"] == "[SAISIE]" + assert "123456" not in json.dumps(captured["worker"], ensure_ascii=False) + assert "123456" not in json.dumps(captured["shadow"], ensure_ascii=False)