Skip to content

Instantly share code, notes, and snippets.

@klpx
Last active April 18, 2017 12:28
Show Gist options
  • Save klpx/e39bc4f3edbc98ab7a9df43cda042d3a to your computer and use it in GitHub Desktop.
Save klpx/e39bc4f3edbc98ab7a9df43cda042d3a to your computer and use it in GitHub Desktop.
Akka Streams: a graph that merges an L flow and an R flow into a single Either[L, R] flow
/**
Copyleft Alexander Hasselbach
Usage:
val E = b.add(mergeEither[Throwable,Int])
val parsed = b.add(Flow[Either[Throwable,Int]])
val valids = b.add(Flow[Int])
val invalids = b.add(Flow[Throwable])
valids ~> E.right
invalids ~> E.left
E.out ~> parsed
*/
import akka.NotUsed
import akka.stream.Graph
import akka.stream.scaladsl.{Flow, GraphDSL, Merge}
/**
 * Builds a fan-in graph with two typed inputs and one `Either` output.
 *
 * Elements arriving on the `left` inlet are wrapped in `Left`, elements arriving
 * on the `right` inlet are wrapped in `Right`, and both wrapped streams are fed
 * into a two-port `Merge` whose outlet becomes the outlet of the returned shape.
 *
 * @tparam L element type accepted on the `left` inlet
 * @tparam R element type accepted on the `right` inlet
 * @return a graph exposing an [[EitherFanInShape]] and materializing `NotUsed`
 */
def mergeEither[L,R]: Graph[EitherFanInShape[L, R, Either[L,R]], NotUsed] =
  GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // One mapping stage per side: lift the raw element into the matching Either case.
    val wrapLeft = builder.add(Flow[L].map[Either[L, R]](Left(_)))
    val wrapRight = builder.add(Flow[R].map[Either[L, R]](Right(_)))

    // Two-input merge; the left side is wired to port 0 and the right side to port 1.
    val merge = builder.add(Merge[Either[L, R]](inputPorts = 2))
    wrapLeft.out ~> merge.in(0)
    wrapRight.out ~> merge.in(1)

    new EitherFanInShape[L, R, Either[L, R]](wrapLeft.in, wrapRight.in, merge.out)
  }
///
import akka.stream.{FanInShape, Inlet, Outlet}
/**
 * A `FanInShape` with two named inlets, `left` and `right`, and a single outlet.
 *
 * @param _init initialisation descriptor forwarded to `FanInShape`
 * @tparam L   element type of the `left` inlet
 * @tparam R   element type of the `right` inlet
 * @tparam Out element type of the outlet
 */
class EitherFanInShape[L, R, Out](_init: FanInShape.Init[Out]) extends FanInShape[Out](_init) {

  /** Creates the shape from a name only, via `FanInShape.Name`. */
  def this(name: String) =
    this(FanInShape.Name[Out](name))

  /** Creates the shape from already existing ports, passed on as `FanInShape.Ports`. */
  def this(left: Inlet[L], right: Inlet[R], out: Outlet[Out]) =
    this(FanInShape.Ports(out, List[Inlet[_]](left, right)))

  // NOTE(review): keep `left` declared before `right` — presumably this order has to
  // match the inlet order given to FanInShape.Ports above; confirm before reordering.
  val left: Inlet[L] = newInlet[L]("left")
  val right: Inlet[R] = newInlet[R]("right")

  /** Builds a new instance of this shape class from the given `init`. */
  override protected def construct(init: FanInShape.Init[Out]): FanInShape[Out] =
    new EitherFanInShape[L, R, Out](init)

  /** Delegates to `super.deepCopy()` and casts the result back to this shape type. */
  override def deepCopy(): EitherFanInShape[L, R, Out] =
    super.deepCopy().asInstanceOf[EitherFanInShape[L, R, Out]]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment