Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
High level AMQP blocking publish method
# Helper method that publishes a message to an exchange and waits for a
# broker acknowledgement using RabbitMQ's publisher confirmation extension.
#
# Confirm mode must already be enabled in the channel. You can turn
# confirm mode on using channel.confirm_select. You must only do this
# once.
#
# It also raises an error if you use the mandatory and/or immediate
# options, and these constrains cannot be meet by the broker.
#
# The method will wait using the em-synchrony gem, and thus expects to be
# executing within a Fiber.
#
# @note This method redefines the channel error, ack, and nack callbacks,
# as well as the exchange return callback. You must restore these
# callbacks after calling this method if you were using them previously.
#
# @param [AMQP::Exchange] exch The exchange to publish to.
# @param [Integer] timeout How long to wait for a response from the broker.
# @param [String] msg The message to publish to the exchange.
# @param [Hash] opts The AMQP::Exchange#publish options.
#
# @raise [RuntimeError] in case of timeout; or if the mandatory option was
# set, and the broker could not route the message to a queue; or if the
# immediate flag was set, and the broker could not route the message to a
# consumer; or if the broker replies with a negative acknowledgement.
#
# @api public
def publish(exch, timeout, msg, opts)
ch = exch.channel
df = @deferrable.new
df.timeout timeout, 'AMQP publisher confirm timeout.'
ch.on_ack { |ack| df.succeed if ch.publisher_index == ack.delivery_tag.to_i }
ch.on_nack { |nack| df.fail 'AMQP Nack' if ch.publisher_index == nack.delivery_tag.to_i }
ch.on_error { |ch2,close| df.fail "AMQP channel error #{close.inspect}" }
# RabbitMQ will guarantees sending a basic.return before sending a
# publisher confirmation. So we'll learn of the constrain failure
# before we get a broker acknowledgement.
exch.on_return do |breturn,metadata,rmsg|
# We clear the channel ack callback. Otherwise, since the status of a Deferrable
# may be set more than once and changed it may be possible for the broker's
# acknowledgement to come in after we execute and wake up the sync call, but
# before the call errback to raise the error, changing the failed deferrable
# into a successful one.
ch.clear_callbacks(:ack)
# reply code is set to 312 on a return when a mandatory constrain could
# not be fullfiled, and to 313 when an immediate constain couldnot be
# fullfilled.
msg = if breturn.reply_code == 312
'AMQP message could not be routed to queue.'
elsif breturn.reply_code == 313
'AMQP message could not be routed to consumer.'
else
"AMQP unexpected basic.return: code: #{breturn.reply_code} msg: #{breturn.reply_text}"
end
df.fail msg
end if opts[:mandatory] or opts[:immediate]
exch.publish( msg, opts )
df.wait
ch.clear_callbacks(:error)
ch.clear_callbacks(:ack)
ch.clear_callbacks(:nack)
exch.clear_callbacks(:return) if opts[:mandatory] or opts[:immediate]
# DefaultDeferrable has no attribute reader to find its status, but you
# can add callbacks after the status has been set in the deferrable, and
# they will trigger right away. We could set this callback when we create
# the deferrable, but Synchorny.sync will add its own, and if they are
# called in the wrong order, this exception may be raised in the wrong
# fiber, and we don't want to depend on callback calling order
# implementation of the deferrable, which may change,
df.errback { |err_msg| raise err_msg }
end
@eliaslevy

This comment has been minimized.

Copy link
Owner Author

eliaslevy commented Jul 3, 2012

That df.wait should be EM::Synchrony.sync df.

Elsewhere extended DefaultDeferrable with the wait method which uses EM::Synchrony.sync when executing inside the EM reactor and uses a Mutex and ConditionalVariable when executing outside the EM.reactor thread.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.