Skip to content

Instantly share code, notes, and snippets.

View DannyMor's full-sized avatar
🎯
Focusing

Danny Mor DannyMor

🎯
Focusing
View GitHub Profile
@DannyMor
DannyMor / rate_limiter.py
Last active April 20, 2020 15:34
rate limiter 1
class RateLimiter:
    """Skeleton of a token-queue rate limiter.

    A bounded ``asyncio.Queue`` is created for the tokens, and a background
    task running :meth:`consume_tokens` is started in ``__init__``. Both
    coroutine methods are placeholders in this version.
    """

    def __init__(self, rate_limit: int) -> None:
        """Create the token queue and start the consumer task.

        Must be called while an event loop is running, because
        ``asyncio.create_task`` schedules the consumer right away.

        Args:
            rate_limit: Capacity of the token queue; must be positive.

        Raises:
            ValueError: If ``rate_limit`` is zero or negative.
        """
        # asyncio.Queue treats maxsize <= 0 as "unbounded", which would
        # silently switch the limit off, so reject such values up front.
        if rate_limit <= 0:
            raise ValueError(f"rate_limit must be positive, got {rate_limit!r}")
        self.rate_limit = rate_limit
        self.tokens_queue = asyncio.Queue(rate_limit)
        self.tokens_consumer_task = asyncio.create_task(self.consume_tokens())

    async def add_token(self) -> None:
        """Placeholder; does nothing in this version."""
        pass

    async def consume_tokens(self) -> None:
        """Placeholder for the coroutine run by the background task.

        NOTE(review): the original snippet stops at this method's signature;
        the empty body mirrors the ``add_token`` stub -- confirm.
        """
        pass
async def add_token(self) -> None:
    """Put one token on ``self.tokens_queue``.

    Awaits ``Queue.put``, so if the queue is bounded and currently full the
    caller is suspended until a slot becomes free.
    """
    # The queued value itself is just the constant 1.
    await self.tokens_queue.put(1)
async def consume_tokens(self):
# Consumer loop for the token queue.
# NOTE(review): this snippet stops right after the current timestamp is read;
# the step that actually removes tokens from the queue is not visible here.
# Interval per token. It is passed to asyncio.sleep() below, so it is in
# seconds, which makes rate_limit effectively "tokens per second".
consumption_rate = 1 / self.rate_limit
# Nothing has been consumed yet; 0 serves as the initial timestamp.
last_consumption_time = 0
while True:
# Queue is empty: wait one interval, then check the queue again.
if self.tokens_queue.empty():
await asyncio.sleep(consumption_rate)
continue
# Monotonic clock reading taken for this pass of the loop.
current_consumption_time = time.monotonic()
@staticmethod
def get_tokens_amount_to_consume(
    consumption_rate: float,
    current_consumption_time: float,
    last_consumption_time: float,
    total_tokens: int,
) -> int:
    """Return how many tokens may be consumed right now.

    One token is earned for every full ``consumption_rate`` interval that
    elapsed between ``last_consumption_time`` and
    ``current_consumption_time``. The result is capped at ``total_tokens``
    and is never negative.

    Args:
        consumption_rate: Length of the interval that earns one token, in
            the same unit as the two timestamps.
        current_consumption_time: Timestamp of this consumption attempt.
        last_consumption_time: Timestamp of the previous consumption.
        total_tokens: Upper bound on the returned amount.

    Returns:
        The number of tokens to consume, between 0 and ``total_tokens``.

    Raises:
        ZeroDivisionError: If ``consumption_rate`` is 0.
    """
    elapsed = current_consumption_time - last_consumption_time
    earned = math.floor(elapsed / consumption_rate)
    # Clamp at zero: a negative elapsed time (or a negative total) must not
    # be reported as a negative number of tokens to consume.
    return max(0, min(total_tokens, earned))
class RateLimiter:
    """Skeleton of a rate limiter that also caps concurrency.

    Holds a bounded ``asyncio.Queue`` for tokens, a background task running
    ``consume_tokens``, and an ``asyncio.Semaphore`` sized by
    ``concurrency_limit``.

    NOTE(review): ``consume_tokens`` is not defined in this snippet; it must
    be provided by the rest of the class for ``__init__`` to succeed.
    """

    def __init__(self, rate_limit: int, concurrency_limit: int) -> None:
        """Create the queue and semaphore and start the consumer task.

        Must be called while an event loop is running, because
        ``asyncio.create_task`` schedules the consumer right away.

        Args:
            rate_limit: Capacity of the token queue; must be positive.
            concurrency_limit: Initial value of the semaphore; must be
                positive.

        Raises:
            ValueError: If either limit is zero or negative.
        """
        # asyncio.Queue treats maxsize <= 0 as "unbounded", which would
        # silently switch the limit off, so reject such values up front.
        if rate_limit <= 0:
            raise ValueError(f"rate_limit must be positive, got {rate_limit!r}")
        # A semaphore that starts at 0 can never be acquired.
        if concurrency_limit <= 0:
            raise ValueError(
                f"concurrency_limit must be positive, got {concurrency_limit!r}"
            )
        self.rate_limit = rate_limit
        self.tokens_queue = asyncio.Queue(rate_limit)
        self.tokens_consumer_task = asyncio.create_task(self.consume_tokens())
        self.semaphore = asyncio.Semaphore(concurrency_limit)

    async def add_token(self) -> None:
        """Placeholder; does nothing in this version."""
        pass
async def throttle(self) -> None:
    """Await ``self.produce_quota()`` while holding ``self.semaphore``.

    The semaphore is acquired before the call and released when the call
    finishes, whether it returns or raises.
    """
    async with self.semaphore:
        await self.produce_quota()
@asynccontextmanager
async def throttle(self) -> None:
    """Async context manager that gates a block of work.

    On entry it acquires ``self.semaphore`` and awaits ``self.add_token()``;
    the managed block then runs, and the semaphore is released on exit.

    The semaphore is held through ``async with`` so that it is also released
    when ``add_token()`` itself raises or is cancelled while waiting.
    Releasing only around the ``yield`` would leak a permit in that case.
    """
    async with self.semaphore:
        await self.add_token()
        yield
async def close(self) -> None:
# Stop the consumer task: request cancellation, then await the task until it
# has finished. Nothing is done when no task is set or it is already cancelled.
# NOTE(review): the constructors shown elsewhere on this page assign
# `tokens_consumer_task`, not `quota_consumer_task` -- confirm the attribute name.
if self.quota_consumer_task and not self.quota_consumer_task.cancelled():
try:
self.quota_consumer_task.cancel()
# Awaiting the task after cancel() is what surfaces CancelledError below.
await self.quota_consumer_task
except asyncio.CancelledError:
# Expected outcome of the cancel() above: swallow it. A log line noting that
# the consumer task was cancelled belongs here.
pass
except Exception as e:
# The task ended with some other error: log it and handle it here.
# NOTE(review): the handler body is not visible in this snippet.
async def close(self) -> None:
# Stop the consumer task: request cancellation, then await the task until it
# has finished. Nothing is done when no task is set or it is already cancelled.
# NOTE(review): the constructors shown elsewhere on this page assign
# `tokens_consumer_task`, not `quota_consumer_task` -- confirm the attribute name.
if self.quota_consumer_task and not self.quota_consumer_task.cancelled():
try:
self.quota_consumer_task.cancel()
# Awaiting the task after cancel() is what surfaces CancelledError below.
await self.quota_consumer_task
except asyncio.CancelledError:
# Expected outcome of the cancel() above: ignore the error, but log that
# the task was cancelled.
pass
except Exception as e:
# The task ended with some other error: log it and handle it here.
# NOTE(review): the handler body is not visible in this snippet.
async def close(self) -> None:
# Stop the consumer task: request cancellation, then await the task until it
# has finished. Nothing is done when no task is set or it is already cancelled.
# NOTE(review): the constructors shown elsewhere on this page assign
# `tokens_consumer_task`, not `quota_consumer_task` -- confirm the attribute name.
if self.quota_consumer_task and not self.quota_consumer_task.cancelled():
try:
self.quota_consumer_task.cancel()
# Awaiting the task after cancel() is what surfaces CancelledError below.
await self.quota_consumer_task
except asyncio.CancelledError:
# Expected outcome of the cancel() above: ignore the error, but log that
# the task was cancelled.
pass
except Exception as e:
# The task ended with some other error: log it and handle it here.
# NOTE(review): the handler body is not visible in this snippet.