test(coords+capture): coords write-only gap (10 tests) + capture I/O + image_chat_cli
test_coords_consumption_gap.py documents 3 structural gaps where NavigateCoords are written but never consumed. test_capture_io.py and test_image_chat_cli.py cover capture and chat CLI paths.
This commit is contained in:
285
tests/test_image_chat_cli.py
Normal file
285
tests/test_image_chat_cli.py
Normal file
@@ -0,0 +1,285 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Chat interactif en ligne de commande avec gemma4:26b via Ollama.
|
||||
|
||||
Usage interactif :
|
||||
python tests/test_image_chat_cli.py
|
||||
# puis taper des questions sur l'image fournie
|
||||
|
||||
Usage one-shot :
|
||||
python tests/test_image_chat_cli.py /chemin/vers/image.png "Que vois-tu ?"
|
||||
|
||||
Usage avec modèle différent :
|
||||
python tests/test_image_chat_cli.py --model qwen3-vl:8b image.png
|
||||
|
||||
Le script utilise l'API Ollama directement (via la lib `ollama` du projet,
|
||||
`ollama==0.6.1` dans requirements.txt).
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
try:
|
||||
import ollama
|
||||
except ImportError:
|
||||
print("ERREUR : la librairie 'ollama' n'est pas installée.")
|
||||
print("Installez-la avec : pip install ollama")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
DEFAULT_MODEL = "gemma4:26b"
|
||||
|
||||
|
||||
def encode_image(image_path: str) -> str:
|
||||
"""Encode une image en base64 pour l'API Ollama."""
|
||||
path = Path(image_path)
|
||||
if not path.exists():
|
||||
print(f"ERREUR : le fichier '{image_path}' n'existe pas.")
|
||||
sys.exit(1)
|
||||
if not path.is_file():
|
||||
print(f"ERREUR : '{image_path}' n'est pas un fichier.")
|
||||
sys.exit(1)
|
||||
with open(path, "rb") as f:
|
||||
return base64.b64encode(f.read()).decode("utf-8")
|
||||
|
||||
|
||||
def get_client(host: str):
|
||||
"""Renvoie un client Ollama configuré pour l'hôte donné."""
|
||||
return ollama.Client(host=host)
|
||||
|
||||
|
||||
def check_ollama_running(host: str = "http://localhost:11434") -> bool:
|
||||
"""Vérifie que le serveur Ollama est accessible."""
|
||||
try:
|
||||
client = get_client(host)
|
||||
client.list()
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"ERREUR : impossible de joindre Ollama sur {host}")
|
||||
print(f"Détail : {e}")
|
||||
print()
|
||||
print("Assurez-vous qu'Ollama est lancé :")
|
||||
print(" ollama serve")
|
||||
return False
|
||||
|
||||
|
||||
def check_model_available(model: str, host: str = "http://localhost:11434") -> bool:
|
||||
"""Vérifie que le modèle est disponible dans Ollama."""
|
||||
try:
|
||||
client = get_client(host)
|
||||
tags = client.list()
|
||||
# ollama.list() retourne un ListResponse avec un attribut 'models'
|
||||
models = getattr(tags, "models", [])
|
||||
|
||||
model_names = []
|
||||
for m in models:
|
||||
if isinstance(m, dict):
|
||||
model_names.append(m.get("name", ""))
|
||||
else:
|
||||
model_names.append(getattr(m, "name", str(m)))
|
||||
|
||||
# Correspondance exacte ou préfixe
|
||||
matched = [name for name in model_names if model in name]
|
||||
if matched:
|
||||
return True
|
||||
else:
|
||||
print(f"AVERTISSEMENT : modèle '{model}' non trouvé dans Ollama.")
|
||||
print(f"Modèles disponibles : {', '.join(model_names) or '(aucun)'}")
|
||||
print()
|
||||
print(f"Pour le télécharger :")
|
||||
print(f" ollama pull {model}")
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"ERREUR : impossible de lister les modèles : {e}")
|
||||
return False
|
||||
|
||||
|
||||
def chat_with_image(image_path: str, model: str, host: str = "http://localhost:11434") -> None:
|
||||
"""Mode interactif : charge l'image une fois, puis pose des questions."""
|
||||
client = get_client(host)
|
||||
image_b64 = encode_image(image_path)
|
||||
print(f"🖼️ Image chargée : {image_path}")
|
||||
print(f"🤖 Modèle : {model}")
|
||||
print(f"🔗 Ollama : {host}")
|
||||
print()
|
||||
print("Mode interactif — tapez vos questions (ou 'exit'/'quit' pour sortir)")
|
||||
print("Tapez '/image /chemin/nouvelle.png' pour changer d'image")
|
||||
print("-" * 60)
|
||||
|
||||
# Historique de conversation (sans l'image à chaque fois pour économiser la mémoire)
|
||||
messages = []
|
||||
|
||||
while True:
|
||||
try:
|
||||
question = input("\nVous > ").strip()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
print("\n👋 Au revoir !")
|
||||
break
|
||||
|
||||
if not question:
|
||||
continue
|
||||
|
||||
if question.lower() in ("exit", "quit", "q"):
|
||||
print("👋 Au revoir !")
|
||||
break
|
||||
|
||||
# Changement d'image
|
||||
if question.startswith("/image "):
|
||||
new_path = question[len("/image "):].strip()
|
||||
try:
|
||||
image_b64 = encode_image(new_path)
|
||||
image_path = new_path
|
||||
# Réinitialiser l'historique car image différente
|
||||
messages = []
|
||||
print(f"🖼️ Nouvelle image : {new_path}")
|
||||
except SystemExit:
|
||||
pass
|
||||
continue
|
||||
|
||||
# Construire le message user avec l'image au premier tour
|
||||
# Ensuite, l'image n'est ré-envoyée que si l'historique est vide
|
||||
has_image_in_context = any(
|
||||
isinstance(m.get("images"), list) and len(m["images"]) > 0
|
||||
for m in messages
|
||||
)
|
||||
|
||||
user_msg = {"role": "user", "content": question}
|
||||
if not has_image_in_context:
|
||||
# Première question ou image changée — inclure l'image
|
||||
user_msg["images"] = [image_b64]
|
||||
messages.append(user_msg)
|
||||
|
||||
print(f"🤖 Réponse ({model})...", end=" ", flush=True)
|
||||
|
||||
try:
|
||||
response = client.chat(
|
||||
model=model,
|
||||
messages=messages,
|
||||
stream=True,
|
||||
options={
|
||||
"temperature": 0.2,
|
||||
"num_predict": 2048,
|
||||
},
|
||||
)
|
||||
|
||||
full_response = ""
|
||||
print() # nouvelle ligne après le "..."
|
||||
for chunk in response:
|
||||
content = chunk.get("message", {}).get("content", "")
|
||||
if content:
|
||||
print(content, end="", flush=True)
|
||||
full_response += content
|
||||
|
||||
print() # retour à la ligne après la réponse
|
||||
messages.append({"role": "assistant", "content": full_response})
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n❌ Erreur : {e}")
|
||||
# Retirer le dernier message user en cas d'erreur
|
||||
messages.pop()
|
||||
|
||||
|
||||
def one_shot(image_path: str, question: str, model: str, host: str = "http://localhost:11434") -> None:
|
||||
"""Mode one-shot : une question, une réponse."""
|
||||
client = get_client(host)
|
||||
image_b64 = encode_image(image_path)
|
||||
|
||||
messages = [
|
||||
{"role": "user", "content": question, "images": [image_b64]},
|
||||
]
|
||||
|
||||
try:
|
||||
response = client.chat(
|
||||
model=model,
|
||||
messages=messages,
|
||||
stream=True,
|
||||
options={
|
||||
"temperature": 0.2,
|
||||
"num_predict": 2048,
|
||||
},
|
||||
)
|
||||
|
||||
print(f"🤖 {model} — '{question}'\n")
|
||||
for chunk in response:
|
||||
content = chunk.get("message", {}).get("content", "")
|
||||
if content:
|
||||
print(content, end="", flush=True)
|
||||
print()
|
||||
|
||||
except Exception as e:
|
||||
print(f"❌ Erreur : {e}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Chat interactif avec une image via Ollama (gemma4:26b par défaut)",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog="""
|
||||
Exemples :
|
||||
# Mode interactif avec une image
|
||||
python tests/test_image_chat_cli.py screenshot.png
|
||||
|
||||
# Mode one-shot (question directe)
|
||||
python tests/test_image_chat_cli.py screenshot.png "Quels boutons vois-tu ?"
|
||||
|
||||
# Avec un autre modèle
|
||||
python tests/test_image_chat_cli.py --model qwen3-vl:8b screenshot.png
|
||||
|
||||
# Ollama sur une machine distante
|
||||
python tests/test_image_chat_cli.py --host http://dgx:11434 screenshot.png
|
||||
""",
|
||||
)
|
||||
parser.add_argument(
|
||||
"image",
|
||||
nargs="?",
|
||||
help="Chemin vers l'image à analyser",
|
||||
)
|
||||
parser.add_argument(
|
||||
"question",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Question one-shot (si absent → mode interactif)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--model",
|
||||
default=DEFAULT_MODEL,
|
||||
help=f"Modèle Ollama à utiliser (défaut: {DEFAULT_MODEL})",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--host",
|
||||
default="http://localhost:11434",
|
||||
help="URL du serveur Ollama (défaut: http://localhost:11434)",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
# Vérifications préalables
|
||||
if not check_ollama_running(args.host):
|
||||
sys.exit(1)
|
||||
|
||||
if not check_model_available(args.model, args.host):
|
||||
sys.exit(1)
|
||||
|
||||
if not args.image:
|
||||
print("Utilisation interactive — veuillez fournir le chemin d'une image.")
|
||||
print()
|
||||
print("Usage :")
|
||||
print(f" python {sys.argv[0]} /chemin/vers/image.png")
|
||||
print(f" python {sys.argv[0]} /chemin/vers/image.png \"Votre question\"")
|
||||
print()
|
||||
parser.print_help()
|
||||
sys.exit(1)
|
||||
|
||||
if args.question:
|
||||
# Mode one-shot
|
||||
one_shot(args.image, args.question, args.model, args.host)
|
||||
else:
|
||||
# Mode interactif
|
||||
chat_with_image(args.image, args.model, args.host)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
155
tests/unit/test_capture_io.py
Normal file
155
tests/unit/test_capture_io.py
Normal file
@@ -0,0 +1,155 @@
|
||||
"""Tests unitaires de la politique de sauvegarde des captures (agent_v1).
|
||||
|
||||
Objectif : réduire le poids disque des captures (90 Go / 13 sessions = trop)
|
||||
sans casser la précision du grounding. La politique distingue le *type* de
|
||||
shot :
|
||||
|
||||
- ``crop`` → PNG lossless (cible de grounding qwen3-vl, précision pixel) ;
|
||||
- ``full`` / ``window`` / ``context`` → JPEG ``optimize=True`` (vue humaine /
|
||||
contexte, compression ~5-10x acceptable) ;
|
||||
- ``heartbeat`` → JPEG **downscalé** (liveness, pas de grounding → on peut
|
||||
réduire la résolution).
|
||||
|
||||
La fonction ``save_capture`` retourne le chemin RÉELLEMENT écrit (extension
|
||||
ajustée selon le format), pour que l'appelant streame le bon fichier.
|
||||
|
||||
Branche feat/push-log-dgx — réduction du poids de capture (unité testée,
|
||||
non encore câblée dans capturer.py).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
from PIL import Image
|
||||
|
||||
_ROOT = str(Path(__file__).resolve().parents[2])
|
||||
if _ROOT not in sys.path:
|
||||
sys.path.insert(0, _ROOT)
|
||||
|
||||
|
||||
def _noisy_image(width: int, height: int) -> Image.Image:
|
||||
"""Image RGB avec du bruit réel.
|
||||
|
||||
Un aplat uni se compresse à quasi-zéro en PNG comme en JPEG : la
|
||||
comparaison de poids serait truquée. On injecte du bruit pour que la
|
||||
différence PNG/JPEG soit représentative d'un vrai screenshot.
|
||||
"""
|
||||
return Image.frombytes("RGB", (width, height), os.urandom(width * height * 3))
|
||||
|
||||
|
||||
def test_crop_reste_png_et_dimensions_identiques(tmp_path):
|
||||
"""Un crop est sauvé en PNG lossless, dimensions inchangées."""
|
||||
from agent_v0.agent_v1.vision.capture_io import save_capture
|
||||
|
||||
img = _noisy_image(80, 80)
|
||||
base = str(tmp_path / "shot_0001_crop")
|
||||
|
||||
out_path = save_capture(img, base, kind="crop")
|
||||
|
||||
assert out_path.endswith(".png"), f"crop doit rester PNG, obtenu {out_path}"
|
||||
assert os.path.exists(out_path)
|
||||
reread = Image.open(out_path)
|
||||
assert reread.size == (80, 80)
|
||||
# PNG lossless : les pixels doivent être identiques au bruit d'origine.
|
||||
assert list(reread.convert("RGB").getdata()) == list(img.getdata())
|
||||
|
||||
|
||||
def test_full_est_jpeg(tmp_path):
|
||||
"""Un full est sauvé en JPEG (.jpg)."""
|
||||
from agent_v0.agent_v1.vision.capture_io import save_capture
|
||||
|
||||
img = _noisy_image(640, 480)
|
||||
base = str(tmp_path / "shot_0001_full")
|
||||
|
||||
out_path = save_capture(img, base, kind="full")
|
||||
|
||||
assert out_path.endswith(".jpg"), f"full doit être JPEG, obtenu {out_path}"
|
||||
assert os.path.exists(out_path)
|
||||
|
||||
|
||||
def test_full_jpeg_significativement_plus_leger_que_png(tmp_path):
|
||||
"""Le JPEG full doit peser nettement moins que le PNG équivalent.
|
||||
|
||||
On génère une image bruitée plein écran (2560×1600) et on compare le
|
||||
poids du JPEG produit par la politique au poids d'un PNG lossless du
|
||||
même contenu. Le gain doit être substantiel (au moins 2x plus léger).
|
||||
"""
|
||||
from agent_v0.agent_v1.vision.capture_io import save_capture
|
||||
|
||||
img = _noisy_image(2560, 1600)
|
||||
|
||||
jpeg_path = save_capture(img, str(tmp_path / "full_jpeg"), kind="full")
|
||||
png_ref = tmp_path / "full_ref.png"
|
||||
img.save(png_ref, "PNG")
|
||||
|
||||
jpeg_size = os.path.getsize(jpeg_path)
|
||||
png_size = os.path.getsize(png_ref)
|
||||
|
||||
assert jpeg_size < png_size / 2, (
|
||||
f"JPEG ({jpeg_size}o) doit peser < moitié du PNG ({png_size}o)"
|
||||
)
|
||||
|
||||
|
||||
def test_context_et_window_sont_jpeg(tmp_path):
|
||||
"""context et window suivent la même politique JPEG que full."""
|
||||
from agent_v0.agent_v1.vision.capture_io import save_capture
|
||||
|
||||
img = _noisy_image(320, 240)
|
||||
for kind in ("context", "window"):
|
||||
out_path = save_capture(img, str(tmp_path / f"x_{kind}"), kind=kind)
|
||||
assert out_path.endswith(".jpg"), f"{kind} doit être JPEG, obtenu {out_path}"
|
||||
assert os.path.exists(out_path)
|
||||
|
||||
|
||||
def test_heartbeat_est_downscale(tmp_path):
|
||||
"""Un heartbeat est downscalé (largeur réduite) et reste JPEG."""
|
||||
from agent_v0.agent_v1.vision.capture_io import save_capture
|
||||
|
||||
img = _noisy_image(2560, 1600)
|
||||
out_path = save_capture(img, str(tmp_path / "heartbeat_1234"), kind="heartbeat")
|
||||
|
||||
assert out_path.endswith(".jpg"), f"heartbeat doit être JPEG, obtenu {out_path}"
|
||||
reread = Image.open(out_path)
|
||||
assert reread.width < 2560, "heartbeat doit être downscalé en largeur"
|
||||
# Ratio préservé (16:10 → la hauteur doit suivre la largeur réduite).
|
||||
ratio_src = 2560 / 1600
|
||||
ratio_out = reread.width / reread.height
|
||||
assert abs(ratio_src - ratio_out) < 0.02, "le ratio doit être préservé"
|
||||
|
||||
|
||||
def test_heartbeat_plus_leger_que_full_jpeg(tmp_path):
|
||||
"""Le downscale du heartbeat le rend plus léger que le full JPEG plein res."""
|
||||
from agent_v0.agent_v1.vision.capture_io import save_capture
|
||||
|
||||
img = _noisy_image(2560, 1600)
|
||||
hb = save_capture(img, str(tmp_path / "heartbeat_5678"), kind="heartbeat")
|
||||
full = save_capture(img, str(tmp_path / "shot_9999_full"), kind="full")
|
||||
|
||||
assert os.path.getsize(hb) < os.path.getsize(full), (
|
||||
"le heartbeat downscalé doit peser moins que le full JPEG plein res"
|
||||
)
|
||||
|
||||
|
||||
def test_kind_inconnu_leve_erreur(tmp_path):
|
||||
"""Un kind non reconnu doit échouer explicitement (fail-closed)."""
|
||||
from agent_v0.agent_v1.vision.capture_io import save_capture
|
||||
|
||||
img = _noisy_image(40, 40)
|
||||
try:
|
||||
save_capture(img, str(tmp_path / "x"), kind="inexistant")
|
||||
except ValueError:
|
||||
return
|
||||
raise AssertionError("un kind inconnu doit lever ValueError")
|
||||
|
||||
|
||||
def test_rgba_converti_pour_jpeg(tmp_path):
|
||||
"""Une image RGBA doit être convertie avant l'encodage JPEG (pas d'alpha)."""
|
||||
from agent_v0.agent_v1.vision.capture_io import save_capture
|
||||
|
||||
img = Image.new("RGBA", (64, 64), (10, 20, 30, 128))
|
||||
out_path = save_capture(img, str(tmp_path / "shot_rgba_full"), kind="full")
|
||||
assert out_path.endswith(".jpg")
|
||||
assert os.path.exists(out_path)
|
||||
202
tests/unit/test_coords_consumption_gap.py
Normal file
202
tests/unit/test_coords_consumption_gap.py
Normal file
@@ -0,0 +1,202 @@
|
||||
"""Tests documenting the coords consumption gap: write-only navigate coords.
|
||||
|
||||
Test 1 (POSITIVE): _resolve_runtime_vars mechanism works — template strings
|
||||
like {{navigate_login_coords.x_pct}} resolve correctly when variables dict
|
||||
contains the stored coords.
|
||||
|
||||
Test 2 (NEGATIVE): _edge_to_normalized_actions bakes coords as literal floats,
|
||||
never producing template strings — so runtime variable resolution is never
|
||||
triggered for navigate coords, proving the write-only gap.
|
||||
|
||||
These tests are evidence, not regression guards. Test 2 documents a known
|
||||
structural gap; when the gap is fixed, Test 2 should be updated to assert
|
||||
templates ARE produced.
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
from types import SimpleNamespace
|
||||
|
||||
os.environ.setdefault("RPA_AUTH_DISABLED", "true")
|
||||
|
||||
from agent_v0.server_v1.replay_engine import (
|
||||
_edge_to_normalized_actions,
|
||||
_resolve_runtime_vars,
|
||||
_resolve_runtime_vars_in_str,
|
||||
)
|
||||
|
||||
|
||||
# ── Fake fixtures (minimal, per test_visual_anchor_semantics.py pattern) ──
|
||||
|
||||
|
||||
class _FakeAction:
|
||||
def __init__(self, type_, target=None, parameters=None):
|
||||
self.type = type_
|
||||
self.target = target
|
||||
self.parameters = parameters or {}
|
||||
|
||||
|
||||
class _FakeEdge:
|
||||
def __init__(self, action):
|
||||
self.edge_id = "edge_coords_gap"
|
||||
self.from_node = "node_src"
|
||||
self.to_node = "node_dst"
|
||||
self.action = action
|
||||
|
||||
|
||||
# ── Test 1: resolve mechanism is viable ──────────────────────────────────
|
||||
|
||||
|
||||
class TestResolveRuntimeVarsViable:
|
||||
"""Prove _resolve_runtime_vars infrastructure works with template strings."""
|
||||
|
||||
VARIABLES = {
|
||||
"navigate_login_coords": {
|
||||
"x_pct": 0.15,
|
||||
"y_pct": 0.07,
|
||||
"method": "ocr_anchor",
|
||||
}
|
||||
}
|
||||
|
||||
def test_resolve_in_str_dot_path(self):
|
||||
"""{{navigate_login_coords.x_pct}} → "0.15" (string, not float)."""
|
||||
result = _resolve_runtime_vars_in_str(
|
||||
"{{navigate_login_coords.x_pct}}", self.VARIABLES
|
||||
)
|
||||
assert result == "0.15"
|
||||
|
||||
def test_resolve_in_str_y_pct(self):
|
||||
"""{{navigate_login_coords.y_pct}} → "0.07"."""
|
||||
result = _resolve_runtime_vars_in_str(
|
||||
"{{navigate_login_coords.y_pct}}", self.VARIABLES
|
||||
)
|
||||
assert result == "0.07"
|
||||
|
||||
def test_resolve_dict_with_templates(self):
|
||||
"""_resolve_runtime_vars substitutes templates inside dict values."""
|
||||
action = {
|
||||
"type": "click",
|
||||
"x_pct": "{{navigate_login_coords.x_pct}}",
|
||||
"y_pct": "{{navigate_login_coords.y_pct}}",
|
||||
}
|
||||
resolved = _resolve_runtime_vars(action, self.VARIABLES)
|
||||
assert resolved["x_pct"] == "0.15"
|
||||
assert resolved["y_pct"] == "0.07"
|
||||
assert resolved["type"] == "click" # no-template strings unchanged
|
||||
|
||||
def test_resolve_nested_dict(self):
|
||||
"""_resolve_runtime_vars handles nested dicts with templates."""
|
||||
action = {
|
||||
"parameters": {
|
||||
"coords": "{{navigate_login_coords.x_pct}}",
|
||||
},
|
||||
}
|
||||
resolved = _resolve_runtime_vars(action, self.VARIABLES)
|
||||
assert resolved["parameters"]["coords"] == "0.15"
|
||||
|
||||
def test_resolve_missing_var_leaves_template_intact(self):
|
||||
"""Missing variable: template string stays unchanged."""
|
||||
result = _resolve_runtime_vars_in_str(
|
||||
"{{navigate_password_coords.x_pct}}", self.VARIABLES
|
||||
)
|
||||
assert "{{navigate_password_coords.x_pct}}" in result
|
||||
|
||||
def test_resolve_float_passthrough(self):
|
||||
"""_resolve_runtime_vars returns non-str values unchanged — floats pass through."""
|
||||
action = {"x_pct": 0.15, "y_pct": 0.07}
|
||||
resolved = _resolve_runtime_vars(action, self.VARIABLES)
|
||||
# Floats are NOT substituted — they're not strings containing {{...}}
|
||||
assert resolved["x_pct"] == 0.15 # literal float, unchanged
|
||||
assert resolved["y_pct"] == 0.07
|
||||
|
||||
|
||||
# ── Test 2: compiler gap — literals not templates ────────────────────────
|
||||
|
||||
|
||||
class TestCompilerGapLiteralFloats:
|
||||
"""Document that _edge_to_normalized_actions produces literal floats,
|
||||
never template strings — so navigate coords are write-only.
|
||||
|
||||
This is the STRUCTURAL GAP: the compiler bakes coords as floats,
|
||||
_resolve_runtime_vars only operates on strings, so stored navigate
|
||||
variables are never consumed downstream.
|
||||
"""
|
||||
|
||||
def test_mouse_click_produces_literal_floats(self):
|
||||
"""mouse_click edge: x_pct/y_pct are literal floats, not templates."""
|
||||
target = SimpleNamespace(
|
||||
by_position=(0.15, 0.07),
|
||||
by_role=None,
|
||||
by_text=None,
|
||||
context_hints={},
|
||||
)
|
||||
edge = _FakeEdge(
|
||||
_FakeAction("mouse_click", target=target, parameters={"button": "left"})
|
||||
)
|
||||
actions = _edge_to_normalized_actions(edge, params={})
|
||||
assert len(actions) == 1
|
||||
action = actions[0]
|
||||
|
||||
# GAP: coords are literal floats, not template strings
|
||||
assert isinstance(action["x_pct"], float)
|
||||
assert isinstance(action["y_pct"], float)
|
||||
assert action["x_pct"] == 0.15
|
||||
assert action["y_pct"] == 0.07
|
||||
|
||||
# Proof: no template string is ever produced by the compiler
|
||||
assert not isinstance(action["x_pct"], str)
|
||||
assert not isinstance(action["y_pct"], str)
|
||||
|
||||
def test_literal_floats_not_resolved(self):
|
||||
"""Literal floats pass through _resolve_runtime_vars unchanged —
|
||||
proving navigate coords stored in variables are NEVER consumed."""
|
||||
target = SimpleNamespace(
|
||||
by_position=(0.15, 0.07),
|
||||
by_role=None,
|
||||
by_text=None,
|
||||
context_hints={},
|
||||
)
|
||||
edge = _FakeEdge(
|
||||
_FakeAction("mouse_click", target=target, parameters={"button": "left"})
|
||||
)
|
||||
actions = _edge_to_normalized_actions(edge, params={})
|
||||
action = actions[0]
|
||||
|
||||
# Simulate variables from a prior navigate_login step
|
||||
different_coords = {
|
||||
"navigate_login_coords": {"x_pct": 0.20, "y_pct": 0.10}
|
||||
}
|
||||
resolved = _resolve_runtime_vars(action, different_coords)
|
||||
|
||||
# Coords REMAIN the original literal floats — no substitution
|
||||
assert resolved["x_pct"] == 0.15 # NOT 0.20 (no substitution)
|
||||
assert resolved["y_pct"] == 0.07 # NOT 0.10 (no substitution)
|
||||
|
||||
def test_text_input_produces_literal_floats(self):
|
||||
"""text_input edge: same literal float pattern for click target."""
|
||||
target = SimpleNamespace(
|
||||
by_position=(0.30, 0.50),
|
||||
by_role=None,
|
||||
by_text=None,
|
||||
context_hints={},
|
||||
)
|
||||
edge = _FakeEdge(
|
||||
_FakeAction("text_input", target=target, parameters={"text": "admin"})
|
||||
)
|
||||
actions = _edge_to_normalized_actions(edge, params={})
|
||||
assert len(actions) == 1
|
||||
action = actions[0]
|
||||
|
||||
assert isinstance(action["x_pct"], float)
|
||||
assert isinstance(action["y_pct"], float)
|
||||
assert action["x_pct"] == 0.30
|
||||
assert action["y_pct"] == 0.50
|
||||
|
||||
def test_navigate_action_type_unknown(self):
|
||||
"""navigate action type is NOT handled by _edge_to_normalized_actions —
|
||||
falls into the else branch logging "Type d'action inconnu"."""
|
||||
edge = _FakeEdge(_FakeAction("navigate", parameters={"target": "login"}))
|
||||
actions = _edge_to_normalized_actions(edge, params={})
|
||||
|
||||
# navigate produces empty actions — not compiled at all
|
||||
assert actions == []
|
||||
Reference in New Issue
Block a user