Files
rpa_vision_v3/server/worker_daemon.py
Dom a27b74cf22 v1.0 - Version stable: multi-PC, détection UI-DETR-1, 3 modes exécution
- Frontend v4 accessible sur réseau local (192.168.1.40)
- Ports ouverts: 3002 (frontend), 5001 (backend), 5004 (dashboard)
- Ollama GPU fonctionnel
- Self-healing interactif
- Dashboard confiance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-29 11:23:51 +01:00

129 lines
3.8 KiB
Python

#!/usr/bin/env python3
"""
server/worker_daemon.py
Fiche #21 (prod): worker "external" pour traiter la queue de sessions.
Pourquoi : séparer l'API (réponse HTTP) du traitement lourd (ScreenStates, embeddings,
FAISS, graph building) pour éviter qu'un traitement long bloque la prod.
Mode d'emploi:
# Lancer à la main
python server/worker_daemon.py
# En systemd : voir deploy/systemd/rpa-vision-v3-worker.service
Variables d'env:
RPA_WORKER_HEARTBEAT_PATH=data/runtime/health/worker_heartbeat.json
RPA_WORKER_HEARTBEAT_EVERY_S=10
RPA_TRAINING_BASE_PATH=data/training
"""
from __future__ import annotations
import json
import logging
import os
import signal
import sys
import time
from datetime import datetime
from pathlib import Path
# Ajouter le répertoire parent au path
sys.path.insert(0, str(Path(__file__).parent.parent))
# NOTE: ce script vit dans server/ ; on importe donc en "local" pour rester compatible
# avec les usages existants (api_upload.py fait pareil).
from processing_queue import get_queue, start_processing_worker, stop_processing_worker
from processing_pipeline import process_session_async
from core.system import initialize_system_cleanup, shutdown_system
logger = logging.getLogger("rpa.worker")
def _ensure_dir(p: Path) -> None:
    """Make sure the directory that will contain *p* exists.

    Only the parent of ``p`` is created (together with any missing
    ancestors); ``p`` itself is never created or modified. An already
    existing directory is accepted silently.
    """
    parent_dir = p.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
class _GracefulStop:
    """Module-wide stop flag; used as a plain namespace, never instantiated.

    ``_handle_signal`` sets ``stop`` to ``True`` and the loop in ``main``
    polls it to know when to leave.
    """

    # False while the worker should keep running; True once a stop was requested.
    stop: bool = False
def _handle_signal(signum, frame):
    """Signal handler: log the signal and request a graceful stop.

    Args:
        signum: Number of the signal that was delivered.
        frame: Interrupted stack frame; unused, present because the
            ``signal`` module passes it to every handler.

    The handler does no shutdown work itself. It only raises the
    ``_GracefulStop.stop`` flag, which the main loop checks.
    """
    message = f"Received signal {signum}, stopping worker..."
    logger.info(message)
    _GracefulStop.stop = True
def _write_heartbeat(path: Path, payload: dict) -> None:
    """Write *payload* as pretty-printed UTF-8 JSON to *path*.

    The document is first written to a sibling ``<name>.tmp`` file and then
    moved over *path* with ``Path.replace``, so a reader of *path* is not
    handed a half-written file.

    Args:
        path: Destination of the heartbeat file; its parent directory is
            created if missing.
        payload: JSON-serializable mapping to store.

    Raises:
        TypeError / ValueError: if *payload* cannot be serialized. Nothing
            is written to disk in that case.
        OSError: if the temporary file cannot be written or renamed. The
            temporary file is removed before the error propagates.
    """
    # Serialize before touching the filesystem: a bad payload must not leave
    # a truncated temp file behind.
    document = json.dumps(payload, indent=2, ensure_ascii=False)

    _ensure_dir(path)
    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            f.write(document)
        tmp.replace(path)
    except Exception:
        # Best-effort cleanup of the partial temp file; the original error
        # is what the caller needs to see.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
# Statuses reported individually in the "queue" section of the heartbeat.
_HEARTBEAT_STATUSES = ("pending", "processing", "completed", "failed")


def _heartbeat_interval_s(default: int = 10) -> int:
    """Read RPA_WORKER_HEARTBEAT_EVERY_S, falling back to *default* if it is not an integer."""
    raw = os.getenv("RPA_WORKER_HEARTBEAT_EVERY_S", str(default))
    try:
        return int(raw)
    except ValueError:
        logger.warning(
            f"Invalid RPA_WORKER_HEARTBEAT_EVERY_S={raw!r}, using {default}s instead"
        )
        return default


def _queue_summary(items) -> dict:
    """Return the total number of queue items plus one count per tracked status."""
    summary = {"total": len(items)}
    for status in _HEARTBEAT_STATUSES:
        summary[status] = sum(1 for item in items if item.get("status") == status)
    return summary


def main() -> int:
    """Run the external processing worker until SIGINT or SIGTERM is received.

    Starts the processing worker, then wakes up once per second and, every
    ``RPA_WORKER_HEARTBEAT_EVERY_S`` seconds (default 10), writes a JSON
    heartbeat (timestamp, queue counts, pid) to ``RPA_WORKER_HEARTBEAT_PATH``.
    A failure to write the heartbeat is logged and does not stop the worker.

    Returns:
        0 after a clean stop; 2 if the worker crashed or the processing
        worker could not be stopped cleanly.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    )

    # Init cleanup (safe on server)
    initialize_system_cleanup()

    heartbeat_path = Path(
        os.getenv("RPA_WORKER_HEARTBEAT_PATH", "data/runtime/health/worker_heartbeat.json")
    )
    every_s = _heartbeat_interval_s()

    # Capture signals
    signal.signal(signal.SIGINT, _handle_signal)
    signal.signal(signal.SIGTERM, _handle_signal)

    exit_code = 0
    try:
        # Started inside the try so that a failure here still goes through
        # the shutdown sequence below.
        logger.info("Starting external processing worker...")
        start_processing_worker(process_session_async)
        queue = get_queue()

        # Monotonic clock: wall-clock adjustments (NTP, manual changes) must
        # not delay or suppress heartbeats. The first one is due immediately.
        next_hb = time.monotonic()
        while not _GracefulStop.stop:
            now = time.monotonic()
            if now >= next_hb:
                payload = {
                    "timestamp": datetime.now().isoformat(),
                    "queue": _queue_summary(queue.get_all()),
                    "pid": os.getpid(),
                }
                try:
                    _write_heartbeat(heartbeat_path, payload)
                except Exception as e:
                    logger.warning(f"Could not write heartbeat: {e}")
                # Reschedule even after a failed write so we do not retry
                # every second.
                next_hb = now + every_s
            time.sleep(1.0)

        logger.info("Stopping external worker...")
    except Exception as e:
        logger.exception(f"Worker crashed: {e}")
        exit_code = 2
    finally:
        # Single shutdown path for both the clean and the crashed case.
        try:
            stop_processing_worker()
        except Exception:
            logger.exception("Could not stop processing worker cleanly")
            exit_code = 2
        shutdown_system()
    return exit_code
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())