Skip to content

Instantly share code, notes, and snippets.

@julik
Last active August 28, 2022 08:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save julik/22eed0b45726d0eb8fb8570d974a5a7f to your computer and use it in GitHub Desktop.
Save julik/22eed0b45726d0eb8fb8570d974a5a7f to your computer and use it in GitHub Desktop.
# This offers just the leaky bucket implementation with fill control, but without the timed lock.
# It does not raise any exceptions, it just tracks the state of a leaky bucket in Postgres.
#
# Leak rate is specified directly in tokens per second, instead of specifying the block period.
# The bucket level is stored and returned as a Float which allows for finer-grained measurement,
# but more importantly - makes testing from the outside easier.
#
# Note that this implementation has a peculiar property: the bucket is only "full" once it overflows.
# Due to a leak rate just a few microseconds after that moment the bucket is no longer going to be full
# anymore as it will have leaked some tokens by then. This means that the information about whether a
# bucket has become full or not gets returned in the bucket `State` struct right after the database
# update gets executed, and if your code needs to make decisions based on that data it has to use
# this returned state, not query the leaky bucket again. Specifically:
#
# state = bucket.fillup(1) # Record 1 request
# state.full? #=> true, this is timely information
#
# ...is the correct way to perform the check. This, however, is not:
#
# bucket.fillup(1)
# bucket.state.full? #=> false, some time has passed after the topup and some tokens have already leaked
#
# The storage use is one DB row per leaky bucket you need to manage (likely - one throttled entity such
# as a combination of an IP address + the URL you need to procect). The `key` is an arbitrary string you provide.
class RateLimiter::LeakyBucket
class State < Struct.new(:level, :full)
# Returns the level of the bucket after the operation on the LeakyBucket
# object has taken place. There is a guarantee that no tokens have leaked
# from the bucket between the operation and the freezing of the State
# struct.
#
# @!attribute [r] level
# @return [Float]
# Tells whether the bucket was detected to be full when the operation on
# the LeakyBucket was performed. There is a guarantee that no tokens have leaked
# from the bucket between the operation and the freezing of the State
# struct.
#
# @!attribute [r] full
# @return [Boolean]
alias_method :full?, :full
# Returns the bucket level of the bucket state as a Float
#
# @return [Float]
def to_f
level.to_f
end
# Returns the bucket level of the bucket state rounded to an Integer
#
# @return [Integer]
def to_i
level.to_i
end
end
# Creates a new LeakyBucket. The object controls 1 row in Postgres which is
# specific to the bucket key.
#
# @param key[String] the key for the bucket. The key also gets used
# to derive locking keys, so that operations on a particular bucket
# are always serialized.
# @param leak_rate[Float] the leak rate of the bucket, in tokens per second
# @param capacity[Numeric] how many tokens is the bucket capped at.
# Filling up the bucket using `fillup()` will add to that number, but
# the bucket contents will then be capped at this value. So with
# bucket_capacity set to 12 and a `fillup(14)` the bucket will reach the level
# of 12, and will then immediately start leaking again.
def initialize(key:, leak_rate:, capacity:)
@key = key
@leak_rate = leak_rate.to_f
@capacity = capacity.to_f
end
# Places `n` tokens in the bucket. Once tokens are placed, the bucket is set to expire
# within 2 times the time it would take it to leak to 0, regardless of how many tokens
# get put in - since the amount of tokens put in the bucket will always be capped
# to the `capacity:` value you pass to the constructor. Calling `fillup` also deletes
# leaky buckets which have expired.
#
# @param n_tokens[Float]
# @return [State] the state of the bucket after the operation
def fillup(n_tokens)
add_tokens(n_tokens.to_f)
end
# Returns the current state of the bucket, containing the level and whether the bucket is full.
# Calling this method will not perform any database writes.
#
# @return [State] the snapshotted state of the bucket at time of query
def state
conn = ActiveRecord::Base.connection
query_params = {
key: @key,
capa: @capacity.to_f,
leak_rate: @leak_rate.to_f
}
# The `level` of the bucket is what got stored at `last_touched_at` time, and we can
# extrapolate from it to see how many tokens have leaked out since `last_touched_at` -
# we don't need to UPDATE the value in the bucket here
sql = ActiveRecord::Base.sanitize_sql_array([<<~SQL, query_params])
SELECT
GREATEST(
0.0, LEAST(
:capa,
t.level - (EXTRACT(EPOCH FROM (clock_timestamp() - t.last_touched_at)) * :leak_rate)
)
)
FROM
rate_limiter_leaky_buckets AS t
WHERE
key = :key
SQL
# If the return value of the query is a NULL it means no such bucket exists, so we assume the bucket is empty
current_level = conn.uncached { conn.select_value(sql) } || 0.0
State.new(current_level, (@capacity - current_level).abs < 0.01)
end
private
def add_tokens(n_tokens)
conn = ActiveRecord::Base.connection
# Take double the time it takes the bucket to empty under normal circumstances
# until the bucket may be deleted.
may_be_deleted_after_seconds = (@capacity.to_f / @leak_rate.to_f) * 2.0
# Create the leaky bucket if it does not exist, and update
# to the new level, taking the leak rate into account - if the bucket exists.
query_params = {
key: @key,
capa: @capacity.to_f,
delete_after_s: may_be_deleted_after_seconds,
leak_rate: @leak_rate.to_f,
fillup: n_tokens.to_f
}
sql = ActiveRecord::Base.sanitize_sql_array([<<~SQL, query_params])
INSERT INTO rate_limiter_leaky_buckets AS t
(key, last_touched_at, may_be_deleted_after, level)
VALUES
(
:key,
clock_timestamp(),
clock_timestamp() + ':delete_after_s second'::interval,
LEAST(:capa, :fillup)
)
ON CONFLICT (key) DO UPDATE SET
last_touched_at = EXCLUDED.last_touched_at,
may_be_deleted_after = EXCLUDED.may_be_deleted_after,
level = GREATEST(
0.0, LEAST(
:capa,
t.level + :fillup - (EXTRACT(EPOCH FROM (EXCLUDED.last_touched_at - t.last_touched_at)) * :leak_rate)
)
)
RETURNING level
SQL
# Note the use of .uncached here. The AR query cache will actually see our
# query as a repeat (since we use "select_value" for the RETURNING bit) and will not call into Postgres
# correctly, thus the clock_timestamp() value would be frozen between calls. We don't want that here.
# See https://stackoverflow.com/questions/73184531/why-would-postgres-clock-timestamp-freeze-inside-a-rails-unit-test
level_after_fillup = conn.uncached { conn.select_value(sql) }
State.new(level_after_fillup, (@capacity - level_after_fillup).abs < 0.01).tap do
# Prune buckets which are no longer used. No "uncached" needed here since we are using "execute"
conn.execute("DELETE FROM rate_limiter_leaky_buckets WHERE may_be_deleted_after < clock_timestamp()")
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment