Skip to content

Instantly share code, notes, and snippets.

@colinsurprenant
Created April 13, 2011 21:33
Show Gist options
  • Save colinsurprenant/918471 to your computer and use it in GitHub Desktop.
Save colinsurprenant/918471 to your computer and use it in GitHub Desktop.
partial multipart message with 0MQ, JRuby and ffi-rzmq
# this is to demonstrate the partial multipart message reception problem.
# in my environment, items will be dropped as expected and also
# partial messages will be received which should not happen per 0MQ
# atomicity semantic.
#
# usage: ruby multipart.rb {pub|sub}
#
# tested using JRuby 1.6.0, 0MQ 2.1.3, ffi-rzmq 0.7.2 on OSX 10.6.7
#
# Colin Surprenant colin.surprenant@gmail.com, http://github.com/colinsurprenant
require 'rubygems'
require 'ffi-rzmq'
module ZmqTest
class Multipart
QUEUE_SIZE = 100
BODY_SIZE = 1000
def self.publisher
puts("publisher starting")
zmq_context = ZMQ::Context.new(1)
zmq_pub_socket = zmq_context.socket(ZMQ::PUB)
zmq_pub_socket.setsockopt(ZMQ::HWM, QUEUE_SIZE)
zmq_pub_socket.bind('tcp://127.0.0.1:2211')
body = (1..BODY_SIZE).map{"0"}.join
seq = 0
loop do
zmq_assert(zmq_pub_socket.send_string('topic', ZMQ::SNDMORE))
zmq_assert(zmq_pub_socket.send_string(seq.to_s, ZMQ::SNDMORE))
zmq_assert(zmq_pub_socket.send_string(body.dup, ZMQ::NOBLOCK))
seq += 1
end
rescue
zmq_pub_socket.close if zmq_pub_socket
zmq_context.terminate if zmq_context
raise
end
def self.consumer
puts("consumer starting")
zmq_context = ZMQ::Context.new(1)
zmq_sub_socket = zmq_context.socket(ZMQ::SUB)
zmq_sub_socket.setsockopt(ZMQ::SUBSCRIBE, '')
zmq_sub_socket.setsockopt(ZMQ::HWM, QUEUE_SIZE)
zmq_sub_socket.connect('tcp://127.0.0.1:2211')
last_seq = nil
loop do
topic = zmq_sub_socket.recv_string
seq = zmq_sub_socket.more_parts? ? zmq_sub_socket.recv_string.to_i : nil
body = zmq_sub_socket.more_parts? ? zmq_sub_socket.recv_string : nil
if seq.nil? || body.nil?
puts("partial message received, expected seq=#{last_seq + 1}")
else
log_gap(seq, last_seq) if last_seq
last_seq = seq
end
end
rescue
zmq_sub_socket.close if zmq_sub_socket
zmq_context.terminate if zmq_context
raise
end
private
def self.log_gap(seq, last_seq)
if (gap = seq - (last_seq + 1)) > 0
puts("#{gap} items dropped, expected seq=#{last_seq + 1}")
elsif gap < 0
puts("publisher restarted")
end
end
def self.zmq_assert(code)
raise("zeromq invalid code=#{code.inspect}") unless code
end
end
case ARGV.first
when "pub"
Multipart.publisher
when "sub"
Multipart.consumer
else
puts("usage: multipart {pub|sub}")
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment