Files
rpa_vision_v3/agent_v0/server_v1/safety_checks_provider.py

271 lines
9.2 KiB
Python

# agent_v0/server_v1/safety_checks_provider.py
"""SafetyChecksProvider — checks hybrides déclaratifs + LLM contextuels (QW4).
Pour une action pause_for_human :
- les checks déclaratifs (workflow) sont toujours inclus
- si safety_level == "medical_critical" et RPA_SAFETY_CHECKS_LLM_ENABLED=1,
un appel LLM (medgemma:4b par défaut) ajoute jusqu'à N checks contextuels
Tout échec côté LLM (timeout, exception, parse) → additional_checks=[] :
le replay continue avec uniquement les déclaratifs (fallback safe).
"""
import base64
import json
import logging
import os
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
try:
from agent_v0.agent_v1.ui.message_contract import (
coerce_supervised_pause_message,
warn_visible_message,
)
except Exception: # pragma: no cover - fallback for partial server deployments
coerce_supervised_pause_message = None
warn_visible_message = None
@dataclass
class PausePayload:
checks: List[Dict[str, Any]] = field(default_factory=list)
pause_reason: str = ""
message: str = ""
def _env(name: str, default: str) -> str:
return os.environ.get(name, default).strip()
def _env_int(name: str, default: int) -> int:
try:
return int(os.environ.get(name, default))
except (TypeError, ValueError):
return default
def _env_bool_enabled(name: str) -> bool:
val = os.environ.get(name, "1").strip().lower()
return val not in ("0", "false", "no", "off", "")
def build_pause_payload(
action: Dict[str, Any],
replay_state: Dict[str, Any],
last_screenshot: Optional[str],
) -> PausePayload:
"""Construit le payload de pause enrichi pour une action pause_for_human."""
params = dict(action.get("parameters") or {})
for key in ("message", "safety_level", "safety_checks", "pause_reason"):
if key not in params or params.get(key) in (None, "", []):
if action.get(key) not in (None, "", []):
params[key] = action.get(key)
raw_message = (
params.get("message")
or action.get("message")
or action.get("intention")
or ""
)
message = _coerce_pause_message(
raw_message,
intention=params.get("intention") or action.get("intention") or action.get("description"),
attendu=params.get("attendu") or params.get("expected") or action.get("expected"),
vu=params.get("vu") or params.get("observed") or action.get("observed"),
demande=params.get("demande") or params.get("request"),
)
safety_level = params.get("safety_level")
declarative = params.get("safety_checks") or []
# Normalisation des checks déclaratifs
checks: List[Dict[str, Any]] = []
for d in declarative:
checks.append({
"id": d.get("id") or f"decl_{uuid.uuid4().hex[:6]}",
"label": d.get("label", "Validation"),
"required": bool(d.get("required", True)),
"source": "declarative",
"evidence": None,
})
# Ajout LLM contextual si applicable
if safety_level == "medical_critical" and _env_bool_enabled("RPA_SAFETY_CHECKS_LLM_ENABLED"):
try:
additional = _call_llm_for_contextual_checks(
action=action,
replay_state=replay_state,
last_screenshot=last_screenshot,
existing_labels=[c["label"] for c in checks],
)
except Exception as e:
logger.warning("[BUS] lea:safety_checks_llm_failed reason=exception detail=%s", e)
additional = []
for a in additional:
checks.append({
"id": f"llm_{uuid.uuid4().hex[:6]}",
"label": a.get("label", ""),
"required": False, # checks LLM = informationnels, pas obligatoires V1
"source": "llm_contextual",
"evidence": a.get("evidence", ""),
})
return PausePayload(
checks=checks,
pause_reason=params.get("pause_reason", ""),
message=message,
)
def _coerce_pause_message(
message: Any = "",
*,
intention: Any = "",
attendu: Any = "",
vu: Any = "",
demande: Any = "",
) -> str:
if warn_visible_message is not None:
warn_visible_message(
message,
source="safety_checks_provider._coerce_pause_message.raw",
supervised_pause=False,
)
if coerce_supervised_pause_message is not None:
result = coerce_supervised_pause_message(
message,
intention=intention,
attendu=attendu,
vu=vu,
demande=demande,
)
if warn_visible_message is not None:
warn_visible_message(
result,
source="safety_checks_provider._coerce_pause_message.final",
supervised_pause=True,
)
return result
fallback_request = "indiquer si je peux continuer ou corriger l'action attendue"
result = "\n".join(
(
f"J'essaie de : {intention or 'continuer une etape supervisee'}",
f"J'attendais : {attendu or 'un accord humain clair avant de continuer'}",
f"Je vois : {vu or 'je suis sur une etape qui demande une verification humaine'}",
f"Peux-tu : {demande or message or fallback_request}",
)
)
if warn_visible_message is not None:
warn_visible_message(
result,
source="safety_checks_provider._coerce_pause_message.final_fallback",
supervised_pause=True,
)
return result
def _call_llm_for_contextual_checks(
action: Dict[str, Any],
replay_state: Dict[str, Any],
last_screenshot: Optional[str],
existing_labels: List[str],
) -> List[Dict[str, str]]:
"""Appelle Ollama en mode JSON strict pour générer 0-N checks contextuels.
Returns:
List[{label, evidence}] (max RPA_SAFETY_CHECKS_LLM_MAX_CHECKS).
[] sur tout échec (timeout, JSON invalide, exception).
"""
import requests
# Défaut gemma4:latest : meilleur compromis détection/latence sur bench
# 2026-05-06 (cf. docs/BENCH_SAFETY_CHECKS_2026-05-06.md). medgemma:4b
# retournait systématiquement [] (refus de signaler).
model = _env("RPA_SAFETY_CHECKS_LLM_MODEL", "gemma4:latest")
# Timeout 7s : warm avg gemma4 = 2.9s + marge 4s. Cold start ~10s couvert
# si le modèle reste résident (OLLAMA_KEEP_ALIVE=24h recommandé prod).
timeout_s = _env_int("RPA_SAFETY_CHECKS_LLM_TIMEOUT_S", 7)
max_checks = _env_int("RPA_SAFETY_CHECKS_LLM_MAX_CHECKS", 3)
ollama_url = _env("OLLAMA_URL", "http://localhost:11434")
params = action.get("parameters") or {}
workflow_message = params.get("message", "")
existing = ", ".join(existing_labels) if existing_labels else "aucun"
prompt = f"""Tu es Léa, assistante médicale supervisée.
Avant de continuer le workflow, tu dois lister 0 à {max_checks} vérifications supplémentaires
que l'humain doit acquitter, en regardant l'écran actuel.
Contexte workflow : {workflow_message}
Checks déjà demandés : {existing}
NE répète PAS un check déjà demandé.
Si rien d'inhabituel à signaler, retourne {{"additional_checks": []}}.
Réponds UNIQUEMENT en JSON :
{{
"additional_checks": [
{{"label": "string court", "evidence": "ce que tu as vu d'inhabituel"}}
]
}}
"""
payload = {
"model": model,
"prompt": prompt,
"stream": False,
"format": "json",
"options": {"temperature": 0.1, "num_predict": 200},
}
if last_screenshot and os.path.isfile(last_screenshot):
try:
with open(last_screenshot, "rb") as f:
payload["images"] = [base64.b64encode(f.read()).decode("ascii")]
except Exception as e:
logger.debug("safety_checks: lecture screenshot échouée (%s) — appel sans image", e)
try:
response = requests.post(
f"{ollama_url}/api/generate",
json=payload,
timeout=timeout_s,
)
if response.status_code != 200:
logger.warning("[BUS] lea:safety_checks_llm_failed reason=http_status detail=%s", response.status_code)
return []
text = response.json().get("response", "").strip()
except requests.Timeout:
logger.warning("[BUS] lea:safety_checks_llm_failed reason=timeout detail=%ss", timeout_s)
return []
except Exception as e:
logger.warning("[BUS] lea:safety_checks_llm_failed reason=network detail=%s", e)
return []
# format=json garantit normalement du JSON valide
try:
parsed = json.loads(text)
except json.JSONDecodeError as e:
logger.warning("[BUS] lea:safety_checks_llm_failed reason=json_decode detail=%s", e)
return []
additional = parsed.get("additional_checks") or []
if not isinstance(additional, list):
return []
# Filtre + tronc
valid = []
for item in additional[:max_checks]:
if isinstance(item, dict) and item.get("label"):
valid.append({
"label": str(item["label"])[:200],
"evidence": str(item.get("evidence", ""))[:300],
})
return valid