Skip to content

Instantly share code, notes, and snippets.

@Gsantomaggio
Last active February 1, 2016 13:41
Show Gist options
  • Save Gsantomaggio/c038daba8459b68be5b1 to your computer and use it in GitHub Desktop.
Save Gsantomaggio/c038daba8459b68be5b1 to your computer and use it in GitHub Desktop.
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by gabriele on 22/01/2016.
*/
public class StressAutoDelete {
public static ExecutorService threadChannels = Executors.newFixedThreadPool(99);
public static void stress(String ip) {
try {
System.out.println(" starting for " + ip);
ConnectionFactory factory = new ConnectionFactory();
factory.setPassword("test");
factory.setUsername("test");
factory.setHost(ip);
factory.setRequestedHeartbeat(0);
final String exchange_name = "test_" + UUID.randomUUID().toString();
final String queue_name = "queue_" + UUID.randomUUID().toString();
final Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
boolean autoDelete = true;
channel.exchangeDeclare(exchange_name, "fanout", false, autoDelete, null);
channel.queueDeclare(queue_name, false, false, autoDelete, null);
channel.queueBind(queue_name, exchange_name, "");
channel.basicConsume(queue_name, new DefaultConsumer(channel) {
}
);
System.out.println(" step1 ");
for (int i = 0; i < 50; i++) {
channel.basicPublish(exchange_name, "", MessageProperties.BASIC, new byte[1024]);
}
System.out.println(" starting for second exchange " + ip);
final String exchange_broken = "yes_iam_the_problem";
channel.exchangeDeclare(exchange_broken, "fanout", false, autoDelete, null);
channel.queueBind(queue_name, exchange_broken, "");
for (int i = 0; i < 100; i++) {
String cons = channel.basicConsume(queue_name, new DefaultConsumer(channel) {
}
);
channel.basicCancel(cons);
}
for (int i = 0; i < 100; i++) {
channel.queueUnbind(queue_name, exchange_broken, "");
channel.queueBind(queue_name, exchange_broken, "");
channel.queueUnbind(queue_name, exchange_broken, "");
}
System.out.println(" before close " + ip);
connection.close();
} catch (Exception e) {
System.out.print(" err" + e.getMessage());
// e.printStackTrace();
}
}
public static void main(String[] argv) throws Exception {
for (int i = 0; i < 500000; i++) {
System.out.println(" Init ");
threadChannels.submit(new Runnable() {
public void run() {
try {
stress("10.100.0.81");
} catch (Exception e) {
e.printStackTrace();
}
}
});
threadChannels.submit(new Runnable() {
public void run() {
try {
stress("10.100.0.82");
} catch (Exception e) {
e.printStackTrace();
}
}
});
threadChannels.submit(new Runnable() {
public void run() {
try {
stress("10.100.0.83");
} catch (Exception e) {
e.printStackTrace();
}
}
});
Thread.sleep(2000);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment