Skip to content

Instantly share code, notes, and snippets.

Created July 3, 2012 19:43
Show Gist options
  • Save eliaslevy/3042381 to your computer and use it in GitHub Desktop.
Save eliaslevy/3042381 to your computer and use it in GitHub Desktop.
High level AMQP blocking publish method
# Helper method that publishes a message to an exchange and waits for a
# broker acknowledgement using RabbitMQ's publisher confirmation extension.
# Confirm mode must already be enabled in the channel. You can turn
# confirm mode on using channel.confirm_select. You must only do this
# once.
# It also raises an error if you use the mandatory and/or immediate
# options, and these constrains cannot be meet by the broker.
# The method will wait using the em-synchrony gem, and thus expects to be
# executing within a Fiber.
# @note This method redefines the channel error, ack, and nack callbacks,
# as well as the exchange return callback. You must restore these
# callbacks after calling this method if you were using them previously.
# @param [AMQP::Exchange] exch The exchange to publish to.
# @param [Integer] timeout How long to wait for a response from the broker.
# @param [String] msg The message to publish to the exchange.
# @param [Hash] opts The AMQP::Exchange#publish options.
# @raise [RuntimeError] in case of timeout; or if the mandatory option was
# set, and the broker could not route the message to a queue; or if the
# immediate flag was set, and the broker could not route the message to a
# consumer; or if the broker replies with a negative acknowledgement.
# @api public
def publish(exch, timeout, msg, opts)
ch =
df =
df.timeout timeout, 'AMQP publisher confirm timeout.'
ch.on_ack { |ack| df.succeed if ch.publisher_index == ack.delivery_tag.to_i }
ch.on_nack { |nack| 'AMQP Nack' if ch.publisher_index == nack.delivery_tag.to_i }
ch.on_error { |ch2,close| "AMQP channel error #{close.inspect}" }
# RabbitMQ will guarantees sending a basic.return before sending a
# publisher confirmation. So we'll learn of the constrain failure
# before we get a broker acknowledgement.
exch.on_return do |breturn,metadata,rmsg|
# We clear the channel ack callback. Otherwise, since the status of a Deferrable
# may be set more than once and changed it may be possible for the broker's
# acknowledgement to come in after we execute and wake up the sync call, but
# before the call errback to raise the error, changing the failed deferrable
# into a successful one.
# reply code is set to 312 on a return when a mandatory constrain could
# not be fullfiled, and to 313 when an immediate constain couldnot be
# fullfilled.
msg = if breturn.reply_code == 312
'AMQP message could not be routed to queue.'
elsif breturn.reply_code == 313
'AMQP message could not be routed to consumer.'
"AMQP unexpected basic.return: code: #{breturn.reply_code} msg: #{breturn.reply_text}"
end msg
end if opts[:mandatory] or opts[:immediate]
exch.publish( msg, opts )
exch.clear_callbacks(:return) if opts[:mandatory] or opts[:immediate]
# DefaultDeferrable has no attribute reader to find its status, but you
# can add callbacks after the status has been set in the deferrable, and
# they will trigger right away. We could set this callback when we create
# the deferrable, but Synchorny.sync will add its own, and if they are
# called in the wrong order, this exception may be raised in the wrong
# fiber, and we don't want to depend on callback calling order
# implementation of the deferrable, which may change,
df.errback { |err_msg| raise err_msg }
Copy link

That df.wait should be EM::Synchrony.sync df.

Elsewhere extended DefaultDeferrable with the wait method which uses EM::Synchrony.sync when executing inside the EM reactor and uses a Mutex and ConditionalVariable when executing outside the EM.reactor thread.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment