Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Leaky bucket rate limiter in Python
def get_current_time_millis():
    """Return the current time as whole milliseconds since the epoch."""
    seconds_since_epoch = time.time()
    # Scale to milliseconds first, then truncate the fractional part.
    return int(seconds_since_epoch * 1000)
class LeakyBucket(object):
    """Rate limiter that issues permits at a fixed rate.

    Permits become available every ``millis_per_permit`` milliseconds, but
    only ``max_accumulation`` of them may pile up while nobody is asking,
    regardless of how long the bucket sits idle.

    The clock and the sleep are exposed as overridable methods
    (``get_current_time_millis`` and ``sleep_millis``) so that they can be
    replaced, e.g. by a subclass.
    """

    def __init__(self, permits_per_sec, max_accumulation):
        """Create a bucket whose first permit is due one interval from now.

        Args:
            permits_per_sec: Rate at which permits are issued; must be > 0.
            max_accumulation: Maximum number of unused permits that may
                accumulate.

        Raises:
            ValueError: If ``permits_per_sec`` is not positive.
        """
        # A zero rate would divide by zero and a negative rate would yield a
        # negative interval that never throttles, so reject both up front.
        if permits_per_sec <= 0:
            raise ValueError("permits_per_sec must be positive")
        self.millis_per_permit = 1000.0 / permits_per_sec
        self.max_accumulation = max_accumulation
        self.last_permit = self.get_current_time_millis()
        self.lock = threading.Lock()

    def get_current_time_millis(self):
        """Return the current time in milliseconds since the epoch."""
        return get_current_time_millis()

    def sleep_millis(self, millis):
        """Sleep for the given number of milliseconds.

        Non-positive durations return immediately.
        """
        # Imported locally so this method does not depend on imports that
        # may or may not exist at the top of the file.
        import logging
        import time

        if millis <= 0:
            # time.sleep() rejects negative values; there is nothing to wait for.
            return
        secs = millis / 1000.0
        logging.getLogger(__name__).info("Waiting %ss. for permit", int(secs))
        time.sleep(secs)

    def set_last_permit(self, value):
        """Explicitly set the time (in millis) of the last permit.

        Returns:
            This bucket, so that calls can be chained.
        """
        self.last_permit = value
        return self

    def wait_for_permit(self):
        """Block until the next permit is issued.

        The lock is held for the whole call, including the sleep, so
        concurrent callers are served one at a time.
        """
        with self.lock:
            current_time = self.get_current_time_millis()
            # Cap the backlog first so a long idle period cannot be turned
            # into an unbounded burst of immediately available permits.
            self._limit_accumulation(current_time)
            self._wait_for_next_permit(current_time)

    def _wait_for_next_permit(self, current_time):
        """Wait until the next permit is issued, ignoring accumulation."""
        next_permit = self.last_permit + self.millis_per_permit
        if current_time < next_permit:
            self.sleep_millis(next_permit - current_time)
        # Advance by exactly one interval (not to "now") so that unused time
        # keeps counting toward accumulated permits.
        self.last_permit = next_permit

    def _limit_accumulation(self, current_time):
        """Ensure no more than the allowed number of permits has accumulated."""
        accumulation = (current_time - self.last_permit) / self.millis_per_permit
        if accumulation > self.max_accumulation:
            # Move the last-permit time forward so that exactly
            # max_accumulation permits are available as of current_time.
            new_last_permit = current_time - (
                self.max_accumulation * self.millis_per_permit
            )
            assert new_last_permit > self.last_permit
            self.last_permit = new_last_permit
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.