Skip to content

Instantly share code, notes, and snippets.

@blaylockbk
Last active May 20, 2024 16:39
Show Gist options
  • Save blaylockbk/8b469f2c79660ebdd18915202e0802a6 to your computer and use it in GitHub Desktop.
Save blaylockbk/8b469f2c79660ebdd18915202e0802a6 to your computer and use it in GitHub Desktop.
Template for Python multiprocessing and multithreading
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
import numpy as np
def my_multipro(items, func, max_cpus=12):
    """Do an embarrassingly parallel task using multiprocessing.

    Use this for CPU bound tasks.

    Parameters
    ----------
    items : list
        Items to be acted on.
    func : function
        Function to apply to each item. It is sent to worker processes,
        so it must be picklable (e.g. defined at module level).
    max_cpus : int
        Limit the number of CPUs to use.

    Returns
    -------
    list
        The result of ``func`` for each item, in the same order as
        ``items``. An empty list is returned when ``items`` is empty.
    """
    # A pool cannot be created with zero workers, so handle the
    # "nothing to do" case up front instead of raising ValueError.
    # ``len`` (not truthiness) is used so array-like inputs also work.
    if len(items) == 0:
        return []

    # Don't use more CPUs than you have or more than there are items to process
    cpus = min(max_cpus, multiprocessing.cpu_count(), len(items))

    print(f"Using {cpus} cpus to process {len(items)} chunks.")
    # One empty box per item; ``func`` may print a mark as each one finishes.
    print("🔳" * len(items))

    with multiprocessing.Pool(cpus) as p:
        # ``map`` blocks until every item has been processed.
        results = p.map(func, items)
        # Let the workers exit normally before the context manager's
        # exit terminates the pool.
        p.close()
        p.join()

    print("Finished!")
    return results
def my_multithread(items, func, max_threads=12):
    """Do an embarrassingly parallel task using multithreading.

    Use this for IO bound tasks.

    Parameters
    ----------
    items : list
        Items to be acted on.
    func : function
        Function to apply to each item.
    max_threads : int
        Limit the number of threads to use.

    Returns
    -------
    list
        The result of ``func`` for each item, in the same order as
        ``items``. An empty list is returned when ``items`` is empty.
    """
    # A pool cannot be created with zero workers, so handle the
    # "nothing to do" case up front instead of raising ValueError.
    # ``len`` (not truthiness) is used so array-like inputs also work.
    if len(items) == 0:
        return []

    # Don't use more threads than there are items to process
    threads = min(max_threads, len(items))

    print(f"Using {threads} threads to process {len(items)} items.")
    # One empty box per item; ``func`` may print a mark as each one finishes.
    print("🔳" * len(items))

    with ThreadPool(threads) as p:
        # ``map`` blocks until every item has been processed.
        results = p.map(func, items)
        # Let the workers exit normally before the context manager's
        # exit terminates the pool.
        p.close()
        p.join()

    print("Finished!")
    return results
def do_this(i):
    """Return the arithmetic mean of ``i``.

    Example worker function: it is applied to every item given.
    A check mark is printed (without a trailing newline) once the
    mean has been computed, as a simple progress indicator.
    """
    average = np.mean(i)
    print("✅", end="")
    return average
# Example input: each sub-list is one work item handed to ``do_this``.
items = [
    [1, 2, 3, 4],
    [2, 3, 4, 5],
    [3, 4, 5, 6],
    [4, 5, 6, 7],
    [5, 6, 7, 8],
    [6, 7, 8, 9],
]

# Only launch the pools when run as a script. With the "spawn" and
# "forkserver" start methods, worker processes re-import this module;
# without the guard each worker would try to start its own pool.
if __name__ == "__main__":
    a = my_multipro(items, do_this)
    b = my_multithread(items, do_this)
@urwa
Copy link

urwa commented Jun 2, 2022

Is there a way to get the maximum number of threads that a "system" can support for multithreading, like cpu_count does for multiprocessing?

# Thread-backed pool that exposes the same API as multiprocessing.Pool.
from multiprocessing.dummy import Pool as ThreadPool
# Hard-coded number of worker threads (the value the question asks about).
threads = 4

with ThreadPool(threads) as p:
    # Call print on each integer 0..9 using the worker threads.
    results = p.map(print, list(range(10)))
    # Stop accepting work and wait for the worker threads to finish.
    p.close()
    p.join()

So instead of 4 I can let the script determine it. I do not know how the underlying system works here so the question might be naive.

@blaylockbk
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment