Skip to content

Instantly share code, notes, and snippets.

@hungbang
Forked from tyb/atomikos_jms.java
Created February 28, 2022 07:15
Show Gist options
  • Save hungbang/8e7cec87f3dbd56d3cb25db874bc48e8 to your computer and use it in GitHub Desktop.
Save hungbang/8e7cec87f3dbd56d3cb25db874bc48e8 to your computer and use it in GitHub Desktop.
Spring Boot JTA Atomikos & JMS configuration
package com.xxx.common.config;
import com.atomikos.icatch.config.UserTransactionServiceImp;
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import net.sf.ehcache.transaction.manager.TransactionManagerLookup;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.env.Environment;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
import java.util.Properties;
/**
 * Spring configuration that starts the Atomikos transaction service and exposes the JTA
 * {@link UserTransaction} and {@link TransactionManager} beans that sit on top of it.
 *
 * <p>The transaction service itself is owned by the {@code userTransactionServiceImp} bean;
 * the other two beans declare a dependency on it so that it is initialised first.
 */
@Configuration
@DependsOn("initializingBeanImpl")
public class AtomikosJtaConfiguration {

    @Autowired
    private Environment environment;

    /** Value of {@code spring.application.name}; used as the Atomikos log base name. */
    @Value("${spring.application.name}")
    private String applicationName;

    /**
     * Sets {@code hibernate.transaction.manager_lookup_class} on the supplied properties.
     *
     * <p>NOTE(review): the class name written here is Ehcache's
     * {@code net.sf.ehcache.transaction.manager.TransactionManagerLookup}, not a Hibernate
     * type. Confirm this is really the value Hibernate is expected to receive.
     *
     * @param properties the properties object to modify in place
     */
    public void tailorProperties(Properties properties) {
        properties.setProperty("hibernate.transaction.manager_lookup_class",
                TransactionManagerLookup.class.getName());
    }

    /**
     * Builds the Atomikos transaction service from a fixed set of properties.
     * Spring calls {@code init} after construction and {@code shutdownForce} on destroy.
     *
     * @return the configured, not yet initialised, transaction service
     */
    @Bean(name = "userTransactionServiceImp", initMethod = "init", destroyMethod = "shutdownForce")
    public UserTransactionServiceImp userTransactionServiceImp() {
        Properties atomikosProperties = new Properties();
        atomikosProperties.setProperty("com.atomikos.icatch.max_timeout", "3600000");
        atomikosProperties.setProperty("com.atomikos.icatch.service",
                "com.atomikos.icatch.standalone.UserTransactionServiceFactory");
        atomikosProperties.setProperty("com.atomikos.icatch.log_base_name", applicationName);
        // Both console output and transaction logs go to the same relative directory.
        atomikosProperties.setProperty("com.atomikos.icatch.output_dir", "../standalone/log/");
        atomikosProperties.setProperty("com.atomikos.icatch.log_base_dir", "../standalone/log/");
        return new UserTransactionServiceImp(atomikosProperties);
    }

    /**
     * Creates the JTA user transaction with a transaction timeout of 1000.
     *
     * @return the Atomikos {@link UserTransaction} implementation
     * @throws Throwable if the timeout cannot be applied
     */
    @Bean(name = "userTransaction")
    @DependsOn("userTransactionServiceImp")
    public UserTransaction userTransaction() throws Throwable {
        UserTransactionImp userTransactionImp = new UserTransactionImp();
        userTransactionImp.setTransactionTimeout(1000);
        return userTransactionImp;
    }

    /**
     * Creates the JTA transaction manager. Spring calls {@code init} after construction and
     * {@code close} on destroy.
     *
     * @return the Atomikos {@link TransactionManager} implementation
     * @throws Throwable declared for compatibility with existing callers
     */
    @Bean(initMethod = "init", destroyMethod = "close", name = "transactionManager")
    @DependsOn("userTransactionServiceImp")
    public TransactionManager transactionManager() throws Throwable {
        UserTransactionManager userTransactionManager = new UserTransactionManager();
        // The userTransactionServiceImp bean starts and stops the transaction service, so the
        // manager must not try to start a second one on its own.
        userTransactionManager.setStartupTransactionService(false);
        userTransactionManager.setForceShutdown(false);
        return userTransactionManager;
    }
}
--
package com.xxx.common.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.transaction.TransactionManager;
import javax.transaction.UserTransaction;
/**
 * Exposes a Spring {@link PlatformTransactionManager} that delegates to the JTA objects
 * provided by {@link AtomikosJtaConfiguration}.
 */
@Configuration
public class TxConfig {

    @Autowired
    private AtomikosJtaConfiguration jtaConfiguration;

    /**
     * Builds the primary platform transaction manager from the JTA user transaction and
     * transaction manager obtained from the injected JTA configuration.
     *
     * <p>NOTE(review): presumably the two calls below resolve to the container-managed
     * beans because the target is a {@code @Configuration} class. Confirm.
     *
     * @return a {@link JtaTransactionManager} wrapping both JTA objects
     * @throws Throwable propagated from the JTA configuration's factory methods
     */
    @Primary
    @Bean(name = "platformTransactionManager")
    @DependsOn("transactionManager")
    public PlatformTransactionManager platformTransactionManager() throws Throwable {
        return new JtaTransactionManager(
                jtaConfiguration.userTransaction(),
                jtaConfiguration.transactionManager());
    }
}
---
package com.xxx.common.config;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.*;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListenerConfigurer;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerEndpointRegistrar;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.destination.DestinationResolver;
import org.springframework.jms.support.destination.DynamicDestinationResolver;
//import org.springframework.jms.support.destination.JndiDestinationResolver;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.util.ErrorHandler;
import javax.inject.Inject;
import javax.jms.*;
@Configuration
@DependsOn("platformTransactionManager")
@EnableJms
@EnableTransactionManagement
@EnableAutoConfiguration
@PropertySource(ignoreResourceNotFound = true, value = "classpath:common.properties")
public class JMSConfig implements JmsListenerConfigurer {
@Value("${activemq.broker-url}")
String brokerURL;
@Value("${activemq.user}")
String userName;
@Value("${activemq.password}")
String password;
@Value("${trans.alert.topic}")
private String transactionAlertTopicName;
@Value("${trans.alert.email.queue}")
private String emailNotificationQueueName;
@Value("${trans.alert.sms.queue}")
private String smsNotificationQueueName;
/**
 * Exposes the transaction-alert topic as a bean.
 *
 * @return an ActiveMQ topic whose name comes from the {@code trans.alert.topic} property
 */
@Bean
public Topic transactionAlertTopic() {
    final String topicName = transactionAlertTopicName;
    return new ActiveMQTopic(topicName);
}
/**
 * Exposes the e-mail notification queue as a bean.
 *
 * @return an ActiveMQ queue whose name comes from the {@code trans.alert.email.queue} property
 */
@Bean
public Queue emailNotificationQueue() {
    final String queueName = emailNotificationQueueName;
    return new ActiveMQQueue(queueName);
}
/**
 * Exposes the SMS notification queue as a bean.
 *
 * @return an ActiveMQ queue whose name comes from the {@code trans.alert.sms.queue} property
 */
@Bean
public Queue smsNotificationQueue() {
    final String queueName = smsNotificationQueueName;
    return new ActiveMQQueue(queueName);
}
@Inject
private AtomikosJtaConfiguration jtaConfiguration;
/**
 * Creates the plain (non-XA) ActiveMQ connection factory used for queues.
 *
 * <p>NOTE(review): {@code setTrustAllPackages(true)} appears to lift ActiveMQ's package
 * restriction for object messages. Confirm this is needed, since the converter in this
 * configuration sends text messages.
 *
 * @return a connection factory configured with the broker URL and credentials
 */
@Bean("queueConnectionFactory")
public QueueConnectionFactory queueConnectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setTrustAllPackages(true);
    // Broker URL is applied before the credentials, as in the original ordering.
    factory.setBrokerURL(brokerURL);
    factory.setUserName(userName);
    factory.setPassword(password);
    return factory;
}
// The topic and queue factories are kept separate because the pub/sub domain has to be specified for each.
/**
 * Creates the XA-capable connection factory for topics: an ActiveMQ XA factory wrapped in
 * an Atomikos connection factory bean registered under the resource name {@code xaTopic}.
 * Spring calls {@code init} after construction and {@code close} on destroy.
 *
 * @return the Atomikos-managed connection factory
 */
@Bean(initMethod = "init", destroyMethod = "close", name = "xaTopicConnectionFactory")
public ConnectionFactory xaTopicConnectionFactory() {
    ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory();
    xaFactory.setBrokerURL(brokerURL);
    xaFactory.setUserName(userName);
    xaFactory.setPassword(password);

    AtomikosConnectionFactoryBean atomikosFactory = new AtomikosConnectionFactoryBean();
    atomikosFactory.setUniqueResourceName("xaTopic");
    atomikosFactory.setLocalTransactionMode(false);
    atomikosFactory.setXaConnectionFactory(xaFactory);
    return atomikosFactory;
}
/**
 * Creates the XA-capable connection factory for queues: an ActiveMQ XA factory wrapped in
 * an Atomikos connection factory bean registered under the resource name {@code xaQueue}.
 * Spring calls {@code init} after construction and {@code close} on destroy.
 *
 * @return the Atomikos-managed connection factory
 */
@Bean(initMethod = "init", destroyMethod = "close", name = "xaQueueConnectionFactory")
public ConnectionFactory xaQueueConnectionFactory() {
    ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory();
    xaFactory.setBrokerURL(brokerURL);
    xaFactory.setUserName(userName);
    xaFactory.setPassword(password);

    AtomikosConnectionFactoryBean atomikosFactory = new AtomikosConnectionFactoryBean();
    atomikosFactory.setUniqueResourceName("xaQueue");
    atomikosFactory.setLocalTransactionMode(false);
    atomikosFactory.setXaConnectionFactory(xaFactory);
    return atomikosFactory;
}
/**
 * Creates the plain (non-XA) ActiveMQ connection factory used for topics and marks it as
 * the primary connection factory, so injection points without a qualifier resolve to it.
 *
 * <p>NOTE(review): {@code setTrustAllPackages(true)} appears to lift ActiveMQ's package
 * restriction for object messages. Confirm this is needed.
 *
 * @return a connection factory with async send enabled
 */
@Primary
@Bean("topicConnectionFactory")
public TopicConnectionFactory topicConnectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL(brokerURL);
    // TODO: the credentials are read from the properties in "common" rather than from each
    // service's own application.properties.
    factory.setUserName(userName);
    factory.setPassword(password);
    factory.setTrustAllPackages(true);
    factory.setUseAsyncSend(true);
    return factory;
}
// PooledConnectionFactory is vendor-specific, whereas CachingConnectionFactory belongs to Spring.
// JBoss uses ActiveMQManagedConnectionFactory, which is container-specific.
/*
Without connection pooling, JmsTemplate by default creates a new connection, session and
producer for each message sent and then closes them all again.
This results in a long turnaround time, because you reconnect for every JMS message sent.
*/
/**
 * Wraps the queue connection factory in a Spring {@link CachingConnectionFactory} with a
 * session cache size of 10.
 *
 * @return the caching wrapper around {@code queueConnectionFactory()}
 */
@Bean(name = "cachingQueueContainerFactory")
public CachingConnectionFactory cachingQueueConnectionFactory() {
    CachingConnectionFactory cachingFactory = new CachingConnectionFactory(queueConnectionFactory());
    cachingFactory.setSessionCacheSize(10);
    return cachingFactory;
}
/**
 * Wraps the topic connection factory in a Spring {@link CachingConnectionFactory} with a
 * session cache size of 10.
 *
 * @return the caching wrapper around {@code topicConnectionFactory()}
 */
@Bean(name = "cachingTopicContainerFactory")
public CachingConnectionFactory cachingTopicConnectionFactory() {
    CachingConnectionFactory cachingFactory = new CachingConnectionFactory(topicConnectionFactory());
    cachingFactory.setSessionCacheSize(10);
    return cachingFactory;
}
//TODO: for the listener/consumer side. Could be moved into a separate config class.
/**
 * Builds the listener container factory used for queue listeners.
 *
 * <p>The factory is first populated by the supplied configurer and then adjusted: queue
 * domain, JSON message converter, auto-startup, dynamic destination resolution and a
 * concurrency of {@code "1"}.
 *
 * @param queueConnectionFactory the connection factory bean named {@code queueConnectionFactory}
 * @param configurer             the configurer used to apply its settings to the factory
 * @return the configured listener container factory
 */
@Bean(name = "queueContainerFactory")
public DefaultJmsListenerContainerFactory queueJmsListenerContainerFactory(
        @Qualifier("queueConnectionFactory") ConnectionFactory queueConnectionFactory,
        DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory queueListenerFactory = new DefaultJmsListenerContainerFactory();
    configurer.configure(queueListenerFactory, queueConnectionFactory);
    queueListenerFactory.setConnectionFactory(queueConnectionFactory);
    queueListenerFactory.setPubSubDomain(false);
    queueListenerFactory.setMessageConverter(jacksonJmsMessageConverter());
    queueListenerFactory.setAutoStartup(true);
    queueListenerFactory.setDestinationResolver(destinationResolver());
    queueListenerFactory.setConcurrency("1");
    // Report listener failures through a logger, with the throwable attached, instead of
    // writing to System.err and calling printStackTrace().
    queueListenerFactory.setErrorHandler(t ->
            java.util.logging.Logger.getLogger(JMSConfig.class.getName()).log(
                    java.util.logging.Level.SEVERE,
                    "An error has occurred in the transaction",
                    t));
    return queueListenerFactory;
}
//TODO: for the listener/consumer side. Could be moved into a separate config class.
/**
 * Builds the listener container factory used for topic listeners.
 *
 * <p>The factory uses the caching topic connection factory, the pub/sub domain, the JSON
 * message converter, dynamic destination resolution, a concurrency of {@code "1"} and a
 * receive timeout of 10000.
 *
 * @return the configured listener container factory
 */
@Bean(name = "topicContainerFactory")
public DefaultJmsListenerContainerFactory topicJmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory topicListenerFactory = new DefaultJmsListenerContainerFactory();
    topicListenerFactory.setConnectionFactory(cachingTopicConnectionFactory());
    // Resolves destination names given as plain strings into Destination instances.
    topicListenerFactory.setDestinationResolver(destinationResolver());
    topicListenerFactory.setPubSubDomain(true);
    topicListenerFactory.setMessageConverter(jacksonJmsMessageConverter());
    topicListenerFactory.setAutoStartup(true);
    topicListenerFactory.setConcurrency("1");
    topicListenerFactory.setReceiveTimeout(10000L);
    // Report listener failures through a logger, with the throwable attached, instead of
    // writing to System.err and calling printStackTrace().
    topicListenerFactory.setErrorHandler(t ->
            java.util.logging.Logger.getLogger(JMSConfig.class.getName()).log(
                    java.util.logging.Level.SEVERE,
                    "An error has occurred in the transaction",
                    t));
    return topicListenerFactory;
}
/**
 * Registers the topic listener container factory as the registrar's container factory.
 *
 * @param registrar the endpoint registrar to configure
 */
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
    DefaultJmsListenerContainerFactory defaultFactory = topicJmsListenerContainerFactory();
    registrar.setContainerFactory(defaultFactory);
}
//TODO: for the publisher/producer side. Could be moved into a separate config class.
/**
 * Builds the {@link JmsTemplate} used to publish to topics.
 *
 * <p>The template is put into the pub/sub domain so that destination names resolved
 * through the destination resolver become topics, matching the topic listener container
 * factory in this configuration. Sends that pass an explicit destination object are not
 * affected by this flag.
 *
 * @param topicConnectionFactory the connection factory bean named {@code topicConnectionFactory};
 *                               not used directly, because the template is wired to the
 *                               caching wrapper instead
 * @return the configured template
 */
@Bean(name = "topicJmsTemplate")
public JmsTemplate topicJmsTemplate(@Qualifier("topicConnectionFactory") ConnectionFactory topicConnectionFactory) {
    JmsTemplate topicJmsTemplate = new JmsTemplate();
    topicJmsTemplate.setConnectionFactory(cachingTopicConnectionFactory());
    // Without this, string destination names would be resolved as queues.
    topicJmsTemplate.setPubSubDomain(true);
    topicJmsTemplate.setReceiveTimeout(10000);
    topicJmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
    topicJmsTemplate.setDestinationResolver(destinationResolver());
    return topicJmsTemplate;
}
/**
 * Provides the destination resolver shared by the listener factories and the template.
 *
 * @return a new {@link DynamicDestinationResolver}
 */
@Bean
public DestinationResolver destinationResolver() {
    DestinationResolver resolver = new DynamicDestinationResolver();
    return resolver;
}
/**
 * Provides the message converter that serializes message content to JSON carried in a
 * text message.
 *
 * <p>An earlier variant based on {@code MappingJackson2MessageConverter} (text target
 * type, {@code _type} type-id property, shared object mapper) was replaced by the
 * hand-written {@code JsonMessageConverter}.
 *
 * @return a new {@code JsonMessageConverter}
 */
@Bean
public MessageConverter jacksonJmsMessageConverter() {
    return new JsonMessageConverter();
}
/**
 * Provides the Jackson object mapper: the Java time module is registered and dates are
 * not written as timestamps.
 *
 * @return the configured {@link ObjectMapper}
 */
@Bean
public ObjectMapper objectMapper() {
    return new ObjectMapper()
            .registerModule(new JavaTimeModule())
            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
/**
 * Message converter that writes objects as JSON text messages and hands the raw JSON
 * string back on receipt.
 *
 * <p>NOTE(review): instances are created by {@code jacksonJmsMessageConverter()}; the
 * {@code mapper} field is presumably injected by the container because that method is a
 * bean factory method. Confirm.
 */
@Component
private class JsonMessageConverter implements MessageConverter {

    @Autowired
    private ObjectMapper mapper;

    JsonMessageConverter() {}

    /**
     * Converts message to JSON. Used mostly by {@link org.springframework.jms.core.JmsTemplate}
     *
     * @param object  the payload to serialize
     * @param session the session used to create the text message
     * @return a text message whose body is the JSON form of {@code object}
     * @throws JMSException               if the text message cannot be created
     * @throws MessageConversionException if the payload cannot be serialized
     */
    @Override
    public javax.jms.Message toMessage(Object object, Session session)
            throws JMSException, MessageConversionException {
        final String json;
        try {
            json = mapper.writeValueAsString(object);
        } catch (Exception e) {
            throw new MessageConversionException("Message cannot be parsed. ", e);
        }
        return session.createTextMessage(json);
    }

    /**
     * Extracts JSON payload for further processing by JacksonMapper.
     *
     * @param message the incoming message; must be a {@link TextMessage}
     * @return the text body of the message
     * @throws JMSException               if the text cannot be read
     * @throws MessageConversionException if the message is {@code null} or not a text message
     */
    @Override
    public Object fromMessage(javax.jms.Message message) throws JMSException, MessageConversionException {
        // Fail with a conversion error rather than a ClassCastException or
        // NullPointerException when the broker delivers anything but a text message.
        if (!(message instanceof TextMessage)) {
            String actualType = (message == null) ? "null" : message.getClass().getName();
            throw new MessageConversionException(
                    "Expected a TextMessage but received: " + actualType);
        }
        return ((TextMessage) message).getText();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment