Created
          August 10, 2011 18:53 
        
      - 
      
 - 
        
Save sysprv/1137785 to your computer and use it in GitHub Desktop.  
    Simple example of VRRP-like master/slave behaviour
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | #! jruby | |
| require 'java' | |
| java_import 'java.net.InetSocketAddress' | |
| java_import 'java.net.InetAddress' | |
| java_import 'java.net.DatagramSocket' | |
| java_import 'java.net.DatagramPacket' | |
| java_import 'java.net.SocketTimeoutException' | |
| java_import 'java.util.concurrent.SynchronousQueue' | |
| PORT = 7001 | |
| # Where the transmitter (master) sends packets to. | |
| # This should be a multicast address if more than one slave | |
| # must be supported. | |
| SEND_ADDR = InetAddress.getLocalHost() | |
| # Synchronization mechanism, to allow the Receiver thread | |
| # to send state information to the standby worker (i.e. slave) | |
| # javadoc: http://download.oracle.com/javase/6/docs/api/java/util/concurrent/SynchronousQueue.html | |
| Sync_Queue = SynchronousQueue.new | |
| # Transmitter, extending java.lang.Thread | |
| class Transmitter < java.lang.Thread | |
| def initialize | |
| super('Transmitter') | |
| @send_data = 'ping'.to_java_bytes | |
| end | |
| # Send a UDP packet to the predefined ip:port once every 1.5 seconds. | |
| def run | |
| sock = DatagramSocket.new | |
| send_to = InetSocketAddress.new(SEND_ADDR, PORT) | |
| packet = DatagramPacket.new(@send_data, @send_data.length, send_to) | |
| 10_000.times do | |
| java.lang.Thread.sleep(1500) | |
| # puts 'Transmitting packet' | |
| sock.send(packet) | |
| end | |
| end | |
| end | |
| # Receiver, extending java.lang.Thread | |
| class Receiver < java.lang.Thread | |
| def initialize(notify_queue) | |
| super('Receiver') | |
| @notify_queue = notify_queue | |
| end | |
| # Receive UDP packets, with a read timeout of two seconds. | |
| def run | |
| buf = Java::byte[256].new | |
| sock = DatagramSocket.new(PORT) | |
| sock.setSoTimeout(2000) | |
| packet = DatagramPacket.new(buf, buf.length) | |
| # Keep a history of the success/error state of receiving. | |
| # We treat this array as a shift register. | |
| # 0 == timeout, 1 == packet received | |
| packet_history = [ 0, 0, 0, 1, 1 ] | |
| 10_000.times do | |
| begin | |
| sock.receive(packet) | |
| # puts "Received packet from #{packet.getAddress().getHostAddress()}" | |
| # Push a success, and remove the old entry at the head | |
| packet_history.push(1); packet_history.shift() | |
| rescue java.net.SocketTimeoutException => e | |
| # Push a failure | |
| packet_history.push(0); packet_history.shift() | |
| # puts "#{e}" | |
| end | |
| if packet_history[-1] == 0 and packet_history[-2] == 0 then | |
| # If the last two states (negative index == counting backwards) | |
| # were timeouts, assume the slave must take over. | |
| @notify_queue.put('master') | |
| end | |
| if packet_history.reduce(:+) == 5 then | |
| # If all states are 1 (adding up all array items == 5) | |
| # assume the master is healthy. Tell the slave to stand by. | |
| @notify_queue.put('slave') | |
| end | |
| end | |
| end | |
| end | |
| # Where the business logic is. | |
| class Worker < java.lang.Thread | |
| def initialize(notify_queue, initial_state) | |
| super('Worker') | |
| setDaemon(true) | |
| @notify_queue = notify_queue | |
| @state = initial_state | |
| @p_state = @state | |
| puts @state | |
| end | |
| def run | |
| 10_000.times do | |
| @state = @notify_queue.take() | |
| puts "#{@state}" if @state != @p_state | |
| @p_state = @state | |
| end | |
| end | |
| end | |
| # main | |
| if ARGV.length > 0 then | |
| xmit = Transmitter.new | |
| xmit.start | |
| wrk = Worker.new(Sync_Queue, "master") | |
| wrk.start | |
| xmit.join | |
| else | |
| rx = Receiver.new(Sync_Queue) | |
| rx.start | |
| wrk = Worker.new(Sync_Queue, "slave") | |
| wrk.start | |
| rx.join | |
| end | 
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment