Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package io.scalac.streams.stage.basic
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
class MyFilterGraphStage[T](predicate: T => Boolean) extends GraphStage[FlowShape[T, T]] {
val input = Inlet[T]("MyFilterGraphStage.in")
val output = Outlet[T]("MyFilterGraphStage.out")
override def shape: FlowShape[T, T] = FlowShape(input, output)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(input, new InHandler {
override def onPush(): Unit = {
val element = grab(input)
if(predicate(element)) {
push(output, element)
} else {
pull(input)
}
}
})
setHandler(output, new OutHandler {
override def onPull() = pull(input)
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment