Skip to content

Instantly share code, notes, and snippets.

@jchapuis
Created February 5, 2019 21:11
Show Gist options
  • Save jchapuis/c81c15adfacbe178ebb1b09e33b3aa67 to your computer and use it in GitHub Desktop.
Save jchapuis/c81c15adfacbe178ebb1b09e33b3aa67 to your computer and use it in GitHub Desktop.
Akka-typed receptionist helpers
import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.stream.typed.scaladsl.ActorSource
import akka.util.Timeout
import com.typesafe.scalalogging.StrictLogging
import scala.concurrent.TimeoutException
object Aggregator extends StrictLogging {
def askAll[C, R](
command: ActorRef[R] => C,
actors: List[ActorRef[C]]
)(implicit timeout: Timeout): Source[Set[R], NotUsed] =
Source
.apply(actors)
.flatMapMerge(
actors.size max 1, { actorRef =>
askActor[C, R](command, actorRef)
}
)
.collect { case Some(response) => response }
.fold(Set[R]())((set, response) => set + response)
private def askActor[C, R](command: ActorRef[R] => C, actorRef: ActorRef[C])(
implicit timeout: Timeout
) =
ActorSource
.actorRef[R](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
bufferSize = 1,
overflowStrategy = OverflowStrategy.fail
)
.take(1)
.completionTimeout(timeout.duration)
.map(response => Option(response))
.recover {
case _: TimeoutException =>
logger.warn(s"did not receive answer for command $command in time, ignoring")
None
}
.mapMaterializedValue(sourceRef => actorRef ! command(sourceRef))
}
import java.util.UUID
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import akka.stream.typed.scaladsl.ActorMaterializer
import akka.stream.typed.scaladsl.ActorSink
import akka.util.Timeout
import org.scalatest.{BeforeAndAfterAll, GivenWhenThen, Matchers, WordSpecLike}
import scala.concurrent.duration._
class AggregatorSpec extends WordSpecLike with Matchers with BeforeAndAfterAll with GivenWhenThen {
val testKit = ActorTestKit()
implicit val system = testKit.system
implicit val materializer = ActorMaterializer()
override def afterAll: Unit =
testKit.shutdownTestKit()
"Aggregator" should {
"gather all actor responses" in new Fixture {
implicit def timeout: Timeout = Timeout(1 second)
Given("two actors")
val actorA = testKit.spawn(replyActor(actorAUUID))
val actorB = testKit.spawn(replyActor(actorBUUID))
When("sending a command to all children")
Aggregator.askAll(TestCommand, List(actorA, actorB)).map(Replies).runWith(testSink)
Then("responses are aggregated")
probe.expectMessage(Replies(Set(TestReply(actorAUUID), TestReply(actorBUUID))))
}
"ignore timed-out responses" in new Fixture {
Given("a limited timeout")
implicit def timeout: Timeout = Timeout(50 milli)
And("two actors, one which doesn't reply")
val actorA = testKit.spawn(replyActor(actorAUUID))
val actorB = testKit.spawn(noReplyActor)
When("sending a command to all children")
Aggregator.askAll(TestCommand, List(actorA, actorB)).map(Replies).runWith(testSink)
Then("only available responses are aggregated after timeout")
probe.expectMessage(Replies(Set(TestReply(actorAUUID))))
}
}
trait Fixture {
implicit def timeout: Timeout
case class TestReply(childID: UUID)
sealed trait Reply
case class Replies(replies: Set[TestReply]) extends Reply
case object Complete extends Reply
case object Fail extends Reply
case class TestCommand(replyTo: ActorRef[TestReply])
def replyActor(uuid: UUID): Behavior[TestCommand] = Behaviors.receiveMessage { command =>
command.replyTo ! TestReply(uuid)
Behaviors.same
}
def noReplyActor: Behavior[TestCommand] = Behaviors.receiveMessage(_ => Behaviors.same)
val probe = testKit.createTestProbe[Reply]()
val testSink = ActorSink.actorRef(
probe.ref,
Complete,
_ => Fail
)
val actorAUUID = UUID.randomUUID()
val actorBUUID = UUID.randomUUID()
}
}
import akka.Done
import akka.actor.typed.ActorRef
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.typesafe.scalalogging.StrictLogging
import scala.concurrent.Future
object Broadcaster extends StrictLogging {
def tellAll[T](message: T, actors: Set[ActorRef[T]])(implicit materializer: Materializer): Future[Done] =
Source.apply(actors).runForeach(actor => actor.tell(message))
}
import akka.NotUsed
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.persistence.typed.ExpectingReply
import akka.stream.scaladsl.Source
import akka.util.Timeout
object ServiceKeyAggregator {
import ServiceKeyHelpers._
def askAllRegisteredServices[C <: ExpectingReply[R], R](
serviceKey: ServiceKey[C],
command: ActorRef[R] => C
)(implicit timeout: Timeout, system: ActorSystem[_]): Source[Set[R], NotUsed] =
serviceKey.findCurrentListeners
.flatMapConcat(services => Aggregator.askAll[C, R](command, services.toList))
}
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import akka.persistence.typed.ExpectingReply
import akka.stream.scaladsl.Sink
import akka.stream.typed.scaladsl.ActorMaterializer
import akka.util.Timeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, GivenWhenThen, Matchers, WordSpecLike}
import scala.concurrent.duration._
class ServiceKeyAggregatorSpec
extends WordSpecLike
with Matchers
with BeforeAndAfterAll
with GivenWhenThen
with ScalaFutures {
val testKit = ActorTestKit()
implicit val system = testKit.system
implicit val executionContext = system.executionContext
implicit val scheduler = system.scheduler
implicit val materializer = ActorMaterializer()
implicit val timeout: Timeout = Timeout(1 second)
override def afterAll: Unit =
testKit.shutdownTestKit()
"ServiceRepliesAggregator" should {
"gather all registered services responses" in new Fixture {
Given("two actors registering to the receptionist, and an aggregator")
testKit.spawn(pingService("a"))
testKit.spawn(pingService("b"))
When("sending a command to all actors subscribed to this service key")
val response = ServiceKeyAggregator.askAllRegisteredServices(PingServiceKey, Ping).runWith(Sink.seq)
Then("responses are aggregated")
response.futureValue.size shouldBe 1
response.futureValue.head should contain allOf (Pong("a"), Pong("b"))
}
}
trait Fixture {
val PingServiceKey = ServiceKey[Ping]("pingService")
case class Ping(replyTo: ActorRef[Pong]) extends ExpectingReply[Pong]
case class Pong(id: String)
def pingService(id: String): Behavior[Ping] =
Behaviors.setup { ctx =>
ctx.system.receptionist ! Receptionist.Register(PingServiceKey, ctx.self)
Behaviors.receive[Ping] { (_, msg) =>
msg match {
case Ping(replyTo) =>
replyTo ! Pong(id)
Behaviors.stopped
}
}
}
}
}
import akka.actor.typed.receptionist.ServiceKey
import ServiceKeyHelpers._
import akka.Done
import akka.actor.typed.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
object ServiceKeyBroadcaster {
def tellAllRegisteredServices[T](serviceKey: ServiceKey[T], message: T)(implicit actorSystem: ActorSystem[_],
materializer: Materializer): Future[Done] =
serviceKey.findCurrentListeners
.map(listeners => Broadcaster.tellAll(message, listeners))
.runWith(Sink.head[Future[Done]])
.flatten
}
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.stream.typed.scaladsl.ActorMaterializer
import akka.util.Timeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, GivenWhenThen, Matchers, WordSpecLike}
import scala.concurrent.duration._
class ServiceKeyBroadcasterSpec
extends WordSpecLike
with Matchers
with BeforeAndAfterAll
with GivenWhenThen
with ScalaFutures {
val testKit = ActorTestKit()
implicit val system = testKit.system
implicit val executionContext = system.executionContext
implicit val scheduler = system.scheduler
implicit val materializer = ActorMaterializer()
implicit val timeout: Timeout = Timeout(1 second)
override def afterAll: Unit =
testKit.shutdownTestKit()
"ServiceKeyBroadcaster" should {
"tell all registered services" in new Fixture {
Given("probe actors registered to a service key")
val probe1 = createAndRegisterProbe()
val probe2 = createAndRegisterProbe()
When("telling the service key a message")
ServiceKeyBroadcaster.tellAllRegisteredServices(pingServiceKey, Ping)
Then("probe actors should receive the message")
probe1.expectMessage(Ping)
probe2.expectMessage(Ping)
}
}
trait Fixture {
val pingServiceKey = ServiceKey[Ping.type]("pingService")
case object Ping
def createAndRegisterProbe() = {
val probe = testKit.createTestProbe[Ping.type]
testKit.system.receptionist ! Receptionist.Register(pingServiceKey, probe.ref)
probe
}
}
}
import akka.NotUsed
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.stream.typed.scaladsl.ActorSource
object ServiceKeyHelpers {
implicit class RichServiceKey[T](serviceKey: ServiceKey[T])(implicit system: ActorSystem[_]) {
def findCurrentListeners: Source[Set[ActorRef[T]], NotUsed] =
ActorSource
.actorRef[Receptionist.Listing](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
bufferSize = 1,
OverflowStrategy.fail
)
.mapMaterializedValue(
listingReceptor => {
system.receptionist ! Receptionist.Find(serviceKey, listingReceptor)
NotUsed
}
)
.map { case serviceKey.Listing(listings) => listings }
.take(1)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment