Created
February 5, 2019 21:11
-
-
Save jchapuis/c81c15adfacbe178ebb1b09e33b3aa67 to your computer and use it in GitHub Desktop.
Akka-typed receptionist helpers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed | |
import akka.actor.typed.ActorRef | |
import akka.stream.OverflowStrategy | |
import akka.stream.scaladsl.Source | |
import akka.stream.typed.scaladsl.ActorSource | |
import akka.util.Timeout | |
import com.typesafe.scalalogging.StrictLogging | |
import scala.concurrent.TimeoutException | |
/** Helpers for sending one command to several typed actors and gathering their replies. */
object Aggregator extends StrictLogging {

  /**
    * Sends a command to every actor in `actors` and folds the replies into a set.
    *
    * Replies that do not arrive within `timeout` are logged and left out of the result.
    *
    * @param command builds the command to send from the reply-to reference
    * @param actors  the actors to query
    * @param timeout how long to wait for each individual reply
    * @tparam C command type understood by the queried actors
    * @tparam R reply type
    * @return a source emitting the set of replies that arrived in time
    */
  def askAll[C, R](
      command: ActorRef[R] => C,
      actors: List[ActorRef[C]]
  )(implicit timeout: Timeout): Source[Set[R], NotUsed] =
    Source(actors)
      // Query all actors concurrently; the breadth is kept at 1 or more even for an empty list.
      .flatMapMerge(actors.size max 1, actorRef => askActor[C, R](command, actorRef))
      // Timed-out asks surface as None and are dropped here.
      .collect { case Some(response) => response }
      .fold(Set.empty[R])(_ + _)

  /**
    * Asks a single actor and emits `Some(reply)`, or `None` when no reply arrives in time.
    *
    * The command is sent when the source is materialized, with the source's own
    * actor reference as the reply-to target.
    */
  private def askActor[C, R](command: ActorRef[R] => C, actorRef: ActorRef[C])(
      implicit timeout: Timeout
  ): Source[Option[R], Unit] =
    ActorSource
      .actorRef[R](
        completionMatcher = PartialFunction.empty,
        failureMatcher = PartialFunction.empty,
        bufferSize = 1,
        overflowStrategy = OverflowStrategy.fail
      )
      // Only the first reply is of interest; taking it also completes the source.
      .take(1)
      .completionTimeout(timeout.duration)
      .map(response => Option(response))
      .recover {
        case _: TimeoutException =>
          // `command` is a function and may print as an opaque lambda name, so the
          // target actor is logged as well to make the warning actionable.
          logger.warn(s"did not receive answer from $actorRef for command $command in time, ignoring")
          None
      }
      .mapMaterializedValue(replyTo => actorRef ! command(replyTo))
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.UUID | |
import akka.actor.testkit.typed.scaladsl.ActorTestKit | |
import akka.actor.typed.scaladsl.Behaviors | |
import akka.actor.typed.{ActorRef, Behavior} | |
import akka.stream.typed.scaladsl.ActorMaterializer | |
import akka.stream.typed.scaladsl.ActorSink | |
import akka.util.Timeout | |
import org.scalatest.{BeforeAndAfterAll, GivenWhenThen, Matchers, WordSpecLike} | |
import scala.concurrent.duration._ | |
/** Exercises [[Aggregator.askAll]] against replying and silent test actors. */
class AggregatorSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with GivenWhenThen {

  val testKit               = ActorTestKit()
  implicit val system       = testKit.system
  implicit val materializer = ActorMaterializer()

  override def afterAll(): Unit = testKit.shutdownTestKit()

  "Aggregator" should {

    "gather all actor responses" in new Fixture {
      implicit def timeout: Timeout = Timeout(1.second)

      Given("two actors")
      val firstReplier  = testKit.spawn(replyActor(actorAUUID))
      val secondReplier = testKit.spawn(replyActor(actorBUUID))

      When("sending a command to all children")
      Aggregator
        .askAll(TestCommand, List(firstReplier, secondReplier))
        .map(Replies)
        .runWith(testSink)

      Then("responses are aggregated")
      val expectedReplies = Set(TestReply(actorAUUID), TestReply(actorBUUID))
      probe.expectMessage(Replies(expectedReplies))
    }

    "ignore timed-out responses" in new Fixture {
      Given("a limited timeout")
      implicit def timeout: Timeout = Timeout(50.millis)

      And("two actors, one which doesn't reply")
      val replier = testKit.spawn(replyActor(actorAUUID))
      val silent  = testKit.spawn(noReplyActor)

      When("sending a command to all children")
      Aggregator
        .askAll(TestCommand, List(replier, silent))
        .map(Replies)
        .runWith(testSink)

      Then("only available responses are aggregated after timeout")
      val expectedReplies = Set(TestReply(actorAUUID))
      probe.expectMessage(Replies(expectedReplies))
    }
  }

  /** Shared protocol, behaviors and probe; each test supplies its own ask timeout. */
  trait Fixture {
    implicit def timeout: Timeout

    case class TestReply(childID: UUID)

    // Messages the probe can receive from the sink.
    sealed trait Reply
    case class Replies(replies: Set[TestReply]) extends Reply
    case object Complete                        extends Reply
    case object Fail                            extends Reply

    case class TestCommand(replyTo: ActorRef[TestReply])

    /** Behavior that answers every command with a reply carrying `uuid`. */
    def replyActor(uuid: UUID): Behavior[TestCommand] =
      Behaviors.receiveMessage { command =>
        command.replyTo ! TestReply(uuid)
        Behaviors.same
      }

    /** Behavior that accepts commands but never answers them. */
    def noReplyActor: Behavior[TestCommand] =
      Behaviors.receiveMessage(_ => Behaviors.same)

    val probe = testKit.createTestProbe[Reply]()

    // Forwards stream elements to the probe; completion and failure map to marker messages.
    val testSink = ActorSink.actorRef(probe.ref, Complete, _ => Fail)

    val actorAUUID = UUID.randomUUID()
    val actorBUUID = UUID.randomUUID()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.Done | |
import akka.actor.typed.ActorRef | |
import akka.stream.Materializer | |
import akka.stream.scaladsl.Source | |
import com.typesafe.scalalogging.StrictLogging | |
import scala.concurrent.Future | |
/** Fire-and-forget delivery of one message to a set of typed actors. */
object Broadcaster extends StrictLogging {

  /**
    * Sends `message` to each actor in `actors` by running a stream over them.
    *
    * @return a future that completes once the stream over `actors` has finished
    */
  def tellAll[T](message: T, actors: Set[ActorRef[T]])(
      implicit materializer: Materializer
  ): Future[Done] =
    Source(actors).runForeach(_ ! message)
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed | |
import akka.actor.typed.receptionist.ServiceKey | |
import akka.actor.typed.{ActorRef, ActorSystem} | |
import akka.persistence.typed.ExpectingReply | |
import akka.stream.scaladsl.Source | |
import akka.util.Timeout | |
/** Asks every service currently listed under a receptionist key and aggregates the replies. */
object ServiceKeyAggregator {
  import ServiceKeyHelpers._

  /**
    * Looks up the actors currently listed for `serviceKey`, then delegates to
    * `Aggregator.askAll` to send them the command and collect their replies.
    *
    * @param serviceKey key under which the target services are registered
    * @param command    builds the command from the reply-to reference
    * @return a source emitting the set of replies gathered by `Aggregator.askAll`
    */
  def askAllRegisteredServices[C <: ExpectingReply[R], R](
      serviceKey: ServiceKey[C],
      command: ActorRef[R] => C
  )(implicit timeout: Timeout, system: ActorSystem[_]): Source[Set[R], NotUsed] = {
    val currentListeners = serviceKey.findCurrentListeners
    currentListeners.flatMapConcat { listeners =>
      Aggregator.askAll[C, R](command, listeners.toList)
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.testkit.typed.scaladsl.ActorTestKit | |
import akka.actor.typed.receptionist.{Receptionist, ServiceKey} | |
import akka.actor.typed.scaladsl.Behaviors | |
import akka.actor.typed.{ActorRef, Behavior} | |
import akka.persistence.typed.ExpectingReply | |
import akka.stream.scaladsl.Sink | |
import akka.stream.typed.scaladsl.ActorMaterializer | |
import akka.util.Timeout | |
import org.scalatest.concurrent.ScalaFutures | |
import org.scalatest.{BeforeAndAfterAll, GivenWhenThen, Matchers, WordSpecLike} | |
import scala.concurrent.duration._ | |
/** Verifies that [[ServiceKeyAggregator]] gathers replies from all registered services. */
class ServiceKeyAggregatorSpec
    extends WordSpecLike
    with Matchers
    with BeforeAndAfterAll
    with GivenWhenThen
    with ScalaFutures {

  val testKit                    = ActorTestKit()
  implicit val system            = testKit.system
  implicit val executionContext  = system.executionContext
  implicit val scheduler         = system.scheduler
  implicit val materializer      = ActorMaterializer()
  implicit val timeout: Timeout  = Timeout(1.second)

  override def afterAll(): Unit =
    testKit.shutdownTestKit()

  "ServiceRepliesAggregator" should {
    "gather all registered services responses" in new Fixture {
      Given("two actors registering to the receptionist, and an aggregator")
      testKit.spawn(pingService("a"))
      testKit.spawn(pingService("b"))
      // Each service registers asynchronously from its own setup block. Wait for both
      // acknowledgements so the lookup below cannot race ahead of the registrations.
      registrationProbe.expectMessageType[Receptionist.Registered]
      registrationProbe.expectMessageType[Receptionist.Registered]

      When("sending a command to all actors subscribed to this service key")
      val response = ServiceKeyAggregator.askAllRegisteredServices(PingServiceKey, Ping).runWith(Sink.seq)

      Then("responses are aggregated")
      response.futureValue.size shouldBe 1
      response.futureValue.head should contain allOf (Pong("a"), Pong("b"))
    }
  }

  /** Ping/pong protocol plus a service behavior that registers itself under the ping key. */
  trait Fixture {
    val PingServiceKey = ServiceKey[Ping]("pingService")

    case class Ping(replyTo: ActorRef[Pong]) extends ExpectingReply[Pong]
    case class Pong(id: String)

    // Receives the receptionist's acknowledgement for every service registration.
    val registrationProbe = testKit.createTestProbe[Receptionist.Registered]()

    /** Behavior that registers itself on start, answers one ping with its id, then stops. */
    def pingService(id: String): Behavior[Ping] =
      Behaviors.setup { ctx =>
        ctx.system.receptionist ! Receptionist.Register(PingServiceKey, ctx.self, registrationProbe.ref)
        Behaviors.receive[Ping] { (_, msg) =>
          msg match {
            case Ping(replyTo) =>
              replyTo ! Pong(id)
              Behaviors.stopped
          }
        }
      }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.typed.receptionist.ServiceKey | |
import ServiceKeyHelpers._ | |
import akka.Done | |
import akka.actor.typed.ActorSystem | |
import akka.stream.Materializer | |
import akka.stream.scaladsl.Sink | |
import scala.concurrent.Future | |
/** Broadcasts a message to every service currently listed under a receptionist key. */
object ServiceKeyBroadcaster {

  /**
    * Looks up the actors currently listed for `serviceKey` and hands them, together
    * with `message`, to `Broadcaster.tellAll`.
    *
    * @return the future produced by `Broadcaster.tellAll` for the looked-up listeners
    */
  def tellAllRegisteredServices[T](serviceKey: ServiceKey[T], message: T)(
      implicit actorSystem: ActorSystem[_],
      materializer: Materializer
  ): Future[Done] = {
    // The lookup stream yields the broadcast future as its first element.
    val pendingBroadcast: Future[Future[Done]] =
      serviceKey.findCurrentListeners
        .map(registered => Broadcaster.tellAll(message, registered))
        .runWith(Sink.head[Future[Done]])
    pendingBroadcast.flatten
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.testkit.typed.scaladsl.ActorTestKit | |
import akka.actor.typed.receptionist.{Receptionist, ServiceKey} | |
import akka.stream.typed.scaladsl.ActorMaterializer | |
import akka.util.Timeout | |
import org.scalatest.concurrent.ScalaFutures | |
import org.scalatest.{BeforeAndAfterAll, GivenWhenThen, Matchers, WordSpecLike} | |
import scala.concurrent.duration._ | |
/** Verifies that [[ServiceKeyBroadcaster]] delivers a message to all registered probes. */
class ServiceKeyBroadcasterSpec
    extends WordSpecLike
    with Matchers
    with BeforeAndAfterAll
    with GivenWhenThen
    with ScalaFutures {

  val testKit                    = ActorTestKit()
  implicit val system            = testKit.system
  implicit val executionContext  = system.executionContext
  implicit val scheduler         = system.scheduler
  implicit val materializer      = ActorMaterializer()
  implicit val timeout: Timeout  = Timeout(1.second)

  override def afterAll(): Unit = testKit.shutdownTestKit()

  "ServiceKeyBroadcaster" should {
    "tell all registered services" in new Fixture {
      Given("probe actors registered to a service key")
      val firstProbe  = createAndRegisterProbe()
      val secondProbe = createAndRegisterProbe()

      When("telling the service key a message")
      ServiceKeyBroadcaster.tellAllRegisteredServices(pingServiceKey, Ping)

      Then("probe actors should receive the message")
      firstProbe.expectMessage(Ping)
      secondProbe.expectMessage(Ping)
    }
  }

  /** Ping message, its service key, and a factory for probes registered under that key. */
  trait Fixture {
    val pingServiceKey = ServiceKey[Ping.type]("pingService")

    case object Ping

    /** Creates a test probe and registers its ref with the receptionist under the ping key. */
    def createAndRegisterProbe() = {
      val registered = testKit.createTestProbe[Ping.type]()
      testKit.system.receptionist ! Receptionist.Register(pingServiceKey, registered.ref)
      registered
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed | |
import akka.actor.typed.receptionist.{Receptionist, ServiceKey} | |
import akka.actor.typed.{ActorRef, ActorSystem} | |
import akka.stream.OverflowStrategy | |
import akka.stream.scaladsl.Source | |
import akka.stream.typed.scaladsl.ActorSource | |
/** Extension methods for querying the receptionist through a `ServiceKey`. */
object ServiceKeyHelpers {

  implicit class RichServiceKey[T](serviceKey: ServiceKey[T])(implicit system: ActorSystem[_]) {

    /**
      * Asks the receptionist for the actors listed under this key.
      *
      * The `Find` request is sent when the returned source is materialized; the
      * source emits the listed actor references and takes only that first listing.
      */
    def findCurrentListeners: Source[Set[ActorRef[T]], NotUsed] = {
      // Actor-backed source that serves as the reply-to target for the listing.
      val listingSource = ActorSource.actorRef[Receptionist.Listing](
        completionMatcher = PartialFunction.empty,
        failureMatcher = PartialFunction.empty,
        bufferSize = 1,
        overflowStrategy = OverflowStrategy.fail
      )

      listingSource
        .mapMaterializedValue { replyTo =>
          system.receptionist ! Receptionist.Find(serviceKey, replyTo)
          NotUsed
        }
        .map { case serviceKey.Listing(refs) => refs }
        .take(1)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment