""" VariantManager - Gestion des variantes d'écran et états UI Ce module gère: - Détection et groupement de variantes d'écran - Matching avec variantes multiples - Détection d'états UI (enabled, disabled, etc.) - Détection d'overlays (modals, popups, etc.) """ import logging from typing import List, Dict, Optional, Any, Tuple from dataclasses import dataclass, field from datetime import datetime from enum import Enum from pathlib import Path import numpy as np logger = logging.getLogger(__name__) # ============================================================================= # Enums et Dataclasses # ============================================================================= class UIState(Enum): """États possibles d'un élément UI""" ENABLED = "enabled" DISABLED = "disabled" CHECKED = "checked" UNCHECKED = "unchecked" LOADING = "loading" ERROR = "error" FOCUSED = "focused" SELECTED = "selected" HOVER = "hover" @dataclass class NodeVariant: """Variante d'un node de workflow""" variant_id: str node_id: str embedding: np.ndarray similarity_to_primary: float observation_count: int = 1 created_at: datetime = field(default_factory=datetime.now) metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: return { "variant_id": self.variant_id, "node_id": self.node_id, "similarity_to_primary": self.similarity_to_primary, "observation_count": self.observation_count, "created_at": self.created_at.isoformat(), "metadata": self.metadata } @dataclass class VariantCluster: """Cluster de variantes similaires""" cluster_id: str prototype: np.ndarray member_count: int similarity_to_primary: float variants: List[NodeVariant] = field(default_factory=list) @dataclass class VariantMatchResult: """Résultat de matching avec variantes""" node_id: str variant_id: Optional[str] similarity: float is_primary: bool all_matches: List[Tuple[str, float]] = field(default_factory=list) @dataclass class OverlayInfo: """Information sur un overlay détecté""" overlay_type: str # "modal", "popup", "tooltip", "dropdown", "notification" bounds: Tuple[int, int, int, int] # (x, y, width, height) blocking: bool # True si bloque l'interaction confidence: float detected_at: datetime = field(default_factory=datetime.now) def to_dict(self) -> Dict[str, Any]: return { "overlay_type": self.overlay_type, "bounds": self.bounds, "blocking": self.blocking, "confidence": self.confidence, "detected_at": self.detected_at.isoformat() } @dataclass class VariantManagerConfig: """Configuration du gestionnaire de variantes""" # Seuils de similarité similarity_threshold: float = 0.7 # Seuil pour créer variante variant_match_threshold: float = 0.8 # Seuil pour matcher variante # Clustering max_variants_per_node: int = 5 min_observations_for_variant: int = 2 # Détection d'états disabled_opacity_threshold: float = 0.5 loading_animation_threshold: float = 0.3 # Détection d'overlays overlay_area_threshold: float = 0.1 # % de l'écran minimum overlay_contrast_threshold: float = 0.3 # Stockage variants_dir: str = "data/variants" # ============================================================================= # Gestionnaire de Variantes # ============================================================================= class VariantManager: """ Gestionnaire des variantes d'écran et états UI. Fonctionnalités: - Détection et groupement de variantes similaires - Matching contre toutes les variantes d'un node - Détection d'états UI (enabled, disabled, etc.) - Détection d'overlays (modals, popups) Example: >>> manager = VariantManager() >>> variants = manager.detect_variants(embeddings) >>> result = manager.match_with_variants(query, "node_001") """ def __init__(self, config: Optional[VariantManagerConfig] = None): """ Initialiser le gestionnaire. Args: config: Configuration (utilise défaut si None) """ self.config = config or VariantManagerConfig() # Stockage des variantes par node self._variants: Dict[str, List[NodeVariant]] = {} # Prototypes primaires par node self._primary_prototypes: Dict[str, np.ndarray] = {} # Créer répertoire Path(self.config.variants_dir).mkdir(parents=True, exist_ok=True) logger.info(f"VariantManager initialisé (threshold={self.config.similarity_threshold})") def detect_variants( self, embeddings: List[np.ndarray], similarity_threshold: Optional[float] = None ) -> List[VariantCluster]: """ Détecter et grouper les variantes dans un ensemble d'embeddings. Args: embeddings: Liste d'embeddings à analyser similarity_threshold: Seuil de similarité (défaut: config) Returns: Liste de VariantCluster """ if not embeddings: return [] threshold = similarity_threshold or self.config.similarity_threshold # Clustering simple basé sur similarité clusters = [] assigned = set() for i, emb_i in enumerate(embeddings): if i in assigned: continue # Créer nouveau cluster cluster_members = [i] assigned.add(i) # Trouver membres similaires for j, emb_j in enumerate(embeddings): if j in assigned: continue similarity = self._cosine_similarity(emb_i, emb_j) if similarity >= threshold: cluster_members.append(j) assigned.add(j) # Calculer prototype du cluster cluster_embeddings = [embeddings[k] for k in cluster_members] prototype = np.mean(cluster_embeddings, axis=0) prototype = prototype / np.linalg.norm(prototype) cluster = VariantCluster( cluster_id=f"cluster_{len(clusters):03d}", prototype=prototype, member_count=len(cluster_members), similarity_to_primary=1.0 if len(clusters) == 0 else self._cosine_similarity( prototype, clusters[0].prototype if clusters else prototype ) ) clusters.append(cluster) logger.info(f"Détecté {len(clusters)} clusters de variantes") return clusters def add_variant( self, node_id: str, embedding: np.ndarray, metadata: Optional[Dict] = None ) -> Optional[str]: """ Ajouter une variante pour un node. Args: node_id: ID du node embedding: Embedding de la variante metadata: Métadonnées optionnelles Returns: ID de la variante créée ou None si trop similaire """ # Normaliser embedding norm = np.linalg.norm(embedding) if norm > 0: embedding = embedding / norm # Vérifier si prototype primaire existe if node_id not in self._primary_prototypes: self._primary_prototypes[node_id] = embedding self._variants[node_id] = [] logger.info(f"Prototype primaire créé pour node {node_id}") return None # Pas de variante, c'est le primaire # Calculer similarité avec primaire primary = self._primary_prototypes[node_id] similarity = self._cosine_similarity(embedding, primary) # Si trop similaire au primaire, pas de variante if similarity >= self.config.similarity_threshold: logger.debug(f"Embedding trop similaire au primaire ({similarity:.3f})") return None # Vérifier similarité avec variantes existantes for variant in self._variants.get(node_id, []): var_similarity = self._cosine_similarity(embedding, variant.embedding) if var_similarity >= self.config.similarity_threshold: # Mettre à jour variante existante variant.observation_count += 1 logger.debug(f"Variante existante mise à jour: {variant.variant_id}") return variant.variant_id # Vérifier limite de variantes if len(self._variants.get(node_id, [])) >= self.config.max_variants_per_node: logger.warning(f"Limite de variantes atteinte pour node {node_id}") return None # Créer nouvelle variante variant_id = f"{node_id}_var_{len(self._variants.get(node_id, [])) + 1:03d}" variant = NodeVariant( variant_id=variant_id, node_id=node_id, embedding=embedding, similarity_to_primary=similarity, metadata=metadata or {} ) if node_id not in self._variants: self._variants[node_id] = [] self._variants[node_id].append(variant) # Sauvegarder self._save_variant(variant) logger.info(f"Variante {variant_id} créée (similarité={similarity:.3f})") return variant_id def match_with_variants( self, query_embedding: np.ndarray, node_id: str ) -> VariantMatchResult: """ Matcher un embedding contre toutes les variantes d'un node. Args: query_embedding: Embedding à matcher node_id: ID du node Returns: VariantMatchResult avec le meilleur match """ # Normaliser query norm = np.linalg.norm(query_embedding) if norm > 0: query_embedding = query_embedding / norm all_matches = [] # Matcher contre prototype primaire if node_id in self._primary_prototypes: primary = self._primary_prototypes[node_id] primary_similarity = self._cosine_similarity(query_embedding, primary) all_matches.append(("primary", primary_similarity)) else: primary_similarity = 0.0 # Matcher contre variantes for variant in self._variants.get(node_id, []): similarity = self._cosine_similarity(query_embedding, variant.embedding) all_matches.append((variant.variant_id, similarity)) # Trier par similarité décroissante all_matches.sort(key=lambda x: x[1], reverse=True) if not all_matches: return VariantMatchResult( node_id=node_id, variant_id=None, similarity=0.0, is_primary=True, all_matches=[] ) best_match = all_matches[0] is_primary = best_match[0] == "primary" return VariantMatchResult( node_id=node_id, variant_id=None if is_primary else best_match[0], similarity=best_match[1], is_primary=is_primary, all_matches=all_matches ) def detect_ui_states( self, elements: List[Any] ) -> Dict[str, UIState]: """ Détecter les états UI des éléments. Analyse les caractéristiques visuelles pour déterminer: - enabled/disabled - checked/unchecked - loading - error - focused Args: elements: Liste d'éléments UI détectés Returns: Dict {element_id: UIState} """ states = {} for element in elements: element_id = getattr(element, 'element_id', None) or element.get('id', str(id(element))) # Analyser caractéristiques state = self._analyze_element_state(element) states[element_id] = state return states def _analyze_element_state(self, element: Any) -> UIState: """Analyser l'état d'un élément UI.""" # Récupérer attributs opacity = getattr(element, 'opacity', None) or element.get('opacity', 1.0) color = getattr(element, 'color', None) or element.get('color', None) border = getattr(element, 'border', None) or element.get('border', None) is_checked = getattr(element, 'checked', None) or element.get('checked', None) has_focus = getattr(element, 'focused', None) or element.get('focused', False) is_loading = getattr(element, 'loading', None) or element.get('loading', False) has_error = getattr(element, 'error', None) or element.get('error', False) # Déterminer état if has_error: return UIState.ERROR if is_loading: return UIState.LOADING if has_focus: return UIState.FOCUSED if is_checked is not None: return UIState.CHECKED if is_checked else UIState.UNCHECKED # Détecter disabled par opacité if opacity is not None and opacity < self.config.disabled_opacity_threshold: return UIState.DISABLED # Détecter disabled par couleur grisée if color and self._is_grayed_color(color): return UIState.DISABLED return UIState.ENABLED def _is_grayed_color(self, color: Any) -> bool: """Vérifier si une couleur est grisée (désactivée).""" if isinstance(color, (list, tuple)) and len(color) >= 3: r, g, b = color[:3] # Couleur grisée: faible saturation et luminosité moyenne max_c = max(r, g, b) min_c = min(r, g, b) saturation = (max_c - min_c) / max_c if max_c > 0 else 0 return saturation < 0.2 and 100 < max_c < 200 return False def detect_overlay( self, screenshot: Any, baseline: Optional[Any] = None ) -> Optional[OverlayInfo]: """ Détecter un overlay (modal, popup, etc.) sur l'écran. Args: screenshot: Image actuelle baseline: Image de référence (optionnel) Returns: OverlayInfo si overlay détecté, None sinon """ try: import numpy as np # Convertir en numpy si nécessaire if hasattr(screenshot, 'numpy'): current = screenshot.numpy() elif hasattr(screenshot, '__array__'): current = np.array(screenshot) else: current = screenshot if current is None: return None # Analyser l'image pour détecter overlay overlay_info = self._detect_overlay_regions(current, baseline) return overlay_info except Exception as e: logger.warning(f"Erreur détection overlay: {e}") return None def _detect_overlay_regions( self, current: np.ndarray, baseline: Optional[np.ndarray] ) -> Optional[OverlayInfo]: """Détecter les régions d'overlay dans l'image.""" if current is None or len(current.shape) < 2: return None height, width = current.shape[:2] # Méthode 1: Détecter zones de contraste élevé (overlay sombre) if len(current.shape) == 3: gray = np.mean(current, axis=2) else: gray = current # Calculer gradient pour détecter bords grad_x = np.abs(np.diff(gray, axis=1)) grad_y = np.abs(np.diff(gray, axis=0)) # Détecter zone centrale avec bords marqués (typique d'un modal) center_x, center_y = width // 2, height // 2 # Chercher rectangle centré avec bords # Simplification: vérifier si zone centrale a contraste différent center_region = gray[ center_y - height//4:center_y + height//4, center_x - width//4:center_x + width//4 ] if center_region.size == 0: return None center_mean = np.mean(center_region) overall_mean = np.mean(gray) # Si zone centrale significativement différente contrast_diff = abs(center_mean - overall_mean) / 255.0 if contrast_diff > self.config.overlay_contrast_threshold: # Overlay probable détecté bounds = ( center_x - width//4, center_y - height//4, width//2, height//2 ) # Déterminer type d'overlay area_ratio = (width//2 * height//2) / (width * height) if area_ratio > 0.5: overlay_type = "modal" blocking = True elif area_ratio > 0.2: overlay_type = "popup" blocking = True else: overlay_type = "tooltip" blocking = False return OverlayInfo( overlay_type=overlay_type, bounds=bounds, blocking=blocking, confidence=min(1.0, contrast_diff * 2) ) return None def get_variants(self, node_id: str) -> List[NodeVariant]: """Récupérer les variantes d'un node.""" return self._variants.get(node_id, []) def get_primary_prototype(self, node_id: str) -> Optional[np.ndarray]: """Récupérer le prototype primaire d'un node.""" return self._primary_prototypes.get(node_id) def _save_variant(self, variant: NodeVariant) -> None: """Sauvegarder une variante sur disque.""" variant_path = Path(self.config.variants_dir) / f"{variant.variant_id}.npy" np.save(str(variant_path), variant.embedding) def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float: """Calculer similarité cosinus.""" norm_a = np.linalg.norm(a) norm_b = np.linalg.norm(b) if norm_a == 0 or norm_b == 0: return 0.0 return float(np.dot(a, b) / (norm_a * norm_b)) def get_config(self) -> VariantManagerConfig: """Récupérer la configuration.""" return self.config # ============================================================================= # Fonctions utilitaires # ============================================================================= def create_variant_manager( similarity_threshold: float = 0.7, max_variants: int = 5 ) -> VariantManager: """ Créer un gestionnaire avec configuration personnalisée. Args: similarity_threshold: Seuil pour créer variante max_variants: Nombre max de variantes par node Returns: VariantManager configuré """ config = VariantManagerConfig( similarity_threshold=similarity_threshold, max_variants_per_node=max_variants ) return VariantManager(config)