519 lines
21 KiB
Python
519 lines
21 KiB
Python
import subprocess
|
|
import platform
|
|
import pyautogui
|
|
import asyncio
|
|
import base64
|
|
import os
|
|
import time
|
|
if platform.system() == "Darwin":
|
|
import Quartz # uncomment this line if you are on macOS
|
|
from enum import StrEnum
|
|
from pathlib import Path
|
|
from typing import Literal, TypedDict
|
|
from uuid import uuid4
|
|
from screeninfo import get_monitors
|
|
|
|
from PIL import ImageGrab, Image
|
|
from functools import partial
|
|
|
|
from anthropic.types.beta import BetaToolComputerUse20241022Param
|
|
|
|
from .base import BaseAnthropicTool, ToolError, ToolResult
|
|
from .run import run
|
|
|
|
OUTPUT_DIR = "./tmp/outputs"
|
|
|
|
TYPING_DELAY_MS = 12
|
|
TYPING_GROUP_SIZE = 50
|
|
|
|
Action = Literal[
|
|
"key",
|
|
"type",
|
|
"mouse_move",
|
|
"left_click",
|
|
"left_click_drag",
|
|
"right_click",
|
|
"middle_click",
|
|
"double_click",
|
|
"screenshot",
|
|
"cursor_position",
|
|
]
|
|
|
|
|
|
class Resolution(TypedDict):
|
|
width: int
|
|
height: int
|
|
|
|
|
|
MAX_SCALING_TARGETS: dict[str, Resolution] = {
|
|
"XGA": Resolution(width=1024, height=768), # 4:3
|
|
"WXGA": Resolution(width=1280, height=800), # 16:10
|
|
"FWXGA": Resolution(width=1366, height=768), # ~16:9
|
|
}
|
|
|
|
|
|
class ScalingSource(StrEnum):
|
|
COMPUTER = "computer"
|
|
API = "api"
|
|
|
|
|
|
class ComputerToolOptions(TypedDict):
|
|
display_height_px: int
|
|
display_width_px: int
|
|
display_number: int | None
|
|
|
|
|
|
def chunks(s: str, chunk_size: int) -> list[str]:
|
|
return [s[i : i + chunk_size] for i in range(0, len(s), chunk_size)]
|
|
|
|
|
|
def get_screen_details():
|
|
screens = get_monitors()
|
|
screen_details = []
|
|
|
|
# Sort screens by x position to arrange from left to right
|
|
sorted_screens = sorted(screens, key=lambda s: s.x)
|
|
|
|
# Loop through sorted screens and assign positions
|
|
primary_index = 0
|
|
for i, screen in enumerate(sorted_screens):
|
|
if i == 0:
|
|
layout = "Left"
|
|
elif i == len(sorted_screens) - 1:
|
|
layout = "Right"
|
|
else:
|
|
layout = "Center"
|
|
|
|
if screen.is_primary:
|
|
position = "Primary"
|
|
primary_index = i
|
|
else:
|
|
position = "Secondary"
|
|
screen_info = f"Screen {i + 1}: {screen.width}x{screen.height}, {layout}, {position}"
|
|
screen_details.append(screen_info)
|
|
|
|
return screen_details, primary_index
|
|
|
|
|
|
class ComputerTool(BaseAnthropicTool):
|
|
"""
|
|
A tool that allows the agent to interact with the screen, keyboard, and mouse of the current computer.
|
|
Adapted for Windows using 'pyautogui'.
|
|
"""
|
|
|
|
name: Literal["computer"] = "computer"
|
|
api_type: Literal["computer_20241022"] = "computer_20241022"
|
|
width: int
|
|
height: int
|
|
display_num: int | None
|
|
|
|
_screenshot_delay = 2.0
|
|
_scaling_enabled = True
|
|
|
|
@property
|
|
def options(self) -> ComputerToolOptions:
|
|
width, height = self.scale_coordinates(
|
|
ScalingSource.COMPUTER, self.width, self.height
|
|
)
|
|
return {
|
|
"display_width_px": width,
|
|
"display_height_px": height,
|
|
"display_number": self.display_num,
|
|
}
|
|
|
|
def to_params(self) -> BetaToolComputerUse20241022Param:
|
|
return {"name": self.name, "type": self.api_type, **self.options}
|
|
|
|
def __init__(self, selected_screen: int = 0, is_scaling: bool = False):
|
|
super().__init__()
|
|
|
|
# Get screen width and height using Windows command
|
|
self.display_num = None
|
|
self.offset_x = 0
|
|
self.offset_y = 0
|
|
self.selected_screen = selected_screen
|
|
self.is_scaling = is_scaling
|
|
self.width, self.height = self.get_screen_size()
|
|
|
|
# Path to cliclick
|
|
self.cliclick = "cliclick"
|
|
self.key_conversion = {"Page_Down": "pagedown",
|
|
"Page_Up": "pageup",
|
|
"Super_L": "win",
|
|
"Escape": "esc"}
|
|
|
|
system = platform.system() # Detect platform
|
|
if system == "Windows":
|
|
screens = get_monitors()
|
|
sorted_screens = sorted(screens, key=lambda s: s.x)
|
|
if self.selected_screen < 0 or self.selected_screen >= len(screens):
|
|
raise IndexError("Invalid screen index.")
|
|
screen = sorted_screens[self.selected_screen]
|
|
bbox = (screen.x, screen.y, screen.x + screen.width, screen.y + screen.height)
|
|
|
|
elif system == "Darwin": # macOS
|
|
max_displays = 32 # Maximum number of displays to handle
|
|
active_displays = Quartz.CGGetActiveDisplayList(max_displays, None, None)[1]
|
|
screens = []
|
|
for display_id in active_displays:
|
|
bounds = Quartz.CGDisplayBounds(display_id)
|
|
screens.append({
|
|
'id': display_id, 'x': int(bounds.origin.x), 'y': int(bounds.origin.y),
|
|
'width': int(bounds.size.width), 'height': int(bounds.size.height),
|
|
'is_primary': Quartz.CGDisplayIsMain(display_id) # Check if this is the primary display
|
|
})
|
|
sorted_screens = sorted(screens, key=lambda s: s['x'])
|
|
if self.selected_screen < 0 or self.selected_screen >= len(screens):
|
|
raise IndexError("Invalid screen index.")
|
|
screen = sorted_screens[self.selected_screen]
|
|
bbox = (screen['x'], screen['y'], screen['x'] + screen['width'], screen['y'] + screen['height'])
|
|
else: # Linux or other OS
|
|
cmd = "xrandr | grep ' primary' | awk '{print $4}'"
|
|
try:
|
|
# output = subprocess.check_output(cmd, shell=True).decode()
|
|
# resolution = output.strip().split()[0]
|
|
# width, height = map(int, resolution.split('x'))
|
|
# bbox = (0, 0, width, height) # Assuming single primary screen for simplicity
|
|
screen = get_monitors()[0]
|
|
bbox = (screen.x, screen.y, screen.x + screen.width, screen.y + screen.height)
|
|
except subprocess.CalledProcessError:
|
|
raise RuntimeError("Failed to get screen resolution on Linux.")
|
|
|
|
self.offset_x = screen['x'] if system == "Darwin" else screen.x
|
|
self.offset_y = screen['y'] if system == "Darwin" else screen.y
|
|
self.bbox = bbox
|
|
|
|
|
|
async def __call__(
|
|
self,
|
|
*,
|
|
action: Action,
|
|
text: str | None = None,
|
|
coordinate: tuple[int, int] | None = None,
|
|
**kwargs,
|
|
):
|
|
print(f"action: {action}, text: {text}, coordinate: {coordinate}, is_scaling: {self.is_scaling}")
|
|
if action in ("mouse_move", "left_click_drag"):
|
|
if coordinate is None:
|
|
raise ToolError(f"coordinate is required for {action}")
|
|
if text is not None:
|
|
raise ToolError(f"text is not accepted for {action}")
|
|
if not isinstance(coordinate, (list, tuple)) or len(coordinate) != 2:
|
|
raise ToolError(f"{coordinate} must be a tuple of length 2")
|
|
# if not all(isinstance(i, int) and i >= 0 for i in coordinate):
|
|
if not all(isinstance(i, int) for i in coordinate):
|
|
raise ToolError(f"{coordinate} must be a tuple of non-negative ints")
|
|
|
|
if self.is_scaling:
|
|
x, y = self.scale_coordinates(
|
|
ScalingSource.API, coordinate[0], coordinate[1]
|
|
)
|
|
else:
|
|
x, y = coordinate
|
|
|
|
# print(f"scaled_coordinates: {x}, {y}")
|
|
# print(f"offset: {self.offset_x}, {self.offset_y}")
|
|
|
|
# x += self.offset_x # TODO - check if this is needed
|
|
# y += self.offset_y
|
|
|
|
print(f"mouse move to {x}, {y}")
|
|
|
|
if action == "mouse_move":
|
|
pyautogui.moveTo(x, y)
|
|
return ToolResult(output=f"Moved mouse to ({x}, {y})")
|
|
elif action == "left_click_drag":
|
|
current_x, current_y = pyautogui.position()
|
|
pyautogui.dragTo(x, y, duration=0.5) # Adjust duration as needed
|
|
return ToolResult(output=f"Dragged mouse from ({current_x}, {current_y}) to ({x}, {y})")
|
|
|
|
if action in ("key", "type"):
|
|
if text is None:
|
|
raise ToolError(f"text is required for {action}")
|
|
if coordinate is not None:
|
|
raise ToolError(f"coordinate is not accepted for {action}")
|
|
if not isinstance(text, str):
|
|
raise ToolError(output=f"{text} must be a string")
|
|
|
|
if action == "key":
|
|
# Handle key combinations
|
|
keys = text.split('+')
|
|
for key in keys:
|
|
key = self.key_conversion.get(key.strip(), key.strip())
|
|
key = key.lower()
|
|
pyautogui.keyDown(key) # Press down each key
|
|
for key in reversed(keys):
|
|
key = self.key_conversion.get(key.strip(), key.strip())
|
|
key = key.lower()
|
|
pyautogui.keyUp(key) # Release each key in reverse order
|
|
return ToolResult(output=f"Pressed keys: {text}")
|
|
|
|
elif action == "type":
|
|
pyautogui.typewrite(text, interval=TYPING_DELAY_MS / 1000) # Convert ms to seconds
|
|
pyautogui.press('enter')
|
|
screenshot_base64 = (await self.screenshot()).base64_image
|
|
return ToolResult(output=text, base64_image=screenshot_base64)
|
|
|
|
if action in (
|
|
"left_click",
|
|
"right_click",
|
|
"double_click",
|
|
"middle_click",
|
|
"screenshot",
|
|
"cursor_position",
|
|
"left_press",
|
|
):
|
|
if text is not None:
|
|
raise ToolError(f"text is not accepted for {action}")
|
|
if coordinate is not None:
|
|
raise ToolError(f"coordinate is not accepted for {action}")
|
|
|
|
if action == "screenshot":
|
|
return await self.screenshot()
|
|
elif action == "cursor_position":
|
|
x, y = pyautogui.position()
|
|
x, y = self.scale_coordinates(ScalingSource.COMPUTER, x, y)
|
|
return ToolResult(output=f"X={x},Y={y}")
|
|
else:
|
|
if action == "left_click":
|
|
pyautogui.click()
|
|
elif action == "right_click":
|
|
pyautogui.rightClick()
|
|
elif action == "middle_click":
|
|
pyautogui.middleClick()
|
|
elif action == "double_click":
|
|
pyautogui.doubleClick()
|
|
elif action == "left_press":
|
|
pyautogui.mouseDown()
|
|
time.sleep(1)
|
|
pyautogui.mouseUp()
|
|
return ToolResult(output=f"Performed {action}")
|
|
|
|
raise ToolError(f"Invalid action: {action}")
|
|
|
|
async def screenshot(self):
|
|
|
|
import time
|
|
time.sleep(1)
|
|
|
|
"""Take a screenshot of the current screen and return a ToolResult with the base64 encoded image."""
|
|
output_dir = Path(OUTPUT_DIR)
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
path = output_dir / f"screenshot_{uuid4().hex}.png"
|
|
|
|
ImageGrab.grab = partial(ImageGrab.grab, all_screens=True)
|
|
|
|
# Detect platform
|
|
system = platform.system()
|
|
|
|
if system == "Windows":
|
|
# Windows: Use screeninfo to get monitor details
|
|
screens = get_monitors()
|
|
|
|
# Sort screens by x position to arrange from left to right
|
|
sorted_screens = sorted(screens, key=lambda s: s.x)
|
|
|
|
if self.selected_screen < 0 or self.selected_screen >= len(screens):
|
|
raise IndexError("Invalid screen index.")
|
|
|
|
screen = sorted_screens[self.selected_screen]
|
|
bbox = (screen.x, screen.y, screen.x + screen.width, screen.y + screen.height)
|
|
|
|
elif system == "Darwin": # macOS
|
|
# macOS: Use Quartz to get monitor details
|
|
max_displays = 32 # Maximum number of displays to handle
|
|
active_displays = Quartz.CGGetActiveDisplayList(max_displays, None, None)[1]
|
|
|
|
# Get the display bounds (resolution) for each active display
|
|
screens = []
|
|
for display_id in active_displays:
|
|
bounds = Quartz.CGDisplayBounds(display_id)
|
|
screens.append({
|
|
'id': display_id,
|
|
'x': int(bounds.origin.x),
|
|
'y': int(bounds.origin.y),
|
|
'width': int(bounds.size.width),
|
|
'height': int(bounds.size.height),
|
|
'is_primary': Quartz.CGDisplayIsMain(display_id) # Check if this is the primary display
|
|
})
|
|
|
|
# Sort screens by x position to arrange from left to right
|
|
sorted_screens = sorted(screens, key=lambda s: s['x'])
|
|
|
|
if self.selected_screen < 0 or self.selected_screen >= len(screens):
|
|
raise IndexError("Invalid screen index.")
|
|
|
|
screen = sorted_screens[self.selected_screen]
|
|
bbox = (screen['x'], screen['y'], screen['x'] + screen['width'], screen['y'] + screen['height'])
|
|
|
|
else: # Linux or other OS
|
|
cmd = "xrandr | grep ' primary' | awk '{print $4}'"
|
|
try:
|
|
# output = subprocess.check_output(cmd, shell=True).decode()
|
|
# resolution = output.strip().split()[0]
|
|
# width, height = map(int, resolution.split('x'))
|
|
# bbox = (0, 0, width, height) # Assuming single primary screen for simplicity
|
|
screen = get_monitors()[0]
|
|
bbox = (screen.x, screen.y, screen.x + screen.width, screen.y + screen.height)
|
|
except subprocess.CalledProcessError:
|
|
raise RuntimeError("Failed to get screen resolution on Linux.")
|
|
|
|
# Take screenshot using the bounding box
|
|
screenshot = ImageGrab.grab(bbox=bbox)
|
|
|
|
# Set offsets (for potential future use)
|
|
self.offset_x = screen['x'] if system == "Darwin" else screen.x
|
|
self.offset_y = screen['y'] if system == "Darwin" else screen.y
|
|
|
|
print(f"target_dimension {self.target_dimension}")
|
|
|
|
if not hasattr(self, 'target_dimension'):
|
|
screenshot = self.padding_image(screenshot)
|
|
self.target_dimension = MAX_SCALING_TARGETS["WXGA"]
|
|
|
|
# Resize if target_dimensions are specified
|
|
print(f"offset is {self.offset_x}, {self.offset_y}")
|
|
print(f"target_dimension is {self.target_dimension}")
|
|
screenshot = screenshot.resize((self.target_dimension["width"], self.target_dimension["height"]))
|
|
|
|
# Save the screenshot
|
|
screenshot.save(str(path))
|
|
|
|
if path.exists():
|
|
# Return a ToolResult instance instead of a dictionary
|
|
return ToolResult(base64_image=base64.b64encode(path.read_bytes()).decode())
|
|
|
|
raise ToolError(f"Failed to take screenshot: {path} does not exist.")
|
|
|
|
def padding_image(self, screenshot):
|
|
"""Pad the screenshot to 16:10 aspect ratio, when the aspect ratio is not 16:10."""
|
|
_, height = screenshot.size
|
|
new_width = height * 16 // 10
|
|
|
|
padding_image = Image.new("RGB", (new_width, height), (255, 255, 255))
|
|
# padding to top left
|
|
padding_image.paste(screenshot, (0, 0))
|
|
return padding_image
|
|
|
|
async def shell(self, command: str, take_screenshot=True) -> ToolResult:
|
|
"""Run a shell command and return the output, error, and optionally a screenshot."""
|
|
_, stdout, stderr = await run(command)
|
|
base64_image = None
|
|
|
|
if take_screenshot:
|
|
# delay to let things settle before taking a screenshot
|
|
await asyncio.sleep(self._screenshot_delay)
|
|
base64_image = (await self.screenshot()).base64_image
|
|
|
|
return ToolResult(output=stdout, error=stderr, base64_image=base64_image)
|
|
|
|
def scale_coordinates(self, source: ScalingSource, x: int, y: int):
|
|
"""Scale coordinates to a target maximum resolution."""
|
|
if not self._scaling_enabled:
|
|
return x, y
|
|
ratio = self.width / self.height
|
|
target_dimension = None
|
|
|
|
for target_name, dimension in MAX_SCALING_TARGETS.items():
|
|
# allow some error in the aspect ratio - not ratios are exactly 16:9
|
|
if abs(dimension["width"] / dimension["height"] - ratio) < 0.02:
|
|
if dimension["width"] < self.width:
|
|
target_dimension = dimension
|
|
self.target_dimension = target_dimension
|
|
# print(f"target_dimension: {target_dimension}")
|
|
break
|
|
|
|
if target_dimension is None:
|
|
# TODO: currently we force the target to be WXGA (16:10), when it cannot find a match
|
|
target_dimension = MAX_SCALING_TARGETS["WXGA"]
|
|
self.target_dimension = MAX_SCALING_TARGETS["WXGA"]
|
|
|
|
# should be less than 1
|
|
x_scaling_factor = target_dimension["width"] / self.width
|
|
y_scaling_factor = target_dimension["height"] / self.height
|
|
if source == ScalingSource.API:
|
|
if x > self.width or y > self.height:
|
|
raise ToolError(f"Coordinates {x}, {y} are out of bounds")
|
|
# scale up
|
|
return round(x / x_scaling_factor), round(y / y_scaling_factor)
|
|
# scale down
|
|
return round(x * x_scaling_factor), round(y * y_scaling_factor)
|
|
|
|
def get_screen_size(self):
|
|
if platform.system() == "Windows":
|
|
# Use screeninfo to get primary monitor on Windows
|
|
screens = get_monitors()
|
|
|
|
# Sort screens by x position to arrange from left to right
|
|
sorted_screens = sorted(screens, key=lambda s: s.x)
|
|
|
|
if self.selected_screen is None:
|
|
primary_monitor = next((m for m in get_monitors() if m.is_primary), None)
|
|
return primary_monitor.width, primary_monitor.height
|
|
elif self.selected_screen < 0 or self.selected_screen >= len(screens):
|
|
raise IndexError("Invalid screen index.")
|
|
else:
|
|
screen = sorted_screens[self.selected_screen]
|
|
return screen.width, screen.height
|
|
|
|
elif platform.system() == "Darwin":
|
|
# macOS part using Quartz to get screen information
|
|
max_displays = 32 # Maximum number of displays to handle
|
|
active_displays = Quartz.CGGetActiveDisplayList(max_displays, None, None)[1]
|
|
|
|
# Get the display bounds (resolution) for each active display
|
|
screens = []
|
|
for display_id in active_displays:
|
|
bounds = Quartz.CGDisplayBounds(display_id)
|
|
screens.append({
|
|
'id': display_id,
|
|
'x': int(bounds.origin.x),
|
|
'y': int(bounds.origin.y),
|
|
'width': int(bounds.size.width),
|
|
'height': int(bounds.size.height),
|
|
'is_primary': Quartz.CGDisplayIsMain(display_id) # Check if this is the primary display
|
|
})
|
|
|
|
# Sort screens by x position to arrange from left to right
|
|
sorted_screens = sorted(screens, key=lambda s: s['x'])
|
|
|
|
if self.selected_screen is None:
|
|
# Find the primary monitor
|
|
primary_monitor = next((screen for screen in screens if screen['is_primary']), None)
|
|
if primary_monitor:
|
|
return primary_monitor['width'], primary_monitor['height']
|
|
else:
|
|
raise RuntimeError("No primary monitor found.")
|
|
elif self.selected_screen < 0 or self.selected_screen >= len(screens):
|
|
raise IndexError("Invalid screen index.")
|
|
else:
|
|
# Return the resolution of the selected screen
|
|
screen = sorted_screens[self.selected_screen]
|
|
return screen['width'], screen['height']
|
|
|
|
else: # Linux or other OS
|
|
cmd = "xrandr | grep ' primary' | awk '{print $4}'"
|
|
try:
|
|
# output = subprocess.check_output(cmd, shell=True).decode()
|
|
# resolution = output.strip().split()[0]
|
|
# width, height = map(int, resolution.split('x'))
|
|
# return width, height
|
|
screen = get_monitors()[0]
|
|
return screen.width, screen.height
|
|
except subprocess.CalledProcessError:
|
|
raise RuntimeError("Failed to get screen resolution on Linux.")
|
|
|
|
def get_mouse_position(self):
|
|
# TODO: enhance this func
|
|
from AppKit import NSEvent
|
|
from Quartz import CGEventSourceCreate, kCGEventSourceStateCombinedSessionState
|
|
|
|
loc = NSEvent.mouseLocation()
|
|
# Adjust for different coordinate system
|
|
return int(loc.x), int(self.height - loc.y)
|
|
|
|
def map_keys(self, text: str):
|
|
"""Map text to cliclick key codes if necessary."""
|
|
# For simplicity, return text as is
|
|
# Implement mapping if special keys are needed
|
|
return text |