Skip to content

Instantly share code, notes, and snippets.

@grimrose
Last active August 21, 2017 15:34
Show Gist options
  • Save grimrose/9a1252bee80c90b6e44abad2b01106e7 to your computer and use it in GitHub Desktop.
Save grimrose/9a1252bee80c90b6e44abad2b01106e7 to your computer and use it in GitHub Desktop.
akka streams sample
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
// Actor system that hosts the stream; the materializer below is created while it is in implicit scope.
implicit val sys: ActorSystem = ActorSystem("akka-streams-sample")
// Materializer that is resolved implicitly when the stream is run.
implicit val mat: ActorMaterializer = ActorMaterializer()
// Execution context for the Future created inside the flow; reuses the actor system's dispatcher.
implicit val dispatcher: ExecutionContext = sys.dispatcher
// Upstream that emits the single element "hoge".
val source = Source.single("hoge")
// A String => String flow built with the graph DSL: each incoming element is broadcast
// to two branches, and the branch outputs are zipped pairwise into "z -> <k>, <s2>".
val flow = Flow.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  // Branch 0: the incoming element is ignored; the numbers 1 to 15 are emitted instead
  // and each is mapped to i % 3 inside a Future (mapAsyncUnordered, parallelism 4).
  val remainders = Flow[String]
    .flatMapConcat(_ => Source(1 to 15))
    .mapAsyncUnordered(4)(i =>
      Future {
        println(s"f -> $i")
        i % 3
      }
    )

  // Branch 1: repeats the incoming element without end so the zip stage always has a
  // partner for every remainder; the branch is marked with `.async`.
  val echo = Flow[String]
    .flatMapConcat(text => Source.repeat(text))
    .async

  // Register every stage with the builder to obtain the shapes used for wiring.
  val fanOut = builder.add(Broadcast[String](2))
  val remainderShape: FlowShape[String, Int] = builder.add(remainders)
  val echoShape = builder.add(echo)
  val pairUp = builder.add(ZipWith[Int, String, String]((k, s2) => s"z -> $k, $s2"))

  // Wiring: out(0) feeds the remainder branch, out(1) feeds the repeating branch.
  fanOut.out(0) ~> remainderShape ~> pairUp.in0
  fanOut.out(1) ~> echoShape ~> pairUp.in1

  // Expose the broadcast inlet and the zip outlet as the flow's single input/output.
  FlowShape.of(fanOut.in, pairUp.out)
})
// Sink that prints every element it receives.
val sink = Sink.foreach(println)
// Materialize and run the stream; the sink's Future completes when the stream has finished.
val future = source.via(flow).runWith(sink)
// Block until the stream is done, then always shut the actor system down. Without the
// terminate() call the system's threads keep the JVM alive after the stream completes,
// so the script would never exit.
try Await.ready(future, Duration.Inf)
finally Await.ready(sys.terminate(), Duration.Inf)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment