#!/usr/bin/env python3 """ server/worker_daemon.py Fiche #21 (prod): worker "external" pour traiter la queue de sessions. Pourquoi : séparer l'API (réponse HTTP) du traitement lourd (ScreenStates, embeddings, FAISS, graph building) pour éviter qu'un traitement long bloque la prod. Mode d'emploi: # Lancer à la main python server/worker_daemon.py # En systemd : voir deploy/systemd/rpa-vision-v3-worker.service Variables d'env: RPA_WORKER_HEARTBEAT_PATH=data/runtime/health/worker_heartbeat.json RPA_WORKER_HEARTBEAT_EVERY_S=10 RPA_TRAINING_BASE_PATH=data/training """ from __future__ import annotations import json import logging import os import signal import sys import time from datetime import datetime from pathlib import Path # Ajouter le répertoire parent au path sys.path.insert(0, str(Path(__file__).parent.parent)) # NOTE: ce script vit dans server/ ; on importe donc en "local" pour rester compatible # avec les usages existants (api_upload.py fait pareil). from processing_queue import get_queue, start_processing_worker, stop_processing_worker from processing_pipeline import process_session_async from core.system import initialize_system_cleanup, shutdown_system logger = logging.getLogger("rpa.worker") def _ensure_dir(p: Path) -> None: p.parent.mkdir(parents=True, exist_ok=True) class _GracefulStop: stop = False def _handle_signal(signum, frame): logger.info(f"Received signal {signum}, stopping worker...") _GracefulStop.stop = True def _write_heartbeat(path: Path, payload: dict) -> None: _ensure_dir(path) tmp = path.with_suffix(path.suffix + ".tmp") with open(tmp, "w", encoding="utf-8") as f: json.dump(payload, f, indent=2, ensure_ascii=False) tmp.replace(path) def main() -> int: logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) # Init cleanup (safe on server) initialize_system_cleanup() heartbeat_path = Path(os.getenv("RPA_WORKER_HEARTBEAT_PATH", "data/runtime/health/worker_heartbeat.json")) every_s = int(os.getenv("RPA_WORKER_HEARTBEAT_EVERY_S", "10")) # Capture signals signal.signal(signal.SIGINT, _handle_signal) signal.signal(signal.SIGTERM, _handle_signal) # Start worker logger.info("Starting external processing worker...") start_processing_worker(process_session_async) queue = get_queue() last_hb = 0.0 try: while not _GracefulStop.stop: now = time.time() if now - last_hb >= every_s: items = queue.get_all() payload = { "timestamp": datetime.now().isoformat(), "queue": { "total": len(items), "pending": sum(1 for i in items if i.get("status") == "pending"), "processing": sum(1 for i in items if i.get("status") == "processing"), "completed": sum(1 for i in items if i.get("status") == "completed"), "failed": sum(1 for i in items if i.get("status") == "failed"), }, "pid": os.getpid(), } try: _write_heartbeat(heartbeat_path, payload) except Exception as e: logger.warning(f"Could not write heartbeat: {e}") last_hb = now time.sleep(1.0) logger.info("Stopping external worker...") stop_processing_worker() shutdown_system() return 0 except Exception as e: logger.exception(f"Worker crashed: {e}") try: stop_processing_worker() except Exception: pass shutdown_system() return 2 if __name__ == "__main__": raise SystemExit(main())