Last active
December 24, 2015 19:59
-
-
Save eigenhombre/6854438 to your computer and use it in GitHub Desktop.
pmap in python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from toolz import map, drop, first | |
import Queue | |
import contextlib | |
import itertools | |
import multiprocessing | |
import threading | |
import time | |
def timeit(f):
    """Call f() once and return the wall-clock seconds it took."""
    start = time.time()
    f()
    return time.time() - start
@contextlib.contextmanager
def threaded(*args, **kwargs):
    """Run a thread for the duration of a with-block.

    Constructs threading.Thread(*args, **kwargs), starts it, and joins it
    when the block exits (even on exception, via finally).

    Yields the Thread object so callers may inspect it
    (``with threaded(target=fn) as t:``); existing ``with threaded(...):``
    callers are unaffected.
    """
    t = threading.Thread(*args, **kwargs)
    t.start()
    try:
        yield t
    finally:
        # Always wait for the worker, even if the body raised.
        t.join()
def seque(seq):
    """
    Consume seq in a separate thread, passing it to the calling
    thread in a queue.

    Elements are yielded lazily in order. The producer thread is joined
    when iteration finishes.
    """
    # Py2 names the module Queue, Py3 names it queue; support both.
    try:
        import queue as _queue
    except ImportError:
        import Queue as _queue
    # A unique object compared by identity cannot collide with any real
    # element of seq (a magic string sentinel could).
    done = object()
    q = _queue.Queue()

    def wrapper():
        try:
            for el in seq:
                q.put(el)
        finally:
            # Post the sentinel even if seq raises, so the consumer
            # below can never block forever on q.get().
            q.put(done)

    with threaded(target=wrapper):
        while True:
            el = q.get()
            if el is done:
                break
            yield el
def slow_range(n):
    """Yield 0..n-1, sleeping 0.1s before each value (a slow producer).

    Uses range rather than Py2-only xrange: iteration is identical, and
    the list materialization on Py2 is negligible next to the sleeps.
    """
    for i in range(n):
        time.sleep(0.1)
        yield i
def test_seque():
    # 10 items at 0.1s each take ~1s to produce serially; seque should
    # add no meaningful overhead on top of that.
    elapsed = timeit(lambda: list(seque(slow_range(10))))
    assert elapsed < 1.2
_num_cores = multiprocessing.cpu_count | |
class Future(object):
    """Run func on a background thread; deref() blocks for its result.

    Bug fix over the original: if func raised, the Event was never set
    and deref() would hang forever. The exception is now captured and
    re-raised in the caller's thread by deref().
    """

    def __init__(self, func):
        self.done = threading.Event()
        self.error = None

        def wrapper():
            try:
                self.value = func()
            except Exception as e:
                # Stash the failure; deref() re-raises it for the caller.
                self.error = e
            finally:
                # Always release waiters, even when func fails.
                self.done.set()

        self.thread = threading.Thread(target=wrapper)
        self.thread.start()

    def deref(self):
        """Block until func has finished; return its value.

        Raises whatever exception func raised, if any.
        """
        self.done.wait()
        if self.error is not None:
            raise self.error
        return self.value
def test_future_deref():
    # A trivial future should produce its value immediately.
    fut = Future(lambda: True)
    assert fut.deref() == True
def test_future_timing():
    def slow_future():
        time.sleep(0.3)

    start = time.time()
    fut = Future(slow_future)
    # Construction returns immediately; the work runs in the background.
    assert time.time() - start < 0.1
    fut.deref()
    # deref() blocks until the 0.3s job has actually completed.
    assert time.time() - start > 0.3
def partition_all(n, seq):
    """Lazily yield successive lists of at most n items taken from seq."""
    it = iter(seq)
    chunk = list(itertools.islice(it, n))
    while chunk:
        yield chunk
        chunk = list(itertools.islice(it, n))
def pmap(f, seq, chunksize=None):
    """Lazy, thread-parallel map of f over seq.

    seq is consumed in batches; each batch's elements run concurrently
    as Futures, and results are yielded in order.

    chunksize -- number of threads launched per batch. Defaults to the
    machine's core count, preserving the original behavior; a larger
    value helps when tasks are heterogeneous in duration.
    """
    if chunksize is None:
        chunksize = _num_cores()
    for batch in partition_all(chunksize, seq):
        # x=x binds each element at lambda-creation time, avoiding the
        # late-binding closure pitfall.
        futures = [Future(lambda x=x: f(x)) for x in batch]
        for fut in futures:
            yield fut.deref()
def test_pmap():
    def square(x):
        return x ** 2

    def slow_square(x):
        time.sleep(0.04)
        return x ** 2

    # pmap is lazy: building the generator should be near-instant.
    start = time.time()
    result = pmap(square, xrange(100))
    assert time.time() - start < 0.01
    assert list(result) == list(map(square, xrange(100)))

    # Still lazy before forcing, even when the mapped function is slow.
    start = time.time()
    result = pmap(slow_square, xrange(100))
    assert time.time() - start < 0.2
    assert list(result) == list(map(square, xrange(100)))

    print(timeit(lambda: list(map(slow_square, xrange(100)))))
    # 4.08619499207
    print(timeit(lambda: list(pmap(slow_square, xrange(100)))))
    # 0.553040027618
@mrocklin there is a concurrent.futures
which smells similar but it's 3.2+ only (though there is a futures
module on PyPI).
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I would put this in as a PR. We probably shouldn't merge it before a "major release", but feedback is easier there (if you want that, that is :).
Is there a Future object somewhere in threading? Are we reinventing anything here?
In this line
I think we will want a programmable chunk size that is different from ntasks/ncores. Imagine
pmap(fib, range(100))
. A simple task scheduler performs much better on heterogeneous tasks (which are common.)