Skip to content

Instantly share code, notes, and snippets.

@alicefuzier
Last active September 12, 2017 14:49
Show Gist options
  • Save alicefuzier/67384c5614ea1dad938f045e2d9596c5 to your computer and use it in GitHub Desktop.
Save alicefuzier/67384c5614ea1dad938f045e2d9596c5 to your computer and use it in GitHub Desktop.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.Future
import akka.stream._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
implicit val system = ActorSystem("MyActorSystem")
implicit val materializer = ActorMaterializer()
val source = Source(List(1,2,3))
val sink = Sink.fold[Int, Int](0)(_ + _)
val graph = source.toMat(sink)(Keep.right)
graph.run().foreach(println)
val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10)
val newSource = source
.via(flowFilteringOutOddElements)
.via(flowDoublingElements)
newSource
.toMat(sink)(Keep.right)
.run()
.foreach(println)
Source.repeat(1)
.via(flowBatchingElements)
.throttle(1, 1.second, 1, ThrottleMode.shaping)
.take(10)
.runWith(Sink.foreach(println))
val sourceFromRange = Source(1 to 10)
val sourceFromIterable = Source(List(1,2,3))
val sourceFromFuture = Source.fromFuture(Future.successful("hello"))
val sourceWithSingleElement = Source.single("just one")
val sourceEmittingTheSameElement = Source.repeat("again and again")
val emptySource = Source.empty
val sinkPrintingOutElements = Sink.foreach[String](println(_))
val sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _)
val sinkReturningTheFirstElement = Sink.head
val sinkNoop = Sink.ignore
val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10)
val flowBufferingElements = Flow[String].buffer(1000, OverflowStrategy.backpressure)
val streamCalculatingSumOfElements: RunnableGraph[Future[Int]] = sourceFromIterable.toMat(sinkCalculatingASumOfElements)(Keep.right)
val streamCalculatingSumOfDoubledElements: RunnableGraph[Future[Int]] = sourceFromIterable.via(flowDoublingElements).toMat(sinkCalculatingASumOfElements)(Keep.right)
val sumOfElements: Future[Int] = streamCalculatingSumOfElements.run()
sumOfElements.foreach(println) // we expect to see 6
val sumOfDoubledElements: Future[Int] = streamCalculatingSumOfDoubledElements.run()
sumOfDoubledElements.foreach(println) // we expect to see 12
// runs the stream by attaching specified sink
sourceFromIterable.via(flowDoublingElements).runWith(sinkCalculatingASumOfElements).foreach(println)
// runs the stream by attaching sink that folds over elements on a stream
Source(List(1,2,3)).map(_ * 2).runFold(0)(_ + _).foreach(println)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment