Created
March 12, 2019 15:59
-
-
Save igkuz/3e43e5a80362c5fbbdabce27354c2821 to your computer and use it in GitHub Desktop.
Ruby consumer for RabbitMQ queue with rate-limiting based on Redis
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# encoding: utf-8
# frozen_string_literal: true

# Consumer for the RabbitMQ queue `work.queue` that rate-limits processing
# with Redis: for each message it tries to claim a per-second "slot" key in
# Redis. If the claim succeeds the message is acked; otherwise it is rejected
# without requeue. The queue is declared with `x-dead-letter-exchange` set to
# `retry.exchange`, so rejected messages are dead-lettered there
# (NOTE(review): how `retry.exchange` re-delivers them is not visible here).

require 'bunny'
require 'logger'
require 'redis'

$stdout.sync = true
logger = Logger.new($stdout)

conn = Bunny.new(host: 'rmq', user: 'guest', pass: 'guest')
conn.start

begin
  ch = conn.create_channel
  q = ch.queue('work.queue', arguments: { 'x-dead-letter-exchange' => 'retry.exchange' })

  # A local client is used instead of the `Redis.current` global, which is
  # deprecated in redis-rb 4.6 and removed in 5.0.
  redis = Redis.new(host: 'redis')

  # `block: true` is only for presentation purposes
  # it blocks the main thread, do not use it in production
  q.subscribe(manual_ack: true, block: true) do |delivery_info, _properties, payload|
    logger.info "Message received with payload: #{payload}"

    # The slot is named after the current second-of-minute (0..59) and lives
    # for one second, so it has expired long before that second comes round
    # again a minute later.
    key = "slot:#{Time.now.utc.sec}"

    # SET ... NX EX 1 claims the slot and sets its TTL in one atomic command.
    # A separate GET followed by SET/EXPIRE would let two consumers both see
    # an empty slot and both proceed. With `nx: true` the call returns
    # true when the key was written and false when it already existed.
    acquired = redis.set(key, 1, nx: true, ex: 1)

    if acquired
      logger.info "Slot acquired, finished writing to Redis with result: #{acquired}"
      ch.ack(delivery_info.delivery_tag)
      logger.info 'Message acked'
    else
      # reject without requeue
      ch.reject(delivery_info.delivery_tag)
      logger.info 'Message rejected'
    end
  end
rescue Interrupt
  # Ctrl-C while blocked in `subscribe`: fall through to `ensure` and exit cleanly.
  logger.info 'Interrupted, shutting down'
ensure
  # Always release the AMQP connection, even if the subscription loop raised.
  conn.close
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment