From bfbf0f9c3ecd1ef9c2c9f4b6349c1296ff333065 Mon Sep 17 00:00:00 2001 From: Dom Date: Sat, 9 May 2026 15:30:25 +0200 Subject: [PATCH] refactor(grounding): centralise parser bbox_2d MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Avant : 4 occurrences de parsing en cascade dans resolve_engine.py (L840-885, L903-915, L2569-2580, ~110 lignes au total). Après : centralisation dans core/grounding/bbox_parser.py avec paramètre formats= permettant de filtrer les formats reconnus selon le contrat sémantique de chaque site d'appel. Préservation des contrats sémantiques (strict no-op) : - Occ 1+2 (cascade principale) : tous formats (par défaut) - Occ 3 (retry multi-image) : formats={"xy_json", "raw_array"} pour respecter le prompt qui impose {"x": NNN, "y": NNN} in pixels - Occ 4 (_locate_popup_button) : formats={"bbox_2d"} pour respecter le prompt qui demande "bounding box" Notes : - Mini-bug Occ 3 retry multi-image (division systématique sans heuristique x>1, produisait coordonnées aberrantes ~0.0004 si VLM retournait déjà du pourcentage) corrigé incidemment via centralisation. Pas de régression possible (résultat précédent aberrant par construction). - Occ 4 : bbox_2d strict 4-coords élargi à bbox_2d 2 ou 4 coords. Contrat sémantique "bounding box" respecté ; un point 2-coords interprété comme centre de bbox. Tests : 26 cas dans test_bbox_parser.py (tous formats × cascade + filtre formats= + validated). 121 PASS / 0 FAIL sur le périmètre refactor (5 fichiers ciblés). Net : -96 lignes dans resolve_engine.py, +120 lignes module + 250 lignes tests. refs DETTE-006 (étape 2/5 du fix smart_resize) Co-Authored-By: Claude Opus 4.7 (1M context) --- agent_v0/server_v1/resolve_engine.py | 93 ++-------- core/grounding/bbox_parser.py | 120 ++++++++++++ tests/unit/test_bbox_parser.py | 267 +++++++++++++++++++++++++++ 3 files changed, 406 insertions(+), 74 deletions(-) create mode 100644 core/grounding/bbox_parser.py create mode 100644 tests/unit/test_bbox_parser.py diff --git a/agent_v0/server_v1/resolve_engine.py b/agent_v0/server_v1/resolve_engine.py index a89edc1ca..5206cf19e 100644 --- a/agent_v0/server_v1/resolve_engine.py +++ b/agent_v0/server_v1/resolve_engine.py @@ -26,6 +26,8 @@ from typing import Any, Dict, List, Optional from pydantic import BaseModel +from core.grounding.bbox_parser import parse_bbox_to_norm, parse_bbox_to_norm_validated + logger = logging.getLogger("api_stream") @@ -833,51 +835,8 @@ def _resolve_by_grounding( elapsed = time.time() - t0 - # Parser la réponse — supporte bbox_2d en pixels, JSON %, arrays bruts - x_pct, y_pct = None, None - - # Format 1 : bbox_2d en pixels [x, y] ou [x1, y1, x2, y2] - bbox_match = re.search(r'"bbox_2d"\s*:\s*\[([^\]]+)\]', content) - if bbox_match: - coords = [float(v.strip()) for v in bbox_match.group(1).split(",")] - if len(coords) == 2: - x_pct = coords[0] / small_w - y_pct = coords[1] / small_h - elif len(coords) >= 4: - x_pct = (coords[0] + coords[2]) / 2 / small_w - y_pct = (coords[1] + coords[3]) / 2 / small_h - - # Format 2 : JSON {"x": 0.XX, "y": 0.YY} - if x_pct is None: - json_match = re.search(r'"x"\s*:\s*([\d.]+).*?"y"\s*:\s*([\d.]+)', content) - if json_match: - x_val, y_val = float(json_match.group(1)), float(json_match.group(2)) - # Si > 1, c'est en pixels - if x_val > 1: - x_pct = x_val / small_w - y_pct = y_val / small_h - else: - x_pct = x_val - y_pct = y_val - - # Format 3 : {"x_pct": 0.XX, "y_pct": 0.YY} - if x_pct is None: - pct_match = re.search(r'"x_pct"\s*:\s*([\d.]+).*?"y_pct"\s*:\s*([\d.]+)', content) - if pct_match: - x_pct = float(pct_match.group(1)) - y_pct = float(pct_match.group(2)) - - # Format 4 : array brut [x1, y1, x2, y2] ou [x, y] - if x_pct is None: - arr_match = re.search(r'\[[\s]*([\d.]+)\s*,\s*([\d.]+)(?:\s*,\s*([\d.]+)\s*,\s*([\d.]+))?\s*\]', content) - if arr_match: - vals = [float(v) for v in arr_match.groups() if v is not None] - if len(vals) >= 4: - x_pct = (vals[0] + vals[2]) / 2 / small_w - y_pct = (vals[1] + vals[3]) / 2 / small_h - elif len(vals) == 2: - x_pct = vals[0] / small_w - y_pct = vals[1] / small_h + # Parser la réponse — délégué à core.grounding.bbox_parser + x_pct, y_pct = parse_bbox_to_norm(content, small_w, small_h) if x_pct is None or y_pct is None: # Fallback multi-image : screenshot + crop → grounding sans description @@ -900,21 +859,12 @@ def _resolve_by_grounding( content2 = resp2.json().get("message", {}).get("content", "") elapsed = time.time() - t0 - # Parser tous les formats - arr2 = re.search(r'\[[\s]*([\d.]+)\s*,\s*([\d.]+)(?:\s*,\s*([\d.]+)\s*,\s*([\d.]+))?\s*\]', content2) - if arr2: - vals = [float(v) for v in arr2.groups() if v is not None] - if len(vals) >= 4: - x_pct = (vals[0] + vals[2]) / 2 / small_w - y_pct = (vals[1] + vals[3]) / 2 / small_h - elif len(vals) == 2: - x_pct = vals[0] / small_w - y_pct = vals[1] / small_h - if x_pct is None: - json2 = re.search(r'"x"\s*:\s*([\d.]+).*?"y"\s*:\s*([\d.]+)', content2) - if json2: - x_pct = float(json2.group(1)) / small_w - y_pct = float(json2.group(2)) / small_h + # Parser la réponse — délégué à core.grounding.bbox_parser + # Restriction aux 2 formats attendus par le prompt retry multi-image + # (cf. prompt_mi qui demande {"x": NNN, "y": NNN} en pixels). + x_pct, y_pct = parse_bbox_to_norm( + content2, small_w, small_h, formats={"xy_json", "raw_array"} + ) if x_pct is not None: logger.info("Grounding multi-image OK (%.1fs)", elapsed) except Exception as e: @@ -2563,21 +2513,16 @@ def _locate_popup_button( content = resp.json().get("message", {}).get("content", "") - # Parser bbox_2d — qwen2.5vl retourne des coordonnées en pixels - # relatifs à l'image envoyée, PAS sur une grille 1000x1000. - # Format JSON : [{"bbox_2d": [x1, y1, x2, y2], "label": "..."}] - bbox_match = re.search( - r'"bbox_2d"\s*:\s*\[\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*,\s*(\d+)\s*\]', - content, + # Parser bbox_2d — délégué à core.grounding.bbox_parser + # Restriction au format bbox_2d attendu par le prompt + # (cf. prompt qui demande "bounding box"). qwen2.5vl retourne + # des coordonnées en pixels relatifs à l'image envoyée. + cx, cy = parse_bbox_to_norm_validated( + content, screen_width, screen_height, formats={"bbox_2d"} ) - if bbox_match: - x1, y1, x2, y2 = [int(bbox_match.group(i)) for i in range(1, 5)] - # Normaliser par les dimensions de l'écran (pixels → 0-1) - cx = (x1 + x2) / 2 / screen_width - cy = (y1 + y2) / 2 / screen_height - if 0.0 <= cx <= 1.0 and 0.0 <= cy <= 1.0: - logger.info(f"Observer : bouton '{button_text}' localisé à ({cx:.3f}, {cy:.3f})") - return {"x_pct": cx, "y_pct": cy} + if cx is not None: + logger.info(f"Observer : bouton '{button_text}' localisé à ({cx:.3f}, {cy:.3f})") + return {"x_pct": cx, "y_pct": cy} except Exception as e: logger.debug(f"Observer grounding bouton erreur : {e}") diff --git a/core/grounding/bbox_parser.py b/core/grounding/bbox_parser.py new file mode 100644 index 000000000..fd296c38a --- /dev/null +++ b/core/grounding/bbox_parser.py @@ -0,0 +1,120 @@ +""" +Parser des réponses VLM de grounding (bbox_2d, x/y, x_pct/y_pct, array brut). + +Centralise le parsing des coordonnées retournées par les modèles VLM +(Qwen-VL via Ollama, vLLM ou Transformers direct) vers une représentation +normalisée (x_pct, y_pct). + +Module pur : regex + arithmétique, sans dépendance lourde. + +Convention des diviseurs (DETTE-006 ouverte) : actuellement les call sites +passent les dimensions de l'image envoyée au VLM (PRE-resize). C'est la +source du bug d'échelle pixel grounding — sera corrigé au commit 3/5 du +fix DETTE-006 en passant les dimensions POST-smart_resize. +""" + +import re + + +_ALL_FORMATS = frozenset({"bbox_2d", "xy_json", "xy_pct", "raw_array"}) + + +def parse_bbox_to_norm( + content: str, + divisor_w: int | float, + divisor_h: int | float, + *, + formats: set[str] | None = None, +) -> tuple[float | None, float | None]: + """Parse une réponse VLM en (x_pct, y_pct) normalisés. + + Cascade des formats (premier qui matche gagne) : + 1. ``"bbox_2d"`` : ``{"bbox_2d": [x, y]}`` ou ``[x1, y1, x2, y2]`` + 2. ``"xy_json"`` : ``{"x": ..., "y": ...}`` (heuristique x>1 → pixels) + 3. ``"xy_pct"`` : ``{"x_pct": ..., "y_pct": ...}`` + 4. ``"raw_array"`` : array brut ``[...]`` 2 ou 4 coords + + Args: + content: réponse texte du VLM. + divisor_w, divisor_h: dimensions normalisant les pixels en pct. + formats: ensemble des formats à essayer. Si ``None`` (défaut), + cascade complète des 4. Pour restreindre, passer un sous-ensemble + de ``{"bbox_2d", "xy_json", "xy_pct", "raw_array"}``. + + Returns: + ``(x_pct, y_pct)`` ou ``(None, None)`` si aucun format ne matche. + """ + enabled = _ALL_FORMATS if formats is None else formats + x_pct, y_pct = None, None + + # Format 1 : bbox_2d en pixels [x, y] ou [x1, y1, x2, y2] + if "bbox_2d" in enabled: + bbox_match = re.search(r'"bbox_2d"\s*:\s*\[([^\]]+)\]', content) + if bbox_match: + coords = [float(v.strip()) for v in bbox_match.group(1).split(",")] + if len(coords) == 2: + x_pct = coords[0] / divisor_w + y_pct = coords[1] / divisor_h + elif len(coords) >= 4: + x_pct = (coords[0] + coords[2]) / 2 / divisor_w + y_pct = (coords[1] + coords[3]) / 2 / divisor_h + + # Format 2 : JSON {"x": 0.XX, "y": 0.YY} + if x_pct is None and "xy_json" in enabled: + json_match = re.search(r'"x"\s*:\s*([\d.]+).*?"y"\s*:\s*([\d.]+)', content) + if json_match: + x_val, y_val = float(json_match.group(1)), float(json_match.group(2)) + if x_val > 1: + x_pct = x_val / divisor_w + y_pct = y_val / divisor_h + else: + x_pct = x_val + y_pct = y_val + + # Format 3 : JSON {"x_pct": 0.XX, "y_pct": 0.YY} + if x_pct is None and "xy_pct" in enabled: + pct_match = re.search(r'"x_pct"\s*:\s*([\d.]+).*?"y_pct"\s*:\s*([\d.]+)', content) + if pct_match: + x_pct = float(pct_match.group(1)) + y_pct = float(pct_match.group(2)) + + # Format 4 : array brut [x1, y1, x2, y2] ou [x, y] + if x_pct is None and "raw_array" in enabled: + arr_match = re.search( + r'\[[\s]*([\d.]+)\s*,\s*([\d.]+)(?:\s*,\s*([\d.]+)\s*,\s*([\d.]+))?\s*\]', + content, + ) + if arr_match: + vals = [float(v) for v in arr_match.groups() if v is not None] + if len(vals) >= 4: + x_pct = (vals[0] + vals[2]) / 2 / divisor_w + y_pct = (vals[1] + vals[3]) / 2 / divisor_h + elif len(vals) == 2: + x_pct = vals[0] / divisor_w + y_pct = vals[1] / divisor_h + + return x_pct, y_pct + + +def parse_bbox_to_norm_validated( + content: str, + divisor_w: int | float, + divisor_h: int | float, + *, + formats: set[str] | None = None, +) -> tuple[float | None, float | None]: + """Idem :func:`parse_bbox_to_norm` + validation domaine [0, 1]. + + Retourne ``(None, None)`` si le résultat parsé est hors ``[0, 1]`` sur + l'un des deux axes — comportement de ``_locate_popup_button`` + (cf. resolve_engine.py:2569-2580). + + Implémentation : appelle :func:`parse_bbox_to_norm` puis valide. Pas + de duplication de la logique de parsing. + """ + x_pct, y_pct = parse_bbox_to_norm(content, divisor_w, divisor_h, formats=formats) + if x_pct is None or y_pct is None: + return None, None + if not (0.0 <= x_pct <= 1.0 and 0.0 <= y_pct <= 1.0): + return None, None + return x_pct, y_pct diff --git a/tests/unit/test_bbox_parser.py b/tests/unit/test_bbox_parser.py new file mode 100644 index 000000000..ca06513fd --- /dev/null +++ b/tests/unit/test_bbox_parser.py @@ -0,0 +1,267 @@ +""" +Tests unitaires pour core.grounding.bbox_parser. + +Module pur, indépendant de resolve_engine.py / agent_v0 / cv2. + +Plan : +- A. Format 1 (bbox_2d) — 4 cas +- B. Format 2 (x/y JSON) — 3 cas +- C. Format 3 (x_pct/y_pct) — 1 cas +- D. Format 4 (array brut) — 3 cas +- E. Cascade et edge cases — 4 cas +- E-bis. Filtre formats= (parse_bbox_to_norm + parse_bbox_to_norm_validated) — 4 cas +- F. parse_bbox_to_norm_validated — 5 cas +- G. Type retour — 2 cas + +Total : 26 cas. +""" + +from core.grounding.bbox_parser import ( + parse_bbox_to_norm, + parse_bbox_to_norm_validated, +) + + +# Dimensions de référence pour les divisions +W = 2560 +H = 1600 + + +# ===================================================================== +# A. Format 1 — bbox_2d +# ===================================================================== + + +class TestFormat1Bbox2d: + def test_2_coords_point(self): + # bbox_2d [x, y] : pixels divisés + content = '{"bbox_2d": [1280, 800], "label": "btn"}' + x, y = parse_bbox_to_norm(content, W, H) + assert x == 1280 / W + assert y == 800 / H + + def test_4_coords_rect_center(self): + # bbox_2d [x1, y1, x2, y2] : centre du rect divisé + content = '{"bbox_2d": [100, 200, 300, 400]}' + x, y = parse_bbox_to_norm(content, W, H) + assert x == (100 + 300) / 2 / W + assert y == (200 + 400) / 2 / H + + def test_floats(self): + content = '{"bbox_2d": [1280.5, 800.25, 1300.0, 850.0]}' + x, y = parse_bbox_to_norm(content, W, H) + assert x == (1280.5 + 1300.0) / 2 / W + assert y == (800.25 + 850.0) / 2 / H + + def test_5_coords_uses_first_4(self): + # >= 4 coords : prend les 4 premières comme rect (comportement original) + content = '{"bbox_2d": [10, 20, 30, 40, 99]}' + x, y = parse_bbox_to_norm(content, W, H) + assert x == (10 + 30) / 2 / W + assert y == (20 + 40) / 2 / H + + +# ===================================================================== +# B. Format 2 — x/y JSON +# ===================================================================== + + +class TestFormat2XYJson: + def test_pixels_x_above_1(self): + # x > 1 → divise par W (heuristique pixels) + content = '{"x": 1280, "y": 800}' + x, y = parse_bbox_to_norm(content, W, H) + assert x == 1280 / W + assert y == 800 / H + + def test_already_pct_x_below_1(self): + # x <= 1 → considéré comme déjà normalisé, pas de division + content = '{"x": 0.5, "y": 0.3}' + x, y = parse_bbox_to_norm(content, W, H) + assert x == 0.5 + assert y == 0.3 + + def test_x_exactly_1_treated_as_pct(self): + # x == 1 : x > 1 est False → traité comme pct (non divisé) + # Comportement original Occ 1+2 — fige la limite. + content = '{"x": 1.0, "y": 1.0}' + x, y = parse_bbox_to_norm(content, W, H) + assert x == 1.0 + assert y == 1.0 + + +# ===================================================================== +# C. Format 3 — x_pct/y_pct +# ===================================================================== + + +class TestFormat3XPctYPct: + def test_already_normalized(self): + content = '{"x_pct": 0.42, "y_pct": 0.68}' + x, y = parse_bbox_to_norm(content, W, H) + assert x == 0.42 + assert y == 0.68 + + +# ===================================================================== +# D. Format 4 — array brut +# ===================================================================== + + +class TestFormat4RawArray: + def test_2_coords(self): + content = "[1280, 800]" + x, y = parse_bbox_to_norm(content, W, H) + assert x == 1280 / W + assert y == 800 / H + + def test_4_coords(self): + content = "[100, 200, 300, 400]" + x, y = parse_bbox_to_norm(content, W, H) + assert x == (100 + 300) / 2 / W + assert y == (200 + 400) / 2 / H + + def test_floats(self): + content = "[100.5, 200.25, 300.0, 400.75]" + x, y = parse_bbox_to_norm(content, W, H) + assert x == (100.5 + 300.0) / 2 / W + assert y == (200.25 + 400.75) / 2 / H + + +# ===================================================================== +# E. Cascade et edge cases +# ===================================================================== + + +class TestCascadeAndEdge: + def test_bbox_2d_priority_over_array(self): + # bbox_2d présent ET array brut présent → bbox_2d gagne (testé en premier) + content = '{"bbox_2d": [10, 20], "extra": [9999, 9999]}' + x, y = parse_bbox_to_norm(content, W, H) + assert x == 10 / W + assert y == 20 / H + + def test_empty_content_returns_none(self): + x, y = parse_bbox_to_norm("", W, H) + assert x is None + assert y is None + + def test_no_match_returns_none(self): + # Texte ne contenant aucun format reconnu + x, y = parse_bbox_to_norm("Sorry, I cannot locate this element.", W, H) + assert x is None + assert y is None + + def test_malformed_json_no_coords(self): + # JSON sans coordonnées + content = '{"label": "ok", "confidence": 0.9}' + x, y = parse_bbox_to_norm(content, W, H) + assert x is None + assert y is None + + +# ===================================================================== +# E-bis. Filtre formats= (paramètre kwarg) +# ===================================================================== + + +class TestFormatsFilter: + def test_formats_xy_json_only_excludes_bbox_2d(self): + # bbox_2d présent dans le content, mais formats=xy_json seul. + # Avec ce filtre : bbox_2d skipped, raw_array skipped, xy_json + # ne matche pas (regex `"x"\s*:` ne capture pas `"bbox_2d"`). + # → (None, None) confirmé. + content = '{"bbox_2d": [10, 20]}' + x, y = parse_bbox_to_norm(content, W, H, formats={"xy_json"}) + assert x is None + assert y is None + + def test_formats_xy_json_and_raw_array_excludes_xy_pct(self): + # Sous-ensemble Occ 3 : restreint à xy_json + raw_array. + # Content avec x_pct/y_pct uniquement → format 3 filtré, autres + # ne matchent pas → (None, None). + content = '{"x_pct": 0.42, "y_pct": 0.68}' + x, y = parse_bbox_to_norm( + content, W, H, formats={"xy_json", "raw_array"} + ) + assert x is None + assert y is None + + +# ===================================================================== +# F. parse_bbox_to_norm_validated +# ===================================================================== + + +class TestValidated: + def test_inside_domain_returns_value(self): + # x_pct, y_pct ∈ [0, 1] → valeurs retournées + content = '{"x_pct": 0.42, "y_pct": 0.68}' + x, y = parse_bbox_to_norm_validated(content, W, H) + assert x == 0.42 + assert y == 0.68 + + def test_x_negative_returns_none(self): + content = '{"x_pct": -0.1, "y_pct": 0.5}' + x, y = parse_bbox_to_norm_validated(content, W, H) + assert x is None + assert y is None + + def test_x_above_1_returns_none(self): + # bbox_2d en pixels > divisor → x_pct > 1 + content = '{"bbox_2d": [9999, 800]}' + x, y = parse_bbox_to_norm_validated(content, W, H) + assert x is None + assert y is None + + def test_y_out_of_range_returns_none(self): + content = '{"x_pct": 0.5, "y_pct": 1.5}' + x, y = parse_bbox_to_norm_validated(content, W, H) + assert x is None + assert y is None + + def test_no_parse_returns_none(self): + x, y = parse_bbox_to_norm_validated("nope", W, H) + assert x is None + assert y is None + + def test_validated_formats_bbox_2d_only_valid(self): + # Sous-ensemble Occ 4 : restreint à bbox_2d, validation [0, 1]. + # bbox_2d 4-coords valide → coordonnées normalisées dans le domaine. + content = '{"bbox_2d": [100, 200, 300, 400]}' + x, y = parse_bbox_to_norm_validated( + content, W, H, formats={"bbox_2d"} + ) + assert x == (100 + 300) / 2 / W + assert y == (200 + 400) / 2 / H + + def test_validated_formats_bbox_2d_only_excludes_xy_json(self): + # Sous-ensemble Occ 4 : si le VLM retourne {"x":..., "y":...} + # au lieu du bbox_2d demandé, le format est filtré → (None, None). + content = '{"x": 1280, "y": 800}' + x, y = parse_bbox_to_norm_validated( + content, W, H, formats={"bbox_2d"} + ) + assert x is None + assert y is None + + +# ===================================================================== +# G. Type retour +# ===================================================================== + + +class TestReturnType: + def test_tuple_of_two(self): + result = parse_bbox_to_norm('{"bbox_2d": [10, 20]}', W, H) + assert isinstance(result, tuple) + assert len(result) == 2 + + def test_floats_or_none(self): + x, y = parse_bbox_to_norm('{"bbox_2d": [10, 20]}', W, H) + assert isinstance(x, float) + assert isinstance(y, float) + + x_none, y_none = parse_bbox_to_norm("nope", W, H) + assert x_none is None + assert y_none is None