From 148321dffd4199f483c00fd14330a55445b0ea1a Mon Sep 17 00:00:00 2001 From: Dom Date: Sat, 14 Mar 2026 11:23:33 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20WorkflowRunner,=20matching=20s=C3=A9man?= =?UTF-8?q?tique=20et=20replay=20distant=20(P0-4,=20P0-6,=20P0-7)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P0-4: WorkflowRunner — orchestrateur de replay intelligent - Boucle capture → match FAISS → résolution sémantique → exécution - Mode dry_run, substitution de variables, anti-boucle (max 200 steps) - Découplé de pyautogui via executor_callback P0-6: Unification des répertoires workflows - SemanticMatcher scanne data/workflows/ + data/training/workflows/ - Auto-reload sur changement de répertoire (60s) P0-7: Matching sémantique via Ollama - Pré-filtrage Jaccard + re-ranking LLM (qwen2.5:7b) - Score final : 40% Jaccard + 60% LLM, fallback si Ollama indisponible Agent Chat: exécution distante via streaming server - POST http://localhost:5005/api/v1/traces/stream/replay - Fallback sur exécution locale si serveur indisponible Co-Authored-By: Claude Opus 4.6 --- agent_chat/app.py | 147 ++++- core/execution/__init__.py | 9 +- core/execution/workflow_runner.py | 920 ++++++++++++++++++++++++++++++ core/workflow/semantic_matcher.py | 683 +++++++++++++++++----- 4 files changed, 1615 insertions(+), 144 deletions(-) create mode 100644 core/execution/workflow_runner.py diff --git a/agent_chat/app.py b/agent_chat/app.py index eb599a0cf..52a9f2a5d 100644 --- a/agent_chat/app.py +++ b/agent_chat/app.py @@ -14,7 +14,7 @@ Composants intégrés: Usage: python agent_chat/app.py -Puis ouvrir: http://localhost:5002 +Puis ouvrir: http://localhost:5004 Auteur: Dom - Janvier 2026 """ @@ -28,6 +28,8 @@ from pathlib import Path from datetime import datetime from typing import Dict, Any, List, Optional +import requests as http_requests # Pour les appels au streaming server + from flask import Flask, render_template, request, jsonify from flask_socketio import SocketIO, emit @@ -83,6 +85,11 @@ action_executor = None execution_loop = None screen_capturer = None +# URL du streaming server (Agent V1) pour l'exécution distante +STREAMING_SERVER_URL = os.environ.get( + "RPA_STREAMING_URL", "http://localhost:5005" +) + execution_status = { "running": False, "workflow": None, @@ -99,10 +106,22 @@ def init_system(): global intent_parser, confirmation_loop, response_generator, conversation_manager global autonomous_planner - # 1. SemanticMatcher + # 1. SemanticMatcher — multi-répertoires (P0-6) + matching LLM (P0-7) + # Scan data/workflows/ + data/training/workflows/ + data/training/live_sessions/workflows/ try: - matcher = SemanticMatcher("data/workflows") - logger.info(f"✓ SemanticMatcher: {len(matcher.get_all_workflows())} workflows") + matcher = SemanticMatcher( + workflows_dir=None, # None = scan tous les répertoires par défaut + use_llm=True, # Matching sémantique via Ollama (P0-7) + llm_model="qwen2.5:7b", + ) + dirs_info = matcher.get_directories() + dirs_summary = ", ".join( + f"{d['path']}({d['workflow_count']})" for d in dirs_info if d['exists'] + ) + logger.info( + f"✓ SemanticMatcher: {len(matcher.get_all_workflows())} workflows " + f"[{dirs_summary}]" + ) except Exception as e: logger.error(f"✗ SemanticMatcher: {e}") matcher = None @@ -267,20 +286,55 @@ def api_status(): @app.route('/api/workflows') def api_workflows(): - """Liste des workflows.""" + """Liste des workflows (tous répertoires confondus).""" if not matcher: - return jsonify({"workflows": []}) - + return jsonify({"workflows": [], "directories": []}) + workflows = [] for wf in matcher.get_all_workflows(): workflows.append({ "id": wf.workflow_id, "name": wf.name, "description": wf.description, - "tags": wf.tags + "tags": wf.tags, + "source": wf.source_dir, }) - - return jsonify({"workflows": workflows}) + + return jsonify({ + "workflows": workflows, + "directories": matcher.get_directories(), + }) + + +@app.route('/api/workflows/refresh', methods=['POST']) +def api_workflows_refresh(): + """ + Forcer le rechargement des workflows depuis tous les répertoires. + + Utile après qu'un nouveau workflow a été appris par le StreamProcessor. + """ + if not matcher: + return jsonify({"success": False, "error": "SemanticMatcher non initialisé"}) + + try: + count = matcher.reload_workflows() + + # Re-injecter les workflows dans l'intent_parser (contexte LLM) + if intent_parser: + workflows_for_llm = [ + {"name": wf.name, "description": wf.description, "tags": wf.tags} + for wf in matcher.get_all_workflows() + ] + intent_parser.set_workflows(workflows_for_llm) + + return jsonify({ + "success": True, + "workflows_count": count, + "directories": matcher.get_directories(), + }) + except Exception as e: + logger.error(f"Erreur rechargement workflows: {e}") + return jsonify({"success": False, "error": str(e)}) @app.route('/api/search', methods=['POST']) @@ -893,12 +947,79 @@ def handle_cancel(): # Exécution de workflow # ============================================================================= +def _try_streaming_server_replay(workflow_id: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """ + Tenter d'exécuter un workflow via le streaming server (Agent V1). + + POST http://localhost:5005/api/v1/traces/stream/replay + avec workflow_id et params. + + Returns: + Réponse du serveur si succès, None si indisponible ou erreur. + """ + try: + resp = http_requests.post( + f"{STREAMING_SERVER_URL}/api/v1/traces/stream/replay", + json={ + "workflow_id": workflow_id, + "session_id": f"chat_{datetime.now().strftime('%H%M%S')}", + "params": params or {}, + }, + timeout=5, + ) + if resp.status_code == 200: + data = resp.json() + logger.info(f"Workflow {workflow_id} envoyé au streaming server: {data}") + return data + else: + logger.debug( + f"Streaming server refus (HTTP {resp.status_code}): " + f"{resp.text[:200]}" + ) + except http_requests.ConnectionError: + logger.debug("Streaming server non disponible (connexion refusée)") + except http_requests.Timeout: + logger.debug("Streaming server timeout") + except Exception as e: + logger.debug(f"Erreur streaming server: {e}") + + return None + + def execute_workflow(match, params): - """Exécuter un workflow avec le vrai système d'exécution.""" + """ + Exécuter un workflow — tente d'abord le streaming server, + puis fallback sur l'exécution locale. + """ global execution_status import time + # Tenter l'exécution via le streaming server (Agent V1 distant) + replay_result = _try_streaming_server_replay(match.workflow_id, params) + if replay_result: + # Le streaming server a accepté le replay + execution_status["running"] = True + execution_status["workflow"] = match.workflow_name + execution_status["progress"] = 50 + execution_status["message"] = "Envoyé au streaming server (Agent V1)" + + socketio.emit('execution_progress', { + "progress": 50, + "step": "Exécution via streaming server...", + "current": 1, + "total": 1, + }) + + finish_execution( + match.workflow_name, True, + f"Workflow envoyé au streaming server ({replay_result.get('status', 'ok')})" + ) + return + + # Fallback : exécution locale + logger.info("Streaming server indisponible, exécution locale") + try: # Charger le workflow with open(match.workflow_path, 'r') as f: @@ -1257,10 +1378,10 @@ if __name__ == '__main__': ╔════════════════════════════════════════════════════════════╗ ║ RPA Vision V3 - Interface de Commande ║ ║ ║ -║ 🌐 http://localhost:5002 ║ +║ 🌐 http://localhost:5004 ║ ║ ║ ║ Ctrl+C pour arrêter ║ ╚════════════════════════════════════════════════════════════╝ """) - socketio.run(app, host='127.0.0.1', port=5002, debug=False, allow_unsafe_werkzeug=True) + socketio.run(app, host='127.0.0.1', port=5004, debug=False, allow_unsafe_werkzeug=True) diff --git a/core/execution/__init__.py b/core/execution/__init__.py index ceb70af96..e01b04d10 100644 --- a/core/execution/__init__.py +++ b/core/execution/__init__.py @@ -7,6 +7,7 @@ Provides classes for executing workflow actions automatically. from .action_executor import ActionExecutor from .target_resolver import TargetResolver, ResolvedTarget from .error_handler import ErrorHandler, ErrorType, RecoveryStrategy +from .workflow_runner import WorkflowRunner, RunResult, RunStatus, RunnerConfig # Import tardif pour éviter import circulaire avec pipeline def _get_execution_loop(): @@ -14,11 +15,15 @@ def _get_execution_loop(): return ExecutionLoop, ExecutionMode, ExecutionState, create_execution_loop __all__ = [ - 'ActionExecutor', - 'TargetResolver', + 'ActionExecutor', + 'TargetResolver', 'ResolvedTarget', 'ErrorHandler', 'ErrorType', 'RecoveryStrategy', + 'WorkflowRunner', + 'RunResult', + 'RunStatus', + 'RunnerConfig', # ExecutionLoop accessible via import direct du module ] diff --git a/core/execution/workflow_runner.py b/core/execution/workflow_runner.py new file mode 100644 index 000000000..45d7680fa --- /dev/null +++ b/core/execution/workflow_runner.py @@ -0,0 +1,920 @@ +""" +WorkflowRunner — Orchestrateur de replay de workflows appris + +Exécute un Workflow du début à la fin en utilisant la compréhension sémantique +de l'UI plutôt que de simples coordonnées X/Y. + +Boucle principale : + 1. Capturer l'écran + 2. Analyser via ScreenAnalyzer → ScreenState + 3. Matcher l'état courant contre les noeuds du workflow (CLIP/FAISS) + 4. Choisir l'edge sortant et résoudre la cible sémantiquement + 5. Exécuter l'action via le callback (local ou distant) + 6. Attendre la stabilisation de l'écran + 7. Vérifier qu'on a atteint le noeud suivant + 8. Boucler jusqu'au noeud END + +Le Runner est découplé de l'exécution physique : il utilise un executor_callback +qui peut être pyautogui (local) ou une commande API (distant). + +Auteur : Dom, Alice Kiro +Date : 14 mars 2026 +""" + +import hashlib +import logging +import re +import time +from dataclasses import dataclass, field +from enum import Enum +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional + +import numpy as np +from PIL import Image + +from ..models.workflow_graph import Action, Workflow, WorkflowEdge, WorkflowNode + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Résultat d'exécution +# ============================================================================= + +class RunStatus(str, Enum): + """Statut final d'une exécution de workflow.""" + SUCCESS = "success" + FAILED = "failed" + TIMEOUT = "timeout" + ABORTED = "aborted" + DRY_RUN = "dry_run" + + +@dataclass +class StepResult: + """Résultat d'une étape individuelle du workflow.""" + node_id: str + edge_id: Optional[str] = None + action_type: Optional[str] = None + action_details: Optional[Dict[str, Any]] = None + match_confidence: float = 0.0 + duration_seconds: float = 0.0 + success: bool = True + error: Optional[str] = None + retries: int = 0 + + +@dataclass +class RunResult: + """Résultat complet d'une exécution de workflow.""" + success: bool + status: RunStatus = RunStatus.FAILED + nodes_visited: List[str] = field(default_factory=list) + actions_executed: List[Dict[str, Any]] = field(default_factory=list) + errors: List[str] = field(default_factory=list) + duration_seconds: float = 0.0 + steps: List[StepResult] = field(default_factory=list) + + def to_dict(self) -> Dict[str, Any]: + """Sérialiser le résultat.""" + return { + "success": self.success, + "status": self.status.value, + "nodes_visited": self.nodes_visited, + "actions_executed": self.actions_executed, + "errors": self.errors, + "duration_seconds": round(self.duration_seconds, 3), + "steps_count": len(self.steps), + "steps": [ + { + "node_id": s.node_id, + "edge_id": s.edge_id, + "action_type": s.action_type, + "match_confidence": round(s.match_confidence, 4), + "duration_seconds": round(s.duration_seconds, 3), + "success": s.success, + "error": s.error, + "retries": s.retries, + } + for s in self.steps + ], + } + + +# ============================================================================= +# Configuration du Runner +# ============================================================================= + +@dataclass +class RunnerConfig: + """Configuration de l'orchestrateur de replay.""" + # Seuils de matching + min_node_similarity: float = 0.75 + # Stabilisation écran + stabilization_frames: int = 3 + stabilization_interval: float = 0.3 # secondes entre chaque frame de vérification + stabilization_timeout: float = 10.0 # secondes max pour stabiliser + # Timeouts + node_match_timeout: float = 30.0 # secondes max pour matcher un noeud + action_timeout: float = 15.0 # secondes max pour exécuter une action + global_timeout: float = 300.0 # 5 minutes max pour le workflow entier + # Retries + max_retries_per_action: int = 3 + retry_delay: float = 1.0 # secondes entre retries + # Capture + capture_dir: str = "data/runner_captures" + # Sécurité + max_steps: int = 200 # limite anti-boucle infinie + + +# ============================================================================= +# WorkflowRunner +# ============================================================================= + +class WorkflowRunner: + """ + Orchestre l'exécution complète d'un workflow appris. + + Utilise la compréhension sémantique de l'UI (CLIP embeddings + FAISS) + plutôt que de simples coordonnées pour naviguer entre les états. + + Args: + workflow: Le workflow à exécuter + screen_analyzer: ScreenAnalyzer pour analyser les captures d'écran + clip_embedder: CLIPEmbedder pour générer les embeddings d'image + faiss_manager: FAISSManager avec les prototypes du workflow indexés + executor_callback: Fonction qui exécute une action physiquement. + Signature : callback(action_dict: dict) -> bool + L'action_dict contient : type, target, parameters, resolved_position + Retourne True si succès, False sinon. + capture_callback: Fonction optionnelle de capture d'écran. + Signature : callback() -> Optional[PIL.Image] + Si None, utilise ScreenCapturer par défaut. + config: Configuration du runner (optionnel) + """ + + def __init__( + self, + workflow: Workflow, + screen_analyzer, + clip_embedder, + faiss_manager, + executor_callback: Callable[[Dict[str, Any]], bool], + capture_callback: Optional[Callable[[], Optional[Image.Image]]] = None, + config: Optional[RunnerConfig] = None, + ): + self.workflow = workflow + self.screen_analyzer = screen_analyzer + self.clip_embedder = clip_embedder + self.faiss_manager = faiss_manager + self.executor_callback = executor_callback + self.capture_callback = capture_callback + self.config = config or RunnerConfig() + + # État interne + self._current_node_id: Optional[str] = None + self._aborted = False + self._last_screen_hash: Optional[str] = None + + # Répertoire de captures temporaires + Path(self.config.capture_dir).mkdir(parents=True, exist_ok=True) + + # Index des embeddings de noeuds pour matching rapide + # Sera construit lors du premier run si le faiss_manager est vide + self._node_embeddings_indexed = False + + # ========================================================================= + # API publique + # ========================================================================= + + def run(self, params: Optional[Dict[str, Any]] = None, dry_run: bool = False) -> RunResult: + """ + Boucle principale de replay. + + Args: + params: Paramètres de substitution (ex: {{nom}} → "Dupont") + dry_run: Si True, ne pas exécuter les actions, juste simuler + + Returns: + RunResult avec le détail de l'exécution + """ + params = params or {} + start_time = time.time() + result = RunResult(success=False, status=RunStatus.FAILED) + + # Validation du workflow + validation_error = self._validate_workflow() + if validation_error: + result.errors.append(validation_error) + result.duration_seconds = time.time() - start_time + return result + + # Noeud de départ + start_node_id = self.workflow.entry_nodes[0] + self._current_node_id = start_node_id + result.nodes_visited.append(start_node_id) + + logger.info( + f"Démarrage du workflow '{self.workflow.name}' " + f"(ID: {self.workflow.workflow_id}) depuis le noeud {start_node_id} " + f"{'[DRY RUN]' if dry_run else ''}" + ) + + step_count = 0 + + try: + while not self._aborted: + elapsed = time.time() - start_time + + # Vérification timeout global + if elapsed > self.config.global_timeout: + msg = ( + f"Timeout global atteint ({self.config.global_timeout}s). " + f"Dernier noeud : {self._current_node_id}" + ) + logger.error(msg) + result.errors.append(msg) + result.status = RunStatus.TIMEOUT + break + + # Protection anti-boucle infinie + step_count += 1 + if step_count > self.config.max_steps: + msg = f"Limite de {self.config.max_steps} étapes atteinte — abandon" + logger.error(msg) + result.errors.append(msg) + result.status = RunStatus.FAILED + break + + # Récupérer le noeud courant + current_node = self.workflow.get_node(self._current_node_id) + if current_node is None: + msg = f"Noeud introuvable : {self._current_node_id}" + logger.error(msg) + result.errors.append(msg) + break + + # Vérifier si on a atteint un noeud END + if current_node.is_end or self._current_node_id in self.workflow.end_nodes: + logger.info(f"Noeud END atteint : {self._current_node_id}") + result.success = True + result.status = RunStatus.DRY_RUN if dry_run else RunStatus.SUCCESS + break + + # Récupérer les edges sortants + outgoing_edges = self.workflow.get_outgoing_edges(self._current_node_id) + if not outgoing_edges: + # Pas d'edge sortant = fin implicite + logger.info( + f"Aucun edge sortant pour {self._current_node_id} — fin du workflow" + ) + result.success = True + result.status = RunStatus.DRY_RUN if dry_run else RunStatus.SUCCESS + break + + # Choisir l'edge à exécuter + edge = self._select_edge(outgoing_edges, current_node) + if edge is None: + msg = ( + f"Aucun edge exécutable depuis {self._current_node_id} " + f"({len(outgoing_edges)} edges disponibles)" + ) + logger.error(msg) + result.errors.append(msg) + break + + # Préparer l'action + action = edge.action + action_dict = self._build_action_dict(action, params) + + step_start = time.time() + step = StepResult( + node_id=self._current_node_id, + edge_id=edge.edge_id, + action_type=action.type, + action_details=action_dict, + ) + + if dry_run: + # Mode simulation : ne pas exécuter, juste enregistrer + logger.info( + f"[DRY RUN] Étape {step_count}: {self._current_node_id} " + f"→ {edge.to_node} via {action.type}" + ) + step.success = True + step.duration_seconds = time.time() - step_start + result.steps.append(step) + result.actions_executed.append(action_dict) + + # Avancer directement au noeud suivant + self._current_node_id = edge.to_node + result.nodes_visited.append(edge.to_node) + continue + + # --- Exécution réelle --- + + # Étape 1 : Capturer et vérifier l'état courant + screen_state, screen_image = self._capture_and_analyze() + if screen_state is None: + msg = "Échec de capture/analyse de l'écran" + logger.error(msg) + result.errors.append(msg) + step.success = False + step.error = msg + result.steps.append(step) + break + + # Étape 2 : Matcher l'état courant pour confirmer le noeud + matched_node_id, confidence = self._match_current_state(screen_image) + step.match_confidence = confidence + + if matched_node_id and matched_node_id != self._current_node_id: + logger.warning( + f"État écran correspond au noeud {matched_node_id} " + f"(attendu: {self._current_node_id}, confiance: {confidence:.3f})" + ) + # On continue quand même si la confiance est suffisante + # Le workflow pourrait avoir légèrement dévié + + # Étape 3 : Résoudre la cible de l'action + resolved_action = self._resolve_action_target(action, screen_state, action_dict) + + # Étape 4 : Exécuter avec retries + action_success = False + for attempt in range(1, self.config.max_retries_per_action + 1): + step.retries = attempt - 1 + try: + action_success = self.executor_callback(resolved_action) + if action_success: + logger.info( + f"Action exécutée ({action.type}) sur {self._current_node_id} " + f"→ {edge.to_node} (tentative {attempt})" + ) + break + else: + logger.warning( + f"Action échouée (tentative {attempt}/{self.config.max_retries_per_action})" + ) + except Exception as e: + logger.warning( + f"Exception lors de l'exécution (tentative {attempt}): {e}" + ) + action_success = False + + if attempt < self.config.max_retries_per_action: + time.sleep(self.config.retry_delay) + + if not action_success: + msg = ( + f"Action échouée après {self.config.max_retries_per_action} tentatives " + f"sur {self._current_node_id}" + ) + logger.error(msg) + result.errors.append(msg) + step.success = False + step.error = msg + step.duration_seconds = time.time() - step_start + result.steps.append(step) + break + + # Étape 5 : Attendre la stabilisation de l'écran + stabilized = self._wait_for_stabilization( + timeout=self.config.stabilization_timeout + ) + if not stabilized: + logger.warning( + "Écran non stabilisé dans le délai imparti — on continue quand même" + ) + + # Étape 6 : Vérifier qu'on a atteint le noeud suivant + next_node_id = edge.to_node + verified = self._verify_transition(next_node_id) + if not verified: + logger.warning( + f"Transition vers {next_node_id} non confirmée visuellement — " + f"on fait confiance au workflow" + ) + + # Enregistrer le résultat de l'étape + step.success = True + step.duration_seconds = time.time() - step_start + result.steps.append(step) + result.actions_executed.append(resolved_action) + + # Avancer au noeud suivant + self._current_node_id = next_node_id + result.nodes_visited.append(next_node_id) + + except Exception as e: + msg = f"Erreur inattendue dans le runner : {e}" + logger.exception(msg) + result.errors.append(msg) + + result.duration_seconds = time.time() - start_time + + if self._aborted: + result.status = RunStatus.ABORTED + result.errors.append("Exécution interrompue par l'utilisateur") + + logger.info( + f"Workflow terminé : {result.status.value} " + f"({len(result.nodes_visited)} noeuds, " + f"{len(result.actions_executed)} actions, " + f"{result.duration_seconds:.1f}s)" + ) + + return result + + def abort(self) -> None: + """Interrompre l'exécution en cours (thread-safe).""" + logger.info("Demande d'interruption du workflow") + self._aborted = True + + # ========================================================================= + # Capture et analyse d'écran + # ========================================================================= + + def _capture_screen(self) -> Optional[Image.Image]: + """ + Capturer l'écran actuel. + + Utilise le capture_callback si fourni, sinon crée un ScreenCapturer. + """ + if self.capture_callback: + try: + return self.capture_callback() + except Exception as e: + logger.error(f"Erreur capture callback : {e}") + return None + + # Fallback : ScreenCapturer + try: + from ..capture.screen_capturer import ScreenCapturer + capturer = ScreenCapturer() + return capturer.capture_screen() + except Exception as e: + logger.error(f"Erreur ScreenCapturer : {e}") + return None + + def _capture_and_analyze(self): + """ + Capturer l'écran et l'analyser. + + Returns: + (ScreenState, PIL.Image) ou (None, None) + """ + image = self._capture_screen() + if image is None: + return None, None + + try: + # Sauvegarder temporairement pour l'analyse + capture_path = Path(self.config.capture_dir) / f"capture_{int(time.time() * 1000)}.png" + image.save(str(capture_path)) + + # Analyser via ScreenAnalyzer + screen_state = self.screen_analyzer.analyze(str(capture_path)) + + return screen_state, image + except Exception as e: + logger.error(f"Erreur analyse écran : {e}") + return None, None + + # ========================================================================= + # Matching d'état (CLIP + FAISS) + # ========================================================================= + + def _match_current_state(self, screen_image: Image.Image) -> tuple: + """ + Matcher le screenshot actuel contre les noeuds du workflow. + + Utilise l'embedding CLIP de l'image et cherche dans l'index FAISS + les prototypes de noeuds les plus similaires. + + Args: + screen_image: Image PIL du screenshot actuel + + Returns: + (node_id: str ou None, confidence: float) + """ + try: + # Générer l'embedding CLIP de l'écran actuel + embedding = self.clip_embedder.embed_image(screen_image) + + # Chercher dans FAISS les prototypes les plus proches + results = self.faiss_manager.search_similar( + query_vector=embedding, + k=3, + min_similarity=self.config.min_node_similarity, + ) + + if not results: + logger.debug("Aucun noeud matché au-dessus du seuil de similarité") + return None, 0.0 + + # Le meilleur résultat + best = results[0] + node_id = best.metadata.get("node_id") if best.metadata else best.embedding_id + confidence = best.similarity + + logger.debug( + f"Meilleur match : noeud={node_id}, " + f"similarité={confidence:.4f}" + ) + + return node_id, confidence + + except Exception as e: + logger.error(f"Erreur matching état : {e}") + return None, 0.0 + + # ========================================================================= + # Sélection d'edge + # ========================================================================= + + def _select_edge( + self, + edges: List[WorkflowEdge], + current_node: WorkflowNode, + ) -> Optional[WorkflowEdge]: + """ + Choisir l'edge à exécuter parmi les edges sortants. + + Stratégie : + 1. S'il n'y a qu'un seul edge → le prendre + 2. Sinon, prendre celui avec le meilleur taux de succès + 3. Vérifier les pre-conditions de chaque edge + + Args: + edges: Liste des edges sortants du noeud courant + current_node: Le noeud courant + + Returns: + L'edge sélectionné ou None + """ + if len(edges) == 1: + return edges[0] + + # Filtrer les edges dont les pre-conditions sont satisfaites + eligible = [] + for edge in edges: + can_exec, reason = edge.can_execute(current_node) + if can_exec: + eligible.append(edge) + else: + logger.debug(f"Edge {edge.edge_id} non éligible : {reason}") + + if not eligible: + logger.warning("Aucun edge éligible trouvé") + return None + + if len(eligible) == 1: + return eligible[0] + + # Trier par taux de succès décroissant + eligible.sort(key=lambda e: e.stats.success_rate, reverse=True) + + logger.info( + f"Sélection parmi {len(eligible)} edges — " + f"choisi : {eligible[0].edge_id} " + f"(succès: {eligible[0].stats.success_rate:.0%})" + ) + + return eligible[0] + + # ========================================================================= + # Résolution de cible + # ========================================================================= + + def _resolve_action_target( + self, + action: Action, + screen_state, + action_dict: Dict[str, Any], + ) -> Dict[str, Any]: + """ + Résoudre la cible de l'action en utilisant le ScreenState. + + Essaye la résolution sémantique (par texte, rôle, similarité visuelle). + En fallback, utilise les coordonnées brutes si disponibles. + + Args: + action: L'action du workflow + screen_state: Le ScreenState analysé + action_dict: Le dict d'action déjà construit + + Returns: + action_dict enrichi avec resolved_position + """ + target = action.target + resolved = dict(action_dict) + + # Stratégie 1 : Résolution par texte + if target.by_text and hasattr(screen_state, "ui_elements"): + for elem in screen_state.ui_elements: + elem_text = getattr(elem, "text", "") or "" + if target.by_text.lower() in elem_text.lower(): + bbox = getattr(elem, "bbox", None) + if bbox: + cx, cy = self._bbox_center(bbox) + resolved["resolved_position"] = {"x": cx, "y": cy} + resolved["resolution_method"] = "by_text" + logger.debug( + f"Cible résolue par texte '{target.by_text}' → ({cx}, {cy})" + ) + return resolved + + # Stratégie 2 : Résolution par rôle + if target.by_role and hasattr(screen_state, "ui_elements"): + candidates = [ + elem + for elem in screen_state.ui_elements + if getattr(elem, "role", None) == target.by_role + or getattr(elem, "type", None) == target.by_role + ] + if candidates: + # Appliquer la politique de sélection + elem = self._apply_selection_policy(candidates, target) + if elem: + bbox = getattr(elem, "bbox", None) + if bbox: + cx, cy = self._bbox_center(bbox) + resolved["resolved_position"] = {"x": cx, "y": cy} + resolved["resolution_method"] = "by_role" + logger.debug( + f"Cible résolue par rôle '{target.by_role}' → ({cx}, {cy})" + ) + return resolved + + # Stratégie 3 : Coordonnées brutes (fallback) + if target.by_position: + resolved["resolved_position"] = { + "x": target.by_position[0], + "y": target.by_position[1], + } + resolved["resolution_method"] = "by_position" + logger.debug( + f"Cible résolue par position brute → {target.by_position}" + ) + return resolved + + # Aucune résolution possible — on laisse le callback gérer + resolved["resolution_method"] = "unresolved" + logger.warning( + f"Cible non résolue pour action {action.type} — " + f"le callback devra se débrouiller" + ) + return resolved + + def _apply_selection_policy(self, candidates: list, target) -> Optional[Any]: + """ + Appliquer la politique de sélection sur les candidats. + + Args: + candidates: Liste d'éléments UI candidats + target: TargetSpec avec la politique + + Returns: + L'élément sélectionné ou None + """ + policy = getattr(target, "selection_policy", "first") + + if policy == "last": + return candidates[-1] if candidates else None + elif policy == "by_position": + # Trier par position (haut-gauche d'abord) + candidates.sort( + key=lambda e: ( + getattr(getattr(e, "bbox", None), "y", 0), + getattr(getattr(e, "bbox", None), "x", 0), + ) + ) + return candidates[0] if candidates else None + else: + # "first" par défaut + return candidates[0] if candidates else None + + # ========================================================================= + # Stabilisation de l'écran + # ========================================================================= + + def _wait_for_stabilization(self, timeout: float = 10.0) -> bool: + """ + Attendre que l'écran se stabilise. + + L'écran est considéré stable quand le hash de l'image reste identique + sur N captures consécutives. + + Args: + timeout: Délai maximum en secondes + + Returns: + True si l'écran s'est stabilisé, False si timeout + """ + start_time = time.time() + consecutive_same = 0 + last_hash = None + + while (time.time() - start_time) < timeout: + image = self._capture_screen() + if image is None: + time.sleep(self.config.stabilization_interval) + continue + + current_hash = self._compute_image_hash(image) + + if current_hash == last_hash: + consecutive_same += 1 + if consecutive_same >= self.config.stabilization_frames: + elapsed = time.time() - start_time + logger.debug( + f"Écran stabilisé après {elapsed:.1f}s " + f"({consecutive_same} frames identiques)" + ) + self._last_screen_hash = current_hash + return True + else: + consecutive_same = 1 + last_hash = current_hash + + time.sleep(self.config.stabilization_interval) + + logger.warning(f"Stabilisation échouée après {timeout}s") + return False + + # ========================================================================= + # Vérification de transition + # ========================================================================= + + def _verify_transition(self, expected_node_id: str) -> bool: + """ + Vérifier qu'on a bien atteint le noeud attendu après une action. + + Capture l'écran, génère un embedding CLIP, et vérifie la similarité + avec le prototype du noeud attendu. + + Args: + expected_node_id: ID du noeud qu'on devrait avoir atteint + + Returns: + True si le noeud est confirmé + """ + image = self._capture_screen() + if image is None: + return False + + matched_node_id, confidence = self._match_current_state(image) + + if matched_node_id == expected_node_id and confidence >= self.config.min_node_similarity: + logger.debug( + f"Transition confirmée vers {expected_node_id} " + f"(confiance: {confidence:.3f})" + ) + return True + + if confidence >= self.config.min_node_similarity: + logger.warning( + f"Transition : noeud matché = {matched_node_id} " + f"(attendu: {expected_node_id}, confiance: {confidence:.3f})" + ) + else: + logger.warning( + f"Transition non confirmée vers {expected_node_id} " + f"(meilleur match: {matched_node_id}, confiance: {confidence:.3f})" + ) + + return False + + # ========================================================================= + # Construction du dict d'action + # ========================================================================= + + def _build_action_dict( + self, + action: Action, + params: Dict[str, Any], + ) -> Dict[str, Any]: + """ + Construire le dictionnaire d'action à passer au callback. + + Effectue la substitution des paramètres {{variable}} dans les + valeurs de l'action. + + Args: + action: L'action du workflow + params: Paramètres de substitution + + Returns: + Dict prêt pour le callback + """ + action_dict = { + "type": action.type, + "target": action.target.to_dict(), + "parameters": self._substitute_params(action.parameters, params), + } + return action_dict + + def _substitute_params( + self, + data: Any, + params: Dict[str, Any], + ) -> Any: + """ + Remplacer les variables {{param}} dans les données de l'action. + + Supporte la substitution récursive dans les dicts et listes. + + Args: + data: Données à traiter (str, dict, list, ou autre) + params: Dictionnaire de paramètres + + Returns: + Données avec les variables substituées + """ + if isinstance(data, str): + # Remplacer {{variable}} par la valeur correspondante + def replacer(match): + var_name = match.group(1).strip() + return str(params.get(var_name, match.group(0))) + + return re.sub(r"\{\{(.+?)\}\}", replacer, data) + + elif isinstance(data, dict): + return {k: self._substitute_params(v, params) for k, v in data.items()} + + elif isinstance(data, list): + return [self._substitute_params(item, params) for item in data] + + return data + + # ========================================================================= + # Utilitaires + # ========================================================================= + + def _validate_workflow(self) -> Optional[str]: + """ + Valider le workflow avant exécution. + + Returns: + Message d'erreur si invalide, None si OK + """ + if not self.workflow.entry_nodes: + return "Le workflow n'a pas de noeud d'entrée (entry_nodes vide)" + + start_id = self.workflow.entry_nodes[0] + start_node = self.workflow.get_node(start_id) + if start_node is None: + return f"Le noeud d'entrée '{start_id}' n'existe pas dans le workflow" + + if not self.workflow.nodes: + return "Le workflow n'a aucun noeud" + + if not self.workflow.edges and not start_node.is_end: + return "Le workflow n'a aucun edge et le noeud d'entrée n'est pas un noeud END" + + return None + + @staticmethod + def _compute_image_hash(image: Image.Image) -> str: + """ + Calculer un hash rapide d'une image PIL pour détecter les changements. + + Sous-échantillonne l'image pour un hash rapide (même méthode que + ScreenCapturer._compute_hash). + + Args: + image: Image PIL + + Returns: + Hash MD5 de l'image sous-échantillonnée + """ + img_array = np.array(image) + # Sous-échantillonner comme ScreenCapturer + small = img_array[::20, ::20, :].tobytes() + return hashlib.md5(small).hexdigest() + + @staticmethod + def _bbox_center(bbox) -> tuple: + """ + Calculer le centre d'un bounding box. + + Supporte les formats (x, y, w, h) tuple ou objet avec attributs. + + Args: + bbox: Bounding box + + Returns: + (cx, cy) centre du bbox + """ + if hasattr(bbox, "to_tuple"): + x, y, w, h = bbox.to_tuple() + elif hasattr(bbox, "x") and hasattr(bbox, "width"): + x, y, w, h = bbox.x, bbox.y, bbox.width, bbox.height + elif isinstance(bbox, (list, tuple)) and len(bbox) >= 4: + x, y, w, h = bbox[0], bbox[1], bbox[2], bbox[3] + else: + logger.warning(f"Format bbox inconnu : {type(bbox)}") + return (0, 0) + + return (float(x + w / 2), float(y + h / 2)) diff --git a/core/workflow/semantic_matcher.py b/core/workflow/semantic_matcher.py index 1fe064869..452f86df6 100644 --- a/core/workflow/semantic_matcher.py +++ b/core/workflow/semantic_matcher.py @@ -4,18 +4,36 @@ Semantic Matcher - Matching sémantique des commandes en langage naturel Permet de : - Trouver le workflow correspondant à une commande en langage naturel - Extraire les paramètres de la commande -- Utiliser des embeddings pour le matching sémantique +- Matching multi-répertoires (workflows manuels + appris par streaming) +- Matching sémantique via LLM (Ollama) avec fallback Jaccard + +P0-6 : Unification des répertoires workflows +P0-7 : Matching sémantique via Ollama """ import re import logging -from typing import Dict, Any, List, Optional, Tuple +import time +import threading +from typing import Dict, Any, List, Optional, Tuple, Union from dataclasses import dataclass from pathlib import Path import json logger = logging.getLogger(__name__) +# Répertoires par défaut à scanner pour les workflows +DEFAULT_WORKFLOW_DIRS = [ + "data/workflows", # Workflows manuels / existants + "data/training/workflows", # Workflows appris par le StreamProcessor (défaut) + "data/training/live_sessions/workflows", # Workflows appris via api_stream (live_sessions data_dir) +] + +# Configuration Ollama par défaut +DEFAULT_OLLAMA_ENDPOINT = "http://localhost:11434" +DEFAULT_OLLAMA_MODEL = "qwen2.5:7b" +DEFAULT_LLM_TIMEOUT = 10 # secondes + @dataclass class WorkflowMatch: @@ -38,117 +56,261 @@ class WorkflowMetadata: keywords: List[str] param_patterns: List[str] # Patterns pour extraire les paramètres path: str + source_dir: str = "" # Répertoire source (pour debug/traçabilité) class SemanticMatcher: """ Matcher sémantique pour trouver des workflows depuis des commandes. - - Utilise plusieurs stratégies : - 1. Matching exact par nom/tags - 2. Matching par mots-clés - 3. Matching par embeddings (si disponible) + + Utilise plusieurs stratégies en cascade : + 1. Matching exact par nom/tags (rapide) + 2. Matching par mots-clés Jaccard (rapide) + 3. Matching sémantique via LLM Ollama (top-5 candidats, précis) 4. Extraction de paramètres - + + Supporte le scan multi-répertoires pour unifier : + - data/workflows/ (workflows manuels) + - data/training/workflows/ (workflows appris par streaming) + - data/training/live_sessions/workflows/ (workflows live_sessions) + + Auto-reload : détecte les nouveaux workflows via mtime (toutes les 60s). + Example: - >>> matcher = SemanticMatcher("data/workflows") + >>> matcher = SemanticMatcher() >>> result = matcher.find_workflow("facturer le client Acme") >>> print(result.workflow_name) # "Facturation Client" >>> print(result.extracted_params) # {"client": "Acme"} """ - + def __init__( self, - workflows_dir: str = "data/workflows", - use_embeddings: bool = True + workflows_dir: Union[str, List[str], None] = None, + use_embeddings: bool = True, + use_llm: bool = True, + llm_model: str = DEFAULT_OLLAMA_MODEL, + llm_endpoint: str = DEFAULT_OLLAMA_ENDPOINT, + llm_timeout: int = DEFAULT_LLM_TIMEOUT, + auto_reload_interval: int = 60, ): """ Initialiser le matcher. - + Args: - workflows_dir: Répertoire des workflows - use_embeddings: Utiliser les embeddings pour le matching + workflows_dir: Répertoire(s) des workflows. Si None, utilise DEFAULT_WORKFLOW_DIRS. + Peut être un str (un seul répertoire) ou une liste. + use_embeddings: Utiliser les embeddings pour le matching (compatibilité) + use_llm: Activer le matching sémantique via Ollama LLM + llm_model: Modèle Ollama à utiliser (défaut: qwen2.5:7b) + llm_endpoint: Endpoint Ollama (défaut: http://localhost:11434) + llm_timeout: Timeout pour les appels LLM en secondes + auto_reload_interval: Intervalle en secondes pour vérifier les nouveaux workflows (0 = désactivé) """ - self.workflows_dir = Path(workflows_dir) + # Gérer la rétro-compatibilité : un seul str → liste + if workflows_dir is None: + self._workflows_dirs = [Path(d) for d in DEFAULT_WORKFLOW_DIRS] + elif isinstance(workflows_dir, str): + self._workflows_dirs = [Path(workflows_dir)] + elif isinstance(workflows_dir, (list, tuple)): + self._workflows_dirs = [Path(d) for d in workflows_dir] + else: + self._workflows_dirs = [Path(workflows_dir)] + + # Garder l'attribut simple pour la rétro-compatibilité + self.workflows_dir = self._workflows_dirs[0] if self._workflows_dirs else Path("data/workflows") + self.use_embeddings = use_embeddings - + self.use_llm = use_llm + self.llm_model = llm_model + self.llm_endpoint = llm_endpoint + self.llm_timeout = llm_timeout + # Cache des métadonnées self._workflows: Dict[str, WorkflowMetadata] = {} - - # Embedder (chargé à la demande) + + # Embedder (compatibilité, non utilisé pour le matching LLM) self._embedder = None self._workflow_embeddings: Dict[str, Any] = {} - - # Charger les workflows + + # Auto-reload : timestamps des répertoires pour détecter les changements + self._dir_mtimes: Dict[str, float] = {} + self._auto_reload_interval = auto_reload_interval + self._last_reload_check = 0.0 + self._reload_lock = threading.Lock() + + # État LLM + self._llm_available: Optional[bool] = None # None = pas encore testé + + # Charger les workflows au démarrage self._load_workflows() - + + # ========================================================================= + # Chargement multi-répertoires + # ========================================================================= + def _load_workflows(self) -> None: - """Charger les métadonnées de tous les workflows.""" - if not self.workflows_dir.exists(): - logger.warning(f"Workflows directory not found: {self.workflows_dir}") - return - - for workflow_path in self.workflows_dir.glob("*.json"): + """Charger les métadonnées de tous les workflows depuis tous les répertoires.""" + self._workflows.clear() + total_loaded = 0 + + for workflows_dir in self._workflows_dirs: + if not workflows_dir.exists(): + logger.debug(f"Répertoire workflows absent (ignoré): {workflows_dir}") + continue + + count = self._load_workflows_from_dir(workflows_dir) + total_loaded += count + + # Mémoriser le mtime pour l'auto-reload + try: + self._dir_mtimes[str(workflows_dir)] = workflows_dir.stat().st_mtime + except OSError: + pass + + self._last_reload_check = time.time() + logger.info( + f"SemanticMatcher: {total_loaded} workflow(s) chargé(s) " + f"depuis {len(self._workflows_dirs)} répertoire(s)" + ) + + def _load_workflows_from_dir(self, workflows_dir: Path) -> int: + """ + Charger les workflows d'un répertoire spécifique. + + Returns: + Nombre de workflows chargés + """ + count = 0 + for workflow_path in workflows_dir.glob("*.json"): try: with open(workflow_path, 'r', encoding='utf-8') as f: data = json.load(f) - + workflow_id = workflow_path.stem - + + # Éviter les doublons (le premier répertoire a la priorité) + if workflow_id in self._workflows: + logger.debug( + f"Workflow {workflow_id} déjà chargé, " + f"ignoré depuis {workflows_dir}" + ) + continue + + # Extraire le nom — compatibilité entre les deux formats de workflow + name = data.get("name", workflow_id) + description = data.get("description", "") + + # Tags : supportent les deux formats + tags = data.get("tags", []) + # Format alternatif : tags dans metadata + if not tags and "metadata" in data: + tags = data["metadata"].get("tags", []) + # Extraire les métadonnées metadata = WorkflowMetadata( workflow_id=workflow_id, - name=data.get("name", workflow_id), - description=data.get("description", ""), - tags=data.get("tags", []), + name=name, + description=description, + tags=tags, keywords=self._extract_keywords(data), param_patterns=data.get("param_patterns", []), - path=str(workflow_path) + path=str(workflow_path), + source_dir=str(workflows_dir), ) - + self._workflows[workflow_id] = metadata - logger.debug(f"Loaded workflow: {metadata.name}") - + count += 1 + logger.debug(f"Workflow chargé: {metadata.name} [{workflows_dir.name}]") + except Exception as e: - logger.error(f"Error loading workflow {workflow_path}: {e}") - - logger.info(f"Loaded {len(self._workflows)} workflows") - + logger.error(f"Erreur chargement workflow {workflow_path}: {e}") + + if count: + logger.info(f" {count} workflow(s) depuis {workflows_dir}") + return count + + def _check_auto_reload(self) -> None: + """ + Vérifier si les répertoires ont changé et recharger si nécessaire. + + Appelé avant chaque recherche, vérifie le mtime des répertoires + au maximum toutes les `auto_reload_interval` secondes. + """ + if self._auto_reload_interval <= 0: + return + + now = time.time() + if now - self._last_reload_check < self._auto_reload_interval: + return + + with self._reload_lock: + # Double-check après acquisition du lock + if time.time() - self._last_reload_check < self._auto_reload_interval: + return + + needs_reload = False + + for workflows_dir in self._workflows_dirs: + if not workflows_dir.exists(): + # Répertoire créé entre-temps ? + if str(workflows_dir) not in self._dir_mtimes: + needs_reload = True + break + continue + + try: + current_mtime = workflows_dir.stat().st_mtime + prev_mtime = self._dir_mtimes.get(str(workflows_dir), 0) + if current_mtime > prev_mtime: + needs_reload = True + break + except OSError: + pass + + self._last_reload_check = time.time() + + if needs_reload: + logger.info("Changements détectés dans les répertoires workflows, rechargement...") + self._load_workflows() + def _extract_keywords(self, workflow_data: Dict[str, Any]) -> List[str]: """Extraire les mots-clés d'un workflow.""" keywords = set() - + # Nom name = workflow_data.get("name", "") keywords.update(self._tokenize(name)) - + # Description description = workflow_data.get("description", "") keywords.update(self._tokenize(description)) - + # Tags - keywords.update(workflow_data.get("tags", [])) - + tags = workflow_data.get("tags", []) + if not tags and "metadata" in workflow_data: + tags = workflow_data["metadata"].get("tags", []) + keywords.update(tags) + # Actions (noms des actions) for edge in workflow_data.get("edges", []): action = edge.get("action", {}) if isinstance(action, dict): action_type = action.get("type", "") keywords.add(action_type) - + return list(keywords) - + def _tokenize(self, text: str) -> List[str]: """Tokeniser un texte en mots-clés.""" # Normaliser text = text.lower() - + # Supprimer la ponctuation text = re.sub(r'[^\w\s]', ' ', text) - + # Découper en mots words = text.split() - + # Filtrer les mots courts et les stop words stop_words = { 'le', 'la', 'les', 'un', 'une', 'des', 'de', 'du', 'à', 'au', 'aux', @@ -156,13 +318,222 @@ class SemanticMatcher: 'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by', 'from', 'is', 'are', 'was', 'were', 'be', 'been' } - + return [w for w in words if len(w) > 2 and w not in stop_words] - + + # ========================================================================= + # Matching LLM (Ollama) + # ========================================================================= + + def _check_llm_availability(self) -> bool: + """Vérifier si Ollama est disponible avec le modèle configuré.""" + try: + import requests + resp = requests.get( + f"{self.llm_endpoint}/api/tags", + timeout=3 + ) + if resp.status_code == 200: + models = resp.json().get("models", []) + model_names = [m.get("name", "") for m in models] + # Vérifier que le modèle est disponible (avec ou sans tag) + available = any( + self.llm_model in name or name in self.llm_model + for name in model_names + ) + self._llm_available = available + if not available: + logger.warning( + f"Modèle {self.llm_model} non trouvé dans Ollama " + f"(disponibles: {model_names[:5]})" + ) + return available + except Exception as e: + logger.debug(f"Ollama indisponible: {e}") + self._llm_available = False + return False + + def _llm_semantic_rerank( + self, + command: str, + candidates: List[WorkflowMatch], + ) -> List[WorkflowMatch]: + """ + Re-classer les candidats via le LLM Ollama pour un matching sémantique. + + Envoie la commande utilisateur et la liste des workflows candidats au LLM, + lui demande de scorer la pertinence de chaque workflow (0-100). + + Args: + command: Commande en langage naturel de l'utilisateur + candidates: Liste de WorkflowMatch pré-filtrés par Jaccard (top-5) + + Returns: + Liste re-classée par score sémantique LLM + """ + if not candidates: + return candidates + + # Vérifier la disponibilité du LLM (cache le résultat) + if self._llm_available is None: + self._check_llm_availability() + + if not self._llm_available: + logger.debug("LLM indisponible, utilisation du score Jaccard seul") + return candidates + + # Construire la liste des workflows pour le prompt + workflows_desc = [] + for i, match in enumerate(candidates): + meta = self._workflows.get(match.workflow_id) + desc = meta.description if meta else "" + tags = ", ".join(meta.tags) if meta and meta.tags else "aucun" + workflows_desc.append( + f"{i+1}. ID: {match.workflow_id} | " + f"Nom: {match.workflow_name} | " + f"Description: {desc or 'aucune'} | " + f"Tags: {tags}" + ) + + workflows_list = "\n".join(workflows_desc) + + prompt = f"""Tu es un assistant RPA. L'utilisateur demande une action en langage naturel. +Tu dois évaluer la pertinence de chaque workflow candidat par rapport à sa demande. + +DEMANDE UTILISATEUR: "{command}" + +WORKFLOWS CANDIDATS: +{workflows_list} + +Pour chaque workflow, donne un score de pertinence de 0 à 100. +- 100 = correspondance parfaite (même action, même domaine) +- 70-99 = très pertinent (action similaire ou liée) +- 30-69 = moyennement pertinent +- 0-29 = pas pertinent + +Réponds UNIQUEMENT au format JSON, sans texte avant ni après: +{{"scores": [{{"id": "workflow_id", "score": 85, "raison": "explication courte"}}]}}""" + + try: + import requests + + resp = requests.post( + f"{self.llm_endpoint}/api/generate", + json={ + "model": self.llm_model, + "prompt": prompt, + "stream": False, + "options": { + "temperature": 0.1, # Déterministe + "num_predict": 500, # Réponse courte + }, + }, + timeout=self.llm_timeout, + ) + + if resp.status_code != 200: + logger.warning(f"Ollama erreur HTTP {resp.status_code}") + return candidates + + response_text = resp.json().get("response", "") + + # Parser la réponse JSON du LLM + scores = self._parse_llm_scores(response_text, candidates) + + if scores: + # Appliquer les scores LLM aux candidats + reranked = [] + for match in candidates: + llm_score = scores.get(match.workflow_id, 0) / 100.0 + # Score final : pondération 40% Jaccard + 60% LLM + combined_score = 0.4 * match.confidence + 0.6 * llm_score + reranked.append(WorkflowMatch( + workflow_id=match.workflow_id, + workflow_name=match.workflow_name, + workflow_path=match.workflow_path, + confidence=min(combined_score, 1.0), + extracted_params=match.extracted_params, + match_reason=match.match_reason + f" | llm_score:{int(llm_score*100)}", + )) + + # Re-trier par score combiné + reranked.sort(key=lambda m: m.confidence, reverse=True) + logger.info( + f"LLM reranking: {len(reranked)} candidats re-classés " + f"(meilleur: {reranked[0].workflow_name} @ {reranked[0].confidence:.2f})" + ) + return reranked + + except Exception as e: + logger.warning(f"Erreur matching LLM (fallback Jaccard): {e}") + # Marquer comme indisponible temporairement + self._llm_available = None + + return candidates + + def _parse_llm_scores( + self, + response_text: str, + candidates: List[WorkflowMatch], + ) -> Dict[str, float]: + """ + Parser les scores retournés par le LLM. + + Gère les différents formats possibles de réponse JSON. + + Returns: + Dict workflow_id → score (0-100) + """ + scores = {} + + try: + # Essayer de trouver le JSON dans la réponse + # Le LLM peut ajouter du texte avant/après le JSON + json_match = re.search(r'\{[\s\S]*"scores"[\s\S]*\}', response_text) + if not json_match: + logger.debug(f"Pas de JSON trouvé dans la réponse LLM: {response_text[:200]}") + return scores + + data = json.loads(json_match.group()) + score_list = data.get("scores", []) + + # Mapper les scores aux workflow_ids + candidate_ids = {m.workflow_id for m in candidates} + + for entry in score_list: + wf_id = entry.get("id", "") + score = entry.get("score", 0) + + # Normaliser le score + if isinstance(score, (int, float)): + score = max(0, min(100, score)) + else: + continue + + # Vérifier que l'ID correspond à un candidat + if wf_id in candidate_ids: + scores[wf_id] = score + else: + # Le LLM a peut-être utilisé un index au lieu de l'ID + # Essayer de résoudre par position + try: + idx = int(wf_id) - 1 + if 0 <= idx < len(candidates): + scores[candidates[idx].workflow_id] = score + except (ValueError, IndexError): + pass + + except json.JSONDecodeError as e: + logger.debug(f"Erreur parsing JSON LLM: {e}") + except Exception as e: + logger.debug(f"Erreur parsing scores LLM: {e}") + + return scores + # ========================================================================= # Matching # ========================================================================= - + def find_workflow( self, command: str, @@ -170,17 +541,17 @@ class SemanticMatcher: ) -> Optional[WorkflowMatch]: """ Trouver le workflow correspondant à une commande. - + Args: command: Commande en langage naturel min_confidence: Confiance minimale requise - + Returns: WorkflowMatch ou None si aucun match """ matches = self.find_workflows(command, limit=1, min_confidence=min_confidence) return matches[0] if matches else None - + def find_workflows( self, command: str, @@ -189,45 +560,63 @@ class SemanticMatcher: ) -> List[WorkflowMatch]: """ Trouver les workflows correspondant à une commande. - + + Stratégie en 2 phases : + 1. Pré-filtrage rapide par Jaccard/tags/nom → top-5 candidats + 2. Re-ranking sémantique via LLM Ollama (si activé et disponible) + Args: command: Commande en langage naturel limit: Nombre max de résultats min_confidence: Confiance minimale requise - + Returns: Liste de WorkflowMatch triés par confiance """ + # Auto-reload si des changements sont détectés + self._check_auto_reload() + if not self._workflows: - logger.warning("No workflows loaded") + logger.warning("Aucun workflow chargé") return [] - + command_lower = command.lower() command_tokens = set(self._tokenize(command)) - - matches = [] - + + # Phase 1 : Pré-filtrage Jaccard (rapide) + jaccard_matches = [] + for workflow_id, metadata in self._workflows.items(): - # Calculer le score de matching + # Calculer le score de matching Jaccard score, reason, params = self._calculate_match_score( command_lower, command_tokens, metadata ) - - if score >= min_confidence: - matches.append(WorkflowMatch( + + if score > 0: # Garder tous les candidats avec un score > 0 + jaccard_matches.append(WorkflowMatch( workflow_id=workflow_id, workflow_name=metadata.name, workflow_path=metadata.path, confidence=score, extracted_params=params, - match_reason=reason + match_reason=reason, )) - + # Trier par confiance décroissante - matches.sort(key=lambda m: m.confidence, reverse=True) - - return matches[:limit] - + jaccard_matches.sort(key=lambda m: m.confidence, reverse=True) + + # Prendre les top-5 pour le re-ranking LLM + top_candidates = jaccard_matches[:5] + + # Phase 2 : Re-ranking sémantique via LLM (si activé) + if self.use_llm and top_candidates: + top_candidates = self._llm_semantic_rerank(command, top_candidates) + + # Filtrer par confiance minimale + filtered = [m for m in top_candidates if m.confidence >= min_confidence] + + return filtered[:limit] + def _calculate_match_score( self, command: str, @@ -236,25 +625,25 @@ class SemanticMatcher: ) -> Tuple[float, str, Dict[str, str]]: """ Calculer le score de matching entre une commande et un workflow. - + Returns: (score, reason, extracted_params) """ score = 0.0 reasons = [] params = {} - + # 1. Matching exact du nom if metadata.name.lower() in command: score += 0.5 reasons.append("exact_name") - + # 2. Matching des tags for tag in metadata.tags: if tag.lower() in command: score += 0.3 reasons.append(f"tag:{tag}") - + # 3. Matching des mots-clés (Jaccard similarity) workflow_tokens = set(metadata.keywords) if workflow_tokens and command_tokens: @@ -264,7 +653,7 @@ class SemanticMatcher: score += jaccard * 0.4 if intersection: reasons.append(f"keywords:{','.join(intersection)}") - + # 4. Matching de la description if metadata.description: desc_tokens = set(self._tokenize(metadata.description)) @@ -273,18 +662,18 @@ class SemanticMatcher: if intersection: score += 0.2 reasons.append("description_match") - + # 5. Extraction des paramètres params = self._extract_params(command, metadata) if params: score += 0.1 reasons.append(f"params:{','.join(params.keys())}") - + # Normaliser le score (max 1.0) score = min(score, 1.0) - + return score, " | ".join(reasons), params - + def _extract_params( self, command: str, @@ -292,11 +681,11 @@ class SemanticMatcher: ) -> Dict[str, str]: """ Extraire les paramètres d'une commande. - + Utilise les patterns définis dans le workflow et des heuristiques. """ params = {} - + # 1. Utiliser les patterns définis for pattern in metadata.param_patterns: try: @@ -304,8 +693,8 @@ class SemanticMatcher: if match: params.update(match.groupdict()) except Exception as e: - logger.warning(f"Invalid pattern '{pattern}': {e}") - + logger.warning(f"Pattern invalide '{pattern}': {e}") + # 2. Heuristiques communes # Pattern: "de X à Y" ou "from X to Y" range_pattern = r'(?:de|from)\s+(\w+)\s+(?:à|to)\s+(\w+)' @@ -313,38 +702,46 @@ class SemanticMatcher: if match: params['start'] = match.group(1) params['end'] = match.group(2) - + # Pattern: "client X" ou "customer X" client_pattern = r'(?:client|customer|compte)\s+([A-Za-z0-9_\-]+)' match = re.search(client_pattern, command, re.IGNORECASE) if match: params['client'] = match.group(1) - + # Pattern: "facture N" ou "invoice N" invoice_pattern = r'(?:facture|invoice|commande|order)\s+([A-Za-z0-9_\-]+)' match = re.search(invoice_pattern, command, re.IGNORECASE) if match: params['invoice'] = match.group(1) - + # Pattern: valeurs entre guillemets quoted_pattern = r'"([^"]+)"' quoted_values = re.findall(quoted_pattern, command) for i, value in enumerate(quoted_values): if f'value{i}' not in params: params[f'value{i}'] = value - + return params - + # ========================================================================= # Gestion des workflows # ========================================================================= - - def reload_workflows(self) -> None: - """Recharger tous les workflows.""" - self._workflows.clear() - self._workflow_embeddings.clear() - self._load_workflows() - + + def reload_workflows(self) -> int: + """ + Recharger tous les workflows depuis tous les répertoires. + + Returns: + Nombre total de workflows chargés + """ + with self._reload_lock: + self._workflows.clear() + self._workflow_embeddings.clear() + self._llm_available = None # Re-tester la dispo LLM + self._load_workflows() + return len(self._workflows) + def add_workflow( self, workflow_id: str, @@ -356,7 +753,7 @@ class SemanticMatcher: ) -> None: """ Ajouter un workflow au matcher. - + Args: workflow_id: ID unique du workflow name: Nom du workflow @@ -372,77 +769,101 @@ class SemanticMatcher: tags=tags or [], keywords=self._tokenize(name) + self._tokenize(description) + (tags or []), param_patterns=param_patterns or [], - path=path + path=path, + source_dir="dynamic", ) - + self._workflows[workflow_id] = metadata - logger.info(f"Added workflow: {name}") - + logger.info(f"Workflow ajouté: {name}") + def get_all_workflows(self) -> List[WorkflowMetadata]: """Obtenir tous les workflows.""" return list(self._workflows.values()) - + def get_workflow(self, workflow_id: str) -> Optional[WorkflowMetadata]: """Obtenir un workflow par ID.""" return self._workflows.get(workflow_id) - + + def get_directories(self) -> List[Dict[str, Any]]: + """ + Obtenir la liste des répertoires scannés et leur contenu. + + Returns: + Liste de dicts avec path, exists, workflow_count + """ + dirs_info = [] + for d in self._workflows_dirs: + count = sum( + 1 for wf in self._workflows.values() + if wf.source_dir == str(d) + ) + dirs_info.append({ + "path": str(d), + "exists": d.exists(), + "workflow_count": count, + }) + return dirs_info + # ========================================================================= # Suggestions # ========================================================================= - + def suggest_commands(self, partial_command: str, limit: int = 5) -> List[str]: """ Suggérer des commandes basées sur une entrée partielle. - + Args: partial_command: Début de commande limit: Nombre max de suggestions - + Returns: Liste de suggestions """ suggestions = [] partial_lower = partial_command.lower() - + for metadata in self._workflows.values(): # Suggérer basé sur le nom if metadata.name.lower().startswith(partial_lower): suggestions.append(metadata.name) - + # Suggérer basé sur les tags for tag in metadata.tags: if tag.lower().startswith(partial_lower): suggestions.append(f"{tag} ({metadata.name})") - + return suggestions[:limit] - + def get_workflow_help(self, workflow_id: str) -> str: """ Obtenir l'aide pour un workflow. - + Args: workflow_id: ID du workflow - + Returns: Texte d'aide """ metadata = self._workflows.get(workflow_id) if not metadata: - return f"Workflow '{workflow_id}' not found" - - help_text = f"📋 {metadata.name}\n" - + return f"Workflow '{workflow_id}' non trouvé" + + help_text = f"Workflow: {metadata.name}\n" + if metadata.description: help_text += f"\n{metadata.description}\n" - + if metadata.tags: - help_text += f"\n🏷️ Tags: {', '.join(metadata.tags)}\n" - + help_text += f"\nTags: {', '.join(metadata.tags)}\n" + if metadata.param_patterns: - help_text += f"\n📝 Paramètres supportés:\n" + help_text += f"\nParametres supportes:\n" for pattern in metadata.param_patterns: help_text += f" - {pattern}\n" - + + if metadata.source_dir: + help_text += f"\nSource: {metadata.source_dir}\n" + return help_text @@ -450,14 +871,18 @@ class SemanticMatcher: # Fonctions utilitaires # ============================================================================= -def create_semantic_matcher(workflows_dir: str = "data/workflows") -> SemanticMatcher: +def create_semantic_matcher( + workflows_dir: Union[str, List[str], None] = None, + use_llm: bool = True, +) -> SemanticMatcher: """ Créer un matcher sémantique. - + Args: - workflows_dir: Répertoire des workflows - + workflows_dir: Répertoire(s) des workflows. None = tous les répertoires par défaut. + use_llm: Activer le matching LLM via Ollama + Returns: SemanticMatcher configuré """ - return SemanticMatcher(workflows_dir=workflows_dir) + return SemanticMatcher(workflows_dir=workflows_dir, use_llm=use_llm)