""" HierarchicalMatcher - Système de matching multi-niveau Ce module implémente un système de matching hiérarchique qui combine: - Niveau fenêtre: titre, processus, classe de fenêtre - Niveau région: régions UI détectées - Niveau élément: éléments UI individuels Formule de confiance: 0.2*fenêtre + 0.3*région + 0.5*élément Boost temporel: +0.1 si successeur valide du node précédent """ import logging import re from typing import List, Dict, Optional, Tuple, Any from dataclasses import dataclass, field from datetime import datetime import numpy as np logger = logging.getLogger(__name__) # ============================================================================= # Dataclasses # ============================================================================= @dataclass class TemporalContext: """Contexte temporel pour le matching""" previous_nodes: List[str] = field(default_factory=list) # N derniers nodes matchés previous_confidences: List[float] = field(default_factory=list) time_since_last_match: float = 0.0 # Temps depuis dernier match (secondes) max_history: int = 5 # Nombre max de nodes à garder def add_match(self, node_id: str, confidence: float) -> None: """Ajouter un match à l'historique""" self.previous_nodes.append(node_id) self.previous_confidences.append(confidence) # Limiter la taille de l'historique if len(self.previous_nodes) > self.max_history: self.previous_nodes = self.previous_nodes[-self.max_history:] self.previous_confidences = self.previous_confidences[-self.max_history:] @property def last_node(self) -> Optional[str]: """Dernier node matché""" return self.previous_nodes[-1] if self.previous_nodes else None @dataclass class AlternativeMatch: """Match alternatif avec score""" node_id: str confidence: float window_confidence: float region_confidence: float element_confidence: float @dataclass class MatchResult: """Résultat complet d'un match hiérarchique""" node_id: str confidence: float # Confiance globale (0-1) window_confidence: float # Confiance niveau fenêtre region_confidence: float # Confiance niveau région element_confidence: float # Confiance niveau élément temporal_boost: float = 0.0 # Bonus temporel appliqué matched_variant: Optional[str] = None # ID de variante matchée alternatives: List[AlternativeMatch] = field(default_factory=list) match_time_ms: float = 0.0 # Temps de matching @property def raw_confidence(self) -> float: """Confiance avant boost temporel""" return self.confidence - self.temporal_boost def to_dict(self) -> Dict[str, Any]: """Sérialiser en dictionnaire""" return { "node_id": self.node_id, "confidence": self.confidence, "window_confidence": self.window_confidence, "region_confidence": self.region_confidence, "element_confidence": self.element_confidence, "temporal_boost": self.temporal_boost, "matched_variant": self.matched_variant, "alternatives_count": len(self.alternatives), "match_time_ms": self.match_time_ms } @dataclass class HierarchicalMatcherConfig: """Configuration du matcher hiérarchique""" # Poids pour la combinaison de confiance window_weight: float = 0.2 region_weight: float = 0.3 element_weight: float = 0.5 # Boost temporel temporal_boost: float = 0.1 max_confidence: float = 1.0 # Seuils min_confidence_threshold: float = 0.5 window_title_similarity_threshold: float = 0.8 region_iou_threshold: float = 0.5 element_similarity_threshold: float = 0.7 # Matching de fenêtre use_regex_title_matching: bool = True case_sensitive_title: bool = False # ============================================================================= # Matcher Hiérarchique # ============================================================================= class HierarchicalMatcher: """ Système de matching multi-niveau pour reconnaissance d'états d'écran. Combine trois niveaux de matching: 1. Fenêtre: titre, processus, classe 2. Région: zones UI détectées 3. Élément: éléments UI individuels Example: >>> matcher = HierarchicalMatcher() >>> result = matcher.match(screenshot, workflow, temporal_context) >>> if result.confidence > 0.8: ... print(f"Matched node: {result.node_id}") """ def __init__(self, config: Optional[HierarchicalMatcherConfig] = None): """ Initialiser le matcher. Args: config: Configuration du matcher (utilise défaut si None) """ self.config = config or HierarchicalMatcherConfig() logger.info( f"HierarchicalMatcher initialisé: " f"weights=({self.config.window_weight}, {self.config.region_weight}, {self.config.element_weight})" ) def match( self, screenshot: Any, workflow: Any, window_info: Optional[Dict] = None, detected_elements: Optional[List] = None, temporal_context: Optional[TemporalContext] = None ) -> MatchResult: """ Effectuer un match hiérarchique contre tous les nodes du workflow. Args: screenshot: Image du screenshot (PIL Image ou numpy array) workflow: Workflow avec nodes à matcher window_info: Informations de fenêtre (titre, processus, etc.) detected_elements: Éléments UI détectés temporal_context: Contexte temporel pour boost Returns: MatchResult avec le meilleur match et alternatives """ import time start_time = time.time() nodes = getattr(workflow, 'nodes', []) if not nodes: logger.warning("Workflow sans nodes") return MatchResult( node_id="", confidence=0.0, window_confidence=0.0, region_confidence=0.0, element_confidence=0.0 ) # Calculer scores pour chaque node node_scores = [] for node in nodes: score = self._compute_node_score( node, screenshot, window_info, detected_elements ) node_scores.append((node, score)) # Trier par confiance décroissante node_scores.sort(key=lambda x: x[1]['combined'], reverse=True) # Meilleur match best_node, best_scores = node_scores[0] # Appliquer boost temporel si applicable temporal_boost = 0.0 if temporal_context and temporal_context.last_node: if self._is_valid_successor( temporal_context.last_node, best_node.node_id, workflow ): temporal_boost = self.config.temporal_boost # Calculer confiance finale (plafonnée à 1.0) final_confidence = min( best_scores['combined'] + temporal_boost, self.config.max_confidence ) # Construire alternatives alternatives = [] for node, scores in node_scores[1:4]: # Top 3 alternatives alternatives.append(AlternativeMatch( node_id=node.node_id, confidence=scores['combined'], window_confidence=scores['window'], region_confidence=scores['region'], element_confidence=scores['element'] )) match_time = (time.time() - start_time) * 1000 result = MatchResult( node_id=best_node.node_id, confidence=final_confidence, window_confidence=best_scores['window'], region_confidence=best_scores['region'], element_confidence=best_scores['element'], temporal_boost=temporal_boost, alternatives=alternatives, match_time_ms=match_time ) logger.debug( f"Match: {result.node_id} (conf={result.confidence:.3f}, " f"w={result.window_confidence:.3f}, r={result.region_confidence:.3f}, " f"e={result.element_confidence:.3f}, boost={temporal_boost:.2f})" ) return result def _compute_node_score( self, node: Any, screenshot: Any, window_info: Optional[Dict], detected_elements: Optional[List] ) -> Dict[str, float]: """ Calculer les scores de matching pour un node. Returns: Dict avec scores 'window', 'region', 'element', 'combined' """ # Score niveau fenêtre window_score = self.match_window_level(window_info, node) # Score niveau région region_score = self.match_region_level(screenshot, node) # Score niveau élément element_score = self.match_element_level(detected_elements, node) # Combinaison pondérée combined = ( self.config.window_weight * window_score + self.config.region_weight * region_score + self.config.element_weight * element_score ) return { 'window': window_score, 'region': region_score, 'element': element_score, 'combined': combined } def match_window_level( self, window_info: Optional[Dict], node: Any ) -> float: """ Matcher au niveau fenêtre. Compare: - Titre de fenêtre (pattern regex ou similarité) - Nom du processus - Classe de fenêtre Args: window_info: Dict avec 'title', 'process_name', 'window_class' node: WorkflowNode avec template Returns: Score de confiance 0.0-1.0 """ if not window_info: return 0.5 # Score neutre si pas d'info template = getattr(node, 'template', None) if not template: return 0.5 scores = [] # Matching du titre current_title = window_info.get('title', '') template_pattern = getattr(template.window, 'title_pattern', None) if getattr(template, 'window', None) else None if template_pattern and current_title: if self.config.use_regex_title_matching: try: flags = 0 if self.config.case_sensitive_title else re.IGNORECASE if re.search(template_pattern, current_title, flags): scores.append(1.0) else: # Fallback sur similarité de chaîne scores.append(self._string_similarity(template_pattern, current_title)) except re.error: scores.append(self._string_similarity(template_pattern, current_title)) else: scores.append(self._string_similarity(template_pattern, current_title)) # Matching du processus current_process = window_info.get('process_name', '') template_process = getattr(template.window, 'process_name', None) if getattr(template, 'window', None) else None if template_process and current_process: if current_process.lower() == template_process.lower(): scores.append(1.0) else: scores.append(0.0) # Matching de la classe de fenêtre current_class = window_info.get('window_class', '') template_class = getattr(template, 'window_class', None) if template_class and current_class: if current_class == template_class: scores.append(1.0) else: scores.append(0.0) return np.mean(scores) if scores else 0.5 def match_region_level( self, screenshot: Any, node: Any ) -> float: """ Matcher au niveau région. Compare les régions UI détectées avec les régions template. Utilise IoU (Intersection over Union) et similarité d'embedding. Args: screenshot: Image du screenshot node: WorkflowNode avec template Returns: Score de confiance 0.0-1.0 """ template = getattr(node, 'template', None) if not template: return 0.5 # Récupérer embedding prototype du template prototype = getattr(template.embedding, 'vector_id', None) if getattr(template, 'embedding', None) else None if prototype is None: return 0.5 # Calculer embedding du screenshot actuel try: from core.embedding.state_embedding_builder import StateEmbeddingBuilder builder = StateEmbeddingBuilder() # Créer un ScreenState minimal pour le builder from core.models.screen_state import ScreenState, WindowContext, RawLevel, PerceptionLevel, ContextLevel, EmbeddingRef temp_state = ScreenState( screen_state_id="temp_match", timestamp=datetime.now(), session_id="temp", window=WindowContext( app_name="unknown", window_title="Unknown", screen_resolution=[1920, 1080], workspace="main" ), raw=RawLevel(screenshot_path="", capture_method="memory", file_size_bytes=0), perception=PerceptionLevel( embedding=EmbeddingRef(provider="temp", vector_id="", dimensions=512), detected_text=[], text_detection_method="none", confidence_avg=0.0 ), context=ContextLevel( current_workflow_candidate=None, workflow_step=0, user_id="temp", tags=[], business_variables={} ), metadata={"screenshot_data": screenshot} ) state_embedding = builder.build(temp_state) current_vector = state_embedding.get_vector() # Calculer similarité cosinus prototype_array = np.array(prototype) similarity = self._cosine_similarity(current_vector, prototype_array) return float(similarity) except Exception as e: logger.warning(f"Erreur matching région: {e}") return 0.5 def match_element_level( self, detected_elements: Optional[List], node: Any ) -> float: """ Matcher au niveau élément. Compare les éléments UI détectés avec les éléments template. Utilise rôle, texte et similarité visuelle. Args: detected_elements: Liste d'éléments UI détectés node: WorkflowNode avec template Returns: Score de confiance 0.0-1.0 """ if not detected_elements: return 0.5 template = getattr(node, 'template', None) if not template: return 0.5 required_elements = getattr(template, 'required_ui_elements', []) if not required_elements: return 0.5 # Compter les éléments requis trouvés found_count = 0 for required in required_elements: req_role = required.get('role', '') req_text = required.get('text', '') for detected in detected_elements: det_role = getattr(detected, 'role', '') or detected.get('role', '') det_text = getattr(detected, 'text', '') or detected.get('text', '') # Matching par rôle role_match = req_role.lower() == det_role.lower() if req_role and det_role else True # Matching par texte (similarité) if req_text and det_text: text_match = self._string_similarity(req_text, det_text) > self.config.element_similarity_threshold else: text_match = True if role_match and text_match: found_count += 1 break return found_count / len(required_elements) if required_elements else 0.5 def _is_valid_successor( self, from_node_id: str, to_node_id: str, workflow: Any ) -> bool: """ Vérifier si to_node est un successeur valide de from_node. Args: from_node_id: ID du node source to_node_id: ID du node destination workflow: Workflow avec edges Returns: True si transition valide """ edges = getattr(workflow, 'edges', []) for edge in edges: if edge.from_node == from_node_id and edge.to_node == to_node_id: return True return False def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float: """Calculer similarité cosinus entre deux vecteurs.""" norm_a = np.linalg.norm(a) norm_b = np.linalg.norm(b) if norm_a == 0 or norm_b == 0: return 0.0 return float(np.dot(a, b) / (norm_a * norm_b)) def _string_similarity(self, s1: str, s2: str) -> float: """ Calculer similarité entre deux chaînes. Utilise la distance de Levenshtein normalisée. """ if not s1 or not s2: return 0.0 if not self.config.case_sensitive_title: s1 = s1.lower() s2 = s2.lower() # Distance de Levenshtein simplifiée if s1 == s2: return 1.0 len1, len2 = len(s1), len(s2) if len1 == 0 or len2 == 0: return 0.0 # Matrice de distance matrix = [[0] * (len2 + 1) for _ in range(len1 + 1)] for i in range(len1 + 1): matrix[i][0] = i for j in range(len2 + 1): matrix[0][j] = j for i in range(1, len1 + 1): for j in range(1, len2 + 1): cost = 0 if s1[i-1] == s2[j-1] else 1 matrix[i][j] = min( matrix[i-1][j] + 1, # Suppression matrix[i][j-1] + 1, # Insertion matrix[i-1][j-1] + cost # Substitution ) distance = matrix[len1][len2] max_len = max(len1, len2) return 1.0 - (distance / max_len) def get_config(self) -> HierarchicalMatcherConfig: """Récupérer la configuration actuelle.""" return self.config def set_config(self, config: HierarchicalMatcherConfig) -> None: """Mettre à jour la configuration.""" self.config = config logger.info("Configuration du matcher mise à jour") # ============================================================================= # Fonctions utilitaires # ============================================================================= def create_matcher( window_weight: float = 0.2, region_weight: float = 0.3, element_weight: float = 0.5, temporal_boost: float = 0.1 ) -> HierarchicalMatcher: """ Créer un matcher avec configuration personnalisée. Args: window_weight: Poids du niveau fenêtre region_weight: Poids du niveau région element_weight: Poids du niveau élément temporal_boost: Boost pour successeurs valides Returns: HierarchicalMatcher configuré """ config = HierarchicalMatcherConfig( window_weight=window_weight, region_weight=region_weight, element_weight=element_weight, temporal_boost=temporal_boost ) return HierarchicalMatcher(config)