Created
July 29, 2011 21:03
Rate Controlled AMQP Subscriptions
require 'eventmachine'
require 'amqp'

# TokenBucket and process_message are assumed to be defined elsewhere.

EM.run do
  channel = AMQP::Channel.new

  # Allow 10 unacked messages to be delivered to this worker.
  channel.prefetch(10)

  # Configure this worker to send at 1 msg/s on average with occasional bursts
  # of up to 5 messages.
  token_bucket = TokenBucket.new(1, 5)

  # Refresh the token bucket every second. The bucket is also refreshed
  # when the take method is called.
  EM.add_periodic_timer(1) { token_bucket.refresh }

  # We subscribe with explicit acknowledgments so we can signal to RabbitMQ
  # that more work should be delivered. Without this setting, RabbitMQ would
  # send work over to us as fast as possible.
  channel.queue('send_mt').subscribe(:ack => true) do |header, message|
    # Defer the processing to a background thread since taking a token from
    # the bucket could potentially be a blocking operation and we don't want to
    # block the reactor.
    EM.defer(
      lambda {
        # Takes 1 token from the bucket. If the bucket is empty, this
        # method will block.
        token_bucket.take(1)

        process_message(message)

        # Acknowledge this message, allowing RabbitMQ to send more work.
        header.ack
      }
    )
  end
end
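
The snippet above relies on a TokenBucket class that is not shown. What follows is a minimal sketch of what such a class might look like, not the author's implementation. It assumes the two constructor arguments are a refill rate in tokens per second and a maximum bucket size, that `refresh` tops the bucket up based on elapsed time, and that `take` blocks the calling thread until enough tokens are available. The `clock:` keyword and the `tokens` reader are hypothetical additions for testing.

```ruby
# Hypothetical TokenBucket sketch; the interface is inferred from the
# calls in the snippet above (new(rate, capacity), refresh, take(n)).
class TokenBucket
  # rate:     tokens added per second (assumed meaning of the 1st argument)
  # capacity: maximum tokens the bucket can hold (assumed meaning of the 2nd)
  # clock:    hypothetical hook returning the current time in seconds
  def initialize(rate, capacity,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @rate = rate.to_f
    @capacity = capacity.to_f
    @tokens = @capacity          # start full so an initial burst is allowed
    @clock = clock
    @last_refresh = @clock.call
    @mutex = Mutex.new
    @cond = ConditionVariable.new
  end

  # Add the tokens earned since the last refresh and wake any blocked takers.
  def refresh
    @mutex.synchronize do
      refill
      @cond.broadcast
    end
  end

  # Remove `count` tokens, blocking the calling thread until they are available.
  def take(count = 1)
    raise ArgumentError, 'count exceeds bucket capacity' if count > @capacity

    @mutex.synchronize do
      refill
      until @tokens >= count
        # Sleep until refresh signals us or enough time has passed to refill.
        @cond.wait(@mutex, (count - @tokens) / @rate)
        refill
      end
      @tokens -= count
    end
    nil
  end

  # Current token count (hypothetical helper, handy for inspection).
  def tokens
    @mutex.synchronize do
      refill
      @tokens
    end
  end

  private

  # Must be called with @mutex held.
  def refill
    now = @clock.call
    @tokens = [@capacity, @tokens + (now - @last_refresh) * @rate].min
    @last_refresh = now
  end
end
```

With `TokenBucket.new(1, 5)`, a worker can process up to 5 messages back to back and then settles at roughly one per second. Because `take` also refills before checking the balance, the periodic timer in the snippet mainly serves to wake threads that are already waiting.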