#!/usr/bin/env python3
#
# Python 3 concurrent.futures module
#
# some examples for the useful concurrent.futures module in Python 3
#
import hashlib
from concurrent.futures import (
    ProcessPoolExecutor,
    ThreadPoolExecutor,
    as_completed,
    wait,
)

#
# some dummy function to have a CPU-bound workload
#
# number of SHA-512 iterations each example job is asked to perform
hash_rounds = 10000
def do_something(rounds, init_val="init", job_id=None):
    """Run a CPU-bound dummy workload by hashing a value repeatedly.

    The UTF-8 encoding of ``init_val`` is fed through SHA-512 ``rounds``
    times, each round hashing the digest produced by the previous one.

    Args:
        rounds: Number of SHA-512 iterations to perform. With ``0`` the
            encoded input is returned unhashed.
        init_val: Text used as the starting value of the hash chain.
        job_id: Identifier handed back unchanged, so the caller can match
            a result to the task that produced it.

    Returns:
        A ``(job_id, hex_string)`` tuple, where ``hex_string`` is the
        hexadecimal form of the final bytes.
    """
    hashval = bytes(init_val, "utf-8")
    for _ in range(rounds):
        hashval = hashlib.sha512(hashval).digest()
    return job_id, hashval.hex()
#
# using submit() and wait() to enqueue multiprocess tasks
# and receive them all at the same time
#
def main_proc_wait():
    """Run 100 hashing jobs in a process pool and collect them all at once.

    Each job is enqueued with ``submit()``; ``wait()`` then blocks until
    every future has finished, after which one ``job_id - hash`` line is
    printed per job. ``wait()`` returns the finished futures as a set, so
    the output order is not the submission order.
    """
    # The ``with`` block shuts the pool down even when a job raises,
    # so worker processes are never left behind.
    with ProcessPoolExecutor() as pool:
        futures = [
            pool.submit(do_something, hash_rounds, f"string_{i}", i)
            for i in range(100)
        ]

        # wait() uses the default param return_when=ALL_COMPLETED
        # and blocks until all futures are done
        done, still_running = wait(futures)
        assert not still_running

        for f in done:
            job_id, hashval = f.result()
            print(f"{job_id} - {hashval}")
#
# using submit() and as_completed() to enqueue multiprocess tasks
# and receive them one by one as they finish
#
def main_proc_as_completed():
    """Run 100 hashing jobs in a process pool and print each as it finishes.

    Each job is enqueued with ``submit()``; ``as_completed()`` then yields
    the futures one by one in the order they finish, and a
    ``job_id - hash`` line is printed for each.
    """
    # The ``with`` block shuts the pool down even when a job raises,
    # so worker processes are never left behind.
    with ProcessPoolExecutor() as pool:
        futures = [
            pool.submit(do_something, hash_rounds, f"string_{i}", i)
            for i in range(100)
        ]

        for f in as_completed(futures):
            job_id, hashval = f.result()
            print(f"{job_id} - {hashval}")
#
# using map() as an even higher-level interface
#
def main_proc_map():
    """Run 100 hashing jobs in a process pool through ``Executor.map()``.

    ``map()`` takes one iterable per positional parameter of
    ``do_something`` (rounds, initial string, job id) and calls it once
    per zipped triple. One ``job_id - hash`` line is printed per result.
    """
    # The ``with`` block shuts the pool down even when a job raises,
    # so worker processes are never left behind.
    with ProcessPoolExecutor() as pool:
        prepared_string_args = [f"string_{i}" for i in range(100)]

        results = pool.map(
            do_something,
            [hash_rounds] * 100,
            prepared_string_args,
            range(100),
        )

        for job_id, hashval in results:
            print(f"{job_id} - {hashval}")
#
# All combinations work with threads as well, using the ThreadPoolExecutor,
# but due to the GIL threads are only useful for I/O-bound jobs,
# so this CPU-bound example will not be efficient.
#
def main_threads_as_completed():
    """Run 100 hashing jobs in a thread pool and print each as it finishes.

    Same pattern as the process-pool variant: jobs are enqueued with
    ``submit()`` and consumed through ``as_completed()``, printing one
    ``job_id - hash`` line per job. The workload is CPU-bound, so this
    variant only demonstrates the API and is not expected to be efficient.
    """
    # The ``with`` block shuts the pool down even when a job raises.
    with ThreadPoolExecutor() as pool:
        futures = [
            pool.submit(do_something, hash_rounds, f"string_{i}", i)
            for i in range(100)
        ]

        for f in as_completed(futures):
            job_id, hashval = f.result()
            print(f"{job_id} - {hashval}")
# Script entry point: run the as_completed() process-pool example.
# The guard keeps the example from starting when the module is imported
# rather than executed directly.
if __name__ == '__main__':
    main_proc_as_completed()
# (GitHub page footer captured along with the script; not part of the code.)
# Sign up for free to join this conversation on GitHub.
# Already have an account? Sign in to comment