#!/usr/bin/env python3
"""
RPA Vision V3 - ConfirmationLoop

Confirmation system run before critical actions.

This module handles user validation for sensitive actions:
- Workflow execution
- Destructive or irreversible actions
- Actions on sensitive data

Author: Dom - January 2026
"""

import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Any, List, Optional, Callable
from datetime import datetime, timedelta

logger = logging.getLogger(__name__)


class RiskLevel(Enum):
    """Risk levels of actions."""

    LOW = "low"  # No confirmation required
    MEDIUM = "medium"  # Simple confirmation
    HIGH = "high"  # Detailed confirmation
    # Double confirmation (design intent; this module itself only shows a
    # stronger prompt, it does not enforce a second confirmation step).
    CRITICAL = "critical"


class ConfirmationStatus(Enum):
    """Lifecycle statuses of a confirmation request."""

    PENDING = "pending"
    CONFIRMED = "confirmed"
    DENIED = "denied"
    EXPIRED = "expired"
    MODIFIED = "modified"  # Confirmed with modified parameters


@dataclass
class PendingConfirmation:
    """A confirmation request waiting for (or carrying) the user's answer."""

    id: str
    action_type: str
    workflow_name: str
    parameters: Dict[str, Any]
    risk_level: RiskLevel
    created_at: datetime
    expires_at: datetime
    status: ConfirmationStatus = ConfirmationStatus.PENDING
    confirmation_message: str = ""
    details: List[str] = field(default_factory=list)
    modified_parameters: Optional[Dict[str, Any]] = None

    def is_expired(self) -> bool:
        """Return True once the current local time is past ``expires_at``."""
        return datetime.now() > self.expires_at

    def to_dict(self) -> Dict[str, Any]:
        """
        Serialize the confirmation to a plain dictionary.

        Enum members are exported by value, and the remaining lifetime is
        exported as ``expires_in_seconds`` (clamped at 0). ``created_at`` and
        ``expires_at`` themselves are not included.
        """
        remaining = (self.expires_at - datetime.now()).total_seconds()
        return {
            "id": self.id,
            "action_type": self.action_type,
            "workflow_name": self.workflow_name,
            "parameters": self.parameters,
            "risk_level": self.risk_level.value,
            "status": self.status.value,
            "confirmation_message": self.confirmation_message,
            "details": self.details,
            "expires_in_seconds": max(0, remaining),
            "modified_parameters": self.modified_parameters,
        }


class ConfirmationLoop:
    """
    Confirmation manager for the conversational agent.

    Handles the confirmation flow for sensitive actions:
    1. Risk analysis of the action
    2. Generation of the confirmation message
    3. Waiting for the user's answer
    4. Validation and execution
    """

    # Configuration per risk level
    RISK_CONFIG: Dict[RiskLevel, Dict[str, Any]] = {
        RiskLevel.LOW: {
            "requires_confirmation": False,
            "timeout_seconds": 0,
            "message_template": "Exécution de {workflow_name}...",
        },
        RiskLevel.MEDIUM: {
            "requires_confirmation": True,
            "timeout_seconds": 60,
            "message_template": "Voulez-vous exécuter '{workflow_name}' ?",
        },
        RiskLevel.HIGH: {
            "requires_confirmation": True,
            "timeout_seconds": 120,
            "message_template": (
                "⚠️ Action importante : '{workflow_name}'\n\n"
                "Paramètres :\n{params_list}\n\n"
                "Confirmez-vous cette action ?"
            ),
        },
        RiskLevel.CRITICAL: {
            "requires_confirmation": True,
            "timeout_seconds": 180,
            "message_template": (
                "🚨 ACTION CRITIQUE : '{workflow_name}'\n\n"
                "{details}\n\n"
                "Cette action est irréversible. "
                "Tapez 'confirmer' pour continuer."
            ),
        },
    }

    # Keywords associated with each risk level (matched as substrings)
    RISK_KEYWORDS: Dict[RiskLevel, List[str]] = {
        RiskLevel.CRITICAL: [
            "supprimer", "delete", "effacer", "purger", "reset", "formater",
            "production", "live", "client", "facturation", "paiement",
        ],
        RiskLevel.HIGH: [
            "modifier", "update", "éditer", "changer", "migrer", "transfert",
            "export", "envoyer", "publier", "synchroniser",
        ],
        RiskLevel.MEDIUM: [
            "créer", "create", "générer", "ajouter", "nouveau", "copier",
        ],
    }

    def __init__(self, default_timeout: int = 60):
        """
        Initialize the confirmation manager.

        Args:
            default_timeout: Default timeout in seconds, used when the risk
                level's configured timeout is 0.
        """
        self.default_timeout = default_timeout
        self.pending_confirmations: Dict[str, PendingConfirmation] = {}
        self._confirmation_counter = 0

    def evaluate_risk(
        self,
        workflow_name: str,
        parameters: Dict[str, Any],
        action_type: str = "execute",
    ) -> RiskLevel:
        """
        Evaluate the risk level of an action.

        The workflow name, action type and the string form of the parameters
        are lower-cased and scanned for the substrings in ``RISK_KEYWORDS``.

        Args:
            workflow_name: Name of the workflow
            parameters: Parameters of the action
            action_type: Type of action

        Returns:
            The most severe RiskLevel with a matching keyword, or
            ``RiskLevel.LOW`` when nothing matches.
        """
        text_to_analyze = f"{workflow_name} {action_type} {parameters}".lower()

        # Check keywords from most to least critical so the worst match wins
        for risk_level in (RiskLevel.CRITICAL, RiskLevel.HIGH, RiskLevel.MEDIUM):
            for keyword in self.RISK_KEYWORDS.get(risk_level, []):
                if keyword in text_to_analyze:
                    logger.debug(
                        "Risk %s detected by keyword: %s",
                        risk_level.value,
                        keyword,
                    )
                    return risk_level

        return RiskLevel.LOW

    def create_confirmation_request(
        self,
        workflow_name: str,
        parameters: Dict[str, Any],
        action_type: str = "execute",
        risk_level: Optional[RiskLevel] = None,
        custom_message: Optional[str] = None,
    ) -> PendingConfirmation:
        """
        Create a confirmation request.

        The request is stored in ``pending_confirmations`` only when the risk
        level requires a confirmation; it is returned in every case.

        Args:
            workflow_name: Name of the workflow
            parameters: Parameters of the action
            action_type: Type of action
            risk_level: Risk level (auto-detected when not provided)
            custom_message: Custom message replacing the generated one

        Returns:
            The created PendingConfirmation
        """
        # Auto-detect the risk when not provided
        if risk_level is None:
            risk_level = self.evaluate_risk(workflow_name, parameters, action_type)

        # One timestamp for the id, created_at and expires_at, so that
        # expires_at - created_at is exactly the timeout.
        now = datetime.now()

        # Generate an id; the counter keeps it unique within this instance
        self._confirmation_counter += 1
        confirmation_id = (
            f"conf_{self._confirmation_counter}_{now.strftime('%H%M%S')}"
        )

        # Compute the timeout (a configured 0 falls back to the default)
        config = self.RISK_CONFIG[risk_level]
        timeout = config["timeout_seconds"] or self.default_timeout
        expires_at = now + timedelta(seconds=timeout)

        # Generate the confirmation message
        if custom_message:
            message = custom_message
        else:
            message = self._generate_confirmation_message(
                workflow_name, parameters, risk_level
            )

        # Generate the details
        details = self._generate_details(workflow_name, parameters, risk_level)

        # Create the confirmation
        confirmation = PendingConfirmation(
            id=confirmation_id,
            action_type=action_type,
            workflow_name=workflow_name,
            parameters=parameters,
            risk_level=risk_level,
            created_at=now,
            expires_at=expires_at,
            confirmation_message=message,
            details=details,
        )

        # Store only when a confirmation is required
        if config["requires_confirmation"]:
            self.pending_confirmations[confirmation_id] = confirmation
            logger.info(
                "Confirmation request created: %s (%s)",
                confirmation_id,
                risk_level.value,
            )

        return confirmation

    def _generate_confirmation_message(
        self,
        workflow_name: str,
        parameters: Dict[str, Any],
        risk_level: RiskLevel,
    ) -> str:
        """Build the confirmation message from the risk level's template."""
        template = self.RISK_CONFIG[risk_level]["message_template"]

        # Format the parameter list
        params_list = "\n".join(f" • {k}: {v}" for k, v in parameters.items())
        if not params_list:
            params_list = " (aucun paramètre)"

        # Extra wording is only produced for critical actions
        details = ""
        if risk_level == RiskLevel.CRITICAL:
            details = "Cette action peut affecter des données importantes."

        # Placeholders a template does not use are simply ignored by format()
        return template.format(
            workflow_name=workflow_name,
            params_list=params_list,
            details=details,
        )

    def _generate_details(
        self,
        workflow_name: str,
        parameters: Dict[str, Any],
        risk_level: RiskLevel,
    ) -> List[str]:
        """Build the list of detail lines describing the action."""
        # Basic details: the workflow, then one line per parameter
        details = [f"Workflow: {workflow_name}"]
        details.extend(f" {key}: {value}" for key, value in parameters.items())

        # Warnings depending on the risk
        if risk_level == RiskLevel.CRITICAL:
            details.append("⚠️ Cette action est irréversible")
        elif risk_level == RiskLevel.HIGH:
            details.append("⚠️ Cette action modifiera des données")

        return details

    def confirm(
        self,
        confirmation_id: str,
        modified_parameters: Optional[Dict[str, Any]] = None,
    ) -> Optional[PendingConfirmation]:
        """
        Confirm a pending action.

        Only a confirmation whose status is still PENDING can be confirmed.
        One that was already denied, expired or confirmed is returned
        unchanged, so a refusal can never be turned into an approval.

        Args:
            confirmation_id: Id of the confirmation
            modified_parameters: Modified parameters (optional)

        Returns:
            The confirmation (updated when it was pending), or None when the
            id is unknown. Callers must check ``status`` on the result.
        """
        confirmation = self.pending_confirmations.get(confirmation_id)

        if not confirmation:
            logger.warning("Confirmation not found: %s", confirmation_id)
            return None

        # A settled request must not be re-opened by a late confirm
        if confirmation.status != ConfirmationStatus.PENDING:
            logger.warning(
                "Confirmation %s is not pending (status: %s); confirm ignored",
                confirmation_id,
                confirmation.status.value,
            )
            return confirmation

        if confirmation.is_expired():
            confirmation.status = ConfirmationStatus.EXPIRED
            logger.info("Confirmation expired: %s", confirmation_id)
            return confirmation

        if modified_parameters:
            confirmation.status = ConfirmationStatus.MODIFIED
            confirmation.modified_parameters = modified_parameters
        else:
            confirmation.status = ConfirmationStatus.CONFIRMED

        logger.info("Confirmation confirmed: %s", confirmation_id)
        return confirmation

    def deny(self, confirmation_id: str) -> Optional[PendingConfirmation]:
        """
        Deny a stored action.

        The status is set to DENIED whatever the current status is.

        Args:
            confirmation_id: Id of the confirmation

        Returns:
            The updated confirmation, or None when the id is unknown
        """
        confirmation = self.pending_confirmations.get(confirmation_id)

        if not confirmation:
            logger.warning("Confirmation not found: %s", confirmation_id)
            return None

        confirmation.status = ConfirmationStatus.DENIED
        logger.info("Confirmation denied: %s", confirmation_id)
        return confirmation

    def get_pending(
        self, confirmation_id: Optional[str] = None
    ) -> Optional[PendingConfirmation]:
        """
        Get a specific confirmation, or the latest one still pending.

        Args:
            confirmation_id: Specific id (optional). When given, the stored
                confirmation is returned whatever its status.

        Returns:
            The confirmation, or None
        """
        if confirmation_id:
            return self.pending_confirmations.get(confirmation_id)

        # Walk from the most recently inserted entry backwards and return the
        # first one that is still pending and not expired.
        for conf in reversed(list(self.pending_confirmations.values())):
            if conf.status == ConfirmationStatus.PENDING and not conf.is_expired():
                return conf

        return None

    def cleanup_expired(self) -> int:
        """
        Remove confirmations that no longer need an answer.

        Both expired entries and entries whose status is no longer PENDING
        (confirmed, modified, denied, expired) are removed.

        Returns:
            Number of confirmations removed
        """
        expired_ids = [
            conf_id
            for conf_id, conf in self.pending_confirmations.items()
            if conf.is_expired() or conf.status != ConfirmationStatus.PENDING
        ]

        for conf_id in expired_ids:
            del self.pending_confirmations[conf_id]

        if expired_ids:
            logger.info("Cleaned up %d confirmations", len(expired_ids))

        return len(expired_ids)

    def requires_confirmation(self, risk_level: RiskLevel) -> bool:
        """Check whether a risk level requires a confirmation."""
        return self.RISK_CONFIG[risk_level]["requires_confirmation"]


# Singleton for global use
_confirmation_loop: Optional[ConfirmationLoop] = None


def get_confirmation_loop() -> ConfirmationLoop:
    """Return the global confirmation manager, creating it on first use."""
    global _confirmation_loop
    if _confirmation_loop is None:
        _confirmation_loop = ConfirmationLoop()
    return _confirmation_loop


if __name__ == "__main__":
    # Quick manual checks
    loop = ConfirmationLoop()

    test_cases = [
        ("facturation_client", {"client": "Acme"}, "execute"),
        ("supprimer_donnees", {"table": "clients"}, "execute"),
        ("export_rapport", {"format": "pdf"}, "execute"),
        ("mise_a_jour_production", {"env": "live"}, "update"),
    ]

    print("=== Tests ConfirmationLoop ===\n")

    for workflow, params, action in test_cases:
        risk = loop.evaluate_risk(workflow, params, action)
        conf = loop.create_confirmation_request(workflow, params, action)

        print(f"Workflow: {workflow}")
        print(f" Risk: {risk.value}")
        print(f" Requires confirmation: {loop.requires_confirmation(risk)}")
        print(f" Message: {conf.confirmation_message[:100]}...")
        print()