Skip to content

Instantly share code, notes, and snippets.

@lyleaf
Created November 10, 2019 23:37
Embed
What would you like to do?
stream_beat_v2.py
import pyaudio
import librosa
import numpy as np
import requests
from pyo import *
import time
from numpy_ringbuffer import RingBuffer
# Shared PyAudio instance used to query devices and open the capture stream.
pa = pyaudio.PyAudio()

# Name of the input device to capture from.
device = 'Pixel USB-C earbuds'

# pa_get_input_devices() is provided by the `from pyo import *` star import;
# it is unpacked here as two parallel lists: device names and device indexes.
dev_list, dev_index = pa_get_input_devices()
# Example output: ['Pixel USB-C earbuds', 'MacBook Pro Microphone']
print(dev_list)
print(dev_index)

# Fail with an actionable message (instead of the bare list.index error) when
# the requested device is not connected, and release PortAudio before exiting.
if device not in dev_list:
    pa.terminate()
    raise ValueError(
        'Input device {!r} not found; available input devices: {}'.format(
            device, dev_list))

dev = dev_index[dev_list.index(device)]
print('Device index:', dev)
# Example device listing recorded by the original author:
# ['DisplayPort', 'Pixel USB-C earbuds', 'MacBook Pro Speakers']
# [0, 2, 4]
print(pa.get_device_info_by_index(dev))
# Example output:
# {'index': 1, 'structVersion': 2,
#  'name': 'Pixel USB-C earbuds', 'hostApi': 0,
#  'maxInputChannels': 1, 'maxOutputChannels': 0,
#  'defaultLowInputLatency': 0.004583333333333333,
#  'defaultLowOutputLatency': 0.01,
#  'defaultHighInputLatency': 0.013916666666666667,
#  'defaultHighOutputLatency': 0.1,
#  'defaultSampleRate': 48000.0}

# Sample rate requested from the stream, in frames per second.
rate = 22050
# Frames per callback buffer: three seconds of audio at `rate`.
chunk = 3 * rate
# 32-bit float samples.
sample_format = pyaudio.paFloat32
# Size of one sample in bytes, as reported by PyAudio for `sample_format`.
sample_width = pa.get_sample_size(sample_format)
print(sample_width)
def callback(in_data, frame_count, time_info, flag):
    """Estimate and print the tempo of one captured audio buffer.

    Used as the PyAudio ``stream_callback``. The stream is opened with
    ``pyaudio.paFloat32`` and one channel, so ``in_data`` is decoded as a
    flat array of float32 samples.

    Args:
        in_data: Raw bytes of the captured buffer.
        frame_count: Number of frames in ``in_data`` (unused).
        time_info: Timing information supplied by PyAudio (unused).
        flag: Status flags supplied by PyAudio (unused).

    Returns:
        ``(None, pyaudio.paContinue)``: no output data (the stream is
        input-only) and a request to keep the stream running.
    """
    # Decode the captured buffer itself; the previous version discarded it
    # and re-loaded a fixed WAV file from disk on every call.
    audio_data = np.frombuffer(in_data, dtype=np.float32)

    # `rate` is the module-level sample rate the stream was opened with. It is
    # only read here, so no `global` declaration (or rebinding) is needed.
    tempo, _beat_frames = librosa.beat.beat_track(y=audio_data, sr=rate)

    # NOTE: no time.sleep() here. This function runs on the audio callback
    # thread, and blocking it delays delivery of the next buffer.
    print('Estimated tempo: {} beats per minute'.format(tempo))
    return (None, pyaudio.paContinue)
# Open an input-only, callback-driven capture stream on the selected device.
# No output device is configured because output is disabled and `dev` was
# chosen from the list of input devices.
stream = pa.open(
    format=sample_format,  # paFloat32; callback() decodes buffers as float32
    channels=1,
    rate=rate,
    input=True,
    output=False,
    input_device_index=dev,
    frames_per_buffer=chunk,
    stream_callback=callback,
)

try:
    # start the stream
    stream.start_stream()
    # The callback does the work on PyAudio's own thread; the main thread
    # only has to stay alive while the stream is active.
    while stream.is_active():
        time.sleep(3)
finally:
    # Always release the stream and PortAudio, including on Ctrl-C or when
    # an exception ends the loop early.
    stream.stop_stream()
    stream.close()
    pa.terminate()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment