Created
August 23, 2017 06:10
-
-
Save marko-asplund/983300e4bec59f53a13bc0d319a506f6 to your computer and use it in GitHub Desktop.
Produce & consume messages using Apache Artemis v2.2.0 core API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.practicingtechie | |
import org.apache.activemq.artemis.api.core.TransportConfiguration | |
import org.apache.activemq.artemis.api.core.SimpleString | |
import org.apache.activemq.artemis.api.core.client.{ActiveMQClient, ClientMessage, MessageHandler} | |
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants._ | |
import scala.collection.JavaConversions._ | |
// Produce and consume messages using the Apache Artemis v2.2.0 core API.
/** Minimal command-line client that sends or receives a message on a queue
  * through the Apache Artemis core API.
  *
  * Run with `send` to publish a single text message to [[QueueName]], or with
  * `receive` to attach a message handler and acknowledge incoming messages for a
  * fixed period of time.
  *
  * NOTE(review): host, port, trust store location and all passwords are
  * hard-coded placeholders; they should come from configuration before this is
  * used outside of a demo.
  */
object ArtemisClient {
  // Local imports keep this object self-contained without touching the file header.
  import org.apache.activemq.artemis.api.core.client.ClientSession
  import scala.util.control.NonFatal

  /** Fully qualified class name of the Netty connector factory handed to the transport configuration. */
  val NettyConnectorFactory: String = "org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory"

  /** Name of the queue used by both the producer and the consumer. */
  val QueueName: String = "foo.bar.requests"

  /** User name -> password lookup used when opening a session. */
  val creds: Map[String, String] = Map(
    "consumer-user1" -> "****",
    "producer-user1" -> "****"
  )

  /** Default time, in milliseconds, that `consumeMessage` keeps its session open. */
  private val DefaultListenMillis: Long = 890123L

  /** Looks up the password for `userName`.
    *
    * @throws NoSuchElementException if no credentials are configured for the user
    */
  private def passwordFor(userName: String): String =
    creds.getOrElse(
      userName,
      throw new NoSuchElementException(s"no credentials configured for user '$userName'")
    )

  /** Builds the connector parameters as a plain `java.util.Map`, so that no
    * implicit Scala-to-Java collection conversion is required.
    */
  private def connectionParams(): java.util.Map[String, Object] = {
    val params = new java.util.HashMap[String, Object]()
    params.put(PORT_PROP_NAME, Int.box(61616))
    params.put(HOST_PROP_NAME, "foo.bar.com")
    params.put(SSL_ENABLED_PROP_NAME, Boolean.box(true))
    params.put(TRUSTSTORE_PATH_PROP_NAME, "/foo/bar/my.keystore")
    params.put(TRUSTSTORE_PASSWORD_PROP_NAME, "****")
    params
  }

  /** Opens a locator, a session factory and a session for `userName`.
    *
    * The password is resolved before anything is opened, and whatever was
    * already opened is closed again if a later step fails, so a failed attempt
    * leaves nothing behind.
    *
    * @return the locator, factory and session, in that order; the caller owns all three
    */
  private def openConnection(userName: String) = {
    val password = passwordFor(userName)
    val locator = ActiveMQClient
      .createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory, connectionParams()))
      .setThreadPoolMaxSize(10)
      .setReconnectAttempts(-1)
    try {
      val factory = locator.createSessionFactory()
      try {
        val session = factory.createSession(
          userName,
          password,
          false, // xa
          true,  // autoCommitSends
          true,  // autoCommitAcks
          false, // preAcknowledge
          10     // ackBatchSize
        )
        (locator, factory, session)
      } catch {
        case NonFatal(e) =>
          factory.close()
          throw e
      }
    } catch {
      case NonFatal(e) =>
        locator.close()
        throw e
    }
  }

  /** Runs `body` with a fresh session for `userName`, then closes the session,
    * its factory and its locator, even when `body` throws.
    */
  private def withSession[A](userName: String)(body: ClientSession => A): A = {
    val (locator, factory, session) = openConnection(userName)
    try body(session)
    finally {
      try session.close()
      finally {
        try factory.close()
        finally locator.close()
      }
    }
  }

  /** Creates a new session authenticated as `userName`.
    *
    * The caller is responsible for closing the returned session. The session
    * factory and server locator created along the way are not reachable through
    * the return value and therefore stay open; the methods in this object use an
    * internal helper that closes all three instead.
    *
    * @param userName a key of [[creds]]
    * @return an open client session
    * @throws NoSuchElementException if `userName` has no configured credentials
    */
  def createSession(userName: String): ClientSession =
    openConnection(userName)._3

  /** Sends `text` as a durable message to [[QueueName]] as `producer-user1`.
    *
    * All connection resources are released before this method returns.
    *
    * @param text message body, written to the message body buffer as a string
    */
  def produceMessage(text: String): Unit =
    withSession("producer-user1") { session =>
      val producer = session.createProducer(QueueName)
      val message: ClientMessage = session.createMessage(true)
      message.getBodyBuffer.writeString(text)
      producer.send(message)
      // Kept from the original flow; the session is opened with auto-commit for sends.
      session.commit()
    }

  /** Consumes messages from [[QueueName]] as `consumer-user1` for a fixed time.
    *
    * Registers a [[MyMessageHandler]], starts the session, blocks the calling
    * thread for `listenMillis` and then releases all connection resources.
    *
    * @param listenMillis how long to keep receiving, in milliseconds
    */
  def consumeMessage(listenMillis: Long = DefaultListenMillis): Unit =
    withSession("consumer-user1") { session =>
      session.createConsumer(QueueName).setMessageHandler(new MyMessageHandler)
      session.start()
      Thread.sleep(listenMillis)
    }

  /** Message handler that acknowledges every message it receives. */
  class MyMessageHandler extends MessageHandler {
    override def onMessage(msg: ClientMessage): Unit = {
      msg.acknowledge()
    }
  }

  /** Entry point: `send` publishes one greeting, `receive` starts consuming.
    *
    * A missing or unrecognised command prints a usage line to standard error
    * instead of failing on an out-of-bounds argument access.
    */
  def main(args: Array[String]): Unit =
    args.headOption match {
      case Some("send")    => produceMessage("hello, world")
      case Some("receive") => consumeMessage()
      case _               => Console.err.println("usage: ArtemisClient (send|receive)")
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment