-
-
Save ryanlecompte/aae42d31922f07bc5912 to your computer and use it in GitHub Desktop.
barrier
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module RedisFailover
  # Barrier is a ZK group-based barrier primitive that can be used to pause execution
  # until members present in an initial group have migrated to a new group.
  #
  # Participants register with {#on_notify}, which joins them to the "start" group.
  # A coordinator calls {#notify}, which snapshots the start group's member names
  # into a notify znode. Each participant then joins the "end" group ({#enter}) and
  # waits until the end group is at least as large as the snapshot. {#leave} waits
  # for the end group to drain, and {#done} releases the barrier's resources.
  #
  # `logger`, `encode`, `decode` and `diff` are not defined in this class; they are
  # presumably provided by the Util mixin — confirm against Util.
  class Barrier
    include Util

    # Stores the barrier settings and makes sure the parent znodes exist.
    #
    # @param zk the ZK client used for every ZooKeeper operation
    # @param [String] root the root znode under which barrier paths and groups live
    # @param [String] name the barrier name (used in the notify path and group names)
    # @param [Numeric] timeout how long, in seconds, {#enter} and {#leave} may wait
    def initialize(zk, root, name, timeout)
      @zk = zk
      @root = root
      @name = name
      @timeout = timeout
      @lock = Mutex.new
      create_initial_paths
    end

    # Registers the callback to run after this participant has been notified and
    # has entered the barrier, and joins the start group. Only the first call has
    # any effect; later calls return immediately.
    #
    # NOTE(review): if the notify znode already exists when the watch is set here,
    # it looks like no event would be delivered and the callback would never run —
    # confirm whether callers guarantee on_notify happens before notify.
    #
    # @yieldparam [Barrier] barrier this barrier instance
    def on_notify(&callback)
      return if @notify_callback

      @notify_callback = callback
      join_start_group
      @zk.watcher.register(notify_path) { |_event| handle_notify }
      @zk.stat(notify_path, :watch => true)
    end

    # Snapshots the current start-group member names and publishes them in an
    # ephemeral notify znode so that watching participants enter the barrier.
    def notify
      logger.info('Notifying clients to enter barrier')
      @start_members = start_group.member_names
      @zk.create(notify_path, encode(@start_members), :ephemeral => true)
    end

    # Joins the end group and waits (up to the timeout) until the end group has at
    # least as many members as the start-member snapshot. Does nothing if this
    # instance already holds an end-group membership.
    #
    # Relies on @start_members having been set by {#notify} or by the notify watch.
    #
    # @return [Boolean, nil] true if the condition was met, false on timeout,
    #   nil if the barrier had already been entered
    def enter
      return unless @end_group_membership.nil?

      logger.info('Entering barrier')
      join_end_group
      deadline = Time.now + @timeout
      wait_until(deadline) do
        @lock.synchronize do
          end_group.member_names.size >= @start_members.size
        end
      end
    end

    # Leaves the end group and waits (up to the timeout) until the end group has
    # no members left. Does nothing if the barrier was not entered.
    #
    # @return [Boolean, nil] true if the end group drained, false on timeout,
    #   nil if there was no end-group membership to leave
    def leave
      return unless @end_group_membership

      logger.info('Leaving barrier')
      leave_end_group
      deadline = Time.now + @timeout
      wait_until(deadline) do
        @lock.synchronize do
          end_group.member_names.size <= 0
        end
      end
    end

    # Closes both groups and removes the notify znode.
    #
    # Only the caller of {#notify} creates the notify znode, and it is ephemeral,
    # so it may legitimately be absent here; a missing node is therefore ignored
    # rather than raised.
    def done
      start_group.close
      end_group.close
      @zk.delete(notify_path, :ignore => :no_node)
    end

    private

    # Joins the start group and remembers the membership handle.
    def join_start_group
      @start_group_membership = start_group.join
    end

    # Leaves the start group and forgets the membership handle.
    def leave_start_group
      @start_group_membership.leave
      @start_group_membership = nil
    end

    # Joins the end group and subscribes to its membership changes so the
    # start-member snapshot can be adjusted while {#enter} is waiting.
    def join_end_group
      @end_group_membership = end_group.join
      end_group.on_membership_change do |_old_members, current_members|
        # Guarded by the same lock the wait conditions take, so the snapshot is
        # never read while it is being replaced.
        @lock.synchronize do
          # NOTE(review): the exact semantics of diff come from outside this
          # class; whatever it returns is removed from the snapshot.
          @start_members -= diff(@start_members, current_members)
        end
      end
    end

    # Leaves the end group and forgets the membership handle.
    def leave_end_group
      @end_group_membership.leave
      @end_group_membership = nil
    end

    # @return [String] path of the znode used to signal participants
    def notify_path
      File.join(@root, 'barrier', @name, 'notify')
    end

    # @return [String] name of the group participants join before notification
    def start_group_name
      "#{@name}_start_group"
    end

    # @return [String] name of the group participants join when entering
    def end_group_name
      "#{@name}_end_group"
    end

    # @return [ZK::Group] the lazily created start group
    def start_group
      @start_group ||= open_group(start_group_name)
    end

    # @return [ZK::Group] the lazily created end group
    def end_group
      @end_group ||= open_group(end_group_name)
    end

    # Builds and creates a ZK group under the barrier root. The group is returned
    # only after `create` has succeeded, so callers never memoize a group whose
    # creation raised.
    #
    # @param [String] group_name the name of the group
    # @return [ZK::Group] the created group
    def open_group(group_name)
      group = ZK::Group.new(@zk, group_name, :root => @root)
      group.create
      group
    end

    # Watch handler for the notify znode: loads the start-member snapshot, enters
    # the barrier, then invokes the registered callback with this barrier.
    # Ignored once a snapshot is already present.
    def handle_notify
      return unless @start_members.nil?

      @start_members = decode(@zk.get(notify_path).first)
      logger.info("Received notification to enter barrier with start_members: #{@start_members}")
      enter
      @notify_callback.call(self)
    end

    # Waits until the deadline or the condition evaluates to true.
    #
    # The condition is always evaluated at least once, and once more after the
    # final sleep, so a condition that already holds (or becomes true right at
    # the deadline) is not misreported as a timeout.
    #
    # @param [Time] deadline the deadline time
    # @param [Proc] condition the condition at which to stop sleeping
    # @return [Boolean] true if the condition was met, false if the deadline
    #   passed with the condition still unmet
    def wait_until(deadline, &condition)
      loop do
        return true if condition.call

        remaining = deadline - Time.now
        # timeout exceeded
        return false if remaining <= 0

        # Poll at most once per second, but never sleep past the deadline.
        sleep([remaining, 1].min)
      end
    end

    # Ensures the root, "barrier" and per-barrier znodes exist; nodes that are
    # already present are left untouched.
    def create_initial_paths
      @zk.create(@root, :ignore => :node_exists)
      @zk.create(File.join(@root, 'barrier'), :ignore => :node_exists)
      @zk.create(File.join(@root, 'barrier', @name), :ignore => :node_exists)
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment