Skip to content

Instantly share code, notes, and snippets.

@mherkazandjian
Created February 14, 2017 02:03
Show Gist options
  • Save mherkazandjian/2a27a620330cf78912aa6e163642609d to your computer and use it in GitHub Desktop.
pmap example
import numpy
from multiprocessing import Process, cpu_count, Queue
def pmap(func, items, func_args=(), func_kwargs=None, n_workers=None):
    """Map ``func`` over chunks of ``items`` in parallel worker processes.

    ``items`` is split into ``n_workers`` contiguous chunks; each worker
    process calls ``func(chunk, *func_args, **func_kwargs)`` and must return
    a sequence with one entry per item of its chunk.  The per-item results
    are re-assembled in the original order of ``items``.

    :param callable func: a callable that takes an array chunk as its first
        positional argument and returns a sequence of the same length.
    :param numpy.ndarray items: the entries distributed over the workers
        (list-like input is converted with ``numpy.asarray``).
    :param tuple func_args: extra positional arguments passed to ``func``
        after the chunk.
    :param dict func_kwargs: keyword arguments passed to ``func``
        (default: none).
    :param int n_workers: number of worker processes
        (default: ``cpu_count()``, evaluated at call time).
    :return: a numpy object array holding the value produced by ``func``
        for each entry of ``items``, in the same order as ``items``.

    .. python:

        def my_func(items):
            retval = []
            for item in items:
                # some expensive operation
                total = 0
                for i in range(10000000):
                    total += item
                retval.append(total)
            return retval

        retval = pmap(my_func, numpy.linspace(0, 1, 1000))
    """
    # avoid the shared-mutable-default pitfall and defer cpu_count() to
    # call time instead of module import time
    if func_kwargs is None:
        func_kwargs = {}
    if n_workers is None:
        n_workers = cpu_count()
    # accept any array-like; the code below relies on items.size
    items = numpy.asarray(items)

    def worker_wrapper(worker_func, worker_index, queue,
                       worker_args, worker_kwargs):
        """Run the worker function and push ``[index, result]`` onto the
        queue so the parent can restore the original ordering."""
        worker_retval = worker_func(*worker_args, **worker_kwargs)
        queue.put([worker_index, worker_retval])

    # split the work into chunks and remember which global indices each
    # chunk covers, so results can be written back in order
    items_for_workers = numpy.array_split(items, n_workers)
    items_inds_workers = numpy.array_split(numpy.arange(items.size), n_workers)

    # queue carrying [worker_index, result] pairs and the output array
    queue = Queue()
    processes = []
    retval = numpy.zeros(items.size, dtype='object')
    for worker_index in range(n_workers):
        process = Process(
            target=worker_wrapper,
            args=(func,
                  worker_index,
                  queue,
                  (items_for_workers[worker_index],) + func_args,
                  func_kwargs))
        processes.append(process)
    for process in processes:
        process.start()

    # Drain the queue BEFORE joining: a child process feeding a Queue may
    # not exit until its queued data is consumed, so joining first could
    # deadlock.  Exactly n_workers messages are expected.
    workers_data_retrieved = 0
    while workers_data_retrieved != n_workers:
        worker_index, worker_data = queue.get()
        for entry_index, entry in enumerate(worker_data):
            item_ind = items_inds_workers[worker_index][entry_index]
            retval[item_ind] = entry
        workers_data_retrieved += 1
    for process in processes:
        process.join()
    return retval
if __name__ == '__main__':
    import time

    def my_func(items):
        """Demo workload: for each item, sum it 10,000,000 times
        (a deliberately expensive stand-in computation)."""
        retval = []
        for item in items:
            # some expensive operation; range (not Python-2 xrange) since
            # the print() call below implies Python 3
            total = 0
            for i in range(10000000):
                total += item
            retval.append(total)
        return retval

    # time the parallel map over 8 items with 8 worker processes
    t0 = time.time()
    ret_data = pmap(my_func, numpy.arange(8), n_workers=8)
    print(time.time() - t0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment