Skip to content

Instantly share code, notes, and snippets.

@stanio
Last active March 25, 2022 20:19
Show Gist options
  • Save stanio/b7843cc18a80b42812ae128aba212245 to your computer and use it in GitHub Desktop.
Save stanio/b7843cc18a80b42812ae128aba212245 to your computer and use it in GitHub Desktop.
Liquibase LockService using database session-level locks (moved to https://github.com/blagerweij/liquibase-sessionlock)
/*
* This module, both source code and documentation,
* is in the Public Domain, and comes with NO WARRANTY.
*/
package net.example.liquibase.lockservice.ext;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
import java.util.Locale;
import liquibase.database.Database;
import liquibase.database.core.MySQLDatabase;
import liquibase.exception.LockException;
import liquibase.lockservice.DatabaseChangeLogLock;
/**
 * Lock service for MySQL built on user-level (a.k.a. application-level or
 * advisory) locks:
 * <blockquote>
 * <p>A lock obtained with <code>GET_LOCK()</code> is released explicitly by
 * executing <code>RELEASE_LOCK()</code> or implicitly when your session
 * terminates (either normally or abnormally). Locks obtained with
 * <code>GET_LOCK()</code> are not released when transactions commit or
 * roll back.</p>
 * </blockquote>
 *
 * @see "<a href='https://dev.mysql.com/doc/refman/5.7/en/locking-functions.html'>Locking
 * Functions</a> (MySQL 5.7 Reference Manual)"
 * @see "<a href='https://dev.mysql.com/doc/refman/8.0/en/locking-functions.html'>Locking
 * Functions</a> (MySQL 8.0 Reference Manual)"
 */
public class MySQLLockService extends SessionLockService {

    /** Statement acquiring the named lock; parameters: lock name, timeout in seconds. */
    static final String SQL_GET_LOCK = "SELECT get_lock(?, ?)";

    /** Statement releasing the named lock; parameter: lock name. */
    static final String SQL_RELEASE_LOCK = "SELECT release_lock(?)";

    /**
     * Statement resolving the connection that holds the named lock, joined
     * with its process-list entry. The join is a {@code LEFT JOIN}, so the
     * {@code host}, {@code time} and {@code state} columns may be
     * {@code NULL} even when {@code processlist_id} is not.
     */
    static final String
            SQL_LOCK_INFO = "SELECT l.processlist_id, p.host, p.time, p.state"
                    + " FROM (SELECT is_used_lock(?) AS processlist_id) AS l"
                    + " LEFT JOIN information_schema.processlist p"
                    + " ON p.id = l.processlist_id";

    /**
     * Accepts {@link MySQLDatabase} instances only.
     */
    @Override
    public boolean supports(Database database) {
        return (database instanceof MySQLDatabase);
    }

    /**
     * Builds the lock name as {@code <default schema>.<lock table name>},
     * upper-cased with {@link Locale#ROOT}.
     */
    private String getChangeLogLockName() {
        // MySQL 5.7 and later enforces a maximum length on lock names of 64 characters.
        // NOTE(review): that limit is not checked here; a long schema or table
        // name would presumably be rejected by the server - confirm.
        return (database.getDefaultSchemaName() + "."
                + database.getDatabaseChangeLogLockTableName()).toUpperCase(Locale.ROOT);
    }

    /**
     * Executes the statement and reads the first column of the first row
     * as an integer.
     *
     * @param stmt a fully parameterized statement.
     * @return the integer value, or {@code null} if the column value is
     *         {@code null}.
     * @throws SQLException if a database access error occurs.
     */
    private static Integer getIntegerResult(PreparedStatement stmt) throws SQLException {
        try (ResultSet rs = stmt.executeQuery()) {
            rs.next();
            Number locked = (Number) rs.getObject(1);
            return (locked == null) ? null : locked.intValue();
        }
    }

    /**
     * Tries to obtain the lock, waiting at most five seconds.
     *
     * @return {@code true} if {@code GET_LOCK()} returned 1, or
     *         {@code false} if it returned 0.
     * @throws LockException if {@code GET_LOCK()} returned {@code NULL} or
     *         any value other than 0 or 1.
     * @see <a href="https://dev.mysql.com/doc/refman/5.7/en/locking-functions.html#function_get-lock"><code>GET_LOCK()</code></a>
     */
    @Override
    protected boolean acquireLock(Connection con) throws SQLException, LockException {
        try (PreparedStatement stmt = con.prepareStatement(SQL_GET_LOCK)) {
            stmt.setString(1, getChangeLogLockName());
            final int timeoutSeconds = 5;
            stmt.setInt(2, timeoutSeconds);
            Integer locked = getIntegerResult(stmt);
            if (locked == null) {
                throw new LockException("GET_LOCK() returned NULL");
            } else if (locked == 0) {
                return false;
            } else if (locked != 1) {
                throw new LockException("GET_LOCK() returned " + locked);
            }
            return true;
        }
    }

    /**
     * Releases the lock.
     *
     * @throws LockException if {@code RELEASE_LOCK()} returned anything
     *         other than 1.
     * @see <a href="https://dev.mysql.com/doc/refman/5.7/en/locking-functions.html#function_release-lock"><code>RELEASE_LOCK()</code></a>
     */
    @Override
    protected void releaseLock(Connection con) throws SQLException, LockException {
        try (PreparedStatement stmt = con.prepareStatement(SQL_RELEASE_LOCK)) {
            stmt.setString(1, getChangeLogLockName());
            Integer unlocked = getIntegerResult(stmt);
            if (!Integer.valueOf(1).equals(unlocked)) {
                // Upper-casing only affects the "null" rendering -> "NULL".
                throw new LockException("RELEASE_LOCK() returned "
                        + String.valueOf(unlocked).toUpperCase(Locale.ROOT));
            }
        }
    }

    /**
     * Obtains information about the database changelog lock.
     * <p>
     * The reported date is the current time minus the {@code TIME} column of
     * the owning session's process-list entry. A {@code TIME} of zero
     * therefore yields the current time, and a negative value is treated as
     * zero. When {@code TIME} is {@code NULL} the date is the epoch
     * ({@code new Date(0)}).</p>
     *
     * @return the lock description, or {@code null} if the query returns no
     *         row or {@code IS_USED_LOCK()} yields {@code NULL}.
     * @see <a href="https://dev.mysql.com/doc/refman/5.7/en/locking-functions.html#function_is-used-lock"><code>IS_USED_LOCK()</code></a>
     * @see "<a href='https://dev.mysql.com/doc/refman/5.7/en/processlist-table.html'>The INFORMATION_SCHEMA PROCESSLIST Table</a> (MySQL Reference Manual)"
     * @see "<a href='https://dev.mysql.com/doc/refman/5.7/en/metadata-locks-table.html'>The metadata_locks Table</a> (MySQL Reference Manual)"
     * @see "<a href='https://dev.mysql.com/doc/refman/5.7/en/threads-table.html'>The threads Table</a> (MySQL Reference Manual)"
     */
    @Override
    protected DatabaseChangeLogLock usedLock(Connection con)
            throws SQLException, LockException
    {
        try (PreparedStatement stmt = con.prepareStatement(SQL_LOCK_INFO)) {
            stmt.setString(1, getChangeLogLockName());
            try (ResultSet rs = stmt.executeQuery()) {
                if (!rs.next() || rs.getObject("PROCESSLIST_ID") == null) {
                    return null;
                }
                int secondsInState = rs.getInt("TIME");
                long timestamp = 0;
                // wasNull() must be consulted right after the getter: getInt()
                // reports SQL NULL as 0, which is also a legitimate TIME value.
                if (!rs.wasNull()) {
                    // This is not really the time the lock has been obtained but gives
                    // insight on how long the owning session is doing what it is doing.
                    timestamp = System.currentTimeMillis()
                            - Math.max(0, secondsInState) * 1000L;
                }
                return new DatabaseChangeLogLock(1, new Date(timestamp), lockedBy(rs));
            }
        }
    }

    /**
     * Describes the lock owner from the current row: the {@code HOST} value
     * with everything from its last colon stripped, followed by the
     * {@code STATE} in parentheses. Falls back to
     * {@code connection_id#<PROCESSLIST_ID>} when {@code HOST} is
     * {@code null}.
     */
    private static String lockedBy(ResultSet rs) throws SQLException {
        String host = rs.getString("HOST");
        if (host == null) {
            return "connection_id#" + rs.getLong("PROCESSLIST_ID");
        }
        int colonIndex = host.lastIndexOf(':');
        if (colonIndex > 0) {
            host = host.substring(0, colonIndex);
        }
        return host + " (" + rs.getString("STATE") + ")";
    }
}
/*
* This module, both source code and documentation,
* is in the Public Domain, and comes with NO WARRANTY.
*/
package net.example.liquibase.lockservice.ext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import liquibase.database.core.MySQLDatabase;
import liquibase.database.core.PostgresDatabase;
import liquibase.database.jvm.JdbcConnection;
import liquibase.exception.LockException;
import liquibase.lockservice.DatabaseChangeLogLock;
public class MySQLLockServiceTest {

    /** Lock name expected for the schema configured in {@link #setUp()}. */
    private static final String EXPECTED_LOCK_NAME = "TEST_SCHEMA.DATABASECHANGELOGLOCK";

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    private MySQLLockService lockService;

    @Mock
    private Connection dbCon;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);

        MySQLDatabase plainDatabase = new MySQLDatabase();
        plainDatabase.setDefaultCatalogName("test_schema");

        // Spy so that only the connection lookup is replaced.
        MySQLDatabase spiedDatabase = spy(plainDatabase);
        doReturn(new JdbcConnection(dbCon)).when(spiedDatabase).getConnection();

        lockService = new MySQLLockService();
        lockService.setDatabase(spiedDatabase);
    }

    @Test
    public void supports() throws Exception {
        assertEquals("supports", true, lockService.supports(new MySQLDatabase()));
    }

    @Test
    public void supportsNot() throws Exception {
        assertEquals("supports", false, lockService.supports(new PostgresDatabase()));
    }

    @Test
    public void acquireSuccess() throws Exception {
        ResultSet granted = intResult(1);
        PreparedStatement getLock = prepared(MySQLLockService.SQL_GET_LOCK, granted);

        assertEquals("acquireLock", true, lockService.acquireLock());
        verify(getLock).setString(1, EXPECTED_LOCK_NAME);
    }

    @Test
    public void acquireUnsuccessful() throws Exception {
        ResultSet timedOut = intResult(0);
        prepared(MySQLLockService.SQL_GET_LOCK, timedOut);

        assertEquals("acquireLock", false, lockService.acquireLock());
    }

    @Test
    public void acquireFailure() throws Exception {
        // Unstubbed result set: getObject(1) yields null.
        ResultSet nullResult = mock(ResultSet.class);
        prepared(MySQLLockService.SQL_GET_LOCK, nullResult);

        thrown.expect(LockException.class);
        lockService.acquireLock();
    }

    @Test
    public void releaseSuccess() throws Exception {
        ResultSet released = intResult(1);
        PreparedStatement releaseLock = prepared(MySQLLockService.SQL_RELEASE_LOCK, released);

        lockService.releaseLock();
        verify(releaseLock).setString(1, EXPECTED_LOCK_NAME);
    }

    @Test
    public void releaseFailure() throws Exception {
        ResultSet notReleased = intResult(0);
        prepared(MySQLLockService.SQL_RELEASE_LOCK, notReleased);

        thrown.expect(LockException.class);
        lockService.releaseLock();
    }

    @Test
    public void usedLockInfo() throws Exception {
        ResultSet lockRow = mock(ResultSet.class);
        when(lockRow.next()).thenReturn(true).thenReturn(false);
        when(lockRow.getObject("PROCESSLIST_ID")).thenReturn(123);
        when(lockRow.getString("HOST")).thenReturn("192.168.254.254:12345");
        when(lockRow.getString("STATE")).thenReturn("testing");
        when(lockRow.getInt("TIME")).thenReturn(15);
        PreparedStatement lockInfo = prepared(MySQLLockService.SQL_LOCK_INFO, lockRow);

        DatabaseChangeLogLock[] locks = lockService.listLocks();

        verify(lockInfo).setString(1, EXPECTED_LOCK_NAME);
        assertEquals("Lock list length", 1, locks.length);
        assertNotNull("Null lock element", locks[0]);
        assertEquals("lockedBy", "192.168.254.254 (testing)", locks[0].getLockedBy());
    }

    /**
     * Creates a statement mock whose query yields {@code result} and makes
     * the mocked connection hand it out for the given SQL text.
     */
    private PreparedStatement prepared(String sql, ResultSet result) throws SQLException {
        PreparedStatement statement = mock(PreparedStatement.class);
        when(statement.executeQuery()).thenReturn(result);
        when(dbCon.prepareStatement(sql)).thenReturn(statement);
        return statement;
    }

    // Single row / single column
    private static ResultSet intResult(int value) throws SQLException {
        ResultSet singleRow = mock(ResultSet.class);
        when(singleRow.next()).thenReturn(true).thenReturn(false);
        when(singleRow.getObject(1)).thenReturn(value);
        return singleRow;
    }
}
/*
* This module, both source code and documentation,
* is in the Public Domain, and comes with NO WARRANTY.
*/
package net.example.liquibase.lockservice.ext;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import liquibase.database.Database;
import liquibase.database.core.PostgresDatabase;
import liquibase.exception.DatabaseException;
import liquibase.exception.LockException;
import liquibase.lockservice.DatabaseChangeLogLock;
import liquibase.logging.LogService;
/**
 * Lock service for PostgreSQL built on session-level <i>advisory locks</i>:
 * <blockquote>
 * <p>While a flag stored in a table could be used for the same purpose,
 * advisory locks are faster, avoid table bloat, and are automatically
 * cleaned up by the server at the end of the session.</p>
 * <p>
 * There are two ways to acquire an advisory lock in PostgreSQL: at session
 * level or at transaction level. Once acquired at session level, an advisory
 * lock is held until explicitly released or the session ends. Unlike standard
 * lock requests, session-level advisory lock requests do not honor transaction
 * semantics: a lock acquired during a transaction that is later rolled back
 * will still be held following the rollback, and likewise an unlock is
 * effective even if the calling transaction fails later.</p>
 * </blockquote>
 *
 * @see "<a href='https://www.postgresql.org/docs/9.6/explicit-locking.html#ADVISORY-LOCKS'>Advisory
 * Locks</a> (PostgreSQL Documentation)"
 */
public class PGLockService extends SessionLockService {

    // See also:
    // https://github.com/liquibase/liquibase/pull/872
    // "Use postgres advisory lock as changelog lock service"

    /** Non-blocking lock attempt; parameter: the 64-bit lock key. */
    private static final String SQL_TRY_LOCK = "SELECT pg_try_advisory_lock(?)";

    /** Lock release; parameter: the 64-bit lock key. */
    private static final String SQL_UNLOCK = "SELECT pg_advisory_unlock(?)";

    /**
     * Accepts {@link PostgresDatabase} instances reporting version 9.1 or later.
     */
    @Override
    public boolean supports(Database database) {
        if (!(database instanceof PostgresDatabase)) {
            return false;
        }
        return isAtLeastPostgres91(database);
    }

    /**
     * Checks the reported database version; a failure to query it is logged
     * as a warning and treated as "not supported".
     */
    private static boolean isAtLeastPostgres91(Database database) {
        try {
            int major = database.getDatabaseMajorVersion();
            if (major != 9) {
                return major > 9;
            }
            return database.getDatabaseMinorVersion() >= 1;
        } catch (DatabaseException e) {
            LogService.getLog(PGLockService.class)
                    .warning("Problem querying database version", e);
            return false;
        }
    }

    /**
     * Derives the lock key: the hash of the default schema name forms the
     * upper 32 bits, the hash of the lock table name the lower 32 bits.
     */
    private long getChangeLogLockId() {
        long schemaBits = database.getDefaultSchemaName().hashCode();
        long tableBits = database.getDatabaseChangeLogLockTableName().hashCode();
        // Mask off the sign extension so it cannot leak into the upper half.
        return (schemaBits << Integer.SIZE) | (tableBits & 0x00000000_FFFFFFFFL);
    }

    /**
     * Executes the statement and reads the first column of the first row
     * as a {@code Boolean} (possibly {@code null}).
     */
    private static Boolean getBooleanResult(PreparedStatement stmt) throws SQLException {
        try (ResultSet rs = stmt.executeQuery()) {
            rs.next();
            Object value = rs.getObject(1);
            return (Boolean) value;
        }
    }

    /**
     * Tries to obtain the advisory lock; a {@code null} result counts as
     * "not obtained".
     *
     * @see "<a href='https://www.postgresql.org/docs/9.6/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS'><code>pg_try_advisory_lock(key bigint)</code></a>
     * (Advisory Lock Functions)"
     */
    @Override
    protected boolean acquireLock(Connection con) throws SQLException, LockException {
        try (PreparedStatement stmt = con.prepareStatement(SQL_TRY_LOCK)) {
            stmt.setLong(1, getChangeLogLockId());
            Boolean locked = getBooleanResult(stmt);
            return locked != null && locked;
        }
    }

    /**
     * Releases the advisory lock.
     *
     * @throws LockException if the unlock function did not return {@code true}.
     * @see "<a href='https://www.postgresql.org/docs/9.6/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS'><code>pg_advisory_unlock(key bigint)</code></a>
     * (Advisory Lock Functions)"
     */
    @Override
    protected void releaseLock(Connection con) throws SQLException, LockException {
        try (PreparedStatement stmt = con.prepareStatement(SQL_UNLOCK)) {
            stmt.setLong(1, getChangeLogLockId());
            Boolean unlocked = getBooleanResult(stmt);
            if (unlocked == null || !unlocked) {
                throw new LockException("pg_advisory_unlock() returned " + unlocked);
            }
        }
    }

    /**
     * Obtains information about the database changelog lock. Currently
     * always returns {@code null}.
     *
     * @see "<a href='https://www.postgresql.org/docs/9.6/view-pg-locks.html'><code>pg_locks</code></a>
     * (PostgreSQL Documentation)"
     */
    @Override
    protected DatabaseChangeLogLock usedLock(Connection con)
            throws SQLException, LockException
    {
        // TODO: Provide meaningful implementation.
        return null;
    }
}
/*
* This module, both source code and documentation,
* is in the Public Domain, and comes with NO WARRANTY.
*/
package net.example.liquibase.lockservice.ext;
import java.sql.Connection;
import java.sql.SQLException;
import liquibase.database.Database;
import liquibase.database.jvm.JdbcConnection;
import liquibase.exception.DatabaseException;
import liquibase.exception.LockException;
import liquibase.lockservice.DatabaseChangeLogLock;
import liquibase.lockservice.StandardLockService;
import liquibase.logging.LogService;
/**
 * Base class for {@code LockService} implementations relying on
 * <i>session-level</i> (as opposed to <i>transaction-level</i>) database
 * locks. Such locks are released automatically when the database connection
 * drops, which overcomes the shortcoming of the
 * <a href="https://www.liquibase.org/documentation/databasechangeloglock_table.html">{@code StandardLockService}</a>:
 * <blockquote>
 * <p>If Liquibase does not exit cleanly, the lock row may be left as locked.
 * You can clear out the current lock by running <code>liquibase releaseLocks</code>
 * which runs <code>UPDATE DATABASECHANGELOGLOCK SET LOCKED=0</code></p>
 * </blockquote>
 * <p>Running <code>liquibase releaseLocks</code> in a micro-service production
 * environment is not really feasible.</p>
 * <p>
 * Subclasses need to implement {@link #supports(Database)}.</p>
 */
public abstract class SessionLockService extends StandardLockService {

    /**
     * Ranks this service one step above the inherited priority.
     */
    @Override
    public int getPriority() {
        int inherited = super.getPriority();
        return inherited + 1;
    }

    /**
     * Supports no database by default; always returns {@code false}.
     */
    @Override
    public boolean supports(Database database) {
        return false;
    }

    /**
     * Deliberately does nothing, so the {@code DATABASECHANGELOGLOCK} table
     * is not created by the {@code StandardLockService} implementation.
     */
    @Override
    public void init() throws DatabaseException {
        // no-op
    }

    /** Unwraps the JDBC connection of the associated database. */
    private Connection getConnection() {
        JdbcConnection wrapper = (JdbcConnection) database.getConnection();
        return wrapper.getUnderlyingConnection();
    }

    /**
     * Obtains the lock unless this service already holds it. On success the
     * service is marked as holding the lock and table-info caching is
     * enabled on the database.
     *
     * @return {@code true} if the lock is (now) held by this service,
     *         {@code false} otherwise.
     * @throws LockException if acquiring fails, wrapping any {@code SQLException}.
     */
    @Override
    public boolean acquireLock() throws LockException {
        if (hasChangeLogLock) {
            return true;
        }

        boolean acquired;
        try {
            acquired = acquireLock(getConnection());
        } catch (SQLException e) {
            throw new LockException(e);
        }
        if (!acquired) {
            return false;
        }

        hasChangeLogLock = true;
        LogService.getLog(getClass())
                .info("Successfully acquired change log lock");
        database.setCanCacheLiquibaseTableInfo(true);
        return true;
    }

    /**
     * Attempts to acquire lock for the associated {@link #database} (schema) using
     * the given connection.
     *
     * @param con the connection identifying and used by the database session.
     * @return {@code true} if lock successfully obtained, or
     *         {@code false} if lock is held by another session.
     * @throws SQLException if a database access error occurs;
     * @throws LockException if other logical error happens, preventing the
     *         operation from completing normally.
     * @see #acquireLock()
     */
    protected abstract boolean acquireLock(Connection con)
            throws SQLException, LockException;

    /**
     * Releases the lock. Whether or not the release succeeds, the service is
     * marked as no longer holding the lock and table-info caching is
     * disabled on the database.
     *
     * @throws LockException if releasing fails, wrapping any {@code SQLException}.
     */
    @Override
    public void releaseLock() throws LockException {
        try {
            Connection session = getConnection();
            releaseLock(session);
            LogService.getLog(getClass())
                    .info("Successfully released change log lock");
        } catch (SQLException e) {
            throw new LockException(e);
        } finally {
            // Reset local state even when the database call failed.
            hasChangeLogLock = false;
            database.setCanCacheLiquibaseTableInfo(false);
        }
    }

    /**
     * Releases the lock previously obtained by {@code acquireLock()}.
     *
     * @param con the connection identifying and used by the database session.
     * @throws SQLException if a database access error occurs;
     * @throws LockException if other logical error happens, preventing the
     *         operation from completing normally.
     * @see #releaseLock()
     * @see #acquireLock(Connection)
     */
    protected abstract void releaseLock(Connection con)
            throws SQLException, LockException;

    /**
     * Lists the lock reported by {@link #usedLock(Connection)}.
     *
     * @return a one-element array holding that lock, or an empty array if
     *         none is reported.
     * @throws LockException if the lookup fails, wrapping any {@code SQLException}.
     */
    @Override
    public DatabaseChangeLogLock[] listLocks() throws LockException {
        DatabaseChangeLogLock current;
        try {
            current = usedLock(getConnection());
        } catch (SQLException e) {
            throw new LockException(e);
        }

        if (current == null) {
            return new DatabaseChangeLogLock[0];
        }
        return new DatabaseChangeLogLock[] { current };
    }

    /**
     * Default implementation reporting no lock information ({@code null}).
     *
     * @param con the connection identifying and used by the database session.
     * @return Information about the database changelog lock, or {@code null}.
     * @throws SQLException if a database access error occurs;
     * @throws LockException if other logical error happens, preventing the
     *         operation from completing normally.
     * @see #listLocks()
     */
    protected DatabaseChangeLogLock usedLock(Connection con)
            throws SQLException, LockException
    {
        return null;
    }
}
/*
* This module, both source code and documentation,
* is in the Public Domain, and comes with NO WARRANTY.
*/
package net.example.liquibase.lockservice.ext;
import static liquibase.servicelocator.PrioritizedService.PRIORITY_DEFAULT;
import static org.hamcrest.CoreMatchers.sameInstance;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import liquibase.database.Database;
import liquibase.database.core.MySQLDatabase;
import liquibase.database.jvm.JdbcConnection;
import liquibase.exception.LockException;
import liquibase.lockservice.DatabaseChangeLogLock;
import liquibase.lockservice.StandardLockService;
public class SessionLockServiceTest {

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /** The service under test, seen through its abstract base type. */
    private SessionLockService lockService;

    /** Same instance as {@link #lockService}, typed for tweaking canned results. */
    private MockService stubService;

    @Before
    public void setUp() {
        Database database = mock(Database.class);
        when(database.getConnection())
                .thenReturn(new JdbcConnection(mock(Connection.class)));

        stubService = new MockService();
        stubService.setDatabase(database);
        lockService = stubService;
    }

    @Test
    public void priorityAboveDefault() {
        assertThat("Service priority",
                lockService.getPriority(), greaterThan(PRIORITY_DEFAULT));
        assertThat("Service priority", lockService.getPriority(),
                greaterThan(new StandardLockService().getPriority()));
    }

    @Test
    public void defaultSupports() throws Exception {
        assertEquals("Supports database", false, lockService.supports(new MySQLDatabase()));
    }

    @Test
    public void acquireReleaseState() throws Exception {
        assertEquals("acquireLock", true, lockService.acquireLock());
        assertEquals("hasChangeLogLock", true, lockService.hasChangeLogLock());

        lockService.releaseLock();
        assertEquals("hasChangeLogLock", false, lockService.hasChangeLogLock());
    }

    @Test
    public void acquireBeforeRelease() throws Exception {
        // A second acquire while holding the lock must succeed as well.
        assertEquals("acquireLock", true, lockService.acquireLock());
        assertEquals("acquireLock", true, lockService.acquireLock());
        assertEquals("hasChangeLogLock", true, lockService.hasChangeLogLock());
    }

    @Test
    public void acquireFalse() throws Exception {
        stubService.acquireResult = false;

        assertEquals("acquireLock", false, lockService.acquireLock());
        assertEquals("hasChangeLogLock", false, lockService.hasChangeLogLock());
    }

    @Test
    public void listLocks() throws Exception {
        stubService.usedLockResult = new DatabaseChangeLogLock(2, new Date(1), "foo");

        DatabaseChangeLogLock[] reported = lockService.listLocks();

        assertEquals("Lock list length", 1, reported.length);
        assertNotNull("Null lock element", reported[0]);
        assertEquals("lockedBy", "foo", reported[0].getLockedBy());
    }

    @Test
    public void listNoLocks() throws Exception {
        DatabaseChangeLogLock[] reported = lockService.listLocks();
        assertEquals("Lock list length", 0, reported.length);
    }

    @Test
    public void acquireShouldPropagate() throws Exception {
        SQLException failure = new SQLException("bar");
        SessionLockService failing = spy(lockService);
        doThrow(failure).when(failing).acquireLock(any(Connection.class));

        thrown.expectCause(sameInstance(failure));
        failing.acquireLock();
    }

    @Test
    public void releaseShouldPropagate() throws Exception {
        SQLException failure = new SQLException("baz");
        SessionLockService failing = spy(lockService);
        doThrow(failure).when(failing).releaseLock(any(Connection.class));

        thrown.expectCause(sameInstance(failure));
        failing.releaseLock();
    }

    @Test
    public void listLocksShouldPropagate() throws Exception {
        SQLException failure = new SQLException("qux");
        SessionLockService failing = spy(lockService);
        doThrow(failure).when(failing).usedLock(any(Connection.class));

        thrown.expectCause(sameInstance(failure));
        failing.listLocks();
    }

    /**
     * Minimal concrete service returning canned results without touching
     * the connection.
     */
    static class MockService extends SessionLockService {

        boolean acquireResult = true;
        DatabaseChangeLogLock usedLockResult;

        @Override
        protected boolean acquireLock(Connection con) throws SQLException, LockException {
            return acquireResult;
        }

        @Override
        protected void releaseLock(Connection con) throws SQLException, LockException {
            // no-op
        }

        @Override
        protected DatabaseChangeLogLock usedLock(Connection con) throws SQLException, LockException {
            return usedLockResult;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment