# agent_v0/server_v1/safety_checks_provider.py
"""SafetyChecksProvider — hybrid declarative + contextual LLM checks (QW4).

For a ``pause_for_human`` action:
  - the declarative checks (declared in the workflow) are always included;
  - if ``safety_level == "medical_critical"`` and the environment variable
    ``RPA_SAFETY_CHECKS_LLM_ENABLED`` is not set to a "disabled" value
    (``0``, ``false``, ``no``, ``off`` or empty — it is enabled when unset),
    an LLM call (``medgemma:4b`` by default) adds up to N contextual checks.

Any failure on the LLM side (timeout, exception, parse error) results in no
additional checks: the replay continues with the declarative checks only
(safe fallback).
"""

import base64
import io
import json
import logging
import os
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Truncation limits applied to the free-text fields returned by the LLM.
_MAX_LABEL_CHARS = 200
_MAX_EVIDENCE_CHARS = 300


@dataclass
class PausePayload:
    """Data attached to a human-validation pause.

    ``checks`` is a list of dicts with the keys ``id``, ``label``,
    ``required``, ``source`` and ``evidence``.
    """

    checks: List[Dict[str, Any]] = field(default_factory=list)
    pause_reason: str = ""
    message: str = ""


def _env(name: str, default: str) -> str:
    """Return the environment variable *name* (or *default*), stripped."""
    return os.environ.get(name, default).strip()


def _env_int(name: str, default: int) -> int:
    """Return the environment variable *name* as an int, or *default* if unset/invalid."""
    try:
        return int(os.environ.get(name, default))
    except (TypeError, ValueError):
        return default


def _env_bool_enabled(name: str) -> bool:
    """Return True unless *name* is set to ``0``/``false``/``no``/``off``/empty.

    An unset variable counts as enabled (the lookup defaults to ``"1"``).
    """
    val = os.environ.get(name, "1").strip().lower()
    return val not in ("0", "false", "no", "off", "")


def _normalize_declarative_checks(declarative: Any) -> List[Dict[str, Any]]:
    """Turn the workflow's ``safety_checks`` value into normalized check dicts.

    Accepts a list/tuple of dicts, and tolerates a single dict or string as
    well as plain-string entries (used as the label). Entries of any other
    type are skipped with a warning instead of aborting the whole payload.
    """
    if isinstance(declarative, (str, dict)):
        # A lone entry: wrap it so a string is not iterated char by char.
        declarative = [declarative]
    elif not isinstance(declarative, (list, tuple)):
        logger.warning(
            "safety_checks: safety_checks ignoré (type %s)",
            type(declarative).__name__,
        )
        return []

    checks: List[Dict[str, Any]] = []
    for entry in declarative:
        if isinstance(entry, str):
            entry = {"label": entry}
        if not isinstance(entry, dict):
            logger.warning(
                "safety_checks: entrée déclarative ignorée (type %s)",
                type(entry).__name__,
            )
            continue
        checks.append({
            "id": entry.get("id") or f"decl_{uuid.uuid4().hex[:6]}",
            # `or` (not a .get default) so None/"" also fall back to a label.
            "label": entry.get("label") or "Validation",
            "required": bool(entry.get("required", True)),
            "source": "declarative",
            "evidence": None,
        })
    return checks


def build_pause_payload(
    action: Dict[str, Any],
    replay_state: Dict[str, Any],
    last_screenshot: Optional[str],
) -> PausePayload:
    """Build the enriched pause payload for a ``pause_for_human`` action.

    Args:
        action: The action dict; its ``parameters`` entry may provide
            ``message``, ``safety_level`` and ``safety_checks``.
        replay_state: Forwarded unchanged to the LLM helper.
        last_screenshot: Path of the latest screenshot, or None.

    Returns:
        A PausePayload holding the normalized declarative checks, followed by
        any LLM-generated contextual checks (always ``required=False``).
        Never raises because of the LLM step: any exception there is logged
        and the payload contains the declarative checks only.
    """
    params = action.get("parameters") or {}
    message = params.get("message", "Validation requise")
    safety_level = params.get("safety_level")

    checks = _normalize_declarative_checks(params.get("safety_checks") or [])

    # Contextual LLM checks, only for medical-critical pauses.
    if safety_level == "medical_critical" and _env_bool_enabled("RPA_SAFETY_CHECKS_LLM_ENABLED"):
        try:
            additional = _call_llm_for_contextual_checks(
                action=action,
                replay_state=replay_state,
                last_screenshot=last_screenshot,
                existing_labels=[c["label"] for c in checks],
            )
        except Exception as e:
            logger.warning("safety_checks LLM exception (%s) — fallback safe", e)
            additional = []
        for a in additional:
            checks.append({
                "id": f"llm_{uuid.uuid4().hex[:6]}",
                "label": a.get("label", ""),
                "required": False,  # LLM checks are informational, not mandatory (V1)
                "source": "llm_contextual",
                "evidence": a.get("evidence", ""),
            })

    return PausePayload(
        checks=checks,
        pause_reason="",
        message=message,
    )


def _parse_additional_checks(
    text: Any,
    max_checks: int,
    existing_labels: List[Any],
) -> List[Dict[str, str]]:
    """Extract at most *max_checks* valid ``{label, evidence}`` dicts from *text*.

    *text* is expected to be a JSON object with an ``additional_checks`` list.
    Returns [] when *text* is not a string, is not valid JSON, is not a JSON
    object, or when ``additional_checks`` is not a list. Items without a
    label, and items whose label duplicates an existing label or an earlier
    item (case-insensitive), are dropped.
    """
    if not isinstance(text, str):
        logger.warning("safety_checks LLM réponse vide ou non textuelle — fallback safe")
        return []
    try:
        parsed = json.loads(text.strip())
    except json.JSONDecodeError as e:
        logger.warning("safety_checks LLM JSON invalide (%s) — fallback safe", e)
        return []
    # Valid JSON is not necessarily an object ("[]", "null", "3", ...).
    if not isinstance(parsed, dict):
        logger.warning(
            "safety_checks LLM JSON inattendu (type %s) — fallback safe",
            type(parsed).__name__,
        )
        return []

    additional = parsed.get("additional_checks") or []
    if not isinstance(additional, list):
        return []

    seen = {str(label).strip().casefold() for label in existing_labels}
    valid: List[Dict[str, str]] = []
    for item in additional:
        # Cap on the number of *valid* checks; this also makes a zero or
        # negative max_checks yield nothing.
        if len(valid) >= max_checks:
            break
        if not isinstance(item, dict) or not item.get("label"):
            continue
        label = str(item["label"])[:_MAX_LABEL_CHARS]
        key = label.strip().casefold()
        if key in seen:
            continue
        seen.add(key)
        valid.append({
            "label": label,
            # `or ""` so a JSON null does not become the string "None".
            "evidence": str(item.get("evidence") or "")[:_MAX_EVIDENCE_CHARS],
        })
    return valid


def _call_llm_for_contextual_checks(
    action: Dict[str, Any],
    replay_state: Dict[str, Any],
    last_screenshot: Optional[str],
    existing_labels: List[str],
) -> List[Dict[str, str]]:
    """Call Ollama in JSON mode to generate 0..N contextual checks.

    Configuration is read from the environment:
    ``RPA_SAFETY_CHECKS_LLM_MODEL`` (default ``medgemma:4b``),
    ``RPA_SAFETY_CHECKS_LLM_TIMEOUT_S`` (default 5),
    ``RPA_SAFETY_CHECKS_LLM_MAX_CHECKS`` (default 3) and
    ``OLLAMA_URL`` (default ``http://localhost:11434``).

    Args:
        action: The action dict; ``parameters.message`` is put in the prompt.
        replay_state: Currently unused by this function.
        last_screenshot: Path of a screenshot to attach, if the file exists.
        existing_labels: Labels of checks already requested; listed in the
            prompt and used to drop duplicate suggestions.

    Returns:
        List[{label, evidence}] with at most RPA_SAFETY_CHECKS_LLM_MAX_CHECKS
        entries; [] on any failure (``requests`` unavailable, timeout,
        non-200 status, network error, invalid or unexpected JSON).
    """
    try:
        import requests
    except ImportError as e:
        logger.warning("safety_checks LLM indisponible (%s) — fallback safe", e)
        return []

    model = _env("RPA_SAFETY_CHECKS_LLM_MODEL", "medgemma:4b")
    timeout_s = _env_int("RPA_SAFETY_CHECKS_LLM_TIMEOUT_S", 5)
    max_checks = _env_int("RPA_SAFETY_CHECKS_LLM_MAX_CHECKS", 3)
    # Drop trailing slashes so the endpoint is not built as "//api/generate".
    ollama_url = _env("OLLAMA_URL", "http://localhost:11434").rstrip("/")

    if max_checks <= 0:
        # Nothing can be returned, so do not spend an LLM call.
        return []

    params = action.get("parameters") or {}
    workflow_message = params.get("message", "")
    existing = ", ".join(str(label) for label in existing_labels) if existing_labels else "aucun"

    prompt = f"""Tu es Léa, assistante médicale supervisée. Avant de continuer le workflow,
tu dois lister 0 à {max_checks} vérifications supplémentaires que l'humain doit
acquitter, en regardant l'écran actuel.

Contexte workflow : {workflow_message}
Checks déjà demandés : {existing}

NE répète PAS un check déjà demandé. Si rien d'inhabituel à signaler, retourne
{{"additional_checks": []}}.

Réponds UNIQUEMENT en JSON :
{{
  "additional_checks": [
    {{"label": "string court", "evidence": "ce que tu as vu d'inhabituel"}}
  ]
}}
"""

    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "format": "json",
        "options": {"temperature": 0.1, "num_predict": 200},
    }

    # Attaching the screenshot is best-effort: on failure, call without image.
    if last_screenshot and os.path.isfile(last_screenshot):
        try:
            with open(last_screenshot, "rb") as f:
                payload["images"] = [base64.b64encode(f.read()).decode("ascii")]
        except Exception as e:
            logger.debug("safety_checks: lecture screenshot échouée (%s) — appel sans image", e)

    try:
        response = requests.post(
            f"{ollama_url}/api/generate",
            json=payload,
            timeout=timeout_s,
        )
        if response.status_code != 200:
            logger.warning("safety_checks LLM HTTP %s", response.status_code)
            return []
        body = response.json()
    except requests.Timeout:
        logger.warning("safety_checks LLM timeout (%ss)", timeout_s)
        return []
    except Exception as e:
        logger.warning("safety_checks LLM erreur réseau: %s", e)
        return []

    # format=json normally yields valid JSON, but its shape is still
    # validated by the parser below.
    text = body.get("response") if isinstance(body, dict) else None
    return _parse_additional_checks(text, max_checks, existing_labels)