Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
package io.scalac.streams.stage.basic
import{GraphStage, GraphStageLogic, InHandler, OutHandler}
import{Attributes, FlowShape, Inlet, Outlet}
class MyFilterGraphStage[T](predicate: T => Boolean) extends GraphStage[FlowShape[T, T]] {
val input = Inlet[T]("")
val output = Outlet[T]("MyFilterGraphStage.out")
override def shape: FlowShape[T, T] = FlowShape(input, output)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
setHandler(input, new InHandler {
override def onPush(): Unit = {
val element = grab(input)
if(predicate(element)) {
push(output, element)
} else {
setHandler(output, new OutHandler {
override def onPull() = pull(input)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment