Skip to content

Instantly share code, notes, and snippets.

@searler
Created July 5, 2015 20:21
Show Gist options
  • Save searler/f7b8fb96d6b3bda5b26e to your computer and use it in GitHub Desktop.
Save searler/f7b8fb96d6b3bda5b26e to your computer and use it in GitHub Desktop.
Illustrate fundamental structure on which Akka reactive stream switch could be implemented
package conditionals
import scala.collection.immutable.Seq
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.Future
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.FlowGraph
import akka.stream.scaladsl.Broadcast
import akka.stream.scaladsl.Merge
import akka.stream.scaladsl.Flow
import akka.stream.FlowShape
/**
* Prints 3 and 6
*/
object BroadcastFilterMerge extends App {
implicit val system = ActorSystem("Sys")
import system.dispatcher
implicit val materializer = ActorMaterializer()
val g = FlowGraph.partial() { implicit builder: FlowGraph.Builder[Unit] =>
val broadcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val f1 = Flow[Int].filter { _ == 3 }
val f2 = Flow[Int].filter { _ == 6 }
builder.addEdge(broadcast.out(0), f1, merge.in(0))
builder.addEdge(broadcast.out(1), f2, merge.in(1))
FlowShape[Int, Int](broadcast.in, merge.out)
}
val in = Source(1 to 10)
val r = in
.via(g)
.runForeach(println)
r.onComplete {
case x @ _ => system.shutdown
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment