Skip to content

Instantly share code, notes, and snippets.

@penguincoder
Last active August 29, 2015 14:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save penguincoder/976515f3281157c75a2a to your computer and use it in GitHub Desktop.
Save penguincoder/976515f3281157c75a2a to your computer and use it in GitHub Desktop.
A Redis-backed mutex that uses Lua transactions
require 'digest/md5'
require 'redis'
class RedisMutex
MutexTimeout = Class.new(StandardError)
LUA_ACQUIRE = "return redis.call('SET', KEYS[1], ARGV[2], 'NX', 'EX', ARGV[1]) and redis.call('expire', KEYS[1], ARGV[1]) and 1 or 0"
LUA_RELEASE = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"
##
# Makes a new mutex. Takes a variety of options:
#
# * verbose Default: false Prints debugging statements
# * timeout Default: 300 Time, in seconds, to wait until lock is considered stale
# * fail_on_timeout Default: false Raise an exception if mutex lock is not acquired in the requested time
# * redis default: Redis.new Redis connection to use
#
def initialize(obj, opts = {})
@verbose = opts[:verbose]
@timeout = opts[:timeout] || 300
@fail_on_timeout = opts[:fail_on_timeout]
@wait_tick = @timeout.to_f / 100.0
@redis = opts[:redis] || Redis.new
@lock_name = "lock:#{obj.class.name}:#{obj.id}"
@token = Digest::MD5.new.hexdigest("#{@lock_name}_#{Time.zone.now.to_f}")
end
def acquire_mutex
puts("Running transaction to acquire the lock #{@lock_name}") if @verbose
@redis.eval(LUA_ACQUIRE, [ @lock_name ], [ @timeout, @token ]) == 1
end
def destroy_mutex
puts("Destroying the lock #{@lock_name}") if @verbose
@redis.del(@lock_name)
end
def release_mutex
puts("Releasing the lock #{@lock_name} if it still holds the value '#{@token}'") if @verbose
@redis.eval(LUA_RELEASE, [ @lock_name ], [ @token ])
end
##
# Runs a block of code inside a Redis-backed mutex. Here is the basic
# algorithm:
#
# * Set value in Redis if it does not exist
# * If it exists, wait up to 100 times until :timeout has been met
# * Once waiting for lock has finished, either:
# ** Pessimistically throw an exception if not met
# ** Overwrite existing token with new value and assume ownership
# * Run block of code
# * Release lock if it contains the same value that it was set to
#
# This has an interesting side-effect. If you code runs longer than the
# timeout, other code will take over your lock and start execution. It can
# be configured to fail in such conditions, if desired. It will not
# overwrite another owner's lock and cause undesired race conditions if
# left to run too long.
#
def lock(&block)
begin
wait_for_mutex
yield block
ensure
release_mutex
end
end
def overwrite_mutex
puts("Replacing the lock #{@lock_name} with #{@token}") if @verbose
@redis.set(@lock_name, @token)
end
def recurse_until_ready(depth = 1)
return false if depth == 100
wait_a_tick if depth > 1
acquire_mutex || recurse_until_ready(depth + 1)
end
def wait_a_tick
puts("Sleeping #{@wait_tick} for the lock #{@lock_name} to become available") if @verbose
sleep(@wait_tick)
end
def wait_for_mutex
if recurse_until_ready
puts("Acquired lock #{@lock_name}") if @verbose
else
puts("Failed to acquire the lock") if @verbose
raise MutexTimeout.new("Failed to acquire the lock") if @fail_on_timeout
overwrite_mutex
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment