Last active
June 24, 2016 20:47
-
-
Save bradbeattie/fb920ddd4abb0da4f5ac415a5cef018c to your computer and use it in GitHub Desktop.
Demonstration of cache-based action throttling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import functools
import math
from datetime import datetime
from hashlib import sha1

import memcache
# Module-wide memcached client used by throttle() to persist stamina entries.
cache = memcache.Client(servers=["127.0.0.1:11211"])
class ActionThrottled(Exception):
    """Raised by throttle() when an action would push stamina below zero.

    throttle() passes the (negative) stamina value as the exception argument.
    """
def throttle(key, action_cost=1, maximum_stamina=3, regenerate_per_hour=1800):
    """Spend ``action_cost`` stamina from the throttle entry for ``key``.

    The cache entry for ``key`` can never hold more stamina than
    ``maximum_stamina``. Each call decrements the stored stamina by
    ``action_cost`` and stamina is regenerated at the rate of
    ``regenerate_per_hour``. Sufficiently slow use of a throttled key results
    in relatively low use of the cache, because an entry is only kept for as
    long as it takes to regenerate back to ``maximum_stamina``.

    Args:
        key: Any object; its ``repr`` is used to build the cache key.
        action_cost: Stamina consumed by this call.
        maximum_stamina: Upper bound on the stamina a key can hold.
        regenerate_per_hour: Stamina regained per hour.

    Returns:
        The stamina left after this call, so the caller may show a warning
        when it is getting low.

    Raises:
        ActionThrottled: If the call would leave stamina below zero. The
            negative stamina is the exception argument and nothing is written
            to the cache.

    NOTE: the read and the write are two separate cache operations, so
    concurrent callers using the same key can both pass the check.
    """
    # Get the stored stamina
    raw_key = "throttle:{0!r}".format(key)
    try:
        cache_key = raw_key
        entry = cache.get(cache_key)
    except memcache.Client.MemcachedKeyError:
        # The key is unusable as-is (illegal characters or too long): hash it
        # into a short, safe form. The base MemcachedKeyError is caught so that
        # over-long keys take this path too. The hash input is encoded because
        # sha1 requires bytes on Python 3; on Python 2 the digest is unchanged.
        digest = sha1(repr(raw_key).encode("utf-8")).hexdigest()
        cache_key = "throttle:{0}".format(digest)
        entry = cache.get(cache_key)

    # Sample the clock once so regeneration and the stored timestamp agree.
    # NOTE: this is naive local time, so wall-clock jumps skew regeneration.
    now = datetime.now()

    # Determine how much stamina remains based on how much it's regenerated
    # since its last use
    if entry is None:
        stamina = maximum_stamina
    else:
        stored_stamina, last_used = entry
        elapsed_seconds = (now - last_used).total_seconds()
        stamina = min(
            maximum_stamina,
            stored_stamina + elapsed_seconds * (regenerate_per_hour / 3600.0),
        )

    # Decrement the stamina and choke if it's too low
    stamina -= action_cost
    if stamina < 0:
        raise ActionThrottled(stamina)

    # The entry only needs to live until stamina is back at the maximum.
    # memcached expirations are whole seconds and 0 means "never expire", so
    # round up: a fractional value may otherwise be truncated, expiring the
    # entry early or (below one second) keeping it forever.
    if regenerate_per_hour == 0:
        # Nothing ever regenerates, so the entry must not expire. This also
        # avoids a ZeroDivisionError.
        expiry_seconds = 0
    else:
        expiry_seconds = int(math.ceil(
            3600.0 * (maximum_stamina - stamina) / regenerate_per_hour
        ))

    # Note the success by storing the newly decreased stamina
    cache.set(cache_key, (stamina, now), expiry_seconds)

    # Return the value as the caller may want to show warning messages if it's
    # sufficiently low
    return stamina
# Provide a throttling decorator for functions
def throttled(*decorator_args, **decorator_kwargs):
    """Build a decorator that calls ``throttle`` before the wrapped function.

    All positional and keyword arguments are forwarded unchanged to
    ``throttle`` on every call of the decorated function. If ``throttle``
    raises, the exception propagates and the wrapped function is not run.

    Returns:
        A decorator; the function it produces returns whatever the wrapped
        function returns.
    """
    def wrapper(original_function):
        # Preserve the wrapped function's __name__, __doc__, etc.
        @functools.wraps(original_function)
        def wrapped_function(*function_args, **function_kwargs):
            throttle(*decorator_args, **decorator_kwargs)
            return original_function(*function_args, **function_kwargs)
        return wrapped_function
    return wrapper
# If this script is called directly, just run a little example
if __name__ == "__main__":
    # NOTE(review): the repr of this list key contains a space, which
    # presumably makes the memcached client reject it and so exercises the
    # hashed-key fallback in throttle() -- confirm against the client library.
    @throttled(["main throttle"])
    def foobar():
        # Parenthesised form prints the same text on Python 2 and Python 3.
        print("Foobar executed!")

    foobar()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment