Created
October 28, 2010 19:09
-
-
Save stephanos/652097 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.reflect.BeanInfo | |
import se.scalablesolutions.akka.amqp._ | |
import sjson.json.Serializer | |
import se.scalablesolutions.akka.amqp.AMQP._ | |
import se.scalablesolutions.akka.actor._ | |
/** | |
* An attempt to make the communication over AMQP sort of 'type-safe'. | |
* | |
* - The producer and the consumer have been merged to one 'messenger' which is connected with a 'routing key parent' | |
* - Each routing key has an object representing it (and a 'group' it belongs to) | |
* - Furthermore each routing key has a value object type associated with it | |
* | |
* This allows a type-safe construct like the following: | |
* logMessenger ! LogMessageKey(new LogMessage("heureka")) | |
* | |
* The only problem remaining is: how to ensure type-safety on the receiving side. | |
* The receiver gets the key as a string and the data as a byte array. | |
* The key can be parsed at runtime using reflection - this allows the 'listener' method match for all suitable | |
* keys -> type safe! (a warning about 'not being exhaustive' is issued if one type is missing - well, since | |
* I added the [B <: ValueObj] etc. this unfortunately wasn't the case anymore :( ) | |
* | |
* But how to parse the data and allow the compiler to type check? | |
* | |
* PS: ASSUMING akka 1.0-M1 and a local running RabbitMQ instance | |
*/ | |
/**
 * Entry point of the example.
 *
 * Initializing this object opens the AMQP connection, declares the topic exchange
 * parameters, creates the (not yet started) log messenger actor and registers it as
 * an AMQP consumer using the string "log.#"
 * (presumably the routing-key pattern — confirm against ConsumerParameters).
 */
object Main {
  // Connection and exchange are also read by LogMessenger when it builds its producer.
  val conn = AMQP.newConnection()
  val exchange = ExchangeParameters("amqp-type-safe-test-exchange", Topic)

  val logMessenger = Actor.actorOf[LogMessenger]

  AMQP.newConsumer(conn, ConsumerParameters("log.#", logMessenger, None, Some(exchange)))

  /** Starts the messenger and pushes one request per key type through it. */
  def main(args: Array[String]): Unit = {
    logMessenger.start
    // Key.apply wraps the value object into a Request carrying the matching key.
    logMessenger ! LogMessageKey(LogMessage("heureka"))
    logMessenger ! LogTimingKey(LogTiming(1))
  }
}
// ================================================================================================== | |
// === Value Objects - sent through pipe | |
/** Marker trait for value objects; it is the upper bound of the payload type of Key and Request. */
trait ValueObj
/**
 * Value object carrying the textual result of an operation.
 *
 * @param res the result text
 */
@BeanInfo case class LogMessage(res: String) extends ValueObj {
  // default constructor is necessary for deserialization
  def this() = this("")

  /** Human-readable form: the fixed prefix followed by `res`. */
  override def toString: String = "operation yields " + res
}
/**
 * Value object carrying the duration of an operation.
 *
 * @param sec the duration in seconds
 */
@BeanInfo case class LogTiming(sec: Int) extends ValueObj {
  // default constructor is necessary for deserialization
  def this() = this(-1)

  /**
   * Human-readable form, e.g. "operation took 1 seconds".
   * The value is placed inside the sentence rather than appended after a literal "X".
   */
  override def toString: String = "operation took " + sec + " seconds"
}
// ================================================================================================== | |
// === Routing Keys - tell the VO where to go | |
/**
 * A routing key that knows the type of value object travelling under it.
 *
 * @param name the routing key string; also returned by `toString`
 * @param m    manifest of the payload type, passed on to the serializer and to Request
 * @tparam B   the payload type associated with this key
 */
abstract sealed class Key[B <: ValueObj](name: String)(implicit m: Manifest[B]) {

  /** Deserializes `data` into this key's payload type via MySerializer. */
  def unpack(data: Array[Byte]): B = MySerializer.unpack[B](data)

  /** Pairs `data` with this key, producing the request that a messenger can send. */
  def apply(data: B): Request[B] = new Request[B](data, this)

  /** The routing key string. */
  override def toString: String = name
}
/**
 * Group of all logging-related routing keys; LogMessenger is parameterized with this type.
 *
 * @param name the routing key string, handed to Key unchanged
 * @tparam B   the payload type associated with the key
 */
abstract sealed class LoggingKeyGroup[B <: ValueObj](name: String)(implicit m: Manifest[B])
  extends Key[B](name)
/** Logging key with routing key string "log.msg"; its payload type is LogMessage. */
object LogMessageKey extends LoggingKeyGroup[LogMessage]("log.msg")
/** Logging key with routing key string "log.time"; its payload type is LogTiming. */
object LogTimingKey extends LoggingKeyGroup[LogTiming]("log.time")
/**
 * Outgoing message: a value object together with the key it is to be sent under.
 * Instances are created by Key.apply; AbstractMessenger.receive matches on them and
 * passes `data` and `key` to its `send` method.
 */
case class Request[T <: ValueObj](data: T, key: Key[T])(implicit m: Manifest[T])
// ================================================================================================== | |
// === Messenger - Receives routing keys (as string) and value objects (as byte array) | |
/**
 * Base actor that acts as both AMQP producer front-end and consumer callback for one
 * group of routing keys.
 *
 * Incoming `Delivery` messages are resolved to their key object and handed to
 * `myReceive`; incoming `Request` messages are serialized and forwarded to `producer`.
 *
 * @tparam R the key (group) type this messenger handles
 */
abstract class AbstractMessenger[R <: Key[_]](implicit m: Manifest[R]) extends Actor {

  /** Body of the delivery currently being processed; assigned right before `myReceive` is consulted. */
  var _rawData: Array[Byte] = _

  /** Key objects known to this messenger, computed on first access (the casts are unchecked). */
  lazy val _keys: List[R] = calcKeys(m.erasure.asInstanceOf[Class[Key[_]]]).asInstanceOf[List[R]]

  /** Actor that receives the outgoing `Message`s; supplied by the concrete messenger. */
  val producer: ActorRef

  protected def receive = {
    case Delivery(body, key, _, _, _) =>
      _rawData = body
      log.info("received message from AMQP with key " + key.toString)
      handleDelivery(key.toString)
    case Request(body, key) =>
      log.info("sending message to AMQP with key " + key.toString)
      send(body, key)
    case other =>
      log.info("received unknown message: " + other.toString)
  }

  /**
   * Resolves `keyName` to a key object and applies the subclass handler to it.
   *
   * A routing key without a key object, or a key the handler is not defined for, is
   * logged instead of raising NoSuchElementException / MatchError inside the actor.
   */
  private def handleDelivery(keyName: String): Unit = {
    _keys.find(k => k.toString == keyName) match {
      case Some(typedKey) =>
        val handler = myReceive()
        if (handler.isDefinedAt(typedKey)) handler(typedKey)
        else log.info("no handler defined for key " + keyName)
      case None =>
        log.info("no key object registered for routing key " + keyName)
    }
  }

  /** Serializes `data` with MySerializer and sends it to `producer` under the key's routing string. */
  protected def send[D](data: D, key: Key[_])(implicit m: Manifest[D]): Unit = {
    producer ! Message(MySerializer.pack(data.asInstanceOf[AnyRef]), key.toString)
  }

  /**
   * Returns the element of `keys` whose `toString` equals `keyName`.
   *
   * @throws NoSuchElementException if no such key exists
   */
  def wrapKey[K <: Key[_]](keys: List[K], keyName: String): K =
    keys.find(k => k.toString == keyName).getOrElse(
      throw new NoSuchElementException("no key registered for routing key '" + keyName + "'"))

  /** Handler for received keys; implementations read the payload from `_rawData`. */
  def myReceive(): PartialFunction[R, Unit]

  /* PS: internally I use Spring's ClassPathScanningCandidateComponentProvider to find all children of the base class */
  /** Mock version (just for this example): ignores `r` and returns the two logging keys. */
  def calcKeys(r: Class[Key[_]]): List[Key[_]] =
    List(LogMessageKey, LogTimingKey)
}
/**
 * Messenger for the logging key group: publishes through an AMQP producer built from
 * Main's connection and exchange, and dispatches received keys to BusinessLogicMock.
 */
class LogMessenger extends AbstractMessenger[LoggingKeyGroup[_]] {

  lazy val producer = AMQP.newProducer(Main.conn, ProducerParameters(Some(Main.exchange), producerId = Some("my_producer")))

  /**
   * Handles each logging key by deserializing `_rawData` with that key's own `unpack`.
   *
   * Every key object already fixes its payload type (LogMessageKey is a Key[LogMessage],
   * LogTimingKey is a Key[LogTiming]), so the compiler derives the result type from the
   * key and the value-object type no longer has to be repeated by hand.
   */
  def myReceive() = {
    case LogMessageKey => BusinessLogicMock.processA(LogMessageKey.unpack(_rawData))
    case LogTimingKey => BusinessLogicMock.processB(LogTimingKey.unpack(_rawData))
  }
}
// ================================================================================================== | |
// === Business Logic - where the value objects are eventually used | |
/** Stand-in for the real business logic: each method prints the value object it is given. */
object BusinessLogicMock {

  /** Prints the string form of a LogMessage to standard output. */
  def processA(vo: LogMessage): Unit = println(vo.toString)

  /** Prints the string form of a LogTiming to standard output. */
  def processB(vo: LogTiming): Unit = println(vo.toString)
}
// ================================================================================================== | |
// === Serializer - used to reconstruct the BaseVO object from byte data/string | |
/** Thin wrapper around sjson's `Serializer.SJSON` used to move value objects over the wire. */
object MySerializer {

  /** Serializes `obj` with `Serializer.SJSON.out`. */
  def pack(obj: AnyRef) = Serializer.SJSON.out(obj)

  /**
   * Deserializes `bytes` into a `T` with `Serializer.SJSON.in`.
   *
   * The bytes are decoded explicitly as UTF-8 instead of with the platform default
   * charset, so the result does not depend on the JVM's locale settings.
   * NOTE(review): assumes `pack` emits UTF-8 encoded JSON — confirm against the sjson version in use.
   *
   * @param bytes the serialized form
   * @param m     manifest of the target type, passed on to sjson
   */
  def unpack[T](bytes: Array[Byte])(implicit m: Manifest[T]): T =
    Serializer.SJSON.in[T](new String(bytes, "UTF-8")).asInstanceOf[T]
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment