Skip to content

Instantly share code, notes, and snippets.

@thearn
Last active December 16, 2015 11:49
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thearn/5430603 to your computer and use it in GitHub Desktop.
Save thearn/5430603 to your computer and use it in GitHub Desktop.
Class for buffering an incoming data series and computing an FFT
import numpy as np
import time
class bandProcess(object):
"""
Component to isolate specific frequency bands
"""
def __init__(self,
limits=[0., 3.],
make_filtered=True,
operation="pass",
freq_mult=False):
self.peak_hz = 0.
self.phase = 0.
self.freqs = [1, 2, 3, 4]
self.make_filtered = make_filtered
if make_filtered:
self.filtered = [0, 0, 0, 0]
self.fft = [0, 0, 0, 0]
self.limits = limits
self.freq_mult = freq_mult
if freq_mult:
self.limits[0] = self.limits[0] / float(freq_mult)
self.limits[1] = self.limits[1] / float(freq_mult)
self.operation = operation
def process(self, parent):
freqs = np.array(parent.freqs)
fft = np.array(parent.fft)
if self.operation == "pass":
idx = np.where((freqs > self.limits[0])
& (freqs < self.limits[1]))
else:
idx = np.where((freqs < self.limits[0])
& (freqs > self.limits[1]))
if len(idx[0]) > 0:
self.freqs = freqs[idx]
self.fft = np.abs(fft[idx]) ** 2
if self.make_filtered:
fft_out = 0 * fft
fft_out[idx] = fft[idx]
if len(fft_out) > 2:
self.filtered = np.fft.irfft(fft_out)
self.filtered = self.filtered / \
np.hamming(len(self.filtered))
maxidx = np.argmax(self.fft)
self.peak_hz = self.freqs[maxidx]
self.phase = np.angle(fft)[idx][maxidx]
if self.freq_mult:
self.peak_hz = self.peak_hz * self.freq_mult
self.freqs = self.freqs * self.freq_mult
class FFT_series(object):
"""
Collects data from a connected input float over each run and buffers it
internally into a list of maximum size 'n'.
"""
def __init__(self, n=100, auto_fft=True):
self.ready = False
self.auto_fft = auto_fft
self.rate = 0
self.n = n
self.samples = []
self.times = []
self.fft = [0, 0, 0]
self.psd = [0, 0, 0]
self.freqs = [1, 2, 3]
self.interpolated = np.zeros(2)
self.even_times = np.zeros(2)
self.nyquist = 0
self.max_freq_precision = n
def add(self, data, t=None):
"""
Add single value
"""
if not t:
t = time.time()
self.samples.append(data)
self.times.append(t)
self.check()
def add_series(self, data, t=None):
"""
Add series of values
"""
if not t:
t0 = time.time()
t = [t0 * i for i in xrange(1, len(data) + 1)]
self.samples += list(data)
self.times += list(t)
self.check()
def check(self):
"""
Check length of buffer, clip if necessary
"""
if len(self.samples) > self.n:
self.ready = True
self.samples = self.samples[-self.n:]
self.times = self.times[-self.n:]
if self.auto_fft:
self.get_fft()
def get_fft(self):
if len(self.samples) > 3:
self._compute_fft()
return self.fft
def _compute_fft(self, window=np.hamming):
"""
Computes the fft
"""
n = len(self.times)
self.rate = float(n) / (self.times[-1] - self.times[0])
self.nyquist = self.rate / 2.
self.max_freq_precision = self.rate / self.n
self.even_times = np.linspace(self.times[0], self.times[-1], n)
interpolated = np.interp(self.even_times, self.times, self.samples)
interpolated = window(n) * interpolated
self.interpolated = interpolated
# Perform the FFT
self.fft = np.fft.rfft(interpolated)
self.psd = np.abs(self.fft) ** 2
self.freqs = float(self.rate) / n * np.arange(n / 2 + 1)
if __name__ == "__main__":
"""
Demonstrates a fake heartbeat estimation
"""
import random # fake data source
buffered_fft = FFT_series() # create container/fft computer
#band pass filter, 50 to 180 bpm
heart = bandProcess(limits=[50, 180], freq_mult=60)
rate = 15 # fake data source sample rate
while True:
x = random.uniform(0, 100) # aquire
buffered_fft.add(x) # add to buffer - fft computed automatically
# print "%0.1f samples/s, Nyquist:" % (fft.rate), "%0.1f Hz" %
# fft.nyquist
heart.process(buffered_fft) # process between 50 and 180bpm
time.sleep(1. / rate) # pause to approximate sample rate
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment