Skip to content

Instantly share code, notes, and snippets.

@SkamDart
Last active March 26, 2019 20:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save SkamDart/8d7f102dab5cb8116f34f67536e22309 to your computer and use it in GitHub Desktop.
Save SkamDart/8d7f102dab5cb8116f34f67536e22309 to your computer and use it in GitHub Desktop.
ThreadPoolExecutor message passing example
"""Small example of message passing using concurrent.futures.ThreadPoolExecutor"""
import logging
from concurrent.futures import ThreadPoolExecutor
from queue import (
Empty as ThreadSafeQueueEmpty,
Queue as ThreadSafeQueue
)
from time import sleep
# One shared logger for the whole example: the root logger. basicConfig is
# asked for INFO level so the progress messages logged by the tasks show up.
LOGGER = logging.getLogger()
logging.basicConfig(level=logging.INFO)
def bamboozle(duration):
    """Sleep for ``duration`` seconds and return ``duration``.

    Args:
        duration: Number of seconds to block the calling thread; passed
            straight to ``time.sleep``.

    Returns:
        The ``duration`` argument, unchanged, so the caller can tell which
        task produced the result.
    """
    # Lazy %-style arguments: the message is only formatted if the record
    # is actually emitted.
    LOGGER.info('%s bamboozled', duration)
    sleep(duration)
    return duration
def wrapper(results_queue):
    """Build a done-callback that reports results into ``results_queue``.

    The returned callable closes over ``results_queue`` so the same queue
    can be shared by every future the callback is attached to.

    Args:
        results_queue: Object with a ``put(item)`` method (a ``queue.Queue``
            in this example) that receives each successful result.

    Returns:
        A one-argument callable suitable for ``Future.add_done_callback``.
    """
    def done(func):
        """Handle completion of a ``concurrent.futures.Future``.

        Cancelled futures and futures that raised are only logged; the
        result of a successful future is put on the shared queue.

        Args:
            func: The future that finished (the name is kept for
                compatibility; it is a future, not a function).
        """
        if func.cancelled():
            # Future has no ``arg`` attribute, so log the future itself;
            # its repr includes the state.
            LOGGER.info('%s canceled', func)
            return
        if not func.done():
            # Not finished yet: exception()/result() would block, so do
            # nothing, matching the cancelled/done-only handling.
            return
        error = func.exception()
        if error is not None:
            LOGGER.info('%s error returned %s', func, error)
        else:
            results_queue.put(func.result())
    return done
def main():
    """Run the message-passing demo and check every result arrived.

    Submits ``batch_size`` ``bamboozle`` tasks to a thread pool, attaches a
    done-callback that forwards each result to a shared queue, and collects
    the results from that queue on the main thread.

    Raises:
        AssertionError: If some submitted value never showed up in the
            results (for example because its task failed or was cancelled).
    """
    batch_size = 10
    max_workers = 5
    poll_interval = 0.1  # seconds to wait on the queue before re-checking
    results = []
    results_queue = ThreadSafeQueue()
    callback = wrapper(results_queue)

    LOGGER.info('starting threadpool')
    # The context manager shuts the pool down (waiting for the workers)
    # even if something below raises.
    with ThreadPoolExecutor(max_workers) as pool:
        futures = []
        for i in reversed(range(batch_size)):
            future = pool.submit(bamboozle, i)
            future.add_done_callback(callback)
            futures.append(future)

        while len(results) < batch_size:
            try:
                # A timeout is required for Empty to ever be raised; a bare
                # get() would block forever if a task never reports.
                results.append(results_queue.get(timeout=poll_interval))
            except ThreadSafeQueueEmpty:
                # Nothing new arrived. If every future has finished, no
                # further results can be expected, so stop waiting.
                if all(future.done() for future in futures):
                    break

    # The pool has been shut down at this point; pick up anything that was
    # queued between the last poll and the shutdown.
    while True:
        try:
            results.append(results_queue.get_nowait())
        except ThreadSafeQueueEmpty:
            break

    missing = set(range(batch_size)) - set(results)
    assert not missing, 'no result received for: {}'.format(sorted(missing))
if __name__ == '__main__':
    # Run the demo only when executed as a script, not when imported.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment