Skip to content

Instantly share code, notes, and snippets.

@firezdog
Created November 6, 2022 19:52
Show Gist options
  • Save firezdog/c05406d1040350d13918ca3cf9a2a6d8 to your computer and use it in GitHub Desktop.
Save firezdog/c05406d1040350d13918ca3cf9a2a6d8 to your computer and use it in GitHub Desktop.
Simple Implementation of MapReduce In Python
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor as Executor
from concurrent.futures import wait
from time import sleep
def async_map(executor: Executor, mapper, data):
    """Submit ``mapper(item)`` to *executor* for every item of *data*.

    Does not wait for any job; returns the list of futures in the same
    order as *data*.
    """
    submitted = []
    for item in data:
        submitted.append(executor.submit(mapper, item))
    return submitted
def report_progress(futures, tag, callback=None, interval=0.5):
    """Block until every future in *futures* has finished, reporting progress.

    At most every *interval* seconds, and immediately once the last future
    completes, ``callback(tag, done, not_done)`` is called with the number of
    finished and unfinished futures. With ``callback=None`` this just waits.
    If *futures* is empty, it returns at once without calling *callback*.

    Args:
        futures: futures to watch (duplicate entries are counted once).
        tag: label forwarded unchanged to *callback*.
        callback: optional ``callable(tag, done, not_done)``.
        interval: maximum number of seconds between progress reports.
    """
    pending = set(futures)
    total = len(pending)
    while pending:
        # wait() returns as soon as everything in *pending* is done, so the
        # final report is not delayed by a whole polling interval, and each
        # report reflects the state observed right before it is sent.
        _, pending = wait(pending, timeout=interval)
        if callback is not None:
            callback(tag, total - len(pending), len(pending))
def map_reduce(mr_input, mapper, reducer, callback=None):
    """Run a thread-pooled map -> shuffle -> reduce pipeline over *mr_input*.

    Args:
        mr_input: iterable of input items; *mapper* is submitted once per item.
        mapper: ``callable(item) -> (key, value)``; each call must return
            exactly one two-element pair.
        reducer: ``callable((key, values))`` called once per distinct key,
            where *values* is the list of every value mapped to that key.
        callback: optional progress hook ``callable(tag, done, not_done)``,
            driven by ``report_progress`` with tag ``'map_stage'`` and then
            ``'reduce_stage'``.

    Returns:
        A lazy iterator over the reducer results, one per distinct key, in
        the order keys first appear among the map outputs (input order).
        Any exception raised by a reducer is re-raised while iterating.
    """
    with Executor() as executor:
        # map: fan every input item out to the pool
        futures = async_map(executor, mapper, mr_input)
        # Polling exists only to feed the callback; without one, result()
        # below blocks on each future directly, with no extra delay.
        if callback is not None:
            report_progress(futures, 'map_stage', callback)

        # shuffle: group values by key. Keys keep first-seen (insertion)
        # order; no sorting is performed.
        distributor = defaultdict(list)
        for k, v in (f.result() for f in futures):
            distributor[k].append(v)

        # reduce: one job per distinct key
        futures = async_map(executor, reducer, distributor.items())
        if callback is not None:
            report_progress(futures, 'reduce_stage', callback)

    # Leaving the with-block shut the pool down and waited for every job,
    # so all futures are complete here.
    return (f.result() for f in futures)
def reporter(tag, done, not_done):
    """Print a one-line progress message: finished jobs out of all jobs."""
    total = done + not_done
    print(f'Operation {tag}: {done} / {total}')
if __name__ == '__main__':
    from test_functions import sample_map, sample_reduce

    # Word-count demo: run map_reduce over the words of one sentence,
    # printing stage progress through reporter.
    text = ('whether tis nobler in the mind to suffer the slings and arrows '
            'of outrageous fortune or by opposing end them')
    words = text.split(' ')
    counts = map_reduce(words, sample_map, sample_reduce, reporter)
    print(list(counts))
def sample_map(word):
    """Map a single word to the count pair ``(word, 1)``."""
    pair = word, 1
    return pair
def sample_reduce(kv_pair):
    """Collapse a ``(key, counts)`` group into ``(key, sum of counts)``."""
    key = kv_pair[0]
    counts = kv_pair[1]
    total = sum(counts)
    return (key, total)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment