Skip to content

Instantly share code, notes, and snippets.

@monodot
Created January 28, 2020 17:25
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save monodot/826787a58e9b39afc5bd9dcfab3ada12 to your computer and use it in GitHub Desktop.
Save monodot/826787a58e9b39afc5bd9dcfab3ada12 to your computer and use it in GitHub Desktop.
Fuse/Camel - Wiring up AMQ (ActiveMQ) and Oracle AQ with JTA XA transactions (Narayana) on Spring Boot
package xyz.tomd.demos.fuse.springboot.amqxa;
import oracle.jdbc.xa.client.OracleXADataSource;
import oracle.jms.AQjmsFactory;
import org.apache.camel.component.jms.JmsComponent;
import org.messaginghub.pooled.jms.JmsPoolXAConnectionFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.XAConnectionFactoryWrapper;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.*;
import javax.transaction.TransactionManager;
import java.sql.SQLException;
import java.util.Properties;
/**
 * Spring Boot entry point that also declares the two Camel JMS components
 * used by the routes in this demo: {@code jms} (ActiveMQ) and
 * {@code oracleaq} (Oracle AQ).
 */
@SpringBootApplication
public class Application {

    /**
     * A main method to start this application.
     *
     * @param args command-line arguments, handed to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    /**
     * Creates the Camel JMS component registered under the bean name {@code jms}.
     *
     * @param activemqCF the injected JMS connection factory (expected to be the
     *                   ActiveMQ XA connection factory)
     * @param jmstx      the injected Spring transaction manager
     * @return a transacted {@link JmsComponent} bound to the given connection factory
     */
    @Bean(name = "jms")
    JmsComponent jmsComponent(ConnectionFactory activemqCF, PlatformTransactionManager jmstx) {
        JmsComponent jms = new JmsComponent();
        // This should be our ActiveMQ XA connection factory
        jms.setConnectionFactory(activemqCF);
        // Do not cache JMS resources on this component. The Camel JMS documentation
        // states that caching (e.g. CACHE_CONSUMER) can prevent XA transactions from
        // working properly; resources must be obtained inside each transaction.
        jms.setCacheLevelName("CACHE_NONE");
        // Camel uses Spring JMS under the covers - so it expects a (Spring) PlatformTransactionManager
        jms.setTransactionManager(jmstx);
        jms.setTransacted(true);
        return jms;
    }

    /**
     * Creates the Camel JMS component registered under the bean name {@code oracleaq}.
     *
     * <p>The component is built on an Oracle XA data source, turned into an
     * {@link XAConnectionFactory} by {@link AQjmsFactory} and wrapped in a
     * {@link JmsPoolXAConnectionFactory} that is given the JTA transaction manager.
     *
     * <p>NOTE(review): the JDBC URL, user and password are hard-coded below;
     * consider moving them to external configuration before using this outside a demo.
     *
     * @param transactionManager    the injected Spring transaction manager
     * @param jtaTransactionManager the injected JTA transaction manager
     * @return a {@link JmsComponent} backed by the pooled Oracle AQ XA connection factory
     * @throws JMSException declared for the Oracle AQ connection factory creation
     * @throws SQLException declared for the Oracle XA data source setup
     */
    @Bean(name = "oracleaq")
    JmsComponent oracleAQJmsComponent(PlatformTransactionManager transactionManager,
                                      TransactionManager jtaTransactionManager)
            throws JMSException, SQLException {
        OracleXADataSource oracleXADataSource = new OracleXADataSource();
        oracleXADataSource.setURL("jdbc:oracle:thin:@localhost:1521:ORCLCDB");
        oracleXADataSource.setUser("scott");
        oracleXADataSource.setPassword("tiger");

        // Now we've created the XA datasource, we need something that will generate an XAConnectionFactory
        XAConnectionFactory oracleXACF = AQjmsFactory.getXAConnectionFactory(oracleXADataSource);

        // Now we need to wrap this connection factory in an enlisting connection factory
        // Related reading: https://access.redhat.com/documentation/en-us/red_hat_fuse/7.2/html-single/apache_karaf_transaction_guide/index#about_auto_enlistment
        JmsPoolXAConnectionFactory pooledJmsXACF = new JmsPoolXAConnectionFactory();
        pooledJmsXACF.setConnectionFactory(oracleXACF);
        // Wire the connection factory to Narayana via its JTA interface implementation
        pooledJmsXACF.setTransactionManager(jtaTransactionManager);

        JmsComponent jms = new JmsComponent();
        jms.setConnectionFactory(pooledJmsXACF);
        // Wire the Camel JMS component to Narayana via its Spring interface implementation
        jms.setTransactionManager(transactionManager);
        // NOTE(review): transacted is false here but true on the "jms" component;
        // presumably the transaction manager set above is what drives the
        // transaction for this component - confirm this asymmetry is intended.
        jms.setTransacted(false);
        return jms;
    }
}
# Setting these values is enough for Spring Boot to auto-configure an ActiveMQ connection factory.
# That connection factory is then attached to a Camel JmsComponent (see Application.java).
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
# More auto-configuration: Spring Boot will create a DataSource for us from these properties
spring.datasource.url=jdbc:postgresql://localhost:5432/sampledb
spring.datasource.username=admin
spring.datasource.password=admin
#debug=true
logging.level.org.apache.camel.component.jms=DEBUG
# Cluster (options replaced by OpenShift env variables)
cluster.name=@project.artifactId@
cluster.nodename=${cluster.name}-0
cluster.base-dir=./target/tx
# Recovery is enabled only inside OpenShift
cluster.recovery.enabled=false
# Transaction data
spring.jta.transaction-manager-id=${cluster.nodename}
spring.jta.log-dir=${cluster.base-dir}/store/${cluster.nodename}
# Transaction recovery settings
snowdrop.narayana.openshift.recovery.enabled=${cluster.recovery.enabled}
snowdrop.narayana.openshift.recovery.current-pod-name=${cluster.nodename}
snowdrop.narayana.openshift.recovery.statefulset=${cluster.name}
snowdrop.narayana.openshift.recovery.status-dir=${cluster.base-dir}/status
package xyz.tomd.demos.fuse.springboot.amqxa;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class XaApplicationRouteBuilder extends RouteBuilder {
public void configure() throws Exception {
from("oracleaq:FOOQUEUE")
.log("Received a message from Oracle AQ! - ${body}")
.setHeader("name", body())
.to("sql:insert into chickens (name) values (:#name)")
.log("Inserted a chicken into the database")
.log("Sending to ActiveMQ...")
.to("jms:queue:CHICKENS.PROCESSED");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment