Skip to content

Instantly share code, notes, and snippets.

@devtdeng
Last active September 15, 2021 11:21
Show Gist options
  • Save devtdeng/dc45dde0face1f36e6a114a3024c1e91 to your computer and use it in GitHub Desktop.
Save devtdeng/dc45dde0face1f36e6a114a3024c1e91 to your computer and use it in GitHub Desktop.
RabbitMQ Java Client Sample
package dev.tdeng.onlinechat;
import java.io.IOException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.MessageProperties;
public class TestAMQP {
// private final static String QUEUE_NAME = "test-queue";
private final static String EXCHANGE_NAME = "test-exchange";
private final static String HOST_NAME = "172.16.80.182";
// private final static String[] HOST_NAME = {"172.16.80.182", "172.16.80.183"}
private final static String USER_NAME = "test";
private final static String PASSWORD = "test";
// exchange type: direct and Topic, routing keys for publisher to send test message
private final String[] routingKeyDirect = {"error", "debug", "info"};
private final String[] routingKeyTopic = {"error.app1", "error.app2", "debug.app1", "debug.app2", "info.app1", "info.app2"};
protected Connection connect() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
factory.setUsername(USER_NAME);
factory.setPassword(PASSWORD);
Connection connection = null;
try {
connection = factory.newConnection();
} catch (Exception e){
}
return connection;
}
// exchange type: fan, direct, topic
// fan - no routing key, exchange to all queues
// direct - simple routing key, exchange to binding queues.
// topic - routing key with * and #, complex binding queues
public Thread send(final String type, final String msg, final int count) {
Thread t = new Thread(new Runnable() {
private Connection connection = null;
private Channel channel = null;
public void run() {
try {
connection = connect();
channel = connection.createChannel();
// with exchange, can use binding key
channel.exchangeDeclare(EXCHANGE_NAME, type);
int i = 0;
String[] routingKey = type.equals("direct") ? routingKeyDirect : routingKeyTopic;
while(!Thread.currentThread().isInterrupted()) {
if (i++ >= count) break;
String newMsg = msg + ": " + i;
// no exchange
// channel.basicPublish("", QUEUE_NAME, null, newMsg.getBytes());
// with exchange
// MessageProperties.PERSISTENT_TEXT_PLAIN tell RabbitMQ to save message on disk
channel.basicPublish(EXCHANGE_NAME, routingKey[i%routingKey.length], MessageProperties.PERSISTENT_TEXT_PLAIN, newMsg.getBytes());
System.out.println("Sent " + routingKey[i%routingKey.length] + ": '" + newMsg + "'");
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
} finally {
try {
channel.close();
connection.close();
} catch (Exception e) {
}
System.out.println("Publisher thread exists!");
}
}
});
t.start();
return t;
}
public Thread receive(final String type, final String name, final String bindingKey) {
Thread t = new Thread(new Runnable() {
private Connection connection = null;
private Channel channel = null;
private String queueName = null;
public void run() {
try {
connection = connect();
channel = connection.createChannel();
// without exchange
// channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// with exchange, create queue for each subscriber and bind to exchange
// direct supports routing key, fanout doesn't support
channel.exchangeDeclare(EXCHANGE_NAME, type);
//String queueName = channel.queueDeclare().getQueue();
// When RabbitMQ quits or crashes it will forget the queues and messages
// unless set durable = true
boolean durable = true;
queueName = channel.queueDeclare("", durable, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(name + " Received " + envelope.getRoutingKey() + ": '" + message + "'");
}
};
// auto acknowledgment is true
// if false, will result in messages_unacknowledged
// `rabbitmqctl list_queues name messages_ready messages_unacknowledged`
channel.basicConsume(queueName, true, consumer);
while (!Thread.currentThread().isInterrupted()) { Thread.sleep(500);}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
}
finally {
try {
channel.queueDelete(queueName);
channel.close();
connection.close();
} catch (Exception e) {
}
System.out.println(name + " thread exists!");
}
}
});
t.start();
return t;
}
public static void test() {
try{
TestAMQP q = new TestAMQP();
// Thread publisher = q.send("direct", "test message", 100);
// Thread subscriber1 = q.receive("direct", "subscriber1", "error");
// Thread subscriber2 = q.receive("direct", "subscriber2", "debug");
// Thread subscriber3 = q.receive("direct", "subscriber3", "info");
Thread publisher = q.send("topic", "test message", 100);
Thread subscriber1 = q.receive("topic", "subscriber1", "error.*");
Thread subscriber2 = q.receive("topic", "subscriber2", "*.app1");
Thread subscriber3 = q.receive("topic", "subscriber3", "info.app2");
System.in.read();
publisher.interrupt();
subscriber1.interrupt();
subscriber2.interrupt();
subscriber3.interrupt();
publisher.join();
subscriber1.join();
subscriber2.join();
subscriber3.join();
System.out.println("all thread joined!");
} catch (Exception e) {
e.printStackTrace();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment