Skip to content

Instantly share code, notes, and snippets.

@arturaz
Created October 8, 2014 10:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arturaz/86ad73dce5e75761810b to your computer and use it in GitHub Desktop.
Save arturaz/86ad73dce5e75761810b to your computer and use it in GitHub Desktop.
package utils.actors
import akka.actor.{ActorLogging, Actor, ActorRef}
import akka.util.ByteString
import scala.reflect.ClassTag
/** Messages understood by the [[CodedFrameProxy]] actor. */
object CodedFrameProxy {

  /**
   * Initialization message that wires the proxy to its two peers.
   *
   * @param intFramedProxy        actor that receives encoded outgoing messages
   *                              wrapped in `IntFramedProxy.Frame`
   * @param decodedMessageHandler actor that receives successfully decoded
   *                              incoming messages
   */
  case class Init(
    intFramedProxy: ActorRef,
    decodedMessageHandler: ActorRef
  )
}
/**
 * Actor that translates between raw `IntFramedProxy.Frame` payloads and typed
 * messages.
 *
 * The actor starts uninitialized and only handles [[CodedFrameProxy.Init]].
 * Once initialized:
 *  - every `IntFramedProxy.Frame` is passed through `decoder`; a `Right` value
 *    is sent to `init.decodedMessageHandler`, a `Left` is logged as a warning
 *    and the frame is dropped;
 *  - every message of type `Out` is passed through `encoder`, wrapped in
 *    `IntFramedProxy.Frame` and sent to `init.intFramedProxy`.
 *
 * In both directions the message is sent with the original sender preserved.
 *
 * NOTE(review): both actors from `Init` are watched, but `Terminated` is not
 * handled in this class - presumably this relies on Akka's death pact to stop
 * the proxy when either peer dies; confirm that this is intended.
 *
 * @param decoder turns frame payload into either an error description or an
 *                incoming message
 * @param encoder turns an outgoing message into frame payload
 * @tparam In  type of decoded incoming messages
 * @tparam Out type of outgoing messages; the `ClassTag` is required so that
 *             `case msg: Out` can be checked at runtime despite erasure
 */
class CodedFrameProxy[In, Out : ClassTag](
  decoder: ByteString => Either[String, In], encoder: Out => ByteString
) extends Actor with ActorLogging {
  import CodedFrameProxy._

  override def receive: Receive = waitingForInit

  /** Initial behavior: watch both peers from the first `Init` and switch to `initialized`. */
  private[this] def waitingForInit: Receive = {
    case init: Init =>
      context.watch(init.intFramedProxy)
      context.watch(init.decodedMessageHandler)
      context.become(initialized(init))
  }

  /** Behavior after initialization; `init` stays fixed, later `Init` messages are only logged. */
  private[this] def initialized(init: Init): Receive = {
    case newInit: Init =>
      // Template form: the message is only formatted when warning level is enabled.
      log.warning("Received new init ({}) while already initialized with {}", newInit, init)
    case IntFramedProxy.Frame(data) =>
      decoder(data) match {
        case Left(err) => log.warning("Cannot decode {}: {}", data, err)
        case Right(in) => init.decodedMessageHandler.tell(in, sender())
      }
    // Must stay last: if `Out` is wide enough to also match `Init` or `Frame`,
    // the more specific cases above have to win.
    case msg: Out =>
      init.intFramedProxy.tell(IntFramedProxy.Frame(encoder(msg)), sender())
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment