Skip to content

Instantly share code, notes, and snippets.

@zentrope
Created August 27, 2010 20:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save zentrope/554162 to your computer and use it in GitHub Desktop.
Save zentrope/554162 to your computer and use it in GitHub Desktop.
package zentrope.hornet {
// This is a small app to see what it's like to work with
// HornetQ messaging in a Scala context. The main things I'm
// interested in are making working with HornetQ itself as a
// resource easier, and making confience methods, classes, data
// structures easier for anyone using this code.
import scala.actors._
import scala.concurrent.ops.spawn
object Util {
def addShutdownHook(body: => Unit) =
Runtime.getRuntime.addShutdownHook(new Thread {
override def run { body }
})
def withWarning(body: => Unit) = {
// Wraps an exceptional condition we want to warn
// about, but let pass.
try {
body
}
catch {
case (th: Throwable) => {
println("WARN: " + th.toString())
}
}
}
def withLoggedExceptions(prompt: String)(body: => Unit) = {
// Executes BODY, and logs exceptions, if there are any, but
// doesn't do anything special to handle them.
try {
body
}
catch {
case (th: Throwable) =>
println(" -- " + prompt) ;
println("WARN: " + th.toString) ;
th.printStackTrace()
}
}
}
object Conn {
// Common connection oriented properties. Hard coded
// for now.
import org.hornetq.api.core.client._
val discovery: String = "231.7.7.7"
val port: Int = 9876
val user: String = "guest"
val pass: String = "guest"
val xa: Boolean = false
val autoCommitSends: Boolean = true
val autoCommitAcks: Boolean = true
val preAcknowledge: Boolean = false
val ackBatchSize: Int = HornetQClient.DEFAULT_ACK_BATCH_SIZE
}
case class HornetMessage(body: String)
// Just the body for now. Later, add a map of properties.
class Consumer(address: String, queue: String) extends DaemonActor {
// For now, nothing's durable.
import org.hornetq.api.core._
import org.hornetq.api.core.client._
// Public API
def startUp() =
start
def shutDown() =
this ! ShutDownPlease
def add(observer: Actor): Unit =
// TODO: Throw an exception if the actor isn't yet started.
// TODO: Do we really need more than one receiver of these
// messages? Instead, just make a new consumer with a
// different queue.
this ! AddObserver(observer)
// Internal
private def delegateToObservers(msg: HornetMessage): Unit = {
this ! msg
}
private case object ShutDownPlease
private case class AddObserver(actor: Actor)
private class InboxHandler(consumer: Consumer) extends MessageHandler {
def onMessage(msg: ClientMessage): Unit = {
Util.withLoggedExceptions("receiving message") {
val body = msg.getBodyBuffer().readString()
consumer.delegateToObservers(HornetMessage(body))
}
}
}
private def initFactory(): ClientSessionFactory = {
val factory: ClientSessionFactory = HornetQClient.createClientSessionFactory(Conn.discovery, Conn.port)
factory.setDiscoveryInitialWaitTimeout(60000)
factory.setBlockOnAcknowledge(true)
factory.setBlockOnNonDurableSend(true)
factory.setBlockOnDurableSend(true)
return factory
}
private def initSession(factory: ClientSessionFactory): ClientSession = {
var session: ClientSession = factory.createSession(Conn.user,
Conn.pass, Conn.xa, Conn.autoCommitSends,
Conn.autoCommitAcks, Conn.preAcknowledge, Conn.ackBatchSize)
Util.withWarning {
// Throws exception if queue already created.
session.createTemporaryQueue(address, queue)
}
return session
}
private def initConsumer(session: ClientSession): ClientConsumer = {
var consumer: ClientConsumer = session.createConsumer(queue)
consumer.setMessageHandler(new InboxHandler(this))
return consumer
}
private def close(factory: ClientSessionFactory,
session: ClientSession,
consumer: ClientConsumer) {
println("stopping " + this)
Util.withLoggedExceptions("closing factory " + this) {
if (factory != null)
factory.close()
}
Util.withLoggedExceptions("closing session " + this) {
if (session != null)
session.close()
}
Util.withLoggedExceptions("closing producer " + this) {
if (consumer != null)
consumer.close()
}
}
def act() {
val factory = initFactory()
val session = initSession(factory)
val consumer = initConsumer(session)
session.start()
var observers: List[Actor] = List()
loop {
react {
case AddObserver(observer) => {
println(this + " adding observer " + observer)
observers = observer :: observers
}
case HornetMessage(body) => {
observers foreach ( observer =>
// println("sending " + observer + " a message ")
observer ! HornetMessage(body)
)
}
case ShutDownPlease => {
println(this + " is shutting down")
close(factory, session, consumer)
exit
}
case unknown => {
println(this + " recvd unknown: " + unknown)
}
}
}
}
}
class Producer(val address: String) extends DaemonActor {
// For now, nothing's durable.
// And no fault tolerance.
// UGLY!
// For later: Have a "supervisor" actor
// which monitors for when a "producer" crashes. When that happens,
// the supervisor restarts it. Separate the concern for dealing with
// errors from the thing that has the errors.
import org.hornetq.api.core._
import org.hornetq.api.core.client._
private case object STOP_ACTOR
private case class SEND_MESSAGE(body: String)
def startUp() {
println("connecting " + this)
this.start
}
def shutDown() {
println("disconnecting " + this);
this ! STOP_ACTOR
}
override
def toString() : String =
"<producer:a=" + address + ">"
def send(body: String): Unit = {
// Let's just send only payloads: no properties.
this ! SEND_MESSAGE(body)
}
private
def close(factory: ClientSessionFactory,
session: ClientSession,
producer: ClientProducer) {
println("stopping " + this)
Util.withLoggedExceptions("closing factory " + this) {
if (factory != null)
factory.close()
}
Util.withLoggedExceptions("closing session " + this) {
if (session != null)
session.close()
}
Util.withLoggedExceptions("closing producer " + this) {
if (producer != null)
producer.close()
}
}
def act() {
var factory: ClientSessionFactory =
HornetQClient.createClientSessionFactory(Conn.discovery, Conn.port)
factory.setDiscoveryInitialWaitTimeout(60000)
factory.setBlockOnNonDurableSend(true)
var session: ClientSession = factory.createSession(Conn.user, Conn.pass,
Conn.xa, Conn.autoCommitSends,
Conn.autoCommitAcks, Conn.preAcknowledge, Conn.ackBatchSize)
var producer: ClientProducer = session.createProducer(address)
session.start()
loop {
react {
case SEND_MESSAGE(body) => {
Util.withLoggedExceptions("sending message to " + address) {
val msg: ClientMessage = session.createMessage(false)
msg.getBodyBuffer().writeString(body)
producer.send(msg)
}
}
case STOP_ACTOR => {
close(factory, session, producer)
exit
}
}
}
}
}
class PresenceSubscriber extends DaemonActor { // Need a "HornetActor" type
private case object GiveItUp
def startUp() = start
def shutDown() =
this ! GiveItUp
def act() {
loop {
react {
case HornetMessage(body) => {
println("GOT MESSAGE: " + body)
}
case GiveItUp => {
println(" -- shutting down " + this)
exit
}
case unknown =>
println(this + " got unknown message: " + unknown)
}
}
}
}
class PresenceNotifier extends DaemonActor {
private case object StopActor
private case object StartActor
def startUp() =
start()
def shutDown() =
this ! StopActor
private def timeout(): Unit = {
val msg = " ---> ping: " + this + " <---- "
println("sending [" + msg + "]")
PRESENCE_TOPIC.send(msg)
}
def act() {
// def producer = new Producer("test.scala")
// producer.startUp()
def fiveSeconds = 5000
loop {
reactWithin(fiveSeconds) {
case TIMEOUT => timeout()
case StopActor => {
exit()
}
}
}
}
}
class Lock {
var o: AnyRef = new Object()
def acquire() : Unit = {
o.synchronized {
o.wait()
}
}
def release() : Unit = {
o.synchronized {
o.notifyAll()
}
}
}
object PRESENCE_TOPIC extends Producer("scala.presence") {}
// So that we can reference this "statically" for those
// actors that want to send presence information.
object PRESENCE_CLIENT extends Consumer("scala.presence", "scala.queue")
object Main {
val lock: Lock = new Lock()
def main(args: Array[String]) = {
println("hello")
// Because the notifier and the "producer" are both actors,
// you can't start one inside the other.
PRESENCE_TOPIC.startUp()
PRESENCE_CLIENT.startUp()
val c = new PresenceSubscriber()
PRESENCE_CLIENT.add(c)
val p = new PresenceNotifier()
Util.addShutdownHook {
println("\nInvoking shutdown handler.")
println("PRESENCE_TOPIC")
PRESENCE_TOPIC.shutDown()
println("PRESENCE_CLIENT")
PRESENCE_CLIENT.shutDown()
println("PRODUCER")
p.shutDown
println("CONSUMER")
c.shutDown
println("Waiting 4 secs to give time for proper shutdowns.")
Thread.sleep(4000)
lock.release
}
c.startUp()
p.startUp
println("^C to shut down")
// Thread.sleep(30000)
lock.acquire
}
}
}
@zentrope
Copy link
Author

Some really, really messy, fragile code with hints about how to integrate with a Java MQ system like HornetQ.

Two issues I learned:

  • You can't start an actor in another actor. (Maybe you can, but things didn't work for me at this early learning stage). So, I'm groping toward a way to start up HornetQ Consumers and Producers as a kind of static, global object that actors and other code can then use or register with. Or might be that we pass references in some appropriate way. I bet when I figure this out, it'll turn out to be a lot better than my old way of doing this sort of thing. (Better == more readable, easier for someone else to figure out and use.)
  • You can't really manage "state" in a class if you can't declare a default value right off the top. If you want to create a Producer class which contains a ClientSessionFactory declaration, you have to declare it right there in the code, rather then having a method called later (with proper error handling) do it. Declaring a factory = null, then attempting to assign something useful to it later didn't work. Partly, I think, this is because Scala lets you shadow variables, and partly it's part of the language that you can't change a type on the fly (such as from null to ClientSessionFactory). Maybe the reason this even came up is because I'm integrating with a Java client lib which instantiates objects via factory instead of the normal way.

Okay, notes to myself. But there you go.

@zentrope
Copy link
Author

zentrope commented Sep 4, 2010

New version of this here:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment