#!/usr/bin/env python3 """ RPA Vision V3 - ConversationManager Gestionnaire de contexte multi-tour pour l'agent conversationnel. Ce module gère : - L'historique des échanges - Le contexte courant (workflow en cours, paramètres accumulés) - La résolution des références anaphoriques ("le", "ça", "celui-ci") - Les sessions utilisateur Auteur: Dom - Janvier 2026 """ import json import logging from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Dict, Any, List, Optional, Tuple from pathlib import Path import hashlib from .intent_parser import ParsedIntent, IntentType from .confirmation import PendingConfirmation logger = logging.getLogger(__name__) @dataclass class ConversationTurn: """Un tour de conversation.""" turn_id: int timestamp: datetime user_message: str intent: ParsedIntent response: str action_taken: Optional[str] = None result: Optional[Dict[str, Any]] = None def to_dict(self) -> Dict[str, Any]: return { "turn_id": self.turn_id, "timestamp": self.timestamp.isoformat(), "user_message": self.user_message, "intent": self.intent.to_dict(), "response": self.response, "action_taken": self.action_taken, "result": self.result } @dataclass class ConversationContext: """Contexte courant de la conversation.""" # Workflow en cours de discussion current_workflow: Optional[str] = None current_workflow_params: Dict[str, Any] = field(default_factory=dict) # Confirmation en attente pending_confirmation: Optional[PendingConfirmation] = None # Entités mentionnées (pour références anaphoriques) mentioned_entities: Dict[str, Any] = field(default_factory=dict) # Dernier résultat d'exécution last_execution_result: Optional[Dict[str, Any]] = None # État de la conversation awaiting_clarification: bool = False clarification_for: Optional[str] = None def to_dict(self) -> Dict[str, Any]: return { "current_workflow": self.current_workflow, "current_workflow_params": self.current_workflow_params, "pending_confirmation": self.pending_confirmation.to_dict() if self.pending_confirmation else None, "mentioned_entities": self.mentioned_entities, "last_execution_result": self.last_execution_result, "awaiting_clarification": self.awaiting_clarification, "clarification_for": self.clarification_for } @dataclass class ConversationSession: """Session de conversation utilisateur.""" session_id: str user_id: Optional[str] created_at: datetime last_activity: datetime turns: List[ConversationTurn] = field(default_factory=list) context: ConversationContext = field(default_factory=ConversationContext) def to_dict(self) -> Dict[str, Any]: return { "session_id": self.session_id, "user_id": self.user_id, "created_at": self.created_at.isoformat(), "last_activity": self.last_activity.isoformat(), "turns_count": len(self.turns), "context": self.context.to_dict() } class ConversationManager: """ Gestionnaire de conversations multi-tour. Maintient le contexte à travers plusieurs échanges : - Historique des tours - Entités mentionnées - Workflow et paramètres en cours - Résolution des références """ # Mots de référence anaphorique ANAPHORIC_REFERENCES = { "le": "workflow", "la": "workflow", "celui-ci": "workflow", "celle-ci": "workflow", "ça": "last_action", "cela": "last_action", "lui": "entity", "elle": "entity", "ce client": "client", "cette facture": "invoice", "le même": "last_value", "pareil": "last_value", "idem": "last_value" } def __init__( self, session_timeout_minutes: int = 30, max_history_turns: int = 50, persistence_path: Optional[Path] = None ): """ Initialiser le gestionnaire de conversations. Args: session_timeout_minutes: Timeout d'inactivité max_history_turns: Nombre max de tours à garder persistence_path: Chemin pour persister les sessions """ self.session_timeout = timedelta(minutes=session_timeout_minutes) self.max_history_turns = max_history_turns self.persistence_path = persistence_path self.sessions: Dict[str, ConversationSession] = {} if persistence_path: self._load_sessions() def get_or_create_session( self, session_id: Optional[str] = None, user_id: Optional[str] = None ) -> ConversationSession: """ Obtenir ou créer une session de conversation. Args: session_id: ID de session (généré si non fourni) user_id: ID utilisateur (optionnel) Returns: ConversationSession active """ # Générer un ID si non fourni if not session_id: session_id = self._generate_session_id(user_id) # Vérifier si la session existe et n'est pas expirée if session_id in self.sessions: session = self.sessions[session_id] if datetime.now() - session.last_activity < self.session_timeout: session.last_activity = datetime.now() return session else: # Session expirée, créer une nouvelle logger.info(f"Session expired: {session_id}") del self.sessions[session_id] # Créer une nouvelle session session = ConversationSession( session_id=session_id, user_id=user_id, created_at=datetime.now(), last_activity=datetime.now() ) self.sessions[session_id] = session logger.info(f"New session created: {session_id}") return session def add_turn( self, session: ConversationSession, user_message: str, intent: ParsedIntent, response: str, action_taken: Optional[str] = None, result: Optional[Dict[str, Any]] = None ) -> ConversationTurn: """ Ajouter un tour à la conversation. Args: session: Session de conversation user_message: Message utilisateur intent: Intention parsée response: Réponse générée action_taken: Action effectuée result: Résultat de l'action Returns: ConversationTurn créé """ turn = ConversationTurn( turn_id=len(session.turns) + 1, timestamp=datetime.now(), user_message=user_message, intent=intent, response=response, action_taken=action_taken, result=result ) session.turns.append(turn) session.last_activity = datetime.now() # Mettre à jour le contexte self._update_context(session, intent, result) # Limiter l'historique if len(session.turns) > self.max_history_turns: session.turns = session.turns[-self.max_history_turns:] # Persister si configuré if self.persistence_path: self._save_session(session) return turn def resolve_references( self, session: ConversationSession, intent: ParsedIntent ) -> ParsedIntent: """ Résoudre les références anaphoriques dans l'intention. Args: session: Session de conversation intent: Intention à résoudre Returns: ParsedIntent avec références résolues """ resolved_intent = intent context = session.context # Vérifier si le message contient des références message_lower = intent.raw_query.lower() for reference, ref_type in self.ANAPHORIC_REFERENCES.items(): if reference in message_lower: if ref_type == "workflow" and context.current_workflow: # Référence au workflow courant if not resolved_intent.workflow_hint: resolved_intent.workflow_hint = context.current_workflow logger.debug(f"Resolved '{reference}' to workflow: {context.current_workflow}") elif ref_type == "last_action" and context.last_execution_result: # Référence à la dernière action resolved_intent.parameters.update( context.last_execution_result.get("params", {}) ) elif ref_type == "entity" and context.mentioned_entities: # Référence à une entité mentionnée for entity_type, entity_value in context.mentioned_entities.items(): if entity_type not in resolved_intent.parameters: resolved_intent.parameters[entity_type] = entity_value elif ref_type in context.mentioned_entities: # Référence à un type d'entité spécifique if ref_type not in resolved_intent.parameters: resolved_intent.parameters[ref_type] = context.mentioned_entities[ref_type] elif ref_type == "last_value" and context.current_workflow_params: # Répéter les derniers paramètres for key, value in context.current_workflow_params.items(): if key not in resolved_intent.parameters: resolved_intent.parameters[key] = value return resolved_intent def _update_context( self, session: ConversationSession, intent: ParsedIntent, result: Optional[Dict[str, Any]] ): """Mettre à jour le contexte après un tour.""" context = session.context # Mettre à jour le workflow courant if intent.intent_type == IntentType.EXECUTE: if intent.workflow_hint: context.current_workflow = intent.workflow_hint if intent.parameters: context.current_workflow_params.update(intent.parameters) # Mettre à jour les entités mentionnées for entity in intent.entities: entity_type = entity["type"] entity_value = entity["value"] context.mentioned_entities[entity_type] = entity_value # Mettre à jour le dernier résultat if result and result.get("success"): context.last_execution_result = result # Gérer les clarifications if intent.clarification_needed: context.awaiting_clarification = True context.clarification_for = intent.workflow_hint elif intent.intent_type in [IntentType.CONFIRM, IntentType.DENY]: context.awaiting_clarification = False context.clarification_for = None def get_context_summary(self, session: ConversationSession) -> Dict[str, Any]: """ Obtenir un résumé du contexte pour les prompts LLM. Args: session: Session de conversation Returns: Dict résumant le contexte """ context = session.context # Résumé des derniers tours recent_turns = [] for turn in session.turns[-5:]: recent_turns.append({ "user": turn.user_message, "intent": turn.intent.intent_type.value, "action": turn.action_taken }) return { "current_workflow": context.current_workflow, "current_params": context.current_workflow_params, "mentioned_entities": context.mentioned_entities, "awaiting_clarification": context.awaiting_clarification, "recent_turns": recent_turns } def set_pending_confirmation( self, session: ConversationSession, confirmation: PendingConfirmation ): """Définir une confirmation en attente.""" session.context.pending_confirmation = confirmation def get_pending_confirmation( self, session: ConversationSession ) -> Optional[PendingConfirmation]: """Obtenir la confirmation en attente.""" return session.context.pending_confirmation def clear_pending_confirmation(self, session: ConversationSession): """Effacer la confirmation en attente.""" session.context.pending_confirmation = None def cleanup_expired_sessions(self) -> int: """ Nettoyer les sessions expirées. Returns: Nombre de sessions nettoyées """ now = datetime.now() expired = [ sid for sid, session in self.sessions.items() if now - session.last_activity > self.session_timeout ] for sid in expired: del self.sessions[sid] logger.info(f"Session cleaned up: {sid}") return len(expired) def _generate_session_id(self, user_id: Optional[str]) -> str: """Générer un ID de session unique.""" base = f"{user_id or 'anon'}_{datetime.now().isoformat()}" return hashlib.md5(base.encode()).hexdigest()[:16] def _load_sessions(self): """Charger les sessions depuis le stockage.""" if not self.persistence_path: return sessions_file = self.persistence_path / "sessions.json" if sessions_file.exists(): try: with open(sessions_file, 'r') as f: data = json.load(f) # TODO: Reconstruire les sessions depuis JSON logger.info(f"Loaded {len(data)} sessions") except Exception as e: logger.warning(f"Failed to load sessions: {e}") def _save_session(self, session: ConversationSession): """Sauvegarder une session.""" if not self.persistence_path: return try: self.persistence_path.mkdir(parents=True, exist_ok=True) session_file = self.persistence_path / f"session_{session.session_id}.json" with open(session_file, 'w') as f: json.dump(session.to_dict(), f, indent=2, default=str) except Exception as e: logger.warning(f"Failed to save session: {e}") # Singleton pour utilisation globale _conversation_manager: Optional[ConversationManager] = None def get_conversation_manager( persistence_path: Optional[Path] = None ) -> ConversationManager: """Obtenir l'instance globale du gestionnaire de conversations.""" global _conversation_manager if _conversation_manager is None: _conversation_manager = ConversationManager(persistence_path=persistence_path) return _conversation_manager if __name__ == "__main__": # Tests rapides from .intent_parser import IntentParser parser = IntentParser() manager = ConversationManager() # Simuler une conversation session = manager.get_or_create_session(user_id="test_user") test_conversation = [ "facturer le client Acme", "oui", "et pour le client Beta ?", # Référence implicite "le même format", # Référence anaphorique "statut", ] print("=== Tests ConversationManager ===\n") for message in test_conversation: intent = parser.parse(message) # Résoudre les références resolved = manager.resolve_references(session, intent) print(f"User: {message}") print(f"Intent: {resolved.intent_type.value}") print(f"Workflow: {resolved.workflow_hint}") print(f"Params: {resolved.parameters}") # Ajouter le tour manager.add_turn( session=session, user_message=message, intent=resolved, response="[Réponse simulée]", action_taken=resolved.intent_type.value ) print(f"Context workflow: {session.context.current_workflow}") print()