#!/usr/bin/env python3
"""
WorkflowMatcher - Compare the actions of the current session against known
workflows in order to detect matches and suggest auto-completion.
"""

import math
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import numpy as np


@dataclass
class WorkflowMatch:
    """A match between the current session and one known workflow."""

    # Identifier and display name copied from the workflow definition.
    workflow_id: str
    workflow_name: str
    # Score returned by WorkflowMatcher.calculate_match_score (0-1).
    confidence: float
    # Number of leading workflow steps matched consecutively by the session.
    matched_steps: int
    # Total number of steps in the workflow.
    total_steps: int
    # Workflow steps located after the matched prefix.
    remaining_steps: List[Dict[str, Any]]
    # Index of the next step to execute (equal to matched_steps).
    current_step_index: int

    @property
    def completion_percentage(self) -> float:
        """Return the share of the workflow already matched, in percent."""
        if self.total_steps == 0:
            return 0.0
        return (self.matched_steps / self.total_steps) * 100


class WorkflowMatcher:
    """
    Workflow matching manager.

    Compares the current session's actions with known workflows, step by
    step and position by position (action i is compared with step i).
    """

    _DEFAULT_VISUAL_WEIGHT = 0.3
    _DEFAULT_STEP_MATCH_THRESHOLD = 0.7

    # Class-level fallbacks so that instances built without __init__
    # (e.g. via __new__) still see sensible values.
    visual_similarity_weight = _DEFAULT_VISUAL_WEIGHT
    step_match_threshold = _DEFAULT_STEP_MATCH_THRESHOLD

    def __init__(self, logger, config: Dict[str, Any], faiss_index=None):
        """
        Initialise the workflow matcher.

        Args:
            logger: Object exposing ``log_action(dict)``; may be falsy to
                disable logging.
            config: Global configuration. The optional ``"workflow"``
                section may define ``position_tolerance`` (pixels, default
                50), ``min_confidence`` (default 0.80),
                ``visual_similarity_weight`` (default 0.3) and
                ``step_match_threshold`` (default 0.7).
            faiss_index: Optional FAISS index; when provided, visual
                (embedding) similarity takes part in the step score.
        """
        self.logger = logger
        self.config = config
        self.faiss_index = faiss_index

        # Tolerate a missing config or a missing/None "workflow" section.
        workflow_cfg = (config or {}).get("workflow") or {}

        self.position_tolerance = workflow_cfg.get("position_tolerance", 50)
        self.min_confidence = workflow_cfg.get("min_confidence", 0.80)
        # Weight of the visual similarity in the step score; only used when
        # a FAISS index and both embeddings are available.
        self.visual_similarity_weight = workflow_cfg.get(
            "visual_similarity_weight", self._DEFAULT_VISUAL_WEIGHT
        )
        # Minimum step similarity for a step to count as matched.
        self.step_match_threshold = workflow_cfg.get(
            "step_match_threshold", self._DEFAULT_STEP_MATCH_THRESHOLD
        )

        if self.logger:
            self.logger.log_action({
                "action": "workflow_matcher_initialized",
                "position_tolerance": self.position_tolerance,
                "min_confidence": self.min_confidence,
                "faiss_enabled": faiss_index is not None
            })

    def match_current_session(
        self,
        session_actions: List[Dict[str, Any]],
        workflows: List[Dict[str, Any]]
    ) -> List[WorkflowMatch]:
        """
        Compare the current session with every known workflow.

        Args:
            session_actions: Actions of the current session.
            workflows: Known workflows; each one is a dict read through the
                keys ``"steps"``, ``"workflow_id"`` and ``"name"``.

        Returns:
            Matches with a strictly positive score, sorted by decreasing
            confidence. Empty when either input is empty.
        """
        if not session_actions or not workflows:
            return []

        matches = []
        for workflow in workflows:
            steps = workflow.get("steps", []) or []
            match_score = self.calculate_match_score(session_actions, steps)
            if match_score <= 0:
                continue

            matched_steps = self._count_matched_steps(session_actions, steps)
            matches.append(
                WorkflowMatch(
                    workflow_id=workflow.get("workflow_id", ""),
                    workflow_name=workflow.get("name", "Workflow inconnu"),
                    confidence=match_score,
                    matched_steps=matched_steps,
                    total_steps=len(steps),
                    remaining_steps=steps[matched_steps:],
                    current_step_index=matched_steps
                )
            )

        # Best candidates first.
        matches.sort(key=lambda m: m.confidence, reverse=True)

        if matches and self.logger:
            self.logger.log_action({
                "action": "workflows_matched",
                "num_matches": len(matches),
                "best_confidence": matches[0].confidence
            })

        return matches

    def calculate_match_score(
        self,
        actions: List[Dict[str, Any]],
        workflow_steps: List[Dict[str, Any]]
    ) -> float:
        """
        Compute the match score between actions and a workflow.

        The score is the average step similarity over the compared prefix
        (action type, position with tolerance, window and, when available,
        visual similarity) plus a small bonus that grows with the length of
        the compared prefix.

        Args:
            actions: Actions to compare.
            workflow_steps: Steps of the workflow.

        Returns:
            Match score in [0, 1]; 0.0 when either list is empty or when no
            compared step has any similarity.
        """
        if not actions or not workflow_steps:
            return 0.0

        # Only the first N actions are compared with the first N steps.
        compare_length = min(len(actions), len(workflow_steps))

        total_score = sum(
            self._calculate_step_similarity(action, step)
            for action, step in zip(actions, workflow_steps)
        )
        avg_score = total_score / compare_length

        # Without any similarity there is nothing to reward: returning the
        # bare bonus would report unrelated workflows as low-confidence
        # matches.
        if avg_score <= 0.0:
            return 0.0

        # Longer compared sequences are more reliable: +0.02 per step,
        # capped at +0.1.
        sequence_bonus = min(0.1, compare_length * 0.02)
        return min(1.0, avg_score + sequence_bonus)

    def _normalize_action_type(self, action_type: str) -> str:
        """
        Normalise an action type for matching.

        Args:
            action_type: Raw action type.

        Returns:
            The canonical type for known variants, otherwise the input
            unchanged.
        """
        type_mapping = {
            "mouse_click": "click",
            "mouse_move": "move",
            "key_press": "type",
            "keyboard": "type",
        }
        return type_mapping.get(action_type, action_type)

    def _step_weights(self, has_visual: bool) -> Dict[str, float]:
        """Return the per-criterion weights used for one step comparison."""
        if not has_visual:
            return {"action_type": 0.4, "position": 0.3, "window": 0.3}

        default = self._DEFAULT_VISUAL_WEIGHT
        try:
            visual = float(self.visual_similarity_weight)
        except (TypeError, ValueError):
            visual = default
        if not math.isfinite(visual):
            visual = default
        visual = min(1.0, max(0.0, visual))

        # The 0.3 / 0.2 / 0.2 split is the one used with the default visual
        # weight; it is rescaled so that all weights keep summing to 1.
        # With the default weight the scale is exactly 1.0.
        scale = (1.0 - visual) / (1.0 - default)
        return {
            "action_type": 0.3 * scale,
            "position": 0.2 * scale,
            "window": 0.2 * scale,
            "visual": visual,
        }

    def _calculate_step_similarity(
        self,
        action: Dict[str, Any],
        step: Dict[str, Any]
    ) -> float:
        """
        Compute the similarity between one action and one workflow step.

        Args:
            action: Action to compare.
            step: Workflow step.

        Returns:
            Similarity score in [0, 1].
        """
        # Visual similarity only takes part when a FAISS index is configured
        # and both sides carry an embedding.
        has_visual = (
            self.faiss_index is not None
            and action.get("embedding") is not None
            and step.get("embedding") is not None
        )
        weights = self._step_weights(has_visual)

        score = 0.0

        # 1. Action type (after normalisation of known variants).
        action_type = self._normalize_action_type(action.get("action_type", ""))
        step_type = self._normalize_action_type(step.get("action_type", ""))
        if action_type == step_type:
            score += weights["action_type"]

        # 2. Position similarity; a missing position defaults to [0, 0].
        action_pos = action.get("position", [0, 0])
        step_pos = step.get("position", [0, 0])
        if action_pos and step_pos:
            score += weights["position"] * self._calculate_position_similarity(
                action_pos, step_pos
            )

        # 3. Window: full weight on equality, half weight when one title
        #    contains the other.
        action_window = action.get("window", "")
        step_window = step.get("window", "")
        if action_window and step_window:
            if action_window == step_window:
                score += weights["window"]
            elif action_window in step_window or step_window in action_window:
                score += weights["window"] * 0.5

        # 4. Visual similarity (when available).
        if has_visual:
            score += weights["visual"] * self._calculate_visual_similarity(
                action.get("embedding"), step.get("embedding")
            )

        return score

    def _calculate_visual_similarity(
        self,
        embedding1: np.ndarray,
        embedding2: np.ndarray
    ) -> float:
        """
        Compute the cosine similarity between two embeddings.

        Embeddings are flattened first, so shapes such as ``(d,)`` and
        ``(1, d)`` are both accepted.

        Args:
            embedding1: First embedding (array or array-like).
            embedding2: Second embedding (array or array-like).

        Returns:
            Similarity clamped to [0, 1]. 0.0 when an embedding is None,
            when sizes differ, when the data is not numeric, or when the
            result is not finite (e.g. NaN values in an embedding).
        """
        if embedding1 is None or embedding2 is None:
            return 0.0

        try:
            vec1 = np.asarray(embedding1).ravel()
            vec2 = np.asarray(embedding2).ravel()
            if vec1.size == 0 or vec1.size != vec2.size:
                raise ValueError(
                    f"incompatible embedding sizes: {vec1.size} vs {vec2.size}"
                )

            # The epsilon avoids a division by zero for all-zero embeddings.
            vec1 = vec1 / (np.linalg.norm(vec1) + 1e-8)
            vec2 = vec2 / (np.linalg.norm(vec2) + 1e-8)
            similarity = float(np.dot(vec1, vec2))
        except (TypeError, ValueError, ArithmeticError) as e:
            if self.logger:
                self.logger.log_action({
                    "action": "visual_similarity_error",
                    "error": str(e)
                })
            return 0.0

        # min()/max() would turn NaN into 1.0, so reject it explicitly.
        if not math.isfinite(similarity):
            return 0.0

        return max(0.0, min(1.0, similarity))

    def _calculate_position_similarity(
        self,
        pos1: List[int],
        pos2: List[int]
    ) -> float:
        """
        Compute the similarity between two positions, with tolerance.

        Args:
            pos1: Position 1 as [x, y].
            pos2: Position 2 as [x, y].

        Returns:
            1.0 when the Euclidean distance is within the tolerance, then a
            linear decrease down to 0.0 at twice the tolerance. 0.0 when a
            position is empty, has fewer than two coordinates or holds
            non-numeric coordinates.
        """
        if not pos1 or not pos2 or len(pos1) < 2 or len(pos2) < 2:
            return 0.0

        try:
            distance = math.hypot(pos1[0] - pos2[0], pos1[1] - pos2[1])
        except (TypeError, ValueError):
            # Non-numeric coordinates cannot be compared.
            return 0.0

        tolerance = self.position_tolerance
        if distance <= tolerance:
            return 1.0
        if distance >= tolerance * 2:
            return 0.0
        # Linear decay between 1x and 2x the tolerance.
        return 1.0 - ((distance - tolerance) / tolerance)

    def _count_matched_steps(
        self,
        actions: List[Dict[str, Any]],
        workflow_steps: List[Dict[str, Any]]
    ) -> int:
        """
        Count the steps matched consecutively from the start.

        Args:
            actions: List of actions.
            workflow_steps: Steps of the workflow.

        Returns:
            Length of the leading run of steps whose similarity is at least
            ``step_match_threshold``.
        """
        matched = 0
        for action, step in zip(actions, workflow_steps):
            similarity = self._calculate_step_similarity(action, step)
            # Stop at the first non-match: only a consecutive prefix counts.
            if similarity < self.step_match_threshold:
                break
            matched += 1
        return matched

    def find_best_match(
        self,
        matches: List[WorkflowMatch]
    ) -> Optional[WorkflowMatch]:
        """
        Find the best match in a list.

        Args:
            matches: Candidate matches, in any order.

        Returns:
            The match with the highest confidence when that confidence is
            at least ``min_confidence``, otherwise None.
        """
        if not matches:
            return None

        # max() keeps the first of equal candidates, i.e. matches[0] for a
        # list already sorted by decreasing confidence.
        best_match = max(matches, key=lambda m: m.confidence)

        if best_match.confidence < self.min_confidence:
            return None

        if self.logger:
            self.logger.log_action({
                "action": "best_match_found",
                "workflow_id": best_match.workflow_id,
                "workflow_name": best_match.workflow_name,
                "confidence": best_match.confidence,
                "matched_steps": best_match.matched_steps,
                "total_steps": best_match.total_steps
            })
        return best_match

    def get_match_details(self, match: WorkflowMatch) -> Dict[str, Any]:
        """
        Return the details of a match for display.

        Args:
            match: Match to describe.

        Returns:
            Dictionary with the match fields, its completion percentage and
            a preview of the next three remaining steps.
        """
        return {
            "workflow_id": match.workflow_id,
            "workflow_name": match.workflow_name,
            "confidence": match.confidence,
            "matched_steps": match.matched_steps,
            "total_steps": match.total_steps,
            "remaining_steps": match.remaining_steps,
            "completion_percentage": match.completion_percentage,
            "next_steps_preview": match.remaining_steps[:3]  # next 3 steps
        }


def _run_basic_tests() -> None:
    """Run a few manual smoke tests and print their results."""
    print("Test du WorkflowMatcher")
    print("=" * 50)

    # Mock logger
    class MockLogger:
        def log_action(self, data):
            print(f"[LOG] {data}")

    logger = MockLogger()
    config = {
        "workflow": {
            "position_tolerance": 50,
            "min_confidence": 0.80
        }
    }

    matcher = WorkflowMatcher(logger, config)

    # Test 1: perfect match
    print("\n1. Test match parfait:")
    session_actions = [
        {
            "action_type": "click",
            "position": [100, 100],
            "window": "Calculatrice"
        },
        {
            "action_type": "type",
            "position": [0, 0],
            "window": "Calculatrice"
        }
    ]

    workflow = {
        "workflow_id": "calc_001",
        "name": "Calcul simple",
        "steps": [
            {
                "action_type": "click",
                "position": [100, 100],
                "window": "Calculatrice"
            },
            {
                "action_type": "type",
                "position": [0, 0],
                "window": "Calculatrice"
            },
            {
                "action_type": "click",
                "position": [200, 200],
                "window": "Calculatrice"
            }
        ]
    }

    matches = matcher.match_current_session(session_actions, [workflow])
    print(f" Nombre de matches: {len(matches)}")
    if matches:
        print(f" Meilleur match: {matches[0].workflow_name}")
        print(f" Confiance: {matches[0].confidence:.2%}")
        print(f" Étapes matchées: {matches[0].matched_steps}/{matches[0].total_steps}")

    # Test 2: match with position tolerance
    print("\n2. Test avec tolérance de position:")
    session_actions[0]["position"] = [120, 110]  # slightly shifted
    matches = matcher.match_current_session(session_actions, [workflow])
    if matches:
        print(f" Confiance avec décalage: {matches[0].confidence:.2%}")

    # Test 3: find the best match
    print("\n3. Test find_best_match:")
    best = matcher.find_best_match(matches)
    if best:
        print(f" Meilleur match trouvé: {best.workflow_name}")
        print(f" Confiance: {best.confidence:.2%}")
        details = matcher.get_match_details(best)
        print(f" Prochaines étapes: {len(details['next_steps_preview'])}")
    else:
        print(" Aucun match au-dessus du seuil")

    print("\n✓ Tests terminés!")


if __name__ == "__main__":
    _run_basic_tests()