fix(p0): secure agent revocation and R6 worker queue

This commit is contained in:
Dom
2026-06-02 15:52:35 +02:00
parent 2dd306724c
commit 7a1a5cb6fd
11 changed files with 2869 additions and 109 deletions

View File

@@ -25,6 +25,7 @@ Le worker :
5. Se suspend quand un replay est actif (libère le GPU)
"""
import json
import logging
import os
import signal
@@ -67,6 +68,7 @@ class VLMWorker:
self._running = False
self._processor = None # Initialisé au premier besoin (lazy loading GPU)
self._current_session: Optional[str] = None
self._started_at: str = datetime.now().isoformat()
# Stats
self._stats: Dict[str, int] = {
@@ -83,7 +85,10 @@ class VLMWorker:
if self._processor is None:
logger.info("Initialisation du StreamProcessor (chargement GPU)...")
from .stream_processor import StreamProcessor
self._processor = StreamProcessor(data_dir=str(LIVE_SESSIONS_DIR))
self._processor = StreamProcessor(
data_dir=str(DATA_DIR),
enable_vlm=True,
)
logger.info("StreamProcessor initialisé.")
return self._processor
@@ -98,6 +103,11 @@ class VLMWorker:
logger.info(" Sessions dir : %s", LIVE_SESSIONS_DIR)
logger.info(" Poll interval : %ds", POLL_INTERVAL)
# N2 + N3 : santé initiale + signal READY systemd dès le démarrage
# (avant tout chargement GPU, pour ne pas dépasser le timeout de start).
self._write_health("healthy")
self._sd_notify("READY=1")
while self._running:
try:
# Vérifier si un replay est actif
@@ -110,6 +120,7 @@ class VLMWorker:
if session_id:
self._process_session(session_id)
else:
self._write_health("healthy") # N2 : cycle idle
time.sleep(POLL_INTERVAL)
except KeyboardInterrupt:
@@ -119,6 +130,7 @@ class VLMWorker:
logger.error("Erreur dans la boucle principale : %s", e, exc_info=True)
time.sleep(5) # Éviter une boucle d'erreurs rapide
self._write_health("stopped") # N2 : santé finale
logger.info("VLM Worker arrêté.")
def stop(self):
@@ -126,6 +138,103 @@ class VLMWorker:
self._running = False
logger.info("Arrêt demandé.")
# =========================================================================
# N2 — Health file (_worker_health.json)
# =========================================================================
#
# Garde-fou anti-blocage silencieux : expose l'état de santé du worker sur
# disque pour qu'un superviseur (humain, dashboard, watchdog) détecte un
# worker dégradé sans avoir à fouiller les logs. Écriture atomique.
#
# CONFIDENTIALITÉ (HDS) : n'écrit AUCUNE donnée patient — uniquement des
# identifiants techniques (session_id), des compteurs et des booléens de
# composants. Jamais d'OCR, de noms de fichiers screenshots, ni de contenu
# de session.
def _sd_notify(self, state: str) -> bool:
"""Notifie systemd via $NOTIFY_SOCKET, sans dépendance `systemd.daemon`.
Implémentation pure socket (AF_UNIX SOCK_DGRAM) : fonctionne sous systemd
`Type=notify` pour `READY=1` et le heartbeat `WATCHDOG=1`. No-op silencieux
hors systemd (variable absente) ou en cas d'erreur — jamais bloquant.
Retourne True si le message a été émis.
"""
addr = os.environ.get("NOTIFY_SOCKET")
if not addr:
return False
try:
import socket
# Namespace abstrait systemd : '@' → octet nul de préfixe
connect_addr = "\0" + addr[1:] if addr.startswith("@") else addr
with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock:
sock.connect(connect_addr)
sock.sendall(state.encode("utf-8"))
return True
except Exception as e:
logger.debug("sd_notify(%s) échoué : %s", state, e)
return False
def _health_components(self) -> Dict[str, bool]:
"""Statut booléen de chaque composant lourd, dérivé du processor."""
proc = self._processor
return {
"screen_analyzer": proc is not None and getattr(proc, "_screen_analyzer", None) is not None,
"clip_embedder": proc is not None and getattr(proc, "_clip_embedder", None) is not None,
"faiss_manager": proc is not None and getattr(proc, "_faiss_manager", None) is not None,
"state_embedding_builder": proc is not None and getattr(proc, "_state_embedding_builder", None) is not None,
}
def _write_health(self, status: str) -> None:
"""Écrit data/training/_worker_health.json de façon atomique.
`status` attendu : healthy | busy | degraded | stopped. Si le worker
tourne en mode VLM mais que ScreenAnalyzer est absent, le statut est
forcé à 'degraded' quelle que soit la valeur demandée.
"""
try:
components = self._health_components()
proc = self._processor
vlm_mode = proc is not None and getattr(proc, "_enable_vlm", False)
if vlm_mode and not components["screen_analyzer"]:
status = "degraded"
queue_path = DATA_DIR / "_worker_queue.txt"
try:
queue_length = len(
[ln for ln in queue_path.read_text(encoding="utf-8").splitlines() if ln.strip()]
) if queue_path.exists() else 0
except Exception:
queue_length = 0
payload = {
"pid": os.getpid(),
"started_at": self._started_at,
"last_cycle": datetime.now().isoformat(),
"current_session": self._current_session,
"queue_length": queue_length,
"components": components,
"stats": dict(self._stats),
"status": status,
}
health_path = DATA_DIR / "_worker_health.json"
tmp_path = health_path.with_suffix(".json.tmp")
tmp_path.write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
tmp_path.rename(health_path)
except Exception as e:
# Le health file est un garde-fou, jamais un point de défaillance.
logger.warning("Écriture health file échouée : %s", e)
# N3 : chaque écriture santé sert aussi de heartbeat watchdog systemd
# (sauf à l'arrêt). No-op hors systemd.
if status != "stopped":
self._sd_notify("WATCHDOG=1")
# =========================================================================
# Queue management (fichier _worker_queue.txt)
# =========================================================================
@@ -206,6 +315,9 @@ class VLMWorker:
REPLAY_WAIT_TIMEOUT,
)
break
# N3 : heartbeat pendant la pause replay (peut durer jusqu'à 120s,
# sinon le watchdog tuerait un worker pourtant sain et en attente).
self._sd_notify("WATCHDOG=1")
time.sleep(REPLAY_CHECK_INTERVAL)
elapsed = time.time() - start
@@ -220,6 +332,7 @@ class VLMWorker:
"""Traite une session complète (analyse VLM + construction workflow)."""
self._current_session = session_id
logger.info("=== Début traitement session %s ===", session_id)
self._write_health("busy") # N2 : début de session
start_time = time.time()
try:
@@ -331,6 +444,7 @@ class VLMWorker:
finally:
self._current_session = None
self._write_health("healthy") # N2 : fin de session (ou degraded auto)
logger.info("=== Fin traitement session %s ===", session_id)
@@ -347,6 +461,8 @@ class VLMWorker:
f" ({shot_id})" if shot_id else "",
)
self._write_health("busy") # N2 : heartbeat à chaque screenshot
# Vérifier si un replay est devenu actif pendant le traitement
if self._is_replay_active():
logger.info(