Skip to content

Instantly share code, notes, and snippets.

@dsyer
Created September 28, 2010 16:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dsyer/601330 to your computer and use it in GitHub Desktop.
Save dsyer/601330 to your computer and use it in GitHub Desktop.
package org.springframework.amqp.rabbit.core;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.listener.BlockingQueueConsumer;
import org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.Delivery;
import org.springframework.amqp.rabbit.support.RabbitUtils;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import com.rabbitmq.client.Channel;
/**
 * Integration tests for topic-exchange bindings, run against a live Rabbit broker.
 *
 * <p>Each test binds a freshly declared queue to the {@code "topic"} exchange with the
 * pattern {@code "*.end"} and then asserts that a message sent with routing key
 * {@code "foo.end"} is delivered while one sent with routing key {@code "foo"} is not.
 *
 * <p>If the broker cannot be reached, the first test to notice records that fact in a
 * static flag and every test (including later ones) is skipped via JUnit assumptions
 * rather than failed.
 */
public class RabbitBindingIntegrationTests {

    private static final Log logger = LogFactory.getLog(RabbitBindingIntegrationTests.class);

    /** Name of the topic exchange declared by every test. */
    private static final String EXCHANGE_NAME = "topic";

    /** Binding pattern under test. */
    private static final String ROUTING_PATTERN = "*.end";

    /** Routing key the tests expect the binding pattern to reject. */
    private static final String UNMATCHED_ROUTING_KEY = "foo";

    /** Routing key the tests expect the binding pattern to accept. */
    private static final String MATCHED_ROUTING_KEY = "foo.end";

    /** Payload sent by every test. */
    private static final String PAYLOAD = "message";

    /** Argument passed to the consumer when waiting for a delivery (presumably milliseconds - confirm). */
    private static final long RECEIVE_TIMEOUT = 1000L;

    private static boolean brokerOnline = true; // assume true to start with

    /** Queue declared in {@link #checkBrokerIsRunning()}; re-declared mid-test by the two-channel test. */
    private Queue queue;

    private final SingleConnectionFactory connectionFactory = new SingleConnectionFactory();

    private final RabbitTemplate template = new RabbitTemplate(connectionFactory);

    @Rule
    public ExpectedException exception = ExpectedException.none();

    /**
     * Declares the queue used by the test, skipping the test when the broker is unavailable.
     * Once one attempt has failed, {@code brokerOnline} stays {@code false} so that the
     * remaining tests are skipped without trying to connect again.
     */
    @Before
    public void checkBrokerIsRunning() {
        // If the Rabbit broker is not running in the background all the tests here will simply be skipped.
        Assume.assumeTrue(brokerOnline);
        try {
            RabbitAdmin admin = new RabbitAdmin(template);
            // Idempotent, so no problem to do this for every test if the broker is there
            queue = admin.declareQueue();
        } catch (Exception e) {
            logger.warn("Not executing tests because basic connectivity test failed", e);
            brokerOnline = false;
            Assume.assumeNoException(e);
        }
    }

    /**
     * Sends both the unmatched and the matched routing key while a single consumer stays
     * registered on one channel, and checks that only the matched message arrives.
     */
    @Test
    public void testSendAndReceiveWithTopicSingleChannel() throws Exception {
        final RabbitAdmin admin = new RabbitAdmin(template);
        declareExchangeAndBindQueue(admin);
        template.execute(new ChannelCallback<Void>() {
            @Override
            public Void doInRabbit(Channel channel) throws Exception {
                BlockingQueueConsumer consumer = new BlockingQueueConsumer(channel);
                String tag = channel.basicConsume(queue.getName(), true, consumer);
                assertNotNull(tag);
                try {
                    sendAndAssertReceived(consumer, UNMATCHED_ROUTING_KEY, null);
                    sendAndAssertReceived(consumer, MATCHED_ROUTING_KEY, PAYLOAD);
                } finally {
                    // Cancel even when an assertion fails, as the two-channel test does.
                    channel.basicCancel(tag);
                }
                return null;
            }
        });
    }

    /**
     * Same scenario as the single-channel test, but each send/receive runs in its own
     * {@code template.execute} callback with its own consumer.
     */
    @Test
    public void testSendAndReceiveWithTopicTwoChannels() throws Exception {
        final RabbitAdmin admin = new RabbitAdmin(template);
        final TopicExchange exchange = declareExchangeAndBindQueue(admin);
        consumeSendAndAssert(UNMATCHED_ROUTING_KEY, null);
        // The queue is shutdown by the previous block, so we have to rebind
        // TODO: figure out why
        // NOTE(review): presumably the queue returned by declareQueue() is auto-delete, in
        // which case cancelling its only consumer removes it on the broker - confirm.
        queue = admin.declareQueue();
        admin.declareBinding(BindingBuilder.from(queue).to(exchange).with(ROUTING_PATTERN));
        consumeSendAndAssert(MATCHED_ROUTING_KEY, PAYLOAD);
    }

    /**
     * Declares the topic exchange, makes it the template's exchange and binds the current
     * queue to it with {@link #ROUTING_PATTERN}.
     *
     * @param admin the admin used to declare the exchange and the binding
     * @return the declared exchange, so callers can create further bindings
     */
    private TopicExchange declareExchangeAndBindQueue(RabbitAdmin admin) {
        TopicExchange exchange = new TopicExchange(EXCHANGE_NAME);
        admin.declareExchange(exchange);
        template.setExchange(exchange.getName());
        admin.declareBinding(BindingBuilder.from(queue).to(exchange).with(ROUTING_PATTERN));
        return exchange;
    }

    /**
     * Registers a consumer on the current queue inside a {@code template.execute} callback,
     * sends one message and asserts what is received; the consumer is always cancelled.
     *
     * @param routingKey the routing key to send with
     * @param expected the expected payload, or {@code null} if nothing should be delivered
     */
    private void consumeSendAndAssert(final String routingKey, final String expected) throws Exception {
        template.execute(new ChannelCallback<Void>() {
            @Override
            public Void doInRabbit(Channel channel) throws Exception {
                BlockingQueueConsumer consumer = new BlockingQueueConsumer(channel);
                String tag = channel.basicConsume(queue.getName(), true, consumer);
                assertNotNull(tag);
                try {
                    sendAndAssertReceived(consumer, routingKey, expected);
                } finally {
                    channel.basicCancel(tag);
                }
                return null;
            }
        });
    }

    /**
     * Sends {@link #PAYLOAD} through the template with the given routing key and asserts
     * what the consumer receives next.
     *
     * @param consumer the consumer to read the next delivery from
     * @param routingKey the routing key to send with
     * @param expected the expected payload, or {@code null} if nothing should be delivered
     */
    private void sendAndAssertReceived(BlockingQueueConsumer consumer, String routingKey, String expected)
            throws Exception {
        template.convertAndSend(routingKey, PAYLOAD);
        assertEquals(expected, getResult(consumer));
    }

    /**
     * Reads the next delivery from the consumer and converts its body to a string.
     *
     * @param consumer the consumer to read from
     * @return the converted message body, or {@code null} if the consumer returned no delivery
     * @throws InterruptedException if propagated from the consumer while waiting
     */
    private String getResult(final BlockingQueueConsumer consumer) throws InterruptedException {
        Delivery response = consumer.nextDelivery(RECEIVE_TIMEOUT);
        if (response == null) {
            return null;
        }
        MessageProperties messageProps = RabbitUtils.createMessageProperties(response.getProperties(),
                response.getEnvelope(), "UTF-8");
        Message message = new Message(response.getBody(), messageProps);
        return (String) new SimpleMessageConverter().fromMessage(message);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment