Skip to content

Instantly share code, notes, and snippets.

@stanislav-chetvertkov
Created April 10, 2018 14:16
Show Gist options
  • Save stanislav-chetvertkov/6098e581e2ee74669ba936c7726b4b9d to your computer and use it in GitHub Desktop.
Save stanislav-chetvertkov/6098e581e2ee74669ba936c7726b4b9d to your computer and use it in GitHub Desktop.
Streams how to's
def splitErrorsFlow[L, R, M, M1, M2](
routesSource: Source[Either[L, R], M],
leftSink: Sink[L, M1],
rightSink: Sink[R, M2]
)(implicit ec: ExecutionContext): RunnableGraph[(M, M1, M2)] =
RunnableGraph.fromGraph(GraphDSL.create(routesSource, leftSink, rightSink)((_, _, _)) { implicit b => (s, l, r) =>
import GraphDSL.Implicits._
val broadcast = b.add(Broadcast[Either[L, R]](2))
s ~> broadcast.in
broadcast.out(0).filter(_.isLeft).map(_.left.get) ~> l.in
broadcast.out(1).filter(_.isRight).map(_.right.get) ~> r.in
ClosedShape
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment