Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save fedesilva/02eee7b69796dfe1f3f9a2e1df82e318 to your computer and use it in GitHub Desktop.
Save fedesilva/02eee7b69796dfe1f3f9a2e1df82e318 to your computer and use it in GitHub Desktop.
Sample of using partition to split up incoming elements over multiple outgoing streams
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.io.StdIn
import scala.util.Random
object SimplePartitionSample extends App {
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
case class Apple(bad: Boolean)
val badApples = Sink.foreach[Apple](apple => println("bad apple"))
val goodApples = Sink.foreach[Apple](apple => println("good apple"))
val apples = Source(Vector.fill(10){Apple(Random.nextBoolean()) })
RunnableGraph.fromGraph(GraphDSL.create(){ implicit b =>
import GraphDSL.Implicits._
// The lambda is "apple => port number"
val partition = b.add(Partition[Apple](2, apple => if (apple.bad) 1 else 0))
apples ~> partition.in
partition.out(0) ~> goodApples
partition.out(1) ~> badApples
ClosedShape
}).run()
println("ENTER to terminate")
StdIn.readLine()
system.terminate()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment