Skip to content

Instantly share code, notes, and snippets.

@benfasoli
Created January 23, 2022 17:18
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save benfasoli/650a57923ab1951e1cb6355f033cbc8b to your computer and use it in GitHub Desktop.
Save benfasoli/650a57923ab1951e1cb6355f033cbc8b to your computer and use it in GitHub Desktop.
Limit concurrency with Python asyncio
import asyncio
from typing import Any, Coroutine, List, Sequence
def _limit_concurrency(
coroutines: Sequence[Coroutine], concurrency: int
) -> List[Coroutine]:
"""Decorate coroutines to limit concurrency.
Enforces a limit on the number of coroutines that can run concurrently in higher
level asyncio-compatible concurrency managers like asyncio.gather(coroutines) and
asyncio.as_completed(coroutines).
"""
semaphore = asyncio.Semaphore(concurrency)
async def with_concurrency_limit(coroutine: Coroutine) -> Coroutine:
async with semaphore:
return await coroutine
return [with_concurrency_limit(coroutine) for coroutine in coroutines]
async def sleep_for_seconds(seconds: float) -> None:
    """Announce a nap, sleep asynchronously for ``seconds``, then announce waking.

    Both announcements are written to standard output with ``print``.
    """
    start_message = f"Going to sleep for {seconds} seconds..."
    print(start_message)

    await asyncio.sleep(seconds)

    wake_message = f"Woke up after {seconds} seconds!"
    print(wake_message)
async def main():
    """Run three demo sleepers (1, 2 and 3 seconds) with at most two at a time."""
    durations = (1, 2, 3)
    sleepers = [sleep_for_seconds(duration) for duration in durations]
    throttled = _limit_concurrency(sleepers, concurrency=2)
    await asyncio.gather(*throttled)
if __name__ == "__main__":
    # Start the event loop only when executed as a script, not on import.
    asyncio.run(main())

# Expected output (the 3-second sleeper starts only once a slot frees up):
# Going to sleep for 1 seconds...
# Going to sleep for 2 seconds...
# Woke up after 1 seconds!
# Going to sleep for 3 seconds...
# Woke up after 2 seconds!
# Woke up after 3 seconds!
@teodoryantcheff
Copy link

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment