Skip to content

Instantly share code, notes, and snippets.

@sysprv
Created August 10, 2011 18:53
Show Gist options
  • Save sysprv/1137785 to your computer and use it in GitHub Desktop.
Save sysprv/1137785 to your computer and use it in GitHub Desktop.
Simple example of VRRP-like master/slave behaviour
#! jruby
require 'java'
java_import 'java.net.InetSocketAddress'
java_import 'java.net.InetAddress'
java_import 'java.net.DatagramSocket'
java_import 'java.net.DatagramPacket'
java_import 'java.net.SocketTimeoutException'
java_import 'java.util.concurrent.SynchronousQueue'
# UDP port used for the heartbeat: the Transmitter sends to it and the
# Receiver binds to it.
PORT = 7001
# Where the transmitter (master) sends packets to.
# As written this is the local host's own address. It should be a
# multicast address if more than one slave must be supported.
SEND_ADDR = InetAddress.getLocalHost()
# Synchronization mechanism that lets the Receiver thread hand state
# changes ('master' / 'slave') to the standby Worker thread.
# The Receiver calls put() on it and the Worker calls take().
# javadoc: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/SynchronousQueue.html
Sync_Queue = SynchronousQueue.new
# Transmitter (master side), extending java.lang.Thread.
# Periodically sends a small UDP "ping" datagram to SEND_ADDR:PORT so that
# a listening Receiver can tell the master is still alive.
class Transmitter < java.lang.Thread
  # Names the thread 'Transmitter' and prepares the payload once, as a Java
  # byte array, so the same bytes are reused for every datagram.
  def initialize
    super('Transmitter')
    @send_data = 'ping'.to_java_bytes
  end

  # Send a UDP packet to the predefined ip:port once every 1.5 seconds,
  # 10,000 times in total. Each iteration sleeps first and then sends.
  # The socket is closed when the loop finishes or when an exception
  # escapes, so the underlying descriptor is not leaked.
  def run
    sock = DatagramSocket.new
    send_to = InetSocketAddress.new(SEND_ADDR, PORT)
    packet = DatagramPacket.new(@send_data, @send_data.length, send_to)
    10_000.times do
      java.lang.Thread.sleep(1500)
      # puts 'Transmitting packet'
      sock.send(packet)
    end
  ensure
    # sock is nil if DatagramSocket.new itself raised.
    sock.close if sock
  end
end
# Receiver (slave side), extending java.lang.Thread.
# Listens for UDP packets on PORT and, based on the recent history of
# received packets versus timeouts, puts 'master' or 'slave' on the notify
# queue to tell the consumer which role to assume.
class Receiver < java.lang.Thread
  # notify_queue - queue on which the 'master' / 'slave' notifications
  #                are put (a blocking put() is called on it).
  def initialize(notify_queue)
    super('Receiver')
    @notify_queue = notify_queue
  end

  # Receive UDP packets, with a read timeout of two seconds, for 10,000
  # receive attempts. The socket is closed when the loop finishes or when
  # an exception escapes, so the bound port is released.
  def run
    buf = Java::byte[256].new
    sock = DatagramSocket.new(PORT)
    sock.setSoTimeout(2000)
    packet = DatagramPacket.new(buf, buf.length)
    # Keep a history of the success/error state of receiving.
    # We treat this array as a fixed-length shift register.
    # 0 == timeout, 1 == packet received
    packet_history = [0, 0, 0, 1, 1]
    10_000.times do
      outcome =
        begin
          sock.receive(packet)
          # puts "Received packet from #{packet.getAddress().getHostAddress()}"
          1
        rescue java.net.SocketTimeoutException
          # No packet arrived within the read timeout.
          0
        end
      # Push the newest outcome and drop the oldest entry at the head.
      packet_history.push(outcome)
      packet_history.shift

      if packet_history[-1] == 0 && packet_history[-2] == 0
        # If the last two states (negative index == counting backwards)
        # were timeouts, assume the slave must take over.
        @notify_queue.put('master')
      elsif packet_history.all? { |state| state == 1 }
        # Every remembered attempt succeeded (independent of the history
        # length): assume the master is healthy. Tell the slave to stand by.
        @notify_queue.put('slave')
      end
    end
  ensure
    # sock is nil if binding the socket raised.
    sock.close if sock
  end
end
# Where the business logic is.
# A daemon thread that consumes state notifications from the queue and
# prints the state whenever it differs from the previously seen one.
class Worker < java.lang.Thread
  # notify_queue  - queue from which state notifications are taken.
  # initial_state - state assumed at start-up; printed immediately.
  def initialize(notify_queue, initial_state)
    super('Worker')
    setDaemon(true)
    @notify_queue = notify_queue
    @state = initial_state
    @previous_state = initial_state
    puts initial_state
  end

  # Take up to 10,000 notifications (blocking on each take) and print a
  # notification only when it represents a change of state.
  def run
    10_000.times do
      incoming = @notify_queue.take
      @state = incoming
      unless incoming == @previous_state
        puts incoming.to_s
      end
      @previous_state = incoming
    end
  end
end
# main
# Any command-line argument selects the master role (Transmitter);
# with no arguments the process runs as the slave (Receiver).
# The main thread waits on the heartbeat thread; the Worker is a daemon.
acting_as_master = !ARGV.empty?
heartbeat = acting_as_master ? Transmitter.new : Receiver.new(Sync_Queue)
heartbeat.start
worker = Worker.new(Sync_Queue, acting_as_master ? 'master' : 'slave')
worker.start
heartbeat.join
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment