"""Replay orphan watchdog for in-flight replay actions. This module watches `_retry_pending` and re-pushes actions that were dispatched by the server but never acknowledged by the Windows agent. """ from __future__ import annotations import asyncio import contextlib import logging import os import time from typing import Any, Callable, Dict, List, Optional, Tuple logger = logging.getLogger(__name__) def _env_bool(name: str, default: str) -> bool: return os.environ.get(name, default).strip().lower() in { "1", "true", "yes", "on", } def _env_float(name: str, default: float) -> float: try: return float(os.environ.get(name, str(default))) except (TypeError, ValueError): logger.warning("Watchdog: invalid env %s, fallback=%s", name, default) return default def _env_int(name: str, default: int) -> int: try: return int(os.environ.get(name, str(default))) except (TypeError, ValueError): logger.warning("Watchdog: invalid env %s, fallback=%s", name, default) return default def _env_max_resends(default: int) -> int: raw = os.environ.get("RPA_WATCHDOG_MAX_RESENDS") if raw is None or not str(raw).strip(): raw = os.environ.get("RPA_WATCHDOG_MAX_RETRIES") try: return int(raw) if raw is not None else default except (TypeError, ValueError): logger.warning("Watchdog: invalid max resend env, fallback=%s", default) return default WATCHDOG_ENABLED = _env_bool("RPA_WATCHDOG_ENABLED", "1") WATCHDOG_SCAN_INTERVAL_S = _env_float("RPA_WATCHDOG_SCAN_INTERVAL_S", 10.0) WATCHDOG_ORPHAN_TIMEOUT_S = _env_float("RPA_WATCHDOG_ORPHAN_TIMEOUT_S", 45.0) WATCHDOG_MAX_RESENDS = _env_max_resends(2) WATCHDOG_REPUSH_POSITION = ( os.environ.get("RPA_WATCHDOG_REPUSH_POSITION", "head").strip().lower() ) _metrics_lock = asyncio.Lock() _metrics: Dict[str, Any] = { "orphans_detected_total": 0, "orphans_resent_total": 0, "orphans_giveup_total": 0, "scans_total": 0, "scans_failed_total": 0, "last_scan_ts": 0.0, "last_scan_duration_ms": 0.0, "current_in_flight_count": 0, "current_orphan_count": 0, } async def _bump(key: str, delta: int = 1) -> None: async with _metrics_lock: _metrics[key] = _metrics.get(key, 0) + delta def get_metrics_snapshot() -> Dict[str, Any]: return dict(_metrics) SseNotifier = Callable[[str, str], None] class ReplayWatchdog: """Background coroutine that re-pushes orphaned replay actions.""" def __init__( self, retry_pending: Dict[str, Dict[str, Any]], replay_queues: Dict[str, List[Dict[str, Any]]], async_lock_factory: Callable[[], Any], sse_notifier: Optional[SseNotifier] = None, ) -> None: self._retry_pending = retry_pending self._replay_queues = replay_queues self._async_lock = async_lock_factory self._sse_notifier = sse_notifier self._task: Optional[asyncio.Task] = None self._stopped = asyncio.Event() async def start(self) -> None: if not WATCHDOG_ENABLED: logger.info("[WATCHDOG] disabled via RPA_WATCHDOG_ENABLED=0") return if self._task is not None and not self._task.done(): logger.warning("[WATCHDOG] already started") return self._stopped.clear() self._task = asyncio.create_task(self._run(), name="replay_watchdog") logger.info( "[WATCHDOG] started scan=%.1fs orphan_timeout=%.1fs max_resends=%d repush=%s", WATCHDOG_SCAN_INTERVAL_S, WATCHDOG_ORPHAN_TIMEOUT_S, WATCHDOG_MAX_RESENDS, WATCHDOG_REPUSH_POSITION, ) async def stop(self, timeout_s: float = 5.0) -> None: if self._task is None: return self._stopped.set() self._task.cancel() try: await asyncio.wait_for(self._task, timeout=timeout_s) except asyncio.CancelledError: pass except asyncio.TimeoutError: logger.warning("[WATCHDOG] stop timeout after %.1fs", timeout_s) except Exception: logger.exception("[WATCHDOG] unexpected stop error") self._task = None logger.info("[WATCHDOG] stopped") async def _run(self) -> None: try: while not self._stopped.is_set(): try: await asyncio.wait_for( self._stopped.wait(), timeout=WATCHDOG_SCAN_INTERVAL_S, ) break except asyncio.TimeoutError: pass try: await self._scan_once() except Exception: await _bump("scans_failed_total") logger.exception("[WATCHDOG] scan failed") except asyncio.CancelledError: logger.info("[WATCHDOG] cancelled") raise finally: logger.info("[WATCHDOG] loop terminated") async def _scan_once(self) -> Dict[str, int]: t0 = time.time() await _bump("scans_total") resent = 0 gaveup = 0 skipped = 0 in_flight = 0 orphans = 0 orphan_targets: List[Tuple[str, Dict[str, Any]]] = [] async with self._async_lock(): for action_id, info in list(self._retry_pending.items()): dispatched_at = info.get("dispatched_at", 0.0) or 0.0 if dispatched_at <= 0: skipped += 1 continue age = t0 - dispatched_at in_flight += 1 if age < WATCHDOG_ORPHAN_TIMEOUT_S: continue orphans += 1 orphan_targets.append((action_id, dict(info))) for action_id, info in orphan_targets: await _bump("orphans_detected_total") resent_count = int(info.get("resent_count", 0) or 0) if resent_count >= WATCHDOG_MAX_RESENDS: async with self._async_lock(): self._retry_pending.pop(action_id, None) age_total = t0 - float(info.get("first_dispatched_at", t0) or t0) logger.error( "[BUS] lea:dispatch_orphan_giveup action_id=%s resent=%d age_total=%.1fs " "session=%s machine=%s replay=%s", action_id, resent_count, age_total, info.get("session_id", "?"), info.get("machine_id", "?"), info.get("replay_id", "?"), ) gaveup += 1 await _bump("orphans_giveup_total") continue session_id = info.get("session_id") machine_id = info.get("machine_id", "default") action = info.get("dispatched_action") or info.get("action") if not session_id or not isinstance(action, dict): logger.warning( "[WATCHDOG] invalid schema for %s session_id=%r action_type=%s", action_id, session_id, type(action).__name__, ) async with self._async_lock(): self._retry_pending.pop(action_id, None) continue async with self._async_lock(): existing = self._retry_pending.get(action_id) if existing is None: logger.debug( "[WATCHDOG] %s acked between snapshot and resend; skip", action_id, ) continue queue = self._replay_queues.setdefault(session_id, []) if WATCHDOG_REPUSH_POSITION == "tail": queue.append(dict(action)) else: queue.insert(0, dict(action)) existing["resent_count"] = resent_count + 1 existing["last_resent_at"] = time.time() existing["dispatched_at"] = 0.0 age_total = t0 - float(info.get("first_dispatched_at", t0) or t0) logger.warning( "[BUS] lea:dispatch_orphan_resent action_id=%s resent=%d/%d age=%.1fs " "session=%s machine=%s replay=%s", action_id, resent_count + 1, WATCHDOG_MAX_RESENDS, age_total, session_id, machine_id, info.get("replay_id", "?"), ) resent += 1 await _bump("orphans_resent_total") if self._sse_notifier is not None: try: self._sse_notifier(session_id, machine_id) except Exception as exc: logger.debug("[WATCHDOG] sse notifier failed: %s", exc) elapsed_ms = (time.time() - t0) * 1000.0 async with _metrics_lock: _metrics["last_scan_ts"] = t0 _metrics["last_scan_duration_ms"] = elapsed_ms _metrics["current_in_flight_count"] = in_flight _metrics["current_orphan_count"] = orphans scans_total = _metrics["scans_total"] if orphans or gaveup: logger.info( "[METRIC] watchdog scan=%d orphans=%d resent=%d gaveup=%d " "in_flight=%d skipped=%d elapsed_ms=%.1f", scans_total, orphans, resent, gaveup, in_flight, skipped, elapsed_ms, ) return { "orphans": orphans, "resent": resent, "gaveup": gaveup, "skipped": skipped, "in_flight": in_flight, } _singleton: Optional[ReplayWatchdog] = None def get_or_create_watchdog( retry_pending: Dict[str, Dict[str, Any]], replay_queues: Dict[str, List[Dict[str, Any]]], async_lock_factory: Callable[[], Any], sse_notifier: Optional[SseNotifier] = None, ) -> ReplayWatchdog: global _singleton if _singleton is None: _singleton = ReplayWatchdog( retry_pending=retry_pending, replay_queues=replay_queues, async_lock_factory=async_lock_factory, sse_notifier=sse_notifier, ) return _singleton @contextlib.asynccontextmanager async def watchdog_lifespan( retry_pending: Dict[str, Dict[str, Any]], replay_queues: Dict[str, List[Dict[str, Any]]], async_lock_factory: Callable[[], Any], sse_notifier: Optional[SseNotifier] = None, ): watchdog = get_or_create_watchdog( retry_pending=retry_pending, replay_queues=replay_queues, async_lock_factory=async_lock_factory, sse_notifier=sse_notifier, ) await watchdog.start() try: yield watchdog finally: await watchdog.stop()