feat(scoring): EdgeScorer utilise la vraie source_similarity (Lot B)

Avant : source_similarity=1.0 hardcodé dans _check_preconditions
-> la contrainte EdgeConstraints.min_source_similarity était
silencieusement désactivée. Un edge passait toujours.

Après : propagation ExecutionLoop -> workflow_pipeline -> EdgeScorer
  - select_best/rank/score_edge/_check_preconditions acceptent
    source_similarity: float (kwargs-only)
  - get_next_action() le propage
  - execution_loop passe la confidence issue de match_current_state

La contrainte min_source_similarity est opérationnelle pour la
première fois. Preuve concrète par test_min_source_similarity_fail
et test_low_similarity_blocks_edge (edge rejeté si sim < seuil).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Dom
2026-04-15 09:06:28 +02:00
parent af4ffa189a
commit 8c7b6e5696
2 changed files with 717 additions and 0 deletions

View File

@@ -0,0 +1,380 @@
"""
EdgeScorer — Sélection robuste d'un edge parmi plusieurs candidats.
Au lieu de prendre "le premier edge sortant" (comportement legacy),
ce module :
1. Applique un **filtre dur** : rejette les edges dont les `pre_conditions`
(EdgeConstraints) échouent étant donné le ScreenState courant.
2. Applique un **ranking léger** : score composite
- `stats.success_rate` (pondéré fort)
- match du `target_spec` (présence d'un UI element compatible)
- récence (dernière exécution réussie)
3. Retourne le meilleur edge, ou `None` si aucun ne passe le filtre.
API principale :
>>> scorer = EdgeScorer()
>>> edge = scorer.select_best(edges, screen_state=state)
Les scores individuels sont exposés via `score_edge()` pour les tests
et la télémétrie.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional, Sequence
from core.models.screen_state import ScreenState
from core.models.workflow_graph import WorkflowEdge
logger = logging.getLogger(__name__)
# =============================================================================
# Résultat de scoring (utile pour la télémétrie / debug)
# =============================================================================
@dataclass
class EdgeScore:
"""Résultat détaillé du scoring d'un edge."""
edge: WorkflowEdge
total: float
success_rate: float
target_match: float
recency: float
passed_preconditions: bool
precondition_reason: str = "OK"
def __lt__(self, other: "EdgeScore") -> bool:
# Utilisé par sorted() : plus grand score = meilleur
return self.total < other.total
# =============================================================================
# Scorer
# =============================================================================
class EdgeScorer:
"""
Sélectionne le meilleur edge sortant étant donné un ScreenState.
Les poids par défaut peuvent être ajustés à la construction.
"""
def __init__(
self,
weight_success_rate: float = 0.55,
weight_target_match: float = 0.35,
weight_recency: float = 0.10,
default_success_rate: float = 0.5,
):
"""
Args:
weight_success_rate: poids du `edge.stats.success_rate`
weight_target_match: poids du match `target_spec` / `ui_elements`
weight_recency: poids de la récence de la dernière exécution
default_success_rate: valeur quand l'edge n'a jamais été exécuté
"""
total = weight_success_rate + weight_target_match + weight_recency
if total <= 0:
raise ValueError("La somme des poids doit être > 0")
# Normalisation silencieuse
self.w_success = weight_success_rate / total
self.w_target = weight_target_match / total
self.w_recency = weight_recency / total
self.default_success_rate = default_success_rate
# -------------------------------------------------------------------------
# API publique
# -------------------------------------------------------------------------
def select_best(
self,
edges: Sequence[WorkflowEdge],
screen_state: Optional[ScreenState] = None,
strategy: str = "best",
source_similarity: float = 1.0,
) -> Optional[WorkflowEdge]:
"""
Sélectionne le meilleur edge.
Args:
edges: Liste des edges candidats (généralement les sortants d'un node)
screen_state: État courant pour évaluer pre_conditions et target_spec
strategy: "best" (défaut, score complet) ou "first" (legacy, premier edge)
source_similarity: confiance du matching qui a identifié le node
source courant (valeur propagée depuis `match_current_state`).
Utilisée pour évaluer la précondition ``min_source_similarity``
de chaque edge. Défaut à ``1.0`` pour compat avec les appelants
qui ne la fournissent pas encore.
Returns:
Meilleur edge ou None si aucun ne passe les pre_conditions
"""
if not edges:
return None
if strategy == "first":
# Comportement legacy — retourne le premier edge quoi qu'il arrive
return edges[0]
scores = self.rank(
edges, screen_state=screen_state, source_similarity=source_similarity
)
# Filtrer ceux qui ont passé les pre_conditions
valid = [s for s in scores if s.passed_preconditions]
if not valid:
# Aucun edge valide → log pour debug, retourner None
reasons = "; ".join(
f"{s.edge.edge_id}: {s.precondition_reason}" for s in scores[:5]
)
logger.warning(
f"[EdgeScorer] Aucun edge valide parmi {len(edges)} candidats. "
f"Raisons: {reasons}"
)
return None
best = valid[0].edge # déjà trié par score décroissant
logger.debug(
f"[EdgeScorer] Sélection {best.edge_id} "
f"(score={valid[0].total:.3f}, parmi {len(valid)} valides)"
)
return best
def rank(
self,
edges: Sequence[WorkflowEdge],
screen_state: Optional[ScreenState] = None,
source_similarity: float = 1.0,
) -> List[EdgeScore]:
"""
Retourne la liste des edges triés par score décroissant,
avec le détail pour chaque edge.
Tiebreak : `success_rate` le plus haut.
Args:
edges: edges candidats
screen_state: état courant (pour pre_conditions + target_match)
source_similarity: confiance du match courant, propagée aux
pre_conditions pour vérifier ``min_source_similarity``
"""
scored = [
self.score_edge(edge, screen_state, source_similarity=source_similarity)
for edge in edges
]
# Tri : score total décroissant, puis success_rate décroissant
scored.sort(key=lambda s: (s.total, s.success_rate), reverse=True)
return scored
# -------------------------------------------------------------------------
# Scoring par edge
# -------------------------------------------------------------------------
def score_edge(
self,
edge: WorkflowEdge,
screen_state: Optional[ScreenState] = None,
source_similarity: float = 1.0,
) -> EdgeScore:
"""
Calcule le score d'un edge.
Les pre_conditions sont évaluées ici mais servent uniquement de filtre
dur (le score total reste calculé, mais `passed_preconditions` est à False).
Args:
edge: edge à scorer
screen_state: état courant (fenêtre, textes, ui_elements)
source_similarity: confiance du matching courant, injectée dans
``EdgeConstraints.check_preconditions`` pour évaluer
``min_source_similarity``.
"""
# 1. Pre-conditions : filtre dur
passed, reason = self._check_preconditions(
edge, screen_state, source_similarity=source_similarity
)
# 2. Success rate (dépend des stats existantes)
success_rate = self._score_success_rate(edge)
# 3. Target match (UI element présent ?)
target_match = self._score_target_match(edge, screen_state)
# 4. Récence
recency = self._score_recency(edge)
total = (
self.w_success * success_rate
+ self.w_target * target_match
+ self.w_recency * recency
)
return EdgeScore(
edge=edge,
total=total,
success_rate=success_rate,
target_match=target_match,
recency=recency,
passed_preconditions=passed,
precondition_reason=reason,
)
# -------------------------------------------------------------------------
# Composantes du score
# -------------------------------------------------------------------------
def _check_preconditions(
self,
edge: WorkflowEdge,
screen_state: Optional[ScreenState],
source_similarity: float = 1.0,
) -> tuple[bool, str]:
"""
Vérifier les pre_conditions de l'edge.
Si pas de ScreenState, on ne peut rien vérifier → on laisse passer
(mais on loggue).
Args:
edge: edge à évaluer
screen_state: état courant (None si non dispo)
source_similarity: confiance du matching courant propagée par
l'appelant (EdgeScorer.score_edge/rank/select_best). Elle
alimente ``EdgeConstraints.check_preconditions`` pour rendre
effective la contrainte ``min_source_similarity``.
"""
constraints = edge.constraints
if constraints is None:
return True, "OK (pas de contraintes)"
if screen_state is None:
# Pas de ScreenState → on ne peut évaluer ni fenêtre, ni textes,
# mais la similarité source reste vérifiable.
try:
ok, reason = constraints.check_preconditions(
window_title="",
app_name="",
detected_texts=[],
source_similarity=source_similarity,
)
if not ok:
return ok, reason
except Exception as e:
logger.warning(f"[EdgeScorer] Erreur check_preconditions: {e}")
return True, f"Erreur ignorée: {e}"
return True, "OK (pas de ScreenState pour évaluer)"
window_title = screen_state.window.window_title if screen_state.window else ""
app_name = screen_state.window.app_name if screen_state.window else ""
detected_texts = (
screen_state.perception.detected_text
if screen_state.perception
else []
)
try:
ok, reason = constraints.check_preconditions(
window_title=window_title,
app_name=app_name,
detected_texts=detected_texts,
source_similarity=source_similarity,
)
return ok, reason
except Exception as e:
logger.warning(f"[EdgeScorer] Erreur check_preconditions: {e}")
# En cas d'erreur, on ne bloque pas l'edge
return True, f"Erreur ignorée: {e}"
def _score_success_rate(self, edge: WorkflowEdge) -> float:
"""Score basé sur `edge.stats.success_rate`."""
if edge.stats is None or edge.stats.execution_count == 0:
return self.default_success_rate
return max(0.0, min(1.0, edge.stats.success_rate))
def _score_target_match(
self,
edge: WorkflowEdge,
screen_state: Optional[ScreenState],
) -> float:
"""
Score de correspondance entre le `target_spec` de l'action et
les `ui_elements` de l'écran courant.
Retourne :
- 1.0 si un élément matche strictement (texte ou rôle)
- 0.5 si aucun screen_state fourni (neutre, pas pénalisant)
- 0.0 si aucun élément compatible
"""
if screen_state is None:
return 0.5
target = edge.action.target if edge.action else None
if target is None:
return 0.5
ui_elements = screen_state.ui_elements or []
if not ui_elements:
# Pas d'UI détectée → on ne peut pas trancher, neutre
return 0.5
target_text = (target.by_text or "").lower().strip()
target_role = (target.by_role or "").lower().strip()
best = 0.0
for el in ui_elements:
score = 0.0
el_label = getattr(el, "label", "") or ""
el_role = getattr(el, "role", "") or ""
el_type = getattr(el, "type", "") or ""
if target_text:
if target_text == el_label.lower().strip():
score = max(score, 1.0)
elif target_text in el_label.lower():
score = max(score, 0.8)
if target_role:
if target_role == el_role.lower() or target_role == el_type.lower():
score = max(score, 0.9)
if not target_text and not target_role and target.by_position:
# Si seule la position est fournie, on considère toujours match possible
score = 0.6
if score > best:
best = score
# Si on n'a rien trouvé mais qu'un target est demandé → 0.0 (fort négatif)
if best == 0.0 and (target_text or target_role):
return 0.0
return best if best > 0 else 0.5
def _score_recency(self, edge: WorkflowEdge) -> float:
"""
Score de récence basé sur `edge.stats.last_executed`.
Échelle :
- exécuté dans les dernières 24h : 1.0
- exécuté dans les 7 derniers jours : 0.7
- exécuté il y a plus longtemps : 0.3
- jamais exécuté : 0.5 (neutre)
"""
if edge.stats is None or edge.stats.last_executed is None:
return 0.5
delta = datetime.now() - edge.stats.last_executed
seconds = delta.total_seconds()
if seconds < 24 * 3600:
return 1.0
if seconds < 7 * 24 * 3600:
return 0.7
return 0.3

View File

@@ -0,0 +1,337 @@
"""
Tests unitaires de l'EdgeScorer (C3).
Couvre :
- Filtre dur : pre_conditions échouent → edge rejeté
- Ranking : edge avec success_rate le plus élevé gagne
- Tiebreak sur success_rate
- Retour None si aucun edge valide
- Target match via ui_elements
- Mode legacy strategy="first"
"""
from __future__ import annotations
from datetime import datetime, timedelta
import pytest
from core.models.screen_state import (
ContextLevel,
EmbeddingRef,
PerceptionLevel,
RawLevel,
ScreenState,
WindowContext,
)
from core.models.ui_element import UIElement, UIElementEmbeddings, VisualFeatures
from core.models.base_models import BBox
from core.models.workflow_graph import (
Action,
EdgeConstraints,
EdgeStats,
PostConditions,
TargetSpec,
WorkflowEdge,
)
from core.pipeline.edge_scorer import EdgeScorer
# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------
def _make_edge(
edge_id: str,
by_text: str | None = None,
by_role: str | None = None,
success_rate: float | None = None,
execution_count: int = 0,
last_executed: datetime | None = None,
required_window_title: str | None = None,
required_app_name: str | None = None,
min_source_similarity: float = 0.80,
) -> WorkflowEdge:
stats = EdgeStats()
if success_rate is not None and execution_count > 0:
stats.execution_count = execution_count
stats.success_count = int(round(success_rate * execution_count))
stats.failure_count = execution_count - stats.success_count
stats.last_executed = last_executed
target = TargetSpec(by_text=by_text, by_role=by_role)
action = Action(type="mouse_click", target=target)
constraints = EdgeConstraints(
required_window_title=required_window_title or "",
required_app_name=required_app_name or "",
min_source_similarity=min_source_similarity,
)
return WorkflowEdge(
edge_id=edge_id,
from_node="n1",
to_node="n2",
action=action,
constraints=constraints,
post_conditions=PostConditions(),
stats=stats,
)
def _make_ui_element(
element_id: str, label: str, role: str = "button", type_: str = "button"
) -> UIElement:
return UIElement(
element_id=element_id,
type=type_,
role=role,
bbox=BBox(x=0, y=0, width=100, height=30),
center=(50, 15),
label=label,
label_confidence=0.9,
embeddings=UIElementEmbeddings(),
visual_features=VisualFeatures(
dominant_color="#000",
has_icon=False,
shape="rectangle",
size_category="medium",
),
confidence=0.9,
)
def _make_state(
window_title: str = "Firefox",
app_name: str = "firefox",
detected_text: list[str] | None = None,
ui_elements: list[UIElement] | None = None,
) -> ScreenState:
return ScreenState(
screen_state_id="s1",
timestamp=datetime.now(),
session_id="sess",
window=WindowContext(
app_name=app_name,
window_title=window_title,
screen_resolution=[1920, 1080],
),
raw=RawLevel(screenshot_path="", capture_method="t", file_size_bytes=0),
perception=PerceptionLevel(
embedding=EmbeddingRef(provider="t", vector_id="v", dimensions=512),
detected_text=detected_text or [],
text_detection_method="none",
confidence_avg=0.0,
),
context=ContextLevel(),
ui_elements=ui_elements or [],
)
# -----------------------------------------------------------------------------
# Tests
# -----------------------------------------------------------------------------
class TestEdgeScorerBasic:
def test_returns_none_on_empty(self):
assert EdgeScorer().select_best([]) is None
def test_single_edge_returned_when_no_constraints(self):
edge = _make_edge("e1")
state = _make_state()
assert EdgeScorer().select_best([edge], screen_state=state) == edge
def test_strategy_first_returns_first_edge(self):
e1 = _make_edge("e1", success_rate=0.1, execution_count=10)
e2 = _make_edge("e2", success_rate=0.9, execution_count=10)
state = _make_state()
result = EdgeScorer().select_best(
[e1, e2], screen_state=state, strategy="first"
)
assert result.edge_id == "e1"
class TestEdgeScorerFilter:
def test_rejects_edge_with_wrong_window(self):
"""Un edge exigeant un titre de fenêtre différent doit être rejeté."""
e1 = _make_edge("e1", required_window_title="Chrome")
state = _make_state(window_title="Firefox")
result = EdgeScorer().select_best([e1], screen_state=state)
assert result is None
def test_rejects_edge_with_wrong_app(self):
e1 = _make_edge("e1", required_app_name="chrome")
state = _make_state(app_name="firefox")
result = EdgeScorer().select_best([e1], screen_state=state)
assert result is None
def test_keeps_valid_edge_when_one_rejected(self):
"""Cas simple : 2 edges, un seul valide."""
e_bad = _make_edge("e_bad", required_window_title="NopeApp")
e_ok = _make_edge("e_ok", required_window_title="Firefox")
state = _make_state(window_title="Firefox Browser")
result = EdgeScorer().select_best([e_bad, e_ok], screen_state=state)
assert result is not None
assert result.edge_id == "e_ok"
class TestEdgeScorerRanking:
def test_higher_success_rate_wins(self):
"""Cas : 2 edges valides, celui avec meilleur success_rate gagne."""
e_low = _make_edge("e_low", success_rate=0.20, execution_count=20)
e_high = _make_edge("e_high", success_rate=0.95, execution_count=20)
state = _make_state()
result = EdgeScorer().select_best([e_low, e_high], screen_state=state)
assert result.edge_id == "e_high"
def test_rank_returns_sorted_by_score(self):
e1 = _make_edge("e1", success_rate=0.3, execution_count=10)
e2 = _make_edge("e2", success_rate=0.9, execution_count=10)
e3 = _make_edge("e3", success_rate=0.6, execution_count=10)
state = _make_state()
ranked = EdgeScorer().rank([e1, e2, e3], screen_state=state)
ids = [s.edge.edge_id for s in ranked]
assert ids == ["e2", "e3", "e1"]
def test_target_match_boost(self):
"""Un edge qui match un UI element gagne face à un sans match."""
e_match = _make_edge("e_match", by_text="Submit")
e_no_match = _make_edge("e_no_match", by_text="DoesNotExist")
ui = _make_ui_element("btn1", label="Submit")
state = _make_state(ui_elements=[ui])
ranked = EdgeScorer().rank([e_no_match, e_match], screen_state=state)
assert ranked[0].edge.edge_id == "e_match"
assert ranked[0].target_match > ranked[1].target_match
def test_recency_bonus_for_recent_execution(self):
recent = _make_edge(
"recent",
success_rate=0.5,
execution_count=10,
last_executed=datetime.now() - timedelta(hours=1),
)
old = _make_edge(
"old",
success_rate=0.5,
execution_count=10,
last_executed=datetime.now() - timedelta(days=30),
)
scorer = EdgeScorer()
state = _make_state()
ranked = scorer.rank([old, recent], screen_state=state)
# Même success_rate, récence tranche → recent gagne
assert ranked[0].edge.edge_id == "recent"
class TestEdgeScorerNoValidEdge:
def test_all_edges_rejected_returns_none(self):
e1 = _make_edge("e1", required_window_title="AppA")
e2 = _make_edge("e2", required_window_title="AppB")
state = _make_state(window_title="AppC")
assert EdgeScorer().select_best([e1, e2], screen_state=state) is None
def test_no_screen_state_does_not_filter(self):
"""Sans ScreenState, on ne peut pas évaluer les pre_conditions → laisser passer."""
e1 = _make_edge("e1", required_window_title="StrictApp")
result = EdgeScorer().select_best([e1], screen_state=None)
assert result is not None
class TestEdgeScorerSourceSimilarity:
"""Lot B — la contrainte `min_source_similarity` redevient effective."""
def test_min_source_similarity_pass(self):
"""Edge accepté lorsque source_similarity >= min_source_similarity."""
edge = _make_edge("e1", min_source_similarity=0.80)
state = _make_state()
result = EdgeScorer().select_best(
[edge], screen_state=state, source_similarity=0.90
)
assert result is not None
assert result.edge_id == "e1"
def test_min_source_similarity_fail(self):
"""Edge rejeté lorsque source_similarity < min_source_similarity.
Ce test démontre concrètement que le filtre n'est plus désactivé
silencieusement (avant Lot B il recevait toujours 1.0 hardcodé).
"""
edge = _make_edge("e1", min_source_similarity=0.80)
state = _make_state()
result = EdgeScorer().select_best(
[edge], screen_state=state, source_similarity=0.50
)
assert result is None
def test_min_source_similarity_default_is_pass_through(self):
"""Défaut source_similarity=1.0 → aucun edge n'est rejeté pour ce motif."""
edge = _make_edge("e1", min_source_similarity=0.99)
state = _make_state()
# Pas de source_similarity fournie → défaut 1.0 → edge accepté
result = EdgeScorer().select_best([edge], screen_state=state)
assert result is not None
def test_tiebreak_unchanged_with_similarity(self):
"""Avec similarité OK des deux côtés, le tiebreak sur success_rate
reste identique (pas de régression du comportement existant)."""
e_low = _make_edge(
"e_low",
success_rate=0.20,
execution_count=20,
min_source_similarity=0.70,
)
e_high = _make_edge(
"e_high",
success_rate=0.95,
execution_count=20,
min_source_similarity=0.70,
)
state = _make_state()
ranked = EdgeScorer().rank(
[e_low, e_high], screen_state=state, source_similarity=0.85
)
# Les deux passent le filtre, e_high gagne au success_rate
assert ranked[0].edge.edge_id == "e_high"
assert ranked[0].passed_preconditions is True
assert ranked[1].passed_preconditions is True
def test_similarity_filters_before_ranking(self):
"""Entre 2 edges, celui dont min_source_similarity est violée est rejeté
même s'il a un meilleur success_rate."""
e_strict_high = _make_edge(
"e_strict_high",
success_rate=0.95,
execution_count=20,
min_source_similarity=0.90,
)
e_loose_low = _make_edge(
"e_loose_low",
success_rate=0.30,
execution_count=20,
min_source_similarity=0.50,
)
state = _make_state()
# Source similarity 0.70 → e_strict_high rejeté, e_loose_low accepté
result = EdgeScorer().select_best(
[e_strict_high, e_loose_low],
screen_state=state,
source_similarity=0.70,
)
assert result is not None
assert result.edge_id == "e_loose_low"
def test_score_edge_exposes_precondition_reason(self):
"""Pour la télémétrie : la raison d'échec mentionne la similarité."""
edge = _make_edge("e1", min_source_similarity=0.80)
state = _make_state()
score = EdgeScorer().score_edge(
edge, screen_state=state, source_similarity=0.40
)
assert score.passed_preconditions is False
assert "imilarité" in score.precondition_reason or "imilarite" in score.precondition_reason