Skip to content

Instantly share code, notes, and snippets.

@aldur
Last active August 29, 2015 14:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aldur/f2ae69167f4003324f76 to your computer and use it in GitHub Desktop.
Save aldur/f2ae69167f4003324f76 to your computer and use it in GitHub Desktop.
Wireless Lab - Sliding window tracing parsing
import numpy
import os
import pickle
from itertools import islice
TRACES_DIRECTORY = "traces"
OUTPUT_DIRECTORY = "output"
WINDOW_SIZE = 256
SAMPLING_RATE = 32
FREQ_RANGE_MIN = 1
FREQ_RANGE_MAX = 3
def getMagnitudeList(window):
"""Return a list of magnitudes from to the window."""
magnitudes = list()
for line in window:
_, x, y, z = line.split("\t")
x, y, z = (float(i) for i in (x, y, z))
magnitude = numpy.sqrt(x ** 2 + y ** 2 + z ** 2)
magnitudes.append(magnitude)
return magnitudes
def filterFrequencies(fft_x_half, freq_falf):
filtered_fft_x_half, filtered_freq_falf = list(), list()
for i, f in enumerate(freq_falf):
if f >= FREQ_RANGE_MIN and f <= FREQ_RANGE_MAX:
filtered_fft_x_half.append(freq_falf[i])
filtered_freq_falf.append(f)
return filtered_fft_x_half, filtered_freq_falf
def window(seq, n=2):
"Returns a sliding window (of width n) over data from the iterable"
" s -> (s0,s1,...s[n-1]), (s1,s2,...,sn), ... "
it = iter(seq)
result = tuple(islice(it, n))
if len(result) == n:
yield result
for elem in it:
result = result[1:] + (elem,)
yield result
def transformateWindow(magnitudes):
fft_x = numpy.fft.fft(magnitudes)
l = len(fft_x)
freq = numpy.fft.fftfreq(l, 1.0 / 20)
ftt_x_shifted = numpy.fft.fftshift(fft_x)
half_l = numpy.ceil(l / 2.0)
fft_x_half = numpy.abs((2.0 / l) * fft_x[:half_l])
freq_falf = freq[:half_l]
return filterFrequencies(fft_x_half[1:], freq_falf[1:])
def processWindow(w):
w = getMagnitudeList(w)
var = numpy.var(w)
fft_x_half, freq_falf = transformateWindow(w)
pf_index = freq_falf[fft_x_half.index(max(fft_x_half))]
return [var, pf_index] + freq_falf
def main():
if not os.path.isdir(OUTPUT_DIRECTORY):
os.mkdir(OUTPUT_DIRECTORY)
for f in os.listdir(TRACES_DIRECTORY):
with open(os.path.join(TRACES_DIRECTORY, f), 'r') as input:
content = input.readlines()
lists = list()
lists.append(processWindow(content[:WINDOW_SIZE]))
for w in window(content[WINDOW_SIZE:], SAMPLING_RATE):
lists.append(processWindow(w))
with open(os.path.join(OUTPUT_DIRECTORY, f), 'wb') as output:
pickle.dump(lists, output)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment