Skip to content

Instantly share code, notes, and snippets.

@wycleffsean
Created January 9, 2020 22:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wycleffsean/a459ceb924477b7c2c45694e9c677efe to your computer and use it in GitHub Desktop.
Save wycleffsean/a459ceb924477b7c2c45694e9c677efe to your computer and use it in GitHub Desktop.
ActiveRecord Advisory Lock
module AdvisoryLock
class LockUnavailableError < StandardError; end
def with_advisory_lock(name = nil, timeout, &block)
raise LocalJumpError, 'no block given (yield)' unless block_given?
name ||= lock_name
args = {
name: ActiveRecord::Base.sanitize(name),
timeout: ActiveRecord::Base.sanitize(timeout),
}
res = connection.select_value("SELECT get_lock(%<name>s, %<timeout>d);" % args)
lock_acquired = !(res.nil? || res.zero?)
unless lock_acquired
raise LockUnavailableError, "timed out waiting for lock: #{name}"
end
transaction &block
ensure
connection.select_value("SELECT release_lock(%<name>s);" % args) if lock_acquired
end
def lock_name(suffix = nil)
[self.class.name.parameterize, id, suffix].compact.join('/')
end
end
require 'rails_helper'
User.include AdvisoryLock
RSpec.describe AdvisoryLock do
subject { User.new.tap{|u| u.id = 1 } }
let(:connection) { ActiveRecord::Base.connection }
describe '#with_advisory_lock' do
it 'raises without block' do
expect { subject.with_advisory_lock(0) }.to raise_error(LocalJumpError)
end
it 'acquires lock for duration of block' do
subject.with_advisory_lock(0) do
res = connection
.select_value("SELECT is_free_lock('#{subject.lock_name}');")
expect(res).to eq 0
end
res = connection
.select_value("SELECT is_free_lock('#{subject.lock_name}');")
expect(res).to eq 1
end
it 'releases lock in the event of an error' do
subject.with_advisory_lock(0) { raise} rescue nil
res = connection
.select_value("SELECT is_free_lock('#{subject.lock_name}');")
expect(res).to eq 1
end
it 'raises unless lock is acquired within timeout' do
begin
args = {
name: ActiveRecord::Base.sanitize(subject.lock_name),
timeout: ActiveRecord::Base.sanitize(0),
}
new_conn = ActiveRecord::Base.connection_pool.checkout
new_conn.select_value("SELECT get_lock(%<name>s, %<timeout>d);" % args)
expect { subject.with_advisory_lock(0) { } }
.to raise_error(AdvisoryLock::LockUnavailableError)
ensure
new_conn.select_value("SELECT release_lock(%<name>s);" % args)
ActiveRecord::Base.connection_pool.checkin(new_conn)
end
end
end
describe '#lock_name' do
context 'with suffix' do
it { expect(subject.lock_name('ex')).to eq 'user/1/ex' }
end
context 'without prefix' do
it { expect(subject.lock_name).to eq 'user/1' }
end
end
end
ActiveRecord::Base.include(AdvisoryLock)
thing = Thing.find(1)
thing.with_advisory_lock('some_process') do
# ...
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment