Skip to content

Instantly share code, notes, and snippets.

@unixc3t
Created October 6, 2017 03:20
Show Gist options
  • Save unixc3t/9e102b9a863c08f862e2e90ce8662422 to your computer and use it in GitHub Desktop.
Save unixc3t/9e102b9a863c08f862e2e90ce8662422 to your computer and use it in GitHub Desktop.
Request-reply communication
package rr;
import com.rabbitmq.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
/**
* rudy
* 10:10 PM
* 10/5/17.
*/
public class RequestReceiver {
private static final String DEFAULT_QUEUE = "";
private static final String REQUEST_QUEUE = "request_queue";
private final static Logger LOGGER = LoggerFactory.getLogger(RequestReceiver.class);
private Connection connection = null;
private Channel channel = null;
public void initialize() {
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
public void receive() {
final RequestReceiver requestReceiver = this;
if (channel == null) {
initialize();
}
try {
channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
String message = null;
try {
message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
} catch (UnsupportedEncodingException e) {
LOGGER.error(e.getMessage(), e);
}
if (properties != null) {
AMQP.BasicProperties amqpProps = new AMQP.BasicProperties();
amqpProps = amqpProps.builder().correlationId(
String.valueOf(properties.getCorrelationId())).build();
try {
channel.basicPublish(DEFAULT_QUEUE, properties.getReplyTo(), amqpProps, "Response message.".getBytes());
requestReceiver.destroy();
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
} else {
LOGGER.warn("Cannot determine response destination for message. ");
}
}
};
channel.basicConsume(REQUEST_QUEUE, true, consumer);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
}
}
public void destroy() {
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
LOGGER.warn(e.getMessage(), e);
}
}
}
}
package rr;
/**
 * Command-line entry point that starts a {@link RequestReceiver}.
 *
 * @author rudy (created 10/6/17, 9:57 AM)
 */
public class RequestReceiverDemo {
    /**
     * Creates a receiver, connects it, and starts it consuming requests.
     *
     * @param args command-line arguments; not used
     * @throws InterruptedException declared in the signature; neither call made here
     *     declares it
     */
    public static void main(String[] args) throws InterruptedException {
        RequestReceiver server = new RequestReceiver();
        server.initialize();
        server.receive();
    }
}
package rr;
import ptop.Sender;
/**
 * Command-line entry point for the requesting side of the request-reply demo.
 *
 * @author rudy (created 10/6/17, 9:42 AM)
 */
public class RequestSenderDemo {
    private static final String REQUEST_QUEUE = "request_queue";
    // Identifier handed to both sendRequest and waitForResponse.
    // NOTE(review): presumably the correlation id used to match the reply; confirm
    // against ptop.Sender.
    private static final String MESSAGE_ID = "MSG1";

    /**
     * Sends one test message to {@code request_queue} through a fresh {@code Sender} and
     * then waits for the response with the same identifier.
     */
    public static void sendToRequestReplyQueue() {
        Sender client = new Sender();
        client.initialize();
        client.sendRequest(REQUEST_QUEUE, "Test message.", MESSAGE_ID);
        client.waitForResponse(MESSAGE_ID);
    }

    /**
     * Runs {@link #sendToRequestReplyQueue()}.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        sendToRequestReplyQueue();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment