@dougbarth
Created July 29, 2011 21:03
Rate Controlled AMQP Subscriptions
require 'eventmachine'
require 'amqp'
# TokenBucket and process_message are assumed to be defined elsewhere;
# neither is part of this gist.

EM.run do
  channel = AMQP::Channel.new
  # Allow up to 10 unacknowledged messages to be delivered to this worker.
  channel.prefetch(10)

  # Configure this worker to send at 1 msg/s on average, with occasional
  # bursts of up to 5 messages.
  token_bucket = TokenBucket.new(1, 5)

  # Refresh the token bucket every second. The bucket is also refreshed
  # when the take method is called.
  EM.add_periodic_timer(1) { token_bucket.refresh }

  # Subscribe with explicit acknowledgments so we can signal to RabbitMQ
  # when more work should be delivered. Without this setting, RabbitMQ would
  # send work to us as fast as possible.
  channel.queue('send_mt').subscribe(:ack => true) do |header, message|
    # Defer processing to a background thread: taking a token from the
    # bucket can block, and we don't want to block the reactor.
    EM.defer(
      lambda {
        # Take 1 token from the bucket. If the bucket is empty, this
        # call blocks until a token is available.
        token_bucket.take(1)
        process_message(message)
        # Acknowledge this message, allowing RabbitMQ to send more work.
        # The ack is scheduled back onto the reactor thread, which owns
        # the AMQP connection.
        EM.next_tick { header.ack }
      }
    )
  end
end
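
The snippet above relies on a `TokenBucket` class that the gist does not include. The following is a minimal sketch of what such a class might look like, not the author's implementation. It assumes the constructor arguments are (tokens per second, maximum burst size), that `refresh` tops the bucket up based on elapsed time, and that `take` blocks the calling thread until enough tokens are available. The `clock` parameter and the `tokens` reader are hypothetical additions that make the class easier to test.

```ruby
require 'thread'

# Hypothetical TokenBucket: an illustration only, not the class used by the gist.
class TokenBucket
  # rate:     tokens added per second (assumed meaning of the first argument)
  # capacity: maximum number of tokens, i.e. the burst size (assumed)
  # clock:    callable returning the current time in seconds (hypothetical)
  def initialize(rate, capacity, clock = lambda { Time.now.to_f })
    @rate         = rate.to_f
    @capacity     = capacity.to_f
    @clock        = clock
    @tokens       = @capacity
    @last_refresh = @clock.call
    @mutex        = Mutex.new
    @refilled     = ConditionVariable.new
  end

  # Add the tokens accrued since the last refresh and wake any waiting takers.
  def refresh
    @mutex.synchronize do
      refill
      @refilled.broadcast
    end
  end

  # Remove `count` tokens, blocking the calling thread until they are available.
  def take(count = 1)
    raise ArgumentError, 'count exceeds bucket capacity' if count > @capacity

    @mutex.synchronize do
      loop do
        refill
        break if @tokens >= count
        # Sleep until enough tokens should have accrued, or until refresh
        # signals that the bucket has been topped up.
        @refilled.wait(@mutex, (count - @tokens) / @rate)
      end
      @tokens -= count
    end
    nil
  end

  # Current token count after accounting for elapsed time.
  def tokens
    @mutex.synchronize do
      refill
      @tokens
    end
  end

  private

  # Must be called with @mutex held.
  def refill
    now = @clock.call
    @tokens = [@capacity, @tokens + (now - @last_refresh) * @rate].min
    @last_refresh = now
  end
end
```

Because `take` sleeps on a condition variable, it should only be called from a worker thread (as `EM.defer` does above) and never from the reactor thread itself.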