Skip to content

Instantly share code, notes, and snippets.

@alexpovel
Created January 18, 2024 18:25
Show Gist options
  • Save alexpovel/ce1c1256b65fade42b80cef84fca755e to your computer and use it in GitHub Desktop.
Save alexpovel/ce1c1256b65fade42b80cef84fca755e to your computer and use it in GitHub Desktop.
async-based leaky bucket implementation (multi-threaded-like). For fun and practice
import asyncio
import datetime
from collections import deque
from dataclasses import dataclass
from itertools import count
from typing import NewType
# Distinct static type for requests; at runtime a Request is just a str.
Request = NewType("Request", str)
# Module-level list that main() passes to the Bucket as its sink.
SINK: list[Request] = []
@dataclass
class Bucket:
    """Leaky bucket: a bounded FIFO of requests, drained at a fixed rate.

    Requests enter through :meth:`submit` and are dropped once the bucket
    already holds ``n_max`` of them. :meth:`drain` loops forever, moving at
    most one queued request into ``sink`` per ``rate`` interval.
    """

    # Maximum number of requests the bucket holds at any one time.
    n_max: int
    # Pause between two consecutive drain attempts.
    rate: datetime.timedelta
    # List that drained requests are appended to.
    sink: list[Request]

    def __post_init__(self) -> None:
        """Create the (initially empty) internal queue."""
        # Assigned here rather than declared as a dataclass field, so it is
        # not part of the generated __init__, __repr__ or __eq__.
        self.deque: deque[Request] = deque()

    async def submit(self, request: Request) -> None:
        """Queue ``request``, or drop it if the bucket is already full.

        Args:
            request: The request to enqueue.
        """
        # `>=` rather than `>`: with `>` a request was still accepted while
        # the bucket already held n_max items, letting it grow to n_max + 1.
        if len(self.deque) >= self.n_max:
            print(
                f"Dropping request {request} (reason: bucket overflow: {list(self.deque)})"
            )
            return

        print(f"Appending request {request} ({list(self.deque)})")
        self.deque.append(request)

    async def drain(self) -> None:
        """Forward queued requests to ``sink``, at most one per ``rate`` interval.

        Never returns on its own; run it as a task and cancel that task to
        stop draining.
        """
        while True:
            # Keep the try body minimal: only popleft() can raise IndexError.
            try:
                req = self.deque.popleft()
            except IndexError:
                print("Bucket empty, nothing to drain")
            else:
                print(f"Removing and sending off request: {req}")
                self.sink.append(req)
            # Sleep on every iteration, also when the bucket was empty.
            await asyncio.sleep(self.rate.total_seconds())
async def main() -> None:
    """Run the demo: feed a small bucket faster than it drains.

    A request is submitted every 5 ms while the bucket drains one request
    every 10 ms, so the bucket fills up and overflow drops show in the output.
    The submit loop is unbounded; this coroutine only ends when it is
    cancelled or the program is interrupted.
    """
    bucket = Bucket(
        n_max=3,
        rate=datetime.timedelta(seconds=0.01),
        sink=SINK,
    )
    # Keep a strong reference to the task: the event loop only holds weak
    # references, so an unreferenced task may be garbage-collected mid-run.
    drain_task = asyncio.create_task(bucket.drain())
    try:
        for i in count():
            await asyncio.sleep(0.005)
            request = Request(f"req. no {i}")
            await bucket.submit(request)
    finally:
        # Only reached via an exception or cancellation (the loop above is
        # infinite); stop the background drain along with the producer.
        drain_task.cancel()
# Script entry point: only start the demo when executed directly, not on import.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment