Skip to content

Instantly share code, notes, and snippets.

@marcgeld
Last active February 24, 2016 14:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marcgeld/17565b4aa96e584f0b63 to your computer and use it in GitHub Desktop.
Save marcgeld/17565b4aa96e584f0b63 to your computer and use it in GitHub Desktop.
Groovy script to copy incoming message from one in-queue to a out-queue
#! /usr/bin/env groovy
// 1.) Start ActiveMQ on localhost (...or other hosts)
// 2.) Run with ./amqtransceiver.groovy -t --inqueue one --outqueue two --activemqurl tcp://localhost:61616
@Grab(group='org.apache.activemq', module='activemq-core', version='5.7.0')
@Grab(group='commons-io', module='commons-io', version='1.2')
@Grab(group='org.apache.geronimo.specs', module='geronimo-jms_1.1_spec', version='1.1.1')
@Grab(group='org.apache.qpid', module='qpid-amqp-1-0-client-jms', version='0.30')
@Grab(group='ch.qos.logback', module='logback-classic', version='1.1.5')
@Grab(group='ch.qos.logback', module='logback-core', version='1.1.5')
@Grab(group='org.slf4j', module='slf4j-api', version='1.7.16')
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets
import javax.jms.*
import javax.naming.*
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl
import ch.qos.logback.core.*;
import ch.qos.logback.core.encoder.*;
import ch.qos.logback.core.read.*;
import ch.qos.logback.core.rolling.*;
import ch.qos.logback.core.status.*;
import ch.qos.logback.classic.net.*;
import ch.qos.logback.classic.encoder.*;
import static ch.qos.logback.classic.Level.*;
def appName = this.getClass().getName()
Logger logger = LoggerFactory.getLogger(this.getClass());
def cli = new CliBuilder(usage:"${appName} --inqueue <ActiveMQ Queue Name> --outqueue <ActiveMQ Queue Name> --activemqurl <ActiveMQ URL>")
cli.with {
i(longOpt: 'inqueue', 'in ActiveMQ Queue name', args: 1, required: true)
o(longOpt: 'outqueue', 'out ActiveMQ Queue name', args: 1, required: true)
u(longOpt: 'activemqurl', 'ActiveMQ URL', args: 1, required: true)
h(longOpt: 'help', 'Print help', required: false)
t(longOpt: 'test', 'Send a small test message', required: false)
}
def options = cli.parse(args)
if (!options) return
if (options.h) cli.usage()
logger.info("running ${appName}...")
// defs
def inQueueName = options.i
def outQueueName = options.o
def amqConnectionUrl = options.u
def runTests = options.t
logger.info("listen to queue: '${inQueueName}' and writes to queue: '${outQueueName}' on amq host'${amqConnectionUrl}'")
// Coonections
def connectionFactory = new ActiveMQConnectionFactory(amqConnectionUrl)
def connection = connectionFactory.createConnection()
def session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
def inQueue = session.createQueue(inQueueName);
def outQueue = session.createQueue(outQueueName);
QueueReceiver receiver = session.createReceiver(inQueue);
QueueSender sender = session.createSender(outQueue);
connection.start();
// Shutdown hook
Runtime.runtime.addShutdownHook {
logger.warn("Close resources and quit…")
session.close()
connection.close()
}
if ( runTests ) {
logger.info("Run test…")
def testSendThread = Thread.start {
MESSAGE_HEADERS = ["isRunAsTest":runTests]
sleep(2000)
logger.info("Send test message to queue: ${inQueueName}")
byte[] messageBytes = "small test message".getBytes(StandardCharsets.UTF_8)
BytesMessage bytesMsg = session.createBytesMessage()
bytesMsg.writeBytes(messageBytes);
MessageProducer messageProducer = session.createProducer(inQueue)
MESSAGE_HEADERS.each{ key, value ->
bytesMsg.setObjectProperty((String) key, value);
}
messageProducer.send(bytesMsg)
messageProducer.close()
logger.info("Message sent")
}
}
while (true) {
logger.info("wait for message")
Message message = receiver.receive();
logger.info("Received message!")
if (message != null) {
logger.info("JMSDestination: " + message.getJMSDestination())
logger.info("JMSReplyTo: " + message.getJMSReplyTo())
logger.info("JMSDeliveryMode: " + message.getJMSDeliveryMode())
logger.info("JMSMessageID: " + message.getJMSMessageID())
logger.info("JMSTimestamp: " + message.getJMSTimestamp())
logger.info("JMSExpiration: " + message.getJMSExpiration())
logger.info("JMSRedelivered: " + message.getJMSRedelivered())
logger.info("JMSPriority: " + message.getJMSPriority())
logger.info("JMSCorrelationID: " + message.getJMSCorrelationID())
logger.info("JMSType: " + message.getJMSType())
// List all properties
message.getPropertyNames().each{ key ->
def value = message.getStringProperty((String) key)
logger.info("${key}=${value}")
}
// Log message
if (message instanceof TextMessage) {
message = (TextMessage) message;
logger.info("Reading message: " + message.getText());
}
logger.info("Send message!");
sender.send(message)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment