Skip to content

Instantly share code, notes, and snippets.

@garyrussell
Last active June 28, 2021 01:37
Show Gist options
  • Save garyrussell/6594677 to your computer and use it in GitHub Desktop.
Save garyrussell/6594677 to your computer and use it in GitHub Desktop.
Demonstration of Synchronous Transactions with Spring-AMQP
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package foo;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
 * Java configuration for the synchronous-transaction demonstration.
 *
 * <p>Wires a single caching connection factory into both a transacted
 * {@link RabbitTemplate} and a {@link RabbitTransactionManager}, and exposes
 * the {@link Service} implementation under test.
 *
 * @author Gary Russell
 *
 */
@Configuration
@EnableTransactionManagement
public class Config {

    /**
     * Connection factory shared by the template and the transaction manager.
     * Created with the no-arg constructor, i.e. whatever broker defaults
     * {@link CachingConnectionFactory} applies.
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory();
    }

    /**
     * Transaction manager built on the same connection factory that the
     * template uses.
     */
    @Bean
    public RabbitTransactionManager transactionManager() {
        return new RabbitTransactionManager(connectionFactory());
    }

    /**
     * Template configured with queue {@code txTestQ1} and routing key
     * {@code txTestQ2}, with the mandatory flag set and a transacted channel.
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        // Transactional / delivery flags.
        template.setChannelTransacted(true);
        template.setMandatory(true);
        // Source queue and destination routing key used by the no-arg
        // receive/send calls in the service implementations.
        template.setQueue("txTestQ1");
        template.setRoutingKey("txTestQ2");
        return template;
    }

    /**
     * The service under test. Swap in {@code new ServiceImpl1()} to exercise
     * the unchecked-exception variant instead.
     */
    @Bean
    public Service service() {
        return new ServiceImpl2();
    }
}
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package foo;
/**
 * Contract for the demo's unit of transactional work.
 *
 * <p>The implementations in this demo ({@code ServiceImpl1},
 * {@code ServiceImpl2}) receive one message through a {@code RabbitTemplate}
 * and, if one was received, print it and send it on again.
 *
 * @author Gary Russell
 *
 */
public interface Service {
/**
 * Performs one receive-and-forward step.
 *
 * @param crash when {@code true}, the implementations in this demo throw
 * after the receive and before the send
 * @throws Exception if the implementation fails; {@code ServiceImpl2} throws
 * a checked exception when {@code crash} is {@code true}
 */
void process(boolean crash) throws Exception;
}
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package foo;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
/**
* @author Gary Russell
*
*/
public class ServiceImpl1 implements Service {
@Autowired
private RabbitTemplate template;
@Transactional
public void process(boolean crash) {
Object o = template.receiveAndConvert();
if (crash) {
throw new RuntimeException("crash");
}
if (o != null) {
System.out.println(o);
template.convertAndSend(o);
}
}
}
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package foo;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.annotation.Transactional;
/**
 * {@link Service} variant that signals failure with a checked exception.
 *
 * <p>{@link #process(boolean)} declares {@code rollbackFor=Exception.class}
 * so that the checked {@code FooEx} thrown on a crash also triggers a
 * rollback.
 *
 * @author Gary Russell
 *
 */
public class ServiceImpl2 implements Service {

    @Autowired
    private RabbitTemplate template;

    /**
     * Receives one message from the template, then either fails or forwards it.
     *
     * @param crash when {@code true}, throws after the receive so that the
     * surrounding transaction is completed with an exception
     * @throws Exception a {@code FooEx} with message {@code "crash"} when
     * {@code crash} is {@code true}
     */
    @Override
    @Transactional(rollbackFor=Exception.class)
    public void process(boolean crash) throws Exception {
        Object o = template.receiveAndConvert();
        if (crash) {
            // Thrown after the receive, before any send.
            throw new FooEx("crash");
        }
        if (o != null) {
            System.out.println(o);
            template.convertAndSend(o);
        }
    }

    /**
     * Checked exception used to simulate a failure.
     *
     * <p>Declared {@code static} so instances do not carry a hidden reference
     * to the enclosing service.
     */
    @SuppressWarnings("serial")
    private static final class FooEx extends Exception {

        private FooEx(String message) {
            super(message);
        }
    }
}
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package foo;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import org.junit.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;
/**
 * Integration test for the transactional receive-and-forward service.
 *
 * <p>Sends messages to {@code txTestQ1} and checks where they end up after
 * {@link Service#process(boolean)} completes normally and after it throws.
 * NOTE(review): presumably needs a reachable broker on which
 * {@code txTestQ1} and {@code txTestQ2} already exist and are empty; nothing
 * here declares them - confirm before running.
 *
 * @author Gary Russell
 *
 */
public class TxTest {

    /**
     * Runs the commit scenario followed by the rollback scenario.
     *
     * @throws Exception if the non-crashing {@code process} call fails;
     * {@link Service#process(boolean)} declares a checked exception
     */
    @Test
    public void test() throws Exception {
        final AbstractApplicationContext context =
                new AnnotationConfigApplicationContext(Config.class);
        try {
            RabbitTemplate template = context.getBean(RabbitTemplate.class);
            Service service = context.getBean(Service.class);

            // Commit path: the message must leave Q1 and arrive on Q2.
            template.convertAndSend("", "txTestQ1", "foo");
            service.process(false);
            Object o = template.receiveAndConvert("txTestQ1");
            assertNull(o);
            o = template.receiveAndConvert("txTestQ2");
            assertNotNull(o);
            System.out.println("message " + o + " moved from Q1 to Q2");

            // Rollback path: the service must throw and the message must stay on Q1.
            template.convertAndSend("", "txTestQ1", "bar");
            Exception failure = null;
            try {
                service.process(true);
            }
            catch (Exception e) {
                failure = e;
                System.out.println(e.getMessage());
            }
            assertNotNull("process(true) should have thrown", failure);
            o = template.receiveAndConvert("txTestQ1");
            assertNotNull(o);
            System.out.println("message " + o + " still in Q1");
            o = template.receiveAndConvert("txTestQ2");
            assertNull(o);
        }
        finally {
            // Close the context even when an assertion fails.
            context.close();
        }
    }
}
09:55:17.979 DEBUG [main][org.springframework.amqp.rabbit.transaction.RabbitTransactionManager] Creating new transaction with name [foo.Service.process]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; ''
09:55:17.979 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Found cached Rabbit Channel
09:55:17.979 DEBUG [main][org.springframework.amqp.rabbit.transaction.RabbitTransactionManager] Created AMQP transaction on channel [Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)]
09:55:17.980 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Bound value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] to thread [main]
09:55:17.980 TRACE [main][org.springframework.transaction.interceptor.TransactionInterceptor] Getting transaction for [foo.Service.process]
09:55:17.994 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main]
09:55:17.994 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
09:55:17.997 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main]
09:55:17.997 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main]
foo
09:55:17.998 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main]
09:55:17.998 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
09:55:17.998 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Publishing message on exchange [], routingKey = [txTestQ2]
09:55:17.999 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main]
09:55:17.999 TRACE [main][org.springframework.transaction.interceptor.TransactionInterceptor] Completing transaction for [foo.Service.process]
09:55:17.999 DEBUG [main][org.springframework.amqp.rabbit.transaction.RabbitTransactionManager] Initiating transaction commit
09:55:18.123 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Removed value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@fa7f9dc] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] from thread [main]
09:55:18.123 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Returning cached Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
09:55:18.123 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Found cached Rabbit Channel
09:55:18.124 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
09:55:18.124 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Returning cached Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
09:55:18.125 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Found cached Rabbit Channel
09:55:18.125 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
09:55:18.126 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Returning cached Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
message foo moved from Q1 to Q2
09:55:18.126 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Found cached Rabbit Channel
09:55:18.126 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
09:55:18.127 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Publishing message on exchange [], routingKey = [txTestQ1]
09:55:18.242 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Returning cached Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
09:55:18.242 DEBUG [main][org.springframework.beans.factory.support.DefaultListableBeanFactory] Returning cached instance of singleton bean 'transactionManager'
09:55:18.243 DEBUG [main][org.springframework.amqp.rabbit.transaction.RabbitTransactionManager] Creating new transaction with name [foo.Service.process]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; ''
09:55:18.243 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Found cached Rabbit Channel
09:55:18.243 DEBUG [main][org.springframework.amqp.rabbit.transaction.RabbitTransactionManager] Created AMQP transaction on channel [Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)]
09:55:18.243 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Bound value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@38ffd135] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] to thread [main]
09:55:18.243 TRACE [main][org.springframework.transaction.interceptor.TransactionInterceptor] Getting transaction for [foo.Service.process]
09:55:18.243 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@38ffd135] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main]
09:55:18.243 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
09:55:18.244 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@38ffd135] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main]
09:55:18.244 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Retrieved value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@38ffd135] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] bound to thread [main]
09:55:18.244 TRACE [main][org.springframework.transaction.interceptor.TransactionInterceptor] Completing transaction for [foo.Service.process] after exception: java.lang.RuntimeException: crash
09:55:18.244 TRACE [main][org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] Applying rules to determine whether transaction should rollback on java.lang.RuntimeException: crash
09:55:18.245 TRACE [main][org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] Winning rollback rule is: null
09:55:18.245 TRACE [main][org.springframework.transaction.interceptor.RuleBasedTransactionAttribute] No relevant rollback rule found: applying default rules
09:55:18.245 DEBUG [main][org.springframework.amqp.rabbit.transaction.RabbitTransactionManager] Initiating transaction rollback
09:55:18.245 DEBUG [main][org.springframework.amqp.rabbit.connection.RabbitResourceHolder] Rolling back messages to channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
09:55:18.247 TRACE [main][org.springframework.transaction.support.TransactionSynchronizationManager] Removed value [org.springframework.amqp.rabbit.connection.RabbitResourceHolder@38ffd135] for key [CachingConnectionFactory [channelCacheSize=1, host=arwen3, port=5672, active=true]] from thread [main]
09:55:18.247 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Returning cached Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
crash
09:55:18.247 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Found cached Rabbit Channel
09:55:18.247 DEBUG [main][org.springframework.amqp.rabbit.core.RabbitTemplate] Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
09:55:18.248 TRACE [main][org.springframework.amqp.rabbit.connection.CachingConnectionFactory] Returning cached Channel: AMQChannel(amqp://guest@127.0.1.1:5672/,1)
message bar still in Q1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment