Instantly share code, notes, and snippets.

Embed
What would you like to do?
Python 3 concurrent.futures module
#!/usr/bin/env python3
#
# some examples for the useful concurrent.futures module in Python 3
#
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed
#
# some dummy function to have a CPU-bound workload
#
import hashlib
hash_rounds = 10000
def do_something(rounds, init_val = "init", job_id = None):
hashval = bytes(init_val, 'utf-8')
for i in range(rounds):
hashval = hashlib.sha512(hashval).digest()
return job_id, hashval.hex()
#
# using submit() and wait() to enqueue multiprocess tasks
# and receive them all at the same time
#
def main_proc_wait():
pool = ProcessPoolExecutor()
futures = []
for i in range(100):
f = pool.submit(do_something, hash_rounds, f"string_{i}", i)
futures.append(f)
done, still_running = wait(futures)
# wait() uses the default param return_when=ALL_COMPLETED
# and blocks until all futures are done
assert not still_running
for f in done:
result = f.result()
job_id, hashval = result
print(f"{job_id} - {hashval}")
pool.shutdown()
#
# using submit() and as_completed() to enqueue multiprocess tasks
# and receive them one by one as they finish
#
def main_proc_as_completed():
pool = ProcessPoolExecutor()
futures = []
for i in range(100):
f = pool.submit(do_something, hash_rounds, f"string_{i}", i)
futures.append(f)
for f in as_completed(futures):
result = f.result()
job_id, hashval = result
print(f"{job_id} - {hashval}")
pool.shutdown()
#
# using map() as an even higher-level interface
#
def main_proc_map():
pool = ProcessPoolExecutor()
prepared_string_args = []
for i in range(100):
prepared_string_args.append(f"string_{i}")
results = pool.map(do_something,
[hash_rounds]*100, prepared_string_args, range(100))
for job_id, hashval in results:
print(f"{job_id} - {hashval}")
pool.shutdown()
#
# all combinations work with threads as well, using the ThreadPoolExecutor.
# but due to the GIL they are only useful for I/O bound jobs
# and this example will not be efficient.
#
def main_threads_as_completed():
pool = ThreadPoolExecutor()
futures = []
for i in range(100):
f = pool.submit(do_something, hash_rounds, f"string_{i}", i)
futures.append(f)
for f in as_completed(futures):
result = f.result()
job_id, hashval = result
print(f"{job_id} - {hashval}")
pool.shutdown()
if __name__ == '__main__':
main_proc_as_completed()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment