Skip to content

Instantly share code, notes, and snippets.

@ecyshor
Created March 31, 2020 18:08
Show Gist options
  • Save ecyshor/322ec57515d1367df25b6ca7deee9574 to your computer and use it in GitHub Desktop.
Save ecyshor/322ec57515d1367df25b6ca7deee9574 to your computer and use it in GitHub Desktop.
Akka streams partition either
implicit class EitherSourceExtension[L, R, Mat](source: Source[Either[L, R], Mat]) {
def partition(left: Sink[L, NotUsed], right: Sink[R, NotUsed]): Graph[ClosedShape, NotUsed] = {
GraphDSL.create() { implicit builder =>
import akka.stream.scaladsl.GraphDSL.Implicits._
val partition = builder.add(Partition[Either[L, R]](2, element => if (element.isLeft) 0 else 1))
source ~> partition.in
partition.out(0).collect {
case Left(value) => value
} ~> left
partition.out(1).collect {
case Right(value) => value
} ~> right
ClosedShape
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment