Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
ThreadPoolExecutor message passing example
"""Small example of message passing using concurrent.futures.ThreadPoolExecutor"""
import logging
from concurrent.futures import ThreadPoolExecutor
from queue import (
Empty as ThreadSafeQueueEmpty,
Queue as ThreadSafeQueue
)
from time import sleep
# Shared module-level logger; calling getLogger() with no name yields the root logger.
LOGGER = logging.getLogger()
# Configure root logging so INFO-level messages from this example are emitted.
logging.basicConfig(level=logging.INFO)
def bamboozle(duration):
    """Log the requested duration, block for that long, then hand it back.

    Args:
        duration: Number of seconds to sleep; also the value returned.

    Returns:
        The same ``duration`` that was passed in.
    """
    message = '{} bamboozled'.format(duration)
    LOGGER.info(message)
    sleep(duration)
    return duration
def wrapper(results_queue):
    """Build a done-callback bound to a single thread-safe results queue.

    Args:
        results_queue: Object with a ``put`` method (e.g. ``queue.Queue``)
            that receives the result of every future that finishes
            successfully.

    Returns:
        A one-argument callable suitable for ``Future.add_done_callback``.
    """
    def done(func):
        """Handle the completion callback for a ``concurrent.futures.Future``.

        Cancelled futures and futures that raised are logged and produce
        nothing on the queue; only successful results are enqueued.

        Args:
            func: The future that triggered the callback.
        """
        # A Future has no ``arg`` attribute unless the submitter attached
        # one, so fall back to the future itself; otherwise the logging
        # branches below would raise AttributeError inside the callback.
        label = getattr(func, 'arg', func)
        if func.cancelled():
            LOGGER.info('%s canceled', label)
            return
        if not func.done():
            # Not finished yet: nothing to report (never the case when
            # invoked by the executor as a done-callback).
            return
        error = func.exception()
        if error is not None:
            LOGGER.info('%s error returned %s', label, error)
            return
        results_queue.put(func.result())
    return done
def main():
    """Run the demo: fan out sleeps to a thread pool and collect the results.

    Submits ``bamboozle(i)`` for every ``i`` in the batch, attaches a shared
    done-callback that pushes successful results onto a thread-safe queue,
    and drains that queue on the calling thread until one result per task
    has arrived.

    Raises:
        AssertionError: If any expected value is missing from the results.
    """
    batch_size = 10
    max_workers = 5
    poll_interval = 0.5  # seconds to wait on the queue before re-checking
    results = []
    LOGGER.info('starting threadpool')
    results_queue = ThreadSafeQueue()
    callback = wrapper(results_queue)
    # The context manager guarantees shutdown (which blocks until workers
    # finish) even if the collection loop below is interrupted.
    with ThreadPoolExecutor(max_workers) as pool:
        for i in reversed(range(batch_size)):
            future = pool.submit(bamboozle, i)
            future.add_done_callback(callback)
        # NOTE: the callback only enqueues successful results, so this loop
        # keeps polling for as long as any task has failed or been cancelled.
        while len(results) < batch_size:
            try:
                # A timeout is required for Empty to ever be raised; a bare
                # get() blocks indefinitely and never reaches the handler.
                results.append(results_queue.get(timeout=poll_interval))
            except ThreadSafeQueueEmpty:
                continue
    missing = set(range(batch_size)) - set(results)
    assert not missing, 'missing results: {}'.format(sorted(missing))
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.