Created
April 18, 2017 13:11
-
-
Save anonymous/26267382ee7cf4e5e1e2b5f4a4cd917d to your computer and use it in GitHub Desktop.
the description for this gist
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class MyFilterGraphStage[T](predicate: T => Boolean) extends GraphStage[FlowShape[T, T]] { | |
// inlets/outlets names ("MyFilter.in" and "MyFilter.out" in this case) | |
// serves mostly for internal diagnostic messages in case of failures | |
val input = Inlet[T]("MyFilter.in") | |
val output = Outlet[T]("MyFilter.out") | |
override def shape: FlowShape[T, T] = FlowShape(input, output) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { | |
setHandler(input, new InHandler { | |
override def onPush(): Unit = { | |
val element = grab(input) | |
if(predicate(element)) { | |
push(output, element) | |
} else { | |
pull(input) | |
} | |
} | |
}) | |
setHandler(output, new OutHandler { | |
override def onPull() = pull(input) | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment