Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Cooley-Tukey FFT step using Map Reduce (single MR step)
import numpy
# Shared parameters of the FFT decomposition, assigned by the ``__main__``
# block at the bottom of this file before the map/reduce pass runs:
#   n -- log2 of the transform length (the twiddle factors divide by 2 ** n)
#   r -- number of shards the input is split into (also the output stride)
# NOTE: a ``global`` statement at module scope has no effect in Python; it is
# kept here only as a declaration of intent.
global r, n
def run_mr(data, mapper, reducer):
    """Run a single map/shuffle/reduce pass locally, in-process.

    Args:
        data: Iterable of ``(key, value)`` input records.
        mapper: Callable ``mapper(key, value)`` returning an iterable of
            ``(out_key, out_value)`` pairs.
        reducer: Callable ``reducer(key, values)`` returning an iterable of
            output records. ``values`` is the list of every value the mappers
            emitted for ``key``, in emission order.

    Yields:
        Every record produced by ``reducer``, one key group after another.
        This is a generator: no mapper runs until the first item is
        requested, and the whole map phase completes before the first
        reducer is invoked.
    """
    # Map + shuffle: group every mapper output value under its output key.
    shuffled = {}
    for key, value in data:
        for out_key, out_value in mapper(key, value):
            shuffled.setdefault(out_key, []).append(out_value)

    # Reduce: hand each key group to the reducer and stream its output.
    for key, values in shuffled.items():
        for output in reducer(key, values):
            yield output
def shard(input_data, num_shards=None):
    """Deal the input samples round-robin into shards.

    Sample number ``j`` goes to shard ``j % num_shards`` and is tagged with
    its position ``j // num_shards`` inside that shard, so each shard holds
    every ``num_shards``-th sample of the input (a decimated sub-sequence).

    Args:
        input_data: Iterable of samples.
        num_shards: Number of shards. When omitted, the module-level ``r``
            is used, which keeps existing ``shard(data)`` calls working.

    Returns:
        The ``items()`` of a dict mapping shard id to a list of
        ``(position, sample)`` pairs, in input order.
    """
    if num_shards is None:
        num_shards = r  # module-level default set by the driver script

    shards = {}
    for index, sample in enumerate(input_data):
        # divmod uses floor division on both Python 2 and 3, for int or
        # float shard counts alike.
        position, shard_id = divmod(index, num_shards)
        shards.setdefault(shard_id, []).append((int(position), sample))
    return shards.items()
def fft_mapper(k, v, num_points=None):
    """Map step: FFT one shard and apply the Cooley-Tukey twiddle factors.

    Args:
        k: Shard id, i.e. the offset of this shard's samples in the full
            input sequence.
        v: List of ``(position, sample)`` pairs belonging to the shard, in
            any order.
        num_points: Length of the full transform. When omitted it is
            ``2 ** n`` using the module-level ``n``, which keeps the
            two-argument mapper call made by ``run_mr`` working.

    Yields:
        ``(i, (k, value))`` where ``i`` is the frequency index within the
        shard's FFT and ``value`` is that FFT bin multiplied by
        ``exp(-2j * pi * i * k / num_points)``. Keying by ``i`` makes the
        shuffle gather the same bin of every shard for one reducer.
    """
    if num_points is None:
        num_points = 2 ** n  # module-level default set by the driver script

    # Restore the shard's sample order; sort on the position only so the
    # sample values themselves are never compared.
    ordered = sorted(v, key=lambda pair: pair[0])
    samples = [sample for _, sample in ordered]

    for i, x in enumerate(numpy.fft.fft(samples)):
        twiddle_factor = numpy.exp(-2.0j * numpy.pi * i * k / num_points)
        yield i, (k, x * twiddle_factor)
def fft_reducer(k, v, stride=None):
    """Reduce step: FFT across shards for one mapper frequency index.

    Args:
        k: The frequency index emitted by the mappers (the shuffle key).
        v: List of ``(shard_id, value)`` pairs, one per shard, in any order.
        stride: Spacing between the output indices produced here. When
            omitted, the module-level ``r`` is used, which keeps the
            two-argument reducer call made by ``run_mr`` working. The
            resulting indices match a full-length FFT only when the stride
            equals the length of each mapper-side FFT, i.e. when the total
            length is ``r * r``.

    Yields:
        ``(index, value)`` pairs with ``index = int(i * stride + k)``, where
        ``i`` is the frequency index of this reducer's FFT.
    """
    if stride is None:
        stride = r  # module-level default set by the driver script

    # Order by shard id only: the values are complex, which Python 3 cannot
    # compare, so sorting whole tuples would raise TypeError on a key tie.
    ordered = sorted(v, key=lambda pair: pair[0])
    samples = [value for _, value in ordered]

    for i, x in enumerate(numpy.fft.fft(samples)):
        yield int(i * stride + k), x
if __name__ == '__main__':
    # Some test data...
    test_sequence = numpy.random.random(64)
    num_points = len(test_sequence)

    # n is log2 of the transform length and r the number of shards. Both are
    # module-level names read by shard(), fft_mapper() and fft_reducer().
    # Keep them integers so shard ids and output indices are exact.
    n = int(round(numpy.log2(num_points)))
    r = 2 ** (n // 2)
    if r * r != num_points:
        # The single-pass decomposition needs r shards of r samples each;
        # any other length would yield wrong output indices.
        raise ValueError(
            'sequence length must be a power of two with an even exponent, '
            'got %d' % num_points)

    output = run_mr(shard(test_sequence), fft_mapper, fft_reducer)
    expected_result = numpy.fft.fft(test_sequence)

    # Print results and compare to value obtained by a full-sized FFT
    for index, value in sorted(output, key=lambda pair: pair[0]):
        # One pre-formatted string prints identically on Python 2 and 3.
        print('i= %s v= %s expected %s'
              % (index, value, expected_result[index]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.