Skip to content

Instantly share code, notes, and snippets.

@margorczynski
Created February 28, 2017 14:17
Show Gist options
  • Save margorczynski/e109009f14ea4f5f21578f4f2b00dcc1 to your computer and use it in GitHub Desktop.
Save margorczynski/e109009f14ea4f5f21578f4f2b00dcc1 to your computer and use it in GitHub Desktop.
package sample.stream
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Future
/** Minimal Akka Streams example that prints the even numbers between 1 and 1000.
  *
  * The pipeline consists of a `Source` iterating over a fixed integer range, a `Flow`
  * that filters out odd values, and a `Sink` that prints every element it receives.
  */
object SimpleStreamExample {

  /** Builds and runs the stream, then shuts the actor system down once it completes.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    import scala.util.{Failure, Success}

    implicit val system: ActorSystem = ActorSystem("Sys")
    // Brings the system's dispatcher into implicit scope for the `onComplete` callback.
    import system.dispatcher
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val numbers = 1 to 1000

    // A Source that iterates over the number sequence. A fresh iterator is requested
    // through the supplied function rather than sharing a single iterator instance.
    val numbersSource: Source[Int, Unit] = Source.fromIterator(() => numbers.iterator)

    // Only let even numbers pass through the Flow.
    val isEven: Flow[Int, Int, Unit] = Flow[Int].filter(_ % 2 == 0)

    // A Source of even numbers, built by attaching the even-number filter Flow
    // to the number Source.
    val evenNumbers: Source[Int, Unit] = numbersSource.via(isEven)

    // Prints each element on its own line.
    val consoleSink: Sink[Int, Future[Unit]] = Sink.foreach[Int](println)

    evenNumbers.runWith(consoleSink).onComplete { outcome =>
      // Report a failed stream instead of silently discarding the error.
      outcome match {
        case Failure(cause) => Console.err.println(s"Stream failed: $cause")
        case Success(_)     => ()
      }
      // Shut down in both the success and the failure case.
      // NOTE(review): `shutdown()` matches the older Akka API this example appears to
      // target (materialized values typed as `Unit`); newer Akka releases use
      // `terminate()` instead - confirm against the Akka version in use before changing.
      system.shutdown()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment