Created
January 18, 2024 18:25
-
-
Save alexpovel/ce1c1256b65fade42b80cef84fca755e to your computer and use it in GitHub Desktop.
async-based leaky bucket implementation (multi-threaded-like). For fun and practice
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import datetime | |
from collections import deque | |
from dataclasses import dataclass | |
from itertools import count | |
from typing import NewType | |
Request = NewType("Request", str) | |
SINK: list[Request] = [] | |
@dataclass | |
class Bucket: | |
n_max: int | |
rate: datetime.timedelta | |
sink: list[Request] | |
def __post_init__(self) -> None: | |
self.deque: deque[Request] = deque() | |
async def submit(self, request: Request) -> None: | |
if len(self.deque) > self.n_max: | |
print( | |
f"Dropping request {request} (reason: bucket overflow: {list(self.deque)})" | |
) | |
return | |
else: | |
print(f"Appending request {request} ({list(self.deque)})") | |
self.deque.append(request) | |
async def drain(self) -> None: | |
while True: | |
try: | |
req = self.deque.popleft() | |
print(f"Removing and sending off request: {req}") | |
except IndexError: | |
print("Bucket empty, nothing to drain") | |
else: | |
self.sink.append(req) | |
await asyncio.sleep(self.rate.total_seconds()) | |
async def main() -> None: | |
bucket = Bucket( | |
n_max=3, | |
rate=datetime.timedelta(seconds=0.01), | |
sink=SINK, | |
) | |
asyncio.create_task(bucket.drain()) | |
for i in count(): | |
await asyncio.sleep(0.005) | |
request = Request(f"req. no {i}") | |
await bucket.submit(request) | |
if __name__ == "__main__": | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment