Skip to content

Instantly share code, notes, and snippets.

@clbarnes
Created August 9, 2023 11:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clbarnes/a5e17e20e321d16bb00d75315a1b9ab7 to your computer and use it in GitHub Desktop.
Save clbarnes/a5e17e20e321d16bb00d75315a1b9ab7 to your computer and use it in GitHub Desktop.
Semaphore-based asyncio rate limiters
import asyncio as aio
from collections import deque
from typing import Awaitable, Iterable, TypeVar
T = TypeVar("T")
class BaseLimit:
async def limit(self, awa: Awaitable[T]) -> T:
return await aio.ensure_future(awa)
class ConcurrencyLimit(BaseLimit):
def __init__(self, count: int) -> None:
self.semaphore = aio.Semaphore(count)
super().__init__()
async def limit(self, awa: Awaitable[T]) -> T:
await self.semaphore.acquire()
fut = aio.ensure_future(awa)
def release_callback(_fut):
self.semaphore.release()
fut.add_done_callback(release_callback)
return await fut
class RateLimit(BaseLimit):
def __init__(self, count: int = 1, seconds: float = 1) -> None:
self.semaphore = aio.Semaphore(int(count))
self.seconds = seconds
self._wait_tasks = set()
def _schedule_semaphore_release(self):
def release_callback(task):
self.semaphore.release()
self._wait_tasks.discard(task)
wait = aio.create_task(aio.sleep(self.seconds))
wait.add_done_callback(release_callback)
self._wait_tasks.add(wait)
async def limit(self, awa: Awaitable[T]) -> T:
await self.semaphore.acquire()
task = aio.ensure_future(awa)
self._schedule_semaphore_release()
return await task
def sliding_window(awas: Iterable[Awaitable[T]], concurrency: int, ordered=True) -> Iterable[Awaitable[T]]:
if not ordered:
yield from _sliding_window_unordered(awas, concurrency)
return
q = deque(maxlen=concurrency)
awa_it = iter(awas)
it_empty = False
for _, item in zip(range(concurrency), awa_it):
t = aio.ensure_future(item)
q.append(t)
if len(q) < concurrency:
it_empty = True
while q:
yield q.popleft()
if it_empty:
continue
try:
item = next(awa_it)
except StopIteration:
it_empty = True
else:
q.append(aio.ensure_future(item))
def _sliding_window_unordered(awas: Iterable[Awaitable[T]], concurrency: int) -> Iterable[Awaitable[T]]:
done = aio.Queue()
awa_it = iter(awas)
it_empty = False
todo = set()
def _on_completion(f):
todo.remove(f)
done.put_nowait(f)
async def _wait_for_one():
f = await done.get()
return f.result()
count = 0
for _, i in zip(range(concurrency), awa_it):
t = aio.ensure_future(i)
t.add_done_callback(_on_completion)
todo.add(t)
count += 1
if count < concurrency:
it_empty = True
while not it_empty:
yield _wait_for_one()
count -= 1
try:
i = next(awa_it)
except StopIteration:
it_empty = True
else:
t = aio.ensure_future(i)
t.add_done_callback(_on_completion)
todo.add(t)
count += 1
while count > 0:
yield _wait_for_one()
count -= 1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment