Skip to content

Instantly share code, notes, and snippets.

@maxdemarzi
Created August 20, 2013 20:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maxdemarzi/6286700 to your computer and use it in GitHub Desktop.
Save maxdemarzi/6286700 to your computer and use it in GitHub Desktop.
Size + Time Rabbit MQ Accumulator
require 'bunny'
require 'timers'
TIME_TO_WAIT=3
MAX_BUFFER=3
# Setup Timers
timers = Timers.new
@three_second_timer = timers.every(3) { puts "***********triggered by timer*********" ; process_messages }
def process_messages
if !@messages.empty? || times_up?
@messages.each do |message|
puts "Added #{message} to transaction"
end
puts "Batched #{@messages.size}:"
@channel.acknowledge(@last, true)
reset_variables
end
end
def reset_variables
@cnt = 0
@messages = []
@last = nil
@last_time = Time.now
#Timers.reset
@three_second_timer.reset
#@three_second_timer.delay(TIME_TO_WAIT)
end
def times_up?
(@last_time + TIME_TO_WAIT) > Time.now
end
# Initialize
reset_variables
t1 = Thread.new do
loop do
loop { timers.wait }
end
end
t1.abort_on_exception = true
# Start and Configure RabbitMQ
connection = Bunny.new
connection.start
@channel = connection.create_channel
@channel.prefetch(MAX_BUFFER)
exchange = @channel.direct("moar")
queue = @channel.queue("", :durable => true)
queue.purge #just for this test
queue.bind(exchange).subscribe(:ack => true, :block => false) do |delivery_info, metadata, payload|
@last = delivery_info.delivery_tag
puts "Got the #{payload}"
@messages << payload
@cnt +=1
if @cnt >= MAX_BUFFER
puts "***********triggered by buffer*********"
process_messages
end
end
20.times do |t|
sleep 0.5 + rand
exchange.publish("Hello #{t}!")
end
sleep 5
connection.close
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment