@steinybot
Last active February 27, 2021 18:51
A custom graph stage that materializes a new sink for every element
import akka.NotUsed
import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage._

class OneToOneOnDemandSink[T, +M](sink: T => Sink[T, M]) extends GraphStage[SinkShape[T]] {

  val in: Inlet[T] = Inlet("OneToOneOnDemandSink.in")

  override val shape = SinkShape(in)

  override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {

    override def preStart(): Unit = pull(in)

    // Installed while no inner stream is running: every push starts a new inner stream.
    val awaitingElementHandler = new InHandler {
      override def onPush(): Unit = {
        val element = grab(in)
        val innerSource = createInnerSource(element)
        val innerSink = sink(element)
        Source.fromGraph(innerSource.source).runWith(innerSink)(subFusingMaterializer)
      }

      override def onUpstreamFinish(): Unit = completeStage()

      override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
    }
    setHandler(in, awaitingElementHandler)

    // Creates a single-element source for the inner sink and swaps the inlet
    // handler for one that guards the stage until the inner sink has pulled.
    def createInnerSource(element: T): SubSourceOutlet[T] = {
      val innerSource = new SubSourceOutlet[T]("OneToOneOnDemandSink.innerSource")

      innerSource.setHandler(new OutHandler {
        override def onPull(): Unit = {
          innerSource.push(element)
          innerSource.complete()
          if (isClosed(in)) {
            completeStage()
          } else {
            // Ask upstream for the next element and go back to waiting for it.
            pull(in)
            setHandler(in, awaitingElementHandler)
          }
        }

        override def onDownstreamFinish(): Unit = {
          innerSource.complete()
          if (isClosed(in)) {
            completeStage()
          }
        }
      })

      // No pull is outstanding while the inner stream runs, so a push here is a bug.
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val illegalStateException = new IllegalStateException("Got a push that we weren't expecting")
          innerSource.fail(illegalStateException)
          failStage(illegalStateException)
        }

        override def onUpstreamFinish(): Unit = {
          // We don't stop until the inner stream stops.
          setKeepGoing(true)
        }

        override def onUpstreamFailure(ex: Throwable): Unit = {
          innerSource.fail(ex)
          failStage(ex)
        }
      })

      innerSource
    }
  }
}

object OneToOneOnDemandSink {
  def apply[T, M](sink: T => Sink[T, M]): Sink[T, NotUsed] = Sink.fromGraph(new OneToOneOnDemandSink(sink))
}
@steinybot
Author

See https://stackoverflow.com/questions/45192072/use-dynamic-sink-destination-in-akka-streams for more motivation and other solutions to the problem.

marccarre commented Mar 25, 2018

Many thanks for this example @steinybot! 👍
I'm new to Akka Streams, and still getting my head around the various concepts and the API 🤔.

Would you know if it is possible for OneToOneOnDemandSink to keep track of the materialised value of the inner sink (e.g. accumulate it?) or if there is a fundamental limitation I'm missing?

In my scenario, the inner sink is of type Sink[T, Future[IOResult]], and it would be convenient to terminate the program once all futures have completed and/or to be able to react to failures. However, GraphStage[SinkShape[T]] inherently has a NotUsed materialised value, and specialising OneToOneOnDemandSink to be GraphStageWithMaterializedValue[SinkShape[T], Future[IOResult]] isn't trivial -- at least to me.
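One way to get at the per-element materialised values without changing the graph stage is to run each inner sink from `mapAsync` and collect the results downstream. The following is only a minimal sketch under assumptions: `PerElementSink`, `collecting`, `sinkFor` and `parallelism` are hypothetical names that do not appear in the gist, and the inner sink is assumed to materialise a `Future`. It swaps the custom stage for standard operators rather than extending it.

```scala
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}

import scala.concurrent.Future

// Hypothetical helper, not part of the gist.
object PerElementSink {

  // Runs a fresh inner sink for every element and collects the value that
  // each inner sink's materialised Future completes with.
  def collecting[T, R](sinkFor: T => Sink[T, Future[R]], parallelism: Int = 1)(
      implicit materializer: Materializer
  ): Sink[T, Future[Seq[R]]] =
    Flow[T]
      // mapAsync waits for each inner stream's Future and emits its result,
      // preserving the order of the incoming elements.
      .mapAsync(parallelism)(element => Source.single(element).runWith(sinkFor(element)))
      // Keep.right exposes the Future of all collected results to the caller.
      .toMat(Sink.seq[R])(Keep.right)
}
```

With an inner sink such as `FileIO.toPath(...)`, `R` would be `IOResult`, so the caller gets a single `Future[Seq[IOResult]]` that fails if any inner stream's future fails.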

After quite a bit of wrestling, the below alternative also seems to do the job, though it is a lot more specialised than your solution:

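  import java.nio.file.Path

  import akka.stream.{IOResult, Materializer, SinkShape}
  import akka.stream.scaladsl.{Broadcast, FileIO, Flow, GraphDSL, Sink, Source, ZipWith}
  import akka.util.ByteString

  import scala.concurrent.Future

  // Writes every element to the file chosen by `dispatcher`, serialised with `serializer`.
  def dispatch[T](
      dispatcher: T => Path,
      serializer: T => ByteString
  )(
      implicit materializer: Materializer
  ): Sink[T, Future[Seq[Future[IOResult]]]] =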
    Sink.fromGraph(
      GraphDSL.create(
        Sink.seq[Future[IOResult]]
      ) {
        implicit builder =>
          sink =>
            // prepare this sink's graph elements:
            val broadcast = builder.add(Broadcast[T](2))
            val serialize = builder.add(Flow[T].map(serializer))
            val dispatch = builder.add(Flow[T].map(dispatcher))
            val zipAndWrite = builder.add(ZipWith[ByteString, Path, Future[IOResult]](
              (bytes, path) => Source.single(bytes).runWith(FileIO.toPath(path)))
            )

            // connect the graph:
            import GraphDSL.Implicits._
            broadcast.out(0) ~> serialize ~> zipAndWrite.in0
            broadcast.out(1) ~> dispatch ~> zipAndWrite.in1
            zipAndWrite.out ~> sink

            // expose ports:
            SinkShape(broadcast.in)
      }
    )

The Future[Seq[Future[IOResult]]] isn't pretty, but I guess it can be flattened later on by the caller.
Comparison, comments & feedback welcome, as I'm still not sure about the pros & cons of both solutions.
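The flattening mentioned above can be done with the standard library alone. This is a minimal sketch, assuming the caller has an `ExecutionContext` in scope; `FlattenResults` and `flatten` are hypothetical names introduced for illustration, and the helper is generic so it is not tied to `IOResult`.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper, not part of the gist.
object FlattenResults {

  // Turns Future[Seq[Future[A]]] into Future[Seq[A]].
  // The result fails if the outer future or any inner future fails.
  def flatten[A](nested: Future[Seq[Future[A]]])(
      implicit ec: ExecutionContext
  ): Future[Seq[A]] =
    nested.flatMap(inner => Future.sequence(inner))
}
```

Applied to the materialised value of `dispatch`, this would give a `Future[Seq[IOResult]]` that completes once every write has finished.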