Skip to content

Instantly share code, notes, and snippets.

@yoeluk
Created April 6, 2019 00:01
Show Gist options
  • Save yoeluk/6804c678a58b5f5bd241c1155bc9c9eb to your computer and use it in GitHub Desktop.
Save yoeluk/6804c678a58b5f5bd241c1155bc9c9eb to your computer and use it in GitHub Desktop.
/**
 * Message protocol for the cart.
 *
 * NOTE(review): `State` and `ActorRef` are not defined or imported in this
 * snippet; they are assumed to be in scope from the surrounding project.
 */
object Model {

  /** Closed set of commands; every command type extends this trait. */
  sealed trait CartCommand

  /**
   * Command carrying the reference a `State` reply should be sent to.
   *
   * @param replyTo recipient of the `State` answer
   */
  final case class GetCart(replyTo: ActorRef[State]) extends CartCommand
}
package shop.timba.ecom.patterns.replyto
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Random, Success}
/**
 * Request/reply ("ask") support for shard regions.
 *
 * Each call to [[ReplyTo.EntityReq.ask]] spawns a short-lived system actor that
 * receives the reply (or its own timeout message), completes a `Promise` and stops.
 */
object ReplyTo {

  /**
   * Extension methods for a shard-region reference.
   *
   * @param region the region that receives the `ShardingEnvelope`
   * @tparam Cmd command type accepted by the sharded entity
   */
  implicit class EntityReq[Cmd](val region: ActorRef[ShardingEnvelope[Cmd]]) extends AnyVal {

    /**
     * Sends a command to the entity `entityId` and returns the reply as a `Future`.
     *
     * @param entityId   id placed in the `ShardingEnvelope`
     * @param cmdFactory builds the command from the temporary reply-to reference
     * @param system     actor system used to spawn the temporary reply actor
     * @param timeout    how long the reply actor waits before failing the future
     *                   (also passed implicitly to `systemActorOf`)
     * @param timeoutEvent sentinel message the reply actor schedules to itself;
     *                   a real reply that is equal to this value is
     *                   indistinguishable from a timeout
     * @param ex         execution context for the future composition
     * @tparam Reply     type of the expected reply
     * @return the reply, or a failed future carrying a
     *         `java.util.concurrent.TimeoutException` when the sentinel arrives first
     */
    def ask[Reply](
      entityId: String,
      cmdFactory: ActorRef[Reply] => Cmd
    )(
      implicit system: ActorSystem[_],
      timeout: Timeout,
      timeoutEvent: Reply,
      ex: ExecutionContext
    ): Future[Reply] = {
      // Random suffix keeps the system-actor name (and timer key) distinct per request.
      val replyToId = Random.nextLong().toString
      val promise = Promise[Reply]()
      system
        .systemActorOf(replyTo(promise, replyToId), s"replyTo-$replyToId")
        .flatMap { replyRef =>
          // Only send the command once the reply actor exists, then wait on the promise.
          region ! ShardingEnvelope(entityId, cmdFactory(replyRef))
          promise.future
        }
    }
  }

  /**
   * Behavior of the temporary reply actor: start a single timer that delivers
   * `timeoutEvent`, then complete `promise` with whichever message arrives first
   * and stop.
   *
   * @param promise      completed with the reply, or failed on timeout
   * @param replyToId    used as the timer key
   * @param timeout      delay before `timeoutEvent` is delivered
   * @param timeoutEvent sentinel compared by equality against incoming messages
   */
  private def replyTo[R](
    promise: Promise[R],
    replyToId: String
  )(implicit timeout: Timeout, timeoutEvent: R): Behavior[R] =
    Behaviors.withTimers { timers =>
      timers.startSingleTimer(replyToId, timeoutEvent, timeout.duration)
      Behaviors.receiveMessage {
        // Stable-identifier pattern: matches any message equal to the sentinel.
        case `timeoutEvent` =>
          promise.failure(
            // A specific exception type lets callers recover on timeouts only;
            // it is still a Throwable, so existing handlers keep working.
            new java.util.concurrent.TimeoutException(
              s"Ask did not complete in a timely manner. `replyTo` did not get a response within ${timeout.duration}"
            )
          )
          Behaviors.stopped
        case msg =>
          promise.success(msg)
          Behaviors.stopped
      }
    }
}
/**
 * HTTP routing for the cart.
 *
 * NOTE(review): `get`, `path`, `parameter`, `respondWith`, `cartEntityProxy`,
 * `GetCart`, `State` and `asJson` are not defined in this snippet; they are
 * assumed to come from imports / the surrounding project.
 */
object Route {

  /**
   * Matches `get`, the path segment `get-cart` and the `accountId` parameter,
   * asks the cart entity for its `State` and responds with the JSON-encoded result.
   */
  val cartRoute: Route =
    (get & path("get-cart") & parameter('accountId)) { accountId =>
      val eventualCart = cartEntityProxy.ask[State](accountId, GetCart)
      respondWith(eventualCart.map(_.asJson))
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment