Skip to content

Instantly share code, notes, and snippets.

@sandorkazi sandorkazi/
Last active Feb 5, 2019

What would you like to do?
MapReduce simulator in Python
from abc import abstractmethod
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from itertools import chain
import logging
import sys
from typing import Tuple, Any, Iterable, Union
class MRSimulator:
    """
    Small in-process MapReduce simulator.

    Subclasses implement ``flatMapper`` and ``reducer`` (as static methods); ``mapReduce``
    then reads the input, runs the map phase on a pool of mapper threads (one input split
    per mapper), groups the intermediate pairs by key (shuffle) and runs the reduce phase
    on a pool of reducer threads (one key partition per reducer).
    Note: in CPython the threads share the GIL, so this models the data flow of MapReduce
    rather than speeding up CPU-bound work.
    """

    def __init__(self, mappers=3, reducers=2):
        """
        :param mappers: number of mapper threads / input splits (at least 1)
        :param reducers: number of reducer threads / key partitions (at least 1)
        :raises ValueError: if either count is smaller than 1
        """
        if mappers < 1 or reducers < 1:
            raise ValueError('mappers and reducers must both be at least 1')
        self.__num_mappers = mappers
        self.__num_reducers = reducers
        logger = logging.getLogger('MRSimulator')
        # The logger is shared by every instance: configure it only once, otherwise each
        # new simulator would add another handler and every message would repeat.
        if not logger.handlers:
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s:%(name)s %(message)s'))
            logger.addHandler(handler)
            # Without an explicit level the INFO progress messages would be filtered out.
            logger.setLevel(logging.INFO)
        self._logger = logger

    @classmethod
    def _flatMap(cls, mass_k1v1: Iterable[Tuple[Any, Any]]) -> Iterable[Tuple[Any, Any]]:
        """
        Performs a flatMap on a set of values.

        The result is materialized as a list so that the (possibly lazy, generator-based)
        ``flatMapper`` work is done in the calling mapper thread, not later by the consumer.

        :param mass_k1v1: a set of (k1, v1) tuples
        :return: a list of (k2, v2) tuples
        """
        return [k2v2 for k1, v1 in mass_k1v1 for k2v2 in cls.flatMapper(k1, v1)]

    @classmethod
    def _shuffle(cls, mapped: Iterable[Tuple[Any, Any]]) -> Iterable[Tuple[Any, Iterable[Any]]]:
        """
        Collects the inner key-value pairs by key.

        Keys keep the order of their first appearance in ``mapped``.

        :param mapped: a collection of (k2, v2) tuples
        :return: a list of (k2, v2-list) tuples (a list, so it can be split into partitions)
        """
        d = defaultdict(list)
        for k2, v2 in mapped:
            d[k2].append(v2)
        return list(d.items())

    @classmethod
    def _reduce(cls, mass_k2v2s: Iterable[Tuple[Any, Iterable[Any]]]) -> Iterable[Tuple[Any, Any]]:
        """
        Performs a reduce on a set of values.

        Materialized eagerly so the ``reducer`` calls run in the calling reducer thread.

        :param mass_k2v2s: a set of (k2, v2-collection) tuples
        :return: a list of (k3, v3) tuples
        """
        return [cls.reducer(k2, v2s) for k2, v2s in mass_k2v2s]

    @staticmethod
    def split(a, n):
        """
        Splits the sequence into quasi-equal contiguous parts.

        The first ``len(a) % n`` parts get one extra element; parts may be empty when
        ``n > len(a)``.

        :param a: sequence to split (must support ``len`` and slicing)
        :param n: number of splits
        :return: the resulting list of slices
        """
        k, m = divmod(len(a), n)
        return [a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n)]

    def mapReduce(self, inputFile: str, outputFile: Union[str, None]=None) -> Iterable[Tuple[Any, Any]]:
        """
        Performs a MapReduce on the given input file (map and reduce should be implemented).

        :param inputFile: input file location
        :param outputFile: output file location (or None if no output file should be created);
            each result tuple is written as one tab-separated line
        :return: the reduce output as a list of (k3, v3) tuples
        """
        log = self._logger
        log.info('Reading data')
        # Materialize the input so it can be cut into one split per mapper.
        data = list(self.getData(inputFile))

        log.info('Running mapper threads')
        with ThreadPoolExecutor(max_workers=self.__num_mappers) as executor:
            futures = [executor.submit(self._flatMap, part)
                       for part in self.split(data, self.__num_mappers)]
            # Collect in submission order so the result does not depend on thread timing.
            mapped = [future.result() for future in futures]

        log.info('Shuffling')
        shuffled = self._shuffle(chain.from_iterable(mapped))

        log.info('Running reducer threads')
        with ThreadPoolExecutor(max_workers=self.__num_reducers) as executor:
            futures = [executor.submit(self._reduce, part)
                       for part in self.split(shuffled, self.__num_reducers)]
            reduced = list(chain.from_iterable(future.result() for future in futures))

        if outputFile is not None:
            log.info('Writing data')
            with open(outputFile, 'w') as fout:
                for values in reduced:
                    print(*values, sep='\t', file=fout)
        # ``reduced`` is a list, so it is still complete after being written out.
        return reduced

    @staticmethod
    @abstractmethod
    def flatMapper(k1: Any, v1: Any) -> Iterable[Tuple[Any, Any]]:
        """
        Performs a flatMap on a given k1 and v1. Feel free to yield more than 1 value (or even zero) for a given input.

        :param k1: input key
        :param v1: input value
        :return: a collection of (k2, v2) tuples
        """
        raise NotImplementedError()

    @staticmethod
    @abstractmethod
    def reducer(k2: Any, v2s: Any) -> Tuple[Any, Any]:
        """
        Performs a reduce on a given k2 and corresponding v2-collection.

        :param k2: inner key
        :param v2s: inner value collection for the given key
        :return: (k3, v3) as a tuple
        """
        raise NotImplementedError()

    @staticmethod
    def getData(inputFile) -> Iterable[Tuple[int, str]]:
        """
        Loads the text data from the location and enumerates rows.

        Lines are yielded with their trailing newline, numbered from 0.

        :param inputFile: location of the input file
        :return: the data to be processed with numbered rows
        """
        with open(inputFile, 'r') as fin:
            # Iterate the file object directly instead of readlines() to stream the lines.
            yield from enumerate(fin)
if __name__ == '__main__':

    class WordCountMapReduce(MRSimulator):
        """Word count: emits (word, 1) for every cleaned, lower-cased word and sums them per word."""

        @staticmethod
        def flatMapper(k1, v1):
            # split() without an argument splits on any whitespace run (tabs, repeated
            # spaces, the trailing newline), so tokens never contain whitespace.
            for x in v1.split():
                x = x.strip('.,-";)(:[]+').lower()
                if x:
                    yield (x, 1)

        @staticmethod
        def reducer(k2, v2s):
            return k2, sum(v2s)

    class MyMapReduce(MRSimulator):

        @staticmethod
        def flatMapper(k1, v1):
            raise NotImplementedError()  # TODO: your implementation

        @staticmethod
        def reducer(k2, v2s):
            raise NotImplementedError()  # TODO: your implementation

    # The input file may be given as the first command-line argument.
    myInputFile = sys.argv[1] if len(sys.argv) > 1 else 'war_peace_text'
    wc = WordCountMapReduce().mapReduce(myInputFile)
    print('\nWords with more than 10000 occurrences in {}:\n{}\n'.format(
        myInputFile,
        '\n'.join('\t{}\t{}'.format(k3, v3) for k3, v3 in wc if v3 > 10000),
    ))

    myInputFile = sys.argv[1] if len(sys.argv) > 1 else 'war_peace_text'  # TODO your file location here
    try:
        for k3, v3 in MyMapReduce().mapReduce(myInputFile):
            print(k3, v3)
    except NotImplementedError:
        print('Not yet implemented...')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.