Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Cooley-Tukey FFT step using Map Reduce
import numpy
global r, n
def run_mr(data, mapper, reducer):
"""Runs a map reduce locally."""
shuffled = {}
for k, v in data:
for out_k, out_v in mapper(k, v):
shuffled.setdefault(out_k, []).append(out_v)
for k, vs in shuffled.items():
for output in reducer(k, vs):
yield output
def inner_fft_mapper(k, v):
"""Mapper sharding input data into 2^Radix blocks for inner FFT."""
yield k % r, (int(k / r), v)
def inner_fft_reducer(k, v):
"""Reducer performing inner FFT of size 2^N / 2^Radix."""
samples = [x for (_, x) in sorted(v)]
for i, x in enumerate(numpy.fft.fft(samples)):
yield k, (i, x)
def outer_fft_mapper(k, v):
"""Mapper resharding data for outer FFT. Also applies twiddle factor."""
i, v = v
twiddle_factor = numpy.exp(-2.0j * numpy.pi * i * k / (2 ** n))
yield i, (k, v * twiddle_factor)
def outer_fft_reducer(k, v):
"""Reducer performing outer FFT of size 2^Radix."""
samples = [x for (_, x) in sorted(v)]
for i, x in enumerate(numpy.fft.fft(samples)):
yield int(i * r + k), x
if __name__ == '__main__':
# Some test data...
test_sequence = numpy.random.random((64, 1)).ravel()
n = numpy.log2(len(test_sequence))
r = 2 ** (n / 2)
# The data must be provided a list of (index, sample) pairs
input_data = enumerate(test_sequence)
# Run the two steps
step_1 = run_mr(input_data, inner_fft_mapper, inner_fft_reducer)
step_2 = run_mr(step_1, outer_fft_mapper, outer_fft_reducer)
expected_result = numpy.fft.fft(test_sequence)
# Print results and compare to value obtained by a full-sized FFT
for index, value in sorted(list(step_2)):
print 'i=', index, 'v=', value, 'expected', expected_result[index]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.