Skip to content

Instantly share code, notes, and snippets.

@nsitbon
Last active September 28, 2016 12:54
Show Gist options
  • Save nsitbon/21f2bdcb4d7ab5108588250ab300f5cb to your computer and use it in GitHub Desktop.
Save nsitbon/21f2bdcb4d7ab5108588250ab300f5cb to your computer and use it in GitHub Desktop.
doc akka streams
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.{Done, NotUsed}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
object App {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("system")
import actorSystem.dispatcher
implicit val materializer = ActorMaterializer()
val source: Source[Int, NotUsed] = Source(1 to 1000)
val enricher: Flow[Int, (Int, Int), NotUsed] = Flow.fromFunction((x: Int) => (x, x * 2))
val filterOldCV: Flow[(Int, Int), (Int, Int), NotUsed] = Flow[(Int, Int)].filter(_._1 % 2 == 0)
val partialIndexer: Sink[(Int, Int), Future[Done]] = Sink.foreach[(Int, Int)](x => {
println("partial " + x)
}).async
val fullIndexer: Sink[(Int, Int), Future[Done]] = Sink.foreach[(Int, Int)](x => {
println("full " + x)
}).async
val filteredPartialIndexer: Sink[(Int, Int), Future[Done]] = filterOldCV.toMat(partialIndexer)(Keep.right).async
source.via(enricher)
.alsoToMat(filteredPartialIndexer)(Keep.right)
.toMat(fullIndexer)((x: Future[Done], y: Future[Done]) => Future.sequence(Seq(x, y)))
.run()
.andThen { case _ =>
actorSystem.terminate()
Await.result(actorSystem.whenTerminated, Duration.Inf)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment