Last active
October 8, 2015 18:33
-
-
Save garyrussell/a9aa5e45e58ba4b79168 to your computer and use it in GitHub Desktop.
AsyncRabbitTemplate
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2015 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.springframework.amqp.rabbit.core; | |
import java.util.UUID; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.concurrent.ConcurrentMap; | |
import org.springframework.amqp.AmqpException; | |
import org.springframework.amqp.core.Message; | |
import org.springframework.amqp.core.MessageListener; | |
import org.springframework.amqp.core.MessagePostProcessor; | |
import org.springframework.amqp.rabbit.connection.ConnectionFactory; | |
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; | |
import org.springframework.context.SmartLifecycle; | |
import org.springframework.util.concurrent.ListenableFuture; | |
import org.springframework.util.concurrent.SettableListenableFuture; | |
/** | |
* @author Gary Russell | |
* @since 1.6 | |
* | |
*/ | |
public class AsyncRabbitTemplate<T> implements SmartLifecycle, MessageListener { | |
private final RabbitTemplate template; | |
private final SimpleMessageListenerContainer container; | |
private final String queue; | |
private final ConcurrentMap<String, RabbitFuture> pending = new ConcurrentHashMap<String, RabbitFuture>(); | |
private volatile boolean running; | |
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey, String queue) { | |
this.template = new RabbitTemplate(connectionFactory); | |
this.template.setExchange(exchange); | |
this.template.setRoutingKey(routingKey); | |
this.container = new SimpleMessageListenerContainer(connectionFactory); | |
this.container.setQueueNames(queue); | |
this.container.setMessageListener(this); | |
this.container.afterPropertiesSet(); | |
this.queue = queue; | |
} | |
public AsyncRabbitTemplate(RabbitTemplate template, SimpleMessageListenerContainer container) { | |
this.template = template; | |
this.container = container; | |
this.container.setMessageListener(this); | |
this.queue = container.getQueueNames()[0]; | |
} | |
public ListenableFuture<T> convertSendAndReceive(Object data) { | |
final String correlationId = UUID.randomUUID().toString(); | |
this.template.convertAndSend(data, new MessagePostProcessor() { | |
@Override | |
public Message postProcessMessage(Message message) throws AmqpException { | |
message.getMessageProperties().setCorrelationId(correlationId.getBytes()); | |
message.getMessageProperties().setReplyTo(queue); | |
return message; | |
} | |
}); | |
RabbitFuture future = new RabbitFuture(correlationId); | |
this.pending.put(correlationId, future); | |
return future; | |
} | |
@Override | |
public void start() { | |
if (!this.running) { | |
this.container.start(); | |
} | |
this.running = true; | |
} | |
@Override | |
public void stop() { | |
if (this.running) { | |
this.container.stop(); | |
} | |
this.running = false; | |
} | |
@Override | |
public boolean isRunning() { | |
return this.running; | |
} | |
@Override | |
public int getPhase() { | |
return 0; | |
} | |
@Override | |
public boolean isAutoStartup() { | |
return true; | |
} | |
@Override | |
public void stop(Runnable callback) { | |
stop(); | |
callback.run(); | |
} | |
@Override | |
public void onMessage(Message message) { | |
String correlationId = new String(message.getMessageProperties().getCorrelationId()); | |
if (correlationId != null) { | |
RabbitFuture future = this.pending.remove(correlationId); | |
if (future != null) { | |
@SuppressWarnings("unchecked") | |
T converted = (T) this.template.getMessageConverter().fromMessage(message); | |
future.set(converted); | |
} | |
} | |
} | |
public class RabbitFuture extends SettableListenableFuture<T> { | |
private final String correlationId; | |
public RabbitFuture(String correlationId) { | |
this.correlationId = correlationId; | |
} | |
@Override | |
public boolean cancel(boolean mayInterruptIfRunning) { | |
boolean cancelled = super.cancel(mayInterruptIfRunning); | |
pending.remove(this.correlationId); | |
return cancelled; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2015 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.springframework.amqp.rabbit.core; | |
import static org.junit.Assert.assertEquals; | |
import static org.junit.Assert.assertTrue; | |
import java.util.Map; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicReference; | |
import org.junit.Test; | |
import org.junit.runner.RunWith; | |
import org.springframework.amqp.core.AnonymousQueue; | |
import org.springframework.amqp.core.Queue; | |
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; | |
import org.springframework.amqp.rabbit.connection.ConnectionFactory; | |
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; | |
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; | |
import org.springframework.amqp.utils.test.TestUtils; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.context.annotation.Primary; | |
import org.springframework.test.context.ContextConfiguration; | |
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; | |
import org.springframework.util.concurrent.ListenableFuture; | |
import org.springframework.util.concurrent.ListenableFutureCallback; | |
/** | |
* @author Gary Russell | |
* @since 1.6 | |
* | |
*/ | |
@ContextConfiguration | |
@RunWith(SpringJUnit4ClassRunner.class) | |
public class AsyncRabbitTemplateTests { | |
@Autowired | |
private AsyncRabbitTemplate<String> template; | |
@Test | |
public void testOK() throws Exception { | |
ListenableFuture<String> future = this.template.convertSendAndReceive("foo"); | |
final CountDownLatch latch = new CountDownLatch(1); | |
final AtomicReference<String> resultRef = new AtomicReference<String>(); | |
future.addCallback(new ListenableFutureCallback<String>() { | |
@Override | |
public void onSuccess(String result) { | |
resultRef.set(result); | |
latch.countDown(); | |
} | |
@Override | |
public void onFailure(Throwable ex) { | |
latch.countDown(); | |
} | |
}); | |
assertTrue(latch.await(10, TimeUnit.SECONDS)); | |
assertEquals("FOO", resultRef.get()); | |
} | |
@Test | |
public void testCancel() throws Exception { | |
ListenableFuture<String> future = this.template.convertSendAndReceive("foo"); | |
future.cancel(false); | |
assertEquals(0, TestUtils.getPropertyValue(template, "pending", Map.class).size()); | |
} | |
@Configuration | |
public static class Config { | |
@Bean | |
public ConnectionFactory connectionFactory() { | |
return new CachingConnectionFactory("localhost"); | |
} | |
@Bean | |
public Queue requests() { | |
return new AnonymousQueue(); | |
} | |
@Bean | |
public Queue replies() { | |
return new AnonymousQueue(); | |
} | |
@Bean | |
public RabbitAdmin admin(ConnectionFactory connectionFactory) { | |
return new RabbitAdmin(connectionFactory); | |
} | |
@Bean | |
public RabbitTemplate template(ConnectionFactory connectionFactory) { | |
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); | |
rabbitTemplate.setRoutingKey(requests().getName()); | |
return rabbitTemplate; | |
} | |
@Bean | |
@Primary | |
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) { | |
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); | |
container.setQueueNames(replies().getName()); | |
return container; | |
} | |
@Bean | |
public AsyncRabbitTemplate<String> asyncTemplate(RabbitTemplate template, | |
SimpleMessageListenerContainer container) { | |
return new AsyncRabbitTemplate<String>(template, container); | |
} | |
@Bean | |
public SimpleMessageListenerContainer remoteContainer(ConnectionFactory connectionFactory) { | |
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); | |
container.setQueueNames(requests().getName()); | |
container.setMessageListener(new MessageListenerAdapter(new Object() { | |
@SuppressWarnings("unused") | |
public String handleMessage(String message) { | |
return message.toUpperCase(); | |
} | |
})); | |
return container; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment