Skip to content

Instantly share code, notes, and snippets.

@rkrzewski
Created January 11, 2017 23:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rkrzewski/f1d131405ddb8ce0d1a6fded55da8c23 to your computer and use it in GitHub Desktop.
Save rkrzewski/f1d131405ddb8ce0d1a6fded55da8c23 to your computer and use it in GitHub Desktop.
import akka.stream._
import akka.stream.stage._
/**
 * A simple fan-out stage that splits the incoming stream according to the result of a decider
 * function. Every incoming element is routed to exactly one outlet: a `Left` result is pushed to
 * `out0`, a `Right` result is pushed to `out1`. The two outlets may carry elements of different
 * types.
 *
 * '''Emits when''' an element is available from the input and both downstreams have demand
 *
 * '''Backpressures when''' any of the downstreams back-pressures
 *
 * '''Completes when''' upstream completes
 *
 * '''Cancels when''' any of the downstreams cancel
 *
 * Completion and cancellation are not handled explicitly here; they follow from the default
 * `InHandler` / `OutHandler` callbacks, none of which are overridden.
 *
 * @param decide function applied to every incoming element; its result selects the outlet and
 *               supplies the element that is emitted there
 * @tparam I  type of the incoming elements
 * @tparam O0 type of the elements emitted on `out0` (the `Left` side)
 * @tparam O1 type of the elements emitted on `out1` (the `Right` side)
 */
class Splitter[I, O0, O1](decide: I => Either[O0, O1]) extends GraphStage[FanOutShape2[I, O0, O1]] {
  val in: Inlet[I] = Inlet("in")
  val out0: Outlet[O0] = Outlet("out0")
  val out1: Outlet[O1] = Outlet("out1")

  override val shape: FanOutShape2[I, O0, O1] = new FanOutShape2(in, out0, out1)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Handlers for both outlets are identical. Upstream is only pulled once BOTH outlets
      // have demand, because the target outlet is not known until `decide` has seen the
      // element. The port state kept by GraphStageLogic is queried directly instead of
      // maintaining a separate counter of pulled outlets.
      private val outHandler: OutHandler = new OutHandler {
        override def onPull(): Unit =
          if (isAvailable(out0) && isAvailable(out1)) pull(in)
      }

      setHandler(out0, outHandler)
      setHandler(out1, outHandler)

      setHandler(in, new InHandler {
        // Route the element to the outlet chosen by the decider. The outlet that received
        // the element loses its demand; the next upstream pull happens when it pulls again.
        override def onPush(): Unit =
          decide(grab(in)) match {
            case Left(o0)  => push(out0, o0)
            case Right(o1) => push(out1, o1)
          }
      })
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment