Skip to content

Instantly share code, notes, and snippets.

Created April 18, 2017 13:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/26267382ee7cf4e5e1e2b5f4a4cd917d to your computer and use it in GitHub Desktop.
Save anonymous/26267382ee7cf4e5e1e2b5f4a4cd917d to your computer and use it in GitHub Desktop.
the description for this gist
/**
 * A custom filtering stage with one inlet and one outlet of the same element type.
 *
 * Every element arriving on `input` is tested with `predicate`: elements that
 * satisfy it are pushed to `output`, the rest are dropped and the next element
 * is requested from upstream instead.
 *
 * @param predicate decides whether an incoming element is passed downstream
 * @tparam T type of the elements flowing through the stage
 */
class MyFilterGraphStage[T](predicate: T => Boolean) extends GraphStage[FlowShape[T, T]] {

  // The port names ("MyFilter.in" / "MyFilter.out") mostly show up in internal
  // diagnostic messages when something fails.
  val input: Inlet[T] = Inlet[T]("MyFilter.in")
  val output: Outlet[T] = Outlet[T]("MyFilter.out")

  override def shape: FlowShape[T, T] = FlowShape(input, output)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      // Downstream demand is simply forwarded upstream.
      setHandler(output, new OutHandler {
        override def onPull(): Unit = pull(input)
      })

      setHandler(input, new InHandler {
        override def onPush(): Unit = {
          val candidate = grab(input)
          if (predicate(candidate)) push(output, candidate)
          else pull(input) // rejected: ask upstream for another element
        }
      })
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment