Skip to content

Instantly share code, notes, and snippets.

@blaylockbk
Last active May 20, 2024 16:39
Show Gist options
  • Save blaylockbk/8b469f2c79660ebdd18915202e0802a6 to your computer and use it in GitHub Desktop.
Save blaylockbk/8b469f2c79660ebdd18915202e0802a6 to your computer and use it in GitHub Desktop.
Template for Python multiprocessing and multithreading
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
import numpy as np
def my_multipro(items, func, max_cpus=12):
    """Do an embarrassingly parallel task using multiprocessing.

    Use this for CPU bound tasks.

    Parameters
    ----------
    items : list
        Items to be acted on.
    func : function
        Function to apply to each item.
    max_cpus : int
        Limit the number of CPUs to use

    Returns
    -------
    list
        ``func(item)`` for every item, in the same order as ``items``.
        An empty list is returned when ``items`` is empty.
    """
    n_items = len(items)

    # With no items the pool size below would be 0, and a pool of size
    # zero raises ValueError, so return early.
    if n_items == 0:
        return []

    # Don't use more CPUs than you have or more than there are items to process
    cpus = min(max_cpus, multiprocessing.cpu_count(), n_items)

    print(f"Using {cpus} cpus to process {n_items} chunks.")
    print("🔳" * n_items)

    with multiprocessing.Pool(cpus) as p:
        results = p.map(func, items)
        # Shut the workers down gracefully before the ``with`` block exits.
        p.close()
        p.join()

    print("Finished!")
    return results
def my_multithread(items, func, max_threads=12):
    """Do an embarrassingly parallel task using multithreading.

    Use this for IO bound tasks.

    Parameters
    ----------
    items : list
        Items to be acted on.
    func : function
        Function to apply to each item.
    max_threads : int
        Limit the number of threads to use

    Returns
    -------
    list
        ``func(item)`` for every item, in the same order as ``items``.
        An empty list is returned when ``items`` is empty.
    """
    n_items = len(items)

    # With no items the pool size below would be 0, and a pool of size
    # zero raises ValueError, so return early.
    if n_items == 0:
        return []

    # Don't use more threads than there are items to process
    threads = min(max_threads, n_items)

    print(f"Using {threads} threads to process {n_items} items.")
    print("🔳" * n_items)

    with ThreadPool(threads) as p:
        results = p.map(func, items)
        # Shut the workers down gracefully before the ``with`` block exits.
        p.close()
        p.join()

    print("Finished!")
    return results
def do_this(i):
    """Return the mean of ``i`` and print a check mark as a progress tick.

    This is the example worker applied to every item given.
    """
    average = np.mean(i)
    # No newline, so ticks from successive items line up on one row.
    print("✅", end="")
    return average
# Guard the demo so it only runs when this file is executed as a script.
# With the "spawn" or "forkserver" start methods each worker process
# re-imports the main module; without this guard the import would try to
# start new pools again.
if __name__ == "__main__":
    items = [
        [1, 2, 3, 4],
        [2, 3, 4, 5],
        [3, 4, 5, 6],
        [4, 5, 6, 7],
        [5, 6, 7, 8],
        [6, 7, 8, 9],
    ]

    # Same work done twice: once with processes, once with threads.
    a = my_multipro(items, do_this)
    b = my_multithread(items, do_this)
@blaylockbk
Copy link
Author

blaylockbk commented Jan 30, 2019

To use multiprocessing or multithreading depends on the task to be performed.

Multiprocessing

Use this for CPU bound tasks.

Multithreading

Use this for IO bound tasks. The API is very similar to the multiprocessing API. Read more about it at these two sources:

  1. https://chriskiehl.com/article/parallelism-in-one-line
  2. https://stackoverflow.com/questions/2846653/how-can-i-use-threading-in-python



Alternative method adapted from Mike Wessler

from multiprocessing import cpu_count, get_context, Pool

# Leave one core free, but never request fewer than one worker:
# on a single-core machine cpu_count() - 1 is 0 and Pool(0) raises ValueError.
cores = max(1, cpu_count() - 1)
# NOTE: the 'fork' start method is only available on POSIX systems.
spawntype = 'fork'
# Number of items handed to a worker per task.
chunksize = 1

with get_context(spawntype).Pool(cores) as p:
    results = p.map(my_function, my_iterable, chunksize=chunksize)
    # Shut the workers down gracefully before the ``with`` block exits.
    p.close()
    p.join()

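close() prevents future jobs from being submitted to the pool, and join() waits for all workers in the pool to complete and exit before allowing the script to move forward; once execution leaves the with block, any remaining pool processes are cleaned up. chunksize is the number of items from the iterable that are submitted to a worker process as a single task. chunksize = 1 hands the items out one at a time, but be warned that this increases I/O (inter-process communication) overhead. Note that chunksize alone does not kill and restart worker processes between jobs; to have each worker replaced by a fresh process after a task, pass maxtasksperchild=1 when creating the pool.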

@blaylockbk
Copy link
Author

blaylockbk commented Oct 30, 2020

Multipro Helper

This function helps send a list of jobs to basic multiprocessing, multithreading, or a sequential list comprehension.

import numpy as np
import inspect
import matplotlib.pyplot as plt
from multiprocessing import Pool, cpu_count          # Multiprocessing
from multiprocessing.dummy import Pool as ThreadPool # Multithreading

def multipro_helper(func, inputs, cpus=6, threads=None, verbose=True):
    """
    Multiprocessing and multithreading helper.

    Apply ``func`` to every item of ``inputs`` using a thread pool, a
    process pool, or a sequential list comprehension. ``threads`` is
    checked first, then ``cpus``; if both are None the work is done
    sequentially.

    Parameters
    ----------
    func : function
        A function you want to apply to each item in ``inputs``.
        If your function has many inputs, it's useful to call a helper
        function that unpacks the arguments for each input.
    inputs : list
        A list of input for the function being called.
    cpus : int or None
        Number of CPUs to use. Will not exceed maximum number available
        and will not exceed the length of ``inputs``.
        Only used when ``threads`` is None.
    threads : int or None
        Number of threads to use. Will not exceed 50 and will not exceed
        the length of ``inputs``. Takes precedence over ``cpus``.
        If both ``threads`` and ``cpus`` are None, each task is done
        sequentially as a list comprehension.
    verbose : bool
        If True, print which mode is used and the elapsed time.

    Returns
    -------
    list
        ``func(item)`` for every item of ``inputs``, in input order.

    Raises
    ------
    AssertionError
        If ``func`` is not callable, ``inputs`` is not a list, or
        ``threads``/``cpus`` is given but is not an integer.
    """
    # Imported locally: the timer below needs it and the snippet's
    # import block does not provide it.
    from datetime import datetime

    assert callable(func), f"👻 {func} must be a callable function."
    assert isinstance(inputs, list), "👻 inputs must be a list."

    n_inputs = len(inputs)
    timer = datetime.now()

    if threads is not None:
        # Accept both Python ints and NumPy integers.
        assert isinstance(threads, (int, np.integer)), f"👻 threads must be a int. You gave {type(threads)}"
        # Don't allow more than 50 threads or more threads than items,
        # but keep at least one worker: a pool of size zero raises ValueError.
        threads = max(1, min(int(threads), 50, n_inputs))
        if verbose:
            print(f'🧵 Multithreading {func} with [{threads:,}] threads for [{n_inputs:,}] items.', end=' ')
        with ThreadPool(threads) as p:
            results = p.map(func, inputs)
            p.close()
            p.join()

    elif cpus is not None:
        # Accept both Python ints and NumPy integers (the default is a Python int).
        assert isinstance(cpus, (int, np.integer)), f"👻 cpus must be a int. You gave {type(cpus)}"
        # Don't use more CPUs than exist or more than there are items,
        # but keep at least one worker: a pool of size zero raises ValueError.
        cpus = max(1, min(int(cpus), cpu_count(), n_inputs))
        if verbose:
            print(f'🤹🏻‍♂️ Multiprocessing {func} with [{cpus:,}] CPUs for [{n_inputs:,}] items.', end=' ')
        with Pool(cpus) as p:
            results = p.map(func, inputs)
            p.close()
            p.join()

    else:
        if verbose:
            print(f'📏 Sequentially do {func} for [{n_inputs:,}] items.', end=' ')
        results = [func(i) for i in inputs]

    if verbose:
        print(f"Timer={datetime.now()-timer}")

    return results

And this will make a plot of performance for a number of different pool sizes. It can help you see where you're getting diminishing returns for a larger number of Pools, and help you see if your problem is a CPU bound or IO bound process. Use multiprocessing for CPU-bound process and multithreading for IO-bound process.

def plot_multipro_effeciency(func, *args, pools=range(1,15), **kwargs):
    """
    Display a figure showing the multiprocessing/multithreading
    efficiency for a range of Pool sizes.

    ``func`` is timed once per pool size with ``cpus=<size>``, once per
    pool size with ``threads=<size>``, and once with
    ``cpus=None, threads=None``; the timings are drawn as a bar chart
    on the current matplotlib figure.

    Parameters
    ----------
    func : function
        A function that has keyword arguments for `cpus` and `threads`.
    *args, **kwargs :
        Arguments and keyword arguments for the function. Any `cpus` or
        `threads` entries in ``kwargs`` are discarded, because this
        function supplies its own values for them.
    pools : list of int
        List of number of Pools to start for multiprocessing/multithreading.
        Duplicates and zeros are dropped and the sizes are sorted.

    Returns
    -------
    multipro : list of datetime.timedelta
        Elapsed time of each ``cpus=<size>`` call, in sorted pool order.
    multithread : list of datetime.timedelta
        Elapsed time of each ``threads=<size>`` call, in sorted pool order.
    sequential : datetime.timedelta
        Elapsed time of the ``cpus=None, threads=None`` call.

    Raises
    ------
    AssertionError
        If ``func`` does not accept `cpus` and `threads` arguments.
    """
    # Imported locally: the timers below need it and the snippet's
    # import block does not provide it.
    from datetime import datetime

    plt.rcParams['hatch.linewidth'] = 8

    # np.unique sorts and de-duplicates in one step and keeps the order,
    # unlike a set. The sizes stay NumPy integers, as with np.sort.
    pools = np.unique(pools)
    pools = pools[pools != 0]

    # Look at keyword-only parameters as well as positional ones.
    spec = inspect.getfullargspec(func)
    accepted = set(spec.args) | set(spec.kwonlyargs)
    assert 'cpus' in accepted, f"👺 The function {func.__name__} does not have a `cpus` argument."
    assert 'threads' in accepted, f"👺 The function {func.__name__} does not have a `threads` argument."

    # The pool size is supplied below, so drop any caller-provided values.
    kwargs.pop('cpus', None)
    kwargs.pop('threads', None)

    def _timed(**pool_kwargs):
        """Call ``func`` once with the given pool settings; return the elapsed time."""
        start = datetime.now()
        func(*args, **kwargs, **pool_kwargs)
        return datetime.now() - start

    multipro = [_timed(cpus=i) for i in pools]
    multithread = [_timed(threads=i) for i in pools]
    sequential = _timed(cpus=None, threads=None)

    sizes = pools.tolist()
    multipro_seconds = [t.total_seconds() for t in multipro]
    multithread_seconds = [t.total_seconds() for t in multithread]

    # Plot completion time in seconds for each number in pool
    # =======================================================
    plt.bar(sizes, multipro_seconds,
            label='Multiprocessing', color='.1',)

    plt.bar(sizes, multithread_seconds,
            label='Multithreading', hatch='/', edgecolor='tab:blue',
            alpha=.33, color='tab:blue')
    # Second pass draws a white outline over the hatched bars.
    plt.bar(sizes, multithread_seconds,
            edgecolor='w', color='none')

    plt.axhline(sequential.total_seconds(), ls='--', color='k',
                label='Sequential')

    # Cosmetics
    plt.legend()
    plt.ylabel('Seconds')
    plt.xlabel('Number in Pool')
    plt.title(f"{func.__module__}.{func.__name__}")
    plt.xticks(sizes)

    return multipro, multithread, sequential

For example, reading a file and processing the data in it is a CPU-bound process and should use multiprocessing to speed it up, but you don't get much speedup if you use more than 5 or 6 CPUs.
image

@urwa
Copy link

urwa commented Jun 2, 2022

Is there a way to get max number of threads in multithreading that a "system" can support like cpu_count for multiprocessing.

from multiprocessing.dummy import Pool as ThreadPool
# Size of the thread pool; hard-coded here.
threads = 4

with ThreadPool(threads) as pool:
    # map() blocks until print has been called on every number.
    results = pool.map(print, list(range(10)))
    pool.close()
    pool.join()

So instead of 4 I can let the script determine it. I do not know how the underlying system works here so the question might be naive.

@blaylockbk
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment