Created
January 4, 2019 13:08
-
-
Save rudrathegreat/e1655b641aae0aedb6837f83493e6e5d to your computer and use it in GitHub Desktop.
A program designed to record, display and save live audio as both waveform and spectrogram
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
Copyright (C) RudratheGreat
This program records audio from its surroundings and
displays it live on two plots showing the waveform
and spectrum of the data. The recording is then
processed and saved as both a spectrogram image and
an audio file, which can be played back or inspected
later for further analysis.
>>> def find_input_device(self):
Returns the index of the first audio device whose name
contains 'mic' or 'input', or None so that the default
recording device is used.
>>> def open_mic_stream(self):
Opens and starts the input stream from which live
data is read.
>>> def listen(self):
Listens to and records live data coming from the
stream (your recording device), using Matplotlib to
display each chunk as a waveform (first plot) and as
a spectrum (second plot). The recorded data is then
passed on to processBlock() to be processed.
>>> def processBlock(self, snd_block):
Takes the data the program has recorded and creates
a spectrogram image and an audio file from it. These
are saved into their corresponding folders (Images/
and Audio/), where they can be analysed further.
Based on the code by MarkJay4k, link below - | |
https://github.com/markjay4k/Audio-Spectrum-Analyzer-in-Python | |
https://www.youtube.com/watch?v=AShHJdSIxkY&list=PLX-LrBk6h3wQVsrldsQdtKmeTygurKiuS | |
''' | |
import pyaudio | |
import struct | |
import numpy as np | |
from scipy import signal, fft | |
import matplotlib.pyplot as plt | |
from matplotlib.colors import LogNorm | |
import time | |
from scipy.io.wavfile import write | |
# from matplotlib import transforms | |
# import math | |
# Recording parameters shared by the whole module.
THRESHOLD = 0  # dB
RATE = 44100  # sample rate requested from the input stream
INPUT_BLOCK_TIME = 30  # 30 sec
# Number of frames captured per recording: rate multiplied by duration.
INPUT_FRAMES_PER_BLOCK = int(RATE * INPUT_BLOCK_TIME)
# Same value as INPUT_FRAMES_PER_BLOCK; kept as a separate name.
INPUT_FRAMES_PER_BLOCK_BUFFER = INPUT_FRAMES_PER_BLOCK
def get_rms(block):
    """Return the root-mean-square amplitude of *block*.

    Args:
        block: Array-like of numeric samples (for example an int16 array).

    Returns:
        The RMS value as a float; 0.0 when *block* is empty.
    """
    # Square in float64: squaring int16 samples in their own dtype wraps
    # around for any |sample| > 181 and yields a meaningless result.
    samples = np.asarray(block, dtype=np.float64)
    if samples.size == 0:
        # np.mean of an empty array is NaN (with a RuntimeWarning).
        return 0.0
    return np.sqrt(np.mean(np.square(samples)))
class AudioHandler(object):
    """Record microphone audio, plot it live and save the results.

    An instance opens a 16-bit mono input stream at ``RATE`` Hz on
    construction.  ``listen()`` records ``INPUT_FRAMES_PER_BLOCK`` frames
    while updating a live waveform/spectrum figure, then hands the recording
    to ``processBlock()``, which writes a spectrogram image to ``Images/``
    and a WAV file to ``Audio/``.
    """

    def __init__(self):
        self.pa = pyaudio.PyAudio()
        self.stream = self.open_mic_stream()
        self.threshold = THRESHOLD  # stored but not used by any method here
        self.plot_counter = 0  # suffix of the next spec/audio file pair

    def stop(self):
        """Stop and close the input stream and release PortAudio."""
        try:
            self.stream.stop_stream()
            self.stream.close()
        finally:
            # Without terminate() the PortAudio resources stay allocated.
            self.pa.terminate()

    def find_input_device(self):
        """Return the index of the first device that looks like an input.

        Every device is printed as it is inspected.  A device matches when
        its name contains 'mic' or 'input' (case-insensitive).

        Returns:
            The matching device index, or None when no device matches (the
            stream is then opened on the default input device).
        """
        for index in range(self.pa.get_device_count()):
            devinfo = self.pa.get_device_info_by_index(index)
            name = devinfo['name']
            print('Device {}: {}'.format(index, name))
            lowered = name.lower()
            if any(keyword in lowered for keyword in ('mic', 'input')):
                print('Found an input: device {} - {}'.format(index, name))
                return index
        print('No preferred input found; using default input device.')
        return None

    def open_mic_stream(self):
        """Open and start a mono, 16-bit signed input stream at RATE Hz.

        Returns:
            The started PyAudio stream.
        """
        device_index = self.find_input_device()
        stream = self.pa.open(format=self.pa.get_format_from_width(2, False),
                              channels=1,
                              rate=RATE,
                              input=True,
                              input_device_index=device_index)
        stream.start_stream()
        return stream

    def processBlock(self, snd_block):
        """Save *snd_block* as a spectrogram image and as a WAV file.

        Writes ``Images/spec<N>.png`` and ``Audio/audio<N>.wav`` where N is
        ``self.plot_counter``, then increments the counter.  Both folders are
        created when missing.  The input array is not modified.

        Args:
            snd_block: 1-D array of recorded samples (int16 from listen()).
        """
        import os  # local import: only this method touches the filesystem

        samples = np.asarray(snd_block)
        if samples.size == 0:
            print('No audio captured; nothing to save.')
            return

        for folder in ('Images', 'Audio'):
            os.makedirs(folder, exist_ok=True)

        f, t, Sxx = signal.spectrogram(samples.astype(np.float64), RATE)

        # Draw on a dedicated figure so the caller's current figure is not
        # cleared; the size matches the live figure created in listen().
        fig, ax = plt.subplots(figsize=(12, 6))
        try:
            # LogNorm needs strictly positive limits, but the spectrogram
            # can contain exact zeros (e.g. silent input).
            positive = Sxx[Sxx > 0]
            norm = None
            if positive.size:
                norm = LogNorm(vmin=positive.min(), vmax=positive.max())
            mesh = ax.pcolormesh(f, t, Sxx.T, cmap='RdBu', norm=norm)
            ax.set_xlabel('Frequency [Hz]')
            ax.set_ylabel('Time [sec]')
            ax.axis([f.min(), f.max(), t.min(), t.max()])
            fig.colorbar(mesh, ax=ax)
            fig.savefig('Images/spec{}.png'.format(self.plot_counter),
                        bbox_inches='tight')
        finally:
            plt.close(fig)

        # The samples are written unchanged.  The previous in-place scaling
        # by 2 ** 30 cannot be represented in int16: it wrapped the affected
        # samples to 0 (or raised OverflowError on current NumPy).
        write('Audio/audio{}.wav'.format(self.plot_counter), RATE, samples)
        self.plot_counter += 1

    def listen(self):
        """Record one block of audio while plotting it live, then save it.

        Reads chunks from the stream until INPUT_FRAMES_PER_BLOCK frames have
        been collected, updating a waveform plot and a spectrum plot after
        each chunk.  If recording fails, the error is printed and whatever
        was captured so far is still passed to processBlock().
        """
        # The chunk size is however many frames are buffered right now; wait
        # for at least one, since a chunk of 0 would make read() return
        # nothing and the recording loop would never finish.
        chunk = self.stream.get_read_available()
        while chunk <= 0:
            time.sleep(0.01)
            chunk = self.stream.get_read_available()

        fig, (ax1, ax2) = plt.subplots(2, figsize=(12, 6))
        x = np.arange(0, 2 * chunk, 2)  # samples (waveform)
        xf = np.linspace(0, RATE, chunk)  # frequencies (spectrum)
        # Placeholder random data; replaced by the first chunk read.
        line, = ax1.plot(x, np.random.rand(chunk), '-', lw=2)
        line_fft, = ax2.semilogx(xf, np.random.rand(chunk), '-', lw=2)
        ax1.set_title('LIVE DATA')
        ax1.set_xlabel('samples')
        ax1.set_ylabel('amplitude')
        ax1.set_ylim(0, 255)
        ax1.set_xlim(0, 2 * chunk)
        plt.setp(ax1, xticks=[0, chunk, 2 * chunk], yticks=[0, 128, 255])
        ax2.set_xlim(20, RATE / 2)
        plt.show(block=False)
        print('stream started')
        print('start', self.stream.is_active(), self.stream.is_stopped())

        total = 0
        recorded = []
        try:
            while total < INPUT_FRAMES_PER_BLOCK:
                while self.stream.get_read_available() <= 0:
                    print('waiting')
                    time.sleep(0.01)
                while (self.stream.get_read_available() > 0
                       and total < INPUT_FRAMES_PER_BLOCK):
                    raw_block = self.stream.read(
                        chunk, exception_on_overflow=False)
                    samples = np.frombuffer(raw_block, dtype=np.int16)
                    total += samples.size
                    print("done", total, samples.size)
                    recorded.append(samples)

                    # Reduce each 16-bit sample to its high-order byte and
                    # offset by 128 so the waveform fits the 0..255 axis.
                    # (Unpacking every other raw byte selected the low-order
                    # byte of each little-endian sample instead.)
                    data_np = (samples.astype(np.int32) >> 8) + 128
                    line.set_ydata(data_np)

                    # Compute the FFT and update the spectrum line.
                    # np.fft.fft is used because `fft` imported from scipy
                    # is a module, not a function, on current SciPy.
                    yf = np.fft.fft(data_np)
                    data_fft = np.abs(yf[0:chunk]) / (128 * chunk)
                    line_fft.set_ydata(data_fft * 3)

                    # Update the figure canvas.
                    fig.canvas.draw()
                    fig.canvas.flush_events()
            print('stream stopped')
        except Exception as e:
            # Best effort: report the failure and keep what was recorded.
            print('Error recording: {}'.format(e))
        finally:
            plt.close(fig)

        if recorded:
            self.processBlock(np.hstack(recorded))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment