Last active
August 9, 2017 09:36
-
-
Save msosnicki/6b150c41a9ba01cea4755c82b7b2bebb to your computer and use it in GitHub Desktop.
Monix reactive groupings
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.rxcorp.cesespoke.filewatcher.obs

import monix.eval.Task
import monix.eval.Task._
import monix.execution.Ack
import monix.execution.Ack._
import monix.execution.Scheduler.Implicits.global
import monix.reactive.{Consumer, Observable, Observer}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Random
/**
  * Runnable demo of Monix `groupBy`.
  *
  * A random `Int` is emitted every 100 ms, the stream is grouped by value, and
  * each group is drained by its own [[intConsumer]], which prints what it
  * receives and stops after a fixed number of elements.
  */
object Example extends App {

  /**
    * Random keys are drawn from `0` to `maxNo` inclusive. The same value is
    * reused as the number of groups taken and as the `mapAsync` parallelism.
    */
  val maxNo = 5

  // Number of elements a group consumer accepts before it answers `Stop`.
  // Declared ahead of the pipeline on purpose: `App` initialises fields in body
  // order, and the consumers run while `Await.result` below is still blocking.
  private val elementsPerGroup = 5

  /** One random key in `[0, maxNo]` every 100 milliseconds. */
  val source: Observable[Int] =
    Observable.interval(100.millis).mapTask(_ => Task.eval(Random.nextInt(maxNo + 1)))

  // NOTE(review): keys range over maxNo + 1 distinct values, but only `maxNo`
  // groups are taken, so one key never gets a consumer — confirm this is the
  // behaviour the example is meant to show.
  /** Consumes the first `maxNo` groups, each with its own consumer, discarding results. */
  val grouped: Task[Unit] =
    source
      .groupBy(identity)
      .take(maxNo.toLong)
      .mapAsync(maxNo)(group => group.consumeWith(intConsumer(group.key)))
      .completedL

  // Blocks the main thread until the whole pipeline has finished.
  Await.result(grouped.runAsync, Duration.Inf)

  /**
    * Builds a consumer that logs every element of one group.
    *
    * @param key the group key; used only to label the printed lines
    * @return a consumer that prints each element and signals `Stop` once it
    *         has received `elementsPerGroup` elements
    */
  def intConsumer(key: Int): Consumer[Int, Unit] =
    Consumer.fromObserver { _ =>
      new Observer[Int] {
        // Plain mutable counter, updated only from `onNext`.
        private[this] var counter = 0
        private[this] val label = s"$key Consumer."

        override def onNext(elem: Int): Future[Ack] = {
          println(s"$label Received $elem")
          counter += 1
          if (counter >= elementsPerGroup) Stop else Continue
        }

        override def onError(ex: Throwable): Unit =
          println(s"$label ERROR: $ex")

        override def onComplete(): Unit =
          println(s"$label completed with $counter")
      }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment