Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A map stage that control a resource lifecycle.
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.util.control.NonFatal
class ResourceMapStage[In, Out, R](opener: () => R, closer: R => Unit, f: R => In Out) extends GraphStage[FlowShape[In, Out]] {
val in = Inlet[In]("ResourceMap.in")
val out = Outlet[Out]("ResourceMap.out")
override val shape = FlowShape(in, out)
override def initialAttributes: Attributes = Attributes.name("resourceMap")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private def decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
private val r = opener()
override def postStop(): Unit = {
closer(r)
super.postStop()
}
override def onPush(): Unit = {
try {
push(out, f(r)(grab(in)))
} catch {
case NonFatal(ex) decider(ex) match {
case Supervision.Stop failStage(ex)
case _ pull(in)
}
}
}
override def onPull(): Unit = pull(in)
setHandlers(in, out, this)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment