Skip to content

Instantly share code, notes, and snippets.

@joshado
Created January 9, 2012 01:03
Show Gist options
  • Save joshado/1580365 to your computer and use it in GitHub Desktop.
Save joshado/1580365 to your computer and use it in GitHub Desktop.
AMQP latency test.
require "amqp"
# Broker connection settings. The '<...>' values are placeholders and must
# be filled in before the script is run.
AMQP_CONFIG = {
  host: '<...>',
  port: 5671,
  username: '<...>',
  password: '<...>',
  ssl: true
}

# @avg is the running SUM of in-flight times (milliseconds), despite its name;
# @ct is the number of replies that contributed to it.
@avg = 0
@ct = 0

# Latency probe: every 0.2s publish the local send time to the work queue,
# then, for each reply, subtract the time the remote side reports spending on
# the message from the round-trip time and print what is left (milliseconds).
EventMachine.run do
  connection = AMQP.connect(AMQP_CONFIG)
  channel = AMQP::Channel.new(connection)

  # Server-named, exclusive, auto-deleted queue used only to receive replies.
  channel.queue('', :auto_delete => true, :exclusive => true) do |queue, _declare_ok|
    queue.subscribe do |metadata, payload|
      local_receive = Time.now.to_f

      # Reply payload is three ':'-separated float timestamps.
      local_sent, remote_receive, remote_sent = payload.split(":").map(&:to_f)

      # work_time only uses remote timestamps and latency only uses local
      # ones, so the two clocks are never compared with each other directly.
      work_time = remote_sent - remote_receive
      latency = local_receive - local_sent
      time_in_flight = (latency - work_time) * 1000

      @avg += time_in_flight
      @ct += 1
      puts time_in_flight
    end

    EM.add_periodic_timer(0.2) do
      # The :immediate flag is deliberately not set: RabbitMQ 3.0 and later do
      # not implement it and close the connection when a publish carries it.
      channel.default_exchange.publish(Time.now.to_f,
                                       :routing_key => 'org.haggett.amqptest',
                                       :reply_to => queue.name)
    end
  end

  # On Ctrl-C print the mean in-flight time, then shut down cleanly.
  Signal.trap("INT") do
    if @ct.zero?
      # Avoid 0 / 0.0, which would print "AVG: NaN".
      puts "AVG: n/a (no replies received)"
    else
      puts "AVG: #{@avg / @ct.to_f}"
    end
    connection.close { EventMachine.stop }
  end
end
require "amqp"
# Broker connection settings. The '<...>' values are placeholders and must
# be filled in before the script is run.
AMQP_CONFIG = {
  host: '127.0.0.1',
  port: 5671,
  username: '<...>',
  password: '<...>',
  ssl: true
}

# Worker side of the latency test: consume messages from the
# "org.haggett.amqptest" queue, wait a random delay to simulate work, then
# reply to the sender's reply_to queue with
# "<original payload>:<start time>:<end time>".
EventMachine.run do
  connection = AMQP.connect(AMQP_CONFIG)
  channel = AMQP::Channel.new(connection)

  channel.queue("org.haggett.amqptest", :exclusive => true, :auto_delete => true) do |queue, _declare_ok|
    puts "Declared work queue."

    queue.subscribe(:ack => true) do |metadata, payload|
      # The payload is echoed back untouched so the sender can match it
      # against its own clock.
      received_time = payload
      start_time = Time.now.to_f

      # Random.rand(2.0) yields a float in [0, 2.0). Kernel#rand(2.0) would
      # truncate the float max to an integer and only ever return 0 or 1.
      EM.add_timer(Random.rand(2.0)) do
        end_time = Time.now.to_f
        reply = [received_time, start_time, end_time].join(":")
        channel.default_exchange.publish(reply, :routing_key => metadata.reply_to, :mandatory => true)
        # Acknowledge only after the reply has been handed to the channel.
        metadata.ack
      end
    end
  end

  # On Ctrl-C close the connection, then stop the event loop.
  Signal.trap("INT") { connection.close { EventMachine.stop } }
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment