Last active
September 15, 2021 11:21
-
-
Save devtdeng/dc45dde0face1f36e6a114a3024c1e91 to your computer and use it in GitHub Desktop.
RabbitMQ Java Client Sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package dev.tdeng.onlinechat; | |
import java.io.IOException; | |
import com.rabbitmq.client.AMQP; | |
import com.rabbitmq.client.Channel; | |
import com.rabbitmq.client.Connection; | |
import com.rabbitmq.client.ConnectionFactory; | |
import com.rabbitmq.client.Consumer; | |
import com.rabbitmq.client.DefaultConsumer; | |
import com.rabbitmq.client.Envelope; | |
import com.rabbitmq.client.MessageProperties; | |
public class TestAMQP { | |
// private final static String QUEUE_NAME = "test-queue"; | |
private final static String EXCHANGE_NAME = "test-exchange"; | |
private final static String HOST_NAME = "172.16.80.182"; | |
// private final static String[] HOST_NAME = {"172.16.80.182", "172.16.80.183"} | |
private final static String USER_NAME = "test"; | |
private final static String PASSWORD = "test"; | |
// exchange type: direct and Topic, routing keys for publisher to send test message | |
private final String[] routingKeyDirect = {"error", "debug", "info"}; | |
private final String[] routingKeyTopic = {"error.app1", "error.app2", "debug.app1", "debug.app2", "info.app1", "info.app2"}; | |
protected Connection connect() { | |
ConnectionFactory factory = new ConnectionFactory(); | |
factory.setHost(HOST_NAME); | |
factory.setUsername(USER_NAME); | |
factory.setPassword(PASSWORD); | |
Connection connection = null; | |
try { | |
connection = factory.newConnection(); | |
} catch (Exception e){ | |
} | |
return connection; | |
} | |
// exchange type: fan, direct, topic | |
// fan - no routing key, exchange to all queues | |
// direct - simple routing key, exchange to binding queues. | |
// topic - routing key with * and #, complex binding queues | |
public Thread send(final String type, final String msg, final int count) { | |
Thread t = new Thread(new Runnable() { | |
private Connection connection = null; | |
private Channel channel = null; | |
public void run() { | |
try { | |
connection = connect(); | |
channel = connection.createChannel(); | |
// with exchange, can use binding key | |
channel.exchangeDeclare(EXCHANGE_NAME, type); | |
int i = 0; | |
String[] routingKey = type.equals("direct") ? routingKeyDirect : routingKeyTopic; | |
while(!Thread.currentThread().isInterrupted()) { | |
if (i++ >= count) break; | |
String newMsg = msg + ": " + i; | |
// no exchange | |
// channel.basicPublish("", QUEUE_NAME, null, newMsg.getBytes()); | |
// with exchange | |
// MessageProperties.PERSISTENT_TEXT_PLAIN tell RabbitMQ to save message on disk | |
channel.basicPublish(EXCHANGE_NAME, routingKey[i%routingKey.length], MessageProperties.PERSISTENT_TEXT_PLAIN, newMsg.getBytes()); | |
System.out.println("Sent " + routingKey[i%routingKey.length] + ": '" + newMsg + "'"); | |
Thread.sleep(1000); | |
} | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} catch (Exception e) { | |
} finally { | |
try { | |
channel.close(); | |
connection.close(); | |
} catch (Exception e) { | |
} | |
System.out.println("Publisher thread exists!"); | |
} | |
} | |
}); | |
t.start(); | |
return t; | |
} | |
public Thread receive(final String type, final String name, final String bindingKey) { | |
Thread t = new Thread(new Runnable() { | |
private Connection connection = null; | |
private Channel channel = null; | |
private String queueName = null; | |
public void run() { | |
try { | |
connection = connect(); | |
channel = connection.createChannel(); | |
// without exchange | |
// channel.queueDeclare(QUEUE_NAME, false, false, false, null); | |
// with exchange, create queue for each subscriber and bind to exchange | |
// direct supports routing key, fanout doesn't support | |
channel.exchangeDeclare(EXCHANGE_NAME, type); | |
//String queueName = channel.queueDeclare().getQueue(); | |
// When RabbitMQ quits or crashes it will forget the queues and messages | |
// unless set durable = true | |
boolean durable = true; | |
queueName = channel.queueDeclare("", durable, false, false, null).getQueue(); | |
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); | |
Consumer consumer = new DefaultConsumer(channel) { | |
@Override | |
public void handleDelivery(String consumerTag, Envelope envelope, | |
AMQP.BasicProperties properties, byte[] body) | |
throws IOException { | |
String message = new String(body, "UTF-8"); | |
System.out.println(name + " Received " + envelope.getRoutingKey() + ": '" + message + "'"); | |
} | |
}; | |
// auto acknowledgment is true | |
// if false, will result in messages_unacknowledged | |
// `rabbitmqctl list_queues name messages_ready messages_unacknowledged` | |
channel.basicConsume(queueName, true, consumer); | |
while (!Thread.currentThread().isInterrupted()) { Thread.sleep(500);} | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} catch (Exception e) { | |
} | |
finally { | |
try { | |
channel.queueDelete(queueName); | |
channel.close(); | |
connection.close(); | |
} catch (Exception e) { | |
} | |
System.out.println(name + " thread exists!"); | |
} | |
} | |
}); | |
t.start(); | |
return t; | |
} | |
public static void test() { | |
try{ | |
TestAMQP q = new TestAMQP(); | |
// Thread publisher = q.send("direct", "test message", 100); | |
// Thread subscriber1 = q.receive("direct", "subscriber1", "error"); | |
// Thread subscriber2 = q.receive("direct", "subscriber2", "debug"); | |
// Thread subscriber3 = q.receive("direct", "subscriber3", "info"); | |
Thread publisher = q.send("topic", "test message", 100); | |
Thread subscriber1 = q.receive("topic", "subscriber1", "error.*"); | |
Thread subscriber2 = q.receive("topic", "subscriber2", "*.app1"); | |
Thread subscriber3 = q.receive("topic", "subscriber3", "info.app2"); | |
System.in.read(); | |
publisher.interrupt(); | |
subscriber1.interrupt(); | |
subscriber2.interrupt(); | |
subscriber3.interrupt(); | |
publisher.join(); | |
subscriber1.join(); | |
subscriber2.join(); | |
subscriber3.join(); | |
System.out.println("all thread joined!"); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment