diff --git a/agent_v0/server_v1/api_stream.py b/agent_v0/server_v1/api_stream.py index 68a14233b..0f5d3a19e 100644 --- a/agent_v0/server_v1/api_stream.py +++ b/agent_v0/server_v1/api_stream.py @@ -2928,8 +2928,54 @@ async def get_next_action(session_id: str, machine_id: str = "default"): type_ = action.get("type") - # pause_for_human : no-op en mode autonome — on saute et on continue + # pause_for_human : pause supervisée si safety_level/safety_checks ou mode supervised, + # sinon no-op en mode autonome (skip). if type_ == "pause_for_human": + _params = action.get("parameters") or {} + _exec_mode = ( + (owning_replay or {}).get("params", {}).get("execution_mode", "autonomous") + if owning_replay else "autonomous" + ) + _has_safety_decl = bool(_params.get("safety_level") or _params.get("safety_checks")) + _is_supervised = _exec_mode != "autonomous" + + if owning_replay is not None and (_has_safety_decl or _is_supervised): + # QW4 — Construire le payload de pause enrichi (déclaratif + LLM contextuel) + try: + from agent_v0.server_v1.safety_checks_provider import build_pause_payload + last_screenshot_path = owning_replay.get("last_screenshot") + payload = build_pause_payload(action, owning_replay, last_screenshot_path) + owning_replay["safety_checks"] = payload.checks + owning_replay["pause_payload"] = { + "checks": payload.checks, + "pause_reason": payload.pause_reason, + "message": payload.message, + } + if payload.message: + owning_replay["pause_message"] = payload.message + # Bus event d'observabilité (pattern QW1/QW2 = logger.info) + logger.info( + "[BUS] lea:safety_checks_generated replay=%s count=%d sources=%s", + owning_replay.get("replay_id", "?"), + len(payload.checks), + [c["source"] for c in payload.checks], + ) + except Exception as e: + logger.warning("QW4 build_pause_payload échec (%s) — pause sans checks", e) + owning_replay["safety_checks"] = [] + + # Conserver le contexte de l'action (audit + reprise) + owning_replay["failed_action"] = { + "action_id": action.get("action_id"), + "type": "pause_for_human", + "reason": "user_request", + } + owning_replay["status"] = "paused_need_help" + queue.pop(0) + _replay_queues[session_id] = queue + return {"action": None, "session_id": session_id, "machine_id": machine_id} + + # Mode autonome sans safety_checks → skip (comportement legacy) logger.info( "pause_for_human ignorée (mode autonome) — replay %s continue", owning_replay["replay_id"] if owning_replay else "?" @@ -4104,8 +4150,16 @@ async def list_replays(): } +class ReplayResumeRequest(BaseModel): + """Body optionnel pour /replay/resume — QW4 acquittement de safety_checks.""" + acknowledged_check_ids: List[str] = [] + + @app.post("/api/v1/traces/stream/replay/{replay_id}/resume") -async def resume_replay(replay_id: str): +async def resume_replay( + replay_id: str, + payload: Optional[ReplayResumeRequest] = None, +): """Reprendre un replay en pause supervisee (paused_need_help). L'utilisateur a intervenu manuellement (naviguer vers le bon ecran, @@ -4113,6 +4167,10 @@ async def resume_replay(replay_id: str): est reinjectee en tete de queue pour etre re-tentee. Si le replay n'est pas en pause, retourne une erreur 409 (conflit). + + QW4 — Si des safety_checks sont attachés à la pause, tous ceux marqués + `required` doivent figurer dans `acknowledged_check_ids`. Sinon → 400 + avec `{"error": "required_checks_missing", "missing": [...]}`. """ with _replay_lock: state = _replay_states.get(replay_id) @@ -4131,6 +4189,25 @@ async def resume_replay(replay_id: str): ), ) + # QW4 — Vérification des safety_checks required avant reprise + safety_checks = state.get("safety_checks") or [] + ack_ids = (payload.acknowledged_check_ids if payload else []) or [] + if safety_checks: + required_ids = {c["id"] for c in safety_checks if c.get("required")} + ack_set = set(ack_ids) + missing = sorted(required_ids - ack_set) + if missing: + raise HTTPException( + status_code=400, + detail={"error": "required_checks_missing", "missing": missing}, + ) + # Audit trail + state["checks_acknowledged"] = sorted(ack_set) + logger.info( + "QW4 resume replay=%s acquittements=%d (%s)", + state.get("replay_id"), len(ack_set), sorted(ack_set), + ) + # Recuperer l'action echouee pour la reinjecter failed_action = state.get("failed_action") session_id = state["session_id"] @@ -4139,6 +4216,10 @@ async def resume_replay(replay_id: str): state["status"] = "running" state["failed_action"] = None state["pause_message"] = None + # QW4 — vider safety_checks après acquittement (la pause est résolue) + state["safety_checks"] = [] + state["pause_payload"] = None + state["pause_reason"] = "" # Reinjecter l'action echouee en tete de queue (sera re-tentee) # pause_for_human est une pause intentionnelle, pas une erreur — ne pas réinjecter diff --git a/agent_v0/server_v1/replay_engine.py b/agent_v0/server_v1/replay_engine.py index 37eb755f6..583ae3078 100644 --- a/agent_v0/server_v1/replay_engine.py +++ b/agent_v0/server_v1/replay_engine.py @@ -1384,6 +1384,11 @@ def _create_replay_state( # QW2 — Anneaux d'historique pour LoopDetector (5 derniers max) "_screenshot_history": [], # images PIL des N derniers heartbeats (LoopDetector embed à chaque tick) "_action_history": [], # N dernières actions exécutées (signature) + # QW4 — Safety checks (hybride déclaratif + LLM contextuel) et audit acquittements + "safety_checks": [], # liste produite par SafetyChecksProvider + "checks_acknowledged": [], # ids acquittés via /replay/resume (audit trail) + "pause_reason": "", # "loop_detected" | "" pour V1 + "pause_payload": None, # payload complet pour debug/audit } diff --git a/tests/integration/test_replay_resume_acknowledgments.py b/tests/integration/test_replay_resume_acknowledgments.py new file mode 100644 index 000000000..dfe93e16b --- /dev/null +++ b/tests/integration/test_replay_resume_acknowledgments.py @@ -0,0 +1,52 @@ +# tests/integration/test_replay_resume_acknowledgments.py +"""Tests intégration : /replay/resume valide les acquittements de safety_checks (QW4).""" +import pytest + + +def test_resume_accepts_when_all_required_acknowledged(): + """État pause + tous required acquittés → reprise OK.""" + state = { + "status": "paused_need_help", + "safety_checks": [ + {"id": "c1", "label": "X", "required": True, "source": "declarative", "evidence": None}, + {"id": "c2", "label": "Y", "required": True, "source": "declarative", "evidence": None}, + ], + "checks_acknowledged": [], + } + # Simuler la validation côté serveur + acknowledged = ["c1", "c2"] + required_ids = {c["id"] for c in state["safety_checks"] if c["required"]} + missing = required_ids - set(acknowledged) + assert missing == set() # rien ne manque → reprise OK + + +def test_resume_rejects_when_required_missing(): + """État pause + un required non acquitté → 400 required_checks_missing.""" + state = { + "status": "paused_need_help", + "safety_checks": [ + {"id": "c1", "label": "X", "required": True, "source": "declarative", "evidence": None}, + {"id": "c2", "label": "Y", "required": False, "source": "llm_contextual", "evidence": "..."}, + ], + "checks_acknowledged": [], + } + acknowledged = ["c2"] # only optional + required_ids = {c["id"] for c in state["safety_checks"] if c["required"]} + missing = required_ids - set(acknowledged) + assert missing == {"c1"} # c1 manquant → resume doit retourner 400 + + +def test_resume_audit_trail_stored(): + """checks_acknowledged contient les ids reçus (audit).""" + state = { + "status": "paused_need_help", + "safety_checks": [ + {"id": "c1", "required": True, "label": "X", "source": "declarative", "evidence": None}, + ], + "checks_acknowledged": [], + } + acknowledged = ["c1"] + state["checks_acknowledged"] = acknowledged + state["status"] = "running" + assert state["checks_acknowledged"] == ["c1"] + assert state["status"] == "running" diff --git a/visual_workflow_builder/backend/api_v3/dag_execute.py b/visual_workflow_builder/backend/api_v3/dag_execute.py index 3feb44d9e..9d73cb204 100644 --- a/visual_workflow_builder/backend/api_v3/dag_execute.py +++ b/visual_workflow_builder/backend/api_v3/dag_execute.py @@ -1113,3 +1113,45 @@ def execute_windows(): return jsonify({'error': 'Streaming server (port 5005) non disponible'}), 503 except Exception as e: return jsonify({'error': str(e)}), 500 + + +# --------------------------------------------------------------------------- +# QW4 — Proxy /api/v3/replay/resume → streaming /replay/{id}/resume +# Forward Bearer token + body { replay_id, acknowledged_check_ids }. +# Le frontend (PauseDialog) appelle /api/v3/replay/resume via le VWB ; +# on relaye au streaming server pour valider les acquittements safety_checks. +# --------------------------------------------------------------------------- +@api_v3_bp.route('/replay/resume', methods=['POST']) +def replay_resume_proxy(): + """Proxy QW4 vers le serveur streaming pour la reprise avec safety_checks.""" + import requests as req + + data = request.get_json() or {} + replay_id = data.get('replay_id') + if not replay_id: + return jsonify({'error': 'replay_id manquant'}), 400 + + streaming_url = os.environ.get('RPA_STREAMING_URL', 'http://localhost:5005') + token = os.environ.get('RPA_API_TOKEN', '') + headers = {'Content-Type': 'application/json'} + if token: + headers['Authorization'] = f'Bearer {token}' + + # Body forwardé : uniquement acknowledged_check_ids (replay_id est dans l'URL) + forward_body = { + 'acknowledged_check_ids': data.get('acknowledged_check_ids') or [], + } + + try: + resp = req.post( + f'{streaming_url}/api/v1/traces/stream/replay/{replay_id}/resume', + json=forward_body, + headers=headers, + timeout=10, + ) + return resp.content, resp.status_code, {'Content-Type': 'application/json'} + except req.ConnectionError: + return jsonify({'error': 'streaming_unreachable', + 'detail': f'Streaming server non disponible ({streaming_url})'}), 502 + except req.RequestException as e: + return jsonify({'error': 'streaming_unreachable', 'detail': str(e)}), 502