-
-
Save michaelklishin/0082745ddf31fa951221 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.Date; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Random; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.rabbitmq.client.Channel; | |
import com.rabbitmq.client.Connection; | |
import com.rabbitmq.client.ConnectionFactory; | |
import com.rabbitmq.client.QueueingConsumer; | |
/** | |
* @author ttulka | |
*/ | |
public class RabbitMqDurable { | |
private final static Logger LOGGER = LoggerFactory.getLogger(RabbitMqDurable.class); | |
protected Connection connection; | |
protected ConnectionFactory cf; | |
protected Channel channel; | |
private static String exchangeName= "MyExTest4"; | |
private int messagesSent; | |
private int messagesReceived; | |
public static void main(String[] argv) throws Exception { | |
if (argv.length >= 1) { | |
exchangeName = argv[0]; | |
} | |
new RabbitMqDurable().runTest(); | |
System.exit(0); | |
} | |
private Date start; | |
public void runTest() { | |
try { | |
cf = new ConnectionFactory(); | |
cf.setHost("localhost"); | |
connection = cf.newConnection(); | |
channel = connection.createChannel(); | |
boolean durable = true; | |
channel.exchangeDeclare(exchangeName, "fanout", durable, false, null); | |
Map<String, Object> params = new HashMap<String, Object>(); | |
String queueName = exchangeName + "Q"; | |
channel.queueDeclare(queueName, durable, false, false, params); | |
channel.queueBind(queueName, exchangeName, "#"); | |
start = new Date(); | |
List<Thread> threads = new ArrayList<Thread>(); | |
// run producers and consumers in parallel | |
Thread t = new Thread(new Producer(channel)); | |
t.start(); | |
threads.add(t); | |
t.join(); | |
t = new Thread(new Consumer(channel, queueName)); | |
t.start(); | |
threads.add(t); | |
// wait for all the threads | |
for (Thread t0 : threads) { | |
t0.join(); | |
} | |
// run the second consumer to eat the rest | |
t = new Thread(new Consumer(channel, queueName)); | |
t.start(); | |
threads.add(t); | |
t.join(); | |
// print results | |
LOGGER.info("A total of {} messages were sent.", getMessagesSent()); | |
LOGGER.info("A total of {} messages were consumed.", getMessagesReceived()); | |
} | |
catch (Exception e) { | |
LOGGER.error("Error while running", e); | |
} | |
finally { | |
closeConnection(); | |
} | |
} | |
private void closeConnection() { | |
LOGGER.info("Close connection"); | |
try { | |
if (channel != null && channel.isOpen()) { | |
channel.close(); | |
} | |
} | |
catch (Exception e) { } | |
try { | |
if (connection != null && connection.isOpen()) { | |
connection.close(); | |
} | |
} | |
catch (Exception e) { } | |
} | |
private void sleep(int ms) { | |
try { | |
Thread.sleep(ms); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
/** | |
* Producer | |
*/ | |
private class Producer implements Runnable { | |
private final Channel channel; | |
private int durationProducer = 1000; | |
private int messageCount = 0; | |
private int sendDelay = 0; | |
Producer(Channel channel) { | |
this.channel = channel; | |
} | |
@Override | |
public void run() { | |
byte[] message = createMessage(); | |
LOGGER.info("Running a producer"); | |
while (true) { | |
if (start.getTime() + durationProducer < new Date().getTime()) { | |
LOGGER.info("Sending duration reached. Stopping..."); | |
break; | |
} | |
try { | |
channel.basicPublish(exchangeName, "test", null, message); | |
messageCount++; | |
sleep(sendDelay); | |
} | |
catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
LOGGER.info("Producer stops after {} sent", messageCount); | |
addMessagesSent(messageCount); | |
} | |
private byte[] createMessage() { | |
// Create random bytes as data to send | |
byte[] data = new byte[128]; | |
new Random().nextBytes(data); | |
return data; | |
} | |
} | |
/** | |
* Consumer | |
*/ | |
private class Consumer implements Runnable { | |
private final Channel channel; | |
private final String queueName; | |
private int durationConsumer = 10000; | |
private int messageCount = 0; | |
private int startDelay = 100; | |
private int consumerKillAfterTime = 500; | |
Consumer(Channel channel, String queueName) { | |
this.channel = channel; | |
this.queueName = queueName; | |
} | |
@Override | |
public void run() { | |
QueueingConsumer consumer = new QueueingConsumer(channel); | |
try { | |
channel.basicConsume(queueName, false, consumer); | |
} | |
catch (IOException e) { | |
e.printStackTrace(); | |
return; | |
} | |
sleep(startDelay); | |
LOGGER.info("Running a consumer"); | |
while (true) { | |
if (start.getTime() + durationConsumer + startDelay < new Date().getTime()) { | |
LOGGER.info("Receiving duration reached. Stopping..."); | |
break; | |
} | |
if (consumerKillAfterTime > 0 && | |
start.getTime() + consumerKillAfterTime + startDelay < new Date().getTime()) { | |
LOGGER.info("Consumer kill reached. Stopping..."); | |
LOGGER.info("A total of {} messages were consumed in {} seconds before Kill reached.", messageCount, consumerKillAfterTime / 1000); | |
break; | |
} | |
try { | |
QueueingConsumer.Delivery delivery = consumer.nextDelivery(5000); | |
if (delivery != null) { | |
messageCount++; | |
} | |
} | |
catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
LOGGER.info("Consumer stops after {} received", messageCount); | |
addMessagesReceived(messageCount); | |
} | |
} | |
private synchronized int getMessagesSent() { | |
return messagesSent; | |
} | |
private synchronized void addMessagesSent(int messagesSent) { | |
this.messagesSent += messagesSent; | |
} | |
private synchronized int getMessagesReceived() { | |
return messagesReceived; | |
} | |
private synchronized void addMessagesReceived(int messagesReceived) { | |
this.messagesReceived += messagesReceived; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment