Skip to content

Instantly share code, notes, and snippets.

@brookisme
Last active May 8, 2021 04:37
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brookisme/ac8e4de4f5f48f3e08388171c4ebf9ce to your computer and use it in GitHub Desktop.
Save brookisme/ac8e4de4f5f48f3e08388171c4ebf9ce to your computer and use it in GitHub Desktop.
Python Multiprocessing Helpers: Map with Pool, ThreadPool and more
import itertools
from multiprocessing import Process, cpu_count
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
#
# CONFIG
#
MAX_POOL_PROCESSES=cpu_count()-1
MAX_THREADPOOL_PROCESSES=16
#
# METHODS
#
""" MAP METHODS
Args:
* map_function <function>:
a function to map over args list. the function should take a single argument.
if multiple arguments are needed accept them as a single list or tuple
* args_list <list>: the list of arguments to map over
* max_process <int>: number of processes
- for max_with_pool defaults to the number of cpus minus 1
- for max_with_threadpool defaults to 16
- map_sequential ignores this argument as its doesn't actually do
any multiprocesssing
Return:
List of return values from map_function
Notes:
map_sequential does NOT multiprocess. it can be used as a sequential drop-in
replacement for map_with_pool/threadpool. this is useful for:
- development
- debugging
- benchmarking
"""
def map_with_pool(map_function,args_list,max_processes=MAX_POOL_PROCESSES):
pool=Pool(processes=min(len(args_list),max_processes))
return _run_pool(pool,map_function,args_list)
def map_with_threadpool(map_function,args_list,max_processes=MAX_THREADPOOL_PROCESSES):
pool=ThreadPool(processes=min(len(args_list),max_processes))
return _run_pool(pool,map_function,args_list)
def map_sequential(map_function,args_list,print_args=False,noisy=False,**dummy_kwargs):
if noisy:
print('multiprocessing(test):')
out=[]
for i,args in enumerate(args_list):
if noisy:
print('\t{}...'.format(i))
if print_args:
print('\t{}'.format(args))
out.append(map_function(args))
if noisy:
print('-'*25)
return out
""" simple: vanilla multiprocessing
Args:
* function <function>: function. function can take multiple arguments
* args_list <list>: the list of argument lists
* join <bool[True]>: join processes before return
Return:
List of processes
"""
def simple(function,args_list,join=True):
procs=[]
for args in args_list:
proc=Process(
target=function,
args=args)
procs.append(proc)
proc.start()
if join:
for proc in procs:
proc.join()
return procs
#
# INTERNAL METHODS
#
def _stop_pool(pool,success=True):
pool.close()
pool.join()
return success
def _map_async(pool,map_func,objects):
try:
return pool.map_async(map_func,objects)
except KeyboardInterrupt:
print("Caught KeyboardInterrupt, terminating workers")
pool.terminate()
return False
else:
print("Failure")
return _stop_pool(pool,False)
def _run_pool(pool,map_function,args_list):
out=_map_async(pool,map_function,args_list)
_stop_pool(pool)
return out.get()
@brookisme
Copy link
Author

brookisme commented Oct 4, 2018

EXAMPLE CODE:

from time import sleep
import multiprocess as mproc
# map functions
args_list=range(16)

def func(a):
    sleep(1)
    return a + 1000

print('\n'*2,'-'*50,'\n map_with_pool:')
%time p=mproc.map_with_pool(func,args_list)
print('\n'*2,'-'*50,'\n map_with_threadpool:')
%time tp=mproc.map_with_threadpool(func,args_list)
print('\n'*2,'-'*50,'\n map_sequential:')
%time sq=mproc.map_sequential(func,args_list)

# simple function
# - args_list should be a list of iterables
args_list=[(i,) for i in args_list]

def simple_func(a):
    sleep(1)
    return a + 1000

print('\n'*2,'-'*50,'\n simple:')
%time s=mproc.simple(simple_func,args_list)

# all give the same results
print('\n'*2,'-'*50,'\n results:')
print(' map functions give the same results:',p==tp==sq,p[:3],'...')
print(' simple returns a list of processes rather than a list of outputs from completed processes:')
print(s[:3],'...')

OUTPUT:

 -------------------------------------------------- 
 map_with_pool:
CPU times: user 12.9 ms, sys: 19.9 ms, total: 32.8 ms
Wall time: 3.13 s


 -------------------------------------------------- 
 map_with_threadpool:
CPU times: user 6.31 ms, sys: 4.01 ms, total: 10.3 ms
Wall time: 1.04 s


 -------------------------------------------------- 
 map_sequential:
CPU times: user 2.32 ms, sys: 860 µs, total: 3.18 ms
Wall time: 16.1 s


 -------------------------------------------------- 
 simple:
CPU times: user 18.3 ms, sys: 36.3 ms, total: 54.6 ms
Wall time: 1.06 s


 -------------------------------------------------- 
 results:
 map functions give the same results: True [1000, 1001, 1002] ...
 simple returns a list of processes rather than a list of outputs from completed processes:
[<Process(Process-129, stopped)>, <Process(Process-130, stopped)>, <Process(Process-131, stopped)>] ...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment