Skip to content

Instantly share code, notes, and snippets.

@tjsnell
Created May 22, 2012 13:14
Show Gist options
  • Save tjsnell/2768971 to your computer and use it in GitHub Desktop.
Save tjsnell/2768971 to your computer and use it in GitHub Desktop.
// Publishes the inbound message body of the exchange to RabbitMQ.
// In RPC mode a reply queue is declared, the request is published with a
// correlation id and reply-to header, and the first delivery whose correlation
// id matches is stored as the exchange's out body. Otherwise the body is
// published once with the configured message properties.
String body = exchange.getIn().getBody(String.class);
LOG.trace("Sending request [{}] from exchange [{}]...", body, exchange);
RabitMQConfiguration config = endpoint.getConfiguration();
createChannel();
String routingKey = getRoutingKey(exchange.getIn(), config);
// Equivalent to the original !(routingKey != null && !queue.isEmpty()):
// configure the channel when there is no routing key or the queue name is empty.
if (routingKey == null || config.getQueue().isEmpty()) {
    configureChannel(config, channel);
}
// Encode with an explicit charset so the bytes on the wire do not depend on
// the JVM's platform default encoding.
byte[] payload = body.getBytes(java.nio.charset.StandardCharsets.UTF_8);
if (config.isRpc()) {
    String replyQueueName = channel.queueDeclare().getQueue();
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(replyQueueName, true, consumer);
    UuidGenerator uuidGenerator = getEndpoint().getCamelContext().getUuidGenerator();
    String corrId = uuidGenerator.generateUuid();
    AMQP.BasicProperties props = new AMQP.BasicProperties
            .Builder()
            .correlationId(corrId)
            .replyTo(replyQueueName)
            .build();
    channel.basicPublish(config.getExchange(), routingKey, props, payload);
    // NOTE(review): nextDelivery() is called without a timeout, so this loop
    // waits until a delivery with the matching correlation id arrives.
    while (true) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        // Compare from the known non-null side: a delivery that carries no
        // correlation id is skipped instead of raising NullPointerException.
        if (corrId.equals(delivery.getProperties().getCorrelationId())) {
            exchange.getOut().setBody(
                    new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8));
            break;
        }
    }
} else {
    channel.basicPublish(config.getExchange(), routingKey, config.getMessageProperties(), payload);
}
// The use of this message lies beyond the end of this snippet.
Message message = getMessageForResponse(exchange);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment