Skip to content

Instantly share code, notes, and snippets.

@igkuz
Created March 12, 2019 15:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save igkuz/3e43e5a80362c5fbbdabce27354c2821 to your computer and use it in GitHub Desktop.
Save igkuz/3e43e5a80362c5fbbdabce27354c2821 to your computer and use it in GitHub Desktop.
Ruby consumer for RabbitMQ queue with rate-limiting based on Redis
#!/usr/bin/env ruby
# encoding utf-8
require 'bunny'
require 'logger'
require 'redis'
logger = Logger.new(STDOUT)
STDOUT.sync = true
conn = Bunny.new(host: 'rmq', user: 'guest', pass: 'guest')
conn.start
ch = conn.create_channel
q = ch.queue('work.queue', arguments: { :'x-dead-letter-exchange' => 'retry.exchange' })
Redis.current = Redis.new(host: 'redis')
# `block: true` is only for presentation purposes
# it blocks the main thread, do not use it in production
q.subscribe(manual_ack: true, block: true) do |delivery_info, properties, payload|
logger.info "Message received with payload: #{payload}"
key = "slot:#{Time.now.utc.sec}"
slot = Redis.current.get(key)
logger.info "Slot value: #{slot}"
if !slot || slot.to_i < 1
logger.info "Slot acquired, writing to Redis"
result = Redis.current.multi do |multi|
multi.set(key, 1)
multi.expire(key, 1)
end
logger.info "Slot acquired, finished writing to Redis with result: #{result}"
ch.ack(delivery_info.delivery_tag)
logger.info "Message acked"
else
# reject without requeue
ch.reject(delivery_info.delivery_tag)
logger.info "Message rejected"
end
end
conn.close
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment