Created
April 6, 2019 00:01
-
-
Save yoeluk/6804c678a58b5f5bd241c1155bc9c9eb to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object Model { | |
sealed trait CartCommand | |
final case class GetCart(replyTo: ActorRef[State]) extends CartCommand | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package shop.timba.ecom.patterns.replyto | |
import akka.actor.typed.{ActorRef, ActorSystem, Behavior} | |
import akka.actor.typed.scaladsl.Behaviors | |
import akka.cluster.sharding.typed.ShardingEnvelope | |
import akka.util.Timeout | |
import scala.concurrent.{ExecutionContext, Future, Promise} | |
import scala.util.{Failure, Random, Success} | |
object ReplyTo { | |
implicit class EntityReq[Cmd](val region: ActorRef[ShardingEnvelope[Cmd]]) extends AnyVal { | |
def ask[Reply]( | |
entityId: String, | |
cmdFactory: ActorRef[Reply] ⇒ Cmd | |
)( | |
implicit system: ActorSystem[_], | |
timeout: Timeout, | |
timeoutEvent: Reply, | |
ex: ExecutionContext | |
): Future[Reply] = { | |
val replyToId = Random.nextLong().toString | |
val promise = Promise[Reply]() | |
system | |
.systemActorOf(replyTo(promise, replyToId), "replyTo-" + replyToId) | |
.map { replyTo ⇒ | |
val cmd = cmdFactory(replyTo) | |
region ! ShardingEnvelope(entityId, cmd) | |
} | |
.flatMap(_ ⇒ promise.future) | |
} | |
} | |
private def replyTo[R]( | |
promise: Promise[R], | |
replyToId: String | |
)(implicit timeout: Timeout, timeoutEvent: R): Behavior[R] = | |
Behaviors.withTimers { timers => | |
timers.startSingleTimer(replyToId, timeoutEvent, timeout.duration) | |
Behaviors.receiveMessage { | |
case `timeoutEvent` ⇒ | |
promise.complete( | |
Failure( | |
new Throwable( | |
s"Ask did not complete in a timely manner. `replyTo` did not get a response with ${timeout.duration}" | |
) | |
) | |
) | |
Behaviors.stopped | |
case msg ⇒ | |
promise.complete(Success(msg)) | |
Behaviors.stopped | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object Route { | |
val cartRoute: Route = | |
get { | |
path("get-cart") { | |
parameter('accountId) { accountId ⇒ | |
val futureResponse = cartEntityProxy.ask[State](accountId, GetCart) | |
respondWith(futureResponse.map(_.asJson)) | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment