Skip to content

Instantly share code, notes, and snippets.

@AdrianRaFo
Last active February 21, 2018 13:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save AdrianRaFo/b1c7d80c330bc07e9feead04b465585d to your computer and use it in GitHub Desktop.
Save AdrianRaFo/b1c7d80c330bc07e9feead04b465585d to your computer and use it in GitHub Desktop.
Add elements to an Akka Streams dynamically and accumulate them in batches
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, OverflowStrategy}
import scala.concurrent.duration._
import scala.io.StdIn
object DynamicBatchedSource extends App {
implicit val system: ActorSystem = ActorSystem("QuickStart")
implicit val materializer: ActorMaterializer = ActorMaterializer()
val queue = Source
.queue[Int](100, OverflowStrategy.fail)
.to(
Flow[Int]
.groupedWithin(5, 20 seconds)
.to(Sink foreach println))
.run
println("Enter number")
while (true) {
queue offer StdIn.readInt()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment