# Django cache work-sharing using PostgreSQL advisory locks
from django.core.cache import cache | |
from django.core.cache.backends.base import DEFAULT_TIMEOUT | |
from django.db import connection , transaction | |
from hashlib import md5 | |
def cache_chained_calculation(characteristic_str, calculate_function, timeout=DEFAULT_TIMEOUT, force_update=False):
    """
    Return the result of @calculate_function, identified by @characteristic_str, from the cache or by calling it.

    Only one caller should be calculating a given value at once (enforced using a postgres advisory lock); every
    other concurrent caller blocks on the lock and then picks the freshly stored result out of the cache - a
    "chained results" pattern which reduces the overall work done.

    This works particularly well for expensive @calculate_functions which are likely to get called (with the same
    parameters) by multiple concurrent requests - the work will only be performed by the first process to acquire
    the lock and all processes can share the result.

    PostgreSQL is a convenient "global lock server" in this case because almost all failure modes of a django
    request should cause the current transaction to be aborted, ensuring we never leave any stray locks behind.

    @characteristic_str: value identifying the calculation; it is converted to text and md5-hashed to form both
        the cache key and the advisory lock id.
    @calculate_function: called with no args - the intention is for the caller to stick it in a closure (or use
        partial) if they want that kind of fanciness.
    @timeout: passed straight through to cache.set() as its timeout.
    @force_update: when true, both cache lookups are skipped, so the value is always recalculated (still under
        the lock) and re-stored.

    Returns the cached value if one was found, otherwise the fresh return value of @calculate_function.

    NOTE: a result of None is indistinguishable from a cache miss here, so a @calculate_function that returns
    None will be re-run on every call.
    """
    # `unicode` only exists on python 2; fall back to `str` so the same code also runs on python 3
    try:
        text_type = unicode  # noqa: F821
    except NameError:
        text_type = str

    # we're doing the hashing of the characteristic_str *within* this function because we need to know that the
    # hash is uniformly "scrambled", seeing as we're only going to be using one end of it for the pg advisory lock.
    #
    # md5 is probably good enough for us as we're not really worried about the *cryptographic* properties of the
    # hash in this case
    key = md5(text_type(characteristic_str).encode("utf8")).hexdigest()

    # first simply check to see if the key is in the cache - no lock needed for the fast path
    if not force_update:
        result = cache.get(key)
        if result is not None:
            return result

    # ok then, let's check to see if something else is currently calculating this already
    with transaction.atomic():
        cursor = connection.cursor()
        try:
            # acquire advisory lock to see if value is currently being calculated (blocks while another
            # transaction holds it)
            #
            # we're doing a tiny bit of namespacing here, keeping the most significant nibble as 0xc (for
            # "cache"), the remaining 60 bits consisting of the least significant 60 bits (15 hex digits) of the
            # cache key. the driver quotes the parameter, so the `x%s` ends up as a hex bit-string literal.
            cursor.execute("SELECT pg_advisory_xact_lock((x'c'::bigint<<60)|x%s::bigint)", (key[-15:],))
        finally:
            # the lock belongs to the transaction, not the cursor, so the cursor can be released straight away
            cursor.close()

        # now we've got to re-check whether the value is present in the cache as it may have been
        # calculated while we were waiting to acquire the lock
        if not force_update:
            result = cache.get(key)
            if result is not None:
                # oh ok, it was - let's return that
                return result

        # else we should now be sure that the value doesn't exist and nothing else is trying to calculate it
        # so let's calculate it
        result = calculate_function()

        # store this before the lock goes away so that waiting callers find it on their re-check
        cache.set(key, result, timeout=timeout)

    # falling out of this atomic block should release the advisory lock
    # NOTE(review): if we were already inside an outer transaction the atomic block is presumably only a
    # savepoint, in which case the lock would be held until that outer transaction ends - confirm.
    return result
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment