Skip to content

Instantly share code, notes, and snippets.

@ecyshor
Created May 1, 2020 20:33
Show Gist options
  • Save ecyshor/f32f9086a2a9969102ed0338d0df524a to your computer and use it in GitHub Desktop.
Save ecyshor/f32f9086a2a9969102ed0338d0df524a to your computer and use it in GitHub Desktop.
Akka streams either partition
implicit class EitherSourceExtension[L, R, Mat](source: Source[Either[L, R], Mat]) {
def partition[LMat, RMat, NewMat](left: Sink[L, LMat], right: Sink[R, RMat])(
combineMat: (Mat, LMat, RMat) => NewMat
): Graph[ClosedShape.type, NewMat] = {
GraphDSL.create(source, left, right)(combineMat)(
implicit builder =>
(source, left, right) => {
import GraphDSL.Implicits._
val p = builder.add(partitionGraph[L, R])
source ~> p.in
p.out0 ~> left.in
p.out1 ~> right.in
ClosedShape
}
)
}
}
def partitionGraph[L, R]: Graph[FanOutShape2[Either[L, R], L, R], NotUsed] = {
GraphDSL.create() { implicit builder =>
import akka.stream.scaladsl.GraphDSL.Implicits._
val partition = builder.add(Partition[Either[L, R]](2, element => if (element.isLeft) 0 else 1))
new FanOutShape2(
partition.in,
partition
.out(0)
.collect {
case Left(value) => value
}
.outlet,
partition
.out(1)
.collect {
case Right(value) => value
}
.outlet
)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment