Skip to content

Instantly share code, notes, and snippets.

@EECOLOR
Last active August 29, 2015 14:06
Show Gist options
  • Save EECOLOR/b1a8856608553c83acb4 to your computer and use it in GitHub Desktop.
Save EECOLOR/b1a8856608553c83acb4 to your computer and use it in GitHub Desktop.
An alternative version of typed actors. Actors can only receive messages of a certain type; the message's type parameter determines the return type.
package experiment
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.language.higherKinds
import scala.language.implicitConversions
import scala.reflect.ClassTag
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorRefFactory
import akka.actor.Props
import akka.util.Timeout
/**
 * Mixin for an [[akka.actor.Actor]] whose behavior is written as a typed
 * partial function instead of an untyped `Receive`.
 *
 * @tparam MessageType the message family accepted by the actor; the type
 *                     argument of a message is the type of its reply
 */
trait TypedActor[MessageType[_]] { outer: Actor =>

  /** A typed behavior: from a `MessageType[ResultType]` to a `Result[ResultType]`. */
  type TypedReceive[ResultType] = TypedActor.TypedReceive[MessageType, ResultType]

  /**
   * Adapts a typed behavior to an untyped `Receive`.
   *
   * A message is handled only when `typedReceive` is defined at it; the
   * handler's result is converted to a future and piped to the sender.
   */
  private def receiveFor[ResultType](typedReceive: TypedReceive[ResultType]): Receive = {
    // `MessageType` is an abstract type constructor, so this type test cannot be
    // performed at runtime. The real filtering is done by the `isDefinedAt`
    // guard; `@unchecked` acknowledges that and silences the erasure warning.
    case message: (MessageType[ResultType] @unchecked) if typedReceive isDefinedAt message =>
      import context.dispatcher
      import akka.pattern.pipe
      typedReceive(message).asFuture pipeTo sender
  }

  // This should be in the context, for this example that was too complex

  /** Switches the actor to `behavior` via `context.become`. */
  def typedBecome[ResultType](behavior: TypedReceive[ResultType]): Unit =
    context.become(receiveFor(behavior))

  /**
   * Switches the actor to `behavior` via `context.become`.
   *
   * @param discardOld forwarded unchanged to `context.become`
   */
  def typedBecome[ResultType](behavior: TypedReceive[ResultType], discardOld: Boolean): Unit =
    context.become(receiveFor(behavior), discardOld)

  /** The untyped entry point, derived from [[typedReceive]]. */
  final def receive: Receive = receiveFor(typedReceive)

  /** The initial typed behavior, to be supplied by the concrete actor. */
  def typedReceive[ResultType]: TypedReceive[ResultType]
}
/** Factory entry point and supporting types for typed actors. */
object TypedActor {

  /** Wraps an `ActorRefFactory` so that typed actors can be created from it. */
  def apply(system: ActorRefFactory): TypedActorFactory = new TypedActorFactory(system)

  /** Partial function from a message to the (possibly asynchronous) result it produces. */
  type TypedReceive[MessageType[_], ResultType] = PartialFunction[MessageType[ResultType], Result[ResultType]]

  /** The outcome of handling a message, always viewable as a `Future`. */
  sealed trait Result[ResultType] {
    def asFuture: Future[ResultType]
  }

  object Result {

    // Explicit result types keep the anonymous-class refinement out of the
    // public signatures of these conversions.

    /** Lifts a plain value into a `Result` backed by an already completed future. */
    implicit def any[ResultType](value: ResultType): Result[ResultType] =
      new Result[ResultType] {
        val asFuture: Future[ResultType] = Future successful value
      }

    /** Wraps an existing future as a `Result` without transforming it. */
    implicit def forFuture[ResultType](value: Future[ResultType]): Result[ResultType] =
      new Result[ResultType] {
        val asFuture: Future[ResultType] = value
      }
  }
}
/**
 * Typed facade over an untyped `ActorRef`.
 *
 * @tparam MessageType the message family that may be sent through this reference
 */
class TypedActorRef[MessageType[_]](actorRef: ActorRef) {

  /**
   * Asks the underlying actor and exposes the reply as a `Future[ReturnType]`.
   *
   * The reply is cast to `ReturnType` without any runtime verification.
   */
  def ?[ReturnType](message: MessageType[ReturnType])(implicit timeout: Timeout, ec: ExecutionContext): Future[ReturnType] = {
    import akka.pattern.ask
    val untypedReply = actorRef ? message
    untypedReply.map { reply => reply.asInstanceOf[ReturnType] }
  }
}
/**
 * Actor `Props` tagged with the message family the resulting actor accepts.
 *
 * @tparam MessageType the message family of the actor described by the props
 */
trait TypedProps[MessageType[_]] {

  /** The untyped Akka `Props` used to create the actor. */
  def actorProps: Props
}

object TypedProps {

  /**
   * Builds typed props for actor class `T`.
   *
   * `helper` is used only in type position, to pick the message family of `T`.
   */
  def apply[T <: Actor: ClassTag](implicit helper: TypeInferenceHelper[T]): TypedProps[helper.MessageType] = {
    final class PropsOfT extends TypedProps[helper.MessageType] {
      def actorProps: Props = Props[T]
    }
    new PropsOfT
  }
}
/** Creates typed actor references on top of an untyped `ActorRefFactory`. */
class TypedActorFactory(context: ActorRefFactory) {

  /** Starts the actor described by `props` and wraps its reference in a `TypedActorRef`. */
  def typedActorOf[MessageType[_]](props: TypedProps[MessageType]): TypedActorRef[MessageType] = {
    val untypedRef = context.actorOf(props.actorProps)
    new TypedActorRef[MessageType](untypedRef)
  }
}
/**
 * Type-level evidence that exposes the message family of actor type `T`
 * through the `MessageType` type member.
 */
trait TypeInferenceHelper[T] {
  type MessageType[_]
}

object TypeInferenceHelper {

  /** A `TypeInferenceHelper[T]` whose `MessageType` is fixed to `M`. */
  type Aux[T, M[_]] = TypeInferenceHelper[T] {
    type MessageType[x] = M[x]
  }

  /**
   * Derives the helper for any `T` that can be viewed as a `TypedActor[M]`.
   *
   * A real (stateless) instance is returned instead of `null`, so the evidence
   * is safe to touch at runtime; it carries information only in its type member.
   */
  implicit def any[T, M[_]](implicit ev: T => TypedActor[M]): Aux[T, M] =
    new TypeInferenceHelper[T] {
      type MessageType[x] = M[x]
    }
}
package experiment
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.language.higherKinds
import scala.language.implicitConversions
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.util.Timeout
/** Message family of `Test1`; `ReplyType` is the type of the reply to a message. */
sealed trait Message1[ReplyType]

/** Carries a string and is answered with a `String`. */
case class Echo(input: String) extends Message1[String]

/** Carries an integer and is answered with an `Int`. */
case class ComplexComputation(input: Int) extends Message1[Int]
/**
 * Example typed actor for the `Message1` family that delegates part of its
 * work to a child `Test2` actor.
 */
class Test1 extends Actor with TypedActor[Message1] {

  // Child actor that answers `SimpleComputation` messages.
  val test2 = TypedActor(context).typedActorOf(TypedProps[Test2])

  import context.dispatcher

  /** Initial behavior: echoes once (then switches behavior) or delegates a computation. */
  def typedReceive[ResultType]: TypedReceive[ResultType] = {
    case Echo(text) =>
      // Every later message is handled by `strangeEcho`.
      typedBecome(strangeEcho)
      text
    case ComplexComputation(number) =>
      implicit val timeout: Timeout = Timeout(5.seconds)
      val delegated = test2 ? SimpleComputation(number)
      delegated.map(partial => partial + 1)
  }

  /** Behavior installed after the first `Echo`: replies with the text twice, joined by "...". */
  def strangeEcho[ResultType]: TypedReceive[ResultType] = {
    case Echo(text) => s"$text...$text"
  }
}
/** Message family of `Test2`; `ReplyType` is the type of the reply to a message. */
sealed trait Message2[ReplyType]

/** Carries an integer and is answered with an `Int`. */
case class SimpleComputation(input: Int) extends Message2[Int]
/** Example typed actor for the `Message2` family. */
class Test2 extends Actor with TypedActor[Message2] {

  /** Replies to `SimpleComputation` with its input incremented by one. */
  def typedReceive[ResultType]: TypedReceive[ResultType] = {
    case SimpleComputation(number) =>
      val incremented = number + 1
      incremented
  }
}
/**
 * Small driver that starts a `Test1` actor, sends it three typed messages and
 * waits for all replies before shutting the actor system down.
 */
object Explore {
  val system = ActorSystem("test")
  val typedSystem = TypedActor(system)
  val typedRef = typedSystem.typedActorOf(TypedProps[Test1])

  // Timeout used by the `?` calls below.
  implicit val timeout: Timeout = Timeout(5.seconds)
  import system.dispatcher

  val result1: Future[Int] = typedRef ? ComplexComputation(1)
  val result2: Future[String] = typedRef ? Echo("test")
  val result3: Future[String] = typedRef ? Echo("test")

  // The system is shut down even when waiting fails (timeout or a failed
  // future), so a failure does not leave the actor system running.
  val result =
    try Await.result(Future.sequence(Seq(result1, result2, result3)), 1.second)
    finally {
      system.shutdown()
      system.awaitTermination()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment