Skip to content

Instantly share code, notes, and snippets.

@garyrussell
Last active October 8, 2015 18:33
Show Gist options
  • Save garyrussell/a9aa5e45e58ba4b79168 to your computer and use it in GitHub Desktop.
Save garyrussell/a9aa5e45e58ba4b79168 to your computer and use it in GitHub Desktop.
AsyncRabbitTemplate
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.amqp.rabbit.core;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SettableListenableFuture;
/**
* @author Gary Russell
* @since 1.6
*
*/
public class AsyncRabbitTemplate<T> implements SmartLifecycle, MessageListener {
private final RabbitTemplate template;
private final SimpleMessageListenerContainer container;
private final String queue;
private final ConcurrentMap<String, RabbitFuture> pending = new ConcurrentHashMap<String, RabbitFuture>();
private volatile boolean running;
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String queue) {
this.template = new RabbitTemplate(connectionFactory);
this.template.setExchange(exchange);
this.template.setRoutingKey(routingKey);
this.container = new SimpleMessageListenerContainer(connectionFactory);
this.container.setQueueNames(queue);
this.container.setMessageListener(this);
this.container.afterPropertiesSet();
this.queue = queue;
}
public AsyncRabbitTemplate(RabbitTemplate template, SimpleMessageListenerContainer container) {
this.template = template;
this.container = container;
this.container.setMessageListener(this);
this.queue = container.getQueueNames()[0];
}
public ListenableFuture<T> convertSendAndReceive(Object data) {
final String correlationId = UUID.randomUUID().toString();
this.template.convertAndSend(data, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setCorrelationId(correlationId.getBytes());
message.getMessageProperties().setReplyTo(queue);
return message;
}
});
RabbitFuture future = new RabbitFuture(correlationId);
this.pending.put(correlationId, future);
return future;
}
@Override
public void start() {
if (!this.running) {
this.container.start();
}
this.running = true;
}
@Override
public void stop() {
if (this.running) {
this.container.stop();
}
this.running = false;
}
@Override
public boolean isRunning() {
return this.running;
}
@Override
public int getPhase() {
return 0;
}
@Override
public boolean isAutoStartup() {
return true;
}
@Override
public void stop(Runnable callback) {
stop();
callback.run();
}
@Override
public void onMessage(Message message) {
String correlationId = new String(message.getMessageProperties().getCorrelationId());
if (correlationId != null) {
RabbitFuture future = this.pending.remove(correlationId);
if (future != null) {
@SuppressWarnings("unchecked")
T converted = (T) this.template.getMessageConverter().fromMessage(message);
future.set(converted);
}
}
}
public class RabbitFuture extends SettableListenableFuture<T> {
private final String correlationId;
public RabbitFuture(String correlationId) {
this.correlationId = correlationId;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
pending.remove(this.correlationId);
return cancelled;
}
}
}
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.amqp.rabbit.core;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
* @author Gary Russell
* @since 1.6
*
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
public class AsyncRabbitTemplateTests {
@Autowired
private AsyncRabbitTemplate<String> template;
@Test
public void testOK() throws Exception {
ListenableFuture<String> future = this.template.convertSendAndReceive("foo");
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<String> resultRef = new AtomicReference<String>();
future.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
resultRef.set(result);
latch.countDown();
}
@Override
public void onFailure(Throwable ex) {
latch.countDown();
}
});
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals("FOO", resultRef.get());
}
@Test
public void testCancel() throws Exception {
ListenableFuture<String> future = this.template.convertSendAndReceive("foo");
future.cancel(false);
assertEquals(0, TestUtils.getPropertyValue(template, "pending", Map.class).size());
}
@Configuration
public static class Config {
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public Queue requests() {
return new AnonymousQueue();
}
@Bean
public Queue replies() {
return new AnonymousQueue();
}
@Bean
public RabbitAdmin admin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate template(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setRoutingKey(requests().getName());
return rabbitTemplate;
}
@Bean
@Primary
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(replies().getName());
return container;
}
@Bean
public AsyncRabbitTemplate<String> asyncTemplate(RabbitTemplate template,
SimpleMessageListenerContainer container) {
return new AsyncRabbitTemplate<String>(template, container);
}
@Bean
public SimpleMessageListenerContainer remoteContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(requests().getName());
container.setMessageListener(new MessageListenerAdapter(new Object() {
@SuppressWarnings("unused")
public String handleMessage(String message) {
return message.toUpperCase();
}
}));
return container;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment