Skip to content

Instantly share code, notes, and snippets.

@svoorakk
Last active September 6, 2024 15:51
Show Gist options
  • Save svoorakk/f0df972b454ef7538dec509334a67fcb to your computer and use it in GitHub Desktop.
Save svoorakk/f0df972b454ef7538dec509334a67fcb to your computer and use it in GitHub Desktop.
Python helper utility for concurrent execution using asyncio module
import asyncio
import time
import logging
logger = logging.getLogger(__name__)
'''
Function to concurrently execute a function with different inputs provided as a 2-d list
'''
def exec_async_multi_input(func, inputs, concurrent_thread_limit=5, sleep_time_msecs=200):
# create a list of functions of same size as inputs list
funcs = [func for _ in inputs]
results = asyncio.run(exec_async(funcs, inputs, concurrent_thread_limit, sleep_time_msecs))
return results
'''
Function to concurrently execute multiple functions with same inputs
'''
def exec_async_multi_function(funcs, input, concurrent_thread_limit=5, sleep_time_msecs=100):
# create a list of inputs of same size as functions list
inputs = [input for _ in funcs]
results = asyncio.run(exec_async(funcs, inputs, concurrent_thread_limit, sleep_time_msecs))
return results
'''
Function to concurrently execute multiple functions with each having different inputs
'''
def exec_async_multi_function_multi_input(funcs, inputs, concurrent_thread_limit=5, sleep_time_msecs=100):
results = asyncio.run(exec_async(funcs, inputs, concurrent_thread_limit, sleep_time_msecs))
return results
'''
Function for concurrent execution
'''
async def exec_async(funcs, inputs, concurrent_thread_limit=5, sleep_time_msecs=200):
if not (len(funcs) == len(inputs)):
raise Exception("Number of coroutines should be same as number of inputs")
count = len(funcs)
tasks = []
all_results = []
for idx, func in enumerate(funcs):
input = inputs[idx]
task = asyncio.create_task(asyncio.to_thread(func, *input))
tasks.append(task)
# if end of the input list is reached or concurrent thread limit is reached
# execute tasks and get result
if len(tasks) == concurrent_thread_limit or count == idx + 1:
result = await asyncio.gather(*tasks, return_exceptions=True)
all_results = all_results + result
tasks = []
# If end of list is still not reached, then sleep before next batch
if idx < count - 1:
logger.info(f"Sleeping for {sleep_time_msecs} milli secs between thread batches")
time.sleep(sleep_time_msecs / 1000.0)
return all_results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment