Created
June 17, 2011 14:38
-
-
Save technoweenie/1031540 to your computer and use it in GitHub Desktop.
ZeroMQ pub/sub demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'zmq' | |
# Demo publisher: binds a PUB socket on tcp://*:5555 and emits one numbered
# message per second. Each message starts with "ping", followed by the
# sender name "pinger" and a running counter.
context = ZMQ::Context.new
pub = context.socket(ZMQ::PUB)
pub.setsockopt(ZMQ::IDENTITY, 'ping-pinger')
pub.bind('tcp://*:5555')

i = 0
loop do
  # Bump the counter first so the first message sent is number 1.
  i += 1
  pub.send("ping pinger #{i}")
  sleep 1
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# usage: ruby pub.rb CHAN USERNAME | |
# | |
# ruby pub.rb rubyonrails technoweenie | |
# | |
# | |
# binds a PUB socket to tcp://*:5555 | |
require 'rubygems' | |
require 'zmq' | |
# Publishes every line read from STDIN as "CHAN USER MESSAGE" on a PUB
# socket bound to tcp://*:5555.
chan, user = ARGV

# Both arguments are baked into the socket identity and into the prefix of
# every message, so refuse to start without them rather than publishing
# under a blank channel/user.
abort 'usage: ruby pub.rb CHAN USERNAME' unless chan && user

context = ZMQ::Context.new
pub = context.socket ZMQ::PUB
pub.setsockopt ZMQ::IDENTITY, "#{chan}-#{user}"
pub.bind 'tcp://*:5555'

# Read from STDIN explicitly: a bare `gets` would treat the ARGV entries
# (CHAN, USERNAME) as file names to read from.
while (msg = STDIN.gets)
  msg.strip!
  pub.send "#{chan} #{user} #{msg}"
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# usage: ruby sub.rb | |
# | |
# Connects a SUB socket to tcp://127.0.0.1:5555.
# Subscribes to rubyonrails, ruby-lang, and ping.
require 'rubygems' | |
require 'zmq' | |
# Wraps a ZMQ SUB socket and prints the messages it receives.
#
# Each message is treated as one string of the form "CHAN USER TEXT": it is
# split on the first two spaces and printed as "#CHAN [USER]: TEXT".
class Subscriber
  attr_reader :id, :socket

  # id - String identity set on the socket when it is created. A random
  #      number string is used when none is given.
  def initialize(id = nil)
    @id = id || rand.to_s
  end

  # Connects the socket to every given address.
  #
  # While no socket exists yet, the FIRST argument must be the ZMQ context
  # used to create it; the remaining arguments are addresses. Once a socket
  # exists, every argument is an address.
  #
  # Raises ArgumentError when a socket has to be created and the first
  # argument cannot create one (missing, or an address passed by mistake).
  def connect(*addrs)
    unless @socket
      context = addrs.shift
      unless context.respond_to?(:socket)
        raise ArgumentError,
              'the first argument must be a ZMQ context while no socket is open'
      end
      @socket = context.socket ZMQ::SUB
      @socket.setsockopt ZMQ::IDENTITY, @id
    end
    addrs.each { |addr| @socket.connect addr }
  end

  # Adds one subscription filter per given channel name.
  def subscribe_to(*channels)
    channels.each { |ch| @socket.setsockopt ZMQ::SUBSCRIBE, ch }
  end

  # Prints one message.
  #
  # line - the raw message; when omitted, one is received from the socket.
  #
  # Returns true after printing, or false when a signal (e.g. Ctrl-C)
  # interrupted the call.
  def process(line = nil)
    line ||= @socket.recv
    chan, user, msg = line.split ' ', 3
    puts "##{chan} [#{user}]: #{msg}"
    true
  rescue SignalException
    # If a message was already in hand when the signal arrived, print it
    # before reporting the interruption so it is not dropped.
    process(line) if line
    false
  end

  # Closes the socket and forgets it. Does nothing when no socket is open,
  # so it is safe to call before #connect or more than once.
  def close
    @socket.close if @socket
    @socket = nil
  end
end
# Driver: the optional first command-line argument becomes the subscriber id.
subscriber = Subscriber.new(ARGV[0])
subscriber.connect(ZMQ::Context.new, 'tcp://127.0.0.1:5555')
subscriber.subscribe_to('rubyonrails', 'ruby-lang', 'ping')

# Print messages until process reports an interruption by returning false.
loop { break unless subscriber.process }

subscriber.close
puts "Quitting..."
exit
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment