Skip to content

Instantly share code, notes, and snippets.

@spumer
Created September 4, 2020 10:09
Show Gist options
  • Save spumer/38519865815f26a38b2b11776f28faf2 to your computer and use it in GitHub Desktop.
Save spumer/38519865815f26a38b2b11776f28faf2 to your computer and use it in GitHub Desktop.
SessionLock for PG (pg_try_advisory_lock)
import contextlib
import logging
import time
log = logging.getLogger(__name__)
class SessionLock:
def __init__(self, connection, lock_id: int):
"""
:param connection: Соедиение с БД
:param lock_id: Идентификатор блокировки
"""
self._id = lock_id
self._connection = connection
def acquire(self, timeout: int = None):
locked = None
end_time = None
if timeout is not None:
end_time = time.monotonic() + timeout
while not locked:
log.debug('Получаем блокировку id=%s', self._id)
locked = self._try_lock()
if end_time and time.monotonic() >= end_time:
return locked
if not locked:
log.debug('Не удалось получить блокировку id=%s, ждем 1 сек.', self._id)
time.sleep(1)
log.debug('Удалось получить блокировку по ключу id=%s', self._id)
return locked
def _try_lock(self):
with contextlib.closing(self._connection.cursor()) as cur:
cur.execute(f'SELECT pg_try_advisory_lock({self._id});')
return cur.fetchone()[0]
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_value, traceback):
pass
@contextlib.contextmanager
def wait_lock(lock: SessionLock, timeout: int):
"""
Механизм ограничивающий время ожидания получения блокировки в случаях
когда нужно ждать блокировку в течение какого то времени
"""
acquired = lock.acquire(timeout=timeout)
if not acquired:
raise TimeoutError
yield
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment