@magnetikonline
Last active March 18, 2024 01:27
Python threaded workers using ThreadPoolExecutor().

A pattern for using concurrent.futures.ThreadPoolExecutor() to run a fixed set of "worker" functions concurrently, each processing tasks from a shared queue of assigned "work".

How it works:

  • worker_init() creates:
    • Queues for "work" and "results".
    • A threading.Event() to denote "no more work in queue".
    • A futures.ThreadPoolExecutor().
    • At this point, the pool of worker() functions is ready to accept tasks from the work queue.
  • A set of dummy tasks is added to the work queue. Once all tasks have been added, finished.set() signals "no more tasks".
  • The worker() functions in the pool will work.pop() tasks off the queue, process the task and push a faux result onto the result queue.
  • A call to pool.shutdown() blocks until all workers have finished.
  • Finally, dummy results are displayed from the worker pool.
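
The queue handling described above relies on two properties of collections.deque: adding on the left with appendleft() and removing from the right with pop() yields first-in, first-out order, and pop() on an empty deque raises IndexError instead of blocking. A minimal single-threaded sketch of just that behavior (the name `drained` is illustrative and not part of the original code):

```python
from collections import deque

work = deque()

# producer side: new tasks enter on the left
for task_number in range(5):
    work.appendleft(task_number)

# consumer side: tasks leave from the right, so the oldest task comes out first
drained = []
while True:
    try:
        drained.append(work.pop())
    except IndexError:
        # an empty deque raises IndexError rather than blocking
        break

print(drained)  # [0, 1, 2, 3, 4]
```

Because an empty deque raises rather than waits, each worker has to poll and needs the separate finished event to tell "empty for now" apart from "no more work".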

import threading
import time
import random
from collections import deque
from concurrent import futures

WORKER_COUNT = 4


def worker_init():
    work = deque()
    result = deque()
    finished = threading.Event()
    pool = futures.ThreadPoolExecutor(WORKER_COUNT)

    # add workers into pool
    for _ in range(WORKER_COUNT):
        pool.submit(worker, work, result, finished)

    return (pool, work, result, finished)


def worker(work, result, finished):
    while True:
        task = None
        try:
            task = work.pop()
        except IndexError:
            pass

        if task is None:
            if finished.is_set():
                # no more tasks
                break

            # pause, then re-check work queue
            time.sleep(1)
            continue

        # dummy processing of task and store result
        print(f"processing task: {task}")
        # sleep for 1 or 2 seconds
        time.sleep(random.randint(1, 2))
        result.append(random.randrange(1, 50))

    # worker finished


def main():
    # setup workers in a thread pool and work/result queues
    pool, work, result, finished = worker_init()

    # populate queue with example "work"
    for task_number in range(50):
        work.appendleft(task_number)

    # mark queue as finished/no more "work"
    finished.set()

    # wait for workers to finish up
    pool.shutdown()
    print(result)


if __name__ == "__main__":
    main()
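
One caveat with the code above: worker_init() discards the Future objects returned by pool.submit(), so an exception raised inside worker() is stored on its future and never reported, and pool.shutdown() returns normally. A possible variation, sketched here with a hypothetical `flaky_worker()` and `run_workers()` that are not part of the original, keeps the futures and calls result() on each to surface failures:

```python
from concurrent import futures


def flaky_worker(worker_id):
    # hypothetical worker: worker 2 fails to show how errors are reported
    if worker_id == 2:
        raise ValueError(f"worker {worker_id} failed")
    return worker_id


def run_workers(worker_count):
    completed = []
    errors = []
    # leaving the with-block calls pool.shutdown() and waits for all workers
    with futures.ThreadPoolExecutor(worker_count) as pool:
        submitted = [
            pool.submit(flaky_worker, worker_id)
            for worker_id in range(worker_count)
        ]
        for future in submitted:
            try:
                # result() re-raises any exception raised inside the worker
                completed.append(future.result())
            except ValueError as exc:
                errors.append(str(exc))
    return completed, errors


completed, errors = run_workers(4)
print(completed)  # [0, 1, 3]
print(errors)     # ['worker 2 failed']
```

The same idea would apply to the pattern above by returning the list of futures from worker_init() and checking each one after pool.shutdown().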