Last active
January 2, 2016 09:39
-
-
Save recht/8284723 to your computer and use it in GitHub Desktop.
Script for peeking into a JMS queue and doing some load testing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Grapes([ | |
@Grab("org.apache.activemq:activemq-client:5.9.0"), | |
@Grab("org.slf4j:slf4j-log4j12:1.7.5"), | |
@Grab("log4j:log4j:1.2.17"), | |
@Grab("org.jboss.logmanager:jboss-logmanager:1.3.1.Final"), | |
@Grab(group="org.springframework", module="spring-core", version="3.2.5.RELEASE", transitive=false) | |
]) | |
import org.apache.activemq.* | |
import org.apache.activemq.command.* | |
import java.util.concurrent.* | |
import java.util.concurrent.atomic.* | |
import org.springframework.core.io.support.PathMatchingResourcePatternResolver | |
import javax.jms.* | |
@Grab("org.hornetq:hornetq-jms-client:2.3.11.Final") | |
import org.hornetq.jms.client.* | |
import org.hornetq.api.core.* | |
import org.hornetq.api.jms.* | |
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory | |
def cli = new CliBuilder(usage: 'browser.groovy') | |
cli.with { | |
u longOpt: 'uri', args: 1, argName: 'uri', required: true, 'uri to connect to' | |
h longOpt: 'help', 'Show help' | |
q longOpt: 'queueName', args:1, argName: 'queueName', required: true, 'Queue name to use' | |
p longOpt: 'produce', args: 1, argName: 'produceCount', 'Produces x test messages into the queue per thread' | |
i longOpt: 'import', args: 1, argName: 'import', 'Import messages into queue' | |
n longOpt: 'concurrency', args: 1, 'Concurrency when producing. Default 1.' | |
s longOpt: 'size', args: 1, 'Message size to produce. Default 1024 bytes.' | |
c longOpt: 'consume', 'Consumes messages from the queue' | |
t longOpt: 'type', args: 1, 'Broker type - one of amq or hornetq' | |
} | |
def options = cli.parse(args) | |
if (!options) { | |
return | |
} | |
if (options.h) { | |
cli.usage() | |
return | |
} | |
println "Connecting to ${options.uri}" | |
def type = options.t ?: 'amq' | |
def connection, queue | |
if (type == 'amq') { | |
def cf = new ActiveMQConnectionFactory("${options.uri}") | |
cf.useAsyncSend = false | |
cf.sendAcksAsync = false | |
cf.dispatchAsync = false | |
connection = cf.createConnection() | |
connection.start() | |
queue = new ActiveMQQueue(options.queueName) | |
} else if (type == 'hornetq') { | |
def tcs = options.uri.split(",").collect { | |
def uri = URI.create(it) | |
def params = [ | |
host: uri.host, | |
port: uri.port, | |
] | |
new TransportConfiguration(NettyConnectorFactory.class.getName(), params) | |
} | |
tcs.each { println it } | |
def cf = HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, tcs as TransportConfiguration[]) | |
cf.reconnectAttempts = -1 | |
cf.initialConnectAttempts = 2 | |
cf.failoverOnInitialConnection = true | |
connection = cf.createConnection() | |
connection.start() | |
queue = HornetQJMSClient.createQueue(options.queueName) | |
} else { | |
println "Unknown type" | |
return | |
} | |
println connection | |
if (options.p) { | |
def concurrency = (options.n ?: 1) as int | |
def size = (options.s ?: 1024) as int | |
println "Producing ${options.p} messages to ${queue} using ${concurrency} threads" | |
def pool = Executors.newCachedThreadPool() | |
def latch = new CountDownLatch(concurrency) | |
def counter = new AtomicInteger() | |
def body = new byte[size] | |
for (def producerIdx = 0; producerIdx < concurrency; producerIdx++) { | |
def num = producerIdx | |
def task = { | |
try { | |
def session = connection.createSession(true, Session.SESSION_TRANSACTED) | |
def producer = session.createProducer(queue) | |
for (def i = 0; i < (options.p as int); i++) { | |
def msg = session.createBytesMessage() | |
msg.writeBytes(body) | |
producer.send(msg) | |
session.commit() | |
println "Sent ${counter.incrementAndGet()}" | |
} | |
producer.close() | |
session.close() | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} finally { | |
latch.countDown() | |
} | |
} | |
pool.submit(task as Runnable) | |
} | |
latch.await() | |
pool.shutdown() | |
} else if (options.c) { | |
println "Listening for messages on ${queue}" | |
def pool = Executors.newCachedThreadPool() | |
def counter = new AtomicInteger() | |
def concurrency = (options.n ?: 1) as int | |
def latch = new CountDownLatch(concurrency) | |
for (def consumerIdx = 0; consumerIdx < concurrency; consumerIdx++) { | |
def task = { | |
def session = connection.createSession(true, Session.SESSION_TRANSACTED) | |
def consumer = session.createConsumer(queue) | |
println "Created consumer ${consumer}" | |
consumer.setMessageListener({ msg -> | |
session.commit() | |
println "Received ${counter.incrementAndGet()} messages" | |
} as MessageListener) | |
try { | |
Thread.sleep(100000000) | |
} catch (InterruptedException e) { | |
consumer.close() | |
session.close() | |
} | |
} | |
pool.submit(task as Runnable) | |
} | |
println "Hit any key to stop" | |
System.in.read() | |
pool.shutdownNow() | |
} else if (options.i) { | |
def resolver = new PathMatchingResourcePatternResolver() | |
def resources = resolver.getResources('file:' + options.'import') | |
def session = connection.createSession(true, Session.SESSION_TRANSACTED) | |
def producer = session.createProducer(queue) | |
resources.each { | |
def msg = session.createBytesMessage() | |
msg.writeBytes(it.file.bytes) | |
producer.send(msg) | |
println "Sent ${it.file}" | |
} | |
session.commit() | |
producer.close() | |
session.close() | |
} else { | |
def session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) | |
def queueBrowser = session.createBrowser(queue) | |
println "Created browser ${queueBrowser}" | |
def e = queueBrowser.getEnumeration() | |
def count = 0 | |
while (e.hasMoreElements()) { | |
def msg = e.nextElement() | |
println msg.getJMSMessageID() | |
if (msg instanceof BytesMessage) { | |
println msg.bodyLength | |
def body = new byte[msg.bodyLength] | |
msg.readBytes(body) | |
new File("msg-${count++}").bytes = body | |
} | |
} | |
println "No more messages" | |
queueBrowser.close() | |
} | |
connection.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment