Skip to content

Instantly share code, notes, and snippets.

Created February 29, 2016 23:19
Show Gist options
  • Save usrlocalben/641664d9f23218b731b2 to your computer and use it in GitHub Desktop.
Save usrlocalben/641664d9f23218b731b2 to your computer and use it in GitHub Desktop.
Python parallel map-reduce one-filer
Map-reduce for text-file inputs (parallel version)
import hashlib
import json
import os
import re
import sys
from datetime import datetime
from functools import reduce
from multiprocessing import cpu_count, Queue, Process
# Set to False to silence the debug messages that log() writes to stderr.
DEBUG = True
def main():
    """Run parallel_map_reduce over the files named on the command line.

    Input files are queued largest-first (reverse size order) to reduce the
    chance of having to wait on a single worker that drew a big file last.

    The results are streamed to stdout as a JSON list of [key, value] pairs.
    """
    # NOTE(review): the `files=` argument, the closing parentheses and the
    # json_list_stream_dump(...) call were missing from this copy of the
    # file; they are reconstructed from the docstring above -- confirm
    # against the original (e.g. whether an `indent=` was passed).
    results = parallel_map_reduce(
        files=order_filenames_by_size(sys.argv[1:])[::-1],
        mapper=hour_stat_mapper, reducer=hour_stat_reducer,
        # alternative jobs:
        #mapper=range_mapper, reducer=range_reducer,
        #mapper=duplicate_finding_mapper, reducer=duplicate_finding_reducer,
    )
    json_list_stream_dump(
        # append `if v > 1` to keep only repeated keys (duplicate finding)
        data=([k, v] for k, v in results),
        fd=sys.stdout,
    )
# Regex matched against each access-log line by parse_apache_log_entry(); its
# named groups become the parsed record, which must include at least 'host',
# 'time' and 'bytes' (read by parse_apache_log_entry and hour_stat_mapper).
# NOTE(review): the pattern argument and closing parenthesis of this call are
# missing from this copy of the file and must be restored from the original.
ACCESS_LOG_PATTERN = re.compile(
def parse_apache_log_entry(line):
    """Parse one Apache access-log line into a dict.

    The result holds the named groups of ACCESS_LOG_PATTERN, with two fields
    converted:
      'bytes': int; the placeholder '-' becomes 0
      'time':  naive datetime; only the text before the first space of the
               timestamp is parsed, so anything after it (presumably the UTC
               offset such as '-0700' -- confirm) is discarded

    raises:
        ValueError if the line does not match ACCESS_LOG_PATTERN (ValueError
        subclasses Exception, so handlers written for the previous bare
        Exception still catch it). ValueError can also come from int() or
        strptime() on a malformed field.
    """
    def parse_apache_timestamp(value):
        return datetime.strptime(value.split(' ')[0], '%d/%b/%Y:%H:%M:%S')

    def parse_apache_transfer_bytes(value):
        return 0 if value == '-' else int(value)

    match = ACCESS_LOG_PATTERN.match(line)
    if not match:
        raise ValueError('pattern not matched')
    data = match.groupdict()
    return extend(data,
                  {'bytes': parse_apache_transfer_bytes(data['bytes']),
                   'time': parse_apache_timestamp(data['time'])})
def hour_stat_mapper(line):
    """Map one access-log line to a per-hour traffic statistic.

    Yields a single (key, value) pair:
      key:   ISO-8601 string of the request time truncated to the hour
      value: {'total_bytes': <bytes of this request>,
              'remote_addrs': {<host>: 1}}

    Errors from parse_apache_log_entry (e.g. an unparseable line) propagate.
    """
    doc = parse_apache_log_entry(line)

    # Only truncate_to_hour is used; the day/month variants are kept so the
    # bucket granularity can be changed by swapping the call below.
    def truncate_to_hour(dt):
        return dt.replace(minute=0, second=0, microsecond=0)

    def truncate_to_day(dt):
        return dt.replace(hour=0, minute=0, second=0, microsecond=0)

    def truncate_to_month(dt):
        return dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

    key = truncate_to_hour(doc['time']).isoformat()
    value = {
        'total_bytes': doc['bytes'],
        'remote_addrs': {doc['host']: 1},
    }
    yield key, value
def hour_stat_reducer(key, values):
    """Combine hour_stat_mapper values that share a key.

    Sums 'total_bytes' and merges the 'remote_addrs' dicts by adding the
    per-host counts. ``key`` is unused. ``values`` must be non-empty; with a
    single value that same dict is returned unchanged. Input dicts are not
    modified.
    """
    def merge_and_sum(a, b):
        """merge two dicts into a new one, summing values of shared keys"""
        out = dict(a)
        for k, v in b.items():  # .items() works on Python 2 and 3
            out[k] = out.get(k, 0) + v
        return out

    def _r(ax, cur):
        return {
            'total_bytes': ax['total_bytes'] + cur['total_bytes'],
            'remote_addrs': merge_and_sum(ax['remote_addrs'],
                                          cur['remote_addrs']),
        }

    # functools.reduce: `reduce` is not a builtin on Python 3
    return reduce(_r, values)
def range_mapper(line):
    """Emit the partial results needed for the time range and byte total.

    For one access-log line yields ('min', ts), ('max', ts) and
    ('total', bytes), where ts is the ISO-8601 string of the request time.
    """
    entry = parse_apache_log_entry(line)
    stamp = entry['time'].isoformat()
    for bound in ('min', 'max'):
        yield bound, stamp
    yield 'total', entry['bytes']
def range_reducer(key, values):
    """Reduce range_mapper output.

    'min' -> smallest value, 'max' -> largest value, 'total' -> sum.
    Any other key yields None.
    """
    for name, combine in (('min', min), ('max', max), ('total', sum)):
        if key == name:
            return combine(values)
    return None
def duplicate_finding_mapper(line):
    """Find log entries that are identical.

    Yields (sha1_hexdigest_of_line, 1). Paired with duplicate_finding_reducer
    this counts occurrences of each distinct line; filtering entries whose
    count is 1 from the final output leaves only the duplicates.

    Text lines are UTF-8 encoded before hashing (hashlib requires bytes on
    Python 3); bytes are hashed as-is. Any trailing newline is part of the
    hashed content.
    """
    raw = line if isinstance(line, bytes) else line.encode('utf-8')
    yield hashlib.sha1(raw).hexdigest(), 1
def duplicate_finding_reducer(key, values):
    """Sum the occurrence counts emitted for one key.

    Uses sum() rather than the Python-2-only builtin reduce(); an empty
    ``values`` now yields 0 instead of raising TypeError. ``key`` is unused.
    """
    return sum(values)
def hash_finding_mapper(line, target='55e660c53e953420a5670d6556b5bc12614b2415'):
    """Find the entry (or entries) whose SHA-1 hex digest equals ``target``.

    Yields (line, 1) for a matching line and nothing otherwise; use
    duplicate_finding_reducer with this mapper to count the matches.
    Text lines are UTF-8 encoded before hashing; bytes are hashed as-is.

    target: hex digest to look for (defaults to the previously hard-coded
        value, so existing callers are unaffected)
    """
    raw = line if isinstance(line, bytes) else line.encode('utf-8')
    # Only the matching line is emitted. As shown, the original condition was
    # inverted and emitted every line except the target.
    if hashlib.sha1(raw).hexdigest() == target:
        yield line, 1
def map_reduce(data, mapper, reducer, ax, max_pending=5):
    """In-memory map-reduce over a hashtable, based on raymond's recipe
    (the link to the original recipe is not preserved in this copy).

    Modifications:
      + values are reduced periodically to conserve RAM
      + mappers are generators and may emit zero or more pairs
      + repeated invocations on the same ``ax`` accumulate, because each
        entry stays wrapped in a list

    data: iterable of items to pass to mapper
    mapper: callable(item) yielding (key, value) pairs
    reducer: callable(key, values) -> value; partial results are fed back
        in alongside new values, so it must accept its own output as input
    ax: accumulator dict, updated in place; on return every key maps to a
        one-element list holding the reduced value
    max_pending: reduce a key early once more than this many values are
        pending for it (default 5, the previous hard-coded threshold)

    returns None; the result is left in ``ax``.
    """
    for item in data:
        for key, value in mapper(item):
            pending = ax.setdefault(key, [])
            pending.append(value)
            if len(pending) > max_pending:
                ax[key] = [reducer(key, pending)]
    # Final pass: collapse whatever is still pending for every key.
    # .items() (not Python-2-only .iteritems()) keeps this portable.
    for key, values in list(ax.items()):
        ax[key] = [reducer(key, values)]
def parallel_map_reduce(files, mapper, reducer):
    """Manage map_reduce() workers, merge-join their results, and perform
    the final reduce phase.

    One worker process is started per CPU. Files are placed in the worker
    job queue in the order given, followed by one None quit-signal per
    worker.

    files: iterable of filenames to process
    mapper: map function (passed to the worker processes)
    reducer: reduce function (used by the workers and for the final merge)

    yields:
        (key, value) tuples in ascending key order, one per distinct key

    NOTE(review): the lines that start each worker and register it in
    `workers`/`streams` were missing from this copy of the file and are
    reconstructed here.
    """
    # start a worker for each cpu
    q_request = Queue(1)
    workers = []
    streams = []
    for n in range(cpu_count()):
        q_response = Queue()
        p = Process(target=worker_main,
                    args=(n, q_request, q_response, mapper, reducer))
        p.daemon = True
        p.start()
        workers.append(p)
        streams.append(MergeJoinStream(q_response))
    log('started', len(workers), 'workers, waiting for results')

    # load filenames into the request queue...
    for fn in files:
        q_request.put(fn)
    # ...followed by quit-signals, one for each worker
    for _ in workers:
        q_request.put(None)

    # perform the merge-join & final reduce; every worker sends its keys
    # sorted, so repeatedly taking the smallest head key yields global order
    for s in streams:
        s.advance()  # prime streams
    log('final merge & reduce begins')
    while True:
        streams = [s for s in streams if s.alive]  # drop finished streams
        if not streams:
            break
        minkey = min(s.key for s in streams)
        matching = [s for s in streams if s.key == minkey]
        values = [s.value for s in matching]
        for s in matching:
            s.advance()
        yield minkey, reducer(minkey, values)  # final reduce

    # all results have been consumed, so the workers can exit cleanly
    for p in workers:
        p.join()
def worker_main(worker_id, q_in, q_out, mapper, reducer):
    """Parallel map_reduce worker entry point.

    Runs map_reduce over each filename received on q_in, accumulating into
    a single dict, until None is received. Then it sends one
    (key, reduced_value) tuple per key to q_out in ascending key order,
    followed by None to signal that this worker is done. The parent's
    merge-join depends on keys being sorted and unique, which the
    accumulator dict guarantees.
    """
    # stage 1: process input files, store in accumulator
    ax = {}
    while True:
        filename = q_in.get()
        if filename is None:
            break
        with open(filename, 'r') as fd:
            log(worker_id, 'begin reading from', filename)
            map_reduce(fd, mapper, reducer, ax)

    # stage 2: sort and send key/val pairs to the parent process
    # (sorted() rather than keys().sort(), which fails on Python 3)
    for key in sorted(ax):
        q_out.put((key, ax[key][0]))

    # indicate to the parent that this worker has finished
    q_out.put(None)
class MergeJoinStream(object):
    """Merge-join helper: a one-item look-ahead over a queue.

    Reads items from a queue, keeping the most recently read one until the
    caller (the merge loop) is ready to advance to the next. A None item
    marks the end of the stream.

    attributes:
        q: the queue being read (anything with a ``get()`` method)
        data: the item most recently read from ``q``
        alive: True until the None terminator has been read
        key, value: the two halves of ``data`` while ``alive``

    ``advance()`` must be called once to prime the stream before reading
    ``alive``, ``key`` or ``value``.
    """
    def __init__(self, q):
        self.q = q

    def advance(self):
        """Read the next item and update alive and, if not at EOF, key/value."""
        self.data = self.q.get()
        self.alive = self.data is not None
        if self.alive:
            self.key, self.value = self.data
def json_list_stream_dump(data, fd, indent=None):
    """Write the elements of an iterable to a file-like object as a JSON
    list, one element at a time, without building the whole list in memory.

    data: iterable of JSON-serializable objects (lists and generators both
        work; iter() is applied first)
    fd: file-like object with a write() method
    indent: None for compact output; otherwise each element is placed on
        its own line, prefixed by this many spaces (similar in spirit to
        json.dumps' indent, but applied only to the outer list)

    NOTE(review): the bracket/separator writes were missing from this copy
    of the file and are reconstructed here; the output is valid JSON either
    way.
    """
    if indent is None:
        pretty_cr, pretty_ws = '', ''
    else:
        pretty_cr, pretty_ws = '\n', ' ' * indent

    items = iter(data)  # next() below requires an iterator, not just an iterable
    fd.write('[')
    try:
        item = next(items)
    except StopIteration:
        fd.write(']')  # empty input -> "[]"
        return
    fd.write(pretty_cr + pretty_ws)
    fd.write(json.dumps(item))
    for item in items:
        fd.write(',')
        fd.write(pretty_cr + pretty_ws)
        fd.write(json.dumps(item))
    fd.write(pretty_cr + ']')
def order_filenames_by_size(fns):
    """Sort filenames by file size as reported by os.stat().

    fns: iterable of filenames
    returns: a new list of the filenames in ascending order of size; files
        of equal size keep their input order (sorted() is stable)
    raises: OSError if a file cannot be stat'ed

    Uses a key function instead of the Python-2-only cmp argument. The old
    comparator never returned 0 for equal sizes, which made it inconsistent.
    """
    return sorted(fns, key=lambda fn: os.stat(fn).st_size)
def log(*args):
    """Write str() of each argument, space-separated, as one line to stderr.

    Does nothing when the module-level DEBUG flag is false.
    """
    if not DEBUG:
        return
    # terminate each message; otherwise consecutive log calls run together
    sys.stderr.write(' '.join(str(item) for item in args) + '\n')
def extend(a, b):
    """Return a new dict that shallow-merges a and b; b wins on shared keys.

    Neither input is modified.
    """
    out = dict(a)
    out.update(b)  # previously missing: b was silently ignored
    return out
if __name__ == '__main__':
    # run the job only when executed as a script, not when imported
    # (multiprocessing child processes may import this module)
    main()

# vim: tabstop=4 shiftwidth=4 softtabstop=4 expandtab
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment