Skip to content

Instantly share code, notes, and snippets.

@recht
Last active January 2, 2016 09:39
Show Gist options
  • Save recht/8284723 to your computer and use it in GitHub Desktop.
Save recht/8284723 to your computer and use it in GitHub Desktop.
Script for peeking into a JMS queue and doing some load testing
@Grapes([
@Grab("org.apache.activemq:activemq-client:5.9.0"),
@Grab("org.slf4j:slf4j-log4j12:1.7.5"),
@Grab("log4j:log4j:1.2.17"),
@Grab("org.jboss.logmanager:jboss-logmanager:1.3.1.Final"),
@Grab(group="org.springframework", module="spring-core", version="3.2.5.RELEASE", transitive=false)
])
import org.apache.activemq.*
import org.apache.activemq.command.*
import java.util.concurrent.*
import java.util.concurrent.atomic.*
import org.springframework.core.io.support.PathMatchingResourcePatternResolver
import javax.jms.*
@Grab("org.hornetq:hornetq-jms-client:2.3.11.Final")
import org.hornetq.jms.client.*
import org.hornetq.api.core.*
import org.hornetq.api.jms.*
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory
// Command-line interface of the script.
// -u and -q are mandatory; the remaining switches select the mode
// (produce / consume / import, browsing when none is given) and tune it.
def cli = new CliBuilder(usage: 'browser.groovy')
cli.u(longOpt: 'uri', args: 1, argName: 'uri', required: true, 'uri to connect to')
cli.h(longOpt: 'help', 'Show help')
cli.q(longOpt: 'queueName', args: 1, argName: 'queueName', required: true, 'Queue name to use')
cli.p(longOpt: 'produce', args: 1, argName: 'produceCount', 'Produces x test messages into the queue per thread')
cli.i(longOpt: 'import', args: 1, argName: 'import', 'Import messages into queue')
cli.n(longOpt: 'concurrency', args: 1, 'Concurrency when producing. Default 1.')
cli.s(longOpt: 'size', args: 1, 'Message size to produce. Default 1024 bytes.')
cli.c(longOpt: 'consume', 'Consumes messages from the queue')
cli.t(longOpt: 'type', args: 1, 'Broker type - one of amq or hornetq')

// parse() yields a falsy result when the arguments are rejected; in that case
// there is nothing to do but stop. An explicit -h prints the usage and stops too.
def options = cli.parse(args)
def helpRequested = options ? options.h : false
if (helpRequested) {
    cli.usage()
}
if (!options || helpRequested) {
    return
}
// Open and start a JMS connection to the broker selected with -t (ActiveMQ when
// the switch is absent) and resolve the destination queue named with -q.
// The rest of the script reads `connection` and `queue`.
println "Connecting to ${options.uri}"
def brokerType = options.t ?: 'amq'
def connection, queue
if (brokerType == 'hornetq') {
    // The uri option is treated as a comma-separated list; each entry is parsed
    // as a URI and only its host and port are handed to the Netty connector.
    def connectors = options.uri.split(",").collect { endpoint ->
        def parsed = URI.create(endpoint)
        new TransportConfiguration(NettyConnectorFactory.class.getName(), [host: parsed.host, port: parsed.port])
    }
    connectors.each { connector -> println connector }
    def hornetFactory = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, connectors as TransportConfiguration[])
    hornetFactory.setReconnectAttempts(-1)
    hornetFactory.setInitialConnectAttempts(2)
    hornetFactory.setFailoverOnInitialConnection(true)
    connection = hornetFactory.createConnection()
    connection.start()
    queue = HornetQJMSClient.createQueue(options.queueName)
} else if (brokerType == 'amq') {
    def amqFactory = new ActiveMQConnectionFactory("${options.uri}")
    // Turn off the asynchronous send / ack / dispatch switches of the factory.
    amqFactory.setUseAsyncSend(false)
    amqFactory.setSendAcksAsync(false)
    amqFactory.setDispatchAsync(false)
    connection = amqFactory.createConnection()
    connection.start()
    queue = new ActiveMQQueue(options.queueName)
} else {
    println "Unknown type"
    return
}
println connection
// Mode dispatch. Exactly one mode runs, chosen in this order:
//   -p  produce: every worker thread sends <p> byte messages of <s> bytes
//   -c  consume: <n> transacted consumers count messages until a key is hit
//   -i  import : every file matching the given pattern is sent as one message
//   (none) browse: list the queue and dump each BytesMessage body to msg-<k>
// The whole dispatch sits in try/finally so the connection is closed even when
// a mode fails; every session is closed by the code that created it.
try {
    if (options.p) {
        def concurrency = (options.n ?: 1) as int
        def size = (options.s ?: 1024) as int
        // Parsed once up front instead of on every loop iteration.
        def perThread = options.p as int
        println "Producing ${options.p} messages to ${queue} using ${concurrency} threads"
        def pool = Executors.newCachedThreadPool()
        def latch = new CountDownLatch(concurrency)
        def counter = new AtomicInteger()
        def body = new byte[size]
        concurrency.times {
            def task = {
                try {
                    def session = connection.createSession(true, Session.SESSION_TRANSACTED)
                    try {
                        def producer = session.createProducer(queue)
                        for (def i = 0; i < perThread; i++) {
                            def msg = session.createBytesMessage()
                            msg.writeBytes(body)
                            producer.send(msg)
                            // One transaction per message.
                            session.commit()
                            println "Sent ${counter.incrementAndGet()}"
                        }
                        producer.close()
                    } finally {
                        // Runs on failure as well, so a broken worker does not leave its session open.
                        session.close()
                    }
                } catch (Exception e) {
                    e.printStackTrace()
                } finally {
                    // Always released, otherwise the main thread would wait forever.
                    latch.countDown()
                }
            }
            // A closure is both Runnable and Callable; the cast picks the overload.
            pool.submit(task as Runnable)
        }
        latch.await()
        pool.shutdown()
    } else if (options.c) {
        println "Listening for messages on ${queue}"
        def pool = Executors.newCachedThreadPool()
        def counter = new AtomicInteger()
        def concurrency = (options.n ?: 1) as int
        // Released by the main thread once the user asks to stop.
        def stop = new CountDownLatch(1)
        concurrency.times {
            def task = {
                try {
                    def session = connection.createSession(true, Session.SESSION_TRANSACTED)
                    try {
                        def consumer = session.createConsumer(queue)
                        println "Created consumer ${consumer}"
                        consumer.setMessageListener({ msg ->
                            session.commit()
                            println "Received ${counter.incrementAndGet()} messages"
                        } as MessageListener)
                        // Keep the session alive until shutdown is requested.
                        stop.await()
                        consumer.close()
                    } finally {
                        session.close()
                    }
                } catch (InterruptedException e) {
                    // Forced shutdown; the session has already been closed above.
                    Thread.currentThread().interrupt()
                } catch (Exception e) {
                    // submit() would otherwise hide the failure in a discarded Future.
                    e.printStackTrace()
                }
            }
            pool.submit(task as Runnable)
        }
        println "Hit any key to stop"
        System.in.read()
        stop.countDown()
        pool.shutdown()
        // Let the workers close their sessions before the connection goes away.
        if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
            pool.shutdownNow()
        }
    } else if (options.i) {
        def resolver = new PathMatchingResourcePatternResolver()
        def resources = resolver.getResources('file:' + options.'import')
        def session = connection.createSession(true, Session.SESSION_TRANSACTED)
        try {
            def producer = session.createProducer(queue)
            resources.each {
                def msg = session.createBytesMessage()
                msg.writeBytes(it.file.bytes)
                producer.send(msg)
                println "Sent ${it.file}"
            }
            // All files are sent in a single transaction.
            session.commit()
            producer.close()
        } finally {
            session.close()
        }
    } else {
        def session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
        try {
            def queueBrowser = session.createBrowser(queue)
            try {
                println "Created browser ${queueBrowser}"
                def e = queueBrowser.getEnumeration()
                // Numbers the dump files; only BytesMessage instances advance it.
                def count = 0
                while (e.hasMoreElements()) {
                    def msg = e.nextElement()
                    println msg.getJMSMessageID()
                    if (msg instanceof BytesMessage) {
                        println msg.bodyLength
                        def body = new byte[msg.bodyLength]
                        msg.readBytes(body)
                        new File("msg-${count++}").bytes = body
                    }
                }
                println "No more messages"
            } finally {
                queueBrowser.close()
            }
        } finally {
            session.close()
        }
    }
} finally {
    connection.close()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment