Skip to content

Instantly share code, notes, and snippets.

@hochgi
Created July 7, 2021 12:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hochgi/cc354f9b80ca427a4f4d7313c78e4350 to your computer and use it in GitHub Desktop.
Save hochgi/cc354f9b80ca427a4f4d7313c78e4350 to your computer and use it in GitHub Desktop.
akka-streams custom stream inspector
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import com.typesafe.scalalogging.Logger
object StreamEventInspector {

  /**
   * Builds a [[StreamEventInspector]] that reports every stream event through `logger`.
   *
   * Each message is prefixed with `[context] `. Levels used:
   *  - upstream completion: INFO
   *  - upstream failure: ERROR (with the failure attached)
   *  - downstream cancellation: INFO for a non-failure cancellation, ERROR (with the cause
   *    attached) when downstream cancelled because of a failure
   *  - element pushed: DEBUG, rendered with `printElem`
   *  - downstream pull: DEBUG
   *
   * @param logger    logger receiving all messages
   * @param context   label placed in square brackets in front of every message
   * @param printElem renders an element for the DEBUG "push" message
   * @tparam T        element type flowing through the stage
   * @return a pass-through stage wired with the logging callbacks above
   */
  def default[T](logger: Logger, context: String, printElem: T => String): StreamEventInspector[T] = {
    val ctx = s"[$context] "
    new StreamEventInspector[T](
      onUpstreamFinishInspection = () => logger.info(ctx + "upstream completed"),
      onUpstreamFailureInspection = ex => logger.error(ctx + "upstream failure", ex),
      onDownstreamFinishInspection = {
        // A regular cancellation (downstream simply needs no more elements, or its stage
        // completed) is not an error, so it must not pollute the error log.
        case _: akka.stream.SubscriptionWithCancelException.NonFailureCancellation =>
          logger.info(ctx + "downstream completed")
        // Any other cause means downstream cancelled due to a failure.
        case ex =>
          logger.error(ctx + "downstream failure", ex)
      },
      onPushInspection = el => logger.debug(ctx + printElem(el)),
      onPullInspection = () => logger.debug(ctx + "downstream pulled")
    )
  }
}
/**
 * Pass-through flow stage that lets callers observe stream signals.
 *
 * Elements are forwarded unchanged from the inlet to the outlet. For every signal the
 * matching inspection callback is invoked first, then the signal is handled exactly as
 * the default Akka handler would handle it. All callbacks default to no-ops.
 *
 * @param onUpstreamFinishInspection   called when upstream completes
 * @param onUpstreamFailureInspection  called with the failure when upstream fails
 * @param onDownstreamFinishInspection called with the cancellation cause when downstream finishes
 * @param onPushInspection             called with each element before it is pushed downstream
 * @param onPullInspection             called on each downstream pull before upstream is pulled
 * @tparam Elem element type flowing through the stage
 */
class StreamEventInspector[Elem](
    onUpstreamFinishInspection: () => Unit = () => {},
    onUpstreamFailureInspection: Throwable => Unit = _ => {},
    onDownstreamFinishInspection: Throwable => Unit = _ => {},
    onPushInspection: Elem => Unit = (_: Elem) => {},
    onPullInspection: () => Unit = () => {}
) extends GraphStage[FlowShape[Elem, Elem]] {

  private val inlet = Inlet[Elem]("StreamEventInspector.in")
  private val outlet = Outlet[Elem]("StreamEventInspector.out")

  override val shape: FlowShape[Elem, Elem] = FlowShape(inlet, outlet)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // One object serves both ports; every override runs its inspection callback
      // and then defers to the inherited default behaviour.
      setHandlers(
        inlet,
        outlet,
        new InHandler with OutHandler {

          // --- downstream-driven signals ---

          override def onPull(): Unit = {
            onPullInspection()
            pull(inlet)
          }

          override def onDownstreamFinish(cause: Throwable): Unit = {
            onDownstreamFinishInspection(cause)
            super.onDownstreamFinish(cause)
          }

          // --- upstream-driven signals ---

          override def onPush(): Unit = {
            val current = grab(inlet)
            onPushInspection(current)
            push(outlet, current)
          }

          override def onUpstreamFinish(): Unit = {
            onUpstreamFinishInspection()
            super.onUpstreamFinish()
          }

          override def onUpstreamFailure(ex: Throwable): Unit = {
            onUpstreamFailureInspection(ex)
            super.onUpstreamFailure(ex)
          }
        }
      )
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment