public
Last active — forked from remeniuk/Aggregator.scala

Scatter-Gather with Akka Dataflow

  • Download Gist
Aggregator.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
class Aggregator(recipients: Iterable[ActorRef]) extends Actor{
def receive = {
case msg @ Message(text) =>
println("Started processing message `%s`" format(text))
val result = Promise[String]()
val promises = List.fill(recipients.size)(Promise[String]())
recipients.zip(promises).map{case (recipient, promise) =>
(recipient !!! msg).map{result: String =>
println("Binding recipient's response: %s" format(result))
flow{
promise << result
}
}
}
flow{
def gather(promises: List[CompletableFuture[String]], result: String = ""): String @cps[Future[Any]] =
promises match {
case head :: tail => gather(tail, head() + result)
case Nil => result
}
println("Binding result...")
result << gather(promises)
}
self.reply(result)
}
 
}
AggregatorSnippet
1
class Aggregator(recipients: Iterable[ActorRef]) extends Actor
AggregatorSnippet_2.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12
case msg @ Message(text) =>
 
val promises = List.fill(recipients.size)(Promise[String]())
recipients.zip(promises).map{case (recipient, promise) =>
(recipient !!! msg).map{result: String =>
println("Binding recipient's response: %s" format(result))
flow{
promise << result
}
}
}
AggregatorSnippet_3.scala
Scala
1 2 3 4 5 6 7 8 9 10
flow{
def gather(promises: List[CompletableFuture[String]], result: String = ""): String @cps[Future[Any]] =
promises match {
case head :: tail => gather(tail, head() + result)
case Nil => result
}
println("Binding result...")
result << gather(promises)
}
Recipient.scala
Scala
1 2 3 4 5 6 7 8 9
class Recipient(id: Int) extends Actor {
def receive = {
case Message(msg) =>
Thread.sleep(1000)
self.reply("%s, [%s]! ".format(msg, id))
}
}
Test.scala
Scala
1 2 3 4 5 6 7 8 9 10
val recipients = (1 to 5).map(i => actorOf(new Recipient(i)).start)
val aggregator = actorOf(new Aggregator(recipients)).start
val results1 = aggregator !! Message("Hello")
val results2 = aggregator !! Message("world")
results1.map{ res =>
println("Result: %s" format(res.asInstanceOf[Future[String]].get))
}
results2.map{ res =>
println("Result: %s" format(res.asInstanceOf[Future[String]].get))
}
TestResults.sh
Shell
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
Started processing message `Hello`
Binding result...
Started processing message `world`
Binding result...
Binding recipient response: Hello, [3]!
Binding recipient response: Hello, [1]!
Binding recipient response: Hello, [4]!
Binding recipient response: Hello, [2]!
Binding recipient response: Hello, [5]!
Result: Hello, [5]! Hello, [4]! Hello, [3]! Hello, [2]! Hello, [1]!
Binding recipient response: world, [4]!
Binding recipient response: world, [3]!
Binding recipient response: world, [5]!
Binding recipient response: world, [2]!
Binding recipient response: world, [1]!
Result: world, [5]! world, [4]! world, [3]! world, [2]! world, [1]!

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.