Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
// sbt build definition for the CheckFlowThreadID project.

name := "CheckFlowThreadID"

version := "0.0.1"

scalaVersion := "2.11.6"

// Version used for the akka-actor artifact below.
val akkaVersion = "2.3.12"

// Dependencies are appended one at a time, in the same order as before.
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % akkaVersion

// akka-stream-experimental is pinned to its own version, independent of akkaVersion.
libraryDependencies += "com.typesafe.akka" %% "akka-stream-experimental" % "1.0"
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
/**
 * Small experiment that prints the id of the thread on which each stage of an
 * Akka Streams graph handles its elements.
 *
 * The graph pushes the integers 1 to 6 through this layout:
 * {{{
 *   in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
 *                bcast ~> f4 ~> merge
 * }}}
 * Every flow stage prints `<label>[<thread id>] <element>` and passes the
 * element on unchanged; the sink prints with the label `*`.
 */
object Main {
  // Explicit types on implicits keep implicit resolution predictable.
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Id of the calling thread, zero-padded to at least two digits. */
  def tid(): String = f"${Thread.currentThread().getId()}%02d"

  /**
   * Pass-through flow that logs each element together with the current
   * thread id, prefixed by `label`, and then emits the element unchanged.
   */
  private def traced(label: String) =
    Flow[Int].map { x =>
      println(s"${label}[${tid()}] $x")
      x
    }

  /**
   * Builds and runs the graph, waits for a line on standard input, then shuts
   * the actor system down.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    try {
      val g = FlowGraph.closed() { implicit builder =>
        import FlowGraph.Implicits._

        val in = Source(1 to 6)
        val out = Sink.foreach[Int] { x =>
          println(s"*[${tid()}] $x")
        }

        val bcast = builder.add(Broadcast[Int](2))
        val merge = builder.add(Merge[Int](2))

        val f1 = traced("1")
        val f2 = traced("2")
        val f3 = traced("3")
        val f4 = traced("4")

        in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
                    bcast ~> f4 ~> merge
      }

      g.run()

      // Block until the user presses Enter so the stream has time to print.
      Console.in.readLine()
    } finally {
      // Always release the actor system, even if building/running the graph
      // or reading stdin throws; otherwise its threads may keep the JVM alive.
      system.shutdown()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment