"""A toy single-machine MapReduce framework built on multiprocessing.

Word count is the default map/reduce pair; see the __main__ block at the
bottom for usage.
"""
from multiprocessing import Pool, cpu_count
import os
import glob
import re
import time
import cPickle
import itertools
import heapq
# ==================== Pickle Tricks starts ====================
# This teaches pickle how to serialize an instance method, so that
# multiprocessing can ship bound methods (e.g. Mapper.do_map) to workers.
# Oh, such a weak pickle module!
import copy_reg
import types
def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    # deal with mangled names like __foo
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.__mro__:
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)

copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
# ===================== Pickle Tricks ends =====================
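
# A minimal sketch (not part of the original gist) of what the hook above
# enables: pickling a bound method now round-trips. `_PickleDemo` and
# `_demo_pickle_method` are hypothetical names used only for illustration;
# the function is never called by the framework.
class _PickleDemo(object):
    def ping(self):
        return "pong"

def _demo_pickle_method():
    demo = _PickleDemo()
    data = cPickle.dumps(demo.ping)  # fails without the copy_reg hook above
    assert cPickle.loads(data)() == "pong"  # the method survives the round trip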
# ==================== Pre-defined RegExp starts ====================
ALPHABET_WORD_PATTERN = re.compile("[a-zA-Z][a-zA-Z0-9]*")
# ===================== Pre-defined RegExp ends =====================
class Job(object):
    """A MapReduce job: holds the configuration and drives the Mapper
    and the Reducer."""
    def __init__(self, name):
        self.name = name
        self.input_path = os.getcwd()
        self.output_path = os.getcwd()
        self.map_tasks = cpu_count()
        self.reduce_tasks = max(1, cpu_count() // 2)  # avoid 0 on 1-core boxes
        self.mapper = None
        self.reducer = None
    def set_input_path(self, input_path):
        self.input_path = input_path

    def set_output_path(self, output_path):
        self.output_path = output_path

    def set_map_tasks(self, map_tasks):
        self.map_tasks = map_tasks

    def set_reduce_tasks(self, reduce_tasks):
        self.reduce_tasks = reduce_tasks

    def set_mapper(self, mapper):
        self.mapper = mapper
        mapper.job = self

    def set_reducer(self, reducer):
        self.reducer = reducer
        reducer.job = self
    def run(self):
        print "MapReduce Job [%s] has been started......" % self.name
        # wall-clock time; time.clock() would measure only the parent
        # process's CPU time on Unix and miss the pool workers entirely
        t_job_start = time.time()
        self.mapper.run()
        self.reducer.run()
        t_job_end = time.time()
        print "MapReduce Job [%s] finished in %.6f seconds" \
            % (self.name, t_job_end - t_job_start)
class Mapper(object):
    """Splits the input, runs map tasks in parallel, then sorts and
    merges the intermediate output for the reducers."""
    def __init__(self):
        self.job = None

    def split(self):
        """Deal input lines round-robin into one split file per map task,
        each record tagged with its source file name and byte offset."""
        file_list = [self.job.input_path] \
            if os.path.isfile(self.job.input_path) else \
            glob.glob(os.path.join(self.job.input_path, "*"))
        # NB: the comprehension must not reuse `i`; in Python 2 the
        # comprehension variable leaks out and would break the round-robin
        # start index below
        split_list = [open("split-%s-%d" % (self.job.name, n), "a+")
                      for n in range(self.job.map_tasks)]
        i = 0  # split index
        for file_name in file_list:
            with open(file_name, "r") as f:
                while True:
                    offset = f.tell()
                    buf = f.readline()
                    if buf == "":
                        break
                    split_list[i].write(
                        "%s\t%d\t%s" % (file_name, offset, buf))
                    split_list[i].write("\n" if buf[-1] != "\n" else "")
                    i = (i + 1) % self.job.map_tasks
        for file_split in split_list:
            file_split.close()
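
    # For illustration (not in the original gist): a line "hello world" at
    # byte offset 0 of Input/a.txt becomes this split record:
    #
    #     Input/a.txt\t0\thello world\n
    #
    # do_map() below splits it back apart on the first two tabs.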
    def partition(self, key):
        """Route a key to a reducer by hashing it."""
        return hash(key) % self.job.reduce_tasks

    def map(self, value, context):
        """Default map function: word count over alphanumeric tokens."""
        for token in ALPHABET_WORD_PATTERN.findall(value):
            yield (token.lower(), 1)
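
    # A quick sketch (not in the original gist) of what the default map()
    # yields for one input line:
    #
    #     >>> list(Mapper().map("Hello, hello world!", {}))
    #     [('hello', 1), ('hello', 1), ('world', 1)]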
    def do_map(self, i):
        """Run map task i: read its split file, apply map(), and append
        each (key, value) pair to the partition file of its reducer."""
        split_name = "split-%s-%d" % (self.job.name, i)
        map_list = [open("map-%s-%d-%d" % (self.job.name, i, r), "ab")
                    for r in range(self.job.reduce_tasks)]
        with open(split_name, 'r') as file_split:
            for in_line in file_split:
                file_name, offset, line = in_line.split('\t', 2)
                context = {"file_name": file_name, "offset": offset}
                for key, value in self.map(line, context):
                    r = self.partition(key)
                    cPickle.dump((key, value), map_list[r])
        for file_map in map_list:
            file_map.close()
        os.unlink(split_name)
    def _sort_per_file(self, file_name):
        """Sort one file of pickled (key, value) pairs in memory and write
        the sorted run to <file_name>-sorted."""
        kvs = []
        with open(file_name, 'rb') as f:
            while True:
                try:
                    kvs.append(cPickle.load(f))
                except EOFError:
                    break
        kvs.sort()
        with open(file_name + "-sorted", "ab") as fout:
            for kv in kvs:
                cPickle.dump(kv, fout)
        os.unlink(file_name)
    def merge(self):
        """For each reducer r, sort every per-task partition file in
        parallel, then k-way merge the sorted runs into map-<name>-<r>."""
        for r in range(self.job.reduce_tasks):
            pool = Pool(processes=self.job.map_tasks)
            pool.map(self._sort_per_file,
                     ["map-%s-%d-%d" % (self.job.name, i, r)
                      for i in range(self.job.map_tasks)])
            pool.close()
            pool.join()
            name_list = glob.glob("map-%s-*-%d-sorted" % (self.job.name, r))
            file_list = [open(file_name, 'rb') for file_name in name_list]
            with open("map-%s-%d" % (self.job.name, r), "ab") as fout:
                # seed the heap with the first pair from each sorted run
                heap = []
                for i, f in enumerate(file_list):
                    try:
                        key, value = cPickle.load(f)
                        heap.append((key, value, i, f))
                    except EOFError:
                        continue
                heapq.heapify(heap)
                # repeatedly emit the smallest pair, refilling from its run
                while heap:
                    key, value, i, f = heap[0]
                    cPickle.dump((key, value), fout)
                    try:
                        key, value = cPickle.load(f)
                        heapq.heapreplace(heap, (key, value, i, f))
                    except EOFError:
                        heapq.heappop(heap)
            for f in file_list:
                f.close()
            for file_name in name_list:
                os.unlink(file_name)
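
    # Aside (not in the original gist): the heap loop above is a hand-rolled
    # k-way merge. On Python 2.6+ it could instead be written with the
    # standard library, sketched roughly as:
    #
    #     runs = [iter_pickled_pairs(f) for f in file_list]  # hypothetical helper
    #     for kv in heapq.merge(*runs):
    #         cPickle.dump(kv, fout)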
    def run(self):
        print "{0:=^79}".format(" Map Process started ")
        print "Start: split input"
        t_split_start = time.time()
        # make sure every split file exists, even if it ends up empty
        for i in range(self.job.map_tasks):
            with open("split-%s-%d" % (self.job.name, i), "a+") as f_tmp:
                pass
        self.split()
        t_split_end = time.time()
        print "Finish: split input in %.6f seconds" \
            % (t_split_end - t_split_start)
        print "Start: map process"
        t_map_start = time.time()
        # likewise pre-create every map partition file
        for i in range(self.job.map_tasks):
            for r in range(self.job.reduce_tasks):
                with open("map-%s-%d-%d" % (self.job.name, i, r), "a+") \
                        as f_tmp:
                    pass
        pool = Pool(processes=self.job.map_tasks)
        pool.map(self.do_map, range(self.job.map_tasks))
        pool.close()
        pool.join()
        t_map_end = time.time()
        print "Finish: map process in %.6f seconds" % (t_map_end - t_map_start)
        print "Start: merge process for reduce"
        t_merge_start = time.time()
        for r in range(self.job.reduce_tasks):
            with open("map-%s-%d" % (self.job.name, r), "a+") as f_tmp:
                pass
        self.merge()
        t_merge_end = time.time()
        print "Finish: merge process for reduce in %.6f seconds" \
            % (t_merge_end - t_merge_start)
        print "Map All Done: " + \
            "all map processes finished in %.6f seconds." \
            % (t_merge_end - t_split_start)
        print "{0:=^79}".format(" Map Process finished ")
class Reducer(object):
    """Reduces each merged partition in parallel, then merges the
    per-partition results into the final output file."""
    def __init__(self):
        self.job = None

    def reduce(self, key, values, context):
        """Default reduce function: sum the values for one key."""
        return key, reduce(lambda x, y: x + y, values)
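
    # For illustration (not in the original gist): with the default
    # word-count map(), each key arrives with a stream of 1s, e.g.
    #
    #     >>> Reducer().reduce("hello", iter([1, 1, 1]), {})
    #     ('hello', 3)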
    def do_reduce(self, r):
        """Run reduce task r: group the sorted (key, value) stream by key
        and append one reduced pair per key to the task's output file."""
        map_name = "map-%s-%d" % (self.job.name, r)
        reduce_name = "reduce-%s-%d" % (self.job.name, r)
        context = {}
        with open(map_name, 'rb') as file_map, open(reduce_name, "ab") as fout:
            # groupby only groups *consecutive* equal keys, so it relies on
            # Mapper.merge() having delivered the pairs in sorted order
            for key, kvs in itertools.groupby(self.__iter_pickle(file_map),
                                              lambda kv: kv[0]):
                key_out, value_out = self.reduce(
                    key, (v for k, v in kvs), context)
                cPickle.dump((key_out, value_out), fout)
        os.unlink(map_name)
    def __iter_pickle(self, f):
        """Yield pickled objects from f until EOF."""
        while True:
            try:
                yield cPickle.load(f)
            except EOFError:
                break
    def _sort_per_file(self, file_name):
        """Sort one file of pickled (key, value) pairs in memory
        (duplicated from Mapper._sort_per_file)."""
        kvs = []
        with open(file_name, 'rb') as f:
            while True:
                try:
                    kvs.append(cPickle.load(f))
                except EOFError:
                    break
        kvs.sort()
        with open(file_name + "-sorted", "ab") as fout:
            for kv in kvs:
                cPickle.dump(kv, fout)
        os.unlink(file_name)
    def merge(self):
        """Sort each reduce task's output in parallel, then k-way merge
        the sorted runs into the final plain-text output file."""
        pool = Pool(processes=self.job.reduce_tasks)
        pool.map(self._sort_per_file,
                 ["reduce-%s-%d" % (self.job.name, r)
                  for r in range(self.job.reduce_tasks)])
        pool.close()
        pool.join()
        if os.path.isfile(self.job.output_path):
            file_output = self.job.output_path
        else:
            file_output = os.path.join(self.job.output_path, "output.txt")
        name_list = glob.glob("reduce-%s-*-sorted" % self.job.name)
        file_list = [open(file_name, 'rb') for file_name in name_list]
        with open(file_output, "a+") as fout:
            # same k-way heap merge as Mapper.merge(), but writing
            # tab-separated text instead of pickles
            heap = []
            for i, f in enumerate(file_list):
                try:
                    key, value = cPickle.load(f)
                    heap.append((key, value, i, f))
                except EOFError:
                    continue
            heapq.heapify(heap)
            while heap:
                key, value, i, f = heap[0]
                fout.write("%s\t%s\n" % (key, value))
                try:
                    key, value = cPickle.load(f)
                    heapq.heapreplace(heap, (key, value, i, f))
                except EOFError:
                    heapq.heappop(heap)
        for f in file_list:
            f.close()
        for file_name in name_list:
            os.unlink(file_name)
    def run(self):
        print "{0:=^79}".format(" Reduce Process started ")
        print "Start: reduce process"
        t_reduce_start = time.time()
        # pre-create every reduce output file
        for r in range(self.job.reduce_tasks):
            with open("reduce-%s-%d" % (self.job.name, r), "a+") as f_tmp:
                pass
        pool = Pool(processes=self.job.reduce_tasks)
        pool.map(self.do_reduce, range(self.job.reduce_tasks))
        pool.close()
        pool.join()
        t_reduce_end = time.time()
        print "Finish: reduce process in %.6f seconds" \
            % (t_reduce_end - t_reduce_start)
        print "Start: merge process for result"
        t_merge_start = time.time()
        self.merge()
        t_merge_end = time.time()
        print "Finish: merge process for result in %.6f seconds" \
            % (t_merge_end - t_merge_start)
        print "Reduce All Done: " + \
            "all reduce processes finished in %.6f seconds." \
            % (t_merge_end - t_reduce_start)
        print "{0:=^79}".format(" Reduce Process finished ")
if __name__ == '__main__':
    j = Job("test")
    m = Mapper()
    r = Reducer()
    j.set_input_path("Input")
    j.set_output_path("Output")
    # j.set_map_tasks(4)
    # j.set_reduce_tasks(2)
    j.set_mapper(m)
    j.set_reducer(r)
    j.run()
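
# Usage sketch (not in the original gist): put plain-text files in an
# Input/ directory next to this script and create an Output/ directory
# beforehand. The run above writes Output/output.txt with one
# "word<TAB>count" line per distinct word, ordered by key from the merge.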