@mikelgg93
Last active June 25, 2024 08:31
GPT4-eyes

Install it and run it:

  1. Set up your Python environment and API key using OpenAI's quick start guide (see the note after this list)
  2. Clone this gist or download it
  3. pip install -r requirements.txt
  4. python assistant.py
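The API key is hard-coded in the OPENAI_API_KEY constant at the top of the script. If you would rather not store it in the file, a minimal sketch that reads it from the environment instead (assuming you exported OPENAI_API_KEY as in OpenAI's quick start guide):

import os

# Hypothetical replacement for the hard-coded constant in the script.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
if not OPENAI_API_KEY:
    raise SystemExit("Set the OPENAI_API_KEY environment variable first.")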

Using it

If your computer and the Companion Device (Neon or Pupil Invisible) are connected to the same network, the device is discovered automatically and the scene camera starts streaming with a gaze circle overlay.
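If no device is found, you can check connectivity separately before launching the assistant. A minimal sketch using the same simple Realtime API the script builds on:

# Quick connectivity check for a Neon / Pupil Invisible Companion device.
from pupil_labs.realtime_api.simple import discover_one_device

device = discover_one_device(max_search_duration_seconds=10)
if device is None:
    print("No device found. Check that both are on the same network.")
else:
    print(f"Found {device}")
    device.close()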

Press Space to send a snapshot to GPT-4 and hear the response spoken back, or use the A/S/D/F keys to switch the prompt mode (a sketch of the key handling follows the list below).

A - Briefly describe the object being gazed at.

S - Describe any potential dangers (knives, roads, …).

D - Try to guess the wearer's intention (wants to drink water, make a call, be moved somewhere, …).

F - Describe the environment in more detail.
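The key bindings above are dispatched through OpenCV's waitKey codes (Space is 32, Esc is 27). A stripped-down sketch of that dispatch pattern, separate from the full script below:

import cv2
import numpy as np

# Map key codes to handlers; ord() gives the code for printable keys.
actions = {
    ord("a"): lambda: print("mode: describe"),
    32: lambda: print("snapshot requested"),  # Space
    27: None,  # Esc quits
}

frame = np.zeros((200, 400, 3), dtype=np.uint8)
while True:
    cv2.imshow("demo", frame)
    key = cv2.waitKey(1) & 0xFF
    if key in actions:
        if actions[key] is None:
            break
        actions[key]()
cv2.destroyAllWindows()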

Press ESC to stop the application.

# Synchronous version, built on pupil_labs.realtime_api.simple
import base64
import io

import cv2
from openai import OpenAI
from pupil_labs.realtime_api.simple import discover_one_device
from pydub import AudioSegment
from pydub.playback import play

OPENAI_API_KEY = "XXXXX"  # replace with your OpenAI API key
class OpenAICost:
    """Cost $ per 1M token."""

    date_updated = "2024-06-11"
    model = {
        "gpt-4o": {
            "input_cost": 5,
            "output_cost": 15,
            "neon_frame_input": 0.003825,
        },
        "gpt-4-turbo": {
            "input_cost": 10,
            "output_cost": 30,
            "neon_frame_input": 0.00765,
        },
        "tts-1": {"output_cost": 15},
        "tts-1-hd": {"output_cost": 30},
    }

    @classmethod
    def input_cost(cls, model):
        if model in cls.model and "input_cost" in cls.model[model]:
            return cls.model[model]["input_cost"]
        else:
            return None

    @classmethod
    def output_cost(cls, model):
        if model in cls.model and "output_cost" in cls.model[model]:
            return cls.model[model]["output_cost"]
        else:
            return None

    @classmethod
    def frame_cost(cls, model):
        if model in cls.model and "neon_frame_input" in cls.model[model]:
            return cls.model[model]["neon_frame_input"]
        else:
            return None
class Assistant:
    def __init__(self):
        self.device = None
        self.client = OpenAI(api_key=OPENAI_API_KEY)
        self.model = "gpt-4o"  # or "gpt-4-turbo"
        self.setup_prompts()
        self.mode = "describe"
        self.running = True
        # A/S/D/F switch the prompt mode, Space sends a snapshot, Esc stops the app.
        self.key_actions = {
            ord("a"): lambda: setattr(self, "mode", "describe"),
            ord("s"): lambda: setattr(self, "mode", "dangers"),
            ord("d"): lambda: setattr(self, "mode", "intention"),
            ord("f"): lambda: setattr(self, "mode", "in_detail"),
            32: self.handle_space,  # Space
            27: lambda: setattr(self, "running", False),  # Esc
        }
        self.session_cost = 0
        self.initialise_device()

    def initialise_device(self):
        print("Looking for the next best device...")
        self.device = discover_one_device(max_search_duration_seconds=10)
        if self.device is None:
            print("No device found.")
            raise SystemExit(-1)
        print(f"Connecting to {self.device}...")

    def setup_prompts(self):
        self.prompts = {
            "base": "You are a visual and communication aid for individuals with visual impairment (low vision) or communication difficulties. They are wearing eye-tracking glasses, and I am sending you an image with a red circle indicating the wearer's gaze. Do not describe the whole image unless explicitly asked. Be succinct, ",
            "describe": "in a couple of words (max. 8) say what the person is looking at.",
            "dangers": "briefly indicate if anything in the scene poses a risk to the person, be succinct (max. 30 words).",
            "intention": "given that the wearer has mobility and speaking difficulties, briefly try to infer the wearer's intention based on what they are looking at (maximum of 30 words).",
            "in_detail": "describe the scene in detail, with a maximum duration of one minute of speaking.",
        }
    def process_frame(self):
        self.matched = (
            self.device.receive_matched_scene_and_eyes_video_frames_and_gaze()
        )
        if not self.matched:
            print("Not able to find a match!")
            return
        self.annotate_and_show_frame()

    def annotate_and_show_frame(self):
        # Draw the gaze position as a red circle on the scene frame.
        cv2.circle(
            self.matched.scene.bgr_pixels,
            (int(self.matched.gaze.x), int(self.matched.gaze.y)),
            radius=40,
            color=(0, 0, 255),
            thickness=5,
        )
        self.bgr_pixels = self.matched.scene.bgr_pixels
        # Overlay the current prompt mode in the top-left corner.
        self.bgr_pixels = cv2.putText(
            self.bgr_pixels,
            str(self.mode),
            (20, 50),
            cv2.FONT_HERSHEY_SIMPLEX,
            1.5,
            (255, 255, 255),
            2,
            cv2.LINE_8,
        )
        cv2.imshow(
            "Scene camera with eyes and gaze overlay",
            self.bgr_pixels,
        )
        key = cv2.waitKey(1) & 0xFF
        if key in self.key_actions:
            self.key_actions[key]()

    def encode_image(self):
        # JPEG-encode the annotated frame and base64-encode it for the API request.
        _, buffer = cv2.imencode(".jpg", self.matched.scene.bgr_pixels)
        self.base64Frame = base64.b64encode(buffer).decode("utf-8")
    def assist(self):
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": [
                        {
                            "type": "text",
                            "text": self.prompts["base"] + self.prompts[self.mode],
                        }
                    ],
                },
                {
                    "role": "user",
                    "content": [
                        "Here goes the image",
                        {"image": self.base64Frame, "resize": 768},
                    ],
                },
            ],
            max_tokens=200,
        )
        # Approximate cost: prompt + completion tokens plus a flat per-frame image cost.
        response_cost = (
            response.usage.prompt_tokens / int(1e6) * OpenAICost.input_cost(self.model)
            + response.usage.completion_tokens
            / int(1e6)
            * OpenAICost.output_cost(self.model)
            + OpenAICost.frame_cost(self.model)
        )
        # Turn the text response into speech.
        response_audio = self.client.audio.speech.create(
            model="tts-1",
            voice="alloy",
            input=response.choices[0].message.content,
        )
        # TTS is billed per character of input text.
        TTS_cost = (
            len(response.choices[0].message.content)
            / int(1e6)
            * OpenAICost.output_cost("tts-1")
        )
        self.session_cost += response_cost + TTS_cost
        print(
            f"R: {response.choices[0].message.content}, "
            f"approx cost (GPT/TTS): {response_cost} / {TTS_cost} $, "
            f"total: {response_cost + TTS_cost} $"
        )
        # Play the audio back, slightly sped up.
        byte_stream = io.BytesIO(response_audio.content)
        audio = AudioSegment.from_file(byte_stream, format="mp3")
        audio = audio.speedup(playback_speed=1.1)
        play(audio)

    def handle_space(self):
        self.encode_image()
        self.assist()
    def run(self):
        while self.device is not None and self.running:
            self.process_frame()
        print("Stopping...")
        print(f"Total session cost {self.session_cost}$")
        self.device.close()  # explicitly stop auto-update


if __name__ == "__main__":
    eyes = Assistant()
    eyes.run()

# Asynchronous variant, built on the pupil_labs.realtime_api async interface
import asyncio
import base64
import io
import typing as T

import cv2
from openai import AsyncOpenAI
from pupil_labs.realtime_api import (
    Device,
    Network,
    receive_gaze_data,
    receive_video_frames,
)
from pydub import AudioSegment
from pydub.playback import play

OPENAI_API_KEY = "XXXXXX"  # replace with your OpenAI API key
class OpenAICost:
    """Cost $ per 1M token."""

    date_updated = "2024-06-11"
    model = {
        "gpt-4o": {
            "input_cost": 5,
            "output_cost": 15,
            "neon_frame_input": 0.003825,
        },
        "gpt-4-turbo": {
            "input_cost": 10,
            "output_cost": 30,
            "neon_frame_input": 0.00765,
        },
        "tts-1": {"output_cost": 15},
        "tts-1-hd": {"output_cost": 30},
    }

    @classmethod
    def input_cost(cls, model):
        if model in cls.model and "input_cost" in cls.model[model]:
            return cls.model[model]["input_cost"]
        else:
            return None

    @classmethod
    def output_cost(cls, model):
        if model in cls.model and "output_cost" in cls.model[model]:
            return cls.model[model]["output_cost"]
        else:
            return None

    @classmethod
    def frame_cost(cls, model):
        if model in cls.model and "neon_frame_input" in cls.model[model]:
            return cls.model[model]["neon_frame_input"]
        else:
            return None
class Assistant:
    def __init__(self):
        self.device = None
        self.client = AsyncOpenAI(api_key=OPENAI_API_KEY)
        self.model = "gpt-4o"  # or "gpt-4-turbo"
        self.setup_prompts()
        self.mode = "describe"
        self.running = True
        # A/S/D/F switch the prompt mode, Space sends a snapshot, Esc stops the app.
        self.key_actions = {
            ord("a"): lambda: setattr(self, "mode", "describe"),
            ord("s"): lambda: setattr(self, "mode", "dangers"),
            ord("d"): lambda: setattr(self, "mode", "intention"),
            ord("f"): lambda: setattr(self, "mode", "in_detail"),
            32: self.handle_space,  # Space
            27: lambda: setattr(self, "running", False),  # Esc
        }
        self.session_cost = 0

    async def initialise_device(self):
        print("Looking for the next best device...")
        async with Network() as network:
            self.device = await network.wait_for_new_device(timeout_seconds=5)
        if self.device is None:
            print("No device found.")
            raise SystemExit(-1)
        print(f"Connecting to {self.device}...")

    def setup_prompts(self):
        self.prompts = {
            "base": "You are a visual and communication aid for individuals with visual impairment (low vision) or communication difficulties. They are wearing eye-tracking glasses, and I am sending you an image with a red circle indicating the wearer's gaze. Do not describe the whole image unless explicitly asked. Be succinct, ",
            "describe": "in a couple of words (max. 8) say what the person is looking at.",
            "dangers": "briefly indicate if anything in the scene poses a risk to the person, be succinct (max. 30 words).",
            "intention": "given that the wearer has mobility and speaking difficulties, briefly try to infer the wearer's intention based on what they are looking at (maximum of 30 words).",
            "in_detail": "describe the scene in detail, with a maximum duration of one minute of speaking.",
        }
    async def process_frame(self):
        async with Device.from_discovered_device(self.device) as device:
            status = await device.get_status()
            sensor_gaze = status.direct_gaze_sensor()
            if not sensor_gaze.connected:
                print(f"Gaze sensor is not connected to {device}")
                return
            sensor_world = status.direct_world_sensor()
            if not sensor_world.connected:
                print(f"Scene camera is not connected to {device}")
                return
            restart_on_disconnect = True
            queue_video = asyncio.Queue()
            queue_gaze = asyncio.Queue()
            # Stream scene frames and gaze data into their own queues.
            process_video = asyncio.create_task(
                self.enqueue_sensor_data(
                    receive_video_frames(
                        sensor_world.url, run_loop=restart_on_disconnect
                    ),
                    queue_video,
                )
            )
            process_gaze = asyncio.create_task(
                self.enqueue_sensor_data(
                    receive_gaze_data(sensor_gaze.url, run_loop=restart_on_disconnect),
                    queue_gaze,
                )
            )
            try:
                await self.match_and_draw(queue_video, queue_gaze)
            finally:
                process_video.cancel()
                process_gaze.cancel()

    async def enqueue_sensor_data(
        self, sensor: T.AsyncIterator, queue: asyncio.Queue
    ) -> None:
        async for datum in sensor:
            try:
                queue.put_nowait((datum.datetime, datum))
            except asyncio.QueueFull:
                print(f"Queue is full, dropping {datum}")

    async def get_most_recent_item(self, queue):
        # Drain the queue and keep only the newest item.
        item = await queue.get()
        while True:
            try:
                next_item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return item
            else:
                item = next_item

    async def get_closest_item(self, queue, timestamp):
        # Return the first item at or after the given timestamp;
        # assumes monotonically increasing timestamps.
        item_ts, item = await queue.get()
        if item_ts > timestamp:
            return item_ts, item
        while True:
            try:
                next_item_ts, next_item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return item_ts, item
            else:
                if next_item_ts > timestamp:
                    return next_item_ts, next_item
                item_ts, item = next_item_ts, next_item
    async def match_and_draw(self, queue_video, queue_gaze):
        while self.running:
            video_datetime, video_frame = await self.get_most_recent_item(queue_video)
            _, gaze_datum = await self.get_closest_item(queue_gaze, video_datetime)
            self.matched = video_frame.to_ndarray(format="bgr24")
            cv2.circle(
                self.matched,
                (int(gaze_datum.x), int(gaze_datum.y)),
                radius=40,
                color=(0, 0, 255),
                thickness=5,
            )
            self.bgr_pixels = cv2.putText(
                self.matched,
                str(self.mode),
                (20, 50),
                cv2.FONT_HERSHEY_SIMPLEX,
                1.5,
                (255, 255, 255),
                2,
                cv2.LINE_8,
            )
            cv2.imshow("Scene camera with gaze overlay", self.bgr_pixels)
            key = cv2.waitKey(1) & 0xFF
            if key in self.key_actions:
                self.key_actions[key]()

    async def encode_image(self):
        loop = asyncio.get_event_loop()
        _, buffer = await loop.run_in_executor(
            None, cv2.imencode, ".jpg", self.matched
        )
        self.base64Frame = base64.b64encode(buffer).decode("utf-8")
    async def assist(self):
        await self.encode_image()
        response = await self.client.chat.completions.create(
            model=self.model,
            messages=[
                {
                    "role": "system",
                    "content": self.prompts["base"] + self.prompts[self.mode],
                },
                {
                    "role": "user",
                    "content": [
                        "Here goes the image",
                        {"image": self.base64Frame, "resize": 768},
                    ],
                },
            ],
            max_tokens=200,
        )
        # Approximate cost: prompt + completion tokens plus a flat per-frame image cost.
        response_cost = (
            response.usage.prompt_tokens / int(1e6) * OpenAICost.input_cost(self.model)
            + response.usage.completion_tokens
            / int(1e6)
            * OpenAICost.output_cost(self.model)
            + OpenAICost.frame_cost(self.model)
        )
        # Turn the text response into speech.
        response_audio = await self.client.audio.speech.create(
            model="tts-1",
            voice="alloy",
            input=response.choices[0].message.content,
        )
        # TTS is billed per character of input text.
        TTS_cost = (
            len(response.choices[0].message.content)
            / int(1e6)
            * OpenAICost.output_cost("tts-1")
        )
        self.session_cost += response_cost + TTS_cost
        print(
            f"R: {response.choices[0].message.content}, "
            f"approx cost (GPT/TTS): {response_cost} / {TTS_cost} $, "
            f"total: {response_cost + TTS_cost} $"
        )
        # Play the audio back, slightly sped up.
        byte_stream = io.BytesIO(response_audio.content)
        audio = AudioSegment.from_file(byte_stream, format="mp3")
        audio = audio.speedup(playback_speed=1.1)
        play(audio)

    def handle_space(self):
        # Run the request in a background task so the video loop keeps updating.
        asyncio.create_task(self.assist())
    async def run(self):
        await self.initialise_device()
        while self.device is not None and self.running:
            await self.process_frame()
        print("Stopping...")
        print(f"Total session cost {self.session_cost}$")


if __name__ == "__main__":
    eyes = Assistant()
    asyncio.run(eyes.run())

# requirements.txt
openai
pydub
pupil-labs-realtime-api
opencv-python