Skip to content

Instantly share code, notes, and snippets.

@msosnicki
Last active August 9, 2017 09:36
Show Gist options
  • Save msosnicki/6b150c41a9ba01cea4755c82b7b2bebb to your computer and use it in GitHub Desktop.
Save msosnicki/6b150c41a9ba01cea4755c82b7b2bebb to your computer and use it in GitHub Desktop.
Monix reactive groupings
package com.rxcorp.cesespoke.filewatcher.obs
import monix.reactive.{Consumer, Observable, Observer}
import monix.eval.Task._
import monix.execution.Ack
import monix.execution.Ack._
import monix.execution.Scheduler.Implicits.global
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random
object Example extends App {
val maxNo = 5
val source = Observable.interval(100.millis).mapTask(_ => eval(Random.nextInt(maxNo + 1)))
val grouped = source.groupBy(identity).take(maxNo.toLong).mapAsync(maxNo)(go => go.consumeWith(intConsumer(go.key))).completedL
Await.result(grouped.runAsync, Duration.Inf)
def intConsumer(key: Int): Consumer[Int, Unit] = Consumer.fromObserver(implicit scheduler => {
new Observer[Int]{
var counter = 0
val label = s"$key Consumer."
override def onNext(elem: Int): Future[Ack] = {
println(s"$label Received $elem")
counter += 1
if(counter == 5) Stop else Continue
}
override def onError(ex: Throwable): Unit =
println(s"$label ERROR: $ex")
override def onComplete(): Unit = {
println(s"$label completed with $counter")
}
}
})
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment