Skip to content

Instantly share code, notes, and snippets.

@eigenhombre
Last active December 24, 2015 19:59
Show Gist options
  • Save eigenhombre/6854438 to your computer and use it in GitHub Desktop.
Save eigenhombre/6854438 to your computer and use it in GitHub Desktop.
pmap in python
from toolz import map, drop, first
import Queue
import contextlib
import itertools
import multiprocessing
import threading
import time
def timeit(f):
    """Return the wall-clock time, in seconds, spent calling f()."""
    start = time.time()
    f()
    return time.time() - start
@contextlib.contextmanager
def threaded(*args, **kwargs):
    """Run a thread for the duration of a with-block.

    Constructs and starts ``threading.Thread(*args, **kwargs)``, yields
    the thread object (so callers may write ``with threaded(...) as t``),
    and joins it on exit — even when the block raises.
    """
    t = threading.Thread(*args, **kwargs)
    t.start()
    try:
        yield t  # previously yielded None; yielding the thread is backward compatible
    finally:
        t.join()
def seque(seq):
    """
    Consume seq on a background thread, handing each element to the
    calling thread through an unbounded queue.

    End-of-stream is marked with a unique ``object()`` sentinel compared
    by identity, so any element value (including the old magic string)
    is safe to transmit.  If the producer raises, the exception is
    re-raised in the consuming thread instead of deadlocking it.
    """
    try:  # the stdlib module was renamed between Python 2 and 3
        import queue as _queue
    except ImportError:
        import Queue as _queue

    done = object()
    q = _queue.Queue()
    state = {}

    def producer():
        try:
            for el in seq:
                q.put(el)
        except BaseException as exc:
            state["error"] = exc
        finally:
            # Always signal completion, even on error, so the consumer
            # can never block forever on q.get().
            q.put(done)

    t = threading.Thread(target=producer)
    t.start()
    try:
        while True:
            el = q.get()
            if el is done:
                break
            yield el
    finally:
        t.join()
    if "error" in state:
        raise state["error"]
def slow_range(n, delay=0.1):
    """Yield 0..n-1, sleeping ``delay`` seconds before each element.

    Simulates a slow producer for the seque/pmap timing tests.  The
    delay is now a parameter (default keeps the original 0.1s), and
    ``range`` replaces py2-only ``xrange`` — behavior-identical here.
    """
    for i in range(n):
        time.sleep(delay)
        yield i
def test_seque():
    """Production and consumption overlap, so 10 x 0.1s items take ~1s total."""
    elapsed = timeit(lambda: list(seque(slow_range(10))))
    assert elapsed < 1.2
# Bind the callable itself (not its result) so the logical-CPU count is
# read lazily, at each _num_cores() call site.
_num_cores = multiprocessing.cpu_count
class Future(object):
    """A value computed on a background thread.

    The computation starts immediately on construction; deref() blocks
    until it finishes and returns the result.  If ``func`` raised, the
    exception is re-raised in the dereferencing thread (the original
    version deadlocked: ``done`` was never set on error).
    """

    def __init__(self, func):
        self.done = threading.Event()
        self.error = None   # exception raised by func, if any

        def wrapper():
            try:
                self.value = func()
            except BaseException as exc:
                self.error = exc
            finally:
                self.done.set()  # always release deref(), even on error

        self.thread = threading.Thread(target=wrapper)
        self.thread.start()

    def deref(self):
        """Block until the computation completes; return or re-raise."""
        self.done.wait()
        if self.error is not None:
            raise self.error
        return self.value
def test_future_deref():
    """deref() hands back the value computed by the wrapped function."""
    fut = Future(lambda: True)
    result = fut.deref()
    assert result == True
def test_future_timing():
    """Constructing a Future returns immediately; deref() blocks until done."""
    def slow_future():
        time.sleep(0.3)

    start = time.time()
    fut = Future(slow_future)
    assert time.time() - start < 0.1   # construction did not wait
    fut.deref()
    assert time.time() - start > 0.3   # deref waited for the sleep
def partition_all(n, seq):
    """Yield consecutive chunks of seq as lists of up to n items each.

    The final chunk may be shorter; an empty seq yields nothing.
    """
    it = iter(seq)
    # iter(callable, sentinel) stops at the first empty slice — the same
    # termination condition as the original explicit while/break loop.
    for chunk in iter(lambda: list(itertools.islice(it, n)), []):
        yield chunk
def pmap(f, seq):
    """
    Lazily map f over seq in parallel: launch one batch of _num_cores()
    Futures at a time, then yield their results in input order.
    """
    for batch in partition_all(_num_cores(), seq):
        # Default-argument binding captures each item eagerly, matching
        # the original's per-element closure over the lambda parameter.
        futures = [Future(lambda item=item: f(item)) for item in batch]
        for future in futures:
            yield future.deref()
def test_pmap():
    """pmap is lazy to construct and agrees with serial map; print timings.

    NOTE(review): the trailing print lines use the local slow_square, so
    they must have lived inside this function before the gist's
    indentation was lost — reconstructed here accordingly.
    """
    def square(x):
        return x ** 2

    def slow_square(x):
        time.sleep(0.04)
        return x ** 2

    start = time.time()
    results = pmap(square, xrange(100))
    assert time.time() - start < 0.01       # generator built, nothing run yet
    assert list(results) == list(map(square, xrange(100)))

    start = time.time()
    results = pmap(slow_square, xrange(100))
    assert time.time() - start < 0.2        # still lazy despite slow f
    assert list(results) == list(map(square, xrange(100)))

    print("")
    print(timeit(lambda: list(map(slow_square, xrange(100)))))
    # 4.08619499207
    print(timeit(lambda: list(pmap(slow_square, xrange(100)))))
    # 0.553040027618
@eigenhombre
Copy link
Author

speedup is 7.4; _num_cores() says I have 8 (quad core + hyperthreading, I guess)

The next interesting thing might be a more realistic example that is purely CPU-bound.
(CPU + IO bound in separate threads is expected to suck per Beazley)

@mrocklin
Copy link

mrocklin commented Oct 6, 2013

I would put this in as a PR. We probably shouldn't merge it before a "major release" but feedback is easier there (if you want that, that is :).

Is there a Future object somewhere in threading? Are we reinventing anything here?

In this line

partitioned = partition_all(_num_cores(), seq)

I think we will want a programmable chunksize that is different than ntasks/ncores. Imagine pmap(fib, range(100)). A simple task scheduler performs much better on heterogeneous tasks (which are common.)

@eigenhombre
Copy link
Author

@mrocklin there is a concurrent.futures module that smells similar, but it's 3.2+ only (though there is a futures backport module on PyPI).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment