Created
September 28, 2010 16:42
-
-
Save dsyer/601330 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.springframework.amqp.rabbit.core; | |
import static org.junit.Assert.assertEquals; | |
import static org.junit.Assert.assertNotNull; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.junit.Assume; | |
import org.junit.Before; | |
import org.junit.Rule; | |
import org.junit.Test; | |
import org.junit.rules.ExpectedException; | |
import org.springframework.amqp.core.BindingBuilder; | |
import org.springframework.amqp.core.Message; | |
import org.springframework.amqp.core.MessageProperties; | |
import org.springframework.amqp.core.Queue; | |
import org.springframework.amqp.core.TopicExchange; | |
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory; | |
import org.springframework.amqp.rabbit.listener.BlockingQueueConsumer; | |
import org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.Delivery; | |
import org.springframework.amqp.rabbit.support.RabbitUtils; | |
import org.springframework.amqp.support.converter.SimpleMessageConverter; | |
import com.rabbitmq.client.Channel; | |
public class RabbitBindingIntegrationTests { | |
private static Log logger = LogFactory.getLog(RabbitBindingIntegrationTests.class); | |
private static boolean brokerOnline = true; // assume true to start with | |
private Queue queue; | |
private SingleConnectionFactory connectionFactory = new SingleConnectionFactory(); | |
private RabbitTemplate template = new RabbitTemplate(connectionFactory); | |
@Rule | |
public ExpectedException exception = ExpectedException.none(); | |
@Before | |
public void checkBrokerIsRunning() { | |
// If the Rabbit broker is not running in the background all the tests here will simply be skipped. | |
Assume.assumeTrue(brokerOnline); | |
try { | |
RabbitAdmin admin = new RabbitAdmin(template); | |
// Idempotent, so no problem to do this for every test if the broker is there | |
queue = admin.declareQueue(); | |
} catch (Exception e) { | |
logger.warn("Not executing tests because basic connectivity test failed", e); | |
brokerOnline = false; | |
Assume.assumeNoException(e); | |
} | |
} | |
@Test | |
public void testSendAndReceiveWithTopicSingleChannel() throws Exception { | |
final RabbitAdmin admin = new RabbitAdmin(template); | |
final TopicExchange exchange = new TopicExchange("topic"); | |
admin.declareExchange(exchange); | |
template.setExchange(exchange.getName()); | |
admin.declareBinding(BindingBuilder.from(queue).to(exchange).with("*.end")); | |
template.execute(new ChannelCallback<Void>() { | |
@Override | |
public Void doInRabbit(Channel channel) throws Exception { | |
BlockingQueueConsumer consumer = new BlockingQueueConsumer(channel); | |
String tag = channel.basicConsume(queue.getName(), true, consumer); | |
assertNotNull(tag); | |
template.convertAndSend("foo", "message"); | |
String result = getResult(consumer); | |
assertEquals(null, result); | |
template.convertAndSend("foo.end", "message"); | |
result = getResult(consumer); | |
assertEquals("message", result); | |
return null; | |
} | |
}); | |
} | |
@Test | |
public void testSendAndReceiveWithTopicTwoChannels() throws Exception { | |
final RabbitAdmin admin = new RabbitAdmin(template); | |
final TopicExchange exchange = new TopicExchange("topic"); | |
admin.declareExchange(exchange); | |
template.setExchange(exchange.getName()); | |
admin.declareBinding(BindingBuilder.from(queue).to(exchange).with("*.end")); | |
template.execute(new ChannelCallback<Void>() { | |
@Override | |
public Void doInRabbit(Channel channel) throws Exception { | |
BlockingQueueConsumer consumer = new BlockingQueueConsumer(channel); | |
String tag = channel.basicConsume(queue.getName(), true, consumer); | |
assertNotNull(tag); | |
try { | |
template.convertAndSend("foo", "message"); | |
String result = getResult(consumer); | |
assertEquals(null, result); | |
} finally { | |
channel.basicCancel(tag); | |
} | |
return null; | |
} | |
}); | |
// The queue is shutdown by the previous block, so we have to rebind | |
// TODO: figure out why | |
queue = admin.declareQueue(); | |
admin.declareBinding(BindingBuilder.from(queue).to(exchange).with("*.end")); | |
template.execute(new ChannelCallback<Void>() { | |
@Override | |
public Void doInRabbit(Channel channel) throws Exception { | |
BlockingQueueConsumer consumer = new BlockingQueueConsumer(channel); | |
String tag = channel.basicConsume(queue.getName(), true, consumer); | |
assertNotNull(tag); | |
try { | |
template.convertAndSend("foo.end", "message"); | |
String result = getResult(consumer); | |
assertEquals("message", result); | |
} finally { | |
channel.basicCancel(tag); | |
} | |
return null; | |
} | |
}); | |
} | |
private String getResult(final BlockingQueueConsumer consumer) throws InterruptedException { | |
Delivery response = consumer.nextDelivery(1000L); | |
if (response == null) { | |
return null; | |
} | |
MessageProperties messageProps = RabbitUtils.createMessageProperties(response.getProperties(), response | |
.getEnvelope(), "UTF-8"); | |
return (String) new SimpleMessageConverter().fromMessage(new Message(response.getBody(), messageProps)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment