@jtaylor
Created March 9, 2012 20:12
Map a function across elements in parallel
"""pmap - Parallel replacement for itertools.map
Read the docstring for pmap and try running pmap.py.
For example.
>python pmap.py
itertools.map took 1.98 seconds
pmap (0 Workers) took 1.99 seconds
pmap (1 Workers) took 2.22 seconds
pmap (2 Workers) took 1.20 seconds
pmap (3 Workers) took 0.79 seconds
pmap (4 Workers) took 0.60 seconds
pmap (5 Workers) took 0.62 seconds
pmap (6 Workers) took 0.65 seconds
pmap (7 Workers) took 0.62 seconds
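
Typical usage (my_func and my_iterable are placeholders for your own
function and iterable):

    from pmap import pmap
    results = list(pmap(my_func, my_iterable, n_procs=4))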
"""
import errno
import heapq
import multiprocessing as mp
from itertools import imap, repeat
import time


def retry_on_eintr(function, *args, **kwargs):
    """Calls function, retrying if the call is interrupted with EINTR."""
    while True:
        try:
            return function(*args, **kwargs)
        except IOError, e:
            if e.errno == errno.EINTR:
                continue
            else:
                raise


def pmap(func, iter, n_procs=4, buffer=-1):
"""This is intended as a parallelized drop in replacement for itertools.map
This just calls func on every value iter provides. For example:
def run_job(value):
# Some computation
...
return result
# Get an iterator over results.
results = pmap(run_job, values)
When n_procs = 0, this
could be defined as:
def pmap(func, iter):
for value in iter:
yield func(value)
return
When n_procs > 0, n_procs worker processes are forked all with access to a
job queue and results queue. The values from iter are passed through the
job queue to the workers. The results are passed back to the master
process through the results queue, where some work is done to make sure
they are yielded in the correct order.
NOTES:
* I may not know what I am doing, so use at your own peril, this appears
to work for me.
* As this is an iterator, you can be working on the results coming out
of the iterator while the processes are still running.
* If buffer is specified, then only buffer number of jobs are allowed
in the queue at once, and hence consumed from iter. This is useful
if iter is a generator, that you don't want to consume all at once.
* There is overhead, so you do not want to use this to perform a ton of
very small jobs. Although, you can just set n_procs=0 for that.
* Passing things through queues is slow. It is also often inconvenient
to construct iters containing everything needed for func. Since func
can be a closure/lambda, you can use it to access variables in the
scope it is defined in. A very simple example illustrates this:
x = np.random.randn(1000,100000)
def sum_column(i_row):
return x[i_row].sum()
# Slow. Rows are passed through the pipe
column_sums = pmap(sum_column, x)
# Faster, as only indices are passed through the pipe
column_sums = pmap(sum_column, xrange(1000))
# Fastest, because this is a stupid example.
column_sums = x.sum(axis=1)
* Yes, you can read from these variables, but they are copied during
the fork to each process, so you cannot use this trick to return
results. The only way to communicate back is by returning
a value in your function, so you probably don't want to make those
huge.
* That said, I think you may be able to engineer around this by
using other multiprocessing primatives (Manager comes to mind).
* It looks like there is possibly a memory leak that causes the returned
results to not be deleted until everything is finished.
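
    For illustration, a buffered run over a lazily generated stream might
    look like this (load_and_process and handle are hypothetical
    placeholders):

        def jobs():
            # A generator we do not want to exhaust up front.
            for i in xrange(1000000):
                yield i

        # At most 8 jobs sit in the job queue at any time, so only a small
        # slice of the generator is ever consumed ahead of the workers.
        for result in pmap(load_and_process, jobs(), n_procs=4, buffer=8):
            handle(result)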
"""
    if n_procs == 0:
        for job in iter:
            yield func(job)
        return

    job_queue = mp.Queue()
    result_queue = mp.Queue()

    # Whether we should buffer the input to the job queue.
    is_buffered = buffer != -1
    buffer_sem = None
    # Create a semaphore to regulate the buffer.
    if is_buffered:
        buffer_sem = mp.Semaphore(buffer)

    def job_queue_filler(job_queue, buffer_sem):
        # Enumerate the jobs.
        jobs = enumerate(iter)
        while True:
            # Check for space in the buffer.
            if is_buffered:
                try:
                    buffer_sem.acquire()
                except KeyboardInterrupt:
                    raise Exception()
            # Enqueue the next job.
            try:
                job_queue.put(jobs.next())
            except StopIteration:
                break
        # Put in n_procs sentinel jobs.
        for job in repeat((-1, -1), n_procs):
            job_queue.put(job)
    def process(job_queue, buffer_sem, result_queue):
        while True:
            # Get a job.
            (i_job, job) = retry_on_eintr(job_queue.get)
            # NOTE: It is not a good idea to just check whether the queue
            # is empty, since it might be that the main process was simply
            # too slow to fill it up. Thus, we always get a job, but it
            # might be one of the sentinels.
            # A sentinel means this process should end.
            if i_job == -1:
                break
            if is_buffered:
                buffer_sem.release()
            # Perform the job.
            result = func(job)
            # Return the result.
            result_queue.put((i_job, result))
            del result
        # Send a sentinel to indicate completion of this process.
        result_queue.put((-1, -1))
    # Heap to hold all finished jobs, maintaining heap priority
    # so that they come out in the right order.
    heap = []
    current_job = 0
    try:
        # Start filling the job queue.
        if is_buffered:
            job_queue_filler_proc = mp.Process(target=job_queue_filler,
                                               args=(job_queue, buffer_sem))
            job_queue_filler_proc.daemon = True
            job_queue_filler_proc.start()
        else:
            job_queue_filler(job_queue, buffer_sem)
        # Spawn a bunch of worker processes.
        procs = [mp.Process(target=process,
                            args=(job_queue, buffer_sem, result_queue))
                 for i in range(n_procs)]
        for proc in procs:
            proc.daemon = True
            proc.start()
        n_sentinels_seen = 0
        # Keep pulling results out of the result queue until we have seen
        # all of the sentinels and the heap is truly empty.
        while n_sentinels_seen != n_procs or len(heap) != 0:
            # retry_on_eintr works around blocking gets being interrupted
            # by signal handlers (EINTR).
            job_result_tuple = retry_on_eintr(result_queue.get)
            # Check for a sentinel indicating one of the workers is done.
            if job_result_tuple[0] == -1:
                n_sentinels_seen += 1
                continue
            # Push this job onto the heap.
            heapq.heappush(heap, job_result_tuple)
            # And process the heap until it is empty, only out-of-order
            # jobs remain, or we have finished all of the jobs.
            while len(heap) != 0 and heap[0][0] == current_job \
                    and n_sentinels_seen != n_procs:
                # If we are here, then the next job is ready,
                # so grab it from the heap.
                (job, result) = heapq.heappop(heap)
                # Yield it to the caller.
                yield result
                # And advance what job we are looking for.
                current_job += 1
    except KeyboardInterrupt:
        # Terminate all processes.
        for proc in procs:
            proc.terminate()
        if is_buffered:
            job_queue_filler_proc.terminate()
        # This fixes a problem in which ctrl-c does not properly terminate
        # the whole program when a lot of items have been added to the job
        # queue. I believe this is because the Queue has a finite size and
        # is continually fed by what the multiprocessing documentation
        # calls a "feeder" thread. Thus, when all the processes are
        # terminated, this feeder thread is still running, trying to fill
        # the queue up, and the program does not end. I believe this ends
        # that thread, allowing the program to properly end.
        job_queue.cancel_join_thread()
        #result_queue.cancel_join_thread()
        # Join all processes.
        if is_buffered:
            job_queue_filler_proc.join()
        for proc in procs:
            proc.join()
        # And get out.
        raise


def main():
    import random
    from time import time
    from math import sin, cos

    n_iters = 500
    n_elems = 10000
    # Some random data.
    x = [random.random() for i in range(n_elems)]
    y = [random.random() for i in range(n_elems)]

    # A fake long-running function.
    def f(i):
        rc = 0.0
        for (ex, ey) in zip(x, y):
            rc += sin(ex) * cos(ey)
        return rc

    # Time imap -- coerce to a list so we know the computation is done.
    t0 = time()
    result = list(imap(f, xrange(n_iters)))
    print 'itertools.imap took %.2f seconds' % (time() - t0)
    # And pmap with various numbers of workers.
    for n_procs in range(8):
        t0 = time()
        result_pmap = list(pmap(f, xrange(n_iters), n_procs=n_procs,
                                buffer=1))
        print 'pmap (%d Workers) took %.2f seconds' % (n_procs, time() - t0)


if __name__ == '__main__':
    main()


# --- Accompanying test file (imports pmap from pmap.py above) ---
import time
import random

from pmap import pmap


def test_pmap():
    def f(x):
        time.sleep(random.random() * 0.1)
        return x * x

    x = range(100)
    xx = [e * e for e in x]

    xxhat = list(pmap(f, x, n_procs=10))
    assert len(xxhat) == len(xx)
    for (result, expected_result) in zip(xxhat, xx):
        assert result == expected_result

    xxhat = list(pmap(f, x, n_procs=10, buffer=1))
    assert len(xxhat) == len(xx)
    for (result, expected_result) in zip(xxhat, xx):
        assert result == expected_result
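

# A minimal extra check (sketch): pmap's docstring says n_procs=0
# degenerates to a plain serial map, so the same ordering assertion should
# hold with no worker processes at all.
def test_pmap_serial():
    def f(x):
        return x * x

    x = range(100)
    assert list(pmap(f, x, n_procs=0)) == [e * e for e in x]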


if __name__ == '__main__':
    # FIXME: This should be turned into a real test. For now, run it and
    # make sure that ctrl+c actually ends everything cleanly when pressed
    # early on during a slow iteration over a large number of jobs, i.e.
    # while the queue feeder thread is still filling the job queue.
    # See pmap.py for more explanation.
    import time
    LARGE_N = 10000
    list(pmap(lambda x: time.sleep(1), xrange(LARGE_N), n_procs=1))