Skip to content

Instantly share code, notes, and snippets.

@kulikov
Last active August 29, 2015 14:03
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save kulikov/6b7ac54198f8e11f1f03 to your computer and use it in GitHub Desktop.
Typed request-reply pattern for Akka actors
package eu.inn.util.akka
import concurrent.Future
import concurrent.duration._
import scala.reflect.ClassTag
import util.control.NonFatal
import akka.actor._
import akka.util.Timeout
/**
 * Marker trait for request messages that declare the type of their reply.
 *
 * @tparam T type of the value expected in response to this request
 */
trait Replyable[T]
/**
 * Wrapper around an `ActorRef` that adds a typed ask operator.
 *
 * @param actorRef target actor that receives the requests
 * @param timeout  timeout handed to the underlying ask call
 */
class ReplyableActorRef(actorRef: ActorRef, timeout: Timeout) {

  /**
   * Sends `message` to the wrapped actor using the ask pattern and narrows
   * the answer to the reply type declared by the message.
   *
   * @param message request whose type parameter fixes the reply type
   * @tparam T reply type; the `ClassTag` is required by `mapTo`
   * @return future of the reply, converted to `T` with `mapTo`
   */
  def ??[T: ClassTag](message: Replyable[T]): Future[T] = {
    val untypedReply = akka.pattern.ask(actorRef, message)(timeout)
    untypedReply.mapTo[T]
  }
}
/**
 * Mix-in that makes the typed `??` operator available on any `ActorRef`
 * through an implicit conversion to [[ReplyableActorRef]].
 */
trait ReplySupport {
  // Enables the implicit conversion below without a compiler feature warning.
  import scala.language.implicitConversions

  // Fallback used when the call site has no implicit Timeout in scope.
  // Written as `5.seconds` (method call) rather than the postfix form
  // `5 seconds`, which requires the postfixOps language feature.
  private val defaultTimeout = Timeout(5.seconds)

  /**
   * Wraps `actorRef` so that `??` can be called on it.
   *
   * @param actorRef actor to wrap
   * @param timeout  ask timeout; resolved implicitly, falling back to the
   *                 5 second default when none is available
   * @return a [[ReplyableActorRef]] bound to `actorRef` and `timeout`
   */
  implicit def reply(actorRef: ActorRef)(implicit timeout: Timeout = defaultTimeout): ReplyableActorRef =
    new ReplyableActorRef(actorRef, timeout)
}
/**
 * Base trait for actors that answer [[Replyable]] requests.
 *
 * Implementors provide `receiveAndReply`; `sendReply` runs it for matching
 * messages and pipes the resulting future back to the sender.
 */
trait ReplyingActor extends Actor {
  // Enables the implicit conversion below without a compiler feature warning.
  import scala.language.implicitConversions

  /**
   * Request handler: maps a request to a future of its declared reply type.
   *
   * @tparam T reply type declared by the incoming request
   */
  def receiveAndReply[T]: PartialFunction[Replyable[T], Future[T]]

  /** Default behavior: only typed requests are handled. */
  def receive: Receive = sendReply

  /**
   * Lifts a plain value into an already-completed future so handlers may
   * return either `T` or `Future[T]`.
   *
   * `msg` is a by-value parameter, so it is fully evaluated before this
   * method runs and there is nothing here that could throw; exceptions
   * raised while a handler computes its result surface in `sendReply`.
   */
  implicit protected def anyToFuture[T](msg: T): Future[T] =
    Future.successful(msg)

  /**
   * Receive block that dispatches [[Replyable]] messages to
   * `receiveAndReply` and pipes the outcome to the sender. If the handler
   * throws a non-fatal exception synchronously, the sender gets a
   * `Status.Failure` carrying it.
   */
  def sendReply: Receive = {
    // The type-variable pattern `t` names the request's reply type so that the
    // handler can be instantiated at exactly that type (Replyable is invariant).
    case msg: Replyable[t] if receiveAndReply[t].isDefinedAt(msg) =>
      try {
        akka.pattern.pipe(receiveAndReply[t](msg))(context.dispatcher) pipeTo sender
      } catch {
        case NonFatal(e) => sender ! Status.Failure(e)
      }
  }
}
/** Example reply payload: a user identified by name. */
case class User(name: String)

/** Example request: asks for the user with the given id and is answered with a [[User]]. */
case class GetUser(id: Int) extends Replyable[User]
// Usage example: `??` yields a future typed by the request's reply type, so
// asking with GetUser (a Replyable[User]) gives a Future[User].
// NOTE(review): `actor` is not defined in this snippet; presumably an ActorRef
// with ReplySupport's implicit conversion in scope. Confirm at the call site.
val f: Future[User] = actor ?? GetUser(123)
/**
 * Example actor built on [[ReplyingActor]].
 *
 * ReplySupport is mixed in because the `??` operator used below is only
 * available through its implicit conversion.
 * NOTE(review): `DoWork` and `WorkResult` are not defined in this snippet;
 * presumably a Replyable[WorkResult] request and its reply type. Confirm.
 */
class MyActor extends ReplyingActor with ReplySupport {
  // Implicit ExecutionContext required by Future.map in the DoWork handler.
  import context.dispatcher

  def receiveAndReply[T]: PartialFunction[Replyable[T], Future[T]] = {
    case GetUser(id) =>
      User("dima") // implicit auto convert any T to Future[T]
    case DoWork(number) =>
      // Ask ourselves for the user, then derive the work result from the reply.
      (self ?? GetUser(123)).map { user =>
        WorkResult("work result: " + user.name)
      }
  }

  // optionally: handle untyped messages in addition to typed requests
  override def receive: Receive = sendReply orElse {
    case "Some another message" =>
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment