Skip to content

Instantly share code, notes, and snippets.

/post.scala

Created Apr 18, 2017
Embed
What would you like to do?
the description for this gist
class MyFilterGraphStage[T](predicate: T => Boolean) extends GraphStage[FlowShape[T, T]] {
// inlets/outlets names ("MyFilter.in" and "MyFilter.out" in this case)
// serves mostly for internal diagnostic messages in case of failures
val input = Inlet[T]("MyFilter.in")
val output = Outlet[T]("MyFilter.out")
override def shape: FlowShape[T, T] = FlowShape(input, output)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(input, new InHandler {
override def onPush(): Unit = {
val element = grab(input)
if(predicate(element)) {
push(output, element)
} else {
pull(input)
}
}
})
setHandler(output, new OutHandler {
override def onPull() = pull(input)
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment