Skip to content

Instantly share code, notes, and snippets.

@derekjw
Created April 28, 2016 21:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save derekjw/4207f23d903eceac4c4a8d8586c04016 to your computer and use it in GitHub Desktop.
Save derekjw/4207f23d903eceac4c4a8d8586c04016 to your computer and use it in GitHub Desktop.
Split stream with groupBy, run each substream async and ordered.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Random
object Main extends App {
implicit val actorSystem = ActorSystem()
implicit val materializer = ActorMaterializer.create(actorSystem)
import actorSystem.dispatcher
val result = Source.fromIterator(() => Iterator.range(0, 96))
.groupBy(16, _ % 16)
.mapAsync(1) { x =>
val promise = Promise[Int]()
actorSystem.scheduler.scheduleOnce(Random.nextInt(1000).millis)(promise.success(x))
promise.future
}
.statefulMapConcat[(Int, Int, Int, Int)] { () =>
var last = Option.empty[Int]
var count = 0
n => {
if (last.exists(_ > n)) throw new IllegalStateException("oh noes!")
val result = last.map((count, n % 16, _, n))
count += 1
last = Some(n)
result.toList
}
}
.mergeSubstreams
.runForeach(println)
Await.result(result, Duration.Inf)
actorSystem.terminate()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment