Skip to content

Instantly share code, notes, and snippets.

@lionel-panhaleux
Created September 17, 2015 16:28
Show Gist options
  • Save lionel-panhaleux/bd62ade43afe57e6f365 to your computer and use it in GitHub Desktop.
Save lionel-panhaleux/bd62ade43afe57e6f365 to your computer and use it in GitHub Desktop.
pg_*_advisory_lock functions for django
# -*- coding: utf-8 -*-
# Copyright (c) Polyconseil SAS.
#
# Licensed under the MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from contextlib import contextmanager
import hashlib
import struct
from django.db import DEFAULT_DB_ALIAS, connections
from django.utils import six
def advisory_xact_lock_many(*objs, **kwargs):
    """ Acquire multiple advisory transactional locks for given IDs. See ``advisory_xact_lock``.

    objs: any number of lock identifiers (bytes, str, unicode or int)
    kwargs: forwarded unchanged to ``advisory_xact_lock`` (``shared``, ``wait``, ``using``)

    Returns ``None``. With ``wait=False`` the individual results are not reported:
    call ``advisory_xact_lock`` directly if you need to know whether a lock was acquired.
    """
    # Always acquire locks *in the same order* to avoid deadlocks.
    # The order is defined on the computed lock IDs, not on the raw objects:
    # - the lock ID is what PostgreSQL actually locks on;
    # - raw identifiers of mixed types (int and str, str and bytes) cannot be
    #   compared on Python 3, so sorting them would raise TypeError.
    # Duplicates are dropped: locking the same ID twice in one transaction is
    # harmless but costs an extra query.
    lock_ids = sorted(set(_get_lock_id(obj) for obj in objs))
    for lock_id in lock_ids:
        # A lock ID is always an in-range integer, which ``advisory_xact_lock``
        # uses as is (no second hashing).
        advisory_xact_lock(lock_id, **kwargs)
def advisory_xact_lock(obj, shared=False, wait=True, using=None):
    """ Acquire an advisory transactional lock for a given ID, using PSQL.

    obj: bytes, str, unicode or int
    shared: if True, use the ``_shared`` variant of the PSQL lock function
    wait: if True (default), use the blocking ``pg_advisory_xact_lock*`` function;
        if False, use the ``pg_try_advisory_xact_lock*`` variant instead
    using: database alias, defaults to ``DEFAULT_DB_ALIAS``

    Returns ``True`` when ``wait`` is True. When ``wait`` is False, returns the value
    reported by the ``pg_try_*`` function (whether the lock could be acquired).

    This can be used as a semaphore over database transactions for distributed systems.
    The lock is released automatically at the end of the transaction.

    .. note::
        The lock is **not** released when you rollback to a previous savepoint.
        This means if you use ``advisory_xact_lock`` inside a ``django.db.transaction.atomic`` block,
        raise inside the block and catch the exception at a higher level, the lock is still in place.
        This should not be an issue, as you can totally lock the same resource multiple
        times inside the same transaction.

        Example::

            try:
                with transaction.atomic():
                    advisory_xact_lock(42)
                    MyObject.objects.get(value=42)
            except MyObject.DoesNotExist:
                # 42 is still locked
                # but as we are in the same transaction, we can lock it again
                advisory_xact_lock(42)
                MyObject.objects.create(value=42)

    .. note::
        Avoid deadlocks ! If you need to lock multiple resources,
        do so at a single point with ``advisory_xact_lock_many``
    """
    command = 'select pg_{0}advisory_xact_lock{1}(%s)'.format(
        '' if wait else 'try_',
        '_shared' if shared else '')
    # Compute the ID first: an unsupported ``obj`` fails before any cursor is opened.
    lock_id = _get_lock_id(obj)
    cursor = connections[using or DEFAULT_DB_ALIAS].cursor()
    try:
        cursor.execute(command, (lock_id,))
        # The blocking variant only returns once the lock is held, so there is
        # no result worth reading in that case.
        return cursor.fetchone()[0] if not wait else True
    finally:
        # Closing the cursor does not end the transaction: the lock stays in place.
        cursor.close()
@contextmanager
def advisory_lock(obj, shared=False, wait=True, using=None):
    """ Acquire an advisory lock for given ID, release at the end of the context.

    obj: bytes, str, unicode or int
    shared: if True, use the ``_shared`` variants of the PSQL lock and unlock functions
    wait: if True (default), use the blocking ``pg_advisory_lock*`` function;
        if False, use the ``pg_try_advisory_lock*`` variant instead
    using: database alias, defaults to ``DEFAULT_DB_ALIAS``

    The context value is ``True`` when ``wait`` is True. When ``wait`` is False, it is
    the value reported by the ``pg_try_*`` function: check it before relying on the lock.

    PREFER ``advisory_xact_lock`` IF YOU ARE DOING DATABASE OPERATIONS INSIDE THE BLOCK

    .. warning::
        The lock is released at the end of the context, even if the transaction is
        not committed yet. A concurrent process can then acquire the lock and
        read or write the database, without getting any of the yet-to-be-committed changes,
        **as if there was no lock in the first place**.
    """
    command = 'select pg_{0}advisory_lock{1}(%s)'.format(
        '' if wait else 'try_',
        '_shared' if shared else '')
    release = 'select pg_advisory_unlock{0}(%s)'.format(
        '_shared' if shared else '')
    lock_id = _get_lock_id(obj)
    cursor = connections[using or DEFAULT_DB_ALIAS].cursor()
    # Outer try/finally: the cursor is closed whatever happens, including a
    # failure of the acquire or of the release statement.
    try:
        cursor.execute(command, (lock_id,))
        acquired = cursor.fetchone()[0] if not wait else True
        try:
            yield acquired
        finally:
            # Only release what was actually acquired (a failed try-lock holds nothing).
            if acquired:
                cursor.execute(release, (lock_id,))
    finally:
        cursor.close()
def _get_lock_id(obj):
    """ This function is used internally to generate a PSQL ``bigint`` for ``pg_*_lock`` functions.

    obj: bytes, str, unicode or int

    Integers inside ``bigint`` boundaries are used directly.
    Integers outside these boundaries are converted to their ``str()`` representation, then hashed.
    Bytes are hashed using SHA-256 and the first 8 bytes are read as a little-endian signed bigint.
    Unicode strings are converted to bytes using utf-8.
    Other types will raise an error.

    On the choice of hash for strings and bytes, other solutions have been considered and rejected:

    - Using zlib.crc32 reduces the key space (this can cause deadlocks with > 100K cardinality)
    - Using crc64 from crcmod is faster than hashlib.sha256, but adds a dependency (premature optimization)
    - Using the Python internal ``hash()`` function is not robust because it is platform dependent
    """
    if isinstance(obj, six.integer_types):
        # Python2 long and Python3 int are not bounded : we need to check it.
        if -2**63 <= obj < 2**63:
            return obj
        obj = str(obj)
    # Only text (Python2 unicode, Python3 str) needs encoding.
    # Python2 str is already bytes: calling ``encode`` on it would first decode it
    # implicitly as ASCII, and fail with UnicodeDecodeError on non-ASCII content.
    if isinstance(obj, six.text_type):
        obj = obj.encode('utf-8')
    # obj must be convertible to a buffer (Python2 str or Python3 bytes, aka six.binary_type)
    # "<q" format ensures an 8-bytes signed standard integer even on 32-bits Python,
    # with a fixed byte order: a native byte order ("=q") would give different lock IDs
    # for the same key on little-endian and big-endian hosts.
    # Values are unchanged on little-endian hosts.
    return struct.unpack_from("<q", hashlib.sha256(obj).digest())[0]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment