Skip to content

Instantly share code, notes, and snippets.

@ryanlecompte
Created August 23, 2012 03:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ryanlecompte/aae42d31922f07bc5912 to your computer and use it in GitHub Desktop.
Save ryanlecompte/aae42d31922f07bc5912 to your computer and use it in GitHub Desktop.
barrier
module RedisFailover
# Barrier is a ZK group-based barrier primitive that can be used to pause execution
# until members present in an initial group have migrated to a new group.
class Barrier
include Util
# Stores the collaborators and makes sure the barrier's base nodes exist.
#
# @param zk the client object used for every node operation in this class
# @param root the root path under which the barrier's nodes are created
# @param name the barrier name; used as a path segment and as the prefix
#   of both group names
# @param timeout added to Time.now to compute the wait deadline in
#   +enter+ and +leave+ (so it is a number of seconds)
def initialize(zk, root, name, timeout)
  @zk, @root, @name, @timeout = zk, root, name, timeout
  # Taken by the wait conditions in +enter+/+leave+ and by the end-group
  # membership callback, all of which touch @start_members or group state.
  @lock = Mutex.new
  create_initial_paths
end
# Subscribes this participant to the barrier notification.
#
# Stores the callback, joins the start group, registers a watcher on the
# notify path and then stats that path with a watch set. Only the first
# call does anything; once a callback is stored, later calls return nil.
#
# @param [Proc] callback invoked with this barrier by +handle_notify+
# @return the result of the +stat+ call, or nil when already subscribed
def on_notify(&callback)
  return if @notify_callback

  @notify_callback = callback
  join_start_group
  watcher = @zk.watcher
  # The event payload is not needed; the notify node is read on demand.
  watcher.register(notify_path) { |_event| handle_notify }
  @zk.stat(notify_path, :watch => true)
end
# Announces the barrier to the waiting participants.
#
# Records the current start-group member names in @start_members and
# creates the ephemeral notify node carrying the encoded list.
#
# @return the result of the +create+ call on the client
def notify
  logger.info('Notifying clients to enter barrier')
  members = start_group.member_names
  @start_members = members
  path = notify_path
  payload = encode(members)
  @zk.create(path, payload, :ephemeral => true)
end
# Joins the end group and waits for the other participants to arrive.
#
# Does nothing when an end-group membership is already held. Otherwise
# waits, for at most @timeout seconds, until the end group has at least as
# many members as @start_members.
#
# @return [Boolean, nil] the result of +wait_until+, or nil when this
#   participant has already entered
def enter
  return unless @end_group_membership.nil?

  logger.info('Entering barrier')
  join_end_group
  give_up_at = Time.now + @timeout
  wait_until(give_up_at) do
    @lock.synchronize do
      arrived = end_group.member_names.size
      arrived >= @start_members.size
    end
  end
end
# Leaves the end group and waits for it to drain.
#
# Does nothing when no end-group membership is held. Otherwise waits, for
# at most @timeout seconds, until the end group has no members left.
#
# @return [Boolean, nil] the result of +wait_until+, or nil when this
#   participant is not in the end group
def leave
  return unless @end_group_membership

  logger.info('Leaving barrier')
  leave_end_group
  give_up_at = Time.now + @timeout
  wait_until(give_up_at) do
    @lock.synchronize { end_group.member_names.empty? }
  end
end
# Tears the barrier down: closes both groups and removes the notify node.
#
# The notify node may already be gone, for instance when another
# participant finished first or no notification was ever issued, so a
# missing node is ignored instead of raising.
#
# @return the result of the +delete+ call on the client
def done
  start_group.close
  end_group.close
  @zk.delete(notify_path, :ignore => :no_node)
end
private
# Joins the start group and remembers the membership it returns.
#
# @return the membership object returned by the group's +join+
def join_start_group
  membership = start_group.join
  @start_group_membership = membership
end
# Gives up the start-group membership and forgets it.
#
# NOTE(review): nothing in the class as shown calls this method.
#
# @return [nil]
def leave_start_group
  membership = @start_group_membership
  membership.leave
  @start_group_membership = nil
end
# Joins the end group and starts tracking its membership changes.
#
# On every change, under @lock, whatever +diff+ returns for @start_members
# and the current end-group members is removed from @start_members.
# +diff+ is not defined in this class (presumably it comes from Util).
#
# @return the result of registering the membership-change callback
def join_end_group
  @end_group_membership = end_group.join
  end_group.on_membership_change do |_old_members, current_members|
    @lock.synchronize do
      dropped = diff(@start_members, current_members)
      @start_members = @start_members - dropped
    end
  end
end
# Gives up the end-group membership and forgets it, which lets +enter+
# run again and makes further +leave+ calls no-ops.
#
# @return [nil]
def leave_end_group
  membership = @end_group_membership
  membership.leave
  @end_group_membership = nil
end
# Path of the node whose creation tells participants to enter the barrier.
#
# @return [String] "<root>/barrier/<name>/notify"
def notify_path
  segments = [@root, 'barrier', @name, 'notify']
  File.join(*segments)
end
# Name of the group that participants join before the barrier is announced.
#
# @return [String] the barrier name followed by "_start_group"
def start_group_name
  format('%s_start_group', @name)
end
# Name of the group that participants join when they enter the barrier.
#
# @return [String] the barrier name followed by "_end_group"
def end_group_name
  format('%s_end_group', @name)
end
# Lazily builds the start group and memoizes it.
#
# On first use a ZK::Group is instantiated under @root with the start
# group name and +create+ is called on it; later calls return the same
# object without creating it again.
#
# @return [ZK::Group] the memoized start group
def start_group
  unless @start_group
    @start_group = ZK::Group.new(@zk, start_group_name, :root => @root)
    @start_group.create
  end
  @start_group
end
# Lazily builds the end group and memoizes it.
#
# On first use a ZK::Group is instantiated under @root with the end group
# name and +create+ is called on it; later calls return the same object
# without creating it again.
#
# @return [ZK::Group] the memoized end group
def end_group
  unless @end_group
    @end_group = ZK::Group.new(@zk, end_group_name, :root => @root)
    @end_group.create
  end
  @end_group
end
# Reacts to the notify watcher firing.
#
# Ignored once @start_members is set. Otherwise reads and decodes the
# member list stored in the notify node, enters the barrier, and then
# hands this barrier to the callback stored by +on_notify+.
#
# @return the callback's return value, or nil when already notified
def handle_notify
  return unless @start_members.nil?

  payload = @zk.get(notify_path).first
  @start_members = decode(payload)
  logger.info("Received notification to enter barrier with start_members: #{@start_members}")
  enter
  @notify_callback.call(self)
end
# Polls the condition until it holds or the deadline passes.
#
# The condition is evaluated at least once, even when the deadline has
# already passed, and once more after the final sleep, so a condition that
# becomes true during the last sleep is still reported as met.
#
# @param [Time] deadline the time after which polling stops
# @param [Proc] condition block returning a truthy value once waiting can stop
# @return [Boolean] true if the condition was observed true, false on timeout
def wait_until(deadline, &condition)
  loop do
    return true if condition.call

    remaining = deadline - Time.now
    return false if remaining <= 0

    # Re-check at least once per second, but never sleep past the deadline.
    sleep([remaining, 1].min)
  end
end
# Creates the root node, the "barrier" node under it, and this barrier's
# own node, in that order. Nodes that already exist are left alone.
#
# @return the result of the last +create+ call
def create_initial_paths
  @zk.create(@root, :ignore => :node_exists)
  barrier_dir = File.join(@root, 'barrier')
  @zk.create(barrier_dir, :ignore => :node_exists)
  @zk.create(File.join(barrier_dir, @name), :ignore => :node_exists)
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment