Skip to content

Instantly share code, notes, and snippets.

Created August 27, 2010 20:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zentrope/554162 to your computer and use it in GitHub Desktop.
Save zentrope/554162 to your computer and use it in GitHub Desktop.
package zentrope.hornet {
// This is a small app to see what it's like to work with
// HornetQ messaging in a Scala context. The main things I'm
// interested in are making HornetQ itself easier to work with
// as a resource, and making convenience methods, classes, and data
// structures easier for anyone using this code.
import scala.actors._
import scala.concurrent.ops.spawn
// Small helpers for installing JVM shutdown hooks and for running code
// whose exceptions should be reported on stdout.
// NOTE(review): this listing appears to have lost lines (the try bodies
// and the closing braces are not visible), so the comments below describe
// only what is shown.
object Util {
// Registers BODY (a by-name block) to run inside a new Thread that is
// installed as a JVM shutdown hook.
def addShutdownHook(body: => Unit) =
Runtime.getRuntime.addShutdownHook(new Thread {
override def run { body }
// Runs BODY and prints a warning for anything it throws.
def withWarning(body: => Unit) = {
// Wraps an exceptional condition we want to warn
// about, but let pass.
try {
catch {
// Any Throwable is matched here and printed with a "WARN: " prefix.
case (th: Throwable) => {
println("WARN: " + th.toString())
// Like withWarning, but also prints a caller-supplied PROMPT so the
// output says which operation failed.
def withLoggedExceptions(prompt: String)(body: => Unit) = {
// Executes BODY, and logs exceptions, if there are any, but
// doesn't do anything special to handle them.
try {
catch {
// Prints the prompt line first, then the throwable itself.
case (th: Throwable) =>
println(" -- " + prompt) ;
println("WARN: " + th.toString) ;
// Shared connection settings read by Consumer and Producer when they
// create their HornetQ session factory and session.
object Conn {
// Common connection oriented properties. Hard coded
// for now.
import org.hornetq.api.core.client._
// discovery and port are passed together to
// HornetQClient.createClientSessionFactory.
// NOTE(review): presumably a discovery address and port -- confirm
// against the HornetQ API; the empty string here looks like a placeholder.
val discovery: String = ""
val port: Int = 9876
// Credentials handed to factory.createSession.
val user: String = "guest"
val pass: String = "guest"
// The remaining values are passed positionally to factory.createSession,
// in this order, after user and pass.
val xa: Boolean = false
val autoCommitSends: Boolean = true
val autoCommitAcks: Boolean = true
val preAcknowledge: Boolean = false
// Uses the client library's own default rather than a local number.
val ackBatchSize: Int = HornetQClient.DEFAULT_ACK_BATCH_SIZE
// Message value passed between actors in this file: Consumer forwards one
// to each registered observer, and PresenceSubscriber matches on it.
case class HornetMessage(body: String)
// Just the body for now. Later, add a map of properties.
// Actor that owns a HornetQ factory/session/consumer for QUEUE on ADDRESS
// and forwards each HornetMessage it receives to its registered observers.
// NOTE(review): this listing appears to have lost lines (some method
// bodies and all closing braces are not visible); the comments describe
// only what is shown and the missing lines are not reconstructed here.
class Consumer(address: String, queue: String) extends DaemonActor {
// For now, nothing's durable.
import org.hornetq.api.core._
import org.hornetq.api.core.client._
// Public API
// NOTE(review): the body of startUp is not visible in this listing.
def startUp() =
// Requests shutdown by posting ShutDownPlease to this actor's own mailbox.
def shutDown() =
this ! ShutDownPlease
// Registers OBSERVER by posting AddObserver to this actor's own mailbox.
def add(observer: Actor): Unit =
// TODO: Throw an exception if the actor isn't yet started.
// TODO: Do we really need more than one receiver of these
// messages? Instead, just make a new consumer with a
// different queue.
this ! AddObserver(observer)
// Internal
// Re-posts MSG to this actor so that act() can fan it out to observers.
private def delegateToObservers(msg: HornetMessage): Unit = {
this ! msg
// Private control messages understood by act().
private case object ShutDownPlease
private case class AddObserver(actor: Actor)
// HornetQ MessageHandler installed on the ClientConsumer by initConsumer.
private class InboxHandler(consumer: Consumer) extends MessageHandler {
// Reads the message body as a string; failures are logged via
// Util.withLoggedExceptions.
// NOTE(review): what happens to BODY afterwards is not visible here.
def onMessage(msg: ClientMessage): Unit = {
Util.withLoggedExceptions("receiving message") {
val body = msg.getBodyBuffer().readString()
// Creates the session factory from the shared Conn settings.
private def initFactory(): ClientSessionFactory = {
val factory: ClientSessionFactory = HornetQClient.createClientSessionFactory(Conn.discovery, Conn.port)
return factory
// Creates a session from the shared Conn settings and then tries to create
// the temporary queue, warning instead of failing if that throws.
private def initSession(factory: ClientSessionFactory): ClientSession = {
var session: ClientSession = factory.createSession(Conn.user,
Conn.pass, Conn.xa, Conn.autoCommitSends,
Conn.autoCommitAcks, Conn.preAcknowledge, Conn.ackBatchSize)
Util.withWarning {
// Throws exception if queue already created.
session.createTemporaryQueue(address, queue)
return session
// Creates a ClientConsumer on QUEUE and installs an InboxHandler on it.
private def initConsumer(session: ClientSession): ClientConsumer = {
var consumer: ClientConsumer = session.createConsumer(queue)
consumer.setMessageHandler(new InboxHandler(this))
return consumer
// Tears down the three HornetQ handles. Each step is null-guarded and
// wrapped in Util.withLoggedExceptions so that one failure is logged
// without skipping the others.
// NOTE(review): the statements inside each null check are not visible.
private def close(factory: ClientSessionFactory,
session: ClientSession,
consumer: ClientConsumer) {
println("stopping " + this)
Util.withLoggedExceptions("closing factory " + this) {
if (factory != null)
Util.withLoggedExceptions("closing session " + this) {
if (session != null)
// Fixed: this prompt used to say "closing producer", copied from
// Producer.close, which made failures here look like producer failures.
Util.withLoggedExceptions("closing consumer " + this) {
if (consumer != null)
// Actor body: sets up the HornetQ handles, then loops over the mailbox.
def act() {
val factory = initFactory()
val session = initSession(factory)
val consumer = initConsumer(session)
// Observers live in a local var, so only this actor ever touches the list.
var observers: List[Actor] = List()
loop {
react {
// Prepends the new observer to the list.
case AddObserver(observer) => {
println(this + " adding observer " + observer)
observers = observer :: observers
// Sends a fresh HornetMessage with the same body to every observer.
case HornetMessage(body) => {
observers foreach ( observer =>
// println("sending " + observer + " a message ")
observer ! HornetMessage(body)
// Releases the HornetQ handles.
case ShutDownPlease => {
println(this + " is shutting down")
close(factory, session, consumer)
// Anything else is reported on stdout.
case unknown => {
println(this + " recvd unknown: " + unknown)
// Actor that owns a HornetQ factory/session/producer for ADDRESS and sends
// string payloads posted to it through send().
// NOTE(review): this listing appears to have lost lines (some method
// bodies and all closing braces are not visible); the comments describe
// only what is shown and the missing lines are not reconstructed here.
class Producer(val address: String) extends DaemonActor {
// For now, nothing's durable.
// And no fault tolerance.
// UGLY!
// For later: Have a "supervisor" actor
// which monitors for when a "producer" crashes. When that happens,
// the supervisor restarts it. Separate the concern for dealing with
// errors from the thing that has the errors.
import org.hornetq.api.core._
import org.hornetq.api.core.client._
// Private control messages understood by act().
private case object STOP_ACTOR
private case class SEND_MESSAGE(body: String)
// NOTE(review): only the log line of startUp is visible in this listing.
def startUp() {
println("connecting " + this)
// NOTE(review): only the log line of shutDown is visible in this listing.
def shutDown() {
println("disconnecting " + this);
// Short label used in the log lines above. `override` is required because
// this replaces AnyRef.toString; Scala rejects the definition without it.
override def toString() : String =
"<producer:a=" + address + ">"
// Queues BODY for sending by posting SEND_MESSAGE to this actor's mailbox.
def send(body: String): Unit = {
// Let's just send only payloads: no properties.
this ! SEND_MESSAGE(body)
// Tears down the three HornetQ handles. Each step is null-guarded and
// wrapped in Util.withLoggedExceptions so that one failure is logged
// without skipping the others.
// NOTE(review): the statements inside each null check are not visible.
def close(factory: ClientSessionFactory,
session: ClientSession,
producer: ClientProducer) {
println("stopping " + this)
Util.withLoggedExceptions("closing factory " + this) {
if (factory != null)
Util.withLoggedExceptions("closing session " + this) {
if (session != null)
Util.withLoggedExceptions("closing producer " + this) {
if (producer != null)
// Actor body: creates the HornetQ handles from the shared Conn settings,
// then loops over the mailbox.
def act() {
var factory: ClientSessionFactory =
HornetQClient.createClientSessionFactory(Conn.discovery, Conn.port)
var session: ClientSession = factory.createSession(Conn.user, Conn.pass,
Conn.xa, Conn.autoCommitSends,
Conn.autoCommitAcks, Conn.preAcknowledge, Conn.ackBatchSize)
var producer: ClientProducer = session.createProducer(address)
loop {
react {
// Creates a message via session.createMessage(false); failures are
// logged, not rethrown.
// NOTE(review): presumably `false` means non-durable -- confirm against
// the HornetQ API. The lines that fill in and send the message are not
// visible here.
case SEND_MESSAGE(body) => {
Util.withLoggedExceptions("sending message to " + address) {
val msg: ClientMessage = session.createMessage(false)
// Releases the HornetQ handles.
case STOP_ACTOR => {
close(factory, session, producer)
// Actor that prints the body of every HornetMessage it receives.
// NOTE(review): closing braces, and possibly other lines, are not visible
// in this listing.
class PresenceSubscriber extends DaemonActor { // Need a "HornetActor" type
// Private control message used by shutDown().
private case object GiveItUp
// Starts the actor.
def startUp() = start
// Requests shutdown by posting GiveItUp to this actor's own mailbox.
def shutDown() =
this ! GiveItUp
def act() {
loop {
react {
// Prints the payload to stdout.
case HornetMessage(body) => {
println("GOT MESSAGE: " + body)
// NOTE(review): only the log line is visible here; how the loop is
// left is not shown.
case GiveItUp => {
println(" -- shutting down " + this)
// Anything else is reported on stdout.
case unknown =>
println(this + " got unknown message: " + unknown)
// Actor that runs timeout() whenever five seconds pass with no message
// in its mailbox.
// NOTE(review): several lines (method bodies, closing braces) are not
// visible in this listing.
class PresenceNotifier extends DaemonActor {
// Private control messages. StartActor is declared but not used in the
// visible code.
private case object StopActor
private case object StartActor
// NOTE(review): the body of startUp is not visible in this listing.
def startUp() =
// Requests shutdown by posting StopActor to this actor's own mailbox.
def shutDown() =
this ! StopActor
// Builds a ping string naming this actor and prints it.
// NOTE(review): presumably the ping is also published somewhere; any such
// line is not visible here.
private def timeout(): Unit = {
val msg = " ---> ping: " + this + " <---- "
println("sending [" + msg + "]")
def act() {
// def producer = new Producer("test.scala")
// producer.startUp()
// Value passed to reactWithin below.
def fiveSeconds = 5000
loop {
reactWithin(fiveSeconds) {
// No message arrived within the window: run timeout().
case TIMEOUT => timeout()
// NOTE(review): the body of this case is not visible in this listing.
case StopActor => {
// Small lock built on the monitor of a private object.
// NOTE(review): the statements inside both synchronized blocks are not
// visible in this listing; presumably wait/notify -- confirm against the
// original gist.
class Lock {
// Monitor object that both methods synchronize on.
var o: AnyRef = new Object()
def acquire() : Unit = {
o.synchronized {
def release() : Unit = {
o.synchronized {
// Singleton Producer bound to the "scala.presence" address.
object PRESENCE_TOPIC extends Producer("scala.presence") {}
// So that we can reference this "statically" for those
// actors that want to send presence information.
// Singleton Consumer on the same address, reading queue "scala.queue".
object PRESENCE_CLIENT extends Consumer("scala.presence", "scala.queue")
// Program entry point.
// NOTE(review): most of the body of main (start-up calls, the contents of
// the shutdown hook, closing braces) is not visible in this listing.
object Main {
// NOTE(review): the use of this lock is not visible here; presumably it
// parks the main thread until shutdown -- confirm.
val lock: Lock = new Lock()
def main(args: Array[String]) = {
// Because the notifier and the "producer" are both actors,
// you can't start one inside the other.
val c = new PresenceSubscriber()
val p = new PresenceNotifier()
// The hook runs at JVM shutdown and announces itself on stdout.
Util.addShutdownHook {
println("\nInvoking shutdown handler.")
println("Waiting 4 secs to give time for proper shutdowns.")
println("^C to shut down")
// Thread.sleep(30000)
Copy link

Some really, really messy, fragile code with hints about how to integrate with a Java MQ system like HornetQ.

Two issues I learned:

  • You can't start an actor in another actor. (Maybe you can, but things didn't work for me at this early learning stage.) So, I'm groping toward a way to start up HornetQ Consumers and Producers as a kind of static, global object that actors and other code can then use or register with. Or it might be that we pass references in some appropriate way. I bet when I figure this out, it'll turn out to be a lot better than my old way of doing this sort of thing. (Better == more readable, easier for someone else to figure out and use.)
  • You can't really manage "state" in a class if you can't declare a default value right off the top. If you want to create a Producer class which contains a ClientSessionFactory declaration, you have to declare it right there in the code, rather than having a method called later (with proper error handling) do it. Declaring a factory = null, then attempting to assign something useful to it later, didn't work. Partly, I think, this is because Scala lets you shadow variables, and partly it's because the language doesn't let you change a type on the fly (such as from null to ClientSessionFactory). Maybe the reason this even came up is that I'm integrating with a Java client lib which instantiates objects via a factory instead of the normal way.

Okay, notes to myself. But there you go.

Copy link

zentrope commented Sep 4, 2010

New version of this here:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment