Skip to content

Instantly share code, notes, and snippets.

@rkrzewski
Last active January 12, 2017 10:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rkrzewski/a0fc5d0b47d9a3e0b2c81435adef3fe7 to your computer and use it in GitHub Desktop.
Save rkrzewski/a0fc5d0b47d9a3e0b2c81435adef3fe7 to your computer and use it in GitHub Desktop.
Splitter variant that handles demand an termination of the outlets individually.
import akka.stream._
import akka.stream.stage._
/**
* Fan-out the stream of `scala.util.Either[L, R]` elements to `L` and `R` streams.
*
* '''Emits when''' emits when an element is available from the input and the chosen output has demand
*
* '''Backpressures when''' the currently chosen output back-pressures
*
* '''Completes when''' upstream completes and no output is pending
*
* '''Cancels when''' when both downstreams cancel
*/
class Splitter[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {
val in: Inlet[Either[L, R]] = Inlet("in")
val left: Outlet[L] = Outlet("left")
val right: Outlet[R] = Outlet("right")
override val shape = new FanOutShape2(in, left, right)
def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
var pendingElem: Any = null
var pendingOutlet: Outlet[_] = null
var downstreamRunning = 2
class SplitterInHandler extends InHandler {
def maybePull(): Unit = {
if (isAvailable(left) || isAvailable(right)) {
pull(in)
}
}
def handlePush[T](elem: T, outlet: Outlet[T]): Unit = {
if (!isClosed(outlet)) {
if (isAvailable(outlet)) {
push(outlet, elem)
maybePull()
} else {
pendingElem = elem
pendingOutlet = outlet
}
} else {
maybePull()
}
}
override def onPush: Unit = {
grab(in) match {
case Left(elem) => handlePush(elem, left)
case Right(elem) => handlePush(elem, right)
}
}
override def onUpstreamFinish(): Unit = {
if (pendingElem == null) {
completeStage()
}
}
}
class SplitterOutHandler[T](outlet: Outlet[T]) extends OutHandler {
override def onPull: Unit = {
if (pendingElem != null && pendingOutlet == outlet) {
push(outlet, pendingElem.asInstanceOf[T])
pendingElem = null
if (!isClosed(in)) {
if (!hasBeenPulled(in)) {
pull(in)
}
} else {
completeStage()
}
} else {
if (!hasBeenPulled(in)) {
pull(in)
}
}
}
override def onDownstreamFinish(): Unit = {
downstreamRunning -= 1
if (downstreamRunning == 0) {
completeStage()
} else if (pendingElem != null && pendingOutlet == outlet) {
pendingElem = null
if (!hasBeenPulled(in)) {
pull(in)
}
}
}
}
setHandler(in, new SplitterInHandler)
setHandler(left, new SplitterOutHandler(left))
setHandler(right, new SplitterOutHandler(right))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment