@marioosh
Last active December 8, 2015 16:45
Akka-stream-slide
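import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer() // needed by runWith
val source = Source(1 to 10)
val sink = Sink.fold[String, String]("")(_ + _) // concatenates all incoming strings
val flow = Flow[Int]
  .filter(_ % 2 == 0) // keep even numbers only
  .map(_.toString)
val result = source.via(flow).runWith(sink) // Future[String]
Await.result(result, 100.millis) // "246810"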
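The stream above can be cross-checked without Akka. The following is a minimal sketch that applies the same filter, map, and fold steps to a plain Scala range; `EvenDigits` and `concatEvens` are hypothetical names introduced only for this illustration.

```scala
// Plain-collections analogue of the stream pipeline (no Akka required).
object EvenDigits {
  // Keep the even numbers, render each as a string, and concatenate them.
  def concatEvens(range: Range): String =
    range
      .filter(_ % 2 == 0)
      .map(_.toString)
      .foldLeft("")(_ + _)

  def main(args: Array[String]): Unit =
    println(concatEvens(1 to 10)) // prints 246810
}
```

The collection version is strict and synchronous, so it needs no materializer or `Await`; the stream version yields the same string asynchronously as a `Future[String]`.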
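// Usage: put the measuring stage (defined below) in front of the rest of the pipeline.
// anotherFlow is not shown in this gist; measure emits Int counts, so it must accept Int.
source.via(measure).via(anotherFlow).runWith(sink)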
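// Counts the elements passing through and logs the count on each 50 ms tick.
val measure =
  Flow.fromGraph(FlowGraph.create() { implicit builder =>
    import FlowGraph.Implicits._ // brings the ~> operator into scope

    // While downstream is not pulling, fold incoming elements into a running count.
    val conflate = builder.add(
      Flow[Int].conflate(_ => 1)((sum, _) => sum + 1)
    )
    // Pair each count with a tick and keep only the count.
    val zip = builder.add(
      ZipWith[Int, Unit, Int]((t, _) => t)
        .withAttributes(OperationAttributes.inputBuffer(initial = 1, max = 1))
    )
    // Emits () every 50 ms, starting immediately.
    val tickr = builder.add(
      Source(0.seconds, 50.millis, ())
    )
    val logger = builder.add(
      Flow[Int].map { counter => println("TOTAL: " + counter); counter }
    )

    conflate.outlet ~> zip.in0
    tickr.outlet ~> zip.in1
    zip.out ~> logger.inlet

    FlowShape(conflate.inlet, logger.outlet)
  })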
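As a rough mental model of what `measure` reports, the sketch below counts elements per tick window using plain Scala collections. It is a simplification and not the stream's exact behaviour: it ignores the one-element prefetch implied by the input buffer setting, and it simply skips windows in which nothing arrived. `TickCounts`, `countsPerTick`, and the sample arrival times are hypothetical names and data chosen for illustration.

```scala
// Simplified, Akka-free model of "count the elements seen per tick".
object TickCounts {
  // arrivalsMillis: hypothetical arrival times of elements, in ms since start.
  // tickMillis: the tick period (50 ms in the gist).
  // Returns the element count of each non-empty tick window, in time order.
  def countsPerTick(arrivalsMillis: Seq[Long], tickMillis: Long): Seq[Int] =
    arrivalsMillis
      .groupBy(_ / tickMillis) // window index for each arrival
      .toSeq
      .sortBy(_._1) // restore time order
      .map { case (_, inWindow) => inWindow.size }

  def main(args: Array[String]): Unit = {
    val arrivals = Seq(0L, 10L, 20L, 60L, 70L, 160L)
    // Three non-empty windows, holding 3, 2 and 1 elements respectively.
    println(countsPerTick(arrivals, 50L))
  }
}
```

In the real flow the count comes from `conflate`, which aggregates only while downstream is not pulling, so the logged numbers depend on timing and back-pressure rather than on fixed windows.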