|
import base64 |
|
import io |
|
|
|
import cv2 |
|
from openai import OpenAI |
|
from pupil_labs.realtime_api.simple import discover_one_device |
|
from pydub import AudioSegment |
|
from pydub.playback import play |
|
|
|
|
|
class Assistant:
    """Gaze-aware visual/communication aid for Pupil Labs eye-tracking glasses.

    Streams matched scene-video + gaze samples from the glasses, overlays the
    wearer's gaze point on the live preview, and on demand (SPACE key) sends
    the annotated frame to a GPT vision model and speaks the reply via OpenAI
    TTS.  Keys a/s/d/f switch the prompt mode; ESC quits.
    """

    # Approximate OpenAI pricing (USD) used for the running session-cost
    # estimate printed after each request — update if pricing changes.
    _PROMPT_COST_PER_TOKEN = 0.01 / 1000      # GPT-4V prompt tokens
    _COMPLETION_COST_PER_TOKEN = 0.03 / 1000  # GPT-4V completion tokens
    _IMAGE_COST = 0.00765                     # flat per-image cost (768px resize)
    _TTS_COST_PER_CHAR = 0.015 / 1000         # tts-1, per input character

    def __init__(self):
        self.device = None
        self.client = OpenAI()  # reads OPENAI_API_KEY from the environment
        self.setup_prompts()
        self.mode = "describe"
        self.running = True
        # Keyboard dispatch table consulted by annotate_and_show_frame():
        # a/s/d/f select the prompt mode, SPACE triggers a model query on the
        # current frame, ESC stops the main loop.
        self.key_actions = {
            ord("a"): lambda: setattr(self, "mode", "describe"),
            ord("s"): lambda: setattr(self, "mode", "dangers"),
            ord("d"): lambda: setattr(self, "mode", "intention"),
            ord("f"): lambda: setattr(self, "mode", "in_detail"),
            32: self.handle_space,  # SPACE
            27: lambda: setattr(self, "running", False),  # ESC
        }
        # Running USD total across all requests this session (floats).
        self.session_cost = 0.0
        self.initialise_device()

    def initialise_device(self):
        """Discover a nearby Pupil Labs device; exit the process if none found."""
        print("Looking for the next best device...")
        self.device = discover_one_device(max_search_duration_seconds=10)
        if self.device is None:
            print("No device found.")
            raise SystemExit(-1)

        print(f"Connecting to {self.device}...")

    def setup_prompts(self):
        """Define the system-prompt fragments; final prompt = base + mode text."""
        self.prompts = {
            "base": "You are a visual and communication aid for individuals with visual impairment (low vision) or communication difficulties, they are wearing eye-tracking glasses, I am sending you an image with a red circle indicating the wearer's gaze, do not describe the whole image unless explicitly asked, be succinct, ",
            "describe": "in couple of words (max. 8) say what the person is looking at.",
            "dangers": "briefly indicate if there is any posing risk for the person in the scene, be succinct (max 30 words).",
            "intention": "given that the wearer has mobility and speaking difficulties, briefly try to infer the wearer's intention based on what they are looking at (maximum of 30 words).",
            "in_detail": "describe the scene in detail, with a maximum duration of one minute of speaking.",
        }

    def process_frame(self):
        """Fetch one matched scene/gaze sample and display it; no-op on miss."""
        self.matched = (
            self.device.receive_matched_scene_and_eyes_video_frames_and_gaze()
        )
        if not self.matched:
            print("Not able to find a match!")
            return
        self.annotate_and_show_frame()

    def annotate_and_show_frame(self):
        """Draw the gaze circle and mode label, show the frame, dispatch keys.

        NOTE: both overlays are drawn in place on the matched frame's pixel
        buffer, so the image later encoded for the model carries the red gaze
        circle (intended) and also the mode-label text (side effect — TODO
        confirm this is acceptable).
        """
        cv2.circle(
            self.matched.scene.bgr_pixels,
            (int(self.matched.gaze.x), int(self.matched.gaze.y)),
            radius=40,
            color=(0, 0, 255),  # red in BGR
            thickness=5,
        )
        self.bgr_pixels = self.matched.scene.bgr_pixels
        self.bgr_pixels = cv2.putText(
            self.bgr_pixels,
            str(self.mode),
            (20, 50),
            cv2.FONT_HERSHEY_SIMPLEX,
            1.5,
            (255, 255, 255),
            2,
            cv2.LINE_8,
        )
        cv2.imshow(
            "Scene camera with eyes and gaze overlay",
            self.bgr_pixels,
        )
        # waitKey returns -1 when no key is pressed; masking to a byte maps
        # that to 255, which has no entry in key_actions.
        key = cv2.waitKey(1) & 0xFF
        if key in self.key_actions:
            self.key_actions[key]()

    def encode_image(self):
        """JPEG-encode the current (annotated) frame as base64 for the API."""
        _, buffer = cv2.imencode(".jpg", self.matched.scene.bgr_pixels)
        self.base64Frame = base64.b64encode(buffer).decode("utf-8")

    def assist(self):
        """Query the vision model about the encoded frame and speak the reply.

        Also accumulates an approximate USD cost for the GPT and TTS calls
        into ``self.session_cost`` and prints a per-request breakdown.
        """
        response = self._request_description()
        text = response.choices[0].message.content
        response_cost = (
            response.usage.prompt_tokens * self._PROMPT_COST_PER_TOKEN
            + response.usage.completion_tokens * self._COMPLETION_COST_PER_TOKEN
            + self._IMAGE_COST
        )
        TTS_cost = len(text) * self._TTS_COST_PER_CHAR
        self.session_cost += response_cost + TTS_cost
        print(
            f"R: {text}, approx cost(GPT/TTS): {response_cost} / {TTS_cost} $ Total: {response_cost+TTS_cost} $"
        )
        self._speak(text)

    def _request_description(self):
        """One chat-completion call: base prompt + current mode + frame."""
        return self.client.chat.completions.create(
            model="gpt-4-vision-preview",
            messages=[
                {
                    "role": "system",
                    "content": [
                        {
                            "type": "text",
                            "text": self.prompts["base"] + self.prompts[self.mode],
                        }
                    ],
                },
                {
                    "role": "user",
                    "content": [
                        "Here goes the image",
                        {"image": self.base64Frame, "resize": 768},
                    ],
                },
            ],
            max_tokens=200,
        )

    def _speak(self, text):
        """Synthesize *text* with tts-1 and play it slightly sped up."""
        response_audio = self.client.audio.speech.create(
            model="tts-1",
            voice="alloy",
            input=text,
        )
        byte_stream = io.BytesIO(response_audio.content)
        audio = AudioSegment.from_file(byte_stream, format="mp3")
        audio = audio.speedup(playback_speed=1.1)
        play(audio)

    def handle_space(self):
        """SPACE handler: snapshot the current frame and ask the model."""
        self.encode_image()
        self.assist()

    def run(self):
        """Main loop: process frames until ESC, then report cost and close."""
        while self.device is not None and self.running:
            self.process_frame()
        print("Stopping...")
        print(f"Total session cost {self.session_cost}$")
        self.device.close()  # explicitly stop auto-update
|
|
|
|
|
if __name__ == "__main__":
    # Construct the assistant (blocks on device discovery) and hand control
    # to its main loop until the user presses ESC.
    assistant = Assistant()
    assistant.run()