Skip to content

Instantly share code, notes, and snippets.

@el-hult
Last active May 5, 2022 08:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save el-hult/9bafd3b4cb0ddff6dd00c0b960b83378 to your computer and use it in GitHub Desktop.
Save el-hult/9bafd3b4cb0ddff6dd00c0b960b83378 to your computer and use it in GitHub Desktop.
# nice library that gives us access to the microphone
import pyaudio
# python standard libraries that come built-in
import threading
import queue
import wave
# optional libraries
import numpy as np
import scipy.io.wavfile
#
# CONFIG
#
FORMATIN = pyaudio.paInt16 # use 16 bit integer LPCM ...
NPTYPE = np.int16 # ... with matching numpy type
WIDTH = 2 # a 16 bit int takes 2 bytes
CHANNELS = 1 # I only have mono mic
RATE = 22050 # 22050 Hz is cheaper. 44100 is standard
CHUNK = 1024 # It is quite common to record in chunks of 1024 frames (of 16 bits each).
# 1024 frames in 22050 Hz is ca 46 ms, so sending a signal to stop recording
# means it can run for up to 46 ms before reading this signal
# smaller chunks leads to snappier performance in the interface with the
# sound device, but it introduces a large workload on cpu and memory
#
# Setup
#
audio = pyaudio.PyAudio()
data = []
do_record = threading.Event()
data_q = queue.SimpleQueue()
def callback(in_data,frame_count,time_info,status_flag):
"""This function is called whenever PyAudio wants to give us new recorded data (via the in_data variable)
It gets run in a separate thread, so to make sure the parallellism works, we communicate via
threading.Event() and queue.SimpleQueue()
by always returning `pyaudio.paContinue`, we make sure that the stream is held open, and we can
resume recoring whenever we want
"""
if do_record.is_set():
data_q.put(in_data)
return (None,pyaudio.paContinue)
streamIn = audio.open(format=FORMATIN,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK,
stream_callback=callback,
start=True
)
#
# Main program logic
#
saved_clips = 0
while True:
#
# Start a new recording?
#
x = input("Record another clip? [y]/n")
if x.strip() == "n":
print("Goodbye. Stopping program")
break
do_record.set()
#
# Stop recording, clean up, collect data
#
input("Press ENTER to stop")
do_record.clear()
data = []
while not data_q.empty():
data.append(data_q.get())
#
# signal processing example for how to get the data into
# numpy, process it and then save to disk
# In this case, I smoothen the waveform a bit reducing noise
#
wav_bytes_as_array = np.concatenate([np.frombuffer(
frame, dtype=NPTYPE) for frame in data], axis=0)
w = np.ones(int(0.005 * RATE)) # 5 ms box filter
smoothened_waveform = np.convolve(w/w.sum(),wav_bytes_as_array,mode='same')
scipy.io.wavfile.write(f"clip{saved_clips}_numpy.wav", RATE, smoothened_waveform.astype(NPTYPE))
#
# if you don't want to pre-process the data, you can save it directly via
# the built in wave library. here we just write the bytes straight up
#
with wave.open(f"clip{saved_clips}_direct.wav",'wb') as f:
f.setframerate(RATE)
f.setnchannels(CHANNELS)
f.setsampwidth(WIDTH)
for d in data:
f.writeframes(d)
saved_clips+=1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment