Skip to content

Instantly share code, notes, and snippets.

Created April 18, 2017 13:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/185be687c90b3f2c2ea62fc93b279e6f to your computer and use it in GitHub Desktop.
Save anonymous/185be687c90b3f2c2ea62fc93b279e6f to your computer and use it in GitHub Desktop.
the description for this gist
final case class Filter[T](p: T ⇒ Boolean) extends SimpleLinearGraphStage[T] {
override def initialAttributes: Attributes = DefaultAttributes.filter
override def toString: String = "Filter"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler with InHandler {
def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
override def onPush(): Unit = {
try {
val elem = grab(in)
if (p(elem)) {
push(out, elem)
} else {
pull(in)
}
} catch {
case NonFatal(ex) ⇒ decider(ex) match {
case Supervision.Stop ⇒ failStage(ex)
case _ ⇒ pull(in)
}
}
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment