Last active
September 6, 2024 15:51
-
-
Save svoorakk/f0df972b454ef7538dec509334a67fcb to your computer and use it in GitHub Desktop.
Python helper utility for concurrent execution using the asyncio module
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import time | |
import logging | |
logger = logging.getLogger(__name__) | |
def exec_async_multi_input(func, inputs, concurrent_thread_limit=5, sleep_time_msecs=200):
    """Run a single function concurrently over many argument sets.

    Args:
        func: Callable to invoke once per entry of ``inputs``.
        inputs: 2-d list; each inner sequence is unpacked as the positional
            arguments of one call to ``func``.
        concurrent_thread_limit: Batch size forwarded to ``exec_async``.
        sleep_time_msecs: Pause between batches, in milliseconds, forwarded
            to ``exec_async``.

    Returns:
        Whatever ``exec_async`` returns for the paired lists.
    """
    # exec_async expects one callable per argument set, so repeat the same
    # callable once for every entry of ``inputs``.
    repeated_func = [func] * len(inputs)
    batch_run = exec_async(repeated_func, inputs, concurrent_thread_limit, sleep_time_msecs)
    # asyncio.run starts a fresh event loop and drives the coroutine to completion.
    return asyncio.run(batch_run)
def exec_async_multi_function(funcs, input, concurrent_thread_limit=5, sleep_time_msecs=100):
    """Run several functions concurrently, all with the same arguments.

    Args:
        funcs: List of callables to invoke.
        input: Sequence of positional arguments; the same sequence is
            unpacked into every callable in ``funcs``.
        concurrent_thread_limit: Batch size forwarded to ``exec_async``.
        sleep_time_msecs: Pause between batches, in milliseconds, forwarded
            to ``exec_async``.

    Returns:
        Whatever ``exec_async`` returns for the paired lists.
    """
    # exec_async expects one argument set per callable, so repeat the shared
    # argument set once for every entry of ``funcs``.
    shared_args = [input] * len(funcs)
    batch_run = exec_async(funcs, shared_args, concurrent_thread_limit, sleep_time_msecs)
    # asyncio.run starts a fresh event loop and drives the coroutine to completion.
    return asyncio.run(batch_run)
def exec_async_multi_function_multi_input(funcs, inputs, concurrent_thread_limit=5, sleep_time_msecs=100):
    """Run several functions concurrently, each with its own arguments.

    Args:
        funcs: List of callables to invoke.
        inputs: List of argument sequences; ``inputs[i]`` is unpacked as the
            positional arguments of ``funcs[i]``.
        concurrent_thread_limit: Batch size forwarded to ``exec_async``.
        sleep_time_msecs: Pause between batches, in milliseconds, forwarded
            to ``exec_async``.

    Returns:
        Whatever ``exec_async`` returns for the two lists.
    """
    # Both lists are handed to exec_async unchanged; this wrapper only
    # supplies the event loop.
    pending = exec_async(funcs, inputs, concurrent_thread_limit, sleep_time_msecs)
    return asyncio.run(pending)
async def exec_async(funcs, inputs, concurrent_thread_limit=5, sleep_time_msecs=200):
    """Run callables in worker threads, batch by batch, and collect results.

    ``funcs[i]`` is called with ``inputs[i]`` unpacked as positional
    arguments. Calls are grouped into batches of ``concurrent_thread_limit``;
    each batch is awaited in full before the next one is started, with a
    pause of ``sleep_time_msecs`` between consecutive batches.

    Args:
        funcs: List of callables.
        inputs: List of argument sequences, one per callable.
        concurrent_thread_limit: Number of calls submitted per batch. A batch
            is flushed only when its size equals this value exactly, so a
            value below 1 results in a single batch holding every call.
        sleep_time_msecs: Pause between batches, in milliseconds.

    Returns:
        A list with one entry per call, in input order. Because the batch is
        gathered with ``return_exceptions=True``, a call that raised
        contributes its exception object instead of a return value.

    Raises:
        ValueError: If ``funcs`` and ``inputs`` differ in length.
    """
    if len(funcs) != len(inputs):
        # ValueError is still caught by callers using ``except Exception``.
        raise ValueError("Number of coroutines should be same as number of inputs")

    count = len(funcs)
    all_results = []
    tasks = []
    for idx, (func, args) in enumerate(zip(funcs, inputs)):
        tasks.append(asyncio.create_task(asyncio.to_thread(func, *args)))
        is_last = idx == count - 1
        # Flush when the batch is full or the end of the input list is reached.
        if len(tasks) == concurrent_thread_limit or is_last:
            all_results.extend(await asyncio.gather(*tasks, return_exceptions=True))
            tasks = []
            # If the end of the list is not reached yet, pause before the next batch.
            if not is_last:
                logger.info(
                    "Sleeping for %s milli secs between thread batches",
                    sleep_time_msecs,
                )
                # Yield to the event loop instead of blocking it with
                # time.sleep, so other coroutines keep running meanwhile.
                await asyncio.sleep(sleep_time_msecs / 1000.0)
    return all_results
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment