Skip to content

Instantly share code, notes, and snippets.

@pssolanki111
Last active July 26, 2022 16:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pssolanki111/d7ef5d8cee93ca74e80ef8d6f39b9268 to your computer and use it in GitHub Desktop.
Save pssolanki111/d7ef5d8cee93ca74e80ef8d6f39b9268 to your computer and use it in GitHub Desktop.
A Rate limiter for async applications. Implements the Leaky Bucket Algorithm
import asyncio
import contextlib
from collections import OrderedDict
import time
class AsyncLeakyBucket(contextlib.AbstractAsyncContextManager):
def __init__(self, max_rate: float, time_period: float = 60, loop=None):
self._loop, self._max_level = loop, max_rate
self._rate_per_sec, self._level = max_rate / time_period, 0.0
self._last_check, self._waiters = 0.0, OrderedDict()
def _leak(self):
if self._level:
elapsed = time.time() - self._last_check
decrement = elapsed * self._rate_per_sec
self._level = max(self._level - decrement, 0)
self._last_check = time.time()
def has_capacity(self, amount: float = 1) -> bool:
self._leak()
requested = self._level + amount
if requested < self._max_level:
for fut in self._waiters.values():
if not fut.done():
fut.set_result(True)
break
return self._level + amount <= self._max_level
async def acquire(self, amount: float = 1):
if amount > self._max_level:
raise ValueError("Can't acquire more than the bucket capacity")
loop = self._loop or asyncio.get_event_loop()
task = asyncio.current_task(loop)
assert task is not None
while not self.has_capacity(amount):
fut = loop.create_future()
self._waiters[task] = fut
try:
await asyncio.wait_for(asyncio.shield(fut), 1 / self._rate_per_sec * amount, loop=loop)
except asyncio.TimeoutError:
pass
fut.cancel()
self._waiters.pop(task, None)
self._level += amount
async def __aenter__(self):
await self.acquire()
return None
async def __aexit__(self, exc_type, exc, tb) -> None:
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment