Created
December 13, 2016 10:35
-
-
Save Gsantomaggio/31089e13df2b081b35148a866889704d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash
# For every RabbitMQ vhost, prints the number of queues it contains followed by
# one line per queue carrying the queue name and its message count.
# Expects a `rabbitmqctl` executable in the current working directory.

# Split command substitutions on newlines only, so every output line becomes
# exactly one loop item even when it contains spaces.
IFS=$'\n'

# xargs -n1 re-emits each whitespace-separated token on its own line before
# the list is sorted and de-duplicated.
ordered_vhosts=$(./rabbitmqctl list_vhosts -q | xargs -n1 | sort -u)

for V in $ordered_vhosts; do
    # "$V" is quoted so a vhost name containing glob characters is passed
    # through to rabbitmqctl unchanged instead of being pathname-expanded.
    echo "*****Vhost $V Total queues " $(./rabbitmqctl list_queues -q -p "$V" | wc -l)

    # xargs -n2 joins each "name messages" pair onto a single line.
    for Q in $(./rabbitmqctl list_queues -q name messages -p "$V" | xargs -n2 | sort -u); do
        echo "Vhost $V queue-name total-messages $Q"
    done
done
Author
Gsantomaggio
commented
Dec 14, 2016
import com.rabbitmq.client.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Interactive load generator for a RabbitMQ broker.
 *
 * <p>The program reads its settings from standard input (an empty answer selects the default
 * shown in the prompt), opens one connection per virtual host {@code test_0 .. test_(N-1)},
 * declares a fanout exchange plus a set of queues bound to it, and then publishes persistent
 * messages from a pool of publisher threads until the user asks it to stop.
 *
 * Created by gabriele on 26/09/2015.
 */
public class PerfMessages {

    /** Seconds to wait for the publisher threads to finish after they were asked to stop. */
    private static final int PUBLISHER_STOP_TIMEOUT_SECONDS = 80;

    /** Shared generator, so a new {@link Random} is not allocated on every call. */
    private static final Random RANDOM = new Random();

    /**
     * Returns a pseudo-random integer in the closed range {@code [min, max]}.
     *
     * @param min lowest value that may be returned
     * @param max highest value that may be returned
     * @return a value between {@code min} and {@code max}, both inclusive
     * @throws IllegalArgumentException if {@code max} is smaller than {@code min}
     */
    public static int randInt(int min, int max) {
        return RANDOM.nextInt((max - min) + 1) + min;
    }

    /**
     * Picks one connection from the list at random.
     *
     * @param list connections to choose from; must not be empty
     * @return a randomly selected element of {@code list}
     * @throws IllegalArgumentException if {@code list} is empty
     */
    public static Connection getRandomConnectionList(List<Connection> list) {
        return list.get(randInt(0, list.size() - 1));
    }

    /**
     * Runs the interactive load test.
     *
     * @param args ignored; every setting is read from standard input
     * @throws Exception if reading the settings, talking to the broker or waiting fails
     */
    public static void main(String[] args) throws Exception {
        BufferedReader br = new BufferedReader(new InputStreamReader(System.in));

        final String serverIp = ask(br, "V15 - Server ip (localhost):", "localhost");
        final int serverPort = askInt(br, "server port (5672):", 5672);
        final int interactionMessages = askInt(br, "interactionMessages (5000):", 5000);
        final int threadNumbers = askInt(br, "Thread Numbers (10):", 10);
        // The prompt used to advertise 150 while the applied default was 120; the prompt now
        // states the value that is really used.
        final int connections = askInt(br, "Connection Numbers (120):", 120);
        final int numberQueues = askInt(br, "Number Queues (10000):", 10000);
        final int messagesForTransaction = askInt(br, "Messages For Transaction (3):", 3);
        final boolean useTx = askBoolean(br, "use transaction (true):", true);
        final boolean useLazy = askBoolean(br, "use Lazy (true):", true);
        final byte[] body = new byte[askInt(br, "Body size 10:", 10)];
        final String qNamePrefix = ask(br, "prefix (te_):", "te_");
        final int consumersDelay = askInt(br, "Consumers start delay (seconds) (999999):", 999999);

        if (connections < 1) {
            // Without this check the first random pick fails with an unhelpful
            // "bound must be positive" message.
            throw new IllegalArgumentException("Connection Numbers must be at least 1, got " + connections);
        }

        // Computed in long arithmetic: the product of three user-supplied ints can exceed int range.
        final long messagesToSend = (long) interactionMessages * threadNumbers * messagesForTransaction;

        System.out.printf("Server ip: %s, Server Port : %d \n", serverIp, serverPort);
        System.out.printf("Interaction: %d, Thread Numbers: %d, Queue Numbers: %d, messages for transaction: %d, use tx: %s \n", interactionMessages, threadNumbers, numberQueues, messagesForTransaction, useTx);
        System.out.printf("Messages to send: %d \n", messagesToSend);
        System.out.printf("Body Size : %d \n", body.length);
        System.out.print("Enter to start:");
        br.readLine();
        System.out.println("starting..");

        // The exchange shares its name with the queue-name prefix.
        final String exName = qNamePrefix;
        final List<Connection> listConnections = new ArrayList<Connection>();
        ExecutorService threadChannels = null;
        try {
            openConnections(listConnections, serverIp, serverPort, connections);
            System.out.println("Connections Done");

            for (int i = 0; i < numberQueues; i++) {
                String qname = qNamePrefix + i;
                System.out.println("declare queue:" + qname);
                declareQueue(getRandomConnectionList(listConnections), exName, qname, useLazy);
            }
            System.out.println("Queues created!!!");
            Thread.sleep(5000);

            // One worker per publisher task is all that is ever needed.
            threadChannels = Executors.newFixedThreadPool(Math.max(1, threadNumbers));
            System.out.println("Start Publishing!!");
            Thread.sleep(2000);

            // Taken after the pause above so the reported duration covers publishing only.
            final long startMillis = System.currentTimeMillis();
            for (int i = 0; i < threadNumbers; i++) {
                threadChannels.submit(new Runnable() {
                    @Override
                    public void run() {
                        Channel pubChannel = null;
                        try {
                            System.out.println("starting");
                            pubChannel = getRandomConnectionList(listConnections).createChannel();
                            if (useTx) {
                                pubChannel.txSelect();
                            }
                            long count = 0;
                            for (int j = 0; j < interactionMessages; j++) {
                                for (int k = 0; k < messagesForTransaction; k++) {
                                    pubChannel.basicPublish(exName, "", MessageProperties.PERSISTENT_BASIC, body);
                                    count++;
                                    if ((count % 500) == 0) {
                                        System.out.printf("messages sent %d \n", count);
                                    }
                                    Thread.sleep(10);
                                }
                                if (useTx) {
                                    pubChannel.txCommit();
                                }
                            }
                            long seconds = (System.currentTimeMillis() - startMillis) / 1000;
                            System.out.println("**************************************************************");
                            System.out.println("" + new Date());
                            System.out.printf("Interaction: %d, Thread Numbers: %d, Queue Numbers: %d, messages for transaction: %d, use tx: %s \n", interactionMessages, threadNumbers, numberQueues, messagesForTransaction, useTx);
                            // count is already incremented once per published message, so it is
                            // reported as is rather than multiplied by the transaction size again.
                            System.out.println("Seconds: " + seconds + ", Total Messages sent:" + count + ", Message size:" + body.length);
                            System.out.println("**************************************************************");
                        } catch (InterruptedException e) {
                            // Raised when the pool is shut down while this publisher is pausing.
                            Thread.currentThread().interrupt();
                            System.out.println("publisher interrupted");
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            closeChannel(pubChannel);
                        }
                    }
                });
            }

            System.out.println("Waiting before start consumers, Seconds:" + consumersDelay);
            // TimeUnit converts in long arithmetic; "delay * 1000" overflowed int for large delays.
            TimeUnit.SECONDS.sleep(consumersDelay);
            System.out.println("Enter to stop");
            br.readLine();
        } finally {
            // Publishers are stopped before their connections go away, and every connection is
            // closed exactly once, also when one of the steps above failed.
            try {
                if (threadChannels != null) {
                    stopPublishers(threadChannels);
                }
            } finally {
                closeConnections(listConnections);
            }
        }
        System.out.println("done");
    }

    /** Prints {@code prompt} and returns the typed line, or {@code defaultValue} on empty input or end of input. */
    private static String ask(BufferedReader in, String prompt, String defaultValue) throws IOException {
        System.out.print(prompt);
        String line = in.readLine();
        // readLine() yields null once the input stream is exhausted.
        if (line == null || line.isEmpty()) {
            return defaultValue;
        }
        return line;
    }

    /** Prompts for an integer; a blank answer yields {@code defaultValue}, other text must parse as an int. */
    private static int askInt(BufferedReader in, String prompt, int defaultValue) throws IOException {
        String answer = ask(in, prompt, "").trim();
        return answer.isEmpty() ? defaultValue : Integer.parseInt(answer);
    }

    /** Prompts for a boolean; a blank answer yields {@code defaultValue}, otherwise only "true" (any case) is true. */
    private static boolean askBoolean(BufferedReader in, String prompt, boolean defaultValue) throws IOException {
        String answer = ask(in, prompt, "").trim();
        return answer.isEmpty() ? defaultValue : Boolean.parseBoolean(answer);
    }

    /** Opens {@code howMany} connections, the i-th against virtual host {@code test_<i>}, appending each to {@code target}. */
    private static void openConnections(List<Connection> target, String host, int port, int howMany) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setRequestedHeartbeat(5);
        factory.setPort(port);
        factory.setUsername("test");
        factory.setPassword("test");
        for (int i = 0; i < howMany; i++) {
            factory.setVirtualHost("test_" + i);
            // Added straight away so a later failure still lets the caller close this one.
            target.add(factory.newConnection());
            System.out.println("Connection done " + i);
        }
    }

    /** Declares the fanout exchange and one queue bound to it, using a short-lived channel on {@code connection}. */
    private static void declareQueue(Connection connection, String exchange, String queue, boolean lazy) throws Exception {
        Channel channel = connection.createChannel();
        try {
            channel.exchangeDeclare(exchange, "fanout", true);
            Map<String, Object> queueArgs = new HashMap<String, Object>();
            if (lazy) {
                queueArgs.put("x-queue-mode", "lazy");
            } else {
                queueArgs.put("x-dead-letter-exchange", "dead_exchange");
                queueArgs.put("x-dead-letter-routing-key", "some-routing-key");
            }
            channel.queueDeclare(queue, true, false, false, queueArgs);
            channel.queueBind(queue, exchange, "");
        } finally {
            closeChannel(channel);
        }
    }

    /** Closes {@code channel} if it is non-null and still open; a failure to close is reported, not propagated. */
    private static void closeChannel(Channel channel) {
        if (channel == null) {
            return;
        }
        try {
            if (channel.isOpen()) {
                channel.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Interrupts the publisher tasks and waits a bounded time for them to finish. */
    private static void stopPublishers(ExecutorService pool) throws InterruptedException {
        pool.shutdownNow();
        if (!pool.awaitTermination(PUBLISHER_STOP_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            System.out.println("publishers still running after " + PUBLISHER_STOP_TIMEOUT_SECONDS + " seconds");
        }
    }

    /** Closes every connection that is still open; one failing close does not stop the others. */
    private static void closeConnections(List<Connection> openConnections) {
        for (Connection connection : openConnections) {
            System.out.println("closing");
            try {
                if (connection.isOpen()) {
                    connection.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment