
@tsutof
Created May 25, 2019 07:11
WAV FFT Analyzer with CuPy
# -*- coding: utf-8 -*-
# Reads a WAV file chunk by chunk, plays it back through WaveReader/PyAudio,
# and plots the FFT magnitude spectrum in real time. The FFT is offloaded to
# the GPU with CuPy.

import sys
import os

import numpy as np
import cupy as cp
from pyqtgraph.Qt import QtGui, QtCore
import pyqtgraph as pg

sys.path.append(os.path.dirname(__file__))
import WaveReader

CHUNK_SIZE = (2 ** 11)
MAX_PLOT_POWER = 50


def fft_gpu(x_cpu):
    # Copy the samples to the GPU, run the real FFT there,
    # then bring the result back to host memory.
    x_gpu = cp.asarray(x_cpu)
    y_gpu = cp.fft.rfft(x_gpu)
    y_cpu = cp.asnumpy(y_gpu)
    return abs(y_cpu)


def fft_cpu(x_cpu):
    # CPU reference path using NumPy.
    y_cpu = np.fft.rfft(x_cpu)
    return abs(y_cpu)


args = sys.argv
if len(args) < 2:
    print('No input wave file specified.')
    sys.exit(-1)

reader = WaveReader.WaveReader(args[1], CHUNK_SIZE)
print('Number of channels: %d' % (reader.num_channels))
print('Sample width      : %d' % (reader.sample_width))
print('Frame rate        : %d' % (reader.framerate))
print('Chunk size        : %d' % (reader.chunk_size))

# Each FFT is taken over two chunks (the previous one plus the new one),
# so rfft returns CHUNK_SIZE + 1 bins spanning 0 .. Nyquist.
nyquist_freq = reader.framerate / 2.0
resol = nyquist_freq / CHUNK_SIZE
x_data = np.arange(start=0.0, step=resol, stop=resol * (CHUNK_SIZE + 1))

app = QtGui.QApplication([])
p = pg.plot()
p.setTitle('Sound FFT Analyzer')
p.setRange(QtCore.QRectF(0, 0, int(resol * (CHUNK_SIZE + 1)), MAX_PLOT_POWER))
p.setLabel('bottom', 'Frequency', units='Hz')
p.showGrid(x=True, y=True)
curve_left = p.plot()
curve_right = p.plot()

fft_func = fft_gpu


def update():
    global x_data, curve_left, curve_right, reader, app, fft_func
    chunk, frames = reader.read()
    if reader.num_channels == 1:
        curve_left.setData(x_data, fft_func(frames), pen=(0, 0, 255))
    else:
        # De-interleave the stereo samples into left and right channels.
        left = frames[0::reader.num_channels]
        right = frames[1::reader.num_channels]
        curve_left.setData(x_data, fft_func(left), pen=(0, 255, 0))
        curve_right.setData(x_data, fft_func(right), pen=(255, 0, 0))
    app.processEvents()  ## force complete redraw for every plot
    reader.write()


timer = QtCore.QTimer()
timer.timeout.connect(update)
timer.start(0)

## Start Qt event loop unless running in interactive mode.
if __name__ == '__main__':
    if (sys.flags.interactive != 1) or not hasattr(QtCore, 'PYQT_VERSION'):
        QtGui.QApplication.instance().exec_()
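
The analyzer takes the WAV file to play and analyze as its first command-line argument. It selects the GPU path with fft_func = fft_gpu; pointing fft_func at fft_cpu instead switches to the NumPy path. Below is a minimal offline timing sketch for comparing the two on blocks of the size the analyzer processes (two chunks, i.e. 2 * CHUNK_SIZE samples). The functions are re-declared so the sketch is self-contained, and the iteration count and explicit stream synchronization are illustrative assumptions, not part of the gist.

# -*- coding: utf-8 -*-
import time
import numpy as np
import cupy as cp

CHUNK_SIZE = (2 ** 11)   # same block size as the analyzer
N_ITER = 1000            # arbitrary iteration count for timing

x = np.random.randn(2 * CHUNK_SIZE).astype(np.float32)

def fft_gpu(x_cpu):
    # Same round trip as the analyzer: host -> device, rfft, device -> host.
    return abs(cp.asnumpy(cp.fft.rfft(cp.asarray(x_cpu))))

def fft_cpu(x_cpu):
    return abs(np.fft.rfft(x_cpu))

fft_gpu(x)                            # warm-up: first call sets up the FFT plan
cp.cuda.Stream.null.synchronize()

for name, f in (('GPU (CuPy)', fft_gpu), ('CPU (NumPy)', fft_cpu)):
    t0 = time.perf_counter()
    for _ in range(N_ITER):
        f(x)
    cp.cuda.Stream.null.synchronize()  # make sure all GPU work has finished
    elapsed = time.perf_counter() - t0
    print('%-12s %.3f ms per FFT' % (name, elapsed / N_ITER * 1000.0))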
WaveReader.py
# -*- coding: utf-8 -*-
# Reads a WAV file chunk by chunk and plays it back through PyAudio.

import numpy as np
import wave
import pyaudio


class WaveReader:

    def __init__(self, fname, chunk_size):
        self.wav = wave.open(fname, "rb")
        self.num_channels = self.wav.getnchannels()
        self.sample_width = self.wav.getsampwidth()
        self.framerate = self.wav.getframerate()
        self.num_frames = self.wav.getnframes()
        self.chunk_size = chunk_size
        # Full-scale amplitude used to normalize samples to [-1, 1].
        self.amp = (2 ** 8) ** self.sample_width / 2
        # Note: this map assumes signed samples; 8-bit WAV data is actually unsigned.
        type_map = {1: 'int8', 2: 'int16', 4: 'int32'}
        self.sample_dtype = type_map[self.sample_width]
        # Size of one chunk in bytes.
        self.chunk_len = self.chunk_size * self.num_channels * self.sample_width
        self.last_frames = np.zeros(self.chunk_size * self.num_channels)
        self.pa_obj = pyaudio.PyAudio()
        self.stream = self.pa_obj.open(
            format=self.pa_obj.get_format_from_width(self.sample_width),
            channels=self.num_channels,
            rate=self.framerate,
            output=True)
        self.finished = False
        self.last_chunk = None

    def __del__(self):
        self.close()

    def close(self):
        if not self.finished:
            self.wav.close()
            self.stream.stop_stream()
            self.stream.close()
            self.pa_obj.terminate()
            self.finished = True

    def read(self):
        if self.finished:
            # End of file reached earlier: keep returning the last chunk.
            chunk = self.last_chunk
        else:
            chunk = self.wav.readframes(self.chunk_size)
            if len(chunk) < self.chunk_len:
                # A short read means the file is exhausted; release the audio
                # resources and fall back to the previous chunk.
                self.close()
                chunk = self.last_chunk
        # Normalize the raw samples and prepend the previous chunk so the
        # caller always gets two chunks' worth of frames.
        new_frames = np.frombuffer(chunk, self.sample_dtype) / self.amp
        frames = np.concatenate([self.last_frames, new_frames])
        self.last_frames = new_frames
        self.last_chunk = chunk
        return (chunk, frames)

    def write(self):
        if not self.finished:
            # Play back the most recently read chunk.
            self.stream.write(self.last_chunk)
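
WaveReader can also be exercised on its own, without the Qt front end: read() returns the raw chunk plus a float array holding the previous and current chunks scaled to [-1, 1], and write() plays back the chunk just read. A minimal sketch, assuming a file named sample.wav (an example name, not from the gist) that holds at least one full chunk:

# -*- coding: utf-8 -*-
import numpy as np
import WaveReader

CHUNK_SIZE = (2 ** 11)

reader = WaveReader.WaveReader('sample.wav', CHUNK_SIZE)
peak = 0.0
# Loop until WaveReader closes itself on the short read at end of file.
while not reader.finished:
    chunk, frames = reader.read()   # frames: previous + current chunk, normalized
    peak = max(peak, float(np.max(np.abs(frames))))
    reader.write()                  # play the chunk through PyAudio (no-op once finished)
print('Peak amplitude: %.3f' % peak)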