Created
April 21, 2011 18:40
-
-
Save shalinmangar/935196 to your computer and use it in GitHub Desktop.
RabbitMQ Producer Consumer Performance Test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.aol.platformx | |
import com.rabbitmq.client.{QueueingConsumer, ConnectionFactory} | |
import java.util.concurrent.atomic.AtomicLong | |
import java.util.concurrent.Executors | |
/**
 * RabbitMQ producer/consumer throughput test.
 *
 * Declares the "noop" queue on a broker at localhost, starts the producer and
 * consumer worker threads (each on its own connection), and prints the aggregate
 * produced/consumed message counts and throughput every ten seconds. The test
 * runs until the process is killed.
 *
 * @author shalinsmangar
 * Date: 4/20/11
 * Time: 2:27 PM
 */
object ProducerConsumerTest {
  /** Name of the queue every producer publishes to and every consumer reads from. */
  val NO_OP_QUEUE = "noop"

  /** Number of producer threads; the producer pool is sized to match so all of them run. */
  private val PRODUCER_COUNT = 1

  /** Number of consumer threads; the consumer pool is sized to match so all of them run. */
  private val CONSUMER_COUNT = 4

  /** Opens a new connection to the broker on localhost. */
  private def connect() = {
    val factory = new ConnectionFactory
    factory.setHost("localhost")
    factory.newConnection
  }

  /**
   * Makes sure the test queue exists before any worker touches it.
   *
   * The queue is declared non-durable, non-exclusive and non-auto-delete, so it
   * outlives the short-lived connection used to declare it.
   */
  private def declareQueue(): Unit = {
    val conn = connect()
    try {
      conn.createChannel.queueDeclare(NO_OP_QUEUE, false, false, false, null)
    } finally {
      if (conn.isOpen) conn.close()
    }
  }

  /**
   * Entry point: starts the workers and reports aggregate throughput forever.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // No consumer is registered here: an auto-ack consumer that nobody reads from
    // would silently swallow a share of the messages and buffer them in memory.
    declareQueue()

    val now = System.currentTimeMillis
    val producerCounter = new AtomicLong
    val consumerCounter = new AtomicLong

    val producers = Executors.newFixedThreadPool(PRODUCER_COUNT)
    for (i <- 1 to PRODUCER_COUNT) {
      producers.execute(new Producer("" + i, producerCounter))
    }

    val consumers = Executors.newFixedThreadPool(CONSUMER_COUNT)
    for (i <- 1 to CONSUMER_COUNT) {
      consumers.execute(new Consumer("" + i, consumerCounter))
    }

    // Report until the process is killed.
    while (true) {
      Thread.sleep(10000)
      val produced = producerCounter.get
      val consumed = consumerCounter.get
      val timeTaken = (System.currentTimeMillis - now) / 1000.0d
      println("\t\tProduced: " + produced + " throughput: " + produced * 1.0d / timeTaken)
      println("\t\tConsumed: " + consumed + " throughput: " + consumed * 1.0d / timeTaken)
    }
  }

  /**
   * Publishes a fixed "NoOp" payload to [[NO_OP_QUEUE]] in an endless loop.
   *
   * @param id      label used in this producer's progress output
   * @param counter shared counter incremented once per published message
   */
  class Producer(id: String, counter: AtomicLong) extends Runnable {
    def run(): Unit = {
      val now = System.currentTimeMillis
      var c = 0L // Long: the loop is unbounded, an Int would eventually wrap
      val conn = connect()
      try {
        val channel = conn.createChannel
        val data = "NoOp".getBytes
        while (true) {
          // Publishing to the default exchange routes by queue name.
          channel.basicPublish("", NO_OP_QUEUE, null, data)
          counter.incrementAndGet
          c += 1
          if (c % 100000 == 0) {
            val timeTaken = (System.currentTimeMillis - now) / 1000.0d
            println("Producer: " + id + " Sent: " + c + " throughput: " + c * 1.0d / timeTaken)
          }
          // Throttle: pause for one second after every 2000 messages.
          if (c % 2000 == 0) Thread.sleep(1000)
        }
      } finally {
        // Only reached when the loop fails; release the connection if it is still open.
        if (conn.isOpen) conn.close()
      }
    }
  }

  /**
   * Receives messages from [[NO_OP_QUEUE]] in an endless loop, with auto-ack.
   *
   * @param id      label used in this consumer's progress output
   * @param counter shared counter incremented once per received message
   */
  class Consumer(id: String, counter: AtomicLong) extends Runnable {
    def run(): Unit = {
      val now = System.currentTimeMillis
      var c = 0L // Long: the loop is unbounded, an Int would eventually wrap
      val conn = connect()
      try {
        val channel = conn.createChannel
        val consumer = new QueueingConsumer(channel)
        // Subscribe exactly once; subscribing per message would register a new
        // consumer on the broker for every delivery.
        channel.basicConsume(NO_OP_QUEUE, true, consumer)
        while (true) {
          consumer.nextDelivery() // blocks until a message arrives; payload is not needed
          c += 1
          counter.incrementAndGet
          if (c % 100000 == 0) {
            val timeTaken = (System.currentTimeMillis - now) / 1000.0d
            println("Consumer: " + id + " Received: " + c + " throughput: " + c * 1.0d / timeTaken)
          }
        }
      } finally {
        // Only reached when the loop fails; release the connection if it is still open.
        if (conn.isOpen) conn.close()
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment