feat(server): câble sanitize_event au chokepoint stream_event (PII)

Assainissement PII appliqué une seule fois à l'entrée de stream_event(),
avec un mapping de tokens par session (cohérence intra-session). Les chemins
de persistance et de traitement (jsonl, worker.process_event_direct,
shadow_observe_event, enrichissement SOM) consomment tous la copie assainie
au lieu de l'event brut — plus aucune PII patient en clair côté serveur.

Test de non-régression du câblage: stream_event ne doit jamais écrire de PII
brute (IPP/contenu saisi) dans live_events.jsonl ni la propager au worker/shadow.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-06-29 10:39:27 +02:00
parent e84cdee393
commit 6461f0a21b
2 changed files with 86 additions and 4 deletions

View File

@@ -27,6 +27,7 @@ from fastapi import BackgroundTasks, Depends, FastAPI, File, HTTPException, Requ
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from .pii_sanitizer import sanitize_event
from .replay_failure_logger import log_replay_failure
from .replay_verifier import ReplayVerifier, VerificationResult
from .replay_learner import ReplayLearner
@@ -1922,6 +1923,11 @@ async def stream_event(data: StreamEvent):
# Auto-enregistrer la session si inconnue (robustesse au redémarrage serveur)
_ensure_session_registered(session_id, machine_id=machine_id)
# ── Assainissement PII : sanitize une fois, les 3 chemins reçoivent la copie ──
sanitized_event = sanitize_event(
data.event, mapping=_session_pii_mapping[session_id]
)
# Persister sur disque (journal JSONL, dans un sous-dossier par machine si multi-machine)
if machine_id and machine_id != "default":
session_path = LIVE_SESSIONS_DIR / machine_id / session_id
@@ -1930,21 +1936,26 @@ async def stream_event(data: StreamEvent):
session_path.mkdir(parents=True, exist_ok=True)
event_file = session_path / "live_events.jsonl"
with open(event_file, "a", encoding="utf-8") as f:
f.write(json.dumps(data.dict()) + "\n")
f.write(json.dumps({
"session_id": data.session_id,
"timestamp": data.timestamp,
"event": sanitized_event,
"machine_id": machine_id,
}) + "\n")
# Traitement direct via StreamProcessor
result = worker.process_event_direct(session_id, data.event)
result = worker.process_event_direct(session_id, sanitized_event)
# ── Observation Shadow (si mode Shadow activé pour cette session) ──
# L'appel est protégé et non bloquant : si l'observer n'est pas
# actif, ou s'il lève, la capture continue normalement.
shadow_observe_event(session_id, data.event)
shadow_observe_event(session_id, sanitized_event)
# ── Enrichissement SomEngine temps réel pour les mouse_click ──
# Après l'enregistrement de l'event, tenter l'enrichissement si le
# screenshot est déjà arrivé. Sinon, l'event est mis en attente et
# sera enrichi quand le screenshot arrivera (voir stream_image).
event = data.event
event = sanitized_event
if event.get("type") == "mouse_click" and event.get("screenshot_id"):
session = processor.session_manager.get_session(session_id)
if session:
@@ -1962,6 +1973,9 @@ async def stream_event(data: StreamEvent):
# =========================================================================
# Ensemble des screenshots déjà analysés (évite les doublons de retry)
# Mapping PII par session — tokens cohérents intra-session (même patient → même [NOM_1])
_session_pii_mapping: Dict[str, Dict] = defaultdict(dict)
_analyzed_shots: Dict[str, set] = defaultdict(set)
# Hash du dernier screenshot analysé par session (déduplication par similarité)

View File

@@ -0,0 +1,68 @@
"""Non-régression sécurité : câblage PII au chokepoint ``stream_event``.
Invariant : un event contenant de la PII patient (titre de fenêtre + contenu
saisi) passé à ``stream_event`` ne doit JAMAIS écrire la PII brute dans le
journal ``live_events.jsonl``, ni la propager au worker ou au shadow observer.
L'assainissement a lieu une seule fois, en amont des chemins de
persistance/traitement (``api_stream.py``, hook ``sanitize_event``).
"""
import asyncio
import json
import os
# Le module serveur refuse de se charger sans token (sécurité prod) ;
# en test unitaire on désactive l'auth pour pouvoir importer le module.
os.environ.setdefault("RPA_AUTH_DISABLED", "true")
import agent_v0.server_v1.api_stream as api
def _event_avec_pii():
# PII captée par la couche 1 : IPP (structurel) + contenu saisi.
# Contexte = logiciel métier réel du POC (pas la maquette Easily abandonnée).
# (Les noms libres sans marqueur relèvent de la couche 2 NER — hors scope ici.)
return {
"type": "text_input",
"text": "anticoagulant 75mg matin",
"active_window_title": "Gxd5diag - Recherche dossier (IPP: 123456)",
}
def test_stream_event_assainit_et_propage_sur_les_chemins(tmp_path, monkeypatch):
"""Le chokepoint applique sanitize_event UNE fois et tous les chemins
(jsonl, worker, shadow) reçoivent la copie assainie — pas la valeur brute."""
captured = {}
monkeypatch.setattr(api, "_ensure_session_registered", lambda *a, **k: None)
monkeypatch.setattr(
api.worker,
"process_event_direct",
lambda sid, ev: (captured.__setitem__("worker", ev), {})[1],
)
monkeypatch.setattr(
api, "shadow_observe_event", lambda sid, ev: captured.__setitem__("shadow", ev)
)
monkeypatch.setattr(api, "LIVE_SESSIONS_DIR", tmp_path)
api._session_pii_mapping.pop("sess_pii", None)
se = api.StreamEvent(
session_id="sess_pii",
machine_id="lea-test",
timestamp=1000.0,
event=_event_avec_pii(),
)
asyncio.run(api.stream_event(se))
# 1. le journal sur disque ne contient ni l'IPP brut ni le contenu saisi
jsonl = (tmp_path / "lea-test" / "sess_pii" / "live_events.jsonl").read_text(
encoding="utf-8"
)
assert "123456" not in jsonl
assert "anticoagulant 75mg" not in jsonl
# 2. contenu saisi masqué + IPP tokenisé (preuve que le titre est traité)
assert "[SAISIE]" in jsonl
assert "[IPP_1]" in jsonl
# 3. worker et shadow reçoivent l'event assaini, pas la valeur brute
assert captured["worker"]["text"] == "[SAISIE]"
assert "123456" not in json.dumps(captured["worker"], ensure_ascii=False)
assert "123456" not in json.dumps(captured["shadow"], ensure_ascii=False)