Skip to content

Instantly share code, notes, and snippets.

@Guiforge
Last active May 17, 2024 09:42
Show Gist options
  • Save Guiforge/6d82df3e01d343eac960f0f12c0ecdfa to your computer and use it in GitHub Desktop.
Save Guiforge/6d82df3e01d343eac960f0f12c0ecdfa to your computer and use it in GitHub Desktop.
asyncio.gather with max number of concurrent tasks.
import asyncio
from collections.abc import Coroutine
from typing import Any
async def gather_limit(
*tasks: Coroutine[None, None, Any],
return_exceptions: bool = False,
max_con: int = 100,
) -> Any:
"""Like asyncio.gather but with the maximum number of concurrent tasks."""
semaphore = asyncio.Semaphore(max_con)
async def sem_task(task: Coroutine[None, None, Any]) -> Any:
async with semaphore:
return await task
return await asyncio.gather(
*(sem_task(task) for task in tasks),
return_exceptions=return_exceptions,
)
# Example:
async def main() -> None:
start = asyncio.get_running_loop().time()
await asyncio.gather(
asyncio.sleep(1),
asyncio.sleep(1),
asyncio.sleep(1),
asyncio.sleep(1),
)
end = asyncio.get_running_loop().time()
print(f"gather {end-start}")
start = asyncio.get_running_loop().time()
await gather_limit(asyncio.sleep(1), asyncio.sleep(1), asyncio.sleep(1), max_con=2)
end = asyncio.get_running_loop().time()
print(f"gather_limit {end-start}")
if __name__ == "__main__":
asyncio.run(main())
# result
# gather 1.001483416001065
# gather_limit 2.0041054690009332
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment