Skip to content

Instantly share code, notes, and snippets.

@johanandren
Created July 26, 2016 08:25
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save johanandren/c4e37ac655a53d256aaf301743a254dd to your computer and use it in GitHub Desktop.
Save johanandren/c4e37ac655a53d256aaf301743a254dd to your computer and use it in GitHub Desktop.
Sample of using partition to split up incoming elements over multiple outgoing streams
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.io.StdIn
import scala.util.Random
object SimplePartitionSample extends App {
implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()
case class Apple(bad: Boolean)
val badApples = Sink.foreach[Apple](apple => println("bad apple"))
val goodApples = Sink.foreach[Apple](apple => println("good apple"))
val apples = Source(Vector.fill(10){Apple(Random.nextBoolean()) })
RunnableGraph.fromGraph(GraphDSL.create(){ implicit b =>
import GraphDSL.Implicits._
// The lambda is "apple => port number"
val partition = b.add(Partition[Apple](2, apple => if (apple.bad) 1 else 0))
apples ~> partition.in
partition.out(0) ~> goodApples
partition.out(1) ~> badApples
ClosedShape
}).run()
println("ENTER to terminate")
StdIn.readLine()
system.terminate()
}
@narerkrit-dotography
Copy link

Good stuff. Just want to add some info for others.

With Partition, when you're using fail-able sinks, the graph will terminate only after all the sinks fail.
When only a single sink fails, the graph continues to process incoming elements, but does not to produce element to the failed sink. In some cases the SupervisionStrategy also doesn't detect the thrown error.

I've come to use Broadcast stage with eager termination with each output port filtered. It's not as logically correct or safe, but it works for my case.

I think it is also possible to handle the materialized Future[Done]s from all the sinks, but that seems like more trouble for me.

If anyone's got a better alternative, I'm all ears.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment