feat(navigation): brique login visuel OCR-ancre + action navigate au replay
- core/navigation/ : visual_verifier (presence=OCR, role=VLM ancre sur tokens), grounding (OCR-anchor first, VLM fallback, cache coords valide par la vue), visual_login (verify_before/after, DETTE-023), action_resolver (pont runtime) - api_stream/replay_engine : dispatch action navigate server-side, never-fail -> needs_review, import depuis core.navigation (boot 5005 garanti) - 131 tests verts (wiring boot, e2e handler, unit modules) Chantier Qwen 01-02/07/2026, revue croisee Claude (plan deploy v2). Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
119
core/navigation/__init__.py
Normal file
119
core/navigation/__init__.py
Normal file
@@ -0,0 +1,119 @@
|
||||
"""Navigation brique — login visuel, recherche dossiers, vérification écran.
|
||||
|
||||
Modules :
|
||||
- visual_verifier : verify_before / verify_after chaque action (vision = validateur, OCR-ancré)
|
||||
- grounding : résolution visuelle d'éléments UI (OCR-anchor first, VLM fallback, coords cache)
|
||||
- visual_login : login form resolution + verification (DPI urgences default config)
|
||||
- action_resolver : pont navigation → runtime (coords normalisés, OCR/VLM adapters)
|
||||
|
||||
Pattern d'injection : VlmClient + OcrClient + OcrDetailedClient injectables
|
||||
"""
|
||||
|
||||
from .visual_verifier import verify_screen_match, ScreenMatchResult
|
||||
from .action_resolver import navigate_login, NavigateResult
|
||||
|
||||
__all__ = [
|
||||
"verify_screen_match",
|
||||
"ScreenMatchResult",
|
||||
"navigate_login",
|
||||
"NavigateResult",
|
||||
"_handle_navigate_action",
|
||||
]
|
||||
|
||||
# Handler pour replay_engine — importé par api_stream.py
|
||||
def _handle_navigate_action(
    action: dict,
    replay_state: dict,
    session_id: str,
) -> bool:
    """Server-side handler for the ``navigate`` replay action (login form).

    Thin wrapper around ``navigate_login``: resolves the login form
    coordinates on the latest screenshot and stores them in
    ``replay_state["variables"]`` for the following type/click actions.

    The handler is meant to never fail the replay: every error is logged,
    recorded under ``variables["navigate_result"]`` and reported by
    returning ``False`` (the caller is expected to flag the step for review).

    Args:
        action: replay action. Its optional ``"parameters"`` dict may carry
            ``login_coords_var`` / ``password_coords_var`` /
            ``submit_coords_var`` (names of the output variables) and
            ``login_field`` / ``password_field`` / ``submit_button`` /
            ``success_elements`` / ``context`` (overrides of the default
            login form config). ``parameters["action"]`` is not consulted:
            only the login flow is implemented here.
        replay_state: mutable replay state. Read keys:
            ``last_screenshot_path``, ``last_heartbeat`` (dict with
            ``screenshot_path``), ``screen_width``, ``screen_height``.
            Written key: ``variables``.
        session_id: used in log messages only.

    Returns:
        ``True`` when login, password and submit coords were all resolved,
        ``False`` otherwise.
    """
    import logging

    logger = logging.getLogger("navigation._handle_navigate_action")

    params = action.get("parameters") or {}
    if not isinstance(params, dict):
        # Malformed parameters must not raise before the try block below.
        params = {}

    def _var_name(key: str, default: str) -> str:
        """Return the configured variable name, or *default* if blank/non-string."""
        raw = params.get(key)
        name = raw.strip() if isinstance(raw, str) else ""
        return name or default

    # Output variable names (configurable through the action parameters).
    login_var = _var_name("login_coords_var", "navigate_login_coords")
    password_var = _var_name("password_coords_var", "navigate_password_coords")
    submit_var = _var_name("submit_coords_var", "navigate_submit_coords")

    variables = replay_state.get("variables")
    if not isinstance(variables, dict):
        # Covers both a missing key and a key explicitly set to None.
        variables = {}
        replay_state["variables"] = variables

    try:
        # Latest screenshot: explicit path first, heartbeat as fallback. An
        # empty explicit path no longer hides a usable heartbeat screenshot.
        screenshot_path = replay_state.get("last_screenshot_path") or ""
        if not screenshot_path:
            heartbeat = replay_state.get("last_heartbeat")
            if isinstance(heartbeat, dict):
                screenshot_path = heartbeat.get("screenshot_path") or ""

        if not screenshot_path:
            logger.warning("navigate: no screenshot for session %s", session_id)
            # Kept for backward compatibility with existing consumers.
            variables[login_var] = {"error": "no_screenshot"}
            # Same result shape as every other outcome of this handler.
            variables["navigate_result"] = {
                "all_resolved": False,
                "method": "",
                "error": "no_screenshot",
            }
            return False

        # Screen size: fall back to 1920x1080 when missing, None or 0.
        screen_width = replay_state.get("screen_width") or 1920
        screen_height = replay_state.get("screen_height") or 1080

        # OCR/VLM clients: lazy imports to avoid a circular dependency.
        from core.llm import extract_grid_from_image
        from core.extraction.vlm_client import make_vllm_client
        from core.navigation.action_resolver import (
            make_ocr_detailed_from_grid,
            navigate_login,
        )
        from core.navigation.visual_login import (
            LoginFormConfig,
            dpi_urgences_login_config,
        )

        ocr_detailed = make_ocr_detailed_from_grid(extract_grid_from_image)
        vlm_client = make_vllm_client()

        # Login form config: defaults, with per-field overrides from params.
        # Any single override key triggers the merge (not only login_field).
        config = dpi_urgences_login_config()
        override_keys = (
            "login_field",
            "password_field",
            "submit_button",
            "success_elements",
            "context",
        )
        if any(key in params for key in override_keys):
            config = LoginFormConfig(
                login_field=params.get("login_field", config.login_field),
                password_field=params.get("password_field", config.password_field),
                submit_button=params.get("submit_button", config.submit_button),
                success_elements=params.get("success_elements", config.success_elements),
                context=params.get("context", config.context),
            )

        result = navigate_login(
            screenshot_path,
            config=config,
            ocr_client=ocr_detailed,
            vlm_client=vlm_client,
            screen_width=screen_width,
            screen_height=screen_height,
        )

        # Store coords as plain dicts so they can be substituted later.
        if result.login_coords:
            variables[login_var] = result.login_coords.to_dict()
        if result.password_coords:
            variables[password_var] = result.password_coords.to_dict()
        if result.submit_coords:
            variables[submit_var] = result.submit_coords.to_dict()

        method = result.login_coords.method if result.login_coords else ""
        variables["navigate_result"] = {
            "all_resolved": result.all_resolved,
            "method": method,
            "error": result.error,
        }

        if not result.all_resolved:
            logger.warning("navigate: incomplete — %s", result.error)
            return False

        logger.info("navigate: login form resolved OK (method=%s)", method or "?")
        return True

    except Exception as e:
        # Top-level boundary: swallow on purpose, but keep the traceback.
        logger.warning("navigate: exception (%s) — needs_review", e, exc_info=True)
        variables["navigate_result"] = {
            "all_resolved": False,
            "method": "",
            "error": str(e),
        }
        return False
|
||||
205
core/navigation/action_resolver.py
Normal file
205
core/navigation/action_resolver.py
Normal file
@@ -0,0 +1,205 @@
|
||||
"""Action resolver — pont entre modules navigation et runtime replay.
|
||||
|
||||
Orchestre verify → ground → store coords pour le handler replay_engine.
|
||||
Convertit coords pixels → normalisé (x_pct/y_pct) pour le client Agent V1.
|
||||
|
||||
Architecture :
|
||||
- handler replay_engine = thin wrapper (appelle action_resolver)
|
||||
- action_resolver = bridge (adapte OCR/VLM runtime → interfaces navigation)
|
||||
- modules navigation = pure functions (ne connaissent pas le runtime)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||||
|
||||
from core.navigation.grounding import (
|
||||
BBox,
|
||||
CoordsCache,
|
||||
GroundedElement,
|
||||
OcrDetailedClient,
|
||||
OcrTokenInfo,
|
||||
ground_element,
|
||||
)
|
||||
from core.navigation.visual_login import (
|
||||
LoginFormConfig,
|
||||
LoginResolution,
|
||||
dpi_urgences_login_config,
|
||||
resolve_login_form,
|
||||
verify_login_visible,
|
||||
verify_login_success,
|
||||
)
|
||||
from core.navigation.visual_verifier import (
|
||||
OcrClient,
|
||||
ScreenMatchResult,
|
||||
VlmClient,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ── Dataclasses ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@dataclass
class NavigateCoords:
    """Position of a grounded element as fractions of the screen size.

    This is the format handed to the Agent V1 client.
    """

    # Horizontal / vertical center, normalized to [0-1].
    x_pct: float
    y_pct: float
    # Normalized (x1, y1, x2, y2) box; None when no box is available.
    bbox_pct: Optional[Tuple[float, float, float, float]] = None
    # Name of the grounding method that produced the coords.
    method: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain dict; ``bbox_pct`` is included only when set."""
        payload: Dict[str, Any] = dict(
            x_pct=self.x_pct,
            y_pct=self.y_pct,
            method=self.method,
        )
        if not self.bbox_pct:
            return payload
        payload["bbox_pct"] = [*self.bbox_pct]
        return payload
|
||||
|
||||
|
||||
@dataclass
class NavigateResult:
    """Outcome of a navigate action: one optional coords entry per form field.

    ``all_resolved`` tells whether the three fields were grounded and
    ``error`` carries a human-readable reason when they were not.
    ``pre_verify`` holds the screen check made before grounding (if any).
    ``post_verify`` is reserved for a later verify-after step and is not
    populated by ``navigate_login``.
    """

    # Normalized coords of each login form element (None = not resolved).
    login_coords: Optional[NavigateCoords] = None
    password_coords: Optional[NavigateCoords] = None
    submit_coords: Optional[NavigateCoords] = None

    # Overall status and failure description.
    all_resolved: bool = False

    # Screen verification results around the action.
    pre_verify: Optional[ScreenMatchResult] = None
    post_verify: Optional[ScreenMatchResult] = None

    error: str = ""
|
||||
|
||||
|
||||
# ── Coordinate conversion ────────────────────────────────────────────
|
||||
|
||||
|
||||
def grounded_to_coords(
    element: GroundedElement,
    screen_width: int,
    screen_height: int,
) -> NavigateCoords:
    """Turn a pixel-based GroundedElement into normalized NavigateCoords.

    Horizontal values are divided by ``screen_width`` and vertical values by
    ``screen_height``. A falsy dimension (e.g. 0) yields 0 for the affected
    values instead of dividing.
    """

    def _ratio(pixels, extent):
        # Guard against a zero/None extent rather than raising.
        return pixels / extent if extent else 0

    center_x = _ratio(element.center[0], screen_width)
    center_y = _ratio(element.center[1], screen_height)
    box = (
        _ratio(element.bbox[0], screen_width),
        _ratio(element.bbox[1], screen_height),
        _ratio(element.bbox[2], screen_width),
        _ratio(element.bbox[3], screen_height),
    )
    return NavigateCoords(
        x_pct=center_x,
        y_pct=center_y,
        bbox_pct=box,
        method=element.method,
    )
|
||||
|
||||
|
||||
# ── OCR adapter ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def make_ocr_detailed_from_grid(
    grid_fn: Callable[[str], List[List[Dict[str, Any]]]],
) -> OcrDetailedClient:
    """Wrap a grid extractor (e.g. extract_grid_from_image) as an OcrDetailedClient.

    The returned callable runs ``grid_fn`` on an image path, flattens the
    grid with ``tokens_from_grid`` and copies each token's text, bbox and
    confidence into an ``OcrTokenInfo``.

    NOTE(review): the original docstring describes the bbox as "normalized
    LTRB" while OcrTokenInfo documents LTRB pixels — confirm the units
    returned by tokens_from_grid.
    """
    from core.extraction.role_mapper import tokens_from_grid

    def _adapter(image_path: str) -> List[OcrTokenInfo]:
        converted: List[OcrTokenInfo] = []
        for tok in tokens_from_grid(grid_fn(image_path)):
            converted.append(
                OcrTokenInfo(text=tok.text, bbox=tok.bbox, confidence=tok.confidence)
            )
        return converted

    return _adapter
|
||||
|
||||
|
||||
def make_ocr_simple_from_detailed(
    ocr_detailed: OcrDetailedClient,
) -> OcrClient:
    """Build a text-only OcrClient on top of an OcrDetailedClient.

    The returned callable runs ``ocr_detailed`` on the image path and keeps
    only the ``text`` of each token, in the same order.
    """

    def _texts_only(image_path: str) -> List[str]:
        texts: List[str] = []
        for token in ocr_detailed(image_path):
            texts.append(token.text)
        return texts

    return _texts_only
|
||||
|
||||
|
||||
# ── Navigate login orchestration ─────────────────────────────────────
|
||||
|
||||
|
||||
def navigate_login(
    screenshot_path: str,
    config: Optional[LoginFormConfig] = None,
    ocr_client: Optional[OcrDetailedClient] = None,
    vlm_client: Optional[VlmClient] = None,
    screen_width: int = 1920,
    screen_height: int = 1080,
    coords_cache: Optional[CoordsCache] = None,
    skip_pre_verify: bool = False,
) -> NavigateResult:
    """Run the login navigation pipeline: verify, ground, normalize.

    Steps:
        1. Unless ``skip_pre_verify`` is set, check that the login form is
           visible on the screenshot; stop with an error if it is not.
        2. Ground the login field, password field and submit button.
        3. Convert the grounded pixel coords to normalized coords.

    When ``config`` is None the DPI urgences default is used. Both
    ``ocr_client`` and ``vlm_client`` are required; a missing one produces a
    failed NavigateResult rather than an exception.

    Returns:
        NavigateResult whose coords the handler stores in the replay_state
        variables for the subsequent type/click actions.
    """
    form_config = dpi_urgences_login_config() if config is None else config

    if ocr_client is None or vlm_client is None:
        return NavigateResult(
            all_resolved=False,
            error="ocr_client and vlm_client required",
        )

    # The verifier only needs token texts, not their boxes.
    text_only_ocr = make_ocr_simple_from_detailed(ocr_client)

    # Step 1: optional pre-verification of the screen.
    screen_check = None
    if not skip_pre_verify:
        screen_check = verify_login_visible(
            screenshot_path, form_config, text_only_ocr, vlm_client,
        )
        if not screen_check.match:
            logger.warning("navigate_login: pre-verify failed — %s", screen_check.describe())
            return NavigateResult(
                all_resolved=False,
                pre_verify=screen_check,
                error=f"pre-verify failed: {screen_check.describe()}",
            )

    # Step 2: ground every field of the form.
    grounded = resolve_login_form(
        screenshot_path,
        form_config,
        ocr_client,
        vlm_client,
        screen_width=screen_width,
        screen_height=screen_height,
        coords_cache=coords_cache,
    )
    if not grounded.all_resolved:
        logger.warning("navigate_login: incomplete resolution — %s", grounded.describe())
        return NavigateResult(
            all_resolved=False,
            pre_verify=screen_check,
            error=f"incomplete resolution: {grounded.describe()}",
        )

    # Step 3: pixels -> normalized coords.
    def _normalized(element):
        if element:
            return grounded_to_coords(element, screen_width, screen_height)
        return None

    return NavigateResult(
        login_coords=_normalized(grounded.login_field),
        password_coords=_normalized(grounded.password_field),
        submit_coords=_normalized(grounded.submit_button),
        all_resolved=True,
        pre_verify=screen_check,
    )
|
||||
375
core/navigation/grounding.py
Normal file
375
core/navigation/grounding.py
Normal file
@@ -0,0 +1,375 @@
|
||||
"""Grounding — résolution visuelle d'éléments UI → coords (bbox + center).
|
||||
|
||||
Architecture OCR-ancrée (alignée avec visual_verifier) :
|
||||
- STRATÉGIE 1 : OCR-anchor — si le texte cible est trouvé par OCR,
|
||||
utiliser le bbox du token OCR (déterministe, zero hallucination).
|
||||
- STRATÉGIE 2 : VLM grounder — si OCR ne trouve pas le texte,
|
||||
le VLM localise l'élément visuellement (fallback, risque contrôlé).
|
||||
- CACHE coords : mémorise les coords résolues, validées par vision avant usage.
|
||||
Si cached coords fail → re-résolution visuelle.
|
||||
|
||||
Coords = cache local validé par vue (Dom/Claude recadrage 01/07).
|
||||
Vision = source de vérité, coords = shortcut validé.
|
||||
|
||||
BBox format interne : LTRB (x1, y1, x2, y2) pixels absolus —
|
||||
cohérent avec SomElement, OcrToken, DetectedUIElement.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||||
|
||||
from core.navigation.visual_verifier import (
|
||||
fuzzy_match,
|
||||
normalize_text,
|
||||
OcrClient,
|
||||
VlmClient,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# BBox format: LTRB pixels (x1, y1, x2, y2)
|
||||
BBox = Tuple[int, int, int, int]
|
||||
|
||||
|
||||
# ── Dataclasses ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@dataclass
class OcrTokenInfo:
    """One OCR token plus its location, as needed for grounding.

    Carries more than the text-only tokens used for verification.
    """

    # Recognized text of the token.
    text: str
    # LTRB box in pixels (x1, y1, x2, y2); None when the OCR gave no box.
    bbox: Optional[BBox] = None
    # OCR confidence; defaults to 1.0 when not provided.
    confidence: float = 1.0
|
||||
|
||||
|
||||
# Type alias — injectable OCR client returning tokens with bbox
|
||||
# More detailed than visual_verifier's OcrClient (which returns List[str])
|
||||
OcrDetailedClient = Callable[[str], List[OcrTokenInfo]]
|
||||
|
||||
|
||||
@dataclass
class GroundedElement:
    """A UI element located on screen, with its pixel coordinates."""

    # What was searched for.
    role: str
    text: str

    # Where it was found: LTRB box in pixels and (cx, cy) click target.
    bbox: BBox
    center: Tuple[int, int]

    # How reliable the result is and which strategy produced it:
    # "ocr_anchor", "vlm_grounder" or "cache".
    confidence: float
    method: str

    # OCR text that actually matched (may differ from `text` on fuzzy match).
    source_ocr_text: str = ""
|
||||
|
||||
|
||||
@dataclass
class CoordsCacheEntry:
    """Remembered coordinates of one UI element."""

    # Cache key in the form "role:text".
    element_key: str
    # LTRB pixel box and click target of the element.
    bbox: BBox
    center: Tuple[int, int]
    # Grounding method that originally produced these coords.
    method: str
    # Counter bumped by the cache on each put and on each cache hit.
    validation_count: int = 0
|
||||
|
||||
|
||||
class CoordsCache:
    """In-memory store of grounded coordinates, keyed by element key.

    The class itself performs no validation. Per the module design notes,
    callers are expected to verify cached coords by vision and to call
    ``invalidate`` (then re-resolve) when that verification fails.
    """

    def __init__(self) -> None:
        self._entries: Dict[str, CoordsCacheEntry] = {}

    def get(self, element_key: str) -> Optional[CoordsCacheEntry]:
        """Return the entry stored under *element_key*, or None."""
        return self._entries.get(element_key)

    def put(
        self,
        element_key: str,
        bbox: BBox,
        center: Tuple[int, int],
        method: str,
    ) -> None:
        """Create the entry, or overwrite it in place and bump its counter."""
        existing = self._entries.get(element_key)
        if not existing:
            self._entries[element_key] = CoordsCacheEntry(
                element_key=element_key,
                bbox=bbox,
                center=center,
                method=method,
                validation_count=1,
            )
            return
        existing.bbox = bbox
        existing.center = center
        existing.method = method
        existing.validation_count += 1

    def invalidate(self, element_key: str) -> None:
        """Drop the entry for *element_key*; unknown keys are ignored."""
        if element_key in self._entries:
            del self._entries[element_key]

    def clear(self) -> None:
        """Remove every entry."""
        self._entries.clear()

    def keys(self) -> List[str]:
        """Return the stored keys as a new list."""
        return [*self._entries]
|
||||
|
||||
|
||||
# ── Helper functions ─────────────────────────────────────────────────
|
||||
|
||||
|
||||
def bbox_center(bbox: BBox) -> Tuple[int, int]:
    """Return the integer midpoint (cx, cy) of an LTRB box.

    Uses floor division, so odd spans round down.
    """
    left, top, right, bottom = bbox
    mid_x = (left + right) // 2
    mid_y = (top + bottom) // 2
    return (mid_x, mid_y)
|
||||
|
||||
|
||||
def make_element_key(role: str, text: str) -> str:
    """Build the cache key ``"<role>:<normalized text>"`` for an element."""
    normalized = normalize_text(text)
    return "{}:{}".format(role, normalized)
|
||||
|
||||
|
||||
# ── OCR-anchored grounding (deterministic) ───────────────────────────
|
||||
|
||||
|
||||
def ocr_anchor_ground(
    ocr_tokens: List[OcrTokenInfo],
    target: Dict[str, Any],
    fuzzy_threshold: float = 0.8,
) -> Optional[GroundedElement]:
    """Locate ``target["text"]`` among OCR tokens and reuse the token's box.

    Tokens are scanned in order; the first one that fuzzy-matches the target
    text AND carries a bbox wins. Matching tokens without a bbox are skipped
    because they cannot be grounded.

    Returns:
        A GroundedElement (method "ocr_anchor", confidence and bbox taken
        from the token), or None when the target has no text or no usable
        token matches.
    """
    wanted_text = target.get("text", "")
    wanted_role = target.get("role", "?")
    if not wanted_text:
        return None

    # Lazy scan: stops at the first token that matches and has a box.
    usable = (
        tok
        for tok in ocr_tokens
        if fuzzy_match(wanted_text, tok.text, threshold=fuzzy_threshold)
        and tok.bbox is not None
    )
    hit = next(usable, None)
    if hit is None:
        return None

    return GroundedElement(
        role=wanted_role,
        text=wanted_text,
        bbox=hit.bbox,
        center=bbox_center(hit.bbox),
        confidence=hit.confidence,
        method="ocr_anchor",
        source_ocr_text=hit.text,
    )
|
||||
|
||||
|
||||
# ── VLM grounder (fallback) ─────────────────────────────────────────
|
||||
|
||||
|
||||
def build_grounder_prompt(
    target: Dict[str, Any],
    context: str = "",
) -> str:
    """Compose the VLM prompt asking where a UI element is on the screenshot.

    The prompt names the target (role, text and optional ``extra`` hint),
    adds the optional context line, and requests a JSON answer whose bbox is
    expressed in normalized [0-1] coordinates.
    """
    role = target.get("role", "?")
    text = target.get("text", "")
    extra = target.get("extra", "")

    pieces = [
        "You are a UI element locator. Find the specified element on this ",
        "screenshot and return its bounding box.\n",
    ]
    if context:
        pieces.append(f"Context: {context}\n")
    pieces.append(f'Target element: {role} with text "{text}"')
    if extra:
        pieces.append(f" ({extra})")
    pieces.extend(
        (
            "\n\nRespond in JSON format:\n",
            '{"found": true/false, ',
            '"bbox": [x1_norm, y1_norm, x2_norm, y2_norm], ',
            '"confidence": 0.0-1.0, ',
            '"description": "..."}\n',
            "bbox coordinates are normalized [0.0-1.0] relative to image dimensions ",
            "(x1=left, y1=top, x2=right, y2=bottom). ",
            "Only return found=true if you can clearly locate the element.",
        )
    )
    return "".join(pieces)
|
||||
|
||||
|
||||
def parse_grounder_response(
    vlm_text: str,
    screen_width: int,
    screen_height: int,
    target: Dict[str, Any],
) -> Optional[GroundedElement]:
    """Parse the VLM grounder answer into a GroundedElement.

    The answer is expected to be a JSON object (optionally surrounded by
    extra text) with ``found``, ``bbox`` (4 normalized [0-1] values, LTRB)
    and ``confidence``. The bbox is converted to absolute pixels and clamped
    to the screen.

    Args:
        vlm_text: raw text returned by the VLM.
        screen_width: screen width in pixels, used to scale x values.
        screen_height: screen height in pixels, used to scale y values.
        target: searched element; ``role`` and ``text`` are copied into the
            result.

    Returns:
        A GroundedElement with method "vlm_grounder", or None when the
        answer is missing, is not a JSON object, reports ``found`` as falsy,
        or carries an unusable bbox. Never raises on malformed answers.
    """
    try:
        data = json.loads(vlm_text)
    except (json.JSONDecodeError, TypeError):
        # TypeError: the VLM returned something that is not text (e.g. None).
        if not isinstance(vlm_text, str):
            return None
        # The model often wraps the JSON in prose: grab the outermost braces.
        json_match = re.search(r"\{[\s\S]*\}", vlm_text)
        if not json_match:
            return None
        try:
            data = json.loads(json_match.group())
        except json.JSONDecodeError:
            logger.warning("grounding: VLM response not parseable as JSON")
            return None

    # Valid JSON that is not an object (list, string, number) has no fields.
    if not isinstance(data, dict):
        return None

    if not data.get("found", False):
        return None

    bbox_norm = data.get("bbox", [])
    if not isinstance(bbox_norm, list) or len(bbox_norm) != 4:
        logger.warning("grounding: invalid bbox format from VLM")
        return None

    # Convert normalized [0-1] to absolute pixels. int() raises ValueError on
    # NaN and OverflowError on Infinity, both accepted by json.loads.
    try:
        x1 = int(float(bbox_norm[0]) * screen_width)
        y1 = int(float(bbox_norm[1]) * screen_height)
        x2 = int(float(bbox_norm[2]) * screen_width)
        y2 = int(float(bbox_norm[3]) * screen_height)
    except (ValueError, TypeError, OverflowError):
        logger.warning("grounding: bbox values not numeric")
        return None

    # Clamp to screen bounds, keeping x2 >= x1 and y2 >= y1.
    x1 = max(0, min(x1, screen_width))
    y1 = max(0, min(y1, screen_height))
    x2 = max(x1, min(x2, screen_width))
    y2 = max(y1, min(y2, screen_height))

    # Confidence must end up as a real float: fall back to 0.5 for null,
    # non-numeric or NaN values.
    try:
        confidence = float(data.get("confidence", 0.5))
    except (TypeError, ValueError):
        confidence = 0.5
    if confidence != confidence:  # NaN is the only value not equal to itself
        confidence = 0.5

    bbox_abs: BBox = (x1, y1, x2, y2)

    return GroundedElement(
        role=target.get("role", "?"),
        text=target.get("text", ""),
        bbox=bbox_abs,
        center=bbox_center(bbox_abs),
        confidence=confidence,
        method="vlm_grounder",
    )
|
||||
|
||||
|
||||
# ── Core grounding function (composition) ───────────────────────────
|
||||
|
||||
|
||||
def ground_element(
    screenshot_path: str,
    target: Dict[str, Any],
    ocr_client: OcrDetailedClient,
    vlm_client: VlmClient,
    screen_width: int = 1920,
    screen_height: int = 1080,
    coords_cache: Optional[CoordsCache] = None,
    context: str = "",
    fuzzy_threshold: float = 0.8,
) -> Optional[GroundedElement]:
    """Find a UI element on a screenshot: cache, then OCR anchor, then VLM.

    Resolution order:
        1. Cache hit: return the stored coords (method "cache"); this
           function does not re-validate them.
        2. OCR anchor: match the target text against OCR tokens.
        3. VLM grounder: ask the VLM for a bbox when OCR found nothing.

    A failing OCR or VLM call is logged and treated as "nothing found"; it
    is not propagated. Successful OCR/VLM results are stored in the cache
    when one is supplied.

    Args:
        screenshot_path: path of the screenshot image.
        target: element to find, e.g. {"role": "bouton", "text": "Connexion"}.
        ocr_client: callable returning OcrTokenInfo items for an image path.
        vlm_client: callable (image_path, prompt) -> text.
        screen_width: screen width used to convert the VLM bbox to pixels.
        screen_height: screen height used to convert the VLM bbox to pixels.
        coords_cache: optional cache of previously resolved coords.
        context: optional hint added to the VLM prompt (e.g. "page login DPI").
        fuzzy_threshold: threshold forwarded to the OCR fuzzy match.

    Returns:
        The GroundedElement, or None when the element could not be located.
    """
    wanted_text = target.get("text", "")
    wanted_role = target.get("role", "?")
    cache_key = make_element_key(wanted_role, wanted_text)

    def _remember(found: GroundedElement, how: str) -> None:
        # Store freshly resolved coords when a cache was supplied.
        if coords_cache:
            coords_cache.put(cache_key, found.bbox, found.center, how)

    # Step 0: cached coords short-circuit everything else.
    if coords_cache:
        hit = coords_cache.get(cache_key)
        if hit:
            hit.validation_count += 1
            logger.info("grounding: using cached coords for %s", cache_key)
            return GroundedElement(
                role=wanted_role,
                text=wanted_text,
                bbox=hit.bbox,
                center=hit.center,
                confidence=1.0,  # cached entries are treated as trusted
                method="cache",
            )

    # Step 1: OCR anchor. An OCR failure degrades to an empty token list.
    try:
        tokens = ocr_client(screenshot_path)
    except Exception as exc:
        logger.warning("grounding: OCR call failed (%s)", exc)
        tokens = []

    anchored = ocr_anchor_ground(tokens, target, fuzzy_threshold)
    if anchored:
        _remember(anchored, "ocr_anchor")
        logger.info(
            "grounding: OCR-anchor found '%s' (matched OCR='%s', conf=%.2f)",
            wanted_text, anchored.source_ocr_text, anchored.confidence,
        )
        return anchored

    # Step 2: VLM fallback, which cannot work without a text to look for.
    if not wanted_text:
        logger.warning("grounding: no text for target, VLM grounder needs text")
        return None

    prompt = build_grounder_prompt(target, context)
    try:
        raw_answer = vlm_client(screenshot_path, prompt)
    except Exception as exc:
        logger.warning("grounding: VLM grounder call failed (%s)", exc)
        return None

    located = parse_grounder_response(raw_answer, screen_width, screen_height, target)
    if not located:
        logger.warning("grounding: element '%s' not found by OCR or VLM", wanted_text)
        return None

    _remember(located, "vlm_grounder")
    logger.info(
        "grounding: VLM grounder found '%s' (conf=%.2f)",
        wanted_text, located.confidence,
    )
    return located
|
||||
227
core/navigation/visual_login.py
Normal file
227
core/navigation/visual_login.py
Normal file
@@ -0,0 +1,227 @@
|
||||
"""Visual login — résolution + vérification du formulaire de login par grounding.
|
||||
|
||||
Architecture (alignée visual_verifier + grounding) :
|
||||
- verify_before : formulaire login visible (champs + bouton présents)
|
||||
- resolve_login_form : ground chaque champ (login, password, bouton) → coords
|
||||
- verify_after : dashboard/accueil visible (post-login)
|
||||
- Chaque étape encadrée par vision (DETTE-023 couvert)
|
||||
|
||||
Coords = cache local validé par vue (Dom/Claude recadrage).
|
||||
Le runtime exécute les actions (type/click) — ce module résout + valide.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Callable, Dict, List, Optional, Tuple
|
||||
|
||||
from core.navigation.grounding import (
|
||||
BBox,
|
||||
CoordsCache,
|
||||
GroundedElement,
|
||||
OcrDetailedClient,
|
||||
OcrTokenInfo,
|
||||
ground_element,
|
||||
)
|
||||
from core.navigation.visual_verifier import (
|
||||
OcrClient,
|
||||
ScreenMatchResult,
|
||||
VlmClient,
|
||||
verify_before,
|
||||
verify_after,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ── Dataclasses ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@dataclass
class LoginFormConfig:
    """Describes which elements make up a login form and its success screen."""

    # Elements to look for, each as a target dict, e.g.
    # {"role": "champ", "text": "Login"},
    # {"role": "champ", "text": "Mot de passe"},
    # {"role": "bouton", "text": "Connexion"}.
    login_field: Dict[str, Any]
    password_field: Dict[str, Any]
    submit_button: Dict[str, Any]

    # Elements expected on screen once logged in (empty = no criteria).
    success_elements: List[Dict[str, Any]] = field(default_factory=list)

    # Free-text hint for the verifier/grounder, e.g. "DPI urgences".
    context: str = ""
|
||||
|
||||
|
||||
@dataclass
class LoginResolution:
    """Grounding outcome for the three elements of a login form."""

    # Grounded element per field; None when it could not be located.
    login_field: Optional[GroundedElement] = None
    password_field: Optional[GroundedElement] = None
    submit_button: Optional[GroundedElement] = None
    # True only when all three elements were grounded.
    all_resolved: bool = False
    # Overall method: "ocr_anchor", "vlm_grounder", "mixed" or "cache".
    method: str = ""

    def describe(self) -> str:
        """Return a one-line summary: status, then one fragment per field."""
        slots = (
            ("login", self.login_field),
            ("password", self.password_field),
            ("button", self.submit_button),
        )
        fragments = [
            f"{label}@{element.center} ({element.method})"
            if element
            else f"{label}: NOT FOUND"
            for label, element in slots
        ]
        verdict = "OK" if self.all_resolved else "INCOMPLETE"
        return f"Login resolution [{verdict}]: " + ", ".join(fragments)
|
||||
|
||||
|
||||
# ── Default configs ──────────────────────────────────────────────────
|
||||
|
||||
|
||||
def dpi_urgences_login_config() -> LoginFormConfig:
    """Build the default LoginFormConfig for the DPI urgences login page.

    A new config (with new dicts) is created on every call.
    """
    login = dict(role="champ", text="Login", extra="champ identifiant")
    password = dict(role="champ", text="Mot de passe", extra="champ password")
    submit = dict(role="bouton", text="Connexion", extra="bouton submit")
    # Either of these being visible counts as the post-login screen.
    landing_pages = [dict(role="page", text=title) for title in ("Accueil", "Dashboard")]
    return LoginFormConfig(
        login_field=login,
        password_field=password,
        submit_button=submit,
        success_elements=landing_pages,
        context="DPI urgences — page login",
    )
|
||||
|
||||
|
||||
# ── Helper ───────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _ocr_detailed_to_simple(ocr_detailed: OcrDetailedClient) -> OcrClient:
    """Adapt a detailed OCR client (text + bbox) to the text-only client used for verification."""

    def _texts_only(image_path: str) -> List[str]:
        texts: List[str] = []
        for token in ocr_detailed(image_path):
            texts.append(token.text)
        return texts

    return _texts_only
|
||||
|
||||
|
||||
# ── Core functions ───────────────────────────────────────────────────
|
||||
|
||||
|
||||
def verify_login_visible(
    screenshot_path: str,
    config: LoginFormConfig,
    ocr_client: OcrClient,
    vlm_client: VlmClient,
) -> ScreenMatchResult:
    """Pre-condition check: is the login form on the screenshot?

    Delegates to ``verify_before`` with the login field, password field and
    submit button from ``config`` as expected elements, and the config's
    context as hint.
    """
    login_target = config.login_field
    password_target = config.password_field
    submit_target = config.submit_button
    required_elements = [login_target, password_target, submit_target]
    return verify_before(
        screenshot_path,
        required_elements,
        ocr_client,
        vlm_client,
        context=config.context,
    )
|
||||
|
||||
|
||||
def verify_login_success(
    screenshot_path: str,
    config: LoginFormConfig,
    ocr_client: OcrClient,
    vlm_client: VlmClient,
) -> ScreenMatchResult:
    """Post-condition check: is the post-login screen visible?

    Delegates to ``verify_after`` with ``config.success_elements``. When the
    config defines no success elements there is nothing to verify, so a
    non-matching result with confidence 0.0 is returned.

    NOTE(review): the original docstring states verify_after applies a
    stricter threshold (0.8) because a false positive would let the replay
    continue on the wrong screen — that threshold is not visible here.
    """
    criteria = config.success_elements
    if criteria:
        return verify_after(
            screenshot_path,
            criteria,
            ocr_client,
            vlm_client,
            context=f"POST-LOGIN: {config.context}",
        )
    # No success criteria defined: report an explicit non-match.
    return ScreenMatchResult(
        match=False,
        confidence=0.0,
        reason="no success_elements defined in config",
    )
|
||||
|
||||
|
||||
def resolve_login_form(
    screenshot_path: str,
    config: LoginFormConfig,
    ocr_client: OcrDetailedClient,
    vlm_client: VlmClient,
    screen_width: int = 1920,
    screen_height: int = 1080,
    coords_cache: Optional[CoordsCache] = None,
) -> LoginResolution:
    """Ground the three login form elements and bundle the results.

    The login field, password field and submit button are each resolved via
    ``ground_element`` (in that order) with the same clients, screen size,
    cache and context. NOTE(review): the cache -> OCR-anchor -> VLM fallback
    order lives inside ``ground_element`` and is not visible here.

    Returns:
        A ``LoginResolution`` holding the three grounded elements,
        ``all_resolved`` (True only when none of them is ``None``) and
        ``method``: the single grounding method shared by the resolved
        elements, ``"mixed"`` when they differ, or ``""`` when nothing was
        resolved.
    """

    def ground(target):
        # Same grounding call for every field; only the target changes.
        return ground_element(
            screenshot_path,
            target,
            ocr_client=ocr_client,
            vlm_client=vlm_client,
            screen_width=screen_width,
            screen_height=screen_height,
            coords_cache=coords_cache,
            context=config.context,
        )

    login_el = ground(config.login_field)
    password_el = ground(config.password_field)
    button_el = ground(config.submit_button)
    grounded = (login_el, password_el, button_el)

    all_resolved = all(element is not None for element in grounded)

    # Summarise how the resolved elements were found.
    distinct_methods = {element.method for element in grounded if element}
    if not distinct_methods:
        method = ""
    elif len(distinct_methods) == 1:
        (method,) = distinct_methods
    else:
        method = "mixed"

    resolution = LoginResolution(
        login_field=login_el,
        password_field=password_el,
        submit_button=button_el,
        all_resolved=all_resolved,
        method=method,
    )

    if not all_resolved:
        logger.warning("resolve_login_form: incomplete — %s", resolution.describe())
    else:
        logger.info("resolve_login_form: %s", resolution.describe())

    return resolution
|
||||
408
core/navigation/visual_verifier.py
Normal file
408
core/navigation/visual_verifier.py
Normal file
@@ -0,0 +1,408 @@
|
||||
"""Visual verifier — verify_before / verify_after avec ancrage OCR.
|
||||
|
||||
Architecture OCR-ancrée (challenge Claude 01/07, gate-vert 30/06) :
|
||||
- PRESENCE = tokens OCR (déterministe, pas d'hallucination possible)
|
||||
- RÔLE = VLM confirmation (semantic, ancré sur tokens OCR trouvés)
|
||||
- VLM ne décide JAMAIS de la présence d'un élément
|
||||
- Faux positif impossible par construction ; faux négatif = retry acceptable
|
||||
|
||||
Pattern d'injection : OcrClient + VlmClient injectables (tests sans réseau).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
import unicodedata
|
||||
from dataclasses import dataclass, field
|
||||
from difflib import SequenceMatcher
|
||||
from typing import Any, Callable, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Type aliases — injectable callables for offline testing
|
||||
VlmClient = Callable[[str, str], str]  # (image_path, prompt) -> raw reply text from the VLM (parsed as JSON by the caller)
|
||||
OcrClient = Callable[[str], List[str]]  # (image_path) -> list of OCR text strings, fuzzy-matched against the expected texts
|
||||
|
||||
|
||||
@dataclass
class ScreenMatchResult:
    """Outcome of one screen verification.

    ``match`` is the verdict and ``confidence`` the associated score;
    ``reason`` and ``mismatches`` explain a failure, while
    ``observed_elements`` / ``expected_elements`` carry per-element details.
    """

    match: bool
    confidence: float = 0.0
    reason: str = ""
    observed_elements: List[Dict[str, Any]] = field(default_factory=list)
    expected_elements: List[Dict[str, Any]] = field(default_factory=list)
    mismatches: List[str] = field(default_factory=list)

    def describe(self) -> str:
        """Return a one-line, human-readable summary of the result."""
        if self.match:
            return f"Screen match OK (conf={self.confidence:.2f})"

        missing_part = ""
        if self.mismatches:
            missing_part = "missing: " + ", ".join(self.mismatches)

        segments = (
            f"Screen mismatch (conf={self.confidence:.2f})",
            missing_part,
            self.reason,
        )
        # Empty segments are omitted so no dangling separators appear.
        return " | ".join(segment for segment in segments if segment)
|
||||
|
||||
|
||||
# ── Text normalization (pure functions) ────────────────────────────────
|
||||
|
||||
|
||||
def normalize_text(text: str) -> str:
    """Normalize text for fuzzy matching.

    Lowercases, strips accents (via NFKD decomposition and removal of
    combining marks), collapses whitespace runs to a single space and trims
    the result.

    Trimming happens last: NFKD can itself introduce whitespace (a spacing
    acute accent U+00B4 decomposes to "space + combining acute"), so
    stripping before normalization would leave stray leading or trailing
    spaces in the output.
    """
    text = unicodedata.normalize("NFKD", text.lower())
    # Drop combining marks: é→e, è→e, ê→e, à→a, etc.
    text = "".join(c for c in text if not unicodedata.combining(c))
    # Collapse whitespace, then trim what decomposition may have introduced.
    return re.sub(r"\s+", " ", text).strip()
|
||||
|
||||
|
||||
def fuzzy_match(expected: str, observed: str, threshold: float = 0.8) -> bool:
    """Check if observed text fuzzy-matches expected text.

    Both strings are normalized first. Strategies, in order (any wins):
      1. Exact match after normalization
      2. Substring containment (either direction)
      3. SequenceMatcher ratio >= threshold

    A string that normalizes to empty never matches a non-empty one. Without
    this guard the containment test would accept it, because ``"" in s`` is
    always true, and a blank OCR token would "match" every expected text.
    """
    norm_expected = normalize_text(expected)
    norm_observed = normalize_text(observed)

    if norm_expected == norm_observed:
        return True

    # An empty side carries no evidence; it must not pass the containment test.
    if not norm_expected or not norm_observed:
        return False

    # NOTE(review): containment also accepts very short fragments (a
    # one-letter token is contained in most labels) — confirm whether a
    # minimum length is wanted.
    if norm_expected in norm_observed or norm_observed in norm_expected:
        return True

    ratio = SequenceMatcher(None, norm_expected, norm_observed).ratio()
    return ratio >= threshold
|
||||
|
||||
|
||||
# ── OCR presence check (deterministic, no VLM) ──────────────────────
|
||||
|
||||
|
||||
@dataclass
class OcrPresenceResult:
    """Result of OCR-based presence check."""

    # expected text -> OCR token that matched it ("" when nothing matched).
    # The "" key is a placeholder for expected elements that carry no text.
    found_texts: Dict[str, str] = field(default_factory=dict)
    # Human-readable "role: text" labels of the expected texts not found.
    missing: List[str] = field(default_factory=list)
    all_found: bool = False

    @property
    def presence_ratio(self) -> float:
        """Fraction of expected texts that were matched by an OCR token.

        The ``""`` placeholder key is ignored: it stands for elements with
        no text to look for, and counting it as "not found" would report
        less than 100% even when every expected text was matched. Returns
        1.0 when there is no text to check at all.
        """
        tracked = [
            matched
            for expected, matched in self.found_texts.items()
            if expected != ""
        ]
        if not tracked:
            return 1.0
        found_count = sum(1 for matched in tracked if matched != "")
        return found_count / len(tracked)
|
||||
|
||||
|
||||
def ocr_presence_check(
    ocr_tokens: List[str],
    expected_elements: List[Dict[str, Any]],
    fuzzy_threshold: float = 0.8,
) -> OcrPresenceResult:
    """Check presence of expected texts against OCR tokens (deterministic).

    Pure function — no VLM call.

    Args:
        ocr_tokens: Text strings returned by OCR. Blank or non-string tokens
            are ignored (``None`` is treated as an empty list).
        expected_elements: Dicts with an optional ``"text"`` to look for and
            an optional ``"role"`` used only to label missing entries.
        fuzzy_threshold: Similarity threshold forwarded to ``fuzzy_match``.

    Returns:
        An ``OcrPresenceResult`` mapping each expected text to the first OCR
        token that matched it (``""`` if none), the ``"role: text"`` labels
        of the missing ones, and ``all_found``. Elements without text are
        recorded under the ``""`` key and never count as missing.
    """
    # Blank tokens carry no evidence. Left in, one of them could be taken as
    # the first "match" and end the search with an empty result, masking a
    # real match further down the list.
    usable_tokens = [
        token
        for token in (ocr_tokens or [])
        if isinstance(token, str) and token.strip()
    ]

    found_texts: Dict[str, str] = {}
    missing: List[str] = []

    for el in expected_elements:
        expected_text = el.get("text", "")
        if not expected_text:
            # Nothing to look for; keep the placeholder entry.
            found_texts[""] = ""
            continue

        matched_ocr = next(
            (
                token
                for token in usable_tokens
                if fuzzy_match(expected_text, token, threshold=fuzzy_threshold)
            ),
            "",
        )
        found_texts[expected_text] = matched_ocr
        if not matched_ocr:
            missing.append(f"{el.get('role', '?')}: {expected_text}")

    return OcrPresenceResult(
        found_texts=found_texts,
        missing=missing,
        all_found=not missing,
    )
|
||||
|
||||
|
||||
# ── VLM role confirmation (semantic, anchored on found OCR texts) ────
|
||||
|
||||
|
||||
def build_role_confirm_prompt(
    found_elements: List[Dict[str, Any]],
    expected_elements: List[Dict[str, Any]],
    context: str = "",
) -> str:
    """Compose the VLM prompt asking for role confirmation of OCR-found texts.

    Each found element becomes a numbered line (1-based) built from its
    ``matched_ocr`` text and ``expected_role``. The prompt tells the model to
    judge roles only, optionally adds a ``Context:`` line, and specifies the
    JSON reply format. ``expected_elements`` is accepted but not used.
    """
    numbered_lines = []
    for rank, item in enumerate(found_elements, start=1):
        ocr_text = item.get("matched_ocr", "")
        role = item.get("expected_role", "?")
        numbered_lines.append(f"{rank}. Text \"{ocr_text}\" — expected role: {role}")
    listing = "\n".join(numbered_lines)

    sections = [
        "You are a screen role validator. OCR has confirmed these texts are "
        "present on the screen. Your job is ONLY to confirm their ROLE — "
        "do NOT re-declare whether they are present.\n"
    ]
    if context:
        sections.append(f"Context: {context}\n")
    sections.append(
        f"Found texts with expected roles:\n{listing}\n\n"
        "Respond in JSON format:\n"
        "{\"confirmed\": [{\"index\": 1, \"role_confirmed\": true/false, "
        "\"actual_role\": \"...\", \"confidence\": 0.0-1.0}], "
        "\"overall_confidence\": 0.0-1.0}\n"
        "Only confirm role_confirmed=true if the text clearly plays the "
        "expected role (e.g., a button, not just a label with the same text)."
    )
    return "".join(sections)
|
||||
|
||||
|
||||
def parse_role_confirm_response(vlm_text: str) -> Dict[str, Any]:
    """Parse the VLM role-confirmation reply into a normalized dict.

    The reply is parsed as JSON. If that fails, or yields something other
    than a JSON object, the outermost ``{...}`` span of the text is tried
    instead (covers replies wrapped in prose or markdown fences).

    Never raises on malformed model output: anything unusable degrades to
    the empty result, which downstream verification treats as "role not
    confirmed".

    Returns:
        ``{"confirmed": [...], "overall_confidence": float}`` where
        ``confirmed`` contains only dict entries and ``overall_confidence``
        is a float clamped to [0.0, 1.0] (0.0 when missing or not numeric).
    """

    def load_object(raw: Any) -> Optional[Dict[str, Any]]:
        # json.loads raises TypeError for non-text input and ValueError
        # (JSONDecodeError) for invalid JSON; valid JSON may still be a
        # list/str/number, which is useless here.
        try:
            value = json.loads(raw)
        except (TypeError, ValueError):
            return None
        return value if isinstance(value, dict) else None

    data = load_object(vlm_text)
    if data is None and isinstance(vlm_text, str):
        json_match = re.search(r"\{[\s\S]*\}", vlm_text)
        if json_match:
            data = load_object(json_match.group())

    if data is None:
        logger.warning("role_confirm: VLM response not parseable as JSON")
        return {"confirmed": [], "overall_confidence": 0.0}

    raw_confirmed = data.get("confirmed")
    if not isinstance(raw_confirmed, list):
        raw_confirmed = []
    # Entries are read with .get() by the caller, so only dicts are kept.
    confirmed = [entry for entry in raw_confirmed if isinstance(entry, dict)]

    raw_conf = data.get("overall_confidence", 0.0)
    try:
        # bool is an int subclass; true/false is not a confidence score.
        overall_conf = 0.0 if isinstance(raw_conf, bool) else float(raw_conf)
    except (TypeError, ValueError):
        overall_conf = 0.0
    if overall_conf != overall_conf:  # NaN
        overall_conf = 0.0
    overall_conf = min(1.0, max(0.0, overall_conf))

    return {
        "confirmed": confirmed,
        "overall_confidence": overall_conf,
    }
|
||||
|
||||
|
||||
# ── Core verification (OCR-anchored composition) ────────────────────
|
||||
|
||||
|
||||
def _coerce_confidence(value: Any) -> float:
    """Return *value* as a float confidence, or 0.0 when it is not a usable number."""
    if isinstance(value, bool):
        return 0.0
    try:
        number = float(value)
    except (TypeError, ValueError):
        return 0.0
    # NaN compares unequal to itself; treat it as "no confidence".
    return number if number == number else 0.0


def _lookup_role_entry(confirmed: Any, prompt_index: int) -> Optional[Dict[str, Any]]:
    """Return the first VLM entry whose ``index`` equals *prompt_index*, if any."""
    if not isinstance(confirmed, list):
        return None
    for entry in confirmed:
        if not isinstance(entry, dict):
            continue
        index = entry.get("index")
        if isinstance(index, str) and index.strip().isdecimal():
            index = int(index)
        if index == prompt_index:
            return entry
    return None


def verify_screen_match(
    screenshot_path: str,
    expected_elements: List[Dict[str, Any]],
    ocr_client: OcrClient,
    vlm_client: VlmClient,
    context: str = "",
    min_confidence: float = 0.7,
) -> ScreenMatchResult:
    """Verify screen state with OCR-anchored presence + VLM role confirmation.

    Step 1: OCR the screenshot and check every expected text against the
            tokens (deterministic). Any missing text -> mismatch, no VLM call.
    Step 2: ask the VLM to confirm the ROLE of the texts OCR found. The VLM
            is never asked whether a text is present.

    Args:
        screenshot_path: Image passed to both clients.
        expected_elements: Dicts with optional ``"text"`` and ``"role"``.
            Elements without text cannot be anchored on OCR: they are not
            sent to the VLM and are skipped by the role check.
        ocr_client: Callable returning the OCR text tokens of the image.
        vlm_client: Callable returning the VLM reply for (image, prompt).
        context: Free-text context inserted in the VLM prompt.
        min_confidence: Minimum per-element and overall VLM confidence.

    Returns:
        A ``ScreenMatchResult``. ``match`` is True when all texts are present
        and every text-bearing element has its role confirmed (JSON ``true``)
        with per-element and overall confidence >= ``min_confidence``. An OCR
        failure yields a mismatch. A VLM call failure yields a match with
        confidence 0.5 (presence-only fallback).
    """
    if not expected_elements:
        return ScreenMatchResult(
            match=True,
            confidence=1.0,
            reason="no expected elements to verify",
        )

    # Step 1: OCR presence check (deterministic)
    try:
        ocr_tokens = ocr_client(screenshot_path)
    except Exception as e:
        logger.warning("verify_screen_match: OCR call failed (%s)", e)
        return ScreenMatchResult(
            match=False,
            confidence=0.0,
            reason=f"OCR error: {e}",
            expected_elements=expected_elements,
        )

    presence = ocr_presence_check(ocr_tokens, expected_elements)

    if not presence.all_found:
        observed = []
        for el in expected_elements:
            text = el.get("text", "")
            matched = presence.found_texts.get(text, "")
            observed.append({
                "role": el.get("role", "?"),
                "expected_text": text,
                "matched_ocr": matched,
                "found": matched != "",
            })
        return ScreenMatchResult(
            match=False,
            confidence=presence.presence_ratio,
            reason="OCR presence check: some texts not found",
            observed_elements=observed,
            expected_elements=expected_elements,
            mismatches=presence.missing,
        )

    # Step 2: VLM role confirmation (only for found elements).
    # The prompt numbers its entries over found_elements, so remember which
    # prompt index each expected element received. Looking answers up by
    # position in expected_elements would shift them as soon as one element
    # has no text.
    found_elements = []
    prompt_index_by_position: Dict[int, int] = {}
    for position, el in enumerate(expected_elements):
        text = el.get("text", "")
        matched_ocr = presence.found_texts.get(text, "")
        if text and matched_ocr:
            found_elements.append({
                "text": text,
                "expected_role": el.get("role", "?"),
                "matched_ocr": matched_ocr,
            })
            prompt_index_by_position[position] = len(found_elements)

    if not found_elements:
        # All elements had no text → presence trivially OK
        return ScreenMatchResult(
            match=True,
            confidence=1.0,
            reason="no text-based elements to verify",
            expected_elements=expected_elements,
        )

    prompt = build_role_confirm_prompt(found_elements, expected_elements, context)

    try:
        vlm_text = vlm_client(screenshot_path, prompt)
    except Exception as e:
        logger.warning("verify_screen_match: VLM role confirm failed (%s)", e)
        observed = []
        for el in expected_elements:
            text = el.get("text", "")
            observed.append({
                "role": el.get("role", "?"),
                "expected_text": text,
                "matched_ocr": presence.found_texts.get(text, ""),
                "found": True,
                "role_confirmed": False,
                "role_confidence": 0.0,
            })
        # NOTE(review): a VLM outage passes the gate on presence alone
        # (match=True, confidence 0.5 < min_confidence) — confirm this
        # best-effort fallback is intended for post-action checks too.
        return ScreenMatchResult(
            match=True,
            confidence=0.5,
            reason=f"OCR presence OK, VLM role confirm failed: {e}",
            observed_elements=observed,
            expected_elements=expected_elements,
        )

    parsed = parse_role_confirm_response(vlm_text)
    overall_conf = _coerce_confidence(parsed.get("overall_confidence", 0.0))
    confirmed = parsed.get("confirmed", [])

    observed = []
    role_mismatches = []
    for position, el in enumerate(expected_elements):
        text = el.get("text", "")
        expected_role = el.get("role", "?")
        matched_ocr = presence.found_texts.get(text, "")

        role_confirmed = False
        actual_role = ""
        role_confidence = 0.0

        prompt_index = prompt_index_by_position.get(position)
        role_entry = None
        if prompt_index is not None:
            role_entry = _lookup_role_entry(confirmed, prompt_index)

        if role_entry:
            # Only a real JSON `true` confirms; the string "false" is truthy.
            role_confirmed = role_entry.get("role_confirmed") is True
            actual_role = role_entry.get("actual_role", "")
            role_confidence = _coerce_confidence(role_entry.get("confidence", 0.0))

        observed.append({
            "role": expected_role,
            "expected_text": text,
            "matched_ocr": matched_ocr,
            "found": True,
            "role_confirmed": role_confirmed,
            "actual_role": actual_role,
            "role_confidence": role_confidence,
        })

        if prompt_index is None:
            # Textless element: never shown to the VLM, so there is no role
            # verdict to hold against it (same as the all-textless case).
            continue

        if not role_confirmed or role_confidence < min_confidence:
            role_mismatches.append(
                f"{expected_role}: {text} (actual={actual_role}, conf={role_confidence:.2f})"
            )

    is_match = len(role_mismatches) == 0 and overall_conf >= min_confidence

    return ScreenMatchResult(
        match=is_match,
        confidence=overall_conf,
        reason=f"OCR presence: {presence.presence_ratio:.0%}, VLM role: {overall_conf:.2f}",
        observed_elements=observed,
        expected_elements=expected_elements,
        mismatches=presence.missing + role_mismatches,
    )
|
||||
|
||||
|
||||
def verify_before(
    screenshot_path: str,
    expected_elements: List[Dict[str, Any]],
    ocr_client: OcrClient,
    vlm_client: VlmClient,
    context: str = "",
) -> ScreenMatchResult:
    """Check pre-conditions on the screen BEFORE an action.

    Runs ``verify_screen_match`` with the context prefixed by
    ``PRE-ACTION:`` and a confidence threshold of 0.7.
    """
    pre_action_context = f"PRE-ACTION: {context}"
    return verify_screen_match(
        screenshot_path,
        expected_elements,
        ocr_client,
        vlm_client,
        context=pre_action_context,
        min_confidence=0.7,
    )
|
||||
|
||||
|
||||
def verify_after(
    screenshot_path: str,
    expected_elements: List[Dict[str, Any]],
    ocr_client: OcrClient,
    vlm_client: VlmClient,
    context: str = "",
) -> ScreenMatchResult:
    """Check post-conditions on the screen AFTER an action.

    Runs ``verify_screen_match`` with the context prefixed by
    ``POST-ACTION:`` and a confidence threshold of 0.8, stricter than the
    pre-action check because a false positive here lets the run continue on
    a wrong assumption.
    """
    post_action_context = f"POST-ACTION: {context}"
    return verify_screen_match(
        screenshot_path,
        expected_elements,
        ocr_client,
        vlm_client,
        context=post_action_context,
        min_confidence=0.8,
    )
|
||||
Reference in New Issue
Block a user