Skip to content

Instantly share code, notes, and snippets.

@gtod
Created November 24, 2017 06:06
Show Gist options
  • Save gtod/e8094290e782d410f39e8e2dce464665 to your computer and use it in GitHub Desktop.
Save gtod/e8094290e782d410f39e8e2dce464665 to your computer and use it in GitHub Desktop.
package net.gtod.cusoon
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.lang.Thread.sleep
import java.text.DecimalFormat
import javax.jms.ExceptionListener
import javax.jms.JMSException
import javax.jms.Session
import javax.jms.TextMessage
private val httpQueueName = "TEST.FOO"
private val formatter = DecimalFormat("#000.000")
private val logger: Logger = LoggerFactory.getLogger("jms")
private fun logInfo(text: String) {
val time = formatter.format((System.nanoTime() % 1_000_000_000) / 1_000_000.0)
logger.info("{} {}", time, text)
}
fun main(args: Array<String>) {
val requests = mutableMapOf<String, TextMessage>()
thread(SimpleRequestServer(), false)
ActiveMQConnectionFactory().createConnection("input", "input").use { connection ->
connection.start()
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val httpQueue = session.createQueue(httpQueueName)
val producer = session.createProducer(httpQueue)
val replyQueue = session.createTemporaryQueue()
val replyConsumer = session.createConsumer(replyQueue)
for (i in 1..10) {
val requestMsg = session.createTextMessage("$i message:").apply {
jmsReplyTo = replyQueue
}
producer.send(requestMsg)
logInfo("SENT: $i")
requests[requestMsg.jmsMessageID] = requestMsg
replyConsumer.receive(1000)?.let {
val reply = it as TextMessage
// logInfo("Received reply: " + reply.text)
// logInfo("CorrelatedId: " + reply.jmsCorrelationID)
val matchedMessage = requests[reply.jmsCorrelationID]
logInfo("Matched: " + matchedMessage?.text)
}
sleep(500)
}
replyConsumer.close()
replyQueue.delete()
}
}
class SimpleRequestServer : ExceptionListener, Runnable {
override fun onException(exception: JMSException?) {
logInfo("Exception: " + exception)
}
override fun run() {
ActiveMQConnectionFactory().createConnection("input", "input").use { connection ->
connection.start()
connection.exceptionListener = this
logInfo("Server started")
val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
val httpQueue = session.createQueue(httpQueueName)
val replyProducer = session.createProducer(null)
val consumer = session.createConsumer(httpQueue)
for (i in 1..15) {
consumer.receive(1000)?.let { message ->
logInfo("RECEIVED: " + (message as TextMessage).text)
val destination = message.getJMSReplyTo()
// logInfo("Reply to queue: " + destination)
val reply = session.createTextMessage("A reply message").apply {
jmsCorrelationID = message.getJMSMessageID()
}
replyProducer.send(destination, reply)
logInfo("Reply sent")
}
}
logInfo("Server done")
consumer?.close()
session.close()
}
}
}
private fun thread(runnable: Runnable, daemon: Boolean) {
Thread(runnable).apply {
name = "serv"
isDaemon = daemon
start()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment