feat(agent_chat): Ajout des composants conversationnels
Nouveaux composants pour l'agent conversationnel : - IntentParser: Analyse des intentions utilisateur (règles + LLM optionnel) - ConfirmationLoop: Validation avant actions critiques (niveaux de risque) - ResponseGenerator: Génération de réponses en langage naturel - ConversationManager: Gestion du contexte multi-tour Endpoint /api/chat ajouté pour le flux conversationnel complet. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
489
agent_chat/conversation_manager.py
Normal file
489
agent_chat/conversation_manager.py
Normal file
@@ -0,0 +1,489 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
RPA Vision V3 - ConversationManager
|
||||
Gestionnaire de contexte multi-tour pour l'agent conversationnel.
|
||||
|
||||
Ce module gère :
|
||||
- L'historique des échanges
|
||||
- Le contexte courant (workflow en cours, paramètres accumulés)
|
||||
- La résolution des références anaphoriques ("le", "ça", "celui-ci")
|
||||
- Les sessions utilisateur
|
||||
|
||||
Auteur: Dom - Janvier 2026
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, Any, List, Optional, Tuple
|
||||
from pathlib import Path
|
||||
import hashlib
|
||||
|
||||
from .intent_parser import ParsedIntent, IntentType
|
||||
from .confirmation import PendingConfirmation
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConversationTurn:
|
||||
"""Un tour de conversation."""
|
||||
turn_id: int
|
||||
timestamp: datetime
|
||||
user_message: str
|
||||
intent: ParsedIntent
|
||||
response: str
|
||||
action_taken: Optional[str] = None
|
||||
result: Optional[Dict[str, Any]] = None
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"turn_id": self.turn_id,
|
||||
"timestamp": self.timestamp.isoformat(),
|
||||
"user_message": self.user_message,
|
||||
"intent": self.intent.to_dict(),
|
||||
"response": self.response,
|
||||
"action_taken": self.action_taken,
|
||||
"result": self.result
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConversationContext:
|
||||
"""Contexte courant de la conversation."""
|
||||
# Workflow en cours de discussion
|
||||
current_workflow: Optional[str] = None
|
||||
current_workflow_params: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
# Confirmation en attente
|
||||
pending_confirmation: Optional[PendingConfirmation] = None
|
||||
|
||||
# Entités mentionnées (pour références anaphoriques)
|
||||
mentioned_entities: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
# Dernier résultat d'exécution
|
||||
last_execution_result: Optional[Dict[str, Any]] = None
|
||||
|
||||
# État de la conversation
|
||||
awaiting_clarification: bool = False
|
||||
clarification_for: Optional[str] = None
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"current_workflow": self.current_workflow,
|
||||
"current_workflow_params": self.current_workflow_params,
|
||||
"pending_confirmation": self.pending_confirmation.to_dict() if self.pending_confirmation else None,
|
||||
"mentioned_entities": self.mentioned_entities,
|
||||
"last_execution_result": self.last_execution_result,
|
||||
"awaiting_clarification": self.awaiting_clarification,
|
||||
"clarification_for": self.clarification_for
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConversationSession:
|
||||
"""Session de conversation utilisateur."""
|
||||
session_id: str
|
||||
user_id: Optional[str]
|
||||
created_at: datetime
|
||||
last_activity: datetime
|
||||
turns: List[ConversationTurn] = field(default_factory=list)
|
||||
context: ConversationContext = field(default_factory=ConversationContext)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"session_id": self.session_id,
|
||||
"user_id": self.user_id,
|
||||
"created_at": self.created_at.isoformat(),
|
||||
"last_activity": self.last_activity.isoformat(),
|
||||
"turns_count": len(self.turns),
|
||||
"context": self.context.to_dict()
|
||||
}
|
||||
|
||||
|
||||
class ConversationManager:
|
||||
"""
|
||||
Gestionnaire de conversations multi-tour.
|
||||
|
||||
Maintient le contexte à travers plusieurs échanges :
|
||||
- Historique des tours
|
||||
- Entités mentionnées
|
||||
- Workflow et paramètres en cours
|
||||
- Résolution des références
|
||||
"""
|
||||
|
||||
# Mots de référence anaphorique
|
||||
ANAPHORIC_REFERENCES = {
|
||||
"le": "workflow",
|
||||
"la": "workflow",
|
||||
"celui-ci": "workflow",
|
||||
"celle-ci": "workflow",
|
||||
"ça": "last_action",
|
||||
"cela": "last_action",
|
||||
"lui": "entity",
|
||||
"elle": "entity",
|
||||
"ce client": "client",
|
||||
"cette facture": "invoice",
|
||||
"le même": "last_value",
|
||||
"pareil": "last_value",
|
||||
"idem": "last_value"
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
session_timeout_minutes: int = 30,
|
||||
max_history_turns: int = 50,
|
||||
persistence_path: Optional[Path] = None
|
||||
):
|
||||
"""
|
||||
Initialiser le gestionnaire de conversations.
|
||||
|
||||
Args:
|
||||
session_timeout_minutes: Timeout d'inactivité
|
||||
max_history_turns: Nombre max de tours à garder
|
||||
persistence_path: Chemin pour persister les sessions
|
||||
"""
|
||||
self.session_timeout = timedelta(minutes=session_timeout_minutes)
|
||||
self.max_history_turns = max_history_turns
|
||||
self.persistence_path = persistence_path
|
||||
self.sessions: Dict[str, ConversationSession] = {}
|
||||
|
||||
if persistence_path:
|
||||
self._load_sessions()
|
||||
|
||||
def get_or_create_session(
|
||||
self,
|
||||
session_id: Optional[str] = None,
|
||||
user_id: Optional[str] = None
|
||||
) -> ConversationSession:
|
||||
"""
|
||||
Obtenir ou créer une session de conversation.
|
||||
|
||||
Args:
|
||||
session_id: ID de session (généré si non fourni)
|
||||
user_id: ID utilisateur (optionnel)
|
||||
|
||||
Returns:
|
||||
ConversationSession active
|
||||
"""
|
||||
# Générer un ID si non fourni
|
||||
if not session_id:
|
||||
session_id = self._generate_session_id(user_id)
|
||||
|
||||
# Vérifier si la session existe et n'est pas expirée
|
||||
if session_id in self.sessions:
|
||||
session = self.sessions[session_id]
|
||||
if datetime.now() - session.last_activity < self.session_timeout:
|
||||
session.last_activity = datetime.now()
|
||||
return session
|
||||
else:
|
||||
# Session expirée, créer une nouvelle
|
||||
logger.info(f"Session expired: {session_id}")
|
||||
del self.sessions[session_id]
|
||||
|
||||
# Créer une nouvelle session
|
||||
session = ConversationSession(
|
||||
session_id=session_id,
|
||||
user_id=user_id,
|
||||
created_at=datetime.now(),
|
||||
last_activity=datetime.now()
|
||||
)
|
||||
self.sessions[session_id] = session
|
||||
logger.info(f"New session created: {session_id}")
|
||||
|
||||
return session
|
||||
|
||||
def add_turn(
|
||||
self,
|
||||
session: ConversationSession,
|
||||
user_message: str,
|
||||
intent: ParsedIntent,
|
||||
response: str,
|
||||
action_taken: Optional[str] = None,
|
||||
result: Optional[Dict[str, Any]] = None
|
||||
) -> ConversationTurn:
|
||||
"""
|
||||
Ajouter un tour à la conversation.
|
||||
|
||||
Args:
|
||||
session: Session de conversation
|
||||
user_message: Message utilisateur
|
||||
intent: Intention parsée
|
||||
response: Réponse générée
|
||||
action_taken: Action effectuée
|
||||
result: Résultat de l'action
|
||||
|
||||
Returns:
|
||||
ConversationTurn créé
|
||||
"""
|
||||
turn = ConversationTurn(
|
||||
turn_id=len(session.turns) + 1,
|
||||
timestamp=datetime.now(),
|
||||
user_message=user_message,
|
||||
intent=intent,
|
||||
response=response,
|
||||
action_taken=action_taken,
|
||||
result=result
|
||||
)
|
||||
|
||||
session.turns.append(turn)
|
||||
session.last_activity = datetime.now()
|
||||
|
||||
# Mettre à jour le contexte
|
||||
self._update_context(session, intent, result)
|
||||
|
||||
# Limiter l'historique
|
||||
if len(session.turns) > self.max_history_turns:
|
||||
session.turns = session.turns[-self.max_history_turns:]
|
||||
|
||||
# Persister si configuré
|
||||
if self.persistence_path:
|
||||
self._save_session(session)
|
||||
|
||||
return turn
|
||||
|
||||
def resolve_references(
|
||||
self,
|
||||
session: ConversationSession,
|
||||
intent: ParsedIntent
|
||||
) -> ParsedIntent:
|
||||
"""
|
||||
Résoudre les références anaphoriques dans l'intention.
|
||||
|
||||
Args:
|
||||
session: Session de conversation
|
||||
intent: Intention à résoudre
|
||||
|
||||
Returns:
|
||||
ParsedIntent avec références résolues
|
||||
"""
|
||||
resolved_intent = intent
|
||||
context = session.context
|
||||
|
||||
# Vérifier si le message contient des références
|
||||
message_lower = intent.raw_query.lower()
|
||||
|
||||
for reference, ref_type in self.ANAPHORIC_REFERENCES.items():
|
||||
if reference in message_lower:
|
||||
if ref_type == "workflow" and context.current_workflow:
|
||||
# Référence au workflow courant
|
||||
if not resolved_intent.workflow_hint:
|
||||
resolved_intent.workflow_hint = context.current_workflow
|
||||
logger.debug(f"Resolved '{reference}' to workflow: {context.current_workflow}")
|
||||
|
||||
elif ref_type == "last_action" and context.last_execution_result:
|
||||
# Référence à la dernière action
|
||||
resolved_intent.parameters.update(
|
||||
context.last_execution_result.get("params", {})
|
||||
)
|
||||
|
||||
elif ref_type == "entity" and context.mentioned_entities:
|
||||
# Référence à une entité mentionnée
|
||||
for entity_type, entity_value in context.mentioned_entities.items():
|
||||
if entity_type not in resolved_intent.parameters:
|
||||
resolved_intent.parameters[entity_type] = entity_value
|
||||
|
||||
elif ref_type in context.mentioned_entities:
|
||||
# Référence à un type d'entité spécifique
|
||||
if ref_type not in resolved_intent.parameters:
|
||||
resolved_intent.parameters[ref_type] = context.mentioned_entities[ref_type]
|
||||
|
||||
elif ref_type == "last_value" and context.current_workflow_params:
|
||||
# Répéter les derniers paramètres
|
||||
for key, value in context.current_workflow_params.items():
|
||||
if key not in resolved_intent.parameters:
|
||||
resolved_intent.parameters[key] = value
|
||||
|
||||
return resolved_intent
|
||||
|
||||
def _update_context(
|
||||
self,
|
||||
session: ConversationSession,
|
||||
intent: ParsedIntent,
|
||||
result: Optional[Dict[str, Any]]
|
||||
):
|
||||
"""Mettre à jour le contexte après un tour."""
|
||||
context = session.context
|
||||
|
||||
# Mettre à jour le workflow courant
|
||||
if intent.intent_type == IntentType.EXECUTE:
|
||||
if intent.workflow_hint:
|
||||
context.current_workflow = intent.workflow_hint
|
||||
if intent.parameters:
|
||||
context.current_workflow_params.update(intent.parameters)
|
||||
|
||||
# Mettre à jour les entités mentionnées
|
||||
for entity in intent.entities:
|
||||
entity_type = entity["type"]
|
||||
entity_value = entity["value"]
|
||||
context.mentioned_entities[entity_type] = entity_value
|
||||
|
||||
# Mettre à jour le dernier résultat
|
||||
if result and result.get("success"):
|
||||
context.last_execution_result = result
|
||||
|
||||
# Gérer les clarifications
|
||||
if intent.clarification_needed:
|
||||
context.awaiting_clarification = True
|
||||
context.clarification_for = intent.workflow_hint
|
||||
elif intent.intent_type in [IntentType.CONFIRM, IntentType.DENY]:
|
||||
context.awaiting_clarification = False
|
||||
context.clarification_for = None
|
||||
|
||||
def get_context_summary(self, session: ConversationSession) -> Dict[str, Any]:
|
||||
"""
|
||||
Obtenir un résumé du contexte pour les prompts LLM.
|
||||
|
||||
Args:
|
||||
session: Session de conversation
|
||||
|
||||
Returns:
|
||||
Dict résumant le contexte
|
||||
"""
|
||||
context = session.context
|
||||
|
||||
# Résumé des derniers tours
|
||||
recent_turns = []
|
||||
for turn in session.turns[-5:]:
|
||||
recent_turns.append({
|
||||
"user": turn.user_message,
|
||||
"intent": turn.intent.intent_type.value,
|
||||
"action": turn.action_taken
|
||||
})
|
||||
|
||||
return {
|
||||
"current_workflow": context.current_workflow,
|
||||
"current_params": context.current_workflow_params,
|
||||
"mentioned_entities": context.mentioned_entities,
|
||||
"awaiting_clarification": context.awaiting_clarification,
|
||||
"recent_turns": recent_turns
|
||||
}
|
||||
|
||||
def set_pending_confirmation(
|
||||
self,
|
||||
session: ConversationSession,
|
||||
confirmation: PendingConfirmation
|
||||
):
|
||||
"""Définir une confirmation en attente."""
|
||||
session.context.pending_confirmation = confirmation
|
||||
|
||||
def get_pending_confirmation(
|
||||
self,
|
||||
session: ConversationSession
|
||||
) -> Optional[PendingConfirmation]:
|
||||
"""Obtenir la confirmation en attente."""
|
||||
return session.context.pending_confirmation
|
||||
|
||||
def clear_pending_confirmation(self, session: ConversationSession):
|
||||
"""Effacer la confirmation en attente."""
|
||||
session.context.pending_confirmation = None
|
||||
|
||||
def cleanup_expired_sessions(self) -> int:
|
||||
"""
|
||||
Nettoyer les sessions expirées.
|
||||
|
||||
Returns:
|
||||
Nombre de sessions nettoyées
|
||||
"""
|
||||
now = datetime.now()
|
||||
expired = [
|
||||
sid for sid, session in self.sessions.items()
|
||||
if now - session.last_activity > self.session_timeout
|
||||
]
|
||||
|
||||
for sid in expired:
|
||||
del self.sessions[sid]
|
||||
logger.info(f"Session cleaned up: {sid}")
|
||||
|
||||
return len(expired)
|
||||
|
||||
def _generate_session_id(self, user_id: Optional[str]) -> str:
|
||||
"""Générer un ID de session unique."""
|
||||
base = f"{user_id or 'anon'}_{datetime.now().isoformat()}"
|
||||
return hashlib.md5(base.encode()).hexdigest()[:16]
|
||||
|
||||
def _load_sessions(self):
|
||||
"""Charger les sessions depuis le stockage."""
|
||||
if not self.persistence_path:
|
||||
return
|
||||
|
||||
sessions_file = self.persistence_path / "sessions.json"
|
||||
if sessions_file.exists():
|
||||
try:
|
||||
with open(sessions_file, 'r') as f:
|
||||
data = json.load(f)
|
||||
# TODO: Reconstruire les sessions depuis JSON
|
||||
logger.info(f"Loaded {len(data)} sessions")
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to load sessions: {e}")
|
||||
|
||||
def _save_session(self, session: ConversationSession):
|
||||
"""Sauvegarder une session."""
|
||||
if not self.persistence_path:
|
||||
return
|
||||
|
||||
try:
|
||||
self.persistence_path.mkdir(parents=True, exist_ok=True)
|
||||
session_file = self.persistence_path / f"session_{session.session_id}.json"
|
||||
|
||||
with open(session_file, 'w') as f:
|
||||
json.dump(session.to_dict(), f, indent=2, default=str)
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to save session: {e}")
|
||||
|
||||
|
||||
# Singleton pour utilisation globale
|
||||
_conversation_manager: Optional[ConversationManager] = None
|
||||
|
||||
def get_conversation_manager(
|
||||
persistence_path: Optional[Path] = None
|
||||
) -> ConversationManager:
|
||||
"""Obtenir l'instance globale du gestionnaire de conversations."""
|
||||
global _conversation_manager
|
||||
if _conversation_manager is None:
|
||||
_conversation_manager = ConversationManager(persistence_path=persistence_path)
|
||||
return _conversation_manager
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Tests rapides
|
||||
from .intent_parser import IntentParser
|
||||
|
||||
parser = IntentParser()
|
||||
manager = ConversationManager()
|
||||
|
||||
# Simuler une conversation
|
||||
session = manager.get_or_create_session(user_id="test_user")
|
||||
|
||||
test_conversation = [
|
||||
"facturer le client Acme",
|
||||
"oui",
|
||||
"et pour le client Beta ?", # Référence implicite
|
||||
"le même format", # Référence anaphorique
|
||||
"statut",
|
||||
]
|
||||
|
||||
print("=== Tests ConversationManager ===\n")
|
||||
for message in test_conversation:
|
||||
intent = parser.parse(message)
|
||||
|
||||
# Résoudre les références
|
||||
resolved = manager.resolve_references(session, intent)
|
||||
|
||||
print(f"User: {message}")
|
||||
print(f"Intent: {resolved.intent_type.value}")
|
||||
print(f"Workflow: {resolved.workflow_hint}")
|
||||
print(f"Params: {resolved.parameters}")
|
||||
|
||||
# Ajouter le tour
|
||||
manager.add_turn(
|
||||
session=session,
|
||||
user_message=message,
|
||||
intent=resolved,
|
||||
response="[Réponse simulée]",
|
||||
action_taken=resolved.intent_type.value
|
||||
)
|
||||
|
||||
print(f"Context workflow: {session.context.current_workflow}")
|
||||
print()
|
||||
Reference in New Issue
Block a user