Skip to content

Instantly share code, notes, and snippets.

@tjweir
Forked from remeniuk/Aggregator.scala
Created May 13, 2011 18:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tjweir/971098 to your computer and use it in GitHub Desktop.
Save tjweir/971098 to your computer and use it in GitHub Desktop.
Scatter-Gather with Akka Dataflow
class Aggregator(recipients: Iterable[ActorRef]) extends Actor{
def receive = {
case msg @ Message(text) =>
println("Started processing message `%s`" format(text))
val result = Promise[String]()
val promises = List.fill(recipients.size)(Promise[String]())
recipients.zip(promises).map{case (recipient, promise) =>
(recipient !!! msg).map{result: String =>
println("Binding recipient's response: %s" format(result))
flow{
promise << result
}
}
}
flow{
def gather(promises: List[CompletableFuture[String]], result: String = ""): String @cps[Future[Any]] =
promises match {
case head :: tail => gather(tail, head() + result)
case Nil => result
}
println("Binding result...")
result << gather(promises)
}
self.reply(result)
}
}
class Aggregator(recipients: Iterable[ActorRef]) extends Actor
case msg @ Message(text) =>
val promises = List.fill(recipients.size)(Promise[String]())
recipients.zip(promises).map{case (recipient, promise) =>
(recipient !!! msg).map{result: String =>
println("Binding recipient's response: %s" format(result))
flow{
promise << result
}
}
}
flow{
def gather(promises: List[CompletableFuture[String]], result: String = ""): String @cps[Future[Any]] =
promises match {
case head :: tail => gather(tail, head() + result)
case Nil => result
}
println("Binding result...")
result << gather(promises)
}
class Recipient(id: Int) extends Actor {
def receive = {
case Message(msg) =>
Thread.sleep(1000)
self.reply("%s, [%s]! ".format(msg, id))
}
}
val recipients = (1 to 5).map(i => actorOf(new Recipient(i)).start)
val aggregator = actorOf(new Aggregator(recipients)).start
val results1 = aggregator !! Message("Hello")
val results2 = aggregator !! Message("world")
results1.map{ res =>
println("Result: %s" format(res.asInstanceOf[Future[String]].get))
}
results2.map{ res =>
println("Result: %s" format(res.asInstanceOf[Future[String]].get))
}
Started processing message `Hello`
Binding result...
Started processing message `world`
Binding result...
Binding recipient response: Hello, [3]!
Binding recipient response: Hello, [1]!
Binding recipient response: Hello, [4]!
Binding recipient response: Hello, [2]!
Binding recipient response: Hello, [5]!
Result: Hello, [5]! Hello, [4]! Hello, [3]! Hello, [2]! Hello, [1]!
Binding recipient response: world, [4]!
Binding recipient response: world, [3]!
Binding recipient response: world, [5]!
Binding recipient response: world, [2]!
Binding recipient response: world, [1]!
Result: world, [5]! world, [4]! world, [3]! world, [2]! world, [1]!
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment