Skip to content

Instantly share code, notes, and snippets.

@haixuanTao
Last active October 29, 2024 11:47
Show Gist options
  • Save haixuanTao/860e1740245dc2c8dd85b496150a9320 to your computer and use it in GitHub Desktop.
Save haixuanTao/860e1740245dc2c8dd85b496150a9320 to your computer and use it in GitHub Desktop.
VLM
# Dataflow wiring:
#   webcam/image  -> idefics2 (image)
#   whisper/text  -> idefics2 (text)
#   idefics2/control -> robot, idefics2/speak -> parler
# The whisper operator is ticked by the built-in 1000 ms timer.
nodes:
  - id: webcam
    custom:
      source: https://huggingface.co/datasets/dora-rs/dora-idefics2/raw/main/operators/opencv_stream.py
      outputs:
        - image

  - id: idefics2
    operator:
      python: https://huggingface.co/datasets/dora-rs/dora-idefics2/raw/main/operators/idefics2_op.py
      inputs:
        image: webcam/image
        text: whisper/text
      outputs:
        - speak
        - control

  - id: robot
    operator:
      python: https://huggingface.co/datasets/dora-rs/dora-idefics2/raw/main/operators/robot_minimize.py
      inputs:
        control: idefics2/control

  - id: parler
    operator:
      python: https://huggingface.co/datasets/dora-rs/dora-idefics2/raw/main/operators/parler_op.py
      inputs:
        text: idefics2/speak

  - id: whisper
    operator:
      # Single slash after "operators" (the original URL had "operators//").
      python: https://huggingface.co/datasets/dora-rs/dora-idefics2/raw/main/operators/whisper_op.py
      inputs:
        audio: dora/timer/millis/1000
      outputs:
        - text
from dora import DoraStatus
import pyarrow as pa
from transformers import AutoProcessor, AutoModelForVision2Seq, AwqConfig
import torch
# Frame geometry: incoming image buffers are reshaped to
# (CAMERA_HEIGHT, CAMERA_WIDTH, 3) by the operator.
CAMERA_WIDTH = 960
CAMERA_HEIGHT = 540

PROCESSOR = AutoProcessor.from_pretrained("HuggingFaceM4/idefics2-tfrm-compatible")

# Token ids handed to generate() as `bad_words_ids`.
BAD_WORDS_IDS = PROCESSOR.tokenizer(
    ["<image>", "<fake_token_around_image>"],
    add_special_tokens=False,
).input_ids

# End-of-utterance token ids followed by the tokenizer's EOS id.
# NOTE(review): not referenced anywhere else in the visible source.
EOS_WORDS_IDS = PROCESSOR.tokenizer(
    "<end_of_utterance>", add_special_tokens=False
).input_ids + [PROCESSOR.tokenizer.eos_token_id]

# Which sub-modules AWQ should fuse, plus the attention geometry it needs.
_AWQ_MODULES_TO_FUSE = {
    "attention": ["q_proj", "k_proj", "v_proj", "o_proj"],
    "mlp": ["gate_proj", "up_proj", "down_proj"],
    "layernorm": ["input_layernorm", "post_attention_layernorm", "norm"],
    "use_alibi": False,
    "num_attention_heads": 32,
    "num_key_value_heads": 8,
    "hidden_size": 4096,
}

_AWQ_CONFIG = AwqConfig(
    bits=4,
    fuse_max_seq_len=4096,
    modules_to_fuse=_AWQ_MODULES_TO_FUSE,
)

# 4-bit AWQ checkpoint, loaded once at import time and moved to the GPU.
model = AutoModelForVision2Seq.from_pretrained(
    "HuggingFaceM4/idefics2-tfrm-compatible-AWQ",
    quantization_config=_AWQ_CONFIG,
    trust_remote_code=True,
).to("cuda")
def reset_awq_cache(model):
    """Rewind every fused AWQ attention module of *model* to position 0.

    Walks all sub-modules and sets ``start_pos = 0`` on each instance of
    ``QuantAttentionFused``.
    """
    # Imported inside the function, exactly as the original did.
    from awq.modules.fused.attn import QuantAttentionFused

    fused_layers = (
        layer
        for _, layer in model.named_modules()
        if isinstance(layer, QuantAttentionFused)
    )
    for layer in fused_layers:
        layer.start_pos = 0
def ask_vlm(image, instruction):
    """Ask the vision-language model a question about one image.

    Args:
        image: Image object placed in the prompt and handed to ``PROCESSOR``.
        instruction: Question text. A trailing "." and the
            ``<end_of_utterance>`` marker are appended before generation.

    Returns:
        The text that follows the first assistant marker in the decoded
        generation, or an empty string when the decoded text contains no
        assistant marker (the original raised ``IndexError`` in that case).
    """
    answer_marker = "\nAssistant: "
    prompts = [
        "User:",
        image,
        f"{instruction}.<end_of_utterance>\n",
        "Assistant:",
    ]
    inputs = {k: torch.tensor(v).to("cuda") for k, v in PROCESSOR(prompts).items()}
    try:
        generated_ids = model.generate(
            **inputs, bad_words_ids=BAD_WORDS_IDS, max_new_tokens=25
        )
        generated_texts = PROCESSOR.batch_decode(
            generated_ids, skip_special_tokens=True
        )
    finally:
        # Always rewind the fused-attention cache, even when generation
        # fails, so the next call does not start from a stale position.
        reset_awq_cache(model)

    segments = generated_texts[0].split(answer_marker)
    if len(segments) < 2:
        # No assistant turn in the decoded text: report "no answer".
        return ""
    return segments[1]
class Operator:
    """Two-state controller that queries the VLM on every camera frame.

    States:
        "coffee": asks "Is there a hand?"; on a "yes" answer it emits the
            answer on ``speak``, a command on ``control`` and switches to
            "person".
        "person": asks "Read the sign?"; when the answer mentions "coffee"
            it emits the answer on ``speak``, a command on ``control`` and
            switches back to "coffee".
    """

    def __init__(self):
        self.state = "coffee"

    def on_event(
        self,
        dora_event,
        send_output,
    ) -> DoraStatus:
        """Handle one dora event.

        Only ``image`` inputs are processed. Other inputs (the dataflow also
        routes ``text`` to this operator) are ignored, because their payload
        is not a camera frame and cannot be reshaped as one.
        """
        if dora_event["type"] != "INPUT" or dora_event["id"] != "image":
            return DoraStatus.CONTINUE

        image = (
            dora_event["value"].to_numpy().reshape((CAMERA_HEIGHT, CAMERA_WIDTH, 3))
        )

        if self.state == "person":
            output = ask_vlm(image, "Read the sign?").lower()
            if "coffee" in output:
                send_output(
                    "speak",
                    pa.array([output]),
                )
                send_output(
                    "control",
                    pa.array([1.0, 0.0, 0.0, 0.6, 0.0, 10.0, 0.0]),
                )
                self.state = "coffee"
        elif self.state == "coffee":
            output = ask_vlm(image, "Is there a hand?").lower()
            if "yes" in output:
                send_output(
                    "speak",
                    pa.array([output]),
                )
                send_output(
                    "control",
                    pa.array([-1.0, 0.0, 0.0, 0.6, 0.0, 10.0, 180.0]),
                )
                self.state = "person"

        return DoraStatus.CONTINUE
import cv2
import pyarrow as pa
from dora import Node
node = Node()

TCP_STREAM_URL = "tcp://192.168.2.1:40921"
CAMERA_WIDTH = 960
CAMERA_HEIGHT = 540

cap = cv2.VideoCapture(TCP_STREAM_URL)
# Explicit check instead of `assert`, which is stripped when Python runs
# with -O and would let a dead capture slip through.
if not cap.isOpened():
    raise RuntimeError("Error: Could not open video capture.")

try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break  # Break the loop when no more frames are available
        frame = cv2.resize(frame, (CAMERA_WIDTH, CAMERA_HEIGHT))
        # Frames are published flattened; the consumer reshapes them.
        node.send_output("image", pa.array(frame.ravel()))
finally:
    # Release the capture on every exit path, including errors.
    cap.release()
from parler_tts import ParlerTTSForConditionalGeneration
from transformers import AutoTokenizer
import soundfile as sf
import pygame
from dora import DoraStatus
_CHECKPOINT = "parler-tts/parler_tts_mini_v0.1"
_DEVICE = "cuda:0"
# Voice description prompt; its token ids are passed as `input_ids` on every
# generate() call.
_VOICE_DESCRIPTION = "A female speaker with a slightly low-pitched voice delivers her words quite expressively, in a very confined sounding environment with clear audio quality. She speaks very fast."

# Model and tokenizer are loaded once at import time.
model = ParlerTTSForConditionalGeneration.from_pretrained(_CHECKPOINT).to(_DEVICE)
tokenizer = AutoTokenizer.from_pretrained(_CHECKPOINT)

# The mixer must be initialised before any playback call.
pygame.mixer.init()

input_ids = tokenizer(_VOICE_DESCRIPTION, return_tensors="pt").input_ids.to(_DEVICE)
class Operator:
    """Text-to-speech operator: synthesises each incoming text and plays it."""

    def on_event(
        self,
        dora_event,
        send_output,
    ):
        """Synthesise the first element of an INPUT event's value and play it.

        The waveform is written to ``parler_tts_out.wav`` and then played
        through the pygame mixer. Always returns ``DoraStatus.CONTINUE``.
        """
        if dora_event["type"] != "INPUT":
            return DoraStatus.CONTINUE

        wav_path = "parler_tts_out.wav"
        sentence = dora_event["value"][0].as_py()
        prompt_ids = tokenizer(sentence, return_tensors="pt").input_ids.to("cuda:0")

        generation = model.generate(
            max_new_tokens=200,
            input_ids=input_ids,
            prompt_input_ids=prompt_ids,
        )
        waveform = generation.cpu().numpy().squeeze()

        sf.write(wav_path, waveform, model.config.sampling_rate)
        pygame.mixer.music.load(wav_path)
        pygame.mixer.music.play()
        return DoraStatus.CONTINUE
dora-rs
torch==2.2.0
autoawq
autoawq-kernels
sounddevice
openai-whisper
pynput
opencv-python
Pillow
flash_attn @ https://github.com/Dao-AILab/flash-attention/releases/download/v2.5.6/flash_attn-2.5.6+cu122torch2.2cxx11abiFALSE-cp310-cp310-linux_x86_64.whl
from robomaster import robot
from dora import DoraStatus
from time import sleep
def wait(event):
    """Give a pending robot action one second to make progress.

    Args:
        event: Action object exposing ``_event`` (with ``is_set()``) and
            ``is_completed``, or ``None`` when there is nothing to wait for.

    Returns immediately when *event* is ``None`` or already completed;
    otherwise sleeps for one second. The original condition was inverted:
    it slept for finished or missing actions and skipped pending ones.
    """
    if event is None:
        return
    finished = event._event.is_set() and event.is_completed
    if not finished:
        sleep(1)
class Operator:
    """Drives the RoboMaster gimbal and chassis from ``control`` inputs."""

    def __init__(self):
        """Connect to the robot and start its video stream.

        Raises:
            RuntimeError: if the connection or the video stream cannot be
                started.
        """
        self.ep_robot = robot.Robot()
        # Explicit checks instead of `assert`: under `python -O` assert
        # statements are removed, which would skip these calls entirely.
        if not self.ep_robot.initialize(conn_type="ap"):
            raise RuntimeError("Could not initialize the robot connection.")
        if not self.ep_robot.camera.start_video_stream(display=False):
            raise RuntimeError("Could not start the robot video stream.")
        self.event = None

    def on_event(self, dora_event, send_output) -> DoraStatus:
        """Apply one 7-value control command: gimbal first, then chassis.

        The input value is unpacked as
        ``x, y, z, xy_speed, z_speed, pitch, yaw``.
        """
        if dora_event["type"] == "INPUT":
            x, y, z, xy_speed, z_speed, pitch, yaw = dora_event["value"].to_numpy()

            gimbal_action = self.ep_robot.gimbal.moveto(
                pitch=pitch, yaw=yaw, pitch_speed=50.0, yaw_speed=50.0
            )
            wait(gimbal_action)

            self.event = self.ep_robot.chassis.move(
                x=x, y=y, z=z, xy_speed=xy_speed, z_speed=z_speed
            )
            # Wait on the chassis action just issued (the original waited on
            # the gimbal action a second time).
            wait(self.event)
        return DoraStatus.CONTINUE
# Create and activate an isolated Python 3.10 environment.
conda create -n idefics2 python=3.10
conda activate idefics2
# Use the raw file URL: the /blob/ URL serves an HTML page, not the requirements file.
pip install -r https://huggingface.co/datasets/dora-rs/dora-idefics2/raw/main/requirements.txt
wget https://huggingface.co/datasets/dora-rs/dora-idefics2/raw/main/graphs/dataflow_robot_vlm_minimize.yml
dora up
# Start the dataflow file that was just downloaded.
dora start dataflow_robot_vlm_minimize.yml
import pyarrow as pa
import whisper
from pynput import keyboard
from pynput.keyboard import Key
from dora import DoraStatus
import numpy as np
import pyarrow as pa
import sounddevice as sd
# Whisper "base" checkpoint, loaded once at import time and reused by the
# operator for every transcription.
model = whisper.load_model("base")
# Sample rate passed to sd.rec() for microphone capture.
SAMPLE_RATE = 16000
class Operator:
    """Push-to-talk speech-to-text operator.

    On each INPUT event it listens for a keyboard event for up to one
    second. If that event is the Up arrow key, it records from the
    microphone, transcribes the audio with Whisper and emits the text on
    the ``text`` output.
    """

    def on_event(
        self,
        dora_event,
        send_output,
    ) -> DoraStatus:
        """Handle one dora event; always returns ``DoraStatus.CONTINUE``."""
        if dora_event["type"] != "INPUT":
            return DoraStatus.CONTINUE

        ## Check for keyboard event
        with keyboard.Events() as key_events:
            key_event = key_events.get(1.0)
            up_pressed = key_event is not None and key_event.key == Key.up
            if not up_pressed:
                return DoraStatus.CONTINUE

            ## Microphone: blocking mono int16 capture of SAMPLE_RATE * 6 frames
            frame_count = int(SAMPLE_RATE * 6)
            recording = sd.rec(
                frame_count,
                samplerate=SAMPLE_RATE,
                channels=1,
                dtype=np.int16,
                blocking=True,
            )
            # Flatten and scale the int16 samples by 1 / 32768.
            samples = recording.ravel().astype(np.float32) / 32768.0

            ## Speech to text
            samples = whisper.pad_or_trim(samples)
            transcription = model.transcribe(samples, language="en")
            send_output(
                "text",
                pa.array([transcription["text"]]),
                dora_event["metadata"],
            )

        return DoraStatus.CONTINUE
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment