Skip to content

Instantly share code, notes, and snippets.

@mc706
Created September 19, 2018 15:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mc706/73f390d0c21e450aa742dab4b250e1f2 to your computer and use it in GitHub Desktop.
Save mc706/73f390d0c21e450aa742dab4b250e1f2 to your computer and use it in GitHub Desktop.
Parallelism.py
from queue import Queue
from threading import Thread
from typing import Iterable, Callable, Optional
def parallel_process(work: Iterable, task_func: Callable, num_threads: int, callback: Optional[Callable] = None) -> None:
    """
    Process ``work`` concurrently using a queue-worker pattern.

    Starts up to ``num_threads`` daemon worker threads. Each worker repeatedly
    takes an item from a shared queue and calls ``task_func(*item)``. The call
    blocks until every queued item has been handled, then stops and joins the
    workers so no threads are left behind.

    Exceptions (subclasses of ``Exception``) raised by ``task_func`` or
    ``callback`` are printed and do not interrupt the remaining work.

    :param work: Iterable of unpackable argument sequences, one per unit of work
    :param task_func: Callable invoked as ``task_func(*item)`` for each item in work
    :param num_threads: int Number of worker threads to distribute the work over
    :param callback: Optional[Callable] called with the result of each successful
        ``task_func`` call, from the worker thread that produced the result
    :raises RuntimeError: if there is work to do but no worker thread is running
        (e.g. ``num_threads`` < 1); previously this case blocked forever
    :return: None
    """
    # Private stop signal. Using a unique object instead of None means a None
    # item inside `work` can never be mistaken for "shut down".
    stop = object()

    def pop_task(queue: Queue, cb: Optional[Callable] = None) -> None:
        """
        Worker loop.
        Gets items from the queue, calls `task_func` on each one and hands the
        result to `cb` when one was supplied.
        Exits when it receives the private stop signal.
        """
        while True:
            task = queue.get()
            if task is stop:
                break
            try:
                result = task_func(*task)
                if cb:
                    cb(result)
            except Exception as ex:
                print(ex)
            finally:
                # Always acknowledge the item; a missing task_done() would make
                # the join() on the queue wait forever.
                queue.task_done()

    q = Queue(maxsize=0)
    workers = []
    # Start Worker Threads
    for _ in range(num_threads):
        try:
            worker = Thread(target=pop_task, args=(q, callback), daemon=True)
            worker.start()
        except Exception as ex:
            print(ex)
        else:
            workers.append(worker)
    try:
        # Put Work Into Queue
        for task in work:
            if not workers:
                # Nothing would ever consume the item, so q.join() could not return.
                raise RuntimeError("parallel_process: no worker threads are running to process work")
            q.put(task)
        # block until work is complete
        q.join()
    finally:
        # Tell every worker to exit, even if feeding the queue failed part-way.
        for _ in workers:
            q.put(stop)
    # On the normal path all work is done, so the workers exit immediately.
    for worker in workers:
        worker.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment