Django cache work-sharing using PostgreSQL advisory locks
from hashlib import md5

from django.core.cache import cache
from django.core.cache.backends.base import DEFAULT_TIMEOUT
from django.db import connection, transaction


def cache_chained_calculation(characteristic_str, calculate_function, timeout=DEFAULT_TIMEOUT, force_update=False):
    """
    Attempt to obtain the result of @calculate_function, represented by @characteristic_str, through the cache or by
    calling the function. Only one caller should be calculating the value at a time (enforced using PostgreSQL advisory
    locks), allowing overall work to be reduced by sharing all results using a "chained results" pattern.

    @calculate_function is called with no args - the intention is for the caller to stick it in a closure (or use
    partial) if they want that kind of fanciness.

    This works particularly well for expensive @calculate_functions which are likely to get called (with the same
    parameters) by multiple concurrent requests - the work will only be performed by the first process to acquire the
    lock and all processes can share the result.

    PostgreSQL is a convenient "global lock server" in this case because almost all failure modes of a Django request
    should cause the current transaction to be aborted, ensuring we never leave any stray locks behind.

    Note that a result of None is indistinguishable from a cache miss, so it will be recalculated on every call.
    """
    # we're doing the hashing of the characteristic_str *within* this function because we need to know that the hash
    # is uniformly "scrambled", seeing as we're only going to be using one end of it for the pg advisory lock.
    #
    # md5 is probably good enough for us as we're not really worried about the *cryptographic* properties of the hash
    # in this case
    key = md5(str(characteristic_str).encode("utf8")).hexdigest()

    # first simply check to see if the key is in the cache
    if not force_update:
        result = cache.get(key)
        if result is not None:
            return result

    # ok then, let's check to see if something else is currently calculating this already
    with transaction.atomic():
        cursor = connection.cursor()
        # acquire advisory lock to see if value is currently being calculated
        # we're doing a tiny bit of namespacing here, keeping the most significant nibble as 0xc (for "cache"), the
        # remaining 60 bits consisting of the least significant 60 bits of the parent cache key
        cursor.execute("SELECT pg_advisory_xact_lock((x'c'::bigint<<60)|x%s::bigint)", (key[-15:],))

        # now we've got to re-check whether the value is present in the cache as it may have been
        # calculated while we were waiting to acquire the lock
        if not force_update:
            result = cache.get(key)
            if result is not None:
                # oh ok, it was - let's return that
                return result

        # else we should now be sure that the value doesn't exist and nothing else is trying to calculate it
        # so let's calculate it
        result = calculate_function()

        # store this
        cache.set(key, result, timeout=timeout)

    # falling out of this atomic block should release the advisory lock
    return result
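
The advisory lock id built in the SQL above can be mirrored in plain Python to see which integer actually gets locked. This is only a sketch for illustration: the helper name advisory_lock_id and the example string "report:2024" are hypothetical, and the signed reinterpretation assumes the bit pattern is read as a two's-complement 64-bit bigint.

```python
from hashlib import md5

NAMESPACE_NIBBLE = 0xC  # "c" for cache, matching x'c' in the SQL


def advisory_lock_id(characteristic_str):
    """Mirror (x'c'::bigint<<60)|x'<last 15 hex digits of the md5>'::bigint."""
    key = md5(str(characteristic_str).encode("utf8")).hexdigest()
    # 15 hex digits carry exactly 60 bits, leaving the top nibble free
    low_60_bits = int(key[-15:], 16)
    unsigned = (NAMESPACE_NIBBLE << 60) | low_60_bits
    # bigint is signed, so a set top bit reads as a negative number
    # (assumption: same 64-bit pattern, two's-complement interpretation)
    return unsigned - (1 << 64) if unsigned >= (1 << 63) else unsigned


lock_id = advisory_lock_id("report:2024")  # hypothetical characteristic string
# the most significant hex digit of the 64-bit pattern is always "c"
print(hex(lock_id & 0xFFFFFFFFFFFFFFFF))
```

Because only 60 bits of the hash survive, two different characteristic strings can occasionally map to the same lock id. In that case the callers merely wait on each other; the cache key itself still uses the full md5 digest, so results are not mixed up.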