"""NodeMatcher - real-time matching of ScreenStates against WorkflowNodes."""

import json
import logging
import shutil
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Tuple, Dict, Any

import numpy as np

from core.models.screen_state import ScreenState
from core.models.workflow_graph import WorkflowNode
from core.embedding.state_embedding_builder import StateEmbeddingBuilder
from core.embedding.faiss_manager import FAISSManager
from core.execution.error_handler import ErrorHandler, ErrorType, RecoveryStrategy

logger = logging.getLogger(__name__)


class NodeMatcher:
    """Find the WorkflowNode that corresponds to a given ScreenState.

    Matching goes through the FAISS index when a ``faiss_manager`` is
    configured, and otherwise through a linear scan that asks every candidate
    node whether it matches. When the linear scan fails, the failure is
    reported to the error handler and a diagnostic bundle is written under
    ``failed_matches_dir``.
    """

    # Number of nearest neighbours requested from the FAISS index.
    FAISS_TOP_K = 5
    # Below this confidence a failed match is reported as a brand-new state.
    LOW_SIMILARITY_CUTOFF = 0.70
    # Two candidates closer than this are reported as ambiguous.
    AMBIGUITY_MARGIN = 0.05
    # Margin subtracted from the best confidence in the threshold suggestion.
    THRESHOLD_SUGGESTION_MARGIN = 0.02

    def __init__(
        self,
        embedding_builder: Optional[StateEmbeddingBuilder] = None,
        faiss_manager: Optional[FAISSManager] = None,
        error_handler: Optional[ErrorHandler] = None,
        similarity_threshold: float = 0.85,
        failed_matches_dir: str = "data/failed_matches"
    ) -> None:
        """Configure the matcher and create the failed-match directory.

        Args:
            embedding_builder: Builder used to embed screen states; a default
                ``StateEmbeddingBuilder`` is created when omitted.
            faiss_manager: Optional index; when set, ``match`` uses it instead
                of the linear scan.
            error_handler: Handler notified of matching failures; a default
                ``ErrorHandler`` is created when omitted.
            similarity_threshold: Minimum confidence for a match to count.
            failed_matches_dir: Directory receiving failed-match reports.
        """
        self.embedding_builder = embedding_builder or StateEmbeddingBuilder()
        self.faiss_manager = faiss_manager
        self.error_handler = error_handler or ErrorHandler()
        self.similarity_threshold = similarity_threshold
        self.failed_matches_dir = Path(failed_matches_dir)
        self.failed_matches_dir.mkdir(parents=True, exist_ok=True)

        logger.info(f"NodeMatcher initialized with threshold={similarity_threshold}")

    def match(
        self,
        current_state: ScreenState,
        candidate_nodes: List[WorkflowNode]
    ) -> Optional[Tuple[WorkflowNode, float]]:
        """Find the WorkflowNode that best matches the current ScreenState.

        Args:
            current_state: Screen state to identify.
            candidate_nodes: Nodes the state may correspond to.

        Returns:
            Tuple ``(node, confidence)`` when a match is found, ``None``
            otherwise (including when ``candidate_nodes`` is empty).
        """
        if not candidate_nodes:
            logger.warning("No candidate nodes provided")
            return None

        state_embedding = self.embedding_builder.build(current_state)
        current_vector = state_embedding.get_vector()

        if self.faiss_manager:
            return self._match_with_faiss(current_vector, candidate_nodes)

        return self._match_linear(current_state, current_vector, candidate_nodes)

    def _match_with_faiss(
        self,
        query_vector: np.ndarray,
        candidate_nodes: List[WorkflowNode]
    ) -> Optional[Tuple[WorkflowNode, float]]:
        """Match through the FAISS index, restricted to ``candidate_nodes``.

        Only index hits at or above ``similarity_threshold`` whose metadata
        ``node_id`` belongs to a candidate node are considered; the hit with
        the highest similarity wins.

        NOTE(review): unlike the linear path, a miss here is neither reported
        to the error handler nor written to the failed-match directory.
        """
        results = self.faiss_manager.search(query_vector, k=self.FAISS_TOP_K)
        if not results:
            return None

        # Index candidates by id once instead of rescanning the list for every
        # hit. setdefault keeps the first node when ids repeat, which is the
        # node the nested scan would have selected.
        nodes_by_id: Dict[Any, WorkflowNode] = {}
        for node in candidate_nodes:
            nodes_by_id.setdefault(node.node_id, node)

        best_match = None
        best_confidence = 0.0
        for result in results:
            similarity = result['similarity']
            if similarity < self.similarity_threshold:
                continue
            node = nodes_by_id.get(result['metadata'].get('node_id'))
            if node is not None and similarity > best_confidence:
                best_match = node
                best_confidence = similarity

        # Explicit None test: a node type defining __len__/__bool__ must not
        # be able to turn a successful match into a miss.
        if best_match is not None:
            logger.info(f"Matched node {best_match.node_id} with confidence {best_confidence:.3f}")
            return (best_match, best_confidence)

        return None

    def _match_linear(
        self,
        current_state: ScreenState,
        current_vector: np.ndarray,
        candidate_nodes: List[WorkflowNode]
    ) -> Optional[Tuple[WorkflowNode, float]]:
        """Match by asking every candidate node, keeping the most confident.

        On failure the error handler is notified and a diagnostic bundle is
        written; the method then returns ``None``.

        NOTE(review): ``best_confidence`` only tracks nodes whose ``matches``
        call returned a true flag, so the value reported on failure can be 0.0
        even when some node came close to the threshold.
        """
        best_match = None
        best_confidence = 0.0

        for node in candidate_nodes:
            matches, confidence = node.matches(current_state, current_vector)
            if matches and confidence > best_confidence:
                best_match = node
                best_confidence = confidence

        if best_match is not None and best_confidence >= self.similarity_threshold:
            logger.info(f"Matched node {best_match.node_id} with confidence {best_confidence:.3f}")
            return (best_match, best_confidence)

        # Matching failed - hand over to the ErrorHandler.
        recovery = self.error_handler.handle_matching_failure(
            current_state,
            candidate_nodes,
            best_confidence,
            self.similarity_threshold
        )

        logger.warning(
            f"No match found (best confidence: {best_confidence:.3f}, "
            f"threshold: {self.similarity_threshold})"
        )
        logger.info(f"Recovery strategy: {recovery.strategy_used.value} - {recovery.message}")

        # Also keep the local diagnostic bundle for compatibility. Writing it
        # is best-effort: an I/O or serialization problem must not turn a
        # plain "no match" into an exception for the caller.
        try:
            self._log_failed_match(
                current_state, current_vector, candidate_nodes, best_confidence
            )
        except Exception:
            logger.exception("Failed to write failed-match diagnostics")

        return None

    def validate_constraints(
        self,
        state: ScreenState,
        node: WorkflowNode
    ) -> bool:
        """Validate the node's constraints against the state.

        Returns ``False`` only when the node's template declares a window
        title pattern and the state carries no window title.

        NOTE(review): the pattern itself is never applied to the title; only
        the presence of a title is checked. Confirm whether that is intended.
        """
        template = node.screen_template

        if template.window_title_pattern:
            if not state.raw_level or not state.raw_level.window_title:
                return False

        return True

    def _create_failed_match_dir(self, timestamp: str) -> Tuple[str, Path]:
        """Create a fresh directory for one failed match.

        The historical name ``failed_match_<timestamp>`` is used when it is
        free; a numeric suffix is appended otherwise, so two failures within
        the same second no longer overwrite each other's files.

        Returns:
            Tuple ``(failed_match_id, directory_path)``.
        """
        base_id = f"failed_match_{timestamp}"
        failed_match_id = base_id
        suffix = 0
        while True:
            failed_match_dir = self.failed_matches_dir / failed_match_id
            try:
                failed_match_dir.mkdir(parents=True, exist_ok=False)
            except FileExistsError:
                suffix += 1
                failed_match_id = f"{base_id}_{suffix}"
            else:
                return failed_match_id, failed_match_dir

    def _compute_candidate_similarities(
        self,
        state_vector: np.ndarray,
        candidate_nodes: List[WorkflowNode]
    ) -> List[Dict[str, Any]]:
        """Score the state vector against each candidate's stored prototype.

        Nodes without a prototype path are skipped, and nodes whose prototype
        cannot be loaded or compared are logged and skipped. The result is
        sorted by decreasing similarity.
        """
        similarities = []
        for node in candidate_nodes:
            prototype_path = node.screen_template.embedding_prototype_path
            if not prototype_path:
                continue
            try:
                prototype = np.load(prototype_path)
                # Raw dot product; presumably both vectors are L2-normalised
                # so that this is a cosine similarity - TODO confirm.
                similarity = float(np.dot(state_vector, prototype))
                similarities.append({
                    'node_id': node.node_id,
                    'node_label': node.label,
                    'similarity': similarity,
                    'threshold': self.similarity_threshold,
                    'matched': similarity >= self.similarity_threshold
                })
            except Exception as e:
                # Diagnostics only: one unreadable prototype must not hide
                # the scores of the remaining candidates.
                logger.error(f"Failed to load prototype for node {node.node_id}: {e}")

        similarities.sort(key=lambda x: x['similarity'], reverse=True)
        return similarities

    def _log_failed_match(
        self,
        state: ScreenState,
        state_vector: np.ndarray,
        candidate_nodes: List[WorkflowNode],
        best_confidence: float
    ) -> None:
        """Write a diagnostic bundle describing a failed match.

        Saved artefacts:
        - a copy of the screenshot of the unmatched state (when available)
        - the embedding vector (``state_embedding.npy``)
        - similarities against every candidate node that has a prototype
        - suggestions to update a node, create one, or adjust the threshold

        Args:
            state: The screen state that could not be matched.
            state_vector: Embedding vector of ``state``.
            candidate_nodes: Nodes that were considered.
            best_confidence: Best confidence observed during matching.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        failed_match_id, failed_match_dir = self._create_failed_match_dir(timestamp)

        raw_level = state.raw_level
        screenshot_path = raw_level.screenshot_path if raw_level else None

        # Save the screenshot (best-effort).
        if screenshot_path:
            screenshot_dest = failed_match_dir / "screenshot.png"
            try:
                shutil.copy(screenshot_path, screenshot_dest)
                logger.debug(f"Screenshot saved to {screenshot_dest}")
            except Exception as e:
                logger.error(f"Failed to copy screenshot: {e}")

        # Save the embedding vector.
        vector_path = failed_match_dir / "state_embedding.npy"
        np.save(vector_path, state_vector)

        # Similarities with every node, sorted by decreasing similarity.
        similarities = self._compute_candidate_similarities(state_vector, candidate_nodes)

        suggestions = self._generate_suggestions(similarities, best_confidence)

        report = {
            'timestamp': timestamp,
            'failed_match_id': failed_match_id,
            'state': {
                'window_title': raw_level.window_title if raw_level else None,
                # A missing path is stored as null, not as the string "None".
                'screenshot_path': (
                    str(screenshot_path) if screenshot_path is not None else None
                ),
                'ui_elements_count': (
                    len(state.perception_level.ui_elements)
                    if state.perception_level else 0
                )
            },
            'matching_results': {
                # The confidence may be a NumPy scalar, which the json module
                # cannot serialize; store a plain float.
                'best_confidence': float(best_confidence),
                'threshold': self.similarity_threshold,
                'num_candidates': len(candidate_nodes),
                'similarities': similarities
            },
            'suggestions': suggestions
        }

        report_path = failed_match_dir / "report.json"
        with open(report_path, 'w', encoding="utf-8") as f:
            json.dump(report, f, indent=2)

        logger.info(f"Failed match logged to {failed_match_dir}")
        logger.info(f"Suggestions: {', '.join(suggestions)}")

    def _generate_suggestions(
        self,
        similarities: List[Dict[str, Any]],
        best_confidence: float
    ) -> List[str]:
        """Build action suggestions from the candidate similarities.

        Args:
            similarities: Per-node similarity records, sorted by decreasing
                similarity (as produced while logging a failed match).
            best_confidence: Best confidence observed during matching.

        Returns:
            Human-readable suggestions; a single "create node" entry when
            ``similarities`` is empty.
        """
        suggestions = []

        if not similarities:
            suggestions.append("CREATE_NEW_NODE: Aucun node candidat, créer un nouveau node")
            return suggestions

        best_match = similarities[0]

        if best_confidence < self.LOW_SIMILARITY_CUTOFF:
            suggestions.append(
                f"CREATE_NEW_NODE: Similarité très faible ({best_confidence:.3f}), "
                "probablement un nouvel état"
            )
        elif best_confidence < self.similarity_threshold:
            suggestions.append(
                f"UPDATE_NODE: Similarité proche ({best_confidence:.3f}) avec node "
                f"'{best_match['node_label']}', considérer mise à jour du prototype"
            )
            suggestions.append(
                f"ADJUST_THRESHOLD: Ou réduire le seuil de {self.similarity_threshold} "
                f"à {best_confidence - self.THRESHOLD_SUGGESTION_MARGIN:.3f}"
            )

        # Flag the case where the two best nodes are nearly indistinguishable.
        if len(similarities) >= 2:
            diff = similarities[0]['similarity'] - similarities[1]['similarity']
            if diff < self.AMBIGUITY_MARGIN:
                suggestions.append(
                    f"AMBIGUOUS_MATCH: Deux nodes très similaires "
                    f"({similarities[0]['node_label']}: {similarities[0]['similarity']:.3f}, "
                    f"{similarities[1]['node_label']}: {similarities[1]['similarity']:.3f}), "
                    "affiner les prototypes"
                )

        return suggestions

    def detect_ui_change(
        self,
        current_state: ScreenState,
        expected_node: WorkflowNode,
        current_similarity: float
    ) -> Tuple[bool, Optional[Any]]:
        """Detect whether the UI changed significantly.

        Delegates to the error handler's ``detect_ui_change``.

        Args:
            current_state: Current screen state.
            expected_node: Node that was expected.
            current_similarity: Current similarity with the prototype.

        Returns:
            Tuple ``(ui_changed, recovery_result)``.
        """
        return self.error_handler.detect_ui_change(
            current_state,
            expected_node,
            current_similarity
        )

    def get_error_statistics(self) -> Dict[str, Any]:
        """Return the error statistics reported by the error handler.

        Returns:
            Dict of error statistics.
        """
        return self.error_handler.get_error_statistics()


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    matcher = NodeMatcher()
    logger.info(f"NodeMatcher initialized: {matcher}")