Last active
March 26, 2019 20:42
-
-
Save SkamDart/8d7f102dab5cb8116f34f67536e22309 to your computer and use it in GitHub Desktop.
ThreadPoolExecutor message passing example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Small example of message passing using concurrent.futures.ThreadPoolExecutor""" | |
import logging | |
from concurrent.futures import ThreadPoolExecutor | |
from queue import ( | |
Empty as ThreadSafeQueueEmpty, | |
Queue as ThreadSafeQueue | |
) | |
from time import sleep | |
logging.basicConfig(level=logging.INFO) | |
LOGGER = logging.getLogger() | |
def bamboozle(duration): | |
"""Sleeps for a given duration.""" | |
LOGGER.info('{} bamboozled'.format(duration)) | |
sleep(duration) | |
return duration | |
def wrapper(results_queue): | |
"""Wraps the callback so we can inject a single threadsafe queue to each thread.""" | |
def done(func): | |
"""Handles callback for a concurrent.futures.Future""" | |
if func.cancelled(): | |
LOGGER.info('{} canceled'.format(func.arg)) | |
elif func.done(): | |
error = func.exception() | |
if error: | |
LOGGER.info('{} error returned {}'.format(func.arg, error)) | |
else: | |
results_queue.put(func.result()) | |
return done | |
def main(): | |
batch_size = 10 | |
max_workers = 5 | |
results = [] | |
LOGGER.info('starting threadpool') | |
pool = ThreadPoolExecutor(max_workers) | |
results_queue = ThreadSafeQueue() | |
callback = wrapper(results_queue) | |
for i in reversed(range(batch_size)): | |
future = pool.submit(bamboozle, i) | |
future.add_done_callback(callback) | |
while len(results) < batch_size: | |
try: | |
results.append(results_queue.get()) | |
except ThreadSafeQueueEmpty: | |
pass | |
pool.shutdown() # blocks by default | |
result_set = set(results) | |
for i in range(batch_size): | |
assert i in result_set | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment