""" LLMActionHandler — Gestionnaire d'actions LLM pour les workflows DAG Gère les appels LLM via l'API Ollama /api/chat pour les étapes de workflow. Chaque action est un appel synchrone (bloquant) qui sera exécuté dans le ThreadPool LLM du DAGExecutor. Actions supportées : - analyze_text : Analyser / résumer un texte - translate : Traduire un texte vers une langue cible - extract_data : Extraire des données structurées d'un texte - generate_text : Générer du texte à partir d'un prompt Auteur : Dom, Claude Date : 16 mars 2026 """ import json import logging from typing import Any, Dict, Optional import requests logger = logging.getLogger(__name__) class LLMActionHandler: """Gestionnaire d'appels LLM pour les étapes de workflow. Utilise l'API Ollama /api/chat (mode conversationnel) pour toutes les interactions avec le modèle de langage. Args: ollama_endpoint: URL de l'API Ollama model: Nom du modèle à utiliser temperature: Température de génération par défaut timeout: Timeout par appel en secondes """ def __init__( self, ollama_endpoint: str = "http://localhost:11434", model: str = "qwen3-vl:8b", temperature: float = 0.1, timeout: int = 120, ): self.endpoint = ollama_endpoint.rstrip("/") self.model = model self.temperature = temperature self.timeout = timeout # ----------------------------------------------------------------- # Dispatcher principal # ----------------------------------------------------------------- def execute(self, action: Dict[str, Any], context: Dict[str, Any]) -> Any: """Dispatcher vers la bonne action LLM. Args: action: Dict contenant au minimum 'llm_action' (nom de l'action) et les paramètres spécifiques à l'action context: Contexte d'exécution (résultats précédents, step_id, etc.) Returns: Résultat de l'action (texte, dict, etc.) Raises: ValueError: Si l'action LLM est inconnue RuntimeError: Si l'appel à Ollama échoue """ llm_action = action.get("llm_action", "") dispatch = { "analyze_text": self._dispatch_analyze, "translate": self._dispatch_translate, "extract_data": self._dispatch_extract, "generate_text": self._dispatch_generate, } handler = dispatch.get(llm_action) if handler is None: raise ValueError( f"Action LLM inconnue : '{llm_action}'. " f"Actions supportées : {list(dispatch.keys())}" ) return handler(action, context) # ----------------------------------------------------------------- # Actions LLM # ----------------------------------------------------------------- def analyze_text( self, text: str, instruction: str = "Analyse et résume ce texte.", model: Optional[str] = None, temperature: Optional[float] = None, ) -> str: """Analyser ou résumer un texte. Args: text: Texte à analyser instruction: Instruction pour l'analyse model: Modèle spécifique (sinon modèle par défaut) temperature: Température spécifique Returns: Texte de l'analyse """ system_prompt = ( "Tu es un assistant d'analyse de texte. " "Réponds de manière concise et structurée." ) user_message = f"{instruction}\n\nTexte :\n{text}" return self._chat( system_prompt=system_prompt, user_message=user_message, model=model, temperature=temperature, ) def translate( self, text: str, target_lang: str, source_lang: Optional[str] = None, model: Optional[str] = None, temperature: Optional[float] = None, ) -> str: """Traduire un texte vers une langue cible. Args: text: Texte à traduire target_lang: Langue cible (ex: "français", "chinois", "english") source_lang: Langue source (détection auto si None) model: Modèle spécifique temperature: Température spécifique Returns: Texte traduit """ system_prompt = ( "Tu es un traducteur professionnel. " "Traduis le texte fidèlement, sans ajouter d'explication. " "Retourne uniquement la traduction." ) if source_lang: user_message = ( f"Traduis le texte suivant du {source_lang} " f"vers le {target_lang} :\n\n{text}" ) else: user_message = ( f"Traduis le texte suivant en {target_lang} :\n\n{text}" ) return self._chat( system_prompt=system_prompt, user_message=user_message, model=model, temperature=temperature, ) def extract_data( self, text: str, schema: Dict[str, Any], model: Optional[str] = None, temperature: Optional[float] = None, ) -> Dict[str, Any]: """Extraire des données structurées d'un texte. Args: text: Texte source schema: Schéma des données à extraire (clés attendues + descriptions) model: Modèle spécifique temperature: Température spécifique Returns: Dict avec les données extraites """ schema_desc = json.dumps(schema, ensure_ascii=False, indent=2) system_prompt = ( "Tu es un extracteur de données. " "Extrais les informations demandées du texte et retourne " "un JSON valide correspondant au schéma fourni. " "Retourne UNIQUEMENT le JSON, sans explication." ) user_message = ( f"Schéma attendu :\n{schema_desc}\n\n" f"Texte source :\n{text}" ) response = self._chat( system_prompt=system_prompt, user_message=user_message, model=model, temperature=temperature, force_json=True, ) # Parser le JSON de la réponse try: return json.loads(response) except json.JSONDecodeError: # Tenter d'extraire le JSON de la réponse logger.warning( "Réponse LLM non-JSON pour extract_data, tentative d'extraction" ) return self._try_extract_json(response) def generate_text( self, prompt: str, context: str = "", model: Optional[str] = None, temperature: Optional[float] = None, ) -> str: """Générer du texte à partir d'un prompt. Args: prompt: Instruction de génération context: Contexte additionnel model: Modèle spécifique temperature: Température spécifique Returns: Texte généré """ system_prompt = ( "Tu es un assistant de rédaction. " "Génère le contenu demandé de manière claire et précise." ) user_message = prompt if context: user_message = f"Contexte :\n{context}\n\nInstruction :\n{prompt}" return self._chat( system_prompt=system_prompt, user_message=user_message, model=model, temperature=temperature, ) # ----------------------------------------------------------------- # Dispatchers internes (adaptent les paramètres d'action) # ----------------------------------------------------------------- def _dispatch_analyze( self, action: Dict[str, Any], context: Dict[str, Any] ) -> str: """Dispatche une action analyze_text depuis les paramètres du workflow.""" return self.analyze_text( text=action.get("text", ""), instruction=action.get("instruction", "Analyse et résume ce texte."), model=action.get("model"), temperature=action.get("temperature"), ) def _dispatch_translate( self, action: Dict[str, Any], context: Dict[str, Any] ) -> str: """Dispatche une action translate depuis les paramètres du workflow.""" return self.translate( text=action.get("text", ""), target_lang=action.get("target_lang", "français"), source_lang=action.get("source_lang"), model=action.get("model"), temperature=action.get("temperature"), ) def _dispatch_extract( self, action: Dict[str, Any], context: Dict[str, Any] ) -> Dict[str, Any]: """Dispatche une action extract_data depuis les paramètres du workflow.""" return self.extract_data( text=action.get("text", ""), schema=action.get("schema", {}), model=action.get("model"), temperature=action.get("temperature"), ) def _dispatch_generate( self, action: Dict[str, Any], context: Dict[str, Any] ) -> str: """Dispatche une action generate_text depuis les paramètres du workflow.""" return self.generate_text( prompt=action.get("prompt", ""), context=action.get("context", ""), model=action.get("model"), temperature=action.get("temperature"), ) # ----------------------------------------------------------------- # Communication avec Ollama via /api/chat # ----------------------------------------------------------------- def _chat( self, system_prompt: str, user_message: str, model: Optional[str] = None, temperature: Optional[float] = None, force_json: bool = False, ) -> str: """Appel à l'API /api/chat d'Ollama. Args: system_prompt: Message système user_message: Message utilisateur model: Modèle (défaut: self.model) temperature: Température (défaut: self.temperature) force_json: Forcer la sortie JSON Returns: Contenu de la réponse du modèle Raises: RuntimeError: Si l'appel échoue """ effective_model = model or self.model effective_temp = temperature if temperature is not None else self.temperature # Pour Qwen3, désactiver le mode thinking pour des réponses directes effective_user_message = user_message if "qwen" in effective_model.lower(): effective_user_message = f"/nothink {user_message}" messages = [ {"role": "system", "content": system_prompt}, {"role": "user", "content": effective_user_message}, ] payload = { "model": effective_model, "messages": messages, "stream": False, "options": { "temperature": effective_temp, }, } if force_json: payload["format"] = "json" url = f"{self.endpoint}/api/chat" try: response = requests.post( url, json=payload, timeout=self.timeout, ) if response.status_code != 200: raise RuntimeError( f"Ollama /api/chat a retourné HTTP {response.status_code} : " f"{response.text[:500]}" ) data = response.json() content = data.get("message", {}).get("content", "") if not content: raise RuntimeError( "Ollama a retourné une réponse vide" ) return content except requests.exceptions.Timeout: raise RuntimeError( f"Timeout de {self.timeout}s dépassé pour l'appel LLM " f"(modèle: {effective_model})" ) except requests.exceptions.ConnectionError: raise RuntimeError( f"Impossible de se connecter à Ollama sur {self.endpoint}. " f"Vérifiez que le service est lancé." ) # ----------------------------------------------------------------- # Utilitaires # ----------------------------------------------------------------- @staticmethod def _try_extract_json(text: str) -> Dict[str, Any]: """Tente d'extraire un objet JSON d'un texte libre. Cherche le premier { et le dernier } pour isoler le JSON. """ start = text.find("{") end = text.rfind("}") if start != -1 and end != -1 and end > start: try: return json.loads(text[start : end + 1]) except json.JSONDecodeError: pass logger.error("Impossible d'extraire du JSON de la réponse LLM") return {"raw_response": text, "_parse_error": True} def check_connection(self) -> bool: """Vérifie la connexion à Ollama et la disponibilité du modèle. Returns: True si Ollama répond et le modèle est disponible """ try: response = requests.get( f"{self.endpoint}/api/tags", timeout=5 ) if response.status_code == 200: models = response.json().get("models", []) model_names = [m["name"] for m in models] if self.model in model_names: return True logger.warning( "Modèle '%s' non trouvé dans Ollama. " "Modèles disponibles : %s", self.model, model_names, ) return False except Exception as exc: logger.warning( "Impossible de se connecter à Ollama sur %s : %s", self.endpoint, exc, ) return False