From 7a1a5cb6fda473563c60b1ebf07e50a405a2725a Mon Sep 17 00:00:00 2001 From: Dom Date: Tue, 2 Jun 2026 15:52:35 +0200 Subject: [PATCH] fix(p0): secure agent revocation and R6 worker queue --- agent_v0/server_v1/agent_registry.py | 18 +- agent_v0/server_v1/api_stream.py | 1567 ++++++++++++++++- agent_v0/server_v1/run_worker.py | 118 +- agent_v0/server_v1/safety_checks_provider.py | 81 +- agent_v0/server_v1/stream_processor.py | 331 +++- agent_v0/server_v1/worker_stream.py | 12 +- .../test_replay_single_inflight.py | 10 +- tests/integration/test_stream_processor.py | 459 +++++ tests/unit/test_api_stream_auth_p0bc.py | 23 +- tests/unit/test_api_stream_revocation_gaps.py | 350 ++++ tests/unit/test_safety_checks_provider.py | 9 + 11 files changed, 2869 insertions(+), 109 deletions(-) create mode 100644 tests/unit/test_api_stream_revocation_gaps.py diff --git a/agent_v0/server_v1/agent_registry.py b/agent_v0/server_v1/agent_registry.py index 9fa325052..b1ca759b4 100644 --- a/agent_v0/server_v1/agent_registry.py +++ b/agent_v0/server_v1/agent_registry.py @@ -173,6 +173,9 @@ class AgentRegistry: # Deja enrolle et actif -> conflit explicit raise AgentAlreadyEnrolledError(dict(existing)) + if existing["uninstall_reason"] == "admin_revoke": + raise AgentRevokedError(dict(existing)) + # Agent desinstalle : reactivation si autorise (defaut) if not allow_reactivate: raise AgentAlreadyEnrolledError(dict(existing)) @@ -273,13 +276,15 @@ class AgentRegistry: """Met a jour last_seen_at (appel depuis le stream / heartbeat). Silencieux si l'agent est inconnu (evite les erreurs sur vieux clients). + Ne reactive jamais un agent desinstalle/revoque. """ if not machine_id: return now = _utc_now_iso() with _DB_LOCK, self._connect() as conn: conn.execute( - "UPDATE enrolled_agents SET last_seen_at = ? WHERE machine_id = ?", + "UPDATE enrolled_agents SET last_seen_at = ? " + "WHERE machine_id = ? AND status = 'active'", (now, machine_id), ) conn.commit() @@ -294,3 +299,14 @@ class AgentAlreadyEnrolledError(Exception): f"machine_id={existing_row.get('machine_id')} deja enrole " f"(status={existing_row.get('status')})" ) + + +class AgentRevokedError(Exception): + """Levee si un administrateur a revoque ce machine_id.""" + + def __init__(self, existing_row: Dict[str, Any]): + self.existing = existing_row + super().__init__( + f"machine_id={existing_row.get('machine_id')} revoque " + f"(reason={existing_row.get('uninstall_reason')})" + ) diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index 050d3ea66..fdd698015 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -31,7 +31,7 @@ from .replay_failure_logger import log_replay_failure from .replay_verifier import ReplayVerifier, VerificationResult from .replay_learner import ReplayLearner from .audit_trail import AuditTrail, AuditEntry -from .agent_registry import AgentRegistry, AgentAlreadyEnrolledError +from .agent_registry import AgentRegistry, AgentAlreadyEnrolledError, AgentRevokedError from .stream_processor import StreamProcessor, build_replay_from_raw_events, enrich_click_from_screenshot from .worker_stream import StreamWorker from .monitor_router import resolve_target_monitor # QW1 — résolution écran cible @@ -51,6 +51,15 @@ except Exception: # pragma: no cover - allows partial deployments find_competence = None COMPETENCE_API_AVAILABLE = False +try: + from agent_v0.agent_v1.ui.message_contract import ( + coerce_supervised_pause_message, + warn_visible_message, + ) +except Exception: # pragma: no cover - fallback for partial agent deployments + coerce_supervised_pause_message = None + warn_visible_message = None + # Pipeline d'anonymisation PII (OCR + NER côté serveur). # Import paresseux : on ne charge pas docTR tant qu'aucune image n'est reçue. try: @@ -107,6 +116,140 @@ MAX_ACTIONS_PER_REPLAY = 500 # Max actions par requête de replay MAX_REPLAY_STATES = 1000 # Max entrées dans _replay_states REPLAY_STATE_TTL_SECONDS = 3600 # Nettoyage auto des replays terminés après 1h + +def _base_replay_action_id(action_id: str) -> str: + """Return the original action id for retry-generated action ids.""" + base = str(action_id or "") + while True: + if base.endswith("_resume"): + base = base[: -len("_resume")] + continue + head, sep, tail = base.rpartition("_retry") + if sep and tail.isdigit(): + base = head + continue + return base + + +def _is_synthetic_retry_wait(action_id: str) -> bool: + """True for wait actions injected by the retry scheduler.""" + return str(action_id or "").startswith("wait_retry_") + + +def _bump_completed_actions(replay_state: Dict[str, Any], action_id: str) -> None: + """Advance original replay progress once per semantic action.""" + if _is_synthetic_retry_wait(action_id): + return + + completed_ids = replay_state.setdefault("_completed_action_ids", {}) + base_action_id = _base_replay_action_id(action_id) + if completed_ids.get(base_action_id): + return + + completed_ids[base_action_id] = True + total = int(replay_state.get("total_actions", 0) or 0) + replay_state["completed_actions"] = min( + int(replay_state.get("completed_actions", 0) or 0) + 1, + total, + ) + replay_state["current_action_index"] = min( + int(replay_state.get("current_action_index", 0) or 0) + 1, + total, + ) + + +def _pause_action_parameters(action: Dict[str, Any]) -> Dict[str, Any]: + """Return pause parameters, accepting both nested and top-level formats.""" + action = action or {} + params = dict(action.get("parameters") or {}) + for key in ("message", "safety_level", "safety_checks", "pause_reason"): + if key not in params or params.get(key) in (None, "", []): + if action.get(key) not in (None, "", []): + params[key] = action.get(key) + return params + + +def _pause_action_message(action: Dict[str, Any]) -> str: + """Build the human-facing text for a pause_for_human action.""" + params = _pause_action_parameters(action) + raw_message = ( + params.get("message") + or (action or {}).get("message") + or params.get("request") + or params.get("demande") + or "" + ) + return _coerce_pause_message( + raw_message, + intention=( + params.get("intention") + or (action or {}).get("intention") + or (action or {}).get("description") + ), + attendu=( + params.get("attendu") + or params.get("expected") + or (action or {}).get("expected") + ), + vu=( + params.get("vu") + or params.get("observed") + or (action or {}).get("observed") + ), + demande=params.get("demande") or params.get("request"), + ) + + +def _coerce_pause_message( + message: Any = "", + *, + intention: Any = "", + attendu: Any = "", + vu: Any = "", + demande: Any = "", +) -> str: + """Return a structured, human-readable supervised pause message.""" + if warn_visible_message is not None: + warn_visible_message( + message, + source="api_stream._coerce_pause_message.raw", + supervised_pause=False, + ) + + if coerce_supervised_pause_message is not None: + result = coerce_supervised_pause_message( + message, + intention=intention, + attendu=attendu, + vu=vu, + demande=demande, + ) + if warn_visible_message is not None: + warn_visible_message( + result, + source="api_stream._coerce_pause_message.final", + supervised_pause=True, + ) + return result + + fallback_request = "indiquer si je peux continuer ou corriger l'action attendue" + result = "\n".join( + ( + f"J'essaie de : {intention or 'continuer une etape supervisee'}", + f"J'attendais : {attendu or 'un accord humain clair avant de continuer'}", + f"Je vois : {vu or 'je suis sur une etape qui demande une verification humaine'}", + f"Peux-tu : {demande or message or fallback_request}", + ) + ) + if warn_visible_message is not None: + warn_visible_message( + result, + source="api_stream._coerce_pause_message.final_fallback", + supervised_pause=True, + ) + return result + + # Actions in-flight / retry : action_id -> transport + retry metadata. # `action` remains the semantic/original action for reporting/retry logic, # while `dispatched_action` tracks the exact payload last sent to Lea. @@ -192,16 +335,14 @@ else: # Endpoints publics (pas besoin de token) # En production, /docs et /redoc sont désactivés (voir ci-dessous) # Paths publics : pas de token requis -# /replay/next est public car l'agent Rust legacy n'envoie pas de token -# et c'est un endpoint read-only (polling, pas d'écriture) -# # Fix P0-B : /api/v1/traces/stream/image RETIRÉ de la liste publique. # L'upload d'image écrit sur disque + déclenche du travail VLM : exiger # un token Bearer. Tous les agents V1 déployés envoient déjà le token # (cf. agent_v0/agent_v1/network/streamer.py:_auth_headers). +# P0 révocation : /replay/next est également protégé. Le polling divulgue et +# distribue des actions ; un agent révoqué ne doit plus pouvoir le joindre. _PUBLIC_PATHS = { "/health", "/docs", "/openapi.json", "/redoc", - "/api/v1/traces/stream/replay/next", } @@ -437,6 +578,119 @@ _AGENTS_DB_PATH = os.environ.get( agent_registry = AgentRegistry(db_path=_AGENTS_DB_PATH) +def _agent_registry_has_entries() -> bool: + try: + return ( + agent_registry.count_by_status("active") + + agent_registry.count_by_status("uninstalled") + ) > 0 + except Exception as exc: + logger.exception("[FLEET] registre indisponible pendant le strict check") + raise HTTPException( + status_code=503, + detail={"error": "agent_registry_unavailable"}, + ) from exc + + +def _guard_agent_registry_access( + machine_id: str | None, + *, + endpoint: str, +) -> Optional[Dict[str, Any]]: + """Bloque les postes connus mais revoques/desinstalles. + + Le POC utilise encore un token global : on ne peut donc pas empecher un + poste compromis d'usurper un autre machine_id actif. Ce garde rend au + moins effective la revocation du machine_id declare, sans casser les vieux + clients ou tests qui ne sont pas encore enroles dans la fleet. + """ + normalized_machine_id = (machine_id or "").strip() + if not normalized_machine_id or normalized_machine_id == "default": + if _agent_registry_has_entries(): + logger.warning( + "[FLEET] acces refuse endpoint=%s machine_id=%r " + "(enrollment obligatoire)", + endpoint, + machine_id, + ) + raise HTTPException( + status_code=403, + detail={ + "error": "agent_enrollment_required", + "machine_id": normalized_machine_id or "", + }, + ) + return None + + try: + row = agent_registry.get(normalized_machine_id) + except Exception as exc: + logger.exception( + "[FLEET] registre indisponible endpoint=%s machine_id=%s", + endpoint, + normalized_machine_id, + ) + raise HTTPException( + status_code=503, + detail={ + "error": "agent_registry_unavailable", + "machine_id": normalized_machine_id, + }, + ) from exc + + if row is None: + if _agent_registry_has_entries(): + logger.warning( + "[FLEET] acces refuse endpoint=%s machine_id=%s inconnu", + endpoint, + normalized_machine_id, + ) + raise HTTPException( + status_code=403, + detail={ + "error": "agent_unknown", + "machine_id": normalized_machine_id, + }, + ) + return None + + if row.get("status") != "active": + logger.warning( + "[FLEET] acces refuse endpoint=%s machine_id=%s status=%s reason=%s", + endpoint, + normalized_machine_id, + row.get("status"), + row.get("uninstall_reason"), + ) + raise HTTPException( + status_code=403, + detail={ + "error": "agent_not_active", + "machine_id": normalized_machine_id, + "status": row.get("status"), + "uninstall_reason": row.get("uninstall_reason"), + }, + ) + + try: + agent_registry.touch_last_seen(normalized_machine_id) + except Exception as exc: + logger.exception( + "[FLEET] echec last_seen endpoint=%s machine_id=%s", + endpoint, + normalized_machine_id, + ) + raise HTTPException( + status_code=503, + detail={ + "error": "agent_registry_unavailable", + "machine_id": normalized_machine_id, + }, + ) from exc + + return row + + # ========================================================================= # Flush garanti à l'arrêt — signal handler + atexit (ceinture et bretelles) # ========================================================================= @@ -499,8 +753,17 @@ def _enqueue_to_worker(session_id: str): f.write(session_id + "\n") logger.info(f"Session {session_id} ajoutée à la queue worker ({WORKER_QUEUE_FILE})") - except Exception as e: - logger.error(f"Erreur écriture queue worker : {e}") + except Exception as exc: + # P0.1 R6 (2026-06-01) : promotion en CRITICAL + exc_info pour diagnostic + # du fail silencieux observé depuis le 27 mai (queue jamais alimentée). + # Comportement conservé : on n'élève pas l'exception (silent skip POC). + logger.critical( + "Failed to enqueue session %s to worker queue %s: %s", + session_id, + WORKER_QUEUE_FILE, + exc, + exc_info=True, + ) def _set_replay_lock(replay_id: str = ""): @@ -621,13 +884,23 @@ def _remove_queued_action_duplicates(session_id: str, action_id: str) -> int: if not queue: return 0 + base_action_id = _base_replay_action_id(action_id) filtered: List[Dict[str, Any]] = [] removed = 0 - for queued_action in queue: + for idx, queued_action in enumerate(queue): queued_id = str((queued_action or {}).get("action_id", "") or "") - if queued_id == action_id: + queued_base = _base_replay_action_id(queued_id) + if queued_id == action_id or (base_action_id and queued_base == base_action_id): removed += 1 continue + + if _is_synthetic_retry_wait(queued_id): + next_action = queue[idx + 1] if idx + 1 < len(queue) else {} + next_id = str((next_action or {}).get("action_id", "") or "") + if base_action_id and _base_replay_action_id(next_id) == base_action_id: + removed += 1 + continue + filtered.append(queued_action) if removed: @@ -635,6 +908,30 @@ def _remove_queued_action_duplicates(session_id: str, action_id: str) -> int: return removed +def _remove_retry_pending_duplicates( + session_id: str, + replay_id: str, + action_id: str, +) -> int: + """Remove stale retry/resume bookkeeping for an already completed action.""" + if not action_id: + return 0 + base_action_id = _base_replay_action_id(action_id) + removed = 0 + for pending_id, pending in list(_retry_pending.items()): + if pending_id == action_id: + continue + if session_id and pending.get("session_id") != session_id: + continue + if replay_id and pending.get("replay_id") != replay_id: + continue + if _base_replay_action_id(pending_id) == base_action_id: + _retry_pending.pop(pending_id, None) + removed += 1 + + return removed + + def _find_in_flight_action( session_id: str, machine_id: str, @@ -769,6 +1066,376 @@ class ReplayResultReport(BaseModel): needs_human: Optional[bool] = None +_GENERIC_TARGET_DESCRIPTIONS = { + "", + "un element", + "un élément", + "element", + "élément", + "element inconnu", + "élément inconnu", + "cette action", + "visual resolve echoue", + "visual resolve échoué", + "l'element interactif principal", + "l'élément interactif principal", + "une cible visuelle non decrite", + "une cible visuelle non décrite", +} +_NO_VISIBLE_CHANGE_CODES = { + "no_screen_change", + "no_visible_change", + "no_visual_change", + "screen_unchanged", +} +_TARGET_NOT_FOUND_CODES = {"target_not_found", "not_found"} +_VISUAL_RESOLVE_FAILURE_CODES = { + "visual_resolve_failed", + "visual_resolve_error", + "template_matching_failed", +} + + +def _report_code(value: Any) -> str: + return str(value or "").strip().lower().replace("-", "_").replace(" ", "_") + + +def _report_has_code(report: ReplayResultReport, codes: set[str]) -> bool: + return _report_code(report.error) in codes or _report_code(report.warning) in codes + + +def _is_no_visible_change_report(report: ReplayResultReport) -> bool: + if _report_has_code(report, _NO_VISIBLE_CHANGE_CODES): + return True + text = f"{report.error or ''} {report.warning or ''}".lower() + return "no_screen_change" in text or "no_visible_change" in text + + +def _is_target_not_found_report(report: ReplayResultReport) -> bool: + if _report_has_code(report, _TARGET_NOT_FOUND_CODES): + return True + text = f"{report.error or ''} {report.warning or ''}".lower() + return "target_not_found" in text + + +def _is_visual_resolve_failed_report(report: ReplayResultReport) -> bool: + if _report_has_code(report, _VISUAL_RESOLVE_FAILURE_CODES): + return True + text = f"{report.error or ''} {report.warning or ''}".lower() + return "visual resolve" in text or "visual_resolve" in text + + +def _useful_target_description(value: Any) -> str: + text = str(value or "").strip() + if not text: + return "" + lower = text.lower() + if lower in _GENERIC_TARGET_DESCRIPTIONS: + return "" + if lower.startswith("la cible visuelle anchor_"): + return "" + if lower.startswith("cible visuelle anchor_"): + return "" + return text[:120] + + +def _current_replay_action( + replay_state: Dict[str, Any], + action_id: str, + original_action: Optional[Dict[str, Any]], +) -> Dict[str, Any]: + if isinstance(original_action, dict) and original_action: + return original_action + + actions = replay_state.get("actions", []) or [] + base_action_id = _base_replay_action_id(action_id) + idx = int(replay_state.get("current_action_index", 0) or 0) + if 0 <= idx < len(actions): + current = actions[idx] or {} + if not base_action_id or current.get("action_id") == base_action_id: + return current + + for action in actions: + if (action or {}).get("action_id") == base_action_id: + return action or {} + return {} + + +def _action_parameters(action: Dict[str, Any]) -> Dict[str, Any]: + params = action.get("parameters") if isinstance(action, dict) else {} + return params if isinstance(params, dict) else {} + + +def _visual_anchor_from_params(params: Dict[str, Any]) -> Dict[str, Any]: + visual_anchor = params.get("visual_anchor") if isinstance(params, dict) else {} + return visual_anchor if isinstance(visual_anchor, dict) else {} + + +def _anchor_id_from_context( + target_spec: Dict[str, Any], + action: Dict[str, Any], + params: Dict[str, Any], + visual_anchor: Dict[str, Any], +) -> str: + return str( + target_spec.get("anchor_id") + or action.get("anchor_id") + or visual_anchor.get("anchor_id") + or visual_anchor.get("id") + or params.get("anchor_id") + or "" + ).strip() + + +def _load_visual_anchor_context(anchor_id: str) -> Dict[str, str]: + if not anchor_id: + return {} + try: + import sqlite3 + + db_path = ROOT_DIR / "visual_workflow_builder" / "backend" / "instance" / "workflows.db" + if not db_path.exists(): + return {} + uri = f"file:{db_path}?mode=ro" + with sqlite3.connect(uri, uri=True, timeout=0.2) as conn: + row = conn.execute( + "SELECT target_text, description, ocr_description " + "FROM visual_anchors WHERE id=?", + (anchor_id,), + ).fetchone() + if not row: + return {} + return { + "target_text": str(row[0] or "").strip(), + "description": str(row[1] or "").strip(), + "ocr_description": str(row[2] or "").strip(), + } + except Exception as exc: + logger.debug("visual_anchor context lookup skipped for %s: %s", anchor_id, exc) + return {} + + +def _failed_target_context( + report: ReplayResultReport, + replay_state: Dict[str, Any], + action_id: str, + original_action: Optional[Dict[str, Any]], +) -> Dict[str, Any]: + action = _current_replay_action(replay_state, action_id, original_action) + report_tspec = report.target_spec if isinstance(report.target_spec, dict) else {} + action_tspec = action.get("target_spec") if isinstance(action.get("target_spec"), dict) else {} + target_spec = dict(action_tspec or {}) + target_spec.update(report_tspec or {}) + params = _action_parameters(action) + visual_anchor = _visual_anchor_from_params(params) + anchor_id = _anchor_id_from_context(target_spec, action, params, visual_anchor) + anchor_ctx = _load_visual_anchor_context(anchor_id) + + for key in ("target_text", "description", "ocr_description"): + if anchor_ctx.get(key) and not target_spec.get(key): + target_spec[key] = anchor_ctx[key] + if anchor_id and not target_spec.get("anchor_id"): + target_spec["anchor_id"] = anchor_id + + candidates = ( + target_spec.get("by_text"), + target_spec.get("target_text"), + params.get("target_text"), + visual_anchor.get("target_text"), + anchor_ctx.get("target_text"), + report.target_description, + target_spec.get("ocr_description"), + visual_anchor.get("ocr_description"), + anchor_ctx.get("ocr_description"), + target_spec.get("description"), + visual_anchor.get("description"), + anchor_ctx.get("description"), + target_spec.get("vlm_description"), + action.get("intention"), + ) + target_desc = "" + for candidate in candidates: + target_desc = _useful_target_description(candidate) + if target_desc: + break + if not target_desc: + target_desc = ( + f"cible visuelle {anchor_id[:24]}" if anchor_id else "cible visuelle non décrite" + ) + + descriptor_blob = " ".join( + str(v or "") + for v in ( + target_desc, + target_spec.get("description"), + target_spec.get("ocr_description"), + anchor_ctx.get("description"), + anchor_ctx.get("ocr_description"), + ) + ).lower() + return { + "action": action, + "target_spec": target_spec, + "target_description": target_desc, + "anchor_id": anchor_id, + "is_word_icon": "word" in descriptor_blob or "rapport urgenc" in descriptor_blob, + } + + +def _pause_message_for_failed_target(context: Dict[str, Any], reason: str) -> str: + action = context.get("action") or {} + params = _action_parameters(action) + target_desc = context.get("target_description") or "cible visuelle non décrite" + button = str(action.get("button") or params.get("button") or "").lower() + action_type = str(action.get("type") or "").lower() + + noun = "l'icône Word" if context.get("is_word_icon") else "la cible visuelle" + if button == "double" or action_type == "double_click_anchor": + intent = "ouvrir" + elif button == "right": + intent = "faire un clic droit sur" + else: + intent = "utiliser" + + if reason in ("no_screen_change", "no_visible_change", "verification_failed"): + return _coerce_pause_message( + intention=f"{intent} {noun} « {target_desc} »", + attendu="voir un changement visible apres mon action", + vu="l'ecran reste identique apres mon action", + demande="me montrer la bonne cible ou indiquer que je peux continuer", + ) + if reason == "wrong_window": + return _coerce_pause_message( + intention=f"{intent} {noun} « {target_desc} »", + attendu="avoir la bonne fenetre active au premier plan", + vu="une autre fenetre est active", + demande="remettre la bonne fenetre au premier plan", + ) + return _coerce_pause_message( + intention=f"{intent} {noun} « {target_desc} »", + attendu=f"voir « {target_desc} » a l'ecran", + vu="la zone attendue n'est pas identifiable de facon fiable", + demande="me montrer ou cliquer pour continuer", + ) + + +def _normalize_action_target_semantics(action: Dict[str, Any]) -> None: + """Remonter les libellés d'ancre dans target_spec avant enqueue. + + Les plans déjà sérialisés peuvent contenir `target_text`/`description` + sans `by_text`/`vlm_description`. L'agent Windows lit surtout ces deux + derniers champs pour la résolution autonome. + """ + if not isinstance(action, dict): + return + + target_spec = action.get("target_spec") + if not isinstance(target_spec, dict): + target_spec = {} + params = _action_parameters(action) + visual_anchor = _visual_anchor_from_params(params) + anchor_id = _anchor_id_from_context(target_spec, action, params, visual_anchor) + anchor_ctx = _load_visual_anchor_context(anchor_id) + + for key in ("target_text", "description", "ocr_description"): + for source in (action, params, visual_anchor, anchor_ctx): + if not isinstance(source, dict): + continue + value = _useful_target_description(source.get(key)) + if value and not target_spec.get(key): + target_spec[key] = value + break + + if anchor_id: + target_spec.setdefault("anchor_id", anchor_id) + + if not target_spec.get("by_text"): + by_text = _useful_target_description( + target_spec.get("target_text") + or action.get("target_text") + or params.get("target_text") + or visual_anchor.get("target_text") + or anchor_ctx.get("target_text") + ) + if by_text: + target_spec["by_text"] = by_text + target_spec.setdefault("by_text_source", "visual_anchor") + + if not target_spec.get("vlm_description"): + vlm_description = _useful_target_description( + target_spec.get("description") + or target_spec.get("ocr_description") + or visual_anchor.get("description") + or visual_anchor.get("ocr_description") + or anchor_ctx.get("description") + or anchor_ctx.get("ocr_description") + ) + if vlm_description: + target_spec["vlm_description"] = vlm_description + + if target_spec: + action["target_spec"] = target_spec + + label = _useful_target_description( + target_spec.get("by_text") + or target_spec.get("target_text") + or target_spec.get("description") + or target_spec.get("ocr_description") + or target_spec.get("vlm_description") + ) + if label: + action.setdefault("target_text", target_spec.get("target_text") or label) + action.setdefault("target_description", label) + action.setdefault("description", target_spec.get("description") or label) + + +def _normalize_actions_target_semantics(actions: List[Dict[str, Any]]) -> None: + for action in actions or []: + _normalize_action_target_semantics(action) + + +def _fast_pause_for_no_visible_change( + verification: Any, + original_action: Optional[Dict[str, Any]], +) -> bool: + """Avoid stale server retries when the only signal is no visible change.""" + if os.environ.get("RPA_FAST_PAUSE_NO_VISIBLE_CHANGE", "1").lower() not in ( + "1", "true", "yes", "on", + ): + return False + action = original_action or {} + if action.get("expected_result"): + return False + if action.get("type") != "click": + return False + detail = str(getattr(verification, "detail", "") or "").lower() + return ( + not getattr(verification, "changes_detected", True) + or "aucun changement" in detail + or "no change" in detail + or "unchanged" in detail + ) + + +def _is_idempotent_key_combo_no_visible_change(action: Dict[str, Any]) -> bool: + """Combos whose success often has no pixel delta in the demo flow.""" + if not isinstance(action, dict): + return False + if action.get("expected_result"): + return False + if action.get("type") != "key_combo": + return False + + keys = action.get("keys") or [] + normalized = tuple(str(k or "").strip().lower() for k in keys if str(k or "").strip()) + key_set = set(normalized) + return ( + key_set in ({"ctrl", "s"}, {"ctrl_l", "s"}, {"ctrl_r", "s"}) + or key_set in ({"ctrl", "end"}, {"ctrl_l", "end"}, {"ctrl_r", "end"}) + or key_set in ({"ctrl", "home"}, {"ctrl_l", "home"}, {"ctrl_r", "home"}) + ) + + class ErrorCallbackConfig(BaseModel): """Configuration du callback d'erreur pour un replay.""" replay_id: str @@ -1051,6 +1718,11 @@ async def register_session(session_id: str, machine_id: str = "default"): session_id: Identifiant unique de la session machine_id: Identifiant de la machine source (multi-machine, défaut: "default") """ + machine_id = machine_id or "default" + _guard_agent_registry_access( + machine_id, + endpoint="/api/v1/traces/stream/register", + ) processor.session_manager.register_session(session_id, machine_id=machine_id) # Reset des compteurs pour cette session (évite les reliquats d'une session précédente) with _pending_lock: @@ -1071,6 +1743,11 @@ def _ensure_session_registered(session_id: str, machine_id: str = "default"): session_id: Identifiant de la session machine_id: Identifiant machine (propagé depuis l'agent) """ + machine_id = machine_id or "default" + _guard_agent_registry_access( + machine_id, + endpoint="/api/v1/traces/stream", + ) session = processor.session_manager.get_session(session_id) if session is None: logger.info(f"Auto-enregistrement de la session {session_id} (machine={machine_id})") @@ -1650,6 +2327,13 @@ async def finalize( detail=f"Session {session_id} non trouvée", ) + # ── Garde fleet registry ── + _finalize_guard_mid = getattr(session, "machine_id", None) or machine_id + _guard_agent_registry_access( + _finalize_guard_mid, + endpoint="/api/v1/traces/stream/finalize", + ) + # Marquer la session comme finalisée (persistée sur disque) processor.session_manager.finalize(session_id) logger.info(f"Session {session_id} finalisée, ajout à la queue du worker VLM") @@ -2295,6 +2979,8 @@ async def start_replay(request: ReplayRequest): app_info.get("primary_app"), app_info.get("primary_launch_cmd"), ) + _normalize_actions_target_semantics(actions) + # Créer l'identifiant de replay replay_id = f"replay_{uuid.uuid4().hex[:8]}" @@ -2371,6 +3057,7 @@ async def start_raw_replay(request: RawReplayRequest): ) # Validation de chaque action (sécurité HIGH) + _normalize_actions_target_semantics(actions) for i, action in enumerate(actions): error = _validate_replay_action(action) if error: @@ -2480,6 +3167,12 @@ async def replay_from_session( if not session_id: raise HTTPException(status_code=400, detail="session_id requis") + # ── Garde révocation : agent actif obligatoire ── + _guard_agent_registry_access( + machine_id, + endpoint="/api/v1/traces/stream/replay-session", + ) + # ── 1. Trouver le fichier live_events.jsonl de la session ── events_file = None @@ -2591,6 +3284,7 @@ async def replay_from_session( ) # Validation de chaque action (sécurité HIGH) + _normalize_actions_target_semantics(actions) for i, action in enumerate(actions): error = _validate_replay_action(action) if error: @@ -2699,6 +3393,7 @@ async def enqueue_single_action(request: SingleActionRequest): target_machine_id = request.machine_id # Validation de l'action (sécurité HIGH) + _normalize_action_target_semantics(action) error = _validate_replay_action(action) if error: raise HTTPException(status_code=400, detail=f"Action invalide : {error}") @@ -2846,6 +3541,7 @@ async def launch_replay_from_plan(request: PlanReplayRequest): ) # ── 3. Validation de chaque action (sécurité HIGH) ── + _normalize_actions_target_semantics(actions) validated: List[Dict[str, Any]] = [] for i, action in enumerate(actions): error = _validate_replay_action(action) @@ -3139,6 +3835,12 @@ async def get_next_action(session_id: str, machine_id: str = "default"): (single-threaded) et même les polls qui devraient recevoir server_busy sont bloqués jusqu'à libération — ce qui annule l'effet du timeout. """ + machine_id = machine_id or "default" + _guard_agent_registry_access( + machine_id, + endpoint="/api/v1/traces/stream/replay/next", + ) + import asyncio loop = asyncio.get_event_loop() acquired = await loop.run_in_executor(None, _replay_lock.acquire, True, 4.5) @@ -3166,6 +3868,9 @@ async def get_next_action(session_id: str, machine_id: str = "default"): "replay_paused": True, "pause_message": state.get("pause_message", "Replay en pause"), "replay_id": state["replay_id"], + "current_action_index": state.get("current_action_index", 0), + "completed_actions": state.get("completed_actions", 0), + "total_actions": state.get("total_actions", "?"), } # CRITIQUE : vérifier que la queue appartient BIEN à cette machine. @@ -3341,7 +4046,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"): # pause_for_human : pause supervisée si safety_level/safety_checks ou mode supervised, # sinon no-op en mode autonome (skip). if type_ == "pause_for_human": - _params = action.get("parameters") or {} + _params = _pause_action_parameters(action) _exec_mode = ( (owning_replay or {}).get("params", {}).get("execution_mode", "autonomous") if owning_replay else "autonomous" @@ -3374,11 +4079,20 @@ async def get_next_action(session_id: str, machine_id: str = "default"): logger.warning("QW4 build_pause_payload échec (%s) — pause sans checks", e) owning_replay["safety_checks"] = [] + pause_message = ( + owning_replay.get("pause_message") + or _pause_action_message(action) + ) + owning_replay["pause_message"] = pause_message + # Conserver le contexte de l'action (audit + reprise) owning_replay["failed_action"] = { "action_id": action.get("action_id"), "type": "pause_for_human", "reason": "user_request", + "target_description": pause_message, + "message": pause_message, + "safety_checks": _params.get("safety_checks") or [], } owning_replay["status"] = "paused_need_help" queue.pop(0) @@ -3392,8 +4106,11 @@ async def get_next_action(session_id: str, machine_id: str = "default"): "session_id": session_id, "machine_id": machine_id, "replay_paused": True, - "pause_message": owning_replay.get("pause_message", "Validation requise"), + "pause_message": pause_message, "replay_id": owning_replay["replay_id"], + "current_action_index": owning_replay.get("current_action_index", 0), + "completed_actions": owning_replay.get("completed_actions", 0), + "total_actions": owning_replay.get("total_actions", "?"), } # Mode autonome sans safety_checks → skip (comportement legacy) @@ -3402,6 +4119,8 @@ async def get_next_action(session_id: str, machine_id: str = "default"): owning_replay["replay_id"] if owning_replay else "?" ) queue.pop(0) + if owning_replay is not None: + _bump_completed_actions(owning_replay, action.get("action_id", "")) _replay_queues[session_id] = queue continue @@ -3480,6 +4199,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"): except Exception as e: logger.warning(f"Action serveur {type_} a levé : {e}") queue.pop(0) + _bump_completed_actions(owning_replay, action.get("action_id", "")) _replay_queues[session_id] = queue continue # action suivante @@ -3497,6 +4217,7 @@ async def get_next_action(session_id: str, machine_id: str = "default"): logger.info("Clic conditionnel ignoré (%s=%s) — action %s", condition_key, val, action.get("action_id", "?")) queue.pop(0) + _bump_completed_actions(owning_replay, action.get("action_id", "")) _replay_queues[session_id] = queue continue @@ -3770,6 +4491,24 @@ async def report_action_result(report: ReplayResultReport): session_id = report.session_id action_id = report.action_id + # ── Garde inconditionnel sur /replay/result ── + # Le guard ne dépend plus de _retry_pending : il s'applique sur chaque + # rapport, en résolvant le machine_id depuis la session ou le replay_state. + _result_guard_mid: str | None = None + _result_guard_sess = processor.session_manager.get_session(session_id) + if _result_guard_sess and getattr(_result_guard_sess, "machine_id", None): + _result_guard_mid = _result_guard_sess.machine_id + # Fallback : chercher le machine_id dans un replay_state actif pour cette session + if not _result_guard_mid: + for _rs in _replay_states.values(): + if _rs.get("session_id") == session_id: + _result_guard_mid = _rs.get("machine_id") + break + _guard_agent_registry_access( + _result_guard_mid, + endpoint="/api/v1/traces/stream/replay/result", + ) + # [REPLAY] log structuré d'arrivée du rapport agent _pos_log = report.actual_position or {} _x_log = _pos_log.get("x_pct", "?") if isinstance(_pos_log, dict) else "?" @@ -3834,21 +4573,25 @@ async def report_action_result(report: ReplayResultReport): # toujours considérées réussies si l'agent dit success (pas de position à vérifier, # et le screenshot change peu pour une frappe clavier) # - # Si l'agent a envoyé un warning "no_screen_change" ou "popup_handled", - # il a déjà tenté de gérer la situation (popup handler). Ne PAS relancer - # de retry côté serveur — continuer vers l'action suivante. + # Si l'agent a déjà classé l'échec (popup/no visible change), ne relance + # pas une vérification serveur qui réinjecterait des retries périmés. agent_warning = report.warning or "" - agent_handled_popup = agent_warning in ("no_screen_change", "popup_handled") - if agent_handled_popup: + agent_handled_terminal = ( + agent_warning == "popup_handled" + or _is_no_visible_change_report(report) + or _is_target_not_found_report(report) + or _is_visual_resolve_failed_report(report) + ) + if agent_handled_terminal: logger.info( f"Action {action_id} : agent warning='{agent_warning}' — " - f"popup déjà gérée côté agent, pas de retry serveur" + f"échec déjà classé côté agent, pas de vérification/retry serveur" ) action_type_for_verify = (original_action or {}).get("type", "unknown") skip_verify = action_type_for_verify in ("type", "key_combo", "wait") # Skip aussi la vérification serveur si l'agent a déjà géré la popup - skip_verify = skip_verify or agent_handled_popup + skip_verify = skip_verify or agent_handled_terminal verification = None # [VALIDATOR_V2] override conditionnel — flag RPA_VALIDATOR_V2_ENABLED. # Si verdict ≠ COMPLETE, on force result.success=False et on expose failure_category. @@ -3959,7 +4702,8 @@ async def report_action_result(report: ReplayResultReport): async with _async_replay_lock(): result_entry = { "action_id": action_id, - "success": report.success, + "success": _final_success, + "agent_success": report.success, "error": report.error, "warning": report.warning, "has_screenshot": bool(screenshot_after), @@ -4125,8 +4869,18 @@ async def report_action_result(report: ReplayResultReport): # === Logique de retry / success / failure === if report.success and (verification is None or verification.verified): # Action réussie (vérification OK ou pas de vérification) - replay_state["completed_actions"] += 1 - replay_state["current_action_index"] += 1 + _bump_completed_actions(replay_state, action_id) + pending_removed = _remove_retry_pending_duplicates( + session_id, + replay_state.get("replay_id", ""), + action_id, + ) + if pending_removed: + logger.warning( + "[REPLAY] success cleanup action_id=%s removed_retry_pending=%d", + action_id, + pending_removed, + ) elif report.success and verification and not verification.verified: # Agent dit "success" mais la vérification échoue (rien n'a changé) @@ -4135,16 +4889,71 @@ async def report_action_result(report: ReplayResultReport): f"Action {action_id} marquée success mais non vérifiée: " f"{verification.detail}" ) - if verification.suggestion == "retry" and retry_count < MAX_RETRIES_PER_ACTION: + _verify_action_ctx = _current_replay_action( + replay_state, action_id, original_action, + ) + fast_pause_no_visible = _fast_pause_for_no_visible_change( + verification, _verify_action_ctx, + ) + if ( + verification.suggestion == "retry" + and retry_count < MAX_RETRIES_PER_ACTION + and not fast_pause_no_visible + ): # Réinjecter pour retry _schedule_retry( - session_id, replay_state, original_action or {"action_id": action_id}, + session_id, replay_state, _verify_action_ctx or {"action_id": action_id}, retry_count, "verification_failed" ) else: - # Continuer malgré tout (action non vérifiée) - replay_state["completed_actions"] += 1 - replay_state["current_action_index"] += 1 + # Retries épuisés, suggestion non retry, ou no visible change : + # pause supervisée immédiate avec cible intelligible. + _ctx_pv = _failed_target_context( + report, replay_state, action_id, original_action, + ) + _tspec_pv = _ctx_pv["target_spec"] + _target_desc_pv = _ctx_pv["target_description"] + _reason_pv = ( + "verification_no_visible_change" + if fast_pause_no_visible else "verification_failed_max_retries" + ) + replay_state["status"] = "paused_need_help" + replay_state["failed_action"] = { + "action_id": action_id, + "type": (_ctx_pv.get("action") or original_action or {}).get("type", "unknown"), + "target_description": _target_desc_pv, + "screenshot_b64": screenshot_after or report.screenshot, + "target_spec": _tspec_pv, + "original_action": dict(_ctx_pv.get("action") or original_action or {}), + "anchor_id": _ctx_pv.get("anchor_id", ""), + "reason": _reason_pv, + "verification_detail": (verification.detail or "")[:200], + "failure_category": validator_v2_failure_category, + } + replay_state["pause_message"] = _pause_message_for_failed_target( + _ctx_pv, "verification_failed", + ) + replay_state["error_log"].append({ + "action_id": action_id, + "error": f"{_reason_pv}: {verification.detail or ''}", + "retry_count": retry_count, + "timestamp": time.time(), + }) + logger.warning( + f"Replay PAUSE supervisée : {action_id} non vérifiée " + f"({_reason_pv}, retry_count={retry_count}) — {_target_desc_pv}" + ) + try: + log_replay_failure( + replay_id=replay_state["replay_id"], + action_id=action_id, + target_spec=_tspec_pv, + screenshot_b64=screenshot_after or report.screenshot, + error=_reason_pv, + extra={"verification_detail": verification.detail or ""}, + ) + except Exception as _log_exc: + logger.debug("log_replay_failure skip: %s", _log_exc) elif not report.success and (report.system_dialog or (report.error or "").startswith("system_dialog:")): # ── SÉCURITÉ : dialogue système Windows détecté (UAC / CredUI / SmartScreen) ── @@ -4243,34 +5052,25 @@ async def report_action_result(report: ReplayResultReport): # est apparue, l'écran a changé entre deux actions. # # On redirige vers paused_need_help pour que l'humain intervienne. - _tspec_ww = (original_action or {}).get("target_spec") or report.target_spec or {} - _intent_ww = "" - _idx_ww = replay_state.get("current_action_index", 0) - _actions_ww = replay_state.get("actions", []) - if 0 <= _idx_ww < len(_actions_ww): - _intent_ww = str((_actions_ww[_idx_ww] or {}).get("intention", "") or "") - - _target_desc_ww = ( - _intent_ww - or _tspec_ww.get("by_text", "") - or _tspec_ww.get("vlm_description", "")[:80] - or "cette action" + _ctx_ww = _failed_target_context( + report, replay_state, action_id, original_action, ) + _tspec_ww = _ctx_ww["target_spec"] + _target_desc_ww = _ctx_ww["target_description"] replay_state["status"] = "paused_need_help" replay_state["failed_action"] = { "action_id": action_id, - "type": (original_action or {}).get("type", "unknown"), + "type": (_ctx_ww.get("action") or original_action or {}).get("type", "unknown"), "target_description": _target_desc_ww, "screenshot_b64": screenshot_after or report.screenshot, "target_spec": _tspec_ww, - "original_action": dict(original_action or {}), + "original_action": dict(_ctx_ww.get("action") or original_action or {}), + "anchor_id": _ctx_ww.get("anchor_id", ""), "reason": "wrong_window", "error_detail": report.error or "", } - replay_state["pause_message"] = ( - f"Je m'attendais à voir la bonne fenêtre mais je vois autre " - f"chose. Peux-tu vérifier que l'application est au premier " - f"plan ? ({report.error or ''})" + replay_state["pause_message"] = _pause_message_for_failed_target( + _ctx_ww, "wrong_window", ) error_entry = { "action_id": action_id, @@ -4291,12 +5091,12 @@ async def report_action_result(report: ReplayResultReport): target_spec=_tspec_ww, screenshot_b64=screenshot_after or report.screenshot, error="wrong_window", - extra={"error_detail": report.error or "", "intent": _intent_ww}, + extra={"error_detail": report.error or "", "target_description": _target_desc_ww}, ) except Exception as _log_exc: logger.debug("log_replay_failure skip: %s", _log_exc) - elif not report.success and agent_warning == "no_screen_change": + elif not report.success and _is_no_visible_change_report(report): # L'action a été exécutée mais l'écran n'a pas changé. # # Philosophie Léa (feedback_failure_is_learning.md) : un échec @@ -4322,30 +5122,43 @@ async def report_action_result(report: ReplayResultReport): _is_strict = bool(_current_strict.get("success_strict", False)) _intent_strict = str(_current_strict.get("intention", "") or "") - if _is_strict: - # Apprentissage supervisé : pause, demande d'intervention - _tspec = (original_action or {}).get("target_spec") or report.target_spec or {} - _target_desc = ( - _intent_strict - or _tspec.get("by_text", "") - or _tspec.get("vlm_description", "")[:80] - or "cette action" + _no_change_action = _current_replay_action( + replay_state, action_id, original_action, + ) + _idempotent_key_combo_no_change = _is_idempotent_key_combo_no_visible_change( + _no_change_action, + ) + if _idempotent_key_combo_no_change: + replay_state["unverified_actions"] += 1 + _bump_completed_actions(replay_state, action_id) + logger.warning( + "Action %s : no visible change accepté pour key_combo idempotent %s", + action_id, + _no_change_action.get("keys"), ) + + elif _is_strict: + # Apprentissage supervisé : pause, demande d'intervention + _ctx = _failed_target_context( + report, replay_state, action_id, original_action, + ) + _tspec = _ctx["target_spec"] + _target_desc = _ctx["target_description"] replay_state["status"] = "paused_need_help" replay_state["failed_action"] = { "action_id": action_id, - "type": (original_action or {}).get("type", "unknown"), + "type": (_ctx.get("action") or original_action or {}).get("type", "unknown"), "target_description": _target_desc, "screenshot_b64": screenshot_after or report.screenshot, "target_spec": _tspec, - "original_action": dict(original_action or {}), + "original_action": dict(_ctx.get("action") or original_action or {}), + "anchor_id": _ctx.get("anchor_id", ""), "reason": "no_screen_change_strict", "resolution_method": report.resolution_method or "", "resolution_score": report.resolution_score or 0, } - replay_state["pause_message"] = ( - f"Mon clic sur '{_target_desc}' n'a produit aucun effet. " - f"Peux-tu me montrer où je devais cliquer ?" + replay_state["pause_message"] = _pause_message_for_failed_target( + _ctx, "no_screen_change", ) error_entry = { "action_id": action_id, @@ -4377,30 +5190,92 @@ async def report_action_result(report: ReplayResultReport): except Exception as _log_exc: logger.debug("log_replay_failure skip: %s", _log_exc) else: - # Legacy (non-strict) : on continue, comportement historique - replay_state["unverified_actions"] += 1 - replay_state["completed_actions"] += 1 - replay_state["current_action_index"] += 1 - logger.warning( - f"Action {action_id} : écran inchangé (no_screen_change) — " - f"action sans effet visible, on continue (non-strict)" + _ctx_ns = _failed_target_context( + report, replay_state, action_id, original_action, ) + _tspec_ns = _ctx_ns["target_spec"] + _target_desc_ns = _ctx_ns["target_description"] + _tolerant_no_screen_change = os.environ.get( + "RPA_TOLERANT_NO_SCREEN_CHANGE", "" + ).lower() in ("1", "true", "yes", "on") - elif not report.success and (report.error or "") == "target_not_found": + if _tolerant_no_screen_change: + # Compatibilité explicite : certains workflows historiques + # acceptent des clics sans effet visible. + replay_state["unverified_actions"] += 1 + _bump_completed_actions(replay_state, action_id) + logger.warning( + f"Action {action_id} : écran inchangé (no_screen_change) — " + f"action sans effet visible, on continue (mode tolérant explicite)" + ) + + # Par défaut : ne jamais valider un clic sans effet visible. + else: + replay_state["unverified_actions"] += 1 + replay_state["status"] = "paused_need_help" + replay_state["failed_action"] = { + "action_id": action_id, + "type": (_ctx_ns.get("action") or original_action or {}).get("type", "unknown"), + "target_description": _target_desc_ns, + "screenshot_b64": screenshot_after or report.screenshot, + "target_spec": _tspec_ns, + "original_action": dict(_ctx_ns.get("action") or original_action or {}), + "anchor_id": _ctx_ns.get("anchor_id", ""), + "reason": "no_screen_change", + "resolution_method": report.resolution_method or "", + "resolution_score": report.resolution_score or 0, + } + replay_state["pause_message"] = _pause_message_for_failed_target( + _ctx_ns, "no_screen_change", + ) + replay_state["error_log"].append({ + "action_id": action_id, + "error": f"no_screen_change: {_target_desc_ns}", + "retry_count": retry_count, + "timestamp": time.time(), + }) + logger.warning( + f"Replay PAUSE supervisée (no_screen_change) : {action_id} " + f"sur '{_target_desc_ns}' — en attente d'intervention humaine" + ) + try: + log_replay_failure( + replay_id=replay_state["replay_id"], + action_id=action_id, + target_spec=_tspec_ns, + screenshot_b64=screenshot_after or report.screenshot, + error="no_screen_change", + extra={ + "target_description": _target_desc_ns, + "resolution_method": report.resolution_method or "", + "resolution_score": report.resolution_score or 0, + "actions_completed": replay_state["completed_actions"], + }, + ) + except Exception as _log_exc: + logger.debug("log_replay_failure skip: %s", _log_exc) + + elif not report.success and _is_target_not_found_report(report): # Cible non trouvée visuellement — PAUSE supervisée, PAS d'erreur fatale. # L'utilisateur doit intervenir (naviguer vers le bon ecran, fermer une popup, etc.) # On NE vide PAS la queue : les actions restantes seront reprises apres intervention. - target_desc = report.target_description or "élément inconnu" + _ctx_tnf = _failed_target_context( + report, replay_state, action_id, original_action, + ) + target_desc = _ctx_tnf["target_description"] replay_state["status"] = "paused_need_help" replay_state["failed_action"] = { "action_id": action_id, - "type": (original_action or {}).get("type", "unknown"), + "type": (_ctx_tnf.get("action") or original_action or {}).get("type", "unknown"), "target_description": target_desc, "screenshot_b64": screenshot_after or report.screenshot, - "target_spec": report.target_spec, - "original_action": dict(original_action or {}), + "target_spec": _ctx_tnf["target_spec"], + "original_action": dict(_ctx_tnf.get("action") or original_action or {}), + "anchor_id": _ctx_tnf.get("anchor_id", ""), } - replay_state["pause_message"] = f"Je ne vois pas '{target_desc}' à l'écran" + replay_state["pause_message"] = _pause_message_for_failed_target( + _ctx_tnf, "target_not_found", + ) error_entry = { "action_id": action_id, "error": f"target_not_found: {target_desc}", @@ -4416,7 +5291,7 @@ async def report_action_result(report: ReplayResultReport): log_replay_failure( replay_id=replay_state["replay_id"], action_id=action_id, - target_spec=report.target_spec, + target_spec=_ctx_tnf["target_spec"], screenshot_b64=screenshot_after or report.screenshot, resolution_attempts=[ r for r in replay_state["results"] @@ -4430,20 +5305,26 @@ async def report_action_result(report: ReplayResultReport): }, ) - elif not report.success and "visual resolve" in (report.error or "").lower(): + elif not report.success and _is_visual_resolve_failed_report(report): # Visual resolve échoué (ancien format d'erreur) — PAUSE supervisée aussi. # Compatibilité avec les agents qui n'envoient pas encore "target_not_found". - target_desc = report.target_description or (report.error or "Visual resolve échoué") + _ctx_vrf = _failed_target_context( + report, replay_state, action_id, original_action, + ) + target_desc = _ctx_vrf["target_description"] replay_state["status"] = "paused_need_help" replay_state["failed_action"] = { "action_id": action_id, - "type": (original_action or {}).get("type", "unknown"), + "type": (_ctx_vrf.get("action") or original_action or {}).get("type", "unknown"), "target_description": target_desc, "screenshot_b64": screenshot_after or report.screenshot, - "target_spec": report.target_spec, - "original_action": dict(original_action or {}), + "target_spec": _ctx_vrf["target_spec"], + "original_action": dict(_ctx_vrf.get("action") or original_action or {}), + "anchor_id": _ctx_vrf.get("anchor_id", ""), } - replay_state["pause_message"] = f"Je ne vois pas '{target_desc}' à l'écran" + replay_state["pause_message"] = _pause_message_for_failed_target( + _ctx_vrf, "visual_resolve_failed", + ) error_entry = { "action_id": action_id, "error": report.error or "Visual resolve échoué", @@ -4459,7 +5340,7 @@ async def report_action_result(report: ReplayResultReport): log_replay_failure( replay_id=replay_state["replay_id"], action_id=action_id, - target_spec=report.target_spec, + target_spec=_ctx_vrf["target_spec"], screenshot_b64=screenshot_after or report.screenshot, error="visual_resolve_failed", ) @@ -4494,10 +5375,40 @@ async def report_action_result(report: ReplayResultReport): # Notifier via callback si configuré _notify_error_callback(replay_state, action_id, report.error) + # Défense contre les compteurs incohérents, notamment les actions + # synthétiques de retry qui ne font pas partie du plan original. + total_actions = int(replay_state.get("total_actions", 0) or 0) + if replay_state["completed_actions"] > total_actions: + logger.warning( + f"completed_actions={replay_state['completed_actions']} > " + f"total={total_actions} — clamp défensif" + ) + replay_state["completed_actions"] = total_actions + # Vérifier si le replay est terminé (queue vide + dernière action réussie) remaining = len(_replay_queues.get(session_id, [])) if remaining == 0 and replay_state["status"] == "running": - replay_state["status"] = "completed" + if replay_state.get("failed_actions", 0) > 0: + replay_state["status"] = "error" + elif replay_state["completed_actions"] >= total_actions: + replay_state["status"] = "completed" + else: + replay_state["status"] = "paused_need_help" + replay_state["pause_message"] = ( + "Le plan n'est pas terminé mais je n'ai plus d'action à exécuter. " + "Vérifie l'écran avant de reprendre." + ) + logger.error( + f"[INVARIANT] queue vide, completed=" + f"{replay_state['completed_actions']}/{total_actions}, " + f"status forcé paused_need_help" + ) + return { + "status": "recorded", + "replay_status": replay_state["status"], + "pause_reason": "paused_need_help", + } + logger.info( f"Replay {replay_state['replay_id']} terminé avec succès : " f"{replay_state['completed_actions']}/{replay_state['total_actions']} actions" @@ -4781,6 +5692,10 @@ async def resume_replay( # Remettre le replay en mode running state["status"] = "running" + + if failed_action and failed_action.get("reason") == "user_request": + _bump_completed_actions(state, failed_action.get("action_id", "")) + state["failed_action"] = None state["pause_message"] = None # QW4 — vider safety_checks après acquittement (la pause est résolue) @@ -5930,6 +6845,20 @@ async def agents_enroll(request: AgentEnrollRequest): "existing": existing, }, ) + except AgentRevokedError as exc: + existing = _agent_row_public(exc.existing) + logger.warning( + f"[FLEET] Reenrollement refuse machine_id={machine_id} " + f"(revoque admin depuis {existing.get('uninstalled_at')})" + ) + raise HTTPException( + status_code=403, + detail={ + "error": "agent_revoked", + "message": "machine_id revoque par administrateur", + "existing": existing, + }, + ) except ValueError as exc: raise HTTPException(status_code=400, detail=str(exc)) @@ -6068,6 +6997,482 @@ async def dialog_resolve(payload: DialogResolveRequest): return resolution.to_dict() +# ========================================================================= +# POC Lea-first — POST /api/v1/lea/competences/candidate/persist +# Cloture le cycle Shadow (start -> stop -> understanding -> feedback -> +# build -> persist). Ecrit le YAML candidate sur disque + journal d'audit. +# Specs : docs/POC/SPECS_ENDPOINT_PERSIST_2026-06-01.md +# Periimetre : ajout en fin de fichier uniquement (pas de modif des +# routes existantes ni du _guard_agent_registry_access). +# ========================================================================= + +try: + from core.competences import persist as _competence_persist + _PERSIST_HELPERS_AVAILABLE = True +except Exception as _persist_import_exc: # pragma: no cover + _competence_persist = None + _PERSIST_HELPERS_AVAILABLE = False + logger.warning( + "[PERSIST] core.competences.persist indisponible : %s", _persist_import_exc + ) + +# Registry agents externes (mentionne dans specs §5). Import optionnel : +# si le registry n'existe pas encore, on accepte tous les agent_id non vides. +try: + from core.competences.external_decision_client import ( # type: ignore + ExternalDecisionClient as _ExternalDecisionClient, + ) + _EXTERNAL_DECISION_REGISTRY_AVAILABLE = True +except Exception: # pragma: no cover + _ExternalDecisionClient = None # type: ignore[assignment] + _EXTERNAL_DECISION_REGISTRY_AVAILABLE = False + + +class CompetencePersistRequest(BaseModel): + """Payload entree pour /persist (cf. specs §1).""" + + name: str + machine_id: str + session_id: Optional[str] = None + workflow_ir: Dict[str, Any] = {} + parameters: Optional[List[Dict[str, Any]]] = None + external_decision: Optional[Dict[str, Any]] = None + annotations_semantiques: Optional[Dict[str, Any]] = None + learning_metadata: Dict[str, Any] = {} + + +def _persist_known_external_agents() -> Optional[List[str]]: + """Liste des agent_id externes connus (None = registry indisponible).""" + if not _EXTERNAL_DECISION_REGISTRY_AVAILABLE or _ExternalDecisionClient is None: + return None + try: + client = _ExternalDecisionClient() + return list(getattr(client, "list_agents", lambda: [])()) + except Exception: + return None + + +@app.post("/api/v1/lea/competences/candidate/persist", status_code=201) +async def lea_competence_persist( + payload: CompetencePersistRequest, + request: Request, +): + """Persister un workflow_ir issu du cycle Shadow en YAML candidate. + + Cycle complet : start -> stop -> understanding -> feedback (N) -> + build -> **persist**. + + Specs : ``docs/POC/SPECS_ENDPOINT_PERSIST_2026-06-01.md``. + """ + if not _PERSIST_HELPERS_AVAILABLE or _competence_persist is None: + raise HTTPException( + status_code=503, + detail="Module core.competences.persist indisponible", + ) + + persist_mod = _competence_persist + + # --- 1. Couplage machine_id <-> token --------------------------------- + # Le POC utilise un token global ; on ne peut pas (encore) verifier + # l'appariement strict token<->machine_id. On delegue au garde + # agent_registry (deja en place sur d'autres endpoints) : si la fleet + # est peuplee, machine_id doit etre enrole. + _guard_agent_registry_access( + payload.machine_id, + endpoint="/api/v1/lea/competences/candidate/persist", + ) + + # --- 2. Rate limit par machine_id ------------------------------------- + allowed, retry_after = persist_mod.persist_rate_limiter.allow(payload.machine_id) + if not allowed: + raise HTTPException( + status_code=429, + detail={ + "error": "rate_limit_exceeded", + "retry_after": retry_after, + "machine_id": payload.machine_id, + }, + headers={"Retry-After": str(retry_after)}, + ) + + # --- 3. Idempotence : persist_id deja vu ? ---------------------------- + learning_meta = dict(payload.learning_metadata or {}) + client_persist_id = str(learning_meta.get("persist_id") or "").strip() + if client_persist_id: + previous = persist_mod.find_existing_audit_entry( + client_persist_id, audit_path=persist_mod.AUDIT_PATH + ) + if previous is not None: + return { + "competence_id": previous.get("competence_id"), + "yaml_path": previous.get("yaml_path"), + "learning_state": previous.get("learning_state"), + "persist_id": previous.get("persist_id"), + "audit_entry_id": previous.get("audit_entry_id"), + "idempotent_replay": True, + } + persist_id = client_persist_id or str(uuid.uuid4()) + + # --- 4. Slugification ---------------------------------------------------- + try: + slug = persist_mod.slugify(payload.name) + except ValueError as exc: + raise HTTPException(status_code=400, detail={"error": "invalid_name", "message": str(exc)}) + + # --- 5. Validation WorkflowIR ---------------------------------------- + partial = bool(learning_meta.get("partial", False)) + dropout_reason = learning_meta.get("dropout_reason") + steps = list((payload.workflow_ir or {}).get("steps") or []) + if not steps and not partial: + raise HTTPException( + status_code=400, + detail={"error": "empty_workflow_ir", "message": "steps vide et partial=false"}, + ) + if partial and not dropout_reason: + raise HTTPException( + status_code=400, + detail={ + "error": "dropout_reason_required", + "message": "partial=true exige learning_metadata.dropout_reason", + }, + ) + + # --- 6. Agent externe (si fourni) ------------------------------------- + external_agent_id = None + if payload.external_decision: + external_agent_id = str(payload.external_decision.get("agent_id") or "").strip() or None + known_agents = _persist_known_external_agents() + if external_agent_id and known_agents is not None and external_agent_id not in known_agents: + raise HTTPException( + status_code=400, + detail={ + "error": "external_agent_unknown", + "agent_id": external_agent_id, + "known_agents": known_agents, + }, + ) + + # --- 7. PII detection (regle d'or HDS) -------------------------------- + pii_scope = { + "workflow_ir": payload.workflow_ir, + "annotations_semantiques": payload.annotations_semantiques, + } + pii_matches = persist_mod.detect_pii(pii_scope) + if pii_matches: + logger.warning( + "[PERSIST] payload rejete (PII detecte) machine_id=%s persist_id=%s patterns=%d", + payload.machine_id, persist_id, len(pii_matches), + ) + raise HTTPException( + status_code=400, + detail={ + "error": "pii_detected", + "message": "Pattern PII detecte dans workflow_ir ou annotations_semantiques", + "patterns_count": len(pii_matches), + }, + ) + + # --- 8. learning_state — regle d'or : jamais stable par persist direct + requested_state = str(learning_meta.get("learning_state") or "").strip().lower() + if partial: + learning_state = "incomplete" + elif requested_state in {"", "stable", "candidate"}: + learning_state = "candidate" + elif requested_state in {"supervised", "observed"}: + learning_state = requested_state + else: + learning_state = "candidate" + + # --- 9. Collisions cross-states --------------------------------------- + collision_dir = persist_mod.detect_cross_state_collision( + slug, competences_root=persist_mod.COMPETENCES_ROOT + ) + if collision_dir is not None: + raise HTTPException( + status_code=409, + detail={ + "error": "slug_collision", + "slug": slug, + "existing_state": collision_dir, + "message": f"competence '{slug}' existe deja en '{collision_dir}/'", + }, + ) + + # --- 10. Construction YAML -------------------------------------------- + annotations = payload.annotations_semantiques or {} + intent_fr = str(annotations.get("intent_fr") or payload.name) + yaml_body = persist_mod.build_competence_yaml( + slug=slug, + name=payload.name, + workflow_ir=payload.workflow_ir or {}, + parameters=payload.parameters, + intent_fr=intent_fr, + learning_state=learning_state, + session_id=payload.session_id, + machine_id=payload.machine_id, + external_agent_id=external_agent_id, + ) + missing = persist_mod.validate_yaml_schema(yaml_body) + if missing: + raise HTTPException( + status_code=500, + detail={"error": "yaml_schema_invalid", "missing_fields": missing}, + ) + + # --- 11. Atomic write + roundtrip ------------------------------------ + target_path = persist_mod.CANDIDATE_DIR / f"{slug}.yaml" + try: + persist_mod.atomic_write_yaml(target_path, yaml_body, persist_id=persist_id) + except OSError as exc: + logger.exception("[PERSIST] echec atomic write") + raise HTTPException( + status_code=500, + detail={"error": "atomic_write_failed", "message": str(exc)}, + ) + + # Post-write roundtrip via catalog + try: + from core.competences.catalog import load_competence_file + roundtrip = load_competence_file(target_path) + if roundtrip.id != slug: + raise ValueError(f"roundtrip id mismatch : {roundtrip.id} != {slug}") + except Exception as exc: + logger.critical("[PERSIST] roundtrip KO pour %s : %s", target_path, exc) + try: + target_path.unlink() + except OSError: + pass + raise HTTPException( + status_code=500, + detail={"error": "roundtrip_failed", "message": str(exc)}, + ) + + # --- 12. Journal d'audit --------------------------------------------- + audit_entry = { + "persist_id": persist_id, + "machine_id": payload.machine_id, + "session_id": payload.session_id, + "competence_name": payload.name, + "competence_id": slug, + "yaml_path": f"data/competences/candidate/{slug}.yaml", + "learning_state": learning_state, + "partial": partial, + "dropout_reason": dropout_reason, + "external_agent_id": external_agent_id, + } + try: + audit_entry_id = persist_mod.audit_append( + audit_entry, audit_path=persist_mod.AUDIT_PATH + ) + except OSError as exc: + logger.exception("[PERSIST] echec audit_append (YAML deja ecrit)") + # Ne pas supprimer le YAML : ecrit avec succes, audit non-fatal. + audit_entry_id = -1 + logger.warning("[PERSIST] audit non journalise : %s", exc) + + # --- 13. Si partial : journalisation supplementaire ------------------ + if partial: + try: + persist_mod.audit_append( + { + **audit_entry, + "audit_entry_id": audit_entry_id, + "incomplete": True, + }, + audit_path=persist_mod.INCOMPLETE_PATH, + ) + except OSError as exc: + logger.warning("[PERSIST] incomplete_learnings.jsonl non ecrit : %s", exc) + + logger.info( + "[PERSIST] competence persistee slug=%s state=%s machine_id=%s persist_id=%s audit_id=%d", + slug, learning_state, payload.machine_id, persist_id, audit_entry_id, + ) + + return { + "competence_id": slug, + "yaml_path": f"data/competences/candidate/{slug}.yaml", + "learning_state": learning_state, + "persist_id": persist_id, + "audit_entry_id": audit_entry_id, + } + + +# ========================================================================= +# POC Phase 2.5 sémantique — POST /api/v1/lea/screen/analyze +# Analyse post-apprentissage des écrans capturés (Shadow stop -> analyse -> +# restitution Option C). N'altère JAMAIS le hot path replay. +# Specs : docs/POC/SPECS_PHASE_25_SEMANTIQUE_2026-06-01.md +# Périmètre : ajout en fin de fichier uniquement (aucune modif endpoints +# existants, aucun changement du _guard_agent_registry_access). +# ========================================================================= + +try: + from core.semantic.phase25_analyzer import ( # type: ignore + Phase25Analyzer as _Phase25Analyzer, + load_frames_from_paths as _phase25_load_frames, + SEMANTIC_DIR as _PHASE25_SEMANTIC_DIR, + ) + _PHASE25_AVAILABLE = True +except Exception as _phase25_import_exc: # pragma: no cover + _Phase25Analyzer = None # type: ignore[assignment] + _phase25_load_frames = None # type: ignore[assignment] + _PHASE25_SEMANTIC_DIR = None # type: ignore[assignment] + _PHASE25_AVAILABLE = False + logger.warning( + "[PHASE25] core.semantic.phase25_analyzer indisponible : %s", + _phase25_import_exc, + ) + + +class _Phase25ScreenRequest(BaseModel): + """Payload entrée pour ``/api/v1/lea/screen/analyze`` (cf. specs §2).""" + + session_id: str + screenshot_indexes: List[int] = [] + # Optionnels — permettent au caller de fournir directement les chemins + # si le serveur ne sait pas les retrouver depuis le session_id. + screenshot_paths: Optional[Dict[int, str]] = None + window_titles: Optional[Dict[int, str]] = None + slug: Optional[str] = None # si fourni : écrit aussi le .semantic.yaml + write_yaml: bool = False + + +def _phase25_default_paths(session_id: str, indexes: List[int]) -> Dict[int, str]: + """Reconstruction best-effort des chemins screenshots à partir de session_id. + + Convention du POC : ``data/training/live_sessions//shots/.png``. + Si la convention n'est pas respectée, le caller doit fournir + ``screenshot_paths`` explicitement. + """ + base = Path("data") / "training" / "live_sessions" / session_id / "shots" + result: Dict[int, str] = {} + for idx in indexes: + candidate = base / f"{idx}.png" + if candidate.exists(): + result[int(idx)] = str(candidate) + return result + + +@app.post("/api/v1/lea/screen/analyze") +async def lea_screen_analyze(payload: _Phase25ScreenRequest, request: Request): + """Analyser sémantiquement les écrans clés d'une session Shadow. + + Cycle d'appel : Shadow stop -> identification écrans distincts -> + **analyse** -> restitution Option C. + + Specs : ``docs/POC/SPECS_PHASE_25_SEMANTIQUE_2026-06-01.md``. + + Comportement : + - Toujours 200 (jamais 500) : erreur OmniParser -> ``degraded: true`` + + fallback OCR-seul docTR. + - 503 si le module ``core.semantic`` est indisponible. + - 400 si ``session_id`` invalide. + - Hot path replay : strictement intact (cet endpoint n'est appelé que + par agent-chat / dashboard post-apprentissage). + """ + if not _PHASE25_AVAILABLE or _Phase25Analyzer is None: + raise HTTPException( + status_code=503, + detail="Module core.semantic.phase25_analyzer indisponible", + ) + + # Validation session_id (anti path-traversal côté analyzer aussi). + try: + analyzer = _Phase25Analyzer(session_id=payload.session_id) + except ValueError as exc: + raise HTTPException( + status_code=400, + detail={"error": "invalid_session_id", "message": str(exc)}, + ) + + # Récupération des chemins screenshots. + paths = dict(payload.screenshot_paths or {}) + if not paths: + paths = _phase25_default_paths( + payload.session_id, payload.screenshot_indexes + ) + if not paths: + # Aucun frame disponible : on retourne un résultat vide cohérent + # plutôt qu'une erreur. Le caller saura via screens=[]. + from datetime import datetime as _dt_phase25 + from datetime import timezone as _tz_phase25 + return { + "session_id": payload.session_id, + "generated_at": _dt_phase25.now(_tz_phase25.utc).isoformat(), + "omniparser_available": analyzer.omniparser.available, + "degraded": True, + "too_complex": False, + "healthcheck_passed": True, + "healthcheck_reason": "no_frames_provided", + "screens": [], + } + + # Chargement des frames (best-effort, ignore les chemins manquants). + try: + frames = _phase25_load_frames(paths) + except RuntimeError as exc: + raise HTTPException( + status_code=503, + detail={"error": "pil_unavailable", "message": str(exc)}, + ) + + # Lancer l'analyse complète (healthcheck inclus). + # CORRECTIF P1-SEMANTIQUE : analyze_frames est synchrone et CPU/IO-bound + # (OmniParser + docTR, 2-5s par screen). On l'enrobe dans run_in_executor + # pour ne pas bloquer l'event loop FastAPI pendant l'analyse — sinon + # tous les autres endpoints (replay hot path inclus) seraient gelés. + import asyncio as _asyncio_phase25 + + try: + _loop_phase25 = _asyncio_phase25.get_running_loop() + result = await _loop_phase25.run_in_executor( + None, + lambda: analyzer.analyze_frames( + frames=frames, + screenshot_paths=paths, + window_titles=payload.window_titles or {}, + run_healthcheck=True, + ), + ) + except Exception as exc: + # Filet de sécurité ultime : jamais de 500 vers le caller. + logger.exception("[PHASE25] analyze_frames exception inattendue") + raise HTTPException( + status_code=500, + detail={"error": "phase25_internal_error", "message": str(exc)}, + ) + + # Écriture YAML optionnelle (uniquement si slug fourni). + written_yaml_path: Optional[str] = None + if payload.write_yaml and payload.slug: + try: + target = analyzer.write_semantic_yaml(result, slug=payload.slug) + written_yaml_path = str(target) + except ValueError as exc: + raise HTTPException( + status_code=400, + detail={"error": "invalid_slug", "message": str(exc)}, + ) + except OSError as exc: + logger.warning("[PHASE25] write_semantic_yaml KO : %s", exc) + # On ne propage pas en 500 : le payload reste disponible. + written_yaml_path = None + + payload_out = result.to_dict() + if written_yaml_path: + payload_out["semantic_yaml_path"] = written_yaml_path + logger.info( + "[PHASE25] session=%s screens=%d degraded=%s too_complex=%s yaml=%s", + payload.session_id, + len(result.screens), + result.degraded, + result.too_complex, + written_yaml_path or "-", + ) + return payload_out + + if __name__ == "__main__": import uvicorn diff --git a/agent_v0/server_v1/run_worker.py b/agent_v0/server_v1/run_worker.py index ac373dff3..82c1caf1a 100644 --- a/agent_v0/server_v1/run_worker.py +++ b/agent_v0/server_v1/run_worker.py @@ -25,6 +25,7 @@ Le worker : 5. Se suspend quand un replay est actif (libère le GPU) """ +import json import logging import os import signal @@ -67,6 +68,7 @@ class VLMWorker: self._running = False self._processor = None # Initialisé au premier besoin (lazy loading GPU) self._current_session: Optional[str] = None + self._started_at: str = datetime.now().isoformat() # Stats self._stats: Dict[str, int] = { @@ -83,7 +85,10 @@ class VLMWorker: if self._processor is None: logger.info("Initialisation du StreamProcessor (chargement GPU)...") from .stream_processor import StreamProcessor - self._processor = StreamProcessor(data_dir=str(LIVE_SESSIONS_DIR)) + self._processor = StreamProcessor( + data_dir=str(DATA_DIR), + enable_vlm=True, + ) logger.info("StreamProcessor initialisé.") return self._processor @@ -98,6 +103,11 @@ class VLMWorker: logger.info(" Sessions dir : %s", LIVE_SESSIONS_DIR) logger.info(" Poll interval : %ds", POLL_INTERVAL) + # N2 + N3 : santé initiale + signal READY systemd dès le démarrage + # (avant tout chargement GPU, pour ne pas dépasser le timeout de start). + self._write_health("healthy") + self._sd_notify("READY=1") + while self._running: try: # Vérifier si un replay est actif @@ -110,6 +120,7 @@ class VLMWorker: if session_id: self._process_session(session_id) else: + self._write_health("healthy") # N2 : cycle idle time.sleep(POLL_INTERVAL) except KeyboardInterrupt: @@ -119,6 +130,7 @@ class VLMWorker: logger.error("Erreur dans la boucle principale : %s", e, exc_info=True) time.sleep(5) # Éviter une boucle d'erreurs rapide + self._write_health("stopped") # N2 : santé finale logger.info("VLM Worker arrêté.") def stop(self): @@ -126,6 +138,103 @@ class VLMWorker: self._running = False logger.info("Arrêt demandé.") + # ========================================================================= + # N2 — Health file (_worker_health.json) + # ========================================================================= + # + # Garde-fou anti-blocage silencieux : expose l'état de santé du worker sur + # disque pour qu'un superviseur (humain, dashboard, watchdog) détecte un + # worker dégradé sans avoir à fouiller les logs. Écriture atomique. + # + # CONFIDENTIALITÉ (HDS) : n'écrit AUCUNE donnée patient — uniquement des + # identifiants techniques (session_id), des compteurs et des booléens de + # composants. Jamais d'OCR, de noms de fichiers screenshots, ni de contenu + # de session. + + def _sd_notify(self, state: str) -> bool: + """Notifie systemd via $NOTIFY_SOCKET, sans dépendance `systemd.daemon`. + + Implémentation pure socket (AF_UNIX SOCK_DGRAM) : fonctionne sous systemd + `Type=notify` pour `READY=1` et le heartbeat `WATCHDOG=1`. No-op silencieux + hors systemd (variable absente) ou en cas d'erreur — jamais bloquant. + Retourne True si le message a été émis. + """ + addr = os.environ.get("NOTIFY_SOCKET") + if not addr: + return False + try: + import socket + + # Namespace abstrait systemd : '@' → octet nul de préfixe + connect_addr = "\0" + addr[1:] if addr.startswith("@") else addr + with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as sock: + sock.connect(connect_addr) + sock.sendall(state.encode("utf-8")) + return True + except Exception as e: + logger.debug("sd_notify(%s) échoué : %s", state, e) + return False + + def _health_components(self) -> Dict[str, bool]: + """Statut booléen de chaque composant lourd, dérivé du processor.""" + proc = self._processor + return { + "screen_analyzer": proc is not None and getattr(proc, "_screen_analyzer", None) is not None, + "clip_embedder": proc is not None and getattr(proc, "_clip_embedder", None) is not None, + "faiss_manager": proc is not None and getattr(proc, "_faiss_manager", None) is not None, + "state_embedding_builder": proc is not None and getattr(proc, "_state_embedding_builder", None) is not None, + } + + def _write_health(self, status: str) -> None: + """Écrit data/training/_worker_health.json de façon atomique. + + `status` attendu : healthy | busy | degraded | stopped. Si le worker + tourne en mode VLM mais que ScreenAnalyzer est absent, le statut est + forcé à 'degraded' quelle que soit la valeur demandée. + """ + try: + components = self._health_components() + + proc = self._processor + vlm_mode = proc is not None and getattr(proc, "_enable_vlm", False) + if vlm_mode and not components["screen_analyzer"]: + status = "degraded" + + queue_path = DATA_DIR / "_worker_queue.txt" + try: + queue_length = len( + [ln for ln in queue_path.read_text(encoding="utf-8").splitlines() if ln.strip()] + ) if queue_path.exists() else 0 + except Exception: + queue_length = 0 + + payload = { + "pid": os.getpid(), + "started_at": self._started_at, + "last_cycle": datetime.now().isoformat(), + "current_session": self._current_session, + "queue_length": queue_length, + "components": components, + "stats": dict(self._stats), + "status": status, + } + + health_path = DATA_DIR / "_worker_health.json" + tmp_path = health_path.with_suffix(".json.tmp") + tmp_path.write_text( + json.dumps(payload, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + tmp_path.rename(health_path) + except Exception as e: + # Le health file est un garde-fou, jamais un point de défaillance. + logger.warning("Écriture health file échouée : %s", e) + + # N3 : chaque écriture santé sert aussi de heartbeat watchdog systemd + # (sauf à l'arrêt). No-op hors systemd. + if status != "stopped": + self._sd_notify("WATCHDOG=1") + # ========================================================================= # Queue management (fichier _worker_queue.txt) # ========================================================================= @@ -206,6 +315,9 @@ class VLMWorker: REPLAY_WAIT_TIMEOUT, ) break + # N3 : heartbeat pendant la pause replay (peut durer jusqu'à 120s, + # sinon le watchdog tuerait un worker pourtant sain et en attente). + self._sd_notify("WATCHDOG=1") time.sleep(REPLAY_CHECK_INTERVAL) elapsed = time.time() - start @@ -220,6 +332,7 @@ class VLMWorker: """Traite une session complète (analyse VLM + construction workflow).""" self._current_session = session_id logger.info("=== Début traitement session %s ===", session_id) + self._write_health("busy") # N2 : début de session start_time = time.time() try: @@ -331,6 +444,7 @@ class VLMWorker: finally: self._current_session = None + self._write_health("healthy") # N2 : fin de session (ou degraded auto) logger.info("=== Fin traitement session %s ===", session_id) @@ -347,6 +461,8 @@ class VLMWorker: f" ({shot_id})" if shot_id else "", ) + self._write_health("busy") # N2 : heartbeat à chaque screenshot + # Vérifier si un replay est devenu actif pendant le traitement if self._is_replay_active(): logger.info( diff --git a/agent_v0/server_v1/safety_checks_provider.py b/agent_v0/server_v1/safety_checks_provider.py index 6360ea363..c899a3b21 100644 --- a/agent_v0/server_v1/safety_checks_provider.py +++ b/agent_v0/server_v1/safety_checks_provider.py @@ -20,6 +20,15 @@ from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) +try: + from agent_v0.agent_v1.ui.message_contract import ( + coerce_supervised_pause_message, + warn_visible_message, + ) +except Exception: # pragma: no cover - fallback for partial server deployments + coerce_supervised_pause_message = None + warn_visible_message = None + @dataclass class PausePayload: @@ -50,8 +59,25 @@ def build_pause_payload( last_screenshot: Optional[str], ) -> PausePayload: """Construit le payload de pause enrichi pour une action pause_for_human.""" - params = action.get("parameters") or {} - message = params.get("message", "Validation requise") + params = dict(action.get("parameters") or {}) + for key in ("message", "safety_level", "safety_checks", "pause_reason"): + if key not in params or params.get(key) in (None, "", []): + if action.get(key) not in (None, "", []): + params[key] = action.get(key) + + raw_message = ( + params.get("message") + or action.get("message") + or action.get("intention") + or "" + ) + message = _coerce_pause_message( + raw_message, + intention=params.get("intention") or action.get("intention") or action.get("description"), + attendu=params.get("attendu") or params.get("expected") or action.get("expected"), + vu=params.get("vu") or params.get("observed") or action.get("observed"), + demande=params.get("demande") or params.get("request"), + ) safety_level = params.get("safety_level") declarative = params.get("safety_checks") or [] @@ -90,11 +116,60 @@ def build_pause_payload( return PausePayload( checks=checks, - pause_reason="", + pause_reason=params.get("pause_reason", ""), message=message, ) +def _coerce_pause_message( + message: Any = "", + *, + intention: Any = "", + attendu: Any = "", + vu: Any = "", + demande: Any = "", +) -> str: + if warn_visible_message is not None: + warn_visible_message( + message, + source="safety_checks_provider._coerce_pause_message.raw", + supervised_pause=False, + ) + + if coerce_supervised_pause_message is not None: + result = coerce_supervised_pause_message( + message, + intention=intention, + attendu=attendu, + vu=vu, + demande=demande, + ) + if warn_visible_message is not None: + warn_visible_message( + result, + source="safety_checks_provider._coerce_pause_message.final", + supervised_pause=True, + ) + return result + + fallback_request = "indiquer si je peux continuer ou corriger l'action attendue" + result = "\n".join( + ( + f"J'essaie de : {intention or 'continuer une etape supervisee'}", + f"J'attendais : {attendu or 'un accord humain clair avant de continuer'}", + f"Je vois : {vu or 'je suis sur une etape qui demande une verification humaine'}", + f"Peux-tu : {demande or message or fallback_request}", + ) + ) + if warn_visible_message is not None: + warn_visible_message( + result, + source="safety_checks_provider._coerce_pause_message.final_fallback", + supervised_pause=True, + ) + return result + + def _call_llm_for_contextual_checks( action: Dict[str, Any], replay_state: Dict[str, Any], diff --git a/agent_v0/server_v1/stream_processor.py b/agent_v0/server_v1/stream_processor.py index 089ec996a..9cc534bcd 100644 --- a/agent_v0/server_v1/stream_processor.py +++ b/agent_v0/server_v1/stream_processor.py @@ -37,6 +37,11 @@ _MODIFIER_ONLY_KEYS = { "meta", "meta_l", "meta_r", "super", "super_l", "super_r", } +_STANDALONE_SYSTEM_KEYS = { + "win", "win_l", "win_r", "cmd", "cmd_l", "cmd_r", + "windows", "meta", "meta_l", "meta_r", "super", "super_l", "super_r", +} + # Mapping numpad vk codes → caractères (layout-indépendant) _NUMPAD_VK_MAP = { 96: '0', 97: '1', 98: '2', 99: '3', 100: '4', @@ -69,6 +74,18 @@ def _is_modifier_only(keys: list) -> bool: return all(k.lower() in _MODIFIER_ONLY_KEYS for k in keys) +def _is_standalone_system_key(keys: list) -> bool: + """True pour les touches système seules qui sont des gestes utiles.""" + if len(keys) != 1: + return False + return str(keys[0]).lower() in _STANDALONE_SYSTEM_KEYS + + +def _is_ignorable_modifier_only(keys: list) -> bool: + """True pour les modificateurs seuls qui ne doivent pas devenir replay.""" + return _is_modifier_only(keys) and not _is_standalone_system_key(keys) + + def _sanitize_keys(keys: list) -> list: """Nettoyer une liste de touches : convertir les caractères de contrôle.""" cleaned = [] @@ -94,7 +111,7 @@ def _is_parasitic_event(event_data: Dict[str, Any]) -> bool: if event_type in ("key_press", "key_combo"): keys = event_data.get("keys", event_data.get("data", {}).get("keys", [])) - if not keys or _is_modifier_only(keys): + if not keys or _is_ignorable_modifier_only(keys): return True elif event_type == "text_input": @@ -203,7 +220,7 @@ def _filter_parasitic_steps(steps: list) -> list: s for s in steps if not ( s.get("type") in ("key_combo", "key_press") - and _is_modifier_only(s.get("keys", [])) + and _is_ignorable_modifier_only(s.get("keys", [])) ) ] @@ -266,7 +283,7 @@ def clean_enriched_actions(actions: list) -> list: # key_combo : sanitiser les touches, puis filtrer les modificateurs seuls if atype == 'key_combo': keys = _sanitize_keys(a.get('keys', [])) - if _is_modifier_only(keys): + if _is_ignorable_modifier_only(keys): continue if not keys: continue @@ -328,6 +345,12 @@ _IGNORED_EVENT_TYPES = frozenset({ _POST_COMBO_WAITS = { # (tuple de touches normalisées, triées en minuscule) -> wait_ms # NB : les tuples sont sorted() alphabétiquement + ('win',): 2000, # Win seul → menu/recherche Windows + ('cmd',): 2000, + ('s', 'win'): 2000, # Win+S → Recherche Windows + ('cmd', 's'): 2000, + ('escape',): 800, # Escape → fermeture menu/dialogue + ('esc',): 800, ('r', 'win'): 3000, # Win+R → Exécuter ('r', 'super'): 3000, ('meta', 'r'): 3000, @@ -956,8 +979,32 @@ def enrich_click_from_screenshot( vlm_description = ", ".join(vlm_parts) if vlm_parts else "" # ── 4. SomEngine : identifier l'élément cliqué ── + # C2d-bis (2026-05-25) court-circuits : + # Niveau A : si vision_info.text déjà présent, le code priorise vision_info.text + # ligne 974-981 de toute façon → SomEngine redondant (économie ~1.2s/clic CPU). + # Niveau B : flag RPA_SKIP_BUILD_VISION=true (alias RPA_SKIP_BUILD_VLM) + # skip total SomEngine + gemma4 (économie ~4s/clic). Défaut OFF + # pour préserver comportement historique. + has_vision_text = bool(isinstance(vision_info, dict) and vision_info.get("text")) + _skip_flag_raw = ( + os.environ.get("RPA_SKIP_BUILD_VISION") + or os.environ.get("RPA_SKIP_BUILD_VLM") + or "0" + ) + skip_build_vision = _skip_flag_raw.strip().lower() in ("1", "true", "yes") + som_elem = None - if session_dir and screenshot_id: + if skip_build_vision: + logger.debug( + "[PERF] vision.skip_som reason=RPA_SKIP_BUILD_VISION click=(%d,%d)", + click_x, click_y, + ) + elif has_vision_text: + logger.debug( + "[PERF] vision.skip_som reason=vision_info.text click=(%d,%d) text=%r", + click_x, click_y, vision_info.get("text", "")[:40], + ) + elif session_dir and screenshot_id: # Appeler _som_identify_clicked_element via un event_data minimal fake_event = { "screenshot_id": screenshot_id, @@ -981,10 +1028,15 @@ def enrich_click_from_screenshot( text_source = "ocr" # ── 5b. Gemma4 : identifier l'élément cliqué via le screenshot fenêtre ── - # Quand l'OCR et SomEngine ne trouvent pas de texte, gemma4 (port 11435) - # reçoit le screenshot fenêtre + la position du clic et décrit l'élément. - # Un seul appel, une seule fois, pendant l'enregistrement. - if not element_text: + # Quand l'OCR et SomEngine ne trouvent pas de texte, gemma4 reçoit le + # screenshot fenêtre + la position du clic et décrit l'élément. + # Skippé si RPA_SKIP_BUILD_VISION actif (Niveau B C2d-bis). + if not element_text and skip_build_vision: + logger.debug( + "[PERF] vision.skip_gemma4 reason=RPA_SKIP_BUILD_VISION click=(%d,%d)", + click_x, click_y, + ) + elif not element_text: # Essayer avec le screenshot fenêtre (contexte complet) win_screenshot = None if session_dir and screenshot_id: @@ -1320,6 +1372,157 @@ def _infer_close_tab_target( return None +def _is_notepad_title(title: str) -> bool: + """Retourne True pour les fenêtres Bloc-notes modernes/françaises.""" + lowered = str(title or "").casefold() + return "bloc-notes" in lowered or "notepad" in lowered + + +def _infer_save_dialog_primary_button_target( + raw_events: list, + click_event: Dict[str, Any], +) -> Optional[Dict[str, Any]]: + """Détecter le bouton primaire du dialogue Windows ``Enregistrer sous``. + + Pattern réel ``sess_20260520T102916_066851`` : + - clic dans la fenêtre ``Enregistrer sous`` en bas de la boîte ; + - focus immédiat de retour vers ``... – Bloc-notes``. + + Quand OCR/SomEngine sont skippés au build, ce clic restait seulement + décrit par position + crop. Le template matching pouvait alors dériver. + On encode donc l'intention UI stable : bouton ``Enregistrer``. + """ + if click_event.get("type") != "mouse_click": + return None + + window = click_event.get("window", {}) + if not isinstance(window, dict): + return None + + from_title = str(window.get("title", "") or "").strip() + app_name = str(window.get("app_name", "") or "").strip().lower() + if from_title.casefold() != "enregistrer sous": + return None + if app_name and "notepad" not in app_name: + return None + + window_capture = click_event.get("window_capture", {}) + if not isinstance(window_capture, dict): + return None + click_relative = window_capture.get("click_relative") + window_size = window_capture.get("window_size") + if not ( + isinstance(click_relative, list) + and len(click_relative) == 2 + and isinstance(window_size, list) + and len(window_size) == 2 + ): + return None + try: + rel_y = float(click_relative[1]) + win_h = float(window_size[1]) + except (TypeError, ValueError): + return None + if win_h <= 0 or rel_y / win_h < 0.78: + # Boutons Enregistrer/Annuler en bas de dialogue. + return None + + click_ts = click_event.get("timestamp") + click_pos = click_event.get("pos") or [] + match_idx = None + for idx, raw_evt in enumerate(raw_events): + event_data = raw_evt.get("event", raw_evt) + if event_data.get("type") != "mouse_click": + continue + if event_data.get("timestamp") != click_ts: + continue + if (event_data.get("pos") or []) != click_pos: + continue + match_idx = idx + break + + if match_idx is None: + return None + + for follow_evt in raw_events[match_idx + 1: match_idx + 6]: + follow_data = follow_evt.get("event", follow_evt) + follow_type = follow_data.get("type", "") + if follow_type in {"mouse_click", "text_input", "key_press", "key_combo"}: + return None + if follow_type != "window_focus_change": + continue + + to_info = follow_data.get("to", {}) + if not isinstance(to_info, dict): + continue + to_title = str(to_info.get("title", "") or "").strip() + to_app = str(to_info.get("app_name", "") or "").strip().lower() + if "notepad" not in to_app or not _is_notepad_title(to_title): + continue + + follow_ts = follow_data.get("timestamp") + if ( + isinstance(click_ts, (int, float)) + and isinstance(follow_ts, (int, float)) + and follow_ts - click_ts > 3.0 + ): + break + + return { + "by_text": "Enregistrer", + "by_role": "button", + "window_title": "Enregistrer sous", + "context_hints": { + "window_title": "Enregistrer sous", + "interaction": "save_dialog_primary_button", + "expected_after_window": to_title, + }, + "vlm_description": ( + "Dans la fenêtre 'Enregistrer sous', le bouton principal " + "'Enregistrer' en bas de la boîte de dialogue" + ), + } + + return None + + +def _is_post_save_out_of_window_click(event_data: dict) -> bool: + """Vrai pour un clic parasite hors fenêtre juste après sauvegarde Notepad.""" + if event_data.get("type") != "mouse_click": + return False + window = event_data.get("window", {}) + if not isinstance(window, dict): + return False + if not _is_notepad_title(str(window.get("title", "") or "")): + return False + + window_capture = event_data.get("window_capture", {}) + if not isinstance(window_capture, dict): + return False + if window_capture.get("click_inside_window") is False: + return True + + click_relative = window_capture.get("click_relative") + window_size = window_capture.get("window_size") + if not ( + isinstance(click_relative, list) + and len(click_relative) == 2 + and isinstance(window_size, list) + and len(window_size) == 2 + ): + return False + try: + rel_x = float(click_relative[0]) + rel_y = float(click_relative[1]) + win_w = float(window_size[0]) + win_h = float(window_size[1]) + except (TypeError, ValueError): + return False + return win_w > 0 and win_h > 0 and ( + rel_x < 0 or rel_y < 0 or rel_x > win_w or rel_y > win_h + ) + + def _attach_expected_window_before(actions: list, raw_events: list) -> None: """Attacher la fenêtre attendue AVANT chaque clic en rejouant les raw events et en conservant le dernier ``window_focus_change.to.title``. @@ -1463,6 +1666,17 @@ def _enrich_actions_with_intentions( """ import requests as _requests + skip_flag = ( + os.environ.get("RPA_SKIP_INTENTION_ENRICHMENT") + or os.environ.get("RPA_SKIP_ENRICHMENT") + or "" + ) + if skip_flag.strip().lower() in {"1", "true", "yes", "on"}: + logger.info( + "Enrichissement intentions désactivé par RPA_SKIP_INTENTION_ENRICHMENT" + ) + return + gemma4_port = os.environ.get("GEMMA4_PORT", _GEMMA4_PORT) gemma4_url = f"http://localhost:{gemma4_port}/api/chat" @@ -1659,6 +1873,21 @@ def build_replay_from_raw_events( if not events: return [] + # C2b 2026-05-25 : instrumentation [PERF] des étapes de build_replay + # (décomposition des ~22s restantes après skip enrichissement gemma4). + # Préfixe [PERF] cohérent avec arbitrage Codex D3 10:19. Pas de flag : + # spans build hors boucle chaude, info permanente OK. + import time as _time_perf + _perf_t_step = _time_perf.perf_counter() + _perf_t_total = _perf_t_step + + def _perf_log(step: str) -> None: + nonlocal _perf_t_step + now = _time_perf.perf_counter() + elapsed_ms = (now - _perf_t_step) * 1000 + logger.info("[PERF] build.%s session=%s elapsed_ms=%.0f", step, session_id, elapsed_ms) + _perf_t_step = now + # Résoudre le répertoire de session pour les crops visuels session_dir_path = Path(session_dir) if session_dir else None if session_dir_path and not session_dir_path.is_dir(): @@ -1675,6 +1904,8 @@ def build_replay_from_raw_events( bool(session_dir_path), ) + _perf_log("step1_extract_resolution") + # ── 2. Filtrer et normaliser les événements ── actionable_events = [] saw_save_combo = False # Tracker Ctrl+S / Ctrl+Shift+S pour la coupure systray @@ -1714,8 +1945,19 @@ def build_replay_from_raw_events( ) break + if _is_post_save_out_of_window_click(event_data): + logger.debug( + "Coupure du replay : clic post-save hors fenêtre applicative " + "(window=%s, click_relative=%s)", + (event_data.get("window") or {}).get("title", ""), + (event_data.get("window_capture") or {}).get("click_relative"), + ) + break + actionable_events.append(event_data) + _perf_log("step2_filter_normalize") + # ── 3. Fusionner les text_input consécutifs ── # Tous les text_input consécutifs sont fusionnés en un seul, indépendamment # du gap temporel. L'utilisateur tape lettre par lettre mais on veut un @@ -1854,6 +2096,8 @@ def build_replay_from_raw_events( original[:50], ) + _perf_log("step3_merge_text_input") + # ── 4. Convertir en actions replay normalisées ── actions = [] last_ts = 0.0 @@ -1977,6 +2221,22 @@ def build_replay_from_raw_events( target_spec["context_hints"] = context_hints action["visual_mode"] = True + save_dialog_target = _infer_save_dialog_primary_button_target(events, evt) + if save_dialog_target: + target_spec = action.setdefault("target_spec", {}) + target_spec["by_text"] = save_dialog_target["by_text"] + target_spec["by_text_source"] = "heuristic" + target_spec["by_role"] = save_dialog_target["by_role"] + target_spec["window_title"] = save_dialog_target["window_title"] + target_spec["vlm_description"] = save_dialog_target["vlm_description"] + context_hints = dict(target_spec.get("context_hints") or {}) + context_hints.update(save_dialog_target["context_hints"]) + target_spec["context_hints"] = context_hints + expected_after_window = context_hints.get("expected_after_window") + if expected_after_window: + action["expected_window_title"] = expected_after_window + action["visual_mode"] = True + elif evt_type == "text_input": text = evt.get("text", "") if not text: @@ -2027,8 +2287,11 @@ def build_replay_from_raw_events( actions.append(action) + _perf_log("step4_convert_actions_and_crops") + # ── 5. Nettoyage global (dédup combos, sanitize, merge texte, waits) ── actions = clean_enriched_actions(actions) + _perf_log("step5_clean_enriched_actions") # ── 6. Insérer des waits contextuels après raccourcis critiques ── final_actions = [] @@ -2043,6 +2306,8 @@ def build_replay_from_raw_events( "duration_ms": post_wait, }) + _perf_log("step6_insert_contextual_waits") + # ── 7. Dernier nettoyage des waits consécutifs ── result = [] for a in final_actions: @@ -2055,12 +2320,16 @@ def build_replay_from_raw_events( continue result.append(a) + _perf_log("step7_cleanup_consecutive_waits") + # ── 8. Attacher les screenshots de référence (état attendu après action) ── # Les screenshots res_shot_XXXX.png capturés 1s après chaque action pendant # l'enregistrement servent de référence pour le contrôle visuel. if session_dir_path: _attach_expected_screenshots(result, events, session_dir_path) + _perf_log("step8_attach_screenshots") + # ── 9. Enrichir avec expected_window_title (titre fenêtre attendu après le clic) ── # Pour la vérification post-action : le titre de la fenêtre APRÈS le clic # est le window_title du PROCHAIN clic dans la séquence. @@ -2087,6 +2356,8 @@ def build_replay_from_raw_events( # il prime sur target_spec.window_title obsolète. _attach_expected_window_before(result, events) + _perf_log("step9_expected_window_title") + # ── 10. Enrichir avec intention + expected_result via gemma4 (Critic) ── # gemma4 analyse chaque action dans son contexte pour produire : # - intention : ce que l'utilisateur veut accomplir @@ -2099,6 +2370,8 @@ def build_replay_from_raw_events( if session_dir_path: _enrich_actions_with_intentions(result, session_dir_path) + _perf_log("step10_enrich_intentions_gemma4") + # ── 11. Consolider avec les apprentissages passés ── # Les replays précédents ont enregistré quelles méthodes marchent # pour quels éléments. On réinjecte ces connaissances dans le workflow. @@ -2115,6 +2388,10 @@ def build_replay_from_raw_events( except Exception as e: logger.debug("Consolidation apprentissage échouée : %s", e) + _perf_log("step11_replay_learner_consolidation") + _total_ms = (_time_perf.perf_counter() - _perf_t_total) * 1000 + logger.info("[PERF] build.TOTAL session=%s total_ms=%.0f", session_id, _total_ms) + # Stats visual replay visual_clicks = sum( 1 for a in result @@ -2148,8 +2425,9 @@ class StreamProcessor: 4. finalize_session() — construit le Workflow via GraphBuilder (DBSCAN) """ - def __init__(self, data_dir: str = "data/training"): + def __init__(self, data_dir: str = "data/training", enable_vlm: bool = False): self.data_dir = Path(data_dir) + self._enable_vlm = enable_vlm persist_dir = str(self.data_dir / "streaming_sessions") live_sessions_dir = str(self.data_dir / "live_sessions") self.session_manager = LiveSessionManager( @@ -2290,10 +2568,12 @@ class StreamProcessor: """ if self._initialized: return - # Marquer comme initialisé SANS charger les composants GPU - self._initialized = True - logger.info("StreamProcessor initialisé en mode LÉGER (pas de GPU, pas de VLM)") - return + if not self._enable_vlm: + # Marquer comme initialisé SANS charger les composants GPU. Le serveur + # HTTP reste en mode léger ; le worker dédié active enable_vlm=True. + self._initialized = True + logger.info("StreamProcessor initialisé en mode LÉGER (pas de GPU, pas de VLM)") + return with self._lock: if self._initialized: @@ -2357,6 +2637,20 @@ class StreamProcessor: logger.error(f" Erreur init FAISSManager: {e}") self._faiss_manager = None + # N1 anti-poison : en mode VLM, un ScreenAnalyzer absent rend le worker + # incapable d'enrichir le moindre screenshot. Ne PAS figer + # _initialized=True dans ce cas, sinon l'échec (souvent transitoire : + # contention GPU au boot, OOM passager) est mis en cache pour toute la + # vie du process — c'est précisément ce qui a provoqué le blocage R6 de + # 5 jours (worker vivant mais 0 enrichissement, sans alarme). On laisse + # _initialized à False pour réessayer au screenshot / cycle suivant. + if self._screen_analyzer is None: + logger.critical( + "Worker VLM DÉGRADÉ : ScreenAnalyzer indisponible après init " + "(_initialized laissé à False, retry au prochain cycle)." + ) + return + self._initialized = True logger.info("Composants core initialisés.") @@ -3115,7 +3409,7 @@ class StreamProcessor: # pour que ScreenAnalyzer crée des ScreenStates avec les bons titres de fenêtre self._restore_window_events(session_id, session_dir) - # Restaurer les événements utilisateur (mouse_click, text_input, key_press) + # Restaurer les événements utilisateur (mouse_click, text_input, key_press, key_combo) # depuis live_events.jsonl → session.events, pour que to_raw_session() # puisse les passer au GraphBuilder (construction des edges/actions) self._restore_user_events(session_id, session_dir) @@ -3377,7 +3671,7 @@ class StreamProcessor: def _restore_user_events(self, session_id: str, session_dir: Path): """Restaurer les événements utilisateur depuis live_events.jsonl. - Charge les événements d'action (mouse_click, text_input, key_press) + Charge les événements d'action (mouse_click, text_input, key_press, key_combo) dans session.events via session_manager.add_event(). Sans cela, to_raw_session() retourne une liste d'events vide, et le GraphBuilder ne peut pas construire les actions des edges. @@ -3423,7 +3717,7 @@ class StreamProcessor: evt_type = event_data.get("type", "") ts = float(event_data.get("timestamp", raw.get("timestamp", 0))) - if evt_type not in ("mouse_click", "text_input", "key_press"): + if evt_type not in ("mouse_click", "text_input", "key_press", "key_combo"): continue # Construire le dict d'événement pour add_event() @@ -3438,8 +3732,11 @@ class StreamProcessor: evt_dict["button"] = event_data.get("button", "left") elif evt_type == "text_input": evt_dict["text"] = event_data.get("text", "") - elif evt_type == "key_press": + elif evt_type in ("key_press", "key_combo"): evt_dict["keys"] = event_data.get("keys", []) + raw_keys = event_data.get("raw_keys") + if raw_keys: + evt_dict["raw_keys"] = raw_keys # Copier window info si disponible window = event_data.get("window") diff --git a/agent_v0/server_v1/worker_stream.py b/agent_v0/server_v1/worker_stream.py index 1b5cfb5ec..b92ddeeb5 100644 --- a/agent_v0/server_v1/worker_stream.py +++ b/agent_v0/server_v1/worker_stream.py @@ -34,8 +34,16 @@ class StreamWorker: self.running = False self.processed_files: Set[str] = set() - # StreamProcessor partagé (créé si non fourni) - self.processor = processor or StreamProcessor(data_dir=str(self.live_dir)) + # StreamProcessor partagé (créé si non fourni). En mode standalone, + # live_dir pointe normalement vers data/training/live_sessions ; le + # processor doit garder data/training comme racine pour workflows/. + processor_data_dir = ( + self.live_dir.parent if self.live_dir.name == "live_sessions" else self.live_dir + ) + self.processor = processor or StreamProcessor( + data_dir=str(processor_data_dir), + enable_vlm=True, + ) self._thread: threading.Thread = None diff --git a/tests/integration/test_replay_single_inflight.py b/tests/integration/test_replay_single_inflight.py index e70d5f9e8..1f701acd0 100644 --- a/tests/integration/test_replay_single_inflight.py +++ b/tests/integration/test_replay_single_inflight.py @@ -15,13 +15,20 @@ if str(ROOT) not in sys.path: @pytest.fixture -def isolated_replay_state(monkeypatch): +def isolated_replay_state(monkeypatch, tmp_path): monkeypatch.setenv("RPA_API_TOKEN", "test_replay_single_inflight_token") from agent_v0.server_v1 import api_stream + from agent_v0.server_v1.agent_registry import AgentRegistry monkeypatch.setattr(api_stream, "API_TOKEN", "test_replay_single_inflight_token") + # Isoler le registre pour que _agent_registry_has_entries() retourne False + # (mode dev, aucun agent enrolle) — sinon le garde fleet bloque les tests + original_registry = api_stream.agent_registry + empty_registry = AgentRegistry(db_path=str(tmp_path / "empty_agents.db")) + monkeypatch.setattr(api_stream, "agent_registry", empty_registry) + if api_stream._replay_lock.locked(): pytest.fail( "_replay_lock is already held at fixture setup — a previous test " @@ -53,6 +60,7 @@ def isolated_replay_state(monkeypatch): api_stream._machine_replay_target.update(saved_targets) api_stream._last_heartbeat.clear() api_stream._last_heartbeat.update(saved_heartbeat) + monkeypatch.setattr(api_stream, "agent_registry", original_registry) def _running_replay_state( diff --git a/tests/integration/test_stream_processor.py b/tests/integration/test_stream_processor.py index 7c6495b4c..660187901 100644 --- a/tests/integration/test_stream_processor.py +++ b/tests/integration/test_stream_processor.py @@ -11,6 +11,7 @@ import shutil import sys import tempfile import threading +import types from pathlib import Path from unittest.mock import MagicMock, patch @@ -171,6 +172,271 @@ class TestLiveSessionManager: class TestStreamProcessor: + def test_default_initialization_stays_light(self, temp_dir): + """Par défaut, l'API HTTP ne charge pas les composants VLM/GPU.""" + from agent_v0.server_v1.stream_processor import StreamProcessor + + test_processor = StreamProcessor(data_dir=temp_dir) + test_processor._ensure_initialized() + + assert test_processor._initialized is True + assert test_processor._screen_analyzer is None + assert test_processor._clip_embedder is None + assert test_processor._faiss_manager is None + + def test_enable_vlm_initialization_loads_components(self, temp_dir, monkeypatch): + """Le worker VLM peut explicitement charger ScreenAnalyzer/CLIP/FAISS.""" + from agent_v0.server_v1.stream_processor import StreamProcessor + + screen_module = types.ModuleType("core.pipeline.screen_analyzer") + clip_module = types.ModuleType("core.embedding.clip_embedder") + state_module = types.ModuleType("core.embedding.state_embedding_builder") + faiss_module = types.ModuleType("core.embedding.faiss_manager") + + class FakeScreenAnalyzer: + def __init__(self, session_id=""): + self.session_id = session_id + + class FakeCLIPEmbedder: + pass + + class FakeStateEmbeddingBuilder: + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + + class FakeFAISSManager: + def __init__(self, *args, **kwargs): + self.args = args + self.kwargs = kwargs + self.index = MagicMock(ntotal=0) + + screen_module.ScreenAnalyzer = FakeScreenAnalyzer + clip_module.CLIPEmbedder = FakeCLIPEmbedder + state_module.StateEmbeddingBuilder = FakeStateEmbeddingBuilder + faiss_module.FAISSManager = FakeFAISSManager + + monkeypatch.setitem(sys.modules, "core.pipeline.screen_analyzer", screen_module) + monkeypatch.setitem(sys.modules, "core.embedding.clip_embedder", clip_module) + monkeypatch.setitem(sys.modules, "core.embedding.state_embedding_builder", state_module) + monkeypatch.setitem(sys.modules, "core.embedding.faiss_manager", faiss_module) + + test_processor = StreamProcessor(data_dir=temp_dir, enable_vlm=True) + test_processor._ensure_initialized() + + assert test_processor._initialized is True + assert isinstance(test_processor._screen_analyzer, FakeScreenAnalyzer) + assert isinstance(test_processor._clip_embedder, FakeCLIPEmbedder) + assert isinstance(test_processor._state_embedding_builder, FakeStateEmbeddingBuilder) + assert isinstance(test_processor._faiss_manager, FakeFAISSManager) + + def test_enable_vlm_screen_analyzer_failure_does_not_cache_broken_state( + self, temp_dir, monkeypatch, caplog + ): + """N1 anti-poison : en mode VLM, si ScreenAnalyzer échoue à l'init, ne PAS + figer _initialized=True (sinon le worker reste cassé à vie, cf. blocage R6 + des 5 jours). Doit logger en critical et permettre un retry au cycle suivant. + """ + import logging + + from agent_v0.server_v1.stream_processor import StreamProcessor + + screen_module = types.ModuleType("core.pipeline.screen_analyzer") + clip_module = types.ModuleType("core.embedding.clip_embedder") + state_module = types.ModuleType("core.embedding.state_embedding_builder") + faiss_module = types.ModuleType("core.embedding.faiss_manager") + + class BrokenScreenAnalyzer: + def __init__(self, session_id=""): + raise RuntimeError("CUDA indisponible au démarrage du worker") + + class HealedScreenAnalyzer: + def __init__(self, session_id=""): + self.session_id = session_id + + class FakeCLIPEmbedder: + pass + + class FakeStateEmbeddingBuilder: + def __init__(self, *args, **kwargs): + pass + + class FakeFAISSManager: + def __init__(self, *args, **kwargs): + self.index = MagicMock(ntotal=0) + + screen_module.ScreenAnalyzer = BrokenScreenAnalyzer + clip_module.CLIPEmbedder = FakeCLIPEmbedder + state_module.StateEmbeddingBuilder = FakeStateEmbeddingBuilder + faiss_module.FAISSManager = FakeFAISSManager + + monkeypatch.setitem(sys.modules, "core.pipeline.screen_analyzer", screen_module) + monkeypatch.setitem(sys.modules, "core.embedding.clip_embedder", clip_module) + monkeypatch.setitem(sys.modules, "core.embedding.state_embedding_builder", state_module) + monkeypatch.setitem(sys.modules, "core.embedding.faiss_manager", faiss_module) + + test_processor = StreamProcessor(data_dir=temp_dir, enable_vlm=True) + + with caplog.at_level(logging.CRITICAL): + test_processor._ensure_initialized() + + # Pas de cache à vie : l'état reste retry-able + assert test_processor._initialized is False + assert test_processor._screen_analyzer is None + assert any(rec.levelno == logging.CRITICAL for rec in caplog.records), ( + "un log critical doit signaler le worker VLM dégradé" + ) + + # Retry au cycle suivant : ScreenAnalyzer réparé → init réussit cette fois + screen_module.ScreenAnalyzer = HealedScreenAnalyzer + test_processor._ensure_initialized() + + assert test_processor._initialized is True + assert isinstance(test_processor._screen_analyzer, HealedScreenAnalyzer) + + def test_worker_writes_health_file_with_component_status(self, tmp_path, monkeypatch): + """N2 : le worker écrit _worker_health.json avec le statut des composants + dérivé du processor, le pid, les stats et le statut global.""" + from agent_v0.server_v1 import run_worker + + data_dir = tmp_path / "data" / "training" + data_dir.mkdir(parents=True) + monkeypatch.setattr(run_worker, "DATA_DIR", data_dir) + + worker = run_worker.VLMWorker() + + class FakeProc: + _enable_vlm = True + _screen_analyzer = object() + _clip_embedder = object() + _faiss_manager = object() + _state_embedding_builder = object() + + worker._processor = FakeProc() + worker._stats["sessions_processed"] = 1 + worker._stats["total_screenshots_analyzed"] = 7 + + worker._write_health("healthy") + + health_path = data_dir / "_worker_health.json" + assert health_path.exists() + data = json.loads(health_path.read_text(encoding="utf-8")) + + assert data["status"] == "healthy" + assert data["pid"] == os.getpid() + assert data["components"] == { + "screen_analyzer": True, + "clip_embedder": True, + "faiss_manager": True, + "state_embedding_builder": True, + } + assert data["stats"]["sessions_processed"] == 1 + assert data["stats"]["total_screenshots_analyzed"] == 7 + + def test_worker_health_degraded_when_screen_analyzer_missing(self, tmp_path, monkeypatch): + """N2 : worker VLM dont le ScreenAnalyzer est absent => status 'degraded', + même si l'appelant demande 'healthy'.""" + from agent_v0.server_v1 import run_worker + + data_dir = tmp_path / "data" / "training" + data_dir.mkdir(parents=True) + monkeypatch.setattr(run_worker, "DATA_DIR", data_dir) + + worker = run_worker.VLMWorker() + + class DegradedProc: + _enable_vlm = True + _screen_analyzer = None + _clip_embedder = object() + _faiss_manager = object() + _state_embedding_builder = None + + worker._processor = DegradedProc() + worker._write_health("healthy") + + data = json.loads((data_dir / "_worker_health.json").read_text(encoding="utf-8")) + assert data["status"] == "degraded" + assert data["components"]["screen_analyzer"] is False + + def test_worker_health_file_contains_no_patient_data(self, tmp_path, monkeypatch): + """N2 confidentialité : le health file ne contient que des clés autorisées — + aucune donnée patient (OCR, noms de fichiers screenshots, contenu session).""" + from agent_v0.server_v1 import run_worker + + data_dir = tmp_path / "data" / "training" + data_dir.mkdir(parents=True) + monkeypatch.setattr(run_worker, "DATA_DIR", data_dir) + + worker = run_worker.VLMWorker() + worker._current_session = "sess_20260529T154427_f95956" + worker._write_health("busy") + + data = json.loads((data_dir / "_worker_health.json").read_text(encoding="utf-8")) + allowed_top = { + "pid", "started_at", "last_cycle", "current_session", + "queue_length", "components", "stats", "status", + } + assert set(data.keys()) <= allowed_top, f"clés inattendues: {set(data.keys()) - allowed_top}" + # current_session ne porte que l'identifiant, pas de contenu de session + assert data["current_session"] == "sess_20260529T154427_f95956" + + def test_sd_notify_noop_without_socket(self, monkeypatch): + """N3 : hors systemd (NOTIFY_SOCKET absent), _sd_notify est un no-op + silencieux qui retourne False — jamais d'exception.""" + from agent_v0.server_v1 import run_worker + + monkeypatch.delenv("NOTIFY_SOCKET", raising=False) + worker = run_worker.VLMWorker() + + assert worker._sd_notify("WATCHDOG=1") is False + + def test_sd_notify_sends_watchdog_to_socket(self, tmp_path, monkeypatch): + """N3 : sous systemd, _sd_notify écrit l'état brut dans $NOTIFY_SOCKET.""" + import socket + + from agent_v0.server_v1 import run_worker + + sock_path = str(tmp_path / "notify.sock") + listener = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) + listener.bind(sock_path) + listener.settimeout(2) + try: + monkeypatch.setenv("NOTIFY_SOCKET", sock_path) + worker = run_worker.VLMWorker() + + assert worker._sd_notify("WATCHDOG=1") is True + received = listener.recv(64) + assert received == b"WATCHDOG=1" + finally: + listener.close() + + def test_vlm_worker_uses_training_root_data_dir(self, tmp_path, monkeypatch): + """Le worker R6 doit produire workflows/embeddings sous data/training.""" + from agent_v0.server_v1 import run_worker + + data_dir = tmp_path / "data" / "training" + live_sessions_dir = data_dir / "live_sessions" + monkeypatch.setattr(run_worker, "DATA_DIR", data_dir) + monkeypatch.setattr(run_worker, "LIVE_SESSIONS_DIR", live_sessions_dir) + + test_worker = run_worker.VLMWorker() + test_processor = test_worker._get_processor() + + assert test_processor.data_dir == data_dir + assert test_processor.session_manager._live_sessions_dir == live_sessions_dir + assert test_processor._enable_vlm is True + + def test_stream_worker_standalone_uses_training_root_data_dir(self, tmp_path): + """Le StreamWorker standalone garde aussi data/training comme racine.""" + from agent_v0.server_v1.worker_stream import StreamWorker + + live_sessions_dir = tmp_path / "data" / "training" / "live_sessions" + test_worker = StreamWorker(live_dir=str(live_sessions_dir)) + + assert test_worker.processor.data_dir == live_sessions_dir.parent + assert test_worker.processor.session_manager._live_sessions_dir == live_sessions_dir + assert test_worker.processor._enable_vlm is True + def test_process_event(self, processor): result = processor.process_event("sess_010", { "type": "mouse_click", @@ -181,6 +447,49 @@ class TestStreamProcessor: session = processor.session_manager.get_session("sess_010") assert session.last_window_info["title"] == "Chrome" + def test_restore_user_events_keeps_key_combo(self, processor, tmp_path): + session_id = "sess_restore_combo" + session_dir = tmp_path / session_id + session_dir.mkdir() + (session_dir / "live_events.jsonl").write_text( + json.dumps({ + "session_id": session_id, + "timestamp": 1779900720.0, + "event": { + "type": "key_combo", + "keys": ["win", "s"], + "raw_keys": [ + {"action": "release", "kind": "vk", "vk": 83, "char": "s"}, + {"action": "release", "kind": "key", "name": "cmd"}, + ], + "timestamp": 1779900719.5, + "window": {"title": "Rechercher", "app_name": "SearchHost.exe"}, + "screenshot_id": "shot_0001", + }, + }) + "\n" + + json.dumps({ + "session_id": session_id, + "timestamp": 1779900725.0, + "event": { + "type": "text_input", + "text": "test", + "timestamp": 1779900725.0, + "window": {"title": "Rechercher", "app_name": "SearchHost.exe"}, + }, + }) + "\n", + encoding="utf-8", + ) + + processor.session_manager.add_event(session_id, {"type": "text_input", "text": "old"}) + + processor._restore_user_events(session_id, session_dir) + + session = processor.session_manager.get_session(session_id) + assert [event["type"] for event in session.events] == ["key_combo", "text_input"] + assert session.events[0]["keys"] == ["win", "s"] + assert session.events[0]["raw_keys"][0]["vk"] == 83 + assert session.events[0]["screenshot_id"] == "shot_0001" + def test_process_crop(self, processor): result = processor.process_crop("sess_011", "shot_001_crop", "/tmp/crop.png") assert result["status"] == "crop_stored" @@ -479,6 +788,156 @@ class TestStreamProcessor: assert first_hints.get("active_tab_label") == "test" assert "fermer l'onglet actif 'test'" in first_spec.get("vlm_description", "") + def test_build_replay_save_as_button_gets_semantic_target( + self, tmp_path, monkeypatch, + ): + """Le clic du bouton Enregistrer dans Save As ne doit pas rester + anchor-only/positionnel. + + Régression live 2026-05-25 : avec RPA_SKIP_BUILD_VISION, l'action + Save As était seulement décrite par position + crop, puis résolue par + template matching trop haut/gauche. Le builder doit encoder le bouton + primaire stable ``Enregistrer``. + """ + from agent_v0.server_v1 import stream_processor as sp + + session_dir = tmp_path / "sess" + (session_dir / "shots").mkdir(parents=True) + + monkeypatch.setattr(sp, "_load_crop_for_event", lambda *args, **kwargs: "abc123") + monkeypatch.setattr( + sp, + "enrich_click_from_screenshot", + lambda *args, **kwargs: { + "anchor_image_base64": "abc123", + "by_text": "", + "by_role": "", + "vlm_description": "positionnel", + }, + ) + monkeypatch.setattr(sp, "_attach_expected_screenshots", lambda *args, **kwargs: None) + monkeypatch.setattr(sp, "_enrich_actions_with_intentions", lambda *args, **kwargs: None) + monkeypatch.setattr(sp, "_unload_gemma4", lambda *args, **kwargs: None) + + events = [ + {"event": { + "type": "mouse_click", + "timestamp": 1.0, + "pos": [1329, 1265], + "button": "left", + "screenshot_id": "shot_006", + "screen_metadata": {"screen_resolution": [2560, 1600]}, + "window": {"title": "Enregistrer sous", "app_name": "Notepad.exe"}, + "window_capture": { + "rect": [332, 522, 1613, 1323], + "click_relative": [997, 743], + "window_size": [1281, 801], + "click_inside_window": True, + }, + }}, + {"event": { + "type": "window_focus_change", + "timestamp": 1.2, + "from": {"title": "Enregistrer sous", "app_name": "Notepad.exe"}, + "to": {"title": "*test – Bloc-notes", "app_name": "Notepad.exe"}, + }}, + ] + + actions = sp.build_replay_from_raw_events( + events, + session_id="sess_save_as_button", + session_dir=str(session_dir), + ) + + clicks = [a for a in actions if a.get("type") == "click"] + assert len(clicks) == 1 + spec = clicks[0].get("target_spec", {}) + hints = spec.get("context_hints") or {} + assert spec.get("by_text") == "Enregistrer" + assert spec.get("by_text_source") == "heuristic" + assert spec.get("by_role") == "button" + assert spec.get("window_title") == "Enregistrer sous" + assert hints.get("interaction") == "save_dialog_primary_button" + assert hints.get("expected_after_window") == "*test – Bloc-notes" + assert clicks[0].get("expected_window_title") == "*test – Bloc-notes" + + def test_build_replay_cuts_post_save_out_of_window_click( + self, tmp_path, monkeypatch, + ): + """Le clic hors fenêtre après retour Bloc-notes est parasite. + + C'est l'ancienne action finale 17/18 : coordonnées en bas à droite, + ``click_inside_window=false``. Elle ne fait pas partie du coeur + "saisir et enregistrer". + """ + from agent_v0.server_v1 import stream_processor as sp + + session_dir = tmp_path / "sess" + (session_dir / "shots").mkdir(parents=True) + + monkeypatch.setattr(sp, "_load_crop_for_event", lambda *args, **kwargs: "abc123") + monkeypatch.setattr( + sp, + "enrich_click_from_screenshot", + lambda *args, **kwargs: {"anchor_image_base64": "abc123"}, + ) + monkeypatch.setattr(sp, "_attach_expected_screenshots", lambda *args, **kwargs: None) + monkeypatch.setattr(sp, "_enrich_actions_with_intentions", lambda *args, **kwargs: None) + monkeypatch.setattr(sp, "_unload_gemma4", lambda *args, **kwargs: None) + + events = [ + {"event": { + "type": "mouse_click", + "timestamp": 1.0, + "pos": [1329, 1265], + "button": "left", + "screenshot_id": "shot_006", + "screen_metadata": {"screen_resolution": [2560, 1600]}, + "window": {"title": "Enregistrer sous", "app_name": "Notepad.exe"}, + "window_capture": { + "rect": [332, 522, 1613, 1323], + "click_relative": [997, 743], + "window_size": [1281, 801], + "click_inside_window": True, + }, + }}, + {"event": { + "type": "window_focus_change", + "timestamp": 1.2, + "from": {"title": "Enregistrer sous", "app_name": "Notepad.exe"}, + "to": {"title": "*test – Bloc-notes", "app_name": "Notepad.exe"}, + }}, + {"event": { + "type": "mouse_click", + "timestamp": 1.5, + "pos": [2248, 1577], + "button": "left", + "screenshot_id": "shot_007", + "screen_metadata": {"screen_resolution": [2560, 1600]}, + "window": { + "title": "http192.168.1.408765dossier.htmlid=.txt – Bloc-notes", + "app_name": "Notepad.exe", + }, + "window_capture": { + "rect": [323, 522, 2243, 1638], + "click_relative": [1925, 1055], + "window_size": [1920, 1116], + "click_inside_window": False, + }, + }}, + ] + + actions = sp.build_replay_from_raw_events( + events, + session_id="sess_post_save_cut", + session_dir=str(session_dir), + ) + + clicks = [a for a in actions if a.get("type") == "click"] + assert len(clicks) == 1 + assert clicks[0].get("target_spec", {}).get("by_text") == "Enregistrer" + assert clicks[0].get("expected_window_title") == "*test – Bloc-notes" + # ========================================================================= # StreamWorker diff --git a/tests/unit/test_api_stream_auth_p0bc.py b/tests/unit/test_api_stream_auth_p0bc.py index ebbed43c1..98f3bf5f2 100644 --- a/tests/unit/test_api_stream_auth_p0bc.py +++ b/tests/unit/test_api_stream_auth_p0bc.py @@ -52,12 +52,12 @@ class TestImageEndpointNotPublic: mod = _reload_api_stream() assert "/health" in mod._PUBLIC_PATHS - def test_replay_next_still_public(self, monkeypatch): - """/replay/next reste public (legacy agent Rust polling).""" + def test_replay_next_removed_from_public_paths(self, monkeypatch): + """/replay/next distribue des actions et exige desormais un Bearer.""" monkeypatch.setenv("RPA_API_TOKEN", "deadbeef" * 4) monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False) mod = _reload_api_stream() - assert "/api/v1/traces/stream/replay/next" in mod._PUBLIC_PATHS + assert "/api/v1/traces/stream/replay/next" not in mod._PUBLIC_PATHS # --------------------------------------------------------------------------- @@ -157,6 +157,23 @@ class TestFailClosedTokenP0C: asyncio.get_event_loop().run_until_complete(mod._verify_token(req)) assert exc_info.value.status_code == 401 + def test_verify_token_rejects_replay_next_without_bearer(self, monkeypatch): + """P0 révocation : GET /replay/next n'est plus public.""" + import asyncio + from unittest.mock import MagicMock + from fastapi import HTTPException + + monkeypatch.setenv("RPA_API_TOKEN", "validtoken" * 4) + monkeypatch.delenv("RPA_AUTH_DISABLED", raising=False) + mod = _reload_api_stream() + + req = MagicMock() + req.url.path = "/api/v1/traces/stream/replay/next" + req.headers = {} + with pytest.raises(HTTPException) as exc_info: + asyncio.get_event_loop().run_until_complete(mod._verify_token(req)) + assert exc_info.value.status_code == 401 + @pytest.fixture(autouse=True) def _cleanup(monkeypatch): diff --git a/tests/unit/test_api_stream_revocation_gaps.py b/tests/unit/test_api_stream_revocation_gaps.py new file mode 100644 index 000000000..4193de544 --- /dev/null +++ b/tests/unit/test_api_stream_revocation_gaps.py @@ -0,0 +1,350 @@ +""" +Tests des gaps de revocation fleet sur agent_v0/server_v1/api_stream.py. + +Couvre : +1. test_result_guard_without_pending — le garde est appliqué sur /replay/result + meme sans _retry_pending (garde inconditionnel). +2. test_finalize_revoked_agent — enroll + revoke + finalize → 403 +3. test_finalize_unknown_machine_registered — registre avec agents → + machine_id inconnu → 403 +4. test_guard_default_machine_id_registered — registre avec agents → + machine_id="default" → 403 +""" + +from __future__ import annotations + +import sys +import time +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +ROOT = str(Path(__file__).resolve().parents[2]) +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + + +@pytest.fixture +def isolated_fleet_state(monkeypatch, tmp_path): + """Fixture qui isole le registre AgentRegistry et les structures replay.""" + monkeypatch.setenv("RPA_API_TOKEN", "test_revocation_gaps_token") + + from agent_v0.server_v1 import api_stream + from agent_v0.server_v1.agent_registry import AgentRegistry + + # Aligner le token attendu par le middleware + monkeypatch.setattr(api_stream, "API_TOKEN", "test_revocation_gaps_token") + + # Substituer le registre global par une instance dediee au test + original_registry = api_stream.agent_registry + test_registry = AgentRegistry(db_path=str(tmp_path / "test_agents.db")) + monkeypatch.setattr(api_stream, "agent_registry", test_registry) + + # Sauver et nettoyer les structures replay + saved_states = dict(api_stream._replay_states) + api_stream._replay_states.clear() + + _auth_headers = {"Authorization": "Bearer test_revocation_gaps_token"} + + yield api_stream, test_registry, _auth_headers + + # Restauration + api_stream._replay_states.clear() + api_stream._replay_states.update(saved_states) + monkeypatch.setattr(api_stream, "agent_registry", original_registry) + + +# --------------------------------------------------------------------------- +# Test 1 : /replay/result — le garde est appliqué sans _retry_pending +# --------------------------------------------------------------------------- + + +def test_result_guard_without_pending(isolated_fleet_state, monkeypatch): + """Le garde sur /replay/result s'applique meme sans entrée dans _retry_pending. + + Scénario : un agent enrolle et actif envoie un rapport de résultat pour + une action qui n'est pas dans _retry_pending (cas nominal, pas un retry). + Le garde doit être appelé et laisser passer car l'agent est actif. + + Si l'agent est révoqué, le même rapport doit être bloqué (403). + """ + api_stream, registry, auth_headers = isolated_fleet_state + + # Enroller un agent actif + registry.enroll( + machine_id="test-machine-result-guard", + user_name="Test User", + hostname="TEST-HOST", + ) + + # Forger une session avec machine_id pour que le garde puisse le résoudre + from agent_v0.server_v1.live_session_manager import LiveSessionState + session = LiveSessionState( + session_id="sess-result-guard", + machine_id="test-machine-result-guard", + ) + monkeypatch.setattr( + api_stream.processor.session_manager, + "get_session", + lambda sid: session if sid == "sess-result-guard" else None, + ) + + from fastapi.testclient import TestClient + client = TestClient(api_stream.app, raise_server_exceptions=False) + + # Rapport sans _retry_pending — le garde doit quand meme s'appliquer + # et laisser passer car l'agent est actif + resp = client.post( + "/api/v1/traces/stream/replay/result", + json={ + "session_id": "sess-result-guard", + "action_id": "act-no-retry", + "success": True, + }, + headers=auth_headers, + ) + # Ne doit PAS etre 403 (agent actif) + assert resp.status_code != 403, ( + f"/replay/result a été bloqué (403) pour un agent actif sans retry. " + f"Body: {resp.text}" + ) + + # Maintenant révoquer l'agent + registry.uninstall( + machine_id="test-machine-result-guard", + reason="admin_revoke", + ) + + # Le meme rapport doit maintenant être bloqué + resp = client.post( + "/api/v1/traces/stream/replay/result", + json={ + "session_id": "sess-result-guard", + "action_id": "act-no-retry-2", + "success": True, + }, + headers=auth_headers, + ) + assert resp.status_code == 403, ( + f"/replay/result DOIT être bloqué (403) pour un agent révoqué. " + f"Body: {resp.text}" + ) + + +# --------------------------------------------------------------------------- +# Test 2 : /finalize — enroll + revoke + finalize → 403 +# --------------------------------------------------------------------------- + + +def test_finalize_revoked_agent(isolated_fleet_state, monkeypatch): + """Un agent enrolle puis révoqué doit être bloqué sur /finalize.""" + api_stream, registry, auth_headers = isolated_fleet_state + + # Enroller un agent + registry.enroll( + machine_id="test-machine-revoked-finalize", + user_name="Test User", + hostname="TEST-HOST", + ) + + # Le révoquer + registry.uninstall( + machine_id="test-machine-revoked-finalize", + reason="admin_revoke", + ) + + # Forger une session pour que le finalize trouve la session + from agent_v0.server_v1.live_session_manager import LiveSessionState + session = LiveSessionState( + session_id="sess-revoked-finalize", + machine_id="test-machine-revoked-finalize", + ) + monkeypatch.setattr( + api_stream.processor.session_manager, + "get_session", + lambda sid: session if sid == "sess-revoked-finalize" else None, + ) + # finalize() appelle aussi finalize() sur le session_manager — mock pour éviter I/O + monkeypatch.setattr( + api_stream.processor.session_manager, + "finalize", + lambda sid: None, + ) + monkeypatch.setattr(api_stream.processor, "_find_session_dir", lambda sid: None) + + from fastapi.testclient import TestClient + client = TestClient(api_stream.app, raise_server_exceptions=False) + + resp = client.post( + "/api/v1/traces/stream/finalize", + params={"session_id": "sess-revoked-finalize"}, + headers=auth_headers, + ) + assert resp.status_code == 403, ( + f"/finalize DOIT être bloqué (403) pour un agent révoqué. " + f"Body: {resp.text}" + ) + + +# --------------------------------------------------------------------------- +# Test 3 : /finalize — machine_id inconnu avec registre non vide → 403 +# --------------------------------------------------------------------------- + + +def test_finalize_unknown_machine_registered(isolated_fleet_state, monkeypatch): + """Quand le registre contient au moins un agent, un machine_id inconnu → 403.""" + api_stream, registry, auth_headers = isolated_fleet_state + + # Enroller un agent (le registre n'est donc pas vide) + registry.enroll( + machine_id="known-machine-xyz", + user_name="Known User", + hostname="KNOWN-HOST", + ) + + # Forger une session avec un machine_id inconnu du registre + from agent_v0.server_v1.live_session_manager import LiveSessionState + session = LiveSessionState( + session_id="sess-unknown-finalize", + machine_id="unknown-machine-abc", # Pas dans le registre + ) + monkeypatch.setattr( + api_stream.processor.session_manager, + "get_session", + lambda sid: session if sid == "sess-unknown-finalize" else None, + ) + monkeypatch.setattr( + api_stream.processor.session_manager, + "finalize", + lambda sid: None, + ) + monkeypatch.setattr(api_stream.processor, "_find_session_dir", lambda sid: None) + + from fastapi.testclient import TestClient + client = TestClient(api_stream.app, raise_server_exceptions=False) + + resp = client.post( + "/api/v1/traces/stream/finalize", + params={"session_id": "sess-unknown-finalize"}, + headers=auth_headers, + ) + assert resp.status_code == 403, ( + f"/finalize DOIT être bloqué (403) pour un machine_id inconnu " + f"quand le registre contient des agents. Body: {resp.text}" + ) + body = resp.json() + assert body.get("detail", {}).get("error") == "agent_unknown", ( + f"Erreur attendue 'agent_unknown', obtenu: {body}" + ) + + +# --------------------------------------------------------------------------- +# Test 4 : _guard_agent_registry_access — machine_id="default" avec registre → 403 +# --------------------------------------------------------------------------- + + +def test_guard_default_machine_id_registered(isolated_fleet_state): + """Quand le registre contient des agents, machine_id='default' → 403.""" + api_stream, registry, _auth_headers = isolated_fleet_state + + # Enroller un agent (le registre n'est donc pas vide) + registry.enroll( + machine_id="some-enrolled-agent", + user_name="Enrolled User", + hostname="ENROLLED-HOST", + ) + + from fastapi import HTTPException + + # Appel direct du garde avec machine_id="default" + with pytest.raises(HTTPException) as exc_info: + api_stream._guard_agent_registry_access( + "default", + endpoint="/api/v1/traces/stream/finalize", + ) + assert exc_info.value.status_code == 403 + detail = exc_info.value.detail + assert detail.get("error") == "agent_enrollment_required", ( + f"Erreur attendue 'agent_enrollment_required', obtenu: {detail}" + ) + + # Idem avec machine_id="" (vide) + with pytest.raises(HTTPException) as exc_info: + api_stream._guard_agent_registry_access( + "", + endpoint="/api/v1/traces/stream/event", + ) + assert exc_info.value.status_code == 403 + assert exc_info.value.detail.get("error") == "agent_enrollment_required" + + # Idem avec machine_id=None + with pytest.raises(HTTPException) as exc_info: + api_stream._guard_agent_registry_access( + None, + endpoint="/api/v1/traces/stream/event", + ) + assert exc_info.value.status_code == 403 + assert exc_info.value.detail.get("error") == "agent_enrollment_required" + + +# --------------------------------------------------------------------------- +# Test 5 : /replay-session — enroll + revoke → 403 +# --------------------------------------------------------------------------- + + +def test_replay_session_revoked_agent(isolated_fleet_state, monkeypatch): + """Un agent révoqué ne doit pas pouvoir lancer un replay-session.""" + api_stream, registry, auth_headers = isolated_fleet_state + + registry.enroll( + machine_id="test-machine-replay-session", + user_name="Test User", + hostname="TEST-HOST", + ) + + registry.uninstall( + machine_id="test-machine-replay-session", + reason="admin_revoke", + ) + + from fastapi.testclient import TestClient + client = TestClient(api_stream.app, raise_server_exceptions=False) + + resp = client.post( + "/api/v1/traces/stream/replay-session", + params={ + "session_id": "sess-some-replay", + "machine_id": "test-machine-replay-session", + }, + headers=auth_headers, + ) + assert resp.status_code == 403, ( + f"/replay-session DOIT être bloqué (403) pour un agent révoqué. " + f"Body: {resp.text}" + ) + + +def test_replay_session_unknown_machine_registered(isolated_fleet_state): + """Quand le registre contient des agents, machine_id inconnu sur replay-session → 403.""" + api_stream, registry, auth_headers = isolated_fleet_state + + registry.enroll( + machine_id="known-machine-replay", + user_name="Known User", + hostname="KNOWN-HOST", + ) + + from fastapi.testclient import TestClient + client = TestClient(api_stream.app, raise_server_exceptions=False) + + resp = client.post( + "/api/v1/traces/stream/replay-session", + params={ + "session_id": "sess-some-replay", + "machine_id": "unknown-machine-replay", + }, + headers=auth_headers, + ) + assert resp.status_code == 403 + assert resp.json().get("detail", {}).get("error") == "agent_unknown" diff --git a/tests/unit/test_safety_checks_provider.py b/tests/unit/test_safety_checks_provider.py index e87a57274..3905851e8 100644 --- a/tests/unit/test_safety_checks_provider.py +++ b/tests/unit/test_safety_checks_provider.py @@ -26,6 +26,15 @@ def test_only_declarative_when_no_safety_level(): assert payload.checks[0]["source"] == "declarative" +def test_default_pause_message_is_structured_not_validation_required(): + """Fallback humain: jamais 'Validation requise' seul.""" + payload = build_pause_payload({"type": "pause_for_human", "parameters": {}}, {}, last_screenshot=None) + lines = payload.message.splitlines() + assert len(lines) == 4 + assert lines[0].startswith("J'essaie de :") + assert "Validation requise" not in payload.message + + def test_hybrid_appends_llm_checks_on_medical_critical(monkeypatch): """safety_level=medical_critical → LLM appelé, checks concaténés.""" decl = [{"id": "c1", "label": "Vérifier IPP", "required": True}]