Skip to content

Instantly share code, notes, and snippets.

@trautonen
Created October 24, 2015 07:23
Show Gist options
  • Save trautonen/a2e65f9893ef0b662f0b to your computer and use it in GitHub Desktop.
Save trautonen/a2e65f9893ef0b662f0b to your computer and use it in GitHub Desktop.
Trigger a 'detour' graph from Akka stream

Trigger a 'detour' graph from Akka stream

Given the idea of the flow:

      infinite
       stream
          |
          |
          v
 trigger --> runnable graph (ActorRef, ActorRef) --> ...
                                                      |
                                                      v
    join  | <-- onComplete                      <--
          |
          v

The first idea how to implement the missing block of handling the trigger and join stage in a stream was to use AsyncStage and actor ask pattern. Using https://github.com/akka/akka/blob/releasing-akka-stream-and-http-experimental-1.0/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala as a reference.

Not sure if this is the ideal way of implementing the stage though. It lacks type safety and introduced annoying type casting.

package flows
import akka.actor.ActorRef
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.RunnableGraph
import akka.stream.stage._
import akka.util.Timeout
import flows.DetourStage.AwaitCompletion
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}
object DetourStage {
case object AwaitCompletion
}
class DetourStage[In, Out](g: RunnableGraph[(ActorRef, ActorRef)], timeout: Timeout)
(implicit materializer: ActorMaterializer, ec: ExecutionContext) extends AsyncStage[In, Out, Try[Out]] {
private var inFlight: Option[Out] = None
override def onPush(elem: In, ctx: AsyncContext[Out, Try[Out]]): UpstreamDirective = {
val (source, sink) = g.run()
val future = ask(sink, AwaitCompletion)(timeout).map(_.asInstanceOf[Out])
val callback = ctx.getAsyncCallback()
future.onComplete(callback.invoke)
source ! elem
ctx.holdUpstream()
}
override def onPull(ctx: AsyncContext[Out, Try[Out]]): DownstreamDirective = inFlight match {
case Some(elem) =>
inFlight = None
push(elem, ctx)
case None =>
ctx.holdDownstream()
}
override def onAsyncInput(event: Try[Out], ctx: AsyncContext[Out, Try[Out]]): Directive = event match {
case Failure(ex) =>
ctx.fail(ex)
case Success(elem) if ctx.isHoldingDownstream =>
push(elem, ctx)
case Success(elem) =>
inFlight = Some(elem)
ctx.ignore()
}
override def onUpstreamFinish(ctx: AsyncContext[Out, Try[Out]]): TerminationDirective = {
if (ctx.isHoldingUpstream) ctx.absorbTermination()
else ctx.finish()
}
private def push(elem: Out, ctx: AsyncContext[Out, Try[Out]]): DownstreamDirective = {
if (ctx.isFinishing) ctx.pushAndFinish(elem)
else ctx.pushAndPull(elem)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment