Skip to content

Instantly share code, notes, and snippets.

@flux627
Last active September 6, 2019 14:03
Show Gist options
  • Save flux627/02fbaa85a533ee1891ddd3b3fbf1f490 to your computer and use it in GitHub Desktop.
Save flux627/02fbaa85a533ee1891ddd3b3fbf1f490 to your computer and use it in GitHub Desktop.
Python implementation of token bucket, segmented by unique IDs
from time import time, sleep
from collections import defaultdict
from threading import Lock
class TokenBucket:
"""
An implementation of the token bucket algorithm.
"""
def __init__(self, capacity, rate, min_interval=0):
self.capacity = capacity
self.tokens = capacity
self.rate = rate
self.min_interval = min_interval
self.last = time()
def default_values():
return {
"tokens" : self.capacity,
"last" : self.last,
"lock" : Lock()
}
self.uids = defaultdict(default_values)
def consume(self, uid, tokens=1):
with self.uids[uid]["lock"]:
now = time()
lapse = now - self.uids[uid]["last"]
self.uids[uid]["last"] = now
self.uids[uid]["tokens"] += lapse * self.rate
if self.uids[uid]["tokens"] > self.capacity:
self.uids[uid]["tokens"] = self.capacity
if self.uids[uid]["tokens"] == self.capacity:
first = True
else:
first = False
self.uids[uid]["tokens"] -= tokens
if self.uids[uid]["tokens"] < self.min_interval:
sleep(-self.uids[uid]["tokens"] / self.rate)
print uid, -self.uids[uid]["tokens"] / self.rate
else:
if not first:
sleep(self.min_interval)
print uid, self.min_interval
if __name__ == '__main__':
from concurrent.futures import ThreadPoolExecutor
bucket = TokenBucket(40, 2, .05)
futs = []
with ThreadPoolExecutor(max_workers=100) as executor:
for _ in range(100):
futs.append( executor.submit(bucket.consume, "foo") )
for _ in range(50) :
futs.append( executor.submit(bucket.consume, "bar") )
[ fut.result() for fut in futs ]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment