Skip to content

Instantly share code, notes, and snippets.

@asw456
Forked from hardbyte/Rolling FFT
Created July 9, 2013 20:21
Show Gist options
  • Save asw456/5960924 to your computer and use it in GitHub Desktop.
Save asw456/5960924 to your computer and use it in GitHub Desktop.
import numpy as np
from collections import deque
import time
import threading
import matplotlib.pyplot as plt
def rollingFFT(s, n, dt):
    """Return the positive-frequency amplitude spectrum of a sample window.

    Args:
        s: Sequence of samples (list, deque or array). It is zero-padded
            or truncated to ``n`` points before the transform.
        n: Number of FFT points; also the length used to build the
            frequency axis.
        dt: Spacing between consecutive samples. The returned frequencies
            are in cycles per unit of ``dt`` (Hz when ``dt`` is seconds).

    Returns:
        A ``(hz, ampl)`` tuple of equal-length arrays: the strictly
        positive frequencies in ascending order and the FFT magnitude at
        each of them. The zero-frequency (DC) bin is excluded.
    """
    # Pin the transform length to n so the spectrum always lines up with
    # the frequency axis below, even when the window holds fewer or more
    # than n samples.
    fy = np.fft.fft(s, n=n)
    # Frequency associated with each FFT bin.
    freqs = np.fft.fftfreq(n, d=dt)
    # Shift both arrays the same way so they stay aligned and the kept
    # frequencies come out in ascending order.
    hz = np.fft.fftshift(freqs)
    ampl = np.fft.fftshift(np.abs(fy))
    positive = hz > 0
    return hz[positive], ampl[positive]
def plotFFT(fig, n, dt, data, title="Frequencies Observed", show_data=True):
    """Continuously redraw the spectrum of a shared sample buffer.

    Meant to be run in its own thread; it recomputes the FFT as fast as it
    can until the figure window is closed.

    Args:
        fig: Matplotlib figure to draw into.
        n: Number of FFT points, passed through to ``rollingFFT``.
        dt: Sample spacing, passed through to ``rollingFFT``.
        data: Shared deque of samples, filled from somewhere else.
        title: Title shown above the spectrum plot.
        show_data: When true, also plot the raw samples in a second
            subplot below the spectrum.
    """
    if show_data:
        ax = fig.add_subplot(211)
        ax2 = fig.add_subplot(212)
        line2, = ax2.plot(list(data))
    else:
        ax = fig.add_subplot(111)
        line2 = None

    f, a = rollingFFT(data, n, dt)
    line1, = ax.plot(f, a, 'r--')
    ax.set_xlabel("Hz")
    ax.set_xlim([0, f[-1]])
    # NOTE(review): fixed amplitude range, presumably tuned for the demo
    # signal; confirm before reusing with other data.
    ax.set_ylim([-500, 10000])
    # Title the spectrum axes explicitly. pyplot's "current axes" would be
    # the raw-data subplot when show_data is true.
    ax.set_title(title)

    # Stop refreshing once the window is closed so the thread can finish.
    closed = threading.Event()
    fig.canvas.mpl_connect("close_event", lambda _event: closed.set())

    while not closed.is_set():
        # Snapshot once per frame so both lines are drawn from the same
        # samples even while the producer keeps appending.
        samples = list(data)
        f, a = rollingFFT(samples, n, dt)
        line1.set_ydata(a)
        if show_data:
            line2.set_ydata(samples)
        # Redraw this figure specifically rather than pyplot's current one.
        fig.canvas.draw_idle()
if __name__ == "__main__":
    # Simulate a live signal source feeding the rolling FFT plot.
    def sim(fig, l=20):
        """Generate a noisy synthetic signal and stream it to the plot.

        Args:
            fig: Matplotlib figure handed to ``plotFFT``.
            l: Length of the simulated signal, in the same time unit as
                the 0.003 sample spacing used below.
        """
        n = 200
        dt = 0.003
        times = np.arange(start=0, stop=l, step=dt)
        noise = 25 * np.random.normal(0, 2, len(times))
        motor_speed = 4000 * np.ones_like(times)
        # A swept-frequency term, a fixed 30-cycle term, and a 60-cycle
        # term whose amplitude grows linearly with time.
        ripple = (
            70 * np.sin(2 * np.pi * (5 * times) * times)
            + 40 * np.cos(2 * np.pi * times * 30)
            + 80 * (times / 10) * np.cos(2 * np.pi * times * 60)
        )
        samples = noise + motor_speed + ripple

        # Prefill the window with exactly the first n samples, then stream
        # the remainder so no sample is fed twice.
        signal_queue = deque(samples[:n], maxlen=n)

        plot_thread = threading.Thread(
            target=plotFFT, args=(fig, n, dt, signal_queue))
        # Daemon: the plot loop must not keep the process alive once the
        # window has been closed.
        plot_thread.daemon = True
        plot_thread.start()

        for new_value in samples[n:]:
            signal_queue.append(new_value)
            time.sleep(dt)

        plot_thread.join()

    fig = plt.figure()
    sim_thread = threading.Thread(target=sim, args=(fig,))
    sim_thread.daemon = True
    sim_thread.start()
    # Blocks until the window is closed. The worker threads are daemons,
    # so the script exits afterwards instead of waiting on them.
    plt.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment