Skip to content

Instantly share code, notes, and snippets.

@filosganga
Last active March 23, 2017 10:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save filosganga/0c5a3c6decf3cce26609d31dec0849cd to your computer and use it in GitHub Desktop.
Save filosganga/0c5a3c6decf3cce26609d31dec0849cd to your computer and use it in GitHub Desktop.
A map stage that control a resource lifecycle.
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.util.control.NonFatal
class ResourceMapStage[In, Out, R](opener: () => R, closer: R => Unit, f: R => In ⇒ Out) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("ResourceMap.in")
val out = Outlet[Out]("ResourceMap.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = Attributes.name("resourceMap")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
private val r = opener()
override def postStop(): Unit = {
closer(r)
super.postStop()
}
override def onPush(): Unit = {
try {
push(out, f(r)(grab(in)))
} catch {
case NonFatal(ex) ⇒ decider(ex) match {
case Supervision.Stop ⇒ failStage(ex)
case _ ⇒ pull(in)
}
}
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment