Skip to content

Instantly share code, notes, and snippets.

@garyrussell
Created June 8, 2017 17:22
Show Gist options
  • Save garyrussell/7af1dc149ffbf64b5c245e019dd4faf1 to your computer and use it in GitHub Desktop.
Save garyrussell/7af1dc149ffbf64b5c245e019dd4faf1 to your computer and use it in GitHub Desktop.
Spring JMS RetryingMessageListener

Demonstrates how to use spring-retry with state to retry message delivery and publish failures to a DLQ.

/*
* Copyright 2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.example;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.JmsException;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.policy.SoftReferenceMapRetryContextCache;
import org.springframework.retry.support.DefaultRetryState;
import org.springframework.retry.support.RetryTemplate;
/**
 * Demo application showing how to wrap a JMS {@link MessageListener} with
 * stateful spring-retry and publish exhausted deliveries to a dead-letter queue.
 *
 * <p>On startup it sends two messages to the {@code foo} destination, one of
 * them carrying a {@code fail} property. The listener container delivers them
 * through a {@link RetryingListener}; the delegate throws for the flagged
 * message, and once the retry policy is exhausted a copy is sent to
 * {@code foo.dlq}.
 *
 * @author Gary Russell
 */
@SpringBootApplication
public class JmsRetryApplication implements CommandLineRunner {

    private static final Logger logger = LoggerFactory.getLogger(JmsRetryApplication.class);

    /**
     * Boots the application, lets the {@link CommandLineRunner} finish, then
     * closes the context.
     *
     * @param args command line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(JmsRetryApplication.class, args);
        context.close();
    }

    @Autowired
    private JmsTemplate template;

    /**
     * Counted down once per delegate invocation. Four invocations are expected:
     * one for the good message plus three attempts (the retry policy's maximum)
     * for the failing one.
     */
    private final CountDownLatch latch = new CountDownLatch(4);

    /**
     * Sends one normal message and one message flagged to fail, then waits for
     * the listener to process them.
     *
     * @param args ignored
     * @throws Exception if sending fails or the waiting thread is interrupted
     */
    @Override
    public void run(String... args) throws Exception {
        this.template.convertAndSend("foo", "foobar");
        this.template.convertAndSend("foo", "foobarToFail", m -> {
            // Marker property the listener checks to simulate a processing failure.
            m.setStringProperty("fail", "yes");
            return m;
        });
        if (!this.latch.await(10, TimeUnit.SECONDS)) {
            // Do not fail the demo, but make the timeout visible instead of ignoring it.
            logger.warn("Timed out waiting for deliveries; {} still outstanding", this.latch.getCount());
        }
        // Extra grace period before the context is closed; presumably gives the
        // recovery path time to publish to the DLQ.
        Thread.sleep(5000);
    }

    /**
     * The business listener: logs the message and throws a
     * {@link RuntimeException} when the {@code fail} property is present.
     * The latch is counted down on every invocation, successful or not.
     *
     * @return the delegate listener that {@link RetryingListener} wraps
     */
    @Bean
    public MessageListener myListener() {
        return m -> {
            logger.info("received: {}", m);
            try {
                if (m.getStringProperty("fail") != null) {
                    throw new RuntimeException("failed");
                }
            }
            catch (JmsException | JMSException e) {
                // Reading the property failed; treat the message as processed but
                // record the problem through the logger rather than stderr.
                logger.error("Could not read the 'fail' property", e);
            }
            finally {
                this.latch.countDown();
            }
        };
    }

    /**
     * Listener container for the {@code foo} destination. The session is
     * transacted and the listener is the retrying wrapper around
     * {@link #myListener()}.
     *
     * @param connectionFactory the JMS connection factory to consume with
     * @param template template the wrapper uses to publish to the DLQ
     * @return the configured container
     */
    @Bean
    public DefaultMessageListenerContainer container(ConnectionFactory connectionFactory, JmsTemplate template) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setDestinationName("foo");
        container.setSessionTransacted(true);
        container.setMessageListener(new RetryingListener(myListener(), template));
        return container;
    }

    /**
     * {@link MessageListener} decorator that runs a delegate inside a
     * {@link RetryTemplate} (at most three attempts) and, when retries are
     * exhausted, sends a copy of the message to {@code foo.dlq}.
     *
     * <p>NOTE: the recovery path casts the message to {@link TextMessage}, so
     * only text messages are supported.
     */
    public static class RetryingListener implements MessageListener {

        private static final Logger logger = LoggerFactory.getLogger(RetryingListener.class);

        private final RetryTemplate retryTemplate = new RetryTemplate();

        private final MessageListener delegate;

        private final JmsTemplate jmsTemplate;

        /**
         * @param delegate the listener whose invocation is retried
         * @param template template used to publish exhausted messages to the DLQ
         */
        public RetryingListener(MessageListener delegate, JmsTemplate template) {
            this.delegate = delegate;
            SimpleRetryPolicy policy = new SimpleRetryPolicy();
            policy.setMaxAttempts(3);
            this.retryTemplate.setRetryPolicy(policy);
            this.retryTemplate.setRetryContextCache(new SoftReferenceMapRetryContextCache());
            this.jmsTemplate = template;
        }

        /**
         * Delivers the message to the delegate under retry. The JMS message id
         * is used as the retry-state key; if no id is available the call falls
         * back to stateless retry, because a {@code null} key cannot identify
         * the message across redeliveries.
         *
         * @param message the delivered message
         */
        @Override
        public void onMessage(Message message) {
            String jmsMessageID = null;
            try {
                jmsMessageID = message.getJMSMessageID();
            }
            catch (JMSException e) {
                logger.warn("Could not read the JMS message id", e);
            }

            RetryCallback<Void, RuntimeException> attempt = context -> {
                // Stash the message so the recovery callback can read it from the context.
                context.setAttribute("message", message);
                this.delegate.onMessage(message);
                return null;
            };

            RecoveryCallback<Void> recovery = context -> {
                final TextMessage failed = (TextMessage) context.getAttribute("message");
                this.jmsTemplate.send("foo.dlq", session -> {
                    Message dead = session.createTextMessage(failed.getText());
                    // copy other headers from original as needed
                    dead.setStringProperty("exception", context.getLastThrowable().getMessage());
                    logger.info("sent to dlq:{}", dead);
                    return dead;
                });
                return null;
            };

            if (jmsMessageID == null) {
                // Without an id there is nothing to key the retry state on.
                logger.warn("Message has no JMS message id; using stateless retry");
                this.retryTemplate.execute(attempt, recovery);
            }
            else {
                this.retryTemplate.execute(attempt, recovery, new DefaultRetryState(jmsMessageID, false, null));
            }
        }

    }

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment