Skip to content

Instantly share code, notes, and snippets.

Last active March 25, 2022 20:19
Show Gist options
  • Save stanio/b7843cc18a80b42812ae128aba212245 to your computer and use it in GitHub Desktop.
Save stanio/b7843cc18a80b42812ae128aba212245 to your computer and use it in GitHub Desktop.
Liquibase LockService using database session-level locks (moved to
/*
 * This module, both source code and documentation,
 * is in the Public Domain, and comes with NO WARRANTY.
 */
package net.example.liquibase.lockservice.ext;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
import java.util.Locale;
import liquibase.database.Database;
import liquibase.database.core.MySQLDatabase;
import liquibase.exception.LockException;
import liquibase.lockservice.DatabaseChangeLogLock;
* Employs MySQL user-level (a.k.a. application-level or advisory) locks:
* <blockquote>
* <p>A lock obtained with <code>GET_LOCK()</code> is released explicitly by
* executing <code>RELEASE_LOCK()</code> or implicitly when your session
* terminates (either normally or abnormally). Locks obtained with
* <code>GET_LOCK()</code> are not released when transactions commit or
* roll back.</p>
* </blockquote>
* @see "<a href=''>Locking
* Functions</a> (MySQL 5.7 Reference Manual)"
* @see "<a href=''>Locking
* Functions</a> (MySQL 8.0 Reference Manual)"
public class MySQLLockService extends SessionLockService {
static final String SQL_GET_LOCK = "SELECT get_lock(?, ?)";
static final String SQL_RELEASE_LOCK = "SELECT release_lock(?)";
static final String
SQL_LOCK_INFO = "SELECT l.processlist_id,, p.time, p.state"
+ " FROM (SELECT is_used_lock(?) AS processlist_id) AS l"
+ " LEFT JOIN information_schema.processlist p"
+ " ON = l.processlist_id";
public boolean supports(Database database) {
return (database instanceof MySQLDatabase);
private String getChangeLogLockName() {
// MySQL 5.7 and later enforces a maximum length on lock names of 64 characters.
return (database.getDefaultSchemaName() + "."
+ database.getDatabaseChangeLogLockTableName()).toUpperCase(Locale.ROOT);
private static Integer getIntegerResult(PreparedStatement stmt) throws SQLException {
try (ResultSet rs = stmt.executeQuery()) {;
Number locked = (Number) rs.getObject(1);
return (locked == null) ? null : locked.intValue();
* @see <a href=""><code>GET_LOCK()</code></a>
protected boolean acquireLock(Connection con) throws SQLException, LockException {
try (PreparedStatement stmt = con.prepareStatement(SQL_GET_LOCK)) {
stmt.setString(1, getChangeLogLockName());
final int timeoutSeconds = 5;
stmt.setInt(2, timeoutSeconds);
Integer locked = getIntegerResult(stmt);
if (locked == null) {
throw new LockException("GET_LOCK() returned NULL");
} else if (locked == 0) {
return false;
} else if (locked != 1) {
throw new LockException("GET_LOCK() returned " + locked);
return true;
* @see <a href=""><code>RELEASE_LOCK()</code></a>
protected void releaseLock(Connection con) throws SQLException, LockException {
try (PreparedStatement stmt = con.prepareStatement(SQL_RELEASE_LOCK)) {
stmt.setString(1, getChangeLogLockName());
Integer unlocked = getIntegerResult(stmt);
if (!Integer.valueOf(1).equals(unlocked)) {
throw new LockException("RELEASE_LOCK() returned "
+ String.valueOf(unlocked).toUpperCase(Locale.ROOT));
* Obtains information about the database changelog lock.
* @see <a href=""><code>IS_USED_LOCK()</code></a>
* @see "<a href=''>The INFORMATION_SCHEMA PROCESSLIST Table</a> (MySQL Reference Manual)"
* @see "<a href=''>The metadata_locks Table</a> (MySQL Reference Manual)"
* @see "<a href=''>The threads Table</a> (MySQL Reference Manual)"
protected DatabaseChangeLogLock usedLock(Connection con)
throws SQLException, LockException
try (PreparedStatement stmt = con.prepareStatement(SQL_LOCK_INFO)) {
stmt.setString(1, getChangeLogLockName());
try (ResultSet rs = stmt.executeQuery()) {
if (! || rs.getObject("PROCESSLIST_ID") == null) {
return null;
long timestamp = rs.getInt("TIME");
if (timestamp > 0) {
// This is not really the time the lock has been obtained but gives
// insight on how long the owning session is doing what it is doing.
timestamp = System.currentTimeMillis() - timestamp * 1000;
return new DatabaseChangeLogLock(1, new Date(timestamp), lockedBy(rs));
private static String lockedBy(ResultSet rs) throws SQLException {
String host = rs.getString("HOST");
if (host == null) {
return "connection_id#" + rs.getLong("PROCESSLIST_ID");
int colonIndex = host.lastIndexOf(':');
if (colonIndex > 0) {
host = host.substring(0, colonIndex);
return host + " (" + rs.getString("STATE") + ")";
/*
 * This module, both source code and documentation,
 * is in the Public Domain, and comes with NO WARRANTY.
 */
package net.example.liquibase.lockservice.ext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import liquibase.database.core.MySQLDatabase;
import liquibase.database.core.PostgresDatabase;
import liquibase.database.jvm.JdbcConnection;
import liquibase.exception.LockException;
import liquibase.lockservice.DatabaseChangeLogLock;
public class MySQLLockServiceTest {
public ExpectedException thrown = ExpectedException.none();
private MySQLLockService lockService;
private Connection dbCon;
public void setUp() {
MySQLDatabase database = new MySQLDatabase();
database = spy(database);
doReturn(new JdbcConnection(dbCon)).when(database).getConnection();
lockService = new MySQLLockService();
public void supports() throws Exception {
assertEquals("supports", true, lockService.supports(new MySQLDatabase()));
public void supportsNot() throws Exception {
assertEquals("supports", false, lockService.supports(new PostgresDatabase()));
public void acquireSuccess() throws Exception {
PreparedStatement stmt = mock(PreparedStatement.class);
ResultSet rs = intResult(1);
assertEquals("acquireLock", true, lockService.acquireLock());
public void acquireUnsuccessful() throws Exception {
PreparedStatement stmt = mock(PreparedStatement.class);
ResultSet rs = intResult(0);
assertEquals("acquireLock", false, lockService.acquireLock());
public void acquireFailure() throws Exception {
PreparedStatement stmt = mock(PreparedStatement.class);
ResultSet rs = mock(ResultSet.class);
public void releaseSuccess() throws Exception {
PreparedStatement stmt = mock(PreparedStatement.class);
ResultSet rs = intResult(1);
public void releaseFailure() throws Exception {
PreparedStatement stmt = mock(PreparedStatement.class);
ResultSet rs = intResult(0);
public void usedLockInfo() throws Exception {
ResultSet infoResult = mock(ResultSet.class);
PreparedStatement stmt = mock(PreparedStatement.class);
DatabaseChangeLogLock[] lockList = lockService.listLocks();
assertEquals("Lock list length", 1, lockList.length);
assertNotNull("Null lock element", lockList[0]);
assertEquals("lockedBy", " (testing)", lockList[0].getLockedBy());
// Single row / single column
private static ResultSet intResult(int value) throws SQLException {
ResultSet rs = mock(ResultSet.class);
return rs;
/*
 * This module, both source code and documentation,
 * is in the Public Domain, and comes with NO WARRANTY.
 */
package net.example.liquibase.lockservice.ext;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import liquibase.database.Database;
import liquibase.database.core.PostgresDatabase;
import liquibase.exception.DatabaseException;
import liquibase.exception.LockException;
import liquibase.lockservice.DatabaseChangeLogLock;
import liquibase.logging.LogService;
* Employs PostgreSQL <i>advisory locks</i>:
* <blockquote>
* <p>While a flag stored in a table could be used for the same purpose,
* advisory locks are faster, avoid table bloat, and are automatically
* cleaned up by the server at the end of the session.</p>
* <p>
* There are two ways to acquire an advisory lock in PostgreSQL: at session
* level or at transaction level. Once acquired at session level, an advisory
* lock is held until explicitly released or the session ends. Unlike standard
* lock requests, session-level advisory lock requests do not honor transaction
* semantics: a lock acquired during a transaction that is later rolled back
* will still be held following the rollback, and likewise an unlock is
* effective even if the calling transaction fails later.</p>
* </blockquote>
* @see "<a href=''>Advisory
* Locks</a> (PostgreSQL Documentation)"
public class PGLockService extends SessionLockService {
// See also:
// "Use postgres advisory lock as changelog lock service"
public boolean supports(Database database) {
return (database instanceof PostgresDatabase) && isAtLeastPostgres91(database);
private static boolean isAtLeastPostgres91(Database database) {
try {
return (database.getDatabaseMajorVersion() > 9)
|| (database.getDatabaseMajorVersion() == 9
&& database.getDatabaseMinorVersion() >= 1);
} catch (DatabaseException e) {
.warning("Problem querying database version", e);
return false;
private long getChangeLogLockId() {
long high = database.getDefaultSchemaName().hashCode();
high <<= Integer.SIZE;
long low = database.getDatabaseChangeLogLockTableName().hashCode();
low &= 0x00000000_FFFFFFFFL;
return high | low;
private static Boolean getBooleanResult(PreparedStatement stmt) throws SQLException {
try (ResultSet rs = stmt.executeQuery()) {;
return (Boolean) rs.getObject(1);
* @see "<a href=''><code>pg_try_advisory_lock(key bigint)</code></a>
* (Advisory Lock Functions)"
protected boolean acquireLock(Connection con) throws SQLException, LockException {
final String sql = "SELECT pg_try_advisory_lock(?)";
try (PreparedStatement stmt = con.prepareStatement(sql)) {
stmt.setLong(1, getChangeLogLockId());
return Boolean.TRUE.equals(getBooleanResult(stmt));
* @see "<a href=''><code>pg_advisory_unlock(key bigint)</code></a>
* (Advisory Lock Functions)"
protected void releaseLock(Connection con) throws SQLException, LockException {
final String sql = "SELECT pg_advisory_unlock(?)";
try (PreparedStatement stmt = con.prepareStatement(sql)) {
stmt.setLong(1, getChangeLogLockId());
Boolean unlocked = getBooleanResult(stmt);
if (!Boolean.TRUE.equals(unlocked)) {
throw new LockException("pg_advisory_unlock() returned " + unlocked);
* Obtains information about the database changelog lock.
* @see "<a href=''><code>pg_locks</code></a>
* (PostgreSQL Documentation)"
protected DatabaseChangeLogLock usedLock(Connection con)
throws SQLException, LockException
// TODO: Provide meaningful implementation.
return null;
/*
 * This module, both source code and documentation,
 * is in the Public Domain, and comes with NO WARRANTY.
 */
package net.example.liquibase.lockservice.ext;
import java.sql.Connection;
import java.sql.SQLException;
import liquibase.database.Database;
import liquibase.database.jvm.JdbcConnection;
import liquibase.exception.DatabaseException;
import liquibase.exception.LockException;
import liquibase.lockservice.DatabaseChangeLogLock;
import liquibase.lockservice.StandardLockService;
import liquibase.logging.LogService;
* Abstract base for {@code LockService} implementations that support <i>session-level</i>
* (vs. <i>transaction-level</i>) locking capabilities. Session-level locks get
* automatically released if the database connection drops, and overcome the shortcoming of the
* <a href="">{@code StandardLockService}</a>:
* <blockquote>
* <p>If Liquibase does not exit cleanly, the lock row may be left as locked.
* You can clear out the current lock by running <code>liquibase releaseLocks</code>
* </blockquote>
* <p>Running <code>liquibase releaseLocks</code> in a micro-service production
* environment is not really feasible.</p>
* <p>
* Subclasses need to implement {@link #supports(Database)}.</p>
public abstract class SessionLockService extends StandardLockService {
* This implementation returns {@code super.getPriority() + 1}.
public int getPriority() {
return super.getPriority() + 1;
* This implementation returns {@code false}.
public boolean supports(Database database) {
return false;
* This implementation is a <i>no-op</i>. Suppresses creating
* the {@code DATABASECHANGELOGLOCK} table by the {@code StandardLockService}
* implementation.
public void init() throws DatabaseException {
// no-op
private Connection getConnection() {
return ((JdbcConnection) database.getConnection()).getUnderlyingConnection();
public boolean acquireLock() throws LockException {
if (hasChangeLogLock) {
return true;
try {
if (acquireLock(getConnection())) {
hasChangeLogLock = true;
.info("Successfully acquired change log lock");
return true;
return false;
} catch (SQLException e) {
throw new LockException(e);
* Attempts to acquire lock for the associated {@link #database} (schema) using
* the given connection.
* @param con the connection identifying and used by the database session.
* @return {@code true} if lock successfully obtained, or
* {@code false} if lock is held by another session.
* @throws SQLException if a database access error occurs;
* @throws LockException if other logical error happens, preventing the
* operation from completing normally.
* @see #acquireLock()
protected abstract boolean acquireLock(Connection con)
throws SQLException, LockException;
public void releaseLock() throws LockException {
try {
.info("Successfully released change log lock");
} catch (SQLException e) {
throw new LockException(e);
} finally {
hasChangeLogLock = false;
* Releases the lock previously obtained by {@code acquireLock()}.
* @param con the connection identifying and used by the database session.
* @throws SQLException if a database access error occurs;
* @throws LockException if other logical error happens, preventing the
* operation from completing normally.
* @see #releaseLock()
* @see #acquireLock(Connection)
protected abstract void releaseLock(Connection con)
throws SQLException, LockException;
public DatabaseChangeLogLock[] listLocks() throws LockException {
try {
DatabaseChangeLogLock usedLock = usedLock(getConnection());
return (usedLock == null) ? new DatabaseChangeLogLock[0]
: new DatabaseChangeLogLock[] { usedLock };
} catch (SQLException e) {
throw new LockException(e);
* This implementation returns {@code null}.
* @param con the connection identifying and used by the database session.
* @return Information about the database changelog lock, or {@code null}.
* @throws SQLException if a database access error occurs;
* @throws LockException if other logical error happens, preventing the
* operation from completing normally.
* @see #listLocks()
protected DatabaseChangeLogLock usedLock(Connection con)
throws SQLException, LockException
return null;
/*
 * This module, both source code and documentation,
 * is in the Public Domain, and comes with NO WARRANTY.
 */
package net.example.liquibase.lockservice.ext;
import static liquibase.servicelocator.PrioritizedService.PRIORITY_DEFAULT;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import liquibase.database.Database;
import liquibase.database.core.MySQLDatabase;
import liquibase.database.jvm.JdbcConnection;
import liquibase.exception.LockException;
import liquibase.lockservice.DatabaseChangeLogLock;
import liquibase.lockservice.StandardLockService;
public class SessionLockServiceTest {
public ExpectedException thrown = ExpectedException.none();
private SessionLockService lockService;
public void setUp() {
Database database = mock(Database.class);
.thenReturn(new JdbcConnection(mock(Connection.class)));
MockService service = new MockService();
lockService = service;
public void priorityAboveDefault() {
assertThat("Service priority",
lockService.getPriority(), greaterThan(PRIORITY_DEFAULT));
assertThat("Service priority", lockService.getPriority(),
greaterThan(new StandardLockService().getPriority()));
public void defaultSupports() throws Exception {
assertEquals("Supports database", false, lockService.supports(new MySQLDatabase()));
public void acquireReleaseState() throws Exception {
assertEquals("acquireLock", true, lockService.acquireLock());
assertEquals("hasChangeLogLock", true, lockService.hasChangeLogLock());
assertEquals("hasChangeLogLock", false, lockService.hasChangeLogLock());
public void acquireBeforeRelease() throws Exception {
assertEquals("acquireLock", true, lockService.acquireLock());
assertEquals("acquireLock", true, lockService.acquireLock());
assertEquals("hasChangeLogLock", true, lockService.hasChangeLogLock());
public void acquireFalse() throws Exception {
((MockService) lockService).acquireResult = false;
assertEquals("acquireLock", false, lockService.acquireLock());
assertEquals("hasChangeLogLock", false, lockService.hasChangeLogLock());
public void listLocks() throws Exception {
((MockService) lockService).usedLockResult = new DatabaseChangeLogLock(2, new Date(1), "foo");
DatabaseChangeLogLock[] locks = lockService.listLocks();
assertEquals("Lock list length", 1, locks.length);
assertNotNull("Null lock element", locks[0]);
assertEquals("lockedBy", "foo", locks[0].getLockedBy());
public void listNoLocks() throws Exception {
DatabaseChangeLogLock[] locks = lockService.listLocks();
assertEquals("Lock list length", 0, locks.length);
public void acquireShouldPropagate() throws Exception {
SessionLockService spyService = spy(lockService);
SQLException cause = new SQLException("bar");
public void releaseShouldPropagate() throws Exception {
SessionLockService spyService = spy(lockService);
SQLException cause = new SQLException("baz");
public void listLocksShouldPropagate() throws Exception {
SessionLockService spyService = spy(lockService);
SQLException cause = new SQLException("qux");
static class MockService extends SessionLockService {
boolean acquireResult = true;
DatabaseChangeLogLock usedLockResult;
protected boolean acquireLock(Connection con) throws SQLException, LockException {
return acquireResult;
protected void releaseLock(Connection con) throws SQLException, LockException {
// no-op
protected DatabaseChangeLogLock usedLock(Connection con) throws SQLException, LockException {
return usedLockResult;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment