"""
EdgeScorer — robust selection of one edge among several candidates.

Instead of taking "the first outgoing edge" (legacy behaviour), this module:

  1. Applies a **hard filter**: rejects edges whose `pre_conditions`
     (EdgeConstraints) fail for the current ScreenState.
  2. Applies a **light ranking** based on a composite score:
       - `stats.success_rate` (heavily weighted)
       - `target_spec` match (a compatible UI element is present)
       - recency of the last execution
  3. Returns the best edge, or `None` when no edge passes the filter.

Main API:
    >>> scorer = EdgeScorer()
    >>> edge = scorer.select_best(edges, screen_state=state)

Individual scores are exposed through `score_edge()` for tests and telemetry.
"""

logger = logging.getLogger(__name__)


# =============================================================================
# Scoring result (useful for telemetry / debugging)
# =============================================================================


@dataclass
class EdgeScore:
    """Detailed outcome of scoring a single edge."""

    edge: WorkflowEdge  # the edge that was scored
    total: float  # weighted sum of the three components below
    success_rate: float  # success-rate component
    target_match: float  # target / UI-element match component
    recency: float  # recency component
    passed_preconditions: bool  # False when the hard filter rejected the edge
    precondition_reason: str = "OK"  # human-readable outcome of the filter

    def __lt__(self, other: object) -> bool:
        """Order scores by `total` (ascending), so `max()` yields the best one.

        Returns `NotImplemented` for operands that are not `EdgeScore`, so
        Python raises its usual `TypeError` instead of an `AttributeError`
        on a missing `total` attribute.
        """
        if not isinstance(other, EdgeScore):
            return NotImplemented
        return self.total < other.total
# =============================================================================
# Scorer
# =============================================================================


class EdgeScorer:
    """
    Pick the best outgoing edge for a given ScreenState.

    The three component weights can be tuned at construction time; they are
    normalised so that they always sum to 1.
    """

    def __init__(
        self,
        weight_success_rate: float = 0.55,
        weight_target_match: float = 0.35,
        weight_recency: float = 0.10,
        default_success_rate: float = 0.5,
    ):
        """
        Args:
            weight_success_rate: weight of `edge.stats.success_rate`
            weight_target_match: weight of the `target_spec` / `ui_elements` match
            weight_recency: weight of the recency of the last execution
            default_success_rate: value used when the edge was never executed

        Raises:
            ValueError: if a weight is negative or if the weights sum to <= 0.
        """
        weights = (weight_success_rate, weight_target_match, weight_recency)
        # A negative weight with a positive sum would yield normalised weights
        # outside [0, 1] and invert the meaning of a component.
        if any(w < 0 for w in weights):
            raise ValueError("Les poids doivent être >= 0")
        total = sum(weights)
        if total <= 0:
            raise ValueError("La somme des poids doit être > 0")
        # Silent normalisation: callers may pass any positive scale.
        self.w_success = weight_success_rate / total
        self.w_target = weight_target_match / total
        self.w_recency = weight_recency / total
        self.default_success_rate = default_success_rate

    # -------------------------------------------------------------------------
    # Public API
    # -------------------------------------------------------------------------

    def select_best(
        self,
        edges: Sequence[WorkflowEdge],
        screen_state: Optional[ScreenState] = None,
        strategy: str = "best",
        source_similarity: float = 1.0,
    ) -> Optional[WorkflowEdge]:
        """
        Select the best edge.

        Args:
            edges: candidate edges (usually the outgoing edges of a node)
            screen_state: current state, used for pre_conditions and target match
            strategy: "best" (default, full scoring) or "first" (legacy: first
                edge, no filtering). Any other value is logged and handled
                as "best".
            source_similarity: confidence of the match that identified the
                current source node. Forwarded to the precondition check so
                that ``min_source_similarity`` is evaluated. Defaults to
                ``1.0`` for callers that do not provide it yet.

        Returns:
            The best edge, or None if `edges` is empty or no edge passes the
            pre_conditions.
        """
        if not edges:
            return None

        if strategy == "first":
            # Legacy behaviour: first edge, whatever its pre_conditions say.
            return edges[0]
        if strategy != "best":
            logger.warning(
                "[EdgeScorer] Stratégie inconnue %r, utilisation de 'best'",
                strategy,
            )

        scores = self.rank(
            edges, screen_state=screen_state, source_similarity=source_similarity
        )

        # Keep only the edges that passed the hard filter (order is preserved).
        valid = [s for s in scores if s.passed_preconditions]
        if not valid:
            reasons = "; ".join(
                f"{s.edge.edge_id}: {s.precondition_reason}" for s in scores[:5]
            )
            logger.warning(
                "[EdgeScorer] Aucun edge valide parmi %d candidats. Raisons: %s",
                len(edges),
                reasons,
            )
            return None

        winner = valid[0]  # `rank` already sorted by descending score
        logger.debug(
            "[EdgeScorer] Sélection %s (score=%.3f, parmi %d valides)",
            winner.edge.edge_id,
            winner.total,
            len(valid),
        )
        return winner.edge

    def rank(
        self,
        edges: Sequence[WorkflowEdge],
        screen_state: Optional[ScreenState] = None,
        source_similarity: float = 1.0,
    ) -> List[EdgeScore]:
        """
        Return one `EdgeScore` per edge, sorted by descending total score.

        Ties on the total are broken by the highest `success_rate`. Edges that
        failed their pre_conditions are still present in the result (with
        `passed_preconditions` set to False).

        Args:
            edges: candidate edges
            screen_state: current state (pre_conditions + target match)
            source_similarity: confidence of the current match, forwarded to
                the precondition check
        """
        return sorted(
            (
                self.score_edge(
                    edge, screen_state, source_similarity=source_similarity
                )
                for edge in edges
            ),
            key=lambda s: (s.total, s.success_rate),
            reverse=True,
        )

    # -------------------------------------------------------------------------
    # Per-edge scoring
    # -------------------------------------------------------------------------

    def score_edge(
        self,
        edge: WorkflowEdge,
        screen_state: Optional[ScreenState] = None,
        source_similarity: float = 1.0,
    ) -> EdgeScore:
        """
        Compute the score of one edge.

        Pre_conditions are evaluated here but only act as a hard filter: the
        total is always computed, and `passed_preconditions` records the
        outcome of the filter.

        Args:
            edge: edge to score
            screen_state: current state (window, texts, ui_elements)
            source_similarity: confidence of the current match, forwarded to
                the precondition check
        """
        passed, reason = self._check_preconditions(
            edge, screen_state, source_similarity=source_similarity
        )
        success_rate = self._score_success_rate(edge)
        target_match = self._score_target_match(edge, screen_state)
        recency = self._score_recency(edge)

        total = (
            self.w_success * success_rate
            + self.w_target * target_match
            + self.w_recency * recency
        )

        return EdgeScore(
            edge=edge,
            total=total,
            success_rate=success_rate,
            target_match=target_match,
            recency=recency,
            passed_preconditions=passed,
            precondition_reason=reason,
        )

    # -------------------------------------------------------------------------
    # Score components
    # -------------------------------------------------------------------------

    def _check_preconditions(
        self,
        edge: WorkflowEdge,
        screen_state: Optional[ScreenState],
        source_similarity: float = 1.0,
    ) -> tuple[bool, str]:
        """
        Evaluate the pre_conditions of an edge.

        Without a ScreenState the check is still called, with empty window
        title, app name and detected texts, so that the source-similarity
        constraint is still evaluated.

        Any exception raised by the check is logged and the edge is let
        through (fail-open).

        Args:
            edge: edge to evaluate
            screen_state: current state (None when unavailable)
            source_similarity: confidence of the current match

        Returns:
            `(passed, reason)`.
        """
        constraints = edge.constraints
        if constraints is None:
            return True, "OK (pas de contraintes)"

        if screen_state is None:
            window_title, app_name, detected_texts = "", "", []
        else:
            window = screen_state.window
            window_title = window.window_title if window else ""
            app_name = window.app_name if window else ""
            perception = screen_state.perception
            detected_texts = perception.detected_text if perception else []

        try:
            ok, reason = constraints.check_preconditions(
                window_title=window_title,
                app_name=app_name,
                detected_texts=detected_texts,
                source_similarity=source_similarity,
            )
        except Exception as e:  # deliberate fail-open: never block on an error
            logger.warning("[EdgeScorer] Erreur check_preconditions: %s", e)
            return True, f"Erreur ignorée: {e}"

        if ok and screen_state is None:
            return True, "OK (pas de ScreenState pour évaluer)"
        return ok, reason

    def _score_success_rate(self, edge: WorkflowEdge) -> float:
        """Return `edge.stats.success_rate` clamped to [0, 1].

        Falls back to `default_success_rate` when the edge has no stats or
        was never executed.
        """
        stats = edge.stats
        if stats is None or stats.execution_count == 0:
            return self.default_success_rate
        return max(0.0, min(1.0, stats.success_rate))

    def _score_target_match(
        self,
        edge: WorkflowEdge,
        screen_state: Optional[ScreenState],
    ) -> float:
        """
        Score how well the action's target matches the current `ui_elements`.

        Returns:
            - 1.0 when an element label equals the target text
            - 0.9 when an element role or type equals the target role
            - 0.8 when the target text is contained in an element label
            - 0.6 when the target only specifies a position
            - 0.5 (neutral) when nothing can be decided: no screen state, no
              action target, no UI elements, or an empty target
            - 0.0 when a text/role is requested and no element matches

        Comparisons are case-insensitive and ignore surrounding whitespace.
        """
        if screen_state is None:
            return 0.5

        target = edge.action.target if edge.action else None
        if target is None:
            return 0.5

        ui_elements = screen_state.ui_elements or []
        if not ui_elements:
            # No UI detected: we cannot decide, stay neutral.
            return 0.5

        wanted_text = (target.by_text or "").lower().strip()
        wanted_role = (target.by_role or "").lower().strip()

        if not wanted_text and not wanted_role:
            # Position-only targets are always considered a possible match;
            # this does not depend on the elements, so decide it up front.
            return 0.6 if target.by_position else 0.5

        best = 0.0
        for el in ui_elements:
            if wanted_text:
                label = (getattr(el, "label", "") or "").lower()
                if wanted_text == label.strip():
                    return 1.0  # maximum possible score, stop early
                if wanted_text in label:
                    best = max(best, 0.8)

            if wanted_role:
                role = (getattr(el, "role", "") or "").lower().strip()
                kind = (getattr(el, "type", "") or "").lower().strip()
                if wanted_role in (role, kind):
                    best = max(best, 0.9)

        # 0.0 here means "a target was requested but nothing matches".
        return best

    def _score_recency(self, edge: WorkflowEdge) -> float:
        """
        Recency score based on `edge.stats.last_executed`.

        Scale:
            - executed within the last 24h: 1.0
            - executed within the last 7 days: 0.7
            - executed longer ago: 0.3
            - never executed / no stats: 0.5 (neutral)

        "Now" is taken in the timezone of `last_executed`, so both naive and
        timezone-aware timestamps are supported.
        """
        stats = edge.stats
        if stats is None or stats.last_executed is None:
            return 0.5

        last = stats.last_executed
        # datetime.now(tz=None) is the naive local time, matching naive input.
        age_seconds = (datetime.now(tz=last.tzinfo) - last).total_seconds()
        if age_seconds < 24 * 3600:
            return 1.0
        if age_seconds < 7 * 24 * 3600:
            return 0.7
        return 0.3


# -----------------------------------------------------------------------------
# tests/unit/test_edge_scorer.py — unit tests for EdgeScorer (C3)
#
# Covers:
#   - hard filter: failing pre_conditions reject the edge
#   - ranking: the edge with the highest success_rate wins
#   - tiebreak on success_rate
#   - None is returned when no edge is valid
#   - target match through ui_elements
#   - legacy mode strategy="first"
#
# Helpers
# -----------------------------------------------------------------------------


def _make_edge(
    edge_id: str,
    by_text: str | None = None,
    by_role: str | None = None,
    success_rate: float | None = None,
    execution_count: int = 0,
    last_executed: datetime | None = None,
    required_window_title: str | None = None,
    required_app_name: str | None = None,
    min_source_similarity: float = 0.80,
) -> WorkflowEdge:
    """Build a `mouse_click` edge n1 -> n2 with the given target, stats and constraints.

    Stats are only filled in when both `success_rate` and a positive
    `execution_count` are provided; otherwise a default `EdgeStats()` is used.
    """
    stats = EdgeStats()
    if success_rate is not None and execution_count > 0:
        successes = int(round(success_rate * execution_count))
        stats.execution_count = execution_count
        stats.success_count = successes
        stats.failure_count = execution_count - successes
        stats.last_executed = last_executed

    action = Action(
        type="mouse_click",
        target=TargetSpec(by_text=by_text, by_role=by_role),
    )
    constraints = EdgeConstraints(
        required_window_title=required_window_title or "",
        required_app_name=required_app_name or "",
        min_source_similarity=min_source_similarity,
    )

    return WorkflowEdge(
        edge_id=edge_id,
        from_node="n1",
        to_node="n2",
        action=action,
        constraints=constraints,
        post_conditions=PostConditions(),
        stats=stats,
    )


def _make_ui_element(
    element_id: str, label: str, role: str = "button", type_: str = "button"
) -> UIElement:
    """Build a 100x30 UI element at the origin with the given label, role and type."""
    features = VisualFeatures(
        dominant_color="#000",
        has_icon=False,
        shape="rectangle",
        size_category="medium",
    )
    return UIElement(
        element_id=element_id,
        type=type_,
        role=role,
        bbox=BBox(x=0, y=0, width=100, height=30),
        center=(50, 15),
        label=label,
        label_confidence=0.9,
        embeddings=UIElementEmbeddings(),
        visual_features=features,
        confidence=0.9,
    )


def _make_state(
    window_title: str = "Firefox",
    app_name: str = "firefox",
    detected_text: list[str] | None = None,
    ui_elements: list[UIElement] | None = None,
) -> ScreenState:
    """Build a ScreenState with the given window, detected texts and UI elements."""
    window = WindowContext(
        app_name=app_name,
        window_title=window_title,
        screen_resolution=[1920, 1080],
    )
    perception = PerceptionLevel(
        embedding=EmbeddingRef(provider="t", vector_id="v", dimensions=512),
        detected_text=detected_text or [],
        text_detection_method="none",
        confidence_avg=0.0,
    )
    return ScreenState(
        screen_state_id="s1",
        timestamp=datetime.now(),
        session_id="sess",
        window=window,
        raw=RawLevel(screenshot_path="", capture_method="t", file_size_bytes=0),
        perception=perception,
        context=ContextLevel(),
        ui_elements=ui_elements or [],
    )
# -----------------------------------------------------------------------------
# Tests
# -----------------------------------------------------------------------------


class TestEdgeScorerBasic:
    """Trivial cases: no candidates, a single candidate, legacy strategy."""

    def test_returns_none_on_empty(self):
        assert EdgeScorer().select_best([]) is None

    def test_single_edge_returned_when_no_constraints(self):
        only_edge = _make_edge("e1")
        chosen = EdgeScorer().select_best([only_edge], screen_state=_make_state())
        assert chosen == only_edge

    def test_strategy_first_returns_first_edge(self):
        candidates = [
            _make_edge("e1", success_rate=0.1, execution_count=10),
            _make_edge("e2", success_rate=0.9, execution_count=10),
        ]
        chosen = EdgeScorer().select_best(
            candidates, screen_state=_make_state(), strategy="first"
        )
        assert chosen.edge_id == "e1"


class TestEdgeScorerFilter:
    """Hard filter on window title / application name."""

    def test_rejects_edge_with_wrong_window(self):
        """An edge requiring a different window title must be rejected."""
        scorer = EdgeScorer()
        edge = _make_edge("e1", required_window_title="Chrome")
        current = _make_state(window_title="Firefox")
        assert scorer.select_best([edge], screen_state=current) is None

    def test_rejects_edge_with_wrong_app(self):
        scorer = EdgeScorer()
        edge = _make_edge("e1", required_app_name="chrome")
        current = _make_state(app_name="firefox")
        assert scorer.select_best([edge], screen_state=current) is None

    def test_keeps_valid_edge_when_one_rejected(self):
        """Simple case: two edges, only one of them is valid."""
        rejected = _make_edge("e_bad", required_window_title="NopeApp")
        accepted = _make_edge("e_ok", required_window_title="Firefox")
        current = _make_state(window_title="Firefox Browser")
        chosen = EdgeScorer().select_best([rejected, accepted], screen_state=current)
        assert chosen is not None
        assert chosen.edge_id == "e_ok"


class TestEdgeScorerRanking:
    """Ordering of valid edges by composite score."""

    def test_higher_success_rate_wins(self):
        """Two valid edges: the one with the better success_rate wins."""
        weak = _make_edge("e_low", success_rate=0.20, execution_count=20)
        strong = _make_edge("e_high", success_rate=0.95, execution_count=20)
        chosen = EdgeScorer().select_best([weak, strong], screen_state=_make_state())
        assert chosen.edge_id == "e_high"

    def test_rank_returns_sorted_by_score(self):
        candidates = [
            _make_edge("e1", success_rate=0.3, execution_count=10),
            _make_edge("e2", success_rate=0.9, execution_count=10),
            _make_edge("e3", success_rate=0.6, execution_count=10),
        ]
        ranking = EdgeScorer().rank(candidates, screen_state=_make_state())
        assert [entry.edge.edge_id for entry in ranking] == ["e2", "e3", "e1"]

    def test_target_match_boost(self):
        """An edge matching a UI element beats one without any match."""
        matching = _make_edge("e_match", by_text="Submit")
        missing = _make_edge("e_no_match", by_text="DoesNotExist")
        button = _make_ui_element("btn1", label="Submit")
        current = _make_state(ui_elements=[button])

        ranking = EdgeScorer().rank([missing, matching], screen_state=current)
        top, runner_up = ranking[0], ranking[1]
        assert top.edge.edge_id == "e_match"
        assert top.target_match > runner_up.target_match

    def test_recency_bonus_for_recent_execution(self):
        an_hour_ago = datetime.now() - timedelta(hours=1)
        recent = _make_edge(
            "recent",
            success_rate=0.5,
            execution_count=10,
            last_executed=an_hour_ago,
        )
        a_month_ago = datetime.now() - timedelta(days=30)
        old = _make_edge(
            "old",
            success_rate=0.5,
            execution_count=10,
            last_executed=a_month_ago,
        )
        ranking = EdgeScorer().rank([old, recent], screen_state=_make_state())
        # Same success_rate: recency decides, so "recent" comes first.
        assert ranking[0].edge.edge_id == "recent"


class TestEdgeScorerNoValidEdge:
    """Behaviour when the filter rejects everything or cannot be evaluated."""

    def test_all_edges_rejected_returns_none(self):
        candidates = [
            _make_edge("e1", required_window_title="AppA"),
            _make_edge("e2", required_window_title="AppB"),
        ]
        current = _make_state(window_title="AppC")
        assert EdgeScorer().select_best(candidates, screen_state=current) is None

    def test_no_screen_state_does_not_filter(self):
        """Without a ScreenState the pre_conditions cannot be evaluated: let it pass."""
        edge = _make_edge("e1", required_window_title="StrictApp")
        chosen = EdgeScorer().select_best([edge], screen_state=None)
        assert chosen is not None


class TestEdgeScorerSourceSimilarity:
    """Lot B — the `min_source_similarity` constraint is effective again."""

    def test_min_source_similarity_pass(self):
        """Edge accepted when source_similarity >= min_source_similarity."""
        candidate = _make_edge("e1", min_source_similarity=0.80)
        chosen = EdgeScorer().select_best(
            [candidate], screen_state=_make_state(), source_similarity=0.90
        )
        assert chosen is not None
        assert chosen.edge_id == "e1"

    def test_min_source_similarity_fail(self):
        """Edge rejected when source_similarity < min_source_similarity.

        Shows that the filter is no longer silently disabled (before Lot B it
        always received a hard-coded 1.0).
        """
        candidate = _make_edge("e1", min_source_similarity=0.80)
        chosen = EdgeScorer().select_best(
            [candidate], screen_state=_make_state(), source_similarity=0.50
        )
        assert chosen is None

    def test_min_source_similarity_default_is_pass_through(self):
        """Default source_similarity=1.0: no edge is rejected for this reason."""
        candidate = _make_edge("e1", min_source_similarity=0.99)
        # No source_similarity given -> default 1.0 -> edge accepted.
        chosen = EdgeScorer().select_best([candidate], screen_state=_make_state())
        assert chosen is not None

    def test_tiebreak_unchanged_with_similarity(self):
        """With a sufficient similarity on both edges, ranking on success_rate
        is unchanged (no regression of the existing behaviour)."""
        weak = _make_edge(
            "e_low",
            success_rate=0.20,
            execution_count=20,
            min_source_similarity=0.70,
        )
        strong = _make_edge(
            "e_high",
            success_rate=0.95,
            execution_count=20,
            min_source_similarity=0.70,
        )
        ranking = EdgeScorer().rank(
            [weak, strong], screen_state=_make_state(), source_similarity=0.85
        )
        # Both pass the filter; e_high wins on success_rate.
        top, runner_up = ranking[0], ranking[1]
        assert top.edge.edge_id == "e_high"
        assert top.passed_preconditions is True
        assert runner_up.passed_preconditions is True

    def test_similarity_filters_before_ranking(self):
        """Of two edges, the one whose min_source_similarity is violated is
        rejected even though it has the better success_rate."""
        strict_but_strong = _make_edge(
            "e_strict_high",
            success_rate=0.95,
            execution_count=20,
            min_source_similarity=0.90,
        )
        loose_but_weak = _make_edge(
            "e_loose_low",
            success_rate=0.30,
            execution_count=20,
            min_source_similarity=0.50,
        )
        # Similarity 0.70 -> e_strict_high rejected, e_loose_low accepted.
        chosen = EdgeScorer().select_best(
            [strict_but_strong, loose_but_weak],
            screen_state=_make_state(),
            source_similarity=0.70,
        )
        assert chosen is not None
        assert chosen.edge_id == "e_loose_low"

    def test_score_edge_exposes_precondition_reason(self):
        """For telemetry: the failure reason mentions the similarity."""
        candidate = _make_edge("e1", min_source_similarity=0.80)
        outcome = EdgeScorer().score_edge(
            candidate, screen_state=_make_state(), source_similarity=0.40
        )
        assert outcome.passed_preconditions is False
        reason = outcome.precondition_reason
        assert any(token in reason for token in ("imilarité", "imilarite"))