Created
May 25, 2019 07:11
-
-
Save tsutof/c7231584300e01b5b3a35b08c82a67db to your computer and use it in GitHub Desktop.
WAV FFT Analyzer with CuPy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import sys | |
import os | |
import numpy as np | |
import cupy as cp | |
from pyqtgraph.Qt import QtGui, QtCore | |
import pyqtgraph as pg | |
from pyqtgraph.ptime import time | |
import time | |
sys.path.append(os.path.dirname(__file__)) | |
import WaveReader | |
CHUNK_SIZE = (2 ** 11) | |
MAX_PLOT_POWER = 50 | |
def fft_gpu(x_cpu):
    """Return the magnitude spectrum of a real signal, computed with CuPy.

    The input is copied to the GPU, transformed with a real-input FFT,
    copied back to host memory, and the magnitude is taken on the host.
    """
    spectrum_on_device = cp.fft.rfft(cp.asarray(x_cpu))
    spectrum_on_host = cp.asnumpy(spectrum_on_device)
    return np.abs(spectrum_on_host)
def fft_cpu(x_cpu):
    """Return the magnitude spectrum of a real signal, computed with NumPy."""
    return np.abs(np.fft.rfft(x_cpu))
# --- Command line handling -------------------------------------------------
args = sys.argv
if len(args) < 2:
    print('No input wave file specified.')
    sys.exit(-1)

# --- Open the input file and report its format -----------------------------
reader = WaveReader.WaveReader(args[1], CHUNK_SIZE)
print('Number of channles: %d' % (reader.num_channels))
print('Sample width : %d' % (reader.sample_width))
print('Frame rate : %d' % (reader.framerate))
print('Chunk size : %d' % (reader.chunk_size))

# --- Frequency axis ---------------------------------------------------------
# Each FFT input holds 2 * CHUNK_SIZE samples per channel (the reader returns
# the previous chunk followed by the new one), so the real FFT yields
# CHUNK_SIZE + 1 bins spaced nyquist_freq / CHUNK_SIZE apart.
nyquist_freq = reader.framerate / 2.0
resol = nyquist_freq / CHUNK_SIZE
# Build the axis from an integer bin index: np.arange with a float step does
# not guarantee the number of elements, and a length mismatch with the FFT
# output would make the curve updates fail.
x_data = np.arange(CHUNK_SIZE + 1) * resol

# --- Plot window ------------------------------------------------------------
app = QtGui.QApplication([])
p = pg.plot()
p.setTitle('Sound FFT Analyzer')
p.setRange(QtCore.QRectF(0, 0, int(resol * (CHUNK_SIZE + 1)), MAX_PLOT_POWER))
p.setLabel('bottom', 'Index', units='Hz')
p.showGrid(x=True, y=True)
# One curve per channel; only curve_left is used for mono input.
curve_left = p.plot()
curve_right = p.plot()

# FFT implementation used by update(); fft_cpu is a drop-in alternative.
fft_func = fft_gpu
def update():
    """Timer callback: read one chunk, redraw the spectrum, play the chunk.

    For mono input the single spectrum is drawn on ``curve_left``; otherwise
    the first two interleaved channels are split out and drawn separately.
    Once the reader reports that it has finished, the timer is stopped so the
    callback no longer spins on the same stale data.
    """
    _, frames = reader.read()
    channels = reader.num_channels
    if channels == 1:
        curve_left.setData(x_data, fft_func(frames), pen=(0, 0, 255))
    else:
        # Samples are interleaved, so a stride of `channels` selects one channel.
        left = frames[0::channels]
        right = frames[1::channels]
        curve_left.setData(x_data, fft_func(left), pen=(0, 255, 0))
        curve_right.setData(x_data, fft_func(right), pen=(255, 0, 0))
    app.processEvents()  # force complete redraw for every plot
    reader.write()
    if reader.finished:
        # Nothing new will ever be read; a 0 ms timer would otherwise keep
        # recomputing the same FFT at full CPU load.
        timer.stop()
# Drive update() from the Qt event loop; interval 0 means "as soon as idle".
timer = QtCore.QTimer()
timer.timeout.connect(update)
timer.start(0)

# Start the Qt event loop unless running in interactive mode.
if __name__ == '__main__':
    running_interactively = sys.flags.interactive == 1
    if not running_interactively or not hasattr(QtCore, 'PYQT_VERSION'):
        QtGui.QApplication.instance().exec_()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import numpy as np | |
import wave | |
import pyaudio | |
import time | |
class WaveReader:
    """Read a WAV file one chunk at a time and play it through PyAudio.

    ``read()`` returns the raw bytes of the current chunk together with the
    previous and current chunks as one normalized float array; ``write()``
    sends the most recently read chunk to the audio output stream.
    """

    # NumPy dtype for each supported sample width in bytes. 8-bit WAV PCM is
    # unsigned (midpoint 128); wider samples are signed.
    _SAMPLE_DTYPES = {1: 'uint8', 2: 'int16', 4: 'int32'}

    def __init__(self, fname, chunk_size):
        """Open *fname* for reading and an output stream with the same format.

        Args:
            fname: Path of the WAV file to read.
            chunk_size: Number of frames fetched by each ``read()`` call.

        Raises:
            KeyError: If the file's sample width is not 1, 2 or 4 bytes.
        """
        # Start in the "finished" state with no resources so that close() and
        # __del__ are safe even if construction fails part-way through.
        self.finished = True
        self.wav = None
        self.pa_obj = None
        self.stream = None
        self.last_chunk = None
        try:
            self.wav = wave.open(fname, "rb")
            self.num_channels = self.wav.getnchannels()
            self.sample_width = self.wav.getsampwidth()
            self.framerate = self.wav.getframerate()
            self.num_frames = self.wav.getnframes()
            self.chunk_size = chunk_size
            # Half of the sample value range; dividing by it scales to [-1, 1).
            self.amp = (2**8) ** self.sample_width / 2
            self.sample_dtype = self._SAMPLE_DTYPES[self.sample_width]
            # Size in bytes of one complete chunk.
            self.chunk_len = (
                self.chunk_size * self.num_channels * self.sample_width
            )
            self.last_frames = np.zeros(self.chunk_size * self.num_channels)
            self.pa_obj = pyaudio.PyAudio()
            self.stream = self.pa_obj.open(
                format=self.pa_obj.get_format_from_width(self.sample_width),
                channels=self.num_channels,
                rate=self.framerate,
                output=True,
            )
        except Exception:
            # Do not leak the file or audio handles opened so far.
            self._release()
            raise
        self.finished = False

    def __del__(self):
        self.close()

    def _release(self):
        """Close whichever of the wave file and audio handles were opened."""
        if getattr(self, 'wav', None) is not None:
            self.wav.close()
        if getattr(self, 'stream', None) is not None:
            self.stream.stop_stream()
            self.stream.close()
        if getattr(self, 'pa_obj', None) is not None:
            self.pa_obj.terminate()

    def close(self):
        """Release the wave file and audio output; later calls do nothing."""
        # getattr keeps __del__ quiet on an object whose __init__ never ran.
        if getattr(self, 'finished', True):
            return
        self.finished = True
        self._release()

    def _silent_chunk(self):
        """Return one chunk of silence encoded in the file's sample format."""
        is_unsigned = np.dtype(self.sample_dtype) == np.uint8
        return (b'\x80' if is_unsigned else b'\x00') * self.chunk_len

    def read(self):
        """Fetch the next chunk and return ``(chunk, frames)``.

        ``chunk`` is the raw bytes of one full chunk. ``frames`` is the
        previous chunk followed by this one, as floats scaled by ``amp``.
        When fewer bytes than a full chunk remain, the reader closes itself
        and keeps returning the last full chunk; if no full chunk was ever
        read, a chunk of silence is returned instead.
        """
        chunk = None
        if not self.finished:
            chunk = self.wav.readframes(self.chunk_size)
            if len(chunk) < self.chunk_len:
                # Incomplete tail: stop here and fall back below.
                self.close()
                chunk = None
        if chunk is None:
            chunk = self.last_chunk
        if chunk is None:
            # The file was shorter than one chunk, so there is nothing to repeat.
            chunk = self._silent_chunk()

        samples = np.frombuffer(chunk, self.sample_dtype)
        if samples.dtype == np.uint8:
            # Re-centre unsigned 8-bit samples around zero before scaling.
            samples = samples.astype(np.float64) - 128.0
        new_frames = samples / self.amp
        frames = np.concatenate([self.last_frames, new_frames])
        self.last_frames = new_frames
        self.last_chunk = chunk
        return (chunk, frames)

    def write(self):
        """Play the most recently read chunk, if the reader is still open."""
        if not self.finished and self.last_chunk is not None:
            self.stream.write(self.last_chunk)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment