RQ / Django / Redis Rate Limiting
# Rate limiting with Python RQ + Django + Redis
# Multiple Fixed Windows Algorithm inspired by Figma https://www.figma.com/blog/an-alternative-approach-to-rate-limiting/
import functools
from datetime import timedelta
from typing import Callable

import django_rq
from django.utils import timezone
from rq import Retry

redis_conn = django_rq.get_connection("default")


def is_rate_okay(name: str, times: int = 30 * 10, per: int = 60 * 10):
    """
    Checks whether this task is hitting our defined rate limit too often.
    The defaults allow 300 calls per 600 seconds (an average of 30/minute).

    times (int): The "30" in "30 times per 60 seconds".
    per (int): The "60" in "30 times per 60 seconds".

    The Redis structure we create is a hash of timestamp fields with counter values:
    {
        '1560649027.515933': '2',  # unlikely to have more than 1
        '1560649352.462433': '1',
    }
    The Redis key is expired once 'per' seconds have elapsed.
    The algorithm totals the counters and checks the sum against 'times'.

    This implementation does not include the "leniency" described at the bottom
    of the Figma article referenced at the top of this file; that is left up to
    you and depends on the application.

    Returns True if under the limit, otherwise False.
    """
    # Get a timestamp accurate to the microsecond
    timestamp = timezone.now().timestamp()

    # Derive the Redis key from the task name
    key = f"rate:{name}"

    # Create a pipeline to execute the Redis commands atomically
    pipe = redis_conn.pipeline()

    # Increment the counter for the current timestamp in the hash
    pipe.hincrby(key, timestamp)
    # Grab the current expiration of the key
    pipe.ttl(key)
    # Grab all of the counters recorded in the current window (of 'per' seconds)
    pipe.hvals(key)

    # Returns a list of the command results: [current hit count, expiration, list of all counters]
    result = pipe.execute()

    # If no expiration is set yet, set it. This is not covered by the atomicity
    # of the pipeline above.
    if result[1] < 0:
        redis_conn.expire(key, per)

    # The counters come back as bytes, so convert them to int before summing
    # and comparing against the limit.
    return sum(int(count) for count in result[2]) <= times
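

# A minimal sketch (not part of the original gist) of calling is_rate_okay()
# directly, outside the decorator, to guard an arbitrary piece of work. The key
# "external_api-acme" and the limit of 5 calls per 60 seconds are made-up values.
def example_direct_check():
    if is_rate_okay("external_api-acme", times=5, per=60):
        # Under the limit: do the rate-limited work here.
        print("under the limit, doing the work")
    else:
        # Over the limit: back off, re-enqueue, or drop the call.
        print("over the limit, backing off")

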
def rate_limiting(key_func: Callable[..., str], times: int, per: int):
    def rate_limiting_decorator(func):
        @functools.wraps(func)
        def wrapper_decorator(*args, **kwargs):
            if is_rate_okay(key_func(*args, **kwargs), times, per):
                return func(*args, **kwargs)
            # Over the limit: push the task back onto the default queue to run
            # again in 90 seconds, retrying up to 3 times on failure with
            # increasing intervals.
            django_rq.get_queue().enqueue_in(
                timedelta(seconds=90),
                func,
                args=args,
                kwargs=kwargs,
                retry=Retry(max=3, interval=[10, 30, 60]),
            )

        return wrapper_decorator

    return rate_limiting_decorator


# Use the decorator to limit the rate to 10 calls per minute per user
@rate_limiting(
    lambda username, _: f"inventory_levels_update-{username}",
    times=10,
    per=60,
)
def inventory_levels_update(username: str, inventory_level_id: str):
    ...
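

# Usage sketch (an assumption, not shown in the original gist): the decorated
# task is enqueued like any other RQ job; "alice" and "inv-42" are made-up
# arguments. When a worker picks the job up, the per-user limit of 10 calls per
# minute is checked first, and an over-limit call is pushed back onto the
# default queue with a 90 second delay instead of running immediately.
#
#   django_rq.enqueue(inventory_levels_update, "alice", "inv-42")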