snapshot: WIP 5j replay reliability (B1 watchdog + dialog handlers + grounding drift)

Snapshot pris avant la correction du blocage de la relance de Léa (3 incidents en
24 h : SSH refusé, polls morts ×2). Sert de point de rollback stable.

Contenu:
- agent_v1/core/executor.py: 5 patchs dialog handling (saveas drift, close_tab
  hotkey fallback, confirm_save Unicode apostrophe, foreground dialog
  recontextualization, runtime_dialog in-loop) + helpers normalize_window_hint,
  requires_post_verify_window_transition
- agent_v1/core/grounding.py: garde drift template fix (fallback_x/y plumbed)
- server_v1/replay_watchdog.py (NEW): orphan watchdog B1, scan 10s timeout 30s
- server_v1/api_stream.py: dispatched_action plumbing, watchdog lifespan,
  metrics endpoint
- server_v1/replay_engine.py: _schedule_retry préserve original_action +
  dispatched_action
- stream_processor.py: gardes _infer_tab_switch_target (no false switch_tab
  on save_as dialog open) + _attach_expected_window_before
- tests/integration: test_replay_watchdog.py (8 cas), test_stream_processor.py
- tests/unit: test_executor_verify_window_guard.py (start_button, close_tab,
  runtime_dialog, post_verify, transition fallbacks)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-05-24 16:48:37 +02:00
parent 5ea4960e65
commit 7df51d2c79
47 changed files with 9811 additions and 451 deletions

View File

@@ -61,7 +61,9 @@ MAX_ACTIONS_PER_REPLAY = 500 # Max actions par requête de replay
MAX_REPLAY_STATES = 1000 # Max entrées dans _replay_states
REPLAY_STATE_TTL_SECONDS = 3600 # Nettoyage auto des replays terminés après 1h
# Actions en cours de retry : action_id -> {"action": ..., "retry_count": N, "replay_id": ...}
# Actions in-flight / retry : action_id -> transport + retry metadata.
# `action` remains the semantic/original action for reporting/retry logic,
# while `dispatched_action` tracks the exact payload last sent to Lea.
_retry_pending: Dict[str, Dict[str, Any]] = {}
# Callbacks d'erreur par replay_id : replay_id -> callback_url
@@ -207,12 +209,14 @@ from .replay_engine import (
_MAX_ACTION_TEXT_LENGTH,
_MAX_KEYS_PER_COMBO,
_KNOWN_KEY_NAMES,
_auto_launch_replay_after_finalize,
_validate_replay_action,
_APP_LAUNCH_COMMANDS,
_APP_VISUAL_SEARCH,
_SETUP_IGNORE_APPS,
_extract_required_apps_from_events,
_extract_required_apps_from_workflow,
_trim_redundant_setup_events,
_resolve_launch_command,
_infer_app_from_window_titles,
_get_visual_search_info,
@@ -475,6 +479,19 @@ def _clear_replay_lock():
logger.error(f"Erreur suppression replay lock : {e}")
def _memory_window_title_for_action(action_meta: Dict[str, Any]) -> str:
    """Pick the best available window title for persistent-memory records.

    Candidates are tried in priority order and the first non-empty one wins:

    1. ``action_meta["expected_window_before"]``
    2. ``action_meta["target_spec"]["window_title"]``
    3. ``action_meta["target_spec"]["context_hints"]["window_title"]``
    4. ``action_meta["window_title"]``

    Args:
        action_meta: Action metadata mapping. ``None`` or an empty mapping is
            accepted and yields ``""``.

    Returns:
        The selected title as a ``str``, or ``""`` when no candidate is set.
    """
    from collections.abc import Mapping

    meta = action_meta or {}

    # A truthy non-mapping value (e.g. a string or a list) has no ``.get``;
    # treat it as absent instead of raising AttributeError.
    target_spec = meta.get("target_spec")
    if not isinstance(target_spec, Mapping):
        target_spec = {}
    context_hints = target_spec.get("context_hints")
    if not isinstance(context_hints, Mapping):
        context_hints = {}

    candidates = (
        meta.get("expected_window_before"),
        target_spec.get("window_title"),
        context_hints.get("window_title"),
        meta.get("window_title"),
    )
    for candidate in candidates:
        if candidate:
            # Coerce so the declared ``-> str`` contract always holds.
            return str(candidate)
    return ""
def _get_worker_queue_status() -> Dict[str, Any]:
"""Retourne l'état de la queue du worker VLM (pour le monitoring)."""
queue = []
@@ -544,6 +561,34 @@ _machine_replay_target: Dict[str, str] = {}
_replay_states: Dict[str, Dict[str, Any]] = {}
def _remove_queued_action_duplicates(session_id: str, action_id: str) -> int:
    """Drop queued copies of an action that has already been acknowledged.

    The watchdog may push an orphaned action back to the head of the queue.
    If the original report arrives right after, that re-sent copy must be
    discarded; otherwise Léa re-executes the same action under the same
    ``action_id`` and can toggle UI state (e.g. a Windows-key press that
    closes the Start menu again).

    Args:
        session_id: Session whose entry in ``_replay_queues`` is cleaned.
        action_id: Identifier of the acknowledged action.

    Returns:
        Number of queue entries removed; ``0`` when either argument is empty,
        the session has no queued actions, or nothing matched.
    """
    if not session_id or not action_id:
        return 0

    queue = _replay_queues.get(session_id, [])
    if not queue:
        return 0

    # Queued ids are compared as strings, so normalise the needle the same
    # way; otherwise a non-string ``action_id`` could never match.
    wanted = str(action_id)

    def _queued_id(entry: Any) -> str:
        # ``None`` or malformed (non-dict) entries carry no id and are kept.
        if not isinstance(entry, dict):
            return ""
        return str(entry.get("action_id", "") or "")

    kept: List[Dict[str, Any]] = [
        entry for entry in queue if _queued_id(entry) != wanted
    ]
    removed = len(queue) - len(kept)
    if removed:
        # Only rebind the session's queue when something was actually dropped.
        _replay_queues[session_id] = kept
    return removed
class StreamEvent(BaseModel):
session_id: str
timestamp: float
@@ -832,6 +877,16 @@ async def startup():
threading.Thread(target=_preload_easyocr, daemon=True, name="preload_easyocr").start()
from .replay_watchdog import get_or_create_watchdog
app.state.replay_watchdog = get_or_create_watchdog(
retry_pending=_retry_pending,
replay_queues=_replay_queues,
async_lock_factory=_async_replay_lock,
sse_notifier=None,
)
await app.state.replay_watchdog.start()
logger.info(
"API Streaming démarrée — StreamProcessor, Worker et Cleanup prêts. "
"VLM Worker dans un process séparé (run_worker.py)."
@@ -886,6 +941,9 @@ def _load_existing_workflows():
async def shutdown():
global _cleanup_running
_cleanup_running = False
watchdog = getattr(app.state, "replay_watchdog", None)
if watchdog is not None:
await watchdog.stop(timeout_s=3.0)
worker.stop()
# Nettoyer le replay lock au shutdown (sinon le worker VLM resterait bloqué)
_clear_replay_lock()
@@ -1477,17 +1535,24 @@ def _process_screenshot_thread(session_id: str, shot_id: str, path: str):
# =========================================================================
@app.post("/api/v1/traces/stream/finalize")
async def finalize(session_id: str, machine_id: str = "default"):
async def finalize(
session_id: str,
machine_id: str = "default",
launch_replay: bool = False,
):
"""Clôture la session et place le traitement en file d'attente.
Ne bloque plus : marque la session comme finalisée et l'ajoute à la queue
du worker VLM (process séparé) pour analyse + construction workflow.
Le client peut suivre la progression via GET /api/v1/traces/stream/processing/status.
Optionnellement, il peut aussi déclencher immédiatement un replay direct
depuis la session finalisée (chemin Lea-first, sans attendre le workflow VLM).
Args:
session_id: Identifiant de la session à finaliser
machine_id: Identifiant machine (informatif, le machine_id est déjà dans la session)
launch_replay: Si vrai, tente de lancer immédiatement /replay-session
"""
# Vérifier que la session existe
session = processor.session_manager.get_session(session_id)
@@ -1501,6 +1566,10 @@ async def finalize(session_id: str, machine_id: str = "default"):
processor.session_manager.finalize(session_id)
logger.info(f"Session {session_id} finalisée, ajout à la queue du worker VLM")
resolved_machine_id = machine_id
if resolved_machine_id == "default" and getattr(session, "machine_id", ""):
resolved_machine_id = session.machine_id
# Nettoyer les structures d'enrichissement temps réel pour cette session
with _enrichment_lock:
keys_to_remove = [k for k in _pending_click_enrichments if k[0] == session_id]
@@ -1521,17 +1590,70 @@ async def finalize(session_id: str, machine_id: str = "default"):
if shots_dir.exists():
full_shots_count = len(list(shots_dir.glob("shot_*_full.png")))
return {
# Patch 2026-05-23 (brief 0902 deferred-workflow) : par défaut, on
# ne propose plus le replay direct immédiat post-finalize — le chemin
# produit cible est le workflow compilé par le worker VLM. Le client
# attend la disponibilité du workflow nommé pour proposer un test.
# Le replay direct reste accessible (smoke/debug) en activant
# RPA_AUTO_LAUNCH_REPLAY_AFTER_FINALIZE=true côté serveur, OU
# en appelant explicitement POST /api/v1/traces/stream/replay-session
# depuis un outil de test.
_direct_replay_enabled = _auto_launch_replay_after_finalize()
response = {
"status": "queued_for_processing",
"session_id": session_id,
"machine_id": session.machine_id,
"screenshots_to_analyze": full_shots_count,
"replay_ready": _direct_replay_enabled,
"message": (
f"Session finalisée. {full_shots_count} screenshots seront analysés "
"en arrière-plan. Suivez la progression via "
"GET /api/v1/traces/stream/processing/status"
"GET /api/v1/traces/stream/processing/status."
),
}
if _direct_replay_enabled:
response["replay_request"] = {
"endpoint": "/api/v1/traces/stream/replay-session",
"session_id": session_id,
"machine_id": resolved_machine_id,
}
response["message"] += (
" Le replay direct est disponible via "
"POST /api/v1/traces/stream/replay-session"
)
if not launch_replay:
return response
try:
replay_result = await replay_from_session(
session_id=session_id,
machine_id=resolved_machine_id,
)
except HTTPException as exc:
logger.warning(
"Finalize %s : replay direct non lancé (%s)",
session_id,
exc.detail,
)
response["replay_launch"] = {
"status": "failed",
"status_code": exc.status_code,
"detail": exc.detail,
}
response["message"] += (
" Le lancement automatique du replay direct a échoué ; "
"la session reste finalisée et re-jouable manuellement."
)
return response
response["replay_launch"] = {
"status": "started",
"replay": replay_result,
}
response["message"] += " Le replay direct a été lancé immédiatement."
return response
# =========================================================================
@@ -2262,18 +2384,39 @@ async def replay_from_session(
if session_mem and session_mem.events:
_merge_enrichments_into_raw_events(raw_events, session_mem.events)
# ── 3. Construire le replay propre depuis les events bruts ──
# Passer le répertoire de session pour activer le visual replay (crops de référence)
# Répertoire de session utilisé par le visual replay et les anchors setup
session_dir = str(events_file.parent)
# ── 3. Préparer le setup environnement et couper le préambule source ──
setup_actions = []
app_info = _extract_required_apps_from_events(
raw_events,
session_dir=session_dir,
)
replay_raw_events = raw_events
if app_info:
setup_actions = _generate_setup_actions(app_info, setup_id_prefix="setup_sess")
if setup_actions:
replay_raw_events = _trim_redundant_setup_events(raw_events, app_info)
logger.info(
"replay-session %s : %d actions de setup préparées avant le replay "
"(app=%s, cmd=%s, raw_trim=%d%d)",
session_id, len(setup_actions),
app_info.get("primary_app"), app_info.get("primary_launch_cmd"),
len(raw_events), len(replay_raw_events),
)
# ── 4. Construire le replay propre depuis les events bruts ──
# Passer le répertoire de session pour activer le visual replay (crops de référence)
actions = build_replay_from_raw_events(
raw_events, session_id=session_id, session_dir=session_dir,
replay_raw_events, session_id=session_id, session_dir=session_dir,
)
if not actions:
raise HTTPException(
status_code=400,
detail=f"Session '{session_id}' : aucune action exploitable après nettoyage "
f"({len(raw_events)} événements bruts)"
f"({len(replay_raw_events)} événements bruts)"
)
# Limite de sécurité
@@ -2305,23 +2448,10 @@ async def replay_from_session(
if _gesture_catalog and actions:
actions = _gesture_catalog.optimize_replay_actions(actions)
# ── 3b. Setup environnement — ouvrir les applications nécessaires ──
# Analyser les événements bruts pour détecter quelles applications sont requises
# et injecter des actions de setup en tête de la queue de replay.
setup_actions = []
app_info = _extract_required_apps_from_events(raw_events)
if app_info:
setup_actions = _generate_setup_actions(app_info, setup_id_prefix="setup_sess")
if setup_actions:
actions = setup_actions + actions
logger.info(
"replay-session %s : %d actions de setup injectées avant le replay "
"(app=%s, cmd=%s)",
session_id, len(setup_actions),
app_info.get("primary_app"), app_info.get("primary_launch_cmd"),
)
if setup_actions:
actions = setup_actions + actions
# ── 4. Trouver la session de replay cible (Agent V1 actif) ──
# ── 5. Trouver la session de replay cible (Agent V1 actif) ──
# L'agent actif peut avoir une session différente de la session source
target_session_id = _find_active_agent_session(machine_id=machine_id)
if not target_session_id:
@@ -2335,7 +2465,7 @@ async def replay_from_session(
"Lancez l'Agent V1 sur le PC cible."
)
# ── 5. Injecter dans la queue de replay ──
# ── 6. Injecter dans la queue de replay ──
replay_id = f"replay_sess_{uuid.uuid4().hex[:8]}"
async with _async_replay_lock():
@@ -3265,11 +3395,35 @@ async def get_next_action(session_id: str, machine_id: str = "default"):
# NE PAS écraser si _schedule_retry a déjà mis le bon retry_count
action_id_sent = action.get("action_id", "")
if action_id_sent and action_id_sent not in _retry_pending:
now = time.time()
_retry_pending[action_id_sent] = {
"action": dict(action),
"dispatched_action": dict(action),
"retry_count": 0,
"replay_id": "",
"replay_id": owning_replay.get("replay_id", "") if owning_replay else "",
"session_id": session_id,
"machine_id": machine_id,
"dispatched_at": now,
"first_dispatched_at": now,
"resent_count": 0,
"last_resent_at": 0.0,
}
elif action_id_sent:
existing = _retry_pending.get(action_id_sent)
if existing is not None:
now = time.time()
existing.setdefault("action", dict(action))
existing["dispatched_action"] = dict(action)
existing["replay_id"] = existing.get("replay_id") or (
owning_replay.get("replay_id", "") if owning_replay else ""
)
existing["session_id"] = session_id
existing["machine_id"] = machine_id
existing["dispatched_at"] = now
if not existing.get("first_dispatched_at"):
existing["first_dispatched_at"] = now
existing.setdefault("resent_count", 0)
existing.setdefault("last_resent_at", 0.0)
# [REPLAY] log structuré pour suivre une action à travers toute la chaîne
# Grep facile : journalctl --user -u rpa-streaming -f | grep REPLAY
@@ -3400,6 +3554,15 @@ async def report_action_result(report: ReplayResultReport):
)
return {"status": "no_active_replay", "session_id": session_id}
removed_dupes = _remove_queued_action_duplicates(session_id, action_id)
if removed_dupes:
logger.warning(
"[REPLAY] REPORT cleanup session=%s action_id=%s removed_queue_duplicates=%d",
session_id,
action_id,
removed_dupes,
)
# Récupérer l'info de retry pour cette action (si c'est un retry)
retry_info = _retry_pending.pop(action_id, None)
retry_count = retry_info["retry_count"] if retry_info else 0
@@ -3631,10 +3794,7 @@ async def report_action_result(report: ReplayResultReport):
_current = _actions_meta[_idx] or {}
if _current.get("type") == "click":
_mem_target_spec = _current.get("target_spec") or {}
_mem_window_title = (
_mem_target_spec.get("window_title", "")
or _mem_target_spec.get("expected_window_before", "")
)
_mem_window_title = _memory_window_title_for_action(_current)
if _mem_window_title:
_mem_success = (
@@ -3749,6 +3909,7 @@ async def report_action_result(report: ReplayResultReport):
"target_description": f"Dialogue système : {_sys_category}",
"screenshot_b64": screenshot_after or report.screenshot,
"target_spec": _tspec_sys,
"original_action": dict(original_action or {}),
"reason": "system_dialog",
"system_dialog": _sys_info,
"error_detail": _sys_reason or (report.error or ""),
@@ -3814,6 +3975,7 @@ async def report_action_result(report: ReplayResultReport):
"target_description": _target_desc_ww,
"screenshot_b64": screenshot_after or report.screenshot,
"target_spec": _tspec_ww,
"original_action": dict(original_action or {}),
"reason": "wrong_window",
"error_detail": report.error or "",
}
@@ -3888,6 +4050,7 @@ async def report_action_result(report: ReplayResultReport):
"target_description": _target_desc,
"screenshot_b64": screenshot_after or report.screenshot,
"target_spec": _tspec,
"original_action": dict(original_action or {}),
"reason": "no_screen_change_strict",
"resolution_method": report.resolution_method or "",
"resolution_score": report.resolution_score or 0,
@@ -3947,6 +4110,7 @@ async def report_action_result(report: ReplayResultReport):
"target_description": target_desc,
"screenshot_b64": screenshot_after or report.screenshot,
"target_spec": report.target_spec,
"original_action": dict(original_action or {}),
}
replay_state["pause_message"] = f"Je ne vois pas '{target_desc}' à l'écran"
error_entry = {
@@ -3989,6 +4153,7 @@ async def report_action_result(report: ReplayResultReport):
"target_description": target_desc,
"screenshot_b64": screenshot_after or report.screenshot,
"target_spec": report.target_spec,
"original_action": dict(original_action or {}),
}
replay_state["pause_message"] = f"Je ne vois pas '{target_desc}' à l'écran"
error_entry = {
@@ -4341,8 +4506,14 @@ async def resume_replay(
and failed_action.get("reason") != "user_request"):
# Reconstruire l'action a partir du retry_pending ou de l'original
original_action_id = failed_action["action_id"]
original = failed_action.get("original_action")
if isinstance(original, dict) and original:
original = dict(original)
else:
original = None
# Chercher l'action originale dans les retry_pending
original = _retry_pending.pop(original_action_id, {}).get("action")
if not original:
original = _retry_pending.pop(original_action_id, {}).get("action")
if not original:
# Reconstruire un minimum depuis le failed_action context
original = {
@@ -4358,8 +4529,15 @@ async def resume_replay(
# Stocker dans retry_pending pour le suivi
_retry_pending[resume_id] = {
"action": original,
"dispatched_action": dict(resume_action),
"retry_count": 0,
"replay_id": replay_id,
"session_id": session_id,
"machine_id": state.get("machine_id", "default"),
"dispatched_at": 0.0,
"first_dispatched_at": 0.0,
"resent_count": 0,
"last_resent_at": 0.0,
"reason": "resume_after_pause",
}
queue = _replay_queues.get(session_id, [])
@@ -4399,6 +4577,13 @@ async def cancel_replay(replay_id: str):
return {"status": "cancelled", "replay_id": replay_id, "session_id": session_id}
@app.get("/api/v1/traces/stream/replay/watchdog/metrics")
async def watchdog_metrics():
    """Return the replay watchdog metrics snapshot under the ``watchdog`` key."""
    # Function-scope import, as in the original: the watchdog module is only
    # loaded when it is actually needed.
    from .replay_watchdog import get_metrics_snapshot as _snapshot

    payload = {"watchdog": _snapshot()}
    return payload
# =========================================================================
# Visual Replay — Résolution visuelle des cibles (module resolve_engine)
# =========================================================================
@@ -4545,10 +4730,13 @@ async def resolve_target(request: ResolveTargetRequest):
# Validation qualité en sortie de cascade : seuil de score + garde
# de proximité contre les coords enregistrées. Single point of
# insertion, n'altère pas la cascade existante.
# target_spec propagé pour relaxation contextuelle (switch_tab +
# som_element calibré, cf. resolve_engine.py 2026-05-22).
result = _validate_resolution_quality(
result,
request.fallback_x_pct,
request.fallback_y_pct,
target_spec=request.target_spec,
)
# Pré-check sémantique post-cascade : OCR sur une zone autour de la
@@ -4581,6 +4769,15 @@ async def resolve_target(request: ResolveTargetRequest):
_by_text = (request.target_spec.get("by_text") or "").strip()
if _by_text:
from agent_v0.server_v1.resolve_engine import _validate_text_at_position
# Propager la bbox SoM enregistrée (si présente) au
# pré-check OCR : pour les éléments étroits (onglets
# Notepad moderne, ~30-40px haut), le radius générique
# capture du texte voisin et rejette à tort.
# Patch 2026-05-23 — cf. inbox_codex/…_notepad-tab-ocr-precheck.
_som_bbox = (
(request.target_spec.get("som_element") or {})
.get("bbox_norm")
)
_is_valid, _observed, _ocr_ms = _validate_text_at_position(
tmp_path,
float(result.get("x_pct", 0) or 0),
@@ -4588,6 +4785,7 @@ async def resolve_target(request: ResolveTargetRequest):
_by_text,
effective_w,
effective_h,
som_bbox_norm=_som_bbox,
)
logger.info(
"[REPLAY] Pre-check OCR ACTIF : '%s' attendu @ (%.4f, %.4f) "
@@ -4600,7 +4798,16 @@ async def resolve_target(request: ResolveTargetRequest):
_is_valid,
_ocr_ms,
)
if not _is_valid:
# Patch 2026-05-23 : rejet uniquement si OCR a effectivement
# lu *autre chose* que la cible. Si observed est vide, l'OCR
# n'a rien lu (crop bbox SoM trop petit / contraste faible
# sur onglet Notepad moderne) — ambigu, on garde la
# résolution serveur. La garde drift ANCHOR-TM côté agent
# bloque les vrais faux positifs.
from agent_v0.server_v1.resolve_engine import (
_should_reject_on_text_mismatch,
)
if _should_reject_on_text_mismatch(_is_valid, _observed):
logger.warning(
"[REPLAY] Pre-check OCR REJET : '%s' attendu @ (%.4f, %.4f) "
"via %s mais OCR voit '%s' (%.0fms)",
@@ -4620,6 +4827,15 @@ async def resolve_target(request: ResolveTargetRequest):
"x_pct": None,
"y_pct": None,
}
elif not _is_valid:
# observed vide → on log mais on accepte
logger.info(
"[REPLAY] Pre-check OCR observed='' (crop trop "
"petit/contraste faible) — on garde la résolution "
"via %s (score=%s), garde drift agent protège en aval",
result.get("method", "?"),
result.get("score"),
)
# [REPLAY] log structuré de sortie résolution (après validation)
# Note: x_pct/y_pct peuvent être None quand le pré-check OCR rejette