163 lines
5.0 KiB
Python
163 lines
5.0 KiB
Python
import importlib
|
|
import sys
|
|
import types
|
|
|
|
|
|
def _install_fake_pynput(monkeypatch):
    """Register a stand-in ``pynput`` package in ``sys.modules``.

    Builds fake ``pynput``, ``pynput.mouse`` and ``pynput.keyboard`` modules
    exposing ``Key``, ``KeyCode``, ``Button`` and ``Listener`` attributes,
    installs them through *monkeypatch*, and evicts any cached captor module
    so that the next import of it binds to the fakes.

    Args:
        monkeypatch: pytest ``monkeypatch`` fixture; only ``setitem`` is used.

    Returns:
        The ``(FakeKey, FakeKeyCode)`` classes installed as
        ``pynput.keyboard.Key`` and ``pynput.keyboard.KeyCode``.
    """

    class FakeKey:
        """Minimal special-key object identified by its ``name``."""

        def __init__(self, name):
            self.name = name

        def __repr__(self):
            return f"Key.{self.name}"

    # Expose each special key as a class attribute (``FakeKey.cmd`` etc.).
    for name in (
        "ctrl",
        "ctrl_l",
        "ctrl_r",
        "alt",
        "alt_l",
        "alt_r",
        "shift",
        "shift_l",
        "shift_r",
        "cmd",
        "cmd_l",
        "cmd_r",
        "esc",
        "enter",
        "tab",
        "space",
        "backspace",
    ):
        setattr(FakeKey, name, FakeKey(name))

    class FakeKeyCode:
        """Minimal key-code object carrying ``char`` and ``vk``."""

        def __init__(self, char=None, vk=None):
            self.char = char
            self.vk = vk

    class FakeButton:
        """Empty placeholder installed as ``pynput.mouse.Button``."""

    pynput = types.ModuleType("pynput")
    mouse = types.ModuleType("pynput.mouse")
    keyboard = types.ModuleType("pynput.keyboard")

    mouse.Button = FakeButton
    mouse.Listener = object
    keyboard.Key = FakeKey
    keyboard.KeyCode = FakeKeyCode
    keyboard.Listener = object
    pynput.mouse = mouse
    pynput.keyboard = keyboard

    monkeypatch.setitem(sys.modules, "pynput", pynput)
    monkeypatch.setitem(sys.modules, "pynput.mouse", mouse)
    monkeypatch.setitem(sys.modules, "pynput.keyboard", keyboard)

    captor_name = "agent_v0.agent_v1.core.captor"
    # Record the captor entry with monkeypatch *before* evicting it: teardown
    # then restores a previously imported module, or removes the one imported
    # against the fakes, instead of leaking it into later tests.
    monkeypatch.setitem(sys.modules, captor_name, types.ModuleType(captor_name))
    sys.modules.pop(captor_name, None)
    return FakeKey, FakeKeyCode
|
|
|
|
|
|
def _load_captor(monkeypatch):
    """Import the captor module against the fake pynput.

    Returns:
        ``(captor_module, fake_key, fake_key_code)``.
    """
    fakes = _install_fake_pynput(monkeypatch)
    captor_module = importlib.import_module("agent_v0.agent_v1.core.captor")
    return (captor_module, *fakes)
|
|
|
|
|
|
def test_standalone_windows_key_is_emitted_on_release(monkeypatch):
    """A lone ``cmd`` press emits nothing; its release emits one ["win"] combo."""
    module, fake_key, _unused_key_code = _load_captor(monkeypatch)
    emitted = []
    captor = module.EventCaptorV1(emitted.append)

    def _skip_metadata(_event):
        return None

    # Replace screen-metadata injection with a no-op for this test.
    captor._inject_screen_metadata = _skip_metadata

    captor._on_press(fake_key.cmd)
    assert emitted == []

    captor._on_release(fake_key.cmd)

    combos = [item["keys"] for item in emitted]
    assert combos == [["win"]]
    raw_actions = [entry["action"] for entry in emitted[0]["raw_keys"]]
    assert raw_actions == ["press", "release"]
    assert "win" not in captor.modifiers
|
|
|
|
|
|
def test_windows_shortcut_cancels_standalone_windows_key(monkeypatch):
    """Win+S pressed and released yields a single ["win", "s"] combo."""
    module, fake_key, fake_key_code = _load_captor(monkeypatch)
    emitted = []
    captor = module.EventCaptorV1(emitted.append)

    def _skip_metadata(_event):
        return None

    # Replace screen-metadata injection with a no-op for this test.
    captor._inject_screen_metadata = _skip_metadata

    def s_key():
        # A fresh key-code object per callback, as separate events would carry.
        return fake_key_code(char="s", vk=83)

    captor._on_press(fake_key.cmd)
    captor._on_press(s_key())
    captor._on_release(s_key())
    captor._on_release(fake_key.cmd)

    combos = [item["keys"] for item in emitted]
    assert combos == [["win", "s"]]
|
|
|
|
|
|
def test_release_only_windows_shortcut_is_inferred(monkeypatch):
    """Releases of ``s`` then ``cmd`` with no presses still yield ["win", "s"]."""
    module, fake_key, fake_key_code = _load_captor(monkeypatch)
    emitted = []
    captor = module.EventCaptorV1(emitted.append)

    def _skip_metadata(_event):
        return None

    # Replace screen-metadata injection with a no-op for this test.
    captor._inject_screen_metadata = _skip_metadata

    # Windows/NoMachine can swallow the press events for Win+S, delivering
    # only release('s') followed by release('cmd').
    s_release = fake_key_code(char="s", vk=83)
    captor._on_release(s_release)
    captor._on_release(fake_key.cmd)

    combos = [item["keys"] for item in emitted]
    assert combos == [["win", "s"]]
    raw_actions = [entry["action"] for entry in emitted[0]["raw_keys"]]
    assert raw_actions == ["release", "release"]
|
|
|
|
|
|
def test_escape_key_is_emitted_as_key_combo(monkeypatch):
    """Pressing ``esc`` emits a single ["escape"] combo."""
    module, fake_key, _unused_key_code = _load_captor(monkeypatch)
    emitted = []
    captor = module.EventCaptorV1(emitted.append)

    def _skip_metadata(_event):
        return None

    # Replace screen-metadata injection with a no-op for this test.
    captor._inject_screen_metadata = _skip_metadata

    captor._on_press(fake_key.esc)

    combos = [item["keys"] for item in emitted]
    assert combos == [["escape"]]
|
|
|
|
|
|
def test_stream_processor_keeps_win_but_filters_other_modifiers():
    """Lone ctrl/shift combos are filtered; a lone ``win`` combo is kept.

    Also checks the minimum post-wait returned for ``win``, ``win+s`` and
    ``escape`` combos.
    """
    from agent_v0.server_v1.stream_processor import (
        _is_parasitic_event,
        _needs_post_wait,
        clean_compound_steps,
        clean_enriched_actions,
    )

    def combo(*keys):
        # Build a fresh event dict per call so no input is shared between calls.
        return {"type": "key_combo", "keys": list(keys)}

    assert _is_parasitic_event(combo("ctrl")) is True
    assert _is_parasitic_event(combo("win")) is False

    enriched = clean_enriched_actions([combo("ctrl"), combo("win")])
    assert enriched == [combo("win")]

    compound = clean_compound_steps([combo("shift"), combo("win")])
    assert compound == [combo("win")]

    assert _needs_post_wait(combo("win")) >= 1500
    assert _needs_post_wait(combo("win", "s")) >= 1500
    assert _needs_post_wait(combo("escape")) >= 500
|
|
|
|
|
|
def test_streamer_prioritizes_real_captor_event_types():
    """key_combo, text_input and mouse_click events are priority; heartbeat is not."""
    from agent_v0.agent_v1.network.streamer import TraceStreamer

    streamer = TraceStreamer("sess_keyboard_priority")

    expectations = (
        ("key_combo", True),
        ("text_input", True),
        ("mouse_click", True),
        ("heartbeat", False),
    )
    for event_type, is_priority in expectations:
        verdict = streamer._is_priority_item("event", {"type": event_type})
        assert verdict is is_priority
|