Sandbox to try and measure different flavors of Python concurrency
# Python concurrency sandbox, inspired by Ned Batchelder's (@nedbat) articles on Python concurrency and progress bars (links below)
# Executes workfn() for every tuple generated by argsfn(), concurrently. By default it spawns 1M tasks using different mechanisms, draws a progress bar and times the execution
# The current example tries to calculate the mean of the Beast: we collect quotients and remainders of 666, raise one to the power of the other and average the results
# If you want a less satanic example, just implement the workfn and argsfn functions to your liking (see the sketch below the article links)
# I haven't had much time for this yet; I was merely curious about Ned Batchelder's code from his articles "Do a pile of work" and "Do a pile of work better"
# It turned out that Ned's solution carries some overhead, but my code also leaves much to be desired. In other words, there is no danger of a sudden shortage of bugs or ideas
# Original articles by Ned Batchelder
# https://nedbatchelder.com/blog/202008/do_a_pile_of_work.html
# https://nedbatchelder.com/blog/202008/do_a_pile_of_work_better.html
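#
# A minimal sketch of a less satanic workfn/argsfn pair (hypothetical task,
# not part of the benchmark): sum the decimal digits of each task index.
#
#     def argsfn():
#         for i in range(NUM_OF_TASKS):
#             yield (i,)
#
#     def workfn(i):
#         return sum(int(d) for d in str(i))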
# BUGS:
# - cf.map wraps arguments in an additional tuple, because executor.map() passes one positional argument per input iterable. Implemented the workfn_for_map wrapper as a workaround (illustrated just below)
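#   A minimal illustration (hypothetical values; cf and workfn as defined further down):
#
#       with cf.ThreadPoolExecutor() as ex:
#           list(ex.map(pow, [2, 3], [10, 3]))  # separate iterables -> [1024, 27]
#           list(ex.map(workfn, [(2, 10)]))     # workfn gets the whole tuple as one arg
#
#   Passing one iterable per positional argument would avoid the wrapper entirely.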
# IDEAS:
# - Detailed timing
# - Refactor every do_work* function as a generator. That way we can also measure the time until the very first values are returned (see the sketch after this list)
# - A map of do_work functions and results instead of named variables
# - Different tasks (workfn/argsfn)
# - Green threads
# - Actor frameworks
# - Uvloop
# - Run in a container for more meaningful numbers
# - Get smart about tqdm and its iterable parameter
# - Try a generator for the futures, or maybe some chunking. Future creation takes considerable time in do_work_fc_as_comp
# - Single-thread timing
# - Try not to collect the returned values. How much difference would it make?
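#   A sketch of the generator refactor idea (assumed interface, untested):
#
#       def do_work_gen(threads, argsi, workfn):
#           with cf.ThreadPoolExecutor(max_workers=threads) as executor:
#               futures = [executor.submit(workfn, *args) for args in argsi]
#               for f in cf.as_completed(futures):
#                   yield f.result()  # caller can time the very first yield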
# Settings:
LOG_LEVEL = "INFO"
NUM_OF_TASKS = 1_000_000
NUM_OF_THREADS = 30
# Handy log conf to display thread/process info
import sys
from loguru import logger as log
log.configure(handlers=[
    dict(
        sink=sys.stderr,
        colorize=True,
        diagnose=True,
        backtrace=True,
        format=
        "@ {time:HH:mm:ss.SSS} <b><ly>[{process.id}]{process.name}/[{thread.id}]{thread.name}</ly></b> <i><le>{name}:{function}</le></i> <lm>{file.path}:{line}</lm> \n{level.icon} <lvl>{message}</lvl> \n{exception}",
        level=LOG_LEVEL)
])
# Progressbar
from tqdm import tqdm
# from tqdm.asyncio import tqdm_asyncio
# Timers and pretty debug prints
from devtools import debug, pprint as pp
import concurrent.futures as cf

def argsfn():
    """
    Generator for arguments.
    """
    for i in range(NUM_OF_TASKS):
        yield (i // 666, i % 666)

class ArgumentsIterator():
    """
    Arguments iterable. It is _supposed_ to be deliberately slow, to stay on
    a par with the argsfn() generator.
    """

    def __init__(self, num_of_tasks=NUM_OF_TASKS):
        self.total = num_of_tasks
        self.range = iter(range(num_of_tasks))

    def __iter__(self):
        return self

    def __next__(self):
        i = next(self.range)
        return (i // 666, i % 666)

    def __len__(self):
        return self.total
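
# Illustrative use (not part of the benchmark); values follow divmod(i, 666):
#
#     it = ArgumentsIterator(3)
#     len(it)   # -> 3, which is what tqdm uses for its total
#     list(it)  # -> [(0, 0), (0, 1), (0, 2)]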

def workfn(*args):
    """
    Function that performs the main task.
    """
    x, y = args
    res = pow(x, y)
    log.debug("Workfn function done. Args were: {}. Result is: {}", args, res)
    return res

def workfn_for_map(arg, *args):
    """
    Wrapper around workfn for the cf.map variant. executor.map() passes one
    positional argument per input iterable, so each (x, y) tuple arrives
    packed as a single argument; this wrapper unpacks it.
    """
    return workfn(*arg)

def wait_first(futures):
    """
    Wait for the first future to complete.

    Returns:
        (done, not_done): two sets of futures.
    """
    return cf.wait(futures, return_when=cf.FIRST_COMPLETED)

def do_work(threads, argsfn, workfn):
    """
    Do a pile of work, maybe in threads, with a progress bar.

    Two callables are provided: `workfn` is the unit of work to be done,
    many times. Its arguments are provided by calling `argsfn`, which
    must produce a sequence of tuples. `argsfn` will be called a few
    times, and must produce the same sequence each time.

    Args:
        threads: the number of threads to use.
        argsfn: a callable that produces tuples, the arguments to `workfn`.
        workfn: a callable that does work.

    Returns:
        The average of the integers returned by workfn.
    """
    result = []
    total = sum(1 for _ in argsfn())
    with tqdm(total=total, smoothing=0.02, desc="Do work.") as progressbar:
        if threads:
            limit = 2 * threads
            not_done = set()

            def finish_some():
                nonlocal not_done
                done, not_done = wait_first(not_done)
                for done_future in done:
                    # Check .exception() before .result(): a failed future
                    # re-raises its exception from .result()
                    exc = done_future.exception()
                    if exc is not None:
                        # loguru takes no exc_info kwarg; attach the
                        # exception via opt() instead
                        log.opt(exception=exc).error("Failed future:")
                        continue
                    res = done_future.result()
                    log.debug("Task completed. Result is : {} ", res)
                    result.append(res)
                progressbar.update(len(done))

            with cf.ThreadPoolExecutor(max_workers=threads) as executor:
                for args in argsfn():
                    while len(not_done) >= limit:
                        finish_some()
                    not_done.add(executor.submit(workfn, *args))
                while not_done:
                    finish_some()
        else:
            for args in argsfn():
                # Collect results here too, or the final average would
                # divide by zero in the single-thread case
                result.append(workfn(*args))
                progressbar.update(1)
    return sum(result) // len(result)
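
# An illustrative call with toy inputs (not part of the benchmark):
#
#     do_work(2, lambda: [(2, 3), (3, 2)], pow)  # -> (8 + 9) // 2 = 8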

def do_work_fc_as_comp(threads, argsi, workfn):
    """
    Same as do_work, but takes an iterable as the second argument and
    consumes the futures via the concurrent.futures.as_completed iterator.
    """
    result = []
    with cf.ThreadPoolExecutor(max_workers=threads) as executor:
        log.debug("Creating futures")
        futures = [executor.submit(workfn, *args) for args in argsi]
        log.debug("Processing futures")
        for f in tqdm(cf.as_completed(futures),
                      desc="As completed. Futures done"):
            try:
                # .result() re-raises the task's exception, so no separate
                # .exception() check is needed
                res = f.result()
            except Exception:
                log.exception("Error processing task.")
            else:
                log.debug("Task completed. Result is : {}", res)
                result.append(res)
    log.debug("Done")
    return sum(result) // len(result)

def do_work_cf_map(threads, argsi, workfn):
    """
    Same as do_work, but takes an iterable as the second argument and uses
    Executor.map from concurrent.futures.
    """
    with cf.ThreadPoolExecutor(max_workers=threads) as executor:
        result = list(executor.map(workfn, argsi))
    return sum(result) // len(result)

if __name__ == "__main__":
    # with debug.timer("Single thread version"):
    #     res_single_thread = do_work(0, argsfn, workfn)

    with debug.timer("Ned's version"):
        res = do_work(NUM_OF_THREADS, argsfn, workfn)

    # We have to call list() on the generator to get a len/limit.
    # The arguments iterable is "wrapped" in a progress bar representing
    # creation of the futures; the function runs another progress bar for
    # the execution itself.
    argsl = tqdm(list(argsfn()), desc="cf.as_completed. Args generator")
    with debug.timer("cf.as_completed"):
        res_cf_as_compl = do_work_fc_as_comp(NUM_OF_THREADS, argsl, workfn)

    # ArgumentsIterator is the container version of the argsfn generator, hence __len__().
    # The function is completely decoupled from the progress bar: it takes an
    # iterable of args already wrapped in a pbar. Thanks to the eager
    # execution of map(), this makes for quite a reasonable pbar.
    argsi = tqdm(ArgumentsIterator(), desc="cf.map")
    with debug.timer("cf.map"):
        res_cf_map = do_work_cf_map(NUM_OF_THREADS, argsi, workfn_for_map)

    if res == res_cf_as_compl and res == res_cf_map:
        # We'd better warn the public
        log.warning(
            "We've calculated the mean of the Beast! Behold the unholy number!")
        pp(res)
    else:
        log.error(
            "Unable to figure out the mean of the Beast; the algos returned different results"
        )
        log.error("Res: {}\nRes_cf_as_compl: {}\nRes_cf_map: {}", res,
                  res_cf_as_compl, res_cf_map)