Last active
August 29, 2015 13:58
-
-
Save maisenovich/10339925 to your computer and use it in GitHub Desktop.
Simple program that publishes to RabbitMQ and reproduces mgmt_db memory issue.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
public class RabbitSmasher { | |
public static void main(String[] args) throws Exception { | |
if (args.length < 2) { | |
System.err | |
.println("Usage: <channelCount> <messages per sec> [iterations] [exchangeCount] [connectionPerChannel] [randomExchange] [reportInterval]"); | |
System.exit(1); | |
} | |
int channelsCount = Integer.parseInt(args[0]); | |
int messagesPerSecond = Integer.parseInt(args[1]); | |
int iterations = args.length >= 3 ? Integer.valueOf(args[2]) : 100; | |
int exchangeCount = args.length >= 4 ? Integer.valueOf(args[3]) : channelsCount; | |
boolean connectionPerChannel = args.length >= 5 ? Boolean.valueOf(args[4]) : false; | |
boolean randomExchange = args.length >= 6 ? Boolean.valueOf(args[5]) : false; | |
int reportInterval = args.length >= 7 ? Integer.valueOf(args[6]) : messagesPerSecond; | |
Smasher smasher = new Smasher( | |
channelsCount, | |
exchangeCount, | |
connectionPerChannel, | |
randomExchange, | |
reportInterval); | |
try { | |
smasher.smash(iterations, messagesPerSecond); | |
} finally { | |
smasher.close(); | |
} | |
} | |
static class Smasher { | |
private static final Random random = new Random(); | |
private final int channelCount; | |
private final int exchangeCount; | |
private final boolean connectionPerChannel; | |
private final boolean randomExchange; | |
private final int reportInterval; | |
private final List<Connection> connections = new ArrayList<Connection>(); | |
private final List<Channel> channels = new ArrayList<Channel>(); | |
private ConnectionFactory factory; | |
public Smasher( | |
int channelCount, | |
int exchangeCount, | |
boolean connectionPerChannel, | |
boolean randomExchange, | |
int reportInterval) { | |
this.channelCount = channelCount; | |
this.exchangeCount = exchangeCount; | |
this.connectionPerChannel = connectionPerChannel; | |
this.randomExchange = randomExchange; | |
this.reportInterval = reportInterval; | |
} | |
public void smash(int iterations, int messagesPerSecond) throws Exception { | |
System.out.printf( | |
"Started with %d channels, %d exchanges, %d/s message rate.\n", | |
channelCount, | |
exchangeCount, | |
messagesPerSecond); | |
createChannels(); | |
createExchanges(); | |
createQueues(); | |
publish(iterations, messagesPerSecond); | |
deleteQueues(); | |
deleteExchanges(); | |
} | |
private void publish(int iterations, int messagesPerSecond) throws Exception { | |
long publishedTotal = 0; | |
long publishedSinceDelay = 0; | |
int exchangeNumber = 0; | |
long start = System.currentTimeMillis(); | |
long reportStart = System.currentTimeMillis(); | |
long prevReportTimeSlice = Long.MAX_VALUE / 100; | |
for (int i = 0; i < iterations; ++i) { | |
for (Channel c : channels) { | |
// Channel.basicPublish(exchange, routingKey, props, byte[] body) | |
c.basicPublish( | |
getExchangeName(exchangeNumber), | |
getRoutingKey(exchangeNumber), | |
null, | |
"foo".getBytes()); | |
if (++publishedTotal % reportInterval == 0) { | |
long currReportTimeSlice = System.currentTimeMillis() - reportStart; | |
System.out.printf( | |
"published %d messages in %dms, current exchange: %d ", | |
reportInterval, | |
currReportTimeSlice, | |
exchangeNumber); | |
if (currReportTimeSlice > prevReportTimeSlice * 10) { | |
System.out.printf(" - throttled around %d sec mark", (System.currentTimeMillis() - start - currReportTimeSlice) / 1000); | |
} | |
System.out.println(); | |
prevReportTimeSlice = currReportTimeSlice; | |
reportStart = System.currentTimeMillis(); | |
} | |
if (++publishedSinceDelay > messagesPerSecond) { | |
Thread.sleep(950); | |
publishedSinceDelay = 0; | |
} | |
if (randomExchange) { | |
exchangeNumber = Math.abs(random.nextInt() % channelCount); | |
} else { | |
exchangeNumber = (exchangeNumber + 1) % exchangeCount; | |
} | |
} | |
} | |
} | |
private void createChannels() throws Exception { | |
System.out.print("Creating connections/channels..."); | |
factory = new ConnectionFactory(); | |
factory.setUri("amqp://guest:guest@localhost:5672"); | |
Connection con = null; | |
for (int i = 0; i < channelCount; ++i) { | |
if (con == null || connectionPerChannel) { | |
con = factory.newConnection(); | |
connections.add(con); | |
} | |
channels.add(con.createChannel()); | |
} | |
System.out.println("Done!"); | |
} | |
private void createExchanges() throws IOException { | |
System.out.print("Creating exchanges..."); | |
for (int n = 0; n < exchangeCount; ++n) { | |
//exchangeDeclare(exchange, type, durable, autoDelete, arguments) | |
channels.get(0).exchangeDeclare(getExchangeName(n), "direct", false, true, null); | |
} | |
System.out.println("Done!"); | |
} | |
private void createQueues() throws IOException { | |
System.out.print("Creating queues and bindings..."); | |
Map<String, Object> args = new HashMap<String, Object>(); | |
args.put("x-max-length", 5); | |
Channel channel = channels.get(0); | |
for (int n = 0; n < exchangeCount; ++n) { | |
// 1 queue per exchange | |
//queueDeclare(queue, durable, exclusive, autoDelete, arguments) | |
channel.queueDeclare(getQueueName(n), false, false, true, args); | |
channel.queueBind(getQueueName(n), getExchangeName(n), getRoutingKey(n)); | |
} | |
System.out.println("Done!"); | |
} | |
private void deleteExchanges() throws IOException { | |
System.out.print("Deleting exchanges..."); | |
for (int n = 0; n < exchangeCount; ++n) { | |
channels.get(0).exchangeDelete(getExchangeName(n)); | |
} | |
System.out.println("Done!"); | |
} | |
private void deleteQueues() throws IOException { | |
System.out.print("Deleting queues and bindings..."); | |
for (int n = 0; n < exchangeCount; ++n) { | |
channels.get(0).queueDelete(getQueueName(n)); | |
} | |
System.out.println("Done!"); | |
} | |
public void close() throws IOException { | |
for (Channel channel : channels) { | |
channel.close(); | |
} | |
for (Connection connection : connections) { | |
connection.close(); | |
} | |
} | |
private String getExchangeName(int n) { | |
return "x-foo-" + n; | |
} | |
private String getQueueName(int n) { | |
return "q-foo-" + n; | |
} | |
private String getRoutingKey(int n) { | |
return "foo." + n; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment