Skip to content

Instantly share code, notes, and snippets.

@M-Anwar
Created November 8, 2020 18:18
Show Gist options
  • Save M-Anwar/502ef6f629b7abfbdc4cb1e328bb122c to your computer and use it in GitHub Desktop.
Save M-Anwar/502ef6f629b7abfbdc4cb1e328bb122c to your computer and use it in GitHub Desktop.
A quick way to distribute a python task with differing parameters across a thread pool.
import pandas as pd
import concurrent.futures
from tqdm import tqdm
def threadPoolCall(func, task_args, max_workers=8, raise_exception=True, log_errors=True, show_progress=True, description=None):
    """Run ``func`` once per entry of ``task_args`` on a thread pool.

    Each element of ``task_args`` describes one task. A tuple (including
    tuple subclasses such as namedtuples) is unpacked into positional
    arguments; any other value is passed as the single argument.

    Results are collected in *completion* order, not input order. When
    ``raise_exception`` is false, tasks that raised contribute nothing to
    the returned list.

    Args:
        func: The callable to execute for every task.
        task_args: Iterable of argument tuples (or single values), one per
            task. Generators are accepted; the iterable is consumed once.
        max_workers: Number of worker threads in the pool. Default 8.
        raise_exception: Re-raise the first task exception encountered
            while collecting results. Default True.
        log_errors: Print a line describing each failed task; works in
            tandem with ``raise_exception``. Default True.
        show_progress: Show a tqdm progress bar. Default True.
        description: Progress bar label. Defaults to the function name.

    Returns:
        list: Return values of the successfully completed tasks.

    Raises:
        Exception: Whatever a task raised, if ``raise_exception`` is true.
    """
    # Materialise once: the progress bar needs len(), and a generator
    # would otherwise fail there before any task is submitted.
    tasks = list(task_args)
    # functools.partial objects and other callables may lack __name__.
    func_name = getattr(func, "__name__", repr(func))

    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        pbar = None
        try:
            if show_progress:
                pbar = tqdm(total=len(tasks))
                pbar.set_description(description if description else func_name)

            # Map each future back to its input so failures can be reported.
            futures = {
                executor.submit(
                    func,
                    *(args if isinstance(args, tuple) else (args,))
                ): args
                for args in tasks
            }

            for future in concurrent.futures.as_completed(futures):
                if pbar is not None:
                    pbar.update(1)
                try:
                    results.append(future.result())
                except Exception as exc:
                    if log_errors:
                        print("[ERROR] Input to {} -> {} : {}".format(func_name, futures[future], exc))
                    if raise_exception:
                        # Bare raise keeps the original traceback intact.
                        raise
        finally:
            # Close the bar on every exit path (success, task error,
            # KeyboardInterrupt, ...), not only the anticipated ones.
            if pbar is not None:
                pbar.close()
    return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment