Skip to content

Instantly share code, notes, and snippets.

@mhihasan
Created July 26, 2023 15:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mhihasan/3fb47c32b1d42b5a754db2cb4b6734fb to your computer and use it in GitHub Desktop.
Save mhihasan/3fb47c32b1d42b5a754db2cb4b6734fb to your computer and use it in GitHub Desktop.
Run tasks as a stream in Python
async def run_tasks_concurrently(fn, input_params, *, max_concurrency):
    """Run ``fn(param)`` for every param, keeping a bounded number in flight.

    Parameters are pulled lazily from *input_params*: a new task is started
    only when a running one finishes, so at most *max_concurrency* tasks
    exist at any moment.

    Args:
        fn: Callable taking one parameter and returning an awaitable that
            ``asyncio.create_task`` accepts (typically an ``async def``).
        input_params: Iterable of parameters; consumed once, lazily.
        max_concurrency: Maximum number of tasks running at the same time.
            Must be at least 1.

    Returns:
        A list with one result per input parameter, in completion order
        (NOT input order).

    Raises:
        ValueError: If *max_concurrency* is smaller than 1.
        Exception: The first exception observed from a finished task is
            re-raised; all tasks still in flight are cancelled first.
    """
    if max_concurrency < 1:
        # islice(..., 0) would start nothing and silently drop every input.
        raise ValueError("max_concurrency must be at least 1")

    params = iter(input_params)
    results = []
    pending = set()
    try:
        # Prime the window with the first ``max_concurrency`` tasks.
        for param in itertools.islice(params, max_concurrency):
            pending.add(asyncio.create_task(fn(param)))

        while pending:
            finished, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            # ``result()`` re-raises a failed task's exception here.
            results.extend(task.result() for task in finished)
            # Refill the window: one new task per task that just finished.
            for param in itertools.islice(params, len(finished)):
                pending.add(asyncio.create_task(fn(param)))
    finally:
        # ``pending`` is non-empty only when leaving early (a task failed or
        # this coroutine was cancelled). Cancel the stragglers and wait for
        # them so no task outlives this call.
        if pending:
            for task in pending:
                task.cancel()
            await asyncio.gather(*pending, return_exceptions=True)
    return results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment