Skip to content

Instantly share code, notes, and snippets.

@stephanos
Created October 28, 2010 19:09
Show Gist options
  • Save stephanos/652097 to your computer and use it in GitHub Desktop.
Save stephanos/652097 to your computer and use it in GitHub Desktop.
import scala.reflect.BeanInfo
import se.scalablesolutions.akka.amqp._
import sjson.json.Serializer
import se.scalablesolutions.akka.amqp.AMQP._
import se.scalablesolutions.akka.actor._
/**
* An attempt to make the communication over AMQP sort of 'type-safe'.
*
* - The producer and the consumer have been merged to one 'messenger' which is connected with a 'routing key parent'
* - Each routing key has an object representing it (and a 'group' it belongs to)
* - Furthermore each routing key has a value object type associated with it
*
* This allows a type-safe construct like the following:
* logMessenger ! LogMessageKey(new LogMessage("heureka"))
*
* The only problem remaining is: how to ensure type-safety on the receiving side.
* The receiver gets the key as a string and the data as a byte array.
* The key can be parsed at runtime using reflection - this allows the 'listener' method to match on all suitable
* keys -> type safe! (A warning about the match 'not being exhaustive' used to be issued if one type was missing -
* unfortunately this is no longer the case since I added the [B <: ValueObj] bounds etc. :( )
*
* But how to parse the data and allow the compiler to type check?
*
* PS: ASSUMING akka 1.0-M1 and a local running RabbitMQ instance
*/
/**
 * Demo entry point.
 *
 * Object initialization opens the AMQP connection, describes the topic exchange,
 * creates the `LogMessenger` actor and registers it as the consumer for "log.#".
 * `main` then starts the actor and sends one request per logging key.
 */
object Main {
  val conn = AMQP.newConnection()
  val exchange = ExchangeParameters("amqp-type-safe-test-exchange", Topic)
  val logMessenger = Actor.actorOf[LogMessenger]

  // NOTE(review): the consumer is registered here, during object initialization,
  // i.e. before `main` calls `logMessenger.start` - confirm that no delivery can
  // reach the actor in between.
  AMQP.newConsumer(
    conn,
    ConsumerParameters("log.#", logMessenger, None, Some(exchange)))

  /** Starts the messenger and pushes one sample value object through each logging key. */
  def main(args: Array[String]): Unit = {
    logMessenger.start
    logMessenger ! LogMessageKey(LogMessage("heureka"))
    logMessenger ! LogTimingKey(LogTiming(1))
  }
}
// ==================================================================================================
// === Value Objects - sent through pipe
/** Marker trait for the value objects sent through the pipe; `Key` and `Request` bound their type parameter by it. */
trait ValueObj
/**
 * Value object carrying the textual result of an operation.
 *
 * @param res the result text
 */
@BeanInfo
case class LogMessage(res: String) extends ValueObj {

  /** No-argument constructor (empty result); a default constructor is necessary for deserialization. */
  def this() = this("")

  override def toString: String = "operation yields " + res
}
/**
 * Value object carrying the duration of an operation.
 *
 * @param sec the duration in seconds
 */
@BeanInfo
case class LogTiming(sec: Int) extends ValueObj {

  /** No-argument constructor (duration -1); a default constructor is necessary for deserialization. */
  def this() = this(-1)

  // The original text kept a literal "X" placeholder and appended the value at the
  // end ("operation took X seconds 1"); the value now takes the placeholder's place.
  override def toString: String = "operation took " + sec + " seconds"
}
// ==================================================================================================
// === Routing Keys - tell the VO where to go
/**
 * Base class of all routing keys. A key knows its routing-key string and the
 * value-object type `B` that travels under it.
 *
 * @param name the routing-key string; also returned by `toString`
 * @tparam B   the value-object type associated with this key
 */
sealed abstract class Key[B <: ValueObj](name: String)(implicit m: Manifest[B]) {

  /** Wraps `data` in a `Request` addressed to this key. */
  def apply(data: B): Request[B] = new Request[B](data, this)

  /** Deserializes `data` into this key's value-object type via `MySerializer.unpack`. */
  def unpack(data: Array[Byte]): B = MySerializer.unpack[B](data)

  override def toString: String = name
}
/**
 * Common parent of the logging routing keys; used as the key group of `LogMessenger`.
 *
 * @param name the routing-key string, passed on to `Key`
 * @tparam B   the value-object type associated with the key
 */
sealed abstract class LoggingKeyGroup[B <: ValueObj](name: String)(implicit m: Manifest[B])
  extends Key[B](name)
/** Routing key "log.msg"; its associated value object is `LogMessage`. */
object LogMessageKey extends LoggingKeyGroup[LogMessage]("log.msg")
/** Routing key "log.time"; its associated value object is `LogTiming`. */
object LogTimingKey extends LoggingKeyGroup[LogTiming]("log.time")
/**
 * A value object paired with the routing key it should be sent under.
 * Created by `Key.apply`; the shared type parameter ties the data type to the key's type.
 *
 * @param data the value object to send
 * @param key  the routing key the data is addressed to
 */
case class Request[T <: ValueObj](data: T, key: Key[T])(implicit m: Manifest[T])
// ==================================================================================================
// === Messenger - Receives routing keys (as string) and value objects (as byte array)
/**
 * Actor that acts as both producer and consumer for one group of routing keys.
 *
 *  - A `Request` is serialized and handed to `producer` under the request's key.
 *  - A `Delivery` has its payload stored in `_rawData`; its routing-key string is
 *    then resolved to one of `_keys` and passed to `myReceive`.
 *
 * Deliveries whose key is unknown, or for which `myReceive` is not defined, are
 * logged and skipped instead of failing the actor with an exception.
 *
 * @tparam R the key group this messenger handles
 */
abstract class AbstractMessenger[R <: Key[_]](implicit m: Manifest[R]) extends Actor {

  /** Payload of the delivery currently being handled; assigned before `myReceive` is consulted. */
  var _rawData: Array[Byte] = _

  /** The keys of this messenger's group, as returned by `calcKeys`; computed on first use. */
  lazy val _keys: List[R] = calcKeys(m.erasure.asInstanceOf[Class[Key[_]]]).asInstanceOf[List[R]]

  /** Actor that outgoing messages are handed to. */
  val producer: ActorRef

  protected def receive = {
    case Delivery(body, key, _, _, _) =>
      _rawData = body
      log.info("received message from AMQP with key " + key.toString)
      handleDelivery(key.toString)
    case Request(body, key) =>
      log.info("sending message to AMQP with key " + key.toString)
      send(body, key)
    case other =>
      log.info("received unknown message: " + other.toString)
  }

  /** Resolves `keyName` to a known key and runs the matching `myReceive` case, if there is one. */
  private def handleDelivery(keyName: String): Unit = {
    _keys.find(k => k.toString == keyName) match {
      case Some(routingKey) =>
        val handler = myReceive()
        // A partial function applied outside its domain throws a MatchError; check first.
        if (handler.isDefinedAt(routingKey)) handler(routingKey)
        else log.info("no handler defined for AMQP key " + keyName)
      case None =>
        log.info("ignoring AMQP message with unknown key " + keyName)
    }
  }

  /** Serializes `data` with `MySerializer.pack` and passes it to `producer` under `key`'s name. */
  protected def send[D](data: D, key: Key[_])(implicit m: Manifest[D]): Unit = {
    producer ! Message(MySerializer.pack(data.asInstanceOf[AnyRef]), key.toString)
  }

  /**
   * Returns the element of `keys` whose `toString` equals `keyName`.
   *
   * @throws NoSuchElementException if no key has that name
   */
  def wrapKey[K <: Key[_]](keys: List[K], keyName: String): K =
    keys.find(k => k.toString == keyName).getOrElse(
      throw new NoSuchElementException("no routing key named " + keyName))

  /** Handler invoked with the resolved key of each delivery; the payload is available in `_rawData`. */
  def myReceive(): PartialFunction[R, Unit]

  /* PS: internally I use Spring's ClassPathScanningCandidateComponentProvider to find all children of the base class */
  def calcKeys(r: Class[Key[_]]): List[Key[_]] = // mock version (just for this example)
    List(LogMessageKey, LogTimingKey)
}
/**
 * Messenger for the logging key group: publishes through a producer on
 * `Main.exchange` and hands each received value object to `BusinessLogicMock`.
 */
class LogMessenger extends AbstractMessenger[LoggingKeyGroup[_]] {

  lazy val producer = AMQP.newProducer(Main.conn, ProducerParameters(Some(Main.exchange), producerId = Some("my_producer")))

  /**
   * Dispatches on the resolved routing key.
   *
   * The payload is decoded through the matched key's own `unpack`, so the
   * value-object type comes from the key itself rather than being repeated
   * by hand next to it.
   */
  def myReceive() = {
    case LogMessageKey => BusinessLogicMock.processA(LogMessageKey.unpack(_rawData))
    case LogTimingKey => BusinessLogicMock.processB(LogTimingKey.unpack(_rawData))
  }
}
// ==================================================================================================
// === Business Logic - where the value objects are eventually used
/** Stand-in for the business logic: each handler prints the value object it is given. */
object BusinessLogicMock {

  /** Prints the string form of a `LogMessage`. */
  def processA(vo: LogMessage): Unit = println(vo.toString)

  /** Prints the string form of a `LogTiming`. */
  def processB(vo: LogTiming): Unit = println(vo.toString)
}
// ==================================================================================================
// === Serializer - used to reconstruct the BaseVO object from byte data/string
/** Thin wrapper around sjson's `Serializer.SJSON`, used to turn value objects into bytes and back. */
object MySerializer {

  /** Serializes `obj` with `Serializer.SJSON.out`. */
  def pack(obj: AnyRef): Array[Byte] = Serializer.SJSON.out(obj)

  /**
   * Deserializes `bytes` into a `T` with `Serializer.SJSON.in`.
   *
   * The bytes are decoded explicitly as UTF-8 rather than with the platform
   * default charset, so the result does not depend on the JVM's locale settings.
   * NOTE(review): assumes `SJSON.out` emits UTF-8 encoded JSON - confirm against sjson.
   */
  def unpack[T](bytes: Array[Byte])(implicit m: Manifest[T]): T =
    Serializer.SJSON.in[T](new String(bytes, "UTF-8")).asInstanceOf[T]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment