Skip to content

Instantly share code, notes, and snippets.

@shalinmangar
Created April 21, 2011 18:40
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save shalinmangar/935196 to your computer and use it in GitHub Desktop.
Save shalinmangar/935196 to your computer and use it in GitHub Desktop.
RabbitMQ Producer Consumer Performance Test
package com.aol.platformx
import com.rabbitmq.client.{QueueingConsumer, ConnectionFactory}
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.Executors
/**
 * RabbitMQ producer/consumer throughput test.
 *
 * Declares the queue named by [[ProducerConsumerTest.NO_OP_QUEUE]] on a broker
 * at localhost, starts one throttled producer and four consumers (each on its
 * own connection), and prints the aggregate produced/consumed counts and
 * throughput every ten seconds.
 *
 * @author shalinsmangar
 * Date: 4/20/11
 * Time: 2:27 PM
 */
object ProducerConsumerTest {
  /** Name of the queue every producer publishes to and every consumer reads from. */
  val NO_OP_QUEUE = "noop"

  /**
   * Entry point: declares the queue, launches the worker threads and then
   * reports aggregate statistics forever.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Declare the queue once, up front, on a short-lived connection. The queue
    // is non-durable, non-exclusive and non-auto-delete, so it outlives this
    // connection. No consumer is registered here: an auto-ack consumer that
    // nobody drains would silently swallow a share of the messages and buffer
    // them in memory, skewing the consumed count.
    val factory = new ConnectionFactory
    factory.setHost("localhost")
    val conn = factory.newConnection()
    try {
      conn.createChannel().queueDeclare(NO_OP_QUEUE, false, false, false, null)
    } finally {
      // Closing the connection also closes the channel opened on it.
      conn.close()
    }

    val now = System.currentTimeMillis()
    val producerCounter = new AtomicLong
    val consumerCounter = new AtomicLong

    val producers = Executors.newFixedThreadPool(1)
    producers.execute(new Producer("1", producerCounter))
    // NOTE: a Producer never returns from run(), so the pool size above must be
    // raised before a second producer can actually start.
    //producers.execute(new Producer("2", producerCounter))

    val consumers = Executors.newFixedThreadPool(4)
    for (i <- 1 to 4) {
      consumers.execute(new Consumer("" + i, consumerCounter))
    }

    var loop = true
    while (loop) {
      Thread.sleep(10000)
      val produced = producerCounter.get
      val consumed = consumerCounter.get
      val timeTaken = (System.currentTimeMillis() - now) / 1000.0d
      println("\t\tProduced: " + produced + " throughput: " + produced * 1.0d / timeTaken)
      println("\t\tConsumed: " + consumed + " throughput: " + consumed * 1.0d / timeTaken)
      // Optional stop condition. The elapsed time is in milliseconds, so a
      // two-hour limit needs the extra factor of 1000.
      //if (System.currentTimeMillis - now >= 2*60*60*1000) loop = false
    }
  }

  /**
   * Publishes a fixed "NoOp" payload to [[NO_OP_QUEUE]] in an endless loop.
   *
   * @param id      label used in this producer's progress output
   * @param counter shared counter incremented once per published message
   */
  class Producer(id: String, counter: AtomicLong) extends Runnable {
    def run(): Unit = {
      val now = System.currentTimeMillis()
      // Long rather than Int so the per-thread count cannot wrap on a long run.
      var c = 0L
      val factory = new ConnectionFactory
      factory.setHost("localhost")
      val conn = factory.newConnection()
      val channel = conn.createChannel()
      val data = "NoOp".getBytes
      while (true) {
        // Empty exchange name = default exchange, which routes by queue name.
        channel.basicPublish("", NO_OP_QUEUE, null, data)
        counter.incrementAndGet()
        c += 1
        if (c % 100000 == 0) {
          val timeTaken = (System.currentTimeMillis() - now) / 1000.0d
          println("Producer: " + id + " Sent: " + c + " throughput: " + c * 1.0d / timeTaken)
        }
        // Throttle: pause one second after every 2000 messages.
        if (c % 2000 == 0) Thread.sleep(1000)
      }
    }
  }

  /**
   * Receives messages from [[NO_OP_QUEUE]] in an endless loop, using
   * automatic acknowledgement.
   *
   * @param id      label used in this consumer's progress output
   * @param counter shared counter incremented once per received message
   */
  class Consumer(id: String, counter: AtomicLong) extends Runnable {
    def run(): Unit = {
      val now = System.currentTimeMillis()
      // Long rather than Int so the per-thread count cannot wrap on a long run.
      var c = 0L
      val factory = new ConnectionFactory
      factory.setHost("localhost")
      val conn = factory.newConnection()
      val channel = conn.createChannel()
      val consumer = new QueueingConsumer(channel)
      // Subscribe exactly once. Doing this inside the loop would register a
      // new subscription on the channel for every message received.
      channel.basicConsume(NO_OP_QUEUE, true, consumer)
      while (true) {
        // Blocks until the next message arrives; the payload itself is ignored.
        consumer.nextDelivery()
        c += 1
        counter.incrementAndGet()
        if (c % 100000 == 0) {
          val timeTaken = (System.currentTimeMillis() - now) / 1000.0d
          println("Consumer: " + id + " Received: " + c + " throughput: " + c * 1.0d / timeTaken)
        }
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment