rpa_vision_v3/core/execution/target_resolver.py
Dom 53d29d9b24
fix(lint): clean ruff pass, 2 real bugs fixed + corrupted file removed
Real bugs fixed:
- core/execution/target_resolver.py: removed 5 lines of dead code
  after a return (leftover from an incomplete refactor, referencing params
  never assigned to self: similarity_threshold, use_spatial_fallback)
- agent_v0/agent_v1/core/executor.py:2180: variable `prefill` was referenced
  but never defined. Explicit initialization added upstream
  (conditioned on _is_thinking_popup, consistent with the message append)

File removed:
- core/security/input_validator_new.py: corrupted content (reversed text,
  copy-paste artifact), never imported anywhere, 550 ruff errors
  on its own

CI workflow:
- Exclusions added for known-broken legacy directories:
    - agent_v0/deploy/windows_client/ (obsolete clone)
    - tests/property/ (see MEMORY.md: broken imports)
    - tests/integration/test_visual_rpa_checkpoint.py (VisualMetadata
      does not exist, already documented)

Result: "ruff All checks passed!" on core/ agent_v0/ tests/
(with E9,F63,F7,F82: syntax errors + critical undefined names).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-15 19:01:11 +02:00

3490 lines
144 KiB
Python
"""
Advanced Target Resolver - UI target resolution for RPA Vision V3
Author: Dom, Alice Kiro - 15 December 2024
Resolution strategies:
1. By Role - Search by semantic role (button, input, etc.)
2. By Text - Search by exact or partial text
3. By Position - Search by coordinates with a tolerance
4. By Embedding - Search by visual similarity
5. By Hierarchy - Search within the DOM/UI hierarchy
6. By Context - Contextual search (near X, inside Y)
7. By Spatial - Search by spatial relations (Requirements 5.3, 5.4)
8. Composite - Combination of several strategies
Advanced features:
- Automatic fallback between strategies
- Spatial fallback with anchor elements (Requirement 5.4)
- Visual state detection (enabled/disabled) (Requirement 5.5)
- Multi-criteria scoring
- Cache of recent resolutions
- Learning of resolution patterns
- Centralized error handling with ErrorHandler
- Fiche #18: Persistent "mix" learning (JSONL + SQLite)
"""
import logging
import json
import math
import re
import time
import unicodedata
from typing import Optional, List, Tuple, Dict, Any, Callable
from dataclasses import dataclass, field
from enum import Enum
import numpy as np
from collections import OrderedDict
from difflib import SequenceMatcher
from ..models.screen_state import ScreenState
from ..models.ui_element import UIElement
from ..models.workflow_graph import TargetSpec
from .spatial_index import SpatialIndexGrid
from .screen_signature import screen_signature
from .target_memory import TargetFingerprint
from .computation_cache import ComputationCache, cached_bbox_center, cached_euclidean_distance
# Fiche #18: import TargetMemoryStore for persistent learning
try:
    from ..learning.target_memory_store import TargetMemoryStore, TargetFingerprint as PersistentFingerprint
    TARGET_MEMORY_AVAILABLE = True
except ImportError:
    TARGET_MEMORY_AVAILABLE = False
    TargetMemoryStore = None
    PersistentFingerprint = None
# Fiche #13: standardized XYWH bbox helpers
def _bbox_to_tuple(bbox):
    """Convert bbox to tuple format (x, y, w, h) for compatibility"""
    if hasattr(bbox, 'to_tuple'):
        return bbox.to_tuple()
    return bbox
def _bbox_right(bbox) -> float:
    bbox_tuple = _bbox_to_tuple(bbox)
    return float(bbox_tuple[0] + bbox_tuple[2])
def _bbox_bottom(bbox) -> float:
    bbox_tuple = _bbox_to_tuple(bbox)
    return float(bbox_tuple[1] + bbox_tuple[3])
def _bbox_center(bbox) -> tuple[float, float]:
    bbox_tuple = _bbox_to_tuple(bbox)
    return (float(bbox_tuple[0] + bbox_tuple[2] / 2), float(bbox_tuple[1] + bbox_tuple[3] / 2))
def _bbox_area(bbox) -> float:
    bbox_tuple = _bbox_to_tuple(bbox)
    return float(bbox_tuple[2] * bbox_tuple[3])
def _bbox_contains_point(bbox, x: float, y: float) -> bool:
    """Point inside bbox (edges inclusive)"""
    bbox_tuple = _bbox_to_tuple(bbox)
    return (bbox_tuple[0] <= x <= bbox_tuple[0] + bbox_tuple[2]) and (bbox_tuple[1] <= y <= bbox_tuple[1] + bbox_tuple[3])
def _bbox_intersects(a, b) -> bool:
    """Check if two bboxes intersect (boxes that only touch at an edge do not count)"""
    a_tuple = _bbox_to_tuple(a)
    b_tuple = _bbox_to_tuple(b)
    ax1, ay1, aw, ah = a_tuple
    bx1, by1, bw, bh = b_tuple
    ax2, ay2 = ax1 + aw, ay1 + ah
    bx2, by2 = bx1 + bw, by1 + bh
    return not (ax2 <= bx1 or bx2 <= ax1 or ay2 <= by1 or by2 <= ay1)
# Import SpatialAnalyzer for the spatial fallback
try:
    from ..detection.spatial_analyzer import SpatialAnalyzer, RelationType, SpatialRelation
    SPATIAL_ANALYZER_AVAILABLE = True
except ImportError:
    SPATIAL_ANALYZER_AVAILABLE = False
    SpatialAnalyzer = None
    RelationType = None
    SpatialRelation = None  # keep the name defined when the import fails
# Import MetricsEngine for Fiche #10
try:
    from ..precision.metrics_engine import MetricsEngine, get_global_metrics_engine
    METRICS_AVAILABLE = True
except ImportError:
    METRICS_AVAILABLE = False
    MetricsEngine = None
    def get_global_metrics_engine():
        """Fallback used when the metrics engine is unavailable"""
        return None
# Import ErrorHandler for centralized error handling
try:
    from .error_handler import ErrorHandler
    ERROR_HANDLER_AVAILABLE = True
except ImportError:
    ERROR_HANDLER_AVAILABLE = False
    ErrorHandler = None
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Fiche #10 - Auto-healing: Role Aliases (terrain)
# ---------------------------------------------------------------------------
ROLE_ALIASES = {
"input": {"input", "textfield", "text_field", "form_input", "forminput", "edit", "textbox"},
"button": {"button", "submit", "action", "cta"},
"label": {"label", "text", "data_display"},
"checkbox": {"checkbox", "check_box", "toggle"},
}
TYPE_ALIASES = {
"text_input": {"text_input", "input", "textfield"},
"button": {"button"},
}
# ---------------------------------------------------------------------------
# BBox helpers (contract: UIElement.bbox = (x, y, w, h))
# ---------------------------------------------------------------------------
def _bbox_contains(outer, inner) -> bool:
    """inner bbox fully inside outer bbox (XYWH)"""
    outer_tuple = _bbox_to_tuple(outer)
    inner_tuple = _bbox_to_tuple(inner)
    return (
        inner_tuple[0] >= outer_tuple[0]
        and inner_tuple[1] >= outer_tuple[1]
        and _bbox_right(inner) <= _bbox_right(outer)
        and _bbox_bottom(inner) <= _bbox_bottom(outer)
    )
def _bbox_intersection_area(a, b) -> float:
    a_tuple = _bbox_to_tuple(a)
    b_tuple = _bbox_to_tuple(b)
    x1 = max(a_tuple[0], b_tuple[0])
    y1 = max(a_tuple[1], b_tuple[1])
    x2 = min(_bbox_right(a), _bbox_right(b))
    y2 = min(_bbox_bottom(a), _bbox_bottom(b))
    if x2 <= x1 or y2 <= y1:
        return 0.0
    return float((x2 - x1) * (y2 - y1))
def _bbox_iou(a, b) -> float:
    inter = _bbox_intersection_area(a, b)
    if inter <= 0:
        return 0.0
    union = _bbox_area(a) + _bbox_area(b) - inter
    return float(inter / union) if union > 0 else 0.0
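# Illustrative sketch (not part of the original module): intersection-over-union
# for boxes in the (x, y, w, h) convention used by the helpers above. The name
# iou_xywh is hypothetical and the function works on plain tuples only.

```python
def iou_xywh(a, b):
    """IoU of two (x, y, w, h) boxes given as plain tuples."""
    ax, ay, aw, ah = a
    bx, by, bw, bh = b
    # Overlap along each axis, clamped at zero when the boxes are disjoint
    ix = max(0.0, min(ax + aw, bx + bw) - max(ax, bx))
    iy = max(0.0, min(ay + ah, by + bh) - max(ay, by))
    inter = ix * iy
    union = aw * ah + bw * bh - inter
    return inter / union if union > 0 else 0.0


same = iou_xywh((0, 0, 10, 10), (0, 0, 10, 10))    # identical boxes
half = iou_xywh((0, 0, 10, 10), (5, 0, 10, 10))    # half-overlapping boxes
apart = iou_xywh((0, 0, 10, 10), (20, 20, 5, 5))   # disjoint boxes
```

# Note: the third and fourth tuple entries are width and height, not the
# bottom-right corner, so the right edge is x + w and the bottom edge is y + h.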
# ---------------------------------------------------------------------------
# Fiche #7 - Layout helpers for Form Logic
# ---------------------------------------------------------------------------
def _overlap_1d(a1, a2, b1, b2) -> float:
    """1D overlap length between two segments"""
    left = max(a1, b1)
    right = min(a2, b2)
    return max(0.0, right - left)
def _x_overlap_ratio(a, b) -> float:
    """X overlap divided by the smaller of the two widths"""
    a_tuple = _bbox_to_tuple(a)
    b_tuple = _bbox_to_tuple(b)
    ax1, ax2 = a_tuple[0], _bbox_right(a)
    bx1, bx2 = b_tuple[0], _bbox_right(b)
    ov = _overlap_1d(ax1, ax2, bx1, bx2)
    denom = max(1.0, min(a_tuple[2], b_tuple[2]))
    return float(ov / denom)
def _y_overlap_ratio(a, b) -> float:
    """Y overlap divided by the smaller of the two heights"""
    a_tuple = _bbox_to_tuple(a)
    b_tuple = _bbox_to_tuple(b)
    ay1, ay2 = a_tuple[1], _bbox_bottom(a)
    by1, by2 = b_tuple[1], _bbox_bottom(b)
    ov = _overlap_1d(ay1, ay2, by1, by2)
    denom = max(1.0, min(a_tuple[3], b_tuple[3]))
    return float(ov / denom)
def _baseline_distance(a, b) -> float:
    """Distance between Y centers (useful for label ↔ input on the same line)"""
    return abs(_bbox_center(a)[1] - _bbox_center(b)[1])
# ---------------------------------------------------------------------------
# Fiche #8 - Field bug fixes: normalization + fuzzy matching
# ---------------------------------------------------------------------------
_WS_RE = re.compile(r"\s+")
def _norm_text(s: str) -> str:
    """Normalize text for robust matching (accents, case, whitespace, OCR)"""
    if not s:
        return ""
    # Replace NBSP and similar, then trim
    s = s.replace("\u00A0", " ").strip().lower()
    # Strip accents
    s = unicodedata.normalize("NFKD", s)
    s = "".join(ch for ch in s if not unicodedata.combining(ch))
    # Harmonize common dashes. The original dash characters were lost in this copy;
    # U+2010 (hyphen), U+2013 (en dash) and U+2014 (em dash) are assumed here.
    s = s.replace("\u2010", "-").replace("\u2013", "-").replace("\u2014", "-")
    s = _WS_RE.sub(" ", s)
    # Minor OCR confusions (optional but useful)
    s = s.translate(str.maketrans({"|": "l"}))
    return s
def _fuzzy_ratio(a: str, b: str) -> float:
    """Fuzzy similarity between two texts (both are normalized first)"""
    a, b = _norm_text(a), _norm_text(b)
    if not a or not b:
        return 0.0
    return SequenceMatcher(None, a, b).ratio()
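# Illustrative sketch (not part of the original module): how normalization
# followed by difflib's SequenceMatcher tolerates case, accents, NBSP and a
# single OCR-style character swap. The names normalize and fuzzy are
# hypothetical stand-ins and omit the dash and "|" handling shown above.

```python
import re
import unicodedata
from difflib import SequenceMatcher


def normalize(text):
    """Lowercase, replace NBSP, strip accents, collapse whitespace."""
    text = text.replace("\u00a0", " ").strip().lower()
    text = unicodedata.normalize("NFKD", text)
    text = "".join(ch for ch in text if not unicodedata.combining(ch))
    return re.sub(r"\s+", " ", text)


def fuzzy(a, b):
    """Similarity in [0, 1] between the normalized forms of a and b."""
    a, b = normalize(a), normalize(b)
    return SequenceMatcher(None, a, b).ratio() if a and b else 0.0


label = normalize("  Pr\u00e9nom\u00a0 du  Client ")  # accented text with an NBSP
exact = fuzzy("Valider", "VALIDER ")                  # differs only by case and spacing
ocr = fuzzy("Submit", "Submlt")                       # one misread character
empty = fuzzy("", "Submit")                           # empty input short-circuits
```

# With thresholds such as the 0.82 / 0.78 / 0.72 min_ratio values used by the
# healing profile further down, a one-character OCR error on a short label can
# sit close to the cutoff, which is one reason to relax the ratio progressively.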
class ResolutionStrategy(str, Enum):
    """Available resolution strategies"""
    BY_ROLE = "by_role"
    BY_TEXT = "by_text"
    BY_POSITION = "by_position"
    BY_EMBEDDING = "by_embedding"
    BY_HIERARCHY = "by_hierarchy"
    BY_CONTEXT = "by_context"
    BY_SPATIAL = "by_spatial"  # New strategy (Requirement 5.3)
    COMPOSITE = "composite"
class VisualState(str, Enum):
    """Detectable visual states (Requirement 5.5)"""
    ENABLED = "enabled"
    DISABLED = "disabled"
    FOCUSED = "focused"
    HOVERED = "hovered"
    SELECTED = "selected"
    CHECKED = "checked"
    UNCHECKED = "unchecked"
    LOADING = "loading"
    ERROR = "error"
    UNKNOWN = "unknown"
@dataclass
class ResolvedTarget:
    """Result of a target resolution"""
    element: UIElement
    confidence: float
    strategy_used: str
    fallback_applied: bool = False
    resolution_details: Dict[str, Any] = field(default_factory=dict)
    alternatives: List["ResolvedTarget"] = field(default_factory=list)
    visual_state: VisualState = VisualState.UNKNOWN  # Detected visual state (Requirement 5.5)
    spatial_anchor: Optional[str] = None  # ID of the spatial anchor used
@dataclass
class ResolutionContext:
    """Context for the resolution"""
    screen_state: ScreenState
    previous_target: Optional[UIElement] = None
    workflow_context: Dict[str, Any] = field(default_factory=dict)
    anchor_elements: List[UIElement] = field(default_factory=list)
class TargetResolver:
    """
    Advanced UI target resolver.
    Uses several strategies to find the UI element matching a target
    specification, with automatic fallback.
    Example:
    >>> resolver = TargetResolver()
    >>> target_spec = TargetSpec(by_role="button", by_text="Submit")
    >>> result = resolver.resolve_target(target_spec, screen_state)
    >>> if result:
    ...     print(f"Found: {result.element.label} ({result.confidence:.2f})")
    """
    def __init__(
        self,
        similarity_threshold: float = 0.75,
        position_tolerance: int = 50,
        text_fuzzy_threshold: float = 0.65,
        use_embedding_fallback: bool = True,
        use_spatial_fallback: bool = True,  # New: spatial fallback (Requirement 5.4)
        cache_size: int = 100,
        metrics_engine: Optional[MetricsEngine] = None,  # Fiche #10: Metrics Engine
        error_handler: Optional[ErrorHandler] = None,  # Centralized error handling
        enable_persistent_learning: bool = True,  # Fiche #18: persistent learning
        persistent_memory_path: str = "data/learning"  # Fiche #18: persistent memory path
    ):
        """
        Initialize the resolver.
        Args:
            similarity_threshold: Minimum similarity threshold
            position_tolerance: Tolerance in pixels for by_position
            text_fuzzy_threshold: Threshold for fuzzy text matching
            use_embedding_fallback: Use embeddings as a fallback
            use_spatial_fallback: Use spatial relations as a fallback
            cache_size: Size of the resolution cache
            metrics_engine: Metrics engine for Fiche #10 (optional)
            error_handler: Centralized error handler (optional)
            enable_persistent_learning: Enable persistent learning (Fiche #18)
            persistent_memory_path: Path for the persistent memory (Fiche #18)
        """
        self.similarity_threshold = similarity_threshold
        self.position_tolerance = position_tolerance
        self.text_fuzzy_threshold = text_fuzzy_threshold
        self.use_embedding_fallback = use_embedding_fallback
        self.use_spatial_fallback = use_spatial_fallback
        # LRU cache for recent resolutions
        self._cache: OrderedDict[str, ResolvedTarget] = OrderedDict()
        self._cache_size = cache_size
        # Fiche #13: LRU cache of spatial indexes (per screen_state)
        self._index_cache: OrderedDict[str, SpatialIndexGrid] = OrderedDict()
        self._index_cache_size = max(10, int(cache_size / 2))
        # Fiche #14: cross-frame LRU memory
        self._cross_frame_cache: OrderedDict[str, TargetFingerprint] = OrderedDict()
        self._cross_frame_cache_size = 200  # tune as needed
        # Task 5.4: cache for redundant computations
        self._computation_cache = ComputationCache(max_size=500)
        # Fiche #10: Metrics Engine integration
        self.metrics_engine = metrics_engine or get_global_metrics_engine()
        # Fiche #10: auto-healing attempt counter
        self.healing_attempt = 0  # 0=normal, 1..n=recovery attempts
        # Centralized error handling
        self.error_handler = error_handler
        if self.error_handler is None and ERROR_HANDLER_AVAILABLE:
            self.error_handler = ErrorHandler()
        # Fiche #18: persistent learning
        self.enable_persistent_learning = enable_persistent_learning
        self._persistent_memory = None
        if self.enable_persistent_learning and TARGET_MEMORY_AVAILABLE:
            try:
                self._persistent_memory = TargetMemoryStore(persistent_memory_path)
                logger.info("Persistent learning enabled with TargetMemoryStore")
            except Exception as e:
                logger.warning(f"Failed to initialize persistent memory: {e}")
                self.enable_persistent_learning = False
        elif self.enable_persistent_learning:
            logger.warning("Persistent learning requested but TargetMemoryStore not available")
            self.enable_persistent_learning = False
        # Stats
        self._stats = {
            "total_resolutions": 0,
            "successful": 0,
            "failed": 0,
            "cache_hits": 0,
            "fallbacks_used": 0,
            "spatial_fallbacks": 0,  # New counter
            "by_strategy": {}
        }
        # Embedding matcher (lazy load)
        self._embedding_matcher = None
        logger.info(f"TargetResolver initialized with sniper mode enabled, metrics_engine={'enabled' if self.metrics_engine else 'disabled'}")
# =========================================================================
# Fiche #10 - Auto-healing: Healing Profile System
# =========================================================================
    def _healing_profile(self) -> Dict[str, Any]:
        """
        Return a tolerance profile based on healing_attempt.
        Fiche #10: progressive relaxation of the matching criteria.
        - Level 0 (Normal): strict criteria
        - Level 1 (First Healing): relaxed criteria
        - Level 2+ (Desperate): very relaxed criteria
        Returns:
            Dict with min_ratio, pad_mul, expand_roles
        """
        a = int(getattr(self, "healing_attempt", 0) or 0)
        if a <= 0:
            # Normal: strict criteria
            return {"min_ratio": 0.82, "pad_mul": 1.0, "expand_roles": False}
        elif a == 1:
            # First healing: relaxed criteria
            return {"min_ratio": 0.78, "pad_mul": 1.3, "expand_roles": True}
        else:
            # Desperate (a >= 2): very relaxed criteria
            return {"min_ratio": 0.72, "pad_mul": 1.7, "expand_roles": True}
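# Illustrative sketch (not part of the original class): how the three healing
# levels above loosen matching. The standalone function healing_profile is a
# hypothetical copy of the values returned by _healing_profile, and the 300 px
# base padding is the horizontal ROI padding used later in
# _build_anchor_and_roi_and_container.

```python
def healing_profile(attempt):
    """Standalone copy of the three tolerance levels (values from the method above)."""
    if attempt <= 0:
        return {"min_ratio": 0.82, "pad_mul": 1.0, "expand_roles": False}
    if attempt == 1:
        return {"min_ratio": 0.78, "pad_mul": 1.3, "expand_roles": True}
    return {"min_ratio": 0.72, "pad_mul": 1.7, "expand_roles": True}


BASE_PAD_X = 300  # pixels, before scaling by pad_mul

profiles = [healing_profile(a) for a in (0, 1, 2, 5)]
pads = [int(BASE_PAD_X * p["pad_mul"]) for p in profiles]
ratios = [p["min_ratio"] for p in profiles]

for attempt, pad, ratio in zip((0, 1, 2, 5), pads, ratios):
    print(f"attempt={attempt}: search pad {pad}px, text match >= {ratio}")
```

# Each retry widens the search region and accepts weaker text matches; every
# attempt of 2 or more shares the same "desperate" profile.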
# =========================================================================
# Fiche #14 - Screen signature + Cross-frame Target Memory
# =========================================================================
def _make_cross_key(self, target_spec, sig: str) -> str:
"""Clé de cache cross-frame optimisée"""
import hashlib
# Créer une représentation compacte des éléments clés
key_parts = [
sig,
str(getattr(target_spec, "by_role", None) or ""),
str(getattr(target_spec, "by_text", None) or ""),
str(getattr(target_spec, "by_position", None) or ""),
]
# Pour les objets complexes, utiliser leur hash
hints = getattr(target_spec, "context_hints", None)
if hints:
hints_str = str(sorted(hints.items())) if isinstance(hints, dict) else str(hints)
key_parts.append(hashlib.md5(hints_str.encode('utf-8')).hexdigest()[:8])
hard = getattr(target_spec, "hard_constraints", None)
if hard:
hard_str = str(sorted(hard.items())) if isinstance(hard, dict) else str(hard)
key_parts.append(hashlib.md5(hard_str.encode('utf-8')).hexdigest()[:8])
# Limiter la longueur totale de la clé
key = "|".join(key_parts)
if len(key) > 200: # Limite raisonnable
return hashlib.md5(key.encode('utf-8')).hexdigest()
return key
def _resolve_from_memory(self, target_spec, screen_state, ui_elements, context):
"""Résolution depuis mémoire cross-frame"""
# signature robuste: layout
sig = screen_signature(screen_state, ui_elements, mode="layout")
key = self._make_cross_key(target_spec, sig)
fp = self._cross_frame_cache.get(key)
if not fp:
return None
# 1) match direct par element_id (si stable)
by_id = {e.element_id: e for e in ui_elements}
if fp.element_id in by_id:
e = by_id[fp.element_id]
return ResolvedTarget(
element=e,
confidence=min(0.92 * float(getattr(e, "confidence", 1.0) or 1.0), 1.0),
strategy_used="CROSS_FRAME_CACHE",
resolution_details={"cache": "element_id", "screen_sig": sig}
)
# 2) sinon: search proche de l'ancienne bbox via index spatial (#13)
idx = context.workflow_context.get("spatial_index")
x, y, w, h = fp.bbox
cx = int(x + w / 2)
cy = int(y + h / 2)
# bbox de recherche élargie (tolérance)
pad = 120
query = (cx - pad, cy - pad, 2 * pad, 2 * pad)
if idx:
pool = idx.query_bbox(query)
else:
# Fallback: créer une recherche géométrique simple
pool = []
for e in ui_elements:
ex, ey = _bbox_center(e.bbox)
if abs(ex - cx) <= pad and abs(ey - cy) <= pad:
pool.append(e)
# score simple: rôle/type + distance bbox center + label (si dispo)
def center(b):
return (b[0] + b[2] / 2.0, b[1] + b[3] / 2.0)
fx, fy = center(fp.bbox)
best = None
best_s = -1e9
fp_role = (fp.role or "").lower()
fp_type = (fp.etype or "").lower()
fp_label = (fp.label or "").strip().lower()
for e in pool:
er = (getattr(e, "role", "") or "").lower()
et = (getattr(e, "type", "") or "").lower()
# gating léger
role_ok = (fp_role and er == fp_role) or (not fp_role)
type_ok = (fp_type and et == fp_type) or (not fp_type)
if not (role_ok or type_ok):
continue
ex, ey = center(e.bbox)
d = ((ex - fx) ** 2 + (ey - fy) ** 2) ** 0.5
s = 0.0
if er == fp_role:
s += 1.0
if et == fp_type:
s += 0.6
# bonus label exact si disponible
elbl = (getattr(e, "label", "") or "").strip().lower()
if fp_label and elbl and (fp_label == elbl or fp_label in elbl or elbl in fp_label):
s += 0.7
# pénalité distance
s -= (d / 120.0)
if s > best_s:
best_s = s
best = e
if not best:
return None
return ResolvedTarget(
element=best,
confidence=min(0.85 * float(getattr(best, "confidence", 1.0) or 1.0), 1.0),
strategy_used="CROSS_FRAME_CACHE",
resolution_details={"cache": "near_bbox", "screen_sig": sig, "score": round(best_s, 4)}
)
# =========================================================================
# Fiche #11 - Multi-anchor + contraintes combinées: Helper Methods
# =========================================================================
def _as_text_list(self, v):
"""
Convertit une valeur en liste de textes.
Args:
v: None, str, list, tuple ou autre
Returns:
List[str]: Liste de strings non-vides
"""
if v is None:
return []
if isinstance(v, (list, tuple)):
return [str(x) for x in v if x]
return [str(v)]
def _container_bbox_from_text(self, text: str, ui_elements):
"""
Trouve un conteneur par texte avec détection intelligente.
Process:
1. Trouver un élément portant ce texte (label/panel)
2. Si c'est déjà un panel/container, utiliser sa bbox
3. Sinon: plus petit container qui contient ce label
Args:
text: Texte à rechercher pour identifier le conteneur
ui_elements: Liste des éléments UI
Returns:
Optional[Tuple]: BBox du conteneur ou None si non trouvé
"""
# 1) Trouver un élément portant ce texte (label/panel)
anchor = self._find_element_by_text(text, ui_elements, min_ratio=self._healing_profile()["min_ratio"])
if not anchor:
return None
# 2) Si c'est déjà un panel/container, ok
r = (getattr(anchor, "role", "") or "").lower()
t = (getattr(anchor, "type", "") or "").lower()
if r in {"panel", "container", "group", "form"} or t in {"panel", "container", "group", "form"}:
return anchor.bbox
# 3) Sinon: plus petit container qui contient ce label
containers = []
for e in ui_elements:
rr = (getattr(e, "role", "") or "").lower()
tt = (getattr(e, "type", "") or "").lower()
if rr in {"panel", "container", "group", "form"} or tt in {"panel", "container", "group", "form"}:
if _bbox_contains(e.bbox, anchor.bbox):
containers.append(e)
if not containers:
return None
# Retourner le plus petit container (plus spécifique)
containers.sort(key=lambda c: _bbox_area(c.bbox))
return containers[0].bbox
def _apply_hard_constraints(self, candidates, target_spec, ui_elements):
"""
Applique les contraintes strictes pour filtrer les candidats.
Args:
candidates: Liste des éléments candidats
target_spec: Spécification de la cible avec hard_constraints
ui_elements: Liste complète des éléments UI
Returns:
List[UIElement]: Candidats filtrés après application des contraintes
"""
hc = getattr(target_spec, "hard_constraints", None) or {}
filtered_candidates = list(candidates) # Copie pour éviter de modifier l'original
# Exemple: forcer un container identifié par texte
if "within_container_text" in hc and hc["within_container_text"]:
cb = self._container_bbox_from_text(str(hc["within_container_text"]), ui_elements)
if cb is not None:
before_count = len(filtered_candidates)
filtered_candidates = [e for e in filtered_candidates if _bbox_contains(cb, e.bbox)]
after_count = len(filtered_candidates)
logger.debug(f"Container constraint '{hc['within_container_text']}' filtered {before_count - after_count} candidates")
# Exemple: ignorer tiny elements (si tu veux)
if "min_area" in hc:
min_area = float(hc["min_area"])
before_count = len(filtered_candidates)
filtered_candidates = [e for e in filtered_candidates if _bbox_area(e.bbox) >= min_area]
after_count = len(filtered_candidates)
logger.debug(f"Min area constraint {min_area} filtered {before_count - after_count} candidates")
return filtered_candidates
# =========================================================================
# Fiche #6 - Sniper Mode : Scoring et Ranking
# =========================================================================
def _score_candidate_sniper_cached(
self,
elem,
base_score: float,
anchor_elem,
roi_bbox,
container_bbox,
hints=None,
ui_elements=None,
weights=None,
ui_analysis_cache=None,
distance_cache=None,
alignment_cache=None
) -> float:
"""
Sniper scoring avec cache pour éviter les calculs redondants (Tâche 8.1).
Auteur : Dom, Alice Kiro - 15 décembre 2024
Optimisations implémentées (Exigences 8.2, 8.4):
- Cache des analyses d'éléments UI entre ancres
- Cache des calculs de distance
- Cache des calculs d'alignement
- Réutilisation des calculs coûteux
Args:
elem: Élément candidat à scorer
base_score: Score de base (role/text/embedding)
anchor_elem: Élément ancre (optionnel)
roi_bbox: Zone d'intérêt (optionnel)
container_bbox: Container préféré (optionnel)
hints: Context hints pour alignement
ui_elements: Tous les éléments UI
weights: Pondérations personnalisées (optionnel)
ui_analysis_cache: Cache des analyses d'éléments UI
distance_cache: Cache des calculs de distance
alignment_cache: Cache des calculs d'alignement
Returns:
Score composite pondéré avec optimisations de cache
"""
# Initialiser les caches si non fournis
if ui_analysis_cache is None:
ui_analysis_cache = {}
if distance_cache is None:
distance_cache = {}
if alignment_cache is None:
alignment_cache = {}
# Fiche #11 - Tâche 5.1: Extraire les poids avec defaults
if weights is None:
weights = {}
# Poids par défaut (Exigences 3.1, 3.2, 3.3, 3.4)
w_proximity = float(weights.get("proximity", 0.35))
w_alignment = float(weights.get("alignment", 0.25))
w_container = float(weights.get("container", 0.15))
w_roi_iou = float(weights.get("roi_iou", 0.25))
# Normaliser les poids pour qu'ils somment à 1.0
total_weight = w_proximity + w_alignment + w_container + w_roi_iou
if total_weight > 0:
w_proximity /= total_weight
w_alignment /= total_weight
w_container /= total_weight
w_roi_iou /= total_weight
s = float(base_score)
# Tâche 8.1: Calculer les composants individuels avec cache (Exigence 8.4)
proximity_score = 0.0
alignment_score = 0.0
container_score = 0.0
roi_iou_score = 0.0
# Composant 1: Proximité à l'ancre avec cache (Exigence 8.2)
if anchor_elem is not None:
# Clé de cache pour la distance
distance_key = f"dist_{elem.element_id}_{anchor_elem.element_id}"
if distance_key in distance_cache:
distance = distance_cache[distance_key]
else:
# Calculer et mettre en cache
ax, ay = _bbox_center(anchor_elem.bbox)
ex, ey = _bbox_center(elem.bbox)
dx = ex - ax
dy = ey - ay
distance = (dx * dx + dy * dy) ** 0.5
distance_cache[distance_key] = distance
# Normalisation : 200px = zone "proche"
d0 = 200.0
proximity_score = 1.0 / (1.0 + (distance / d0)) # [~0..1]
# Composant 2: Alignement avec l'ancre avec cache (Exigence 8.4)
if anchor_elem is not None and hints:
# Clé de cache pour l'alignement
alignment_key = f"align_{elem.element_id}_{anchor_elem.element_id}_{hash(str(sorted(hints.items())))}"
if alignment_key in alignment_cache:
alignment_bonus = alignment_cache[alignment_key]
else:
# Calculer et mettre en cache
alignment_bonus = self._alignment_bonus(elem, anchor_elem, hints)
alignment_cache[alignment_key] = alignment_bonus
# Convertir le bonus multiplicatif en score [0..1]
alignment_score = min(1.0, max(0.0, (alignment_bonus - 1.0) / 0.2)) # Normaliser autour de 1.0-1.2
# Composant 3: Préférence container (Exigence 3.4)
if container_bbox is not None:
# Clé de cache pour l'analyse de containment
container_key = f"container_{elem.element_id}_{hash(str(container_bbox))}"
if container_key in ui_analysis_cache:
container_score = ui_analysis_cache[container_key]
else:
# Calculer et mettre en cache
if _bbox_contains(container_bbox, elem.bbox):
container_score = 1.0
else:
# Score partiel basé sur la proximité au container
elem_center = _bbox_center(elem.bbox)
container_center = _bbox_center(container_bbox)
dist_to_container = ((elem_center[0] - container_center[0]) ** 2 +
(elem_center[1] - container_center[1]) ** 2) ** 0.5
container_score = max(0.0, 1.0 - (dist_to_container / 300.0)) # Décroissance sur 300px
ui_analysis_cache[container_key] = container_score
# Composant 4: Intersection avec ROI (Exigence 3.1)
if roi_bbox is not None:
# Clé de cache pour l'IOU
roi_key = f"roi_{elem.element_id}_{hash(str(roi_bbox))}"
if roi_key in ui_analysis_cache:
roi_iou_score = ui_analysis_cache[roi_key]
else:
# Calculer et mettre en cache
roi_iou_score = _bbox_iou(elem.bbox, roi_bbox)
ui_analysis_cache[roi_key] = roi_iou_score
# Fiche #11 - Tâche 5.2: Scoring composite avec pondération légère
# Appliquer les pondérations configurables
weighted_bonus = (
w_proximity * proximity_score +
w_alignment * alignment_score +
w_container * container_score +
w_roi_iou * roi_iou_score
)
# Multiplier le score de base par un facteur pondéré (Exigence 3.5)
# Éviter de réécrire complètement le système existant
s *= (1.0 + 0.4 * weighted_bonus) # Bonus jusqu'à +40% basé sur les composants pondérés
# Bonus léger pour éléments "cliquables" (logique existante)
role = (getattr(elem, "role", "") or "").lower()
etype = (getattr(elem, "type", "") or "").lower()
if role in {"submit", "button", "textfield", "input", "form_input"} or etype in {"button", "text_input"}:
s *= 1.05
# Intègre la confiance (logique existante)
conf = float(getattr(elem, "confidence", 1.0) or 1.0)
s *= (0.75 + 0.25 * min(conf, 1.0))
# Fiche #7: Bonus "même petit container commun" avec cache (logique existante)
if anchor_elem and ui_elements:
# Clé de cache pour le container commun
common_container_key = f"common_{elem.element_id}_{anchor_elem.element_id}"
if common_container_key in ui_analysis_cache:
has_common_container = ui_analysis_cache[common_container_key]
else:
# Calculer et mettre en cache
common = self._smallest_common_container_bbox(anchor_elem, elem, ui_elements)
has_common_container = common is not None
ui_analysis_cache[common_container_key] = has_common_container
if has_common_container:
s *= 1.12
# Fiche #12: Bonus "same row/column" pour ranking avec cache
if hints and ui_elements:
hp = self._healing_profile()
# Cache pour les calculs de lignes (coûteux)
rows_key = f"rows_{hash(str([e.element_id for e in ui_elements]))}"
if rows_key in ui_analysis_cache:
rows, row_of = ui_analysis_cache[rows_key]
else:
rows, row_of = self._build_rows(ui_elements)
ui_analysis_cache[rows_key] = (rows, row_of)
# same_row_as_text avec cache
for txt in self._as_text_list(hints.get("same_row_as_text")):
row_anchor_key = f"row_anchor_{txt}_{hash(str([e.element_id for e in ui_elements]))}"
if row_anchor_key in ui_analysis_cache:
anchors = ui_analysis_cache[row_anchor_key]
else:
anchors = self._find_anchors_by_text(txt, ui_elements, min_ratio=hp["min_ratio"])
ui_analysis_cache[row_anchor_key] = anchors
if anchors:
anchor = anchors[0]
if row_of.get(anchor.element_id) == row_of.get(elem.element_id):
s *= 1.15
break
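            # same_column_as_text (approximated via X overlap), cached
            def _x_overlap_ratio_local(b1, b2):
                # Normalize to (x, y, w, h) tuples so bbox objects exposing
                # to_tuple() are handled like everywhere else in this module
                b1, b2 = _bbox_to_tuple(b1), _bbox_to_tuple(b2)
                ax1, ax2 = b1[0], _bbox_right(b1)
                bx1, bx2 = b2[0], _bbox_right(b2)
                ov = max(0.0, min(ax2, bx2) - max(ax1, bx1))
                denom = max(1.0, min(b1[2], b2[2]))
                return ov / denom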
for txt in self._as_text_list(hints.get("same_column_as_text")):
col_anchor_key = f"col_anchor_{txt}_{hash(str([e.element_id for e in ui_elements]))}"
if col_anchor_key in ui_analysis_cache:
anchors = ui_analysis_cache[col_anchor_key]
else:
anchors = self._find_anchors_by_text(txt, ui_elements, min_ratio=hp["min_ratio"])
ui_analysis_cache[col_anchor_key] = anchors
if anchors:
anchor = anchors[0]
overlap_key = f"x_overlap_{elem.element_id}_{anchor.element_id}"
if overlap_key in ui_analysis_cache:
x_overlap = ui_analysis_cache[overlap_key]
else:
x_overlap = _x_overlap_ratio_local(elem.bbox, anchor.bbox)
ui_analysis_cache[overlap_key] = x_overlap
if x_overlap > 0.35:
s *= 1.10
break
return s
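# Illustrative sketch (not part of the original class): the core arithmetic of
# the sniper score above, reduced to its weighted part. The names proximity and
# composite are hypothetical; the 200 px scale, the default weights and the
# 0.4 bonus cap come from the method above. The clickable-role, confidence,
# common-container and row/column multipliers are deliberately left out.

```python
def proximity(distance, d0=200.0):
    """Map a pixel distance to (0, 1]; d0 is the distance that scores 0.5."""
    return 1.0 / (1.0 + distance / d0)


def composite(base, components, weights):
    """Scale base by up to +40% using weight-normalized component scores."""
    total = sum(weights.values())
    norm = {k: (w / total if total > 0 else 0.0) for k, w in weights.items()}
    bonus = sum(norm[k] * components.get(k, 0.0) for k in norm)
    return base * (1.0 + 0.4 * bonus)


weights = {"proximity": 0.35, "alignment": 0.25, "container": 0.15, "roi_iou": 0.25}

at_anchor = proximity(0.0)
at_d0 = proximity(200.0)
best = composite(1.0, dict.fromkeys(weights, 1.0), weights)  # every component maxed
plain = composite(1.0, {}, weights)                          # no spatial evidence
near_only = composite(1.0, {"proximity": at_d0}, weights)    # only a nearby anchor
```

# Because the weights are normalized before use, only their ratios matter:
# doubling all four leaves the score unchanged.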
def _score_candidate_sniper(
self,
elem,
base_score: float,
anchor_elem,
roi_bbox,
container_bbox,
hints=None,
ui_elements=None,
weights=None
) -> float:
"""
Sniper scoring avec pondération configurable (Fiche #11 - Tâche 5).
Wrapper pour la compatibilité - utilise la version avec cache en interne.
Auteur : Dom, Alice Kiro - 15 décembre 2024
"""
# Utiliser la version avec cache pour bénéficier des optimisations
return self._score_candidate_sniper_cached(
elem=elem,
base_score=base_score,
anchor_elem=anchor_elem,
roi_bbox=roi_bbox,
container_bbox=container_bbox,
hints=hints,
ui_elements=ui_elements,
weights=weights,
# Caches temporaires pour cette invocation
ui_analysis_cache={},
distance_cache={},
alignment_cache={}
)
def _build_anchor_and_roi_and_container(self, target_spec, ui_elements):
"""
Déduit une ancre + une ROI + un container à partir des context_hints.
Fiche #8: Gère les labels dupliqués en choisissant le meilleur anchor.
- anchor: élément texte (label) le plus pertinent
- roi: zone autour/à côté/au-dessous de l'ancre
- container: un éventuel panel/container qui contient l'ancre
"""
hints = getattr(target_spec, "context_hints", None) or {}
anchor = None
roi = None
container_bbox = None
for key in ("below_text", "above_text", "right_of_text", "left_of_text", "near_text"):
if key in hints and hints[key]:
# Fiche #11: Support multi-anchor - convertir en liste de textes
anchor_texts = self._as_text_list(hints[key])
# Fiche #8: Find every candidate anchor for every text
# Fiche #10: Use the healing profile for min_ratio
hp = self._healing_profile()
anchors = []
for text in anchor_texts:
anchors.extend(self._find_anchors_by_text(text, ui_elements, min_ratio=hp["min_ratio"]))
if not anchors:
continue
# A single anchor: take it as-is
if len(anchors) == 1:
anchor = anchors[0]
else:
# Fiche #8: Pick the best anchor according to the context
best_anchor = None
best_quality = -1.0
best_roi = None
best_container = None
for a in anchors:
# Compute the ROI and container for this anchor
x, y, w, h = a.bbox
# Fiche #10: Apply the healing profile to the padding
hp = self._healing_profile()
pad_x = int(300 * hp["pad_mul"])
pad_y = int(250 * hp["pad_mul"])
if "below_text" in hints:
roi_candidate = (x - 20, y + h, w + pad_x, pad_y)
elif "above_text" in hints:
roi_candidate = (x - 20, y - pad_y, w + pad_x, pad_y)
elif "right_of_text" in hints:
roi_candidate = (x + w, y - 20, pad_x, h + 120)
elif "left_of_text" in hints:
roi_candidate = (x - pad_x, y - 20, pad_x, h + 120)
else:
roi_candidate = (x - 80, y - 80, w + 160, h + 160)
# Container for this anchor
containers = []
for e in ui_elements:
r = (getattr(e, "role", "") or "").lower()
t = (getattr(e, "type", "") or "").lower()
if r in {"panel", "container", "group", "form"} or t in {"panel", "container", "group"}:
if _bbox_contains(e.bbox, a.bbox):
containers.append(e)
container_candidate = None
if containers:
containers.sort(key=lambda c: _bbox_area(c.bbox))
container_candidate = containers[0].bbox
# "Quality" = prefer the anchor that sits inside a small container
q = 0.0
if container_candidate is not None:
q += 1.0 / max(1.0, _bbox_area(container_candidate))
if roi_candidate is not None:
q += 0.1
if q > best_quality:
best_quality = q
best_anchor = a
best_roi = roi_candidate
best_container = container_candidate
anchor = best_anchor
roi = best_roi
container_bbox = best_container
if anchor:
break
# If no ROI/container has been computed yet (single-anchor case)
if anchor and roi is None:
x, y, w, h = anchor.bbox
# Fiche #10: Apply the healing profile to the padding
hp = self._healing_profile()
pad_x = int(300 * hp["pad_mul"])
pad_y = int(250 * hp["pad_mul"])
# Orient the ROI according to the hint
if "below_text" in hints:
roi = (x - 20, y + h, w + pad_x, pad_y)
elif "above_text" in hints:
roi = (x - 20, y - pad_y, w + pad_x, pad_y)
elif "right_of_text" in hints:
roi = (x + w, y - 20, pad_x, h + 120)
elif "left_of_text" in hints:
roi = (x - pad_x, y - 20, pad_x, h + 120)
else:
roi = (x - 80, y - 80, w + 160, h + 160)
# Container (best effort) if not computed yet
if anchor and container_bbox is None:
containers = []
for e in ui_elements:
r = (getattr(e, "role", "") or "").lower()
t = (getattr(e, "type", "") or "").lower()
if r in {"panel", "container", "group", "form"} or t in {"panel", "container", "group"}:
if _bbox_contains(e.bbox, anchor.bbox):
containers.append(e)
if containers:
containers.sort(key=lambda c: _bbox_area(c.bbox))  # smallest container first
container_bbox = containers[0].bbox
return anchor, roi, container_bbox
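# Illustration only (not part of the class): a minimal standalone sketch of the
# ROI geometry used above, assuming bboxes are (x, y, w, h) tuples. The name
# `roi_for_hint` and the `pad_mul` argument are hypothetical stand-ins for the
# inline branches and for hp["pad_mul"] from the healing profile.

```python
from typing import Tuple

BBox = Tuple[int, int, int, int]  # (x, y, w, h)


def roi_for_hint(anchor: BBox, hint: str, pad_mul: float = 1.0) -> BBox:
    """Return the search region placed relative to the anchor for a given hint."""
    x, y, w, h = anchor
    pad_x = int(300 * pad_mul)
    pad_y = int(250 * pad_mul)
    if hint == "below_text":
        return (x - 20, y + h, w + pad_x, pad_y)
    if hint == "above_text":
        return (x - 20, y - pad_y, w + pad_x, pad_y)
    if hint == "right_of_text":
        return (x + w, y - 20, pad_x, h + 120)
    if hint == "left_of_text":
        return (x - pad_x, y - 20, pad_x, h + 120)
    # near_text (or anything else): symmetric margin around the anchor
    return (x - 80, y - 80, w + 160, h + 160)


label = (100, 200, 80, 20)
print(roi_for_hint(label, "below_text"))
print(roi_for_hint(label, "right_of_text"))
```

# A larger pad_mul (as a more permissive healing profile might supply) widens
# the region without changing where it is attached to the anchor.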
def _find_element_by_text(self, text: str, ui_elements: List[UIElement], min_ratio: float = 0.65) -> Optional[UIElement]:
"""
Find an element by its text/label using normalization and fuzzy matching.
Fiche #8: Instead of taking the first match, keep the best-scoring one.
"""
target = _norm_text(text)
if not target:
return None
best = None
best_score = 0.0
for e in ui_elements:
label = _norm_text(getattr(e, "label", "") or "")
if not label:
continue
# Exact / contains = strong score
if label == target:
score = 1.0
elif target in label or label in target:
score = 0.92
else:
score = _fuzzy_ratio(label, target)
if score > best_score and score >= min_ratio:
best = e
best_score = score
return best
# =========================================================================
# Fiche #7 - Container and Form Logic
# =========================================================================
def _smallest_common_container_bbox(self, anchor, candidate, ui_elements):
"""Return the bbox of the smallest container that contains both anchor AND candidate."""
if not anchor or not candidate:
return None
containers = []
for e in ui_elements:
r = (getattr(e, "role", "") or "").lower()
t = (getattr(e, "type", "") or "").lower()
if r in {"panel", "container", "group", "form"} or t in {"panel", "container", "group", "form"}:
if _bbox_contains(e.bbox, anchor.bbox) and _bbox_contains(e.bbox, candidate.bbox):
containers.append(e)
if not containers:
return None
containers.sort(key=lambda c: _bbox_area(c.bbox))  # smallest = most specific
return containers[0].bbox
def _alignment_bonus(self, elem, anchor, hints) -> float:
"""
Multiplicative bonus based on form-layout logic, backed by an optimized cache.
Task 5.4: Uses the computation cache to avoid recomputing alignment scores.
- right_of_text: same row (close Y) plus some vertical overlap
- below_text: same visual column (X overlap) plus vertical proximity
"""
if not anchor or not hints:
return 1.0
bonus = 1.0
# Use the cache for alignment computations
for hint_type in hints.keys():
if hint_type in ("right_of_text", "left_of_text", "below_text", "above_text", "near_text"):
alignment_score = self._computation_cache.get_alignment_score(
elem.element_id,
anchor.element_id,
hint_type,
lambda: self._compute_alignment_score(elem, anchor, hint_type)
)
bonus *= alignment_score
return bonus
def _compute_alignment_score(self, elem, anchor, hint_type: str) -> float:
"""
Compute the alignment score for a specific hint type.
Args:
elem: Element to evaluate
anchor: Anchor element
hint_type: Spatial hint type
Returns:
Multiplicative alignment score
"""
eb, ab = elem.bbox, anchor.bbox
# Tolerant thresholds (real-world UIs are never perfectly aligned)
baseline_ok = _baseline_distance(eb, ab) <= 25.0
y_ov = _y_overlap_ratio(eb, ab)
x_ov = _x_overlap_ratio(eb, ab)
bonus = 1.0
if hint_type in ("right_of_text", "left_of_text"):
if baseline_ok:
bonus *= 1.20
if y_ov > 0.3:
bonus *= 1.10
elif hint_type in ("below_text", "above_text"):
if x_ov > 0.25:
bonus *= 1.18
# small bonus if the element is not too far away
dy = abs(_bbox_center(eb)[1] - _bbox_center(ab)[1])
if dy <= 180:
bonus *= 1.06
elif hint_type == "near_text":
# bonus when minimally aligned (proximity itself is not checked here)
if baseline_ok or x_ov > 0.25 or y_ov > 0.3:
bonus *= 1.08
return bonus
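# Illustration only (not part of the class): a minimal standalone sketch of the
# alignment bonus above. The module helpers _baseline_distance, _x_overlap_ratio
# and _y_overlap_ratio are not shown in this section, so the definitions used
# here are assumptions: the baseline is taken as the bottom edge, and overlap is
# measured relative to the shorter of the two intervals.

```python
from typing import Tuple

BBox = Tuple[float, float, float, float]  # (x, y, w, h)


def overlap_ratio(a0: float, a1: float, b0: float, b1: float) -> float:
    """Overlap of [a0, a1] and [b0, b1], divided by the shorter interval (assumed definition)."""
    inter = max(0.0, min(a1, b1) - max(a0, b0))
    shorter = min(a1 - a0, b1 - b0)
    return inter / shorter if shorter > 0 else 0.0


def alignment_bonus(elem: BBox, anchor: BBox, hint_type: str) -> float:
    """Multiplicative bonus using the same thresholds as the method above."""
    ex, ey, ew, eh = elem
    ax, ay, aw, ah = anchor
    x_ov = overlap_ratio(ex, ex + ew, ax, ax + aw)
    y_ov = overlap_ratio(ey, ey + eh, ay, ay + ah)
    baseline_ok = abs((ey + eh) - (ay + ah)) <= 25.0  # assumed: bottom-edge distance
    bonus = 1.0
    if hint_type in ("right_of_text", "left_of_text"):
        if baseline_ok:
            bonus *= 1.20
        if y_ov > 0.3:
            bonus *= 1.10
    elif hint_type in ("below_text", "above_text"):
        if x_ov > 0.25:
            bonus *= 1.18
        dy = abs((ey + eh / 2) - (ay + ah / 2))
        if dy <= 180:
            bonus *= 1.06
    elif hint_type == "near_text":
        if baseline_ok or x_ov > 0.25 or y_ov > 0.3:
            bonus *= 1.08
    return bonus


label = (10, 100, 80, 20)
field_on_same_row = (100, 98, 200, 24)
print(alignment_bonus(field_on_same_row, label, "right_of_text"))
```

# A field on the same row as its label collects both row bonuses (1.20 and
# 1.10), while an unrelated element elsewhere on the screen keeps a neutral 1.0.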
def _is_interactable(self, elem: UIElement, screen_state: ScreenState) -> bool:
"""
Check whether an element is interactable (not hidden/disabled/offscreen).
Fiche #8: Filters out non-clickable elements to avoid false positives.
"""
# bbox XYWH
x, y, w, h = elem.bbox
if w <= 2 or h <= 2:
return False
# Screen bounds (best effort)
if self.error_handler:
# Use ErrorHandler to handle property-access errors
try:
sw, sh = screen_state.window.screen_resolution
except Exception as e:
context_data = {
'screen_state': screen_state,
'details': {'operation': 'get_screen_resolution'},
'original_data': {'window': screen_state.window}
}
recovery_result = self.error_handler.handle_error(e, context_data)
if recovery_result.success and 'screen_resolution' in recovery_result.recovery_data:
sw, sh = recovery_result.recovery_data['screen_resolution']
else:
sw, sh = None, None
else:
# Fall back to the original behavior
try:
sw, sh = screen_state.window.screen_resolution
except Exception:
sw, sh = None, None
if sw and sh:
# Stricter than "fully offscreen": reject any element whose top-left corner lies outside the screen
if x < 0 or y < 0 or x >= sw or y >= sh:
return False
# Tags / metadata (when present)
tags = set((getattr(elem, "tags", None) or []))
meta = getattr(elem, "metadata", None) or {}
if "hidden" in tags or "disabled" in tags:
return False
if meta.get("visible") is False or meta.get("enabled") is False:
return False
return True
def _find_anchors_by_text(self, text: str, ui_elements: List[UIElement], min_ratio: float = 0.65) -> List[UIElement]:
"""
Find all candidate anchors for a given text.
Fiche #8: Handles duplicated labels by returning every candidate.
"""
target = _norm_text(text)
out = []
# Guard: an empty target is a substring of every label and would match everything
if not target:
return out
for e in ui_elements:
label = _norm_text(getattr(e, "label", "") or "")
if not label:
continue
if label == target or target in label or label in target or _fuzzy_ratio(label, target) >= min_ratio:
out.append(e)
return out
# =========================================================================
# Fiche #12 - Form Rows/Columns: helpers for grouping and label→field association
# =========================================================================
def _y_center(self, bbox) -> float:
"""Y center of a bbox (x, y, w, h)"""
bbox_tuple = _bbox_to_tuple(bbox)
return float(bbox_tuple[1] + bbox_tuple[3] / 2)
def _x_center(self, bbox) -> float:
"""X center of a bbox (x, y, w, h)"""
bbox_tuple = _bbox_to_tuple(bbox)
return float(bbox_tuple[0] + bbox_tuple[2] / 2)
def _is_inputish(self, elem: UIElement) -> bool:
"""
Check whether an element is an input field.
Author: Dom, Alice Kiro - 19 December 2024
"""
role = (getattr(elem, "role", "") or "").lower()
etype = (getattr(elem, "type", "") or "").lower()
return (role in {"input", "textfield", "text_field", "form_input", "textbox", "edit"} or
etype in {"text_input", "input", "textfield"})
def _is_labelish(self, elem: UIElement) -> bool:
"""
Check whether an element is a label.
Author: Dom, Alice Kiro - 19 December 2024
"""
role = (getattr(elem, "role", "") or "").lower()
etype = (getattr(elem, "type", "") or "").lower()
return (role in {"label", "text", "data_display"} or etype in {"label"})
def _build_rows(self, elements: List[UIElement], y_tol: float = 18.0):
"""
Group elements into rows by proximity of their Y centers.
Author: Dom, Alice Kiro - 19 December 2024
Args:
elements: List of UI elements to group
y_tol: Tolerance in pixels for treating two elements as being on the same row
Returns:
Tuple[List[List[UIElement]], Dict[str, int]]:
- rows: list of rows (each row is a list of elements)
- row_of: mapping element_id -> row index
"""
# Sort by Y center
els = sorted(elements, key=lambda e: self._y_center(e.bbox))
rows = []
row_of = {}
for elem in els:
placed = False
ey = self._y_center(elem.bbox)
for idx, row in enumerate(rows):
# Mean Y center of the row
ry = sum(self._y_center(x.bbox) for x in row) / max(1, len(row))
if abs(ey - ry) <= y_tol:
row.append(elem)
row_of[elem.element_id] = idx
placed = True
break
if not placed:
rows.append([elem])
row_of[elem.element_id] = len(rows) - 1
# Sort each row by X (left to right)
for row in rows:
row.sort(key=lambda e: _bbox_to_tuple(e.bbox)[0])
return rows, row_of
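# Illustration only (not part of the class): a minimal standalone sketch of the
# row grouping above, working on plain (x, y, w, h) tuples and returning rows of
# indices instead of UIElement objects. The name `build_rows` is hypothetical.

```python
from typing import List, Tuple

BBox = Tuple[float, float, float, float]  # (x, y, w, h)


def build_rows(boxes: List[BBox], y_tol: float = 18.0) -> List[List[int]]:
    """Group box indices into rows whose mean Y center is within y_tol."""

    def y_center(b: BBox) -> float:
        return b[1] + b[3] / 2

    rows: List[List[int]] = []
    # Visit boxes from top to bottom so rows are created in reading order
    for i in sorted(range(len(boxes)), key=lambda k: y_center(boxes[k])):
        ey = y_center(boxes[i])
        for row in rows:
            ry = sum(y_center(boxes[j]) for j in row) / len(row)
            if abs(ey - ry) <= y_tol:
                row.append(i)
                break
        else:
            rows.append([i])
    # Order each row left to right
    for row in rows:
        row.sort(key=lambda j: boxes[j][0])
    return rows


form = [
    (300, 100, 100, 20),  # 0: field, first row
    (10, 102, 80, 16),    # 1: label, first row
    (10, 150, 80, 16),    # 2: label, second row
    (300, 148, 100, 20),  # 3: field, second row
]
print(build_rows(form))  # [[1, 0], [2, 3]]
```

# Each label ends up immediately to the left of its field within a row, which
# is what a label→field association step can rely on.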
# =========================================================================
# Fiche #11 - Multi-anchor + Hard Constraints Helpers
# =========================================================================
def _as_text_list(self, v):
"""
Convert a value into a list of texts.
Fiche #11: Helper for handling lists of alternative anchors.
Args:
v: str, list, tuple, or None
Returns:
List[str]: List of texts (empty items of a list/tuple are dropped)
"""
if v is None:
return []
if isinstance(v, (list, tuple)):
return [str(x) for x in v if x]
return [str(v)]
def _container_bbox_from_text(self, text: str, ui_elements):
"""
Find a container from a text.
Fiche #11: Helper for identifying containers by name/label.
Args:
text: Text to look for (container name)
ui_elements: List of UI elements
Returns:
bbox of the container, or None if not found
"""
# 1) Find an element carrying this text (label/panel)
anchor = self._find_element_by_text(text, ui_elements, min_ratio=self._healing_profile()["min_ratio"])
if not anchor:
return None
# 2) If it is already a panel/container, use it directly
r = (getattr(anchor, "role", "") or "").lower()
t = (getattr(anchor, "type", "") or "").lower()
if r in {"panel", "container", "group", "form"} or t in {"panel", "container", "group", "form"}:
return anchor.bbox
# 3) Otherwise: smallest container that contains this label
containers = []
for e in ui_elements:
rr = (getattr(e, "role", "") or "").lower()
tt = (getattr(e, "type", "") or "").lower()
if rr in {"panel", "container", "group", "form"} or tt in {"panel", "container", "group", "form"}:
if _bbox_contains(e.bbox, anchor.bbox):
containers.append(e)
if not containers:
return None
containers.sort(key=lambda c: _bbox_area(c.bbox))
return containers[0].bbox
def _apply_hard_constraints(self, candidates, target_spec, ui_elements):
"""
Apply hard constraints to the candidates.
Fiche #11: Strict filtering according to hard_constraints.
Args:
candidates: Current list of candidates
target_spec: TargetSpec with hard_constraints
ui_elements: All UI elements
Returns:
Filtered list of candidates
"""
hc = getattr(target_spec, "hard_constraints", None) or {}
# Example: force a container identified by text
if "within_container_text" in hc and hc["within_container_text"]:
cb = self._container_bbox_from_text(str(hc["within_container_text"]), ui_elements)
if cb is not None:
candidates = [e for e in candidates if _bbox_contains(cb, e.bbox)]
logger.debug(f"Hard constraint within_container_text '{hc['within_container_text']}': {len(candidates)} candidates remaining")
# Example: ignore tiny elements
if "min_area" in hc:
min_area = float(hc["min_area"])
candidates = [e for e in candidates if _bbox_area(e.bbox) >= min_area]
logger.debug(f"Hard constraint min_area {min_area}: {len(candidates)} candidates remaining")
return candidates
def _evaluate_all_anchor_candidate_combinations(
self,
target_spec: TargetSpec,
candidates: List[UIElement],
ui_elements: List[UIElement],
scores: Dict[str, float],
hints: Dict[str, Any],
healing_profile: Dict[str, Any]
) -> Dict[str, Any]:
"""
Evaluate all anchor-candidate combinations and return the best one.
Fiche #11 - Task 7.1: For each anchor, score all candidates after constraints
and keep track of the best combination (score, anchor, candidate).
Task 8.1: Reuse of UI element analysis (Requirements 8.2, 8.4)
- Avoid re-analyzing the same elements across anchors
- Reuse distance and alignment computations
Author: Dom, Alice Kiro - 15 December 2024
Args:
target_spec: Target specification with context_hints and weights
candidates: List of candidates after initial filtering
ui_elements: All available UI elements
scores: Base scores keyed by element_id
hints: Context hints extracted from target_spec
healing_profile: Current healing profile
Returns:
Dict describing the best combination:
- element: Best selected element
- anchor: Anchor used (may be None)
- score: Final score of the combination
- top3: The 3 best candidates with their scores
- tie_break_criterion: Criterion used for tie-breaking
- anchor_id: ID of the anchor used
- performance_metrics: Performance metrics
"""
start_time = time.perf_counter()
# Task 8.1: Caches for reusing analyses across anchors (Requirement 8.2)
ui_analysis_cache = {}  # Cache of UI element analyses
distance_cache = {}  # Cache of distance computations
alignment_cache = {}  # Cache of alignment computations
container_cache = {}  # Cache of container resolutions
# 1) Build the list of possible anchor texts (multi-anchor)
anchor_texts = []
for k in ("near_text", "below_text", "above_text", "right_of_text", "left_of_text"):
anchor_texts += self._as_text_list(hints.get(k))
logger.debug(f"Multi-anchor evaluation: {len(anchor_texts)} anchor texts found: {anchor_texts}")
# 2) Find all anchor candidates (Requirement 1.2)
anchor_candidates = []
for txt in anchor_texts:
found_anchors = self._find_anchors_by_text(txt, ui_elements, min_ratio=healing_profile["min_ratio"])
anchor_candidates.extend(found_anchors)
logger.debug(f"Anchor text '{txt}': found {len(found_anchors)} candidates")
# Handle the case where no anchor is found (fall back to None) - Requirement 1.4
if not anchor_candidates:
anchor_candidates = [None]
logger.debug("No anchors found, using anchor-less resolution")
# Variables tracking the best combination
best_element = None
best_score = -1.0
best_anchor = None
best_details = None
best_tie_break_criterion = "score"
# Performance metrics with cache tracking
combinations_evaluated = 0
anchor_evaluations = 0
cache_hits = 0
cache_misses = 0
# 3) Evaluate each anchor-candidate combination (Requirements 1.2, 8.2)
for anchor in anchor_candidates:
anchor_evaluations += 1
# Build the ROI and container for this anchor, with caching (Requirement 8.3)
roi_bbox, container_bbox = None, None
if anchor is not None:
# Cache key for this anchor
anchor_cache_key = f"anchor_{anchor.element_id}"
if anchor_cache_key in container_cache:
# Reuse previous computations (Requirement 8.2)
roi_bbox, container_bbox = container_cache[anchor_cache_key]
cache_hits += 1
logger.debug(f"Cache hit for anchor {anchor.element_id}")
else:
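# Compute and cache.
# NOTE: the helper below does not receive `anchor`; it re-derives its own
# anchor from target_spec, so the ROI/container may belong to a different anchor.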
try:
_, roi_bbox, container_bbox = self._build_anchor_and_roi_and_container(target_spec, ui_elements)
container_cache[anchor_cache_key] = (roi_bbox, container_bbox)
cache_misses += 1
except Exception as e:
logger.debug(f"Failed to build ROI/container for anchor {anchor.element_id}: {e}")
roi_bbox, container_bbox = None, None
container_cache[anchor_cache_key] = (None, None)
cache_misses += 1
# Apply the hard constraints once per anchor (Requirement 8.2)
anchor_candidates_filtered = self._apply_hard_constraints(candidates, target_spec, ui_elements)
if not anchor_candidates_filtered:
logger.debug(f"No candidates remaining after hard constraints for anchor {anchor.element_id if anchor else 'None'}")
continue
# Weighted scoring of all candidates against this anchor
weights = getattr(target_spec, "weights", None) or {}
sniper_scores = {}
for candidate in anchor_candidates_filtered:
combinations_evaluated += 1
# Task 8.1: Use cached scoring to avoid redundant computations (Requirement 8.4)
score = self._score_candidate_sniper_cached(
candidate,
base_score=scores[candidate.element_id],
anchor_elem=anchor,
roi_bbox=roi_bbox,
container_bbox=container_bbox,
hints=hints,
ui_elements=ui_elements,
weights=weights,
# Caches shared for reuse
ui_analysis_cache=ui_analysis_cache,
distance_cache=distance_cache,
alignment_cache=alignment_cache
)
sniper_scores[candidate.element_id] = score
# Selection with stable tie-breaking (Task 6)
chosen, tie_break_criterion = self._select_best_candidate_with_tiebreak(
anchor_candidates_filtered, sniper_scores
)
if not chosen:
continue
chosen_score = sniper_scores[chosen.element_id]
# Check whether this is the best combination so far
if chosen_score > best_score:
best_score = chosen_score
best_element = chosen
best_anchor = anchor
best_tie_break_criterion = tie_break_criterion
# Build top3 from the current scores
scored_candidates = [(c, sniper_scores[c.element_id]) for c in anchor_candidates_filtered]
scored_candidates.sort(key=lambda x: x[1], reverse=True)
top3 = [x[0] for x in scored_candidates[:3]]
best_details = [
{"id": t.element_id, "score": round(sniper_scores[t.element_id], 4)}
for t in top3
]
logger.debug(f"New best combination: anchor={anchor.element_id if anchor else 'None'}, "
f"element={chosen.element_id}, score={chosen_score:.4f}")
# Compute performance metrics, including cache stats
duration_ms = (time.perf_counter() - start_time) * 1000
performance_metrics = {
"evaluation_duration_ms": round(duration_ms, 2),
"combinations_evaluated": combinations_evaluated,
"anchor_evaluations": anchor_evaluations,
"anchors_attempted": len(anchor_candidates),
"anchor_texts": anchor_texts,
# Task 8.1: Cache metrics (Requirement 8.5)
"cache_hits": cache_hits,
"cache_misses": cache_misses,
"cache_hit_ratio": round(cache_hits / max(1, cache_hits + cache_misses), 3),
"ui_analysis_cache_size": len(ui_analysis_cache),
"distance_cache_size": len(distance_cache),
"alignment_cache_size": len(alignment_cache),
"container_cache_size": len(container_cache)
}
logger.debug(f"Multi-anchor evaluation completed: {combinations_evaluated} combinations in {duration_ms:.2f}ms "
f"(cache hit ratio: {performance_metrics['cache_hit_ratio']:.1%})")
return {
"element": best_element,
"anchor": best_anchor,
"score": best_score,
"top3": best_details or [],
"tie_break_criterion": best_tie_break_criterion,
"anchor_id": best_anchor.element_id if best_anchor else None,
"performance_metrics": performance_metrics
}
def _create_stable_sort_key(self, element: UIElement, score: float) -> Tuple[float, float, float, str]:
"""
Build a stable sort key for tie-breaking (Fiche #11 - Task 6).
Author: Dom, Alice Kiro - 15 December 2024
Sort criteria (in priority order):
1. Composite score (descending)
2. Element confidence (descending)
3. Element area (descending - larger = more visible)
4. Element ID (ascending - for stability)
Args:
element: UI element to evaluate
score: Computed composite score
Returns:
Tuple of criteria for stable sorting (Requirements 5.1, 5.2, 5.3, 5.4)
"""
# Criterion 1: Composite score (negated for descending order)
sort_score = -float(score)
# Criterion 2: Confidence (negated for descending order)
confidence = float(getattr(element, "confidence", 1.0) or 1.0)
sort_confidence = -confidence
# Criterion 3: Area (negated for descending order - prefer larger elements)
area = _bbox_area(element.bbox)
sort_area = -area
# Criterion 4: Element ID (ascending, for determinism)
element_id = str(getattr(element, "element_id", id(element)))
return (sort_score, sort_confidence, sort_area, element_id)
def _select_best_candidate_with_tiebreak(self, candidates: List[UIElement], scores: Dict[str, float]) -> Tuple[Optional[UIElement], str]:
"""
Select the best candidate with stable tie-breaking (Fiche #11 - Task 6).
Author: Dom, Alice Kiro - 15 December 2024
Args:
candidates: List of candidates
scores: Dictionary of scores keyed by element_id
Returns:
Tuple (best_element, tie_break_criterion_used); the element is None when there are no candidates
"""
if not candidates:
return None, "no_candidates"
if len(candidates) == 1:
return candidates[0], "single_candidate"
# Build the sort keys for all candidates
candidate_keys = []
for elem in candidates:
score = scores.get(elem.element_id, 0.0)
sort_key = self._create_stable_sort_key(elem, score)
candidate_keys.append((elem, sort_key, score))
# Sort by the stable key (Requirements 5.4, 5.5)
candidate_keys.sort(key=lambda x: x[1])
# Take the best one
best_elem, best_key, best_score = candidate_keys[0]
# Determine which criterion broke the tie
tie_break_criterion = "score"
# Check for a tie on the score
score_ties = [x for x in candidate_keys if abs(x[2] - best_score) < 1e-6]
if len(score_ties) > 1:
# Tie on the score: check the remaining criteria
confidence_ties = [x for x in score_ties if abs(x[1][1] - best_key[1]) < 1e-6]
if len(confidence_ties) > 1:
# Tie on score AND confidence
area_ties = [x for x in confidence_ties if abs(x[1][2] - best_key[2]) < 1e-6]
if len(area_ties) > 1:
# Tie on score, confidence AND area
tie_break_criterion = "element_id"
else:
tie_break_criterion = "area"
else:
tie_break_criterion = "confidence"
logger.debug(f"Selected element {best_elem.element_id} with tie-break criterion: {tie_break_criterion}")
return best_elem, tie_break_criterion
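# Illustration only (not part of the class): a minimal standalone sketch of the
# stable tie-break ordering above. `Elem` is a hypothetical stand-in for
# UIElement, with only the attributes the sort key reads.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Elem:
    element_id: str
    confidence: float
    bbox: Tuple[float, float, float, float]  # (x, y, w, h)


def stable_key(e: Elem, score: float) -> Tuple[float, float, float, str]:
    """Score, confidence and area descending (negated), then element_id ascending."""
    _, _, w, h = e.bbox
    return (-score, -e.confidence, -(w * h), e.element_id)


def pick_best(candidates: List[Elem], scores: Dict[str, float]) -> Elem:
    # min() over the key is equivalent to sorting and taking the first item
    return min(candidates, key=lambda e: stable_key(e, scores.get(e.element_id, 0.0)))


a = Elem("a", 0.9, (0, 0, 10, 10))
b = Elem("b", 0.9, (0, 0, 10, 10))
c = Elem("c", 0.9, (0, 0, 20, 10))
scores = {"a": 0.8, "b": 0.8, "c": 0.8}

print(pick_best([b, a], scores).element_id)     # a: full tie, lowest id wins
print(pick_best([a, b, c], scores).element_id)  # c: same score and confidence, larger area
```

# Because the last component is the element id, the result does not depend on
# the order in which candidates were collected.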
# =========================================================================
# Main resolution
# =========================================================================
def resolve_target(
self,
target_spec: TargetSpec,
screen_state: ScreenState,
context: Optional[ResolutionContext] = None
) -> Optional[ResolvedTarget]:
"""
Resolve a target according to its specification.
Args:
target_spec: Target specification
screen_state: Current screen state
context: Additional context (optional)
Returns:
ResolvedTarget, or None if not found
"""
# Fiche #10: Start performance measurement
start_time = time.perf_counter()
self._stats["total_resolutions"] += 1
# Check the cache
cache_key = self._make_cache_key(target_spec, screen_state)
if cache_key in self._cache:
self._stats["cache_hits"] += 1
return self._cache[cache_key]
# Create the context if none was provided
if context is None:
context = ResolutionContext(screen_state=screen_state)
# Get the UI elements and filter out non-interactable ones (Fiche #8)
all_elements = self._get_ui_elements(screen_state)
ui_elements = [e for e in all_elements if self._is_interactable(e, screen_state)]
if not ui_elements:
logger.warning("No interactable UI elements available for resolution")
self._stats["failed"] += 1
return None
# Fiche #13: Spatial index (performance) - made available through the context
context.workflow_context["spatial_index"] = self._get_spatial_index(screen_state, ui_elements)
# Fiche #18: Lookup from persistent memory (before the expensive resolution)
persistent_result = self._lookup_from_persistent_memory(target_spec, screen_state, ui_elements)
if persistent_result is not None:
# Cache the result and update stats
self._cache[cache_key] = persistent_result
if len(self._cache) > self._cache_size:
self._cache.popitem(last=False)
self._stats["successful"] += 1
logger.info(f"Resolved target from persistent memory: {persistent_result.element.element_id}")
return persistent_result
# Fiche #14: Resolution from cross-frame memory (before the expensive scoring)
mem = self._resolve_from_memory(target_spec, screen_state, ui_elements, context)
if mem is not None:
# Cache the result and update stats
self._cache[cache_key] = mem
if len(self._cache) > self._cache_size:
self._cache.popitem(last=False)
self._stats["successful"] += 1
return mem
# Fiche #12: field_for shortcut (label -> field) - takes priority over the generic strategies below
result = self._resolve_field_for(target_spec, screen_state, ui_elements, context)
if result is not None:
# Cache the result and update stats
self._cache[cache_key] = result
if len(self._cache) > self._cache_size:
self._cache.popitem(last=False)
self._stats["successful"] += 1
# Fiche #14: Record in cross-frame memory
self._record_in_cross_frame_memory(target_spec, screen_state, ui_elements, result.element)
return result
# Try the strategies in priority order
result = None
fallback_applied = False
# 1. Composite strategy when several criteria are set
if self._is_composite_spec(target_spec):
result = self._resolve_composite(target_spec, ui_elements, context)
# 2. By Role (high priority)
if not result and target_spec.by_role:
result = self._resolve_by_role(
target_spec.by_role,
target_spec.selection_policy or "first",
ui_elements,
context
)
# 3. By Text
if not result and target_spec.by_text:
result = self._resolve_by_text(target_spec.by_text, ui_elements, context)
if result:
fallback_applied = target_spec.by_role is not None
# 4. By Position
if not result and target_spec.by_position:
result = self._resolve_by_position(
target_spec.by_position,
ui_elements,
context
)
if result:
fallback_applied = True
# 5. By Embedding (advanced fallback)
if not result and self.use_embedding_fallback and hasattr(target_spec, 'embedding_ref'):
result = self._resolve_by_embedding(target_spec, ui_elements, context)
if result:
fallback_applied = True
# 6. By Context (last resort)
if not result and hasattr(target_spec, 'context_hints'):
result = self._resolve_by_context(target_spec, ui_elements, context)
if result:
fallback_applied = True
# Fiche #10: Collect resolution metrics
duration_ms = (time.perf_counter() - start_time) * 1000
if self.metrics_engine and METRICS_AVAILABLE:
if self.error_handler:
# Use ErrorHandler to handle metrics errors
try:
self.metrics_engine.record_resolution(
target_spec=target_spec,
result=result,
duration_ms=duration_ms,
screen_state=screen_state
)
except Exception as e:
# Delegate to ErrorHandler
context_data = {
'screen_state': screen_state,
'details': {'operation': 'record_resolution_metrics'},
'original_data': {'target_spec': target_spec, 'duration_ms': duration_ms}
}
recovery_result = self.error_handler.handle_error(e, context_data)
if not recovery_result.success:
logger.debug(f"Failed to record resolution metrics: {e}")
else:
# Fall back to the original behavior
try:
self.metrics_engine.record_resolution(
target_spec=target_spec,
result=result,
duration_ms=duration_ms,
screen_state=screen_state
)
except Exception as e:
logger.debug(f"Failed to record resolution metrics: {e}")
# Update stats
if result:
result.fallback_applied = fallback_applied
self._stats["successful"] += 1
strategy = result.strategy_used
self._stats["by_strategy"][strategy] = self._stats["by_strategy"].get(strategy, 0) + 1
if fallback_applied:
self._stats["fallbacks_used"] += 1
# Store in cache
self._add_to_cache(cache_key, result)
# Fiche #14: Record in cross-frame memory
self._record_in_cross_frame_memory(target_spec, screen_state, ui_elements, result.element)
else:
self._stats["failed"] += 1
logger.debug(f"Failed to resolve target: {target_spec}")
return result
# =========================================================================
# Resolution strategies
# =========================================================================
def _resolve_by_role(
self,
role: str,
selection_policy: str,
ui_elements: List[UIElement],
context: ResolutionContext
) -> Optional[ResolvedTarget]:
"""
Resolve by semantic role, with healing support.
Fiche #10: Integrates the healing profile for alias expansion.
"""
# Fiche #10: Fetch the healing profile
hp = self._healing_profile()
role_lower = role.lower().strip()
# Fiche #10: Build the set of roles to look for
wanted_roles = {role_lower}
if hp["expand_roles"]:
# Expand with ROLE_ALIASES when healing is active
wanted_roles |= ROLE_ALIASES.get(role_lower, set())
# Filter the candidates
candidates = []
for elem in ui_elements:
elem_role = (getattr(elem, 'role', '') or "").lower().strip()
elem_type = (getattr(elem, 'type', '') or "").lower().strip()
# Check for a role match
ok_role = elem_role in wanted_roles
# Fiche #10: Bonus - if the role is too fuzzy, also accept a match by type (when healing)
if not ok_role and hp["expand_roles"]:
for k, types in TYPE_ALIASES.items():
if role_lower == k and elem_type in types:
ok_role = True
break
if ok_role:
candidates.append(elem)
if not candidates:
return None
# Apply the selection policy
selected = self._apply_selection_policy(candidates, selection_policy, context)
if selected:
return ResolvedTarget(
element=selected,
confidence=selected.confidence,
strategy_used=ResolutionStrategy.BY_ROLE.value,
resolution_details={
"role_searched": role,
"candidates_found": len(candidates),
"policy_applied": selection_policy,
# Fiche #10: Add healing metadata
"healing_attempt": int(getattr(self, "healing_attempt", 0) or 0),
"healing_profile": hp,
"role_aliases_used": list(wanted_roles) if hp["expand_roles"] else [role_lower]
},
alternatives=[
ResolvedTarget(e, e.confidence, ResolutionStrategy.BY_ROLE.value)
for e in candidates if e != selected
][:3] # Max 3 alternatives
)
return None
def _resolve_by_text(
self,
text: str,
ui_elements: List[UIElement],
context: ResolutionContext
) -> Optional[ResolvedTarget]:
"""Resolve by text (exact, partial, fuzzy)."""
text_lower = text.lower().strip()
# 1. Exact match
for elem in ui_elements:
if elem.label and elem.label.strip().lower() == text_lower:
return ResolvedTarget(
element=elem,
confidence=elem.confidence,
strategy_used=ResolutionStrategy.BY_TEXT.value,
resolution_details={"match_type": "exact", "text": text}
)
# 2. Partial match (substring in either direction)
partial_matches = []
for elem in ui_elements:
if elem.label and text_lower in elem.label.strip().lower():
partial_matches.append((elem, 0.9))
elif elem.label and elem.label.strip().lower() in text_lower:
partial_matches.append((elem, 0.85))
if partial_matches:
# Sort by match weight × element confidence
partial_matches.sort(key=lambda x: x[1] * x[0].confidence, reverse=True)
best = partial_matches[0]
return ResolvedTarget(
element=best[0],
confidence=best[1] * best[0].confidence,
strategy_used=ResolutionStrategy.BY_TEXT.value,
resolution_details={"match_type": "partial", "text": text}
)
# 3. Fuzzy match (Levenshtein) - Fiche #10: use the healing profile
hp = self._healing_profile()
fuzzy_threshold = hp["min_ratio"] # Threshold taken from the healing profile
fuzzy_matches = []
for elem in ui_elements:
if elem.label:
similarity = self._fuzzy_match(text_lower, elem.label.strip().lower())
if similarity >= fuzzy_threshold:
fuzzy_matches.append((elem, similarity))
if fuzzy_matches:
fuzzy_matches.sort(key=lambda x: x[1], reverse=True)
best = fuzzy_matches[0]
return ResolvedTarget(
element=best[0],
confidence=best[1] * best[0].confidence,
strategy_used=ResolutionStrategy.BY_TEXT.value,
resolution_details={
"match_type": "fuzzy",
"similarity": best[1],
"text": text,
# Fiche #10: add healing metadata
"healing_attempt": int(getattr(self, "healing_attempt", 0) or 0),
"healing_profile": hp,
"fuzzy_threshold_used": fuzzy_threshold
}
)
return None
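# Illustrative sketch (not part of the resolver): the exact → partial → fuzzy
# cascade above, reduced to plain strings. `match_label` is a hypothetical
# helper, and difflib.SequenceMatcher stands in for the class's own
# Levenshtein-based `_fuzzy_match`, so scores will differ from the real method.

```python
from difflib import SequenceMatcher
from typing import List, Optional, Tuple


def match_label(
    query: str, labels: List[str], min_ratio: float = 0.8
) -> Optional[Tuple[str, str, float]]:
    """Return (label, match_type, score) for the best match, or None."""
    q = query.strip().lower()
    if not q:
        return None
    # Drop empty / whitespace-only labels and keep a normalized copy of each
    normalized = [(lab, lab.strip().lower()) for lab in labels if lab and lab.strip()]

    # 1. Exact match wins immediately
    for lab, norm in normalized:
        if norm == q:
            return (lab, "exact", 1.0)

    # 2. Partial match: query inside label (0.9) or label inside query (0.85)
    partial = []
    for lab, norm in normalized:
        if q in norm:
            partial.append((lab, "partial", 0.9))
        elif norm in q:
            partial.append((lab, "partial", 0.85))
    if partial:
        return max(partial, key=lambda m: m[2])

    # 3. Fuzzy match: keep only candidates at or above the threshold
    fuzzy = [(lab, "fuzzy", SequenceMatcher(None, q, norm).ratio()) for lab, norm in normalized]
    fuzzy = [m for m in fuzzy if m[2] >= min_ratio]
    return max(fuzzy, key=lambda m: m[2]) if fuzzy else None
```

# In the resolver itself the threshold comes from the healing profile
# (hp["min_ratio"]); the 0.8 default here is only a placeholder.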
def _resolve_by_position(
self,
position: Tuple[int, int],
ui_elements: List[UIElement],
context: ResolutionContext
) -> Optional[ResolvedTarget]:
"""
Resolve by position using the optimized spatial index.
Fiche #13: O(n) → O(k) optimization with SpatialIndexGrid.
Author: Dom, Alice Kiro - 19 December 2024
"""
x, y = position
idx: Optional[SpatialIndexGrid] = context.workflow_context.get("spatial_index")
# 1) Strict containment (within the grid cell)
hits = idx.query_point(x, y) if idx else [e for e in ui_elements if _bbox_contains_point(e.bbox, x, y)]
if hits:
# Stable tie-break: smallest area first (usually the actual control), then confidence
hits.sort(key=lambda e: (_bbox_area(e.bbox), -e.confidence, e.element_id))
elem = hits[0]
return ResolvedTarget(
element=elem,
confidence=elem.confidence * 0.95,
strategy_used=ResolutionStrategy.BY_POSITION.value,
resolution_details={"match_type": "contains", "position": position}
)
# 2) Nearest within tolerance (bbox query around the point)
tol = self.position_tolerance
box = (x - tol, y - tol, 2 * tol, 2 * tol)
candidates = idx.query_bbox(box) if idx else ui_elements
best = None
best_dist = 10**18
best_score = 0.0
for elem in candidates:
cx, cy = _bbox_center(elem.bbox)
dist = ((x - cx) ** 2 + (y - cy) ** 2) ** 0.5
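if dist <= tol and dist < best_dist:
best_dist = dist
best = elem
# Guard against tol == 0 (exact-center hit), which would otherwise divide by zero
best_score = 1.0 - (dist / tol) if tol > 0 else 1.0
if best is not None: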
return ResolvedTarget(
element=best,
confidence=best.confidence * best_score * 0.8,
strategy_used=ResolutionStrategy.BY_POSITION.value,
resolution_details={"match_type": "nearest", "distance": best_dist, "position": position}
)
return None
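# Illustrative sketch (not part of the resolver): the "nearest within
# tolerance" pass above on plain (x, y, w, h) tuples. The function name and
# the tuple-based return value are assumptions made for this example; the
# real method works on UIElement objects and an optional SpatialIndexGrid.

```python
import math
from typing import List, Optional, Tuple

BBox = Tuple[float, float, float, float]  # (x, y, w, h)


def nearest_within_tolerance(
    point: Tuple[float, float], boxes: List[BBox], tol: float
) -> Optional[Tuple[int, float, float]]:
    """Return (index, distance, score) of the closest box center within tol."""
    x, y = point
    best = None
    for i, (bx, by, bw, bh) in enumerate(boxes):
        cx, cy = bx + bw / 2, by + bh / 2
        dist = math.hypot(x - cx, y - cy)
        if dist <= tol and (best is None or dist < best[1]):
            # Score decays linearly from 1.0 at the center to 0.0 at the tolerance edge
            score = 1.0 - dist / tol if tol > 0 else 1.0
            best = (i, dist, score)
    return best
```

# The resolver then multiplies this score by the element confidence and a
# fixed 0.8 factor, so a "nearest" hit always ranks below a "contains" hit.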
def _resolve_by_embedding(
self,
target_spec: TargetSpec,
ui_elements: List[UIElement],
context: ResolutionContext
) -> Optional[ResolvedTarget]:
"""Resolve by visual embedding similarity."""
# This method requires a reference embedding
if not hasattr(target_spec, 'embedding_ref') or not target_spec.embedding_ref:
return None
# Lazy-load the matcher
if self._embedding_matcher is None:
if self.error_handler:
# Use ErrorHandler to handle import errors
try:
from ..embedding.fusion_engine import FusionEngine
self._embedding_matcher = FusionEngine()
except ImportError as e:
context_data = {
'details': {'operation': 'import_fusion_engine'},
'original_data': {'module': 'fusion_engine'}
}
recovery_result = self.error_handler.handle_error(e, context_data)
if not recovery_result.success:
logger.warning("FusionEngine not available for embedding matching")
return None
else:
# Fall back to the original behavior
try:
from ..embedding.fusion_engine import FusionEngine
self._embedding_matcher = FusionEngine()
except ImportError:
logger.warning("FusionEngine not available for embedding matching")
return None
# Compare the embeddings
best_match = None
best_similarity = 0.0
for elem in ui_elements:
if hasattr(elem, 'embedding') and elem.embedding is not None:
similarity = self._compute_similarity(
target_spec.embedding_ref,
elem.embedding
)
if similarity > best_similarity and similarity >= self.similarity_threshold:
best_similarity = similarity
best_match = elem
if best_match:
return ResolvedTarget(
element=best_match,
confidence=best_similarity,
strategy_used=ResolutionStrategy.BY_EMBEDDING.value,
resolution_details={"similarity": best_similarity}
)
return None
def _resolve_by_context(
self,
target_spec: TargetSpec,
ui_elements: List[UIElement],
context: ResolutionContext
) -> Optional[ResolvedTarget]:
"""Resolve by spatial context (near X, below Y, right of Z)."""
if not hasattr(target_spec, 'context_hints') or not target_spec.context_hints:
return None
hints = target_spec.context_hints
candidates = list(ui_elements)
# Filter by "near"
if 'near_text' in hints:
anchor = self._find_element_by_text(hints['near_text'], ui_elements)
if anchor:
max_distance = hints.get('max_distance', 200)
candidates = self._filter_by_proximity(candidates, anchor, max_distance)
# Filter by "below"
if 'below_text' in hints:
anchor = self._find_element_by_text(hints['below_text'], ui_elements)
if anchor:
candidates = [e for e in candidates if _bbox_to_tuple(e.bbox)[1] > _bbox_bottom(anchor.bbox)]
# Filter by "right_of"
if 'right_of_text' in hints:
anchor = self._find_element_by_text(hints['right_of_text'], ui_elements)
if anchor:
candidates = [e for e in candidates if _bbox_to_tuple(e.bbox)[0] > _bbox_right(anchor.bbox)]
if candidates:
# Take the first candidate (a selection policy could be applied here instead)
return ResolvedTarget(
element=candidates[0],
confidence=candidates[0].confidence * 0.7,
strategy_used=ResolutionStrategy.BY_CONTEXT.value,
resolution_details={"hints_applied": list(hints.keys())}
)
return None
def _resolve_field_for(
self,
target_spec: TargetSpec,
screen_state: ScreenState,
ui_elements: List[UIElement],
context: ResolutionContext
) -> Optional[ResolvedTarget]:
"""
Direct "field_for" resolution: find the input field associated with a label.
Fiche #12: label → field association using form-layout logic.
Author: Dom, Alice Kiro - 19 December 2024
Strategy:
1. Look for an input on the same row, to the right of the label (the most common UI pattern)
2. Fallback: an input below the label, in the same column
Args:
target_spec: Spec carrying context_hints["field_for"]
screen_state: Screen state
ui_elements: List of UI elements
context: Resolution context
Returns:
ResolvedTarget if found, None otherwise
"""
hints = getattr(target_spec, "context_hints", None) or {}
if "field_for" not in hints or not hints["field_for"]:
return None
hp = self._healing_profile()
label_texts = self._as_text_list(hints["field_for"])
rows, row_of = self._build_rows(ui_elements)
# 1) Find the candidate label anchors
anchors = []
for txt in label_texts:
anchors += self._find_anchors_by_text(txt, ui_elements, min_ratio=hp["min_ratio"])
if not anchors:
return None
best = None
best_score = -1.0
best_anchor = None
for anchor in anchors:
a_row = row_of.get(anchor.element_id, None)
if a_row is None:
continue
# Input-like candidates
candidates = [e for e in ui_elements if self._is_inputish(e)]
candidates = self._apply_hard_constraints(candidates, target_spec, ui_elements)
# Normalize first: the bbox is not guaranteed to be a plain (x, y, w, h) tuple
ax1, ay1, aw, ah = _bbox_to_tuple(anchor.bbox)
a_right = ax1 + aw
a_bottom = ay1 + ah
# 2) Same row, to the right
same_row = [e for e in candidates
if row_of.get(e.element_id) == a_row and _bbox_to_tuple(e.bbox)[0] >= a_right - 10]
if same_row:
# Pick the horizontally closest one
same_row.sort(key=lambda e: (_bbox_to_tuple(e.bbox)[0] - a_right, _bbox_area(e.bbox), e.element_id))
chosen = same_row[0]
score = 0.95
else:
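# 3) Fallback: below the label, same column (horizontal overlap)
def x_overlap_ratio(b1, b2):
# Normalize both boxes and avoid shadowing the anchor variables above
t1, t2 = _bbox_to_tuple(b1), _bbox_to_tuple(b2)
left1, right1 = t1[0], t1[0] + t1[2]
left2, right2 = t2[0], t2[0] + t2[2]
ov = max(0.0, min(right1, right2) - max(left1, left2))
denom = max(1.0, min(t1[2], t2[2]))
return ov / denom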
below = [e for e in candidates
if _bbox_to_tuple(e.bbox)[1] >= a_bottom - 5 and x_overlap_ratio(e.bbox, anchor.bbox) > 0.25]
below.sort(key=lambda e: (_bbox_to_tuple(e.bbox)[1] - a_bottom, _bbox_to_tuple(e.bbox)[0], e.element_id))
if not below:
continue
chosen = below[0]
score = 0.85
if score > best_score:
best_score = score
best = chosen
best_anchor = anchor
if not best:
return None
return ResolvedTarget(
element=best,
confidence=min(best_score, 1.0),
strategy_used=ResolutionStrategy.COMPOSITE.value,
resolution_details={
"criteria_used": {"field_for": label_texts},
"anchor_id": best_anchor.element_id if best_anchor else None,
"healing_attempt": int(getattr(self, "healing_attempt", 0) or 0),
}
)
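# Illustrative sketch (not part of the resolver): the horizontal-overlap test
# used by the "below the label, same column" fallback, on plain (x, y, w, h)
# tuples. The sample boxes are invented for the example.

```python
from typing import Tuple

BBox = Tuple[float, float, float, float]  # (x, y, w, h)


def x_overlap_ratio(b1: BBox, b2: BBox) -> float:
    """Horizontal overlap divided by the narrower box's width."""
    left = max(b1[0], b2[0])
    right = min(b1[0] + b1[2], b2[0] + b2[2])
    overlap = max(0.0, right - left)
    # Clamp the denominator to 1.0 so zero-width boxes cannot divide by zero
    return overlap / max(1.0, min(b1[2], b2[2]))


label = (10, 10, 100, 20)
aligned_field = (10, 40, 200, 24)   # directly under the label
shifted_field = (150, 40, 80, 24)   # in another column

print(x_overlap_ratio(aligned_field, label))
print(x_overlap_ratio(shifted_field, label))
```

# Dividing by the narrower width means a short label sitting fully above a
# wide input still scores 1.0; the resolver keeps candidates above 0.25.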
def _resolve_composite(
self,
target_spec: TargetSpec,
ui_elements: List[UIElement],
context: ResolutionContext
) -> Optional[ResolvedTarget]:
"""
Resolve using several combined criteria.
Fiche #3: context_hints are now part of composite resolution.
Author: Dom, Alice Kiro - 15 December 2024
"""
candidates = list(ui_elements)
scores: Dict[str, float] = {e.element_id: 1.0 for e in candidates}
try:
# Filter and score by role
if target_spec.by_role:
role_lower = target_spec.by_role.lower()
new_candidates = []
for elem in candidates:
elem_role = (getattr(elem, 'role', '') or "").lower()
elem_type = (getattr(elem, 'type', '') or "").lower()
if elem_role == role_lower or elem_type == role_lower:
scores[elem.element_id] *= 1.0
new_candidates.append(elem)
elif role_lower in elem_role or role_lower in elem_type:
scores[elem.element_id] *= 0.8
new_candidates.append(elem)
if new_candidates:
candidates = new_candidates
logger.debug(f"Role filter: {len(new_candidates)} candidates for role '{target_spec.by_role}'")
# Filter and score by text
if target_spec.by_text:
text_lower = target_spec.by_text.lower()
new_candidates = []
for elem in candidates:
label = (getattr(elem, 'label', '') or "").lower()
if label == text_lower:
scores[elem.element_id] *= 1.0
new_candidates.append(elem)
elif text_lower in label:
scores[elem.element_id] *= 0.9
new_candidates.append(elem)
elif label and self._fuzzy_match(text_lower, label) >= 0.7:
scores[elem.element_id] *= 0.7
new_candidates.append(elem)
if new_candidates:
candidates = new_candidates
logger.debug(f"Text filter: {len(new_candidates)} candidates for text '{target_spec.by_text}'")
# Score by position when one is provided
if target_spec.by_position:
x, y = target_spec.by_position
for elem in candidates:
# BBOX format: (x, y, w, h) - Fiche #2 fix
center_x, center_y = _bbox_center(elem.bbox)
distance = math.sqrt((x - center_x) ** 2 + (y - center_y) ** 2)
# Proximity bonus
if distance < 50:
scores[elem.element_id] *= 1.2
elif distance < 100:
scores[elem.element_id] *= 1.1
logger.debug(f"Position scoring applied for position ({x}, {y})")
# Fiche #3: apply context_hints BEFORE the final selection
if hasattr(target_spec, 'context_hints') and target_spec.context_hints:
candidates = self._apply_context_hints_to_candidates(
candidates, target_spec.context_hints, ui_elements, scores
)
logger.debug(f"Context hints applied: {len(candidates)} candidates remaining")
if not candidates:
logger.debug("No candidates remaining after all filters")
return None
# Fiche #11: multi-anchor + combined constraints
# Apply hard constraints BEFORE scoring
candidates = self._apply_hard_constraints(candidates, target_spec, ui_elements)
if not candidates:
logger.debug("No candidates remaining after hard constraints")
return None
# Fiche #11: multi-anchor system - evaluate every anchor-candidate combination
hints = getattr(target_spec, "context_hints", None) or {}
hp = self._healing_profile()
# Measure resolution time (covers the multi-anchor evaluation only, not the filters above)
resolution_start_time = time.perf_counter()
# Task 7.1: evaluate each anchor-candidate combination
best_combination = self._evaluate_all_anchor_candidate_combinations(
target_spec, candidates, ui_elements, scores, hints, hp
)
# Use the best combination found
selected = best_combination["element"]
anchor = best_combination["anchor"]
final_score = best_combination["score"]
best_details = best_combination["top3"]
tie_break_criterion = best_combination["tie_break_criterion"]
# Make sure an element was actually found
if not selected:
logger.debug("No element selected after multi-anchor evaluation")
return None
# Fiche #11 - Task 7.2: compute the full performance metrics
total_duration_ms = (time.perf_counter() - resolution_start_time) * 1000
# Pull metrics from the computation cache if available
cache_stats = self._computation_cache.get_stats() if hasattr(self, '_computation_cache') else {}
return ResolvedTarget(
element=selected,
confidence=min(final_score, 1.0),
strategy_used=ResolutionStrategy.COMPOSITE.value,
resolution_details={
"criteria_used": {
"role": target_spec.by_role,
"text": target_spec.by_text,
"position": target_spec.by_position,
"hints": list((getattr(target_spec, "context_hints", None) or {}).keys())
},
"candidates_evaluated": len(candidates),
"top3": best_details or [],
# Fiche #11 - Task 7.2: full multi-anchor metadata (Requirements 7.1, 7.2, 7.3, 7.5)
"anchor_id": best_combination["anchor_id"],
"anchors_attempted": best_combination["performance_metrics"]["anchor_texts"],
"successful_anchor": best_combination["anchor_id"],
"hard_constraints_applied": getattr(target_spec, "hard_constraints", None) or {},
"candidates_filtered": len(ui_elements) - len(candidates),
"weights_used": getattr(target_spec, "weights", None) or {},
"tie_break_criteria": best_combination["tie_break_criterion"],
"container_resolved": None, # TODO: extract from container_bbox if available
"performance_metrics": {
"total_resolution_duration_ms": round(total_duration_ms, 2),
"multi_anchor_evaluation": best_combination["performance_metrics"],
"scoring_duration_ms": best_combination["performance_metrics"]["evaluation_duration_ms"],
"container_resolution_duration_ms": 0.0, # TODO: measure separately
"cache_hits": cache_stats.get("hits", 0),
"cache_misses": cache_stats.get("misses", 0)
},
# Fiche #10: healing metadata (existing)
"healing_attempt": int(getattr(self, "healing_attempt", 0) or 0),
"healing_profile": hp,
"spatial_padding_used": hp["pad_mul"]
}
)
except Exception as e:
if self.error_handler:
context_data = {
'details': {'operation': 'composite_resolution'},
'original_data': {
'target_spec': target_spec,
'num_candidates': len(candidates) if 'candidates' in locals() else 0
}
}
recovery_result = self.error_handler.handle_error(e, context_data)
if not recovery_result.success:
logger.error(f"Error in composite resolution: {e}")
return None
else:
logger.error(f"Error in composite resolution: {e}")
return None
# =========================================================================
# Resolution by spatial relations (Requirements 5.3, 5.4)
# =========================================================================
def _resolve_by_spatial(
self,
target_spec: TargetSpec,
ui_elements: List[UIElement],
context: ResolutionContext
) -> Optional[ResolvedTarget]:
"""
Resolve by spatial relations with anchor elements.
Uses the SpatialAnalyzer to find elements based on
their spatial relations to known anchors.
Args:
target_spec: Target specification
ui_elements: Detected UI elements
context: Resolution context
Returns:
ResolvedTarget if found through a spatial relation
"""
if not SPATIAL_ANALYZER_AVAILABLE:
logger.debug("SpatialAnalyzer not available for spatial resolution")
return None
# Check whether spatial context hints are present
if not hasattr(target_spec, 'context_hints') or not target_spec.context_hints:
return None
hints = target_spec.context_hints
# Lazy-load the spatial analyzer
if self._spatial_analyzer is None:
self._spatial_analyzer = SpatialAnalyzer()
# Compute the spatial relations (cached per screen)
screen_id = getattr(context.screen_state, 'screen_state_id', 'default')
if screen_id not in self._spatial_relations_cache:
self._spatial_relations_cache[screen_id] = self._spatial_analyzer.compute_relations(ui_elements)
relations = self._spatial_relations_cache[screen_id]
candidates = list(ui_elements)
anchor_used = None
# Filter by the "below" relation
if 'below_text' in hints:
anchor = self._find_element_by_text(hints['below_text'], ui_elements)
if anchor:
anchor_id = self._get_element_id(anchor)
below_ids = self._spatial_analyzer.find_by_relation(
anchor_id, RelationType.BELOW, relations
)
candidates = [e for e in candidates if self._get_element_id(e) in below_ids]
anchor_used = anchor_id
# Filter by the "above" relation
if 'above_text' in hints:
anchor = self._find_element_by_text(hints['above_text'], ui_elements)
if anchor:
anchor_id = self._get_element_id(anchor)
above_ids = self._spatial_analyzer.find_by_relation(
anchor_id, RelationType.ABOVE, relations
)
candidates = [e for e in candidates if self._get_element_id(e) in above_ids]
anchor_used = anchor_id
# Filter by the "right_of" relation
if 'right_of_text' in hints:
anchor = self._find_element_by_text(hints['right_of_text'], ui_elements)
if anchor:
anchor_id = self._get_element_id(anchor)
right_ids = self._spatial_analyzer.find_by_relation(
anchor_id, RelationType.RIGHT_OF, relations
)
candidates = [e for e in candidates if self._get_element_id(e) in right_ids]
anchor_used = anchor_id
# Filter by the "left_of" relation
if 'left_of_text' in hints:
anchor = self._find_element_by_text(hints['left_of_text'], ui_elements)
if anchor:
anchor_id = self._get_element_id(anchor)
left_ids = self._spatial_analyzer.find_by_relation(
anchor_id, RelationType.LEFT_OF, relations
)
candidates = [e for e in candidates if self._get_element_id(e) in left_ids]
anchor_used = anchor_id
# Filter by role if one is specified
if target_spec.by_role and candidates:
role_lower = target_spec.by_role.lower()
candidates = [e for e in candidates if (getattr(e, 'role', '') or '').lower() == role_lower]
if candidates:
# Select the best candidate
best = self._apply_selection_policy(
candidates,
target_spec.selection_policy or "first",
context
)
if best:
# Detect the visual state
visual_state = self._detect_visual_state(best)
return ResolvedTarget(
element=best,
confidence=best.confidence * 0.85, # Slight penalty for the fallback path
strategy_used=ResolutionStrategy.BY_SPATIAL.value,
resolution_details={
"hints_applied": list(hints.keys()),
"anchor_used": anchor_used,
"candidates_found": len(candidates)
},
visual_state=visual_state,
spatial_anchor=anchor_used
)
return None
def _get_element_id(self, element: Any) -> str:
"""Extract the ID of an element."""
if hasattr(element, 'element_id'):
return element.element_id
if hasattr(element, 'id'):
return element.id
if isinstance(element, dict):
return element.get('id', element.get('element_id', str(id(element))))
return str(id(element))
def _apply_context_hints_to_candidates(
self,
candidates: List[UIElement],
context_hints: Dict[str, Any],
all_elements: List[UIElement],
scores: Dict[str, float]
) -> List[UIElement]:
"""
Apply context_hints to filter the candidates.
Fiche #3: new method that brings context_hints into composite resolution.
Author: Dom, Alice Kiro - 15 December 2024
Args:
candidates: Current candidate list
context_hints: Dictionary of contextual hints
all_elements: All available UI elements
scores: Per-element score dictionary (updated in place)
Returns:
Filtered candidate list
"""
filtered_candidates = list(candidates)
try:
# Filter by "below_text" - multi-anchor support
if 'below_text' in context_hints:
below_text_value = context_hints['below_text']
anchor_texts = self._as_text_list(below_text_value)
found_anchor = None
for text in anchor_texts:
anchor = self._find_element_by_text(text, all_elements)
if anchor:
found_anchor = anchor
break
if found_anchor:
anchor_bottom = _bbox_bottom(found_anchor.bbox)
filtered_candidates = [
e for e in filtered_candidates
if _bbox_to_tuple(e.bbox)[1] > anchor_bottom
]
# Score bonus for vertical proximity
for elem in filtered_candidates:
vertical_distance = abs(_bbox_to_tuple(elem.bbox)[1] - anchor_bottom)
if vertical_distance < 100:
scores[elem.element_id] *= 1.1
logger.debug(f"below_text '{below_text_value}': {len(filtered_candidates)} candidates")
# Filter by "above_text" - multi-anchor support
if 'above_text' in context_hints:
above_text_value = context_hints['above_text']
anchor_texts = self._as_text_list(above_text_value)
found_anchor = None
for text in anchor_texts:
anchor = self._find_element_by_text(text, all_elements)
if anchor:
found_anchor = anchor
break
if found_anchor:
anchor_top = _bbox_to_tuple(found_anchor.bbox)[1]
filtered_candidates = [
e for e in filtered_candidates
if _bbox_bottom(e.bbox) < anchor_top
]
# Score bonus for vertical proximity
for elem in filtered_candidates:
vertical_distance = abs(_bbox_bottom(elem.bbox) - anchor_top)
if vertical_distance < 100:
scores[elem.element_id] *= 1.1
logger.debug(f"above_text '{above_text_value}': {len(filtered_candidates)} candidates")
# Filter by "right_of_text" - multi-anchor support
if 'right_of_text' in context_hints:
right_of_text_value = context_hints['right_of_text']
anchor_texts = self._as_text_list(right_of_text_value)
found_anchor = None
for text in anchor_texts:
anchor = self._find_element_by_text(text, all_elements)
if anchor:
found_anchor = anchor
break
if found_anchor:
anchor_right = _bbox_right(found_anchor.bbox)
filtered_candidates = [
e for e in filtered_candidates
if _bbox_to_tuple(e.bbox)[0] > anchor_right
]
# Score bonus for horizontal proximity
for elem in filtered_candidates:
horizontal_distance = abs(_bbox_to_tuple(elem.bbox)[0] - anchor_right)
if horizontal_distance < 100:
scores[elem.element_id] *= 1.1
logger.debug(f"right_of_text '{right_of_text_value}': {len(filtered_candidates)} candidates")
# Filter by "left_of_text" - multi-anchor support
if 'left_of_text' in context_hints:
left_of_text_value = context_hints['left_of_text']
anchor_texts = self._as_text_list(left_of_text_value)
found_anchor = None
for text in anchor_texts:
anchor = self._find_element_by_text(text, all_elements)
if anchor:
found_anchor = anchor
break
if found_anchor:
anchor_left = _bbox_to_tuple(found_anchor.bbox)[0]
filtered_candidates = [
e for e in filtered_candidates
if _bbox_right(e.bbox) < anchor_left
]
# Score bonus for horizontal proximity
for elem in filtered_candidates:
horizontal_distance = abs(_bbox_right(elem.bbox) - anchor_left)
if horizontal_distance < 100:
scores[elem.element_id] *= 1.1
logger.debug(f"left_of_text '{left_of_text_value}': {len(filtered_candidates)} candidates")
# Filter by "near_text" - multi-anchor support
if 'near_text' in context_hints:
near_text_value = context_hints['near_text']
# Fiche #11: multi-anchor support - normalize to a list
anchor_texts = self._as_text_list(near_text_value)
# Use the first anchor text that matches an element
found_anchor = None
for text in anchor_texts:
anchor = self._find_element_by_text(text, all_elements)
if anchor:
found_anchor = anchor
break
if found_anchor:
max_distance = context_hints.get('max_distance', 200)
filtered_candidates = self._filter_by_proximity(
filtered_candidates, found_anchor, max_distance
)
# Score bonus that grows as the distance shrinks (up to +20%)
anchor_center = _bbox_center(found_anchor.bbox)
for elem in filtered_candidates:
elem_center = _bbox_center(elem.bbox)
distance = math.sqrt(
(elem_center[0] - anchor_center[0]) ** 2 +
(elem_center[1] - anchor_center[1]) ** 2
)
if distance < max_distance:
proximity_bonus = 1.0 + (1.0 - distance / max_distance) * 0.2
scores[elem.element_id] *= proximity_bonus
logger.debug(f"near_text '{near_text_value}': {len(filtered_candidates)} candidates")
# Enhancement: filter by "within_region" (fully inside a region)
if 'within_region' in context_hints:
region = context_hints['within_region']
if isinstance(region, (list, tuple)) and len(region) == 4:
x, y, w, h = region
filtered_candidates = [
e for e in filtered_candidates
if (_bbox_to_tuple(e.bbox)[0] >= x and _bbox_to_tuple(e.bbox)[1] >= y and
_bbox_to_tuple(e.bbox)[0] + _bbox_to_tuple(e.bbox)[2] <= x + w and
_bbox_to_tuple(e.bbox)[1] + _bbox_to_tuple(e.bbox)[3] <= y + h)
]
logger.debug(f"within_region {region}: {len(filtered_candidates)} candidates")
# Enhancement: exclude by "exclude_near_text" - multi-anchor support
if 'exclude_near_text' in context_hints:
exclude_near_text_value = context_hints['exclude_near_text']
anchor_texts = self._as_text_list(exclude_near_text_value)
found_exclude_anchor = None
for text in anchor_texts:
exclude_anchor = self._find_element_by_text(text, all_elements)
if exclude_anchor:
found_exclude_anchor = exclude_anchor
break
if found_exclude_anchor:
exclude_distance = context_hints.get('exclude_distance', 100)
exclude_candidates = self._filter_by_proximity(
filtered_candidates, found_exclude_anchor, exclude_distance
)
# Drop candidates that are too close to the exclusion anchor
filtered_candidates = [
e for e in filtered_candidates if e not in exclude_candidates
]
logger.debug(f"exclude_near_text '{exclude_near_text_value}': {len(filtered_candidates)} candidates")
return filtered_candidates
except Exception as e:
logger.error(f"Error applying context hints: {e}")
return candidates # On error, fall back to the original candidates
# =========================================================================
# Visual state detection (Requirement 5.5)
# =========================================================================
def _detect_visual_state(self, element: UIElement) -> VisualState:
"""
Detect the visual state of a UI element.
Inspects visual properties to determine whether the element is:
- enabled/disabled (color, opacity, border)
- focused/hovered
- checked/unchecked
- loading/error
Args:
element: UI element to analyze
Returns:
The detected VisualState
"""
# Check direct attributes
if hasattr(element, 'state'):
state_str = str(element.state).lower()
if 'disabled' in state_str:
return VisualState.DISABLED
if 'focused' in state_str:
return VisualState.FOCUSED
if 'checked' in state_str:
return VisualState.CHECKED
if 'loading' in state_str:
return VisualState.LOADING
if 'error' in state_str:
return VisualState.ERROR
# Check style attributes
if hasattr(element, 'style') or hasattr(element, 'attributes'):
# Fall back to an empty dict so a None attribute cannot break .get() below
attrs = getattr(element, 'attributes', None) or getattr(element, 'style', None) or {}
# Check opacity (disabled controls are often rendered with reduced opacity)
opacity = attrs.get('opacity', 1.0)
if isinstance(opacity, (int, float)) and opacity < 0.5:
return VisualState.DISABLED
# Check background color (gray often means disabled)
bg_color = attrs.get('background_color', attrs.get('backgroundColor', ''))
if self._is_disabled_color(bg_color):
return VisualState.DISABLED
# Check border color (red often means error)
border_color = attrs.get('border_color', attrs.get('borderColor', ''))
if self._is_error_color(border_color):
return VisualState.ERROR
# Check the label text for hints
if hasattr(element, 'label') and element.label:
label_lower = element.label.lower()
if 'loading' in label_lower or 'chargement' in label_lower:
return VisualState.LOADING
if 'error' in label_lower or 'erreur' in label_lower:
return VisualState.ERROR
# Check confidence (low confidence may indicate an unusual state)
if hasattr(element, 'confidence') and element.confidence < 0.5:
return VisualState.UNKNOWN
return VisualState.ENABLED
def _is_disabled_color(self, color: str) -> bool:
"""Check whether a color suggests a disabled state."""
if not color:
return False
color_lower = color.lower()
# Typical gray shades used for disabled controls
disabled_patterns = ['gray', 'grey', '#808080', '#a0a0a0', '#c0c0c0', 'rgb(128', 'rgb(160', 'rgb(192']
return any(pattern in color_lower for pattern in disabled_patterns)
def _is_error_color(self, color: str) -> bool:
"""Check whether a color suggests an error state."""
if not color:
return False
color_lower = color.lower()
# Typical red shades used for errors
error_patterns = ['red', '#ff0000', '#f00', 'rgb(255,0,0', 'rgb(255, 0, 0', '#dc3545', '#d9534f']
return any(pattern in color_lower for pattern in error_patterns)
def resolve_with_spatial_fallback(
self,
target_spec: TargetSpec,
screen_state: ScreenState,
anchor_elements: Optional[List[UIElement]] = None,
context: Optional[ResolutionContext] = None
) -> Optional[ResolvedTarget]:
"""
Resolve a target with automatic spatial fallback.
Tries the standard strategies first, then falls back to spatial
relations with the anchor elements if direct matching fails.
Args:
target_spec: Target specification
screen_state: Current screen state
anchor_elements: Anchor elements for the spatial fallback
context: Additional context
Returns:
ResolvedTarget or None
"""
# Try standard resolution first
result = self.resolve_target(target_spec, screen_state, context)
if result:
# Attach the detected visual state
result.visual_state = self._detect_visual_state(result.element)
return result
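# Spatial fallback, if enabled and anchors are available.
# getattr guards against the attribute never being set on self; defaulting to True is an assumption.
if not getattr(self, "use_spatial_fallback", True) or not anchor_elements: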
return None
logger.debug(f"Attempting spatial fallback with {len(anchor_elements)} anchors")
# Build a context carrying the anchors
if context is None:
context = ResolutionContext(screen_state=screen_state)
context.anchor_elements = anchor_elements
ui_elements = self._get_ui_elements(screen_state)
# Try spatial resolution
result = self._resolve_by_spatial(target_spec, ui_elements, context)
if result:
result.fallback_applied = True
self._stats["spatial_fallbacks"] += 1
logger.info(f"Spatial fallback successful for target: {target_spec.by_role or target_spec.by_text}")
return result
# =========================================================================
# Utilities
# =========================================================================
def _get_ui_elements(self, screen_state: ScreenState) -> List[UIElement]:
"""Extract the UI elements from the screen state."""
# Try the possible sources in order
if hasattr(screen_state, 'ui_elements') and screen_state.ui_elements:
return screen_state.ui_elements
if hasattr(screen_state, 'perception') and screen_state.perception:
if hasattr(screen_state.perception, 'ui_elements'):
return screen_state.perception.ui_elements or []
return []
def _apply_selection_policy(
self,
candidates: List[UIElement],
policy: str,
context: ResolutionContext
) -> Optional[UIElement]:
"""Apply a selection policy."""
if not candidates:
return None
if policy == "first":
# First in reading order (top-left)
return min(candidates, key=lambda e: (_bbox_to_tuple(e.bbox)[1], _bbox_to_tuple(e.bbox)[0]))
elif policy == "last":
# Last in reading order
return max(candidates, key=lambda e: (_bbox_to_tuple(e.bbox)[1], _bbox_to_tuple(e.bbox)[0]))
elif policy == "by_similarity" or policy == "highest_confidence":
return max(candidates, key=lambda e: e.confidence)
elif policy == "largest":
# Largest element
return max(candidates, key=lambda e: _bbox_area(e.bbox))
elif policy == "nearest_to_previous":
# Closest to the previous target
# BBOX format: (x, y, w, h) - Fiche #2 fix
if context.previous_target:
prev_bbox = _bbox_to_tuple(context.previous_target.bbox)
prev_center = (
prev_bbox[0] + prev_bbox[2] / 2, # x + w/2
prev_bbox[1] + prev_bbox[3] / 2 # y + h/2
)
return min(candidates, key=lambda e: (
((_bbox_to_tuple(e.bbox)[0] + _bbox_to_tuple(e.bbox)[2]/2) - prev_center[0])**2 +
((_bbox_to_tuple(e.bbox)[1] + _bbox_to_tuple(e.bbox)[3]/2) - prev_center[1])**2
))
# Default (unknown policy, or no previous target): first candidate in list order
return candidates[0]
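# Illustrative sketch (not part of the resolver): the geometric selection
# policies above applied to plain (x, y, w, h) tuples. `pick_box` is a
# hypothetical name; the real method also handles confidence-based and
# "nearest_to_previous" policies on UIElement objects.

```python
from typing import List, Optional, Tuple

BBox = Tuple[float, float, float, float]  # (x, y, w, h)


def pick_box(boxes: List[BBox], policy: str = "first") -> Optional[BBox]:
    """Select one box according to a simple geometric policy."""
    if not boxes:
        return None

    def reading_order(b: BBox) -> Tuple[float, float]:
        # Sort key: top-to-bottom first, then left-to-right
        return (b[1], b[0])

    if policy == "first":
        return min(boxes, key=reading_order)
    if policy == "last":
        return max(boxes, key=reading_order)
    if policy == "largest":
        return max(boxes, key=lambda b: b[2] * b[3])
    # Unknown policy: fall back to list order, as the resolver does
    return boxes[0]
```

# Note that "first" means first in reading order, not first in the list:
# two boxes on the same y are ordered by x, regardless of detection order.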
def _fuzzy_match(self, s1: str, s2: str) -> float:
"""Compute fuzzy similarity (normalized Levenshtein)."""
if not s1 or not s2:
return 0.0
if s1 == s2:
return 1.0
# Levenshtein distance
len1, len2 = len(s1), len(s2)
if len1 < len2:
s1, s2 = s2, s1
len1, len2 = len2, len1
distances = range(len2 + 1)
for i, c1 in enumerate(s1):
new_distances = [i + 1]
for j, c2 in enumerate(s2):
if c1 == c2:
new_distances.append(distances[j])
else:
new_distances.append(1 + min(distances[j], distances[j+1], new_distances[-1]))
distances = new_distances
max_len = max(len1, len2)
return 1.0 - (distances[-1] / max_len)
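# Illustrative sketch (not part of the resolver): a standalone version of the
# normalized Levenshtein similarity, written with an explicit substitution
# cost so the three edit operations are easier to see. `levenshtein_ratio`
# is a hypothetical name used only for this example.

```python
def levenshtein_ratio(s1: str, s2: str) -> float:
    """Return 1 - (edit distance / longer length), in the range [0.0, 1.0]."""
    if not s1 or not s2:
        return 0.0
    if s1 == s2:
        return 1.0
    # Keep s1 as the longer string so each DP row has len(s2) + 1 entries
    if len(s1) < len(s2):
        s1, s2 = s2, s1
    prev = list(range(len(s2) + 1))
    for i, c1 in enumerate(s1):
        cur = [i + 1]
        for j, c2 in enumerate(s2):
            cost = 0 if c1 == c2 else 1
            cur.append(min(
                prev[j] + cost,   # substitution (or match)
                prev[j + 1] + 1,  # deletion from s1
                cur[-1] + 1,      # insertion into s1
            ))
        prev = cur
    return 1.0 - prev[-1] / len(s1)


print(round(levenshtein_ratio("kitten", "sitting"), 4))  # 0.5714 (distance 3 over length 7)
```

# A plain transposition such as "submit" vs "sumbit" costs two edits here
# (ratio 0.6667), which matters when choosing a healing-profile threshold.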
def _compute_similarity(self, emb1: np.ndarray, emb2: np.ndarray) -> float:
"""Compute the cosine similarity between two embeddings."""
if emb1 is None or emb2 is None:
return 0.0
norm1 = np.linalg.norm(emb1)
norm2 = np.linalg.norm(emb2)
if norm1 == 0 or norm2 == 0:
return 0.0
return float(np.dot(emb1, emb2) / (norm1 * norm2))
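# Illustrative sketch, not part of TargetResolver: a dependency-free equivalent of
# the cosine similarity above, written with plain Python sequences instead of NumPy
# arrays. It follows the same convention of returning 0.0 for a zero vector.
# The name `cosine_similarity` is hypothetical.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Return dot(a, b) / (|a| * |b|), or 0.0 if either vector has zero norm."""
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (norm_a * norm_b)
```

# Parallel vectors score 1.0, orthogonal vectors 0.0, and opposite vectors -1.0.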
    def _filter_by_proximity(
        self,
        elements: List[UIElement],
        anchor: UIElement,
        max_distance: float
    ) -> List[UIElement]:
        """
        Filter elements by proximity to an anchor, using the computation cache.

        Task 5.4: Uses the computation cache to avoid recomputing distances.
        """
        # BBOX format: (x, y, w, h) - fix from Fiche #2
        # Author: Dom, Alice Kiro - 15 December 2024
        anchor_center = cached_bbox_center(_bbox_to_tuple(anchor.bbox))

        filtered = []
        for elem in elements:
            # Look up the distance in the computation cache; compute it only on a miss
            distance = self._computation_cache.get_distance(
                elem.element_id,
                anchor.element_id,
                lambda: cached_euclidean_distance(
                    cached_bbox_center(_bbox_to_tuple(elem.bbox)),
                    anchor_center
                )
            )
            if distance <= max_distance:
                filtered.append(elem)

        return filtered
    def _is_composite_spec(self, target_spec: TargetSpec) -> bool:
        """
        Check whether the spec combines at least two criteria.

        Fiche #3: context_hints now counts as a composite criterion.

        Author: Dom, Alice Kiro - 15 December 2024
        """
        criteria_count = sum([
            target_spec.by_role is not None,
            target_spec.by_text is not None,
            target_spec.by_position is not None,
            # Counts only when context_hints exists and is non-empty
            bool(getattr(target_spec, 'context_hints', None))
        ])
        return criteria_count >= 2
    # =========================================================================
    # Cache
    # =========================================================================
    def _make_cache_key(self, target_spec: TargetSpec, screen_state: ScreenState) -> str:
        """
        Build a cache key.

        Fiche #3: context_hints is now part of the cache key.

        Author: Dom, Alice Kiro - 15 December 2024
        """
        # Serialize context_hints for the cache key
        context_hints_str = ""
        if hasattr(target_spec, 'context_hints') and target_spec.context_hints:
            try:
                # sort_keys keeps the key stable regardless of dict ordering
                context_hints_str = json.dumps(target_spec.context_hints, sort_keys=True)
            except Exception:
                # Hints that are not JSON-serializable fall back to str()
                context_hints_str = str(target_spec.context_hints)

        spec_key = f"{target_spec.by_role}|{target_spec.by_text}|{target_spec.by_position}|{context_hints_str}"
        state_key = screen_state.screen_state_id if hasattr(screen_state, 'screen_state_id') else "unknown"
        return f"{spec_key}:{state_key}"
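    def _add_to_cache(self, key: str, result: ResolvedTarget) -> None:
        """Add an entry to the LRU cache, or refresh it if the key already exists."""
        # Always store the latest result; previously an existing key kept its stale value
        self._cache[key] = result
        self._cache.move_to_end(key)
        if len(self._cache) > self._cache_size:
            # Evict the least recently used entry
            self._cache.popitem(last=False)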
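# Illustrative sketch, not part of TargetResolver: the LRU pattern used by
# _add_to_cache, isolated in a small class built on collections.OrderedDict.
# The class name `LRUCache` and its methods are hypothetical; the resolver's own
# cache attributes are assumed to be OrderedDict instances.

```python
from collections import OrderedDict
from typing import Any, Hashable, Optional


class LRUCache:
    """Bounded mapping that evicts the least recently used entry."""

    def __init__(self, max_size: int) -> None:
        self._data: "OrderedDict[Hashable, Any]" = OrderedDict()
        self._max_size = max_size

    def put(self, key: Hashable, value: Any) -> None:
        self._data[key] = value          # insert or overwrite
        self._data.move_to_end(key)      # mark as most recently used
        if len(self._data) > self._max_size:
            self._data.popitem(last=False)  # drop the oldest entry

    def get(self, key: Hashable) -> Optional[Any]:
        if key not in self._data:
            return None
        self._data.move_to_end(key)      # a read also counts as a use
        return self._data[key]


cache = LRUCache(max_size=2)
cache.put("a", 1)
cache.put("b", 2)
cache.get("a")       # "a" becomes the most recently used entry
cache.put("c", 3)    # evicts "b", the least recently used entry
```

# Writing the value on every put() keeps an existing key from holding a stale result.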
    def _record_in_cross_frame_memory(self, target_spec, screen_state, ui_elements, selected_element):
        """Record the selected element in cross-frame memory on every success."""
        sig = screen_signature(screen_state, ui_elements, mode="layout")
        ckey = self._make_cross_key(target_spec, sig)
        self._cross_frame_cache[ckey] = TargetFingerprint.from_element(selected_element)
        self._cross_frame_cache.move_to_end(ckey)
        if len(self._cross_frame_cache) > self._cross_frame_cache_size:
            # Evict the least recently used fingerprint
            self._cross_frame_cache.popitem(last=False)
    def clear_cache(self) -> None:
        """
        Clear the resolution cache and the computation cache.

        Task 5.4: Also clears the computation cache.

        This method does not clear the spatial index, spatial relations, or
        cross-frame caches.
        """
        self._cache.clear()
        self._computation_cache.clear()
        logger.debug("All caches cleared (resolution + computation)")
    def _get_spatial_index(self, screen_state: ScreenState, ui_elements: List[UIElement]) -> SpatialIndexGrid:
        """
        Get or build the spatial index for a given screen_state, keyed by screen signature.

        Task 5.1: Caches the spatial index per screen signature.
        Avoids rebuilding it on every resolution for the same UI layout.

        Author: Dom, Alice Kiro - 20 December 2024
        """
        # Use the screen signature rather than screen_state_id for robustness
        screen_sig = screen_signature(screen_state, ui_elements, mode="layout")

        if screen_sig in self._index_cache:
            self._index_cache.move_to_end(screen_sig)
            logger.debug(f"Spatial index cache hit for signature: {screen_sig[:16]}...")
            return self._index_cache[screen_sig]

        # Build a new spatial index
        try:
            sw = int(screen_state.window.screen_resolution[0])
        except Exception:
            # Resolution unavailable: fall back to a width of 1920
            sw = 1920

        # "UI-friendly" cell_size: about 1/12 of the screen width, clamped to [120, 220]
        cell = max(120, min(220, sw // 12))

        logger.debug(f"Building spatial index for {len(ui_elements)} elements with cell_size={cell}")
        start_time = time.perf_counter()
        idx = SpatialIndexGrid(cell_size=cell).build(ui_elements)
        build_time = (time.perf_counter() - start_time) * 1000
        logger.debug(f"Spatial index built in {build_time:.2f}ms")

        # Add to the cache with LRU eviction
        self._index_cache[screen_sig] = idx
        self._index_cache.move_to_end(screen_sig)
        if len(self._index_cache) > self._index_cache_size:
            evicted_key, _ = self._index_cache.popitem(last=False)
            logger.debug(f"Evicted spatial index cache entry: {evicted_key[:16]}...")

        return idx
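# Illustrative sketch, not part of TargetResolver: the cell-size rule used in
# _get_spatial_index, extracted as a pure function so the clamping is easy to
# check. The function name and its parameter names are hypothetical; the
# constants 12, 120 and 220 are the ones used above.

```python
def grid_cell_size(screen_width: int, divisor: int = 12, lo: int = 120, hi: int = 220) -> int:
    """Return about screen_width / divisor, clamped to the range [lo, hi]."""
    return max(lo, min(hi, screen_width // divisor))


sizes = {width: grid_cell_size(width) for width in (1024, 1920, 3840)}
```

# A 1920-wide screen gives 160. Narrow screens are raised to the 120 floor and
# very wide screens are capped at 220.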
    def clear_index_cache(self) -> None:
        """Clear the spatial index cache."""
        self._index_cache.clear()
    # =========================================================================
    # Stats
    # =========================================================================
    def get_stats(self) -> Dict[str, Any]:
        """
        Return resolution statistics, including cache statistics.

        Task 5.4: Includes the computation cache stats.
        """
        base_stats = dict(self._stats)

        # Add the computation cache stats
        computation_stats = self._computation_cache.get_stats()
        base_stats['computation_cache'] = computation_stats

        # Add the spatial index cache stats
        base_stats['spatial_index_cache'] = {
            'size': len(self._index_cache),
            'max_size': self._index_cache_size
        }

        # Add the cross-frame memory stats
        base_stats['cross_frame_cache'] = {
            'size': len(self._cross_frame_cache),
            'max_size': self._cross_frame_cache_size
        }

        return base_stats
    def reset_stats(self) -> None:
        """Reset the statistics."""
        self._stats = {
            "total_resolutions": 0,
            "successful": 0,
            "failed": 0,
            "cache_hits": 0,
            "fallbacks_used": 0,
            "spatial_fallbacks": 0,
            "by_strategy": {}
        }
    def clear_spatial_cache(self) -> None:
        """Clear the spatial relations cache."""
        self._spatial_relations_cache.clear()
    def get_spatial_analyzer(self) -> Optional[SpatialAnalyzer]:
        """Return the spatial analyzer (lazily loaded), or None if unavailable."""
        if self._spatial_analyzer is None and SPATIAL_ANALYZER_AVAILABLE:
            self._spatial_analyzer = SpatialAnalyzer()
        return self._spatial_analyzer
    # =========================================================================
    # Fiche #18 - Persistent "mix" learning (JSONL + SQLite)
    # =========================================================================
    def _lookup_from_persistent_memory(
        self,
        target_spec: TargetSpec,
        screen_state: ScreenState,
        ui_elements: List[UIElement]
    ) -> Optional[ResolvedTarget]:
        """
        Look up a resolution in persistent memory.

        Fiche #18: Lookup performed before the expensive resolution, to reuse
        what was learned previously.

        Args:
            target_spec: Target specification
            screen_state: Current screen state
            ui_elements: Available UI elements

        Returns:
            ResolvedTarget if found in memory, None otherwise
        """
        if not self.enable_persistent_learning or not self._persistent_memory:
            return None

        try:
            # Compute the screen signature
            from .screen_signature import screen_signature
            screen_sig = screen_signature(screen_state, ui_elements, mode="layout")

            # Look up in persistent memory
            fingerprint = self._persistent_memory.lookup(screen_sig, target_spec)
            if fingerprint is None:
                return None

            # First try to find the element by element_id
            element_by_id = {e.element_id: e for e in ui_elements}
            if fingerprint.element_id in element_by_id:
                element = element_by_id[fingerprint.element_id]
                return ResolvedTarget(
                    element=element,
                    confidence=min(0.95, fingerprint.confidence),
                    strategy_used="PERSISTENT_MEMORY_ID",
                    fallback_applied=False,
                    resolution_details={
                        "source": "persistent_memory",
                        "lookup_method": "element_id",
                        "screen_signature": screen_sig[:16] + "..."
                    }
                )

            # Fallback: match by position and characteristics
            best_match = None
            best_score = 0.0
            fp_center = (
                fingerprint.bbox[0] + fingerprint.bbox[2] / 2,
                fingerprint.bbox[1] + fingerprint.bbox[3] / 2
            )

            for elem in ui_elements:
                score = 0.0

                # Score by role/type
                elem_role = (getattr(elem, 'role', '') or '').lower()
                elem_type = (getattr(elem, 'type', '') or '').lower()
                if fingerprint.role and elem_role == fingerprint.role.lower():
                    score += 2.0
                if fingerprint.etype and elem_type == fingerprint.etype.lower():
                    score += 1.5

                # Score by spatial proximity
                elem_center = _bbox_center(elem.bbox)
                distance = ((elem_center[0] - fp_center[0]) ** 2 +
                            (elem_center[1] - fp_center[1]) ** 2) ** 0.5
                # 100px tolerance for layout changes
                if distance <= 100:
                    score += (100 - distance) / 100.0

                # Score by label, if available
                if fingerprint.label and hasattr(elem, 'label'):
                    elem_label = (getattr(elem, 'label', '') or '').strip().lower()
                    fp_label = fingerprint.label.strip().lower()
                    if fp_label and elem_label:
                        if fp_label == elem_label:
                            score += 1.0
                        elif fp_label in elem_label or elem_label in fp_label:
                            score += 0.7
                        else:
                            # Fuzzy matching
                            ratio = _fuzzy_ratio(fp_label, elem_label)
                            if ratio >= 0.8:
                                score += ratio * 0.6

                if score > best_score and score >= 2.0:  # Minimum threshold
                    best_score = score
                    best_match = elem

            if best_match:
                return ResolvedTarget(
                    element=best_match,
                    # Scaled by 5.0 and capped at 0.90 (the raw score can reach 5.5)
                    confidence=min(0.90, best_score / 5.0),
                    strategy_used="PERSISTENT_MEMORY_SPATIAL",
                    fallback_applied=False,
                    resolution_details={
                        "source": "persistent_memory",
                        "lookup_method": "spatial_matching",
                        "match_score": round(best_score, 3),
                        "screen_signature": screen_sig[:16] + "..."
                    }
                )

            return None
        except Exception as e:
            logger.debug(f"Persistent memory lookup failed: {e}")
            return None
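# Illustrative sketch, not part of TargetResolver: the weights of the spatial
# fallback in _lookup_from_persistent_memory, written as two pure functions so
# the arithmetic can be checked in isolation. The function names are
# hypothetical, and `label_bonus` stands in for the label branch (1.0 exact,
# 0.7 containment, ratio * 0.6 for a fuzzy ratio of at least 0.8).

```python
from typing import Optional


def spatial_match_score(role_match: bool, type_match: bool,
                        distance_px: float, label_bonus: float = 0.0) -> float:
    """Sum the role, type, proximity and label contributions."""
    score = 0.0
    if role_match:
        score += 2.0
    if type_match:
        score += 1.5
    if distance_px <= 100:
        # Linear falloff: 1.0 at the same center, 0.0 at 100 px
        score += (100 - distance_px) / 100.0
    return score + label_bonus


def score_to_confidence(score: float) -> Optional[float]:
    """Return None below the 2.0 threshold, else score / 5.0 capped at 0.90."""
    if score < 2.0:
        return None
    return min(0.90, score / 5.0)


best_case = spatial_match_score(True, True, 0.0, label_bonus=1.0)
role_only_nearby = spatial_match_score(True, False, 50.0)
```

# With every criterion matched the raw score is 5.5, so dividing by 5.0 would
# exceed 1.0 without the 0.90 cap. A role match at 50 px scores 2.5, which
# gives a confidence of 0.5.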
    def record_resolution_success(
        self,
        target_spec: TargetSpec,
        screen_state: ScreenState,
        ui_elements: List[UIElement],
        resolved_element: UIElement,
        strategy_used: str,
        confidence: float
    ) -> None:
        """
        Record a successful resolution in persistent memory.

        Fiche #18: Hook called after the post-conditions are validated,
        to learn from successful resolutions.

        Args:
            target_spec: Target specification
            screen_state: Screen state
            ui_elements: Available UI elements
            resolved_element: Successfully resolved element
            strategy_used: Resolution strategy used
            confidence: Confidence of the resolution
        """
        if not self.enable_persistent_learning or not self._persistent_memory:
            return

        try:
            # Compute the screen signature
            from .screen_signature import screen_signature
            screen_sig = screen_signature(screen_state, ui_elements, mode="layout")

            # Build the fingerprint of the resolved element
            fingerprint = PersistentFingerprint(
                element_id=resolved_element.element_id,
                bbox=tuple(_bbox_to_tuple(resolved_element.bbox)),
                role=getattr(resolved_element, 'role', None),
                etype=getattr(resolved_element, 'type', None),
                label=getattr(resolved_element, 'label', None),
                confidence=confidence
            )

            # Record the success
            self._persistent_memory.record_success(
                screen_signature=screen_sig,
                target_spec=target_spec,
                fingerprint=fingerprint,
                strategy_used=strategy_used,
                confidence=confidence
            )

            logger.debug(
                f"Recorded successful resolution in persistent memory: "
                f"element={resolved_element.element_id} strategy={strategy_used} "
                f"confidence={confidence:.3f}"
            )
        except Exception as e:
            logger.warning(f"Failed to record resolution success: {e}")
    def record_resolution_failure(
        self,
        target_spec: TargetSpec,
        screen_state: ScreenState,
        ui_elements: List[UIElement],
        error_message: str
    ) -> None:
        """
        Record a resolution failure in persistent memory.

        Fiche #18: Hook called after a failed resolution, to track
        problematic patterns.

        Args:
            target_spec: Target specification
            screen_state: Screen state
            ui_elements: Available UI elements
            error_message: Error message
        """
        if not self.enable_persistent_learning or not self._persistent_memory:
            return

        try:
            # Compute the screen signature
            from .screen_signature import screen_signature
            screen_sig = screen_signature(screen_state, ui_elements, mode="layout")

            # Record the failure
            self._persistent_memory.record_failure(
                screen_signature=screen_sig,
                target_spec=target_spec,
                error_message=error_message
            )

            logger.debug(
                f"Recorded resolution failure in persistent memory: "
                f"error='{error_message}'"
            )
        except Exception as e:
            logger.warning(f"Failed to record resolution failure: {e}")
    def get_persistent_memory_stats(self) -> Optional[Dict[str, Any]]:
        """
        Return the persistent memory statistics.

        Returns:
            Dictionary of statistics, or None if unavailable
        """
        if not self.enable_persistent_learning or not self._persistent_memory:
            return None

        try:
            return self._persistent_memory.get_stats()
        except Exception as e:
            logger.warning(f"Failed to get persistent memory stats: {e}")
            return None
    def cleanup_persistent_memory(self, days_to_keep: int = 90) -> Optional[int]:
        """
        Remove old entries from persistent memory.

        Args:
            days_to_keep: Number of days of entries to keep

        Returns:
            Number of entries removed, or None if unavailable
        """
        if not self.enable_persistent_learning or not self._persistent_memory:
            return None

        try:
            return self._persistent_memory.cleanup_old_entries(days_to_keep)
        except Exception as e:
            logger.warning(f"Failed to cleanup persistent memory: {e}")
            return None