Skip to content

Instantly share code, notes, and snippets.

@technoweenie
Created June 17, 2011 14:38
Show Gist options
  • Star 13 You must be signed in to star a gist
  • Fork 10 You must be signed in to fork a gist
  • Save technoweenie/1031540 to your computer and use it in GitHub Desktop.
Save technoweenie/1031540 to your computer and use it in GitHub Desktop.
ZeroMQ pub/sub demo
require 'zmq'
context = ZMQ::Context.new
pub = context.socket ZMQ::PUB
pub.setsockopt ZMQ::IDENTITY, 'ping-pinger'
pub.bind 'tcp://*:5555'
i=0
loop do
pub.send "ping pinger #{i+=1}" ; sleep 1
end
# usage: ruby pub.rb CHAN USERNAME
#
# ruby pub.rb rubyonrails technoweenie
#
#
# binds a PUB socket to tcp://*:5555
require 'rubygems'
require 'zmq'
context = ZMQ::Context.new
chan = ARGV[0]
user = ARGV[1]
pub = context.socket ZMQ::PUB
pub.setsockopt ZMQ::IDENTITY, "#{chan}-#{user}"
pub.bind 'tcp://*:5555'
while msg = STDIN.gets
msg.strip!
pub.send "#{chan} #{user} #{msg}"
end
# usage: ruby sub.rb
#
# Connects a SUB socket to tcp://*:5555.
# Subscribes to rubyonrails and ruby-lang.
require 'rubygems'
require 'zmq'
class Subscriber
attr_reader :id, :socket
def initialize(id = nil)
@id = id || rand.to_s
end
def connect(*addrs)
if !@socket
context = addrs.shift
@socket = context.socket ZMQ::SUB
@socket.setsockopt ZMQ::IDENTITY, @id
end
addrs.each do |addr|
@socket.connect addr
end
end
def subscribe_to(*channels)
channels.each { |ch| @socket.setsockopt ZMQ::SUBSCRIBE, ch }
end
def process(line = nil)
line ||= @socket.recv
chan, user, msg = line.split ' ', 3
puts "##{chan} [#{user}]: #{msg}"
true
rescue SignalException
process(line) if line
false
end
def close
@socket.close
@socket = nil
end
end
subscriber = Subscriber.new ARGV[0]
subscriber.connect ZMQ::Context.new, 'tcp://127.0.0.1:5555'
subscriber.subscribe_to 'rubyonrails', 'ruby-lang', 'ping'
loop do
unless subscriber.process
subscriber.close
puts "Quitting..."
exit
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment