Skip to content

Instantly share code, notes, and snippets.

@michaelklishin
Created February 7, 2014 10:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save michaelklishin/0082745ddf31fa951221 to your computer and use it in GitHub Desktop.
Save michaelklishin/0082745ddf31fa951221 to your computer and use it in GitHub Desktop.
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
 * Stand-alone test harness for a durable RabbitMQ exchange/queue pair on
 * {@code localhost}.
 *
 * <p>{@link #runTest()} declares a durable fanout exchange and a durable queue
 * bound to it, then runs one {@link Producer} followed by two {@link Consumer}s,
 * each on its own thread but strictly one after another, and finally logs how
 * many messages were sent and received.
 *
 * <p>All workers share the single {@link Channel} opened by {@link #runTest()}.
 *
 * @author ttulka
 */
public class RabbitMqDurable {

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqDurable.class);

    /** Exchange to publish to; overridden by the first command-line argument if present. */
    private static String exchangeName = "MyExTest4";

    protected Connection connection;
    protected ConnectionFactory cf;
    protected Channel channel;

    /** Totals reported by the workers; guarded by {@code this} via the synchronized accessors. */
    private int messagesSent;
    private int messagesReceived;

    /**
     * Wall-clock time (epoch millis) at which the test started. Written once in
     * {@link #runTest()} before any worker thread is started, read by the workers
     * to evaluate their deadlines.
     */
    private long startMillis;

    /**
     * Entry point.
     *
     * @param argv optional; {@code argv[0]} replaces the default exchange name
     * @throws Exception declared for compatibility; {@link #runTest()} itself logs
     *         and swallows failures
     */
    public static void main(String[] argv) throws Exception {
        if (argv.length >= 1) {
            exchangeName = argv[0];
        }
        new RabbitMqDurable().runTest();
        // NOTE(review): exits with status 0 even when runTest() logged an error.
        System.exit(0);
    }

    /**
     * Connects to the broker, declares the topology, runs the producer and the two
     * consumers sequentially, and logs the totals. Any failure is logged; the
     * channel and connection are always closed afterwards.
     */
    public void runTest() {
        try {
            cf = new ConnectionFactory();
            cf.setHost("localhost");
            connection = cf.newConnection();
            channel = connection.createChannel();

            boolean durable = true;
            channel.exchangeDeclare(exchangeName, "fanout", durable, false, null);
            Map<String, Object> params = new HashMap<String, Object>();
            String queueName = exchangeName + "Q";
            channel.queueDeclare(queueName, durable, false, false, params);
            channel.queueBind(queueName, exchangeName, "#");

            startMillis = System.currentTimeMillis();

            // The producer is run to completion before any consumer starts, so the
            // workers never actually overlap.
            runToCompletion(new Producer(channel));

            // NOTE(review): consumer deadlines are measured from startMillis, and the
            // producer alone runs for ~1000 ms, so with the current constants
            // (kill after 500 ms) both consumers hit their kill deadline on the
            // first loop iteration. Confirm whether that is intended.
            runToCompletion(new Consumer(channel, queueName));

            // run the second consumer to eat the rest
            runToCompletion(new Consumer(channel, queueName));

            // print results
            LOGGER.info("A total of {} messages were sent.", getMessagesSent());
            LOGGER.info("A total of {} messages were consumed.", getMessagesReceived());
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while running", e);
        } catch (Exception e) {
            LOGGER.error("Error while running", e);
        } finally {
            closeConnection();
        }
    }

    /**
     * Runs {@code task} on a new thread and blocks until that thread terminates.
     *
     * @param task the worker to execute
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private void runToCompletion(Runnable task) throws InterruptedException {
        Thread worker = new Thread(task);
        worker.start();
        worker.join();
    }

    /**
     * Closes the channel and then the connection if they are open. Failures are
     * logged and otherwise ignored so that a broken channel cannot prevent the
     * connection from being closed.
     */
    private void closeConnection() {
        LOGGER.info("Close connection");
        try {
            if (channel != null && channel.isOpen()) {
                channel.close();
            }
        } catch (Exception e) {
            LOGGER.warn("Failed to close channel", e);
        }
        try {
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        } catch (Exception e) {
            LOGGER.warn("Failed to close connection", e);
        }
    }

    /**
     * Sleeps for {@code ms} milliseconds. If the thread is interrupted the sleep
     * ends early and the interrupt flag is restored so callers can observe it.
     *
     * @param ms sleep time in milliseconds
     */
    private void sleep(int ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Publishes the same random 128-byte payload to the exchange in a loop until
     * {@code durationProducer} milliseconds have passed since the test start (or
     * the thread is interrupted), then adds its count to the shared total.
     */
    private class Producer implements Runnable {

        private final Channel channel;
        /** How long, in ms after the test start, the producer keeps publishing. */
        private final int durationProducer = 1000;
        /** Pause in ms between two publishes. */
        private final int sendDelay = 0;
        private int messageCount = 0;

        Producer(Channel channel) {
            this.channel = channel;
        }

        @Override
        public void run() {
            byte[] message = createMessage();
            LOGGER.info("Running a producer");
            while (!Thread.currentThread().isInterrupted()) {
                if (startMillis + durationProducer < System.currentTimeMillis()) {
                    LOGGER.info("Sending duration reached. Stopping...");
                    break;
                }
                try {
                    channel.basicPublish(exchangeName, "test", null, message);
                    messageCount++;
                    sleep(sendDelay);
                } catch (Exception e) {
                    // Keep trying until the deadline; only successful publishes are counted.
                    LOGGER.error("Failed to publish message", e);
                }
            }
            LOGGER.info("Producer stops after {} sent", messageCount);
            addMessagesSent(messageCount);
        }

        /** Creates 128 random bytes to use as the message body. */
        private byte[] createMessage() {
            byte[] data = new byte[128];
            new Random().nextBytes(data);
            return data;
        }
    }

    /**
     * Registers a {@link QueueingConsumer} on the queue and counts deliveries until
     * either the overall receive duration or the "kill" deadline has passed (both
     * measured from the test start), or the thread is interrupted. The count is
     * then added to the shared total.
     *
     * <p>NOTE(review): deliveries are requested with {@code autoAck = false} and
     * nothing in this class acknowledges them or cancels the consumer; presumably
     * this is deliberate for the scenario being reproduced - confirm.
     */
    private class Consumer implements Runnable {

        private final Channel channel;
        private final String queueName;
        /** Overall receive window in ms, relative to the test start. */
        private final int durationConsumer = 10000;
        /** Delay in ms between registering the consumer and polling for deliveries. */
        private final int startDelay = 100;
        /** If positive, the consumer stops this many ms (plus startDelay) after the test start. */
        private final int consumerKillAfterTime = 500;
        private int messageCount = 0;

        Consumer(Channel channel, String queueName) {
            this.channel = channel;
            this.queueName = queueName;
        }

        @Override
        public void run() {
            QueueingConsumer consumer = new QueueingConsumer(channel);
            try {
                channel.basicConsume(queueName, false, consumer);
            } catch (IOException e) {
                LOGGER.error("Failed to start consuming from " + queueName, e);
                return;
            }
            sleep(startDelay);
            LOGGER.info("Running a consumer");
            while (!Thread.currentThread().isInterrupted()) {
                long now = System.currentTimeMillis();
                if (startMillis + durationConsumer + startDelay < now) {
                    LOGGER.info("Receiving duration reached. Stopping...");
                    break;
                }
                if (consumerKillAfterTime > 0
                        && startMillis + consumerKillAfterTime + startDelay < now) {
                    LOGGER.info("Consumer kill reached. Stopping...");
                    // Divide as floating point: integer division reported 500 ms as 0 seconds.
                    LOGGER.info("A total of {} messages were consumed in {} seconds before Kill reached.",
                            messageCount, consumerKillAfterTime / 1000.0);
                    break;
                }
                try {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery(5000);
                    if (delivery != null) {
                        messageCount++;
                    }
                } catch (InterruptedException e) {
                    // Restore the flag; the loop condition then ends the consumer.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    LOGGER.error("Failed to receive delivery", e);
                }
            }
            LOGGER.info("Consumer stops after {} received", messageCount);
            addMessagesReceived(messageCount);
        }
    }

    /** @return total number of messages reported as sent so far */
    private synchronized int getMessagesSent() {
        return messagesSent;
    }

    /** Adds a producer's count to the sent total. */
    private synchronized void addMessagesSent(int messagesSent) {
        this.messagesSent += messagesSent;
    }

    /** @return total number of messages reported as received so far */
    private synchronized int getMessagesReceived() {
        return messagesReceived;
    }

    /** Adds a consumer's count to the received total. */
    private synchronized void addMessagesReceived(int messagesReceived) {
        this.messagesReceived += messagesReceived;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment