diff --git a/.gitignore b/.gitignore
index 2ad14a7..02eae0d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,3 +6,7 @@ weights/icon_detect_v1_5_2/
.gradio
__pycache__/
debug.ipynb
+util/__pycache__/
+index.html?linkid=2289031
+wget-log
+weights/omniv2/
\ No newline at end of file
diff --git a/Dockerfile b/Dockerfile
deleted file mode 100644
index 5737680..0000000
--- a/Dockerfile
+++ /dev/null
@@ -1,201 +0,0 @@
-# Dockerfile for OmniParser with GPU support and OpenGL libraries
-#
-# This Dockerfile is intended to create an environment with NVIDIA CUDA
-# support and the necessary dependencies to run the OmniParser project.
-# The configuration is designed to support applications that rely on
-# Python 3.12, OpenCV, Hugging Face transformers, and Gradio. Additionally,
-# it includes steps to pull large files from Git LFS and a script to
-# convert model weights from .safetensor to .pt format. The container
-# runs a Gradio server by default, exposed on port 7861.
-#
-# Base image: nvidia/cuda:12.3.1-devel-ubuntu22.04
-#
-# Key features:
-# - System dependencies for OpenGL to support graphical libraries.
-# - Miniconda for Python 3.12, allowing for environment management.
-# - Git Large File Storage (LFS) setup for handling large model files.
-# - Requirement file installation, including specific versions of
-# OpenCV and Hugging Face Hub.
-# - Entrypoint script execution with Gradio server configuration for
-# external access.
-
-FROM nvidia/cuda:12.3.1-devel-ubuntu22.04
-
-# Install system dependencies with explicit OpenGL libraries
-RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y \
- git \
- git-lfs \
- wget \
- libgl1 \
- libglib2.0-0 \
- libsm6 \
- libxext6 \
- libxrender1 \
- libglu1-mesa \
- libglib2.0-0 \
- libsm6 \
- libxrender1 \
- libxext6 \
- python3-opencv \
- && apt-get clean \
- && rm -rf /var/lib/apt/lists/* \
- && git lfs install
-
-# Install Miniconda for Python 3.12
-RUN wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -O miniconda.sh && \
- bash miniconda.sh -b -p /opt/conda && \
- rm miniconda.sh
-ENV PATH="/opt/conda/bin:$PATH"
-
-# Create and activate Conda environment with Python 3.12, and set it as the default
-RUN conda create -n omni python=3.12 && \
- echo "source activate omni" > ~/.bashrc
-ENV CONDA_DEFAULT_ENV=omni
-ENV PATH="/opt/conda/envs/omni/bin:$PATH"
-
-# Set the working directory in the container
-WORKDIR /usr/src/app
-
-# Copy project files and requirements
-COPY . .
-COPY requirements.txt /usr/src/app/requirements.txt
-
-# Initialize Git LFS and pull LFS files
-RUN git lfs install && \
- git lfs pull
-
-# Install dependencies from requirements.txt with specific opencv-python-headless version
-RUN . /opt/conda/etc/profile.d/conda.sh && conda activate omni && \
- pip uninstall -y opencv-python opencv-python-headless && \
- pip install --no-cache-dir opencv-python-headless==4.8.1.78 && \
- pip install -r requirements.txt && \
- pip install huggingface_hub
-
-# Run download.py to fetch model weights and convert safetensors to .pt format
-# RUN . /opt/conda/etc/profile.d/conda.sh && conda activate omni && \
-# python download.py && \
-# echo "Contents of weights directory:" && \
-# ls -lR weights && \
-# python weights/convert_safetensor_to_pt.py
-
-# Expose the default Gradio port
-EXPOSE 7861
-
-# Configure Gradio to be accessible externally
-ENV GRADIO_SERVER_NAME="0.0.0.0"
-
-# Copy and set permissions for entrypoint script
-# COPY entrypoint.sh /usr/src/app/entrypoint.sh
-# RUN chmod +x /usr/src/app/entrypoint.sh
-
-# To debug, keep the container running
-# CMD ["tail", "-f", "/dev/null"]
-
-################################################################################################
-# virtual display related setup --> from anthropic-quickstarts/computer-use-demo/Dockerfile
-
-ENV DEBIAN_FRONTEND=noninteractive
-ENV DEBIAN_PRIORITY=high
-
-RUN apt-get update && \
- apt-get -y upgrade && \
- apt-get -y install \
- # UI Requirements
- xvfb \
- xterm \
- xdotool \
- scrot \
- imagemagick \
- sudo \
- mutter \
- x11vnc \
- # Python/pyenv reqs
- build-essential \
- libssl-dev \
- zlib1g-dev \
- libbz2-dev \
- libreadline-dev \
- libsqlite3-dev \
- curl \
- git \
- libncursesw5-dev \
- xz-utils \
- tk-dev \
- libxml2-dev \
- libxmlsec1-dev \
- libffi-dev \
- liblzma-dev \
- # Network tools
- net-tools \
- netcat \
- # PPA req
- software-properties-common && \
- # Userland apps
- sudo add-apt-repository ppa:mozillateam/ppa && \
- sudo apt-get install -y --no-install-recommends \
- libreoffice \
- firefox-esr \
- x11-apps \
- xpdf \
- gedit \
- xpaint \
- tint2 \
- galculator \
- pcmanfm \
- unzip && \
- apt-get clean
-
-# Install noVNC
-RUN git clone --branch v1.5.0 https://github.com/novnc/noVNC.git /opt/noVNC && \
- git clone --branch v0.12.0 https://github.com/novnc/websockify /opt/noVNC/utils/websockify && \
- ln -s /opt/noVNC/vnc.html /opt/noVNC/index.html
-
-# setup user
-ENV USERNAME=computeruse
-ENV HOME=/home/$USERNAME
-RUN useradd -m -s /bin/bash -d $HOME $USERNAME
-RUN echo "${USERNAME} ALL=(ALL) NOPASSWD: ALL" >> /etc/sudoers
-USER computeruse
-WORKDIR $HOME
-
-# setup python
-RUN git clone https://github.com/pyenv/pyenv.git ~/.pyenv && \
- cd ~/.pyenv && src/configure && make -C src && cd .. && \
- echo 'export PYENV_ROOT="$HOME/.pyenv"' >> ~/.bashrc && \
- echo 'command -v pyenv >/dev/null || export PATH="$PYENV_ROOT/bin:$PATH"' >> ~/.bashrc && \
- echo 'eval "$(pyenv init -)"' >> ~/.bashrc
-ENV PYENV_ROOT="$HOME/.pyenv"
-ENV PATH="$PYENV_ROOT/bin:$PATH"
-ENV PYENV_VERSION_MAJOR=3
-ENV PYENV_VERSION_MINOR=11
-ENV PYENV_VERSION_PATCH=6
-ENV PYENV_VERSION=$PYENV_VERSION_MAJOR.$PYENV_VERSION_MINOR.$PYENV_VERSION_PATCH
-RUN eval "$(pyenv init -)" && \
- pyenv install $PYENV_VERSION && \
- pyenv global $PYENV_VERSION && \
- pyenv rehash
-
-ENV PATH="$HOME/.pyenv/shims:$HOME/.pyenv/bin:$PATH"
-
-RUN python -m pip install --upgrade pip==23.1.2 setuptools==58.0.4 wheel==0.40.0 && \
- python -m pip config set global.disable-pip-version-check true
-
-# only reinstall if requirements.txt changes
-# COPY --chown=$USERNAME:$USERNAME computer_use_demo/requirements.txt $HOME/computer_use_demo/requirements.txt
-# RUN python -m pip install -r $HOME/computer_use_demo/requirements.txt
-
-# setup desktop env & app
-# COPY --chown=$USERNAME:$USERNAME image/ $HOME
-# COPY --chown=$USERNAME:$USERNAME computer_use_demo/ $HOME/computer_use_demo/
-
-ARG DISPLAY_NUM=1
-ARG HEIGHT=768
-ARG WIDTH=1024
-ENV DISPLAY_NUM=$DISPLAY_NUM
-ENV HEIGHT=$HEIGHT
-ENV WIDTH=$WIDTH
-
-# Set the entrypoint
-# ENTRYPOINT ["/usr/src/app/entrypoint.sh"]
-
-# docker build . -t omniparser-x-demo:local # manually build the docker image (optional)
diff --git a/demo.ipynb b/demo.ipynb
index 7489fbe..4be80a2 100644
--- a/demo.ipynb
+++ b/demo.ipynb
@@ -14,7 +14,7 @@
}
],
"source": [
- "from utils import get_som_labeled_img, check_ocr_box, get_caption_model_processor, get_yolo_model\n",
+ "from util.utils import get_som_labeled_img, check_ocr_box, get_caption_model_processor, get_yolo_model\n",
"import torch\n",
"from ultralytics import YOLO\n",
"from PIL import Image\n",
@@ -48,9 +48,9 @@
"source": [
"# two choices for caption model: fine-tuned blip2 or florence2\n",
"import importlib\n",
- "import utils\n",
- "importlib.reload(utils)\n",
- "from utils import get_som_labeled_img, check_ocr_box, get_caption_model_processor, get_yolo_model\n",
+ "import util.utils\n",
+ "importlib.reload(util.utils)\n",
+ "from util.utils import get_som_labeled_img, check_ocr_box, get_caption_model_processor, get_yolo_model\n",
"# caption_model_processor = get_caption_model_processor(model_name=\"blip2\", model_name_or_path=\"weights/icon_caption_blip2\", device=device)\n",
"caption_model_processor = get_caption_model_processor(model_name=\"florence2\", model_name_or_path=\"weights/icon_caption_florence\", device=device)\n",
"\n"
@@ -102,9 +102,9 @@
"source": [
"# reload utils\n",
"import importlib\n",
- "import utils\n",
- "importlib.reload(utils)\n",
- "from utils import get_som_labeled_img, check_ocr_box, get_caption_model_processor, get_yolo_model\n",
+ "import util.utils\n",
+ "importlib.reload(util.utils)\n",
+ "from util.utils import get_som_labeled_img, check_ocr_box, get_caption_model_processor, get_yolo_model\n",
"\n",
"image_path = 'imgs/google_page.png'\n",
"image_path = 'imgs/windows_home.png'\n",
@@ -167,9 +167,9 @@
"# run on cpu!!!\n",
"# reload utils\n",
"import importlib\n",
- "import utils\n",
- "importlib.reload(utils)\n",
- "from utils import get_som_labeled_img, check_ocr_box, get_caption_model_processor, get_yolo_model\n",
+ "import util.utils\n",
+ "importlib.reload(util.utils)\n",
+ "from util.utils import get_som_labeled_img, check_ocr_box, get_caption_model_processor, get_yolo_model\n",
"\n",
"image_path = 'imgs/google_page.png'\n",
"image_path = 'imgs/windows_home.png'\n",
@@ -447,13 +447,6 @@
"source": [
"parsed_content_list[-1]"
]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "metadata": {},
- "outputs": [],
- "source": []
}
],
"metadata": {
diff --git a/demo/gui_agent/llm_utils/llm_utils.py b/demo/gui_agent/llm_utils/llm_utils.py
deleted file mode 100644
index fbc4a42..0000000
--- a/demo/gui_agent/llm_utils/llm_utils.py
+++ /dev/null
@@ -1,109 +0,0 @@
-import os
-import re
-import ast
-import base64
-
-
-def is_image_path(text):
- # Checking if the input text ends with typical image file extensions
- image_extensions = (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif")
- if text.endswith(image_extensions):
- return True
- else:
- return False
-
-
-def encode_image(image_path):
- """Encode image file to base64."""
- with open(image_path, "rb") as image_file:
- return base64.b64encode(image_file.read()).decode("utf-8")
-
-
-def is_url_or_filepath(input_string):
- # Check if input_string is a URL
- url_pattern = re.compile(
- r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
- )
- if url_pattern.match(input_string):
- return "URL"
-
- # Check if input_string is a file path
- file_path = os.path.abspath(input_string)
- if os.path.exists(file_path):
- return "File path"
-
- return "Invalid"
-
-
-def extract_data(input_string, data_type):
- # Regular expression to extract content starting from '```python' until the end if there are no closing backticks
- pattern = f"```{data_type}" + r"(.*?)(```|$)"
- # Extract content
- # re.DOTALL allows '.' to match newlines as well
- matches = re.findall(pattern, input_string, re.DOTALL)
- # Return the first match if exists, trimming whitespace and ignoring potential closing backticks
- return matches[0][0].strip() if matches else input_string
-
-
-def parse_input(code):
- """Use AST to parse the input string and extract the function name, arguments, and keyword arguments."""
-
- def get_target_names(target):
- """Recursively get all variable names from the assignment target."""
- if isinstance(target, ast.Name):
- return [target.id]
- elif isinstance(target, ast.Tuple):
- names = []
- for elt in target.elts:
- names.extend(get_target_names(elt))
- return names
- return []
-
- def extract_value(node):
- """提取 AST 节点的实际值"""
- if isinstance(node, ast.Constant):
- return node.value
- elif isinstance(node, ast.Name):
- # TODO: a better way to handle variables
- raise ValueError(
- f"Arguments should be a Constant, got a variable {node.id} instead."
- )
- # 添加其他需要处理的 AST 节点类型
- return None
-
- try:
- tree = ast.parse(code)
- for node in ast.walk(tree):
- if isinstance(node, ast.Assign):
- targets = []
- for t in node.targets:
- targets.extend(get_target_names(t))
- if isinstance(node.value, ast.Call):
- func_name = node.value.func.id
- args = [ast.dump(arg) for arg in node.value.args]
- kwargs = {
- kw.arg: extract_value(kw.value) for kw in node.value.keywords
- }
- print(f"Input: {code.strip()}")
- print(f"Output Variables: {targets}")
- print(f"Function Name: {func_name}")
- print(f"Arguments: {args}")
- print(f"Keyword Arguments: {kwargs}")
- elif isinstance(node, ast.Expr) and isinstance(node.value, ast.Call):
- targets = []
- func_name = extract_value(node.value.func)
- args = [extract_value(arg) for arg in node.value.args]
- kwargs = {kw.arg: extract_value(kw.value) for kw in node.value.keywords}
-
- except SyntaxError:
- print(f"Input: {code.strip()}")
- print("No match found")
-
- return targets, func_name, args, kwargs
-
-
-if __name__ == "__main__":
- import json
- s='{"Reasoning": "The Docker icon has been successfully clicked, and the Docker application should now be opening. No further actions are required.", "Next Action": None}'
- json_str = json.loads(s)
- print(json_str)
\ No newline at end of file
diff --git a/demo/gui_agent/llm_utils/oai.py b/demo/gui_agent/llm_utils/oai.py
deleted file mode 100644
index ad1d54e..0000000
--- a/demo/gui_agent/llm_utils/oai.py
+++ /dev/null
@@ -1,117 +0,0 @@
-
-import os
-import logging
-import base64
-import requests
-
-# from computer_use_demo.gui_agent.llm_utils import is_image_path, encode_image
-
-def is_image_path(text):
- # Checking if the input text ends with typical image file extensions
- image_extensions = (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif")
- if text.endswith(image_extensions):
- return True
- else:
- return False
-
-def encode_image(image_path):
- """Encode image file to base64."""
- with open(image_path, "rb") as image_file:
- return base64.b64encode(image_file.read()).decode("utf-8")
-
-
-
-# from openai import OpenAI
-# client = OpenAI(
-# api_key=os.environ.get("OPENAI_API_KEY")
-# )
-
-
-def run_oai_interleaved(messages: list, system: str, llm: str, api_key: str, max_tokens=256, temperature=0):
-
- api_key = api_key or os.environ.get("OPENAI_API_KEY")
- if not api_key:
- raise ValueError("OPENAI_API_KEY is not set")
-
- headers = {"Content-Type": "application/json",
- "Authorization": f"Bearer {api_key}"}
-
- final_messages = [{"role": "system", "content": system}]
-
- # image_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
- if type(messages) == list:
- for item in messages:
- contents = []
- if isinstance(item, dict):
- for cnt in item["content"]:
- if isinstance(cnt, str):
- if is_image_path(cnt):
- base64_image = encode_image(cnt)
- content = {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
- else:
- content = {"type": "text", "text": cnt}
- else:
- # in this case it is a text block from anthropic
- content = {"type": "text", "text": str(cnt)}
-
- contents.append(content)
-
- message = {"role": 'user', "content": contents}
- else: # str
- contents.append({"type": "text", "text": item})
- message = {"role": "user", "content": contents}
-
- final_messages.append(message)
-
-
- elif isinstance(messages, str):
- final_messages = [{"role": "user", "content": messages}]
- # import pdb; pdb.set_trace()
-
- print("[oai] sending messages:", {"role": "user", "content": messages})
-
- payload = {
- "model": llm,
- "messages": final_messages,
- "max_tokens": max_tokens,
- "temperature": temperature,
- # "stop": stop,
- }
-
- # from IPython.core.debugger import Pdb; Pdb().set_trace()
-
- response = requests.post(
- "https://api.openai.com/v1/chat/completions", headers=headers, json=payload
- )
-
- try:
- text = response.json()['choices'][0]['message']['content']
- token_usage = int(response.json()['usage']['total_tokens'])
- return text, token_usage
-
- # return error message if the response is not successful
- except Exception as e:
- print(f"Error in interleaved openAI: {e}. This may due to your invalid OPENAI_API_KEY. Please check the response: {response.json()} ")
- return response.json()
-
-
-if __name__ == "__main__":
-
- api_key = os.environ.get("OPENAI_API_KEY")
- if not api_key:
- raise ValueError("OPENAI_API_KEY is not set")
-
- text, token_usage = run_oai_interleaved(
- messages= [{"content": [
- "What is in the screenshot?",
- "./tmp/outputs/screenshot_0b04acbb783d4706bc93873d17ba8c05.png"],
- "role": "user"
- }],
- llm="gpt-4o-mini",
- system="You are a helpful assistant",
- api_key=api_key,
- max_tokens=256,
- temperature=0)
-
- print(text, token_usage)
- # There is an introduction describing the Calyx... 36986
diff --git a/demo/gui_agent/llm_utils/qwen.py b/demo/gui_agent/llm_utils/qwen.py
deleted file mode 100644
index 2d3e6e7..0000000
--- a/demo/gui_agent/llm_utils/qwen.py
+++ /dev/null
@@ -1,107 +0,0 @@
-
-import os
-import logging
-import base64
-import requests
-
-import dashscope
-# from computer_use_demo.gui_agent.llm_utils import is_image_path, encode_image
-
-def is_image_path(text):
- return False
-
-def encode_image(image_path):
- return ""
-
-
-def run_qwen(messages: list, system: str, llm: str, api_key: str, max_tokens=256, temperature=0):
-
- api_key = api_key or os.environ.get("QWEN_API_KEY")
- if not api_key:
- raise ValueError("QWEN_API_KEY is not set")
-
- dashscope.api_key = api_key
-
- # from IPython.core.debugger import Pdb; Pdb().set_trace()
-
- final_messages = [{"role": "system", "content": [{"text": system}]}]
- # image_url = "https://upload.wikimedia.org/wikipedia/commons/thumb/d/dd/Gfp-wisconsin-madison-the-nature-boardwalk.jpg/2560px-Gfp-wisconsin-madison-the-nature-boardwalk.jpg"
- if type(messages) == list:
- for item in messages:
- contents = []
- if isinstance(item, dict):
- for cnt in item["content"]:
- if isinstance(cnt, str):
- if is_image_path(cnt):
- # base64_image = encode_image(cnt)
- content = [{"image": cnt}]
- # content = {"type": "image_url", "image_url": {"url": image_url}}
- else:
- content = {"text": cnt}
- contents.append(content)
-
- message = {"role": item["role"], "content": contents}
- else: # str
- contents.append({"text": item})
- message = {"role": "user", "content": contents}
-
- final_messages.append(message)
-
- print("[qwen-vl] sending messages:", final_messages)
-
- response = dashscope.MultiModalConversation.call(
- model='qwen-vl-max-0809',
- messages=final_messages
- )
-
- # from IPython.core.debugger import Pdb; Pdb().set_trace()
-
- try:
- text = response.output.choices[0].message.content[0]['text']
- usage = response.usage
-
- if "total_tokens" not in usage:
- token_usage = int(usage["input_tokens"] + usage["output_tokens"])
- else:
- token_usage = int(usage["total_tokens"])
-
- return text, token_usage
- # return response.json()['choices'][0]['message']['content']
- # return error message if the response is not successful
- except Exception as e:
- print(f"Error in interleaved openAI: {e}. This may due to your invalid OPENAI_API_KEY. Please check the response: {response.json()} ")
- return response.json()
-
-
-
-if __name__ == "__main__":
- api_key = os.environ.get("QWEN_API_KEY")
- if not api_key:
- raise ValueError("QWEN_API_KEY is not set")
-
- dashscope.api_key = api_key
-
- final_messages = [{"role": "user",
- "content": [
- {"text": "What is in the screenshot?"},
- {"image": "./tmp/outputs/screenshot_0b04acbb783d4706bc93873d17ba8c05.png"}
- ]
- }
- ]
- response = dashscope.MultiModalConversation.call(model='qwen-vl-max-0809', messages=final_messages)
-
- print(response)
-
- text = response.output.choices[0].message.content[0]['text']
- usage = response.usage
-
- if "total_tokens" not in usage:
- if "image_tokens" in usage:
- token_usage = usage["input_tokens"] + usage["output_tokens"] + usage["image_tokens"]
- else:
- token_usage = usage["input_tokens"] + usage["output_tokens"]
- else:
- token_usage = usage["total_tokens"]
-
- print(text, token_usage)
- # The screenshot is from a video game... 1387
\ No newline at end of file
diff --git a/demo/gui_agent/llm_utils/run_llm.py b/demo/gui_agent/llm_utils/run_llm.py
deleted file mode 100644
index bc207dd..0000000
--- a/demo/gui_agent/llm_utils/run_llm.py
+++ /dev/null
@@ -1,44 +0,0 @@
-import base64
-import logging
-from .oai import run_oai_interleaved
-from .gemini import run_gemini_interleaved
-
-def run_llm(prompt, llm="gpt-4o-mini", max_tokens=256, temperature=0, stop=None):
- log_prompt(prompt)
-
- # turn string prompt into list
- if isinstance(prompt, str):
- prompt = [prompt]
- elif isinstance(prompt, list):
- pass
- else:
- raise ValueError(f"Invalid prompt type: {type(prompt)}")
-
- if llm.startswith("gpt"): # gpt series
- out = run_oai_interleaved(
- prompt,
- llm,
- max_tokens,
- temperature,
- stop
- )
- elif llm.startswith("gemini"): # gemini series
- out = run_gemini_interleaved(
- prompt,
- llm,
- max_tokens,
- temperature,
- stop
- )
- else:
- raise ValueError(f"Invalid llm: {llm}")
- logging.info(
- f"========Output for {llm}=======\n{out}\n============================")
- return out
-
-def log_prompt(prompt):
- prompt_display = [prompt] if isinstance(prompt, str) else prompt
- prompt_display = "\n\n".join(prompt_display)
- logging.info(
- f"========Prompt=======\n{prompt_display}\n============================")
-
\ No newline at end of file
diff --git a/demo/loop.py b/demo/loop.py
deleted file mode 100644
index 3d506c2..0000000
--- a/demo/loop.py
+++ /dev/null
@@ -1,236 +0,0 @@
-"""
-Agentic sampling loop that calls the Anthropic API and local implenmentation of anthropic-defined computer use tools.
-"""
-import time
-import json
-import asyncio
-import platform
-from collections.abc import Callable
-from datetime import datetime
-from enum import StrEnum
-from typing import Any, cast, Dict
-
-from anthropic import Anthropic, AnthropicBedrock, AnthropicVertex, APIResponse
-from anthropic.types import (
- ToolResultBlockParam,
- TextBlock,
-)
-from anthropic.types.beta import (
- BetaContentBlock,
- BetaContentBlockParam,
- BetaImageBlockParam,
- BetaMessage,
- BetaMessageParam,
- BetaTextBlockParam,
- BetaToolResultBlockParam,
-)
-from tools import BashTool, ComputerTool, EditTool, ToolCollection, ToolResult
-
-import torch
-
-from gui_agent.anthropic_agent import AnthropicActor
-from executor.anthropic_executor import AnthropicExecutor
-from omniparser_agent.vlm_agent import OmniParser, VLMAgent
-from tools.colorful_text import colorful_text_showui, colorful_text_vlm
-from tools.screen_capture import get_screenshot
-from gui_agent.llm_utils.oai import encode_image
-
-
-BETA_FLAG = "computer-use-2024-10-22"
-
-
-class APIProvider(StrEnum):
- ANTHROPIC = "anthropic"
- BEDROCK = "bedrock"
- VERTEX = "vertex"
- OPENAI = "openai"
- QWEN = "qwen"
-
-
-PROVIDER_TO_DEFAULT_MODEL_NAME: dict[APIProvider, str] = {
- APIProvider.ANTHROPIC: "claude-3-5-sonnet-20241022",
- APIProvider.BEDROCK: "anthropic.claude-3-5-sonnet-20241022-v2:0",
- APIProvider.VERTEX: "claude-3-5-sonnet-v2@20241022",
- # APIProvider.OPENAI: "gpt-4o",
- # APIProvider.QWEN: "qwen2vl",
-}
-
-
-# This system prompt is optimized for the Docker environment in this repository and
-# specific tool combinations enabled.
-# We encourage modifying this system prompt to ensure the model has context for the
-# environment it is running in, and to provide any additional information that may be
-# helpful for the task at hand.
-SYSTEM_PROMPT = f"""
-* You are utilizing a Windows system with internet access.
-* The current date is {datetime.today().strftime('%A, %B %d, %Y')}.
-
-"""
-
-import base64
-from PIL import Image
-from io import BytesIO
-
-def sampling_loop_sync(
- *,
- model: str,
- provider: APIProvider | None,
- system_prompt_suffix: str,
- messages: list[BetaMessageParam],
- output_callback: Callable[[BetaContentBlock], None],
- tool_output_callback: Callable[[ToolResult, str], None],
- api_response_callback: Callable[[APIResponse[BetaMessage]], None],
- api_key: str,
- only_n_most_recent_images: int | None = 2,
- max_tokens: int = 4096,
- selected_screen: int = 0
-):
- """
- Synchronous agentic sampling loop for the assistant/tool interaction of computer use.
- """
- print('in sampling_loop_sync, model:', model)
- if model == "claude-3-5-sonnet-20241022":
- omniparser = OmniParser(url="http://127.0.0.1:8000/send_text/",
- selected_screen=selected_screen,)
-
- # Register Actor and Executor
- actor = AnthropicActor(
- model=model,
- provider=provider,
- system_prompt_suffix=system_prompt_suffix,
- api_key=api_key,
- api_response_callback=api_response_callback,
- max_tokens=max_tokens,
- only_n_most_recent_images=only_n_most_recent_images,
- selected_screen=selected_screen
- )
-
- # from IPython.core.debugger import Pdb; Pdb().set_trace()
- executor = AnthropicExecutor(
- output_callback=output_callback,
- tool_output_callback=tool_output_callback,
- selected_screen=selected_screen
- )
-
- elif model == "omniparser + gpt-4o" or model == "omniparser + phi35v":
- omniparser = OmniParser(url="http://127.0.0.1:8000/send_text/",
- selected_screen=selected_screen,)
-
- actor = VLMAgent(
- model=model,
- provider=provider,
- system_prompt_suffix=system_prompt_suffix,
- api_key=api_key,
- api_response_callback=api_response_callback,
- selected_screen=selected_screen,
- output_callback=output_callback,
- )
-
- executor = AnthropicExecutor(
- output_callback=output_callback,
- tool_output_callback=tool_output_callback,
- selected_screen=selected_screen
- )
-
- # elif model == "gpt-4o + ShowUI" or model == "qwen2vl + ShowUI":
- # planner = VLMPlanner(
- # model=model,
- # provider=provider,
- # system_prompt_suffix=system_prompt_suffix,
- # api_key=api_key,
- # api_response_callback=api_response_callback,
- # selected_screen=selected_screen,
- # output_callback=output_callback,
- # )
-
- # if torch.cuda.is_available(): device = torch.device("cuda")
- # elif torch.backends.mps.is_available(): device = torch.device("mps")
- # else: device = torch.device("cpu") # support: 'cpu', 'mps', 'cuda'
- # print(f"showUI-2B inited on device: {device}.")
-
- # actor = ShowUIActor(
- # model_path="./showui-2b/",
- # # Replace with your local path, e.g., "C:\\code\\ShowUI-2B", "/Users/your_username/ShowUI-2B/".
- # device=device,
- # split='web', # 'web' or 'phone'
- # selected_screen=selected_screen,
- # output_callback=output_callback,
- # )
-
- # executor = ShowUIExecutor(
- # output_callback=output_callback,
- # tool_output_callback=tool_output_callback,
- # selected_screen=selected_screen
- # )
-
- else:
- raise ValueError(f"Model {model} not supported")
- print(f"Model Inited: {model}, Provider: {provider}")
-
- tool_result_content = None
-
- print(f"Start the message loop. User messages: {messages}")
-
- if model == "claude-3-5-sonnet-20241022": # Anthropic loop
- while True:
- parsed_screen = omniparser() # parsed_screen: {"som_image_base64": dino_labled_img, "parsed_content_list": parsed_content_list, "screen_info"}
- import pdb; pdb.set_trace()
- screen_info_block = TextBlock(text='Below is the structured accessibility information of the current UI screen, which includes text and icons you can operate on, take these information into account when you are making the prediction for the next action. Note you will still need to take screenshot to get the image: \n' + parsed_screen['screen_info'], type='text')
- # # messages[-1]['content'].append(screen_info_block)
- screen_info_dict = {"role": "user", "content": [screen_info_block]}
- messages.append(screen_info_dict)
- response = actor(messages=messages)
-
- for message, tool_result_content in executor(response, messages):
- yield message
-
- if not tool_result_content:
- return messages
-
- messages.append({"content": tool_result_content, "role": "user"})
-
- elif model == "omniparser + gpt-4o" or model == "omniparser + phi35v":
- while True:
- parsed_screen = omniparser()
- response, vlm_response_json = actor(messages=messages, parsed_screen=parsed_screen)
-
- for message, tool_result_content in executor(response, messages):
- yield message
-
- if not tool_result_content:
- return messages
-
- # import pdb; pdb.set_trace()
- # messages.append({"role": "user",
- # "content": ["History plan:\n" + str(vlm_response_json['Reasoning'])]})
-
- # messages.append({"content": tool_result_content, "role": "user"})
-
- elif model == "gpt-4o + ShowUI" or model == "qwen2vl + ShowUI": # ShowUI loop
- while True:
- vlm_response = planner(messages=messages)
-
- next_action = json.loads(vlm_response).get("Next Action")
- yield next_action
-
- if next_action == None or next_action == "" or next_action == "None":
- final_sc, final_sc_path = get_screenshot(selected_screen=selected_screen)
- output_callback(f'No more actions from {colorful_text_vlm}. End of task. Final State:\n',
- sender="bot")
- yield None
-
- output_callback(f"{colorful_text_vlm} sending action to {colorful_text_showui}:\n{next_action}", sender="bot")
-
- actor_response = actor(messages=next_action)
- yield actor_response
-
- for message, tool_result_content in executor(actor_response, messages):
- time.sleep(1)
- yield message
-
- # since showui executor has no feedback for now, we use "actor_response" to represent its response
- # update messages for the next loop
- messages.append({"role": "user",
- "content": ["History plan:\n" + str(json.loads(vlm_response)) +
- "\nHistory actions:\n" + str(actor_response["content"])]})
- print(f"End of loop. Messages: {str(messages)[:100000]}. Total cost: $USD{planner.total_cost:.5f}")
\ No newline at end of file
diff --git a/demo/omniparser_agent/vlm_agent.py b/demo/omniparser_agent/vlm_agent.py
deleted file mode 100644
index 53ba46a..0000000
--- a/demo/omniparser_agent/vlm_agent.py
+++ /dev/null
@@ -1,417 +0,0 @@
-import json
-import asyncio
-import platform
-from collections.abc import Callable
-from datetime import datetime
-from enum import StrEnum
-from typing import Any, cast, Dict, Callable
-import uuid
-import requests
-from PIL import Image, ImageDraw
-import base64
-from io import BytesIO
-
-from anthropic import Anthropic, AnthropicBedrock, AnthropicVertex, APIResponse
-from anthropic.types import TextBlock, ToolResultBlockParam
-from anthropic.types.beta import BetaMessage, BetaTextBlock, BetaToolUseBlock, BetaMessageParam, BetaUsage
-
-from tools.screen_capture import get_screenshot
-from gui_agent.llm_utils.oai import run_oai_interleaved, encode_image
-from gui_agent.llm_utils.qwen import run_qwen
-from gui_agent.llm_utils.llm_utils import extract_data
-from tools.colorful_text import colorful_text_showui, colorful_text_vlm
-
-
-SYSTEM_PROMPT = f"""
-* You are utilizing a Windows system with internet access.
-* The current date is {datetime.today().strftime('%A, %B %d, %Y')}.
-
-"""
-
-
-class OmniParser:
- def __init__(self,
- url: str,
- selected_screen: int = 0) -> None:
- self.url = url
- self.selected_screen = selected_screen
-
- def __call__(self,):
- screenshot, screenshot_path = get_screenshot(selected_screen=self.selected_screen)
- screenshot_path = str(screenshot_path)
- image_base64 = encode_image(screenshot_path)
-
- # response = requests.post(self.url, json={"base64_image": image_base64, 'prompt': 'omniparser process'})
- # response_json = response.json()
- # example response_json: {"som_image_base64": dino_labled_img, "parsed_content_list": parsed_content_list, "latency": 0.1}
- # Debug
- response_json = {"som_image_base64": image_base64, "parsed_content_list": ['debug1', 'debug2'], "latency": 0.1}
- print('omniparser latency:', response_json['latency'])
- response_json = self.reformat_messages(response_json)
- return response_json
-
- def reformat_messages(self, response_json: dict):
- parsed_content_list = response_json["parsed_content_list"]
- screen_info = ""
- # Debug
- # for idx, element in enumerate(parsed_content_list):
- # element['idx'] = idx
- # if element['type'] == 'text':
- # # screen_info += f'''
\n'''
- # screen_info += f'ID: {idx}, Text: {element["content"]}\n'
- # elif element['type'] == 'icon':
- # # screen_info += f''' \n'''
- # screen_info += f'ID: {idx}, Icon: {element["content"]}\n'
- response_json['screen_info'] = screen_info
- return response_json
-
-
-
-class VLMAgent:
- def __init__(
- self,
- model: str,
- provider: str,
- system_prompt_suffix: str,
- api_key: str,
- output_callback: Callable,
- api_response_callback: Callable,
- max_tokens: int = 4096,
- only_n_most_recent_images: int | None = None,
- selected_screen: int = 0,
- print_usage: bool = True,
- ):
- if model == "gpt-4o + ShowUI":
- self.model = "gpt-4o-2024-11-20"
- elif model == "gpt-4o-mini + ShowUI":
- self.model = "gpt-4o-mini" # "gpt-4o-mini"
- elif model == "qwen2vl + ShowUI":
- self.model = "qwen2vl"
- elif model == "omniparser + gpt-4o":
- self.model = "gpt-4o-2024-11-20"
- else:
- raise ValueError(f"Model {model} not supported")
-
- self.provider = provider
- self.system_prompt_suffix = system_prompt_suffix
- self.api_key = api_key
- self.api_response_callback = api_response_callback
- self.max_tokens = max_tokens
- self.only_n_most_recent_images = only_n_most_recent_images
- self.selected_screen = selected_screen
- self.output_callback = output_callback
-
- self.print_usage = print_usage
- self.total_token_usage = 0
- self.total_cost = 0
-
- self.system = (
- # f"{SYSTEM_PROMPT}{' ' + system_prompt_suffix if system_prompt_suffix else ''}"
- f"{system_prompt_suffix}"
- )
-
- def __call__(self, messages: list, parsed_screen: list[str, list]):
- # example parsed_screen: {"som_image_base64": dino_labled_img, "parsed_content_list": parsed_content_list, "screen_info"}
- screen_info = parsed_screen["screen_info"]
- # drop looping actions msg, byte image etc
- planner_messages = messages
- # planner_messages = _message_filter_callback(messages)
-
- print(f"filtered_messages: {planner_messages}\n\n", "full messages:", messages)
- # import pdb; pdb.set_trace()
- planner_messages = _keep_latest_images(planner_messages)
- # if self.only_n_most_recent_images:
- # _maybe_filter_to_n_most_recent_images(planner_messages, self.only_n_most_recent_images)
-
- system = self._get_system_prompt(screen_info) + self.system_prompt_suffix
-
- # Take a screenshot
- screenshot, screenshot_path = get_screenshot(selected_screen=self.selected_screen)
- screen_width, screen_height = screenshot.size
- screenshot_path = str(screenshot_path)
- image_base64 = encode_image(screenshot_path)
-
- som_image_data = base64.b64decode(parsed_screen['som_image_base64'])
- som_screenshot_path = f"./tmp/outputs/screenshot_som_{uuid.uuid4().hex}.png"
- with open(som_screenshot_path, "wb") as f:
- f.write(som_image_data)
-
- self.output_callback(f'Screenshot for {colorful_text_vlm}:\n',
- sender="bot")
- self.output_callback(f'Set of Marks Screenshot for {colorful_text_vlm}:\n', sender="bot")
-
-
- if isinstance(planner_messages[-1], dict):
- if not isinstance(planner_messages[-1]["content"], list):
- planner_messages[-1]["content"] = [planner_messages[-1]["content"]]
- planner_messages[-1]["content"].append(screenshot_path)
- planner_messages[-1]["content"].append(som_screenshot_path)
-
- print(f"Sending messages to VLMPlanner : {planner_messages}")
-
- if "gpt" in self.model:
- vlm_response, token_usage = run_oai_interleaved(
- messages=planner_messages,
- system=system,
- llm=self.model,
- api_key=self.api_key,
- max_tokens=self.max_tokens,
- temperature=0,
- )
- print(f"oai token usage: {token_usage}")
- self.total_token_usage += token_usage
- self.total_cost += (token_usage * 0.15 / 1000000) # https://openai.com/api/pricing/
-
- elif "qwen" in self.model:
- vlm_response, token_usage = run_qwen(
- messages=planner_messages,
- system=system,
- llm=self.model,
- api_key=self.api_key,
- max_tokens=self.max_tokens,
- temperature=0,
- )
- print(f"qwen token usage: {token_usage}")
- self.total_token_usage += token_usage
- self.total_cost += (token_usage * 0.02 / 7.25 / 1000) # 1USD=7.25CNY, https://help.aliyun.com/zh/dashscope/developer-reference/tongyi-qianwen-vl-plus-api
- elif "phi" in self.model:
- pass # TODO
- else:
- raise ValueError(f"Model {self.model} not supported")
-
- print(f"VLMPlanner response: {vlm_response}")
-
- if self.print_usage:
- print(f"VLMPlanner total token usage so far: {self.total_token_usage}. Total cost so far: $USD{self.total_cost:.5f}")
-
- vlm_response_json = extract_data(vlm_response, "json")
- vlm_response_json = json.loads(vlm_response_json)
-
- # map "box_id" to "idx" in parsed_screen, and output the xy coordinate of bbox
- # TODO add try except for the case when "box_id" is not in the response
- # if 'Box ID' in vlm_response_json:
- try:
- bbox = parsed_screen["parsed_content_list"][int(vlm_response_json["Box ID"])]["bbox"]
- vlm_response_json["coordinate"] = [int((bbox[0] + bbox[2]) / 2 * screen_width), int((bbox[1] + bbox[3]) / 2 * screen_height)]
- # draw a circle on the screenshot image to indicate the action
- self.draw_action(vlm_response_json, image_base64)
- except:
- print("No Box ID in the response.")
-
-
- # vlm_plan_str = '\n'.join([f'{key}: {value}' for key, value in json.loads(response).items()])
- vlm_plan_str = ""
- for key, value in vlm_response_json.items():
- if key == "Reasoning":
- vlm_plan_str += f'{value}'
- else:
- vlm_plan_str += f'\n{key}: {value}'
-
- # self.output_callback(f"{colorful_text_vlm}:\n{vlm_plan_str}", sender="bot")
-
- # construct the response so that anthropicExcutor can execute the tool
- analysis = BetaTextBlock(text=vlm_plan_str, type='text')
- if 'coordinate' in vlm_response_json:
- move_cursor_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}',
- input={'action': 'mouse_move', 'coordinate': vlm_response_json["coordinate"]},
- name='computer', type='tool_use')
- response_content = [analysis, move_cursor_block]
- else:
- response_content = [analysis]
- if vlm_response_json["Next Action"] == "type":
- click_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}', input={'action': 'left_click'}, name='computer', type='tool_use')
- sim_content_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}',
- input={'action': vlm_response_json["Next Action"], 'text': vlm_response_json["value"]},
- name='computer', type='tool_use')
- response_content.extend([click_block, sim_content_block])
- elif vlm_response_json["Next Action"] == "None":
- print("Task paused/completed.")
- else:
- sim_content_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}',
- input={'action': vlm_response_json["Next Action"]},
- name='computer', type='tool_use')
- response_content.append(sim_content_block)
-
- response = BetaMessage(id=f'toolu_{uuid.uuid4()}', content=response_content, model='', role='assistant', type='message', stop_reason='tool_use', usage=BetaUsage(input_tokens=0, output_tokens=0))
- return response, vlm_response_json
-
-
- def _api_response_callback(self, response: APIResponse):
- self.api_response_callback(response)
-
-
- def reformat_messages(self, messages: list):
- pass
-
- def _get_system_prompt(self, screen_info: str = ""):
- datetime_str = datetime.now().strftime("%A, %B %d, %Y")
- os_name = platform.system()
- return f"""
-You are using an {os_name} device.
-You are able to use a mouse and keyboard to interact with the computer based on the given task and screenshot.
-You can only interact with the desktop GUI (no terminal or application menu access).
-
-You may be given some history plan and actions, this is the response from the previous loop.
-You should carefully consider your plan base on the task, screenshot, and history actions.
-
-Here is the list of all detected bounding boxes by IDs on the screen and their description:{screen_info}
-
-Your available "Next Action" only include:
-- type: type a string of text.
-- left_click: Describe the ui element to be clicked.
-- enter: Press an enter key.
-- escape: Press an ESCAPE key.
-- hover: Describe the ui element to be hovered.
-- scroll: Scroll the screen, you must specify up or down.
-- press: Describe the ui element to be pressed.
-
-Based on the visual information from the screenshot image and the detected bounding boxes, please determine the next action, the Box ID you should operate on, and the value (if the action is 'type') in order to complete the task.
-
-Output format:
-```json
-{{
- "Reasoning": str, # describe what is in the current screen, taking into account the history, then describe your step-by-step thoughts on how to achieve the task, choose one action from available actions at a time.
- "Next Action": "action_type, action description" | "None" # one action at a time, describe it in short and precisely.
- 'Box ID': n,
- 'value': "xxx" # if the action is type, you should provide the text to type.
-}}
-```
-
-One Example:
-```json
-{{
- "Reasoning": "The current screen shows google result of amazon, in previous action I have searched amazon on google. Then I need to click on the first search results to go to amazon.com.",
- "Next Action": "left_click",
- 'Box ID': m,
-}}
-```
-
-Another Example:
-```json
-{{
- "Reasoning": "The current screen shows the front page of amazon. There is no previous action. Therefore I need to type "Apple watch" in the search bar.",
- "Next Action": "type",
- 'Box ID': n,
- 'value': "Apple watch"
-}}
-```
-
-IMPORTANT NOTES:
-1. You should only give a single action at a time.
-2. You should give an analysis to the current screen, and reflect on what has been done by looking at the history, then describe your step-by-step thoughts on how to achieve the task.
-3. Attach the next action prediction in the "Next Action".
-4. You should not include other actions, such as keyboard shortcuts.
-5. When the task is completed, you should say "Next Action": "None" in the json field.
-"""
- def draw_action(self, vlm_response_json, image_base64):
- # draw a circle using the coordinate in parsed_screen['som_image_base64']
- image_data = base64.b64decode(image_base64)
- image = Image.open(BytesIO(image_data))
-
- draw = ImageDraw.Draw(image)
- x, y = vlm_response_json["coordinate"]
- radius = 10
- draw.ellipse((x - radius, y - radius, x + radius, y + radius), outline='red', width=3)
- buffered = BytesIO()
- image.save('demo.png')
- image.save(buffered, format="PNG")
- image_with_circle_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
- self.output_callback(f'Action performed on the Screenshot (red circle), for {colorful_text_vlm}:\n', sender="bot")
-
-
-def _keep_latest_images(messages):
- for i in range(len(messages)-1):
- if isinstance(messages[i]["content"], list):
- for cnt in messages[i]["content"]:
- if isinstance(cnt, str):
- if cnt.endswith((".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif")):
- messages[i]["content"].remove(cnt)
- return messages
-
-
-def _maybe_filter_to_n_most_recent_images(
- messages: list[BetaMessageParam],
- images_to_keep: int,
- min_removal_threshold: int = 10,
-):
- """
- With the assumption that images are screenshots that are of diminishing value as
- the conversation progresses, remove all but the final `images_to_keep` tool_result
- images in place, with a chunk of min_removal_threshold to reduce the amount we
- break the implicit prompt cache.
- """
- if images_to_keep is None:
- return messages
-
- tool_result_blocks = cast(
- list[ToolResultBlockParam],
- [
- item
- for message in messages
- for item in (
- message["content"] if isinstance(message["content"], list) else []
- )
- if isinstance(item, dict) and item.get("type") == "tool_result"
- ],
- )
-
- total_images = sum(
- 1
- for tool_result in tool_result_blocks
- for content in tool_result.get("content", [])
- if isinstance(content, dict) and content.get("type") == "image"
- )
-
- images_to_remove = total_images - images_to_keep
- # for better cache behavior, we want to remove in chunks
- images_to_remove -= images_to_remove % min_removal_threshold
-
- for tool_result in tool_result_blocks:
- if isinstance(tool_result.get("content"), list):
- new_content = []
- for content in tool_result.get("content", []):
- if isinstance(content, dict) and content.get("type") == "image":
- if images_to_remove > 0:
- images_to_remove -= 1
- continue
- new_content.append(content)
- tool_result["content"] = new_content
-
-
-def _message_filter_callback(messages):
- filtered_list = []
- try:
- for msg in messages:
- if msg.get('role') in ['user']:
- if not isinstance(msg["content"], list):
- msg["content"] = [msg["content"]]
- if isinstance(msg["content"][0], TextBlock):
- filtered_list.append(str(msg["content"][0].text)) # User message
- elif isinstance(msg["content"][0], str):
- filtered_list.append(msg["content"][0]) # User message
- else:
- print("[_message_filter_callback]: drop message", msg)
- continue
-
- # elif msg.get('role') in ['assistant']:
- # if isinstance(msg["content"][0], TextBlock):
- # msg["content"][0] = str(msg["content"][0].text)
- # elif isinstance(msg["content"][0], BetaTextBlock):
- # msg["content"][0] = str(msg["content"][0].text)
- # elif isinstance(msg["content"][0], BetaToolUseBlock):
- # msg["content"][0] = str(msg['content'][0].input)
- # elif isinstance(msg["content"][0], Dict) and msg["content"][0]["content"][-1]["type"] == "image":
- # msg["content"][0] = f''
- # else:
- # print("[_message_filter_callback]: drop message", msg)
- # continue
- # filtered_list.append(msg["content"][0]) # User message
-
- else:
- print("[_message_filter_callback]: drop message", msg)
- continue
-
- except Exception as e:
- print("[_message_filter_callback]: error", e)
-
- return filtered_list
\ No newline at end of file
diff --git a/demo/remote_request.py b/demo/remote_request.py
deleted file mode 100644
index d0bc1c3..0000000
--- a/demo/remote_request.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# uvicorn remote_request:app --host 0.0.0.0 --port 8000 --reload
-
-import sys
-import os
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
-from utils import get_som_labeled_img, check_ocr_box, get_caption_model_processor, get_yolo_model
-import torch
-from PIL import Image
-from typing import Dict, Tuple, List
-import base64
-
-
-config = {
- 'som_model_path': '../weights/icon_detect_v1_5/model_v1_5.pt',
- 'device': 'cpu',
- 'caption_model_name': 'florence2',
- 'caption_model_path': '../weights/icon_caption_florence',
- 'BOX_TRESHOLD': 0.05
-}
-
-
-class Omniparser(object):
- def __init__(self, config: Dict):
- self.config = config
- device = 'cuda' if torch.cuda.is_available() else 'cpu'
-
- self.som_model = get_yolo_model(model_path=config['som_model_path'])
- self.caption_model_processor = get_caption_model_processor(model_name=config['caption_model_name'], model_name_or_path=config['caption_model_path'], device=device)
- print('Omniparser initialized!!!')
-
- def parse(self, image_base64: str):
- image_path = '../imgs/demo_image.jpg'
- with open(image_path, "wb") as fh:
- fh.write(base64.b64decode(image_base64))
- print('Parsing image:', image_path)
-
- image = Image.open(image_path)
- print('image size:', image.size)
-
- box_overlay_ratio = max(image.size) / 3200
- draw_bbox_config = {
- 'text_scale': 0.8 * box_overlay_ratio,
- 'text_thickness': max(int(2 * box_overlay_ratio), 1),
- 'text_padding': max(int(3 * box_overlay_ratio), 1),
- 'thickness': max(int(3 * box_overlay_ratio), 1),
- }
- BOX_TRESHOLD = config['BOX_TRESHOLD']
-
- ocr_bbox_rslt, is_goal_filtered = check_ocr_box(image_path, display_img = False, output_bb_format='xyxy', goal_filtering=None, easyocr_args={'paragraph': False, 'text_threshold':0.8}, use_paddleocr=True)
- text, ocr_bbox = ocr_bbox_rslt
- dino_labled_img, label_coordinates, parsed_content_list = get_som_labeled_img(image_path, self.som_model, BOX_TRESHOLD = BOX_TRESHOLD, output_coord_in_ratio=True, ocr_bbox=ocr_bbox,draw_bbox_config=draw_bbox_config, caption_model_processor=self.caption_model_processor, ocr_text=text,use_local_semantics=True, iou_threshold=0.7, scale_img=False, batch_size=128)
- with open('../imgs/demo_image_som.jpg', "wb") as fh:
- fh.write(base64.b64decode(dino_labled_img))
-
- return dino_labled_img, parsed_content_list
-
-
-from fastapi import FastAPI
-from pydantic import BaseModel
-
-app = FastAPI()
-
-class Item(BaseModel):
- base64_image: str
- prompt: str
-
-Omniparser = Omniparser(config)
-
-@app.post("/send_text/")
-async def send_text(item: Item):
- print('start parsing...')
- import time
- start = time.time()
- dino_labled_img, parsed_content_list = Omniparser.parse(item.base64_image)
- latency = time.time() - start
- print('time:', latency)
- return {"som_image_base64": dino_labled_img, "parsed_content_list": parsed_content_list, 'latency': latency}
\ No newline at end of file
diff --git a/demo/tmp/outputs/screenshot.png b/demo/tmp/outputs/screenshot.png
deleted file mode 100644
index eca7672..0000000
Binary files a/demo/tmp/outputs/screenshot.png and /dev/null differ
diff --git a/demo/tools/bash.py b/demo/tools/bash.py
deleted file mode 100644
index 56eb912..0000000
--- a/demo/tools/bash.py
+++ /dev/null
@@ -1,136 +0,0 @@
-import asyncio
-import os
-from typing import ClassVar, Literal
-
-from anthropic.types.beta import BetaToolBash20241022Param
-
-from .base import BaseAnthropicTool, CLIResult, ToolError, ToolResult
-
-
-class _BashSession:
- """A session of a bash shell."""
-
- _started: bool
- _process: asyncio.subprocess.Process
-
- command: str = "/bin/bash"
- _output_delay: float = 0.2 # seconds
- _timeout: float = 120.0 # seconds
- _sentinel: str = "<>"
-
- def __init__(self):
- self._started = False
- self._timed_out = False
-
- async def start(self):
- if self._started:
- return
-
- self._process = await asyncio.create_subprocess_shell(
- self.command,
- shell=False,
- stdin=asyncio.subprocess.PIPE,
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.PIPE,
- )
-
- self._started = True
-
- def stop(self):
- """Terminate the bash shell."""
- if not self._started:
- raise ToolError("Session has not started.")
- if self._process.returncode is not None:
- return
- self._process.terminate()
-
- async def run(self, command: str):
- """Execute a command in the bash shell."""
- if not self._started:
- raise ToolError("Session has not started.")
- if self._process.returncode is not None:
- return ToolResult(
- system="tool must be restarted",
- error=f"bash has exited with returncode {self._process.returncode}",
- )
- if self._timed_out:
- raise ToolError(
- f"timed out: bash has not returned in {self._timeout} seconds and must be restarted",
- )
-
- # we know these are not None because we created the process with PIPEs
- assert self._process.stdin
- assert self._process.stdout
- assert self._process.stderr
-
- # send command to the process
- self._process.stdin.write(
- command.encode() + f"; echo '{self._sentinel}'\n".encode()
- )
- await self._process.stdin.drain()
-
- # read output from the process, until the sentinel is found
- output = ""
- try:
- async with asyncio.timeout(self._timeout):
- while True:
- await asyncio.sleep(self._output_delay)
- data = await self._process.stdout.readline()
- if not data:
- break
- line = data.decode()
- output += line
- if self._sentinel in line:
- output = output.replace(self._sentinel, "")
- break
- except asyncio.TimeoutError:
- self._timed_out = True
- raise ToolError(
- f"timed out: bash has not returned in {self._timeout} seconds and must be restarted",
- ) from None
-
- error = await self._process.stderr.read()
- error = error.decode()
-
- return CLIResult(output=output.strip(), error=error.strip())
-
-
-class BashTool(BaseAnthropicTool):
- """
- A tool that allows the agent to run bash commands.
- The tool parameters are defined by Anthropic and are not editable.
- """
-
- _session: _BashSession | None
- name: ClassVar[Literal["bash"]] = "bash"
- api_type: ClassVar[Literal["bash_20241022"]] = "bash_20241022"
-
- def __init__(self):
- self._session = None
- super().__init__()
-
- async def __call__(
- self, command: str | None = None, restart: bool = False, **kwargs
- ):
- if restart:
- if self._session:
- self._session.stop()
- self._session = _BashSession()
- await self._session.start()
-
- return ToolResult(system="tool has been restarted.")
-
- if self._session is None:
- self._session = _BashSession()
- await self._session.start()
-
- if command is not None:
- return await self._session.run(command)
-
- raise ToolError("no command provided.")
-
- def to_params(self) -> BetaToolBash20241022Param:
- return {
- "type": self.api_type,
- "name": self.name,
- }
\ No newline at end of file
diff --git a/demo/tools/colorful_text.py b/demo/tools/colorful_text.py
deleted file mode 100644
index 5804187..0000000
--- a/demo/tools/colorful_text.py
+++ /dev/null
@@ -1,27 +0,0 @@
-"""
-Define some colorful stuffs for better visualization in the chat.
-"""
-
-# Define the RGB colors for each letter
-colors = {
- 'S': 'rgb(106, 158, 210)',
- 'h': 'rgb(111, 163, 82)',
- 'o': 'rgb(209, 100, 94)',
- 'w': 'rgb(238, 171, 106)',
- 'U': 'rgb(0, 0, 0)',
- 'I': 'rgb(0, 0, 0)',
-}
-
-# Construct the colorful "ShowUI" word
-colorful_text_showui = "**"+''.join(
- f'{letter}'
- for letter in "ShowUI"
-)+"**"
-
-
-colorful_text_vlm = "**OmniParser Agent**"
-
-colorful_text_user = "**User**"
-
-# print(f"colorful_text_showui: {colorful_text_showui}")
-# **ShowUI**
\ No newline at end of file
diff --git a/demo/tools/computer.py b/demo/tools/computer.py
deleted file mode 100644
index e49f932..0000000
--- a/demo/tools/computer.py
+++ /dev/null
@@ -1,519 +0,0 @@
-import subprocess
-import platform
-import pyautogui
-import asyncio
-import base64
-import os
-import time
-if platform.system() == "Darwin":
- import Quartz # uncomment this line if you are on macOS
-from enum import StrEnum
-from pathlib import Path
-from typing import Literal, TypedDict
-from uuid import uuid4
-from screeninfo import get_monitors
-
-from PIL import ImageGrab, Image
-from functools import partial
-
-from anthropic.types.beta import BetaToolComputerUse20241022Param
-
-from .base import BaseAnthropicTool, ToolError, ToolResult
-from .run import run
-
-OUTPUT_DIR = "./tmp/outputs"
-
-TYPING_DELAY_MS = 12
-TYPING_GROUP_SIZE = 50
-
-Action = Literal[
- "key",
- "type",
- "mouse_move",
- "left_click",
- "left_click_drag",
- "right_click",
- "middle_click",
- "double_click",
- "screenshot",
- "cursor_position",
-]
-
-
-class Resolution(TypedDict):
- width: int
- height: int
-
-
-MAX_SCALING_TARGETS: dict[str, Resolution] = {
- "XGA": Resolution(width=1024, height=768), # 4:3
- "WXGA": Resolution(width=1280, height=800), # 16:10
- "FWXGA": Resolution(width=1366, height=768), # ~16:9
-}
-
-
-class ScalingSource(StrEnum):
- COMPUTER = "computer"
- API = "api"
-
-
-class ComputerToolOptions(TypedDict):
- display_height_px: int
- display_width_px: int
- display_number: int | None
-
-
-def chunks(s: str, chunk_size: int) -> list[str]:
- return [s[i : i + chunk_size] for i in range(0, len(s), chunk_size)]
-
-
-def get_screen_details():
- screens = get_monitors()
- screen_details = []
-
- # Sort screens by x position to arrange from left to right
- sorted_screens = sorted(screens, key=lambda s: s.x)
-
- # Loop through sorted screens and assign positions
- primary_index = 0
- for i, screen in enumerate(sorted_screens):
- if i == 0:
- layout = "Left"
- elif i == len(sorted_screens) - 1:
- layout = "Right"
- else:
- layout = "Center"
-
- if screen.is_primary:
- position = "Primary"
- primary_index = i
- else:
- position = "Secondary"
- screen_info = f"Screen {i + 1}: {screen.width}x{screen.height}, {layout}, {position}"
- screen_details.append(screen_info)
-
- return screen_details, primary_index
-
-
-class ComputerTool(BaseAnthropicTool):
- """
- A tool that allows the agent to interact with the screen, keyboard, and mouse of the current computer.
- Adapted for Windows using 'pyautogui'.
- """
-
- name: Literal["computer"] = "computer"
- api_type: Literal["computer_20241022"] = "computer_20241022"
- width: int
- height: int
- display_num: int | None
-
- _screenshot_delay = 2.0
- _scaling_enabled = True
-
- @property
- def options(self) -> ComputerToolOptions:
- width, height = self.scale_coordinates(
- ScalingSource.COMPUTER, self.width, self.height
- )
- return {
- "display_width_px": width,
- "display_height_px": height,
- "display_number": self.display_num,
- }
-
- def to_params(self) -> BetaToolComputerUse20241022Param:
- return {"name": self.name, "type": self.api_type, **self.options}
-
- def __init__(self, selected_screen: int = 0, is_scaling: bool = False):
- super().__init__()
-
- # Get screen width and height using Windows command
- self.display_num = None
- self.offset_x = 0
- self.offset_y = 0
- self.selected_screen = selected_screen
- self.is_scaling = is_scaling
- self.width, self.height = self.get_screen_size()
-
- # Path to cliclick
- self.cliclick = "cliclick"
- self.key_conversion = {"Page_Down": "pagedown",
- "Page_Up": "pageup",
- "Super_L": "win",
- "Escape": "esc"}
-
- system = platform.system() # Detect platform
- if system == "Windows":
- screens = get_monitors()
- sorted_screens = sorted(screens, key=lambda s: s.x)
- if self.selected_screen < 0 or self.selected_screen >= len(screens):
- raise IndexError("Invalid screen index.")
- screen = sorted_screens[self.selected_screen]
- bbox = (screen.x, screen.y, screen.x + screen.width, screen.y + screen.height)
-
- elif system == "Darwin": # macOS
- max_displays = 32 # Maximum number of displays to handle
- active_displays = Quartz.CGGetActiveDisplayList(max_displays, None, None)[1]
- screens = []
- for display_id in active_displays:
- bounds = Quartz.CGDisplayBounds(display_id)
- screens.append({
- 'id': display_id, 'x': int(bounds.origin.x), 'y': int(bounds.origin.y),
- 'width': int(bounds.size.width), 'height': int(bounds.size.height),
- 'is_primary': Quartz.CGDisplayIsMain(display_id) # Check if this is the primary display
- })
- sorted_screens = sorted(screens, key=lambda s: s['x'])
- if self.selected_screen < 0 or self.selected_screen >= len(screens):
- raise IndexError("Invalid screen index.")
- screen = sorted_screens[self.selected_screen]
- bbox = (screen['x'], screen['y'], screen['x'] + screen['width'], screen['y'] + screen['height'])
- else: # Linux or other OS
- cmd = "xrandr | grep ' primary' | awk '{print $4}'"
- try:
- # output = subprocess.check_output(cmd, shell=True).decode()
- # resolution = output.strip().split()[0]
- # width, height = map(int, resolution.split('x'))
- # bbox = (0, 0, width, height) # Assuming single primary screen for simplicity
- screen = get_monitors()[0]
- bbox = (screen.x, screen.y, screen.x + screen.width, screen.y + screen.height)
- except subprocess.CalledProcessError:
- raise RuntimeError("Failed to get screen resolution on Linux.")
-
- self.offset_x = screen['x'] if system == "Darwin" else screen.x
- self.offset_y = screen['y'] if system == "Darwin" else screen.y
- self.bbox = bbox
-
-
- async def __call__(
- self,
- *,
- action: Action,
- text: str | None = None,
- coordinate: tuple[int, int] | None = None,
- **kwargs,
- ):
- print(f"action: {action}, text: {text}, coordinate: {coordinate}, is_scaling: {self.is_scaling}")
- if action in ("mouse_move", "left_click_drag"):
- if coordinate is None:
- raise ToolError(f"coordinate is required for {action}")
- if text is not None:
- raise ToolError(f"text is not accepted for {action}")
- if not isinstance(coordinate, (list, tuple)) or len(coordinate) != 2:
- raise ToolError(f"{coordinate} must be a tuple of length 2")
- # if not all(isinstance(i, int) and i >= 0 for i in coordinate):
- if not all(isinstance(i, int) for i in coordinate):
- raise ToolError(f"{coordinate} must be a tuple of non-negative ints")
-
- if self.is_scaling:
- x, y = self.scale_coordinates(
- ScalingSource.API, coordinate[0], coordinate[1]
- )
- else:
- x, y = coordinate
-
- # print(f"scaled_coordinates: {x}, {y}")
- # print(f"offset: {self.offset_x}, {self.offset_y}")
-
- # x += self.offset_x # TODO - check if this is needed
- # y += self.offset_y
-
- print(f"mouse move to {x}, {y}")
-
- if action == "mouse_move":
- pyautogui.moveTo(x, y)
- return ToolResult(output=f"Moved mouse to ({x}, {y})")
- elif action == "left_click_drag":
- current_x, current_y = pyautogui.position()
- pyautogui.dragTo(x, y, duration=0.5) # Adjust duration as needed
- return ToolResult(output=f"Dragged mouse from ({current_x}, {current_y}) to ({x}, {y})")
-
- if action in ("key", "type"):
- if text is None:
- raise ToolError(f"text is required for {action}")
- if coordinate is not None:
- raise ToolError(f"coordinate is not accepted for {action}")
- if not isinstance(text, str):
- raise ToolError(output=f"{text} must be a string")
-
- if action == "key":
- # Handle key combinations
- keys = text.split('+')
- for key in keys:
- key = self.key_conversion.get(key.strip(), key.strip())
- key = key.lower()
- pyautogui.keyDown(key) # Press down each key
- for key in reversed(keys):
- key = self.key_conversion.get(key.strip(), key.strip())
- key = key.lower()
- pyautogui.keyUp(key) # Release each key in reverse order
- return ToolResult(output=f"Pressed keys: {text}")
-
- elif action == "type":
- pyautogui.typewrite(text, interval=TYPING_DELAY_MS / 1000) # Convert ms to seconds
- pyautogui.press('enter')
- screenshot_base64 = (await self.screenshot()).base64_image
- return ToolResult(output=text, base64_image=screenshot_base64)
-
- if action in (
- "left_click",
- "right_click",
- "double_click",
- "middle_click",
- "screenshot",
- "cursor_position",
- "left_press",
- ):
- if text is not None:
- raise ToolError(f"text is not accepted for {action}")
- if coordinate is not None:
- raise ToolError(f"coordinate is not accepted for {action}")
-
- if action == "screenshot":
- return await self.screenshot()
- elif action == "cursor_position":
- x, y = pyautogui.position()
- x, y = self.scale_coordinates(ScalingSource.COMPUTER, x, y)
- return ToolResult(output=f"X={x},Y={y}")
- else:
- if action == "left_click":
- pyautogui.click()
- elif action == "right_click":
- pyautogui.rightClick()
- elif action == "middle_click":
- pyautogui.middleClick()
- elif action == "double_click":
- pyautogui.doubleClick()
- elif action == "left_press":
- pyautogui.mouseDown()
- time.sleep(1)
- pyautogui.mouseUp()
- return ToolResult(output=f"Performed {action}")
-
- raise ToolError(f"Invalid action: {action}")
-
- async def screenshot(self):
-
- import time
- time.sleep(1)
-
- """Take a screenshot of the current screen and return a ToolResult with the base64 encoded image."""
- output_dir = Path(OUTPUT_DIR)
- output_dir.mkdir(parents=True, exist_ok=True)
- path = output_dir / f"screenshot_{uuid4().hex}.png"
-
- ImageGrab.grab = partial(ImageGrab.grab, all_screens=True)
-
- # Detect platform
- system = platform.system()
-
- if system == "Windows":
- # Windows: Use screeninfo to get monitor details
- screens = get_monitors()
-
- # Sort screens by x position to arrange from left to right
- sorted_screens = sorted(screens, key=lambda s: s.x)
-
- if self.selected_screen < 0 or self.selected_screen >= len(screens):
- raise IndexError("Invalid screen index.")
-
- screen = sorted_screens[self.selected_screen]
- bbox = (screen.x, screen.y, screen.x + screen.width, screen.y + screen.height)
-
- elif system == "Darwin": # macOS
- # macOS: Use Quartz to get monitor details
- max_displays = 32 # Maximum number of displays to handle
- active_displays = Quartz.CGGetActiveDisplayList(max_displays, None, None)[1]
-
- # Get the display bounds (resolution) for each active display
- screens = []
- for display_id in active_displays:
- bounds = Quartz.CGDisplayBounds(display_id)
- screens.append({
- 'id': display_id,
- 'x': int(bounds.origin.x),
- 'y': int(bounds.origin.y),
- 'width': int(bounds.size.width),
- 'height': int(bounds.size.height),
- 'is_primary': Quartz.CGDisplayIsMain(display_id) # Check if this is the primary display
- })
-
- # Sort screens by x position to arrange from left to right
- sorted_screens = sorted(screens, key=lambda s: s['x'])
-
- if self.selected_screen < 0 or self.selected_screen >= len(screens):
- raise IndexError("Invalid screen index.")
-
- screen = sorted_screens[self.selected_screen]
- bbox = (screen['x'], screen['y'], screen['x'] + screen['width'], screen['y'] + screen['height'])
-
- else: # Linux or other OS
- cmd = "xrandr | grep ' primary' | awk '{print $4}'"
- try:
- # output = subprocess.check_output(cmd, shell=True).decode()
- # resolution = output.strip().split()[0]
- # width, height = map(int, resolution.split('x'))
- # bbox = (0, 0, width, height) # Assuming single primary screen for simplicity
- screen = get_monitors()[0]
- bbox = (screen.x, screen.y, screen.x + screen.width, screen.y + screen.height)
- except subprocess.CalledProcessError:
- raise RuntimeError("Failed to get screen resolution on Linux.")
-
- # Take screenshot using the bounding box
- screenshot = ImageGrab.grab(bbox=bbox)
-
- # Set offsets (for potential future use)
- self.offset_x = screen['x'] if system == "Darwin" else screen.x
- self.offset_y = screen['y'] if system == "Darwin" else screen.y
-
- print(f"target_dimension {self.target_dimension}")
-
- if not hasattr(self, 'target_dimension'):
- screenshot = self.padding_image(screenshot)
- self.target_dimension = MAX_SCALING_TARGETS["WXGA"]
-
- # Resize if target_dimensions are specified
- print(f"offset is {self.offset_x}, {self.offset_y}")
- print(f"target_dimension is {self.target_dimension}")
- screenshot = screenshot.resize((self.target_dimension["width"], self.target_dimension["height"]))
-
- # Save the screenshot
- screenshot.save(str(path))
-
- if path.exists():
- # Return a ToolResult instance instead of a dictionary
- return ToolResult(base64_image=base64.b64encode(path.read_bytes()).decode())
-
- raise ToolError(f"Failed to take screenshot: {path} does not exist.")
-
- def padding_image(self, screenshot):
- """Pad the screenshot to 16:10 aspect ratio, when the aspect ratio is not 16:10."""
- _, height = screenshot.size
- new_width = height * 16 // 10
-
- padding_image = Image.new("RGB", (new_width, height), (255, 255, 255))
- # padding to top left
- padding_image.paste(screenshot, (0, 0))
- return padding_image
-
- async def shell(self, command: str, take_screenshot=True) -> ToolResult:
- """Run a shell command and return the output, error, and optionally a screenshot."""
- _, stdout, stderr = await run(command)
- base64_image = None
-
- if take_screenshot:
- # delay to let things settle before taking a screenshot
- await asyncio.sleep(self._screenshot_delay)
- base64_image = (await self.screenshot()).base64_image
-
- return ToolResult(output=stdout, error=stderr, base64_image=base64_image)
-
- def scale_coordinates(self, source: ScalingSource, x: int, y: int):
- """Scale coordinates to a target maximum resolution."""
- if not self._scaling_enabled:
- return x, y
- ratio = self.width / self.height
- target_dimension = None
-
- for target_name, dimension in MAX_SCALING_TARGETS.items():
- # allow some error in the aspect ratio - not ratios are exactly 16:9
- if abs(dimension["width"] / dimension["height"] - ratio) < 0.02:
- if dimension["width"] < self.width:
- target_dimension = dimension
- self.target_dimension = target_dimension
- # print(f"target_dimension: {target_dimension}")
- break
-
- if target_dimension is None:
- # TODO: currently we force the target to be WXGA (16:10), when it cannot find a match
- target_dimension = MAX_SCALING_TARGETS["WXGA"]
- self.target_dimension = MAX_SCALING_TARGETS["WXGA"]
-
- # should be less than 1
- x_scaling_factor = target_dimension["width"] / self.width
- y_scaling_factor = target_dimension["height"] / self.height
- if source == ScalingSource.API:
- if x > self.width or y > self.height:
- raise ToolError(f"Coordinates {x}, {y} are out of bounds")
- # scale up
- return round(x / x_scaling_factor), round(y / y_scaling_factor)
- # scale down
- return round(x * x_scaling_factor), round(y * y_scaling_factor)
-
- def get_screen_size(self):
- if platform.system() == "Windows":
- # Use screeninfo to get primary monitor on Windows
- screens = get_monitors()
-
- # Sort screens by x position to arrange from left to right
- sorted_screens = sorted(screens, key=lambda s: s.x)
-
- if self.selected_screen is None:
- primary_monitor = next((m for m in get_monitors() if m.is_primary), None)
- return primary_monitor.width, primary_monitor.height
- elif self.selected_screen < 0 or self.selected_screen >= len(screens):
- raise IndexError("Invalid screen index.")
- else:
- screen = sorted_screens[self.selected_screen]
- return screen.width, screen.height
-
- elif platform.system() == "Darwin":
- # macOS part using Quartz to get screen information
- max_displays = 32 # Maximum number of displays to handle
- active_displays = Quartz.CGGetActiveDisplayList(max_displays, None, None)[1]
-
- # Get the display bounds (resolution) for each active display
- screens = []
- for display_id in active_displays:
- bounds = Quartz.CGDisplayBounds(display_id)
- screens.append({
- 'id': display_id,
- 'x': int(bounds.origin.x),
- 'y': int(bounds.origin.y),
- 'width': int(bounds.size.width),
- 'height': int(bounds.size.height),
- 'is_primary': Quartz.CGDisplayIsMain(display_id) # Check if this is the primary display
- })
-
- # Sort screens by x position to arrange from left to right
- sorted_screens = sorted(screens, key=lambda s: s['x'])
-
- if self.selected_screen is None:
- # Find the primary monitor
- primary_monitor = next((screen for screen in screens if screen['is_primary']), None)
- if primary_monitor:
- return primary_monitor['width'], primary_monitor['height']
- else:
- raise RuntimeError("No primary monitor found.")
- elif self.selected_screen < 0 or self.selected_screen >= len(screens):
- raise IndexError("Invalid screen index.")
- else:
- # Return the resolution of the selected screen
- screen = sorted_screens[self.selected_screen]
- return screen['width'], screen['height']
-
- else: # Linux or other OS
- cmd = "xrandr | grep ' primary' | awk '{print $4}'"
- try:
- # output = subprocess.check_output(cmd, shell=True).decode()
- # resolution = output.strip().split()[0]
- # width, height = map(int, resolution.split('x'))
- # return width, height
- screen = get_monitors()[0]
- return screen.width, screen.height
- except subprocess.CalledProcessError:
- raise RuntimeError("Failed to get screen resolution on Linux.")
-
- def get_mouse_position(self):
- # TODO: enhance this func
- from AppKit import NSEvent
- from Quartz import CGEventSourceCreate, kCGEventSourceStateCombinedSessionState
-
- loc = NSEvent.mouseLocation()
- # Adjust for different coordinate system
- return int(loc.x), int(self.height - loc.y)
-
- def map_keys(self, text: str):
- """Map text to cliclick key codes if necessary."""
- # For simplicity, return text as is
- # Implement mapping if special keys are needed
- return text
\ No newline at end of file
diff --git a/demo/tools/edit.py b/demo/tools/edit.py
deleted file mode 100644
index fd4e868..0000000
--- a/demo/tools/edit.py
+++ /dev/null
@@ -1,290 +0,0 @@
-from collections import defaultdict
-from pathlib import Path
-from typing import Literal, get_args
-
-from anthropic.types.beta import BetaToolTextEditor20241022Param
-
-from .base import BaseAnthropicTool, CLIResult, ToolError, ToolResult
-from .run import maybe_truncate, run
-
-Command = Literal[
- "view",
- "create",
- "str_replace",
- "insert",
- "undo_edit",
-]
-SNIPPET_LINES: int = 4
-
-
-class EditTool(BaseAnthropicTool):
- """
- An filesystem editor tool that allows the agent to view, create, and edit files.
- The tool parameters are defined by Anthropic and are not editable.
- """
-
- api_type: Literal["text_editor_20241022"] = "text_editor_20241022"
- name: Literal["str_replace_editor"] = "str_replace_editor"
-
- _file_history: dict[Path, list[str]]
-
- def __init__(self):
- self._file_history = defaultdict(list)
- super().__init__()
-
- def to_params(self) -> BetaToolTextEditor20241022Param:
- return {
- "name": self.name,
- "type": self.api_type,
- }
-
- async def __call__(
- self,
- *,
- command: Command,
- path: str,
- file_text: str | None = None,
- view_range: list[int] | None = None,
- old_str: str | None = None,
- new_str: str | None = None,
- insert_line: int | None = None,
- **kwargs,
- ):
- _path = Path(path)
- self.validate_path(command, _path)
- if command == "view":
- return await self.view(_path, view_range)
- elif command == "create":
- if not file_text:
- raise ToolError("Parameter `file_text` is required for command: create")
- self.write_file(_path, file_text)
- self._file_history[_path].append(file_text)
- return ToolResult(output=f"File created successfully at: {_path}")
- elif command == "str_replace":
- if not old_str:
- raise ToolError(
- "Parameter `old_str` is required for command: str_replace"
- )
- return self.str_replace(_path, old_str, new_str)
- elif command == "insert":
- if insert_line is None:
- raise ToolError(
- "Parameter `insert_line` is required for command: insert"
- )
- if not new_str:
- raise ToolError("Parameter `new_str` is required for command: insert")
- return self.insert(_path, insert_line, new_str)
- elif command == "undo_edit":
- return self.undo_edit(_path)
- raise ToolError(
- f'Unrecognized command {command}. The allowed commands for the {self.name} tool are: {", ".join(get_args(Command))}'
- )
-
- def validate_path(self, command: str, path: Path):
- """
- Check that the path/command combination is valid.
- """
- # Check if its an absolute path
- if not path.is_absolute():
- suggested_path = Path("") / path
- raise ToolError(
- f"The path {path} is not an absolute path, it should start with `/`. Maybe you meant {suggested_path}?"
- )
- # Check if path exists
- if not path.exists() and command != "create":
- raise ToolError(
- f"The path {path} does not exist. Please provide a valid path."
- )
- if path.exists() and command == "create":
- raise ToolError(
- f"File already exists at: {path}. Cannot overwrite files using command `create`."
- )
- # Check if the path points to a directory
- if path.is_dir():
- if command != "view":
- raise ToolError(
- f"The path {path} is a directory and only the `view` command can be used on directories"
- )
-
- async def view(self, path: Path, view_range: list[int] | None = None):
- """Implement the view command"""
- if path.is_dir():
- if view_range:
- raise ToolError(
- "The `view_range` parameter is not allowed when `path` points to a directory."
- )
-
- _, stdout, stderr = await run(
- rf"find {path} -maxdepth 2 -not -path '*/\.*'"
- )
- if not stderr:
- stdout = f"Here's the files and directories up to 2 levels deep in {path}, excluding hidden items:\n{stdout}\n"
- return CLIResult(output=stdout, error=stderr)
-
- file_content = self.read_file(path)
- init_line = 1
- if view_range:
- if len(view_range) != 2 or not all(isinstance(i, int) for i in view_range):
- raise ToolError(
- "Invalid `view_range`. It should be a list of two integers."
- )
- file_lines = file_content.split("\n")
- n_lines_file = len(file_lines)
- init_line, final_line = view_range
- if init_line < 1 or init_line > n_lines_file:
- raise ToolError(
- f"Invalid `view_range`: {view_range}. It's first element `{init_line}` should be within the range of lines of the file: {[1, n_lines_file]}"
- )
- if final_line > n_lines_file:
- raise ToolError(
- f"Invalid `view_range`: {view_range}. It's second element `{final_line}` should be smaller than the number of lines in the file: `{n_lines_file}`"
- )
- if final_line != -1 and final_line < init_line:
- raise ToolError(
- f"Invalid `view_range`: {view_range}. It's second element `{final_line}` should be larger or equal than its first `{init_line}`"
- )
-
- if final_line == -1:
- file_content = "\n".join(file_lines[init_line - 1 :])
- else:
- file_content = "\n".join(file_lines[init_line - 1 : final_line])
-
- return CLIResult(
- output=self._make_output(file_content, str(path), init_line=init_line)
- )
-
- def str_replace(self, path: Path, old_str: str, new_str: str | None):
- """Implement the str_replace command, which replaces old_str with new_str in the file content"""
- # Read the file content
- file_content = self.read_file(path).expandtabs()
- old_str = old_str.expandtabs()
- new_str = new_str.expandtabs() if new_str is not None else ""
-
- # Check if old_str is unique in the file
- occurrences = file_content.count(old_str)
- if occurrences == 0:
- raise ToolError(
- f"No replacement was performed, old_str `{old_str}` did not appear verbatim in {path}."
- )
- elif occurrences > 1:
- file_content_lines = file_content.split("\n")
- lines = [
- idx + 1
- for idx, line in enumerate(file_content_lines)
- if old_str in line
- ]
- raise ToolError(
- f"No replacement was performed. Multiple occurrences of old_str `{old_str}` in lines {lines}. Please ensure it is unique"
- )
-
- # Replace old_str with new_str
- new_file_content = file_content.replace(old_str, new_str)
-
- # Write the new content to the file
- self.write_file(path, new_file_content)
-
- # Save the content to history
- self._file_history[path].append(file_content)
-
- # Create a snippet of the edited section
- replacement_line = file_content.split(old_str)[0].count("\n")
- start_line = max(0, replacement_line - SNIPPET_LINES)
- end_line = replacement_line + SNIPPET_LINES + new_str.count("\n")
- snippet = "\n".join(new_file_content.split("\n")[start_line : end_line + 1])
-
- # Prepare the success message
- success_msg = f"The file {path} has been edited. "
- success_msg += self._make_output(
- snippet, f"a snippet of {path}", start_line + 1
- )
- success_msg += "Review the changes and make sure they are as expected. Edit the file again if necessary."
-
- return CLIResult(output=success_msg)
-
- def insert(self, path: Path, insert_line: int, new_str: str):
- """Implement the insert command, which inserts new_str at the specified line in the file content."""
- file_text = self.read_file(path).expandtabs()
- new_str = new_str.expandtabs()
- file_text_lines = file_text.split("\n")
- n_lines_file = len(file_text_lines)
-
- if insert_line < 0 or insert_line > n_lines_file:
- raise ToolError(
- f"Invalid `insert_line` parameter: {insert_line}. It should be within the range of lines of the file: {[0, n_lines_file]}"
- )
-
- new_str_lines = new_str.split("\n")
- new_file_text_lines = (
- file_text_lines[:insert_line]
- + new_str_lines
- + file_text_lines[insert_line:]
- )
- snippet_lines = (
- file_text_lines[max(0, insert_line - SNIPPET_LINES) : insert_line]
- + new_str_lines
- + file_text_lines[insert_line : insert_line + SNIPPET_LINES]
- )
-
- new_file_text = "\n".join(new_file_text_lines)
- snippet = "\n".join(snippet_lines)
-
- self.write_file(path, new_file_text)
- self._file_history[path].append(file_text)
-
- success_msg = f"The file {path} has been edited. "
- success_msg += self._make_output(
- snippet,
- "a snippet of the edited file",
- max(1, insert_line - SNIPPET_LINES + 1),
- )
- success_msg += "Review the changes and make sure they are as expected (correct indentation, no duplicate lines, etc). Edit the file again if necessary."
- return CLIResult(output=success_msg)
-
- def undo_edit(self, path: Path):
- """Implement the undo_edit command."""
- if not self._file_history[path]:
- raise ToolError(f"No edit history found for {path}.")
-
- old_text = self._file_history[path].pop()
- self.write_file(path, old_text)
-
- return CLIResult(
- output=f"Last edit to {path} undone successfully. {self._make_output(old_text, str(path))}"
- )
-
- def read_file(self, path: Path):
- """Read the content of a file from a given path; raise a ToolError if an error occurs."""
- try:
- return path.read_text()
- except Exception as e:
- raise ToolError(f"Ran into {e} while trying to read {path}") from None
-
- def write_file(self, path: Path, file: str):
- """Write the content of a file to a given path; raise a ToolError if an error occurs."""
- try:
- path.write_text(file)
- except Exception as e:
- raise ToolError(f"Ran into {e} while trying to write to {path}") from None
-
- def _make_output(
- self,
- file_content: str,
- file_descriptor: str,
- init_line: int = 1,
- expand_tabs: bool = True,
- ):
- """Generate output for the CLI based on the content of a file."""
- file_content = maybe_truncate(file_content)
- if expand_tabs:
- file_content = file_content.expandtabs()
- file_content = "\n".join(
- [
- f"{i + init_line:6}\t{line}"
- for i, line in enumerate(file_content.split("\n"))
- ]
- )
- return (
- f"Here's the result of running `cat -n` on {file_descriptor}:\n"
- + file_content
- + "\n"
- )
diff --git a/demo/tools/run.py b/demo/tools/run.py
deleted file mode 100644
index b9d3481..0000000
--- a/demo/tools/run.py
+++ /dev/null
@@ -1,42 +0,0 @@
-"""Utility to run shell commands asynchronously with a timeout."""
-
-import asyncio
-
-TRUNCATED_MESSAGE: str = "To save on context only part of this file has been shown to you. You should retry this tool after you have searched inside the file with `grep -n` in order to find the line numbers of what you are looking for."
-MAX_RESPONSE_LEN: int = 16000
-
-
-def maybe_truncate(content: str, truncate_after: int | None = MAX_RESPONSE_LEN):
- """Truncate content and append a notice if content exceeds the specified length."""
- return (
- content
- if not truncate_after or len(content) <= truncate_after
- else content[:truncate_after] + TRUNCATED_MESSAGE
- )
-
-
-async def run(
- cmd: str,
- timeout: float | None = 120.0, # seconds
- truncate_after: int | None = MAX_RESPONSE_LEN,
-):
- """Run a shell command asynchronously with a timeout."""
- process = await asyncio.create_subprocess_shell(
- cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
- )
-
- try:
- stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=timeout)
- return (
- process.returncode or 0,
- maybe_truncate(stdout.decode(), truncate_after=truncate_after),
- maybe_truncate(stderr.decode(), truncate_after=truncate_after),
- )
- except asyncio.TimeoutError as exc:
- try:
- process.kill()
- except ProcessLookupError:
- pass
- raise TimeoutError(
- f"Command '{cmd}' timed out after {timeout} seconds"
- ) from exc
diff --git a/demo/tools/screen_capture.py b/demo/tools/screen_capture.py
deleted file mode 100644
index d425141..0000000
--- a/demo/tools/screen_capture.py
+++ /dev/null
@@ -1,185 +0,0 @@
-import subprocess
-import base64
-from pathlib import Path
-from PIL import ImageGrab
-from uuid import uuid4
-from screeninfo import get_monitors
-import platform
-if platform.system() == "Darwin":
- import Quartz # uncomment this line if you are on macOS
-
-from PIL import ImageGrab
-from functools import partial
-from .base import BaseAnthropicTool, ToolError, ToolResult
-
-
-OUTPUT_DIR = "./tmp/outputs"
-
-def get_screenshot(selected_screen: int = 0, resize: bool = True, target_width: int = 1920, target_height: int = 1080):
- # print(f"get_screenshot selected_screen: {selected_screen}")
-
- # Get screen width and height using Windows command
- display_num = None
- offset_x = 0
- offset_y = 0
- selected_screen = selected_screen
- width, height = _get_screen_size()
-
- """Take a screenshot of the current screen and return a ToolResult with the base64 encoded image."""
- output_dir = Path(OUTPUT_DIR)
- output_dir.mkdir(parents=True, exist_ok=True)
- path = output_dir / f"screenshot_{uuid4().hex}.png"
-
- ImageGrab.grab = partial(ImageGrab.grab, all_screens=True)
-
- # Detect platform
- system = platform.system()
-
- if system == "Windows":
- # Windows: Use screeninfo to get monitor details
- screens = get_monitors()
-
- # Sort screens by x position to arrange from left to right
- sorted_screens = sorted(screens, key=lambda s: s.x)
-
- if selected_screen < 0 or selected_screen >= len(screens):
- raise IndexError("Invalid screen index.")
-
- screen = sorted_screens[selected_screen]
- bbox = (screen.x, screen.y, screen.x + screen.width, screen.y + screen.height)
-
- elif system == "Darwin": # macOS
- # macOS: Use Quartz to get monitor details
- max_displays = 32 # Maximum number of displays to handle
- active_displays = Quartz.CGGetActiveDisplayList(max_displays, None, None)[1]
-
- # Get the display bounds (resolution) for each active display
- screens = []
- for display_id in active_displays:
- bounds = Quartz.CGDisplayBounds(display_id)
- screens.append({
- 'id': display_id,
- 'x': int(bounds.origin.x),
- 'y': int(bounds.origin.y),
- 'width': int(bounds.size.width),
- 'height': int(bounds.size.height),
- 'is_primary': Quartz.CGDisplayIsMain(display_id) # Check if this is the primary display
- })
-
- # Sort screens by x position to arrange from left to right
- sorted_screens = sorted(screens, key=lambda s: s['x'])
- # print(f"Darwin sorted_screens: {sorted_screens}")
-
- if selected_screen < 0 or selected_screen >= len(screens):
- raise IndexError("Invalid screen index.")
-
- screen = sorted_screens[selected_screen]
-
- bbox = (screen['x'], screen['y'], screen['x'] + screen['width'], screen['y'] + screen['height'])
-
- else: # Linux or other OS
- cmd = "xrandr | grep ' primary' | awk '{print $4}'"
- try:
- # output = subprocess.check_output(cmd, shell=True).decode()
- # resolution = output.strip().split()[0]
- # width, height = map(int, resolution.split('x'))
- screen = get_monitors()[0]
- width, height = screen.width, screen.height
- bbox = (0, 0, width, height) # Assuming single primary screen for simplicity
- except subprocess.CalledProcessError:
- raise RuntimeError("Failed to get screen resolution on Linux.")
-
- # Take screenshot using the bounding box
- screenshot = ImageGrab.grab(bbox=bbox)
- import os
- if (display_num := os.getenv("DISPLAY_NUM")) is not None:
- display_num = int(display_num)
- _display_prefix = f"DISPLAY=:{display_num} "
- else:
- display_num = None
- _display_prefix = ""
- screenshot_cmd = f"{_display_prefix}scrot -p {path}"
- import pdb; pdb.set_trace()
- result = subprocess.run(screenshot_cmd, shell=True, capture_output=True)
-
- # Set offsets (for potential future use)
- offset_x = screen['x'] if system == "Darwin" else screen.x
- offset_y = screen['y'] if system == "Darwin" else screen.y
-
- # # Resize if
- if resize:
- screenshot = screenshot.resize((target_width, target_height))
-
- # Save the screenshot
- # screenshot.save(str(path))
-
- if path.exists():
- # Return a ToolResult instance instead of a dictionary
- return screenshot, path
-
- raise ToolError(f"Failed to take screenshot: {path} does not exist.")
-
-
-
-
-def _get_screen_size(selected_screen: int = 0):
- if platform.system() == "Windows":
- # Use screeninfo to get primary monitor on Windows
- screens = get_monitors()
-
- # Sort screens by x position to arrange from left to right
- sorted_screens = sorted(screens, key=lambda s: s.x)
- if selected_screen is None:
- primary_monitor = next((m for m in get_monitors() if m.is_primary), None)
- return primary_monitor.width, primary_monitor.height
- elif selected_screen < 0 or selected_screen >= len(screens):
- raise IndexError("Invalid screen index.")
- else:
- screen = sorted_screens[selected_screen]
- return screen.width, screen.height
- elif platform.system() == "Darwin":
- # macOS part using Quartz to get screen information
- max_displays = 32 # Maximum number of displays to handle
- active_displays = Quartz.CGGetActiveDisplayList(max_displays, None, None)[1]
-
- # Get the display bounds (resolution) for each active display
- screens = []
- for display_id in active_displays:
- bounds = Quartz.CGDisplayBounds(display_id)
- screens.append({
- 'id': display_id,
- 'x': int(bounds.origin.x),
- 'y': int(bounds.origin.y),
- 'width': int(bounds.size.width),
- 'height': int(bounds.size.height),
- 'is_primary': Quartz.CGDisplayIsMain(display_id) # Check if this is the primary display
- })
-
- # Sort screens by x position to arrange from left to right
- sorted_screens = sorted(screens, key=lambda s: s['x'])
-
- if selected_screen is None:
- # Find the primary monitor
- primary_monitor = next((screen for screen in screens if screen['is_primary']), None)
- if primary_monitor:
- return primary_monitor['width'], primary_monitor['height']
- else:
- raise RuntimeError("No primary monitor found.")
- elif selected_screen < 0 or selected_screen >= len(screens):
- raise IndexError("Invalid screen index.")
- else:
- # Return the resolution of the selected screen
- screen = sorted_screens[selected_screen]
- return screen['width'], screen['height']
-
- else: # Linux or other OS
- cmd = "xrandr | grep ' primary' | awk '{print $4}'"
- try:
- # output = subprocess.check_output(cmd, shell=True).decode()
- # resolution = output.strip().split()[0]
- # width, height = map(int, resolution.split('x'))
- # return width, height
- screen = get_monitors()[0]
- return screen.width, screen.height
- except subprocess.CalledProcessError:
- raise RuntimeError("Failed to get screen resolution on Linux.")
diff --git a/gradio_demo.py b/gradio_demo.py
index b09168b..15d46a7 100644
--- a/gradio_demo.py
+++ b/gradio_demo.py
@@ -8,7 +8,7 @@ import io
import base64, os
-from utils import check_ocr_box, get_yolo_model, get_caption_model_processor, get_som_labeled_img
+from util.utils import check_ocr_box, get_yolo_model, get_caption_model_processor, get_som_labeled_img
import torch
from PIL import Image
@@ -17,8 +17,6 @@ yolo_model = get_yolo_model(model_path='weights/icon_detect_v1_5/best.pt')
caption_model_processor = get_caption_model_processor(model_name="florence2", model_name_or_path="weights/icon_caption_florence")
# caption_model_processor = get_caption_model_processor(model_name="blip2", model_name_or_path="weights/icon_caption_blip2")
-
-
MARKDOWN = """
# OmniParser for Pure Vision Based General GUI Agent 🔥
@@ -65,8 +63,6 @@ def process(
# parsed_content_list = str(parsed_content_list)
return image, str(parsed_content_list)
-
-
with gr.Blocks() as demo:
gr.Markdown(MARKDOWN)
with gr.Row():
diff --git a/imgs/gradioicon.png b/imgs/gradioicon.png
new file mode 100644
index 0000000..d6b62c8
Binary files /dev/null and b/imgs/gradioicon.png differ
diff --git a/imgs/header_bar.png b/imgs/header_bar.png
new file mode 100644
index 0000000..148fbee
Binary files /dev/null and b/imgs/header_bar.png differ
diff --git a/imgs/header_bar_thin.png b/imgs/header_bar_thin.png
new file mode 100644
index 0000000..e4be8ba
Binary files /dev/null and b/imgs/header_bar_thin.png differ
diff --git a/imgs/omniboxicon.png b/imgs/omniboxicon.png
new file mode 100644
index 0000000..de05f63
Binary files /dev/null and b/imgs/omniboxicon.png differ
diff --git a/imgs/omniparsericon.png b/imgs/omniparsericon.png
new file mode 100644
index 0000000..877427a
Binary files /dev/null and b/imgs/omniparsericon.png differ
diff --git a/imgs/som_overlaid_omni.png b/imgs/som_overlaid_omni.png
new file mode 100644
index 0000000..49271bc
Binary files /dev/null and b/imgs/som_overlaid_omni.png differ
diff --git a/omniparser.py b/omniparser.py
deleted file mode 100644
index 634ae9f..0000000
--- a/omniparser.py
+++ /dev/null
@@ -1,60 +0,0 @@
-from utils import get_som_labeled_img, check_ocr_box, get_yolo_model
-import torch
-from ultralytics import YOLO
-from PIL import Image
-from typing import Dict, Tuple, List
-import io
-import base64
-
-
-config = {
- 'som_model_path': 'finetuned_icon_detect.pt',
- 'device': 'cpu',
- 'caption_model_path': 'Salesforce/blip2-opt-2.7b',
- 'draw_bbox_config': {
- 'text_scale': 0.8,
- 'text_thickness': 2,
- 'text_padding': 3,
- 'thickness': 3,
- },
- 'BOX_TRESHOLD': 0.05
-}
-
-
-class Omniparser(object):
- def __init__(self, config: Dict):
- self.config = config
-
- self.som_model = get_yolo_model(model_path=config['som_model_path'])
- # self.caption_model_processor = get_caption_model_processor(config['caption_model_path'], device=cofig['device'])
- # self.caption_model_processor['model'].to(torch.float32)
-
- def parse(self, image_path: str):
- print('Parsing image:', image_path)
- ocr_bbox_rslt, is_goal_filtered = check_ocr_box(image_path, display_img = False, output_bb_format='xyxy', goal_filtering=None, easyocr_args={'paragraph': False, 'text_threshold':0.9})
- text, ocr_bbox = ocr_bbox_rslt
-
- draw_bbox_config = self.config['draw_bbox_config']
- BOX_TRESHOLD = self.config['BOX_TRESHOLD']
- dino_labled_img, label_coordinates, parsed_content_list = get_som_labeled_img(image_path, self.som_model, BOX_TRESHOLD = BOX_TRESHOLD, output_coord_in_ratio=False, ocr_bbox=ocr_bbox,draw_bbox_config=draw_bbox_config, caption_model_processor=None, ocr_text=text,use_local_semantics=False)
-
- image = Image.open(io.BytesIO(base64.b64decode(dino_labled_img)))
- # formating output
- return_list = [{'from': 'omniparser', 'shape': {'x':coord[0], 'y':coord[1], 'width':coord[2], 'height':coord[3]},
- 'text': parsed_content_list[i].split(': ')[1], 'type':'text'} for i, (k, coord) in enumerate(label_coordinates.items()) if i < len(parsed_content_list)]
- return_list.extend(
- [{'from': 'omniparser', 'shape': {'x':coord[0], 'y':coord[1], 'width':coord[2], 'height':coord[3]},
- 'text': 'None', 'type':'icon'} for i, (k, coord) in enumerate(label_coordinates.items()) if i >= len(parsed_content_list)]
- )
-
- return [image, return_list]
-
-parser = Omniparser(config)
-image_path = 'examples/pc_1.png'
-
-# time the parser
-import time
-s = time.time()
-image, parsed_content_list = parser.parse(image_path)
-device = config['device']
-print(f'Time taken for Omniparser on {device}:', time.time() - s)
diff --git a/omnitool/gradio/.gitignore b/omnitool/gradio/.gitignore
new file mode 100644
index 0000000..c036379
--- /dev/null
+++ b/omnitool/gradio/.gitignore
@@ -0,0 +1 @@
+tmp/
\ No newline at end of file
diff --git a/demo/__init__.py b/omnitool/gradio/__init__.py
similarity index 100%
rename from demo/__init__.py
rename to omnitool/gradio/__init__.py
diff --git a/demo/gui_agent/anthropic_agent.py b/omnitool/gradio/agent/anthropic_agent.py
similarity index 73%
rename from demo/gui_agent/anthropic_agent.py
rename to omnitool/gradio/agent/anthropic_agent.py
index 09d857e..b1c744e 100644
--- a/demo/gui_agent/anthropic_agent.py
+++ b/omnitool/gradio/agent/anthropic_agent.py
@@ -1,207 +1,162 @@
-"""
-Agentic sampling loop that calls the Anthropic API and local implenmentation of anthropic-defined computer use tools.
-"""
-import asyncio
-import platform
-from collections.abc import Callable
-from datetime import datetime
-from enum import StrEnum
-from typing import Any, cast
-
-from anthropic import Anthropic, AnthropicBedrock, AnthropicVertex, APIResponse
-from anthropic.types import (
- ToolResultBlockParam,
-)
-from anthropic.types.beta import (
- BetaContentBlock,
- BetaContentBlockParam,
- BetaImageBlockParam,
- BetaMessage,
- BetaMessageParam,
- BetaTextBlockParam,
- BetaToolResultBlockParam,
-)
-from anthropic.types import TextBlock
-from anthropic.types.beta import BetaMessage, BetaTextBlock, BetaToolUseBlock
-
-from tools import BashTool, ComputerTool, EditTool, ToolCollection, ToolResult
-
-from PIL import Image
-from io import BytesIO
-import gradio as gr
-from typing import Dict
-
-
-BETA_FLAG = "computer-use-2024-10-22"
-
-
-class APIProvider(StrEnum):
- ANTHROPIC = "anthropic"
- BEDROCK = "bedrock"
- VERTEX = "vertex"
-
-
-PROVIDER_TO_DEFAULT_MODEL_NAME: dict[APIProvider, str] = {
- APIProvider.ANTHROPIC: "claude-3-5-sonnet-20241022",
- APIProvider.BEDROCK: "anthropic.claude-3-5-sonnet-20241022-v2:0",
- APIProvider.VERTEX: "claude-3-5-sonnet-v2@20241022",
-}
-
-
-# Check OS
-
-SYSTEM_PROMPT = f"""
-* You are utilizing a Windows system with internet access.
-* The current date is {datetime.today().strftime('%A, %B %d, %Y')}.
-
-"""
-
-
-class AnthropicActor:
- def __init__(
- self,
- model: str,
- provider: APIProvider,
- system_prompt_suffix: str,
- api_key: str,
- api_response_callback: Callable[[APIResponse[BetaMessage]], None],
- max_tokens: int = 4096,
- only_n_most_recent_images: int | None = None,
- selected_screen: int = 0,
- print_usage: bool = True,
- ):
- self.model = model
- self.provider = provider
- self.system_prompt_suffix = system_prompt_suffix
- self.api_key = api_key
- self.api_response_callback = api_response_callback
- self.max_tokens = max_tokens
- self.only_n_most_recent_images = only_n_most_recent_images
- self.selected_screen = selected_screen
-
- self.tool_collection = ToolCollection(
- ComputerTool(selected_screen=selected_screen),
- BashTool(),
- EditTool(),
- )
-
- self.system = (
- f"{SYSTEM_PROMPT}{' ' + system_prompt_suffix if system_prompt_suffix else ''}"
- )
-
- self.total_token_usage = 0
- self.total_cost = 0
- self.print_usage = print_usage
-
- # Instantiate the appropriate API client based on the provider
- if provider == APIProvider.ANTHROPIC:
- self.client = Anthropic(api_key=api_key)
- elif provider == APIProvider.VERTEX:
- self.client = AnthropicVertex()
- elif provider == APIProvider.BEDROCK:
- self.client = AnthropicBedrock()
-
- def __call__(
- self,
- *,
- messages: list[BetaMessageParam]
- ):
- """
- Generate a response given history messages.
- """
- if self.only_n_most_recent_images:
- _maybe_filter_to_n_most_recent_images(messages, self.only_n_most_recent_images)
-
- # Call the API synchronously
- raw_response = self.client.beta.messages.with_raw_response.create(
- max_tokens=self.max_tokens,
- messages=messages,
- model=self.model,
- system=self.system,
- tools=self.tool_collection.to_params(),
- betas=["computer-use-2024-10-22"],
- )
-
- self.api_response_callback(cast(APIResponse[BetaMessage], raw_response))
-
- response = raw_response.parse()
- print(f"AnthropicActor response: {response}")
-
- self.total_token_usage += response.usage.input_tokens + response.usage.output_tokens
- self.total_cost += (response.usage.input_tokens * 3 / 1000000 + response.usage.output_tokens * 15 / 1000000)
-
- if self.print_usage:
- print(f"Claude total token usage so far: {self.total_token_usage}, total cost so far: $USD{self.total_cost}")
-
- return response
-
-
-def _maybe_filter_to_n_most_recent_images(
- messages: list[BetaMessageParam],
- images_to_keep: int,
- min_removal_threshold: int = 10,
-):
- """
- With the assumption that images are screenshots that are of diminishing value as
- the conversation progresses, remove all but the final `images_to_keep` tool_result
- images in place, with a chunk of min_removal_threshold to reduce the amount we
- break the implicit prompt cache.
- """
- if images_to_keep is None:
- return messages
-
- tool_result_blocks = cast(
- list[ToolResultBlockParam],
- [
- item
- for message in messages
- for item in (
- message["content"] if isinstance(message["content"], list) else []
- )
- if isinstance(item, dict) and item.get("type") == "tool_result"
- ],
- )
-
- total_images = sum(
- 1
- for tool_result in tool_result_blocks
- for content in tool_result.get("content", [])
- if isinstance(content, dict) and content.get("type") == "image"
- )
-
- images_to_remove = total_images - images_to_keep
- # for better cache behavior, we want to remove in chunks
- images_to_remove -= images_to_remove % min_removal_threshold
-
- for tool_result in tool_result_blocks:
- if isinstance(tool_result.get("content"), list):
- new_content = []
- for content in tool_result.get("content", []):
- if isinstance(content, dict) and content.get("type") == "image":
- if images_to_remove > 0:
- images_to_remove -= 1
- continue
- new_content.append(content)
- tool_result["content"] = new_content
-
-
-
-if __name__ == "__main__":
- pass
- # client = Anthropic(api_key="")
- # response = client.beta.messages.with_raw_response.create(
- # max_tokens=4096,
- # model="claude-3-5-sonnet-20241022",
- # system=SYSTEM_PROMPT,
- # # tools=ToolCollection(
- # # ComputerTool(selected_screen=0),
- # # BashTool(),
- # # EditTool(),
- # # ).to_params(),
- # betas=["computer-use-2024-10-22"],
- # messages=[
- # {"role": "user", "content": "click on (199, 199)."}
- # ],
- # )
-
- # print(f"AnthropicActor response: {response.parse().usage.input_tokens+response.parse().usage.output_tokens}")
\ No newline at end of file
+"""
+Agentic sampling loop that calls the Anthropic API and local implenmentation of anthropic-defined computer use tools.
+"""
+import asyncio
+import platform
+from collections.abc import Callable
+from datetime import datetime
+from enum import StrEnum
+from typing import Any, cast
+
+from anthropic import Anthropic, AnthropicBedrock, AnthropicVertex, APIResponse
+from anthropic.types import (
+ ToolResultBlockParam,
+)
+from anthropic.types.beta import (
+ BetaContentBlock,
+ BetaContentBlockParam,
+ BetaImageBlockParam,
+ BetaMessage,
+ BetaMessageParam,
+ BetaTextBlockParam,
+ BetaToolResultBlockParam,
+)
+from anthropic.types import TextBlock
+from anthropic.types.beta import BetaMessage, BetaTextBlock, BetaToolUseBlock
+
+from tools import ComputerTool, ToolCollection, ToolResult
+
+from PIL import Image
+from io import BytesIO
+import gradio as gr
+from typing import Dict
+
+BETA_FLAG = "computer-use-2024-10-22"
+
+class APIProvider(StrEnum):
+ ANTHROPIC = "anthropic"
+ BEDROCK = "bedrock"
+ VERTEX = "vertex"
+
+SYSTEM_PROMPT = f"""
+* You are utilizing a Windows system with internet access.
+* The current date is {datetime.today().strftime('%A, %B %d, %Y')}.
+
+"""
+
+class AnthropicActor:
+ def __init__(
+ self,
+ model: str,
+ provider: APIProvider,
+ api_key: str,
+ api_response_callback: Callable[[APIResponse[BetaMessage]], None],
+ max_tokens: int = 4096,
+ only_n_most_recent_images: int | None = None,
+ print_usage: bool = True,
+ ):
+ self.model = model
+ self.provider = provider
+ self.api_key = api_key
+ self.api_response_callback = api_response_callback
+ self.max_tokens = max_tokens
+ self.only_n_most_recent_images = only_n_most_recent_images
+
+ self.tool_collection = ToolCollection(ComputerTool())
+
+ self.system = SYSTEM_PROMPT
+
+ self.total_token_usage = 0
+ self.total_cost = 0
+ self.print_usage = print_usage
+
+ # Instantiate the appropriate API client based on the provider
+ if provider == APIProvider.ANTHROPIC:
+ self.client = Anthropic(api_key=api_key)
+ elif provider == APIProvider.VERTEX:
+ self.client = AnthropicVertex()
+ elif provider == APIProvider.BEDROCK:
+ self.client = AnthropicBedrock()
+
+ def __call__(
+ self,
+ *,
+ messages: list[BetaMessageParam]
+ ):
+ """
+ Generate a response given history messages.
+ """
+ if self.only_n_most_recent_images:
+ _maybe_filter_to_n_most_recent_images(messages, self.only_n_most_recent_images)
+
+ # Call the API synchronously
+ raw_response = self.client.beta.messages.with_raw_response.create(
+ max_tokens=self.max_tokens,
+ messages=messages,
+ model=self.model,
+ system=self.system,
+ tools=self.tool_collection.to_params(),
+ betas=["computer-use-2024-10-22"],
+ )
+
+ self.api_response_callback(cast(APIResponse[BetaMessage], raw_response))
+
+ response = raw_response.parse()
+ print(f"AnthropicActor response: {response}")
+
+ self.total_token_usage += response.usage.input_tokens + response.usage.output_tokens
+ self.total_cost += (response.usage.input_tokens * 3 / 1000000 + response.usage.output_tokens * 15 / 1000000)
+
+ if self.print_usage:
+ print(f"Claude total token usage so far: {self.total_token_usage}, total cost so far: $USD{self.total_cost}")
+
+ return response
+
+
+def _maybe_filter_to_n_most_recent_images(
+ messages: list[BetaMessageParam],
+ images_to_keep: int,
+ min_removal_threshold: int = 10,
+):
+ """
+ With the assumption that images are screenshots that are of diminishing value as
+ the conversation progresses, remove all but the final `images_to_keep` tool_result
+ images in place, with a chunk of min_removal_threshold to reduce the amount we
+ break the implicit prompt cache.
+ """
+ if images_to_keep is None:
+ return messages
+
+ tool_result_blocks = cast(
+ list[ToolResultBlockParam],
+ [
+ item
+ for message in messages
+ for item in (
+ message["content"] if isinstance(message["content"], list) else []
+ )
+ if isinstance(item, dict) and item.get("type") == "tool_result"
+ ],
+ )
+
+ total_images = sum(
+ 1
+ for tool_result in tool_result_blocks
+ for content in tool_result.get("content", [])
+ if isinstance(content, dict) and content.get("type") == "image"
+ )
+
+ images_to_remove = total_images - images_to_keep
+ # for better cache behavior, we want to remove in chunks
+ images_to_remove -= images_to_remove % min_removal_threshold
+
+ for tool_result in tool_result_blocks:
+ if isinstance(tool_result.get("content"), list):
+ new_content = []
+ for content in tool_result.get("content", []):
+ if isinstance(content, dict) and content.get("type") == "image":
+ if images_to_remove > 0:
+ images_to_remove -= 1
+ continue
+ new_content.append(content)
+ tool_result["content"] = new_content
\ No newline at end of file
diff --git a/omnitool/gradio/agent/llm_utils/groqclient.py b/omnitool/gradio/agent/llm_utils/groqclient.py
new file mode 100644
index 0000000..c31a502
--- /dev/null
+++ b/omnitool/gradio/agent/llm_utils/groqclient.py
@@ -0,0 +1,59 @@
+from groq import Groq
+import os
+from .utils import is_image_path
+
+def run_groq_interleaved(messages: list, system: str, model_name: str, api_key: str, max_tokens=256, temperature=0.6):
+ """
+ Run a chat completion through Groq's API, ignoring any images in the messages.
+ """
+ api_key = api_key or os.environ.get("GROQ_API_KEY")
+ if not api_key:
+ raise ValueError("GROQ_API_KEY is not set")
+
+ client = Groq(api_key=api_key)
+ # avoid using system messages for R1
+ final_messages = [{"role": "user", "content": system}]
+
+ if isinstance(messages, list):
+ for item in messages:
+ if isinstance(item, dict):
+ # For dict items, concatenate all text content, ignoring images
+ text_contents = []
+ for cnt in item["content"]:
+ if isinstance(cnt, str):
+ if not is_image_path(cnt): # Skip image paths
+ text_contents.append(cnt)
+ else:
+ text_contents.append(str(cnt))
+
+ if text_contents: # Only add if there's text content
+ message = {"role": "user", "content": " ".join(text_contents)}
+ final_messages.append(message)
+ else: # str
+ message = {"role": "user", "content": item}
+ final_messages.append(message)
+
+ elif isinstance(messages, str):
+ final_messages.append({"role": "user", "content": messages})
+
+ try:
+ completion = client.chat.completions.create(
+ model="deepseek-r1-distill-llama-70b",
+ messages=final_messages,
+ temperature=0.6,
+ max_completion_tokens=max_tokens,
+ top_p=0.95,
+ stream=False,
+ reasoning_format="raw"
+ )
+
+ response = completion.choices[0].message.content
+ final_answer = response.split('\n')[-1] if '' in response else response
+ final_answer = final_answer.replace("", "")
+ token_usage = completion.usage.total_tokens
+
+ return final_answer, token_usage
+ except Exception as e:
+ print(f"Error in interleaved Groq: {e}")
+
+ return str(e), 0
\ No newline at end of file
diff --git a/omnitool/gradio/agent/llm_utils/oaiclient.py b/omnitool/gradio/agent/llm_utils/oaiclient.py
new file mode 100644
index 0000000..ad42110
--- /dev/null
+++ b/omnitool/gradio/agent/llm_utils/oaiclient.py
@@ -0,0 +1,62 @@
+import os
+import logging
+import base64
+import requests
+from .utils import is_image_path, encode_image
+
+def run_oai_interleaved(messages: list, system: str, model_name: str, api_key: str, max_tokens=256, temperature=0, provider_base_url: str = "https://api.openai.com/v1"):
+ headers = {"Content-Type": "application/json",
+ "Authorization": f"Bearer {api_key}"}
+ final_messages = [{"role": "system", "content": system}]
+
+ if type(messages) == list:
+ for item in messages:
+ contents = []
+ if isinstance(item, dict):
+ for cnt in item["content"]:
+ if isinstance(cnt, str):
+ if is_image_path(cnt) and 'o3-mini' not in model_name:
+ # 03 mini does not support images
+ base64_image = encode_image(cnt)
+ content = {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{base64_image}"}}
+ else:
+ content = {"type": "text", "text": cnt}
+ else:
+ # in this case it is a text block from anthropic
+ content = {"type": "text", "text": str(cnt)}
+
+ contents.append(content)
+
+ message = {"role": 'user', "content": contents}
+ else: # str
+ contents.append({"type": "text", "text": item})
+ message = {"role": "user", "content": contents}
+
+ final_messages.append(message)
+
+
+ elif isinstance(messages, str):
+ final_messages = [{"role": "user", "content": messages}]
+
+ payload = {
+ "model": model_name,
+ "messages": final_messages,
+ }
+ if 'o1' in model_name or 'o3-mini' in model_name:
+ payload['reasoning_effort'] = 'low'
+ payload['max_completion_tokens'] = max_tokens
+ else:
+ payload['max_tokens'] = max_tokens
+
+ response = requests.post(
+ f"{provider_base_url}/chat/completions", headers=headers, json=payload
+ )
+
+
+ try:
+ text = response.json()['choices'][0]['message']['content']
+ token_usage = int(response.json()['usage']['total_tokens'])
+ return text, token_usage
+ except Exception as e:
+ print(f"Error in interleaved openAI: {e}. This may due to your invalid API key. Please check the response: {response.json()} ")
+ return response.json()
\ No newline at end of file
diff --git a/omnitool/gradio/agent/llm_utils/omniparserclient.py b/omnitool/gradio/agent/llm_utils/omniparserclient.py
new file mode 100644
index 0000000..e90ddef
--- /dev/null
+++ b/omnitool/gradio/agent/llm_utils/omniparserclient.py
@@ -0,0 +1,44 @@
+import requests
+import base64
+from pathlib import Path
+from tools.screen_capture import get_screenshot
+from agent.llm_utils.utils import encode_image
+
+OUTPUT_DIR = "./tmp/outputs"
+
+class OmniParserClient:
+ def __init__(self,
+ url: str) -> None:
+ self.url = url
+
+ def __call__(self,):
+ screenshot, screenshot_path = get_screenshot()
+ screenshot_path = str(screenshot_path)
+ image_base64 = encode_image(screenshot_path)
+ response = requests.post(self.url, json={"base64_image": image_base64})
+ response_json = response.json()
+ print('omniparser latency:', response_json['latency'])
+
+ som_image_data = base64.b64decode(response_json['som_image_base64'])
+ screenshot_path_uuid = Path(screenshot_path).stem.replace("screenshot_", "")
+ som_screenshot_path = f"{OUTPUT_DIR}/screenshot_som_{screenshot_path_uuid}.png"
+ with open(som_screenshot_path, "wb") as f:
+ f.write(som_image_data)
+
+ response_json['width'] = screenshot.size[0]
+ response_json['height'] = screenshot.size[1]
+ response_json['original_screenshot_base64'] = image_base64
+ response_json['screenshot_uuid'] = screenshot_path_uuid
+ response_json = self.reformat_messages(response_json)
+ return response_json
+
+ def reformat_messages(self, response_json: dict):
+ screen_info = ""
+ for idx, element in enumerate(response_json["parsed_content_list"]):
+ element['idx'] = idx
+ if element['type'] == 'text':
+ screen_info += f'ID: {idx}, Text: {element["content"]}\n'
+ elif element['type'] == 'icon':
+ screen_info += f'ID: {idx}, Icon: {element["content"]}\n'
+ response_json['screen_info'] = screen_info
+ return response_json
\ No newline at end of file
diff --git a/omnitool/gradio/agent/llm_utils/utils.py b/omnitool/gradio/agent/llm_utils/utils.py
new file mode 100644
index 0000000..12ab36e
--- /dev/null
+++ b/omnitool/gradio/agent/llm_utils/utils.py
@@ -0,0 +1,13 @@
+import base64
+
+def is_image_path(text):
+ image_extensions = (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".tif")
+ if text.endswith(image_extensions):
+ return True
+ else:
+ return False
+
+def encode_image(image_path):
+ """Encode image file to base64."""
+ with open(image_path, "rb") as image_file:
+ return base64.b64encode(image_file.read()).decode("utf-8")
\ No newline at end of file
diff --git a/omnitool/gradio/agent/vlm_agent.py b/omnitool/gradio/agent/vlm_agent.py
new file mode 100644
index 0000000..7599c39
--- /dev/null
+++ b/omnitool/gradio/agent/vlm_agent.py
@@ -0,0 +1,338 @@
+import json
+from collections.abc import Callable
+from typing import cast, Callable
+import uuid
+from PIL import Image, ImageDraw
+import base64
+from io import BytesIO
+
+from anthropic import APIResponse
+from anthropic.types import ToolResultBlockParam
+from anthropic.types.beta import BetaMessage, BetaTextBlock, BetaToolUseBlock, BetaMessageParam, BetaUsage
+
+from agent.llm_utils.oaiclient import run_oai_interleaved
+from agent.llm_utils.groqclient import run_groq_interleaved
+from agent.llm_utils.utils import is_image_path
+import time
+import re
+
+OUTPUT_DIR = "./tmp/outputs"
+
+def extract_data(input_string, data_type):
+ # Regular expression to extract content starting from '```python' until the end if there are no closing backticks
+ pattern = f"```{data_type}" + r"(.*?)(```|$)"
+ # Extract content
+ # re.DOTALL allows '.' to match newlines as well
+ matches = re.findall(pattern, input_string, re.DOTALL)
+ # Return the first match if exists, trimming whitespace and ignoring potential closing backticks
+ return matches[0][0].strip() if matches else input_string
+
+class VLMAgent:
+ def __init__(
+ self,
+ model: str,
+ provider: str,
+ api_key: str,
+ output_callback: Callable,
+ api_response_callback: Callable,
+ max_tokens: int = 4096,
+ only_n_most_recent_images: int | None = None,
+ print_usage: bool = True,
+ ):
+ if model == "omniparser + gpt-4o":
+ self.model = "gpt-4o-2024-11-20"
+ elif model == "omniparser + R1":
+ self.model = "deepseek-r1-distill-llama-70b"
+ elif model == "omniparser + qwen2.5vl":
+ self.model = "qwen2.5-vl-72b-instruct"
+ elif model == "omniparser + o1":
+ self.model = "o1"
+ elif model == "omniparser + o3-mini":
+ self.model = "o3-mini"
+ else:
+ raise ValueError(f"Model {model} not supported")
+
+
+ self.provider = provider
+ self.api_key = api_key
+ self.api_response_callback = api_response_callback
+ self.max_tokens = max_tokens
+ self.only_n_most_recent_images = only_n_most_recent_images
+ self.output_callback = output_callback
+
+ self.print_usage = print_usage
+ self.total_token_usage = 0
+ self.total_cost = 0
+ self.step_count = 0
+
+ self.system = ''
+
+ def __call__(self, messages: list, parsed_screen: list[str, list, dict]):
+ self.step_count += 1
+ image_base64 = parsed_screen['original_screenshot_base64']
+ latency_omniparser = parsed_screen['latency']
+ self.output_callback(f'-- Step {self.step_count}: --', sender="bot")
+ screen_info = str(parsed_screen['screen_info'])
+ screenshot_uuid = parsed_screen['screenshot_uuid']
+ screen_width, screen_height = parsed_screen['width'], parsed_screen['height']
+
+ boxids_and_labels = parsed_screen["screen_info"]
+ system = self._get_system_prompt(boxids_and_labels)
+
+ # drop looping actions msg, byte image etc
+ planner_messages = messages
+ _remove_som_images(planner_messages)
+ _maybe_filter_to_n_most_recent_images(planner_messages, self.only_n_most_recent_images)
+
+ if isinstance(planner_messages[-1], dict):
+ if not isinstance(planner_messages[-1]["content"], list):
+ planner_messages[-1]["content"] = [planner_messages[-1]["content"]]
+ planner_messages[-1]["content"].append(f"{OUTPUT_DIR}/screenshot_{screenshot_uuid}.png")
+ planner_messages[-1]["content"].append(f"{OUTPUT_DIR}/screenshot_som_{screenshot_uuid}.png")
+
+ start = time.time()
+ if "gpt" in self.model or "o1" in self.model or "o3-mini" in self.model:
+ vlm_response, token_usage = run_oai_interleaved(
+ messages=planner_messages,
+ system=system,
+ model_name=self.model,
+ api_key=self.api_key,
+ max_tokens=self.max_tokens,
+ provider_base_url="https://api.openai.com/v1",
+ temperature=0,
+ )
+ print(f"oai token usage: {token_usage}")
+ self.total_token_usage += token_usage
+ if 'gpt' in self.model:
+ self.total_cost += (token_usage * 2.5 / 1000000) # https://openai.com/api/pricing/
+ elif 'o1' in self.model:
+ self.total_cost += (token_usage * 15 / 1000000) # https://openai.com/api/pricing/
+ elif 'o3-mini' in self.model:
+ self.total_cost += (token_usage * 1.1 / 1000000) # https://openai.com/api/pricing/
+ elif "r1" in self.model:
+ vlm_response, token_usage = run_groq_interleaved(
+ messages=planner_messages,
+ system=system,
+ model_name=self.model,
+ api_key=self.api_key,
+ max_tokens=self.max_tokens,
+ )
+ print(f"groq token usage: {token_usage}")
+ self.total_token_usage += token_usage
+ self.total_cost += (token_usage * 0.99 / 1000000)
+ elif "qwen" in self.model:
+ vlm_response, token_usage = run_oai_interleaved(
+ messages=planner_messages,
+ system=system,
+ model_name=self.model,
+ api_key=self.api_key,
+ max_tokens=min(2048, self.max_tokens),
+ provider_base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
+ temperature=0,
+ )
+ print(f"qwen token usage: {token_usage}")
+ self.total_token_usage += token_usage
+ self.total_cost += (token_usage * 2.2 / 1000000) # https://help.aliyun.com/zh/model-studio/getting-started/models?spm=a2c4g.11186623.0.0.74b04823CGnPv7#fe96cfb1a422a
+ else:
+ raise ValueError(f"Model {self.model} not supported")
+ latency_vlm = time.time() - start
+ self.output_callback(f"LLM: {latency_vlm:.2f}s, OmniParser: {latency_omniparser:.2f}s", sender="bot")
+
+ print(f"{vlm_response}")
+
+ if self.print_usage:
+ print(f"Total token so far: {self.total_token_usage}. Total cost so far: $USD{self.total_cost:.5f}")
+
+ vlm_response_json = extract_data(vlm_response, "json")
+ vlm_response_json = json.loads(vlm_response_json)
+
+ img_to_show_base64 = parsed_screen["som_image_base64"]
+ if "Box ID" in vlm_response_json:
+ bbox = parsed_screen["parsed_content_list"][int(vlm_response_json["Box ID"])]["bbox"]
+ vlm_response_json["box_centroid_coordinate"] = [int((bbox[0] + bbox[2]) / 2 * screen_width), int((bbox[1] + bbox[3]) / 2 * screen_height)]
+ img_to_show_data = base64.b64decode(img_to_show_base64)
+ img_to_show = Image.open(BytesIO(img_to_show_data))
+
+ draw = ImageDraw.Draw(img_to_show)
+ x, y = vlm_response_json["box_centroid_coordinate"]
+ radius = 10
+ draw.ellipse((x - radius, y - radius, x + radius, y + radius), fill='red')
+ draw.ellipse((x - radius*3, y - radius*3, x + radius*3, y + radius*3), fill=None, outline='red', width=2)
+
+ buffered = BytesIO()
+ img_to_show.save(buffered, format="PNG")
+ img_to_show_base64 = base64.b64encode(buffered.getvalue()).decode("utf-8")
+ self.output_callback(f'', sender="bot")
+ self.output_callback(
+ f''
+ f' Parsed Screen elemetns by OmniParser'
+ f'
{screen_info}
'
+ f'',
+ sender="bot"
+ )
+ vlm_plan_str = ""
+ for key, value in vlm_response_json.items():
+ if key == "Reasoning":
+ vlm_plan_str += f'{value}'
+ else:
+ vlm_plan_str += f'\n{key}: {value}'
+
+ # construct the response so that anthropicExcutor can execute the tool
+ response_content = [BetaTextBlock(text=vlm_plan_str, type='text')]
+ if 'box_centroid_coordinate' in vlm_response_json:
+ move_cursor_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}',
+ input={'action': 'mouse_move', 'coordinate': vlm_response_json["box_centroid_coordinate"]},
+ name='computer', type='tool_use')
+ response_content.append(move_cursor_block)
+
+ if vlm_response_json["Next Action"] == "None":
+ print("Task paused/completed.")
+ elif vlm_response_json["Next Action"] == "type":
+ sim_content_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}',
+ input={'action': vlm_response_json["Next Action"], 'text': vlm_response_json["value"]},
+ name='computer', type='tool_use')
+ response_content.append(sim_content_block)
+ else:
+ sim_content_block = BetaToolUseBlock(id=f'toolu_{uuid.uuid4()}',
+ input={'action': vlm_response_json["Next Action"]},
+ name='computer', type='tool_use')
+ response_content.append(sim_content_block)
+ response_message = BetaMessage(id=f'toolu_{uuid.uuid4()}', content=response_content, model='', role='assistant', type='message', stop_reason='tool_use', usage=BetaUsage(input_tokens=0, output_tokens=0))
+ return response_message, vlm_response_json
+
+ def _api_response_callback(self, response: APIResponse):
+ self.api_response_callback(response)
+
+ def _get_system_prompt(self, screen_info: str = ""):
+ main_section = f"""
+You are using a Windows device.
+You are able to use a mouse and keyboard to interact with the computer based on the given task and screenshot.
+You can only interact with the desktop GUI (no terminal or application menu access).
+
+You may be given some history plan and actions, this is the response from the previous loop.
+You should carefully consider your plan base on the task, screenshot, and history actions.
+
+Here is the list of all detected bounding boxes by IDs on the screen and their description:{screen_info}
+
+Your available "Next Action" only include:
+- type: types a string of text.
+- left_click: move mouse to box id and left clicks.
+- right_click: move mouse to box id and right clicks.
+- double_click: move mouse to box id and double clicks.
+- hover: move mouse to box id.
+- scroll_up: scrolls the screen up.
+- scroll_down: scrolls the screen down.
+- wait: waits for 1 second for the device to load or respond.
+
+Based on the visual information from the screenshot image and the detected bounding boxes, please determine the next action, the Box ID you should operate on (if action is not 'type', 'hover', 'scroll_up', 'scroll_down', 'wait'), and the value (if the action is 'type') in order to complete the task.
+
+Output format:
+```json
+{{
+ "Reasoning": str, # describe what is in the current screen, taking into account the history, then describe your step-by-step thoughts on how to achieve the task, choose one action from available actions at a time.
+ "Next Action": "action_type, action description" | "None" # one action at a time, describe it in short and precisely.
+ "Box ID": n,
+ "value": "xxx" # only provide value field if the action is type, else don't include value key
+}}
+```
+
+One Example:
+```json
+{{
+ "Reasoning": "The current screen shows google result of amazon, in previous action I have searched amazon on google. Then I need to click on the first search results to go to amazon.com.",
+ "Next Action": "left_click",
+ "Box ID": m
+}}
+```
+
+Another Example:
+```json
+{{
+ "Reasoning": "The current screen shows the front page of amazon. There is no previous action. Therefore I need to type "Apple watch" in the search bar.",
+ "Next Action": "type",
+ "Box ID": n,
+ "value": "Apple watch"
+}}
+```
+
+IMPORTANT NOTES:
+1. You should only give a single action at a time.
+
+"""
+ thinking_model = "r1" in self.model
+ if not thinking_model:
+ main_section += """
+2. You should give an analysis to the current screen, and reflect on what has been done by looking at the history, then describe your step-by-step thoughts on how to achieve the task.
+
+"""
+ else:
+ main_section += """
+2. In XML tags give an analysis to the current screen, and reflect on what has been done by looking at the history, then describe your step-by-step thoughts on how to achieve the task. In