Typed request-reply pattern for Akka actors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package eu.inn.util.akka

import concurrent.Future
import concurrent.duration._
import scala.language.implicitConversions
import scala.reflect.ClassTag
import util.control.NonFatal

import akka.actor._
import akka.util.Timeout
/**
 * Marker for a request message that carries the type of its reply.
 *
 * The trait declares no members; `T` exists only so that `ReplyableActorRef.??`
 * and `ReplyingActor.receiveAndReply` can tie a request to a `Future[T]` reply.
 *
 * @tparam T type of the reply associated with this request
 */
trait Replyable[T]
/**
 * Wrapper around an `ActorRef` that adds the typed ask operator `??`.
 *
 * @param actorRef actor that receives the requests
 * @param timeout  timeout passed to `akka.pattern.ask` for every request
 */
class ReplyableActorRef(actorRef: ActorRef, timeout: Timeout) {

  /**
   * Asks the wrapped actor with `message` and narrows the answer to `T`.
   *
   * @param message request whose type parameter fixes the reply type
   * @tparam T reply type; its `ClassTag` is required by `mapTo`
   * @return the reply future produced by `ask`, converted with `mapTo[T]`
   */
  def ??[T: ClassTag](message: Replyable[T]): Future[T] = {
    val answer = akka.pattern.ask(actorRef, message)(timeout)
    answer.mapTo[T]
  }
}
/**
 * Mix-in providing the implicit conversion from `ActorRef` to
 * [[ReplyableActorRef]], which makes `ref ?? request` available.
 */
trait ReplySupport {

  // Fallback used by `reply` when no implicit Timeout is in scope.
  // Written as `5.seconds` (method call) rather than `5 seconds` so that the
  // postfix-operator language feature is not required.
  private val defaultTimeout = Timeout(5.seconds)

  /**
   * Wraps `actorRef` in a [[ReplyableActorRef]].
   *
   * @param actorRef actor reference to wrap
   * @param timeout  ask timeout for the wrapper; defaults to `defaultTimeout`
   *                 (five seconds) when no implicit `Timeout` is found
   * @return a wrapper exposing `??` for the given actor
   */
  implicit def reply(actorRef: ActorRef)(implicit timeout: Timeout = defaultTimeout): ReplyableActorRef =
    new ReplyableActorRef(actorRef, timeout)
}
/**
 * Actor base trait for handling [[Replyable]] requests: subclasses describe
 * each request as a `Future` of its reply, and `sendReply` delivers that
 * future to the sender.
 */
trait ReplyingActor extends Actor {

  /**
   * Request handler supplied by the concrete actor.
   *
   * @tparam T reply type of the request being handled
   * @return partial function from a request to the future of its reply
   */
  def receiveAndReply[T]: PartialFunction[Replyable[T], Future[T]]

  /** Default behaviour: handle only what `sendReply` accepts. */
  def receive: Receive = sendReply

  /**
   * Lifts a plain value into a `Future`, so a handler case may return the
   * reply directly instead of wrapping it.
   *
   * @param msg by-name value; it is evaluated inside the `try`, so a non-fatal
   *            exception thrown while computing it becomes a failed future
   * @return a completed future holding the value, or the failure
   */
  implicit protected def anyToFuture[T](msg: => T): Future[T] =
    try {
      Future.successful(msg)
    } catch {
      case NonFatal(error) => Future.failed(error)
    }

  /**
   * Behaviour that accepts every [[Replyable]] message for which
   * `receiveAndReply` is defined and pipes the resulting future to the sender.
   * Other messages are not matched, so this can be combined with `orElse`.
   */
  def sendReply: Receive = {
    case request: Replyable[_] if receiveAndReply.isDefinedAt(request) =>
      try {
        akka.pattern.pipe(receiveAndReply(request))(context.dispatcher).pipeTo(sender)
      } catch {
        // The handler threw synchronously: report the failure to the sender.
        case NonFatal(error) => sender ! Status.Failure(error)
      }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Example request: asks for the user with the given `id`; its reply type is [[User]]. */
case class GetUser(id: Int) extends Replyable[User]

/** Example reply payload. */
case class User(name: String)

// Caller side: because GetUser is a Replyable[User], `??` yields a Future[User].
// NOTE(review): `actor` and the conversion that provides `??` (ReplySupport) are
// assumed to be in scope; neither is defined in this snippet.
val f: Future[User] = actor ?? GetUser(123)
/**
 * Example actor built on `ReplyingActor`.
 *
 * NOTE(review): `DoWork`, `WorkResult`, the `??` operator on `self` and the
 * implicit `ExecutionContext` needed by `map` are not defined in this snippet;
 * they are assumed to come from the surrounding scope.
 */
class MyActor extends ReplyingActor {

  def receiveAndReply[T] = {
    case GetUser(id) =>
      // A plain value is enough here: the implicit anyToFuture converts any T to Future[T].
      User("dima")

    case DoWork(number) =>
      // Ask this actor itself for the user, then derive the reply from the answer.
      (self ?? GetUser(123)).map { user =>
        WorkResult("work result: " + user.name)
      }
  }

  // Optionally: handle messages that are not Replyable in addition to the typed requests.
  override def receive = sendReply orElse {
    case "Some another message" =>
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment