Skip to content

Instantly share code, notes, and snippets.

@maisenovich
Last active August 29, 2015 13:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save maisenovich/10339925 to your computer and use it in GitHub Desktop.
Save maisenovich/10339925 to your computer and use it in GitHub Desktop.
Simple program that publishes to RabbitMQ and reproduces mgmt_db memory issue.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class RabbitSmasher {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err
.println("Usage: <channelCount> <messages per sec> [iterations] [exchangeCount] [connectionPerChannel] [randomExchange] [reportInterval]");
System.exit(1);
}
int channelsCount = Integer.parseInt(args[0]);
int messagesPerSecond = Integer.parseInt(args[1]);
int iterations = args.length >= 3 ? Integer.valueOf(args[2]) : 100;
int exchangeCount = args.length >= 4 ? Integer.valueOf(args[3]) : channelsCount;
boolean connectionPerChannel = args.length >= 5 ? Boolean.valueOf(args[4]) : false;
boolean randomExchange = args.length >= 6 ? Boolean.valueOf(args[5]) : false;
int reportInterval = args.length >= 7 ? Integer.valueOf(args[6]) : messagesPerSecond;
Smasher smasher = new Smasher(
channelsCount,
exchangeCount,
connectionPerChannel,
randomExchange,
reportInterval);
try {
smasher.smash(iterations, messagesPerSecond);
} finally {
smasher.close();
}
}
static class Smasher {
private static final Random random = new Random();
private final int channelCount;
private final int exchangeCount;
private final boolean connectionPerChannel;
private final boolean randomExchange;
private final int reportInterval;
private final List<Connection> connections = new ArrayList<Connection>();
private final List<Channel> channels = new ArrayList<Channel>();
private ConnectionFactory factory;
public Smasher(
int channelCount,
int exchangeCount,
boolean connectionPerChannel,
boolean randomExchange,
int reportInterval) {
this.channelCount = channelCount;
this.exchangeCount = exchangeCount;
this.connectionPerChannel = connectionPerChannel;
this.randomExchange = randomExchange;
this.reportInterval = reportInterval;
}
public void smash(int iterations, int messagesPerSecond) throws Exception {
System.out.printf(
"Started with %d channels, %d exchanges, %d/s message rate.\n",
channelCount,
exchangeCount,
messagesPerSecond);
createChannels();
createExchanges();
createQueues();
publish(iterations, messagesPerSecond);
deleteQueues();
deleteExchanges();
}
private void publish(int iterations, int messagesPerSecond) throws Exception {
long publishedTotal = 0;
long publishedSinceDelay = 0;
int exchangeNumber = 0;
long start = System.currentTimeMillis();
long reportStart = System.currentTimeMillis();
long prevReportTimeSlice = Long.MAX_VALUE / 100;
for (int i = 0; i < iterations; ++i) {
for (Channel c : channels) {
// Channel.basicPublish(exchange, routingKey, props, byte[] body)
c.basicPublish(
getExchangeName(exchangeNumber),
getRoutingKey(exchangeNumber),
null,
"foo".getBytes());
if (++publishedTotal % reportInterval == 0) {
long currReportTimeSlice = System.currentTimeMillis() - reportStart;
System.out.printf(
"published %d messages in %dms, current exchange: %d ",
reportInterval,
currReportTimeSlice,
exchangeNumber);
if (currReportTimeSlice > prevReportTimeSlice * 10) {
System.out.printf(" - throttled around %d sec mark", (System.currentTimeMillis() - start - currReportTimeSlice) / 1000);
}
System.out.println();
prevReportTimeSlice = currReportTimeSlice;
reportStart = System.currentTimeMillis();
}
if (++publishedSinceDelay > messagesPerSecond) {
Thread.sleep(950);
publishedSinceDelay = 0;
}
if (randomExchange) {
exchangeNumber = Math.abs(random.nextInt() % channelCount);
} else {
exchangeNumber = (exchangeNumber + 1) % exchangeCount;
}
}
}
}
private void createChannels() throws Exception {
System.out.print("Creating connections/channels...");
factory = new ConnectionFactory();
factory.setUri("amqp://guest:guest@localhost:5672");
Connection con = null;
for (int i = 0; i < channelCount; ++i) {
if (con == null || connectionPerChannel) {
con = factory.newConnection();
connections.add(con);
}
channels.add(con.createChannel());
}
System.out.println("Done!");
}
private void createExchanges() throws IOException {
System.out.print("Creating exchanges...");
for (int n = 0; n < exchangeCount; ++n) {
//exchangeDeclare(exchange, type, durable, autoDelete, arguments)
channels.get(0).exchangeDeclare(getExchangeName(n), "direct", false, true, null);
}
System.out.println("Done!");
}
private void createQueues() throws IOException {
System.out.print("Creating queues and bindings...");
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 5);
Channel channel = channels.get(0);
for (int n = 0; n < exchangeCount; ++n) {
// 1 queue per exchange
//queueDeclare(queue, durable, exclusive, autoDelete, arguments)
channel.queueDeclare(getQueueName(n), false, false, true, args);
channel.queueBind(getQueueName(n), getExchangeName(n), getRoutingKey(n));
}
System.out.println("Done!");
}
private void deleteExchanges() throws IOException {
System.out.print("Deleting exchanges...");
for (int n = 0; n < exchangeCount; ++n) {
channels.get(0).exchangeDelete(getExchangeName(n));
}
System.out.println("Done!");
}
private void deleteQueues() throws IOException {
System.out.print("Deleting queues and bindings...");
for (int n = 0; n < exchangeCount; ++n) {
channels.get(0).queueDelete(getQueueName(n));
}
System.out.println("Done!");
}
public void close() throws IOException {
for (Channel channel : channels) {
channel.close();
}
for (Connection connection : connections) {
connection.close();
}
}
private String getExchangeName(int n) {
return "x-foo-" + n;
}
private String getQueueName(int n) {
return "q-foo-" + n;
}
private String getRoutingKey(int n) {
return "foo." + n;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment