Skip to content

Instantly share code, notes, and snippets.

@l15k4
Last active Jan 16, 2016
Embed
What would you like to do?
import akka.stream.scaladsl.FlexiMerge.{ReadAny, MergeLogic}
import akka.stream.{Attributes, FanInShape2}
import akka.stream.scaladsl.FlexiMerge
class BypassingMerge[A, B] extends FlexiMerge[A, FanInShape2[A, B, A]](new FanInShape2("BMerge"), Attributes.name("BMerge")) {
def createMergeLogic(p: PortT): MergeLogic[A] = new MergeLogic[A] {
override def initialState =
State[Any](ReadAny(p)) { (ctx, input, element) =>
if (input eq p.in0)
ctx.emit(element.asInstanceOf[A])
SameState
}
}
}
object BypassingMerge {
def apply[A,B]() = new BypassingMerge[A,B]
}
@l15k4
Copy link
Author

l15k4 commented Jan 16, 2016

2.0 correct version :

class BypassingMerge[A, B] extends GraphStage[FanInShape2[A, B, A]] {
  private val in1 = Inlet[A]("BMerge.in1")
  private val in2 = Inlet[B]("BMerge.in2")
  private val out = Outlet[A]("BMerge.out")

  override def initialAttributes = Attributes.name("BMerge")

  val shape = new FanInShape2(in1, in2, out)

  override def createLogic(effectiveAttributes: Attributes) = new GraphStageLogic(shape) {
    passAlong(in1, out, doFinish = true, doFail = true)
    setHandler(out, eagerTerminateOutput)

    setHandler(in2, new InHandler {
      override def onPush(): Unit = pull(in2)
      override def onUpstreamFinish(): Unit = ()
    })

    override def preStart(): Unit = {
      pull(in1)
      pull(in2)
    }
  }
}

@rkuhn
Copy link

rkuhn commented Jan 16, 2016

Yup, looks good. One more thing: you can make this an object, no need to allocate a new one for every use.

@l15k4
Copy link
Author

l15k4 commented Jan 16, 2016

@rkuhn it has generic parameters A and B ... ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment