Skip to content

Instantly share code, notes, and snippets.

@garyrussell
Created July 16, 2015 21:24
Show Gist options
  • Save garyrussell/306710cf8bd3c6ae4e6b to your computer and use it in GitHub Desktop.
Save garyrussell/306710cf8bd3c6ae4e6b to your computer and use it in GitHub Desktop.
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.amqp.rabbit.test;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/**
 * Round-trip tests for RabbitMQ direct reply-to. Each request is published with its
 * {@code replyTo} property set to {@code amq.rabbitmq.reply-to}, and the requester
 * waits for the reply on an auto-ack consumer registered for that same name on the
 * channel it publishes from.
 * <p>
 * Both tests connect to a broker on {@code localhost}.
 *
 * @author Gary Russell
 * @since 1.5
 *
 */
public class DirectReplyTests {

    /** Name used both as the {@code replyTo} address and as the queue the requester consumes from. */
    private static final String DIRECT_REPLY_TO = "amq.rabbitmq.reply-to";

    /** Number of request/reply round trips each test performs. */
    private static final int ITERATIONS = 100000;

    /** Maximum time to wait for a single reply before the test fails. */
    private static final long REPLY_TIMEOUT_SECONDS = 10;

    /**
     * Exercises direct reply-to with the plain RabbitMQ client: one channel hosts a
     * consumer that answers every request, a second channel sends the requests and
     * receives the replies.
     *
     * @throws Exception if the broker cannot be reached or any AMQP operation fails.
     */
    @Test
    public void testRaw() throws Exception {
        ConnectionFactory cf = new ConnectionFactory();
        cf.setHost("localhost");
        Connection conn = cf.newConnection();
        try {
            // Responder side: queue name is whatever the no-arg queueDeclare() reports back.
            Channel channel1 = conn.createChannel();
            DeclareOk declareOk = channel1.queueDeclare();
            channel1.basicConsume(declareOk.getQueue(), true, new ReplyingConsumer(channel1));

            // Requester side: a separate channel on the same connection.
            Channel channel2 = conn.createChannel();
            BasicProperties props = new BasicProperties.Builder().replyTo(DIRECT_REPLY_TO).build();
            for (int i = 0; i < ITERATIONS; i++) {
                requestAndAwaitReply(channel2, declareOk.getQueue(), props);
            }
        }
        finally {
            // Closing the connection also disposes of both channels; previously nothing
            // was closed, so every run (and every failure) leaked a broker connection.
            if (conn.isOpen()) {
                conn.close();
            }
        }
    }

    /**
     * Exercises direct reply-to through Spring AMQP: a listener container answers the
     * requests and the requests are sent from channels handed out by
     * {@link RabbitTemplate#execute(ChannelCallback)}.
     *
     * @throws Exception if the broker cannot be reached or any AMQP operation fails.
     */
    @Test
    public void testWithSpring() throws Exception {
        CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
        try {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
            Channel channel = cf.createConnection().createChannel(false);
            final DeclareOk declareOk = channel.queueDeclare();
            channel.close();
            container.setQueueNames(declareOk.getQueue());
            // The test relies on the adapter sending the value returned by handleMessage
            // to the request's replyTo address; the latch below only trips if it does.
            MessageListenerAdapter mla = new MessageListenerAdapter(new Object() {

                @SuppressWarnings("unused")
                public byte[] handleMessage(byte[] in) {
                    return "reply".getBytes();
                }

            });
            container.setMessageListener(mla);
            container.afterPropertiesSet();
            container.start();
            try {
                RabbitTemplate template = new RabbitTemplate(cf);
                template.afterPropertiesSet();
                final BasicProperties props = new BasicProperties.Builder().replyTo(DIRECT_REPLY_TO).build();
                // The callback holds no per-iteration state, so one instance serves the whole loop.
                ChannelCallback<Void> roundTrip = new ChannelCallback<Void>() {

                    @Override
                    public Void doInRabbit(Channel channel) throws Exception {
                        requestAndAwaitReply(channel, declareOk.getQueue(), props);
                        return null;
                    }

                };
                for (int i = 0; i < ITERATIONS; i++) {
                    template.execute(roundTrip);
                }
            }
            finally {
                // Previously the container was left running after the test.
                container.stop();
            }
        }
        finally {
            // Releases the cached connection and channels even when the test fails.
            cf.destroy();
        }
    }

    /**
     * Performs one request/reply round trip on {@code channel}: registers an auto-ack
     * consumer on {@value #DIRECT_REPLY_TO}, publishes {@code "request"} to
     * {@code requestQueue} via the default exchange, waits up to
     * {@value #REPLY_TIMEOUT_SECONDS} seconds for a delivery, then cancels the consumer.
     *
     * @param channel the channel used for both the reply consumer and the publish.
     * @param requestQueue routing key (queue name) the request is published to.
     * @param props properties for the request; callers set {@code replyTo} on them.
     * @throws IOException if consuming, publishing or cancelling fails.
     * @throws InterruptedException if the thread is interrupted while waiting for the reply.
     */
    private static void requestAndAwaitReply(Channel channel, String requestQueue, BasicProperties props)
            throws IOException, InterruptedException {

        final CountDownLatch latch = new CountDownLatch(1);
        // The consumer must be in place before the request goes out so the reply has somewhere to land.
        String tag = channel.basicConsume(DIRECT_REPLY_TO, true, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                    byte[] body) throws IOException {
                latch.countDown();
            }

        });
        channel.basicPublish("", requestQueue, props, "request".getBytes());
        assertTrue("No reply received within " + REPLY_TIMEOUT_SECONDS + " seconds",
                latch.await(REPLY_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        channel.basicCancel(tag);
    }

    /**
     * Responder used by {@link #testRaw()}: for every delivery it publishes
     * {@code "reply" + n} to the delivery's {@code replyTo} address on its own channel,
     * where {@code n} counts the deliveries handled so far.
     */
    private static final class ReplyingConsumer extends DefaultConsumer {

        // NOTE(review): plain int assumes deliveries for one channel are dispatched one
        // at a time - confirm against the client's threading model.
        private int count;

        ReplyingConsumer(Channel channel) {
            super(channel);
        }

        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) {
            try {
                String replyTo = properties.getReplyTo();
                // The counter used to be incremented only by a commented-out debug
                // println, so every reply body was "reply0".
                getChannel().basicPublish("", replyTo, properties, ("reply" + ++this.count).getBytes());
            }
            catch (IOException e) {
                // Reported but not rethrown: the requester then sees no reply and its
                // timeout assertion fails the test.
                e.printStackTrace();
            }
        }

    }

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment