Created
June 8, 2012 12:35
-
-
Save TvL2386/2895395 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'thread' | |
require 'ffi-rzmq' | |
# Runs an echo broker: binds a ROUTER socket on tcp://*:55555 and, whenever
# the poller reports activity, reads the message parts from the socket and
# sends the same parts back out on it.
#
# Creates its own ZMQ context. Never returns normally; it loops until the
# calling thread is killed or an exception propagates out.
def broker
  ctx = ZMQ::Context.new
  socket = ctx.socket(ZMQ::ROUTER)
  socket.setsockopt ZMQ::LINGER, 0
  socket.bind 'tcp://*:55555'

  poller = ZMQ::Poller.new
  poller.register socket, ZMQ::POLLIN

  loop do
    items = poller.poll :blocking
    next unless items > 0

    # Must be the local `socket`: no @socket instance variable is ever
    # assigned in this script, so using one here raises NoMethodError on nil.
    message = []
    socket.recv_strings message
    socket.send_strings message
  end
end
# Polls a DEALER socket (built by get_socket) with a 1000 ms timeout and
# prints the poller's size before every poll. Whenever a poll reports no
# readable socket, the current socket is removed from the poller, closed,
# and replaced by a freshly connected one.
#
# Creates its own ZMQ context. Never returns normally.
def worker
  ctx = ZMQ::Context.new
  poller = ZMQ::Poller.new
  socket = get_socket ctx
  poller.register socket, ZMQ::POLLIN

  loop do
    puts "sockets in poller: #{poller.size}"
    items = poller.poll 1000
    next if items > 0

    # Drop the stale socket from the poll set before replacing it.
    # NOTE(review): ffi-rzmq documents deregister as removing the socket once
    # no events remain registered; if poller.size still grows, confirm against
    # the installed gem version (`poller.delete socket` is the explicit
    # alternative).
    poller.deregister socket, ZMQ::POLLIN
    # Close the old socket explicitly; otherwise every timeout leaves one
    # more open DEALER socket behind.
    socket.close

    socket = get_socket ctx
    poller.register socket, ZMQ::POLLIN
  end
end
# Builds a DEALER socket on the given context, sets LINGER to 0 and connects
# it to tcp://localhost:55555.
#
# @param ctx [ZMQ::Context] context used to create the socket
# @return the newly created socket (return codes of setsockopt/connect are
#   not inspected)
def get_socket(ctx)
  ctx.socket(ZMQ::DEALER).tap do |socket|
    socket.setsockopt ZMQ::LINGER, 0
    socket.connect 'tcp://localhost:55555'
  end
end
# Run the broker on a background thread and the worker loop on the main
# thread. Errors raised by the broker are printed with their backtrace.
threads = []
threads << Thread.start do
  begin
    broker
  rescue StandardError => e
    # StandardError rather than Exception, so signals and SystemExit are not
    # swallowed inside the thread.
    puts e
    puts e.backtrace
  end
end

worker
# Expected output:
# sockets in poller: 1
# sockets in poller: 1
# sockets in poller: 1
# sockets in poller: 1
# sockets in poller: 1
# sockets in poller: 1
# sockets in poller: 1
#
# Real output:
# sockets in poller: 1
# sockets in poller: 2
# sockets in poller: 3
# sockets in poller: 4
# sockets in poller: 5
# sockets in poller: 6
# sockets in poller: 7
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment