-
-
Save jtaylor/44f4912ff41919099f6b to your computer and use it in GitHub Desktop.
Map a function across elements in parallel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pmap import pmap |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""pmap - Parallel replacement for itertools.map

Read the docstring for pmap and try running pmap.py.

For example:

    >python pmap.py
    itertools.map took 1.98 seconds
    pmap (0 Workers) took 1.99 seconds
    pmap (1 Workers) took 2.22 seconds
    pmap (2 Workers) took 1.20 seconds
    pmap (3 Workers) took 0.79 seconds
    pmap (4 Workers) took 0.60 seconds
    pmap (5 Workers) took 0.62 seconds
    pmap (6 Workers) took 0.65 seconds
    pmap (7 Workers) took 0.62 seconds
"""
import errno | |
import heapq | |
import multiprocessing as mp | |
from itertools import chain, repeat | |
import time | |
def retry_on_eintr(function, *args, **kwargs):
    """Call ``function(*args, **kwargs)``, retrying while it fails with EINTR.

    Args:
        function: The callable to invoke.
        *args: Positional arguments forwarded to ``function``.
        **kwargs: Keyword arguments forwarded to ``function``.

    Returns:
        Whatever ``function`` returns on the first call that is not
        interrupted.

    Raises:
        IOError: Re-raised unchanged when ``function`` fails with any errno
            other than ``errno.EINTR``.
    """
    while True:
        try:
            return function(*args, **kwargs)
        except IOError as e:
            # EINTR means the call was merely interrupted, so it is safe to
            # repeat; every other errno is a genuine failure for the caller.
            if e.errno != errno.EINTR:
                raise
def pmap(func, iter, n_procs=4, buffer=-1):
    """Lazily map ``func`` over ``iter`` using ``n_procs`` worker processes.

    This is intended as a parallel drop-in replacement for a lazy map
    (``itertools.imap`` on Python 2, the built-in ``map`` on Python 3).
    Results are yielded in the same order as the inputs. For example:

        def run_job(value):
            # Some computation
            ...
            return result

        # Get an iterator over results.
        results = pmap(run_job, values)

    When ``n_procs == 0`` no processes are created and this is equivalent to:

        def pmap(func, iter):
            for value in iter:
                yield func(value)

    When ``n_procs > 0``, ``n_procs`` worker processes are started, all with
    access to a job queue and a result queue. The values from ``iter`` are
    numbered and passed through the job queue to the workers. The results
    come back through the result queue and are re-ordered with a heap so
    they are yielded in input order.

    Args:
        func: Callable applied to each value. It may be a closure or lambda;
            the workers reach it through the forked process image.
        iter: Iterable of job values. (The name shadows the built-in but is
            part of the public signature.)
        n_procs: Number of worker processes; ``0`` runs everything in the
            calling process.
        buffer: ``-1`` (default) disables buffering: all of ``iter`` is
            consumed and enqueued by the calling process before any worker
            starts. Any other value is used as the initial count of a
            semaphore that limits how many jobs the separate filler process
            may enqueue ahead of the workers, which is useful when ``iter``
            is a generator you do not want to consume all at once.

    Yields:
        ``func(value)`` for each value of ``iter``, in input order.

    Raises:
        ValueError: If ``n_procs`` is negative (raised on first iteration).
        KeyboardInterrupt: Re-raised after all child processes have been
            terminated and joined.

    NOTES:
      * As this is an iterator, you can work on the results coming out of
        it while the processes are still running.
      * There is overhead, so you do not want to use this for a ton of very
        small jobs. Although, you can just set n_procs=0 for that.
      * Passing things through queues is slow. Since func can be a
        closure/lambda, you can use it to access variables in the scope it
        is defined in rather than shipping them as job values:

            x = np.random.randn(1000, 100000)
            def sum_column(i_row):
                return x[i_row].sum()

            # Slow. Rows are passed through the pipe
            column_sums = pmap(sum_column, x)
            # Faster, as only indices are passed through the pipe
            column_sums = pmap(sum_column, range(1000))
            # Fastest, because this is a stupid example.
            column_sums = x.sum(axis=1)

      * Such variables are copied into each process by the fork, so they
        cannot be used to return results. The only way to communicate back
        is the return value of func, so keep that small. Other
        multiprocessing primitives (e.g. a Manager) may offer a way around
        this.
      * NOTE(review): if func raises inside a worker, that worker exits
        without sending its completion sentinel, so the consumer appears to
        wait forever; make func catch its own errors.
      * The original author suspected a memory leak that keeps returned
        results alive until everything is finished.
    """
    if n_procs < 0:
        # With no workers nothing would ever answer on the result queue and
        # the loop below would block forever.
        raise ValueError("n_procs must be >= 0, got %r" % (n_procs,))

    if n_procs == 0:
        for job in iter:
            yield func(job)
        return

    # The process targets are closures, which only reach child processes
    # through fork(); request that start method explicitly where the API
    # exists (Python 3.4+), since it is not the default everywhere.
    ctx = mp
    if hasattr(mp, "get_context"):
        try:
            ctx = mp.get_context("fork")
        except ValueError:
            # fork is not available on this platform; keep the default.
            ctx = mp

    job_queue = ctx.Queue()
    result_queue = ctx.Queue()

    # Whether we should buffer the input to the job queue, and the
    # semaphore that regulates the buffer if so.
    is_buffered = buffer != -1
    buffer_sem = ctx.Semaphore(buffer) if is_buffered else None

    def job_queue_filler(job_queue, buffer_sem):
        """Enqueue every (index, value) job, then one sentinel per worker."""
        jobs = enumerate(iter)
        while True:
            # Wait for space in the buffer.
            if is_buffered:
                try:
                    buffer_sem.acquire()
                except KeyboardInterrupt:
                    raise Exception()
            try:
                job = next(jobs)
            except StopIteration:
                break
            job_queue.put(job)
        # One sentinel job per worker tells each of them to stop.
        for sentinel in repeat((-1, -1), n_procs):
            job_queue.put(sentinel)

    def process(job_queue, buffer_sem, result_queue):
        """Worker loop: run jobs until a sentinel arrives, then report done."""
        while True:
            # Always block for a job instead of checking for an empty queue:
            # an empty queue may only mean the filler has been too slow.
            (i_job, job) = retry_on_eintr(job_queue.get)
            if i_job == -1:
                break
            if is_buffered:
                # The job has left the queue, so free a buffer slot.
                buffer_sem.release()
            result = func(job)
            result_queue.put((i_job, result))
            del result
        # Sentinel indicating that this worker is finished.
        result_queue.put((-1, -1))

    # Heap of finished (index, result) pairs so results come out in order.
    heap = []
    current_job = 0
    # Bound before the try block so the interrupt handler can always use
    # them; they only ever hold processes that were actually started.
    procs = []
    job_queue_filler_proc = None
    try:
        # Start filling the job queue.
        if is_buffered:
            filler = ctx.Process(target=job_queue_filler,
                                 args=(job_queue, buffer_sem))
            filler.daemon = True
            filler.start()
            job_queue_filler_proc = filler
        else:
            job_queue_filler(job_queue, buffer_sem)

        # Spawn the worker processes.
        for _ in range(n_procs):
            proc = ctx.Process(target=process,
                               args=(job_queue, buffer_sem, result_queue))
            proc.daemon = True
            proc.start()
            procs.append(proc)

        n_sentinals_seen = 0
        # Keep reading until every worker has reported completion and the
        # heap has been drained.
        while n_sentinals_seen != n_procs or heap:
            # The blocking get can be interrupted by a signal (EINTR).
            job_result_tuple = retry_on_eintr(result_queue.get)
            if job_result_tuple[0] == -1:
                n_sentinals_seen += 1
                continue
            heapq.heappush(heap, job_result_tuple)
            # Yield everything that is now in order. Each worker sends its
            # sentinel last, so a result never arrives after the final
            # sentinel and the heap is empty once all sentinels are seen.
            while heap and heap[0][0] == current_job:
                (job, result) = heapq.heappop(heap)
                yield result
                current_job += 1

        # Every sentinel has arrived, so all children have finished their
        # work; reap them instead of leaving them behind.
        if job_queue_filler_proc is not None:
            job_queue_filler_proc.join()
        for proc in procs:
            proc.join()
    except KeyboardInterrupt:
        # Terminate all processes.
        for proc in procs:
            proc.terminate()
        if job_queue_filler_proc is not None:
            job_queue_filler_proc.terminate()
        # Without this, ctrl-c does not end the program while many jobs are
        # still waiting to be written to the job queue: the queue's feeder
        # thread keeps running after the workers are gone. Cancelling the
        # join lets the process exit.
        job_queue.cancel_join_thread()
        # Join all processes.
        if job_queue_filler_proc is not None:
            job_queue_filler_proc.join()
        for proc in procs:
            proc.join()
        # And get out.
        raise
def main():
    """Benchmark plain map against pmap with 0 through 7 workers.

    Builds two lists of random numbers, defines a deliberately slow function
    over them, and prints how long it takes to evaluate that function
    ``n_iters`` times with ``map`` and with ``pmap`` (``buffer=1``) for each
    worker count.
    """
    import random
    from time import time
    from math import sin, cos

    n_iters = 500
    n_elems = 10000

    # Some random data.
    x = [random.random() for _ in range(n_elems)]
    y = [random.random() for _ in range(n_elems)]

    # A fake long running function; the argument is ignored on purpose.
    def f(i):
        rc = 0.0
        for (ex, ey) in zip(x, y):
            rc += sin(ex) * cos(ey)
        return rc

    # Time map -- coerce to list so that we know the computation is done.
    t0 = time()
    list(map(f, range(n_iters)))
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print('itertools.map took %.2f seconds' % (time() - t0))

    # And pmap with various amounts of workers.
    for n_procs in range(8):
        t0 = time()
        list(pmap(f, range(n_iters), n_procs=n_procs, buffer=1))
        print('pmap (%d Workers) took %.2f seconds' % (n_procs, time() - t0))


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import random | |
from pmap import pmap | |
def test_pmap():
    """Check that pmap yields squares in input order, unbuffered and buffered.

    Each job sleeps for a random fraction of a tenth of a second so that
    workers finish out of order and the re-ordering is exercised.
    """
    def slow_square(value):
        time.sleep(random.random() * .1)
        return value * value

    inputs = range(100)
    expected = [value * value for value in inputs]

    # First the default (unbuffered) mode, then a one-slot buffer.
    for extra_kwargs in ({}, {'buffer': 1}):
        actual = list(pmap(slow_square, inputs, n_procs=10, **extra_kwargs))
        assert len(actual) == len(expected)
        for (result, expected_result) in zip(actual, expected):
            assert result == expected_result
if __name__ == '__main__':
    # FIXME: This should be turned into a real test. For now, run and
    # make sure that ctrl+c actually ends everything cleanly when
    # pressed early on during a slow iteration over a large number of jobs.
    # That is, when the queue filler thread is still filling the job
    # queue. See pmap.py for more explanation.
    import time
    LARGE_N = 10000
    # range (not the Python-2-only xrange) keeps this runnable on Python 3.
    list(pmap(lambda x: time.sleep(1), range(LARGE_N), n_procs=1))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment