Skip to content

Instantly share code, notes, and snippets.

@sandorkazi sandorkazi/MRSimulator.py
Last active Feb 5, 2019

Embed
What would you like to do?
MapReduce simulator in Python
from abc import abstractmethod
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from itertools import chain
import logging
import sys
from typing import Tuple, Any, Iterable, Union
class MRSimulator:
    """
    Minimal in-process MapReduce simulator.

    Subclasses implement ``flatMapper`` and ``reducer``. ``mapReduce`` reads the input, runs the map
    phase over input partitions on a thread pool, groups the intermediate values by key and runs the
    reduce phase over key partitions on a second thread pool.
    """

    def __init__(self, mappers=3, reducers=2):
        """
        :param mappers: number of worker threads (and input partitions) used by the map phase
        :param reducers: number of worker threads (and key partitions) used by the reduce phase
        """
        self.__num_mappers = mappers
        self.__num_reducers = reducers
        logger = logging.getLogger('MRSimulator')
        logger.setLevel(logging.INFO)
        # logging.getLogger returns the same object for the same name, so only attach the stdout
        # handler once; otherwise every additional simulator would duplicate each log line.
        if not logger.handlers:
            handler = logging.StreamHandler(sys.stdout)
            handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s:%(name)s %(message)s'))
            handler.setLevel(logging.INFO)
            logger.addHandler(handler)
        self._logger = logger

    @classmethod
    def _flatMap(cls, mass_k1v1: Iterable[Tuple[Any, Any]]) -> Iterable[Tuple[Any, Any]]:
        """
        Performs a flatMap on a set of values.
        :param mass_k1v1: a set of (k1, v1) tuples
        :return: a lazy iterable of (k2, v2) tuples
        """
        return chain.from_iterable(cls.flatMapper(*k1v1) for k1v1 in mass_k1v1)

    @classmethod
    def _mapPartition(cls, partition: Iterable[Tuple[Any, Any]]) -> Iterable[Tuple[Any, Any]]:
        """
        Maps one input partition and materialises the result.
        :param partition: a collection of (k1, v1) tuples
        :return: a list of (k2, v2) tuples
        """
        # list() forces the lazy flatMap to run inside the calling worker thread.
        return list(cls._flatMap(partition))

    @classmethod
    def _shuffle(cls, mapped: Iterable[Tuple[Any, Any]]) -> Iterable[Tuple[Any, Iterable[Any]]]:
        """
        Collects the inner key-value pairs by key.
        :param mapped: a collection of (k2, v2) tuples
        :return: a collection of (k2, v2-collection) tuples
        """
        grouped = defaultdict(list)
        for k2, v2 in mapped:
            grouped[k2].append(v2)
        return grouped.items()

    @classmethod
    def _reduce(cls, mass_k2v2s: Iterable[Tuple[Any, Iterable[Any]]]) -> Iterable[Tuple[Any, Any]]:
        """
        Performs a reduce on a set of values.
        :param mass_k2v2s: a set of (k2, v2-collection) tuples
        :return: a lazy iterable of (k3, v3) tuples
        """
        return map(lambda k2v2s: cls.reducer(*k2v2s), mass_k2v2s)

    @classmethod
    def _reducePartition(cls, partition: Iterable[Tuple[Any, Iterable[Any]]]) -> Iterable[Tuple[Any, Any]]:
        """
        Reduces one key partition and materialises the result.
        :param partition: a collection of (k2, v2-collection) tuples
        :return: a list of (k3, v3) tuples
        """
        # list() forces the lazy reduce to run inside the calling worker thread.
        return list(cls._reduce(partition))

    @staticmethod
    def split(a, n):
        """
        Splits the sequence into quasi-equal parts.
        :param a: sequence to split (must support len() and slicing)
        :param n: number of splits
        :return: a generator of n consecutive slices of a; the first len(a) % n slices are one longer
        """
        k, m = divmod(len(a), n)
        return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)] for i in range(n))

    def mapReduce(self, inputFile: str, outputFile: Union[str, None]=None) -> Iterable[Tuple[Any, Any]]:
        """
        Performs a MapReduce on the given input file (map and reduce should be implemented).
        :param inputFile: input file location
        :param outputFile: output file location (or None if no output file should be created)
        :return: the reduce output as a list of (k3, v3) tuples
        """
        log = self._logger
        log.info('Reading data')
        # Materialised because split() needs len() and slicing.
        data = list(self.getData(inputFile))
        log.info('Running mapper threads')
        with ThreadPoolExecutor(max_workers=self.__num_mappers) as executor:
            # One task per partition; executor.map returns results in submission order, so
            # concatenating them reproduces the order of a sequential run.
            mapped_parts = list(executor.map(self._mapPartition, self.split(data, self.__num_mappers)))
        log.info('Shuffling')
        shuffled = list(self._shuffle(chain.from_iterable(mapped_parts)))
        log.info('Running reducer threads')
        with ThreadPoolExecutor(max_workers=self.__num_reducers) as executor:
            reduced_parts = executor.map(self._reducePartition, self.split(shuffled, self.__num_reducers))
            # A concrete list (not a one-shot iterator) so that writing the output file below
            # does not leave the caller with an exhausted result.
            reduced = list(chain.from_iterable(reduced_parts))
        if outputFile is not None:
            log.info('Writing data')
            with open(outputFile, 'w') as fout:
                for values in reduced:
                    fout.write('{}\t{}\n'.format(*values))
        return reduced

    # NOTE: the class does not use ABCMeta, so @abstractmethod is only a marker here; the
    # NotImplementedError raised by the default bodies is what actually signals a missing override.
    @staticmethod
    @abstractmethod
    def flatMapper(k1: Any, v1: Any) -> Iterable[Tuple[Any, Any]]:
        """
        Performs a flatMap on a given k1 and v1. Feel free to yield more than 1 value (or even zero) for a given input.
        :param k1: input key
        :param v1: input value
        :return: a collection of (k2, v2) tuples
        """
        raise NotImplementedError()

    @staticmethod
    @abstractmethod
    def reducer(k2: Any, v2s: Any) -> Tuple[Any, Any]:
        """
        Performs a reduce on a given k2 and corresponding v2-collection.
        :param k2: inner key
        :param v2s: inner value collection for the given key
        :return: (k3, v3) as a tuple
        """
        raise NotImplementedError()

    @staticmethod
    def getData(inputFile) -> Iterable[Tuple[int, str]]:
        """
        Loads the text data from the location and enumerates rows.
        :param inputFile: location of the input file
        :return: a generator of (row number, row text) tuples, numbered from 0
        """
        with open(inputFile, 'r') as fin:
            # Iterating the file object streams it line by line instead of loading it whole.
            yield from enumerate(fin)
if __name__ == '__main__':

    class WordCountMapReduce(MRSimulator):
        """Example job: counts lower-cased words of the input text."""

        @staticmethod
        def flatMapper(k1, v1):
            """Emits (word, 1) for every non-empty token of the line after stripping punctuation."""
            for raw in v1.split(' '):
                word = raw.strip().strip('.,-";)(:[]+').lower()
                if word:
                    yield word, 1

        @staticmethod
        def reducer(k2, v2s):
            """Sums the collected ones for a word."""
            total = sum(v2s)
            return k2, total

    class MyMapReduce(MRSimulator):
        """Skeleton job to be filled in by the reader."""

        @staticmethod
        def flatMapper(k1, v1):
            raise NotImplementedError()  # TODO: your implementation

        @staticmethod
        def reducer(k2, v2s):
            raise NotImplementedError()  # TODO: your implementation

    # Word-count demo over the sample text.
    myInputFile = 'war_peace_text'  # http://www.textfiles.com/etext/FICTION/war_peace_text
    wc = WordCountMapReduce().mapReduce(myInputFile)
    frequent_rows = ['\t{}\t{}'.format(word, count) for word, count in wc if count > 10000]
    report = '\nWords with more than 10000 occurences in {}:\n{}\n'.format(
        myInputFile,
        '\n'.join(frequent_rows),
    )
    print(report)

    # Run the reader's own job; the untouched skeleton raises NotImplementedError.
    try:
        myInputFile = 'war_peace_text'  # TODO your file location here
        for k3, v3 in MyMapReduce().mapReduce(myInputFile):
            print(k3, v3)
    except NotImplementedError:
        print('Not yet implemented...')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.