Last active
August 29, 2015 14:11
A Redis-backed mutex that uses Lua transactions
require 'digest/md5'
require 'redis'

class RedisMutex
  MutexTimeout = Class.new(StandardError)

  LUA_ACQUIRE = "return redis.call('SET', KEYS[1], ARGV[2], 'NX', 'EX', ARGV[1]) and redis.call('expire', KEYS[1], ARGV[1]) and 1 or 0"
  LUA_RELEASE = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"

  ##
  # Makes a new mutex. Takes a variety of options:
  #
  # * verbose          Default: false      Prints debugging statements
  # * timeout          Default: 300        Time, in seconds, to wait until the lock is considered stale
  # * fail_on_timeout  Default: false      Raise an exception if the lock is not acquired in the requested time
  # * redis            Default: Redis.new  Redis connection to use
  #
  def initialize(obj, opts = {})
    @verbose = opts[:verbose]
    @timeout = opts[:timeout] || 300
    @fail_on_timeout = opts[:fail_on_timeout]
    # Sleep interval between acquisition attempts: 1/100th of the timeout.
    @wait_tick = @timeout.to_f / 100.0
    @redis = opts[:redis] || Redis.new
    @lock_name = "lock:#{obj.class.name}:#{obj.id}"
    # Token identifying this owner, derived from the lock name and the
    # current time. Time.now is used so that ActiveSupport is not required.
    @token = Digest::MD5.hexdigest("#{@lock_name}_#{Time.now.to_f}")
  end

  def acquire_mutex
    puts("Running transaction to acquire the lock #{@lock_name}") if @verbose
    @redis.eval(LUA_ACQUIRE, [ @lock_name ], [ @timeout, @token ]) == 1
  end

  def destroy_mutex
    puts("Destroying the lock #{@lock_name}") if @verbose
    @redis.del(@lock_name)
  end

  def release_mutex
    puts("Releasing the lock #{@lock_name} if it still holds the value '#{@token}'") if @verbose
    @redis.eval(LUA_RELEASE, [ @lock_name ], [ @token ])
  end

  ##
  # Runs a block of code inside a Redis-backed mutex. Here is the basic
  # algorithm:
  #
  # * Set the value in Redis if it does not exist
  # * If it exists, retry up to 100 times, spread over :timeout seconds
  # * Once waiting for the lock has finished without success, either:
  # ** Pessimistically raise an exception (:fail_on_timeout), or
  # ** Overwrite the existing token with a new value and assume ownership
  # * Run the block of code
  # * Release the lock if it still contains the value that it was set to
  #
  # This has an interesting side effect. If your code runs longer than the
  # timeout, other code will take over your lock and start executing. The
  # mutex can be configured to raise instead of taking over in such
  # conditions, if desired. The release step only deletes the lock if it
  # still holds this mutex's token, so code left to run too long will not
  # remove another owner's lock and cause further race conditions.
  #
  def lock
    wait_for_mutex
    # Run the caller's block while the lock is held.
    yield
  ensure
    # Token-checked release: a no-op if the lock now belongs to someone else.
    release_mutex
  end

  def overwrite_mutex
    puts("Replacing the lock #{@lock_name} with #{@token}") if @verbose
    @redis.set(@lock_name, @token)
  end

  def recurse_until_ready(depth = 1)
    return false if depth == 100
    wait_a_tick if depth > 1
    acquire_mutex || recurse_until_ready(depth + 1)
  end

  def wait_a_tick
    puts("Sleeping #{@wait_tick} for the lock #{@lock_name} to become available") if @verbose
    sleep(@wait_tick)
  end

  def wait_for_mutex
    if recurse_until_ready
      puts("Acquired lock #{@lock_name}") if @verbose
    else
      puts("Failed to acquire the lock") if @verbose
      raise MutexTimeout.new("Failed to acquire the lock") if @fail_on_timeout
      overwrite_mutex
    end
  end
end
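
The acquire and release semantics described in the comments above can be illustrated without a Redis server. The following is only a minimal sketch: `FakeLockStore` is a hypothetical in-memory stand-in, not part of the gist or of redis-rb, and it takes the current time as an explicit `now` argument (also an assumption) so that expiry can be shown without sleeping. It mirrors what `LUA_ACQUIRE` (set only if absent, with an expiry) and `LUA_RELEASE` (delete only if the token matches) are meant to do.

```ruby
# Hypothetical in-memory stand-in for the two Lua scripts.
# For illustration only; it is not thread-safe and does not talk to Redis.
class FakeLockStore
  def initialize
    @data = {} # key => [token, expires_at]
  end

  # Mirrors LUA_ACQUIRE: set the token only if the key is absent or expired.
  def acquire(key, token, ttl, now)
    purge(key, now)
    return false if @data.key?(key)
    @data[key] = [token, now + ttl]
    true
  end

  # Mirrors LUA_RELEASE: delete the key only if the stored token matches.
  def release(key, token, now)
    purge(key, now)
    return false unless @data.key?(key) && @data[key][0] == token
    @data.delete(key)
    true
  end

  # Returns the token currently holding the lock, or nil.
  def owner(key, now)
    purge(key, now)
    @data.key?(key) ? @data[key][0] : nil
  end

  private

  # Drops the entry once its expiry time has passed.
  def purge(key, now)
    entry = @data[key]
    @data.delete(key) if entry && entry[1] <= now
  end
end

store  = FakeLockStore.new
first  = store.acquire('lock:Job:1', 'token-a', 300, 0)   # A takes the lock at t=0
second = store.acquire('lock:Job:1', 'token-b', 300, 10)  # B is refused at t=10
third  = store.acquire('lock:Job:1', 'token-b', 300, 301) # A's lock has expired; B takes over
stale  = store.release('lock:Job:1', 'token-a', 302)      # A's late release does nothing
owner  = store.owner('lock:Job:1', 302)

puts [first, second, third, stale, owner].inspect
# prints [true, false, true, false, "token-b"]
```

The last two values show the side effect discussed above: owner A ran past the timeout, B took the lock, and A's release left B's lock in place because the tokens differ.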