Created
November 24, 2017 06:06
-
-
Save gtod/e8094290e782d410f39e8e2dce464665 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package net.gtod.cusoon | |
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory | |
import org.slf4j.Logger | |
import org.slf4j.LoggerFactory | |
import java.lang.Thread.sleep | |
import java.text.DecimalFormat | |
import javax.jms.ExceptionListener | |
import javax.jms.JMSException | |
import javax.jms.Session | |
import javax.jms.TextMessage | |
// Name of the JMS queue the client sends requests to and the server consumes from.
private const val httpQueueName = "TEST.FOO"

// Formats the millisecond prefix written by logInfo: at least three integer digits and
// exactly three fraction digits.
private val formatter = DecimalFormat("#000.000")

// SLF4J logger (named "jms") that every log line in this file goes through.
private val logger: Logger = LoggerFactory.getLogger("jms")
/**
 * Logs [text] at INFO level, prefixed with a short timestamp.
 *
 * The prefix is `System.nanoTime()` modulo one second, expressed in milliseconds and rendered
 * with the shared `formatter`. It is only useful for comparing log lines that are close
 * together in time; it is not a wall-clock time.
 *
 * @param text the message to log after the timestamp prefix.
 */
private fun logInfo(text: String) {
    val millisWithinSecond = (System.nanoTime() % 1_000_000_000) / 1_000_000.0
    // This function is called from both the main thread and the "serv" thread, and
    // java.text.DecimalFormat is not safe for concurrent use, so serialize access to it.
    val time = synchronized(formatter) { formatter.format(millisWithinSecond) }
    logger.info("{} {}", time, text)
}
/**
 * Entry point of the JMS request/reply demo (client side).
 *
 * Starts a [SimpleRequestServer] on a non-daemon thread, then sends ten text requests to the
 * queue named by `httpQueueName`. Every request carries a temporary queue in its JMSReplyTo
 * header. After each send the client waits up to one second for a reply on that temporary queue
 * and matches it to the original request through the reply's JMSCorrelationID, then pauses for
 * half a second before sending the next request.
 *
 * @param args command-line arguments; not used.
 */
fun main(args: Array<String>) {
    // Requests still waiting for a reply, keyed by the JMSMessageID read back after send().
    val pendingRequests = mutableMapOf<String, TextMessage>()
    thread(SimpleRequestServer(), daemon = false)

    ActiveMQConnectionFactory().createConnection("input", "input").use { connection ->
        connection.start()
        val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
        val producer = session.createProducer(session.createQueue(httpQueueName))
        val replyQueue = session.createTemporaryQueue()
        val replyConsumer = session.createConsumer(replyQueue)

        for (i in 1..10) {
            val requestMsg = session.createTextMessage("$i message:").apply {
                jmsReplyTo = replyQueue
            }
            producer.send(requestMsg)
            logInfo("SENT: $i")
            // The message ID is read only after send(), which is when this code expects it to be set.
            pendingRequests[requestMsg.jmsMessageID] = requestMsg

            val reply = replyConsumer.receive(1000)
            when (reply) {
                // receive(timeout) returned nothing: report it instead of silently moving on.
                null -> logInfo("No reply received within 1000 ms after request $i")
                is TextMessage -> {
                    // Remove (rather than just look up) the request so answered requests do not
                    // accumulate in the map. A missing correlation ID simply matches nothing.
                    val matchedMessage = reply.jmsCorrelationID?.let { pendingRequests.remove(it) }
                    logInfo("Matched: ${matchedMessage?.text}")
                }
                // Anything that is not a TextMessage is reported rather than cast blindly.
                else -> logInfo("Ignoring non-text reply: $reply")
            }
            sleep(500)
        }

        replyConsumer.close()
        replyQueue.delete()
    }
}
/**
 * Server side of the JMS request/reply demo.
 *
 * When run, it opens its own connection, consumes from the queue named by `httpQueueName` and
 * answers each request on the destination given in the request's JMSReplyTo header, copying the
 * request's JMSMessageID into the reply's JMSCorrelationID. It makes fifteen receive attempts of
 * up to one second each and then shuts down.
 *
 * The instance also registers itself as the connection's [ExceptionListener] and logs whatever
 * is reported to it.
 */
class SimpleRequestServer : ExceptionListener, Runnable {

    /** Logs an exception reported through the connection's exception listener. */
    override fun onException(exception: JMSException?) {
        logInfo("Exception: $exception")
    }

    /** Serves requests until fifteen receive attempts have been made, then closes everything. */
    override fun run() {
        ActiveMQConnectionFactory().createConnection("input", "input").use { connection ->
            // Register the listener before starting delivery so no early failure goes unreported.
            connection.exceptionListener = this
            connection.start()
            logInfo("Server started")

            val session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
            val httpQueue = session.createQueue(httpQueueName)
            // Created without a destination: the target is supplied per message in send().
            val replyProducer = session.createProducer(null)
            val consumer = session.createConsumer(httpQueue)

            repeat(15) {
                val message = consumer.receive(1000)
                if (message != null) {
                    // A request that is not a TextMessage is still answered; only its text is unavailable.
                    val requestText = (message as? TextMessage)?.text ?: "<non-text message>"
                    logInfo("RECEIVED: $requestText")

                    val destination = message.jmsReplyTo
                    if (destination == null) {
                        // Nowhere to send the answer; sending to a null destination would fail.
                        logInfo("No JMSReplyTo on request, reply skipped")
                    } else {
                        val reply = session.createTextMessage("A reply message").apply {
                            jmsCorrelationID = message.jmsMessageID
                        }
                        replyProducer.send(destination, reply)
                        logInfo("Reply sent")
                    }
                }
            }

            logInfo("Server done")
            consumer.close()
            session.close()
        }
    }
}
/**
 * Runs [runnable] on a newly started thread named "serv".
 *
 * @param runnable the work to execute on the new thread.
 * @param daemon whether the new thread is marked as a daemon thread before it is started.
 */
private fun thread(runnable: Runnable, daemon: Boolean) {
    val worker = Thread(runnable, "serv")
    worker.isDaemon = daemon
    worker.start()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment