Skip to content

Instantly share code, notes, and snippets.

@miohtama
Last active January 15, 2022 21:20
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save miohtama/86c86b4caa61a5615e44 to your computer and use it in GitHub Desktop.
Save miohtama/86c86b4caa61a5615e44 to your computer and use it in GitHub Desktop.
Rolling window rate limitation implementation for Pyramid
"""Rolling time window counter and rate limit using Redis.
Use Redis sorted sets to implement rolling time window counters and limiters. These are useful for preventing denial of service, flood and reputation attacks against site elements which trigger outgoing actions (email, SMS).
Example how to do a Colander validator which checks that the form has not been submitted too many times within the time period::
import colander as c
@c.deferred
def throttle_invites_validator(node, kw):
"Protect invite functionality from flood attacks."
request = kw["request"]
limit = int(request.registry.settings.get("trees.invite_limit", 60))
def inner(node, value):
# Check we don't have many invites going out
if rollingwindow.check(request.registry, "invite_friends", window=3600, limit=limit):
# Alert devops through Sentry
logger.warn("Excessive invite traffic")
# Tell users slow down
raise c.Invalid(node, 'Too many outgoing invites at the moment. Please try again later.')
return inner
Then you construct form::
schema = schemas.InviteFriends(validator=schemas.throttle_invites_validator).bind(request=request)
form = deform.Form(schema)
You can also exercise this code in tests::
def test_flood_invite(web_server, browser, dbsession, init):
"Overload invites and see we get an error message."
b = browser
with transaction.manager:
create_user()
# Set flood limit to two attempts
init.config.registry.settings["trees.invite_limit"] = "2"
# Clear Redis counter for outgoing invitations
redis = get_redis(init.config.registry)
redis.delete("invite_friends")
# Login
b.visit(web_server + "/login")
b.fill("username", EMAIL)
b.fill("password", PASSWORD)
b.find_by_name("Log_in").click()
def flood():
b.visit("{}/invite-friends".format(web_server))
b.find_by_css("#nav-invite-friends").click()
b.fill("phone_number", "555 123 1234")
b.find_by_name("invite").click()
flood()
assert b.is_text_present("Invite SMS sent")
flood()
assert b.is_text_present("Invite SMS sent")
flood()
assert b.is_text_present("Too many outgoing invites at the moment")
More info
* http://opensourcehacker.com/2014/07/09/rolling-time-window-counters-with-redis-and-mitigating-botnet-driven-login-attacks/
* http://redis.io/commands/zadd
"""
import time
from websauna.system.core.redis import get_redis
def _check(redis, key, window=60, limit=50):
# Expire old keys (hits)
expires = time.time() - window
redis.zremrangebyscore(key, '-inf', expires)
# Add a hit on the very moment
now = time.time()
redis.zadd(key, now, now)
# If we currently have more keys than limit,
# then limit the action
if redis.zcard(key) > limit:
return True
return False
def _get(redis, key):
""" Get the current hits per rolling time window.
:param redis: Redis client
:param key: Redis key name we use to keep counter
:return: int, how many hits we have within the current rolling time window
"""
return redis.zcard(key)
def check(registry, key, window=60, limit=10):
    """Register a hit on a rolling time window counter and test it against the limit.

    The hits are tracked in Redis under ``key``.

    :param registry: Pyramid registry e.g. request.registry
    :param key: Redis key name under which the counter is kept
    :param window: Length of the rolling time window in seconds. Default 60 seconds.
    :param limit: Number of operations allowed per time window. Default 10 hits.
    :return: True if, including this hit, the number of hits in the current window exceeds ``limit``
    """
    return _check(get_redis(registry), key, window=window, limit=limit)
def get(registry, key):
    """Get the current number of hits stored for a rolling time window counter.

    This is a read-only query: it does not register a new hit. Old hits are
    pruned only when :func:`check` runs, so the returned count reflects the
    state as of the most recent ``check`` call on this key.

    :param registry: Pyramid registry e.g. request.registry
    :param key: Redis key name under which the counter is kept
    :return: int, how many hits are currently stored for ``key``
    """
    redis = get_redis(registry)
    # Read the counter via _get(); calling _check() here would record a hit
    # and return a bool instead of the documented count.
    return _get(redis, key)
"""Deform throttling support."""
import logging
import colander as c
from . import rollingwindow
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
def create_throttle_validator(name: str, max_actions_in_time_window: int, time_window_in_seconds: int = 3600):
    """Create a Colander form validator which rejects form submissions exceeding a certain rate.

    Form submissions are throttled system wide. This prevents abuse of the system by flooding it with requests.

    A logging warning is issued if the rate is exceeded. The user is greeted with an error message telling the submission is not possible at the moment.

    Example::

        from tomb_routes import simple_route
        from websauna.system.form.throttle import create_throttle_validator
        from myapp import schemas

        @simple_route("/login", route_name="login", renderer="login/login.html", append_slash=False)
        def login(request):

            # Read allowed email login rate from the config file
            email_login_rate = int(request.registry.settings.get("trees.email_login_rate", 50))

            # Create a Colander schema instance with rate limit validator
            email_schema = schemas.LoginWithEmail(validator=create_throttle_validator("email_login", email_login_rate)).bind(request=request)

    :param name: Identify this throttler. Used, prefixed with ``throttle_``, as a Redis key.
    :param max_actions_in_time_window: Number of allowed actions per window
    :param time_window_in_seconds: Length of the time window in seconds. Default one hour, 3600 seconds.
    :return: Function to be passed to ``validator`` Colander schema construction parameter.
    :raise colander.Invalid: Raised by the produced validator (not by this factory) when the rate is exceeded.
    """

    @c.deferred
    def throttle_validator(node, kw):
        """Deferred validator factory; expects ``request`` among the schema bind arguments."""
        request = kw["request"]

        def inner(node, value):
            # Every validation run counts as one hit against the shared counter
            exceeded = rollingwindow.check(
                request.registry,
                "throttle_" + name,
                window=time_window_in_seconds,
                limit=max_actions_in_time_window,
            )
            if exceeded:
                # Alert devops through Sentry.
                # logger.warning() is used because logger.warn() is a deprecated alias.
                logger.warning("Excessive form submissions on %s", name)

                # Tell users slow down
                raise c.Invalid(node, 'Too many form submissions at the moment. Please try again later.')

        return inner

    return throttle_validator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment