Skip to content

Instantly share code, notes, and snippets.

@blaylockbk
Last active May 20, 2024 16:39
Show Gist options
  • Save blaylockbk/8b469f2c79660ebdd18915202e0802a6 to your computer and use it in GitHub Desktop.
Save blaylockbk/8b469f2c79660ebdd18915202e0802a6 to your computer and use it in GitHub Desktop.
Template for Python multiprocessing and multithreading
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
import numpy as np
def my_multipro(items, func, max_cpus=12):
    """Do an embarrassingly parallel task using multiprocessing.

    Use this for CPU-bound tasks.

    Parameters
    ----------
    items : list
        Items to be acted on.
    func : function
        Function to apply to each item. It is sent to worker processes by
        ``multiprocessing.Pool.map``, so it must be picklable (e.g. defined
        at module top level).
    max_cpus : int
        Limit the number of CPUs to use.

    Returns
    -------
    list
        ``func(item)`` for each item, in the same order as ``items``.
    """
    # Don't use more CPUs than you have or more than there are items to
    # process, but always start at least one worker: Pool(0) raises
    # ValueError (e.g. when ``items`` is empty).
    cpus = max(1, min(max_cpus, multiprocessing.cpu_count(), len(items)))
    print(f"Using {cpus} cpus to process {len(items)} chunks.")
    print("🔳" * len(items))
    with multiprocessing.Pool(cpus) as p:
        results = p.map(func, items)
        p.close()
        p.join()
    print("Finished!")
    return results
def my_multithread(items, func, max_threads=12):
    """Do an embarrassingly parallel task using multithreading.

    Use this for IO-bound tasks.

    Parameters
    ----------
    items : list
        Items to be acted on.
    func : function
        Function to apply to each item.
    max_threads : int
        Limit the number of threads to use.

    Returns
    -------
    list
        ``func(item)`` for each item, in the same order as ``items``.
    """
    # Don't use more threads than there are items to process, but always
    # start at least one: ThreadPool(0) raises ValueError (e.g. when
    # ``items`` is empty or ``max_threads`` is 0).
    threads = max(1, min(max_threads, len(items)))
    print(f"Using {threads} threads to process {len(items)} items.")
    print("🔳" * len(items))
    with ThreadPool(threads) as p:
        results = p.map(func, items)
        p.close()
        p.join()
    print("Finished!")
    return results
def do_this(i):
    """Worker applied to each item: average it and emit one progress tick."""
    average = np.mean(i)
    # One ✅ per finished item; no newline so ticks line up in a row.
    print("✅", end="")
    return average
# Example usage. The __main__ guard is required for multiprocessing when
# workers are started with the "spawn" method: each worker re-imports this
# module, and without the guard it would try to start another pool.
if __name__ == "__main__":
    items = [
        [1, 2, 3, 4],
        [2, 3, 4, 5],
        [3, 4, 5, 6],
        [4, 5, 6, 7],
        [5, 6, 7, 8],
        [6, 7, 8, 9],
    ]
    a = my_multipro(items, do_this)
    b = my_multithread(items, do_this)
@blaylockbk
Copy link
Author

blaylockbk commented Oct 30, 2020

Multipro Helper

This function sends a list of jobs to basic multiprocessing, multithreading, or a sequential list comprehension.

import inspect
from datetime import datetime
from multiprocessing import Pool, cpu_count          # Multiprocessing
from multiprocessing.dummy import Pool as ThreadPool # Multithreading

import matplotlib.pyplot as plt
import numpy as np

def multipro_helper(func, inputs, cpus=6, threads=None, verbose=True):
    """
    Multiprocessing and multithreading helper.

    Apply ``func`` to every item in ``inputs`` with a thread pool, a
    process pool, or a plain sequential loop. Results are returned in
    input order.

    Parameters
    ----------
    func : function
        A function you want to apply to each item in ``inputs``.
        If your function has many inputs, it's useful to call a helper
        function that unpacks the arguments for each input.
    inputs : list
        A list of input for the function being called.
    cpus : int or None
        Number of CPUs to use. Will not exceed the maximum number available
        and will not exceed the length of ``inputs`` (but is at least 1).
        Only used when ``threads`` is None.
    threads : int or None
        Number of threads to use. Will not exceed 50 and will not exceed
        the length of ``inputs`` (but is at least 1). Takes precedence
        over ``cpus``.
        If both ``threads`` and ``cpus`` are None, each task is done
        sequentially as a list comprehension.
    verbose : bool
        If True, print which method is used and the elapsed time.

    Returns
    -------
    list
        ``func(item)`` for each item in ``inputs``, in order.
    """
    assert callable(func), f"👻 {func} must be a callable function."
    assert isinstance(inputs, list), "👻 inputs must be a list."

    def _is_int(x):
        # Accept plain Python ints (e.g. the default ``cpus=6``) as well as
        # NumPy integers; reject bools, which are also ``int`` subclasses.
        return isinstance(x, (int, np.integer)) and not isinstance(x, bool)

    timer = datetime.now()

    if threads is not None:
        assert _is_int(threads), f"👻 threads must be a int. You gave {type(threads)}"
        # Don't allow more than 50 threads or more threads than inputs, but
        # keep at least one worker: ThreadPool(0) raises ValueError.
        threads = max(1, min(threads, 50, len(inputs)))
        if verbose: print(f'🧵 Multithreading {func} with [{threads:,}] threads for [{len(inputs):,}] items.', end=' ')
        with ThreadPool(threads) as p:
            results = p.map(func, inputs)
            p.close()
            p.join()

    elif cpus is not None:
        assert _is_int(cpus), f"👻 cpus must be a int. You gave {type(cpus)}"
        # Don't use more CPUs than exist or than there are inputs, but keep
        # at least one worker: Pool(0) raises ValueError.
        cpus = max(1, min(cpus, cpu_count(), len(inputs)))
        if verbose: print(f'🤹🏻‍♂️ Multiprocessing {func} with [{cpus:,}] CPUs for [{len(inputs):,}] items.', end=' ')
        with Pool(cpus) as p:
            results = p.map(func, inputs)
            p.close()
            p.join()
    else:
        if verbose: print(f'📏 Sequentially do {func} for [{len(inputs):,}] items.', end=' ')
        results = [func(i) for i in inputs]

    if verbose: print(f"Timer={datetime.now()-timer}")

    return results

And this function plots performance for a range of pool sizes. It can help you see where a larger pool stops paying off (diminishing returns), and whether your problem is CPU-bound or IO-bound. Use multiprocessing for CPU-bound processes and multithreading for IO-bound processes.

def plot_multipro_effeciency(func, *args, pools=range(1,15), **kwargs):
    """
    Display a figure showing the multiprocessing/multithreading
    efficiency for a range of Pool sizes.

    Parameters
    ----------
    func : function
        A function that has keyword arguments for `cpus` and `threads`.
    *args, **kwargs :
        Arguments and keyword arguments for the function. Any `cpus` or
        `threads` entry in ``kwargs`` is ignored; this function sets them.
    pools : list of int
        List of number of Pools to start for multiprocessing/multithreading.
        Duplicates are removed, sizes below 1 are dropped, and the rest are
        run in ascending order.

    Returns
    -------
    multipro : list of datetime.timedelta
        Run time of ``func(..., cpus=n)`` for each pool size ``n``.
    multithread : list of datetime.timedelta
        Run time of ``func(..., threads=n)`` for each pool size ``n``.
    sequential : datetime.timedelta
        Run time of ``func(..., cpus=None, threads=None)``.
    """
    plt.rcParams['hatch.linewidth'] = 8

    # Unique, ascending pool sizes. A pool needs at least one worker, so
    # drop 0 and negatives. np.unique keeps the sort that set() discarded.
    # Iterating the array yields NumPy integers, which is what is passed
    # on to ``func``.
    pools = np.unique(pools)
    pools = pools[pools > 0]
    x = list(pools)

    spec = inspect.getfullargspec(func)
    params = spec.args + spec.kwonlyargs
    assert 'cpus' in params, f"👺 The function {func.__name__} does not have a `cpus` argument."
    assert 'threads' in params, f"👺 The function {func.__name__} does not have a `threads` argument."

    # Pool sizes are controlled here, so discard caller-supplied values.
    kwargs.pop('cpus', None)
    kwargs.pop('threads', None)

    def elapsed(**pool_kwargs):
        """Return how long one call of ``func`` takes, as a timedelta."""
        start = datetime.now()
        func(*args, **kwargs, **pool_kwargs)
        return datetime.now() - start

    multipro = [elapsed(cpus=i) for i in pools]
    multithread = [elapsed(threads=i) for i in pools]
    sequential = elapsed(cpus=None, threads=None)

    # Plot completion time in seconds for each number in pool
    # =======================================================
    plt.bar(x, [t.total_seconds() for t in multipro],
            label='Multiprocessing', color='.1')

    thread_seconds = [t.total_seconds() for t in multithread]
    plt.bar(x, thread_seconds,
            label='Multithreading', hatch='/', edgecolor='tab:blue',
            alpha=.33, color='tab:blue')
    # Unfilled, white-edged bars drawn over the same heights; this appears
    # intended to outline the translucent hatched bars.
    plt.bar(x, thread_seconds, edgecolor='w', color='none')

    plt.axhline(sequential.total_seconds(), ls='--', color='k',
                label='Sequential')

    # Cosmetics
    plt.legend()
    plt.ylabel('Seconds')
    plt.xlabel('Number in Pool')
    plt.title(f"{func.__module__}.{func.__name__}")
    plt.xticks(x)

    return multipro, multithread, sequential

For example, reading a file and processing the data in it is a CPU-bound process, so multiprocessing should be used to speed it up. The plot below shows that you don't get much more speedup from using more than 5 or 6 CPUs.
image

@urwa
Copy link

urwa commented Jun 2, 2022

Is there a way to get the maximum number of threads a "system" can support for multithreading, like cpu_count does for multiprocessing?

from multiprocessing.dummy import Pool as ThreadPool

threads = 4
values = list(range(10))

# multiprocessing.dummy.Pool exposes the Pool API but is backed by threads.
with ThreadPool(processes=threads) as p:
    # map blocks until every item is processed; results keep input order.
    results = p.map(print, values)
    # Accept no further tasks, then wait for the worker threads to exit.
    p.close()
    p.join()

That way, instead of hard-coding 4, the script could determine it. I don't know how the underlying system works here, so the question might be naive.

@blaylockbk
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment