Skip to content

Instantly share code, notes, and snippets.

@l15k4
Last active January 16, 2016 20:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save l15k4/6d01261b5e579a02f4fd to your computer and use it in GitHub Desktop.
Save l15k4/6d01261b5e579a02f4fd to your computer and use it in GitHub Desktop.
import akka.stream.scaladsl.FlexiMerge.{ReadAny, MergeLogic}
import akka.stream.{Attributes, FanInShape2}
import akka.stream.scaladsl.FlexiMerge
class BypassingMerge[A, B] extends FlexiMerge[A, FanInShape2[A, B, A]](new FanInShape2("BMerge"), Attributes.name("BMerge")) {
def createMergeLogic(p: PortT): MergeLogic[A] = new MergeLogic[A] {
override def initialState =
State[Any](ReadAny(p)) { (ctx, input, element) =>
if (input eq p.in0)
ctx.emit(element.asInstanceOf[A])
SameState
}
}
}
object BypassingMerge {
def apply[A,B]() = new BypassingMerge[A,B]
}
@l15k4
Copy link
Author

l15k4 commented Jan 16, 2016

@rkuhn it has generic parameters A and B ... ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment