Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
def processor(account: String, taskCh: MVar[Int]): Observable[String] =
Observable.fromAsyncStateAction[Unit, String](_ =>
for {
task <- taskCh.take
// simulate real processing by an async sleep
_ <- Task.sleep(Random.nextInt(500).millis)
} yield {
println(s"[$account] processed $task")
((task + 100).toString, ())
}
)()
val (taskCh, processors) =
Await.result(
MVar.empty[Int]
.map(ch => (ch, List("acc1", "acc2").map(account => processor(account, ch))))
.runAsync,
Duration.Inf)
val cancelSource =
Observable.range(0, 10)
.dump("source")
.mapParallelUnordered(processors.length) { task =>
taskCh.put(task.toInt)
}
.dump("mapParallel")
.subscribe()
val stream =
Observable
.merge(processors: _*)
.consumeWith(Consumer.foreach(x => println(s"consumed $x")))
Await.result(stream.runAsync, 5.seconds)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment