from multiprocessing import Pool, cpu_count
import os
import glob
import re
import time
import cPickle
import itertools
import heapq
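# A toy, single-machine MapReduce framework built on multiprocessing.Pool:
# a Job wires a Mapper and a Reducer together, intermediate data is spilled
# to pickled files on disk, and sorted runs are combined with a heap merge.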
# ==================== Pickle Tricks starts ====================
# Teach pickle how to (de)serialize bound instance methods: the Python 2
# pickle module cannot handle them on its own, and multiprocessing.Pool
# needs this to ship Mapper/Reducer methods to worker processes.
import copy_reg
import types
def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    # deal with mangled names like __foo
    if func_name.startswith('__') and not func_name.endswith('__'):
        cls_name = cls.__name__.lstrip('_')
        func_name = '_' + cls_name + func_name
    return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
    # walk the MRO so that methods inherited from a base class can be
    # looked up as well
    for klass in cls.__mro__:
        try:
            func = klass.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)
copy_reg.pickle(types.MethodType, _pickle_method, _unpickle_method)
# ===================== Pickle Tricks ends =====================
# ==================== Pre-defined RegExp starts ====================
ALPHABET_WORD_PATTERN = re.compile("[a-zA-Z][a-zA-Z0-9]*")
# ===================== Pre-defined RegExp ends =====================
class Job(object):
    """A MapReduce job: input/output paths, task counts, mapper, reducer."""
    def __init__(self, name):
        self.name = name
        self.input_path = os.getcwd()
        self.output_path = os.getcwd()
        self.map_tasks = cpu_count()
        self.reduce_tasks = max(1, cpu_count() // 2)
        self.mapper = None
        self.reducer = None
    def set_input_path(self, input_path):
        self.input_path = input_path
    def set_output_path(self, output_path):
        self.output_path = output_path
    def set_map_tasks(self, map_tasks):
        self.map_tasks = map_tasks
    def set_reduce_tasks(self, reduce_tasks):
        self.reduce_tasks = reduce_tasks
    def set_mapper(self, mapper):
        self.mapper = mapper
        mapper.job = self
    def set_reducer(self, reducer):
        self.reducer = reducer
        reducer.job = self
    def run(self):
        print "MapReduce Job [%s] has been started......" % self.name
        # use wall-clock time: time.clock() would only measure this
        # process's CPU time on Unix, while the work happens in children
        t_job_start = time.time()
        self.mapper.run()
        self.reducer.run()
        t_job_end = time.time()
        print "MapReduce Job [%s] has been finished in %.6f seconds" \
            % (self.name, t_job_end - t_job_start)
class Mapper(object):
    """Splits the input, runs map tasks, and sorts/merges their output."""
    def __init__(self):
        self.job = None
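    # split() deals input lines round-robin across map_tasks split files,
    # tagging every line with its source file name and byte offset so the
    # mapper can expose them through the context dict.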
    def split(self):
        file_list = [self.job.input_path] \
            if os.path.isfile(self.job.input_path) else \
            glob.glob(os.path.join(self.job.input_path, "*"))
        split_list = [open("split-%s-%d" % (self.job.name, n), "a+")
                      for n in range(self.job.map_tasks)]
        i = 0  # round-robin split index (the comprehension above must not
               # reuse this name, or it would leak and overwrite it)
        for file_name in file_list:
            with open(file_name, "r") as f:
                while True:
                    offset = f.tell()
                    buf = f.readline()
                    if buf == "":
                        break
                    split_list[i].write(
                        "%s\t%d\t%s" % (file_name, offset, buf))
                    split_list[i].write("\n" if buf[-1] != "\n" else "")
                    i = (i + 1) % self.job.map_tasks
        for file_split in split_list:
            file_split.close()
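    # partition() decides which reducer gets a key; hashing the key
    # mirrors Hadoop's default HashPartitioner.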
    def partition(self, key):
        return hash(key) % self.job.reduce_tasks
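    # Default map(): classic word count, emitting (lowercased word, 1)
    # for every alphabetic token. Override in a subclass for other jobs.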
    def map(self, value, context):
        for token in ALPHABET_WORD_PATTERN.findall(value):
            yield (token.lower(), 1)
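    # do_map() runs in a worker process: it streams one split file,
    # applies map() to each line, and pickles every (key, value) pair
    # into the spill file of the reducer chosen by partition().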
    def do_map(self, i):
        split_name = "split-%s-%d" % (self.job.name, i)
        map_list = [open("map-%s-%d-%d" % (self.job.name, i, r), "ab")
                    for r in range(self.job.reduce_tasks)]
        with open(split_name, 'r') as file_split:
            for in_line in file_split:
                file_name, offset, line = in_line.split('\t', 2)
                context = {}
                context["file_name"] = file_name
                context["offset"] = offset
                for key, value in self.map(line, context):
                    r = self.partition(key)
                    cPickle.dump((key, value), map_list[r])
        for file_map in map_list:
            file_map.close()
        os.unlink(split_name)
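    # _sort_per_file() loads one spill file fully into memory, sorts the
    # pairs by key, and rewrites them as a sorted run (so each spill must
    # fit in RAM).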
    def _sort_per_file(self, file_name):
        kvs = []
        with open(file_name, 'rb') as f:
            while True:
                try:
                    kvs.append(cPickle.load(f))
                except EOFError:
                    break
        kvs.sort()
        with open(file_name + "-sorted", "ab") as fout:
            for kv in kvs:
                cPickle.dump(kv, fout)
        os.unlink(file_name)
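    # merge() prepares each reducer's input: sort every mapper's spill
    # for reducer r in parallel, then k-way merge the sorted runs with a
    # heap into a single sorted file "map-<job>-<r>".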
    def merge(self):
        for r in range(self.job.reduce_tasks):
            pool = Pool(processes=self.job.map_tasks)
            pool.map(self._sort_per_file,
                     ("map-%s-%d-%d" % (self.job.name, i, r)
                      for i in range(self.job.map_tasks)))
            pool.close()
            pool.join()
            name_list = glob.glob("map-%s-*-%d-sorted" % (self.job.name, r))
            file_list = [open(file_name, 'rb') for file_name in name_list]
            with open("map-%s-%d" % (self.job.name, r), "ab") as fout:
                # seed the heap with the first pair from every sorted run
                heap = []
                for i, f in enumerate(file_list):
                    try:
                        key, value = cPickle.load(f)
                        heap.append((key, value, i, f))
                    except EOFError:
                        continue
                heapq.heapify(heap)
                # repeatedly emit the smallest key, refilling from its run
                while heap:
                    key, value, i, f = heap[0]
                    cPickle.dump((key, value), fout)
                    try:
                        key, value = cPickle.load(f)
                        heapq.heapreplace(heap, (key, value, i, f))
                    except EOFError:
                        heapq.heappop(heap)
            for f in file_list:
                f.close()
            for file_name in name_list:
                os.unlink(file_name)
    def run(self):
        print "{0:=^79}".format(" Map Process started ")
        print "Start: split input"
        t_split_start = time.time()
        # touch every split file so each map task has one, even when the
        # input has fewer lines than map_tasks
        for i in range(self.job.map_tasks):
            with open("split-%s-%d" % (self.job.name, i), "a+") as f_tmp:
                pass
        self.split()
        t_split_end = time.time()
        print "Finish: split input in %.6f seconds" \
            % (t_split_end - t_split_start)
        print "Start: map process"
        t_map_start = time.time()
        # touch every spill file so the later glob in merge() finds them
        for i in range(self.job.map_tasks):
            for r in range(self.job.reduce_tasks):
                with open("map-%s-%d-%d" % (self.job.name, i, r), "a+") \
                        as f_tmp:
                    pass
        pool = Pool(processes=self.job.map_tasks)
        pool.map(self.do_map, range(self.job.map_tasks))
        pool.close()
        pool.join()
        t_map_end = time.time()
        print "Finish: map process in %.6f seconds" % (t_map_end - t_map_start)
        print "Start: merge process for reduce"
        t_merge_start = time.time()
        for r in range(self.job.reduce_tasks):
            with open("map-%s-%d" % (self.job.name, r), "a+") as f_tmp:
                pass
        self.merge()
        t_merge_end = time.time()
        print "Finish: merge process for reduce in %.6f seconds" \
            % (t_merge_end - t_merge_start)
        print "Map All Done: " + \
            "All Map Processes have been finished in %.6f seconds." \
            % (t_merge_end - t_split_start)
        print "{0:=^79}".format(" Map Process finished ")
class Reducer(object):
    """Runs reduce tasks over sorted map output and merges the results."""
    def __init__(self):
        self.job = None
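    # Default reduce(): sum the values for a key, completing word count.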
    def reduce(self, key, values, context):
        return key, sum(values)
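    # do_reduce() runs in a worker process: because the merged input file
    # is already sorted by key, itertools.groupby yields each key exactly
    # once together with all of its values.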
    def do_reduce(self, r):
        map_name = "map-%s-%d" % (self.job.name, r)
        reduce_name = "reduce-%s-%d" % (self.job.name, r)
        context = {}
        with open(map_name, 'rb') as file_map, \
                open(reduce_name, "ab") as fout:
            for key, kvs in itertools.groupby(self.__iter_pickle(file_map),
                                              lambda kv: kv[0]):
                key_out, value_out = self.reduce(
                    key, (v for k, v in kvs), context)
                cPickle.dump((key_out, value_out), fout)
        os.unlink(map_name)
    def __iter_pickle(self, f):
        # yield pickled (key, value) pairs until the file is exhausted
        while True:
            try:
                yield cPickle.load(f)
            except EOFError:
                break
    # same in-memory sort as Mapper._sort_per_file, applied here to the
    # reducers' own output files
    def _sort_per_file(self, file_name):
        kvs = []
        with open(file_name, 'rb') as f:
            while True:
                try:
                    kvs.append(cPickle.load(f))
                except EOFError:
                    break
        kvs.sort()
        with open(file_name + "-sorted", "ab") as fout:
            for kv in kvs:
                cPickle.dump(kv, fout)
        os.unlink(file_name)
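    # merge() sorts each reducer's output in parallel and heap-merges the
    # sorted runs into a single tab-separated text file at the output path.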
    def merge(self):
        pool = Pool(processes=self.job.reduce_tasks)
        pool.map(self._sort_per_file,
                 ("reduce-%s-%d" % (self.job.name, r)
                  for r in range(self.job.reduce_tasks)))
        pool.close()
        pool.join()
        if os.path.isfile(self.job.output_path):
            file_output = self.job.output_path
        else:
            file_output = os.path.join(self.job.output_path, "output.txt")
        name_list = glob.glob("reduce-%s-*-sorted" % self.job.name)
        file_list = [open(file_name, 'rb') for file_name in name_list]
        with open(file_output, "a+") as fout:
            # same heap-based k-way merge as in Mapper.merge, but the
            # final result is written as plain "key<TAB>value" lines
            heap = []
            for i, f in enumerate(file_list):
                try:
                    key, value = cPickle.load(f)
                    heap.append((key, value, i, f))
                except EOFError:
                    continue
            heapq.heapify(heap)
            while heap:
                key, value, i, f = heap[0]
                fout.write("%s\t%s\n" % (key, value))
                try:
                    key, value = cPickle.load(f)
                    heapq.heapreplace(heap, (key, value, i, f))
                except EOFError:
                    heapq.heappop(heap)
        for f in file_list:
            f.close()
        for file_name in name_list:
            os.unlink(file_name)
    def run(self):
        print "{0:=^79}".format(" Reduce Process started ")
        print "Start: reduce process"
        t_reduce_start = time.time()
        for r in range(self.job.reduce_tasks):
            with open("reduce-%s-%d" % (self.job.name, r), "a+") as f_tmp:
                pass
        pool = Pool(processes=self.job.reduce_tasks)
        pool.map(self.do_reduce, range(self.job.reduce_tasks))
        pool.close()
        pool.join()
        t_reduce_end = time.time()
        print "Finish: reduce process in %.6f seconds" \
            % (t_reduce_end - t_reduce_start)
        print "Start: merge process for result"
        t_merge_start = time.time()
        self.merge()
        t_merge_end = time.time()
        print "Finish: merge process for result in %.6f seconds" \
            % (t_merge_end - t_merge_start)
        print "Reduce All Done: " + \
            "All Reduce Processes have been finished in %.6f seconds." \
            % (t_merge_end - t_reduce_start)
        print "{0:=^79}".format(" Reduce Process finished ")
if __name__ == '__main__':
    # expects an "Input" file or directory and an "Output" directory
    # next to this script
    j = Job("test")
    m = Mapper()
    r = Reducer()
    j.set_input_path("Input")
    j.set_output_path("Output")
    # j.set_map_tasks(4)
    # j.set_reduce_tasks(2)
    j.set_mapper(m)
    j.set_reducer(r)
    j.run()