Files
rpa_vision_v3/core/execution/llm_actions.py
Dom 5e3865d328 feat: DAG executor async + intégration IA/LLM dans le VWB
- DAGExecutor : exécution workflow par graphe de dépendances,
  étapes LLM parallèles, UI séquentielles, injection ${step.result}
- LLMActionHandler : analyze_text, translate, extract_data, generate_text
  via Ollama /api/chat (qwen3-vl:8b, temperature 0.1)
- VWB palette : catégorie "IA / LLM" avec 4 actions draggables
- VWB propriétés : éditeurs pour chaque action LLM (modèle, prompt, langue)
- VWB endpoint : POST /api/v3/workflow/<id>/execute-dag
- 37 tests unitaires DAG executor (tous passent)
- Fix log spam cache workflows (info → debug)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-16 22:58:44 +01:00

432 lines
14 KiB
Python

"""
LLMActionHandler — Gestionnaire d'actions LLM pour les workflows DAG
Gère les appels LLM via l'API Ollama /api/chat pour les étapes de workflow.
Chaque action est un appel synchrone (bloquant) qui sera exécuté dans
le ThreadPool LLM du DAGExecutor.
Actions supportées :
- analyze_text : Analyser / résumer un texte
- translate : Traduire un texte vers une langue cible
- extract_data : Extraire des données structurées d'un texte
- generate_text : Générer du texte à partir d'un prompt
Auteur : Dom, Claude
Date : 16 mars 2026
"""
import json
import logging
from typing import Any, Dict, Optional
import requests
logger = logging.getLogger(__name__)
class LLMActionHandler:
"""Gestionnaire d'appels LLM pour les étapes de workflow.
Utilise l'API Ollama /api/chat (mode conversationnel) pour toutes
les interactions avec le modèle de langage.
Args:
ollama_endpoint: URL de l'API Ollama
model: Nom du modèle à utiliser
temperature: Température de génération par défaut
timeout: Timeout par appel en secondes
"""
def __init__(
self,
ollama_endpoint: str = "http://localhost:11434",
model: str = "qwen3-vl:8b",
temperature: float = 0.1,
timeout: int = 120,
):
self.endpoint = ollama_endpoint.rstrip("/")
self.model = model
self.temperature = temperature
self.timeout = timeout
# -----------------------------------------------------------------
# Dispatcher principal
# -----------------------------------------------------------------
def execute(self, action: Dict[str, Any], context: Dict[str, Any]) -> Any:
"""Dispatcher vers la bonne action LLM.
Args:
action: Dict contenant au minimum 'llm_action' (nom de l'action)
et les paramètres spécifiques à l'action
context: Contexte d'exécution (résultats précédents, step_id, etc.)
Returns:
Résultat de l'action (texte, dict, etc.)
Raises:
ValueError: Si l'action LLM est inconnue
RuntimeError: Si l'appel à Ollama échoue
"""
llm_action = action.get("llm_action", "")
dispatch = {
"analyze_text": self._dispatch_analyze,
"translate": self._dispatch_translate,
"extract_data": self._dispatch_extract,
"generate_text": self._dispatch_generate,
}
handler = dispatch.get(llm_action)
if handler is None:
raise ValueError(
f"Action LLM inconnue : '{llm_action}'. "
f"Actions supportées : {list(dispatch.keys())}"
)
return handler(action, context)
# -----------------------------------------------------------------
# Actions LLM
# -----------------------------------------------------------------
def analyze_text(
self,
text: str,
instruction: str = "Analyse et résume ce texte.",
model: Optional[str] = None,
temperature: Optional[float] = None,
) -> str:
"""Analyser ou résumer un texte.
Args:
text: Texte à analyser
instruction: Instruction pour l'analyse
model: Modèle spécifique (sinon modèle par défaut)
temperature: Température spécifique
Returns:
Texte de l'analyse
"""
system_prompt = (
"Tu es un assistant d'analyse de texte. "
"Réponds de manière concise et structurée."
)
user_message = f"{instruction}\n\nTexte :\n{text}"
return self._chat(
system_prompt=system_prompt,
user_message=user_message,
model=model,
temperature=temperature,
)
def translate(
self,
text: str,
target_lang: str,
source_lang: Optional[str] = None,
model: Optional[str] = None,
temperature: Optional[float] = None,
) -> str:
"""Traduire un texte vers une langue cible.
Args:
text: Texte à traduire
target_lang: Langue cible (ex: "français", "chinois", "english")
source_lang: Langue source (détection auto si None)
model: Modèle spécifique
temperature: Température spécifique
Returns:
Texte traduit
"""
system_prompt = (
"Tu es un traducteur professionnel. "
"Traduis le texte fidèlement, sans ajouter d'explication. "
"Retourne uniquement la traduction."
)
if source_lang:
user_message = (
f"Traduis le texte suivant du {source_lang} "
f"vers le {target_lang} :\n\n{text}"
)
else:
user_message = (
f"Traduis le texte suivant en {target_lang} :\n\n{text}"
)
return self._chat(
system_prompt=system_prompt,
user_message=user_message,
model=model,
temperature=temperature,
)
def extract_data(
self,
text: str,
schema: Dict[str, Any],
model: Optional[str] = None,
temperature: Optional[float] = None,
) -> Dict[str, Any]:
"""Extraire des données structurées d'un texte.
Args:
text: Texte source
schema: Schéma des données à extraire (clés attendues + descriptions)
model: Modèle spécifique
temperature: Température spécifique
Returns:
Dict avec les données extraites
"""
schema_desc = json.dumps(schema, ensure_ascii=False, indent=2)
system_prompt = (
"Tu es un extracteur de données. "
"Extrais les informations demandées du texte et retourne "
"un JSON valide correspondant au schéma fourni. "
"Retourne UNIQUEMENT le JSON, sans explication."
)
user_message = (
f"Schéma attendu :\n{schema_desc}\n\n"
f"Texte source :\n{text}"
)
response = self._chat(
system_prompt=system_prompt,
user_message=user_message,
model=model,
temperature=temperature,
force_json=True,
)
# Parser le JSON de la réponse
try:
return json.loads(response)
except json.JSONDecodeError:
# Tenter d'extraire le JSON de la réponse
logger.warning(
"Réponse LLM non-JSON pour extract_data, tentative d'extraction"
)
return self._try_extract_json(response)
def generate_text(
self,
prompt: str,
context: str = "",
model: Optional[str] = None,
temperature: Optional[float] = None,
) -> str:
"""Générer du texte à partir d'un prompt.
Args:
prompt: Instruction de génération
context: Contexte additionnel
model: Modèle spécifique
temperature: Température spécifique
Returns:
Texte généré
"""
system_prompt = (
"Tu es un assistant de rédaction. "
"Génère le contenu demandé de manière claire et précise."
)
user_message = prompt
if context:
user_message = f"Contexte :\n{context}\n\nInstruction :\n{prompt}"
return self._chat(
system_prompt=system_prompt,
user_message=user_message,
model=model,
temperature=temperature,
)
# -----------------------------------------------------------------
# Dispatchers internes (adaptent les paramètres d'action)
# -----------------------------------------------------------------
def _dispatch_analyze(
self, action: Dict[str, Any], context: Dict[str, Any]
) -> str:
"""Dispatche une action analyze_text depuis les paramètres du workflow."""
return self.analyze_text(
text=action.get("text", ""),
instruction=action.get("instruction", "Analyse et résume ce texte."),
model=action.get("model"),
temperature=action.get("temperature"),
)
def _dispatch_translate(
self, action: Dict[str, Any], context: Dict[str, Any]
) -> str:
"""Dispatche une action translate depuis les paramètres du workflow."""
return self.translate(
text=action.get("text", ""),
target_lang=action.get("target_lang", "français"),
source_lang=action.get("source_lang"),
model=action.get("model"),
temperature=action.get("temperature"),
)
def _dispatch_extract(
self, action: Dict[str, Any], context: Dict[str, Any]
) -> Dict[str, Any]:
"""Dispatche une action extract_data depuis les paramètres du workflow."""
return self.extract_data(
text=action.get("text", ""),
schema=action.get("schema", {}),
model=action.get("model"),
temperature=action.get("temperature"),
)
def _dispatch_generate(
self, action: Dict[str, Any], context: Dict[str, Any]
) -> str:
"""Dispatche une action generate_text depuis les paramètres du workflow."""
return self.generate_text(
prompt=action.get("prompt", ""),
context=action.get("context", ""),
model=action.get("model"),
temperature=action.get("temperature"),
)
# -----------------------------------------------------------------
# Communication avec Ollama via /api/chat
# -----------------------------------------------------------------
def _chat(
self,
system_prompt: str,
user_message: str,
model: Optional[str] = None,
temperature: Optional[float] = None,
force_json: bool = False,
) -> str:
"""Appel à l'API /api/chat d'Ollama.
Args:
system_prompt: Message système
user_message: Message utilisateur
model: Modèle (défaut: self.model)
temperature: Température (défaut: self.temperature)
force_json: Forcer la sortie JSON
Returns:
Contenu de la réponse du modèle
Raises:
RuntimeError: Si l'appel échoue
"""
effective_model = model or self.model
effective_temp = temperature if temperature is not None else self.temperature
# Pour Qwen3, désactiver le mode thinking pour des réponses directes
effective_user_message = user_message
if "qwen" in effective_model.lower():
effective_user_message = f"/nothink {user_message}"
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": effective_user_message},
]
payload = {
"model": effective_model,
"messages": messages,
"stream": False,
"options": {
"temperature": effective_temp,
},
}
if force_json:
payload["format"] = "json"
url = f"{self.endpoint}/api/chat"
try:
response = requests.post(
url,
json=payload,
timeout=self.timeout,
)
if response.status_code != 200:
raise RuntimeError(
f"Ollama /api/chat a retourné HTTP {response.status_code} : "
f"{response.text[:500]}"
)
data = response.json()
content = data.get("message", {}).get("content", "")
if not content:
raise RuntimeError(
"Ollama a retourné une réponse vide"
)
return content
except requests.exceptions.Timeout:
raise RuntimeError(
f"Timeout de {self.timeout}s dépassé pour l'appel LLM "
f"(modèle: {effective_model})"
)
except requests.exceptions.ConnectionError:
raise RuntimeError(
f"Impossible de se connecter à Ollama sur {self.endpoint}. "
f"Vérifiez que le service est lancé."
)
# -----------------------------------------------------------------
# Utilitaires
# -----------------------------------------------------------------
@staticmethod
def _try_extract_json(text: str) -> Dict[str, Any]:
"""Tente d'extraire un objet JSON d'un texte libre.
Cherche le premier { et le dernier } pour isoler le JSON.
"""
start = text.find("{")
end = text.rfind("}")
if start != -1 and end != -1 and end > start:
try:
return json.loads(text[start : end + 1])
except json.JSONDecodeError:
pass
logger.error("Impossible d'extraire du JSON de la réponse LLM")
return {"raw_response": text, "_parse_error": True}
def check_connection(self) -> bool:
"""Vérifie la connexion à Ollama et la disponibilité du modèle.
Returns:
True si Ollama répond et le modèle est disponible
"""
try:
response = requests.get(
f"{self.endpoint}/api/tags", timeout=5
)
if response.status_code == 200:
models = response.json().get("models", [])
model_names = [m["name"] for m in models]
if self.model in model_names:
return True
logger.warning(
"Modèle '%s' non trouvé dans Ollama. "
"Modèles disponibles : %s",
self.model,
model_names,
)
return False
except Exception as exc:
logger.warning(
"Impossible de se connecter à Ollama sur %s : %s",
self.endpoint,
exc,
)
return False