Skip to content

Instantly share code, notes, and snippets.

@jamiely
Last active June 27, 2018 00:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jamiely/2d2b0301db09b640ba99e1707b9969ed to your computer and use it in GitHub Desktop.
Save jamiely/2d2b0301db09b640ba99e1707b9969ed to your computer and use it in GitHub Desktop.
Akka Stream async boundary

I'm somewhat experienced with Akka Actors, but wanted to try Streams since it is the new hotness and compatible with the reactive streams initiative.

My use case is that I want to create a Source[GetActivityTaskResult] to interact with an AWS Step Functions State Machine. Basically, I want to poll for tasks that my workers can operate on.

So I have code like this:

val activities: Source[GetActivityTaskResult] = ???

RunnableGraph.fromGraph(
  GraphDSL.create(Sink.foreach(println)) { implicit builder => sink =>
  import GraphDSL.Implicits._
  
  activities ~>
    sink

And this worked just fine, printing each of the tasks; however, I added a Flow to do some work on the activities.

def doWork(activityTask: GetActivityTaskResult): Future[WorkResult] = ???

activities ~>
  Flow[GetActivityTaskResult].mapAsync(doWork) ~>
  sink

Things stopped printing to the console, so I knew that there was a problem with doing this anynchronous work. The solution was to add a hint that indicates asynchronous boundaries for the graph.

activities.async ~>
  Flow[GetActivityTaskResult].mapAsync(doWork).async ~>
  sink

Once I added the boundaries, results were printed from the Sink as expected.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment