Skip to content

Instantly share code, notes, and snippets.

@cokeSchlumpf
Created March 3, 2020 10:48
Show Gist options
  • Save cokeSchlumpf/78e366afe9973504daf16e792c0ac199 to your computer and use it in GitHub Desktop.
Save cokeSchlumpf/78e366afe9973504daf16e792c0ac199 to your computer and use it in GitHub Desktop.
Executing two dependent streams with Akka Streams
package ch.suva.mlp.app
import akka.actor.ActorSystem
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{Concat, Flow, Keep, Sink, Source}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
/**
  * Demo: executing two dependent streams with Akka Streams.
  *
  * The `first` stream is pre-materialized, which starts it immediately and
  * hands back both the completion future of its side sink (`slowSink`) and a
  * source carrying its main-line output. The `second` stream is wrapped in a
  * lazy future source that is only created once that completion future
  * succeeds, and both are concatenated into a single stream that is printed
  * to stdout. Once the combined stream finishes (successfully or not) the
  * actor system is terminated so that the JVM can exit.
  */
object TestApp extends App {

  implicit val system: ActorSystem = ActorSystem("test")
  implicit val ec: ExecutionContext = system.dispatcher

  /** Sink that takes 2 seconds per element; materializes the completion future of its `Sink.foreach`. */
  val slowSink = Flow[String]
    .buffer(30, OverflowStrategy.backpressure)
    .map { s =>
      // Intentional blocking sleep to simulate a slow consumer.
      Thread.sleep(2000)
      s
    }
    .toMat(Sink.foreach(s => println(s"Slow Sink: $s")))(Keep.right)

  /** Alternative slow sink: up to 5 elements in flight, each taking 4 seconds. Not used by the demo below. */
  val slowSink2 = Flow[String]
    .buffer(30, OverflowStrategy.backpressure)
    .mapAsync(5) { s =>
      Future {
        // Mark the sleep as blocking so the execution context is told about it.
        scala.concurrent.blocking {
          Thread.sleep(4000)
          s
        }
      }
    }
    .toMat(Sink.foreach(s => println(s"Slow Sink 2: $s")))(Keep.right)

  /** Sink that prints every element without any artificial delay. Not used by the demo below. */
  val fastSink = Flow[String].to(Sink.foreach(s => println(s"Fast Sink: $s")))

  /**
    * First stream, pre-materialized into a pair:
    *  - `_1`: the materialized value of `slowSink` (kept via `Keep.right`),
    *          i.e. the future that completes when the slow sink is done;
    *  - `_2`: a source emitting `1` for every element of the first stream.
    */
  val first = Source(1 to 20)
    .map(_.toString)
    .alsoToMat(slowSink)(Keep.right)
    .buffer(10, OverflowStrategy.backpressure)
    .map(_ => 1)
    .preMaterialize()

  /** Second stream: logs each element and emits `2` for every element. */
  val second = Source(100 to 120)
    .map(_.toString)
    .map { s =>
      println(s"Second Stream $s")
      s
    }
    .map(_ => 2)

  // Concatenate the output of the first stream with the second stream. The
  // second source is only created after `first._1` (the slow sink) completes.
  Source
    .combine(first._2, Source.lazyFutureSource(() => first._1.map(_ => second)))(Concat(_))
    .runWith(Sink.foreach(println))
    .onComplete { result =>
      result match {
        case Success(_)         => println("Done")
        case Failure(exception) => exception.printStackTrace()
      }
      // Shut the actor system down; otherwise its threads keep the JVM alive
      // after the demo has finished.
      system.terminate()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment