Last active
December 24, 2015 19:59
-
-
Save eigenhombre/6854438 to your computer and use it in GitHub Desktop.
pmap in python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from toolz import map, drop, first | |
import Queue | |
import contextlib | |
import itertools | |
import multiprocessing | |
import threading | |
import time | |
def timeit(f):
    """Call ``f()`` once and return the elapsed wall-clock time in seconds.

    The return value of ``f`` is discarded; only the difference between
    two ``time.time()`` readings is returned.
    """
    started = time.time()
    f()
    finished = time.time()
    return finished - started
@contextlib.contextmanager
def threaded(*args, **kwargs):
    """Context manager that runs a thread for the duration of a ``with`` block.

    All arguments are forwarded unchanged to ``threading.Thread``.  The
    thread is started on entry and joined on exit, including when the
    body of the ``with`` block raises.  Nothing is bound by ``as``.
    """
    worker = threading.Thread(*args, **kwargs)
    worker.start()
    try:
        yield None
    finally:
        # Never leave the block while the worker is still running.
        worker.join()
def seque(seq):
    """
    Consume seq in a separate thread, passing it to the calling
    thread in a queue.

    Elements are yielded in the order ``seq`` produces them.  If iterating
    ``seq`` raises, the elements produced before the failure are still
    yielded and the exception is then re-raised in the calling thread.
    """
    # A fresh object compared by identity can never collide with a real
    # element, unlike a magic string compared with ``==``.
    done = object()
    q = Queue.Queue()
    failure = []

    def wrapper():
        try:
            for el in seq:
                q.put(el)
        except Exception as exc:
            # Remember the error so the consumer can re-raise it.
            failure.append(exc)
        finally:
            # Always post the sentinel; without it the consumer would
            # block on q.get() forever when seq raises.
            q.put(done)

    with threaded(target=wrapper):
        while True:
            el = q.get()
            if el is done:
                break
            yield el

    # The producer thread has been joined here, so ``failure`` is final.
    if failure:
        raise failure[0]
def slow_range(n):
    """Yield the integers ``0 .. n-1``, sleeping 0.1 seconds before each one."""
    pause = 0.1
    for value in xrange(n):
        time.sleep(pause)
        yield value
def test_seque():
    """Draining ``seque`` over ten 0.1-second steps must finish in under 1.2 seconds."""
    def drain():
        return list(seque(slow_range(10)))

    elapsed = timeit(drain)
    assert elapsed < 1.2
_num_cores = multiprocessing.cpu_count | |
class Future(object):
    """Run ``func()`` on a background thread and expose its outcome.

    The thread is created and started by the constructor.

    Attributes:
      done   -- ``threading.Event`` set once ``func`` has finished,
                whether it returned or raised.
      value  -- the return value of ``func`` (``None`` until it returns).
      error  -- the exception raised by ``func``, or ``None``.
      thread -- the ``threading.Thread`` running ``func``.
    """

    def __init__(self, func):
        self.done = threading.Event()
        # Initialise before the thread starts so deref() never sees a
        # missing attribute.
        self.value = None
        self.error = None

        def wrapper():
            try:
                self.value = func()
            except Exception as exc:
                # Keep the failure so deref() can re-raise it in the
                # caller's thread instead of losing it in this one.
                self.error = exc
            finally:
                # Always signal completion; otherwise deref() would
                # block forever whenever func raises.
                self.done.set()

        self.thread = threading.Thread(target=wrapper)
        self.thread.start()

    def deref(self):
        """Block until ``func`` has finished, then return its result.

        Raises whatever exception ``func`` raised, if any.
        """
        self.done.wait()
        if self.error is not None:
            raise self.error
        return self.value
def test_future_deref():
    """``deref`` must hand back exactly the object the wrapped callable returned."""
    f = Future(lambda: True)
    # Identity check: ``== True`` would also accept 1 or 1.0.
    assert f.deref() is True
def test_future_timing():
    """Creating a Future returns quickly; ``deref`` waits for the work to end."""
    started = time.time()
    pending = Future(lambda: time.sleep(0.3))

    # Construction only starts the thread, it does not wait for it.
    after_create = time.time() - started
    assert after_create < 0.1

    pending.deref()

    # By now the full 0.3-second sleep must have elapsed.
    after_deref = time.time() - started
    assert after_deref > 0.3
def partition_all(n, seq):
    """Yield successive lists of up to ``n`` items taken from ``seq``.

    Every chunk except possibly the last has exactly ``n`` items; the
    last one holds whatever remains.  An empty ``seq`` yields nothing.
    """
    source = iter(seq)

    def next_chunk():
        return list(itertools.islice(source, n))

    # Two-argument iter() keeps calling next_chunk until it returns [].
    for chunk in iter(next_chunk, []):
        yield chunk
def pmap(f, seq):
    """Lazily apply ``f`` to each element of ``seq``, one Future per element.

    ``seq`` is consumed in batches of ``_num_cores()`` elements.  A Future
    is started for every element of a batch, then the results are yielded
    in input order; the next batch starts only after the current one has
    been fully yielded.
    """
    def spawn(item):
        # A helper call binds ``item`` per element for the closure.
        return Future(lambda: f(item))

    for batch in partition_all(_num_cores(), seq):
        futures = [spawn(item) for item in batch]
        for future in futures:
            yield future.deref()
def test_pmap():
    """Check that pmap is lazy and correct, then print two timings.

    The timings compare a plain ``map`` with ``pmap`` over the same
    slow function.
    """
    def square(x):
        return x ** 2

    def slow_square(x):
        time.sleep(0.04)
        return x ** 2

    expected = list(map(square, xrange(100)))

    # Creating the generator must return almost immediately.
    started = time.time()
    squares = pmap(square, xrange(100))
    assert time.time() - started < 0.01
    assert list(squares) == expected

    # Same laziness check with the slow function; results must match.
    started = time.time()
    slow_squares = pmap(slow_square, xrange(100))
    assert time.time() - started < 0.2
    assert list(slow_squares) == expected

    def run_serial():
        return list(map(slow_square, xrange(100)))

    def run_parallel():
        return list(pmap(slow_square, xrange(100)))

    print(timeit(run_serial))
    # 4.08619499207
    print(timeit(run_parallel))
    # 0.553040027618
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@mrocklin there is a `concurrent.futures` which smells similar but it's 3.2+ only (though there is a `futures` module on PyPI).