Sandbox to try and measure different flavors of Python concurrency
# Python concurrency sandbox, inspired by Ned Batchelder's (@nedbat) articles on Python concurrency and raising the progress bar (links below)
# Executes workfn() for every tuple produced by argsfn(), in concurrent fashion. By default it spawns 1M tasks using different mechanics, draws a progress bar and times execution.
# The current example tries to calculate the mean of the Beast: we collect fractions of 666, raise them to powers and average the results (a tiny worked example follows the IDEAS list below).
# If you want a less satanic example, just implement the workfn and argsfn functions to your liking.
# I haven't had much time for this yet; I was merely curious about Ned Batchelder's code from his articles "Doing a pile of work" and "Do a pile of work better".
# It turned out Ned's solution carries some overhead, but my code also leaves much to be desired. In other words, there is no danger of a sudden shortage of bugs or ideas.
# Original articles by Ned Batchelder:
# https://nedbatchelder.com/blog/202008/do_a_pile_of_work.html
# https://nedbatchelder.com/blog/202008/do_a_pile_of_work_better.html
# BUGS:
# - executor.map passes each item of the argument iterable as a single positional argument, so our tuples arrive packed instead of unpacked. Implemented the workfn_for_map wrapper as a workaround (an unpacked-iterables alternative is sketched right after it).
# IDEAS:
# - Detailed timing
# - Refactor every do_work* function as a generator. That way we can also measure time until the very first values are returned.
# - A map of do_work functions and results instead of named variables
# - Different tasks (workfn/argsfn)
# - Green threads / asyncio (sketched after do_work_cf_map below)
# - Actor frameworks
# - Uvloop
# - Run in a container for more meaningful numbers
# - Get smart about tqdm and its iterable parameter
# - Try a generator for futures or maybe some chunking. Futures creation takes considerable time in do_work_fc_as_comp
# - Single thread timing
# - Try not to collect the returned values. How much difference would it make?
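
# Tiny worked example of the task above (a sketch using a hypothetical small
# task count instead of the 1M configured below): for 4 tasks the argument
# tuples are (0, 0), (0, 1), (0, 2), (0, 3); pow() yields 1, 0, 0, 0, so the
# integer mean is (1 + 0 + 0 + 0) // 4 == 0.
_demo_results = [pow(i // 666, i % 666) for i in range(4)]
assert sum(_demo_results) // len(_demo_results) == 0
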
# Settings:
LOG_LEVEL = "INFO"
NUM_OF_TASKS = 1_000_000
NUM_OF_THREADS = 30

# Handy log config to display thread/process info
import sys
from loguru import logger as log

log.configure(handlers=[
    dict(
        sink=sys.stderr,
        colorize=True,
        diagnose=True,
        backtrace=True,
        format=
        "@ {time:HH:mm:ss.SSS} <b><ly>[{process.id}]{process.name}/[{thread.id}]{thread.name}</ly></b> <i><le>{name}:{function}</le></i> <lm>{file.path}:{line}</lm> \n{level.icon} <lvl>{message}</lvl> \n{exception}",
        level=LOG_LEVEL)
])

# Progressbar
from tqdm import tqdm
# from tqdm.asyncio import tqdm_asyncio

# Timers and pretty debug prints
from devtools import debug, pprint as pp

import concurrent.futures as cf

def argsfn():
    """
    Generator for arguments.
    """
    for i in range(NUM_OF_TASKS):
        yield (i // 666, i % 666)


class ArgumentsIterator():
    """
    Arguments iterable. It is _supposed_ to be similarly slow, to keep level with argsfn().
    """

    def __init__(self, num_of_tasks=NUM_OF_TASKS):
        self.total = num_of_tasks
        self.range = iter(range(num_of_tasks))

    def __iter__(self):
        return self

    def __next__(self):
        i = next(self.range)
        return (i // 666, i % 666)

    def __len__(self):
        return self.total
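
# Quick sanity check (a sketch with a hypothetical tiny task count): the
# iterator should yield the same tuples as argsfn() would. Unlike a bare
# generator it also has a __len__, so tqdm can show a total and ETA without
# being given an explicit total=... argument.
assert list(ArgumentsIterator(5)) == [(i // 666, i % 666) for i in range(5)]
assert len(ArgumentsIterator(5)) == 5
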
def workfn(*args):
    """
    Function that performs the main task.
    """
    x, y = args
    res = pow(x, y)
    log.debug("Workfn function done. Args were: {}. Result is: {}", args, res)
    return res


def workfn_for_map(arg, *args):
    """
    Wrapper around workfn for the cf.map variant. With .map, each argument tuple
    comes packed as a single positional argument, so it has to be unpacked here.
    I don't have time to research/fix it properly, hence the wrapper.
    """
    return workfn(*arg)
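
# An alternative to the wrapper (a sketch, not wired into __main__ below):
# unpack the argument tuples into separate iterables and let executor.map pass
# them back to workfn as two positional arguments. Note that zip(*argsi)
# materializes all arguments up front, trading memory for skipping the wrapper.
def do_work_cf_map_unpacked(threads, argsi, workfn):
    with cf.ThreadPoolExecutor(max_workers=threads) as executor:
        xs, ys = zip(*argsi)
        result = list(executor.map(workfn, xs, ys))
    return sum(result) // len(result)
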
def wait_first(futures):
    """
    Wait for the first future to complete.

    Returns:
        (done, not_done): two sets of futures.
    """
    return cf.wait(futures, return_when=cf.FIRST_COMPLETED)

def do_work(threads, argsfn, workfn):
    """
    Do a pile of work, maybe in threads, with a progress bar.

    Two callables are provided: `workfn` is the unit of work to be done,
    many times. Its arguments are provided by calling `argsfn`, which
    must produce a sequence of tuples. `argsfn` will be called a few
    times, and must produce the same sequence each time.

    Args:
        threads: the number of threads to use.
        argsfn: a callable that produces tuples, the arguments to `workfn`.
        workfn: a callable that does work.

    Returns:
        Integer average of the values returned by workfn.
    """
    result = []
    total = sum(1 for _ in argsfn())
    with tqdm(total=total, smoothing=0.02, desc="Do work.") as progressbar:
        if threads:
            limit = 2 * threads
            not_done = set()

            def finish_some():
                nonlocal not_done
                done, not_done = wait_first(not_done)
                for done_future in done:
                    exc = done_future.exception()
                    if exc is not None:
                        log.opt(exception=exc).error("Failed future")
                        continue
                    res = done_future.result()
                    log.debug("Task completed. Result is: {}", res)
                    result.append(res)
                progressbar.update(len(done))

            with cf.ThreadPoolExecutor(max_workers=threads) as executor:
                for args in argsfn():
                    while len(not_done) >= limit:
                        finish_some()
                    not_done.add(executor.submit(workfn, *args))
                while not_done:
                    finish_some()
        else:
            # Collect results here too, otherwise the final average would divide by zero
            for args in argsfn():
                result.append(workfn(*args))
                progressbar.update(1)
    return sum(result) // len(result)

def do_work_fc_as_comp(threads, argsi, workfn):
    """
    Same as do_work, but takes an iterable as the second argument and uses the
    concurrent.futures.as_completed iterator.
    """
    result = []
    with cf.ThreadPoolExecutor(max_workers=threads) as executor:
        log.debug("Creating futures")
        futures = [executor.submit(workfn, *args) for args in argsi]
        log.debug("Processing futures")
        for f in tqdm(cf.as_completed(futures),
                      desc="As completed. Futures done"):
            try:
                #!TODO: Refactor for the love of Ishtar!
                # .result() re-raises the task's exception, if any,
                # so a separate .exception() check is not needed here.
                res = f.result()
            except Exception:
                log.exception("Error processing task.")
            else:
                log.debug("Task completed. Result is: {}", res)
                result.append(res)
    log.debug("Done")
    return sum(result) // len(result)

def do_work_cf_map(threads, argsi, workfn):
    """
    Same as do_work, but takes an iterable as the second argument and uses the
    Executor.map function from concurrent.futures.
    """
    with cf.ThreadPoolExecutor(max_workers=threads) as executor:
        result = list(executor.map(workfn, argsi))
    return sum(result) // len(result)
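
# Sketch of the "green threads" idea from the list above, not wired into
# __main__ below. It assumes Python 3.9+ for asyncio.to_thread. The workload is
# CPU-bound, so this still leans on a thread pool under the hood and mainly
# shows the shape of an asyncio variant; to try uvloop, install it and call
# uvloop.install() before asyncio.run().
import asyncio


async def do_work_asyncio(argsi, workfn, limit=NUM_OF_THREADS):
    sem = asyncio.Semaphore(limit)

    async def one(args):
        async with sem:
            return await asyncio.to_thread(workfn, *args)

    results = await asyncio.gather(*(one(args) for args in argsi))
    return sum(results) // len(results)

# Possible usage: asyncio.run(do_work_asyncio(ArgumentsIterator(), workfn))
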
if __name__ == "__main__":
    # with debug.timer("Single thread version"):
    #     res_single_thread = do_work(0, argsfn, workfn)

    with debug.timer("Ned's version"):
        res = do_work(NUM_OF_THREADS, argsfn, workfn)

    # We have to call list() on the generator to get a len/limit.
    # The resulting arguments list is "wrapped" in a progress bar representing creation of the futures;
    # the function implements another progress bar for execution.
    argsl = tqdm(list(argsfn()), desc="cf.as_completed. Args generator")
    with debug.timer("cf.as_completed"):
        res_cf_as_compl = do_work_fc_as_comp(NUM_OF_THREADS, argsl, workfn)

    # ArgumentsIterator is the container version of the argsfn generator, hence the __len__().
    # The function is completely decoupled from the progress bar: it takes an iterable of args wrapped in a pbar.
    # Due to the eager execution of map() this makes for quite a reasonable pbar.
    argsi = tqdm(ArgumentsIterator(), desc="cf.map")
    with debug.timer("cf.map"):
        res_cf_map = do_work_cf_map(NUM_OF_THREADS, argsi, workfn_for_map)

    if res == res_cf_as_compl and res == res_cf_map:
        # We'd better warn the public
        log.warning(
            "We've calculated the mean of the Beast! Behold the unholy number!")
        pp(res)
    else:
        log.error(
            "Unable to figure out the mean of the Beast, the algorithms returned different results"
        )
        log.error("Res: {}\nRes_cf_as_compl: {}\nRes_cf_map: {}", res,
                  res_cf_as_compl, res_cf_map)