Skip to content

Instantly share code, notes, and snippets.

@chuckremes
Created February 23, 2011 00:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chuckremes/839709 to your computer and use it in GitHub Desktop.
Save chuckremes/839709 to your computer and use it in GitHub Desktop.
require 'rubygems'
require 'ffi-rzmq'
require 'zmqmachine'
# Stand-alone forwarder device: started inside its own reactor, it is handed
# the address publishers connect to (port 5555) and the address subscribers
# connect to (port 5556).
reactor = ZM::Reactor.new(:A).run do |device_reactor|
  frontend = ZM::Address.new('127.0.0.1', 5555, :tcp)
  backend  = ZM::Address.new('127.0.0.1', 5556, :tcp)

  ZM::Device::Forwarder.new(device_reactor, frontend, backend, {:hwm => 1, :verbose => false})
end

# Block the main thread forever.
# NOTE(review): presumably the reactor runs on its own thread — confirm
# against ZM::Reactor#run.
sleep
require 'rubygems'
require 'ffi-rzmq'
require 'zmqmachine'
# Shows how to publish from a PUB socket onto a shared "bus" via a forwarder
# device while many SUB sockets repeatedly connect, get closed, and are
# replaced by fresh ones.
#
# NOTE(review): this script only connects (publisher to port 5555,
# subscribers to port 5556); presumably a forwarder device must already be
# listening on those ports — confirm before running.
#
# Make an unhandled exception in any thread abort the whole process instead
# of only terminating that thread.
Thread.abort_on_exception = true
# Handler for a PUB socket that publishes a small fixed message on a timer.
#
# An instance is passed to +reactor.pub_socket+.
# NOTE(review): presumably the reactor then calls #on_attach with the new
# socket — confirm against the zmqmachine handler contract.
class PublisherHandler
  # reactor - reactor used to schedule the publishing timer
  # port    - TCP port on 127.0.0.1 that the PUB socket connects to
  def initialize(reactor, port)
    @reactor = reactor
    @port = port
    @data = "a" # a single char; length of message is unrelated to the issue
  end

  # Configures the socket, connects it and starts the periodic publisher.
  #
  # socket - the reactor-managed PUB socket
  def on_attach(socket)
    # Options go first: in ZeroMQ 2.x a high-water mark only applies to
    # connections made *after* it is set, so setting it once the socket is
    # already connected leaves that connection unbounded.
    raw = socket.raw_socket
    raw.setsockopt ZMQ::HWM, 1
    raw.setsockopt ZMQ::LINGER, 0

    socket.connect ZM::Address.new('127.0.0.1', @port, :tcp)

    # Sending is driven by the timer below, not by writability callbacks.
    @reactor.deregister_writable socket

    # set a timer to publish a message every 25ms
    @reactor.periodical_timer(25) do
      socket.send_messages [ZMQ::Message.new(@data)]
    end
  end

  # Writability events are ignored; see the timer set up in #on_attach.
  def on_writable(s)
    nil
  end
end
# Handler for a SUB socket that subscribes to everything and discards
# whatever it receives.
#
# An instance is passed to +reactor.sub_socket+.
# NOTE(review): presumably the reactor then calls #on_attach with the new
# socket and #on_readable with received messages — confirm against the
# zmqmachine handler contract.
class SubscriberHandler
  # reactor - reactor that owns the socket (stored, not otherwise used here)
  # port    - TCP port on 127.0.0.1 that the SUB socket connects to
  def initialize(reactor, port)
    @reactor = reactor
    @port = port
  end

  # Configures the socket, connects it and subscribes to all messages.
  #
  # socket - the reactor-managed SUB socket
  def on_attach(socket)
    raw = socket.raw_socket
    # Set the high-water mark before connecting: in ZeroMQ 2.x it only
    # applies to connections made after the option is set.
    raw.setsockopt ZMQ::HWM, 1
    # this shouldn't be necessary for a SUB socket, but it doesn't hurt
    raw.setsockopt ZMQ::LINGER, 0

    socket.connect ZM::Address.new('127.0.0.1', @port, :tcp)
    socket.subscribe '' # subscribes to *everything*
  end

  # Closes every received message without inspecting it.
  #
  # socket   - the socket the messages arrived on (unused)
  # messages - the received messages; each must respond to #close
  #
  # Returns the +messages+ collection.
  def on_readable(socket, messages)
    # just throw them away
    messages.each { |message| message.close }
  end
end
# Every reactor started by this script is collected here.
reactors = []

# One reactor hosts the publisher, which connects to port 5555.
publisher = ZM::Reactor.new(:Publisher).run do |pub_reactor|
  pub_reactor.pub_socket PublisherHandler.new(pub_reactor, 5555)
end
reactors << publisher

# create N reactors to do the subscribing; each gets its own thread
32.times do
  subscriber = ZM::Reactor.new(:a_subscriber).run do |sub_reactor|
    # Opens a SUB socket on port 5556 and arranges its own replacement.
    cycle = lambda do
      sub_handler = SubscriberHandler.new(sub_reactor, 5556)
      sock = sub_reactor.sub_socket(sub_handler)

      # every 100ms, close the socket and open a new one
      sub_reactor.oneshot_timer(100) do
        #puts "closing sock, opening new"
        sub_reactor.close_socket(sock)
        sub_handler = nil # drop this closure's reference to the old handler
        cycle.call
      end
    end

    cycle.call
  end
  reactors << subscriber
end

sleep # blocks forever
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment