Created
September 17, 2015 16:28
-
-
Save lionel-panhaleux/bd62ade43afe57e6f365 to your computer and use it in GitHub Desktop.
pg_*_advisory_lock functions for django
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# Copyright (c) Polyconseil SAS. | |
# | |
# Licensed under the MIT License | |
# | |
# Permission is hereby granted, free of charge, to any person obtaining a copy | |
# of this software and associated documentation files (the "Software"), to deal | |
# in the Software without restriction, including without limitation the rights | |
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
# copies of the Software, and to permit persons to whom the Software is | |
# furnished to do so, subject to the following conditions: | |
# The above copyright notice and this permission notice shall be included in | |
# all copies or substantial portions of the Software. | |
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
# THE SOFTWARE. | |
from contextlib import contextmanager | |
import hashlib | |
import struct | |
from django.db import DEFAULT_DB_ALIAS, connections | |
from django.utils import six | |
def advisory_xact_lock_many(*objs, **kwargs):
    """ Acquire multiple advisory transactional locks for given IDs. See ``advisory_xact_lock``.

    objs: any mix of bytes, str, unicode or int
    kwargs: forwarded unchanged to ``advisory_xact_lock`` (``shared``, ``wait``, ``using``)

    Returns ``True`` if every call to ``advisory_xact_lock`` returned a truthy value,
    ``False`` otherwise. All locks are attempted even if one of them is not acquired.
    """
    # Always acquire locks *in the same order* to avoid deadlocks.
    # The order is defined on the computed lock ID, not on the objects themselves:
    # lock IDs are always integers, whereas sorting a mix of int / str / bytes
    # raises TypeError on Python 3.
    all_acquired = True
    for obj in sorted(objs, key=_get_lock_id):
        if not advisory_xact_lock(obj, **kwargs):
            all_acquired = False
    return all_acquired
def advisory_xact_lock(obj, shared=False, wait=True, using=None):
    """ Acquire an advisory transactional lock for a given ID, using PSQL.

    obj: bytes, str, unicode or int
    shared: use the ``_shared`` variant of the PSQL function instead of the exclusive one
    wait: if True call ``pg_advisory_xact_lock*``, otherwise call ``pg_try_advisory_xact_lock*``
    using: database alias, defaults to ``DEFAULT_DB_ALIAS``

    Returns ``True`` when ``wait`` is true, otherwise the value returned by the
    ``pg_try_advisory_xact_lock*`` call.

    This can be used as a semaphore over database transactions for distributed systems.
    The lock is released automatically at the end of the transaction.

    .. note::
        The lock is **not** released when you rollback to a previous savepoint.
        This means if you use ``advisory_xact_lock`` inside a ``django.db.transaction.atomic`` block,
        raise inside the block and catch the exception at a higher level, the lock is still in place.

        This should not be an issue, as you can totally lock the same resource multiple
        times inside the same transaction.

        Example::

            try:
                with transaction.atomic():
                    advisory_xact_lock(42)
                    MyObject.objects.get(value=42)
            except MyObject.DoesNotExist:
                # 42 is still locked
                # but as we are in the same transaction, we can lock it again
                advisory_xact_lock(42)
                MyObject.objects.create(value=42)

    .. note::
        Avoid deadlocks ! If you need to lock multiple resources,
        do so at a single point with ``advisory_xact_lock_many``
    """
    command = 'select pg_{0}advisory_xact_lock{1}(%s)'.format(
        '' if wait else 'try_',
        '_shared' if shared else '')
    # Compute the ID first: an unsupported ``obj`` then fails before any cursor is opened.
    lock_id = _get_lock_id(obj)
    cursor = connections[using or DEFAULT_DB_ALIAS].cursor()
    try:
        cursor.execute(command, (lock_id,))
        return cursor.fetchone()[0] if not wait else True
    finally:
        # Only the cursor is closed here; the lock itself is left untouched.
        cursor.close()
@contextmanager
def advisory_lock(obj, shared=False, wait=True, using=None):
    """ Acquire an advisory lock for given ID, release at the end of the context.

    obj: bytes, str, unicode or int
    shared: use the ``_shared`` variants of the PSQL lock / unlock functions
    wait: if True call ``pg_advisory_lock*``, otherwise call ``pg_try_advisory_lock*``
    using: database alias, defaults to ``DEFAULT_DB_ALIAS``

    The context yields ``True`` when ``wait`` is true, otherwise the value returned by
    the ``pg_try_advisory_lock*`` call. The unlock statement is only issued when
    that value is truthy.

    PREFER ``advisory_xact_lock`` IF YOU ARE DOING DATABASE OPERATIONS INSIDE THE BLOCK

    .. warning::
        The lock is released at the end of the context, even if the transaction is
        not committed yet. A concurrent process can then acquire the lock and
        read or write the database, without getting any of the yet-to-be-committed changes,
        **as if there was no lock in the first place**.
    """
    command = 'select pg_{0}advisory_lock{1}(%s)'.format(
        '' if wait else 'try_',
        '_shared' if shared else '')
    release = 'select pg_advisory_unlock{0}(%s)'.format(
        '_shared' if shared else '')
    lock_id = _get_lock_id(obj)
    cursor = connections[using or DEFAULT_DB_ALIAS].cursor()
    # Outer try: the cursor is closed whatever happens, including when the lock
    # statement or the unlock statement raises.
    try:
        cursor.execute(command, (lock_id,))
        acquired = cursor.fetchone()[0] if not wait else True
        try:
            yield acquired
        finally:
            # Nothing to unlock when a try-lock reported it was not acquired.
            if acquired:
                cursor.execute(release, (lock_id,))
    finally:
        cursor.close()
def _get_lock_id(obj):
    """ This function is used internally to generate a PSQL ``bigint`` for ``pg_*_lock`` functions.

    Integers inside ``bigint`` boundaries are used directly.
    Integers outside these boundaries are converted to their decimal string and hashed.
    Bytes are hashed using SHA-256 and the first 8 bytes are cast to a bigint.
    Unicode strings are converted to bytes using utf-8.
    Other types will raise an error.

    On the choice of hash for strings and bytes, other solutions have been considered and rejected:

    - Using zlib.crc32 reduces the key space (this can cause deadlocks with > 100K cardinality)
    - Using crc64 from crcmod is faster than hashlib.sha256, but adds a dependency (premature optimization)
    - Using the Python internal ``hash()`` function is not robust because it is platform dependent
    """
    if isinstance(obj, six.integer_types):
        # Python2 long and Python3 int are not bounded : we need to check it.
        if -2**63 <= obj < 2**63:
            return obj
        obj = str(obj)

    # Only *text* needs encoding (Python2 unicode or Python3 str).
    # Python2 byte str must be left alone: calling .encode() on it triggers an implicit
    # ASCII decode first, which raises UnicodeDecodeError on non-ASCII content.
    if isinstance(obj, six.text_type):
        obj = obj.encode('utf-8')

    # obj must be convertible to a buffer (Python2 str or Python3 bytes, aka six.binary_type)
    # "<q" reads an 8-bytes signed standard integer, even on 32-bits Python, with a
    # fixed little-endian byte order: the ID does not depend on the host byte order.
    return struct.unpack_from("<q", hashlib.sha256(obj).digest())[0]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment