Skip to content

Instantly share code, notes, and snippets.

@Marinell0
Last active June 17, 2024 13:43
Show Gist options
  • Save Marinell0/2f549eefa15a157863aac52ec91e18c6 to your computer and use it in GitHub Desktop.
Save Marinell0/2f549eefa15a157863aac52ec91e18c6 to your computer and use it in GitHub Desktop.
Parallelize any Python function over an iterable with a thread pool (best for I/O-bound work)
from concurrent.futures import ThreadPoolExecutor, Future
from collections.abc import Callable, Iterable, Sized
from typing import TypeVar
import os
from tqdm import tqdm
T = TypeVar('T')
R = TypeVar('R')
def parallelize(
    func: Callable[[T], R],
    iterable: Iterable[T],
    max_workers: int | None = None,
    *args,
) -> list[R]:
    """
    Run ``func`` on every item of ``iterable`` using a thread pool and collect the results.

    Each task is ``func(item, *args)``. Work runs on threads, so it overlaps well
    for I/O-bound work or functions that release the GIL. Pure-Python CPU-bound
    functions will not speed up under CPython's GIL.

    Args:
        func: Function called once per item. It receives the item followed by ``args``.
        iterable: Items to process. If it is ``Sized``, the progress bar shows a total.
        max_workers: Number of worker threads. ``None`` (the default) uses the
            number of CPUs available to this process, falling back to the total
            CPU count on platforms without ``os.sched_getaffinity``.
        args: Additional positional arguments passed to every call of ``func``.

    Return:
        List of results in the same order as the items of ``iterable``.

    Raises:
        Any exception raised by ``func``. It is re-raised for the first failing
        item (in input order), after all submitted tasks have finished.
    """
    if max_workers is None:
        try:
            # Counts only the CPUs this process may run on. Not available on
            # every platform (e.g. Windows, macOS).
            max_workers = len(os.sched_getaffinity(0))
        except AttributeError:
            # os.cpu_count() may return None; never pass 0/None workers.
            max_workers = os.cpu_count() or 1

    # tqdm(total=None) is an open-ended bar, used for unsized iterables.
    total = len(iterable) if isinstance(iterable, Sized) else None
    futures: list[Future[R]] = []
    # Context managers exit in reverse order. The executor shuts down first,
    # waiting for every task and its done-callback. Only then is the bar
    # closed, so it shows the final count and releases its resources.
    with tqdm(total=total) as pbar, ThreadPoolExecutor(max_workers=max_workers) as executor:
        for item in iterable:
            future = executor.submit(func, item, *args)
            future.add_done_callback(lambda _: pbar.update(1))
            futures.append(future)
    # Every future is already done here; result() preserves input order and
    # re-raises a task's exception.
    return [future.result() for future in futures]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment