Skip to content

Instantly share code, notes, and snippets.

@bernd
Created May 9, 2011 13:16
Show Gist options
  • Save bernd/962496 to your computer and use it in GitHub Desktop.
Save bernd/962496 to your computer and use it in GitHub Desktop.
Playing with a gossip protocol based on EventMachine
require 'rubygems'
require 'eventmachine'
# Playing around with a gossip protocol idea.
class NodeList
attr_reader :owner, :count
def initialize(owner)
@owner = owner
@nodes = {owner.nid => @owner}
@timer = {}
@removed = {}
@count = 0
end
def count!
@count += 1
end
def <<(node)
return if is_owner?(node)
return if removed?(node)
return if unchanged?(node)
@nodes[node.nid] = node
if EM.reactor_running?
@timer[node.nid].cancel if @timer[node.nid]
@timer[node.nid] = EM::Timer.new(10) do
@removed[node.nid] = @nodes.delete(node.nid)
end
end
end
def is_owner?(node)
@owner.nid == node.nid
end
def removed?(node)
n = @removed[node.nid] and n.heartbeat == node.heartbeat
end
def unchanged?(node)
n = @nodes[node.nid] and n.heartbeat == node.heartbeat
end
def all
@nodes.values.map do |node|
node.to_signature
end
end
def random_node
(@nodes.values - [owner]).shuffle.first
end
def gossip_payload
all.join('|')
end
def fail!(node)
@nodes.delete(node.nid)
end
def inspect
"<NodeList:#{count}:[#{all.join(', ')}]>"
end
end
class Member
attr_reader :addr, :port
attr_accessor :timer, :heartbeat
def initialize(string)
@addr, @port, @heartbeat = string.split(':')
@heartbeat ||= 0
end
def beat!
@heartbeat += 1
end
def nid
[addr, port].join(':')
end
def to_signature
[addr, port, heartbeat].join(':')
end
end
module Node
module Gossip
module Receiver
def initialize(node_list)
@node_list = node_list
end
def receive_data(data)
@node_list.count!
data.split('|').each do |node|
@node_list << Member.new(node)
end
end
end
module Sender
def initialize(node_list)
@node_list = node_list
end
def post_init
# Increase our own heartbeat before sending the status.
@node_list.owner.beat!
if node = @node_list.random_node
send_datagram @node_list.gossip_payload, node.addr, node.port
end
rescue => e
STDERR.puts e.message
raise e
ensure
close_connection
end
end
end
end
port = ARGV.shift
seed = ARGV.shift
node_list = NodeList.new(Member.new("127.0.0.1:#{port}"))
EM.run do
node_list << Member.new(seed) if seed
EM.open_datagram_socket(node_list.owner.addr, node_list.owner.port, Node::Gossip::Receiver, node_list)
EM.add_periodic_timer(1) do
EM.open_datagram_socket('127.0.0.1', 0, Node::Gossip::Sender, node_list)
puts "[#{$$}] #{node_list.inspect}"
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment