""" Target Resolver Avancé - Résolution de cibles UI pour RPA Vision V3 Auteur : Dom, Alice Kiro - 15 décembre 2024 Stratégies de résolution: 1. By Role - Recherche par rôle sémantique (button, input, etc.) 2. By Text - Recherche par texte exact ou partiel 3. By Position - Recherche par coordonnées avec tolérance 4. By Embedding - Recherche par similarité visuelle 5. By Hierarchy - Recherche dans la hiérarchie DOM/UI 6. By Context - Recherche contextuelle (près de X, dans Y) 7. By Spatial - Recherche par relations spatiales (Exigences 5.3, 5.4) 8. Composite - Combinaison de plusieurs stratégies Fonctionnalités avancées: - Fallback automatique entre stratégies - Fallback spatial avec éléments ancres (Exigence 5.4) - Détection d'état visuel (enabled/disabled) (Exigence 5.5) - Scoring multi-critères - Cache de résolutions récentes - Apprentissage des patterns de résolution - Gestion d'erreurs centralisée avec ErrorHandler - Fiche #18: Apprentissage persistant "mix" (JSONL + SQLite) """ import logging import json import math import re import time import unicodedata from typing import Optional, List, Tuple, Dict, Any, Callable from dataclasses import dataclass, field from enum import Enum import numpy as np from collections import OrderedDict from difflib import SequenceMatcher from ..models.screen_state import ScreenState from ..models.ui_element import UIElement from ..models.workflow_graph import TargetSpec from .spatial_index import SpatialIndexGrid from .screen_signature import screen_signature from .target_memory import TargetFingerprint from .computation_cache import ComputationCache, cached_bbox_center, cached_euclidean_distance # Fiche #18: Import TargetMemoryStore pour apprentissage persistant try: from ..learning.target_memory_store import TargetMemoryStore, TargetFingerprint as PersistentFingerprint TARGET_MEMORY_AVAILABLE = True except ImportError: TARGET_MEMORY_AVAILABLE = False TargetMemoryStore = None PersistentFingerprint = None # Fiche #13: Helpers 
bbox XYWH standardisés def _bbox_to_tuple(bbox): """Convert bbox to tuple format (x, y, w, h) for compatibility""" if hasattr(bbox, 'to_tuple'): return bbox.to_tuple() return bbox def _bbox_right(bbox) -> float: bbox_tuple = _bbox_to_tuple(bbox) return float(bbox_tuple[0] + bbox_tuple[2]) def _bbox_bottom(bbox) -> float: bbox_tuple = _bbox_to_tuple(bbox) return float(bbox_tuple[1] + bbox_tuple[3]) def _bbox_center(bbox) -> tuple[float, float]: bbox_tuple = _bbox_to_tuple(bbox) return (float(bbox_tuple[0] + bbox_tuple[2] / 2), float(bbox_tuple[1] + bbox_tuple[3] / 2)) def _bbox_area(bbox) -> float: bbox_tuple = _bbox_to_tuple(bbox) return float(bbox_tuple[2] * bbox_tuple[3]) def _bbox_contains_point(bbox, x: float, y: float) -> bool: """Point inside bbox""" bbox_tuple = _bbox_to_tuple(bbox) return (bbox_tuple[0] <= x <= bbox_tuple[0] + bbox_tuple[2]) and (bbox_tuple[1] <= y <= bbox_tuple[1] + bbox_tuple[3]) def _bbox_intersects(a, b) -> bool: """Check if two bboxes intersect""" a_tuple = _bbox_to_tuple(a) b_tuple = _bbox_to_tuple(b) ax1, ay1, aw, ah = a_tuple bx1, by1, bw, bh = b_tuple ax2, ay2 = ax1 + aw, ay1 + ah bx2, by2 = bx1 + bw, by1 + bh return not (ax2 <= bx1 or bx2 <= ax1 or ay2 <= by1 or by2 <= ay1) # Import SpatialAnalyzer pour fallback spatial try: from ..detection.spatial_analyzer import SpatialAnalyzer, RelationType, SpatialRelation SPATIAL_ANALYZER_AVAILABLE = True except ImportError: SPATIAL_ANALYZER_AVAILABLE = False SpatialAnalyzer = None RelationType = None # Import MetricsEngine pour Fiche #10 try: from ..precision.metrics_engine import MetricsEngine, get_global_metrics_engine METRICS_AVAILABLE = True except ImportError: METRICS_AVAILABLE = False MetricsEngine = None get_global_metrics_engine = lambda: None # Import ErrorHandler pour gestion centralisée des erreurs try: from .error_handler import ErrorHandler ERROR_HANDLER_AVAILABLE = True except ImportError: ERROR_HANDLER_AVAILABLE = False ErrorHandler = None logger = logging.getLogger(__name__) 
# --------------------------------------------------------------------------- # Fiche #10 - Auto-healing: Role Aliases (terrain) # --------------------------------------------------------------------------- ROLE_ALIASES = { "input": {"input", "textfield", "text_field", "form_input", "forminput", "edit", "textbox"}, "button": {"button", "submit", "action", "cta"}, "label": {"label", "text", "data_display"}, "checkbox": {"checkbox", "check_box", "toggle"}, } TYPE_ALIASES = { "text_input": {"text_input", "input", "textfield"}, "button": {"button"}, } # --------------------------------------------------------------------------- # BBox helpers (contract: UIElement.bbox = (x, y, w, h)) # --------------------------------------------------------------------------- def _bbox_contains(outer, inner) -> bool: """inner bbox fully inside outer bbox (XYWH)""" outer_tuple = _bbox_to_tuple(outer) inner_tuple = _bbox_to_tuple(inner) return ( inner_tuple[0] >= outer_tuple[0] and inner_tuple[1] >= outer_tuple[1] and _bbox_right(inner) <= _bbox_right(outer) and _bbox_bottom(inner) <= _bbox_bottom(outer) ) def _bbox_intersection_area(a, b) -> float: a_tuple = _bbox_to_tuple(a) b_tuple = _bbox_to_tuple(b) x1 = max(a_tuple[0], b_tuple[0]) y1 = max(a_tuple[1], b_tuple[1]) x2 = min(_bbox_right(a), _bbox_right(b)) y2 = min(_bbox_bottom(a), _bbox_bottom(b)) if x2 <= x1 or y2 <= y1: return 0.0 return float((x2 - x1) * (y2 - y1)) def _bbox_iou(a, b) -> float: inter = _bbox_intersection_area(a, b) if inter <= 0: return 0.0 union = _bbox_area(a) + _bbox_area(b) - inter return float(inter / union) if union > 0 else 0.0 # --------------------------------------------------------------------------- # Fiche #7 - Layout helpers pour Form Logic # --------------------------------------------------------------------------- def _overlap_1d(a1, a2, b1, b2) -> float: """Calcul overlap 1D entre deux segments""" left = max(a1, b1) right = min(a2, b2) return max(0.0, right - left) def _x_overlap_ratio(a, b) -> 
float: """Ratio d'overlap en X / min(width)""" a_tuple = _bbox_to_tuple(a) b_tuple = _bbox_to_tuple(b) ax1, ax2 = a_tuple[0], _bbox_right(a) bx1, bx2 = b_tuple[0], _bbox_right(b) ov = _overlap_1d(ax1, ax2, bx1, bx2) denom = max(1.0, min(a_tuple[2], b_tuple[2])) return float(ov / denom) def _y_overlap_ratio(a, b) -> float: """Ratio d'overlap en Y / min(height)""" a_tuple = _bbox_to_tuple(a) b_tuple = _bbox_to_tuple(b) ay1, ay2 = a_tuple[1], _bbox_bottom(a) by1, by2 = b_tuple[1], _bbox_bottom(b) ov = _overlap_1d(ay1, ay2, by1, by2) denom = max(1.0, min(a_tuple[3], b_tuple[3])) return float(ov / denom) def _baseline_distance(a, b) -> float: """Distance entre centres Y (utile label ↔ input sur une ligne)""" return abs(_bbox_center(a)[1] - _bbox_center(b)[1]) # --------------------------------------------------------------------------- # Fiche #8 - Anti-bugs terrain : Normalisation + Fuzzy matching # --------------------------------------------------------------------------- _WS_RE = re.compile(r"\s+") def _norm_text(s: str) -> str: """Normalise le texte pour matching robuste (accents, casse, espaces, OCR)""" if not s: return "" # Remplace NBSP & co + trim s = s.replace("\u00A0", " ").strip().lower() # Enlève accents s = unicodedata.normalize("NFKD", s) s = "".join(ch for ch in s if not unicodedata.combining(ch)) # Harmonise tirets/ponctuation courante s = s.replace("-", "-").replace("–", "-").replace("—", "-") s = _WS_RE.sub(" ", s) # Petites confusions OCR (optionnel mais utile) s = s.translate(str.maketrans({"|": "l"})) return s def _fuzzy_ratio(a: str, b: str) -> float: """Calcul de similarité fuzzy entre deux textes normalisés""" a, b = _norm_text(a), _norm_text(b) if not a or not b: return 0.0 return SequenceMatcher(None, a, b).ratio() class ResolutionStrategy(str, Enum): """Stratégies de résolution disponibles""" BY_ROLE = "by_role" BY_TEXT = "by_text" BY_POSITION = "by_position" BY_EMBEDDING = "by_embedding" BY_HIERARCHY = "by_hierarchy" BY_CONTEXT = 
"by_context" BY_SPATIAL = "by_spatial" # Nouvelle stratégie (Exigence 5.3) COMPOSITE = "composite" class VisualState(str, Enum): """États visuels détectables (Exigence 5.5)""" ENABLED = "enabled" DISABLED = "disabled" FOCUSED = "focused" HOVERED = "hovered" SELECTED = "selected" CHECKED = "checked" UNCHECKED = "unchecked" LOADING = "loading" ERROR = "error" UNKNOWN = "unknown" @dataclass class ResolvedTarget: """Résultat d'une résolution de cible""" element: UIElement confidence: float strategy_used: str fallback_applied: bool = False resolution_details: Dict[str, Any] = field(default_factory=dict) alternatives: List["ResolvedTarget"] = field(default_factory=list) visual_state: VisualState = VisualState.UNKNOWN # État visuel détecté (Exigence 5.5) spatial_anchor: Optional[str] = None # ID de l'ancre spatiale utilisée @dataclass class ResolutionContext: """Contexte pour la résolution""" screen_state: ScreenState previous_target: Optional[UIElement] = None workflow_context: Dict[str, Any] = field(default_factory=dict) anchor_elements: List[UIElement] = field(default_factory=list) class TargetResolver: """ Résolveur de cibles UI avancé. Utilise plusieurs stratégies pour trouver l'élément UI correspondant à une spécification de cible, avec fallback automatique. Example: >>> resolver = TargetResolver() >>> target_spec = TargetSpec(by_role="button", by_text="Submit") >>> result = resolver.resolve_target(target_spec, screen_state) >>> if result: ... 
print(f"Found: {result.element.label} ({result.confidence:.2f})") """ def __init__( self, similarity_threshold: float = 0.75, position_tolerance: int = 50, text_fuzzy_threshold: float = 0.65, use_embedding_fallback: bool = True, use_spatial_fallback: bool = True, # Nouveau: fallback spatial (Exigence 5.4) cache_size: int = 100, metrics_engine: Optional[MetricsEngine] = None, # Fiche #10: Metrics Engine error_handler: Optional[ErrorHandler] = None, # Gestion d'erreurs centralisée enable_persistent_learning: bool = True, # Fiche #18: Apprentissage persistant persistent_memory_path: str = "data/learning" # Fiche #18: Chemin mémoire persistante ): """ Initialiser le résolveur. Args: similarity_threshold: Seuil de similarité minimum position_tolerance: Tolérance en pixels for by_position text_fuzzy_threshold: Seuil pour matching texte fuzzy use_embedding_fallback: Utiliser embeddings en fallback use_spatial_fallback: Utiliser relations spatiales en fallback cache_size: Taille du cache de résolutions metrics_engine: Moteur métriques pour Fiche #10 (optionnel) error_handler: Gestionnaire d'erreurs centralisé (optionnel) enable_persistent_learning: Activer l'apprentissage persistant (Fiche #18) persistent_memory_path: Chemin pour la mémoire persistante (Fiche #18) """ self.similarity_threshold = similarity_threshold self.position_tolerance = position_tolerance self.text_fuzzy_threshold = text_fuzzy_threshold self.use_embedding_fallback = use_embedding_fallback self.use_spatial_fallback = use_spatial_fallback # Cache LRU pour résolutions récentes self._cache: OrderedDict[str, ResolvedTarget] = OrderedDict() self._cache_size = cache_size # Fiche #13: Cache LRU d'index spatiaux (par screen_state) self._index_cache: OrderedDict[str, SpatialIndexGrid] = OrderedDict() self._index_cache_size = max(10, int(cache_size / 2)) # Fiche #14: Mémoire cross-frame LRU self._cross_frame_cache: OrderedDict[str, TargetFingerprint] = OrderedDict() self._cross_frame_cache_size = 200 # ajuste 
comme tu veux # Tâche 5.4: Cache de calculs redondants self._computation_cache = ComputationCache(max_size=500) # Fiche #10: Metrics Engine intégration self.metrics_engine = metrics_engine or get_global_metrics_engine() # Fiche #10: Auto-healing attempt counter self.healing_attempt = 0 # 0=normal, 1..n=secours # Gestion d'erreurs centralisée self.error_handler = error_handler if self.error_handler is None and ERROR_HANDLER_AVAILABLE: self.error_handler = ErrorHandler() # Fiche #18: Apprentissage persistant self.enable_persistent_learning = enable_persistent_learning self._persistent_memory = None if self.enable_persistent_learning and TARGET_MEMORY_AVAILABLE: try: self._persistent_memory = TargetMemoryStore(persistent_memory_path) logger.info("Persistent learning enabled with TargetMemoryStore") except Exception as e: logger.warning(f"Failed to initialize persistent memory: {e}") self.enable_persistent_learning = False elif self.enable_persistent_learning: logger.warning("Persistent learning requested but TargetMemoryStore not available") self.enable_persistent_learning = False # Stats self._stats = { "total_resolutions": 0, "successful": 0, "failed": 0, "cache_hits": 0, "fallbacks_used": 0, "spatial_fallbacks": 0, # Nouveau compteur "by_strategy": {} } # Embedding matcher (lazy load) self._embedding_matcher = None logger.info(f"TargetResolver initialized with sniper mode enabled, metrics_engine={'enabled' if self.metrics_engine else 'disabled'}") # ========================================================================= # Fiche #10 - Auto-healing: Healing Profile System # ========================================================================= def _healing_profile(self) -> Dict[str, Any]: """ Retourne un profil de tolérance selon healing_attempt. Fiche #10: Progressive relaxation des critères de matching. 
- Level 0 (Normal): Critères stricts - Level 1 (First Healing): Critères relaxés - Level 2+ (Desperate): Critères très relaxés Returns: Dict avec min_ratio, pad_mul, expand_roles """ a = int(getattr(self, "healing_attempt", 0) or 0) if a <= 0: # Normal: critères stricts return {"min_ratio": 0.82, "pad_mul": 1.0, "expand_roles": False} elif a == 1: # First healing: critères relaxés return {"min_ratio": 0.78, "pad_mul": 1.3, "expand_roles": True} else: # Desperate (a >= 2): critères très relaxés return {"min_ratio": 0.72, "pad_mul": 1.7, "expand_roles": True} # ========================================================================= # Fiche #14 - Screen signature + Cross-frame Target Memory # ========================================================================= def _make_cross_key(self, target_spec, sig: str) -> str: """Clé de cache cross-frame optimisée""" import hashlib # Créer une représentation compacte des éléments clés key_parts = [ sig, str(getattr(target_spec, "by_role", None) or ""), str(getattr(target_spec, "by_text", None) or ""), str(getattr(target_spec, "by_position", None) or ""), ] # Pour les objets complexes, utiliser leur hash hints = getattr(target_spec, "context_hints", None) if hints: hints_str = str(sorted(hints.items())) if isinstance(hints, dict) else str(hints) key_parts.append(hashlib.md5(hints_str.encode('utf-8')).hexdigest()[:8]) hard = getattr(target_spec, "hard_constraints", None) if hard: hard_str = str(sorted(hard.items())) if isinstance(hard, dict) else str(hard) key_parts.append(hashlib.md5(hard_str.encode('utf-8')).hexdigest()[:8]) # Limiter la longueur totale de la clé key = "|".join(key_parts) if len(key) > 200: # Limite raisonnable return hashlib.md5(key.encode('utf-8')).hexdigest() return key def _resolve_from_memory(self, target_spec, screen_state, ui_elements, context): """Résolution depuis mémoire cross-frame""" # signature robuste: layout sig = screen_signature(screen_state, ui_elements, mode="layout") key = 
self._make_cross_key(target_spec, sig) fp = self._cross_frame_cache.get(key) if not fp: return None # 1) match direct par element_id (si stable) by_id = {e.element_id: e for e in ui_elements} if fp.element_id in by_id: e = by_id[fp.element_id] return ResolvedTarget( element=e, confidence=min(0.92 * float(getattr(e, "confidence", 1.0) or 1.0), 1.0), strategy_used="CROSS_FRAME_CACHE", resolution_details={"cache": "element_id", "screen_sig": sig} ) # 2) sinon: search proche de l'ancienne bbox via index spatial (#13) idx = context.workflow_context.get("spatial_index") x, y, w, h = fp.bbox cx = int(x + w / 2) cy = int(y + h / 2) # bbox de recherche élargie (tolérance) pad = 120 query = (cx - pad, cy - pad, 2 * pad, 2 * pad) if idx: pool = idx.query_bbox(query) else: # Fallback: créer une recherche géométrique simple pool = [] for e in ui_elements: ex, ey = _bbox_center(e.bbox) if abs(ex - cx) <= pad and abs(ey - cy) <= pad: pool.append(e) # score simple: rôle/type + distance bbox center + label (si dispo) def center(b): return (b[0] + b[2] / 2.0, b[1] + b[3] / 2.0) fx, fy = center(fp.bbox) best = None best_s = -1e9 fp_role = (fp.role or "").lower() fp_type = (fp.etype or "").lower() fp_label = (fp.label or "").strip().lower() for e in pool: er = (getattr(e, "role", "") or "").lower() et = (getattr(e, "type", "") or "").lower() # gating léger role_ok = (fp_role and er == fp_role) or (not fp_role) type_ok = (fp_type and et == fp_type) or (not fp_type) if not (role_ok or type_ok): continue ex, ey = center(e.bbox) d = ((ex - fx) ** 2 + (ey - fy) ** 2) ** 0.5 s = 0.0 if er == fp_role: s += 1.0 if et == fp_type: s += 0.6 # bonus label exact si disponible elbl = (getattr(e, "label", "") or "").strip().lower() if fp_label and elbl and (fp_label == elbl or fp_label in elbl or elbl in fp_label): s += 0.7 # pénalité distance s -= (d / 120.0) if s > best_s: best_s = s best = e if not best: return None return ResolvedTarget( element=best, confidence=min(0.85 * float(getattr(best, 
"confidence", 1.0) or 1.0), 1.0), strategy_used="CROSS_FRAME_CACHE", resolution_details={"cache": "near_bbox", "screen_sig": sig, "score": round(best_s, 4)} ) # ========================================================================= # Fiche #11 - Multi-anchor + contraintes combinées: Helper Methods # ========================================================================= def _as_text_list(self, v): """ Convertit une valeur en liste de textes. Args: v: None, str, list, tuple ou autre Returns: List[str]: Liste de strings non-vides """ if v is None: return [] if isinstance(v, (list, tuple)): return [str(x) for x in v if x] return [str(v)] def _container_bbox_from_text(self, text: str, ui_elements): """ Trouve un conteneur par texte avec détection intelligente. Process: 1. Trouver un élément portant ce texte (label/panel) 2. Si c'est déjà un panel/container, utiliser sa bbox 3. Sinon: plus petit container qui contient ce label Args: text: Texte à rechercher pour identifier le conteneur ui_elements: Liste des éléments UI Returns: Optional[Tuple]: BBox du conteneur ou None si non trouvé """ # 1) Trouver un élément portant ce texte (label/panel) anchor = self._find_element_by_text(text, ui_elements, min_ratio=self._healing_profile()["min_ratio"]) if not anchor: return None # 2) Si c'est déjà un panel/container, ok r = (getattr(anchor, "role", "") or "").lower() t = (getattr(anchor, "type", "") or "").lower() if r in {"panel", "container", "group", "form"} or t in {"panel", "container", "group", "form"}: return anchor.bbox # 3) Sinon: plus petit container qui contient ce label containers = [] for e in ui_elements: rr = (getattr(e, "role", "") or "").lower() tt = (getattr(e, "type", "") or "").lower() if rr in {"panel", "container", "group", "form"} or tt in {"panel", "container", "group", "form"}: if _bbox_contains(e.bbox, anchor.bbox): containers.append(e) if not containers: return None # Retourner le plus petit container (plus spécifique) containers.sort(key=lambda 
c: _bbox_area(c.bbox)) return containers[0].bbox def _apply_hard_constraints(self, candidates, target_spec, ui_elements): """ Applique les contraintes strictes pour filtrer les candidats. Args: candidates: Liste des éléments candidats target_spec: Spécification de la cible avec hard_constraints ui_elements: Liste complète des éléments UI Returns: List[UIElement]: Candidats filtrés après application des contraintes """ hc = getattr(target_spec, "hard_constraints", None) or {} filtered_candidates = list(candidates) # Copie pour éviter de modifier l'original # Exemple: forcer un container identifié par texte if "within_container_text" in hc and hc["within_container_text"]: cb = self._container_bbox_from_text(str(hc["within_container_text"]), ui_elements) if cb is not None: before_count = len(filtered_candidates) filtered_candidates = [e for e in filtered_candidates if _bbox_contains(cb, e.bbox)] after_count = len(filtered_candidates) logger.debug(f"Container constraint '{hc['within_container_text']}' filtered {before_count - after_count} candidates") # Exemple: ignorer tiny elements (si tu veux) if "min_area" in hc: min_area = float(hc["min_area"]) before_count = len(filtered_candidates) filtered_candidates = [e for e in filtered_candidates if _bbox_area(e.bbox) >= min_area] after_count = len(filtered_candidates) logger.debug(f"Min area constraint {min_area} filtered {before_count - after_count} candidates") return filtered_candidates # ========================================================================= # Fiche #6 - Sniper Mode : Scoring et Ranking # ========================================================================= def _score_candidate_sniper_cached( self, elem, base_score: float, anchor_elem, roi_bbox, container_bbox, hints=None, ui_elements=None, weights=None, ui_analysis_cache=None, distance_cache=None, alignment_cache=None ) -> float: """ Sniper scoring avec cache pour éviter les calculs redondants (Tâche 8.1). 
Auteur : Dom, Alice Kiro - 15 décembre 2024 Optimisations implémentées (Exigences 8.2, 8.4): - Cache des analyses d'éléments UI entre ancres - Cache des calculs de distance - Cache des calculs d'alignement - Réutilisation des calculs coûteux Args: elem: Élément candidat à scorer base_score: Score de base (role/text/embedding) anchor_elem: Élément ancre (optionnel) roi_bbox: Zone d'intérêt (optionnel) container_bbox: Container préféré (optionnel) hints: Context hints pour alignement ui_elements: Tous les éléments UI weights: Pondérations personnalisées (optionnel) ui_analysis_cache: Cache des analyses d'éléments UI distance_cache: Cache des calculs de distance alignment_cache: Cache des calculs d'alignement Returns: Score composite pondéré avec optimisations de cache """ # Initialiser les caches si non fournis if ui_analysis_cache is None: ui_analysis_cache = {} if distance_cache is None: distance_cache = {} if alignment_cache is None: alignment_cache = {} # Fiche #11 - Tâche 5.1: Extraire les poids avec defaults if weights is None: weights = {} # Poids par défaut (Exigences 3.1, 3.2, 3.3, 3.4) w_proximity = float(weights.get("proximity", 0.35)) w_alignment = float(weights.get("alignment", 0.25)) w_container = float(weights.get("container", 0.15)) w_roi_iou = float(weights.get("roi_iou", 0.25)) # Normaliser les poids pour qu'ils somment à 1.0 total_weight = w_proximity + w_alignment + w_container + w_roi_iou if total_weight > 0: w_proximity /= total_weight w_alignment /= total_weight w_container /= total_weight w_roi_iou /= total_weight s = float(base_score) # Tâche 8.1: Calculer les composants individuels avec cache (Exigence 8.4) proximity_score = 0.0 alignment_score = 0.0 container_score = 0.0 roi_iou_score = 0.0 # Composant 1: Proximité à l'ancre avec cache (Exigence 8.2) if anchor_elem is not None: # Clé de cache pour la distance distance_key = f"dist_{elem.element_id}_{anchor_elem.element_id}" if distance_key in distance_cache: distance = 
distance_cache[distance_key] else: # Calculer et mettre en cache ax, ay = _bbox_center(anchor_elem.bbox) ex, ey = _bbox_center(elem.bbox) dx = ex - ax dy = ey - ay distance = (dx * dx + dy * dy) ** 0.5 distance_cache[distance_key] = distance # Normalisation : 200px = zone "proche" d0 = 200.0 proximity_score = 1.0 / (1.0 + (distance / d0)) # [~0..1] # Composant 2: Alignement avec l'ancre avec cache (Exigence 8.4) if anchor_elem is not None and hints: # Clé de cache pour l'alignement alignment_key = f"align_{elem.element_id}_{anchor_elem.element_id}_{hash(str(sorted(hints.items())))}" if alignment_key in alignment_cache: alignment_bonus = alignment_cache[alignment_key] else: # Calculer et mettre en cache alignment_bonus = self._alignment_bonus(elem, anchor_elem, hints) alignment_cache[alignment_key] = alignment_bonus # Convertir le bonus multiplicatif en score [0..1] alignment_score = min(1.0, max(0.0, (alignment_bonus - 1.0) / 0.2)) # Normaliser autour de 1.0-1.2 # Composant 3: Préférence container (Exigence 3.4) if container_bbox is not None: # Clé de cache pour l'analyse de containment container_key = f"container_{elem.element_id}_{hash(str(container_bbox))}" if container_key in ui_analysis_cache: container_score = ui_analysis_cache[container_key] else: # Calculer et mettre en cache if _bbox_contains(container_bbox, elem.bbox): container_score = 1.0 else: # Score partiel basé sur la proximité au container elem_center = _bbox_center(elem.bbox) container_center = _bbox_center(container_bbox) dist_to_container = ((elem_center[0] - container_center[0]) ** 2 + (elem_center[1] - container_center[1]) ** 2) ** 0.5 container_score = max(0.0, 1.0 - (dist_to_container / 300.0)) # Décroissance sur 300px ui_analysis_cache[container_key] = container_score # Composant 4: Intersection avec ROI (Exigence 3.1) if roi_bbox is not None: # Clé de cache pour l'IOU roi_key = f"roi_{elem.element_id}_{hash(str(roi_bbox))}" if roi_key in ui_analysis_cache: roi_iou_score = 
ui_analysis_cache[roi_key] else: # Calculer et mettre en cache roi_iou_score = _bbox_iou(elem.bbox, roi_bbox) ui_analysis_cache[roi_key] = roi_iou_score # Fiche #11 - Tâche 5.2: Scoring composite avec pondération légère # Appliquer les pondérations configurables weighted_bonus = ( w_proximity * proximity_score + w_alignment * alignment_score + w_container * container_score + w_roi_iou * roi_iou_score ) # Multiplier le score de base par un facteur pondéré (Exigence 3.5) # Éviter de réécrire complètement le système existant s *= (1.0 + 0.4 * weighted_bonus) # Bonus jusqu'à +40% basé sur les composants pondérés # Bonus léger pour éléments "cliquables" (logique existante) role = (getattr(elem, "role", "") or "").lower() etype = (getattr(elem, "type", "") or "").lower() if role in {"submit", "button", "textfield", "input", "form_input"} or etype in {"button", "text_input"}: s *= 1.05 # Intègre la confiance (logique existante) conf = float(getattr(elem, "confidence", 1.0) or 1.0) s *= (0.75 + 0.25 * min(conf, 1.0)) # Fiche #7: Bonus "même petit container commun" avec cache (logique existante) if anchor_elem and ui_elements: # Clé de cache pour le container commun common_container_key = f"common_{elem.element_id}_{anchor_elem.element_id}" if common_container_key in ui_analysis_cache: has_common_container = ui_analysis_cache[common_container_key] else: # Calculer et mettre en cache common = self._smallest_common_container_bbox(anchor_elem, elem, ui_elements) has_common_container = common is not None ui_analysis_cache[common_container_key] = has_common_container if has_common_container: s *= 1.12 # Fiche #12: Bonus "same row/column" pour ranking avec cache if hints and ui_elements: hp = self._healing_profile() # Cache pour les calculs de lignes (coûteux) rows_key = f"rows_{hash(str([e.element_id for e in ui_elements]))}" if rows_key in ui_analysis_cache: rows, row_of = ui_analysis_cache[rows_key] else: rows, row_of = self._build_rows(ui_elements) ui_analysis_cache[rows_key] 
= (rows, row_of) # same_row_as_text avec cache for txt in self._as_text_list(hints.get("same_row_as_text")): row_anchor_key = f"row_anchor_{txt}_{hash(str([e.element_id for e in ui_elements]))}" if row_anchor_key in ui_analysis_cache: anchors = ui_analysis_cache[row_anchor_key] else: anchors = self._find_anchors_by_text(txt, ui_elements, min_ratio=hp["min_ratio"]) ui_analysis_cache[row_anchor_key] = anchors if anchors: anchor = anchors[0] if row_of.get(anchor.element_id) == row_of.get(elem.element_id): s *= 1.15 break # same_column_as_text (approx via X overlap) avec cache def _x_overlap_ratio_local(b1, b2): ax1, ax2 = b1[0], _bbox_right(b1) bx1, bx2 = b2[0], _bbox_right(b2) ov = max(0.0, min(ax2, bx2) - max(ax1, bx1)) denom = max(1.0, min(b1[2], b2[2])) return ov / denom for txt in self._as_text_list(hints.get("same_column_as_text")): col_anchor_key = f"col_anchor_{txt}_{hash(str([e.element_id for e in ui_elements]))}" if col_anchor_key in ui_analysis_cache: anchors = ui_analysis_cache[col_anchor_key] else: anchors = self._find_anchors_by_text(txt, ui_elements, min_ratio=hp["min_ratio"]) ui_analysis_cache[col_anchor_key] = anchors if anchors: anchor = anchors[0] overlap_key = f"x_overlap_{elem.element_id}_{anchor.element_id}" if overlap_key in ui_analysis_cache: x_overlap = ui_analysis_cache[overlap_key] else: x_overlap = _x_overlap_ratio_local(elem.bbox, anchor.bbox) ui_analysis_cache[overlap_key] = x_overlap if x_overlap > 0.35: s *= 1.10 break return s def _score_candidate_sniper( self, elem, base_score: float, anchor_elem, roi_bbox, container_bbox, hints=None, ui_elements=None, weights=None ) -> float: """ Sniper scoring avec pondération configurable (Fiche #11 - Tâche 5). Wrapper pour la compatibilité - utilise la version avec cache en interne. 
Auteur : Dom, Alice Kiro - 15 décembre 2024 """ # Utiliser la version avec cache pour bénéficier des optimisations return self._score_candidate_sniper_cached( elem=elem, base_score=base_score, anchor_elem=anchor_elem, roi_bbox=roi_bbox, container_bbox=container_bbox, hints=hints, ui_elements=ui_elements, weights=weights, # Caches temporaires pour cette invocation ui_analysis_cache={}, distance_cache={}, alignment_cache={} ) def _build_anchor_and_roi_and_container(self, target_spec, ui_elements): """ Déduit une ancre + une ROI + un container à partir des context_hints. Fiche #8: Gère les labels dupliqués en choisissant le meilleur anchor. - anchor: élément texte (label) le plus pertinent - roi: zone autour/à côté/au-dessous de l'ancre - container: un éventuel panel/container qui contient l'ancre """ hints = getattr(target_spec, "context_hints", None) or {} anchor = None roi = None container_bbox = None for key in ("below_text", "above_text", "right_of_text", "left_of_text", "near_text"): if key in hints and hints[key]: # Fiche #11: Support multi-anchor - convertir en liste de textes anchor_texts = self._as_text_list(hints[key]) # Fiche #8: Trouve tous les anchors candidats pour tous les textes # Fiche #10: Utiliser healing profile pour min_ratio hp = self._healing_profile() anchors = [] for text in anchor_texts: anchors.extend(self._find_anchors_by_text(text, ui_elements, min_ratio=hp["min_ratio"])) if not anchors: continue # Si un seul anchor, on le prend if len(anchors) == 1: anchor = anchors[0] else: # Fiche #8: Choisir le meilleur anchor selon le contexte best_anchor = None best_quality = -1.0 best_roi = None best_container = None for a in anchors: # Calcule ROI et container pour cet anchor x, y, w, h = a.bbox # Fiche #10: Appliquer healing profile pour padding hp = self._healing_profile() pad_x = int(300 * hp["pad_mul"]) pad_y = int(250 * hp["pad_mul"]) if "below_text" in hints: roi_candidate = (x - 20, y + h, w + pad_x, pad_y) elif "above_text" in hints: 
roi_candidate = (x - 20, y - pad_y, w + pad_x, pad_y) elif "right_of_text" in hints: roi_candidate = (x + w, y - 20, pad_x, h + 120) elif "left_of_text" in hints: roi_candidate = (x - pad_x, y - 20, pad_x, h + 120) else: roi_candidate = (x - 80, y - 80, w + 160, h + 160) # Container pour cet anchor containers = [] for e in ui_elements: r = (getattr(e, "role", "") or "").lower() t = (getattr(e, "type", "") or "").lower() if r in {"panel", "container", "group", "form"} or t in {"panel", "container", "group"}: if _bbox_contains(e.bbox, a.bbox): containers.append(e) container_candidate = None if containers: containers.sort(key=lambda c: _bbox_area(c.bbox)) container_candidate = containers[0].bbox # "Qualité" = préfère l'ancre qui est dans un container petit q = 0.0 if container_candidate is not None: q += 1.0 / max(1.0, _bbox_area(container_candidate)) if roi_candidate is not None: q += 0.1 if q > best_quality: best_quality = q best_anchor = a best_roi = roi_candidate best_container = container_candidate anchor = best_anchor roi = best_roi container_bbox = best_container if anchor: break # Si pas encore de ROI/container calculés (cas anchor unique) if anchor and roi is None: x, y, w, h = anchor.bbox # Fiche #10: Appliquer healing profile pour padding hp = self._healing_profile() pad_x = int(300 * hp["pad_mul"]) pad_y = int(250 * hp["pad_mul"]) # selon l'indication, on oriente la ROI if "below_text" in hints: roi = (x - 20, y + h, w + pad_x, pad_y) elif "above_text" in hints: roi = (x - 20, y - pad_y, w + pad_x, pad_y) elif "right_of_text" in hints: roi = (x + w, y - 20, pad_x, h + 120) elif "left_of_text" in hints: roi = (x - pad_x, y - 20, pad_x, h + 120) else: roi = (x - 80, y - 80, w + 160, h + 160) # Container (best-effort) si pas encore calculé if anchor and container_bbox is None: containers = [] for e in ui_elements: r = (getattr(e, "role", "") or "").lower() t = (getattr(e, "type", "") or "").lower() if r in {"panel", "container", "group", "form"} or t in 
{"panel", "container", "group"}: if _bbox_contains(e.bbox, anchor.bbox): containers.append(e) if containers: containers.sort(key=lambda c: _bbox_area(c.bbox)) # le plus petit container container_bbox = containers[0].bbox return anchor, roi, container_bbox def _find_element_by_text(self, text: str, ui_elements: List[UIElement], min_ratio: float = 0.65) -> Optional[UIElement]: """ Trouver un élément par son texte/label avec normalisation et fuzzy matching. Fiche #8: Au lieu de "premier qui matche", on prend le meilleur score. """ target = _norm_text(text) if not target: return None best = None best_score = 0.0 for e in ui_elements: label = _norm_text(getattr(e, "label", "") or "") if not label: continue # Exact / contains = score fort if label == target: score = 1.0 elif target in label or label in target: score = 0.92 else: score = _fuzzy_ratio(label, target) if score > best_score and score >= min_ratio: best = e best_score = score return best # ========================================================================= # Fiche #7 - Container et Form Logic # ========================================================================= def _smallest_common_container_bbox(self, anchor, candidate, ui_elements): """Retourne la bbox du plus petit container qui contient anchor ET candidate.""" if not anchor or not candidate: return None containers = [] for e in ui_elements: r = (getattr(e, "role", "") or "").lower() t = (getattr(e, "type", "") or "").lower() if r in {"panel", "container", "group", "form"} or t in {"panel", "container", "group", "form"}: if _bbox_contains(e.bbox, anchor.bbox) and _bbox_contains(e.bbox, candidate.bbox): containers.append(e) if not containers: return None containers.sort(key=lambda c: _bbox_area(c.bbox)) # plus petit = plus spécifique return containers[0].bbox def _alignment_bonus(self, elem, anchor, hints) -> float: """ Bonus multiplicatif selon la logique formulaire avec cache optimisé. 
Tâche 5.4: Utilise le cache de calculs pour éviter les recalculs d'alignement. - right_of_text: même ligne (Y proche) + un peu de recouvrement vertical - below_text: même colonne visuelle (X overlap) + proche verticalement """ if not anchor or not hints: return 1.0 bonus = 1.0 eb, ab = elem.bbox, anchor.bbox # Utiliser le cache pour les calculs d'alignement for hint_type in hints.keys(): if hint_type in ("right_of_text", "left_of_text", "below_text", "above_text", "near_text"): alignment_score = self._computation_cache.get_alignment_score( elem.element_id, anchor.element_id, hint_type, lambda: self._compute_alignment_score(elem, anchor, hint_type) ) bonus *= alignment_score return bonus def _compute_alignment_score(self, elem, anchor, hint_type: str) -> float: """ Calculer le score d'alignement pour un type de hint spécifique. Args: elem: Élément à évaluer anchor: Élément ancre hint_type: Type de hint spatial Returns: Score multiplicatif d'alignement """ eb, ab = elem.bbox, anchor.bbox # Normalisation : seuils tolérants (UI réelles = jamais parfaites) baseline_ok = _baseline_distance(eb, ab) <= 25.0 y_ov = _y_overlap_ratio(eb, ab) x_ov = _x_overlap_ratio(eb, ab) bonus = 1.0 if hint_type in ("right_of_text", "left_of_text"): if baseline_ok: bonus *= 1.20 if y_ov > 0.3: bonus *= 1.10 elif hint_type in ("below_text", "above_text"): if x_ov > 0.25: bonus *= 1.18 # petit bonus si l'élément est "pas trop loin" dy = abs(_bbox_center(eb)[1] - _bbox_center(ab)[1]) if dy <= 180: bonus *= 1.06 elif hint_type == "near_text": # si c'est proche ET aligné un minimum, bonus if baseline_ok or x_ov > 0.25 or y_ov > 0.3: bonus *= 1.08 return bonus def _is_interactable(self, elem: UIElement, screen_state: ScreenState) -> bool: """ Vérifie si un élément est interactable (pas hidden/disabled/offscreen). Fiche #8: Filtre les éléments non-cliquables pour éviter les faux positifs. 
""" # bbox XYWH x, y, w, h = elem.bbox if w <= 2 or h <= 2: return False # Screen bounds (best effort) if self.error_handler: # Utiliser ErrorHandler pour gérer les erreurs d'accès aux propriétés try: sw, sh = screen_state.window.screen_resolution except Exception as e: context_data = { 'screen_state': screen_state, 'details': {'operation': 'get_screen_resolution'}, 'original_data': {'window': screen_state.window} } recovery_result = self.error_handler.handle_error(e, context_data) if recovery_result.success and 'screen_resolution' in recovery_result.recovery_data: sw, sh = recovery_result.recovery_data['screen_resolution'] else: sw, sh = None, None else: # Fallback au comportement original try: sw, sh = screen_state.window.screen_resolution except Exception: sw, sh = None, None if sw and sh: r = x + w b = y + h # totalement hors écran (plus strict) if x < 0 or y < 0 or x >= sw or y >= sh: return False # Tags / metadata (si présents) tags = set((getattr(elem, "tags", None) or [])) meta = getattr(elem, "metadata", None) or {} if "hidden" in tags or "disabled" in tags: return False if meta.get("visible") is False or meta.get("enabled") is False: return False return True def _find_anchors_by_text(self, text: str, ui_elements: List[UIElement], min_ratio: float = 0.65) -> List[UIElement]: """ Trouve tous les anchors candidats pour un texte donné. Fiche #8: Gère les labels dupliqués en retournant tous les candidats. 
""" target = _norm_text(text) out = [] for e in ui_elements: label = _norm_text(getattr(e, "label", "") or "") if not label: continue if label == target or target in label or label in target or _fuzzy_ratio(label, target) >= min_ratio: out.append(e) return out # ========================================================================= # Fiche #12 - Form Rows/Columns: Helpers pour grouping et association label→champ # ========================================================================= def _y_center(self, bbox) -> float: """Centre Y d'une bbox (x, y, w, h)""" bbox_tuple = _bbox_to_tuple(bbox) return float(bbox_tuple[1] + bbox_tuple[3] / 2) def _x_center(self, bbox) -> float: """Centre X d'une bbox (x, y, w, h)""" bbox_tuple = _bbox_to_tuple(bbox) return float(bbox_tuple[0] + bbox_tuple[2] / 2) def _is_inputish(self, elem: UIElement) -> bool: """ Vérifie si un élément est un champ de saisie. Auteur : Dom, Alice Kiro - 19 décembre 2024 """ role = (getattr(elem, "role", "") or "").lower() etype = (getattr(elem, "type", "") or "").lower() return (role in {"input", "textfield", "text_field", "form_input", "textbox", "edit"} or etype in {"text_input", "input", "textfield"}) def _is_labelish(self, elem: UIElement) -> bool: """ Vérifie si un élément est un label. Auteur : Dom, Alice Kiro - 19 décembre 2024 """ role = (getattr(elem, "role", "") or "").lower() etype = (getattr(elem, "type", "") or "").lower() return (role in {"label", "text", "data_display"} or etype in {"label"}) def _build_rows(self, elements: List[UIElement], y_tol: float = 18.0): """ Groupe les éléments en lignes selon la proximité du centre Y. 
Auteur : Dom, Alice Kiro - 19 décembre 2024 Args: elements: Liste des éléments UI à grouper y_tol: Tolérance en pixels pour considérer deux éléments sur la même ligne Returns: Tuple[List[List[UIElement]], Dict[str, int]]: - rows: liste de lignes (chaque ligne = liste d'éléments) - row_of: mapping element_id -> index de ligne """ # Trier par centre Y els = sorted(elements, key=lambda e: self._y_center(e.bbox)) rows = [] row_of = {} for elem in els: placed = False ey = self._y_center(elem.bbox) for idx, row in enumerate(rows): # Centre Y moyen de la ligne ry = sum(self._y_center(x.bbox) for x in row) / max(1, len(row)) if abs(ey - ry) <= y_tol: row.append(elem) row_of[elem.element_id] = idx placed = True break if not placed: rows.append([elem]) row_of[elem.element_id] = len(rows) - 1 # Trier chaque ligne par X (gauche à droite) for row in rows: row.sort(key=lambda e: _bbox_to_tuple(e.bbox)[0]) return rows, row_of # ========================================================================= # Fiche #11 - Multi-anchor + Hard Constraints Helpers # ========================================================================= def _as_text_list(self, v): """ Convertit une valeur en liste de textes. Fiche #11: Helper pour gérer les listes d'ancres alternatives. Args: v: str, list, tuple, ou None Returns: List[str]: Liste de textes non-vides """ if v is None: return [] if isinstance(v, (list, tuple)): return [str(x) for x in v if x] return [str(v)] def _container_bbox_from_text(self, text: str, ui_elements): """ Trouve un container à partir d'un texte. Fiche #11: Helper pour identifier les containers par nom/label. 
Args: text: Texte à chercher (nom du container) ui_elements: Liste des éléments UI Returns: bbox du container ou None si non trouvé """ # 1) Trouver un élément portant ce texte (label/panel) anchor = self._find_element_by_text(text, ui_elements, min_ratio=self._healing_profile()["min_ratio"]) if not anchor: return None # 2) Si c'est déjà un panel/container, ok r = (getattr(anchor, "role", "") or "").lower() t = (getattr(anchor, "type", "") or "").lower() if r in {"panel", "container", "group", "form"} or t in {"panel", "container", "group", "form"}: return anchor.bbox # 3) Sinon: plus petit container qui contient ce label containers = [] for e in ui_elements: rr = (getattr(e, "role", "") or "").lower() tt = (getattr(e, "type", "") or "").lower() if rr in {"panel", "container", "group", "form"} or tt in {"panel", "container", "group", "form"}: if _bbox_contains(e.bbox, anchor.bbox): containers.append(e) if not containers: return None containers.sort(key=lambda c: _bbox_area(c.bbox)) return containers[0].bbox def _apply_hard_constraints(self, candidates, target_spec, ui_elements): """ Applique les contraintes strictes aux candidats. Fiche #11: Filtre strict selon hard_constraints. 
Args: candidates: Liste des candidats actuels target_spec: TargetSpec avec hard_constraints ui_elements: Tous les éléments UI Returns: Liste filtrée des candidats """ hc = getattr(target_spec, "hard_constraints", None) or {} # Exemple: forcer un container identifié par texte if "within_container_text" in hc and hc["within_container_text"]: cb = self._container_bbox_from_text(str(hc["within_container_text"]), ui_elements) if cb is not None: candidates = [e for e in candidates if _bbox_contains(cb, e.bbox)] logger.debug(f"Hard constraint within_container_text '{hc['within_container_text']}': {len(candidates)} candidates remaining") # Exemple: ignorer tiny elements if "min_area" in hc: min_area = float(hc["min_area"]) candidates = [e for e in candidates if _bbox_area(e.bbox) >= min_area] logger.debug(f"Hard constraint min_area {min_area}: {len(candidates)} candidates remaining") return candidates def _evaluate_all_anchor_candidate_combinations( self, target_spec: TargetSpec, candidates: List[UIElement], ui_elements: List[UIElement], scores: Dict[str, float], hints: Dict[str, Any], healing_profile: Dict[str, Any] ) -> Dict[str, Any]: """ Évaluer toutes les combinaisons ancre-candidat et retourner la meilleure. Fiche #11 - Tâche 7.1: Pour chaque ancre, scorer tous les candidats après contraintes et garder la trace de la meilleure combinaison (score, ancre, candidat). 
Tâche 8.1: Réutilisation de l'analyse des éléments UI (Exigences 8.2, 8.4) - Éviter de re-analyser les mêmes éléments entre les ancres - Réutiliser les calculs de distance et d'alignement Auteur : Dom, Alice Kiro - 15 décembre 2024 Args: target_spec: Spécification de la cible avec context_hints et weights candidates: Liste des candidats après filtrage initial ui_elements: Tous les éléments UI disponibles scores: Scores de base par element_id hints: Context hints extraits de target_spec healing_profile: Profil de healing actuel Returns: Dict contenant la meilleure combinaison: - element: Meilleur élément sélectionné - anchor: Ancre utilisée (peut être None) - score: Score final de la combinaison - top3: Liste des 3 meilleurs candidats avec scores - tie_break_criterion: Critère utilisé pour le tie-breaking - anchor_id: ID de l'ancre utilisée - performance_metrics: Métriques de performance """ start_time = time.perf_counter() # Tâche 8.1: Cache pour réutiliser les analyses entre ancres (Exigence 8.2) ui_analysis_cache = {} # Cache des analyses d'éléments UI distance_cache = {} # Cache des calculs de distance alignment_cache = {} # Cache des calculs d'alignement container_cache = {} # Cache des résolutions de conteneur # 1) Construire une liste de textes d'ancrage possibles (multi-anchor) anchor_texts = [] for k in ("near_text", "below_text", "above_text", "right_of_text", "left_of_text"): anchor_texts += self._as_text_list(hints.get(k)) logger.debug(f"Multi-anchor evaluation: {len(anchor_texts)} anchor texts found: {anchor_texts}") # 2) Trouver tous les candidats d'ancrage (Exigence 1.2) anchor_candidates = [] for txt in anchor_texts: found_anchors = self._find_anchors_by_text(txt, ui_elements, min_ratio=healing_profile["min_ratio"]) anchor_candidates.extend(found_anchors) logger.debug(f"Anchor text '{txt}': found {len(found_anchors)} candidates") # Gérer le cas où aucune ancre n'est trouvée (fallback à None) - Exigence 1.4 if not anchor_candidates: anchor_candidates 
= [None] logger.debug("No anchors found, using anchor-less resolution") # Variables pour tracker la meilleure combinaison best_element = None best_score = -1.0 best_anchor = None best_details = None best_tie_break_criterion = "score" # Métriques de performance avec cache tracking combinations_evaluated = 0 anchor_evaluations = 0 cache_hits = 0 cache_misses = 0 # 3) Évaluer chaque combinaison ancre-candidat (Exigence 1.2, 8.2) for anchor in anchor_candidates: anchor_evaluations += 1 # Construire ROI et container pour cette ancre avec cache (Exigence 8.3) roi_bbox, container_bbox = None, None if anchor is not None: # Cache key pour cette ancre anchor_cache_key = f"anchor_{anchor.element_id}" if anchor_cache_key in container_cache: # Réutiliser les calculs précédents (Exigence 8.2) roi_bbox, container_bbox = container_cache[anchor_cache_key] cache_hits += 1 logger.debug(f"Cache hit for anchor {anchor.element_id}") else: # Calculer et mettre en cache try: _, roi_bbox, container_bbox = self._build_anchor_and_roi_and_container(target_spec, ui_elements) container_cache[anchor_cache_key] = (roi_bbox, container_bbox) cache_misses += 1 except Exception as e: logger.debug(f"Failed to build ROI/container for anchor {anchor.element_id}: {e}") roi_bbox, container_bbox = None, None container_cache[anchor_cache_key] = (None, None) cache_misses += 1 # Appliquer les contraintes strictes une fois par ancre (Exigence 8.2) anchor_candidates_filtered = self._apply_hard_constraints(candidates, target_spec, ui_elements) if not anchor_candidates_filtered: logger.debug(f"No candidates remaining after hard constraints for anchor {anchor.element_id if anchor else 'None'}") continue # Scoring pondéré pour tous les candidats avec cette ancre weights = getattr(target_spec, "weights", None) or {} sniper_scores = {} for candidate in anchor_candidates_filtered: combinations_evaluated += 1 # Tâche 8.1: Utiliser le scoring avec cache pour éviter les calculs redondants (Exigence 8.4) score = 
self._score_candidate_sniper_cached( candidate, base_score=scores[candidate.element_id], anchor_elem=anchor, roi_bbox=roi_bbox, container_bbox=container_bbox, hints=hints, ui_elements=ui_elements, weights=weights, # Caches pour réutilisation ui_analysis_cache=ui_analysis_cache, distance_cache=distance_cache, alignment_cache=alignment_cache ) sniper_scores[candidate.element_id] = score # Sélection avec tie-breaking stable (Tâche 6) chosen, tie_break_criterion = self._select_best_candidate_with_tiebreak( anchor_candidates_filtered, sniper_scores ) if not chosen: continue chosen_score = sniper_scores[chosen.element_id] # Vérifier si c'est la meilleure combinaison jusqu'à présent if chosen_score > best_score: best_score = chosen_score best_element = chosen best_anchor = anchor best_tie_break_criterion = tie_break_criterion # Créer top3 avec les scores actuels scored_candidates = [(c, sniper_scores[c.element_id]) for c in anchor_candidates_filtered] scored_candidates.sort(key=lambda x: x[1], reverse=True) top3 = [x[0] for x in scored_candidates[:3]] best_details = [ {"id": t.element_id, "score": round(sniper_scores[t.element_id], 4)} for t in top3 ] logger.debug(f"New best combination: anchor={anchor.element_id if anchor else 'None'}, " f"element={chosen.element_id}, score={chosen_score:.4f}") # Calculer les métriques de performance avec cache stats duration_ms = (time.perf_counter() - start_time) * 1000 performance_metrics = { "evaluation_duration_ms": round(duration_ms, 2), "combinations_evaluated": combinations_evaluated, "anchor_evaluations": anchor_evaluations, "anchors_attempted": len(anchor_candidates), "anchor_texts": anchor_texts, # Tâche 8.1: Métriques de cache (Exigence 8.5) "cache_hits": cache_hits, "cache_misses": cache_misses, "cache_hit_ratio": round(cache_hits / max(1, cache_hits + cache_misses), 3), "ui_analysis_cache_size": len(ui_analysis_cache), "distance_cache_size": len(distance_cache), "alignment_cache_size": len(alignment_cache), 
"container_cache_size": len(container_cache) } logger.debug(f"Multi-anchor evaluation completed: {combinations_evaluated} combinations in {duration_ms:.2f}ms " f"(cache hit ratio: {performance_metrics['cache_hit_ratio']:.1%})") return { "element": best_element, "anchor": best_anchor, "score": best_score, "top3": best_details or [], "tie_break_criterion": best_tie_break_criterion, "anchor_id": best_anchor.element_id if best_anchor else None, "performance_metrics": performance_metrics } def _create_stable_sort_key(self, element: UIElement, score: float) -> Tuple[float, float, float, str]: """ Créer une clé de tri stable pour le tie-breaking (Fiche #11 - Tâche 6). Auteur : Dom, Alice Kiro - 15 décembre 2024 Critères de tri (dans l'ordre de priorité): 1. Score composite (décroissant) 2. Confiance de l'élément (décroissant) 3. Aire de l'élément (décroissant - plus grand = plus visible) 4. ID de l'élément (croissant - pour stabilité) Args: element: Élément UI à évaluer score: Score composite calculé Returns: Tuple de critères pour tri stable (Exigences 5.1, 5.2, 5.3, 5.4) """ # Critère 1: Score composite (négatif pour tri décroissant) sort_score = -float(score) # Critère 2: Confiance (négatif pour tri décroissant) confidence = float(getattr(element, "confidence", 1.0) or 1.0) sort_confidence = -confidence # Critère 3: Aire (négatif pour tri décroissant - préférer les plus grands) area = _bbox_area(element.bbox) sort_area = -area # Critère 4: ID élément (croissant pour déterminisme) element_id = str(getattr(element, "element_id", id(element))) return (sort_score, sort_confidence, sort_area, element_id) def _select_best_candidate_with_tiebreak(self, candidates: List[UIElement], scores: Dict[str, float]) -> Tuple[UIElement, str]: """ Sélectionner le meilleur candidat avec tie-breaking stable (Fiche #11 - Tâche 6). 
Auteur : Dom, Alice Kiro - 15 décembre 2024 Args: candidates: Liste des candidats scores: Dictionnaire des scores par element_id Returns: Tuple (meilleur_élément, critère_tie_break_utilisé) """ if not candidates: return None, "no_candidates" if len(candidates) == 1: return candidates[0], "single_candidate" # Créer les clés de tri pour tous les candidats candidate_keys = [] for elem in candidates: score = scores.get(elem.element_id, 0.0) sort_key = self._create_stable_sort_key(elem, score) candidate_keys.append((elem, sort_key, score)) # Trier par clé stable (Exigence 5.4, 5.5) candidate_keys.sort(key=lambda x: x[1]) # Prendre le meilleur best_elem, best_key, best_score = candidate_keys[0] # Déterminer quel critère a été utilisé pour le tie-break tie_break_criterion = "score" # Vérifier s'il y a égalité sur le score score_ties = [x for x in candidate_keys if abs(x[2] - best_score) < 1e-6] if len(score_ties) > 1: # Il y a égalité sur le score, vérifier les autres critères confidence_ties = [x for x in score_ties if abs(x[1][1] - best_key[1]) < 1e-6] if len(confidence_ties) > 1: # Égalité sur score ET confiance area_ties = [x for x in confidence_ties if abs(x[1][2] - best_key[2]) < 1e-6] if len(area_ties) > 1: # Égalité sur score, confiance ET aire tie_break_criterion = "element_id" else: tie_break_criterion = "area" else: tie_break_criterion = "confidence" logger.debug(f"Selected element {best_elem.element_id} with tie-break criterion: {tie_break_criterion}") return best_elem, tie_break_criterion # ========================================================================= # Résolution principale # ========================================================================= def resolve_target( self, target_spec: TargetSpec, screen_state: ScreenState, context: Optional[ResolutionContext] = None ) -> Optional[ResolvedTarget]: """ Résoudre une cible selon sa spécification. 
Args: target_spec: Spécification de la cible screen_state: État actuel de l'écran context: Contexte additionnel (optionnel) Returns: ResolvedTarget ou None si non trouvé """ # Fiche #10: Démarrage mesure performance start_time = time.perf_counter() self._stats["total_resolutions"] += 1 # Vérifier le cache cache_key = self._make_cache_key(target_spec, screen_state) if cache_key in self._cache: self._stats["cache_hits"] += 1 return self._cache[cache_key] # Créer le contexte si non fourni if context is None: context = ResolutionContext(screen_state=screen_state) # Obtenir les éléments UI et filtrer les non-interactifs (Fiche #8) all_elements = self._get_ui_elements(screen_state) ui_elements = [e for e in all_elements if self._is_interactable(e, screen_state)] if not ui_elements: logger.warning("No interactable UI elements available for resolution") self._stats["failed"] += 1 return None # Fiche #13: Index spatial (perf) - disponible dans le contexte context.workflow_context["spatial_index"] = self._get_spatial_index(screen_state, ui_elements) # Fiche #18: Lookup depuis mémoire persistante (avant résolution coûteuse) persistent_result = self._lookup_from_persistent_memory(target_spec, screen_state, ui_elements) if persistent_result is not None: # Mise en cache et stats self._cache[cache_key] = persistent_result if len(self._cache) > self._cache_size: self._cache.popitem(last=False) self._stats["successful"] += 1 logger.info(f"Resolved target from persistent memory: {persistent_result.element.element_id}") return persistent_result # Fiche #14: Résolution depuis mémoire cross-frame (avant scoring coûteux) mem = self._resolve_from_memory(target_spec, screen_state, ui_elements, context) if mem is not None: # Mise en cache et stats self._cache[cache_key] = mem if len(self._cache) > self._cache_size: self._cache.popitem(last=False) self._stats["successful"] += 1 return mem # Fiche #12: Shortcut field_for (label -> field) - priorité absolue result = 
self._resolve_field_for(target_spec, screen_state, ui_elements, context) if result is not None: # Mise en cache et stats self._cache[cache_key] = result if len(self._cache) > self._cache_size: self._cache.popitem(last=False) self._stats["successful"] += 1 # Fiche #14: Enregistrer en mémoire cross-frame self._record_in_cross_frame_memory(target_spec, screen_state, ui_elements, result.element) return result # Essayer les stratégies dans l'ordre de priorité result = None fallback_applied = False # 1. Stratégie composite si plusieurs critères if self._is_composite_spec(target_spec): result = self._resolve_composite(target_spec, ui_elements, context) # 2. By Role (priorité haute) if not result and target_spec.by_role: result = self._resolve_by_role( target_spec.by_role, target_spec.selection_policy or "first", ui_elements, context ) # 3. By Text if not result and target_spec.by_text: result = self._resolve_by_text(target_spec.by_text, ui_elements, context) if result: fallback_applied = target_spec.by_role is not None # 4. By Position if not result and target_spec.by_position: result = self._resolve_by_position( target_spec.by_position, ui_elements, context ) if result: fallback_applied = True # 5. By Embedding (fallback avancé) if not result and self.use_embedding_fallback and hasattr(target_spec, 'embedding_ref'): result = self._resolve_by_embedding(target_spec, ui_elements, context) if result: fallback_applied = True # 6. 
By Context (dernier recours) if not result and hasattr(target_spec, 'context_hints'): result = self._resolve_by_context(target_spec, ui_elements, context) if result: fallback_applied = True # Fiche #10: Collecte métriques résolution duration_ms = (time.perf_counter() - start_time) * 1000 if self.metrics_engine and METRICS_AVAILABLE: if self.error_handler: # Utiliser ErrorHandler pour gérer les erreurs de métriques try: self.metrics_engine.record_resolution( target_spec=target_spec, result=result, duration_ms=duration_ms, screen_state=screen_state ) except Exception as e: # Déléguer à ErrorHandler context_data = { 'screen_state': screen_state, 'details': {'operation': 'record_resolution_metrics'}, 'original_data': {'target_spec': target_spec, 'duration_ms': duration_ms} } recovery_result = self.error_handler.handle_error(e, context_data) if not recovery_result.success: logger.debug(f"Failed to record resolution metrics: {e}") else: # Fallback au comportement original try: self.metrics_engine.record_resolution( target_spec=target_spec, result=result, duration_ms=duration_ms, screen_state=screen_state ) except Exception as e: logger.debug(f"Failed to record resolution metrics: {e}") # Mettre à jour les stats if result: result.fallback_applied = fallback_applied self._stats["successful"] += 1 strategy = result.strategy_used self._stats["by_strategy"][strategy] = self._stats["by_strategy"].get(strategy, 0) + 1 if fallback_applied: self._stats["fallbacks_used"] += 1 # Mettre en cache self._add_to_cache(cache_key, result) # Fiche #14: Enregistrer en mémoire cross-frame self._record_in_cross_frame_memory(target_spec, screen_state, ui_elements, result.element) else: self._stats["failed"] += 1 logger.debug(f"Failed to resolve target: {target_spec}") return result # ========================================================================= # Stratégies de résolution # ========================================================================= def _resolve_by_role( self, role: 
str, selection_policy: str, ui_elements: List[UIElement], context: ResolutionContext ) -> Optional[ResolvedTarget]: """ Résoudre par rôle sémantique avec support healing. Fiche #10: Intègre le healing profile pour expansion des aliases. """ # Fiche #10: Récupérer le profil de healing hp = self._healing_profile() role_lower = role.lower().strip() # Fiche #10: Construire la liste des rôles à chercher wanted_roles = {role_lower} if hp["expand_roles"]: # Expansion avec ROLE_ALIASES si healing actif wanted_roles |= ROLE_ALIASES.get(role_lower, set()) # Filtrer les candidats candidates = [] for elem in ui_elements: elem_role = (getattr(elem, 'role', '') or "").lower().strip() elem_type = (getattr(elem, 'type', '') or "").lower().strip() # Vérifier correspondance role ok_role = elem_role in wanted_roles # Fiche #10: Bonus - si le rôle est trop "flou", accepter aussi par type (en healing) if not ok_role and hp["expand_roles"]: for k, types in TYPE_ALIASES.items(): if role_lower == k and elem_type in types: ok_role = True break if ok_role: candidates.append(elem) if not candidates: return None # Appliquer la politique de sélection selected = self._apply_selection_policy(candidates, selection_policy, context) if selected: return ResolvedTarget( element=selected, confidence=selected.confidence, strategy_used=ResolutionStrategy.BY_ROLE.value, resolution_details={ "role_searched": role, "candidates_found": len(candidates), "policy_applied": selection_policy, # Fiche #10: Ajouter healing metadata "healing_attempt": int(getattr(self, "healing_attempt", 0) or 0), "healing_profile": hp, "role_aliases_used": list(wanted_roles) if hp["expand_roles"] else [role_lower] }, alternatives=[ ResolvedTarget(e, e.confidence, ResolutionStrategy.BY_ROLE.value) for e in candidates if e != selected ][:3] # Max 3 alternatives ) return None def _resolve_by_text( self, text: str, ui_elements: List[UIElement], context: ResolutionContext ) -> Optional[ResolvedTarget]: """Résoudre par texte (exact, 
partiel, fuzzy).""" text_lower = text.lower().strip() # 1. Match exact for elem in ui_elements: if elem.label and elem.label.strip().lower() == text_lower: return ResolvedTarget( element=elem, confidence=elem.confidence, strategy_used=ResolutionStrategy.BY_TEXT.value, resolution_details={"match_type": "exact", "text": text} ) # 2. Match partiel (contient) partial_matches = [] for elem in ui_elements: if elem.label and text_lower in elem.label.strip().lower(): partial_matches.append((elem, 0.9)) elif elem.label and elem.label.strip().lower() in text_lower: partial_matches.append((elem, 0.85)) if partial_matches: # Trier par confiance partial_matches.sort(key=lambda x: x[1] * x[0].confidence, reverse=True) best = partial_matches[0] return ResolvedTarget( element=best[0], confidence=best[1] * best[0].confidence, strategy_used=ResolutionStrategy.BY_TEXT.value, resolution_details={"match_type": "partial", "text": text} ) # 3. Match fuzzy (Levenshtein) - Fiche #10: Utiliser healing profile hp = self._healing_profile() fuzzy_threshold = hp["min_ratio"] # Utiliser le seuil du healing profile fuzzy_matches = [] for elem in ui_elements: if elem.label: similarity = self._fuzzy_match(text_lower, elem.label.strip().lower()) if similarity >= fuzzy_threshold: fuzzy_matches.append((elem, similarity)) if fuzzy_matches: fuzzy_matches.sort(key=lambda x: x[1], reverse=True) best = fuzzy_matches[0] return ResolvedTarget( element=best[0], confidence=best[1] * best[0].confidence, strategy_used=ResolutionStrategy.BY_TEXT.value, resolution_details={ "match_type": "fuzzy", "similarity": best[1], "text": text, # Fiche #10: Ajouter healing metadata "healing_attempt": int(getattr(self, "healing_attempt", 0) or 0), "healing_profile": hp, "fuzzy_threshold_used": fuzzy_threshold } ) return None def _resolve_by_position( self, position: Tuple[int, int], ui_elements: List[UIElement], context: ResolutionContext ) -> Optional[ResolvedTarget]: """ Résoudre par position avec index spatial optimisé. 
Fiche #13: Optimisation O(n) → O(k) avec SpatialIndexGrid. Auteur : Dom, Alice Kiro - 19 décembre 2024 """ x, y = position idx: SpatialIndexGrid = context.workflow_context.get("spatial_index") # 1) Contain strict (dans la cellule) hits = idx.query_point(x, y) if idx else [e for e in ui_elements if _bbox_contains_point(e.bbox, x, y)] if hits: # Tie-break stable: plus petit (souvent le contrôle) puis confiance hits.sort(key=lambda e: (_bbox_area(e.bbox), -e.confidence, e.element_id)) elem = hits[0] return ResolvedTarget( element=elem, confidence=elem.confidence * 0.95, strategy_used=ResolutionStrategy.BY_POSITION.value, resolution_details={"match_type": "contains", "position": position} ) # 2) Nearest dans la tolérance (query bbox autour du point) tol = self.position_tolerance box = (x - tol, y - tol, 2 * tol, 2 * tol) candidates = idx.query_bbox(box) if idx else ui_elements best = None best_dist = 10**18 best_score = 0.0 for elem in candidates: cx, cy = _bbox_center(elem.bbox) dist = ((x - cx) ** 2 + (y - cy) ** 2) ** 0.5 if dist <= tol and dist < best_dist: best_dist = dist best = elem best_score = 1.0 - (dist / tol) if best: return ResolvedTarget( element=best, confidence=best.confidence * best_score * 0.8, strategy_used=ResolutionStrategy.BY_POSITION.value, resolution_details={"match_type": "nearest", "distance": best_dist, "position": position} ) return None def _resolve_by_embedding( self, target_spec: TargetSpec, ui_elements: List[UIElement], context: ResolutionContext ) -> Optional[ResolvedTarget]: """Résoudre par similarité d'embedding visuel.""" # Cette méthode nécessite un embedding de référence if not hasattr(target_spec, 'embedding_ref') or not target_spec.embedding_ref: return None # Lazy load du matcher if self._embedding_matcher is None: if self.error_handler: # Utiliser ErrorHandler pour gérer les erreurs d'import try: from ..embedding.fusion_engine import FusionEngine self._embedding_matcher = FusionEngine() except ImportError as e: context_data = { 
'details': {'operation': 'import_fusion_engine'}, 'original_data': {'module': 'fusion_engine'} } recovery_result = self.error_handler.handle_error(e, context_data) if not recovery_result.success: logger.warning("FusionEngine not available for embedding matching") return None else: # Fallback au comportement original try: from ..embedding.fusion_engine import FusionEngine self._embedding_matcher = FusionEngine() except ImportError: logger.warning("FusionEngine not available for embedding matching") return None # Comparer les embeddings best_match = None best_similarity = 0.0 for elem in ui_elements: if hasattr(elem, 'embedding') and elem.embedding is not None: similarity = self._compute_similarity( target_spec.embedding_ref, elem.embedding ) if similarity > best_similarity and similarity >= self.similarity_threshold: best_similarity = similarity best_match = elem if best_match: return ResolvedTarget( element=best_match, confidence=best_similarity, strategy_used=ResolutionStrategy.BY_EMBEDDING.value, resolution_details={"similarity": best_similarity} ) return None def _resolve_by_context( self, target_spec: TargetSpec, ui_elements: List[UIElement], context: ResolutionContext ) -> Optional[ResolvedTarget]: """Résoudre par contexte spatial (près de X, dans Y).""" if not hasattr(target_spec, 'context_hints') or not target_spec.context_hints: return None hints = target_spec.context_hints candidates = list(ui_elements) # Filtrer par "near" (près de) if 'near_text' in hints: anchor = self._find_element_by_text(hints['near_text'], ui_elements) if anchor: max_distance = hints.get('max_distance', 200) candidates = self._filter_by_proximity(candidates, anchor, max_distance) # Filtrer par "below" (en dessous de) if 'below_text' in hints: anchor = self._find_element_by_text(hints['below_text'], ui_elements) if anchor: candidates = [e for e in candidates if _bbox_to_tuple(e.bbox)[1] > _bbox_bottom(anchor.bbox)] # Filtrer par "right_of" (à droite de) if 'right_of_text' in hints: 
def _resolve_field_for(
    self,
    target_spec: TargetSpec,
    screen_state: ScreenState,
    ui_elements: List[UIElement],
    context: ResolutionContext
) -> Optional[ResolvedTarget]:
    """
    Direct "field_for" resolution: find the input field associated with a label.

    Fiche #12: label -> field association using form-layout logic.

    Strategy, applied for each label anchor found:
        1. Prefer an input-like element on the same row as the label whose
           left edge is at (or at most 10 px before) the label's right edge.
        2. Otherwise fall back to an input-like element starting below the
           label (5 px tolerance) whose horizontal overlap ratio with the
           label is greater than 0.25.

    Args:
        target_spec: Specification carrying ``context_hints["field_for"]``.
        screen_state: Current screen state (not read here).
        ui_elements: UI elements detected on the screen.
        context: Resolution context (not read here).

    Returns:
        ResolvedTarget (score 0.95 for a same-row match, 0.85 for a
        below-label match) or None when the hint is absent, no label anchor
        is found, or no anchor yields a field.
    """
    hints = getattr(target_spec, "context_hints", None) or {}
    if not hints.get("field_for"):
        return None

    hp = self._healing_profile()
    label_texts = self._as_text_list(hints["field_for"])
    # Only the element_id -> row mapping is needed here.
    _rows, row_of = self._build_rows(ui_elements)

    # 1) Collect every possible label anchor.
    anchors = []
    for txt in label_texts:
        anchors.extend(
            self._find_anchors_by_text(txt, ui_elements, min_ratio=hp["min_ratio"])
        )
    if not anchors:
        return None

    def x_overlap_ratio(bbox_a, bbox_b) -> float:
        """Horizontal overlap divided by the narrower of the two widths."""
        # Normalise through _bbox_to_tuple so bbox objects exposing
        # to_tuple() are handled like plain (x, y, w, h) tuples.
        box_a = _bbox_to_tuple(bbox_a)
        box_b = _bbox_to_tuple(bbox_b)
        left = max(box_a[0], box_b[0])
        right = min(box_a[0] + box_a[2], box_b[0] + box_b[2])
        overlap = max(0.0, right - left)
        return overlap / max(1.0, min(box_a[2], box_b[2]))

    # The candidate set does not take the anchor as input, so it is built
    # once, on the first anchor that actually has a row.
    candidates = None
    best = None
    best_score = -1.0
    best_anchor = None

    for anchor in anchors:
        a_row = row_of.get(anchor.element_id)
        if a_row is None:
            continue

        if candidates is None:
            inputish = [e for e in ui_elements if self._is_inputish(e)]
            candidates = self._apply_hard_constraints(inputish, target_spec, ui_elements)

        a_right = _bbox_right(anchor.bbox)
        a_bottom = _bbox_bottom(anchor.bbox)

        # 2) Same row, to the right of the label.
        same_row = [
            e for e in candidates
            if row_of.get(e.element_id) == a_row
            and _bbox_to_tuple(e.bbox)[0] >= a_right - 10
        ]
        if same_row:
            # Horizontally closest first, then smallest, then stable id order.
            chosen = min(
                same_row,
                key=lambda e: (
                    _bbox_to_tuple(e.bbox)[0] - a_right,
                    _bbox_area(e.bbox),
                    e.element_id,
                ),
            )
            score = 0.95
        else:
            # 3) Fallback: below the label, in the same column.
            below = [
                e for e in candidates
                if _bbox_to_tuple(e.bbox)[1] >= a_bottom - 5
                and x_overlap_ratio(e.bbox, anchor.bbox) > 0.25
            ]
            if not below:
                continue
            chosen = min(
                below,
                key=lambda e: (
                    _bbox_to_tuple(e.bbox)[1] - a_bottom,
                    _bbox_to_tuple(e.bbox)[0],
                    e.element_id,
                ),
            )
            score = 0.85

        if score > best_score:
            best_score = score
            best = chosen
            best_anchor = anchor

    if best is None:
        return None

    return ResolvedTarget(
        element=best,
        confidence=min(best_score, 1.0),
        strategy_used=ResolutionStrategy.COMPOSITE.value,
        resolution_details={
            "criteria_used": {"field_for": label_texts},
            "anchor_id": best_anchor.element_id if best_anchor else None,
            "healing_attempt": int(getattr(self, "healing_attempt", 0) or 0),
        }
    )
Auteur: Dom, Alice Kiro - 15 décembre 2024 """ candidates = list(ui_elements) scores: Dict[str, float] = {e.element_id: 1.0 for e in candidates} try: # Filtrer et scorer par rôle if target_spec.by_role: role_lower = target_spec.by_role.lower() new_candidates = [] for elem in candidates: elem_role = (getattr(elem, 'role', '') or "").lower() elem_type = (getattr(elem, 'type', '') or "").lower() if elem_role == role_lower or elem_type == role_lower: scores[elem.element_id] *= 1.0 new_candidates.append(elem) elif role_lower in elem_role or role_lower in elem_type: scores[elem.element_id] *= 0.8 new_candidates.append(elem) if new_candidates: candidates = new_candidates logger.debug(f"Role filter: {len(new_candidates)} candidates for role '{target_spec.by_role}'") # Filtrer et scorer par texte if target_spec.by_text: text_lower = target_spec.by_text.lower() new_candidates = [] for elem in candidates: label = (getattr(elem, 'label', '') or "").lower() if label == text_lower: scores[elem.element_id] *= 1.0 new_candidates.append(elem) elif text_lower in label: scores[elem.element_id] *= 0.9 new_candidates.append(elem) elif label and self._fuzzy_match(text_lower, label) >= 0.7: scores[elem.element_id] *= 0.7 new_candidates.append(elem) if new_candidates: candidates = new_candidates logger.debug(f"Text filter: {len(new_candidates)} candidates for text '{target_spec.by_text}'") # Scorer par position si fournie if target_spec.by_position: x, y = target_spec.by_position for elem in candidates: # BBOX format: (x, y, w, h) - Correction Fiche #2 center_x, center_y = _bbox_center(elem.bbox) distance = math.sqrt((x - center_x) ** 2 + (y - center_y) ** 2) # Bonus pour proximité if distance < 50: scores[elem.element_id] *= 1.2 elif distance < 100: scores[elem.element_id] *= 1.1 logger.debug(f"Position scoring applied for position ({x}, {y})") # Fiche #3: Appliquer context_hints AVANT la sélection finale if hasattr(target_spec, 'context_hints') and target_spec.context_hints: candidates 
= self._apply_context_hints_to_candidates( candidates, target_spec.context_hints, ui_elements, scores ) logger.debug(f"Context hints applied: {len(candidates)} candidates remaining") if not candidates: logger.debug("No candidates remaining after all filters") return None # Fiche #11: Multi-anchor + contraintes combinées # Appliquer hard constraints AVANT le scoring candidates = self._apply_hard_constraints(candidates, target_spec, ui_elements) if not candidates: logger.debug("No candidates remaining after hard constraints") return None # Fiche #11: Multi-anchor system - évaluer toutes les combinaisons ancre-candidat hints = getattr(target_spec, "context_hints", None) or {} hp = self._healing_profile() # Mesurer le temps de résolution resolution_start_time = time.perf_counter() # Tâche 7.1: Évaluer chaque combinaison ancre-candidat best_combination = self._evaluate_all_anchor_candidate_combinations( target_spec, candidates, ui_elements, scores, hints, hp ) # Utiliser la meilleure combinaison trouvée selected = best_combination["element"] anchor = best_combination["anchor"] final_score = best_combination["score"] best_details = best_combination["top3"] tie_break_criterion = best_combination["tie_break_criterion"] # Vérifier que nous avons trouvé un élément if not selected: logger.debug("No element selected after multi-anchor evaluation") return None # Fiche #11 - Tâche 7.2: Calculer les métriques de performance complètes total_duration_ms = (time.perf_counter() - resolution_start_time) * 1000 # Extraire les métriques du cache de calculs si disponible cache_stats = self._computation_cache.get_stats() if hasattr(self, '_computation_cache') else {} return ResolvedTarget( element=selected, confidence=min(final_score, 1.0), strategy_used=ResolutionStrategy.COMPOSITE.value, resolution_details={ "criteria_used": { "role": target_spec.by_role, "text": target_spec.by_text, "position": target_spec.by_position, "hints": list((getattr(target_spec, "context_hints", None) or 
def _resolve_by_spatial(
    self,
    target_spec: TargetSpec,
    ui_elements: List[UIElement],
    context: ResolutionContext
) -> Optional[ResolvedTarget]:
    """
    Resolve a target through spatial relations with anchor elements.

    For each of the hints ``below_text``, ``above_text``, ``right_of_text``
    and ``left_of_text`` present in ``target_spec.context_hints``, the
    anchor is looked up by text and the candidates are narrowed to the
    elements the SpatialAnalyzer reports in that relation to the anchor.
    The remaining candidates are optionally filtered by exact role and one
    is picked with the spec's selection policy (default ``"first"``).

    Args:
        target_spec: Target specification with ``context_hints``.
        ui_elements: UI elements detected on the screen.
        context: Resolution context; ``context.screen_state`` supplies the
            key of the relations cache.

    Returns:
        ResolvedTarget (confidence scaled by 0.85) when an element is found
        through at least one resolved anchor; None when the analyzer is
        unavailable, there are no hints, no anchor could be resolved, or
        no candidate remains.
    """
    if not SPATIAL_ANALYZER_AVAILABLE:
        logger.debug("SpatialAnalyzer not available for spatial resolution")
        return None

    hints = getattr(target_spec, 'context_hints', None)
    if not hints:
        return None

    # Lazy-load the spatial analyzer.
    if self._spatial_analyzer is None:
        self._spatial_analyzer = SpatialAnalyzer()

    # Relations are cached per screen id. A screen without an id is never
    # cached: a shared fallback key would hand the relations of one screen
    # to every other id-less screen.
    screen_id = getattr(context.screen_state, 'screen_state_id', None)
    if screen_id is None:
        relations = self._spatial_analyzer.compute_relations(ui_elements)
    else:
        if screen_id not in self._spatial_relations_cache:
            self._spatial_relations_cache[screen_id] = (
                self._spatial_analyzer.compute_relations(ui_elements)
            )
        relations = self._spatial_relations_cache[screen_id]

    hint_relations = (
        ('below_text', RelationType.BELOW),
        ('above_text', RelationType.ABOVE),
        ('right_of_text', RelationType.RIGHT_OF),
        ('left_of_text', RelationType.LEFT_OF),
    )

    candidates = list(ui_elements)
    anchor_used = None

    for hint_key, relation_type in hint_relations:
        if hint_key not in hints:
            continue
        anchor = self._find_element_by_text(hints[hint_key], ui_elements)
        if not anchor:
            continue
        anchor_id = self._get_element_id(anchor)
        related_ids = self._spatial_analyzer.find_by_relation(
            anchor_id, relation_type, relations
        )
        candidates = [e for e in candidates if self._get_element_id(e) in related_ids]
        anchor_used = anchor_id

    # Without a resolved anchor no spatial constraint was applied; picking
    # from the unfiltered element list would report an arbitrary element
    # as a spatial match.
    if anchor_used is None:
        logger.debug("No spatial anchor resolved; skipping spatial resolution")
        return None

    # Optional exact-role filter.
    if target_spec.by_role and candidates:
        role_lower = target_spec.by_role.lower()
        candidates = [
            e for e in candidates
            if (getattr(e, 'role', '') or '').lower() == role_lower
        ]

    if not candidates:
        return None

    best = self._apply_selection_policy(
        candidates,
        target_spec.selection_policy or "first",
        context
    )
    if not best:
        return None

    visual_state = self._detect_visual_state(best)

    return ResolvedTarget(
        element=best,
        confidence=best.confidence * 0.85,  # Slight penalty: this is a fallback strategy
        strategy_used=ResolutionStrategy.BY_SPATIAL.value,
        resolution_details={
            "hints_applied": list(hints.keys()),
            "anchor_used": anchor_used,
            "candidates_found": len(candidates)
        },
        visual_state=visual_state,
        spatial_anchor=anchor_used
    )
List[UIElement], scores: Dict[str, float] ) -> List[UIElement]: """ Appliquer les context_hints pour filtrer les candidats. Fiche #3: Nouvelle méthode pour intégrer context_hints dans la résolution composite. Auteur: Dom, Alice Kiro - 15 décembre 2024 Args: candidates: Liste des candidats actuels context_hints: Dictionnaire des hints contextuels all_elements: Tous les éléments UI disponibles scores: Dictionnaire des scores par élément Returns: Liste filtrée des candidats """ filtered_candidates = list(candidates) try: # Filtrer par "below_text" (en dessous de) - Support multi-anchor if 'below_text' in context_hints: below_text_value = context_hints['below_text'] anchor_texts = self._as_text_list(below_text_value) found_anchor = None for text in anchor_texts: anchor = self._find_element_by_text(text, all_elements) if anchor: found_anchor = anchor break if found_anchor: anchor_bottom = _bbox_bottom(found_anchor.bbox) filtered_candidates = [ e for e in filtered_candidates if _bbox_to_tuple(e.bbox)[1] > anchor_bottom ] # Bonus de score pour proximité verticale for elem in filtered_candidates: vertical_distance = abs(_bbox_to_tuple(elem.bbox)[1] - anchor_bottom) if vertical_distance < 100: scores[elem.element_id] *= 1.1 logger.debug(f"below_text '{below_text_value}': {len(filtered_candidates)} candidates") # Filtrer par "above_text" (au-dessus de) - Support multi-anchor if 'above_text' in context_hints: above_text_value = context_hints['above_text'] anchor_texts = self._as_text_list(above_text_value) found_anchor = None for text in anchor_texts: anchor = self._find_element_by_text(text, all_elements) if anchor: found_anchor = anchor break if found_anchor: anchor_top = _bbox_to_tuple(found_anchor.bbox)[1] filtered_candidates = [ e for e in filtered_candidates if _bbox_bottom(e.bbox) < anchor_top ] # Bonus de score pour proximité verticale for elem in filtered_candidates: vertical_distance = abs(_bbox_bottom(elem.bbox) - anchor_top) if vertical_distance < 100: 
scores[elem.element_id] *= 1.1 logger.debug(f"above_text '{above_text_value}': {len(filtered_candidates)} candidates") # Filtrer par "right_of_text" (à droite de) - Support multi-anchor if 'right_of_text' in context_hints: right_of_text_value = context_hints['right_of_text'] anchor_texts = self._as_text_list(right_of_text_value) found_anchor = None for text in anchor_texts: anchor = self._find_element_by_text(text, all_elements) if anchor: found_anchor = anchor break if found_anchor: anchor_right = _bbox_right(found_anchor.bbox) filtered_candidates = [ e for e in filtered_candidates if _bbox_to_tuple(e.bbox)[0] > anchor_right ] # Bonus de score pour proximité horizontale for elem in filtered_candidates: horizontal_distance = abs(_bbox_to_tuple(elem.bbox)[0] - anchor_right) if horizontal_distance < 100: scores[elem.element_id] *= 1.1 logger.debug(f"right_of_text '{right_of_text_value}': {len(filtered_candidates)} candidates") # Filtrer par "left_of_text" (à gauche de) - Support multi-anchor if 'left_of_text' in context_hints: left_of_text_value = context_hints['left_of_text'] anchor_texts = self._as_text_list(left_of_text_value) found_anchor = None for text in anchor_texts: anchor = self._find_element_by_text(text, all_elements) if anchor: found_anchor = anchor break if found_anchor: anchor_left = _bbox_to_tuple(found_anchor.bbox)[0] filtered_candidates = [ e for e in filtered_candidates if _bbox_right(e.bbox) < anchor_left ] # Bonus de score pour proximité horizontale for elem in filtered_candidates: horizontal_distance = abs(_bbox_right(elem.bbox) - anchor_left) if horizontal_distance < 100: scores[elem.element_id] *= 1.1 logger.debug(f"left_of_text '{left_of_text_value}': {len(filtered_candidates)} candidates") # Filtrer par "near_text" (près de) - Support multi-anchor if 'near_text' in context_hints: near_text_value = context_hints['near_text'] # Fiche #11: Support multi-anchor - convertir en liste anchor_texts = self._as_text_list(near_text_value) # Trouver au 
moins une ancre parmi les textes possibles found_anchor = None for text in anchor_texts: anchor = self._find_element_by_text(text, all_elements) if anchor: found_anchor = anchor break if found_anchor: max_distance = context_hints.get('max_distance', 200) filtered_candidates = self._filter_by_proximity( filtered_candidates, found_anchor, max_distance ) # Bonus de score inversement proportionnel à la distance anchor_center = _bbox_center(found_anchor.bbox) for elem in filtered_candidates: elem_center = _bbox_center(elem.bbox) distance = math.sqrt( (elem_center[0] - anchor_center[0]) ** 2 + (elem_center[1] - anchor_center[1]) ** 2 ) if distance < max_distance: proximity_bonus = 1.0 + (1.0 - distance / max_distance) * 0.2 scores[elem.element_id] *= proximity_bonus logger.debug(f"near_text '{near_text_value}': {len(filtered_candidates)} candidates") # Amélioration: Filtrer par "within_region" (dans une région) if 'within_region' in context_hints: region = context_hints['within_region'] if isinstance(region, (list, tuple)) and len(region) == 4: x, y, w, h = region filtered_candidates = [ e for e in filtered_candidates if (_bbox_to_tuple(e.bbox)[0] >= x and _bbox_to_tuple(e.bbox)[1] >= y and _bbox_to_tuple(e.bbox)[0] + _bbox_to_tuple(e.bbox)[2] <= x + w and _bbox_to_tuple(e.bbox)[1] + _bbox_to_tuple(e.bbox)[3] <= y + h) ] logger.debug(f"within_region {region}: {len(filtered_candidates)} candidates") # Amélioration: Exclure par "exclude_near_text" (exclure près de) - Support multi-anchor if 'exclude_near_text' in context_hints: exclude_near_text_value = context_hints['exclude_near_text'] anchor_texts = self._as_text_list(exclude_near_text_value) found_exclude_anchor = None for text in anchor_texts: exclude_anchor = self._find_element_by_text(text, all_elements) if exclude_anchor: found_exclude_anchor = exclude_anchor break if found_exclude_anchor: exclude_distance = context_hints.get('exclude_distance', 100) exclude_candidates = self._filter_by_proximity( 
def _detect_visual_state(self, element: UIElement) -> VisualState:
    """
    Detect the visual state of a UI element.

    Checks, in order:
        1. the ``state`` attribute (keywords disabled, focused, checked,
           loading, error);
        2. ``attributes`` / ``style`` (opacity below 0.5 or a grey
           background -> DISABLED, a red border -> ERROR);
        3. the label text (loading/chargement, error/erreur);
        4. the detection confidence (below 0.5 -> UNKNOWN).

    Args:
        element: UI element to analyse.

    Returns:
        The first VisualState matched by the checks above, or
        VisualState.ENABLED when none applies.
    """
    # 1) Explicit state attribute.
    if hasattr(element, 'state'):
        state_str = str(element.state).lower()
        # 'unchecked' contains 'checked' and 'unfocused' contains 'focused';
        # strip the negated forms so they are not reported as the positive
        # state.
        positive_state = state_str.replace('unchecked', '').replace('unfocused', '')
        if 'disabled' in state_str:
            return VisualState.DISABLED
        if 'focused' in positive_state:
            return VisualState.FOCUSED
        if 'checked' in positive_state:
            return VisualState.CHECKED
        if 'loading' in state_str:
            return VisualState.LOADING
        if 'error' in state_str:
            return VisualState.ERROR

    # 2) Style attributes.
    if hasattr(element, 'style') or hasattr(element, 'attributes'):
        attrs = (
            getattr(element, 'attributes', None)
            or getattr(element, 'style', None)
            or {}
        )
        # Skip style values that are not mapping-like (no .get()).
        if hasattr(attrs, 'get'):
            # Reduced opacity is treated as disabled.
            opacity = attrs.get('opacity', 1.0)
            if isinstance(opacity, (int, float)) and opacity < 0.5:
                return VisualState.DISABLED

            # Grey background is treated as disabled.
            bg_color = attrs.get('background_color', attrs.get('backgroundColor', ''))
            if self._is_disabled_color(bg_color):
                return VisualState.DISABLED

            # Red border is treated as an error.
            border_color = attrs.get('border_color', attrs.get('borderColor', ''))
            if self._is_error_color(border_color):
                return VisualState.ERROR

    # 3) Hints in the label text.
    if hasattr(element, 'label') and element.label:
        label_lower = element.label.lower()
        if 'loading' in label_lower or 'chargement' in label_lower:
            return VisualState.LOADING
        if 'error' in label_lower or 'erreur' in label_lower:
            return VisualState.ERROR

    # 4) Low detection confidence -> state cannot be trusted.
    confidence = getattr(element, 'confidence', None)
    if confidence is not None and confidence < 0.5:
        return VisualState.UNKNOWN

    return VisualState.ENABLED
Args: target_spec: Spécification de la cible screen_state: État actuel de l'écran anchor_elements: Éléments ancres pour fallback spatial context: Contexte additionnel Returns: ResolvedTarget ou None """ # Essayer résolution standard result = self.resolve_target(target_spec, screen_state, context) if result: # Ajouter détection d'état visuel result.visual_state = self._detect_visual_state(result.element) return result # Fallback spatial si activé et ancres disponibles if not self.use_spatial_fallback or not anchor_elements: return None logger.debug(f"Attempting spatial fallback with {len(anchor_elements)} anchors") # Créer contexte avec ancres if context is None: context = ResolutionContext(screen_state=screen_state) context.anchor_elements = anchor_elements ui_elements = self._get_ui_elements(screen_state) # Essayer résolution spatiale result = self._resolve_by_spatial(target_spec, ui_elements, context) if result: result.fallback_applied = True self._stats["spatial_fallbacks"] += 1 logger.info(f"Spatial fallback successful for target: {target_spec.by_role or target_spec.by_text}") return result # ========================================================================= # Utilitaires # ========================================================================= def _get_ui_elements(self, screen_state: ScreenState) -> List[UIElement]: """Extraire les éléments UI du screen state.""" # Essayer différentes sources if hasattr(screen_state, 'ui_elements') and screen_state.ui_elements: return screen_state.ui_elements if hasattr(screen_state, 'perception') and screen_state.perception: if hasattr(screen_state.perception, 'ui_elements'): return screen_state.perception.ui_elements or [] return [] def _apply_selection_policy( self, candidates: List[UIElement], policy: str, context: ResolutionContext ) -> Optional[UIElement]: """Appliquer une politique de sélection.""" if not candidates: return None if policy == "first": # Premier dans l'ordre de lecture (top-left) return 
min(candidates, key=lambda e: (_bbox_to_tuple(e.bbox)[1], _bbox_to_tuple(e.bbox)[0])) elif policy == "last": # Dernier dans l'ordre de lecture return max(candidates, key=lambda e: (_bbox_to_tuple(e.bbox)[1], _bbox_to_tuple(e.bbox)[0])) elif policy == "by_similarity" or policy == "highest_confidence": return max(candidates, key=lambda e: e.confidence) elif policy == "largest": # Plus grand élément return max(candidates, key=lambda e: _bbox_area(e.bbox)) elif policy == "nearest_to_previous": # Plus proche de la cible précédente # BBOX format: (x, y, w, h) - Correction Fiche #2 if context.previous_target: prev_bbox = _bbox_to_tuple(context.previous_target.bbox) prev_center = ( prev_bbox[0] + prev_bbox[2] / 2, # x + w/2 prev_bbox[1] + prev_bbox[3] / 2 # y + h/2 ) return min(candidates, key=lambda e: ( ((_bbox_to_tuple(e.bbox)[0] + _bbox_to_tuple(e.bbox)[2]/2) - prev_center[0])**2 + ((_bbox_to_tuple(e.bbox)[1] + _bbox_to_tuple(e.bbox)[3]/2) - prev_center[1])**2 )) # Default: premier return candidates[0] def _fuzzy_match(self, s1: str, s2: str) -> float: """Calcul de similarité fuzzy (Levenshtein normalisé).""" if not s1 or not s2: return 0.0 if s1 == s2: return 1.0 # Levenshtein distance len1, len2 = len(s1), len(s2) if len1 < len2: s1, s2 = s2, s1 len1, len2 = len2, len1 distances = range(len2 + 1) for i, c1 in enumerate(s1): new_distances = [i + 1] for j, c2 in enumerate(s2): if c1 == c2: new_distances.append(distances[j]) else: new_distances.append(1 + min(distances[j], distances[j+1], new_distances[-1])) distances = new_distances max_len = max(len1, len2) return 1.0 - (distances[-1] / max_len) def _compute_similarity(self, emb1: np.ndarray, emb2: np.ndarray) -> float: """Calculer la similarité cosinus entre deux embeddings.""" if emb1 is None or emb2 is None: return 0.0 norm1 = np.linalg.norm(emb1) norm2 = np.linalg.norm(emb2) if norm1 == 0 or norm2 == 0: return 0.0 return float(np.dot(emb1, emb2) / (norm1 * norm2)) def _filter_by_proximity( self, elements: 
List[UIElement], anchor: UIElement, max_distance: float ) -> List[UIElement]: """ Filtrer les éléments par proximité à un ancre avec cache optimisé. Tâche 5.4: Utilise le cache de calculs pour éviter les recalculs de distance. """ # BBOX format: (x, y, w, h) - Correction Fiche #2 # Auteur: Dom, Alice Kiro - 15 décembre 2024 anchor_center = cached_bbox_center(_bbox_to_tuple(anchor.bbox)) filtered = [] for elem in elements: # Utiliser le cache de calculs pour la distance distance = self._computation_cache.get_distance( elem.element_id, anchor.element_id, lambda: cached_euclidean_distance( cached_bbox_center(_bbox_to_tuple(elem.bbox)), anchor_center ) ) if distance <= max_distance: filtered.append(elem) return filtered def _is_composite_spec(self, target_spec: TargetSpec) -> bool: """ Vérifier si la spec utilise plusieurs critères. Fiche #3: Inclut maintenant context_hints comme critère composite. Auteur: Dom, Alice Kiro - 15 décembre 2024 """ criteria_count = sum([ target_spec.by_role is not None, target_spec.by_text is not None, target_spec.by_position is not None, hasattr(target_spec, 'context_hints') and target_spec.context_hints is not None and len(target_spec.context_hints) > 0 ]) return criteria_count >= 2 # ========================================================================= # Cache # ========================================================================= def _make_cache_key(self, target_spec: TargetSpec, screen_state: ScreenState) -> str: """ Créer une clé de cache. Fiche #3: Inclut maintenant context_hints dans la clé de cache. 
Auteur: Dom, Alice Kiro - 15 décembre 2024 """ # Sérialiser context_hints pour la clé de cache context_hints_str = "" if hasattr(target_spec, 'context_hints') and target_spec.context_hints: try: context_hints_str = json.dumps(target_spec.context_hints, sort_keys=True) except Exception: context_hints_str = str(target_spec.context_hints) spec_key = f"{target_spec.by_role}|{target_spec.by_text}|{target_spec.by_position}|{context_hints_str}" state_key = screen_state.screen_state_id if hasattr(screen_state, 'screen_state_id') else "unknown" return f"{spec_key}:{state_key}" def _add_to_cache(self, key: str, result: ResolvedTarget) -> None: """Ajouter au cache LRU.""" if key in self._cache: self._cache.move_to_end(key) else: self._cache[key] = result if len(self._cache) > self._cache_size: self._cache.popitem(last=False) def _record_in_cross_frame_memory(self, target_spec, screen_state, ui_elements, selected_element): """Enregistrer en mémoire cross-frame à chaque succès""" sig = screen_signature(screen_state, ui_elements, mode="layout") ckey = self._make_cross_key(target_spec, sig) self._cross_frame_cache[ckey] = TargetFingerprint.from_element(selected_element) self._cross_frame_cache.move_to_end(ckey) if len(self._cross_frame_cache) > self._cross_frame_cache_size: self._cross_frame_cache.popitem(last=False) def clear_cache(self) -> None: """ Vider tous les caches avec optimisations. Tâche 5.4: Inclut le nettoyage du cache de calculs. """ self._cache.clear() self._computation_cache.clear() logger.debug("All caches cleared (resolution + computation)") def _get_spatial_index(self, screen_state: ScreenState, ui_elements: List[UIElement]) -> SpatialIndexGrid: """ Obtient ou crée un index spatial pour un screen_state donné avec signature optimisée. Tâche 5.1: Cache de l'index spatial par signature d'écran. Évite la reconstruction à chaque résolution pour la même disposition UI. 
Auteur : Dom, Alice Kiro - 20 décembre 2024 """ # Utiliser signature d'écran au lieu de screen_state_id pour plus de robustesse screen_sig = screen_signature(screen_state, ui_elements, mode="layout") if screen_sig in self._index_cache: self._index_cache.move_to_end(screen_sig) logger.debug(f"Spatial index cache hit for signature: {screen_sig[:16]}...") return self._index_cache[screen_sig] # Construire nouvel index spatial try: sw = int(screen_state.window.screen_resolution[0]) except Exception: sw = 1920 # cell_size "UI-friendly" : ~ 1/12e de la largeur, borné cell = max(120, min(220, sw // 12)) logger.debug(f"Building spatial index for {len(ui_elements)} elements with cell_size={cell}") start_time = time.perf_counter() idx = SpatialIndexGrid(cell_size=cell).build(ui_elements) build_time = (time.perf_counter() - start_time) * 1000 logger.debug(f"Spatial index built in {build_time:.2f}ms") # Ajouter au cache avec éviction LRU self._index_cache[screen_sig] = idx self._index_cache.move_to_end(screen_sig) if len(self._index_cache) > self._index_cache_size: evicted_key = next(iter(self._index_cache)) self._index_cache.popitem(last=False) logger.debug(f"Evicted spatial index cache entry: {evicted_key[:16]}...") return idx def clear_index_cache(self) -> None: """Vider le cache d'index spatiaux.""" self._index_cache.clear() # ========================================================================= # Stats # ========================================================================= def get_stats(self) -> Dict[str, Any]: """ Obtenir les statistiques de résolution avec optimisations. Tâche 5.4: Inclut les stats du cache de calculs. 
""" base_stats = dict(self._stats) # Ajouter les stats du cache de calculs computation_stats = self._computation_cache.get_stats() base_stats['computation_cache'] = computation_stats # Ajouter les stats des caches d'index base_stats['spatial_index_cache'] = { 'size': len(self._index_cache), 'max_size': self._index_cache_size } # Ajouter les stats de mémoire cross-frame base_stats['cross_frame_cache'] = { 'size': len(self._cross_frame_cache), 'max_size': self._cross_frame_cache_size } return base_stats def reset_stats(self) -> None: """Réinitialiser les statistiques.""" self._stats = { "total_resolutions": 0, "successful": 0, "failed": 0, "cache_hits": 0, "fallbacks_used": 0, "spatial_fallbacks": 0, "by_strategy": {} } def clear_spatial_cache(self) -> None: """Vider le cache des relations spatiales.""" self._spatial_relations_cache.clear() def get_spatial_analyzer(self) -> Optional[SpatialAnalyzer]: """Récupérer l'analyseur spatial (lazy load).""" if self._spatial_analyzer is None and SPATIAL_ANALYZER_AVAILABLE: self._spatial_analyzer = SpatialAnalyzer() return self._spatial_analyzer # ========================================================================= # Fiche #18 - Apprentissage persistant "mix" (JSONL + SQLite) # ========================================================================= def _lookup_from_persistent_memory( self, target_spec: TargetSpec, screen_state: ScreenState, ui_elements: List[UIElement] ) -> Optional[ResolvedTarget]: """ Rechercher une résolution depuis la mémoire persistante. Fiche #18: Lookup avant résolution coûteuse pour réutiliser les apprentissages précédents. 
Args: target_spec: Spécification de la cible screen_state: État actuel de l'écran ui_elements: Éléments UI disponibles Returns: ResolvedTarget si trouvé en mémoire, None sinon """ if not self.enable_persistent_learning or not self._persistent_memory: return None try: # Générer signature d'écran from .screen_signature import screen_signature screen_sig = screen_signature(screen_state, ui_elements, mode="layout") # Lookup dans la mémoire persistante fingerprint = self._persistent_memory.lookup(screen_sig, target_spec) if fingerprint is None: return None # Essayer de retrouver l'élément par element_id element_by_id = {e.element_id: e for e in ui_elements} if fingerprint.element_id in element_by_id: element = element_by_id[fingerprint.element_id] return ResolvedTarget( element=element, confidence=min(0.95, fingerprint.confidence), strategy_used="PERSISTENT_MEMORY_ID", fallback_applied=False, resolution_details={ "source": "persistent_memory", "lookup_method": "element_id", "screen_signature": screen_sig[:16] + "..." 
} ) # Fallback: recherche par position et caractéristiques best_match = None best_score = 0.0 fp_center = ( fingerprint.bbox[0] + fingerprint.bbox[2] / 2, fingerprint.bbox[1] + fingerprint.bbox[3] / 2 ) for elem in ui_elements: score = 0.0 # Score par rôle/type elem_role = (getattr(elem, 'role', '') or '').lower() elem_type = (getattr(elem, 'type', '') or '').lower() if fingerprint.role and elem_role == fingerprint.role.lower(): score += 2.0 if fingerprint.etype and elem_type == fingerprint.etype.lower(): score += 1.5 # Score par proximité spatiale elem_center = _bbox_center(elem.bbox) distance = ((elem_center[0] - fp_center[0]) ** 2 + (elem_center[1] - fp_center[1]) ** 2) ** 0.5 # Tolérance de 100px pour les changements de layout if distance <= 100: score += (100 - distance) / 100.0 # Score par label si disponible if fingerprint.label and hasattr(elem, 'label'): elem_label = (getattr(elem, 'label', '') or '').strip().lower() fp_label = fingerprint.label.strip().lower() if fp_label and elem_label: if fp_label == elem_label: score += 1.0 elif fp_label in elem_label or elem_label in fp_label: score += 0.7 else: # Fuzzy matching ratio = _fuzzy_ratio(fp_label, elem_label) if ratio >= 0.8: score += ratio * 0.6 if score > best_score and score >= 2.0: # Seuil minimum best_score = score best_match = elem if best_match: return ResolvedTarget( element=best_match, confidence=min(0.90, best_score / 5.0), # Normaliser sur 5.0 max strategy_used="PERSISTENT_MEMORY_SPATIAL", fallback_applied=False, resolution_details={ "source": "persistent_memory", "lookup_method": "spatial_matching", "match_score": round(best_score, 3), "screen_signature": screen_sig[:16] + "..." 
} ) return None except Exception as e: logger.debug(f"Persistent memory lookup failed: {e}") return None def record_resolution_success( self, target_spec: TargetSpec, screen_state: ScreenState, ui_elements: List[UIElement], resolved_element: UIElement, strategy_used: str, confidence: float ) -> None: """ Enregistrer une résolution réussie dans la mémoire persistante. Fiche #18: Hook appelé après validation des post-conditions pour apprendre des résolutions réussies. Args: target_spec: Spécification de la cible screen_state: État de l'écran ui_elements: Éléments UI disponibles resolved_element: Élément résolu avec succès strategy_used: Stratégie de résolution utilisée confidence: Confiance de la résolution """ if not self.enable_persistent_learning or not self._persistent_memory: return try: # Générer signature d'écran from .screen_signature import screen_signature screen_sig = screen_signature(screen_state, ui_elements, mode="layout") # Créer fingerprint de l'élément résolu fingerprint = PersistentFingerprint( element_id=resolved_element.element_id, bbox=tuple(_bbox_to_tuple(resolved_element.bbox)), role=getattr(resolved_element, 'role', None), etype=getattr(resolved_element, 'type', None), label=getattr(resolved_element, 'label', None), confidence=confidence ) # Enregistrer le succès self._persistent_memory.record_success( screen_signature=screen_sig, target_spec=target_spec, fingerprint=fingerprint, strategy_used=strategy_used, confidence=confidence ) logger.debug( f"Recorded successful resolution in persistent memory: " f"element={resolved_element.element_id} strategy={strategy_used} " f"confidence={confidence:.3f}" ) except Exception as e: logger.warning(f"Failed to record resolution success: {e}") def record_resolution_failure( self, target_spec: TargetSpec, screen_state: ScreenState, ui_elements: List[UIElement], error_message: str ) -> None: """ Enregistrer un échec de résolution dans la mémoire persistante. 
Fiche #18: Hook appelé après échec de résolution pour tracker les patterns problématiques. Args: target_spec: Spécification de la cible screen_state: État de l'écran ui_elements: Éléments UI disponibles error_message: Message d'erreur """ if not self.enable_persistent_learning or not self._persistent_memory: return try: # Générer signature d'écran from .screen_signature import screen_signature screen_sig = screen_signature(screen_state, ui_elements, mode="layout") # Enregistrer l'échec self._persistent_memory.record_failure( screen_signature=screen_sig, target_spec=target_spec, error_message=error_message ) logger.debug( f"Recorded resolution failure in persistent memory: " f"error='{error_message}'" ) except Exception as e: logger.warning(f"Failed to record resolution failure: {e}") def get_persistent_memory_stats(self) -> Optional[Dict[str, Any]]: """ Obtenir les statistiques de la mémoire persistante. Returns: Dictionnaire avec statistiques ou None si non disponible """ if not self.enable_persistent_learning or not self._persistent_memory: return None try: return self._persistent_memory.get_stats() except Exception as e: logger.warning(f"Failed to get persistent memory stats: {e}") return None def cleanup_persistent_memory(self, days_to_keep: int = 90) -> Optional[int]: """ Nettoyer les entrées anciennes de la mémoire persistante. Args: days_to_keep: Nombre de jours à conserver Returns: Nombre d'entrées supprimées ou None si non disponible """ if not self.enable_persistent_learning or not self._persistent_memory: return None try: return self._persistent_memory.cleanup_old_entries(days_to_keep) except Exception as e: logger.warning(f"Failed to cleanup persistent memory: {e}") return None