Created
April 13, 2011 21:33
-
-
Save colinsurprenant/918471 to your computer and use it in GitHub Desktop.
partial multipart message with 0MQ, JRuby and ffi-rzmq
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This script demonstrates the partial multipart message reception problem.
# In my environment, items are dropped as expected, but partial messages
# are also received, which should not happen per 0MQ's atomicity
# semantics.
#
# usage: ruby multipart.rb {pub|sub}
#
# tested using JRuby 1.6.0, 0MQ 2.1.3, ffi-rzmq 0.7.2 on OSX 10.6.7
#
# Colin Surprenant colin.surprenant@gmail.com, http://github.com/colinsurprenant
require 'rubygems' | |
require 'ffi-rzmq' | |
module ZmqTest | |
# Reproduction harness for partial multipart message reception over a
# 0MQ PUB/SUB socket pair.
#
# The publisher endlessly sends three-part messages (topic, sequence number,
# body). The consumer reads them back and prints a line whenever sequence
# numbers are skipped, go backwards, or a message arrives with fewer than
# three parts.
class Multipart
  # Value passed to the ZMQ::HWM socket option on both sockets.
  QUEUE_SIZE = 100
  # Number of "0" characters in the body part of each published message.
  BODY_SIZE = 1000
  # Address the publisher binds to and the consumer connects to.
  ENDPOINT = 'tcp://127.0.0.1:2211'

  # Binds a PUB socket and publishes three-part messages forever.
  #
  # Never returns normally: the loop only ends when an exception is raised
  # (for example a failed send, or an interrupt). The socket and context are
  # released on the way out whatever the exception class is.
  #
  # @raise [RuntimeError] when a send call returns a falsy result
  def self.publisher
    puts("publisher starting")
    zmq_context = ZMQ::Context.new(1)
    zmq_pub_socket = zmq_context.socket(ZMQ::PUB)
    zmq_pub_socket.setsockopt(ZMQ::HWM, QUEUE_SIZE)
    zmq_pub_socket.bind(ENDPOINT)

    body = "0" * BODY_SIZE
    seq = 0
    loop do
      # The first two parts carry SNDMORE; the final part is sent with
      # NOBLOCK only, exactly as in the original reproduction.
      zmq_assert(zmq_pub_socket.send_string('topic', ZMQ::SNDMORE))
      zmq_assert(zmq_pub_socket.send_string(seq.to_s, ZMQ::SNDMORE))
      zmq_assert(zmq_pub_socket.send_string(body.dup, ZMQ::NOBLOCK))
      seq += 1
    end
  ensure
    # `ensure` rather than `rescue`: a bare rescue only sees StandardError,
    # so Ctrl-C (Interrupt) used to skip this cleanup entirely.
    zmq_pub_socket.close if zmq_pub_socket
    zmq_context.terminate if zmq_context
  end

  # Connects a SUB socket, subscribes to everything and checks each received
  # message for completeness and sequence continuity.
  #
  # Never returns normally: the loop only ends when an exception is raised.
  # The socket and context are released on the way out whatever the
  # exception class is.
  def self.consumer
    puts("consumer starting")
    zmq_context = ZMQ::Context.new(1)
    zmq_sub_socket = zmq_context.socket(ZMQ::SUB)
    zmq_sub_socket.setsockopt(ZMQ::SUBSCRIBE, '')
    zmq_sub_socket.setsockopt(ZMQ::HWM, QUEUE_SIZE)
    zmq_sub_socket.connect(ENDPOINT)

    last_seq = nil
    loop do
      zmq_sub_socket.recv_string # topic part; its content is not inspected
      seq = zmq_sub_socket.more_parts? ? zmq_sub_socket.recv_string.to_i : nil
      body = zmq_sub_socket.more_parts? ? zmq_sub_socket.recv_string : nil

      if seq.nil? || body.nil?
        log_partial(last_seq)
      else
        # Nothing to compare against until one complete message was seen.
        log_gap(seq, last_seq) if last_seq
        last_seq = seq
      end
    end
  ensure
    zmq_sub_socket.close if zmq_sub_socket
    zmq_context.terminate if zmq_context
  end

  # Reports a message that arrived with fewer than three parts.
  #
  # @param last_seq [Integer, nil] sequence number of the last complete
  #   message, or nil when none has been received yet
  def self.log_partial(last_seq)
    # Guard against nil: a partial message can be the very first one seen.
    expected = last_seq ? last_seq + 1 : 'unknown'
    puts("partial message received, expected seq=#{expected}")
  end

  # Reports a discontinuity between two consecutive complete messages.
  # Prints nothing when +seq+ directly follows +last_seq+.
  #
  # @param seq [Integer] sequence number just received
  # @param last_seq [Integer] sequence number of the previous complete message
  def self.log_gap(seq, last_seq)
    expected = last_seq + 1
    gap = seq - expected
    if gap > 0
      puts("#{gap} items dropped, expected seq=#{expected}")
    elsif gap < 0
      # A sequence number going backwards is taken to mean a fresh publisher.
      puts("publisher restarted")
    end
  end

  # Raises unless +code+ is truthy.
  #
  # @param code [Object] result returned by a socket call
  # @raise [RuntimeError] when +code+ is nil or false
  def self.zmq_assert(code)
    raise("zeromq invalid code=#{code.inspect}") unless code
  end

  # A bare `private` does not apply to `def self.` methods, so the helpers
  # have to be hidden explicitly.
  private_class_method :log_partial, :log_gap, :zmq_assert
end
# Command-line entry point: the first argument selects which side to run;
# anything else prints the usage line.
mode = ARGV.first
if mode == "pub"
  Multipart.publisher
elsif mode == "sub"
  Multipart.consumer
else
  puts("usage: multipart {pub|sub}")
end
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment