Skip to content

Instantly share code, notes, and snippets.

@zentrope
Created September 4, 2010 04:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zentrope/564889 to your computer and use it in GitHub Desktop.
Save zentrope/564889 to your computer and use it in GitHub Desktop.
package zentrope.hornet {
import scala.actors._
import org.hornetq.api.core._
import org.hornetq.api.core.client._
// ----------------------------------------------------------------------
private object Util {
def withWarning(body: => Unit) = {
// Wraps an exceptional condition we want to warn
// about, but let pass.
try {
body
}
catch {
case (th: Throwable) => {
println("WARN: " + th.toString())
}
}
}
def withLoggedExceptions(prompt: String)(body: => Unit) = {
// Executes BODY, and logs exceptions, if there are any, but
// doesn't do anything special to handle them.
try {
body
}
catch {
case (th: Throwable) =>
println("ERROR: (when '" + prompt + "')") ;
println("ERROR: " + th.toString) ;
// th.printStackTrace()
}
}
}
// ----------------------------------------------------------------------
case class HornetMessage(properties: Map[String,String], body: String)
// ----------------------------------------------------------------------
class Settings(
val discovery: String = "231.7.7.7",
val port: Int = 9876,
val user: String = "guest",
val pass: String = "guest")
{
val xa: Boolean = false
val autoCommitSends: Boolean = true
val autoCommitAcks: Boolean = true
val preAcknowledge: Boolean = false
val ackBatchSize: Int = HornetQClient.DEFAULT_ACK_BATCH_SIZE
override
def toString(): String = {
format("<settings::d:%s,p:%d,u:%s,p:%s>", discovery, port, user, pass)
}
}
// ----------------------------------------------------------------------
protected class Connection(factory: ClientSessionFactory, session: ClientSession) {
// A HornetQ connection, stuff common to both consumers and
// producers.
override def toString(): String =
String.format("<%s:@%x>", this.getClass().getSimpleName(),
hashCode.asInstanceOf[AnyRef])
def start() {
session.start()
}
def stop() {
Util.withLoggedExceptions("stopping session") {
session.stop()
}
Util.withLoggedExceptions("closing session") {
session.close()
}
Util.withLoggedExceptions("closing factory") {
factory.close()
}
}
}
// ----------------------------------------------------------------------
protected class ProducerConnection(factory: ClientSessionFactory, session: ClientSession,
producer: ClientProducer) extends Connection(factory, session) {
// A HornetQ connection, specialized for producers.
val NON_DURABLE: Boolean = false
val DURABLE: Boolean = true
def send(msg: HornetMessage) {
val clientMessage: ClientMessage = session.createMessage(NON_DURABLE)
clientMessage.getBodyBuffer().writeString(msg.body)
msg.properties foreach { case (key, value) =>
clientMessage.putStringProperty(key, value)
}
producer.send(clientMessage)
}
override def stop() {
super.stop()
Util.withLoggedExceptions("closing producer " + this) {
producer.close()
}
}
}
// ----------------------------------------------------------------------
protected class ConsumerConnection(factory: ClientSessionFactory, val session: ClientSession,
consumer: ClientConsumer) extends Connection(factory, session) {
// A HornetQ connection, specialized for consumers.
override def stop() {
super.stop()
Util.withLoggedExceptions("closing consumer " + this) {
consumer.close()
}
}
}
// ----------------------------------------------------------------------
private object HornetQ {
// A convenience class to make it seem as if using HornetQ from
// Scala is really easy. ;)
import scala.collection.JavaConversions._
private type Consumer = ClientConsumer
private type Factory = ClientSessionFactory
private type Session = ClientSession
private type Producer = ClientProducer
private def getFactory(settings: Settings): Factory = {
val factory: Factory =
HornetQClient.createClientSessionFactory(settings.discovery,
settings.port)
factory.setDiscoveryInitialWaitTimeout(20000)
factory.setBlockOnAcknowledge(true)
factory.setBlockOnNonDurableSend(true)
factory.setBlockOnDurableSend(true)
factory.setReconnectAttempts(-1)
factory.setRetryInterval(2000)
factory.setConnectionTTL(-1) // Don't die of no data
factory
}
private def getSession(settings: Settings, factory: Factory): Session = {
factory.createSession(settings.user,
settings.pass, settings.xa, settings.autoCommitSends,
settings.autoCommitAcks, settings.preAcknowledge, settings.ackBatchSize)
}
private def createTempQueue(session: Session, address: String, queue: String) {
Util.withWarning {
session.createQueue(address, queue, false)
}
}
private class Handler(session: Session, func: (HornetMessage) => Unit) extends MessageHandler {
// Needs a session if we want to rollback
def onMessage(msg: ClientMessage): Unit = {
try {
val msgBody = msg.getBodyBuffer().readString()
var properties = Map[String, String]()
msg.getPropertyNames() foreach { name =>
val value = msg.getStringProperty(name)
var key = name.toString()
properties = properties + ((key, value))
}
func(HornetMessage(properties, msgBody))
msg.acknowledge()
}
catch {
case (th: Throwable) => {
println("ERROR: [msg.receive] " + th.toString)
try {
session.rollback()
}
catch {
case (th: Throwable) => {
println("ERROR: [msg.rollback] " + th.toString)
}
}
}
}
}
}
// Shortcut type for a function that takes a message and returns void
private type HandlerFunc = (HornetMessage) => Unit
private def createConsumer(session: Session, queue: String, handler: HandlerFunc): Consumer = {
val consumer = session.createConsumer(queue)
consumer.setMessageHandler(new Handler(session, handler))
return consumer
}
private def createProducer(s: Session, a: String) =
s.createProducer(a)
private def swallow(function: => Unit) = {
try {
function
}
catch {
case (th: Throwable) => ; // ignore
}
}
private
def cleanFactory(f: Option[Factory]) = f match {
case Some(x) => swallow { x.close() }
case None => ;
}
private
def cleanSession(s: Option[Session]) = s match {
case Some(x) => swallow { x.close() }
case None => ;
}
private
def cleanProducer(p: Option[Producer]) = p match {
case Some(x) => swallow { x.close() }
case None => ;
}
private
def cleanConsumer(c: Option[Consumer]) = c match {
case Some(x) => swallow { x.close() }
case None => ;
}
override def toString(): String = "<HornetQ>"
def getConsumerConnection(settings: Settings, address: String,
queue: String, handler: HandlerFunc): ConsumerConnection = {
println(this + " getting consumer connection")
var factory: Option[Factory] = None
var session: Option[Session] = None
var consumer: Option[Consumer] = None
var conn: Option[ConsumerConnection] = None
try {
factory = Some(getFactory(settings))
session = Some(getSession(settings, factory.get))
// TODO: Have a way to indicate what kind of queue to create
createTempQueue(session.get, address, queue)
consumer = Some(createConsumer(session.get, queue, handler))
conn = Some(new ConsumerConnection(factory.get, session.get, consumer.get))
}
catch {
case (th: Throwable) => {
println("warn: Unable to connect: " + th.getMessage())
// Gotta close the resources, or the underlying
// HornetQ objects will leak.
cleanFactory(factory)
cleanSession(session)
cleanConsumer(consumer)
throw th
}
}
return conn.get
}
def getProducerConnection(settings: Settings, address: String): ProducerConnection = {
println(this + " getting producer connection")
var conn: Option[ProducerConnection] = None
var factory: Option[Factory] = None
var session: Option[Session] = None
var producer: Option[Producer] = None
try {
factory = Some(getFactory(settings))
session = Some(getSession(settings, factory.get))
producer = Some(createProducer(session.get, address))
val p = new ProducerConnection(factory.get, session.get, producer.get)
conn = Some(p)
}
catch {
case (th: Throwable) => {
println(this + " warn: Unable to producer.connect: " + th.getMessage())
cleanFactory(factory)
cleanSession(session)
cleanProducer(producer)
throw th
}
}
return conn.get
}
}
// ----------------------------------------------------------------------
class Consumer(
val settings: Settings,
val address: String,
val queue: String,
handler: (HornetMessage) => Unit)
{
var conn: Option[ConsumerConnection] = None
override def toString(): String = {
return "<consumer:" + address + ">"
}
private def connect(): Unit = {
// Keep trying to get a good connection, no matter what.
while (conn == None) {
try {
conn = Some(HornetQ.getConsumerConnection(settings, address, queue, handler))
conn.get.start()
println(this + " we got a connection")
}
catch {
case (th: Throwable) => {
println(this + " " + th.toString)
println(this + " waiting 2000 millis")
Thread.sleep(2000)
if (conn != None) {
conn.get.stop() // Will clean up resources even if not running.
conn = None
}
}
}
}
}
def startUp(): Unit = {
println ("starting " + this)
connect()
}
def shutDown(): Unit = {
println ("stopping " + this)
conn match {
case None => ;
case Some(c) => c.stop()
}
}
}
// ----------------------------------------------------------------------
class Producer(val settings: Settings, val address: String) {
var conn: Option[ProducerConnection] = None
override def toString(): String = {
return "<producer:" + address + ">"
}
private def connect(): Unit = {
// Keep trying to get a good connection, no matter what.
while (conn == None) {
println (this + " attempting to connect")
try {
conn = Some(HornetQ.getProducerConnection(settings, address))
conn.get.start()
println(this + " we got a connection")
}
catch {
case (th: Throwable) => {
println(this + " " + th.toString)
if (conn != None)
conn.get.stop() // Will clean up resources even if not running.
println(this + " waiting 2000 millis")
Thread.sleep(2000)
}
}
}
}
def send(message: String): Unit = {
send(Map.empty, message)
}
def send(properties: Map[String, String], message: String): Unit = {
var retry = true
while (retry) {
try {
conn match {
case None =>
throw new Exception("Producer not started or connection down.")
case Some(c) =>
c.send(HornetMessage(properties, message)) ;
// println("message successfully sent") ;
retry = false
}
}
catch {
case (th: Throwable) => {
println(this + " " + th.toString)
if (conn != None)
conn.get.stop() // Will clean up resources even if not running.
println(this + " trying to reconnect after a failed send")
conn = None
connect()
}
}
}
}
def startUp(): Unit = {
println("starting " + this)
connect()
}
def shutDown(): Unit = {
conn match {
case None => ;
case Some(c) => c.stop()
}
}
}
}
package zentrope.testhornet {
// This is a small app to see what it's like to work with
// HornetQ messaging in a Scala context. The main things I'm
// interested in are making working with HornetQ itself as a
// resource easier, and making convenience methods, classes, data
// structures easier for anyone using this code.
import scala.actors._
import zentrope.hornet._
object Util {
def addShutdownHook(body: => Unit) =
Runtime.getRuntime.addShutdownHook(new Thread {
override def run { body }
})
}
private case object GetOffStage
class PresenceReceiver extends DaemonActor {
// An actor for receiving messages from the "scala.presence" address.
def startUp() = {
start
}
def shutDown() =
this ! GetOffStage
def act() {
val settings = new Settings(user="guest", pass="guest")
val address = "scala.presence"
val tempQueue = "scala.test"
val handler = (msg: HornetMessage) => this ! msg
val inbox = new Consumer(settings, address, tempQueue, handler)
inbox.startUp
loop { react {
case HornetMessage(properties, body) => {
println("--------------------------------")
println("RECEIVE:")
properties foreach { case (key, value) =>
println(format(" + property: %s -> %s", key, value))
}
println(" + body: [" + body + "]")
println("--------------------------------")
}
case GetOffStage => {
println(" -- shutting down " + this)
inbox.shutDown
exit
}
case unknown =>
println(this + " got unknown message: " + unknown)
}}
}
}
class PresenceSender extends DaemonActor {
// An actor for periodically sending "presence" messages to the
// HornetQ address: "scala.presence"
private var counter: Int = 1
def startUp() = {
start()
}
def shutDown() =
this ! GetOffStage
private def timeout(outbox: Producer): Unit = {
// Message body
val msg = " msg: " + this + " (" + counter + ") "
// Message props
val props = Map[String, String] (
"message-id" -> System.currentTimeMillis().toString(),
"counter" -> counter.toString()
)
outbox.send(props, msg)
counter += 1
println("SENT: [" + msg + "]")
}
def act() {
def fiveSeconds = 5000
// Named parameters!
val settings = new Settings(user="guest", pass="guest")
val outbox: Producer = new Producer(settings, "scala.presence")
outbox.startUp
loop { reactWithin(fiveSeconds) {
case TIMEOUT =>
timeout(outbox)
case GetOffStage => {
outbox.shutDown
exit()
}
}}
}
}
class Lock {
// Super simple class to implement a lock on execution so this
// app will stay up until the user hits Control-C.
var o: AnyRef = new Object()
def acquire() : Unit = {
o.synchronized {
o.wait()
}
}
def release() : Unit = {
o.synchronized {
o.notifyAll()
}
}
}
object Main {
val lock: Lock = new Lock()
def main(args: Array[String]) = {
println("hello")
// Start a consumer
val consumer = new PresenceReceiver()
consumer.startUp
// Start a producer
val producer = new PresenceSender()
producer.startUp
// Make sure the consumer and producers get shut down
// when the user hits Control-C.
Util.addShutdownHook {
producer.shutDown()
consumer.shutDown()
Thread.sleep(4000)
lock.release
}
println("^C to shut down")
lock.acquire
System.exit(0)
}
}
}
@zentrope
Copy link
Author

zentrope commented Sep 4, 2010

HornetQ wrappers.

The second file is the example, which also shows the use of actors, one for sending out pings, another for receiving them. Fun!

The first file is a package with public objects to make it easy to interact with HornetQ: Publisher, Consumer, HornetMessage and Settings. These aren't really meant to be a "scala wrapper" for HornetQ, but just an example of wrapper HornetQ to make it simpler in a specific app. It's only useful if, for the most part, you're using sensible defaults and all you really want to do is consume, publish, and maybe change a few config options. Otherwise, this falls down.

Also, I've tried to build in some fault tolerance for the producer side of the equation. HornetQ consumers will wait forever for an instance to come back up, but producers won't. I'm not sure why. Perhaps I misunderstood some documentation somewhere?

The big learning for me was the use of the Option class, which helps, in a type safe way, do the sorts of things you might do with null. In other words, deal with uninitialized data types.

Unfortunately, figuring out a way to wrap Java libs is not really the best part about using Scala, but this is a nice little exercise anyway.

I think there's also room for improvement to try and exploit the OO features a bit more (which I generally don't do in Java because it makes things too spaghetti-like).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment