Skip to content

Instantly share code, notes, and snippets.

@ligfx
Last active December 21, 2015 02:39
Show Gist options
  • Save ligfx/6236474 to your computer and use it in GitHub Desktop.
Save ligfx/6236474 to your computer and use it in GitHub Desktop.
OLS map-reduce w/ multiprocessing
import multiprocessing as mp
import numpy as np
import Queue as queue
class NullQueue(object):
    """Queue stand-in that stores nothing.

    Everything put on it is discarded, ``get`` always reports an empty
    queue, and ``empty`` is always true. It serves as the default log
    queue so that logging can be switched off without special-casing
    the call sites.
    """

    def put(self, *args, **kwargs):
        """Discard the item.

        Positional and keyword arguments (e.g. ``block``, ``timeout``)
        are accepted and ignored so the call signature matches a real
        queue's ``put``.
        """
        pass

    def get(self, *args, **kwargs):
        """Always raise ``queue.Empty``; nothing is ever stored."""
        raise queue.Empty

    def empty(self):
        """Return ``True``: this queue never holds anything."""
        return True
def mapper_factory(Func):
    """Build a worker function that applies ``Func`` to queued items.

    The returned ``mapper(id, logq, readq, writeq)`` reads items from
    ``readq`` until it receives ``None``, puts ``Func(item)`` on
    ``writeq`` for each one, and reports progress on ``logq``.

    Because ``None`` is the end-of-input marker, it cannot itself be
    sent through ``readq`` as a work item.
    """
    def mapper(id, logq, readq, writeq):
        logq.put("[mapper-%s] started" % id)
        while True:
            item = readq.get()
            # Identity test rather than ``==``: an equality comparison
            # against None is elementwise for numpy arrays and cannot
            # be used as a truth value, so it would raise here.
            if item is None:
                break
            logq.put("[mapper-%s] processing (%s)" % (id, repr(item)))
            writeq.put(Func(item))
        logq.put("[mapper-%s] quitting" % id)
    return mapper
def reducer_factory(Func):
    """Build a worker function that folds queued items with ``Func``.

    The returned ``reducer(id, logq, readq, writeq)`` reads items from
    ``readq`` until it receives ``None``, combines them left to right
    with the two-argument ``Func``, and puts the single accumulated
    value on ``writeq``. Progress is reported on ``logq``.

    If the end marker arrives before any item, nothing is put on
    ``writeq``; the reducer just logs that and quits.
    """
    # The builtin ``reduce`` exists only on Python 2; functools.reduce
    # is available on both Python 2.6+ and Python 3.
    from functools import reduce

    def reducer(id, logq, readq, writeq):
        logq.put("[reducer-%s] started" % id)

        def annotate():
            # Yield items up to the None marker, logging each one.
            # Identity test rather than ``==`` so that items with a
            # non-boolean ``==`` (e.g. numpy arrays) do not raise.
            while True:
                msg = readq.get()
                if msg is None:
                    return
                logq.put("[reducer-%s] processing %s" % (id, repr(msg)))
                yield msg

        items = annotate()
        try:
            first = next(items)
        except StopIteration:
            # With several reducers sharing one queue, one of them can
            # see the end marker before any item. reduce() would raise
            # TypeError on the empty input, so bail out cleanly instead.
            logq.put("[reducer-%s] received no input" % id)
            logq.put("[reducer-%s] quitting" % id)
            return

        # Seeding with the first item is equivalent to reduce() without
        # an initial value over the whole stream.
        accum = reduce(Func, items, first)
        writeq.put(accum)
        logq.put("[reducer-%s] accumulated value %s" % (id, repr(accum)))
        logq.put("[reducer-%s] quitting" % id)
    return reducer
def piter(target, seq, logq=NullQueue(), j=4):
    """Feed ``seq`` through ``j`` worker processes and iterate the output.

    A supervisor process is started immediately. It launches ``j``
    processes running ``target(worker_id, logq, readq, writeq)``, puts
    every item of ``seq`` on the shared input queue, then sends one
    ``None`` per worker so each of them stops. Once all workers have
    been joined it puts a final ``None`` on the output queue.

    Arguments:
    target -- worker callable taking ``(id, logq, readq, writeq)``
    seq    -- iterable of work items; ``None`` is reserved as the
              end marker and cannot be a work item or a result
    logq   -- queue handed to the workers for log messages
    j      -- number of worker processes

    Returns an iterator over whatever the workers put on the output
    queue, ending at the final ``None``. With several workers writing
    to one queue, output order need not match input order.
    """
    readq = mp.Queue()
    writeq = mp.Queue()

    def supervisor():
        workers = []
        try:
            for i in range(j):
                p = mp.Process(target=target, args=(str(i), logq, readq, writeq))
                p.start()
                workers.append(p)
            for item in seq:
                readq.put(item)
        finally:
            # Always shut down, even if starting a worker or iterating
            # ``seq`` raised: otherwise the workers block forever on
            # readq and the consumer blocks forever on writeq.
            for _ in workers:
                readq.put(None)
            for w in workers:
                w.join()
            writeq.put(None)

    def results():
        # Identity test rather than iter(writeq.get, None): the ``==``
        # used by the two-argument iter() raises for results such as
        # numpy arrays, whose comparison with None is elementwise.
        while True:
            item = writeq.get()
            if item is None:
                return
            yield item

    # Start the supervisor eagerly, before the caller begins iterating.
    mp.Process(target=supervisor).start()
    return results()
def pmap(map_func, seq, logq=NullQueue(), mapj=4):
    """Apply ``map_func`` to each item of ``seq`` in ``mapj`` processes.

    Wraps ``map_func`` in a mapper worker and hands it to ``piter``.
    Returns the iterator produced by ``piter``; log messages from the
    workers go to ``logq``.
    """
    worker = mapper_factory(map_func)
    return piter(worker, seq, logq=logq, j=mapj)
def preduce(reduce_func, seq, logq=NullQueue(), reducej=2):
    """Fold ``seq`` into one value with ``reduce_func`` using ``reducej`` processes.

    Each reducer process folds the share of ``seq`` it happens to
    receive into a partial result; the partial results are then folded
    together with ``reduce_func`` in the calling process. Since the
    split between reducers is arbitrary, ``reduce_func`` presumably
    needs to be associative and commutative for the result to be
    well defined.

    Raises ``TypeError`` (from ``reduce``) when no partial result is
    produced, e.g. when ``seq`` is empty.
    """
    # The builtin ``reduce`` exists only on Python 2; functools.reduce
    # is available on both Python 2.6+ and Python 3.
    from functools import reduce

    partials = piter(reducer_factory(reduce_func), seq, logq, reducej)
    return reduce(reduce_func, partials)
class plog(object):
    """Context manager that prints log messages from a background process.

    Entering the context creates a multiprocessing queue, starts a
    process that prints every message put on that queue, and returns
    the queue. Leaving the context sends the ``None`` end marker and
    waits for the printer process to finish.
    """

    def __enter__(self):
        self.logq = logq = mp.Queue()

        def log():
            # Print messages until the None end marker arrives.
            while True:
                msg = logq.get()
                if msg is None:
                    break
                print(msg)

        self._proc = mp.Process(target=log)
        self._proc.start()
        return logq

    def __exit__(self, exc_type, exc_value, traceback):
        self.logq.put(None)
        # Wait for the printer to drain the queue so that all messages
        # are written before the ``with`` block is left, and so the
        # child process is reaped instead of lingering.
        self._proc.join()
def regress(data):
    """
    Run an ordinary least-squares regression. Data should be a
    sequence of observations, where each observation is a tuple
    containing the dependent variable followed by the independent
    variables.

    Returns the coefficient estimates as a column ``np.matrix`` with
    one row per independent variable. No intercept is added; include a
    constant column in the observations if one is wanted.

    The per-observation products are computed in parallel with
    ``pmap`` and summed with ``preduce``. The inverse is taken with
    ``.I``, which raises ``numpy.linalg.LinAlgError`` for a singular
    matrix.
    """
    def m(item):
        # Map one observation to (count, x'x, x'y).
        # Force floating point: with integer observations on Python 2
        # the later division by N would floor-divide elementwise and
        # silently give wrong coefficients.
        X = np.matrix(item[1:], dtype=float)
        Y = float(item[0])
        P = X.T * X
        Q = Y * X.T
        return (1, P, Q)

    def tuplesum(a, b):
        # Element-wise sum of two (count, P, Q) triples.
        return tuple(x + y for x, y in zip(a, b))

    with plog() as logq:
        N, P, Q = preduce(tuplesum, pmap(m, data, logq=logq), logq=logq)

    # (X'X / N)^-1 (X'y / N); the N factors cancel mathematically.
    return (P / N).I * (Q / N)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment