Created
February 23, 2011 00:07
-
-
Save chuckremes/839709 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems'
require 'ffi-rzmq'
require 'zmqmachine'

# Starts a reactor that hosts a ZM forwarder device between two local TCP
# endpoints: port 5555 (incoming) and port 5556 (outgoing).
#
# The outer local is named differently from the block parameter so the
# parameter does not shadow it.
forwarder_reactor = ZM::Reactor.new(:A).run do |reactor|
  incoming = ZM::Address.new('127.0.0.1', 5555, :tcp)
  outgoing = ZM::Address.new('127.0.0.1', 5556, :tcp)

  # NOTE(review): the local is kept on purpose; whether the reactor holds its
  # own reference to the device is not visible from this script.
  forwarder = ZM::Device::Forwarder.new(reactor, incoming, outgoing, {:hwm => 1, :verbose => false})
end

# sleep without an argument never returns, so the process stays up
sleep
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems'
require 'ffi-rzmq'
require 'zmqmachine'

# Shows how to publish from multiple PUB sockets to the same
# "bus" via a forwarder device.
#
# An exception raised in any thread aborts the whole process instead of
# only ending that thread.
Thread.abort_on_exception = true
# Handler for a PUB socket: connects to 127.0.0.1:<port> and publishes a
# one-character message on a periodic timer.
class PublisherHandler
  # @param reactor [Object] reactor used to schedule the publishing timer
  # @param port [Integer] TCP port on 127.0.0.1 to connect to
  def initialize(reactor, port)
    @reactor = reactor
    @port = port
    @data = "a" # a single char; length of message is unrelated to the issue
  end

  # Configures the socket, connects it, and starts the publishing timer.
  #
  # @param socket [Object] socket wrapper exposing +raw_socket+, +connect+
  #   and +send_messages+
  def on_attach(socket)
    # Socket options go first: 0mq applies HWM only to connections that are
    # established after the option has been set.
    raw = socket.raw_socket
    raw.setsockopt ZMQ::HWM, 1
    raw.setsockopt ZMQ::LINGER, 0

    socket.connect ZM::Address.new('127.0.0.1', @port, :tcp)

    # sending is driven by the timer below, not by writable notifications
    @reactor.deregister_writable socket

    # set a timer to publish a message every 25ms
    @reactor.periodical_timer(25) do
      socket.send_messages [ZMQ::Message.new(@data)]
    end
  end

  # Writable notifications are ignored.
  #
  # @return [nil]
  def on_writable(_socket)
    nil
  end
end
# Handler for a SUB socket: connects to 127.0.0.1:<port>, subscribes to
# every message, and discards whatever arrives.
class SubscriberHandler
  # @param reactor [Object] reactor this handler is associated with
  # @param port [Integer] TCP port on 127.0.0.1 to connect to
  def initialize(reactor, port)
    @reactor = reactor
    @port = port
  end

  # Configures the socket, connects it, and subscribes to all messages.
  #
  # @param socket [Object] socket wrapper exposing +raw_socket+, +connect+
  #   and +subscribe+
  def on_attach(socket)
    # Socket options go first: 0mq applies HWM only to connections that are
    # established after the option has been set.
    raw = socket.raw_socket
    # this shouldn't be necessary for a SUB socket, but it doesn't hurt
    raw.setsockopt ZMQ::LINGER, 0
    raw.setsockopt ZMQ::HWM, 1

    socket.connect ZM::Address.new('127.0.0.1', @port, :tcp)
    socket.subscribe '' # subscribes to *everything*
  end

  # Closes every received message without inspecting it.
  #
  # @param _socket [Object] socket the messages arrived on (unused)
  # @param messages [Array] received messages; each must respond to +close+
  def on_readable(_socket, messages)
    # just throw them away
    messages.each(&:close)
  end
end
reactors = []

# Run each handler in its own reactor; every reactor has its own thread.
reactors.push(
  ZM::Reactor.new(:Publisher).run do |reactor|
    reactor.pub_socket PublisherHandler.new(reactor, 5555)
  end
)

# create N reactors to do the subscribing; each gets its own thread
subscriber_count = 32
recycle_interval_ms = 100

subscriber_count.times do
  reactors.push(
    ZM::Reactor.new(:a_subscriber).run do |reactor|
      # Opens a SUB socket and arms a one-shot timer that closes it and then
      # runs this lambda again, so each reactor keeps replacing its socket.
      resubscribe = lambda do
        subscriber_sock = reactor.sub_socket SubscriberHandler.new(reactor, 5556)

        # every 100ms, close the socket and open a new one
        reactor.oneshot_timer(recycle_interval_ms) do
          #puts "closing sock, opening new"
          reactor.close_socket subscriber_sock
          resubscribe.call
        end
      end

      resubscribe.call
    end
  )
end

sleep # blocks forever
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment