Skip to content

Instantly share code, notes, and snippets.

@davideicardi
Created June 29, 2018 14:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davideicardi/2a6e2a93507a731026160c2d383528ab to your computer and use it in GitHub Desktop.
Save davideicardi/2a6e2a93507a731026160c2d383528ab to your computer and use it in GitHub Desktop.
Conditional Akka Stream Flow
import akka._
import akka.stream._
import akka.stream.scaladsl._
import akka.actor._
// WARNING: Ordering is not guaranteed.
// The problem with this Graph is that when splitting the flow
// we obtain 2 concurrent flows.
// The result of conditionalFlow can returns after another messages is processed in the brodcast.
// ie. If you have 10 messages from 0 to 9. Let's say that we run a conditional flow only for element 0.
// It is possible that in output we will get 1,2,3,4,5,6,7,8,9,0 (0 will be returned at the end due to the additional flow execution).
// See also: http://grokbase.com/t/gg/akka-user/15bs0vtyt3/akka-stream-will-broadcast-merge-keep-the-message-order
// fiddle: https://scalafiddle.io/sf/3yInf1c/6
object ConditionalFlow {
def apply[M](
condition: M => Boolean,
conditionalFlow: Flow[M, M, NotUsed]): Graph[FlowShape[M, M], NotUsed] = {
Flow.fromGraph(GraphDSL.create() {
implicit builder => {
import GraphDSL.Implicits._
val bcast = builder.add(Broadcast[M](2))
val merge = builder.add(Merge[M](2))
val filter0 = Flow[M].filter(condition)
val filter1 = Flow[M].filter(m => !condition(m))
bcast.out(0) ~> filter0 ~> conditionalFlow ~> merge.in(0)
bcast.out(1) ~> filter1 ~> merge.in(1)
FlowShape(bcast.in, merge.out)
}
})
}
}
implicit val system = ActorSystem("Sys")
implicit val materializer = ActorMaterializer()
def factorial(n: Int): Int = n match {
case 0 => 1
case _ => n * factorial(n-1)
}
Source(List("a", "b", "c"))
.via(ConditionalFlow[String](
m => m == "b",
Flow[String].map(m => m + factorial(5).toString)
)).runWith(Sink.foreach(println))
@veysiertekin
Copy link

Thank you! We are using it with a few improvements which do not filter same flow twice:

object ConditionalFlow {
  def apply[M](condition: M => Boolean,
               conditionalFlow: Flow[M, M, NotUsed]): Graph[FlowShape[M, M], NotUsed] = {
    Flow.fromGraph(GraphDSL.create() {
      implicit builder => {
        import GraphDSL.Implicits._

        val partition = builder.add(Partition[M](2, {
          case x if condition(x) => 0
          case _ => 1
        }))
        val merge = builder.add(Merge[M](2))

        partition ~> conditionalFlow ~> merge.in(0)
        partition ~> merge.in(1)

        FlowShape(partition.in, merge.out)
      }
    })
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment